Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

grpcutil: consider WaitingForInitError as unambiguous #100213

Merged
merged 2 commits into from
Mar 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions pkg/cli/rpc_node_shutdown.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"math"
"time"

"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/server/serverpb"
"github.com/cockroachdb/cockroach/pkg/util/contextutil"
"github.com/cockroachdb/cockroach/pkg/util/grpcutil"
Expand Down Expand Up @@ -117,7 +116,7 @@ func doDrainNoTimeout(
ctx context.Context, c serverpb.AdminClient, targetNode string,
) (hardError, remainingWork bool, err error) {
defer func() {
if server.IsWaitingForInit(err) {
if grpcutil.IsWaitingForInit(err) {
log.Infof(ctx, "%v", err)
err = errors.New("node cannot be drained before it has been initialized")
}
Expand Down Expand Up @@ -216,7 +215,7 @@ func doShutdown(
) (hardError bool, err error) {
defer func() {
if err != nil {
if server.IsWaitingForInit(err) {
if grpcutil.IsWaitingForInit(err) {
log.Infof(ctx, "encountered error: %v", err)
err = errors.New("node cannot be shut down before it has been initialized")
err = errors.WithHint(err, "You can still stop the process using a service manager or a signal.")
Expand Down
1 change: 0 additions & 1 deletion pkg/server/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,6 @@ go_test(
"node_tombstone_storage_test.go",
"pagination_test.go",
"purge_auth_session_test.go",
"servemode_test.go",
"server_controller_test.go",
"server_http_test.go",
"server_import_ts_test.go",
Expand Down
16 changes: 4 additions & 12 deletions pkg/server/grpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,9 @@
package server

import (
"strings"
"sync/atomic"

"github.com/cockroachdb/cockroach/pkg/rpc"
"github.com/cockroachdb/errors"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
grpcstatus "google.golang.org/grpc/status"
Expand Down Expand Up @@ -83,7 +81,7 @@ func (s *grpcServer) intercept(fullName string) error {
return nil
}
if _, allowed := rpcsAllowedWhileBootstrapping[fullName]; !allowed {
return s.waitingForInitError(fullName)
return NewWaitingForInitError(fullName)
}
return nil
}
Expand All @@ -96,15 +94,9 @@ func (s *serveMode) get() serveMode {
return serveMode(atomic.LoadInt32((*int32)(s)))
}

// waitingForInitError creates an error indicating that the server cannot run
// NewWaitingForInitError creates an error indicating that the server cannot run
// the specified method until the node has been initialized.
func (s *grpcServer) waitingForInitError(methodName string) error {
func NewWaitingForInitError(methodName string) error {
// NB: this error string is sadly matched in grpcutil.IsWaitingForInit().
return grpcstatus.Errorf(codes.Unavailable, "node waiting for init; %s not available", methodName)
}

// IsWaitingForInit checks whether the provided error is because the node is
// still waiting for initialization.
func IsWaitingForInit(err error) bool {
s, ok := grpcstatus.FromError(errors.UnwrapAll(err))
return ok && s.Code() == codes.Unavailable && strings.Contains(err.Error(), "node waiting for init")
}
5 changes: 3 additions & 2 deletions pkg/server/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/grpcutil"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
Expand Down Expand Up @@ -370,7 +371,7 @@ func (s *initServer) startJoinLoop(ctx context.Context, stopper *stop.Stopper) (
if err != nil {
// Try the next node if unsuccessful.

if IsWaitingForInit(err) {
if grpcutil.IsWaitingForInit(err) {
log.Infof(ctx, "%s is itself waiting for init, will retry", addr)
} else {
log.Warningf(ctx, "outgoing join rpc to %s unsuccessful: %v", addr, err.Error())
Expand Down Expand Up @@ -416,7 +417,7 @@ func (s *initServer) startJoinLoop(ctx context.Context, stopper *stop.Stopper) (
// could match against connection errors to generate nicer
// logging. See grpcutil.connectionRefusedRe.

if IsWaitingForInit(err) {
if grpcutil.IsWaitingForInit(err) {
log.Infof(ctx, "%s is itself waiting for init, will retry", addr)
} else {
log.Warningf(ctx, "outgoing join rpc to %s unsuccessful: %v", addr, err.Error())
Expand Down
36 changes: 0 additions & 36 deletions pkg/server/servemode_test.go

This file was deleted.

3 changes: 2 additions & 1 deletion pkg/util/grpcutil/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -42,16 +42,17 @@ go_test(
args = ["-test.timeout=55s"],
embed = [":grpcutil"],
deps = [
"//pkg/server",
"//pkg/testutils",
"//pkg/testutils/skip",
"//pkg/util/leaktest",
"//pkg/util/log",
"//pkg/util/log/severity",
"//pkg/util/timeutil",
"@com_github_cockroachdb_circuitbreaker//:circuitbreaker",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_redact//:redact",
"@com_github_gogo_status//:status",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
"@org_golang_google_grpc//:go_default_library",
"@org_golang_google_grpc//codes",
Expand Down
10 changes: 9 additions & 1 deletion pkg/util/grpcutil/grpc_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,13 +143,21 @@ func IsAuthError(err error) bool {
return false
}

// IsWaitingForInit checks whether the provided error is because the node is
// still waiting for initialization.
func IsWaitingForInit(err error) bool {
s, ok := status.FromError(errors.UnwrapAll(err))
return ok && s.Code() == codes.Unavailable && strings.Contains(err.Error(), "node waiting for init")
}

// RequestDidNotStart returns true if the given error from gRPC
// means that the request definitely could not have started on the
// remote server.
func RequestDidNotStart(err error) bool {
if errors.HasType(err, (*netutil.InitialHeartbeatFailedError)(nil)) ||
errors.Is(err, circuit.ErrBreakerOpen) ||
IsConnectionRejected(err) {
IsConnectionRejected(err) ||
IsWaitingForInit(err) {
return true
}
_, ok := status.FromError(errors.Cause(err))
Expand Down
47 changes: 42 additions & 5 deletions pkg/util/grpcutil/grpc_util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,23 @@ package grpcutil_test

import (
"context"
"fmt"
"net"
"strings"
"testing"

circuit "github.com/cockroachdb/circuitbreaker"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/util/grpcutil"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/assert"
"github.com/gogo/status"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
healthpb "google.golang.org/grpc/health/grpc_health_v1"
)

Expand Down Expand Up @@ -52,6 +57,25 @@ func (hs healthServer) Watch(*healthpb.HealthCheckRequest, healthpb.Health_Watch
panic("not implemented")
}

func TestIsWaitingForInit(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

testcases := map[string]struct {
err error
expect bool
}{
"waiting for init": {server.NewWaitingForInitError("foo"), true},
"unavailable error": {status.Errorf(codes.Unavailable, "foo"), false},
"non-grpc": {fmt.Errorf("node waiting for init"), false},
}
for name, tc := range testcases {
t.Run(name, func(t *testing.T) {
require.Equal(t, tc.expect, grpcutil.IsWaitingForInit(tc.err))
})
}
}

func TestRequestDidNotStart(t *testing.T) {
defer leaktest.AfterTest(t)()

Expand Down Expand Up @@ -121,8 +145,21 @@ func TestRequestDidNotStart(t *testing.T) {
}
}

func TestRequestDidNotStart_OpenBreaker(t *testing.T) {
err := errors.Wrapf(circuit.ErrBreakerOpen, "unable to dial n%d", 42)
res := grpcutil.RequestDidNotStart(err)
assert.True(t, res)
func TestRequestDidNotStart_Errors(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

testcases := map[string]struct {
err error
expect bool
}{
"breaker": {errors.Wrapf(circuit.ErrBreakerOpen, "unable to dial n%d", 42), true},
"waiting for init": {errors.Wrapf(server.NewWaitingForInitError("foo"), "failed"), true},
"plain": {errors.New("foo"), false},
}
for name, tc := range testcases {
t.Run(name, func(t *testing.T) {
require.Equal(t, tc.expect, grpcutil.RequestDidNotStart(tc.err))
})
}
}