Merge #94983

94983: allocator: correct node count calculations with overrides r=AlexTalks a=AlexTalks

While the overrides set in the `OverrideStorePool` were already applied
correctly by its node liveness function, its node count function did not
previously incorporate them. This change rectifies that: the overrides
specified when the override store pool is created are now used to calculate
the number of non-decommissioning, non-decommissioned nodes (alive or dead)
as viewed by the override store pool. This yields a correct count of nodes
for computing the number of needed voters, which in turn lets the allocator
determine the correct allocation action for a range.
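
Conceptually, a node counts toward the total when it is neither decommissioning nor decommissioned; an override can additionally exclude an otherwise-active node, while a DEAD override leaves the count unchanged. A minimal standalone Go sketch of that rule (simplified types and names, not the kvserver implementation added in liveness.go below):

```go
package main

import "fmt"

// status is a simplified stand-in for livenesspb.NodeLivenessStatus.
type status int

const (
	live status = iota
	dead
	decommissioning
	decommissioned
)

// nodeCountWithOverrides counts nodes that are neither decommissioning nor
// decommissioned. An override may exclude an otherwise-counted node; dead
// nodes (actual or overridden) still count.
func nodeCountWithOverrides(actual, overrides map[int]status) int {
	count := 0
	for nodeID, s := range actual {
		if s == decommissioning || s == decommissioned {
			continue
		}
		if o, ok := overrides[nodeID]; ok && (o == decommissioning || o == decommissioned) {
			continue
		}
		count++
	}
	return count
}

func main() {
	actual := map[int]status{1: live, 2: live, 3: live, 4: dead, 5: live}
	overrides := map[int]status{3: decommissioning, 4: dead}
	// n3 is excluded by its override; n4 is dead but still counted.
	fmt.Println(nodeCountWithOverrides(actual, overrides)) // prints 4
}
```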

Depends on #93758.

Epic: [CRDB-20924](https://cockroachlabs.atlassian.net/browse/CRDB-20924)

Release note: None

Co-authored-by: Alex Sarkesian <[email protected]>
craig[bot] and AlexTalks committed Jan 25, 2023
2 parents 0f5863a + c2e2d13 commit 26be5f2
Showing 8 changed files with 325 additions and 51 deletions.
106 changes: 62 additions & 44 deletions pkg/kv/kvserver/allocator/allocatorimpl/allocator_test.go
@@ -834,11 +834,23 @@ func TestAllocatorExistingReplica(t *testing.T) {
func TestAllocatorReplaceDecommissioningReplica(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
storeDescriptors := sameDCStores

getNumNodes := func() int {
numNodes := 0
for _, storeDesc := range storeDescriptors {
if storeDesc.Node.NodeID == roachpb.NodeID(3) {
continue
}
numNodes++
}
return numNodes
}

ctx := context.Background()
stopper, g, sp, a, _ := CreateTestAllocator(ctx, 1, false /* deterministic */)
stopper, g, sp, a, _ := CreateTestAllocator(ctx, getNumNodes(), false /* deterministic */)
defer stopper.Stop(ctx)
gossiputil.NewStoreGossiper(g).GossipStores(sameDCStores, t)
gossiputil.NewStoreGossiper(g).GossipStores(storeDescriptors, t)

// Override liveness of n3 to decommissioning so the only available target is s4.
oSp := storepool.NewOverrideStorePool(sp, func(nid roachpb.NodeID, now time.Time, timeUntilStoreDead time.Duration) livenesspb.NodeLivenessStatus {
@@ -847,7 +859,7 @@ func TestAllocatorReplaceDecommissioningReplica(t *testing.T) {
}

return sp.NodeLivenessFn(nid, now, timeUntilStoreDead)
})
}, getNumNodes)

result, _, err := a.AllocateVoter(
ctx,
@@ -6598,27 +6610,32 @@ func TestAllocatorComputeActionWithStorePoolRemoveDead(t *testing.T) {
},
}

ctx := context.Background()
stopper, _, sp, a, _ := CreateTestAllocator(ctx, 10, false /* deterministic */)
defer stopper.Stop(ctx)

for i, tcase := range testCases {
// Mark all dead nodes as alive, so we can override later.
all := append(tcase.live, tcase.dead...)
mockStorePool(sp, all, nil, nil, nil, nil, nil)
oSp := storepool.NewOverrideStorePool(sp, func(nid roachpb.NodeID, now time.Time, timeUntilStoreDead time.Duration) livenesspb.NodeLivenessStatus {
for _, deadStoreID := range tcase.dead {
if nid == roachpb.NodeID(deadStoreID) {
return livenesspb.NodeLivenessStatus_DEAD
}
t.Run(fmt.Sprintf("%d", i), func(t *testing.T) {
ctx := context.Background()
getNumNodes := func() int {
return len(tcase.live) + len(tcase.dead)
}
stopper, _, sp, a, _ := CreateTestAllocator(ctx, getNumNodes(), false /* deterministic */)
defer stopper.Stop(ctx)

return sp.NodeLivenessFn(nid, now, timeUntilStoreDead)
// Mark all dead nodes as alive, so we can override later.
all := append(tcase.live, tcase.dead...)
mockStorePool(sp, all, nil, nil, nil, nil, nil)
oSp := storepool.NewOverrideStorePool(sp, func(nid roachpb.NodeID, now time.Time, timeUntilStoreDead time.Duration) livenesspb.NodeLivenessStatus {
for _, deadStoreID := range tcase.dead {
if nid == roachpb.NodeID(deadStoreID) {
return livenesspb.NodeLivenessStatus_DEAD
}
}

return sp.NodeLivenessFn(nid, now, timeUntilStoreDead)
}, getNumNodes)
action, _ := a.ComputeAction(ctx, oSp, conf, &tcase.desc)
if tcase.expectedAction != action {
t.Errorf("Test case %d expected action %d, got action %d", i, tcase.expectedAction, action)
}
})
action, _ := a.ComputeAction(ctx, oSp, conf, &tcase.desc)
if tcase.expectedAction != action {
t.Errorf("Test case %d expected action %d, got action %d", i, tcase.expectedAction, action)
}
}
}

@@ -7241,34 +7258,35 @@ func TestAllocatorComputeActionWithStorePoolDecommission(t *testing.T) {
},
}

ctx := context.Background()
stopper, _, sp, a, _ := CreateTestAllocator(ctx, 10, false /* deterministic */)
defer stopper.Stop(ctx)

for i, tcase := range testCases {
// Mark all decommissioning and decommissioned nodes as alive, so we can override later.
all := append(tcase.live, tcase.decommissioning...)
all = append(all, tcase.decommissioned...)
overrideLivenessMap := make(map[roachpb.NodeID]livenesspb.NodeLivenessStatus)
for _, sID := range tcase.decommissioned {
overrideLivenessMap[roachpb.NodeID(sID)] = livenesspb.NodeLivenessStatus_DECOMMISSIONED
}
for _, sID := range tcase.decommissioning {
overrideLivenessMap[roachpb.NodeID(sID)] = livenesspb.NodeLivenessStatus_DECOMMISSIONING
}
mockStorePool(sp, all, nil, tcase.dead, nil, nil, nil)
oSp := storepool.NewOverrideStorePool(sp, func(nid roachpb.NodeID, now time.Time, timeUntilStoreDead time.Duration) livenesspb.NodeLivenessStatus {
if liveness, ok := overrideLivenessMap[nid]; ok {
return liveness
t.Run(fmt.Sprintf("%d", i), func(t *testing.T) {
ctx := context.Background()
getNumNodes := func() int {
return len(tcase.live) + len(tcase.dead)
}

return sp.NodeLivenessFn(nid, now, timeUntilStoreDead)
stopper, _, sp, a, _ := CreateTestAllocator(ctx, getNumNodes(), false /* deterministic */)
defer stopper.Stop(ctx)

// Mark all decommissioning and decommissioned nodes as alive, so we can override later.
all := append(tcase.live, tcase.decommissioning...)
all = append(all, tcase.decommissioned...)
overrideLivenessMap := make(map[roachpb.NodeID]livenesspb.NodeLivenessStatus)
for _, sID := range tcase.decommissioned {
overrideLivenessMap[roachpb.NodeID(sID)] = livenesspb.NodeLivenessStatus_DECOMMISSIONED
}
for _, sID := range tcase.decommissioning {
overrideLivenessMap[roachpb.NodeID(sID)] = livenesspb.NodeLivenessStatus_DECOMMISSIONING
}
mockStorePool(sp, all, nil, tcase.dead, nil, nil, nil)
oSp := storepool.NewOverrideStorePool(sp,
storepool.OverrideNodeLivenessFunc(overrideLivenessMap, sp.NodeLivenessFn), getNumNodes,
)
action, _ := a.ComputeAction(ctx, oSp, tcase.conf, &tcase.desc)
if tcase.expectedAction != action {
t.Errorf("Test case %d expected action %s, got action %s", i, tcase.expectedAction, action)
}
})
action, _ := a.ComputeAction(ctx, oSp, tcase.conf, &tcase.desc)
if tcase.expectedAction != action {
t.Errorf("Test case %d expected action %s, got action %s", i, tcase.expectedAction, action)
continue
}
}
}

20 changes: 18 additions & 2 deletions pkg/kv/kvserver/allocator/storepool/override_store_pool.go
@@ -15,6 +15,7 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util"
@@ -39,6 +40,7 @@ type OverrideStorePool struct {
sp *StorePool

overrideNodeLivenessFn NodeLivenessFunc
overrideNodeCountFn NodeCountFunc
}

var _ AllocatorStorePool = &OverrideStorePool{}
@@ -58,13 +60,27 @@ func OverrideNodeLivenessFunc(
}
}

// OverrideNodeCountFunc constructs a NodeCountFunc based on a set of predefined
// overrides. If any nodeID does not have an override, the real liveness is used
// for the count for the number of nodes not decommissioning or decommissioned.
func OverrideNodeCountFunc(
overrides map[roachpb.NodeID]livenesspb.NodeLivenessStatus, nodeLiveness *liveness.NodeLiveness,
) NodeCountFunc {
return func() int {
return nodeLiveness.GetNodeCountWithOverrides(overrides)
}
}

// NewOverrideStorePool constructs an OverrideStorePool that can use its own
// view of node liveness while falling through to an underlying store pool for
// the state of peer stores.
func NewOverrideStorePool(storePool *StorePool, nl NodeLivenessFunc) *OverrideStorePool {
func NewOverrideStorePool(
storePool *StorePool, nl NodeLivenessFunc, nc NodeCountFunc,
) *OverrideStorePool {
return &OverrideStorePool{
sp: storePool,
overrideNodeLivenessFn: nl,
overrideNodeCountFn: nc,
}
}

@@ -133,7 +149,7 @@ func (o *OverrideStorePool) LiveAndDeadReplicas(

// ClusterNodeCount implements the AllocatorStorePool interface.
func (o *OverrideStorePool) ClusterNodeCount() int {
return o.sp.ClusterNodeCount()
return o.overrideNodeCountFn()
}

// IsDeterministic implements the AllocatorStorePool interface.
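
For reference, the following wiring sketch (not part of this commit; the package and function names are hypothetical) shows the intended pairing now that `NewOverrideStorePool` takes both a `NodeLivenessFunc` and a `NodeCountFunc`: the same override map drives the liveness view and the node count.

```go
// Package storepooldemo is a hypothetical package used only for this sketch.
package storepooldemo

import (
	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/storepool"
	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness"
	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
)

// newOverrideStorePoolWithOverrides constructs an OverrideStorePool whose
// node liveness and node count both reflect the given overrides, falling
// back to the underlying store pool and NodeLiveness otherwise.
func newOverrideStorePoolWithOverrides(
	sp *storepool.StorePool,
	nl *liveness.NodeLiveness,
	overrides map[roachpb.NodeID]livenesspb.NodeLivenessStatus,
) *storepool.OverrideStorePool {
	return storepool.NewOverrideStorePool(
		sp,
		storepool.OverrideNodeLivenessFunc(overrides, sp.NodeLivenessFn),
		storepool.OverrideNodeCountFunc(overrides, nl),
	)
}
```
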
39 changes: 36 additions & 3 deletions pkg/kv/kvserver/allocator/storepool/override_store_pool_test.go
@@ -32,9 +32,11 @@ func TestOverrideStorePoolStatusString(t *testing.T) {
defer log.Scope(t).Close(t)
ctx := context.Background()
st := cluster.MakeTestingClusterSettings()
const nodeCount = 5

stopper, g, _, testStorePool, mnl := CreateTestStorePool(ctx, st,
TestTimeUntilStoreDead, false, /* deterministic */
func() int { return 10 }, /* nodeCount */
func() int { return nodeCount }, /* nodeCount */
livenesspb.NodeLivenessStatus_DEAD)
defer stopper.Stop(ctx)
sg := gossiputil.NewStoreGossiper(g)
@@ -46,6 +48,15 @@
}

return mnl.NodeLivenessFunc(nid, now, timeUntilStoreDead)
}, func() int {
excluded := 0
for _, overriddenLiveness := range livenessOverrides {
if overriddenLiveness == livenesspb.NodeLivenessStatus_DECOMMISSIONING ||
overriddenLiveness == livenesspb.NodeLivenessStatus_DECOMMISSIONED {
excluded++
}
}
return nodeCount - excluded
})

stores := []*roachpb.StoreDescriptor{
@@ -102,9 +113,11 @@ func TestOverrideStorePoolDecommissioningReplicas(t *testing.T) {
defer log.Scope(t).Close(t)
ctx := context.Background()
st := cluster.MakeTestingClusterSettings()
const nodeCount = 5

stopper, g, _, testStorePool, mnl := CreateTestStorePool(ctx, st,
TestTimeUntilStoreDead, false, /* deterministic */
func() int { return 10 }, /* nodeCount */
func() int { return nodeCount }, /* nodeCount */
livenesspb.NodeLivenessStatus_DEAD)
defer stopper.Stop(ctx)
sg := gossiputil.NewStoreGossiper(g)
@@ -116,6 +129,15 @@
}

return mnl.NodeLivenessFunc(nid, now, timeUntilStoreDead)
}, func() int {
excluded := 0
for _, overriddenLiveness := range livenessOverrides {
if overriddenLiveness == livenesspb.NodeLivenessStatus_DECOMMISSIONING ||
overriddenLiveness == livenesspb.NodeLivenessStatus_DECOMMISSIONED {
excluded++
}
}
return nodeCount - excluded
})

stores := []*roachpb.StoreDescriptor{
@@ -207,10 +229,12 @@ func TestOverrideStorePoolGetStoreList(t *testing.T) {
defer log.Scope(t).Close(t)
ctx := context.Background()
st := cluster.MakeTestingClusterSettings()
const nodeCount = 8

// We're going to manually mark stores dead in this test.
stopper, g, _, testStorePool, mnl := CreateTestStorePool(ctx, st,
TestTimeUntilStoreDead, false, /* deterministic */
func() int { return 10 }, /* nodeCount */
func() int { return nodeCount }, /* nodeCount */
livenesspb.NodeLivenessStatus_DEAD)
defer stopper.Stop(ctx)
sg := gossiputil.NewStoreGossiper(g)
@@ -222,6 +246,15 @@
}

return mnl.NodeLivenessFunc(nid, now, timeUntilStoreDead)
}, func() int {
excluded := 0
for _, overriddenLiveness := range livenessOverrides {
if overriddenLiveness == livenesspb.NodeLivenessStatus_DECOMMISSIONING ||
overriddenLiveness == livenesspb.NodeLivenessStatus_DECOMMISSIONED {
excluded++
}
}
return nodeCount - excluded
})

constraints := []roachpb.ConstraintsConjunction{
58 changes: 58 additions & 0 deletions pkg/kv/kvserver/liveness/client_test.go
@@ -326,3 +326,61 @@ func TestNodeLivenessDecommissionedCallback(t *testing.T) {

}
}

// TestNodeLivenessNodeCount tests GetNodeCount() and GetNodeCountWithOverrides,
// which are critical for computing the number of needed voters for a range.
func TestNodeLivenessNodeCount(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

numNodes := 5
ctx := context.Background()
tc := testcluster.StartTestCluster(t, numNodes, base.TestClusterArgs{})
defer tc.Stopper().Stop(ctx)

// At this point StartTestCluster has waited for all nodes to become live.
nl1 := tc.Servers[0].NodeLiveness().(*liveness.NodeLiveness)
require.Equal(t, numNodes, nl1.GetNodeCount())

// Mark n5 as decommissioning, which should reduce node count.
chg, err := nl1.SetMembershipStatus(ctx, 5, livenesspb.MembershipStatus_DECOMMISSIONING)
require.NoError(t, err)
require.True(t, chg)
testutils.SucceedsSoon(t, func() error {
l, ok := nl1.GetLiveness(5)
if !ok || !l.Membership.Decommissioning() {
return errors.Errorf("expected n5 to be decommissioning")
}
numNodes -= 1
return nil
})
require.Equal(t, numNodes, nl1.GetNodeCount())

// Mark n5 as decommissioning -> decommissioned, which should not change node count.
chg, err = nl1.SetMembershipStatus(ctx, 5, livenesspb.MembershipStatus_DECOMMISSIONED)
require.NoError(t, err)
require.True(t, chg)
testutils.SucceedsSoon(t, func() error {
l, ok := nl1.GetLiveness(5)
if !ok || !l.Membership.Decommissioned() {
return errors.Errorf("expected n5 to be decommissioned")
}
return nil
})
require.Equal(t, numNodes, nl1.GetNodeCount())

// Override n5 as decommissioning, which should not change node count.
overrides := map[roachpb.NodeID]livenesspb.NodeLivenessStatus{
5: livenesspb.NodeLivenessStatus_DECOMMISSIONING,
}
require.Equal(t, numNodes, nl1.GetNodeCountWithOverrides(nil))
require.Equal(t, numNodes, nl1.GetNodeCountWithOverrides(overrides))

// Override n4 as dead, which should not change node count.
overrides[4] = livenesspb.NodeLivenessStatus_DEAD
require.Equal(t, numNodes, nl1.GetNodeCountWithOverrides(overrides))

// Override n3 as decommissioning, which should reduce node count.
overrides[3] = livenesspb.NodeLivenessStatus_DECOMMISSIONING
require.Equal(t, numNodes-1, nl1.GetNodeCountWithOverrides(overrides))
}
21 changes: 21 additions & 0 deletions pkg/kv/kvserver/liveness/liveness.go
@@ -1528,6 +1528,27 @@ func (nl *NodeLiveness) GetNodeCount() int {
return count
}

// GetNodeCountWithOverrides returns a count of the number of nodes in the cluster,
// including dead nodes, but excluding decommissioning or decommissioned nodes,
// using the provided set of liveness overrides.
func (nl *NodeLiveness) GetNodeCountWithOverrides(
overrides map[roachpb.NodeID]livenesspb.NodeLivenessStatus,
) int {
nl.mu.RLock()
defer nl.mu.RUnlock()
var count int
for _, l := range nl.mu.nodes {
if l.Membership.Active() {
if overrideStatus, ok := overrides[l.NodeID]; !ok ||
(overrideStatus != livenesspb.NodeLivenessStatus_DECOMMISSIONING &&
overrideStatus != livenesspb.NodeLivenessStatus_DECOMMISSIONED) {
count++
}
}
}
return count
}

// TestingSetDrainingInternal is a testing helper to set the internal draining
// state for a NodeLiveness instance.
func (nl *NodeLiveness) TestingSetDrainingInternal(