Skip to content

Updated Dependency Management and add a new function to list all keys across all nodes in the Memcached cluster. #5

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 36 additions & 2 deletions elasticache/elasticache.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (

"github.com/bradfitz/gomemcache/memcache"
"github.com/integralist/go-findroot/find"
"github.com/thompsonlabs/go-elasticache/lister"
)

// Node is a single ElastiCache node
Expand All @@ -29,6 +30,7 @@ type Item memcache.Item
// Client embeds the memcache client so we can hide those details away
type Client struct {
*memcache.Client
clusterNodeLister *lister.ClusterNodesKeyLister
}

// Set abstracts the memcache client details away,
Expand All @@ -45,6 +47,10 @@ func (c *Client) Set(item *Item) error {

var logger *log.Logger

//new var to hold endpoint environment variable name this instance should use.
//this allows each instance of this class to be associated with their own endpoint.
var endpointEnvironmentVarName string

func init() {
logger = log.New(os.Stdout, "go-elasticache: ", log.Ldate|log.Ltime|log.Lshortfile)

Expand All @@ -67,12 +73,40 @@ func init() {

// New returns an instance of the memcache client
func New() (*Client, error) {
endpointEnvironmentVarName = "ELASTICACHE_ENDPOINT"
urls, err := clusterNodes()
if err != nil {
return &Client{Client: memcache.New()}, err
}

return &Client{Client: memcache.New(urls...)}, nil
return &Client{Client: memcache.New(urls...),
clusterNodeLister: lister.NewClusterNodeKeyLister(urls)}, nil
}

// NewInstance - returns an instance of the memcache client, this alternative constructor
// allows an endpoint environment variable to be specified specific to this
// instance. Where a value is not provided the default value: ELASTICACHE_ENDPOINT
// will be used.
func NewInstance(endpointEnvVarName string) (*Client, error) {
if len(endpointEnvVarName) < 1 {
endpointEnvironmentVarName = "ELASTICACHE_ENDPOINT"
} else {
endpointEnvironmentVarName = endpointEnvVarName
}

urls, err := clusterNodes()
if err != nil {
return &Client{Client: memcache.New()}, err
}

return &Client{Client: memcache.New(urls...),
clusterNodeLister: lister.NewClusterNodeKeyLister(urls)}, nil
}

//Lists all keys stored accross all nodes in the Cluster.
func (c *Client) ListAllKeys() (*[]string, error) {

return c.clusterNodeLister.ListAllHostKeys()
}

func clusterNodes() ([]string, error) {
Expand Down Expand Up @@ -107,7 +141,7 @@ func clusterNodes() ([]string, error) {
func elasticache() (string, error) {
var endpoint string

endpoint = os.Getenv("ELASTICACHE_ENDPOINT")
endpoint = os.Getenv(endpointEnvironmentVarName)
if len(endpoint) == 0 {
logger.Println("ElastiCache endpoint not set")
return "", errors.New("ElastiCache endpoint not set")
Expand Down
12 changes: 0 additions & 12 deletions glide.lock

This file was deleted.

4 changes: 0 additions & 4 deletions glide.yaml

This file was deleted.

Binary file added go-elasticache
Binary file not shown.
8 changes: 8 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
module github.com/thompsonlabs/go-elasticache

go 1.16

require (
github.com/bradfitz/gomemcache v0.0.0-20170208213004-1952afaa557d
github.com/integralist/go-findroot v0.0.0-20160518114804-ac90681525dc
)
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
github.com/bradfitz/gomemcache v0.0.0-20170208213004-1952afaa557d h1:7IjN4QP3c38xhg6wz8R3YjoU+6S9e7xBc0DAVLLIpHE=
github.com/bradfitz/gomemcache v0.0.0-20170208213004-1952afaa557d/go.mod h1:PmM6Mmwb0LSuEubjR8N7PtNe1KxZLtOUHtbeikc5h60=
github.com/integralist/go-findroot v0.0.0-20160518114804-ac90681525dc h1:4IZpk3M4m6ypx0IlRoEyEyY1gAdicWLMQ0NcG/gBnnA=
github.com/integralist/go-findroot v0.0.0-20160518114804-ac90681525dc/go.mod h1:UlaC6ndby46IJz9m/03cZPKKkR9ykeIVBBDE3UDBdJk=
134 changes: 134 additions & 0 deletions lister/ClusterNodeKeyLister.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
package lister

import (
"bufio"
"fmt"
"log"
"net"
"regexp"
"strconv"
)

/**
Utility component that iterates over all nodes in the memchached
cluster and lists all their respective keys.
*/
type ClusterNodesKeyLister struct {
clusterNodeUrls []string
}

//NewClusterNodeKeyLister - Returns a new ClusterNodeKeyLister instance.
func NewClusterNodeKeyLister(nodeUrls []string) *ClusterNodesKeyLister {

return &ClusterNodesKeyLister{

clusterNodeUrls: nodeUrls,
}

}

//ListAllHostKeys - Lists all keys associated with all nodes in the cluster.
func (cnkl *ClusterNodesKeyLister) ListAllHostKeys() (*[]string, error) {

allClusterNodeKeys := make([]string, 0)

for _, currentNode := range cnkl.clusterNodeUrls {

currentNodeKeys, err := cnkl.listHostKeys(currentNode)

if err != nil {

log.Println("An error occured whilst attempting to fetch keys for node: " + currentNode + " " + err.Error())

} else {

for _, currentNodeCurrentKey := range *currentNodeKeys {

allClusterNodeKeys = append(allClusterNodeKeys, currentNodeCurrentKey)
}
}

}

return &allClusterNodeKeys, nil
}

//private functions.

func (cnkl *ClusterNodesKeyLister) getNewConnection(hostAndPortNumber string) (net.Conn, error) {

conn, err := net.Dial("tcp", hostAndPortNumber)

if err != nil {

return nil, err
}

return conn, nil

}

func (cnkl *ClusterNodesKeyLister) dispatchRequestAndReadResponse(connection net.Conn, command string, responseDelimiters []string) []string {
fmt.Fprintf(connection, command)
scanner := bufio.NewScanner(connection)
var result []string

OUTER:
for scanner.Scan() {
line := scanner.Text()
for _, delimeter := range responseDelimiters {
if line == delimeter {
break OUTER
}
}
result = append(result, line)
// if there is no delimiter specified, then the response is just a single line and we should return after
// reading that first line (e.g. version command)
if len(responseDelimiters) == 0 {
break OUTER
}
}
return result
}

func (cnkl *ClusterNodesKeyLister) listHostKeys(aHostAddressAndPort string) (*[]string, error) {
keys := make([]string, 0)
conn, err := cnkl.getNewConnection(aHostAddressAndPort)
if err != nil {

log.Println("An error occured whilst attempting to connect to Memcached cluster node at: " + aHostAddressAndPort + " " + err.Error())

return nil, err
}

//result := client.executer.execute("stats items\r\n", []string{"END"})
result := cnkl.dispatchRequestAndReadResponse(conn, "stats items\r\n", []string{"END"})

// identify all slabs and their number of items by parsing the 'stats items' command
r, _ := regexp.Compile("STAT items:([0-9]*):number ([0-9]*)")
slabCounts := map[int]int{}
for _, stat := range result {
matches := r.FindStringSubmatch(stat)
if len(matches) == 3 {
slabId, _ := strconv.Atoi(matches[1])
slabItemCount, _ := strconv.Atoi(matches[2])
slabCounts[slabId] = slabItemCount
}
}

// For each slab, dump all items and add each key to the `keys` slice
r, _ = regexp.Compile("ITEM (.*?) .*")
for slabId, slabCount := range slabCounts {
command := fmt.Sprintf("stats cachedump %v %v\n", slabId, slabCount)
//commandResult := client.executer.execute(command, []string{"END"})
commandResult := cnkl.dispatchRequestAndReadResponse(conn, command, []string{"END"})
for _, item := range commandResult {
matches := r.FindStringSubmatch(item)
keys = append(keys, matches[1])
}
}

conn.Close()

return &keys, nil
}
3 changes: 0 additions & 3 deletions vendor/github.com/bradfitz/gomemcache/.gitignore

This file was deleted.

Loading