*: improve handling of permanent errors
RPC errors are normally retried. However, in some cases the errors are
permanent such that retries are futile, and this can cause operations to
appear to hang as they keep retrying -- e.g. when running operations on
a decommissioned node. There is already some detection of permanent
errors, but it is incomplete.

This patch attempts to improve coverage of permanent errors, in
particular in the context of decommissioned nodes, and adds test cases
for these scenarios.

Release note: None
erikgrinaker committed Mar 30, 2021
1 parent 172a846 commit c970f54
Showing 11 changed files with 372 additions and 36 deletions.
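For context on the diffs that follow: the patch hinges on classifying certain RPC errors as permanent rather than retryable. The snippet below is a minimal sketch of such a check, not the actual grpcutil.IsAuthError helper; the name isPermanentAuthError and the exact set of status codes treated as permanent are assumptions for illustration.

package main

import (
	"errors"
	"fmt"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// isPermanentAuthError reports whether err, or any error it wraps, carries a
// gRPC status indicating that retrying cannot succeed -- e.g. a request
// rejected because the sending node has been decommissioned. Sketch only;
// the real helper may inspect a different set of codes.
func isPermanentAuthError(err error) bool {
	for ; err != nil; err = errors.Unwrap(err) {
		if s, ok := status.FromError(err); ok && s != nil {
			switch s.Code() {
			case codes.PermissionDenied, codes.Unauthenticated:
				return true
			}
		}
	}
	return false
}

func main() {
	err := fmt.Errorf("rpc failed: %w",
		status.Error(codes.PermissionDenied, "n2 is decommissioned"))
	fmt.Println(isPermanentAuthError(err)) // true
}

The tests added below exercise the same classification from the client side by unwrapping the returned error with errors.UnwrapAll and asserting codes.PermissionDenied.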
3 changes: 3 additions & 0 deletions pkg/kv/BUILD.bazel
@@ -57,6 +57,7 @@ go_test(
"//pkg/keys",
"//pkg/kv/kvserver",
"//pkg/kv/kvserver/kvserverbase",
"//pkg/kv/kvserver/liveness/livenesspb",
"//pkg/roachpb",
"//pkg/security",
"//pkg/security/securitytest",
@@ -77,5 +78,7 @@ go_test(
"@com_github_cockroachdb_errors//:errors",
"@com_github_gogo_protobuf//proto",
"@com_github_stretchr_testify//require",
"@org_golang_google_grpc//codes",
"@org_golang_google_grpc//status",
],
)
98 changes: 98 additions & 0 deletions pkg/kv/db_test.go
@@ -14,13 +14,21 @@ import (
"bytes"
"context"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/testutils/kvclientutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

func setup(t *testing.T) (serverutils.TestServerInterface, *kv.DB) {
@@ -508,3 +516,93 @@ func TestDB_Put_insecure(t *testing.T) {
}
checkResult(t, []byte("1"), result.ValueBytes())
}

// Test that all operations on a decommissioned node will return a
// permission denied error rather than hanging indefinitely due to
// internal retries.
func TestDBDecommissionedOperations(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
tc := testcluster.StartTestCluster(t, 2, base.TestClusterArgs{
ReplicationMode: base.ReplicationManual, // saves time
})
defer tc.Stopper().Stop(ctx)

scratchKey := tc.ScratchRange(t)
scratchRange := tc.LookupRangeOrFatal(t, scratchKey)
require.Len(t, scratchRange.InternalReplicas, 1)
require.Equal(t, tc.Server(0).NodeID(), scratchRange.InternalReplicas[0].NodeID)

// Decommission server 1.
srv := tc.Server(0)
decomSrv := tc.Server(1)
for _, status := range []livenesspb.MembershipStatus{
livenesspb.MembershipStatus_DECOMMISSIONING, livenesspb.MembershipStatus_DECOMMISSIONED,
} {
require.NoError(t, srv.Decommission(ctx, status, []roachpb.NodeID{decomSrv.NodeID()}))
}

// Run a few different operations, which should all eventually error with
// PermissionDenied. We don't need full coverage of methods, since they use
// the same sender infrastructure, but should use a few variations to hit
// different sender code paths (e.g. with and without txns).
db := decomSrv.DB()
key := roachpb.Key([]byte("a"))
keyEnd := roachpb.Key([]byte("x"))
value := []byte{1, 2, 3}

testcases := []struct {
name string
op func() error
}{
{"Del", func() error {
return db.Del(ctx, key)
}},
{"DelRange", func() error {
return db.DelRange(ctx, key, keyEnd)
}},
{"Get", func() error {
_, err := db.Get(ctx, key)
return err
}},
{"GetForUpdate", func() error {
_, err := db.GetForUpdate(ctx, key)
return err
}},
{"Put", func() error {
return db.Put(ctx, key, value)
}},
{"Scan", func() error {
_, err := db.Scan(ctx, key, keyEnd, 0)
return err
}},
{"TxnGet", func() error {
_, err := db.NewTxn(ctx, "").Get(ctx, key)
return err
}},
{"TxnPut", func() error {
return db.NewTxn(ctx, "").Put(ctx, key, value)
}},
{"AdminTransferLease", func() error {
return db.AdminTransferLease(ctx, scratchKey, srv.GetFirstStoreID())
}},
}

for _, tc := range testcases {
tc := tc
t.Run(tc.name, func(t *testing.T) {
var err error
require.Eventually(t, func() bool {
err = tc.op()
s, ok := status.FromError(errors.UnwrapAll(err))
if s == nil || !ok {
return false
}
require.Equal(t, codes.PermissionDenied, s.Code())
return true
}, 10*time.Second, 100*time.Millisecond, "timed out waiting for gRPC error, got %v", err)
})
}
}
2 changes: 1 addition & 1 deletion pkg/kv/kvclient/kvcoord/dist_sender.go
@@ -1482,7 +1482,7 @@ func (ds *DistSender) sendPartialBatch(
// order to return the most recent error when we are out of retries.
pErr = roachpb.NewError(err)
if !rangecache.IsRangeLookupErrorRetryable(err) {
return response{pErr: roachpb.NewError(err)}
return response{pErr: pErr}
}
continue
}
1 change: 1 addition & 0 deletions pkg/kv/kvserver/liveness/BUILD.bazel
@@ -17,6 +17,7 @@ go_library(
"//pkg/settings/cluster",
"//pkg/storage",
"//pkg/util/contextutil",
"//pkg/util/grpcutil",
"//pkg/util/hlc",
"//pkg/util/log",
"//pkg/util/metric",
7 changes: 7 additions & 0 deletions pkg/kv/kvserver/liveness/liveness.go
@@ -28,6 +28,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/util/contextutil"
"github.com/cockroachdb/cockroach/pkg/util/grpcutil"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/metric"
@@ -334,6 +335,9 @@ func (nl *NodeLiveness) SetDraining(
if log.V(1) {
log.Infof(ctx, "attempting to set liveness draining status to %v: %v", drain, err)
}
if grpcutil.IsAuthError(err) {
return err
}
continue
}
return nil
@@ -697,6 +701,9 @@ func (nl *NodeLiveness) Start(ctx context.Context, opts NodeLivenessStartOptions
liveness, err := nl.getLivenessFromKV(ctx, nodeID)
if err != nil {
log.Infof(ctx, "unable to get liveness record from KV: %s", err)
if grpcutil.IsAuthError(err) {
return err
}
continue
}
oldLiveness = liveness
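The SetDraining and Start hunks above share one pattern: a retry loop that previously swallowed every error now returns early once the error is known to be permanent. A simplified, self-contained sketch of that control flow follows, using hypothetical attempt and isPermanent callbacks rather than the actual NodeLiveness methods or the real backoff machinery.

package livenessretry // hypothetical package name, for illustration only

import (
	"context"
	"time"
)

// updateWithRetry retries a transiently failing operation, but returns
// immediately when isPermanent classifies the error as one that retries
// cannot fix (e.g. PermissionDenied from a decommissioned node).
func updateWithRetry(
	ctx context.Context,
	attempt func(context.Context) error,
	isPermanent func(error) bool,
) error {
	for {
		err := attempt(ctx)
		if err == nil {
			return nil
		}
		if isPermanent(err) {
			return err // retries are futile; surface the error to the caller
		}
		select {
		case <-time.After(100 * time.Millisecond): // simple fixed backoff for the sketch
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}

The actual change keeps the existing retry loops and simply adds a grpcutil.IsAuthError check before continue, as shown in the diff above.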
6 changes: 2 additions & 4 deletions pkg/rpc/context.go
@@ -43,11 +43,9 @@ import (
"golang.org/x/sync/syncmap"
"google.golang.org/grpc"
"google.golang.org/grpc/backoff"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/encoding"
"google.golang.org/grpc/metadata"
grpcstatus "google.golang.org/grpc/status"
)

func init() {
@@ -1051,7 +1049,7 @@ func (ctx *Context) grpcDialNodeInternal(
if err := ctx.Stopper.RunAsyncTask(
ctx.masterCtx, "rpc.Context: grpc heartbeat", func(masterCtx context.Context) {
err := ctx.runHeartbeat(conn, target, redialChan)
if err != nil && !grpcutil.IsClosedConnection(err) {
if err != nil && !grpcutil.IsClosedConnection(err) && !grpcutil.IsAuthError(err) {
log.Health.Errorf(masterCtx, "removing connection to %s due to error: %s", target, err)
}
ctx.removeConn(conn, thisConnKeys...)
@@ -1167,7 +1165,7 @@ func (ctx *Context) runHeartbeat(
err = ping(goCtx)
}

if s, ok := grpcstatus.FromError(errors.UnwrapAll(err)); ok && s.Code() == codes.PermissionDenied {
if grpcutil.IsAuthError(err) {
returnErr = true
}

169 changes: 169 additions & 0 deletions pkg/server/admin_test.go
@@ -60,6 +60,9 @@ import (
"github.com/gogo/protobuf/proto"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

func getAdminJSONProto(
@@ -2083,3 +2086,169 @@ func TestDecommissionSelf(t *testing.T) {
}, 5*time.Second, 100*time.Millisecond, "timed out waiting for node %v status %v", i, expect)
}
}

func TestAdminDecommissionedOperations(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
tc := serverutils.StartNewTestCluster(t, 2, base.TestClusterArgs{
ReplicationMode: base.ReplicationManual, // saves time
ServerArgs: base.TestServerArgs{
Insecure: true, // allows admin client without setting up certs
},
})
defer tc.Stopper().Stop(ctx)

scratchKey := tc.ScratchRange(t)
scratchRange := tc.LookupRangeOrFatal(t, scratchKey)
require.Len(t, scratchRange.InternalReplicas, 1)
require.Equal(t, tc.Server(0).NodeID(), scratchRange.InternalReplicas[0].NodeID)

// Decommission server 1 and wait for it to lose cluster access.
srv := tc.Server(0)
decomSrv := tc.Server(1)
for _, status := range []livenesspb.MembershipStatus{
livenesspb.MembershipStatus_DECOMMISSIONING, livenesspb.MembershipStatus_DECOMMISSIONED,
} {
require.NoError(t, srv.Decommission(ctx, status, []roachpb.NodeID{decomSrv.NodeID()}))
}

require.Eventually(t, func() bool {
_, err := decomSrv.DB().Scan(ctx, keys.MinKey, keys.MaxKey, 0)
s, ok := status.FromError(errors.UnwrapAll(err))
return ok && s.Code() == codes.PermissionDenied
}, 10*time.Second, 100*time.Millisecond, "timed out waiting for server to lose cluster access")

// Set up an admin client.
conn, err := grpc.Dial(decomSrv.ServingRPCAddr(), grpc.WithInsecure())
require.NoError(t, err)
defer func() {
_ = conn.Close() // nolint:grpcconnclose
}()
adminClient := serverpb.NewAdminClient(conn)

// Run some operations on the decommissioned node. The ones that require
// access to the cluster should fail, others should succeed. We're mostly
// concerned with making sure they return rather than hang due to internal
// retries.
testcases := []struct {
name string
expectCode codes.Code
op func(serverpb.AdminClient) error
}{
{"Cluster", codes.OK, func(c serverpb.AdminClient) error {
_, err := c.Cluster(ctx, &serverpb.ClusterRequest{})
return err
}},
{"Databases", codes.Internal, func(c serverpb.AdminClient) error {
_, err := c.Databases(ctx, &serverpb.DatabasesRequest{})
return err
}},
{"DatabaseDetails", codes.Internal, func(c serverpb.AdminClient) error {
_, err := c.DatabaseDetails(ctx, &serverpb.DatabaseDetailsRequest{Database: "foo"})
return err
}},
{"DataDistribution", codes.Internal, func(c serverpb.AdminClient) error {
_, err := c.DataDistribution(ctx, &serverpb.DataDistributionRequest{})
return err
}},
{"Decommission", codes.Unknown, func(c serverpb.AdminClient) error {
_, err := c.Decommission(ctx, &serverpb.DecommissionRequest{
NodeIDs: []roachpb.NodeID{srv.NodeID(), decomSrv.NodeID()},
TargetMembership: livenesspb.MembershipStatus_DECOMMISSIONED,
})
return err
}},
{"DecommissionStatus", codes.Unknown, func(c serverpb.AdminClient) error {
_, err := c.DecommissionStatus(ctx, &serverpb.DecommissionStatusRequest{
NodeIDs: []roachpb.NodeID{srv.NodeID(), decomSrv.NodeID()},
})
return err
}},
{"EnqueueRange", codes.Internal, func(c serverpb.AdminClient) error {
_, err := c.EnqueueRange(ctx, &serverpb.EnqueueRangeRequest{
RangeID: scratchRange.RangeID,
Queue: "replicaGC",
})
return err
}},
{"Events", codes.Internal, func(c serverpb.AdminClient) error {
_, err := c.Events(ctx, &serverpb.EventsRequest{})
return err
}},
{"Health", codes.OK, func(c serverpb.AdminClient) error {
_, err := c.Health(ctx, &serverpb.HealthRequest{})
return err
}},
{"Jobs", codes.Internal, func(c serverpb.AdminClient) error {
_, err := c.Jobs(ctx, &serverpb.JobsRequest{})
return err
}},
{"Liveness", codes.OK, func(c serverpb.AdminClient) error {
_, err := c.Liveness(ctx, &serverpb.LivenessRequest{})
return err
}},
{"Locations", codes.Internal, func(c serverpb.AdminClient) error {
_, err := c.Locations(ctx, &serverpb.LocationsRequest{})
return err
}},
{"NonTableStats", codes.Internal, func(c serverpb.AdminClient) error {
_, err := c.NonTableStats(ctx, &serverpb.NonTableStatsRequest{})
return err
}},
{"QueryPlan", codes.OK, func(c serverpb.AdminClient) error {
_, err := c.QueryPlan(ctx, &serverpb.QueryPlanRequest{Query: "SELECT 1"})
return err
}},
{"RangeLog", codes.Internal, func(c serverpb.AdminClient) error {
_, err := c.RangeLog(ctx, &serverpb.RangeLogRequest{})
return err
}},
{"Settings", codes.OK, func(c serverpb.AdminClient) error {
_, err := c.Settings(ctx, &serverpb.SettingsRequest{})
return err
}},
{"TableStats", codes.Internal, func(c serverpb.AdminClient) error {
_, err := c.TableStats(ctx, &serverpb.TableStatsRequest{Database: "foo", Table: "bar"})
return err
}},
{"TableDetails", codes.Internal, func(c serverpb.AdminClient) error {
_, err := c.TableDetails(ctx, &serverpb.TableDetailsRequest{Database: "foo", Table: "bar"})
return err
}},
{"Users", codes.Internal, func(c serverpb.AdminClient) error {
_, err := c.Users(ctx, &serverpb.UsersRequest{})
return err
}},
// We drain at the end, since it may evict us.
{"Drain", codes.Unknown, func(c serverpb.AdminClient) error {
stream, err := c.Drain(ctx, &serverpb.DrainRequest{DoDrain: true})
if err != nil {
return err
}
_, err = stream.Recv()
return err
}},
}

for _, tc := range testcases {
tc := tc
t.Run(tc.name, func(t *testing.T) {
var err error
require.Eventually(t, func() bool {
err = tc.op(adminClient)
if tc.expectCode == codes.OK {
require.NoError(t, err)
return true
}
s, ok := status.FromError(errors.UnwrapAll(err))
if s == nil || !ok {
return false
}
require.Equal(t, tc.expectCode, s.Code())
return true
}, 10*time.Second, 100*time.Millisecond, "timed out waiting for gRPC error, got %s", err)
})
}
}