Skip to content

Commit

Permalink
Merge pull request #100278 from erikgrinaker/backport22.2-100213
Browse files Browse the repository at this point in the history
release-22.2: grpcutil: consider `WaitingForInitError` as unambiguous
  • Loading branch information
erikgrinaker authored Apr 9, 2023
2 parents 95b157f + 636c54a commit 9a7c644
Show file tree
Hide file tree
Showing 8 changed files with 64 additions and 61 deletions.
5 changes: 2 additions & 3 deletions pkg/cli/rpc_node_shutdown.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"math"
"time"

"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/server/serverpb"
"github.com/cockroachdb/cockroach/pkg/util/contextutil"
"github.com/cockroachdb/cockroach/pkg/util/grpcutil"
Expand Down Expand Up @@ -117,7 +116,7 @@ func doDrainNoTimeout(
ctx context.Context, c serverpb.AdminClient, targetNode string,
) (hardError, remainingWork bool, err error) {
defer func() {
if server.IsWaitingForInit(err) {
if grpcutil.IsWaitingForInit(err) {
log.Infof(ctx, "%v", err)
err = errors.New("node cannot be drained before it has been initialized")
}
Expand Down Expand Up @@ -216,7 +215,7 @@ func doShutdown(
) (hardError bool, err error) {
defer func() {
if err != nil {
if server.IsWaitingForInit(err) {
if grpcutil.IsWaitingForInit(err) {
log.Infof(ctx, "encountered error: %v", err)
err = errors.New("node cannot be shut down before it has been initialized")
err = errors.WithHint(err, "You can still stop the process using a service manager or a signal.")
Expand Down
1 change: 0 additions & 1 deletion pkg/server/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,6 @@ go_test(
"node_tombstone_storage_test.go",
"pagination_test.go",
"purge_auth_session_test.go",
"servemode_test.go",
"server_http_test.go",
"server_import_ts_test.go",
"server_internal_executor_factory_test.go",
Expand Down
16 changes: 4 additions & 12 deletions pkg/server/grpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,9 @@
package server

import (
"strings"
"sync/atomic"

"github.com/cockroachdb/cockroach/pkg/rpc"
"github.com/cockroachdb/errors"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
grpcstatus "google.golang.org/grpc/status"
Expand Down Expand Up @@ -80,7 +78,7 @@ func (s *grpcServer) intercept(fullName string) error {
return nil
}
if _, allowed := rpcsAllowedWhileBootstrapping[fullName]; !allowed {
return s.waitingForInitError(fullName)
return NewWaitingForInitError(fullName)
}
return nil
}
Expand All @@ -93,15 +91,9 @@ func (s *serveMode) get() serveMode {
return serveMode(atomic.LoadInt32((*int32)(s)))
}

// waitingForInitError creates an error indicating that the server cannot run
// NewWaitingForInitError creates an error indicating that the server cannot run
// the specified method until the node has been initialized.
func (s *grpcServer) waitingForInitError(methodName string) error {
func NewWaitingForInitError(methodName string) error {
// NB: this error string is sadly matched in grpcutil.IsWaitingForInit().
return grpcstatus.Errorf(codes.Unavailable, "node waiting for init; %s not available", methodName)
}

// IsWaitingForInit checks whether the provided error is because the node is
// still waiting for initialization.
func IsWaitingForInit(err error) bool {
s, ok := grpcstatus.FromError(errors.UnwrapAll(err))
return ok && s.Code() == codes.Unavailable && strings.Contains(err.Error(), "node waiting for init")
}
5 changes: 3 additions & 2 deletions pkg/server/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/grpcutil"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
Expand Down Expand Up @@ -368,7 +369,7 @@ func (s *initServer) startJoinLoop(ctx context.Context, stopper *stop.Stopper) (
if err != nil {
// Try the next node if unsuccessful.

if IsWaitingForInit(err) {
if grpcutil.IsWaitingForInit(err) {
log.Infof(ctx, "%s is itself waiting for init, will retry", addr)
} else {
log.Warningf(ctx, "outgoing join rpc to %s unsuccessful: %v", addr, err.Error())
Expand Down Expand Up @@ -414,7 +415,7 @@ func (s *initServer) startJoinLoop(ctx context.Context, stopper *stop.Stopper) (
// could match against connection errors to generate nicer
// logging. See grpcutil.connectionRefusedRe.

if IsWaitingForInit(err) {
if grpcutil.IsWaitingForInit(err) {
log.Infof(ctx, "%s is itself waiting for init, will retry", addr)
} else {
log.Warningf(ctx, "outgoing join rpc to %s unsuccessful: %v", addr, err.Error())
Expand Down
36 changes: 0 additions & 36 deletions pkg/server/servemode_test.go

This file was deleted.

5 changes: 4 additions & 1 deletion pkg/util/grpcutil/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -35,16 +35,19 @@ go_test(
args = ["-test.timeout=55s"],
embed = [":grpcutil"],
deps = [
"//pkg/server",
"//pkg/testutils",
"//pkg/testutils/skip",
"//pkg/util/leaktest",
"//pkg/util/log",
"//pkg/util/log/severity",
"//pkg/util/timeutil",
"@com_github_cockroachdb_circuitbreaker//:circuitbreaker",
"@com_github_cockroachdb_errors//:errors",
"@com_github_stretchr_testify//assert",
"@com_github_gogo_status//:status",
"@com_github_stretchr_testify//require",
"@org_golang_google_grpc//:go_default_library",
"@org_golang_google_grpc//codes",
"@org_golang_google_grpc//health/grpc_health_v1",
],
)
Expand Down
10 changes: 9 additions & 1 deletion pkg/util/grpcutil/grpc_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,13 @@ func IsAuthError(err error) bool {
return false
}

// IsWaitingForInit checks whether the provided error is because the node is
// still waiting for initialization.
func IsWaitingForInit(err error) bool {
s, ok := status.FromError(errors.UnwrapAll(err))
return ok && s.Code() == codes.Unavailable && strings.Contains(err.Error(), "node waiting for init")
}

// RequestDidNotStart returns true if the given error from gRPC
// means that the request definitely could not have started on the
// remote server.
Expand All @@ -141,7 +148,8 @@ func RequestDidNotStart(err error) bool {
if errors.HasType(err, connectionNotReadyError{}) ||
errors.HasType(err, (*netutil.InitialHeartbeatFailedError)(nil)) ||
errors.Is(err, circuit.ErrBreakerOpen) ||
IsConnectionRejected(err) {
IsConnectionRejected(err) ||
IsWaitingForInit(err) {
return true
}
s, ok := status.FromError(errors.Cause(err))
Expand Down
47 changes: 42 additions & 5 deletions pkg/util/grpcutil/grpc_util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,23 @@ package grpcutil_test

import (
"context"
"fmt"
"net"
"strings"
"testing"

circuit "github.com/cockroachdb/circuitbreaker"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/util/grpcutil"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/assert"
"github.com/gogo/status"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
healthpb "google.golang.org/grpc/health/grpc_health_v1"
)

Expand Down Expand Up @@ -52,6 +57,25 @@ func (hs healthServer) Watch(*healthpb.HealthCheckRequest, healthpb.Health_Watch
panic("not implemented")
}

func TestIsWaitingForInit(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

testcases := map[string]struct {
err error
expect bool
}{
"waiting for init": {server.NewWaitingForInitError("foo"), true},
"unavailable error": {status.Errorf(codes.Unavailable, "foo"), false},
"non-grpc": {fmt.Errorf("node waiting for init"), false},
}
for name, tc := range testcases {
t.Run(name, func(t *testing.T) {
require.Equal(t, tc.expect, grpcutil.IsWaitingForInit(tc.err))
})
}
}

func TestRequestDidNotStart(t *testing.T) {
defer leaktest.AfterTest(t)()

Expand Down Expand Up @@ -121,8 +145,21 @@ func TestRequestDidNotStart(t *testing.T) {
}
}

func TestRequestDidNotStart_OpenBreaker(t *testing.T) {
err := errors.Wrapf(circuit.ErrBreakerOpen, "unable to dial n%d", 42)
res := grpcutil.RequestDidNotStart(err)
assert.True(t, res)
func TestRequestDidNotStart_Errors(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

testcases := map[string]struct {
err error
expect bool
}{
"breaker": {errors.Wrapf(circuit.ErrBreakerOpen, "unable to dial n%d", 42), true},
"waiting for init": {errors.Wrapf(server.NewWaitingForInitError("foo"), "failed"), true},
"plain": {errors.New("foo"), false},
}
for name, tc := range testcases {
t.Run(name, func(t *testing.T) {
require.Equal(t, tc.expect, grpcutil.RequestDidNotStart(tc.err))
})
}
}

0 comments on commit 9a7c644

Please sign in to comment.