Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

server: busy loop through resolver list during join process #53713

Merged
merged 2 commits into from
Sep 4, 2020
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
175 changes: 115 additions & 60 deletions pkg/server/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,12 @@ import (
// a node that is already part of an initialized cluster.
var ErrClusterInitialized = fmt.Errorf("cluster has already been initialized")

// ErrJoinRPCUnsupported is reported when the Join RPC is run against
// errJoinRPCUnsupported is reported when the Join RPC is run against
// a node that does not know about the join RPC (i.e. it is running 20.1 or
// below).
//
// TODO(irfansharif): Remove this in 21.1.
var ErrJoinRPCUnsupported = fmt.Errorf("node does not support the Join RPC")
var errJoinRPCUnsupported = fmt.Errorf("node does not support the Join RPC")

// ErrIncompatibleBinaryVersion is returned when a CRDB node with a binary version X
// attempts to join a cluster with an active version that's higher. This is not
Expand Down Expand Up @@ -84,9 +84,6 @@ type initServer struct {
// If this CRDB node was `cockroach init`-ialized, the resulting init state
// will be passed through to this channel.
bootstrapReqCh chan *initState
// If this CRDB node was able to join a bootstrapped cluster, the resulting
// init state will be passed through to this channel.
joinCh chan *initState
}

func newInitServer(
Expand All @@ -95,7 +92,6 @@ func newInitServer(
s := &initServer{
AmbientContext: actx,
bootstrapReqCh: make(chan *initState, 1),
joinCh: make(chan *initState, 1),
config: config,
}
s.mu.inspectState = inspectState
Expand Down Expand Up @@ -154,6 +150,13 @@ func (s *initServer) needsInitLocked() bool {
return len(s.mu.inspectState.initializedEngines) == 0
}

// joinResult is used to represent the result of a node attempting to join
// an already bootstrapped cluster.
type joinResult struct {
state *initState
err error
}

// ServeAndWait waits until the server is initialized, i.e. has a cluster ID,
// node ID and has permission to join the cluster. In the common case of
// restarting an existing node, this immediately returns. When starting with a
Expand Down Expand Up @@ -210,12 +213,18 @@ func (s *initServer) ServeAndWait(

var wg sync.WaitGroup
wg.Add(1)
errCh := make(chan error, 1)
// If this CRDB node was able to join a bootstrapped cluster, the resulting
// init state will be passed through to this channel.
joinCh := make(chan joinResult, 1)
if err := stopper.RunTask(joinCtx, "init server: join loop", func(joinCtx context.Context) {
stopper.RunWorker(joinCtx, func(joinCtx context.Context) {
defer wg.Done()

errCh <- s.startJoinLoop(joinCtx, stopper)
state, err := s.startJoinLoop(joinCtx, stopper)
joinCh <- joinResult{
state: state,
err: err,
}
})
}); err != nil {
return nil, false, err
Expand Down Expand Up @@ -263,10 +272,36 @@ func (s *initServer) ServeAndWait(
s.mu.Unlock()

return state, true, nil
case state := <-s.joinCh:
case result := <-joinCh:
// Ensure we're draining out the join attempt.
wg.Wait()
<-errCh

if err := result.err; err != nil {
if errors.Is(err, errJoinRPCUnsupported) {
// We're in a mixed-version cluster, we start gossip and wire up
// the gossip connectivity mechanism to discover the cluster ID.
g = startGossipFn()
gossipConnectedCh = g.Connected

// Let's nil out joinCh to prevent accidental re-use.
close(joinCh)
joinCh = nil

continue
}

if errors.Is(err, ErrIncompatibleBinaryVersion) {
return nil, false, err
}

if err != nil {
// We expect the join RPC to blindly retry on all errors
// save for the two above. This should be unreachable code.
return nil, false, errors.NewAssertionErrorWithWrappedErrf(err, "unexpected error: %v", err)
}
}

state := result.state

// TODO(irfansharif): We can try and initialize the the version
// setting to the active cluster version, in the same way we do when
Expand Down Expand Up @@ -311,25 +346,6 @@ func (s *initServer) ServeAndWait(
}
log.Infof(ctx, "joined cluster %s through gossip (legacy behavior)", state.clusterID)
return state, true, nil
case err := <-errCh:
// We won't return from here in the happy path; we'll let the join
// switch block do it for us.

if errors.Is(err, ErrJoinRPCUnsupported) {
// We're in a mixed-version cluster, we start gossip and wire up
// the gossip connectivity mechanism to discover the cluster ID.
g = startGossipFn()
gossipConnectedCh = g.Connected
continue
}

if errors.Is(err, ErrIncompatibleBinaryVersion) {
return nil, false, err
}

if err != nil {
log.Fatalf(ctx, "unexpected error: %s", err.Error())
}
case <-stopper.ShouldQuiesce():
return nil, false, stop.ErrUnavailable
}
Expand Down Expand Up @@ -377,17 +393,50 @@ func (s *initServer) Bootstrap(

// startJoinLoop continuously tries connecting to nodes specified in the join
// list in order to determine what the cluster ID is, and to be allocated a
// node+store ID. It can return ErrJoinRPCUnsupported, in which case the caller
// node+store ID. It can return errJoinRPCUnsupported, in which case the caller
// is expected to fall back to the gossip-based cluster ID discovery mechanism.
// It can also fail with ErrIncompatibleBinaryVersion, in which case we know we're
// running a binary that's too old to join the rest of the cluster.
func (s *initServer) startJoinLoop(ctx context.Context, stopper *stop.Stopper) error {
func (s *initServer) startJoinLoop(ctx context.Context, stopper *stop.Stopper) (*initState, error) {
if len(s.config.resolvers) == 0 {
// We're pointing to only ourselves, which is probably indicative of a
// node that's going to be bootstrapped by the operator. We could opt to
// not fall back to the gossip based connectivity mechanism, but we do
// it anyway.
return ErrJoinRPCUnsupported
return nil, errJoinRPCUnsupported
}

// Iterate through all the resolvers at least once to reduce time taken to
// cluster convergence. Keep this code block roughly in sync with the one
// below.
for _, res := range s.config.resolvers {
select {
case <-ctx.Done():
return nil, context.Canceled
case <-stopper.ShouldQuiesce():
return nil, stop.ErrUnavailable
default:
}

addr := res.Addr()
state, err := s.attemptJoinTo(ctx, res.Addr())
if err == nil {
return state, nil
}

if errors.Is(err, errJoinRPCUnsupported) || errors.Is(err, ErrIncompatibleBinaryVersion) {
// Propagate upwards; these are error conditions the caller knows to
// expect.
return nil, err
}

if IsWaitingForInit(err) {
log.Warningf(ctx, "%s is itself waiting for init, will retry", addr)
} else {
log.Warningf(ctx, "outgoing join rpc to %s unsuccessful: %v", addr, err.Error())
}

// Try the next node if unsuccessful.
}

const joinRPCBackoff = time.Second
Expand All @@ -402,40 +451,42 @@ func (s *initServer) startJoinLoop(ctx context.Context, stopper *stop.Stopper) e
addr := s.config.resolvers[idx].Addr()
select {
case <-tickChan:
err := s.attemptJoin(ctx, addr)
if errors.Is(err, ErrJoinRPCUnsupported) || errors.Is(err, ErrIncompatibleBinaryVersion) {
state, err := s.attemptJoinTo(ctx, addr)
if err == nil {
return state, nil
}

if errors.Is(err, errJoinRPCUnsupported) || errors.Is(err, ErrIncompatibleBinaryVersion) {
// Propagate upwards; these are error conditions the caller
// knows to expect.
return err
return nil, err
}

if err != nil {
// TODO(irfansharif): If startup logging gets too spammy, we
// could match against connection errors to generate nicer
// logging. See grpcutil.connectionRefusedRe.

if IsWaitingForInit(err) {
log.Warningf(ctx, "%s is itself waiting for init, will retry", addr)
} else {
log.Warningf(ctx, "outgoing join rpc to %s unsuccessful: %v", addr, err.Error())
}
// Blindly retry for all other errors, logging them for visibility.

// Blindly retry on error.
continue
// TODO(irfansharif): If startup logging gets too spammy, we
// could match against connection errors to generate nicer
// logging. See grpcutil.connectionRefusedRe.

if IsWaitingForInit(err) {
log.Warningf(ctx, "%s is itself waiting for init, will retry", addr)
} else {
log.Warningf(ctx, "outgoing join rpc to %s unsuccessful: %v", addr, err.Error())
}
return nil
case <-ctx.Done():
return nil
return nil, context.Canceled
case <-stopper.ShouldQuiesce():
return nil
return nil, stop.ErrUnavailable
}
}
}

func (s *initServer) attemptJoin(ctx context.Context, addr string) error {
// attemptJoinTo attempts to join to the node running at the given address. If
// successful, an initState is returned.
func (s *initServer) attemptJoinTo(ctx context.Context, addr string) (*initState, error) {
conn, err := grpc.DialContext(ctx, addr, s.config.dialOpts...)
if err != nil {
return err
return nil, err
}

defer func() {
Expand All @@ -451,32 +502,37 @@ func (s *initServer) attemptJoin(ctx context.Context, addr string) error {
resp, err := initClient.Join(ctx, req)
if err != nil {
// If the target node does not implement the Join RPC, or explicitly
// returns ErrJoinRPCUnsupported (because the cluster version
// returns errJoinRPCUnsupported (because the cluster version
// introducing its usage is not yet active), we error out so the init
// server knows to fall back on the gossip-based discovery mechanism for
// the clusterID.

status, ok := grpcstatus.FromError(errors.UnwrapAll(err))
if !ok {
return err
return nil, err
}

// TODO(irfansharif): Here we're logging the error and also returning
// it. We should wrap the logged message with the right error instead.
// The caller code, as written, switches on the error type; that'll need
// to be changed as well.

if status.Code() == codes.Unimplemented {
log.Infof(ctx, "%s running an older version; falling back to gossip-based cluster join", addr)
return ErrJoinRPCUnsupported
return nil, errJoinRPCUnsupported
}

if status.Code() == codes.PermissionDenied {
log.Infof(ctx, "%s is running a version higher than our binary version %s", addr, req.BinaryVersion.String())
return ErrIncompatibleBinaryVersion
return nil, ErrIncompatibleBinaryVersion
}

return err
return nil, err
}

clusterID, err := uuid.FromBytes(resp.ClusterID)
if err != nil {
return err
return nil, err
}

s.mu.Lock()
Expand All @@ -491,8 +547,7 @@ func (s *initServer) attemptJoin(ctx context.Context, addr string) error {
firstStoreID: roachpb.StoreID(resp.StoreID),
}

s.joinCh <- state
return nil
return state, nil
}

func (s *initServer) tryBootstrapLocked(ctx context.Context) (*initState, error) {
Expand Down