Skip to content

Commit

Permalink
Add max-txn-size flag to gazctl
Browse files Browse the repository at this point in the history
  • Loading branch information
Jacob Skelcy committed Feb 21, 2019
1 parent 99a79a0 commit 34e19ed
Show file tree
Hide file tree
Showing 9 changed files with 389 additions and 36 deletions.
20 changes: 18 additions & 2 deletions v2/cmd/gazctl/journals_apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
mbp "github.com/LiveRamp/gazette/v2/pkg/mainboilerplate"
pb "github.com/LiveRamp/gazette/v2/pkg/protocol"
"github.com/LiveRamp/gazette/v2/pkg/protocol/journalspace"
"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
"github.com/gogo/protobuf/proto"
log "github.com/sirupsen/logrus"
)
Expand All @@ -33,10 +34,13 @@ func (cmd *cmdJournalsApply) Execute([]string) error {
}

var ctx = context.Background()
resp, err := client.ApplyJournals(ctx, journalsCfg.Broker.JournalClient(ctx), req)
resp, err := client.ApplyJournalsLimit(ctx, journalsCfg.Broker.JournalClient(ctx), req, cmd.MaxTxnSize)
if err == rpctypes.ErrGRPCTooManyOps {
tooManyOpsPanic()
}
mbp.Must(err, "failed to apply journals")

log.WithField("rev", resp.Header.Etcd.Revision).Info("successfully applied")

return nil
}

Expand All @@ -59,3 +63,15 @@ func newJournalSpecApplyRequest(tree *journalspace.Node) *pb.ApplyRequest {
})
return req
}

func tooManyOpsPanic() {
log.Panic(`
This operation has generated more changes than are possible in a single etcd
transaction given the current server configation. Gazctl supports a
max transaction size flag (--max-txn-size) which will send the changes in
batches of at most the max transaction size, however this means a loss
of transactionality and should be used with caution. Instead it is recomended
that additional label selectors are used to limit the number of changes within
this operation.
`)
}
14 changes: 10 additions & 4 deletions v2/cmd/gazctl/journals_edit.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@ import (

"github.com/LiveRamp/gazette/v2/cmd/gazctl/editor"
"github.com/LiveRamp/gazette/v2/pkg/client"
mbp "github.com/LiveRamp/gazette/v2/pkg/mainboilerplate"
pb "github.com/LiveRamp/gazette/v2/pkg/protocol"
"github.com/LiveRamp/gazette/v2/pkg/protocol/journalspace"
"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
log "github.com/sirupsen/logrus"
"gopkg.in/yaml.v2"
)
Expand Down Expand Up @@ -54,11 +57,14 @@ func (cmd *cmdJournalsEdit) applySpecs(b []byte) error {
}

var ctx = context.Background()
if resp, err := client.ApplyJournals(ctx, journalsCfg.Broker.JournalClient(ctx), req); err != nil {
return err
} else {
log.WithField("rev", resp.Header.Etcd.Revision).Info("successfully applied")
var resp *pb.ApplyResponse
var err error
resp, err = client.ApplyJournalsLimit(ctx, journalsCfg.Broker.JournalClient(ctx), req, cmd.MaxTxnSize)
if err == rpctypes.ErrTooManyOps {
tooManyOpsPanic()
}
mbp.Must(err, "failed to apply journals")
log.WithField("rev", resp.Header.Etcd.Revision).Info("successfully applied")

return nil
}
9 changes: 6 additions & 3 deletions v2/cmd/gazctl/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,15 @@ type ListConfig struct {

// ApplyConfig is common configuration of apply operations.
type ApplyConfig struct {
SpecsPath string `long:"specs" description:"Path to specifications file to apply. Stdin is used if not set"`
DryRun bool `long:"dry-run" description:"Perform a dry-run of the apply"`
SpecsPath string `long:"specs" description:"Path to specifications file to apply. Stdin is used if not set"`
DryRun bool `long:"dry-run" description:"Perform a dry-run of the apply"`
MaxTxnSize int `long:"max-txn-size" short:"m" default:"0" description:"Maxium number of transactions to be processed within a single batch"`
}

// EditConfig is common configuration for exit operations.
type EditConfig struct {
Selector string `long:"selector" short:"l" required:"true" description:"Label Selector query to filter on" no-ini:"true"`
Selector string `long:"selector" short:"l" required:"true" description:"Label Selector query to filter on" no-ini:"true"`
MaxTxnSize int `long:"max-txn-size" short:"m" default:"0" description:"Maxium number of transactions to be processed within a single batch"`
}

func (cfg ApplyConfig) decode(into interface{}) error {
Expand Down
10 changes: 8 additions & 2 deletions v2/cmd/gazctl/shards_apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/LiveRamp/gazette/v2/pkg/consumer"
"github.com/LiveRamp/gazette/v2/pkg/consumer/shardspace"
mbp "github.com/LiveRamp/gazette/v2/pkg/mainboilerplate"
"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
"github.com/gogo/protobuf/proto"
log "github.com/sirupsen/logrus"
)
Expand All @@ -30,10 +31,15 @@ func (cmd *cmdShardsApply) Execute([]string) error {
}

var ctx = context.Background()
resp, err := consumer.ApplyShards(ctx, shardsCfg.Consumer.ShardClient(ctx), req)
var resp *consumer.ApplyResponse
var err error
resp, err = consumer.ApplyShardsLimit(ctx, shardsCfg.Consumer.ShardClient(ctx), req, cmd.MaxTxnSize)
if err == rpctypes.ErrTooManyOps {
tooManyOpsPanic()
}
mbp.Must(err, "failed to apply shards")

log.WithField("rev", resp.Header.Etcd.Revision).Info("successfully applied")

return nil
}

Expand Down
18 changes: 11 additions & 7 deletions v2/cmd/gazctl/shards_edit.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"github.com/LiveRamp/gazette/v2/cmd/gazctl/editor"
"github.com/LiveRamp/gazette/v2/pkg/consumer"
"github.com/LiveRamp/gazette/v2/pkg/consumer/shardspace"
mbp "github.com/LiveRamp/gazette/v2/pkg/mainboilerplate"
"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
log "github.com/sirupsen/logrus"
"gopkg.in/yaml.v2"
)
Expand All @@ -21,7 +23,7 @@ func (cmd *cmdShardsEdit) Execute([]string) error {
return editor.EditRetryLoop(editor.RetryLoopArgs{
FilePrefix: "gazctl-shards-edit-",
SelectFn: cmd.selectSpecs,
ApplyFn: applyShardSpecYAML,
ApplyFn: cmd.applyShardSpecYAML,
AbortIfUnchanged: true,
})
}
Expand All @@ -38,7 +40,7 @@ func (cmd *cmdShardsEdit) selectSpecs() io.Reader {
return buf
}

func applyShardSpecYAML(b []byte) error {
func (cmd *cmdShardsEdit) applyShardSpecYAML(b []byte) error {
var set shardspace.Set
if err := yaml.UnmarshalStrict(b, &set); err != nil {
return err
Expand All @@ -49,11 +51,13 @@ func applyShardSpecYAML(b []byte) error {
}

var ctx = context.Background()
if resp, err := consumer.ApplyShards(ctx, shardsCfg.Consumer.ShardClient(ctx), req); err != nil {
return err
} else {
log.WithField("rev", resp.Header.Etcd.Revision).Info("successfully applied")
var resp *consumer.ApplyResponse
var err error
resp, err = consumer.ApplyShardsLimit(ctx, shardsCfg.Consumer.ShardClient(ctx), req, cmd.MaxTxnSize)
if err == rpctypes.ErrGRPCTooManyOps {
tooManyOpsPanic()
}

mbp.Must(err, "failed to apply shards")
log.WithField("rev", resp.Header.Etcd.Revision).Info("successfully applied")
return nil
}
57 changes: 48 additions & 9 deletions v2/pkg/client/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,17 +94,56 @@ func ListAllJournals(ctx context.Context, client pb.JournalClient, req pb.ListRe
return resp, nil
}

// ApplyJournals invokes the Apply RPC, and maps a validation or !OK status to an error.
// ApplyJournals invokes the Apply RPC.
func ApplyJournals(ctx context.Context, jc pb.JournalClient, req *pb.ApplyRequest) (*pb.ApplyResponse, error) {
if r, err := jc.Apply(pb.WithDispatchDefault(ctx), req, grpc.FailFast(false)); err != nil {
return r, err
} else if err = r.Validate(); err != nil {
return r, err
} else if r.Status != pb.Status_OK {
return r, errors.New(r.Status.String())
} else {
return r, nil
return ApplyJournalsLimit(ctx, jc, req, 0)
}

// ApplyJournalsLimit is a helper function for applying changes to journals which
// may be larger than the configured etcd transaction size limit. The changes in
// |parentReq| will be sent serially in batches of size |maxTxnSize|. If
// |maxTxnSize| is 0 all changes will be attempted as part of a single
// transaction. This function will return the response of the final
// ShardClient.Apply call. Response validation or !OK status from Apply RPC are
// mapped to error. In the event of an error any unapplied changes will be
// available on |parentReq|.
func ApplyJournalsLimit(
ctx context.Context,
jc pb.JournalClient,
parentReq *pb.ApplyRequest,
maxTxnSize int,
) (*pb.ApplyResponse, error) {
var changes []pb.ApplyRequest_Change
if maxTxnSize == 0 {
maxTxnSize = len(parentReq.Changes)
}
var finalResp *pb.ApplyResponse

for len(parentReq.Changes) > 0 {
if len(parentReq.Changes) > maxTxnSize {
changes = parentReq.Changes[:maxTxnSize]
} else {
changes = parentReq.Changes
}

var req = &pb.ApplyRequest{}
for _, change := range changes {
req.Changes = append(req.Changes, change)
}

var resp *pb.ApplyResponse
var err error
if resp, err = jc.Apply(pb.WithDispatchDefault(ctx), req, grpc.FailFast(false)); err != nil {
return resp, err
} else if err = resp.Validate(); err != nil {
return resp, err
} else if resp.Status != pb.Status_OK {
return resp, errors.New(resp.Status.String())
}
finalResp = resp
parentReq.Changes = parentReq.Changes[len(changes):]
}
return finalResp, nil
}

// ListAllFragments performs multiple Fragments RPCs, as required to join across multiple
Expand Down
118 changes: 118 additions & 0 deletions v2/pkg/client/list_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,124 @@ func (s *ListSuite) TestListAllFragments(c *gc.C) {
c.Check(err, gc.ErrorMatches, `Status: invalid status \(1000\)`)
}

func (s *ListSuite) TestApplyJournalsLimit(c *gc.C) {
var ctx, cancel = context.WithCancel(context.Background())
defer cancel()

var broker = teststub.NewBroker(c, ctx)

var client = pb.NewRoutedJournalClient(broker.MustClient(), NewRouteCache(2, time.Hour))

var hdr = buildHeaderFixture(broker)
// Case: MxnTxnSize is 0. All changes are submitted.
var fixture = buildApplyReqFixtue()
var expected = &pb.ApplyResponse{
Status: pb.Status_OK,
Header: *hdr,
}
broker.ApplyFunc = func(ctx context.Context, req *pb.ApplyRequest) (*pb.ApplyResponse, error) {
c.Check(req, gc.DeepEquals, fixture)
return expected, nil
}
resp, err := ApplyJournalsLimit(ctx, client, fixture, 0)
c.Check(err, gc.IsNil)
c.Check(resp, gc.DeepEquals, expected)
c.Check(len(fixture.Changes), gc.Equals, 0)

// Case: MxnTxnSize == len(req.Changes). All changes are submitted.
fixture = buildApplyReqFixtue()
resp, err = ApplyJournalsLimit(ctx, client, fixture, 3)
c.Check(err, gc.IsNil)
c.Check(resp, gc.DeepEquals, expected)
c.Check(len(fixture.Changes), gc.Equals, 0)

// Case: MxnTxnSize < len(req.Changes). Changes are batched.
fixture = buildApplyReqFixtue()
broker.ApplyFunc = func(ctx context.Context, req *pb.ApplyRequest) (*pb.ApplyResponse, error) {
c.Check(req, gc.DeepEquals, &pb.ApplyRequest{
Changes: []pb.ApplyRequest_Change{
{Upsert: fixture.Changes[0].Upsert, ExpectModRevision: 1},
},
})
return expected, nil
}
resp, err = ApplyJournalsLimit(ctx, client, fixture, 1)
c.Check(err, gc.IsNil)
c.Check(resp, gc.DeepEquals, expected)
c.Check(len(fixture.Changes), gc.Equals, 0)

// Case: MxnTxnSize == 0.
fixture = &pb.ApplyRequest{}
broker.ApplyFunc = func(ctx context.Context, req *pb.ApplyRequest) (*pb.ApplyResponse, error) {
c.Error("should not be called")
return nil, nil
}
resp, err = ApplyJournalsLimit(ctx, client, &pb.ApplyRequest{}, 1)
c.Check(err, gc.IsNil)
c.Check(resp, gc.IsNil)
c.Check(len(fixture.Changes), gc.Equals, 0)

// Case: Return Error from backend.
fixture = buildApplyReqFixtue()
broker.ApplyFunc = func(ctx context.Context, req *pb.ApplyRequest) (*pb.ApplyResponse, error) {
return nil, errors.New("something has gone wrong")
}
resp, err = ApplyJournalsLimit(ctx, client, fixture, 1)
c.Check(err, gc.ErrorMatches, "rpc error: code = Unknown desc = something has gone wrong")
c.Check(len(fixture.Changes), gc.Equals, 2)

// Case: Status !OK mapped as an error.
fixture = buildApplyReqFixtue()
broker.ApplyFunc = func(ctx context.Context, req *pb.ApplyRequest) (*pb.ApplyResponse, error) {
return &pb.ApplyResponse{
Status: pb.Status_ETCD_TRANSACTION_FAILED,
Header: *hdr,
}, nil
}
resp, err = ApplyJournalsLimit(ctx, client, fixture, 1)
c.Check(err.Error(), gc.Matches, pb.Status_ETCD_TRANSACTION_FAILED.String())
c.Check(len(fixture.Changes), gc.Equals, 2)

// Case: Validation error mapped as error.
fixture = buildApplyReqFixtue()
broker.ApplyFunc = func(ctx context.Context, req *pb.ApplyRequest) (*pb.ApplyResponse, error) {
return &pb.ApplyResponse{
Status: pb.Status_ETCD_TRANSACTION_FAILED,
}, nil
}
resp, err = ApplyJournalsLimit(ctx, client, fixture, 1)
c.Check(err, gc.ErrorMatches, `Header.Route: invalid Primary \(0; expected -1 <= Primary < 0\)`)
c.Check(len(fixture.Changes), gc.Equals, 2)
}

func buildApplyReqFixtue() *pb.ApplyRequest {
// Create a fixture of JournalSpecs which we'll list.
var fragSpec = pb.JournalSpec_Fragment{
Length: 1024,
RefreshInterval: time.Second,
CompressionCodec: pb.CompressionCodec_SNAPPY,
}
var specA = &pb.JournalSpec{
Name: "journal/1/A",
LabelSet: pb.MustLabelSet("foo", "bar"),
Replication: 1,
Fragment: fragSpec,
}
var specB = &pb.JournalSpec{
Name: "journal/2/B",
LabelSet: pb.MustLabelSet("bar", "baz"),
Replication: 1,
Fragment: fragSpec,
}

return &pb.ApplyRequest{
Changes: []pb.ApplyRequest_Change{
{Upsert: specA, ExpectModRevision: 1},
{Upsert: specB, ExpectModRevision: 1},
},
}
}

func buildListResponseFixture(names ...pb.Journal) (out []pb.ListResponse_Journal) {
for _, n := range names {
out = append(out, pb.ListResponse_Journal{
Expand Down
Loading

0 comments on commit 34e19ed

Please sign in to comment.