server: fix node decommissioning itself
Previously, if a node was asked to decommission itself, the
decommissioning process would sometimes hang or fail since the node
would become decommissioned and lose RPC access to the rest of the
cluster while it was building the response.

This patch returns an empty response from the
`adminServer.Decommission()` call when setting the final
`DECOMMISSIONED` status, thereby avoiding further use of cluster RPC
after the node has decommissioned itself. It also defers decommissioning
the local node until last when multiple nodes are being decommissioned.
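
For orientation, here is a condensed sketch of the server-side change,
abbreviated from the pkg/server/admin.go and pkg/server/server.go hunks
below (not the complete implementation):

// Sketch only -- condensed from the admin.go hunk in this commit.
func (s *adminServer) Decommission(
	ctx context.Context, req *serverpb.DecommissionRequest,
) (*serverpb.DecommissionStatusResponse, error) {
	// ... resolve nodeIDs from the request ...

	// Server.Decommission orders the local node last (see the server.go
	// hunk below), so work for the other nodes finishes first.
	if err := s.server.Decommission(ctx, req.TargetMembership, nodeIDs); err != nil {
		return nil, err
	}

	// For the final DECOMMISSIONED step, skip the status fan-out: the target
	// set may include this node, which can lose cluster RPC access before
	// the response is built.
	if req.TargetMembership == livenesspb.MembershipStatus_DECOMMISSIONED {
		return &serverpb.DecommissionStatusResponse{}, nil
	}
	return s.DecommissionStatus(ctx, &serverpb.DecommissionStatusRequest{NodeIDs: nodeIDs})
}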

This change is backwards-compatible with old CLI versions, which will
simply output the now-empty status result set before completing with
"No more data reported on target nodes". The CLI has been updated to
omit the empty response.
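
Illustrative only (the actual CLI change in pkg/cli/node.go below simply
discards the response): a caller that wants to keep printing final statuses
from older servers while tolerating the new empty response could check the
result set before printing. This fragment assumes the surrounding context of
runDecommissionNodeImpl:

// Hedged sketch, not the shipped CLI code: handle both old servers (full
// final status response) and new servers (empty response) when requesting
// the DECOMMISSIONED state.
resp, err := c.Decommission(ctx, req)
if err != nil {
	return errors.Wrap(err, "while trying to mark as decommissioned")
}
if len(resp.Status) > 0 {
	// Old server: it still reports a final per-node status set.
	if err := printDecommissionStatus(*resp); err != nil {
		return err
	}
}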

Release justification: bug fixes and low-risk updates to new functionality

Release note (bug fix): Fixed a bug from 21.1-alpha where a node
decommissioning process could sometimes hang or fail when the
decommission request was submitted via the node being decommissioned.
erikgrinaker committed Mar 3, 2021
1 parent 247f251 commit badefde
Showing 8 changed files with 129 additions and 31 deletions.
9 changes: 1 addition & 8 deletions pkg/cli/node.go
@@ -466,18 +466,11 @@ func runDecommissionNodeImpl(
NodeIDs: nodeIDs,
TargetMembership: livenesspb.MembershipStatus_DECOMMISSIONED,
}
resp, err := c.Decommission(ctx, req)
_, err = c.Decommission(ctx, req)
if err != nil {
fmt.Fprintln(stderr)
return errors.Wrap(err, "while trying to mark as decommissioned")
}
if !reflect.DeepEqual(&prevResponse, resp) {
fmt.Fprintln(stderr)
if err := printDecommissionStatus(*resp); err != nil {
return err
}
prevResponse = *resp
}

fmt.Fprintln(os.Stdout, "\nNo more data reported on target nodes. "+
"Please verify cluster health before removing the nodes.")
10 changes: 1 addition & 9 deletions pkg/cmd/roachtest/acceptance.go
@@ -39,15 +39,7 @@ func registerAcceptance(r *testRegistry) {
{name: "build-analyze", fn: runBuildAnalyze},
{name: "cli/node-status", fn: runCLINodeStatus},
{name: "cluster-init", fn: runClusterInit},
{name: "decommission-self",
fn: runDecommissionSelf,
// Decommissioning self was observed to hang, though not in this test
// when run locally. More investigation is needed; there is a small
// chance that the original observation was in error. However, it
// seems likely that the problem exists even if it is rarely reproduced,
// so this test is skipped.
skip: "https://github.com/cockroachdb/cockroach/issues/56718",
},
{name: "decommission-self", fn: runDecommissionSelf},
{name: "event-log", fn: runEventLog},
{name: "gossip/peerings", fn: runGossipPeerings},
{name: "gossip/restart", fn: runGossipRestart},
4 changes: 1 addition & 3 deletions pkg/cmd/roachtest/mixed_version_decommission.go
@@ -87,9 +87,7 @@ func runDecommissionMixedVersions(
// to communicate with the cluster (i.e. most commands against it will fail).
// This is also why we're making sure to avoid decommissioning pinnedUpgrade
// itself, as we use it to check the membership after.
//
// NB: we avoid runNode == targetNode here to temporarily avoid #56718.
fullyDecommissionStep(h.getRandNodeOtherThan(pinnedUpgrade), pinnedUpgrade, ""),
fullyDecommissionStep(h.getRandNodeOtherThan(pinnedUpgrade), h.getRandNode(), ""),
checkOneMembership(pinnedUpgrade, "decommissioned"),
)

1 change: 1 addition & 0 deletions pkg/server/BUILD.bazel
@@ -288,6 +288,7 @@ go_test(
"//pkg/kv/kvserver/closedts/ctpb",
"//pkg/kv/kvserver/kvserverbase",
"//pkg/kv/kvserver/kvserverpb",
"//pkg/kv/kvserver/liveness",
"//pkg/kv/kvserver/liveness/livenesspb",
"//pkg/roachpb",
"//pkg/rpc",
21 changes: 16 additions & 5 deletions pkg/server/admin.go
@@ -1922,14 +1922,14 @@ func (s *adminServer) DecommissionStatus(
) (*serverpb.DecommissionStatusResponse, error) {
// Get the number of replicas on each node. We *may* not need all of them,
// but that would be more complicated than seems worth it right now.
ns, err := s.server.status.Nodes(ctx, &serverpb.NodesRequest{})
if err != nil {
return nil, errors.Wrap(err, "loading node statuses")
}

nodeIDs := req.NodeIDs

// If no nodeIDs given, use all nodes.
if len(nodeIDs) == 0 {
ns, err := s.server.status.Nodes(ctx, &serverpb.NodesRequest{})
if err != nil {
return nil, errors.Wrap(err, "loading node statuses")
}
for _, status := range ns.Nodes {
nodeIDs = append(nodeIDs, status.Desc.NodeID)
}
@@ -1991,6 +1991,9 @@ func (s *adminServer) DecommissionStatus(
}

// Decommission sets the decommission flag to the specified value on the specified node(s).
// When the flag is set to DECOMMISSIONED, an empty response is returned on success -- this
// ensures a node can decommission itself, since the node could otherwise lose RPC access
// to the cluster while building the full response.
func (s *adminServer) Decommission(
ctx context.Context, req *serverpb.DecommissionRequest,
) (*serverpb.DecommissionStatusResponse, error) {
@@ -2005,6 +2008,14 @@ func (s *adminServer) Decommission(
if err := s.server.Decommission(ctx, req.TargetMembership, nodeIDs); err != nil {
return nil, err
}

// We return an empty response when setting the final DECOMMISSIONED state,
// since a node can be asked to decommission itself which may cause it to
// lose access to cluster RPC and fail to populate the response.
if req.TargetMembership == livenesspb.MembershipStatus_DECOMMISSIONED {
return &serverpb.DecommissionStatusResponse{}, nil
}

return s.DecommissionStatus(ctx, &serverpb.DecommissionStatusRequest{NodeIDs: nodeIDs})
}

73 changes: 73 additions & 0 deletions pkg/server/admin_test.go
@@ -34,7 +34,10 @@ import (
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/rpc"
"github.com/cockroachdb/cockroach/pkg/security"
"github.com/cockroachdb/cockroach/pkg/server/debug"
"github.com/cockroachdb/cockroach/pkg/server/serverpb"
@@ -2010,3 +2013,73 @@ func TestEndpointTelemetryBasic(t *testing.T) {
"/cockroach.server.serverpb.Status/Statements",
)))
}

func TestDecommissionSelf(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
skip.UnderRace(t) // can't handle 7-node clusters

// Set up test cluster.
ctx := context.Background()
tc := serverutils.StartNewTestCluster(t, 7, base.TestClusterArgs{
ReplicationMode: base.ReplicationManual, // saves time
})
defer tc.Stopper().Stop(ctx)

// Decommission several nodes, including the node we're submitting the
// decommission request to. We use the admin client in order to test the
// admin server's logic, which involves a subsequent DecommissionStatus
// call which could fail if used from a node that's just decommissioned.
adminSrv := tc.Server(4)
conn, err := adminSrv.RPCContext().GRPCDialNode(
adminSrv.RPCAddr(), adminSrv.NodeID(), rpc.DefaultClass).Connect(ctx)
require.NoError(t, err)
adminClient := serverpb.NewAdminClient(conn)
decomNodeIDs := []roachpb.NodeID{
tc.Server(4).NodeID(),
tc.Server(5).NodeID(),
tc.Server(6).NodeID(),
}

// The DECOMMISSIONING call should return a full status response.
resp, err := adminClient.Decommission(ctx, &serverpb.DecommissionRequest{
NodeIDs: decomNodeIDs,
TargetMembership: livenesspb.MembershipStatus_DECOMMISSIONING,
})
require.NoError(t, err)
require.Len(t, resp.Status, len(decomNodeIDs))
for i, nodeID := range decomNodeIDs {
status := resp.Status[i]
require.Equal(t, nodeID, status.NodeID)
// Liveness entries may not have been updated yet.
require.Contains(t, []livenesspb.MembershipStatus{
livenesspb.MembershipStatus_ACTIVE,
livenesspb.MembershipStatus_DECOMMISSIONING,
}, status.Membership, "unexpected membership status %v for node %v", status, nodeID)
}

// The DECOMMISSIONED call should return an empty response, to avoid
// erroring due to loss of cluster RPC access when decommissioning self.
resp, err = adminClient.Decommission(ctx, &serverpb.DecommissionRequest{
NodeIDs: decomNodeIDs,
TargetMembership: livenesspb.MembershipStatus_DECOMMISSIONED,
})
require.NoError(t, err)
require.Empty(t, resp.Status)

// The nodes should now have been (or soon become) decommissioned.
for i := 0; i < tc.NumServers(); i++ {
srv := tc.Server(i)
expect := livenesspb.MembershipStatus_ACTIVE
for _, nodeID := range decomNodeIDs {
if srv.NodeID() == nodeID {
expect = livenesspb.MembershipStatus_DECOMMISSIONED
break
}
}
require.Eventually(t, func() bool {
liveness, ok := srv.NodeLiveness().(*liveness.NodeLiveness).GetLiveness(srv.NodeID())
return ok && liveness.Membership == expect
}, 5*time.Second, 100*time.Millisecond, "timed out waiting for node %v status %v", i, expect)
}
}
29 changes: 23 additions & 6 deletions pkg/server/connectivity_test.go
@@ -20,6 +20,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/rpc"
@@ -352,24 +353,28 @@ func TestDecommissionedNodeCannotConnect(t *testing.T) {
tc := testcluster.StartTestCluster(t, numNodes, tcArgs)
defer tc.Stopper().Stop(ctx)

scratchKey := tc.ScratchRange(t)
scratchRange := tc.LookupRangeOrFatal(t, scratchKey)
require.Len(t, scratchRange.InternalReplicas, 1)
require.Equal(t, tc.Server(0).NodeID(), scratchRange.InternalReplicas[0].NodeID)

decomSrv := tc.Server(2)
for _, status := range []livenesspb.MembershipStatus{
livenesspb.MembershipStatus_DECOMMISSIONING, livenesspb.MembershipStatus_DECOMMISSIONED,
} {
require.NoError(t, tc.Servers[0].Decommission(ctx, status, []roachpb.NodeID{3}))
require.NoError(t, tc.Servers[0].Decommission(ctx, status, []roachpb.NodeID{decomSrv.NodeID()}))
}

testutils.SucceedsSoon(t, func() error {
for _, idx := range []int{0, 1} {
clusterSrv := tc.Server(idx)
decomSrv := tc.Server(2)

// Within a short period of time, the cluster (n1, n2) will refuse to reach out to n3.
{
_, err := clusterSrv.RPCContext().GRPCDialNode(
decomSrv.RPCAddr(), decomSrv.NodeID(), rpc.DefaultClass,
).Connect(ctx)
cause := errors.UnwrapAll(err)
s, ok := grpcstatus.FromError(cause)
s, ok := grpcstatus.FromError(errors.UnwrapAll(err))
if !ok || s.Code() != codes.PermissionDenied {
return errors.Errorf("expected permission denied for n%d->n%d, got %v", clusterSrv.NodeID(), decomSrv.NodeID(), err)
}
@@ -380,13 +385,25 @@ func TestDecommissionedNodeCannotConnect(t *testing.T) {
_, err := decomSrv.RPCContext().GRPCDialNode(
clusterSrv.RPCAddr(), decomSrv.NodeID(), rpc.DefaultClass,
).Connect(ctx)
cause := errors.UnwrapAll(err)
s, ok := grpcstatus.FromError(cause)
s, ok := grpcstatus.FromError(errors.UnwrapAll(err))
if !ok || s.Code() != codes.PermissionDenied {
return errors.Errorf("expected permission denied for n%d->n%d, got %v", decomSrv.NodeID(), clusterSrv.NodeID(), err)
}
}
}
return nil
})

// Trying to scan the scratch range via the decommissioned node should
// now result in a permission denied error.
_, err := decomSrv.DB().Scan(ctx, scratchKey, keys.MaxKey, 1)
require.Error(t, err)

// TODO(erikgrinaker): until cockroachdb/errors preserves grpcstatus.Error
// across errors.EncodeError() we just check for any error, this should
// check that it matches codes.PermissionDenied later.
//err = errors.UnwrapAll(err)
//s, ok := grpcstatus.FromError(err)
//require.True(t, ok, "expected gRPC error, got %T (%v)", err, err)
//require.Equal(t, codes.PermissionDenied, s.Code(), "expected permission denied error")
}
13 changes: 13 additions & 0 deletions pkg/server/server.go
@@ -23,6 +23,7 @@ import (
"path/filepath"
"reflect"
"runtime"
"sort"
"strconv"
"strings"
"sync"
@@ -2129,6 +2130,18 @@ func (s *Server) Decommission(
}
}

// If we're asked to decommission ourself we may lose access to cluster RPC,
// so we decommission ourself last. We copy the slice to avoid mutating the
// input slice.
if targetStatus == livenesspb.MembershipStatus_DECOMMISSIONED {
orderedNodeIDs := make([]roachpb.NodeID, len(nodeIDs))
copy(orderedNodeIDs, nodeIDs)
sort.SliceStable(orderedNodeIDs, func(i, j int) bool {
return orderedNodeIDs[j] == s.NodeID()
})
nodeIDs = orderedNodeIDs
}

var event eventpb.EventPayload
var nodeDetails *eventpb.CommonNodeDecommissionDetails
if targetStatus.Decommissioning() {
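
A note on the ordering trick above: the sort.SliceStable comparator reports
an element as "greater" exactly when the right-hand element is the local
node, so the local node ID sinks to the end while the other IDs keep their
original relative order. A tiny, self-contained illustration (hypothetical
IDs, plain ints standing in for roachpb.NodeID):

package main

import (
	"fmt"
	"sort"
)

func main() {
	// Hypothetical values: 5 stands in for the server's own node ID.
	selfID := 5
	nodeIDs := []int{2, 5, 3}

	// Same comparator shape as in (*Server).Decommission above: only the
	// local node compares as "greater", so SliceStable moves it to the end
	// and keeps the remaining IDs in their original relative order.
	ordered := make([]int, len(nodeIDs))
	copy(ordered, nodeIDs)
	sort.SliceStable(ordered, func(i, j int) bool {
		return ordered[j] == selfID
	})

	fmt.Println(ordered) // Output: [2 3 5]
}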
