diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html
index 0dc056d1de40..554055327d61 100644
--- a/docs/generated/settings/settings.html
+++ b/docs/generated/settings/settings.html
@@ -38,6 +38,7 @@
kv.closed_timestamp.target_duration | duration | 30s | if nonzero, attempt to provide closed timestamp notifications for timestamps trailing cluster time by approximately this duration |
kv.follower_read.target_multiple | float | 3 | if above 1, encourages the distsender to perform a read against the closest replica if a request is older than kv.closed_timestamp.target_duration * (1 + kv.closed_timestamp.close_fraction * this) less a clock uncertainty interval. This value also is used to create follower_timestamp(). (WARNING: may compromise cluster stability or correctness; do not edit without supervision) |
kv.import.batch_size | byte size | 32 MiB | the maximum size of the payload in an AddSSTable request (WARNING: may compromise cluster stability or correctness; do not edit without supervision) |
+kv.learner_replicas.enabled | boolean | false | use learner replicas for replica addition |
kv.raft.command.max_size | byte size | 64 MiB | maximum size of a raft command |
kv.raft_log.disable_synchronization_unsafe | boolean | false | set to true to disable synchronization on Raft log writes to persistent storage. Setting to true risks data loss or data corruption on server crashes. The setting is meant for internal testing only and SHOULD NOT be used in production. |
kv.range.backpressure_range_size_multiplier | float | 2 | multiple of range_max_bytes that a range is allowed to grow to without splitting before writes to that range are blocked, or 0 to disable |
@@ -120,6 +121,6 @@
trace.debug.enable | boolean | false | if set, traces for recent requests can be seen in the /debug page |
trace.lightstep.token | string | | if set, traces go to Lightstep using this token |
trace.zipkin.collector | string | | if set, traces go to the given Zipkin instance (example: '127.0.0.1:9411'); ignored if trace.lightstep.token is set |
-version | custom validation | 19.1-5 | set the active cluster version in the format '<major>.<minor>' |
+version | custom validation | 19.1-6 | set the active cluster version in the format '<major>.<minor>' |
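
The new kv.learner_replicas.enabled setting defaults to false, so the learner path is opt-in. A minimal sketch of an operator flipping it from a client, assuming a locally running node and the lib/pq driver (the connection string is illustrative and not part of this change):

package main

import (
	"database/sql"
	"log"

	_ "github.com/lib/pq" // assumed Postgres-wire driver; any compatible driver works
)

func main() {
	// Illustrative connection string; adjust host/port/user for your cluster.
	db, err := sql.Open("postgres", "postgresql://root@localhost:26257?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()
	// Opt in to learner-based replica addition (defaults to false per the table above).
	if _, err := db.Exec(`SET CLUSTER SETTING kv.learner_replicas.enabled = true`); err != nil {
		log.Fatal(err)
	}
}
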
diff --git a/pkg/roachpb/metadata.go b/pkg/roachpb/metadata.go
index 6efe07cf340c..63b3242e905b 100644
--- a/pkg/roachpb/metadata.go
+++ b/pkg/roachpb/metadata.go
@@ -270,6 +270,20 @@ func (r ReplicationTarget) String() string {
return fmt.Sprintf("n%d,s%d", r.NodeID, r.StoreID)
}
+// ReplicaTypeLearner returns a ReplicaType_LEARNER pointer suitable for use in
+// a nullable proto field.
+func ReplicaTypeLearner() *ReplicaType {
+ t := ReplicaType_LEARNER
+ return &t
+}
+
+// ReplicaTypeVoter returns a ReplicaType_VOTER pointer suitable for use in a
+// nullable proto field.
+func ReplicaTypeVoter() *ReplicaType {
+ t := ReplicaType_VOTER
+ return &t
+}
+
func (r ReplicaDescriptor) String() string {
var buf bytes.Buffer
fmt.Fprintf(&buf, "(n%d,s%d):", r.NodeID, r.StoreID)
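
The ReplicaTypeLearner/ReplicaTypeVoter helpers above exist because the descriptor's Type field is a nullable proto pointer. A small sketch of the intended usage, mirroring how addLearnerReplica and promoteLearnerReplicaToVoter use them later in this diff (the function name here is illustrative):

package roachpb_test

import "github.com/cockroachdb/cockroach/pkg/roachpb"

// newLearnerDescriptor builds a ReplicaDescriptor that starts life as a
// learner; promotion later just swaps the Type pointer to a voter.
func newLearnerDescriptor(
	target roachpb.ReplicationTarget, id roachpb.ReplicaID,
) roachpb.ReplicaDescriptor {
	rd := roachpb.ReplicaDescriptor{
		NodeID:    target.NodeID,
		StoreID:   target.StoreID,
		ReplicaID: id,
		Type:      roachpb.ReplicaTypeLearner(),
	}
	// Promote: a nil Type and an explicit VOTER both mean a full voter, but the
	// explicit pointer keeps the intent obvious.
	rd.Type = roachpb.ReplicaTypeVoter()
	return rd
}
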
diff --git a/pkg/roachpb/metadata_replicas.go b/pkg/roachpb/metadata_replicas.go
index af6c0730d6a1..bb143ccf32ba 100644
--- a/pkg/roachpb/metadata_replicas.go
+++ b/pkg/roachpb/metadata_replicas.go
@@ -10,7 +10,10 @@
package roachpb
-import "sort"
+import (
+ "fmt"
+ "sort"
+)
// ReplicaDescriptors is a set of replicas, usually the nodes/stores on which
// replicas of a range are stored.
@@ -54,6 +57,77 @@ func (d ReplicaDescriptors) Voters() []ReplicaDescriptor {
}
// Learners returns the learner replicas in the set.
+//
+// A learner is a participant in a raft group that accepts messages but doesn't
+// vote. This means it doesn't affect raft quorum and thus doesn't affect the
+// fragility of the range, even if it's very far behind or many learners are
+// down.
+//
+// At the time of writing, learners are used in CockroachDB as an interim state
+// while adding a replica. A learner replica is added to the range via raft
+// ConfChange, a raft snapshot (of type LEARNER) is sent to catch it up, and
+// then a second ConfChange promotes it to a full replica.
+//
+// This means that learners are currently always expected to have a short
+// lifetime, approximately the time it takes to send a snapshot. Ideas have been
+// kicked around to use learners with follower reads, which could be a cheap way
+// to allow many geographies to have local reads without affecting write
+// latencies. If implemented, these learners would have long lifetimes.
+//
+// For simplicity, CockroachDB treats learner replicas the same as voter
+// replicas as much as possible, but there are a few exceptions:
+//
+// - Learner replicas are not considered when calculating quorum size, and thus
+// do not affect the computation of which ranges are under-replicated for
+// upreplication/alerting/debug/etc purposes. Ditto for over-replicated.
+// - Learner replicas cannot become raft leaders, so we also don't allow them to
+// become leaseholders. As a result, DistSender and the various oracles don't
+// try to send them traffic.
+// - The raft snapshot queue does not send snapshots to learners for reasons
+// described below.
+// - Merges won't run while a learner replica is present.
+//
+// Replicas are now added in two ConfChange transactions. The first creates the
+// learner and the second promotes it to a voter. If the node that is
+// coordinating this dies in the middle, we're left with an orphaned learner.
+// For this reason, the replicate queue always first removes any learners it
+// sees before doing anything else. We could instead try to finish off the
+// learner snapshot and promotion, but this is more complicated and it's not yet
+// clear the efficiency win is worth it.
+//
+// This introduces some rare races between the replicate queue and
+// AdminChangeReplicas or if a range's lease is moved to a new owner while the
+// old leaseholder is still processing it in the replicate queue. These races
+// are handled by retrying if a learner disappears during the
+// snapshot/promotion.
+//
+// If the coordinator otherwise encounters an error while sending the learner
+// snapshot or promoting it (which can happen for a number of reasons, including
+// the node receiving the learner going away), it tries to clean up after itself
+// by rolling back the addition of the learner.
+//
+// There is another race between the learner snapshot being sent and the raft
+// snapshot queue happening to check the replica at the same time, also sending
+// it a snapshot. This is safe but wasteful, so the raft snapshot queue won't
+// try to send snapshots to learners.
+//
+// Merges are blocked if either side has a learner (to avoid working out the
+// edge cases) but it's historically turned out to be a bad idea to get in the
+// way of splits, so we allow them even when some of the replicas are learners.
+// This orphans a learner on each side of the split (the original coordinator
+// will not be able to finish either of them), but the replication queue will
+// eventually clean them up.
+//
+// Learner replicas don't affect quorum but they do affect the system in other
+// ways. The most obvious way is that the leader sends them the raft traffic it
+// would send to any follower, consuming resources. More surprising is that once
+// the learner has received a snapshot, it's considered by the quota pool that
+// prevents the raft leader from getting too far ahead of the followers. This is
+// because a learner (especially one that already has a snapshot) is expected to
+// very soon be a voter, so we treat it like one. However, it means a slow
+// learner can slow down regular traffic, which is possibly counterintuitive.
+//
+// For some related mega-comments, see Replica.sendSnapshot.
func (d ReplicaDescriptors) Learners() []ReplicaDescriptor {
// Note that the wrapped replicas are sorted first by type.
for i := range d.wrapped {
@@ -64,7 +138,9 @@ func (d ReplicaDescriptors) Learners() []ReplicaDescriptor {
return nil
}
-var _, _ = ReplicaDescriptors.All, ReplicaDescriptors.Learners
+func (d ReplicaDescriptors) String() string {
+ return fmt.Sprintf("%s", d.AsProto())
+}
// AsProto returns the protobuf representation of these replicas, suitable for
// setting the InternalReplicas field of a RangeDescriptor. When possible the
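
A short sketch of how the Voters()/Learners() split documented above is consumed, in the spirit of the crdb_internal change further down in this diff (the helper name is illustrative):

package roachpb_test

import (
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/roachpb"
)

// printReplicasByRole lists a range's voters and learners separately; only
// voters count toward quorum, which is why the two sets are reported apart.
func printReplicasByRole(desc roachpb.RangeDescriptor) {
	for _, rd := range desc.Replicas().Voters() {
		fmt.Printf("voter:   n%d,s%d\n", rd.NodeID, rd.StoreID)
	}
	for _, rd := range desc.Replicas().Learners() {
		fmt.Printf("learner: n%d,s%d\n", rd.NodeID, rd.StoreID)
	}
}
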
diff --git a/pkg/settings/cluster/cockroach_versions.go b/pkg/settings/cluster/cockroach_versions.go
index ea10da88d3b4..afa26d965ae4 100644
--- a/pkg/settings/cluster/cockroach_versions.go
+++ b/pkg/settings/cluster/cockroach_versions.go
@@ -39,6 +39,7 @@ const (
VersionQueryTxnTimestamp
VersionStickyBit
VersionParallelCommits
+ VersionLearnerReplicas
VersionGenerationComparable
// Add new versions here (step one of two).
@@ -483,6 +484,11 @@ var versionsSingleton = keyedVersions([]keyedVersion{
Key: VersionGenerationComparable,
Version: roachpb.Version{Major: 19, Minor: 1, Unstable: 5},
},
+ {
+ // VersionLearnerReplicas is https://github.com/cockroachdb/cockroach/pull/38149.
+ Key: VersionLearnerReplicas,
+ Version: roachpb.Version{Major: 19, Minor: 1, Unstable: 6},
+ },
// Add new versions here (step two of two).
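
New behavior keyed to VersionLearnerReplicas has to stay off until every node runs 19.1-6. A sketch of the gate, mirroring the check addReplica performs later in this diff (useLearnerReplicas is the setting registered in replica_command.go; the wrapper function is illustrative):

package storage

import "github.com/cockroachdb/cockroach/pkg/settings/cluster"

// canUseLearners reports whether learner-based replica addition may be used:
// the operator has opted in via the cluster setting and every node in the
// cluster understands learner replicas.
func canUseLearners(st *cluster.Settings) bool {
	return useLearnerReplicas.Get(&st.SV) &&
		st.Version.IsActive(cluster.VersionLearnerReplicas)
}
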
diff --git a/pkg/settings/cluster/versionkey_string.go b/pkg/settings/cluster/versionkey_string.go
index 77b58cb16f83..0d608d49be63 100644
--- a/pkg/settings/cluster/versionkey_string.go
+++ b/pkg/settings/cluster/versionkey_string.go
@@ -16,12 +16,13 @@ func _() {
_ = x[VersionQueryTxnTimestamp-5]
_ = x[VersionStickyBit-6]
_ = x[VersionParallelCommits-7]
- _ = x[VersionGenerationComparable-8]
+ _ = x[VersionLearnerReplicas-8]
+ _ = x[VersionGenerationComparable-9]
}
-const _VersionKey_name = "Version2_1VersionUnreplicatedRaftTruncatedStateVersionSideloadedStorageNoReplicaIDVersion19_1VersionStart19_2VersionQueryTxnTimestampVersionStickyBitVersionParallelCommitsVersionGenerationComparable"
+const _VersionKey_name = "Version2_1VersionUnreplicatedRaftTruncatedStateVersionSideloadedStorageNoReplicaIDVersion19_1VersionStart19_2VersionQueryTxnTimestampVersionStickyBitVersionParallelCommitsVersionLearnerReplicasVersionGenerationComparable"
-var _VersionKey_index = [...]uint8{0, 10, 47, 82, 93, 109, 133, 149, 171, 198}
+var _VersionKey_index = [...]uint8{0, 10, 47, 82, 93, 109, 133, 149, 171, 193, 220}
func (i VersionKey) String() string {
if i < 0 || i >= VersionKey(len(_VersionKey_index)-1) {
diff --git a/pkg/sql/ambiguous_commit_test.go b/pkg/sql/ambiguous_commit_test.go
index 10dbfc700820..ce2b0d992d2a 100644
--- a/pkg/sql/ambiguous_commit_test.go
+++ b/pkg/sql/ambiguous_commit_test.go
@@ -158,7 +158,7 @@ func TestAmbiguousCommit(t *testing.T) {
tableStartKey.Store(keys.MakeTablePrefix(tableID))
// Wait for new table to split & replication.
- if err := tc.WaitForSplitAndReplication(tableStartKey.Load().([]byte)); err != nil {
+ if err := tc.WaitForSplitAndInitialization(tableStartKey.Load().([]byte)); err != nil {
t.Fatal(err)
}
diff --git a/pkg/sql/crdb_internal.go b/pkg/sql/crdb_internal.go
index 21712ac8ba65..a17856f311f8 100644
--- a/pkg/sql/crdb_internal.go
+++ b/pkg/sql/crdb_internal.go
@@ -1715,6 +1715,7 @@ CREATE TABLE crdb_internal.ranges_no_leases (
table_name STRING NOT NULL,
index_name STRING NOT NULL,
replicas INT[] NOT NULL,
+ non_quorum_replicas INT[] NOT NULL,
split_enforced_until TIMESTAMP
)
`,
@@ -1768,17 +1769,25 @@ CREATE TABLE crdb_internal.ranges_no_leases (
return nil, err
}
- // TODO(dan): We're trying to treat learners as a far-behind replica as
- // much as possible, so just include them in the list of replicas. We can
- // add a separate column for them if we get feedback about it.
- var replicas []int
+ var voterReplicas, nonVoterReplicas []int
for _, rd := range desc.Replicas().All() {
- replicas = append(replicas, int(rd.StoreID))
+ if rd.GetType() == roachpb.ReplicaType_VOTER {
+ voterReplicas = append(voterReplicas, int(rd.StoreID))
+ } else {
+ nonVoterReplicas = append(nonVoterReplicas, int(rd.StoreID))
+ }
+ }
+ sort.Ints(voterReplicas)
+ sort.Ints(nonVoterReplicas)
+ votersArr := tree.NewDArray(types.Int)
+ for _, replica := range voterReplicas {
+ if err := votersArr.Append(tree.NewDInt(tree.DInt(replica))); err != nil {
+ return nil, err
+ }
}
- sort.Ints(replicas)
- arr := tree.NewDArray(types.Int)
- for _, replica := range replicas {
- if err := arr.Append(tree.NewDInt(tree.DInt(replica))); err != nil {
+ nonVotersArr := tree.NewDArray(types.Int)
+ for _, replica := range nonVoterReplicas {
+ if err := nonVotersArr.Append(tree.NewDInt(tree.DInt(replica))); err != nil {
return nil, err
}
}
@@ -1811,7 +1820,8 @@ CREATE TABLE crdb_internal.ranges_no_leases (
tree.NewDString(dbName),
tree.NewDString(tableName),
tree.NewDString(indexName),
- arr,
+ votersArr,
+ nonVotersArr,
splitEnforcedUntil,
}, nil
}, nil
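
With the new column in place, the voter/learner split is visible from SQL. A sketch of a query against it, mirroring the one the test below runs, again assuming a running cluster and the lib/pq driver:

package main

import (
	"database/sql"
	"fmt"
	"log"

	_ "github.com/lib/pq" // assumed driver, as in the earlier sketch
)

func main() {
	// Illustrative connection string; adjust for your cluster.
	db, err := sql.Open("postgres", "postgresql://root@localhost:26257?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()
	rows, err := db.Query(`
		SELECT range_id,
		       array_to_string(replicas, ','),
		       array_to_string(non_quorum_replicas, ',')
		FROM crdb_internal.ranges_no_leases`)
	if err != nil {
		log.Fatal(err)
	}
	defer rows.Close()
	for rows.Next() {
		var rangeID int64
		var voters, nonQuorum string
		if err := rows.Scan(&rangeID, &voters, &nonQuorum); err != nil {
			log.Fatal(err)
		}
		fmt.Printf("r%d voters=[%s] non-quorum (learner) replicas=[%s]\n", rangeID, voters, nonQuorum)
	}
	if err := rows.Err(); err != nil {
		log.Fatal(err)
	}
}
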
diff --git a/pkg/storage/allocator.go b/pkg/storage/allocator.go
index 39fe69142a18..9964ac90a049 100644
--- a/pkg/storage/allocator.go
+++ b/pkg/storage/allocator.go
@@ -47,6 +47,7 @@ const (
minReplicaWeight = 0.001
// priorities for various repair operations.
+ removeLearnerReplicaPriority float64 = 12001
addDeadReplacementPriority float64 = 12000
addMissingReplicaPriority float64 = 10000
addDecommissioningReplacementPriority float64 = 5000
@@ -98,6 +99,7 @@ const (
AllocatorAdd
AllocatorRemoveDead
AllocatorRemoveDecommissioning
+ AllocatorRemoveLearner
AllocatorConsiderRebalance
)
@@ -107,6 +109,7 @@ var allocatorActionNames = map[AllocatorAction]string{
AllocatorAdd: "add",
AllocatorRemoveDead: "remove dead",
AllocatorRemoveDecommissioning: "remove decommissioning",
+ AllocatorRemoveLearner: "remove learner",
AllocatorConsiderRebalance: "consider rebalance",
}
@@ -294,8 +297,49 @@ func (a *Allocator) ComputeAction(
// Do nothing if storePool is nil for some unittests.
return AllocatorNoop, 0
}
- // TODO(mrtracy): Handle non-homogeneous and mismatched attribute sets.
+ // Seeing a learner replica at this point is unexpected because learners are a
+ // short-lived (ish) transient state in a learner+snapshot+voter cycle, which
+ // is always done atomically. Only two places could have added a learner: the
+ // replicate queue or AdminChangeReplicas request.
+ //
+ // The replicate queue only operates on leaseholders, which means that only
+ // one node at a time is operating on a given range except in rare cases (old
+ // leaseholder could start the operation, and a new leaseholder steps up and
+ // also starts an overlapping operation). Combined with the above atomicity,
+ // this means that if the replicate queue sees a learner, either the node that
+ // was adding it crashed somewhere in the learner+snapshot+voter cycle and
+ // we're the new leaseholder or we caught a race.
+ //
+ // In the first case, we could assume the node that was adding it knew what it
+ // was doing and finish the addition. Or we could leave it and do higher
+ // priority operations first if there are any. However, this comes with code
+ // complexity and concept complexity (computing old vs new quorum sizes
+ // becomes ambiguous, the learner isn't in the quorum but it likely will be
+ // soon, so do you count it?). Instead, we do the simplest thing and remove it
+ // before doing any other operations to the range. We'll revisit this decision
+ // if and when the complexity becomes necessary.
+ //
+ // If we get the race where AdminChangeReplicas is adding a replica and the
+ // queue happens to run during the snapshot, this will remove the learner and
+ // AdminChangeReplicas will notice either during the snapshot transfer or when
+ // it tries to promote the learner to a voter. AdminChangeReplicas should
+ // retry.
+ //
+ // On the other hand if we get the race where a leaseholder starts adding a
+ // replica in the replicate queue and during this loses its lease, it should
+ // probably not retry.
+ if learners := rangeInfo.Desc.Replicas().Learners(); len(learners) > 0 {
+ // TODO(dan): Since this goes before anything else, the priority here should
+ // be influenced by whatever operations would happen right after the learner
+ // is removed. In the meantime, we don't want to block something important
+ // from happening (like addDeadReplacementPriority) by queueing this at a
+ // low priority so until this TODO is done, keep
+ // removeLearnerReplicaPriority as the highest priority.
+ return AllocatorRemoveLearner, removeLearnerReplicaPriority
+ }
+
+ // TODO(mrtracy): Handle non-homogeneous and mismatched attribute sets.
have := len(rangeInfo.Desc.Replicas().Unwrap())
decommissioningReplicas := a.storePool.decommissioningReplicas(
rangeInfo.Desc.RangeID, rangeInfo.Desc.Replicas().Unwrap())
diff --git a/pkg/storage/allocator_test.go b/pkg/storage/allocator_test.go
index ccdc1089dac6..d851ab6fea5f 100644
--- a/pkg/storage/allocator_test.go
+++ b/pkg/storage/allocator_test.go
@@ -43,6 +43,7 @@ import (
"github.com/gogo/protobuf/proto"
"github.com/olekukonko/tablewriter"
"github.com/pkg/errors"
+ "github.com/stretchr/testify/require"
"go.etcd.io/etcd/raft"
)
@@ -4769,6 +4770,40 @@ func TestAllocatorComputeActionDecommission(t *testing.T) {
}
}
+func TestAllocatorRemoveLearner(t *testing.T) {
+ defer leaktest.AfterTest(t)()
+
+ zone := config.ZoneConfig{
+ NumReplicas: proto.Int32(3),
+ }
+ learnerType := roachpb.ReplicaType_LEARNER
+ rangeWithLearnerDesc := roachpb.RangeDescriptor{
+ InternalReplicas: []roachpb.ReplicaDescriptor{
+ {
+ StoreID: 1,
+ NodeID: 1,
+ ReplicaID: 1,
+ },
+ {
+ StoreID: 2,
+ NodeID: 2,
+ ReplicaID: 2,
+ Type: &learnerType,
+ },
+ },
+ }
+
+ // Removing a learner is prioritized over adding a new replica to an under
+ // replicated range.
+ stopper, _, sp, a, _ := createTestAllocator(10, false /* deterministic */)
+ ctx := context.Background()
+ defer stopper.Stop(ctx)
+ live, dead := []roachpb.StoreID{1, 2}, []roachpb.StoreID{3}
+ mockStorePool(sp, live, nil, dead, nil, nil)
+ action, _ := a.ComputeAction(ctx, &zone, RangeInfo{Desc: &rangeWithLearnerDesc})
+ require.Equal(t, AllocatorRemoveLearner, action)
+}
+
func TestAllocatorComputeActionDynamicNumReplicas(t *testing.T) {
defer leaktest.AfterTest(t)()
diff --git a/pkg/storage/batcheval/cmd_lease.go b/pkg/storage/batcheval/cmd_lease.go
index 3d43186d8924..5bfaddaf4b1f 100644
--- a/pkg/storage/batcheval/cmd_lease.go
+++ b/pkg/storage/batcheval/cmd_lease.go
@@ -21,6 +21,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb"
"github.com/cockroachdb/cockroach/pkg/storage/spanset"
"github.com/cockroachdb/cockroach/pkg/storage/storagepb"
+ "github.com/cockroachdb/errors"
)
func declareKeysRequestLease(
@@ -41,6 +42,17 @@ func newFailedLeaseTrigger(isTransfer bool) result.Result {
return trigger
}
+func checkNotLearnerReplica(rec EvalContext) error {
+ repDesc, ok := rec.Desc().GetReplicaDescriptor(rec.StoreID())
+ if !ok {
+ return errors.AssertionFailedf(
+ `could not find replica for store %s in %s`, rec.StoreID(), rec.Desc())
+ } else if t := repDesc.GetType(); t == roachpb.ReplicaType_LEARNER {
+ return errors.Errorf(`cannot transfer lease to replica of type %s`, t)
+ }
+ return nil
+}
+
// evalNewLease checks that the lease contains a valid interval and that
// the new lease holder is still a member of the replica set, and then proceeds
// to write the new lease to the batch, emitting an appropriate trigger.
diff --git a/pkg/storage/batcheval/cmd_lease_request.go b/pkg/storage/batcheval/cmd_lease_request.go
index 539a5514efa2..f45caf1d3176 100644
--- a/pkg/storage/batcheval/cmd_lease_request.go
+++ b/pkg/storage/batcheval/cmd_lease_request.go
@@ -32,9 +32,25 @@ func init() {
func RequestLease(
ctx context.Context, batch engine.ReadWriter, cArgs CommandArgs, resp roachpb.Response,
) (result.Result, error) {
+ // When returning an error from this method, must always return a
+ // newFailedLeaseTrigger() to satisfy stats.
args := cArgs.Args.(*roachpb.RequestLeaseRequest)
- // When returning an error from this method, must always return
- // a newFailedLeaseTrigger() to satisfy stats.
+
+ // For now, don't allow replicas of type LEARNER to be leaseholders. There's
+ // no reason this wouldn't work in principle, but it seems inadvisable. In
+ // particular, learners can't become raft leaders, so we wouldn't be able to
+ // co-locate the leaseholder + raft leader, which is going to affect tail
+ // latencies. Additionally, as of the time of writing, learner replicas are
+ // only used for a short time in replica addition, so it's not worth working
+ // out the edge cases. If we decide to start using long-lived learners at some
+ // point, that math may change.
+ //
+ // If this check is removed at some point, the filtering of learners on the
+ // sending side would have to be removed as well.
+ if err := checkNotLearnerReplica(cArgs.EvalCtx); err != nil {
+ return newFailedLeaseTrigger(false /* isTransfer */), err
+ }
+
prevLease, _ := cArgs.EvalCtx.GetLease()
rErr := &roachpb.LeaseRejectedError{
diff --git a/pkg/storage/batcheval/cmd_lease_transfer.go b/pkg/storage/batcheval/cmd_lease_transfer.go
index 5ad036a40669..91012c9e08ab 100644
--- a/pkg/storage/batcheval/cmd_lease_transfer.go
+++ b/pkg/storage/batcheval/cmd_lease_transfer.go
@@ -31,10 +31,25 @@ func init() {
func TransferLease(
ctx context.Context, batch engine.ReadWriter, cArgs CommandArgs, resp roachpb.Response,
) (result.Result, error) {
- args := cArgs.Args.(*roachpb.TransferLeaseRequest)
-
// When returning an error from this method, must always return
// a newFailedLeaseTrigger() to satisfy stats.
+ args := cArgs.Args.(*roachpb.TransferLeaseRequest)
+
+ // For now, don't allow replicas of type LEARNER to be leaseholders. There's
+ // no reason this wouldn't work in principle, but it seems inadvisable. In
+ // particular, learners can't become raft leaders, so we wouldn't be able to
+ // co-locate the leaseholder + raft leader, which is going to affect tail
+ // latencies. Additionally, as of the time of writing, learner replicas are
+ // only used for a short time in replica addition, so it's not worth working
+ // out the edge cases. If we decide to start using long-lived learners at some
+ // point, that math may change.
+ //
+ // If this check is removed at some point, the filtering of learners on the
+ // sending side would have to be removed as well.
+ if err := checkNotLearnerReplica(cArgs.EvalCtx); err != nil {
+ return newFailedLeaseTrigger(true /* isTransfer */), err
+ }
+
prevLease, _ := cArgs.EvalCtx.GetLease()
if log.V(2) {
log.Infof(ctx, "lease transfer: prev lease: %+v, new lease: %+v", prevLease, args.Lease)
diff --git a/pkg/storage/client_replica_test.go b/pkg/storage/client_replica_test.go
index d00a5db7f1f8..06b13e60a080 100644
--- a/pkg/storage/client_replica_test.go
+++ b/pkg/storage/client_replica_test.go
@@ -1326,7 +1326,7 @@ func TestLeaseInfoRequest(t *testing.T) {
// right answer immediately, since the old holder has definitely applied the
// transfer before TransferRangeLease returned.
leaseHolderReplica := LeaseInfo(t, kvDB0, rangeDesc, roachpb.INCONSISTENT).Lease.Replica
- if leaseHolderReplica != replicas[1] {
+ if !leaseHolderReplica.Equal(replicas[1]) {
t.Fatalf("lease holder should be replica %+v, but is: %+v",
replicas[1], leaseHolderReplica)
}
@@ -1339,7 +1339,7 @@ func TestLeaseInfoRequest(t *testing.T) {
// unaware of the new lease and so the request might bounce around for a
// while (see #8816).
leaseHolderReplica = LeaseInfo(t, kvDB1, rangeDesc, roachpb.INCONSISTENT).Lease.Replica
- if leaseHolderReplica != replicas[1] {
+ if !leaseHolderReplica.Equal(replicas[1]) {
return errors.Errorf("lease holder should be replica %+v, but is: %+v",
replicas[1], leaseHolderReplica)
}
@@ -1378,7 +1378,7 @@ func TestLeaseInfoRequest(t *testing.T) {
resp := *(reply.(*roachpb.LeaseInfoResponse))
leaseHolderReplica = resp.Lease.Replica
- if leaseHolderReplica != replicas[2] {
+ if !leaseHolderReplica.Equal(replicas[2]) {
t.Fatalf("lease holder should be replica %s, but is: %s", replicas[2], leaseHolderReplica)
}
diff --git a/pkg/storage/intentresolver/intent_resolver.go b/pkg/storage/intentresolver/intent_resolver.go
index effe1017f64f..51038776bc9f 100644
--- a/pkg/storage/intentresolver/intent_resolver.go
+++ b/pkg/storage/intentresolver/intent_resolver.go
@@ -607,7 +607,7 @@ func (ir *IntentResolver) CleanupTxnIntentsAsync(
intents := roachpb.AsIntents(et.Txn.IntentSpans, &et.Txn)
if err := ir.cleanupFinishedTxnIntents(ctx, rangeID, &et.Txn, intents, now, et.Poison, nil); err != nil {
if ir.every.ShouldLog() {
- log.Warningf(ctx, "failed to cleanup transaction intents: %+v", err)
+ log.Warningf(ctx, "failed to cleanup transaction intents: %v", err)
}
}
}); err != nil {
@@ -815,7 +815,7 @@ func (ir *IntentResolver) cleanupFinishedTxnIntents(
}
if err != nil {
if ir.every.ShouldLog() {
- log.Warningf(ctx, "failed to gc transaction record: %+v", err)
+ log.Warningf(ctx, "failed to gc transaction record: %v", err)
}
}
})
diff --git a/pkg/storage/raft_snapshot_queue.go b/pkg/storage/raft_snapshot_queue.go
index 22b6db067f1c..01c093bd26f3 100644
--- a/pkg/storage/raft_snapshot_queue.go
+++ b/pkg/storage/raft_snapshot_queue.go
@@ -105,6 +105,15 @@ func (rq *raftSnapshotQueue) processRaftSnapshot(
if !ok {
return errors.Errorf("%s: replica %d not present in %v", repl, id, desc.Replicas())
}
+
+ // A learner replica is either being sent a snapshot of type LEARNER by the
+ // node that's adding it or it has been orphaned and is about to be cleaned up.
+ // Either way, no point in also sending it a snapshot of type RAFT.
+ if repDesc.GetType() == roachpb.ReplicaType_LEARNER {
+ log.Eventf(ctx, "not sending snapshot type RAFT to learner: %s", repDesc)
+ return nil
+ }
+
err := repl.sendSnapshot(ctx, repDesc, SnapshotRequest_RAFT, SnapshotRequest_RECOVERY)
// NB: if the snapshot fails because of an overlapping replica on the
diff --git a/pkg/storage/replica_command.go b/pkg/storage/replica_command.go
index 9fa1eb8d5c39..455de8146b10 100644
--- a/pkg/storage/replica_command.go
+++ b/pkg/storage/replica_command.go
@@ -23,6 +23,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/rpc/nodedialer"
+ "github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/storage/engine"
"github.com/cockroachdb/cockroach/pkg/storage/storagebase"
@@ -34,12 +35,21 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/retry"
+ "github.com/cockroachdb/logtags"
"github.com/gogo/protobuf/proto"
"github.com/pkg/errors"
"go.etcd.io/etcd/raft"
"go.etcd.io/etcd/raft/raftpb"
)
+// useLearnerReplicas specifies whether to use learner replicas for replica
+// addition or whether to fall back to the previous method of a preemptive
+// snapshot followed by going straight to a voter replica.
+var useLearnerReplicas = settings.RegisterBoolSetting(
+ "kv.learner_replicas.enabled",
+ "use learner replicas for replica addition",
+ false)
+
// AdminSplit divides the range into two ranges using args.SplitKey.
func (r *Replica) AdminSplit(
ctx context.Context, args roachpb.AdminSplitRequest, reason string,
@@ -848,18 +858,194 @@ func (r *Replica) ChangeReplicas(
reason storagepb.RangeLogEventReason,
details string,
) (updatedDesc *roachpb.RangeDescriptor, _ error) {
- return r.changeReplicas(ctx, changeType, target, desc, SnapshotRequest_REBALANCE, reason, details)
+ if desc == nil {
+ return nil, errors.Errorf("%s: the current RangeDescriptor must not be nil", r)
+ }
+
+ switch changeType {
+ case roachpb.ADD_REPLICA:
+ return r.addReplica(ctx, target, desc, SnapshotRequest_REBALANCE, reason, details)
+ case roachpb.REMOVE_REPLICA:
+ return r.removeReplica(ctx, target, desc, SnapshotRequest_REBALANCE, reason, details)
+ default:
+ return nil, errors.Errorf(`unknown change type: %s`, changeType)
+ }
}
-func (r *Replica) changeReplicas(
+func (r *Replica) addReplica(
+ ctx context.Context,
+ target roachpb.ReplicationTarget,
+ desc *roachpb.RangeDescriptor,
+ priority SnapshotRequest_Priority,
+ reason storagepb.RangeLogEventReason,
+ details string,
+) (*roachpb.RangeDescriptor, error) {
+ for _, rDesc := range desc.Replicas().All() {
+ if rDesc.NodeID == target.NodeID {
+ // Two replicas from the same range are not allowed on the same node, even
+ // in different stores.
+ if rDesc.StoreID != target.StoreID {
+ return nil, errors.Errorf("%s: unable to add replica %v; node already has a replica", r, target)
+ }
+
+ // Looks like we found a replica with the same store and node id. If the
+ // replica is already a learner, then either some previous leaseholder was
+ // trying to add it with the learner+snapshot+voter cycle and got
+ // interrupted or else we hit a race between the replicate queue and
+ // AdminChangeReplicas.
+ if rDesc.GetType() == roachpb.ReplicaType_LEARNER {
+ return nil, errors.Errorf(
+ "%s: unable to add replica %v which is already present as a learner", r, target)
+ }
+
+ // Otherwise, we already had a full voter replica. Can't add another to
+ // this store.
+ return nil, errors.Errorf("%s: unable to add replica %v which is already present", r, target)
+ }
+ }
+
+ settings := r.ClusterSettings()
+ useLearners := useLearnerReplicas.Get(&settings.SV)
+ useLearners = useLearners && settings.Version.IsActive(cluster.VersionLearnerReplicas)
+ if !useLearners {
+ return r.addReplicaLegacyPreemptiveSnapshot(ctx, target, desc, priority, reason, details)
+ }
+
+ // First add the replica as a raft learner. This means it accepts raft traffic
+ // (so it can catch up) but doesn't vote (so it doesn't affect quorum and thus
+ // doesn't introduce fragility into the system). For details see:
+ _ = roachpb.ReplicaDescriptors.Learners
+ learnerDesc, err := addLearnerReplica(ctx, r.store, desc, target, reason, details)
+ if err != nil {
+ return nil, err
+ }
+
+ // Now move it to be a full voter (waiting on it to get a raft snapshot first,
+ // so it's not immediately way behind).
+ voterDesc, err := r.promoteLearnerReplicaToVoter(ctx, learnerDesc, target, priority, reason, details)
+ if err != nil {
+ // Don't leave a learner replica lying around if we didn't succeed in
+ // promoting it to a voter.
+ r.rollbackLearnerReplica(ctx, learnerDesc, target, reason, details)
+ return nil, err
+ }
+ return voterDesc, nil
+}
+
+func addLearnerReplica(
+ ctx context.Context,
+ store *Store,
+ desc *roachpb.RangeDescriptor,
+ target roachpb.ReplicationTarget,
+ reason storagepb.RangeLogEventReason,
+ details string,
+) (*roachpb.RangeDescriptor, error) {
+ newDesc := *desc
+ newDesc.SetReplicas(desc.Replicas().DeepCopy())
+ replDesc := roachpb.ReplicaDescriptor{
+ NodeID: target.NodeID,
+ StoreID: target.StoreID,
+ ReplicaID: desc.NextReplicaID,
+ Type: roachpb.ReplicaTypeLearner(),
+ }
+ newDesc.NextReplicaID++
+ newDesc.AddReplica(replDesc)
+ err := execChangeReplicasTxn(
+ ctx, store, roachpb.ADD_REPLICA, desc, replDesc, &newDesc, reason, details,
+ )
+ return &newDesc, err
+}
+
+func (r *Replica) promoteLearnerReplicaToVoter(
+ ctx context.Context,
+ desc *roachpb.RangeDescriptor,
+ target roachpb.ReplicationTarget,
+ priority SnapshotRequest_Priority,
+ reason storagepb.RangeLogEventReason,
+ details string,
+) (*roachpb.RangeDescriptor, error) {
+ // TODO(dan): We allow ranges with learner replicas to split, so in theory
+ // this may want to detect that and retry, sending a snapshot and promoting
+ // both sides.
+
+ newReplicas := desc.Replicas().DeepCopy().All()
+ for i, rDesc := range newReplicas {
+ if rDesc.NodeID != target.NodeID || rDesc.StoreID != target.StoreID {
+ continue
+ }
+ if rDesc.GetType() != roachpb.ReplicaType_LEARNER {
+ return nil, errors.Errorf(`%s: cannot promote replica of type %s`, r, rDesc.Type)
+ }
+ rDesc.Type = roachpb.ReplicaTypeVoter()
+ newReplicas[i] = rDesc
+
+ // Note that the raft snapshot queue refuses to send snapshots to learners, so
+ // this is the only one a learner can get.
+ if err := r.sendSnapshot(ctx, rDesc, SnapshotRequest_LEARNER, priority); err != nil {
+ return nil, err
+ }
+
+ if fn := r.store.cfg.TestingKnobs.ReplicaAddStopAfterLearnerSnapshot; fn != nil {
+ if fn() {
+ return desc, nil
+ }
+ }
+
+ updatedDesc := *desc
+ updatedDesc.SetReplicas(roachpb.MakeReplicaDescriptors(newReplicas))
+ err := execChangeReplicasTxn(ctx, r.store, roachpb.ADD_REPLICA, desc, rDesc, &updatedDesc, reason, details)
+ return &updatedDesc, err
+ }
+ return nil, errors.Errorf(`%s: could not find replica to promote %s`, r, target)
+}
+
+func (r *Replica) rollbackLearnerReplica(
+ ctx context.Context,
+ desc *roachpb.RangeDescriptor,
+ target roachpb.ReplicationTarget,
+ reason storagepb.RangeLogEventReason,
+ details string,
+) {
+ newDesc := *desc
+ newDesc.SetReplicas(desc.Replicas().DeepCopy())
+ replDesc, ok := newDesc.RemoveReplica(target.NodeID, target.StoreID)
+ if !ok {
+ // This is a programming error if it happens. Why are we rolling back
+ // something that's not present?
+ log.Warningf(ctx, "failed to rollback learner %s, missing from descriptor %s", target, desc)
+ return
+ }
+
+ // If (for example) the promotion failed because of a context deadline
+ // exceeded, we do still want to clean up after ourselves, so always use a new
+ // context (but with the old tags and with some timeout to save this from
+ // blocking the caller indefinitely).
+ const rollbackTimeout = 10 * time.Second
+ rollbackFn := func(ctx context.Context) error {
+ return execChangeReplicasTxn(
+ ctx, r.store, roachpb.REMOVE_REPLICA, desc, replDesc, &newDesc, reason, details,
+ )
+ }
+ rollbackCtx := logtags.WithTags(context.Background(), logtags.FromContext(ctx))
+ if err := contextutil.RunWithTimeout(
+ rollbackCtx, "learner rollback", rollbackTimeout, rollbackFn,
+ ); err != nil {
+ log.Infof(ctx,
+ "failed to rollback learner %s, abandoning it for the replicate queue %v", target, err)
+ r.store.replicateQueue.MaybeAddAsync(ctx, r, r.store.Clock().Now())
+ } else {
+ log.Infof(ctx, "rolled back learner %s to %s", replDesc, newDesc)
+ }
+}
+
+func (r *Replica) addReplicaLegacyPreemptiveSnapshot(
ctx context.Context,
- changeType roachpb.ReplicaChangeType,
target roachpb.ReplicationTarget,
desc *roachpb.RangeDescriptor,
priority SnapshotRequest_Priority,
reason storagepb.RangeLogEventReason,
details string,
-) (_ *roachpb.RangeDescriptor, _ error) {
+) (*roachpb.RangeDescriptor, error) {
if desc == nil {
return nil, errors.Errorf("%s: the current RangeDescriptor must not be nil", r)
}
@@ -881,70 +1067,93 @@ func (r *Replica) changeReplicas(
}
}
- generationComparableEnabled := r.store.ClusterSettings().Version.IsActive(cluster.VersionGenerationComparable)
- rangeID := desc.RangeID
updatedDesc := *desc
+ generationComparableEnabled := r.store.ClusterSettings().Version.IsActive(cluster.VersionGenerationComparable)
if generationComparableEnabled {
updatedDesc.IncrementGeneration()
updatedDesc.GenerationComparable = proto.Bool(true)
}
updatedDesc.SetReplicas(desc.Replicas().DeepCopy())
- switch changeType {
- case roachpb.ADD_REPLICA:
- // If the replica exists on the remote node, no matter in which store,
- // abort the replica add.
- if nodeUsed {
- if repDescIdx != -1 {
- return nil, errors.Errorf("%s: unable to add replica %v which is already present", r, repDesc)
- }
- return nil, errors.Errorf("%s: unable to add replica %v; node already has a replica", r, repDesc)
+ // If the replica exists on the remote node, no matter in which store,
+ // abort the replica add.
+ if nodeUsed {
+ if repDescIdx != -1 {
+ return nil, errors.Errorf("%s: unable to add replica %v which is already present", r, repDesc)
}
+ return nil, errors.Errorf("%s: unable to add replica %v; node already has a replica", r, repDesc)
+ }
- // Send a pre-emptive snapshot. Note that the replica to which this
- // snapshot is addressed has not yet had its replica ID initialized; this
- // is intentional, and serves to avoid the following race with the replica
- // GC queue:
- //
- // - snapshot received, a replica is lazily created with the "real" replica ID
- // - the replica is eligible for GC because it is not yet a member of the range
- // - GC queue runs, creating a raft tombstone with the replica's ID
- // - the replica is added to the range
- // - lazy creation of the replica fails due to the raft tombstone
- //
- // Instead, the replica GC queue will create a tombstone with replica ID
- // zero, which is never legitimately used, and thus never interferes with
- // raft operations. Racing with the replica GC queue can still partially
- // negate the benefits of pre-emptive snapshots, but that is a recoverable
- // degradation, not a catastrophic failure.
- //
- // NB: A closure is used here so that we can release the snapshot as soon
- // as it has been applied on the remote and before the ChangeReplica
- // operation is processed. This is important to allow other ranges to make
- // progress which might be required for this ChangeReplicas operation to
- // complete. See #10409.
- if err := r.sendSnapshot(ctx, repDesc, SnapshotRequest_PREEMPTIVE, priority); err != nil {
- return nil, err
- }
+ // Send a pre-emptive snapshot. Note that the replica to which this
+ // snapshot is addressed has not yet had its replica ID initialized; this
+ // is intentional, and serves to avoid the following race with the replica
+ // GC queue:
+ //
+ // - snapshot received, a replica is lazily created with the "real" replica ID
+ // - the replica is eligible for GC because it is not yet a member of the range
+ // - GC queue runs, creating a raft tombstone with the replica's ID
+ // - the replica is added to the range
+ // - lazy creation of the replica fails due to the raft tombstone
+ //
+ // Instead, the replica GC queue will create a tombstone with replica ID
+ // zero, which is never legitimately used, and thus never interferes with
+ // raft operations. Racing with the replica GC queue can still partially
+ // negate the benefits of pre-emptive snapshots, but that is a recoverable
+ // degradation, not a catastrophic failure.
+ //
+ // NB: A closure is used here so that we can release the snapshot as soon
+ // as it has been applied on the remote and before the ChangeReplica
+ // operation is processed. This is important to allow other ranges to make
+ // progress which might be required for this ChangeReplicas operation to
+ // complete. See #10409.
+ if err := r.sendSnapshot(ctx, repDesc, SnapshotRequest_PREEMPTIVE, priority); err != nil {
+ return nil, err
+ }
- repDesc.ReplicaID = updatedDesc.NextReplicaID
- updatedDesc.NextReplicaID++
- updatedDesc.AddReplica(repDesc)
+ repDesc.ReplicaID = updatedDesc.NextReplicaID
+ updatedDesc.NextReplicaID++
+ updatedDesc.AddReplica(repDesc)
- case roachpb.REMOVE_REPLICA:
- // If that exact node-store combination does not have the replica,
- // abort the removal.
- if repDescIdx == -1 {
- return nil, errors.Errorf("%s: unable to remove replica %v which is not present", r, repDesc)
- }
- if _, ok := updatedDesc.RemoveReplica(repDesc.NodeID, repDesc.StoreID); !ok {
- return nil, errors.Errorf("%s: unable to remove replica %v which is not present", r, repDesc)
- }
+ err := execChangeReplicasTxn(ctx, r.store, roachpb.ADD_REPLICA, desc, repDesc, &updatedDesc, reason, details)
+ return &updatedDesc, err
+}
+
+func (r *Replica) removeReplica(
+ ctx context.Context,
+ target roachpb.ReplicationTarget,
+ desc *roachpb.RangeDescriptor,
+ priority SnapshotRequest_Priority,
+ reason storagepb.RangeLogEventReason,
+ details string,
+) (*roachpb.RangeDescriptor, error) {
+ if desc == nil {
+ return nil, errors.Errorf("%s: the current RangeDescriptor must not be nil", r)
}
+ updatedDesc := *desc
+ updatedDesc.SetReplicas(desc.Replicas().DeepCopy())
+ // If that exact node-store combination does not have the replica,
+ // abort the removal.
+ removed, ok := updatedDesc.RemoveReplica(target.NodeID, target.StoreID)
+ if !ok {
+ return nil, errors.Errorf("%s: unable to remove replica %v which is not present", r, target)
+ }
+ err := execChangeReplicasTxn(ctx, r.store, roachpb.REMOVE_REPLICA, desc, removed, &updatedDesc, reason, details)
+ return &updatedDesc, err
+}
+func execChangeReplicasTxn(
+ ctx context.Context,
+ store *Store,
+ changeType roachpb.ReplicaChangeType,
+ desc *roachpb.RangeDescriptor,
+ repDesc roachpb.ReplicaDescriptor,
+ updatedDesc *roachpb.RangeDescriptor,
+ reason storagepb.RangeLogEventReason,
+ details string,
+) error {
descKey := keys.RangeDescriptorKey(desc.StartKey)
- if err := r.store.DB().Txn(ctx, func(ctx context.Context, txn *client.Txn) error {
+ if err := store.DB().Txn(ctx, func(ctx context.Context, txn *client.Txn) error {
log.Event(ctx, "attempting txn")
txn.SetDebugName(replicaChangeTxnName)
dbDescValue, err := conditionalGetDescValueFromDB(ctx, txn, desc)
@@ -958,7 +1167,7 @@ func (r *Replica) changeReplicas(
// Important: the range descriptor must be the first thing touched in the transaction
// so the transaction record is co-located with the range being modified.
- if err := updateRangeDescriptor(b, descKey, dbDescValue, &updatedDesc); err != nil {
+ if err := updateRangeDescriptor(b, descKey, dbDescValue, updatedDesc); err != nil {
return err
}
@@ -969,8 +1178,8 @@ func (r *Replica) changeReplicas(
}
// Log replica change into range event log.
- if err := r.store.logChange(
- ctx, txn, changeType, repDesc, updatedDesc, reason, details,
+ if err := store.logChange(
+ ctx, txn, changeType, repDesc, *updatedDesc, reason, details,
); err != nil {
return err
}
@@ -980,7 +1189,7 @@ func (r *Replica) changeReplicas(
b := txn.NewBatch()
// Update range descriptor addressing record(s).
- if err := updateRangeAddressing(b, &updatedDesc); err != nil {
+ if err := updateRangeAddressing(b, updatedDesc); err != nil {
return err
}
@@ -990,7 +1199,7 @@ func (r *Replica) changeReplicas(
ChangeReplicasTrigger: &roachpb.ChangeReplicasTrigger{
ChangeType: changeType,
Replica: repDesc,
- Desc: &updatedDesc,
+ Desc: updatedDesc,
},
},
})
@@ -1005,10 +1214,10 @@ func (r *Replica) changeReplicas(
if msg, ok := maybeDescriptorChangedError(desc, err); ok {
err = &benignError{errors.New(msg)}
}
- return nil, errors.Wrapf(err, "change replicas of r%d failed", rangeID)
+ return errors.Wrapf(err, "change replicas of r%d failed", desc.RangeID)
}
log.Event(ctx, "txn complete")
- return &updatedDesc, nil
+ return nil
}
// sendSnapshot sends a snapshot of the replica state to the specified replica.
diff --git a/pkg/storage/replica_learners_test.go b/pkg/storage/replica_learners_test.go
new file mode 100644
index 000000000000..3147ed0ae821
--- /dev/null
+++ b/pkg/storage/replica_learners_test.go
@@ -0,0 +1,517 @@
+// Copyright 2019 The Cockroach Authors.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+package storage_test
+
+import (
+ "context"
+ "fmt"
+ "strings"
+ "sync/atomic"
+ "testing"
+ "time"
+
+ "github.com/cockroachdb/cockroach/pkg/base"
+ "github.com/cockroachdb/cockroach/pkg/roachpb"
+ "github.com/cockroachdb/cockroach/pkg/storage"
+ "github.com/cockroachdb/cockroach/pkg/testutils"
+ "github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
+ "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
+ "github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
+ "github.com/cockroachdb/cockroach/pkg/util"
+ "github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
+ "github.com/cockroachdb/cockroach/pkg/util/leaktest"
+ "github.com/cockroachdb/cockroach/pkg/util/metric"
+ "github.com/cockroachdb/cockroach/pkg/util/tracing"
+ "github.com/cockroachdb/errors"
+ "github.com/stretchr/testify/require"
+ "go.etcd.io/etcd/raft"
+)
+
+// TODO(dan): Test learners and quota pool.
+// TODO(dan): Grep the codebase for "preemptive" and audit.
+// TODO(dan): Write a test like TestLearnerAdminChangeReplicasRace for the
+// replicate queue leadership transfer race.
+
+type learnerTestKnobs struct {
+ storeKnobs storage.StoreTestingKnobs
+ replicaAddStopAfterLearnerAtomic int64
+}
+
+func makeLearnerTestKnobs() (base.TestingKnobs, *learnerTestKnobs) {
+ var k learnerTestKnobs
+ k.storeKnobs.ReplicaAddStopAfterLearnerSnapshot = func() bool {
+ return atomic.LoadInt64(&k.replicaAddStopAfterLearnerAtomic) > 0
+ }
+ return base.TestingKnobs{Store: &k.storeKnobs}, &k
+}
+
+func getFirstStoreReplica(
+ t *testing.T, s serverutils.TestServerInterface, key roachpb.Key,
+) (*storage.Store, *storage.Replica) {
+ t.Helper()
+ store, err := s.GetStores().(*storage.Stores).GetStore(s.GetFirstStoreID())
+ require.NoError(t, err)
+ var repl *storage.Replica
+ testutils.SucceedsSoon(t, func() error {
+ repl = store.LookupReplica(roachpb.RKey(key))
+ if repl == nil {
+ return errors.New(`could not find replica`)
+ }
+ return nil
+ })
+ return store, repl
+}
+
+func getFirstStoreMetric(t *testing.T, s serverutils.TestServerInterface, name string) int64 {
+ t.Helper()
+ store, err := s.GetStores().(*storage.Stores).GetStore(s.GetFirstStoreID())
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ var c int64
+ var found bool
+ store.Registry().Each(func(n string, v interface{}) {
+ if name == n {
+ switch t := v.(type) {
+ case *metric.Counter:
+ c = t.Count()
+ found = true
+ case *metric.Gauge:
+ c = t.Value()
+ found = true
+ }
+ }
+ })
+ if !found {
+ panic(fmt.Sprintf("couldn't find metric %s", name))
+ }
+ return c
+}
+
+func TestAddReplicaViaLearner(t *testing.T) {
+ defer leaktest.AfterTest(t)()
+ // The happy case! \o/
+
+ blockUntilSnapshotCh := make(chan struct{})
+ blockSnapshotsCh := make(chan struct{})
+ knobs, ltk := makeLearnerTestKnobs()
+ ltk.storeKnobs.ReceiveSnapshot = func(h *storage.SnapshotRequest_Header) error {
+ close(blockUntilSnapshotCh)
+ select {
+ case <-blockSnapshotsCh:
+ case <-time.After(10 * time.Second):
+ return errors.New(`test timed out`)
+ }
+ return nil
+ }
+ ctx := context.Background()
+ tc := testcluster.StartTestCluster(t, 2, base.TestClusterArgs{
+ ServerArgs: base.TestServerArgs{Knobs: knobs},
+ ReplicationMode: base.ReplicationManual,
+ })
+ defer tc.Stopper().Stop(ctx)
+ db := sqlutils.MakeSQLRunner(tc.ServerConn(0))
+ db.Exec(t, `SET CLUSTER SETTING kv.learner_replicas.enabled = true`)
+
+ scratchStartKey := tc.ScratchRange(t)
+
+ g := ctxgroup.WithContext(ctx)
+ g.GoCtx(func(ctx context.Context) error {
+ _, err := tc.AddReplicas(scratchStartKey, tc.Target(1))
+ return err
+ })
+
+ // Wait until the snapshot starts, which happens after the learner has been
+ // added.
+ <-blockUntilSnapshotCh
+ desc := tc.LookupRangeOrFatal(t, scratchStartKey)
+ require.Len(t, desc.Replicas().Voters(), 1)
+ require.Len(t, desc.Replicas().Learners(), 1)
+
+ // Verify that raft thinks it's a learner, too.
+ _, repl := getFirstStoreReplica(t, tc.Server(0), scratchStartKey)
+ testutils.SucceedsSoon(t, func() error {
+ for _, p := range repl.RaftStatus().Progress {
+ if p.IsLearner {
+ return nil
+ }
+ }
+ return errors.New(`expected some replica to be a learner`)
+ })
+
+ var voters, nonVoters string
+ db.QueryRow(t,
+ `SELECT array_to_string(replicas, ','), array_to_string(non_quorum_replicas, ',') FROM crdb_internal.ranges_no_leases WHERE range_id = $1`,
+ desc.RangeID,
+ ).Scan(&voters, &nonVoters)
+ require.Equal(t, `1`, voters)
+ require.Equal(t, `2`, nonVoters)
+
+ // Unblock the snapshot and let the learner get promoted to a voter.
+ close(blockSnapshotsCh)
+ require.NoError(t, g.Wait())
+
+ desc = tc.LookupRangeOrFatal(t, scratchStartKey)
+ require.Len(t, desc.Replicas().Voters(), 2)
+ require.Len(t, desc.Replicas().Learners(), 0)
+ require.Equal(t, int64(1), getFirstStoreMetric(t, tc.Server(1), `range.snapshots.learner-applied`))
+}
+
+func TestLearnerSnapshotFailsRollback(t *testing.T) {
+ defer leaktest.AfterTest(t)()
+
+ var rejectSnapshots int64
+ knobs, ltk := makeLearnerTestKnobs()
+ ltk.storeKnobs.ReceiveSnapshot = func(h *storage.SnapshotRequest_Header) error {
+ if atomic.LoadInt64(&rejectSnapshots) > 0 {
+ return errors.New(`nope`)
+ }
+ return nil
+ }
+ ctx := context.Background()
+ tc := testcluster.StartTestCluster(t, 2, base.TestClusterArgs{
+ ServerArgs: base.TestServerArgs{Knobs: knobs},
+ ReplicationMode: base.ReplicationManual,
+ })
+ defer tc.Stopper().Stop(ctx)
+ db := sqlutils.MakeSQLRunner(tc.ServerConn(0))
+ db.Exec(t, `SET CLUSTER SETTING kv.learner_replicas.enabled = true`)
+
+ scratchStartKey := tc.ScratchRange(t)
+ atomic.StoreInt64(&rejectSnapshots, 1)
+ _, err := tc.AddReplicas(scratchStartKey, tc.Target(1))
+ // TODO(dan): It'd be nice if we could cancel the `AddReplicas` context before
+ // returning the error from the `ReceiveSnapshot` knob to test the codepath
+ // that uses a new context for the rollback, but plumbing that context is
+ // annoying.
+ if !testutils.IsError(err, `remote couldn't accept LEARNER snapshot`) {
+ t.Fatalf(`expected "remote couldn't accept LEARNER snapshot" error got: %+v`, err)
+ }
+
+ // Make sure we cleaned up after ourselves (by removing the learner).
+ desc := tc.LookupRangeOrFatal(t, scratchStartKey)
+ require.Empty(t, desc.Replicas().Learners())
+}
+
+func TestMergeWithLearner(t *testing.T) {
+ defer leaktest.AfterTest(t)()
+ ctx := context.Background()
+
+ knobs, ltk := makeLearnerTestKnobs()
+ tc := testcluster.StartTestCluster(t, 2, base.TestClusterArgs{
+ ServerArgs: base.TestServerArgs{Knobs: knobs},
+ ReplicationMode: base.ReplicationManual,
+ })
+ defer tc.Stopper().Stop(ctx)
+ db := sqlutils.MakeSQLRunner(tc.ServerConn(0))
+ db.Exec(t, `SET CLUSTER SETTING kv.learner_replicas.enabled = true`)
+
+ scratchStartKey := tc.ScratchRange(t)
+ _, _, err := tc.SplitRange(scratchStartKey.Next())
+ require.NoError(t, err)
+
+ atomic.StoreInt64(&ltk.replicaAddStopAfterLearnerAtomic, 1)
+ _ = tc.AddReplicasOrFatal(t, scratchStartKey, tc.Target(1))
+ atomic.StoreInt64(&ltk.replicaAddStopAfterLearnerAtomic, 0)
+
+ _, err = tc.Server(0).MergeRanges(scratchStartKey)
+ if !testutils.IsError(err, `ranges not collocated`) {
+ t.Fatalf(`expected "ranges not collocated" error got: %+v`, err)
+ }
+ // WIP what happens now though?
+}
+
+func TestSplitWithLearner(t *testing.T) {
+ defer leaktest.AfterTest(t)()
+ ctx := context.Background()
+
+ knobs, ltk := makeLearnerTestKnobs()
+ tc := testcluster.StartTestCluster(t, 2, base.TestClusterArgs{
+ ServerArgs: base.TestServerArgs{Knobs: knobs},
+ ReplicationMode: base.ReplicationManual,
+ })
+ defer tc.Stopper().Stop(ctx)
+ db := sqlutils.MakeSQLRunner(tc.ServerConn(0))
+ db.Exec(t, `SET CLUSTER SETTING kv.learner_replicas.enabled = true`)
+
+ scratchStartKey := tc.ScratchRange(t)
+
+ atomic.StoreInt64(&ltk.replicaAddStopAfterLearnerAtomic, 1)
+ _ = tc.AddReplicasOrFatal(t, scratchStartKey, tc.Target(1))
+ atomic.StoreInt64(&ltk.replicaAddStopAfterLearnerAtomic, 0)
+
+ // Splitting a range with a learner is allowed. This orphans a learner on each
+ // side of the split, but the replication queue will eventually clean them up.
+ left, right, err := tc.SplitRange(scratchStartKey.Next())
+ require.NoError(t, err)
+ require.Len(t, left.Replicas().Learners(), 1)
+ require.Len(t, right.Replicas().Learners(), 1)
+}
+
+func TestReplicateQueueSeesLearner(t *testing.T) {
+ defer leaktest.AfterTest(t)()
+ // NB also see TestAllocatorRemoveLearner for a lower-level test.
+
+ ctx := context.Background()
+ knobs, ltk := makeLearnerTestKnobs()
+ tc := testcluster.StartTestCluster(t, 3, base.TestClusterArgs{
+ ServerArgs: base.TestServerArgs{Knobs: knobs},
+ ReplicationMode: base.ReplicationManual,
+ })
+ defer tc.Stopper().Stop(ctx)
+ db := sqlutils.MakeSQLRunner(tc.ServerConn(0))
+ db.Exec(t, `SET CLUSTER SETTING kv.learner_replicas.enabled = true`)
+
+ scratchStartKey := tc.ScratchRange(t)
+ atomic.StoreInt64(&ltk.replicaAddStopAfterLearnerAtomic, 1)
+ _ = tc.AddReplicasOrFatal(t, scratchStartKey, tc.Target(1))
+ atomic.StoreInt64(&ltk.replicaAddStopAfterLearnerAtomic, 0)
+
+ // Run the replicate queue.
+ store, repl := getFirstStoreReplica(t, tc.Server(0), scratchStartKey)
+ require.Equal(t, int64(0), getFirstStoreMetric(t, tc.Server(0), `queue.replicate.removelearnerreplica`))
+ _, errMsg, err := store.ManuallyEnqueue(ctx, "replicate", repl, true /* skipShouldQueue */)
+ require.NoError(t, err)
+ require.Equal(t, ``, errMsg)
+ require.Equal(t, int64(1), getFirstStoreMetric(t, tc.Server(0), `queue.replicate.removelearnerreplica`))
+
+ // Make sure it deleted the learner.
+ desc := tc.LookupRangeOrFatal(t, scratchStartKey)
+ require.Empty(t, desc.Replicas().Learners())
+
+ // Bonus points: the replicate queue keeps processing until there is nothing
+ // to do, so it should have upreplicated the range to 3.
+ require.Len(t, desc.Replicas().Voters(), 3)
+}
+
+func TestGCQueueSeesLearner(t *testing.T) {
+ defer leaktest.AfterTest(t)()
+ ctx := context.Background()
+ knobs, ltk := makeLearnerTestKnobs()
+ tc := testcluster.StartTestCluster(t, 2, base.TestClusterArgs{
+ ServerArgs: base.TestServerArgs{Knobs: knobs},
+ ReplicationMode: base.ReplicationManual,
+ })
+ defer tc.Stopper().Stop(ctx)
+ db := sqlutils.MakeSQLRunner(tc.ServerConn(0))
+ db.Exec(t, `SET CLUSTER SETTING kv.learner_replicas.enabled = true`)
+
+ scratchStartKey := tc.ScratchRange(t)
+ atomic.StoreInt64(&ltk.replicaAddStopAfterLearnerAtomic, 1)
+ _ = tc.AddReplicasOrFatal(t, scratchStartKey, tc.Target(1))
+ atomic.StoreInt64(&ltk.replicaAddStopAfterLearnerAtomic, 0)
+
+ // Run the replicaGC queue.
+ store, repl := getFirstStoreReplica(t, tc.Server(1), scratchStartKey)
+ trace, errMsg, err := store.ManuallyEnqueue(ctx, "replicaGC", repl, true /* skipShouldQueue */)
+ require.NoError(t, err)
+ require.Equal(t, ``, errMsg)
+ const msg = `not gc'able, replica is still in range descriptor: (n2,s2):2LEARNER`
+ require.Contains(t, tracing.FormatRecordedSpans(trace), msg)
+
+ // Make sure it didn't collect the learner.
+ desc := tc.LookupRangeOrFatal(t, scratchStartKey)
+ require.NotEmpty(t, desc.Replicas().Learners())
+}
+
+func TestRaftSnapshotQueueSeesLearner(t *testing.T) {
+ defer leaktest.AfterTest(t)()
+ ctx := context.Background()
+ blockSnapshotsCh := make(chan struct{})
+ knobs, ltk := makeLearnerTestKnobs()
+ ltk.storeKnobs.ReceiveSnapshot = func(h *storage.SnapshotRequest_Header) error {
+ select {
+ case <-blockSnapshotsCh:
+ case <-time.After(10 * time.Second):
+ return errors.New(`test timed out`)
+ }
+ return nil
+ }
+ tc := testcluster.StartTestCluster(t, 2, base.TestClusterArgs{
+ ServerArgs: base.TestServerArgs{Knobs: knobs},
+ ReplicationMode: base.ReplicationManual,
+ })
+ defer tc.Stopper().Stop(ctx)
+ db := sqlutils.MakeSQLRunner(tc.ServerConn(0))
+ db.Exec(t, `SET CLUSTER SETTING kv.learner_replicas.enabled = true`)
+
+ // Create a learner replica.
+ scratchStartKey := tc.ScratchRange(t)
+ g := ctxgroup.WithContext(ctx)
+ g.GoCtx(func(ctx context.Context) error {
+ _, err := tc.AddReplicas(scratchStartKey, tc.Target(1))
+ return err
+ })
+
+ // Wait until raft knows that the learner needs a snapshot.
+ store, repl := getFirstStoreReplica(t, tc.Server(0), scratchStartKey)
+ testutils.SucceedsSoon(t, func() error {
+ for _, p := range repl.RaftStatus().Progress {
+ if p.State == raft.ProgressStateSnapshot {
+ return nil
+ }
+ }
+ return errors.New(`expected some replica to need a snapshot`)
+ })
+
+ // Check the metrics are what we expect before.
+ generatedBefore := getFirstStoreMetric(t, tc.Server(0), `range.snapshots.generated`)
+ raftAppliedBefore := getFirstStoreMetric(t, tc.Server(0), `range.snapshots.normal-applied`)
+
+ // Run the raftsnapshot queue.
+ trace, errMsg, err := store.ManuallyEnqueue(ctx, "raftsnapshot", repl, true /* skipShouldQueue */)
+ require.NoError(t, err)
+ require.Equal(t, ``, errMsg)
+ const msg = `not sending snapshot type RAFT to learner: (n2,s2):2LEARNER`
+ require.Contains(t, tracing.FormatRecordedSpans(trace), msg)
+
+ // Make sure it didn't send any RAFT snapshots.
+ require.Equal(t, generatedBefore, getFirstStoreMetric(t, tc.Server(0), `range.snapshots.generated`))
+ require.Equal(t, raftAppliedBefore, getFirstStoreMetric(t, tc.Server(0), `range.snapshots.normal-applied`))
+
+ close(blockSnapshotsCh)
+ require.NoError(t, g.Wait())
+}
+
+// This test verifies the result of a race between the replicate queue running
+// while an AdminChangeReplicas is adding a replica.
+func TestLearnerAdminChangeReplicasRace(t *testing.T) {
+ defer leaktest.AfterTest(t)()
+
+ blockUntilSnapshotCh := make(chan struct{}, 2)
+ blockSnapshotsCh := make(chan struct{})
+ knobs, ltk := makeLearnerTestKnobs()
+ ltk.storeKnobs.ReceiveSnapshot = func(h *storage.SnapshotRequest_Header) error {
+ blockUntilSnapshotCh <- struct{}{}
+ <-blockSnapshotsCh
+ return nil
+ }
+ ctx := context.Background()
+ tc := testcluster.StartTestCluster(t, 2, base.TestClusterArgs{
+ ServerArgs: base.TestServerArgs{Knobs: knobs},
+ ReplicationMode: base.ReplicationManual,
+ })
+ defer tc.Stopper().Stop(ctx)
+ db := sqlutils.MakeSQLRunner(tc.ServerConn(0))
+ db.Exec(t, `SET CLUSTER SETTING kv.learner_replicas.enabled = true`)
+
+ scratchStartKey := tc.ScratchRange(t)
+
+ g := ctxgroup.WithContext(ctx)
+ g.GoCtx(func(ctx context.Context) error {
+ _, err := tc.AddReplicas(scratchStartKey, tc.Target(1))
+ return err
+ })
+
+ // Wait until the snapshot starts, which happens after the learner has been
+ // added.
+ <-blockUntilSnapshotCh
+
+ // Removes the learner out from under the coordinator running on behalf of
+ // AddReplicas.
+ _, err := tc.RemoveReplicas(scratchStartKey, tc.Target(1))
+ require.NoError(t, err)
+ desc := tc.LookupRangeOrFatal(t, scratchStartKey)
+ require.Len(t, desc.Replicas().Voters(), 1)
+ require.Len(t, desc.Replicas().Learners(), 0)
+
+ // Unblock the snapshot, and surprise AddReplicas. It should retry and fail
+ // because the descriptor has changed since the AddReplicas command started.
+ close(blockSnapshotsCh)
+ if err := g.Wait(); !testutils.IsError(err, `descriptor changed`) {
+ t.Fatalf(`expected "descriptor changed" error got: %+v`, err)
+ }
+ desc = tc.LookupRangeOrFatal(t, scratchStartKey)
+ require.Len(t, desc.Replicas().Voters(), 1)
+ require.Len(t, desc.Replicas().Learners(), 0)
+}
+
+func TestLearnerNoAcceptLease(t *testing.T) {
+ defer leaktest.AfterTest(t)()
+ ctx := context.Background()
+ knobs, ltk := makeLearnerTestKnobs()
+ tc := testcluster.StartTestCluster(t, 2, base.TestClusterArgs{
+ ServerArgs: base.TestServerArgs{Knobs: knobs},
+ ReplicationMode: base.ReplicationManual,
+ })
+ defer tc.Stopper().Stop(ctx)
+ db := sqlutils.MakeSQLRunner(tc.ServerConn(0))
+ db.Exec(t, `SET CLUSTER SETTING kv.learner_replicas.enabled = true`)
+
+ scratchStartKey := tc.ScratchRange(t)
+ atomic.StoreInt64(&ltk.replicaAddStopAfterLearnerAtomic, 1)
+ _ = tc.AddReplicasOrFatal(t, scratchStartKey, tc.Target(1))
+ atomic.StoreInt64(&ltk.replicaAddStopAfterLearnerAtomic, 0)
+
+ desc := tc.LookupRangeOrFatal(t, scratchStartKey)
+ err := tc.TransferRangeLease(desc, tc.Target(1))
+ if !testutils.IsError(err, `cannot transfer lease to replica of type LEARNER`) {
+ t.Fatalf(`expected "cannot transfer lease to replica of type LEARNER" error got: %+v`, err)
+ }
+}
+
+func TestLearnerFollowerRead(t *testing.T) {
+ defer leaktest.AfterTest(t)()
+ t.Skip(`WIP`)
+
+ if util.RaceEnabled {
+ // Limiting how long transactions can run does not work well with race
+ // unless we're extremely lenient, which drives up the test duration.
+ t.Skip("skipping under race")
+ }
+
+ ctx := context.Background()
+ knobs, ltk := makeLearnerTestKnobs()
+ tc := testcluster.StartTestCluster(t, 2, base.TestClusterArgs{
+ ServerArgs: base.TestServerArgs{Knobs: knobs},
+ ReplicationMode: base.ReplicationManual,
+ })
+ defer tc.Stopper().Stop(ctx)
+ db := sqlutils.MakeSQLRunner(tc.ServerConn(0))
+ db.Exec(t, `SET CLUSTER SETTING kv.learner_replicas.enabled = true`)
+ db.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.target_duration = $1`, testingTargetDuration)
+ db.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.close_fraction = $1`, closeFraction)
+ db.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.follower_reads_enabled = true`)
+
+ scratchStartKey := tc.ScratchRange(t)
+ atomic.StoreInt64(&ltk.replicaAddStopAfterLearnerAtomic, 1)
+ scratchDesc := tc.AddReplicasOrFatal(t, scratchStartKey, tc.Target(1))
+ atomic.StoreInt64(&ltk.replicaAddStopAfterLearnerAtomic, 0)
+
+ val := roachpb.MakeValueFromString("foo")
+ require.NoError(t, tc.Server(0).DB().Put(ctx, scratchStartKey, &val))
+
+ req := roachpb.BatchRequest{Header: roachpb.Header{
+ RangeID: scratchDesc.RangeID,
+ Timestamp: tc.Server(0).Clock().Now(),
+ }}
+ req.Add(&roachpb.ScanRequest{RequestHeader: roachpb.RequestHeader{
+ Key: scratchDesc.StartKey.AsRawKey(), EndKey: scratchDesc.EndKey.AsRawKey(),
+ }})
+
+ _, repl := getFirstStoreReplica(t, tc.Server(1), scratchStartKey)
+ testutils.SucceedsSoon(t, func() error {
+ ctx, collect, cancel := tracing.ContextWithRecordingSpan(ctx, "manual read request")
+ defer cancel()
+ _, pErr := repl.Send(ctx, req)
+ err := pErr.GoError()
+ if !testutils.IsError(err, `not lease holder`) {
+ return errors.Errorf(`expected "not lease holder" error got: %+v`, err)
+ }
+ const msg = `WIP not getting closed timestamps for some reason`
+ formattedTrace := tracing.FormatRecordedSpans(collect())
+ if !strings.Contains(formattedTrace, msg) {
+ return errors.Errorf("expected a trace with `%s` got:\n%s", msg, formattedTrace)
+ }
+ return nil
+ })
+}
diff --git a/pkg/storage/replica_proposal_buf.go b/pkg/storage/replica_proposal_buf.go
index 94595aff74b8..22ccc1e421a9 100644
--- a/pkg/storage/replica_proposal_buf.go
+++ b/pkg/storage/replica_proposal_buf.go
@@ -445,11 +445,18 @@ func (b *propBuf) FlushLockedWithRaftGroup(raftGroup *raft.RawNode) error {
continue
}
- if err := raftGroup.ProposeConfChange(raftpb.ConfChange{
+ confChange := raftpb.ConfChange{
Type: changeTypeInternalToRaft[crt.ChangeType],
NodeID: uint64(crt.Replica.ReplicaID),
Context: encodedCtx,
- }); err != nil && err != raft.ErrProposalDropped {
+ }
+ if confChange.Type == raftpb.ConfChangeAddNode &&
+ crt.Replica.GetType() == roachpb.ReplicaType_LEARNER {
+ confChange.Type = raftpb.ConfChangeAddLearnerNode
+ }
+ if err := raftGroup.ProposeConfChange(
+ confChange,
+ ); err != nil && err != raft.ErrProposalDropped {
// Silently ignore dropped proposals (they were always silently
// ignored prior to the introduction of ErrProposalDropped).
// TODO(bdarnell): Handle ErrProposalDropped better.
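
As a rough restatement of the mapping above (a sketch, not part of the diff; the helper name is hypothetical): an ADD of a LEARNER-typed replica is proposed to raft as a ConfChangeAddLearnerNode, while everything else remains a plain ConfChangeAddNode.

```go
// confChangeForAdd is a hypothetical helper restating the mapping applied in
// the hunk above; it is not part of this change.
func confChangeForAdd(rep roachpb.ReplicaDescriptor, encodedCtx []byte) raftpb.ConfChange {
	typ := raftpb.ConfChangeAddNode
	if rep.GetType() == roachpb.ReplicaType_LEARNER {
		// Learners join the raft group without a vote.
		typ = raftpb.ConfChangeAddLearnerNode
	}
	return raftpb.ConfChange{
		Type:    typ,
		NodeID:  uint64(rep.ReplicaID),
		Context: encodedCtx,
	}
}
```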
diff --git a/pkg/storage/replica_raftstorage.go b/pkg/storage/replica_raftstorage.go
index 494e4a6cd04f..0ec04ceb1dda 100644
--- a/pkg/storage/replica_raftstorage.go
+++ b/pkg/storage/replica_raftstorage.go
@@ -539,6 +539,8 @@ func snapshot(
for _, rep := range desc.Replicas().All() {
cs.Nodes = append(cs.Nodes, uint64(rep.ReplicaID))
}
+ // WIP pretty sure the learners need to go in cs.Learners, but no test seems
+ // to be catching this right now.
term, err := term(ctx, rsl, snap, rangeID, eCache, appliedIndex)
if err != nil {
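
If the WIP note above pans out, the fix might look roughly like the following (a sketch under that assumption, not part of this diff): voters keep going into the ConfState's Nodes while learners are listed under Learners, so a restored snapshot reproduces the same voter/learner split.

```go
// Hypothetical sketch of splitting the snapshot's raft ConfState by replica
// type, per the WIP comment above; not part of this change.
var cs raftpb.ConfState
for _, rep := range desc.Replicas().Voters() {
	cs.Nodes = append(cs.Nodes, uint64(rep.ReplicaID))
}
for _, rep := range desc.Replicas().Learners() {
	cs.Learners = append(cs.Learners, uint64(rep.ReplicaID))
}
```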
diff --git a/pkg/storage/replica_range_lease.go b/pkg/storage/replica_range_lease.go
index 7b67fdcf239d..1d6a937b1efe 100644
--- a/pkg/storage/replica_range_lease.go
+++ b/pkg/storage/replica_range_lease.go
@@ -675,6 +675,21 @@ func (r *Replica) AdminTransferLease(ctx context.Context, target roachpb.StoreID
return nil, nil, errors.Errorf("unable to find store %d in range %+v", target, desc)
}
+ // For now, don't allow replicas of type LEARNER to be leaseholders. There's
+ // no reason this wouldn't work in principle, but it seems inadvisable. In
+ // particular, learners can't become raft leaders, so we wouldn't be able to
+ // co-locate the leaseholder + raft leader, which is going to affect tail
+ // latencies. Additionally, as of the time of writing, learner replicas are
+ // only used for a short time in replica addition, so it's not worth working
+ // out the edge cases. If we decide to start using long-lived learners at
+ // some point, that math may change.
+ //
+ // If this check is removed at some point, the filtering of learners on the
+ // sending side would have to be removed as well.
+ if t := nextLeaseHolder.GetType(); t != roachpb.ReplicaType_VOTER {
+ return nil, nil, errors.Errorf(`cannot transfer lease to replica of type %s`, t)
+ }
+
if nextLease, ok := r.mu.pendingLeaseRequest.RequestPending(); ok &&
nextLease.Replica != nextLeaseHolder {
repDesc, err := r.getReplicaDescriptorRLocked()
diff --git a/pkg/storage/replicate_queue.go b/pkg/storage/replicate_queue.go
index 3126749ca73b..84a06df63982 100644
--- a/pkg/storage/replicate_queue.go
+++ b/pkg/storage/replicate_queue.go
@@ -69,6 +69,12 @@ var (
Measurement: "Replica Removals",
Unit: metric.Unit_COUNT,
}
+ metaReplicateQueueRemoveLearnerReplicaCount = metric.Metadata{
+ Name: "queue.replicate.removelearnerreplica",
+ Help: "Number of learner replica removals attempted by the replicate queue (typically due to internal race conditions)",
+ Measurement: "Replica Removals",
+ Unit: metric.Unit_COUNT,
+ }
metaReplicateQueueRebalanceReplicaCount = metric.Metadata{
Name: "queue.replicate.rebalancereplica",
Help: "Number of replica rebalancer-initiated additions attempted by the replicate queue",
@@ -104,20 +110,22 @@ func (*quorumError) purgatoryErrorMarker() {}
// ReplicateQueueMetrics is the set of metrics for the replicate queue.
type ReplicateQueueMetrics struct {
- AddReplicaCount *metric.Counter
- RemoveReplicaCount *metric.Counter
- RemoveDeadReplicaCount *metric.Counter
- RebalanceReplicaCount *metric.Counter
- TransferLeaseCount *metric.Counter
+ AddReplicaCount *metric.Counter
+ RemoveReplicaCount *metric.Counter
+ RemoveDeadReplicaCount *metric.Counter
+ RemoveLearnerReplicaCount *metric.Counter
+ RebalanceReplicaCount *metric.Counter
+ TransferLeaseCount *metric.Counter
}
func makeReplicateQueueMetrics() ReplicateQueueMetrics {
return ReplicateQueueMetrics{
- AddReplicaCount: metric.NewCounter(metaReplicateQueueAddReplicaCount),
- RemoveReplicaCount: metric.NewCounter(metaReplicateQueueRemoveReplicaCount),
- RemoveDeadReplicaCount: metric.NewCounter(metaReplicateQueueRemoveDeadReplicaCount),
- RebalanceReplicaCount: metric.NewCounter(metaReplicateQueueRebalanceReplicaCount),
- TransferLeaseCount: metric.NewCounter(metaReplicateQueueTransferLeaseCount),
+ AddReplicaCount: metric.NewCounter(metaReplicateQueueAddReplicaCount),
+ RemoveReplicaCount: metric.NewCounter(metaReplicateQueueRemoveReplicaCount),
+ RemoveDeadReplicaCount: metric.NewCounter(metaReplicateQueueRemoveDeadReplicaCount),
+ RemoveLearnerReplicaCount: metric.NewCounter(metaReplicateQueueRemoveLearnerReplicaCount),
+ RebalanceReplicaCount: metric.NewCounter(metaReplicateQueueRebalanceReplicaCount),
+ TransferLeaseCount: metric.NewCounter(metaReplicateQueueTransferLeaseCount),
}
}
@@ -526,6 +534,25 @@ func (rq *replicateQueue) processOneChange(
); err != nil {
return false, err
}
+ case AllocatorRemoveLearner:
+ learnerReplicas := desc.Replicas().Learners()
+ if len(learnerReplicas) == 0 {
+ log.VEventf(ctx, 1, "range of replica %s was identified as having learner replicas, "+
+ "but no learner replicas were found", repl)
+ break
+ }
+ learnerReplica := learnerReplicas[0]
+ rq.metrics.RemoveLearnerReplicaCount.Inc(1)
+ log.VEventf(ctx, 1, "removing learner replica %+v from store", learnerReplica)
+ target := roachpb.ReplicationTarget{
+ NodeID: learnerReplica.NodeID,
+ StoreID: learnerReplica.StoreID,
+ }
+ if err := rq.removeReplica(
+ ctx, repl, target, desc, storagepb.ReasonAbandonedLearner, "", dryRun,
+ ); err != nil {
+ return false, err
+ }
case AllocatorConsiderRebalance:
// The Noop case will result if this replica was queued in order to
// rebalance. Attempt to find a rebalancing target.
@@ -660,7 +687,7 @@ func (rq *replicateQueue) addReplica(
if dryRun {
return nil
}
- if _, err := repl.changeReplicas(ctx, roachpb.ADD_REPLICA, target, desc, priority, reason, details); err != nil {
+ if _, err := repl.addReplica(ctx, target, desc, priority, reason, details); err != nil {
return err
}
rangeInfo := rangeInfoForRepl(repl, desc)
diff --git a/pkg/storage/storagepb/log.go b/pkg/storage/storagepb/log.go
index 53601436458b..0cf93eb155ac 100644
--- a/pkg/storage/storagepb/log.go
+++ b/pkg/storage/storagepb/log.go
@@ -22,4 +22,5 @@ const (
ReasonStoreDecommissioning RangeLogEventReason = "store decommissioning"
ReasonRebalance RangeLogEventReason = "rebalance"
ReasonAdminRequest RangeLogEventReason = "admin request"
+ ReasonAbandonedLearner RangeLogEventReason = "abandoned learner replica"
)
diff --git a/pkg/storage/store_snapshot.go b/pkg/storage/store_snapshot.go
index 20ff98fb09d3..8426c849b834 100644
--- a/pkg/storage/store_snapshot.go
+++ b/pkg/storage/store_snapshot.go
@@ -593,6 +593,12 @@ func (s *Store) shouldAcceptSnapshotData(
func (s *Store) receiveSnapshot(
ctx context.Context, header *SnapshotRequest_Header, stream incomingSnapshotStream,
) error {
+ if fn := s.cfg.TestingKnobs.ReceiveSnapshot; fn != nil {
+ if err := fn(header); err != nil {
+ return sendSnapshotError(stream, err)
+ }
+ }
+
cleanup, rejectionMsg, err := s.reserveSnapshot(ctx, header)
if err != nil {
return err
diff --git a/pkg/storage/testing_knobs.go b/pkg/storage/testing_knobs.go
index bb45d0fa7006..0d1a69a9d907 100644
--- a/pkg/storage/testing_knobs.go
+++ b/pkg/storage/testing_knobs.go
@@ -181,6 +181,16 @@ type StoreTestingKnobs struct {
// TraceAllRaftEvents enables raft event tracing even when the current
// vmodule would not have enabled it.
TraceAllRaftEvents bool
+
+ // ReceiveSnapshot is run after receiving a snapshot header but before
+ // acquiring snapshot quota or doing shouldAcceptSnapshotData checks. If an
+ // error is returned from the hook, it's sent as an ERROR SnapshotResponse.
+ ReceiveSnapshot func(*SnapshotRequest_Header) error
+ // ReplicaAddStopAfterLearnerSnapshot, if it returns true, causes replica
+ // addition to return early: after the learner txn has succeeded and the
+ // LEARNER-type snapshot has been sent, but before the learner is promoted to
+ // a voter. This ensures the `*Replica` will be materialized on the Store when
+ // replica addition returns.
+ ReplicaAddStopAfterLearnerSnapshot func() bool
}
// ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface.
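
The tests above toggle ltk.replicaAddStopAfterLearnerAtomic rather than swapping the knob function itself; a plausible wiring in the test helper (hypothetical, since makeLearnerTestKnobs is not shown in this excerpt, and assuming the surrounding test file's imports) is an atomic flag read by ReplicaAddStopAfterLearnerSnapshot:

```go
// Hypothetical sketch of the learner test knobs helper used by the tests
// above; the real makeLearnerTestKnobs is not part of this excerpt.
type learnerTestKnobs struct {
	storeKnobs                       storage.StoreTestingKnobs
	replicaAddStopAfterLearnerAtomic int64
}

func makeLearnerTestKnobs() (base.TestingKnobs, *learnerTestKnobs) {
	var k learnerTestKnobs
	k.storeKnobs.ReplicaAddStopAfterLearnerSnapshot = func() bool {
		return atomic.LoadInt64(&k.replicaAddStopAfterLearnerAtomic) > 0
	}
	return base.TestingKnobs{Store: &k.storeKnobs}, &k
}
```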
diff --git a/pkg/testutils/serverutils/test_cluster_shim.go b/pkg/testutils/serverutils/test_cluster_shim.go
index 5f57448fb90b..e3ef2f96cab2 100644
--- a/pkg/testutils/serverutils/test_cluster_shim.go
+++ b/pkg/testutils/serverutils/test_cluster_shim.go
@@ -52,6 +52,12 @@ type TestClusterInterface interface {
startKey roachpb.Key, targets ...roachpb.ReplicationTarget,
) (roachpb.RangeDescriptor, error)
+ // AddReplicasOrFatal is the same as AddReplicas but will Fatal the test on
+ // error.
+ AddReplicasOrFatal(
+ t testing.TB, startKey roachpb.Key, targets ...roachpb.ReplicationTarget,
+ ) roachpb.RangeDescriptor
+
// RemoveReplicas removes one or more replicas from a range.
RemoveReplicas(
startKey roachpb.Key, targets ...roachpb.ReplicationTarget,
@@ -86,6 +92,10 @@ type TestClusterInterface interface {
// LookupRange returns the descriptor of the range containing key.
LookupRange(key roachpb.Key) (roachpb.RangeDescriptor, error)
+ // LookupRangeOrFatal is the same as LookupRange but will Fatal the test on
+ // error.
+ LookupRangeOrFatal(t testing.TB, key roachpb.Key) roachpb.RangeDescriptor
+
// Target returns a roachpb.ReplicationTarget for the specified server.
Target(serverIdx int) roachpb.ReplicationTarget
diff --git a/pkg/testutils/testcluster/testcluster.go b/pkg/testutils/testcluster/testcluster.go
index 761efbaf6db9..2050e93ce20b 100644
--- a/pkg/testutils/testcluster/testcluster.go
+++ b/pkg/testutils/testcluster/testcluster.go
@@ -14,6 +14,7 @@ import (
"context"
gosql "database/sql"
"fmt"
+ "math"
"sync"
"testing"
"time"
@@ -44,6 +45,7 @@ type TestCluster struct {
Conns []*gosql.DB
stopper *stop.Stopper
replicationMode base.TestClusterReplicationMode
+ scratchRangeID roachpb.RangeID
mu struct {
syncutil.Mutex
serverStoppers []*stop.Stopper
@@ -333,6 +335,16 @@ func (tc *TestCluster) LookupRange(key roachpb.Key) (roachpb.RangeDescriptor, er
return tc.Servers[0].LookupRange(key)
}
+// LookupRangeOrFatal is part of TestClusterInterface.
+func (tc *TestCluster) LookupRangeOrFatal(t testing.TB, key roachpb.Key) roachpb.RangeDescriptor {
+ t.Helper()
+ desc, err := tc.LookupRange(key)
+ if err != nil {
+ t.Fatalf(`looking up range for %s: %+v`, key, err)
+ }
+ return desc
+}
+
// SplitRange splits the range containing splitKey.
// The right range created by the split starts at the split key and extends to the
// original range's end key.
@@ -418,6 +430,19 @@ func (tc *TestCluster) AddReplicas(
}
}
+// AddReplicasOrFatal is part of TestClusterInterface.
+func (tc *TestCluster) AddReplicasOrFatal(
+ t testing.TB, startKey roachpb.Key, targets ...roachpb.ReplicationTarget,
+) roachpb.RangeDescriptor {
+ t.Helper()
+ desc, err := tc.AddReplicas(startKey, targets...)
+ if err != nil {
+ t.Fatalf(`could not add %v replicas to range containing %s: %+v`,
+ targets, startKey, err)
+ }
+ return desc
+}
+
// RemoveReplicas is part of the TestServerInterface.
func (tc *TestCluster) RemoveReplicas(
startKey roachpb.Key, targets ...roachpb.ReplicationTarget,
@@ -495,10 +520,25 @@ func (tc *TestCluster) FindRangeLeaseHolder(
return roachpb.ReplicationTarget{NodeID: replicaDesc.NodeID, StoreID: replicaDesc.StoreID}, nil
}
-// WaitForSplitAndReplication waits for a range which starts with
-// startKey and then verifies that each replica in the range
-// descriptor has been created.
-func (tc *TestCluster) WaitForSplitAndReplication(startKey roachpb.Key) error {
+// ScratchRange returns the start key of a span of keyspace suitable for use as
+// kv scratch space (it doesn't overlap system spans or SQL tables). The range
+// is lazily split off on the first call to ScratchRange.
+func (tc *TestCluster) ScratchRange(t testing.TB) roachpb.Key {
+ scratchKey := keys.MakeTablePrefix(math.MaxUint32)
+ if tc.scratchRangeID > 0 {
+ return scratchKey
+ }
+ _, right, err := tc.SplitRange(scratchKey)
+ if err != nil {
+ t.Fatal(err)
+ }
+ tc.scratchRangeID = right.RangeID
+ return scratchKey
+}
+
+// WaitForSplitAndInitialization waits for a range which starts with startKey
+// and then verifies that each replica in the range descriptor has been created.
+func (tc *TestCluster) WaitForSplitAndInitialization(startKey roachpb.Key) error {
return retry.ForDuration(testutils.DefaultSucceedsSoonDuration, func() error {
desc, err := tc.LookupRange(startKey)
if err != nil {
@@ -509,7 +549,7 @@ func (tc *TestCluster) WaitForSplitAndReplication(startKey roachpb.Key) error {
return errors.Errorf("expected range start key %s; got %s",
startKey, desc.StartKey)
}
- // A learner replicas is still up-replicating, so if we have any, we're not
+ // A learner replica is still up-replicating, so if we have any, we're not
// replicated yet.
if learnerReplicas := desc.Replicas().Learners(); len(learnerReplicas) > 0 {
return errors.Errorf("have %d learners, still replicating %s", len(learnerReplicas), desc)
diff --git a/pkg/ui/src/views/cluster/containers/nodeGraphs/dashboards/queues.tsx b/pkg/ui/src/views/cluster/containers/nodeGraphs/dashboards/queues.tsx
index 3327a4eb8bad..0829b80867c4 100644
--- a/pkg/ui/src/views/cluster/containers/nodeGraphs/dashboards/queues.tsx
+++ b/pkg/ui/src/views/cluster/containers/nodeGraphs/dashboards/queues.tsx
@@ -65,6 +65,7 @@ export default function (props: GraphDashboardProps) {
+