From 9f9130fc9e673e86ecd3e2e4a37a48ba8b926f91 Mon Sep 17 00:00:00 2001 From: irfan sharif Date: Mon, 31 Aug 2020 17:58:51 -0400 Subject: [PATCH 1/2] server: improve error handling in join rpc Previously we used two separate channels to relay the results of using the join RPC, one for possible error in attempting to do and one for the actual result if successful. There was no real reason for this, and the code gluing them together was a bit fragile (and hid a latent bug). Previously we could deadlock if the select loop in ServeAndWait, for a successful join attempt, drained the errCh before draining joinCh. Because the successful joinCh drain then followed up with trying to drain the errCh (already drained by now), it would just block indefinitely. We just simplify all this to using one channel to encapsulate the results of the join attempt. Release justification: bug fixes and low-risk updates to new functionality Release note: None --- pkg/server/init.go | 142 ++++++++++++++++++++++++++------------------- 1 file changed, 82 insertions(+), 60 deletions(-) diff --git a/pkg/server/init.go b/pkg/server/init.go index 887727097cea..d2932c8f595e 100644 --- a/pkg/server/init.go +++ b/pkg/server/init.go @@ -39,12 +39,12 @@ import ( // a node that is already part of an initialized cluster. var ErrClusterInitialized = fmt.Errorf("cluster has already been initialized") -// ErrJoinRPCUnsupported is reported when the Join RPC is run against +// errJoinRPCUnsupported is reported when the Join RPC is run against // a node that does not know about the join RPC (i.e. it is running 20.1 or // below). // // TODO(irfansharif): Remove this in 21.1. -var ErrJoinRPCUnsupported = fmt.Errorf("node does not support the Join RPC") +var errJoinRPCUnsupported = fmt.Errorf("node does not support the Join RPC") // ErrIncompatibleBinaryVersion is returned when a CRDB node with a binary version X // attempts to join a cluster with an active version that's higher. This is not @@ -84,9 +84,6 @@ type initServer struct { // If this CRDB node was `cockroach init`-ialized, the resulting init state // will be passed through to this channel. bootstrapReqCh chan *initState - // If this CRDB node was able to join a bootstrapped cluster, the resulting - // init state will be passed through to this channel. - joinCh chan *initState } func newInitServer( @@ -95,7 +92,6 @@ func newInitServer( s := &initServer{ AmbientContext: actx, bootstrapReqCh: make(chan *initState, 1), - joinCh: make(chan *initState, 1), config: config, } s.mu.inspectState = inspectState @@ -154,6 +150,13 @@ func (s *initServer) needsInitLocked() bool { return len(s.mu.inspectState.initializedEngines) == 0 } +// joinResult is used to represent the result of a node attempting to join +// an already bootstrapped cluster. +type joinResult struct { + state *initState + err error +} + // ServeAndWait waits until the server is initialized, i.e. has a cluster ID, // node ID and has permission to join the cluster. In the common case of // restarting an existing node, this immediately returns. When starting with a @@ -210,12 +213,18 @@ func (s *initServer) ServeAndWait( var wg sync.WaitGroup wg.Add(1) - errCh := make(chan error, 1) + // If this CRDB node was able to join a bootstrapped cluster, the resulting + // init state will be passed through to this channel. + joinCh := make(chan joinResult, 1) if err := stopper.RunTask(joinCtx, "init server: join loop", func(joinCtx context.Context) { stopper.RunWorker(joinCtx, func(joinCtx context.Context) { defer wg.Done() - errCh <- s.startJoinLoop(joinCtx, stopper) + state, err := s.startJoinLoop(joinCtx, stopper) + joinCh <- joinResult{ + state: state, + err: err, + } }) }); err != nil { return nil, false, err @@ -263,10 +272,36 @@ func (s *initServer) ServeAndWait( s.mu.Unlock() return state, true, nil - case state := <-s.joinCh: + case result := <-joinCh: // Ensure we're draining out the join attempt. wg.Wait() - <-errCh + + if err := result.err; err != nil { + if errors.Is(err, errJoinRPCUnsupported) { + // We're in a mixed-version cluster, we start gossip and wire up + // the gossip connectivity mechanism to discover the cluster ID. + g = startGossipFn() + gossipConnectedCh = g.Connected + + // Let's nil out joinCh to prevent accidental re-use. + close(joinCh) + joinCh = nil + + continue + } + + if errors.Is(err, ErrIncompatibleBinaryVersion) { + return nil, false, err + } + + if err != nil { + // We expect the join RPC to blindly retry on all errors + // save for the two above. This should be unreachable code. + return nil, false, errors.NewAssertionErrorWithWrappedErrf(err, "unexpected error: %v", err) + } + } + + state := result.state // TODO(irfansharif): We can try and initialize the the version // setting to the active cluster version, in the same way we do when @@ -311,25 +346,6 @@ func (s *initServer) ServeAndWait( } log.Infof(ctx, "joined cluster %s through gossip (legacy behavior)", state.clusterID) return state, true, nil - case err := <-errCh: - // We won't return from here in the happy path; we'll let the join - // switch block do it for us. - - if errors.Is(err, ErrJoinRPCUnsupported) { - // We're in a mixed-version cluster, we start gossip and wire up - // the gossip connectivity mechanism to discover the cluster ID. - g = startGossipFn() - gossipConnectedCh = g.Connected - continue - } - - if errors.Is(err, ErrIncompatibleBinaryVersion) { - return nil, false, err - } - - if err != nil { - log.Fatalf(ctx, "unexpected error: %s", err.Error()) - } case <-stopper.ShouldQuiesce(): return nil, false, stop.ErrUnavailable } @@ -377,17 +393,17 @@ func (s *initServer) Bootstrap( // startJoinLoop continuously tries connecting to nodes specified in the join // list in order to determine what the cluster ID is, and to be allocated a -// node+store ID. It can return ErrJoinRPCUnsupported, in which case the caller +// node+store ID. It can return errJoinRPCUnsupported, in which case the caller // is expected to fall back to the gossip-based cluster ID discovery mechanism. // It can also fail with ErrIncompatibleBinaryVersion, in which case we know we're // running a binary that's too old to join the rest of the cluster. -func (s *initServer) startJoinLoop(ctx context.Context, stopper *stop.Stopper) error { +func (s *initServer) startJoinLoop(ctx context.Context, stopper *stop.Stopper) (*initState, error) { if len(s.config.resolvers) == 0 { // We're pointing to only ourselves, which is probably indicative of a // node that's going to be bootstrapped by the operator. We could opt to // not fall back to the gossip based connectivity mechanism, but we do // it anyway. - return ErrJoinRPCUnsupported + return nil, errJoinRPCUnsupported } const joinRPCBackoff = time.Second @@ -402,40 +418,42 @@ func (s *initServer) startJoinLoop(ctx context.Context, stopper *stop.Stopper) e addr := s.config.resolvers[idx].Addr() select { case <-tickChan: - err := s.attemptJoin(ctx, addr) - if errors.Is(err, ErrJoinRPCUnsupported) || errors.Is(err, ErrIncompatibleBinaryVersion) { + state, err := s.attemptJoinTo(ctx, addr) + if err == nil { + return state, nil + } + + if errors.Is(err, errJoinRPCUnsupported) || errors.Is(err, ErrIncompatibleBinaryVersion) { // Propagate upwards; these are error conditions the caller // knows to expect. - return err + return nil, err } - if err != nil { - // TODO(irfansharif): If startup logging gets too spammy, we - // could match against connection errors to generate nicer - // logging. See grpcutil.connectionRefusedRe. - - if IsWaitingForInit(err) { - log.Warningf(ctx, "%s is itself waiting for init, will retry", addr) - } else { - log.Warningf(ctx, "outgoing join rpc to %s unsuccessful: %v", addr, err.Error()) - } + // Blindly retry for all other errors, logging them for visibility. + + // TODO(irfansharif): If startup logging gets too spammy, we + // could match against connection errors to generate nicer + // logging. See grpcutil.connectionRefusedRe. - // Blindly retry on error. - continue + if IsWaitingForInit(err) { + log.Warningf(ctx, "%s is itself waiting for init, will retry", addr) + } else { + log.Warningf(ctx, "outgoing join rpc to %s unsuccessful: %v", addr, err.Error()) } - return nil case <-ctx.Done(): - return nil + return nil, context.Canceled case <-stopper.ShouldQuiesce(): - return nil + return nil, stop.ErrUnavailable } } } -func (s *initServer) attemptJoin(ctx context.Context, addr string) error { +// attemptJoinTo attempts to join to the node running at the given address. If +// successful, an initState is returned. +func (s *initServer) attemptJoinTo(ctx context.Context, addr string) (*initState, error) { conn, err := grpc.DialContext(ctx, addr, s.config.dialOpts...) if err != nil { - return err + return nil, err } defer func() { @@ -451,32 +469,37 @@ func (s *initServer) attemptJoin(ctx context.Context, addr string) error { resp, err := initClient.Join(ctx, req) if err != nil { // If the target node does not implement the Join RPC, or explicitly - // returns ErrJoinRPCUnsupported (because the cluster version + // returns errJoinRPCUnsupported (because the cluster version // introducing its usage is not yet active), we error out so the init // server knows to fall back on the gossip-based discovery mechanism for // the clusterID. status, ok := grpcstatus.FromError(errors.UnwrapAll(err)) if !ok { - return err + return nil, err } + // TODO(irfansharif): Here we're logging the error and also returning + // it. We should wrap the logged message with the right error instead. + // The caller code, as written, switches on the error type; that'll need + // to be changed as well. + if status.Code() == codes.Unimplemented { log.Infof(ctx, "%s running an older version; falling back to gossip-based cluster join", addr) - return ErrJoinRPCUnsupported + return nil, errJoinRPCUnsupported } if status.Code() == codes.PermissionDenied { log.Infof(ctx, "%s is running a version higher than our binary version %s", addr, req.BinaryVersion.String()) - return ErrIncompatibleBinaryVersion + return nil, ErrIncompatibleBinaryVersion } - return err + return nil, err } clusterID, err := uuid.FromBytes(resp.ClusterID) if err != nil { - return err + return nil, err } s.mu.Lock() @@ -491,8 +514,7 @@ func (s *initServer) attemptJoin(ctx context.Context, addr string) error { firstStoreID: roachpb.StoreID(resp.StoreID), } - s.joinCh <- state - return nil + return state, nil } func (s *initServer) tryBootstrapLocked(ctx context.Context) (*initState, error) { From c6bc053a676ee8dceefeb072d0b17cc5f5df03c4 Mon Sep 17 00:00:00 2001 From: irfan sharif Date: Mon, 31 Aug 2020 17:58:51 -0400 Subject: [PATCH 2/2] server: busy loop through resolver list during join process Deferred doing this in #52526. Probably a good idea to do have it, it'll bring down the cluster convergence time (time taken for all nodes to find out about the initialization) by a bit. Release justification: low risk, high benefit changes to existing functionality Release note: None --- pkg/server/init.go | 33 +++++++++++++++++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/pkg/server/init.go b/pkg/server/init.go index d2932c8f595e..f915b81f7657 100644 --- a/pkg/server/init.go +++ b/pkg/server/init.go @@ -406,6 +406,39 @@ func (s *initServer) startJoinLoop(ctx context.Context, stopper *stop.Stopper) ( return nil, errJoinRPCUnsupported } + // Iterate through all the resolvers at least once to reduce time taken to + // cluster convergence. Keep this code block roughly in sync with the one + // below. + for _, res := range s.config.resolvers { + select { + case <-ctx.Done(): + return nil, context.Canceled + case <-stopper.ShouldQuiesce(): + return nil, stop.ErrUnavailable + default: + } + + addr := res.Addr() + state, err := s.attemptJoinTo(ctx, res.Addr()) + if err == nil { + return state, nil + } + + if errors.Is(err, errJoinRPCUnsupported) || errors.Is(err, ErrIncompatibleBinaryVersion) { + // Propagate upwards; these are error conditions the caller knows to + // expect. + return nil, err + } + + if IsWaitingForInit(err) { + log.Warningf(ctx, "%s is itself waiting for init, will retry", addr) + } else { + log.Warningf(ctx, "outgoing join rpc to %s unsuccessful: %v", addr, err.Error()) + } + + // Try the next node if unsuccessful. + } + const joinRPCBackoff = time.Second var tickChan <-chan time.Time {