From 7410cb1a2b867bcffbd06da555b68f76ad785ea1 Mon Sep 17 00:00:00 2001
From: irfan sharif
Date: Fri, 10 Jul 2020 01:04:11 -0400
Subject: [PATCH] server: introduce join rpc for node id allocation

This mostly follows the ideas in #32574, and serves as a crucial
building block for #48843. Specifically, this PR introduces a new Join
RPC that new nodes can use, addressing already initialized nodes, to
learn the cluster ID and their own node IDs. Previously, joining nodes
were responsible for allocating their own IDs and used gossip to
discover the cluster ID.

By moving towards a more understandable flow of how nodes join the
cluster, we can build a few useful primitives on top of this:

- we can prevent mismatched-version nodes from joining the cluster
- we can prevent decommissioned nodes from joining the cluster
- we can add the liveness record for a given node as soon as it joins,
  which would simplify our liveness record handling code that is
  perennially concerned with missing liveness records

The tiny bit of complexity in this PR comes from how we migrate into
this behavior from the old one. To that end we retain the earlier
gossip-based cluster ID discovery and self-allocated node ID behavior.
Nodes with this patch will attempt to use the Join RPC if it is
implemented on the addressed node, and fall back to the previous
behavior if not. It wasn't possible to use cluster versions for this
migration because it happens very early in the node startup process,
and the version gating this change will not be active until much later
in the crdb process lifetime.

---

There are some leftover TODOs that I'm looking to address in this PR.
They should be tiny, and easy to retrofit into what we have so far.
Specifically, I'm going to plumb the client address into the RPC so the
server is able to generate backlinks (and solve the bidirectionality
problem). I'm also going to try to add the liveness record for a
joining node as part of the Join RPC. Right now the tests verifying
connectivity/bootstrap/join flags pass out of the box, but I'm going to
try adding more randomized testing here to exercise full connectivity
once I address these TODOs.

Release note: None
---
 pkg/cli/init.go                          |   1 -
 pkg/cli/start.go                         |   2 +-
 pkg/clusterversion/cockroach_versions.go |   1 -
 pkg/cmd/roachtest/decommission.go        |   4 +
 pkg/gossip/client.go                     |   2 +-
 pkg/server/config.go                     |  13 +-
 pkg/server/config_test.go                |   7 +-
 pkg/server/grpc_server.go                |   1 +
 pkg/server/init.go                       | 498 ++++++++++++++++++-----
 pkg/server/node.go                       |   4 +
 pkg/server/server.go                     |  65 +--
 pkg/server/serverpb/init.pb.go           | 482 +++++++++++++++++++++-
 pkg/server/serverpb/init.proto           |  39 +-
 pkg/util/grpcutil/log.go                 |   8 +-
 14 files changed, 953 insertions(+), 174 deletions(-)

diff --git a/pkg/cli/init.go b/pkg/cli/init.go
index d8e34c5626c2..3cc744e8c7ab 100644
--- a/pkg/cli/init.go
+++ b/pkg/cli/init.go
@@ -51,7 +51,6 @@ func runInit(cmd *cobra.Command, args []string) error {
 	// Actually perform cluster initialization.
c := serverpb.NewInitClient(conn) - if _, err = c.Bootstrap(ctx, &serverpb.BootstrapRequest{}); err != nil { return err } diff --git a/pkg/cli/start.go b/pkg/cli/start.go index 6e8b7ee97ec2..99484cde6e76 100644 --- a/pkg/cli/start.go +++ b/pkg/cli/start.go @@ -670,7 +670,7 @@ func runStart(cmd *cobra.Command, args []string, disableReplication bool) error if waitForInit { log.Shout(ctx, log.Severity_INFO, - "initial startup completed.\n"+ + "initial startup completed\n"+ "Node will now attempt to join a running cluster, or wait for `cockroach init`.\n"+ "Client connections will be accepted after this completes successfully.\n"+ "Check the log file(s) for progress. ") diff --git a/pkg/clusterversion/cockroach_versions.go b/pkg/clusterversion/cockroach_versions.go index f14d38e4f5c8..0b7f67c4c9af 100644 --- a/pkg/clusterversion/cockroach_versions.go +++ b/pkg/clusterversion/cockroach_versions.go @@ -559,7 +559,6 @@ var versionsSingleton = keyedVersions([]keyedVersion{ Version: roachpb.Version{Major: 20, Minor: 1, Unstable: 14}, }, // Add new versions here (step two of two). - }) // TODO(irfansharif): clusterversion.binary{,MinimumSupported}Version diff --git a/pkg/cmd/roachtest/decommission.go b/pkg/cmd/roachtest/decommission.go index bd1b89c9282c..ae4d775494b2 100644 --- a/pkg/cmd/roachtest/decommission.go +++ b/pkg/cmd/roachtest/decommission.go @@ -319,6 +319,10 @@ func runDecommissionRecommission(ctx context.Context, t *test, c *cluster) { Multiplier: 2, } + // XXX: Hack, to see node come into liveness table. We should do this as + // part of the connect rpc. + time.Sleep(10 * time.Second) + // Partially decommission then recommission a random node, from another // random node. Run a couple of status checks while doing so. { diff --git a/pkg/gossip/client.go b/pkg/gossip/client.go index 9b8c536c160f..c0c0dd519263 100644 --- a/pkg/gossip/client.go +++ b/pkg/gossip/client.go @@ -275,7 +275,7 @@ func (c *client) handleResponse(ctx context.Context, g *Gossip, reply *Response) // Check whether this outgoing client is duplicating work already // being done by an incoming client, either because an outgoing // matches an incoming or the client is connecting to itself. - if nodeID := g.NodeID.Get(); nodeID == c.peerID { + if nodeID := g.NodeID.Get(); nodeID != 0 && nodeID == c.peerID { return errors.Errorf("stopping outgoing client to n%d (%s); loopback connection", c.peerID, c.addr) } else if g.hasIncomingLocked(c.peerID) && nodeID > c.peerID { // To avoid mutual shutdown, we only shutdown our client if our diff --git a/pkg/server/config.go b/pkg/server/config.go index 547e423e85ec..3a894dc39086 100644 --- a/pkg/server/config.go +++ b/pkg/server/config.go @@ -172,8 +172,9 @@ type KVConfig struct { // in zone configs. Attrs string - // JoinList is a list of node addresses that act as bootstrap hosts for - // connecting to the gossip network. + // JoinList is a list of node addresses that is used to form a network of KV + // servers. Assuming a connected graph, it suffices to initialize any server + // in the network. JoinList base.JoinListType // JoinPreferSRVRecords, if set, causes the lookup logic for the @@ -646,11 +647,13 @@ func (cfg *Config) InitNode(ctx context.Context) error { // FilterGossipBootstrapResolvers removes any gossip bootstrap resolvers which // match either this node's listen address or its advertised host address. 
-func (cfg *Config) FilterGossipBootstrapResolvers( - ctx context.Context, listen, advert net.Addr, -) []resolver.Resolver { +func (cfg *Config) FilterGossipBootstrapResolvers(ctx context.Context) []resolver.Resolver { + var listen, advert net.Addr + listen = util.NewUnresolvedAddr("tcp", cfg.Addr) + advert = util.NewUnresolvedAddr("tcp", cfg.AdvertiseAddr) filtered := make([]resolver.Resolver, 0, len(cfg.GossipBootstrapResolvers)) addrs := make([]string, 0, len(cfg.GossipBootstrapResolvers)) + for _, r := range cfg.GossipBootstrapResolvers { if r.Addr() == advert.String() || r.Addr() == listen.String() { if log.V(1) { diff --git a/pkg/server/config_test.go b/pkg/server/config_test.go index 71824616979d..d88f14ab868c 100644 --- a/pkg/server/config_test.go +++ b/pkg/server/config_test.go @@ -21,7 +21,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/gossip/resolver" "github.com/cockroachdb/cockroach/pkg/settings/cluster" - "github.com/cockroachdb/cockroach/pkg/util" "github.com/cockroachdb/cockroach/pkg/util/envutil" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -191,10 +190,10 @@ func TestFilterGossipBootstrapResolvers(t *testing.T) { } cfg := MakeConfig(context.Background(), cluster.MakeTestingClusterSettings()) cfg.GossipBootstrapResolvers = resolvers + cfg.Addr = resolverSpecs[0] + cfg.AdvertiseAddr = resolverSpecs[2] - listenAddr := util.MakeUnresolvedAddr("tcp", resolverSpecs[0]) - advertAddr := util.MakeUnresolvedAddr("tcp", resolverSpecs[2]) - filtered := cfg.FilterGossipBootstrapResolvers(context.Background(), &listenAddr, &advertAddr) + filtered := cfg.FilterGossipBootstrapResolvers(context.Background()) if len(filtered) != 1 { t.Fatalf("expected one resolver; got %+v", filtered) } else if filtered[0].Addr() != resolverSpecs[1] { diff --git a/pkg/server/grpc_server.go b/pkg/server/grpc_server.go index 08627cbdc694..8de431df78bd 100644 --- a/pkg/server/grpc_server.go +++ b/pkg/server/grpc_server.go @@ -68,6 +68,7 @@ var rpcsAllowedWhileBootstrapping = map[string]struct{}{ "/cockroach.rpc.Heartbeat/Ping": {}, "/cockroach.gossip.Gossip/Gossip": {}, "/cockroach.server.serverpb.Init/Bootstrap": {}, + "/cockroach.server.serverpb.Init/Join": {}, "/cockroach.server.serverpb.Admin/Health": {}, } diff --git a/pkg/server/init.go b/pkg/server/init.go index 0e93ca3d25b5..cab5ec653644 100644 --- a/pkg/server/init.go +++ b/pkg/server/init.go @@ -13,97 +13,134 @@ package server import ( "context" "fmt" + "strings" + "time" "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/config/zonepb" "github.com/cockroachdb/cockroach/pkg/gossip" + "github.com/cockroachdb/cockroach/pkg/gossip/resolver" + "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvserver" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/rpc" "github.com/cockroachdb/cockroach/pkg/server/serverpb" "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/storage" + "github.com/cockroachdb/cockroach/pkg/util/grpcutil" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/cockroachdb/errors" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) // ErrClusterInitialized is reported when the Bootstrap RPC 
is run on // a node that is already part of an initialized cluster. var ErrClusterInitialized = fmt.Errorf("cluster has already been initialized") +// ErrNodeUninitialized is reported when the Join RPC is run against +// a node that itself is uninitialized. +var ErrNodeUninitialized = fmt.Errorf("node has not been initialized") + +// ErrJoinRPCUnimplemented is reported when the Join RPC is run against +// a node that does not know about the join RPC (i.e. it is running 20.1 or +// below). +// +// TODO(irfansharif): Remove this in 21.1. +var ErrJoinRPCUnimplemented = fmt.Errorf("node does not implement join rpc") + // initServer handles the bootstrapping process. It is instantiated early in the // server startup sequence to determine whether a NodeID and ClusterID are // available (true if and only if an initialized store is present). If all // engines are empty, either a new cluster needs to be started (via incoming -// Bootstrap RPC) or an existing one joined. Either way, the goal is to learn a -// ClusterID and NodeID (and initialize at least one store). All of this -// subtlety is encapsulated by the initServer, which offers a primitive -// ServeAndWait() after which point the startup code can assume that the -// Node/ClusterIDs are known. -// -// TODO(tbg): at the time of writing, when joining an existing cluster for the -// first time, the initServer provides only the clusterID. Fix this by giving -// the initServer a *kv.DB that it can use to assign a NodeID and StoreID, and -// later by switching to the connect RPC (#32574). +// Bootstrap RPC) or an existing one joined (via the outgoing Join RPC). Either +// way, the goal is to learn a ClusterID and NodeID (and initialize at least one +// store). All of this subtlety is encapsulated by the initServer, which offers +// a primitive ServeAndWait() after which point the startup code can assume that +// the Node/ClusterIDs are known. type initServer struct { + log.AmbientContext + // rpcContext embeds the fields needed by the RPC subsystem (the init server + // sends out Join RPCs during bootstrap). + rpcContext *rpc.Context + + // config houses a few configuration options needed by the init server. + config initServerCfg + mu struct { + // This mutex is grabbed during bootstrap and is used to serialized + // bootstrap attempts (and attempts to read whether or not his node has + // been bootstrapped). syncutil.Mutex - // If set, a Bootstrap() call is rejected with this error. + // If we encounter an unrecognized error during bootstrap, we use this + // field to block out future bootstrap attempts. rejectErr error } - // The version at which to bootstrap the cluster in Bootstrap(). - bootstrapVersion roachpb.Version - // The zone configs to bootstrap with. - bootstrapZoneConfig, bootstrapSystemZoneConfig *zonepb.ZoneConfig + // The state of the engines. This tells us whether the node is already - // bootstrapped. The goal of the initServer is to complete this by the - // time ServeAndWait returns. + // bootstrapped. The goal of the initServer is to stub this out by the time + // ServeAndWait returns. + // + // TODO(irfansharif): The ownership/access of this is a bit all over the + // place, and can be sanitized. inspectState *initDiskState - // If Bootstrap() succeeds, resulting initState will go here (to be consumed - // by ServeAndWait). + // If this CRDB node was `cockroach init`-ialized, the resulting init state + // will be passed through to this channel. 
bootstrapReqCh chan *initState + // If this CRDB node was able to join a bootstrapped cluster, the resulting + // init state will be passed through to this channel. + joinCh chan *initState + + // We hold onto a *kv.DB object to assign node IDs through the Join RPC. + db *kv.DB + + // resolvers is a list of node addresses that is used to form a connected + // graph/network of CRDB servers. Once a connected graph is constructed, it + // suffices for any node in the network to be initialized (which then + // propagates the cluster ID to the rest of the nodes). + // + // TODO(irfansharif): This last sentence is not yet true. Update this list + // is continually to maintain bidirectional network links. We want it so + // that it suffices to bootstrap n2 after having n2 point to n1 and n1 point + // to itself. n1 can learn about n2's address and attempt to connect back to + // it. We'll need to make sure that each server loops through its join list + // at least once so that the "links" are setup. We don't want get + // insta-bootstrapped and never set up those links. + resolvers []resolver.Resolver } -func setupInitServer( - ctx context.Context, - binaryVersion, binaryMinSupportedVersion roachpb.Version, - bootstrapVersion roachpb.Version, - bootstrapZoneConfig, bootstrapSystemZoneConfig *zonepb.ZoneConfig, - engines []storage.Engine, +func newInitServer( + actx log.AmbientContext, + rpcContext *rpc.Context, + inspectState *initDiskState, + db *kv.DB, + config initServerCfg, ) (*initServer, error) { - inspectState, err := inspectEngines(ctx, engines, binaryVersion, binaryMinSupportedVersion) - if err != nil { - return nil, err - } - s := &initServer{ + AmbientContext: actx, bootstrapReqCh: make(chan *initState, 1), - - inspectState: inspectState, - bootstrapVersion: bootstrapVersion, - bootstrapZoneConfig: bootstrapZoneConfig, - bootstrapSystemZoneConfig: bootstrapSystemZoneConfig, + inspectState: inspectState, + joinCh: make(chan *initState, 1), + rpcContext: rpcContext, + config: config, + db: db, + resolvers: config.resolvers(), } - - if len(inspectState.initializedEngines) > 0 { - // We have a NodeID/ClusterID, so don't allow bootstrap. - s.mu.rejectErr = ErrClusterInitialized - } - return s, nil } // initDiskState contains the part of initState that is read from stable // storage. // -// TODO(tbg): the above is a lie in the case in which we join an existing +// NB: The above is a lie in the case in which we join an existing mixed-version // cluster. In that case, the state returned from ServeAndWait will have the -// clusterID set from Gossip (and there will be no NodeID). The plan is to -// allocate the IDs in ServeAndWait itself eventually, at which point the -// lie disappears. +// clusterID set from Gossip (and there will be no NodeID). This is holdover +// behavior that can be removed in 21.1, at which point the lie disappears. type initDiskState struct { // nodeID is zero if joining an existing cluster. // @@ -149,20 +186,35 @@ func (s *initServer) NeedsInit() bool { // ServeAndWait waits until the server is ready to bootstrap. In the common case // of restarting an existing node, this immediately returns. When starting with // a blank slate (i.e. only empty engines), it waits for incoming Bootstrap -// request or for Gossip to connect (whichever happens earlier). +// request or for a successful Join RPC, whichever happens earlier. See [1]. // -// The returned initState may not reflect a bootstrapped cluster yet, but it -// is guaranteed to have a ClusterID set. 
+// The returned initState reflects a bootstrapped cluster (i.e. it has a cluster +// ID and a node ID for this server). See [2]. // // This method must be called only once. // +// NB: A gotcha that may not immediately be obvious is that we can never hope to +// have all stores initialized by the time ServeAndWait returns. This is because +// if this server is already bootstrapped, it might hold a replica of the range +// backing the StoreID allocating counter, and letting this server start may be +// necessary to restore quorum to that range. So in general, after this TODO, we +// will always leave this method with at least one store initialized, but not +// necessarily all. This is fine, since initializing additional stores later is +// easy. +// // TODO(tbg): give this a KV client and thus initialize at least one store in // all cases. +// +// [1]: In mixed version clusters it waits until Gossip connects (but this is +// slated to be removed in 21.1). +// [2]: This is not technically true for mixed version clusters where we leave +// the node ID unassigned until later, but this too is part of the deprecated +// init server behavior that is slated for removal in 21.1. func (s *initServer) ServeAndWait( - ctx context.Context, stopper *stop.Stopper, sv *settings.Values, g *gossip.Gossip, + ctx context.Context, stopper *stop.Stopper, sv *settings.Values, gossip *gossip.Gossip, ) (*initState, error) { + // If already bootstrapped, return early. if !s.NeedsInit() { - // If already bootstrapped, return early. return &initState{ initDiskState: *s.inspectState, joined: false, @@ -170,71 +222,124 @@ func (s *initServer) ServeAndWait( }, nil } - log.Info(ctx, "no stores bootstrapped and --join flag specified, awaiting "+ - "init command or join with an already initialized node.") - - select { - case <-stopper.ShouldQuiesce(): - return nil, stop.ErrUnavailable - case state := <-s.bootstrapReqCh: - // Bootstrap() did its job. At this point, we know that the cluster - // version will be bootstrapVersion (=state.clusterVersion.Version), but - // the version setting does not know yet (it was initialized as - // BinaryMinSupportedVersion because the engines were all - // uninitialized). We *could* just let the server start, and it would - // populate system.settings, which is then gossiped, and then the - // callback would update the version, but we take this shortcut to avoid - // having every freshly bootstrapped cluster spend time at an old - // cluster version. - if err := clusterversion.Initialize(ctx, state.clusterVersion.Version, sv); err != nil { - return nil, err - } + log.Info(ctx, "no stores bootstrapped") + log.Info(ctx, "awaiting `cockroach init` or join with an already initialized node") - log.Infof(ctx, "**** cluster %s has been created", state.clusterID) - return state, nil - case <-g.Connected: - // Gossip connected, that is, we know a ClusterID. Due to the early - // return above, we know that all of our engines are empty, i.e. we - // don't have a NodeID yet (and the cluster version is the minimum we - // support). Commence startup; the Node will realize it's short a NodeID - // and will request one. - // - // TODO(tbg): use a kv.DB to get NodeID and StoreIDs when necessary and - // set everything up here. This will take the Node out of that business - // entirely and means we'll need much fewer NodeID/ClusterIDContainers. - // (It's also so much simpler to think about). 
The RPC will also tell us - // a cluster version to use instead of the lowest possible one (reducing - // the short amount of time until the Gossip hook bumps the version); - // this doesn't fix anything but again, is simpler to think about. A - // gotcha that may not immediately be obvious is that we can never hope - // to have all stores initialized by the time ServeAndWait returns. This - // is because *if this server is already bootstrapped*, it might hold a - // replica of the range backing the StoreID allocating counter, and - // letting this server start may be necessary to restore quorum to that - // range. So in general, after this TODO, we will always leave this - // method with *at least one* store initialized, but not necessarily - // all. This is fine, since initializing additional stores later is - // easy. - clusterID, err := g.GetClusterID() - if err != nil { + joinCtx, cancelJoin := context.WithCancel(ctx) + defer cancelJoin() + + errCh := make(chan error, 1) + if err := stopper.RunTask(joinCtx, "init server: join loop", func(joinCtx context.Context) { + stopper.RunWorker(joinCtx, func(joinCtx context.Context) { + errCh <- s.startJoinLoop(joinCtx, stopper) + }) + }); err != nil { + return nil, err + } + + // gossipConnectedCh is used as a place holder for gossip.Connected. We + // don't trigger on gossip connectivity unless we have to, favoring instead + // the join RPC to discover the cluster ID (and node ID). If we're in a + // mixed-version cluster however (with 20.1 nodes), we'll fall back to using + // the legacy gossip connectivity mechanism to discover the cluster ID. + var gossipConnectedCh chan struct{} + for { + select { + case state := <-s.bootstrapReqCh: + // Bootstrap() did its job. At this point, we know that the cluster + // version will be the bootstrap version (aka the binary version[1]), + // but the version setting does not know yet (it was initialized as + // BinaryMinSupportedVersion because the engines were all + // uninitialized). We *could* just let the server start, and it + // would populate system.settings, which is then gossiped, and then + // the callback would update the version, but we take this shortcut + // to avoid having every freshly bootstrapped cluster spend time at + // an old cluster version. + // + // [1]: See the top-level comment in pkg/clusterversion to make + // sense of the many versions of...versions. + if err := clusterversion.Initialize(ctx, state.clusterVersion.Version, sv); err != nil { + return nil, err + } + + log.Infof(ctx, "**** cluster %s has been created", state.clusterID) + log.Infof(ctx, "**** allocated node id %d for self", state.nodeID) + + s.inspectState.clusterID = state.clusterID + s.inspectState.initializedEngines = state.initializedEngines + + // Ensure we're draining out join attempt. + cancelJoin() + if err := <-errCh; err != nil { + log.Errorf(ctx, "**** error draining join thread: %v", err) + } + + return state, nil + case state := <-s.joinCh: + // TODO(irfansharif): Right now this doesn't actually do anything. + // We should have the Join RPC funnel in the right version to use. + if err := clusterversion.Initialize(ctx, state.clusterVersion.Version, sv); err != nil { + return nil, err + } + + log.Infof(ctx, "**** joined cluster %s through join rpc", state.clusterID) + log.Infof(ctx, "**** received node id %d for self", state.nodeID) + + s.inspectState.clusterID = state.clusterID + // Ensure we're draining out join attempt. 
+ cancelJoin() + if err := <-errCh; err != nil { + log.Errorf(ctx, "**** error draining join thread: %v", err) + } + + return state, nil + case <-gossipConnectedCh: + // We're in a mixed-version cluster, so we retain the legacy + // behavior of retrieving the cluster ID and deferring node ID + // allocation (happens in (*Node).start). + // + // TODO(irfansharif): Remove this in 21.1. + + // Gossip connected, that is, we know a ClusterID. Due to the early + // return above, we know that all of our engines are empty, i.e. we + // don't have a NodeID yet (and the cluster version is the minimum we + // support). Commence startup; the Node will realize it's short a + // NodeID and will request one. + clusterID, err := gossip.GetClusterID() + if err != nil { + return nil, err + } + + s.inspectState.clusterID = clusterID + state := &initState{ + initDiskState: *s.inspectState, + joined: true, + bootstrapped: false, + } + log.Infof(ctx, "**** joined cluster %s through gossip (legacy behavior)", state.clusterID) + return state, nil + case err := <-errCh: + if errors.Is(err, ErrJoinRPCUnimplemented) { + // We're in a mixed-version cluster, we're going to wire up the + // gossip connectivity mechanism to discover the cluster ID. + gossipConnectedCh = gossip.Connected + continue + } + log.Errorf(ctx, "error in attempting to join: %v", err) return nil, err + case <-stopper.ShouldQuiesce(): + return nil, stop.ErrUnavailable } - s.inspectState.clusterID = clusterID - return &initState{ - initDiskState: *s.inspectState, - joined: true, - bootstrapped: false, - }, nil } } var errInternalBootstrapError = errors.New("unable to bootstrap due to internal error") -// Bootstrap implements the serverpb.Init service. Users set up a new -// CockroachDB server by calling this endpoint on *exactly one node* in the -// cluster (retrying only on that node). -// Attempting to bootstrap a node that was already bootstrapped will result in -// an error. +// Bootstrap implements the serverpb.Init service. Users set up a new CRDB +// cluster by calling this endpoint on exactly one node in the cluster +// (typically retrying only on that node). This endpoint is what powers +// `cockroach init`. Attempting to bootstrap a node that was already +// bootstrapped will result in an `ErrClusterInitialized` error. // // NB: there is no protection against users erroneously bootstrapping multiple // nodes. In that case, they end up with more than one cluster, and nodes @@ -248,6 +353,10 @@ func (s *initServer) Bootstrap( s.mu.Lock() defer s.mu.Unlock() + if !s.NeedsInit() { + return nil, ErrClusterInitialized + } + if s.mu.rejectErr != nil { return nil, s.mu.rejectErr } @@ -258,18 +367,150 @@ func (s *initServer) Bootstrap( s.mu.rejectErr = errInternalBootstrapError return nil, s.mu.rejectErr } - s.mu.rejectErr = ErrClusterInitialized + s.bootstrapReqCh <- state return &serverpb.BootstrapResponse{}, nil } +// Join implements the serverpb.Init service. This is the "connectivity" API; +// individual CRDB servers are passed in a --join list and the join targets are +// addressed through this API. +// +// TODO(irfansharif): Perhaps we could opportunistically create a liveness +// record here so as to no longer have to worry about the liveness record not +// existing for a given node. 
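+//
+// To illustrate (hypothetical values, not taken from a real trace): a joining
+// node might send
+//   JoinNodeRequest{MinSupportedVersion: <its binary's minimum supported version>, Addr: "n2:26257"}
+// and, if this node is already part of an initialized cluster, receive back
+//   JoinNodeResponse{ClusterID: <16-byte cluster UUID>, NodeID: 2}
+// If this node is itself uninitialized it returns ErrNodeUninitialized, and a
+// 20.1 node that predates this RPC surfaces codes.Unimplemented, which the
+// caller maps to ErrJoinRPCUnimplemented (see attemptJoin below).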
+func (s *initServer) Join( + ctx context.Context, _ *serverpb.JoinNodeRequest, +) (*serverpb.JoinNodeResponse, error) { + s.mu.Lock() + defer s.mu.Unlock() + + if s.NeedsInit() { + // Server has not been bootstrapped yet + return nil, ErrNodeUninitialized + } + + ctxWithSpan, span := s.AnnotateCtxWithSpan(ctx, "alloc-node-id") + defer span.Finish() + + nodeID, err := allocateNodeID(ctxWithSpan, s.db) + if err != nil { + return nil, err + } + + log.Infof(ctxWithSpan, "**** allocated new node id %d", nodeID) + return &serverpb.JoinNodeResponse{ + ClusterID: s.inspectState.clusterID.GetBytes(), + NodeID: int32(nodeID), + }, nil +} + +func (s *initServer) startJoinLoop(ctx context.Context, stopper *stop.Stopper) error { + dialOpts, err := s.rpcContext.GRPCDialOptions() + if err != nil { + return err + } + + var conns []*grpc.ClientConn + for _, r := range s.resolvers { + conn, err := grpc.DialContext(ctx, r.Addr(), dialOpts...) + if err != nil { + return err + } + + stopper.RunWorker(ctx, func(ctx context.Context) { + <-stopper.ShouldQuiesce() + if err := conn.Close(); err != nil { + log.Fatalf(ctx, "%v", err) + } + }) + conns = append(conns, conn) + } + + const joinRPCBackoff = time.Second + var tickChan <-chan time.Time + { + ticker := time.NewTicker(joinRPCBackoff) + tickChan = ticker.C + defer ticker.Stop() + } + + for idx := 0; ; idx = (idx + 1) % len(conns) { + select { + case <-tickChan: + success, err := s.attemptJoin(ctx, s.resolvers[idx].Addr(), conns[idx]) + if err != nil { + return err + } + if !success { + continue + } + return nil + case <-ctx.Done(): + return nil + case <-stopper.ShouldQuiesce(): + return nil + } + } +} + +func (s *initServer) attemptJoin( + ctx context.Context, addr string, conn *grpc.ClientConn, +) (success bool, err error) { + initClient := serverpb.NewInitClient(conn) + req := &serverpb.JoinNodeRequest{ + MinSupportedVersion: &clusterversion.TestingBinaryMinSupportedVersion, + Addr: s.config.advertiseAddr(), + } + resp, err := initClient.Join(ctx, req) + if err != nil { + if strings.Contains(err.Error(), ErrNodeUninitialized.Error()) { + log.Warningf(ctx, "node running on %s is itself uninitialized, retrying..", addr) + return false, nil + } + + if grpcutil.ConnectionRefusedRe.MatchString(err.Error()) { + log.Warningf(ctx, "unable to connect to %s, retrying..", addr) + return false, nil + } + + // If the target node does not implement the join RPC, we're in a + // mixed-version cluster and are talking to a v20.1 node. We error out + // so the init server knows to fall back on the gossip-based discovery + // mechanism for the clusterID. 
+ if code := status.Code(errors.Cause(err)); code == codes.Unimplemented && + strings.Contains(err.Error(), `unknown method Join`) { + log.Warningf(ctx, "%s running an older version", addr) + return false, ErrJoinRPCUnimplemented + } + + return false, err + } + + clusterID, err := uuid.FromBytes(resp.ClusterID) + if err != nil { + return false, err + } + + s.inspectState.clusterID = clusterID + s.inspectState.nodeID = roachpb.NodeID(resp.NodeID) + state := &initState{ + initDiskState: *s.inspectState, + joined: true, + bootstrapped: false, + } + + s.joinCh <- state + return true, nil +} + func (s *initServer) tryBootstrap(ctx context.Context) (*initState, error) { - cv := clusterversion.ClusterVersion{Version: s.bootstrapVersion} + cv := clusterversion.ClusterVersion{Version: s.config.bootstrapVersion()} if err := kvserver.WriteClusterVersionToEngines(ctx, s.inspectState.newEngines, cv); err != nil { return nil, err } return bootstrapCluster( - ctx, s.inspectState.newEngines, s.bootstrapZoneConfig, s.bootstrapSystemZoneConfig, + ctx, s.inspectState.newEngines, s.config.defaultZoneConfig(), s.config.defaultSystemZoneConfig(), ) } @@ -278,3 +519,40 @@ func (s *initServer) tryBootstrap(ctx context.Context) (*initState, error) { func (s *initServer) DiskClusterVersion() clusterversion.ClusterVersion { return s.inspectState.clusterVersion } + +// initServerCfg is a thin wrapper around the server Config object, exposing +// only the fields needed by the init server. +type initServerCfg struct { + wrapped Config +} + +// bootstrapVersion returns the version at which to bootstrap the cluster in +// Bootstrap(). +func (c *initServerCfg) bootstrapVersion() roachpb.Version { + bootstrapVersion := c.wrapped.Settings.Version.BinaryVersion() + if knobs := c.wrapped.TestingKnobs.Server; knobs != nil { + if ov := knobs.(*TestingKnobs).BootstrapVersionOverride; ov != (roachpb.Version{}) { + bootstrapVersion = ov + } + } + return bootstrapVersion +} + +// defaultZoneConfig returns the zone config to bootstrap with. +func (c *initServerCfg) defaultZoneConfig() *zonepb.ZoneConfig { + return &c.wrapped.DefaultZoneConfig +} + +// defaultSystemZoneConfig returns the zone config to bootstrap system ranges +// with. +func (c *initServerCfg) defaultSystemZoneConfig() *zonepb.ZoneConfig { + return &c.wrapped.DefaultSystemZoneConfig +} + +func (c *initServerCfg) resolvers() []resolver.Resolver { + return c.wrapped.FilterGossipBootstrapResolvers(context.Background()) +} + +func (c *initServerCfg) advertiseAddr() string { + return c.wrapped.AdvertiseAddr +} diff --git a/pkg/server/node.go b/pkg/server/node.go index d36035a8b021..f3854535cb47 100644 --- a/pkg/server/node.go +++ b/pkg/server/node.go @@ -347,6 +347,10 @@ func (n *Node) start( n.initialBoot = state.joined nodeID := state.nodeID if nodeID == 0 { + // TODO(irfansharif): This codepath exists to maintain the legacy + // behavior of node ID allocation that was triggered on gossip + // connectivity. This was replaced by the Join RPC in 20.2, and can be + // removed in 21.1. if !state.joined { log.Fatalf(ctx, "node has no NodeID, but claims to not be joining cluster") } diff --git a/pkg/server/server.go b/pkg/server/server.go index 3fd7b214b273..5be44911c1b1 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -1017,6 +1017,17 @@ func (s *Server) startPersistingHLCUpperBound( // // The passed context can be used to trace the server startup. The context // should represent the general startup operation. +// +// XXX: Write a summary. 
What are _all_ the components I care about? +// - http server/admin UI +// - engines +// - rpc servers (Batch, Rangefeed, SQL, admin, status, authentication, +// tsServer) +// - init server +// - gossip +// - cluster version +// - /health end point +// - bootstrap func (s *Server) Start(ctx context.Context) error { ctx = s.AnnotateCtx(ctx) @@ -1070,29 +1081,25 @@ func (s *Server) Start(ctx context.Context) error { blobs.NewBlobClientFactory(s.nodeIDContainer.Get(), s.nodeDialer, s.st.ExternalIODir), &fileTableInternalExecutor, s.db) - bootstrapVersion := s.cfg.Settings.Version.BinaryVersion() - if knobs := s.cfg.TestingKnobs.Server; knobs != nil { - if ov := knobs.(*TestingKnobs).BootstrapVersionOverride; ov != (roachpb.Version{}) { - bootstrapVersion = ov - } - } - // Set up the init server. We have to do this relatively early because we // can't call RegisterInitServer() after `grpc.Serve`, which is called in // startRPCServer (and for the loopback grpc-gw connection). - initServer, err := setupInitServer( + initConfig := initServerCfg{wrapped: s.cfg} + inspectState, err := inspectEngines( ctx, + s.engines, s.cfg.Settings.Version.BinaryVersion(), s.cfg.Settings.Version.BinaryMinSupportedVersion(), - bootstrapVersion, - &s.cfg.DefaultZoneConfig, - &s.cfg.DefaultSystemZoneConfig, - s.engines, ) if err != nil { return err } + initServer, err := newInitServer(s.cfg.AmbientCtx, s.rpcContext, inspectState, s.db, initConfig) + if err != nil { + return err + } + { // Set up the callback that persists gossiped version bumps to the // engines. The invariant we uphold here is that the bump needs to be @@ -1283,13 +1290,24 @@ func (s *Server) Start(ctx context.Context) error { } } - // Filter the gossip bootstrap resolvers based on the listen and - // advertise addresses. - listenAddrU := util.NewUnresolvedAddr("tcp", s.cfg.Addr) + // Filter out self from the gossip bootstrap resolvers. + filtered := s.cfg.FilterGossipBootstrapResolvers(ctx) + advAddrU := util.NewUnresolvedAddr("tcp", s.cfg.AdvertiseAddr) advSQLAddrU := util.NewUnresolvedAddr("tcp", s.cfg.SQLAdvertiseAddr) advTenantAddrU := util.NewUnresolvedAddr("tcp", s.cfg.TenantAdvertiseAddr) - filtered := s.cfg.FilterGossipBootstrapResolvers(ctx, listenAddrU, advAddrU) + + // We need gossip to get spun up before the init server (which internally + // makes use of KV to allocate node IDs_. In an ideal world we'd be able to + // only spin up the very small subset of KV we need to allocate node IDs (or + // you can imagine being able to fetch node IDs from elsewhere entirely), + // but today there's this awkward dance best illustrated by the following + // example: + // + // In a two node cluster where n1 is started+bootstrapped, when n2 + // contacts n1 to allocate its node id, n1 actually needs gossip + // connectivity with n2 to have KV functioning, in order to allocate + // the node id for it. s.gossip.Start(advAddrU, filtered) log.Event(ctx, "started gossip") @@ -1305,8 +1323,6 @@ func (s *Server) Start(ctx context.Context) error { if _, err := initServer.Bootstrap(ctx, &serverpb.BootstrapRequest{}); err != nil { return err } - } else { - log.Info(ctx, "awaiting init command or join with an already initialized node.") } // Set up calling s.cfg.ReadyFn at the right time. Essentially, this call @@ -1330,9 +1346,9 @@ func (s *Server) Start(ctx context.Context) error { } // This opens the main listener. 
When the listener is open, we can call - // initServerReadyFn since any request initiated to the initServer at that - // point will reach it once ServeAndWait starts handling the queue of incoming - // connections. + // onInitServerReady since any request initiated to the initServer at that + // point will reach it once ServeAndWait starts handling the queue of + // incoming connections. startRPCServer(workersCtx) onInitServerReady() state, err := initServer.ServeAndWait(ctx, s.stopper, &s.cfg.Settings.SV, s.gossip) @@ -1343,6 +1359,9 @@ func (s *Server) Start(ctx context.Context) error { s.rpcContext.ClusterID.Set(ctx, state.clusterID) // If there's no NodeID here, then we didn't just bootstrap. The Node will // read its ID from the stores or request a new one via KV. + // + // TODO(irfansharif): Delete this once we 20.2 is cut. This only exists to + // be compatible with 20.1 clusters. if state.nodeID != 0 { s.rpcContext.NodeID.Set(ctx, state.nodeID) } @@ -1372,7 +1391,7 @@ func (s *Server) Start(ctx context.Context) error { // demonstrate that we're not doing anything functional here (and to // prevent bugs during further refactors). if s.rpcContext.ClusterID.Get() == uuid.Nil { - return errors.New("gossip should already be connected") + return errors.New("programming error: expected cluster id to be populated in rpc context") } unregister := s.gossip.RegisterCallback(gossip.KeyClusterID, func(string, roachpb.Value) { clusterID, err := s.gossip.GetClusterID() @@ -1391,7 +1410,7 @@ func (s *Server) Start(ctx context.Context) error { // but this gossip only happens once the first range has a leaseholder, i.e. // when a quorum of nodes has gone fully operational. _ = s.stopper.RunAsyncTask(ctx, "connect-gossip", func(ctx context.Context) { - log.Infof(ctx, "connecting to gossip network to verify cluster ID %q", state.clusterID) + log.Infof(ctx, "connecting to gossip network to verify cluster id %q", state.clusterID) select { case <-s.gossip.Connected: log.Infof(ctx, "node connected via gossip") diff --git a/pkg/server/serverpb/init.pb.go b/pkg/server/serverpb/init.pb.go index 6a132146a2c6..fbc41f4dd300 100644 --- a/pkg/server/serverpb/init.pb.go +++ b/pkg/server/serverpb/init.pb.go @@ -6,6 +6,7 @@ package serverpb import proto "github.com/gogo/protobuf/proto" import fmt "fmt" import math "math" +import roachpb "github.com/cockroachdb/cockroach/pkg/roachpb" import ( context "context" @@ -32,7 +33,7 @@ func (m *BootstrapRequest) Reset() { *m = BootstrapRequest{} } func (m *BootstrapRequest) String() string { return proto.CompactTextString(m) } func (*BootstrapRequest) ProtoMessage() {} func (*BootstrapRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_init_5d06be670cd568de, []int{0} + return fileDescriptor_init_5ab0d48daa4548e1, []int{0} } func (m *BootstrapRequest) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -64,7 +65,7 @@ func (m *BootstrapResponse) Reset() { *m = BootstrapResponse{} } func (m *BootstrapResponse) String() string { return proto.CompactTextString(m) } func (*BootstrapResponse) ProtoMessage() {} func (*BootstrapResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_init_5d06be670cd568de, []int{1} + return fileDescriptor_init_5ab0d48daa4548e1, []int{1} } func (m *BootstrapResponse) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -89,9 +90,93 @@ func (m *BootstrapResponse) XXX_DiscardUnknown() { var xxx_messageInfo_BootstrapResponse proto.InternalMessageInfo +// JoinNodeRequest is used to specify to the server node what the client's 
+// MinimumSupportedVersion is. If it's not compatible with the rest of the +// cluster, the join attempt is refused. +type JoinNodeRequest struct { + MinSupportedVersion *roachpb.Version `protobuf:"bytes,1,opt,name=min_supported_version,json=minSupportedVersion,proto3" json:"min_supported_version,omitempty"` + // TODO(irfansharif): Use this field to provide the client's address so that + // the server is able to reach back to it, setting up bidirectional network + // links. + Addr string `protobuf:"bytes,2,opt,name=addr,proto3" json:"addr,omitempty"` +} + +func (m *JoinNodeRequest) Reset() { *m = JoinNodeRequest{} } +func (m *JoinNodeRequest) String() string { return proto.CompactTextString(m) } +func (*JoinNodeRequest) ProtoMessage() {} +func (*JoinNodeRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_init_5ab0d48daa4548e1, []int{2} +} +func (m *JoinNodeRequest) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *JoinNodeRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + b = b[:cap(b)] + n, err := m.MarshalTo(b) + if err != nil { + return nil, err + } + return b[:n], nil +} +func (dst *JoinNodeRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_JoinNodeRequest.Merge(dst, src) +} +func (m *JoinNodeRequest) XXX_Size() int { + return m.Size() +} +func (m *JoinNodeRequest) XXX_DiscardUnknown() { + xxx_messageInfo_JoinNodeRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_JoinNodeRequest proto.InternalMessageInfo + +// JoinNodeResponse informs the joining node what the cluster id is, and what +// node id was allocated to it. +// +// TODO(irfansharif): We should use this RPC to tell us the right cluster +// version to use (instead of using the minimum possible version and relying on +// gossip to bump to for us). +// TODO(irfansharif): We should use this RPC to also generate store IDs, instead +// of having each node do it for itself after being handed out a node ID. 
+type JoinNodeResponse struct { + ClusterID []byte `protobuf:"bytes,1,opt,name=cluster_id,json=clusterId,proto3" json:"cluster_id,omitempty"` + NodeID int32 `protobuf:"varint,2,opt,name=node_id,json=nodeId,proto3" json:"node_id,omitempty"` +} + +func (m *JoinNodeResponse) Reset() { *m = JoinNodeResponse{} } +func (m *JoinNodeResponse) String() string { return proto.CompactTextString(m) } +func (*JoinNodeResponse) ProtoMessage() {} +func (*JoinNodeResponse) Descriptor() ([]byte, []int) { + return fileDescriptor_init_5ab0d48daa4548e1, []int{3} +} +func (m *JoinNodeResponse) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *JoinNodeResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + b = b[:cap(b)] + n, err := m.MarshalTo(b) + if err != nil { + return nil, err + } + return b[:n], nil +} +func (dst *JoinNodeResponse) XXX_Merge(src proto.Message) { + xxx_messageInfo_JoinNodeResponse.Merge(dst, src) +} +func (m *JoinNodeResponse) XXX_Size() int { + return m.Size() +} +func (m *JoinNodeResponse) XXX_DiscardUnknown() { + xxx_messageInfo_JoinNodeResponse.DiscardUnknown(m) +} + +var xxx_messageInfo_JoinNodeResponse proto.InternalMessageInfo + func init() { proto.RegisterType((*BootstrapRequest)(nil), "cockroach.server.serverpb.BootstrapRequest") proto.RegisterType((*BootstrapResponse)(nil), "cockroach.server.serverpb.BootstrapResponse") + proto.RegisterType((*JoinNodeRequest)(nil), "cockroach.server.serverpb.JoinNodeRequest") + proto.RegisterType((*JoinNodeResponse)(nil), "cockroach.server.serverpb.JoinNodeResponse") } // Reference imports to suppress errors if they are not otherwise used. @@ -106,8 +191,12 @@ const _ = grpc.SupportPackageIsVersion4 // // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream. type InitClient interface { - // Bootstrap an uninitialized cluster. + // Bootstrap an uninitialized cluster (inter-node links set up through the + // --join flags). Bootstrap(ctx context.Context, in *BootstrapRequest, opts ...grpc.CallOption) (*BootstrapResponse, error) + // Join a bootstrapped cluster. If the target node is itself not part of a + // bootstrapped cluster, an appropriate error is returned. + Join(ctx context.Context, in *JoinNodeRequest, opts ...grpc.CallOption) (*JoinNodeResponse, error) } type initClient struct { @@ -127,10 +216,23 @@ func (c *initClient) Bootstrap(ctx context.Context, in *BootstrapRequest, opts . return out, nil } +func (c *initClient) Join(ctx context.Context, in *JoinNodeRequest, opts ...grpc.CallOption) (*JoinNodeResponse, error) { + out := new(JoinNodeResponse) + err := c.cc.Invoke(ctx, "/cockroach.server.serverpb.Init/Join", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + // InitServer is the server API for Init service. type InitServer interface { - // Bootstrap an uninitialized cluster. + // Bootstrap an uninitialized cluster (inter-node links set up through the + // --join flags). Bootstrap(context.Context, *BootstrapRequest) (*BootstrapResponse, error) + // Join a bootstrapped cluster. If the target node is itself not part of a + // bootstrapped cluster, an appropriate error is returned. 
+ Join(context.Context, *JoinNodeRequest) (*JoinNodeResponse, error) } func RegisterInitServer(s *grpc.Server, srv InitServer) { @@ -155,6 +257,24 @@ func _Init_Bootstrap_Handler(srv interface{}, ctx context.Context, dec func(inte return interceptor(ctx, in, info, handler) } +func _Init_Join_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(JoinNodeRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(InitServer).Join(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/cockroach.server.serverpb.Init/Join", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(InitServer).Join(ctx, req.(*JoinNodeRequest)) + } + return interceptor(ctx, in, info, handler) +} + var _Init_serviceDesc = grpc.ServiceDesc{ ServiceName: "cockroach.server.serverpb.Init", HandlerType: (*InitServer)(nil), @@ -163,6 +283,10 @@ var _Init_serviceDesc = grpc.ServiceDesc{ MethodName: "Bootstrap", Handler: _Init_Bootstrap_Handler, }, + { + MethodName: "Join", + Handler: _Init_Join_Handler, + }, }, Streams: []grpc.StreamDesc{}, Metadata: "server/serverpb/init.proto", @@ -204,6 +328,69 @@ func (m *BootstrapResponse) MarshalTo(dAtA []byte) (int, error) { return i, nil } +func (m *JoinNodeRequest) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *JoinNodeRequest) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + if m.MinSupportedVersion != nil { + dAtA[i] = 0xa + i++ + i = encodeVarintInit(dAtA, i, uint64(m.MinSupportedVersion.Size())) + n1, err := m.MinSupportedVersion.MarshalTo(dAtA[i:]) + if err != nil { + return 0, err + } + i += n1 + } + if len(m.Addr) > 0 { + dAtA[i] = 0x12 + i++ + i = encodeVarintInit(dAtA, i, uint64(len(m.Addr))) + i += copy(dAtA[i:], m.Addr) + } + return i, nil +} + +func (m *JoinNodeResponse) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *JoinNodeResponse) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + if len(m.ClusterID) > 0 { + dAtA[i] = 0xa + i++ + i = encodeVarintInit(dAtA, i, uint64(len(m.ClusterID))) + i += copy(dAtA[i:], m.ClusterID) + } + if m.NodeID != 0 { + dAtA[i] = 0x10 + i++ + i = encodeVarintInit(dAtA, i, uint64(m.NodeID)) + } + return i, nil +} + func encodeVarintInit(dAtA []byte, offset int, v uint64) int { for v >= 1<<7 { dAtA[offset] = uint8(v&0x7f | 0x80) @@ -231,6 +418,39 @@ func (m *BootstrapResponse) Size() (n int) { return n } +func (m *JoinNodeRequest) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.MinSupportedVersion != nil { + l = m.MinSupportedVersion.Size() + n += 1 + l + sovInit(uint64(l)) + } + l = len(m.Addr) + if l > 0 { + n += 1 + l + sovInit(uint64(l)) + } + return n +} + +func (m *JoinNodeResponse) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + l = len(m.ClusterID) + if l > 0 { + n += 1 + l + sovInit(uint64(l)) + } + if m.NodeID != 0 { + n += 1 + sovInit(uint64(m.NodeID)) + } + return n +} + func sovInit(x uint64) (n int) { for { n++ @@ -344,6 +564,218 @@ func (m *BootstrapResponse) Unmarshal(dAtA []byte) error { } return nil } +func (m *JoinNodeRequest) 
Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowInit + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: JoinNodeRequest: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: JoinNodeRequest: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field MinSupportedVersion", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowInit + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthInit + } + postIndex := iNdEx + msglen + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.MinSupportedVersion == nil { + m.MinSupportedVersion = &roachpb.Version{} + } + if err := m.MinSupportedVersion.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Addr", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowInit + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthInit + } + postIndex := iNdEx + intStringLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Addr = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipInit(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthInit + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *JoinNodeResponse) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowInit + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: JoinNodeResponse: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: JoinNodeResponse: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field ClusterID", wireType) + } + var byteLen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowInit + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + byteLen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if byteLen < 0 { + return ErrInvalidLengthInit + } + postIndex := iNdEx + byteLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.ClusterID = append(m.ClusterID[:0], 
dAtA[iNdEx:postIndex]...) + if m.ClusterID == nil { + m.ClusterID = []byte{} + } + iNdEx = postIndex + case 2: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field NodeID", wireType) + } + m.NodeID = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowInit + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.NodeID |= (int32(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + default: + iNdEx = preIndex + skippy, err := skipInit(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthInit + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} func skipInit(dAtA []byte) (n int, err error) { l := len(dAtA) iNdEx := 0 @@ -449,19 +881,31 @@ var ( ErrIntOverflowInit = fmt.Errorf("proto: integer overflow") ) -func init() { proto.RegisterFile("server/serverpb/init.proto", fileDescriptor_init_5d06be670cd568de) } - -var fileDescriptor_init_5d06be670cd568de = []byte{ - // 173 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x92, 0x2a, 0x4e, 0x2d, 0x2a, - 0x4b, 0x2d, 0xd2, 0x87, 0x50, 0x05, 0x49, 0xfa, 0x99, 0x79, 0x99, 0x25, 0x7a, 0x05, 0x45, 0xf9, - 0x25, 0xf9, 0x42, 0x92, 0xc9, 0xf9, 0xc9, 0xd9, 0x45, 0xf9, 0x89, 0xc9, 0x19, 0x7a, 0x10, 0x69, - 0x3d, 0x98, 0x2a, 0x25, 0x21, 0x2e, 0x01, 0xa7, 0xfc, 0xfc, 0x92, 0xe2, 0x92, 0xa2, 0xc4, 0x82, - 0xa0, 0xd4, 0xc2, 0xd2, 0xd4, 0xe2, 0x12, 0x25, 0x61, 0x2e, 0x41, 0x24, 0xb1, 0xe2, 0x82, 0xfc, - 0xbc, 0xe2, 0x54, 0xa3, 0x02, 0x2e, 0x16, 0xcf, 0xbc, 0xcc, 0x12, 0xa1, 0x0c, 0x2e, 0x4e, 0xb8, - 0xa4, 0x90, 0xb6, 0x1e, 0x4e, 0x93, 0xf5, 0xd0, 0x8d, 0x95, 0xd2, 0x21, 0x4e, 0x31, 0xc4, 0x3e, - 0x25, 0x06, 0x27, 0xad, 0x13, 0x0f, 0xe5, 0x18, 0x4e, 0x3c, 0x92, 0x63, 0xbc, 0xf0, 0x48, 0x8e, - 0xf1, 0xc6, 0x23, 0x39, 0xc6, 0x07, 0x8f, 0xe4, 0x18, 0x27, 0x3c, 0x96, 0x63, 0xb8, 0xf0, 0x58, - 0x8e, 0xe1, 0xc6, 0x63, 0x39, 0x86, 0x28, 0x0e, 0x98, 0xfe, 0x24, 0x36, 0xb0, 0x47, 0x8d, 0x01, - 0x01, 0x00, 0x00, 0xff, 0xff, 0x90, 0x15, 0x22, 0x79, 0x06, 0x01, 0x00, 0x00, +func init() { proto.RegisterFile("server/serverpb/init.proto", fileDescriptor_init_5ab0d48daa4548e1) } + +var fileDescriptor_init_5ab0d48daa4548e1 = []byte{ + // 366 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x92, 0x4f, 0x6e, 0xe2, 0x30, + 0x18, 0xc5, 0xe3, 0x11, 0xc3, 0x4c, 0x3c, 0x33, 0x1a, 0x6a, 0xda, 0x8a, 0x66, 0x61, 0x50, 0xba, + 0x41, 0x80, 0x12, 0x89, 0xde, 0x20, 0x65, 0x93, 0x2e, 0x58, 0xa4, 0x52, 0x17, 0xdd, 0xa0, 0x10, + 0x5b, 0x60, 0xb5, 0xd8, 0xa9, 0xed, 0x70, 0x8e, 0x1e, 0x8b, 0x25, 0x52, 0x37, 0xac, 0x50, 0x1b, + 0x2e, 0x52, 0xe5, 0x5f, 0xa9, 0x90, 0x5a, 0xb1, 0x8a, 0xf5, 0xfc, 0x7b, 0x7e, 0xdf, 0xf7, 0x14, + 0x68, 0x29, 0x2a, 0x97, 0x54, 0xba, 0xc5, 0x27, 0x9e, 0xba, 0x8c, 0x33, 0xed, 0xc4, 0x52, 0x68, + 0x81, 0x2e, 0x22, 0x11, 0x3d, 0x48, 0x11, 0x46, 0x73, 0xa7, 0xb8, 0x76, 0x2a, 0xca, 0x3a, 0x9d, + 0x89, 0x99, 0xc8, 0x29, 0x37, 0x3b, 0x15, 0x06, 0xeb, 0x3c, 0x87, 0xe3, 0xa9, 0xbb, 0xa0, 0x3a, + 0x24, 0xa1, 0x0e, 0x0b, 0xdd, 0x46, 0xb0, 0xe1, 0x09, 0xa1, 0x95, 0x96, 0x61, 0x1c, 0xd0, 0xa7, + 0x84, 0x2a, 0x6d, 0x37, 0xe1, 0xc9, 0x27, 0x4d, 0xc5, 0x82, 0x2b, 0x6a, 0x27, 0xf0, 0xff, 0x8d, + 0x60, 0x7c, 0x2c, 0x08, 0x2d, 0x39, 0x34, 0x86, 0x67, 0x0b, 0xc6, 0x27, 0x2a, 0x89, 0x63, 0x21, + 0x35, 0x25, 0x93, 0x25, 0x95, 0x8a, 0x09, 0xde, 0x02, 
0x1d, 0xd0, 0xfd, 0x33, 0xb4, 0x9c, 0xfd, + 0x90, 0x65, 0xba, 0x73, 0x57, 0x10, 0x41, 0x73, 0xc1, 0xf8, 0x6d, 0xe5, 0x2b, 0x45, 0x84, 0x60, + 0x2d, 0x24, 0x44, 0xb6, 0x7e, 0x74, 0x40, 0xd7, 0x0c, 0xf2, 0xb3, 0x4d, 0x61, 0x63, 0x1f, 0x5b, + 0x8c, 0x82, 0x06, 0x10, 0x46, 0x8f, 0x89, 0xd2, 0x54, 0x4e, 0x18, 0xc9, 0xc3, 0xfe, 0x7a, 0xff, + 0xd2, 0x6d, 0xdb, 0xbc, 0x2e, 0x54, 0x7f, 0x14, 0x98, 0x25, 0xe0, 0x13, 0x74, 0x09, 0x7f, 0x71, + 0x41, 0x68, 0x86, 0x66, 0x0f, 0xff, 0xf4, 0x60, 0xba, 0x6d, 0xd7, 0xb3, 0x07, 0xfd, 0x51, 0x50, + 0xcf, 0xae, 0x7c, 0x32, 0x7c, 0x01, 0xb0, 0xe6, 0x73, 0xa6, 0xd1, 0x1c, 0x9a, 0x1f, 0xbb, 0xa3, + 0xbe, 0xf3, 0x65, 0xcd, 0xce, 0x61, 0x6b, 0xd6, 0xe0, 0x38, 0xb8, 0xac, 0xd3, 0x40, 0x21, 0xac, + 0x65, 0x9b, 0xa1, 0xde, 0x37, 0xbe, 0x83, 0xc6, 0xad, 0xfe, 0x51, 0x6c, 0x15, 0xe1, 0xf5, 0x56, + 0x6f, 0xd8, 0x58, 0xa5, 0x18, 0xac, 0x53, 0x0c, 0x36, 0x29, 0x06, 0xaf, 0x29, 0x06, 0xcf, 0x3b, + 0x6c, 0xac, 0x77, 0xd8, 0xd8, 0xec, 0xb0, 0x71, 0xff, 0xbb, 0xb2, 0x4f, 0xeb, 0xf9, 0xff, 0x70, + 0xf5, 0x1e, 0x00, 0x00, 0xff, 0xff, 0x53, 0x52, 0x9e, 0x7f, 0x76, 0x02, 0x00, 0x00, } diff --git a/pkg/server/serverpb/init.proto b/pkg/server/serverpb/init.proto index 491115f9a502..c963f9b5cfe1 100644 --- a/pkg/server/serverpb/init.proto +++ b/pkg/server/serverpb/init.proto @@ -12,14 +12,43 @@ syntax = "proto3"; package cockroach.server.serverpb; option go_package = "serverpb"; -message BootstrapRequest { +import "gogoproto/gogo.proto"; +import "roachpb/metadata.proto"; + +message BootstrapRequest { } +message BootstrapResponse { } + +// JoinNodeRequest is used to specify to the server node what the client's +// MinimumSupportedVersion is. If it's not compatible with the rest of the +// cluster, the join attempt is refused. +message JoinNodeRequest { + roachpb.Version min_supported_version = 1; + + // TODO(irfansharif): Use this field to provide the client's address so that + // the server is able to reach back to it, setting up bidirectional network + // links. + string addr = 2; } -message BootstrapResponse { +// JoinNodeResponse informs the joining node what the cluster id is, and what +// node id was allocated to it. +// +// TODO(irfansharif): We should use this RPC to tell us the right cluster +// version to use (instead of using the minimum possible version and relying on +// gossip to bump to for us). +// TODO(irfansharif): We should use this RPC to also generate store IDs, instead +// of having each node do it for itself after being handed out a node ID. +message JoinNodeResponse { + bytes cluster_id = 1 [(gogoproto.customname) = "ClusterID"]; + int32 node_id = 2 [(gogoproto.customname) = "NodeID"]; } service Init { - // Bootstrap an uninitialized cluster. - rpc Bootstrap(BootstrapRequest) returns (BootstrapResponse) { - } + // Bootstrap an uninitialized cluster (inter-node links set up through the + // --join flags). + rpc Bootstrap(BootstrapRequest) returns (BootstrapResponse) { } + + // Join a bootstrapped cluster. If the target node is itself not part of a + // bootstrapped cluster, an appropriate error is returned. + rpc Join(JoinNodeRequest) returns (JoinNodeResponse) { } } diff --git a/pkg/util/grpcutil/log.go b/pkg/util/grpcutil/log.go index 19fea0454d9d..1bbfb18fd4f2 100644 --- a/pkg/util/grpcutil/log.go +++ b/pkg/util/grpcutil/log.go @@ -72,7 +72,7 @@ func (severity *logger) Warning(args ...interface{}) { if log.Severity(*severity) > log.Severity_WARNING { return } - if shouldPrint(connectivitySpamRe, 30*time.Second, args...) 
{ + if shouldPrint(ConnectivitySpamRe, 30*time.Second, args...) { log.WarningfDepth(context.TODO(), 2, "", args...) } } @@ -146,7 +146,7 @@ func (severity *logger) V(i int) bool { // https://github.com/grpc/grpc-go/blob/v1.29.1/clientconn.go#L1275 var ( transportFailedRe = regexp.MustCompile(`^` + regexp.QuoteMeta(`grpc: addrConn.createTransport failed to connect to`)) - connectionRefusedRe = regexp.MustCompile( + ConnectionRefusedRe = regexp.MustCompile( strings.Join([]string{ // *nix regexp.QuoteMeta("connection refused"), @@ -159,8 +159,8 @@ var ( ) clientConnReuseRe = regexp.MustCompile("cannot reuse client connection") - connectivitySpamRe = regexp.MustCompile(transportFailedRe.String() + `.*` + - "(" + connectionRefusedRe.String() + "|" + clientConnReuseRe.String() + ")") + ConnectivitySpamRe = regexp.MustCompile(transportFailedRe.String() + `.*` + + "(" + ConnectionRefusedRe.String() + "|" + clientConnReuseRe.String() + ")") ) var spamMu = struct {
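
For readers skimming the patch, here is a minimal, self-contained sketch of the client-side join flow described in the commit message: dial one --join target, ask it for a cluster ID and node ID via the new Join RPC, and fall back to the legacy gossip-based discovery when the target is a 20.1 node that does not implement the RPC. The package name and the tryJoin/errJoinUnimplemented identifiers are made up for illustration; the real logic lives in (*initServer).startJoinLoop and (*initServer).attemptJoin in pkg/server/init.go, which additionally handle retries, multiple resolvers, and server shutdown.

package joinexample // illustrative only, not part of the patch

import (
	"context"
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/server/serverpb"
	"github.com/cockroachdb/cockroach/pkg/util/uuid"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// errJoinUnimplemented mirrors ErrJoinRPCUnimplemented from the patch: the
// target node predates the Join RPC (v20.1 or older), so the caller should
// fall back to the legacy gossip-based cluster ID discovery.
var errJoinUnimplemented = fmt.Errorf("node does not implement join rpc")

// tryJoin dials a single --join target and asks it for the cluster ID and a
// freshly allocated node ID for this server.
func tryJoin(
	ctx context.Context, addr, advertiseAddr string, dialOpts ...grpc.DialOption,
) (uuid.UUID, roachpb.NodeID, error) {
	conn, err := grpc.DialContext(ctx, addr, dialOpts...)
	if err != nil {
		return uuid.Nil, 0, err
	}
	defer func() { _ = conn.Close() }()

	resp, err := serverpb.NewInitClient(conn).Join(ctx, &serverpb.JoinNodeRequest{
		// The real code also sends the binary's minimum supported version so
		// the server can refuse incompatible joiners; elided here.
		Addr: advertiseAddr,
	})
	if err != nil {
		if status.Code(err) == codes.Unimplemented {
			// Mixed-version cluster: the target is a 20.1 node.
			return uuid.Nil, 0, errJoinUnimplemented
		}
		return uuid.Nil, 0, err
	}

	clusterID, err := uuid.FromBytes(resp.ClusterID)
	if err != nil {
		return uuid.Nil, 0, err
	}
	return clusterID, roachpb.NodeID(resp.NodeID), nil
}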