health check failed if no active redis connection (envoyproxy#310)

Signed-off-by: debbyku <[email protected]>
debbyku authored and timcovar committed Jan 16, 2024
1 parent 7d6858b commit c96394f
Showing 10 changed files with 45 additions and 13 deletions.
7 changes: 7 additions & 0 deletions README.md
@@ -42,6 +42,7 @@
- [Pipelining](#pipelining)
- [One Redis Instance](#one-redis-instance)
- [Two Redis Instances](#two-redis-instances)
+- [Health Checking for Redis Active Connection](#health-checking-for-redis-active-connection)
- [Memcache](#memcache)
- [Custom headers](#custom-headers)
- [Contact](#contact)
@@ -711,6 +712,12 @@ To configure two Redis instances use the following environment variables:
This setup will use the Redis server configured with the `_PERSECOND_` vars for
per second limits, and the other Redis server for all other limits.

+## Health Checking for Redis Active Connection
+
+To configure whether the health check should fail when there is no active Redis connection, use the following environment variable:
+
+1. `REDIS_HEALTH_CHECK_ACTIVE_CONNECTION`: (default is "false") when set to "true", the health check fails once the last active Redis connection closes, and recovers as soon as a new connection is established.
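
As a minimal illustration of how this flag is consumed, the sketch below assumes the envconfig-based settings loading this project already uses; the standalone `main` wrapper is hypothetical, while the struct field mirrors `src/settings/settings.go`:

```go
package main

import (
	"fmt"

	"github.com/kelseyhightower/envconfig"
)

// Settings mirrors the relevant field from src/settings/settings.go.
type Settings struct {
	RedisHealthCheckActiveConnection bool `envconfig:"REDIS_HEALTH_CHECK_ACTIVE_CONNECTION" default:"false"`
}

func main() {
	var s Settings
	// envconfig reads REDIS_HEALTH_CHECK_ACTIVE_CONNECTION from the environment.
	if err := envconfig.Process("", &s); err != nil {
		panic(err)
	}
	fmt.Println("fail health check on zero active redis connections:", s.RedisHealthCheckActiveConnection)
}
```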

# Memcache

Experimental Memcache support has been added as an alternative to Redis in v1.5.
4 changes: 2 additions & 2 deletions src/redis/cache_impl.go
@@ -16,11 +16,11 @@ func NewRateLimiterCacheImplFromSettings(s settings.Settings, localCache *freeca
var perSecondPool Client
if s.RedisPerSecond {
perSecondPool = NewClientImpl(srv.Scope().Scope("redis_per_second_pool"), s.RedisPerSecondTls, s.RedisPerSecondAuth, s.RedisPerSecondSocketType,
-		s.RedisPerSecondType, s.RedisPerSecondUrl, s.RedisPerSecondPoolSize, s.RedisPerSecondPipelineWindow, s.RedisPerSecondPipelineLimit, s.RedisTlsConfig)
+		s.RedisPerSecondType, s.RedisPerSecondUrl, s.RedisPerSecondPoolSize, s.RedisPerSecondPipelineWindow, s.RedisPerSecondPipelineLimit, s.RedisTlsConfig, s.RedisHealthCheckActiveConnection, srv)
}
var otherPool Client
otherPool = NewClientImpl(srv.Scope().Scope("redis_pool"), s.RedisTls, s.RedisAuth, s.RedisSocketType, s.RedisType, s.RedisUrl, s.RedisPoolSize,
-		s.RedisPipelineWindow, s.RedisPipelineLimit, s.RedisTlsConfig)
+		s.RedisPipelineWindow, s.RedisPipelineLimit, s.RedisTlsConfig, s.RedisHealthCheckActiveConnection, srv)

return NewFixedRateLimitCacheImpl(
otherPool,
17 changes: 12 additions & 5 deletions src/redis/driver_impl.go
@@ -6,11 +6,12 @@ import (
"strings"
"time"

"github.com/mediocregopher/radix/v3/trace"

stats "github.com/lyft/gostats"
"github.com/mediocregopher/radix/v3"
"github.com/mediocregopher/radix/v3/trace"
logger "github.com/sirupsen/logrus"

"github.com/envoyproxy/ratelimit/src/server"
)

type poolStats struct {
@@ -27,15 +28,21 @@ func newPoolStats(scope stats.Scope) poolStats {
return ret
}

-func poolTrace(ps *poolStats) trace.PoolTrace {
+func poolTrace(ps *poolStats, healthCheckActiveConnection bool, srv server.Server) trace.PoolTrace {
return trace.PoolTrace{
ConnCreated: func(_ trace.PoolConnCreated) {
ps.connectionTotal.Add(1)
ps.connectionActive.Add(1)
+			if healthCheckActiveConnection && srv != nil {
+				srv.HealthCheckOK()
+			}
},
ConnClosed: func(_ trace.PoolConnClosed) {
ps.connectionActive.Sub(1)
ps.connectionClose.Add(1)
+			if healthCheckActiveConnection && srv != nil && ps.connectionActive.Value() == 0 {
+				srv.HealthCheckFail()
+			}
},
}
}
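
The two hooks above drive the health status from the pool's connection count: any successful connection restores the health check, and it fails only when the last active connection closes. A standalone sketch of that gating logic follows; the `fakeHealth` type is hypothetical, standing in for `server.Server`:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// fakeHealth is a hypothetical stand-in for server.Server.
type fakeHealth struct{ serving atomic.Bool }

func (h *fakeHealth) HealthCheckOK()   { h.serving.Store(true) }
func (h *fakeHealth) HealthCheckFail() { h.serving.Store(false) }

func main() {
	var active int64
	h := &fakeHealth{}

	connCreated := func() {
		atomic.AddInt64(&active, 1)
		h.HealthCheckOK() // any new connection restores the health check
	}
	connClosed := func() {
		if atomic.AddInt64(&active, -1) == 0 {
			h.HealthCheckFail() // only the last close flips it to failing
		}
	}

	connCreated()
	connClosed()
	fmt.Println("serving after last close:", h.serving.Load()) // false
}
```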
@@ -53,7 +60,7 @@ func checkError(err error) {
}

func NewClientImpl(scope stats.Scope, useTls bool, auth, redisSocketType, redisType, url string, poolSize int,
-	pipelineWindow time.Duration, pipelineLimit int, tlsConfig *tls.Config) Client {
+	pipelineWindow time.Duration, pipelineLimit int, tlsConfig *tls.Config, healthCheckActiveConnection bool, srv server.Server) Client {
logger.Warnf("connecting to redis on %s with pool size %d", url, poolSize)

df := func(network, addr string) (radix.Conn, error) {
@@ -78,7 +85,7 @@

stats := newPoolStats(scope)

-	opts := []radix.PoolOpt{radix.PoolConnFunc(df), radix.PoolWithTrace(poolTrace(&stats))}
+	opts := []radix.PoolOpt{radix.PoolConnFunc(df), radix.PoolWithTrace(poolTrace(&stats, healthCheckActiveConnection, srv))}

implicitPipelining := true
if pipelineWindow == 0 && pipelineLimit == 0 {
5 changes: 5 additions & 0 deletions src/server/health.go
@@ -51,6 +51,11 @@ func (hc *HealthChecker) Fail() {
hc.grpc.SetServingStatus(hc.name, healthpb.HealthCheckResponse_NOT_SERVING)
}

+func (hc *HealthChecker) Ok() {
+	atomic.StoreUint32(&hc.ok, 1)
+	hc.grpc.SetServingStatus(hc.name, healthpb.HealthCheckResponse_SERVING)
+}
+
func (hc *HealthChecker) Server() *health.Server {
return hc.grpc
}
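
`Ok()` mirrors the existing `Fail()`: it flips the atomic flag back and reports `SERVING` on the standard gRPC health service. A probe observing the status these methods set might look like the sketch below; the address and service name are hypothetical:

```go
package main

import (
	"context"
	"log"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	healthpb "google.golang.org/grpc/health/grpc_health_v1"
)

func main() {
	// Hypothetical address; use the ratelimit service's real gRPC port.
	conn, err := grpc.Dial("localhost:8081", grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	resp, err := healthpb.NewHealthClient(conn).Check(ctx, &healthpb.HealthCheckRequest{
		Service: "ratelimit", // hypothetical name registered with the health server
	})
	if err != nil {
		log.Fatal(err)
	}
	log.Println("status:", resp.GetStatus()) // SERVING or NOT_SERVING
}
```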
3 changes: 3 additions & 0 deletions src/server/server.go
@@ -43,4 +43,7 @@ type Server interface {
* Stops serving the grpc port (for integration testing).
*/
	Stop()
+
+	HealthCheckFail()
+	HealthCheckOK()
}
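
Callers that construct a Redis client without a real server can keep passing `nil`, since `poolTrace` guards every call with `srv != nil`; the updated tests below rely on exactly that. Where a non-nil value is wanted, a no-op stub is enough. A hypothetical sketch covering just the two new methods:

```go
package server_test

// healthReporter captures just the two methods this commit adds to Server.
type healthReporter interface {
	HealthCheckFail()
	HealthCheckOK()
}

// stubServer is a hypothetical no-op; a full implementation must also
// satisfy the rest of the server.Server interface (Start, Stop, etc.).
type stubServer struct{}

func (s *stubServer) HealthCheckFail() {}
func (s *stubServer) HealthCheckOK()   {}

// Compile-time check that the stub satisfies the new methods.
var _ healthReporter = (*stubServer)(nil)
```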
8 changes: 8 additions & 0 deletions src/server/server_impl.go
@@ -311,3 +311,11 @@ func (server *server) handleGracefulShutdown() {
os.Exit(0)
}()
}
+
+func (server *server) HealthCheckFail() {
+	server.health.Fail()
+}
+
+func (server *server) HealthCheckOK() {
+	server.health.Ok()
+}
3 changes: 2 additions & 1 deletion src/service_cmd/runner/runner.go
@@ -57,7 +57,8 @@ func createLimiter(srv server.Server, s settings.Settings, localCache *freecache
utils.NewTimeSourceImpl(),
rand.New(utils.NewLockedSource(time.Now().Unix())),
s.ExpirationJitterMaxSeconds,
-		statsManager)
+		statsManager,
+	)
case "memcache":
return memcached.NewRateLimitCacheImplFromSettings(
s,
Expand Down
3 changes: 2 additions & 1 deletion src/settings/settings.go
@@ -88,7 +88,8 @@ type Settings struct {
// RedisPerSecondPipelineLimit sets maximum number of commands that can be pipelined before flushing for per second redis.
// See comments of RedisPipelineLimit for details.
	RedisPerSecondPipelineLimit int `envconfig:"REDIS_PERSECOND_PIPELINE_LIMIT" default:"0"`
-
+	// Enable health check for the Redis connection. If there is no active connection, the health check fails.
+	RedisHealthCheckActiveConnection bool `envconfig:"REDIS_HEALTH_CHECK_ACTIVE_CONNECTION" default:"false"`
// Memcache settings
MemcacheHostPort []string `envconfig:"MEMCACHE_HOST_PORT" default:""`
// MemcacheMaxIdleConns sets the maximum number of idle TCP connections per memcached node.
2 changes: 1 addition & 1 deletion test/redis/bench_test.go
@@ -44,7 +44,7 @@ func BenchmarkParallelDoLimit(b *testing.B) {
return func(b *testing.B) {
statsStore := gostats.NewStore(gostats.NewNullSink(), false)
sm := stats.NewMockStatManager(statsStore)
-		client := redis.NewClientImpl(statsStore, false, "", "tcp", "single", "127.0.0.1:6379", poolSize, pipelineWindow, pipelineLimit, nil)
+		client := redis.NewClientImpl(statsStore, false, "", "tcp", "single", "127.0.0.1:6379", poolSize, pipelineWindow, pipelineLimit, nil, false, nil)
defer client.Close()

cache := redis.NewFixedRateLimitCacheImpl(client, nil, utils.NewTimeSourceImpl(), rand.New(utils.NewLockedSource(time.Now().Unix())), 10, nil, 0.8, "", sm)
6 changes: 3 additions & 3 deletions test/redis/driver_impl_test.go
@@ -37,7 +37,7 @@ func testNewClientImpl(t *testing.T, pipelineWindow time.Duration, pipelineLimit
statsStore := stats.NewStore(stats.NewNullSink(), false)

mkRedisClient := func(auth, addr string) redis.Client {
-		return redis.NewClientImpl(statsStore, false, auth, "tcp", "single", addr, 1, pipelineWindow, pipelineLimit, nil)
+		return redis.NewClientImpl(statsStore, false, auth, "tcp", "single", addr, 1, pipelineWindow, pipelineLimit, nil, false, nil)
}

t.Run("connection refused", func(t *testing.T) {
@@ -104,7 +104,7 @@ func TestDoCmd(t *testing.T) {
statsStore := stats.NewStore(stats.NewNullSink(), false)

mkRedisClient := func(addr string) redis.Client {
-		return redis.NewClientImpl(statsStore, false, "", "tcp", "single", addr, 1, 0, 0, nil)
+		return redis.NewClientImpl(statsStore, false, "", "tcp", "single", addr, 1, 0, 0, nil, false, nil)
}

t.Run("SETGET ok", func(t *testing.T) {
@@ -149,7 +149,7 @@ func testPipeDo(t *testing.T, pipelineWindow time.Duration, pipelineLimit int) f
statsStore := stats.NewStore(stats.NewNullSink(), false)

mkRedisClient := func(addr string) redis.Client {
-		return redis.NewClientImpl(statsStore, false, "", "tcp", "single", addr, 1, pipelineWindow, pipelineLimit, nil)
+		return redis.NewClientImpl(statsStore, false, "", "tcp", "single", addr, 1, pipelineWindow, pipelineLimit, nil, false, nil)
}

t.Run("SETGET ok", func(t *testing.T) {
