Skip to content

Commit

Permalink
server: delete node status entries during decommissioning
Browse files Browse the repository at this point in the history
Decommissioning a node did not remove its node status entry in the
`status-node-` keyspace. This caused the decommissioned node to remain
visible via e.g. `Status/Nodes` gRPC calls, affecting other systems that
did not check against the node's liveness entry.

This commit deletes a node's status entry when it is being
decommissioned. Unfortunately, since the status entry is inline this
cannot be transactional with the liveness entry, which can cause a stale
status entry to be left behind if the operation should fail or the node
should crash. In these cases, the decommission operation (which is
idempotent) must be run again to remove it.

To avoid a race condition where the decommissioned node's status
recorder loop resurrects the status entry after it has been removed,
this also changes the recorder to only update the status entry if it
already exists. The initial status entry is written during node startup,
and the node will now fail to start if the entry cannot be written.

Release note (bug fix): remove a node's status entry when the node is
decommissioned, to prevent it from appearing in API calls and UIs, and
avoid it affecting node constraints such as localities and attributes
for various operations.
  • Loading branch information
erikgrinaker committed Dec 15, 2020
1 parent 1c7e21a commit 98d6f49
Show file tree
Hide file tree
Showing 11 changed files with 238 additions and 116 deletions.
5 changes: 5 additions & 0 deletions docs/generated/http/full.md
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,11 @@

`GET /_status/nodes`

Nodes returns status info for all commissioned nodes. Decommissioned nodes
are not included, except in rare cases where the node doing the
decommissioning crashed before completing the operation. In these cases,
the decommission operation can be rerun to clean up the status entry.

Don't introduce additional usages of this RPC. See #50707 for more details.
The underlying response type is something we're looking to get rid of.

Expand Down
6 changes: 3 additions & 3 deletions pkg/cli/interactive_tests/test_demo_node_cmds.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -85,22 +85,22 @@ eexpect "node 6 has been added with locality \"region=ca-central,zone=a\""
send "show regions from cluster;\r"
eexpect "ca-central | \{a\}"
eexpect "us-east1 | \{b,c,d\}"
eexpect "us-west1 | \{a,b\}"
eexpect "us-west1 | \{b\}"

# We use kv_node_status here because gossip_liveness is timing dependent.
# Node 4's status entry should have been removed by now.
send "select node_id, locality from crdb_internal.kv_node_status;\r"
eexpect "1 | region=us-east1,az=b"
eexpect "2 | region=us-east1,az=c"
eexpect "3 | region=us-east1,az=d"
eexpect "4 | region=us-west1,az=a"
eexpect "5 | region=us-west1,az=b"
eexpect "6 | region=ca-central,zone=a"

# Shut down the newly created node.
send "\\demo shutdown 6\r"
eexpect "node 6 has been shutdown"

# By now the node should have stabalized in gossip which allows us to query the more detailed information there.
# By now the node should have stabilized in gossip which allows us to query the more detailed information there.
send "select node_id, draining, decommissioning, membership from crdb_internal.gossip_liveness ORDER BY node_id;\r"
eexpect "1 | false | false | active"
eexpect "2 | false | false | active"
Expand Down
1 change: 1 addition & 0 deletions pkg/server/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,7 @@ go_test(
"authentication_test.go",
"config_test.go",
"connectivity_test.go",
"decommission_test.go",
"drain_test.go",
"graphite_test.go",
"intent_test.go",
Expand Down
57 changes: 57 additions & 0 deletions pkg/server/decommission_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// Copyright 2016 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package server_test

import (
"context"
"testing"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/stretchr/testify/require"
)

// TestDecommissionNodeStatus checks that a node's status entry is removed
// from the KV store once the node has been moved through DECOMMISSIONING to
// DECOMMISSIONED.
//
// This test should really be in server_test.go but can't because it uses the
// server package, which is also imported by testutils/testcluster, causing
// an import cycle.
func TestDecommissionNodeStatus(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)

	ctx := context.Background()
	clusterArgs := base.TestClusterArgs{
		ReplicationMode: base.ReplicationManual, // saves time
	}
	tc := testcluster.StartTestCluster(t, 3, clusterArgs)
	defer tc.Stopper().Stop(ctx)

	// The last node is the one being decommissioned; the first node drives
	// the operation.
	target := tc.Server(2).NodeID()
	targets := []roachpb.NodeID{target}
	decommissioner := tc.Servers[0]

	// Every node must have a status entry before we start.
	for _, srv := range tc.Servers {
		nodeID := srv.NodeID()
		statusKV, err := srv.DB().Get(ctx, keys.NodeStatusKey(nodeID))
		require.NoError(t, err)
		require.NotNil(t, statusKV.Value, "node status entry not found for node %d", nodeID)
	}

	// Walk the target through both membership transitions, in order.
	err := decommissioner.Decommission(ctx, livenesspb.MembershipStatus_DECOMMISSIONING, targets)
	require.NoError(t, err)
	err = decommissioner.Decommission(ctx, livenesspb.MembershipStatus_DECOMMISSIONED, targets)
	require.NoError(t, err)

	// After full decommissioning the target's status entry must be gone.
	remaining, err := tc.Server(0).DB().Get(ctx, keys.NodeStatusKey(target))
	require.NoError(t, err)
	require.Nil(t, remaining.Value, "found stale node status entry for node %d", target)
}
27 changes: 19 additions & 8 deletions pkg/server/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -583,7 +583,7 @@ func (n *Node) initializeAdditionalStores(

// Write a new status summary after all stores have been initialized; this
// helps the UI remain responsive when new nodes are added.
if err := n.writeNodeStatus(ctx, 0 /* alertTTL */); err != nil {
if err := n.writeNodeStatus(ctx, 0 /* alertTTL */, false /* mustExist */); err != nil {
log.Warningf(ctx, "error writing node summary after store bootstrap: %s", err)
}

Expand Down Expand Up @@ -701,11 +701,13 @@ func (n *Node) startGraphiteStatsExporter(st *cluster.Settings) {

// startWriteNodeStatus begins periodically persisting status summaries for the
// node and its stores.
func (n *Node) startWriteNodeStatus(frequency time.Duration) {
func (n *Node) startWriteNodeStatus(frequency time.Duration) error {
ctx := logtags.AddTag(n.AnnotateCtx(context.Background()), "summaries", nil)
// Immediately record summaries once on server startup.
if err := n.writeNodeStatus(ctx, 0 /* alertTTL */); err != nil {
log.Warningf(ctx, "error recording initial status summaries: %s", err)
// Immediately record summaries once on server startup. The update loop below
// will only update the key if it exists, to avoid race conditions during
// node decommissioning, so we have to error out if we can't create it.
if err := n.writeNodeStatus(ctx, 0 /* alertTTL */, false /* mustExist */); err != nil {
return errors.Wrap(err, "error recording initial status summaries")
}
n.stopper.RunWorker(ctx, func(ctx context.Context) {
// Write a status summary immediately; this helps the UI remain
Expand All @@ -719,19 +721,28 @@ func (n *Node) startWriteNodeStatus(frequency time.Duration) {
// alerts don't disappear and reappear spuriously while at the same
// time ensuring that an alert doesn't linger for too long after having
// resolved.
if err := n.writeNodeStatus(ctx, 2*frequency); err != nil {
//
// The status key must already exist, to avoid race conditions
// during decommissioning of this node. Decommissioning may be
// carried out by a different node, so this avoids resurrecting
// the status entry after the decommissioner has removed it.
// See Server.Decommission().
if err := n.writeNodeStatus(ctx, 2*frequency, true /* mustExist */); err != nil {
log.Warningf(ctx, "error recording status summaries: %s", err)
}
case <-n.stopper.ShouldStop():
return
}
}
})
return nil
}

// writeNodeStatus retrieves status summaries from the supplied
// NodeStatusRecorder and persists them to the cockroach data store.
func (n *Node) writeNodeStatus(ctx context.Context, alertTTL time.Duration) error {
// If mustExist is true, the status key must already exist and must
// not change during writing; if false, the status is always written.
func (n *Node) writeNodeStatus(ctx context.Context, alertTTL time.Duration, mustExist bool) error {
var err error
if runErr := n.stopper.RunTask(ctx, "node.Node: writing summary", func(ctx context.Context) {
nodeStatus := n.recorder.GenerateNodeStatus(ctx)
Expand Down Expand Up @@ -762,7 +773,7 @@ func (n *Node) writeNodeStatus(ctx context.Context, alertTTL time.Duration) erro
// state (since it'll be incremented every ~10s).
}

err = n.recorder.WriteNodeStatus(ctx, n.storeCfg.DB, *nodeStatus)
err = n.recorder.WriteNodeStatus(ctx, n.storeCfg.DB, *nodeStatus, mustExist)
}); runErr != nil {
err = runErr
}
Expand Down
17 changes: 16 additions & 1 deletion pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/gossip"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobsprotectedts"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
Expand Down Expand Up @@ -1620,7 +1621,9 @@ func (s *Server) PreStart(ctx context.Context) error {
})

// Begin recording status summaries.
s.node.startWriteNodeStatus(base.DefaultMetricsSampleInterval)
if err := s.node.startWriteNodeStatus(base.DefaultMetricsSampleInterval); err != nil {
return err
}

// Start the protected timestamp subsystem.
if err := s.protectedtsProvider.Start(ctx, s.stopper); err != nil {
Expand Down Expand Up @@ -2063,6 +2066,18 @@ func (s *Server) Decommission(
log.Errorf(ctx, "unable to record event: %+v: %+v", event, err)
}
}

// Similarly to the log event above, we may not be able to clean up the
// status entry if we crash or fail -- the status entry is inline, and
// thus cannot be transactional. However, since decommissioning is
// idempotent, we can attempt to remove the key regardless of whether
// the status changed, such that a stale key can be removed by
// decommissioning the node again.
if targetStatus.Decommissioned() {
if err := s.db.PutInline(ctx, keys.NodeStatusKey(nodeID), nil); err != nil {
log.Errorf(ctx, "unable to clean up node status data for node %d: %s", nodeID, err)
}
}
}
return nil
}
Expand Down
Loading

0 comments on commit 98d6f49

Please sign in to comment.