diff --git a/pkg/storage/batcheval/cmd_lease_test.go b/pkg/storage/batcheval/cmd_lease_test.go new file mode 100644 index 000000000000..40bba5227319 --- /dev/null +++ b/pkg/storage/batcheval/cmd_lease_test.go @@ -0,0 +1,111 @@ +// Copyright 2019 The Cockroach Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. See the License for the specific language governing +// permissions and limitations under the License. See the AUTHORS file +// for names of contributors. + +package batcheval + +import ( + "context" + "testing" + "time" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" +) + +// TestLeaseTransferWithPipelinedWrite verifies that pipelined writes +// do not cause retry errors to be leaked to clients when the error +// can be handled internally. Pipelining dissociates a write from its +// caller, so the retries of internally-generated errors (specifically +// out-of-order lease indexes) must be retried below that level. +// +// This issue was observed in practice to affect the first insert +// after table creation with high probability. +func TestLeaseTransferWithPipelinedWrite(t *testing.T) { + defer leaktest.AfterTest(t)() + + ctx := context.Background() + + tc := serverutils.StartTestCluster(t, 3, base.TestClusterArgs{}) + defer tc.Stopper().Stop(ctx) + + db := tc.ServerConn(0) + + // More than 30 iterations is flaky under stressrace on teamcity. + for iter := 0; iter < 30; iter++ { + log.Infof(ctx, "iter %d", iter) + if _, err := db.ExecContext(ctx, "drop table if exists test"); err != nil { + t.Fatal(err) + } + if _, err := db.ExecContext(ctx, "create table test (a int, b int, primary key (a, b))"); err != nil { + t.Fatal(err) + } + + workerErrCh := make(chan error, 1) + go func() { + workerErrCh <- func() error { + for i := 0; i < 1; i++ { + tx, err := db.BeginTx(ctx, nil) + if err != nil { + return err + } + defer func() { + if tx != nil { + if err := tx.Rollback(); err != nil { + log.Warningf(ctx, "error rolling back: %s", err) + } + } + }() + // Run two inserts in a transaction to ensure that we have + // pipelined writes that cannot be retried at the SQL layer + // due to the first-statement rule. + if _, err := tx.ExecContext(ctx, "INSERT INTO test (a, b) VALUES ($1, $2)", i, 1); err != nil { + return err + } + if _, err := tx.ExecContext(ctx, "INSERT INTO test (a, b) VALUES ($1, $2)", i, 2); err != nil { + return err + } + if err := tx.Commit(); err != nil { + return err + } + tx = nil + } + return nil + }() + }() + + // TODO(bdarnell): This test reliably reproduced the issue when + // introduced, because table creation causes splits and repeated + // table creation leads to lease transfers due to rebalancing. + // This is a subtle thing to rely on and the test might become + // more reliable if we ran more iterations in the worker goroutine + // and added a second goroutine to explicitly transfer leases back + // and forth. + + select { + case <-time.After(15 * time.Second): + // TODO(bdarnell): The test seems flaky under stress with a 5s + // timeout. Why? I'm giving it a high timeout since hanging + // isn't a failure mode we're particularly concerned about here, + // but it shouldn't be taking this long even with stress. + t.Fatal("timed out") + case err := <-workerErrCh: + if err != nil { + t.Fatalf("worker failed: %s", err) + } + } + } +} diff --git a/pkg/storage/replica.go b/pkg/storage/replica.go index 991181ec014f..4cb52d9e12de 100644 --- a/pkg/storage/replica.go +++ b/pkg/storage/replica.go @@ -988,13 +988,11 @@ type endCmds struct { // done releases the latches acquired by the command and updates // the timestamp cache using the final timestamp of each command. -func (ec *endCmds) done( - br *roachpb.BatchResponse, pErr *roachpb.Error, retry proposalReevaluationReason, -) { +func (ec *endCmds) done(br *roachpb.BatchResponse, pErr *roachpb.Error) { // Update the timestamp cache if the request is not being re-evaluated. Each // request is considered in turn; only those marked as affecting the cache are // processed. Inconsistent reads are excluded. - if retry == proposalNoReevaluation && ec.ba.ReadConsistency == roachpb.CONSISTENT { + if ec.ba.ReadConsistency == roachpb.CONSISTENT { ec.repl.updateTimestampCache(&ec.ba, br, pErr) } diff --git a/pkg/storage/replica_proposal.go b/pkg/storage/replica_proposal.go index 77dc5c9349ee..e6e469a3016b 100644 --- a/pkg/storage/replica_proposal.go +++ b/pkg/storage/replica_proposal.go @@ -110,7 +110,7 @@ type ProposalData struct { // counted on to invoke endCmds itself.) func (proposal *ProposalData) finishApplication(pr proposalResult) { if proposal.endCmds != nil { - proposal.endCmds.done(pr.Reply, pr.Err, pr.ProposalRetry) + proposal.endCmds.done(pr.Reply, pr.Err) proposal.endCmds = nil } if proposal.sp != nil { @@ -808,14 +808,12 @@ func (r *Replica) handleEvalResultRaftMuLocked( } // proposalResult indicates the result of a proposal. Exactly one of -// Reply, Err and ProposalRetry is set, and it represents the result of -// the proposal. +// Reply and Err is set, and it represents the result of the proposal. type proposalResult struct { - Reply *roachpb.BatchResponse - Err *roachpb.Error - ProposalRetry proposalReevaluationReason - Intents []result.IntentsWithArg - EndTxns []result.EndTxnIntents + Reply *roachpb.BatchResponse + Err *roachpb.Error + Intents []result.IntentsWithArg + EndTxns []result.EndTxnIntents } // evaluateProposal generates a Result from the given request by diff --git a/pkg/storage/replica_raft.go b/pkg/storage/replica_raft.go index 21fc60ed6f0f..b54e59a0c09b 100644 --- a/pkg/storage/replica_raft.go +++ b/pkg/storage/replica_raft.go @@ -49,9 +49,7 @@ import ( // insertProposalLocked assigns a MaxLeaseIndex to a proposal and adds // it to the pending map. Returns the assigned MaxLeaseIndex, if any. -func (r *Replica) insertProposalLocked( - proposal *ProposalData, proposerReplica roachpb.ReplicaDescriptor, proposerLease roachpb.Lease, -) int64 { +func (r *Replica) insertProposalLocked(proposal *ProposalData) int64 { // Assign a lease index. Note that we do this as late as possible // to make sure (to the extent that we can) that we don't assign // (=predict) the index differently from the order in which commands are @@ -64,8 +62,10 @@ func (r *Replica) insertProposalLocked( r.mu.lastAssignedLeaseIndex++ } proposal.command.MaxLeaseIndex = r.mu.lastAssignedLeaseIndex - proposal.command.ProposerReplica = proposerReplica - proposal.command.ProposerLeaseSequence = proposerLease.Sequence + if proposal.command.ProposerReplica == (roachpb.ReplicaDescriptor{}) { + // 0 is a valid LeaseSequence value so we can't actually enforce it. + log.Fatalf(context.TODO(), "ProposerReplica and ProposerLeaseSequence must be filled in") + } if log.V(4) { log.Infof(proposal.ctx, "submitting proposal %x: maxLeaseIndex=%d", @@ -91,10 +91,12 @@ func makeIDKey() storagebase.CmdIDKey { return storagebase.CmdIDKey(idKeyBuf) } -// propose prepares the necessary pending command struct and initializes a +// evalAndPropose prepares the necessary pending command struct and initializes a // client command ID if one hasn't been. A verified lease is supplied // as a parameter if the command requires a lease; nil otherwise. It -// then proposes the command to Raft if necessary and returns +// then evaluates the command and proposes it to Raft on success. +// +// Return values: // - a channel which receives a response or error upon application // - a closure used to attempt to abandon the command. When called, it tries to // remove the pending command from the internal commands map. This is @@ -107,7 +109,7 @@ func makeIDKey() storagebase.CmdIDKey { // - the MaxLeaseIndex of the resulting proposal, if any. // - any error obtained during the creation or proposal of the command, in // which case the other returned values are zero. -func (r *Replica) propose( +func (r *Replica) evalAndPropose( ctx context.Context, lease roachpb.Lease, ba roachpb.BatchRequest, @@ -225,6 +227,18 @@ func (r *Replica) propose( return nil, nil, 0, roachpb.NewError(err) } + if filter := r.store.TestingKnobs().TestingProposalFilter; filter != nil { + filterArgs := storagebase.ProposalFilterArgs{ + Ctx: ctx, + Cmd: *proposal.command, + CmdID: idKey, + Req: ba, + } + if pErr := filter(filterArgs); pErr != nil { + return nil, nil, 0, pErr + } + } + // submitProposalLocked calls withRaftGroupLocked which requires that raftMu // is held, but only if Replica.mu.internalRaftGroup == nil. To avoid // locking Replica.raftMu in the common case where the raft group is @@ -251,54 +265,19 @@ func (r *Replica) propose( if r.mu.commandSizes != nil { r.mu.commandSizes[proposal.idKey] = proposalSize } - // Make sure we clean up the proposal if we fail to submit it successfully. - // This is important both to ensure that that the proposals map doesn't - // grow without bound and to ensure that we always release any quota that - // we acquire. - defer func() { - if pErr != nil { - r.cleanupFailedProposalLocked(proposal) - } - }() - - // NB: We need to check Replica.mu.destroyStatus again in case the Replica has - // been destroyed between the initial check at the beginning of this method - // and the acquisition of Replica.mu. Failure to do so will leave pending - // proposals that never get cleared. - if !r.mu.destroyStatus.IsAlive() { - return nil, nil, 0, roachpb.NewError(r.mu.destroyStatus.err) - } + // Record the proposer and lease sequence. repDesc, err := r.getReplicaDescriptorRLocked() if err != nil { return nil, nil, 0, roachpb.NewError(err) } - maxLeaseIndex := r.insertProposalLocked(proposal, repDesc, lease) - if maxLeaseIndex == 0 && !ba.IsLeaseRequest() { - log.Fatalf(ctx, "no MaxLeaseIndex returned for %s", ba) - } - - if filter := r.store.TestingKnobs().TestingProposalFilter; filter != nil { - filterArgs := storagebase.ProposalFilterArgs{ - Ctx: ctx, - Cmd: *proposal.command, - CmdID: idKey, - Req: ba, - } - if pErr := filter(filterArgs); pErr != nil { - return nil, nil, 0, pErr - } - } + proposal.command.ProposerReplica = repDesc + proposal.command.ProposerLeaseSequence = lease.Sequence - if err := r.submitProposalLocked(proposal); err == raft.ErrProposalDropped { - // Silently ignore dropped proposals (they were always silently ignored - // prior to the introduction of ErrProposalDropped). - // TODO(bdarnell): Handle ErrProposalDropped better. - // https://github.com/cockroachdb/cockroach/issues/21849 - } else if err != nil { - return nil, nil, 0, roachpb.NewError(err) + maxLeaseIndex, pErr := r.proposeLocked(ctx, proposal) + if pErr != nil { + return nil, nil, 0, pErr } - // Must not use `proposal` in the closure below as a proposal which is not // present in r.mu.proposals is no longer protected by the mutex. Abandoning // a command only abandons the associated context. As soon as we propose a @@ -321,6 +300,49 @@ func (r *Replica) propose( return proposalCh, tryAbandon, maxLeaseIndex, nil } +// proposeLocked starts tracking a command and proposes it to raft. If +// this method succeeds, the caller is responsible for eventually +// removing the proposal from the pending map (on success, in +// processRaftCommand, or on failure via cleanupFailedProposalLocked). +// +// This method requires that r.mu is held in all cases, and if +// r.mu.internalRaftGroup is nil, r.raftMu must also be held (note +// that lock ordering requires that raftMu be acquired first). +func (r *Replica) proposeLocked( + ctx context.Context, proposal *ProposalData, +) (_ int64, pErr *roachpb.Error) { + // Make sure we clean up the proposal if we fail to submit it successfully. + // This is important both to ensure that that the proposals map doesn't + // grow without bound and to ensure that we always release any quota that + // we acquire. + defer func() { + if pErr != nil { + r.cleanupFailedProposalLocked(proposal) + } + }() + + // NB: We need to check Replica.mu.destroyStatus again in case the Replica has + // been destroyed between the initial check at the beginning of this method + // and the acquisition of Replica.mu. Failure to do so will leave pending + // proposals that never get cleared. + if !r.mu.destroyStatus.IsAlive() { + return 0, roachpb.NewError(r.mu.destroyStatus.err) + } + + maxLeaseIndex := r.insertProposalLocked(proposal) + + if err := r.submitProposalLocked(proposal); err == raft.ErrProposalDropped { + // Silently ignore dropped proposals (they were always silently ignored + // prior to the introduction of ErrProposalDropped). + // TODO(bdarnell): Handle ErrProposalDropped better. + // https://github.com/cockroachdb/cockroach/issues/21849 + } else if err != nil { + return 0, roachpb.NewError(err) + } + + return maxLeaseIndex, nil +} + // submitProposalLocked proposes or re-proposes a command in r.mu.proposals. // The replica lock must be held. func (r *Replica) submitProposalLocked(p *ProposalData) error { @@ -988,7 +1010,6 @@ func (r *Replica) refreshProposalsLocked(refreshAtDelta int, reason refreshRaftR log.Fatalf(context.TODO(), "refreshAtDelta specified for reason %s != reasonTicks", reason) } - numShouldRetry := 0 var reproposals pendingCmdSlice for _, p := range r.mu.proposals { if p.command.MaxLeaseIndex == 0 { @@ -1002,75 +1023,43 @@ func (r *Replica) refreshProposalsLocked(refreshAtDelta int, reason refreshRaftR roachpb.NewAmbiguousResultError( fmt.Sprintf("unknown status for command without MaxLeaseIndex "+ "at refreshProposalsLocked time (refresh reason: %s)", reason)))}) - } else if cannotApplyAnyMore := !p.command.ReplicatedEvalResult.IsLeaseRequest && - p.command.MaxLeaseIndex <= r.mu.state.LeaseAppliedIndex; cannotApplyAnyMore { - // The command's designated lease index slot was filled up. We got to - // LeaseAppliedIndex and p is still pending in r.mu.proposals; generally - // this means that proposal p didn't commit, and it will be sent back to - // the proposer for a retry - the request needs to be re-evaluated and the - // command re-proposed with a new MaxLeaseIndex. Note that this branch is not - // taken for leases as their MaxLeaseIndex plays no role in deciding whether - // they can apply or not -- the sequence number of the previous lease matters. - // - // An exception is the case when we're refreshing because of - // reasonSnapshotApplied - in that case we don't know if p or some other - // command filled the p.command.MaxLeaseIndex slot (i.e. p might have been - // applied, but we're not watching for every proposal when applying a - // snapshot, so nobody removed p from r.mu.proposals). In this - // ambiguous case, we return an ambiguous error. - // - // NOTE: We used to perform a re-evaluation here in order to avoid the - // ambiguity, but it was cumbersome because the higher layer needed to be - // aware that if the re-evaluation failed in any way, it must rewrite the - // error to reflect the ambiguity. This mechanism also proved very hard to - // test and also is of questionable utility since snapshots are only - // applied in the first place if the leaseholder is divorced from the Raft - // leader. - // - // Note that we can't use the commit index here (which is typically a - // little ahead), because a pending command is removed only as it - // applies. Thus we'd risk reproposing a command that has been committed - // but not yet applied. - r.cleanupFailedProposalLocked(p) - log.Eventf(p.ctx, "retry proposal %x: %s", p.idKey, reason) - if reason == reasonSnapshotApplied { + continue + } + switch reason { + case reasonSnapshotApplied: + // If we applied a snapshot, check the MaxLeaseIndexes of all + // pending commands to see if any are now prevented from + // applying, and if so make them return an ambiguous error. We + // can't tell at this point (which should be rare) whether they + // were included in the snapshot we received or not. + if p.command.MaxLeaseIndex <= r.mu.state.LeaseAppliedIndex { + r.cleanupFailedProposalLocked(p) + log.Eventf(p.ctx, "retry proposal %x: %s", p.idKey, reason) p.finishApplication(proposalResult{Err: roachpb.NewError( roachpb.NewAmbiguousResultError( fmt.Sprintf("unable to determine whether command was applied via snapshot")))}) - } else { - p.finishApplication(proposalResult{ProposalRetry: proposalIllegalLeaseIndex}) } - numShouldRetry++ - } else if reason == reasonTicks && p.proposedAtTicks > r.mu.ticks-refreshAtDelta { - // The command was proposed too recently, don't bother reproprosing it - // yet. - } else { - // The proposal can still apply according to its MaxLeaseIndex. But it's - // likely that the proposal was dropped, so we're going to repropose it - // directly; it's cheaper than asking the proposer to re-evaluate and - // re-propose. Note that we don't need to worry about receiving ambiguous - // responses for this reproposal - we'll only be handling one response - // across the original proposal and the reproposal because we're - // reproposing with the same MaxLeaseIndex and the same idKey as the - // original (a possible second response for the same idKey would be - // ignored). - // - // This situation happens when we've proposed commands that never made it - // to the leader - usually because we didn't know who the leader is. When - // we do discover the leader we need to repropose the command. In local - // testing, by far the most common reason for these reproposals is the - // initial leader election after a split. Specifically, when a range is - // split when the next command is proposed to the RHS the leader is - // elected. After the election completes the command is reproposed for - // both reasonNewLeader and reasonNewLeaderOrConfigChange. + continue + + case reasonTicks: + if p.proposedAtTicks <= r.mu.ticks-refreshAtDelta { + // The command was proposed a while ago and may have been dropped. Try it again. + reproposals = append(reproposals, p) + } + + default: + // We have reason to believe that all pending proposals were + // dropped on the floor (e.g. because of a leader election), so + // repropose everything. reproposals = append(reproposals, p) } } - if log.V(1) && (numShouldRetry > 0 || len(reproposals) > 0) { + + if log.V(1) && len(reproposals) > 0 { ctx := r.AnnotateCtx(context.TODO()) log.Infof(ctx, - "pending commands: sent %d back to client, reproposing %d (at %d.%d) %s", - numShouldRetry, len(reproposals), r.mu.state.RaftAppliedIndex, + "pending commands: reproposing %d (at %d.%d) %s", + len(reproposals), r.mu.state.RaftAppliedIndex, r.mu.state.LeaseAppliedIndex, reason) } @@ -1437,7 +1426,7 @@ func (r *Replica) checkForcedErrLocked( if proposedLocally { log.VEventf( ctx, 1, - "retry proposal %x: applied at lease index %d, required <= %d", + "retry proposal %x: applied at lease index %d, required < %d", proposal.idKey, leaseIndex, raftCmd.MaxLeaseIndex, ) retry = proposalIllegalLeaseIndex @@ -1765,7 +1754,6 @@ func (r *Replica) processRaftCommand( if proposedLocally { // We initiated this command, so use the caller-supplied context. ctx = proposal.ctx - proposal.ctx = nil // avoid confusion delete(r.mu.proposals, idKey) } @@ -1975,11 +1963,8 @@ func (r *Replica) processRaftCommand( var lResult *result.LocalResult if proposedLocally { - if proposalRetry != proposalNoReevaluation { - response.ProposalRetry = proposalRetry - if pErr == nil { - log.Fatalf(ctx, "proposal with nontrivial retry behavior, but no error: %+v", proposal) - } + if proposalRetry != proposalNoReevaluation && pErr == nil { + log.Fatalf(ctx, "proposal with nontrivial retry behavior, but no error: %+v", proposal) } if pErr != nil { // A forced error was set (i.e. we did not apply the proposal, @@ -2060,6 +2045,14 @@ func (r *Replica) processRaftCommand( } if proposedLocally { + // If we failed to apply at the right lease index, try again with + // a new one. This is important for pipelined writes, since they + // don't have a client watching to retry, so a failure to + // eventually apply the proposal would be a user-visible error. + if proposalRetry == proposalIllegalLeaseIndex && r.tryReproposeWithNewLeaseIndex(proposal) { + return false + } + // Otherwise, signal the command's status to the client. proposal.finishApplication(response) } else if response.Err != nil { log.VEventf(ctx, 1, "applying raft command resulted in error: %s", response.Err) @@ -2068,6 +2061,49 @@ func (r *Replica) processRaftCommand( return raftCmd.ReplicatedEvalResult.ChangeReplicas != nil } +// tryReproposeWithNewLeaseIndex is used by processRaftCommand to +// repropose commands that have gotten an illegal lease index error, +// and that we know could not have applied while their lease index was +// valid (that is, we observed all applied entries between proposal +// and the lease index becoming invalid, as opposed to skipping some +// of them by applying a snapshot). +// +// It is not intended for use elsewhere and is only a top-level +// function so that it can avoid the below_raft_protos check. Returns +// true if the command has been successfully reproposed (not +// necessarily by this method! But if this method returns true, the +// command will be in the local proposals map). +func (r *Replica) tryReproposeWithNewLeaseIndex(proposal *ProposalData) bool { + r.mu.Lock() + defer r.mu.Unlock() + // Note that we don't need to validate anything about the proposal's + // lease here - if we got this far, we know that everything but the + // index is valid at this point in the log. + if proposal.command.MaxLeaseIndex > r.mu.state.LeaseAppliedIndex { + // If the command's MaxLeaseIndex is greater than the + // LeaseAppliedIndex, it must have already been reproposed (this + // can happen if there are multiple copies of the command in the + // logs; see TestReplicaRefreshMultiple). We must not create + // multiple copies with multiple lease indexes, so don't repropose + // it again. + // + // Note that the caller has already removed the current version of + // the proposal from the pending proposals map. We must re-add it + // since it's still pending. + log.VEventf(proposal.ctx, 2, "skipping reproposal, already reproposed at index %d", + proposal.command.MaxLeaseIndex) + r.mu.proposals[proposal.idKey] = proposal + return true + } + // Some tests check for this log message in the trace. + log.VEventf(proposal.ctx, 2, "retry: proposalIllegalLeaseIndex") + if _, pErr := r.proposeLocked(proposal.ctx, proposal); pErr != nil { + log.Warningf(proposal.ctx, "failed to repropose with new lease index: %s", pErr) + return false + } + return true +} + // maybeAcquireSnapshotMergeLock checks whether the incoming snapshot subsumes // any replicas and, if so, locks them for subsumption. See acquireMergeLock // for details about the lock itself. diff --git a/pkg/storage/replica_read.go b/pkg/storage/replica_read.go index ef0f6f8bcb91..d10a0cfd8f01 100644 --- a/pkg/storage/replica_read.go +++ b/pkg/storage/replica_read.go @@ -68,7 +68,7 @@ func (r *Replica) executeReadOnlyBatch( // timestamp cache update is synchronized. This is wrapped to delay // pErr evaluation to its value when returning. defer func() { - endCmds.done(br, pErr, proposalNoReevaluation) + endCmds.done(br, pErr) }() // TODO(nvanbenschoten): Can this be moved into Replica.requestCanProceed? diff --git a/pkg/storage/replica_test.go b/pkg/storage/replica_test.go index 3f8f5ca4f5db..8b24a75f6fb9 100644 --- a/pkg/storage/replica_test.go +++ b/pkg/storage/replica_test.go @@ -501,7 +501,7 @@ func sendLeaseRequest(r *Replica, l *roachpb.Lease) error { ba.Timestamp = r.store.Clock().Now() ba.Add(&roachpb.RequestLeaseRequest{Lease: *l}) exLease, _ := r.GetLease() - ch, _, _, pErr := r.propose(context.TODO(), exLease, ba, nil, &allSpans) + ch, _, _, pErr := r.evalAndPropose(context.TODO(), exLease, ba, nil, &allSpans) if pErr == nil { // Next if the command was committed, wait for the range to apply it. // TODO(bdarnell): refactor this to a more conventional error-handling pattern. @@ -1277,7 +1277,7 @@ func TestReplicaLeaseRejectUnknownRaftNodeID(t *testing.T) { ba := roachpb.BatchRequest{} ba.Timestamp = tc.repl.store.Clock().Now() ba.Add(&roachpb.RequestLeaseRequest{Lease: *lease}) - ch, _, _, pErr := tc.repl.propose(context.Background(), exLease, ba, nil, &allSpans) + ch, _, _, pErr := tc.repl.evalAndPropose(context.Background(), exLease, ba, nil, &allSpans) if pErr == nil { // Next if the command was committed, wait for the range to apply it. // TODO(bdarnell): refactor to a more conventional error-handling pattern. @@ -6231,10 +6231,12 @@ func TestProposalOverhead(t *testing.T) { t.Fatal(pErr) } - // NB: the expected overhead reflects the space overhead currently present in - // Raft commands. This test will fail if that overhead changes. Try to make - // this number go down and not up. - const expectedOverhead = 52 + // NB: the expected overhead reflects the space overhead currently + // present in Raft commands. This test will fail if that overhead + // changes. Try to make this number go down and not up. It slightly + // undercounts because our proposal filter is called before + // maxLeaseIndex and a few other fields are filled in. + const expectedOverhead = 48 if v := atomic.LoadUint32(&overhead); expectedOverhead != v { t.Fatalf("expected overhead of %d, but found %d", expectedOverhead, v) } @@ -7066,7 +7068,7 @@ func TestReplicaIDChangePending(t *testing.T) { }, Value: roachpb.MakeValueFromBytes([]byte("val")), }) - _, _, _, err := repl.propose(context.Background(), lease, ba, nil, &allSpans) + _, _, _, err := repl.evalAndPropose(context.Background(), lease, ba, nil, &allSpans) if err != nil { t.Fatal(err) } @@ -7192,33 +7194,17 @@ func TestReplicaRetryRaftProposal(t *testing.T) { iArg := incrementArgs(roachpb.Key("b"), expInc) ba.Add(&iArg) { - br, pErr, retry := tc.repl.tryExecuteWriteBatch( - context.WithValue(ctx, magicKey{}, "foo"), - ba, - ) - if retry != proposalIllegalLeaseIndex { - t.Fatalf("expected retry from illegal lease index, but got (%v, %v, %d)", br, pErr, retry) - } - if exp, act := int32(1), atomic.LoadInt32(&c); exp != act { - t.Fatalf("expected %d proposals, got %d", exp, act) - } - } - - atomic.StoreInt32(&c, 0) - { - br, pErr := tc.repl.executeWriteBatch( + _, pErr := tc.repl.executeWriteBatch( context.WithValue(ctx, magicKey{}, "foo"), ba, ) if pErr != nil { - t.Fatal(pErr) + t.Fatalf("write batch returned error: %s", pErr) } + // The command was reproposed internally, for two total proposals. if exp, act := int32(2), atomic.LoadInt32(&c); exp != act { t.Fatalf("expected %d proposals, got %d", exp, act) } - if resp, ok := br.Responses[0].GetInner().(*roachpb.IncrementResponse); !ok || resp.NewValue != expInc { - t.Fatalf("expected new value %d, got (%t, %+v)", expInc, ok, resp) - } } // Test LeaseRequest since it's special: MaxLeaseIndex plays no role and so @@ -7287,7 +7273,9 @@ func TestReplicaCancelRaftCommandProgress(t *testing.T) { } repl.mu.Lock() - repl.insertProposalLocked(proposal, repDesc, lease) + proposal.command.ProposerReplica = repDesc + proposal.command.ProposerLeaseSequence = lease.Sequence + repl.insertProposalLocked(proposal) // We actually propose the command only if we don't // cancel it to simulate the case in which Raft loses // the command and it isn't reproposed due to the @@ -7365,7 +7353,9 @@ func TestReplicaBurstPendingCommandsAndRepropose(t *testing.T) { tc.repl.raftMu.Lock() tc.repl.mu.Lock() - tc.repl.insertProposalLocked(cmd, repDesc, status.Lease) + cmd.command.ProposerReplica = repDesc + cmd.command.ProposerLeaseSequence = status.Lease.Sequence + tc.repl.insertProposalLocked(cmd) chs = append(chs, cmd.doneCh) tc.repl.mu.Unlock() tc.repl.raftMu.Unlock() @@ -7471,7 +7461,9 @@ func TestReplicaLeaseReproposal(t *testing.T) { // Decrement the MaxLeaseIndex. If refreshProposalsLocked didn't know that // MaxLeaseIndex doesn't matter for lease requests, it might be tempted to // end the proposal early (since it would assume that it couldn't commit). - repl.insertProposalLocked(proposal, repDesc, lease) + proposal.command.ProposerReplica = repDesc + proposal.command.ProposerLeaseSequence = lease.Sequence + repl.insertProposalLocked(proposal) proposal.command.MaxLeaseIndex = ai - 1 repl.refreshProposalsLocked(1 /* delta */, reasonTicks) select { @@ -7562,7 +7554,9 @@ func TestReplicaRefreshPendingCommandsTicks(t *testing.T) { dropProposals.Unlock() r.mu.Lock() - r.insertProposalLocked(cmd, repDesc, lease) + cmd.command.ProposerReplica = repDesc + cmd.command.ProposerLeaseSequence = lease.Sequence + r.insertProposalLocked(cmd) if err := r.submitProposalLocked(cmd); err != nil { t.Error(err) } @@ -7607,6 +7601,149 @@ func TestReplicaRefreshPendingCommandsTicks(t *testing.T) { } } +// TestReplicaRefreshMultiple tests an interaction between refreshing +// proposals after a new leader or ticks (which results in multiple +// copies in the log with the same lease index) and refreshing after +// an illegal lease index error (with a new lease index assigned). +// +// The setup here is rather artificial, but it represents something +// that can happen (very rarely) in the real world with multiple raft +// leadership transfers. +func TestReplicaRefreshMultiple(t *testing.T) { + defer leaktest.AfterTest(t)() + + ctx := context.Background() + + var filterActive int32 + var incCmdID storagebase.CmdIDKey + var incApplyCount int64 + tsc := TestStoreConfig(nil) + tsc.TestingKnobs.TestingApplyFilter = func(filterArgs storagebase.ApplyFilterArgs) (int, *roachpb.Error) { + if atomic.LoadInt32(&filterActive) != 0 && filterArgs.CmdID == incCmdID { + atomic.AddInt64(&incApplyCount, 1) + } + return 0, nil + } + var tc testContext + stopper := stop.NewStopper() + defer stopper.Stop(ctx) + tc.StartWithStoreConfig(t, stopper, tsc) + repl := tc.repl + + repDesc, err := repl.GetReplicaDescriptor() + if err != nil { + t.Fatal(err) + } + + key := roachpb.Key("a") + + // Run a few commands first: This advances the lease index, which is + // necessary for the tricks we're going to play to induce failures + // (we need to be able to subtract from the current lease index + // without going below 0). + for i := 0; i < 3; i++ { + inc := incrementArgs(key, 1) + if _, pErr := client.SendWrapped(ctx, tc.Sender(), &inc); pErr != nil { + t.Fatal(pErr) + } + } + // Sanity check the resulting value. + get := getArgs(key) + if resp, pErr := client.SendWrapped(ctx, tc.Sender(), &get); pErr != nil { + t.Fatal(pErr) + } else if x, err := resp.(*roachpb.GetResponse).Value.GetInt(); err != nil { + t.Fatalf("returned non-int: %s", err) + } else if x != 3 { + t.Fatalf("expected 3, got %d", x) + } + + // Manually propose another increment. This is the one we'll + // manipulate into failing. (the use of increment here is not + // significant. I originally wrote it this way because I thought the + // non-idempotence of increment would make it easier to test, but + // since the reproposals we're concerned with don't result in + // reevaluation it doesn't matter) + inc := incrementArgs(key, 1) + var ba roachpb.BatchRequest + ba.Add(&inc) + ba.Timestamp = tc.Clock().Now() + + incCmdID = makeIDKey() + atomic.StoreInt32(&filterActive, 1) + proposal, pErr := repl.requestToProposal(ctx, incCmdID, ba, nil, &allSpans) + if pErr != nil { + t.Fatal(pErr) + } + // Save this channel; it may get reset to nil before we read from it. + proposalDoneCh := proposal.doneCh + + // Propose the command manually with errors induced. + func() { + repl.mu.Lock() + defer repl.mu.Unlock() + ai := repl.mu.state.LeaseAppliedIndex + if ai <= 1 { + // Lease index zero is special in this test because we subtract + // from it below, so we need enough previous proposals in the + // log to ensure it doesn't go negative. + t.Fatalf("test requires LeaseAppliedIndex >= 2 at this point, have %d", ai) + } + // Manually run parts of the proposal process. Insert it, then + // tweak the lease index to ensure it will generate a retry when + // it fails. Don't call submitProposal (to simulate a dropped + // message). Then call refreshProposals twice to repropose it and + // put it in the logs twice. (submit + refresh should be + // equivalent to the double refresh used here) + // + // Note that all of this is under the same lock so there's no + // chance of it applying and exiting the pending state before we + // refresh. + proposal.command.ProposerReplica = repDesc + proposal.command.ProposerLeaseSequence = repl.mu.state.Lease.Sequence + repl.insertProposalLocked(proposal) + proposal.command.MaxLeaseIndex = ai - 1 + repl.refreshProposalsLocked(0, reasonNewLeader) + repl.refreshProposalsLocked(0, reasonNewLeader) + }() + + // Wait for our proposal to apply. The two refreshed proposals above + // will fail due to their illegal lease index. Then they'll generate + // a reproposal (in the bug that we're testing against, they'd + // *each* generate a reproposal). When this reproposal succeeds, the + // doneCh is signaled. + select { + case resp := <-proposalDoneCh: + if resp.Err != nil { + t.Fatal(resp.Err) + } + case <-time.After(5 * time.Second): + t.Fatal("timed out") + } + // In the buggy case, there's a second reproposal that we don't have + // a good way to observe, so just sleep to let it apply if it's in + // the system. + time.Sleep(10 * time.Millisecond) + + // The command applied exactly once. Note that this check would pass + // even in the buggy case, since illegal lease index proposals do + // not generate reevaluations (and increment is handled upstream of + // raft). + if resp, pErr := client.SendWrapped(ctx, tc.Sender(), &get); pErr != nil { + t.Fatal(pErr) + } else if x, err := resp.(*roachpb.GetResponse).Value.GetInt(); err != nil { + t.Fatalf("returned non-int: %s", err) + } else if x != 4 { + t.Fatalf("expected 4, got %d", x) + } + + // The real test: our apply filter can tell us whether there was a + // duplicate reproposal. (A reproposed increment isn't harmful, but + // some other commands could be) + if x := atomic.LoadInt64(&incApplyCount); x != 1 { + t.Fatalf("expected 1, got %d", x) + } +} + // TestGCWithoutThreshold validates that GCRequest only declares the threshold // keys which are subject to change, and that it does not access these keys if // it does not declare them. @@ -8524,7 +8661,7 @@ func TestErrorInRaftApplicationClearsIntents(t *testing.T) { } exLease, _ := repl.GetLease() - ch, _, _, pErr := repl.propose( + ch, _, _, pErr := repl.evalAndPropose( context.Background(), exLease, ba, nil /* endCmds */, &allSpans, ) if pErr != nil { @@ -8571,7 +8708,7 @@ func TestProposeWithAsyncConsensus(t *testing.T) { atomic.StoreInt32(&filterActive, 1) exLease, _ := repl.GetLease() - ch, _, _, pErr := repl.propose( + ch, _, _, pErr := repl.evalAndPropose( context.Background(), exLease, ba, nil /* endCmds */, &allSpans, ) if pErr != nil { @@ -8636,7 +8773,7 @@ func TestApplyPaginatedCommittedEntries(t *testing.T) { atomic.StoreInt32(&filterActive, 1) exLease, _ := repl.GetLease() - _, _, _, pErr := repl.propose(ctx, exLease, ba, nil /* endCmds */, &allSpans) + _, _, _, pErr := repl.evalAndPropose(ctx, exLease, ba, nil /* endCmds */, &allSpans) if pErr != nil { t.Fatal(pErr) } @@ -8654,7 +8791,7 @@ func TestApplyPaginatedCommittedEntries(t *testing.T) { ba2.Timestamp = tc.Clock().Now() var pErr *roachpb.Error - ch, _, _, pErr = repl.propose(ctx, exLease, ba, nil /* endCmds */, &allSpans) + ch, _, _, pErr = repl.evalAndPropose(ctx, exLease, ba, nil /* endCmds */, &allSpans) if pErr != nil { t.Fatal(pErr) } diff --git a/pkg/storage/replica_write.go b/pkg/storage/replica_write.go index 82c6be7c2d5c..32e293dae04e 100644 --- a/pkg/storage/replica_write.go +++ b/pkg/storage/replica_write.go @@ -39,31 +39,6 @@ import ( // alternative, submitting requests to Raft one after another, paying massive // latency, is only taken for commands whose effects may overlap. // -// Internally, multiple iterations of the above process may take place -// due to the Raft proposal failing retryably, possibly due to proposal -// reordering or re-proposals. We call these retry "re-evaluations" since the -// request is evaluated again (against a fresh engine snapshot). -func (r *Replica) executeWriteBatch( - ctx context.Context, ba roachpb.BatchRequest, -) (*roachpb.BatchResponse, *roachpb.Error) { - for { - // TODO(andrei): export some metric about re-evaluations. - br, pErr, retry := r.tryExecuteWriteBatch(ctx, ba) - if retry == proposalIllegalLeaseIndex { - log.VEventf(ctx, 2, "retry: proposalIllegalLeaseIndex") - if pErr != nil { - log.Fatalf(ctx, "both error and retry returned: %s", pErr) - } - continue // retry - } - return br, pErr - } -} - -// tryExecuteWriteBatch is invoked by executeWriteBatch, which will -// call this method until it returns a non-retryable result (i.e. no -// proposalRetryReason is returned). -// // Concretely, // // - Latches for the keys affected by the command are acquired (i.e. @@ -87,18 +62,18 @@ func (r *Replica) executeWriteBatch( // NB: changing BatchRequest to a pointer here would have to be done cautiously // as this method makes the assumption that it operates on a shallow copy (see // call to applyTimestampCache). -func (r *Replica) tryExecuteWriteBatch( +func (r *Replica) executeWriteBatch( ctx context.Context, ba roachpb.BatchRequest, -) (br *roachpb.BatchResponse, pErr *roachpb.Error, retry proposalReevaluationReason) { +) (br *roachpb.BatchResponse, pErr *roachpb.Error) { startTime := timeutil.Now() if err := r.maybeBackpressureWriteBatch(ctx, ba); err != nil { - return nil, roachpb.NewError(err), proposalNoReevaluation + return nil, roachpb.NewError(err) } spans, err := r.collectSpans(&ba) if err != nil { - return nil, roachpb.NewError(err), proposalNoReevaluation + return nil, roachpb.NewError(err) } var endCmds *endCmds @@ -111,7 +86,7 @@ func (r *Replica) tryExecuteWriteBatch( var err error endCmds, err = r.beginCmds(ctx, &ba, spans) if err != nil { - return nil, roachpb.NewError(err), proposalNoReevaluation + return nil, roachpb.NewError(err) } } @@ -119,7 +94,7 @@ func (r *Replica) tryExecuteWriteBatch( // wrapped to delay pErr evaluation to its value when returning. defer func() { if endCmds != nil { - endCmds.done(br, pErr, retry) + endCmds.done(br, pErr) } }() @@ -132,7 +107,7 @@ func (r *Replica) tryExecuteWriteBatch( // Other write commands require that this replica has the range // lease. if status, pErr = r.redirectOnOrAcquireLease(ctx); pErr != nil { - return nil, pErr, proposalNoReevaluation + return nil, pErr } lease = status.Lease } @@ -146,7 +121,7 @@ func (r *Replica) tryExecuteWriteBatch( // forward. Or, in the case of a transactional write, the txn // timestamp and possible write-too-old bool. if bumped, pErr := r.applyTimestampCache(ctx, &ba, minTS); pErr != nil { - return nil, pErr, proposalNoReevaluation + return nil, pErr } else if bumped { // If we bump the transaction's timestamp, we must absolutely // tell the client in a response transaction (for otherwise it @@ -168,7 +143,7 @@ func (r *Replica) tryExecuteWriteBatch( log.Event(ctx, "applied timestamp cache") - ch, tryAbandon, maxLeaseIndex, pErr := r.propose(ctx, lease, ba, endCmds, spans) + ch, tryAbandon, maxLeaseIndex, pErr := r.evalAndPropose(ctx, lease, ba, endCmds, spans) if pErr != nil { if maxLeaseIndex != 0 { log.Fatalf( @@ -176,7 +151,7 @@ func (r *Replica) tryExecuteWriteBatch( maxLeaseIndex, ba, pErr, ) } - return nil, pErr, proposalNoReevaluation + return nil, pErr } // A max lease index of zero is returned when no proposal was made or a lease was proposed. // In both cases, we don't need to communicate a MLAI. @@ -218,7 +193,7 @@ func (r *Replica) tryExecuteWriteBatch( log.Warning(ctx, err) } } - return propResult.Reply, propResult.Err, propResult.ProposalRetry + return propResult.Reply, propResult.Err case <-slowTimer.C: slowTimer.Read = true log.Warningf(ctx, `have been waiting %.2fs for proposing command %s. @@ -242,11 +217,10 @@ and the following Raft status: %+v`, r.store.metrics.SlowRaftRequests.Dec(1) log.Infof( ctx, - "slow command %s finished after %.2fs with error %v, retry %d", + "slow command %s finished after %.2fs with error %v", ba, timeutil.Since(tBegin).Seconds(), pErr, - retry, ) }() @@ -259,7 +233,7 @@ and the following Raft status: %+v`, if tryAbandon() { log.VEventf(ctx, 2, "context cancellation after %0.1fs of attempting command %s", timeutil.Since(startTime).Seconds(), ba) - return nil, roachpb.NewError(roachpb.NewAmbiguousResultError(ctx.Err().Error())), proposalNoReevaluation + return nil, roachpb.NewError(roachpb.NewAmbiguousResultError(ctx.Err().Error())) } ctxDone = nil case <-shouldQuiesce: @@ -272,7 +246,7 @@ and the following Raft status: %+v`, if tryAbandon() { log.VEventf(ctx, 2, "shutdown cancellation after %0.1fs of attempting command %s", timeutil.Since(startTime).Seconds(), ba) - return nil, roachpb.NewError(roachpb.NewAmbiguousResultError("server shutdown")), proposalNoReevaluation + return nil, roachpb.NewError(roachpb.NewAmbiguousResultError("server shutdown")) } shouldQuiesce = nil } diff --git a/pkg/storage/store_test.go b/pkg/storage/store_test.go index 56740e7b0f0f..90fb1e4ef0c8 100644 --- a/pkg/storage/store_test.go +++ b/pkg/storage/store_test.go @@ -726,7 +726,7 @@ func TestStoreRemoveReplicaDestroy(t *testing.T) { t.Fatal("replica was not marked as destroyed") } - if _, _, _, pErr := repl1.propose( + if _, _, _, pErr := repl1.evalAndPropose( context.Background(), lease, roachpb.BatchRequest{}, nil, &allSpans, ); !pErr.Equal(expErr) { t.Fatalf("expected error %s, but got %v", expErr, pErr) diff --git a/pkg/storage/track_raft_protos.go b/pkg/storage/track_raft_protos.go index eb7cedd812ae..4fa806cba371 100644 --- a/pkg/storage/track_raft_protos.go +++ b/pkg/storage/track_raft_protos.go @@ -52,6 +52,9 @@ func TrackRaftProtos() func() []reflect.Type { // but tombstones are unreplicated and thus not subject to the strict // consistency requirements. funcName((*Replica).setTombstoneKey), + // tryReproposeWithNewLeaseIndex is only run on the replica that + // proposed the command. + funcName((*Replica).tryReproposeWithNewLeaseIndex), } belowRaftProtos := struct {