Skip to content

Commit

Permalink
Merge pull request #54267 from nvanbenschoten/backport20.2-54140
Browse files Browse the repository at this point in the history
release-20.2: kv: don't leak raft application tracing spans on or after ErrRemoved
  • Loading branch information
nvanbenschoten authored Sep 11, 2020
2 parents 1fdef61 + 334b72d commit 2f40eb2
Show file tree
Hide file tree
Showing 9 changed files with 171 additions and 53 deletions.
2 changes: 1 addition & 1 deletion pkg/cmd/roachtest/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -602,7 +602,7 @@ func registerKVRangeLookups(r *testRegistry) {
duration := " --duration=10m"
readPercent := " --read-percent=50"
// We run kv with --tolerate-errors, since the relocate workload is
// expected to create `result is ambiguous (removing replica)` errors.
// expected to create `result is ambiguous (replica removed)` errors.
cmd = fmt.Sprintf("./workload run kv --tolerate-errors"+
concurrency+duration+readPercent+
" {pgurl:1-%d}", nodes)
Expand Down
56 changes: 43 additions & 13 deletions pkg/kv/kvserver/apply/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,15 @@ type Command interface {
// that were locally proposed typically have a client waiting on a
// response, so there is additional urgency to apply them quickly.
IsLocal() bool
// AckErrAndFinish signals that the application of the command has been
// rejected due to the provided error. It also relays this rejection of
// the command to its client if it was proposed locally. An error will
// immediately stall entry application, so one must only be returned if
// the state machine is no longer able to make progress.
//
// Either AckOutcomeAndFinish or AckErrAndFinish will be called exactly
// once per Command.
AckErrAndFinish(context.Context, error) error
}

// CheckedCommand is a command that has been checked to see whether it can
Expand All @@ -41,7 +50,7 @@ type CheckedCommand interface {
CanAckBeforeApplication() bool
// AckSuccess acknowledges the success of the command to its client.
// Must only be called if !Rejected.
AckSuccess() error
AckSuccess(context.Context) error
}

// AppliedCommand is a command that has been applied to the replicated state
Expand All @@ -52,13 +61,15 @@ type CheckedCommand interface {
// the state machine.
type AppliedCommand interface {
CheckedCommand
// FinishAndAckOutcome signals that the application of the command has
// AckOutcomeAndFinish signals that the application of the command has
// completed. It also acknowledges the outcome of the command to its
// client if it was proposed locally. An error will immediately stall
// entry application, so one must only be returned if the state machine
// is no longer able to make progress. The method will be called exactly
// is no longer able to make progress.
//
// Either AckOutcomeAndFinish or AckErrAndFinish will be called exactly
// once per Command.
FinishAndAckOutcome(context.Context) error
AckOutcomeAndFinish(context.Context) error
}

// CommandIteratorBase is a common interface extended by all iterator and
Expand Down Expand Up @@ -163,6 +174,7 @@ func mapCmdIter(
for iter.Valid() {
checked, err := fn(iter.Cur())
if err != nil {
ret.Close()
return nil, err
}
iter.Next()
Expand All @@ -183,6 +195,7 @@ func mapCheckedCmdIter(
for iter.Valid() {
applied, err := fn(iter.CurChecked())
if err != nil {
ret.Close()
return nil, err
}
iter.Next()
Expand All @@ -191,12 +204,35 @@ func mapCheckedCmdIter(
return ret, nil
}

// In the following three functions, fn takes ctx as its second parameter
// because callers want to bind fn to methods whose receiver is a Command (or
// one of its variants), which forces the receiver to be the first parameter.
// Callers prefer this over wrapping the method in a closure because it makes
// it clear that nothing escapes to the heap.

// forEachCmdIter invokes fn on every command yielded by iter, in iteration
// order, passing ctx as fn's second argument. Iteration stops at the first
// non-nil error returned by fn and that error is handed back to the caller
// without advancing the iterator any further; otherwise nil is returned once
// the iterator is exhausted. The iterator is closed before the function
// returns, regardless of the outcome.
func forEachCmdIter(
	ctx context.Context, iter CommandIterator, fn func(Command, context.Context) error,
) error {
	defer iter.Close()
	// The post statement only runs after a successful call to fn, so an
	// erroring command leaves the iterator positioned on that command.
	for ; iter.Valid(); iter.Next() {
		if err := fn(iter.Cur(), ctx); err != nil {
			return err
		}
	}
	return nil
}

// forEachCheckedCmdIter calls a closure on each command in the provided
// iterator. The function closes the provided iterator.
func forEachCheckedCmdIter(iter CheckedCommandIterator, fn func(CheckedCommand) error) error {
func forEachCheckedCmdIter(
ctx context.Context, iter CheckedCommandIterator, fn func(CheckedCommand, context.Context) error,
) error {
defer iter.Close()
for iter.Valid() {
if err := fn(iter.CurChecked()); err != nil {
if err := fn(iter.CurChecked(), ctx); err != nil {
return err
}
iter.Next()
Expand All @@ -207,13 +243,7 @@ func forEachCheckedCmdIter(iter CheckedCommandIterator, fn func(CheckedCommand)
// forEachAppliedCmdIter calls a closure on each command in the provided
// iterator. The function closes the provided iterator.
func forEachAppliedCmdIter(
ctx context.Context,
iter AppliedCommandIterator,
// fn is weirdly written with ctx as a 2nd param because the caller wants to
// bind it to a method that has the AppliedCommand on the receiver, thus
// forcing that to be the first param. The caller didn't want to introduce a
// callback instead to make it clear that nothing escapes to the heap.
fn func(AppliedCommand, context.Context) error,
ctx context.Context, iter AppliedCommandIterator, fn func(AppliedCommand, context.Context) error,
) error {
defer iter.Close()
for iter.Valid() {
Expand Down
10 changes: 5 additions & 5 deletions pkg/kv/kvserver/apply/doc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,15 +95,15 @@ it likes.
// applying side-effects of command 3
// applying side-effects of command 4
// finishing command 1; rejected=false
// finishing and acknowledging command 2; rejected=false
// finishing and acknowledging command 3; rejected=true
// acknowledging and finishing command 2; rejected=false
// acknowledging and finishing command 3; rejected=true
// finishing command 4; rejected=false
// applying batch with commands=[5]
// applying side-effects of command 5
// finishing and acknowledging command 5; rejected=false
// acknowledging and finishing command 5; rejected=false
// applying batch with commands=[6 7]
// applying side-effects of command 6
// applying side-effects of command 7
// finishing and acknowledging command 6; rejected=true
// finishing and acknowledging command 7; rejected=false
// acknowledging and finishing command 6; rejected=true
// acknowledging and finishing command 7; rejected=false
}
22 changes: 16 additions & 6 deletions pkg/kv/kvserver/apply/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,9 @@ type StateMachine interface {
ApplySideEffects(CheckedCommand) (AppliedCommand, error)
}

// ErrRemoved can be returned from ApplySideEffects which will stop the
// task from processing more commands and return immediately.
// ErrRemoved can be returned from ApplySideEffects which will stop the task
// from processing more commands and return immediately. The error should
// only be thrown by non-trivial commands.
var ErrRemoved = errors.New("replica removed")

// Batch accumulates a series of updates from Commands and performs them
Expand Down Expand Up @@ -222,9 +223,9 @@ func (t *Task) AckCommittedEntriesBeforeApplication(ctx context.Context, maxInde
// in the batch and can be acknowledged before they are actually applied.
// Don't acknowledge rejected proposals early because the StateMachine may
// want to retry the command instead of returning the error to the client.
return forEachCheckedCmdIter(stagedIter, func(cmd CheckedCommand) error {
return forEachCheckedCmdIter(ctx, stagedIter, func(cmd CheckedCommand, ctx context.Context) error {
if !cmd.Rejected() && cmd.IsLocal() && cmd.CanAckBeforeApplication() {
return cmd.AckSuccess()
return cmd.AckSuccess(ctx)
}
return nil
})
Expand All @@ -242,12 +243,21 @@ func (t *Task) ApplyCommittedEntries(ctx context.Context) error {
t.assertDecoded()

iter := t.dec.NewCommandIter()
defer iter.Close()
for iter.Valid() {
if err := t.applyOneBatch(ctx, iter); err != nil {
// If the batch threw an error, reject all remaining commands in the
// iterator to avoid leaking resources or leaving a proposer hanging.
//
// NOTE: forEachCmdIter closes iter.
if rejectErr := forEachCmdIter(ctx, iter, func(cmd Command, ctx context.Context) error {
return cmd.AckErrAndFinish(ctx, err)
}); rejectErr != nil {
return rejectErr
}
return err
}
}
iter.Close()
return nil
}

Expand Down Expand Up @@ -284,7 +294,7 @@ func (t *Task) applyOneBatch(ctx context.Context, iter CommandIterator) error {
}

// Finish and acknowledge the outcome of each command.
return forEachAppliedCmdIter(ctx, appliedIter, AppliedCommand.FinishAndAckOutcome)
return forEachAppliedCmdIter(ctx, appliedIter, AppliedCommand.AckOutcomeAndFinish)
}

// trivialPolicy encodes a batching policy that allows a batch to consist of
Expand Down
89 changes: 69 additions & 20 deletions pkg/kv/kvserver/apply/task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,11 @@ func setLogging(on bool) func() {
}

type cmd struct {
index uint64
nonTrivial bool
nonLocal bool
shouldReject bool
index uint64
nonTrivial bool
nonLocal bool
shouldReject bool
shouldThrowErrRemoved bool

acked bool
finished bool
Expand All @@ -51,27 +52,35 @@ type appliedCmd struct {
*checkedCmd
}

func (c *cmd) Index() uint64 { return c.index }
func (c *cmd) IsTrivial() bool { return !c.nonTrivial }
func (c *cmd) IsLocal() bool { return !c.nonLocal }
func (c *cmd) Index() uint64 { return c.index }
func (c *cmd) IsTrivial() bool { return !c.nonTrivial }
func (c *cmd) IsLocal() bool { return !c.nonLocal }
// AckErrAndFinish records that the command was rejected with err by marking
// it as both acknowledged and finished. When test logging is enabled the
// rejection is also printed. The context is unused and the method always
// returns nil.
func (c *cmd) AckErrAndFinish(_ context.Context, err error) error {
	c.acked, c.finished = true, true
	if !logging {
		return nil
	}
	fmt.Printf(" acknowledging rejected command %d with err=%s\n", c.Index(), err)
	return nil
}
func (c *checkedCmd) Rejected() bool { return c.rejected }
func (c *checkedCmd) CanAckBeforeApplication() bool { return true }
func (c *checkedCmd) AckSuccess() error {
func (c *checkedCmd) AckSuccess(context.Context) error {
c.acked = true
if logging {
fmt.Printf(" acknowledging command %d before application\n", c.Index())
}
return nil
}
func (c *appliedCmd) FinishAndAckOutcome(context.Context) error {
func (c *appliedCmd) AckOutcomeAndFinish(context.Context) error {
c.finished = true
if c.acked {
if logging {
fmt.Printf(" finishing command %d; rejected=%t\n", c.Index(), c.Rejected())
}
} else {
if logging {
fmt.Printf(" finishing and acknowledging command %d; rejected=%t\n", c.Index(), c.Rejected())
fmt.Printf(" acknowledging and finishing command %d; rejected=%t\n", c.Index(), c.Rejected())
}
c.acked = true
}
Expand Down Expand Up @@ -136,6 +145,11 @@ func (sm *testStateMachine) ApplySideEffects(
if logging {
fmt.Printf(" applying side-effects of command %d\n", cmd.Index())
}
if cmd.shouldThrowErrRemoved {
err := apply.ErrRemoved
_ = cmd.AckErrAndFinish(context.Background(), err)
return nil, err
}
acmd := appliedCmd{checkedCmd: cmd}
return &acmd, nil
}
Expand Down Expand Up @@ -168,18 +182,20 @@ func (b *testBatch) Close() {
}

type testDecoder struct {
nonTrivial map[uint64]bool
nonLocal map[uint64]bool
shouldReject map[uint64]bool
nonTrivial map[uint64]bool
nonLocal map[uint64]bool
shouldReject map[uint64]bool
shouldThrowErrRemoved map[uint64]bool

cmds []*cmd
}

func newTestDecoder() *testDecoder {
return &testDecoder{
nonTrivial: make(map[uint64]bool),
nonLocal: make(map[uint64]bool),
shouldReject: make(map[uint64]bool),
nonTrivial: make(map[uint64]bool),
nonLocal: make(map[uint64]bool),
shouldReject: make(map[uint64]bool),
shouldThrowErrRemoved: make(map[uint64]bool),
}
}

Expand All @@ -188,10 +204,11 @@ func (d *testDecoder) DecodeAndBind(_ context.Context, ents []raftpb.Entry) (boo
for i, ent := range ents {
idx := ent.Index
cmd := &cmd{
index: idx,
nonTrivial: d.nonTrivial[idx],
nonLocal: d.nonLocal[idx],
shouldReject: d.shouldReject[idx],
index: idx,
nonTrivial: d.nonTrivial[idx],
nonLocal: d.nonLocal[idx],
shouldReject: d.shouldReject[idx],
shouldThrowErrRemoved: d.shouldThrowErrRemoved[idx],
}
d.cmds[i] = cmd
if logging {
Expand Down Expand Up @@ -347,3 +364,35 @@ func TestAckCommittedEntriesBeforeApplication(t *testing.T) {
require.False(t, cmd.finished)
}
}

// TestApplyCommittedEntriesWithErr checks that when applying side effects
// fails with apply.ErrRemoved part-way through a set of committed entries,
// ApplyCommittedEntries surfaces that error, stops applying further commands,
// and still acknowledges and finishes every decoded command.
func TestApplyCommittedEntriesWithErr(t *testing.T) {
	ctx := context.Background()
	entries := makeEntries(6)

	stateMachine := getTestStateMachine()
	decoder := newTestDecoder()
	// Commands 3 and 6 are non-trivial; command 3 additionally fails with
	// ErrRemoved while its side effects are being applied.
	for _, idx := range []uint64{3, 6} {
		decoder.nonTrivial[idx] = true
	}
	decoder.shouldThrowErrRemoved[3] = true

	// Use an apply.Task to apply all commands.
	task := apply.MakeTask(stateMachine, decoder)
	defer task.Close()
	require.NoError(t, task.Decode(ctx, entries))
	applyErr := task.ApplyCommittedEntries(ctx)
	require.Equal(t, apply.ErrRemoved, applyErr)

	// Only the commands up to and including the one that observed the
	// replica removal should have reached the state machine.
	want := testStateMachine{
		batches:            [][]uint64{{1, 2}, {3}},
		applied:            []uint64{1, 2, 3},
		appliedSideEffects: []uint64{1, 2, 3},
	}
	require.Equal(t, want, *stateMachine)

	// Every command must be acknowledged and finished, including those that
	// were never applied.
	for _, c := range decoder.cmds {
		require.True(t, c.acked)
		require.True(t, c.finished)
	}
}
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/client_raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5289,7 +5289,7 @@ func TestReplicaRemovalClosesProposalQuota(t *testing.T) {
RangeID: desc.RangeID,
}, putArgs(k, bytes.Repeat([]byte{'a'}, 1000)))
require.Regexp(t,
`result is ambiguous \(removing replica\)|`+
`result is ambiguous \(replica removed\)|`+
`r`+strconv.Itoa(int(desc.RangeID))+" was not found on s1", pErr.GoError())
}(i)
}
Expand Down
Loading

0 comments on commit 2f40eb2

Please sign in to comment.