Skip to content

Commit

Permalink
Merge #55350
Browse files Browse the repository at this point in the history
55350: server: support adding stores to existing nodes r=irfansharif a=TheSamHuang

We need to bootstrap additional stores asynchronously. Consider the range that
houses the store ID allocator. When restarting the set of nodes that holds a
quorum of these replicas, when restarting them with additional stores, those
additional stores will require store IDs to get fully bootstrapped. But if
we're gating node start (specifically opening up the RPC floodgates) on having
all stores fully bootstrapped, we'll simply hang when trying to allocate store
IDs.  See TestAddNewStoresToExistingNodes and #39415 for more details.

Instead we opt to bootstrap additional stores asynchronously, and rely on the
blocking function to signal to the caller that all stores have been fully
bootstrapped.

Co-authored-by: irfan sharif <[email protected]>
  • Loading branch information
craig[bot] and irfansharif committed Oct 22, 2020
2 parents b824913 + 991a01d commit 4e88183
Show file tree
Hide file tree
Showing 3 changed files with 197 additions and 21 deletions.
140 changes: 140 additions & 0 deletions pkg/server/multi_store_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
// Copyright 2020 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package server_test

import (
"context"
"testing"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
)

// TestAddNewStoresToExistingNodes tests database behavior with
// multiple stores per node, in particular when new stores are
// added while nodes are shut down. This test starts a cluster with
// three nodes, shuts down all nodes and adds a store to each node,
// and ensures nodes start back up successfully. See #39415.
func TestAddNewStoresToExistingNodes(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

skip.UnderStress(t, "too many new stores and nodes for stress")

ctx := context.Background()

n1s1, n1cleanup1 := testutils.TempDir(t)
defer n1cleanup1()
n2s1, n2cleanup1 := testutils.TempDir(t)
defer n2cleanup1()
n3s1, n3cleanup1 := testutils.TempDir(t)
defer n3cleanup1()

numNodes := 3
tcArgs := base.TestClusterArgs{
ServerArgsPerNode: map[int]base.TestServerArgs{
// NB: on my local (beefy) machine, upreplication
// takes ~6s. This is pretty hefty compared to ~1s
// with ephemeral stores. But - we need the real
// stores here. At the time of writing, we perform
// ~100 change replicas txns, all in all, and
// 0.06s for a replication change does seem ok.
0: {StoreSpecs: []base.StoreSpec{{Path: n1s1}}},
1: {StoreSpecs: []base.StoreSpec{{Path: n2s1}}},
2: {StoreSpecs: []base.StoreSpec{{Path: n3s1}}},
},
}

tc := testcluster.StartTestCluster(t, numNodes, tcArgs)
// NB: it's important that this test wait for full replication. Otherwise,
// with only a single voter on the range that allocates store IDs, it can
// pass erroneously. StartTestCluster already calls it, but we call it
// again explicitly.
if err := tc.WaitForFullReplication(); err != nil {
log.Fatalf(ctx, "while waiting for full replication: %v", err)
}
clusterID := tc.Server(0).ClusterID()
tc.Stopper().Stop(ctx)

// Add an additional store to each node.
n1s2, n1cleanup2 := testutils.TempDir(t)
defer n1cleanup2()
n2s2, n2cleanup2 := testutils.TempDir(t)
defer n2cleanup2()
n3s2, n3cleanup2 := testutils.TempDir(t)
defer n3cleanup2()

tcArgs = base.TestClusterArgs{
// We need ParallelStart since this is an existing cluster. If
// we started sequentially, then the first node would hang forever
// waiting for the KV layer to become available, but that only
// happens when the second node also starts.
ParallelStart: true,
ReplicationMode: base.ReplicationManual, // saves time
ServerArgsPerNode: map[int]base.TestServerArgs{
0: {
StoreSpecs: []base.StoreSpec{
{Path: n1s1}, {Path: n1s2},
},
},
1: {
StoreSpecs: []base.StoreSpec{
{Path: n2s1}, {Path: n2s2},
},
},
2: {
StoreSpecs: []base.StoreSpec{
{Path: n3s1}, {Path: n3s2},
},
},
},
}

// Start all nodes with additional stores.
tc = testcluster.StartTestCluster(t, numNodes, tcArgs)
defer tc.Stopper().Stop(ctx)

// Sanity check that we're testing what we wanted to test and didn't accidentally
// bootstrap three single-node clusters (who knows).
for _, srv := range tc.Servers {
require.Equal(t, clusterID, srv.ClusterID())
}

// Ensure all nodes have 2 stores available.
testutils.SucceedsSoon(t, func() error {
for _, server := range tc.Servers {
var storeCount = 0

err := server.GetStores().(*kvserver.Stores).VisitStores(
func(s *kvserver.Store) error {
storeCount++
return nil
},
)
if err != nil {
return errors.Errorf("failed to visit all nodes, got %v", err)
}

if storeCount != 2 {
return errors.Errorf("expected two stores to be available on node %v, got %v stores instead", server.NodeID(), storeCount)
}
}

return nil
})
}
68 changes: 47 additions & 21 deletions pkg/server/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,18 +150,19 @@ func (nm nodeMetrics) callComplete(d time.Duration, pErr *roachpb.Error) {
// IDs for bootstrapping the node itself or new stores as they're added
// on subsequent instantiations.
type Node struct {
stopper *stop.Stopper
clusterID *base.ClusterIDContainer // UUID for Cockroach cluster
Descriptor roachpb.NodeDescriptor // Node ID, network/physical topology
storeCfg kvserver.StoreConfig // Config to use and pass to stores
eventLogger sql.EventLogger
stores *kvserver.Stores // Access to node-local stores
metrics nodeMetrics
recorder *status.MetricsRecorder
startedAt int64
lastUp int64
initialStart bool // True if this is the first time this node has started.
txnMetrics kvcoord.TxnMetrics
stopper *stop.Stopper
clusterID *base.ClusterIDContainer // UUID for Cockroach cluster
Descriptor roachpb.NodeDescriptor // Node ID, network/physical topology
storeCfg kvserver.StoreConfig // Config to use and pass to stores
eventLogger sql.EventLogger
stores *kvserver.Stores // Access to node-local stores
metrics nodeMetrics
recorder *status.MetricsRecorder
startedAt int64
lastUp int64
initialStart bool // True if this is the first time this node has started.
txnMetrics kvcoord.TxnMetrics
bootstrapNewStoresCh chan struct{}

perReplicaServer kvserver.Server
}
Expand Down Expand Up @@ -333,10 +334,10 @@ func (n *Node) AnnotateCtxWithSpan(
return n.storeCfg.AmbientCtx.AnnotateCtxWithSpan(ctx, opName)
}

// start starts the node by registering the storage instance for the
// RPC service "Node" and initializing stores for each specified
// engine. Launches periodic store gossiping in a goroutine.
// A callback can be optionally provided that will be invoked once this node's
// start starts the node by registering the storage instance for the RPC
// service "Node" and initializing stores for each specified engine.
// Launches periodic store gossiping in a goroutine. A callback can
// be optionally provided that will be invoked once this node's
// NodeDescriptor is available, to help bootstrapping.
func (n *Node) start(
ctx context.Context,
Expand Down Expand Up @@ -462,12 +463,29 @@ func (n *Node) start(
return fmt.Errorf("failed to initialize the gossip interface: %s", err)
}

// Bootstrap any uninitialized stores.
//
// TODO(tbg): address https://github.com/cockroachdb/cockroach/issues/39415.
// Should be easy enough. Writing the test is probably most of the work.
// Bootstrap any uninitialized stores and define a function for blocking
// until new stores are fully bootstrapped. This function remains a
// no-op unless we find ourselves bootstrapping new stores.
if len(state.newEngines) > 0 {
if err := n.bootstrapStores(ctx, state.firstStoreID, state.newEngines, n.stopper); err != nil {
// We need to bootstrap additional stores asynchronously. Consider the range that
// houses the store ID allocator. When restarting the set of nodes that holds a
// quorum of these replicas, when restarting them with additional stores, those
// additional stores will require store IDs to get fully bootstrapped. But if we're
// gating node start (specifically opening up the RPC floodgates) on having all
// stores fully bootstrapped, we'll simply hang when trying to allocate store IDs.
// See TestAddNewStoresToExistingNodes and #39415 for more details.
//
// Instead we opt to bootstrap additional stores asynchronously, and rely on the
// blocking function n.waitForBootstrapNewStores() to signal to the caller that
// all stores have been fully bootstrapped.
n.bootstrapNewStoresCh = make(chan struct{})
if err := n.stopper.RunAsyncTask(ctx, "bootstrap-stores", func(ctx context.Context) {
if err := n.bootstrapStores(ctx, state.firstStoreID, state.newEngines, n.stopper); err != nil {
log.Fatalf(ctx, "while bootstrapping additional stores: %v", err)
}
close(n.bootstrapNewStoresCh)
}); err != nil {
close(n.bootstrapNewStoresCh)
return err
}
}
Expand All @@ -492,6 +510,14 @@ func (n *Node) start(
return nil
}

// waitForBootstrapNewStores blocks until all additional empty stores,
// if any, have been bootstrapped.
func (n *Node) waitForBootstrapNewStores() {
if n.bootstrapNewStoresCh != nil {
<-n.bootstrapNewStoresCh
}
}

// IsDraining returns true if at least one Store housed on this Node is not
// currently allowing range leases to be procured or extended.
func (n *Node) IsDraining() bool {
Expand Down
10 changes: 10 additions & 0 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1603,8 +1603,18 @@ func (s *Server) PreStart(ctx context.Context) error {
}
})

// After setting modeOperational, we can block until all stores are fully
// bootstrapped.
s.grpc.setMode(modeOperational)

// We'll block here until all stores are fully bootstrapped. We do this here for
// two reasons:
// - some of the components below depend on all stores being fully bootstrapped
// (like the debug server registration for e.g.)
// - we'll need to do it after having opened up the RPC floodgates (due to the
// hazard described in Node.start, around bootstrapping additional stores)
s.node.waitForBootstrapNewStores()

log.Infof(ctx, "starting %s server at %s (use: %s)",
redact.Safe(s.cfg.HTTPRequestScheme()), s.cfg.HTTPAddr, s.cfg.HTTPAdvertiseAddr)
rpcConnType := redact.SafeString("grpc/postgres")
Expand Down

0 comments on commit 4e88183

Please sign in to comment.