diff --git a/Gopkg.lock b/Gopkg.lock
index 0c61d58b7917..d5504e1dffb4 100644
--- a/Gopkg.lock
+++ b/Gopkg.lock
@@ -1512,7 +1512,7 @@
[[projects]]
branch = "master"
- digest = "1:a5dd907052a1b42bbdf6a9608300143ee14f477bd6232597494203a21aa0fd45"
+ digest = "1:4613f5fc59e17fb05c86b663d7d3e52c8bbefa50aeaeaab53ab10b33d9819457"
name = "go.etcd.io/etcd"
packages = [
"raft",
@@ -1522,7 +1522,7 @@
"raft/tracker",
]
pruneopts = "UT"
- revision = "d137fa9d4ad7aa242f6fed04186a700ce082fdda"
+ revision = "a41bd303ec00ef3621f080e0ef12fe3d6924653c"
[[projects]]
digest = "1:3b5a3bc35810830ded5e26ef9516e933083a2380d8e57371fdfde3c70d7c6952"
diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html
index 5cc133424d0a..9b2c4ae94947 100644
--- a/docs/generated/settings/settings.html
+++ b/docs/generated/settings/settings.html
@@ -38,7 +38,7 @@
kv.closed_timestamp.target_duration | duration | 30s | if nonzero, attempt to provide closed timestamp notifications for timestamps trailing cluster time by approximately this duration |
kv.follower_read.target_multiple | float | 3 | if above 1, encourages the distsender to perform a read against the closest replica if a request is older than kv.closed_timestamp.target_duration * (1 + kv.closed_timestamp.close_fraction * this) less a clock uncertainty interval. This value also is used to create follower_timestamp(). (WARNING: may compromise cluster stability or correctness; do not edit without supervision) |
kv.import.batch_size | byte size | 32 MiB | the maximum size of the payload in an AddSSTable request (WARNING: may compromise cluster stability or correctness; do not edit without supervision) |
-kv.learner_replicas.enabled | boolean | false | use learner replicas for replica addition |
+kv.learner_replicas.enabled | boolean | true | use learner replicas for replica addition |
kv.raft.command.max_size | byte size | 64 MiB | maximum size of a raft command |
kv.raft_log.disable_synchronization_unsafe | boolean | false | set to true to disable synchronization on Raft log writes to persistent storage. Setting to true risks data loss or data corruption on server crashes. The setting is meant for internal testing only and SHOULD NOT be used in production. |
kv.range.backpressure_range_size_multiplier | float | 2 | multiple of range_max_bytes that a range is allowed to grow to without splitting before writes to that range are blocked, or 0 to disable |
diff --git a/pkg/roachpb/metadata_replicas.go b/pkg/roachpb/metadata_replicas.go
index 57be1f34e4a5..117174dfe94d 100644
--- a/pkg/roachpb/metadata_replicas.go
+++ b/pkg/roachpb/metadata_replicas.go
@@ -109,8 +109,8 @@ func (d ReplicaDescriptors) Voters() []ReplicaDescriptor {
// - Learner replicas cannot become raft leaders, so we also don't allow them to
// become leaseholders. As a result, DistSender and the various oracles don't
// try to send them traffic.
-// - The raft snapshot queue does not send snapshots to learners for reasons
-// described below.
+// - The raft snapshot queue tries to avoid sending snapshots to learners for
+// reasons described below.
// - Merges won't run while a learner replica is present.
//
// Replicas are now added in two ConfChange transactions. The first creates the
@@ -135,7 +135,19 @@ func (d ReplicaDescriptors) Voters() []ReplicaDescriptor {
// There is another race between the learner snapshot being sent and the raft
// snapshot queue happening to check the replica at the same time, also sending
// it a snapshot. This is safe but wasteful, so the raft snapshot queue won't
-// try to send snapshots to learners.
+// try to send snapshots to learners if there is already a snapshot to that
+// range in flight.
+//
+// *However*, raft is currently pickier than it needs to be about the snapshots
+// it requests and it can get stuck in StateSnapshot if it doesn't receive
+// exactly the index it wants. As a result, for now, the raft snapshot queue
+// will send one if it's still needed after the learner snapshot finishes (or
+// times out). To make this work in a timely manner (i.e. without relying on the
+// replica scanner) but without blocking the raft snapshot queue, when a
+// snapshot is skipped, this is reported to raft as an error sending the
+// snapshot. This causes raft to eventually re-enqueue it in the raft snapshot
+// queue. All of this is quite hard to reason about, so it'd be nice to make
+// this go away at some point.
//
// Merges are blocked if either side has a learner (to avoid working out the
// edge cases) but it's historically turned out to be a bad idea to get in the
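The flow described in the comment above (add the new replica as a raft learner, send it a snapshot, then promote it to a voter, rolling the learner back if anything fails) can be summarized in a short sketch. This is only an illustration: the `replica` type, `addReplica`, `removeReplica`, and `sendSnapshot` below are invented stand-ins, not the CockroachDB descriptor or ChangeReplicas API.

```go
package main

import (
	"errors"
	"fmt"
)

// replicaType and replica are simplified stand-ins for the real descriptor
// types; they exist only to illustrate the learner-then-voter addition flow.
type replicaType int

const (
	voter replicaType = iota
	learner
)

type replica struct {
	StoreID int
	Type    replicaType
}

// addReplica adds the target store as a raft learner, sends it a snapshot,
// and then promotes it to a voter. If the snapshot fails, the learner is
// rolled back so it is not left lying around for the replicate queue.
func addReplica(desc []replica, storeID int, sendSnapshot func(int) error) ([]replica, error) {
	desc = append(desc, replica{StoreID: storeID, Type: learner}) // first ConfChange txn
	if err := sendSnapshot(storeID); err != nil {
		return removeReplica(desc, storeID), fmt.Errorf("learner snapshot failed, rolled back: %w", err)
	}
	for i := range desc { // second ConfChange txn: promote the learner to a voter
		if desc[i].StoreID == storeID {
			desc[i].Type = voter
		}
	}
	return desc, nil
}

func removeReplica(desc []replica, storeID int) []replica {
	var out []replica
	for _, r := range desc {
		if r.StoreID != storeID {
			out = append(out, r)
		}
	}
	return out
}

func main() {
	desc := []replica{{StoreID: 1, Type: voter}}
	desc, err := addReplica(desc, 2, func(int) error { return nil })
	fmt.Printf("after successful add: %+v err=%v\n", desc, err)

	_, err = addReplica(desc, 3, func(int) error { return errors.New("declined reservation") })
	fmt.Printf("after failed add: err=%v\n", err)
}
```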
diff --git a/pkg/storage/client_merge_test.go b/pkg/storage/client_merge_test.go
index dff83d9866a0..ecbb5ad6a1c6 100644
--- a/pkg/storage/client_merge_test.go
+++ b/pkg/storage/client_merge_test.go
@@ -39,22 +39,22 @@ import (
"github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb"
"github.com/cockroachdb/cockroach/pkg/storage/stateloader"
"github.com/cockroachdb/cockroach/pkg/storage/storagebase"
- "github.com/cockroachdb/cockroach/pkg/storage/storagepb"
"github.com/cockroachdb/cockroach/pkg/storage/txnwait"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
+ "github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
- "github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/gogo/protobuf/proto"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
"go.etcd.io/etcd/raft"
"go.etcd.io/etcd/raft/raftpb"
)
@@ -1750,100 +1750,56 @@ func TestStoreReplicaGCAfterMerge(t *testing.T) {
}
}
-// TestStoreRangeMergeResplitAddReplicaRace verifies that an add replica request
-// that occurs concurrently with a merge is aborted.
-//
-// To see why aborting the add replica request is necessary, consider two
-// adjacent and collocated ranges, Q and R. Say the replicate queue decides to
-// rebalance Q onto store S4. It will initiate a ChangeReplicas command that
-// will send S4 a preemptive snapshot, then launch a replica-change transaction
-// to update R's range descriptor with the new replica. Now say the merge queue
-// decides to merge Q and R after the preemptive snapshot of Q has been sent to
-// S4 but before the replica-change transaction has started. The merge can
-// succeed because the ranges are still collocated. (The new replica of Q is
-// only considered added once the replica-change transaction commits.) If the
-// replica-change transaction were to commit, the new replica of Q on S4 would
-// have a snapshot of Q that predated the merge. In order to catch up, it would
-// need to apply the merge trigger, but the merge trigger will explode because
-// S4 does not have a replica of R.
-//
-// To avoid this scenario, ChangeReplicas commands will abort if they discover
-// the range descriptor has changed between when the snapshot is sent and when
-// the replica-change transaction starts.
+// TestStoreRangeMergeAddReplicaRace verifies that when an add replica request
+// occurs concurrently with a merge, one of them is aborted with a "descriptor
+// changed" CPut error.
func TestStoreRangeMergeAddReplicaRace(t *testing.T) {
defer leaktest.AfterTest(t)()
-
ctx := context.Background()
- storeCfg := storage.TestStoreConfig(nil)
- storeCfg.TestingKnobs.DisableSplitQueue = true
- storeCfg.TestingKnobs.DisableMergeQueue = true
- storeCfg.TestingKnobs.DisableReplicateQueue = true
-
- mtc := &multiTestContext{
- storeConfig: &storeCfg,
- // This test was written before the multiTestContext started creating many
- // system ranges at startup, and hasn't been update to take that into
- // account.
- startWithSingleRange: true,
- }
-
- mtc.Start(t, 2)
- defer mtc.Stop()
- store0, store1 := mtc.Store(0), mtc.Store(1)
-
- lhsDesc, rhsDesc, err := createSplitRanges(ctx, store0)
- if err != nil {
- t.Fatal(err)
- }
-
- // Arrange to block all snapshots until we're ready.
- snapshotSentCh := make(chan struct{})
- allowSnapshotCh := make(chan struct{})
- mtc.transport.Listen(store1.StoreID(), RaftMessageHandlerInterceptor{
- RaftMessageHandler: store1,
- handleSnapshotFilter: func(header *storage.SnapshotRequest_Header) {
- snapshotSentCh <- struct{}{}
- <-allowSnapshotCh
- },
+ tc := testcluster.StartTestCluster(t, 2, base.TestClusterArgs{
+ ReplicationMode: base.ReplicationManual,
})
+ defer tc.Stopper().Stop(ctx)
+
+ scratchStartKey := tc.ScratchRange(t)
+ origDesc := tc.LookupRangeOrFatal(t, scratchStartKey)
+ splitKey := scratchStartKey.Next()
+ beforeDesc, _ := tc.SplitRangeOrFatal(t, splitKey)
- // Launch the add replicas request. This will get stuck sending a preemptive
- // snapshot thanks to the handler we installed above.
- errCh := make(chan error)
+ mergeErrCh, addErrCh := make(chan error, 1), make(chan error, 1)
go func() {
- errCh <- mtc.replicateRangeNonFatal(rhsDesc.RangeID, 1)
+ mergeErrCh <- tc.Server(0).DB().AdminMerge(ctx, scratchStartKey)
}()
-
- // Wait for the add replicas request to send a snapshot, then merge the ranges
- // together.
- <-snapshotSentCh
- mergeArgs := adminMergeArgs(lhsDesc.StartKey.AsRawKey())
- if _, pErr := client.SendWrapped(ctx, store0.TestSender(), mergeArgs); pErr != nil {
- t.Fatal(pErr)
- }
-
- // Allow snapshots, which will allow the add replicas to complete.
- close(allowSnapshotCh)
- expErr := `change replicas of r2 failed: descriptor changed: .* \(range subsumed\)`
- if err := <-errCh; !testutils.IsError(err, expErr) {
- t.Fatalf("expected error %q, got %v", expErr, err)
+ go func() {
+ targets := []roachpb.ReplicationTarget{tc.Target(1)}
+ _, err := tc.Server(0).DB().AdminChangeReplicas(
+ ctx, scratchStartKey, roachpb.ADD_REPLICA, targets, beforeDesc)
+ addErrCh <- err
+ }()
+ mergeErr := <-mergeErrCh
+ addErr := <-addErrCh
+ afterDesc := tc.LookupRangeOrFatal(t, scratchStartKey)
+
+ const acceptableMergeErr = `unexpected value: raw_bytes|ranges not collocated` +
+ `|cannot merge range with non-voter replicas`
+ if mergeErr == nil && testutils.IsError(addErr, `descriptor changed: \[expected\]`) {
+ // Merge won the race, no add happened.
+ require.Len(t, afterDesc.Replicas().Voters(), 1)
+ require.Equal(t, origDesc.EndKey, afterDesc.EndKey)
+ } else if addErr == nil && testutils.IsError(mergeErr, acceptableMergeErr) {
+ // Add won the race, no merge happened.
+ require.Len(t, afterDesc.Replicas().Voters(), 2)
+ require.Equal(t, beforeDesc.EndKey, afterDesc.EndKey)
+ } else {
+ t.Fatalf(`expected exactly one of merge or add to succeed got: [merge] %v [add] %v`,
+ mergeErr, addErr)
}
}
// TestStoreRangeMergeResplitAddReplicaRace tests a diabolical edge case in the
-// merge/add replica race.
-//
-// Consider the same situation as described in the comment on
-// TestStoreRangeMergeAddReplicaRace, except that Q and R merge together and
-// then split at exactly the same key, all before the replica-change transaction
-// starts. Q's range descriptor will have the same start key, end key, and next
-// replica ID that it did when the preemptive snapshot started. That is, it will
-// look unchanged! To protect against this, range descriptors contain a
-// generation counter, which is incremented on every split or merge. The
-// presence of this counter means that ChangeReplicas commands can detect and
-// abort if any merges have occurred since the preemptive snapshot, even if the
-// sequence of splits or merges left the keyspan of the range unchanged. This
-// diabolical edge case is tested here.
+// merge/add replica race. If two ranges merge and then split at the previous
+// boundary, the descriptor will look unchanged and the usual CPut protection
+// would not catch it. For this reason, we introduced RangeDescriptor.Generation.
//
// Note that splits will not increment the generation counter until the cluster
// version includes VersionRangeMerges. That's ok, because a sequence of splits
@@ -1852,44 +1808,29 @@ func TestStoreRangeMergeAddReplicaRace(t *testing.T) {
// always increment the generation counter.
func TestStoreRangeMergeResplitAddReplicaRace(t *testing.T) {
defer leaktest.AfterTest(t)()
-
ctx := context.Background()
- storeCfg := storage.TestStoreConfig(nil)
- storeCfg.TestingKnobs.DisableSplitQueue = true
- storeCfg.TestingKnobs.DisableMergeQueue = true
- storeCfg.TestingKnobs.DisableReplicateQueue = true
-
- mtc := &multiTestContext{storeConfig: &storeCfg}
- mtc.Start(t, 2)
- defer mtc.Stop()
- store0, store1 := mtc.Store(0), mtc.Store(1)
-
- lhsDesc, rhsDesc, err := createSplitRanges(ctx, store0)
- if err != nil {
- t.Fatal(err)
- }
-
- mtc.transport.Listen(store1.StoreID(), RaftMessageHandlerInterceptor{
- RaftMessageHandler: store1,
- handleSnapshotFilter: func(header *storage.SnapshotRequest_Header) {
- mergeArgs := adminMergeArgs(lhsDesc.StartKey.AsRawKey())
- if _, pErr := client.SendWrapped(ctx, store0.TestSender(), mergeArgs); pErr != nil {
- // The filter is invoked by a different goroutine, so can't use t.Fatal.
- t.Error(pErr)
- return
- }
- splitArgs := adminSplitArgs(rhsDesc.StartKey.AsRawKey())
- if _, pErr := client.SendWrapped(ctx, store0.TestSender(), splitArgs); pErr != nil {
- // The filter is invoked by a different goroutine, so can't use t.Fatal.
- t.Error(pErr)
- return
- }
- },
+ tc := testcluster.StartTestCluster(t, 2, base.TestClusterArgs{
+ ReplicationMode: base.ReplicationManual,
})
-
- err = mtc.replicateRangeNonFatal(lhsDesc.RangeID, 1)
- if exp := "change replicas of r.* failed: descriptor changed"; !testutils.IsError(err, exp) {
- t.Fatalf("expected error %q, got %v", exp, err)
+ defer tc.Stopper().Stop(ctx)
+
+ scratchStartKey := tc.ScratchRange(t)
+ splitKey := scratchStartKey.Next()
+ origDesc, _ := tc.SplitRangeOrFatal(t, splitKey)
+ require.NoError(t, tc.Server(0).DB().AdminMerge(ctx, scratchStartKey))
+ resplitDesc, _ := tc.SplitRangeOrFatal(t, splitKey)
+
+ assert.Equal(t, origDesc.RangeID, resplitDesc.RangeID)
+ assert.Equal(t, origDesc.StartKey, resplitDesc.StartKey)
+ assert.Equal(t, origDesc.EndKey, resplitDesc.EndKey)
+ assert.Equal(t, origDesc.Replicas().All(), resplitDesc.Replicas().All())
+ assert.NotEqual(t, origDesc.Generation, resplitDesc.Generation)
+
+ targets := []roachpb.ReplicationTarget{tc.Target(1)}
+ _, err := tc.Server(0).DB().AdminChangeReplicas(
+ ctx, scratchStartKey, roachpb.ADD_REPLICA, targets, origDesc)
+ if !testutils.IsError(err, `descriptor changed`) {
+ t.Fatalf(`expected "descriptor changed" error got: %+v`, err)
}
}
@@ -2324,116 +2265,6 @@ func TestStoreRangeMergeDeadFollowerDuringTxn(t *testing.T) {
}
}
-func TestStoreRangeMergeReadoptedBothFollowers(t *testing.T) {
- defer leaktest.AfterTest(t)()
-
- ctx := context.Background()
- storeCfg := storage.TestStoreConfig(nil)
- storeCfg.TestingKnobs.DisableReplicateQueue = true
- storeCfg.TestingKnobs.DisableReplicaGCQueue = true
- storeCfg.TestingKnobs.DisableMergeQueue = true
- mtc := &multiTestContext{storeConfig: &storeCfg}
- mtc.Start(t, 3)
- defer mtc.Stop()
- store0, store2 := mtc.Store(0), mtc.Store(2)
- distSender := mtc.distSenders[0]
-
- // Create two ranges on all nodes.
- rngID := store0.LookupReplica(roachpb.RKey("a")).Desc().RangeID
- mtc.replicateRange(rngID, 1, 2)
- lhsDesc, rhsDesc, err := createSplitRanges(ctx, store0)
- if err != nil {
- t.Fatal(err)
- }
-
- // Wait for all stores to have fully processed the split.
- for _, key := range []roachpb.Key{roachpb.Key("a"), roachpb.Key("b")} {
- if _, pErr := client.SendWrapped(ctx, distSender, incrementArgs(key, 1)); pErr != nil {
- t.Fatal(pErr)
- }
- mtc.waitForValues(key, []int64{1, 1, 1})
- }
-
- lhsRepl0, err := store0.GetReplica(lhsDesc.RangeID)
- if err != nil {
- t.Fatal(err)
- }
- lhsRepl2, err := store2.GetReplica(lhsDesc.RangeID)
- if err != nil {
- t.Fatal(err)
- }
- rhsRepl2, err := store2.GetReplica(rhsDesc.RangeID)
- if err != nil {
- t.Fatal(err)
- }
-
- // Abandon the two ranges on store2, but do not GC them.
- mtc.unreplicateRange(lhsDesc.RangeID, 2)
- mtc.unreplicateRange(rhsDesc.RangeID, 2)
-
- // Merge the two ranges together.
- args := adminMergeArgs(lhsDesc.StartKey.AsRawKey())
- _, pErr := client.SendWrapped(ctx, store0.TestSender(), args)
- if pErr != nil {
- t.Fatal(pErr)
- }
-
- addLHSRepl2 := func() error {
- for r := retry.StartWithCtx(ctx, retry.Options{}); r.Next(); {
- _, err := lhsRepl0.ChangeReplicas(ctx, roachpb.ADD_REPLICA, roachpb.ReplicationTarget{
- NodeID: store2.Ident.NodeID,
- StoreID: store2.Ident.StoreID,
- }, lhsRepl0.Desc(), storagepb.ReasonUnknown, t.Name())
- if !testutils.IsError(err, "store busy applying snapshots") {
- return err
- }
- }
- t.Fatal("unreachable")
- return nil
- }
-
- // Attempt to re-add the merged range to store2. The operation should fail
- // because store2's LHS and RHS replicas intersect the merged range.
- err = addLHSRepl2()
- if exp := "cannot apply snapshot: snapshot intersects existing range"; !testutils.IsError(err, exp) {
- t.Fatalf("expected %q error, but got %v", exp, err)
- }
-
- // GC the replica of the LHS on store2.
- if err := store2.ManualReplicaGC(lhsRepl2); err != nil {
- t.Fatal(err)
- }
- if _, err := store2.GetReplica(lhsDesc.RangeID); err == nil {
- t.Fatal("lhs replica not destroyed on store2")
- }
-
- // Attempt to re-add the merged range to store2. The operation should fail
- // again because store2's RHS still intersects the merged range.
- err = addLHSRepl2()
- if exp := "cannot apply snapshot: snapshot intersects existing range"; !testutils.IsError(err, exp) {
- t.Fatalf("expected %q error, but got %v", exp, err)
- }
-
- // GC the replica of the RHS on store2.
- if err := store2.ManualReplicaGC(rhsRepl2); err != nil {
- t.Fatal(err)
- }
- if _, err := store2.GetReplica(rhsDesc.RangeID); err == nil {
- t.Fatal("rhs replica not destroyed on store2")
- }
-
- // Attempt to re-add the merged range to store2 one last time. This time the
- // operation should succeed because there are no remaining intersecting
- // replicas.
- if err := addLHSRepl2(); err != nil {
- t.Fatal(err)
- }
-
- // Give store2 the lease to force all commands to be applied, including the
- // ChangeReplicas.
- mtc.transferLease(ctx, lhsDesc.RangeID, 0, 2)
-}
-
func TestStoreRangeReadoptedLHSFollower(t *testing.T) {
defer leaktest.AfterTest(t)()
@@ -2493,7 +2324,7 @@ func TestStoreRangeReadoptedLHSFollower(t *testing.T) {
StoreID: mtc.idents[2].StoreID,
}},
*lhsDesc,
- ); !testutils.IsError(err, "cannot apply snapshot: snapshot intersects existing range") {
+ ); !testutils.IsError(err, "descriptor changed") {
t.Fatal(err)
}
@@ -3277,7 +3108,10 @@ func TestMergeQueue(t *testing.T) {
clock := hlc.NewClock(manualClock.UnixNano, time.Nanosecond)
storeCfg := storage.TestStoreConfig(nil)
storeCfg.TestingKnobs.DisableSplitQueue = true
+ storeCfg.TestingKnobs.DisableReplicateQueue = true
storeCfg.TestingKnobs.DisableScanner = true
+ rangeMinBytes := int64(1 << 10) // 1KB
+ storeCfg.DefaultZoneConfig.RangeMinBytes = &rangeMinBytes
sv := &storeCfg.Settings.SV
storagebase.MergeQueueEnabled.Override(sv, true)
storage.MergeQueueInterval.Override(sv, 0) // process greedily
diff --git a/pkg/storage/client_raft_test.go b/pkg/storage/client_raft_test.go
index e8a7b42816d8..115664942d46 100644
--- a/pkg/storage/client_raft_test.go
+++ b/pkg/storage/client_raft_test.go
@@ -1157,6 +1157,7 @@ func TestReplicateAfterRemoveAndSplit(t *testing.T) {
sc := storage.TestStoreConfig(nil)
sc.TestingKnobs.DisableMergeQueue = true
+ sc.TestingKnobs.DisableReplicateQueue = true
// Disable the replica GC queue so that it doesn't accidentally pick up the
// removed replica and GC it. We'll explicitly enable it later in the test.
sc.TestingKnobs.DisableReplicaGCQueue = true
@@ -1472,7 +1473,7 @@ func TestLogGrowthWhenRefreshingPendingCommands(t *testing.T) {
if err != nil {
t.Fatal(err)
}
- if lease, _ := repl.GetLease(); lease.Replica != repDesc {
+ if lease, _ := repl.GetLease(); !lease.Replica.Equal(repDesc) {
return errors.Errorf("lease not transferred yet; found %v", lease)
}
return nil
@@ -1539,8 +1540,8 @@ func TestLogGrowthWhenRefreshingPendingCommands(t *testing.T) {
}
// TestStoreRangeUpReplicate verifies that the replication queue will notice
-// under-replicated ranges and replicate them. Also tests that preemptive
-// snapshots which contain sideloaded proposals don't panic the receiving end.
+// under-replicated ranges and replicate them. Also tests that snapshots which
+// contain sideloaded proposals don't panic the receiving end.
func TestStoreRangeUpReplicate(t *testing.T) {
defer leaktest.AfterTest(t)()
defer storage.SetMockAddSSTable()()
@@ -1580,9 +1581,8 @@ func TestStoreRangeUpReplicate(t *testing.T) {
return errors.Errorf("expected 0 reservations, but found %d", n)
}
if len(r.Desc().InternalReplicas) != 3 {
- // This fails even after the preemptive snapshot has arrived and
- // only goes through once the replica has properly caught up to
- // the fully replicated descriptor.
+ // This fails even after the snapshot has arrived and only goes through
+ // once the replica has applied the conf change.
return errors.Errorf("not fully initialized")
}
}
@@ -1590,23 +1590,33 @@ func TestStoreRangeUpReplicate(t *testing.T) {
})
var generated int64
- var normalApplied int64
- var preemptiveApplied int64
+ var learnerApplied, raftApplied int64
for _, s := range mtc.stores {
m := s.Metrics()
generated += m.RangeSnapshotsGenerated.Count()
- normalApplied += m.RangeSnapshotsNormalApplied.Count()
- preemptiveApplied += m.RangeSnapshotsPreemptiveApplied.Count()
+ learnerApplied += m.RangeSnapshotsLearnerApplied.Count()
+ raftApplied += m.RangeSnapshotsNormalApplied.Count()
}
if generated == 0 {
t.Fatalf("expected at least 1 snapshot, but found 0")
}
-
- if normalApplied != 0 {
- t.Fatalf("expected 0 normal snapshots, but found %d", normalApplied)
- }
- if generated != preemptiveApplied {
- t.Fatalf("expected %d preemptive snapshots, but found %d", generated, preemptiveApplied)
+ var replicaCount int64
+ mtc.stores[0].VisitReplicas(func(_ *storage.Replica) bool {
+ replicaCount++
+ return true
+ })
+ // It's hard to make generalizations about exactly how many snapshots happen
+ // of each type. Almost all of them are learner snaps, but there is a race
+ // where the raft snapshot queue sometimes starts the snapshot first. Further,
+ // if the raft snapshot is at a higher index, we may even reject the learner
+ // snap. We definitely get at least one snapshot per replica and the race is
+ // rare enough that the majority of them should be learner snaps.
+ if expected := 2 * replicaCount; expected > learnerApplied+raftApplied {
+ t.Fatalf("expected at least %d snapshots, but found %d learner snaps and %d raft snaps",
+ expected, learnerApplied, raftApplied)
+ }
+ if raftApplied > learnerApplied {
+ t.Fatalf("expected more learner snaps %d than raft snaps %d", learnerApplied, raftApplied)
}
}
@@ -1655,11 +1665,6 @@ func TestUnreplicateFirstRange(t *testing.T) {
// TestChangeReplicasDescriptorInvariant tests that a replica change aborts if
// another change has been made to the RangeDescriptor since it was initiated.
-//
-// TODO(tschottdorf): If this test is flaky because the snapshot count does not
-// increase, it's likely because with proposer-evaluated KV, less gets proposed
-// and so sometimes Raft discards the preemptive snapshot (though we count that
-// case in stats already) or doesn't produce a Ready.
func TestChangeReplicasDescriptorInvariant(t *testing.T) {
defer leaktest.AfterTest(t)()
mtc := &multiTestContext{
@@ -1706,7 +1711,7 @@ func TestChangeReplicasDescriptorInvariant(t *testing.T) {
return nil
})
- before := mtc.stores[2].Metrics().RangeSnapshotsPreemptiveApplied.Count()
+ before := mtc.stores[2].Metrics().RangeSnapshotsLearnerApplied.Count()
// Attempt to add replica to the third store with the original descriptor.
// This should fail because the descriptor is stale.
expectedErr := `change replicas of r1 failed: descriptor changed: \[expected\]`
@@ -1714,29 +1719,26 @@ func TestChangeReplicasDescriptorInvariant(t *testing.T) {
t.Fatalf("got unexpected error: %+v", err)
}
- testutils.SucceedsSoon(t, func() error {
- after := mtc.stores[2].Metrics().RangeSnapshotsPreemptiveApplied.Count()
- // The failed ChangeReplicas call should have applied a preemptive snapshot.
- if after != before+1 {
- return errors.Errorf(
- "ChangeReplicas call should have applied a preemptive snapshot, before %d after %d",
- before, after)
- }
- return nil
- })
+ after := mtc.stores[2].Metrics().RangeSnapshotsLearnerApplied.Count()
+ // The failed ChangeReplicas call should NOT have applied a learner snapshot.
+ if after != before {
+ t.Fatalf(
+ "ChangeReplicas call should not have applied a learner snapshot, before %d after %d",
+ before, after)
+ }
- before = mtc.stores[2].Metrics().RangeSnapshotsPreemptiveApplied.Count()
+ before = mtc.stores[2].Metrics().RangeSnapshotsLearnerApplied.Count()
// Add to third store with fresh descriptor.
if err := addReplica(2, repl.Desc()); err != nil {
t.Fatal(err)
}
testutils.SucceedsSoon(t, func() error {
- after := mtc.stores[2].Metrics().RangeSnapshotsPreemptiveApplied.Count()
- // The failed ChangeReplicas call should have applied a preemptive snapshot.
+ after := mtc.stores[2].Metrics().RangeSnapshotsLearnerApplied.Count()
+ // The failed ChangeReplicas call should have applied a learner snapshot.
if after != before+1 {
return errors.Errorf(
- "ChangeReplicas call should have applied a preemptive snapshot, before %d after %d",
+ "ChangeReplicas call should have applied a learner snapshot, before %d after %d",
before, after)
}
r := mtc.stores[2].LookupReplica(roachpb.RKey("a"))
@@ -2439,7 +2441,7 @@ outer:
if err != nil {
t.Fatal(err)
}
- if lease, _ := repl.GetLease(); lease.Replica == repDesc {
+ if lease, _ := repl.GetLease(); lease.Replica.Equal(repDesc) {
mtc.transferLease(context.TODO(), rangeID, leaderIdx, replicaIdx)
}
mtc.unreplicateRange(rangeID, leaderIdx)
@@ -3765,42 +3767,6 @@ func TestTransferRaftLeadership(t *testing.T) {
})
}
-// TestFailedPreemptiveSnapshot verifies that ChangeReplicas is
-// aborted if we are unable to send a preemptive snapshot.
-func TestFailedPreemptiveSnapshot(t *testing.T) {
- defer leaktest.AfterTest(t)()
-
- mtc := &multiTestContext{}
- defer mtc.Stop()
- mtc.Start(t, 2)
-
- // Replicate a range onto the two stores. This replication is
- // important because if there was only one node to begin with, the
- // ChangeReplicas would fail because it was unable to achieve quorum
- // even if the preemptive snapshot failure were ignored.
- mtc.replicateRange(1, 1)
-
- // Now try to add a third. It should fail because we cannot send a
- // preemptive snapshot to it.
- rep, err := mtc.stores[0].GetReplica(1)
- if err != nil {
- t.Fatal(err)
- }
- const expErr = "snapshot failed: failed to resolve n3: unknown peer 3"
- if _, err := rep.ChangeReplicas(
- context.Background(),
- roachpb.ADD_REPLICA,
- roachpb.ReplicationTarget{NodeID: 3, StoreID: 3},
- rep.Desc(),
- storagepb.ReasonRangeUnderReplicated,
- "",
- ); !testutils.IsError(err, expErr) {
- t.Fatalf("expected %s; got %v", expErr, err)
- } else if !storage.IsSnapshotError(err) {
- t.Fatalf("expected preemptive snapshot failed error; got %T: %+v", err, err)
- }
-}
-
// Test that a single blocked replica does not block other replicas.
func TestRaftBlockedReplica(t *testing.T) {
defer leaktest.AfterTest(t)()
diff --git a/pkg/storage/client_replica_test.go b/pkg/storage/client_replica_test.go
index 06b13e60a080..5a90ab95d1c5 100644
--- a/pkg/storage/client_replica_test.go
+++ b/pkg/storage/client_replica_test.go
@@ -739,7 +739,7 @@ func TestRangeTransferLeaseExpirationBased(t *testing.T) {
if !ok {
t.Fatalf("expected %T, got %s", &roachpb.NotLeaseHolderError{}, pErr)
}
- if *(nlhe.LeaseHolder) != l.replica1Desc {
+ if !nlhe.LeaseHolder.Equal(&l.replica1Desc) {
t.Fatalf("expected lease holder %+v, got %+v",
l.replica1Desc, nlhe.LeaseHolder)
}
@@ -829,7 +829,7 @@ func TestRangeTransferLeaseExpirationBased(t *testing.T) {
if !ok {
t.Fatalf("expected %T, got %s", &roachpb.NotLeaseHolderError{}, pErr)
}
- if nlhe.LeaseHolder == nil || *nlhe.LeaseHolder != l.replica1Desc {
+ if nlhe.LeaseHolder == nil || !nlhe.LeaseHolder.Equal(&l.replica1Desc) {
t.Fatalf("expected lease holder %+v, got %+v",
l.replica1Desc, nlhe.LeaseHolder)
}
@@ -1670,7 +1670,7 @@ func TestChangeReplicasGeneration(t *testing.T) {
); err != nil {
t.Fatalf("unexpected error: %v", err)
}
- assert.EqualValues(t, repl.Desc().GetGeneration(), oldGeneration+1)
+ assert.EqualValues(t, repl.Desc().GetGeneration(), oldGeneration+2)
oldGeneration = repl.Desc().GetGeneration()
if _, err := repl.ChangeReplicas(
@@ -2053,9 +2053,8 @@ func TestConcurrentAdminChangeReplicasRequests(t *testing.T) {
assert.Falsef(t, err1 == nil && err2 == nil,
"expected one of racing AdminChangeReplicasRequests to fail but neither did")
- // It is possible that an error can occur due to a rejected preemptive
- // snapshot from the target range. We don't want to fail the test if we got
- // one of those.
+ // It is possible that an error can occur due to a rejected snapshot from the
+ // target range. We don't want to fail the test if we got one of those.
isSnapshotErr := func(err error) bool {
return testutils.IsError(err, "snapshot failed:")
}
diff --git a/pkg/storage/client_split_test.go b/pkg/storage/client_split_test.go
index 363712b52ad0..b5dabcbcf48f 100644
--- a/pkg/storage/client_split_test.go
+++ b/pkg/storage/client_split_test.go
@@ -57,6 +57,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/gogo/protobuf/proto"
"github.com/pkg/errors"
+ "github.com/stretchr/testify/require"
"go.etcd.io/etcd/raft/raftpb"
)
@@ -479,28 +480,28 @@ func TestSplitTriggerRaftSnapshotRace(t *testing.T) {
perm := rand.Perm(numSplits)
idx := int32(-1) // accessed atomically
- checkNoSnaps := func(when string) {
+ numRaftSnaps := func(when string) int {
+ var totalSnaps int
for i := 0; i < numNodes; i++ {
var n int // num rows (sanity check against test rotting)
var c int // num Raft snapshots
if err := tc.ServerConn(i).QueryRow(`
SELECT count(*), sum(value) FROM crdb_internal.node_metrics WHERE
- name LIKE 'queue.raftsnapshot.process.%'
-OR
- name LIKE 'queue.raftsnapshot.pending'
+ name = 'range.snapshots.normal-applied'
`).Scan(&n, &c); err != nil {
t.Fatal(err)
}
- if expRows := 3; n != expRows {
+ if expRows := 1; n != expRows {
t.Fatalf("%s: expected %d rows, got %d", when, expRows, n)
}
- if c > 0 {
- t.Fatalf("observed %d Raft snapshots %s splits", c, when)
- }
+ totalSnaps += c
}
+ return totalSnaps
}
- checkNoSnaps("before")
+ // There are usually no raft snaps before, but there is a race condition where
+ // they can occasionally happen during upreplication.
+ numSnapsBefore := numRaftSnaps("before")
doSplit := func(ctx context.Context) error {
_, _, err := tc.SplitRange(
@@ -512,7 +513,8 @@ OR
t.Fatal(err)
}
- checkNoSnaps("after")
+ // Check that no snaps happened during the splits.
+ require.Equal(t, numSnapsBefore, numRaftSnaps("after"))
}
// TestStoreRangeSplitIdempotency executes a split of a range and
diff --git a/pkg/storage/client_test.go b/pkg/storage/client_test.go
index ca6508aa1ddb..dac30b5a9419 100644
--- a/pkg/storage/client_test.go
+++ b/pkg/storage/client_test.go
@@ -1201,6 +1201,9 @@ func (m *multiTestContext) replicateRangeNonFatal(rangeID roachpb.RangeID, dests
if e := expectedReplicaIDs[i]; repDesc.ReplicaID != e {
return errors.Errorf("expected replica %s to have ID %d", repl, e)
}
+ if t := repDesc.GetType(); t != roachpb.ReplicaType_VOTER {
+ return errors.Errorf("expected replica %s to be a voter, but was %s", repl, t)
+ }
if !repl.Desc().ContainsKey(startKey) {
return errors.Errorf("expected replica %s to contain %s", repl, startKey)
}
diff --git a/pkg/storage/raft_log_queue.go b/pkg/storage/raft_log_queue.go
index 7b210d1e8a32..e7a479302693 100644
--- a/pkg/storage/raft_log_queue.go
+++ b/pkg/storage/raft_log_queue.go
@@ -243,12 +243,13 @@ func updateRaftProgressFromActivity(
}
pr.RecentActive = lastUpdate.isFollowerActive(ctx, replicaID, now)
// Override this field for safety since we don't use it. Instead, we use
- // pendingSnapshotIndex from above which is also populated for
- // preemptive snapshots.
+ // pendingSnapshotIndex from above which is also populated for preemptive
+ // snapshots.
//
- // TODO(tschottdorf): if we used Raft learners instead of preemptive
- // snapshots, I think this value would do exactly the right tracking
- // (including only resetting when the follower resumes replicating).
+ // NOTE: We don't rely on PendingSnapshot because PendingSnapshot is
+ // initialized by the leader when it realizes the follower needs a snapshot,
+ // and it isn't initialized with the index of the snapshot that is actually
+ // sent by us (out of band), which likely is lower.
pr.PendingSnapshot = 0
prs[uint64(replicaID)] = pr
}
diff --git a/pkg/storage/raft_snapshot_queue.go b/pkg/storage/raft_snapshot_queue.go
index b830c1bc378c..f4dc213962a2 100644
--- a/pkg/storage/raft_snapshot_queue.go
+++ b/pkg/storage/raft_snapshot_queue.go
@@ -19,6 +19,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
+ "github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/pkg/errors"
"go.etcd.io/etcd/raft/tracker"
)
@@ -65,15 +66,8 @@ func (rq *raftSnapshotQueue) shouldQueue(
// If a follower needs a snapshot, enqueue at the highest priority.
if status := repl.RaftStatus(); status != nil {
// raft.Status.Progress is only populated on the Raft group leader.
- for id, p := range status.Progress {
+ for _, p := range status.Progress {
if p.State == tracker.StateSnapshot {
- // We refuse to send a snapshot of type RAFT to a learner for reasons
- // described in processRaftSnapshot, so don't bother queueing.
- for _, r := range repl.Desc().Replicas().Learners() {
- if r.ReplicaID == roachpb.ReplicaID(id) {
- continue
- }
- }
if log.V(2) {
log.Infof(ctx, "raft snapshot needed, enqueuing")
}
@@ -117,14 +111,28 @@ func (rq *raftSnapshotQueue) processRaftSnapshot(
// that's adding it or it's been orphaned and it's about to be cleaned up by
// the replicate queue. Either way, no point in also sending it a snapshot of
// type RAFT.
- //
- // TODO(dan): Reconsider this. If the learner coordinator fails before sending
- // it a snap, then until the replication queue collects it, any proposals sent
- // to it will get stuck indefinitely. At the moment, nothing should be sending
- // it such a proposal, but this is brittle and could change easily.
if repDesc.GetType() == roachpb.ReplicaType_LEARNER {
- log.Eventf(ctx, "not sending snapshot type RAFT to learner: %s", repDesc)
- return nil
+ if index := repl.getAndGCSnapshotLogTruncationConstraints(timeutil.Now()); index > 0 {
+ // There is a snapshot being transferred. It's probably a LEARNER snap, so
+ // bail for now and try again later.
+ err := errors.Errorf(
+ "skipping snapshot; replica is likely a learner in the process of being added: %s", repDesc)
+ log.Info(ctx, err)
+ // TODO(dan): This is super brittle and non-obvious. In the common case,
+ // this check avoids duplicate work, but in rare cases, we send the
+ // learner snap at an index before the one raft wanted here. The raft
+ // leader should be able to use logs to get the rest of the way, but it
+ // doesn't try. In this case, skipping the raft snapshot would mean that
+ // we have to wait for the next scanner cycle of the raft snapshot queue
+ // to pick it up again. So, punt the responsibility back to raft by
+ // telling it that the snapshot failed. If the learner snap ends up being
+ // sufficient, this message will be ignored, but if we hit the case
+ // described above, this will cause raft to keep asking for a snap and at
+ // some point the snapshot lock above will be released and we'll fall
+ // through to the below.
+ repl.reportSnapshotStatus(ctx, repDesc.ReplicaID, err)
+ return nil
+ }
}
err := repl.sendSnapshot(ctx, repDesc, SnapshotRequest_RAFT, SnapshotRequest_RECOVERY)
@@ -148,10 +156,6 @@ func (rq *raftSnapshotQueue) processRaftSnapshot(
// We're currently not handling this and instead rely on the quota pool to
// make sure that log truncations won't require snapshots for healthy
// followers.
-
- // Report the snapshot status to Raft, which expects us to do this once
- // we finish sending the snapshot.
- repl.reportSnapshotStatus(ctx, id, err)
return err
}
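Pulling the new raft snapshot queue behavior for learners into one place: while another snapshot to the replica is in flight (most likely the LEARNER snapshot from the replica-add path), the queue skips the RAFT snapshot but reports the skip to raft as a failed send, so the replica is re-enqueued if the in-flight snapshot turns out to be insufficient. A minimal, self-contained sketch of that decision; the `snapshotQueueState` type and the callback parameters are hypothetical stand-ins rather than the real queue plumbing.

```go
package main

import (
	"errors"
	"fmt"
)

// snapshotQueueState is a toy stand-in for what the raft snapshot queue
// consults when it processes a replica; it is not the real implementation.
type snapshotQueueState struct {
	isLearner        bool
	inFlightSnapshot bool // e.g. a LEARNER snapshot sent by the replica-add path
}

// maybeSendRaftSnapshot mirrors the control flow described above: if the
// target is a learner that already has a snapshot in flight, skip the RAFT
// snapshot but report the skip to raft as an error so raft re-enqueues the
// replica in case the in-flight snapshot is not sufficient.
func maybeSendRaftSnapshot(
	s snapshotQueueState,
	sendSnapshot func() error,
	reportSnapshotStatus func(error),
) error {
	if s.isLearner && s.inFlightSnapshot {
		err := errors.New("skipping snapshot; replica is likely a learner in the process of being added")
		reportSnapshotStatus(err) // punt back to raft; harmless if the learner snap suffices
		return nil
	}
	return sendSnapshot()
}

func main() {
	report := func(err error) { fmt.Println("reported to raft:", err) }

	// Learner with a snapshot already in flight: skip, but tell raft it failed.
	_ = maybeSendRaftSnapshot(
		snapshotQueueState{isLearner: true, inFlightSnapshot: true},
		func() error { return nil },
		report,
	)

	// Ordinary follower: send the RAFT snapshot as usual.
	_ = maybeSendRaftSnapshot(
		snapshotQueueState{},
		func() error { fmt.Println("sent RAFT snapshot"); return nil },
		report,
	)
}
```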
diff --git a/pkg/storage/replica_command.go b/pkg/storage/replica_command.go
index f1668813a64c..ec16625466bf 100644
--- a/pkg/storage/replica_command.go
+++ b/pkg/storage/replica_command.go
@@ -50,7 +50,7 @@ import (
var useLearnerReplicas = settings.RegisterBoolSetting(
"kv.learner_replicas.enabled",
"use learner replicas for replica addition",
- false)
+ true)
// AdminSplit divides the range into into two ranges using args.SplitKey.
func (r *Replica) AdminSplit(
@@ -798,8 +798,8 @@ func IsSnapshotError(err error) bool {
}
// ChangeReplicas adds or removes a replica of a range. The change is performed
-// in a distributed transaction and takes effect when that transaction is committed.
-// When removing a replica, only the NodeID and StoreID fields of the Replica are used.
+// in a distributed transaction and takes effect when that transaction is
+// committed.
//
// The supplied RangeDescriptor is used as a form of optimistic lock. See the
// comment of "adminSplitWithDescriptor" for more information on this pattern.
@@ -809,31 +809,35 @@ func IsSnapshotError(err error) bool {
// actors.
//
// Changing the replicas for a range is complicated. A change is initiated by
-// the "replicate" queue when it encounters a range which has too many
-// replicas, too few replicas or requires rebalancing. Addition and removal of
-// a replica is divided into four phases. The first phase, which occurs in
-// Replica.ChangeReplicas, is performed via a distributed transaction which
-// updates the range descriptor and the meta range addressing information. This
-// transaction includes a special ChangeReplicasTrigger on the EndTransaction
-// request. A ConditionalPut of the RangeDescriptor implements the optimistic
-// lock on the RangeDescriptor mentioned previously. Like all transactions, the
-// requests within the transaction are replicated via Raft, including the
-// EndTransaction request.
+// the "replicate" queue when it encounters a range which has too many replicas,
+// too few replicas or requires rebalancing. Removal of a replica is divided
+// into four phases, described below. Addition of a replica is divided into two
+// sets of the same four phases: first to add it as a raft learner and then to
+// promote it to a raft voter after sending it a snapshot. For more information
+// on learner replicas, see `(ReplicaDescriptors).Learners`.
+//
+// The first phase, which occurs in Replica.ChangeReplicas, is performed via a
+// distributed transaction which updates the range descriptor and the meta range
+// addressing information. This transaction includes a special
+// ChangeReplicasTrigger on the EndTransaction request. A ConditionalPut of the
+// RangeDescriptor implements the optimistic lock on the RangeDescriptor
+// mentioned previously. Like all transactions, the requests within the
+// transaction are replicated via Raft, including the EndTransaction request.
//
// The second phase of processing occurs when the batch containing the
-// EndTransaction is proposed to raft. This proposing occurs on whatever
-// replica received the batch, usually, but not always the range lease
-// holder. defaultProposeRaftCommandLocked notices that the EndTransaction
-// contains a ChangeReplicasTrigger and proposes a ConfChange to Raft (via
+// EndTransaction is proposed to raft. This proposing occurs on whatever replica
+// received the batch, usually, but not always the range lease holder.
+// defaultProposeRaftCommandLocked notices that the EndTransaction contains a
+// ChangeReplicasTrigger and proposes a ConfChange to Raft (via
// raft.RawNode.ProposeConfChange).
//
// The ConfChange is propagated to all of the replicas similar to a normal Raft
// command, though additional processing is done inside of Raft. A Replica
// encounters the ConfChange in Replica.handleRaftReady and executes it using
-// raft.RawNode.ApplyConfChange. If a new replica was added the Raft leader
-// will start sending it heartbeat messages and attempting to bring it up to
-// date. If a replica was removed, it is at this point that the Raft leader
-// will stop communicating with it.
+// raft.RawNode.ApplyConfChange. If a new replica was added the Raft leader will
+// start sending it heartbeat messages and attempting to bring it up to date. If
+// a replica was removed, it is at this point that the Raft leader will stop
+// communicating with it.
//
// The fourth phase of change replicas occurs when each replica for the range
// encounters the ChangeReplicasTrigger when applying the EndTransaction
@@ -844,37 +848,25 @@ func IsSnapshotError(err error) bool {
// Note that a removed replica may not see the EndTransaction containing the
// ChangeReplicasTrigger. The ConfChange operation will be applied as soon as a
// quorum of nodes have committed it. If the removed replica is down or the
-// message is dropped for some reason the removed replica will not be
-// notified. The replica GC queue will eventually discover and cleanup this
-// state.
+// message is dropped for some reason the removed replica will not be notified.
+// The replica GC queue will eventually discover and cleanup this state.
//
// When a new replica is added, it will have to catch up to the state of the
-// other replicas. The Raft leader automatically handles this by either sending
-// the new replica Raft log entries to apply, or by generating and sending a
-// snapshot. See Replica.Snapshot and Replica.Entries.
-//
-// Note that Replica.ChangeReplicas returns when the distributed transaction
-// has been committed to a quorum of replicas in the range. The actual
-// replication of data occurs asynchronously via a snapshot or application of
-// Raft log entries. This is important for the replicate queue to be aware
-// of. A node can process hundreds or thousands of ChangeReplicas operations
-// per second even though the actual replication of data proceeds at a much
-// slower base. In order to avoid having this background replication and
-// overwhelming the system, replication is throttled via a reservation system.
-// When allocating a new replica for a range, the replicate queue reserves space
-// for that replica on the target store via a ReservationRequest. (See
-// StorePool.reserve). The reservation is fulfilled when the snapshot is
-// applied.
+// other replicas. In the common case, a "learner snapshot" is sent after the
+// replica is added as a learner, but before it is promoted to a voter. If this
+// fails, the raft leader will request a snapshot. See Replica.sendSnapshot.
//
-// TODO(peter): There is a rare scenario in which a replica can be brought up
-// to date via Raft log replay. In this scenario, the reservation will be left
-// dangling until it expires. See #7849.
-//
-// TODO(peter): Describe preemptive snapshots. Preemptive snapshots are needed
-// for the replicate queue to function properly. Currently the replicate queue
-// will fire off as many replica additions as possible until it starts getting
-// reservations denied at which point it will ignore the replica until the next
-// scanner cycle.
+// Note that Replica.ChangeReplicas returns when the distributed transaction has
+// been committed to a quorum of replicas in the range. The actual replication
+// of data occurs asynchronously via a snapshot or application of Raft log
+// entries. This is important for the replicate queue to be aware of. A node can
+// process hundreds or thousands of ChangeReplicas operations per second even
+// though the actual replication of data proceeds at a much slower pace. In
+// order to avoid having this background replication overwhelm the system,
+// replication is throttled via a reservation system. When allocating a
+// new replica for a range, the replicate queue reserves space for that replica
+// on the target store via a ReservationRequest. (See StorePool.reserve). The
+// reservation is fulfilled when the snapshot is applied.
func (r *Replica) ChangeReplicas(
ctx context.Context,
changeType roachpb.ReplicaChangeType,
@@ -951,6 +943,7 @@ func (r *Replica) addReplica(
if err != nil {
// Don't leave a learner replica lying around if we didn't succeed in
// promoting it to a voter.
+ log.Infof(ctx, "could not promote %s to voter, rolling back: %v", target, err)
r.rollbackLearnerReplica(ctx, learnerDesc, target, reason, details)
return nil, err
}
@@ -1004,8 +997,25 @@ func (r *Replica) promoteLearnerReplicaToVoter(
rDesc.Type = roachpb.ReplicaTypeVoter()
newReplicas[i] = rDesc
- // Note that raft snapshot queue refuses to send snapshots, so this is the
- // only one a learner can get.
+ // Note that raft snapshot queue will refuse to send a snapshot to a learner
+ // replica if its store is already sending a snapshot to that replica. That
+ // races with this snapshot. Most of the time, this side will win the race,
+ // which avoids needlessly sending the snapshot twice. If the raft snapshot
+ // queue wins, it's wasteful, but doesn't impact correctness.
+ //
+ // Replicas are added to the raft snapshot queue by the raft leader. This
+ // code can be run anywhere (though it's usually run on the leaseholder,
+ // which is usually co-located with the raft leader). This means that
+ // they're usually on the same node, but not always, so that's about as good
+ // a guarantee as we can offer, anyway.
+ //
+ // We originally tried always refusing to send snapshots from the raft
+ // snapshot queue to learner replicas, but this turned out to be brittle.
+ // First, if the snapshot failed, any attempt to use the learner's raft
+ // group would hang until the replicate queue got around to cleaning up the
+ // orphaned learner. Second, this tickled some bugs in etcd/raft around
+ // switching between StateSnapshot and StateProbe. Even if we worked through
+ // these, it would be susceptible to future similar issues.
if err := r.sendSnapshot(ctx, rDesc, SnapshotRequest_LEARNER, priority); err != nil {
return nil, err
}
@@ -1056,7 +1066,7 @@ func (r *Replica) rollbackLearnerReplica(
rollbackCtx, "learner rollback", rollbackTimeout, rollbackFn,
); err != nil {
log.Infof(ctx,
- "failed to rollback learner %s, abandoning it for the replicate queue %v", target, err)
+ "failed to rollback learner %s, abandoning it for the replicate queue: %v", target, err)
r.store.replicateQueue.MaybeAddAsync(ctx, r, r.store.Clock().Now())
} else {
log.Infof(ctx, "rolled back learner %s to %s", replDesc, &newDesc)
@@ -1353,7 +1363,15 @@ func (r *Replica) sendSnapshot(
recipient roachpb.ReplicaDescriptor,
snapType SnapshotRequest_Type,
priority SnapshotRequest_Priority,
-) error {
+) (retErr error) {
+ defer func() {
+ if snapType != SnapshotRequest_PREEMPTIVE {
+ // Report the snapshot status to Raft, which expects us to do this once we
+ // finish sending the snapshot.
+ r.reportSnapshotStatus(ctx, recipient.ReplicaID, retErr)
+ }
+ }()
+
snap, err := r.GetSnapshot(ctx, snapType)
if err != nil {
return errors.Wrapf(err, "%s: failed to generate %s snapshot", r, snapType)
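The "descriptor changed" errors asserted in the tests above come from the optimistic-lock pattern this comment describes: the caller passes in the descriptor its decision was based on, and the change is rejected if the current descriptor no longer matches, with the generation counter catching merge-then-resplit sequences that leave the key bounds unchanged. A self-contained sketch under those assumptions; `rangeDesc` and `changeReplicas` here are toy types, not the real RangeDescriptor or ConditionalPut machinery.

```go
package main

import (
	"bytes"
	"fmt"
)

// rangeDesc is a toy stand-in for a range descriptor: key bounds plus a
// generation counter that is bumped by every split and merge.
type rangeDesc struct {
	StartKey, EndKey []byte
	Generation       int64
}

// changeReplicas sketches the optimistic lock: the caller supplies the
// descriptor its decision was based on, and the change is rejected if the
// current descriptor differs, including a merge followed by a re-split that
// left the bounds identical but bumped the generation.
func changeReplicas(current, expected rangeDesc) error {
	if !bytes.Equal(current.StartKey, expected.StartKey) ||
		!bytes.Equal(current.EndKey, expected.EndKey) ||
		current.Generation != expected.Generation {
		return fmt.Errorf("descriptor changed: [expected] %+v != [actual] %+v", expected, current)
	}
	// ... commit the replica change transactionally ...
	return nil
}

func main() {
	orig := rangeDesc{StartKey: []byte("a"), EndKey: []byte("b"), Generation: 1}
	// A merge and a re-split at the same key leave the bounds unchanged but
	// bump the generation, so the stale descriptor is still detected.
	resplit := rangeDesc{StartKey: []byte("a"), EndKey: []byte("b"), Generation: 3}
	fmt.Println(changeReplicas(resplit, orig))
}
```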
diff --git a/pkg/storage/replica_learner_test.go b/pkg/storage/replica_learner_test.go
index 04c7ec5ab830..4c49dfacf45e 100644
--- a/pkg/storage/replica_learner_test.go
+++ b/pkg/storage/replica_learner_test.go
@@ -36,14 +36,8 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
- "go.etcd.io/etcd/raft/tracker"
)
-// TODO(dan): Test learners and quota pool.
-// TODO(dan): Grep the codebase for "preemptive" and audit.
-// TODO(dan): Write a test like TestLearnerAdminChangeReplicasRace for the
-// replicate queue leadership transfer race.
-
type learnerTestKnobs struct {
storeKnobs storage.StoreTestingKnobs
replicaAddStopAfterLearnerAtomic int64
@@ -387,6 +381,7 @@ func TestRaftSnapshotQueueSeesLearner(t *testing.T) {
ctx := context.Background()
blockSnapshotsCh := make(chan struct{})
knobs, ltk := makeLearnerTestKnobs()
+ ltk.storeKnobs.DisableRaftSnapshotQueue = true
ltk.storeKnobs.ReceiveSnapshot = func(h *storage.SnapshotRequest_Header) error {
select {
case <-blockSnapshotsCh:
@@ -411,27 +406,28 @@ func TestRaftSnapshotQueueSeesLearner(t *testing.T) {
return err
})
- // Wait until raft knows that the learner needs a snapshot.
- store, repl := getFirstStoreReplica(t, tc.Server(0), scratchStartKey)
- testutils.SucceedsSoon(t, func() error {
- for _, p := range repl.RaftStatus().Progress {
- if p.State == tracker.StateSnapshot {
- return nil
- }
- }
- return errors.New(`expected some replica to need a snapshot`)
- })
-
// Note the value of the metrics before.
generatedBefore := getFirstStoreMetric(t, tc.Server(0), `range.snapshots.generated`)
raftAppliedBefore := getFirstStoreMetric(t, tc.Server(0), `range.snapshots.normal-applied`)
- // Run the raftsnapshot queue.
- trace, errMsg, err := store.ManuallyEnqueue(ctx, "raftsnapshot", repl, true /* skipShouldQueue */)
- require.NoError(t, err)
- require.Equal(t, ``, errMsg)
- const msg = `not sending snapshot type RAFT to learner: (n2,s2):2LEARNER`
- require.Contains(t, tracing.FormatRecordedSpans(trace), msg)
+ // Run the raftsnapshot queue. SucceedsSoon because it may take a bit for
+ // raft to figure out that the replica needs a snapshot.
+ store, repl := getFirstStoreReplica(t, tc.Server(0), scratchStartKey)
+ testutils.SucceedsSoon(t, func() error {
+ trace, errMsg, err := store.ManuallyEnqueue(ctx, "raftsnapshot", repl, true /* skipShouldQueue */)
+ if err != nil {
+ return err
+ }
+ if errMsg != `` {
+ return errors.New(errMsg)
+ }
+ const msg = `skipping snapshot; replica is likely a learner in the process of being added: (n2,s2):2LEARNER`
+ formattedTrace := tracing.FormatRecordedSpans(trace)
+ if !strings.Contains(formattedTrace, msg) {
+ return errors.Errorf(`expected "%s" in trace got:\n%s`, msg, formattedTrace)
+ }
+ return nil
+ })
// Make sure it didn't send any RAFT snapshots.
require.Equal(t, generatedBefore, getFirstStoreMetric(t, tc.Server(0), `range.snapshots.generated`))
@@ -476,7 +472,8 @@ func TestLearnerAdminChangeReplicasRace(t *testing.T) {
<-blockUntilSnapshotCh
// Removes the learner out from under the coordinator running on behalf of
- // AddReplicas.
+ // AddReplicas. This simulates the replicate queue running concurrently. The
+ // first thing the replicate queue would do is remove any learners it sees.
_, err := tc.RemoveReplicas(scratchStartKey, tc.Target(1))
require.NoError(t, err)
desc := tc.LookupRangeOrFatal(t, scratchStartKey)
@@ -495,6 +492,87 @@ func TestLearnerAdminChangeReplicasRace(t *testing.T) {
require.Len(t, desc.Replicas().Learners(), 0)
}
+// This test verifies the result of a race between the replicate queue running
+// for the same range on two different nodes. This can happen around
+// leadership changes.
+func TestLearnerReplicateQueueRace(t *testing.T) {
+ defer leaktest.AfterTest(t)()
+
+ var skipReceiveSnapshotKnobAtomic int64 = 1
+ blockUntilSnapshotCh := make(chan struct{}, 2)
+ blockSnapshotsCh := make(chan struct{})
+ knobs, ltk := makeLearnerTestKnobs()
+ ltk.storeKnobs.ReceiveSnapshot = func(h *storage.SnapshotRequest_Header) error {
+ if atomic.LoadInt64(&skipReceiveSnapshotKnobAtomic) > 0 {
+ return nil
+ }
+ blockUntilSnapshotCh <- struct{}{}
+ <-blockSnapshotsCh
+ return nil
+ }
+ ctx := context.Background()
+ tc := testcluster.StartTestCluster(t, 3, base.TestClusterArgs{
+ ServerArgs: base.TestServerArgs{Knobs: knobs},
+ ReplicationMode: base.ReplicationManual,
+ })
+ defer tc.Stopper().Stop(ctx)
+ db := sqlutils.MakeSQLRunner(tc.ServerConn(0))
+ db.Exec(t, `SET CLUSTER SETTING kv.learner_replicas.enabled = true`)
+
+ scratchStartKey := tc.ScratchRange(t)
+ store, repl := getFirstStoreReplica(t, tc.Server(0), scratchStartKey)
+
+ // Start with 2 replicas so the replicate queue can go from 2->3, otherwise it
+ // will refuse to upreplicate to a fragile quorum of 1->2.
+ tc.AddReplicasOrFatal(t, scratchStartKey, tc.Target(1))
+ atomic.StoreInt64(&skipReceiveSnapshotKnobAtomic, 0)
+
+ // Run the replicate queue, this will add a learner to node 3 and start
+ // sending it a snapshot. This will eventually fail and we assert some things
+ // about the trace to prove it failed in the way we want.
+ queue1ErrCh := make(chan error, 1)
+ go func() {
+ queue1ErrCh <- func() error {
+ trace, errMsg, err := store.ManuallyEnqueue(ctx, "replicate", repl, true /* skipShouldQueue */)
+ if err != nil {
+ return err
+ }
+ if !strings.Contains(errMsg, `descriptor changed`) {
+ return errors.Errorf(`expected "descriptor changed" error got: %s`, errMsg)
+ }
+ formattedTrace := tracing.FormatRecordedSpans(trace)
+ expectedMessages := []string{
+ `could not promote n3,s3 to voter, rolling back: change replicas of r\d+ failed: descriptor changed`,
+ // TODO(dan): Consider skipping the rollback when trying to promote a
+ // learner to a voter results in a "descriptor changed" error.
+ `failed to rollback learner n3,s3, abandoning it for the replicate queue: change replicas of r\d+ failed: descriptor changed`,
+ }
+ return testutils.MatchInOrder(formattedTrace, expectedMessages...)
+ }()
+ }()
+
+ // Wait until the snapshot starts, which happens after the learner has been
+ // added.
+ <-blockUntilSnapshotCh
+
+ // Removes the learner on node 3 out from under the replicate queue. This
+ // simulates a second replicate queue running concurrently. The first thing
+ // this second replicate queue would do is remove any learners it sees,
+ // leaving the 2 voters.
+ desc, err := tc.RemoveReplicas(scratchStartKey, tc.Target(2))
+ require.NoError(t, err)
+ require.Len(t, desc.Replicas().Voters(), 2)
+ require.Len(t, desc.Replicas().Learners(), 0)
+
+ // Unblock the snapshot, and surprise the replicate queue. It should retry,
+ // get a descriptor changed error, and realize it should stop.
+ close(blockSnapshotsCh)
+ require.NoError(t, <-queue1ErrCh)
+ desc = tc.LookupRangeOrFatal(t, scratchStartKey)
+ require.Len(t, desc.Replicas().Voters(), 2)
+ require.Len(t, desc.Replicas().Learners(), 0)
+}
+
func TestLearnerNoAcceptLease(t *testing.T) {
defer leaktest.AfterTest(t)()
ctx := context.Background()
diff --git a/pkg/storage/replica_raft.go b/pkg/storage/replica_raft.go
index 60c6402d690d..d90720666c55 100644
--- a/pkg/storage/replica_raft.go
+++ b/pkg/storage/replica_raft.go
@@ -1215,6 +1215,12 @@ func (r *Replica) completeSnapshotLogTruncationConstraint(
r.mu.snapshotLogTruncationConstraints[snapUUID] = item
}
+func (r *Replica) getAndGCSnapshotLogTruncationConstraints(now time.Time) (minSnapIndex uint64) {
+ r.mu.Lock()
+ defer r.mu.Unlock()
+ return r.getAndGCSnapshotLogTruncationConstraintsLocked(now)
+}
+
func (r *Replica) getAndGCSnapshotLogTruncationConstraintsLocked(
now time.Time,
) (minSnapIndex uint64) {
diff --git a/pkg/storage/replicate_queue.go b/pkg/storage/replicate_queue.go
index a4f60c15f62b..3550538f2245 100644
--- a/pkg/storage/replicate_queue.go
+++ b/pkg/storage/replicate_queue.go
@@ -250,17 +250,17 @@ func (rq *replicateQueue) process(
MaxRetries: 5,
}
- // Use a retry loop in order to backoff in the case of preemptive
- // snapshot errors, usually signaling that a rebalancing
- // reservation could not be made with the selected target.
+ // Use a retry loop in order to backoff in the case of snapshot errors,
+ // usually signaling that a rebalancing reservation could not be made with the
+ // selected target.
for r := retry.StartWithCtx(ctx, retryOpts); r.Next(); {
for {
requeue, err := rq.processOneChange(ctx, repl, rq.canTransferLease, false /* dryRun */)
if IsSnapshotError(err) {
- // If ChangeReplicas failed because the preemptive snapshot failed, we
- // log the error but then return success indicating we should retry the
- // operation. The most likely causes of the preemptive snapshot failing are
- // a declined reservation or the remote node being unavailable. In either
+ // If ChangeReplicas failed because the snapshot failed, we log the
+ // error but then return success indicating we should retry the
+ // operation. The most likely causes of the snapshot failing are a
+ // declined reservation or the remote node being unavailable. In either
// case we don't want to wait another scanner cycle before reconsidering
// the range.
log.Info(ctx, err)
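The retry behavior this comment describes can be sketched as follows. The helper names, error value, and backoff constants are illustrative only, not the real replicateQueue implementation.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errSnapshotFailed stands in for the class of errors recognized by the
// snapshot-error check (e.g. a declined reservation or an unreachable target).
var errSnapshotFailed = errors.New("snapshot failed: declined reservation")

// processWithRetry sketches the loop described above: back off and retry when
// the change failed only because the snapshot could not be sent, and return
// any other outcome to the caller immediately.
func processWithRetry(processOneChange func() error, maxRetries int) error {
	backoff := 10 * time.Millisecond
	for i := 0; i < maxRetries; i++ {
		err := processOneChange()
		if !errors.Is(err, errSnapshotFailed) {
			return err // success, or a non-snapshot error
		}
		fmt.Println("snapshot error, will retry:", err)
		time.Sleep(backoff)
		backoff *= 2
	}
	return fmt.Errorf("gave up after %d retries", maxRetries)
}

func main() {
	attempts := 0
	err := processWithRetry(func() error {
		attempts++
		if attempts < 3 {
			return errSnapshotFailed
		}
		return nil
	}, 5)
	fmt.Printf("finished after %d attempts, err=%v\n", attempts, err)
}
```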
diff --git a/pkg/storage/store.go b/pkg/storage/store.go
index befd75eeed12..e986f86809a0 100644
--- a/pkg/storage/store.go
+++ b/pkg/storage/store.go
@@ -3354,12 +3354,11 @@ func (s *Store) processRaftRequestWithReplica(
return nil
}
-// processRaftSnapshotRequest processes the incoming snapshot Raft request on
-// the request's specified replica. This snapshot can be preemptive or not. If
-// not, the function makes sure to handle any updated Raft Ready state. It also
-// adds and later removes the (potentially) necessary placeholder to protect
-// against concurrent access to the keyspace encompassed by the snapshot but not
-// yet guarded by the replica.
+// processRaftSnapshotRequest processes the incoming non-preemptive snapshot
+// Raft request on the request's specified replica. The function makes sure to
+// handle any updated Raft Ready state. It also adds and later removes the
+// (potentially) necessary placeholder to protect against concurrent access to
+// the keyspace encompassed by the snapshot but not yet guarded by the replica.
func (s *Store) processRaftSnapshotRequest(
ctx context.Context, snapHeader *SnapshotRequest_Header, inSnap IncomingSnapshot,
) *roachpb.Error {
diff --git a/pkg/storage/store_snapshot.go b/pkg/storage/store_snapshot.go
index 7aafe83985a4..06c862db384f 100644
--- a/pkg/storage/store_snapshot.go
+++ b/pkg/storage/store_snapshot.go
@@ -594,6 +594,17 @@ func (s *Store) receiveSnapshot(
}
}
+ // Defensive check that any non-preemptive snapshot contains this store in the
+ // descriptor.
+ if !header.IsPreemptive() {
+ storeID := s.StoreID()
+ if _, ok := header.State.Desc.GetReplicaDescriptor(storeID); !ok {
+ return crdberrors.AssertionFailedf(
+ `snapshot of type %s was sent to s%d which did not contain it as a replica: %s`,
+ header.Type, storeID, header.State.Desc.Replicas())
+ }
+ }
+
cleanup, rejectionMsg, err := s.reserveSnapshot(ctx, header)
if err != nil {
return err
diff --git a/pkg/testutils/testcluster/testcluster.go b/pkg/testutils/testcluster/testcluster.go
index f5a5ba779de9..6f699bfc71fd 100644
--- a/pkg/testutils/testcluster/testcluster.go
+++ b/pkg/testutils/testcluster/testcluster.go
@@ -577,7 +577,7 @@ func (tc *TestCluster) WaitForSplitAndInitialization(startKey roachpb.Key) error
if err != nil {
return err
}
- if actualReplicaDesc != rDesc {
+ if !actualReplicaDesc.Equal(rDesc) {
return errors.Errorf("expected replica %s; got %s", rDesc, actualReplicaDesc)
}
}
diff --git a/vendor b/vendor
index 3a89d360b6ff..6c5b436d64e8 160000
--- a/vendor
+++ b/vendor
@@ -1 +1 @@
-Subproject commit 3a89d360b6ffed361211b1ca493a66e22bb3c134
+Subproject commit 6c5b436d64e8c0545f38088a0175c28f09b92bfb