Skip to content

Commit

Permalink
support all redis connection options
Browse files Browse the repository at this point in the history
Signed-off-by: Artem Torubarov <[email protected]>
  • Loading branch information
arttor committed Nov 15, 2024
1 parent bdf1fa2 commit adb6e9e
Show file tree
Hide file tree
Showing 16 changed files with 238 additions and 45 deletions.
48 changes: 39 additions & 9 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,16 @@ package config
import (
"embed"
"fmt"
"io"
"os"
"strings"

"github.com/clyso/chorus/pkg/features"
"github.com/clyso/chorus/pkg/log"
"github.com/clyso/chorus/pkg/metrics"
"github.com/clyso/chorus/pkg/trace"
stdlog "github.com/rs/zerolog/log"
"github.com/spf13/viper"
"io"
"os"
"strings"
)

//go:embed config.yaml
Expand All @@ -42,12 +43,38 @@ type Common struct {
}

type Redis struct {
Address string `yaml:"address"`
Password string `yaml:"password"`
MetaDB int `yaml:"metaDB"`
QueueDB int `yaml:"queueDB"`
LockDB int `yaml:"lockDB"`
ConfigDB int `yaml:"configDB"`
// Deprecated: Address is deprecated: use Addresses
Address string `yaml:"address"`
Addresses []string `yaml:"addresses"`
Sentinel RedisSentinel `yaml:"sentinel"`
User string `yaml:"user"`
Password string `yaml:"password"`
TLS TLS `yaml:"tls"`
MetaDB int `yaml:"metaDB"`
QueueDB int `yaml:"queueDB"`
LockDB int `yaml:"lockDB"`
ConfigDB int `yaml:"configDB"`
}

type RedisSentinel struct {
MasterName string `yaml:"masterName"`
Password string `yaml:"password"`
User string `yaml:"user"`
}

type TLS struct {
Enabled bool `yaml:"enabled"`
Insecure bool `yaml:"insecure"`
}

func (r *Redis) validate() error {
if r.Address != "" && len(r.Addresses) != 0 {
return fmt.Errorf("invalid redis config: either address or addresses must be used, but not both")
}
if r.Address == "" && len(r.Addresses) == 0 {
return fmt.Errorf("invalid redis config: address is not set")
}
return nil
}

func Get(conf any, sources ...Src) error {
Expand Down Expand Up @@ -111,6 +138,9 @@ func (c *Common) Validate() error {
if c.Redis == nil {
return fmt.Errorf("app config: empty Redis config")
}
if err := c.Redis.validate(); err != nil {
return err
}
if c.Metrics == nil {
return fmt.Errorf("app config: empty Metrics config")
}
Expand Down
13 changes: 12 additions & 1 deletion pkg/config/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,19 @@ trace:
enabled: false
endpoint: # url to Jaeger or other open trace provider
redis:
# redis master address for standalone installation
address: "127.0.0.1:6379"
# list of redis addresses for HA (cluster or sentinel) setup
addresses: []
user:
password:
tls:
enabled: false
insecure: false
sentinel:
masterName:
user:
password:
appDB: 0
queueDB: 1
lockDB: 2
Expand All @@ -20,4 +31,4 @@ features:
tagging: true # sync object/bucket tags
acl: true # sync object/bucket ACLs
lifecycle: false # sync bucket Lifecycle
policy: false # sync bucket Policies
policy: false # sync bucket Policies
81 changes: 80 additions & 1 deletion pkg/config/config_test.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
package config

import (
"testing"

"github.com/clyso/chorus/pkg/s3"
"github.com/stretchr/testify/require"
"testing"
)

func TestGet(t *testing.T) {
Expand Down Expand Up @@ -98,3 +99,81 @@ func TestStorageConfig_RateLimitConf(t *testing.T) {
r.EqualValues(conf.Storages["f1"].RateLimit, res["f1"])
r.EqualValues(conf.Storages["f2"].RateLimit, res["f2"])
}

func TestRedis_validate(t *testing.T) {
type fields struct {
Address string
Addresses []string
Sentinel RedisSentinel
User string
Password string
UseTLS bool
MetaDB int
QueueDB int
LockDB int
ConfigDB int
}
tests := []struct {
name string
fields fields
wantErr bool
}{
{
name: "invalid: no address set",
fields: fields{
Address: "",
Addresses: []string{},
},
wantErr: true,
},
{
name: "invalid: both addresses set",
fields: fields{
Address: "addr",
Addresses: []string{"addr"},
},
wantErr: true,
},
{
name: "valid: only address set",
fields: fields{
Address: "addr",
Addresses: []string{},
},
wantErr: false,
},
{
name: "valid: only addresses set",
fields: fields{
Address: "",
Addresses: []string{"addr"},
},
wantErr: false,
},
{
name: "valid: addresses is nil",
fields: fields{
Address: "addr",
},
wantErr: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
r := &Redis{
Address: tt.fields.Address,
Addresses: tt.fields.Addresses,
Sentinel: tt.fields.Sentinel,
User: tt.fields.User,
Password: tt.fields.Password,
MetaDB: tt.fields.MetaDB,
QueueDB: tt.fields.QueueDB,
LockDB: tt.fields.LockDB,
ConfigDB: tt.fields.ConfigDB,
}
if err := r.validate(); (err != nil) != tt.wantErr {
t.Errorf("Redis.validate() error = %v, wantErr %v", err, tt.wantErr)
}
})
}
}
2 changes: 1 addition & 1 deletion pkg/lock/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func MigrationCostsKey(fromStorage, toStorage string) key {
}
}

func New(redisClient *redis.Client) Service {
func New(redisClient redis.UniversalClient) Service {
return &svc{locker: redislock.New(redisClient)}
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/meta/version_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,12 +96,12 @@ type VersionService interface {
UpdateBucketTagsIfGreater(ctx context.Context, bucket string, storage string, version int64) error
}

func NewVersionService(client *redis.Client) VersionService {
func NewVersionService(client redis.UniversalClient) VersionService {
return &versionSvc{client: client}
}

type versionSvc struct {
client *redis.Client
client redis.UniversalClient
}

func (s *versionSvc) GetObj(ctx context.Context, obj dom.Object) (map[string]int64, error) {
Expand Down
4 changes: 2 additions & 2 deletions pkg/policy/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,12 +175,12 @@ type Service interface {
DeleteBucketReplicationsByUser(ctx context.Context, user, from string, to string) ([]string, error)
}

func NewService(client *redis.Client) Service {
func NewService(client redis.UniversalClient) Service {
return &policySvc{client: client}
}

type policySvc struct {
client *redis.Client
client redis.UniversalClient
}

func (s *policySvc) ObjListStarted(ctx context.Context, user, bucket, from, to string) error {
Expand Down
4 changes: 2 additions & 2 deletions pkg/ratelimit/semaphore.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func (l *Local) TryAcquireN(ctx context.Context, n int64) (func(), error) {
return release, nil
}

func GlobalSemaphore(rc *redis.Client, c SemaphoreConfig, name string) *Global {
func GlobalSemaphore(rc redis.UniversalClient, c SemaphoreConfig, name string) *Global {
return &Global{
c: c,
rc: rc,
Expand All @@ -110,7 +110,7 @@ var _ Semaphore = &Global{}

type Global struct {
c SemaphoreConfig
rc *redis.Client
rc redis.UniversalClient
name string
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/ratelimit/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ type RPM interface {
StorReqN(ctx context.Context, storage string, n int) error
}

func New(rc *redis.Client, conf map[string]s3.RateLimit) *Svc {
func New(rc redis.UniversalClient, conf map[string]s3.RateLimit) *Svc {
limiter := redis_rate.NewLimiter(rc)

return &Svc{
Expand Down
6 changes: 3 additions & 3 deletions pkg/rpc/agent_redis_request_reply.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@ const (
)

type AgentClient struct {
client *redis.Client
client redis.UniversalClient
}

func NewAgentClient(client *redis.Client) *AgentClient {
func NewAgentClient(client redis.UniversalClient) *AgentClient {
return &AgentClient{client: client}
}

Expand Down Expand Up @@ -95,7 +95,7 @@ func (a *AgentClient) Ping(ctx context.Context) ([]AgentInfo, error) {
}
}

func AgentServe(ctx context.Context, client *redis.Client, url, fromStorage string) error {
func AgentServe(ctx context.Context, client redis.UniversalClient, url, fromStorage string) error {
ps := client.Subscribe(ctx, agentPing)
defer ps.Close()
defer ps.Unsubscribe(context.Background(), agentPing)
Expand Down
6 changes: 3 additions & 3 deletions pkg/rpc/proxy_redis_request_reply.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,10 @@ func (f ProxyGetCredentials) GetCredentials(ctx context.Context) (*pb.GetProxyCr
var _ Proxy = &ProxyClient{}

type ProxyClient struct {
client *redis.Client
client redis.UniversalClient
}

func NewProxyClient(client *redis.Client) Proxy {
func NewProxyClient(client redis.UniversalClient) Proxy {
return &ProxyClient{client: client}
}

Expand Down Expand Up @@ -106,7 +106,7 @@ func (c *ProxyClient) GetCredentials(ctx context.Context) (*pb.GetProxyCredentia
}
}

func ProxyServe(ctx context.Context, client *redis.Client, proxy Proxy) error {
func ProxyServe(ctx context.Context, client redis.UniversalClient, proxy Proxy) error {
ps := client.Subscribe(ctx, proxyCreds)
defer ps.Close()
defer ps.Unsubscribe(context.Background(), proxyCreds)
Expand Down
4 changes: 2 additions & 2 deletions pkg/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,12 @@ type Service interface {
ExistsUploads(ctx context.Context, user, bucket string) (bool, error)
}

func New(client *redis.Client) Service {
func New(client redis.UniversalClient) Service {
return &svc{client: client}
}

type svc struct {
client *redis.Client
client redis.UniversalClient
}

func (s *svc) DelLastListedObj(ctx context.Context, task tasks.MigrateBucketListObjectsPayload) error {
Expand Down
68 changes: 68 additions & 0 deletions pkg/util/redis.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package util

import (
"crypto/tls"

"github.com/clyso/chorus/pkg/config"
"github.com/hibiken/asynq"
"github.com/redis/go-redis/v9"
)

func NewRedis(conf *config.Redis, db int) redis.UniversalClient {
if conf.Address != "" {
conf.Addresses = append(conf.Addresses, conf.Address)
}
opt := &redis.UniversalOptions{
Addrs: conf.Addresses,
DB: db,
Username: conf.User,
Password: conf.Password,
SentinelUsername: conf.Sentinel.User,
SentinelPassword: conf.Sentinel.Password,
MasterName: conf.Sentinel.MasterName,
}
if conf.TLS.Enabled {
opt.TLSConfig = &tls.Config{InsecureSkipVerify: conf.TLS.Insecure}
}
return redis.NewUniversalClient(opt)
}

func NewRedisAsynq(conf *config.Redis, db int) asynq.RedisConnOpt {
if conf.Address != "" {
conf.Addresses = append(conf.Addresses, conf.Address)
}
if len(conf.Addresses) == 1 {
// Standalone
opt := asynq.RedisClientOpt{Addr: conf.Addresses[0], Username: conf.User, Password: conf.Password, DB: db}
if conf.TLS.Enabled {
opt.TLSConfig = &tls.Config{InsecureSkipVerify: conf.TLS.Insecure}
}
return &opt
}

if conf.Sentinel.MasterName != "" {
// Sentinel
opt := asynq.RedisFailoverClientOpt{
MasterName: conf.Sentinel.MasterName,
SentinelAddrs: conf.Addresses,
SentinelPassword: conf.Sentinel.Password,
Username: conf.User,
Password: conf.Password,
DB: db,
}
if conf.TLS.Enabled {
opt.TLSConfig = &tls.Config{InsecureSkipVerify: conf.TLS.Insecure}
}
return &opt
}
// Cluster
opt := asynq.RedisClusterClientOpt{
Addrs: conf.Addresses,
Username: conf.User,
Password: conf.Password,
}
if conf.TLS.Enabled {
opt.TLSConfig = &tls.Config{InsecureSkipVerify: conf.TLS.Insecure}
}
return &opt
}
Loading

0 comments on commit adb6e9e

Please sign in to comment.