Skip to content

Commit

Permalink
fix: Redis should reconnect on connectivity issue (argoproj#7207)
Browse files Browse the repository at this point in the history
fix: Redis should reconnect on connectivity issue (argoproj#7207)

Signed-off-by: pashavictorovich <[email protected]>
  • Loading branch information
pasha-codefresh authored Sep 15, 2021
1 parent 9cf71be commit 6f794d0
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 16 deletions.
54 changes: 39 additions & 15 deletions util/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,43 @@ func NewCache(client CacheClient) *Cache {
return &Cache{client}
}

func buildRedisClient(redisAddress, password string, redisDB, maxRetries int, tlsConfig *tls.Config) *redis.Client {
opts := &redis.Options{
Addr: redisAddress,
Password: password,
DB: redisDB,
MaxRetries: maxRetries,
TLSConfig: tlsConfig,
}

client := redis.NewClient(opts)

client.AddHook(redis.Hook(NewArgoRedisHook(func() {
*client = *buildRedisClient(redisAddress, password, redisDB, maxRetries, tlsConfig)
})))

return client
}

func buildFailoverRedisClient(sentinelMaster, password string, redisDB, maxRetries int, tlsConfig *tls.Config, sentinelAddresses []string) *redis.Client {
opts := &redis.FailoverOptions{
MasterName: sentinelMaster,
SentinelAddrs: sentinelAddresses,
DB: redisDB,
Password: password,
MaxRetries: maxRetries,
TLSConfig: tlsConfig,
}

client := redis.NewFailoverClient(opts)

client.AddHook(redis.Hook(NewArgoRedisHook(func() {
*client = *buildFailoverRedisClient(sentinelMaster, password, redisDB, maxRetries, tlsConfig, sentinelAddresses)
})))

return client
}

// AddCacheFlagsToCmd adds flags which control caching to the specified command
func AddCacheFlagsToCmd(cmd *cobra.Command, opts ...func(client *redis.Client)) func() (*Cache, error) {
redisAddress := ""
Expand Down Expand Up @@ -84,14 +121,7 @@ func AddCacheFlagsToCmd(cmd *cobra.Command, opts ...func(client *redis.Client))
password := os.Getenv(envRedisPassword)
maxRetries := env.ParseNumFromEnv(envRedisRetryCount, defaultRedisRetryCount, 0, math.MaxInt32)
if len(sentinelAddresses) > 0 {
client := redis.NewFailoverClient(&redis.FailoverOptions{
MasterName: sentinelMaster,
SentinelAddrs: sentinelAddresses,
DB: redisDB,
Password: password,
MaxRetries: maxRetries,
TLSConfig: tlsConfig,
})
client := buildFailoverRedisClient(sentinelMaster, password, redisDB, maxRetries, tlsConfig, sentinelAddresses)
for i := range opts {
opts[i](client)
}
Expand All @@ -101,13 +131,7 @@ func AddCacheFlagsToCmd(cmd *cobra.Command, opts ...func(client *redis.Client))
redisAddress = common.DefaultRedisAddr
}

client := redis.NewClient(&redis.Options{
Addr: redisAddress,
Password: password,
DB: redisDB,
MaxRetries: maxRetries,
TLSConfig: tlsConfig,
})
client := buildRedisClient(redisAddress, password, redisDB, maxRetries, tlsConfig)
for i := range opts {
opts[i](client)
}
Expand Down
39 changes: 39 additions & 0 deletions util/cache/redis_hook.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package cache

import (
"context"
"strings"

"github.com/go-redis/redis/v8"
log "github.com/sirupsen/logrus"
)

const NoSuchHostErr = "no such host"

type argoRedisHooks struct {
reconnectCallback func()
}

func NewArgoRedisHook(reconnectCallback func()) *argoRedisHooks {
return &argoRedisHooks{reconnectCallback: reconnectCallback}
}

func (hook *argoRedisHooks) BeforeProcess(ctx context.Context, cmd redis.Cmder) (context.Context, error) {
return ctx, nil
}

func (hook *argoRedisHooks) AfterProcess(ctx context.Context, cmd redis.Cmder) error {
if cmd.Err() != nil && strings.Contains(cmd.Err().Error(), NoSuchHostErr) {
log.Warnf("Reconnect to redis because error: \"%v\"", cmd.Err())
hook.reconnectCallback()
}
return nil
}

func (hook *argoRedisHooks) BeforeProcessPipeline(ctx context.Context, cmds []redis.Cmder) (context.Context, error) {
return ctx, nil
}

func (hook *argoRedisHooks) AfterProcessPipeline(ctx context.Context, cmds []redis.Cmder) error {
return nil
}
4 changes: 3 additions & 1 deletion util/session/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,11 @@ func (storage *userStateStorage) watchRevokedTokens(ctx context.Context) {
}

func (storage *userStateStorage) loadRevokedTokensSafe() {
for err := storage.loadRevokedTokens(); err != nil; {
err := storage.loadRevokedTokens()
for err != nil {
log.Warnf("Failed to resync revoked tokens. retrying again in 1 minute: %v", err)
time.Sleep(time.Minute)
err = storage.loadRevokedTokens()
}
}

Expand Down

0 comments on commit 6f794d0

Please sign in to comment.