Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support all redis connection options #47

Merged
merged 1 commit into from
Nov 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 39 additions & 9 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,16 @@ package config
import (
"embed"
"fmt"
"io"
"os"
"strings"

"github.com/clyso/chorus/pkg/features"
"github.com/clyso/chorus/pkg/log"
"github.com/clyso/chorus/pkg/metrics"
"github.com/clyso/chorus/pkg/trace"
stdlog "github.com/rs/zerolog/log"
"github.com/spf13/viper"
"io"
"os"
"strings"
)

//go:embed config.yaml
Expand All @@ -42,12 +43,38 @@ type Common struct {
}

type Redis struct {
Address string `yaml:"address"`
Password string `yaml:"password"`
MetaDB int `yaml:"metaDB"`
QueueDB int `yaml:"queueDB"`
LockDB int `yaml:"lockDB"`
ConfigDB int `yaml:"configDB"`
// Deprecated: Address is deprecated: use Addresses
Address string `yaml:"address"`
Addresses []string `yaml:"addresses"`
Sentinel RedisSentinel `yaml:"sentinel"`
User string `yaml:"user"`
Password string `yaml:"password"`
TLS TLS `yaml:"tls"`
MetaDB int `yaml:"metaDB"`
QueueDB int `yaml:"queueDB"`
LockDB int `yaml:"lockDB"`
ConfigDB int `yaml:"configDB"`
}

type RedisSentinel struct {
MasterName string `yaml:"masterName"`
Password string `yaml:"password"`
User string `yaml:"user"`
}

type TLS struct {
Enabled bool `yaml:"enabled"`
Insecure bool `yaml:"insecure"`
}

func (r *Redis) validate() error {
if r.Address != "" && len(r.Addresses) != 0 {
return fmt.Errorf("invalid redis config: either address or addresses must be used, but not both")
}
if r.Address == "" && len(r.Addresses) == 0 {
return fmt.Errorf("invalid redis config: address is not set")
}
return nil
}

func Get(conf any, sources ...Src) error {
Expand Down Expand Up @@ -111,6 +138,9 @@ func (c *Common) Validate() error {
if c.Redis == nil {
return fmt.Errorf("app config: empty Redis config")
}
if err := c.Redis.validate(); err != nil {
return err
}
if c.Metrics == nil {
return fmt.Errorf("app config: empty Metrics config")
}
Expand Down
13 changes: 12 additions & 1 deletion pkg/config/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,19 @@ trace:
enabled: false
endpoint: # url to Jaeger or other open trace provider
redis:
# redis master address for standalone installation
address: "127.0.0.1:6379"
# list of redis addresses for HA (cluster or sentinel) setup
addresses: []
user:
password:
tls:
enabled: false
insecure: false
sentinel:
masterName:
user:
password:
appDB: 0
queueDB: 1
lockDB: 2
Expand All @@ -20,4 +31,4 @@ features:
tagging: true # sync object/bucket tags
acl: true # sync object/bucket ACLs
lifecycle: false # sync bucket Lifecycle
policy: false # sync bucket Policies
policy: false # sync bucket Policies
81 changes: 80 additions & 1 deletion pkg/config/config_test.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
package config

import (
"testing"

"github.com/clyso/chorus/pkg/s3"
"github.com/stretchr/testify/require"
"testing"
)

func TestGet(t *testing.T) {
Expand Down Expand Up @@ -98,3 +99,81 @@ func TestStorageConfig_RateLimitConf(t *testing.T) {
r.EqualValues(conf.Storages["f1"].RateLimit, res["f1"])
r.EqualValues(conf.Storages["f2"].RateLimit, res["f2"])
}

func TestRedis_validate(t *testing.T) {
type fields struct {
Address string
Addresses []string
Sentinel RedisSentinel
User string
Password string
UseTLS bool
MetaDB int
QueueDB int
LockDB int
ConfigDB int
}
tests := []struct {
name string
fields fields
wantErr bool
aiivashchenko marked this conversation as resolved.
Show resolved Hide resolved
}{
{
name: "invalid: no address set",
fields: fields{
Address: "",
Addresses: []string{},
},
wantErr: true,
},
{
name: "invalid: both addresses set",
fields: fields{
Address: "addr",
Addresses: []string{"addr"},
},
wantErr: true,
},
{
name: "valid: only address set",
fields: fields{
Address: "addr",
Addresses: []string{},
},
wantErr: false,
},
{
name: "valid: only addresses set",
fields: fields{
Address: "",
Addresses: []string{"addr"},
},
wantErr: false,
},
arttor marked this conversation as resolved.
Show resolved Hide resolved
{
name: "valid: addresses is nil",
fields: fields{
Address: "addr",
},
wantErr: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
r := &Redis{
Address: tt.fields.Address,
Addresses: tt.fields.Addresses,
Sentinel: tt.fields.Sentinel,
User: tt.fields.User,
Password: tt.fields.Password,
MetaDB: tt.fields.MetaDB,
QueueDB: tt.fields.QueueDB,
LockDB: tt.fields.LockDB,
ConfigDB: tt.fields.ConfigDB,
}
if err := r.validate(); (err != nil) != tt.wantErr {
t.Errorf("Redis.validate() error = %v, wantErr %v", err, tt.wantErr)
}
})
}
}
2 changes: 1 addition & 1 deletion pkg/lock/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func MigrationCostsKey(fromStorage, toStorage string) key {
}
}

func New(redisClient *redis.Client) Service {
func New(redisClient redis.UniversalClient) Service {
return &svc{locker: redislock.New(redisClient)}
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/meta/version_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,12 +96,12 @@ type VersionService interface {
UpdateBucketTagsIfGreater(ctx context.Context, bucket string, storage string, version int64) error
}

func NewVersionService(client *redis.Client) VersionService {
func NewVersionService(client redis.UniversalClient) VersionService {
return &versionSvc{client: client}
}

type versionSvc struct {
client *redis.Client
client redis.UniversalClient
}

func (s *versionSvc) GetObj(ctx context.Context, obj dom.Object) (map[string]int64, error) {
Expand Down
4 changes: 2 additions & 2 deletions pkg/policy/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,12 +175,12 @@ type Service interface {
DeleteBucketReplicationsByUser(ctx context.Context, user, from string, to string) ([]string, error)
}

func NewService(client *redis.Client) Service {
func NewService(client redis.UniversalClient) Service {
return &policySvc{client: client}
}

type policySvc struct {
client *redis.Client
client redis.UniversalClient
}

func (s *policySvc) ObjListStarted(ctx context.Context, user, bucket, from, to string) error {
Expand Down
4 changes: 2 additions & 2 deletions pkg/ratelimit/semaphore.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func (l *Local) TryAcquireN(ctx context.Context, n int64) (func(), error) {
return release, nil
}

func GlobalSemaphore(rc *redis.Client, c SemaphoreConfig, name string) *Global {
func GlobalSemaphore(rc redis.UniversalClient, c SemaphoreConfig, name string) *Global {
return &Global{
c: c,
rc: rc,
Expand All @@ -110,7 +110,7 @@ var _ Semaphore = &Global{}

type Global struct {
c SemaphoreConfig
rc *redis.Client
rc redis.UniversalClient
name string
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/ratelimit/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ type RPM interface {
StorReqN(ctx context.Context, storage string, n int) error
}

func New(rc *redis.Client, conf map[string]s3.RateLimit) *Svc {
func New(rc redis.UniversalClient, conf map[string]s3.RateLimit) *Svc {
limiter := redis_rate.NewLimiter(rc)

return &Svc{
Expand Down
6 changes: 3 additions & 3 deletions pkg/rpc/agent_redis_request_reply.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@ const (
)

type AgentClient struct {
client *redis.Client
client redis.UniversalClient
}

func NewAgentClient(client *redis.Client) *AgentClient {
func NewAgentClient(client redis.UniversalClient) *AgentClient {
return &AgentClient{client: client}
}

Expand Down Expand Up @@ -95,7 +95,7 @@ func (a *AgentClient) Ping(ctx context.Context) ([]AgentInfo, error) {
}
}

func AgentServe(ctx context.Context, client *redis.Client, url, fromStorage string) error {
func AgentServe(ctx context.Context, client redis.UniversalClient, url, fromStorage string) error {
ps := client.Subscribe(ctx, agentPing)
defer ps.Close()
defer ps.Unsubscribe(context.Background(), agentPing)
Expand Down
6 changes: 3 additions & 3 deletions pkg/rpc/proxy_redis_request_reply.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,10 @@ func (f ProxyGetCredentials) GetCredentials(ctx context.Context) (*pb.GetProxyCr
var _ Proxy = &ProxyClient{}

type ProxyClient struct {
client *redis.Client
client redis.UniversalClient
}

func NewProxyClient(client *redis.Client) Proxy {
func NewProxyClient(client redis.UniversalClient) Proxy {
return &ProxyClient{client: client}
}

Expand Down Expand Up @@ -106,7 +106,7 @@ func (c *ProxyClient) GetCredentials(ctx context.Context) (*pb.GetProxyCredentia
}
}

func ProxyServe(ctx context.Context, client *redis.Client, proxy Proxy) error {
func ProxyServe(ctx context.Context, client redis.UniversalClient, proxy Proxy) error {
ps := client.Subscribe(ctx, proxyCreds)
defer ps.Close()
defer ps.Unsubscribe(context.Background(), proxyCreds)
Expand Down
4 changes: 2 additions & 2 deletions pkg/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,12 @@ type Service interface {
ExistsUploads(ctx context.Context, user, bucket string) (bool, error)
}

func New(client *redis.Client) Service {
func New(client redis.UniversalClient) Service {
return &svc{client: client}
}

type svc struct {
client *redis.Client
client redis.UniversalClient
}

func (s *svc) DelLastListedObj(ctx context.Context, task tasks.MigrateBucketListObjectsPayload) error {
Expand Down
68 changes: 68 additions & 0 deletions pkg/util/redis.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package util

import (
"crypto/tls"

"github.com/clyso/chorus/pkg/config"
"github.com/hibiken/asynq"
"github.com/redis/go-redis/v9"
)

func NewRedis(conf *config.Redis, db int) redis.UniversalClient {
if conf.Address != "" {
conf.Addresses = append(conf.Addresses, conf.Address)
}
opt := &redis.UniversalOptions{
Addrs: conf.Addresses,
DB: db,
Username: conf.User,
Password: conf.Password,
SentinelUsername: conf.Sentinel.User,
SentinelPassword: conf.Sentinel.Password,
MasterName: conf.Sentinel.MasterName,
}
if conf.TLS.Enabled {
opt.TLSConfig = &tls.Config{InsecureSkipVerify: conf.TLS.Insecure}
}
return redis.NewUniversalClient(opt)
}

func NewRedisAsynq(conf *config.Redis, db int) asynq.RedisConnOpt {
if conf.Address != "" {
conf.Addresses = append(conf.Addresses, conf.Address)
}
if len(conf.Addresses) == 1 {
// Standalone
opt := asynq.RedisClientOpt{Addr: conf.Addresses[0], Username: conf.User, Password: conf.Password, DB: db}
if conf.TLS.Enabled {
opt.TLSConfig = &tls.Config{InsecureSkipVerify: conf.TLS.Insecure}
}
return &opt
}

if conf.Sentinel.MasterName != "" {
// Sentinel
opt := asynq.RedisFailoverClientOpt{
MasterName: conf.Sentinel.MasterName,
SentinelAddrs: conf.Addresses,
SentinelPassword: conf.Sentinel.Password,
Username: conf.User,
Password: conf.Password,
DB: db,
}
if conf.TLS.Enabled {
opt.TLSConfig = &tls.Config{InsecureSkipVerify: conf.TLS.Insecure}
}
return &opt
}
// Cluster
opt := asynq.RedisClusterClientOpt{
Addrs: conf.Addresses,
Username: conf.User,
Password: conf.Password,
}
if conf.TLS.Enabled {
opt.TLSConfig = &tls.Config{InsecureSkipVerify: conf.TLS.Insecure}
}
return &opt
}
Loading
Loading