Skip to content

Commit

Permalink
[wip] server,roachpb: move join rpc into InternalServer
Browse files Browse the repository at this point in the history
And address remaining PR comments (this commit will be squashed up).

Release note: None
  • Loading branch information
irfansharif committed Aug 20, 2020
1 parent 9df9d0a commit 22365db
Show file tree
Hide file tree
Showing 18 changed files with 2,127 additions and 1,310 deletions.
458 changes: 458 additions & 0 deletions c-deps/libroach/protos/roachpb/api.pb.cc

Large diffs are not rendered by default.

356 changes: 355 additions & 1 deletion c-deps/libroach/protos/roachpb/api.pb.h

Large diffs are not rendered by default.

8 changes: 8 additions & 0 deletions pkg/ccl/kvccl/kvtenantccl/connector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ var rpcRetryOpts = retry.Options{
MaxBackoff: 4 * time.Microsecond,
}

var _ roachpb.InternalServer = &mockServer{}

type mockServer struct {
rangeLookupFn func(context.Context, *roachpb.RangeLookupRequest) (*roachpb.RangeLookupResponse, error)
gossipSubFn func(*roachpb.GossipSubscriptionRequest, roachpb.Internal_GossipSubscriptionServer) error
Expand All @@ -64,6 +66,12 @@ func (*mockServer) RangeFeed(*roachpb.RangeFeedRequest, roachpb.Internal_RangeFe
panic("unimplemented")
}

func (m *mockServer) Join(
context.Context, *roachpb.JoinNodeRequest,
) (*roachpb.JoinNodeResponse, error) {
panic("unimplemented")
}

func gossipEventForNodeDesc(desc *roachpb.NodeDescriptor) *roachpb.GossipSubscriptionEvent {
val, err := protoutil.Marshal(desc)
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions pkg/clusterversion/cockroach_versions.go
Original file line number Diff line number Diff line change
Expand Up @@ -589,6 +589,7 @@ var (
// this binary. If this binary is started using a store marked with an older
// version than binaryMinSupportedVersion, then the binary will exit with
// an error.
//
// We support everything after 19.1, including pre-release 19.2 versions.
// This is generally beneficial, but in particular it allows the
// version-upgrade roachtest to use a pre-release 19.2 binary before upgrading
Expand Down
6 changes: 6 additions & 0 deletions pkg/kv/kvclient/kvcoord/send_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ import (
"github.com/stretchr/testify/require"
)

var _ roachpb.InternalServer = Node(0)

type Node time.Duration

func (n Node) Batch(
Expand Down Expand Up @@ -61,6 +63,10 @@ func (n Node) GossipSubscription(
panic("unimplemented")
}

func (n Node) Join(context.Context, *roachpb.JoinNodeRequest) (*roachpb.JoinNodeResponse, error) {
panic("unimplemented")
}

// TestSendToOneClient verifies that Send correctly sends a request
// to one server using the heartbeat RPC.
func TestSendToOneClient(t *testing.T) {
Expand Down
6 changes: 6 additions & 0 deletions pkg/kv/kvclient/kvcoord/transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,3 +179,9 @@ func (m *mockInternalClient) GossipSubscription(
) (roachpb.Internal_GossipSubscriptionClient, error) {
return nil, fmt.Errorf("unsupported GossipSubscripion call")
}

func (m *mockInternalClient) Join(
context.Context, *roachpb.JoinNodeRequest, ...grpc.CallOption,
) (*roachpb.JoinNodeResponse, error) {
return nil, fmt.Errorf("unsupported Join call")
}
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica_gossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ func (r *Replica) maybeGossipFirstRange(ctx context.Context) *roachpb.Error {
// Gossip the cluster ID from all replicas of the first range; there
// is no expiration on the cluster ID.
if log.V(1) {
log.Infof(ctx, "gossiping cluster id %q from store %d, r%d", r.store.ClusterID(),
log.Infof(ctx, "gossiping cluster ID %q from store %d, r%d", r.store.ClusterID(),
r.store.StoreID(), r.RangeID)
}
if err := r.store.Gossip().AddClusterID(r.store.ClusterID()); err != nil {
Expand Down
1,643 changes: 1,019 additions & 624 deletions pkg/roachpb/api.pb.go

Large diffs are not rendered by default.

27 changes: 27 additions & 0 deletions pkg/roachpb/api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -2196,10 +2196,37 @@ message GossipSubscriptionEvent {
Error error = 4;
}

// JoinNodeRequest is used to specify to the server node what the client's
// MinimumSupportedVersion is. If it's not compatible with the rest of the
// cluster, the join attempt is refused.
message JoinNodeRequest {
// TODO(irfansharif): Actually make use of this min supported version.
roachpb.Version min_supported_version = 1;
}

// JoinNodeResponse informs the joining node what the cluster id is and what
// node id was allocated to it.
//
// TODO(irfansharif): Think about the semantics for how store IDs get allocated.
// When letting a node join, we could return back at most (least?) one store ID,
// and entrust the node itself to allocate IDs for the remaining stores.
//
// TODO(irfansharif): We should use this RPC to tell us the right cluster
// version to use (instead of using the minimum possible version and relying on
// gossip to bump to for us).
message JoinNodeResponse {
bytes cluster_id = 1 [(gogoproto.customname) = "ClusterID"];
int32 node_id = 2 [(gogoproto.customname) = "NodeID"];
}

// Batch and RangeFeed service implemeted by nodes for KV API requests.
service Internal {
rpc Batch (BatchRequest) returns (BatchResponse) {}
rpc RangeLookup (RangeLookupRequest) returns (RangeLookupResponse) {}
rpc RangeFeed (RangeFeedRequest) returns (stream RangeFeedEvent) {}
rpc GossipSubscription (GossipSubscriptionRequest) returns (stream GossipSubscriptionEvent) {}

// Join a bootstrapped cluster. If the target node is itself not part of a
// bootstrapped cluster, an appropriate error is returned.
rpc Join(JoinNodeRequest) returns (JoinNodeResponse) { }
}
7 changes: 7 additions & 0 deletions pkg/rpc/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -504,6 +504,13 @@ func (a internalClientAdapter) RangeLookup(
return a.InternalServer.RangeLookup(ctx, rl)
}

// Join implements the roachpb.InternalClient interface.
func (a internalClientAdapter) Join(
ctx context.Context, req *roachpb.JoinNodeRequest, _ ...grpc.CallOption,
) (*roachpb.JoinNodeResponse, error) {
return a.InternalServer.Join(ctx, req)
}

type respStreamClientAdapter struct {
ctx context.Context
respC chan interface{}
Expand Down
8 changes: 8 additions & 0 deletions pkg/rpc/context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,8 @@ func TestHeartbeatCB(t *testing.T) {
})
}

var _ roachpb.InternalServer = &internalServer{}

type internalServer struct{}

func (*internalServer) Batch(
Expand All @@ -188,6 +190,12 @@ func (*internalServer) GossipSubscription(
panic("unimplemented")
}

func (*internalServer) Join(
context.Context, *roachpb.JoinNodeRequest,
) (*roachpb.JoinNodeResponse, error) {
panic("unimplemented")
}

// TestInternalServerAddress verifies that RPCContext uses AdvertiseAddr, not Addr, to
// determine whether to apply the local server optimization.
//
Expand Down
1 change: 0 additions & 1 deletion pkg/server/grpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ var rpcsAllowedWhileBootstrapping = map[string]struct{}{
"/cockroach.rpc.Heartbeat/Ping": {},
"/cockroach.gossip.Gossip/Gossip": {},
"/cockroach.server.serverpb.Init/Bootstrap": {},
"/cockroach.server.serverpb.Init/Join": {},
"/cockroach.server.serverpb.Admin/Health": {},
}

Expand Down
Loading

0 comments on commit 22365db

Please sign in to comment.