server: initialize first store within the init server
...when joining an existing cluster. This diff adds some sanity around
how we bootstrap stores for nodes when the existing cluster informs us
what the first store ID should be. Previously we bootstrapped the first
store asynchronously, which is not what we want.

We first observed the implications of doing so in #56263, which
attempted to remove the use of gossip in cluster/node ID distribution.
There we noticed that our somewhat haphazard structure around
initialization of stores could lead to doubly allocating store IDs
(#56272). We were inadvertently safeguarded against this, as described
in #56271, but this structure is still pretty confusing and needed
cleanup.

The new store initialization structure is as follows:
```
- In the init code:
  - If we're being bootstrapped:
    - Initialize all the stores
  - If we're joining an existing cluster:
    - Only initialize first store, leave remaining stores to start up code
      later

- Later, when initializing additional new stores:
  - Allocate len(auxiliary engines) store IDs, and initialize them
    asynchronously.
```

This lets us avoid threading the first store ID through the startup
code, and lets us always rely on the KV increment operation to tell us
what the store ID of the first additional store should be. We also
update TestAddNewStoresToExistingNodes to exercise allocation behaviour
with more than just two stores.
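
For context, the "KV increment operation" mentioned above reserves a
contiguous block of store IDs with a single atomic increment of a
cluster-wide counter key. A minimal sketch of the idea (the helper below
is illustrative, not a verbatim copy of allocateStoreIDs; it assumes the
increment returns the last value of the reserved block):

```go
package server

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/keys"
	"github.com/cockroachdb/cockroach/pkg/kv"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
)

// allocateStoreIDsSketch reserves n contiguous store IDs by atomically
// bumping a cluster-wide generator key. Assuming the increment returns
// the last value of the reserved block, the first usable ID is
// val - n + 1.
func allocateStoreIDsSketch(
	ctx context.Context, db *kv.DB, n int64,
) (roachpb.StoreID, error) {
	val, err := kv.IncrementValRetryable(ctx, db, keys.StoreIDGenerator, n)
	if err != nil {
		return 0, err
	}
	return roachpb.StoreID(val - n + 1), nil
}
```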

Eventually we could simplify the init code to only initialize the first
store when we're bootstrapping (there's a longstanding TODO from Andrei
to that effect), but it's not strictly needed. This PR unblocks #56263.

Release note: None
irfansharif committed Nov 4, 2020
1 parent c738ae0 commit ef35ee5
Showing 4 changed files with 75 additions and 41 deletions.
1 change: 1 addition & 0 deletions pkg/server/BUILD.bazel
```diff
@@ -307,6 +307,7 @@ go_test(
         "//pkg/util/timeutil",
         "//pkg/util/uuid",
         "//vendor/github.com/cockroachdb/errors",
+        "//vendor/github.com/dustin/go-humanize",
         "//vendor/github.com/gogo/protobuf/jsonpb",
         "//vendor/github.com/gogo/protobuf/proto",
         "//vendor/github.com/grpc-ecosystem/grpc-gateway/runtime",
```
45 changes: 36 additions & 9 deletions pkg/server/init.go
```diff
@@ -129,8 +129,6 @@ type initDiskState struct {
 // same we do when `cockroach init`-ialized).
 type initState struct {
     initDiskState
-
-    firstStoreID roachpb.StoreID
 }
 
 // NeedsInit is like needsInitLocked, except it acquires the necessary locks.
@@ -531,18 +529,47 @@ func (s *initServer) attemptJoinTo(ctx context.Context, addr string) (*initState, error) {
         return nil, err
     }
 
+    nodeID, storeID := roachpb.NodeID(resp.NodeID), roachpb.StoreID(resp.StoreID)
+    clusterVersion := clusterversion.ClusterVersion{Version: *resp.ActiveVersion}
+
     s.mu.Lock()
+    defer s.mu.Unlock()
+
     s.mu.inspectState.clusterID = clusterID
-    s.mu.inspectState.nodeID = roachpb.NodeID(resp.NodeID)
-    s.mu.inspectState.clusterVersion = clusterversion.ClusterVersion{Version: *resp.ActiveVersion}
-    diskState := *s.mu.inspectState
-    s.mu.Unlock()
+    s.mu.inspectState.nodeID = nodeID
+    s.mu.inspectState.clusterVersion = clusterVersion
 
-    state := &initState{
-        initDiskState: diskState,
-        firstStoreID:  roachpb.StoreID(resp.StoreID),
+    if len(s.mu.inspectState.uninitializedEngines) < 1 {
+        log.Fatal(ctx, "expected to find at least one uninitialized engine")
+    }
+
+    // We initialize the very first store here, using the store ID handed to us.
+    sIdent := roachpb.StoreIdent{
+        ClusterID: clusterID,
+        NodeID:    nodeID,
+        StoreID:   storeID,
+    }
+
+    firstEngine := s.mu.inspectState.uninitializedEngines[0]
+    if err := kvserver.InitEngine(ctx, firstEngine, sIdent); err != nil {
+        return nil, err
+    }
+
+    // We construct the appropriate initState to indicate that we've initialized
+    // the first engine. We similarly trim it off the uninitializedEngines list
+    // so that when initializing auxiliary stores, if any, we know to avoid
+    // re-initializing the first store.
+    initializedEngines := []storage.Engine{firstEngine}
+    uninitializedEngines := s.mu.inspectState.uninitializedEngines[1:]
+    state := &initState{
+        initDiskState: initDiskState{
+            nodeID:               nodeID,
+            clusterID:            clusterID,
+            clusterVersion:       clusterVersion,
+            initializedEngines:   initializedEngines,
+            uninitializedEngines: uninitializedEngines,
+        },
     }
     return state, nil
 }
```
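
A design note on the change above: both the bootstrap path and the join
path out of the init server now return an initState whose
initializedEngines is non-empty, so the code that later brings up
additional stores can treat every entry in uninitializedEngines
uniformly, allocating a fresh store ID for each.
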
43 changes: 32 additions & 11 deletions pkg/server/multi_store_test.go
```diff
@@ -12,16 +12,19 @@ package server_test
 
 import (
     "context"
+    "sort"
     "testing"
 
     "github.com/cockroachdb/cockroach/pkg/base"
     "github.com/cockroachdb/cockroach/pkg/kv/kvserver"
+    "github.com/cockroachdb/cockroach/pkg/roachpb"
     "github.com/cockroachdb/cockroach/pkg/testutils"
     "github.com/cockroachdb/cockroach/pkg/testutils/skip"
     "github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
     "github.com/cockroachdb/cockroach/pkg/util/leaktest"
     "github.com/cockroachdb/cockroach/pkg/util/log"
     "github.com/cockroachdb/errors"
+    "github.com/dustin/go-humanize"
     "github.com/stretchr/testify/require"
 )
@@ -71,14 +74,21 @@ func TestAddNewStoresToExistingNodes(t *testing.T) {
     clusterID := tc.Server(0).ClusterID()
     tc.Stopper().Stop(ctx)
 
-    // Add an additional store to each node.
+    // Add two additional stores to each node.
     n1s2, n1cleanup2 := testutils.TempDir(t)
     defer n1cleanup2()
     n2s2, n2cleanup2 := testutils.TempDir(t)
     defer n2cleanup2()
     n3s2, n3cleanup2 := testutils.TempDir(t)
     defer n3cleanup2()
 
+    n1s3, n1cleanup3 := testutils.TempDir(t)
+    defer n1cleanup3()
+    n2s3, n2cleanup3 := testutils.TempDir(t)
+    defer n2cleanup3()
+    n3s3, n3cleanup3 := testutils.TempDir(t)
+    defer n3cleanup3()
+
     tcArgs = base.TestClusterArgs{
         // We need ParallelStart since this is an existing cluster. If
         // we started sequentially, then the first node would hang forever
@@ -89,17 +99,17 @@ func TestAddNewStoresToExistingNodes(t *testing.T) {
         ServerArgsPerNode: map[int]base.TestServerArgs{
             0: {
                 StoreSpecs: []base.StoreSpec{
-                    {Path: n1s1}, {Path: n1s2},
+                    {Path: n1s1}, {Path: n1s2}, {Path: n1s3},
                 },
             },
             1: {
                 StoreSpecs: []base.StoreSpec{
-                    {Path: n2s1}, {Path: n2s2},
+                    {Path: n2s1}, {Path: n2s2}, {Path: n2s3},
                 },
             },
             2: {
                 StoreSpecs: []base.StoreSpec{
-                    {Path: n3s1}, {Path: n3s2},
+                    {Path: n3s1}, {Path: n3s2}, {Path: n3s3},
                 },
             },
         },
@@ -115,23 +125,34 @@ func TestAddNewStoresToExistingNodes(t *testing.T) {
         require.Equal(t, clusterID, srv.ClusterID())
     }
 
-    // Ensure all nodes have 2 stores available.
+    // Ensure all nodes have all stores available, and each store has a unique
+    // store ID.
     testutils.SucceedsSoon(t, func() error {
+        var storeIDs []roachpb.StoreID
        for _, server := range tc.Servers {
             var storeCount = 0
 
-            err := server.GetStores().(*kvserver.Stores).VisitStores(
+            if err := server.GetStores().(*kvserver.Stores).VisitStores(
                 func(s *kvserver.Store) error {
                     storeCount++
+                    storeIDs = append(storeIDs, s.StoreID())
                     return nil
                 },
-            )
-            if err != nil {
+            ); err != nil {
                 return errors.Errorf("failed to visit all nodes, got %v", err)
             }
 
-            if storeCount != 2 {
-                return errors.Errorf("expected two stores to be available on node %v, got %v stores instead", server.NodeID(), storeCount)
+            if storeCount != 3 {
+                return errors.Errorf("expected 3 stores to be available on n%s, got %d stores instead", server.NodeID(), storeCount)
             }
         }
 
+        sort.Slice(storeIDs, func(i, j int) bool {
+            return storeIDs[i] < storeIDs[j]
+        })
+        for i := range storeIDs {
+            expStoreID := roachpb.StoreID(i + 1)
+            if storeIDs[i] != expStoreID {
+                t.Fatalf("expected the %s store to have storeID s%s, found s%s", humanize.Ordinal(i+1), expStoreID, storeIDs[i])
+            }
+        }
+
```
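
With three nodes and three stores per node, the sorted store IDs
collected above must come out to exactly s1 through s9; a gap or a
duplicate would indicate the double allocation of store IDs described
in #56272.
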
27 changes: 6 additions & 21 deletions pkg/server/node.go
```diff
@@ -276,7 +276,6 @@ func bootstrapCluster(
             initializedEngines:   engines,
             uninitializedEngines: nil,
         },
-        firstStoreID: firstStoreID,
     }
     return state, nil
 }
@@ -480,7 +479,7 @@ func (n *Node) start(
         // sequence ID generator stored in a system key.
         n.additionalStoreInitCh = make(chan struct{})
         if err := n.stopper.RunAsyncTask(ctx, "initialize-additional-stores", func(ctx context.Context) {
-            if err := n.initializeAdditionalStores(ctx, state.firstStoreID, state.uninitializedEngines, n.stopper); err != nil {
+            if err := n.initializeAdditionalStores(ctx, state.uninitializedEngines, n.stopper); err != nil {
                 log.Fatalf(ctx, "while initializing additional stores: %v", err)
             }
             close(n.additionalStoreInitCh)
@@ -584,10 +583,7 @@ func (n *Node) validateStores(ctx context.Context) error {
 // allocated via a sequence id generator stored at a system key per node. The
 // new stores are added to n.stores.
 func (n *Node) initializeAdditionalStores(
-    ctx context.Context,
-    firstStoreID roachpb.StoreID,
-    engines []storage.Engine,
-    stopper *stop.Stopper,
+    ctx context.Context, engines []storage.Engine, stopper *stop.Stopper,
 ) error {
     if n.clusterID.Get() == uuid.Nil {
         return errors.New("missing cluster ID during initialization of additional store")
@@ -597,28 +593,17 @@ func (n *Node) initializeAdditionalStores(
     // Initialize all waiting stores by allocating a new store id for each
     // and invoking kvserver.InitEngine() to persist it. We'll then
     // construct a new store out of the initialized engine and attach it to
-    // ourselves. The -1 comes from the fact that our first store ID has
-    // already been pre-allocated for us.
-    storeIDAlloc := int64(len(engines)) - 1
-    if firstStoreID == 0 {
-        // We lied, we don't have a firstStoreID; we'll need to allocate for
-        // that too.
-        //
-        // TODO(irfansharif): We get here if we're falling back to
-        // gossip-based connectivity. This can be removed in 21.1.
-        storeIDAlloc++
-    }
+    // ourselves.
+    storeIDAlloc := int64(len(engines))
     startID, err := allocateStoreIDs(ctx, n.Descriptor.NodeID, storeIDAlloc, n.storeCfg.DB)
-    if firstStoreID == 0 {
-        firstStoreID = startID
-    }
     if err != nil {
         return errors.Errorf("error allocating store ids: %s", err)
     }
 
     sIdent := roachpb.StoreIdent{
         ClusterID: n.clusterID.Get(),
         NodeID:    n.Descriptor.NodeID,
-        StoreID:   firstStoreID,
+        StoreID:   startID,
     }
     for _, eng := range engines {
         if err := kvserver.InitEngine(ctx, eng, sIdent); err != nil {
```
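
The loop above is cut off by the diff view; the pattern it implements
stamps each remaining engine with the next ID from the contiguous block
reserved by allocateStoreIDs. A sketch of that pattern (the placement of
the per-engine increment is an assumption consistent with reserving
len(engines) IDs up front, not a verbatim copy of the elided loop body):

```go
package server

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/storage"
)

// initAdditionalEnginesSketch initializes each engine with consecutive
// store IDs, starting from the first ID of the reserved block. sIdent
// is passed by value, so the increment only mutates the local copy.
func initAdditionalEnginesSketch(
	ctx context.Context, engines []storage.Engine, sIdent roachpb.StoreIdent,
) error {
	for _, eng := range engines {
		if err := kvserver.InitEngine(ctx, eng, sIdent); err != nil {
			return err
		}
		sIdent.StoreID++ // the next engine gets the next reserved ID
	}
	return nil
}
```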
