diff --git a/v2/cmd/gazctl/journals_apply.go b/v2/cmd/gazctl/journals_apply.go index 643b919ff..21520a9d5 100644 --- a/v2/cmd/gazctl/journals_apply.go +++ b/v2/cmd/gazctl/journals_apply.go @@ -8,6 +8,7 @@ import ( mbp "github.com/LiveRamp/gazette/v2/pkg/mainboilerplate" pb "github.com/LiveRamp/gazette/v2/pkg/protocol" "github.com/LiveRamp/gazette/v2/pkg/protocol/journalspace" + "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes" "github.com/gogo/protobuf/proto" log "github.com/sirupsen/logrus" ) @@ -33,10 +34,13 @@ func (cmd *cmdJournalsApply) Execute([]string) error { } var ctx = context.Background() - resp, err := client.ApplyJournals(ctx, journalsCfg.Broker.JournalClient(ctx), req) + resp, err := client.ApplyJournalsInBatches(ctx, journalsCfg.Broker.JournalClient(ctx), req, cmd.MaxTxnSize) + if err == rpctypes.ErrGRPCTooManyOps { + tooManyOpsPanic() + } mbp.Must(err, "failed to apply journals") - log.WithField("rev", resp.Header.Etcd.Revision).Info("successfully applied") + return nil } @@ -59,3 +63,15 @@ func newJournalSpecApplyRequest(tree *journalspace.Node) *pb.ApplyRequest { }) return req } + +func tooManyOpsPanic() { + log.Panic(` +This operation has generated more changes than are possible in a single etcd +transaction given the current server configation. Gazctl supports a +max transaction size flag (--max-txn-size) which will send the changes in +batches of at most the max transaction size, however this means a loss +of transactionality and should be used with caution. Instead it is recomended +that additional label selectors are used to limit the number of changes within +this operation. +`) +} diff --git a/v2/cmd/gazctl/journals_edit.go b/v2/cmd/gazctl/journals_edit.go index df5a8a8ab..a7a0e3724 100644 --- a/v2/cmd/gazctl/journals_edit.go +++ b/v2/cmd/gazctl/journals_edit.go @@ -7,7 +7,10 @@ import ( "github.com/LiveRamp/gazette/v2/cmd/gazctl/editor" "github.com/LiveRamp/gazette/v2/pkg/client" + mbp "github.com/LiveRamp/gazette/v2/pkg/mainboilerplate" + pb "github.com/LiveRamp/gazette/v2/pkg/protocol" "github.com/LiveRamp/gazette/v2/pkg/protocol/journalspace" + "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes" log "github.com/sirupsen/logrus" "gopkg.in/yaml.v2" ) @@ -54,11 +57,14 @@ func (cmd *cmdJournalsEdit) applySpecs(b []byte) error { } var ctx = context.Background() - if resp, err := client.ApplyJournals(ctx, journalsCfg.Broker.JournalClient(ctx), req); err != nil { - return err - } else { - log.WithField("rev", resp.Header.Etcd.Revision).Info("successfully applied") + var resp *pb.ApplyResponse + var err error + resp, err = client.ApplyJournalsInBatches(ctx, journalsCfg.Broker.JournalClient(ctx), req, cmd.MaxTxnSize) + if err == rpctypes.ErrTooManyOps { + tooManyOpsPanic() } + mbp.Must(err, "failed to apply journals") + log.WithField("rev", resp.Header.Etcd.Revision).Info("successfully applied") return nil } diff --git a/v2/cmd/gazctl/main.go b/v2/cmd/gazctl/main.go index fc50642df..ec2fa53ec 100644 --- a/v2/cmd/gazctl/main.go +++ b/v2/cmd/gazctl/main.go @@ -40,12 +40,15 @@ type ListConfig struct { // ApplyConfig is common configuration of apply operations. type ApplyConfig struct { - SpecsPath string `long:"specs" description:"Path to specifications file to apply. Stdin is used if not set"` - DryRun bool `long:"dry-run" description:"Perform a dry-run of the apply"` + SpecsPath string `long:"specs" description:"Path to specifications file to apply. Stdin is used if not set"` + DryRun bool `long:"dry-run" description:"Perform a dry-run of the apply"` + MaxTxnSize int `long:"max-txn-size" short:"m" default:"0" description:"maximum number of specs to be processed within an apply transaction. If 0, the default, all changes are issued in a single transaction"` } +// EditConfig is common configuration for exit operations. type EditConfig struct { - Selector string `long:"selector" short:"l" required:"true" description:"Label Selector query to filter on" no-ini:"true"` + Selector string `long:"selector" short:"l" required:"true" description:"Label Selector query to filter on" no-ini:"true"` + MaxTxnSize int `long:"max-txn-size" short:"m" default:"0" description:"maximum number of specs to be processed within an apply transaction. If 0, the default, all changes are issued in a single transaction"` } func (cfg ApplyConfig) decode(into interface{}) error { diff --git a/v2/cmd/gazctl/shards_apply.go b/v2/cmd/gazctl/shards_apply.go index 0227a774b..b99010c53 100644 --- a/v2/cmd/gazctl/shards_apply.go +++ b/v2/cmd/gazctl/shards_apply.go @@ -7,6 +7,7 @@ import ( "github.com/LiveRamp/gazette/v2/pkg/consumer" "github.com/LiveRamp/gazette/v2/pkg/consumer/shardspace" mbp "github.com/LiveRamp/gazette/v2/pkg/mainboilerplate" + "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes" "github.com/gogo/protobuf/proto" log "github.com/sirupsen/logrus" ) @@ -30,10 +31,15 @@ func (cmd *cmdShardsApply) Execute([]string) error { } var ctx = context.Background() - resp, err := consumer.ApplyShards(ctx, shardsCfg.Consumer.ShardClient(ctx), req) + var resp *consumer.ApplyResponse + var err error + resp, err = consumer.ApplyShardsInBatches(ctx, shardsCfg.Consumer.ShardClient(ctx), req, cmd.MaxTxnSize) + if err == rpctypes.ErrTooManyOps { + tooManyOpsPanic() + } mbp.Must(err, "failed to apply shards") - log.WithField("rev", resp.Header.Etcd.Revision).Info("successfully applied") + return nil } diff --git a/v2/cmd/gazctl/shards_edit.go b/v2/cmd/gazctl/shards_edit.go index b7a1a06e4..f25ff4544 100644 --- a/v2/cmd/gazctl/shards_edit.go +++ b/v2/cmd/gazctl/shards_edit.go @@ -8,6 +8,8 @@ import ( "github.com/LiveRamp/gazette/v2/cmd/gazctl/editor" "github.com/LiveRamp/gazette/v2/pkg/consumer" "github.com/LiveRamp/gazette/v2/pkg/consumer/shardspace" + mbp "github.com/LiveRamp/gazette/v2/pkg/mainboilerplate" + "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes" log "github.com/sirupsen/logrus" "gopkg.in/yaml.v2" ) @@ -21,7 +23,7 @@ func (cmd *cmdShardsEdit) Execute([]string) error { return editor.EditRetryLoop(editor.RetryLoopArgs{ FilePrefix: "gazctl-shards-edit-", SelectFn: cmd.selectSpecs, - ApplyFn: applyShardSpecYAML, + ApplyFn: cmd.applyShardSpecYAML, AbortIfUnchanged: true, }) } @@ -38,7 +40,7 @@ func (cmd *cmdShardsEdit) selectSpecs() io.Reader { return buf } -func applyShardSpecYAML(b []byte) error { +func (cmd *cmdShardsEdit) applyShardSpecYAML(b []byte) error { var set shardspace.Set if err := yaml.UnmarshalStrict(b, &set); err != nil { return err @@ -49,11 +51,13 @@ func applyShardSpecYAML(b []byte) error { } var ctx = context.Background() - if resp, err := consumer.ApplyShards(ctx, shardsCfg.Consumer.ShardClient(ctx), req); err != nil { - return err - } else { - log.WithField("rev", resp.Header.Etcd.Revision).Info("successfully applied") + var resp *consumer.ApplyResponse + var err error + resp, err = consumer.ApplyShardsInBatches(ctx, shardsCfg.Consumer.ShardClient(ctx), req, cmd.MaxTxnSize) + if err == rpctypes.ErrGRPCTooManyOps { + tooManyOpsPanic() } - + mbp.Must(err, "failed to apply shards") + log.WithField("rev", resp.Header.Etcd.Revision).Info("successfully applied") return nil } diff --git a/v2/pkg/client/list.go b/v2/pkg/client/list.go index 12fc05197..825c47a04 100644 --- a/v2/pkg/client/list.go +++ b/v2/pkg/client/list.go @@ -94,16 +94,49 @@ func ListAllJournals(ctx context.Context, client pb.JournalClient, req pb.ListRe return resp, nil } -// ApplyJournals invokes the Apply RPC, and maps a validation or !OK status to an error. +// ApplyJournals invokes the Apply RPC. func ApplyJournals(ctx context.Context, jc pb.JournalClient, req *pb.ApplyRequest) (*pb.ApplyResponse, error) { - if r, err := jc.Apply(pb.WithDispatchDefault(ctx), req, grpc.FailFast(false)); err != nil { - return r, err - } else if err = r.Validate(); err != nil { - return r, err - } else if r.Status != pb.Status_OK { - return r, errors.New(r.Status.String()) - } else { - return r, nil + return ApplyJournalsInBatches(ctx, jc, req, 0) +} + +// ApplyJournalsInBatches is a helper function for applying changes to journals which +// may be larger than the configured etcd transaction size size. The changes in +// |req| will be sent serially in batches of size |size|. If +// |size| is 0 all changes will be attempted as part of a single +// transaction. This function will return the response of the final +// ShardClient.Apply call. Response validation or !OK status from Apply RPC are +// mapped to error. +func ApplyJournalsInBatches(ctx context.Context, jc pb.JournalClient, req *pb.ApplyRequest, size int) (*pb.ApplyResponse, error) { + if len(req.Changes) == 0 { + return &pb.ApplyResponse{}, nil + } + if size == 0 { + size = len(req.Changes) + } + var curReq = &pb.ApplyRequest{} + var offset = 0 + + for { + if len(req.Changes[offset:]) > size { + curReq.Changes = req.Changes[offset : offset+size] + } else { + curReq.Changes = req.Changes[offset:] + } + + var resp *pb.ApplyResponse + var err error + if resp, err = jc.Apply(pb.WithDispatchDefault(ctx), curReq, grpc.FailFast(false)); err != nil { + return resp, err + } else if err = resp.Validate(); err != nil { + return resp, err + } else if resp.Status != pb.Status_OK { + return resp, errors.New(resp.Status.String()) + } + + offset = offset + len(curReq.Changes) + if offset == len(req.Changes) { + return resp, nil + } } } diff --git a/v2/pkg/client/list_test.go b/v2/pkg/client/list_test.go index bd365acd2..b7bd631e5 100644 --- a/v2/pkg/client/list_test.go +++ b/v2/pkg/client/list_test.go @@ -185,6 +185,113 @@ func (s *ListSuite) TestListAllFragments(c *gc.C) { c.Check(err, gc.ErrorMatches, `Status: invalid status \(1000\)`) } +func (s *ListSuite) TestApplyJournalsInBatches(c *gc.C) { + var ctx, cancel = context.WithCancel(context.Background()) + defer cancel() + + var broker = teststub.NewBroker(c, ctx) + + var client = pb.NewRoutedJournalClient(broker.MustClient(), NewRouteCache(2, time.Hour)) + + var hdr = buildHeaderFixture(broker) + // Case: size is 0. All changes are submitted. + var fixture = buildApplyReqFixtue() + var expected = &pb.ApplyResponse{ + Status: pb.Status_OK, + Header: *hdr, + } + broker.ApplyFunc = func(ctx context.Context, req *pb.ApplyRequest) (*pb.ApplyResponse, error) { + c.Check(req, gc.DeepEquals, fixture) + return expected, nil + } + resp, err := ApplyJournalsInBatches(ctx, client, fixture, 0) + c.Check(err, gc.IsNil) + c.Check(resp, gc.DeepEquals, expected) + + // Case: size == len(req.Changes). All changes are submitted. + resp, err = ApplyJournalsInBatches(ctx, client, fixture, 3) + c.Check(err, gc.IsNil) + c.Check(resp, gc.DeepEquals, expected) + + // Case: size < len(req.Changes). Changes are batched. + var iter = 0 + broker.ApplyFunc = func(ctx context.Context, req *pb.ApplyRequest) (*pb.ApplyResponse, error) { + c.Check(req, gc.DeepEquals, &pb.ApplyRequest{ + Changes: []pb.ApplyRequest_Change{ + {Upsert: fixture.Changes[iter].Upsert, ExpectModRevision: 1}, + }, + }) + iter++ + return expected, nil + } + resp, err = ApplyJournalsInBatches(ctx, client, fixture, 1) + c.Check(err, gc.IsNil) + c.Check(resp, gc.DeepEquals, expected) + + // Case: empty list of changes. + broker.ApplyFunc = func(ctx context.Context, req *pb.ApplyRequest) (*pb.ApplyResponse, error) { + c.Error("should not be called") + return nil, nil + } + resp, err = ApplyJournalsInBatches(ctx, client, &pb.ApplyRequest{}, 1) + c.Check(err, gc.IsNil) + c.Check(resp, gc.DeepEquals, &pb.ApplyResponse{}) + + // Case: Return Error from backend. + broker.ApplyFunc = func(ctx context.Context, req *pb.ApplyRequest) (*pb.ApplyResponse, error) { + return nil, errors.New("something has gone wrong") + } + resp, err = ApplyJournalsInBatches(ctx, client, fixture, 1) + c.Check(err, gc.ErrorMatches, "rpc error: code = Unknown desc = something has gone wrong") + + // Case: Status !OK mapped as an error. + broker.ApplyFunc = func(ctx context.Context, req *pb.ApplyRequest) (*pb.ApplyResponse, error) { + return &pb.ApplyResponse{ + Status: pb.Status_ETCD_TRANSACTION_FAILED, + Header: *hdr, + }, nil + } + resp, err = ApplyJournalsInBatches(ctx, client, fixture, 1) + c.Check(err.Error(), gc.Matches, pb.Status_ETCD_TRANSACTION_FAILED.String()) + + // Case: Validation error mapped as error. + broker.ApplyFunc = func(ctx context.Context, req *pb.ApplyRequest) (*pb.ApplyResponse, error) { + return &pb.ApplyResponse{ + Status: pb.Status_ETCD_TRANSACTION_FAILED, + }, nil + } + resp, err = ApplyJournalsInBatches(ctx, client, fixture, 1) + c.Check(err, gc.ErrorMatches, `Header.Route: invalid Primary \(0; expected -1 <= Primary < 0\)`) +} + +func buildApplyReqFixtue() *pb.ApplyRequest { + // Create a fixture of JournalSpecs which we'll list. + var fragSpec = pb.JournalSpec_Fragment{ + Length: 1024, + RefreshInterval: time.Second, + CompressionCodec: pb.CompressionCodec_SNAPPY, + } + var specA = &pb.JournalSpec{ + Name: "journal/1/A", + LabelSet: pb.MustLabelSet("foo", "bar"), + Replication: 1, + Fragment: fragSpec, + } + var specB = &pb.JournalSpec{ + Name: "journal/2/B", + LabelSet: pb.MustLabelSet("bar", "baz"), + Replication: 1, + Fragment: fragSpec, + } + + return &pb.ApplyRequest{ + Changes: []pb.ApplyRequest_Change{ + {Upsert: specA, ExpectModRevision: 1}, + {Upsert: specB, ExpectModRevision: 1}, + }, + } +} + func buildListResponseFixture(names ...pb.Journal) (out []pb.ListResponse_Journal) { for _, n := range names { out = append(out, pb.ListResponse_Journal{ diff --git a/v2/pkg/consumer/shard_api.go b/v2/pkg/consumer/shard_api.go index 5ac3857f1..00ed769e2 100644 --- a/v2/pkg/consumer/shard_api.go +++ b/v2/pkg/consumer/shard_api.go @@ -191,16 +191,49 @@ func ListShards(ctx context.Context, sc ShardClient, req *ListRequest) (*ListRes } } -// ApplyShards invokes the Apply RPC, and maps a validation or !OK status to an error. +// ApplyShards invokes the Apply RPC. func ApplyShards(ctx context.Context, sc ShardClient, req *ApplyRequest) (*ApplyResponse, error) { - if r, err := sc.Apply(pb.WithDispatchDefault(ctx), req, grpc.FailFast(false)); err != nil { - return r, err - } else if err = r.Validate(); err != nil { - return r, err - } else if r.Status != Status_OK { - return r, errors.New(r.Status.String()) - } else { - return r, nil + return ApplyShardsInBatches(ctx, sc, req, 0) +} + +// ApplyShardsInBatches is a helper function for applying changes to shards which +// may be larger than the configured etcd transaction size size. The changes in +// |req| will be sent serially in batches of size |size|. If +// |size| is 0 all changes will be attempted as part of a single +// transaction. This function will return the response of the final +// ShardClient.Apply call. Response validation or !OK status from Apply RPC are +// mapped to error. +func ApplyShardsInBatches(ctx context.Context, sc ShardClient, req *ApplyRequest, size int) (*ApplyResponse, error) { + if len(req.Changes) == 0 { + return &ApplyResponse{}, nil + } + if size == 0 { + size = len(req.Changes) + } + var curReq = &ApplyRequest{} + var offset = 0 + + for { + if len(req.Changes[offset:]) > size { + curReq.Changes = req.Changes[offset : offset+size] + } else { + curReq.Changes = req.Changes[offset:] + } + + var resp *ApplyResponse + var err error + if resp, err = sc.Apply(pb.WithDispatchDefault(ctx), curReq, grpc.FailFast(false)); err != nil { + return resp, err + } else if err = resp.Validate(); err != nil { + return resp, err + } else if resp.Status != Status_OK { + return resp, errors.New(resp.Status.String()) + } + + offset = offset + len(curReq.Changes) + if offset == len(req.Changes) { + return resp, nil + } } } diff --git a/v2/pkg/consumer/shard_api_test.go b/v2/pkg/consumer/shard_api_test.go index b00aae627..1bedd17dc 100644 --- a/v2/pkg/consumer/shard_api_test.go +++ b/v2/pkg/consumer/shard_api_test.go @@ -1,6 +1,11 @@ package consumer import ( + "context" + "errors" + "time" + + "github.com/LiveRamp/gazette/v2/pkg/client" pb "github.com/LiveRamp/gazette/v2/pkg/protocol" "github.com/LiveRamp/gazette/v2/pkg/recoverylog" gc "github.com/go-check/check" @@ -183,6 +188,84 @@ func (s *APISuite) TestApplyCases(c *gc.C) { c.Check(err, gc.ErrorMatches, `Changes\[0\].Delete: not a valid token \(invalid shard id\)`) } +func (s *APISuite) TestApplyShardsInBatches(c *gc.C) { + var ctx, cancel = context.WithCancel(context.Background()) + defer cancel() + + var ss = newShardServerStub(ctx, c) + var client = NewRoutedShardClient(ss.MustClient(), client.NewRouteCache(2, time.Hour)) + + var hdr = buildHeaderFixture(ss) + // Case: size is 0. All changes are submitted. + var fixture = buildApplyReqFixtue() + var expected = &ApplyResponse{ + Status: Status_OK, + Header: *hdr, + } + ss.ApplyFunc = func(ctx context.Context, req *ApplyRequest) (*ApplyResponse, error) { + c.Check(req, gc.DeepEquals, fixture) + return expected, nil + } + resp, err := ApplyShardsInBatches(ctx, client, fixture, 0) + c.Check(err, gc.IsNil) + c.Check(resp, gc.DeepEquals, expected) + + // Case: size == len(req.Changes). All changes are submitted. + resp, err = ApplyShardsInBatches(ctx, client, fixture, 3) + c.Check(err, gc.IsNil) + c.Check(resp, gc.DeepEquals, expected) + + // Case: size < len(req.Changes). Changes are batched. + var iter = 0 + ss.ApplyFunc = func(ctx context.Context, req *ApplyRequest) (*ApplyResponse, error) { + c.Check(req, gc.DeepEquals, &ApplyRequest{ + Changes: []ApplyRequest_Change{ + {Upsert: fixture.Changes[iter].Upsert, ExpectModRevision: 1}, + }, + }) + iter++ + return expected, nil + } + resp, err = ApplyShardsInBatches(ctx, client, fixture, 1) + c.Check(err, gc.IsNil) + c.Check(resp, gc.DeepEquals, expected) + + // Case: empty list of changes. + ss.ApplyFunc = func(ctx context.Context, req *ApplyRequest) (*ApplyResponse, error) { + c.Error("should not be called") + return nil, nil + } + resp, err = ApplyShardsInBatches(ctx, client, &ApplyRequest{}, 1) + c.Check(resp, gc.DeepEquals, &ApplyResponse{}) + c.Check(err, gc.IsNil) + + // Case: Return Error from backend. + ss.ApplyFunc = func(ctx context.Context, req *ApplyRequest) (*ApplyResponse, error) { + return nil, errors.New("something has gone wrong") + } + resp, err = ApplyShardsInBatches(ctx, client, fixture, 1) + c.Check(err, gc.ErrorMatches, "rpc error: code = Unknown desc = something has gone wrong") + + // Case: Status !OK mapped as an error. + ss.ApplyFunc = func(ctx context.Context, req *ApplyRequest) (*ApplyResponse, error) { + return &ApplyResponse{ + Status: Status_ETCD_TRANSACTION_FAILED, + Header: *hdr, + }, nil + } + resp, err = ApplyShardsInBatches(ctx, client, fixture, 1) + c.Check(err.Error(), gc.Matches, Status_ETCD_TRANSACTION_FAILED.String()) + + // Case: Validation error mapped as error. + ss.ApplyFunc = func(ctx context.Context, req *ApplyRequest) (*ApplyResponse, error) { + return &ApplyResponse{ + Status: Status_ETCD_TRANSACTION_FAILED, + }, nil + } + resp, err = ApplyShardsInBatches(ctx, client, fixture, 1) + c.Check(err, gc.ErrorMatches, `Header.Route: invalid Primary \(0; expected -1 <= Primary < 0\)`) +} + func (s *APISuite) TestHintsCases(c *gc.C) { var tf, cleanup = newTestFixture(c) defer cleanup() @@ -280,4 +363,33 @@ func (s *APISuite) TestHintsCases(c *gc.C) { tf.allocateShard(c, spec) // Cleanup. } +func buildHeaderFixture(ep interface{ Endpoint() pb.Endpoint }) *pb.Header { + return &pb.Header{ + ProcessId: pb.ProcessSpec_ID{Zone: "a", Suffix: "broker"}, + Route: pb.Route{ + Members: []pb.ProcessSpec_ID{{Zone: "a", Suffix: "broker"}}, + Endpoints: []pb.Endpoint{ep.Endpoint()}, + Primary: 0, + }, + Etcd: pb.Header_Etcd{ + ClusterId: 12, + MemberId: 34, + Revision: 56, + RaftTerm: 78, + }, + } +} + +func buildApplyReqFixtue() *ApplyRequest { + var specA = makeShard(shardA) + var specB = makeShard(shardB) + + return &ApplyRequest{ + Changes: []ApplyRequest_Change{ + {Upsert: specA, ExpectModRevision: 1}, + {Upsert: specB, ExpectModRevision: 1}, + }, + } +} + var _ = gc.Suite(&APISuite{})