kvserver: skip non-live nodes when considering candidates for transfers
Prior to this patch, 3 components could attempt to transfer a replica
to a node currently being drained:

- the store rebalancer, which rebalances replicas based on disk
  usage and QPS.
- the allocator, to place new replicas.
- the allocator, to rebalance replicas based on load.

This commit introduces a node liveness check when building the list
of candidates, to determine whether a target node is acceptable. Any
node that is not LIVE according to its liveness status is not
considered for a transfer.

Release note (bug fix): In some cases CockroachDB would attempt to
transfer ranges to nodes in the process of being decommissioned or
shut down; this could cause disruption the moment the node actually
terminated. This bug has been fixed. It had been introduced some time
before v2.0.
knz committed Oct 12, 2020
1 parent 5f004f7 commit 5138627
Showing 6 changed files with 52 additions and 4 deletions.
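Before the per-file diffs, here is a minimal, self-contained Go sketch of the pattern this commit applies: candidate stores are filtered through a node-readiness predicate, and a nil predicate disables the check (the convention the updated tests rely on). The nodeID and storeDesc types and the filterCandidates helper below are illustrative stand-ins, not code from this change.

package main

import (
	"context"
	"fmt"
)

type nodeID int32

type storeDesc struct {
	Node    nodeID
	StoreID int32
}

// filterCandidates keeps only stores whose node passes the readiness
// predicate; a nil predicate disables the check entirely.
func filterCandidates(
	ctx context.Context,
	stores []storeDesc,
	isNodeReady func(context.Context, nodeID) bool,
) []storeDesc {
	var out []storeDesc
	for _, s := range stores {
		if isNodeReady != nil && !isNodeReady(ctx, s.Node) {
			continue // skip targets on non-live (e.g. draining) nodes
		}
		out = append(out, s)
	}
	return out
}

func main() {
	live := map[nodeID]bool{1: true, 2: false /* draining */, 3: true}
	stores := []storeDesc{{Node: 1, StoreID: 1}, {Node: 2, StoreID: 2}, {Node: 3, StoreID: 3}}
	isReady := func(_ context.Context, n nodeID) bool { return live[n] }
	fmt.Println(filterCandidates(context.Background(), stores, isReady))
	// Prints: [{1 1} {3 3}]
}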
pkg/kv/kvserver/allocator.go (6 changes: 5 additions & 1 deletion)
@@ -502,8 +502,11 @@ func (a *Allocator) allocateTargetFromList(
 	analyzedConstraints := constraint.AnalyzeConstraints(
 		ctx, a.storePool.getStoreDescriptor, candidateReplicas, zone)
 	candidates := allocateCandidates(
+		ctx,
 		sl, analyzedConstraints, candidateReplicas,
-		a.storePool.getLocalitiesByStore(candidateReplicas), options,
+		a.storePool.getLocalitiesByStore(candidateReplicas),
+		a.storePool.isNodeReadyForRoutineReplicaTransfer,
+		options,
 	)
 	log.VEventf(ctx, 3, "allocate candidates: %s", candidates)
 	if target := candidates.selectGood(a.randGen); target != nil {
@@ -664,6 +667,7 @@ func (a Allocator) RebalanceTarget(
 		analyzedConstraints,
 		existingReplicas,
 		a.storePool.getLocalitiesByStore(existingReplicas),
+		a.storePool.isNodeReadyForRoutineReplicaTransfer,
 		options,
 	)
 
pkg/kv/kvserver/allocator_scorer.go (11 changes: 11 additions & 0 deletions)
@@ -407,17 +407,23 @@ func (cl candidateList) removeCandidate(c candidate) candidateList {
 // for allocating a new replica ordered from the best to the worst. Only
 // stores that meet the criteria are included in the list.
 func allocateCandidates(
+	ctx context.Context,
 	sl StoreList,
 	constraints constraint.AnalyzedConstraints,
 	existing []roachpb.ReplicaDescriptor,
 	existingStoreLocalities map[roachpb.StoreID]roachpb.Locality,
+	isNodeValidForRoutineReplicaTransfer func(context.Context, roachpb.NodeID) bool,
 	options scorerOptions,
 ) candidateList {
 	var candidates candidateList
 	for _, s := range sl.stores {
 		if nodeHasReplica(s.Node.NodeID, existing) {
 			continue
 		}
+		if isNodeValidForRoutineReplicaTransfer != nil && !isNodeValidForRoutineReplicaTransfer(ctx, s.Node.NodeID) {
+			log.VEventf(ctx, 3, "not considering non-ready node n%d for allocate", s.Node.NodeID)
+			continue
+		}
 		constraintsOK, necessary := allocateConstraintsCheck(s, constraints)
 		if !constraintsOK {
 			continue
@@ -523,13 +529,18 @@ func rebalanceCandidates(
 	constraints constraint.AnalyzedConstraints,
 	existingReplicas []roachpb.ReplicaDescriptor,
 	existingStoreLocalities map[roachpb.StoreID]roachpb.Locality,
+	isNodeValidForRoutineReplicaTransfer func(context.Context, roachpb.NodeID) bool,
 	options scorerOptions,
 ) []rebalanceOptions {
 	// 1. Determine whether existing replicas are valid and/or necessary.
 	existingStores := make(map[roachpb.StoreID]candidate)
 	var needRebalanceFrom bool
 	curDiversityScore := rangeDiversityScore(existingStoreLocalities)
 	for _, store := range allStores.stores {
+		if isNodeValidForRoutineReplicaTransfer != nil && !isNodeValidForRoutineReplicaTransfer(ctx, store.Node.NodeID) {
+			log.VEventf(ctx, 3, "not considering non-ready node n%d for rebalance", store.Node.NodeID)
+			continue
+		}
 		for _, repl := range existingReplicas {
 			if store.StoreID != repl.StoreID {
 				continue
pkg/kv/kvserver/allocator_scorer_test.go (1 change: 1 addition & 0 deletions)
@@ -1213,6 +1213,7 @@ func TestShouldRebalanceDiversity(t *testing.T) {
 			constraint.AnalyzedConstraints{},
 			replicas,
 			existingStoreLocalities,
+			nil, /* isNodeValidForRoutineReplicaTransfer */
 			options)
 		actual := len(targets) > 0
 		if actual != tc.expected {
pkg/kv/kvserver/allocator_test.go (3 changes: 3 additions & 0 deletions)
@@ -2666,10 +2666,12 @@ func TestAllocateCandidatesNumReplicasConstraints(t *testing.T) {
 		analyzed := constraint.AnalyzeConstraints(
 			context.Background(), a.storePool.getStoreDescriptor, existingRepls, zone)
 		candidates := allocateCandidates(
+			context.Background(),
 			sl,
 			analyzed,
 			existingRepls,
 			a.storePool.getLocalitiesByStore(existingRepls),
+			nil, /* isNodeValidForRoutineReplicaTransfer */
 			a.scorerOptions(),
 		)
 		best := candidates.best()
@@ -3690,6 +3692,7 @@ func TestRebalanceCandidatesNumReplicasConstraints(t *testing.T) {
 			analyzed,
 			existingRepls,
 			a.storePool.getLocalitiesByStore(existingRepls),
+			nil, /* isNodeValidForRoutineReplicaTransfer */
 			a.scorerOptions(),
 		)
 		match := true
pkg/kv/kvserver/store_pool.go (19 changes: 19 additions & 0 deletions)
@@ -811,3 +811,22 @@ func (sp *StorePool) getNodeLocalityString(nodeID roachpb.NodeID) string {
 	}
 	return locality.str
 }
+
+func (sp *StorePool) isNodeReadyForRoutineReplicaTransfer(
+	ctx context.Context, targetNodeID roachpb.NodeID,
+) bool {
+	timeUntilStoreDead := TimeUntilStoreDead.Get(&sp.st.SV)
+	now := sp.clock.Now().GoTime()
+
+	liveness := sp.nodeLivenessFn(
+		targetNodeID, now, timeUntilStoreDead)
+	res := liveness == kvserverpb.NodeLivenessStatus_LIVE
+	if res {
+		log.VEventf(ctx, 3,
+			"n%d is a live target, candidate for rebalancing", targetNodeID)
+	} else {
+		log.VEventf(ctx, 3,
+			"not considering non-live node n%d (%s)", targetNodeID, liveness)
+	}
+	return res
+}
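A usage note on the new StorePool predicate: it accepts exactly one liveness status, LIVE. The hypothetical test-style example below is not part of the commit; the diff only confirms kvserverpb.NodeLivenessStatus_LIVE, and the DECOMMISSIONING value shown is assumed to exist in the kvserverpb enum of this era.

package kvserver_test // hypothetical placement, illustration only

import (
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
)

// Only LIVE is accepted; any other status causes the node to be
// skipped as a transfer target.
func Example_livenessGate() {
	isReady := func(s kvserverpb.NodeLivenessStatus) bool {
		return s == kvserverpb.NodeLivenessStatus_LIVE
	}
	fmt.Println(isReady(kvserverpb.NodeLivenessStatus_LIVE))
	fmt.Println(isReady(kvserverpb.NodeLivenessStatus_DECOMMISSIONING))
	// Output:
	// true
	// false
}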
pkg/kv/kvserver/store_rebalancer.go (16 changes: 13 additions & 3 deletions)
@@ -417,7 +417,7 @@ func (sr *StoreRebalancer) chooseLeaseToTransfer(
 		}
 
 		meanQPS := storeList.candidateQueriesPerSecond.mean
-		if shouldNotMoveTo(ctx, storeMap, replWithStats, candidate.StoreID, meanQPS, minQPS, maxQPS) {
+		if sr.shouldNotMoveTo(ctx, storeMap, replWithStats, candidate.StoreID, meanQPS, minQPS, maxQPS) {
 			continue
 		}
 
@@ -553,7 +553,7 @@ func (sr *StoreRebalancer) chooseReplicaToRebalance(
 		}
 
 		meanQPS := storeList.candidateQueriesPerSecond.mean
-		if shouldNotMoveTo(ctx, storeMap, replWithStats, target.StoreID, meanQPS, minQPS, maxQPS) {
+		if sr.shouldNotMoveTo(ctx, storeMap, replWithStats, target.StoreID, meanQPS, minQPS, maxQPS) {
 			break
 		}
 
@@ -634,7 +634,7 @@ func shouldNotMoveAway(
 	return false
 }
 
-func shouldNotMoveTo(
+func (sr *StoreRebalancer) shouldNotMoveTo(
 	ctx context.Context,
 	storeMap map[roachpb.StoreID]*roachpb.StoreDescriptor,
 	replWithStats replicaWithStats,
@@ -664,6 +664,16 @@ func shouldNotMoveTo(
 		return true
 	}
 
+	// If the target store is on a separate node, we will also care
+	// about node liveness.
+	targetNodeID := storeDesc.Node.NodeID
+	if targetNodeID != sr.rq.store.Ident.NodeID {
+		if !sr.rq.store.cfg.StorePool.isNodeReadyForRoutineReplicaTransfer(ctx, targetNodeID) {
+			log.VEventf(ctx, 3,
+				"refusing to transfer replica to n%d/s%d", targetNodeID, storeDesc.StoreID)
+			return true
+		}
+	}
 	return false
 }
 
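One detail of the shouldNotMoveTo change worth spelling out: liveness is only consulted when the candidate store sits on a node other than the one running the store rebalancer. A stripped-down stand-in for that guard follows; the package and function names are illustrative, not from the commit.

package sketch // illustration only

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/roachpb"
)

// shouldSkipTarget mirrors the added guard: same-node targets bypass the
// liveness check, while any other node must currently be ready for a
// routine replica transfer.
func shouldSkipTarget(
	ctx context.Context,
	selfNodeID, targetNodeID roachpb.NodeID,
	isNodeReady func(context.Context, roachpb.NodeID) bool,
) bool {
	if targetNodeID == selfNodeID {
		return false // the rebalancer's own node is never filtered here
	}
	return !isNodeReady(ctx, targetNodeID)
}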
