From 45936915a544a4bbb77dd4cd0f4cb902c663740e Mon Sep 17 00:00:00 2001 From: Ben Darnell Date: Wed, 27 Feb 2019 17:44:50 -0500 Subject: [PATCH 01/10] storage: Rename Replica.propose to evalAndPropose Release note: None --- pkg/storage/replica_raft.go | 8 +++++--- pkg/storage/replica_test.go | 14 +++++++------- pkg/storage/replica_write.go | 2 +- pkg/storage/store_test.go | 2 +- 4 files changed, 14 insertions(+), 12 deletions(-) diff --git a/pkg/storage/replica_raft.go b/pkg/storage/replica_raft.go index 2f1577eb6f8c..c98274fb9c71 100644 --- a/pkg/storage/replica_raft.go +++ b/pkg/storage/replica_raft.go @@ -91,10 +91,12 @@ func makeIDKey() storagebase.CmdIDKey { return storagebase.CmdIDKey(idKeyBuf) } -// propose prepares the necessary pending command struct and initializes a +// evalAndPropose prepares the necessary pending command struct and initializes a // client command ID if one hasn't been. A verified lease is supplied // as a parameter if the command requires a lease; nil otherwise. It -// then proposes the command to Raft if necessary and returns +// then evaluates the command and proposes it to Raft on success. +// +// Return values: // - a channel which receives a response or error upon application // - a closure used to attempt to abandon the command. When called, it tries to // remove the pending command from the internal commands map. This is @@ -107,7 +109,7 @@ func makeIDKey() storagebase.CmdIDKey { // - the MaxLeaseIndex of the resulting proposal, if any. // - any error obtained during the creation or proposal of the command, in // which case the other returned values are zero. -func (r *Replica) propose( +func (r *Replica) evalAndPropose( ctx context.Context, lease roachpb.Lease, ba roachpb.BatchRequest, diff --git a/pkg/storage/replica_test.go b/pkg/storage/replica_test.go index 621e4faba05b..741ca2fa317d 100644 --- a/pkg/storage/replica_test.go +++ b/pkg/storage/replica_test.go @@ -499,7 +499,7 @@ func sendLeaseRequest(r *Replica, l *roachpb.Lease) error { ba.Timestamp = r.store.Clock().Now() ba.Add(&roachpb.RequestLeaseRequest{Lease: *l}) exLease, _ := r.GetLease() - ch, _, _, pErr := r.propose(context.TODO(), exLease, ba, nil, &allSpans) + ch, _, _, pErr := r.evalAndPropose(context.TODO(), exLease, ba, nil, &allSpans) if pErr == nil { // Next if the command was committed, wait for the range to apply it. // TODO(bdarnell): refactor this to a more conventional error-handling pattern. @@ -1275,7 +1275,7 @@ func TestReplicaLeaseRejectUnknownRaftNodeID(t *testing.T) { ba := roachpb.BatchRequest{} ba.Timestamp = tc.repl.store.Clock().Now() ba.Add(&roachpb.RequestLeaseRequest{Lease: *lease}) - ch, _, _, pErr := tc.repl.propose(context.Background(), exLease, ba, nil, &allSpans) + ch, _, _, pErr := tc.repl.evalAndPropose(context.Background(), exLease, ba, nil, &allSpans) if pErr == nil { // Next if the command was committed, wait for the range to apply it. // TODO(bdarnell): refactor to a more conventional error-handling pattern. @@ -7067,7 +7067,7 @@ func TestReplicaIDChangePending(t *testing.T) { }, Value: roachpb.MakeValueFromBytes([]byte("val")), }) - _, _, _, err := repl.propose(context.Background(), lease, ba, nil, &allSpans) + _, _, _, err := repl.evalAndPropose(context.Background(), lease, ba, nil, &allSpans) if err != nil { t.Fatal(err) } @@ -8525,7 +8525,7 @@ func TestErrorInRaftApplicationClearsIntents(t *testing.T) { } exLease, _ := repl.GetLease() - ch, _, _, pErr := repl.propose( + ch, _, _, pErr := repl.evalAndPropose( context.Background(), exLease, ba, nil /* endCmds */, &allSpans, ) if pErr != nil { @@ -8572,7 +8572,7 @@ func TestProposeWithAsyncConsensus(t *testing.T) { atomic.StoreInt32(&filterActive, 1) exLease, _ := repl.GetLease() - ch, _, _, pErr := repl.propose( + ch, _, _, pErr := repl.evalAndPropose( context.Background(), exLease, ba, nil /* endCmds */, &allSpans, ) if pErr != nil { @@ -8637,7 +8637,7 @@ func TestApplyPaginatedCommittedEntries(t *testing.T) { atomic.StoreInt32(&filterActive, 1) exLease, _ := repl.GetLease() - _, _, _, pErr := repl.propose(ctx, exLease, ba, nil /* endCmds */, &allSpans) + _, _, _, pErr := repl.evalAndPropose(ctx, exLease, ba, nil /* endCmds */, &allSpans) if pErr != nil { t.Fatal(pErr) } @@ -8655,7 +8655,7 @@ func TestApplyPaginatedCommittedEntries(t *testing.T) { ba2.Timestamp = tc.Clock().Now() var pErr *roachpb.Error - ch, _, _, pErr = repl.propose(ctx, exLease, ba, nil /* endCmds */, &allSpans) + ch, _, _, pErr = repl.evalAndPropose(ctx, exLease, ba, nil /* endCmds */, &allSpans) if pErr != nil { t.Fatal(pErr) } diff --git a/pkg/storage/replica_write.go b/pkg/storage/replica_write.go index ef4aa58ad787..29ce2974dcfa 100644 --- a/pkg/storage/replica_write.go +++ b/pkg/storage/replica_write.go @@ -168,7 +168,7 @@ func (r *Replica) tryExecuteWriteBatch( log.Event(ctx, "applied timestamp cache") - ch, tryAbandon, maxLeaseIndex, pErr := r.propose(ctx, lease, ba, endCmds, spans) + ch, tryAbandon, maxLeaseIndex, pErr := r.evalAndPropose(ctx, lease, ba, endCmds, spans) if pErr != nil { if maxLeaseIndex != 0 { log.Fatalf( diff --git a/pkg/storage/store_test.go b/pkg/storage/store_test.go index 91e408521b0b..66d96a81ea34 100644 --- a/pkg/storage/store_test.go +++ b/pkg/storage/store_test.go @@ -726,7 +726,7 @@ func TestStoreRemoveReplicaDestroy(t *testing.T) { t.Fatal("replica was not marked as destroyed") } - if _, _, _, pErr := repl1.propose( + if _, _, _, pErr := repl1.evalAndPropose( context.Background(), lease, roachpb.BatchRequest{}, nil, &allSpans, ); !pErr.Equal(expErr) { t.Fatalf("expected error %s, but got %v", expErr, pErr) From bd8cbc5c39170b4b22cc8d04d03c97a05f007a8f Mon Sep 17 00:00:00 2001 From: Ben Darnell Date: Wed, 27 Feb 2019 18:13:28 -0500 Subject: [PATCH 02/10] storage: Move TestingProposalFilter a little earlier in the process Release note: None --- pkg/storage/replica_raft.go | 24 ++++++++++++------------ pkg/storage/replica_test.go | 10 ++++++---- 2 files changed, 18 insertions(+), 16 deletions(-) diff --git a/pkg/storage/replica_raft.go b/pkg/storage/replica_raft.go index c98274fb9c71..2c91aebe0ec0 100644 --- a/pkg/storage/replica_raft.go +++ b/pkg/storage/replica_raft.go @@ -227,6 +227,18 @@ func (r *Replica) evalAndPropose( return nil, nil, 0, roachpb.NewError(err) } + if filter := r.store.TestingKnobs().TestingProposalFilter; filter != nil { + filterArgs := storagebase.ProposalFilterArgs{ + Ctx: ctx, + Cmd: *proposal.command, + CmdID: idKey, + Req: ba, + } + if pErr := filter(filterArgs); pErr != nil { + return nil, nil, 0, pErr + } + } + // submitProposalLocked calls withRaftGroupLocked which requires that raftMu // is held, but only if Replica.mu.internalRaftGroup == nil. To avoid // locking Replica.raftMu in the common case where the raft group is @@ -280,18 +292,6 @@ func (r *Replica) evalAndPropose( log.Fatalf(ctx, "no MaxLeaseIndex returned for %s", ba) } - if filter := r.store.TestingKnobs().TestingProposalFilter; filter != nil { - filterArgs := storagebase.ProposalFilterArgs{ - Ctx: ctx, - Cmd: *proposal.command, - CmdID: idKey, - Req: ba, - } - if pErr := filter(filterArgs); pErr != nil { - return nil, nil, 0, pErr - } - } - if err := r.submitProposalLocked(proposal); err == raft.ErrProposalDropped { // Silently ignore dropped proposals (they were always silently ignored // prior to the introduction of ErrProposalDropped). diff --git a/pkg/storage/replica_test.go b/pkg/storage/replica_test.go index 741ca2fa317d..8f08218f637c 100644 --- a/pkg/storage/replica_test.go +++ b/pkg/storage/replica_test.go @@ -6232,10 +6232,12 @@ func TestProposalOverhead(t *testing.T) { t.Fatal(pErr) } - // NB: the expected overhead reflects the space overhead currently present in - // Raft commands. This test will fail if that overhead changes. Try to make - // this number go down and not up. - const expectedOverhead = 52 + // NB: the expected overhead reflects the space overhead currently + // present in Raft commands. This test will fail if that overhead + // changes. Try to make this number go down and not up. It slightly + // undercounts because our proposal filter is called before + // maxLeaseIndex and a few other fields are filled in. + const expectedOverhead = 48 if v := atomic.LoadUint32(&overhead); expectedOverhead != v { t.Fatalf("expected overhead of %d, but found %d", expectedOverhead, v) } From 93db613792f76477298ce2f89fb80e6ff3f421de Mon Sep 17 00:00:00 2001 From: Ben Darnell Date: Wed, 27 Feb 2019 21:01:35 -0500 Subject: [PATCH 03/10] storage: Refactor second half of evalAndPropose into proposeLocked Release note: None --- pkg/storage/replica_raft.go | 67 +++++++++++++++++++++++-------------- 1 file changed, 41 insertions(+), 26 deletions(-) diff --git a/pkg/storage/replica_raft.go b/pkg/storage/replica_raft.go index 2c91aebe0ec0..1770836a9acc 100644 --- a/pkg/storage/replica_raft.go +++ b/pkg/storage/replica_raft.go @@ -265,6 +265,43 @@ func (r *Replica) evalAndPropose( if r.mu.commandSizes != nil { r.mu.commandSizes[proposal.idKey] = proposalSize } + maxLeaseIndex, pErr := r.proposeLocked(ctx, proposal, lease) + if pErr != nil { + return nil, nil, 0, pErr + } + // Must not use `proposal` in the closure below as a proposal which is not + // present in r.mu.proposals is no longer protected by the mutex. Abandoning + // a command only abandons the associated context. As soon as we propose a + // command to Raft, ownership passes to the "below Raft" machinery. In + // particular, endCmds will be invoked when the command is applied. There are + // a handful of cases where the command may not be applied (or even + // processed): the process crashes or the local replica is removed from the + // range. + tryAbandon := func() bool { + r.mu.Lock() + p, ok := r.mu.proposals[idKey] + if ok { + // TODO(radu): Should this context be created via tracer.ForkCtxSpan? + // We'd need to make sure the span is finished eventually. + p.ctx = r.AnnotateCtx(context.TODO()) + } + r.mu.Unlock() + return ok + } + return proposalCh, tryAbandon, maxLeaseIndex, nil +} + +// proposeLocked starts tracking a command and proposes it to raft. If +// this method succeeds, the caller is responsible for eventually +// removing the proposal from the pending map (on success, in +// processRaftCommand, or on failure via cleanupFailedProposalLocked). +// +// This method requires that r.mu is held in all cases, and if +// r.mu.internalRaftGroup is nil, r.raftMu must also be held (note +// that lock ordering requires that raftMu be acquired first). +func (r *Replica) proposeLocked( + ctx context.Context, proposal *ProposalData, lease roachpb.Lease, +) (_ int64, pErr *roachpb.Error) { // Make sure we clean up the proposal if we fail to submit it successfully. // This is important both to ensure that that the proposals map doesn't // grow without bound and to ensure that we always release any quota that @@ -280,17 +317,14 @@ func (r *Replica) evalAndPropose( // and the acquisition of Replica.mu. Failure to do so will leave pending // proposals that never get cleared. if !r.mu.destroyStatus.IsAlive() { - return nil, nil, 0, roachpb.NewError(r.mu.destroyStatus.err) + return 0, roachpb.NewError(r.mu.destroyStatus.err) } repDesc, err := r.getReplicaDescriptorRLocked() if err != nil { - return nil, nil, 0, roachpb.NewError(err) + return 0, roachpb.NewError(err) } maxLeaseIndex := r.insertProposalLocked(proposal, repDesc, lease) - if maxLeaseIndex == 0 && !ba.IsLeaseRequest() { - log.Fatalf(ctx, "no MaxLeaseIndex returned for %s", ba) - } if err := r.submitProposalLocked(proposal); err == raft.ErrProposalDropped { // Silently ignore dropped proposals (they were always silently ignored @@ -298,29 +332,10 @@ func (r *Replica) evalAndPropose( // TODO(bdarnell): Handle ErrProposalDropped better. // https://github.com/cockroachdb/cockroach/issues/21849 } else if err != nil { - return nil, nil, 0, roachpb.NewError(err) + return 0, roachpb.NewError(err) } - // Must not use `proposal` in the closure below as a proposal which is not - // present in r.mu.proposals is no longer protected by the mutex. Abandoning - // a command only abandons the associated context. As soon as we propose a - // command to Raft, ownership passes to the "below Raft" machinery. In - // particular, endCmds will be invoked when the command is applied. There are - // a handful of cases where the command may not be applied (or even - // processed): the process crashes or the local replica is removed from the - // range. - tryAbandon := func() bool { - r.mu.Lock() - p, ok := r.mu.proposals[idKey] - if ok { - // TODO(radu): Should this context be created via tracer.ForkCtxSpan? - // We'd need to make sure the span is finished eventually. - p.ctx = r.AnnotateCtx(context.TODO()) - } - r.mu.Unlock() - return ok - } - return proposalCh, tryAbandon, maxLeaseIndex, nil + return maxLeaseIndex, nil } // submitProposalLocked proposes or re-proposes a command in r.mu.proposals. From 92f70d77473bcf04377e6cea6b46ba4bbef11fb4 Mon Sep 17 00:00:00 2001 From: Ben Darnell Date: Thu, 21 Feb 2019 10:43:28 -0500 Subject: [PATCH 04/10] storage: Move illegal lease index retries down into raft Pipelined writes wait for commands to be evaluated, then detach before they apply. Retryable errors generated at evaluation time are handled correctly (by DistSender or above) even for pipelined writes, but pipelined writes have lost the ability to handle apply-time retry conditions in the executeWriteBatch loop (there used to be more of these, but illegal lease index is now the only condition retried at this level now). To remedy this, this commit moves reproposals due to illegal lease indexes below raft (but only on the proposing node) In practice, this can happen to writes that race with a raft leadership transfer. This is observable immediately after table creation, since table creation performs a split, and then may perform a lease transfer to balance load (and if the lease is transfer, raft leadership is transferred afterward). Specifically, 1. Lease is transferred from store s1 to s2. s1 is still raft leader. 2. Write w1 evaluates on store s2 and is assigned lease index i1. s2 forwards the proposal to s1. 3. s1 initiates raft leader transfer to s2. This puts it into a temporarily leaderless state so it drops the forwarded proposal. 4. s2 is elected raft leader, completing the transfer. 5. A second write w2 evalutes on s2, is assigned lease index i2, and goes right in the raft log since s2 is both leaseholder and leader. 6. s2 refreshes proposals as a side effect of becoming leader, and writes w1 to the log with lease index i1. 7. s2 applies w2, then w1. w1 fails because of the out of order lease index. If w1 was pipelined, the client stops listening after step 2, and won't learn of the failure until it tries to commit. At this point the commit returns an ASYNC_WRITE_FAILURE retry to the client. Note that in some cases, NotLeaseHolderError can be generated at apply time. These errors cannot be retried (since the proposer has lost its lease) so they will unavoidably result in an ASYNC_WRITE_FAILURE error to the client. However, this is uncommon - most NotLeaseHolderErrors are generated at evaluation time, which is compatible with pipelined writes. Fixes #28876 Release note (bug fix): Reduced the occurrence of ASYNC_WRITE_FAILURE transaction retry errors, especially for the first insert into a newly-created table. --- pkg/storage/batcheval/cmd_lease_test.go | 110 ++++++++++++++++++++++++ pkg/storage/replica_raft.go | 32 ++++++- pkg/storage/replica_test.go | 21 +---- 3 files changed, 142 insertions(+), 21 deletions(-) create mode 100644 pkg/storage/batcheval/cmd_lease_test.go diff --git a/pkg/storage/batcheval/cmd_lease_test.go b/pkg/storage/batcheval/cmd_lease_test.go new file mode 100644 index 000000000000..30caf6e405de --- /dev/null +++ b/pkg/storage/batcheval/cmd_lease_test.go @@ -0,0 +1,110 @@ +// Copyright 2019 The Cockroach Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. See the License for the specific language governing +// permissions and limitations under the License. See the AUTHORS file +// for names of contributors. + +package batcheval + +import ( + "context" + "testing" + "time" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" +) + +// TestLeaseTransferWithPipelinedWrite verifies that pipelined writes +// do not cause retry errors to be leaked to clients when the error +// can be handled internally. Pipelining dissociates a write from its +// caller, so the retries of internally-generated errors (specifically +// out-of-order lease indexes) must be retried below that level. +// +// This issue was observed in practice to affect the first insert +// after table creation with high probability. +func TestLeaseTransferWithPipelinedWrite(t *testing.T) { + defer leaktest.AfterTest(t)() + + ctx := context.Background() + + tc := serverutils.StartTestCluster(t, 3, base.TestClusterArgs{}) + defer tc.Stopper().Stop(ctx) + + db := tc.ServerConn(0) + + for iter := 0; iter < 100; iter++ { + log.Infof(ctx, "iter %d", iter) + if _, err := db.ExecContext(ctx, "drop table if exists test"); err != nil { + t.Fatal(err) + } + if _, err := db.ExecContext(ctx, "create table test (a int, b int, primary key (a, b))"); err != nil { + t.Fatal(err) + } + + workerErrCh := make(chan error, 1) + go func() { + workerErrCh <- func() error { + for i := 0; i < 1; i++ { + tx, err := db.BeginTx(ctx, nil) + if err != nil { + return err + } + defer func() { + if tx != nil { + if err := tx.Rollback(); err != nil { + log.Warningf(ctx, "error rolling back: %s", err) + } + } + }() + // Run two inserts in a transaction to ensure that we have + // pipelined writes that cannot be retried at the SQL layer + // due to the first-statement rule. + if _, err := tx.ExecContext(ctx, "INSERT INTO test (a, b) VALUES ($1, $2)", i, 1); err != nil { + return err + } + if _, err := tx.ExecContext(ctx, "INSERT INTO test (a, b) VALUES ($1, $2)", i, 2); err != nil { + return err + } + if err := tx.Commit(); err != nil { + return err + } + tx = nil + } + return nil + }() + }() + + // TODO(bdarnell): This test reliably reproduced the issue when + // introduced, because table creation causes splits and repeated + // table creation leads to lease transfers due to rebalancing. + // This is a subtle thing to rely on and the test might become + // more reliable if we ran more iterations in the worker goroutine + // and added a second goroutine to explicitly transfer leases back + // and forth. + + select { + case <-time.After(15 * time.Second): + // TODO(bdarnell): The test seems flaky under stress with a 5s + // timeout. Why? I'm giving it a high timeout since hanging + // isn't a failure mode we're particularly concerned about here, + // but it shouldn't be taking this long even with stress. + t.Fatal("timed out") + case err := <-workerErrCh: + if err != nil { + t.Fatalf("worker failed: %s", err) + } + } + } +} diff --git a/pkg/storage/replica_raft.go b/pkg/storage/replica_raft.go index 1770836a9acc..89f700c31685 100644 --- a/pkg/storage/replica_raft.go +++ b/pkg/storage/replica_raft.go @@ -1782,7 +1782,6 @@ func (r *Replica) processRaftCommand( if proposedLocally { // We initiated this command, so use the caller-supplied context. ctx = proposal.ctx - proposal.ctx = nil // avoid confusion delete(r.mu.proposals, idKey) } @@ -2077,7 +2076,21 @@ func (r *Replica) processRaftCommand( } if proposedLocally { - proposal.finishApplication(response) + if response.ProposalRetry == proposalIllegalLeaseIndex { + // If we failed to apply at the right lease index, try again with a new one. + r.mu.Lock() + lease := *r.mu.state.Lease + r.mu.Unlock() + if lease.OwnedBy(r.StoreID()) { + // Some tests check for this log message in the trace. + log.VEventf(ctx, 2, "retry: proposalIllegalLeaseIndex") + // Use a goroutine so this doesn't count as "marshaling a proto downstream of raft". + go proposeAfterIllegalLeaseIndex(ctx, r, proposal, *r.mu.state.Lease) + return false + } + } else { + proposal.finishApplication(response) + } } else if response.Err != nil { log.VEventf(ctx, 1, "applying raft command resulted in error: %s", response.Err) } @@ -2085,6 +2098,21 @@ func (r *Replica) processRaftCommand( return raftCmd.ReplicatedEvalResult.ChangeReplicas != nil } +// proposeAfterIllegalLeaseIndex is used by processRaftCommand to +// repropose commands that have gotten an illegal lease index error. +// It is not intended for use elsewhere and is only a top-level +// function so that it can avoid the below_raft_protos check. +func proposeAfterIllegalLeaseIndex( + ctx context.Context, r *Replica, proposal *ProposalData, lease roachpb.Lease, +) { + r.mu.Lock() + defer r.mu.Unlock() + _, pErr := r.proposeLocked(ctx, proposal, lease) + if pErr != nil { + panic(pErr) + } +} + // maybeAcquireSnapshotMergeLock checks whether the incoming snapshot subsumes // any replicas and, if so, locks them for subsumption. See acquireMergeLock // for details about the lock itself. diff --git a/pkg/storage/replica_test.go b/pkg/storage/replica_test.go index 8f08218f637c..05c72f81d539 100644 --- a/pkg/storage/replica_test.go +++ b/pkg/storage/replica_test.go @@ -7199,29 +7199,12 @@ func TestReplicaRetryRaftProposal(t *testing.T) { context.WithValue(ctx, magicKey{}, "foo"), ba, ) - if retry != proposalIllegalLeaseIndex { - t.Fatalf("expected retry from illegal lease index, but got (%v, %v, %d)", br, pErr, retry) - } - if exp, act := int32(1), atomic.LoadInt32(&c); exp != act { - t.Fatalf("expected %d proposals, got %d", exp, act) - } - } - - atomic.StoreInt32(&c, 0) - { - br, pErr := tc.repl.executeWriteBatch( - context.WithValue(ctx, magicKey{}, "foo"), - ba, - ) - if pErr != nil { - t.Fatal(pErr) + if retry != proposalNoReevaluation { + t.Fatalf("expected no retry from illegal lease index, but got (%v, %v, %d)", br, pErr, retry) } if exp, act := int32(2), atomic.LoadInt32(&c); exp != act { t.Fatalf("expected %d proposals, got %d", exp, act) } - if resp, ok := br.Responses[0].GetInner().(*roachpb.IncrementResponse); !ok || resp.NewValue != expInc { - t.Fatalf("expected new value %d, got (%t, %+v)", expInc, ok, resp) - } } // Test LeaseRequest since it's special: MaxLeaseIndex plays no role and so From c06321144a8a5b7ca36be21229787fff7c384535 Mon Sep 17 00:00:00 2001 From: Ben Darnell Date: Wed, 27 Feb 2019 23:13:08 -0500 Subject: [PATCH 05/10] storage: Remove proposalIllegalLeaseIndex result from refreshProposals This adds considerable complexity for a rare edge case (not exercised by our test suite). The normal case for illegal lease indexes is for a retry to be triggered in processRaftCommand. Retrying in refreshProposals is a slight optimization to allow the retry to start earlier, but this is not common enough to optimize. Release note: None --- pkg/storage/replica_raft.go | 128 +++++++++++-------------------- pkg/storage/track_raft_protos.go | 3 + 2 files changed, 49 insertions(+), 82 deletions(-) diff --git a/pkg/storage/replica_raft.go b/pkg/storage/replica_raft.go index 89f700c31685..e07805d7915c 100644 --- a/pkg/storage/replica_raft.go +++ b/pkg/storage/replica_raft.go @@ -1005,7 +1005,6 @@ func (r *Replica) refreshProposalsLocked(refreshAtDelta int, reason refreshRaftR log.Fatalf(context.TODO(), "refreshAtDelta specified for reason %s != reasonTicks", reason) } - numShouldRetry := 0 var reproposals pendingCmdSlice for _, p := range r.mu.proposals { if p.command.MaxLeaseIndex == 0 { @@ -1019,75 +1018,43 @@ func (r *Replica) refreshProposalsLocked(refreshAtDelta int, reason refreshRaftR roachpb.NewAmbiguousResultError( fmt.Sprintf("unknown status for command without MaxLeaseIndex "+ "at refreshProposalsLocked time (refresh reason: %s)", reason)))}) - } else if cannotApplyAnyMore := !p.command.ReplicatedEvalResult.IsLeaseRequest && - p.command.MaxLeaseIndex <= r.mu.state.LeaseAppliedIndex; cannotApplyAnyMore { - // The command's designated lease index slot was filled up. We got to - // LeaseAppliedIndex and p is still pending in r.mu.proposals; generally - // this means that proposal p didn't commit, and it will be sent back to - // the proposer for a retry - the request needs to be re-evaluated and the - // command re-proposed with a new MaxLeaseIndex. Note that this branch is not - // taken for leases as their MaxLeaseIndex plays no role in deciding whether - // they can apply or not -- the sequence number of the previous lease matters. - // - // An exception is the case when we're refreshing because of - // reasonSnapshotApplied - in that case we don't know if p or some other - // command filled the p.command.MaxLeaseIndex slot (i.e. p might have been - // applied, but we're not watching for every proposal when applying a - // snapshot, so nobody removed p from r.mu.proposals). In this - // ambiguous case, we return an ambiguous error. - // - // NOTE: We used to perform a re-evaluation here in order to avoid the - // ambiguity, but it was cumbersome because the higher layer needed to be - // aware that if the re-evaluation failed in any way, it must rewrite the - // error to reflect the ambiguity. This mechanism also proved very hard to - // test and also is of questionable utility since snapshots are only - // applied in the first place if the leaseholder is divorced from the Raft - // leader. - // - // Note that we can't use the commit index here (which is typically a - // little ahead), because a pending command is removed only as it - // applies. Thus we'd risk reproposing a command that has been committed - // but not yet applied. - r.cleanupFailedProposalLocked(p) - log.Eventf(p.ctx, "retry proposal %x: %s", p.idKey, reason) - if reason == reasonSnapshotApplied { + continue + } + switch reason { + case reasonSnapshotApplied: + // If we applied a snapshot, check the MaxLeaseIndexes of all + // pending commands to see if any are now prevented from + // applying, and if so make them return an ambiguous error. We + // can't tell at this point (which should be rare) whether they + // were included in the snapshot we received or not. + if p.command.MaxLeaseIndex <= r.mu.state.LeaseAppliedIndex { + r.cleanupFailedProposalLocked(p) + log.Eventf(p.ctx, "retry proposal %x: %s", p.idKey, reason) p.finishApplication(proposalResult{Err: roachpb.NewError( roachpb.NewAmbiguousResultError( fmt.Sprintf("unable to determine whether command was applied via snapshot")))}) - } else { - p.finishApplication(proposalResult{ProposalRetry: proposalIllegalLeaseIndex}) } - numShouldRetry++ - } else if reason == reasonTicks && p.proposedAtTicks > r.mu.ticks-refreshAtDelta { - // The command was proposed too recently, don't bother reproprosing it - // yet. - } else { - // The proposal can still apply according to its MaxLeaseIndex. But it's - // likely that the proposal was dropped, so we're going to repropose it - // directly; it's cheaper than asking the proposer to re-evaluate and - // re-propose. Note that we don't need to worry about receiving ambiguous - // responses for this reproposal - we'll only be handling one response - // across the original proposal and the reproposal because we're - // reproposing with the same MaxLeaseIndex and the same idKey as the - // original (a possible second response for the same idKey would be - // ignored). - // - // This situation happens when we've proposed commands that never made it - // to the leader - usually because we didn't know who the leader is. When - // we do discover the leader we need to repropose the command. In local - // testing, by far the most common reason for these reproposals is the - // initial leader election after a split. Specifically, when a range is - // split when the next command is proposed to the RHS the leader is - // elected. After the election completes the command is reproposed for - // both reasonNewLeader and reasonNewLeaderOrConfigChange. + continue + + case reasonTicks: + if p.proposedAtTicks <= r.mu.ticks-refreshAtDelta { + // The command was proposed a while ago and may have been dropped. Try it again. + reproposals = append(reproposals, p) + } + + default: + // We have reason to believe that all pending proposals were + // dropped on the floor (e.g. because of a leader election), so + // repropose everything. reproposals = append(reproposals, p) } } - if log.V(1) && (numShouldRetry > 0 || len(reproposals) > 0) { + + if log.V(1) && len(reproposals) > 0 { ctx := r.AnnotateCtx(context.TODO()) log.Infof(ctx, - "pending commands: sent %d back to client, reproposing %d (at %d.%d) %s", - numShouldRetry, len(reproposals), r.mu.state.RaftAppliedIndex, + "pending commands: reproposing %d (at %d.%d) %s", + len(reproposals), r.mu.state.RaftAppliedIndex, r.mu.state.LeaseAppliedIndex, reason) } @@ -2076,21 +2043,12 @@ func (r *Replica) processRaftCommand( } if proposedLocally { - if response.ProposalRetry == proposalIllegalLeaseIndex { - // If we failed to apply at the right lease index, try again with a new one. - r.mu.Lock() - lease := *r.mu.state.Lease - r.mu.Unlock() - if lease.OwnedBy(r.StoreID()) { - // Some tests check for this log message in the trace. - log.VEventf(ctx, 2, "retry: proposalIllegalLeaseIndex") - // Use a goroutine so this doesn't count as "marshaling a proto downstream of raft". - go proposeAfterIllegalLeaseIndex(ctx, r, proposal, *r.mu.state.Lease) - return false - } - } else { - proposal.finishApplication(response) + // If we failed to apply at the right lease index, try again with a new one. + if response.ProposalRetry == proposalIllegalLeaseIndex && r.tryReproposeWithNewLeaseIndex(proposal) { + return false } + // Otherwise, signal the command's status to the client. + proposal.finishApplication(response) } else if response.Err != nil { log.VEventf(ctx, 1, "applying raft command resulted in error: %s", response.Err) } @@ -2098,19 +2056,25 @@ func (r *Replica) processRaftCommand( return raftCmd.ReplicatedEvalResult.ChangeReplicas != nil } -// proposeAfterIllegalLeaseIndex is used by processRaftCommand to +// tryReproposeWithNewLeaseIndex is used by processRaftCommand to // repropose commands that have gotten an illegal lease index error. // It is not intended for use elsewhere and is only a top-level // function so that it can avoid the below_raft_protos check. -func proposeAfterIllegalLeaseIndex( - ctx context.Context, r *Replica, proposal *ProposalData, lease roachpb.Lease, -) { +// Returns true if the command was successfully reproposed. +func (r *Replica) tryReproposeWithNewLeaseIndex(proposal *ProposalData) bool { r.mu.Lock() defer r.mu.Unlock() - _, pErr := r.proposeLocked(ctx, proposal, lease) - if pErr != nil { - panic(pErr) + lease := *r.mu.state.Lease + if lease.OwnedBy(r.StoreID()) && lease.Sequence == proposal.command.ProposerLeaseSequence { + // Some tests check for this log message in the trace. + log.VEventf(proposal.ctx, 2, "retry: proposalIllegalLeaseIndex") + if _, pErr := r.proposeLocked(proposal.ctx, proposal, lease); pErr != nil { + log.Warningf(proposal.ctx, "failed to repropose with new lease index: %s", pErr) + return false + } + return true } + return false } // maybeAcquireSnapshotMergeLock checks whether the incoming snapshot subsumes diff --git a/pkg/storage/track_raft_protos.go b/pkg/storage/track_raft_protos.go index eb7cedd812ae..4fa806cba371 100644 --- a/pkg/storage/track_raft_protos.go +++ b/pkg/storage/track_raft_protos.go @@ -52,6 +52,9 @@ func TrackRaftProtos() func() []reflect.Type { // but tombstones are unreplicated and thus not subject to the strict // consistency requirements. funcName((*Replica).setTombstoneKey), + // tryReproposeWithNewLeaseIndex is only run on the replica that + // proposed the command. + funcName((*Replica).tryReproposeWithNewLeaseIndex), } belowRaftProtos := struct { From 5cf20cb12b451c7f5c3ce9a84e2767885ba39717 Mon Sep 17 00:00:00 2001 From: Ben Darnell Date: Thu, 28 Feb 2019 18:44:04 -0500 Subject: [PATCH 06/10] storage: Merge executeWriteBatch and tryExecuteWriteBatch The only case in which we made multiple 'tries' was lease index errors, which no longer reach this level. Release note: None --- pkg/storage/replica.go | 6 ++-- pkg/storage/replica_proposal.go | 5 ++- pkg/storage/replica_read.go | 2 +- pkg/storage/replica_test.go | 7 +++-- pkg/storage/replica_write.go | 55 ++++++++++----------------------- 5 files changed, 27 insertions(+), 48 deletions(-) diff --git a/pkg/storage/replica.go b/pkg/storage/replica.go index 5c33d914b587..46b182868ca8 100644 --- a/pkg/storage/replica.go +++ b/pkg/storage/replica.go @@ -985,13 +985,11 @@ type endCmds struct { // done releases the latches acquired by the command and updates // the timestamp cache using the final timestamp of each command. -func (ec *endCmds) done( - br *roachpb.BatchResponse, pErr *roachpb.Error, retry proposalReevaluationReason, -) { +func (ec *endCmds) done(br *roachpb.BatchResponse, pErr *roachpb.Error) { // Update the timestamp cache if the request is not being re-evaluated. Each // request is considered in turn; only those marked as affecting the cache are // processed. Inconsistent reads are excluded. - if retry == proposalNoReevaluation && ec.ba.ReadConsistency == roachpb.CONSISTENT { + if ec.ba.ReadConsistency == roachpb.CONSISTENT { ec.repl.updateTimestampCache(&ec.ba, br, pErr) } diff --git a/pkg/storage/replica_proposal.go b/pkg/storage/replica_proposal.go index 77dc5c9349ee..55f84975c2a4 100644 --- a/pkg/storage/replica_proposal.go +++ b/pkg/storage/replica_proposal.go @@ -110,7 +110,10 @@ type ProposalData struct { // counted on to invoke endCmds itself.) func (proposal *ProposalData) finishApplication(pr proposalResult) { if proposal.endCmds != nil { - proposal.endCmds.done(pr.Reply, pr.Err, pr.ProposalRetry) + if pr.ProposalRetry != proposalNoReevaluation { + log.Fatalf(context.Background(), "expected no reevaluation") + } + proposal.endCmds.done(pr.Reply, pr.Err) proposal.endCmds = nil } if proposal.sp != nil { diff --git a/pkg/storage/replica_read.go b/pkg/storage/replica_read.go index ef0f6f8bcb91..d10a0cfd8f01 100644 --- a/pkg/storage/replica_read.go +++ b/pkg/storage/replica_read.go @@ -68,7 +68,7 @@ func (r *Replica) executeReadOnlyBatch( // timestamp cache update is synchronized. This is wrapped to delay // pErr evaluation to its value when returning. defer func() { - endCmds.done(br, pErr, proposalNoReevaluation) + endCmds.done(br, pErr) }() // TODO(nvanbenschoten): Can this be moved into Replica.requestCanProceed? diff --git a/pkg/storage/replica_test.go b/pkg/storage/replica_test.go index 05c72f81d539..854f3552bde9 100644 --- a/pkg/storage/replica_test.go +++ b/pkg/storage/replica_test.go @@ -7195,13 +7195,14 @@ func TestReplicaRetryRaftProposal(t *testing.T) { iArg := incrementArgs(roachpb.Key("b"), expInc) ba.Add(&iArg) { - br, pErr, retry := tc.repl.tryExecuteWriteBatch( + _, pErr := tc.repl.executeWriteBatch( context.WithValue(ctx, magicKey{}, "foo"), ba, ) - if retry != proposalNoReevaluation { - t.Fatalf("expected no retry from illegal lease index, but got (%v, %v, %d)", br, pErr, retry) + if pErr != nil { + t.Fatalf("write batch returned error: %s", pErr) } + // The command was reproposed internally, for two total proposals. if exp, act := int32(2), atomic.LoadInt32(&c); exp != act { t.Fatalf("expected %d proposals, got %d", exp, act) } diff --git a/pkg/storage/replica_write.go b/pkg/storage/replica_write.go index 29ce2974dcfa..a8eabc65f049 100644 --- a/pkg/storage/replica_write.go +++ b/pkg/storage/replica_write.go @@ -39,31 +39,6 @@ import ( // alternative, submitting requests to Raft one after another, paying massive // latency, is only taken for commands whose effects may overlap. // -// Internally, multiple iterations of the above process may take place -// due to the Raft proposal failing retryably, possibly due to proposal -// reordering or re-proposals. We call these retry "re-evaluations" since the -// request is evaluated again (against a fresh engine snapshot). -func (r *Replica) executeWriteBatch( - ctx context.Context, ba roachpb.BatchRequest, -) (*roachpb.BatchResponse, *roachpb.Error) { - for { - // TODO(andrei): export some metric about re-evaluations. - br, pErr, retry := r.tryExecuteWriteBatch(ctx, ba) - if retry == proposalIllegalLeaseIndex { - log.VEventf(ctx, 2, "retry: proposalIllegalLeaseIndex") - if pErr != nil { - log.Fatalf(ctx, "both error and retry returned: %s", pErr) - } - continue // retry - } - return br, pErr - } -} - -// tryExecuteWriteBatch is invoked by executeWriteBatch, which will -// call this method until it returns a non-retryable result (i.e. no -// proposalRetryReason is returned). -// // Concretely, // // - Latches for the keys affected by the command are acquired (i.e. @@ -87,18 +62,18 @@ func (r *Replica) executeWriteBatch( // NB: changing BatchRequest to a pointer here would have to be done cautiously // as this method makes the assumption that it operates on a shallow copy (see // call to applyTimestampCache). -func (r *Replica) tryExecuteWriteBatch( +func (r *Replica) executeWriteBatch( ctx context.Context, ba roachpb.BatchRequest, -) (br *roachpb.BatchResponse, pErr *roachpb.Error, retry proposalReevaluationReason) { +) (br *roachpb.BatchResponse, pErr *roachpb.Error) { startTime := timeutil.Now() if err := r.maybeBackpressureWriteBatch(ctx, ba); err != nil { - return nil, roachpb.NewError(err), proposalNoReevaluation + return nil, roachpb.NewError(err) } spans, err := r.collectSpans(&ba) if err != nil { - return nil, roachpb.NewError(err), proposalNoReevaluation + return nil, roachpb.NewError(err) } var endCmds *endCmds @@ -111,7 +86,7 @@ func (r *Replica) tryExecuteWriteBatch( var err error endCmds, err = r.beginCmds(ctx, &ba, spans) if err != nil { - return nil, roachpb.NewError(err), proposalNoReevaluation + return nil, roachpb.NewError(err) } } @@ -119,7 +94,7 @@ func (r *Replica) tryExecuteWriteBatch( // wrapped to delay pErr evaluation to its value when returning. defer func() { if endCmds != nil { - endCmds.done(br, pErr, retry) + endCmds.done(br, pErr) } }() @@ -132,7 +107,7 @@ func (r *Replica) tryExecuteWriteBatch( // Other write commands require that this replica has the range // lease. if status, pErr = r.redirectOnOrAcquireLease(ctx); pErr != nil { - return nil, pErr, proposalNoReevaluation + return nil, pErr } lease = status.Lease } @@ -146,7 +121,7 @@ func (r *Replica) tryExecuteWriteBatch( // forward. Or, in the case of a transactional write, the txn // timestamp and possible write-too-old bool. if bumped, pErr := r.applyTimestampCache(ctx, &ba, minTS); pErr != nil { - return nil, pErr, proposalNoReevaluation + return nil, pErr } else if bumped { // If we bump the transaction's timestamp, we must absolutely // tell the client in a response transaction (for otherwise it @@ -176,7 +151,7 @@ func (r *Replica) tryExecuteWriteBatch( maxLeaseIndex, ba, pErr, ) } - return nil, pErr, proposalNoReevaluation + return nil, pErr } // A max lease index of zero is returned when no proposal was made or a lease was proposed. // In both cases, we don't need to communicate a MLAI. @@ -218,7 +193,10 @@ func (r *Replica) tryExecuteWriteBatch( log.Warning(ctx, err) } } - return propResult.Reply, propResult.Err, propResult.ProposalRetry + if propResult.ProposalRetry != proposalNoReevaluation { + log.Fatalf(ctx, "expected no reevaluation") + } + return propResult.Reply, propResult.Err case <-slowTimer.C: slowTimer.Read = true log.Warningf(ctx, `have been waiting %.2fs for proposing command %s. @@ -242,11 +220,10 @@ and the following Raft status: %+v`, r.store.metrics.SlowRaftRequests.Dec(1) log.Infof( ctx, - "slow command %s finished after %.2fs with error %v, retry %d", + "slow command %s finished after %.2fs with error %v", ba, timeutil.Since(tBegin).Seconds(), pErr, - retry, ) }() @@ -259,7 +236,7 @@ and the following Raft status: %+v`, if tryAbandon() { log.VEventf(ctx, 2, "context cancellation after %0.1fs of attempting command %s", timeutil.Since(startTime).Seconds(), ba) - return nil, roachpb.NewError(roachpb.NewAmbiguousResultError(ctx.Err().Error())), proposalNoReevaluation + return nil, roachpb.NewError(roachpb.NewAmbiguousResultError(ctx.Err().Error())) } ctxDone = nil case <-shouldQuiesce: @@ -272,7 +249,7 @@ and the following Raft status: %+v`, if tryAbandon() { log.VEventf(ctx, 2, "shutdown cancellation after %0.1fs of attempting command %s", timeutil.Since(startTime).Seconds(), ba) - return nil, roachpb.NewError(roachpb.NewAmbiguousResultError("server shutdown")), proposalNoReevaluation + return nil, roachpb.NewError(roachpb.NewAmbiguousResultError("server shutdown")) } shouldQuiesce = nil } From 7f8656005a683c59c76daf30f5a38f9bf84a786d Mon Sep 17 00:00:00 2001 From: Ben Darnell Date: Thu, 28 Feb 2019 18:52:04 -0500 Subject: [PATCH 07/10] storage: Remove the no-longer-used proposalResult.ProposalRetry Release note: None --- pkg/storage/replica_proposal.go | 15 +++++---------- pkg/storage/replica_raft.go | 9 +++------ pkg/storage/replica_write.go | 3 --- 3 files changed, 8 insertions(+), 19 deletions(-) diff --git a/pkg/storage/replica_proposal.go b/pkg/storage/replica_proposal.go index 55f84975c2a4..e6e469a3016b 100644 --- a/pkg/storage/replica_proposal.go +++ b/pkg/storage/replica_proposal.go @@ -110,9 +110,6 @@ type ProposalData struct { // counted on to invoke endCmds itself.) func (proposal *ProposalData) finishApplication(pr proposalResult) { if proposal.endCmds != nil { - if pr.ProposalRetry != proposalNoReevaluation { - log.Fatalf(context.Background(), "expected no reevaluation") - } proposal.endCmds.done(pr.Reply, pr.Err) proposal.endCmds = nil } @@ -811,14 +808,12 @@ func (r *Replica) handleEvalResultRaftMuLocked( } // proposalResult indicates the result of a proposal. Exactly one of -// Reply, Err and ProposalRetry is set, and it represents the result of -// the proposal. +// Reply and Err is set, and it represents the result of the proposal. type proposalResult struct { - Reply *roachpb.BatchResponse - Err *roachpb.Error - ProposalRetry proposalReevaluationReason - Intents []result.IntentsWithArg - EndTxns []result.EndTxnIntents + Reply *roachpb.BatchResponse + Err *roachpb.Error + Intents []result.IntentsWithArg + EndTxns []result.EndTxnIntents } // evaluateProposal generates a Result from the given request by diff --git a/pkg/storage/replica_raft.go b/pkg/storage/replica_raft.go index e07805d7915c..17f29a0f14c6 100644 --- a/pkg/storage/replica_raft.go +++ b/pkg/storage/replica_raft.go @@ -1958,11 +1958,8 @@ func (r *Replica) processRaftCommand( var lResult *result.LocalResult if proposedLocally { - if proposalRetry != proposalNoReevaluation { - response.ProposalRetry = proposalRetry - if pErr == nil { - log.Fatalf(ctx, "proposal with nontrivial retry behavior, but no error: %+v", proposal) - } + if proposalRetry != proposalNoReevaluation && pErr == nil { + log.Fatalf(ctx, "proposal with nontrivial retry behavior, but no error: %+v", proposal) } if pErr != nil { // A forced error was set (i.e. we did not apply the proposal, @@ -2044,7 +2041,7 @@ func (r *Replica) processRaftCommand( if proposedLocally { // If we failed to apply at the right lease index, try again with a new one. - if response.ProposalRetry == proposalIllegalLeaseIndex && r.tryReproposeWithNewLeaseIndex(proposal) { + if proposalRetry == proposalIllegalLeaseIndex && r.tryReproposeWithNewLeaseIndex(proposal) { return false } // Otherwise, signal the command's status to the client. diff --git a/pkg/storage/replica_write.go b/pkg/storage/replica_write.go index a8eabc65f049..0f0fccbdf198 100644 --- a/pkg/storage/replica_write.go +++ b/pkg/storage/replica_write.go @@ -193,9 +193,6 @@ func (r *Replica) executeWriteBatch( log.Warning(ctx, err) } } - if propResult.ProposalRetry != proposalNoReevaluation { - log.Fatalf(ctx, "expected no reevaluation") - } return propResult.Reply, propResult.Err case <-slowTimer.C: slowTimer.Read = true From 12a950766e4beb67fe3ffc72ca6f0d86801626ee Mon Sep 17 00:00:00 2001 From: Ben Darnell Date: Fri, 1 Mar 2019 12:33:11 -0500 Subject: [PATCH 08/10] storage: Remove arguments from insertProposalLocked This method didn't do anything with its arguments but assign them to attributes of the proposal. By making this assignment a prerequisite of the method we can avoid any question of whether they may be changed in tryReproposeWithNewLeaseIndex Release note: None --- pkg/storage/replica_raft.go | 45 +++++++++++++++++++------------------ pkg/storage/replica_test.go | 16 +++++++++---- 2 files changed, 35 insertions(+), 26 deletions(-) diff --git a/pkg/storage/replica_raft.go b/pkg/storage/replica_raft.go index 17f29a0f14c6..e7a2874ddfb8 100644 --- a/pkg/storage/replica_raft.go +++ b/pkg/storage/replica_raft.go @@ -49,9 +49,7 @@ import ( // insertProposalLocked assigns a MaxLeaseIndex to a proposal and adds // it to the pending map. Returns the assigned MaxLeaseIndex, if any. -func (r *Replica) insertProposalLocked( - proposal *ProposalData, proposerReplica roachpb.ReplicaDescriptor, proposerLease roachpb.Lease, -) int64 { +func (r *Replica) insertProposalLocked(proposal *ProposalData) int64 { // Assign a lease index. Note that we do this as late as possible // to make sure (to the extent that we can) that we don't assign // (=predict) the index differently from the order in which commands are @@ -64,8 +62,10 @@ func (r *Replica) insertProposalLocked( r.mu.lastAssignedLeaseIndex++ } proposal.command.MaxLeaseIndex = r.mu.lastAssignedLeaseIndex - proposal.command.ProposerReplica = proposerReplica - proposal.command.ProposerLeaseSequence = proposerLease.Sequence + if proposal.command.ProposerReplica == (roachpb.ReplicaDescriptor{}) { + // 0 is a valid LeaseSequence value so we can't actually enforce it. + log.Fatalf(context.TODO(), "ProposerReplica and ProposerLeaseSequence must be filled in") + } if log.V(4) { log.Infof(proposal.ctx, "submitting proposal %x: maxLeaseIndex=%d", @@ -265,7 +265,16 @@ func (r *Replica) evalAndPropose( if r.mu.commandSizes != nil { r.mu.commandSizes[proposal.idKey] = proposalSize } - maxLeaseIndex, pErr := r.proposeLocked(ctx, proposal, lease) + + // Record the proposer and lease sequence. + repDesc, err := r.getReplicaDescriptorRLocked() + if err != nil { + return nil, nil, 0, roachpb.NewError(err) + } + proposal.command.ProposerReplica = repDesc + proposal.command.ProposerLeaseSequence = lease.Sequence + + maxLeaseIndex, pErr := r.proposeLocked(ctx, proposal) if pErr != nil { return nil, nil, 0, pErr } @@ -300,7 +309,7 @@ func (r *Replica) evalAndPropose( // r.mu.internalRaftGroup is nil, r.raftMu must also be held (note // that lock ordering requires that raftMu be acquired first). func (r *Replica) proposeLocked( - ctx context.Context, proposal *ProposalData, lease roachpb.Lease, + ctx context.Context, proposal *ProposalData, ) (_ int64, pErr *roachpb.Error) { // Make sure we clean up the proposal if we fail to submit it successfully. // This is important both to ensure that that the proposals map doesn't @@ -320,11 +329,7 @@ func (r *Replica) proposeLocked( return 0, roachpb.NewError(r.mu.destroyStatus.err) } - repDesc, err := r.getReplicaDescriptorRLocked() - if err != nil { - return 0, roachpb.NewError(err) - } - maxLeaseIndex := r.insertProposalLocked(proposal, repDesc, lease) + maxLeaseIndex := r.insertProposalLocked(proposal) if err := r.submitProposalLocked(proposal); err == raft.ErrProposalDropped { // Silently ignore dropped proposals (they were always silently ignored @@ -2061,17 +2066,13 @@ func (r *Replica) processRaftCommand( func (r *Replica) tryReproposeWithNewLeaseIndex(proposal *ProposalData) bool { r.mu.Lock() defer r.mu.Unlock() - lease := *r.mu.state.Lease - if lease.OwnedBy(r.StoreID()) && lease.Sequence == proposal.command.ProposerLeaseSequence { - // Some tests check for this log message in the trace. - log.VEventf(proposal.ctx, 2, "retry: proposalIllegalLeaseIndex") - if _, pErr := r.proposeLocked(proposal.ctx, proposal, lease); pErr != nil { - log.Warningf(proposal.ctx, "failed to repropose with new lease index: %s", pErr) - return false - } - return true + // Some tests check for this log message in the trace. + log.VEventf(proposal.ctx, 2, "retry: proposalIllegalLeaseIndex") + if _, pErr := r.proposeLocked(proposal.ctx, proposal); pErr != nil { + log.Warningf(proposal.ctx, "failed to repropose with new lease index: %s", pErr) + return false } - return false + return true } // maybeAcquireSnapshotMergeLock checks whether the incoming snapshot subsumes diff --git a/pkg/storage/replica_test.go b/pkg/storage/replica_test.go index 854f3552bde9..c91080980535 100644 --- a/pkg/storage/replica_test.go +++ b/pkg/storage/replica_test.go @@ -7274,7 +7274,9 @@ func TestReplicaCancelRaftCommandProgress(t *testing.T) { } repl.mu.Lock() - repl.insertProposalLocked(proposal, repDesc, lease) + proposal.command.ProposerReplica = repDesc + proposal.command.ProposerLeaseSequence = lease.Sequence + repl.insertProposalLocked(proposal) // We actually propose the command only if we don't // cancel it to simulate the case in which Raft loses // the command and it isn't reproposed due to the @@ -7352,7 +7354,9 @@ func TestReplicaBurstPendingCommandsAndRepropose(t *testing.T) { tc.repl.raftMu.Lock() tc.repl.mu.Lock() - tc.repl.insertProposalLocked(cmd, repDesc, status.Lease) + cmd.command.ProposerReplica = repDesc + cmd.command.ProposerLeaseSequence = status.Lease.Sequence + tc.repl.insertProposalLocked(cmd) chs = append(chs, cmd.doneCh) tc.repl.mu.Unlock() tc.repl.raftMu.Unlock() @@ -7458,7 +7462,9 @@ func TestReplicaLeaseReproposal(t *testing.T) { // Decrement the MaxLeaseIndex. If refreshProposalsLocked didn't know that // MaxLeaseIndex doesn't matter for lease requests, it might be tempted to // end the proposal early (since it would assume that it couldn't commit). - repl.insertProposalLocked(proposal, repDesc, lease) + proposal.command.ProposerReplica = repDesc + proposal.command.ProposerLeaseSequence = lease.Sequence + repl.insertProposalLocked(proposal) proposal.command.MaxLeaseIndex = ai - 1 repl.refreshProposalsLocked(1 /* delta */, reasonTicks) select { @@ -7549,7 +7555,9 @@ func TestReplicaRefreshPendingCommandsTicks(t *testing.T) { dropProposals.Unlock() r.mu.Lock() - r.insertProposalLocked(cmd, repDesc, lease) + cmd.command.ProposerReplica = repDesc + cmd.command.ProposerLeaseSequence = lease.Sequence + r.insertProposalLocked(cmd) if err := r.submitProposalLocked(cmd); err != nil { t.Error(err) } From a854c8dd30f1de65ca3155b8dee62ed077bbea3c Mon Sep 17 00:00:00 2001 From: Ben Darnell Date: Fri, 8 Mar 2019 13:23:45 -0500 Subject: [PATCH 09/10] storage: Improve comments around lease reproposals Release note: None --- pkg/storage/replica_raft.go | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/pkg/storage/replica_raft.go b/pkg/storage/replica_raft.go index e7a2874ddfb8..6b2b77fed765 100644 --- a/pkg/storage/replica_raft.go +++ b/pkg/storage/replica_raft.go @@ -2045,7 +2045,10 @@ func (r *Replica) processRaftCommand( } if proposedLocally { - // If we failed to apply at the right lease index, try again with a new one. + // If we failed to apply at the right lease index, try again with + // a new one. This is important for pipelined writes, since they + // don't have a client watching to retry, so a failure to + // eventually apply the proposal would be a user-visible error. if proposalRetry == proposalIllegalLeaseIndex && r.tryReproposeWithNewLeaseIndex(proposal) { return false } @@ -2059,10 +2062,15 @@ func (r *Replica) processRaftCommand( } // tryReproposeWithNewLeaseIndex is used by processRaftCommand to -// repropose commands that have gotten an illegal lease index error. +// repropose commands that have gotten an illegal lease index error, +// and that we know could not have applied while their lease index was +// valid (that is, we observed all applied entries between proposal +// and the lease index becoming invalid, as opposed to skipping some +// of them by applying a snapshot). +// // It is not intended for use elsewhere and is only a top-level -// function so that it can avoid the below_raft_protos check. -// Returns true if the command was successfully reproposed. +// function so that it can avoid the below_raft_protos check. Returns +// true if the command was successfully reproposed. func (r *Replica) tryReproposeWithNewLeaseIndex(proposal *ProposalData) bool { r.mu.Lock() defer r.mu.Unlock() From f7dc847636bdb672116db5b71f5ff9b811979524 Mon Sep 17 00:00:00 2001 From: Ben Darnell Date: Sat, 16 Mar 2019 22:42:26 -0400 Subject: [PATCH 10/10] storage: Avoid multiple reproposals for lease index errors Moving lease index reproposals below raft introduced a bug in which the same proposal could be applied more than once at different lease indexes. (This was originally caught via an assertion in the handling of TruncateLog below raft). This commit adds a test and fixes the bug. Release note: None --- pkg/storage/batcheval/cmd_lease_test.go | 3 +- pkg/storage/replica_raft.go | 25 ++++- pkg/storage/replica_test.go | 143 ++++++++++++++++++++++++ 3 files changed, 168 insertions(+), 3 deletions(-) diff --git a/pkg/storage/batcheval/cmd_lease_test.go b/pkg/storage/batcheval/cmd_lease_test.go index 30caf6e405de..40bba5227319 100644 --- a/pkg/storage/batcheval/cmd_lease_test.go +++ b/pkg/storage/batcheval/cmd_lease_test.go @@ -44,7 +44,8 @@ func TestLeaseTransferWithPipelinedWrite(t *testing.T) { db := tc.ServerConn(0) - for iter := 0; iter < 100; iter++ { + // More than 30 iterations is flaky under stressrace on teamcity. + for iter := 0; iter < 30; iter++ { log.Infof(ctx, "iter %d", iter) if _, err := db.ExecContext(ctx, "drop table if exists test"); err != nil { t.Fatal(err) diff --git a/pkg/storage/replica_raft.go b/pkg/storage/replica_raft.go index 6b2b77fed765..38644698bb47 100644 --- a/pkg/storage/replica_raft.go +++ b/pkg/storage/replica_raft.go @@ -1426,7 +1426,7 @@ func (r *Replica) checkForcedErrLocked( if proposedLocally { log.VEventf( ctx, 1, - "retry proposal %x: applied at lease index %d, required <= %d", + "retry proposal %x: applied at lease index %d, required < %d", proposal.idKey, leaseIndex, raftCmd.MaxLeaseIndex, ) retry = proposalIllegalLeaseIndex @@ -2070,10 +2070,31 @@ func (r *Replica) processRaftCommand( // // It is not intended for use elsewhere and is only a top-level // function so that it can avoid the below_raft_protos check. Returns -// true if the command was successfully reproposed. +// true if the command has been successfully reproposed (not +// necessarily by this method! But if this method returns true, the +// command will be in the local proposals map). func (r *Replica) tryReproposeWithNewLeaseIndex(proposal *ProposalData) bool { r.mu.Lock() defer r.mu.Unlock() + // Note that we don't need to validate anything about the proposal's + // lease here - if we got this far, we know that everything but the + // index is valid at this point in the log. + if proposal.command.MaxLeaseIndex > r.mu.state.LeaseAppliedIndex { + // If the command's MaxLeaseIndex is greater than the + // LeaseAppliedIndex, it must have already been reproposed (this + // can happen if there are multiple copies of the command in the + // logs; see TestReplicaRefreshMultiple). We must not create + // multiple copies with multiple lease indexes, so don't repropose + // it again. + // + // Note that the caller has already removed the current version of + // the proposal from the pending proposals map. We must re-add it + // since it's still pending. + log.VEventf(proposal.ctx, 2, "skipping reproposal, already reproposed at index %d", + proposal.command.MaxLeaseIndex) + r.mu.proposals[proposal.idKey] = proposal + return true + } // Some tests check for this log message in the trace. log.VEventf(proposal.ctx, 2, "retry: proposalIllegalLeaseIndex") if _, pErr := r.proposeLocked(proposal.ctx, proposal); pErr != nil { diff --git a/pkg/storage/replica_test.go b/pkg/storage/replica_test.go index c91080980535..bb4cedaafff2 100644 --- a/pkg/storage/replica_test.go +++ b/pkg/storage/replica_test.go @@ -7602,6 +7602,149 @@ func TestReplicaRefreshPendingCommandsTicks(t *testing.T) { } } +// TestReplicaRefreshMultiple tests an interaction between refreshing +// proposals after a new leader or ticks (which results in multiple +// copies in the log with the same lease index) and refreshing after +// an illegal lease index error (with a new lease index assigned). +// +// The setup here is rather artificial, but it represents something +// that can happen (very rarely) in the real world with multiple raft +// leadership transfers. +func TestReplicaRefreshMultiple(t *testing.T) { + defer leaktest.AfterTest(t)() + + ctx := context.Background() + + var filterActive int32 + var incCmdID storagebase.CmdIDKey + var incApplyCount int64 + tsc := TestStoreConfig(nil) + tsc.TestingKnobs.TestingApplyFilter = func(filterArgs storagebase.ApplyFilterArgs) (int, *roachpb.Error) { + if atomic.LoadInt32(&filterActive) != 0 && filterArgs.CmdID == incCmdID { + atomic.AddInt64(&incApplyCount, 1) + } + return 0, nil + } + var tc testContext + stopper := stop.NewStopper() + defer stopper.Stop(ctx) + tc.StartWithStoreConfig(t, stopper, tsc) + repl := tc.repl + + repDesc, err := repl.GetReplicaDescriptor() + if err != nil { + t.Fatal(err) + } + + key := roachpb.Key("a") + + // Run a few commands first: This advances the lease index, which is + // necessary for the tricks we're going to play to induce failures + // (we need to be able to subtract from the current lease index + // without going below 0). + for i := 0; i < 3; i++ { + inc := incrementArgs(key, 1) + if _, pErr := client.SendWrapped(ctx, tc.Sender(), &inc); pErr != nil { + t.Fatal(pErr) + } + } + // Sanity check the resulting value. + get := getArgs(key) + if resp, pErr := client.SendWrapped(ctx, tc.Sender(), &get); pErr != nil { + t.Fatal(pErr) + } else if x, err := resp.(*roachpb.GetResponse).Value.GetInt(); err != nil { + t.Fatalf("returned non-int: %s", err) + } else if x != 3 { + t.Fatalf("expected 3, got %d", x) + } + + // Manually propose another increment. This is the one we'll + // manipulate into failing. (the use of increment here is not + // significant. I originally wrote it this way because I thought the + // non-idempotence of increment would make it easier to test, but + // since the reproposals we're concerned with don't result in + // reevaluation it doesn't matter) + inc := incrementArgs(key, 1) + var ba roachpb.BatchRequest + ba.Add(&inc) + ba.Timestamp = tc.Clock().Now() + + incCmdID = makeIDKey() + atomic.StoreInt32(&filterActive, 1) + proposal, pErr := repl.requestToProposal(ctx, incCmdID, ba, nil, &allSpans) + if pErr != nil { + t.Fatal(pErr) + } + // Save this channel; it may get reset to nil before we read from it. + proposalDoneCh := proposal.doneCh + + // Propose the command manually with errors induced. + func() { + repl.mu.Lock() + defer repl.mu.Unlock() + ai := repl.mu.state.LeaseAppliedIndex + if ai <= 1 { + // Lease index zero is special in this test because we subtract + // from it below, so we need enough previous proposals in the + // log to ensure it doesn't go negative. + t.Fatalf("test requires LeaseAppliedIndex >= 2 at this point, have %d", ai) + } + // Manually run parts of the proposal process. Insert it, then + // tweak the lease index to ensure it will generate a retry when + // it fails. Don't call submitProposal (to simulate a dropped + // message). Then call refreshProposals twice to repropose it and + // put it in the logs twice. (submit + refresh should be + // equivalent to the double refresh used here) + // + // Note that all of this is under the same lock so there's no + // chance of it applying and exiting the pending state before we + // refresh. + proposal.command.ProposerReplica = repDesc + proposal.command.ProposerLeaseSequence = repl.mu.state.Lease.Sequence + repl.insertProposalLocked(proposal) + proposal.command.MaxLeaseIndex = ai - 1 + repl.refreshProposalsLocked(0, reasonNewLeader) + repl.refreshProposalsLocked(0, reasonNewLeader) + }() + + // Wait for our proposal to apply. The two refreshed proposals above + // will fail due to their illegal lease index. Then they'll generate + // a reproposal (in the bug that we're testing against, they'd + // *each* generate a reproposal). When this reproposal succeeds, the + // doneCh is signaled. + select { + case resp := <-proposalDoneCh: + if resp.Err != nil { + t.Fatal(resp.Err) + } + case <-time.After(5 * time.Second): + t.Fatal("timed out") + } + // In the buggy case, there's a second reproposal that we don't have + // a good way to observe, so just sleep to let it apply if it's in + // the system. + time.Sleep(10 * time.Millisecond) + + // The command applied exactly once. Note that this check would pass + // even in the buggy case, since illegal lease index proposals do + // not generate reevaluations (and increment is handled upstream of + // raft). + if resp, pErr := client.SendWrapped(ctx, tc.Sender(), &get); pErr != nil { + t.Fatal(pErr) + } else if x, err := resp.(*roachpb.GetResponse).Value.GetInt(); err != nil { + t.Fatalf("returned non-int: %s", err) + } else if x != 4 { + t.Fatalf("expected 4, got %d", x) + } + + // The real test: our apply filter can tell us whether there was a + // duplicate reproposal. (A reproposed increment isn't harmful, but + // some other commands could be) + if x := atomic.LoadInt64(&incApplyCount); x != 1 { + t.Fatalf("expected 1, got %d", x) + } +} + // TestGCWithoutThreshold validates that GCRequest only declares the threshold // keys which are subject to change, and that it does not access these keys if // it does not declare them.