Skip to content

Commit

Permalink
Add max-txn-size flag to gazctl
Browse files Browse the repository at this point in the history
  • Loading branch information
Jacob Skelcy authored and jskelcy committed Feb 25, 2019
1 parent e2c1219 commit 08aa452
Show file tree
Hide file tree
Showing 9 changed files with 338 additions and 42 deletions.
4 changes: 2 additions & 2 deletions v2/cmd/gazctl/journals_apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,10 @@ func (cmd *cmdJournalsApply) Execute([]string) error {
}

var ctx = context.Background()
resp, err := client.ApplyJournals(ctx, journalsCfg.Broker.JournalClient(ctx), req)
var resp, err = client.ApplyJournalsInBatches(ctx, journalsCfg.Broker.JournalClient(ctx), req, cmd.MaxTxnSize)
mbp.Must(err, "failed to apply journals")

log.WithField("rev", resp.Header.Etcd.Revision).Info("successfully applied")

return nil
}

Expand Down
9 changes: 4 additions & 5 deletions v2/cmd/gazctl/journals_edit.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/LiveRamp/gazette/v2/cmd/gazctl/editor"
"github.com/LiveRamp/gazette/v2/pkg/client"
mbp "github.com/LiveRamp/gazette/v2/pkg/mainboilerplate"
"github.com/LiveRamp/gazette/v2/pkg/protocol/journalspace"
log "github.com/sirupsen/logrus"
"gopkg.in/yaml.v2"
Expand Down Expand Up @@ -54,11 +55,9 @@ func (cmd *cmdJournalsEdit) applySpecs(b []byte) error {
}

var ctx = context.Background()
if resp, err := client.ApplyJournals(ctx, journalsCfg.Broker.JournalClient(ctx), req); err != nil {
return err
} else {
log.WithField("rev", resp.Header.Etcd.Revision).Info("successfully applied")
}
var resp, err = client.ApplyJournalsInBatches(ctx, journalsCfg.Broker.JournalClient(ctx), req, cmd.MaxTxnSize)
mbp.Must(err, "failed to apply journals")
log.WithField("rev", resp.Header.Etcd.Revision).Info("successfully applied")

return nil
}
28 changes: 21 additions & 7 deletions v2/cmd/gazctl/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,15 @@ type ListConfig struct {

// ApplyConfig is common configuration of apply operations.
type ApplyConfig struct {
SpecsPath string `long:"specs" description:"Path to specifications file to apply. Stdin is used if not set"`
DryRun bool `long:"dry-run" description:"Perform a dry-run of the apply"`
SpecsPath string `long:"specs" description:"Path to specifications file to apply. Stdin is used if not set"`
DryRun bool `long:"dry-run" description:"Perform a dry-run of the apply"`
MaxTxnSize int `long:"max-txn-size" default:"0" description:"maximum number of specs to be processed within an apply transaction. If 0, the default, all changes are issued in a single transaction"`
}

// EditConfig is common configuration for exit operations.
type EditConfig struct {
Selector string `long:"selector" short:"l" required:"true" description:"Label Selector query to filter on" no-ini:"true"`
Selector string `long:"selector" short:"l" required:"true" description:"Label Selector query to filter on" no-ini:"true"`
MaxTxnSize int `long:"max-txn-size" default:"0" description:"maximum number of specs to be processed within an apply transaction. If 0, the default, all changes are issued in a single transaction"`
}

func (cfg ApplyConfig) decode(into interface{}) error {
Expand Down Expand Up @@ -147,7 +150,7 @@ journals or parents thereof in the hierarchy. Note that deleted parent prefixes
will cascade only to JournalSpecs *explicitly listed* as children of the prefix
in the YAML, and not to other JournalSpecs which may exist with the prefix but
are not enumerated.
`, &cmdJournalsApply{})
`+maxTxnSizeWarning, &cmdJournalsApply{})

_ = addCmd(cmdShards, "apply", "Apply shard specifications", `
Apply a collection of ShardSpec creations, updates, or deletions.
Expand All @@ -161,7 +164,7 @@ collection of ShardSpecs; this check ensures concurrent modifications are caught
ShardSpecs may be created by setting "revision" to zero or omitting it altogether.
ShardSpecs may be deleted by setting their field "delete" to true.
`, &cmdShardsApply{})
`+maxTxnSizeWarning, &cmdShardsApply{})

_ = addCmd(cmdJournals, "read", "Read journal contents", `
Read the contents journal or journals as a stream.
Expand All @@ -183,8 +186,8 @@ To read from an arbitrary offset into a journal(s) use the --offset flag.
If not passed the default value is -1 which will read from the head of the journal.
`, &cmdJournalRead{})

_ = addCmd(cmdJournals, "edit", "Edit journal specifications", journalsEditLongDesc, &cmdJournalsEdit{})
_ = addCmd(cmdShards, "edit", "Edit shard specifications", shardsEditLongDesc, &cmdShardsEdit{})
_ = addCmd(cmdJournals, "edit", "Edit journal specifications", journalsEditLongDesc+maxTxnSizeWarningEdit, &cmdJournalsEdit{})
_ = addCmd(cmdShards, "edit", "Edit shard specifications", shardsEditLongDesc+maxTxnSizeWarningEdit, &cmdShardsEdit{})
_ = addCmd(cmdShards, "prune", "Removes fragments of a hinted recovery log which are no longer needed", `
Recovery logs capture every write which has ever occurred in a Shard DB.
This includes all prior writes of client keys & values, and also RocksDB
Expand Down Expand Up @@ -242,6 +245,17 @@ const editCmdLongDescription = `The edit command allows you to directly edit jou
Upon exiting the editor, if the file has been changed, it will be validated and applied. If the file is invalid or fails to apply, the editor is re-opened. Exiting the editor with no changes or saving an empty file are interpreted as the user aborting the edit attempt.`

const maxTxnSizeWarning = `
In the event that this command generates more changes than are possible in a
single etcd transaction given the current server configation (default 128).
Gazctl supports a max transaction size flag (--max-txn-size) which will send
the changes in batches of at most the max transaction size, however this means
a loss of transactionality and should be used with caution.`

const maxTxnSizeWarningEdit = maxTxnSizeWarning + ` Instead it is
recomended that additional label selectors are used to limit the number of
changes within this operation.`

type editDescription struct {
Type, HelpCommand, Examples string
}
Expand Down
4 changes: 2 additions & 2 deletions v2/cmd/gazctl/shards_apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@ func (cmd *cmdShardsApply) Execute([]string) error {
}

var ctx = context.Background()
resp, err := consumer.ApplyShards(ctx, shardsCfg.Consumer.ShardClient(ctx), req)
var resp, err = consumer.ApplyShardsInBatches(ctx, shardsCfg.Consumer.ShardClient(ctx), req, cmd.MaxTxnSize)
mbp.Must(err, "failed to apply shards")

log.WithField("rev", resp.Header.Etcd.Revision).Info("successfully applied")

return nil
}

Expand Down
14 changes: 6 additions & 8 deletions v2/cmd/gazctl/shards_edit.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/LiveRamp/gazette/v2/cmd/gazctl/editor"
"github.com/LiveRamp/gazette/v2/pkg/consumer"
"github.com/LiveRamp/gazette/v2/pkg/consumer/shardspace"
mbp "github.com/LiveRamp/gazette/v2/pkg/mainboilerplate"
log "github.com/sirupsen/logrus"
"gopkg.in/yaml.v2"
)
Expand All @@ -21,7 +22,7 @@ func (cmd *cmdShardsEdit) Execute([]string) error {
return editor.EditRetryLoop(editor.RetryLoopArgs{
FilePrefix: "gazctl-shards-edit-",
SelectFn: cmd.selectSpecs,
ApplyFn: applyShardSpecYAML,
ApplyFn: cmd.applyShardSpecYAML,
AbortIfUnchanged: true,
})
}
Expand All @@ -38,7 +39,7 @@ func (cmd *cmdShardsEdit) selectSpecs() io.Reader {
return buf
}

func applyShardSpecYAML(b []byte) error {
func (cmd *cmdShardsEdit) applyShardSpecYAML(b []byte) error {
var set shardspace.Set
if err := yaml.UnmarshalStrict(b, &set); err != nil {
return err
Expand All @@ -49,11 +50,8 @@ func applyShardSpecYAML(b []byte) error {
}

var ctx = context.Background()
if resp, err := consumer.ApplyShards(ctx, shardsCfg.Consumer.ShardClient(ctx), req); err != nil {
return err
} else {
log.WithField("rev", resp.Header.Etcd.Revision).Info("successfully applied")
}

var resp, err = consumer.ApplyShardsInBatches(ctx, shardsCfg.Consumer.ShardClient(ctx), req, cmd.MaxTxnSize)
mbp.Must(err, "failed to apply shards")
log.WithField("rev", resp.Header.Etcd.Revision).Info("successfully applied")
return nil
}
51 changes: 42 additions & 9 deletions v2/pkg/client/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,16 +94,49 @@ func ListAllJournals(ctx context.Context, client pb.JournalClient, req pb.ListRe
return resp, nil
}

// ApplyJournals invokes the Apply RPC, and maps a validation or !OK status to an error.
// ApplyJournals invokes the Apply RPC.
func ApplyJournals(ctx context.Context, jc pb.JournalClient, req *pb.ApplyRequest) (*pb.ApplyResponse, error) {
if r, err := jc.Apply(pb.WithDispatchDefault(ctx), req, grpc.FailFast(false)); err != nil {
return r, err
} else if err = r.Validate(); err != nil {
return r, err
} else if r.Status != pb.Status_OK {
return r, errors.New(r.Status.String())
} else {
return r, nil
return ApplyJournalsInBatches(ctx, jc, req, 0)
}

// ApplyJournalsInBatches applies changes to journals which
// may be larger than the configured etcd transaction size size. The changes in
// |req| will be sent serially in batches of size |size|. If
// |size| is 0 all changes will be attempted as part of a single
// transaction. This function will return the response of the final
// ShardClient.Apply call. Response validation or !OK status from Apply RPC are
// mapped to error.
func ApplyJournalsInBatches(ctx context.Context, jc pb.JournalClient, req *pb.ApplyRequest, size int) (*pb.ApplyResponse, error) {
if len(req.Changes) == 0 {
return &pb.ApplyResponse{}, nil
}
if size == 0 {
size = len(req.Changes)
}
var curReq = &pb.ApplyRequest{}
var offset = 0

for {
if len(req.Changes[offset:]) > size {
curReq.Changes = req.Changes[offset : offset+size]
} else {
curReq.Changes = req.Changes[offset:]
}

var resp *pb.ApplyResponse
var err error
if resp, err = jc.Apply(pb.WithDispatchDefault(ctx), curReq, grpc.WaitForReady(true)); err != nil {
return resp, err
} else if err = resp.Validate(); err != nil {
return resp, err
} else if resp.Status != pb.Status_OK {
return resp, errors.New(resp.Status.String())
}

offset = offset + len(curReq.Changes)
if offset == len(req.Changes) {
return resp, nil
}
}
}

Expand Down
107 changes: 107 additions & 0 deletions v2/pkg/client/list_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,113 @@ func (s *ListSuite) TestListAllFragments(c *gc.C) {
c.Check(err, gc.ErrorMatches, `Status: invalid status \(1000\)`)
}

func (s *ListSuite) TestApplyJournalsInBatches(c *gc.C) {
var ctx, cancel = context.WithCancel(context.Background())
defer cancel()

var broker = teststub.NewBroker(c, ctx)

var client = pb.NewRoutedJournalClient(broker.MustClient(), NewRouteCache(2, time.Hour))

var hdr = buildHeaderFixture(broker)
// Case: size is 0. All changes are submitted.
var fixture = buildApplyReqFixtue()
var expected = &pb.ApplyResponse{
Status: pb.Status_OK,
Header: *hdr,
}
broker.ApplyFunc = func(ctx context.Context, req *pb.ApplyRequest) (*pb.ApplyResponse, error) {
c.Check(req, gc.DeepEquals, fixture)
return expected, nil
}
resp, err := ApplyJournalsInBatches(ctx, client, fixture, 0)
c.Check(err, gc.IsNil)
c.Check(resp, gc.DeepEquals, expected)

// Case: size == len(req.Changes). All changes are submitted.
resp, err = ApplyJournalsInBatches(ctx, client, fixture, 3)
c.Check(err, gc.IsNil)
c.Check(resp, gc.DeepEquals, expected)

// Case: size < len(req.Changes). Changes are batched.
var iter = 0
broker.ApplyFunc = func(ctx context.Context, req *pb.ApplyRequest) (*pb.ApplyResponse, error) {
c.Check(req, gc.DeepEquals, &pb.ApplyRequest{
Changes: []pb.ApplyRequest_Change{
{Upsert: fixture.Changes[iter].Upsert, ExpectModRevision: 1},
},
})
iter++
return expected, nil
}
resp, err = ApplyJournalsInBatches(ctx, client, fixture, 1)
c.Check(err, gc.IsNil)
c.Check(resp, gc.DeepEquals, expected)

// Case: empty list of changes.
broker.ApplyFunc = func(ctx context.Context, req *pb.ApplyRequest) (*pb.ApplyResponse, error) {
c.Error("should not be called")
return nil, nil
}
resp, err = ApplyJournalsInBatches(ctx, client, &pb.ApplyRequest{}, 1)
c.Check(err, gc.IsNil)
c.Check(resp, gc.DeepEquals, &pb.ApplyResponse{})

// Case: Return Error from backend.
broker.ApplyFunc = func(ctx context.Context, req *pb.ApplyRequest) (*pb.ApplyResponse, error) {
return nil, errors.New("something has gone wrong")
}
resp, err = ApplyJournalsInBatches(ctx, client, fixture, 1)
c.Check(err, gc.ErrorMatches, "rpc error: code = Unknown desc = something has gone wrong")

// Case: Status !OK mapped as an error.
broker.ApplyFunc = func(ctx context.Context, req *pb.ApplyRequest) (*pb.ApplyResponse, error) {
return &pb.ApplyResponse{
Status: pb.Status_ETCD_TRANSACTION_FAILED,
Header: *hdr,
}, nil
}
resp, err = ApplyJournalsInBatches(ctx, client, fixture, 1)
c.Check(err.Error(), gc.Matches, pb.Status_ETCD_TRANSACTION_FAILED.String())

// Case: Validation error mapped as error.
broker.ApplyFunc = func(ctx context.Context, req *pb.ApplyRequest) (*pb.ApplyResponse, error) {
return &pb.ApplyResponse{
Status: pb.Status_ETCD_TRANSACTION_FAILED,
}, nil
}
resp, err = ApplyJournalsInBatches(ctx, client, fixture, 1)
c.Check(err, gc.ErrorMatches, `Header.Route: invalid Primary \(0; expected -1 <= Primary < 0\)`)
}

func buildApplyReqFixtue() *pb.ApplyRequest {
// Create a fixture of JournalSpecs which we'll list.
var fragSpec = pb.JournalSpec_Fragment{
Length: 1024,
RefreshInterval: time.Second,
CompressionCodec: pb.CompressionCodec_SNAPPY,
}
var specA = &pb.JournalSpec{
Name: "journal/1/A",
LabelSet: pb.MustLabelSet("foo", "bar"),
Replication: 1,
Fragment: fragSpec,
}
var specB = &pb.JournalSpec{
Name: "journal/2/B",
LabelSet: pb.MustLabelSet("bar", "baz"),
Replication: 1,
Fragment: fragSpec,
}

return &pb.ApplyRequest{
Changes: []pb.ApplyRequest_Change{
{Upsert: specA, ExpectModRevision: 1},
{Upsert: specB, ExpectModRevision: 1},
},
}
}

func buildListResponseFixture(names ...pb.Journal) (out []pb.ListResponse_Journal) {
for _, n := range names {
out = append(out, pb.ListResponse_Journal{
Expand Down
51 changes: 42 additions & 9 deletions v2/pkg/consumer/shard_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,16 +191,49 @@ func ListShards(ctx context.Context, sc ShardClient, req *ListRequest) (*ListRes
}
}

// ApplyShards invokes the Apply RPC, and maps a validation or !OK status to an error.
// ApplyShards invokes the Apply RPC.
func ApplyShards(ctx context.Context, sc ShardClient, req *ApplyRequest) (*ApplyResponse, error) {
if r, err := sc.Apply(pb.WithDispatchDefault(ctx), req, grpc.FailFast(false)); err != nil {
return r, err
} else if err = r.Validate(); err != nil {
return r, err
} else if r.Status != Status_OK {
return r, errors.New(r.Status.String())
} else {
return r, nil
return ApplyShardsInBatches(ctx, sc, req, 0)
}

// ApplyShardsInBatches applies changes to shards which
// may be larger than the configured etcd transaction size size. The changes in
// |req| will be sent serially in batches of size |size|. If
// |size| is 0 all changes will be attempted as part of a single
// transaction. This function will return the response of the final
// ShardClient.Apply call. Response validation or !OK status from Apply RPC are
// mapped to error.
func ApplyShardsInBatches(ctx context.Context, sc ShardClient, req *ApplyRequest, size int) (*ApplyResponse, error) {
if len(req.Changes) == 0 {
return &ApplyResponse{}, nil
}
if size == 0 {
size = len(req.Changes)
}
var curReq = &ApplyRequest{}
var offset = 0

for {
if len(req.Changes[offset:]) > size {
curReq.Changes = req.Changes[offset : offset+size]
} else {
curReq.Changes = req.Changes[offset:]
}

var resp *ApplyResponse
var err error
if resp, err = sc.Apply(pb.WithDispatchDefault(ctx), curReq, grpc.WaitForReady(true)); err != nil {
return resp, err
} else if err = resp.Validate(); err != nil {
return resp, err
} else if resp.Status != Status_OK {
return resp, errors.New(resp.Status.String())
}

offset = offset + len(curReq.Changes)
if offset == len(req.Changes) {
return resp, nil
}
}
}

Expand Down
Loading

0 comments on commit 08aa452

Please sign in to comment.