From b175745372e062c12abd17d32c5a0cdde7d87618 Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten Date: Wed, 9 Sep 2020 10:48:42 -0400 Subject: [PATCH 1/3] kv/kvserver/apply: close iterators on err in map functions Release justification: low risk, high benefit changes to existing functionality. --- pkg/kv/kvserver/apply/cmd.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pkg/kv/kvserver/apply/cmd.go b/pkg/kv/kvserver/apply/cmd.go index 467dea671e71..d28df27a6765 100644 --- a/pkg/kv/kvserver/apply/cmd.go +++ b/pkg/kv/kvserver/apply/cmd.go @@ -163,6 +163,7 @@ func mapCmdIter( for iter.Valid() { checked, err := fn(iter.Cur()) if err != nil { + ret.Close() return nil, err } iter.Next() @@ -183,6 +184,7 @@ func mapCheckedCmdIter( for iter.Valid() { applied, err := fn(iter.CurChecked()) if err != nil { + ret.Close() return nil, err } iter.Next() From 1a0ad39f75c9c1d8ca3d810f50e800306a2906e4 Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten Date: Wed, 9 Sep 2020 14:24:15 -0400 Subject: [PATCH 2/3] kv: don't leak raft application tracing spans on ErrRemoved Fixes #53677. This commit ensures that we properly finish tracing spans of Raft commands that throw ErrRemoved errors in ApplySideEffects. I was originally intending to do something more dramatic and make `replicaStateMachine.ApplySideEffects` responsible for acknowledging proposers in all cases, but doing so turned out to be pretty invasive so I was concerned that it would be harder to backport to v20.2 and to v20.1. I may revisit that in the future. Release justification: low risk, high benefit changes to existing functionality. --- pkg/kv/kvserver/replica_application_cmd.go | 16 +++++++++++++++- .../replica_application_state_machine.go | 4 +++- 2 files changed, 18 insertions(+), 2 deletions(-) diff --git a/pkg/kv/kvserver/replica_application_cmd.go b/pkg/kv/kvserver/replica_application_cmd.go index a4c5ed43cd28..51d590502d5e 100644 --- a/pkg/kv/kvserver/replica_application_cmd.go +++ b/pkg/kv/kvserver/replica_application_cmd.go @@ -155,13 +155,27 @@ func (c *replicatedCmd) AckSuccess() error { // FinishAndAckOutcome implements the apply.AppliedCommand interface. func (c *replicatedCmd) FinishAndAckOutcome(ctx context.Context) error { - tracing.FinishSpan(c.sp) if c.IsLocal() { c.proposal.finishApplication(ctx, c.response) } + c.finishTracingSpan() return nil } +// FinishNonLocal is like AckOutcomeAndFinish, but instead of acknowledging the +// command's proposal if it is local, it asserts that the proposal is not local. +func (c *replicatedCmd) FinishNonLocal(ctx context.Context) { + if c.IsLocal() { + log.Fatalf(ctx, "proposal unexpectedly local: %v", c.replicatedResult()) + } + c.finishTracingSpan() +} + +func (c *replicatedCmd) finishTracingSpan() { + tracing.FinishSpan(c.sp) + c.ctx, c.sp = nil, nil +} + // decode decodes the entry e into the decodedRaftEntry. func (d *decodedRaftEntry) decode(ctx context.Context, e *raftpb.Entry) error { *d = decodedRaftEntry{} diff --git a/pkg/kv/kvserver/replica_application_state_machine.go b/pkg/kv/kvserver/replica_application_state_machine.go index 05ef2b3d17da..3972a4426ca9 100644 --- a/pkg/kv/kvserver/replica_application_state_machine.go +++ b/pkg/kv/kvserver/replica_application_state_machine.go @@ -1044,8 +1044,10 @@ func (sm *replicaStateMachine) ApplySideEffects( clearTrivialReplicatedEvalResultFields(cmd.replicatedResult()) if !cmd.IsTrivial() { shouldAssert, isRemoved := sm.handleNonTrivialReplicatedEvalResult(ctx, cmd.replicatedResult()) - if isRemoved { + // The proposal must not have been local, because we don't allow a + // proposing replica to remove itself from the Range. + cmd.FinishNonLocal(ctx) return nil, apply.ErrRemoved } // NB: Perform state assertion before acknowledging the client. From 334b72d97ffbf38f3b3b3ecdcaca648e18a396c9 Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten Date: Wed, 9 Sep 2020 14:30:25 -0400 Subject: [PATCH 3/3] kv: don't leak tracing spans or clients after ErrRemoved Extension of #53677. This commit ensures that we properly finish tracing spans of Raft commands that follow a command that throws a ErrRemoved. Before this commit, these commands would be abandoned and would never be finished. The effects of this are theoretically even worse than those fixed in the previous commit because these leaked commands could be locally proposed, so we may be abandoning a local proposer indefinitely. It's not clear that we ever saw an instance of this. It seems rare for a local proposal to end up in the same CommittedEntries batch as a command that removes a replica because of the lease requirements, but it doesn't seem impossible, especially of the local proposal was a RequestLease request. Release justification: low risk, high benefit changes to existing functionality. --- pkg/cmd/roachtest/kv.go | 2 +- pkg/kv/kvserver/apply/cmd.go | 54 +++++++++---- pkg/kv/kvserver/apply/doc_test.go | 10 +-- pkg/kv/kvserver/apply/task.go | 22 ++++-- pkg/kv/kvserver/apply/task_test.go | 89 +++++++++++++++++----- pkg/kv/kvserver/client_raft_test.go | 2 +- pkg/kv/kvserver/replica_application_cmd.go | 18 ++++- pkg/kv/kvserver/replica_destroy.go | 5 +- 8 files changed, 151 insertions(+), 51 deletions(-) diff --git a/pkg/cmd/roachtest/kv.go b/pkg/cmd/roachtest/kv.go index 3d756c9748e2..aed25a9b98f8 100644 --- a/pkg/cmd/roachtest/kv.go +++ b/pkg/cmd/roachtest/kv.go @@ -602,7 +602,7 @@ func registerKVRangeLookups(r *testRegistry) { duration := " --duration=10m" readPercent := " --read-percent=50" // We run kv with --tolerate-errors, since the relocate workload is - // expected to create `result is ambiguous (removing replica)` errors. + // expected to create `result is ambiguous (replica removed)` errors. cmd = fmt.Sprintf("./workload run kv --tolerate-errors"+ concurrency+duration+readPercent+ " {pgurl:1-%d}", nodes) diff --git a/pkg/kv/kvserver/apply/cmd.go b/pkg/kv/kvserver/apply/cmd.go index d28df27a6765..f3b161b41d1a 100644 --- a/pkg/kv/kvserver/apply/cmd.go +++ b/pkg/kv/kvserver/apply/cmd.go @@ -24,6 +24,15 @@ type Command interface { // that were locally proposed typically have a client waiting on a // response, so there is additional urgency to apply them quickly. IsLocal() bool + // AckErrAndFinish signals that the application of the command has been + // rejected due to the provided error. It also relays this rejection of + // the command to its client if it was proposed locally. An error will + // immediately stall entry application, so one must only be returned if + // the state machine is no longer able to make progress. + // + // Either AckOutcomeAndFinish or AckErrAndFinish will be called exactly + // once per Command. + AckErrAndFinish(context.Context, error) error } // CheckedCommand is a command that has been checked to see whether it can @@ -41,7 +50,7 @@ type CheckedCommand interface { CanAckBeforeApplication() bool // AckSuccess acknowledges the success of the command to its client. // Must only be called if !Rejected. - AckSuccess() error + AckSuccess(context.Context) error } // AppliedCommand is a command that has been applied to the replicated state @@ -52,13 +61,15 @@ type CheckedCommand interface { // the state machine. type AppliedCommand interface { CheckedCommand - // FinishAndAckOutcome signals that the application of the command has + // AckOutcomeAndFinish signals that the application of the command has // completed. It also acknowledges the outcome of the command to its // client if it was proposed locally. An error will immediately stall // entry application, so one must only be returned if the state machine - // is no longer able to make progress. The method will be called exactly + // is no longer able to make progress. + // + // Either AckOutcomeAndFinish or AckErrAndFinish will be called exactly // once per Command. - FinishAndAckOutcome(context.Context) error + AckOutcomeAndFinish(context.Context) error } // CommandIteratorBase is a common interface extended by all iterator and @@ -193,12 +204,35 @@ func mapCheckedCmdIter( return ret, nil } +// In the following three functions, fn is written with ctx as a 2nd param +// because callers want to bind it to methods that have Commands (or variants) +// as the receiver, which mandates that to be the first param. The caller didn't +// want to introduce a callback instead to make it clear that nothing escapes to +// the heap. + +// forEachCmdIter calls a closure on each command in the provided iterator. The +// function closes the provided iterator. +func forEachCmdIter( + ctx context.Context, iter CommandIterator, fn func(Command, context.Context) error, +) error { + defer iter.Close() + for iter.Valid() { + if err := fn(iter.Cur(), ctx); err != nil { + return err + } + iter.Next() + } + return nil +} + // forEachCheckedCmdIter calls a closure on each command in the provided // iterator. The function closes the provided iterator. -func forEachCheckedCmdIter(iter CheckedCommandIterator, fn func(CheckedCommand) error) error { +func forEachCheckedCmdIter( + ctx context.Context, iter CheckedCommandIterator, fn func(CheckedCommand, context.Context) error, +) error { defer iter.Close() for iter.Valid() { - if err := fn(iter.CurChecked()); err != nil { + if err := fn(iter.CurChecked(), ctx); err != nil { return err } iter.Next() @@ -209,13 +243,7 @@ func forEachCheckedCmdIter(iter CheckedCommandIterator, fn func(CheckedCommand) // forEachAppliedCmdIter calls a closure on each command in the provided // iterator. The function closes the provided iterator. func forEachAppliedCmdIter( - ctx context.Context, - iter AppliedCommandIterator, - // fn is weirdly written with ctx as a 2nd param because the caller wants to - // bind it to a method that has the AppliedCommand on the receiver, thus - // forcing that to be the first param. The caller didn't want to introduce a - // callback instead to make it clear that nothing escapes to the heap. - fn func(AppliedCommand, context.Context) error, + ctx context.Context, iter AppliedCommandIterator, fn func(AppliedCommand, context.Context) error, ) error { defer iter.Close() for iter.Valid() { diff --git a/pkg/kv/kvserver/apply/doc_test.go b/pkg/kv/kvserver/apply/doc_test.go index e91a4ecc25c4..6f34af877499 100644 --- a/pkg/kv/kvserver/apply/doc_test.go +++ b/pkg/kv/kvserver/apply/doc_test.go @@ -95,15 +95,15 @@ it likes. // applying side-effects of command 3 // applying side-effects of command 4 // finishing command 1; rejected=false - // finishing and acknowledging command 2; rejected=false - // finishing and acknowledging command 3; rejected=true + // acknowledging and finishing command 2; rejected=false + // acknowledging and finishing command 3; rejected=true // finishing command 4; rejected=false // applying batch with commands=[5] // applying side-effects of command 5 - // finishing and acknowledging command 5; rejected=false + // acknowledging and finishing command 5; rejected=false // applying batch with commands=[6 7] // applying side-effects of command 6 // applying side-effects of command 7 - // finishing and acknowledging command 6; rejected=true - // finishing and acknowledging command 7; rejected=false + // acknowledging and finishing command 6; rejected=true + // acknowledging and finishing command 7; rejected=false } diff --git a/pkg/kv/kvserver/apply/task.go b/pkg/kv/kvserver/apply/task.go index 456c59e907f6..63c4e53b6a69 100644 --- a/pkg/kv/kvserver/apply/task.go +++ b/pkg/kv/kvserver/apply/task.go @@ -55,8 +55,9 @@ type StateMachine interface { ApplySideEffects(CheckedCommand) (AppliedCommand, error) } -// ErrRemoved can be returned from ApplySideEffects which will stop the -// task from processing more commands and return immediately. +// ErrRemoved can be returned from ApplySideEffects which will stop the task +// from processing more commands and return immediately. The error should +// only be thrown by non-trivial commands. var ErrRemoved = errors.New("replica removed") // Batch accumulates a series of updates from Commands and performs them @@ -222,9 +223,9 @@ func (t *Task) AckCommittedEntriesBeforeApplication(ctx context.Context, maxInde // in the batch and can be acknowledged before they are actually applied. // Don't acknowledge rejected proposals early because the StateMachine may // want to retry the command instead of returning the error to the client. - return forEachCheckedCmdIter(stagedIter, func(cmd CheckedCommand) error { + return forEachCheckedCmdIter(ctx, stagedIter, func(cmd CheckedCommand, ctx context.Context) error { if !cmd.Rejected() && cmd.IsLocal() && cmd.CanAckBeforeApplication() { - return cmd.AckSuccess() + return cmd.AckSuccess(ctx) } return nil }) @@ -242,12 +243,21 @@ func (t *Task) ApplyCommittedEntries(ctx context.Context) error { t.assertDecoded() iter := t.dec.NewCommandIter() - defer iter.Close() for iter.Valid() { if err := t.applyOneBatch(ctx, iter); err != nil { + // If the batch threw an error, reject all remaining commands in the + // iterator to avoid leaking resources or leaving a proposer hanging. + // + // NOTE: forEachCmdIter closes iter. + if rejectErr := forEachCmdIter(ctx, iter, func(cmd Command, ctx context.Context) error { + return cmd.AckErrAndFinish(ctx, err) + }); rejectErr != nil { + return rejectErr + } return err } } + iter.Close() return nil } @@ -284,7 +294,7 @@ func (t *Task) applyOneBatch(ctx context.Context, iter CommandIterator) error { } // Finish and acknowledge the outcome of each command. - return forEachAppliedCmdIter(ctx, appliedIter, AppliedCommand.FinishAndAckOutcome) + return forEachAppliedCmdIter(ctx, appliedIter, AppliedCommand.AckOutcomeAndFinish) } // trivialPolicy encodes a batching policy that allows a batch to consist of diff --git a/pkg/kv/kvserver/apply/task_test.go b/pkg/kv/kvserver/apply/task_test.go index 8806409a995b..df5628bab70d 100644 --- a/pkg/kv/kvserver/apply/task_test.go +++ b/pkg/kv/kvserver/apply/task_test.go @@ -33,10 +33,11 @@ func setLogging(on bool) func() { } type cmd struct { - index uint64 - nonTrivial bool - nonLocal bool - shouldReject bool + index uint64 + nonTrivial bool + nonLocal bool + shouldReject bool + shouldThrowErrRemoved bool acked bool finished bool @@ -51,19 +52,27 @@ type appliedCmd struct { *checkedCmd } -func (c *cmd) Index() uint64 { return c.index } -func (c *cmd) IsTrivial() bool { return !c.nonTrivial } -func (c *cmd) IsLocal() bool { return !c.nonLocal } +func (c *cmd) Index() uint64 { return c.index } +func (c *cmd) IsTrivial() bool { return !c.nonTrivial } +func (c *cmd) IsLocal() bool { return !c.nonLocal } +func (c *cmd) AckErrAndFinish(_ context.Context, err error) error { + c.acked = true + c.finished = true + if logging { + fmt.Printf(" acknowledging rejected command %d with err=%s\n", c.Index(), err) + } + return nil +} func (c *checkedCmd) Rejected() bool { return c.rejected } func (c *checkedCmd) CanAckBeforeApplication() bool { return true } -func (c *checkedCmd) AckSuccess() error { +func (c *checkedCmd) AckSuccess(context.Context) error { c.acked = true if logging { fmt.Printf(" acknowledging command %d before application\n", c.Index()) } return nil } -func (c *appliedCmd) FinishAndAckOutcome(context.Context) error { +func (c *appliedCmd) AckOutcomeAndFinish(context.Context) error { c.finished = true if c.acked { if logging { @@ -71,7 +80,7 @@ func (c *appliedCmd) FinishAndAckOutcome(context.Context) error { } } else { if logging { - fmt.Printf(" finishing and acknowledging command %d; rejected=%t\n", c.Index(), c.Rejected()) + fmt.Printf(" acknowledging and finishing command %d; rejected=%t\n", c.Index(), c.Rejected()) } c.acked = true } @@ -136,6 +145,11 @@ func (sm *testStateMachine) ApplySideEffects( if logging { fmt.Printf(" applying side-effects of command %d\n", cmd.Index()) } + if cmd.shouldThrowErrRemoved { + err := apply.ErrRemoved + _ = cmd.AckErrAndFinish(context.Background(), err) + return nil, err + } acmd := appliedCmd{checkedCmd: cmd} return &acmd, nil } @@ -168,18 +182,20 @@ func (b *testBatch) Close() { } type testDecoder struct { - nonTrivial map[uint64]bool - nonLocal map[uint64]bool - shouldReject map[uint64]bool + nonTrivial map[uint64]bool + nonLocal map[uint64]bool + shouldReject map[uint64]bool + shouldThrowErrRemoved map[uint64]bool cmds []*cmd } func newTestDecoder() *testDecoder { return &testDecoder{ - nonTrivial: make(map[uint64]bool), - nonLocal: make(map[uint64]bool), - shouldReject: make(map[uint64]bool), + nonTrivial: make(map[uint64]bool), + nonLocal: make(map[uint64]bool), + shouldReject: make(map[uint64]bool), + shouldThrowErrRemoved: make(map[uint64]bool), } } @@ -188,10 +204,11 @@ func (d *testDecoder) DecodeAndBind(_ context.Context, ents []raftpb.Entry) (boo for i, ent := range ents { idx := ent.Index cmd := &cmd{ - index: idx, - nonTrivial: d.nonTrivial[idx], - nonLocal: d.nonLocal[idx], - shouldReject: d.shouldReject[idx], + index: idx, + nonTrivial: d.nonTrivial[idx], + nonLocal: d.nonLocal[idx], + shouldReject: d.shouldReject[idx], + shouldThrowErrRemoved: d.shouldThrowErrRemoved[idx], } d.cmds[i] = cmd if logging { @@ -347,3 +364,35 @@ func TestAckCommittedEntriesBeforeApplication(t *testing.T) { require.False(t, cmd.finished) } } + +func TestApplyCommittedEntriesWithErr(t *testing.T) { + ctx := context.Background() + ents := makeEntries(6) + + sm := getTestStateMachine() + dec := newTestDecoder() + dec.nonTrivial[3] = true + dec.shouldThrowErrRemoved[3] = true + dec.nonTrivial[6] = true + + // Use an apply.Task to apply all commands. + appT := apply.MakeTask(sm, dec) + defer appT.Close() + require.NoError(t, appT.Decode(ctx, ents)) + require.Equal(t, apply.ErrRemoved, appT.ApplyCommittedEntries(ctx)) + + // Assert that only commands up to the replica removal were applied. + exp := testStateMachine{ + batches: [][]uint64{{1, 2}, {3}}, + applied: []uint64{1, 2, 3}, + appliedSideEffects: []uint64{1, 2, 3}, + } + require.Equal(t, exp, *sm) + + // Assert that all commands were acknowledged and finished, even though not + // all were applied. + for _, cmd := range dec.cmds { + require.True(t, cmd.acked) + require.True(t, cmd.finished) + } +} diff --git a/pkg/kv/kvserver/client_raft_test.go b/pkg/kv/kvserver/client_raft_test.go index 8bee792b7a6d..85a93ad22251 100644 --- a/pkg/kv/kvserver/client_raft_test.go +++ b/pkg/kv/kvserver/client_raft_test.go @@ -5289,7 +5289,7 @@ func TestReplicaRemovalClosesProposalQuota(t *testing.T) { RangeID: desc.RangeID, }, putArgs(k, bytes.Repeat([]byte{'a'}, 1000))) require.Regexp(t, - `result is ambiguous \(removing replica\)|`+ + `result is ambiguous \(replica removed\)|`+ `r`+strconv.Itoa(int(desc.RangeID))+" was not found on s1", pErr.GoError()) }(i) } diff --git a/pkg/kv/kvserver/replica_application_cmd.go b/pkg/kv/kvserver/replica_application_cmd.go index 51d590502d5e..cb47eacf14b3 100644 --- a/pkg/kv/kvserver/replica_application_cmd.go +++ b/pkg/kv/kvserver/replica_application_cmd.go @@ -57,7 +57,7 @@ type replicatedCmd struct { // ApplyCommittedEntries. ctx context.Context // sp is the tracing span corresponding to ctx. It is closed in - // FinishAndAckOutcome. This span "follows from" the proposer's span (even + // finishTracingSpan. This span "follows from" the proposer's span (even // when the proposer is remote; we marshall tracing info through the // proposal). sp opentracing.Span @@ -115,6 +115,16 @@ func (c *replicatedCmd) IsLocal() bool { return c.proposal != nil } +// AckErrAndFinish implements the apply.Command interface. +func (c *replicatedCmd) AckErrAndFinish(ctx context.Context, err error) error { + if c.IsLocal() { + c.response.Err = roachpb.NewError( + roachpb.NewAmbiguousResultError( + err.Error())) + } + return c.AckOutcomeAndFinish(ctx) +} + // Rejected implements the apply.CheckedCommand interface. func (c *replicatedCmd) Rejected() bool { return c.forcedErr != nil @@ -134,7 +144,7 @@ func (c *replicatedCmd) CanAckBeforeApplication() bool { } // AckSuccess implements the apply.CheckedCommand interface. -func (c *replicatedCmd) AckSuccess() error { +func (c *replicatedCmd) AckSuccess(_ context.Context) error { if !c.IsLocal() { return nil } @@ -153,8 +163,8 @@ func (c *replicatedCmd) AckSuccess() error { return nil } -// FinishAndAckOutcome implements the apply.AppliedCommand interface. -func (c *replicatedCmd) FinishAndAckOutcome(ctx context.Context) error { +// AckOutcomeAndFinish implements the apply.AppliedCommand interface. +func (c *replicatedCmd) AckOutcomeAndFinish(ctx context.Context) error { if c.IsLocal() { c.proposal.finishApplication(ctx, c.response) } diff --git a/pkg/kv/kvserver/replica_destroy.go b/pkg/kv/kvserver/replica_destroy.go index 6ee4419b8dd5..0cc9f265f045 100644 --- a/pkg/kv/kvserver/replica_destroy.go +++ b/pkg/kv/kvserver/replica_destroy.go @@ -16,6 +16,7 @@ import ( "math" "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/apply" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage" @@ -209,7 +210,9 @@ func (r *Replica) disconnectReplicationRaftMuLocked(ctx context.Context) { // NB: each proposal needs its own version of the error (i.e. don't try to // share the error across proposals). p.finishApplication(ctx, proposalResult{ - Err: roachpb.NewError(roachpb.NewAmbiguousResultError("removing replica")), + Err: roachpb.NewError( + roachpb.NewAmbiguousResultError( + apply.ErrRemoved.Error())), }) } r.mu.internalRaftGroup = nil