kvserverpb: move quorum safeguard into execChangeReplicasTxn
This used to live in the replicate queue, but there are other
entry points to replication changes, notably the store rebalancer,
which caused #54444.

Move the check into the guts of replication changes, where it is
guaranteed to be invoked.

Fixes #50729
Touches #54444 (release-20.2)

Release note (bug fix): in rare situations, an automated replication
change could result in a loss of quorum. This required down nodes and
a simultaneous change in the replication factor. Note that a change in
the replication factor can occur automatically if the cluster comprises
fewer than five available nodes. Experimentally, the likelihood of
encountering this issue, even under contrived conditions, was small.
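
For illustration, a minimal, self-contained sketch of the kind of safeguard being moved: refuse a replication change whose resulting voter set would not contain a live quorum, and tag the error with a marker so callers can classify it without string matching. The names here (checkQuorumSafeguard, live, newVoterStores) are hypothetical; the actual check runs inside execChangeReplicasTxn with access to the store pool's liveness data.

// Hypothetical sketch, not code from this commit.
package quorumsketch

import "github.com/cockroachdb/errors"

// errMarkInvalidReplicationChange mirrors the marker pattern added in
// pkg/kv/kvserver/markers.go below.
var errMarkInvalidReplicationChange = errors.New("invalid replication change")

// checkQuorumSafeguard refuses a change whose resulting voter set would not
// contain a quorum of live replicas. newVoterStores is the voter set after
// the proposed change; live is a stand-in for store-pool liveness.
func checkQuorumSafeguard(newVoterStores []int, live func(storeID int) bool) error {
	liveVoters := 0
	for _, storeID := range newVoterStores {
		if live(storeID) {
			liveVoters++
		}
	}
	quorum := len(newVoterStores)/2 + 1
	if liveVoters < quorum {
		return errors.Mark(
			errors.Newf("refusing replication change: %d live voter(s), need quorum of %d",
				liveVoters, quorum),
			errMarkInvalidReplicationChange,
		)
	}
	return nil
}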
tbg committed Dec 3, 2020
1 parent 677f6f8 commit 5178559
Showing 15 changed files with 174 additions and 124 deletions.
1 change: 1 addition & 0 deletions pkg/kv/kvnemesis/BUILD.bazel
@@ -19,6 +19,7 @@ go_library(
"//pkg/keys",
"//pkg/kv",
"//pkg/kv/kvclient/kvcoord",
"//pkg/kv/kvserver",
"//pkg/roachpb",
"//pkg/storage",
"//pkg/util/bufalloc",
42 changes: 9 additions & 33 deletions pkg/kv/kvnemesis/validator.go
@@ -17,6 +17,7 @@ import (
"sort"
"strings"

"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
@@ -328,39 +329,14 @@ func (v *validator) processOp(txnID *string, op Operation) {
v.failIfError(op, t.Result)
}
case *ChangeReplicasOperation:
if resultIsError(t.Result, `unable to add replica .* which is already present in`) {
// Generator created this operations based on data about a range's
// replicas that is now stale (because it raced with some other operation
// created by that Generator): a replica is being added and in the
// meantime, some other operation added the same replica.
} else if resultIsError(t.Result, `unable to add replica .* which is already present as a learner`) {
// Generator created this operations based on data about a range's
// replicas that is now stale (because it raced with some other operation
// created by that Generator): a replica is being added and in the
// meantime, some other operation started (but did not finish) adding the
// same replica.
} else if resultIsError(t.Result, `descriptor changed`) {
// Race between two operations being executed concurrently. Applier grabs
// a range descriptor and then calls AdminChangeReplicas with it, but the
// descriptor is changed by some other operation in between.
} else if resultIsError(t.Result, `received invalid ChangeReplicasTrigger .* to remove self \(leaseholder\)`) {
// Removing the leaseholder is invalid for technical reasons, but
// Generator intentiontally does not try to avoid this so that this edge
// case is exercised.
} else if resultIsError(t.Result, `removing .* which is not in`) {
// Generator created this operations based on data about a range's
// replicas that is now stale (because it raced with some other operation
// created by that Generator): a replica is being removed and in the
// meantime, some other operation removed the same replica.
} else if resultIsError(t.Result, `remote failed to apply snapshot for reason failed to apply snapshot: raft group deleted`) {
// Probably should be transparently retried.
} else if resultIsError(t.Result, `remote failed to apply snapshot for reason failed to apply snapshot: .* cannot add placeholder, have an existing placeholder`) {
// Probably should be transparently retried.
} else if resultIsError(t.Result, `cannot apply snapshot: snapshot intersects existing range`) {
// Probably should be transparently retried.
} else if resultIsError(t.Result, `snapshot of type LEARNER was sent to .* which did not contain it as a replica`) {
// Probably should be transparently retried.
} else {
var ignore bool
if t.Result.Type == ResultType_Error {
ctx := context.Background()
err := errors.DecodeError(ctx, *t.Result.Err)
ignore = kvserver.IsRetriableReplicationChangeError(err) ||
kvserver.IsIllegalReplicationChangeError(err)
}
if !ignore {
v.failIfError(op, t.Result)
}
case *BatchOperation:
1 change: 1 addition & 0 deletions pkg/kv/kvserver/BUILD.bazel
@@ -14,6 +14,7 @@ go_library(
"gc_queue.go",
"lease_history.go",
"log.go",
"markers.go",
"merge_queue.go",
"metrics.go",
"queue.go",
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/client_merge_test.go
@@ -1983,7 +1983,7 @@ func TestStoreRangeMergeAddReplicaRace(t *testing.T) {

const acceptableMergeErr = `unexpected value: raw_bytes|ranges not collocated` +
`|cannot merge range with non-voter replicas`
if mergeErr == nil && testutils.IsError(addErr, `descriptor changed: \[expected\]`) {
if mergeErr == nil && kvserver.IsRetriableReplicationChangeError(addErr) {
// Merge won the race, no add happened.
require.Len(t, afterDesc.Replicas().Voters(), 1)
require.Equal(t, origDesc.EndKey, afterDesc.EndKey)
@@ -2030,7 +2030,7 @@ func TestStoreRangeMergeResplitAddReplicaRace(t *testing.T) {

_, err := tc.Server(0).DB().AdminChangeReplicas(
ctx, scratchStartKey, origDesc, roachpb.MakeReplicationChanges(roachpb.ADD_VOTER, tc.Target(1)))
if !testutils.IsError(err, `descriptor changed`) {
if !kvserver.IsRetriableReplicationChangeError(err) {
t.Fatalf(`expected "descriptor changed" error got: %+v`, err)
}
}
22 changes: 10 additions & 12 deletions pkg/kv/kvserver/client_raft_test.go
@@ -1998,10 +1998,11 @@ func runReplicateRestartAfterTruncation(t *testing.T, removeBeforeTruncateAndReA

func testReplicaAddRemove(t *testing.T, addFirst bool) {
sc := kvserver.TestStoreConfig(nil)
// We're gonna want to validate the state of the store before and after the
// replica GC queue does its work, so we disable the replica gc queue here
// and run it manually when we're ready.
// We're gonna want to validate the state of the store before and
// after the replica GC queue does its work, so we disable the
// replica gc queue here and run it manually when we're ready.
sc.TestingKnobs.DisableReplicaGCQueue = true
sc.TestingKnobs.DisableReplicateQueue = true
sc.TestingKnobs.DisableEagerReplicaRemoval = true
sc.Clock = nil // manual clock
mtc := &multiTestContext{
@@ -2852,7 +2853,7 @@ func TestRemovePlaceholderRace(t *testing.T) {
StoreID: mtc.stores[1].Ident.StoreID,
})
if _, err := repl.ChangeReplicas(ctx, repl.Desc(), kvserver.SnapshotRequest_REBALANCE, kvserverpb.ReasonUnknown, "", chgs); err != nil {
if kvserver.IsSnapshotError(err) {
if kvserver.IsRetriableReplicationChangeError(err) {
continue
} else {
t.Fatal(err)
@@ -4786,6 +4787,7 @@ func TestProcessSplitAfterRightHandSideHasBeenRemoved(t *testing.T) {
// Newly-started stores (including the "rogue" one) should not GC
// their replicas. We'll turn this back on when needed.
sc.TestingKnobs.DisableReplicaGCQueue = true
sc.TestingKnobs.DisableReplicateQueue = true
sc.RaftDelaySplitToSuppressSnapshotTicks = 0
// Make the tick interval short so we don't need to wait too long for the
// partitioned leader to time out. Also make the
@@ -4959,8 +4961,7 @@ func TestProcessSplitAfterRightHandSideHasBeenRemoved(t *testing.T) {
// and will be rolled back. Nevertheless it will have learned that it
// has been removed at the old replica ID.
err = changeReplicas(t, db, roachpb.ADD_VOTER, keyB, 0)
require.True(t,
testutils.IsError(err, "snapshot failed.*cannot apply snapshot: snapshot intersects"), err)
require.True(t, kvserver.IsRetriableReplicationChangeError(err), err)

// Without a partitioned RHS we'll end up always writing a tombstone here because
// the RHS will be created at the initial replica ID because it will get
@@ -5008,8 +5009,7 @@ func TestProcessSplitAfterRightHandSideHasBeenRemoved(t *testing.T) {
// and will be rolled back. Nevertheless it will have learned that it
// has been removed at the old replica ID.
err = changeReplicas(t, db, roachpb.ADD_VOTER, keyB, 0)
require.True(t,
testutils.IsError(err, "snapshot failed.*cannot apply snapshot: snapshot intersects"), err)
require.True(t, kvserver.IsRetriableReplicationChangeError(err), err)

// Without a partitioned RHS we'll end up always writing a tombstone here because
// the RHS will be created at the initial replica ID because it will get
@@ -5078,8 +5078,7 @@ func TestProcessSplitAfterRightHandSideHasBeenRemoved(t *testing.T) {
// and will be rolled back. Nevertheless it will have learned that it
// has been removed at the old replica ID.
err = changeReplicas(t, db, roachpb.ADD_VOTER, keyB, 0)
require.True(t,
testutils.IsError(err, "snapshot failed.*cannot apply snapshot: snapshot intersects"), err)
require.True(t, kvserver.IsRetriableReplicationChangeError(err), err)
// Ensure that the replica exists with the higher replica ID.
repl, err := mtc.Store(0).GetReplica(rhsInfo.Desc.RangeID)
require.NoError(t, err)
@@ -5135,8 +5134,7 @@ func TestProcessSplitAfterRightHandSideHasBeenRemoved(t *testing.T) {
// and will be rolled back. Nevertheless it will have learned that it
// has been removed at the old replica ID.
err = changeReplicas(t, db, roachpb.ADD_VOTER, keyB, 0)
require.True(t,
testutils.IsError(err, "snapshot failed.*cannot apply snapshot: snapshot intersects"), err)
require.True(t, kvserver.IsRetriableReplicationChangeError(err), err)
// Ensure that there's no tombstone.
// The RHS on store 0 never should have heard about its original ID.
ensureNoTombstone(t, mtc.Store(0), rhsID)
4 changes: 1 addition & 3 deletions pkg/kv/kvserver/client_replica_test.go
@@ -2240,9 +2240,7 @@ func TestRandomConcurrentAdminChangeReplicasRequests(t *testing.T) {
var gotSuccess bool
for _, err := range errors {
if err != nil {
const exp = "change replicas of .* failed: descriptor changed" +
"|snapshot failed:"
assert.True(t, testutils.IsError(err, exp), err)
assert.True(t, kvserver.IsRetriableReplicationChangeError(err), err)
} else if gotSuccess {
t.Error("expected only one success")
} else {
6 changes: 3 additions & 3 deletions pkg/kv/kvserver/client_split_test.go
@@ -3172,7 +3172,7 @@ func TestSplitTriggerMeetsUnexpectedReplicaID(t *testing.T) {
})

close(blockPromoteCh)
if err := g.Wait(); !testutils.IsError(err, `descriptor changed`) {
if err := g.Wait(); !kvserver.IsRetriableReplicationChangeError(err) {
t.Fatalf(`expected "descriptor changed" error got: %+v`, err)
}

@@ -3184,8 +3184,8 @@
// has not heard a raft message addressed to a later replica ID while the
// "was not found on" error is expected if the store has heard that it has
// a newer replica ID before receiving the snapshot.
if !testutils.IsError(err, `snapshot intersects existing range|r[0-9]+ was not found on s[0-9]+`) {
t.Fatalf(`expected snapshot intersects existing range|r[0-9]+ was not found on s[0-9]+" error got: %+v`, err)
if !kvserver.IsRetriableReplicationChangeError(err) {
t.Fatal(err)
}
}
for i := 0; i < 5; i++ {
9 changes: 5 additions & 4 deletions pkg/kv/kvserver/client_test.go
@@ -790,6 +790,10 @@ func (m *multiTestContext) makeStoreConfig(i int) kvserver.StoreConfig {
cfg.TestingKnobs.DisableMergeQueue = true
cfg.TestingKnobs.DisableSplitQueue = true
cfg.TestingKnobs.ReplicateQueueAcceptsUnsplit = true
// The mtc does not populate the allocator's store pool well and so
// the check never sees any live replicas.
cfg.TestingKnobs.AllowDangerousReplicationChanges = true

return cfg
}

@@ -1242,10 +1246,7 @@ func (m *multiTestContext) changeReplicas(
continue
}

// We can't use storage.IsSnapshotError() because the original error object
// is lost. We could make a this into a roachpb.Error but it seems overkill
// for this one usage.
if testutils.IsError(err, "snapshot failed: .*|descriptor changed") {
if kvserver.IsRetriableReplicationChangeError(err) {
log.Infof(ctx, "%v", err)
continue
}
69 changes: 69 additions & 0 deletions pkg/kv/kvserver/markers.go
@@ -0,0 +1,69 @@
// Copyright 2020 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package kvserver

import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/errors"
)

// NB: don't change the string here; this will cause cross-version issues
// since this singleton is used as a marker.
var errMarkSnapshotError = errors.New("snapshot failed")

// isSnapshotError returns true iff the error indicates that a snapshot failed.
func isSnapshotError(err error) bool {
return errors.Is(err, errMarkSnapshotError)
}

// NB: don't change the string here; this will cause cross-version issues
// since this singleton is used as a marker.
var errMarkCanRetryReplicationChangeWithUpdatedDesc = errors.New("should retry with updated descriptor")

// IsRetriableReplicationChangeError detects whether an error (which is
// assumed to have been emitted by the KV layer during a replication change
// operation) is likely to succeed when retried with an updated descriptor.
func IsRetriableReplicationChangeError(err error) bool {
return errors.Is(err, errMarkCanRetryReplicationChangeWithUpdatedDesc) || isSnapshotError(err)
}

const (
descChangedRangeSubsumedErrorFmt = "descriptor changed: expected %s != [actual] nil (range subsumed)"
descChangedErrorFmt = "descriptor changed: [expected] %s != [actual] %s"
)

func newDescChangedError(desc, actualDesc *roachpb.RangeDescriptor) error {
if actualDesc == nil {
return errors.Mark(errors.Newf(descChangedRangeSubsumedErrorFmt, desc), errMarkCanRetryReplicationChangeWithUpdatedDesc)
}
return errors.Mark(errors.Newf(descChangedErrorFmt, desc, actualDesc), errMarkCanRetryReplicationChangeWithUpdatedDesc)
}

func wrapDescChangedError(err error, desc, actualDesc *roachpb.RangeDescriptor) error {
if actualDesc == nil {
return errors.Mark(errors.Wrapf(err, descChangedRangeSubsumedErrorFmt, desc), errMarkCanRetryReplicationChangeWithUpdatedDesc)
}
return errors.Mark(errors.Wrapf(err, descChangedErrorFmt, desc, actualDesc), errMarkCanRetryReplicationChangeWithUpdatedDesc)
}

// NB: don't change the string here; this will cause cross-version issues
// since this singleton is used as a marker.
var errMarkInvalidReplicationChange = errors.New("invalid replication change")

// IsIllegalReplicationChangeError detects whether an error (assumed to have been emitted
// by a replication change) indicates that the replication change is illegal, meaning
// that it's unlikely to be handled through a retry. Examples of this are attempts to add
// a store that is already a member of the supplied descriptor. A non-example is a change
// detected in the descriptor at the KV layer relative to that supplied as input to the
// replication change, which would likely benefit from a retry.
func IsIllegalReplicationChangeError(err error) bool {
return errors.Is(err, errMarkInvalidReplicationChange)
}
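
The markers above let callers branch on the class of error instead of matching error strings, which is what the test changes in this commit switch to. A hypothetical caller-side sketch follows; retryReplicationChange, refreshDesc, and runChange are illustrative stand-ins, not part of this commit, and the snippet assumes imports of pkg/kv/kvserver and pkg/roachpb.

// retryReplicationChange is a hypothetical helper illustrating how the
// markers are meant to be consumed by a caller issuing replication changes.
func retryReplicationChange(
	refreshDesc func() *roachpb.RangeDescriptor,
	runChange func(*roachpb.RangeDescriptor) error,
) error {
	for {
		err := runChange(refreshDesc())
		switch {
		case err == nil:
			return nil
		case kvserver.IsRetriableReplicationChangeError(err):
			// The descriptor raced with another change or a snapshot failed;
			// retry with the freshly read descriptor.
			continue
		default:
			// Includes kvserver.IsIllegalReplicationChangeError(err), e.g.
			// adding a store that is already a voter; a retry will not help.
			return err
		}
	}
}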