Merge #56299
56299: server: initialize first store within the init server r=irfansharif a=irfansharif

...when joining an existing cluster. This diff adds some sanity around
how we bootstrap stores for nodes when the existing cluster informs us
what the first store ID should be. Previously we were bootstrapping the
first store asynchronously, which is not what we want.

We first observed the implications of doing so in #56263, which
attempted to remove the use of gossip in cluster/node ID distribution.
There we noticed that our somewhat haphazard structure around store
initialization could lead to store IDs being doubly allocated (#56272).
We were inadvertently safeguarded against this, as described in #56271,
but the structure was still confusing enough to warrant cleanup.

The new store initialization structure is now the following (a
condensed code sketch follows the outline):
```
- In the init code:
  - If we're being bootstrapped:
    - Initialize all the stores
  - If we're joining an existing cluster:
    - Only initialize first store, leave remaining stores to start up code
      later

- Later, when initializing additional new stores:
  - Allocate len(auxiliary engines) store IDs, and initialize them
    asynchronously.
```
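
In code, the join path now does roughly the following. This is a
condensed sketch rather than the literal diff: the
`initFirstStoreOnJoin` helper below is a hypothetical stand-in for the
inline logic in `attemptJoinTo` (see the `pkg/server/init.go` hunk
further down), with locking and logging trimmed.

```
// Condensed sketch of the join path; mirrors the attemptJoinTo change
// in pkg/server/init.go. Error handling is kept, locking is trimmed.
package server

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/storage"
	"github.com/cockroachdb/cockroach/pkg/util/uuid"
)

// initFirstStoreOnJoin synchronously initializes only the first
// uninitialized engine, using the store ID handed to us by the existing
// cluster. The remaining engines are returned untouched for the later,
// asynchronous additional-store path.
func initFirstStoreOnJoin(
	ctx context.Context,
	clusterID uuid.UUID,
	nodeID roachpb.NodeID,
	storeID roachpb.StoreID,
	uninitializedEngines []storage.Engine,
) (initialized, remaining []storage.Engine, _ error) {
	sIdent := roachpb.StoreIdent{ClusterID: clusterID, NodeID: nodeID, StoreID: storeID}
	first := uninitializedEngines[0]
	if err := kvserver.InitEngine(ctx, first, sIdent); err != nil {
		return nil, nil, err
	}
	return []storage.Engine{first}, uninitializedEngines[1:], nil
}
```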

This lets us avoid threading the first store ID through the init code,
and instead always rely on the KV increment operation to tell us what
the store ID of the first additional store should be; a sketch of that
allocation follows below. We update TestAddNewStoresToExistingNodes to
test allocation behaviour with more than just two stores.
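
For reference, the additional-store path that the test exercises can be
sketched as follows, assuming the store-ID generator key
(`keys.StoreIDGenerator`) and the retryable KV increment helper
(`kv.IncrementValRetryable`) already used by node start-up.
`initAdditionalStores` is an illustrative name, not the real function,
and the production code runs this asynchronously.

```
package server

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/keys"
	"github.com/cockroachdb/cockroach/pkg/kv"
	"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/storage"
	"github.com/cockroachdb/cockroach/pkg/util/uuid"
)

// initAdditionalStores reserves one store ID per remaining engine with a
// single KV increment against the store-ID generator, then initializes
// each engine with a consecutive ID from the reserved block.
func initAdditionalStores(
	ctx context.Context,
	db *kv.DB,
	clusterID uuid.UUID,
	nodeID roachpb.NodeID,
	engines []storage.Engine,
) error {
	if len(engines) == 0 {
		return nil
	}
	inc := int64(len(engines))
	// The increment returns the last ID in the reserved block, so the
	// first ID in the block is val-inc+1.
	val, err := kv.IncrementValRetryable(ctx, db, keys.StoreIDGenerator, inc)
	if err != nil {
		return err
	}
	firstID := roachpb.StoreID(val - inc + 1)
	for i, eng := range engines {
		sIdent := roachpb.StoreIdent{
			ClusterID: clusterID,
			NodeID:    nodeID,
			StoreID:   firstID + roachpb.StoreID(i),
		}
		if err := kvserver.InitEngine(ctx, eng, sIdent); err != nil {
			return err
		}
	}
	return nil
}
```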

Eventually we could simplify the init code to only initialize the first
store when we're bootstrapping (there's a longstanding TODO from Andrei
to that effect), but it's not strictly needed. This PR unblocks #56263.

Release note: None


Co-authored-by: irfan sharif <[email protected]>
craig[bot] and irfansharif committed Nov 5, 2020
2 parents 8ed55e4 + ef35ee5 commit 53e7cc5
Showing 7 changed files with 167 additions and 130 deletions.
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/BUILD.bazel
@@ -76,8 +76,8 @@ go_library(
"split_trigger_helper.go",
"storage_services.pb.go",
"store.go",
"store_bootstrap.go",
"store_create_replica.go",
"store_init.go",
"store_merge.go",
"store_pool.go",
"store_raft.go",
pkg/kv/kvserver/store_init.go (renamed from store_bootstrap.go)
@@ -34,14 +34,14 @@ import (
func InitEngine(ctx context.Context, eng storage.Engine, ident roachpb.StoreIdent) error {
exIdent, err := ReadStoreIdent(ctx, eng)
if err == nil {
return errors.Errorf("engine %s is already bootstrapped with ident %s", eng, exIdent.String())
return errors.Errorf("engine %s is already initialized with ident %s", eng, exIdent.String())
}
if !errors.HasType(err, (*NotBootstrappedError)(nil)) {
return err
}

if err := checkCanInitializeEngine(ctx, eng); err != nil {
return errors.Wrap(err, "while trying to initialize store")
return errors.Wrap(err, "while trying to initialize engine")
}

batch := eng.NewBatch()
@@ -58,13 +58,13 @@ func InitEngine(ctx context.Context, eng storage.Engine, ident roachpb.StoreIden
return err
}
if err := batch.Commit(true /* sync */); err != nil {
return errors.Wrap(err, "persisting bootstrap data")
return errors.Wrap(err, "persisting engine initialization data")
}

return nil
}

// WriteInitialClusterData writes bootstrapping data to an engine. It creates
// WriteInitialClusterData writes initialization data to an engine. It creates
// system ranges (filling in meta1 and meta2) and the default zone config.
//
// Args:
1 change: 1 addition & 0 deletions pkg/server/BUILD.bazel
@@ -308,6 +308,7 @@ go_test(
"//pkg/util/timeutil",
"//pkg/util/uuid",
"//vendor/github.com/cockroachdb/errors",
"//vendor/github.com/dustin/go-humanize",
"//vendor/github.com/gogo/protobuf/jsonpb",
"//vendor/github.com/gogo/protobuf/proto",
"//vendor/github.com/grpc-ecosystem/grpc-gateway/runtime",
65 changes: 44 additions & 21 deletions pkg/server/init.go
@@ -111,10 +111,10 @@ type initDiskState struct {
// TODO(tbg): see TODO above.
nodeID roachpb.NodeID
// All fields below are always set.
clusterID uuid.UUID
clusterVersion clusterversion.ClusterVersion
initializedEngines []storage.Engine
newEngines []storage.Engine
clusterID uuid.UUID
clusterVersion clusterversion.ClusterVersion
initializedEngines []storage.Engine
uninitializedEngines []storage.Engine
}

// initState contains the cluster and node IDs as well as the stores, from which
@@ -125,12 +125,10 @@ type initDiskState struct {
// running server will be wholly reconstructed if reloading from disk. It
// could if we always persisted any changes made to it back to disk. Right now
// when initializing after a successful join attempt, we don't persist back the
// disk state back to disk (we'd need to bootstrap the first store here, in the
// disk state back to disk (we'd need to initialize the first store here, in the
// same we do when `cockroach init`-ialized).
type initState struct {
initDiskState

firstStoreID roachpb.StoreID
}

// NeedsInit is like needsInitLocked, except it acquires the necessary locks.
@@ -205,7 +203,7 @@ func (s *initServer) ServeAndWait(
}
s.mu.Unlock()

log.Info(ctx, "no stores bootstrapped")
log.Info(ctx, "no stores initialized")
log.Info(ctx, "awaiting `cockroach init` or join with an already initialized node")

joinCtx, cancelJoin := context.WithCancel(ctx)
@@ -311,10 +309,6 @@ func (s *initServer) ServeAndWait(
log.Infof(ctx, "joined cluster %s through join rpc", state.clusterID)
log.Infof(ctx, "received node ID %d", state.nodeID)

s.mu.Lock()
s.mu.inspectState.clusterID = state.clusterID
s.mu.Unlock()

return state, true, nil
case <-gossipConnectedCh:
// Ensure we're draining out the join attempt.
@@ -535,29 +529,58 @@ func (s *initServer) attemptJoinTo(ctx context.Context, addr string) (*initState
return nil, err
}

nodeID, storeID := roachpb.NodeID(resp.NodeID), roachpb.StoreID(resp.StoreID)
clusterVersion := clusterversion.ClusterVersion{Version: *resp.ActiveVersion}

s.mu.Lock()
defer s.mu.Unlock()

s.mu.inspectState.clusterID = clusterID
s.mu.inspectState.nodeID = roachpb.NodeID(resp.NodeID)
s.mu.inspectState.clusterVersion = clusterversion.ClusterVersion{Version: *resp.ActiveVersion}
diskState := *s.mu.inspectState
s.mu.Unlock()
s.mu.inspectState.nodeID = nodeID
s.mu.inspectState.clusterVersion = clusterVersion

state := &initState{
initDiskState: diskState,
firstStoreID: roachpb.StoreID(resp.StoreID),
if len(s.mu.inspectState.uninitializedEngines) < 1 {
log.Fatal(ctx, "expected to find at least one uninitialized engine")
}

// We initialize the very first store here, using the store ID handed to us.
sIdent := roachpb.StoreIdent{
ClusterID: clusterID,
NodeID: nodeID,
StoreID: storeID,
}

firstEngine := s.mu.inspectState.uninitializedEngines[0]
if err := kvserver.InitEngine(ctx, firstEngine, sIdent); err != nil {
return nil, err
}

// We construct the appropriate initState to indicate that we've initialized
// the first engine. We similarly trim it off the uninitializedEngines list
// so that when initializing auxiliary stores, if any, we know to avoid
// re-initializing the first store.
initializedEngines := []storage.Engine{firstEngine}
uninitializedEngines := s.mu.inspectState.uninitializedEngines[1:]
state := &initState{
initDiskState: initDiskState{
nodeID: nodeID,
clusterID: clusterID,
clusterVersion: clusterVersion,
initializedEngines: initializedEngines,
uninitializedEngines: uninitializedEngines,
},
}
return state, nil
}

func (s *initServer) tryBootstrapLocked(ctx context.Context) (*initState, error) {
// We use our binary version to bootstrap the cluster.
cv := clusterversion.ClusterVersion{Version: s.config.binaryVersion}
if err := kvserver.WriteClusterVersionToEngines(ctx, s.mu.inspectState.newEngines, cv); err != nil {
if err := kvserver.WriteClusterVersionToEngines(ctx, s.mu.inspectState.uninitializedEngines, cv); err != nil {
return nil, err
}
return bootstrapCluster(
ctx, s.mu.inspectState.newEngines, &s.config.defaultZoneConfig, &s.config.defaultSystemZoneConfig,
ctx, s.mu.inspectState.uninitializedEngines, &s.config.defaultZoneConfig, &s.config.defaultSystemZoneConfig,
)
}

43 changes: 32 additions & 11 deletions pkg/server/multi_store_test.go
@@ -12,16 +12,19 @@ package server_test

import (
"context"
"sort"
"testing"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
"github.com/dustin/go-humanize"
"github.com/stretchr/testify/require"
)

@@ -71,14 +74,21 @@ func TestAddNewStoresToExistingNodes(t *testing.T) {
clusterID := tc.Server(0).ClusterID()
tc.Stopper().Stop(ctx)

// Add an additional store to each node.
// Add two additional stores to each node.
n1s2, n1cleanup2 := testutils.TempDir(t)
defer n1cleanup2()
n2s2, n2cleanup2 := testutils.TempDir(t)
defer n2cleanup2()
n3s2, n3cleanup2 := testutils.TempDir(t)
defer n3cleanup2()

n1s3, n1cleanup3 := testutils.TempDir(t)
defer n1cleanup3()
n2s3, n2cleanup3 := testutils.TempDir(t)
defer n2cleanup3()
n3s3, n3cleanup3 := testutils.TempDir(t)
defer n3cleanup3()

tcArgs = base.TestClusterArgs{
// We need ParallelStart since this is an existing cluster. If
// we started sequentially, then the first node would hang forever
@@ -89,17 +99,17 @@
ServerArgsPerNode: map[int]base.TestServerArgs{
0: {
StoreSpecs: []base.StoreSpec{
{Path: n1s1}, {Path: n1s2},
{Path: n1s1}, {Path: n1s2}, {Path: n1s3},
},
},
1: {
StoreSpecs: []base.StoreSpec{
{Path: n2s1}, {Path: n2s2},
{Path: n2s1}, {Path: n2s2}, {Path: n2s3},
},
},
2: {
StoreSpecs: []base.StoreSpec{
{Path: n3s1}, {Path: n3s2},
{Path: n3s1}, {Path: n3s2}, {Path: n3s3},
},
},
},
@@ -115,23 +125,34 @@
require.Equal(t, clusterID, srv.ClusterID())
}

// Ensure all nodes have 2 stores available.
// Ensure all nodes have all stores available, and each store has a unique
// store ID.
testutils.SucceedsSoon(t, func() error {
var storeIDs []roachpb.StoreID
for _, server := range tc.Servers {
var storeCount = 0

err := server.GetStores().(*kvserver.Stores).VisitStores(
if err := server.GetStores().(*kvserver.Stores).VisitStores(
func(s *kvserver.Store) error {
storeCount++
storeIDs = append(storeIDs, s.StoreID())
return nil
},
)
if err != nil {
); err != nil {
return errors.Errorf("failed to visit all nodes, got %v", err)
}

if storeCount != 2 {
return errors.Errorf("expected two stores to be available on node %v, got %v stores instead", server.NodeID(), storeCount)
if storeCount != 3 {
return errors.Errorf("expected 3 stores to be available on n%s, got %d stores instead", server.NodeID(), storeCount)
}
}

sort.Slice(storeIDs, func(i, j int) bool {
return storeIDs[i] < storeIDs[j]
})
for i := range storeIDs {
expStoreID := roachpb.StoreID(i + 1)
if storeIDs[i] != expStoreID {
t.Fatalf("expected the %s store to have storeID s%s, found s%s", humanize.Ordinal(i+1), expStoreID, storeIDs[i])
}
}
