diff --git a/README.md b/README.md index 1633cbc..b1b0270 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,9 @@ +[![CI](https://github.com/clyso/chorus/actions/workflows/ci-go.yml/badge.svg)](https://github.com/clyso/chorus/actions/workflows/ci-go.yml) +![GitHub go.mod Go version](https://img.shields.io/github/go-mod/go-version/clyso/chorus) +![License](https://img.shields.io/github/license/clyso/chorus) +[![Go Report Card](https://goreportcard.com/badge/github.com/clyso/chorus)](https://goreportcard.com/report/github.com/clyso/chorus) +[![GoDoc](https://godoc.org/github.com/clyso/chorus?status.svg)](https://pkg.go.dev/github.com/clyso/chorus?tab=doc) + # Chorus ![chorus.png](./docs/media/banner.png) diff --git a/pkg/policy/service.go b/pkg/policy/service.go index 7bceac9..365eb6b 100644 --- a/pkg/policy/service.go +++ b/pkg/policy/service.go @@ -20,14 +20,15 @@ import ( "context" "errors" "fmt" + "strconv" + "strings" + "time" + "github.com/clyso/chorus/pkg/dom" "github.com/clyso/chorus/pkg/tasks" "github.com/redis/go-redis/v9" "github.com/rs/zerolog" "golang.org/x/sync/errgroup" - "strconv" - "strings" - "time" ) var ( @@ -184,16 +185,16 @@ type policySvc struct { func (s *policySvc) ObjListStarted(ctx context.Context, user, bucket, from, to string) error { if user == "" { - return fmt.Errorf("%w: user is requred to get replication policy status", dom.ErrInvalidArg) + return fmt.Errorf("%w: user is required to get replication policy status", dom.ErrInvalidArg) } if bucket == "" { - return fmt.Errorf("%w: bucket is requred to get replication policy status", dom.ErrInvalidArg) + return fmt.Errorf("%w: bucket is required to get replication policy status", dom.ErrInvalidArg) } if from == "" { - return fmt.Errorf("%w: from is requred to get replication policy status", dom.ErrInvalidArg) + return fmt.Errorf("%w: from is required to get replication policy status", dom.ErrInvalidArg) } if to == "" { - return fmt.Errorf("%w: to is requred to get replication policy status", dom.ErrInvalidArg) + return fmt.Errorf("%w: to is required to get replication policy status", dom.ErrInvalidArg) } key := fmt.Sprintf("p:repl_st:%s:%s:%s:%s", user, bucket, from, to) return s.hSetKeyExists(ctx, key, "listing_started", true) @@ -213,7 +214,7 @@ func (s *policySvc) GetRoutingPolicy(ctx context.Context, user, bucket string) ( func (s *policySvc) GetUserRoutingPolicy(ctx context.Context, user string) (string, error) { if user == "" { - return "", fmt.Errorf("%w: user is requred to get routing policy", dom.ErrInvalidArg) + return "", fmt.Errorf("%w: user is required to get routing policy", dom.ErrInvalidArg) } key := fmt.Sprintf("p:route:%s", user) toStor, err := s.client.Get(ctx, key).Result() @@ -228,10 +229,10 @@ func (s *policySvc) GetUserRoutingPolicy(ctx context.Context, user string) (stri func (s *policySvc) getBucketRoutingPolicy(ctx context.Context, user, bucket string) (string, error) { if user == "" { - return "", fmt.Errorf("%w: user is requred to get routing policy", dom.ErrInvalidArg) + return "", fmt.Errorf("%w: user is required to get routing policy", dom.ErrInvalidArg) } if bucket == "" { - return "", fmt.Errorf("%w: bucket is requred to get routing policy", dom.ErrInvalidArg) + return "", fmt.Errorf("%w: bucket is required to get routing policy", dom.ErrInvalidArg) } key := fmt.Sprintf("p:route:%s:%s", user, bucket) toStor, err := s.client.Get(ctx, key).Result() @@ -246,10 +247,10 @@ func (s *policySvc) getBucketRoutingPolicy(ctx context.Context, user, bucket str func (s *policySvc) AddUserRoutingPolicy(ctx context.Context, user, toStorage string) error { if user == "" { - return fmt.Errorf("%w: user is requred to add user routing policy", dom.ErrInvalidArg) + return fmt.Errorf("%w: user is required to add user routing policy", dom.ErrInvalidArg) } if toStorage == "" { - return fmt.Errorf("%w: toStorage is requred to add user routing policy", dom.ErrInvalidArg) + return fmt.Errorf("%w: toStorage is required to add user routing policy", dom.ErrInvalidArg) } key := fmt.Sprintf("p:route:%s", user) set, err := s.client.SetNX(ctx, key, toStorage, 0).Result() @@ -264,13 +265,13 @@ func (s *policySvc) AddUserRoutingPolicy(ctx context.Context, user, toStorage st func (s *policySvc) addBucketRoutingPolicy(ctx context.Context, user, bucket, toStorage string) error { if user == "" { - return fmt.Errorf("%w: user is requred to add bucket routing policy", dom.ErrInvalidArg) + return fmt.Errorf("%w: user is required to add bucket routing policy", dom.ErrInvalidArg) } if bucket == "" { - return fmt.Errorf("%w: bucket is requred to add bucket routing policy", dom.ErrInvalidArg) + return fmt.Errorf("%w: bucket is required to add bucket routing policy", dom.ErrInvalidArg) } if toStorage == "" { - return fmt.Errorf("%w: toStorage is requred to add bucket routing policy", dom.ErrInvalidArg) + return fmt.Errorf("%w: toStorage is required to add bucket routing policy", dom.ErrInvalidArg) } key := fmt.Sprintf("p:route:%s:%s", user, bucket) set, err := s.client.SetNX(ctx, key, toStorage, 0).Result() @@ -285,10 +286,10 @@ func (s *policySvc) addBucketRoutingPolicy(ctx context.Context, user, bucket, to func (s *policySvc) GetReplicationSwitch(ctx context.Context, user, bucket string) (ReplicationSwitch, error) { if user == "" { - return ReplicationSwitch{}, fmt.Errorf("%w: user is requred to get replication Switch", dom.ErrInvalidArg) + return ReplicationSwitch{}, fmt.Errorf("%w: user is required to get replication Switch", dom.ErrInvalidArg) } if bucket == "" { - return ReplicationSwitch{}, fmt.Errorf("%w: bucket is requred to get replication Switch", dom.ErrInvalidArg) + return ReplicationSwitch{}, fmt.Errorf("%w: bucket is required to get replication Switch", dom.ErrInvalidArg) } key := fmt.Sprintf("p:switch:%s:%s", user, bucket) exists, err := s.client.Exists(ctx, key).Result() @@ -305,10 +306,10 @@ func (s *policySvc) GetReplicationSwitch(ctx context.Context, user, bucket strin func (s *policySvc) IsReplicationSwitchInProgress(ctx context.Context, user, bucket string) (bool, error) { if user == "" { - return false, fmt.Errorf("%w: user is requred to get replication policy", dom.ErrInvalidArg) + return false, fmt.Errorf("%w: user is required to get replication policy", dom.ErrInvalidArg) } if bucket == "" { - return false, fmt.Errorf("%w: bucket is requred to get replication policy", dom.ErrInvalidArg) + return false, fmt.Errorf("%w: bucket is required to get replication policy", dom.ErrInvalidArg) } key := fmt.Sprintf("p:switch:%s:%s", user, bucket) isDoneStr, err := s.client.HGet(ctx, key, "IsDone").Result() @@ -324,10 +325,10 @@ func (s *policySvc) IsReplicationSwitchInProgress(ctx context.Context, user, buc func (s *policySvc) GetBucketReplicationPolicies(ctx context.Context, user, bucket string) (ReplicationPolicies, error) { if user == "" { - return ReplicationPolicies{}, fmt.Errorf("%w: user is requred to get replication policy", dom.ErrInvalidArg) + return ReplicationPolicies{}, fmt.Errorf("%w: user is required to get replication policy", dom.ErrInvalidArg) } if bucket == "" { - return ReplicationPolicies{}, fmt.Errorf("%w: bucket is requred to get replication policy", dom.ErrInvalidArg) + return ReplicationPolicies{}, fmt.Errorf("%w: bucket is required to get replication policy", dom.ErrInvalidArg) } key := fmt.Sprintf("p:repl:%s:%s", user, bucket) return s.getReplicationPolicies(ctx, key) @@ -379,16 +380,16 @@ func (s *policySvc) getReplicationPolicies(ctx context.Context, key string) (Rep func (s *policySvc) GetReplicationPolicyInfo(ctx context.Context, user, bucket, from, to string) (ReplicationPolicyStatus, error) { if user == "" { - return ReplicationPolicyStatus{}, fmt.Errorf("%w: user is requred to get replication policy status", dom.ErrInvalidArg) + return ReplicationPolicyStatus{}, fmt.Errorf("%w: user is required to get replication policy status", dom.ErrInvalidArg) } if bucket == "" { - return ReplicationPolicyStatus{}, fmt.Errorf("%w: bucket is requred to get replication policy status", dom.ErrInvalidArg) + return ReplicationPolicyStatus{}, fmt.Errorf("%w: bucket is required to get replication policy status", dom.ErrInvalidArg) } if from == "" { - return ReplicationPolicyStatus{}, fmt.Errorf("%w: from is requred to get replication policy status", dom.ErrInvalidArg) + return ReplicationPolicyStatus{}, fmt.Errorf("%w: from is required to get replication policy status", dom.ErrInvalidArg) } if to == "" { - return ReplicationPolicyStatus{}, fmt.Errorf("%w: to is requred to get replication policy status", dom.ErrInvalidArg) + return ReplicationPolicyStatus{}, fmt.Errorf("%w: to is required to get replication policy status", dom.ErrInvalidArg) } fKey := fmt.Sprintf("p:repl_st:%s:%s:%s:%s", user, bucket, from, to) @@ -434,7 +435,6 @@ func (s *policySvc) GetReplicationPolicyInfo(ctx context.Context, user, bucket, } func (s *policySvc) ListReplicationPolicyInfo(ctx context.Context) ([]ReplicationPolicyStatusExtended, error) { - iter := s.client.Scan(ctx, 0, "p:repl_st:*", 0).Iterator() resCh := make(chan ReplicationPolicyStatusExtended) @@ -500,16 +500,16 @@ func (s *policySvc) IsReplicationPolicyExists(ctx context.Context, user, bucket, func (s *policySvc) IsReplicationPolicyPaused(ctx context.Context, user, bucket, from, to string) (bool, error) { if user == "" { - return false, fmt.Errorf("%w: user is requred to get replication policy status", dom.ErrInvalidArg) + return false, fmt.Errorf("%w: user is required to get replication policy status", dom.ErrInvalidArg) } if bucket == "" { - return false, fmt.Errorf("%w: bucket is requred to get replication policy status", dom.ErrInvalidArg) + return false, fmt.Errorf("%w: bucket is required to get replication policy status", dom.ErrInvalidArg) } if from == "" { - return false, fmt.Errorf("%w: from is requred to get replication policy status", dom.ErrInvalidArg) + return false, fmt.Errorf("%w: from is required to get replication policy status", dom.ErrInvalidArg) } if to == "" { - return false, fmt.Errorf("%w: to is requred to get replication policy status", dom.ErrInvalidArg) + return false, fmt.Errorf("%w: to is required to get replication policy status", dom.ErrInvalidArg) } fKey := fmt.Sprintf("p:repl_st:%s:%s:%s:%s", user, bucket, from, to) @@ -525,16 +525,16 @@ func (s *policySvc) IsReplicationPolicyPaused(ctx context.Context, user, bucket, func (s *policySvc) IncReplInitObjListed(ctx context.Context, user, bucket, from, to string, bytes int64, eventTime time.Time) error { if user == "" { - return fmt.Errorf("%w: user is requred to get replication policy status", dom.ErrInvalidArg) + return fmt.Errorf("%w: user is required to get replication policy status", dom.ErrInvalidArg) } if bucket == "" { - return fmt.Errorf("%w: bucket is requred to get replication policy status", dom.ErrInvalidArg) + return fmt.Errorf("%w: bucket is required to get replication policy status", dom.ErrInvalidArg) } if from == "" { - return fmt.Errorf("%w: from is requred to get replication policy status", dom.ErrInvalidArg) + return fmt.Errorf("%w: from is required to get replication policy status", dom.ErrInvalidArg) } if to == "" { - return fmt.Errorf("%w: to is requred to get replication policy status", dom.ErrInvalidArg) + return fmt.Errorf("%w: to is required to get replication policy status", dom.ErrInvalidArg) } if bytes < 0 { return fmt.Errorf("%w: bytes must be positive", dom.ErrInvalidArg) @@ -557,16 +557,16 @@ func (s *policySvc) IncReplInitObjListed(ctx context.Context, user, bucket, from func (s *policySvc) IncReplInitObjDone(ctx context.Context, user, bucket, from, to string, bytes int64, eventTime time.Time) error { if user == "" { - return fmt.Errorf("%w: user is requred to get replication policy status", dom.ErrInvalidArg) + return fmt.Errorf("%w: user is required to get replication policy status", dom.ErrInvalidArg) } if bucket == "" { - return fmt.Errorf("%w: bucket is requred to get replication policy status", dom.ErrInvalidArg) + return fmt.Errorf("%w: bucket is required to get replication policy status", dom.ErrInvalidArg) } if from == "" { - return fmt.Errorf("%w: from is requred to get replication policy status", dom.ErrInvalidArg) + return fmt.Errorf("%w: from is required to get replication policy status", dom.ErrInvalidArg) } if to == "" { - return fmt.Errorf("%w: to is requred to get replication policy status", dom.ErrInvalidArg) + return fmt.Errorf("%w: to is required to get replication policy status", dom.ErrInvalidArg) } if bytes < 0 { return fmt.Errorf("%w: bytes must be positive", dom.ErrInvalidArg) @@ -621,16 +621,16 @@ func (s *policySvc) IncReplInitObjDone(ctx context.Context, user, bucket, from, func (s *policySvc) IncReplEvents(ctx context.Context, user, bucket, from, to string, eventTime time.Time) error { if user == "" { - return fmt.Errorf("%w: user is requred to inc replication policy status", dom.ErrInvalidArg) + return fmt.Errorf("%w: user is required to inc replication policy status", dom.ErrInvalidArg) } if bucket == "" { - return fmt.Errorf("%w: bucket is requred to inc replication policy status", dom.ErrInvalidArg) + return fmt.Errorf("%w: bucket is required to inc replication policy status", dom.ErrInvalidArg) } if from == "" { - return fmt.Errorf("%w: from is requred to inc replication policy status", dom.ErrInvalidArg) + return fmt.Errorf("%w: from is required to inc replication policy status", dom.ErrInvalidArg) } if to == "" { - return fmt.Errorf("%w: to is requred to inc replication policy status", dom.ErrInvalidArg) + return fmt.Errorf("%w: to is required to inc replication policy status", dom.ErrInvalidArg) } key := fmt.Sprintf("p:repl_st:%s:%s:%s:%s", user, bucket, from, to) err := s.incIfKeyExists(ctx, key, "events", 1) @@ -647,16 +647,16 @@ func (s *policySvc) IncReplEvents(ctx context.Context, user, bucket, from, to st func (s *policySvc) IncReplEventsDone(ctx context.Context, user, bucket, from, to string, eventTime time.Time) error { if user == "" { - return fmt.Errorf("%w: user is requred to inc replication policy status", dom.ErrInvalidArg) + return fmt.Errorf("%w: user is required to inc replication policy status", dom.ErrInvalidArg) } if bucket == "" { - return fmt.Errorf("%w: bucket is requred to inc replication policy status", dom.ErrInvalidArg) + return fmt.Errorf("%w: bucket is required to inc replication policy status", dom.ErrInvalidArg) } if from == "" { - return fmt.Errorf("%w: from is requred to inc replication policy status", dom.ErrInvalidArg) + return fmt.Errorf("%w: from is required to inc replication policy status", dom.ErrInvalidArg) } if to == "" { - return fmt.Errorf("%w: to is requred to inc replication policy status", dom.ErrInvalidArg) + return fmt.Errorf("%w: to is required to inc replication policy status", dom.ErrInvalidArg) } key := fmt.Sprintf("p:repl_st:%s:%s:%s:%s", user, bucket, from, to) err := s.incIfKeyExists(ctx, key, "events_done", 1) @@ -715,7 +715,7 @@ func (s *policySvc) hSetKeyExists(ctx context.Context, key, field string, val in func (s *policySvc) GetUserReplicationPolicies(ctx context.Context, user string) (ReplicationPolicies, error) { if user == "" { - return ReplicationPolicies{}, fmt.Errorf("%w: user is requred to get replication policy", dom.ErrInvalidArg) + return ReplicationPolicies{}, fmt.Errorf("%w: user is required to get replication policy", dom.ErrInvalidArg) } key := fmt.Sprintf("p:repl:%s", user) return s.getReplicationPolicies(ctx, key) @@ -723,13 +723,13 @@ func (s *policySvc) GetUserReplicationPolicies(ctx context.Context, user string) func (s *policySvc) AddUserReplicationPolicy(ctx context.Context, user string, from string, to string, priority tasks.Priority) error { if user == "" { - return fmt.Errorf("%w: user is requred to add replication policy", dom.ErrInvalidArg) + return fmt.Errorf("%w: user is required to add replication policy", dom.ErrInvalidArg) } if from == "" { - return fmt.Errorf("%w: from is requred to add replication policy", dom.ErrInvalidArg) + return fmt.Errorf("%w: from is required to add replication policy", dom.ErrInvalidArg) } if to == "" { - return fmt.Errorf("%w: to is requred to add replication policy", dom.ErrInvalidArg) + return fmt.Errorf("%w: to is required to add replication policy", dom.ErrInvalidArg) } if from == to { return fmt.Errorf("%w: invalid replication policy: from and to should be different", dom.ErrInvalidArg) @@ -817,16 +817,16 @@ func (s *policySvc) DeleteBucketReplicationsByUser(ctx context.Context, user, fr func (s *policySvc) AddBucketReplicationPolicy(ctx context.Context, user, bucket, from string, to string, priority tasks.Priority, agentURL *string) (err error) { if user == "" { - return fmt.Errorf("%w: user is requred to add replication policy", dom.ErrInvalidArg) + return fmt.Errorf("%w: user is required to add replication policy", dom.ErrInvalidArg) } if bucket == "" { - return fmt.Errorf("%w: bucket is requred to add replication policy", dom.ErrInvalidArg) + return fmt.Errorf("%w: bucket is required to add replication policy", dom.ErrInvalidArg) } if from == "" { - return fmt.Errorf("%w: from is requred to add replication policy", dom.ErrInvalidArg) + return fmt.Errorf("%w: from is required to add replication policy", dom.ErrInvalidArg) } if to == "" { - return fmt.Errorf("%w: to is requred to add replication policy", dom.ErrInvalidArg) + return fmt.Errorf("%w: to is required to add replication policy", dom.ErrInvalidArg) } if from == to { return fmt.Errorf("%w: invalid replication policy: from and to should be different", dom.ErrInvalidArg) @@ -941,7 +941,7 @@ func (s *policySvc) DoReplicationSwitch(ctx context.Context, user, bucket, newMa if _, ok := replPolicies.To[newMain]; !ok { return fmt.Errorf("%w: no previous replication policy to switch", dom.ErrInvalidArg) } - for prevFollower, _ := range replPolicies.To { + for prevFollower := range replPolicies.To { prevReplication, err := s.GetReplicationPolicyInfo(ctx, user, bucket, prevMain, prevFollower) if err != nil { return err @@ -960,7 +960,7 @@ func (s *policySvc) DoReplicationSwitch(ctx context.Context, user, bucket, newMa } } oldFollowers := replPolicies.To - const multipartTTL = time.Hour //todo: move to config or api param + const multipartTTL = time.Hour // todo: move to config or api param _, err = s.client.Pipelined(ctx, func(pipe redis.Pipeliner) error { // adjust route policy routeKey := fmt.Sprintf("p:route:%s:%s", user, bucket) @@ -1005,10 +1005,10 @@ func (s *policySvc) DoReplicationSwitch(ctx context.Context, user, bucket, newMa func (s *policySvc) ReplicationSwitchDone(ctx context.Context, user, bucket string) error { if user == "" { - return fmt.Errorf("%w: user is requred to get replication Switch", dom.ErrInvalidArg) + return fmt.Errorf("%w: user is required to get replication Switch", dom.ErrInvalidArg) } if bucket == "" { - return fmt.Errorf("%w: bucket is requred to get replication Switch", dom.ErrInvalidArg) + return fmt.Errorf("%w: bucket is required to get replication Switch", dom.ErrInvalidArg) } key := fmt.Sprintf("p:switch:%s:%s", user, bucket) err := s.hSetKeyExists(ctx, key, "IsDone", true) diff --git a/test/bucket_test.go b/test/bucket_test.go index 8c5d85f..67a450a 100644 --- a/test/bucket_test.go +++ b/test/bucket_test.go @@ -331,7 +331,7 @@ func TestApi_Bucket_List(t *testing.T) { r.Eventually(func() bool { objCh = mainClient.ListObjects(tstCtx, b2, mclient.ListObjectsOptions{WithMetadata: true}) objNum = 0 - for _ = range objCh { + for range objCh { objNum++ } if objNum != 3 { @@ -340,7 +340,7 @@ func TestApi_Bucket_List(t *testing.T) { objCh = f1Client.ListObjects(tstCtx, b2, mclient.ListObjectsOptions{WithMetadata: true}) objNum = 0 - for _ = range objCh { + for range objCh { objNum++ } if objNum != 3 { @@ -349,7 +349,7 @@ func TestApi_Bucket_List(t *testing.T) { objCh = f2Client.ListObjects(tstCtx, b2, mclient.ListObjectsOptions{WithMetadata: true}) objNum = 0 - for _ = range objCh { + for range objCh { objNum++ } if objNum != 3 { @@ -382,7 +382,7 @@ func TestApi_Bucket_List(t *testing.T) { r.Eventually(func() bool { objCh = mainClient.ListObjects(tstCtx, b2, mclient.ListObjectsOptions{WithMetadata: true}) objNum = 0 - for _ = range objCh { + for range objCh { objNum++ } if objNum != 0 { @@ -391,7 +391,7 @@ func TestApi_Bucket_List(t *testing.T) { objCh = f1Client.ListObjects(tstCtx, b2, mclient.ListObjectsOptions{WithMetadata: true}) objNum = 0 - for _ = range objCh { + for range objCh { objNum++ } if objNum != 0 { @@ -400,7 +400,7 @@ func TestApi_Bucket_List(t *testing.T) { objCh = f2Client.ListObjects(tstCtx, b2, mclient.ListObjectsOptions{WithMetadata: true}) objNum = 0 - for _ = range objCh { + for range objCh { objNum++ } if objNum != 0 {