kvserver: skip non-live nodes when considering candidates for transfers
Prior to this patch, 3 components could attempt to transfer a replica
to a node currently being drained:

- the store rebalancer, which rebalances replicas based on disk
  usage and QPS.
- the allocator, to place new replicas.
- the allocator, to rebalance replicas depending on load.

This commit introduces a node liveness check when building the list
of candidates, to determine whether a target node is acceptable. Any
node that is not LIVE according to its liveness status is excluded
from consideration as a transfer target.

Release note (bug fix): In some cases CockroachDB would attempt to
transfer ranges to nodes in the process of being decommissioned or
shut down; this could cause disruption when the node actually
terminated. This bug has been fixed. It had been introduced some time
before v2.0.
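
For illustration only, here is a minimal, self-contained Go sketch of the shape of the change; the types and names below are simplified stand-ins, not the real kvserver API. Each of the three components above now runs every would-be target through a readiness predicate before scoring it, and a non-LIVE (e.g. draining) node is dropped from the candidate list.

package main

import (
	"context"
	"fmt"
)

// nodeID and storeCandidate are simplified stand-ins for the real
// roachpb/kvserver types.
type nodeID int

type storeCandidate struct {
	node nodeID
}

// buildCandidates drops any store whose node fails the readiness
// predicate before any scoring happens.
func buildCandidates(
	ctx context.Context,
	stores []storeCandidate,
	isNodeReady func(context.Context, nodeID) bool,
) []storeCandidate {
	var out []storeCandidate
	for _, s := range stores {
		if !isNodeReady(ctx, s.node) {
			// Not LIVE (e.g. draining or shutting down): skip it.
			continue
		}
		out = append(out, s)
	}
	return out
}

func main() {
	stores := []storeCandidate{{node: 1}, {node: 2}, {node: 3}}
	// Pretend n2 is draining and therefore not a valid target.
	ready := func(_ context.Context, n nodeID) bool { return n != 2 }
	fmt.Println(buildCandidates(context.Background(), stores, ready)) // [{1} {3}]
}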
knz committed Nov 12, 2020
1 parent effe282 commit ffa0ce3
Showing 9 changed files with 248 additions and 31 deletions.
6 changes: 5 additions & 1 deletion pkg/kv/kvserver/allocator.go
@@ -502,8 +502,11 @@ func (a *Allocator) allocateTargetFromList(
analyzedConstraints := constraint.AnalyzeConstraints(
ctx, a.storePool.getStoreDescriptor, candidateReplicas, zone)
candidates := allocateCandidates(
ctx,
sl, analyzedConstraints, candidateReplicas,
a.storePool.getLocalitiesByStore(candidateReplicas), options,
a.storePool.getLocalitiesByStore(candidateReplicas),
a.storePool.isNodeReadyForRoutineReplicaTransfer,
options,
)
log.VEventf(ctx, 3, "allocate candidates: %s", candidates)
if target := candidates.selectGood(a.randGen); target != nil {
@@ -664,6 +667,7 @@ func (a Allocator) RebalanceTarget(
analyzedConstraints,
existingReplicas,
a.storePool.getLocalitiesByStore(existingReplicas),
a.storePool.isNodeReadyForRoutineReplicaTransfer,
options,
)

11 changes: 11 additions & 0 deletions pkg/kv/kvserver/allocator_scorer.go
@@ -407,17 +407,23 @@ func (cl candidateList) removeCandidate(c candidate) candidateList {
// for allocating a new replica ordered from the best to the worst. Only
// stores that meet the criteria are included in the list.
func allocateCandidates(
ctx context.Context,
sl StoreList,
constraints constraint.AnalyzedConstraints,
existing []roachpb.ReplicaDescriptor,
existingStoreLocalities map[roachpb.StoreID]roachpb.Locality,
isNodeValidForRoutineReplicaTransfer func(context.Context, roachpb.NodeID) bool,
options scorerOptions,
) candidateList {
var candidates candidateList
for _, s := range sl.stores {
if nodeHasReplica(s.Node.NodeID, existing) {
continue
}
if !isNodeValidForRoutineReplicaTransfer(ctx, s.Node.NodeID) {
log.VEventf(ctx, 3, "not considering non-ready node n%d for allocate", s.Node.NodeID)
continue
}
constraintsOK, necessary := allocateConstraintsCheck(s, constraints)
if !constraintsOK {
continue
@@ -523,13 +529,18 @@ func rebalanceCandidates(
constraints constraint.AnalyzedConstraints,
existingReplicas []roachpb.ReplicaDescriptor,
existingStoreLocalities map[roachpb.StoreID]roachpb.Locality,
isNodeValidForRoutineReplicaTransfer func(context.Context, roachpb.NodeID) bool,
options scorerOptions,
) []rebalanceOptions {
// 1. Determine whether existing replicas are valid and/or necessary.
existingStores := make(map[roachpb.StoreID]candidate)
var needRebalanceFrom bool
curDiversityScore := rangeDiversityScore(existingStoreLocalities)
for _, store := range allStores.stores {
if !isNodeValidForRoutineReplicaTransfer(ctx, store.Node.NodeID) {
log.VEventf(ctx, 3, "not considering non-ready node n%d for rebalance", store.Node.NodeID)
continue
}
for _, repl := range existingReplicas {
if store.StoreID != repl.StoreID {
continue
1 change: 1 addition & 0 deletions pkg/kv/kvserver/allocator_scorer_test.go
@@ -1213,6 +1213,7 @@ func TestShouldRebalanceDiversity(t *testing.T) {
constraint.AnalyzedConstraints{},
replicas,
existingStoreLocalities,
func(context.Context, roachpb.NodeID) bool { return true }, /* isNodeValidForRoutineReplicaTransfer */
options)
actual := len(targets) > 0
if actual != tc.expected {
125 changes: 125 additions & 0 deletions pkg/kv/kvserver/allocator_test.go
@@ -2454,6 +2454,128 @@
}
)

// TestAllocateCandidatesExcludeNonReadyNodes checks that non-ready
// (e.g. draining) nodes, as per a store pool's
// isNodeReadyForRoutineReplicaTransfer(), are excluded from the list
// of candidates for an allocation.
func TestAllocateCandidatesExcludeNonReadyNodes(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

stores := []*roachpb.StoreDescriptor{
{
StoreID: 1,
Node: roachpb.NodeDescriptor{NodeID: 1},
Capacity: roachpb.StoreCapacity{Capacity: 200, Available: 100, RangeCount: 600},
},
{
StoreID: 2,
Node: roachpb.NodeDescriptor{NodeID: 2},
Capacity: roachpb.StoreCapacity{Capacity: 200, Available: 100, RangeCount: 600},
},
{
StoreID: 3,
Node: roachpb.NodeDescriptor{NodeID: 3},
Capacity: roachpb.StoreCapacity{Capacity: 200, Available: 100, RangeCount: 600},
},
{
StoreID: 4,
Node: roachpb.NodeDescriptor{NodeID: 4},
Capacity: roachpb.StoreCapacity{Capacity: 200, Available: 100, RangeCount: 600},
},
}

stopper, g, _, a, _ := createTestAllocator(10, false /* deterministic */)
defer stopper.Stop(context.Background())
sg := gossiputil.NewStoreGossiper(g)
sg.GossipStores(stores, t)
sl, _, _ := a.storePool.getStoreList(storeFilterThrottled)

testCases := []struct {
existing []roachpb.StoreID
excluded []roachpb.StoreID
expected []roachpb.StoreID
}{
{
[]roachpb.StoreID{1},
[]roachpb.StoreID{2},
[]roachpb.StoreID{3, 4},
},
{
[]roachpb.StoreID{1},
[]roachpb.StoreID{2, 3},
[]roachpb.StoreID{4},
},
{
[]roachpb.StoreID{1},
[]roachpb.StoreID{2, 3, 4},
[]roachpb.StoreID{},
},
}

for testIdx, tc := range testCases {
existingRepls := make([]roachpb.ReplicaDescriptor, len(tc.existing))
for i, storeID := range tc.existing {
existingRepls[i] = roachpb.ReplicaDescriptor{
NodeID: roachpb.NodeID(storeID),
StoreID: storeID,
}
}
// No constraints.
zone := &zonepb.ZoneConfig{NumReplicas: proto.Int32(0), Constraints: nil}
analyzed := constraint.AnalyzeConstraints(
context.Background(), a.storePool.getStoreDescriptor, existingRepls, zone)

a.storePool.isNodeReadyForRoutineReplicaTransfer = func(_ context.Context, n roachpb.NodeID) bool {
for _, s := range tc.excluded {
// NodeIDs match StoreIDs here, so this comparison is valid.
if roachpb.NodeID(s) == n {
return false
}
}
return true
}

t.Run(fmt.Sprintf("%d/allocate", testIdx), func(t *testing.T) {
candidates := allocateCandidates(
context.Background(),
sl,
analyzed,
existingRepls,
a.storePool.getLocalitiesByStore(existingRepls),
a.storePool.isNodeReadyForRoutineReplicaTransfer,
a.scorerOptions(),
)

if !expectedStoreIDsMatch(tc.expected, candidates) {
t.Errorf("expected allocateCandidates(%v) = %v, but got %v",
tc.existing, tc.expected, candidates)
}
})

t.Run(fmt.Sprintf("%d/rebalance", testIdx), func(t *testing.T) {
results := rebalanceCandidates(
context.Background(),
sl,
analyzed,
existingRepls,
a.storePool.getLocalitiesByStore(existingRepls),
a.storePool.isNodeReadyForRoutineReplicaTransfer,
a.scorerOptions(),
)

for i := range results {
if !expectedStoreIDsMatch(tc.existing, results[i].existingCandidates) {
t.Errorf("results[%d]: expected existing candidates %v, got %v", i, tc.existing, results[i].existingCandidates)
}
if !expectedStoreIDsMatch(tc.expected, results[i].candidates) {
t.Errorf("results[%d]: expected candidates %v, got %v", i, tc.expected, results[i].candidates)
}
}
})
}
}

func TestAllocateCandidatesNumReplicasConstraints(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
@@ -2666,10 +2788,12 @@ func TestAllocateCandidatesNumReplicasConstraints(t *testing.T) {
analyzed := constraint.AnalyzeConstraints(
context.Background(), a.storePool.getStoreDescriptor, existingRepls, zone)
candidates := allocateCandidates(
context.Background(),
sl,
analyzed,
existingRepls,
a.storePool.getLocalitiesByStore(existingRepls),
func(context.Context, roachpb.NodeID) bool { return true }, /* isNodeValidForRoutineReplicaTransfer */
a.scorerOptions(),
)
best := candidates.best()
@@ -3690,6 +3814,7 @@ func TestRebalanceCandidatesNumReplicasConstraints(t *testing.T) {
analyzed,
existingRepls,
a.storePool.getLocalitiesByStore(existingRepls),
func(context.Context, roachpb.NodeID) bool { return true }, /* isNodeValidForRoutineReplicaTransfer */
a.scorerOptions(),
)
match := true
12 changes: 7 additions & 5 deletions pkg/kv/kvserver/liveness/livenesspb/liveness.pb.go

(Generated file; diff not rendered by default.)

2 changes: 2 additions & 0 deletions pkg/kv/kvserver/liveness/livenesspb/liveness.proto
@@ -125,6 +125,8 @@ enum NodeLivenessStatus {
// UNAVAILABLE indicates that the node is unavailable - it has not updated its
// liveness record recently enough to be considered live, but has not been
// unavailable long enough to be considered dead.
// UNAVAILABLE is also reported for nodes whose descriptor is marked
// as draining.
NODE_STATUS_UNAVAILABLE = 2 [(gogoproto.enumvalue_customname) = "UNAVAILABLE"];
// LIVE indicates a live node.
NODE_STATUS_LIVE = 3 [(gogoproto.enumvalue_customname) = "LIVE"];
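
A minimal sketch, using a simplified stand-in for livenesspb.NodeLivenessStatus (only the two values relevant here are modeled), of the rule this comment implies: only a LIVE status makes a node an acceptable target, so a draining node, which reports UNAVAILABLE, is rejected.

package main

import "fmt"

// nodeLivenessStatus is a simplified stand-in for
// livenesspb.NodeLivenessStatus.
type nodeLivenessStatus int

const (
	statusUnavailable nodeLivenessStatus = iota // also reported for draining nodes
	statusLive
)

// readyForRoutineTransfer mirrors the rule described above: only a
// node whose liveness status is LIVE is an acceptable transfer target.
func readyForRoutineTransfer(s nodeLivenessStatus) bool {
	return s == statusLive
}

func main() {
	fmt.Println(readyForRoutineTransfer(statusLive))        // true
	fmt.Println(readyForRoutineTransfer(statusUnavailable)) // false: draining or not heartbeating
}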
28 changes: 28 additions & 0 deletions pkg/kv/kvserver/store_pool.go
@@ -270,6 +270,12 @@ type StorePool struct {
syncutil.RWMutex
nodeLocalities map[roachpb.NodeID]localityWithString
}

// isNodeReadyForRoutineReplicaTransfer returns true iff the
// node is live and thus a good candidate to receive a replica.
// This is defined as a closure reference here instead
// of a regular method so it can be overridden in tests.
isNodeReadyForRoutineReplicaTransfer func(context.Context, roachpb.NodeID) bool
}

// NewStorePool creates a StorePool and registers the store updating callback
@@ -293,6 +299,7 @@ func NewStorePool(
startTime: clock.PhysicalTime(),
deterministic: deterministic,
}
sp.isNodeReadyForRoutineReplicaTransfer = sp.isNodeReadyForRoutineReplicaTransferInternal
sp.detailsMu.storeDetails = make(map[roachpb.StoreID]*storeDetail)
sp.localitiesMu.nodeLocalities = make(map[roachpb.NodeID]localityWithString)

@@ -812,3 +819,24 @@ func (sp *StorePool) getNodeLocalityString(nodeID roachpb.NodeID) string {
}
return locality.str
}

func (sp *StorePool) isNodeReadyForRoutineReplicaTransferInternal(
ctx context.Context, targetNodeID roachpb.NodeID,
) bool {
timeUntilStoreDead := TimeUntilStoreDead.Get(&sp.st.SV)
// We use Now().GoTime() instead of PhysicalTime() as per the
// comment on top of IsLive().
now := sp.clock.Now().GoTime()

liveness := sp.nodeLivenessFn(
targetNodeID, now, timeUntilStoreDead)
res := liveness == livenesspb.NodeLivenessStatus_LIVE
if res {
log.VEventf(ctx, 3,
"n%d is a live target, candidate for rebalancing", targetNodeID)
} else {
log.VEventf(ctx, 3,
"not considering non-live node n%d (%s)", targetNodeID, liveness)
}
return res
}
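
The readiness predicate is stored as a struct field holding a closure, rather than being a plain method, precisely so tests can swap it out, as the allocator test above does. Here is a minimal, self-contained sketch of that pattern, with hypothetical names standing in for StorePool.

package main

import (
	"context"
	"fmt"
)

type nodeID int

// pool is a stripped-down stand-in for StorePool: the readiness check
// is a field, so a test can replace it without faking liveness state.
type pool struct {
	isNodeReady func(context.Context, nodeID) bool
}

func newPool() *pool {
	p := &pool{}
	// Production default: the real code would consult node liveness here.
	p.isNodeReady = func(context.Context, nodeID) bool { return true }
	return p
}

func main() {
	p := newPool()
	// A test overrides the field to mark n2 as not ready.
	p.isNodeReady = func(_ context.Context, n nodeID) bool { return n != 2 }
	fmt.Println(p.isNodeReady(context.Background(), 2)) // false
}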
24 changes: 21 additions & 3 deletions pkg/kv/kvserver/store_rebalancer.go
@@ -26,6 +26,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/redact"
"go.etcd.io/etcd/raft"
)

@@ -417,7 +418,7 @@ func (sr *StoreRebalancer) chooseLeaseToTransfer(
}

meanQPS := storeList.candidateQueriesPerSecond.mean
if shouldNotMoveTo(ctx, storeMap, replWithStats, candidate.StoreID, meanQPS, minQPS, maxQPS) {
if sr.shouldNotMoveTo(ctx, storeMap, replWithStats, candidate.StoreID, meanQPS, minQPS, maxQPS) {
continue
}

@@ -521,6 +522,13 @@ func (sr *StoreRebalancer) chooseReplicaToRebalance(
// could cause mass evictions if the storePool gets out of sync.
storeDesc, ok := storeMap[currentReplicas[i].StoreID]
if !ok || storeDesc.Capacity.QueriesPerSecond < maxQPS {
if log.V(3) {
var reason redact.RedactableString
if ok {
reason = redact.Sprintf(" (qps %.2f vs max %.2f)", storeDesc.Capacity.QueriesPerSecond, maxQPS)
}
log.VEventf(ctx, 3, "keeping r%d/%d on s%d%s", desc.RangeID, currentReplicas[i].ReplicaID, currentReplicas[i].StoreID, reason)
}
targets = append(targets, roachpb.ReplicationTarget{
NodeID: currentReplicas[i].NodeID,
StoreID: currentReplicas[i].StoreID,
@@ -553,7 +561,7 @@ func (sr *StoreRebalancer) chooseReplicaToRebalance(
}

meanQPS := storeList.candidateQueriesPerSecond.mean
if shouldNotMoveTo(ctx, storeMap, replWithStats, target.StoreID, meanQPS, minQPS, maxQPS) {
if sr.shouldNotMoveTo(ctx, storeMap, replWithStats, target.StoreID, meanQPS, minQPS, maxQPS) {
break
}

@@ -634,7 +642,7 @@ func shouldNotMoveAway(
return false
}

func shouldNotMoveTo(
func (sr *StoreRebalancer) shouldNotMoveTo(
ctx context.Context,
storeMap map[roachpb.StoreID]*roachpb.StoreDescriptor,
replWithStats replicaWithStats,
@@ -664,6 +672,16 @@ func shouldNotMoveTo(
return true
}

// If the target store is on a separate node, we also need to check
// node liveness.
targetNodeID := storeDesc.Node.NodeID
if targetNodeID != sr.rq.store.Ident.NodeID {
if !sr.rq.store.cfg.StorePool.isNodeReadyForRoutineReplicaTransfer(ctx, targetNodeID) {
log.VEventf(ctx, 3,
"refusing to transfer replica to n%d/s%d", targetNodeID, storeDesc.StoreID)
return true
}
}
return false
}

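
A minimal sketch of the gate added to shouldNotMoveTo above, with hypothetical names rather than the real kvserver types: the readiness predicate is only consulted when the target store lives on a different node than the rebalancing store itself.

package main

import (
	"context"
	"fmt"
)

type nodeID int

// shouldNotMoveTo is a simplified stand-in for the rebalancer check:
// a transfer to a store on another, non-ready node is refused.
func shouldNotMoveTo(
	ctx context.Context,
	targetNode, localNode nodeID,
	isNodeReady func(context.Context, nodeID) bool,
) bool {
	if targetNode != localNode && !isNodeReady(ctx, targetNode) {
		return true // refuse the transfer
	}
	return false
}

func main() {
	ready := func(_ context.Context, n nodeID) bool { return n != 3 } // pretend n3 is draining
	fmt.Println(shouldNotMoveTo(context.Background(), 3, 1, ready)) // true: refused
	fmt.Println(shouldNotMoveTo(context.Background(), 2, 1, ready)) // false: allowed
}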