diff --git a/pkg/cli/init.go b/pkg/cli/init.go index d8e34c5626c2..3cc744e8c7ab 100644 --- a/pkg/cli/init.go +++ b/pkg/cli/init.go @@ -51,7 +51,6 @@ func runInit(cmd *cobra.Command, args []string) error { // Actually perform cluster initialization. c := serverpb.NewInitClient(conn) - if _, err = c.Bootstrap(ctx, &serverpb.BootstrapRequest{}); err != nil { return err } diff --git a/pkg/cli/start.go b/pkg/cli/start.go index 6e8b7ee97ec2..99484cde6e76 100644 --- a/pkg/cli/start.go +++ b/pkg/cli/start.go @@ -670,7 +670,7 @@ func runStart(cmd *cobra.Command, args []string, disableReplication bool) error if waitForInit { log.Shout(ctx, log.Severity_INFO, - "initial startup completed.\n"+ + "initial startup completed\n"+ "Node will now attempt to join a running cluster, or wait for `cockroach init`.\n"+ "Client connections will be accepted after this completes successfully.\n"+ "Check the log file(s) for progress. ") diff --git a/pkg/clusterversion/cockroach_versions.go b/pkg/clusterversion/cockroach_versions.go index f14d38e4f5c8..0b7f67c4c9af 100644 --- a/pkg/clusterversion/cockroach_versions.go +++ b/pkg/clusterversion/cockroach_versions.go @@ -559,7 +559,6 @@ var versionsSingleton = keyedVersions([]keyedVersion{ Version: roachpb.Version{Major: 20, Minor: 1, Unstable: 14}, }, // Add new versions here (step two of two). - }) // TODO(irfansharif): clusterversion.binary{,MinimumSupported}Version diff --git a/pkg/cmd/roachtest/decommission.go b/pkg/cmd/roachtest/decommission.go index bd1b89c9282c..ae4d775494b2 100644 --- a/pkg/cmd/roachtest/decommission.go +++ b/pkg/cmd/roachtest/decommission.go @@ -319,6 +319,10 @@ func runDecommissionRecommission(ctx context.Context, t *test, c *cluster) { Multiplier: 2, } + // XXX: Hack, to see node come into liveness table. We should do this as + // part of the connect rpc. + time.Sleep(10 * time.Second) + // Partially decommission then recommission a random node, from another // random node. 
Run a couple of status checks while doing so. { diff --git a/pkg/gossip/client.go b/pkg/gossip/client.go index 9b8c536c160f..c0c0dd519263 100644 --- a/pkg/gossip/client.go +++ b/pkg/gossip/client.go @@ -275,7 +275,7 @@ func (c *client) handleResponse(ctx context.Context, g *Gossip, reply *Response) // Check whether this outgoing client is duplicating work already // being done by an incoming client, either because an outgoing // matches an incoming or the client is connecting to itself. - if nodeID := g.NodeID.Get(); nodeID == c.peerID { + if nodeID := g.NodeID.Get(); nodeID != 0 && nodeID == c.peerID { return errors.Errorf("stopping outgoing client to n%d (%s); loopback connection", c.peerID, c.addr) } else if g.hasIncomingLocked(c.peerID) && nodeID > c.peerID { // To avoid mutual shutdown, we only shutdown our client if our diff --git a/pkg/server/config.go b/pkg/server/config.go index 547e423e85ec..3a894dc39086 100644 --- a/pkg/server/config.go +++ b/pkg/server/config.go @@ -172,8 +172,9 @@ type KVConfig struct { // in zone configs. Attrs string - // JoinList is a list of node addresses that act as bootstrap hosts for - // connecting to the gossip network. + // JoinList is a list of node addresses that is used to form a network of KV + // servers. Assuming a connected graph, it suffices to initialize any server + // in the network. JoinList base.JoinListType // JoinPreferSRVRecords, if set, causes the lookup logic for the @@ -646,11 +647,13 @@ func (cfg *Config) InitNode(ctx context.Context) error { // FilterGossipBootstrapResolvers removes any gossip bootstrap resolvers which // match either this node's listen address or its advertised host address. 
-func (cfg *Config) FilterGossipBootstrapResolvers( - ctx context.Context, listen, advert net.Addr, -) []resolver.Resolver { +func (cfg *Config) FilterGossipBootstrapResolvers(ctx context.Context) []resolver.Resolver { + var listen, advert net.Addr + listen = util.NewUnresolvedAddr("tcp", cfg.Addr) + advert = util.NewUnresolvedAddr("tcp", cfg.AdvertiseAddr) filtered := make([]resolver.Resolver, 0, len(cfg.GossipBootstrapResolvers)) addrs := make([]string, 0, len(cfg.GossipBootstrapResolvers)) + for _, r := range cfg.GossipBootstrapResolvers { if r.Addr() == advert.String() || r.Addr() == listen.String() { if log.V(1) { diff --git a/pkg/server/config_test.go b/pkg/server/config_test.go index 71824616979d..d88f14ab868c 100644 --- a/pkg/server/config_test.go +++ b/pkg/server/config_test.go @@ -21,7 +21,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/gossip/resolver" "github.com/cockroachdb/cockroach/pkg/settings/cluster" - "github.com/cockroachdb/cockroach/pkg/util" "github.com/cockroachdb/cockroach/pkg/util/envutil" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -191,10 +190,10 @@ func TestFilterGossipBootstrapResolvers(t *testing.T) { } cfg := MakeConfig(context.Background(), cluster.MakeTestingClusterSettings()) cfg.GossipBootstrapResolvers = resolvers + cfg.Addr = resolverSpecs[0] + cfg.AdvertiseAddr = resolverSpecs[2] - listenAddr := util.MakeUnresolvedAddr("tcp", resolverSpecs[0]) - advertAddr := util.MakeUnresolvedAddr("tcp", resolverSpecs[2]) - filtered := cfg.FilterGossipBootstrapResolvers(context.Background(), &listenAddr, &advertAddr) + filtered := cfg.FilterGossipBootstrapResolvers(context.Background()) if len(filtered) != 1 { t.Fatalf("expected one resolver; got %+v", filtered) } else if filtered[0].Addr() != resolverSpecs[1] { diff --git a/pkg/server/grpc_server.go b/pkg/server/grpc_server.go index 08627cbdc694..8de431df78bd 100644 --- 
a/pkg/server/grpc_server.go +++ b/pkg/server/grpc_server.go @@ -68,6 +68,7 @@ var rpcsAllowedWhileBootstrapping = map[string]struct{}{ "/cockroach.rpc.Heartbeat/Ping": {}, "/cockroach.gossip.Gossip/Gossip": {}, "/cockroach.server.serverpb.Init/Bootstrap": {}, + "/cockroach.server.serverpb.Init/Join": {}, "/cockroach.server.serverpb.Admin/Health": {}, } diff --git a/pkg/server/init.go b/pkg/server/init.go index 0e93ca3d25b5..cab5ec653644 100644 --- a/pkg/server/init.go +++ b/pkg/server/init.go @@ -13,97 +13,134 @@ package server import ( "context" "fmt" + "strings" + "time" "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/config/zonepb" "github.com/cockroachdb/cockroach/pkg/gossip" + "github.com/cockroachdb/cockroach/pkg/gossip/resolver" + "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvserver" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/rpc" "github.com/cockroachdb/cockroach/pkg/server/serverpb" "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/storage" + "github.com/cockroachdb/cockroach/pkg/util/grpcutil" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/cockroachdb/errors" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) // ErrClusterInitialized is reported when the Bootstrap RPC is run on // a node that is already part of an initialized cluster. var ErrClusterInitialized = fmt.Errorf("cluster has already been initialized") +// ErrNodeUninitialized is reported when the Join RPC is run against +// a node that itself is uninitialized. 
+var ErrNodeUninitialized = fmt.Errorf("node has not been initialized") + +// ErrJoinRPCUnimplemented is reported when the Join RPC is run against +// a node that does not know about the join RPC (i.e. it is running 20.1 or +// below). +// +// TODO(irfansharif): Remove this in 21.1. +var ErrJoinRPCUnimplemented = fmt.Errorf("node does not implement join rpc") + // initServer handles the bootstrapping process. It is instantiated early in the // server startup sequence to determine whether a NodeID and ClusterID are // available (true if and only if an initialized store is present). If all // engines are empty, either a new cluster needs to be started (via incoming -// Bootstrap RPC) or an existing one joined. Either way, the goal is to learn a -// ClusterID and NodeID (and initialize at least one store). All of this -// subtlety is encapsulated by the initServer, which offers a primitive -// ServeAndWait() after which point the startup code can assume that the -// Node/ClusterIDs are known. -// -// TODO(tbg): at the time of writing, when joining an existing cluster for the -// first time, the initServer provides only the clusterID. Fix this by giving -// the initServer a *kv.DB that it can use to assign a NodeID and StoreID, and -// later by switching to the connect RPC (#32574). +// Bootstrap RPC) or an existing one joined (via the outgoing Join RPC). Either +// way, the goal is to learn a ClusterID and NodeID (and initialize at least one +// store). All of this subtlety is encapsulated by the initServer, which offers +// a primitive ServeAndWait() after which point the startup code can assume that +// the Node/ClusterIDs are known. type initServer struct { + log.AmbientContext + // rpcContext embeds the fields needed by the RPC subsystem (the init server + // sends out Join RPCs during bootstrap). + rpcContext *rpc.Context + + // config houses a few configuration options needed by the init server. 
+ config initServerCfg + mu struct { + // This mutex is grabbed during bootstrap and is used to serialize + // bootstrap attempts (and attempts to read whether or not this node has + // been bootstrapped). syncutil.Mutex - // If set, a Bootstrap() call is rejected with this error. + // If we encounter an unrecognized error during bootstrap, we use this + // field to block out future bootstrap attempts. rejectErr error } - // The version at which to bootstrap the cluster in Bootstrap(). - bootstrapVersion roachpb.Version - // The zone configs to bootstrap with. - bootstrapZoneConfig, bootstrapSystemZoneConfig *zonepb.ZoneConfig + // The state of the engines. This tells us whether the node is already - // bootstrapped. The goal of the initServer is to complete this by the - // time ServeAndWait returns. + // bootstrapped. The goal of the initServer is to stub this out by the time + // ServeAndWait returns. + // + // TODO(irfansharif): The ownership/access of this is a bit all over the + // place, and can be sanitized. inspectState *initDiskState - // If Bootstrap() succeeds, resulting initState will go here (to be consumed - // by ServeAndWait). + // If this CRDB node was `cockroach init`-ialized, the resulting init state + // will be passed through to this channel. bootstrapReqCh chan *initState + // If this CRDB node was able to join a bootstrapped cluster, the resulting + // init state will be passed through to this channel. + joinCh chan *initState + + // We hold onto a *kv.DB object to assign node IDs through the Join RPC. + db *kv.DB + + // resolvers is a list of node addresses that is used to form a connected + // graph/network of CRDB servers. Once a connected graph is constructed, it + // suffices for any node in the network to be initialized (which then + // propagates the cluster ID to the rest of the nodes). + // + // TODO(irfansharif): This last sentence is not yet true. Update this list + // continually to maintain bidirectional network links. 
We want it so + // that it suffices to bootstrap n2 after having n2 point to n1 and n1 point + // to itself. n1 can learn about n2's address and attempt to connect back to + // it. We'll need to make sure that each server loops through its join list + // at least once so that the "links" are set up. We don't want to get + // insta-bootstrapped and never set up those links. + resolvers []resolver.Resolver } -func setupInitServer( - ctx context.Context, - binaryVersion, binaryMinSupportedVersion roachpb.Version, - bootstrapVersion roachpb.Version, - bootstrapZoneConfig, bootstrapSystemZoneConfig *zonepb.ZoneConfig, - engines []storage.Engine, +func newInitServer( + actx log.AmbientContext, + rpcContext *rpc.Context, + inspectState *initDiskState, + db *kv.DB, + config initServerCfg, ) (*initServer, error) { - inspectState, err := inspectEngines(ctx, engines, binaryVersion, binaryMinSupportedVersion) - if err != nil { - return nil, err - } - s := &initServer{ + AmbientContext: actx, bootstrapReqCh: make(chan *initState, 1), - - inspectState: inspectState, - bootstrapVersion: bootstrapVersion, - bootstrapZoneConfig: bootstrapZoneConfig, - bootstrapSystemZoneConfig: bootstrapSystemZoneConfig, + inspectState: inspectState, + joinCh: make(chan *initState, 1), + rpcContext: rpcContext, + config: config, + db: db, + resolvers: config.resolvers(), } - - if len(inspectState.initializedEngines) > 0 { - // We have a NodeID/ClusterID, so don't allow bootstrap. - s.mu.rejectErr = ErrClusterInitialized - } return s, nil } // initDiskState contains the part of initState that is read from stable // storage. // -// TODO(tbg): the above is a lie in the case in which we join an existing +// NB: The above is a lie in the case in which we join an existing mixed-version // cluster. In that case, the state returned from ServeAndWait will have the -// clusterID set from Gossip (and there will be no NodeID). 
The plan is to -// allocate the IDs in ServeAndWait itself eventually, at which point the -// lie disappears. +// clusterID set from Gossip (and there will be no NodeID). This is holdover +// behavior that can be removed in 21.1, at which point the lie disappears. type initDiskState struct { // nodeID is zero if joining an existing cluster. // @@ -149,20 +186,35 @@ func (s *initServer) NeedsInit() bool { // ServeAndWait waits until the server is ready to bootstrap. In the common case // of restarting an existing node, this immediately returns. When starting with // a blank slate (i.e. only empty engines), it waits for incoming Bootstrap -// request or for Gossip to connect (whichever happens earlier). +// request or for a successful Join RPC, whichever happens earlier. See [1]. // -// The returned initState may not reflect a bootstrapped cluster yet, but it -// is guaranteed to have a ClusterID set. +// The returned initState reflects a bootstrapped cluster (i.e. it has a cluster +// ID and a node ID for this server). See [2]. // // This method must be called only once. // +// NB: A gotcha that may not immediately be obvious is that we can never hope to +// have all stores initialized by the time ServeAndWait returns. This is because +// if this server is already bootstrapped, it might hold a replica of the range +// backing the StoreID allocating counter, and letting this server start may be +// necessary to restore quorum to that range. So in general, after this TODO, we +// will always leave this method with at least one store initialized, but not +// necessarily all. This is fine, since initializing additional stores later is +// easy. +// // TODO(tbg): give this a KV client and thus initialize at least one store in // all cases. +// +// [1]: In mixed version clusters it waits until Gossip connects (but this is +// slated to be removed in 21.1). 
+// [2]: This is not technically true for mixed version clusters where we leave +// the node ID unassigned until later, but this too is part of the deprecated +// init server behavior that is slated for removal in 21.1. func (s *initServer) ServeAndWait( - ctx context.Context, stopper *stop.Stopper, sv *settings.Values, g *gossip.Gossip, + ctx context.Context, stopper *stop.Stopper, sv *settings.Values, gossip *gossip.Gossip, ) (*initState, error) { + // If already bootstrapped, return early. if !s.NeedsInit() { - // If already bootstrapped, return early. return &initState{ initDiskState: *s.inspectState, joined: false, @@ -170,71 +222,124 @@ func (s *initServer) ServeAndWait( }, nil } - log.Info(ctx, "no stores bootstrapped and --join flag specified, awaiting "+ - "init command or join with an already initialized node.") - - select { - case <-stopper.ShouldQuiesce(): - return nil, stop.ErrUnavailable - case state := <-s.bootstrapReqCh: - // Bootstrap() did its job. At this point, we know that the cluster - // version will be bootstrapVersion (=state.clusterVersion.Version), but - // the version setting does not know yet (it was initialized as - // BinaryMinSupportedVersion because the engines were all - // uninitialized). We *could* just let the server start, and it would - // populate system.settings, which is then gossiped, and then the - // callback would update the version, but we take this shortcut to avoid - // having every freshly bootstrapped cluster spend time at an old - // cluster version. - if err := clusterversion.Initialize(ctx, state.clusterVersion.Version, sv); err != nil { - return nil, err - } + log.Info(ctx, "no stores bootstrapped") + log.Info(ctx, "awaiting `cockroach init` or join with an already initialized node") - log.Infof(ctx, "**** cluster %s has been created", state.clusterID) - return state, nil - case <-g.Connected: - // Gossip connected, that is, we know a ClusterID. 
Due to the early - // return above, we know that all of our engines are empty, i.e. we - // don't have a NodeID yet (and the cluster version is the minimum we - // support). Commence startup; the Node will realize it's short a NodeID - // and will request one. - // - // TODO(tbg): use a kv.DB to get NodeID and StoreIDs when necessary and - // set everything up here. This will take the Node out of that business - // entirely and means we'll need much fewer NodeID/ClusterIDContainers. - // (It's also so much simpler to think about). The RPC will also tell us - // a cluster version to use instead of the lowest possible one (reducing - // the short amount of time until the Gossip hook bumps the version); - // this doesn't fix anything but again, is simpler to think about. A - // gotcha that may not immediately be obvious is that we can never hope - // to have all stores initialized by the time ServeAndWait returns. This - // is because *if this server is already bootstrapped*, it might hold a - // replica of the range backing the StoreID allocating counter, and - // letting this server start may be necessary to restore quorum to that - // range. So in general, after this TODO, we will always leave this - // method with *at least one* store initialized, but not necessarily - // all. This is fine, since initializing additional stores later is - // easy. - clusterID, err := g.GetClusterID() - if err != nil { + joinCtx, cancelJoin := context.WithCancel(ctx) + defer cancelJoin() + + errCh := make(chan error, 1) + if err := stopper.RunTask(joinCtx, "init server: join loop", func(joinCtx context.Context) { + stopper.RunWorker(joinCtx, func(joinCtx context.Context) { + errCh <- s.startJoinLoop(joinCtx, stopper) + }) + }); err != nil { + return nil, err + } + + // gossipConnectedCh is used as a place holder for gossip.Connected. We + // don't trigger on gossip connectivity unless we have to, favoring instead + // the join RPC to discover the cluster ID (and node ID). 
If we're in a + // mixed-version cluster however (with 20.1 nodes), we'll fall back to using + // the legacy gossip connectivity mechanism to discover the cluster ID. + var gossipConnectedCh chan struct{} + for { + select { + case state := <-s.bootstrapReqCh: + // Bootstrap() did its job. At this point, we know that the cluster + // version will be the bootstrap version (aka the binary version[1]), + // but the version setting does not know yet (it was initialized as + // BinaryMinSupportedVersion because the engines were all + // uninitialized). We *could* just let the server start, and it + // would populate system.settings, which is then gossiped, and then + // the callback would update the version, but we take this shortcut + // to avoid having every freshly bootstrapped cluster spend time at + // an old cluster version. + // + // [1]: See the top-level comment in pkg/clusterversion to make + // sense of the many versions of...versions. + if err := clusterversion.Initialize(ctx, state.clusterVersion.Version, sv); err != nil { + return nil, err + } + + log.Infof(ctx, "**** cluster %s has been created", state.clusterID) + log.Infof(ctx, "**** allocated node id %d for self", state.nodeID) + + s.inspectState.clusterID = state.clusterID + s.inspectState.initializedEngines = state.initializedEngines + + // Ensure we're draining out join attempt. + cancelJoin() + if err := <-errCh; err != nil { + log.Errorf(ctx, "**** error draining join thread: %v", err) + } + + return state, nil + case state := <-s.joinCh: + // TODO(irfansharif): Right now this doesn't actually do anything. + // We should have the Join RPC funnel in the right version to use. 
+ if err := clusterversion.Initialize(ctx, state.clusterVersion.Version, sv); err != nil { + return nil, err + } + + log.Infof(ctx, "**** joined cluster %s through join rpc", state.clusterID) + log.Infof(ctx, "**** received node id %d for self", state.nodeID) + + s.inspectState.clusterID = state.clusterID + // Ensure we're draining out join attempt. + cancelJoin() + if err := <-errCh; err != nil { + log.Errorf(ctx, "**** error draining join thread: %v", err) + } + + return state, nil + case <-gossipConnectedCh: + // We're in a mixed-version cluster, so we retain the legacy + // behavior of retrieving the cluster ID and deferring node ID + // allocation (happens in (*Node).start). + // + // TODO(irfansharif): Remove this in 21.1. + + // Gossip connected, that is, we know a ClusterID. Due to the early + // return above, we know that all of our engines are empty, i.e. we + // don't have a NodeID yet (and the cluster version is the minimum we + // support). Commence startup; the Node will realize it's short a + // NodeID and will request one. + clusterID, err := gossip.GetClusterID() + if err != nil { + return nil, err + } + + s.inspectState.clusterID = clusterID + state := &initState{ + initDiskState: *s.inspectState, + joined: true, + bootstrapped: false, + } + log.Infof(ctx, "**** joined cluster %s through gossip (legacy behavior)", state.clusterID) + return state, nil + case err := <-errCh: + if errors.Is(err, ErrJoinRPCUnimplemented) { + // We're in a mixed-version cluster, we're going to wire up the + // gossip connectivity mechanism to discover the cluster ID. 
+ gossipConnectedCh = gossip.Connected + continue + } + log.Errorf(ctx, "error in attempting to join: %v", err) return nil, err + case <-stopper.ShouldQuiesce(): + return nil, stop.ErrUnavailable } - s.inspectState.clusterID = clusterID - return &initState{ - initDiskState: *s.inspectState, - joined: true, - bootstrapped: false, - }, nil } } var errInternalBootstrapError = errors.New("unable to bootstrap due to internal error") -// Bootstrap implements the serverpb.Init service. Users set up a new -// CockroachDB server by calling this endpoint on *exactly one node* in the -// cluster (retrying only on that node). -// Attempting to bootstrap a node that was already bootstrapped will result in -// an error. +// Bootstrap implements the serverpb.Init service. Users set up a new CRDB +// cluster by calling this endpoint on exactly one node in the cluster +// (typically retrying only on that node). This endpoint is what powers +// `cockroach init`. Attempting to bootstrap a node that was already +// bootstrapped will result in an `ErrClusterInitialized` error. // // NB: there is no protection against users erroneously bootstrapping multiple // nodes. In that case, they end up with more than one cluster, and nodes @@ -248,6 +353,10 @@ func (s *initServer) Bootstrap( s.mu.Lock() defer s.mu.Unlock() + if !s.NeedsInit() { + return nil, ErrClusterInitialized + } + if s.mu.rejectErr != nil { return nil, s.mu.rejectErr } @@ -258,18 +367,150 @@ func (s *initServer) Bootstrap( s.mu.rejectErr = errInternalBootstrapError return nil, s.mu.rejectErr } - s.mu.rejectErr = ErrClusterInitialized + s.bootstrapReqCh <- state return &serverpb.BootstrapResponse{}, nil } +// Join implements the serverpb.Init service. This is the "connectivity" API; +// individual CRDB servers are passed in a --join list and the join targets are +// addressed through this API. 
+// +// TODO(irfansharif): Perhaps we could opportunistically create a liveness +// record here so as to no longer have to worry about the liveness record not +// existing for a given node. +func (s *initServer) Join( + ctx context.Context, _ *serverpb.JoinNodeRequest, +) (*serverpb.JoinNodeResponse, error) { + s.mu.Lock() + defer s.mu.Unlock() + + if s.NeedsInit() { + // Server has not been bootstrapped yet + return nil, ErrNodeUninitialized + } + + ctxWithSpan, span := s.AnnotateCtxWithSpan(ctx, "alloc-node-id") + defer span.Finish() + + nodeID, err := allocateNodeID(ctxWithSpan, s.db) + if err != nil { + return nil, err + } + + log.Infof(ctxWithSpan, "**** allocated new node id %d", nodeID) + return &serverpb.JoinNodeResponse{ + ClusterID: s.inspectState.clusterID.GetBytes(), + NodeID: int32(nodeID), + }, nil +} + +func (s *initServer) startJoinLoop(ctx context.Context, stopper *stop.Stopper) error { + dialOpts, err := s.rpcContext.GRPCDialOptions() + if err != nil { + return err + } + + var conns []*grpc.ClientConn + for _, r := range s.resolvers { + conn, err := grpc.DialContext(ctx, r.Addr(), dialOpts...) 
+ if err != nil { + return err + } + + stopper.RunWorker(ctx, func(ctx context.Context) { + <-stopper.ShouldQuiesce() + if err := conn.Close(); err != nil { + log.Fatalf(ctx, "%v", err) + } + }) + conns = append(conns, conn) + } + + const joinRPCBackoff = time.Second + var tickChan <-chan time.Time + { + ticker := time.NewTicker(joinRPCBackoff) + tickChan = ticker.C + defer ticker.Stop() + } + + for idx := 0; ; idx = (idx + 1) % len(conns) { + select { + case <-tickChan: + success, err := s.attemptJoin(ctx, s.resolvers[idx].Addr(), conns[idx]) + if err != nil { + return err + } + if !success { + continue + } + return nil + case <-ctx.Done(): + return nil + case <-stopper.ShouldQuiesce(): + return nil + } + } +} + +func (s *initServer) attemptJoin( + ctx context.Context, addr string, conn *grpc.ClientConn, +) (success bool, err error) { + initClient := serverpb.NewInitClient(conn) + req := &serverpb.JoinNodeRequest{ + MinSupportedVersion: &clusterversion.TestingBinaryMinSupportedVersion, + Addr: s.config.advertiseAddr(), + } + resp, err := initClient.Join(ctx, req) + if err != nil { + if strings.Contains(err.Error(), ErrNodeUninitialized.Error()) { + log.Warningf(ctx, "node running on %s is itself uninitialized, retrying..", addr) + return false, nil + } + + if grpcutil.ConnectionRefusedRe.MatchString(err.Error()) { + log.Warningf(ctx, "unable to connect to %s, retrying..", addr) + return false, nil + } + + // If the target node does not implement the join RPC, we're in a + // mixed-version cluster and are talking to a v20.1 node. We error out + // so the init server knows to fall back on the gossip-based discovery + // mechanism for the clusterID. 
+ if code := status.Code(errors.Cause(err)); code == codes.Unimplemented && + strings.Contains(err.Error(), `unknown method Join`) { + log.Warningf(ctx, "%s running an older version", addr) + return false, ErrJoinRPCUnimplemented + } + + return false, err + } + + clusterID, err := uuid.FromBytes(resp.ClusterID) + if err != nil { + return false, err + } + + s.inspectState.clusterID = clusterID + s.inspectState.nodeID = roachpb.NodeID(resp.NodeID) + state := &initState{ + initDiskState: *s.inspectState, + joined: true, + bootstrapped: false, + } + + s.joinCh <- state + return true, nil +} + func (s *initServer) tryBootstrap(ctx context.Context) (*initState, error) { - cv := clusterversion.ClusterVersion{Version: s.bootstrapVersion} + cv := clusterversion.ClusterVersion{Version: s.config.bootstrapVersion()} if err := kvserver.WriteClusterVersionToEngines(ctx, s.inspectState.newEngines, cv); err != nil { return nil, err } return bootstrapCluster( - ctx, s.inspectState.newEngines, s.bootstrapZoneConfig, s.bootstrapSystemZoneConfig, + ctx, s.inspectState.newEngines, s.config.defaultZoneConfig(), s.config.defaultSystemZoneConfig(), ) } @@ -278,3 +519,40 @@ func (s *initServer) tryBootstrap(ctx context.Context) (*initState, error) { func (s *initServer) DiskClusterVersion() clusterversion.ClusterVersion { return s.inspectState.clusterVersion } + +// initServerCfg is a thin wrapper around the server Config object, exposing +// only the fields needed by the init server. +type initServerCfg struct { + wrapped Config +} + +// bootstrapVersion returns the version at which to bootstrap the cluster in +// Bootstrap(). 
+func (c *initServerCfg) bootstrapVersion() roachpb.Version { + bootstrapVersion := c.wrapped.Settings.Version.BinaryVersion() + if knobs := c.wrapped.TestingKnobs.Server; knobs != nil { + if ov := knobs.(*TestingKnobs).BootstrapVersionOverride; ov != (roachpb.Version{}) { + bootstrapVersion = ov + } + } + return bootstrapVersion +} + +// defaultZoneConfig returns the zone config to bootstrap with. +func (c *initServerCfg) defaultZoneConfig() *zonepb.ZoneConfig { + return &c.wrapped.DefaultZoneConfig +} + +// defaultSystemZoneConfig returns the zone config to bootstrap system ranges +// with. +func (c *initServerCfg) defaultSystemZoneConfig() *zonepb.ZoneConfig { + return &c.wrapped.DefaultSystemZoneConfig +} + +func (c *initServerCfg) resolvers() []resolver.Resolver { + return c.wrapped.FilterGossipBootstrapResolvers(context.Background()) +} + +func (c *initServerCfg) advertiseAddr() string { + return c.wrapped.AdvertiseAddr +} diff --git a/pkg/server/node.go b/pkg/server/node.go index d36035a8b021..f3854535cb47 100644 --- a/pkg/server/node.go +++ b/pkg/server/node.go @@ -347,6 +347,10 @@ func (n *Node) start( n.initialBoot = state.joined nodeID := state.nodeID if nodeID == 0 { + // TODO(irfansharif): This codepath exists to maintain the legacy + // behavior of node ID allocation that was triggered on gossip + // connectivity. This was replaced by the Join RPC in 20.2, and can be + // removed in 21.1. if !state.joined { log.Fatalf(ctx, "node has no NodeID, but claims to not be joining cluster") } diff --git a/pkg/server/server.go b/pkg/server/server.go index 3fd7b214b273..5be44911c1b1 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -1017,6 +1017,17 @@ func (s *Server) startPersistingHLCUpperBound( // // The passed context can be used to trace the server startup. The context // should represent the general startup operation. +// +// XXX: Write a summary. What are _all_ the components I care about? 
+// - http server/admin UI +// - engines +// - rpc servers (Batch, Rangefeed, SQL, admin, status, authentication, +// tsServer) +// - init server +// - gossip +// - cluster version +// - /health end point +// - bootstrap func (s *Server) Start(ctx context.Context) error { ctx = s.AnnotateCtx(ctx) @@ -1070,29 +1081,25 @@ func (s *Server) Start(ctx context.Context) error { blobs.NewBlobClientFactory(s.nodeIDContainer.Get(), s.nodeDialer, s.st.ExternalIODir), &fileTableInternalExecutor, s.db) - bootstrapVersion := s.cfg.Settings.Version.BinaryVersion() - if knobs := s.cfg.TestingKnobs.Server; knobs != nil { - if ov := knobs.(*TestingKnobs).BootstrapVersionOverride; ov != (roachpb.Version{}) { - bootstrapVersion = ov - } - } - // Set up the init server. We have to do this relatively early because we // can't call RegisterInitServer() after `grpc.Serve`, which is called in // startRPCServer (and for the loopback grpc-gw connection). - initServer, err := setupInitServer( + initConfig := initServerCfg{wrapped: s.cfg} + inspectState, err := inspectEngines( ctx, + s.engines, s.cfg.Settings.Version.BinaryVersion(), s.cfg.Settings.Version.BinaryMinSupportedVersion(), - bootstrapVersion, - &s.cfg.DefaultZoneConfig, - &s.cfg.DefaultSystemZoneConfig, - s.engines, ) if err != nil { return err } + initServer, err := newInitServer(s.cfg.AmbientCtx, s.rpcContext, inspectState, s.db, initConfig) + if err != nil { + return err + } + { // Set up the callback that persists gossiped version bumps to the // engines. The invariant we uphold here is that the bump needs to be @@ -1283,13 +1290,24 @@ func (s *Server) Start(ctx context.Context) error { } } - // Filter the gossip bootstrap resolvers based on the listen and - // advertise addresses. - listenAddrU := util.NewUnresolvedAddr("tcp", s.cfg.Addr) + // Filter out self from the gossip bootstrap resolvers. 
+ filtered := s.cfg.FilterGossipBootstrapResolvers(ctx) + advAddrU := util.NewUnresolvedAddr("tcp", s.cfg.AdvertiseAddr) advSQLAddrU := util.NewUnresolvedAddr("tcp", s.cfg.SQLAdvertiseAddr) advTenantAddrU := util.NewUnresolvedAddr("tcp", s.cfg.TenantAdvertiseAddr) - filtered := s.cfg.FilterGossipBootstrapResolvers(ctx, listenAddrU, advAddrU) + + // We need gossip to get spun up before the init server (which internally + // makes use of KV to allocate node IDs). In an ideal world we'd be able to + // only spin up the very small subset of KV we need to allocate node IDs (or + // you can imagine being able to fetch node IDs from elsewhere entirely), + // but today there's this awkward dance best illustrated by the following + // example: + // + // In a two node cluster where n1 is started+bootstrapped, when n2 + // contacts n1 to allocate its node id, n1 actually needs gossip + // connectivity with n2 to have KV functioning, in order to allocate + // the node id for it. s.gossip.Start(advAddrU, filtered) log.Event(ctx, "started gossip") @@ -1305,8 +1323,6 @@ func (s *Server) Start(ctx context.Context) error { if _, err := initServer.Bootstrap(ctx, &serverpb.BootstrapRequest{}); err != nil { return err } - } else { - log.Info(ctx, "awaiting init command or join with an already initialized node.") } // Set up calling s.cfg.ReadyFn at the right time. Essentially, this call @@ -1330,9 +1346,9 @@ func (s *Server) Start(ctx context.Context) error { } // This opens the main listener. When the listener is open, we can call - // initServerReadyFn since any request initiated to the initServer at that - // point will reach it once ServeAndWait starts handling the queue of incoming - // connections. + // onInitServerReady since any request initiated to the initServer at that + // point will reach it once ServeAndWait starts handling the queue of + // incoming connections. 
startRPCServer(workersCtx) onInitServerReady() state, err := initServer.ServeAndWait(ctx, s.stopper, &s.cfg.Settings.SV, s.gossip) @@ -1343,6 +1359,9 @@ func (s *Server) Start(ctx context.Context) error { s.rpcContext.ClusterID.Set(ctx, state.clusterID) // If there's no NodeID here, then we didn't just bootstrap. The Node will // read its ID from the stores or request a new one via KV. + // + // TODO(irfansharif): Delete this once 20.2 is cut. This only exists to + // be compatible with 20.1 clusters. if state.nodeID != 0 { s.rpcContext.NodeID.Set(ctx, state.nodeID) } @@ -1372,7 +1391,7 @@ func (s *Server) Start(ctx context.Context) error { // demonstrate that we're not doing anything functional here (and to // prevent bugs during further refactors). if s.rpcContext.ClusterID.Get() == uuid.Nil { - return errors.New("gossip should already be connected") + return errors.New("programming error: expected cluster id to be populated in rpc context") } unregister := s.gossip.RegisterCallback(gossip.KeyClusterID, func(string, roachpb.Value) { clusterID, err := s.gossip.GetClusterID() @@ -1391,7 +1410,7 @@ func (s *Server) Start(ctx context.Context) error { // but this gossip only happens once the first range has a leaseholder, i.e. // when a quorum of nodes has gone fully operational. 
_ = s.stopper.RunAsyncTask(ctx, "connect-gossip", func(ctx context.Context) { - log.Infof(ctx, "connecting to gossip network to verify cluster ID %q", state.clusterID) + log.Infof(ctx, "connecting to gossip network to verify cluster id %q", state.clusterID) select { case <-s.gossip.Connected: log.Infof(ctx, "node connected via gossip") diff --git a/pkg/server/serverpb/init.pb.go b/pkg/server/serverpb/init.pb.go index 6a132146a2c6..fbc41f4dd300 100644 --- a/pkg/server/serverpb/init.pb.go +++ b/pkg/server/serverpb/init.pb.go @@ -6,6 +6,7 @@ package serverpb import proto "github.com/gogo/protobuf/proto" import fmt "fmt" import math "math" +import roachpb "github.com/cockroachdb/cockroach/pkg/roachpb" import ( context "context" @@ -32,7 +33,7 @@ func (m *BootstrapRequest) Reset() { *m = BootstrapRequest{} } func (m *BootstrapRequest) String() string { return proto.CompactTextString(m) } func (*BootstrapRequest) ProtoMessage() {} func (*BootstrapRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_init_5d06be670cd568de, []int{0} + return fileDescriptor_init_5ab0d48daa4548e1, []int{0} } func (m *BootstrapRequest) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -64,7 +65,7 @@ func (m *BootstrapResponse) Reset() { *m = BootstrapResponse{} } func (m *BootstrapResponse) String() string { return proto.CompactTextString(m) } func (*BootstrapResponse) ProtoMessage() {} func (*BootstrapResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_init_5d06be670cd568de, []int{1} + return fileDescriptor_init_5ab0d48daa4548e1, []int{1} } func (m *BootstrapResponse) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -89,9 +90,93 @@ func (m *BootstrapResponse) XXX_DiscardUnknown() { var xxx_messageInfo_BootstrapResponse proto.InternalMessageInfo +// JoinNodeRequest is used to specify to the server node what the client's +// MinimumSupportedVersion is. If it's not compatible with the rest of the +// cluster, the join attempt is refused. 
+type JoinNodeRequest struct { + MinSupportedVersion *roachpb.Version `protobuf:"bytes,1,opt,name=min_supported_version,json=minSupportedVersion,proto3" json:"min_supported_version,omitempty"` + // TODO(irfansharif): Use this field to provide the client's address so that + // the server is able to reach back to it, setting up bidirectional network + // links. + Addr string `protobuf:"bytes,2,opt,name=addr,proto3" json:"addr,omitempty"` +} + +func (m *JoinNodeRequest) Reset() { *m = JoinNodeRequest{} } +func (m *JoinNodeRequest) String() string { return proto.CompactTextString(m) } +func (*JoinNodeRequest) ProtoMessage() {} +func (*JoinNodeRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_init_5ab0d48daa4548e1, []int{2} +} +func (m *JoinNodeRequest) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *JoinNodeRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + b = b[:cap(b)] + n, err := m.MarshalTo(b) + if err != nil { + return nil, err + } + return b[:n], nil +} +func (dst *JoinNodeRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_JoinNodeRequest.Merge(dst, src) +} +func (m *JoinNodeRequest) XXX_Size() int { + return m.Size() +} +func (m *JoinNodeRequest) XXX_DiscardUnknown() { + xxx_messageInfo_JoinNodeRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_JoinNodeRequest proto.InternalMessageInfo + +// JoinNodeResponse informs the joining node what the cluster id is, and what +// node id was allocated to it. +// +// TODO(irfansharif): We should use this RPC to tell us the right cluster +// version to use (instead of using the minimum possible version and relying on +// gossip to bump to for us). +// TODO(irfansharif): We should use this RPC to also generate store IDs, instead +// of having each node do it for itself after being handed out a node ID. 
+type JoinNodeResponse struct { + ClusterID []byte `protobuf:"bytes,1,opt,name=cluster_id,json=clusterId,proto3" json:"cluster_id,omitempty"` + NodeID int32 `protobuf:"varint,2,opt,name=node_id,json=nodeId,proto3" json:"node_id,omitempty"` +} + +func (m *JoinNodeResponse) Reset() { *m = JoinNodeResponse{} } +func (m *JoinNodeResponse) String() string { return proto.CompactTextString(m) } +func (*JoinNodeResponse) ProtoMessage() {} +func (*JoinNodeResponse) Descriptor() ([]byte, []int) { + return fileDescriptor_init_5ab0d48daa4548e1, []int{3} +} +func (m *JoinNodeResponse) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *JoinNodeResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + b = b[:cap(b)] + n, err := m.MarshalTo(b) + if err != nil { + return nil, err + } + return b[:n], nil +} +func (dst *JoinNodeResponse) XXX_Merge(src proto.Message) { + xxx_messageInfo_JoinNodeResponse.Merge(dst, src) +} +func (m *JoinNodeResponse) XXX_Size() int { + return m.Size() +} +func (m *JoinNodeResponse) XXX_DiscardUnknown() { + xxx_messageInfo_JoinNodeResponse.DiscardUnknown(m) +} + +var xxx_messageInfo_JoinNodeResponse proto.InternalMessageInfo + func init() { proto.RegisterType((*BootstrapRequest)(nil), "cockroach.server.serverpb.BootstrapRequest") proto.RegisterType((*BootstrapResponse)(nil), "cockroach.server.serverpb.BootstrapResponse") + proto.RegisterType((*JoinNodeRequest)(nil), "cockroach.server.serverpb.JoinNodeRequest") + proto.RegisterType((*JoinNodeResponse)(nil), "cockroach.server.serverpb.JoinNodeResponse") } // Reference imports to suppress errors if they are not otherwise used. @@ -106,8 +191,12 @@ const _ = grpc.SupportPackageIsVersion4 // // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream. type InitClient interface { - // Bootstrap an uninitialized cluster. 
+ // Bootstrap an uninitialized cluster (inter-node links set up through the + // --join flags). Bootstrap(ctx context.Context, in *BootstrapRequest, opts ...grpc.CallOption) (*BootstrapResponse, error) + // Join a bootstrapped cluster. If the target node is itself not part of a + // bootstrapped cluster, an appropriate error is returned. + Join(ctx context.Context, in *JoinNodeRequest, opts ...grpc.CallOption) (*JoinNodeResponse, error) } type initClient struct { @@ -127,10 +216,23 @@ func (c *initClient) Bootstrap(ctx context.Context, in *BootstrapRequest, opts . return out, nil } +func (c *initClient) Join(ctx context.Context, in *JoinNodeRequest, opts ...grpc.CallOption) (*JoinNodeResponse, error) { + out := new(JoinNodeResponse) + err := c.cc.Invoke(ctx, "/cockroach.server.serverpb.Init/Join", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + // InitServer is the server API for Init service. type InitServer interface { - // Bootstrap an uninitialized cluster. + // Bootstrap an uninitialized cluster (inter-node links set up through the + // --join flags). Bootstrap(context.Context, *BootstrapRequest) (*BootstrapResponse, error) + // Join a bootstrapped cluster. If the target node is itself not part of a + // bootstrapped cluster, an appropriate error is returned. 
+ Join(context.Context, *JoinNodeRequest) (*JoinNodeResponse, error) } func RegisterInitServer(s *grpc.Server, srv InitServer) { @@ -155,6 +257,24 @@ func _Init_Bootstrap_Handler(srv interface{}, ctx context.Context, dec func(inte return interceptor(ctx, in, info, handler) } +func _Init_Join_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(JoinNodeRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(InitServer).Join(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/cockroach.server.serverpb.Init/Join", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(InitServer).Join(ctx, req.(*JoinNodeRequest)) + } + return interceptor(ctx, in, info, handler) +} + var _Init_serviceDesc = grpc.ServiceDesc{ ServiceName: "cockroach.server.serverpb.Init", HandlerType: (*InitServer)(nil), @@ -163,6 +283,10 @@ var _Init_serviceDesc = grpc.ServiceDesc{ MethodName: "Bootstrap", Handler: _Init_Bootstrap_Handler, }, + { + MethodName: "Join", + Handler: _Init_Join_Handler, + }, }, Streams: []grpc.StreamDesc{}, Metadata: "server/serverpb/init.proto", @@ -204,6 +328,69 @@ func (m *BootstrapResponse) MarshalTo(dAtA []byte) (int, error) { return i, nil } +func (m *JoinNodeRequest) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *JoinNodeRequest) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + if m.MinSupportedVersion != nil { + dAtA[i] = 0xa + i++ + i = encodeVarintInit(dAtA, i, uint64(m.MinSupportedVersion.Size())) + n1, err := m.MinSupportedVersion.MarshalTo(dAtA[i:]) + if err != nil { + return 0, err + } + i += n1 + } + if len(m.Addr) > 0 { + dAtA[i] = 0x12 + i++ + i = encodeVarintInit(dAtA, i, 
uint64(len(m.Addr))) + i += copy(dAtA[i:], m.Addr) + } + return i, nil +} + +func (m *JoinNodeResponse) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *JoinNodeResponse) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + if len(m.ClusterID) > 0 { + dAtA[i] = 0xa + i++ + i = encodeVarintInit(dAtA, i, uint64(len(m.ClusterID))) + i += copy(dAtA[i:], m.ClusterID) + } + if m.NodeID != 0 { + dAtA[i] = 0x10 + i++ + i = encodeVarintInit(dAtA, i, uint64(m.NodeID)) + } + return i, nil +} + func encodeVarintInit(dAtA []byte, offset int, v uint64) int { for v >= 1<<7 { dAtA[offset] = uint8(v&0x7f | 0x80) @@ -231,6 +418,39 @@ func (m *BootstrapResponse) Size() (n int) { return n } +func (m *JoinNodeRequest) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.MinSupportedVersion != nil { + l = m.MinSupportedVersion.Size() + n += 1 + l + sovInit(uint64(l)) + } + l = len(m.Addr) + if l > 0 { + n += 1 + l + sovInit(uint64(l)) + } + return n +} + +func (m *JoinNodeResponse) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + l = len(m.ClusterID) + if l > 0 { + n += 1 + l + sovInit(uint64(l)) + } + if m.NodeID != 0 { + n += 1 + sovInit(uint64(m.NodeID)) + } + return n +} + func sovInit(x uint64) (n int) { for { n++ @@ -344,6 +564,218 @@ func (m *BootstrapResponse) Unmarshal(dAtA []byte) error { } return nil } +func (m *JoinNodeRequest) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowInit + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + 
return fmt.Errorf("proto: JoinNodeRequest: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: JoinNodeRequest: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field MinSupportedVersion", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowInit + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthInit + } + postIndex := iNdEx + msglen + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.MinSupportedVersion == nil { + m.MinSupportedVersion = &roachpb.Version{} + } + if err := m.MinSupportedVersion.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Addr", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowInit + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthInit + } + postIndex := iNdEx + intStringLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Addr = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipInit(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthInit + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *JoinNodeResponse) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx 
+ var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowInit + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: JoinNodeResponse: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: JoinNodeResponse: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field ClusterID", wireType) + } + var byteLen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowInit + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + byteLen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if byteLen < 0 { + return ErrInvalidLengthInit + } + postIndex := iNdEx + byteLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.ClusterID = append(m.ClusterID[:0], dAtA[iNdEx:postIndex]...) 
+ if m.ClusterID == nil { + m.ClusterID = []byte{} + } + iNdEx = postIndex + case 2: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field NodeID", wireType) + } + m.NodeID = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowInit + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.NodeID |= (int32(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + default: + iNdEx = preIndex + skippy, err := skipInit(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthInit + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} func skipInit(dAtA []byte) (n int, err error) { l := len(dAtA) iNdEx := 0 @@ -449,19 +881,31 @@ var ( ErrIntOverflowInit = fmt.Errorf("proto: integer overflow") ) -func init() { proto.RegisterFile("server/serverpb/init.proto", fileDescriptor_init_5d06be670cd568de) } - -var fileDescriptor_init_5d06be670cd568de = []byte{ - // 173 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x92, 0x2a, 0x4e, 0x2d, 0x2a, - 0x4b, 0x2d, 0xd2, 0x87, 0x50, 0x05, 0x49, 0xfa, 0x99, 0x79, 0x99, 0x25, 0x7a, 0x05, 0x45, 0xf9, - 0x25, 0xf9, 0x42, 0x92, 0xc9, 0xf9, 0xc9, 0xd9, 0x45, 0xf9, 0x89, 0xc9, 0x19, 0x7a, 0x10, 0x69, - 0x3d, 0x98, 0x2a, 0x25, 0x21, 0x2e, 0x01, 0xa7, 0xfc, 0xfc, 0x92, 0xe2, 0x92, 0xa2, 0xc4, 0x82, - 0xa0, 0xd4, 0xc2, 0xd2, 0xd4, 0xe2, 0x12, 0x25, 0x61, 0x2e, 0x41, 0x24, 0xb1, 0xe2, 0x82, 0xfc, - 0xbc, 0xe2, 0x54, 0xa3, 0x02, 0x2e, 0x16, 0xcf, 0xbc, 0xcc, 0x12, 0xa1, 0x0c, 0x2e, 0x4e, 0xb8, - 0xa4, 0x90, 0xb6, 0x1e, 0x4e, 0x93, 0xf5, 0xd0, 0x8d, 0x95, 0xd2, 0x21, 0x4e, 0x31, 0xc4, 0x3e, - 0x25, 0x06, 0x27, 0xad, 0x13, 0x0f, 0xe5, 0x18, 0x4e, 0x3c, 0x92, 0x63, 0xbc, 0xf0, 0x48, 0x8e, - 0xf1, 0xc6, 0x23, 0x39, 0xc6, 0x07, 0x8f, 0xe4, 0x18, 0x27, 0x3c, 0x96, 0x63, 0xb8, 
0xf0, 0x58, - 0x8e, 0xe1, 0xc6, 0x63, 0x39, 0x86, 0x28, 0x0e, 0x98, 0xfe, 0x24, 0x36, 0xb0, 0x47, 0x8d, 0x01, - 0x01, 0x00, 0x00, 0xff, 0xff, 0x90, 0x15, 0x22, 0x79, 0x06, 0x01, 0x00, 0x00, +func init() { proto.RegisterFile("server/serverpb/init.proto", fileDescriptor_init_5ab0d48daa4548e1) } + +var fileDescriptor_init_5ab0d48daa4548e1 = []byte{ + // 366 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x92, 0x4f, 0x6e, 0xe2, 0x30, + 0x18, 0xc5, 0xe3, 0x11, 0xc3, 0x4c, 0x3c, 0x33, 0x1a, 0x6a, 0xda, 0x8a, 0x66, 0x61, 0x50, 0xba, + 0x41, 0x80, 0x12, 0x89, 0xde, 0x20, 0x65, 0x93, 0x2e, 0x58, 0xa4, 0x52, 0x17, 0xdd, 0xa0, 0x10, + 0x5b, 0x60, 0xb5, 0xd8, 0xa9, 0xed, 0x70, 0x8e, 0x1e, 0x8b, 0x25, 0x52, 0x37, 0xac, 0x50, 0x1b, + 0x2e, 0x52, 0xe5, 0x5f, 0xa9, 0x90, 0x5a, 0xb1, 0x8a, 0xf5, 0xfc, 0x7b, 0x7e, 0xdf, 0xf7, 0x14, + 0x68, 0x29, 0x2a, 0x97, 0x54, 0xba, 0xc5, 0x27, 0x9e, 0xba, 0x8c, 0x33, 0xed, 0xc4, 0x52, 0x68, + 0x81, 0x2e, 0x22, 0x11, 0x3d, 0x48, 0x11, 0x46, 0x73, 0xa7, 0xb8, 0x76, 0x2a, 0xca, 0x3a, 0x9d, + 0x89, 0x99, 0xc8, 0x29, 0x37, 0x3b, 0x15, 0x06, 0xeb, 0x3c, 0x87, 0xe3, 0xa9, 0xbb, 0xa0, 0x3a, + 0x24, 0xa1, 0x0e, 0x0b, 0xdd, 0x46, 0xb0, 0xe1, 0x09, 0xa1, 0x95, 0x96, 0x61, 0x1c, 0xd0, 0xa7, + 0x84, 0x2a, 0x6d, 0x37, 0xe1, 0xc9, 0x27, 0x4d, 0xc5, 0x82, 0x2b, 0x6a, 0x27, 0xf0, 0xff, 0x8d, + 0x60, 0x7c, 0x2c, 0x08, 0x2d, 0x39, 0x34, 0x86, 0x67, 0x0b, 0xc6, 0x27, 0x2a, 0x89, 0x63, 0x21, + 0x35, 0x25, 0x93, 0x25, 0x95, 0x8a, 0x09, 0xde, 0x02, 0x1d, 0xd0, 0xfd, 0x33, 0xb4, 0x9c, 0xfd, + 0x90, 0x65, 0xba, 0x73, 0x57, 0x10, 0x41, 0x73, 0xc1, 0xf8, 0x6d, 0xe5, 0x2b, 0x45, 0x84, 0x60, + 0x2d, 0x24, 0x44, 0xb6, 0x7e, 0x74, 0x40, 0xd7, 0x0c, 0xf2, 0xb3, 0x4d, 0x61, 0x63, 0x1f, 0x5b, + 0x8c, 0x82, 0x06, 0x10, 0x46, 0x8f, 0x89, 0xd2, 0x54, 0x4e, 0x18, 0xc9, 0xc3, 0xfe, 0x7a, 0xff, + 0xd2, 0x6d, 0xdb, 0xbc, 0x2e, 0x54, 0x7f, 0x14, 0x98, 0x25, 0xe0, 0x13, 0x74, 0x09, 0x7f, 0x71, + 0x41, 0x68, 0x86, 0x66, 0x0f, 
0xff, 0xf4, 0x60, 0xba, 0x6d, 0xd7, 0xb3, 0x07, 0xfd, 0x51, 0x50, + 0xcf, 0xae, 0x7c, 0x32, 0x7c, 0x01, 0xb0, 0xe6, 0x73, 0xa6, 0xd1, 0x1c, 0x9a, 0x1f, 0xbb, 0xa3, + 0xbe, 0xf3, 0x65, 0xcd, 0xce, 0x61, 0x6b, 0xd6, 0x60, 0x38, 0xb8, 0xac, 0xd3, 0x40, 0x21, 0xac, + 0x65, 0x9b, 0xa1, 0xde, 0x37, 0xbe, 0x83, 0xc6, 0xad, 0xfe, 0x51, 0x6c, 0x15, 0xe1, 0xf5, 0x56, + 0x6f, 0xd8, 0x58, 0xa5, 0x18, 0xac, 0x53, 0x0c, 0x36, 0x29, 0x06, 0xaf, 0x29, 0x06, 0xcf, 0x3b, + 0x6c, 0xac, 0x77, 0xd8, 0xd8, 0xec, 0xb0, 0x71, 0xff, 0xbb, 0xb2, 0x4f, 0xeb, 0xf9, 0xff, 0x70, + 0xf5, 0x1e, 0x00, 0x00, 0xff, 0xff, 0x53, 0x52, 0x9e, 0x7f, 0x76, 0x02, 0x00, 0x00, } diff --git a/pkg/server/serverpb/init.proto b/pkg/server/serverpb/init.proto index 491115f9a502..c963f9b5cfe1 100644 --- a/pkg/server/serverpb/init.proto +++ b/pkg/server/serverpb/init.proto @@ -12,14 +12,43 @@ syntax = "proto3"; package cockroach.server.serverpb; option go_package = "serverpb"; -message BootstrapRequest { +import "gogoproto/gogo.proto"; +import "roachpb/metadata.proto"; + +message BootstrapRequest { } +message BootstrapResponse { } + +// JoinNodeRequest is used to specify to the server node what the client's +// MinimumSupportedVersion is. If it's not compatible with the rest of the +// cluster, the join attempt is refused. +message JoinNodeRequest { + roachpb.Version min_supported_version = 1; + + // TODO(irfansharif): Use this field to provide the client's address so that + // the server is able to reach back to it, setting up bidirectional network + // links. + string addr = 2; } -message BootstrapResponse { +// JoinNodeResponse informs the joining node what the cluster id is, and what +// node id was allocated to it. +// +// TODO(irfansharif): We should use this RPC to tell us the right cluster +// version to use (instead of using the minimum possible version and relying on +// gossip to bump it for us). 
+// TODO(irfansharif): We should use this RPC to also generate store IDs, instead +// of having each node do it for itself after being handed out a node ID. +message JoinNodeResponse { + bytes cluster_id = 1 [(gogoproto.customname) = "ClusterID"]; + int32 node_id = 2 [(gogoproto.customname) = "NodeID"]; } service Init { - // Bootstrap an uninitialized cluster. - rpc Bootstrap(BootstrapRequest) returns (BootstrapResponse) { - } + // Bootstrap an uninitialized cluster (inter-node links set up through the + // --join flags). + rpc Bootstrap(BootstrapRequest) returns (BootstrapResponse) { } + + // Join a bootstrapped cluster. If the target node is itself not part of a + // bootstrapped cluster, an appropriate error is returned. + rpc Join(JoinNodeRequest) returns (JoinNodeResponse) { } } diff --git a/pkg/util/grpcutil/log.go b/pkg/util/grpcutil/log.go index 19fea0454d9d..1bbfb18fd4f2 100644 --- a/pkg/util/grpcutil/log.go +++ b/pkg/util/grpcutil/log.go @@ -72,7 +72,7 @@ func (severity *logger) Warning(args ...interface{}) { if log.Severity(*severity) > log.Severity_WARNING { return } - if shouldPrint(connectivitySpamRe, 30*time.Second, args...) { + if shouldPrint(ConnectivitySpamRe, 30*time.Second, args...) { log.WarningfDepth(context.TODO(), 2, "", args...) 
} } @@ -146,7 +146,7 @@ func (severity *logger) V(i int) bool { // https://github.com/grpc/grpc-go/blob/v1.29.1/clientconn.go#L1275 var ( transportFailedRe = regexp.MustCompile(`^` + regexp.QuoteMeta(`grpc: addrConn.createTransport failed to connect to`)) - connectionRefusedRe = regexp.MustCompile( + ConnectionRefusedRe = regexp.MustCompile( strings.Join([]string{ // *nix regexp.QuoteMeta("connection refused"), @@ -159,8 +159,8 @@ var ( ) clientConnReuseRe = regexp.MustCompile("cannot reuse client connection") - connectivitySpamRe = regexp.MustCompile(transportFailedRe.String() + `.*` + - "(" + connectionRefusedRe.String() + "|" + clientConnReuseRe.String() + ")") + ConnectivitySpamRe = regexp.MustCompile(transportFailedRe.String() + `.*` + + "(" + ConnectionRefusedRe.String() + "|" + clientConnReuseRe.String() + ")") ) var spamMu = struct {