diff --git a/pkg/server/init.go b/pkg/server/init.go index 887727097cea..f915b81f7657 100644 --- a/pkg/server/init.go +++ b/pkg/server/init.go @@ -39,12 +39,12 @@ import ( // a node that is already part of an initialized cluster. var ErrClusterInitialized = fmt.Errorf("cluster has already been initialized") -// ErrJoinRPCUnsupported is reported when the Join RPC is run against +// errJoinRPCUnsupported is reported when the Join RPC is run against // a node that does not know about the join RPC (i.e. it is running 20.1 or // below). // // TODO(irfansharif): Remove this in 21.1. -var ErrJoinRPCUnsupported = fmt.Errorf("node does not support the Join RPC") +var errJoinRPCUnsupported = fmt.Errorf("node does not support the Join RPC") // ErrIncompatibleBinaryVersion is returned when a CRDB node with a binary version X // attempts to join a cluster with an active version that's higher. This is not @@ -84,9 +84,6 @@ type initServer struct { // If this CRDB node was `cockroach init`-ialized, the resulting init state // will be passed through to this channel. bootstrapReqCh chan *initState - // If this CRDB node was able to join a bootstrapped cluster, the resulting - // init state will be passed through to this channel. - joinCh chan *initState } func newInitServer( @@ -95,7 +92,6 @@ func newInitServer( s := &initServer{ AmbientContext: actx, bootstrapReqCh: make(chan *initState, 1), - joinCh: make(chan *initState, 1), config: config, } s.mu.inspectState = inspectState @@ -154,6 +150,13 @@ func (s *initServer) needsInitLocked() bool { return len(s.mu.inspectState.initializedEngines) == 0 } +// joinResult is used to represent the result of a node attempting to join +// an already bootstrapped cluster. +type joinResult struct { + state *initState + err error +} + // ServeAndWait waits until the server is initialized, i.e. has a cluster ID, // node ID and has permission to join the cluster. In the common case of // restarting an existing node, this immediately returns. When starting with a @@ -210,12 +213,18 @@ func (s *initServer) ServeAndWait( var wg sync.WaitGroup wg.Add(1) - errCh := make(chan error, 1) + // If this CRDB node was able to join a bootstrapped cluster, the resulting + // init state will be passed through to this channel. + joinCh := make(chan joinResult, 1) if err := stopper.RunTask(joinCtx, "init server: join loop", func(joinCtx context.Context) { stopper.RunWorker(joinCtx, func(joinCtx context.Context) { defer wg.Done() - errCh <- s.startJoinLoop(joinCtx, stopper) + state, err := s.startJoinLoop(joinCtx, stopper) + joinCh <- joinResult{ + state: state, + err: err, + } }) }); err != nil { return nil, false, err @@ -263,10 +272,36 @@ func (s *initServer) ServeAndWait( s.mu.Unlock() return state, true, nil - case state := <-s.joinCh: + case result := <-joinCh: // Ensure we're draining out the join attempt. wg.Wait() - <-errCh + + if err := result.err; err != nil { + if errors.Is(err, errJoinRPCUnsupported) { + // We're in a mixed-version cluster, we start gossip and wire up + // the gossip connectivity mechanism to discover the cluster ID. + g = startGossipFn() + gossipConnectedCh = g.Connected + + // Let's nil out joinCh to prevent accidental re-use. + close(joinCh) + joinCh = nil + + continue + } + + if errors.Is(err, ErrIncompatibleBinaryVersion) { + return nil, false, err + } + + if err != nil { + // We expect the join RPC to blindly retry on all errors + // save for the two above. This should be unreachable code. + return nil, false, errors.NewAssertionErrorWithWrappedErrf(err, "unexpected error: %v", err) + } + } + + state := result.state // TODO(irfansharif): We can try and initialize the the version // setting to the active cluster version, in the same way we do when @@ -311,25 +346,6 @@ func (s *initServer) ServeAndWait( } log.Infof(ctx, "joined cluster %s through gossip (legacy behavior)", state.clusterID) return state, true, nil - case err := <-errCh: - // We won't return from here in the happy path; we'll let the join - // switch block do it for us. - - if errors.Is(err, ErrJoinRPCUnsupported) { - // We're in a mixed-version cluster, we start gossip and wire up - // the gossip connectivity mechanism to discover the cluster ID. - g = startGossipFn() - gossipConnectedCh = g.Connected - continue - } - - if errors.Is(err, ErrIncompatibleBinaryVersion) { - return nil, false, err - } - - if err != nil { - log.Fatalf(ctx, "unexpected error: %s", err.Error()) - } case <-stopper.ShouldQuiesce(): return nil, false, stop.ErrUnavailable } @@ -377,17 +393,50 @@ func (s *initServer) Bootstrap( // startJoinLoop continuously tries connecting to nodes specified in the join // list in order to determine what the cluster ID is, and to be allocated a -// node+store ID. It can return ErrJoinRPCUnsupported, in which case the caller +// node+store ID. It can return errJoinRPCUnsupported, in which case the caller // is expected to fall back to the gossip-based cluster ID discovery mechanism. // It can also fail with ErrIncompatibleBinaryVersion, in which case we know we're // running a binary that's too old to join the rest of the cluster. -func (s *initServer) startJoinLoop(ctx context.Context, stopper *stop.Stopper) error { +func (s *initServer) startJoinLoop(ctx context.Context, stopper *stop.Stopper) (*initState, error) { if len(s.config.resolvers) == 0 { // We're pointing to only ourselves, which is probably indicative of a // node that's going to be bootstrapped by the operator. We could opt to // not fall back to the gossip based connectivity mechanism, but we do // it anyway. - return ErrJoinRPCUnsupported + return nil, errJoinRPCUnsupported + } + + // Iterate through all the resolvers at least once to reduce time taken to + // cluster convergence. Keep this code block roughly in sync with the one + // below. + for _, res := range s.config.resolvers { + select { + case <-ctx.Done(): + return nil, context.Canceled + case <-stopper.ShouldQuiesce(): + return nil, stop.ErrUnavailable + default: + } + + addr := res.Addr() + state, err := s.attemptJoinTo(ctx, res.Addr()) + if err == nil { + return state, nil + } + + if errors.Is(err, errJoinRPCUnsupported) || errors.Is(err, ErrIncompatibleBinaryVersion) { + // Propagate upwards; these are error conditions the caller knows to + // expect. + return nil, err + } + + if IsWaitingForInit(err) { + log.Warningf(ctx, "%s is itself waiting for init, will retry", addr) + } else { + log.Warningf(ctx, "outgoing join rpc to %s unsuccessful: %v", addr, err.Error()) + } + + // Try the next node if unsuccessful. } const joinRPCBackoff = time.Second @@ -402,40 +451,42 @@ func (s *initServer) startJoinLoop(ctx context.Context, stopper *stop.Stopper) e addr := s.config.resolvers[idx].Addr() select { case <-tickChan: - err := s.attemptJoin(ctx, addr) - if errors.Is(err, ErrJoinRPCUnsupported) || errors.Is(err, ErrIncompatibleBinaryVersion) { + state, err := s.attemptJoinTo(ctx, addr) + if err == nil { + return state, nil + } + + if errors.Is(err, errJoinRPCUnsupported) || errors.Is(err, ErrIncompatibleBinaryVersion) { // Propagate upwards; these are error conditions the caller // knows to expect. - return err + return nil, err } - if err != nil { - // TODO(irfansharif): If startup logging gets too spammy, we - // could match against connection errors to generate nicer - // logging. See grpcutil.connectionRefusedRe. - - if IsWaitingForInit(err) { - log.Warningf(ctx, "%s is itself waiting for init, will retry", addr) - } else { - log.Warningf(ctx, "outgoing join rpc to %s unsuccessful: %v", addr, err.Error()) - } + // Blindly retry for all other errors, logging them for visibility. - // Blindly retry on error. - continue + // TODO(irfansharif): If startup logging gets too spammy, we + // could match against connection errors to generate nicer + // logging. See grpcutil.connectionRefusedRe. + + if IsWaitingForInit(err) { + log.Warningf(ctx, "%s is itself waiting for init, will retry", addr) + } else { + log.Warningf(ctx, "outgoing join rpc to %s unsuccessful: %v", addr, err.Error()) } - return nil case <-ctx.Done(): - return nil + return nil, context.Canceled case <-stopper.ShouldQuiesce(): - return nil + return nil, stop.ErrUnavailable } } } -func (s *initServer) attemptJoin(ctx context.Context, addr string) error { +// attemptJoinTo attempts to join to the node running at the given address. If +// successful, an initState is returned. +func (s *initServer) attemptJoinTo(ctx context.Context, addr string) (*initState, error) { conn, err := grpc.DialContext(ctx, addr, s.config.dialOpts...) if err != nil { - return err + return nil, err } defer func() { @@ -451,32 +502,37 @@ func (s *initServer) attemptJoin(ctx context.Context, addr string) error { resp, err := initClient.Join(ctx, req) if err != nil { // If the target node does not implement the Join RPC, or explicitly - // returns ErrJoinRPCUnsupported (because the cluster version + // returns errJoinRPCUnsupported (because the cluster version // introducing its usage is not yet active), we error out so the init // server knows to fall back on the gossip-based discovery mechanism for // the clusterID. status, ok := grpcstatus.FromError(errors.UnwrapAll(err)) if !ok { - return err + return nil, err } + // TODO(irfansharif): Here we're logging the error and also returning + // it. We should wrap the logged message with the right error instead. + // The caller code, as written, switches on the error type; that'll need + // to be changed as well. + if status.Code() == codes.Unimplemented { log.Infof(ctx, "%s running an older version; falling back to gossip-based cluster join", addr) - return ErrJoinRPCUnsupported + return nil, errJoinRPCUnsupported } if status.Code() == codes.PermissionDenied { log.Infof(ctx, "%s is running a version higher than our binary version %s", addr, req.BinaryVersion.String()) - return ErrIncompatibleBinaryVersion + return nil, ErrIncompatibleBinaryVersion } - return err + return nil, err } clusterID, err := uuid.FromBytes(resp.ClusterID) if err != nil { - return err + return nil, err } s.mu.Lock() @@ -491,8 +547,7 @@ func (s *initServer) attemptJoin(ctx context.Context, addr string) error { firstStoreID: roachpb.StoreID(resp.StoreID), } - s.joinCh <- state - return nil + return state, nil } func (s *initServer) tryBootstrapLocked(ctx context.Context) (*initState, error) {