Skip to content

Commit

Permalink
add custom dest bucket to chorctl
Browse files Browse the repository at this point in the history
Signed-off-by: Artem Torubarov <[email protected]>
  • Loading branch information
arttor committed Jan 13, 2025
1 parent 8bc7f34 commit 4c9f828
Show file tree
Hide file tree
Showing 7 changed files with 130 additions and 93 deletions.
24 changes: 12 additions & 12 deletions pkg/policy/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ type policySvc struct {
}

func (s *policySvc) ObjListStarted(ctx context.Context, user, bucket, from, to string, toBucket *string) error {
if toBucket != nil && *toBucket == bucket {
if toBucket != nil && (*toBucket == "" || *toBucket == bucket) {
// custom dest bucket makes sense only if different from src bucket
toBucket = nil
}
Expand Down Expand Up @@ -429,7 +429,7 @@ func (s *policySvc) getReplicationPolicies(ctx context.Context, key string) (Rep
}

func (s *policySvc) GetReplicationPolicyInfo(ctx context.Context, user, bucket, from, to string, toBucket *string) (ReplicationPolicyStatus, error) {
if toBucket != nil && *toBucket == bucket {
if toBucket != nil && (*toBucket == "" || *toBucket == bucket) {
// custom dest bucket makes sense only if different from src bucket
toBucket = nil
}
Expand Down Expand Up @@ -549,7 +549,7 @@ func (s *policySvc) ListReplicationPolicyInfo(ctx context.Context) ([]Replicatio
}

func (s *policySvc) IsReplicationPolicyExists(ctx context.Context, user, bucket, from, to string, toBucket *string) (bool, error) {
if toBucket != nil && *toBucket == bucket {
if toBucket != nil && (*toBucket == "" || *toBucket == bucket) {
// custom dest bucket makes sense only if different from src bucket
toBucket = nil
}
Expand All @@ -569,7 +569,7 @@ func (s *policySvc) IsReplicationPolicyExists(ctx context.Context, user, bucket,
}

func (s *policySvc) IsReplicationPolicyPaused(ctx context.Context, user, bucket, from, to string, toBucket *string) (bool, error) {
if toBucket != nil && *toBucket == bucket {
if toBucket != nil && (*toBucket == "" || *toBucket == bucket) {
// custom dest bucket makes sense only if different from src bucket
toBucket = nil
}
Expand Down Expand Up @@ -601,7 +601,7 @@ func (s *policySvc) IsReplicationPolicyPaused(ctx context.Context, user, bucket,
}

func (s *policySvc) IncReplInitObjListed(ctx context.Context, user, bucket, from, to string, toBucket *string, bytes int64, eventTime time.Time) error {
if toBucket != nil && *toBucket == bucket {
if toBucket != nil && (*toBucket == "" || *toBucket == bucket) {
// custom dest bucket makes sense only if different from src bucket
toBucket = nil
}
Expand Down Expand Up @@ -640,7 +640,7 @@ func (s *policySvc) IncReplInitObjListed(ctx context.Context, user, bucket, from
}

func (s *policySvc) IncReplInitObjDone(ctx context.Context, user, bucket, from, to string, toBucket *string, bytes int64, eventTime time.Time) error {
if toBucket != nil && *toBucket == bucket {
if toBucket != nil && (*toBucket == "" || *toBucket == bucket) {
// custom dest bucket makes sense only if different from src bucket
toBucket = nil
}
Expand Down Expand Up @@ -711,7 +711,7 @@ func (s *policySvc) IncReplInitObjDone(ctx context.Context, user, bucket, from,
}

func (s *policySvc) IncReplEvents(ctx context.Context, user, bucket, from, to string, toBucket *string, eventTime time.Time) error {
if toBucket != nil && *toBucket == bucket {
if toBucket != nil && (*toBucket == "" || *toBucket == bucket) {
// custom dest bucket makes sense only if different from src bucket
toBucket = nil
}
Expand Down Expand Up @@ -744,7 +744,7 @@ func (s *policySvc) IncReplEvents(ctx context.Context, user, bucket, from, to st
}

func (s *policySvc) IncReplEventsDone(ctx context.Context, user, bucket, from, to string, toBucket *string, eventTime time.Time) error {
if toBucket != nil && *toBucket == bucket {
if toBucket != nil && (*toBucket == "" || *toBucket == bucket) {
// custom dest bucket makes sense only if different from src bucket
toBucket = nil
}
Expand Down Expand Up @@ -926,7 +926,7 @@ func (s *policySvc) DeleteBucketReplicationsByUser(ctx context.Context, user, fr
}

func (s *policySvc) AddBucketReplicationPolicy(ctx context.Context, user, bucket, fromStor string, toStor string, toBucket *string, priority tasks.Priority, agentURL *string) (err error) {
if toBucket != nil && *toBucket == bucket {
if toBucket != nil && (*toBucket == "" || *toBucket == bucket) {
// custom dest bucket makes sense only if different from src bucket
toBucket = nil
}
Expand Down Expand Up @@ -1009,7 +1009,7 @@ func fromStrPtr(s *string) string {
}

func (s *policySvc) PauseReplication(ctx context.Context, user, bucket, from string, to string, toBucket *string) error {
if toBucket != nil && *toBucket == bucket {
if toBucket != nil && (*toBucket == "" || *toBucket == bucket) {
// custom dest bucket makes sense only if different from src bucket
toBucket = nil
}
Expand All @@ -1025,7 +1025,7 @@ func (s *policySvc) PauseReplication(ctx context.Context, user, bucket, from str
}

func (s *policySvc) ResumeReplication(ctx context.Context, user, bucket, from string, to string, toBucket *string) error {
if toBucket != nil && *toBucket == bucket {
if toBucket != nil && (*toBucket == "" || *toBucket == bucket) {
// custom dest bucket makes sense only if different from src bucket
toBucket = nil
}
Expand All @@ -1041,7 +1041,7 @@ func (s *policySvc) ResumeReplication(ctx context.Context, user, bucket, from st
}

func (s *policySvc) DeleteReplication(ctx context.Context, user, bucket, fromStor, toStor string, toBucket *string) error {
if toBucket != nil && *toBucket == bucket {
if toBucket != nil && (*toBucket == "" || *toBucket == bucket) {
// custom dest bucket makes sense only if different from src bucket
toBucket = nil
}
Expand Down
31 changes: 19 additions & 12 deletions tools/chorctl/cmd/repl_add.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,20 @@ var (
raTo string
raUser string
raAgentURL string
raBuckets []string
raBucket string
raToBucket string
)

// addCmd represents the add command
var addCmd = &cobra.Command{
Use: "add",
Short: "adds new bucket replication rule(-s)",
Short: "adds new bucket replication rule",
Long: `Example:
chorctl repl add -f main -t follower -u admin -b bucket1 -b bucket2
- will create 2 replication rules for 2 buckets`,
chorctl repl add -f main -t follower -u admin -b bucket1
- will replicate bucket "bucket1" from storage "main" to storage "follower"
chorctl repl add -f main -t follower -u admin -b src-bucket --to-buckt=dest-bucket
- will replicate bucket "src-bucket" from storage "main" to bucket "dest-bucket" in storage "follower"`,
Run: func(cmd *cobra.Command, args []string) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand All @@ -49,18 +53,20 @@ chorctl repl add -f main -t follower -u admin -b bucket1 -b bucket2
defer conn.Close()
client := pb.NewChorusClient(conn)

req := &pb.AddReplicationRequest{
User: raUser,
From: raFrom,
To: raTo,
Buckets: raBuckets,
IsForAllBuckets: false,
req := &pb.AddBucketReplicationRequest{
User: raUser,
FromStorage: raFrom,
ToStorage: raTo,
FromBucket: raBucket,
}
if raAgentURL != "" {
req.AgentUrl = &raAgentURL
}
if raToBucket != "" {
req.ToBucket = &raToBucket
}

_, err = client.AddReplication(ctx, req)
_, err = client.AddBucketReplication(ctx, req)
if err != nil {
logrus.WithError(err).WithField("address", address).Fatal("unable to add replication")
}
Expand All @@ -73,7 +79,8 @@ func init() {
addCmd.Flags().StringVarP(&raTo, "to", "t", "", "to storage")
addCmd.Flags().StringVarP(&raUser, "user", "u", "", "storage user")
addCmd.Flags().StringVar(&raAgentURL, "agent-url", "", "notifications agent url")
addCmd.Flags().StringArrayVarP(&raBuckets, "bucket", "b", nil, "bucket: multiple values supported: -b bucket1 -b bucket2")
addCmd.Flags().StringVarP(&raBucket, "bucket", "b", "", "bucket name to replicate")
addCmd.Flags().StringVar(&raToBucket, "to-bucket", "", "custom destinatin bucket name. Set if destination bucket should have different name from source bucket")
err := addCmd.MarkFlagRequired("from")
if err != nil {
logrus.WithError(err).Fatal()
Expand Down
18 changes: 12 additions & 6 deletions tools/chorctl/cmd/repl_delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,11 @@ import (
)

var (
rdFrom string
rdTo string
rdUser string
rdBucket string
rdFrom string
rdTo string
rdUser string
rdBucket string
rdToBucket string
)

// deleteCmd represents the delete command
Expand All @@ -48,12 +49,16 @@ chorctl repl delete -f main -t follower -u admin -b bucket1`,
defer conn.Close()
client := pb.NewChorusClient(conn)

_, err = client.DeleteReplication(ctx, &pb.ReplicationRequest{
req := &pb.ReplicationRequest{
User: rdUser,
Bucket: rdBucket,
From: rdFrom,
To: rdTo,
})
}
if rdToBucket != "" {
req.ToBucket = &rdToBucket
}
_, err = client.DeleteReplication(ctx, req)
if err != nil {
logrus.WithError(err).Fatal("unable to add replication")
}
Expand All @@ -66,6 +71,7 @@ func init() {
deleteCmd.Flags().StringVarP(&rdTo, "to", "t", "", "to storage")
deleteCmd.Flags().StringVarP(&rdUser, "user", "u", "", "storage user")
deleteCmd.Flags().StringVarP(&rdBucket, "bucket", "b", "", "bucket name")
deleteCmd.Flags().StringVar(&rdToBucket, "to-bucket", "", "custom destinatin bucket name. Set if destination bucket should have different name from source bucket")
err := deleteCmd.MarkFlagRequired("from")
if err != nil {
logrus.WithError(err).Fatal()
Expand Down
26 changes: 16 additions & 10 deletions tools/chorctl/cmd/repl_pause.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,11 @@ import (
)

var (
rpFrom string
rpTo string
rpUser string
rpBucket string
rpFrom string
rpTo string
rpUser string
rpBucket string
rpToBucket string
)

// pauseCmd represents the pause command
Expand All @@ -48,12 +49,16 @@ chorctl repl pause -f main -t follower -u admin -b bucket1`,
defer conn.Close()
client := pb.NewChorusClient(conn)

_, err = client.PauseReplication(ctx, &pb.ReplicationRequest{
User: rpUser,
Bucket: rpBucket,
From: rpFrom,
To: rpTo,
})
req := &pb.ReplicationRequest{
User: rdUser,
Bucket: rdBucket,
From: rdFrom,
To: rdTo,
}
if rpToBucket != "" {
req.ToBucket = &rpToBucket
}
_, err = client.PauseReplication(ctx, req)
if err != nil {
logrus.WithError(err).Fatal("unable to add replication")
}
Expand All @@ -66,6 +71,7 @@ func init() {
pauseCmd.Flags().StringVarP(&rpTo, "to", "t", "", "to storage")
pauseCmd.Flags().StringVarP(&rpUser, "user", "u", "", "storage user")
pauseCmd.Flags().StringVarP(&rpBucket, "bucket", "b", "", "bucket name")
pauseCmd.Flags().StringVar(&rpToBucket, "to-bucket", "", "custom destinatin bucket name. Set if destination bucket should have different name from source bucket")
err := pauseCmd.MarkFlagRequired("from")
if err != nil {
logrus.WithError(err).Fatal()
Expand Down
26 changes: 16 additions & 10 deletions tools/chorctl/cmd/repl_resume.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,11 @@ import (
)

var (
rrFrom string
rrTo string
rrUser string
rrBucket string
rrFrom string
rrTo string
rrUser string
rrBucket string
rrToBucket string
)

// resumeCmd represents the pause command
Expand All @@ -48,12 +49,16 @@ chorctl repl resume -f main -t follower -u admin -b bucket1`,
defer conn.Close()
client := pb.NewChorusClient(conn)

_, err = client.ResumeReplication(ctx, &pb.ReplicationRequest{
User: rrUser,
Bucket: rrBucket,
From: rrFrom,
To: rrTo,
})
req := &pb.ReplicationRequest{
User: rdUser,
Bucket: rdBucket,
From: rdFrom,
To: rdTo,
}
if rrToBucket != "" {
req.ToBucket = &rrToBucket
}
_, err = client.ResumeReplication(ctx, req)
if err != nil {
logrus.WithError(err).Fatal("unable to add replication")
}
Expand All @@ -66,6 +71,7 @@ func init() {
resumeCmd.Flags().StringVarP(&rrTo, "to", "t", "", "to storage")
resumeCmd.Flags().StringVarP(&rrUser, "user", "u", "", "storage user")
resumeCmd.Flags().StringVarP(&rrBucket, "bucket", "b", "", "bucket name")
resumeCmd.Flags().StringVar(&rrToBucket, "to-bucket", "", "custom destinatin bucket name. Set if destination bucket should have different name from source bucket")
err := resumeCmd.MarkFlagRequired("from")
if err != nil {
logrus.WithError(err).Fatal()
Expand Down
25 changes: 13 additions & 12 deletions tools/chorctl/go.mod
Original file line number Diff line number Diff line change
@@ -1,24 +1,25 @@
module github.com/clyso/chorus/tools/chorctl

go 1.21
go 1.22

toolchain go1.23.1

require (
github.com/charmbracelet/bubbles v0.16.1
github.com/charmbracelet/bubbletea v0.24.2
github.com/charmbracelet/lipgloss v0.7.1
github.com/clyso/chorus v0.0.0-20230613144403-9f72f3d28ede
github.com/sirupsen/logrus v1.9.3
github.com/spf13/cobra v1.7.0
github.com/spf13/viper v1.16.0
github.com/clyso/chorus v0.0.0-20230613144403-9f72f3d28ede
google.golang.org/grpc v1.58.2
google.golang.org/protobuf v1.31.0
google.golang.org/grpc v1.69.2
google.golang.org/protobuf v1.36.2
)

require (
github.com/aymanbagabas/go-osc52/v2 v2.0.1 // indirect
github.com/containerd/console v1.0.4-0.20230313162750-1ae8d489ac81 // indirect
github.com/fsnotify/fsnotify v1.6.0 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.18.0 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
Expand All @@ -39,13 +40,13 @@ require (
github.com/spf13/jwalterweatherman v1.1.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/subosito/gotenv v1.6.0 // indirect
golang.org/x/net v0.15.0 // indirect
golang.org/x/sync v0.3.0 // indirect
golang.org/x/sys v0.12.0 // indirect
golang.org/x/term v0.12.0 // indirect
golang.org/x/text v0.13.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20230920204549-e6e6cdab5c13 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20230920204549-e6e6cdab5c13 // indirect
golang.org/x/net v0.30.0 // indirect
golang.org/x/sync v0.8.0 // indirect
golang.org/x/sys v0.26.0 // indirect
golang.org/x/term v0.25.0 // indirect
golang.org/x/text v0.19.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20241015192408-796eee8c2d53 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20241015192408-796eee8c2d53 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
Expand Down
Loading

0 comments on commit 4c9f828

Please sign in to comment.