diff --git a/pkg/internal/client/db.go b/pkg/internal/client/db.go
index e9d55123f034..b806bcd5109c 100644
--- a/pkg/internal/client/db.go
+++ b/pkg/internal/client/db.go
@@ -451,10 +451,11 @@ func (db *DB) DelRange(ctx context.Context, begin, end interface{}) error {
 	return getOneErr(db.Run(ctx, b), b)
 }
 
-// AdminMerge merges the range containing key and the subsequent
-// range. After the merge operation is complete, the range containing
-// key will contain all of the key/value pairs of the subsequent range
-// and the subsequent range will no longer exist.
+// AdminMerge merges the range containing key and the subsequent range. After
+// the merge operation is complete, the range containing key will contain all of
+// the key/value pairs of the subsequent range and the subsequent range will no
+// longer exist. Neither range may contain learner replicas; if one does, an
+// error is returned.
 //
 // key can be either a byte slice or a string.
 func (db *DB) AdminMerge(ctx context.Context, key interface{}) error {
diff --git a/pkg/storage/merge_queue.go b/pkg/storage/merge_queue.go
index 33ec326b5bbf..c9b0c5e7d3fd 100644
--- a/pkg/storage/merge_queue.go
+++ b/pkg/storage/merge_queue.go
@@ -27,6 +27,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/util/hlc"
 	"github.com/cockroachdb/cockroach/pkg/util/humanizeutil"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
+	"github.com/pkg/errors"
 )
 
 const (
@@ -239,6 +240,15 @@ func (mq *mergeQueue) process(
 		return nil
 	}
 
+	// Range was manually split and not expired, so skip merging.
+	now := mq.store.Clock().Now()
+	if now.Less(rhsDesc.GetStickyBit()) {
+		log.VEventf(ctx, 2, "skipping merge: ranges were manually split and sticky bit was not expired")
+		// TODO(jeffreyxiao): Consider returning a purgatory error to avoid
+		// repeatedly processing ranges that cannot be merged.
+		return nil
+	}
+
 	mergedDesc := &roachpb.RangeDescriptor{
 		StartKey: lhsDesc.StartKey,
 		EndKey:   rhsDesc.EndKey,
@@ -264,13 +274,46 @@
 		return nil
 	}
 
-	if !replicaSetsEqual(lhsDesc.Replicas().Unwrap(), rhsDesc.Replicas().Unwrap()) {
+	{
+		// AdminMerge errors if there are learners on either side, and
+		// AdminRelocateRange removes any on the range it operates on. For the
+		// sake of clarity, just remove them all upfront.
+		newLHSDesc, err := removeLearners(ctx, lhsRepl.store.DB(), lhsDesc)
+		if err != nil {
+			log.VEventf(ctx, 2, `%v`, err)
+			return err
+		}
+		lhsDesc = newLHSDesc
+		newRHSDesc, err := removeLearners(ctx, lhsRepl.store.DB(), &rhsDesc)
+		if err != nil {
+			log.VEventf(ctx, 2, `%v`, err)
+			return err
+		}
+		rhsDesc = *newRHSDesc
+	}
+	lhsReplicas, rhsReplicas := lhsDesc.Replicas().All(), rhsDesc.Replicas().All()
+
+	// Defensive sanity check that everything is now a voter.
+	for i := range lhsReplicas {
+		if lhsReplicas[i].GetType() != roachpb.ReplicaType_VOTER {
+			return errors.Errorf(`cannot merge non-voter replicas on lhs: %v`, lhsReplicas)
+		}
+	}
+	for i := range rhsReplicas {
+		if rhsReplicas[i].GetType() != roachpb.ReplicaType_VOTER {
+			return errors.Errorf(`cannot merge non-voter replicas on rhs: %v`, rhsReplicas)
+		}
+	}
+
+	if !replicaSetsEqual(lhsReplicas, rhsReplicas) {
 		var targets []roachpb.ReplicationTarget
-		for _, lhsReplDesc := range lhsDesc.Replicas().Unwrap() {
+		for _, lhsReplDesc := range lhsReplicas {
 			targets = append(targets, roachpb.ReplicationTarget{
 				NodeID: lhsReplDesc.NodeID, StoreID: lhsReplDesc.StoreID,
 			})
 		}
+		// AdminRelocateRange moves the lease to the first target in the list, so
+		// sort the existing leaseholder to the front to leave the lease unchanged.
 		lease, _ := lhsRepl.GetLease()
 		for i := range targets {
 			if targets[i].NodeID == lease.Replica.NodeID && targets[i].StoreID == lease.Replica.StoreID {
@@ -287,15 +330,6 @@
 		}
 	}
 
-	// Range was manually split and not expired, so skip merging.
-	now := mq.store.Clock().Now()
-	if now.Less(rhsDesc.GetStickyBit()) {
-		log.VEventf(ctx, 2, "skipping merge: ranges were manually split and sticky bit was not expired")
-		// TODO(jeffreyxiao): Consider returning a purgatory error to avoid
-		// repeatedly processing ranges that cannot be merged.
-		return nil
-	}
-
 	log.VEventf(ctx, 2, "merging to produce range: %s-%s", mergedDesc.StartKey, mergedDesc.EndKey)
 	reason := fmt.Sprintf("lhs+rhs has (size=%s+%s qps=%.2f+%.2f --> %.2fqps) below threshold (size=%s, qps=%.2f)",
 		humanizeutil.IBytes(lhsStats.Total()),
diff --git a/pkg/storage/replica_command.go b/pkg/storage/replica_command.go
index 9ae8be77637f..a21e199c0fcd 100644
--- a/pkg/storage/replica_command.go
+++ b/pkg/storage/replica_command.go
@@ -566,8 +566,19 @@ func (r *Replica) AdminMerge(
 			// Should never happen, but just in case.
 			return errors.Errorf("ranges are not adjacent; %s != %s", origLeftDesc.EndKey, rightDesc.StartKey)
 		}
-		if l, r := origLeftDesc.Replicas(), rightDesc.Replicas(); !replicaSetsEqual(l.Unwrap(), r.Unwrap()) {
-			return errors.Errorf("ranges not collocated; %s != %s", l, r)
+		// For simplicity, don't handle learner replicas; expect the caller to
+		// resolve them first. (Defensively, we check that there are no non-voter
+		// replicas at all, in case some third type is added later.) This behavior
+		// can be changed later if the complexity becomes worth it, but not now.
+		lReplicas, rReplicas := origLeftDesc.Replicas(), rightDesc.Replicas()
+		if len(lReplicas.Voters()) != len(lReplicas.All()) {
+			return errors.Errorf("cannot merge range with non-voter replicas on lhs: %s", lReplicas)
+		}
+		if len(rReplicas.Voters()) != len(rReplicas.All()) {
+			return errors.Errorf("cannot merge range with non-voter replicas on rhs: %s", rReplicas)
+		}
+		if !replicaSetsEqual(lReplicas.All(), rReplicas.All()) {
+			return errors.Errorf("ranges not collocated; %s != %s", lReplicas, rReplicas)
 		}
 
 		updatedLeftDesc := *origLeftDesc
@@ -1549,6 +1560,20 @@ func (s *Store) AdminRelocateRange(
 	rangeDesc.SetReplicas(rangeDesc.Replicas().DeepCopy())
 	startKey := rangeDesc.StartKey.AsRawKey()
 
+	// Step 0: Remove all learners so we don't have to think about them. We could
+	// do something smarter here and try to promote them, but it doesn't seem
+	// worth the complexity right now. Revisit if this is an issue in practice.
+	//
+	// Note that we can't just add the learners to removeTargets: the logic below
+	// always adds before it removes, so if a learner were among the requested
+	// targets, we might try to add it before removing it.
+	newDesc, err := removeLearners(ctx, s.DB(), &rangeDesc)
+	if err != nil {
+		log.Warning(ctx, err)
+		return err
+	}
+	rangeDesc = *newDesc
+
 	// Step 1: Compute which replicas are to be added and which are to be removed.
 	//
 	// TODO(radu): we can't have multiple replicas on different stores on the
@@ -1611,32 +1636,6 @@
-	// updateRangeDesc updates the passed RangeDescriptor following the successful
-	// completion of an AdminChangeReplicasRequest with the single provided target
-	// and changeType.
-	// TODO(ajwerner): Remove this for 19.2 after AdminChangeReplicas always
-	// returns a non-nil Desc.
-	updateRangeDesc := func(
-		desc *roachpb.RangeDescriptor,
-		changeType roachpb.ReplicaChangeType,
-		target roachpb.ReplicationTarget,
-	) {
-		switch changeType {
-		case roachpb.ADD_REPLICA:
-			desc.AddReplica(roachpb.ReplicaDescriptor{
-				NodeID:    target.NodeID,
-				StoreID:   target.StoreID,
-				ReplicaID: desc.NextReplicaID,
-			})
-			desc.NextReplicaID++
-		case roachpb.REMOVE_REPLICA:
-			newReplicas := removeTargetFromSlice(desc.Replicas().All(), target)
-			desc.SetReplicas(roachpb.MakeReplicaDescriptors(&newReplicas))
-		default:
-			panic(errors.Errorf("unknown ReplicaChangeType %v", changeType))
-		}
-	}
-
 	sysCfg := s.cfg.Gossip.GetSystemConfig()
 	if sysCfg == nil {
 		return fmt.Errorf("no system config available, unable to perform RelocateRange")
 	}
@@ -1720,12 +1719,7 @@
 			// local copy of the range descriptor such that future allocator
 			// decisions take it into account.
 			addTargets = removeTargetFromSlice(addTargets, target)
-			if newDesc != nil {
-				rangeInfo.Desc = newDesc
-			} else {
-				// TODO(ajwerner): Remove this case for 19.2.
-				updateRangeDesc(rangeInfo.Desc, roachpb.ADD_REPLICA, target)
-			}
+			rangeInfo.Desc = newDesc
 		}
 
 		if len(removeTargets) > 0 && len(removeTargets) > len(addTargets) {
@@ -1757,12 +1751,7 @@
 			// copy of the range descriptor such that future allocator decisions take
 			// its absence into account.
 			removeTargets = removeTargetFromSlice(removeTargets, target)
-			if newDesc != nil {
-				rangeInfo.Desc = newDesc
-			} else {
-				// TODO(ajwerner): Remove this case for 19.2.
-				updateRangeDesc(rangeInfo.Desc, roachpb.REMOVE_REPLICA, target)
-			}
+			rangeInfo.Desc = newDesc
 		}
 	}
 
@@ -1788,6 +1777,26 @@ func removeTargetFromSlice(
 	return targets
 }
 
+func removeLearners(
+	ctx context.Context, db *client.DB, desc *roachpb.RangeDescriptor,
+) (*roachpb.RangeDescriptor, error) {
+	learners := desc.Replicas().Learners()
+	if len(learners) == 0 {
+		return desc, nil
+	}
+	targets := make([]roachpb.ReplicationTarget, len(learners))
+	for i := range learners {
+		targets[i].NodeID = learners[i].NodeID
+		targets[i].StoreID = learners[i].StoreID
+	}
+	log.VEventf(ctx, 2, `removing learner replicas %v from %v`, targets, desc)
+	newDesc, err := db.AdminChangeReplicas(ctx, desc.StartKey, roachpb.REMOVE_REPLICA, targets, *desc)
+	if err != nil {
+		return nil, errors.Wrapf(err, `removing learners from %s`, desc)
+	}
+	return newDesc, nil
+}
+
 // adminScatter moves replicas and leaseholders for a selection of ranges.
 func (r *Replica) adminScatter(
 	ctx context.Context, args roachpb.AdminScatterRequest,
diff --git a/pkg/storage/replica_learner_test.go b/pkg/storage/replica_learner_test.go
index 2e96e36fd324..04c7ec5ab830 100644
--- a/pkg/storage/replica_learner_test.go
+++ b/pkg/storage/replica_learner_test.go
@@ -14,6 +14,7 @@ import (
 	"context"
 	"fmt"
 	"path/filepath"
+	"sort"
 	"strconv"
 	"strings"
 	"sync/atomic"
@@ -574,3 +575,126 @@ func TestLearnerFollowerRead(t *testing.T) {
 		return nil
 	})
 }
+
+func TestLearnerAdminRelocateRange(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+
+	ctx := context.Background()
+	knobs, ltk := makeLearnerTestKnobs()
+	tc := testcluster.StartTestCluster(t, 4, base.TestClusterArgs{
+		ServerArgs:      base.TestServerArgs{Knobs: knobs},
+		ReplicationMode: base.ReplicationManual,
+	})
+	defer tc.Stopper().Stop(ctx)
+	db := sqlutils.MakeSQLRunner(tc.ServerConn(0))
+	db.Exec(t, `SET CLUSTER SETTING kv.learner_replicas.enabled = true`)
+
+	scratchStartKey := tc.ScratchRange(t)
+	atomic.StoreInt64(&ltk.replicaAddStopAfterLearnerAtomic, 1)
+	_ = tc.AddReplicasOrFatal(t, scratchStartKey, tc.Target(1))
+	_ = tc.AddReplicasOrFatal(t, scratchStartKey, tc.Target(2))
+	atomic.StoreInt64(&ltk.replicaAddStopAfterLearnerAtomic, 0)
+
+	// Test AdminRelocateRange's treatment of learners by having one that it has
+	// to remove and one that should stay and become a voter.
+	//
+	// Before: 1 (voter), 2 (learner), 3 (learner)
+	// After:  1 (voter), 2 (voter), 4 (voter)
+	targets := []roachpb.ReplicationTarget{tc.Target(0), tc.Target(1), tc.Target(3)}
+	require.NoError(t, tc.Server(0).DB().AdminRelocateRange(ctx, scratchStartKey, targets))
+	desc := tc.LookupRangeOrFatal(t, scratchStartKey)
+	voters := desc.Replicas().Voters()
+	require.Len(t, voters, len(targets))
+	sort.Slice(voters, func(i, j int) bool { return voters[i].NodeID < voters[j].NodeID })
+	for i := range voters {
+		require.Equal(t, targets[i].NodeID, voters[i].NodeID, `%v`, voters)
+		require.Equal(t, targets[i].StoreID, voters[i].StoreID, `%v`, voters)
+	}
+	require.Empty(t, desc.Replicas().Learners())
+}
+
+func TestLearnerAdminMerge(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+
+	ctx := context.Background()
+	knobs, ltk := makeLearnerTestKnobs()
+	tc := testcluster.StartTestCluster(t, 2, base.TestClusterArgs{
+		ServerArgs:      base.TestServerArgs{Knobs: knobs},
+		ReplicationMode: base.ReplicationManual,
+	})
+	defer tc.Stopper().Stop(ctx)
+	db := sqlutils.MakeSQLRunner(tc.ServerConn(0))
+	db.Exec(t, `SET CLUSTER SETTING kv.learner_replicas.enabled = true`)
+
+	scratchStartKey := tc.ScratchRange(t)
+	splitKey1 := scratchStartKey.Next()
+	splitKey2 := splitKey1.Next()
+	_, _ = tc.SplitRangeOrFatal(t, splitKey1)
+	_, _ = tc.SplitRangeOrFatal(t, splitKey2)
+
+	atomic.StoreInt64(&ltk.replicaAddStopAfterLearnerAtomic, 1)
+	_ = tc.AddReplicasOrFatal(t, scratchStartKey, tc.Target(1))
+	_ = tc.AddReplicasOrFatal(t, splitKey2, tc.Target(1))
+	atomic.StoreInt64(&ltk.replicaAddStopAfterLearnerAtomic, 0)
+
+	// Learner on the lhs should fail.
+	err := tc.Server(0).DB().AdminMerge(ctx, scratchStartKey)
+	if !testutils.IsError(err, `cannot merge range with non-voter replicas on lhs`) {
+		t.Fatalf(`expected "cannot merge range with non-voter replicas on lhs" error, got: %+v`, err)
+	}
+	// Learner on the rhs should fail.
+	err = tc.Server(0).DB().AdminMerge(ctx, splitKey1)
+	if !testutils.IsError(err, `cannot merge range with non-voter replicas on rhs`) {
+		t.Fatalf(`expected "cannot merge range with non-voter replicas on rhs" error, got: %+v`, err)
+	}
+}
+
+func TestMergeQueueSeesLearner(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	ctx := context.Background()
+	knobs, ltk := makeLearnerTestKnobs()
+	tc := testcluster.StartTestCluster(t, 2, base.TestClusterArgs{
+		ServerArgs:      base.TestServerArgs{Knobs: knobs},
+		ReplicationMode: base.ReplicationManual,
+	})
+	defer tc.Stopper().Stop(ctx)
+	db := sqlutils.MakeSQLRunner(tc.ServerConn(0))
+	db.Exec(t, `SET CLUSTER SETTING kv.learner_replicas.enabled = true`)
+	// TestCluster currently overrides this when used with ReplicationManual.
+	db.Exec(t, `SET CLUSTER SETTING kv.range_merge.queue_enabled = true`)
+
+	scratchStartKey := tc.ScratchRange(t)
+	origDesc := tc.LookupRangeOrFatal(t, scratchStartKey)
+
+	splitKey := scratchStartKey.Next()
+	_, _ = tc.SplitRangeOrFatal(t, splitKey)
+
+	atomic.StoreInt64(&ltk.replicaAddStopAfterLearnerAtomic, 1)
+	_ = tc.AddReplicasOrFatal(t, scratchStartKey, tc.Target(1))
+	atomic.StoreInt64(&ltk.replicaAddStopAfterLearnerAtomic, 0)
+
+	// Unsplit the range to clear the sticky bit.
+	require.NoError(t, tc.Server(0).DB().AdminUnsplit(ctx, splitKey))
+
+	// Run the merge queue.
+	store, repl := getFirstStoreReplica(t, tc.Server(0), scratchStartKey)
+	trace, errMsg, err := store.ManuallyEnqueue(ctx, "merge", repl, true /* skipShouldQueue */)
+	require.NoError(t, err)
+	require.Equal(t, ``, errMsg)
+	formattedTrace := tracing.FormatRecordedSpans(trace)
+	expectedMessages := []string{
+		`removing learner replicas \[n2,s2\]`,
+		`merging to produce range: /Table/Max-/Max`,
+	}
+	if err := testutils.MatchInOrder(formattedTrace, expectedMessages...); err != nil {
+		t.Fatal(err)
+	}
+
+	// Sanity check that the desc has the same bounds it did originally.
+	desc := tc.LookupRangeOrFatal(t, scratchStartKey)
+	require.Equal(t, origDesc.StartKey, desc.StartKey)
+	require.Equal(t, origDesc.EndKey, desc.EndKey)
+	// The merge removed the learner.
+	require.Len(t, desc.Replicas().Voters(), 1)
+	require.Empty(t, desc.Replicas().Learners())
+}
diff --git a/pkg/testutils/testcluster/testcluster.go b/pkg/testutils/testcluster/testcluster.go
index 326afbdbcc65..f5a5ba779de9 100644
--- a/pkg/testutils/testcluster/testcluster.go
+++ b/pkg/testutils/testcluster/testcluster.go
@@ -358,6 +358,17 @@ func (tc *TestCluster) SplitRange(
 	return tc.Servers[0].SplitRange(splitKey)
 }
 
+// SplitRangeOrFatal is the same as SplitRange but fatals the test on error.
+func (tc *TestCluster) SplitRangeOrFatal(
+	t testing.TB, splitKey roachpb.Key,
+) (roachpb.RangeDescriptor, roachpb.RangeDescriptor) {
+	lhsDesc, rhsDesc, err := tc.Servers[0].SplitRange(splitKey)
+	if err != nil {
+		t.Fatalf(`splitting at %s: %+v`, splitKey, err)
+	}
+	return lhsDesc, rhsDesc
+}
+
 // Target returns a ReplicationTarget for the specified server.
 func (tc *TestCluster) Target(serverIdx int) roachpb.ReplicationTarget {
 	s := tc.Servers[serverIdx]
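
For context on how the pieces above compose: a caller that hits the new AdminMerge error can mirror the merge queue and strip learner replicas before retrying the merge. The sketch below is illustrative only and not part of this patch; the package and helper name are made up, but it uses only calls that appear in the diff (Replicas().Learners(), DB.AdminChangeReplicas, DB.AdminMerge).

package kvexample // hypothetical package; placed under pkg/ so it can import internal/client

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/internal/client"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/pkg/errors"
)

// mergeWithoutLearners is a hypothetical helper (not part of this patch) that
// mirrors what mergeQueue.process now does: strip learner replicas from the
// range via AdminChangeReplicas, then issue the AdminMerge, which refuses to
// operate on ranges that still have non-voter replicas.
func mergeWithoutLearners(
	ctx context.Context, db *client.DB, desc *roachpb.RangeDescriptor,
) error {
	if learners := desc.Replicas().Learners(); len(learners) > 0 {
		targets := make([]roachpb.ReplicationTarget, len(learners))
		for i := range learners {
			targets[i].NodeID = learners[i].NodeID
			targets[i].StoreID = learners[i].StoreID
		}
		// Same call the new removeLearners helper in replica_command.go makes.
		newDesc, err := db.AdminChangeReplicas(ctx, desc.StartKey, roachpb.REMOVE_REPLICA, targets, *desc)
		if err != nil {
			return errors.Wrapf(err, "removing learners from %s", desc)
		}
		desc = newDesc
	}
	// With only voters left, this no longer fails with
	// "cannot merge range with non-voter replicas".
	return db.AdminMerge(ctx, desc.StartKey.AsRawKey())
}

The merge queue itself does exactly this via removeLearners before it computes relocation targets, so most callers should never need such a helper.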