From adb6e9ed8b4b010f676676d82f7949566247c8ac Mon Sep 17 00:00:00 2001 From: Artem Torubarov Date: Fri, 15 Nov 2024 15:26:55 +0100 Subject: [PATCH] support all redis connection options Signed-off-by: Artem Torubarov --- pkg/config/config.go | 48 +++++++++++++---- pkg/config/config.yaml | 13 ++++- pkg/config/config_test.go | 81 +++++++++++++++++++++++++++- pkg/lock/service.go | 2 +- pkg/meta/version_service.go | 4 +- pkg/policy/service.go | 4 +- pkg/ratelimit/semaphore.go | 4 +- pkg/ratelimit/service.go | 2 +- pkg/rpc/agent_redis_request_reply.go | 6 +-- pkg/rpc/proxy_redis_request_reply.go | 6 +-- pkg/storage/storage.go | 4 +- pkg/util/redis.go | 68 +++++++++++++++++++++++ service/agent/config.yaml | 15 +++++- service/agent/server.go | 9 ++-- service/proxy/server.go | 9 ++-- service/worker/server.go | 8 +-- 16 files changed, 238 insertions(+), 45 deletions(-) create mode 100644 pkg/util/redis.go diff --git a/pkg/config/config.go b/pkg/config/config.go index 34c1d4b..c22496f 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -19,15 +19,16 @@ package config import ( "embed" "fmt" + "io" + "os" + "strings" + "github.com/clyso/chorus/pkg/features" "github.com/clyso/chorus/pkg/log" "github.com/clyso/chorus/pkg/metrics" "github.com/clyso/chorus/pkg/trace" stdlog "github.com/rs/zerolog/log" "github.com/spf13/viper" - "io" - "os" - "strings" ) //go:embed config.yaml @@ -42,12 +43,38 @@ type Common struct { } type Redis struct { - Address string `yaml:"address"` - Password string `yaml:"password"` - MetaDB int `yaml:"metaDB"` - QueueDB int `yaml:"queueDB"` - LockDB int `yaml:"lockDB"` - ConfigDB int `yaml:"configDB"` + // Deprecated: Address is deprecated: use Addresses + Address string `yaml:"address"` + Addresses []string `yaml:"addresses"` + Sentinel RedisSentinel `yaml:"sentinel"` + User string `yaml:"user"` + Password string `yaml:"password"` + TLS TLS `yaml:"tls"` + MetaDB int `yaml:"metaDB"` + QueueDB int `yaml:"queueDB"` + LockDB int `yaml:"lockDB"` + ConfigDB int `yaml:"configDB"` +} + +type RedisSentinel struct { + MasterName string `yaml:"masterName"` + Password string `yaml:"password"` + User string `yaml:"user"` +} + +type TLS struct { + Enabled bool `yaml:"enabled"` + Insecure bool `yaml:"insecure"` +} + +func (r *Redis) validate() error { + if r.Address != "" && len(r.Addresses) != 0 { + return fmt.Errorf("invalid redis config: either address or addresses must be used, but not both") + } + if r.Address == "" && len(r.Addresses) == 0 { + return fmt.Errorf("invalid redis config: address is not set") + } + return nil } func Get(conf any, sources ...Src) error { @@ -111,6 +138,9 @@ func (c *Common) Validate() error { if c.Redis == nil { return fmt.Errorf("app config: empty Redis config") } + if err := c.Redis.validate(); err != nil { + return err + } if c.Metrics == nil { return fmt.Errorf("app config: empty Metrics config") } diff --git a/pkg/config/config.yaml b/pkg/config/config.yaml index 1d7a2ee..1698d83 100644 --- a/pkg/config/config.yaml +++ b/pkg/config/config.yaml @@ -8,8 +8,19 @@ trace: enabled: false endpoint: # url to Jaeger or other open trace provider redis: + # redis master address for standalone installation address: "127.0.0.1:6379" + # list of redis addresses for HA (cluster or sentinel) setup + addresses: [] + user: password: + tls: + enabled: false + insecure: false + sentinel: + masterName: + user: + password: appDB: 0 queueDB: 1 lockDB: 2 @@ -20,4 +31,4 @@ features: tagging: true # sync object/bucket tags acl: true # sync object/bucket ACLs lifecycle: false # sync bucket Lifecycle - policy: false # sync bucket Policies \ No newline at end of file + policy: false # sync bucket Policies diff --git a/pkg/config/config_test.go b/pkg/config/config_test.go index 360c6be..a8105bb 100644 --- a/pkg/config/config_test.go +++ b/pkg/config/config_test.go @@ -1,9 +1,10 @@ package config import ( + "testing" + "github.com/clyso/chorus/pkg/s3" "github.com/stretchr/testify/require" - "testing" ) func TestGet(t *testing.T) { @@ -98,3 +99,81 @@ func TestStorageConfig_RateLimitConf(t *testing.T) { r.EqualValues(conf.Storages["f1"].RateLimit, res["f1"]) r.EqualValues(conf.Storages["f2"].RateLimit, res["f2"]) } + +func TestRedis_validate(t *testing.T) { + type fields struct { + Address string + Addresses []string + Sentinel RedisSentinel + User string + Password string + UseTLS bool + MetaDB int + QueueDB int + LockDB int + ConfigDB int + } + tests := []struct { + name string + fields fields + wantErr bool + }{ + { + name: "invalid: no address set", + fields: fields{ + Address: "", + Addresses: []string{}, + }, + wantErr: true, + }, + { + name: "invalid: both addresses set", + fields: fields{ + Address: "addr", + Addresses: []string{"addr"}, + }, + wantErr: true, + }, + { + name: "valid: only address set", + fields: fields{ + Address: "addr", + Addresses: []string{}, + }, + wantErr: false, + }, + { + name: "valid: only addresses set", + fields: fields{ + Address: "", + Addresses: []string{"addr"}, + }, + wantErr: false, + }, + { + name: "valid: addresses is nil", + fields: fields{ + Address: "addr", + }, + wantErr: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + r := &Redis{ + Address: tt.fields.Address, + Addresses: tt.fields.Addresses, + Sentinel: tt.fields.Sentinel, + User: tt.fields.User, + Password: tt.fields.Password, + MetaDB: tt.fields.MetaDB, + QueueDB: tt.fields.QueueDB, + LockDB: tt.fields.LockDB, + ConfigDB: tt.fields.ConfigDB, + } + if err := r.validate(); (err != nil) != tt.wantErr { + t.Errorf("Redis.validate() error = %v, wantErr %v", err, tt.wantErr) + } + }) + } +} diff --git a/pkg/lock/service.go b/pkg/lock/service.go index d2ba6fe..5c10ed6 100644 --- a/pkg/lock/service.go +++ b/pkg/lock/service.go @@ -142,7 +142,7 @@ func MigrationCostsKey(fromStorage, toStorage string) key { } } -func New(redisClient *redis.Client) Service { +func New(redisClient redis.UniversalClient) Service { return &svc{locker: redislock.New(redisClient)} } diff --git a/pkg/meta/version_service.go b/pkg/meta/version_service.go index 78304a1..40412ea 100644 --- a/pkg/meta/version_service.go +++ b/pkg/meta/version_service.go @@ -96,12 +96,12 @@ type VersionService interface { UpdateBucketTagsIfGreater(ctx context.Context, bucket string, storage string, version int64) error } -func NewVersionService(client *redis.Client) VersionService { +func NewVersionService(client redis.UniversalClient) VersionService { return &versionSvc{client: client} } type versionSvc struct { - client *redis.Client + client redis.UniversalClient } func (s *versionSvc) GetObj(ctx context.Context, obj dom.Object) (map[string]int64, error) { diff --git a/pkg/policy/service.go b/pkg/policy/service.go index 365eb6b..0b1b69f 100644 --- a/pkg/policy/service.go +++ b/pkg/policy/service.go @@ -175,12 +175,12 @@ type Service interface { DeleteBucketReplicationsByUser(ctx context.Context, user, from string, to string) ([]string, error) } -func NewService(client *redis.Client) Service { +func NewService(client redis.UniversalClient) Service { return &policySvc{client: client} } type policySvc struct { - client *redis.Client + client redis.UniversalClient } func (s *policySvc) ObjListStarted(ctx context.Context, user, bucket, from, to string) error { diff --git a/pkg/ratelimit/semaphore.go b/pkg/ratelimit/semaphore.go index 963838f..c6dc43a 100644 --- a/pkg/ratelimit/semaphore.go +++ b/pkg/ratelimit/semaphore.go @@ -98,7 +98,7 @@ func (l *Local) TryAcquireN(ctx context.Context, n int64) (func(), error) { return release, nil } -func GlobalSemaphore(rc *redis.Client, c SemaphoreConfig, name string) *Global { +func GlobalSemaphore(rc redis.UniversalClient, c SemaphoreConfig, name string) *Global { return &Global{ c: c, rc: rc, @@ -110,7 +110,7 @@ var _ Semaphore = &Global{} type Global struct { c SemaphoreConfig - rc *redis.Client + rc redis.UniversalClient name string } diff --git a/pkg/ratelimit/service.go b/pkg/ratelimit/service.go index 4c7ff6e..b481098 100644 --- a/pkg/ratelimit/service.go +++ b/pkg/ratelimit/service.go @@ -35,7 +35,7 @@ type RPM interface { StorReqN(ctx context.Context, storage string, n int) error } -func New(rc *redis.Client, conf map[string]s3.RateLimit) *Svc { +func New(rc redis.UniversalClient, conf map[string]s3.RateLimit) *Svc { limiter := redis_rate.NewLimiter(rc) return &Svc{ diff --git a/pkg/rpc/agent_redis_request_reply.go b/pkg/rpc/agent_redis_request_reply.go index 5a055b7..2b37871 100644 --- a/pkg/rpc/agent_redis_request_reply.go +++ b/pkg/rpc/agent_redis_request_reply.go @@ -31,10 +31,10 @@ const ( ) type AgentClient struct { - client *redis.Client + client redis.UniversalClient } -func NewAgentClient(client *redis.Client) *AgentClient { +func NewAgentClient(client redis.UniversalClient) *AgentClient { return &AgentClient{client: client} } @@ -95,7 +95,7 @@ func (a *AgentClient) Ping(ctx context.Context) ([]AgentInfo, error) { } } -func AgentServe(ctx context.Context, client *redis.Client, url, fromStorage string) error { +func AgentServe(ctx context.Context, client redis.UniversalClient, url, fromStorage string) error { ps := client.Subscribe(ctx, agentPing) defer ps.Close() defer ps.Unsubscribe(context.Background(), agentPing) diff --git a/pkg/rpc/proxy_redis_request_reply.go b/pkg/rpc/proxy_redis_request_reply.go index 2de297b..598d154 100644 --- a/pkg/rpc/proxy_redis_request_reply.go +++ b/pkg/rpc/proxy_redis_request_reply.go @@ -51,10 +51,10 @@ func (f ProxyGetCredentials) GetCredentials(ctx context.Context) (*pb.GetProxyCr var _ Proxy = &ProxyClient{} type ProxyClient struct { - client *redis.Client + client redis.UniversalClient } -func NewProxyClient(client *redis.Client) Proxy { +func NewProxyClient(client redis.UniversalClient) Proxy { return &ProxyClient{client: client} } @@ -106,7 +106,7 @@ func (c *ProxyClient) GetCredentials(ctx context.Context) (*pb.GetProxyCredentia } } -func ProxyServe(ctx context.Context, client *redis.Client, proxy Proxy) error { +func ProxyServe(ctx context.Context, client redis.UniversalClient, proxy Proxy) error { ps := client.Subscribe(ctx, proxyCreds) defer ps.Close() defer ps.Unsubscribe(context.Background(), proxyCreds) diff --git a/pkg/storage/storage.go b/pkg/storage/storage.go index 97ac3fc..7119112 100644 --- a/pkg/storage/storage.go +++ b/pkg/storage/storage.go @@ -40,12 +40,12 @@ type Service interface { ExistsUploads(ctx context.Context, user, bucket string) (bool, error) } -func New(client *redis.Client) Service { +func New(client redis.UniversalClient) Service { return &svc{client: client} } type svc struct { - client *redis.Client + client redis.UniversalClient } func (s *svc) DelLastListedObj(ctx context.Context, task tasks.MigrateBucketListObjectsPayload) error { diff --git a/pkg/util/redis.go b/pkg/util/redis.go new file mode 100644 index 0000000..ff78d73 --- /dev/null +++ b/pkg/util/redis.go @@ -0,0 +1,68 @@ +package util + +import ( + "crypto/tls" + + "github.com/clyso/chorus/pkg/config" + "github.com/hibiken/asynq" + "github.com/redis/go-redis/v9" +) + +func NewRedis(conf *config.Redis, db int) redis.UniversalClient { + if conf.Address != "" { + conf.Addresses = append(conf.Addresses, conf.Address) + } + opt := &redis.UniversalOptions{ + Addrs: conf.Addresses, + DB: db, + Username: conf.User, + Password: conf.Password, + SentinelUsername: conf.Sentinel.User, + SentinelPassword: conf.Sentinel.Password, + MasterName: conf.Sentinel.MasterName, + } + if conf.TLS.Enabled { + opt.TLSConfig = &tls.Config{InsecureSkipVerify: conf.TLS.Insecure} + } + return redis.NewUniversalClient(opt) +} + +func NewRedisAsynq(conf *config.Redis, db int) asynq.RedisConnOpt { + if conf.Address != "" { + conf.Addresses = append(conf.Addresses, conf.Address) + } + if len(conf.Addresses) == 1 { + // Standalone + opt := asynq.RedisClientOpt{Addr: conf.Addresses[0], Username: conf.User, Password: conf.Password, DB: db} + if conf.TLS.Enabled { + opt.TLSConfig = &tls.Config{InsecureSkipVerify: conf.TLS.Insecure} + } + return &opt + } + + if conf.Sentinel.MasterName != "" { + // Sentinel + opt := asynq.RedisFailoverClientOpt{ + MasterName: conf.Sentinel.MasterName, + SentinelAddrs: conf.Addresses, + SentinelPassword: conf.Sentinel.Password, + Username: conf.User, + Password: conf.Password, + DB: db, + } + if conf.TLS.Enabled { + opt.TLSConfig = &tls.Config{InsecureSkipVerify: conf.TLS.Insecure} + } + return &opt + } + // Cluster + opt := asynq.RedisClusterClientOpt{ + Addrs: conf.Addresses, + Username: conf.User, + Password: conf.Password, + } + if conf.TLS.Enabled { + opt.TLSConfig = &tls.Config{InsecureSkipVerify: conf.TLS.Insecure} + } + return &opt +} diff --git a/service/agent/config.yaml b/service/agent/config.yaml index 564320f..40a1309 100644 --- a/service/agent/config.yaml +++ b/service/agent/config.yaml @@ -11,9 +11,20 @@ trace: enabled: false endpoint: # url to Jaeger or other open trace provider redis: - address: "127.0.0.1:6379" # Chorus redis instance used by chorus worker + # redis master address for standalone installation + address: "127.0.0.1:6379" + # list of redis addresses for HA (cluster or sentinel) setup + addresses: [] + user: password: + tls: + enabled: false + insecure: false + sentinel: + masterName: + user: + password: appDB: 0 queueDB: 1 lockDB: 2 - configDB: 3 \ No newline at end of file + configDB: 3 diff --git a/service/agent/server.go b/service/agent/server.go index 7a7d88d..6efa7c7 100644 --- a/service/agent/server.go +++ b/service/agent/server.go @@ -54,8 +54,7 @@ func Start(ctx context.Context, app dom.AppInfo, conf *Config) error { } defer shutdown(context.Background()) - appRedis := redis.NewClient(&redis.Options{Addr: conf.Redis.Address, Password: conf.Redis.Password, DB: conf.Redis.MetaDB}) - logger.Info().Interface("redis_pool", appRedis.PoolStats()).Int("redis_pool_size", appRedis.Options().PoolSize).Msg("redis app pool stats") + appRedis := util.NewRedis(conf.Redis, conf.Redis.MetaDB) defer appRedis.Close() err = appRedis.Ping(ctx).Err() if err != nil { @@ -69,8 +68,7 @@ func Start(ctx context.Context, app dom.AppInfo, conf *Config) error { logger.Info().Msg("app redis connected") verSvc := meta.NewVersionService(appRedis) - confRedis := redis.NewClient(&redis.Options{Addr: conf.Redis.Address, Password: conf.Redis.Password, DB: conf.Redis.ConfigDB}) - logger.Info().Interface("redis_pool", appRedis.PoolStats()).Int("redis_pool_size", appRedis.Options().PoolSize).Msg("redis conf pool stats") + confRedis := util.NewRedis(conf.Redis, conf.Redis.ConfigDB) defer confRedis.Close() err = redisotel.InstrumentTracing(confRedis, redisotel.WithTracerProvider(tp)) if err != nil { @@ -78,8 +76,7 @@ func Start(ctx context.Context, app dom.AppInfo, conf *Config) error { } policySvc := policy.NewService(confRedis) - queueRedis := asynq.RedisClientOpt{Addr: conf.Redis.Address, Password: conf.Redis.Password, DB: conf.Redis.QueueDB} - logger.Info().Int("redis_pool_size", queueRedis.PoolSize).Msg("redis queue pool stats") + queueRedis := util.NewRedisAsynq(conf.Redis, conf.Redis.QueueDB) taskClient := asynq.NewClient(queueRedis) defer taskClient.Close() diff --git a/service/proxy/server.go b/service/proxy/server.go index 64fc7f1..6e2a621 100644 --- a/service/proxy/server.go +++ b/service/proxy/server.go @@ -63,8 +63,7 @@ func Start(ctx context.Context, app dom.AppInfo, conf *Config) error { } defer shutdown(context.Background()) - appRedis := redis.NewClient(&redis.Options{Addr: conf.Redis.Address, Password: conf.Redis.Password, DB: conf.Redis.MetaDB}) - logger.Info().Interface("redis_pool", appRedis.PoolStats()).Int("redis_pool_size", appRedis.Options().PoolSize).Msg("redis app pool stats") + appRedis := util.NewRedis(conf.Redis, conf.Redis.MetaDB) defer appRedis.Close() err = appRedis.Ping(ctx).Err() if err != nil { @@ -81,8 +80,7 @@ func Start(ctx context.Context, app dom.AppInfo, conf *Config) error { storageSvc := storage.New(appRedis) limiter := ratelimit.New(appRedis, conf.Storage.RateLimitConf()) - confRedis := redis.NewClient(&redis.Options{Addr: conf.Redis.Address, Password: conf.Redis.Password, DB: conf.Redis.ConfigDB}) - logger.Info().Interface("redis_pool", appRedis.PoolStats()).Int("redis_pool_size", appRedis.Options().PoolSize).Msg("redis conf pool stats") + confRedis := util.NewRedis(conf.Redis, conf.Redis.ConfigDB) defer confRedis.Close() err = redisotel.InstrumentTracing(confRedis, redisotel.WithTracerProvider(tp)) if err != nil { @@ -92,8 +90,7 @@ func Start(ctx context.Context, app dom.AppInfo, conf *Config) error { metricsSvc := metrics.NewS3Service(conf.Metrics.Enabled) - queueRedis := asynq.RedisClientOpt{Addr: conf.Redis.Address, Password: conf.Redis.Password, DB: conf.Redis.QueueDB} - logger.Info().Int("redis_pool_size", queueRedis.PoolSize).Msg("redis queue pool stats") + queueRedis := util.NewRedisAsynq(conf.Redis, conf.Redis.QueueDB) taskClient := asynq.NewClient(queueRedis) defer taskClient.Close() diff --git a/service/worker/server.go b/service/worker/server.go index 01a5428..deef238 100644 --- a/service/worker/server.go +++ b/service/worker/server.go @@ -67,7 +67,7 @@ func Start(ctx context.Context, app dom.AppInfo, conf *Config) error { } defer shutdown(context.Background()) - appRedis := redis.NewClient(&redis.Options{Addr: conf.Redis.Address, Password: conf.Redis.Password, DB: conf.Redis.MetaDB}) + appRedis := util.NewRedis(conf.Redis, conf.Redis.MetaDB) defer appRedis.Close() err = appRedis.Ping(ctx).Err() if err != nil { @@ -82,7 +82,7 @@ func Start(ctx context.Context, app dom.AppInfo, conf *Config) error { versionSvc := meta.NewVersionService(appRedis) storageSvc := storage.New(appRedis) - confRedis := redis.NewClient(&redis.Options{Addr: conf.Redis.Address, Password: conf.Redis.Password, DB: conf.Redis.ConfigDB}) + confRedis := util.NewRedis(conf.Redis, conf.Redis.ConfigDB) defer confRedis.Close() err = redisotel.InstrumentTracing(confRedis, redisotel.WithTracerProvider(tp)) if err != nil { @@ -121,7 +121,7 @@ func Start(ctx context.Context, app dom.AppInfo, conf *Config) error { } logger.Info().Msg("rclone connected") - queueRedis := asynq.RedisClientOpt{Addr: conf.Redis.Address, Password: conf.Redis.Password, DB: conf.Redis.QueueDB} + queueRedis := util.NewRedisAsynq(conf.Redis, conf.Redis.QueueDB) taskClient := asynq.NewClient(queueRedis) defer taskClient.Close() @@ -131,7 +131,7 @@ func Start(ctx context.Context, app dom.AppInfo, conf *Config) error { } limiter := ratelimit.New(appRedis, conf.Storage.RateLimitConf()) - lockRedis := redis.NewClient(&redis.Options{Addr: conf.Redis.Address, Password: conf.Redis.Password, DB: conf.Redis.LockDB}) + lockRedis := util.NewRedis(conf.Redis, conf.Redis.LockDB) defer lockRedis.Close() if conf.Lock.Overlap > 0 { lock.UpdateOverlap(conf.Lock.Overlap)