Skip to content

Commit

Permalink
Merge #76306
Browse files Browse the repository at this point in the history
76306: rpc: run grpc interceptors for local RPCs r=andreimatei a=andreimatei

Before this patch, the gRPC interceptors that we normally run on the
client and the server were not being run for "RPCs" to the "Internal"
sevice going to the local node. Such RPCs are done through the
localClientAdapter, which bypasses gRPC for performance reasons. Not
running these interceptors is bad - they do useful things around
creating tasks, tracing, authentication/authorization.
Some of the things done by the interceptors was done separately and
redundantly in random places to account for the RPC local case.

This patch unifies the paths of local and remote RPCs by having the
localClientAdapter run the client and server interceptors. Care was
taken for the performance regression on the local path; still, one
new allocation is imposed by the client interceptor interface for
(unary) RPC calls: the result of the RPC needs to be allocated before
the RPC is made, and the proto that the server returns needs to be
copied over.
I've run some benchmarks with this change and it didn't show anything.

Node.batchInternal was simplified slightly to not create a task, since
the server interceptor now does it in all cases. Before, creating the
task was redundant in the non-local RPC case.

Fixes #74732

Co-authored-by: Andrei Matei <[email protected]>
  • Loading branch information
craig[bot] and andreimatei committed Jun 7, 2022
2 parents 0c8f826 + 3ebf6ee commit 175eebd
Show file tree
Hide file tree
Showing 117 changed files with 1,400 additions and 913 deletions.
1 change: 1 addition & 0 deletions pkg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -514,6 +514,7 @@ ALL_TESTS = [
"//pkg/util/timeutil/pgdate:pgdate_test",
"//pkg/util/timeutil:timeutil_test",
"//pkg/util/tracing/collector:collector_test",
"//pkg/util/tracing/grpcinterceptor:grpcinterceptor_test",
"//pkg/util/tracing/service:service_test",
"//pkg/util/tracing:tracing_test",
"//pkg/util/treeprinter:treeprinter_test",
Expand Down
1 change: 0 additions & 1 deletion pkg/bench/rttanalysis/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ go_library(
"//pkg/util/quotapool",
"//pkg/util/syncutil",
"//pkg/util/system",
"//pkg/util/tracing",
"//pkg/util/tracing/tracingpb",
"@com_github_cockroachdb_errors//:errors",
"@com_github_stretchr_testify//require",
Expand Down
8 changes: 4 additions & 4 deletions pkg/bench/rttanalysis/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb"
)

// ClusterConstructor is used to construct a Cluster for an individual case run.
Expand All @@ -31,7 +31,7 @@ func MakeClusterConstructor(
) ClusterConstructor {
return func(t testing.TB) *Cluster {
c := &Cluster{}
beforePlan := func(trace tracing.Recording, stmt string) {
beforePlan := func(trace tracingpb.Recording, stmt string) {
if _, ok := c.stmtToKVBatchRequests.Load(stmt); ok {
c.stmtToKVBatchRequests.Store(stmt, trace)
}
Expand Down Expand Up @@ -60,9 +60,9 @@ func (c *Cluster) clearStatementTrace(stmt string) {
c.stmtToKVBatchRequests.Store(stmt, nil)
}

func (c *Cluster) getStatementTrace(stmt string) (tracing.Recording, bool) {
func (c *Cluster) getStatementTrace(stmt string) (tracingpb.Recording, bool) {
out, _ := c.stmtToKVBatchRequests.Load(stmt)
r, ok := out.(tracing.Recording)
r, ok := out.(tracingpb.Recording)
return r, ok
}

Expand Down
7 changes: 3 additions & 4 deletions pkg/bench/rttanalysis/rtt_analysis_bench.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/quotapool"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -116,7 +115,7 @@ func executeRoundTripTest(b testingB, tc RoundTripBenchTestCase, cc ClusterConst
roundTrips := 0
b.ResetTimer()
b.StopTimer()
var r tracing.Recording
var r tracingpb.Recording

// Do an extra iteration and don't record it in order to deal with effects of
// running it the first time.
Expand Down Expand Up @@ -168,12 +167,12 @@ const roundTripsMetric = "roundtrips"

// count the number of KvBatchRequests inside a recording, this is done by
// counting each "txn coordinator send" operation.
func countKvBatchRequestsInRecording(r tracing.Recording) (sends int, hasRetry bool) {
func countKvBatchRequestsInRecording(r tracingpb.Recording) (sends int, hasRetry bool) {
root := r[0]
return countKvBatchRequestsInSpan(r, root)
}

func countKvBatchRequestsInSpan(r tracing.Recording, sp tracingpb.RecordedSpan) (int, bool) {
func countKvBatchRequestsInSpan(r tracingpb.Recording, sp tracingpb.RecordedSpan) (int, bool) {
count := 0
// Count the number of OpTxnCoordSender operations while traversing the
// tree of spans.
Expand Down
4 changes: 2 additions & 2 deletions pkg/ccl/kvccl/kvfollowerreadsccl/boundedstaleness_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ func (bse *boundedStalenessEvents) onTxnRetry(
}
}

func (bse *boundedStalenessEvents) onStmtTrace(nodeIdx int, rec tracing.Recording, stmt string) {
func (bse *boundedStalenessEvents) onStmtTrace(nodeIdx int, rec tracingpb.Recording, stmt string) {
bse.mu.Lock()
defer bse.mu.Unlock()

Expand Down Expand Up @@ -275,7 +275,7 @@ func TestBoundedStalenessDataDriven(t *testing.T) {
clusterArgs.ServerArgsPerNode[i] = base.TestServerArgs{
Knobs: base.TestingKnobs{
SQLExecutor: &sql.ExecutorTestingKnobs{
WithStatementTrace: func(trace tracing.Recording, stmt string) {
WithStatementTrace: func(trace tracingpb.Recording, stmt string) {
bse.onStmtTrace(i, trace, stmt)
},
OnTxnRetry: func(err error, evalCtx *eval.Context) {
Expand Down
6 changes: 3 additions & 3 deletions pkg/ccl/kvccl/kvfollowerreadsccl/followerreads_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -561,7 +561,7 @@ func TestFollowerReadsWithStaleDescriptor(t *testing.T) {
defer utilccl.TestingEnableEnterprise()()

historicalQuery := `SELECT * FROM test AS OF SYSTEM TIME follower_read_timestamp() WHERE k=2`
recCh := make(chan tracing.Recording, 1)
recCh := make(chan tracingpb.Recording, 1)

var n2Addr, n3Addr syncutil.AtomicString
tc := testcluster.StartTestCluster(t, 4,
Expand Down Expand Up @@ -591,7 +591,7 @@ func TestFollowerReadsWithStaleDescriptor(t *testing.T) {
},
},
SQLExecutor: &sql.ExecutorTestingKnobs{
WithStatementTrace: func(trace tracing.Recording, stmt string) {
WithStatementTrace: func(trace tracingpb.Recording, stmt string) {
if stmt == historicalQuery {
recCh <- trace
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/kvccl/kvtenantccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ go_test(
"//pkg/util/randutil",
"//pkg/util/retry",
"//pkg/util/stop",
"//pkg/util/tracing",
"//pkg/util/tracing/tracingpb",
"//pkg/util/uuid",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_redact//:redact",
Expand Down
6 changes: 3 additions & 3 deletions pkg/ccl/kvccl/kvtenantccl/tenant_trace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb"
"github.com/cockroachdb/redact"
"github.com/stretchr/testify/require"
)
Expand All @@ -50,7 +50,7 @@ func testTenantTracesAreRedactedImpl(t *testing.T, redactable bool) {
visibleString = "tenant-can-see-this"
)

recCh := make(chan tracing.Recording, 1)
recCh := make(chan tracingpb.Recording, 1)

args := base.TestServerArgs{
Knobs: base.TestingKnobs{
Expand All @@ -64,7 +64,7 @@ func testTenantTracesAreRedactedImpl(t *testing.T, redactable bool) {
},
},
SQLExecutor: &sql.ExecutorTestingKnobs{
WithStatementTrace: func(trace tracing.Recording, stmt string) {
WithStatementTrace: func(trace tracingpb.Recording, stmt string) {
if stmt == testStmt {
recCh <- trace
}
Expand Down
1 change: 1 addition & 0 deletions pkg/ccl/multiregionccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ go_test(
"//pkg/util/randutil",
"//pkg/util/syncutil",
"//pkg/util/tracing",
"//pkg/util/tracing/tracingpb",
"@com_github_cockroachdb_datadriven//:datadriven",
"@com_github_cockroachdb_errors//:errors",
"@com_github_stretchr_testify//require",
Expand Down
9 changes: 5 additions & 4 deletions pkg/ccl/multiregionccl/datadriven_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb"
"github.com/cockroachdb/datadriven"
"github.com/cockroachdb/errors"
)
Expand Down Expand Up @@ -118,7 +119,7 @@ func TestMultiRegionDataDriven(t *testing.T) {
defer ds.cleanup(ctx)
var mu syncutil.Mutex
var traceStmt string
var recCh chan tracing.Recording
var recCh chan tracingpb.Recording
datadriven.RunTest(t, path, func(t *testing.T, d *datadriven.TestData) string {
switch d.Cmd {
case "sleep-for-follower-read":
Expand All @@ -135,7 +136,7 @@ func TestMultiRegionDataDriven(t *testing.T) {
}
serverArgs := make(map[int]base.TestServerArgs)
localityNames := strings.Split(localities, ",")
recCh = make(chan tracing.Recording, 1)
recCh = make(chan tracingpb.Recording, 1)
for i, localityName := range localityNames {
localityCfg := roachpb.Locality{
Tiers: []roachpb.Tier{
Expand All @@ -147,7 +148,7 @@ func TestMultiRegionDataDriven(t *testing.T) {
Locality: localityCfg,
Knobs: base.TestingKnobs{
SQLExecutor: &sql.ExecutorTestingKnobs{
WithStatementTrace: func(trace tracing.Recording, stmt string) {
WithStatementTrace: func(trace tracingpb.Recording, stmt string) {
mu.Lock()
defer mu.Unlock()
if stmt == traceStmt {
Expand Down Expand Up @@ -475,7 +476,7 @@ func nodeIdToIdx(t *testing.T, tc serverutils.TestClusterInterface, id roachpb.N
// message. An error is returned if more than one (or no) "dist sender send"
// messages are found in the recording.
func checkReadServedLocallyInSimpleRecording(
rec tracing.Recording,
rec tracingpb.Recording,
) (servedLocally bool, servedUsingFollowerReads bool, err error) {
foundDistSenderSend := false
for _, sp := range rec {
Expand Down
7 changes: 4 additions & 3 deletions pkg/ccl/multiregionccl/roundtrips_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
)
Expand All @@ -36,7 +37,7 @@ func TestEnsureLocalReadsOnGlobalTables(t *testing.T) {

// ensureOnlyLocalReads looks at a trace to ensure that reads were served
// locally. It returns true if the read was served as a follower read.
ensureOnlyLocalReads := func(t *testing.T, rec tracing.Recording) (servedUsingFollowerReads bool) {
ensureOnlyLocalReads := func(t *testing.T, rec tracingpb.Recording) (servedUsingFollowerReads bool) {
for _, sp := range rec {
if sp.Operation == "dist sender send" {
require.True(t, tracing.LogsContainMsg(sp, kvbase.RoutingRequestLocallyMsg),
Expand All @@ -57,11 +58,11 @@ func TestEnsureLocalReadsOnGlobalTables(t *testing.T) {
}

presentTimeRead := `SELECT * FROM t.test_table WHERE k=2`
recCh := make(chan tracing.Recording, 1)
recCh := make(chan tracingpb.Recording, 1)

knobs := base.TestingKnobs{
SQLExecutor: &sql.ExecutorTestingKnobs{
WithStatementTrace: func(trace tracing.Recording, stmt string) {
WithStatementTrace: func(trace tracingpb.Recording, stmt string) {
if stmt == presentTimeRead {
recCh <- trace
}
Expand Down
1 change: 1 addition & 0 deletions pkg/cli/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ go_library(
"//pkg/util/sysutil",
"//pkg/util/timeutil",
"//pkg/util/tracing",
"//pkg/util/tracing/tracingpb",
"//pkg/util/tracing/zipper",
"//pkg/util/uuid",
"//pkg/workload",
Expand Down
7 changes: 4 additions & 3 deletions pkg/cli/debug_send_kv_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/server/serverpb"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb"
"github.com/cockroachdb/errors"
"github.com/spf13/cobra"
)
Expand Down Expand Up @@ -226,7 +227,7 @@ func runSendKVBatch(cmd *cobra.Command, args []string) error {

func sendKVBatchRequestWithTracingOption(
ctx context.Context, verboseTrace bool, admin serverpb.AdminClient, ba *roachpb.BatchRequest,
) (br *roachpb.BatchResponse, rec tracing.Recording, err error) {
) (br *roachpb.BatchResponse, rec tracingpb.Recording, err error) {
var sp *tracing.Span
if verboseTrace {
// Set up a tracing span and enable verbose tracing if requested by
Expand All @@ -237,7 +238,7 @@ func sendKVBatchRequestWithTracingOption(
// because otherwise the unit test TestSendKVBatch becomes non-deterministic
// on the contents of the traceInfo JSON field in the request.
_, sp = tracing.NewTracer().StartSpanCtx(ctx, "debug-send-kv-batch",
tracing.WithRecording(tracing.RecordingVerbose))
tracing.WithRecording(tracingpb.RecordingVerbose))
defer sp.Finish()

// Inject the span metadata into the KV request.
Expand All @@ -252,7 +253,7 @@ func sendKVBatchRequestWithTracingOption(
sp.ImportRemoteRecording(br.CollectedSpans)

// Extract the recording.
rec = sp.GetRecording(tracing.RecordingVerbose)
rec = sp.GetRecording(tracingpb.RecordingVerbose)
}

return br, rec, errors.Wrap(err, "request failed")
Expand Down
1 change: 1 addition & 0 deletions pkg/jobs/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ go_library(
"//pkg/util/syncutil",
"//pkg/util/timeutil",
"//pkg/util/tracing",
"//pkg/util/tracing/tracingpb",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_errors//oserror",
"@com_github_cockroachdb_logtags//:logtags",
Expand Down
3 changes: 2 additions & 1 deletion pkg/jobs/adopt.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb"
"github.com/cockroachdb/errors"
"go.opentelemetry.io/otel/attribute"
)
Expand Down Expand Up @@ -384,7 +385,7 @@ func (r *Registry) runJob(
// A new root span will be created on every resumption of the job.
var spanOptions []tracing.SpanOption
if tj, ok := resumer.(TraceableJob); ok && tj.ForceRealSpan() {
spanOptions = append(spanOptions, tracing.WithRecording(tracing.RecordingStructured))
spanOptions = append(spanOptions, tracing.WithRecording(tracingpb.RecordingStructured))
}
// TODO(ajwerner): Move this writing up the trace ID down into
// stepThroughStateMachine where we're already often (and soon with
Expand Down
2 changes: 2 additions & 0 deletions pkg/kv/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ go_library(
"//pkg/util/stop",
"//pkg/util/syncutil",
"//pkg/util/tracing",
"//pkg/util/tracing/tracingpb",
"//pkg/util/uuid",
"@com_github_cockroachdb_apd_v3//:apd",
"@com_github_cockroachdb_errors//:errors",
Expand Down Expand Up @@ -85,6 +86,7 @@ go_test(
"//pkg/util/syncutil",
"//pkg/util/timeutil",
"//pkg/util/tracing",
"//pkg/util/tracing/tracingpb",
"//pkg/util/uuid",
"@com_github_cockroachdb_errors//:errors",
"@com_github_gogo_protobuf//proto",
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvclient/kvcoord/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ go_test(
"//pkg/util/stop",
"//pkg/util/syncutil",
"//pkg/util/tracing",
"//pkg/util/tracing/tracingpb",
"//pkg/util/uuid",
"@com_github_cockroachdb_circuitbreaker//:circuitbreaker",
"@com_github_cockroachdb_datadriven//:datadriven",
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -466,14 +466,14 @@ func (ds *DistSender) singleRangeFeed(
}

args.Replica = transport.NextReplica()
clientCtx, client, err := transport.NextInternalClient(ctx)
client, err := transport.NextInternalClient(ctx)
if err != nil {
log.VErrEventf(ctx, 2, "RPC error: %s", err)
continue
}

log.VEventf(ctx, 3, "attempting to create a RangeFeed over replica %s", args.Replica)
stream, err := client.RangeFeed(clientCtx, &args)
stream, err := client.RangeFeed(ctx, &args)
if err != nil {
log.VErrEventf(ctx, 2, "RPC error: %s", err)
if grpcutil.IsAuthError(err) {
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func TestDistSenderRangeFeedRetryOnTransportErrors(t *testing.T) {
transport.EXPECT().IsExhausted().Return(false)
transport.EXPECT().NextReplica().Return(repl)
transport.EXPECT().NextInternalClient(gomock.Any()).Return(
ctx, nil, grpcstatus.Error(spec.errorCode, ""))
nil, grpcstatus.Error(spec.errorCode, ""))
}
transport.EXPECT().IsExhausted().Return(true)
transport.EXPECT().Release()
Expand Down Expand Up @@ -123,7 +123,7 @@ func TestDistSenderRangeFeedRetryOnTransportErrors(t *testing.T) {
client.EXPECT().RangeFeed(gomock.Any(), gomock.Any()).Return(stream, nil)
transport.EXPECT().IsExhausted().Return(false)
transport.EXPECT().NextReplica().Return(desc.InternalReplicas[0])
transport.EXPECT().NextInternalClient(gomock.Any()).Return(ctx, client, nil)
transport.EXPECT().NextInternalClient(gomock.Any()).Return(client, nil)
transport.EXPECT().Release()
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvclient/kvcoord/dist_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ func (l *simpleTransportAdapter) SendNext(

func (l *simpleTransportAdapter) NextInternalClient(
ctx context.Context,
) (context.Context, roachpb.InternalClient, error) {
) (rpc.RestrictedInternalClient, error) {
panic("unimplemented")
}

Expand Down
Loading

0 comments on commit 175eebd

Please sign in to comment.