From db2f642f33da3d82620b10207af5b9975c35ee67 Mon Sep 17 00:00:00 2001 From: petedmarsh Date: Tue, 25 May 2021 19:29:18 +0200 Subject: [PATCH 1/7] Add env var to configure max idle connections per memcache node (#246) The underlying memcache client library allows this to be configured, and currently defaults to a value of 2, see: https://github.com/bradfitz/gomemcache/blob/master/memcache/memcache.go#L72 https://github.com/bradfitz/gomemcache/blob/master/memcache/memcache.go#L145 https://github.com/bradfitz/gomemcache/blob/master/memcache/memcache.go#L239 This allows this value to be configured by a new environment variable: MEMCACHE_MAX_IDLE_CONNS which defaults to 2, matching the default of the underlying library (which is the current behaviour). Signed-off-by: Peter Marsh --- README.md | 1 + src/memcached/cache_impl.go | 4 +++- src/settings/settings.go | 6 ++++++ test/integration/integration_test.go | 15 +++++++++++++++ 4 files changed, 25 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index a584edc0..c3c74b6f 100644 --- a/README.md +++ b/README.md @@ -585,6 +585,7 @@ To configure a Memcache instance use the following environment variables instead 1. `MEMCACHE_HOST_PORT=`: a comma separated list of hostname:port pairs for memcache nodes. 1. `BACKEND_TYPE=memcache` 1. `CACHE_KEY_PREFIX`: a string to prepend to all cache keys +1. `MEMCACHE_MAX_IDLE_CONNS=2`: the maximum number of idle TCP connections per memcache node, `2` is the default of the underlying library With memcache mode increments will happen asynchronously, so it's technically possible for a client to exceed quota briefly if multiple requests happen at exactly the same time. 
diff --git a/src/memcached/cache_impl.go b/src/memcached/cache_impl.go index 1e7b0b69..b9ecdd87 100644 --- a/src/memcached/cache_impl.go +++ b/src/memcached/cache_impl.go @@ -188,8 +188,10 @@ func NewRateLimitCacheImpl(client Client, timeSource utils.TimeSource, jitterRan func NewRateLimitCacheImplFromSettings(s settings.Settings, timeSource utils.TimeSource, jitterRand *rand.Rand, localCache *freecache.Cache, scope stats.Scope) limiter.RateLimitCache { + var client = memcache.New(s.MemcacheHostPort...) + client.MaxIdleConns = s.MemcacheMaxIdleConns return NewRateLimitCacheImpl( - CollectStats(memcache.New(s.MemcacheHostPort...), scope.Scope("memcache")), + CollectStats(client, scope.Scope("memcache")), timeSource, jitterRand, s.ExpirationJitterMaxSeconds, diff --git a/src/settings/settings.go b/src/settings/settings.go index 82b8ebe5..642d8d75 100644 --- a/src/settings/settings.go +++ b/src/settings/settings.go @@ -68,6 +68,12 @@ type Settings struct { // Memcache settings MemcacheHostPort []string `envconfig:"MEMCACHE_HOST_PORT" default:""` + // MemcacheMaxIdleConns sets the maximum number of idle TCP connections per memcached node. + // The default is 2 as that is the default of the underlying library. This is the maximum + // number of connections to memcache kept idle in pool, if a connection is needed but none + // are idle a new connection is opened, used and closed and can be left in a time-wait state + // which can result in high CPU usage. 
+ MemcacheMaxIdleConns int `envconfig:"MEMCACHE_MAX_IDLE_CONNS" default:"2"` } type Option func(*Settings) diff --git a/test/integration/integration_test.go b/test/integration/integration_test.go index da7cfa6d..249d9b2c 100644 --- a/test/integration/integration_test.go +++ b/test/integration/integration_test.go @@ -209,6 +209,21 @@ func TestBasicConfigMemcache(t *testing.T) { }) } +func TestConfigMemcacheWithMaxIdleConns(t *testing.T) { + singleNodePort := []int{6394} + assert := assert.New(t) + common.WithMultiMemcache(t, []common.MemcacheConfig{ + {Port: 6394}, + }, func() { + withDefaultMaxIdleConns := makeSimpleMemcacheSettings(singleNodePort, 0) + assert.Equal(2, withDefaultMaxIdleConns.MemcacheMaxIdleConns) + t.Run("MemcacheWithDefaultMaxIdleConns", testBasicConfig(withDefaultMaxIdleConns)) + withSpecifiedMaxIdleConns := makeSimpleMemcacheSettings(singleNodePort, 0) + withSpecifiedMaxIdleConns.MemcacheMaxIdleConns = 100 + t.Run("MemcacheWithSpecifiedMaxIdleConns", testBasicConfig(withSpecifiedMaxIdleConns)) + }) +} + func TestMultiNodeMemcache(t *testing.T) { multiNodePorts := []int{6494, 6495} common.WithMultiMemcache(t, []common.MemcacheConfig{ From 1f0aec56b6fbeebecd27daf7e97f28efde634ab7 Mon Sep 17 00:00:00 2001 From: devincd <505259926@qq.com> Date: Wed, 26 May 2021 01:32:27 +0800 Subject: [PATCH 2/7] delete unuseful code (#254) Signed-off-by: devincd <505259926@qq.com> --- src/redis/driver_impl.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/src/redis/driver_impl.go b/src/redis/driver_impl.go index 18e213f1..f6449ea5 100644 --- a/src/redis/driver_impl.go +++ b/src/redis/driver_impl.go @@ -59,14 +59,10 @@ func NewClientImpl(scope stats.Scope, useTls bool, auth string, redisType string df := func(network, addr string) (radix.Conn, error) { var dialOpts []radix.DialOpt - var err error if useTls { dialOpts = append(dialOpts, radix.DialUseTLS(&tls.Config{})) } - if err != nil { - return nil, err - } if auth != "" { logger.Warnf("enabling 
authentication to redis on %s", url) From 83a222a6cd94efe7aa93084937395e8c01a93bcc Mon Sep 17 00:00:00 2001 From: petedmarsh Date: Tue, 25 May 2021 19:32:44 +0200 Subject: [PATCH 3/7] Hook up /debug/pprof/trace (#249) Signed-off-by: Peter Marsh --- src/server/server_impl.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/src/server/server_impl.go b/src/server/server_impl.go index b60d1e32..0afd443a 100644 --- a/src/server/server_impl.go +++ b/src/server/server_impl.go @@ -254,6 +254,14 @@ func newServer(s settings.Settings, name string, store stats.Store, localCache * }) }) + // setup trace endpoint + ret.AddDebugHttpEndpoint( + "/debug/pprof/trace", + "trace endpoint", + func(writer http.ResponseWriter, request *http.Request) { + pprof.Trace(writer, request) + }) + // setup debug root ret.debugListener.debugMux.HandleFunc( "/", From bc0e9faec3397a6dad9b27fc983be8a229306ff6 Mon Sep 17 00:00:00 2001 From: Sunjay Bhatia <5337253+sunjayBhatia@users.noreply.github.com> Date: Tue, 25 May 2021 13:33:49 -0400 Subject: [PATCH 4/7] GRPC, HTTP, and Debug server listen addresses fully configurable (#252) - servers listen addresses are configurable via environment variable - matches port configurability providing *HOST environment variables Fixes #245 Signed-off-by: Sunjay Bhatia --- README.md | 2 +- src/server/server_impl.go | 30 ++++++++++++++---------------- src/settings/settings.go | 11 +++++++---- 3 files changed, 22 insertions(+), 21 deletions(-) diff --git a/README.md b/README.md index c3c74b6f..1715b7ad 100644 --- a/README.md +++ b/README.md @@ -501,7 +501,7 @@ $ curl 0:6070/ /stats: print out stats ``` -You can specify the debug port with the `DEBUG_PORT` environment variable. It defaults to `6070`. +You can specify the debug server address with the `DEBUG_HOST` and `DEBUG_PORT` environment variables. They currently default to `0.0.0.0` and `6070` respectively. 
# Local Cache diff --git a/src/server/server_impl.go b/src/server/server_impl.go index 0afd443a..e77df069 100644 --- a/src/server/server_impl.go +++ b/src/server/server_impl.go @@ -9,6 +9,7 @@ import ( "net/http/pprof" "path/filepath" "sort" + "strconv" "sync" "os" @@ -39,9 +40,9 @@ type serverDebugListener struct { } type server struct { - port int - grpcPort int - debugPort int + httpAddress string + grpcAddress string + debugAddress string router *mux.Router grpcServer *grpc.Server store stats.Store @@ -114,11 +115,10 @@ func (server *server) GrpcServer() *grpc.Server { func (server *server) Start() { go func() { - addr := fmt.Sprintf(":%d", server.debugPort) - logger.Warnf("Listening for debug on '%s'", addr) + logger.Warnf("Listening for debug on '%s'", server.debugAddress) var err error server.listenerMu.Lock() - server.debugListener.listener, err = reuseport.Listen("tcp", addr) + server.debugListener.listener, err = reuseport.Listen("tcp", server.debugAddress) server.listenerMu.Unlock() if err != nil { @@ -133,9 +133,8 @@ func (server *server) Start() { server.handleGracefulShutdown() - addr := fmt.Sprintf(":%d", server.port) - logger.Warnf("Listening for HTTP on '%s'", addr) - list, err := reuseport.Listen("tcp", addr) + logger.Warnf("Listening for HTTP on '%s'", server.httpAddress) + list, err := reuseport.Listen("tcp", server.httpAddress) if err != nil { logger.Fatalf("Failed to open HTTP listener: '%+v'", err) } @@ -151,9 +150,8 @@ func (server *server) Start() { } func (server *server) startGrpc() { - addr := fmt.Sprintf(":%d", server.grpcPort) - logger.Warnf("Listening for gRPC on '%s'", addr) - lis, err := reuseport.Listen("tcp", addr) + logger.Warnf("Listening for gRPC on '%s'", server.grpcAddress) + lis, err := reuseport.Listen("tcp", server.grpcAddress) if err != nil { logger.Fatalf("Failed to listen for gRPC: %v", err) } @@ -180,10 +178,10 @@ func newServer(s settings.Settings, name string, store stats.Store, localCache * ret := new(server) 
ret.grpcServer = grpc.NewServer(s.GrpcUnaryInterceptor) - // setup ports - ret.port = s.Port - ret.grpcPort = s.GrpcPort - ret.debugPort = s.DebugPort + // setup listen addresses + ret.httpAddress = net.JoinHostPort(s.Host, strconv.Itoa(s.Port)) + ret.grpcAddress = net.JoinHostPort(s.GrpcHost, strconv.Itoa(s.GrpcPort)) + ret.debugAddress = net.JoinHostPort(s.DebugHost, strconv.Itoa(s.DebugPort)) // setup stats ret.store = store diff --git a/src/settings/settings.go b/src/settings/settings.go index 642d8d75..4c58894b 100644 --- a/src/settings/settings.go +++ b/src/settings/settings.go @@ -10,10 +10,13 @@ import ( type Settings struct { // runtime options GrpcUnaryInterceptor grpc.ServerOption - // env config - Port int `envconfig:"PORT" default:"8080"` - GrpcPort int `envconfig:"GRPC_PORT" default:"8081"` - DebugPort int `envconfig:"DEBUG_PORT" default:"6070"` + // Server listen address config + Host string `envconfig:"HOST" default:"0.0.0.0"` + Port int `envconfig:"PORT" default:"8080"` + GrpcHost string `envconfig:"GRPC_HOST" default:"0.0.0.0"` + GrpcPort int `envconfig:"GRPC_PORT" default:"8081"` + DebugHost string `envconfig:"DEBUG_HOST" default:"0.0.0.0"` + DebugPort int `envconfig:"DEBUG_PORT" default:"6070"` // Logging settings LogLevel string `envconfig:"LOG_LEVEL" default:"WARN"` From 60c8eb07c16fb1df6ce21b4b79b398a806482b68 Mon Sep 17 00:00:00 2001 From: Sunjay Bhatia <5337253+sunjayBhatia@users.noreply.github.com> Date: Tue, 25 May 2021 17:30:00 -0400 Subject: [PATCH 5/7] Fix flaky test TestServiceLegacy (#258) User deferred barrier.signal() so panic definitely occurs before we continue on in test. Config reload uses recover() and increments config load counter, tests were failing to see config load error counter increment. 
Fixes: #256 Signed-off-by: Sunjay Bhatia --- test/service/ratelimit_legacy_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/service/ratelimit_legacy_test.go b/test/service/ratelimit_legacy_test.go index a51ddbe9..d90f202e 100644 --- a/test/service/ratelimit_legacy_test.go +++ b/test/service/ratelimit_legacy_test.go @@ -121,7 +121,7 @@ func TestServiceLegacy(test *testing.T) { t.configLoader.EXPECT().Load( []config.RateLimitConfigToLoad{{"config.basic_config", "fake_yaml"}}, gomock.Any()).Do( func([]config.RateLimitConfigToLoad, stats.Scope) { - barrier.signal() + defer barrier.signal() panic(config.RateLimitConfigError("load error")) }) t.runtimeUpdateCallback <- 1 From 60ba180ec1d357e7726ae5b9ad630967d87cf510 Mon Sep 17 00:00:00 2001 From: petedmarsh Date: Thu, 27 May 2021 18:54:46 +0200 Subject: [PATCH 6/7] Add support for SRV records for memcache clients (#253) This allows MEMCACHE_SRV to be specified as an SRV record from which multiple memcache hosts can be resolved. For example: MEMCACHE_SRV=_memcache._tcp.mylovelydomain.com This can be used instead of MEMCACHE_HOST_PORT. This will then be resolved and whatever set of servers it represents will be used as the set of memcache servers to connect to. At this stage neither priority nor weight is supported, though weight could be fairly straightforwardly supported in future. 
The SRV can be polled periodically for new servers by setting the following env var (with 0 meaning "never check"): MEMCACHE_SRV_REFRESH=600s # supports standard go durations Signed-off-by: Peter Marsh --- README.md | 4 +- src/memcached/cache_impl.go | 70 +++++++++++++++++++++++++++++-- src/memcached/client.go | 7 ++++ src/settings/settings.go | 4 +- src/srv/srv.go | 49 ++++++++++++++++++++++ test/memcached/cache_impl_test.go | 36 ++++++++++++++++ test/srv/srv_test.go | 56 +++++++++++++++++++++++++ 7 files changed, 221 insertions(+), 5 deletions(-) create mode 100644 src/srv/srv.go create mode 100644 test/srv/srv_test.go diff --git a/README.md b/README.md index 1715b7ad..6e4cdee5 100644 --- a/README.md +++ b/README.md @@ -582,7 +582,9 @@ Experimental Memcache support has been added as an alternative to Redis in v1.5. To configure a Memcache instance use the following environment variables instead of the Redis variables: -1. `MEMCACHE_HOST_PORT=`: a comma separated list of hostname:port pairs for memcache nodes. +1. `MEMCACHE_HOST_PORT=`: a comma separated list of hostname:port pairs for memcache nodes (mutually exclusive with `MEMCACHE_SRV`) +1. `MEMCACHE_SRV=`: an SRV record to lookup hosts from (mutually exclusive with `MEMCACHE_HOST_PORT`) +1. `MEMCACHE_SRV_REFRESH=0`: refresh the list of hosts every n seconds, if 0 no refreshing will happen, supports duration suffixes: "ns", "us" (or "µs"), "ms", "s", "m", "h". 1. `BACKEND_TYPE=memcache` 1. `CACHE_KEY_PREFIX`: a string to prepend to all cache keys 1. 
`MEMCACHE_MAX_IDLE_CONNS=2`: the maximum number of idle TCP connections per memcache node, `2` is the default of the underlying library diff --git a/src/memcached/cache_impl.go b/src/memcached/cache_impl.go index b9ecdd87..de3e095d 100644 --- a/src/memcached/cache_impl.go +++ b/src/memcached/cache_impl.go @@ -20,6 +20,7 @@ import ( "math/rand" "strconv" "sync" + "time" "github.com/coocood/freecache" stats "github.com/lyft/gostats" @@ -33,6 +34,7 @@ import ( "github.com/envoyproxy/ratelimit/src/config" "github.com/envoyproxy/ratelimit/src/limiter" "github.com/envoyproxy/ratelimit/src/settings" + "github.com/envoyproxy/ratelimit/src/srv" "github.com/envoyproxy/ratelimit/src/utils" ) @@ -173,6 +175,70 @@ func (this *rateLimitMemcacheImpl) Flush() { this.waitGroup.Wait() } +func refreshServersPeriodically(serverList memcache.ServerList, srv string, d time.Duration, finish <-chan struct{}) { + t := time.NewTicker(d) + defer t.Stop() + for { + select { + case <-t.C: + err := refreshServers(serverList, srv) + if err != nil { + logger.Warn("failed to refresh memcahce hosts") + } else { + logger.Debug("refreshed memcache hosts") + } + case <-finish: + return + } + } +} + +func refreshServers(serverList memcache.ServerList, srv_ string) error { + servers, err := srv.ServerStringsFromSrv(srv_) + if err != nil { + return err + } + err = serverList.SetServers(servers...) 
+ if err != nil { + return err + } + return nil +} + +func newMemcachedFromSrv(srv_ string, d time.Duration) Client { + serverList := new(memcache.ServerList) + err := refreshServers(*serverList, srv_) + if err != nil { + errorText := "Unable to fetch servers from SRV" + logger.Errorf(errorText) + panic(MemcacheError(errorText)) + } + + if d > 0 { + logger.Infof("refreshing memcache hosts every: %v milliseconds", d.Milliseconds()) + finish := make(chan struct{}) + go refreshServersPeriodically(*serverList, srv_, d, finish) + } else { + logger.Debugf("not periodically refreshing memcached hosts") + } + + return memcache.NewFromSelector(serverList) +} + +func newMemcacheFromSettings(s settings.Settings) Client { + if s.MemcacheSrv != "" && len(s.MemcacheHostPort) > 0 { + panic(MemcacheError("Both MEMCADHE_HOST_PORT and MEMCACHE_SRV are set")) + } + if s.MemcacheSrv != "" { + logger.Debugf("Using MEMCACHE_SRV: %v", s.MemcacheSrv) + return newMemcachedFromSrv(s.MemcacheSrv, s.MemcacheSrvRefresh) + } + logger.Debugf("Usng MEMCACHE_HOST_PORT:: %v", s.MemcacheHostPort) + client := memcache.New(s.MemcacheHostPort...) + client.MaxIdleConns = s.MemcacheMaxIdleConns + return client +} + func NewRateLimitCacheImpl(client Client, timeSource utils.TimeSource, jitterRand *rand.Rand, expirationJitterMaxSeconds int64, localCache *freecache.Cache, scope stats.Scope, nearLimitRatio float32, cacheKeyPrefix string) limiter.RateLimitCache { return &rateLimitMemcacheImpl{ @@ -188,10 +254,8 @@ func NewRateLimitCacheImpl(client Client, timeSource utils.TimeSource, jitterRan func NewRateLimitCacheImplFromSettings(s settings.Settings, timeSource utils.TimeSource, jitterRand *rand.Rand, localCache *freecache.Cache, scope stats.Scope) limiter.RateLimitCache { - var client = memcache.New(s.MemcacheHostPort...) 
- client.MaxIdleConns = s.MemcacheMaxIdleConns return NewRateLimitCacheImpl( - CollectStats(client, scope.Scope("memcache")), + CollectStats(newMemcacheFromSettings(s), scope.Scope("memcache")), timeSource, jitterRand, s.ExpirationJitterMaxSeconds, diff --git a/src/memcached/client.go b/src/memcached/client.go index 55c0ec31..e8090269 100644 --- a/src/memcached/client.go +++ b/src/memcached/client.go @@ -4,6 +4,13 @@ import ( "github.com/bradfitz/gomemcache/memcache" ) +// Errors that may be raised during config parsing. +type MemcacheError string + +func (e MemcacheError) Error() string { + return string(e) +} + var _ Client = (*memcache.Client)(nil) // Interface for memcached, used for mocking. diff --git a/src/settings/settings.go b/src/settings/settings.go index 4c58894b..2646b5b2 100644 --- a/src/settings/settings.go +++ b/src/settings/settings.go @@ -76,7 +76,9 @@ type Settings struct { // number of connections to memcache kept idle in pool, if a connection is needed but none // are idle a new connection is opened, used and closed and can be left in a time-wait state // which can result in high CPU usage. 
- MemcacheMaxIdleConns int `envconfig:"MEMCACHE_MAX_IDLE_CONNS" default:"2"` + MemcacheMaxIdleConns int `envconfig:"MEMCACHE_MAX_IDLE_CONNS" default:"2"` + MemcacheSrv string `envconfig:"MEMCACHE_SRV" default:""` + MemcacheSrvRefresh time.Duration `envconfig:"MEMCACHE_SRV_REFRESH" default:"0"` } type Option func(*Settings) diff --git a/src/srv/srv.go b/src/srv/srv.go new file mode 100644 index 00000000..041ceb95 --- /dev/null +++ b/src/srv/srv.go @@ -0,0 +1,49 @@ +package srv + +import ( + "errors" + "fmt" + "net" + "regexp" + + logger "github.com/sirupsen/logrus" +) + +var srvRegex = regexp.MustCompile(`^_(.+?)\._(.+?)\.(.+)$`) + +func ParseSrv(srv string) (string, string, string, error) { + matches := srvRegex.FindStringSubmatch(srv) + if matches == nil { + errorText := fmt.Sprintf("could not parse %s to SRV parts", srv) + logger.Errorf(errorText) + return "", "", "", errors.New(errorText) + } + return matches[1], matches[2], matches[3], nil +} + +func ServerStringsFromSrv(srv string) ([]string, error) { + service, proto, name, err := ParseSrv(srv) + + if err != nil { + logger.Errorf("failed to parse SRV: %s", err) + return nil, err + } + + _, srvs, err := net.LookupSRV(service, proto, name) + + if err != nil { + logger.Errorf("failed to lookup SRV: %s", err) + return nil, err + } + + logger.Debugf("found %v servers(s) from SRV", len(srvs)) + + serversFromSrv := make([]string, len(srvs)) + for i, srv := range srvs { + server := fmt.Sprintf("%s:%v", srv.Target, srv.Port) + logger.Debugf("server from srv[%v]: %s", i, server) + serversFromSrv[i] = fmt.Sprintf("%s:%v", srv.Target, srv.Port) + } + + return serversFromSrv, nil +} diff --git a/test/memcached/cache_impl_test.go b/test/memcached/cache_impl_test.go index 1e2ba8d7..652ac7c7 100644 --- a/test/memcached/cache_impl_test.go +++ b/test/memcached/cache_impl_test.go @@ -16,6 +16,7 @@ import ( "github.com/envoyproxy/ratelimit/src/config" "github.com/envoyproxy/ratelimit/src/limiter" 
"github.com/envoyproxy/ratelimit/src/memcached" + "github.com/envoyproxy/ratelimit/src/settings" "github.com/envoyproxy/ratelimit/src/utils" stats "github.com/lyft/gostats" @@ -583,6 +584,41 @@ func TestMemcacheAdd(t *testing.T) { cache.Flush() } +func TestNewRateLimitCacheImplFromSettingsWhenSrvCannotBeResolved(t *testing.T) { + assert := assert.New(t) + controller := gomock.NewController(t) + defer controller.Finish() + + timeSource := mock_utils.NewMockTimeSource(controller) + statsStore := stats.NewStore(stats.NewNullSink(), false) + + var s settings.Settings + s.NearLimitRatio = 0.8 + s.CacheKeyPrefix = "" + s.ExpirationJitterMaxSeconds = 300 + s.MemcacheSrv = "_something._tcp.example.invalid" + + assert.Panics(func() { memcached.NewRateLimitCacheImplFromSettings(s, timeSource, nil, nil, statsStore) }) +} + +func TestNewRateLimitCacheImplFromSettingsWhenHostAndPortAndSrvAreBothSet(t *testing.T) { + assert := assert.New(t) + controller := gomock.NewController(t) + defer controller.Finish() + + timeSource := mock_utils.NewMockTimeSource(controller) + statsStore := stats.NewStore(stats.NewNullSink(), false) + + var s settings.Settings + s.NearLimitRatio = 0.8 + s.CacheKeyPrefix = "" + s.ExpirationJitterMaxSeconds = 300 + s.MemcacheSrv = "_something._tcp.example.invalid" + s.MemcacheHostPort = []string{"example.org:11211"} + + assert.Panics(func() { memcached.NewRateLimitCacheImplFromSettings(s, timeSource, nil, nil, statsStore) }) +} + func getMultiResult(vals map[string]int) map[string]*memcache.Item { result := make(map[string]*memcache.Item, len(vals)) for k, v := range vals { diff --git a/test/srv/srv_test.go b/test/srv/srv_test.go new file mode 100644 index 00000000..5e3e8f79 --- /dev/null +++ b/test/srv/srv_test.go @@ -0,0 +1,56 @@ +package srv + +import ( + "errors" + "net" + "testing" + + "github.com/envoyproxy/ratelimit/src/srv" + "github.com/stretchr/testify/assert" +) + +func TestParseSrv(t *testing.T) { + service, proto, name, err := 
srv.ParseSrv("_something._tcp.example.org.") + assert.Equal(t, service, "something") + assert.Equal(t, proto, "tcp") + assert.Equal(t, name, "example.org.") + assert.Nil(t, err) + + service, proto, name, err = srv.ParseSrv("_something-else._udp.example.org") + assert.Equal(t, service, "something-else") + assert.Equal(t, proto, "udp") + assert.Equal(t, name, "example.org") + assert.Nil(t, err) + + _, _, _, err = srv.ParseSrv("example.org") + assert.Equal(t, err, errors.New("could not parse example.org to SRV parts")) +} + +func TestServerStringsFromSrvWhenSrvIsNotWellFormed(t *testing.T) { + _, err := srv.ServerStringsFromSrv("example.org") + assert.Equal(t, err, errors.New("could not parse example.org to SRV parts")) +} + +func TestServerStringsFromSevWhenSrvIsWellFormedButNotLookupable(t *testing.T) { + _, err := srv.ServerStringsFromSrv("_something._tcp.example.invalid") + var e *net.DNSError + if errors.As(err, &e) { + assert.Equal(t, e.Err, "no such host") + assert.Equal(t, e.Name, "_something._tcp.example.invalid") + assert.False(t, e.IsTimeout) + assert.False(t, e.IsTemporary) + assert.True(t, e.IsNotFound) + } else { + t.Fail() + } +} + +func TestServerStrings(t *testing.T) { + // it seems reasonable to think _xmpp-server._tcp.gmail.com will be available for a long time! 
+ servers, err := srv.ServerStringsFromSrv("_xmpp-server._tcp.gmail.com.") + assert.True(t, len(servers) > 0) + for _, s := range servers { + assert.Regexp(t, `^.*xmpp-server.*google.com.:\d+$`, s) + } + assert.Nil(t, err) +} From c0cdd752f8d538d16d6476ea24cfbf2e355cc25c Mon Sep 17 00:00:00 2001 From: Bohdan Storozhuk Date: Fri, 28 May 2021 16:45:49 +0100 Subject: [PATCH 7/7] Reduce short living tasks produced by memcached implementation and move them to goroutines pool (#251) Signed-off-by: bstorozhuk --- src/memcached/cache_impl.go | 36 +++++++++++++++++++++++++++++++++++- 1 file changed, 35 insertions(+), 1 deletion(-) diff --git a/src/memcached/cache_impl.go b/src/memcached/cache_impl.go index de3e095d..1f39dd36 100644 --- a/src/memcached/cache_impl.go +++ b/src/memcached/cache_impl.go @@ -123,7 +123,7 @@ func (this *rateLimitMemcacheImpl) DoLimit( } this.waitGroup.Add(1) - go this.increaseAsync(cacheKeys, isOverLimitWithLocalCache, limits, uint64(hitsAddend)) + runAsync(func() { this.increaseAsync(cacheKeys, isOverLimitWithLocalCache, limits, uint64(hitsAddend)) }) if AutoFlushForIntegrationTests { this.Flush() } @@ -239,6 +239,40 @@ func newMemcacheFromSettings(s settings.Settings) Client { return client } +var taskQueue = make(chan func()) + +func runAsync(task func()) { + select { + case taskQueue <- task: + // submitted, everything is ok + + default: + go func() { + // do the given task + task() + + tasksProcessedWithinOnePeriod := 0 + const tickDuration = 10 * time.Second + tick := time.NewTicker(tickDuration) + defer tick.Stop() + + for { + select { + case t := <-taskQueue: + t() + tasksProcessedWithinOnePeriod++ + case <-tick.C: + if tasksProcessedWithinOnePeriod > 0 { + tasksProcessedWithinOnePeriod = 0 + continue + } + return + } + } + }() + } +} + func NewRateLimitCacheImpl(client Client, timeSource utils.TimeSource, jitterRand *rand.Rand, expirationJitterMaxSeconds int64, localCache *freecache.Cache, scope stats.Scope, nearLimitRatio float32, 
cacheKeyPrefix string) limiter.RateLimitCache { return &rateLimitMemcacheImpl{