Skip to content

Commit

Permalink
allocator: add support for store pool liveness overrides
Browse files Browse the repository at this point in the history
While previously the allocator only evaluated using liveness obtained
from gossip, this change introduces a new `OverrideStorePool` struct
which can be used to override the liveness of a node for the purposes of
evaluating allocator actions and targets.  This `OverrideStorePool` is
backed by an existing actual `StorePool`, which retains the majority of
its logic.

Depends on #91461.

Part of #91570.

Release note: None
  • Loading branch information
AlexTalks committed Nov 18, 2022
1 parent b6e661e commit 45dd43f
Show file tree
Hide file tree
Showing 5 changed files with 595 additions and 17 deletions.
6 changes: 5 additions & 1 deletion pkg/kv/kvserver/allocator/storepool/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "storepool",
srcs = [
"override_store_pool.go",
"store_pool.go",
"test_helpers.go",
],
Expand Down Expand Up @@ -35,7 +36,10 @@ go_library(

go_test(
name = "storepool_test",
srcs = ["store_pool_test.go"],
srcs = [
"override_store_pool_test.go",
"store_pool_test.go",
],
args = ["-test.timeout=295s"],
embed = [":storepool"],
deps = [
Expand Down
161 changes: 161 additions & 0 deletions pkg/kv/kvserver/allocator/storepool/override_store_pool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
// Copyright 2022 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package storepool

import (
"context"

"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
)

// OverrideStorePool is an implementation of AllocatorStorePool that allows
// the ability to override a node's liveness status for the purposes of
// evaluation for the allocator, otherwise delegating to an actual StorePool
// for all its logic, including management and lookup of store descriptors.
//
// The OverrideStorePool's methods fall through to the underlying StorePool
// while passing in the configured NodeLivenessFunc to be used in place of the
// underlying StorePool's. This is only the case, however, for StorePool methods
// that are read-only, despite StorePool.DetailsMu being held in write mode in
// some functions, avoiding the possibility of mutating underlying state.
type OverrideStorePool struct {
sp *StorePool

overrideNodeLivenessFn NodeLivenessFunc
}

var _ AllocatorStorePool = &OverrideStorePool{}

func NewOverrideStorePool(storePool *StorePool, nl NodeLivenessFunc) *OverrideStorePool {
return &OverrideStorePool{
sp: storePool,
overrideNodeLivenessFn: nl,
}
}

func (o *OverrideStorePool) String() string {
return o.sp.statusString(o.overrideNodeLivenessFn)
}

// IsStoreReadyForRoutineReplicaTransfer implements the AllocatorStorePool interface.
func (o *OverrideStorePool) IsStoreReadyForRoutineReplicaTransfer(
ctx context.Context, targetStoreID roachpb.StoreID,
) bool {
return o.sp.isStoreReadyForRoutineReplicaTransferInternal(ctx, targetStoreID, o.overrideNodeLivenessFn)
}

// DecommissioningReplicas implements the AllocatorStorePool interface.
func (o *OverrideStorePool) DecommissioningReplicas(
repls []roachpb.ReplicaDescriptor,
) []roachpb.ReplicaDescriptor {
return o.sp.decommissioningReplicasWithLiveness(repls, o.overrideNodeLivenessFn)
}

// GetStoreList implements the AllocatorStorePool interface.
func (o *OverrideStorePool) GetStoreList(
filter StoreFilter,
) (StoreList, int, ThrottledStoreReasons) {
o.sp.DetailsMu.Lock()
defer o.sp.DetailsMu.Unlock()

var storeIDs roachpb.StoreIDSlice
for storeID := range o.sp.DetailsMu.StoreDetails {
storeIDs = append(storeIDs, storeID)
}
return o.sp.getStoreListFromIDsLocked(storeIDs, o.overrideNodeLivenessFn, filter)
}

// GetStoreListFromIDs implements the AllocatorStorePool interface.
func (o *OverrideStorePool) GetStoreListFromIDs(
storeIDs roachpb.StoreIDSlice, filter StoreFilter,
) (StoreList, int, ThrottledStoreReasons) {
o.sp.DetailsMu.Lock()
defer o.sp.DetailsMu.Unlock()
return o.sp.getStoreListFromIDsLocked(storeIDs, o.overrideNodeLivenessFn, filter)
}

// LiveAndDeadReplicas implements the AllocatorStorePool interface.
func (o *OverrideStorePool) LiveAndDeadReplicas(
repls []roachpb.ReplicaDescriptor, includeSuspectAndDrainingStores bool,
) (liveReplicas, deadReplicas []roachpb.ReplicaDescriptor) {
return o.sp.liveAndDeadReplicasWithLiveness(repls, o.overrideNodeLivenessFn, includeSuspectAndDrainingStores)
}

// ClusterNodeCount implements the AllocatorStorePool interface.
func (o *OverrideStorePool) ClusterNodeCount() int {
return o.sp.ClusterNodeCount()
}

// IsDeterministic implements the AllocatorStorePool interface.
func (o *OverrideStorePool) IsDeterministic() bool {
return o.sp.deterministic
}

// Clock implements the AllocatorStorePool interface.
func (o *OverrideStorePool) Clock() *hlc.Clock {
return o.sp.clock
}

// GetLocalitiesByNode implements the AllocatorStorePool interface.
func (o *OverrideStorePool) GetLocalitiesByNode(
replicas []roachpb.ReplicaDescriptor,
) map[roachpb.NodeID]roachpb.Locality {
return o.sp.GetLocalitiesByNode(replicas)
}

// GetLocalitiesByStore implements the AllocatorStorePool interface.
func (o *OverrideStorePool) GetLocalitiesByStore(
replicas []roachpb.ReplicaDescriptor,
) map[roachpb.StoreID]roachpb.Locality {
return o.sp.GetLocalitiesByStore(replicas)
}

// GetStores implements the AllocatorStorePool interface.
func (o *OverrideStorePool) GetStores() map[roachpb.StoreID]roachpb.StoreDescriptor {
return o.sp.GetStores()
}

// GetStoreDescriptor implements the AllocatorStorePool interface.
func (o *OverrideStorePool) GetStoreDescriptor(
storeID roachpb.StoreID,
) (roachpb.StoreDescriptor, bool) {
return o.sp.GetStoreDescriptor(storeID)
}

// GossipNodeIDAddress implements the AllocatorStorePool interface.
func (o *OverrideStorePool) GossipNodeIDAddress(
nodeID roachpb.NodeID,
) (*util.UnresolvedAddr, error) {
return o.sp.GossipNodeIDAddress(nodeID)
}

// UpdateLocalStoreAfterRebalance implements the AllocatorStorePool interface.
// This override method is a no-op, as
// StorePool.UpdateLocalStoreAfterRebalance(..) is not a read-only method and
// mutates the state of the held store details.
func (o *OverrideStorePool) UpdateLocalStoreAfterRebalance(
_ roachpb.StoreID,
_ allocator.RangeUsageInfo,
_ roachpb.ReplicaChangeType,
) {
}

// UpdateLocalStoresAfterLeaseTransfer implements the AllocatorStorePool interface.
// This override method is a no-op, as
// StorePool.UpdateLocalStoresAfterLeaseTransfer(..) is not a read-only method and
// mutates the state of the held store details.
func (o *OverrideStorePool) UpdateLocalStoresAfterLeaseTransfer(
_ roachpb.StoreID, _ roachpb.StoreID, _ float64,
) {
}
Loading

0 comments on commit 45dd43f

Please sign in to comment.