Skip to content

Commit

Permalink
Merge #56529
Browse files Browse the repository at this point in the history
56529: server: delete node status entries during decommissioning r=nvanbenschoten a=erikgrinaker

Decommissioning a node did not remove its node status entry in the
`status-node-` keyspace. This caused the decommissioned node to remain
visible via e.g. `Status/Nodes` gRPC calls, affecting other systems that
did not check against the node's liveness entry.

This commit deletes a node's status entry when it is being
decommissioned. Unfortunately, since the status entry is inline this
cannot be transactional with the liveness entry, which can cause a stale
status entry to be left behind if the operation should fail or the node
should crash. In these cases, the decommission operation (which is
idempotent) must be run again to remove it.

To avoid a race condition where the decommissioned node's status
recorder loop resurrects the status entry after it has been removed,
this also changes the recorder to only update the status entry if it
already exists. The initial status entry is written during node startup,
and the node will now fail to start if the entry cannot be written.

Release note (bug fix): remove a node's status entry when the node is
decommissioned, to prevent it from appearing in API calls and UIs, and
avoid it affecting node constraints such as localities and attributes
for various operations.

Resolves #51304.

This also adds support for conditional puts of inline values, as a separate commit -- see commit message for details.

Co-authored-by: Erik Grinaker <[email protected]>
  • Loading branch information
craig[bot] and erikgrinaker committed Dec 17, 2020
2 parents 740d934 + 090eef9 commit 1abab27
Show file tree
Hide file tree
Showing 23 changed files with 1,157 additions and 727 deletions.
5 changes: 5 additions & 0 deletions docs/generated/http/full.md
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,11 @@

`GET /_status/nodes`

Nodes returns status info for all commissioned nodes. Decommissioned nodes
are not included, except in rare cases where the node doing the
decommissioning crashed before completing the operation. In these cases,
the decommission operation can be rerun to clean up the status entry.

Don't introduce additional usages of this RPC. See #50707 for more details.
The underlying response type is something we're looking to get rid of.

Expand Down
2 changes: 1 addition & 1 deletion docs/generated/settings/settings.html
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,6 @@
<tr><td><code>trace.debug.enable</code></td><td>boolean</td><td><code>false</code></td><td>if set, traces for recent requests can be seen at https://<ui>/debug/requests</td></tr>
<tr><td><code>trace.lightstep.token</code></td><td>string</td><td><code></code></td><td>if set, traces go to Lightstep using this token</td></tr>
<tr><td><code>trace.zipkin.collector</code></td><td>string</td><td><code></code></td><td>if set, traces go to the given Zipkin instance (example: '127.0.0.1:9411'); ignored if trace.lightstep.token is set</td></tr>
<tr><td><code>version</code></td><td>version</td><td><code>20.2-8</code></td><td>set the active cluster version in the format '<major>.<minor>'</td></tr>
<tr><td><code>version</code></td><td>version</td><td><code>20.2-10</code></td><td>set the active cluster version in the format '<major>.<minor>'</td></tr>
</tbody>
</table>
6 changes: 3 additions & 3 deletions pkg/cli/interactive_tests/test_demo_node_cmds.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -85,22 +85,22 @@ eexpect "node 6 has been added with locality \"region=ca-central,zone=a\""
send "show regions from cluster;\r"
eexpect "ca-central | \{a\}"
eexpect "us-east1 | \{b,c,d\}"
eexpect "us-west1 | \{a,b\}"
eexpect "us-west1 | \{b\}"

# We use kv_node_status here because gossip_liveness is timing dependent.
# Node 4's status entry should have been removed by now.
send "select node_id, locality from crdb_internal.kv_node_status;\r"
eexpect "1 | region=us-east1,az=b"
eexpect "2 | region=us-east1,az=c"
eexpect "3 | region=us-east1,az=d"
eexpect "4 | region=us-west1,az=a"
eexpect "5 | region=us-west1,az=b"
eexpect "6 | region=ca-central,zone=a"

# Shut down the newly created node.
send "\\demo shutdown 6\r"
eexpect "node 6 has been shutdown"

# By now the node should have stabalized in gossip which allows us to query the more detailed information there.
# By now the node should have stabilized in gossip which allows us to query the more detailed information there.
send "select node_id, draining, decommissioning, membership from crdb_internal.gossip_liveness ORDER BY node_id;\r"
eexpect "1 | false | false | active"
eexpect "2 | false | false | active"
Expand Down
6 changes: 6 additions & 0 deletions pkg/clusterversion/cockroach_versions.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,8 @@ const (
UniqueWithoutIndexConstraints
// VirtualComputedColumns is when virtual computed columns are supported.
VirtualComputedColumns
// CPutInline is conditional put support for inline values.
CPutInline

// Step (1): Add new versions here.
)
Expand Down Expand Up @@ -334,6 +336,10 @@ var versionsSingleton = keyedVersions([]keyedVersion{
Key: VirtualComputedColumns,
Version: roachpb.Version{Major: 20, Minor: 2, Internal: 8},
},
{
Key: CPutInline,
Version: roachpb.Version{Major: 20, Minor: 2, Internal: 10},
},

// Step (2): Add new versions here.
})
Expand Down
5 changes: 3 additions & 2 deletions pkg/clusterversion/key_string.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pkg/kv/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//pkg/base",
"//pkg/clusterversion",
"//pkg/keys",
"//pkg/kv/kvbase",
"//pkg/roachpb",
Expand Down
40 changes: 36 additions & 4 deletions pkg/kv/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ package kv
import (
"context"

"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
Expand Down Expand Up @@ -421,17 +422,44 @@ func (b *Batch) PutInline(key, value interface{}) {
// expValue needs to correspond to a Value.TagAndDataBytes() - i.e. a key's
// value without the checksum (as the checksum includes the key too).
func (b *Batch) CPut(key, value interface{}, expValue []byte) {
b.cputInternal(key, value, expValue, false)
b.cputInternal(key, value, expValue, false, false)
}

// CPutAllowingIfNotExists is like CPut except it also allows the Put when the
// existing entry does not exist -- i.e. it succeeds if there is no existing
// entry or the existing entry has the expected value.
func (b *Batch) CPutAllowingIfNotExists(key, value interface{}, expValue []byte) {
b.cputInternal(key, value, expValue, true)
b.cputInternal(key, value, expValue, true, false)
}

func (b *Batch) cputInternal(key, value interface{}, expValue []byte, allowNotExist bool) {
// cPutInline queues an inline conditional put on the batch: the key is set to
// value only if its current value equals expValue. Inline values are not
// multi-versioned, so the most recent value is always overwritten, and they
// cannot be mutated transactionally -- use with caution.
//
// An empty expValue means the key is expected to not exist. A nil value
// deletes the key (there is no DelInline()), which differs from CPut().
//
// key can be either a byte slice or a string. value can be any key type, a
// protoutil.Message or any Go primitive type (bool, int, etc).
//
// A new result will be appended to the batch which will contain a single row
// and Result.Err will indicate success or failure.
//
// Callers should check the version gate clusterversion.CPutInline to make sure
// this is supported. The method stays unexported so that external callers
// cannot bypass that check, since the CtxForCPutInline guard can't be used
// with Batch.
func (b *Batch) cPutInline(key, value interface{}, expValue []byte) {
	// TODO(erikgrinaker): export once clusterversion.CPutInline is removed.
	_ = clusterversion.CPutInline

	// Name the flags passed to cputInternal so the call site is readable.
	const (
		allowNotExist = false
		inline        = true
	)
	b.cputInternal(key, value, expValue, allowNotExist, inline)
}

func (b *Batch) cputInternal(
key, value interface{}, expValue []byte, allowNotExist bool, inline bool,
) {
k, err := marshalKey(key)
if err != nil {
b.initResult(0, 1, notRaw, err)
Expand All @@ -442,7 +470,11 @@ func (b *Batch) cputInternal(key, value interface{}, expValue []byte, allowNotEx
b.initResult(0, 1, notRaw, err)
return
}
b.appendReqs(roachpb.NewConditionalPut(k, v, expValue, allowNotExist))
if inline {
b.appendReqs(roachpb.NewConditionalPutInline(k, v, expValue, allowNotExist))
} else {
b.appendReqs(roachpb.NewConditionalPut(k, v, expValue, allowNotExist))
}
b.initResult(1, 1, notRaw, nil)
}

Expand Down
65 changes: 65 additions & 0 deletions pkg/kv/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,71 @@ func TestClientPutInline(t *testing.T) {
}
}

// TestClientCPutInline exercises DB.CPutInline against a test server: failed
// and successful conditions, updates, and deletion via a nil value. It also
// verifies that values written inline carry an empty timestamp.
func TestClientCPutInline(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)

	srv, _, _ := serverutils.StartServer(t, base.TestServerArgs{})
	defer srv.Stopper().Stop(context.Background())

	client := createTestClient(t, srv)
	ctx := kv.CtxForCPutInline(context.Background())
	key := testUser + "/key"
	initial, updated := []byte("value"), []byte("new")

	// A non-existent key must not match a non-empty expected value.
	if err := client.CPutInline(ctx, key, initial, []byte("foo")); err == nil {
		t.Fatalf("expected error, got nil")
	}

	// Expecting nil on a non-existent key succeeds and writes the value.
	if err := client.CPutInline(ctx, key, initial, nil); err != nil {
		t.Fatalf("unable to set value: %s", err)
	}
	got, err := client.Get(ctx, key)
	if err != nil {
		t.Fatalf("unable to get value: %s", err)
	}
	if stored := got.ValueBytes(); !bytes.Equal(initial, stored) {
		t.Errorf("expected values equal; %s != %s", initial, stored)
	}
	if !got.Value.Timestamp.IsEmpty() {
		t.Fatalf("expected zero timestamp; got %s", got.Value.Timestamp)
	}

	// The key now exists, so expecting nil must fail.
	if err := client.CPutInline(ctx, key, updated, nil); err == nil {
		t.Fatalf("expected error, got nil")
	}

	// A mismatching expected value must fail as well.
	if err := client.CPutInline(ctx, key, updated, []byte("foo")); err == nil {
		t.Fatalf("expected error, got nil")
	}

	// The correct expected value allows the update.
	if err := client.CPutInline(ctx, key, updated, got.Value.TagAndDataBytes()); err != nil {
		t.Fatalf("unable to update value: %s", err)
	}
	got, err = client.Get(ctx, key)
	switch {
	case err != nil:
		t.Fatalf("unable to get value: %s", err)
	case !bytes.Equal(updated, got.ValueBytes()):
		t.Errorf("expected values equal; %s != %s", updated, got.ValueBytes())
	case !got.Value.Timestamp.IsEmpty():
		t.Fatalf("expected zero timestamp; got %s", got.Value.Timestamp)
	}

	// A nil value with the correct expected value deletes the key.
	if err := client.CPutInline(ctx, key, nil, got.Value.TagAndDataBytes()); err != nil {
		t.Fatalf("unable to delete value: %s", err)
	}
	got, err = client.Get(ctx, key)
	switch {
	case err != nil:
		t.Fatalf("unable to get value: %s", err)
	case got.Value != nil:
		t.Fatalf("expected deleted value; got %s", got.ValueBytes())
	}
}

// TestClientEmptyValues verifies that empty values are preserved
// for both empty []byte and integer=0. This used to fail when we
// allowed the protobufs to be gob-encoded using the default go rpc
Expand Down
42 changes: 42 additions & 0 deletions pkg/kv/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"strings"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
Expand Down Expand Up @@ -388,6 +389,47 @@ func (db *DB) CPut(ctx context.Context, key, value interface{}, expValue []byte)
return getOneErr(db.Run(ctx, b), b)
}

// CtxForCPutInline returns a context derived from ctx that is marked as
// permitted to call DB.CPutInline. It exists as a gate: wrapping the context
// is an explicit acknowledgement that CPutInline is only available with
// clusterversion.CPutInline, which the caller must check before using the
// method.
func CtxForCPutInline(ctx context.Context) context.Context {
	// TODO(erikgrinaker): This code and all of its uses can be removed when the
	// version below is removed:
	_ = clusterversion.CPutInline

	// The marker type serves as both the context key and its (non-nil) value.
	var marker canUseCPutInline
	return context.WithValue(ctx, marker, marker)
}

// canUseCPutInline is the context key (and marker value) that CtxForCPutInline
// attaches to a context and that DB.CPutInline looks up before proceeding.
type canUseCPutInline struct{}

// CPutInline writes value to key as an inline value, but only if the key's
// current value equals expValue; otherwise an error is returned. Inline values
// are not multi-versioned, so the most recent value is always overwritten, and
// they cannot be mutated transactionally -- use with caution.
//
// key can be either a byte slice or a string. value can be any key type, a
// protoutil.Message or any Go primitive type (bool, int, etc). A nil value
// means delete the key.
//
// An empty expValue means that the key is expected to not exist. If not empty,
// expValue needs to correspond to a Value.TagAndDataBytes() - i.e. a key's
// value without the checksum (as the checksum includes the key too).
//
// Callers should check the version gate clusterversion.CPutInline to make sure
// this is supported, and must wrap the context using CtxForCPutInline(ctx) to
// enable the call; an unwrapped context results in an error.
func (db *DB) CPutInline(ctx context.Context, key, value interface{}, expValue []byte) error {
	// Refuse to run unless the caller opted in via CtxForCPutInline.
	if enabled := ctx.Value(canUseCPutInline{}); enabled == nil {
		return errors.New("CPutInline is new in 21.1, you must check the CPutInline cluster version " +
			"and use CtxForCPutInline to enable it")
	}

	var b Batch
	b.cPutInline(key, value, expValue)
	return getOneErr(db.Run(ctx, &b), &b)
}

// InitPut sets the first value for a key to value. A ConditionFailedError is
// reported if a value already exists for the key and it's not equal to the
// value passed in. If failOnTombstones is set to true, tombstones count as
Expand Down
58 changes: 58 additions & 0 deletions pkg/kv/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,64 @@ func TestDB_CPut(t *testing.T) {
checkResult(t, []byte("4"), result.ValueBytes())
}

// TestDB_CPutInline verifies DB.CPutInline semantics: a matching expected
// value allows the write, a mismatching one (or one given for a missing key)
// is rejected and leaves the stored value untouched, a nil expected value
// creates a missing key, and a nil value deletes the key.
func TestDB_CPutInline(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)
	srv, db := setup(t)
	defer srv.Stopper().Stop(context.Background())
	ctx := kv.CtxForCPutInline(context.Background())

	// expectStored reads key and compares its bytes against want.
	expectStored := func(key string, want []byte) {
		t.Helper()
		res, err := db.Get(ctx, key)
		if err != nil {
			t.Fatal(err)
		}
		checkResult(t, want, res.ValueBytes())
	}

	// Seed "aa" with an inline value, then update it with the right condition.
	if err := db.PutInline(ctx, "aa", "1"); err != nil {
		t.Fatal(err)
	}
	if err := db.CPutInline(ctx, "aa", "2", kvclientutils.StrToCPutExistingValue("1")); err != nil {
		t.Fatal(err)
	}
	expectStored("aa", []byte("2"))

	// A stale expected value is rejected and the stored value is unchanged.
	if err := db.CPutInline(ctx, "aa", "3", kvclientutils.StrToCPutExistingValue("1")); err == nil {
		t.Fatal("expected error from conditional put")
	}
	expectStored("aa", []byte("2"))

	// An expected value for a key that does not exist is rejected too.
	if err := db.CPutInline(ctx, "bb", "4", kvclientutils.StrToCPutExistingValue("1")); err == nil {
		t.Fatal("expected error from conditional put")
	}
	expectStored("bb", []byte(""))

	// A nil expected value creates the missing key.
	if err := db.CPutInline(ctx, "bb", "4", nil); err != nil {
		t.Fatal(err)
	}
	expectStored("bb", []byte("4"))

	// A nil value with the matching expected value deletes the key.
	if err := db.CPutInline(ctx, "aa", nil, kvclientutils.StrToCPutExistingValue("2")); err != nil {
		t.Fatal(err)
	}
	deleted, err := db.Get(ctx, "aa")
	if err != nil {
		t.Fatal(err)
	}
	if deleted.Value != nil {
		t.Fatalf("expected deleted value, got %x", deleted.ValueBytes())
	}
}

func TestDB_InitPut(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand Down
Loading

0 comments on commit 1abab27

Please sign in to comment.