Skip to content

Commit

Permalink
add go report
Browse files Browse the repository at this point in the history
  • Loading branch information
arttor committed May 17, 2024
1 parent e48efc3 commit 5de068f
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 64 deletions.
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
[![CI](https://github.com/clyso/chorus/actions/workflows/ci-go.yml/badge.svg)](https://github.com/clyso/chorus/actions/workflows/ci-go.yml)
![GitHub go.mod Go version](https://img.shields.io/github/go-mod/go-version/clyso/chorus)
![License](https://img.shields.io/github/license/clyso/chorus)
[![Go Report Card](https://goreportcard.com/badge/github.com/clyso/chorus)](https://goreportcard.com/report/github.com/clyso/chorus)
[![GoDoc](https://godoc.org/github.com/clyso/chorus?status.svg)](https://pkg.go.dev/github.com/clyso/chorus?tab=doc)

# Chorus
![chorus.png](./docs/media/banner.png)

Expand Down
116 changes: 58 additions & 58 deletions pkg/policy/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,15 @@ import (
"context"
"errors"
"fmt"
"strconv"
"strings"
"time"

"github.com/clyso/chorus/pkg/dom"
"github.com/clyso/chorus/pkg/tasks"
"github.com/redis/go-redis/v9"
"github.com/rs/zerolog"
"golang.org/x/sync/errgroup"
"strconv"
"strings"
"time"
)

var (
Expand Down Expand Up @@ -184,16 +185,16 @@ type policySvc struct {

func (s *policySvc) ObjListStarted(ctx context.Context, user, bucket, from, to string) error {
if user == "" {
return fmt.Errorf("%w: user is requred to get replication policy status", dom.ErrInvalidArg)
return fmt.Errorf("%w: user is required to get replication policy status", dom.ErrInvalidArg)
}
if bucket == "" {
return fmt.Errorf("%w: bucket is requred to get replication policy status", dom.ErrInvalidArg)
return fmt.Errorf("%w: bucket is required to get replication policy status", dom.ErrInvalidArg)
}
if from == "" {
return fmt.Errorf("%w: from is requred to get replication policy status", dom.ErrInvalidArg)
return fmt.Errorf("%w: from is required to get replication policy status", dom.ErrInvalidArg)
}
if to == "" {
return fmt.Errorf("%w: to is requred to get replication policy status", dom.ErrInvalidArg)
return fmt.Errorf("%w: to is required to get replication policy status", dom.ErrInvalidArg)
}
key := fmt.Sprintf("p:repl_st:%s:%s:%s:%s", user, bucket, from, to)
return s.hSetKeyExists(ctx, key, "listing_started", true)
Expand All @@ -213,7 +214,7 @@ func (s *policySvc) GetRoutingPolicy(ctx context.Context, user, bucket string) (

func (s *policySvc) GetUserRoutingPolicy(ctx context.Context, user string) (string, error) {
if user == "" {
return "", fmt.Errorf("%w: user is requred to get routing policy", dom.ErrInvalidArg)
return "", fmt.Errorf("%w: user is required to get routing policy", dom.ErrInvalidArg)
}
key := fmt.Sprintf("p:route:%s", user)
toStor, err := s.client.Get(ctx, key).Result()
Expand All @@ -228,10 +229,10 @@ func (s *policySvc) GetUserRoutingPolicy(ctx context.Context, user string) (stri

func (s *policySvc) getBucketRoutingPolicy(ctx context.Context, user, bucket string) (string, error) {
if user == "" {
return "", fmt.Errorf("%w: user is requred to get routing policy", dom.ErrInvalidArg)
return "", fmt.Errorf("%w: user is required to get routing policy", dom.ErrInvalidArg)
}
if bucket == "" {
return "", fmt.Errorf("%w: bucket is requred to get routing policy", dom.ErrInvalidArg)
return "", fmt.Errorf("%w: bucket is required to get routing policy", dom.ErrInvalidArg)
}
key := fmt.Sprintf("p:route:%s:%s", user, bucket)
toStor, err := s.client.Get(ctx, key).Result()
Expand All @@ -246,10 +247,10 @@ func (s *policySvc) getBucketRoutingPolicy(ctx context.Context, user, bucket str

func (s *policySvc) AddUserRoutingPolicy(ctx context.Context, user, toStorage string) error {
if user == "" {
return fmt.Errorf("%w: user is requred to add user routing policy", dom.ErrInvalidArg)
return fmt.Errorf("%w: user is required to add user routing policy", dom.ErrInvalidArg)
}
if toStorage == "" {
return fmt.Errorf("%w: toStorage is requred to add user routing policy", dom.ErrInvalidArg)
return fmt.Errorf("%w: toStorage is required to add user routing policy", dom.ErrInvalidArg)
}
key := fmt.Sprintf("p:route:%s", user)
set, err := s.client.SetNX(ctx, key, toStorage, 0).Result()
Expand All @@ -264,13 +265,13 @@ func (s *policySvc) AddUserRoutingPolicy(ctx context.Context, user, toStorage st

func (s *policySvc) addBucketRoutingPolicy(ctx context.Context, user, bucket, toStorage string) error {
if user == "" {
return fmt.Errorf("%w: user is requred to add bucket routing policy", dom.ErrInvalidArg)
return fmt.Errorf("%w: user is required to add bucket routing policy", dom.ErrInvalidArg)
}
if bucket == "" {
return fmt.Errorf("%w: bucket is requred to add bucket routing policy", dom.ErrInvalidArg)
return fmt.Errorf("%w: bucket is required to add bucket routing policy", dom.ErrInvalidArg)
}
if toStorage == "" {
return fmt.Errorf("%w: toStorage is requred to add bucket routing policy", dom.ErrInvalidArg)
return fmt.Errorf("%w: toStorage is required to add bucket routing policy", dom.ErrInvalidArg)
}
key := fmt.Sprintf("p:route:%s:%s", user, bucket)
set, err := s.client.SetNX(ctx, key, toStorage, 0).Result()
Expand All @@ -285,10 +286,10 @@ func (s *policySvc) addBucketRoutingPolicy(ctx context.Context, user, bucket, to

func (s *policySvc) GetReplicationSwitch(ctx context.Context, user, bucket string) (ReplicationSwitch, error) {
if user == "" {
return ReplicationSwitch{}, fmt.Errorf("%w: user is requred to get replication Switch", dom.ErrInvalidArg)
return ReplicationSwitch{}, fmt.Errorf("%w: user is required to get replication Switch", dom.ErrInvalidArg)
}
if bucket == "" {
return ReplicationSwitch{}, fmt.Errorf("%w: bucket is requred to get replication Switch", dom.ErrInvalidArg)
return ReplicationSwitch{}, fmt.Errorf("%w: bucket is required to get replication Switch", dom.ErrInvalidArg)
}
key := fmt.Sprintf("p:switch:%s:%s", user, bucket)
exists, err := s.client.Exists(ctx, key).Result()
Expand All @@ -305,10 +306,10 @@ func (s *policySvc) GetReplicationSwitch(ctx context.Context, user, bucket strin

func (s *policySvc) IsReplicationSwitchInProgress(ctx context.Context, user, bucket string) (bool, error) {
if user == "" {
return false, fmt.Errorf("%w: user is requred to get replication policy", dom.ErrInvalidArg)
return false, fmt.Errorf("%w: user is required to get replication policy", dom.ErrInvalidArg)
}
if bucket == "" {
return false, fmt.Errorf("%w: bucket is requred to get replication policy", dom.ErrInvalidArg)
return false, fmt.Errorf("%w: bucket is required to get replication policy", dom.ErrInvalidArg)
}
key := fmt.Sprintf("p:switch:%s:%s", user, bucket)
isDoneStr, err := s.client.HGet(ctx, key, "IsDone").Result()
Expand All @@ -324,10 +325,10 @@ func (s *policySvc) IsReplicationSwitchInProgress(ctx context.Context, user, buc

func (s *policySvc) GetBucketReplicationPolicies(ctx context.Context, user, bucket string) (ReplicationPolicies, error) {
if user == "" {
return ReplicationPolicies{}, fmt.Errorf("%w: user is requred to get replication policy", dom.ErrInvalidArg)
return ReplicationPolicies{}, fmt.Errorf("%w: user is required to get replication policy", dom.ErrInvalidArg)
}
if bucket == "" {
return ReplicationPolicies{}, fmt.Errorf("%w: bucket is requred to get replication policy", dom.ErrInvalidArg)
return ReplicationPolicies{}, fmt.Errorf("%w: bucket is required to get replication policy", dom.ErrInvalidArg)
}
key := fmt.Sprintf("p:repl:%s:%s", user, bucket)
return s.getReplicationPolicies(ctx, key)
Expand Down Expand Up @@ -379,16 +380,16 @@ func (s *policySvc) getReplicationPolicies(ctx context.Context, key string) (Rep

func (s *policySvc) GetReplicationPolicyInfo(ctx context.Context, user, bucket, from, to string) (ReplicationPolicyStatus, error) {
if user == "" {
return ReplicationPolicyStatus{}, fmt.Errorf("%w: user is requred to get replication policy status", dom.ErrInvalidArg)
return ReplicationPolicyStatus{}, fmt.Errorf("%w: user is required to get replication policy status", dom.ErrInvalidArg)
}
if bucket == "" {
return ReplicationPolicyStatus{}, fmt.Errorf("%w: bucket is requred to get replication policy status", dom.ErrInvalidArg)
return ReplicationPolicyStatus{}, fmt.Errorf("%w: bucket is required to get replication policy status", dom.ErrInvalidArg)
}
if from == "" {
return ReplicationPolicyStatus{}, fmt.Errorf("%w: from is requred to get replication policy status", dom.ErrInvalidArg)
return ReplicationPolicyStatus{}, fmt.Errorf("%w: from is required to get replication policy status", dom.ErrInvalidArg)
}
if to == "" {
return ReplicationPolicyStatus{}, fmt.Errorf("%w: to is requred to get replication policy status", dom.ErrInvalidArg)
return ReplicationPolicyStatus{}, fmt.Errorf("%w: to is required to get replication policy status", dom.ErrInvalidArg)
}

fKey := fmt.Sprintf("p:repl_st:%s:%s:%s:%s", user, bucket, from, to)
Expand Down Expand Up @@ -434,7 +435,6 @@ func (s *policySvc) GetReplicationPolicyInfo(ctx context.Context, user, bucket,
}

func (s *policySvc) ListReplicationPolicyInfo(ctx context.Context) ([]ReplicationPolicyStatusExtended, error) {

iter := s.client.Scan(ctx, 0, "p:repl_st:*", 0).Iterator()

resCh := make(chan ReplicationPolicyStatusExtended)
Expand Down Expand Up @@ -500,16 +500,16 @@ func (s *policySvc) IsReplicationPolicyExists(ctx context.Context, user, bucket,

func (s *policySvc) IsReplicationPolicyPaused(ctx context.Context, user, bucket, from, to string) (bool, error) {
if user == "" {
return false, fmt.Errorf("%w: user is requred to get replication policy status", dom.ErrInvalidArg)
return false, fmt.Errorf("%w: user is required to get replication policy status", dom.ErrInvalidArg)
}
if bucket == "" {
return false, fmt.Errorf("%w: bucket is requred to get replication policy status", dom.ErrInvalidArg)
return false, fmt.Errorf("%w: bucket is required to get replication policy status", dom.ErrInvalidArg)
}
if from == "" {
return false, fmt.Errorf("%w: from is requred to get replication policy status", dom.ErrInvalidArg)
return false, fmt.Errorf("%w: from is required to get replication policy status", dom.ErrInvalidArg)
}
if to == "" {
return false, fmt.Errorf("%w: to is requred to get replication policy status", dom.ErrInvalidArg)
return false, fmt.Errorf("%w: to is required to get replication policy status", dom.ErrInvalidArg)
}

fKey := fmt.Sprintf("p:repl_st:%s:%s:%s:%s", user, bucket, from, to)
Expand All @@ -525,16 +525,16 @@ func (s *policySvc) IsReplicationPolicyPaused(ctx context.Context, user, bucket,

func (s *policySvc) IncReplInitObjListed(ctx context.Context, user, bucket, from, to string, bytes int64, eventTime time.Time) error {
if user == "" {
return fmt.Errorf("%w: user is requred to get replication policy status", dom.ErrInvalidArg)
return fmt.Errorf("%w: user is required to get replication policy status", dom.ErrInvalidArg)
}
if bucket == "" {
return fmt.Errorf("%w: bucket is requred to get replication policy status", dom.ErrInvalidArg)
return fmt.Errorf("%w: bucket is required to get replication policy status", dom.ErrInvalidArg)
}
if from == "" {
return fmt.Errorf("%w: from is requred to get replication policy status", dom.ErrInvalidArg)
return fmt.Errorf("%w: from is required to get replication policy status", dom.ErrInvalidArg)
}
if to == "" {
return fmt.Errorf("%w: to is requred to get replication policy status", dom.ErrInvalidArg)
return fmt.Errorf("%w: to is required to get replication policy status", dom.ErrInvalidArg)
}
if bytes < 0 {
return fmt.Errorf("%w: bytes must be positive", dom.ErrInvalidArg)
Expand All @@ -557,16 +557,16 @@ func (s *policySvc) IncReplInitObjListed(ctx context.Context, user, bucket, from

func (s *policySvc) IncReplInitObjDone(ctx context.Context, user, bucket, from, to string, bytes int64, eventTime time.Time) error {
if user == "" {
return fmt.Errorf("%w: user is requred to get replication policy status", dom.ErrInvalidArg)
return fmt.Errorf("%w: user is required to get replication policy status", dom.ErrInvalidArg)
}
if bucket == "" {
return fmt.Errorf("%w: bucket is requred to get replication policy status", dom.ErrInvalidArg)
return fmt.Errorf("%w: bucket is required to get replication policy status", dom.ErrInvalidArg)
}
if from == "" {
return fmt.Errorf("%w: from is requred to get replication policy status", dom.ErrInvalidArg)
return fmt.Errorf("%w: from is required to get replication policy status", dom.ErrInvalidArg)
}
if to == "" {
return fmt.Errorf("%w: to is requred to get replication policy status", dom.ErrInvalidArg)
return fmt.Errorf("%w: to is required to get replication policy status", dom.ErrInvalidArg)
}
if bytes < 0 {
return fmt.Errorf("%w: bytes must be positive", dom.ErrInvalidArg)
Expand Down Expand Up @@ -621,16 +621,16 @@ func (s *policySvc) IncReplInitObjDone(ctx context.Context, user, bucket, from,

func (s *policySvc) IncReplEvents(ctx context.Context, user, bucket, from, to string, eventTime time.Time) error {
if user == "" {
return fmt.Errorf("%w: user is requred to inc replication policy status", dom.ErrInvalidArg)
return fmt.Errorf("%w: user is required to inc replication policy status", dom.ErrInvalidArg)
}
if bucket == "" {
return fmt.Errorf("%w: bucket is requred to inc replication policy status", dom.ErrInvalidArg)
return fmt.Errorf("%w: bucket is required to inc replication policy status", dom.ErrInvalidArg)
}
if from == "" {
return fmt.Errorf("%w: from is requred to inc replication policy status", dom.ErrInvalidArg)
return fmt.Errorf("%w: from is required to inc replication policy status", dom.ErrInvalidArg)
}
if to == "" {
return fmt.Errorf("%w: to is requred to inc replication policy status", dom.ErrInvalidArg)
return fmt.Errorf("%w: to is required to inc replication policy status", dom.ErrInvalidArg)
}
key := fmt.Sprintf("p:repl_st:%s:%s:%s:%s", user, bucket, from, to)
err := s.incIfKeyExists(ctx, key, "events", 1)
Expand All @@ -647,16 +647,16 @@ func (s *policySvc) IncReplEvents(ctx context.Context, user, bucket, from, to st

func (s *policySvc) IncReplEventsDone(ctx context.Context, user, bucket, from, to string, eventTime time.Time) error {
if user == "" {
return fmt.Errorf("%w: user is requred to inc replication policy status", dom.ErrInvalidArg)
return fmt.Errorf("%w: user is required to inc replication policy status", dom.ErrInvalidArg)
}
if bucket == "" {
return fmt.Errorf("%w: bucket is requred to inc replication policy status", dom.ErrInvalidArg)
return fmt.Errorf("%w: bucket is required to inc replication policy status", dom.ErrInvalidArg)
}
if from == "" {
return fmt.Errorf("%w: from is requred to inc replication policy status", dom.ErrInvalidArg)
return fmt.Errorf("%w: from is required to inc replication policy status", dom.ErrInvalidArg)
}
if to == "" {
return fmt.Errorf("%w: to is requred to inc replication policy status", dom.ErrInvalidArg)
return fmt.Errorf("%w: to is required to inc replication policy status", dom.ErrInvalidArg)
}
key := fmt.Sprintf("p:repl_st:%s:%s:%s:%s", user, bucket, from, to)
err := s.incIfKeyExists(ctx, key, "events_done", 1)
Expand Down Expand Up @@ -715,21 +715,21 @@ func (s *policySvc) hSetKeyExists(ctx context.Context, key, field string, val in

func (s *policySvc) GetUserReplicationPolicies(ctx context.Context, user string) (ReplicationPolicies, error) {
if user == "" {
return ReplicationPolicies{}, fmt.Errorf("%w: user is requred to get replication policy", dom.ErrInvalidArg)
return ReplicationPolicies{}, fmt.Errorf("%w: user is required to get replication policy", dom.ErrInvalidArg)
}
key := fmt.Sprintf("p:repl:%s", user)
return s.getReplicationPolicies(ctx, key)
}

func (s *policySvc) AddUserReplicationPolicy(ctx context.Context, user string, from string, to string, priority tasks.Priority) error {
if user == "" {
return fmt.Errorf("%w: user is requred to add replication policy", dom.ErrInvalidArg)
return fmt.Errorf("%w: user is required to add replication policy", dom.ErrInvalidArg)
}
if from == "" {
return fmt.Errorf("%w: from is requred to add replication policy", dom.ErrInvalidArg)
return fmt.Errorf("%w: from is required to add replication policy", dom.ErrInvalidArg)
}
if to == "" {
return fmt.Errorf("%w: to is requred to add replication policy", dom.ErrInvalidArg)
return fmt.Errorf("%w: to is required to add replication policy", dom.ErrInvalidArg)
}
if from == to {
return fmt.Errorf("%w: invalid replication policy: from and to should be different", dom.ErrInvalidArg)
Expand Down Expand Up @@ -817,16 +817,16 @@ func (s *policySvc) DeleteBucketReplicationsByUser(ctx context.Context, user, fr

func (s *policySvc) AddBucketReplicationPolicy(ctx context.Context, user, bucket, from string, to string, priority tasks.Priority, agentURL *string) (err error) {
if user == "" {
return fmt.Errorf("%w: user is requred to add replication policy", dom.ErrInvalidArg)
return fmt.Errorf("%w: user is required to add replication policy", dom.ErrInvalidArg)
}
if bucket == "" {
return fmt.Errorf("%w: bucket is requred to add replication policy", dom.ErrInvalidArg)
return fmt.Errorf("%w: bucket is required to add replication policy", dom.ErrInvalidArg)
}
if from == "" {
return fmt.Errorf("%w: from is requred to add replication policy", dom.ErrInvalidArg)
return fmt.Errorf("%w: from is required to add replication policy", dom.ErrInvalidArg)
}
if to == "" {
return fmt.Errorf("%w: to is requred to add replication policy", dom.ErrInvalidArg)
return fmt.Errorf("%w: to is required to add replication policy", dom.ErrInvalidArg)
}
if from == to {
return fmt.Errorf("%w: invalid replication policy: from and to should be different", dom.ErrInvalidArg)
Expand Down Expand Up @@ -941,7 +941,7 @@ func (s *policySvc) DoReplicationSwitch(ctx context.Context, user, bucket, newMa
if _, ok := replPolicies.To[newMain]; !ok {
return fmt.Errorf("%w: no previous replication policy to switch", dom.ErrInvalidArg)
}
for prevFollower, _ := range replPolicies.To {
for prevFollower := range replPolicies.To {
prevReplication, err := s.GetReplicationPolicyInfo(ctx, user, bucket, prevMain, prevFollower)
if err != nil {
return err
Expand All @@ -960,7 +960,7 @@ func (s *policySvc) DoReplicationSwitch(ctx context.Context, user, bucket, newMa
}
}
oldFollowers := replPolicies.To
const multipartTTL = time.Hour //todo: move to config or api param
const multipartTTL = time.Hour // todo: move to config or api param
_, err = s.client.Pipelined(ctx, func(pipe redis.Pipeliner) error {
// adjust route policy
routeKey := fmt.Sprintf("p:route:%s:%s", user, bucket)
Expand Down Expand Up @@ -1005,10 +1005,10 @@ func (s *policySvc) DoReplicationSwitch(ctx context.Context, user, bucket, newMa

func (s *policySvc) ReplicationSwitchDone(ctx context.Context, user, bucket string) error {
if user == "" {
return fmt.Errorf("%w: user is requred to get replication Switch", dom.ErrInvalidArg)
return fmt.Errorf("%w: user is required to get replication Switch", dom.ErrInvalidArg)
}
if bucket == "" {
return fmt.Errorf("%w: bucket is requred to get replication Switch", dom.ErrInvalidArg)
return fmt.Errorf("%w: bucket is required to get replication Switch", dom.ErrInvalidArg)
}
key := fmt.Sprintf("p:switch:%s:%s", user, bucket)
err := s.hSetKeyExists(ctx, key, "IsDone", true)
Expand Down
Loading

0 comments on commit 5de068f

Please sign in to comment.