Skip to content

Commit

Permalink
Add max-txn-size flag to gazctl
Browse files Browse the repository at this point in the history
  • Loading branch information
Jacob Skelcy committed Feb 25, 2019
1 parent eab0734 commit 755e9d2
Show file tree
Hide file tree
Showing 9 changed files with 356 additions and 36 deletions.
20 changes: 18 additions & 2 deletions v2/cmd/gazctl/journals_apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
mbp "github.com/LiveRamp/gazette/v2/pkg/mainboilerplate"
pb "github.com/LiveRamp/gazette/v2/pkg/protocol"
"github.com/LiveRamp/gazette/v2/pkg/protocol/journalspace"
"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
"github.com/gogo/protobuf/proto"
log "github.com/sirupsen/logrus"
)
Expand All @@ -33,10 +34,13 @@ func (cmd *cmdJournalsApply) Execute([]string) error {
}

var ctx = context.Background()
resp, err := client.ApplyJournals(ctx, journalsCfg.Broker.JournalClient(ctx), req)
resp, err := client.ApplyJournalsInBatches(ctx, journalsCfg.Broker.JournalClient(ctx), req, cmd.MaxTxnSize)
if err == rpctypes.ErrGRPCTooManyOps {
tooManyOpsPanic()
}
mbp.Must(err, "failed to apply journals")

log.WithField("rev", resp.Header.Etcd.Revision).Info("successfully applied")

return nil
}

Expand All @@ -59,3 +63,15 @@ func newJournalSpecApplyRequest(tree *journalspace.Node) *pb.ApplyRequest {
})
return req
}

func tooManyOpsPanic() {
log.Panic(`
This operation has generated more changes than are possible in a single etcd
transaction given the current server configation. Gazctl supports a
max transaction size flag (--max-txn-size) which will send the changes in
batches of at most the max transaction size, however this means a loss
of transactionality and should be used with caution. Instead it is recomended
that additional label selectors are used to limit the number of changes within
this operation.
`)
}
14 changes: 10 additions & 4 deletions v2/cmd/gazctl/journals_edit.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@ import (

"github.com/LiveRamp/gazette/v2/cmd/gazctl/editor"
"github.com/LiveRamp/gazette/v2/pkg/client"
mbp "github.com/LiveRamp/gazette/v2/pkg/mainboilerplate"
pb "github.com/LiveRamp/gazette/v2/pkg/protocol"
"github.com/LiveRamp/gazette/v2/pkg/protocol/journalspace"
"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
log "github.com/sirupsen/logrus"
"gopkg.in/yaml.v2"
)
Expand Down Expand Up @@ -54,11 +57,14 @@ func (cmd *cmdJournalsEdit) applySpecs(b []byte) error {
}

var ctx = context.Background()
if resp, err := client.ApplyJournals(ctx, journalsCfg.Broker.JournalClient(ctx), req); err != nil {
return err
} else {
log.WithField("rev", resp.Header.Etcd.Revision).Info("successfully applied")
var resp *pb.ApplyResponse
var err error
resp, err = client.ApplyJournalsInBatches(ctx, journalsCfg.Broker.JournalClient(ctx), req, cmd.MaxTxnSize)
if err == rpctypes.ErrTooManyOps {
tooManyOpsPanic()
}
mbp.Must(err, "failed to apply journals")
log.WithField("rev", resp.Header.Etcd.Revision).Info("successfully applied")

return nil
}
9 changes: 6 additions & 3 deletions v2/cmd/gazctl/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,15 @@ type ListConfig struct {

// ApplyConfig is common configuration of apply operations.
type ApplyConfig struct {
SpecsPath string `long:"specs" description:"Path to specifications file to apply. Stdin is used if not set"`
DryRun bool `long:"dry-run" description:"Perform a dry-run of the apply"`
SpecsPath string `long:"specs" description:"Path to specifications file to apply. Stdin is used if not set"`
DryRun bool `long:"dry-run" description:"Perform a dry-run of the apply"`
MaxTxnSize int `long:"max-txn-size" short:"m" default:"0" description:"maximum number of specs to be processed within an apply transaction. If 0, the default, all changes are issued in a single transaction"`
}

// EditConfig is common configuration for exit operations.
type EditConfig struct {
Selector string `long:"selector" short:"l" required:"true" description:"Label Selector query to filter on" no-ini:"true"`
Selector string `long:"selector" short:"l" required:"true" description:"Label Selector query to filter on" no-ini:"true"`
MaxTxnSize int `long:"max-txn-size" short:"m" default:"0" description:"maximum number of specs to be processed within an apply transaction. If 0, the default, all changes are issued in a single transaction"`
}

func (cfg ApplyConfig) decode(into interface{}) error {
Expand Down
10 changes: 8 additions & 2 deletions v2/cmd/gazctl/shards_apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/LiveRamp/gazette/v2/pkg/consumer"
"github.com/LiveRamp/gazette/v2/pkg/consumer/shardspace"
mbp "github.com/LiveRamp/gazette/v2/pkg/mainboilerplate"
"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
"github.com/gogo/protobuf/proto"
log "github.com/sirupsen/logrus"
)
Expand All @@ -30,10 +31,15 @@ func (cmd *cmdShardsApply) Execute([]string) error {
}

var ctx = context.Background()
resp, err := consumer.ApplyShards(ctx, shardsCfg.Consumer.ShardClient(ctx), req)
var resp *consumer.ApplyResponse
var err error
resp, err = consumer.ApplyShardsInBatches(ctx, shardsCfg.Consumer.ShardClient(ctx), req, cmd.MaxTxnSize)
if err == rpctypes.ErrTooManyOps {
tooManyOpsPanic()
}
mbp.Must(err, "failed to apply shards")

log.WithField("rev", resp.Header.Etcd.Revision).Info("successfully applied")

return nil
}

Expand Down
18 changes: 11 additions & 7 deletions v2/cmd/gazctl/shards_edit.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"github.com/LiveRamp/gazette/v2/cmd/gazctl/editor"
"github.com/LiveRamp/gazette/v2/pkg/consumer"
"github.com/LiveRamp/gazette/v2/pkg/consumer/shardspace"
mbp "github.com/LiveRamp/gazette/v2/pkg/mainboilerplate"
"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
log "github.com/sirupsen/logrus"
"gopkg.in/yaml.v2"
)
Expand All @@ -21,7 +23,7 @@ func (cmd *cmdShardsEdit) Execute([]string) error {
return editor.EditRetryLoop(editor.RetryLoopArgs{
FilePrefix: "gazctl-shards-edit-",
SelectFn: cmd.selectSpecs,
ApplyFn: applyShardSpecYAML,
ApplyFn: cmd.applyShardSpecYAML,
AbortIfUnchanged: true,
})
}
Expand All @@ -38,7 +40,7 @@ func (cmd *cmdShardsEdit) selectSpecs() io.Reader {
return buf
}

func applyShardSpecYAML(b []byte) error {
func (cmd *cmdShardsEdit) applyShardSpecYAML(b []byte) error {
var set shardspace.Set
if err := yaml.UnmarshalStrict(b, &set); err != nil {
return err
Expand All @@ -49,11 +51,13 @@ func applyShardSpecYAML(b []byte) error {
}

var ctx = context.Background()
if resp, err := consumer.ApplyShards(ctx, shardsCfg.Consumer.ShardClient(ctx), req); err != nil {
return err
} else {
log.WithField("rev", resp.Header.Etcd.Revision).Info("successfully applied")
var resp *consumer.ApplyResponse
var err error
resp, err = consumer.ApplyShardsInBatches(ctx, shardsCfg.Consumer.ShardClient(ctx), req, cmd.MaxTxnSize)
if err == rpctypes.ErrGRPCTooManyOps {
tooManyOpsPanic()
}

mbp.Must(err, "failed to apply shards")
log.WithField("rev", resp.Header.Etcd.Revision).Info("successfully applied")
return nil
}
51 changes: 42 additions & 9 deletions v2/pkg/client/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,16 +94,49 @@ func ListAllJournals(ctx context.Context, client pb.JournalClient, req pb.ListRe
return resp, nil
}

// ApplyJournals invokes the Apply RPC, and maps a validation or !OK status to an error.
// ApplyJournals invokes the Apply RPC.
func ApplyJournals(ctx context.Context, jc pb.JournalClient, req *pb.ApplyRequest) (*pb.ApplyResponse, error) {
if r, err := jc.Apply(pb.WithDispatchDefault(ctx), req, grpc.FailFast(false)); err != nil {
return r, err
} else if err = r.Validate(); err != nil {
return r, err
} else if r.Status != pb.Status_OK {
return r, errors.New(r.Status.String())
} else {
return r, nil
return ApplyJournalsInBatches(ctx, jc, req, 0)
}

// ApplyJournalsInBatches is a helper function for applying changes to journals which
// may be larger than the configured etcd transaction size size. The changes in
// |req| will be sent serially in batches of size |size|. If
// |size| is 0 all changes will be attempted as part of a single
// transaction. This function will return the response of the final
// ShardClient.Apply call. Response validation or !OK status from Apply RPC are
// mapped to error.
func ApplyJournalsInBatches(ctx context.Context, jc pb.JournalClient, req *pb.ApplyRequest, size int) (*pb.ApplyResponse, error) {
if len(req.Changes) == 0 {
return &pb.ApplyResponse{}, nil
}
if size == 0 {
size = len(req.Changes)
}
var curReq = &pb.ApplyRequest{}
var offset = 0

for {
if len(req.Changes[offset:]) > size {
curReq.Changes = req.Changes[offset : offset+size]
} else {
curReq.Changes = req.Changes[offset:]
}

var resp *pb.ApplyResponse
var err error
if resp, err = jc.Apply(pb.WithDispatchDefault(ctx), curReq, grpc.FailFast(false)); err != nil {
return resp, err
} else if err = resp.Validate(); err != nil {
return resp, err
} else if resp.Status != pb.Status_OK {
return resp, errors.New(resp.Status.String())
}

offset = offset + len(curReq.Changes)
if offset == len(req.Changes) {
return resp, nil
}
}
}

Expand Down
107 changes: 107 additions & 0 deletions v2/pkg/client/list_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,113 @@ func (s *ListSuite) TestListAllFragments(c *gc.C) {
c.Check(err, gc.ErrorMatches, `Status: invalid status \(1000\)`)
}

func (s *ListSuite) TestApplyJournalsInBatches(c *gc.C) {
var ctx, cancel = context.WithCancel(context.Background())
defer cancel()

var broker = teststub.NewBroker(c, ctx)

var client = pb.NewRoutedJournalClient(broker.MustClient(), NewRouteCache(2, time.Hour))

var hdr = buildHeaderFixture(broker)
// Case: size is 0. All changes are submitted.
var fixture = buildApplyReqFixtue()
var expected = &pb.ApplyResponse{
Status: pb.Status_OK,
Header: *hdr,
}
broker.ApplyFunc = func(ctx context.Context, req *pb.ApplyRequest) (*pb.ApplyResponse, error) {
c.Check(req, gc.DeepEquals, fixture)
return expected, nil
}
resp, err := ApplyJournalsInBatches(ctx, client, fixture, 0)
c.Check(err, gc.IsNil)
c.Check(resp, gc.DeepEquals, expected)

// Case: size == len(req.Changes). All changes are submitted.
resp, err = ApplyJournalsInBatches(ctx, client, fixture, 3)
c.Check(err, gc.IsNil)
c.Check(resp, gc.DeepEquals, expected)

// Case: size < len(req.Changes). Changes are batched.
var iter = 0
broker.ApplyFunc = func(ctx context.Context, req *pb.ApplyRequest) (*pb.ApplyResponse, error) {
c.Check(req, gc.DeepEquals, &pb.ApplyRequest{
Changes: []pb.ApplyRequest_Change{
{Upsert: fixture.Changes[iter].Upsert, ExpectModRevision: 1},
},
})
iter++
return expected, nil
}
resp, err = ApplyJournalsInBatches(ctx, client, fixture, 1)
c.Check(err, gc.IsNil)
c.Check(resp, gc.DeepEquals, expected)

// Case: empty list of changes.
broker.ApplyFunc = func(ctx context.Context, req *pb.ApplyRequest) (*pb.ApplyResponse, error) {
c.Error("should not be called")
return nil, nil
}
resp, err = ApplyJournalsInBatches(ctx, client, &pb.ApplyRequest{}, 1)
c.Check(err, gc.IsNil)
c.Check(resp, gc.DeepEquals, &pb.ApplyResponse{})

// Case: Return Error from backend.
broker.ApplyFunc = func(ctx context.Context, req *pb.ApplyRequest) (*pb.ApplyResponse, error) {
return nil, errors.New("something has gone wrong")
}
resp, err = ApplyJournalsInBatches(ctx, client, fixture, 1)
c.Check(err, gc.ErrorMatches, "rpc error: code = Unknown desc = something has gone wrong")

// Case: Status !OK mapped as an error.
broker.ApplyFunc = func(ctx context.Context, req *pb.ApplyRequest) (*pb.ApplyResponse, error) {
return &pb.ApplyResponse{
Status: pb.Status_ETCD_TRANSACTION_FAILED,
Header: *hdr,
}, nil
}
resp, err = ApplyJournalsInBatches(ctx, client, fixture, 1)
c.Check(err.Error(), gc.Matches, pb.Status_ETCD_TRANSACTION_FAILED.String())

// Case: Validation error mapped as error.
broker.ApplyFunc = func(ctx context.Context, req *pb.ApplyRequest) (*pb.ApplyResponse, error) {
return &pb.ApplyResponse{
Status: pb.Status_ETCD_TRANSACTION_FAILED,
}, nil
}
resp, err = ApplyJournalsInBatches(ctx, client, fixture, 1)
c.Check(err, gc.ErrorMatches, `Header.Route: invalid Primary \(0; expected -1 <= Primary < 0\)`)
}

func buildApplyReqFixtue() *pb.ApplyRequest {
// Create a fixture of JournalSpecs which we'll list.
var fragSpec = pb.JournalSpec_Fragment{
Length: 1024,
RefreshInterval: time.Second,
CompressionCodec: pb.CompressionCodec_SNAPPY,
}
var specA = &pb.JournalSpec{
Name: "journal/1/A",
LabelSet: pb.MustLabelSet("foo", "bar"),
Replication: 1,
Fragment: fragSpec,
}
var specB = &pb.JournalSpec{
Name: "journal/2/B",
LabelSet: pb.MustLabelSet("bar", "baz"),
Replication: 1,
Fragment: fragSpec,
}

return &pb.ApplyRequest{
Changes: []pb.ApplyRequest_Change{
{Upsert: specA, ExpectModRevision: 1},
{Upsert: specB, ExpectModRevision: 1},
},
}
}

func buildListResponseFixture(names ...pb.Journal) (out []pb.ListResponse_Journal) {
for _, n := range names {
out = append(out, pb.ListResponse_Journal{
Expand Down
51 changes: 42 additions & 9 deletions v2/pkg/consumer/shard_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,16 +191,49 @@ func ListShards(ctx context.Context, sc ShardClient, req *ListRequest) (*ListRes
}
}

// ApplyShards invokes the Apply RPC, and maps a validation or !OK status to an error.
// ApplyShards invokes the Apply RPC.
func ApplyShards(ctx context.Context, sc ShardClient, req *ApplyRequest) (*ApplyResponse, error) {
if r, err := sc.Apply(pb.WithDispatchDefault(ctx), req, grpc.FailFast(false)); err != nil {
return r, err
} else if err = r.Validate(); err != nil {
return r, err
} else if r.Status != Status_OK {
return r, errors.New(r.Status.String())
} else {
return r, nil
return ApplyShardsInBatches(ctx, sc, req, 0)
}

// ApplyShardsInBatches is a helper function for applying changes to shards which
// may be larger than the configured etcd transaction size size. The changes in
// |req| will be sent serially in batches of size |size|. If
// |size| is 0 all changes will be attempted as part of a single
// transaction. This function will return the response of the final
// ShardClient.Apply call. Response validation or !OK status from Apply RPC are
// mapped to error.
func ApplyShardsInBatches(ctx context.Context, sc ShardClient, req *ApplyRequest, size int) (*ApplyResponse, error) {
if len(req.Changes) == 0 {
return &ApplyResponse{}, nil
}
if size == 0 {
size = len(req.Changes)
}
var curReq = &ApplyRequest{}
var offset = 0

for {
if len(req.Changes[offset:]) > size {
curReq.Changes = req.Changes[offset : offset+size]
} else {
curReq.Changes = req.Changes[offset:]
}

var resp *ApplyResponse
var err error
if resp, err = sc.Apply(pb.WithDispatchDefault(ctx), curReq, grpc.FailFast(false)); err != nil {
return resp, err
} else if err = resp.Validate(); err != nil {
return resp, err
} else if resp.Status != Status_OK {
return resp, errors.New(resp.Status.String())
}

offset = offset + len(curReq.Changes)
if offset == len(req.Changes) {
return resp, nil
}
}
}

Expand Down
Loading

0 comments on commit 755e9d2

Please sign in to comment.