diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html index 0dc056d1de40..554055327d61 100644 --- a/docs/generated/settings/settings.html +++ b/docs/generated/settings/settings.html @@ -38,6 +38,7 @@ kv.closed_timestamp.target_durationduration30sif nonzero, attempt to provide closed timestamp notifications for timestamps trailing cluster time by approximately this duration kv.follower_read.target_multiplefloat3if above 1, encourages the distsender to perform a read against the closest replica if a request is older than kv.closed_timestamp.target_duration * (1 + kv.closed_timestamp.close_fraction * this) less a clock uncertainty interval. This value also is used to create follower_timestamp(). (WARNING: may compromise cluster stability or correctness; do not edit without supervision) kv.import.batch_sizebyte size32 MiBthe maximum size of the payload in an AddSSTable request (WARNING: may compromise cluster stability or correctness; do not edit without supervision) +kv.learner_replicas.enabledbooleanfalseuse learner replicas for replica addition kv.raft.command.max_sizebyte size64 MiBmaximum size of a raft command kv.raft_log.disable_synchronization_unsafebooleanfalseset to true to disable synchronization on Raft log writes to persistent storage. Setting to true risks data loss or data corruption on server crashes. The setting is meant for internal testing only and SHOULD NOT be used in production. kv.range.backpressure_range_size_multiplierfloat2multiple of range_max_bytes that a range is allowed to grow to without splitting before writes to that range are blocked, or 0 to disable @@ -120,6 +121,6 @@ trace.debug.enablebooleanfalseif set, traces for recent requests can be seen in the /debug page trace.lightstep.tokenstringif set, traces go to Lightstep using this token trace.zipkin.collectorstringif set, traces go to the given Zipkin instance (example: '127.0.0.1:9411'); ignored if trace.lightstep.token is set -versioncustom validation19.1-5set the active cluster version in the format '.' +versioncustom validation19.1-6set the active cluster version in the format '.' diff --git a/pkg/roachpb/metadata.go b/pkg/roachpb/metadata.go index 929fd5e037c9..ed8c0875364b 100644 --- a/pkg/roachpb/metadata.go +++ b/pkg/roachpb/metadata.go @@ -270,6 +270,20 @@ func (r ReplicationTarget) String() string { return fmt.Sprintf("n%d,s%d", r.NodeID, r.StoreID) } +// ReplicaTypeLearner returns a ReplicaType_LEARNER pointer suitable for use in +// a nullable proto field. +func ReplicaTypeLearner() *ReplicaType { + t := ReplicaType_LEARNER + return &t +} + +// ReplicaTypeVoter returns a ReplicaType_VOTER pointer suitable for use in a +// nullable proto field. +func ReplicaTypeVoter() *ReplicaType { + t := ReplicaType_VOTER + return &t +} + func (r ReplicaDescriptor) String() string { var buf bytes.Buffer fmt.Fprintf(&buf, "(n%d,s%d):", r.NodeID, r.StoreID) diff --git a/pkg/roachpb/metadata_replicas.go b/pkg/roachpb/metadata_replicas.go index 87e803953e73..cf35cf297e46 100644 --- a/pkg/roachpb/metadata_replicas.go +++ b/pkg/roachpb/metadata_replicas.go @@ -76,6 +76,77 @@ func (d ReplicaDescriptors) Voters() []ReplicaDescriptor { } // Learners returns the learner replicas in the set. +// +// A learner is a participant in a raft group that accepts messages but doesn't +// vote. This means it doesn't affect raft quorum and thus doesn't affect the +// fragility of the range, even if it's very far behind or many learners are +// down. 
+// +// At the time of writing, learners are used in CockroachDB as an interim state +// while adding a replica. A learner replica is added to the range via raft +// ConfChange, a raft snapshot (of type LEARNER) is sent to catch it up, and +// then a second ConfChange promotes it to a full replica. +// +// This means that learners are currently always expected to have a short +// lifetime, approximately the time it takes to send a snapshot. Ideas have been +// kicked around to use learners with follower reads, which could be a cheap way +// to allow many geographies to have local reads without affecting write +// latencies. If implemented, these learners would have long lifetimes. +// +// For simplicity, CockroachDB treats learner replicas the same as voter +// replicas as much as possible, but there are a few exceptions: +// +// - Learner replicas are not considered when calculating quorum size, and thus +// do not affect the computation of which ranges are under-replicated for +// upreplication/alerting/debug/etc purposes. Ditto for over-replicated. +// - Learner replicas cannot become raft leaders, so we also don't allow them to +// become leaseholders. As a result, DistSender and the various oracles don't +// try to send them traffic. +// - The raft snapshot queue does not send snapshots to learners for reasons +// described below. +// - Merges won't run while a learner replica is present. +// +// Replicas are now added in two ConfChange transactions. The first creates the +// learner and the second promotes it to a voter. If the node that is +// coordinating this dies in the middle, we're left with an orphaned learner. +// For this reason, the replicate queue always first removes any learners it +// sees before doing anything else. We could instead try to finish off the +// learner snapshot and promotion, but this is more complicated and it's not yet +// clear the efficiency win is worth it. +// +// This introduces some rare races between the replicate queue and +// AdminChangeReplicas or if a range's lease is moved to a new owner while the +// old leaseholder is still processing it in the replicate queue. These races +// are handled by retrying if a learner disappears during the +// snapshot/promotion. +// +// If the coordinator otherwise encounters an error while sending the learner +// snapshot or promoting it (which can happen for a number of reasons, including +// the node getting the learner going away), it tries to clean up after itself +// by rolling back the addition of the learner. +// +// There is another race between the learner snapshot being sent and the raft +// snapshot queue happening to check the replica at the same time, also sending +// it a snapshot. This is safe but wasteful, so the raft snapshot queue won't +// try to send snapshots to learners. +// +// Merges are blocked if either side has a learner (to avoid working out the +// edge cases) but it's historically turned out to be a bad idea to get in the +// way of splits, so we allow them even when some of the replicas are learners. +// This orphans a learner on each side of the split (the original coordinator +// will not be able to finish either of them), but the replication queue will +// eventually clean them up. +// +// Learner replicas don't affect quorum but they do affect the system in other +// ways. The most obvious way is that the leader sends them the raft traffic it +// would send to any follower, consuming resources. 
More surprising is that once +// the learner has received a snapshot, it's considered by the quota pool that +// prevents the raft leader from getting too far ahead of the followers. This is +// because a learner (especially one that already has a snapshot) is expected to +// very soon be a voter, so we treat it like one. However, it means a slow +// learner can slow down regular traffic, which is possibly counterintuitive. +// +// For some related mega-comments, see Replica.sendSnapshot. func (d ReplicaDescriptors) Learners() []ReplicaDescriptor { // Note that the wrapped replicas are sorted first by type. for i := range d.wrapped { @@ -86,8 +157,6 @@ func (d ReplicaDescriptors) Learners() []ReplicaDescriptor { return nil } -var _, _ = ReplicaDescriptors.All, ReplicaDescriptors.Learners - // AsProto returns the protobuf representation of these replicas, suitable for // setting the InternalReplicas field of a RangeDescriptor. When possible the // SetReplicas method of RangeDescriptor should be used instead, this is only diff --git a/pkg/settings/cluster/cockroach_versions.go b/pkg/settings/cluster/cockroach_versions.go index ea10da88d3b4..002b133d8fe7 100644 --- a/pkg/settings/cluster/cockroach_versions.go +++ b/pkg/settings/cluster/cockroach_versions.go @@ -40,6 +40,7 @@ const ( VersionStickyBit VersionParallelCommits VersionGenerationComparable + VersionLearnerReplicas // Add new versions here (step one of two). @@ -483,6 +484,11 @@ var versionsSingleton = keyedVersions([]keyedVersion{ Key: VersionGenerationComparable, Version: roachpb.Version{Major: 19, Minor: 1, Unstable: 5}, }, + { + // VersionLearnerReplicas is https://github.com/cockroachdb/cockroach/pull/38149. + Key: VersionLearnerReplicas, + Version: roachpb.Version{Major: 19, Minor: 1, Unstable: 6}, + }, // Add new versions here (step two of two). diff --git a/pkg/settings/cluster/versionkey_string.go b/pkg/settings/cluster/versionkey_string.go index 77b58cb16f83..9470ddfe75d1 100644 --- a/pkg/settings/cluster/versionkey_string.go +++ b/pkg/settings/cluster/versionkey_string.go @@ -17,11 +17,12 @@ func _() { _ = x[VersionStickyBit-6] _ = x[VersionParallelCommits-7] _ = x[VersionGenerationComparable-8] + _ = x[VersionLearnerReplicas-9] } -const _VersionKey_name = "Version2_1VersionUnreplicatedRaftTruncatedStateVersionSideloadedStorageNoReplicaIDVersion19_1VersionStart19_2VersionQueryTxnTimestampVersionStickyBitVersionParallelCommitsVersionGenerationComparable" +const _VersionKey_name = "Version2_1VersionUnreplicatedRaftTruncatedStateVersionSideloadedStorageNoReplicaIDVersion19_1VersionStart19_2VersionQueryTxnTimestampVersionStickyBitVersionParallelCommitsVersionGenerationComparableVersionLearnerReplicas" -var _VersionKey_index = [...]uint8{0, 10, 47, 82, 93, 109, 133, 149, 171, 198} +var _VersionKey_index = [...]uint8{0, 10, 47, 82, 93, 109, 133, 149, 171, 198, 220} func (i VersionKey) String() string { if i < 0 || i >= VersionKey(len(_VersionKey_index)-1) { diff --git a/pkg/sql/ambiguous_commit_test.go b/pkg/sql/ambiguous_commit_test.go index 10dbfc700820..ce2b0d992d2a 100644 --- a/pkg/sql/ambiguous_commit_test.go +++ b/pkg/sql/ambiguous_commit_test.go @@ -158,7 +158,7 @@ func TestAmbiguousCommit(t *testing.T) { tableStartKey.Store(keys.MakeTablePrefix(tableID)) // Wait for new table to split & replication. 
- if err := tc.WaitForSplitAndReplication(tableStartKey.Load().([]byte)); err != nil { + if err := tc.WaitForSplitAndInitialization(tableStartKey.Load().([]byte)); err != nil { t.Fatal(err) } diff --git a/pkg/storage/allocator.go b/pkg/storage/allocator.go index e900955c5811..b4278097a9ad 100644 --- a/pkg/storage/allocator.go +++ b/pkg/storage/allocator.go @@ -48,6 +48,7 @@ const ( minReplicaWeight = 0.001 // priorities for various repair operations. + removeLearnerReplicaPriority float64 = 12001 addDeadReplacementPriority float64 = 12000 addMissingReplicaPriority float64 = 10000 addDecommissioningReplacementPriority float64 = 5000 @@ -99,6 +100,7 @@ const ( AllocatorAdd AllocatorRemoveDead AllocatorRemoveDecommissioning + AllocatorRemoveLearner AllocatorConsiderRebalance ) @@ -108,6 +110,7 @@ var allocatorActionNames = map[AllocatorAction]string{ AllocatorAdd: "add", AllocatorRemoveDead: "remove dead", AllocatorRemoveDecommissioning: "remove decommissioning", + AllocatorRemoveLearner: "remove learner", AllocatorConsiderRebalance: "consider rebalance", } @@ -295,8 +298,49 @@ func (a *Allocator) ComputeAction( // Do nothing if storePool is nil for some unittests. return AllocatorNoop, 0 } - // TODO(mrtracy): Handle non-homogeneous and mismatched attribute sets. + // Seeing a learner replica at this point is unexpected because learners are a + // short-lived (ish) transient state in a learner+snapshot+voter cycle, which + // is always done atomically. Only two places could have added a learner: the + // replicate queue or AdminChangeReplicas request. + // + // The replicate queue only operates on leaseholders, which means that only + // one node at a time is operating on a given range except in rare cases (old + // leaseholder could start the operation, and a new leaseholder steps up and + // also starts an overlapping operation). Combined with the above atomicity, + // this means that if the replicate queue sees a learner, either the node that + // was adding it crashed somewhere in the learner+snapshot+voter cycle and + // we're the new leaseholder or we caught a race. + // + // In the first case, we could assume the node that was adding it knew what it + // was doing and finish the addition. Or we could leave it and do higher + // priority operations first if there are any. However, this comes with code + // complexity and concept complexity (computing old vs new quorum sizes + // becomes ambiguous, the learner isn't in the quorum but it likely will be + // soon, so do you count it?). Instead, we do the simplest thing and remove it + // before doing any other operations to the range. We'll revisit this decision + // if and when the complexity becomes necessary. + // + // If we get the race where AdminChangeReplicas is adding a replica and the + // queue happens to run during the snapshot, this will remove the learner and + // AdminChangeReplicas will notice either during the snapshot transfer or when + // it tries to promote the learner to a voter. AdminChangeReplicas should + // retry. + // + // On the other hand if we get the race where a leaseholder starts adding a + // replica in the replicate queue and during this loses its lease, it should + // probably not retry. + if learners := rangeInfo.Desc.Replicas().Learners(); len(learners) > 0 { + // TODO(dan): Since this goes before anything else, the priority here should + // be influenced by whatever operations would happen right after the learner + // is removed. 
In the meantime, we don't want to block something important + // from happening (like addDeadReplacementPriority) by queueing this at a + // low priority so until this TODO is done, keep + // removeLearnerReplicaPriority as the highest priority. + return AllocatorRemoveLearner, removeLearnerReplicaPriority + } + + // TODO(mrtracy): Handle non-homogeneous and mismatched attribute sets. have := len(rangeInfo.Desc.Replicas().Unwrap()) decommissioningReplicas := a.storePool.decommissioningReplicas( rangeInfo.Desc.RangeID, rangeInfo.Desc.Replicas().Unwrap()) diff --git a/pkg/storage/allocator_test.go b/pkg/storage/allocator_test.go index c75440510402..29cf0ef1469b 100644 --- a/pkg/storage/allocator_test.go +++ b/pkg/storage/allocator_test.go @@ -43,6 +43,7 @@ import ( "github.com/gogo/protobuf/proto" "github.com/olekukonko/tablewriter" "github.com/pkg/errors" + "github.com/stretchr/testify/require" "go.etcd.io/etcd/raft" "go.etcd.io/etcd/raft/tracker" ) @@ -4770,6 +4771,40 @@ func TestAllocatorComputeActionDecommission(t *testing.T) { } } +func TestAllocatorRemoveLearner(t *testing.T) { + defer leaktest.AfterTest(t)() + + zone := config.ZoneConfig{ + NumReplicas: proto.Int32(3), + } + learnerType := roachpb.ReplicaType_LEARNER + rangeWithLearnerDesc := roachpb.RangeDescriptor{ + InternalReplicas: []roachpb.ReplicaDescriptor{ + { + StoreID: 1, + NodeID: 1, + ReplicaID: 1, + }, + { + StoreID: 2, + NodeID: 2, + ReplicaID: 2, + Type: &learnerType, + }, + }, + } + + // Removing a learner is prioritized over adding a new replica to an under + // replicated range. + stopper, _, sp, a, _ := createTestAllocator(10, false /* deterministic */) + ctx := context.Background() + defer stopper.Stop(ctx) + live, dead := []roachpb.StoreID{1, 2}, []roachpb.StoreID{3} + mockStorePool(sp, live, nil, dead, nil, nil) + action, _ := a.ComputeAction(ctx, &zone, RangeInfo{Desc: &rangeWithLearnerDesc}) + require.Equal(t, AllocatorRemoveLearner, action) +} + func TestAllocatorComputeActionDynamicNumReplicas(t *testing.T) { defer leaktest.AfterTest(t)() diff --git a/pkg/storage/batcheval/cmd_lease.go b/pkg/storage/batcheval/cmd_lease.go index 3d43186d8924..5bfaddaf4b1f 100644 --- a/pkg/storage/batcheval/cmd_lease.go +++ b/pkg/storage/batcheval/cmd_lease.go @@ -21,6 +21,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb" "github.com/cockroachdb/cockroach/pkg/storage/spanset" "github.com/cockroachdb/cockroach/pkg/storage/storagepb" + "github.com/cockroachdb/errors" ) func declareKeysRequestLease( @@ -41,6 +42,17 @@ func newFailedLeaseTrigger(isTransfer bool) result.Result { return trigger } +func checkNotLearnerReplica(rec EvalContext) error { + repDesc, ok := rec.Desc().GetReplicaDescriptor(rec.StoreID()) + if !ok { + return errors.AssertionFailedf( + `could not find replica for store %s in %s`, rec.StoreID(), rec.Desc()) + } else if t := repDesc.GetType(); t == roachpb.ReplicaType_LEARNER { + return errors.Errorf(`cannot transfer lease to replica of type %s`, t) + } + return nil +} + // evalNewLease checks that the lease contains a valid interval and that // the new lease holder is still a member of the replica set, and then proceeds // to write the new lease to the batch, emitting an appropriate trigger. 
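For orientation while reading the rest of the patch: a minimal, self-contained sketch of how the nullable ReplicaType plumbing above is meant to be read. This is illustrative only, not part of the diff; it assumes VOTER is the enum's zero value (which is what the generated GetType getter yields for an unset pointer) and uses only the roachpb helpers and accessors introduced here.

package main

import (
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/roachpb"
)

func main() {
	// A descriptor written before this change carries no Type; GetType reads the
	// nil pointer as the zero value (VOTER), which is why ReplicaTypeVoter and
	// ReplicaTypeLearner return pointers suitable for the nullable proto field.
	voter := roachpb.ReplicaDescriptor{NodeID: 1, StoreID: 1, ReplicaID: 1}
	learner := roachpb.ReplicaDescriptor{
		NodeID: 2, StoreID: 2, ReplicaID: 2, Type: roachpb.ReplicaTypeLearner(),
	}
	fmt.Println(voter.GetType(), learner.GetType()) // VOTER LEARNER

	// The sorted wrapper is what the allocator, the lease checks, and the tests
	// below use to split the set into voters and learners.
	replicas := []roachpb.ReplicaDescriptor{voter, learner}
	rd := roachpb.MakeReplicaDescriptors(&replicas)
	fmt.Println(len(rd.Voters()), len(rd.Learners())) // 1 1
}

This is the same split that checkNotLearnerReplica above relies on when it rejects lease requests and lease transfers aimed at a learner.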
diff --git a/pkg/storage/batcheval/cmd_lease_request.go b/pkg/storage/batcheval/cmd_lease_request.go index 539a5514efa2..f45caf1d3176 100644 --- a/pkg/storage/batcheval/cmd_lease_request.go +++ b/pkg/storage/batcheval/cmd_lease_request.go @@ -32,9 +32,25 @@ func init() { func RequestLease( ctx context.Context, batch engine.ReadWriter, cArgs CommandArgs, resp roachpb.Response, ) (result.Result, error) { + // When returning an error from this method, must always return a + // newFailedLeaseTrigger() to satisfy stats. args := cArgs.Args.(*roachpb.RequestLeaseRequest) - // When returning an error from this method, must always return - // a newFailedLeaseTrigger() to satisfy stats. + + // For now, don't allow replicas of type LEARNER to be leaseholders. There's + // no reason this wouldn't work in principle, but it seems inadvisable. In + // particular, learners can't become raft leaders, so we wouldn't be able to + // co-locate the leaseholder + raft leader, which is going to affect tail + // latencies. Additionally, as of the time of writing, learner replicas are + // only used for a short time in replica addition, so it's not worth working + // out the edge cases. If we decide to start using long-lived learners at some + // point, that math may change. + // + // If this check is removed at some point, the filtering of learners on the + // sending side would have to be removed as well. + if err := checkNotLearnerReplica(cArgs.EvalCtx); err != nil { + return newFailedLeaseTrigger(false /* isTransfer */), err + } + prevLease, _ := cArgs.EvalCtx.GetLease() rErr := &roachpb.LeaseRejectedError{ diff --git a/pkg/storage/batcheval/cmd_lease_test.go b/pkg/storage/batcheval/cmd_lease_test.go index 1ddbf5fd2cbf..34d2970c836c 100644 --- a/pkg/storage/batcheval/cmd_lease_test.go +++ b/pkg/storage/batcheval/cmd_lease_test.go @@ -16,9 +16,11 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/stretchr/testify/require" ) // TestLeaseTransferWithPipelinedWrite verifies that pipelined writes @@ -104,3 +106,32 @@ func TestLeaseTransferWithPipelinedWrite(t *testing.T) { } } } + +func TestLeaseCommandLearnerReplica(t *testing.T) { + defer leaktest.AfterTest(t)() + + ctx := context.Background() + const voterStoreID, learnerStoreID roachpb.StoreID = 1, 2 + replicas := []roachpb.ReplicaDescriptor{ + {StoreID: voterStoreID, Type: roachpb.ReplicaTypeVoter()}, + {StoreID: learnerStoreID, Type: roachpb.ReplicaTypeLearner()}, + } + desc := roachpb.RangeDescriptor{} + desc.SetReplicas(roachpb.MakeReplicaDescriptors(&replicas)) + cArgs := CommandArgs{ + EvalCtx: &mockEvalCtx{ + storeID: learnerStoreID, + desc: &desc, + }, + Args: &roachpb.TransferLeaseRequest{}, + } + + // Learners are not allowed to become leaseholders for now, see the comments + // in TransferLease and RequestLease. 
+ _, err := TransferLease(ctx, nil, cArgs, nil) + require.EqualError(t, err, `cannot transfer lease to replica of type LEARNER`) + + cArgs.Args = &roachpb.RequestLeaseRequest{} + _, err = RequestLease(ctx, nil, cArgs, nil) + require.EqualError(t, err, `cannot transfer lease to replica of type LEARNER`) +} diff --git a/pkg/storage/batcheval/cmd_lease_transfer.go b/pkg/storage/batcheval/cmd_lease_transfer.go index 5ad036a40669..91012c9e08ab 100644 --- a/pkg/storage/batcheval/cmd_lease_transfer.go +++ b/pkg/storage/batcheval/cmd_lease_transfer.go @@ -31,10 +31,25 @@ func init() { func TransferLease( ctx context.Context, batch engine.ReadWriter, cArgs CommandArgs, resp roachpb.Response, ) (result.Result, error) { - args := cArgs.Args.(*roachpb.TransferLeaseRequest) - // When returning an error from this method, must always return // a newFailedLeaseTrigger() to satisfy stats. + args := cArgs.Args.(*roachpb.TransferLeaseRequest) + + // For now, don't allow replicas of type LEARNER to be leaseholders. There's + // no reason this wouldn't work in principle, but it seems inadvisable. In + // particular, learners can't become raft leaders, so we wouldn't be able to + // co-locate the leaseholder + raft leader, which is going to affect tail + // latencies. Additionally, as of the time of writing, learner replicas are + // only used for a short time in replica addition, so it's not worth working + // out the edge cases. If we decide to start using long-lived learners at some + // point, that math may change. + // + // If this check is removed at some point, the filtering of learners on the + // sending side would have to be removed as well. + if err := checkNotLearnerReplica(cArgs.EvalCtx); err != nil { + return newFailedLeaseTrigger(true /* isTransfer */), err + } + prevLease, _ := cArgs.EvalCtx.GetLease() if log.V(2) { log.Infof(ctx, "lease transfer: prev lease: %+v, new lease: %+v", prevLease, args.Lease) diff --git a/pkg/storage/batcheval/cmd_resolve_intent_test.go b/pkg/storage/batcheval/cmd_resolve_intent_test.go index 86181ab32bc4..c1ced7ecabee 100644 --- a/pkg/storage/batcheval/cmd_resolve_intent_test.go +++ b/pkg/storage/batcheval/cmd_resolve_intent_test.go @@ -34,6 +34,7 @@ import ( type mockEvalCtx struct { clusterSettings *cluster.Settings desc *roachpb.RangeDescriptor + storeID roachpb.StoreID clock *hlc.Clock stats enginepb.MVCCStats qps float64 @@ -74,7 +75,7 @@ func (m *mockEvalCtx) NodeID() roachpb.NodeID { panic("unimplemented") } func (m *mockEvalCtx) StoreID() roachpb.StoreID { - panic("unimplemented") + return m.storeID } func (m *mockEvalCtx) GetRangeID() roachpb.RangeID { return m.desc.RangeID diff --git a/pkg/storage/client_replica_test.go b/pkg/storage/client_replica_test.go index d00a5db7f1f8..06b13e60a080 100644 --- a/pkg/storage/client_replica_test.go +++ b/pkg/storage/client_replica_test.go @@ -1326,7 +1326,7 @@ func TestLeaseInfoRequest(t *testing.T) { // right answer immediately, since the old holder has definitely applied the // transfer before TransferRangeLease returned. leaseHolderReplica := LeaseInfo(t, kvDB0, rangeDesc, roachpb.INCONSISTENT).Lease.Replica - if leaseHolderReplica != replicas[1] { + if !leaseHolderReplica.Equal(replicas[1]) { t.Fatalf("lease holder should be replica %+v, but is: %+v", replicas[1], leaseHolderReplica) } @@ -1339,7 +1339,7 @@ func TestLeaseInfoRequest(t *testing.T) { // unaware of the new lease and so the request might bounce around for a // while (see #8816). 
 	leaseHolderReplica = LeaseInfo(t, kvDB1, rangeDesc, roachpb.INCONSISTENT).Lease.Replica
-	if leaseHolderReplica != replicas[1] {
+	if !leaseHolderReplica.Equal(replicas[1]) {
 		return errors.Errorf("lease holder should be replica %+v, but is: %+v",
 			replicas[1], leaseHolderReplica)
 	}
@@ -1378,7 +1378,7 @@ func TestLeaseInfoRequest(t *testing.T) {
 	resp := *(reply.(*roachpb.LeaseInfoResponse))
 	leaseHolderReplica = resp.Lease.Replica
-	if leaseHolderReplica != replicas[2] {
+	if !leaseHolderReplica.Equal(replicas[2]) {
 		t.Fatalf("lease holder should be replica %s, but is: %s", replicas[2], leaseHolderReplica)
 	}
diff --git a/pkg/storage/intentresolver/intent_resolver.go b/pkg/storage/intentresolver/intent_resolver.go
index effe1017f64f..51038776bc9f 100644
--- a/pkg/storage/intentresolver/intent_resolver.go
+++ b/pkg/storage/intentresolver/intent_resolver.go
@@ -607,7 +607,7 @@ func (ir *IntentResolver) CleanupTxnIntentsAsync(
 		intents := roachpb.AsIntents(et.Txn.IntentSpans, &et.Txn)
 		if err := ir.cleanupFinishedTxnIntents(ctx, rangeID, &et.Txn, intents, now, et.Poison, nil); err != nil {
 			if ir.every.ShouldLog() {
-				log.Warningf(ctx, "failed to cleanup transaction intents: %+v", err)
+				log.Warningf(ctx, "failed to cleanup transaction intents: %v", err)
 			}
 		}
 	}); err != nil {
@@ -815,7 +815,7 @@ func (ir *IntentResolver) cleanupFinishedTxnIntents(
 			}
 			if err != nil {
 				if ir.every.ShouldLog() {
-					log.Warningf(ctx, "failed to gc transaction record: %+v", err)
+					log.Warningf(ctx, "failed to gc transaction record: %v", err)
 				}
 			}
 		})
diff --git a/pkg/storage/raft_snapshot_queue.go b/pkg/storage/raft_snapshot_queue.go
index 32b6d2aa9328..b830c1bc378c 100644
--- a/pkg/storage/raft_snapshot_queue.go
+++ b/pkg/storage/raft_snapshot_queue.go
@@ -65,8 +65,20 @@ func (rq *raftSnapshotQueue) shouldQueue(
 	// If a follower needs a snapshot, enqueue at the highest priority.
 	if status := repl.RaftStatus(); status != nil {
 		// raft.Status.Progress is only populated on the Raft group leader.
-		for _, p := range status.Progress {
+		for id, p := range status.Progress {
 			if p.State == tracker.StateSnapshot {
+				// We refuse to send a snapshot of type RAFT to a learner for reasons
+				// described in processRaftSnapshot, so don't bother queueing.
+				isLearner := false
+				for _, r := range repl.Desc().Replicas().Learners() {
+					if r.ReplicaID == roachpb.ReplicaID(id) {
+						isLearner = true
+						break
+					}
+				}
+				if isLearner {
+					continue
+				}
 				if log.V(2) {
 					log.Infof(ctx, "raft snapshot needed, enqueuing")
 				}
@@ -105,6 +117,21 @@ func (rq *raftSnapshotQueue) processRaftSnapshot(
 	if !ok {
 		return errors.Errorf("%s: replica %d not present in %v", repl, id, desc.Replicas())
 	}
+
+	// A learner replica is either getting a snapshot of type LEARNER by the node
+	// that's adding it or it's been orphaned and it's about to be cleaned up by
+	// the replicate queue. Either way, no point in also sending it a snapshot of
+	// type RAFT.
+	//
+	// TODO(dan): Reconsider this. If the learner coordinator fails before sending
+	// it a snap, then until the replication queue collects it, any proposals sent
+	// to it will get stuck indefinitely. At the moment, nothing should be sending
+	// it such a proposal, but this is brittle and could change easily.
+ if repDesc.GetType() == roachpb.ReplicaType_LEARNER { + log.Eventf(ctx, "not sending snapshot type RAFT to learner: %s", repDesc) + return nil + } + err := repl.sendSnapshot(ctx, repDesc, SnapshotRequest_RAFT, SnapshotRequest_RECOVERY) // NB: if the snapshot fails because of an overlapping replica on the diff --git a/pkg/storage/replica_command.go b/pkg/storage/replica_command.go index f0c0ca251ba5..9ae8be77637f 100644 --- a/pkg/storage/replica_command.go +++ b/pkg/storage/replica_command.go @@ -23,6 +23,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/rpc/nodedialer" + "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/storage/engine" "github.com/cockroachdb/cockroach/pkg/storage/storagebase" @@ -34,6 +35,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/protoutil" "github.com/cockroachdb/cockroach/pkg/util/retry" + "github.com/cockroachdb/logtags" "github.com/gogo/protobuf/proto" "github.com/pkg/errors" "go.etcd.io/etcd/raft" @@ -41,6 +43,14 @@ import ( "go.etcd.io/etcd/raft/tracker" ) +// useLearnerReplicas specifies whether to use learner replicas for replica +// addition or whether to fall back to the previous method of a preemptive +// snapshot followed by going straight to a voter replica. +var useLearnerReplicas = settings.RegisterBoolSetting( + "kv.learner_replicas.enabled", + "use learner replicas for replica addition", + false) + // AdminSplit divides the range into into two ranges using args.SplitKey. func (r *Replica) AdminSplit( ctx context.Context, args roachpb.AdminSplitRequest, reason string, @@ -849,18 +859,194 @@ func (r *Replica) ChangeReplicas( reason storagepb.RangeLogEventReason, details string, ) (updatedDesc *roachpb.RangeDescriptor, _ error) { - return r.changeReplicas(ctx, changeType, target, desc, SnapshotRequest_REBALANCE, reason, details) + if desc == nil { + return nil, errors.Errorf("%s: the current RangeDescriptor must not be nil", r) + } + + switch changeType { + case roachpb.ADD_REPLICA: + return r.addReplica(ctx, target, desc, SnapshotRequest_REBALANCE, reason, details) + case roachpb.REMOVE_REPLICA: + return r.removeReplica(ctx, target, desc, SnapshotRequest_REBALANCE, reason, details) + default: + return nil, errors.Errorf(`unknown change type: %s`, changeType) + } } -func (r *Replica) changeReplicas( +func (r *Replica) addReplica( ctx context.Context, - changeType roachpb.ReplicaChangeType, target roachpb.ReplicationTarget, desc *roachpb.RangeDescriptor, priority SnapshotRequest_Priority, reason storagepb.RangeLogEventReason, details string, -) (_ *roachpb.RangeDescriptor, _ error) { +) (*roachpb.RangeDescriptor, error) { + for _, rDesc := range desc.Replicas().All() { + if rDesc.NodeID == target.NodeID { + // Two replicas from the same range are not allowed on the same node, even + // in different stores. + if rDesc.StoreID != target.StoreID { + return nil, errors.Errorf("%s: unable to add replica %v; node already has a replica", r, target) + } + + // Looks like we found a replica with the same store and node id. If the + // replica is already a learner, then either some previous leaseholder was + // trying to add it with the learner+snapshot+voter cycle and got + // interrupted or else we hit a race between the replicate queue and + // AdminChangeReplicas. 
+ if rDesc.GetType() == roachpb.ReplicaType_LEARNER { + return nil, errors.Errorf( + "%s: unable to add replica %v which is already present as a learner", r, target) + } + + // Otherwise, we already had a full voter replica. Can't add another to + // this store. + return nil, errors.Errorf("%s: unable to add replica %v which is already present", r, target) + } + } + + settings := r.ClusterSettings() + useLearners := useLearnerReplicas.Get(&settings.SV) + useLearners = useLearners && settings.Version.IsActive(cluster.VersionLearnerReplicas) + if !useLearners { + return r.addReplicaLegacyPreemptiveSnapshot(ctx, target, desc, priority, reason, details) + } + + // First add the replica as a raft learner. This means it accepts raft traffic + // (so it can catch up) but doesn't vote (so it doesn't affect quorum and thus + // doesn't introduce fragility into the system). For details see + _ = roachpb.ReplicaDescriptors.Learners + learnerDesc, err := addLearnerReplica(ctx, r.store, desc, target, reason, details) + if err != nil { + return nil, err + } + + // Now move it to be a full voter (waiting on it to get a raft snapshot first, + // so it's not immediately way behind). + voterDesc, err := r.promoteLearnerReplicaToVoter(ctx, learnerDesc, target, priority, reason, details) + if err != nil { + // Don't leave a learner replica lying around if we didn't succeed in + // promoting it to a voter. + r.rollbackLearnerReplica(ctx, learnerDesc, target, reason, details) + return nil, err + } + return voterDesc, nil +} + +func addLearnerReplica( + ctx context.Context, + store *Store, + desc *roachpb.RangeDescriptor, + target roachpb.ReplicationTarget, + reason storagepb.RangeLogEventReason, + details string, +) (*roachpb.RangeDescriptor, error) { + newDesc := *desc + newDesc.SetReplicas(desc.Replicas().DeepCopy()) + replDesc := roachpb.ReplicaDescriptor{ + NodeID: target.NodeID, + StoreID: target.StoreID, + ReplicaID: desc.NextReplicaID, + Type: roachpb.ReplicaTypeLearner(), + } + newDesc.NextReplicaID++ + newDesc.AddReplica(replDesc) + err := execChangeReplicasTxn( + ctx, store, roachpb.ADD_REPLICA, desc, replDesc, &newDesc, reason, details, + ) + return &newDesc, err +} + +func (r *Replica) promoteLearnerReplicaToVoter( + ctx context.Context, + desc *roachpb.RangeDescriptor, + target roachpb.ReplicationTarget, + priority SnapshotRequest_Priority, + reason storagepb.RangeLogEventReason, + details string, +) (*roachpb.RangeDescriptor, error) { + // TODO(dan): We allow ranges with learner replicas to split, so in theory + // this may want to detect that and retry, sending a snapshot and promoting + // both sides. + + newReplicas := desc.Replicas().DeepCopy().All() + for i, rDesc := range newReplicas { + if rDesc.NodeID != target.NodeID || rDesc.StoreID != target.StoreID { + continue + } + if rDesc.GetType() != roachpb.ReplicaType_LEARNER { + return nil, errors.Errorf(`%s: cannot promote replica of type %s`, r, rDesc.Type) + } + rDesc.Type = roachpb.ReplicaTypeVoter() + newReplicas[i] = rDesc + + // Note that raft snapshot queue refuses to send snapshots, so this is the + // only one a learner can get. 
+ if err := r.sendSnapshot(ctx, rDesc, SnapshotRequest_LEARNER, priority); err != nil { + return nil, err + } + + if fn := r.store.cfg.TestingKnobs.ReplicaAddStopAfterLearnerSnapshot; fn != nil { + if fn() { + return desc, nil + } + } + + updatedDesc := *desc + updatedDesc.SetReplicas(roachpb.MakeReplicaDescriptors(&newReplicas)) + err := execChangeReplicasTxn(ctx, r.store, roachpb.ADD_REPLICA, desc, rDesc, &updatedDesc, reason, details) + return &updatedDesc, err + } + return nil, errors.Errorf(`%s: could not find replica to promote %s`, r, target) +} + +func (r *Replica) rollbackLearnerReplica( + ctx context.Context, + desc *roachpb.RangeDescriptor, + target roachpb.ReplicationTarget, + reason storagepb.RangeLogEventReason, + details string, +) { + newDesc := *desc + newDesc.SetReplicas(desc.Replicas().DeepCopy()) + replDesc, ok := newDesc.RemoveReplica(target.NodeID, target.StoreID) + if !ok { + // This is a programming error if it happens. Why are we rolling back + // something that's not present? + log.Warningf(ctx, "failed to rollback learner %s, missing from descriptor %s", target, desc) + return + } + + // If (for example) the promotion failed because of a context deadline + // exceeded, we do still want to clean up after ourselves, so always use a new + // context (but with the old tags and with some timeout to save this from + // blocking the caller indefinitely). + const rollbackTimeout = 10 * time.Second + rollbackFn := func(ctx context.Context) error { + return execChangeReplicasTxn( + ctx, r.store, roachpb.REMOVE_REPLICA, desc, replDesc, &newDesc, reason, details, + ) + } + rollbackCtx := logtags.WithTags(context.Background(), logtags.FromContext(ctx)) + if err := contextutil.RunWithTimeout( + rollbackCtx, "learner rollback", rollbackTimeout, rollbackFn, + ); err != nil { + log.Infof(ctx, + "failed to rollback learner %s, abandoning it for the replicate queue %v", target, err) + r.store.replicateQueue.MaybeAddAsync(ctx, r, r.store.Clock().Now()) + } else { + log.Infof(ctx, "rolled back learner %s to %s", replDesc, &newDesc) + } +} + +func (r *Replica) addReplicaLegacyPreemptiveSnapshot( + ctx context.Context, + target roachpb.ReplicationTarget, + desc *roachpb.RangeDescriptor, + priority SnapshotRequest_Priority, + reason storagepb.RangeLogEventReason, + details string, +) (*roachpb.RangeDescriptor, error) { if desc == nil { return nil, errors.Errorf("%s: the current RangeDescriptor must not be nil", r) } @@ -882,70 +1068,93 @@ func (r *Replica) changeReplicas( } } - generationComparableEnabled := r.store.ClusterSettings().Version.IsActive(cluster.VersionGenerationComparable) - rangeID := desc.RangeID updatedDesc := *desc - if generationComparableEnabled { - updatedDesc.IncrementGeneration() - updatedDesc.GenerationComparable = proto.Bool(true) - } updatedDesc.SetReplicas(desc.Replicas().DeepCopy()) - switch changeType { - case roachpb.ADD_REPLICA: - // If the replica exists on the remote node, no matter in which store, - // abort the replica add. - if nodeUsed { - if repDescIdx != -1 { - return nil, errors.Errorf("%s: unable to add replica %v which is already present", r, repDesc) - } - return nil, errors.Errorf("%s: unable to add replica %v; node already has a replica", r, repDesc) + // If the replica exists on the remote node, no matter in which store, + // abort the replica add. 
+ if nodeUsed { + if repDescIdx != -1 { + return nil, errors.Errorf("%s: unable to add replica %v which is already present", r, repDesc) } + return nil, errors.Errorf("%s: unable to add replica %v; node already has a replica", r, repDesc) + } - // Send a pre-emptive snapshot. Note that the replica to which this - // snapshot is addressed has not yet had its replica ID initialized; this - // is intentional, and serves to avoid the following race with the replica - // GC queue: - // - // - snapshot received, a replica is lazily created with the "real" replica ID - // - the replica is eligible for GC because it is not yet a member of the range - // - GC queue runs, creating a raft tombstone with the replica's ID - // - the replica is added to the range - // - lazy creation of the replica fails due to the raft tombstone - // - // Instead, the replica GC queue will create a tombstone with replica ID - // zero, which is never legitimately used, and thus never interferes with - // raft operations. Racing with the replica GC queue can still partially - // negate the benefits of pre-emptive snapshots, but that is a recoverable - // degradation, not a catastrophic failure. - // - // NB: A closure is used here so that we can release the snapshot as soon - // as it has been applied on the remote and before the ChangeReplica - // operation is processed. This is important to allow other ranges to make - // progress which might be required for this ChangeReplicas operation to - // complete. See #10409. - if err := r.sendSnapshot(ctx, repDesc, SnapshotRequest_PREEMPTIVE, priority); err != nil { - return nil, err - } + // Send a pre-emptive snapshot. Note that the replica to which this + // snapshot is addressed has not yet had its replica ID initialized; this + // is intentional, and serves to avoid the following race with the replica + // GC queue: + // + // - snapshot received, a replica is lazily created with the "real" replica ID + // - the replica is eligible for GC because it is not yet a member of the range + // - GC queue runs, creating a raft tombstone with the replica's ID + // - the replica is added to the range + // - lazy creation of the replica fails due to the raft tombstone + // + // Instead, the replica GC queue will create a tombstone with replica ID + // zero, which is never legitimately used, and thus never interferes with + // raft operations. Racing with the replica GC queue can still partially + // negate the benefits of pre-emptive snapshots, but that is a recoverable + // degradation, not a catastrophic failure. + // + // NB: A closure is used here so that we can release the snapshot as soon + // as it has been applied on the remote and before the ChangeReplica + // operation is processed. This is important to allow other ranges to make + // progress which might be required for this ChangeReplicas operation to + // complete. See #10409. + if err := r.sendSnapshot(ctx, repDesc, SnapshotRequest_PREEMPTIVE, priority); err != nil { + return nil, err + } - repDesc.ReplicaID = updatedDesc.NextReplicaID - updatedDesc.NextReplicaID++ - updatedDesc.AddReplica(repDesc) + repDesc.ReplicaID = updatedDesc.NextReplicaID + updatedDesc.NextReplicaID++ + updatedDesc.AddReplica(repDesc) - case roachpb.REMOVE_REPLICA: - // If that exact node-store combination does not have the replica, - // abort the removal. 
- if repDescIdx == -1 { - return nil, errors.Errorf("%s: unable to remove replica %v which is not present", r, repDesc) - } - if _, ok := updatedDesc.RemoveReplica(repDesc.NodeID, repDesc.StoreID); !ok { - return nil, errors.Errorf("%s: unable to remove replica %v which is not present", r, repDesc) - } + err := execChangeReplicasTxn(ctx, r.store, roachpb.ADD_REPLICA, desc, repDesc, &updatedDesc, reason, details) + return &updatedDesc, err +} + +func (r *Replica) removeReplica( + ctx context.Context, + target roachpb.ReplicationTarget, + desc *roachpb.RangeDescriptor, + priority SnapshotRequest_Priority, + reason storagepb.RangeLogEventReason, + details string, +) (*roachpb.RangeDescriptor, error) { + if desc == nil { + return nil, errors.Errorf("%s: the current RangeDescriptor must not be nil", r) + } + updatedDesc := *desc + updatedDesc.SetReplicas(desc.Replicas().DeepCopy()) + // If that exact node-store combination does not have the replica, + // abort the removal. + removed, ok := updatedDesc.RemoveReplica(target.NodeID, target.StoreID) + if !ok { + return nil, errors.Errorf("%s: unable to remove replica %v which is not present", r, target) } + err := execChangeReplicasTxn(ctx, r.store, roachpb.REMOVE_REPLICA, desc, removed, &updatedDesc, reason, details) + return &updatedDesc, err +} - descKey := keys.RangeDescriptorKey(desc.StartKey) +func execChangeReplicasTxn( + ctx context.Context, + store *Store, + changeType roachpb.ReplicaChangeType, + desc *roachpb.RangeDescriptor, + repDesc roachpb.ReplicaDescriptor, + updatedDesc *roachpb.RangeDescriptor, + reason storagepb.RangeLogEventReason, + details string, +) error { + generationComparableEnabled := store.ClusterSettings().Version.IsActive(cluster.VersionGenerationComparable) + if generationComparableEnabled { + updatedDesc.IncrementGeneration() + updatedDesc.GenerationComparable = proto.Bool(true) + } - if err := r.store.DB().Txn(ctx, func(ctx context.Context, txn *client.Txn) error { + descKey := keys.RangeDescriptorKey(desc.StartKey) + if err := store.DB().Txn(ctx, func(ctx context.Context, txn *client.Txn) error { log.Event(ctx, "attempting txn") txn.SetDebugName(replicaChangeTxnName) dbDescValue, err := conditionalGetDescValueFromDB(ctx, txn, desc) @@ -959,7 +1168,7 @@ func (r *Replica) changeReplicas( // Important: the range descriptor must be the first thing touched in the transaction // so the transaction record is co-located with the range being modified. - if err := updateRangeDescriptor(b, descKey, dbDescValue, &updatedDesc); err != nil { + if err := updateRangeDescriptor(b, descKey, dbDescValue, updatedDesc); err != nil { return err } @@ -970,8 +1179,8 @@ func (r *Replica) changeReplicas( } // Log replica change into range event log. - if err := r.store.logChange( - ctx, txn, changeType, repDesc, updatedDesc, reason, details, + if err := store.logChange( + ctx, txn, changeType, repDesc, *updatedDesc, reason, details, ); err != nil { return err } @@ -981,7 +1190,7 @@ func (r *Replica) changeReplicas( b := txn.NewBatch() // Update range descriptor addressing record(s). 
- if err := updateRangeAddressing(b, &updatedDesc); err != nil { + if err := updateRangeAddressing(b, updatedDesc); err != nil { return err } @@ -991,7 +1200,7 @@ func (r *Replica) changeReplicas( ChangeReplicasTrigger: &roachpb.ChangeReplicasTrigger{ ChangeType: changeType, Replica: repDesc, - Desc: &updatedDesc, + Desc: updatedDesc, }, }, }) @@ -1006,10 +1215,10 @@ func (r *Replica) changeReplicas( if msg, ok := maybeDescriptorChangedError(desc, err); ok { err = &benignError{errors.New(msg)} } - return nil, errors.Wrapf(err, "change replicas of r%d failed", rangeID) + return errors.Wrapf(err, "change replicas of r%d failed", desc.RangeID) } log.Event(ctx, "txn complete") - return &updatedDesc, nil + return nil } // sendSnapshot sends a snapshot of the replica state to the specified replica. diff --git a/pkg/storage/replica_follower_read.go b/pkg/storage/replica_follower_read.go index ccf8b3c13ba5..1a2faa867edc 100644 --- a/pkg/storage/replica_follower_read.go +++ b/pkg/storage/replica_follower_read.go @@ -37,6 +37,19 @@ var FollowerReadsEnabled = settings.RegisterBoolSetting( func (r *Replica) canServeFollowerRead( ctx context.Context, ba *roachpb.BatchRequest, pErr *roachpb.Error, ) *roachpb.Error { + // There's no known reason that a learner replica couldn't serve follower + // reads (or RangeFeed), but as of the time of writing, learners are expected + // to be short-lived, so it's not worth working out the edge-cases. Revisit if + // we add long-lived learners. + repDesc, err := r.GetReplicaDescriptor() + if err != nil { + return roachpb.NewError(err) + } + if repDesc.GetType() == roachpb.ReplicaType_LEARNER { + log.Event(ctx, "learner replicas cannot serve follower reads") + return pErr + } + canServeFollowerRead := false if lErr, ok := pErr.GetDetail().(*roachpb.NotLeaseHolderError); ok && lErr.LeaseHolder != nil && lErr.Lease.Type() == roachpb.LeaseEpoch && diff --git a/pkg/storage/replica_learner_test.go b/pkg/storage/replica_learner_test.go new file mode 100644 index 000000000000..2e96e36fd324 --- /dev/null +++ b/pkg/storage/replica_learner_test.go @@ -0,0 +1,576 @@ +// Copyright 2019 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package storage_test + +import ( + "context" + "fmt" + "path/filepath" + "strconv" + "strings" + "sync/atomic" + "testing" + "time" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/server" + "github.com/cockroachdb/cockroach/pkg/storage" + "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" + "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" + "github.com/cockroachdb/cockroach/pkg/util" + "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/metric" + "github.com/cockroachdb/cockroach/pkg/util/tracing" + "github.com/cockroachdb/errors" + "github.com/stretchr/testify/require" + "go.etcd.io/etcd/raft/tracker" +) + +// TODO(dan): Test learners and quota pool. +// TODO(dan): Grep the codebase for "preemptive" and audit. 
+// TODO(dan): Write a test like TestLearnerAdminChangeReplicasRace for the
+// replicate queue leadership transfer race.
+
+type learnerTestKnobs struct {
+	storeKnobs                       storage.StoreTestingKnobs
+	replicaAddStopAfterLearnerAtomic int64
+}
+
+func makeLearnerTestKnobs() (base.TestingKnobs, *learnerTestKnobs) {
+	var k learnerTestKnobs
+	k.storeKnobs.ReplicaAddStopAfterLearnerSnapshot = func() bool {
+		return atomic.LoadInt64(&k.replicaAddStopAfterLearnerAtomic) > 0
+	}
+	return base.TestingKnobs{Store: &k.storeKnobs}, &k
+}
+
+func getFirstStoreReplica(
+	t *testing.T, s serverutils.TestServerInterface, key roachpb.Key,
+) (*storage.Store, *storage.Replica) {
+	t.Helper()
+	store, err := s.GetStores().(*storage.Stores).GetStore(s.GetFirstStoreID())
+	require.NoError(t, err)
+	var repl *storage.Replica
+	testutils.SucceedsSoon(t, func() error {
+		repl = store.LookupReplica(roachpb.RKey(key))
+		if repl == nil {
+			return errors.New(`could not find replica`)
+		}
+		return nil
+	})
+	return store, repl
+}
+
+// Some of the metrics used in these tests live on the queue objects and are
+// registered with the store's metric registry instead of living on
+// storage.StoreMetrics. Example: queue.replicate.removelearnerreplica.
+//
+// TODO(dan): Move things like ReplicateQueueMetrics to be a field on
+// storage.StoreMetrics and just keep a reference in newReplicateQueue. Ditto
+// for other queues that do this.
+func getFirstStoreMetric(t *testing.T, s serverutils.TestServerInterface, name string) int64 {
+	t.Helper()
+	store, err := s.GetStores().(*storage.Stores).GetStore(s.GetFirstStoreID())
+	require.NoError(t, err)
+
+	var c int64
+	var found bool
+	store.Registry().Each(func(n string, v interface{}) {
+		if name == n {
+			switch t := v.(type) {
+			case *metric.Counter:
+				c = t.Count()
+				found = true
+			case *metric.Gauge:
+				c = t.Value()
+				found = true
+			}
+		}
+	})
+	if !found {
+		panic(fmt.Sprintf("couldn't find metric %s", name))
+	}
+	return c
+}
+
+func TestAddReplicaViaLearner(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	// The happy case! \o/
+
+	blockUntilSnapshotCh := make(chan struct{})
+	blockSnapshotsCh := make(chan struct{})
+	knobs, ltk := makeLearnerTestKnobs()
+	ltk.storeKnobs.ReceiveSnapshot = func(h *storage.SnapshotRequest_Header) error {
+		close(blockUntilSnapshotCh)
+		select {
+		case <-blockSnapshotsCh:
+		case <-time.After(10 * time.Second):
+			return errors.New(`test timed out`)
+		}
+		return nil
+	}
+	ctx := context.Background()
+	tc := testcluster.StartTestCluster(t, 2, base.TestClusterArgs{
+		ServerArgs:      base.TestServerArgs{Knobs: knobs},
+		ReplicationMode: base.ReplicationManual,
+	})
+	defer tc.Stopper().Stop(ctx)
+	db := sqlutils.MakeSQLRunner(tc.ServerConn(0))
+	db.Exec(t, `SET CLUSTER SETTING kv.learner_replicas.enabled = true`)
+
+	scratchStartKey := tc.ScratchRange(t)
+
+	g := ctxgroup.WithContext(ctx)
+	g.GoCtx(func(ctx context.Context) error {
+		_, err := tc.AddReplicas(scratchStartKey, tc.Target(1))
+		return err
+	})
+
+	// Wait until the snapshot starts, which happens after the learner has been
+	// added.
+	<-blockUntilSnapshotCh
+	desc := tc.LookupRangeOrFatal(t, scratchStartKey)
+	require.Len(t, desc.Replicas().Voters(), 1)
+	require.Len(t, desc.Replicas().Learners(), 1)
+
+	var voters, nonVoters string
+	db.QueryRow(t,
+		`SELECT array_to_string(replicas, ','), array_to_string(learner_replicas, ',') FROM crdb_internal.ranges_no_leases WHERE range_id = $1`,
+		desc.RangeID,
+	).Scan(&voters, &nonVoters)
+	require.Equal(t, `1`, voters)
+	require.Equal(t, `2`, nonVoters)
+
+	// Unblock the snapshot and let the learner get promoted to a voter.
+	close(blockSnapshotsCh)
+	require.NoError(t, g.Wait())
+
+	desc = tc.LookupRangeOrFatal(t, scratchStartKey)
+	require.Len(t, desc.Replicas().Voters(), 2)
+	require.Len(t, desc.Replicas().Learners(), 0)
+	require.Equal(t, int64(1), getFirstStoreMetric(t, tc.Server(1), `range.snapshots.learner-applied`))
+}
+
+func TestLearnerRaftConfState(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+
+	verifyLearnerInRaftOnNodes := func(
+		key roachpb.Key, id roachpb.ReplicaID, servers []*server.TestServer,
+	) {
+		t.Helper()
+		var repls []*storage.Replica
+		for _, s := range servers {
+			_, repl := getFirstStoreReplica(t, s, key)
+			repls = append(repls, repl)
+		}
+		testutils.SucceedsSoon(t, func() error {
+			for _, repl := range repls {
+				status := repl.RaftStatus()
+				if status == nil {
+					return errors.Errorf(`%s is still waking up`, repl)
+				}
+				if _, ok := status.Config.Learners[uint64(id)]; !ok {
+					return errors.Errorf(`%s thinks %d is not a learner`, repl, id)
+				}
+			}
+			return nil
+		})
+	}
+
+	// Run the TestCluster with a known datadir so we can shut it down and start a
+	// new one on top of the existing data as part of the test.
+	dir, cleanup := testutils.TempDir(t)
+	defer cleanup()
+
+	knobs, ltk := makeLearnerTestKnobs()
+	ctx := context.Background()
+	const numNodes = 2
+	serverArgsPerNode := make(map[int]base.TestServerArgs)
+	for i := 0; i < numNodes; i++ {
+		path := filepath.Join(dir, "testserver", strconv.Itoa(i))
+		serverArgsPerNode[i] = base.TestServerArgs{
+			Knobs:      knobs,
+			StoreSpecs: []base.StoreSpec{{InMemory: false, Path: path}},
+		}
+	}
+	tc := testcluster.StartTestCluster(t, numNodes, base.TestClusterArgs{
+		ServerArgsPerNode: serverArgsPerNode,
+		ReplicationMode:   base.ReplicationManual,
+	})
+	defer func() {
+		// We modify the value of `tc` below to start up a second cluster, so in
+		// contrast to other tests, run this `defer Stop` in an anonymous func.
+		tc.Stopper().Stop(ctx)
+	}()
+	db := sqlutils.MakeSQLRunner(tc.ServerConn(0))
+	db.Exec(t, `SET CLUSTER SETTING kv.learner_replicas.enabled = true`)
+
+	// Add a learner replica, send a snapshot so that it's materialized as a
+	// Replica on the Store, but don't promote it to a voter.
+	scratchStartKey := tc.ScratchRange(t)
+	atomic.StoreInt64(&ltk.replicaAddStopAfterLearnerAtomic, 1)
+	desc := tc.AddReplicasOrFatal(t, scratchStartKey, tc.Target(1))
+	atomic.StoreInt64(&ltk.replicaAddStopAfterLearnerAtomic, 0)
+	require.Len(t, desc.Replicas().Learners(), 1)
+	learnerReplicaID := desc.Replicas().Learners()[0].ReplicaID
+
+	// Verify that raft on every node thinks it's a learner. This checks that we
+	// use ConfChangeAddLearnerNode in the ConfChange and also checks that we
+	// correctly generate the ConfState for the snapshot.
+	verifyLearnerInRaftOnNodes(scratchStartKey, learnerReplicaID, tc.Servers)
+
+	// Shut down the cluster and restart it, then verify again that raft on every
+	// node thinks our learner is a learner. This checks that we generate the
+	// initial ConfState correctly.
+	tc.Stopper().Stop(ctx)
+	tc = testcluster.StartTestCluster(t, numNodes, base.TestClusterArgs{
+		ServerArgsPerNode: serverArgsPerNode,
+		ReplicationMode:   base.ReplicationManual,
+	})
+	{
+		// Ping the raft group to wake it up.
+		_, err := tc.Server(0).DB().Get(ctx, scratchStartKey)
+		require.NoError(t, err)
+	}
+	verifyLearnerInRaftOnNodes(scratchStartKey, learnerReplicaID, tc.Servers)
+}
+
+func TestLearnerSnapshotFailsRollback(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+
+	var rejectSnapshots int64
+	knobs, ltk := makeLearnerTestKnobs()
+	ltk.storeKnobs.ReceiveSnapshot = func(h *storage.SnapshotRequest_Header) error {
+		if atomic.LoadInt64(&rejectSnapshots) > 0 {
+			return errors.New(`nope`)
+		}
+		return nil
+	}
+	ctx := context.Background()
+	tc := testcluster.StartTestCluster(t, 2, base.TestClusterArgs{
+		ServerArgs:      base.TestServerArgs{Knobs: knobs},
+		ReplicationMode: base.ReplicationManual,
+	})
+	defer tc.Stopper().Stop(ctx)
+	db := sqlutils.MakeSQLRunner(tc.ServerConn(0))
+	db.Exec(t, `SET CLUSTER SETTING kv.learner_replicas.enabled = true`)
+
+	scratchStartKey := tc.ScratchRange(t)
+	atomic.StoreInt64(&rejectSnapshots, 1)
+	_, err := tc.AddReplicas(scratchStartKey, tc.Target(1))
+	// TODO(dan): It'd be nice if we could cancel the `AddReplicas` context before
+	// returning the error from the `ReceiveSnapshot` knob to test the codepath
+	// that uses a new context for the rollback, but plumbing that context is
+	// annoying.
+	if !testutils.IsError(err, `remote couldn't accept LEARNER snapshot`) {
+		t.Fatalf(`expected "remote couldn't accept LEARNER snapshot" error got: %+v`, err)
+	}
+
+	// Make sure we cleaned up after ourselves (by removing the learner).
+	desc := tc.LookupRangeOrFatal(t, scratchStartKey)
+	require.Empty(t, desc.Replicas().Learners())
+}
+
+func TestSplitWithLearner(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	ctx := context.Background()
+
+	knobs, ltk := makeLearnerTestKnobs()
+	tc := testcluster.StartTestCluster(t, 2, base.TestClusterArgs{
+		ServerArgs:      base.TestServerArgs{Knobs: knobs},
+		ReplicationMode: base.ReplicationManual,
+	})
+	defer tc.Stopper().Stop(ctx)
+	db := sqlutils.MakeSQLRunner(tc.ServerConn(0))
+	db.Exec(t, `SET CLUSTER SETTING kv.learner_replicas.enabled = true`)
+
+	// Add a learner replica, send a snapshot so that it's materialized as a
+	// Replica on the Store, but don't promote it to a voter.
+	scratchStartKey := tc.ScratchRange(t)
+	atomic.StoreInt64(&ltk.replicaAddStopAfterLearnerAtomic, 1)
+	_ = tc.AddReplicasOrFatal(t, scratchStartKey, tc.Target(1))
+	atomic.StoreInt64(&ltk.replicaAddStopAfterLearnerAtomic, 0)
+
+	// Splitting a learner is allowed. This orphans the two learners, but the
+	// replication queue will eventually clean this up.
+	left, right, err := tc.SplitRange(scratchStartKey.Next())
+	require.NoError(t, err)
+	require.Len(t, left.Replicas().Learners(), 1)
+	require.Len(t, right.Replicas().Learners(), 1)
+}
+
+func TestReplicateQueueSeesLearner(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	// NB also see TestAllocatorRemoveLearner for a lower-level test.
+
+	ctx := context.Background()
+	knobs, ltk := makeLearnerTestKnobs()
+	tc := testcluster.StartTestCluster(t, 3, base.TestClusterArgs{
+		ServerArgs:      base.TestServerArgs{Knobs: knobs},
+		ReplicationMode: base.ReplicationManual,
+	})
+	defer tc.Stopper().Stop(ctx)
+	db := sqlutils.MakeSQLRunner(tc.ServerConn(0))
+	db.Exec(t, `SET CLUSTER SETTING kv.learner_replicas.enabled = true`)
+
+	// Add a learner replica, send a snapshot so that it's materialized as a
+	// Replica on the Store, but don't promote it to a voter.
+	scratchStartKey := tc.ScratchRange(t)
+	atomic.StoreInt64(&ltk.replicaAddStopAfterLearnerAtomic, 1)
+	_ = tc.AddReplicasOrFatal(t, scratchStartKey, tc.Target(1))
+	atomic.StoreInt64(&ltk.replicaAddStopAfterLearnerAtomic, 0)
+
+	// Run the replicate queue.
+	store, repl := getFirstStoreReplica(t, tc.Server(0), scratchStartKey)
+	require.Equal(t, int64(0), getFirstStoreMetric(t, tc.Server(0), `queue.replicate.removelearnerreplica`))
+	_, errMsg, err := store.ManuallyEnqueue(ctx, "replicate", repl, true /* skipShouldQueue */)
+	require.NoError(t, err)
+	require.Equal(t, ``, errMsg)
+	require.Equal(t, int64(1), getFirstStoreMetric(t, tc.Server(0), `queue.replicate.removelearnerreplica`))
+
+	// Make sure it deleted the learner.
+	desc := tc.LookupRangeOrFatal(t, scratchStartKey)
+	require.Empty(t, desc.Replicas().Learners())
+
+	// Bonus points: the replicate queue keeps processing until there is nothing
+	// to do, so it should have upreplicated the range to 3.
+	require.Len(t, desc.Replicas().Voters(), 3)
+}
+
+func TestGCQueueSeesLearner(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	ctx := context.Background()
+	knobs, ltk := makeLearnerTestKnobs()
+	tc := testcluster.StartTestCluster(t, 2, base.TestClusterArgs{
+		ServerArgs:      base.TestServerArgs{Knobs: knobs},
+		ReplicationMode: base.ReplicationManual,
+	})
+	defer tc.Stopper().Stop(ctx)
+	db := sqlutils.MakeSQLRunner(tc.ServerConn(0))
+	db.Exec(t, `SET CLUSTER SETTING kv.learner_replicas.enabled = true`)
+
+	// Add a learner replica, send a snapshot so that it's materialized as a
+	// Replica on the Store, but don't promote it to a voter.
+	scratchStartKey := tc.ScratchRange(t)
+	atomic.StoreInt64(&ltk.replicaAddStopAfterLearnerAtomic, 1)
+	_ = tc.AddReplicasOrFatal(t, scratchStartKey, tc.Target(1))
+	atomic.StoreInt64(&ltk.replicaAddStopAfterLearnerAtomic, 0)
+
+	// Run the replicaGC queue.
+	store, repl := getFirstStoreReplica(t, tc.Server(1), scratchStartKey)
+	trace, errMsg, err := store.ManuallyEnqueue(ctx, "replicaGC", repl, true /* skipShouldQueue */)
+	require.NoError(t, err)
+	require.Equal(t, ``, errMsg)
+	const msg = `not gc'able, replica is still in range descriptor: (n2,s2):2LEARNER`
+	require.Contains(t, tracing.FormatRecordedSpans(trace), msg)
+
+	// Make sure it didn't collect the learner.
+ desc := tc.LookupRangeOrFatal(t, scratchStartKey) + require.NotEmpty(t, desc.Replicas().Learners()) +} + +func TestRaftSnapshotQueueSeesLearner(t *testing.T) { + defer leaktest.AfterTest(t)() + ctx := context.Background() + blockSnapshotsCh := make(chan struct{}) + knobs, ltk := makeLearnerTestKnobs() + ltk.storeKnobs.ReceiveSnapshot = func(h *storage.SnapshotRequest_Header) error { + select { + case <-blockSnapshotsCh: + case <-time.After(10 * time.Second): + return errors.New(`test timed out`) + } + return nil + } + tc := testcluster.StartTestCluster(t, 2, base.TestClusterArgs{ + ServerArgs: base.TestServerArgs{Knobs: knobs}, + ReplicationMode: base.ReplicationManual, + }) + defer tc.Stopper().Stop(ctx) + db := sqlutils.MakeSQLRunner(tc.ServerConn(0)) + db.Exec(t, `SET CLUSTER SETTING kv.learner_replicas.enabled = true`) + + // Create a learner replica. + scratchStartKey := tc.ScratchRange(t) + g := ctxgroup.WithContext(ctx) + g.GoCtx(func(ctx context.Context) error { + _, err := tc.AddReplicas(scratchStartKey, tc.Target(1)) + return err + }) + + // Wait until raft knows that the learner needs a snapshot. + store, repl := getFirstStoreReplica(t, tc.Server(0), scratchStartKey) + testutils.SucceedsSoon(t, func() error { + for _, p := range repl.RaftStatus().Progress { + if p.State == tracker.StateSnapshot { + return nil + } + } + return errors.New(`expected some replica to need a snapshot`) + }) + + // Note the value of the metrics before. + generatedBefore := getFirstStoreMetric(t, tc.Server(0), `range.snapshots.generated`) + raftAppliedBefore := getFirstStoreMetric(t, tc.Server(0), `range.snapshots.normal-applied`) + + // Run the raftsnapshot queue. + trace, errMsg, err := store.ManuallyEnqueue(ctx, "raftsnapshot", repl, true /* skipShouldQueue */) + require.NoError(t, err) + require.Equal(t, ``, errMsg) + const msg = `not sending snapshot type RAFT to learner: (n2,s2):2LEARNER` + require.Contains(t, tracing.FormatRecordedSpans(trace), msg) + + // Make sure it didn't send any RAFT snapshots. + require.Equal(t, generatedBefore, getFirstStoreMetric(t, tc.Server(0), `range.snapshots.generated`)) + require.Equal(t, raftAppliedBefore, getFirstStoreMetric(t, tc.Server(0), `range.snapshots.normal-applied`)) + + close(blockSnapshotsCh) + require.NoError(t, g.Wait()) +} + +// This test verifies the result of a race between the replicate queue running +// while an AdminChangeReplicas is adding a replica. +func TestLearnerAdminChangeReplicasRace(t *testing.T) { + defer leaktest.AfterTest(t)() + + blockUntilSnapshotCh := make(chan struct{}, 2) + blockSnapshotsCh := make(chan struct{}) + knobs, ltk := makeLearnerTestKnobs() + ltk.storeKnobs.ReceiveSnapshot = func(h *storage.SnapshotRequest_Header) error { + blockUntilSnapshotCh <- struct{}{} + <-blockSnapshotsCh + return nil + } + ctx := context.Background() + tc := testcluster.StartTestCluster(t, 2, base.TestClusterArgs{ + ServerArgs: base.TestServerArgs{Knobs: knobs}, + ReplicationMode: base.ReplicationManual, + }) + defer tc.Stopper().Stop(ctx) + db := sqlutils.MakeSQLRunner(tc.ServerConn(0)) + db.Exec(t, `SET CLUSTER SETTING kv.learner_replicas.enabled = true`) + + // Add the learner. + scratchStartKey := tc.ScratchRange(t) + g := ctxgroup.WithContext(ctx) + g.GoCtx(func(ctx context.Context) error { + _, err := tc.AddReplicas(scratchStartKey, tc.Target(1)) + return err + }) + + // Wait until the snapshot starts, which happens after the learner has been + // added. 
+ <-blockUntilSnapshotCh
+
+ // Removes the learner out from under the coordinator running on behalf of
+ // AddReplicas.
+ _, err := tc.RemoveReplicas(scratchStartKey, tc.Target(1))
+ require.NoError(t, err)
+ desc := tc.LookupRangeOrFatal(t, scratchStartKey)
+ require.Len(t, desc.Replicas().Voters(), 1)
+ require.Len(t, desc.Replicas().Learners(), 0)
+
+ // Unblock the snapshot, and surprise AddReplicas. It should retry and error
+ // that the descriptor has changed since the AdminChangeReplicas command
+ // started.
+ close(blockSnapshotsCh)
+ if err := g.Wait(); !testutils.IsError(err, `descriptor changed`) {
+ t.Fatalf(`expected "descriptor changed" error got: %+v`, err)
+ }
+ desc = tc.LookupRangeOrFatal(t, scratchStartKey)
+ require.Len(t, desc.Replicas().Voters(), 1)
+ require.Len(t, desc.Replicas().Learners(), 0)
+}
+
+func TestLearnerNoAcceptLease(t *testing.T) {
+ defer leaktest.AfterTest(t)()
+ ctx := context.Background()
+ knobs, ltk := makeLearnerTestKnobs()
+ tc := testcluster.StartTestCluster(t, 2, base.TestClusterArgs{
+ ServerArgs: base.TestServerArgs{Knobs: knobs},
+ ReplicationMode: base.ReplicationManual,
+ })
+ defer tc.Stopper().Stop(ctx)
+ db := sqlutils.MakeSQLRunner(tc.ServerConn(0))
+ db.Exec(t, `SET CLUSTER SETTING kv.learner_replicas.enabled = true`)
+
+ // Add a learner replica, send a snapshot so that it's materialized as a
+ // Replica on the Store, but don't promote it to a voter.
+ scratchStartKey := tc.ScratchRange(t)
+ atomic.StoreInt64(&ltk.replicaAddStopAfterLearnerAtomic, 1)
+ _ = tc.AddReplicasOrFatal(t, scratchStartKey, tc.Target(1))
+ atomic.StoreInt64(&ltk.replicaAddStopAfterLearnerAtomic, 0)
+
+ desc := tc.LookupRangeOrFatal(t, scratchStartKey)
+ err := tc.TransferRangeLease(desc, tc.Target(1))
+ if !testutils.IsError(err, `cannot transfer lease to replica of type LEARNER`) {
+ t.Fatalf(`expected "cannot transfer lease to replica of type LEARNER" error got: %+v`, err)
+ }
+}
+
+func TestLearnerFollowerRead(t *testing.T) {
+ defer leaktest.AfterTest(t)()
+
+ if util.RaceEnabled {
+ // Limiting how long transactions can run does not work well with race
+ // unless we're extremely lenient, which drives up the test duration.
+ t.Skip("skipping under race")
+ }
+
+ ctx := context.Background()
+ knobs, ltk := makeLearnerTestKnobs()
+ tc := testcluster.StartTestCluster(t, 2, base.TestClusterArgs{
+ ServerArgs: base.TestServerArgs{Knobs: knobs},
+ ReplicationMode: base.ReplicationManual,
+ })
+ defer tc.Stopper().Stop(ctx)
+ db := sqlutils.MakeSQLRunner(tc.ServerConn(0))
+ db.Exec(t, `SET CLUSTER SETTING kv.learner_replicas.enabled = true`)
+ db.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.target_duration = $1`, testingTargetDuration)
+ db.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.close_fraction = $1`, closeFraction)
+ db.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.follower_reads_enabled = true`)
+
+ scratchStartKey := tc.ScratchRange(t)
+ atomic.StoreInt64(&ltk.replicaAddStopAfterLearnerAtomic, 1)
+ scratchDesc := tc.AddReplicasOrFatal(t, scratchStartKey, tc.Target(1))
+ atomic.StoreInt64(&ltk.replicaAddStopAfterLearnerAtomic, 0)
+
+ req := roachpb.BatchRequest{Header: roachpb.Header{
+ RangeID: scratchDesc.RangeID,
+ Timestamp: tc.Server(0).Clock().Now(),
+ }}
+ req.Add(&roachpb.ScanRequest{RequestHeader: roachpb.RequestHeader{
+ Key: scratchDesc.StartKey.AsRawKey(), EndKey: scratchDesc.EndKey.AsRawKey(),
+ }})
+
+ _, repl := getFirstStoreReplica(t, tc.Server(1), scratchStartKey)
+ testutils.SucceedsSoon(t, func() error {
+ // Trace the Send call so we can verify that it hit the exact `learner
+ // replicas cannot serve follower reads` branch that we're trying to test.
+ sendCtx, collect, cancel := tracing.ContextWithRecordingSpan(ctx, "manual read request")
+ defer cancel()
+ _, pErr := repl.Send(sendCtx, req)
+ err := pErr.GoError()
+ if !testutils.IsError(err, `not lease holder`) {
+ return errors.Errorf(`expected "not lease holder" error got: %+v`, err)
+ }
+ const msg = `learner replicas cannot serve follower reads`
+ formattedTrace := tracing.FormatRecordedSpans(collect())
+ if !strings.Contains(formattedTrace, msg) {
+ return errors.Errorf("expected a trace with `%s` got:\n%s", msg, formattedTrace)
+ }
+ return nil
+ })
+}
diff --git a/pkg/storage/replica_proposal_buf.go b/pkg/storage/replica_proposal_buf.go
index 94595aff74b8..22ccc1e421a9 100644
--- a/pkg/storage/replica_proposal_buf.go
+++ b/pkg/storage/replica_proposal_buf.go
@@ -445,11 +445,18 @@ func (b *propBuf) FlushLockedWithRaftGroup(raftGroup *raft.RawNode) error {
 continue
 }
- if err := raftGroup.ProposeConfChange(raftpb.ConfChange{
+ confChange := raftpb.ConfChange{
 Type: changeTypeInternalToRaft[crt.ChangeType],
 NodeID: uint64(crt.Replica.ReplicaID),
 Context: encodedCtx,
- }); err != nil && err != raft.ErrProposalDropped {
+ }
+ if confChange.Type == raftpb.ConfChangeAddNode &&
+ crt.Replica.GetType() == roachpb.ReplicaType_LEARNER {
+ confChange.Type = raftpb.ConfChangeAddLearnerNode
+ }
+ if err := raftGroup.ProposeConfChange(
+ confChange,
+ ); err != nil && err != raft.ErrProposalDropped {
 // Silently ignore dropped proposals (they were always silently
 // ignored prior to the introduction of ErrProposalDropped).
 // TODO(bdarnell): Handle ErrProposalDropped better.
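[Editor's aside, not part of the patch: the propBuf hunk above picks between two raft conf-change types when proposing a replica addition, downgrading AddNode to AddLearnerNode when the replica being added is a learner. A minimal, runnable Go sketch of just that branch, using hypothetical stand-in types rather than the real raftpb/roachpb ones, is below.]

package main

import "fmt"

// Stand-ins for roachpb.ReplicaType; names are illustrative only.
type ReplicaType int

const (
	ReplicaVoter ReplicaType = iota
	ReplicaLearner
)

// Stand-ins for raftpb.ConfChangeType; names are illustrative only.
type ConfChangeType int

const (
	ConfChangeAddNode ConfChangeType = iota
	ConfChangeAddLearnerNode
	ConfChangeRemoveNode
)

// confChangeTypeFor starts from the conf-change type the internal change
// would normally map to and downgrades an AddNode to AddLearnerNode when the
// replica being added is a learner, mirroring the branch in the hunk above.
func confChangeTypeFor(mapped ConfChangeType, replica ReplicaType) ConfChangeType {
	if mapped == ConfChangeAddNode && replica == ReplicaLearner {
		return ConfChangeAddLearnerNode
	}
	return mapped
}

func main() {
	fmt.Println(confChangeTypeFor(ConfChangeAddNode, ReplicaVoter))      // 0: add a voter
	fmt.Println(confChangeTypeFor(ConfChangeAddNode, ReplicaLearner))    // 1: add a learner
	fmt.Println(confChangeTypeFor(ConfChangeRemoveNode, ReplicaLearner)) // 2: removals are unchanged
}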
diff --git a/pkg/storage/replica_raftstorage.go b/pkg/storage/replica_raftstorage.go index 494e4a6cd04f..7f0d03dff53b 100644 --- a/pkg/storage/replica_raftstorage.go +++ b/pkg/storage/replica_raftstorage.go @@ -66,9 +66,12 @@ func (r *replicaRaftStorage) InitialState() (raftpb.HardState, raftpb.ConfState, return raftpb.HardState{}, raftpb.ConfState{}, err } var cs raftpb.ConfState - for _, rep := range r.mu.state.Desc.Replicas().All() { + for _, rep := range r.mu.state.Desc.Replicas().Voters() { cs.Nodes = append(cs.Nodes, uint64(rep.ReplicaID)) } + for _, rep := range r.mu.state.Desc.Replicas().Learners() { + cs.Learners = append(cs.Learners, uint64(rep.ReplicaID)) + } return hs, cs, nil } @@ -536,9 +539,12 @@ func snapshot( // Synthesize our raftpb.ConfState from desc. var cs raftpb.ConfState - for _, rep := range desc.Replicas().All() { + for _, rep := range desc.Replicas().Voters() { cs.Nodes = append(cs.Nodes, uint64(rep.ReplicaID)) } + for _, rep := range desc.Replicas().Learners() { + cs.Learners = append(cs.Learners, uint64(rep.ReplicaID)) + } term, err := term(ctx, rsl, snap, rangeID, eCache, appliedIndex) if err != nil { diff --git a/pkg/storage/replica_range_lease.go b/pkg/storage/replica_range_lease.go index 7b67fdcf239d..64f4954c666c 100644 --- a/pkg/storage/replica_range_lease.go +++ b/pkg/storage/replica_range_lease.go @@ -675,6 +675,18 @@ func (r *Replica) AdminTransferLease(ctx context.Context, target roachpb.StoreID return nil, nil, errors.Errorf("unable to find store %d in range %+v", target, desc) } + // For now, don't allow replicas of type LEARNER to be leaseholders, see + // comments in RequestLease and TransferLease for why. + // + // TODO(dan): We shouldn't need this, the checks in RequestLease and + // TransferLease are the canonical ones and should be sufficient. Sadly, the + // `r.mu.minLeaseProposedTS = status.Timestamp` line below will likely play + // badly with that. This would be an issue even without learners, but + // omitting this check would make it worse. Fixme. + if t := nextLeaseHolder.GetType(); t != roachpb.ReplicaType_VOTER { + return nil, nil, errors.Errorf(`cannot transfer lease to replica of type %s`, t) + } + if nextLease, ok := r.mu.pendingLeaseRequest.RequestPending(); ok && nextLease.Replica != nextLeaseHolder { repDesc, err := r.getReplicaDescriptorRLocked() diff --git a/pkg/storage/replicate_queue.go b/pkg/storage/replicate_queue.go index 3126749ca73b..84a06df63982 100644 --- a/pkg/storage/replicate_queue.go +++ b/pkg/storage/replicate_queue.go @@ -69,6 +69,12 @@ var ( Measurement: "Replica Removals", Unit: metric.Unit_COUNT, } + metaReplicateQueueRemoveLearnerReplicaCount = metric.Metadata{ + Name: "queue.replicate.removelearnerreplica", + Help: "Number of learner replica removals attempted by the replicate queue (typically due to internal race conditions)", + Measurement: "Replica Removals", + Unit: metric.Unit_COUNT, + } metaReplicateQueueRebalanceReplicaCount = metric.Metadata{ Name: "queue.replicate.rebalancereplica", Help: "Number of replica rebalancer-initiated additions attempted by the replicate queue", @@ -104,20 +110,22 @@ func (*quorumError) purgatoryErrorMarker() {} // ReplicateQueueMetrics is the set of metrics for the replicate queue. 
type ReplicateQueueMetrics struct { - AddReplicaCount *metric.Counter - RemoveReplicaCount *metric.Counter - RemoveDeadReplicaCount *metric.Counter - RebalanceReplicaCount *metric.Counter - TransferLeaseCount *metric.Counter + AddReplicaCount *metric.Counter + RemoveReplicaCount *metric.Counter + RemoveDeadReplicaCount *metric.Counter + RemoveLearnerReplicaCount *metric.Counter + RebalanceReplicaCount *metric.Counter + TransferLeaseCount *metric.Counter } func makeReplicateQueueMetrics() ReplicateQueueMetrics { return ReplicateQueueMetrics{ - AddReplicaCount: metric.NewCounter(metaReplicateQueueAddReplicaCount), - RemoveReplicaCount: metric.NewCounter(metaReplicateQueueRemoveReplicaCount), - RemoveDeadReplicaCount: metric.NewCounter(metaReplicateQueueRemoveDeadReplicaCount), - RebalanceReplicaCount: metric.NewCounter(metaReplicateQueueRebalanceReplicaCount), - TransferLeaseCount: metric.NewCounter(metaReplicateQueueTransferLeaseCount), + AddReplicaCount: metric.NewCounter(metaReplicateQueueAddReplicaCount), + RemoveReplicaCount: metric.NewCounter(metaReplicateQueueRemoveReplicaCount), + RemoveDeadReplicaCount: metric.NewCounter(metaReplicateQueueRemoveDeadReplicaCount), + RemoveLearnerReplicaCount: metric.NewCounter(metaReplicateQueueRemoveLearnerReplicaCount), + RebalanceReplicaCount: metric.NewCounter(metaReplicateQueueRebalanceReplicaCount), + TransferLeaseCount: metric.NewCounter(metaReplicateQueueTransferLeaseCount), } } @@ -526,6 +534,25 @@ func (rq *replicateQueue) processOneChange( ); err != nil { return false, err } + case AllocatorRemoveLearner: + learnerReplicas := desc.Replicas().Learners() + if len(learnerReplicas) == 0 { + log.VEventf(ctx, 1, "range of replica %s was identified as having learner replicas, "+ + "but no learner replicas were found", repl) + break + } + learnerReplica := learnerReplicas[0] + rq.metrics.RemoveLearnerReplicaCount.Inc(1) + log.VEventf(ctx, 1, "removing learner replica %+v from store", learnerReplica) + target := roachpb.ReplicationTarget{ + NodeID: learnerReplica.NodeID, + StoreID: learnerReplica.StoreID, + } + if err := rq.removeReplica( + ctx, repl, target, desc, storagepb.ReasonAbandonedLearner, "", dryRun, + ); err != nil { + return false, err + } case AllocatorConsiderRebalance: // The Noop case will result if this replica was queued in order to // rebalance. Attempt to find a rebalancing target. 
@@ -660,7 +687,7 @@ func (rq *replicateQueue) addReplica( if dryRun { return nil } - if _, err := repl.changeReplicas(ctx, roachpb.ADD_REPLICA, target, desc, priority, reason, details); err != nil { + if _, err := repl.addReplica(ctx, target, desc, priority, reason, details); err != nil { return err } rangeInfo := rangeInfoForRepl(repl, desc) diff --git a/pkg/storage/storagepb/log.go b/pkg/storage/storagepb/log.go index 53601436458b..0cf93eb155ac 100644 --- a/pkg/storage/storagepb/log.go +++ b/pkg/storage/storagepb/log.go @@ -22,4 +22,5 @@ const ( ReasonStoreDecommissioning RangeLogEventReason = "store decommissioning" ReasonRebalance RangeLogEventReason = "rebalance" ReasonAdminRequest RangeLogEventReason = "admin request" + ReasonAbandonedLearner RangeLogEventReason = "abandoned learner replica" ) diff --git a/pkg/storage/store_snapshot.go b/pkg/storage/store_snapshot.go index c91e20102b61..7aafe83985a4 100644 --- a/pkg/storage/store_snapshot.go +++ b/pkg/storage/store_snapshot.go @@ -588,6 +588,12 @@ func (s *Store) shouldAcceptSnapshotData( func (s *Store) receiveSnapshot( ctx context.Context, header *SnapshotRequest_Header, stream incomingSnapshotStream, ) error { + if fn := s.cfg.TestingKnobs.ReceiveSnapshot; fn != nil { + if err := fn(header); err != nil { + return sendSnapshotError(stream, err) + } + } + cleanup, rejectionMsg, err := s.reserveSnapshot(ctx, header) if err != nil { return err diff --git a/pkg/storage/testing_knobs.go b/pkg/storage/testing_knobs.go index bb45d0fa7006..5a4945e977d6 100644 --- a/pkg/storage/testing_knobs.go +++ b/pkg/storage/testing_knobs.go @@ -181,6 +181,17 @@ type StoreTestingKnobs struct { // TraceAllRaftEvents enables raft event tracing even when the current // vmodule would not have enabled it. TraceAllRaftEvents bool + + // ReceiveSnapshot is run after receiving a snapshot header but before + // acquiring snapshot quota or doing shouldAcceptSnapshotData checks. If an + // error is returned from the hook, it's sent as an ERROR SnapshotResponse. + ReceiveSnapshot func(*SnapshotRequest_Header) error + // ReplicaAddStopAfterLearnerSnapshot causes replica addition to return early + // if the func returns true. Specifically, after the learner txn is successful + // and after the LEARNER type snapshot, but before promoting it to a voter. + // This ensures the `*Replica` will be materialized on the Store when it + // returns. + ReplicaAddStopAfterLearnerSnapshot func() bool } // ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface. diff --git a/pkg/testutils/serverutils/test_cluster_shim.go b/pkg/testutils/serverutils/test_cluster_shim.go index 5f57448fb90b..e3ef2f96cab2 100644 --- a/pkg/testutils/serverutils/test_cluster_shim.go +++ b/pkg/testutils/serverutils/test_cluster_shim.go @@ -52,6 +52,12 @@ type TestClusterInterface interface { startKey roachpb.Key, targets ...roachpb.ReplicationTarget, ) (roachpb.RangeDescriptor, error) + // AddReplicasOrFatal is the same as AddReplicas but will Fatal the test on + // error. + AddReplicasOrFatal( + t testing.TB, startKey roachpb.Key, targets ...roachpb.ReplicationTarget, + ) roachpb.RangeDescriptor + // RemoveReplicas removes one or more replicas from a range. RemoveReplicas( startKey roachpb.Key, targets ...roachpb.ReplicationTarget, @@ -86,6 +92,10 @@ type TestClusterInterface interface { // LookupRange returns the descriptor of the range containing key. 
LookupRange(key roachpb.Key) (roachpb.RangeDescriptor, error) + // LookupRangeOrFatal is the same as LookupRange but will Fatal the test on + // error. + LookupRangeOrFatal(t testing.TB, key roachpb.Key) roachpb.RangeDescriptor + // Target returns a roachpb.ReplicationTarget for the specified server. Target(serverIdx int) roachpb.ReplicationTarget diff --git a/pkg/testutils/soon.go b/pkg/testutils/soon.go index 7c5d6c5696d2..b33ae3b61a5c 100644 --- a/pkg/testutils/soon.go +++ b/pkg/testutils/soon.go @@ -47,7 +47,7 @@ func SucceedsSoonError(t testing.TB, fn func() error) error { wrappedFn := func() error { err := fn() if timeutil.Since(tBegin) > 3*time.Second && err != nil { - log.InfoDepth(context.Background(), 3, errors.Wrap(err, "SucceedsSoon")) + log.InfoDepth(context.Background(), 4, errors.Wrap(err, "SucceedsSoon")) } return err } diff --git a/pkg/testutils/testcluster/testcluster.go b/pkg/testutils/testcluster/testcluster.go index 5846710f85f4..326afbdbcc65 100644 --- a/pkg/testutils/testcluster/testcluster.go +++ b/pkg/testutils/testcluster/testcluster.go @@ -14,6 +14,7 @@ import ( "context" gosql "database/sql" "fmt" + "math" "sync" "testing" "time" @@ -44,6 +45,7 @@ type TestCluster struct { Conns []*gosql.DB stopper *stop.Stopper replicationMode base.TestClusterReplicationMode + scratchRangeID roachpb.RangeID mu struct { syncutil.Mutex serverStoppers []*stop.Stopper @@ -333,6 +335,16 @@ func (tc *TestCluster) LookupRange(key roachpb.Key) (roachpb.RangeDescriptor, er return tc.Servers[0].LookupRange(key) } +// LookupRangeOrFatal is part of TestClusterInterface. +func (tc *TestCluster) LookupRangeOrFatal(t testing.TB, key roachpb.Key) roachpb.RangeDescriptor { + t.Helper() + desc, err := tc.LookupRange(key) + if err != nil { + t.Fatalf(`looking up range for %s: %+v`, key, err) + } + return desc +} + // SplitRange splits the range containing splitKey. // The right range created by the split starts at the split key and extends to the // original range's end key. @@ -418,6 +430,19 @@ func (tc *TestCluster) AddReplicas( } } +// AddReplicasOrFatal is part of TestClusterInterface. +func (tc *TestCluster) AddReplicasOrFatal( + t testing.TB, startKey roachpb.Key, targets ...roachpb.ReplicationTarget, +) roachpb.RangeDescriptor { + t.Helper() + desc, err := tc.AddReplicas(startKey, targets...) + if err != nil { + t.Fatalf(`could not add %v replicas to range containing %s: %+v`, + targets, startKey, err) + } + return desc +} + // RemoveReplicas is part of the TestServerInterface. func (tc *TestCluster) RemoveReplicas( startKey roachpb.Key, targets ...roachpb.ReplicationTarget, @@ -495,10 +520,28 @@ func (tc *TestCluster) FindRangeLeaseHolder( return roachpb.ReplicationTarget{NodeID: replicaDesc.NodeID, StoreID: replicaDesc.StoreID}, nil } -// WaitForSplitAndReplication waits for a range which starts with -// startKey and then verifies that each replica in the range -// descriptor has been created. -func (tc *TestCluster) WaitForSplitAndReplication(startKey roachpb.Key) error { +// ScratchRange returns the start key of a span of keyspace suitable for use as +// kv scratch space (it doesn't overlap system spans or SQL tables). The range +// is lazily split off on the first call to ScratchRange. 
+func (tc *TestCluster) ScratchRange(t testing.TB) roachpb.Key { + scratchKey := keys.MakeTablePrefix(math.MaxUint32) + if tc.scratchRangeID > 0 { + return scratchKey + } + _, right, err := tc.SplitRange(scratchKey) + if err != nil { + t.Fatal(err) + } + tc.scratchRangeID = right.RangeID + return scratchKey +} + +// WaitForSplitAndInitialization waits for a range which starts with startKey +// and then verifies that each replica in the range descriptor has been created. +// +// NB: This doesn't actually wait for full upreplication to whatever the zone +// config specifies. +func (tc *TestCluster) WaitForSplitAndInitialization(startKey roachpb.Key) error { return retry.ForDuration(testutils.DefaultSucceedsSoonDuration, func() error { desc, err := tc.LookupRange(startKey) if err != nil { diff --git a/pkg/ts/catalog/chart_catalog.go b/pkg/ts/catalog/chart_catalog.go index d2b2928308f9..27dbe9459bd0 100644 --- a/pkg/ts/catalog/chart_catalog.go +++ b/pkg/ts/catalog/chart_catalog.go @@ -1292,6 +1292,7 @@ var charts = []sectionDescription{ Metrics: []string{ "queue.replicate.removedeadreplica", "queue.replicate.removereplica", + "queue.replicate.removelearnerreplica", }, }, { diff --git a/pkg/ui/src/views/cluster/containers/nodeGraphs/dashboards/queues.tsx b/pkg/ui/src/views/cluster/containers/nodeGraphs/dashboards/queues.tsx index 3327a4eb8bad..0829b80867c4 100644 --- a/pkg/ui/src/views/cluster/containers/nodeGraphs/dashboards/queues.tsx +++ b/pkg/ui/src/views/cluster/containers/nodeGraphs/dashboards/queues.tsx @@ -65,6 +65,7 @@ export default function (props: GraphDashboardProps) { +
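[Editor's aside, not part of the patch: a hypothetical test sketch showing how the TestCluster helpers introduced in this diff (ScratchRange, AddReplicasOrFatal, LookupRangeOrFatal) are meant to compose. The imports are the standard CockroachDB test packages already used by the tests above; the two-voter assertion assumes the default kv.learner_replicas.enabled = false path, in which AddReplicas adds a voter directly.]

package storage_test

import (
	"context"
	"testing"

	"github.com/cockroachdb/cockroach/pkg/base"
	"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
	"github.com/stretchr/testify/require"
)

// TestScratchRangeHelpersSketch is illustrative only; it is not part of the
// change above.
func TestScratchRangeHelpersSketch(t *testing.T) {
	defer leaktest.AfterTest(t)()
	ctx := context.Background()
	tc := testcluster.StartTestCluster(t, 2, base.TestClusterArgs{
		ReplicationMode: base.ReplicationManual,
	})
	defer tc.Stopper().Stop(ctx)

	// ScratchRange lazily splits off a range that doesn't overlap system spans
	// or SQL tables; repeated calls return the same start key.
	scratchStartKey := tc.ScratchRange(t)

	// The *OrFatal variants fail the test instead of returning an error.
	desc := tc.AddReplicasOrFatal(t, scratchStartKey, tc.Target(1))
	require.Len(t, desc.Replicas().Voters(), 2)

	// LookupRangeOrFatal re-reads the descriptor for the scratch range.
	desc = tc.LookupRangeOrFatal(t, scratchStartKey)
	require.Len(t, desc.Replicas().Voters(), 2)
}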