diff --git a/pkg/kv/kvnemesis/BUILD.bazel b/pkg/kv/kvnemesis/BUILD.bazel index 914d7535a83e..3dbf70de10ef 100644 --- a/pkg/kv/kvnemesis/BUILD.bazel +++ b/pkg/kv/kvnemesis/BUILD.bazel @@ -19,6 +19,7 @@ go_library( "//pkg/keys", "//pkg/kv", "//pkg/kv/kvclient/kvcoord", + "//pkg/kv/kvserver", "//pkg/roachpb", "//pkg/storage", "//pkg/util/bufalloc", diff --git a/pkg/kv/kvnemesis/validator.go b/pkg/kv/kvnemesis/validator.go index c45f963f42e2..0407c7b9971b 100644 --- a/pkg/kv/kvnemesis/validator.go +++ b/pkg/kv/kvnemesis/validator.go @@ -17,6 +17,7 @@ import ( "sort" "strings" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/util/hlc" @@ -328,39 +329,14 @@ func (v *validator) processOp(txnID *string, op Operation) { v.failIfError(op, t.Result) } case *ChangeReplicasOperation: - if resultIsError(t.Result, `unable to add replica .* which is already present in`) { - // Generator created this operations based on data about a range's - // replicas that is now stale (because it raced with some other operation - // created by that Generator): a replica is being added and in the - // meantime, some other operation added the same replica. - } else if resultIsError(t.Result, `unable to add replica .* which is already present as a learner`) { - // Generator created this operations based on data about a range's - // replicas that is now stale (because it raced with some other operation - // created by that Generator): a replica is being added and in the - // meantime, some other operation started (but did not finish) adding the - // same replica. - } else if resultIsError(t.Result, `descriptor changed`) { - // Race between two operations being executed concurrently. Applier grabs - // a range descriptor and then calls AdminChangeReplicas with it, but the - // descriptor is changed by some other operation in between. - } else if resultIsError(t.Result, `received invalid ChangeReplicasTrigger .* to remove self \(leaseholder\)`) { - // Removing the leaseholder is invalid for technical reasons, but - // Generator intentiontally does not try to avoid this so that this edge - // case is exercised. - } else if resultIsError(t.Result, `removing .* which is not in`) { - // Generator created this operations based on data about a range's - // replicas that is now stale (because it raced with some other operation - // created by that Generator): a replica is being removed and in the - // meantime, some other operation removed the same replica. - } else if resultIsError(t.Result, `remote failed to apply snapshot for reason failed to apply snapshot: raft group deleted`) { - // Probably should be transparently retried. - } else if resultIsError(t.Result, `remote failed to apply snapshot for reason failed to apply snapshot: .* cannot add placeholder, have an existing placeholder`) { - // Probably should be transparently retried. - } else if resultIsError(t.Result, `cannot apply snapshot: snapshot intersects existing range`) { - // Probably should be transparently retried. - } else if resultIsError(t.Result, `snapshot of type LEARNER was sent to .* which did not contain it as a replica`) { - // Probably should be transparently retried. 
- } else { + var ignore bool + if t.Result.Type == ResultType_Error { + ctx := context.Background() + err := errors.DecodeError(ctx, *t.Result.Err) + ignore = kvserver.IsRetriableReplicationChangeError(err) || + kvserver.IsIllegalReplicationChangeError(err) + } + if !ignore { v.failIfError(op, t.Result) } case *BatchOperation: diff --git a/pkg/kv/kvserver/BUILD.bazel b/pkg/kv/kvserver/BUILD.bazel index ebd5948a781c..91765e07886c 100644 --- a/pkg/kv/kvserver/BUILD.bazel +++ b/pkg/kv/kvserver/BUILD.bazel @@ -14,6 +14,7 @@ go_library( "gc_queue.go", "lease_history.go", "log.go", + "markers.go", "merge_queue.go", "metrics.go", "queue.go", diff --git a/pkg/kv/kvserver/client_merge_test.go b/pkg/kv/kvserver/client_merge_test.go index 04cac5d84f0c..77a86a6d58f4 100644 --- a/pkg/kv/kvserver/client_merge_test.go +++ b/pkg/kv/kvserver/client_merge_test.go @@ -1983,7 +1983,7 @@ func TestStoreRangeMergeAddReplicaRace(t *testing.T) { const acceptableMergeErr = `unexpected value: raw_bytes|ranges not collocated` + `|cannot merge range with non-voter replicas` - if mergeErr == nil && testutils.IsError(addErr, `descriptor changed: \[expected\]`) { + if mergeErr == nil && kvserver.IsRetriableReplicationChangeError(addErr) { // Merge won the race, no add happened. require.Len(t, afterDesc.Replicas().Voters(), 1) require.Equal(t, origDesc.EndKey, afterDesc.EndKey) @@ -2030,7 +2030,7 @@ func TestStoreRangeMergeResplitAddReplicaRace(t *testing.T) { _, err := tc.Server(0).DB().AdminChangeReplicas( ctx, scratchStartKey, origDesc, roachpb.MakeReplicationChanges(roachpb.ADD_VOTER, tc.Target(1))) - if !testutils.IsError(err, `descriptor changed`) { + if !kvserver.IsRetriableReplicationChangeError(err) { t.Fatalf(`expected "descriptor changed" error got: %+v`, err) } } diff --git a/pkg/kv/kvserver/client_raft_test.go b/pkg/kv/kvserver/client_raft_test.go index ff12b7010f23..8b8a6cc0b2de 100644 --- a/pkg/kv/kvserver/client_raft_test.go +++ b/pkg/kv/kvserver/client_raft_test.go @@ -1998,10 +1998,11 @@ func runReplicateRestartAfterTruncation(t *testing.T, removeBeforeTruncateAndReA func testReplicaAddRemove(t *testing.T, addFirst bool) { sc := kvserver.TestStoreConfig(nil) - // We're gonna want to validate the state of the store before and after the - // replica GC queue does its work, so we disable the replica gc queue here - // and run it manually when we're ready. + // We're gonna want to validate the state of the store before and + // after the replica GC queue does its work, so we disable the + // replica gc queue here and run it manually when we're ready. sc.TestingKnobs.DisableReplicaGCQueue = true + sc.TestingKnobs.DisableReplicateQueue = true sc.TestingKnobs.DisableEagerReplicaRemoval = true sc.Clock = nil // manual clock mtc := &multiTestContext{ @@ -2852,7 +2853,7 @@ func TestRemovePlaceholderRace(t *testing.T) { StoreID: mtc.stores[1].Ident.StoreID, }) if _, err := repl.ChangeReplicas(ctx, repl.Desc(), kvserver.SnapshotRequest_REBALANCE, kvserverpb.ReasonUnknown, "", chgs); err != nil { - if kvserver.IsSnapshotError(err) { + if kvserver.IsRetriableReplicationChangeError(err) { continue } else { t.Fatal(err) @@ -4786,6 +4787,7 @@ func TestProcessSplitAfterRightHandSideHasBeenRemoved(t *testing.T) { // Newly-started stores (including the "rogue" one) should not GC // their replicas. We'll turn this back on when needed. 
sc.TestingKnobs.DisableReplicaGCQueue = true + sc.TestingKnobs.DisableReplicateQueue = true sc.RaftDelaySplitToSuppressSnapshotTicks = 0 // Make the tick interval short so we don't need to wait too long for the // partitioned leader to time out. Also make the @@ -4959,8 +4961,7 @@ func TestProcessSplitAfterRightHandSideHasBeenRemoved(t *testing.T) { // and will be rolled back. Nevertheless it will have learned that it // has been removed at the old replica ID. err = changeReplicas(t, db, roachpb.ADD_VOTER, keyB, 0) - require.True(t, - testutils.IsError(err, "snapshot failed.*cannot apply snapshot: snapshot intersects"), err) + require.True(t, kvserver.IsRetriableReplicationChangeError(err), err) // Without a partitioned RHS we'll end up always writing a tombstone here because // the RHS will be created at the initial replica ID because it will get @@ -5008,8 +5009,7 @@ func TestProcessSplitAfterRightHandSideHasBeenRemoved(t *testing.T) { // and will be rolled back. Nevertheless it will have learned that it // has been removed at the old replica ID. err = changeReplicas(t, db, roachpb.ADD_VOTER, keyB, 0) - require.True(t, - testutils.IsError(err, "snapshot failed.*cannot apply snapshot: snapshot intersects"), err) + require.True(t, kvserver.IsRetriableReplicationChangeError(err), err) // Without a partitioned RHS we'll end up always writing a tombstone here because // the RHS will be created at the initial replica ID because it will get @@ -5078,8 +5078,7 @@ func TestProcessSplitAfterRightHandSideHasBeenRemoved(t *testing.T) { // and will be rolled back. Nevertheless it will have learned that it // has been removed at the old replica ID. err = changeReplicas(t, db, roachpb.ADD_VOTER, keyB, 0) - require.True(t, - testutils.IsError(err, "snapshot failed.*cannot apply snapshot: snapshot intersects"), err) + require.True(t, kvserver.IsRetriableReplicationChangeError(err), err) // Ensure that the replica exists with the higher replica ID. repl, err := mtc.Store(0).GetReplica(rhsInfo.Desc.RangeID) require.NoError(t, err) @@ -5135,8 +5134,7 @@ func TestProcessSplitAfterRightHandSideHasBeenRemoved(t *testing.T) { // and will be rolled back. Nevertheless it will have learned that it // has been removed at the old replica ID. err = changeReplicas(t, db, roachpb.ADD_VOTER, keyB, 0) - require.True(t, - testutils.IsError(err, "snapshot failed.*cannot apply snapshot: snapshot intersects"), err) + require.True(t, kvserver.IsRetriableReplicationChangeError(err), err) // Ensure that there's no tombstone. // The RHS on store 0 never should have heard about its original ID. 
ensureNoTombstone(t, mtc.Store(0), rhsID) diff --git a/pkg/kv/kvserver/client_replica_test.go b/pkg/kv/kvserver/client_replica_test.go index 6af61af1216a..a88d1349b27b 100644 --- a/pkg/kv/kvserver/client_replica_test.go +++ b/pkg/kv/kvserver/client_replica_test.go @@ -2240,9 +2240,7 @@ func TestRandomConcurrentAdminChangeReplicasRequests(t *testing.T) { var gotSuccess bool for _, err := range errors { if err != nil { - const exp = "change replicas of .* failed: descriptor changed" + - "|snapshot failed:" - assert.True(t, testutils.IsError(err, exp), err) + assert.True(t, kvserver.IsRetriableReplicationChangeError(err), err) } else if gotSuccess { t.Error("expected only one success") } else { diff --git a/pkg/kv/kvserver/client_split_test.go b/pkg/kv/kvserver/client_split_test.go index 0b9a8337510e..fe4a2700f90b 100644 --- a/pkg/kv/kvserver/client_split_test.go +++ b/pkg/kv/kvserver/client_split_test.go @@ -3172,7 +3172,7 @@ func TestSplitTriggerMeetsUnexpectedReplicaID(t *testing.T) { }) close(blockPromoteCh) - if err := g.Wait(); !testutils.IsError(err, `descriptor changed`) { + if err := g.Wait(); !kvserver.IsRetriableReplicationChangeError(err) { t.Fatalf(`expected "descriptor changed" error got: %+v`, err) } @@ -3184,8 +3184,8 @@ func TestSplitTriggerMeetsUnexpectedReplicaID(t *testing.T) { // has not heard a raft message addressed to a later replica ID while the // "was not found on" error is expected if the store has heard that it has // a newer replica ID before receiving the snapshot. - if !testutils.IsError(err, `snapshot intersects existing range|r[0-9]+ was not found on s[0-9]+`) { - t.Fatalf(`expected snapshot intersects existing range|r[0-9]+ was not found on s[0-9]+" error got: %+v`, err) + if !kvserver.IsRetriableReplicationChangeError(err) { + t.Fatal(err) } } for i := 0; i < 5; i++ { diff --git a/pkg/kv/kvserver/client_test.go b/pkg/kv/kvserver/client_test.go index ac206be59695..411aa2e196b8 100644 --- a/pkg/kv/kvserver/client_test.go +++ b/pkg/kv/kvserver/client_test.go @@ -790,6 +790,10 @@ func (m *multiTestContext) makeStoreConfig(i int) kvserver.StoreConfig { cfg.TestingKnobs.DisableMergeQueue = true cfg.TestingKnobs.DisableSplitQueue = true cfg.TestingKnobs.ReplicateQueueAcceptsUnsplit = true + // The mtc does not populate the allocator's store pool well and so + // the check never sees any live replicas. + cfg.TestingKnobs.AllowDangerousReplicationChanges = true + return cfg } @@ -1242,10 +1246,7 @@ func (m *multiTestContext) changeReplicas( continue } - // We can't use storage.IsSnapshotError() because the original error object - // is lost. We could make a this into a roachpb.Error but it seems overkill - // for this one usage. - if testutils.IsError(err, "snapshot failed: .*|descriptor changed") { + if kvserver.IsRetriableReplicationChangeError(err) { log.Infof(ctx, "%v", err) continue } diff --git a/pkg/kv/kvserver/markers.go b/pkg/kv/kvserver/markers.go new file mode 100644 index 000000000000..d2ca9e0e6657 --- /dev/null +++ b/pkg/kv/kvserver/markers.go @@ -0,0 +1,69 @@ +// Copyright 2020 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. 
+ +package kvserver + +import ( + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/errors" +) + +// NB: don't change the string here; this will cause cross-version issues +// since this singleton is used as a marker. +var errMarkSnapshotError = errors.New("snapshot failed") + +// isSnapshotError returns true iff the error indicates that a snapshot failed. +func isSnapshotError(err error) bool { + return errors.Is(err, errMarkSnapshotError) +} + +// NB: don't change the string here; this will cause cross-version issues +// since this singleton is used as a marker. +var errMarkCanRetryReplicationChangeWithUpdatedDesc = errors.New("should retry with updated descriptor") + +// IsRetriableReplicationChangeError detects whether an error (which is +// assumed to have been emitted by the KV layer during a replication change +// operation) is likely to succeed when retried with an updated descriptor. +func IsRetriableReplicationChangeError(err error) bool { + return errors.Is(err, errMarkCanRetryReplicationChangeWithUpdatedDesc) || isSnapshotError(err) +} + +const ( + descChangedRangeSubsumedErrorFmt = "descriptor changed: expected %s != [actual] nil (range subsumed)" + descChangedErrorFmt = "descriptor changed: [expected] %s != [actual] %s" +) + +func newDescChangedError(desc, actualDesc *roachpb.RangeDescriptor) error { + if actualDesc == nil { + return errors.Mark(errors.Newf(descChangedRangeSubsumedErrorFmt, desc), errMarkCanRetryReplicationChangeWithUpdatedDesc) + } + return errors.Mark(errors.Newf(descChangedErrorFmt, desc, actualDesc), errMarkCanRetryReplicationChangeWithUpdatedDesc) +} + +func wrapDescChangedError(err error, desc, actualDesc *roachpb.RangeDescriptor) error { + if actualDesc == nil { + return errors.Mark(errors.Wrapf(err, descChangedRangeSubsumedErrorFmt, desc), errMarkCanRetryReplicationChangeWithUpdatedDesc) + } + return errors.Mark(errors.Wrapf(err, descChangedErrorFmt, desc, actualDesc), errMarkCanRetryReplicationChangeWithUpdatedDesc) +} + +// NB: don't change the string here; this will cause cross-version issues +// since this singleton is used as a marker. +var errMarkInvalidReplicationChange = errors.New("invalid replication change") + +// IsIllegalReplicationChangeError detects whether an error (assumed to have been emitted +// by a replication change) indicates that the replication change is illegal, meaning +// that it's unlikely to be handled through a retry. Examples of this are attempts to add +// a store that is already a member of the supplied descriptor. A non-example is a change +// detected in the descriptor at the KV layer relative to that supplied as input to the +// replication change, which would likely benefit from a retry. 
+func IsIllegalReplicationChangeError(err error) bool { + return errors.Is(err, errMarkInvalidReplicationChange) +} diff --git a/pkg/kv/kvserver/replica_command.go b/pkg/kv/kvserver/replica_command.go index 89d231acd475..2d78aed15449 100644 --- a/pkg/kv/kvserver/replica_command.go +++ b/pkg/kv/kvserver/replica_command.go @@ -77,25 +77,6 @@ func maybeDescriptorChangedError( return false, nil } -const ( - descChangedRangeSubsumedErrorFmt = "descriptor changed: expected %s != [actual] nil (range subsumed)" - descChangedErrorFmt = "descriptor changed: [expected] %s != [actual] %s" -) - -func newDescChangedError(desc, actualDesc *roachpb.RangeDescriptor) error { - if actualDesc == nil { - return errors.Newf(descChangedRangeSubsumedErrorFmt, desc) - } - return errors.Newf(descChangedErrorFmt, desc, actualDesc) -} - -func wrapDescChangedError(err error, desc, actualDesc *roachpb.RangeDescriptor) error { - if actualDesc == nil { - return errors.Wrapf(err, descChangedRangeSubsumedErrorFmt, desc) - } - return errors.Wrapf(err, descChangedErrorFmt, desc, actualDesc) -} - func splitSnapshotWarningStr(rangeID roachpb.RangeID, status *raft.Status) string { var s string if status != nil && status.RaftState == raft.StateLeader { @@ -849,20 +830,6 @@ func waitForReplicasInit( }) } -type snapshotError struct { - // NB: don't implement Cause() on this type without also updating IsSnapshotError. - cause error -} - -func (s *snapshotError) Error() string { - return fmt.Sprintf("snapshot failed: %s", s.cause.Error()) -} - -// IsSnapshotError returns true iff the error indicates a snapshot failed. -func IsSnapshotError(err error) bool { - return errors.HasType(err, (*snapshotError)(nil)) -} - // ChangeReplicas atomically changes the replicas that are members of a range. // The change is performed in a distributed transaction and takes effect when // that transaction is committed. This transaction confirms that the supplied @@ -1207,17 +1174,21 @@ func validateReplicationChanges( // interrupted or else we hit a race between the replicate queue and // AdminChangeReplicas. if rDesc.GetType() == roachpb.LEARNER { - return errors.Errorf( - "unable to add replica %v which is already present as a learner in %s", chg.Target, desc) + return errors.Mark(errors.Errorf( + "unable to add replica %v which is already present as a learner in %s", chg.Target, desc), + errMarkInvalidReplicationChange) } if rDesc.GetType() == roachpb.NON_VOTER { - return errors.Errorf( - "unable to add replica %v which is already present as a non-voter in %s", chg.Target, desc) + return errors.Mark(errors.Errorf( + "unable to add replica %v which is already present as a non-voter in %s", chg.Target, desc), + errMarkInvalidReplicationChange) } // Otherwise, we already had a full voter replica. Can't add another to // this store. - return errors.Errorf("unable to add replica %v which is already present in %s", chg.Target, desc) + return errors.Mark( + errors.Errorf("unable to add replica %v which is already present in %s", chg.Target, desc), + errMarkInvalidReplicationChange) } for _, chg := range byStoreID { @@ -1225,10 +1196,14 @@ func validateReplicationChanges( // when the newly added one would be on a different store. 
if chg.ChangeType.IsAddition() { if len(desc.Replicas().All()) > 1 { - return errors.Errorf("unable to add replica %v; node already has a replica in %s", chg.Target.StoreID, desc) + return errors.Mark( + errors.Errorf("unable to add replica %v; node already has a replica in %s", chg.Target.StoreID, desc), + errMarkInvalidReplicationChange) } } else { - return errors.Errorf("removing %v which is not in %s", chg.Target, desc) + return errors.Mark( + errors.Errorf("removing %v which is not in %s", chg.Target, desc), + errMarkInvalidReplicationChange) } } } @@ -1239,7 +1214,7 @@ func validateReplicationChanges( if !chg.ChangeType.IsRemoval() { continue } - return errors.Errorf("removing %v which is not in %s", chg.Target, desc) + return errors.Mark(errors.Errorf("removing %v which is not in %s", chg.Target, desc), errMarkInvalidReplicationChange) } } return nil @@ -1661,6 +1636,41 @@ func execChangeReplicasTxn( } log.Infof(ctx, "change replicas (add %v remove %v): existing descriptor %s", crt.Added(), crt.Removed(), desc) + for maxAttempts, i := 10, 0; i < maxAttempts; i++ { + if knobs := store.TestingKnobs(); knobs != nil && knobs.AllowDangerousReplicationChanges { + break + } + // Run (and retry for a bit) a sanity check that the configuration + // resulting from this change is able to meet quorum. It's + // important to do this at this low layer as there are multiple + // entry points that are not generally too careful. For example, + // before the below check existed, the store rebalancer could + // carry out operations that would lead to a loss of quorum. + // + // See: + // https://github.com/cockroachdb/cockroach/issues/54444#issuecomment-707706553 + replicas := crt.Desc.Replicas() + liveReplicas, _ := store.allocator.storePool.liveAndDeadReplicas(replicas.All()) + if !crt.Desc.Replicas().CanMakeProgress( + func(rDesc roachpb.ReplicaDescriptor) bool { + for _, inner := range liveReplicas { + if inner.ReplicaID == rDesc.ReplicaID { + return true + } + } + return false + }) { + // NB: we use newQuorumError which is recognized by the replicate queue. + err := newQuorumError("range %s cannot make progress with proposed changes add=%v del=%v "+ + "based on live replicas %v", crt.Desc, crt.Added(), crt.Removed(), liveReplicas) + if i == maxAttempts-1 { + return err + } + log.Infof(ctx, "%s; retrying", err) + time.Sleep(time.Second) + } + } + { b := txn.NewBatch() @@ -1982,7 +1992,7 @@ func (r *Replica) sendSnapshot( log.Fatal(ctx, "malformed snapshot generated") } - return &snapshotError{err} + return errors.Mark(err, errMarkSnapshotError) } return nil } @@ -2448,7 +2458,8 @@ func (r *Replica) adminScatter( for re := retry.StartWithCtx(ctx, retryOpts); re.Next(); { requeue, err := rq.processOneChange(ctx, r, canTransferLease, false /* dryRun */) if err != nil { - if IsSnapshotError(err) { + // TODO(tbg): can this use IsRetriableReplicationError? 
+ if isSnapshotError(err) { continue } break diff --git a/pkg/kv/kvserver/replica_learner_test.go b/pkg/kv/kvserver/replica_learner_test.go index d3374b25a707..63c3a110661b 100644 --- a/pkg/kv/kvserver/replica_learner_test.go +++ b/pkg/kv/kvserver/replica_learner_test.go @@ -396,8 +396,8 @@ func TestSplitWithLearnerOrJointConfig(t *testing.T) { desc, err := tc.AddVoters(right.StartKey.AsRawKey(), tc.Target(1)) if err == nil { right = desc - } else if !testutils.IsError(err, "cannot apply snapshot: snapshot intersects existing range") { - t.Fatal(err) + } else { + require.True(t, kvserver.IsRetriableReplicationChangeError(err), err) } return err }) diff --git a/pkg/kv/kvserver/replica_raft.go b/pkg/kv/kvserver/replica_raft.go index 1b0f0d89e703..99cae0ee4884 100644 --- a/pkg/kv/kvserver/replica_raft.go +++ b/pkg/kv/kvserver/replica_raft.go @@ -264,12 +264,12 @@ func (r *Replica) propose(ctx context.Context, p *ProposalData) (index int64, pE replID := r.ReplicaID() for _, rDesc := range crt.Removed() { if rDesc.ReplicaID == replID { - msg := fmt.Sprintf("received invalid ChangeReplicasTrigger %s to remove self (leaseholder)", crt) - log.Errorf(p.ctx, "%v", msg) - return 0, roachpb.NewErrorf("%s: %s", r, msg) + err := errors.Mark(errors.Newf("received invalid ChangeReplicasTrigger %s to remove self (leaseholder)", crt), + errMarkInvalidReplicationChange) + log.Errorf(p.ctx, "%v", err) + return 0, roachpb.NewError(err) } } - } else if p.command.ReplicatedEvalResult.AddSSTable != nil { log.VEvent(p.ctx, 4, "sideloadable proposal detected") version = raftVersionSideloaded diff --git a/pkg/kv/kvserver/replicate_queue.go b/pkg/kv/kvserver/replicate_queue.go index 058932918b52..c0985db4f1fe 100644 --- a/pkg/kv/kvserver/replicate_queue.go +++ b/pkg/kv/kvserver/replicate_queue.go @@ -271,7 +271,7 @@ func (rq *replicateQueue) process( for r := retry.StartWithCtx(ctx, retryOpts); r.Next(); { for { requeue, err := rq.processOneChange(ctx, repl, rq.canTransferLease, false /* dryRun */) - if IsSnapshotError(err) { + if isSnapshotError(err) { // If ChangeReplicas failed because the snapshot failed, we log the // error but then return success indicating we should retry the // operation. The most likely causes of the snapshot failing are a @@ -322,23 +322,10 @@ func (rq *replicateQueue) processOneChange( // quorum. voterReplicas := desc.Replicas().Voters() liveVoterReplicas, deadVoterReplicas := rq.allocator.storePool.liveAndDeadReplicas(voterReplicas) - { - unavailable := !desc.Replicas().CanMakeProgress(func(rDesc roachpb.ReplicaDescriptor) bool { - for _, inner := range liveVoterReplicas { - if inner.ReplicaID == rDesc.ReplicaID { - return true - } - } - return false - }) - if unavailable { - return false, newQuorumError( - "range requires a replication change, but live replicas %v don't constitute a quorum for %v:", - liveVoterReplicas, - desc.Replicas().All(), - ) - } - } + + // NB: the replication layer ensures that the below operations don't cause + // unavailability; see: + _ = execChangeReplicasTxn action, _ := rq.allocator.ComputeAction(ctx, zone, desc) log.VEventf(ctx, 1, "next replica action: %s", action) diff --git a/pkg/kv/kvserver/testing_knobs.go b/pkg/kv/kvserver/testing_knobs.go index dcad8b9988da..2fdc13fb1706 100644 --- a/pkg/kv/kvserver/testing_knobs.go +++ b/pkg/kv/kvserver/testing_knobs.go @@ -255,6 +255,10 @@ type StoreTestingKnobs struct { // heartbeats and then expect other replicas to take the lease without // worrying about Raft). 
 	AllowLeaseRequestProposalsWhenNotLeader bool
+	// AllowDangerousReplicationChanges disables safeguards
+	// in execChangeReplicasTxn that prevent moving
+	// to a configuration that cannot make progress.
+	AllowDangerousReplicationChanges bool
 }
 
 // ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface.
diff --git a/pkg/kv/test_utils.go b/pkg/kv/test_utils.go
index faf3c022b33d..ef82f6e7f069 100644
--- a/pkg/kv/test_utils.go
+++ b/pkg/kv/test_utils.go
@@ -43,6 +43,10 @@ func OnlyFollowerReads(rec tracing.Recording) bool {
 // https://github.com/cockroachdb/cockroach/issues/34012
 // https://github.com/cockroachdb/cockroach/issues/33683#issuecomment-454889149
 // for more failure modes not caught here.
+//
+// Note that whenever possible, callers should rely on
+// kvserver.Is{Retriable,Illegal}ReplicationChangeError,
+// which avoids string matching.
 func IsExpectedRelocateError(err error) bool {
 	allowlist := []string{
 		"descriptor changed",