storage: reimplement StorePool.AvailableNodeCount
Reimplement `StorePool.AvailableNodeCount` in terms of
`NodeLiveness.GetNodeCount`. The latter returns a count of the user's
intended number of nodes in the cluster: the number of nodes that have
been added minus those that are decommissioning (or decommissioned).

This fixes misbehavior of the dynamic replication factor heuristic. The
previous `StorePool.AvailableNodeCount` implementation would fluctuate
depending on the number of node descriptors that had been received from
gossip and the number of dead nodes. So if you had a 5-node cluster and
2 nodes were down for more than 5 minutes (and thus marked as dead), the
cluster would suddenly start down-replicating ranges. Similarly, if you
had a 5-node cluster, took down all 5 nodes, and only restarted 3, the
cluster would start down-replicating ranges. The new behavior is to
consider a node part of the cluster until it is decommissioned. This
better matches user expectations.

Fixes cockroachdb#34122

Release note (bug fix): Avoid down-replicating widely replicated ranges
when nodes in the cluster are temporarily down.
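
For context, a minimal sketch of the kind of clamping the dynamic replication
factor heuristic performs; the function name and exact rounding rule here are
illustrative assumptions, not the allocator's actual code:

// targetReplicas is a hypothetical stand-in for the dynamic replication
// factor computation: clamp the zone's configured replica count to the
// number of nodes in the cluster, keeping the result odd.
func targetReplicas(zoneReplicas, nodeCount int) int {
	n := zoneReplicas
	if nodeCount < n {
		n = nodeCount
	}
	if n%2 == 0 && n > 1 {
		n-- // e.g. 4 replicas tolerate no more failures than 3
	}
	return n
}

With the old gossip-based count, a 5-node cluster with 2 nodes marked dead
reports a node count of 3, so a zone configured for 5 replicas is shrunk to 3
and the cluster down-replicates; with the liveness-based GetNodeCount the
count stays at 5 until nodes are actually decommissioned.
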
petermattis committed Jan 23, 2019
1 parent 14f0633 commit 0ddac85
Showing 10 changed files with 157 additions and 95 deletions.
1 change: 1 addition & 0 deletions pkg/server/node_test.go
@@ -118,6 +118,7 @@ func createTestNode(
st,
cfg.Gossip,
cfg.Clock,
cfg.NodeLiveness.GetNodeCount,
storage.MakeStorePoolNodeLivenessFunc(cfg.NodeLiveness),
/* deterministic */ false,
)
1 change: 1 addition & 0 deletions pkg/server/server.go
@@ -333,6 +333,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
s.st,
s.gossip,
s.clock,
s.nodeLiveness.GetNodeCount,
storage.MakeStorePoolNodeLivenessFunc(s.nodeLiveness),
/* deterministic */ false,
)
140 changes: 85 additions & 55 deletions pkg/storage/allocator_test.go

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions pkg/storage/client_test.go
@@ -688,6 +688,7 @@ func (m *multiTestContext) populateStorePool(idx int, nodeLiveness *storage.Node
m.storeConfig.Settings,
m.gossips[idx],
m.clocks[idx],
nodeLiveness.GetNodeCount,
storage.MakeStorePoolNodeLivenessFunc(nodeLiveness),
/* deterministic */ false,
)
4 changes: 4 additions & 0 deletions pkg/storage/helpers_test.go
@@ -273,6 +273,10 @@ func NewTestStorePool(cfg StoreConfig) *StorePool {
cfg.Settings,
cfg.Gossip,
cfg.Clock,
// NodeCountFunc
func() int {
return 1
},
func(roachpb.NodeID, time.Time, time.Duration) NodeLivenessStatus {
return NodeLivenessStatus_LIVE
},
14 changes: 14 additions & 0 deletions pkg/storage/node_liveness.go
@@ -1015,3 +1015,17 @@ func (nl *NodeLiveness) AsLiveClock() closedts.LiveClockFn {
return now, ctpb.Epoch(liveness.Epoch), nil
}
}

// GetNodeCount returns a count of the number of nodes in the cluster,
// including dead nodes, but excluding decommissioning or decommissioned nodes.
func (nl *NodeLiveness) GetNodeCount() int {
nl.mu.Lock()
defer nl.mu.Unlock()
var count int
for _, l := range nl.mu.nodes {
if !l.Decommissioning {
count++
}
}
return count
}
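
As a self-contained illustration of the counting rule above (not part of the
commit), the snippet below applies the same filter to a made-up set of
liveness records: a dead node still counts toward the total, while a
decommissioning node does not.

package main

import "fmt"

// fakeLiveness is an illustrative stand-in for a node liveness record; it
// models only the two properties the counting rule cares about.
type fakeLiveness struct {
	Dead            bool
	Decommissioning bool
}

func main() {
	nodes := map[int]fakeLiveness{
		1: {},                      // live
		2: {},                      // live
		3: {Dead: true},            // dead, but still part of the cluster
		4: {Decommissioning: true}, // being removed; excluded from the count
	}
	count := 0
	for _, l := range nodes {
		if !l.Decommissioning {
			count++
		}
	}
	fmt.Println(count) // 3
}
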
38 changes: 12 additions & 26 deletions pkg/storage/store_pool.go
@@ -79,6 +79,11 @@ var TimeUntilStoreDead = settings.RegisterValidatedDurationSetting(
},
)

// The NodeCountFunc returns a count of the total number of nodes the user
// intends for there to be in the cluster. The count includes dead nodes, but
// not decommissioned nodes.
type NodeCountFunc func() int

// A NodeLivenessFunc accepts a node ID, current time and threshold before
// a node is considered dead and returns whether or not the node is live.
type NodeLivenessFunc func(roachpb.NodeID, time.Time, time.Duration) NodeLivenessStatus
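
A note on the design, with a hedged sketch: because the store pool depends on
a plain function value rather than on *NodeLiveness, production code can pass
the method value nodeLiveness.GetNodeCount (as in the server.go hunk above),
while tests can substitute a constant (as in the helpers_test.go hunk). The
snippet redeclares the type only so it compiles on its own.

package main

import "fmt"

// NodeCountFunc mirrors the type added in this commit; it is redeclared here
// so the sketch is self-contained.
type NodeCountFunc func() int

func main() {
	// Test wiring: pretend the operator intends a 3-node cluster.
	var stub NodeCountFunc = func() int { return 3 }
	fmt.Println(stub()) // 3

	// Production wiring would instead pass a method value such as
	// nodeLiveness.GetNodeCount, keeping StorePool free of a direct
	// dependency on NodeLiveness.
}
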
@@ -195,6 +200,7 @@ type StorePool struct {

clock *hlc.Clock
gossip *gossip.Gossip
nodeCountFn NodeCountFunc
nodeLivenessFn NodeLivenessFunc
startTime time.Time
deterministic bool
@@ -219,6 +225,7 @@ func NewStorePool(
st *cluster.Settings,
g *gossip.Gossip,
clock *hlc.Clock,
nodeCountFn NodeCountFunc,
nodeLivenessFn NodeLivenessFunc,
deterministic bool,
) *StorePool {
@@ -227,6 +234,7 @@
st: st,
clock: clock,
gossip: g,
nodeCountFn: nodeCountFn,
nodeLivenessFn: nodeLivenessFn,
startTime: clock.PhysicalTime(),
deterministic: deterministic,
@@ -435,33 +443,11 @@ func (sp *StorePool) decommissioningReplicas(
return
}

// AvailableNodeCount returns the number of nodes which are considered
// available for use as allocation targets. This includes only nodes which are
// not dead or decommissioning. It notably does include nodes that are not
// considered live by node liveness but are also not yet considered dead.
// AvailableNodeCount returns the number of nodes that are possible allocation
// targets. This includes dead nodes, but not decommissioning or decommissioned
// nodes.
func (sp *StorePool) AvailableNodeCount() int {
sp.detailsMu.RLock()
defer sp.detailsMu.RUnlock()

now := sp.clock.PhysicalTime()
availableNodes := map[roachpb.NodeID]struct{}{}
timeUntilStoreDead := TimeUntilStoreDead.Get(&sp.st.SV)

for _, detail := range sp.detailsMu.storeDetails {
if detail.desc == nil {
continue
}
switch s := detail.status(now, timeUntilStoreDead, 0, sp.nodeLivenessFn); s {
case storeStatusThrottled, storeStatusAvailable, storeStatusUnknown, storeStatusReplicaCorrupted:
availableNodes[detail.desc.Node.NodeID] = struct{}{}
case storeStatusDead, storeStatusDecommissioning:
// Do nothing; this node can't/shouldn't have any replicas on it.
default:
panic(fmt.Sprintf("unknown store status: %d", s))
}
}

return len(availableNodes)
return sp.nodeCountFn()
}

// liveAndDeadReplicas divides the provided repls slice into two slices: the
46 changes: 35 additions & 11 deletions pkg/storage/store_pool_test.go
@@ -92,7 +92,10 @@ func (m *mockNodeLiveness) nodeLivenessFunc(
// createTestStorePool creates a stopper, gossip and storePool for use in
// tests. Stopper must be stopped by the caller.
func createTestStorePool(
timeUntilStoreDeadValue time.Duration, deterministic bool, defaultNodeStatus NodeLivenessStatus,
timeUntilStoreDeadValue time.Duration,
deterministic bool,
nodeCount NodeCountFunc,
defaultNodeStatus NodeLivenessStatus,
) (*stop.Stopper, *gossip.Gossip, *hlc.ManualClock, *StorePool, *mockNodeLiveness) {
stopper := stop.NewStopper()
mc := hlc.NewManualClock(123)
@@ -111,6 +114,7 @@ func createTestStorePool(
st,
g,
clock,
nodeCount,
mnl.nodeLivenessFunc,
deterministic,
)
@@ -122,7 +126,9 @@
func TestStorePoolGossipUpdate(t *testing.T) {
defer leaktest.AfterTest(t)()
stopper, g, _, sp, _ := createTestStorePool(
TestTimeUntilStoreDead, false /* deterministic */, NodeLivenessStatus_DEAD)
TestTimeUntilStoreDead, false, /* deterministic */
func() int { return 0 }, /* NodeCount */
NodeLivenessStatus_DEAD)
defer stopper.Stop(context.TODO())
sg := gossiputil.NewStoreGossiper(g)

@@ -179,7 +185,9 @@ func TestStorePoolGetStoreList(t *testing.T) {
defer leaktest.AfterTest(t)()
// We're going to manually mark stores dead in this test.
stopper, g, _, sp, mnl := createTestStorePool(
TestTimeUntilStoreDead, false /* deterministic */, NodeLivenessStatus_DEAD)
TestTimeUntilStoreDead, false, /* deterministic */
func() int { return 10 }, /* nodeCount */
NodeLivenessStatus_DEAD)
defer stopper.Stop(context.TODO())
sg := gossiputil.NewStoreGossiper(g)
constraints := []config.Constraints{
@@ -432,7 +440,9 @@ func TestStorePoolUpdateLocalStore(t *testing.T) {
clock := hlc.NewClock(manual.UnixNano, time.Nanosecond)
// We're going to manually mark stores dead in this test.
stopper, g, _, sp, _ := createTestStorePool(
TestTimeUntilStoreDead, false /* deterministic */, NodeLivenessStatus_DEAD)
TestTimeUntilStoreDead, false, /* deterministic */
func() int { return 10 }, /* nodeCount */
NodeLivenessStatus_DEAD)
defer stopper.Stop(context.TODO())
sg := gossiputil.NewStoreGossiper(g)
stores := []*roachpb.StoreDescriptor{
@@ -554,7 +564,9 @@ func TestStorePoolUpdateLocalStoreBeforeGossip(t *testing.T) {
manual := hlc.NewManualClock(123)
clock := hlc.NewClock(manual.UnixNano, time.Nanosecond)
stopper, _, _, sp, _ := createTestStorePool(
TestTimeUntilStoreDead, false /* deterministic */, NodeLivenessStatus_DEAD)
TestTimeUntilStoreDead, false, /* deterministic */
func() int { return 10 }, /* nodeCount */
NodeLivenessStatus_DEAD)
defer stopper.Stop(context.TODO())

// Create store.
@@ -600,7 +612,9 @@ func TestStorePoolGetStoreDetails(t *testing.T) {
func TestStorePoolGetStoreDetails(t *testing.T) {
defer leaktest.AfterTest(t)()
stopper, g, _, sp, _ := createTestStorePool(
TestTimeUntilStoreDead, false /* deterministic */, NodeLivenessStatus_DEAD)
TestTimeUntilStoreDead, false, /* deterministic */
func() int { return 10 }, /* nodeCount */
NodeLivenessStatus_DEAD)
defer stopper.Stop(context.TODO())
sg := gossiputil.NewStoreGossiper(g)
sg.GossipStores(uniqueStore, t)
@@ -618,7 +632,9 @@ func TestStorePoolFindDeadReplicas(t *testing.T) {
func TestStorePoolFindDeadReplicas(t *testing.T) {
defer leaktest.AfterTest(t)()
stopper, g, _, sp, mnl := createTestStorePool(
TestTimeUntilStoreDead, false /* deterministic */, NodeLivenessStatus_DEAD)
TestTimeUntilStoreDead, false, /* deterministic */
func() int { return 10 }, /* nodeCount */
NodeLivenessStatus_DEAD)
defer stopper.Stop(context.TODO())
sg := gossiputil.NewStoreGossiper(g)

@@ -719,7 +735,9 @@ func TestStorePoolDefaultState(t *testing.T) {
func TestStorePoolDefaultState(t *testing.T) {
defer leaktest.AfterTest(t)()
stopper, _, _, sp, _ := createTestStorePool(
TestTimeUntilStoreDead, false /* deterministic */, NodeLivenessStatus_DEAD)
TestTimeUntilStoreDead, false, /* deterministic */
func() int { return 10 }, /* nodeCount */
NodeLivenessStatus_DEAD)
defer stopper.Stop(context.TODO())

liveReplicas, deadReplicas := sp.liveAndDeadReplicas(0, []roachpb.ReplicaDescriptor{{StoreID: 1}})
@@ -742,7 +760,9 @@ func TestStorePoolThrottle(t *testing.T) {
func TestStorePoolThrottle(t *testing.T) {
defer leaktest.AfterTest(t)()
stopper, g, _, sp, _ := createTestStorePool(
TestTimeUntilStoreDead, false /* deterministic */, NodeLivenessStatus_DEAD)
TestTimeUntilStoreDead, false, /* deterministic */
func() int { return 10 }, /* nodeCount */
NodeLivenessStatus_DEAD)
defer stopper.Stop(context.TODO())

sg := gossiputil.NewStoreGossiper(g)
@@ -778,7 +798,9 @@ func TestGetLocalities(t *testing.T) {
func TestGetLocalities(t *testing.T) {
defer leaktest.AfterTest(t)()
stopper, g, _, sp, _ := createTestStorePool(
TestTimeUntilStoreDead, false /* deterministic */, NodeLivenessStatus_DEAD)
TestTimeUntilStoreDead, false, /* deterministic */
func() int { return 10 }, /* nodeCount */
NodeLivenessStatus_DEAD)
defer stopper.Stop(context.TODO())
sg := gossiputil.NewStoreGossiper(g)

@@ -847,7 +869,9 @@ func TestStorePoolDecommissioningReplicas(t *testing.T) {
func TestStorePoolDecommissioningReplicas(t *testing.T) {
defer leaktest.AfterTest(t)()
stopper, g, _, sp, mnl := createTestStorePool(
TestTimeUntilStoreDead, false /* deterministic */, NodeLivenessStatus_DEAD)
TestTimeUntilStoreDead, false, /* deterministic */
func() int { return 10 }, /* nodeCount */
NodeLivenessStatus_DEAD)
defer stopper.Stop(context.TODO())
sg := gossiputil.NewStoreGossiper(g)

6 changes: 3 additions & 3 deletions pkg/storage/store_rebalancer_test.go
@@ -115,7 +115,7 @@ func TestChooseLeaseToTransfer(t *testing.T) {
stopper := stop.NewStopper()
defer stopper.Stop(ctx)

stopper, g, _, a, _ := createTestAllocator( /* deterministic */ false)
stopper, g, _, a, _ := createTestAllocator(10, false /* deterministic */)
defer stopper.Stop(context.Background())
gossiputil.NewStoreGossiper(g).GossipStores(noLocalityStores, t)
storeList, _, _ := a.storePool.getStoreList(firstRange, storeFilterThrottled)
@@ -198,7 +198,7 @@ func TestChooseReplicaToRebalance(t *testing.T) {
stopper := stop.NewStopper()
defer stopper.Stop(ctx)

stopper, g, _, a, _ := createTestAllocator( /* deterministic */ false)
stopper, g, _, a, _ := createTestAllocator(10, false /* deterministic */)
defer stopper.Stop(context.Background())
gossiputil.NewStoreGossiper(g).GossipStores(noLocalityStores, t)
storeList, _, _ := a.storePool.getStoreList(firstRange, storeFilterThrottled)
@@ -308,7 +308,7 @@ func TestNoLeaseTransferToBehindReplicas(t *testing.T) {
stopper := stop.NewStopper()
defer stopper.Stop(ctx)

stopper, g, _, a, _ := createTestAllocator( /* deterministic */ false)
stopper, g, _, a, _ := createTestAllocator(10, false /* deterministic */)
defer stopper.Stop(context.Background())
gossiputil.NewStoreGossiper(g).GossipStores(noLocalityStores, t)
storeList, _, _ := a.storePool.getStoreList(firstRange, storeFilterThrottled)
1 change: 1 addition & 0 deletions pkg/testutils/localtestcluster/local_test_cluster.go
@@ -150,6 +150,7 @@ func (ltc *LocalTestCluster) Start(t testing.TB, baseCtx *base.Config, initFacto
cfg.Settings,
cfg.Gossip,
cfg.Clock,
cfg.NodeLiveness.GetNodeCount,
storage.MakeStorePoolNodeLivenessFunc(cfg.NodeLiveness),
/* deterministic */ false,
)
