Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/main' into metric-refactor
Browse files Browse the repository at this point in the history
Signed-off-by: Pablo Radnic <[email protected]>
  • Loading branch information
pradnic committed Jun 2, 2021
2 parents 56c3e52 + c0cdd75 commit d03eba2
Show file tree
Hide file tree
Showing 11 changed files with 308 additions and 29 deletions.
7 changes: 5 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -501,7 +501,7 @@ $ curl 0:6070/
/stats: print out stats
```

You can specify the debug port with the `DEBUG_PORT` environment variable. It defaults to `6070`.
You can specify the debug server address with the `DEBUG_HOST` and `DEBUG_PORT` environment variables. They currently default to `0.0.0.0` and `6070` respectively.

# Local Cache

Expand Down Expand Up @@ -582,9 +582,12 @@ Experimental Memcache support has been added as an alternative to Redis in v1.5.

To configure a Memcache instance use the following environment variables instead of the Redis variables:

1. `MEMCACHE_HOST_PORT=`: a comma separated list of hostname:port pairs for memcache nodes.
1. `MEMCACHE_HOST_PORT=`: a comma separated list of hostname:port pairs for memcache nodes (mutually exclusive with `MEMCACHE_SRV`)
1. `MEMCACHE_SRV=`: an SRV record to lookup hosts from (mutually exclusive with `MEMCACHE_HOST_PORT`)
1. `MEMCACHE_SRV_REFRESH=0`: refresh the list of hosts every n seconds, if 0 no refreshing will happen, supports duration suffixes: "ns", "us" (or "µs"), "ms", "s", "m", "h".
1. `BACKEND_TYPE=memcache`
1. `CACHE_KEY_PREFIX`: a string to prepend to all cache keys
1. `MEMCACHE_MAX_IDLE_CONNS=2`: the maximum number of idle TCP connections per memcache node, `2` is the default of the underlying library

With memcache mode increments will happen asynchronously, so it's technically possible for
a client to exceed quota briefly if multiple requests happen at exactly the same time.
Expand Down
104 changes: 102 additions & 2 deletions src/memcached/cache_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"math/rand"
"strconv"
"sync"
"time"

"github.com/coocood/freecache"
gostats "github.com/lyft/gostats"
Expand All @@ -34,6 +35,7 @@ import (
"github.com/envoyproxy/ratelimit/src/config"
"github.com/envoyproxy/ratelimit/src/limiter"
"github.com/envoyproxy/ratelimit/src/settings"
"github.com/envoyproxy/ratelimit/src/srv"
"github.com/envoyproxy/ratelimit/src/utils"
)

Expand Down Expand Up @@ -122,7 +124,7 @@ func (this *rateLimitMemcacheImpl) DoLimit(
}

this.waitGroup.Add(1)
go this.increaseAsync(cacheKeys, isOverLimitWithLocalCache, limits, uint64(hitsAddend))
runAsync(func() { this.increaseAsync(cacheKeys, isOverLimitWithLocalCache, limits, uint64(hitsAddend)) })
if AutoFlushForIntegrationTests {
this.Flush()
}
Expand Down Expand Up @@ -174,6 +176,104 @@ func (this *rateLimitMemcacheImpl) Flush() {
this.waitGroup.Wait()
}

func refreshServersPeriodically(serverList memcache.ServerList, srv string, d time.Duration, finish <-chan struct{}) {
t := time.NewTicker(d)
defer t.Stop()
for {
select {
case <-t.C:
err := refreshServers(serverList, srv)
if err != nil {
logger.Warn("failed to refresh memcahce hosts")
} else {
logger.Debug("refreshed memcache hosts")
}
case <-finish:
return
}
}
}

func refreshServers(serverList memcache.ServerList, srv_ string) error {
servers, err := srv.ServerStringsFromSrv(srv_)
if err != nil {
return err
}
err = serverList.SetServers(servers...)
if err != nil {
return err
}
return nil
}

func newMemcachedFromSrv(srv_ string, d time.Duration) Client {
serverList := new(memcache.ServerList)
err := refreshServers(*serverList, srv_)
if err != nil {
errorText := "Unable to fetch servers from SRV"
logger.Errorf(errorText)
panic(MemcacheError(errorText))
}

if d > 0 {
logger.Infof("refreshing memcache hosts every: %v milliseconds", d.Milliseconds())
finish := make(chan struct{})
go refreshServersPeriodically(*serverList, srv_, d, finish)
} else {
logger.Debugf("not periodically refreshing memcached hosts")
}

return memcache.NewFromSelector(serverList)
}

func newMemcacheFromSettings(s settings.Settings) Client {
if s.MemcacheSrv != "" && len(s.MemcacheHostPort) > 0 {
panic(MemcacheError("Both MEMCADHE_HOST_PORT and MEMCACHE_SRV are set"))
}
if s.MemcacheSrv != "" {
logger.Debugf("Using MEMCACHE_SRV: %v", s.MemcacheSrv)
return newMemcachedFromSrv(s.MemcacheSrv, s.MemcacheSrvRefresh)
}
logger.Debugf("Usng MEMCACHE_HOST_PORT:: %v", s.MemcacheHostPort)
client := memcache.New(s.MemcacheHostPort...)
client.MaxIdleConns = s.MemcacheMaxIdleConns
return client
}

var taskQueue = make(chan func())

func runAsync(task func()) {
select {
case taskQueue <- task:
// submitted, everything is ok

default:
go func() {
// do the given task
task()

tasksProcessedWithinOnePeriod := 0
const tickDuration = 10 * time.Second
tick := time.NewTicker(tickDuration)
defer tick.Stop()

for {
select {
case t := <-taskQueue:
t()
tasksProcessedWithinOnePeriod++
case <-tick.C:
if tasksProcessedWithinOnePeriod > 0 {
tasksProcessedWithinOnePeriod = 0
continue
}
return
}
}
}()
}
}

func NewRateLimitCacheImpl(client Client, timeSource utils.TimeSource, jitterRand *rand.Rand,
expirationJitterMaxSeconds int64, localCache *freecache.Cache, statsManager stats.Manager, nearLimitRatio float32, cacheKeyPrefix string) limiter.RateLimitCache {
return &rateLimitMemcacheImpl{
Expand All @@ -190,7 +290,7 @@ func NewRateLimitCacheImpl(client Client, timeSource utils.TimeSource, jitterRan
func NewRateLimitCacheImplFromSettings(s settings.Settings, timeSource utils.TimeSource, jitterRand *rand.Rand,
localCache *freecache.Cache, scope gostats.Scope, statsManager stats.Manager) limiter.RateLimitCache {
return NewRateLimitCacheImpl(
CollectStats(memcache.New(s.MemcacheHostPort...), scope.Scope("memcache")),
CollectStats(newMemcacheFromSettings(s), scope.Scope("memcache")),
timeSource,
jitterRand,
s.ExpirationJitterMaxSeconds,
Expand Down
7 changes: 7 additions & 0 deletions src/memcached/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,13 @@ import (
"github.com/bradfitz/gomemcache/memcache"
)

// Errors that may be raised during config parsing.
type MemcacheError string

func (e MemcacheError) Error() string {
return string(e)
}

var _ Client = (*memcache.Client)(nil)

// Interface for memcached, used for mocking.
Expand Down
4 changes: 0 additions & 4 deletions src/redis/driver_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,14 +59,10 @@ func NewClientImpl(scope stats.Scope, useTls bool, auth string, redisType string
df := func(network, addr string) (radix.Conn, error) {
var dialOpts []radix.DialOpt

var err error
if useTls {
dialOpts = append(dialOpts, radix.DialUseTLS(&tls.Config{}))
}

if err != nil {
return nil, err
}
if auth != "" {
logger.Warnf("enabling authentication to redis on %s", url)

Expand Down
38 changes: 22 additions & 16 deletions src/server/server_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"net/http/pprof"
"path/filepath"
"sort"
"strconv"
"sync"

"os"
Expand Down Expand Up @@ -40,9 +41,9 @@ type serverDebugListener struct {
}

type server struct {
port int
grpcPort int
debugPort int
httpAddress string
grpcAddress string
debugAddress string
router *mux.Router
grpcServer *grpc.Server
store gostats.Store
Expand Down Expand Up @@ -115,11 +116,10 @@ func (server *server) GrpcServer() *grpc.Server {

func (server *server) Start() {
go func() {
addr := fmt.Sprintf(":%d", server.debugPort)
logger.Warnf("Listening for debug on '%s'", addr)
logger.Warnf("Listening for debug on '%s'", server.debugAddress)
var err error
server.listenerMu.Lock()
server.debugListener.listener, err = reuseport.Listen("tcp", addr)
server.debugListener.listener, err = reuseport.Listen("tcp", server.debugAddress)
server.listenerMu.Unlock()

if err != nil {
Expand All @@ -134,9 +134,8 @@ func (server *server) Start() {

server.handleGracefulShutdown()

addr := fmt.Sprintf(":%d", server.port)
logger.Warnf("Listening for HTTP on '%s'", addr)
list, err := reuseport.Listen("tcp", addr)
logger.Warnf("Listening for HTTP on '%s'", server.httpAddress)
list, err := reuseport.Listen("tcp", server.httpAddress)
if err != nil {
logger.Fatalf("Failed to open HTTP listener: '%+v'", err)
}
Expand All @@ -152,9 +151,8 @@ func (server *server) Start() {
}

func (server *server) startGrpc() {
addr := fmt.Sprintf(":%d", server.grpcPort)
logger.Warnf("Listening for gRPC on '%s'", addr)
lis, err := reuseport.Listen("tcp", addr)
logger.Warnf("Listening for gRPC on '%s'", server.grpcAddress)
lis, err := reuseport.Listen("tcp", server.grpcAddress)
if err != nil {
logger.Fatalf("Failed to listen for gRPC: %v", err)
}
Expand All @@ -181,10 +179,10 @@ func newServer(s settings.Settings, name string, statsManager stats.Manager, loc
ret := new(server)
ret.grpcServer = grpc.NewServer(s.GrpcUnaryInterceptor)

// setup ports
ret.port = s.Port
ret.grpcPort = s.GrpcPort
ret.debugPort = s.DebugPort
// setup listen addresses
ret.httpAddress = net.JoinHostPort(s.Host, strconv.Itoa(s.Port))
ret.grpcAddress = net.JoinHostPort(s.GrpcHost, strconv.Itoa(s.GrpcPort))
ret.debugAddress = net.JoinHostPort(s.DebugHost, strconv.Itoa(s.DebugPort))

// setup stats
ret.store = statsManager.GetStatsStore()
Expand Down Expand Up @@ -255,6 +253,14 @@ func newServer(s settings.Settings, name string, statsManager stats.Manager, loc
})
})

// setup trace endpoint
ret.AddDebugHttpEndpoint(
"/debug/pprof/trace",
"trace endpoint",
func(writer http.ResponseWriter, request *http.Request) {
pprof.Trace(writer, request)
})

// setup debug root
ret.debugListener.debugMux.HandleFunc(
"/",
Expand Down
19 changes: 15 additions & 4 deletions src/settings/settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,13 @@ import (
type Settings struct {
// runtime options
GrpcUnaryInterceptor grpc.ServerOption
// env config
Port int `envconfig:"PORT" default:"8080"`
GrpcPort int `envconfig:"GRPC_PORT" default:"8081"`
DebugPort int `envconfig:"DEBUG_PORT" default:"6070"`
// Server listen address config
Host string `envconfig:"HOST" default:"0.0.0.0"`
Port int `envconfig:"PORT" default:"8080"`
GrpcHost string `envconfig:"GRPC_HOST" default:"0.0.0.0"`
GrpcPort int `envconfig:"GRPC_PORT" default:"8081"`
DebugHost string `envconfig:"DEBUG_HOST" default:"0.0.0.0"`
DebugPort int `envconfig:"DEBUG_PORT" default:"6070"`

// Logging settings
LogLevel string `envconfig:"LOG_LEVEL" default:"WARN"`
Expand Down Expand Up @@ -68,6 +71,14 @@ type Settings struct {

// Memcache settings
MemcacheHostPort []string `envconfig:"MEMCACHE_HOST_PORT" default:""`
// MemcacheMaxIdleConns sets the maximum number of idle TCP connections per memcached node.
// The default is 2 as that is the default of the underlying library. This is the maximum
// number of connections to memcache kept idle in pool, if a connection is needed but none
// are idle a new connection is opened, used and closed and can be left in a time-wait state
// which can result in high CPU usage.
MemcacheMaxIdleConns int `envconfig:"MEMCACHE_MAX_IDLE_CONNS" default:"2"`
MemcacheSrv string `envconfig:"MEMCACHE_SRV" default:""`
MemcacheSrvRefresh time.Duration `envconfig:"MEMCACHE_SRV_REFRESH" default:"0"`
}

type Option func(*Settings)
Expand Down
49 changes: 49 additions & 0 deletions src/srv/srv.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package srv

import (
"errors"
"fmt"
"net"
"regexp"

logger "github.com/sirupsen/logrus"
)

var srvRegex = regexp.MustCompile(`^_(.+?)\._(.+?)\.(.+)$`)

func ParseSrv(srv string) (string, string, string, error) {
matches := srvRegex.FindStringSubmatch(srv)
if matches == nil {
errorText := fmt.Sprintf("could not parse %s to SRV parts", srv)
logger.Errorf(errorText)
return "", "", "", errors.New(errorText)
}
return matches[1], matches[2], matches[3], nil
}

func ServerStringsFromSrv(srv string) ([]string, error) {
service, proto, name, err := ParseSrv(srv)

if err != nil {
logger.Errorf("failed to parse SRV: %s", err)
return nil, err
}

_, srvs, err := net.LookupSRV(service, proto, name)

if err != nil {
logger.Errorf("failed to lookup SRV: %s", err)
return nil, err
}

logger.Debugf("found %v servers(s) from SRV", len(srvs))

serversFromSrv := make([]string, len(srvs))
for i, srv := range srvs {
server := fmt.Sprintf("%s:%v", srv.Target, srv.Port)
logger.Debugf("server from srv[%v]: %s", i, server)
serversFromSrv[i] = fmt.Sprintf("%s:%v", srv.Target, srv.Port)
}

return serversFromSrv, nil
}
15 changes: 15 additions & 0 deletions test/integration/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,21 @@ func TestBasicConfigMemcache(t *testing.T) {
})
}

func TestConfigMemcacheWithMaxIdleConns(t *testing.T) {
singleNodePort := []int{6394}
assert := assert.New(t)
common.WithMultiMemcache(t, []common.MemcacheConfig{
{Port: 6394},
}, func() {
withDefaultMaxIdleConns := makeSimpleMemcacheSettings(singleNodePort, 0)
assert.Equal(2, withDefaultMaxIdleConns.MemcacheMaxIdleConns)
t.Run("MemcacheWithDefaultMaxIdleConns", testBasicConfig(withDefaultMaxIdleConns))
withSpecifiedMaxIdleConns := makeSimpleMemcacheSettings(singleNodePort, 0)
withSpecifiedMaxIdleConns.MemcacheMaxIdleConns = 100
t.Run("MemcacheWithSpecifiedMaxIdleConns", testBasicConfig(withSpecifiedMaxIdleConns))
})
}

func TestMultiNodeMemcache(t *testing.T) {
multiNodePorts := []int{6494, 6495}
common.WithMultiMemcache(t, []common.MemcacheConfig{
Expand Down
Loading

0 comments on commit d03eba2

Please sign in to comment.