Skip to content

Commit

Permalink
server: fix node decommissioning itself
Browse files Browse the repository at this point in the history
Previously, if a node was asked to decommission itself, the
decommissioning process would sometimes hang or fail since the node
would become decommissioned and lose RPC access to the rest of the
cluster while it was building the response.

This patch returns an empty response from the
`adminServer.Decommission()` call when setting the final
`DECOMMISSIONED` status, thereby avoiding further use of cluster RPC
after decommissioning itself. It also defers self-decommissioning until
the end if multiple nodes are being decommissioned.

This change is backwards-compatible with old CLI versions, which will
simply output the now-empty status result set before completing with
"No more data reported on target nodes". The CLI has been updated to
omit the empty response.

Some minor related changes to test infrastructure were also made,
specifically adding `TestCluster.ClientConn()` to create a client
connection that's independent from any server RPC contexts.

Release justification: low risk, high benefit changes to existing functionality

Release note (bug fix): Fixed a bug where a node decommissioning process
could sometimes hang or fail when the decommission request was submitted
via the node being decommissioned.
  • Loading branch information
erikgrinaker committed Mar 3, 2021
1 parent 247f251 commit 1aa1de3
Show file tree
Hide file tree
Showing 12 changed files with 156 additions and 31 deletions.
9 changes: 1 addition & 8 deletions pkg/cli/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -466,18 +466,11 @@ func runDecommissionNodeImpl(
NodeIDs: nodeIDs,
TargetMembership: livenesspb.MembershipStatus_DECOMMISSIONED,
}
resp, err := c.Decommission(ctx, req)
_, err = c.Decommission(ctx, req)
if err != nil {
fmt.Fprintln(stderr)
return errors.Wrap(err, "while trying to mark as decommissioned")
}
if !reflect.DeepEqual(&prevResponse, resp) {
fmt.Fprintln(stderr)
if err := printDecommissionStatus(*resp); err != nil {
return err
}
prevResponse = *resp
}

fmt.Fprintln(os.Stdout, "\nNo more data reported on target nodes. "+
"Please verify cluster health before removing the nodes.")
Expand Down
10 changes: 1 addition & 9 deletions pkg/cmd/roachtest/acceptance.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,7 @@ func registerAcceptance(r *testRegistry) {
{name: "build-analyze", fn: runBuildAnalyze},
{name: "cli/node-status", fn: runCLINodeStatus},
{name: "cluster-init", fn: runClusterInit},
{name: "decommission-self",
fn: runDecommissionSelf,
// Decommissioning self was observed to hang, though not in this test
// when run locally. More investigation is needed; there is a small
// chance that the original observation was in error. However, it
// seems likely that the problem exists even if it is rarely reproduced,
// so this test is skipped.
skip: "https://github.com/cockroachdb/cockroach/issues/56718",
},
{name: "decommission-self", fn: runDecommissionSelf},
{name: "event-log", fn: runEventLog},
{name: "gossip/peerings", fn: runGossipPeerings},
{name: "gossip/restart", fn: runGossipRestart},
Expand Down
4 changes: 1 addition & 3 deletions pkg/cmd/roachtest/mixed_version_decommission.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,7 @@ func runDecommissionMixedVersions(
// to communicate with the cluster (i.e. most commands against it will fail).
// This is also why we're making sure to avoid decommissioning pinnedUpgrade
// itself, as we use it to check the membership after.
//
// NB: we avoid runNode == targetNode here to temporarily avoid #56718.
fullyDecommissionStep(h.getRandNodeOtherThan(pinnedUpgrade), pinnedUpgrade, ""),
fullyDecommissionStep(h.getRandNodeOtherThan(pinnedUpgrade), h.getRandNode(), ""),
checkOneMembership(pinnedUpgrade, "decommissioned"),
)

Expand Down
1 change: 1 addition & 0 deletions pkg/server/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,7 @@ go_test(
"//pkg/kv/kvserver/closedts/ctpb",
"//pkg/kv/kvserver/kvserverbase",
"//pkg/kv/kvserver/kvserverpb",
"//pkg/kv/kvserver/liveness",
"//pkg/kv/kvserver/liveness/livenesspb",
"//pkg/roachpb",
"//pkg/rpc",
Expand Down
21 changes: 16 additions & 5 deletions pkg/server/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -1922,14 +1922,14 @@ func (s *adminServer) DecommissionStatus(
) (*serverpb.DecommissionStatusResponse, error) {
// Get the number of replicas on each node. We *may* not need all of them,
// but that would be more complicated than seems worth it right now.
ns, err := s.server.status.Nodes(ctx, &serverpb.NodesRequest{})
if err != nil {
return nil, errors.Wrap(err, "loading node statuses")
}

nodeIDs := req.NodeIDs

// If no nodeIDs given, use all nodes.
if len(nodeIDs) == 0 {
ns, err := s.server.status.Nodes(ctx, &serverpb.NodesRequest{})
if err != nil {
return nil, errors.Wrap(err, "loading node statuses")
}
for _, status := range ns.Nodes {
nodeIDs = append(nodeIDs, status.Desc.NodeID)
}
Expand Down Expand Up @@ -1991,6 +1991,9 @@ func (s *adminServer) DecommissionStatus(
}

// Decommission sets the decommission flag to the specified value on the specified node(s).
// When the flag is set to DECOMMISSIONED, an empty response is returned on success -- this
// ensures a node can decommission itself, since the node could otherwise lose RPC access
// to the cluster while building the full response.
func (s *adminServer) Decommission(
ctx context.Context, req *serverpb.DecommissionRequest,
) (*serverpb.DecommissionStatusResponse, error) {
Expand All @@ -2005,6 +2008,14 @@ func (s *adminServer) Decommission(
if err := s.server.Decommission(ctx, req.TargetMembership, nodeIDs); err != nil {
return nil, err
}

// We return an empty response when setting the final DECOMMISSIONED state,
// since a node can be asked to decommission itself which may cause it to
// lose access to cluster RPC and fail to populate the response.
if req.TargetMembership == livenesspb.MembershipStatus_DECOMMISSIONED {
return &serverpb.DecommissionStatusResponse{}, nil
}

return s.DecommissionStatus(ctx, &serverpb.DecommissionStatusRequest{NodeIDs: nodeIDs})
}

Expand Down
70 changes: 70 additions & 0 deletions pkg/server/admin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ import (
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/security"
"github.com/cockroachdb/cockroach/pkg/server/debug"
Expand Down Expand Up @@ -2010,3 +2012,71 @@ func TestEndpointTelemetryBasic(t *testing.T) {
"/cockroach.server.serverpb.Status/Statements",
)))
}

// TestDecommissionSelf checks that a node can be decommissioned through its
// own admin endpoint. The DECOMMISSIONING call must return one status entry
// per target node, the DECOMMISSIONED call must return an empty response, and
// every server must eventually report the expected membership status for its
// own node ID.
func TestDecommissionSelf(t *testing.T) {
	skip.UnderStress(t) // can't handle 7-node clusters
	skip.UnderRace(t)
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)

	// Bring up the cluster.
	const numNodes = 7
	ctx := context.Background()
	clusterArgs := base.TestClusterArgs{
		ReplicationMode: base.ReplicationManual, // saves time
	}
	tc := serverutils.StartNewTestCluster(t, numNodes, clusterArgs)
	defer tc.Stopper().Stop(ctx)

	// The request is sent to server 4, which is itself one of the targets.
	// Going through the admin client exercises the admin server's logic,
	// including the follow-up DecommissionStatus call that could fail on a
	// node that has just decommissioned itself and lost access to cluster RPC.
	admin := serverpb.NewAdminClient(tc.ClientConn(ctx, t, 4))
	targets := []roachpb.NodeID{
		tc.Server(4).NodeID(),
		tc.Server(5).NodeID(),
		tc.Server(6).NodeID(),
	}

	// Step 1: DECOMMISSIONING is expected to yield a full status response,
	// with entries in the same order as the requested node IDs.
	decommissioningReq := &serverpb.DecommissionRequest{
		NodeIDs:          targets,
		TargetMembership: livenesspb.MembershipStatus_DECOMMISSIONING,
	}
	resp, err := admin.Decommission(ctx, decommissioningReq)
	require.NoError(t, err)
	require.Len(t, resp.Status, len(targets))

	// Liveness entries may not have been updated yet, so ACTIVE is tolerated.
	acceptable := []livenesspb.MembershipStatus{
		livenesspb.MembershipStatus_ACTIVE,
		livenesspb.MembershipStatus_DECOMMISSIONING,
	}
	for i := range targets {
		nodeID, status := targets[i], resp.Status[i]
		require.Equal(t, nodeID, status.NodeID)
		require.Contains(t, acceptable, status.Membership,
			"unexpected membership status %v for node %v", status, nodeID)
	}

	// Step 2: DECOMMISSIONED is expected to yield an empty response, so that
	// a self-decommissioning node does not error out after losing cluster
	// RPC access.
	decommissionedReq := &serverpb.DecommissionRequest{
		NodeIDs:          targets,
		TargetMembership: livenesspb.MembershipStatus_DECOMMISSIONED,
	}
	resp, err = admin.Decommission(ctx, decommissionedReq)
	require.NoError(t, err)
	require.Empty(t, resp.Status)

	// Step 3: targeted nodes should now be (or soon become) decommissioned,
	// while all other nodes stay active.
	isTarget := make(map[roachpb.NodeID]bool, len(targets))
	for _, id := range targets {
		isTarget[id] = true
	}
	for i := 0; i < tc.NumServers(); i++ {
		srv := tc.Server(i)
		expect := livenesspb.MembershipStatus_ACTIVE
		if isTarget[srv.NodeID()] {
			expect = livenesspb.MembershipStatus_DECOMMISSIONED
		}
		hasExpectedStatus := func() bool {
			nl := srv.NodeLiveness().(*liveness.NodeLiveness)
			rec, found := nl.GetLiveness(srv.NodeID())
			return found && rec.Membership == expect
		}
		require.Eventually(t, hasExpectedStatus, 5*time.Second, 100*time.Millisecond,
			"timed out waiting for node %v status %v", i, expect)
	}
}
29 changes: 23 additions & 6 deletions pkg/server/connectivity_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/rpc"
Expand Down Expand Up @@ -351,25 +352,29 @@ func TestDecommissionedNodeCannotConnect(t *testing.T) {

tc := testcluster.StartTestCluster(t, numNodes, tcArgs)
defer tc.Stopper().Stop(ctx)
decomSrv := tc.Server(2)

for _, status := range []livenesspb.MembershipStatus{
livenesspb.MembershipStatus_DECOMMISSIONING, livenesspb.MembershipStatus_DECOMMISSIONED,
} {
require.NoError(t, tc.Servers[0].Decommission(ctx, status, []roachpb.NodeID{3}))
require.NoError(t, tc.Servers[0].Decommission(ctx, status, []roachpb.NodeID{decomSrv.NodeID()}))
}

scratchKey := tc.ScratchRange(t)
scratchRange := tc.LookupRangeOrFatal(t, scratchKey)
require.Len(t, scratchRange.InternalReplicas, 1)
require.Equal(t, tc.Server(0).NodeID(), scratchRange.InternalReplicas[0].NodeID)

testutils.SucceedsSoon(t, func() error {
for _, idx := range []int{0, 1} {
clusterSrv := tc.Server(idx)
decomSrv := tc.Server(2)

// Within a short period of time, the cluster (n1, n2) will refuse to reach out to n3.
{
_, err := clusterSrv.RPCContext().GRPCDialNode(
decomSrv.RPCAddr(), decomSrv.NodeID(), rpc.DefaultClass,
).Connect(ctx)
cause := errors.UnwrapAll(err)
s, ok := grpcstatus.FromError(cause)
s, ok := grpcstatus.FromError(errors.UnwrapAll(err))
if !ok || s.Code() != codes.PermissionDenied {
return errors.Errorf("expected permission denied for n%d->n%d, got %v", clusterSrv.NodeID(), decomSrv.NodeID(), err)
}
Expand All @@ -380,13 +385,25 @@ func TestDecommissionedNodeCannotConnect(t *testing.T) {
_, err := decomSrv.RPCContext().GRPCDialNode(
clusterSrv.RPCAddr(), decomSrv.NodeID(), rpc.DefaultClass,
).Connect(ctx)
cause := errors.UnwrapAll(err)
s, ok := grpcstatus.FromError(cause)
s, ok := grpcstatus.FromError(errors.UnwrapAll(err))
if !ok || s.Code() != codes.PermissionDenied {
return errors.Errorf("expected permission denied for n%d->n%d, got %v", decomSrv.NodeID(), clusterSrv.NodeID(), err)
}
}
}
return nil
})

// Trying to scan the scratch range via the decommissioned node should
// now result in a permission denied error.
_, err := decomSrv.DB().Scan(ctx, scratchKey, keys.MaxKey, 1)
require.Error(t, err)

// TODO(erikgrinaker): until cockroachdb/errors preserves grpcstatus.Error
// across errors.EncodeError() we just check for any error. Once it does,
// this should check that the error matches codes.PermissionDenied.
//err = errors.UnwrapAll(err)
//s, ok := grpcstatus.FromError(err)
//require.True(t, ok, "expected gRPC error, got %T (%v)", err, err)
//require.Equal(t, codes.PermissionDenied, s.Code(), "expected permission denied error")
}
13 changes: 13 additions & 0 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"path/filepath"
"reflect"
"runtime"
"sort"
"strconv"
"strings"
"sync"
Expand Down Expand Up @@ -2129,6 +2130,18 @@ func (s *Server) Decommission(
}
}

// If we're asked to decommission ourself we may lose access to cluster RPC,
// so we decommission ourself last. We copy the slice to avoid mutating the
// input slice.
if targetStatus == livenesspb.MembershipStatus_DECOMMISSIONED {
orderedNodeIDs := make([]roachpb.NodeID, len(nodeIDs))
copy(orderedNodeIDs, nodeIDs)
sort.SliceStable(orderedNodeIDs, func(i, j int) bool {
return orderedNodeIDs[j] == s.NodeID()
})
nodeIDs = orderedNodeIDs
}

var event eventpb.EventPayload
var nodeDetails *eventpb.CommonNodeDecommissionDetails
if targetStatus.Decommissioning() {
Expand Down
1 change: 1 addition & 0 deletions pkg/testutils/serverutils/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,6 @@ go_library(
"//pkg/util/stop",
"//pkg/util/uuid",
"@com_github_cockroachdb_errors//:errors",
"@org_golang_google_grpc//:go_default_library",
],
)
6 changes: 6 additions & 0 deletions pkg/testutils/serverutils/test_cluster_shim.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@
package serverutils

import (
"context"
gosql "database/sql"
"testing"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"google.golang.org/grpc"
)

// TestClusterInterface defines TestCluster functionality used by tests.
Expand All @@ -43,6 +45,10 @@ type TestClusterInterface interface {
// ServerConn returns a gosql.DB connection to a specific node.
ServerConn(idx int) *gosql.DB

// ClientConn returns a grpc.ClientConn to a specific node, independent
// of any server's RPCContext.
ClientConn(ctx context.Context, t testing.TB, serverIdx int) *grpc.ClientConn

// StopServer stops a single server.
StopServer(idx int)

Expand Down
1 change: 1 addition & 0 deletions pkg/testutils/testcluster/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ go_library(
"@com_github_cockroachdb_logtags//:logtags",
"@com_github_stretchr_testify//require",
"@io_etcd_go_etcd_raft_v3//:raft",
"@org_golang_google_grpc//:go_default_library",
],
)

Expand Down
22 changes: 22 additions & 0 deletions pkg/testutils/testcluster/testcluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import (
"github.com/cockroachdb/logtags"
"github.com/stretchr/testify/require"
"go.etcd.io/etcd/raft/v3"
"google.golang.org/grpc"
)

// TestCluster represents a set of TestServers. The hope is that it can be used
Expand Down Expand Up @@ -1417,6 +1418,27 @@ func (tc *TestCluster) GetRaftLeader(t testing.TB, key roachpb.RKey) *kvserver.R
return raftLeaderRepl
}

// ClientConn opens a fresh gRPC client connection to the server at serverIdx.
// The connection is dialed through a newly constructed RPCContext rather than
// one owned by any server in the cluster, which makes it suitable for testing
// client behavior without being affected by server state. The new context
// reuses the target server's Config and Settings, gets its own clock, and is
// bound to the cluster's stopper. The test fails immediately if the
// connection cannot be established.
func (tc *TestCluster) ClientConn(
	ctx context.Context, t testing.TB, serverIdx int,
) *grpc.ClientConn {
	target := tc.Server(serverIdx)
	serverRPC := target.RPCContext()

	// Assemble a standalone RPC context that borrows only configuration
	// from the target server.
	opts := rpc.ContextOptions{
		TenantID:   roachpb.SystemTenantID,
		Config:     serverRPC.Config,
		Settings:   serverRPC.Settings,
		AmbientCtx: log.AmbientContext{Tracer: serverRPC.Settings.Tracer},
		Clock:      hlc.NewClock(hlc.UnixNano, 0),
		Stopper:    tc.Stopper(),
	}
	clientRPC := rpc.NewContext(opts)

	dial := clientRPC.GRPCDialNode(target.RPCAddr(), target.NodeID(), rpc.DefaultClass)
	clientConn, err := dial.Connect(ctx)
	require.NoError(t, err, "failed to connect to server %s", target.RPCAddr())
	return clientConn
}

// GetAdminClient gets the serverpb.AdminClient for the specified server.
func (tc *TestCluster) GetAdminClient(
ctx context.Context, t testing.TB, serverIdx int,
Expand Down

0 comments on commit 1aa1de3

Please sign in to comment.