Skip to content

Commit

Permalink
rpc,kvclient: cull the internalClientAdapter
Browse files Browse the repository at this point in the history
rpc.internalClientAdapter is used to call into the roachpb.Internal gRPC
service of the local Node without actually going through gRPC. This
structure was implementing all of the ever-growing Internal interface,
even though it was only ever used for exactly two methods - the ones
used by the DistSender (Batch and RangeFeed). This patch extracts the
sub-interface of Internal needed by the DistSender, and trims the
internalClientAdapter to only implement the respective two methods.

There are multiple benefits in doing this trimming:
1) The internalClientAdapter had too much dead code. In particular for
streaming methods, the implementation is non-trivial. The respective
code is mostly copy-paste, but still.
2) The code in the internalClientAdapter, as written, was
insufficient(*) as it was not setting a magic auth key on the ctx
required in order to pass the Authentication gRPC server interceptor. In
the case of the two methods used by the DistSender, our
nodedialer.Dialer does an awkward thing where it create a ctx with the
magic key and returns that ctx over several levels to the grpcTransport,
which use it for the upcoming gRPC call. This patch removes this awkward
protocol by having the internalClientAdaptor populate the auth key in
the ctx.

(*) Full disclosure - the respective magic key is not actually needed
since we don't currently run the gRPC interceptors for these local RPCs.
But I'm trying to change that.

Release note: None
  • Loading branch information
andreimatei committed Jun 7, 2022
1 parent ba4d29e commit 0dade40
Show file tree
Hide file tree
Showing 10 changed files with 84 additions and 195 deletions.
4 changes: 2 additions & 2 deletions pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -466,14 +466,14 @@ func (ds *DistSender) singleRangeFeed(
}

args.Replica = transport.NextReplica()
clientCtx, client, err := transport.NextInternalClient(ctx)
client, err := transport.NextInternalClient(ctx)
if err != nil {
log.VErrEventf(ctx, 2, "RPC error: %s", err)
continue
}

log.VEventf(ctx, 3, "attempting to create a RangeFeed over replica %s", args.Replica)
stream, err := client.RangeFeed(clientCtx, &args)
stream, err := client.RangeFeed(ctx, &args)
if err != nil {
log.VErrEventf(ctx, 2, "RPC error: %s", err)
if grpcutil.IsAuthError(err) {
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func TestDistSenderRangeFeedRetryOnTransportErrors(t *testing.T) {
transport.EXPECT().IsExhausted().Return(false)
transport.EXPECT().NextReplica().Return(repl)
transport.EXPECT().NextInternalClient(gomock.Any()).Return(
ctx, nil, grpcstatus.Error(spec.errorCode, ""))
nil, grpcstatus.Error(spec.errorCode, ""))
}
transport.EXPECT().IsExhausted().Return(true)
transport.EXPECT().Release()
Expand Down Expand Up @@ -123,7 +123,7 @@ func TestDistSenderRangeFeedRetryOnTransportErrors(t *testing.T) {
client.EXPECT().RangeFeed(gomock.Any(), gomock.Any()).Return(stream, nil)
transport.EXPECT().IsExhausted().Return(false)
transport.EXPECT().NextReplica().Return(desc.InternalReplicas[0])
transport.EXPECT().NextInternalClient(gomock.Any()).Return(ctx, client, nil)
transport.EXPECT().NextInternalClient(gomock.Any()).Return(client, nil)
transport.EXPECT().Release()
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvclient/kvcoord/dist_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ func (l *simpleTransportAdapter) SendNext(

func (l *simpleTransportAdapter) NextInternalClient(
ctx context.Context,
) (context.Context, roachpb.InternalClient, error) {
) (rpc.RestrictedInternalClient, error) {
panic("unimplemented")
}

Expand Down
10 changes: 5 additions & 5 deletions pkg/kv/kvclient/kvcoord/mocks_generated_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/kv/kvclient/kvcoord/send_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ func (f *firstNErrorTransport) SendNext(

func (f *firstNErrorTransport) NextInternalClient(
ctx context.Context,
) (context.Context, roachpb.InternalClient, error) {
) (rpc.RestrictedInternalClient, error) {
panic("unimplemented")
}

Expand Down
17 changes: 9 additions & 8 deletions pkg/kv/kvclient/kvcoord/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,8 @@ type Transport interface {
SendNext(context.Context, roachpb.BatchRequest) (*roachpb.BatchResponse, error)

// NextInternalClient returns the InternalClient to use for making RPC
// calls. Returns a context.Context which should be used when making RPC
// calls on the returned server (This context is annotated to mark this
// request as in-process and bypass ctx.Peer checks).
NextInternalClient(context.Context) (context.Context, roachpb.InternalClient, error)
// calls.
NextInternalClient(context.Context) (rpc.RestrictedInternalClient, error)

// NextReplica returns the replica descriptor of the replica to be tried in
// the next call to SendNext. MoveToFront will cause the return value to
Expand Down Expand Up @@ -182,7 +180,7 @@ func (gt *grpcTransport) SendNext(
ctx context.Context, ba roachpb.BatchRequest,
) (*roachpb.BatchResponse, error) {
r := gt.replicas[gt.nextReplicaIdx]
ctx, iface, err := gt.NextInternalClient(ctx)
iface, err := gt.NextInternalClient(ctx)
if err != nil {
return nil, err
}
Expand All @@ -193,7 +191,10 @@ func (gt *grpcTransport) SendNext(

// NB: nodeID is unused, but accessible in stack traces.
func (gt *grpcTransport) sendBatch(
ctx context.Context, nodeID roachpb.NodeID, iface roachpb.InternalClient, ba roachpb.BatchRequest,
ctx context.Context,
nodeID roachpb.NodeID,
iface rpc.RestrictedInternalClient,
ba roachpb.BatchRequest,
) (*roachpb.BatchResponse, error) {
// Bail out early if the context is already canceled. (GRPC will
// detect this pretty quickly, but the first check of the context
Expand Down Expand Up @@ -232,7 +233,7 @@ func (gt *grpcTransport) sendBatch(
// RPCs.
func (gt *grpcTransport) NextInternalClient(
ctx context.Context,
) (context.Context, roachpb.InternalClient, error) {
) (rpc.RestrictedInternalClient, error) {
r := gt.replicas[gt.nextReplicaIdx]
gt.nextReplicaIdx++
return gt.nodeDialer.DialInternalClient(ctx, r.NodeID, gt.class)
Expand Down Expand Up @@ -360,7 +361,7 @@ func (s *senderTransport) SendNext(

func (s *senderTransport) NextInternalClient(
ctx context.Context,
) (context.Context, roachpb.InternalClient, error) {
) (rpc.RestrictedInternalClient, error) {
panic("unimplemented")
}

Expand Down
1 change: 1 addition & 0 deletions pkg/rpc/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ go_library(
"heartbeat.go",
"keepalive.go",
"metrics.go",
"restricted_internal_client.go",
"snappy.go",
"tls.go",
],
Expand Down
191 changes: 29 additions & 162 deletions pkg/rpc/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,7 @@ type Context struct {

rpcCompression bool

localInternalClient roachpb.InternalClient
localInternalClient RestrictedInternalClient

conns syncmap.Map

Expand Down Expand Up @@ -597,7 +597,7 @@ func (rpcCtx *Context) Metrics() *Metrics {
// https://github.com/cockroachdb/cockroach/pull/73309
func (rpcCtx *Context) GetLocalInternalClientForAddr(
target string, nodeID roachpb.NodeID,
) roachpb.InternalClient {
) RestrictedInternalClient {
if target == rpcCtx.Config.AdvertiseAddr && nodeID == rpcCtx.NodeID.Get() {
return rpcCtx.localInternalClient
}
Expand All @@ -608,63 +608,43 @@ type internalClientAdapter struct {
server roachpb.InternalServer
}

var _ RestrictedInternalClient = internalClientAdapter{}

// Batch implements the roachpb.InternalClient interface.
func (a internalClientAdapter) Batch(
ctx context.Context, ba *roachpb.BatchRequest, _ ...grpc.CallOption,
) (*roachpb.BatchResponse, error) {
// Mark this as originating locally, which is useful for the decision about
// memory allocation tracking.
ba.AdmissionHeader.SourceLocation = roachpb.AdmissionHeader_LOCAL
return a.server.Batch(ctx, ba)
}

// RangeLookup implements the roachpb.InternalClient interface.
func (a internalClientAdapter) RangeLookup(
ctx context.Context, rl *roachpb.RangeLookupRequest, _ ...grpc.CallOption,
) (*roachpb.RangeLookupResponse, error) {
return a.server.RangeLookup(ctx, rl)
}

// Join implements the roachpb.InternalClient interface.
func (a internalClientAdapter) Join(
ctx context.Context, req *roachpb.JoinNodeRequest, _ ...grpc.CallOption,
) (*roachpb.JoinNodeResponse, error) {
return a.server.Join(ctx, req)
}

// ResetQuorum is part of the roachpb.InternalClient interface.
func (a internalClientAdapter) ResetQuorum(
ctx context.Context, req *roachpb.ResetQuorumRequest, _ ...grpc.CallOption,
) (*roachpb.ResetQuorumResponse, error) {
return a.server.ResetQuorum(ctx, req)
// Create a new context from the existing one with the "local request" field set.
// This tells the handler that this is an in-process request, bypassing ctx.Peer checks.
return a.server.Batch(grpcutil.NewLocalRequestContext(ctx), ba)
}

// TokenBucket is part of the roachpb.InternalClient interface.
func (a internalClientAdapter) TokenBucket(
ctx context.Context, in *roachpb.TokenBucketRequest, opts ...grpc.CallOption,
) (*roachpb.TokenBucketResponse, error) {
return a.server.TokenBucket(ctx, in)
}

// GetSpanConfigs is part of the roachpb.InternalClient interface.
func (a internalClientAdapter) GetSpanConfigs(
ctx context.Context, req *roachpb.GetSpanConfigsRequest, _ ...grpc.CallOption,
) (*roachpb.GetSpanConfigsResponse, error) {
return a.server.GetSpanConfigs(ctx, req)
}
// RangeFeed implements the roachpb.InternalClient interface.
func (a internalClientAdapter) RangeFeed(
ctx context.Context, args *roachpb.RangeFeedRequest, _ ...grpc.CallOption,
) (roachpb.Internal_RangeFeedClient, error) {
ctx, cancel := context.WithCancel(ctx)
ctx, sp := tracing.ChildSpan(ctx, "/cockroach.roachpb.Internal/RangeFeed")
rfAdapter := rangeFeedClientAdapter{
respStreamClientAdapter: makeRespStreamClientAdapter(grpcutil.NewLocalRequestContext(ctx)),
}

// GetAllSystemSpanConfigsThatApply is part of the roachpb.InternalClient interface.
func (a internalClientAdapter) GetAllSystemSpanConfigsThatApply(
ctx context.Context, req *roachpb.GetAllSystemSpanConfigsThatApplyRequest, _ ...grpc.CallOption,
) (*roachpb.GetAllSystemSpanConfigsThatApplyResponse, error) {
return a.server.GetAllSystemSpanConfigsThatApply(ctx, req)
}
// Mark this as originating locally.
args.AdmissionHeader.SourceLocation = roachpb.AdmissionHeader_LOCAL
go func() {
defer cancel()
defer sp.Finish()
err := a.server.RangeFeed(args, rfAdapter)
if err == nil {
err = io.EOF
}
rfAdapter.errC <- err
}()

// UpdateSpanConfigs is part of the roachpb.InternalClient interface.
func (a internalClientAdapter) UpdateSpanConfigs(
ctx context.Context, req *roachpb.UpdateSpanConfigsRequest, _ ...grpc.CallOption,
) (*roachpb.UpdateSpanConfigsResponse, error) {
return a.server.UpdateSpanConfigs(ctx, req)
return rfAdapter, nil
}

type respStreamClientAdapter struct {
Expand Down Expand Up @@ -744,121 +724,8 @@ func (a rangeFeedClientAdapter) Send(e *roachpb.RangeFeedEvent) error {
var _ roachpb.Internal_RangeFeedClient = rangeFeedClientAdapter{}
var _ roachpb.Internal_RangeFeedServer = rangeFeedClientAdapter{}

// RangeFeed implements the roachpb.InternalClient interface.
func (a internalClientAdapter) RangeFeed(
ctx context.Context, args *roachpb.RangeFeedRequest, _ ...grpc.CallOption,
) (roachpb.Internal_RangeFeedClient, error) {
ctx, cancel := context.WithCancel(ctx)
ctx, sp := tracing.ChildSpan(ctx, "/cockroach.roachpb.Internal/RangeFeed")
rfAdapter := rangeFeedClientAdapter{
respStreamClientAdapter: makeRespStreamClientAdapter(ctx),
}

// Mark this as originating locally.
args.AdmissionHeader.SourceLocation = roachpb.AdmissionHeader_LOCAL
go func() {
defer cancel()
defer sp.Finish()
err := a.server.RangeFeed(args, rfAdapter)
if err == nil {
err = io.EOF
}
rfAdapter.errC <- err
}()

return rfAdapter, nil
}

type gossipSubscriptionClientAdapter struct {
respStreamClientAdapter
}

// roachpb.Internal_GossipSubscriptionServer methods.
func (a gossipSubscriptionClientAdapter) Recv() (*roachpb.GossipSubscriptionEvent, error) {
e, err := a.recvInternal()
if err != nil {
return nil, err
}
return e.(*roachpb.GossipSubscriptionEvent), nil
}

// roachpb.Internal_GossipSubscriptionServer methods.
func (a gossipSubscriptionClientAdapter) Send(e *roachpb.GossipSubscriptionEvent) error {
return a.sendInternal(e)
}

var _ roachpb.Internal_GossipSubscriptionClient = gossipSubscriptionClientAdapter{}
var _ roachpb.Internal_GossipSubscriptionServer = gossipSubscriptionClientAdapter{}

// GossipSubscription is part of the roachpb.InternalClient interface.
func (a internalClientAdapter) GossipSubscription(
ctx context.Context, args *roachpb.GossipSubscriptionRequest, _ ...grpc.CallOption,
) (roachpb.Internal_GossipSubscriptionClient, error) {
ctx, cancel := context.WithCancel(ctx)
ctx, sp := tracing.ChildSpan(ctx, "/cockroach.roachpb.Internal/GossipSubscription")
gsAdapter := gossipSubscriptionClientAdapter{
respStreamClientAdapter: makeRespStreamClientAdapter(ctx),
}

go func() {
defer cancel()
defer sp.Finish()
err := a.server.GossipSubscription(args, gsAdapter)
if err == nil {
err = io.EOF
}
gsAdapter.errC <- err
}()

return gsAdapter, nil
}

type tenantSettingsClientAdapter struct {
respStreamClientAdapter
}

// roachpb.Internal_TenantSettingsServer methods.
func (a tenantSettingsClientAdapter) Recv() (*roachpb.TenantSettingsEvent, error) {
e, err := a.recvInternal()
if err != nil {
return nil, err
}
return e.(*roachpb.TenantSettingsEvent), nil
}

// roachpb.Internal_TenantSettingsServer methods.
func (a tenantSettingsClientAdapter) Send(e *roachpb.TenantSettingsEvent) error {
return a.sendInternal(e)
}

var _ roachpb.Internal_TenantSettingsClient = tenantSettingsClientAdapter{}
var _ roachpb.Internal_TenantSettingsServer = tenantSettingsClientAdapter{}

// TenantSettings is part of the roachpb.InternalClient interface.
func (a internalClientAdapter) TenantSettings(
ctx context.Context, args *roachpb.TenantSettingsRequest, _ ...grpc.CallOption,
) (roachpb.Internal_TenantSettingsClient, error) {
ctx, cancel := context.WithCancel(ctx)
gsAdapter := tenantSettingsClientAdapter{
respStreamClientAdapter: makeRespStreamClientAdapter(ctx),
}

go func() {
defer cancel()
err := a.server.TenantSettings(args, gsAdapter)
if err == nil {
err = io.EOF
}
gsAdapter.errC <- err
}()

return gsAdapter, nil
}

var _ roachpb.InternalClient = internalClientAdapter{}

// IsLocal returns true if the given InternalClient is local.
func IsLocal(iface roachpb.InternalClient) bool {
func IsLocal(iface RestrictedInternalClient) bool {
_, ok := iface.(internalClientAdapter)
return ok // internalClientAdapter is used for local connections.
}
Expand Down
Loading

0 comments on commit 0dade40

Please sign in to comment.