diff --git a/pkg/ccl/backupccl/backup_processor.go b/pkg/ccl/backupccl/backup_processor.go index c1df16842904..2d56b56185a8 100644 --- a/pkg/ccl/backupccl/backup_processor.go +++ b/pkg/ccl/backupccl/backup_processor.go @@ -221,7 +221,7 @@ func (bp *backupDataProcessor) close() { bp.cancelAndWaitForWorker() bp.agg.Close() if bp.InternalClose() { - bp.memAcc.Close(bp.Ctx) + bp.memAcc.Close(bp.Ctx()) } } diff --git a/pkg/ccl/backupccl/restore_data_processor.go b/pkg/ccl/backupccl/restore_data_processor.go index 3dae7307c1ee..6e2a72678683 100644 --- a/pkg/ccl/backupccl/restore_data_processor.go +++ b/pkg/ccl/backupccl/restore_data_processor.go @@ -559,8 +559,8 @@ func (rd *restoreDataProcessor) Next() (rowenc.EncDatumRow, *execinfrapb.Produce prog.ProgressDetails = *details case meta := <-rd.metaCh: return nil, meta - case <-rd.Ctx.Done(): - rd.MoveToDraining(rd.Ctx.Err()) + case <-rd.Ctx().Done(): + rd.MoveToDraining(rd.Ctx().Err()) return nil, rd.DrainHelper() } diff --git a/pkg/ccl/backupccl/restore_data_processor_test.go b/pkg/ccl/backupccl/restore_data_processor_test.go index 0fb04d123ed2..6c3234ed7f45 100644 --- a/pkg/ccl/backupccl/restore_data_processor_test.go +++ b/pkg/ccl/backupccl/restore_data_processor_test.go @@ -395,8 +395,7 @@ func runTestIngest(t *testing.T, init func(*cluster.Settings)) { } expectedKVs := slurpSSTablesLatestKey(t, filepath.Join(dir, "foo"), slurp, srcPrefix, newPrefix) - mockRestoreDataProcessor, err := newTestingRestoreDataProcessor(ctx, &evalCtx, &flowCtx, - mockRestoreDataSpec) + mockRestoreDataProcessor, err := newTestingRestoreDataProcessor(&evalCtx, &flowCtx, mockRestoreDataSpec) require.NoError(t, err) ssts := make(chan mergedSST, 1) require.NoError(t, mockRestoreDataProcessor.openSSTs(ctx, restoreSpanEntry, ssts)) @@ -439,15 +438,11 @@ func runTestIngest(t *testing.T, init func(*cluster.Settings)) { } func newTestingRestoreDataProcessor( - ctx context.Context, - evalCtx *eval.Context, - flowCtx *execinfra.FlowCtx, - spec execinfrapb.RestoreDataSpec, + evalCtx *eval.Context, flowCtx *execinfra.FlowCtx, spec execinfrapb.RestoreDataSpec, ) (*restoreDataProcessor, error) { rd := &restoreDataProcessor{ ProcessorBase: execinfra.ProcessorBase{ ProcessorBaseNoHelper: execinfra.ProcessorBaseNoHelper{ - Ctx: ctx, EvalCtx: evalCtx, }, }, diff --git a/pkg/ccl/changefeedccl/changefeed_processors.go b/pkg/ccl/changefeedccl/changefeed_processors.go index 101d1ecff4e9..315677b898ae 100644 --- a/pkg/ccl/changefeedccl/changefeed_processors.go +++ b/pkg/ccl/changefeedccl/changefeed_processors.go @@ -446,7 +446,7 @@ func (ca *changeAggregator) close() { } if ca.eventConsumer != nil { if err := ca.eventConsumer.Close(); err != nil { - log.Warningf(ca.Ctx, "error closing event consumer: %s", err) + log.Warningf(ca.Ctx(), "error closing event consumer: %s", err) } } @@ -454,11 +454,11 @@ if ca.sink != nil { // Best effort: context is often canceled by now, so we expect to see an error _ = ca.sink.Close() } - ca.memAcc.Close(ca.Ctx) + ca.memAcc.Close(ca.Ctx()) if ca.kvFeedMemMon != nil { - ca.kvFeedMemMon.Stop(ca.Ctx) + ca.kvFeedMemMon.Stop(ca.Ctx()) } - ca.MemMonitor.Stop(ca.Ctx) + ca.MemMonitor.Stop(ca.Ctx()) ca.InternalClose() } @@ -501,7 +501,7 @@ func (ca *changeAggregator) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMet // kvFeed, sends off this event to the event consumer, and flushes the sink // if necessary.
func (ca *changeAggregator) tick() error { - event, err := ca.eventProducer.Get(ca.Ctx) + event, err := ca.eventProducer.Get(ca.Ctx()) if err != nil { return err } @@ -516,16 +516,16 @@ func (ca *changeAggregator) tick() error { ca.sliMetrics.AdmitLatency.RecordValue(timeutil.Since(event.Timestamp().GoTime()).Nanoseconds()) } ca.recentKVCount++ - return ca.eventConsumer.ConsumeEvent(ca.Ctx, event) + return ca.eventConsumer.ConsumeEvent(ca.Ctx(), event) case kvevent.TypeResolved: a := event.DetachAlloc() - a.Release(ca.Ctx) + a.Release(ca.Ctx()) resolved := event.Resolved() if ca.knobs.FilterSpanWithMutation == nil || !ca.knobs.FilterSpanWithMutation(&resolved) { return ca.noteResolvedSpan(resolved) } case kvevent.TypeFlush: - return ca.sink.Flush(ca.Ctx) + return ca.sink.Flush(ca.Ctx()) } return nil @@ -568,7 +568,7 @@ func (ca *changeAggregator) flushFrontier() error { // otherwise, we could lose buffered messages and violate the // at-least-once guarantee. This is also true for checkpointing the // resolved spans in the job progress. - if err := ca.sink.Flush(ca.Ctx); err != nil { + if err := ca.sink.Flush(ca.Ctx()); err != nil { return err } @@ -996,8 +996,8 @@ func (cf *changeFrontier) close() { // Best effort: context is often canceled by now, so we expect to see an error _ = cf.sink.Close() } - cf.memAcc.Close(cf.Ctx) - cf.MemMonitor.Stop(cf.Ctx) + cf.memAcc.Close(cf.Ctx()) + cf.MemMonitor.Stop(cf.Ctx()) } } @@ -1104,7 +1104,7 @@ func (cf *changeFrontier) noteAggregatorProgress(d rowenc.EncDatum) error { // job progress update closure, but it currently doesn't pass along the info // we'd need to do it that way. if !resolved.Timestamp.IsEmpty() && resolved.Timestamp.Less(cf.highWaterAtStart) { - logcrash.ReportOrPanic(cf.Ctx, &cf.flowCtx.Cfg.Settings.SV, + logcrash.ReportOrPanic(cf.Ctx(), &cf.flowCtx.Cfg.Settings.SV, `got a span level timestamp %s for %s that is less than the initial high-water %s`, redact.Safe(resolved.Timestamp), resolved.Span, redact.Safe(cf.highWaterAtStart)) continue @@ -1206,7 +1206,7 @@ func (cf *changeFrontier) maybeCheckpointJob( if err != nil { return false, err } - cf.js.checkpointCompleted(cf.Ctx, timeutil.Since(checkpointStart)) + cf.js.checkpointCompleted(cf.Ctx(), timeutil.Since(checkpointStart)) return updated, nil } @@ -1224,7 +1224,7 @@ func (cf *changeFrontier) checkpointJobProgress( var updateSkipped error if cf.js.job != nil { - if err := cf.js.job.Update(cf.Ctx, nil, func( + if err := cf.js.job.Update(cf.Ctx(), nil, func( txn *kv.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater, ) error { // If we're unable to update the job due to the job state, such as during @@ -1249,8 +1249,8 @@ func (cf *changeFrontier) checkpointJobProgress( if !changefeedbase.ActiveProtectedTimestampsEnabled.Get(&cf.flowCtx.Cfg.Settings.SV) { timestampManager = cf.deprecatedManageProtectedTimestamps } - if err := timestampManager(cf.Ctx, txn, changefeedProgress); err != nil { - log.Warningf(cf.Ctx, "error managing protected timestamp record: %v", err) + if err := timestampManager(cf.Ctx(), txn, changefeedProgress); err != nil { + log.Warningf(cf.Ctx(), "error managing protected timestamp record: %v", err) return err } @@ -1278,7 +1278,7 @@ func (cf *changeFrontier) checkpointJobProgress( } if updateSkipped != nil { - log.Warningf(cf.Ctx, "skipping changefeed checkpoint: %s", updateSkipped) + log.Warningf(cf.Ctx(), "skipping changefeed checkpoint: %s", updateSkipped) return false, nil } @@ -1377,7 +1377,7 @@ func (cf *changeFrontier) maybeEmitResolved(newResolved hlc.Timestamp)
error { if !shouldEmit { return nil } - if err := emitResolvedTimestamp(cf.Ctx, cf.encoder, cf.sink, newResolved); err != nil { + if err := emitResolvedTimestamp(cf.Ctx(), cf.encoder, cf.sink, newResolved); err != nil { return err } cf.lastEmitResolved = newResolved.GoTime() @@ -1416,13 +1416,13 @@ func (cf *changeFrontier) maybeLogBehindSpan(frontierChanged bool) { description = fmt.Sprintf("job %d", cf.spec.JobID) } if frontierChanged && cf.slowLogEveryN.ShouldProcess(now) { - log.Infof(cf.Ctx, "%s new resolved timestamp %s is behind by %s", + log.Infof(cf.Ctx(), "%s new resolved timestamp %s is behind by %s", description, frontier, resolvedBehind) } if cf.slowLogEveryN.ShouldProcess(now) { s := cf.frontier.PeekFrontierSpan() - log.Infof(cf.Ctx, "%s span %s is behind by %s", description, s, resolvedBehind) + log.Infof(cf.Ctx(), "%s span %s is behind by %s", description, s, resolvedBehind) } } diff --git a/pkg/ccl/kvccl/kvtenantccl/BUILD.bazel b/pkg/ccl/kvccl/kvtenantccl/BUILD.bazel index c2ee10a6a5ee..a76d53840aed 100644 --- a/pkg/ccl/kvccl/kvtenantccl/BUILD.bazel +++ b/pkg/ccl/kvccl/kvtenantccl/BUILD.bazel @@ -46,6 +46,7 @@ go_test( "main_test.go", "setting_overrides_test.go", "tenant_kv_test.go", + "tenant_range_lookup_test.go", "tenant_trace_test.go", "tenant_upgrade_test.go", ], @@ -59,7 +60,10 @@ go_test( "//pkg/config", "//pkg/gossip", "//pkg/jobs", + "//pkg/keys", + "//pkg/kv/kvclient/kvcoord", "//pkg/kv/kvclient/kvtenant", + "//pkg/kv/kvclient/rangecache", "//pkg/kv/kvserver", "//pkg/kv/kvserver/kvserverbase", "//pkg/roachpb", diff --git a/pkg/ccl/kvccl/kvtenantccl/connector.go b/pkg/ccl/kvccl/kvtenantccl/connector.go index 18f38c593ade..fa7bd6aa7532 100644 --- a/pkg/ccl/kvccl/kvtenantccl/connector.go +++ b/pkg/ccl/kvccl/kvtenantccl/connector.go @@ -389,10 +389,7 @@ func (c *Connector) RangeLookup( // for more discussion on the choice of ReadConsistency and its // implications. ReadConsistency: rc, - // Until we add protection in the Internal service implementation to - // prevent prefetching from traversing into RangeDescriptors owned by - // other tenants, we must disable prefetching. - PrefetchNum: 0, + PrefetchNum: kvcoord.RangeLookupPrefetchCount, PrefetchReverse: useReverseScan, }) if err != nil { diff --git a/pkg/ccl/kvccl/kvtenantccl/connector_test.go b/pkg/ccl/kvccl/kvtenantccl/connector_test.go index 444735630ef0..2b3499ad7289 100644 --- a/pkg/ccl/kvccl/kvtenantccl/connector_test.go +++ b/pkg/ccl/kvccl/kvtenantccl/connector_test.go @@ -18,6 +18,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/config" "github.com/cockroachdb/cockroach/pkg/gossip" + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvtenant" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/rpc" @@ -313,7 +314,7 @@ func TestConnectorRangeLookup(t *testing.T) { // Validate request. assert.Equal(t, roachpb.RKey("a"), req.Key) assert.Equal(t, roachpb.READ_UNCOMMITTED, req.ReadConsistency) - assert.Equal(t, int64(0), req.PrefetchNum) + assert.Equal(t, int64(kvcoord.RangeLookupPrefetchCount), req.PrefetchNum) assert.Equal(t, false, req.PrefetchReverse) // Respond. diff --git a/pkg/ccl/kvccl/kvtenantccl/tenant_range_lookup_test.go b/pkg/ccl/kvccl/kvtenantccl/tenant_range_lookup_test.go new file mode 100644 index 000000000000..b9dd06fff596 --- /dev/null +++ b/pkg/ccl/kvccl/kvtenantccl/tenant_range_lookup_test.go @@ -0,0 +1,81 @@ +// Copyright 2022 The Cockroach Authors. 
+// +// Licensed as a CockroachDB Enterprise file under the Cockroach Community +// License (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt + +package kvtenantccl_test + +import ( + "context" + "testing" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord" + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangecache" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/stretchr/testify/require" +) + +// TestRangeLookupPrefetchFiltering is an integration test to ensure that +// range results are filtered for the client. +func TestRangeLookupPrefetchFiltering(t *testing.T) { + defer leaktest.AfterTest(t)() + + ctx := context.Background() + tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{ + ServerArgs: base.TestServerArgs{ + DisableDefaultTestTenant: true, // we're going to manually add tenants + Knobs: base.TestingKnobs{ + Store: &kvserver.StoreTestingKnobs{ + DisableMergeQueue: true, + }, + }, + }, + }) + defer tc.Stopper().Stop(ctx) + + ten2ID := roachpb.MustMakeTenantID(2) + tenant2, err := tc.Server(0).StartTenant(ctx, base.TestTenantArgs{ + TenantID: ten2ID, + }) + require.NoError(t, err) + + // Split some ranges within tenant2 that we'll want to see in prefetch. + ten2Codec := keys.MakeSQLCodec(ten2ID) + ten2Split1 := append(ten2Codec.TenantPrefix(), 'a') + ten2Split2 := append(ten2Codec.TenantPrefix(), 'b') + { + tc.SplitRangeOrFatal(t, ten2Split1) + tc.SplitRangeOrFatal(t, ten2Split2) + } + + // Split some ranges for the tenant which comes after tenant2. + { + ten3Codec := keys.MakeSQLCodec(roachpb.MustMakeTenantID(3)) + tc.SplitRangeOrFatal(t, ten3Codec.TenantPrefix()) + tc.SplitRangeOrFatal(t, append(ten3Codec.TenantPrefix(), 'b')) + tc.SplitRangeOrFatal(t, append(ten3Codec.TenantPrefix(), 'c')) + } + + // Do the fetch and make sure we prefetch all the ranges we should see, + // and none of the ranges we should not. + db := tenant2.DistSenderI().(*kvcoord.DistSender).RangeDescriptorCache().DB() + prefixRKey := keys.MustAddr(ten2Codec.TenantPrefix()) + res, prefetch, err := db.RangeLookup( + ctx, prefixRKey, + rangecache.ReadFromLeaseholder, false, /* useReverseScan */ + ) + require.NoError(t, err) + require.Len(t, res, 1) + require.Equal(t, prefixRKey, res[0].StartKey) + require.Len(t, prefetch, 2) + require.Equal(t, keys.MustAddr(ten2Split1), prefetch[0].StartKey) + require.Equal(t, keys.MustAddr(ten2Split2), prefetch[1].StartKey) +} diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_frontier_processor.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_frontier_processor.go index 5807bc10f2b5..2609c40d40b4 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_frontier_processor.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_frontier_processor.go @@ -298,13 +298,13 @@ func (sf *streamIngestionFrontier) Next() ( if err := sf.maybeUpdatePartitionProgress(); err != nil { // Updating the partition progress isn't a fatal error. 
- log.Errorf(sf.Ctx, "failed to update partition progress: %+v", err) + log.Errorf(sf.Ctx(), "failed to update partition progress: %+v", err) } // Send back a row to the job so that it can update the progress. select { - case <-sf.Ctx.Done(): - sf.MoveToDraining(sf.Ctx.Err()) + case <-sf.Ctx().Done(): + sf.MoveToDraining(sf.Ctx().Err()) return nil, sf.DrainHelper() // Send the latest persisted highwater in the heartbeat to the source cluster // as even with retries we will never request an earlier row than it, and @@ -315,7 +315,7 @@ func (sf *streamIngestionFrontier) Next() ( case <-sf.heartbeatSender.stoppedChan: err := sf.heartbeatSender.wait() if err != nil { - log.Errorf(sf.Ctx, "heartbeat sender exited with error: %s", err) + log.Errorf(sf.Ctx(), "heartbeat sender exited with error: %s", err) } sf.MoveToDraining(err) return nil, sf.DrainHelper() @@ -326,7 +326,7 @@ func (sf *streamIngestionFrontier) Next() ( func (sf *streamIngestionFrontier) close() { if err := sf.heartbeatSender.stop(); err != nil { - log.Errorf(sf.Ctx, "heartbeat sender exited with error: %s", err) + log.Errorf(sf.Ctx(), "heartbeat sender exited with error: %s", err) } if sf.InternalClose() { sf.metrics.RunningCount.Dec(1) @@ -392,7 +392,7 @@ func (sf *streamIngestionFrontier) noteResolvedTimestamps( // maybeUpdatePartitionProgress polls the frontier and updates the job progress with // partition-specific information to track the status of each partition. func (sf *streamIngestionFrontier) maybeUpdatePartitionProgress() error { - ctx := sf.Ctx + ctx := sf.Ctx() updateFreq := JobCheckpointFrequency.Get(&sf.flowCtx.Cfg.Settings.SV) if updateFreq == 0 || timeutil.Since(sf.lastPartitionUpdate) < updateFreq { return nil diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go index 852914bf54e6..22aba9cbca3c 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go @@ -405,10 +405,10 @@ func (sip *streamIngestionProcessor) close() { } for _, client := range sip.streamPartitionClients { - _ = client.Close(sip.Ctx) + _ = client.Close(sip.Ctx()) } if sip.batcher != nil { - sip.batcher.Close(sip.Ctx) + sip.batcher.Close(sip.Ctx()) } if sip.maxFlushRateTimer != nil { sip.maxFlushRateTimer.Stop() @@ -539,7 +539,7 @@ func (sip *streamIngestionProcessor) consumeEvents() (*jobspb.ResolvedSpans, err if streamingKnobs, ok := sip.FlowCtx.TestingKnobs().StreamingTestingKnobs.(*sql.StreamingTestingKnobs); ok { if streamingKnobs != nil && streamingKnobs.RunAfterReceivingEvent != nil { - if err := streamingKnobs.RunAfterReceivingEvent(sip.Ctx); err != nil { + if err := streamingKnobs.RunAfterReceivingEvent(sip.Ctx()); err != nil { return nil, err } } @@ -603,7 +603,7 @@ func (sip *streamIngestionProcessor) bufferSST(sst *roachpb.RangeFeedSSTable) er // careful with checkpoints: we can only send checkpoint whose TS >= SST batch TS // after the full SST gets ingested. 
- _, sp := tracing.ChildSpan(sip.Ctx, "stream-ingestion-buffer-sst") + _, sp := tracing.ChildSpan(sip.Ctx(), "stream-ingestion-buffer-sst") defer sp.Finish() return streamingccl.ScanSST(sst, sst.Span, func(keyVal storage.MVCCKeyValue) error { @@ -653,7 +653,7 @@ func (sip *streamIngestionProcessor) bufferDelRange(delRange *roachpb.RangeFeedD func (sip *streamIngestionProcessor) bufferRangeKeyVal( rangeKeyVal storage.MVCCRangeKeyValue, ) error { - _, sp := tracing.ChildSpan(sip.Ctx, "stream-ingestion-buffer-range-key") + _, sp := tracing.ChildSpan(sip.Ctx(), "stream-ingestion-buffer-range-key") defer sp.Finish() var err error @@ -791,7 +791,7 @@ func (r *rangeKeyBatcher) reset() { } func (sip *streamIngestionProcessor) flush() (*jobspb.ResolvedSpans, error) { - ctx, sp := tracing.ChildSpan(sip.Ctx, "stream-ingestion-flush") + ctx, sp := tracing.ChildSpan(sip.Ctx(), "stream-ingestion-flush") defer sp.Finish() flushedCheckpoints := jobspb.ResolvedSpans{ResolvedSpans: make([]jobspb.ResolvedSpan, 0)} diff --git a/pkg/kv/kvclient/kvcoord/dist_sender.go b/pkg/kv/kvclient/kvcoord/dist_sender.go index 02bb3398e515..8587a21bec57 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender.go @@ -190,8 +190,9 @@ var CanSendToFollower = func( const ( // The default limit for asynchronous senders. defaultSenderConcurrency = 1024 - // The maximum number of range descriptors to prefetch during range lookups. - rangeLookupPrefetchCount = 8 + // RangeLookupPrefetchCount is the maximum number of range descriptors to prefetch + // during range lookups. + RangeLookupPrefetchCount = 8 // The maximum number of times a replica is retried when it repeatedly returns // stale lease info. sameReplicaRetryLimit = 10 @@ -560,7 +561,7 @@ func (ds *DistSender) RangeLookup( // RangeDescriptor is not on the first range we send the lookup to, we'll // still find it when we scan to the next range. This addresses the issue // described in #18032 and #16266, allowing us to support meta2 splits. - return kv.RangeLookup(ctx, ds, key.AsRawKey(), rc, rangeLookupPrefetchCount, useReverseScan) + return kv.RangeLookup(ctx, ds, key.AsRawKey(), rc, RangeLookupPrefetchCount, useReverseScan) } // FirstRange implements the RangeDescriptorDB interface. diff --git a/pkg/server/BUILD.bazel b/pkg/server/BUILD.bazel index 5fcb0450cb03..143b1364e43d 100644 --- a/pkg/server/BUILD.bazel +++ b/pkg/server/BUILD.bazel @@ -390,6 +390,7 @@ go_test( "status_ext_test.go", "status_test.go", "sticky_engine_test.go", + "tenant_range_lookup_test.go", "testserver_test.go", "user_test.go", "version_cluster_test.go", @@ -462,6 +463,7 @@ go_test( "//pkg/upgrade/upgrades", "//pkg/util", "//pkg/util/admission", + "//pkg/util/encoding", "//pkg/util/envutil", "//pkg/util/grpcutil", "//pkg/util/hlc", diff --git a/pkg/server/node.go b/pkg/server/node.go index c38364f7acad..dc687088da23 100644 --- a/pkg/server/node.go +++ b/pkg/server/node.go @@ -1298,6 +1298,40 @@ func setupSpanForIncomingRPC( } } +func tenantPrefix(tenID roachpb.TenantID) roachpb.RSpan { + // TODO(nvanbenschoten): consider caching this span. + prefix := roachpb.RKey(keys.MakeTenantPrefix(tenID)) + return roachpb.RSpan{ + Key: prefix, + EndKey: prefix.PrefixEnd(), + } +} + +// filterRangeLookupResponseForTenant extracts the tenant ID from the context. +// It filters descs down to the prefix of descriptors whose start keys lie in +// the tenant's span. If there is no tenant in the context, it filters out all +// of the descriptors.
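+// Filtering the prefetched descriptors here is what makes it safe to allow
+// prefetching from tenants at all: without it, a prefetch could traverse
+// into RangeDescriptors owned by other tenants.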
+func filterRangeLookupResponseForTenant( + ctx context.Context, descs []roachpb.RangeDescriptor, +) []roachpb.RangeDescriptor { + tenID, ok := roachpb.TenantFromContext(ctx) + if !ok { + // If we do not know the tenant, don't permit any pre-fetching. + return []roachpb.RangeDescriptor{} + } + rs := tenantPrefix(tenID) + truncated := descs[:0] + // We say that any range which has a start key within the tenant prefix is + // fair game for the tenant to know about. + for _, d := range descs { + if !rs.ContainsKey(d.StartKey) { + break + } + truncated = append(truncated, d) + } + return truncated +} + // RangeLookup implements the roachpb.InternalServer interface. func (n *Node) RangeLookup( ctx context.Context, req *roachpb.RangeLookupRequest, @@ -1324,7 +1358,7 @@ func (n *Node) RangeLookup( resp.Error = roachpb.NewError(err) } else { resp.Descriptors = rs - resp.PrefetchedDescriptors = preRs + resp.PrefetchedDescriptors = filterRangeLookupResponseForTenant(ctx, preRs) } return resp, nil } diff --git a/pkg/server/tenant_range_lookup_test.go b/pkg/server/tenant_range_lookup_test.go new file mode 100644 index 000000000000..d8d6c51eaf11 --- /dev/null +++ b/pkg/server/tenant_range_lookup_test.go @@ -0,0 +1,118 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package server + +import ( + "context" + "testing" + + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/util/encoding" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/stretchr/testify/require" +) + +// TestFilterRangeLookupResponseForTenant unit tests the logic that filters +// RangeLookup responses served to the tenant connector. +func TestFilterRangeLookupResponseForTenant(t *testing.T) { + defer leaktest.AfterTest(t)() + + ctx := context.Background() + mkKey := func(tenant uint64, str string) roachpb.RKey { + codec := keys.MakeSQLCodec(roachpb.MustMakeTenantID(tenant)) + return encoding.EncodeStringAscending(codec.TenantPrefix(), str) + } + mkRangeDescriptor := func(start, end roachpb.RKey) roachpb.RangeDescriptor { + return *roachpb.NewRangeDescriptor(1, start, end, roachpb.MakeReplicaSet(nil)) + } + for _, tc := range []struct { + name string + id roachpb.TenantID + descs []roachpb.RangeDescriptor + exp int + skipTenantContext bool + }{ + // tenant 1, the "system tenant", can see everything. + { + name: "tenant 1 is special", + id: roachpb.MustMakeTenantID(1), + descs: []roachpb.RangeDescriptor{ + mkRangeDescriptor(mkKey(1, "a"), mkKey(1, "b")), + mkRangeDescriptor(mkKey(1, "b"), mkKey(1, "z")), + mkRangeDescriptor(mkKey(2, "z"), mkKey(2, "")), + mkRangeDescriptor(mkKey(2, ""), mkKey(2, "a")), + mkRangeDescriptor(mkKey(2, "a"), mkKey(3, "")), + mkRangeDescriptor(mkKey(3, ""), mkKey(4, "")), + }, + exp: 6, + }, + // tenant 2 is a normal secondary tenant and can only see its own data.
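+ // Note that filtering is by start key: a range that starts inside the
+ // tenant's span but extends into the next tenant's keyspace is still
+ // visible, while a range that starts at the next tenant's prefix is not.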
+ { + name: "filter to tenant data", + id: roachpb.MustMakeTenantID(2), + descs: []roachpb.RangeDescriptor{ + mkRangeDescriptor(mkKey(2, "a"), mkKey(2, "b")), + mkRangeDescriptor(mkKey(2, "b"), mkKey(2, "z")), + mkRangeDescriptor(mkKey(2, "z"), mkKey(3, "")), + mkRangeDescriptor(mkKey(3, ""), mkKey(3, "a")), + }, + exp: 3, + }, + // tenant 2 is a normal secondary tenant and can only see its own data, + // but this includes the case where the range overlaps with multiple + // tenants. + { + name: "filter to tenant data even though range crosses tenants", + id: roachpb.MustMakeTenantID(2), + descs: []roachpb.RangeDescriptor{ + mkRangeDescriptor(mkKey(2, "a"), mkKey(2, "b")), + mkRangeDescriptor(mkKey(2, "b"), mkKey(2, "z")), + mkRangeDescriptor(mkKey(2, "z"), mkKey(4, "")), + mkRangeDescriptor(mkKey(4, ""), mkKey(4, "a")), + }, + exp: 3, + }, + // If there is no tenant ID in the context, no results should be + // returned. + { + id: roachpb.MustMakeTenantID(2), + descs: []roachpb.RangeDescriptor{ + mkRangeDescriptor(mkKey(2, "a"), mkKey(2, "b")), + mkRangeDescriptor(mkKey(2, "b"), mkKey(2, "z")), + mkRangeDescriptor(mkKey(2, "z"), mkKey(3, "")), + mkRangeDescriptor(mkKey(3, ""), mkKey(3, "a")), + }, + skipTenantContext: true, + exp: 0, + }, + // Other code should prevent a request that might return descriptors from + // another tenant; however, defensively this code should also filter them. + { + id: roachpb.MustMakeTenantID(3), + descs: []roachpb.RangeDescriptor{ + mkRangeDescriptor(mkKey(2, "a"), mkKey(2, "b")), + mkRangeDescriptor(mkKey(2, "b"), mkKey(2, "z")), + mkRangeDescriptor(mkKey(2, "z"), mkKey(3, "")), + mkRangeDescriptor(mkKey(3, ""), mkKey(3, "a")), + }, + exp: 0, + }, + } { + tenantCtx := ctx + if !tc.skipTenantContext { + tenantCtx = roachpb.NewContextForTenant(ctx, tc.id) + } + got := filterRangeLookupResponseForTenant(tenantCtx, tc.descs) + require.Len(t, got, tc.exp) + require.Equal(t, tc.descs[:tc.exp], got) + } +} diff --git a/pkg/sql/colexec/columnarizer.go b/pkg/sql/colexec/columnarizer.go index 16f055548d21..7c93df68499f 100644 --- a/pkg/sql/colexec/columnarizer.go +++ b/pkg/sql/colexec/columnarizer.go @@ -58,6 +58,7 @@ type Columnarizer struct { colexecop.NonExplainable mode columnarizerMode + initialized bool helper colmem.SetAccountingHelper metadataAllocator *colmem.Allocator input execinfra.RowSource @@ -177,13 +178,10 @@ func newColumnarizer( // Init is part of the colexecop.Operator interface. func (c *Columnarizer) Init(ctx context.Context) { - if c.removedFromFlow { - return - } - if c.Ctx != nil { - // Init has already been called. + if c.removedFromFlow || c.initialized { return } + c.initialized = true c.accumulatedMeta = make([]execinfrapb.ProducerMetadata, 0, 1) ctx = c.StartInternal(ctx, "columnarizer" /* name */) c.input.Start(ctx) @@ -279,7 +277,7 @@ func (c *Columnarizer) DrainMeta() []execinfrapb.ProducerMetadata { // When this method returns, we no longer will have the reference to the // metadata, so we can release all memory from the metadata allocator. defer c.metadataAllocator.ReleaseAll() - if c.Ctx == nil { + if !c.initialized { // The columnarizer wasn't initialized, so the wrapped processors might // not have been started, leaving them in an unsafe to drain state, so // we skip the draining.
Most likely this happened because a panic was @@ -303,13 +301,6 @@ func (c *Columnarizer) Close(context.Context) error { return nil } c.helper.Release() - if c.Ctx == nil { - // The columnarizer wasn't initialized, so the wrapped processors might - // not have been started leaving them in a state unsafe for the - // InternalClose, so we skip that. Mostly likely this happened because a - // panic was encountered in Init. - return nil - } c.InternalClose() return nil } @@ -317,7 +308,7 @@ func (c *Columnarizer) trailingMetaCallback() []execinfrapb.ProducerMetadata { // Close will call InternalClose(). Note that we don't return any trailing // metadata here because the columnarizers propagate it in DrainMeta. - if err := c.Close(c.Ctx); buildutil.CrdbTestBuild && err != nil { + if err := c.Close(c.Ctx()); buildutil.CrdbTestBuild && err != nil { // Close never returns an error. colexecerror.InternalError(errors.NewAssertionErrorWithWrappedErrf(err, "unexpected error from Columnarizer.Close")) } diff --git a/pkg/sql/colexec/materializer.go b/pkg/sql/colexec/materializer.go index 9051b919bb00..449da21e879c 100644 --- a/pkg/sql/colexec/materializer.go +++ b/pkg/sql/colexec/materializer.go @@ -335,18 +335,10 @@ func (m *Materializer) close() { if m.Closed { return } - if m.Ctx == nil { - // In some edge cases (like when Init of an operator above this - // materializer encounters a panic), the materializer might never be - // started, yet it still will attempt to close its Closers. This - // context is only used for logging purposes, so it is ok to grab - // the background context in order to prevent a NPE below. - m.Ctx = context.Background() - } // Make sure to call InternalClose() only after closing the closers - this // allows the closers to utilize the unfinished tracing span (if tracing is // enabled). - m.closers.CloseAndLogOnErr(m.Ctx, "materializer") + m.closers.CloseAndLogOnErr(m.Ctx(), "materializer") m.InternalClose() } diff --git a/pkg/sql/colflow/vectorized_flow.go b/pkg/sql/colflow/vectorized_flow.go index 881ad639c40d..3f113e38666e 100644 --- a/pkg/sql/colflow/vectorized_flow.go +++ b/pkg/sql/colflow/vectorized_flow.go @@ -384,18 +384,17 @@ func (f *vectorizedFlow) Cleanup(ctx context.Context) { // This cleans up all the memory and disk monitoring of the vectorized flow. f.creator.cleanup(ctx) - // TODO(yuzefovich): uncomment this once the assertion is no longer flaky. -//if buildutil.CrdbTestBuild && f.FlowBase.Started() && !f.FlowCtx.EvalCtx.SessionData().TestingVectorizeInjectPanics { -// // Check that all closers have been closed. Note that we don't check -// // this in case the flow was never started in the first place (it is ok -// // to not check this since closers haven't allocated any resources in -// // such a case). We also don't check when the panic injection is -// // enabled since then Close() might be legitimately not called (if a -// // panic is injected in Init() of the wrapped operator). -// if numClosed := atomic.LoadInt32(f.testingInfo.numClosed); numClosed != f.testingInfo.numClosers { -// colexecerror.InternalError(errors.AssertionFailedf("expected %d components to be closed, but found that only %d were", f.testingInfo.numClosers, numClosed)) -// } -//} + if buildutil.CrdbTestBuild && f.FlowBase.Started() && !f.FlowCtx.EvalCtx.SessionData().TestingVectorizeInjectPanics { + // Check that all closers have been closed.
Note that we don't check + // this in case the flow was never started in the first place (it is ok + // to not check this since closers haven't allocated any resources in + // such a case). We also don't check when the panic injection is + // enabled since then Close() might be legitimately not called (if a + // panic is injected in Init() of the wrapped operator). + if numClosed := atomic.LoadInt32(f.testingInfo.numClosed); numClosed != f.testingInfo.numClosers { + colexecerror.InternalError(errors.AssertionFailedf("expected %d components to be closed, but found that only %d were", f.testingInfo.numClosers, numClosed)) + } + } f.tempStorage.Lock() created := f.tempStorage.path != "" diff --git a/pkg/sql/execinfra/processorsbase.go b/pkg/sql/execinfra/processorsbase.go index de0a709dbbe4..0f179a064521 100644 --- a/pkg/sql/execinfra/processorsbase.go +++ b/pkg/sql/execinfra/processorsbase.go @@ -353,10 +353,10 @@ type ProcessorBaseNoHelper struct { // has been closed. Closed bool - // Ctx and span contain the tracing state while the processor is active + // ctx and span contain the tracing state while the processor is active // (i.e. hasn't been closed). Initialized using flowCtx.Ctx (which should not be otherwise // used). - Ctx context.Context + ctx context.Context span *tracing.Span // origCtx is the context from which ctx was derived. InternalClose() resets // ctx to this. @@ -517,7 +517,7 @@ func (pb *ProcessorBaseNoHelper) MoveToDraining(err error) { // not permitted. if err != nil { logcrash.ReportOrPanic( - pb.Ctx, + pb.Ctx(), &pb.FlowCtx.Cfg.Settings.SV, "MoveToDraining called in state %s with err: %+v", pb.State, err) @@ -547,7 +547,7 @@ func (pb *ProcessorBaseNoHelper) MoveToDraining(err error) { func (pb *ProcessorBaseNoHelper) DrainHelper() *execinfrapb.ProducerMetadata { if pb.State == StateRunning { logcrash.ReportOrPanic( - pb.Ctx, + pb.Ctx(), &pb.FlowCtx.Cfg.Settings.SV, "drain helper called in StateRunning", ) @@ -658,7 +658,7 @@ func (pb *ProcessorBase) HijackExecStatsForTrace() func() *execinfrapb.Component func (pb *ProcessorBaseNoHelper) moveToTrailingMeta() { if pb.State == StateTrailingMeta || pb.State == StateExhausted { logcrash.ReportOrPanic( - pb.Ctx, + pb.Ctx(), &pb.FlowCtx.Cfg.Settings.SV, "moveToTrailingMeta called in state: %s", pb.State, @@ -681,10 +681,10 @@ func (pb *ProcessorBaseNoHelper) moveToTrailingMeta() { } } - if buildutil.CrdbTestBuild && pb.Ctx == nil { + if buildutil.CrdbTestBuild && pb.ctx == nil { panic( errors.AssertionFailedf( - "unexpected nil ProcessorBase.Ctx when draining. Was StartInternal called?", + "unexpected nil ProcessorBase.ctx when draining. Was StartInternal called?", ), ) } @@ -708,7 +708,7 @@ func (pb *ProcessorBaseNoHelper) moveToTrailingMeta() { // should continue processing other rows, with the awareness that the processor // might have been transitioned to the draining phase. 
func (pb *ProcessorBase) ProcessRowHelper(row rowenc.EncDatumRow) rowenc.EncDatumRow { - outRow, ok, err := pb.OutputHelper.ProcessRow(pb.Ctx, row) + outRow, ok, err := pb.OutputHelper.ProcessRow(pb.Ctx(), row) if err != nil { pb.MoveToDraining(err) return nil @@ -730,7 +730,7 @@ func (pb *ProcessorBaseNoHelper) Run(ctx context.Context) { panic("processor output is not set for emitting rows") } pb.self.Start(ctx) - Run(pb.Ctx, pb.self, pb.Output) + Run(pb.ctx, pb.self, pb.Output) } // ProcStateOpts contains fields used by the ProcessorBase's family of functions @@ -857,18 +857,27 @@ func ProcessorSpan(ctx context.Context, name string) (context.Context, *tracing. // so that the caller doesn't mistakenly use old ctx object. func (pb *ProcessorBaseNoHelper) StartInternal(ctx context.Context, name string) context.Context { pb.origCtx = ctx - pb.Ctx = ctx + pb.ctx = ctx noSpan := pb.FlowCtx != nil && pb.FlowCtx.Cfg != nil && pb.FlowCtx.Cfg.TestingKnobs.ProcessorNoTracingSpan if !noSpan { - pb.Ctx, pb.span = ProcessorSpan(ctx, name) + pb.ctx, pb.span = ProcessorSpan(ctx, name) if pb.span != nil && pb.span.IsVerbose() { pb.span.SetTag(execinfrapb.FlowIDTagKey, attribute.StringValue(pb.FlowCtx.ID.String())) pb.span.SetTag(execinfrapb.ProcessorIDTagKey, attribute.IntValue(int(pb.ProcessorID))) } } - pb.evalOrigCtx = pb.EvalCtx.SetDeprecatedContext(pb.Ctx) - return pb.Ctx + pb.evalOrigCtx = pb.EvalCtx.SetDeprecatedContext(pb.ctx) + return pb.ctx +} + +// Ctx is an accessor method for ctx which is guaranteed to return non-nil +// context even if StartInternal() hasn't been called. +func (pb *ProcessorBaseNoHelper) Ctx() context.Context { + if pb.ctx == nil { + return context.Background() + } + return pb.ctx } // InternalClose helps processors implement the RowSource interface, performing @@ -897,7 +906,7 @@ func (pb *ProcessorBaseNoHelper) InternalClose() bool { pb.span = nil // Reset the context so that any incidental uses after this point do not // access the finished span. - pb.Ctx = pb.origCtx + pb.ctx = pb.origCtx pb.EvalCtx.SetDeprecatedContext(pb.evalOrigCtx) return true } diff --git a/pkg/sql/plan_node_to_row_source.go b/pkg/sql/plan_node_to_row_source.go index a12c4823faa2..dcd3c6ddfe69 100644 --- a/pkg/sql/plan_node_to_row_source.go +++ b/pkg/sql/plan_node_to_row_source.go @@ -256,7 +256,7 @@ func (p *planNodeToRowSource) execStatsForTrace() *execinfrapb.ComponentStats { // Propagate RUs from IO requests. // TODO(drewk): we should consider propagating other stats for planNode // operators. - scanStats := execstats.GetScanStats(p.Ctx, p.ExecStatsTrace) + scanStats := execstats.GetScanStats(p.Ctx(), p.ExecStatsTrace) if scanStats.ConsumedRU == 0 { return nil } diff --git a/pkg/sql/rowexec/aggregator.go b/pkg/sql/rowexec/aggregator.go index 4ac11602237e..5952a29c4d23 100644 --- a/pkg/sql/rowexec/aggregator.go +++ b/pkg/sql/rowexec/aggregator.go @@ -349,17 +349,17 @@ func (ag *aggregatorBase) start(ctx context.Context, procName string) { func (ag *hashAggregator) close() { if ag.InternalClose() { - log.VEventf(ag.Ctx, 2, "exiting aggregator") + log.VEventf(ag.Ctx(), 2, "exiting aggregator") // If we have started emitting rows, bucketsIter will represent which // buckets are still open, since buckets are closed once their results are // emitted. 
if ag.bucketsIter == nil { for _, bucket := range ag.buckets { - bucket.close(ag.Ctx) + bucket.close(ag.Ctx()) } } else { for _, bucket := range ag.bucketsIter { - ag.buckets[bucket].close(ag.Ctx) + ag.buckets[bucket].close(ag.Ctx()) } } // Make sure to release any remaining memory under 'buckets'. @@ -368,25 +368,25 @@ func (ag *hashAggregator) close() { // buckets since the latter might be releasing some precisely tracked // memory, and if we were to close the accounts first, there would be // no memory to release for the buckets. - ag.bucketsAcc.Close(ag.Ctx) - ag.aggFuncsAcc.Close(ag.Ctx) - ag.MemMonitor.Stop(ag.Ctx) + ag.bucketsAcc.Close(ag.Ctx()) + ag.aggFuncsAcc.Close(ag.Ctx()) + ag.MemMonitor.Stop(ag.Ctx()) } } func (ag *orderedAggregator) close() { if ag.InternalClose() { - log.VEventf(ag.Ctx, 2, "exiting aggregator") + log.VEventf(ag.Ctx(), 2, "exiting aggregator") if ag.bucket != nil { - ag.bucket.close(ag.Ctx) + ag.bucket.close(ag.Ctx()) } // Note that we should be closing accounts only after closing the // bucket since the latter might be releasing some precisely tracked // memory, and if we were to close the accounts first, there would be // no memory to release for the bucket. - ag.bucketsAcc.Close(ag.Ctx) - ag.aggFuncsAcc.Close(ag.Ctx) - ag.MemMonitor.Stop(ag.Ctx) + ag.bucketsAcc.Close(ag.Ctx()) + ag.aggFuncsAcc.Close(ag.Ctx()) + ag.MemMonitor.Stop(ag.Ctx()) } } @@ -424,7 +424,7 @@ func (ag *hashAggregator) accumulateRows() ( return aggAccumulating, nil, meta } if row == nil { - log.VEvent(ag.Ctx, 1, "accumulation complete") + log.VEvent(ag.Ctx(), 1, "accumulation complete") ag.inputDone = true break } @@ -461,7 +461,7 @@ func (ag *hashAggregator) accumulateRows() ( // Note that, for simplicity, we're ignoring the overhead of the slice of // strings. - if err := ag.bucketsAcc.Grow(ag.Ctx, int64(len(ag.buckets))*memsize.String); err != nil { + if err := ag.bucketsAcc.Grow(ag.Ctx(), int64(len(ag.buckets))*memsize.String); err != nil { ag.MoveToDraining(err) return aggStateUnknown, nil, nil } @@ -493,7 +493,7 @@ func (ag *orderedAggregator) accumulateRows() ( return aggAccumulating, nil, meta } if row == nil { - log.VEvent(ag.Ctx, 1, "accumulation complete") + log.VEvent(ag.Ctx(), 1, "accumulation complete") ag.inputDone = true break } @@ -537,7 +537,7 @@ func (ag *orderedAggregator) accumulateRows() ( func (ag *aggregatorBase) getAggResults( bucket aggregateFuncs, ) (aggregatorState, rowenc.EncDatumRow, *execinfrapb.ProducerMetadata) { - defer bucket.close(ag.Ctx) + defer bucket.close(ag.Ctx()) for i, b := range bucket { result, err := b.Result() @@ -583,16 +583,16 @@ func (ag *hashAggregator) emitRow() ( // the columns specified by ag.orderedGroupCols, so we need to continue // accumulating the remaining rows. - if err := ag.arena.UnsafeReset(ag.Ctx); err != nil { + if err := ag.arena.UnsafeReset(ag.Ctx()); err != nil { ag.MoveToDraining(err) return aggStateUnknown, nil, nil } // Before we create a new 'buckets' map below, we need to "release" the // already accounted for memory of the current map. - ag.bucketsAcc.Shrink(ag.Ctx, int64(ag.alreadyAccountedFor)*memsize.MapEntryOverhead) + ag.bucketsAcc.Shrink(ag.Ctx(), int64(ag.alreadyAccountedFor)*memsize.MapEntryOverhead) // Note that, for simplicity, we're ignoring the overhead of the slice of // strings. 
- ag.bucketsAcc.Shrink(ag.Ctx, int64(len(ag.buckets))*memsize.String) + ag.bucketsAcc.Shrink(ag.Ctx(), int64(len(ag.buckets))*memsize.String) ag.bucketsIter = nil ag.buckets = make(map[string]aggregateFuncs) ag.bucketsLenGrowThreshold = hashAggregatorBucketsInitialLen @@ -659,7 +659,7 @@ func (ag *orderedAggregator) emitRow() ( // the columns specified by ag.orderedGroupCols, so we need to continue // accumulating the remaining rows. - if err := ag.arena.UnsafeReset(ag.Ctx); err != nil { + if err := ag.arena.UnsafeReset(ag.Ctx()); err != nil { ag.MoveToDraining(err) return aggStateUnknown, nil, nil } @@ -697,7 +697,7 @@ func (ag *hashAggregator) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetad case aggEmittingRows: ag.runningState, row, meta = ag.emitRow() default: - log.Fatalf(ag.Ctx, "unsupported state: %d", ag.runningState) + log.Fatalf(ag.Ctx(), "unsupported state: %d", ag.runningState) } if row == nil && meta == nil { @@ -719,7 +719,7 @@ func (ag *orderedAggregator) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMe case aggEmittingRows: ag.runningState, row, meta = ag.emitRow() default: - log.Fatalf(ag.Ctx, "unsupported state: %d", ag.runningState) + log.Fatalf(ag.Ctx(), "unsupported state: %d", ag.runningState) } if row == nil && meta == nil { @@ -784,7 +784,7 @@ func (ag *aggregatorBase) accumulateRowIntoBucket( canAdd := true if a.Distinct { canAdd, err = ag.funcs[i].isDistinct( - ag.Ctx, + ag.Ctx(), &ag.datumAlloc, groupKey, firstArg, @@ -797,7 +797,7 @@ func (ag *aggregatorBase) accumulateRowIntoBucket( if !canAdd { continue } - if err := bucket[i].Add(ag.Ctx, firstArg, otherArgs...); err != nil { + if err := bucket[i].Add(ag.Ctx(), firstArg, otherArgs...); err != nil { return err } } @@ -815,7 +815,7 @@ func (ag *hashAggregator) encode( // used by the aggregate functions (and accounted for accordingly), // this can lead to over-accounting which is acceptable. 
appendTo, err = row[colIdx].Fingerprint( - ag.Ctx, ag.inputTypes[colIdx], &ag.datumAlloc, appendTo, &ag.bucketsAcc, + ag.Ctx(), ag.inputTypes[colIdx], &ag.datumAlloc, appendTo, &ag.bucketsAcc, ) if err != nil { return appendTo, err @@ -841,7 +841,7 @@ func (ag *hashAggregator) accumulateRow(row rowenc.EncDatumRow) error { bucket, ok := ag.buckets[string(encoded)] if !ok { - s, err := ag.arena.AllocBytes(ag.Ctx, encoded) + s, err := ag.arena.AllocBytes(ag.Ctx(), encoded) if err != nil { return err } @@ -852,7 +852,7 @@ func (ag *hashAggregator) accumulateRow(row rowenc.EncDatumRow) error { ag.buckets[s] = bucket if len(ag.buckets) == ag.bucketsLenGrowThreshold { toAccountFor := ag.bucketsLenGrowThreshold - ag.alreadyAccountedFor - if err := ag.bucketsAcc.Grow(ag.Ctx, int64(toAccountFor)*memsize.MapEntryOverhead); err != nil { + if err := ag.bucketsAcc.Grow(ag.Ctx(), int64(toAccountFor)*memsize.MapEntryOverhead); err != nil { return err } ag.alreadyAccountedFor = ag.bucketsLenGrowThreshold @@ -952,13 +952,13 @@ func (a *aggregateFuncHolder) isDistinct( } func (ag *aggregatorBase) createAggregateFuncs() (aggregateFuncs, error) { - if err := ag.bucketsAcc.Grow(ag.Ctx, sizeOfAggregateFuncs+sizeOfAggregateFunc*int64(len(ag.funcs))); err != nil { + if err := ag.bucketsAcc.Grow(ag.Ctx(), sizeOfAggregateFuncs+sizeOfAggregateFunc*int64(len(ag.funcs))); err != nil { return nil, err } bucket := make(aggregateFuncs, len(ag.funcs)) for i, f := range ag.funcs { agg := f.create(ag.EvalCtx, f.arguments) - if err := ag.bucketsAcc.Grow(ag.Ctx, agg.Size()); err != nil { + if err := ag.bucketsAcc.Grow(ag.Ctx(), agg.Size()); err != nil { return nil, err } bucket[i] = agg diff --git a/pkg/sql/rowexec/countrows.go b/pkg/sql/rowexec/countrows.go index a99e2e97d385..111603f47653 100644 --- a/pkg/sql/rowexec/countrows.go +++ b/pkg/sql/rowexec/countrows.go @@ -88,7 +88,7 @@ func (ag *countAggregator) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMeta if row == nil { ret := make(rowenc.EncDatumRow, 1) ret[0] = rowenc.EncDatum{Datum: tree.NewDInt(tree.DInt(ag.count))} - rendered, _, err := ag.OutputHelper.ProcessRow(ag.Ctx, ret) + rendered, _, err := ag.OutputHelper.ProcessRow(ag.Ctx(), ret) // We're done as soon as we process our one output row, so we // transition into draining state. We will, however, return non-nil // error (if such occurs during rendering) separately below. diff --git a/pkg/sql/rowexec/distinct.go b/pkg/sql/rowexec/distinct.go index 00f3c2ecf661..0050760e3f15 100644 --- a/pkg/sql/rowexec/distinct.go +++ b/pkg/sql/rowexec/distinct.go @@ -189,7 +189,7 @@ func (d *distinct) encode(appendTo []byte, row rowenc.EncDatumRow) ([]byte, erro // the references to the row (and to the newly allocated datums) // shortly, it'll likely take some time before GC reclaims that memory, // so we choose the over-accounting route to be safe. - appendTo, err = datum.Fingerprint(d.Ctx, d.types[colIdx], &d.datumAlloc, appendTo, &d.memAcc) + appendTo, err = datum.Fingerprint(d.Ctx(), d.types[colIdx], &d.datumAlloc, appendTo, &d.memAcc) if err != nil { return nil, err } @@ -212,8 +212,8 @@ func (d *distinct) encode(appendTo []byte, row rowenc.EncDatumRow) ([]byte, erro func (d *distinct) close() { if d.InternalClose() { - d.memAcc.Close(d.Ctx) - d.MemMonitor.Stop(d.Ctx) + d.memAcc.Close(d.Ctx()) + d.MemMonitor.Stop(d.Ctx()) } } @@ -258,7 +258,7 @@ func (d *distinct) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetadata) { // allocated on it, which implies that UnsafeReset() is safe to call here. 
copy(d.lastGroupKey, row) d.haveLastGroupKey = true - if err := d.arena.UnsafeReset(d.Ctx); err != nil { + if err := d.arena.UnsafeReset(d.Ctx()); err != nil { d.MoveToDraining(err) break } @@ -282,7 +282,7 @@ func (d *distinct) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetadata) { } continue } - s, err := d.arena.AllocBytes(d.Ctx, encoding) + s, err := d.arena.AllocBytes(d.Ctx(), encoding) if err != nil { d.MoveToDraining(err) break diff --git a/pkg/sql/rowexec/filterer.go b/pkg/sql/rowexec/filterer.go index 00c9bbcbc979..7d086558b8f9 100644 --- a/pkg/sql/rowexec/filterer.go +++ b/pkg/sql/rowexec/filterer.go @@ -95,7 +95,7 @@ func (f *filtererProcessor) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMet } // Perform the actual filtering. - passes, err := f.filter.EvalFilter(f.Ctx, row) + passes, err := f.filter.EvalFilter(f.Ctx(), row) if err != nil { f.MoveToDraining(err) break diff --git a/pkg/sql/rowexec/hashjoiner.go b/pkg/sql/rowexec/hashjoiner.go index 855493b12ae2..5aedd42dfab3 100644 --- a/pkg/sql/rowexec/hashjoiner.go +++ b/pkg/sql/rowexec/hashjoiner.go @@ -158,7 +158,7 @@ func newHashJoiner( } return h, h.hashTable.Init( - h.Ctx, + h.Ctx(), shouldMarkRightSide(h.joinType), h.rightSource.OutputTypes(), h.eqCols[rightSide], @@ -190,7 +190,7 @@ func (h *hashJoiner) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetadata) case hjEmittingRightUnmatched: h.runningState, row, meta = h.emitRightUnmatched() default: - log.Fatalf(h.Ctx, "unsupported state: %d", h.runningState) + log.Fatalf(h.Ctx(), "unsupported state: %d", h.runningState) } if row == nil && meta == nil { @@ -236,14 +236,14 @@ func (h *hashJoiner) build() (hashJoinerState, rowenc.EncDatumRow, *execinfrapb. return hjStateUnknown, nil, h.DrainHelper() } // If hashTable is in-memory, pre-reserve the memory needed to mark. - if err = h.hashTable.ReserveMarkMemoryMaybe(h.Ctx); err != nil { + if err = h.hashTable.ReserveMarkMemoryMaybe(h.Ctx()); err != nil { h.MoveToDraining(err) return hjStateUnknown, nil, h.DrainHelper() } return hjReadingLeftSide, nil, nil } - err = h.hashTable.AddRow(h.Ctx, row) + err = h.hashTable.AddRow(h.Ctx(), row) // Regardless of the underlying row container (disk backed or in-memory // only), we cannot do anything about an error if it occurs. if err != nil { @@ -277,7 +277,7 @@ func (h *hashJoiner) readLeftSide() ( // hjEmittingRightUnmatched if unmatched rows on the right side need to // be emitted, otherwise finish. 
if shouldEmitUnmatchedRow(rightSide, h.joinType) { - i := h.hashTable.NewUnmarkedIterator(h.Ctx) + i := h.hashTable.NewUnmarkedIterator(h.Ctx()) i.Rewind() h.emittingRightUnmatchedState.iter = i return hjEmittingRightUnmatched, nil, nil @@ -291,14 +291,14 @@ func (h *hashJoiner) readLeftSide() ( h.probingRowState.row = row h.probingRowState.matched = false if h.probingRowState.iter == nil { - i, err := h.hashTable.NewBucketIterator(h.Ctx, row, h.eqCols[leftSide]) + i, err := h.hashTable.NewBucketIterator(h.Ctx(), row, h.eqCols[leftSide]) if err != nil { h.MoveToDraining(err) return hjStateUnknown, nil, h.DrainHelper() } h.probingRowState.iter = i } else { - if err := h.probingRowState.iter.Reset(h.Ctx, row); err != nil { + if err := h.probingRowState.iter.Reset(h.Ctx(), row); err != nil { h.MoveToDraining(err) return hjStateUnknown, nil, h.DrainHelper() } @@ -359,7 +359,7 @@ func (h *hashJoiner) probeRow() ( h.probingRowState.matched = true shouldEmit := true if shouldMarkRightSide(h.joinType) { - if i.IsMarked(h.Ctx) { + if i.IsMarked(h.Ctx()) { switch h.joinType { case descpb.RightSemiJoin: // The row from the right already had a match and was emitted @@ -376,7 +376,7 @@ func (h *hashJoiner) probeRow() ( // whether we have a corresponding unmarked row from the right. h.probingRowState.matched = false } - } else if err := i.Mark(h.Ctx); err != nil { + } else if err := i.Mark(h.Ctx()); err != nil { h.MoveToDraining(err) return hjStateUnknown, nil, h.DrainHelper() } @@ -449,16 +449,16 @@ func (h *hashJoiner) emitRightUnmatched() ( func (h *hashJoiner) close() { if h.InternalClose() { - h.hashTable.Close(h.Ctx) + h.hashTable.Close(h.Ctx()) if h.probingRowState.iter != nil { h.probingRowState.iter.Close() } if h.emittingRightUnmatchedState.iter != nil { h.emittingRightUnmatchedState.iter.Close() } - h.MemMonitor.Stop(h.Ctx) + h.MemMonitor.Stop(h.Ctx()) if h.diskMonitor != nil { - h.diskMonitor.Stop(h.Ctx) + h.diskMonitor.Stop(h.Ctx()) } } } diff --git a/pkg/sql/rowexec/inverted_filterer.go b/pkg/sql/rowexec/inverted_filterer.go index 96fdd9dd78e4..0c3bc13fefe4 100644 --- a/pkg/sql/rowexec/inverted_filterer.go +++ b/pkg/sql/rowexec/inverted_filterer.go @@ -169,7 +169,7 @@ func (ifr *invertedFilterer) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMe case ifrEmittingRows: ifr.runningState, row, meta = ifr.emitRow() default: - log.Fatalf(ifr.Ctx, "unsupported state: %d", ifr.runningState) + log.Fatalf(ifr.Ctx(), "unsupported state: %d", ifr.runningState) } if row == nil && meta == nil { continue @@ -194,9 +194,9 @@ func (ifr *invertedFilterer) readInput() (invertedFiltererState, *execinfrapb.Pr return ifrReadingInput, meta } if row == nil { - log.VEventf(ifr.Ctx, 1, "no more input rows") + log.VEventf(ifr.Ctx(), 1, "no more input rows") evalResult := ifr.invertedEval.evaluate() - ifr.rc.SetupForRead(ifr.Ctx, evalResult) + ifr.rc.SetupForRead(ifr.Ctx(), evalResult) // invertedEval had a single expression in the batch, and the results // for that expression are in evalResult[0]. ifr.evalResult = evalResult[0] @@ -245,7 +245,7 @@ func (ifr *invertedFilterer) readInput() (invertedFiltererState, *execinfrapb.Pr // evaluator. 
copy(ifr.keyRow, row[:ifr.invertedColIdx]) copy(ifr.keyRow[ifr.invertedColIdx:], row[ifr.invertedColIdx+1:]) - keyIndex, err := ifr.rc.AddRow(ifr.Ctx, ifr.keyRow) + keyIndex, err := ifr.rc.AddRow(ifr.Ctx(), ifr.keyRow) if err != nil { ifr.MoveToDraining(err) return ifrStateUnknown, ifr.DrainHelper() @@ -273,11 +273,11 @@ func (ifr *invertedFilterer) emitRow() ( } if ifr.resultIdx >= len(ifr.evalResult) { // We are done emitting all rows. - return drainFunc(ifr.rc.UnsafeReset(ifr.Ctx)) + return drainFunc(ifr.rc.UnsafeReset(ifr.Ctx())) } curRowIdx := ifr.resultIdx ifr.resultIdx++ - keyRow, err := ifr.rc.GetRow(ifr.Ctx, ifr.evalResult[curRowIdx], false /* skip */) + keyRow, err := ifr.rc.GetRow(ifr.Ctx(), ifr.evalResult[curRowIdx], false /* skip */) if err != nil { return drainFunc(err) } @@ -301,12 +301,12 @@ func (ifr *invertedFilterer) ConsumerClosed() { func (ifr *invertedFilterer) close() { if ifr.InternalClose() { - ifr.rc.Close(ifr.Ctx) + ifr.rc.Close(ifr.Ctx()) if ifr.MemMonitor != nil { - ifr.MemMonitor.Stop(ifr.Ctx) + ifr.MemMonitor.Stop(ifr.Ctx()) } if ifr.diskMonitor != nil { - ifr.diskMonitor.Stop(ifr.Ctx) + ifr.diskMonitor.Stop(ifr.Ctx()) } } } diff --git a/pkg/sql/rowexec/inverted_joiner.go b/pkg/sql/rowexec/inverted_joiner.go index 027203bb930a..5e3bb8484923 100644 --- a/pkg/sql/rowexec/inverted_joiner.go +++ b/pkg/sql/rowexec/inverted_joiner.go @@ -385,7 +385,7 @@ func (ij *invertedJoiner) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetad case ijEmittingRows: ij.runningState, row, meta = ij.emitRow() default: - log.Fatalf(ij.Ctx, "unsupported state: %d", ij.runningState) + log.Fatalf(ij.Ctx(), "unsupported state: %d", ij.runningState) } if row == nil && meta == nil { continue @@ -416,7 +416,7 @@ func (ij *invertedJoiner) readInput() (invertedJoinerState, *execinfrapb.Produce break } - expr, preFilterState, err := ij.datumsToInvertedExpr.Convert(ij.Ctx, row) + expr, preFilterState, err := ij.datumsToInvertedExpr.Convert(ij.Ctx(), row) if err != nil { ij.MoveToDraining(err) return ijStateUnknown, ij.DrainHelper() @@ -466,12 +466,12 @@ func (ij *invertedJoiner) readInput() (invertedJoinerState, *execinfrapb.Produce } if len(ij.inputRows) == 0 { - log.VEventf(ij.Ctx, 1, "no more input rows") + log.VEventf(ij.Ctx(), 1, "no more input rows") // We're done. ij.MoveToDraining(nil) return ijStateUnknown, ij.DrainHelper() } - log.VEventf(ij.Ctx, 1, "read %d input rows", len(ij.inputRows)) + log.VEventf(ij.Ctx(), 1, "read %d input rows", len(ij.inputRows)) spans, err := ij.batchedExprEval.init() if err != nil { @@ -495,9 +495,9 @@ func (ij *invertedJoiner) readInput() (invertedJoinerState, *execinfrapb.Produce return ijStateUnknown, ij.DrainHelper() } - log.VEventf(ij.Ctx, 1, "scanning %d spans", len(ij.indexSpans)) + log.VEventf(ij.Ctx(), 1, "scanning %d spans", len(ij.indexSpans)) if err = ij.fetcher.StartScan( - ij.Ctx, ij.indexSpans, nil, /* spanIDs */ + ij.Ctx(), ij.indexSpans, nil, /* spanIDs */ rowinfra.NoBytesLimit, rowinfra.NoRowLimit, ); err != nil { ij.MoveToDraining(err) @@ -508,11 +508,11 @@ func (ij *invertedJoiner) readInput() (invertedJoinerState, *execinfrapb.Produce } func (ij *invertedJoiner) performScan() (invertedJoinerState, *execinfrapb.ProducerMetadata) { - log.VEventf(ij.Ctx, 1, "joining rows") + log.VEventf(ij.Ctx(), 1, "joining rows") // Read the entire set of rows that are part of the scan. for { // Fetch the next row and copy it into the row container. 
- fetchedRow, _, err := ij.fetcher.NextRow(ij.Ctx) + fetchedRow, _, err := ij.fetcher.NextRow(ij.Ctx()) if err != nil { ij.MoveToDraining(scrub.UnwrapScrubError(err)) return ijStateUnknown, ij.DrainHelper() @@ -557,7 +557,7 @@ func (ij *invertedJoiner) performScan() (invertedJoinerState, *execinfrapb.Produ return ijStateUnknown, ij.DrainHelper() } if shouldAdd { - rowIdx, err := ij.indexRows.AddRow(ij.Ctx, fetchedRow) + rowIdx, err := ij.indexRows.AddRow(ij.Ctx(), fetchedRow) if err != nil { ij.MoveToDraining(err) return ijStateUnknown, ij.DrainHelper() @@ -569,8 +569,8 @@ func (ij *invertedJoiner) performScan() (invertedJoinerState, *execinfrapb.Produ } } ij.joinedRowIdx = ij.batchedExprEval.evaluate() - ij.indexRows.SetupForRead(ij.Ctx, ij.joinedRowIdx) - log.VEventf(ij.Ctx, 1, "done evaluating expressions") + ij.indexRows.SetupForRead(ij.Ctx(), ij.joinedRowIdx) + log.VEventf(ij.Ctx(), 1, "done evaluating expressions") return ijEmittingRows, nil } @@ -587,7 +587,7 @@ func (ij *invertedJoiner) emitRow() ( ) { // Finished processing the batch. if ij.emitCursor.inputRowIdx >= len(ij.joinedRowIdx) { - log.VEventf(ij.Ctx, 1, "done emitting rows") + log.VEventf(ij.Ctx(), 1, "done emitting rows") // Ready for another input batch. Reset state. ij.inputRows = ij.inputRows[:0] ij.batchedExprEval.reset() @@ -595,7 +595,7 @@ func (ij *invertedJoiner) emitRow() ( ij.emitCursor.outputRowIdx = 0 ij.emitCursor.inputRowIdx = 0 ij.emitCursor.seenMatch = false - if err := ij.indexRows.UnsafeReset(ij.Ctx); err != nil { + if err := ij.indexRows.UnsafeReset(ij.Ctx()); err != nil { ij.MoveToDraining(err) return ijStateUnknown, nil, ij.DrainHelper() } @@ -625,7 +625,7 @@ func (ij *invertedJoiner) emitRow() ( inputRow := ij.inputRows[ij.emitCursor.inputRowIdx] joinedRowIdx := ij.joinedRowIdx[ij.emitCursor.inputRowIdx][ij.emitCursor.outputRowIdx] - indexedRow, err := ij.indexRows.GetRow(ij.Ctx, joinedRowIdx, false /* skip */) + indexedRow, err := ij.indexRows.GetRow(ij.Ctx(), joinedRowIdx, false /* skip */) if err != nil { ij.MoveToDraining(err) return ijStateUnknown, nil, ij.DrainHelper() @@ -639,7 +639,7 @@ func (ij *invertedJoiner) emitRow() ( skipRemaining := func() error { for ; ij.emitCursor.outputRowIdx < len(ij.joinedRowIdx[ij.emitCursor.inputRowIdx]); ij.emitCursor.outputRowIdx++ { idx := ij.joinedRowIdx[ij.emitCursor.inputRowIdx][ij.emitCursor.outputRowIdx] - if _, err := ij.indexRows.GetRow(ij.Ctx, idx, true /* skip */); err != nil { + if _, err := ij.indexRows.GetRow(ij.Ctx(), idx, true /* skip */); err != nil { return err } } @@ -689,7 +689,7 @@ func (ij *invertedJoiner) render(lrow, rrow rowenc.EncDatumRow) (rowenc.EncDatum ij.combinedRow = append(ij.combinedRow[:0], lrow...) ij.combinedRow = append(ij.combinedRow, rrow...) 
 	if ij.onExprHelper.Expr != nil {
-		res, err := ij.onExprHelper.EvalFilter(ij.Ctx, ij.combinedRow)
+		res, err := ij.onExprHelper.EvalFilter(ij.Ctx(), ij.combinedRow)
 		if !res || err != nil {
 			return nil, err
 		}
@@ -740,14 +740,14 @@ func (ij *invertedJoiner) ConsumerClosed() {
 func (ij *invertedJoiner) close() {
 	if ij.InternalClose() {
 		if ij.fetcher != nil {
-			ij.fetcher.Close(ij.Ctx)
+			ij.fetcher.Close(ij.Ctx())
 		}
 		if ij.indexRows != nil {
-			ij.indexRows.Close(ij.Ctx)
+			ij.indexRows.Close(ij.Ctx())
 		}
-		ij.MemMonitor.Stop(ij.Ctx)
+		ij.MemMonitor.Stop(ij.Ctx())
 		if ij.diskMonitor != nil {
-			ij.diskMonitor.Stop(ij.Ctx)
+			ij.diskMonitor.Stop(ij.Ctx())
 		}
 	}
 }
@@ -762,8 +762,8 @@ func (ij *invertedJoiner) execStatsForTrace() *execinfrapb.ComponentStats {
 	if !ok {
 		return nil
 	}
-	ij.scanStats = execstats.GetScanStats(ij.Ctx, ij.ExecStatsTrace)
-	contentionTime, contentionEvents := execstats.GetCumulativeContentionTime(ij.Ctx, ij.ExecStatsTrace)
+	ij.scanStats = execstats.GetScanStats(ij.Ctx(), ij.ExecStatsTrace)
+	contentionTime, contentionEvents := execstats.GetCumulativeContentionTime(ij.Ctx(), ij.ExecStatsTrace)
 	ret := execinfrapb.ComponentStats{
 		Inputs: []execinfrapb.InputStats{is},
 		KV: execinfrapb.KVStats{
@@ -791,7 +791,7 @@ func (ij *invertedJoiner) generateMeta() []execinfrapb.ProducerMetadata {
 	meta.Metrics = execinfrapb.GetMetricsMeta()
 	meta.Metrics.BytesRead = ij.fetcher.GetBytesRead()
 	meta.Metrics.RowsRead = ij.rowsRead
-	if tfs := execinfra.GetLeafTxnFinalState(ij.Ctx, ij.FlowCtx.Txn); tfs != nil {
+	if tfs := execinfra.GetLeafTxnFinalState(ij.Ctx(), ij.FlowCtx.Txn); tfs != nil {
 		trailingMeta = append(trailingMeta, execinfrapb.ProducerMetadata{LeafTxnFinalState: tfs})
 	}
 	return trailingMeta
diff --git a/pkg/sql/rowexec/joinerbase.go b/pkg/sql/rowexec/joinerbase.go
index 836e68e0fff1..96ce2eeefbd4 100644
--- a/pkg/sql/rowexec/joinerbase.go
+++ b/pkg/sql/rowexec/joinerbase.go
@@ -170,7 +170,7 @@ func (jb *joinerBase) render(lrow, rrow rowenc.EncDatumRow) (rowenc.EncDatumRow,
 	} else {
 		combinedRow = jb.combine(lrow, rrow)
 	}
-	res, err := jb.onCond.EvalFilter(jb.Ctx, combinedRow)
+	res, err := jb.onCond.EvalFilter(jb.Ctx(), combinedRow)
 	if !res || err != nil {
 		return nil, err
 	}
diff --git a/pkg/sql/rowexec/joinreader.go b/pkg/sql/rowexec/joinreader.go
index 3f6b76a899f5..217a2080cc6a 100644
--- a/pkg/sql/rowexec/joinreader.go
+++ b/pkg/sql/rowexec/joinreader.go
@@ -739,7 +739,7 @@ func (jr *joinReader) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetadata)
 			meta = jr.DrainHelper()
 			jr.runningState = jrStateUnknown
 		default:
-			log.Fatalf(jr.Ctx, "unsupported state: %d", jr.runningState)
+			log.Fatalf(jr.Ctx(), "unsupported state: %d", jr.runningState)
 		}
 		if row == nil && meta == nil {
 			continue
@@ -916,7 +916,7 @@ func (jr *joinReader) readInput() (
 	}
 
 	if len(jr.scratchInputRows) == 0 {
-		log.VEventf(jr.Ctx, 1, "no more input rows")
+		log.VEventf(jr.Ctx(), 1, "no more input rows")
 		if outRow != nil {
 			return jrReadyToDrain, outRow, nil
 		}
@@ -924,7 +924,7 @@ func (jr *joinReader) readInput() (
 		jr.MoveToDraining(nil)
 		return jrStateUnknown, nil, jr.DrainHelper()
 	}
-	log.VEventf(jr.Ctx, 1, "read %d input rows", len(jr.scratchInputRows))
+	log.VEventf(jr.Ctx(), 1, "read %d input rows", len(jr.scratchInputRows))
 
 	if jr.groupingState != nil && len(jr.scratchInputRows) > 0 {
 		jr.updateGroupingStateForNonEmptyBatch()
@@ -980,7 +980,7 @@ func (jr *joinReader) readInput() (
 		}
 	}
 
-	log.VEventf(jr.Ctx, 1, "scanning %d spans", len(spans))
+	log.VEventf(jr.Ctx(), 1, "scanning %d spans", len(spans))
 	// Note that the fetcher takes ownership of the spans slice - it will modify
 	// it and perform the memory accounting. We don't care about the
 	// modification here, but we want to be conscious about the memory
@@ -998,7 +998,7 @@ func (jr *joinReader) readInput() (
 		}
 	}
 	if err = jr.fetcher.StartScan(
-		jr.Ctx, spans, spanIDs, bytesLimit, rowinfra.NoRowLimit,
+		jr.Ctx(), spans, spanIDs, bytesLimit, rowinfra.NoRowLimit,
 	); err != nil {
 		jr.MoveToDraining(err)
 		return jrStateUnknown, nil, jr.DrainHelper()
@@ -1011,7 +1011,7 @@ func (jr *joinReader) performLookup() (joinReaderState, *execinfrapb.ProducerMet
 	for {
 		// Fetch the next row and tell the strategy to process it.
-		lookedUpRow, spanID, err := jr.fetcher.NextRow(jr.Ctx)
+		lookedUpRow, spanID, err := jr.fetcher.NextRow(jr.Ctx())
 		if err != nil {
 			jr.MoveToDraining(scrub.UnwrapScrubError(err))
 			return jrStateUnknown, jr.DrainHelper()
@@ -1023,7 +1023,7 @@ func (jr *joinReader) performLookup() (joinReaderState, *execinfrapb.ProducerMet
 		jr.rowsRead++
 		jr.curBatchRowsRead++
 
-		if nextState, err := jr.strategy.processLookedUpRow(jr.Ctx, lookedUpRow, spanID); err != nil {
+		if nextState, err := jr.strategy.processLookedUpRow(jr.Ctx(), lookedUpRow, spanID); err != nil {
 			jr.MoveToDraining(err)
 			return jrStateUnknown, jr.DrainHelper()
 		} else if nextState != jrPerformingLookup {
@@ -1056,13 +1056,13 @@ func (jr *joinReader) performLookup() (joinReaderState, *execinfrapb.ProducerMet
 				sortSpans(spans, spanIDs)
 			}
 
-			log.VEventf(jr.Ctx, 1, "scanning %d remote spans", len(spans))
+			log.VEventf(jr.Ctx(), 1, "scanning %d remote spans", len(spans))
 			bytesLimit := rowinfra.GetDefaultBatchBytesLimit(jr.EvalCtx.TestingKnobs.ForceProductionValues)
 			if !jr.shouldLimitBatches {
 				bytesLimit = rowinfra.NoBytesLimit
 			}
 			if err := jr.fetcher.StartScan(
-				jr.Ctx, spans, spanIDs, bytesLimit, rowinfra.NoRowLimit,
+				jr.Ctx(), spans, spanIDs, bytesLimit, rowinfra.NoRowLimit,
 			); err != nil {
 				jr.MoveToDraining(err)
 				return jrStateUnknown, jr.DrainHelper()
@@ -1071,8 +1071,8 @@ func (jr *joinReader) performLookup() (joinReaderState, *execinfrapb.ProducerMet
 		}
 	}
 
-	log.VEvent(jr.Ctx, 1, "done joining rows")
-	jr.strategy.prepareToEmit(jr.Ctx)
+	log.VEvent(jr.Ctx(), 1, "done joining rows")
+	jr.strategy.prepareToEmit(jr.Ctx())
 
 	// Check if the strategy spilled to disk and reduce the batch size if it
 	// did.
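// The "takes ownership" comment above records a handoff contract: once the
// spans slice is passed to StartScan, the fetcher may mutate it and accounts
// for its memory itself, so the caller must stop using it. A minimal sketch
// of that contract with illustrative names (fetcher here is a stand-in, not
// the real row fetcher):
package main

type fetcher struct {
	spans []string // owned by the fetcher after startScan; it may reorder or resize this
}

func (f *fetcher) startScan(spans []string) {
	f.spans = spans // take ownership; the caller must not reuse spans afterwards
}

func main() {
	spans := []string{"/Table/1/1", "/Table/1/9"}
	f := &fetcher{}
	f.startScan(spans)
	spans = nil // drop the caller's reference to make the handoff explicit
	_ = spans
}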
@@ -1103,7 +1103,7 @@ func (jr *joinReader) emitRow() (
 	rowenc.EncDatumRow,
 	*execinfrapb.ProducerMetadata,
 ) {
-	rowToEmit, nextState, err := jr.strategy.nextRowToEmit(jr.Ctx)
+	rowToEmit, nextState, err := jr.strategy.nextRowToEmit(jr.Ctx())
 	if err != nil {
 		jr.MoveToDraining(err)
 		return jrStateUnknown, nil, jr.DrainHelper()
 	}
@@ -1135,26 +1135,26 @@ func (jr *joinReader) ConsumerClosed() {
 func (jr *joinReader) close() {
 	if jr.InternalClose() {
 		if jr.fetcher != nil {
-			jr.fetcher.Close(jr.Ctx)
+			jr.fetcher.Close(jr.Ctx())
 		}
 		if jr.usesStreamer {
-			jr.streamerInfo.budgetAcc.Close(jr.Ctx)
-			jr.streamerInfo.txnKVStreamerMemAcc.Close(jr.Ctx)
-			jr.streamerInfo.unlimitedMemMonitor.Stop(jr.Ctx)
+			jr.streamerInfo.budgetAcc.Close(jr.Ctx())
+			jr.streamerInfo.txnKVStreamerMemAcc.Close(jr.Ctx())
+			jr.streamerInfo.unlimitedMemMonitor.Stop(jr.Ctx())
 			if jr.streamerInfo.diskMonitor != nil {
-				jr.streamerInfo.diskMonitor.Stop(jr.Ctx)
+				jr.streamerInfo.diskMonitor.Stop(jr.Ctx())
 			}
 		}
-		jr.strategy.close(jr.Ctx)
-		jr.memAcc.Close(jr.Ctx)
+		jr.strategy.close(jr.Ctx())
+		jr.memAcc.Close(jr.Ctx())
 		if jr.limitedMemMonitor != nil {
-			jr.limitedMemMonitor.Stop(jr.Ctx)
+			jr.limitedMemMonitor.Stop(jr.Ctx())
 		}
 		if jr.MemMonitor != nil {
-			jr.MemMonitor.Stop(jr.Ctx)
+			jr.MemMonitor.Stop(jr.Ctx())
 		}
 		if jr.diskMonitor != nil {
-			jr.diskMonitor.Stop(jr.Ctx)
+			jr.diskMonitor.Stop(jr.Ctx())
 		}
 	}
 }
@@ -1170,8 +1170,8 @@ func (jr *joinReader) execStatsForTrace() *execinfrapb.ComponentStats {
 		return nil
 	}
 
-	jr.scanStats = execstats.GetScanStats(jr.Ctx, jr.ExecStatsTrace)
-	contentionTime, contentionEvents := execstats.GetCumulativeContentionTime(jr.Ctx, jr.ExecStatsTrace)
+	jr.scanStats = execstats.GetScanStats(jr.Ctx(), jr.ExecStatsTrace)
+	contentionTime, contentionEvents := execstats.GetCumulativeContentionTime(jr.Ctx(), jr.ExecStatsTrace)
 	ret := &execinfrapb.ComponentStats{
 		Inputs: []execinfrapb.InputStats{is},
 		KV: execinfrapb.KVStats{
@@ -1207,7 +1207,7 @@ func (jr *joinReader) generateMeta() []execinfrapb.ProducerMetadata {
 	meta.Metrics = execinfrapb.GetMetricsMeta()
 	meta.Metrics.RowsRead = jr.rowsRead
 	meta.Metrics.BytesRead = jr.fetcher.GetBytesRead()
-	if tfs := execinfra.GetLeafTxnFinalState(jr.Ctx, jr.txn); tfs != nil {
+	if tfs := execinfra.GetLeafTxnFinalState(jr.Ctx(), jr.txn); tfs != nil {
 		trailingMeta = append(trailingMeta, execinfrapb.ProducerMetadata{LeafTxnFinalState: tfs})
 	}
 	return trailingMeta
diff --git a/pkg/sql/rowexec/joinreader_strategies.go b/pkg/sql/rowexec/joinreader_strategies.go
index 657dfdcd9861..dcb42caf786a 100644
--- a/pkg/sql/rowexec/joinreader_strategies.go
+++ b/pkg/sql/rowexec/joinreader_strategies.go
@@ -187,7 +187,7 @@ func (s *joinReaderNoOrderingStrategy) generateRemoteSpans() (roachpb.Spans, []i
 		return nil, nil, errors.AssertionFailedf("generateRemoteSpans can only be called for locality optimized lookup joins")
 	}
 	s.remoteSpansGenerated = true
-	return gen.generateRemoteSpans(s.Ctx, s.inputRows)
+	return gen.generateRemoteSpans(s.Ctx(), s.inputRows)
 }
 
 func (s *joinReaderNoOrderingStrategy) generatedRemoteSpans() bool {
@@ -200,7 +200,7 @@ func (s *joinReaderNoOrderingStrategy) processLookupRows(
 	s.inputRows = rows
 	s.remoteSpansGenerated = false
 	s.emitState.unmatchedInputRowIndicesInitialized = false
-	return s.generateSpans(s.Ctx, s.inputRows)
+	return s.generateSpans(s.Ctx(), s.inputRows)
 }
 
 func (s *joinReaderNoOrderingStrategy) processLookedUpRow(
@@ -320,13 +320,13 @@ func (s *joinReaderNoOrderingStrategy) spilled() bool { return false }
 
 func (s *joinReaderNoOrderingStrategy) growMemoryAccount(
 	memAcc *mon.BoundAccount, delta int64,
 ) error {
-	return addWorkmemHint(memAcc.Grow(s.Ctx, delta))
+	return addWorkmemHint(memAcc.Grow(s.Ctx(), delta))
 }
 
 func (s *joinReaderNoOrderingStrategy) resizeMemoryAccount(
 	memAcc *mon.BoundAccount, oldSz, newSz int64,
 ) error {
-	return addWorkmemHint(memAcc.Resize(s.Ctx, oldSz, newSz))
+	return addWorkmemHint(memAcc.Resize(s.Ctx(), oldSz, newSz))
 }
 
 func (s *joinReaderNoOrderingStrategy) close(ctx context.Context) {
@@ -405,7 +405,7 @@ func (s *joinReaderIndexJoinStrategy) processLookupRows(
 	rows []rowenc.EncDatumRow,
 ) (roachpb.Spans, []int, error) {
 	s.inputRows = rows
-	return s.generateSpans(s.Ctx, s.inputRows)
+	return s.generateSpans(s.Ctx(), s.inputRows)
 }
 
 func (s *joinReaderIndexJoinStrategy) processLookedUpRow(
@@ -435,13 +435,13 @@ func (s *joinReaderIndexJoinStrategy) spilled() bool {
 
 func (s *joinReaderIndexJoinStrategy) growMemoryAccount(
 	memAcc *mon.BoundAccount, delta int64,
 ) error {
-	return addWorkmemHint(memAcc.Grow(s.Ctx, delta))
+	return addWorkmemHint(memAcc.Grow(s.Ctx(), delta))
 }
 
 func (s *joinReaderIndexJoinStrategy) resizeMemoryAccount(
 	memAcc *mon.BoundAccount, oldSz, newSz int64,
 ) error {
-	return addWorkmemHint(memAcc.Resize(s.Ctx, oldSz, newSz))
+	return addWorkmemHint(memAcc.Resize(s.Ctx(), oldSz, newSz))
 }
 
 func (s *joinReaderIndexJoinStrategy) close(ctx context.Context) {
@@ -592,7 +592,7 @@ func (s *joinReaderOrderingStrategy) generateRemoteSpans() (roachpb.Spans, []int
 		return nil, nil, errors.AssertionFailedf("generateRemoteSpans can only be called for locality optimized lookup joins")
 	}
 	s.remoteSpansGenerated = true
-	return gen.generateRemoteSpans(s.Ctx, s.inputRows)
+	return gen.generateRemoteSpans(s.Ctx(), s.inputRows)
 }
 
 func (s *joinReaderOrderingStrategy) generatedRemoteSpans() bool {
@@ -642,7 +642,7 @@ func (s *joinReaderOrderingStrategy) processLookupRows(
 	s.inputRows = rows
 	s.remoteSpansGenerated = false
-	return s.generateSpans(s.Ctx, s.inputRows)
+	return s.generateSpans(s.Ctx(), s.inputRows)
 }
 
 func (s *joinReaderOrderingStrategy) processLookedUpRow(
@@ -807,7 +807,7 @@ func (s *joinReaderOrderingStrategy) nextRowToEmit(
 		lookedUpRow = s.emitCursor.notBufferedRow
 	} else {
 		lookedUpRow, err = s.lookedUpRows.GetRow(
-			s.Ctx, lookedUpRows[s.emitCursor.outputRowIdx], false, /* skip */
+			s.Ctx(), lookedUpRows[s.emitCursor.outputRowIdx], false, /* skip */
 		)
 	}
 	if err != nil {
@@ -858,16 +858,16 @@ func (s *joinReaderOrderingStrategy) close(ctx context.Context) {
 func (s *joinReaderOrderingStrategy) growMemoryAccount(
 	memAcc *mon.BoundAccount, delta int64,
 ) error {
-	if err := memAcc.Grow(s.Ctx, delta); err != nil {
+	if err := memAcc.Grow(s.Ctx(), delta); err != nil {
 		// We don't have enough budget to account for the new size. Check
 		// whether we can spill the looked up rows to disk to free up the
 		// budget.
-		spilled, spillErr := s.lookedUpRows.SpillToDisk(s.Ctx)
+		spilled, spillErr := s.lookedUpRows.SpillToDisk(s.Ctx())
 		if !spilled || spillErr != nil {
 			return addWorkmemHint(errors.CombineErrors(err, spillErr))
 		}
 		// We freed up some budget, so try to perform the accounting again.
-		return addWorkmemHint(memAcc.Grow(s.Ctx, delta))
+		return addWorkmemHint(memAcc.Grow(s.Ctx(), delta))
 	}
 	return nil
 }
@@ -879,16 +879,16 @@ func (s *joinReaderOrderingStrategy) growMemoryAccount(
 func (s *joinReaderOrderingStrategy) resizeMemoryAccount(
 	memAcc *mon.BoundAccount, oldSz, newSz int64,
 ) error {
-	if err := memAcc.Resize(s.Ctx, oldSz, newSz); err != nil {
+	if err := memAcc.Resize(s.Ctx(), oldSz, newSz); err != nil {
 		// We don't have enough budget to account for the new size. Check
 		// whether we can spill the looked up rows to disk to free up the
 		// budget.
-		spilled, spillErr := s.lookedUpRows.SpillToDisk(s.Ctx)
+		spilled, spillErr := s.lookedUpRows.SpillToDisk(s.Ctx())
 		if !spilled || spillErr != nil {
 			return addWorkmemHint(errors.CombineErrors(err, spillErr))
 		}
 		// We freed up some budget, so try to perform the accounting again.
-		return addWorkmemHint(memAcc.Resize(s.Ctx, oldSz, newSz))
+		return addWorkmemHint(memAcc.Resize(s.Ctx(), oldSz, newSz))
 	}
 	return nil
 }
diff --git a/pkg/sql/rowexec/mergejoiner.go b/pkg/sql/rowexec/mergejoiner.go
index 9b192fdbfbfb..4bf0d82dbe87 100644
--- a/pkg/sql/rowexec/mergejoiner.go
+++ b/pkg/sql/rowexec/mergejoiner.go
@@ -233,7 +233,7 @@ func (m *mergeJoiner) nextRow() (rowenc.EncDatumRow, *execinfrapb.ProducerMetada
 		// TODO(paul): Investigate (with benchmarks) whether or not it's
 		// worthwhile to only buffer one row from the right stream per batch
 		// for semi-joins.
-		m.leftRows, m.rightRows, meta = m.streamMerger.NextBatch(m.Ctx, m.EvalCtx)
+		m.leftRows, m.rightRows, meta = m.streamMerger.NextBatch(m.Ctx(), m.EvalCtx)
 		if meta != nil {
 			return nil, meta
 		}
@@ -252,8 +252,8 @@ func (m *mergeJoiner) nextRow() (rowenc.EncDatumRow, *execinfrapb.ProducerMetada
 func (m *mergeJoiner) close() {
 	if m.InternalClose() {
-		m.streamMerger.close(m.Ctx)
-		m.MemMonitor.Stop(m.Ctx)
+		m.streamMerger.close(m.Ctx())
+		m.MemMonitor.Stop(m.Ctx())
 	}
 }
diff --git a/pkg/sql/rowexec/processors_test.go b/pkg/sql/rowexec/processors_test.go
index 883dde2419a9..87afeeb0d896 100644
--- a/pkg/sql/rowexec/processors_test.go
+++ b/pkg/sql/rowexec/processors_test.go
@@ -315,7 +315,8 @@ func TestAggregatorSpecAggregationEquals(t *testing.T) {
 
 func TestProcessorBaseContext(t *testing.T) {
 	defer leaktest.AfterTest(t)()
-	ctx := context.Background()
+	// Use a custom context to distinguish it from the background one.
+	ctx := context.WithValue(context.Background(), struct{}{}, struct{}{})
 
 	st := cluster.MakeTestingClusterSettings()
 	runTest := func(t *testing.T, f func(noop *noopProcessor)) {
@@ -332,18 +333,22 @@ func TestProcessorBaseContext(t *testing.T) {
 		if err != nil {
 			t.Fatal(err)
 		}
+		// Before Start we should get the background context.
+		if noop.Ctx() != context.Background() {
+			t.Fatalf("ProcessorBase.Ctx() didn't return the background context before Start")
+		}
 		noop.Start(ctx)
-		origCtx := noop.Ctx
+		origCtx := noop.Ctx()
 
 		// The context should be valid after Start but before Next is called in case
 		// ConsumerDone or ConsumerClosed are called without calling Next.
-		if noop.Ctx == nil {
+		if noop.Ctx() == context.Background() {
 			t.Fatalf("ProcessorBase.ctx not initialized")
 		}
 		f(noop)
 		// The context should be reset after ConsumerClosed is called so that any
 		// subsequent logging calls will not operate on closed spans.
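// The test assertions above pin down the contract of the new accessor this
// whole patch migrates to: before Start, Ctx() returns context.Background()
// rather than nil, so cleanup and logging paths are always safe to call.
// A minimal sketch of that pattern; procBase is illustrative, the real
// ProcessorBase in pkg/sql/execinfra differs in detail:
package main

import (
	"context"
	"fmt"
)

type procBase struct {
	ctx context.Context // set by Start; nil until then
}

func (pb *procBase) Start(ctx context.Context) {
	pb.ctx = ctx
}

// Ctx never returns nil: it falls back to context.Background() before
// Start, which is exactly what the test above asserts.
func (pb *procBase) Ctx() context.Context {
	if pb.ctx == nil {
		return context.Background()
	}
	return pb.ctx
}

type ctxKey struct{}

func main() {
	pb := &procBase{}
	fmt.Println(pb.Ctx() == context.Background()) // true: not started yet
	pb.Start(context.WithValue(context.Background(), ctxKey{}, "flow"))
	fmt.Println(pb.Ctx() == context.Background()) // false: real context now
}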
-		if noop.Ctx != origCtx {
+		if noop.Ctx() != origCtx {
 			t.Fatalf("ProcessorBase.ctx not reset on close")
 		}
 	}
diff --git a/pkg/sql/rowexec/project_set.go b/pkg/sql/rowexec/project_set.go
index 4bd52d9f5e3c..9ca6d7310d64 100644
--- a/pkg/sql/rowexec/project_set.go
+++ b/pkg/sql/rowexec/project_set.go
@@ -161,7 +161,7 @@ func (ps *projectSetProcessor) nextInputRow() (
 			// First, make sure to close its ValueGenerator from the previous
 			// input row (if it exists).
 			if ps.gens[i] != nil {
-				ps.gens[i].Close(ps.Ctx)
+				ps.gens[i].Close(ps.Ctx())
 				ps.gens[i] = nil
 			}
 
@@ -169,7 +169,7 @@ func (ps *projectSetProcessor) nextInputRow() (
 			ps.exprHelpers[i].Row = row
 
 			ps.EvalCtx.IVarContainer = ps.exprHelpers[i]
-			gen, err := eval.GetGenerator(ps.Ctx, ps.EvalCtx, fn)
+			gen, err := eval.GetGenerator(ps.Ctx(), ps.EvalCtx, fn)
 			if err != nil {
 				return nil, nil, err
 			}
@@ -184,7 +184,7 @@ func (ps *projectSetProcessor) nextInputRow() (
 			// Store the generator before Start so that it'll be closed even if
 			// Start returns an error.
 			ps.gens[i] = gen
-			if err := gen.Start(ps.Ctx, ps.FlowCtx.Txn); err != nil {
+			if err := gen.Start(ps.Ctx(), ps.FlowCtx.Txn); err != nil {
 				return nil, nil, err
 			}
 		}
@@ -205,7 +205,7 @@ func (ps *projectSetProcessor) nextGeneratorValues() (newValAvail bool, err erro
 			numCols := int(ps.spec.NumColsPerGen[i])
 			if !ps.done[i] {
 				// Yes; check whether this source still has some values available.
-				hasVals, err := gen.Next(ps.Ctx)
+				hasVals, err := gen.Next(ps.Ctx())
 				if err != nil {
 					return false, err
 				}
@@ -237,7 +237,7 @@ func (ps *projectSetProcessor) nextGeneratorValues() (newValAvail bool, err erro
 			// Do we still need to produce the scalar value? (first row)
 			if !ps.done[i] {
 				// Yes. Produce it once, then indicate it's "done".
-				value, err := ps.exprHelpers[i].Eval(ps.Ctx, ps.rowBuffer)
+				value, err := ps.exprHelpers[i].Eval(ps.Ctx(), ps.rowBuffer)
 				if err != nil {
 					return false, err
 				}
@@ -320,7 +320,7 @@ func (ps *projectSetProcessor) close() {
 	// InternalClose().
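// project_set's "store the generator before Start" comment above encodes a
// cleanup-ordering rule: register a resource in the slice that close()
// sweeps before calling its fallible Start, so a Start error cannot leak
// it. A self-contained sketch with illustrative types (not the real
// ValueGenerator interface):
package main

import (
	"errors"
	"fmt"
)

type gen struct{ closed bool }

func (g *gen) Start() error { return errors.New("start failed") }
func (g *gen) Close()       { g.closed = true }

type proc struct{ gens []*gen }

func (p *proc) addAndStart(g *gen) error {
	p.gens = append(p.gens, g) // register first...
	return g.Start()           // ...so even a failed Start leaves g tracked
}

func (p *proc) close() {
	for _, g := range p.gens {
		g.Close()
	}
}

func main() {
	p := &proc{}
	g := &gen{}
	_ = p.addAndStart(g) // Start fails, but g is already registered
	p.close()
	fmt.Println(g.closed) // true: cleanup still reached it
}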
 	for i, gen := range ps.gens {
 		if gen != nil {
-			gen.Close(ps.Ctx)
+			gen.Close(ps.Ctx())
 			ps.gens[i] = nil
 		}
 	}
diff --git a/pkg/sql/rowexec/sample_aggregator.go b/pkg/sql/rowexec/sample_aggregator.go
index a144caca4d2a..3e3d4747402d 100644
--- a/pkg/sql/rowexec/sample_aggregator.go
+++ b/pkg/sql/rowexec/sample_aggregator.go
@@ -205,9 +205,9 @@ func (s *sampleAggregator) Run(ctx context.Context) {
 func (s *sampleAggregator) close() {
 	if s.InternalClose() {
-		s.memAcc.Close(s.Ctx)
-		s.tempMemAcc.Close(s.Ctx)
-		s.MemMonitor.Stop(s.Ctx)
+		s.memAcc.Close(s.Ctx())
+		s.tempMemAcc.Close(s.Ctx())
+		s.MemMonitor.Stop(s.Ctx())
 	}
 }
diff --git a/pkg/sql/rowexec/sampler.go b/pkg/sql/rowexec/sampler.go
index 5e89b80a08f7..a7840585bc18 100644
--- a/pkg/sql/rowexec/sampler.go
+++ b/pkg/sql/rowexec/sampler.go
@@ -478,8 +478,8 @@ func (s *samplerProcessor) sampleRow(
 func (s *samplerProcessor) close() {
 	if s.InternalClose() {
-		s.memAcc.Close(s.Ctx)
-		s.MemMonitor.Stop(s.Ctx)
+		s.memAcc.Close(s.Ctx())
+		s.MemMonitor.Stop(s.Ctx())
 	}
 }
diff --git a/pkg/sql/rowexec/sorter.go b/pkg/sql/rowexec/sorter.go
index 3472139c6148..e3588145f150 100644
--- a/pkg/sql/rowexec/sorter.go
+++ b/pkg/sql/rowexec/sorter.go
@@ -117,10 +117,10 @@ func (s *sorterBase) close() {
 		if s.i != nil {
 			s.i.Close()
 		}
-		s.rows.Close(s.Ctx)
-		s.MemMonitor.Stop(s.Ctx)
+		s.rows.Close(s.Ctx())
+		s.MemMonitor.Stop(s.Ctx())
 		if s.diskMonitor != nil {
-			s.diskMonitor.Stop(s.Ctx)
+			s.diskMonitor.Stop(s.Ctx())
 		}
 	}
 }
@@ -250,13 +250,13 @@ func (s *sortAllProcessor) fill() (ok bool, _ error) {
 			break
 		}
 
-		if err := s.rows.AddRow(s.Ctx, row); err != nil {
+		if err := s.rows.AddRow(s.Ctx(), row); err != nil {
 			return false, err
 		}
 	}
-	s.rows.Sort(s.Ctx)
+	s.rows.Sort(s.Ctx())
 
-	s.i = s.rows.NewFinalIterator(s.Ctx)
+	s.i = s.rows.NewFinalIterator(s.Ctx())
 	s.i.Rewind()
 	return true, nil
 }
@@ -423,7 +423,7 @@ func newSortChunksProcessor(
 	); err != nil {
 		return nil, err
 	}
 
-	proc.i = proc.rows.NewFinalIterator(proc.Ctx)
+	proc.i = proc.rows.NewFinalIterator(proc.Ctx())
 	return proc, nil
 }
 
@@ -450,7 +450,7 @@ func (s *sortChunksProcessor) chunkCompleted(
 // if a metadata record was encountered). The caller is expected to drain when
 // this returns false.
 func (s *sortChunksProcessor) fill() (bool, error) {
-	ctx := s.Ctx
+	ctx := s.Ctx()
 
 	var meta *execinfrapb.ProducerMetadata
 
@@ -520,7 +520,7 @@ func (s *sortChunksProcessor) Start(ctx context.Context) {
 
 // Next is part of the RowSource interface.
 func (s *sortChunksProcessor) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetadata) {
-	ctx := s.Ctx
+	ctx := s.Ctx()
 	for s.State == execinfra.StateRunning {
 		ok, err := s.i.Valid()
 		if err != nil {
diff --git a/pkg/sql/rowexec/tablereader.go b/pkg/sql/rowexec/tablereader.go
index 712a76303140..1900e934d2c4 100644
--- a/pkg/sql/rowexec/tablereader.go
+++ b/pkg/sql/rowexec/tablereader.go
@@ -256,7 +256,7 @@ func TestingSetScannedRowProgressFrequency(val int64) func() {
 func (tr *tableReader) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetadata) {
 	for tr.State == execinfra.StateRunning {
 		if !tr.scanStarted {
-			err := tr.startScan(tr.Ctx)
+			err := tr.startScan(tr.Ctx())
 			if err != nil {
 				tr.MoveToDraining(err)
 				break
@@ -271,7 +271,7 @@ func (tr *tableReader) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetadata
 			return nil, meta
 		}
 
-		row, _, err := tr.fetcher.NextRow(tr.Ctx)
+		row, _, err := tr.fetcher.NextRow(tr.Ctx())
 		if row == nil || err != nil {
 			tr.MoveToDraining(err)
 			break
@@ -292,7 +292,7 @@ func (tr *tableReader) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetadata
 func (tr *tableReader) close() {
 	if tr.InternalClose() {
 		if tr.fetcher != nil {
-			tr.fetcher.Close(tr.Ctx)
+			tr.fetcher.Close(tr.Ctx())
 		}
 	}
 }
@@ -308,8 +308,8 @@ func (tr *tableReader) execStatsForTrace() *execinfrapb.ComponentStats {
 	if !ok {
 		return nil
 	}
-	tr.scanStats = execstats.GetScanStats(tr.Ctx, tr.ExecStatsTrace)
-	contentionTime, contentionEvents := execstats.GetCumulativeContentionTime(tr.Ctx, tr.ExecStatsTrace)
+	tr.scanStats = execstats.GetScanStats(tr.Ctx(), tr.ExecStatsTrace)
+	contentionTime, contentionEvents := execstats.GetCumulativeContentionTime(tr.Ctx(), tr.ExecStatsTrace)
 	ret := &execinfrapb.ComponentStats{
 		KV: execinfrapb.KVStats{
 			BytesRead: optional.MakeUint(uint64(tr.fetcher.GetBytesRead())),
@@ -331,13 +331,13 @@ func (tr *tableReader) generateMeta() []execinfrapb.ProducerMetadata {
 	if !tr.ignoreMisplannedRanges {
 		nodeID, ok := tr.FlowCtx.NodeID.OptionalNodeID()
 		if ok {
-			ranges := execinfra.MisplannedRanges(tr.Ctx, tr.SpansCopy, nodeID, tr.FlowCtx.Cfg.RangeCache)
+			ranges := execinfra.MisplannedRanges(tr.Ctx(), tr.SpansCopy, nodeID, tr.FlowCtx.Cfg.RangeCache)
 			if ranges != nil {
 				trailingMeta = append(trailingMeta, execinfrapb.ProducerMetadata{Ranges: ranges})
 			}
 		}
 	}
-	if tfs := execinfra.GetLeafTxnFinalState(tr.Ctx, tr.FlowCtx.Txn); tfs != nil {
+	if tfs := execinfra.GetLeafTxnFinalState(tr.Ctx(), tr.FlowCtx.Txn); tfs != nil {
 		trailingMeta = append(trailingMeta, execinfrapb.ProducerMetadata{LeafTxnFinalState: tfs})
 	}
 
diff --git a/pkg/sql/rowexec/windower.go b/pkg/sql/rowexec/windower.go
index 95f52f44dff5..053b954f882e 100644
--- a/pkg/sql/rowexec/windower.go
+++ b/pkg/sql/rowexec/windower.go
@@ -225,7 +225,7 @@ func (w *windower) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetadata) {
 		case windowerEmittingRows:
 			w.runningState, row, meta = w.emitRow()
 		default:
-			log.Fatalf(w.Ctx, "unsupported state: %d", w.runningState)
+			log.Fatalf(w.Ctx(), "unsupported state: %d", w.runningState)
 		}
 
 		if row == nil && meta == nil {
@@ -247,16 +247,16 @@ func (w *windower) close() {
 		if w.allRowsIterator != nil {
 			w.allRowsIterator.Close()
 		}
-		w.allRowsPartitioned.Close(w.Ctx)
+		w.allRowsPartitioned.Close(w.Ctx())
 		if w.partition != nil {
-			w.partition.Close(w.Ctx)
+			w.partition.Close(w.Ctx())
 		}
 		for _, builtin := range w.builtins {
-			builtin.Close(w.Ctx, w.EvalCtx)
+			builtin.Close(w.Ctx(), w.EvalCtx)
 		}
-		w.acc.Close(w.Ctx)
-		w.MemMonitor.Stop(w.Ctx)
-		w.diskMonitor.Stop(w.Ctx)
+		w.acc.Close(w.Ctx())
+		w.MemMonitor.Stop(w.Ctx())
+		w.diskMonitor.Stop(w.Ctx())
 	}
 }
@@ -280,17 +280,17 @@ func (w *windower) accumulateRows() (
 			return windowerAccumulating, nil, meta
 		}
 		if row == nil {
-			log.VEvent(w.Ctx, 1, "accumulation complete")
+			log.VEvent(w.Ctx(), 1, "accumulation complete")
 			w.inputDone = true
 			// We need to sort all the rows based on partitionBy columns so that all
 			// rows belonging to the same hash bucket are contiguous.
-			w.allRowsPartitioned.Sort(w.Ctx)
+			w.allRowsPartitioned.Sort(w.Ctx())
 			break
 		}
 
 		// The underlying row container will decode all datums as necessary, so we
 		// don't need to worry about that.
-		if err := w.allRowsPartitioned.AddRow(w.Ctx, row); err != nil {
+		if err := w.allRowsPartitioned.AddRow(w.Ctx(), row); err != nil {
 			w.MoveToDraining(err)
 			return windowerStateUnknown, nil, w.DrainHelper()
 		}
@@ -313,7 +313,7 @@ func (w *windower) emitRow() (windowerState, rowenc.EncDatumRow, *execinfrapb.Pr
 				return windowerStateUnknown, nil, w.DrainHelper()
 			}
 
-			if err := w.computeWindowFunctions(w.Ctx, w.EvalCtx); err != nil {
+			if err := w.computeWindowFunctions(w.Ctx(), w.EvalCtx); err != nil {
 				w.MoveToDraining(err)
 				return windowerStateUnknown, nil, w.DrainHelper()
 			}
@@ -343,7 +343,7 @@ func (w *windower) emitRow() (windowerState, rowenc.EncDatumRow, *execinfrapb.Pr
 func (w *windower) spillAllRowsToDisk() error {
 	if w.allRowsPartitioned != nil {
 		if !w.allRowsPartitioned.UsingDisk() {
-			if err := w.allRowsPartitioned.SpillToDisk(w.Ctx); err != nil {
+			if err := w.allRowsPartitioned.SpillToDisk(w.Ctx()); err != nil {
 				return err
 			}
 		} else {
@@ -351,7 +351,7 @@ func (w *windower) spillAllRowsToDisk() error {
 			// w.partition if possible.
 			if w.partition != nil {
 				if !w.partition.UsingDisk() {
-					if err := w.partition.SpillToDisk(w.Ctx); err != nil {
+					if err := w.partition.SpillToDisk(w.Ctx()); err != nil {
 						return err
 					}
 				}
@@ -365,12 +365,12 @@
 // error, it forces all rows to spill and attempts to grow acc by usage
 // one more time.
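// growMemAccount (below) and joinReaderOrderingStrategy.growMemoryAccount
// (earlier in this patch) share one shape: when a reservation fails, spill
// buffered rows to disk to free budget, then retry the reservation exactly
// once. A toy, self-contained version of the pattern; account is a
// stand-in for pkg/util/mon's BoundAccount, not the real API:
package main

import "fmt"

type account struct{ used, limit int64 }

func (a *account) grow(n int64) error {
	if a.used+n > a.limit {
		return fmt.Errorf("memory budget exceeded: %d+%d > %d", a.used, n, a.limit)
	}
	a.used += n
	return nil
}

// growOrSpill retries a failed reservation once, after spill() reports how
// many bytes moving rows to disk released back to the account.
func growOrSpill(a *account, n int64, spill func() int64) error {
	if err := a.grow(n); err != nil {
		a.used -= spill() // rows now live on disk; release their budget
		return a.grow(n)  // single retry; still fails if over the limit
	}
	return nil
}

func main() {
	a := &account{used: 90, limit: 100}
	err := growOrSpill(a, 20, func() int64 { return 50 }) // spilling frees 50 bytes
	fmt.Println(err, a.used)                              // <nil> 60
}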
 func (w *windower) growMemAccount(acc *mon.BoundAccount, usage int64) error {
-	if err := acc.Grow(w.Ctx, usage); err != nil {
+	if err := acc.Grow(w.Ctx(), usage); err != nil {
 		if sqlerrors.IsOutOfMemoryError(err) {
 			if err := w.spillAllRowsToDisk(); err != nil {
 				return err
 			}
-			if err := acc.Grow(w.Ctx, usage); err != nil {
+			if err := acc.Grow(w.Ctx(), usage); err != nil {
 				return err
 			}
 		} else {
@@ -696,7 +696,7 @@ func (w *windower) computeWindowFunctions(ctx context.Context, evalCtx *eval.Con
 				}
 			}
 		}
-		if err := w.partition.AddRow(w.Ctx, row); err != nil {
+		if err := w.partition.AddRow(w.Ctx(), row); err != nil {
 			return err
 		}
 	}
@@ -710,7 +710,7 @@ func (w *windower) computeWindowFunctions(ctx context.Context, evalCtx *eval.Con
 func (w *windower) populateNextOutputRow() (bool, error) {
 	if w.partitionIdx < len(w.partitionSizes) {
 		if w.allRowsIterator == nil {
-			w.allRowsIterator = w.allRowsPartitioned.NewUnmarkedIterator(w.Ctx)
+			w.allRowsIterator = w.allRowsPartitioned.NewUnmarkedIterator(w.Ctx())
 			w.allRowsIterator.Rewind()
 		}
 		// rowIdx is the index of the next row to be emitted from the
diff --git a/pkg/sql/rowexec/zigzagjoiner.go b/pkg/sql/rowexec/zigzagjoiner.go
index 18a3fbe38441..1508a71d092e 100644
--- a/pkg/sql/rowexec/zigzagjoiner.go
+++ b/pkg/sql/rowexec/zigzagjoiner.go
@@ -496,9 +496,9 @@ func (z *zigzagJoiner) setupInfo(
 func (z *zigzagJoiner) close() {
 	if z.InternalClose() {
 		for i := range z.infos {
-			z.infos[i].fetcher.Close(z.Ctx)
+			z.infos[i].fetcher.Close(z.Ctx())
 		}
-		log.VEventf(z.Ctx, 2, "exiting zigzag joiner run")
+		log.VEventf(z.Ctx(), 2, "exiting zigzag joiner run")
 	}
 }
 
@@ -793,17 +793,17 @@ func (z *zigzagJoiner) maybeFetchInitialRow() error {
 		curInfo := &z.infos[z.side]
 		err := curInfo.fetcher.StartScan(
-			z.Ctx,
+			z.Ctx(),
 			roachpb.Spans{roachpb.Span{Key: curInfo.key, EndKey: curInfo.endKey}},
 			nil, /* spanIDs */
 			rowinfra.GetDefaultBatchBytesLimit(z.EvalCtx.TestingKnobs.ForceProductionValues),
 			zigzagJoinerBatchSize,
 		)
 		if err != nil {
-			log.Errorf(z.Ctx, "scan error: %s", err)
+			log.Errorf(z.Ctx(), "scan error: %s", err)
 			return err
 		}
-		fetchedRow, err := z.fetchRow(z.Ctx)
+		fetchedRow, err := z.fetchRow(z.Ctx())
 		if err != nil {
 			return scrub.UnwrapScrubError(err)
 		}
@@ -821,7 +821,7 @@ func (z *zigzagJoiner) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetadata
 			z.MoveToDraining(err)
 			break
 		}
-		row, err := z.nextRow(z.Ctx)
+		row, err := z.nextRow(z.Ctx())
 		if err != nil {
 			z.MoveToDraining(err)
 			break
@@ -846,9 +846,9 @@ func (z *zigzagJoiner) ConsumerClosed() {
 
 // execStatsForTrace implements ProcessorBase.ExecStatsForTrace.
 func (z *zigzagJoiner) execStatsForTrace() *execinfrapb.ComponentStats {
-	z.scanStats = execstats.GetScanStats(z.Ctx, z.ExecStatsTrace)
+	z.scanStats = execstats.GetScanStats(z.Ctx(), z.ExecStatsTrace)
 
-	contentionTime, contentionEvents := execstats.GetCumulativeContentionTime(z.Ctx, z.ExecStatsTrace)
+	contentionTime, contentionEvents := execstats.GetCumulativeContentionTime(z.Ctx(), z.ExecStatsTrace)
 	kvStats := execinfrapb.KVStats{
 		BytesRead:      optional.MakeUint(uint64(z.getBytesRead())),
 		ContentionTime: optional.MakeTimeValue(contentionTime),
@@ -902,7 +902,7 @@ func (z *zigzagJoiner) generateMeta() []execinfrapb.ProducerMetadata {
 	meta.Metrics = execinfrapb.GetMetricsMeta()
 	meta.Metrics.BytesRead = z.getBytesRead()
 	meta.Metrics.RowsRead = z.getRowsRead()
-	if tfs := execinfra.GetLeafTxnFinalState(z.Ctx, z.FlowCtx.Txn); tfs != nil {
+	if tfs := execinfra.GetLeafTxnFinalState(z.Ctx(), z.FlowCtx.Txn); tfs != nil {
 		trailingMeta = append(trailingMeta, execinfrapb.ProducerMetadata{LeafTxnFinalState: tfs})
 	}
 	return trailingMeta
diff --git a/pkg/ui/workspaces/cluster-ui/src/statementDetails/statementDetails.fixture.ts b/pkg/ui/workspaces/cluster-ui/src/statementDetails/statementDetails.fixture.ts
index 4c5f80bcbe98..94bfda564442 100644
--- a/pkg/ui/workspaces/cluster-ui/src/statementDetails/statementDetails.fixture.ts
+++ b/pkg/ui/workspaces/cluster-ui/src/statementDetails/statementDetails.fixture.ts
@@ -37,6 +37,7 @@ const statementDetailsNoData: StatementDetailsResponse = {
       max_retries: new Long(0),
       legacy_last_err: "",
      num_rows: { mean: 0, squared_diffs: 0 },
+      idle_lat: { mean: 0, squared_diffs: 0 },
       parse_lat: { mean: 0, squared_diffs: 0 },
       plan_lat: { mean: 0, squared_diffs: 0 },
       run_lat: { mean: 0, squared_diffs: 0 },
@@ -96,6 +97,10 @@ const statementDetailsData: StatementDetailsResponse = {
         mean: 6,
         squared_diffs: 0,
       },
+      idle_lat: {
+        mean: 0.0000876,
+        squared_diffs: 2.35792e-8,
+      },
       parse_lat: {
         mean: 0.0000876,
         squared_diffs: 2.35792e-8,
@@ -189,6 +194,10 @@
         mean: 6,
         squared_diffs: 0,
       },
+      idle_lat: {
+        mean: 0.00004,
+        squared_diffs: 0,
+      },
       parse_lat: {
         mean: 0.00004,
         squared_diffs: 0,
@@ -285,6 +294,10 @@
         mean: 6,
         squared_diffs: 0,
       },
+      idle_lat: {
+        mean: 0.000071,
+        squared_diffs: 4.050000000000001e-9,
+      },
       parse_lat: {
         mean: 0.000071,
         squared_diffs: 4.050000000000001e-9,
@@ -381,6 +394,10 @@
         mean: 6,
         squared_diffs: 0,
       },
+      idle_lat: {
+        mean: 0.000046,
+        squared_diffs: 0,
+      },
       parse_lat: {
         mean: 0.000046,
         squared_diffs: 0,
@@ -477,6 +494,10 @@
         mean: 6,
         squared_diffs: 0,
       },
+      idle_lat: {
+        mean: 0.00021,
+        squared_diffs: 0,
+      },
       parse_lat: {
         mean: 0.00021,
         squared_diffs: 0,
@@ -575,6 +596,10 @@
         mean: 6,
         squared_diffs: 0,
       },
+      idle_lat: {
+        mean: 0.0000876,
+        squared_diffs: 2.35792e-8,
+      },
       parse_lat: {
         mean: 0.0000876,
         squared_diffs: 2.35792e-8,
@@ -669,6 +694,10 @@
         mean: 6,
         squared_diffs: 0,
       },
+      idle_lat: {
+        mean: 0.0000876,
+        squared_diffs: 2.35792e-8,
+      },
       parse_lat: {
         mean: 0.0000876,
         squared_diffs: 2.35792e-8,
diff --git a/pkg/ui/workspaces/cluster-ui/src/statementDetails/statementDetails.tsx b/pkg/ui/workspaces/cluster-ui/src/statementDetails/statementDetails.tsx
index 081a61ada4d2..549c981e44fe 100644
--- a/pkg/ui/workspaces/cluster-ui/src/statementDetails/statementDetails.tsx
+++ b/pkg/ui/workspaces/cluster-ui/src/statementDetails/statementDetails.tsx
@@ -563,7 +563,12 @@ export class StatementDetails extends React.Component<
       generateExecuteAndPlanningTimeseries(statsPerAggregatedTs);
     const executionAndPlanningOps: Partial<uPlot.Options> = {
       axes: [{}, { label: "Time Spent" }],
-      series: [{}, { label: "Execution" }, { label: "Planning" }],
+      series: [
+        {},
+        { label: "Execution" },
+        { label: "Planning" },
+        { label: "Idle" },
+      ],
       width: cardWidth,
     };
diff --git a/pkg/ui/workspaces/cluster-ui/src/statementDetails/timeseriesUtils.ts b/pkg/ui/workspaces/cluster-ui/src/statementDetails/timeseriesUtils.ts
index 70b522a890e9..b61a8740273f 100644
--- a/pkg/ui/workspaces/cluster-ui/src/statementDetails/timeseriesUtils.ts
+++ b/pkg/ui/workspaces/cluster-ui/src/statementDetails/timeseriesUtils.ts
@@ -21,14 +21,16 @@ export function generateExecuteAndPlanningTimeseries(
   const ts: Array<number> = [];
   const execution: Array<number> = [];
   const planning: Array<number> = [];
+  const idle: Array<number> = [];
 
   stats.forEach(function (stat: statementStatisticsPerAggregatedTs) {
     ts.push(TimestampToNumber(stat.aggregated_ts) * 1e3);
     execution.push(stat.stats.run_lat.mean * 1e9);
     planning.push(stat.stats.plan_lat.mean * 1e9);
+    idle.push(stat.stats.idle_lat.mean * 1e9);
   });
 
-  return [ts, execution, planning];
+  return [ts, execution, planning, idle];
 }
 
 export function generateRowsProcessedTimeseries(