Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

server: fix node decommissioning itself #61356

Merged
merged 3 commits on
Mar 4, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pkg/ccl/kvccl/kvtenantccl/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,8 +329,8 @@ func (c *Connector) RangeLookup(
})
if err != nil {
log.Warningf(ctx, "error issuing RangeLookup RPC: %v", err)
if grpcutil.IsAuthenticationError(err) {
// Authentication error. Propagate.
if grpcutil.IsAuthError(err) {
// Authentication or authorization error. Propagate.
return nil, nil, err
}
// Soft RPC error. Drop client and retry.
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/kvccl/kvtenantccl/connector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,7 @@ func TestConnectorRangeLookup(t *testing.T) {
desc, err := c.FirstRange()
require.Nil(t, desc)
require.Regexp(t, "does not have access to FirstRange", err)
require.True(t, grpcutil.IsAuthenticationError(err))
require.True(t, grpcutil.IsAuthError(err))
}

// TestConnectorRetriesUnreachable tests that Connector iterates over each of
Expand Down
9 changes: 1 addition & 8 deletions pkg/cli/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -466,18 +466,11 @@ func runDecommissionNodeImpl(
NodeIDs: nodeIDs,
TargetMembership: livenesspb.MembershipStatus_DECOMMISSIONED,
}
resp, err := c.Decommission(ctx, req)
_, err = c.Decommission(ctx, req)
if err != nil {
fmt.Fprintln(stderr)
return errors.Wrap(err, "while trying to mark as decommissioned")
}
if !reflect.DeepEqual(&prevResponse, resp) {
fmt.Fprintln(stderr)
if err := printDecommissionStatus(*resp); err != nil {
return err
}
prevResponse = *resp
}

fmt.Fprintln(os.Stdout, "\nNo more data reported on target nodes. "+
"Please verify cluster health before removing the nodes.")
Expand Down
10 changes: 1 addition & 9 deletions pkg/cmd/roachtest/acceptance.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,7 @@ func registerAcceptance(r *testRegistry) {
{name: "build-analyze", fn: runBuildAnalyze},
{name: "cli/node-status", fn: runCLINodeStatus},
{name: "cluster-init", fn: runClusterInit},
{name: "decommission-self",
fn: runDecommissionSelf,
// Decommissioning self was observed to hang, though not in this test
// when run locally. More investigation is needed; there is a small
// chance that the original observation was in error. However, it
// seems likely that the problem exists even if it is rarely reproduced,
// so this test is skipped.
skip: "https://github.com/cockroachdb/cockroach/issues/56718",
},
{name: "decommission-self", fn: runDecommissionSelf},
{name: "event-log", fn: runEventLog},
{name: "gossip/peerings", fn: runGossipPeerings},
{name: "gossip/restart", fn: runGossipRestart},
Expand Down
4 changes: 1 addition & 3 deletions pkg/cmd/roachtest/mixed_version_decommission.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,7 @@ func runDecommissionMixedVersions(
// to communicate with the cluster (i.e. most commands against it will fail).
// This is also why we're making sure to avoid decommissioning pinnedUpgrade
// itself, as we use it to check the membership after.
//
// NB: we avoid runNode == targetNode here to temporarily avoid #56718.
fullyDecommissionStep(h.getRandNodeOtherThan(pinnedUpgrade), pinnedUpgrade, ""),
fullyDecommissionStep(h.getRandNodeOtherThan(pinnedUpgrade), h.getRandNode(), ""),
checkOneMembership(pinnedUpgrade, "decommissioned"),
)

Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvclient/kvcoord/dist_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -1888,8 +1888,8 @@ func (ds *DistSender) sendToReplicas(
ds.maybeIncrementErrCounters(br, err)

if err != nil {
if grpcutil.IsAuthenticationError(err) {
// Authentication error. Propagate.
if grpcutil.IsAuthError(err) {
// Authentication or authorization error. Propagate.
if ambiguousError != nil {
return nil, roachpb.NewAmbiguousResultErrorf("error=%s [propagate]", ambiguousError)
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,8 +260,8 @@ func (ds *DistSender) singleRangeFeed(
stream, err := client.RangeFeed(clientCtx, &args)
if err != nil {
log.VErrEventf(ctx, 2, "RPC error: %s", err)
if grpcutil.IsAuthenticationError(err) {
// Authentication error. Propagate.
if grpcutil.IsAuthError(err) {
// Authentication or authorization error. Propagate.
return args.Timestamp, err
}
continue
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvclient/rangecache/range_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -1324,6 +1324,6 @@ func (e *CacheEntry) evictLeaseholder(
// IsRangeLookupErrorRetryable returns whether the provided range lookup error
// can be retried or whether it should be propagated immediately.
func IsRangeLookupErrorRetryable(err error) bool {
// For now, all errors are retryable except authentication errors.
return !grpcutil.IsAuthenticationError(err)
// For now, all errors are retryable except authentication/authorization.
return !grpcutil.IsAuthError(err)
}
2 changes: 1 addition & 1 deletion pkg/server/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,6 @@ go_test(
"auto_tls_init_test.go",
"config_test.go",
"connectivity_test.go",
"decommission_test.go",
"drain_test.go",
"graphite_test.go",
"idle_monitor_test.go",
Expand Down Expand Up @@ -289,6 +288,7 @@ go_test(
"//pkg/kv/kvserver/closedts/ctpb",
"//pkg/kv/kvserver/kvserverbase",
"//pkg/kv/kvserver/kvserverpb",
"//pkg/kv/kvserver/liveness",
"//pkg/kv/kvserver/liveness/livenesspb",
"//pkg/roachpb",
"//pkg/rpc",
Expand Down
21 changes: 16 additions & 5 deletions pkg/server/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -1922,14 +1922,14 @@ func (s *adminServer) DecommissionStatus(
) (*serverpb.DecommissionStatusResponse, error) {
// Get the number of replicas on each node. We *may* not need all of them,
// but that would be more complicated than seems worth it right now.
ns, err := s.server.status.Nodes(ctx, &serverpb.NodesRequest{})
if err != nil {
return nil, errors.Wrap(err, "loading node statuses")
}

nodeIDs := req.NodeIDs

// If no nodeIDs given, use all nodes.
if len(nodeIDs) == 0 {
ns, err := s.server.status.Nodes(ctx, &serverpb.NodesRequest{})
if err != nil {
return nil, errors.Wrap(err, "loading node statuses")
}
for _, status := range ns.Nodes {
nodeIDs = append(nodeIDs, status.Desc.NodeID)
}
Expand Down Expand Up @@ -1991,6 +1991,9 @@ func (s *adminServer) DecommissionStatus(
}

// Decommission sets the decommission flag to the specified value on the specified node(s).
// When the flag is set to DECOMMISSIONED, an empty response is returned on success -- this
// ensures a node can decommission itself, since the node could otherwise lose RPC access
// to the cluster while building the full response.
func (s *adminServer) Decommission(
ctx context.Context, req *serverpb.DecommissionRequest,
) (*serverpb.DecommissionStatusResponse, error) {
Expand All @@ -2005,6 +2008,14 @@ func (s *adminServer) Decommission(
if err := s.server.Decommission(ctx, req.TargetMembership, nodeIDs); err != nil {
return nil, err
}

// We return an empty response when setting the final DECOMMISSIONED state,
// since a node can be asked to decommission itself which may cause it to
// lose access to cluster RPC and fail to populate the response.
if req.TargetMembership == livenesspb.MembershipStatus_DECOMMISSIONED {
return &serverpb.DecommissionStatusResponse{}, nil
}

return s.DecommissionStatus(ctx, &serverpb.DecommissionStatusRequest{NodeIDs: nodeIDs})
}

Expand Down
73 changes: 73 additions & 0 deletions pkg/server/admin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,10 @@ import (
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/rpc"
"github.com/cockroachdb/cockroach/pkg/security"
"github.com/cockroachdb/cockroach/pkg/server/debug"
"github.com/cockroachdb/cockroach/pkg/server/serverpb"
Expand Down Expand Up @@ -2010,3 +2013,73 @@ func TestEndpointTelemetryBasic(t *testing.T) {
"/cockroach.server.serverpb.Status/Statements",
)))
}

func TestDecommissionSelf(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
skip.UnderRace(t) // can't handle 7-node clusters

// Set up test cluster.
ctx := context.Background()
tc := serverutils.StartNewTestCluster(t, 7, base.TestClusterArgs{
ReplicationMode: base.ReplicationManual, // saves time
})
defer tc.Stopper().Stop(ctx)

// Decommission several nodes, including the node we're submitting the
// decommission request to. We use the admin client in order to test the
// admin server's logic, which involves a subsequent DecommissionStatus
// call which could fail if used from a node that's just decommissioned.
adminSrv := tc.Server(4)
conn, err := adminSrv.RPCContext().GRPCDialNode(
adminSrv.RPCAddr(), adminSrv.NodeID(), rpc.DefaultClass).Connect(ctx)
require.NoError(t, err)
adminClient := serverpb.NewAdminClient(conn)
decomNodeIDs := []roachpb.NodeID{
tc.Server(4).NodeID(),
tc.Server(5).NodeID(),
tc.Server(6).NodeID(),
}

// The DECOMMISSIONING call should return a full status response.
resp, err := adminClient.Decommission(ctx, &serverpb.DecommissionRequest{
NodeIDs: decomNodeIDs,
TargetMembership: livenesspb.MembershipStatus_DECOMMISSIONING,
})
require.NoError(t, err)
require.Len(t, resp.Status, len(decomNodeIDs))
for i, nodeID := range decomNodeIDs {
status := resp.Status[i]
require.Equal(t, nodeID, status.NodeID)
// Liveness entries may not have been updated yet.
require.Contains(t, []livenesspb.MembershipStatus{
livenesspb.MembershipStatus_ACTIVE,
livenesspb.MembershipStatus_DECOMMISSIONING,
}, status.Membership, "unexpected membership status %v for node %v", status, nodeID)
}

// The DECOMMISSIONED call should return an empty response, to avoid
// erroring due to loss of cluster RPC access when decommissioning self.
resp, err = adminClient.Decommission(ctx, &serverpb.DecommissionRequest{
NodeIDs: decomNodeIDs,
TargetMembership: livenesspb.MembershipStatus_DECOMMISSIONED,
})
require.NoError(t, err)
require.Empty(t, resp.Status)

// The nodes should now have been (or soon become) decommissioned.
for i := 0; i < tc.NumServers(); i++ {
srv := tc.Server(i)
expect := livenesspb.MembershipStatus_ACTIVE
for _, nodeID := range decomNodeIDs {
if srv.NodeID() == nodeID {
expect = livenesspb.MembershipStatus_DECOMMISSIONED
break
}
}
require.Eventually(t, func() bool {
liveness, ok := srv.NodeLiveness().(*liveness.NodeLiveness).GetLiveness(srv.NodeID())
return ok && liveness.Membership == expect
}, 5*time.Second, 100*time.Millisecond, "timed out waiting for node %v status %v", i, expect)
}
}
29 changes: 23 additions & 6 deletions pkg/server/connectivity_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/rpc"
Expand Down Expand Up @@ -352,24 +353,28 @@ func TestDecommissionedNodeCannotConnect(t *testing.T) {
tc := testcluster.StartTestCluster(t, numNodes, tcArgs)
defer tc.Stopper().Stop(ctx)

scratchKey := tc.ScratchRange(t)
scratchRange := tc.LookupRangeOrFatal(t, scratchKey)
require.Len(t, scratchRange.InternalReplicas, 1)
require.Equal(t, tc.Server(0).NodeID(), scratchRange.InternalReplicas[0].NodeID)

decomSrv := tc.Server(2)
for _, status := range []livenesspb.MembershipStatus{
livenesspb.MembershipStatus_DECOMMISSIONING, livenesspb.MembershipStatus_DECOMMISSIONED,
} {
require.NoError(t, tc.Servers[0].Decommission(ctx, status, []roachpb.NodeID{3}))
require.NoError(t, tc.Servers[0].Decommission(ctx, status, []roachpb.NodeID{decomSrv.NodeID()}))
}

testutils.SucceedsSoon(t, func() error {
for _, idx := range []int{0, 1} {
clusterSrv := tc.Server(idx)
decomSrv := tc.Server(2)

// Within a short period of time, the cluster (n1, n2) will refuse to reach out to n3.
{
_, err := clusterSrv.RPCContext().GRPCDialNode(
decomSrv.RPCAddr(), decomSrv.NodeID(), rpc.DefaultClass,
).Connect(ctx)
cause := errors.UnwrapAll(err)
s, ok := grpcstatus.FromError(cause)
s, ok := grpcstatus.FromError(errors.UnwrapAll(err))
if !ok || s.Code() != codes.PermissionDenied {
return errors.Errorf("expected permission denied for n%d->n%d, got %v", clusterSrv.NodeID(), decomSrv.NodeID(), err)
}
Expand All @@ -380,13 +385,25 @@ func TestDecommissionedNodeCannotConnect(t *testing.T) {
_, err := decomSrv.RPCContext().GRPCDialNode(
clusterSrv.RPCAddr(), decomSrv.NodeID(), rpc.DefaultClass,
).Connect(ctx)
cause := errors.UnwrapAll(err)
s, ok := grpcstatus.FromError(cause)
s, ok := grpcstatus.FromError(errors.UnwrapAll(err))
if !ok || s.Code() != codes.PermissionDenied {
return errors.Errorf("expected permission denied for n%d->n%d, got %v", decomSrv.NodeID(), clusterSrv.NodeID(), err)
}
}
}
return nil
})

// Trying to scan the scratch range via the decommissioned node should
// now result in a permission denied error.
_, err := decomSrv.DB().Scan(ctx, scratchKey, keys.MaxKey, 1)
require.Error(t, err)

// TODO(erikgrinaker): until cockroachdb/errors preserves grpcstatus.Error
// across errors.EncodeError() we just check for any error, this should
// check that it matches codes.PermissionDenied later.
//err = errors.UnwrapAll(err)
//s, ok := grpcstatus.FromError(err)
//require.True(t, ok, "expected gRPC error, got %T (%v)", err, err)
//require.Equal(t, codes.PermissionDenied, s.Code(), "expected permission denied error")
}
57 changes: 0 additions & 57 deletions pkg/server/decommission_test.go

This file was deleted.

13 changes: 13 additions & 0 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"path/filepath"
"reflect"
"runtime"
"sort"
"strconv"
"strings"
"sync"
Expand Down Expand Up @@ -2129,6 +2130,18 @@ func (s *Server) Decommission(
}
}

// If we're asked to decommission ourself we may lose access to cluster RPC,
// so we decommission ourself last. We copy the slice to avoid mutating the
// input slice.
if targetStatus == livenesspb.MembershipStatus_DECOMMISSIONED {
orderedNodeIDs := make([]roachpb.NodeID, len(nodeIDs))
copy(orderedNodeIDs, nodeIDs)
sort.SliceStable(orderedNodeIDs, func(i, j int) bool {
return orderedNodeIDs[j] == s.NodeID()
})
nodeIDs = orderedNodeIDs
}

var event eventpb.EventPayload
var nodeDetails *eventpb.CommonNodeDecommissionDetails
if targetStatus.Decommissioning() {
Expand Down
Loading