diff --git a/pkg/cli/node.go b/pkg/cli/node.go
index a594bf5595c3..1eb6ba9dfb8b 100644
--- a/pkg/cli/node.go
+++ b/pkg/cli/node.go
@@ -466,18 +466,11 @@ func runDecommissionNodeImpl(
 		NodeIDs:          nodeIDs,
 		TargetMembership: livenesspb.MembershipStatus_DECOMMISSIONED,
 	}
-	resp, err := c.Decommission(ctx, req)
+	_, err = c.Decommission(ctx, req)
 	if err != nil {
 		fmt.Fprintln(stderr)
 		return errors.Wrap(err, "while trying to mark as decommissioned")
 	}
-	if !reflect.DeepEqual(&prevResponse, resp) {
-		fmt.Fprintln(stderr)
-		if err := printDecommissionStatus(*resp); err != nil {
-			return err
-		}
-		prevResponse = *resp
-	}
 
 	fmt.Fprintln(os.Stdout, "\nNo more data reported on target nodes. "+
 		"Please verify cluster health before removing the nodes.")
diff --git a/pkg/cmd/roachtest/acceptance.go b/pkg/cmd/roachtest/acceptance.go
index ff4be949bbbe..b78ab2b4c8d7 100644
--- a/pkg/cmd/roachtest/acceptance.go
+++ b/pkg/cmd/roachtest/acceptance.go
@@ -39,15 +39,7 @@ func registerAcceptance(r *testRegistry) {
 		{name: "build-analyze", fn: runBuildAnalyze},
 		{name: "cli/node-status", fn: runCLINodeStatus},
 		{name: "cluster-init", fn: runClusterInit},
-		{name: "decommission-self",
-			fn: runDecommissionSelf,
-			// Decommissioning self was observed to hang, though not in this test
-			// when run locally. More investigation is needed; there is a small
-			// chance that the original observation was in error. However, it
-			// seems likely that the problem exists even if it is rarely reproduced,
-			// so this test is skipped.
-			skip: "https://github.com/cockroachdb/cockroach/issues/56718",
-		},
+		{name: "decommission-self", fn: runDecommissionSelf},
 		{name: "event-log", fn: runEventLog},
 		{name: "gossip/peerings", fn: runGossipPeerings},
 		{name: "gossip/restart", fn: runGossipRestart},
diff --git a/pkg/cmd/roachtest/mixed_version_decommission.go b/pkg/cmd/roachtest/mixed_version_decommission.go
index 9ad0841a956d..aed72d9c93a9 100644
--- a/pkg/cmd/roachtest/mixed_version_decommission.go
+++ b/pkg/cmd/roachtest/mixed_version_decommission.go
@@ -87,9 +87,7 @@ func runDecommissionMixedVersions(
 		// to communicate with the cluster (i.e. most commands against it will fail).
 		// This is also why we're making sure to avoid decommissioning pinnedUpgrade
 		// itself, as we use it to check the membership after.
-		//
-		// NB: we avoid runNode == targetNode here to temporarily avoid #56718.
-		fullyDecommissionStep(h.getRandNodeOtherThan(pinnedUpgrade), pinnedUpgrade, ""),
+		fullyDecommissionStep(h.getRandNodeOtherThan(pinnedUpgrade), h.getRandNode(), ""),
 		checkOneMembership(pinnedUpgrade, "decommissioned"),
 	)
 
diff --git a/pkg/server/BUILD.bazel b/pkg/server/BUILD.bazel
index 0616a5c9bf47..d30e408d8486 100644
--- a/pkg/server/BUILD.bazel
+++ b/pkg/server/BUILD.bazel
@@ -288,6 +288,7 @@ go_test(
         "//pkg/kv/kvserver/closedts/ctpb",
         "//pkg/kv/kvserver/kvserverbase",
         "//pkg/kv/kvserver/kvserverpb",
+        "//pkg/kv/kvserver/liveness",
         "//pkg/kv/kvserver/liveness/livenesspb",
         "//pkg/roachpb",
         "//pkg/rpc",
diff --git a/pkg/server/admin.go b/pkg/server/admin.go
index e9c8cba606db..c00c482cddf4 100644
--- a/pkg/server/admin.go
+++ b/pkg/server/admin.go
@@ -1922,14 +1922,14 @@ func (s *adminServer) DecommissionStatus(
 ) (*serverpb.DecommissionStatusResponse, error) {
 	// Get the number of replicas on each node. We *may* not need all of them,
 	// but that would be more complicated than seems worth it right now.
-	ns, err := s.server.status.Nodes(ctx, &serverpb.NodesRequest{})
-	if err != nil {
-		return nil, errors.Wrap(err, "loading node statuses")
-	}
-
 	nodeIDs := req.NodeIDs
 
+	// If no nodeIDs given, use all nodes.
 	if len(nodeIDs) == 0 {
+		ns, err := s.server.status.Nodes(ctx, &serverpb.NodesRequest{})
+		if err != nil {
+			return nil, errors.Wrap(err, "loading node statuses")
+		}
 		for _, status := range ns.Nodes {
 			nodeIDs = append(nodeIDs, status.Desc.NodeID)
 		}
@@ -1991,6 +1991,9 @@ func (s *adminServer) DecommissionStatus(
 }
 
 // Decommission sets the decommission flag to the specified value on the specified node(s).
+// When the flag is set to DECOMMISSIONED, an empty response is returned on success -- this
+// ensures a node can decommission itself, since the node could otherwise lose RPC access
+// to the cluster while building the full response.
 func (s *adminServer) Decommission(
 	ctx context.Context, req *serverpb.DecommissionRequest,
 ) (*serverpb.DecommissionStatusResponse, error) {
@@ -2005,6 +2008,14 @@ func (s *adminServer) Decommission(
 	if err := s.server.Decommission(ctx, req.TargetMembership, nodeIDs); err != nil {
 		return nil, err
 	}
+
+	// We return an empty response when setting the final DECOMMISSIONED state,
+	// since a node can be asked to decommission itself which may cause it to
+	// lose access to cluster RPC and fail to populate the response.
+	if req.TargetMembership == livenesspb.MembershipStatus_DECOMMISSIONED {
+		return &serverpb.DecommissionStatusResponse{}, nil
+	}
+
 	return s.DecommissionStatus(ctx, &serverpb.DecommissionStatusRequest{NodeIDs: nodeIDs})
 }
 
diff --git a/pkg/server/admin_test.go b/pkg/server/admin_test.go
index 6b0844faf74d..a95ac3a43432 100644
--- a/pkg/server/admin_test.go
+++ b/pkg/server/admin_test.go
@@ -34,6 +34,8 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/keys"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
+	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness"
+	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/security"
 	"github.com/cockroachdb/cockroach/pkg/server/debug"
@@ -2010,3 +2012,71 @@ func TestEndpointTelemetryBasic(t *testing.T) {
 		"/cockroach.server.serverpb.Status/Statements",
 	)))
 }
+
+func TestDecommissionSelf(t *testing.T) {
+	skip.UnderStress(t) // can't handle 7-node clusters
+	skip.UnderRace(t)
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	// Set up test cluster.
+	ctx := context.Background()
+	tc := serverutils.StartNewTestCluster(t, 7, base.TestClusterArgs{
+		ReplicationMode: base.ReplicationManual, // saves time
+	})
+	defer tc.Stopper().Stop(ctx)
+
+	// Decommission several nodes, including the node we're submitting the
+	// decommission request to. We use the admin client in order to test the
+	// admin server's logic, which involves a subsequent DecommissionStatus
+	// call which could fail if used from a node that's just decommissioned
+	// itself and lost access to cluster RPC.
+	adminClient := serverpb.NewAdminClient(tc.ClientConn(ctx, t, 4))
+	decomNodeIDs := []roachpb.NodeID{
+		tc.Server(4).NodeID(),
+		tc.Server(5).NodeID(),
+		tc.Server(6).NodeID(),
+	}
+
+	// The DECOMMISSIONING call should return a full status response.
+	resp, err := adminClient.Decommission(ctx, &serverpb.DecommissionRequest{
+		NodeIDs:          decomNodeIDs,
+		TargetMembership: livenesspb.MembershipStatus_DECOMMISSIONING,
+	})
+	require.NoError(t, err)
+	require.Len(t, resp.Status, len(decomNodeIDs))
+	for i, nodeID := range decomNodeIDs {
+		status := resp.Status[i]
+		require.Equal(t, nodeID, status.NodeID)
+		// Liveness entries may not have been updated yet.
+		require.Contains(t, []livenesspb.MembershipStatus{
+			livenesspb.MembershipStatus_ACTIVE,
+			livenesspb.MembershipStatus_DECOMMISSIONING,
+		}, status.Membership, "unexpected membership status %v for node %v", status, nodeID)
+	}
+
+	// The DECOMMISSIONED call should return an empty response, to avoid
+	// erroring due to loss of cluster RPC access when decommissioning self.
+	resp, err = adminClient.Decommission(ctx, &serverpb.DecommissionRequest{
+		NodeIDs:          decomNodeIDs,
+		TargetMembership: livenesspb.MembershipStatus_DECOMMISSIONED,
+	})
+	require.NoError(t, err)
+	require.Empty(t, resp.Status)
+
+	// The nodes should now have been (or soon become) decommissioned.
+	for i := 0; i < tc.NumServers(); i++ {
+		srv := tc.Server(i)
+		expect := livenesspb.MembershipStatus_ACTIVE
+		for _, nodeID := range decomNodeIDs {
+			if srv.NodeID() == nodeID {
+				expect = livenesspb.MembershipStatus_DECOMMISSIONED
+				break
+			}
+		}
+		require.Eventually(t, func() bool {
+			liveness, ok := srv.NodeLiveness().(*liveness.NodeLiveness).GetLiveness(srv.NodeID())
+			return ok && liveness.Membership == expect
+		}, 5*time.Second, 100*time.Millisecond, "timed out waiting for node %v status %v", i, expect)
+	}
+}
diff --git a/pkg/server/connectivity_test.go b/pkg/server/connectivity_test.go
index a167ed9a56a6..ab1323613a24 100644
--- a/pkg/server/connectivity_test.go
+++ b/pkg/server/connectivity_test.go
@@ -20,6 +20,7 @@ import (
 
 	"github.com/cockroachdb/cockroach/pkg/base"
 	"github.com/cockroachdb/cockroach/pkg/clusterversion"
+	"github.com/cockroachdb/cockroach/pkg/keys"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/rpc"
@@ -351,25 +352,29 @@ func TestDecommissionedNodeCannotConnect(t *testing.T) {
 	tc := testcluster.StartTestCluster(t, numNodes, tcArgs)
 	defer tc.Stopper().Stop(ctx)
 
+	decomSrv := tc.Server(2)
 	for _, status := range []livenesspb.MembershipStatus{
 		livenesspb.MembershipStatus_DECOMMISSIONING,
 		livenesspb.MembershipStatus_DECOMMISSIONED,
 	} {
-		require.NoError(t, tc.Servers[0].Decommission(ctx, status, []roachpb.NodeID{3}))
+		require.NoError(t, tc.Servers[0].Decommission(ctx, status, []roachpb.NodeID{decomSrv.NodeID()}))
 	}
 
+	scratchKey := tc.ScratchRange(t)
+	scratchRange := tc.LookupRangeOrFatal(t, scratchKey)
+	require.Len(t, scratchRange.InternalReplicas, 1)
+	require.Equal(t, tc.Server(0).NodeID(), scratchRange.InternalReplicas[0].NodeID)
+
 	testutils.SucceedsSoon(t, func() error {
 		for _, idx := range []int{0, 1} {
 			clusterSrv := tc.Server(idx)
-			decomSrv := tc.Server(2)
 
 			// Within a short period of time, the cluster (n1, n2) will refuse to reach out to n3.
 			{
 				_, err := clusterSrv.RPCContext().GRPCDialNode(
 					decomSrv.RPCAddr(), decomSrv.NodeID(), rpc.DefaultClass,
 				).Connect(ctx)
-				cause := errors.UnwrapAll(err)
-				s, ok := grpcstatus.FromError(cause)
+				s, ok := grpcstatus.FromError(errors.UnwrapAll(err))
 				if !ok || s.Code() != codes.PermissionDenied {
 					return errors.Errorf("expected permission denied for n%d->n%d, got %v", clusterSrv.NodeID(), decomSrv.NodeID(), err)
 				}
@@ -380,8 +385,7 @@ func TestDecommissionedNodeCannotConnect(t *testing.T) {
 				_, err := decomSrv.RPCContext().GRPCDialNode(
 					clusterSrv.RPCAddr(), decomSrv.NodeID(), rpc.DefaultClass,
 				).Connect(ctx)
-				cause := errors.UnwrapAll(err)
-				s, ok := grpcstatus.FromError(cause)
+				s, ok := grpcstatus.FromError(errors.UnwrapAll(err))
 				if !ok || s.Code() != codes.PermissionDenied {
 					return errors.Errorf("expected permission denied for n%d->n%d, got %v", decomSrv.NodeID(), clusterSrv.NodeID(), err)
 				}
@@ -389,4 +393,17 @@ func TestDecommissionedNodeCannotConnect(t *testing.T) {
 		}
 		return nil
 	})
+
+	// Trying to scan the scratch range via the decommissioned node should
+	// now result in a permission denied error.
+	_, err := decomSrv.DB().Scan(ctx, scratchKey, keys.MaxKey, 1)
+	require.Error(t, err)
+
+	// TODO(erikgrinaker): until cockroachdb/errors preserves grpcstatus.Error
+	// across errors.EncodeError() we just check for any error, this should
+	// check that it matches codes.PermissionDenied later.
+	//err = errors.UnwrapAll(err)
+	//s, ok := grpcstatus.FromError(err)
+	//require.True(t, ok, "expected gRPC error, got %T (%v)", err, err)
+	//require.Equal(t, codes.PermissionDenied, s.Code(), "expected permission denied error")
 }
diff --git a/pkg/server/server.go b/pkg/server/server.go
index 4772888358b6..87fcc0f5c9a8 100644
--- a/pkg/server/server.go
+++ b/pkg/server/server.go
@@ -23,6 +23,7 @@ import (
 	"path/filepath"
 	"reflect"
 	"runtime"
+	"sort"
 	"strconv"
 	"strings"
 	"sync"
@@ -2129,6 +2130,18 @@ func (s *Server) Decommission(
 		}
 	}
 
+	// If we're asked to decommission ourself we may lose access to cluster RPC,
+	// so we decommission ourself last. We copy the slice to avoid mutating the
+	// input slice.
+	if targetStatus == livenesspb.MembershipStatus_DECOMMISSIONED {
+		orderedNodeIDs := make([]roachpb.NodeID, len(nodeIDs))
+		copy(orderedNodeIDs, nodeIDs)
+		sort.SliceStable(orderedNodeIDs, func(i, j int) bool {
+			return orderedNodeIDs[j] == s.NodeID()
+		})
+		nodeIDs = orderedNodeIDs
+	}
+
 	var event eventpb.EventPayload
 	var nodeDetails *eventpb.CommonNodeDecommissionDetails
 	if targetStatus.Decommissioning() {
diff --git a/pkg/testutils/serverutils/BUILD.bazel b/pkg/testutils/serverutils/BUILD.bazel
index ba892d2fac39..9ab2c7f4abb3 100644
--- a/pkg/testutils/serverutils/BUILD.bazel
+++ b/pkg/testutils/serverutils/BUILD.bazel
@@ -26,5 +26,6 @@ go_library(
         "//pkg/util/stop",
         "//pkg/util/uuid",
         "@com_github_cockroachdb_errors//:errors",
+        "@org_golang_google_grpc//:go_default_library",
     ],
 )
diff --git a/pkg/testutils/serverutils/test_cluster_shim.go b/pkg/testutils/serverutils/test_cluster_shim.go
index c8bc71157c69..63c200072e56 100644
--- a/pkg/testutils/serverutils/test_cluster_shim.go
+++ b/pkg/testutils/serverutils/test_cluster_shim.go
@@ -18,6 +18,7 @@
 package serverutils
 
 import (
+	"context"
 	gosql "database/sql"
 	"testing"
 
@@ -25,6 +26,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/util/hlc"
 	"github.com/cockroachdb/cockroach/pkg/util/stop"
+	"google.golang.org/grpc"
 )
 
 // TestClusterInterface defines TestCluster functionality used by tests.
@@ -43,6 +45,10 @@ type TestClusterInterface interface {
 	// ServerConn returns a gosql.DB connection to a specific node.
 	ServerConn(idx int) *gosql.DB
 
+	// ClientConn returns a grpc.ClientConn to a specific node, independent
+	// of any server's RPCContext.
+	ClientConn(ctx context.Context, t testing.TB, serverIdx int) *grpc.ClientConn
+
 	// StopServer stops a single server.
 	StopServer(idx int)
 
diff --git a/pkg/testutils/testcluster/BUILD.bazel b/pkg/testutils/testcluster/BUILD.bazel
index 7f5f0bbb74b3..59ad3f2daf41 100644
--- a/pkg/testutils/testcluster/BUILD.bazel
+++ b/pkg/testutils/testcluster/BUILD.bazel
@@ -31,6 +31,7 @@ go_library(
         "@com_github_cockroachdb_logtags//:logtags",
        "@com_github_stretchr_testify//require",
         "@io_etcd_go_etcd_raft_v3//:raft",
+        "@org_golang_google_grpc//:go_default_library",
     ],
 )
 
diff --git a/pkg/testutils/testcluster/testcluster.go b/pkg/testutils/testcluster/testcluster.go
index 5b5e551b3968..59182f119e8d 100644
--- a/pkg/testutils/testcluster/testcluster.go
+++ b/pkg/testutils/testcluster/testcluster.go
@@ -46,6 +46,7 @@ import (
 	"github.com/cockroachdb/logtags"
 	"github.com/stretchr/testify/require"
 	"go.etcd.io/etcd/raft/v3"
+	"google.golang.org/grpc"
 )
 
 // TestCluster represents a set of TestServers. The hope is that it can be used
@@ -1417,6 +1418,27 @@ func (tc *TestCluster) GetRaftLeader(t testing.TB, key roachpb.RKey) *kvserver.R
 	return raftLeaderRepl
 }
 
+// ClientConn creates a new gRPC client connection to the specified server,
+// using a new RPCContext that doesn't belong to any of the servers. This is
+// useful to test client behavior without being affected by any server state.
+func (tc *TestCluster) ClientConn(
+	ctx context.Context, t testing.TB, serverIdx int,
+) *grpc.ClientConn {
+	srv := tc.Server(serverIdx)
+	srvContext := srv.RPCContext()
+	rpcContext := rpc.NewContext(rpc.ContextOptions{
+		TenantID:   roachpb.SystemTenantID,
+		Config:     srvContext.Config,
+		Settings:   srvContext.Settings,
+		AmbientCtx: log.AmbientContext{Tracer: srvContext.Settings.Tracer},
+		Clock:      hlc.NewClock(hlc.UnixNano, 0),
+		Stopper:    tc.Stopper(),
+	})
+	conn, err := rpcContext.GRPCDialNode(srv.RPCAddr(), srv.NodeID(), rpc.DefaultClass).Connect(ctx)
+	require.NoError(t, err, "failed to connect to server %s", srv.RPCAddr())
+	return conn
+}
+
 // GetAdminClient gets the severpb.AdminClient for the specified server.
 func (tc *TestCluster) GetAdminClient(
 	ctx context.Context, t testing.TB, serverIdx int,
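
For reference, the self-last ordering used in the (*Server).Decommission hunk above can be illustrated with a small standalone Go sketch. Plain ints stand in for roachpb.NodeID here, and moveSelfLast is an illustrative helper rather than part of the patch: a stable sort whose comparator only reports "the other element is the local node" sinks the local node to the end while preserving the relative order of the remaining nodes, so the requesting node only revokes its own cluster RPC access after every other target has been marked decommissioned.

package main

import (
	"fmt"
	"sort"
)

// moveSelfLast returns a copy of nodeIDs with selfID moved to the end.
// Non-self elements compare as equal, so sort.SliceStable preserves their
// original relative order; only selfID sinks to the back.
func moveSelfLast(nodeIDs []int, selfID int) []int {
	ordered := make([]int, len(nodeIDs))
	copy(ordered, nodeIDs)
	sort.SliceStable(ordered, func(i, j int) bool {
		// Report "i before j" only when j is the local node.
		return ordered[j] == selfID
	})
	return ordered
}

func main() {
	fmt.Println(moveSelfLast([]int{5, 2, 7, 3}, 2)) // prints [5 7 3 2]
}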