diff --git a/v2/cmd/gazctl/journals_apply.go b/v2/cmd/gazctl/journals_apply.go index 643b919f..b0433c3a 100644 --- a/v2/cmd/gazctl/journals_apply.go +++ b/v2/cmd/gazctl/journals_apply.go @@ -33,10 +33,10 @@ func (cmd *cmdJournalsApply) Execute([]string) error { } var ctx = context.Background() - resp, err := client.ApplyJournals(ctx, journalsCfg.Broker.JournalClient(ctx), req) + var resp, err = client.ApplyJournalsInBatches(ctx, journalsCfg.Broker.JournalClient(ctx), req, cmd.MaxTxnSize) mbp.Must(err, "failed to apply journals") - log.WithField("rev", resp.Header.Etcd.Revision).Info("successfully applied") + return nil } diff --git a/v2/cmd/gazctl/journals_edit.go b/v2/cmd/gazctl/journals_edit.go index df5a8a8a..230165f9 100644 --- a/v2/cmd/gazctl/journals_edit.go +++ b/v2/cmd/gazctl/journals_edit.go @@ -7,6 +7,7 @@ import ( "github.com/LiveRamp/gazette/v2/cmd/gazctl/editor" "github.com/LiveRamp/gazette/v2/pkg/client" + mbp "github.com/LiveRamp/gazette/v2/pkg/mainboilerplate" "github.com/LiveRamp/gazette/v2/pkg/protocol/journalspace" log "github.com/sirupsen/logrus" "gopkg.in/yaml.v2" @@ -54,11 +55,9 @@ func (cmd *cmdJournalsEdit) applySpecs(b []byte) error { } var ctx = context.Background() - if resp, err := client.ApplyJournals(ctx, journalsCfg.Broker.JournalClient(ctx), req); err != nil { - return err - } else { - log.WithField("rev", resp.Header.Etcd.Revision).Info("successfully applied") - } + var resp, err = client.ApplyJournalsInBatches(ctx, journalsCfg.Broker.JournalClient(ctx), req, cmd.MaxTxnSize) + mbp.Must(err, "failed to apply journals") + log.WithField("rev", resp.Header.Etcd.Revision).Info("successfully applied") return nil } diff --git a/v2/cmd/gazctl/main.go b/v2/cmd/gazctl/main.go index fc50642d..d4c94417 100644 --- a/v2/cmd/gazctl/main.go +++ b/v2/cmd/gazctl/main.go @@ -40,12 +40,15 @@ type ListConfig struct { // ApplyConfig is common configuration of apply operations. 
type ApplyConfig struct { - SpecsPath string `long:"specs" description:"Path to specifications file to apply. Stdin is used if not set"` - DryRun bool `long:"dry-run" description:"Perform a dry-run of the apply"` + SpecsPath string `long:"specs" description:"Path to specifications file to apply. Stdin is used if not set"` + DryRun bool `long:"dry-run" description:"Perform a dry-run of the apply"` + MaxTxnSize int `long:"max-txn-size" default:"0" description:"maximum number of specs to be processed within an apply transaction. If 0, the default, all changes are issued in a single transaction"` } +// EditConfig is common configuration for edit operations. type EditConfig struct { - Selector string `long:"selector" short:"l" required:"true" description:"Label Selector query to filter on" no-ini:"true"` + Selector string `long:"selector" short:"l" required:"true" description:"Label Selector query to filter on" no-ini:"true"` + MaxTxnSize int `long:"max-txn-size" default:"0" description:"maximum number of specs to be processed within an apply transaction. If 0, the default, all changes are issued in a single transaction"` } func (cfg ApplyConfig) decode(into interface{}) error { @@ -147,7 +150,7 @@ journals or parents thereof in the hierarchy. Note that deleted parent prefixes will cascade only to JournalSpecs *explicitly listed* as children of the prefix in the YAML, and not to other JournalSpecs which may exist with the prefix but are not enumerated. -`, &cmdJournalsApply{}) +`+maxTxnSizeWarning, &cmdJournalsApply{}) _ = addCmd(cmdShards, "apply", "Apply shard specifications", ` Apply a collection of ShardSpec creations, updates, or deletions. @@ -161,7 +164,7 @@ collection of ShardSpecs; this check ensures concurrent modifications are caught ShardSpecs may be created by setting "revision" to zero or omitting it altogether. ShardSpecs may be deleted by setting their field "delete" to true. 
-`, &cmdShardsApply{}) +`+maxTxnSizeWarning, &cmdShardsApply{}) _ = addCmd(cmdJournals, "read", "Read journal contents", ` Read the contents journal or journals as a stream. @@ -183,8 +186,8 @@ To read from an arbitrary offset into a journal(s) use the --offset flag. If not passed the default value is -1 which will read from the head of the journal. `, &cmdJournalRead{}) - _ = addCmd(cmdJournals, "edit", "Edit journal specifications", journalsEditLongDesc, &cmdJournalsEdit{}) - _ = addCmd(cmdShards, "edit", "Edit shard specifications", shardsEditLongDesc, &cmdShardsEdit{}) + _ = addCmd(cmdJournals, "edit", "Edit journal specifications", journalsEditLongDesc+maxTxnSizeWarningEdit, &cmdJournalsEdit{}) + _ = addCmd(cmdShards, "edit", "Edit shard specifications", shardsEditLongDesc+maxTxnSizeWarningEdit, &cmdShardsEdit{}) _ = addCmd(cmdShards, "prune", "Removes fragments of a hinted recovery log which are no longer needed", ` Recovery logs capture every write which has ever occurred in a Shard DB. This includes all prior writes of client keys & values, and also RocksDB @@ -242,6 +245,17 @@ const editCmdLongDescription = `The edit command allows you to directly edit jou Upon exiting the editor, if the file has been changed, it will be validated and applied. If the file is invalid or fails to apply, the editor is re-opened. Exiting the editor with no changes or saving an empty file are interpreted as the user aborting the edit attempt.` +const maxTxnSizeWarning = ` +In the event that this command generates more changes than are possible in a +single etcd transaction given the current server configuration (default 128),
+gazctl supports a max transaction size flag (--max-txn-size) which will send +the changes in batches of at most the max transaction size, however this means +a loss of transactionality and should be used with caution.` + +const maxTxnSizeWarningEdit = maxTxnSizeWarning + ` Instead it is +recommended that additional label selectors are used to limit the number of +changes within this operation.` + type editDescription struct { Type, HelpCommand, Examples string } diff --git a/v2/cmd/gazctl/shards_apply.go b/v2/cmd/gazctl/shards_apply.go index 0227a774..cc9fd8fd 100644 --- a/v2/cmd/gazctl/shards_apply.go +++ b/v2/cmd/gazctl/shards_apply.go @@ -30,10 +30,10 @@ func (cmd *cmdShardsApply) Execute([]string) error { } var ctx = context.Background() - resp, err := consumer.ApplyShards(ctx, shardsCfg.Consumer.ShardClient(ctx), req) + var resp, err = consumer.ApplyShardsInBatches(ctx, shardsCfg.Consumer.ShardClient(ctx), req, cmd.MaxTxnSize) mbp.Must(err, "failed to apply shards") - log.WithField("rev", resp.Header.Etcd.Revision).Info("successfully applied") + return nil } diff --git a/v2/cmd/gazctl/shards_edit.go b/v2/cmd/gazctl/shards_edit.go index b7a1a06e..fd36539e 100644 --- a/v2/cmd/gazctl/shards_edit.go +++ b/v2/cmd/gazctl/shards_edit.go @@ -8,6 +8,7 @@ import ( "github.com/LiveRamp/gazette/v2/cmd/gazctl/editor" "github.com/LiveRamp/gazette/v2/pkg/consumer" "github.com/LiveRamp/gazette/v2/pkg/consumer/shardspace" + mbp "github.com/LiveRamp/gazette/v2/pkg/mainboilerplate" log "github.com/sirupsen/logrus" "gopkg.in/yaml.v2" ) @@ -21,7 +22,7 @@ func (cmd *cmdShardsEdit) Execute([]string) error { return editor.EditRetryLoop(editor.RetryLoopArgs{ FilePrefix: "gazctl-shards-edit-", SelectFn: cmd.selectSpecs, - ApplyFn: applyShardSpecYAML, + ApplyFn: cmd.applyShardSpecYAML, AbortIfUnchanged: true, }) } @@ -38,7 +39,7 @@ func (cmd *cmdShardsEdit) selectSpecs() io.Reader { return buf } -func applyShardSpecYAML(b []byte) error { +func (cmd *cmdShardsEdit) applyShardSpecYAML(b 
[]byte) error { var set shardspace.Set if err := yaml.UnmarshalStrict(b, &set); err != nil { return err @@ -49,11 +50,8 @@ func applyShardSpecYAML(b []byte) error { } var ctx = context.Background() - if resp, err := consumer.ApplyShards(ctx, shardsCfg.Consumer.ShardClient(ctx), req); err != nil { - return err - } else { - log.WithField("rev", resp.Header.Etcd.Revision).Info("successfully applied") - } - + var resp, err = consumer.ApplyShardsInBatches(ctx, shardsCfg.Consumer.ShardClient(ctx), req, cmd.MaxTxnSize) + mbp.Must(err, "failed to apply shards") + log.WithField("rev", resp.Header.Etcd.Revision).Info("successfully applied") return nil } diff --git a/v2/pkg/client/list.go b/v2/pkg/client/list.go index 12fc0519..24559634 100644 --- a/v2/pkg/client/list.go +++ b/v2/pkg/client/list.go @@ -94,16 +94,49 @@ func ListAllJournals(ctx context.Context, client pb.JournalClient, req pb.ListRe return resp, nil } -// ApplyJournals invokes the Apply RPC, and maps a validation or !OK status to an error. +// ApplyJournals invokes the Apply RPC with all changes of |req| in a single batch, and maps a validation or !OK status to an error. func ApplyJournals(ctx context.Context, jc pb.JournalClient, req *pb.ApplyRequest) (*pb.ApplyResponse, error) { - if r, err := jc.Apply(pb.WithDispatchDefault(ctx), req, grpc.FailFast(false)); err != nil { - return r, err - } else if err = r.Validate(); err != nil { - return r, err - } else if r.Status != pb.Status_OK { - return r, errors.New(r.Status.String()) - } else { - return r, nil + return ApplyJournalsInBatches(ctx, jc, req, 0) +} + +// ApplyJournalsInBatches applies changes to journals which +// may be larger than the configured etcd transaction size. The changes in +// |req| will be sent serially in batches of size |size|. If +// |size| is 0 or negative all changes will be attempted as part of a single +// transaction. This function will return the response of the final +// JournalClient.Apply call. Response validation or !OK status from Apply RPC are +// mapped to error; batches applied before a failure are not rolled back.
+func ApplyJournalsInBatches(ctx context.Context, jc pb.JournalClient, req *pb.ApplyRequest, size int) (*pb.ApplyResponse, error) { + if len(req.Changes) == 0 { + return &pb.ApplyResponse{}, nil + } + if size <= 0 { + size = len(req.Changes) + } + var curReq = &pb.ApplyRequest{} + var offset = 0 + + for { + if len(req.Changes[offset:]) > size { + curReq.Changes = req.Changes[offset : offset+size] + } else { + curReq.Changes = req.Changes[offset:] + } + + var resp *pb.ApplyResponse + var err error + if resp, err = jc.Apply(pb.WithDispatchDefault(ctx), curReq, grpc.WaitForReady(true)); err != nil { + return resp, err + } else if err = resp.Validate(); err != nil { + return resp, err + } else if resp.Status != pb.Status_OK { + return resp, errors.New(resp.Status.String()) + } + + offset = offset + len(curReq.Changes) + if offset == len(req.Changes) { + return resp, nil + } } } diff --git a/v2/pkg/client/list_test.go b/v2/pkg/client/list_test.go index bd365acd..b7bd631e 100644 --- a/v2/pkg/client/list_test.go +++ b/v2/pkg/client/list_test.go @@ -185,6 +185,113 @@ func (s *ListSuite) TestListAllFragments(c *gc.C) { c.Check(err, gc.ErrorMatches, `Status: invalid status \(1000\)`) } +func (s *ListSuite) TestApplyJournalsInBatches(c *gc.C) { + var ctx, cancel = context.WithCancel(context.Background()) + defer cancel() + + var broker = teststub.NewBroker(c, ctx) + + var client = pb.NewRoutedJournalClient(broker.MustClient(), NewRouteCache(2, time.Hour)) + + var hdr = buildHeaderFixture(broker) + // Case: size is 0. All changes are submitted. 
+ var fixture = buildApplyReqFixtue() + var expected = &pb.ApplyResponse{ + Status: pb.Status_OK, + Header: *hdr, + } + broker.ApplyFunc = func(ctx context.Context, req *pb.ApplyRequest) (*pb.ApplyResponse, error) { + c.Check(req, gc.DeepEquals, fixture) + return expected, nil + } + resp, err := ApplyJournalsInBatches(ctx, client, fixture, 0) + c.Check(err, gc.IsNil) + c.Check(resp, gc.DeepEquals, expected) + + // Case: size == len(req.Changes). All changes are submitted. + resp, err = ApplyJournalsInBatches(ctx, client, fixture, 3) + c.Check(err, gc.IsNil) + c.Check(resp, gc.DeepEquals, expected) + + // Case: size < len(req.Changes). Changes are batched. + var iter = 0 + broker.ApplyFunc = func(ctx context.Context, req *pb.ApplyRequest) (*pb.ApplyResponse, error) { + c.Check(req, gc.DeepEquals, &pb.ApplyRequest{ + Changes: []pb.ApplyRequest_Change{ + {Upsert: fixture.Changes[iter].Upsert, ExpectModRevision: 1}, + }, + }) + iter++ + return expected, nil + } + resp, err = ApplyJournalsInBatches(ctx, client, fixture, 1) + c.Check(err, gc.IsNil) + c.Check(resp, gc.DeepEquals, expected) + + // Case: empty list of changes. + broker.ApplyFunc = func(ctx context.Context, req *pb.ApplyRequest) (*pb.ApplyResponse, error) { + c.Error("should not be called") + return nil, nil + } + resp, err = ApplyJournalsInBatches(ctx, client, &pb.ApplyRequest{}, 1) + c.Check(err, gc.IsNil) + c.Check(resp, gc.DeepEquals, &pb.ApplyResponse{}) + + // Case: Return Error from backend. + broker.ApplyFunc = func(ctx context.Context, req *pb.ApplyRequest) (*pb.ApplyResponse, error) { + return nil, errors.New("something has gone wrong") + } + resp, err = ApplyJournalsInBatches(ctx, client, fixture, 1) + c.Check(err, gc.ErrorMatches, "rpc error: code = Unknown desc = something has gone wrong") + + // Case: Status !OK mapped as an error. 
+ broker.ApplyFunc = func(ctx context.Context, req *pb.ApplyRequest) (*pb.ApplyResponse, error) { + return &pb.ApplyResponse{ + Status: pb.Status_ETCD_TRANSACTION_FAILED, + Header: *hdr, + }, nil + } + resp, err = ApplyJournalsInBatches(ctx, client, fixture, 1) + c.Check(err.Error(), gc.Matches, pb.Status_ETCD_TRANSACTION_FAILED.String()) + + // Case: Validation error mapped as error. + broker.ApplyFunc = func(ctx context.Context, req *pb.ApplyRequest) (*pb.ApplyResponse, error) { + return &pb.ApplyResponse{ + Status: pb.Status_ETCD_TRANSACTION_FAILED, + }, nil + } + resp, err = ApplyJournalsInBatches(ctx, client, fixture, 1) + c.Check(err, gc.ErrorMatches, `Header.Route: invalid Primary \(0; expected -1 <= Primary < 0\)`) +} + +func buildApplyReqFixtue() *pb.ApplyRequest { + // Create a fixture of JournalSpecs which we'll list. + var fragSpec = pb.JournalSpec_Fragment{ + Length: 1024, + RefreshInterval: time.Second, + CompressionCodec: pb.CompressionCodec_SNAPPY, + } + var specA = &pb.JournalSpec{ + Name: "journal/1/A", + LabelSet: pb.MustLabelSet("foo", "bar"), + Replication: 1, + Fragment: fragSpec, + } + var specB = &pb.JournalSpec{ + Name: "journal/2/B", + LabelSet: pb.MustLabelSet("bar", "baz"), + Replication: 1, + Fragment: fragSpec, + } + + return &pb.ApplyRequest{ + Changes: []pb.ApplyRequest_Change{ + {Upsert: specA, ExpectModRevision: 1}, + {Upsert: specB, ExpectModRevision: 1}, + }, + } +} + func buildListResponseFixture(names ...pb.Journal) (out []pb.ListResponse_Journal) { for _, n := range names { out = append(out, pb.ListResponse_Journal{ diff --git a/v2/pkg/consumer/shard_api.go b/v2/pkg/consumer/shard_api.go index 5ac3857f..8cb93b87 100644 --- a/v2/pkg/consumer/shard_api.go +++ b/v2/pkg/consumer/shard_api.go @@ -191,16 +191,49 @@ func ListShards(ctx context.Context, sc ShardClient, req *ListRequest) (*ListRes } } -// ApplyShards invokes the Apply RPC, and maps a validation or !OK status to an error. +// ApplyShards invokes the Apply RPC. 
func ApplyShards(ctx context.Context, sc ShardClient, req *ApplyRequest) (*ApplyResponse, error) { - if r, err := sc.Apply(pb.WithDispatchDefault(ctx), req, grpc.FailFast(false)); err != nil { - return r, err - } else if err = r.Validate(); err != nil { - return r, err - } else if r.Status != Status_OK { - return r, errors.New(r.Status.String()) - } else { - return r, nil + return ApplyShardsInBatches(ctx, sc, req, 0) +} + +// ApplyShardsInBatches applies changes to shards which +// may be larger than the configured etcd transaction size. The changes in +// |req| will be sent serially in batches of size |size|. If +// |size| is 0 or negative all changes will be attempted as part of a single +// transaction. This function will return the response of the final +// ShardClient.Apply call. Response validation or !OK status from Apply RPC are +// mapped to error; batches applied before a failure are not rolled back. +func ApplyShardsInBatches(ctx context.Context, sc ShardClient, req *ApplyRequest, size int) (*ApplyResponse, error) { + if len(req.Changes) == 0 { + return &ApplyResponse{}, nil + } + if size <= 0 { + size = len(req.Changes) + } + var curReq = &ApplyRequest{} + var offset = 0 + + for { + if len(req.Changes[offset:]) > size { + curReq.Changes = req.Changes[offset : offset+size] + } else { + curReq.Changes = req.Changes[offset:] + } + + var resp *ApplyResponse + var err error + if resp, err = sc.Apply(pb.WithDispatchDefault(ctx), curReq, grpc.WaitForReady(true)); err != nil { + return resp, err + } else if err = resp.Validate(); err != nil { + return resp, err + } else if resp.Status != Status_OK { + return resp, errors.New(resp.Status.String()) + } + + offset = offset + len(curReq.Changes) + if offset == len(req.Changes) { + return resp, nil + } } } diff --git a/v2/pkg/consumer/shard_api_test.go b/v2/pkg/consumer/shard_api_test.go index b00aae62..1bedd17d 100644 --- a/v2/pkg/consumer/shard_api_test.go +++ b/v2/pkg/consumer/shard_api_test.go @@ -1,6 +1,11 @@ package consumer import ( + "context" + "errors" + "time" + + 
"github.com/LiveRamp/gazette/v2/pkg/client" pb "github.com/LiveRamp/gazette/v2/pkg/protocol" "github.com/LiveRamp/gazette/v2/pkg/recoverylog" gc "github.com/go-check/check" @@ -183,6 +188,84 @@ func (s *APISuite) TestApplyCases(c *gc.C) { c.Check(err, gc.ErrorMatches, `Changes\[0\].Delete: not a valid token \(invalid shard id\)`) } +func (s *APISuite) TestApplyShardsInBatches(c *gc.C) { + var ctx, cancel = context.WithCancel(context.Background()) + defer cancel() + + var ss = newShardServerStub(ctx, c) + var client = NewRoutedShardClient(ss.MustClient(), client.NewRouteCache(2, time.Hour)) + + var hdr = buildHeaderFixture(ss) + // Case: size is 0. All changes are submitted. + var fixture = buildApplyReqFixtue() + var expected = &ApplyResponse{ + Status: Status_OK, + Header: *hdr, + } + ss.ApplyFunc = func(ctx context.Context, req *ApplyRequest) (*ApplyResponse, error) { + c.Check(req, gc.DeepEquals, fixture) + return expected, nil + } + resp, err := ApplyShardsInBatches(ctx, client, fixture, 0) + c.Check(err, gc.IsNil) + c.Check(resp, gc.DeepEquals, expected) + + // Case: size == len(req.Changes). All changes are submitted. + resp, err = ApplyShardsInBatches(ctx, client, fixture, 3) + c.Check(err, gc.IsNil) + c.Check(resp, gc.DeepEquals, expected) + + // Case: size < len(req.Changes). Changes are batched. + var iter = 0 + ss.ApplyFunc = func(ctx context.Context, req *ApplyRequest) (*ApplyResponse, error) { + c.Check(req, gc.DeepEquals, &ApplyRequest{ + Changes: []ApplyRequest_Change{ + {Upsert: fixture.Changes[iter].Upsert, ExpectModRevision: 1}, + }, + }) + iter++ + return expected, nil + } + resp, err = ApplyShardsInBatches(ctx, client, fixture, 1) + c.Check(err, gc.IsNil) + c.Check(resp, gc.DeepEquals, expected) + + // Case: empty list of changes. 
+ ss.ApplyFunc = func(ctx context.Context, req *ApplyRequest) (*ApplyResponse, error) { + c.Error("should not be called") + return nil, nil + } + resp, err = ApplyShardsInBatches(ctx, client, &ApplyRequest{}, 1) + c.Check(resp, gc.DeepEquals, &ApplyResponse{}) + c.Check(err, gc.IsNil) + + // Case: Return Error from backend. + ss.ApplyFunc = func(ctx context.Context, req *ApplyRequest) (*ApplyResponse, error) { + return nil, errors.New("something has gone wrong") + } + resp, err = ApplyShardsInBatches(ctx, client, fixture, 1) + c.Check(err, gc.ErrorMatches, "rpc error: code = Unknown desc = something has gone wrong") + + // Case: Status !OK mapped as an error. + ss.ApplyFunc = func(ctx context.Context, req *ApplyRequest) (*ApplyResponse, error) { + return &ApplyResponse{ + Status: Status_ETCD_TRANSACTION_FAILED, + Header: *hdr, + }, nil + } + resp, err = ApplyShardsInBatches(ctx, client, fixture, 1) + c.Check(err.Error(), gc.Matches, Status_ETCD_TRANSACTION_FAILED.String()) + + // Case: Validation error mapped as error. + ss.ApplyFunc = func(ctx context.Context, req *ApplyRequest) (*ApplyResponse, error) { + return &ApplyResponse{ + Status: Status_ETCD_TRANSACTION_FAILED, + }, nil + } + resp, err = ApplyShardsInBatches(ctx, client, fixture, 1) + c.Check(err, gc.ErrorMatches, `Header.Route: invalid Primary \(0; expected -1 <= Primary < 0\)`) +} + func (s *APISuite) TestHintsCases(c *gc.C) { var tf, cleanup = newTestFixture(c) defer cleanup() @@ -280,4 +363,33 @@ func (s *APISuite) TestHintsCases(c *gc.C) { tf.allocateShard(c, spec) // Cleanup. 
} +func buildHeaderFixture(ep interface{ Endpoint() pb.Endpoint }) *pb.Header { + return &pb.Header{ + ProcessId: pb.ProcessSpec_ID{Zone: "a", Suffix: "broker"}, + Route: pb.Route{ + Members: []pb.ProcessSpec_ID{{Zone: "a", Suffix: "broker"}}, + Endpoints: []pb.Endpoint{ep.Endpoint()}, + Primary: 0, + }, + Etcd: pb.Header_Etcd{ + ClusterId: 12, + MemberId: 34, + Revision: 56, + RaftTerm: 78, + }, + } +} + +func buildApplyReqFixtue() *ApplyRequest { + var specA = makeShard(shardA) + var specB = makeShard(shardB) + + return &ApplyRequest{ + Changes: []ApplyRequest_Change{ + {Upsert: specA, ExpectModRevision: 1}, + {Upsert: specB, ExpectModRevision: 1}, + }, + } +} + var _ = gc.Suite(&APISuite{})