Skip to content

Commit

Permalink
Add max-txn-size flag to gazctl
Browse files Browse the repository at this point in the history
  • Loading branch information
Jacob Skelcy committed Feb 21, 2019
1 parent 99a79a0 commit ae1aede
Show file tree
Hide file tree
Showing 9 changed files with 363 additions and 38 deletions.
4 changes: 2 additions & 2 deletions v2/cmd/gazctl/journals_apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,10 @@ func (cmd *cmdJournalsApply) Execute([]string) error {
}

var ctx = context.Background()
resp, err := client.ApplyJournals(ctx, journalsCfg.Broker.JournalClient(ctx), req)
resp, err := client.ApplyJournalsLimit(ctx, journalsCfg.Broker.JournalClient(ctx), req, cmd.MaxTxnSize)
mbp.Must(err, "failed to apply journals")

log.WithField("rev", resp.Header.Etcd.Revision).Info("successfully applied")

return nil
}

Expand Down
12 changes: 7 additions & 5 deletions v2/cmd/gazctl/journals_edit.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (

"github.com/LiveRamp/gazette/v2/cmd/gazctl/editor"
"github.com/LiveRamp/gazette/v2/pkg/client"
mbp "github.com/LiveRamp/gazette/v2/pkg/mainboilerplate"
pb "github.com/LiveRamp/gazette/v2/pkg/protocol"
"github.com/LiveRamp/gazette/v2/pkg/protocol/journalspace"
log "github.com/sirupsen/logrus"
"gopkg.in/yaml.v2"
Expand Down Expand Up @@ -54,11 +56,11 @@ func (cmd *cmdJournalsEdit) applySpecs(b []byte) error {
}

var ctx = context.Background()
if resp, err := client.ApplyJournals(ctx, journalsCfg.Broker.JournalClient(ctx), req); err != nil {
return err
} else {
log.WithField("rev", resp.Header.Etcd.Revision).Info("successfully applied")
}
var resp *pb.ApplyResponse
var err error
resp, err = client.ApplyJournalsLimit(ctx, journalsCfg.Broker.JournalClient(ctx), req, cmd.MaxTxnSize)
mbp.Must(err, "failed to apply journals")
log.WithField("rev", resp.Header.Etcd.Revision).Info("successfully applied")

return nil
}
9 changes: 6 additions & 3 deletions v2/cmd/gazctl/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,15 @@ type ListConfig struct {

// ApplyConfig is common configuration of apply operations.
type ApplyConfig struct {
SpecsPath string `long:"specs" description:"Path to specifications file to apply. Stdin is used if not set"`
DryRun bool `long:"dry-run" description:"Perform a dry-run of the apply"`
SpecsPath string `long:"specs" description:"Path to specifications file to apply. Stdin is used if not set"`
DryRun bool `long:"dry-run" description:"Perform a dry-run of the apply"`
MaxTxnSize int `long:"max-txn-size" short:"m" default:"0" description:"Maxium number of transactions to be processed within a single change batch"`
}

// EditConfig is common configuration for exit operations.
type EditConfig struct {
Selector string `long:"selector" short:"l" required:"true" description:"Label Selector query to filter on" no-ini:"true"`
Selector string `long:"selector" short:"l" required:"true" description:"Label Selector query to filter on" no-ini:"true"`
MaxTxnSize int `long:"max-txn-size" short:"m" default:"0" description:"Maxium number of transactions to be processed within a single change batch"`
}

func (cfg ApplyConfig) decode(into interface{}) error {
Expand Down
6 changes: 4 additions & 2 deletions v2/cmd/gazctl/shards_apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,12 @@ func (cmd *cmdShardsApply) Execute([]string) error {
}

var ctx = context.Background()
resp, err := consumer.ApplyShards(ctx, shardsCfg.Consumer.ShardClient(ctx), req)
var resp *consumer.ApplyResponse
var err error
resp, err = consumer.ApplyShardsLimit(ctx, shardsCfg.Consumer.ShardClient(ctx), req, cmd.MaxTxnSize)
mbp.Must(err, "failed to apply shards")

log.WithField("rev", resp.Header.Etcd.Revision).Info("successfully applied")

return nil
}

Expand Down
16 changes: 8 additions & 8 deletions v2/cmd/gazctl/shards_edit.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/LiveRamp/gazette/v2/cmd/gazctl/editor"
"github.com/LiveRamp/gazette/v2/pkg/consumer"
"github.com/LiveRamp/gazette/v2/pkg/consumer/shardspace"
mbp "github.com/LiveRamp/gazette/v2/pkg/mainboilerplate"
log "github.com/sirupsen/logrus"
"gopkg.in/yaml.v2"
)
Expand All @@ -21,7 +22,7 @@ func (cmd *cmdShardsEdit) Execute([]string) error {
return editor.EditRetryLoop(editor.RetryLoopArgs{
FilePrefix: "gazctl-shards-edit-",
SelectFn: cmd.selectSpecs,
ApplyFn: applyShardSpecYAML,
ApplyFn: cmd.applyShardSpecYAML,
AbortIfUnchanged: true,
})
}
Expand All @@ -38,7 +39,7 @@ func (cmd *cmdShardsEdit) selectSpecs() io.Reader {
return buf
}

func applyShardSpecYAML(b []byte) error {
func (cmd *cmdShardsEdit) applyShardSpecYAML(b []byte) error {
var set shardspace.Set
if err := yaml.UnmarshalStrict(b, &set); err != nil {
return err
Expand All @@ -49,11 +50,10 @@ func applyShardSpecYAML(b []byte) error {
}

var ctx = context.Background()
if resp, err := consumer.ApplyShards(ctx, shardsCfg.Consumer.ShardClient(ctx), req); err != nil {
return err
} else {
log.WithField("rev", resp.Header.Etcd.Revision).Info("successfully applied")
}

var resp *consumer.ApplyResponse
var err error
resp, err = consumer.ApplyShardsLimit(ctx, shardsCfg.Consumer.ShardClient(ctx), req, cmd.MaxTxnSize)
mbp.Must(err, "failed to apply shards")
log.WithField("rev", resp.Header.Etcd.Revision).Info("successfully applied")
return nil
}
57 changes: 48 additions & 9 deletions v2/pkg/client/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,17 +94,56 @@ func ListAllJournals(ctx context.Context, client pb.JournalClient, req pb.ListRe
return resp, nil
}

// ApplyJournals invokes the Apply RPC, and maps a validation or !OK status to an error.
// ApplyJournals invokes the Apply RPC.
func ApplyJournals(ctx context.Context, jc pb.JournalClient, req *pb.ApplyRequest) (*pb.ApplyResponse, error) {
if r, err := jc.Apply(pb.WithDispatchDefault(ctx), req, grpc.FailFast(false)); err != nil {
return r, err
} else if err = r.Validate(); err != nil {
return r, err
} else if r.Status != pb.Status_OK {
return r, errors.New(r.Status.String())
} else {
return r, nil
return ApplyJournalsLimit(ctx, jc, req, 0)
}

// ApplyJournalsLimit is a helper function for applying changes to journals
// which may be larger than the configured etcd transaction size limit. The
// changes in |parentReq| will be batched with a size of |maxTxnSize| and
// sent serially. If |maxTxnSize| is 0 all changes will be attempted as
// part of a single transaction. This function will return the response of
// the final JournalsClient.Apply call. Response validation or !OK status
// from Apply RPC are mapped to error. In the event of an error any
// unapplied changes will be available on |parentReq|.
func ApplyJournalsLimit(
ctx context.Context,
jc pb.JournalClient,
parentReq *pb.ApplyRequest,
maxTxnSize int,
) (*pb.ApplyResponse, error) {
var changes []pb.ApplyRequest_Change
if maxTxnSize == 0 {
maxTxnSize = len(parentReq.Changes)
}
var finalResp *pb.ApplyResponse

for len(parentReq.Changes) > 0 {
if len(parentReq.Changes) > maxTxnSize {
changes = parentReq.Changes[:maxTxnSize]
} else {
changes = parentReq.Changes
}

var req = &pb.ApplyRequest{}
for _, change := range changes {
req.Changes = append(req.Changes, change)
}

var resp *pb.ApplyResponse
var err error
if resp, err = jc.Apply(pb.WithDispatchDefault(ctx), req, grpc.FailFast(false)); err != nil {
return resp, err
} else if err = resp.Validate(); err != nil {
return resp, err
} else if resp.Status != pb.Status_OK {
return resp, errors.New(resp.Status.String())
}
finalResp = resp
parentReq.Changes = parentReq.Changes[len(changes):]
}
return finalResp, nil
}

// ListAllFragments performs multiple Fragments RPCs, as required to join across multiple
Expand Down
118 changes: 118 additions & 0 deletions v2/pkg/client/list_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,124 @@ func (s *ListSuite) TestListAllFragments(c *gc.C) {
c.Check(err, gc.ErrorMatches, `Status: invalid status \(1000\)`)
}

func (s *ListSuite) TestApplyJournalsLimit(c *gc.C) {
var ctx, cancel = context.WithCancel(context.Background())
defer cancel()

var broker = teststub.NewBroker(c, ctx)

var client = pb.NewRoutedJournalClient(broker.MustClient(), NewRouteCache(2, time.Hour))

var hdr = buildHeaderFixture(broker)
// Case: MxnTxnSize is 0. All changes are submitted.
var fixture = buildApplyReqFixtue()
var expected = &pb.ApplyResponse{
Status: pb.Status_OK,
Header: *hdr,
}
broker.ApplyFunc = func(ctx context.Context, req *pb.ApplyRequest) (*pb.ApplyResponse, error) {
c.Check(req, gc.DeepEquals, fixture)
return expected, nil
}
resp, err := ApplyJournalsLimit(ctx, client, fixture, 0)
c.Check(err, gc.IsNil)
c.Check(resp, gc.DeepEquals, expected)
c.Check(len(fixture.Changes), gc.Equals, 0)

// Case: MxnTxnSize == len(req.Changes). All changes are submitted.
fixture = buildApplyReqFixtue()
resp, err = ApplyJournalsLimit(ctx, client, fixture, 3)
c.Check(err, gc.IsNil)
c.Check(resp, gc.DeepEquals, expected)
c.Check(len(fixture.Changes), gc.Equals, 0)

// Case: MxnTxnSize < len(req.Changes). Changes are batched.
fixture = buildApplyReqFixtue()
broker.ApplyFunc = func(ctx context.Context, req *pb.ApplyRequest) (*pb.ApplyResponse, error) {
c.Check(req, gc.DeepEquals, &pb.ApplyRequest{
Changes: []pb.ApplyRequest_Change{
{Upsert: fixture.Changes[0].Upsert, ExpectModRevision: 1},
},
})
return expected, nil
}
resp, err = ApplyJournalsLimit(ctx, client, fixture, 1)
c.Check(err, gc.IsNil)
c.Check(resp, gc.DeepEquals, expected)
c.Check(len(fixture.Changes), gc.Equals, 0)

// Case: MxnTxnSize == 0.
fixture = &pb.ApplyRequest{}
broker.ApplyFunc = func(ctx context.Context, req *pb.ApplyRequest) (*pb.ApplyResponse, error) {
c.Error("should not be called")
return nil, nil
}
resp, err = ApplyJournalsLimit(ctx, client, &pb.ApplyRequest{}, 1)
c.Check(err, gc.IsNil)
c.Check(resp, gc.IsNil)
c.Check(len(fixture.Changes), gc.Equals, 0)

// Case: Return Error from backend.
fixture = buildApplyReqFixtue()
broker.ApplyFunc = func(ctx context.Context, req *pb.ApplyRequest) (*pb.ApplyResponse, error) {
return nil, errors.New("something has gone wrong")
}
resp, err = ApplyJournalsLimit(ctx, client, fixture, 1)
c.Check(err, gc.ErrorMatches, "rpc error: code = Unknown desc = something has gone wrong")
c.Check(len(fixture.Changes), gc.Equals, 2)

// Case: Status !OK mapped as an error.
fixture = buildApplyReqFixtue()
broker.ApplyFunc = func(ctx context.Context, req *pb.ApplyRequest) (*pb.ApplyResponse, error) {
return &pb.ApplyResponse{
Status: pb.Status_ETCD_TRANSACTION_FAILED,
Header: *hdr,
}, nil
}
resp, err = ApplyJournalsLimit(ctx, client, fixture, 1)
c.Check(err.Error(), gc.Matches, pb.Status_ETCD_TRANSACTION_FAILED.String())
c.Check(len(fixture.Changes), gc.Equals, 2)

// Case: Validation error mapped as error.
fixture = buildApplyReqFixtue()
broker.ApplyFunc = func(ctx context.Context, req *pb.ApplyRequest) (*pb.ApplyResponse, error) {
return &pb.ApplyResponse{
Status: pb.Status_ETCD_TRANSACTION_FAILED,
}, nil
}
resp, err = ApplyJournalsLimit(ctx, client, fixture, 1)
c.Check(err, gc.ErrorMatches, `Header.Route: invalid Primary \(0; expected -1 <= Primary < 0\)`)
c.Check(len(fixture.Changes), gc.Equals, 2)
}

func buildApplyReqFixtue() *pb.ApplyRequest {
// Create a fixture of JournalSpecs which we'll list.
var fragSpec = pb.JournalSpec_Fragment{
Length: 1024,
RefreshInterval: time.Second,
CompressionCodec: pb.CompressionCodec_SNAPPY,
}
var specA = &pb.JournalSpec{
Name: "journal/1/A",
LabelSet: pb.MustLabelSet("foo", "bar"),
Replication: 1,
Fragment: fragSpec,
}
var specB = &pb.JournalSpec{
Name: "journal/2/B",
LabelSet: pb.MustLabelSet("bar", "baz"),
Replication: 1,
Fragment: fragSpec,
}

return &pb.ApplyRequest{
Changes: []pb.ApplyRequest_Change{
{Upsert: specA, ExpectModRevision: 1},
{Upsert: specB, ExpectModRevision: 1},
},
}
}

func buildListResponseFixture(names ...pb.Journal) (out []pb.ListResponse_Journal) {
for _, n := range names {
out = append(out, pb.ListResponse_Journal{
Expand Down
57 changes: 48 additions & 9 deletions v2/pkg/consumer/shard_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,17 +191,56 @@ func ListShards(ctx context.Context, sc ShardClient, req *ListRequest) (*ListRes
}
}

// ApplyShards invokes the Apply RPC, and maps a validation or !OK status to an error.
// ApplyShards invokes the Apply RPC.
func ApplyShards(ctx context.Context, sc ShardClient, req *ApplyRequest) (*ApplyResponse, error) {
if r, err := sc.Apply(pb.WithDispatchDefault(ctx), req, grpc.FailFast(false)); err != nil {
return r, err
} else if err = r.Validate(); err != nil {
return r, err
} else if r.Status != Status_OK {
return r, errors.New(r.Status.String())
} else {
return r, nil
return ApplyShardsLimit(ctx, sc, req, 0)
}

// ApplyShardsLimit is a helper function for applying changes to journals
// which may be larger than the configured etcd transaction size limit. The
// changes in |parentReq| will be broken in request with |maxTxnSize| changes
// and sent serially. If |maxTxnSize| is 0 all changes will be attempted as
// part of a single transaction. This function will return the response of
// the final ShardClient.Apply call. Response validation or !OK status
// from Apply RPC are mapped to error. In the event of an error any
// unapplied changes will be available on |parentReq|.
func ApplyShardsLimit(
ctx context.Context,
sc ShardClient,
parentReq *ApplyRequest,
maxTxnSize int,
) (*ApplyResponse, error) {
var changes []ApplyRequest_Change
if maxTxnSize == 0 {
maxTxnSize = len(parentReq.Changes)
}
var finalResp *ApplyResponse

for len(parentReq.Changes) > 0 {
if len(parentReq.Changes) > maxTxnSize {
changes = parentReq.Changes[:maxTxnSize]
} else {
changes = parentReq.Changes
}

var req = &ApplyRequest{}
for _, change := range changes {
req.Changes = append(req.Changes, change)
}

var resp *ApplyResponse
var err error
if resp, err = sc.Apply(pb.WithDispatchDefault(ctx), req, grpc.FailFast(false)); err != nil {
return resp, err
} else if err = resp.Validate(); err != nil {
return resp, err
} else if resp.Status != Status_OK {
return resp, errors.New(resp.Status.String())
}
finalResp = resp
parentReq.Changes = parentReq.Changes[len(changes):]
}
return finalResp, nil
}

// FetchHints invokes the Hints RPC, and maps a validation or !OK status to an error.
Expand Down
Loading

0 comments on commit ae1aede

Please sign in to comment.