Merge #65584
65584: kvserver: fix bug obstructing range merges on multi-store clusters r=aayushshah15 a=aayushshah15

Previously, we had a safeguard inside `allocateTargetFromList()` (the allocator
method used to find the best target for allocating a new replica) that ensured
it would never return a target store on the same node as one of the existing
replicas. This check was well-intentioned, but it became outdated once we
started allowing rebalances between stores on the same node in #51567.

The aforementioned check is no longer correct since:
1. Callers of `AdminRelocateRange()` (currently, the merge queue and the
`StoreRebalancer`) must be able to move replicas laterally between stores on
the same node.
2. We already have a more precise check inside `AdminChangeReplicas()` that
guards against adding 2 replicas of a range to the same node while still
allowing the cases where we're rebalancing laterally (a simplified sketch of
such a guard follows this list).
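
To make point 2 concrete, here is a minimal, self-contained Go sketch of a node-level guard of this kind. It is an illustration only: `ReplicaTarget` and `checkNoDoubledNodes` are hypothetical stand-ins, not CockroachDB's actual `AdminChangeReplicas()` implementation.
```go
package main

import "fmt"

// ReplicaTarget is a hypothetical stand-in for a (node, store) pair.
type ReplicaTarget struct {
	NodeID  int
	StoreID int
}

// checkNoDoubledNodes rejects adding a replica on a node that already hosts
// one of the existing replicas, unless that colliding replica is being
// removed in the same change (i.e. a lateral, store-to-store move).
func checkNoDoubledNodes(existing []ReplicaTarget, add ReplicaTarget, removals []ReplicaTarget) error {
	for _, repl := range existing {
		if repl.NodeID != add.NodeID {
			continue
		}
		beingRemoved := false
		for _, rem := range removals {
			if rem == repl {
				beingRemoved = true
				break
			}
		}
		if !beingRemoved {
			return fmt.Errorf("node %d already has a replica on store %d", repl.NodeID, repl.StoreID)
		}
	}
	return nil
}

func main() {
	existing := []ReplicaTarget{{NodeID: 1, StoreID: 1}, {NodeID: 2, StoreID: 3}}
	// A lateral move n1: s1 -> s2 is legal because s1's replica is being removed.
	fmt.Println(checkNoDoubledNodes(existing, ReplicaTarget{NodeID: 1, StoreID: 2},
		[]ReplicaTarget{{NodeID: 1, StoreID: 1}}))
	// A plain second replica on n1 is rejected.
	fmt.Println(checkNoDoubledNodes(existing, ReplicaTarget{NodeID: 1, StoreID: 2}, nil))
}
```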

As a result of this check, clusters that use multiple stores per node were
more likely to have their range merges fail, because the merge queue's
attempts to collocate ranges would fail whenever they required lateral
movement of replicas across stores on the same node. @adityamaru noticed
error messages of the following nature flooding the logs on such a cluster
while debugging an incident:
```
none of the remaining voters [n1,s2] are legal additions to (n5,s18):1,(n9,s33):2,(n1,s1):3
```

This patch fixes the behavior by allowing `AdminRelocateRange()` to disable
this check, since it may often find itself executing relocations that require
moving replicas laterally within a given node. Note that we do not allow
_upreplication_ logic to disable this check, since we do not want to
upreplicate across stores on the same node.
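
The resulting filtering rule is small enough to show in isolation. Below is a simplified, self-contained Go sketch that mirrors the shape of the `rankedCandidateListForAllocation` change in the diff further down; the types and helpers (`Store`, `Replica`, `filterCandidates`) are illustrative stand-ins rather than the actual kvserver code.
```go
package main

import "fmt"

// Store and Replica are simplified stand-ins for the kvserver descriptors.
type Store struct{ NodeID, StoreID int }
type Replica struct{ NodeID, StoreID int }

// filterCandidates always skips stores that already hold a replica of the
// range. It additionally skips every store on a node that holds a replica
// unless allowMultipleReplsPerNode is true, which is how relocation callers
// keep lateral, same-node moves possible while upreplication does not.
func filterCandidates(stores []Store, existing []Replica, allowMultipleReplsPerNode bool) []Store {
	var out []Store
	for _, s := range stores {
		if storeHasReplica(s.StoreID, existing) {
			continue
		}
		if !allowMultipleReplsPerNode && nodeHasReplica(s.NodeID, existing) {
			continue
		}
		out = append(out, s)
	}
	return out
}

func storeHasReplica(storeID int, existing []Replica) bool {
	for _, r := range existing {
		if r.StoreID == storeID {
			return true
		}
	}
	return false
}

func nodeHasReplica(nodeID int, existing []Replica) bool {
	for _, r := range existing {
		if r.NodeID == nodeID {
			return true
		}
	}
	return false
}

func main() {
	stores := []Store{{1, 1}, {1, 2}, {2, 3}, {2, 4}}
	existing := []Replica{{NodeID: 1, StoreID: 1}}
	// allowMultipleReplsPerNode=false (upreplication): n1's second store is excluded.
	fmt.Println(filterCandidates(stores, existing, false)) // [{2 3} {2 4}]
	// allowMultipleReplsPerNode=true (relocation): s2 on n1 becomes a legal lateral target.
	fmt.Println(filterCandidates(stores, existing, true)) // [{1 2} {2 3} {2 4}]
}
```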

Release note (bug fix): A bug that made it less likely for range merges to
succeed on clusters using multiple stores per node is now fixed.


Co-authored-by: Aayush Shah <[email protected]>
craig[bot] and aayushshah15 committed May 31, 2021
2 parents 995f0dd + 7ad39c7 commit a64b26d
Showing 7 changed files with 189 additions and 17 deletions.
16 changes: 13 additions & 3 deletions pkg/kv/kvserver/allocator.go
@@ -730,7 +730,14 @@ func (a *Allocator) allocateTarget(
existingVoters,
existingNonVoters,
a.scorerOptions(),
targetType)
// When allocating a *new* replica, we explicitly disregard nodes with any
// existing replicas. This is important for multi-store scenarios as
// otherwise, stores on the nodes that have existing replicas are simply
// discouraged via the diversity heuristic. We want to entirely avoid
// allocating multiple replicas onto different stores of the same node.
false, /* allowMultipleReplsPerNode */
targetType,
)
if target != nil {
return target, details, nil
}
@@ -754,7 +761,7 @@

// AllocateVoter returns a suitable store for a new allocation of a voting
// replica with the required attributes. Nodes already accommodating existing
// replicas are ruled out as targets.
// voting replicas are ruled out as targets.
func (a *Allocator) AllocateVoter(
ctx context.Context,
zone *zonepb.ZoneConfig,
@@ -780,6 +787,7 @@ func (a *Allocator) allocateTargetFromList(
zone *zonepb.ZoneConfig,
existingVoters, existingNonVoters []roachpb.ReplicaDescriptor,
options scorerOptions,
allowMultipleReplsPerNode bool,
targetType targetReplicaType,
) (*roachpb.StoreDescriptor, string) {
existingReplicas := append(existingVoters, existingNonVoters...)
@@ -815,7 +823,9 @@ func (a *Allocator) allocateTargetFromList(
existingReplicaSet,
a.storePool.getLocalitiesByStore(existingReplicaSet),
a.storePool.isNodeReadyForRoutineReplicaTransfer,
options)
allowMultipleReplsPerNode,
options,
)

log.VEventf(ctx, 3, "allocate %s: %s", targetType, candidates)
if target := candidates.selectGood(a.randGen); target != nil {
17 changes: 15 additions & 2 deletions pkg/kv/kvserver/allocator_scorer.go
@@ -407,18 +407,31 @@ func (cl candidateList) removeCandidate(c candidate) candidateList {
// rankedCandidateListForAllocation creates a candidate list of all stores that
// can be used for allocating a new replica ordered from the best to the worst.
// Only stores that meet the criteria are included in the list.
//
// NB: When `allowMultipleReplsPerNode` is set to false, we disregard the
// *nodes* of `existingReplicas`. Otherwise, we disregard only the *stores* of
// `existingReplicas`. For instance, `allowMultipleReplsPerNode` is set to true
// by callers performing lateral relocation of replicas within the same node.
func rankedCandidateListForAllocation(
ctx context.Context,
candidateStores StoreList,
constraintsCheck constraintsCheckFn,
existingReplicas []roachpb.ReplicaDescriptor,
existingStoreLocalities map[roachpb.StoreID]roachpb.Locality,
isNodeValidForRoutineReplicaTransfer func(context.Context, roachpb.NodeID) bool,
allowMultipleReplsPerNode bool,
options scorerOptions,
) candidateList {
var candidates candidateList
existingReplTargets := roachpb.MakeReplicaSet(existingReplicas).ReplicationTargets()
for _, s := range candidateStores.stores {
if nodeHasReplica(s.Node.NodeID, existingReplicas) {
// Disregard all the stores that already have replicas.
if storeHasReplica(s.StoreID, existingReplTargets) {
continue
}
// Unless the caller specifically allows us to allocate multiple replicas on
// the same node, we disregard nodes with existing replicas.
if !allowMultipleReplsPerNode && nodeHasReplica(s.Node.NodeID, existingReplTargets) {
continue
}
if !isNodeValidForRoutineReplicaTransfer(ctx, s.Node.NodeID) {
@@ -906,7 +919,7 @@ func shouldRebalanceBasedOnRangeCount(

// nodeHasReplica returns true if the provided NodeID contains an entry in
// the provided list of existing replicas.
func nodeHasReplica(nodeID roachpb.NodeID, existing []roachpb.ReplicaDescriptor) bool {
func nodeHasReplica(nodeID roachpb.NodeID, existing []roachpb.ReplicationTarget) bool {
for _, r := range existing {
if r.NodeID == nodeID {
return true
40 changes: 30 additions & 10 deletions pkg/kv/kvserver/allocator_test.go
@@ -675,19 +675,33 @@ func TestAllocatorMultipleStoresPerNode(t *testing.T) {

for _, tc := range testCases {
{
result, _, err := a.AllocateVoter(context.Background(), zonepb.EmptyCompleteZoneConfig(), tc.existing, nil)
result, _, err := a.AllocateVoter(
context.Background(), zonepb.EmptyCompleteZoneConfig(), tc.existing, nil,
)
if e, a := tc.expectTargetAllocate, result != nil; e != a {
t.Errorf("AllocateVoter(%v) got target %v, err %v; expectTarget=%v",
tc.existing, result, err, tc.expectTargetAllocate)
t.Errorf(
"AllocateVoter(%v) got target %v, err %v; expectTarget=%v",
tc.existing, result, err, tc.expectTargetAllocate,
)
}
}

{
var rangeUsageInfo RangeUsageInfo
target, _, details, ok := a.RebalanceVoter(context.Background(), zonepb.EmptyCompleteZoneConfig(), nil, tc.existing, nil, rangeUsageInfo, storeFilterThrottled)
target, _, details, ok := a.RebalanceVoter(
context.Background(),
zonepb.EmptyCompleteZoneConfig(),
nil,
tc.existing,
nil,
rangeUsageInfo,
storeFilterThrottled,
)
if e, a := tc.expectTargetRebalance, ok; e != a {
t.Errorf("RebalanceVoter(%v) got target %v, details %v; expectTarget=%v",
tc.existing, target, details, tc.expectTargetRebalance)
t.Errorf(
"RebalanceVoter(%v) got target %v, details %v; expectTarget=%v",
tc.existing, target, details, tc.expectTargetRebalance,
)
}
}
}
@@ -2668,13 +2682,16 @@ func TestAllocateCandidatesExcludeNonReadyNodes(t *testing.T) {
}

t.Run(fmt.Sprintf("%d/allocate", testIdx), func(t *testing.T) {
candidates := rankedCandidateListForAllocation(context.Background(),
candidates := rankedCandidateListForAllocation(
context.Background(),
sl,
allocationConstraintsChecker,
existingRepls,
a.storePool.getLocalitiesByStore(existingRepls),
a.storePool.isNodeReadyForRoutineReplicaTransfer,
a.scorerOptions())
false, /* allowMultipleReplsPerNode */
a.scorerOptions(),
)

if !expectedStoreIDsMatch(tc.expected, candidates) {
t.Errorf("expected rankedCandidateListForAllocation(%v) = %v, but got %v",
@@ -3009,13 +3026,16 @@ func TestAllocateCandidatesNumReplicasConstraints(t *testing.T) {
zone.Constraints)
checkFn := voterConstraintsCheckerForAllocation(analyzed, constraint.EmptyAnalyzedConstraints)

candidates := rankedCandidateListForAllocation(context.Background(),
candidates := rankedCandidateListForAllocation(
context.Background(),
sl,
checkFn,
existingRepls,
a.storePool.getLocalitiesByStore(existingRepls),
func(context.Context, roachpb.NodeID) bool { return true },
a.scorerOptions())
false, /* allowMultipleReplsPerNode */
a.scorerOptions(),
)
best := candidates.best()
match := true
if len(tc.expected) != len(best) {
123 changes: 122 additions & 1 deletion pkg/kv/kvserver/client_relocate_range_test.go
@@ -18,6 +18,7 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/gossip"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
@@ -65,6 +66,27 @@ func relocateAndCheck(
return retries
}

func requireRelocationFailure(
ctx context.Context,
t *testing.T,
tc *testcluster.TestCluster,
startKey roachpb.RKey,
voterTargets []roachpb.ReplicationTarget,
nonVoterTargets []roachpb.ReplicationTarget,
errRegExp string,
) {
testutils.SucceedsSoon(t, func() error {
err := tc.Servers[0].DB().AdminRelocateRange(
ctx, startKey.AsRawKey(), voterTargets, nonVoterTargets,
)
if kv.IsExpectedRelocateError(err) {
return err
}
require.Regexp(t, errRegExp, err)
return nil
})
}

func requireDescMembers(
t *testing.T, desc roachpb.RangeDescriptor, targets []roachpb.ReplicationTarget,
) {
Expand Down Expand Up @@ -94,7 +116,9 @@ func requireLeaseAt(
// it's returned here, so don't use FindRangeLeaseHolder which fails when
// that happens.
testutils.SucceedsSoon(t, func() error {
lease, _, err := tc.FindRangeLease(desc, &target)
// NB: Specifying a `hint` here does not play well with multi-store
// TestServers. See TODO inside `TestServer.GetRangeLease()`.
lease, _, err := tc.FindRangeLease(desc, nil /* hint */)
if err != nil {
return err
}
@@ -522,3 +546,100 @@ func setupReplicaRemovalTest(

return tc, key, evalDuringReplicaRemoval
}

// TestAdminRelocateRangeLaterallyAmongStores tests that `AdminRelocateRange` is
// able to relocate ranges laterally (i.e. between stores on the same node).
func TestAdminRelocateRangeLaterallyAmongStores(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
// Set up a test cluster with each node having 2 stores.
args := base.TestClusterArgs{
ServerArgs: base.TestServerArgs{
StoreSpecs: []base.StoreSpec{
{InMemory: true},
{InMemory: true},
},
},
ReplicationMode: base.ReplicationManual,
}
tc := testcluster.StartTestCluster(t, 5, args)
defer tc.Stopper().Stop(ctx)

for i := 0; i < tc.NumServers(); i++ {
tc.WaitForNStores(t, tc.NumServers()*2, tc.Server(i).GossipI().(*gossip.Gossip))
}

scratchKey := keys.MustAddr(tc.ScratchRange(t))
// Place replicas for the scratch range on stores 1, 3, 5 (i.e. the first
// store on each of the nodes). Note that the test cluster will start off with
// (n1,s1) already having a replica.
scratchDesc := tc.LookupRangeOrFatal(t, scratchKey.AsRawKey())
_, found := scratchDesc.GetReplicaDescriptor(1)
require.True(t, found)
tc.AddVotersOrFatal(t, scratchKey.AsRawKey(), []roachpb.ReplicationTarget{
{NodeID: 2, StoreID: 3},
{NodeID: 3, StoreID: 5},
}...)
// Now, ask `AdminRelocateRange()` to move all of these replicas laterally.
relocateAndCheck(
t, tc, scratchKey, []roachpb.ReplicationTarget{
{NodeID: 1, StoreID: 2},
{NodeID: 2, StoreID: 4},
{NodeID: 3, StoreID: 5},
}, nil, /* nonVoterTargets */
)
// Ensure that this sort of lateral relocation works even across non-voters
// and voters.
relocateAndCheck(
t, tc, scratchKey, []roachpb.ReplicationTarget{
{NodeID: 2, StoreID: 4},
{NodeID: 3, StoreID: 5},
}, []roachpb.ReplicationTarget{
{NodeID: 1, StoreID: 1},
},
)
relocateAndCheck(
t, tc, scratchKey, []roachpb.ReplicationTarget{
{NodeID: 2, StoreID: 4},
{NodeID: 3, StoreID: 5},
}, []roachpb.ReplicationTarget{
{NodeID: 1, StoreID: 2},
},
)

// Ensure that, in case a caller of `AdminRelocateRange` tries to place 2
// replicas on the same node, a safeguard inside `AdminChangeReplicas()`
// rejects the operation.
requireRelocationFailure(
ctx, t, tc, scratchKey, []roachpb.ReplicationTarget{
{NodeID: 1, StoreID: 1},
{NodeID: 1, StoreID: 2},
{NodeID: 2, StoreID: 4},
{NodeID: 3, StoreID: 5},
}, nil, /* nonVoterTargets */
"node 1 already has a replica", /* errRegExp */
)
// Same as above, but for non-voting replicas.
requireRelocationFailure(
ctx, t, tc, scratchKey, []roachpb.ReplicationTarget{
{NodeID: 2, StoreID: 4},
{NodeID: 3, StoreID: 5},
}, []roachpb.ReplicationTarget{
{NodeID: 1, StoreID: 1},
{NodeID: 1, StoreID: 2},
}, "node 1 already has a replica", /* errRegExp */
)
// Ensure that we can't place 2 replicas on the same node even if one is a
// voter and the other is a non-voter.
requireRelocationFailure(
ctx, t, tc, scratchKey, []roachpb.ReplicationTarget{
{NodeID: 1, StoreID: 1},
{NodeID: 2, StoreID: 4},
{NodeID: 3, StoreID: 5},
}, []roachpb.ReplicationTarget{
{NodeID: 1, StoreID: 2},
}, "node 1 already has a replica", /* errRegExp */
)
}
4 changes: 4 additions & 0 deletions pkg/kv/kvserver/replica_command.go
@@ -2907,6 +2907,10 @@ func (s *Store) relocateOne(
existingVoters,
existingNonVoters,
s.allocator.scorerOptions(),
// NB: Allow the allocator to return target stores that might be on the
// same node as an existing replica. This is to ensure that relocations
// that require "lateral" movement of replicas within a node can succeed.
true, /* allowMultipleReplsPerNode */
args.targetType,
)
if targetStore == nil {
4 changes: 4 additions & 0 deletions pkg/kv/kvserver/store_rebalancer.go
@@ -786,6 +786,10 @@ func (sr *StoreRebalancer) pickRemainingRepls(
partialVoterTargets,
partialNonVoterTargets,
options,
// The store rebalancer should never need to perform lateral relocations,
// so we ask the allocator to disregard all the nodes that exist in
// `partial{Non}VoterTargets`.
false, /* allowMultipleReplsPerNode */
targetType,
)
if target == nil {
2 changes: 1 addition & 1 deletion pkg/server/testserver.go
@@ -1301,7 +1301,7 @@ func (ts *TestServer) GetRangeLease(
resp := leaseResp.(*roachpb.LeaseInfoResponse)
if queryPolicy == QueryLocalNodeOnly && resp.EvaluatedBy != ts.GetFirstStoreID() {
// TODO(andrei): Figure out how to deal with nodes with multiple stores.
// This API API should permit addressing the query to a particular store.
// This API should permit addressing the query to a particular store.
return LeaseInfo{}, hlc.ClockTimestamp{}, errors.Errorf(
"request not evaluated locally; evaluated by s%d instead of local s%d",
resp.EvaluatedBy, ts.GetFirstStoreID())
