From 634e4274ca7a9b588536d714566a5673c6ed66b2 Mon Sep 17 00:00:00 2001 From: Tobias Schottdorf Date: Wed, 1 Apr 2020 14:01:11 +0200 Subject: [PATCH] server: simplify bootstrap via fatter init server \## Motivation Today, starting up a server is complicated. This is especially true when bootstrap is necessary. By this, we mean that either - no NodeID is known. This implies that none of the engines were initialized yet - the NodeID is known (i.e. at least one store is initialized), but a new engine was added. When the process first starts, and a NodeID is known, a ClusterID is also known (since they get persisted together, on every store). For the same reason, a persisted cluster version is known. In this case, we can start a fully initialized cluster, so this is the easy case. It is more difficult when no NodeID is known, in which case the server is just starting up for the first time, with its engines all blank. It needs to somehow allocate NodeIDs (and StoreIDs) which are then written to the engines. It also needs to, at least initially, use the lowest possible cluster version it can tolerate (after all, the cluster may actually run at that lowest version). Right now, there is a delicate dance: we thread late-bound ClusterID and NodeID containers all over the place, spin up a mostly dysfunctional Node, use its KV client to allocate NodeID and StoreIDs once this is possible - we need Gossip to have connected first - and update the containers. It is complex, error prone, and ossifies any code it touches. Cluster versions deserve an extra shout-out for complexity. Even if a cluster version was persisted, the node may have been decommissioned and the cluster upgraded. Much work went into our RPC layer to prevent connections between incompatible nodes, but there is no boot-time check that results in a swift and descriptive error - there will be a fatal error, originating from the RPC subsystem. One aim of our work is to simplify this by checking the version via an RPC before setting too many gears in motion. \## Context This marks the beginning of a series of PRs aiming at improving the startup code. Ultimately, the goal is to have the Node and all Stores bootstrapped as early as possible, but in particular before starting the KV or SQL server subsystems. Furthermore, we want to achieve this without relying on Gossip, to prepare for a split between the SQL and KV servers in the context of multitenancy support (SQL server won't be able to rely on Gossip, but will still need to announce itself to the KV servers). \## This PR This PR is an initial simplifying refactor that can help achieve these goals. The init server (which hosts the Bootstrap RPC) is given more responsibilities: it is now directly in charge of determining which, if any, engines are bootstrapped, and explicitly listens to Gossip as well as the Bootstrap RPC. It returns the cluster ID to the main server start-up goroutine when it is available. As a result, the main startup code has simplified, and a thread to be pulled on further has appeared and is called out in TODOs. Down the road (i.e. in later PRs), the init server will bootstrap a NodeID and StoreIDs even when joining an existing cluster. It will initially mimic/front-load the strategy taken by Node today, i.e. use a `kv.DB`, but ultimately will bypass Gossip completely and use a simple RPC call to ask the existing cluster to assign these IDs as needed. 
This RPC will also establish the active cluster version, which is required for SQL multi-tenancy, and generally follows the ideas in #32574. Release note: None --- pkg/server/config.go | 3 +- pkg/server/init.go | 222 +++++++++++++++++++------ pkg/server/node.go | 77 ++++----- pkg/server/node_test.go | 120 +++++++++++++- pkg/server/server.go | 330 ++++++++++++++++---------------------- pkg/server/server_test.go | 6 +- 6 files changed, 471 insertions(+), 287 deletions(-) diff --git a/pkg/server/config.go b/pkg/server/config.go index eba7e1f01563..e3252112d6cb 100644 --- a/pkg/server/config.go +++ b/pkg/server/config.go @@ -261,7 +261,8 @@ type Config struct { // The argument waitForInit indicates (iff true) that the // server is not bootstrapped yet, will not bootstrap itself and // will be waiting for an `init` command or accept bootstrapping - // from a joined node. + // from a joined node. It is set in an advisory fashion, that is, + // should be used for logging output only. ReadyFn func(waitForInit bool) // DelayedBootstrapFn is called if the boostrap process does not complete diff --git a/pkg/server/init.go b/pkg/server/init.go index f5b195ae43bb..122198d53d30 100644 --- a/pkg/server/init.go +++ b/pkg/server/init.go @@ -14,10 +14,23 @@ import ( "context" "fmt" + "github.com/cockroachdb/cockroach/pkg/clusterversion" + "github.com/cockroachdb/cockroach/pkg/config/zonepb" + "github.com/cockroachdb/cockroach/pkg/gossip" + "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/server/serverpb" + "github.com/cockroachdb/cockroach/pkg/storage" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/syncutil" + "github.com/cockroachdb/cockroach/pkg/util/uuid" + "github.com/cockroachdb/errors" ) +// ErrClusterInitialized is reported when the Boostrap RPC is ran on +// a node already part of an initialized cluster. +var ErrClusterInitialized = fmt.Errorf("cluster has already been initialized") + // initServer manages the temporary init server used during // bootstrapping. type initServer struct { @@ -26,16 +39,153 @@ type initServer struct { // If set, a Bootstrap() call is rejected with this error. rejectErr error } - bootstrapReqCh chan struct{} - connected <-chan struct{} - shouldStop <-chan struct{} + bootstrapBlockCh chan struct{} // blocks Bootstrap() until ServeAndWait() is invoked + bootstrapReqCh chan *initState + + engs []storage.Engine // late-bound in ServeAndWait + + binaryVersion, binaryMinSupportedVersion roachpb.Version + bootstrapVersion clusterversion.ClusterVersion + bootstrapZoneConfig, bootstrapSystemZoneConfig *zonepb.ZoneConfig } -func newInitServer(connected <-chan struct{}, shouldStop <-chan struct{}) *initServer { +func newInitServer( + binaryVersion, binaryMinSupportedVersion roachpb.Version, + bootstrapVersion clusterversion.ClusterVersion, + bootstrapZoneConfig, bootstrapSystemZoneConfig *zonepb.ZoneConfig, +) *initServer { return &initServer{ - bootstrapReqCh: make(chan struct{}), - connected: connected, - shouldStop: shouldStop, + bootstrapReqCh: make(chan *initState, 1), + bootstrapBlockCh: make(chan struct{}), + + binaryVersion: binaryVersion, + binaryMinSupportedVersion: binaryMinSupportedVersion, + bootstrapVersion: bootstrapVersion, + bootstrapZoneConfig: bootstrapZoneConfig, + bootstrapSystemZoneConfig: bootstrapSystemZoneConfig, + } +} + +// initDiskState contains the part of initState that is read from stable +// storage. 
+// +// TODO(tbg): the above is a lie in the case in which we join an existing +// cluster. In that case, the state returned from ServeAndWait will have the +// clusterID set from Gossip (and there will be no NodeID). The plan is to +// allocate the IDs in ServeAndWait itself eventually, at which point the +// lie disappears. +type initDiskState struct { + // nodeID is zero if joining an existing cluster. + // + // TODO(tbg): see TODO above. + nodeID roachpb.NodeID + // All fields below are always set. + clusterID uuid.UUID + clusterVersion clusterversion.ClusterVersion + initializedEngines []storage.Engine + newEngines []storage.Engine +} + +// initState contains the cluster and node IDs as well as the stores, from which +// a CockroachDB server can be started up after ServeAndWait returns. +type initState struct { + initDiskState + // joined is true if this is a new node. Note that the initDiskState may + // reflect the result of bootstrapping a new cluster, i.e. it is not true + // that joined==true implies that the initDiskState shows no initialized + // engines. + // + // This flag should only be used for logging and reporting. A newly + // bootstrapped single-node cluster is functionally equivalent to one that + // restarted; any decisions should be made on persisted data instead of + // this flag. + // + // TODO(tbg): remove this bool. The Node can find out another way whether + // it just joined or restarted. + joined bool + // bootstrapped is true if a new cluster was initialized. If this is true, + // 'joined' above is also true. Usage of this field should follow that of + // 'joined' as well. + bootstrapped bool +} + +// ServeAndWait sets up the initServer to accept Bootstrap requests (which will +// block until then). It uses the provided engines and gossip to block until +// either a new cluster was bootstrapped or Gossip connected to an existing +// cluster. +// +// This method must be called only once. +func (s *initServer) ServeAndWait( + ctx context.Context, stopper *stop.Stopper, engs []storage.Engine, g *gossip.Gossip, +) (*initState, error) { + if s.engs != nil { + return nil, errors.New("cannot call ServeAndWait twice") + } + + s.engs = engs // Bootstrap() is still blocked, so no data race here + + inspectState, err := inspectEngines(ctx, engs, s.binaryVersion, s.binaryMinSupportedVersion) + if err != nil { + return nil, err + } + + if len(inspectState.initializedEngines) != 0 { + // We have a NodeID/ClusterID, so don't allow bootstrap. + if err := s.testOrSetRejectErr(ErrClusterInitialized); err != nil { + return nil, errors.Wrap(err, "error unexpectedly set previously") + } + // If anyone mistakenly tried to bootstrap, unblock them so they can get + // the above error. + close(s.bootstrapBlockCh) + + // In fact, it's crucial that we return early. This is because Gossip + // won't necessarily connect until a leaseholder for range 1 gossips the + // cluster ID, and all nodes in the cluster might be starting up right + // now. Without this return, we could have all nodes in the cluster + // deadlocked on g.Connected below. For similar reasons, we can't ever + // hope to initialize the newEngines below, for which we would need to + // increment a KV counter. 
+ return &initState{ + initDiskState: *inspectState, + joined: false, + bootstrapped: false, + }, nil + } + + log.Info(ctx, "no stores bootstrapped and --join flag specified, awaiting init command or join with an already initialized node.") + close(s.bootstrapBlockCh) + + select { + case <-stopper.ShouldQuiesce(): + return nil, stop.ErrUnavailable + case state := <-s.bootstrapReqCh: + // Bootstrap() did its job. + log.Infof(ctx, "**** cluster %s has been created", state.clusterID) + return state, nil + case <-g.Connected: + // Gossip connected, that is, we know a ClusterID. Due to the early + // return above, we know that all of our engines are empty, i.e. we + // don't have a NodeID yet (and the cluster version is the minimum we + // support). Commence startup; the Node will realize it's short a NodeID + // and will request one. + // + // TODO(tbg): use a kv.DB to get NodeID and StoreIDs when necessary and + // set everything up here. This will take the Node out of that business + // entirely and means we'll need much fewer NodeID/ClusterIDContainers. + // (It's also so much simpler to think about). The RPC will also tell us + // a cluster version to use instead of the lowest possible one (reducing + // the short amount of time until the Gossip hook bumps the version); + // this doesn't fix anything but again, is simpler to think about. + clusterID, err := g.GetClusterID() + if err != nil { + return nil, err + } + inspectState.clusterID = clusterID + return &initState{ + initDiskState: *inspectState, + joined: true, + bootstrapped: false, + }, nil } } @@ -52,54 +202,28 @@ func (s *initServer) testOrSetRejectErr(err error) error { return nil } -type initServerResult int - -const ( - invalidInitResult initServerResult = iota - connectedToCluster - needBootstrap -) - -// awaitBootstrap blocks until the connected channel is closed or a Bootstrap() -// call is received. It returns true if a Bootstrap() call is received, -// instructing the caller to perform cluster bootstrap. It returns false if the -// connected channel is closed, telling the caller that someone else -// bootstrapped the cluster. Assuming that the connected channel comes from -// Gossip, this means that the cluster ID is now available in gossip. -func (s *initServer) awaitBootstrap() (initServerResult, error) { - select { - case <-s.connected: - _ = s.testOrSetRejectErr(fmt.Errorf("already connected to cluster")) - return connectedToCluster, nil - case <-s.bootstrapReqCh: - return needBootstrap, nil - case <-s.shouldStop: - err := fmt.Errorf("stop called while waiting to bootstrap") - _ = s.testOrSetRejectErr(err) - return invalidInitResult, err - } -} - -// Bootstrap unblocks an awaitBootstrap() call. If awaitBootstrap() hasn't been -// called yet, it will not block the next time it's called. +// Bootstrap implements the serverpb.Init service. Users set up a new +// CockroachDB server by calling this endpoint on *exactly one node* in the +// cluster (retrying only on that node). +// Attempting to bootstrap a node that was already bootstrapped will result in +// an error. // -// TODO(andrei): There's a race between gossip connecting and this initServer -// getting a Bootstrap request that allows both to succeed: there's no -// synchronization between gossip and this server and so gossip can succeed in -// propagating one cluster ID while this call succeeds in telling the Server to -// bootstrap and created a new cluster ID. 
We should fix it somehow by tangling -// the gossip.Server with this initServer such that they serialize access to a -// clusterID and decide among themselves a single winner for the race. +// NB: there is no protection against users erroneously bootstrapping multiple +// nodes. In that case, they end up with more than one cluster, and nodes +// panicking or refusing to connect to each other. func (s *initServer) Bootstrap( ctx context.Context, request *serverpb.BootstrapRequest, ) (response *serverpb.BootstrapResponse, err error) { + <-s.bootstrapBlockCh // block until ServeAndWait() is active + // NB: this isn't necessary since bootstrapCluster would fail, but this is + // cleaner. if err := s.testOrSetRejectErr(ErrClusterInitialized); err != nil { return nil, err } - close(s.bootstrapReqCh) + state, err := bootstrapCluster(ctx, s.engs, s.bootstrapVersion, s.bootstrapZoneConfig, s.bootstrapSystemZoneConfig) + if err != nil { + return nil, err + } + s.bootstrapReqCh <- state return &serverpb.BootstrapResponse{}, nil } - -// ErrClusterInitialized is reported when the Boostrap RPC is ran on -// a node already part of an initialized cluster. -var ErrClusterInitialized = fmt.Errorf("cluster has already been initialized") diff --git a/pkg/server/node.go b/pkg/server/node.go index a9105e09ec74..dd22ce370064 100644 --- a/pkg/server/node.go +++ b/pkg/server/node.go @@ -207,7 +207,7 @@ func bootstrapCluster( bootstrapVersion clusterversion.ClusterVersion, defaultZoneConfig *zonepb.ZoneConfig, defaultSystemZoneConfig *zonepb.ZoneConfig, -) (uuid.UUID, error) { +) (*initState, error) { clusterID := uuid.MakeV4() // TODO(andrei): It'd be cool if this method wouldn't do anything to engines // other than the first one, and let regular node startup code deal with them. @@ -221,7 +221,7 @@ func bootstrapCluster( // Initialize the engine backing the store with the store ident and cluster // version. if err := kvserver.InitEngine(ctx, eng, sIdent, bootstrapVersion); err != nil { - return uuid.UUID{}, err + return nil, err } // Create first range, writing directly to engine. Note this does @@ -240,11 +240,21 @@ func bootstrapCluster( bootstrapVersion.Version, len(engines), splits, hlc.UnixNano(), ); err != nil { - return uuid.UUID{}, err + return nil, err } } } - return clusterID, nil + + state := &initState{ + initDiskState: initDiskState{ + nodeID: FirstNodeID, + clusterID: clusterID, + clusterVersion: bootstrapVersion, + initializedEngines: engines, + }, + joined: true, + } + return state, nil } // NewNode returns a new instance of Node. 
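The hunks above change bootstrapCluster to return the new *initState; what follows is a self-contained sketch, not part of the patch, of the channel handshake between Bootstrap() and ServeAndWait() that the new init server relies on. Names and types are simplified stand-ins (no engines, gossip, or stopper), so read it as an illustration of the coordination pattern rather than as the actual implementation.

package main

import (
	"errors"
	"fmt"
)

// initState is a simplified stand-in for the struct of the same name in this
// patch; it only carries what the sketch needs.
type initState struct {
	clusterID string
	joined    bool
}

// initServer models just the two channels that coordinate Bootstrap() with
// ServeAndWait(): one holds Bootstrap() back until ServeAndWait() is
// listening, the other hands the bootstrap result over.
type initServer struct {
	bootstrapBlockCh chan struct{}   // closed once ServeAndWait is active
	bootstrapReqCh   chan *initState // carries the result of a Bootstrap call
}

func newInitServer() *initServer {
	return &initServer{
		bootstrapBlockCh: make(chan struct{}),
		bootstrapReqCh:   make(chan *initState, 1),
	}
}

// Bootstrap mimics the RPC handler: wait until ServeAndWait is listening,
// "bootstrap" a cluster, and hand the resulting state over. A second call
// finds the buffered channel full and is rejected.
func (s *initServer) Bootstrap() error {
	<-s.bootstrapBlockCh
	select {
	case s.bootstrapReqCh <- &initState{clusterID: "c1", joined: true}:
		return nil
	default:
		return errors.New("cluster has already been initialized")
	}
}

// ServeAndWait unblocks Bootstrap and waits for whichever of bootstrap,
// gossip connectivity, or shutdown happens first, the same three-way select
// as in the real ServeAndWait.
func (s *initServer) ServeAndWait(gossipConnected, stop <-chan struct{}) (*initState, error) {
	close(s.bootstrapBlockCh)
	select {
	case <-stop:
		return nil, errors.New("server is shutting down")
	case state := <-s.bootstrapReqCh:
		return state, nil // we bootstrapped the cluster ourselves
	case <-gossipConnected:
		return &initState{clusterID: "from-gossip", joined: true}, nil
	}
}

func main() {
	s := newInitServer()
	go func() { _ = s.Bootstrap() }() // e.g. an operator running `cockroach init`
	state, err := s.ServeAndWait(make(chan struct{}), make(chan struct{}))
	if err != nil {
		panic(err)
	}
	fmt.Println("cluster", state.clusterID, "joined:", state.joined)
}

The buffered bootstrapReqCh is what lets exactly one Bootstrap() hand its result to ServeAndWait(); the real code additionally records ErrClusterInitialized via testOrSetRejectErr so that later attempts fail with a descriptive error rather than hanging.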
@@ -304,27 +314,6 @@ func (n *Node) AnnotateCtxWithSpan( return n.storeCfg.AmbientCtx.AnnotateCtxWithSpan(ctx, opName) } -func (n *Node) bootstrapCluster( - ctx context.Context, - engines []storage.Engine, - bootstrapVersion clusterversion.ClusterVersion, - defaultZoneConfig *zonepb.ZoneConfig, - defaultSystemZoneConfig *zonepb.ZoneConfig, -) error { - if n.initialBoot || n.clusterID.Get() != uuid.Nil { - return fmt.Errorf("cluster has already been initialized with ID %s", n.clusterID.Get()) - } - n.initialBoot = true - clusterID, err := bootstrapCluster(ctx, engines, bootstrapVersion, defaultZoneConfig, defaultSystemZoneConfig) - if err != nil { - return err - } - n.clusterID.Set(ctx, clusterID) - - log.Infof(ctx, "**** cluster %s has been created", clusterID) - return nil -} - func (n *Node) onClusterVersionChange(ctx context.Context, cv clusterversion.ClusterVersion) { if err := n.stores.OnClusterVersionChange(ctx, cv); err != nil { log.Fatal(ctx, errors.Wrapf(err, "updating cluster version to %v", cv)) @@ -339,37 +328,37 @@ func (n *Node) onClusterVersionChange(ctx context.Context, cv clusterversion.Clu func (n *Node) start( ctx context.Context, addr, sqlAddr net.Addr, - initializedEngines, emptyEngines []storage.Engine, + state initState, clusterName string, attrs roachpb.Attributes, locality roachpb.Locality, - cv clusterversion.ClusterVersion, localityAddress []roachpb.LocalityAddress, nodeDescriptorCallback func(descriptor roachpb.NodeDescriptor), ) error { - if err := clusterversion.Initialize(ctx, cv.Version, &n.storeCfg.Settings.SV); err != nil { + if err := clusterversion.Initialize(ctx, state.clusterVersion.Version, &n.storeCfg.Settings.SV); err != nil { return err } // Obtaining the NodeID requires a dance of sorts. If the node has initialized // stores, the NodeID is persisted in each of them. If not, then we'll need to // use the KV store to get a NodeID assigned. - var nodeID roachpb.NodeID - if len(initializedEngines) > 0 { - firstIdent, err := kvserver.ReadStoreIdent(ctx, initializedEngines[0]) - if err != nil { - return err + n.initialBoot = state.joined + nodeID := state.nodeID + if nodeID == 0 { + if !state.joined { + log.Fatalf(ctx, "node has no NodeID, but claims to not be joining cluster") } - nodeID = firstIdent.NodeID - } else { - n.initialBoot = true // Wait until Gossip is connected before trying to allocate a NodeID. // This isn't strictly necessary but avoids trying to use the KV store // before it can possibly work. + // + // TODO(tbg): this should be obsolete as Gossip is always already + // connected at this point. Remove. if err := n.connectGossip(ctx); err != nil { return err } - // If no NodeID has been assigned yet, allocate a new node ID. + + // Allocate NodeID. ctxWithSpan, span := n.AnnotateCtxWithSpan(ctx, "alloc-node-id") newID, err := allocateNodeID(ctxWithSpan, n.storeCfg.DB) if err != nil { @@ -413,7 +402,7 @@ func (n *Node) start( n.storeCfg.ClosedTimestamp.Start(n.Descriptor.NodeID) // Create stores from the engines that were already bootstrapped. - for _, e := range initializedEngines { + for _, e := range state.initializedEngines { s := kvserver.NewStore(ctx, n.storeCfg, e, &n.Descriptor) if err := s.Start(ctx, n.stopper); err != nil { return errors.Errorf("failed to start store: %s", err) @@ -464,7 +453,7 @@ func (n *Node) start( return err } - if len(initializedEngines) != 0 { + if len(state.initializedEngines) != 0 { // Connect gossip before starting bootstrap. This will be necessary // to bootstrap new stores. 
We do it before initializing the NodeID // as well (if needed) to avoid awkward error messages until Gossip @@ -472,14 +461,16 @@ func (n *Node) start( // // NB: if we have no bootstrapped engines, then we've done this above // already. + // + // TODO(tbg): remove, see other caller to this method. if err := n.connectGossip(ctx); err != nil { return err } } // Bootstrap any uninitialized stores. - if len(emptyEngines) > 0 { - if err := n.bootstrapStores(ctx, emptyEngines, n.stopper); err != nil { + if len(state.newEngines) > 0 { + if err := n.bootstrapStores(ctx, state.newEngines, n.stopper); err != nil { return err } } @@ -504,8 +495,8 @@ func (n *Node) start( // bumped immediately, which would be possible if gossip got started earlier). n.startGossip(ctx, n.stopper) - allEngines := append([]storage.Engine(nil), initializedEngines...) - allEngines = append(allEngines, emptyEngines...) + allEngines := append([]storage.Engine(nil), state.initializedEngines...) + allEngines = append(allEngines, state.newEngines...) log.Infof(ctx, "%s: started with %v engine(s) and attributes %v", n, allEngines, attrs.Attrs) return nil } diff --git a/pkg/server/node_test.go b/pkg/server/node_test.go index 67c584eae8c5..7f32b8ad7152 100644 --- a/pkg/server/node_test.go +++ b/pkg/server/node_test.go @@ -14,28 +14,141 @@ import ( "bytes" "context" "fmt" + "net" "reflect" "sort" "testing" + "time" "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/config" "github.com/cockroachdb/cockroach/pkg/config/zonepb" "github.com/cockroachdb/cockroach/pkg/gossip" + "github.com/cockroachdb/cockroach/pkg/gossip/resolver" "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord" "github.com/cockroachdb/cockroach/pkg/kv/kvserver" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/rpc" + "github.com/cockroachdb/cockroach/pkg/rpc/nodedialer" + "github.com/cockroachdb/cockroach/pkg/server/status" "github.com/cockroachdb/cockroach/pkg/server/status/statuspb" "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/metric" + "github.com/cockroachdb/cockroach/pkg/util/netutil" + "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/pkg/errors" + "google.golang.org/grpc" ) +// createTestNode creates an rpc server using the specified address, +// gossip instance, KV database and a node using the specified slice +// of engines. The server, clock and node are returned. If gossipBS is +// not nil, the gossip bootstrap address is set to gossipBS. 
+func createTestNode( + addr net.Addr, gossipBS net.Addr, t *testing.T, +) (*grpc.Server, net.Addr, kvserver.StoreConfig, *Node, *stop.Stopper) { + cfg := kvserver.TestStoreConfig(nil /* clock */) + st := cfg.Settings + + stopper := stop.NewStopper() + nodeRPCContext := rpc.NewContext( + log.AmbientContext{Tracer: cfg.Settings.Tracer}, nodeTestBaseContext, cfg.Clock, stopper, + cfg.Settings) + cfg.RPCContext = nodeRPCContext + cfg.ScanInterval = 10 * time.Hour + grpcServer := rpc.NewServer(nodeRPCContext) + cfg.Gossip = gossip.NewTest( + 0, + nodeRPCContext, + grpcServer, + stopper, + metric.NewRegistry(), + cfg.DefaultZoneConfig, + ) + retryOpts := base.DefaultRetryOptions() + retryOpts.Closer = stopper.ShouldQuiesce() + cfg.AmbientCtx.Tracer = st.Tracer + distSender := kvcoord.NewDistSender(kvcoord.DistSenderConfig{ + AmbientCtx: cfg.AmbientCtx, + Settings: st, + Clock: cfg.Clock, + RPCContext: nodeRPCContext, + RPCRetryOptions: &retryOpts, + NodeDialer: nodedialer.New(nodeRPCContext, gossip.AddressResolver(cfg.Gossip)), + }, cfg.Gossip) + tsf := kvcoord.NewTxnCoordSenderFactory( + kvcoord.TxnCoordSenderFactoryConfig{ + AmbientCtx: cfg.AmbientCtx, + Settings: st, + Clock: cfg.Clock, + Stopper: stopper, + }, + distSender, + ) + cfg.DB = kv.NewDB(cfg.AmbientCtx, tsf, cfg.Clock) + cfg.Transport = kvserver.NewDummyRaftTransport(st) + active, renewal := cfg.NodeLivenessDurations() + cfg.HistogramWindowInterval = metric.TestSampleInterval + cfg.NodeLiveness = kvserver.NewNodeLiveness( + cfg.AmbientCtx, + cfg.Clock, + cfg.DB, + nil, // engines; only used for stall checks + cfg.Gossip, + active, + renewal, + cfg.Settings, + cfg.HistogramWindowInterval, + ) + + kvserver.TimeUntilStoreDead.Override(&cfg.Settings.SV, 10*time.Millisecond) + cfg.StorePool = kvserver.NewStorePool( + cfg.AmbientCtx, + st, + cfg.Gossip, + cfg.Clock, + cfg.NodeLiveness.GetNodeCount, + kvserver.MakeStorePoolNodeLivenessFunc(cfg.NodeLiveness), + /* deterministic */ false, + ) + metricsRecorder := status.NewMetricsRecorder(cfg.Clock, cfg.NodeLiveness, nodeRPCContext, cfg.Gossip, st) + node := NewNode(cfg, metricsRecorder, metric.NewRegistry(), stopper, + kvcoord.MakeTxnMetrics(metric.TestSampleInterval), nil, /* execCfg */ + &nodeRPCContext.ClusterID) + roachpb.RegisterInternalServer(grpcServer, node) + node.storeCfg.ClosedTimestamp.RegisterClosedTimestampServer(grpcServer) + ln, err := netutil.ListenAndServeGRPC(stopper, grpcServer, addr) + if err != nil { + t.Fatal(err) + } + if gossipBS != nil { + // Handle possibility of a :0 port specification. + if gossipBS.Network() == addr.Network() && gossipBS.String() == addr.String() { + gossipBS = ln.Addr() + } + r, err := resolver.NewResolverFromAddress(gossipBS) + if err != nil { + t.Fatal(err) + } + serverCfg := MakeConfig(context.TODO(), st) + serverCfg.GossipBootstrapResolvers = []resolver.Resolver{r} + filtered := serverCfg.FilterGossipBootstrapResolvers( + context.Background(), ln.Addr(), ln.Addr(), + ) + cfg.Gossip.Start(ln.Addr(), filtered) + } + return grpcServer, ln.Addr(), cfg, node, stopper +} + func formatKeys(keys []roachpb.Key) string { var buf bytes.Buffer for i, key := range keys { @@ -109,6 +222,8 @@ func TestBootstrapCluster(t *testing.T) { // TestBootstrapNewStore starts a cluster with two unbootstrapped // stores and verifies both stores are added and started. +// +// TODO(tbg): this test has rotted. 
func TestBootstrapNewStore(t *testing.T) { defer leaktest.AfterTest(t)() ctx := context.Background() @@ -158,6 +273,8 @@ func TestBootstrapNewStore(t *testing.T) { // TestNodeJoin verifies a new node is able to join a bootstrapped // cluster consisting of one node. +// +// TODO(tbg): this test has rotted. func TestNodeJoin(t *testing.T) { defer leaktest.AfterTest(t)() ctx := context.Background() @@ -249,8 +366,7 @@ func TestCorruptedClusterID(t *testing.T) { t.Fatal(err) } - var c base.ClusterIDContainer - _, _, _, err := inspectEngines(ctx, []storage.Engine{e}, cv.Version, cv.Version, &c) + _, err := inspectEngines(ctx, []storage.Engine{e}, cv.Version, cv.Version) if !testutils.IsError(err, `partially initialized`) { t.Fatal(err) } diff --git a/pkg/server/server.go b/pkg/server/server.go index 9870746d0c48..b9944fc1cfc3 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -173,23 +173,26 @@ type Server struct { startTime time.Time rpcContext *rpc.Context // The gRPC server on which the different RPC handlers will be registered. - grpc *grpcServer - gossip *gossip.Gossip - nodeDialer *nodedialer.Dialer - nodeLiveness *kvserver.NodeLiveness - storePool *kvserver.StorePool - tcsFactory *kvcoord.TxnCoordSenderFactory - distSender *kvcoord.DistSender - db *kv.DB - pgServer *pgwire.Server - distSQLServer *distsql.ServerImpl - node *Node - registry *metric.Registry - recorder *status.MetricsRecorder - runtime *status.RuntimeStatSampler - admin *adminServer - status *statusServer - authentication *authenticationServer + grpc *grpcServer + gossip *gossip.Gossip + nodeDialer *nodedialer.Dialer + nodeLiveness *kvserver.NodeLiveness + storePool *kvserver.StorePool + tcsFactory *kvcoord.TxnCoordSenderFactory + distSender *kvcoord.DistSender + db *kv.DB + pgServer *pgwire.Server + distSQLServer *distsql.ServerImpl + node *Node + registry *metric.Registry + recorder *status.MetricsRecorder + runtime *status.RuntimeStatSampler + admin *adminServer + status *statusServer + authentication *authenticationServer + // initServer receives requests to bootstrap a new cluster (via the + // Bootstrap RPC). This server will never accept requests if any of the + // stores is already initialized. initServer *initServer tsDB *ts.DB tsServer ts.Server @@ -700,13 +703,20 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { gw.RegisterService(s.grpc.Server) } - // TODO(andrei): We're creating an initServer even through the inspection of - // our engines in Server.Start() might reveal that we're already bootstrapped - // and so we don't need to accept a Bootstrap RPC. The creation of this server - // early means that a Bootstrap RPC might erroneously succeed. We should - // figure out early if our engines are bootstrapped and, if they are, create a - // dummy implementation of the InitServer that rejects all RPCs. 
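The TODO above is resolved in this patch without a dummy server: ServeAndWait records ErrClusterInitialized via testOrSetRejectErr as soon as it finds an initialized engine, and any later Bootstrap call is rejected with that error. The body of testOrSetRejectErr is not part of this diff; the sketch below shows one way such a set-once guard is typically implemented, with simplified, illustrative names.

package main

import (
	"errors"
	"fmt"
	"sync"
)

var errClusterInitialized = errors.New("cluster has already been initialized")

// rejectableInit demonstrates the "set once, then reject" idiom: whichever of
// "cluster found to be initialized" and "Bootstrap RPC arrives" happens first
// wins, and everyone else observes the same error.
type rejectableInit struct {
	mu struct {
		sync.Mutex
		rejectErr error
	}
}

// testOrSetRejectErr returns the previously set error, if any, and otherwise
// records err so that later callers are rejected with it.
func (s *rejectableInit) testOrSetRejectErr(err error) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.mu.rejectErr != nil {
		return s.mu.rejectErr
	}
	s.mu.rejectErr = err
	return nil
}

func main() {
	var s rejectableInit
	// Startup discovers an initialized store and disables bootstrap.
	fmt.Println(s.testOrSetRejectErr(errClusterInitialized)) // <nil>: we set it
	// A later Bootstrap RPC is rejected with the recorded error.
	if err := s.testOrSetRejectErr(errClusterInitialized); err != nil {
		fmt.Println("bootstrap rejected:", err)
	}
}

Because the first caller to set the error wins, it does not matter whether the initialized-engines check or a racing Bootstrap RPC gets there first; both sides observe a single consistent outcome.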
- s.initServer = newInitServer(s.gossip.Connected, s.stopper.ShouldStop()) + bootstrapVersion := cfg.Settings.Version.BinaryVersion() + if knobs := cfg.TestingKnobs.Server; knobs != nil { + if ov := knobs.(*TestingKnobs).BootstrapVersionOverride; ov != (roachpb.Version{}) { + bootstrapVersion = ov + } + } + + s.initServer = newInitServer( + cfg.Settings.Version.BinaryVersion(), + cfg.Settings.Version.BinaryMinSupportedVersion(), + clusterversion.ClusterVersion{Version: bootstrapVersion}, + &cfg.DefaultZoneConfig, + &cfg.DefaultSystemZoneConfig, + ) serverpb.RegisterInitServer(s.grpc.Server, s.initServer) nodeInfo := sql.NodeInfo{ @@ -972,50 +982,51 @@ type ListenError struct { Addr string } -// inspectEngines goes through engines and checks which ones are bootstrapped -// and which ones are empty. -// It also calls SynthesizeClusterVersionFromEngines to get the cluster version, -// or to set it if no engines have a version in them already. +// inspectEngines goes through engines and populates in initDiskState. It also +// calls SynthesizeClusterVersionFromEngines, which selects and backfills the +// cluster version to all initialized engines. +// +// The initDiskState returned by this method will reflect a zero NodeID if none +// has been assigned yet (i.e. if none of the engines is initialized). func inspectEngines( ctx context.Context, engines []storage.Engine, binaryVersion, binaryMinSupportedVersion roachpb.Version, - clusterIDContainer *base.ClusterIDContainer, -) ( - bootstrappedEngines []storage.Engine, - emptyEngines []storage.Engine, - _ clusterversion.ClusterVersion, - _ error, -) { +) (*initDiskState, error) { + state := &initDiskState{} + for _, eng := range engines { storeIdent, err := kvserver.ReadStoreIdent(ctx, eng) if _, notBootstrapped := err.(*kvserver.NotBootstrappedError); notBootstrapped { - emptyEngines = append(emptyEngines, eng) + state.newEngines = append(state.newEngines, eng) continue } else if err != nil { - return nil, nil, clusterversion.ClusterVersion{}, err + return nil, err } + + if state.clusterID != uuid.Nil && state.clusterID != storeIdent.ClusterID { + return nil, errors.Errorf("conflicting store ClusterIDs: %s, %s", storeIdent.ClusterID, state.clusterID) + } + state.clusterID = storeIdent.ClusterID + if storeIdent.StoreID == 0 || storeIdent.NodeID == 0 || storeIdent.ClusterID == uuid.Nil { - return nil, nil, clusterversion.ClusterVersion{}, - errors.Errorf("partially initialized store: %+v", storeIdent) + return nil, errors.Errorf("partially initialized store: %+v", storeIdent) } - clusterID := clusterIDContainer.Get() - if storeIdent.ClusterID != uuid.Nil { - if clusterID == uuid.Nil { - clusterIDContainer.Set(ctx, storeIdent.ClusterID) - } else if storeIdent.ClusterID != clusterID { - return nil, nil, clusterversion.ClusterVersion{}, - errors.Errorf("conflicting store cluster IDs: %s, %s", storeIdent.ClusterID, clusterID) - } + + if state.nodeID != 0 && state.nodeID != storeIdent.NodeID { + return nil, errors.Errorf("conflicting store NodeIDs: %s, %s", storeIdent.NodeID, state.nodeID) } - bootstrappedEngines = append(bootstrappedEngines, eng) + state.nodeID = storeIdent.NodeID + + state.initializedEngines = append(state.initializedEngines, eng) } - cv, err := kvserver.SynthesizeClusterVersionFromEngines(ctx, bootstrappedEngines, binaryVersion, binaryMinSupportedVersion) + cv, err := kvserver.SynthesizeClusterVersionFromEngines(ctx, state.initializedEngines, binaryVersion, binaryMinSupportedVersion) if err != nil { - return nil, nil, 
clusterversion.ClusterVersion{}, err + return nil, err } - return bootstrappedEngines, emptyEngines, cv, nil + state.clusterVersion = cv + return state, nil } // listenerInfo is a helper used to write files containing various listener @@ -1445,21 +1456,13 @@ func (s *Server) Start(ctx context.Context) error { } } - bootstrappedEngines, _, _, err := inspectEngines( - ctx, s.engines, - s.cfg.Settings.Version.BinaryVersion(), - s.cfg.Settings.Version.BinaryMinSupportedVersion(), - &s.rpcContext.ClusterID) - if err != nil { - return errors.Wrap(err, "inspecting engines") - } - // Filter the gossip bootstrap resolvers based on the listen and // advertise addresses. listenAddrU := util.NewUnresolvedAddr("tcp", s.cfg.Addr) advAddrU := util.NewUnresolvedAddr("tcp", s.cfg.AdvertiseAddr) advSQLAddrU := util.NewUnresolvedAddr("tcp", s.cfg.SQLAdvertiseAddr) filtered := s.cfg.FilterGossipBootstrapResolvers(ctx, listenAddrU, advAddrU) + s.gossip.Start(advAddrU, filtered) log.Event(ctx, "started gossip") @@ -1467,25 +1470,45 @@ func (s *Server) Start(ctx context.Context) error { defer time.AfterFunc(30*time.Second, s.cfg.DelayedBootstrapFn).Stop() } - var hlcUpperBoundExists bool - // doBootstrap is set if we're the ones who bootstrapped the cluster. - var doBootstrap bool - if len(bootstrappedEngines) > 0 { - // The cluster was already initialized. - doBootstrap = false - if s.cfg.ReadyFn != nil { - s.cfg.ReadyFn(false /*waitForInit*/) - } + if len(s.cfg.GossipBootstrapResolvers) == 0 { + // If the node is started without join flags, attempt self-bootstrap. + // Note that we're not checking whether the node is already bootstrapped; + // if this is the case, Bootstrap will simply fail. + _ = s.stopper.RunAsyncTask(ctx, "bootstrap", func(ctx context.Context) { + _, err := s.initServer.Bootstrap(ctx, &serverpb.BootstrapRequest{}) + switch err { + case nil: + log.Infof(ctx, "**** add additional nodes by specifying --join=%s", s.cfg.AdvertiseAddr) + case ErrClusterInitialized: + default: + log.Fatalf(ctx, "bootstrap: %s", err) + } + }) + } - hlcUpperBound, err := kvserver.ReadMaxHLCUpperBound(ctx, bootstrappedEngines) - if err != nil { - log.Fatal(ctx, err) - } + // This opens the main listener. + startRPCServer(workersCtx) - if hlcUpperBound > 0 { - hlcUpperBoundExists = true - } + state, err := s.initServer.ServeAndWait(ctx, s.stopper, s.engines, s.gossip) + if err != nil { + log.Fatal(ctx, errors.Wrap(err, "during init")) + } + s.rpcContext.ClusterID.Set(ctx, state.clusterID) + // If there's no NodeID here, then we didn't just bootstrap. The Node will + // read its ID from the stores or request a new one via KV. + if state.nodeID != 0 { + s.rpcContext.NodeID.Set(ctx, state.nodeID) + } + + // NB: if this store is freshly bootstrapped (or no upper bound was + // persisted), hlcUpperBound will be zero. + hlcUpperBound, err := kvserver.ReadMaxHLCUpperBound(ctx, s.engines) + if err != nil { + log.Fatal(ctx, err) + } + + if hlcUpperBound > 0 { ensureClockMonotonicity( ctx, s.clock, @@ -1493,88 +1516,30 @@ func (s *Server) Start(ctx context.Context) error { hlcUpperBound, timeutil.SleepUntil, ) + } - // Ensure that any subsequent use of `cockroach init` will receive - // an error "the cluster was already initialized." - if _, err := s.initServer.Bootstrap(ctx, &serverpb.BootstrapRequest{}); err != nil { - log.Fatal(ctx, err) - } - } else { - // We have no existing stores. We start an initServer and then wait for - // one of the following: - // - // - gossip connects (i.e. 
we're joining an existing cluster, perhaps - // freshly bootstrapped but this node doesn't have to know) - // - we auto-bootstrap (if no join flags were given) - // - a client bootstraps a cluster via node. + ready := make(chan struct{}) + if s.cfg.ReadyFn != nil { + // s.cfg.ReadyFn must be called in any case because the `start` + // command requires it to signal readiness to a process manager. // - // TODO(knz): This may need tweaking when #24118 is addressed. - - startRPCServer(workersCtx) - - ready := make(chan struct{}) - if s.cfg.ReadyFn != nil { - // s.cfg.ReadyFn must be called in any case because the `start` - // command requires it to signal readiness to a process manager. - // - // However we want to be somewhat precisely informative to the user - // about whether the node is waiting on init / join, or whether - // the join was successful straight away. So we spawn this goroutine - // and either: - // - its timer will fire after 2 seconds and we call ReadyFn(true) - // - bootstrap completes earlier and the ready chan gets closed, - // then we call ReadyFn(false). - go func() { - waitForInit := false - tm := time.After(2 * time.Second) - select { - case <-tm: - waitForInit = true - case <-ready: - } - s.cfg.ReadyFn(waitForInit) - }() - } - - log.Info(ctx, "no stores bootstrapped and --join flag specified, awaiting init command or join with an already initialized node.") - - if len(s.cfg.GossipBootstrapResolvers) == 0 { - // If the _unfiltered_ list of hosts from the --join flag is - // empty, then this node can bootstrap a new cluster. We disallow - // this if this node is being started with itself specified as a - // --join host, because that's too likely to be operator error. - if _, err := s.initServer.Bootstrap(ctx, &serverpb.BootstrapRequest{}); err != nil { - return errors.Wrap(err, "while bootstrapping") - } - log.Infof(ctx, "**** add additional nodes by specifying --join=%s", s.cfg.AdvertiseAddr) - } - - initRes, err := s.initServer.awaitBootstrap() - close(ready) - if err != nil { - return err - } - - doBootstrap = initRes == needBootstrap - if doBootstrap { - if err := s.bootstrapCluster(ctx, s.bootstrapVersion()); err != nil { - return err + // However we want to be somewhat precisely informative to the user + // about whether the node is waiting on init / join, or whether + // the join was successful straight away. So we spawn this goroutine + // and either: + // - its timer will fire after 2 seconds and we call ReadyFn(true) + // - bootstrap completes earlier and the ready chan gets closed, + // then we call ReadyFn(false). + go func() { + waitForInit := false + tm := time.After(2 * time.Second) + select { + case <-tm: + waitForInit = true + case <-ready: } - } - } - - // This opens the main listener. - startRPCServer(workersCtx) - - // We ran this before, but might've bootstrapped in the meantime. This time - // we'll get the actual list of bootstrapped and empty engines. 
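To make the ReadyFn contract easier to follow (advisory only, used for logging output), here is a self-contained sketch of the same timer-versus-ready-channel pattern in isolation. The helper name and the explicit grace period are illustrative; the patch hard-codes two seconds inline.

package main

import (
	"fmt"
	"time"
)

// notifyReady reports readiness to a process manager exactly once: if the
// ready channel closes within the grace period, waitForInit is false;
// otherwise the timer fires first and waitForInit is true. This mirrors the
// goroutine around s.cfg.ReadyFn, with the grace period made explicit.
func notifyReady(ready <-chan struct{}, grace time.Duration, readyFn func(waitForInit bool)) {
	go func() {
		waitForInit := false
		select {
		case <-time.After(grace):
			waitForInit = true
		case <-ready:
		}
		readyFn(waitForInit)
	}()
}

func main() {
	ready := make(chan struct{})
	done := make(chan struct{})
	notifyReady(ready, 50*time.Millisecond, func(waitForInit bool) {
		// In the real server this distinguishes "waiting on init/join" from
		// "joined straight away" in the output shown to the operator.
		fmt.Println("waitForInit:", waitForInit)
		close(done)
	})
	close(ready) // e.g. startup completed before the grace period elapsed
	<-done
}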
- bootstrappedEngines, emptyEngines, cv, err := inspectEngines( - ctx, s.engines, - s.cfg.Settings.Version.BinaryVersion(), - s.cfg.Settings.Version.BinaryMinSupportedVersion(), - &s.rpcContext.ClusterID) - if err != nil { - return errors.Wrap(err, "inspecting engines") + s.cfg.ReadyFn(waitForInit) + }() } // Record a walltime that is lower than the lowest hlc timestamp this current @@ -1588,11 +1553,10 @@ func (s *Server) Start(ctx context.Context) error { if err := s.node.start( ctx, advAddrU, advSQLAddrU, - bootstrappedEngines, emptyEngines, + *state, s.cfg.ClusterName, s.cfg.NodeAttributes, s.cfg.Locality, - cv, s.cfg.LocalityAddresses, s.execCfg.DistSQLPlanner.SetNodeDesc, ); err != nil { @@ -1600,7 +1564,7 @@ func (s *Server) Start(ctx context.Context) error { } log.Event(ctx, "started node") s.startPersistingHLCUpperBound( - hlcUpperBoundExists, + hlcUpperBound > 0, func(t int64) error { /* function to persist upper bound of HLC to all stores */ return s.node.SetHLCUpperBound(context.Background(), t) }, @@ -1660,6 +1624,25 @@ func (s *Server) Start(ctx context.Context) error { log.Event(ctx, "accepting connections") + if state.bootstrapped { + // If a new cluster is just starting up, force all the system ranges + // through the replication queue so they upreplicate as quickly as + // possible when a new node joins. Without this code, the upreplication + // would be up to the whim of the scanner, which might be too slow for + // new clusters. + // TODO(tbg): instead of this dubious band-aid we should make the + // replication queue reactive enough to avoid relying on the scanner + // alone. + var done bool + return s.node.stores.VisitStores(func(store *kvserver.Store) error { + if !done { + done = true + return store.ForceReplicationScanAndProcess() + } + return nil + }) + } + // Begin the node liveness heartbeat. Add a callback which records the local // store "last up" timestamp for every store whenever the liveness record is // updated. @@ -2071,37 +2054,6 @@ func (s *Server) startServeSQL( return nil } -func (s *Server) bootstrapVersion() roachpb.Version { - v := s.cfg.Settings.Version.BinaryVersion() - if knobs := s.cfg.TestingKnobs.Server; knobs != nil { - if ov := knobs.(*TestingKnobs).BootstrapVersionOverride; ov != (roachpb.Version{}) { - v = ov - } - } - return v -} - -func (s *Server) bootstrapCluster(ctx context.Context, bootstrapVersion roachpb.Version) error { - if err := s.node.bootstrapCluster( - ctx, s.engines, clusterversion.ClusterVersion{Version: bootstrapVersion}, - &s.cfg.DefaultZoneConfig, &s.cfg.DefaultSystemZoneConfig, - ); err != nil { - return err - } - // Force all the system ranges through the replication queue so they - // upreplicate as quickly as possible when a new node joins. Without this - // code, the upreplication would be up to the whim of the scanner, which - // might be too slow for new clusters. 
- done := false - return s.node.stores.VisitStores(func(store *kvserver.Store) error { - if !done { - done = true - return store.ForceReplicationScanAndProcess() - } - return nil - }) -} - func (s *Server) doDrain( ctx context.Context, modes []serverpb.DrainMode, setTo bool, ) ([]serverpb.DrainMode, error) { diff --git a/pkg/server/server_test.go b/pkg/server/server_test.go index 323b3cb47ad9..9173cbc58a10 100644 --- a/pkg/server/server_test.go +++ b/pkg/server/server_test.go @@ -684,9 +684,9 @@ func TestClusterIDMismatch(t *testing.T) { engines[i] = e } - _, _, _, err := inspectEngines( - context.TODO(), engines, roachpb.Version{}, roachpb.Version{}, &base.ClusterIDContainer{}) - expected := "conflicting store cluster IDs" + _, err := inspectEngines( + context.TODO(), engines, roachpb.Version{}, roachpb.Version{}) + expected := "conflicting store ClusterIDs" if !testutils.IsError(err, expected) { t.Fatalf("expected %s error, got %v", expected, err) }
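The "conflicting store ClusterIDs" expectation above comes from the reworked inspectEngines. As a closing illustration, here is a self-contained sketch of the per-store consistency checks it performs; storeIdent and diskState are simplified stand-ins for kvserver.StoreIdent and the new initDiskState, and the real function additionally synthesizes the cluster version across the initialized engines.

package main

import (
	"errors"
	"fmt"
)

// storeIdent and diskState are simplified stand-ins for the real types.
type storeIdent struct {
	clusterID string // a uuid in the real code
	nodeID    int32
	storeID   int32
}

type diskState struct {
	clusterID   string
	nodeID      int32
	initialized int
	new         int
}

// inspectIdents mirrors the checks in the new inspectEngines: unbootstrapped
// engines are counted as new, partially written idents are rejected, and all
// initialized engines must agree on ClusterID and NodeID.
func inspectIdents(idents []*storeIdent) (*diskState, error) {
	s := &diskState{}
	for _, id := range idents {
		if id == nil { // engine not bootstrapped yet
			s.new++
			continue
		}
		if id.storeID == 0 || id.nodeID == 0 || id.clusterID == "" {
			return nil, fmt.Errorf("partially initialized store: %+v", id)
		}
		if s.clusterID != "" && s.clusterID != id.clusterID {
			return nil, errors.New("conflicting store ClusterIDs")
		}
		s.clusterID = id.clusterID
		if s.nodeID != 0 && s.nodeID != id.nodeID {
			return nil, errors.New("conflicting store NodeIDs")
		}
		s.nodeID = id.nodeID
		s.initialized++
	}
	return s, nil
}

func main() {
	_, err := inspectIdents([]*storeIdent{
		{clusterID: "a", nodeID: 1, storeID: 1},
		{clusterID: "b", nodeID: 1, storeID: 2}, // written by a different cluster
	})
	fmt.Println(err) // conflicting store ClusterIDs
}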