Skip to content

Commit

Permalink
server: introduce join rpc for node id allocation
Browse files Browse the repository at this point in the history
This mostly follows the ideas in #32574, and serves as a crucial
building block for #48843. Specifically this PR introduces a new Join
RPC that new nodes can use, addressing already initialized nodes, to
learn about the cluster ID and its node ID. Previously joining nodes
were responsible for allocating their own IDs and used gossip to
discover the cluster ID.

By moving towards a more understandable flow of how nodes join the
cluster, we can build a few useful primitives on top of this:
 - we can prevent mismatched version nodes from joining the cluster
   (this commit)
 - we can allocate the first store ID for a given node, which is a nice
   code simplification (this commit)
 - we can prevent decommissioned nodes from joining the cluster
   (future PR)
 - we can eliminate another usage of gossip where we previously used it
   to disseminate the cluster ID. In the 21.1 cycle we can defer gossip
   start until much later in the server start lifecycle (future PR)
 - we can add the liveness record for a given node as soon as it joins,
   which would simplify our liveness record handling code that is
   perennially concerned with missing liveness records (future PR)

The tiny bit of complexity in this PR comes from how we're able to
migrate into this behavior from the old. To that end we retain the
earlier behavior: gossip-based cluster ID discovery, with each node
allocating its own node ID. Nodes with this patch will attempt to use this join RPC, if
implemented on the addressed node, and fall back to using the previous
behavior if not.

Release justification: low risk, high benefit changes to existing functionality
Release note: None
  • Loading branch information
irfansharif committed Aug 28, 2020
1 parent 9a4feec commit a3284b7
Show file tree
Hide file tree
Showing 22 changed files with 2,743 additions and 830 deletions.
541 changes: 541 additions & 0 deletions c-deps/libroach/protos/roachpb/api.pb.cc

Large diffs are not rendered by default.

436 changes: 435 additions & 1 deletion c-deps/libroach/protos/roachpb/api.pb.h

Large diffs are not rendered by default.

8 changes: 8 additions & 0 deletions pkg/ccl/kvccl/kvtenantccl/connector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ var rpcRetryOpts = retry.Options{
MaxBackoff: 4 * time.Microsecond,
}

// Compile-time assertion that *mockServer implements roachpb.InternalServer.
var _ roachpb.InternalServer = &mockServer{}

type mockServer struct {
rangeLookupFn func(context.Context, *roachpb.RangeLookupRequest) (*roachpb.RangeLookupResponse, error)
gossipSubFn func(*roachpb.GossipSubscriptionRequest, roachpb.Internal_GossipSubscriptionServer) error
Expand All @@ -64,6 +66,12 @@ func (*mockServer) RangeFeed(*roachpb.RangeFeedRequest, roachpb.Internal_RangeFe
panic("unimplemented")
}

// Join is part of the roachpb.InternalServer interface. The mock does not
// support it and panics when called.
func (*mockServer) Join(
	_ context.Context, _ *roachpb.JoinNodeRequest,
) (*roachpb.JoinNodeResponse, error) {
	panic("unimplemented")
}

func gossipEventForNodeDesc(desc *roachpb.NodeDescriptor) *roachpb.GossipSubscriptionEvent {
val, err := protoutil.Marshal(desc)
if err != nil {
Expand Down
1 change: 0 additions & 1 deletion pkg/cli/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ func runInit(cmd *cobra.Command, args []string) error {

// Actually perform cluster initialization.
c := serverpb.NewInitClient(conn)

if _, err = c.Bootstrap(ctx, &serverpb.BootstrapRequest{}); err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/cli/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -455,7 +455,7 @@ func runStart(cmd *cobra.Command, args []string, disableReplication bool) error

if waitForInit {
log.Shout(ctx, log.Severity_INFO,
"initial startup completed.\n"+
"initial startup completed\n"+
"Node will now attempt to join a running cluster, or wait for `cockroach init`.\n"+
"Client connections will be accepted after this completes successfully.\n"+
"Check the log file(s) for progress. ")
Expand Down
1 change: 1 addition & 0 deletions pkg/clusterversion/cockroach_versions.go
Original file line number Diff line number Diff line change
Expand Up @@ -611,6 +611,7 @@ var (
// this binary. If this binary is started using a store marked with an older
// version than binaryMinSupportedVersion, then the binary will exit with
// an error.
//
// We support everything after 19.1, including pre-release 19.2 versions.
// This is generally beneficial, but in particular it allows the
// version-upgrade roachtest to use a pre-release 19.2 binary before upgrading
Expand Down
15 changes: 15 additions & 0 deletions pkg/cmd/roachtest/decommission.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,21 @@ func runDecommissionRandomized(ctx context.Context, t *test, c *cluster) {
Multiplier: 2,
}

// This is a pretty gross hack to let the bootstrap info (cluster ID,
// liveness records) disseminate through the cluster. Since it's no longer
// happening through gossip, it takes a bit longer to happen. We should do
// two things to improve our story here:
//
// - We should opportunistically write to the liveness table when adding a
// node through the Join RPC. This would also simplify the handling of
// empty liveness records (they would no longer exist).
// - We should add roachtest helpers that wait until each node has received
// cluster ID information, and use it in all the tests that need it (which
// may very well be all the tests).
//
// TODO(irfansharif): Do the above.
time.Sleep(30 * time.Second)

// Partially decommission then recommission a random node, from another
// random node. Run a couple of status checks while doing so.
{
Expand Down
6 changes: 6 additions & 0 deletions pkg/kv/kvclient/kvcoord/send_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ import (
"github.com/stretchr/testify/require"
)

// Compile-time assertion that Node implements roachpb.InternalServer.
var _ roachpb.InternalServer = Node(0)

// Node is a test implementation of roachpb.InternalServer whose underlying
// type is time.Duration.
// NOTE(review): how the duration value is used is not visible in this hunk —
// presumably by Batch; confirm against the full file.
type Node time.Duration

func (n Node) Batch(
Expand Down Expand Up @@ -61,6 +63,10 @@ func (n Node) GossipSubscription(
panic("unimplemented")
}

// Join is part of the roachpb.InternalServer interface. It is not supported
// by this test node and panics when called.
func (n Node) Join(
	_ context.Context, _ *roachpb.JoinNodeRequest,
) (*roachpb.JoinNodeResponse, error) {
	panic("unimplemented")
}

// TestSendToOneClient verifies that Send correctly sends a request
// to one server using the heartbeat RPC.
func TestSendToOneClient(t *testing.T) {
Expand Down
6 changes: 6 additions & 0 deletions pkg/kv/kvclient/kvcoord/transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,3 +179,9 @@ func (m *mockInternalClient) GossipSubscription(
) (roachpb.Internal_GossipSubscriptionClient, error) {
return nil, fmt.Errorf("unsupported GossipSubscripion call")
}

// Join is not supported by the mock client: every call returns a nil
// response together with an "unsupported Join call" error.
func (m *mockInternalClient) Join(
	_ context.Context, _ *roachpb.JoinNodeRequest, _ ...grpc.CallOption,
) (*roachpb.JoinNodeResponse, error) {
	err := fmt.Errorf("unsupported Join call")
	return nil, err
}
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica_gossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ func (r *Replica) maybeGossipFirstRange(ctx context.Context) *roachpb.Error {
// Gossip the cluster ID from all replicas of the first range; there
// is no expiration on the cluster ID.
if log.V(1) {
log.Infof(ctx, "gossiping cluster id %q from store %d, r%d", r.store.ClusterID(),
log.Infof(ctx, "gossiping cluster ID %q from store %d, r%d", r.store.ClusterID(),
r.store.StoreID(), r.RangeID)
}
if err := r.store.Gossip().AddClusterID(r.store.ClusterID()); err != nil {
Expand Down
Loading

0 comments on commit a3284b7

Please sign in to comment.