Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for SRV records for memcache clients #253

Merged
merged 1 commit into from
May 27, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -582,7 +582,9 @@ Experimental Memcache support has been added as an alternative to Redis in v1.5.

To configure a Memcache instance use the following environment variables instead of the Redis variables:

1. `MEMCACHE_HOST_PORT=`: a comma separated list of hostname:port pairs for memcache nodes.
1. `MEMCACHE_HOST_PORT=`: a comma separated list of hostname:port pairs for memcache nodes (mutually exclusive with `MEMCACHE_SRV`)
1. `MEMCACHE_SRV=`: an SRV record to lookup hosts from (mutually exclusive with `MEMCACHE_HOST_PORT`)
1. `MEMCACHE_SRV_REFRESH=0`: refresh the list of hosts every n seconds, if 0 no refreshing will happen, supports duration suffixes: "ns", "us" (or "µs"), "ms", "s", "m", "h".
1. `BACKEND_TYPE=memcache`
1. `CACHE_KEY_PREFIX`: a string to prepend to all cache keys
1. `MEMCACHE_MAX_IDLE_CONNS=2`: the maximum number of idle TCP connections per memcache node, `2` is the default of the underlying library
Expand Down
70 changes: 67 additions & 3 deletions src/memcached/cache_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"math/rand"
"strconv"
"sync"
"time"

"github.com/coocood/freecache"
stats "github.com/lyft/gostats"
Expand All @@ -33,6 +34,7 @@ import (
"github.com/envoyproxy/ratelimit/src/config"
"github.com/envoyproxy/ratelimit/src/limiter"
"github.com/envoyproxy/ratelimit/src/settings"
"github.com/envoyproxy/ratelimit/src/srv"
"github.com/envoyproxy/ratelimit/src/utils"
)

Expand Down Expand Up @@ -173,6 +175,70 @@ func (this *rateLimitMemcacheImpl) Flush() {
this.waitGroup.Wait()
}

func refreshServersPeriodically(serverList memcache.ServerList, srv string, d time.Duration, finish <-chan struct{}) {
t := time.NewTicker(d)
defer t.Stop()
for {
select {
case <-t.C:
err := refreshServers(serverList, srv)
if err != nil {
logger.Warn("failed to refresh memcahce hosts")
} else {
logger.Debug("refreshed memcache hosts")
}
case <-finish:
return
}
}
}

func refreshServers(serverList memcache.ServerList, srv_ string) error {
servers, err := srv.ServerStringsFromSrv(srv_)
if err != nil {
return err
}
err = serverList.SetServers(servers...)
if err != nil {
return err
}
return nil
}

func newMemcachedFromSrv(srv_ string, d time.Duration) Client {
serverList := new(memcache.ServerList)
err := refreshServers(*serverList, srv_)
if err != nil {
errorText := "Unable to fetch servers from SRV"
logger.Errorf(errorText)
panic(MemcacheError(errorText))
}

if d > 0 {
logger.Infof("refreshing memcache hosts every: %v milliseconds", d.Milliseconds())
finish := make(chan struct{})
go refreshServersPeriodically(*serverList, srv_, d, finish)
} else {
logger.Debugf("not periodically refreshing memcached hosts")
}

return memcache.NewFromSelector(serverList)
}

func newMemcacheFromSettings(s settings.Settings) Client {
if s.MemcacheSrv != "" && len(s.MemcacheHostPort) > 0 {
panic(MemcacheError("Both MEMCADHE_HOST_PORT and MEMCACHE_SRV are set"))
}
if s.MemcacheSrv != "" {
logger.Debugf("Using MEMCACHE_SRV: %v", s.MemcacheSrv)
return newMemcachedFromSrv(s.MemcacheSrv, s.MemcacheSrvRefresh)
}
logger.Debugf("Usng MEMCACHE_HOST_PORT:: %v", s.MemcacheHostPort)
client := memcache.New(s.MemcacheHostPort...)
client.MaxIdleConns = s.MemcacheMaxIdleConns
return client
}

func NewRateLimitCacheImpl(client Client, timeSource utils.TimeSource, jitterRand *rand.Rand,
expirationJitterMaxSeconds int64, localCache *freecache.Cache, scope stats.Scope, nearLimitRatio float32, cacheKeyPrefix string) limiter.RateLimitCache {
return &rateLimitMemcacheImpl{
Expand All @@ -188,10 +254,8 @@ func NewRateLimitCacheImpl(client Client, timeSource utils.TimeSource, jitterRan

func NewRateLimitCacheImplFromSettings(s settings.Settings, timeSource utils.TimeSource, jitterRand *rand.Rand,
localCache *freecache.Cache, scope stats.Scope) limiter.RateLimitCache {
var client = memcache.New(s.MemcacheHostPort...)
client.MaxIdleConns = s.MemcacheMaxIdleConns
return NewRateLimitCacheImpl(
CollectStats(client, scope.Scope("memcache")),
CollectStats(newMemcacheFromSettings(s), scope.Scope("memcache")),
timeSource,
jitterRand,
s.ExpirationJitterMaxSeconds,
Expand Down
7 changes: 7 additions & 0 deletions src/memcached/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,13 @@ import (
"github.com/bradfitz/gomemcache/memcache"
)

// Errors that may be raised during config parsing.
type MemcacheError string

func (e MemcacheError) Error() string {
return string(e)
}

var _ Client = (*memcache.Client)(nil)

// Interface for memcached, used for mocking.
Expand Down
4 changes: 3 additions & 1 deletion src/settings/settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,9 @@ type Settings struct {
// number of connections to memcache kept idle in pool, if a connection is needed but none
// are idle a new connection is opened, used and closed and can be left in a time-wait state
// which can result in high CPU usage.
MemcacheMaxIdleConns int `envconfig:"MEMCACHE_MAX_IDLE_CONNS" default:"2"`
MemcacheMaxIdleConns int `envconfig:"MEMCACHE_MAX_IDLE_CONNS" default:"2"`
MemcacheSrv string `envconfig:"MEMCACHE_SRV" default:""`
MemcacheSrvRefresh time.Duration `envconfig:"MEMCACHE_SRV_REFRESH" default:"0"`
}

type Option func(*Settings)
Expand Down
49 changes: 49 additions & 0 deletions src/srv/srv.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package srv

import (
"errors"
"fmt"
"net"
"regexp"

logger "github.com/sirupsen/logrus"
)

var srvRegex = regexp.MustCompile(`^_(.+?)\._(.+?)\.(.+)$`)

func ParseSrv(srv string) (string, string, string, error) {
matches := srvRegex.FindStringSubmatch(srv)
if matches == nil {
errorText := fmt.Sprintf("could not parse %s to SRV parts", srv)
logger.Errorf(errorText)
return "", "", "", errors.New(errorText)
}
return matches[1], matches[2], matches[3], nil
}

func ServerStringsFromSrv(srv string) ([]string, error) {
service, proto, name, err := ParseSrv(srv)

if err != nil {
logger.Errorf("failed to parse SRV: %s", err)
return nil, err
}

_, srvs, err := net.LookupSRV(service, proto, name)

if err != nil {
logger.Errorf("failed to lookup SRV: %s", err)
return nil, err
}

logger.Debugf("found %v servers(s) from SRV", len(srvs))

serversFromSrv := make([]string, len(srvs))
for i, srv := range srvs {
server := fmt.Sprintf("%s:%v", srv.Target, srv.Port)
logger.Debugf("server from srv[%v]: %s", i, server)
serversFromSrv[i] = fmt.Sprintf("%s:%v", srv.Target, srv.Port)
}

return serversFromSrv, nil
}
36 changes: 36 additions & 0 deletions test/memcached/cache_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/envoyproxy/ratelimit/src/config"
"github.com/envoyproxy/ratelimit/src/limiter"
"github.com/envoyproxy/ratelimit/src/memcached"
"github.com/envoyproxy/ratelimit/src/settings"
"github.com/envoyproxy/ratelimit/src/utils"
stats "github.com/lyft/gostats"

Expand Down Expand Up @@ -583,6 +584,41 @@ func TestMemcacheAdd(t *testing.T) {
cache.Flush()
}

func TestNewRateLimitCacheImplFromSettingsWhenSrvCannotBeResolved(t *testing.T) {
assert := assert.New(t)
controller := gomock.NewController(t)
defer controller.Finish()

timeSource := mock_utils.NewMockTimeSource(controller)
statsStore := stats.NewStore(stats.NewNullSink(), false)

var s settings.Settings
s.NearLimitRatio = 0.8
s.CacheKeyPrefix = ""
s.ExpirationJitterMaxSeconds = 300
s.MemcacheSrv = "_something._tcp.example.invalid"

assert.Panics(func() { memcached.NewRateLimitCacheImplFromSettings(s, timeSource, nil, nil, statsStore) })
}

func TestNewRateLimitCacheImplFromSettingsWhenHostAndPortAndSrvAreBothSet(t *testing.T) {
assert := assert.New(t)
controller := gomock.NewController(t)
defer controller.Finish()

timeSource := mock_utils.NewMockTimeSource(controller)
statsStore := stats.NewStore(stats.NewNullSink(), false)

var s settings.Settings
s.NearLimitRatio = 0.8
s.CacheKeyPrefix = ""
s.ExpirationJitterMaxSeconds = 300
s.MemcacheSrv = "_something._tcp.example.invalid"
s.MemcacheHostPort = []string{"example.org:11211"}

assert.Panics(func() { memcached.NewRateLimitCacheImplFromSettings(s, timeSource, nil, nil, statsStore) })
}

func getMultiResult(vals map[string]int) map[string]*memcache.Item {
result := make(map[string]*memcache.Item, len(vals))
for k, v := range vals {
Expand Down
56 changes: 56 additions & 0 deletions test/srv/srv_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package srv

import (
"errors"
"net"
"testing"

"github.com/envoyproxy/ratelimit/src/srv"
"github.com/stretchr/testify/assert"
)

func TestParseSrv(t *testing.T) {
service, proto, name, err := srv.ParseSrv("_something._tcp.example.org.")
assert.Equal(t, service, "something")
assert.Equal(t, proto, "tcp")
assert.Equal(t, name, "example.org.")
assert.Nil(t, err)

service, proto, name, err = srv.ParseSrv("_something-else._udp.example.org")
assert.Equal(t, service, "something-else")
assert.Equal(t, proto, "udp")
assert.Equal(t, name, "example.org")
assert.Nil(t, err)

_, _, _, err = srv.ParseSrv("example.org")
assert.Equal(t, err, errors.New("could not parse example.org to SRV parts"))
}

func TestServerStringsFromSrvWhenSrvIsNotWellFormed(t *testing.T) {
_, err := srv.ServerStringsFromSrv("example.org")
assert.Equal(t, err, errors.New("could not parse example.org to SRV parts"))
}

func TestServerStringsFromSevWhenSrvIsWellFormedButNotLookupable(t *testing.T) {
_, err := srv.ServerStringsFromSrv("_something._tcp.example.invalid")
var e *net.DNSError
if errors.As(err, &e) {
assert.Equal(t, e.Err, "no such host")
assert.Equal(t, e.Name, "_something._tcp.example.invalid")
assert.False(t, e.IsTimeout)
assert.False(t, e.IsTemporary)
assert.True(t, e.IsNotFound)
} else {
t.Fail()
}
}

func TestServerStrings(t *testing.T) {
// it seems reasonable to think _xmpp-server._tcp.gmail.com will be available for a long time!
servers, err := srv.ServerStringsFromSrv("_xmpp-server._tcp.gmail.com.")
assert.True(t, len(servers) > 0)
for _, s := range servers {
assert.Regexp(t, `^.*xmpp-server.*google.com.:\d+$`, s)
}
assert.Nil(t, err)
}