diff --git a/v2/cmd/gazctl/journals_apply.go b/v2/cmd/gazctl/journals_apply.go index 643b919ff..67b0dcf46 100644 --- a/v2/cmd/gazctl/journals_apply.go +++ b/v2/cmd/gazctl/journals_apply.go @@ -33,10 +33,10 @@ func (cmd *cmdJournalsApply) Execute([]string) error { } var ctx = context.Background() - resp, err := client.ApplyJournals(ctx, journalsCfg.Broker.JournalClient(ctx), req) + resp, err := client.ApplyJournalsLimit(ctx, journalsCfg.Broker.JournalClient(ctx), req, cmd.MaxTxnSize) mbp.Must(err, "failed to apply journals") - log.WithField("rev", resp.Header.Etcd.Revision).Info("successfully applied") + return nil } diff --git a/v2/cmd/gazctl/journals_edit.go b/v2/cmd/gazctl/journals_edit.go index df5a8a8ab..f0b2b22ea 100644 --- a/v2/cmd/gazctl/journals_edit.go +++ b/v2/cmd/gazctl/journals_edit.go @@ -7,6 +7,8 @@ import ( "github.com/LiveRamp/gazette/v2/cmd/gazctl/editor" "github.com/LiveRamp/gazette/v2/pkg/client" + mbp "github.com/LiveRamp/gazette/v2/pkg/mainboilerplate" + pb "github.com/LiveRamp/gazette/v2/pkg/protocol" "github.com/LiveRamp/gazette/v2/pkg/protocol/journalspace" log "github.com/sirupsen/logrus" "gopkg.in/yaml.v2" @@ -54,11 +56,11 @@ func (cmd *cmdJournalsEdit) applySpecs(b []byte) error { } var ctx = context.Background() - if resp, err := client.ApplyJournals(ctx, journalsCfg.Broker.JournalClient(ctx), req); err != nil { - return err - } else { - log.WithField("rev", resp.Header.Etcd.Revision).Info("successfully applied") - } + var resp *pb.ApplyResponse + var err error + resp, err = client.ApplyJournalsLimit(ctx, journalsCfg.Broker.JournalClient(ctx), req, cmd.MaxTxnSize) + mbp.Must(err, "failed to apply journals") + log.WithField("rev", resp.Header.Etcd.Revision).Info("successfully applied") return nil } diff --git a/v2/cmd/gazctl/main.go b/v2/cmd/gazctl/main.go index fc50642df..282c8430f 100644 --- a/v2/cmd/gazctl/main.go +++ b/v2/cmd/gazctl/main.go @@ -40,12 +40,15 @@ type ListConfig struct { // ApplyConfig is common configuration of apply operations. type ApplyConfig struct { - SpecsPath string `long:"specs" description:"Path to specifications file to apply. Stdin is used if not set"` - DryRun bool `long:"dry-run" description:"Perform a dry-run of the apply"` + SpecsPath string `long:"specs" description:"Path to specifications file to apply. Stdin is used if not set"` + DryRun bool `long:"dry-run" description:"Perform a dry-run of the apply"` + MaxTxnSize int `long:"max-txn-size" short:"m" default:"0" description:"Maxium number of transactions to be processed within a single change batch"` } +// EditConfig is common configuration for exit operations. type EditConfig struct { - Selector string `long:"selector" short:"l" required:"true" description:"Label Selector query to filter on" no-ini:"true"` + Selector string `long:"selector" short:"l" required:"true" description:"Label Selector query to filter on" no-ini:"true"` + MaxTxnSize int `long:"max-txn-size" short:"m" default:"0" description:"Maxium number of transactions to be processed within a single change batch"` } func (cfg ApplyConfig) decode(into interface{}) error { diff --git a/v2/cmd/gazctl/shards_apply.go b/v2/cmd/gazctl/shards_apply.go index 0227a774b..cccede34d 100644 --- a/v2/cmd/gazctl/shards_apply.go +++ b/v2/cmd/gazctl/shards_apply.go @@ -30,10 +30,12 @@ func (cmd *cmdShardsApply) Execute([]string) error { } var ctx = context.Background() - resp, err := consumer.ApplyShards(ctx, shardsCfg.Consumer.ShardClient(ctx), req) + var resp *consumer.ApplyResponse + var err error + resp, err = consumer.ApplyShardsLimit(ctx, shardsCfg.Consumer.ShardClient(ctx), req, cmd.MaxTxnSize) mbp.Must(err, "failed to apply shards") - log.WithField("rev", resp.Header.Etcd.Revision).Info("successfully applied") + return nil } diff --git a/v2/cmd/gazctl/shards_edit.go b/v2/cmd/gazctl/shards_edit.go index b7a1a06e4..2fa34ce59 100644 --- a/v2/cmd/gazctl/shards_edit.go +++ b/v2/cmd/gazctl/shards_edit.go @@ -8,6 +8,7 @@ import ( "github.com/LiveRamp/gazette/v2/cmd/gazctl/editor" "github.com/LiveRamp/gazette/v2/pkg/consumer" "github.com/LiveRamp/gazette/v2/pkg/consumer/shardspace" + mbp "github.com/LiveRamp/gazette/v2/pkg/mainboilerplate" log "github.com/sirupsen/logrus" "gopkg.in/yaml.v2" ) @@ -21,7 +22,7 @@ func (cmd *cmdShardsEdit) Execute([]string) error { return editor.EditRetryLoop(editor.RetryLoopArgs{ FilePrefix: "gazctl-shards-edit-", SelectFn: cmd.selectSpecs, - ApplyFn: applyShardSpecYAML, + ApplyFn: cmd.applyShardSpecYAML, AbortIfUnchanged: true, }) } @@ -38,7 +39,7 @@ func (cmd *cmdShardsEdit) selectSpecs() io.Reader { return buf } -func applyShardSpecYAML(b []byte) error { +func (cmd *cmdShardsEdit) applyShardSpecYAML(b []byte) error { var set shardspace.Set if err := yaml.UnmarshalStrict(b, &set); err != nil { return err @@ -49,11 +50,10 @@ func applyShardSpecYAML(b []byte) error { } var ctx = context.Background() - if resp, err := consumer.ApplyShards(ctx, shardsCfg.Consumer.ShardClient(ctx), req); err != nil { - return err - } else { - log.WithField("rev", resp.Header.Etcd.Revision).Info("successfully applied") - } - + var resp *consumer.ApplyResponse + var err error + resp, err = consumer.ApplyShardsLimit(ctx, shardsCfg.Consumer.ShardClient(ctx), req, cmd.MaxTxnSize) + mbp.Must(err, "failed to apply shards") + log.WithField("rev", resp.Header.Etcd.Revision).Info("successfully applied") return nil } diff --git a/v2/pkg/client/list.go b/v2/pkg/client/list.go index 12fc05197..0a2968bc9 100644 --- a/v2/pkg/client/list.go +++ b/v2/pkg/client/list.go @@ -94,17 +94,56 @@ func ListAllJournals(ctx context.Context, client pb.JournalClient, req pb.ListRe return resp, nil } -// ApplyJournals invokes the Apply RPC, and maps a validation or !OK status to an error. +// ApplyJournals invokes the Apply RPC. func ApplyJournals(ctx context.Context, jc pb.JournalClient, req *pb.ApplyRequest) (*pb.ApplyResponse, error) { - if r, err := jc.Apply(pb.WithDispatchDefault(ctx), req, grpc.FailFast(false)); err != nil { - return r, err - } else if err = r.Validate(); err != nil { - return r, err - } else if r.Status != pb.Status_OK { - return r, errors.New(r.Status.String()) - } else { - return r, nil + return ApplyJournalsLimit(ctx, jc, req, 0) +} + +// ApplyJournalsLimit is a helper function for applying changes to journals +// which may be larger than the configured etcd transaction size limit. The +// changes in |parentReq| will be batched with a size of |maxTxnSize| and +// sent serially. If |maxTxnSize| is 0 all changes will be attempted as +// part of a single transaction. This function will return the response of +// the final JournalsClient.Apply call. Response validation or !OK status +// from Apply RPC are mapped to error. In the event of an error any +// unapplied changes will be available on |parentReq|. +func ApplyJournalsLimit( + ctx context.Context, + jc pb.JournalClient, + parentReq *pb.ApplyRequest, + maxTxnSize int, +) (*pb.ApplyResponse, error) { + var changes []pb.ApplyRequest_Change + if maxTxnSize == 0 { + maxTxnSize = len(parentReq.Changes) + } + var finalResp *pb.ApplyResponse + + for len(parentReq.Changes) > 0 { + if len(parentReq.Changes) > maxTxnSize { + changes = parentReq.Changes[:maxTxnSize] + } else { + changes = parentReq.Changes + } + + var req = &pb.ApplyRequest{} + for _, change := range changes { + req.Changes = append(req.Changes, change) + } + + var resp *pb.ApplyResponse + var err error + if resp, err = jc.Apply(pb.WithDispatchDefault(ctx), req, grpc.FailFast(false)); err != nil { + return resp, err + } else if err = resp.Validate(); err != nil { + return resp, err + } else if resp.Status != pb.Status_OK { + return resp, errors.New(resp.Status.String()) + } + finalResp = resp + parentReq.Changes = parentReq.Changes[len(changes):] } + return finalResp, nil } // ListAllFragments performs multiple Fragments RPCs, as required to join across multiple diff --git a/v2/pkg/client/list_test.go b/v2/pkg/client/list_test.go index bd365acd2..77fc0d957 100644 --- a/v2/pkg/client/list_test.go +++ b/v2/pkg/client/list_test.go @@ -185,6 +185,124 @@ func (s *ListSuite) TestListAllFragments(c *gc.C) { c.Check(err, gc.ErrorMatches, `Status: invalid status \(1000\)`) } +func (s *ListSuite) TestApplyJournalsLimit(c *gc.C) { + var ctx, cancel = context.WithCancel(context.Background()) + defer cancel() + + var broker = teststub.NewBroker(c, ctx) + + var client = pb.NewRoutedJournalClient(broker.MustClient(), NewRouteCache(2, time.Hour)) + + var hdr = buildHeaderFixture(broker) + // Case: MxnTxnSize is 0. All changes are submitted. + var fixture = buildApplyReqFixtue() + var expected = &pb.ApplyResponse{ + Status: pb.Status_OK, + Header: *hdr, + } + broker.ApplyFunc = func(ctx context.Context, req *pb.ApplyRequest) (*pb.ApplyResponse, error) { + c.Check(req, gc.DeepEquals, fixture) + return expected, nil + } + resp, err := ApplyJournalsLimit(ctx, client, fixture, 0) + c.Check(err, gc.IsNil) + c.Check(resp, gc.DeepEquals, expected) + c.Check(len(fixture.Changes), gc.Equals, 0) + + // Case: MxnTxnSize == len(req.Changes). All changes are submitted. + fixture = buildApplyReqFixtue() + resp, err = ApplyJournalsLimit(ctx, client, fixture, 3) + c.Check(err, gc.IsNil) + c.Check(resp, gc.DeepEquals, expected) + c.Check(len(fixture.Changes), gc.Equals, 0) + + // Case: MxnTxnSize < len(req.Changes). Changes are batched. + fixture = buildApplyReqFixtue() + broker.ApplyFunc = func(ctx context.Context, req *pb.ApplyRequest) (*pb.ApplyResponse, error) { + c.Check(req, gc.DeepEquals, &pb.ApplyRequest{ + Changes: []pb.ApplyRequest_Change{ + {Upsert: fixture.Changes[0].Upsert, ExpectModRevision: 1}, + }, + }) + return expected, nil + } + resp, err = ApplyJournalsLimit(ctx, client, fixture, 1) + c.Check(err, gc.IsNil) + c.Check(resp, gc.DeepEquals, expected) + c.Check(len(fixture.Changes), gc.Equals, 0) + + // Case: MxnTxnSize == 0. + fixture = &pb.ApplyRequest{} + broker.ApplyFunc = func(ctx context.Context, req *pb.ApplyRequest) (*pb.ApplyResponse, error) { + c.Error("should not be called") + return nil, nil + } + resp, err = ApplyJournalsLimit(ctx, client, &pb.ApplyRequest{}, 1) + c.Check(err, gc.IsNil) + c.Check(resp, gc.IsNil) + c.Check(len(fixture.Changes), gc.Equals, 0) + + // Case: Return Error from backend. + fixture = buildApplyReqFixtue() + broker.ApplyFunc = func(ctx context.Context, req *pb.ApplyRequest) (*pb.ApplyResponse, error) { + return nil, errors.New("something has gone wrong") + } + resp, err = ApplyJournalsLimit(ctx, client, fixture, 1) + c.Check(err, gc.ErrorMatches, "rpc error: code = Unknown desc = something has gone wrong") + c.Check(len(fixture.Changes), gc.Equals, 2) + + // Case: Status !OK mapped as an error. + fixture = buildApplyReqFixtue() + broker.ApplyFunc = func(ctx context.Context, req *pb.ApplyRequest) (*pb.ApplyResponse, error) { + return &pb.ApplyResponse{ + Status: pb.Status_ETCD_TRANSACTION_FAILED, + Header: *hdr, + }, nil + } + resp, err = ApplyJournalsLimit(ctx, client, fixture, 1) + c.Check(err.Error(), gc.Matches, pb.Status_ETCD_TRANSACTION_FAILED.String()) + c.Check(len(fixture.Changes), gc.Equals, 2) + + // Case: Validation error mapped as error. + fixture = buildApplyReqFixtue() + broker.ApplyFunc = func(ctx context.Context, req *pb.ApplyRequest) (*pb.ApplyResponse, error) { + return &pb.ApplyResponse{ + Status: pb.Status_ETCD_TRANSACTION_FAILED, + }, nil + } + resp, err = ApplyJournalsLimit(ctx, client, fixture, 1) + c.Check(err, gc.ErrorMatches, `Header.Route: invalid Primary \(0; expected -1 <= Primary < 0\)`) + c.Check(len(fixture.Changes), gc.Equals, 2) +} + +func buildApplyReqFixtue() *pb.ApplyRequest { + // Create a fixture of JournalSpecs which we'll list. + var fragSpec = pb.JournalSpec_Fragment{ + Length: 1024, + RefreshInterval: time.Second, + CompressionCodec: pb.CompressionCodec_SNAPPY, + } + var specA = &pb.JournalSpec{ + Name: "journal/1/A", + LabelSet: pb.MustLabelSet("foo", "bar"), + Replication: 1, + Fragment: fragSpec, + } + var specB = &pb.JournalSpec{ + Name: "journal/2/B", + LabelSet: pb.MustLabelSet("bar", "baz"), + Replication: 1, + Fragment: fragSpec, + } + + return &pb.ApplyRequest{ + Changes: []pb.ApplyRequest_Change{ + {Upsert: specA, ExpectModRevision: 1}, + {Upsert: specB, ExpectModRevision: 1}, + }, + } +} + func buildListResponseFixture(names ...pb.Journal) (out []pb.ListResponse_Journal) { for _, n := range names { out = append(out, pb.ListResponse_Journal{ diff --git a/v2/pkg/consumer/shard_api.go b/v2/pkg/consumer/shard_api.go index 5ac3857f1..b50d7d349 100644 --- a/v2/pkg/consumer/shard_api.go +++ b/v2/pkg/consumer/shard_api.go @@ -191,17 +191,56 @@ func ListShards(ctx context.Context, sc ShardClient, req *ListRequest) (*ListRes } } -// ApplyShards invokes the Apply RPC, and maps a validation or !OK status to an error. +// ApplyShards invokes the Apply RPC. func ApplyShards(ctx context.Context, sc ShardClient, req *ApplyRequest) (*ApplyResponse, error) { - if r, err := sc.Apply(pb.WithDispatchDefault(ctx), req, grpc.FailFast(false)); err != nil { - return r, err - } else if err = r.Validate(); err != nil { - return r, err - } else if r.Status != Status_OK { - return r, errors.New(r.Status.String()) - } else { - return r, nil + return ApplyShardsLimit(ctx, sc, req, 0) +} + +// ApplyShardsLimit is a helper function for applying changes to journals +// which may be larger than the configured etcd transaction size limit. The +// changes in |parentReq| will be broken in request with |maxTxnSize| changes +// and sent serially. If |maxTxnSize| is 0 all changes will be attempted as +// part of a single transaction. This function will return the response of +// the final ShardClient.Apply call. Response validation or !OK status +// from Apply RPC are mapped to error. In the event of an error any +// unapplied changes will be available on |parentReq|. +func ApplyShardsLimit( + ctx context.Context, + sc ShardClient, + parentReq *ApplyRequest, + maxTxnSize int, +) (*ApplyResponse, error) { + var changes []ApplyRequest_Change + if maxTxnSize == 0 { + maxTxnSize = len(parentReq.Changes) + } + var finalResp *ApplyResponse + + for len(parentReq.Changes) > 0 { + if len(parentReq.Changes) > maxTxnSize { + changes = parentReq.Changes[:maxTxnSize] + } else { + changes = parentReq.Changes + } + + var req = &ApplyRequest{} + for _, change := range changes { + req.Changes = append(req.Changes, change) + } + + var resp *ApplyResponse + var err error + if resp, err = sc.Apply(pb.WithDispatchDefault(ctx), req, grpc.FailFast(false)); err != nil { + return resp, err + } else if err = resp.Validate(); err != nil { + return resp, err + } else if resp.Status != Status_OK { + return resp, errors.New(resp.Status.String()) + } + finalResp = resp + parentReq.Changes = parentReq.Changes[len(changes):] } + return finalResp, nil } // FetchHints invokes the Hints RPC, and maps a validation or !OK status to an error. diff --git a/v2/pkg/consumer/shard_api_test.go b/v2/pkg/consumer/shard_api_test.go index b00aae627..48c782894 100644 --- a/v2/pkg/consumer/shard_api_test.go +++ b/v2/pkg/consumer/shard_api_test.go @@ -1,6 +1,11 @@ package consumer import ( + "context" + "errors" + "time" + + "github.com/LiveRamp/gazette/v2/pkg/client" pb "github.com/LiveRamp/gazette/v2/pkg/protocol" "github.com/LiveRamp/gazette/v2/pkg/recoverylog" gc "github.com/go-check/check" @@ -183,6 +188,94 @@ func (s *APISuite) TestApplyCases(c *gc.C) { c.Check(err, gc.ErrorMatches, `Changes\[0\].Delete: not a valid token \(invalid shard id\)`) } +func (s *APISuite) TestApplyShardsLimit(c *gc.C) { + var ctx, cancel = context.WithCancel(context.Background()) + defer cancel() + + var ss = NewShardServerStub(ctx, c) + var client = NewRoutedShardClient(ss.MustClient(), client.NewRouteCache(2, time.Hour)) + + var hdr = buildHeaderFixture(ss) + // Case: MxnTxnSize is 0. All changes are submitted. + var fixture = buildApplyReqFixtue() + var expected = &ApplyResponse{ + Status: Status_OK, + Header: *hdr, + } + ss.ApplyFunc = func(ctx context.Context, req *ApplyRequest) (*ApplyResponse, error) { + c.Check(req, gc.DeepEquals, fixture) + return expected, nil + } + resp, err := ApplyShardsLimit(ctx, client, fixture, 0) + c.Check(err, gc.IsNil) + c.Check(resp, gc.DeepEquals, expected) + + // Case: MxnTxnSize == len(req.Changes). All changes are submitted. + fixture = buildApplyReqFixtue() + resp, err = ApplyShardsLimit(ctx, client, fixture, 3) + c.Check(err, gc.IsNil) + c.Check(resp, gc.DeepEquals, expected) + c.Check(len(fixture.Changes), gc.Equals, 0) + + // Case: MxnTxnSize < len(req.Changes). Changes are batched. + fixture = buildApplyReqFixtue() + ss.ApplyFunc = func(ctx context.Context, req *ApplyRequest) (*ApplyResponse, error) { + c.Check(req, gc.DeepEquals, &ApplyRequest{ + Changes: []ApplyRequest_Change{ + {Upsert: fixture.Changes[0].Upsert, ExpectModRevision: 1}, + }, + }) + return expected, nil + } + resp, err = ApplyShardsLimit(ctx, client, fixture, 1) + c.Check(err, gc.IsNil) + c.Check(resp, gc.DeepEquals, expected) + c.Check(len(fixture.Changes), gc.Equals, 0) + + // Case: MxnTxnSize == 0. + fixture = &ApplyRequest{} + ss.ApplyFunc = func(ctx context.Context, req *ApplyRequest) (*ApplyResponse, error) { + c.Error("should not be called") + return nil, nil + } + resp, err = ApplyShardsLimit(ctx, client, &ApplyRequest{}, 1) + c.Check(err, gc.IsNil) + c.Check(resp, gc.IsNil) + c.Check(len(fixture.Changes), gc.Equals, 0) + + // Case: Return Error from backend. + fixture = buildApplyReqFixtue() + ss.ApplyFunc = func(ctx context.Context, req *ApplyRequest) (*ApplyResponse, error) { + return nil, errors.New("something has gone wrong") + } + resp, err = ApplyShardsLimit(ctx, client, fixture, 1) + c.Check(err, gc.ErrorMatches, "rpc error: code = Unknown desc = something has gone wrong") + c.Check(len(fixture.Changes), gc.Equals, 2) + + // Case: Status !OK mapped as an error. + fixture = buildApplyReqFixtue() + ss.ApplyFunc = func(ctx context.Context, req *ApplyRequest) (*ApplyResponse, error) { + return &ApplyResponse{ + Status: Status_ETCD_TRANSACTION_FAILED, + Header: *hdr, + }, nil + } + resp, err = ApplyShardsLimit(ctx, client, fixture, 1) + c.Check(err.Error(), gc.Matches, Status_ETCD_TRANSACTION_FAILED.String()) + c.Check(len(fixture.Changes), gc.Equals, 2) + + // Case: Validation error mapped as error. + fixture = buildApplyReqFixtue() + ss.ApplyFunc = func(ctx context.Context, req *ApplyRequest) (*ApplyResponse, error) { + return &ApplyResponse{ + Status: Status_ETCD_TRANSACTION_FAILED, + }, nil + } + resp, err = ApplyShardsLimit(ctx, client, fixture, 1) + c.Check(err, gc.ErrorMatches, `Header.Route: invalid Primary \(0; expected -1 <= Primary < 0\)`) + c.Check(len(fixture.Changes), gc.Equals, 2) +} + func (s *APISuite) TestHintsCases(c *gc.C) { var tf, cleanup = newTestFixture(c) defer cleanup() @@ -280,4 +373,33 @@ func (s *APISuite) TestHintsCases(c *gc.C) { tf.allocateShard(c, spec) // Cleanup. } +func buildHeaderFixture(ep interface{ Endpoint() pb.Endpoint }) *pb.Header { + return &pb.Header{ + ProcessId: pb.ProcessSpec_ID{Zone: "a", Suffix: "broker"}, + Route: pb.Route{ + Members: []pb.ProcessSpec_ID{{Zone: "a", Suffix: "broker"}}, + Endpoints: []pb.Endpoint{ep.Endpoint()}, + Primary: 0, + }, + Etcd: pb.Header_Etcd{ + ClusterId: 12, + MemberId: 34, + Revision: 56, + RaftTerm: 78, + }, + } +} + +func buildApplyReqFixtue() *ApplyRequest { + var specA = makeShard(shardA) + var specB = makeShard(shardB) + + return &ApplyRequest{ + Changes: []ApplyRequest_Change{ + {Upsert: specA, ExpectModRevision: 1}, + {Upsert: specB, ExpectModRevision: 1}, + }, + } +} + var _ = gc.Suite(&APISuite{})