Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rpc: reuse gRPC streams across unary BatchRequest RPCs #136648

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions build/bazelutil/check.sh
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ GIT_GREP="git $CONFIGS grep"
EXISTING_GO_GENERATE_COMMENTS="
pkg/config/field.go://go:generate stringer --type=Field --linecomment
pkg/rpc/context.go://go:generate mockgen -destination=mocks_generated_test.go --package=. Dialbacker
pkg/rpc/stream_pool.go://go:generate mockgen -destination=mocks_generated_test.go --package=. BatchStreamClient
pkg/roachprod/vm/aws/config.go://go:generate terraformgen -o terraform/main.tf
pkg/roachprod/prometheus/prometheus.go://go:generate mockgen -package=prometheus -destination=mocks_generated_test.go . Cluster
pkg/cmd/roachtest/clusterstats/collector.go://go:generate mockgen -package=clusterstats -destination mocks_generated_test.go github.com/cockroachdb/cockroach/pkg/roachprod/prometheus Client
Expand Down
2 changes: 1 addition & 1 deletion docs/generated/settings/settings-for-tenants.txt
Original file line number Diff line number Diff line change
Expand Up @@ -401,4 +401,4 @@ trace.span_registry.enabled boolean false if set, ongoing traces can be seen at
trace.zipkin.collector string the address of a Zipkin instance to receive traces, as <host>:<port>. If no port is specified, 9411 will be used. application
ui.database_locality_metadata.enabled boolean true if enabled shows extended locality data about databases and tables in DB Console which can be expensive to compute application
ui.display_timezone enumeration etc/utc the timezone used to format timestamps in the ui [etc/utc = 0, america/new_york = 1] application
version version 1000024.3-upgrading-to-1000025.1-step-008 set the active cluster version in the format '<major>.<minor>' application
version version 1000024.3-upgrading-to-1000025.1-step-010 set the active cluster version in the format '<major>.<minor>' application
2 changes: 1 addition & 1 deletion docs/generated/settings/settings.html
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,6 @@
<tr><td><div id="setting-trace-zipkin-collector" class="anchored"><code>trace.zipkin.collector</code></div></td><td>string</td><td><code></code></td><td>the address of a Zipkin instance to receive traces, as &lt;host&gt;:&lt;port&gt;. If no port is specified, 9411 will be used.</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-ui-database-locality-metadata-enabled" class="anchored"><code>ui.database_locality_metadata.enabled</code></div></td><td>boolean</td><td><code>true</code></td><td>if enabled shows extended locality data about databases and tables in DB Console which can be expensive to compute</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-ui-display-timezone" class="anchored"><code>ui.display_timezone</code></div></td><td>enumeration</td><td><code>etc/utc</code></td><td>the timezone used to format timestamps in the ui [etc/utc = 0, america/new_york = 1]</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-version" class="anchored"><code>version</code></div></td><td>version</td><td><code>1000024.3-upgrading-to-1000025.1-step-008</code></td><td>set the active cluster version in the format &#39;&lt;major&gt;.&lt;minor&gt;&#39;</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-version" class="anchored"><code>version</code></div></td><td>version</td><td><code>1000024.3-upgrading-to-1000025.1-step-010</code></td><td>set the active cluster version in the format &#39;&lt;major&gt;.&lt;minor&gt;&#39;</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
</tbody>
</table>
5 changes: 5 additions & 0 deletions pkg/clusterversion/cockroach_versions.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,10 @@ const (
// range-ID local key, which is written below raft.
V25_1_AddRangeForceFlushKey

// V25_1_BatchStreamRPC adds the BatchStream RPC, which allows for more
// efficient Batch unary RPCs.
V25_1_BatchStreamRPC

// *************************************************
// Step (1) Add new versions above this comment.
// Do not add new versions to a patch release.
Expand Down Expand Up @@ -240,6 +244,7 @@ var versionTable = [numKeys]roachpb.Version{
V25_1_AddJobsTables: {Major: 24, Minor: 3, Internal: 4},
V25_1_MoveRaftTruncatedState: {Major: 24, Minor: 3, Internal: 6},
V25_1_AddRangeForceFlushKey: {Major: 24, Minor: 3, Internal: 8},
V25_1_BatchStreamRPC: {Major: 24, Minor: 3, Internal: 10},

// *************************************************
// Step (2): Add new versions above this comment.
Expand Down
6 changes: 6 additions & 0 deletions pkg/kv/kvclient/kvcoord/transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,12 @@ func (m *mockInternalClient) Batch(
return br, nil
}

func (m *mockInternalClient) BatchStream(
ctx context.Context, opts ...grpc.CallOption,
) (kvpb.Internal_BatchStreamClient, error) {
return nil, fmt.Errorf("unsupported BatchStream call")
}

// RangeLookup implements the kvpb.InternalClient interface.
func (m *mockInternalClient) RangeLookup(
ctx context.Context, rl *kvpb.RangeLookupRequest, _ ...grpc.CallOption,
Expand Down
4 changes: 4 additions & 0 deletions pkg/kv/kvclient/kvtenant/connector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,10 @@ func (*mockServer) Batch(context.Context, *kvpb.BatchRequest) (*kvpb.BatchRespon
panic("unimplemented")
}

func (m *mockServer) BatchStream(stream kvpb.Internal_BatchStreamServer) error {
panic("implement me")
}

func (m *mockServer) MuxRangeFeed(server kvpb.Internal_MuxRangeFeedServer) error {
panic("implement me")
}
Expand Down
26 changes: 16 additions & 10 deletions pkg/kv/kvpb/api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -3668,44 +3668,50 @@ message JoinNodeResponse {

// Batch and RangeFeed service implemented by nodes for KV API requests.
service Internal {
rpc Batch (BatchRequest) returns (BatchResponse) {}
rpc Batch (BatchRequest) returns (BatchResponse) {}

// BatchStream is a streaming variant of Batch. There is a 1:1 correspondence
// between requests and responses. The method is used to facilitate pooling of
// gRPC streams to avoid the overhead of creating and discarding a new stream
// for each unary Batch RPC invocation. See rpc.BatchStreamPool.
rpc BatchStream (stream BatchRequest) returns (stream BatchResponse) {}

rpc RangeLookup (RangeLookupRequest) returns (RangeLookupResponse) {}
rpc MuxRangeFeed (stream RangeFeedRequest) returns (stream MuxRangeFeedEvent) {}
rpc MuxRangeFeed (stream RangeFeedRequest) returns (stream MuxRangeFeedEvent) {}
rpc GossipSubscription (GossipSubscriptionRequest) returns (stream GossipSubscriptionEvent) {}
rpc ResetQuorum (ResetQuorumRequest) returns (ResetQuorumResponse) {}

// TokenBucket is used by tenants to obtain Request Units and report
// consumption.
rpc TokenBucket (TokenBucketRequest) returns (TokenBucketResponse) {}
rpc TokenBucket (TokenBucketRequest) returns (TokenBucketResponse) {}

// Join a bootstrapped cluster. If the target node is itself not part of a
// bootstrapped cluster, an appropriate error is returned.
rpc Join(JoinNodeRequest) returns (JoinNodeResponse) { }
rpc Join (JoinNodeRequest) returns (JoinNodeResponse) {}

// GetSpanConfigs is used to fetch the span configurations over a given
// keyspan.
rpc GetSpanConfigs (GetSpanConfigsRequest) returns (GetSpanConfigsResponse) { }
rpc GetSpanConfigs (GetSpanConfigsRequest) returns (GetSpanConfigsResponse) {}

// GetAllSystemSpanConfigsThatApply is used to fetch all system span
// configurations that apply over a tenant's ranges.
rpc GetAllSystemSpanConfigsThatApply (GetAllSystemSpanConfigsThatApplyRequest) returns (GetAllSystemSpanConfigsThatApplyResponse) {}

// UpdateSpanConfigs is used to update the span configurations over given
// keyspans.
rpc UpdateSpanConfigs (UpdateSpanConfigsRequest) returns (UpdateSpanConfigsResponse) { }
rpc UpdateSpanConfigs (UpdateSpanConfigsRequest) returns (UpdateSpanConfigsResponse) {}

// SpanConfigConformance is used to determine whether ranges backing the given
// keyspans conform to span configs that apply over them.
rpc SpanConfigConformance (SpanConfigConformanceRequest) returns (SpanConfigConformanceResponse) { }
rpc SpanConfigConformance (SpanConfigConformanceRequest) returns (SpanConfigConformanceResponse) {}

// TenantSettings is used by tenants to obtain and stay up to date with tenant
// setting overrides.
rpc TenantSettings (TenantSettingsRequest) returns (stream TenantSettingsEvent) { }

rpc TenantSettings (TenantSettingsRequest) returns (stream TenantSettingsEvent) {}

// GetRangeDescriptors is used by tenants to get range descriptors for their
// own ranges.
rpc GetRangeDescriptors (GetRangeDescriptorsRequest) returns (stream GetRangeDescriptorsResponse) { }
rpc GetRangeDescriptors (GetRangeDescriptorsRequest) returns (stream GetRangeDescriptorsResponse) {}
}

// GetRangeDescriptorsRequest is used to fetch range descriptors.
Expand Down
20 changes: 20 additions & 0 deletions pkg/kv/kvpb/kvpbmock/mocks_generated.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 7 additions & 1 deletion pkg/rpc/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ go_library(
"restricted_internal_client.go",
"settings.go",
"snappy.go",
"stream_pool.go",
"tls.go",
],
embed = [":rpc_go_proto"],
Expand Down Expand Up @@ -90,7 +91,10 @@ go_library(
gomock(
name = "mock_rpc",
out = "mocks_generated_test.go",
interfaces = ["Dialbacker"],
interfaces = [
"BatchStreamClient",
"Dialbacker",
],
library = ":rpc",
package = "rpc",
self_package = "github.com/cockroachdb/cockroach/pkg/rpc",
Expand All @@ -116,6 +120,7 @@ go_test(
"metrics_test.go",
"peer_test.go",
"snappy_test.go",
"stream_pool_test.go",
"tls_test.go",
":mock_rpc", # keep
],
Expand Down Expand Up @@ -173,6 +178,7 @@ go_test(
"@org_golang_google_grpc//metadata",
"@org_golang_google_grpc//peer",
"@org_golang_google_grpc//status",
"@org_golang_x_sync//errgroup",
],
)

Expand Down
3 changes: 2 additions & 1 deletion pkg/rpc/auth_tenant.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,14 +55,15 @@ func (a tenantAuthorizer) authorize(
req interface{},
) error {
switch fullMethod {
case "/cockroach.roachpb.Internal/Batch":
case "/cockroach.roachpb.Internal/Batch", "/cockroach.roachpb.Internal/BatchStream":
return a.authBatch(ctx, sv, tenID, req.(*kvpb.BatchRequest))

case "/cockroach.roachpb.Internal/RangeLookup":
return a.authRangeLookup(ctx, tenID, req.(*kvpb.RangeLookupRequest))

case "/cockroach.roachpb.Internal/RangeFeed", "/cockroach.roachpb.Internal/MuxRangeFeed":
return a.authRangeFeed(tenID, req.(*kvpb.RangeFeedRequest))

case "/cockroach.roachpb.Internal/GossipSubscription":
return a.authGossipSubscription(tenID, req.(*kvpb.GossipSubscriptionRequest))

Expand Down
26 changes: 25 additions & 1 deletion pkg/rpc/auth_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -572,6 +572,30 @@ func TestTenantAuthRequest(t *testing.T) {
expErr: noError,
},
},
"/cockroach.roachpb.Internal/BatchStream": {
{
req: &kvpb.BatchRequest{},
expErr: `requested key span /Max not fully contained in tenant keyspace /Tenant/1{0-1}`,
},
{
req: &kvpb.BatchRequest{Requests: makeReqs(
makeReq("a", "b"),
)},
expErr: `requested key span {a-b} not fully contained in tenant keyspace /Tenant/1{0-1}`,
},
{
req: &kvpb.BatchRequest{Requests: makeReqs(
makeReq(prefix(5, "a"), prefix(5, "b")),
)},
expErr: `requested key span /Tenant/5{a-b} not fully contained in tenant keyspace /Tenant/1{0-1}`,
},
{
req: &kvpb.BatchRequest{Requests: makeReqs(
makeReq(prefix(10, "a"), prefix(10, "b")),
)},
expErr: noError,
},
},
"/cockroach.roachpb.Internal/RangeLookup": {
{
req: &kvpb.RangeLookupRequest{},
Expand Down Expand Up @@ -1009,7 +1033,7 @@ func TestTenantAuthRequest(t *testing.T) {
// cross-read capability and the request is a read, expect no error.
if canCrossRead && strings.Contains(tc.expErr, "fully contained") {
switch method {
case "/cockroach.roachpb.Internal/Batch":
case "/cockroach.roachpb.Internal/Batch", "/cockroach.roachpb.Internal/BatchStream":
if tc.req.(*kvpb.BatchRequest).IsReadOnly() {
tc.expErr = noError
}
Expand Down
18 changes: 17 additions & 1 deletion pkg/rpc/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,17 +34,26 @@ type Connection struct {
// It always has to be signaled eventually, regardless of the stopper
// draining, etc, since callers might be blocking on it.
connFuture connFuture
// batchStreamPool holds a pool of BatchStreamClient streams established on
// the connection. The pool can be used to avoid the overhead of unary Batch
// RPCs.
//
// The pool is only initialized once the ClientConn is resolved.
batchStreamPool BatchStreamPool
}

// newConnectionToNodeID makes a Connection for the given node, class, and nontrivial Signal
// that should be queried in Connect().
func newConnectionToNodeID(k peerKey, breakerSignal func() circuit.Signal) *Connection {
func newConnectionToNodeID(
opts *ContextOptions, k peerKey, breakerSignal func() circuit.Signal,
) *Connection {
c := &Connection{
breakerSignalFn: breakerSignal,
k: k,
connFuture: connFuture{
ready: make(chan struct{}),
},
batchStreamPool: makeStreamPool(opts.Stopper, newBatchStream),
}
return c
}
Expand Down Expand Up @@ -156,6 +165,13 @@ func (c *Connection) Signal() circuit.Signal {
return c.breakerSignalFn()
}

func (c *Connection) BatchStreamPool() *BatchStreamPool {
if !c.connFuture.Resolved() {
panic("BatchStreamPool called on unresolved connection")
}
return &c.batchStreamPool
}

type connFuture struct {
ready chan struct{}
cc *grpc.ClientConn
Expand Down
4 changes: 4 additions & 0 deletions pkg/rpc/context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,10 @@ func (*internalServer) Batch(context.Context, *kvpb.BatchRequest) (*kvpb.BatchRe
return nil, nil
}

func (*internalServer) BatchStream(stream kvpb.Internal_BatchStreamServer) error {
panic("unimplemented")
}

func (*internalServer) RangeLookup(
context.Context, *kvpb.RangeLookupRequest,
) (*kvpb.RangeLookupResponse, error) {
Expand Down
55 changes: 54 additions & 1 deletion pkg/rpc/mocks_generated_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions pkg/rpc/nodedialer/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,16 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//pkg/base",
"//pkg/clusterversion",
"//pkg/kv/kvbase",
"//pkg/kv/kvpb",
"//pkg/roachpb",
"//pkg/rpc",
"//pkg/settings",
"//pkg/settings/cluster",
"//pkg/util/circuit",
"//pkg/util/log",
"//pkg/util/metamorphic",
"//pkg/util/stop",
"//pkg/util/tracing",
"@com_github_cockroachdb_errors//:errors",
Expand Down
Loading
Loading