Merge #52526
52526: server: introduce join rpc for node id allocation r=irfansharif a=irfansharif

This mostly follows the ideas in #32574, and serves as a crucial
building block for #48843. Specifically, this PR introduces a new Join
RPC that new nodes can use, addressing already-initialized nodes, to
learn the cluster ID and to be allocated their node IDs. Previously,
joining nodes were responsible for allocating their own IDs and
discovered the cluster ID through gossip.
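
As a toy illustration of the flow this sets up, here is a self-contained
Go sketch; the types, fields, and version check are stand-ins, not the
actual roachpb definitions:

package main

import (
	"errors"
	"fmt"
	"sync"
)

// Hypothetical stand-ins for the request/response types; the real ones
// live in roachpb and differ in detail.
type joinRequest struct{ binaryVersion string }

type joinResponse struct {
	clusterID string
	nodeID    int
}

// joinServer models an already-initialized node serving the Join RPC.
type joinServer struct {
	mu         sync.Mutex
	clusterID  string
	minVersion string
	nextNodeID int
}

func (s *joinServer) Join(req joinRequest) (joinResponse, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	// A check like this is what would let the cluster turn away
	// mismatched-version (or decommissioned) nodes at the door.
	if req.binaryVersion < s.minVersion {
		return joinResponse{}, errors.New("incompatible binary version")
	}
	id := s.nextNodeID // the server, not the joiner, allocates the ID
	s.nextNodeID++
	return joinResponse{clusterID: s.clusterID, nodeID: id}, nil
}

func main() {
	s := &joinServer{clusterID: "d9b1…", minVersion: "v20.1", nextNodeID: 2}
	resp, err := s.Join(joinRequest{binaryVersion: "v20.2"})
	if err != nil {
		panic(err)
	}
	fmt.Printf("joined cluster %s as n%d\n", resp.clusterID, resp.nodeID)
}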

By moving towards a more understandable flow of how nodes join the
cluster, we can build a few useful primitives on top of this:
 - we can prevent mismatched version nodes from joining the cluster
 - we can prevent decommissioned nodes from joining the cluster
 - we can add the liveness record for a given node as soon as it joins,
   which would simplify our liveness record handling code that is
   perennially concerned with missing liveness records

The tiny bit of complexity in this PR comes from how we migrate into
this behavior from the old one. To that end we retain the earlier
behavior: gossip-based cluster ID discovery plus self-allocation of
node IDs. Nodes with this patch will attempt to use the Join RPC if
it's implemented on the addressed node, and fall back to the previous
behavior if not (a sketch of that fallback follows below). It wasn't
possible to use cluster versions for this migration because joining
happens very early in the node startup process, and the version gating
this change will not be active until much later in the crdb process
lifetime.
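
A minimal sketch of that fallback, assuming gRPC's Unimplemented status
code and hypothetical helper names (the real logic lives in pkg/server
and differs in detail):

package join

import (
	"context"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// initState and joiner are hypothetical; they stand in for the state a
// node needs before it can fully start up.
type initState struct {
	clusterID string
	nodeID    int
}

type joiner interface {
	Join(ctx context.Context) (initState, error)       // new-style Join RPC
	GossipJoin(ctx context.Context) (initState, error) // legacy gossip-based flow
}

// attemptJoin prefers the Join RPC and falls back to the legacy flow
// when the addressed node predates the RPC.
func attemptJoin(ctx context.Context, j joiner) (initState, error) {
	state, err := j.Join(ctx)
	if err == nil {
		return state, nil
	}
	if status.Code(err) == codes.Unimplemented {
		// The addressed node doesn't implement Join; discover the
		// cluster ID via gossip and self-allocate a node ID instead.
		return j.GossipJoin(ctx)
	}
	return initState{}, err
}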

---

There are some leftover TODOs that I'm looking to address in this PR.
They should be tiny, and easy to retrofit into what we have so far.
Specifically, I'm going to plumb the client address into the RPC so the
server is able to generate backlinks (and solve the bidirectionality
problem). I'm also going to try to add the liveness record for a
joining node as part of the Join RPC. Right now the tests verifying
connectivity/bootstrap/join flags pass out of the box, but I'm going to
try adding more randomized testing to exercise full connectivity once I
address these TODOs.

Release justification: Low risk, high benefit changes to existing functionality
Release note: None


Co-authored-by: irfan sharif <[email protected]>
craig[bot] and irfansharif committed Aug 28, 2020
2 parents 2e4df33 + e755415 commit 0175d75
Showing 33 changed files with 3,066 additions and 914 deletions.
541 changes: 541 additions & 0 deletions c-deps/libroach/protos/roachpb/api.pb.cc

Large diffs are not rendered by default.

436 changes: 435 additions & 1 deletion c-deps/libroach/protos/roachpb/api.pb.h

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions docs/generated/sql/functions.md
@@ -2381,6 +2381,8 @@ SELECT * FROM crdb_internal.check_consistency(true, '\x02', '\x04')</p>
</span></td></tr>
<tr><td><a name="crdb_internal.node_executable_version"></a><code>crdb_internal.node_executable_version() &rarr; <a href="string.html">string</a></code></td><td><span class="funcdesc"><p>Returns the version of CockroachDB this node is running.</p>
</span></td></tr>
<tr><td><a name="crdb_internal.node_id"></a><code>crdb_internal.node_id() &rarr; <a href="int.html">int</a></code></td><td><span class="funcdesc"><p>Returns the node ID.</p>
</span></td></tr>
<tr><td><a name="crdb_internal.notice"></a><code>crdb_internal.notice(msg: <a href="string.html">string</a>) &rarr; <a href="int.html">int</a></code></td><td><span class="funcdesc"><p>This function is used only by CockroachDB’s developers for testing purposes.</p>
</span></td></tr>
<tr><td><a name="crdb_internal.notice"></a><code>crdb_internal.notice(severity: <a href="string.html">string</a>, msg: <a href="string.html">string</a>) &rarr; <a href="int.html">int</a></code></td><td><span class="funcdesc"><p>This function is used only by CockroachDB’s developers for testing purposes.</p>
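The new builtin can be exercised from any SQL client; for instance, a
minimal Go program (the connection string is a placeholder for a local
insecure node):

package main

import (
	"database/sql"
	"fmt"

	_ "github.com/lib/pq" // any Postgres-compatible driver works
)

func main() {
	db, err := sql.Open("postgres",
		"postgresql://root@localhost:26257/defaultdb?sslmode=disable")
	if err != nil {
		panic(err)
	}
	defer db.Close()

	var nodeID int
	if err := db.QueryRow(`SELECT crdb_internal.node_id()`).Scan(&nodeID); err != nil {
		panic(err)
	}
	fmt.Println("connected to node", nodeID)
}
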
8 changes: 8 additions & 0 deletions pkg/ccl/kvccl/kvtenantccl/connector_test.go
@@ -39,6 +39,8 @@ var rpcRetryOpts = retry.Options{
MaxBackoff: 4 * time.Microsecond,
}

var _ roachpb.InternalServer = &mockServer{}

type mockServer struct {
rangeLookupFn func(context.Context, *roachpb.RangeLookupRequest) (*roachpb.RangeLookupResponse, error)
gossipSubFn func(*roachpb.GossipSubscriptionRequest, roachpb.Internal_GossipSubscriptionServer) error
@@ -64,6 +66,12 @@ func (*mockServer) RangeFeed(*roachpb.RangeFeedRequest, roachpb.Internal_RangeFeedServer) error {
panic("unimplemented")
}

func (m *mockServer) Join(
context.Context, *roachpb.JoinNodeRequest,
) (*roachpb.JoinNodeResponse, error) {
panic("unimplemented")
}

func gossipEventForNodeDesc(desc *roachpb.NodeDescriptor) *roachpb.GossipSubscriptionEvent {
val, err := protoutil.Marshal(desc)
if err != nil {
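The `var _ roachpb.InternalServer = &mockServer{}` line added above is
Go's standard compile-time assertion that a type satisfies an
interface; a standalone illustration with made-up names:

package main

import "fmt"

// Greeter stands in for roachpb.InternalServer.
type Greeter interface{ Greet() string }

type mock struct{}

func (mock) Greet() string { return "hi" }

// Fails to compile as soon as mock stops implementing Greeter, e.g.
// when the interface grows a method (like Join here) that the mock
// hasn't stubbed out yet.
var _ Greeter = mock{}

func main() { fmt.Println(mock{}.Greet()) }
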
1 change: 0 additions & 1 deletion pkg/cli/init.go
@@ -51,7 +51,6 @@ func runInit(cmd *cobra.Command, args []string) error {

// Actually perform cluster initialization.
c := serverpb.NewInitClient(conn)

if _, err = c.Bootstrap(ctx, &serverpb.BootstrapRequest{}); err != nil {
return err
}
8 changes: 4 additions & 4 deletions pkg/cli/start.go
@@ -455,7 +455,7 @@ func runStart(cmd *cobra.Command, args []string, disableReplication bool) error

if waitForInit {
log.Shout(ctx, log.Severity_INFO,
"initial startup completed.\n"+
"initial startup completed\n"+
"Node will now attempt to join a running cluster, or wait for `cockroach init`.\n"+
"Client connections will be accepted after this completes successfully.\n"+
"Check the log file(s) for progress. ")
@@ -587,9 +587,9 @@ If problems persist, please see %s.`
s.PeriodicallyCheckForUpdates(ctx)
}

- initialBoot := s.InitialBoot()
+ initialStart := s.InitialStart()

- if disableReplication && initialBoot {
+ if disableReplication && initialStart {
// For start-single-node, set the default replication factor to
// 1 so as to avoid warning message and unnecessary rebalance
// churn.
@@ -649,7 +649,7 @@
}
fmt.Fprintf(tw, "storage engine: \t%s\n", serverCfg.StorageEngine.String())
nodeID := s.NodeID()
- if initialBoot {
+ if initialStart {
if nodeID == server.FirstNodeID {
fmt.Fprintf(tw, "status:\tinitialized new cluster\n")
} else {
1 change: 1 addition & 0 deletions pkg/clusterversion/cockroach_versions.go
@@ -611,6 +611,7 @@ var (
// this binary. If this binary is started using a store marked with an older
// version than binaryMinSupportedVersion, then the binary will exit with
// an error.
//
// We support everything after 19.1, including pre-release 19.2 versions.
// This is generally beneficial, but in particular it allows the
// version-upgrade roachtest to use a pre-release 19.2 binary before upgrading
15 changes: 15 additions & 0 deletions pkg/cmd/roachtest/decommission.go
@@ -309,6 +309,21 @@ func runDecommissionRandomized(ctx context.Context, t *test, c *cluster) {
Multiplier: 2,
}

// This is a pretty gross hack to let the bootstrap info (cluster ID,
// liveness records) disseminate through the cluster. Since it's no longer
// happening through gossip, it takes a bit longer to happen. We should do
// two things to improve our story here:
//
// - We should opportunistically write to the liveness table when adding a
// node through the Join RPC. This would also simplify the handling of
// empty liveness records (they would no longer exist).
// - We should add roachtest helpers that wait until each node has received
// cluster ID information, and use it in all the tests that need it (which
// may very well be all the tests).
//
// TODO(irfansharif): Do the above.
time.Sleep(30 * time.Second)

// Partially decommission then recommission a random node, from another
// random node. Run a couple of status checks while doing so.
{
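A roachtest helper along the lines of the second bullet in the TODO
comment above might look like the sketch below; it is hypothetical (not
part of this diff) and assumes the roachtest package's cluster.Conn and
cluster.All helpers:

// waitForClusterID polls every node until crdb_internal.cluster_id()
// reports a non-nil UUID, or the timeout elapses.
func waitForClusterID(ctx context.Context, c *cluster, timeout time.Duration) error {
	deadline := time.Now().Add(timeout)
	for _, node := range c.All() {
		db := c.Conn(ctx, node)
		for {
			var id uuid.UUID
			err := db.QueryRow(`SELECT crdb_internal.cluster_id()`).Scan(&id)
			if err == nil && id != (uuid.UUID{}) {
				break // this node knows the cluster ID; move on
			}
			if time.Now().After(deadline) {
				db.Close()
				return fmt.Errorf("n%d: no cluster ID after %s: %v", node, timeout, err)
			}
			time.Sleep(time.Second)
		}
		db.Close()
	}
	return nil
}
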
2 changes: 1 addition & 1 deletion pkg/cmd/roachtest/mixed_version_decommission.go
@@ -262,6 +262,6 @@ func uploadVersion(nodes nodeListOption, version string) versionStep {
func startVersion(nodes nodeListOption, version string) versionStep {
return func(ctx context.Context, t *test, u *versionUpgradeTest) {
args := startArgs("--binary=" + cockroachBinaryPath(version))
- u.c.Start(ctx, t, nodes, args, startArgsDontEncrypt, roachprodArgOption{"--sequential=false"})
+ u.c.Start(ctx, t, nodes, args, startArgsDontEncrypt)
}
}
227 changes: 227 additions & 0 deletions pkg/cmd/roachtest/mixed_version_join_init.go
@@ -0,0 +1,227 @@
// Copyright 2020 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package main

import (
"context"
"fmt"
"math/rand"
"time"

"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/cockroach/pkg/util/version"
"github.com/cockroachdb/errors"
)

func registerJoinInitMixed(r *testRegistry) {
numNodes := 4
r.Add(testSpec{
Name: "join-init/mixed",
Owner: OwnerKV,
MinVersion: "v20.2.0",
Cluster: makeClusterSpec(numNodes),
Run: func(ctx context.Context, t *test, c *cluster) {
runJoinInitMixed(ctx, t, c, r.buildVersion)
},
})
}

// runJoinInitMixed tests the mechanism used to allocate node IDs and
// disseminate cluster IDs in mixed version clusters.
//
// TODO(irfansharif): This test is only really useful for the 20.1/20.2
// timeframe where we introduced the Join RPC; we should remove this test at a
// future point.
func runJoinInitMixed(ctx context.Context, t *test, c *cluster, buildVersion version.Version) {
predecessorVersion, err := PredecessorVersion(buildVersion)
if err != nil {
t.Fatal(err)
}

// An empty string means that the cockroach binary specified by flag
// `cockroach` will be used.
const mainVersion = ""

// This test starts off with a two-node cluster (node{1,2}) running at
// predecessor version. It then rolls over node2 to the current version. It
// then adds node3 to the cluster, which is randomized to be either the
// current version or the predecessor. It is also randomly configured to
// point to one of node1 or node2 (they're running different binary
// versions) in its join flags.
node1, node2, node3, node4 := 1, 2, 3, 4

nodeX := 1 + rand.Intn(2) // Either node1 or node2.
versionX := func() string { // Either current or predecessor version.
if rand.Intn(2) == 0 {
return mainVersion
}
return predecessorVersion
}()
t.l.Printf("nodeX = %d; versionX = \"%s\"", nodeX, versionX)

allNodes := c.All()
u := newVersionUpgradeTest(c,
// We upload both binaries to each node, to be able to vary the binary
// used when issuing `cockroach node` subcommands.
uploadVersion(allNodes, predecessorVersion),
uploadVersion(allNodes, mainVersion),

// Start everything at predecessor version.
startVersion(c.Range(1, 2), predecessorVersion),
waitForUpgradeStep(c.Range(1, 2)),
preventAutoUpgradeStep(node1),

checkNodeAndStoreIDs(node1, 1),
checkNodeAndStoreIDs(node2, 2),

// If we upgrade too soon, we sometimes run into "last used with
// cockroach version vX-1, is too old for running version vX+1" errors.
// Give it a generous window to persist the right version marker on
// disk.
//
// TODO(irfansharif): Figure out a better way to address this. This is
// applicable to a lot of tests. I'd naively expect `waitForUpgrade` to
// also wait for the on-disk version marker to get bumped. We might need
// to change crdb code to make that happen.
sleepStep(time.Minute),

// Roll node2 into the new version and check to see that it retains its
// node/cluster ID.
binaryUpgradeStep(c.Node(node2), mainVersion),
checkClusterIDsMatch(node1, node2),
checkNodeAndStoreIDs(node2, 2),

// Add node3 (running either predecessor version binary or current) to
// the cluster, pointing at nodeX (running either predecessor version
// binary or current).
addNodeStep(c.Node(node3), nodeX, versionX),
checkClusterIDsMatch(nodeX, node3),
checkNodeAndStoreIDs(node3, 3),

// Roll all nodes forward, and finalize upgrade.
binaryUpgradeStep(c.Range(1, 3), mainVersion),
allowAutoUpgradeStep(node1),
waitForUpgradeStep(c.Range(1, 3)),

checkNodeAndStoreIDs(node1, 1),
checkNodeAndStoreIDs(node2, 2),
checkNodeAndStoreIDs(node3, 3),

// TODO(irfansharif): We'd like to add a step like the one below, and
// will only be able to do so once 20.2 is cut. 20.1 code does not make
// use of the Join RPC to join the cluster, so this "gating mechanism"
// does not apply.
//
// Add node4 (running at predecessor version) to the cluster, pointing
// at nodeX (running new version, now with new cluster version active).
// We expect this to fail.
//
// unsuccessfullyAddNodeStep(c.Node(node4), nodeX, predecessorVersion),

// Add node4 (running at new version) to the cluster, pointing at nodeX.
// (running new version, now with new cluster version active).
addNodeStep(c.Node(node4), nodeX, mainVersion),
checkClusterIDsMatch(node1, node4),
checkNodeAndStoreIDs(node4, 4),
)

u.run(ctx, t)
}

func addNodeStep(nodes nodeListOption, joinNode int, newVersion string) versionStep {
return func(ctx context.Context, t *test, u *versionUpgradeTest) {
c := u.c
args := u.uploadVersion(ctx, t, nodes, newVersion)

for _, node := range nodes {
t.l.Printf("adding node %d to the cluster\n", node)
joinAddr := c.InternalAddr(ctx, c.Node(joinNode))[0]
c.Start(ctx, t, c.Node(node), args,
startArgs(fmt.Sprintf("-a=--join=%s", joinAddr)),
)
}
}
}

func unsuccessfullyAddNodeStep(nodes nodeListOption, joinNode int, newVersion string) versionStep {
return func(ctx context.Context, t *test, u *versionUpgradeTest) {
c := u.c
args := u.uploadVersion(ctx, t, nodes, newVersion)

for _, node := range nodes {
t.l.Printf("adding node %d to the cluster\n", node)
joinAddr := c.InternalAddr(ctx, c.Node(joinNode))[0]
err := c.StartE(ctx, c.Node(node), args,
// TODO(irfansharif): `roachprod` should be taught to skip
// adding default flags if manually specified via --args/-a.
// Today it includes both versions, which seems silly.
startArgs(fmt.Sprintf("-a=--join=%s", joinAddr)),
)
if !errors.Is(err, server.ErrIncompatibleBinaryVersion) {
t.Fatalf("expected err: %s, got %v", server.ErrIncompatibleBinaryVersion, err)
}
}
}
}

var _ = unsuccessfullyAddNodeStep

func checkClusterIDsMatch(nodeA, nodeB int) versionStep {
return func(ctx context.Context, t *test, u *versionUpgradeTest) {
var clusterIDA, clusterIDB uuid.UUID
{
db := u.conn(ctx, t, nodeA)
if err := db.QueryRow(`select crdb_internal.cluster_id();`).Scan(&clusterIDA); err != nil {
t.Fatal(err)
}
}
{
db := u.conn(ctx, t, nodeB)
if err := db.QueryRow(`select crdb_internal.cluster_id();`).Scan(&clusterIDB); err != nil {
t.Fatal(err)
}
}

if clusterIDA != clusterIDB {
t.Fatalf("expected to cluster ids %s and %s to match", clusterIDA.String(), clusterIDB.String())
}
}
}

func checkNodeAndStoreIDs(from int, exp int) versionStep {
return func(ctx context.Context, t *test, u *versionUpgradeTest) {
db := u.conn(ctx, t, from)
var nodeID, storeID int
if err := db.QueryRow(`SELECT node_id FROM crdb_internal.node_runtime_info LIMIT 1;`).Scan(&nodeID); err != nil {
t.Fatal(err)
}

if exp != nodeID {
t.Fatalf("expected to find node id %d, found %d", exp, nodeID)
}

if err := db.QueryRow(`SELECT store_id FROM crdb_internal.kv_store_status WHERE node_id = $1 LIMIT 1;`, nodeID).Scan(&storeID); err != nil {
t.Fatal(err)
}

if exp != storeID {
t.Fatalf("expected to find store id %d, found %d", exp, storeID)
}
}
}

func sleepStep(duration time.Duration) versionStep {
return func(ctx context.Context, t *test, u *versionUpgradeTest) {
t.l.Printf("sleeping for %s...", duration.String())
time.Sleep(duration)
}
}
1 change: 1 addition & 0 deletions pkg/cmd/roachtest/registry.go
@@ -49,6 +49,7 @@ func registerTests(r *testRegistry) {
registerInterleaved(r)
registerJepsen(r)
registerJobsMixedVersions(r)
registerJoinInitMixed(r)
registerKV(r)
registerKVContention(r)
registerKVQuiescenceDead(r)
6 changes: 6 additions & 0 deletions pkg/kv/kvclient/kvcoord/send_test.go
@@ -34,6 +34,8 @@ import (
"github.com/stretchr/testify/require"
)

var _ roachpb.InternalServer = Node(0)

type Node time.Duration

func (n Node) Batch(
@@ -61,6 +63,10 @@ func (n Node) GossipSubscription(
panic("unimplemented")
}

func (n Node) Join(context.Context, *roachpb.JoinNodeRequest) (*roachpb.JoinNodeResponse, error) {
panic("unimplemented")
}

// TestSendToOneClient verifies that Send correctly sends a request
// to one server using the heartbeat RPC.
func TestSendToOneClient(t *testing.T) {
6 changes: 6 additions & 0 deletions pkg/kv/kvclient/kvcoord/transport_test.go
@@ -179,3 +179,9 @@ func (m *mockInternalClient) GossipSubscription(
) (roachpb.Internal_GossipSubscriptionClient, error) {
return nil, fmt.Errorf("unsupported GossipSubscripion call")
}

func (m *mockInternalClient) Join(
context.Context, *roachpb.JoinNodeRequest, ...grpc.CallOption,
) (*roachpb.JoinNodeResponse, error) {
return nil, fmt.Errorf("unsupported Join call")
}
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica_gossip.go
@@ -287,7 +287,7 @@ func (r *Replica) maybeGossipFirstRange(ctx context.Context) *roachpb.Error {
// Gossip the cluster ID from all replicas of the first range; there
// is no expiration on the cluster ID.
if log.V(1) {
log.Infof(ctx, "gossiping cluster id %q from store %d, r%d", r.store.ClusterID(),
log.Infof(ctx, "gossiping cluster ID %q from store %d, r%d", r.store.ClusterID(),
r.store.StoreID(), r.RangeID)
}
if err := r.store.Gossip().AddClusterID(r.store.ClusterID()); err != nil {