From 4e9fbf3f0a9e66136bb4d76f7051da2ef53707bc Mon Sep 17 00:00:00 2001 From: debbyku Date: Mon, 1 Nov 2021 14:50:42 +0000 Subject: [PATCH 1/2] health check failed if no active redis connection Signed-off-by: debbyku --- README.md | 7 +++++++ src/redis/cache_impl.go | 4 ++-- src/redis/driver_impl.go | 17 ++++++++++++----- src/server/health.go | 5 +++++ src/server/server.go | 3 +++ src/server/server_impl.go | 8 ++++++++ src/service_cmd/runner/runner.go | 3 ++- src/settings/settings.go | 3 ++- test/redis/bench_test.go | 2 +- test/redis/driver_impl_test.go | 6 +++--- 10 files changed, 45 insertions(+), 13 deletions(-) diff --git a/README.md b/README.md index 33e367ba..656e615a 100644 --- a/README.md +++ b/README.md @@ -42,6 +42,7 @@ - [Pipelining](#pipelining) - [One Redis Instance](#one-redis-instance) - [Two Redis Instances](#two-redis-instances) + - [Health Checking for Redis Active Connection](#health-checking-for-redis-active-connection) - [Memcache](#memcache) - [Custom headers](#custom-headers) - [Contact](#contact) @@ -711,6 +712,12 @@ To configure two Redis instances use the following environment variables: This setup will use the Redis server configured with the `_PERSECOND_` vars for per second limits, and the other Redis server for all other limits. +## Health Checking for Redis Active Connection + +To configure whether the health check should fail when there is no active Redis connection, set the following environment variable: + +1. `REDIS_HEALTH_CHECK_ACTIVE_CONNECTION`: (default is "false") + # Memcache Experimental Memcache support has been added as an alternative to Redis in v1.5. 
diff --git a/src/redis/cache_impl.go b/src/redis/cache_impl.go index 1c9417d7..715e670d 100644 --- a/src/redis/cache_impl.go +++ b/src/redis/cache_impl.go @@ -16,11 +16,11 @@ func NewRateLimiterCacheImplFromSettings(s settings.Settings, localCache *freeca var perSecondPool Client if s.RedisPerSecond { perSecondPool = NewClientImpl(srv.Scope().Scope("redis_per_second_pool"), s.RedisPerSecondTls, s.RedisPerSecondAuth, s.RedisPerSecondSocketType, - s.RedisPerSecondType, s.RedisPerSecondUrl, s.RedisPerSecondPoolSize, s.RedisPerSecondPipelineWindow, s.RedisPerSecondPipelineLimit, s.RedisTlsConfig) + s.RedisPerSecondType, s.RedisPerSecondUrl, s.RedisPerSecondPoolSize, s.RedisPerSecondPipelineWindow, s.RedisPerSecondPipelineLimit, s.RedisTlsConfig, s.RedisHealthCheckActiveConnection, srv) } var otherPool Client otherPool = NewClientImpl(srv.Scope().Scope("redis_pool"), s.RedisTls, s.RedisAuth, s.RedisSocketType, s.RedisType, s.RedisUrl, s.RedisPoolSize, - s.RedisPipelineWindow, s.RedisPipelineLimit, s.RedisTlsConfig) + s.RedisPipelineWindow, s.RedisPipelineLimit, s.RedisTlsConfig, s.RedisHealthCheckActiveConnection, srv) return NewFixedRateLimitCacheImpl( otherPool, diff --git a/src/redis/driver_impl.go b/src/redis/driver_impl.go index 18a65df1..8044d3dc 100644 --- a/src/redis/driver_impl.go +++ b/src/redis/driver_impl.go @@ -6,11 +6,12 @@ import ( "strings" "time" - "github.com/mediocregopher/radix/v3/trace" - stats "github.com/lyft/gostats" "github.com/mediocregopher/radix/v3" + "github.com/mediocregopher/radix/v3/trace" logger "github.com/sirupsen/logrus" + + "github.com/envoyproxy/ratelimit/src/server" ) type poolStats struct { @@ -27,15 +28,21 @@ func newPoolStats(scope stats.Scope) poolStats { return ret } -func poolTrace(ps *poolStats) trace.PoolTrace { +func poolTrace(ps *poolStats, healthCheckActiveConnection bool, srv server.Server) trace.PoolTrace { return trace.PoolTrace{ ConnCreated: func(_ trace.PoolConnCreated) { ps.connectionTotal.Add(1) 
ps.connectionActive.Add(1) + if healthCheckActiveConnection && srv != nil { + srv.HealthCheckOK() + } }, ConnClosed: func(_ trace.PoolConnClosed) { ps.connectionActive.Sub(1) ps.connectionClose.Add(1) + if healthCheckActiveConnection && srv != nil && ps.connectionActive.Value() == 0 { + srv.HealthCheckFail() + } }, } } @@ -53,7 +60,7 @@ func checkError(err error) { } func NewClientImpl(scope stats.Scope, useTls bool, auth, redisSocketType, redisType, url string, poolSize int, - pipelineWindow time.Duration, pipelineLimit int, tlsConfig *tls.Config) Client { + pipelineWindow time.Duration, pipelineLimit int, tlsConfig *tls.Config, healthCheckActiveConnection bool, srv server.Server) Client { logger.Warnf("connecting to redis on %s with pool size %d", url, poolSize) df := func(network, addr string) (radix.Conn, error) { @@ -78,7 +85,7 @@ func NewClientImpl(scope stats.Scope, useTls bool, auth, redisSocketType, redisT stats := newPoolStats(scope) - opts := []radix.PoolOpt{radix.PoolConnFunc(df), radix.PoolWithTrace(poolTrace(&stats))} + opts := []radix.PoolOpt{radix.PoolConnFunc(df), radix.PoolWithTrace(poolTrace(&stats, healthCheckActiveConnection, srv))} implicitPipelining := true if pipelineWindow == 0 && pipelineLimit == 0 { diff --git a/src/server/health.go b/src/server/health.go index d5ba5ac8..d2eb2b76 100644 --- a/src/server/health.go +++ b/src/server/health.go @@ -51,6 +51,11 @@ func (hc *HealthChecker) Fail() { hc.grpc.SetServingStatus(hc.name, healthpb.HealthCheckResponse_NOT_SERVING) } +func (hc *HealthChecker) Ok() { + atomic.StoreUint32(&hc.ok, 1) + hc.grpc.SetServingStatus(hc.name, healthpb.HealthCheckResponse_SERVING) +} + func (hc *HealthChecker) Server() *health.Server { return hc.grpc } diff --git a/src/server/server.go b/src/server/server.go index 7f2b1b05..46c8ea5d 100644 --- a/src/server/server.go +++ b/src/server/server.go @@ -43,4 +43,7 @@ type Server interface { * Stops serving the grpc port (for integration testing). 
*/ Stop() + + HealthCheckFail() + HealthCheckOK() } diff --git a/src/server/server_impl.go b/src/server/server_impl.go index bfd66616..bab235ec 100644 --- a/src/server/server_impl.go +++ b/src/server/server_impl.go @@ -311,3 +311,11 @@ func (server *server) handleGracefulShutdown() { os.Exit(0) }() } + +func (server *server) HealthCheckFail() { server.health.Fail() } + +func (server *server) HealthCheckOK() { server.health.Ok() } diff --git a/src/service_cmd/runner/runner.go b/src/service_cmd/runner/runner.go index 3d616229..e6f3ccfd 100644 --- a/src/service_cmd/runner/runner.go +++ b/src/service_cmd/runner/runner.go @@ -57,7 +57,8 @@ func createLimiter(srv server.Server, s settings.Settings, localCache *freecache utils.NewTimeSourceImpl(), rand.New(utils.NewLockedSource(time.Now().Unix())), s.ExpirationJitterMaxSeconds, - statsManager) + statsManager, + ) case "memcache": return memcached.NewRateLimitCacheImplFromSettings( s, diff --git a/src/settings/settings.go b/src/settings/settings.go index c9afb210..5190dc20 100644 --- a/src/settings/settings.go +++ b/src/settings/settings.go @@ -88,7 +88,8 @@ type Settings struct { // RedisPerSecondPipelineLimit sets maximum number of commands that can be pipelined before flushing for per second redis. // See comments of RedisPipelineLimit for details. RedisPerSecondPipelineLimit int `envconfig:"REDIS_PERSECOND_PIPELINE_LIMIT" default:"0"` - + // Enable a health check of the Redis connection. If there is no active connection, the health check fails. + RedisHealthCheckActiveConnection bool `envconfig:"REDIS_HEALTH_CHECK_ACTIVE_CONNECTION" default:"false"` // Memcache settings MemcacheHostPort []string `envconfig:"MEMCACHE_HOST_PORT" default:""` // MemcacheMaxIdleConns sets the maximum number of idle TCP connections per memcached node. 
diff --git a/test/redis/bench_test.go b/test/redis/bench_test.go index 66f6ccbb..27eaf6d6 100644 --- a/test/redis/bench_test.go +++ b/test/redis/bench_test.go @@ -44,7 +44,7 @@ func BenchmarkParallelDoLimit(b *testing.B) { return func(b *testing.B) { statsStore := gostats.NewStore(gostats.NewNullSink(), false) sm := stats.NewMockStatManager(statsStore) - client := redis.NewClientImpl(statsStore, false, "", "tcp", "single", "127.0.0.1:6379", poolSize, pipelineWindow, pipelineLimit, nil) + client := redis.NewClientImpl(statsStore, false, "", "tcp", "single", "127.0.0.1:6379", poolSize, pipelineWindow, pipelineLimit, nil, false, nil) defer client.Close() cache := redis.NewFixedRateLimitCacheImpl(client, nil, utils.NewTimeSourceImpl(), rand.New(utils.NewLockedSource(time.Now().Unix())), 10, nil, 0.8, "", sm) diff --git a/test/redis/driver_impl_test.go b/test/redis/driver_impl_test.go index f549a754..3f924e4a 100644 --- a/test/redis/driver_impl_test.go +++ b/test/redis/driver_impl_test.go @@ -37,7 +37,7 @@ func testNewClientImpl(t *testing.T, pipelineWindow time.Duration, pipelineLimit statsStore := stats.NewStore(stats.NewNullSink(), false) mkRedisClient := func(auth, addr string) redis.Client { - return redis.NewClientImpl(statsStore, false, auth, "tcp", "single", addr, 1, pipelineWindow, pipelineLimit, nil) + return redis.NewClientImpl(statsStore, false, auth, "tcp", "single", addr, 1, pipelineWindow, pipelineLimit, nil, false, nil) } t.Run("connection refused", func(t *testing.T) { @@ -104,7 +104,7 @@ func TestDoCmd(t *testing.T) { statsStore := stats.NewStore(stats.NewNullSink(), false) mkRedisClient := func(addr string) redis.Client { - return redis.NewClientImpl(statsStore, false, "", "tcp", "single", addr, 1, 0, 0, nil) + return redis.NewClientImpl(statsStore, false, "", "tcp", "single", addr, 1, 0, 0, nil, false, nil) } t.Run("SETGET ok", func(t *testing.T) { @@ -149,7 +149,7 @@ func testPipeDo(t *testing.T, pipelineWindow time.Duration, pipelineLimit int) f 
statsStore := stats.NewStore(stats.NewNullSink(), false) mkRedisClient := func(addr string) redis.Client { - return redis.NewClientImpl(statsStore, false, "", "tcp", "single", addr, 1, pipelineWindow, pipelineLimit, nil) + return redis.NewClientImpl(statsStore, false, "", "tcp", "single", addr, 1, pipelineWindow, pipelineLimit, nil, false, nil) } t.Run("SETGET ok", func(t *testing.T) { From 104c27bc3505c85f21813b0765776808564cd5b2 Mon Sep 17 00:00:00 2001 From: debbyku Date: Fri, 19 Nov 2021 12:54:08 +0000 Subject: [PATCH 2/2] solve redis connection active count error Signed-off-by: debbyku --- src/redis/driver_impl.go | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/src/redis/driver_impl.go b/src/redis/driver_impl.go index 8044d3dc..6014774f 100644 --- a/src/redis/driver_impl.go +++ b/src/redis/driver_impl.go @@ -30,11 +30,15 @@ func newPoolStats(scope stats.Scope) poolStats { func poolTrace(ps *poolStats, healthCheckActiveConnection bool, srv server.Server) trace.PoolTrace { return trace.PoolTrace{ - ConnCreated: func(_ trace.PoolConnCreated) { - ps.connectionTotal.Add(1) - ps.connectionActive.Add(1) - if healthCheckActiveConnection && srv != nil { - srv.HealthCheckOK() + ConnCreated: func(newConn trace.PoolConnCreated) { + if newConn.Err == nil { + ps.connectionTotal.Add(1) + ps.connectionActive.Add(1) + if healthCheckActiveConnection && srv != nil { + srv.HealthCheckOK() + } + } else { + logger.Errorf("creating redis connection error: %v", newConn.Err) } }, ConnClosed: func(_ trace.PoolConnClosed) {