From 24537dbb73fe5ecc46f3fea63ca9dbf5b86bd2c5 Mon Sep 17 00:00:00 2001
From: Erik Grinaker
Date: Fri, 30 Jun 2023 13:42:11 +0000
Subject: [PATCH] rpc: improve test coverage of initial heartbeat failures

This patch improves test coverage for `InitialHeartbeatFailedError` and
`grpcutil.RequestDidNotStart()`. It also removes some stale references
to wanting gRPC to return unambiguous errors for certain types of
connection failures; `InitialHeartbeatFailedError` has largely obviated
this need.

Epic: none
Release note: None
---
 pkg/rpc/context_test.go             | 101 ++++++++++++++++++++++++
 pkg/util/grpcutil/BUILD.bazel       |   5 +-
 pkg/util/grpcutil/grpc_util.go      |  26 ++-----
 pkg/util/grpcutil/grpc_util_test.go | 114 +++------------------------
 4 files changed, 119 insertions(+), 127 deletions(-)

diff --git a/pkg/rpc/context_test.go b/pkg/rpc/context_test.go
index ea1e8e523ec0..a0a88743491e 100644
--- a/pkg/rpc/context_test.go
+++ b/pkg/rpc/context_test.go
@@ -2498,3 +2498,104 @@ func checkMetrics(m *Metrics, healthy, unhealthy, inactive int64, checkDurations
 	return nil
 }
+
+// TestInitialHeartbeatFailedError tests that InitialHeartbeatFailedError is
+// returned for various scenarios. This is important for
+// grpcutil.RequestDidNotStart to properly detect unambiguous failures.
+func TestInitialHeartbeatFailedError(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+
+	const maxOffset = 0
+	const nodeID = 1
+
+	hbErrType := (*netutil.InitialHeartbeatFailedError)(nil)
+	requireHeartbeatError := func(t *testing.T, err error) {
+		t.Helper()
+		require.Error(t, err)
+		require.True(t, errors.HasType(err, hbErrType), "got %T: %s", err, err)
+	}
+
+	ctx := context.Background()
+	stopper := stop.NewStopper()
+	defer stopper.Stop(ctx)
+
+	clock := timeutil.NewManualTime(timeutil.Unix(0, 20))
+	serverCtx := newTestContext(uuid.MakeV4(), clock, maxOffset, stopper)
+	serverCtx.NodeID.Set(ctx, nodeID)
+	clientCtx := newTestContext(serverCtx.StorageClusterID.Get(), clock, maxOffset, stopper)
+	clientCtx.AddTestingDialOpts(grpc.WithConnectParams(grpc.ConnectParams{
+		MinConnectTimeout: time.Second,
+	}))
+
+	// Set up a ping handler that can fail or hang on demand.
+	var failPing, hangPing atomic.Bool
+	onHandlePing := func(ctx context.Context, req *PingRequest, resp *PingResponse) error {
+		if failPing.Load() {
+			return errors.New("error")
+		}
+		for hangPing.Load() {
+			time.Sleep(100 * time.Millisecond)
+		}
+		return nil
+	}
+
+	// A rejected connection errors with InitialHeartbeatFailedError.
+	remoteAddr := "127.0.0.99:64072"
+	_, err := clientCtx.GRPCDialNode(remoteAddr, nodeID, SystemClass).Connect(ctx)
+	requireHeartbeatError(t, err)
+
+	// A hung listener errors with InitialHeartbeatFailedError.
+	hungLn, err := net.Listen("tcp", "127.0.0.1:0")
+	require.NoError(t, err)
+	defer func() {
+		_ = hungLn.Close()
+	}()
+	remoteAddr = hungLn.Addr().String()
+
+	_, err = clientCtx.GRPCDialNode(remoteAddr, nodeID, SystemClass).Connect(ctx)
+	requireHeartbeatError(t, err)
+
+	// Start the server listener.
+	s := newTestServer(t, serverCtx)
+	RegisterHeartbeatServer(s, &HeartbeatService{
+		clock:              clock,
+		remoteClockMonitor: serverCtx.RemoteClocks,
+		clusterID:          serverCtx.StorageClusterID,
+		nodeID:             serverCtx.NodeID,
+		version:            serverCtx.Settings.Version,
+		onHandlePing:       onHandlePing,
+	})
+
+	ln, err := netutil.ListenAndServeGRPC(serverCtx.Stopper, s, util.TestAddr)
+	require.NoError(t, err)
+	remoteAddr = ln.Addr().String()
+
+	// Before connecting, Health returns ErrNotHeartbeated rather than
+	// InitialHeartbeatFailedError.
+	err = clientCtx.GRPCDialNode(remoteAddr, nodeID, SystemClass).Health()
+	require.Error(t, err)
+	require.True(t, errors.Is(err, ErrNotHeartbeated))
+	require.False(t, errors.HasType(err, hbErrType))
+
+	// Ping errors result in InitialHeartbeatFailedError.
+	failPing.Store(true)
+	_, err = clientCtx.GRPCDialNode(remoteAddr, nodeID, SystemClass).Connect(ctx)
+	requireHeartbeatError(t, err)
+	failPing.Store(false)
+
+	// Stalled pings result in InitialHeartbeatFailedError.
+	hangPing.Store(true)
+	_, err = clientCtx.GRPCDialNode(remoteAddr, nodeID, SystemClass).Connect(ctx)
+	requireHeartbeatError(t, err)
+	hangPing.Store(false)
+
+	// RPC circuit breakers will now be tripped. They should result in
+	// InitialHeartbeatFailedError until we eventually recover.
+	testutils.SucceedsSoon(t, func() error {
+		_, err := clientCtx.GRPCDialNode(remoteAddr, nodeID, SystemClass).Connect(ctx)
+		if err != nil {
+			requireHeartbeatError(t, err)
+		}
+		return err
+	})
+}
diff --git a/pkg/util/grpcutil/BUILD.bazel b/pkg/util/grpcutil/BUILD.bazel
index 7715ec906e34..a71319fc9f4c 100644
--- a/pkg/util/grpcutil/BUILD.bazel
+++ b/pkg/util/grpcutil/BUILD.bazel
@@ -41,19 +41,16 @@ go_test(
     embed = [":grpcutil"],
     deps = [
         "//pkg/server",
-        "//pkg/testutils",
-        "//pkg/testutils/skip",
         "//pkg/util/leaktest",
         "//pkg/util/log",
         "//pkg/util/log/severity",
+        "//pkg/util/netutil",
         "@com_github_cockroachdb_circuitbreaker//:circuitbreaker",
         "@com_github_cockroachdb_errors//:errors",
         "@com_github_cockroachdb_redact//:redact",
         "@com_github_gogo_status//:status",
         "@com_github_stretchr_testify//require",
-        "@org_golang_google_grpc//:go_default_library",
         "@org_golang_google_grpc//codes",
-        "@org_golang_google_grpc//health/grpc_health_v1",
         "@org_golang_google_grpc//metadata",
         "@org_golang_google_grpc//status",
     ],
diff --git a/pkg/util/grpcutil/grpc_util.go b/pkg/util/grpcutil/grpc_util.go
index 866f308a9006..3737f6015ba6 100644
--- a/pkg/util/grpcutil/grpc_util.go
+++ b/pkg/util/grpcutil/grpc_util.go
@@ -150,26 +150,14 @@ func IsWaitingForInit(err error) bool {
 	return ok && s.Code() == codes.Unavailable && strings.Contains(err.Error(), "node waiting for init")
 }
 
-// RequestDidNotStart returns true if the given error from gRPC
-// means that the request definitely could not have started on the
-// remote server.
+// RequestDidNotStart returns true if the given RPC error means that the request
+// definitely could not have started on the remote server.
 func RequestDidNotStart(err error) bool {
-	if errors.HasType(err, (*netutil.InitialHeartbeatFailedError)(nil)) ||
+	// NB: gRPC doesn't provide a way to distinguish ambiguous from unambiguous
+	// failures, but InitialHeartbeatFailedError serves mostly the same purpose.
+	// See also https://github.com/grpc/grpc-go/issues/1443.
+	return errors.HasType(err, (*netutil.InitialHeartbeatFailedError)(nil)) ||
 		errors.Is(err, circuit.ErrBreakerOpen) ||
 		IsConnectionRejected(err) ||
-		IsWaitingForInit(err) {
-		return true
-	}
-	_, ok := status.FromError(errors.Cause(err))
-	if !ok {
-		// This is a non-gRPC error; assume nothing.
-		return false
-	}
-	// This is where you'd hope to treat some gRPC errors as unambiguous.
-	// Unfortunately, gRPC provides no good way to distinguish ambiguous from
-	// unambiguous failures.
-	//
-	// https://github.com/grpc/grpc-go/issues/1443
-	// https://github.com/cockroachdb/cockroach/issues/19708#issuecomment-343891640
-	return false
+	IsWaitingForInit(err)
 }
diff --git a/pkg/util/grpcutil/grpc_util_test.go b/pkg/util/grpcutil/grpc_util_test.go
index 026575bad0df..dad1317c8a41 100644
--- a/pkg/util/grpcutil/grpc_util_test.go
+++ b/pkg/util/grpcutil/grpc_util_test.go
@@ -11,52 +11,21 @@
 package grpcutil_test
 
 import (
-	"context"
 	"fmt"
-	"net"
-	"strings"
 	"testing"
 
 	circuit "github.com/cockroachdb/circuitbreaker"
 	"github.com/cockroachdb/cockroach/pkg/server"
-	"github.com/cockroachdb/cockroach/pkg/testutils"
-	"github.com/cockroachdb/cockroach/pkg/testutils/skip"
 	"github.com/cockroachdb/cockroach/pkg/util/grpcutil"
 	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
+	"github.com/cockroachdb/cockroach/pkg/util/netutil"
 	"github.com/cockroachdb/errors"
 	"github.com/gogo/status"
 	"github.com/stretchr/testify/require"
-	"google.golang.org/grpc"
 	"google.golang.org/grpc/codes"
-	healthpb "google.golang.org/grpc/health/grpc_health_v1"
 )
 
-// Implement the grpc health check interface (just because it's the
-// simplest predefined RPC service I could find that seems unlikely to
-// change out from under us) with an implementation that shuts itself
-// down whenever anything calls it. This lets us distinguish errors
-// caused by server shutdowns during the request from those when the
-// server was already down.
-type healthServer struct {
-	grpcServer *grpc.Server
-}
-
-func (hs healthServer) Check(
-	ctx context.Context, req *healthpb.HealthCheckRequest,
-) (*healthpb.HealthCheckResponse, error) {
-	hs.grpcServer.Stop()
-
-	// Wait for the shutdown to happen before returning from this
-	// method.
-	<-ctx.Done()
-	return nil, errors.New("no one should see this")
-}
-
-func (hs healthServer) Watch(*healthpb.HealthCheckRequest, healthpb.Health_WatchServer) error {
-	panic("not implemented")
-}
-
 func TestIsWaitingForInit(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
@@ -76,75 +45,6 @@ func TestIsWaitingForInit(t *testing.T) {
 	}
 }
 
-func TestRequestDidNotStart(t *testing.T) {
-	defer leaktest.AfterTest(t)()
-
-	skip.WithIssue(t, 19708)
-
-	lis, err := net.Listen("tcp", "127.0.0.1:0")
-	if err != nil {
-		t.Fatal(err)
-	}
-	defer func() {
-		_ = lis.Close()
-	}()
-
-	server := grpc.NewServer()
-	hs := healthServer{server}
-	healthpb.RegisterHealthServer(server, hs)
-	go func() {
-		_ = server.Serve(lis)
-	}()
-	//lint:ignore SA1019 grpc.WithInsecure is deprecated
-	conn, err := grpc.Dial(lis.Addr().String(), grpc.WithInsecure())
-	if err != nil {
-		t.Fatal(err)
-	}
-	defer func() {
-		_ = conn.Close() // nolint:grpcconnclose
-	}()
-	client := healthpb.NewHealthClient(conn)
-
-	// The first time, the request will start and we'll get a
-	// "connection is closing" message.
-	_, err = client.Check(context.Background(), &healthpb.HealthCheckRequest{})
-	if err == nil {
-		t.Fatal("did not get expected error")
-	} else if grpcutil.RequestDidNotStart(err) {
-		t.Fatalf("request should have started, but got %s", err)
-	} else if !strings.Contains(err.Error(), "is closing") {
-		// This assertion is not essential to this test, but since this
-		// logic is sensitive to grpc error handling details it's safer to
-		// make the test fail when anything changes. This error could be
-		// either "transport is closing" or "connection is closing"
-		t.Fatalf("expected 'is closing' error but got %s", err)
-	}
-
-	// Afterwards, the request will fail immediately without being sent.
-	// But if we try too soon, there's a chance the transport hasn't
-	// been put in the "transient failure" state yet and we get a
-	// different error.
-	testutils.SucceedsSoon(t, func() error {
-		_, err = client.Check(context.Background(), &healthpb.HealthCheckRequest{})
-		if err == nil {
-			return errors.New("did not get expected error")
-		} else if !grpcutil.RequestDidNotStart(err) {
-			return errors.Wrap(err, "request should not have started, but got error")
-		}
-		return nil
-	})
-
-	// Once the transport is in the "transient failure" state it should
-	// stay that way, and every subsequent request will fail
-	// immediately.
-	_, err = client.Check(context.Background(), &healthpb.HealthCheckRequest{})
-	if err == nil {
-		t.Fatal("did not get expected error")
-	} else if !grpcutil.RequestDidNotStart(err) {
-		t.Fatalf("request should not have started, but got %s", err)
-	}
-}
-
 func TestRequestDidNotStart_Errors(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
@@ -153,13 +53,19 @@ func TestRequestDidNotStart_Errors(t *testing.T) {
 		err    error
 		expect bool
 	}{
-		"breaker":          {errors.Wrapf(circuit.ErrBreakerOpen, "unable to dial n%d", 42), true},
-		"waiting for init": {errors.Wrapf(server.NewWaitingForInitError("foo"), "failed"), true},
-		"plain":            {errors.New("foo"), false},
+		"failed heartbeat":    {&netutil.InitialHeartbeatFailedError{}, true},
+		"waiting for init":    {server.NewWaitingForInitError("foo"), true},
+		"unauthenticated":     {status.Error(codes.Unauthenticated, "unauthenticated"), true},
+		"permission denied":   {status.Error(codes.PermissionDenied, "permission denied"), true},
+		"failed precondition": {status.Error(codes.FailedPrecondition, "failed precondition"), true},
+		"circuit breaker":     {circuit.ErrBreakerOpen, true},
+		"plain":               {errors.New("foo"), false},
 	}
 	for name, tc := range testcases {
 		t.Run(name, func(t *testing.T) {
+			// Make sure the error is properly detected both bare and wrapped.
 			require.Equal(t, tc.expect, grpcutil.RequestDidNotStart(tc.err))
+			require.Equal(t, tc.expect, grpcutil.RequestDidNotStart(errors.Wrap(tc.err, "wrapped")))
 		})
 	}
 }
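
--
For reviewers: a minimal sketch of how a caller might consume
RequestDidNotStart when deciding whether a failed RPC is safe to retry.
This example is illustrative only and not part of the patch; sendRPC is
a hypothetical stand-in for any unary RPC invocation, while
grpcutil.RequestDidNotStart is the function changed above.

	package main

	import (
		"context"
		"fmt"

		"github.com/cockroachdb/cockroach/pkg/util/grpcutil"
	)

	// sendRPC is a hypothetical stand-in for issuing a unary RPC.
	func sendRPC(ctx context.Context) error {
		return nil // assume some gRPC call here
	}

	func main() {
		ctx := context.Background()
		if err := sendRPC(ctx); err != nil {
			if grpcutil.RequestDidNotStart(err) {
				// Unambiguous failure: the request never started on the
				// server, so retrying cannot duplicate side effects, even
				// for non-idempotent requests.
				fmt.Println("safe to retry:", err)
			} else {
				// Ambiguous failure: the request may have executed
				// remotely, so only idempotent requests should be retried.
				fmt.Println("ambiguous, not retrying:", err)
			}
		}
	}

Because an unambiguous failure guarantees the request never reached the
server, even non-idempotent requests can be retried safely; ambiguous
failures should be retried only when the request is idempotent.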