storage: handle learner replicas in merge code
For simplicity of implementation and concept, learners are disallowed by
AdminMerge. The merge queue now removes them before calling it.

AdminRelocateRange similarly removes them before doing its work (even if
the learner is one of the supplied targets). This is not strictly
necessary for its use in the merge queue, since the merge queue does its
own removal, but keeping the two decoupled is appealing. Additionally,
AdminRelocateRange has callers other than the merge queue.

Touches cockroachdb#38902

Release note: None
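
As a rough sketch of the calling pattern this describes (assuming the removeLearners helper introduced below; the surrounding names and error handling are illustrative, not the merge queue's verbatim code):

newLHS, err := removeLearners(ctx, db, lhsDesc) // strip lhs learners via AdminChangeReplicas
if err != nil {
	return err
}
if _, err := removeLearners(ctx, db, rhsDesc); err != nil { // same for the rhs
	return err
}
// With only voter replicas left on both sides, AdminMerge no longer rejects them.
if err := db.AdminMerge(ctx, newLHS.StartKey.AsRawKey()); err != nil {
	return err
}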
danhhz committed Jul 29, 2019
1 parent 04d3137 commit 5bda9fb
Showing 5 changed files with 234 additions and 55 deletions.
9 changes: 5 additions & 4 deletions pkg/internal/client/db.go
@@ -451,10 +451,11 @@ func (db *DB) DelRange(ctx context.Context, begin, end interface{}) error {
return getOneErr(db.Run(ctx, b), b)
}

// AdminMerge merges the range containing key and the subsequent
// range. After the merge operation is complete, the range containing
// key will contain all of the key/value pairs of the subsequent range
// and the subsequent range will no longer exist.
// AdminMerge merges the range containing key and the subsequent range. After
// the merge operation is complete, the range containing key will contain all of
// the key/value pairs of the subsequent range and the subsequent range will no
// longer exist. Neither range may contain learner replicas; if either does, an
// error is returned.
//
// key can be either a byte slice or a string.
func (db *DB) AdminMerge(ctx context.Context, key interface{}) error {
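A hedged usage sketch of the documented behavior (the key and the recovery step are illustrative; the error text matches the check added to AdminMerge further down):

if err := db.AdminMerge(ctx, "some-key"); err != nil {
	// If either range still has a learner, this now fails with a
	// "cannot merge range with non-voter replicas" error; remove the
	// learners first (e.g. via AdminChangeReplicas) and retry.
	return err
}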
56 changes: 45 additions & 11 deletions pkg/storage/merge_queue.go
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/humanizeutil"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/pkg/errors"
)

const (
@@ -239,6 +240,15 @@ func (mq *mergeQueue) process(
return nil
}

// Range was manually split and not expired, so skip merging.
now := mq.store.Clock().Now()
if now.Less(rhsDesc.GetStickyBit()) {
log.VEventf(ctx, 2, "skipping merge: ranges were manually split and sticky bit was not expired")
// TODO(jeffreyxiao): Consider returning a purgatory error to avoid
// repeatedly processing ranges that cannot be merged.
return nil
}

mergedDesc := &roachpb.RangeDescriptor{
StartKey: lhsDesc.StartKey,
EndKey: rhsDesc.EndKey,
@@ -264,13 +274,46 @@ return nil
return nil
}

if !replicaSetsEqual(lhsDesc.Replicas().Unwrap(), rhsDesc.Replicas().Unwrap()) {
{
// AdminMerge errors if there are learners on either side and
// AdminRelocateRange removes any on the range it operates on. For the sake
// of obviousness, just remove them all upfront.
newLHSDesc, err := removeLearners(ctx, lhsRepl.store.DB(), lhsDesc)
if err != nil {
log.VEventf(ctx, 2, `%v`, err)
return err
}
lhsDesc = newLHSDesc
newRHSDesc, err := removeLearners(ctx, lhsRepl.store.DB(), &rhsDesc)
if err != nil {
log.VEventf(ctx, 2, `%v`, err)
return err
}
rhsDesc = *newRHSDesc
}
lhsReplicas, rhsReplicas := lhsDesc.Replicas().All(), rhsDesc.Replicas().All()

// Defensive sanity check that everything is now a voter.
for i := range lhsReplicas {
if lhsReplicas[i].GetType() != roachpb.ReplicaType_VOTER {
return errors.Errorf(`cannot merge non-voter replicas on lhs: %v`, lhsReplicas)
}
}
for i := range rhsReplicas {
if rhsReplicas[i].GetType() != roachpb.ReplicaType_VOTER {
return errors.Errorf(`cannot merge non-voter replicas on rhs: %v`, rhsReplicas)
}
}

if !replicaSetsEqual(lhsReplicas, rhsReplicas) {
var targets []roachpb.ReplicationTarget
for _, lhsReplDesc := range lhsDesc.Replicas().Unwrap() {
for _, lhsReplDesc := range lhsReplicas {
targets = append(targets, roachpb.ReplicationTarget{
NodeID: lhsReplDesc.NodeID, StoreID: lhsReplDesc.StoreID,
})
}
// AdminRelocateRange moves the lease to the first target in the list, so
// sort the existing leaseholder there to leave it unchanged.
lease, _ := lhsRepl.GetLease()
for i := range targets {
if targets[i].NodeID == lease.Replica.NodeID && targets[i].StoreID == lease.Replica.StoreID {
@@ -287,15 +330,6 @@ func (mq *mergeQueue) process(
}
}

// Range was manually split and not expired, so skip merging.
now := mq.store.Clock().Now()
if now.Less(rhsDesc.GetStickyBit()) {
log.VEventf(ctx, 2, "skipping merge: ranges were manually split and sticky bit was not expired")
// TODO(jeffreyxiao): Consider returning a purgatory error to avoid
// repeatedly processing ranges that cannot be merged.
return nil
}

log.VEventf(ctx, 2, "merging to produce range: %s-%s", mergedDesc.StartKey, mergedDesc.EndKey)
reason := fmt.Sprintf("lhs+rhs has (size=%s+%s qps=%.2f+%.2f --> %.2fqps) below threshold (size=%s, qps=%.2f)",
humanizeutil.IBytes(lhsStats.Total()),
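The lease-ordering comment in the hunk above is only partially visible because the diff is collapsed; a hypothetical reconstruction of the idea (not necessarily the commit's exact code) looks like this:

lease, _ := lhsRepl.GetLease()
for i := range targets {
	if targets[i].NodeID == lease.Replica.NodeID && targets[i].StoreID == lease.Replica.StoreID {
		// Move the current leaseholder to the front so AdminRelocateRange
		// leaves the lease where it already is.
		targets[0], targets[i] = targets[i], targets[0]
		break
	}
}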
89 changes: 49 additions & 40 deletions pkg/storage/replica_command.go
@@ -566,8 +566,19 @@ func (r *Replica) AdminMerge(
// Should never happen, but just in case.
return errors.Errorf("ranges are not adjacent; %s != %s", origLeftDesc.EndKey, rightDesc.StartKey)
}
if l, r := origLeftDesc.Replicas(), rightDesc.Replicas(); !replicaSetsEqual(l.Unwrap(), r.Unwrap()) {
return errors.Errorf("ranges not collocated; %s != %s", l, r)
// For simplicity, don't handle learner replicas; expect the caller to
// resolve them first. (Defensively, we check that there are no non-voter
// replicas, in case some third type is added later.) This behavior can be
// changed later if the complexity becomes worth it, but it isn't right now.
lReplicas, rReplicas := origLeftDesc.Replicas(), rightDesc.Replicas()
if len(lReplicas.Voters()) != len(lReplicas.All()) {
return errors.Errorf("cannot merge range with non-voter replicas on lhs: %s", lReplicas)
}
if len(rReplicas.Voters()) != len(rReplicas.All()) {
return errors.Errorf("cannot merge range with non-voter replicas on rhs: %s", rReplicas)
}
if !replicaSetsEqual(lReplicas.All(), rReplicas.All()) {
return errors.Errorf("ranges not collocated; %s != %s", lReplicas, rReplicas)
}

updatedLeftDesc := *origLeftDesc
@@ -1549,6 +1560,20 @@ func (s *Store) AdminRelocateRange(
rangeDesc.SetReplicas(rangeDesc.Replicas().DeepCopy())
startKey := rangeDesc.StartKey.AsRawKey()

// Step 0: Remove all learners so we don't have to think about them. We could
// do something smarter here and try to promote them, but it doesn't seem
// worth the complexity right now. Revisit if this is an issue in practice.
//
// Note that we can't just add the learners to removeTargets. The logic below
// always adds then removes, and if the learner was in the requested targets,
// we might try to add it before removing it.
newDesc, err := removeLearners(ctx, s.DB(), &rangeDesc)
if err != nil {
log.Warning(ctx, err)
return err
}
rangeDesc = *newDesc

// Step 1: Compute which replicas are to be added and which are to be removed.
//
// TODO(radu): we can't have multiple replicas on different stores on the
@@ -1611,32 +1636,6 @@ func (s *Store) AdminRelocateRange(
}
}

// updateRangeDesc updates the passed RangeDescriptor following the successful
// completion of an AdminChangeReplicasRequest with the single provided target
// and changeType.
// TODO(ajwerner): Remove this for 19.2 after AdminChangeReplicas always
// returns a non-nil Desc.
updateRangeDesc := func(
desc *roachpb.RangeDescriptor,
changeType roachpb.ReplicaChangeType,
target roachpb.ReplicationTarget,
) {
switch changeType {
case roachpb.ADD_REPLICA:
desc.AddReplica(roachpb.ReplicaDescriptor{
NodeID: target.NodeID,
StoreID: target.StoreID,
ReplicaID: desc.NextReplicaID,
})
desc.NextReplicaID++
case roachpb.REMOVE_REPLICA:
newReplicas := removeTargetFromSlice(desc.Replicas().All(), target)
desc.SetReplicas(roachpb.MakeReplicaDescriptors(&newReplicas))
default:
panic(errors.Errorf("unknown ReplicaChangeType %v", changeType))
}
}

sysCfg := s.cfg.Gossip.GetSystemConfig()
if sysCfg == nil {
return fmt.Errorf("no system config available, unable to perform RelocateRange")
@@ -1720,12 +1719,7 @@ func (s *Store) AdminRelocateRange(
// local copy of the range descriptor such that future allocator
// decisions take it into account.
addTargets = removeTargetFromSlice(addTargets, target)
if newDesc != nil {
rangeInfo.Desc = newDesc
} else {
// TODO(ajwerner): Remove this case for 19.2.
updateRangeDesc(rangeInfo.Desc, roachpb.ADD_REPLICA, target)
}
rangeInfo.Desc = newDesc
}

if len(removeTargets) > 0 && len(removeTargets) > len(addTargets) {
@@ -1757,12 +1751,7 @@ func (s *Store) AdminRelocateRange(
// copy of the range descriptor such that future allocator decisions take
// its absence into account.
removeTargets = removeTargetFromSlice(removeTargets, target)
if newDesc != nil {
rangeInfo.Desc = newDesc
} else {
// TODO(ajwerner): Remove this case for 19.2.
updateRangeDesc(rangeInfo.Desc, roachpb.REMOVE_REPLICA, target)
}
rangeInfo.Desc = newDesc
}
}

@@ -1788,6 +1777,26 @@ func removeTargetFromSlice(
return targets
}

func removeLearners(
ctx context.Context, db *client.DB, desc *roachpb.RangeDescriptor,
) (*roachpb.RangeDescriptor, error) {
learners := desc.Replicas().Learners()
if len(learners) == 0 {
return desc, nil
}
targets := make([]roachpb.ReplicationTarget, len(learners))
for i := range learners {
targets[i].NodeID = learners[i].NodeID
targets[i].StoreID = learners[i].StoreID
}
log.VEventf(ctx, 2, `removing learner replicas %v from %v`, targets, desc)
newDesc, err := db.AdminChangeReplicas(ctx, desc.StartKey, roachpb.REMOVE_REPLICA, targets, *desc)
if err != nil {
return nil, errors.Wrapf(err, `removing learners from %s`, desc)
}
return newDesc, nil
}

// adminScatter moves replicas and leaseholders for a selection of ranges.
func (r *Replica) adminScatter(
ctx context.Context, args roachpb.AdminScatterRequest,
124 changes: 124 additions & 0 deletions pkg/storage/replica_learner_test.go
@@ -14,6 +14,7 @@ import (
"context"
"fmt"
"path/filepath"
"sort"
"strconv"
"strings"
"sync/atomic"
@@ -574,3 +575,126 @@ func TestLearnerFollowerRead(t *testing.T) {
return nil
})
}

func TestLearnerAdminRelocateRange(t *testing.T) {
defer leaktest.AfterTest(t)()

ctx := context.Background()
knobs, ltk := makeLearnerTestKnobs()
tc := testcluster.StartTestCluster(t, 4, base.TestClusterArgs{
ServerArgs: base.TestServerArgs{Knobs: knobs},
ReplicationMode: base.ReplicationManual,
})
defer tc.Stopper().Stop(ctx)
db := sqlutils.MakeSQLRunner(tc.ServerConn(0))
db.Exec(t, `SET CLUSTER SETTING kv.learner_replicas.enabled = true`)

scratchStartKey := tc.ScratchRange(t)
atomic.StoreInt64(&ltk.replicaAddStopAfterLearnerAtomic, 1)
_ = tc.AddReplicasOrFatal(t, scratchStartKey, tc.Target(1))
_ = tc.AddReplicasOrFatal(t, scratchStartKey, tc.Target(2))
atomic.StoreInt64(&ltk.replicaAddStopAfterLearnerAtomic, 0)

// Test AdminRelocateRange's treatment of learners by having one that it has
// to remove and one that should stay and become a voter.
//
// Before: 1 (voter), 2 (learner), 3 (learner)
// After: 1 (voter), 2 (voter), 4 (voter)
targets := []roachpb.ReplicationTarget{tc.Target(0), tc.Target(1), tc.Target(3)}
require.NoError(t, tc.Server(0).DB().AdminRelocateRange(ctx, scratchStartKey, targets))
desc := tc.LookupRangeOrFatal(t, scratchStartKey)
voters := desc.Replicas().Voters()
require.Len(t, voters, len(targets))
sort.Slice(voters, func(i, j int) bool { return voters[i].NodeID < voters[j].NodeID })
for i := range voters {
require.Equal(t, targets[i].NodeID, voters[i].NodeID, `%v`, voters)
require.Equal(t, targets[i].StoreID, voters[i].StoreID, `%v`, voters)
}
require.Empty(t, desc.Replicas().Learners())
}

func TestLearnerAdminMerge(t *testing.T) {
defer leaktest.AfterTest(t)()

ctx := context.Background()
knobs, ltk := makeLearnerTestKnobs()
tc := testcluster.StartTestCluster(t, 2, base.TestClusterArgs{
ServerArgs: base.TestServerArgs{Knobs: knobs},
ReplicationMode: base.ReplicationManual,
})
defer tc.Stopper().Stop(ctx)
db := sqlutils.MakeSQLRunner(tc.ServerConn(0))
db.Exec(t, `SET CLUSTER SETTING kv.learner_replicas.enabled = true`)

scratchStartKey := tc.ScratchRange(t)
splitKey1 := scratchStartKey.Next()
splitKey2 := splitKey1.Next()
_, _ = tc.SplitRangeOrFatal(t, splitKey1)
_, _ = tc.SplitRangeOrFatal(t, splitKey2)

atomic.StoreInt64(&ltk.replicaAddStopAfterLearnerAtomic, 1)
_ = tc.AddReplicasOrFatal(t, scratchStartKey, tc.Target(1))
_ = tc.AddReplicasOrFatal(t, splitKey2, tc.Target(1))
atomic.StoreInt64(&ltk.replicaAddStopAfterLearnerAtomic, 0)

// Learner on the lhs should fail.
err := tc.Server(0).DB().AdminMerge(ctx, scratchStartKey)
if !testutils.IsError(err, `cannot merge range with non-voter replicas on lhs`) {
t.Fatalf(`expected "cannot merge range with non-voter replicas on lhs" error got: %+v`, err)
}
// Learner on the rhs should fail.
err = tc.Server(0).DB().AdminMerge(ctx, splitKey1)
if !testutils.IsError(err, `cannot merge range with non-voter replicas on rhs`) {
t.Fatalf(`expected "cannot merge range with non-voter replicas on rhs" error got: %+v`, err)
}
}

func TestMergeQueueSeesLearner(t *testing.T) {
defer leaktest.AfterTest(t)()
ctx := context.Background()
knobs, ltk := makeLearnerTestKnobs()
tc := testcluster.StartTestCluster(t, 2, base.TestClusterArgs{
ServerArgs: base.TestServerArgs{Knobs: knobs},
ReplicationMode: base.ReplicationManual,
})
defer tc.Stopper().Stop(ctx)
db := sqlutils.MakeSQLRunner(tc.ServerConn(0))
db.Exec(t, `SET CLUSTER SETTING kv.learner_replicas.enabled = true`)
// TestCluster currently overrides this when used with ReplicationManual.
db.Exec(t, `SET CLUSTER SETTING kv.range_merge.queue_enabled = true`)

scratchStartKey := tc.ScratchRange(t)
origDesc := tc.LookupRangeOrFatal(t, scratchStartKey)

splitKey := scratchStartKey.Next()
_, _ = tc.SplitRangeOrFatal(t, splitKey)

atomic.StoreInt64(&ltk.replicaAddStopAfterLearnerAtomic, 1)
_ = tc.AddReplicasOrFatal(t, scratchStartKey, tc.Target(1))
atomic.StoreInt64(&ltk.replicaAddStopAfterLearnerAtomic, 0)

// Unsplit the range to clear the sticky bit.
require.NoError(t, tc.Server(0).DB().AdminUnsplit(ctx, splitKey))

// Run the merge queue.
store, repl := getFirstStoreReplica(t, tc.Server(0), scratchStartKey)
trace, errMsg, err := store.ManuallyEnqueue(ctx, "merge", repl, true /* skipShouldQueue */)
require.NoError(t, err)
require.Equal(t, ``, errMsg)
formattedTrace := tracing.FormatRecordedSpans(trace)
expectedMessages := []string{
`removing learner replicas \[n2,s2\]`,
`merging to produce range: /Table/Max-/Max`,
}
if err := testutils.MatchInOrder(formattedTrace, expectedMessages...); err != nil {
t.Fatal(err)
}

// Sanity check that the desc has the same bounds it did originally.
desc := tc.LookupRangeOrFatal(t, scratchStartKey)
require.Equal(t, origDesc.StartKey, desc.StartKey)
require.Equal(t, origDesc.EndKey, desc.EndKey)
// The merge removed the learner.
require.Len(t, desc.Replicas().Voters(), 1)
require.Empty(t, desc.Replicas().Learners())
}
11 changes: 11 additions & 0 deletions pkg/testutils/testcluster/testcluster.go
@@ -358,6 +358,17 @@ func (tc *TestCluster) SplitRange(
return tc.Servers[0].SplitRange(splitKey)
}

// SplitRangeOrFatal is the same as SplitRange but will Fatal the test on error.
func (tc *TestCluster) SplitRangeOrFatal(
t testing.TB, splitKey roachpb.Key,
) (roachpb.RangeDescriptor, roachpb.RangeDescriptor) {
lhsDesc, rhsDesc, err := tc.Servers[0].SplitRange(splitKey)
if err != nil {
t.Fatalf(`splitting at %s: %+v`, splitKey, err)
}
return lhsDesc, rhsDesc
}

// Target returns a ReplicationTarget for the specified server.
func (tc *TestCluster) Target(serverIdx int) roachpb.ReplicationTarget {
s := tc.Servers[serverIdx]
