From 5bda9fb2798d121c31e881571f6481dba0c1ac9b Mon Sep 17 00:00:00 2001
From: Daniel Harrison
Date: Mon, 29 Jul 2019 09:58:50 -0700
Subject: [PATCH] storage: handle learner replicas in merge code

For simplicity of implementation and concept, learners are disallowed by
AdminMerge. The merge queue now removes them before calling it.

AdminRelocateRange similarly removes them before doing its work (even if the
learner is one of the supplied targets). This is not strictly necessary for its
use in the merge queue, since the merge queue does its own removal, but
decoupling the two is appealing. Additionally, AdminRelocateRange has callers
other than the merge queue.

Touches #38902

Release note: None
---
 pkg/internal/client/db.go                |   9 +-
 pkg/storage/merge_queue.go               |  56 ++++++++--
 pkg/storage/replica_command.go           |  89 ++++++++--------
 pkg/storage/replica_learner_test.go      | 124 +++++++++++++++++++++++
 pkg/testutils/testcluster/testcluster.go |  11 ++
 5 files changed, 234 insertions(+), 55 deletions(-)

diff --git a/pkg/internal/client/db.go b/pkg/internal/client/db.go
index e9d55123f034..b806bcd5109c 100644
--- a/pkg/internal/client/db.go
+++ b/pkg/internal/client/db.go
@@ -451,10 +451,11 @@ func (db *DB) DelRange(ctx context.Context, begin, end interface{}) error {
 return getOneErr(db.Run(ctx, b), b)
 }

-// AdminMerge merges the range containing key and the subsequent
-// range. After the merge operation is complete, the range containing
-// key will contain all of the key/value pairs of the subsequent range
-// and the subsequent range will no longer exist.
+// AdminMerge merges the range containing key and the subsequent range. After
+// the merge operation is complete, the range containing key will contain all of
+// the key/value pairs of the subsequent range and the subsequent range will no
+// longer exist. Neither range may contain learner replicas; if one does, an
+// error is returned.
 //
 // key can be either a byte slice or a string.
 func (db *DB) AdminMerge(ctx context.Context, key interface{}) error {
diff --git a/pkg/storage/merge_queue.go b/pkg/storage/merge_queue.go
index 33ec326b5bbf..c9b0c5e7d3fd 100644
--- a/pkg/storage/merge_queue.go
+++ b/pkg/storage/merge_queue.go
@@ -27,6 +27,7 @@ import (
 "github.com/cockroachdb/cockroach/pkg/util/hlc"
 "github.com/cockroachdb/cockroach/pkg/util/humanizeutil"
 "github.com/cockroachdb/cockroach/pkg/util/log"
+ "github.com/pkg/errors"
 )

 const (
@@ -239,6 +240,15 @@ func (mq *mergeQueue) process(
 return nil
 }

+ // Range was manually split and not expired, so skip merging.
+ now := mq.store.Clock().Now()
+ if now.Less(rhsDesc.GetStickyBit()) {
+ log.VEventf(ctx, 2, "skipping merge: ranges were manually split and sticky bit was not expired")
+ // TODO(jeffreyxiao): Consider returning a purgatory error to avoid
+ // repeatedly processing ranges that cannot be merged.
+ return nil
+ }
+
 mergedDesc := &roachpb.RangeDescriptor{
 StartKey: lhsDesc.StartKey,
 EndKey: rhsDesc.EndKey,
@@ -264,13 +274,46 @@ func (mq *mergeQueue) process(
 return nil
 }

- if !replicaSetsEqual(lhsDesc.Replicas().Unwrap(), rhsDesc.Replicas().Unwrap()) {
+ {
+ // AdminMerge errors if there are learners on either side and
+ // AdminRelocateRange removes any on the range it operates on. For the sake
+ // of obviousness, just remove them all upfront.
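Aside: taken together with the collocation check that AdminMerge still performs, the precondition the merge queue has to establish before issuing the merge can be summarized in a short sketch. The mergeable helper below is hypothetical and not part of this change; Learners(), All(), and replicaSetsEqual are the same accessors and helper the surrounding code relies on.

    // mergeable is an illustrative summary of AdminMerge's precondition after
    // this change: both descriptors must carry voters only, and the two
    // replica sets must be collocated on the same stores.
    func mergeable(lhs, rhs *roachpb.RangeDescriptor) bool {
    	if len(lhs.Replicas().Learners()) > 0 || len(rhs.Replicas().Learners()) > 0 {
    		return false
    	}
    	return replicaSetsEqual(lhs.Replicas().All(), rhs.Replicas().All())
    }

The calls that follow establish exactly this state for the merge queue: learners are stripped from both sides, and AdminRelocateRange (further down) lines the replica sets up if they still differ.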
+ newLHSDesc, err := removeLearners(ctx, lhsRepl.store.DB(), lhsDesc)
+ if err != nil {
+ log.VEventf(ctx, 2, `%v`, err)
+ return err
+ }
+ lhsDesc = newLHSDesc
+ newRHSDesc, err := removeLearners(ctx, lhsRepl.store.DB(), &rhsDesc)
+ if err != nil {
+ log.VEventf(ctx, 2, `%v`, err)
+ return err
+ }
+ rhsDesc = *newRHSDesc
+ }
+ lhsReplicas, rhsReplicas := lhsDesc.Replicas().All(), rhsDesc.Replicas().All()
+
+ // Defensive sanity check that everything is now a voter.
+ for i := range lhsReplicas {
+ if lhsReplicas[i].GetType() != roachpb.ReplicaType_VOTER {
+ return errors.Errorf(`cannot merge non-voter replicas on lhs: %v`, lhsReplicas)
+ }
+ }
+ for i := range rhsReplicas {
+ if rhsReplicas[i].GetType() != roachpb.ReplicaType_VOTER {
+ return errors.Errorf(`cannot merge non-voter replicas on rhs: %v`, rhsReplicas)
+ }
+ }
+
+ if !replicaSetsEqual(lhsReplicas, rhsReplicas) {
 var targets []roachpb.ReplicationTarget
- for _, lhsReplDesc := range lhsDesc.Replicas().Unwrap() {
+ for _, lhsReplDesc := range lhsReplicas {
 targets = append(targets, roachpb.ReplicationTarget{
 NodeID: lhsReplDesc.NodeID, StoreID: lhsReplDesc.StoreID,
 })
 }
+ // AdminRelocateRange moves the lease to the first target in the list, so
+ // sort the existing leaseholder there to leave it unchanged.
 lease, _ := lhsRepl.GetLease()
 for i := range targets {
 if targets[i].NodeID == lease.Replica.NodeID && targets[i].StoreID == lease.Replica.StoreID {
@@ -287,15 +330,6 @@
 }
 }

- // Range was manually split and not expired, so skip merging.
- now := mq.store.Clock().Now()
- if now.Less(rhsDesc.GetStickyBit()) {
- log.VEventf(ctx, 2, "skipping merge: ranges were manually split and sticky bit was not expired")
- // TODO(jeffreyxiao): Consider returning a purgatory error to avoid
- // repeatedly processing ranges that cannot be merged.
- return nil
- }
-
 log.VEventf(ctx, 2, "merging to produce range: %s-%s", mergedDesc.StartKey, mergedDesc.EndKey)
 reason := fmt.Sprintf("lhs+rhs has (size=%s+%s qps=%.2f+%.2f --> %.2fqps) below threshold (size=%s, qps=%.2f)",
 humanizeutil.IBytes(lhsStats.Total()),
diff --git a/pkg/storage/replica_command.go b/pkg/storage/replica_command.go
index 9ae8be77637f..a21e199c0fcd 100644
--- a/pkg/storage/replica_command.go
+++ b/pkg/storage/replica_command.go
@@ -566,8 +566,19 @@ func (r *Replica) AdminMerge(
 // Should never happen, but just in case.
 return errors.Errorf("ranges are not adjacent; %s != %s", origLeftDesc.EndKey, rightDesc.StartKey)
 }
- if l, r := origLeftDesc.Replicas(), rightDesc.Replicas(); !replicaSetsEqual(l.Unwrap(), r.Unwrap()) {
- return errors.Errorf("ranges not collocated; %s != %s", l, r)
+ // For simplicity, don't handle learner replicas; expect the caller to
+ // resolve them first. (Defensively, we check that there are no non-voter
+ // replicas, in case some third type is later added.) This behavior can be
+ // changed later if the complexity becomes worth it, but it's not right now.
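Aside: the reason the check that follows compares voter counts against total replica counts, rather than asking for Learners() directly, is the parenthetical above: a count comparison also rejects any future non-voter replica type, not just LEARNER. A rough sketch of the distinction, where the helper name is hypothetical and ReplicaDescriptors is assumed to be the wrapper type that Replicas() returns (the same value this diff calls Voters(), All(), and Learners() on):

    // allVoters reports whether every replica in the set is a voter. Unlike a
    // len(rs.Learners()) == 0 check, this formulation stays correct even if a
    // third, non-LEARNER replica type shows up later.
    func allVoters(rs roachpb.ReplicaDescriptors) bool {
    	return len(rs.Voters()) == len(rs.All())
    }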
+ lReplicas, rReplicas := origLeftDesc.Replicas(), rightDesc.Replicas()
+ if len(lReplicas.Voters()) != len(lReplicas.All()) {
+ return errors.Errorf("cannot merge range with non-voter replicas on lhs: %s", lReplicas)
+ }
+ if len(rReplicas.Voters()) != len(rReplicas.All()) {
+ return errors.Errorf("cannot merge range with non-voter replicas on rhs: %s", rReplicas)
+ }
+ if !replicaSetsEqual(lReplicas.All(), rReplicas.All()) {
+ return errors.Errorf("ranges not collocated; %s != %s", lReplicas, rReplicas)
 }

 updatedLeftDesc := *origLeftDesc
@@ -1549,6 +1560,20 @@ func (s *Store) AdminRelocateRange(
 rangeDesc.SetReplicas(rangeDesc.Replicas().DeepCopy())
 startKey := rangeDesc.StartKey.AsRawKey()

+ // Step 0: Remove all learners so we don't have to think about them. We could
+ // do something smarter here and try to promote them, but it doesn't seem
+ // worth the complexity right now. Revisit if this is an issue in practice.
+ //
+ // Note that we can't just add the learners to removeTargets. The below logic
+ // always does add then remove and if the learner was in the requested
+ // targets, we might try to add it before removing it.
+ newDesc, err := removeLearners(ctx, s.DB(), &rangeDesc)
+ if err != nil {
+ log.Warning(ctx, err)
+ return err
+ }
+ rangeDesc = *newDesc
+
 // Step 1: Compute which replicas are to be added and which are to be removed.
 //
 // TODO(radu): we can't have multiple replicas on different stores on the
@@ -1611,32 +1636,6 @@ func (s *Store) AdminRelocateRange(
 }
 }

- // updateRangeDesc updates the passed RangeDescriptor following the successful
- // completion of an AdminChangeReplicasRequest with the single provided target
- // and changeType.
- // TODO(ajwerner): Remove this for 19.2 after AdminChangeReplicas always
- // returns a non-nil Desc.
- updateRangeDesc := func(
- desc *roachpb.RangeDescriptor,
- changeType roachpb.ReplicaChangeType,
- target roachpb.ReplicationTarget,
- ) {
- switch changeType {
- case roachpb.ADD_REPLICA:
- desc.AddReplica(roachpb.ReplicaDescriptor{
- NodeID: target.NodeID,
- StoreID: target.StoreID,
- ReplicaID: desc.NextReplicaID,
- })
- desc.NextReplicaID++
- case roachpb.REMOVE_REPLICA:
- newReplicas := removeTargetFromSlice(desc.Replicas().All(), target)
- desc.SetReplicas(roachpb.MakeReplicaDescriptors(&newReplicas))
- default:
- panic(errors.Errorf("unknown ReplicaChangeType %v", changeType))
- }
- }
-
 sysCfg := s.cfg.Gossip.GetSystemConfig()
 if sysCfg == nil {
 return fmt.Errorf("no system config available, unable to perform RelocateRange")
 }
@@ -1720,12 +1719,7 @@
 // local copy of the range descriptor such that future allocator
 // decisions take it into account.
 addTargets = removeTargetFromSlice(addTargets, target)
- if newDesc != nil {
- rangeInfo.Desc = newDesc
- } else {
- // TODO(ajwerner): Remove this case for 19.2.
- updateRangeDesc(rangeInfo.Desc, roachpb.ADD_REPLICA, target)
- }
+ rangeInfo.Desc = newDesc
 }

 if len(removeTargets) > 0 && len(removeTargets) > len(addTargets) {
@@ -1757,12 +1751,7 @@
 // copy of the range descriptor such that future allocator decisions take
 // its absence into account.
 removeTargets = removeTargetFromSlice(removeTargets, target)
- if newDesc != nil {
- rangeInfo.Desc = newDesc
- } else {
- // TODO(ajwerner): Remove this case for 19.2.
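Aside: the fallback being deleted here (its last lines continue just below) existed only for the case where AdminChangeReplicas returned a nil descriptor. Under the assumption stated in the removed TODO, that AdminChangeReplicas now always returns the updated descriptor (the same signature the new removeLearners helper relies on later in this diff), the pattern on both the add and the remove path reduces to roughly the following sketch; startKey, target, and rangeInfo are the surrounding function's own variables, and this is not the verbatim implementation.

    // Issue the change and adopt the descriptor it returns; no manual
    // bookkeeping of replica IDs is needed anymore.
    newDesc, err := s.DB().AdminChangeReplicas(
    	ctx, startKey, roachpb.REMOVE_REPLICA, []roachpb.ReplicationTarget{target}, *rangeInfo.Desc,
    )
    if err != nil {
    	return err
    }
    rangeInfo.Desc = newDesc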
- updateRangeDesc(rangeInfo.Desc, roachpb.REMOVE_REPLICA, target)
- }
+ rangeInfo.Desc = newDesc
 }
 }

@@ -1788,6 +1777,26 @@ func removeTargetFromSlice(
 return targets
 }

+func removeLearners(
+ ctx context.Context, db *client.DB, desc *roachpb.RangeDescriptor,
+) (*roachpb.RangeDescriptor, error) {
+ learners := desc.Replicas().Learners()
+ if len(learners) == 0 {
+ return desc, nil
+ }
+ targets := make([]roachpb.ReplicationTarget, len(learners))
+ for i := range learners {
+ targets[i].NodeID = learners[i].NodeID
+ targets[i].StoreID = learners[i].StoreID
+ }
+ log.VEventf(ctx, 2, `removing learner replicas %v from %v`, targets, desc)
+ newDesc, err := db.AdminChangeReplicas(ctx, desc.StartKey, roachpb.REMOVE_REPLICA, targets, *desc)
+ if err != nil {
+ return nil, errors.Wrapf(err, `removing learners from %s`, desc)
+ }
+ return newDesc, nil
+}
+
 // adminScatter moves replicas and leaseholders for a selection of ranges.
 func (r *Replica) adminScatter(
 ctx context.Context, args roachpb.AdminScatterRequest,
diff --git a/pkg/storage/replica_learner_test.go b/pkg/storage/replica_learner_test.go
index 2e96e36fd324..04c7ec5ab830 100644
--- a/pkg/storage/replica_learner_test.go
+++ b/pkg/storage/replica_learner_test.go
@@ -14,6 +14,7 @@ import (
 "context"
 "fmt"
 "path/filepath"
+ "sort"
 "strconv"
 "strings"
 "sync/atomic"
@@ -574,3 +575,126 @@ func TestLearnerFollowerRead(t *testing.T) {
 return nil
 })
 }
+
+func TestLearnerAdminRelocateRange(t *testing.T) {
+ defer leaktest.AfterTest(t)()
+
+ ctx := context.Background()
+ knobs, ltk := makeLearnerTestKnobs()
+ tc := testcluster.StartTestCluster(t, 4, base.TestClusterArgs{
+ ServerArgs: base.TestServerArgs{Knobs: knobs},
+ ReplicationMode: base.ReplicationManual,
+ })
+ defer tc.Stopper().Stop(ctx)
+ db := sqlutils.MakeSQLRunner(tc.ServerConn(0))
+ db.Exec(t, `SET CLUSTER SETTING kv.learner_replicas.enabled = true`)
+
+ scratchStartKey := tc.ScratchRange(t)
+ atomic.StoreInt64(&ltk.replicaAddStopAfterLearnerAtomic, 1)
+ _ = tc.AddReplicasOrFatal(t, scratchStartKey, tc.Target(1))
+ _ = tc.AddReplicasOrFatal(t, scratchStartKey, tc.Target(2))
+ atomic.StoreInt64(&ltk.replicaAddStopAfterLearnerAtomic, 0)
+
+ // Test AdminRelocateRange's treatment of learners by having one that it has
+ // to remove and one that should stay and become a voter.
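Aside: to make the starting state concrete, a hypothetical pre-state assertion at this point, built only from helpers the tests below already use and not part of the actual test, would read:

    // The scratch range should now have its original voter plus the two
    // learners created above, since the testing knob stopped replication
    // after the learner phase.
    preDesc := tc.LookupRangeOrFatal(t, scratchStartKey)
    require.Len(t, preDesc.Replicas().Voters(), 1)
    require.Len(t, preDesc.Replicas().Learners(), 2)

The test's own Before/After summary of the relocation continues below.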
+ // + // Before: 1 (voter), 2 (learner), 3 (learner) + // After: 1 (voter), 2 (voter), 4 (voter) + targets := []roachpb.ReplicationTarget{tc.Target(0), tc.Target(1), tc.Target(3)} + require.NoError(t, tc.Server(0).DB().AdminRelocateRange(ctx, scratchStartKey, targets)) + desc := tc.LookupRangeOrFatal(t, scratchStartKey) + voters := desc.Replicas().Voters() + require.Len(t, voters, len(targets)) + sort.Slice(voters, func(i, j int) bool { return voters[i].NodeID < voters[j].NodeID }) + for i := range voters { + require.Equal(t, targets[i].NodeID, voters[i].NodeID, `%v`, voters) + require.Equal(t, targets[i].StoreID, voters[i].StoreID, `%v`, voters) + } + require.Empty(t, desc.Replicas().Learners()) +} + +func TestLearnerAdminMerge(t *testing.T) { + defer leaktest.AfterTest(t)() + + ctx := context.Background() + knobs, ltk := makeLearnerTestKnobs() + tc := testcluster.StartTestCluster(t, 2, base.TestClusterArgs{ + ServerArgs: base.TestServerArgs{Knobs: knobs}, + ReplicationMode: base.ReplicationManual, + }) + defer tc.Stopper().Stop(ctx) + db := sqlutils.MakeSQLRunner(tc.ServerConn(0)) + db.Exec(t, `SET CLUSTER SETTING kv.learner_replicas.enabled = true`) + + scratchStartKey := tc.ScratchRange(t) + splitKey1 := scratchStartKey.Next() + splitKey2 := splitKey1.Next() + _, _ = tc.SplitRangeOrFatal(t, splitKey1) + _, _ = tc.SplitRangeOrFatal(t, splitKey2) + + atomic.StoreInt64(<k.replicaAddStopAfterLearnerAtomic, 1) + _ = tc.AddReplicasOrFatal(t, scratchStartKey, tc.Target(1)) + _ = tc.AddReplicasOrFatal(t, splitKey2, tc.Target(1)) + atomic.StoreInt64(<k.replicaAddStopAfterLearnerAtomic, 0) + + // Learner on the lhs should fail. + err := tc.Server(0).DB().AdminMerge(ctx, scratchStartKey) + if !testutils.IsError(err, `cannot merge range with non-voter replicas on lhs`) { + t.Fatalf(`expected "cannot merge range with non-voter replicas on lhs" error got: %+v`, err) + } + // Learner on the rhs should fail. + err = tc.Server(0).DB().AdminMerge(ctx, splitKey1) + if !testutils.IsError(err, `cannot merge range with non-voter replicas on rhs`) { + t.Fatalf(`expected "cannot merge range with non-voter replicas on rhs" error got: %+v`, err) + } +} + +func TestMergeQueueSeesLearner(t *testing.T) { + defer leaktest.AfterTest(t)() + ctx := context.Background() + knobs, ltk := makeLearnerTestKnobs() + tc := testcluster.StartTestCluster(t, 2, base.TestClusterArgs{ + ServerArgs: base.TestServerArgs{Knobs: knobs}, + ReplicationMode: base.ReplicationManual, + }) + defer tc.Stopper().Stop(ctx) + db := sqlutils.MakeSQLRunner(tc.ServerConn(0)) + db.Exec(t, `SET CLUSTER SETTING kv.learner_replicas.enabled = true`) + // TestCluster currently overrides this when used with ReplicationManual. + db.Exec(t, `SET CLUSTER SETTING kv.range_merge.queue_enabled = true`) + + scratchStartKey := tc.ScratchRange(t) + origDesc := tc.LookupRangeOrFatal(t, scratchStartKey) + + splitKey := scratchStartKey.Next() + _, _ = tc.SplitRangeOrFatal(t, splitKey) + + atomic.StoreInt64(<k.replicaAddStopAfterLearnerAtomic, 1) + _ = tc.AddReplicasOrFatal(t, scratchStartKey, tc.Target(1)) + atomic.StoreInt64(<k.replicaAddStopAfterLearnerAtomic, 0) + + // Unsplit the range to clear the sticky bit. + require.NoError(t, tc.Server(0).DB().AdminUnsplit(ctx, splitKey)) + + // Run the merge queue. 
+ store, repl := getFirstStoreReplica(t, tc.Server(0), scratchStartKey) + trace, errMsg, err := store.ManuallyEnqueue(ctx, "merge", repl, true /* skipShouldQueue */) + require.NoError(t, err) + require.Equal(t, ``, errMsg) + formattedTrace := tracing.FormatRecordedSpans(trace) + expectedMessages := []string{ + `removing learner replicas \[n2,s2\]`, + `merging to produce range: /Table/Max-/Max`, + } + if err := testutils.MatchInOrder(formattedTrace, expectedMessages...); err != nil { + t.Fatal(err) + } + + // Sanity check that the desc has the same bounds it did originally. + desc := tc.LookupRangeOrFatal(t, scratchStartKey) + require.Equal(t, origDesc.StartKey, desc.StartKey) + require.Equal(t, origDesc.EndKey, desc.EndKey) + // The merge removed the learner. + require.Len(t, desc.Replicas().Voters(), 1) + require.Empty(t, desc.Replicas().Learners()) +} diff --git a/pkg/testutils/testcluster/testcluster.go b/pkg/testutils/testcluster/testcluster.go index 326afbdbcc65..f5a5ba779de9 100644 --- a/pkg/testutils/testcluster/testcluster.go +++ b/pkg/testutils/testcluster/testcluster.go @@ -358,6 +358,17 @@ func (tc *TestCluster) SplitRange( return tc.Servers[0].SplitRange(splitKey) } +// SplitRangeOrFatal is the same as SplitRange but will Fatal the test on error. +func (tc *TestCluster) SplitRangeOrFatal( + t testing.TB, splitKey roachpb.Key, +) (roachpb.RangeDescriptor, roachpb.RangeDescriptor) { + lhsDesc, rhsDesc, err := tc.Servers[0].SplitRange(splitKey) + if err != nil { + t.Fatalf(`splitting at %s: %+v`, splitKey, err) + } + return lhsDesc, rhsDesc +} + // Target returns a ReplicationTarget for the specified server. func (tc *TestCluster) Target(serverIdx int) roachpb.ReplicationTarget { s := tc.Servers[serverIdx]