storage: handle learner replicas in merge code #39151

Merged: 1 commit, Jul 30, 2019
9 changes: 5 additions & 4 deletions pkg/internal/client/db.go
@@ -451,10 +451,11 @@ func (db *DB) DelRange(ctx context.Context, begin, end interface{}) error {
return getOneErr(db.Run(ctx, b), b)
}

// AdminMerge merges the range containing key and the subsequent
// range. After the merge operation is complete, the range containing
// key will contain all of the key/value pairs of the subsequent range
// and the subsequent range will no longer exist.
// AdminMerge merges the range containing key and the subsequent range. After
// the merge operation is complete, the range containing key will contain all of
// the key/value pairs of the subsequent range and the subsequent range will no
// longer exist. Neither range may contain learner replicas; if one does, an
// error is returned.
//
// key can be either a byte slice or a string.
func (db *DB) AdminMerge(ctx context.Context, key interface{}) error {
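Not part of this change, but for illustration: a minimal sketch of how a caller might react to the learner-replica error the new comment describes. The surrounding variables (ctx, db, key) and the substring check are assumptions; the error text matches the message asserted in TestLearnerAdminMerge further down.

if err := db.AdminMerge(ctx, key); err != nil {
	if strings.Contains(err.Error(), "cannot merge range with non-voter replicas") {
		// One side still has a learner replica; remove or promote it first
		// (e.g. via AdminChangeReplicas) and then retry the merge.
	}
	return err
}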
56 changes: 45 additions & 11 deletions pkg/storage/merge_queue.go
@@ -27,6 +27,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/humanizeutil"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/pkg/errors"
)

const (
@@ -239,6 +240,15 @@ func (mq *mergeQueue) process(
return nil
}

// Range was manually split and not expired, so skip merging.
now := mq.store.Clock().Now()
if now.Less(rhsDesc.GetStickyBit()) {
log.VEventf(ctx, 2, "skipping merge: ranges were manually split and sticky bit was not expired")
// TODO(jeffreyxiao): Consider returning a purgatory error to avoid
// repeatedly processing ranges that cannot be merged.
return nil
}

mergedDesc := &roachpb.RangeDescriptor{
StartKey: lhsDesc.StartKey,
EndKey: rhsDesc.EndKey,
@@ -264,13 +274,46 @@
return nil
}

if !replicaSetsEqual(lhsDesc.Replicas().Unwrap(), rhsDesc.Replicas().Unwrap()) {
{
// AdminMerge errors if there are learners on either side and
// AdminRelocateRange removes any on the range it operates on. For the sake
// of obviousness, just remove them all upfront.
newLHSDesc, err := removeLearners(ctx, lhsRepl.store.DB(), lhsDesc)
if err != nil {
log.VEventf(ctx, 2, `%v`, err)
return err
}
lhsDesc = newLHSDesc
newRHSDesc, err := removeLearners(ctx, lhsRepl.store.DB(), &rhsDesc)
if err != nil {
log.VEventf(ctx, 2, `%v`, err)
return err
}
rhsDesc = *newRHSDesc
}
lhsReplicas, rhsReplicas := lhsDesc.Replicas().All(), rhsDesc.Replicas().All()

// Defensive sanity check that everything is now a voter.
for i := range lhsReplicas {
if lhsReplicas[i].GetType() != roachpb.ReplicaType_VOTER {
return errors.Errorf(`cannot merge non-voter replicas on lhs: %v`, lhsReplicas)
}
}
for i := range rhsReplicas {
if rhsReplicas[i].GetType() != roachpb.ReplicaType_VOTER {
return errors.Errorf(`cannot merge non-voter replicas on rhs: %v`, rhsReplicas)
}
}

if !replicaSetsEqual(lhsReplicas, rhsReplicas) {
var targets []roachpb.ReplicationTarget
for _, lhsReplDesc := range lhsDesc.Replicas().Unwrap() {
for _, lhsReplDesc := range lhsReplicas {
targets = append(targets, roachpb.ReplicationTarget{
NodeID: lhsReplDesc.NodeID, StoreID: lhsReplDesc.StoreID,
})
}
// AdminRelocateRange moves the lease to the first target in the list, so
// sort the existing leaseholder there to leave it unchanged.
lease, _ := lhsRepl.GetLease()
for i := range targets {
if targets[i].NodeID == lease.Replica.NodeID && targets[i].StoreID == lease.Replica.StoreID {
@@ -287,15 +330,6 @@
}
}

// Range was manually split and not expired, so skip merging.
now := mq.store.Clock().Now()
if now.Less(rhsDesc.GetStickyBit()) {
log.VEventf(ctx, 2, "skipping merge: ranges were manually split and sticky bit was not expired")
// TODO(jeffreyxiao): Consider returning a purgatory error to avoid
// repeatedly processing ranges that cannot be merged.
return nil
}

log.VEventf(ctx, 2, "merging to produce range: %s-%s", mergedDesc.StartKey, mergedDesc.EndKey)
reason := fmt.Sprintf("lhs+rhs has (size=%s+%s qps=%.2f+%.2f --> %.2fqps) below threshold (size=%s, qps=%.2f)",
humanizeutil.IBytes(lhsStats.Total()),
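An aside on the TODO(jeffreyxiao) above: storage queues place a replica into purgatory when process returns an error implementing the queue's purgatory marker interface. A hedged sketch of what that could look like for the merge queue is below; the type name and marker method are assumptions, not part of this change.

// Hypothetical sketch; not in this diff.
type rangeMergePurgatoryError struct{ error }

func (rangeMergePurgatoryError) purgatoryErrorMarker() {}

// The sticky-bit case in process() could then return, e.g.:
//   return rangeMergePurgatoryError{errors.New("sticky bit not expired")}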
89 changes: 49 additions & 40 deletions pkg/storage/replica_command.go
@@ -566,8 +566,19 @@ func (r *Replica) AdminMerge(
// Should never happen, but just in case.
return errors.Errorf("ranges are not adjacent; %s != %s", origLeftDesc.EndKey, rightDesc.StartKey)
}
if l, r := origLeftDesc.Replicas(), rightDesc.Replicas(); !replicaSetsEqual(l.Unwrap(), r.Unwrap()) {
return errors.Errorf("ranges not collocated; %s != %s", l, r)
// For simplicity, don't handle learner replicas; expect the caller to
// resolve them first. (Defensively, we check that there are no non-voter
// replicas, in case some third type is later added.) This behavior can be
// changed later if the complexity becomes worth it, but it isn't worth it right now.
lReplicas, rReplicas := origLeftDesc.Replicas(), rightDesc.Replicas()
if len(lReplicas.Voters()) != len(lReplicas.All()) {
return errors.Errorf("cannot merge range with non-voter replicas on lhs: %s", lReplicas)
}
if len(rReplicas.Voters()) != len(rReplicas.All()) {
return errors.Errorf("cannot merge range with non-voter replicas on rhs: %s", rReplicas)
}
if !replicaSetsEqual(lReplicas.All(), rReplicas.All()) {
return errors.Errorf("ranges not collocated; %s != %s", lReplicas, rReplicas)
}

updatedLeftDesc := *origLeftDesc
@@ -1549,6 +1560,20 @@ func (s *Store) AdminRelocateRange(
rangeDesc.SetReplicas(rangeDesc.Replicas().DeepCopy())
startKey := rangeDesc.StartKey.AsRawKey()

// Step 0: Remove all learners so we don't have to think about them. We could
// do something smarter here and try to promote them, but it doesn't seem
// worth the complexity right now. Revisit if this is an issue in practice.
//
// Note that we can't just add the learners to removeTargets. The below logic
// always does add then remove, and if the learner was in the requested
// targets, we might try to add it before removing it.
newDesc, err := removeLearners(ctx, s.DB(), &rangeDesc)
if err != nil {
log.Warning(ctx, err)
return err
}
rangeDesc = *newDesc

// Step 1: Compute which replicas are to be added and which are to be removed.
//
// TODO(radu): we can't have multiple replicas on different stores on the
@@ -1611,32 +1636,6 @@
}
}

// updateRangeDesc updates the passed RangeDescriptor following the successful
// completion of an AdminChangeReplicasRequest with the single provided target
// and changeType.
// TODO(ajwerner): Remove this for 19.2 after AdminChangeReplicas always
// returns a non-nil Desc.
updateRangeDesc := func(
desc *roachpb.RangeDescriptor,
changeType roachpb.ReplicaChangeType,
target roachpb.ReplicationTarget,
) {
switch changeType {
case roachpb.ADD_REPLICA:
desc.AddReplica(roachpb.ReplicaDescriptor{
NodeID: target.NodeID,
StoreID: target.StoreID,
ReplicaID: desc.NextReplicaID,
})
desc.NextReplicaID++
case roachpb.REMOVE_REPLICA:
newReplicas := removeTargetFromSlice(desc.Replicas().All(), target)
desc.SetReplicas(roachpb.MakeReplicaDescriptors(&newReplicas))
default:
panic(errors.Errorf("unknown ReplicaChangeType %v", changeType))
}
}

sysCfg := s.cfg.Gossip.GetSystemConfig()
if sysCfg == nil {
return fmt.Errorf("no system config available, unable to perform RelocateRange")
@@ -1720,12 +1719,7 @@ func (s *Store) AdminRelocateRange(
// local copy of the range descriptor such that future allocator
// decisions take it into account.
addTargets = removeTargetFromSlice(addTargets, target)
if newDesc != nil {
rangeInfo.Desc = newDesc
} else {
// TODO(ajwerner): Remove this case for 19.2.
updateRangeDesc(rangeInfo.Desc, roachpb.ADD_REPLICA, target)
}
rangeInfo.Desc = newDesc
}

if len(removeTargets) > 0 && len(removeTargets) > len(addTargets) {
@@ -1757,12 +1751,7 @@ func (s *Store) AdminRelocateRange(
// copy of the range descriptor such that future allocator decisions take
// its absence into account.
removeTargets = removeTargetFromSlice(removeTargets, target)
if newDesc != nil {
rangeInfo.Desc = newDesc
} else {
// TODO(ajwerner): Remove this case for 19.2.
updateRangeDesc(rangeInfo.Desc, roachpb.REMOVE_REPLICA, target)
}
rangeInfo.Desc = newDesc
}
}

Expand All @@ -1788,6 +1777,26 @@ func removeTargetFromSlice(
return targets
}

func removeLearners(
ctx context.Context, db *client.DB, desc *roachpb.RangeDescriptor,
) (*roachpb.RangeDescriptor, error) {
learners := desc.Replicas().Learners()
if len(learners) == 0 {
return desc, nil
}
targets := make([]roachpb.ReplicationTarget, len(learners))
for i := range learners {
targets[i].NodeID = learners[i].NodeID
targets[i].StoreID = learners[i].StoreID
}
log.VEventf(ctx, 2, `removing learner replicas %v from %v`, targets, desc)
newDesc, err := db.AdminChangeReplicas(ctx, desc.StartKey, roachpb.REMOVE_REPLICA, targets, *desc)
if err != nil {
return nil, errors.Wrapf(err, `removing learners from %s`, desc)
}
return newDesc, nil
}
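For reference, a sketch of the "callers resolve learners first" contract that AdminMerge now enforces, mirroring what the merge queue does earlier in this diff; the variable names (store, lhsDesc, rhsDesc) are assumptions.

// Sketch: strip learners from both sides, then issue the merge.
newLHS, err := removeLearners(ctx, store.DB(), &lhsDesc)
if err != nil {
	return err
}
if _, err := removeLearners(ctx, store.DB(), &rhsDesc); err != nil {
	return err
}
return store.DB().AdminMerge(ctx, newLHS.StartKey.AsRawKey())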

// adminScatter moves replicas and leaseholders for a selection of ranges.
func (r *Replica) adminScatter(
ctx context.Context, args roachpb.AdminScatterRequest,
124 changes: 124 additions & 0 deletions pkg/storage/replica_learner_test.go
@@ -14,6 +14,7 @@ import (
"context"
"fmt"
"path/filepath"
"sort"
"strconv"
"strings"
"sync/atomic"
@@ -574,3 +575,126 @@ func TestLearnerFollowerRead(t *testing.T) {
return nil
})
}

func TestLearnerAdminRelocateRange(t *testing.T) {
defer leaktest.AfterTest(t)()

ctx := context.Background()
knobs, ltk := makeLearnerTestKnobs()
tc := testcluster.StartTestCluster(t, 4, base.TestClusterArgs{
ServerArgs: base.TestServerArgs{Knobs: knobs},
ReplicationMode: base.ReplicationManual,
})
defer tc.Stopper().Stop(ctx)
db := sqlutils.MakeSQLRunner(tc.ServerConn(0))
db.Exec(t, `SET CLUSTER SETTING kv.learner_replicas.enabled = true`)

scratchStartKey := tc.ScratchRange(t)
atomic.StoreInt64(&ltk.replicaAddStopAfterLearnerAtomic, 1)
_ = tc.AddReplicasOrFatal(t, scratchStartKey, tc.Target(1))
_ = tc.AddReplicasOrFatal(t, scratchStartKey, tc.Target(2))
atomic.StoreInt64(&ltk.replicaAddStopAfterLearnerAtomic, 0)

// Test AdminRelocateRange's treatment of learners by having one that it has
// to remove and one that should stay and become a voter.
//
// Before: 1 (voter), 2 (learner), 3 (learner)
// After: 1 (voter), 2 (voter), 4 (voter)
targets := []roachpb.ReplicationTarget{tc.Target(0), tc.Target(1), tc.Target(3)}
require.NoError(t, tc.Server(0).DB().AdminRelocateRange(ctx, scratchStartKey, targets))
desc := tc.LookupRangeOrFatal(t, scratchStartKey)
voters := desc.Replicas().Voters()
require.Len(t, voters, len(targets))
sort.Slice(voters, func(i, j int) bool { return voters[i].NodeID < voters[j].NodeID })
for i := range voters {
require.Equal(t, targets[i].NodeID, voters[i].NodeID, `%v`, voters)
require.Equal(t, targets[i].StoreID, voters[i].StoreID, `%v`, voters)
}
require.Empty(t, desc.Replicas().Learners())
}

func TestLearnerAdminMerge(t *testing.T) {
defer leaktest.AfterTest(t)()

ctx := context.Background()
knobs, ltk := makeLearnerTestKnobs()
tc := testcluster.StartTestCluster(t, 2, base.TestClusterArgs{
ServerArgs: base.TestServerArgs{Knobs: knobs},
ReplicationMode: base.ReplicationManual,
})
defer tc.Stopper().Stop(ctx)
db := sqlutils.MakeSQLRunner(tc.ServerConn(0))
db.Exec(t, `SET CLUSTER SETTING kv.learner_replicas.enabled = true`)

scratchStartKey := tc.ScratchRange(t)
splitKey1 := scratchStartKey.Next()
splitKey2 := splitKey1.Next()
_, _ = tc.SplitRangeOrFatal(t, splitKey1)
_, _ = tc.SplitRangeOrFatal(t, splitKey2)

atomic.StoreInt64(&ltk.replicaAddStopAfterLearnerAtomic, 1)
_ = tc.AddReplicasOrFatal(t, scratchStartKey, tc.Target(1))
_ = tc.AddReplicasOrFatal(t, splitKey2, tc.Target(1))
atomic.StoreInt64(&ltk.replicaAddStopAfterLearnerAtomic, 0)

// Learner on the lhs should fail.
err := tc.Server(0).DB().AdminMerge(ctx, scratchStartKey)
if !testutils.IsError(err, `cannot merge range with non-voter replicas on lhs`) {
t.Fatalf(`expected "cannot merge range with non-voter replicas on lhs" error got: %+v`, err)
}
// Learner on the rhs should fail.
err = tc.Server(0).DB().AdminMerge(ctx, splitKey1)
if !testutils.IsError(err, `cannot merge range with non-voter replicas on rhs`) {
t.Fatalf(`expected "cannot merge range with non-voter replicas on rhs" error got: %+v`, err)
}
}

func TestMergeQueueSeesLearner(t *testing.T) {
defer leaktest.AfterTest(t)()
ctx := context.Background()
knobs, ltk := makeLearnerTestKnobs()
tc := testcluster.StartTestCluster(t, 2, base.TestClusterArgs{
ServerArgs: base.TestServerArgs{Knobs: knobs},
ReplicationMode: base.ReplicationManual,
})
defer tc.Stopper().Stop(ctx)
db := sqlutils.MakeSQLRunner(tc.ServerConn(0))
db.Exec(t, `SET CLUSTER SETTING kv.learner_replicas.enabled = true`)
// TestCluster currently overrides this when used with ReplicationManual.
db.Exec(t, `SET CLUSTER SETTING kv.range_merge.queue_enabled = true`)

scratchStartKey := tc.ScratchRange(t)
origDesc := tc.LookupRangeOrFatal(t, scratchStartKey)

splitKey := scratchStartKey.Next()
_, _ = tc.SplitRangeOrFatal(t, splitKey)

atomic.StoreInt64(&ltk.replicaAddStopAfterLearnerAtomic, 1)
_ = tc.AddReplicasOrFatal(t, scratchStartKey, tc.Target(1))
atomic.StoreInt64(&ltk.replicaAddStopAfterLearnerAtomic, 0)

// Unsplit the range to clear the sticky bit.
require.NoError(t, tc.Server(0).DB().AdminUnsplit(ctx, splitKey))

// Run the merge queue.
store, repl := getFirstStoreReplica(t, tc.Server(0), scratchStartKey)
trace, errMsg, err := store.ManuallyEnqueue(ctx, "merge", repl, true /* skipShouldQueue */)
require.NoError(t, err)
require.Equal(t, ``, errMsg)
formattedTrace := tracing.FormatRecordedSpans(trace)
expectedMessages := []string{
`removing learner replicas \[n2,s2\]`,
`merging to produce range: /Table/Max-/Max`,
}
if err := testutils.MatchInOrder(formattedTrace, expectedMessages...); err != nil {
t.Fatal(err)
}

// Sanity check that the desc has the same bounds it did originally.
desc := tc.LookupRangeOrFatal(t, scratchStartKey)
require.Equal(t, origDesc.StartKey, desc.StartKey)
require.Equal(t, origDesc.EndKey, desc.EndKey)
// The merge removed the learner.
require.Len(t, desc.Replicas().Voters(), 1)
require.Empty(t, desc.Replicas().Learners())
}
11 changes: 11 additions & 0 deletions pkg/testutils/testcluster/testcluster.go
@@ -358,6 +358,17 @@ func (tc *TestCluster) SplitRange(
return tc.Servers[0].SplitRange(splitKey)
}

// SplitRangeOrFatal is the same as SplitRange but will Fatal the test on error.
func (tc *TestCluster) SplitRangeOrFatal(
t testing.TB, splitKey roachpb.Key,
) (roachpb.RangeDescriptor, roachpb.RangeDescriptor) {
lhsDesc, rhsDesc, err := tc.Servers[0].SplitRange(splitKey)
if err != nil {
t.Fatalf(`splitting at %s: %+v`, splitKey, err)
}
return lhsDesc, rhsDesc
}
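A brief usage sketch (the tests above discard the returned descriptors; they are shown named here for clarity):

lhsDesc, rhsDesc := tc.SplitRangeOrFatal(t, splitKey)
t.Logf("split at %v produced %v and %v", splitKey, lhsDesc, rhsDesc)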

// Target returns a ReplicationTarget for the specified server.
func (tc *TestCluster) Target(serverIdx int) roachpb.ReplicationTarget {
s := tc.Servers[serverIdx]