From 165a023298034c569a3088982a3d3edb88696cc3 Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Tue, 15 Nov 2022 18:43:32 -0800 Subject: [PATCH] sql: audit all processors to make their closure bullet-proof This commit replaces all usages of `ProcessorBaseNoHelper.Ctx` field with a call to the newly-introduced `Ctx()` method which returns a background context if the processor hasn't been started. This change makes it so that all processors now respect the contract of `colexecop.Closer` interface which says that `Close` must be safe to call even if `Init` hasn't been performed (in the context of processors this means that `Columnarizer.Init` wasn't called meaning that `Processor.Start` wasn't either). Initially, I attempted to fix this in #91446 by putting the protection into the columnarizer, but that led to broken assumptions since we wouldn't close all closers that we expected to (in particular, the materializer that is the input to the wrapped row-by-row processor wouldn't be closed). This commit takes a different approach and should fix the issue for good without introducing any flakiness. As a result, this commit fixes a rarely hit issue when the aggregator and the zigzag joiner attempt to log when they are closed if they haven't been started (that we see occasionally from sentry). The issue is quite rare though, so no release note seems appropriate. Release note: None --- pkg/ccl/backupccl/backup_processor.go | 2 +- pkg/ccl/backupccl/restore_data_processor.go | 4 +- .../backupccl/restore_data_processor_test.go | 9 +-- .../changefeedccl/changefeed_processors.go | 40 ++++++------- .../stream_ingestion_frontier_processor.go | 12 ++-- .../stream_ingestion_processor.go | 12 ++-- pkg/sql/colexec/columnarizer.go | 19 ++----- pkg/sql/colexec/materializer.go | 10 +--- pkg/sql/colflow/vectorized_flow.go | 23 ++++---- pkg/sql/execinfra/processorsbase.go | 37 +++++++----- pkg/sql/rowexec/aggregator.go | 56 +++++++++---------- pkg/sql/rowexec/countrows.go | 2 +- pkg/sql/rowexec/distinct.go | 10 ++-- pkg/sql/rowexec/filterer.go | 2 +- pkg/sql/rowexec/hashjoiner.go | 24 ++++---- pkg/sql/rowexec/inverted_filterer.go | 18 +++--- pkg/sql/rowexec/inverted_joiner.go | 46 +++++++-------- pkg/sql/rowexec/joinerbase.go | 2 +- pkg/sql/rowexec/joinreader.go | 50 ++++++++--------- pkg/sql/rowexec/joinreader_strategies.go | 32 +++++------ pkg/sql/rowexec/mergejoiner.go | 6 +- pkg/sql/rowexec/processors_test.go | 13 +++-- pkg/sql/rowexec/project_set.go | 12 ++-- pkg/sql/rowexec/sample_aggregator.go | 6 +- pkg/sql/rowexec/sampler.go | 4 +- pkg/sql/rowexec/sorter.go | 18 +++--- pkg/sql/rowexec/tablereader.go | 14 ++--- pkg/sql/rowexec/windower.go | 34 +++++------ pkg/sql/rowexec/zigzagjoiner.go | 18 +++--- 29 files changed, 263 insertions(+), 272 deletions(-) diff --git a/pkg/ccl/backupccl/backup_processor.go b/pkg/ccl/backupccl/backup_processor.go index c1df16842904..2d56b56185a8 100644 --- a/pkg/ccl/backupccl/backup_processor.go +++ b/pkg/ccl/backupccl/backup_processor.go @@ -221,7 +221,7 @@ func (bp *backupDataProcessor) close() { bp.cancelAndWaitForWorker() bp.agg.Close() if bp.InternalClose() { - bp.memAcc.Close(bp.Ctx) + bp.memAcc.Close(bp.Ctx()) } } diff --git a/pkg/ccl/backupccl/restore_data_processor.go b/pkg/ccl/backupccl/restore_data_processor.go index 3dae7307c1ee..6e2a72678683 100644 --- a/pkg/ccl/backupccl/restore_data_processor.go +++ b/pkg/ccl/backupccl/restore_data_processor.go @@ -559,8 +559,8 @@ func (rd *restoreDataProcessor) Next() (rowenc.EncDatumRow, *execinfrapb.Produce prog.ProgressDetails = *details case meta := <-rd.metaCh: return nil, meta - case <-rd.Ctx.Done(): - rd.MoveToDraining(rd.Ctx.Err()) + case <-rd.Ctx().Done(): + rd.MoveToDraining(rd.Ctx().Err()) return nil, rd.DrainHelper() } diff --git a/pkg/ccl/backupccl/restore_data_processor_test.go b/pkg/ccl/backupccl/restore_data_processor_test.go index 0fb04d123ed2..6c3234ed7f45 100644 --- a/pkg/ccl/backupccl/restore_data_processor_test.go +++ b/pkg/ccl/backupccl/restore_data_processor_test.go @@ -395,8 +395,7 @@ func runTestIngest(t *testing.T, init func(*cluster.Settings)) { } expectedKVs := slurpSSTablesLatestKey(t, filepath.Join(dir, "foo"), slurp, srcPrefix, newPrefix) - mockRestoreDataProcessor, err := newTestingRestoreDataProcessor(ctx, &evalCtx, &flowCtx, - mockRestoreDataSpec) + mockRestoreDataProcessor, err := newTestingRestoreDataProcessor(&evalCtx, &flowCtx, mockRestoreDataSpec) require.NoError(t, err) ssts := make(chan mergedSST, 1) require.NoError(t, mockRestoreDataProcessor.openSSTs(ctx, restoreSpanEntry, ssts)) @@ -439,15 +438,11 @@ func runTestIngest(t *testing.T, init func(*cluster.Settings)) { } func newTestingRestoreDataProcessor( - ctx context.Context, - evalCtx *eval.Context, - flowCtx *execinfra.FlowCtx, - spec execinfrapb.RestoreDataSpec, + evalCtx *eval.Context, flowCtx *execinfra.FlowCtx, spec execinfrapb.RestoreDataSpec, ) (*restoreDataProcessor, error) { rd := &restoreDataProcessor{ ProcessorBase: execinfra.ProcessorBase{ ProcessorBaseNoHelper: execinfra.ProcessorBaseNoHelper{ - Ctx: ctx, EvalCtx: evalCtx, }, }, diff --git a/pkg/ccl/changefeedccl/changefeed_processors.go b/pkg/ccl/changefeedccl/changefeed_processors.go index 101d1ecff4e9..315677b898ae 100644 --- a/pkg/ccl/changefeedccl/changefeed_processors.go +++ b/pkg/ccl/changefeedccl/changefeed_processors.go @@ -446,7 +446,7 @@ func (ca *changeAggregator) close() { } if ca.eventConsumer != nil { if err := ca.eventConsumer.Close(); err != nil { - log.Warningf(ca.Ctx, "error closing event consumer: %s", err) + log.Warningf(ca.Ctx(), "error closing event consumer: %s", err) } } @@ -454,11 +454,11 @@ func (ca *changeAggregator) close() { // Best effort: context is often cancel by now, so we expect to see an error _ = ca.sink.Close() } - ca.memAcc.Close(ca.Ctx) + ca.memAcc.Close(ca.Ctx()) if ca.kvFeedMemMon != nil { - ca.kvFeedMemMon.Stop(ca.Ctx) + ca.kvFeedMemMon.Stop(ca.Ctx()) } - ca.MemMonitor.Stop(ca.Ctx) + ca.MemMonitor.Stop(ca.Ctx()) ca.InternalClose() } @@ -501,7 +501,7 @@ func (ca *changeAggregator) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMet // kvFeed, sends off this event to the event consumer, and flushes the sink // if necessary. func (ca *changeAggregator) tick() error { - event, err := ca.eventProducer.Get(ca.Ctx) + event, err := ca.eventProducer.Get(ca.Ctx()) if err != nil { return err } @@ -516,16 +516,16 @@ func (ca *changeAggregator) tick() error { ca.sliMetrics.AdmitLatency.RecordValue(timeutil.Since(event.Timestamp().GoTime()).Nanoseconds()) } ca.recentKVCount++ - return ca.eventConsumer.ConsumeEvent(ca.Ctx, event) + return ca.eventConsumer.ConsumeEvent(ca.Ctx(), event) case kvevent.TypeResolved: a := event.DetachAlloc() - a.Release(ca.Ctx) + a.Release(ca.Ctx()) resolved := event.Resolved() if ca.knobs.FilterSpanWithMutation == nil || !ca.knobs.FilterSpanWithMutation(&resolved) { return ca.noteResolvedSpan(resolved) } case kvevent.TypeFlush: - return ca.sink.Flush(ca.Ctx) + return ca.sink.Flush(ca.Ctx()) } return nil @@ -568,7 +568,7 @@ func (ca *changeAggregator) flushFrontier() error { // otherwise, we could lose buffered messages and violate the // at-least-once guarantee. This is also true for checkpointing the // resolved spans in the job progress. - if err := ca.sink.Flush(ca.Ctx); err != nil { + if err := ca.sink.Flush(ca.Ctx()); err != nil { return err } @@ -996,8 +996,8 @@ func (cf *changeFrontier) close() { // Best effort: context is often cancel by now, so we expect to see an error _ = cf.sink.Close() } - cf.memAcc.Close(cf.Ctx) - cf.MemMonitor.Stop(cf.Ctx) + cf.memAcc.Close(cf.Ctx()) + cf.MemMonitor.Stop(cf.Ctx()) } } @@ -1104,7 +1104,7 @@ func (cf *changeFrontier) noteAggregatorProgress(d rowenc.EncDatum) error { // job progress update closure, but it currently doesn't pass along the info // we'd need to do it that way. if !resolved.Timestamp.IsEmpty() && resolved.Timestamp.Less(cf.highWaterAtStart) { - logcrash.ReportOrPanic(cf.Ctx, &cf.flowCtx.Cfg.Settings.SV, + logcrash.ReportOrPanic(cf.Ctx(), &cf.flowCtx.Cfg.Settings.SV, `got a span level timestamp %s for %s that is less than the initial high-water %s`, redact.Safe(resolved.Timestamp), resolved.Span, redact.Safe(cf.highWaterAtStart)) continue @@ -1206,7 +1206,7 @@ func (cf *changeFrontier) maybeCheckpointJob( if err != nil { return false, err } - cf.js.checkpointCompleted(cf.Ctx, timeutil.Since(checkpointStart)) + cf.js.checkpointCompleted(cf.Ctx(), timeutil.Since(checkpointStart)) return updated, nil } @@ -1224,7 +1224,7 @@ func (cf *changeFrontier) checkpointJobProgress( var updateSkipped error if cf.js.job != nil { - if err := cf.js.job.Update(cf.Ctx, nil, func( + if err := cf.js.job.Update(cf.Ctx(), nil, func( txn *kv.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater, ) error { // If we're unable to update the job due to the job state, such as during @@ -1249,8 +1249,8 @@ func (cf *changeFrontier) checkpointJobProgress( if !changefeedbase.ActiveProtectedTimestampsEnabled.Get(&cf.flowCtx.Cfg.Settings.SV) { timestampManager = cf.deprecatedManageProtectedTimestamps } - if err := timestampManager(cf.Ctx, txn, changefeedProgress); err != nil { - log.Warningf(cf.Ctx, "error managing protected timestamp record: %v", err) + if err := timestampManager(cf.Ctx(), txn, changefeedProgress); err != nil { + log.Warningf(cf.Ctx(), "error managing protected timestamp record: %v", err) return err } @@ -1278,7 +1278,7 @@ func (cf *changeFrontier) checkpointJobProgress( } if updateSkipped != nil { - log.Warningf(cf.Ctx, "skipping changefeed checkpoint: %s", updateSkipped) + log.Warningf(cf.Ctx(), "skipping changefeed checkpoint: %s", updateSkipped) return false, nil } @@ -1377,7 +1377,7 @@ func (cf *changeFrontier) maybeEmitResolved(newResolved hlc.Timestamp) error { if !shouldEmit { return nil } - if err := emitResolvedTimestamp(cf.Ctx, cf.encoder, cf.sink, newResolved); err != nil { + if err := emitResolvedTimestamp(cf.Ctx(), cf.encoder, cf.sink, newResolved); err != nil { return err } cf.lastEmitResolved = newResolved.GoTime() @@ -1416,13 +1416,13 @@ func (cf *changeFrontier) maybeLogBehindSpan(frontierChanged bool) { description = fmt.Sprintf("job %d", cf.spec.JobID) } if frontierChanged && cf.slowLogEveryN.ShouldProcess(now) { - log.Infof(cf.Ctx, "%s new resolved timestamp %s is behind by %s", + log.Infof(cf.Ctx(), "%s new resolved timestamp %s is behind by %s", description, frontier, resolvedBehind) } if cf.slowLogEveryN.ShouldProcess(now) { s := cf.frontier.PeekFrontierSpan() - log.Infof(cf.Ctx, "%s span %s is behind by %s", description, s, resolvedBehind) + log.Infof(cf.Ctx(), "%s span %s is behind by %s", description, s, resolvedBehind) } } diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_frontier_processor.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_frontier_processor.go index 5807bc10f2b5..2609c40d40b4 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_frontier_processor.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_frontier_processor.go @@ -298,13 +298,13 @@ func (sf *streamIngestionFrontier) Next() ( if err := sf.maybeUpdatePartitionProgress(); err != nil { // Updating the partition progress isn't a fatal error. - log.Errorf(sf.Ctx, "failed to update partition progress: %+v", err) + log.Errorf(sf.Ctx(), "failed to update partition progress: %+v", err) } // Send back a row to the job so that it can update the progress. select { - case <-sf.Ctx.Done(): - sf.MoveToDraining(sf.Ctx.Err()) + case <-sf.Ctx().Done(): + sf.MoveToDraining(sf.Ctx().Err()) return nil, sf.DrainHelper() // Send the latest persisted highwater in the heartbeat to the source cluster // as even with retries we will never request an earlier row than it, and @@ -315,7 +315,7 @@ func (sf *streamIngestionFrontier) Next() ( case <-sf.heartbeatSender.stoppedChan: err := sf.heartbeatSender.wait() if err != nil { - log.Errorf(sf.Ctx, "heartbeat sender exited with error: %s", err) + log.Errorf(sf.Ctx(), "heartbeat sender exited with error: %s", err) } sf.MoveToDraining(err) return nil, sf.DrainHelper() @@ -326,7 +326,7 @@ func (sf *streamIngestionFrontier) Next() ( func (sf *streamIngestionFrontier) close() { if err := sf.heartbeatSender.stop(); err != nil { - log.Errorf(sf.Ctx, "heartbeat sender exited with error: %s", err) + log.Errorf(sf.Ctx(), "heartbeat sender exited with error: %s", err) } if sf.InternalClose() { sf.metrics.RunningCount.Dec(1) @@ -392,7 +392,7 @@ func (sf *streamIngestionFrontier) noteResolvedTimestamps( // maybeUpdatePartitionProgress polls the frontier and updates the job progress with // partition-specific information to track the status of each partition. func (sf *streamIngestionFrontier) maybeUpdatePartitionProgress() error { - ctx := sf.Ctx + ctx := sf.Ctx() updateFreq := JobCheckpointFrequency.Get(&sf.flowCtx.Cfg.Settings.SV) if updateFreq == 0 || timeutil.Since(sf.lastPartitionUpdate) < updateFreq { return nil diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go index 852914bf54e6..22aba9cbca3c 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go @@ -405,10 +405,10 @@ func (sip *streamIngestionProcessor) close() { } for _, client := range sip.streamPartitionClients { - _ = client.Close(sip.Ctx) + _ = client.Close(sip.Ctx()) } if sip.batcher != nil { - sip.batcher.Close(sip.Ctx) + sip.batcher.Close(sip.Ctx()) } if sip.maxFlushRateTimer != nil { sip.maxFlushRateTimer.Stop() @@ -539,7 +539,7 @@ func (sip *streamIngestionProcessor) consumeEvents() (*jobspb.ResolvedSpans, err if streamingKnobs, ok := sip.FlowCtx.TestingKnobs().StreamingTestingKnobs.(*sql.StreamingTestingKnobs); ok { if streamingKnobs != nil && streamingKnobs.RunAfterReceivingEvent != nil { - if err := streamingKnobs.RunAfterReceivingEvent(sip.Ctx); err != nil { + if err := streamingKnobs.RunAfterReceivingEvent(sip.Ctx()); err != nil { return nil, err } } @@ -603,7 +603,7 @@ func (sip *streamIngestionProcessor) bufferSST(sst *roachpb.RangeFeedSSTable) er // careful with checkpoints: we can only send checkpoint whose TS >= SST batch TS // after the full SST gets ingested. - _, sp := tracing.ChildSpan(sip.Ctx, "stream-ingestion-buffer-sst") + _, sp := tracing.ChildSpan(sip.Ctx(), "stream-ingestion-buffer-sst") defer sp.Finish() return streamingccl.ScanSST(sst, sst.Span, func(keyVal storage.MVCCKeyValue) error { @@ -653,7 +653,7 @@ func (sip *streamIngestionProcessor) bufferDelRange(delRange *roachpb.RangeFeedD func (sip *streamIngestionProcessor) bufferRangeKeyVal( rangeKeyVal storage.MVCCRangeKeyValue, ) error { - _, sp := tracing.ChildSpan(sip.Ctx, "stream-ingestion-buffer-range-key") + _, sp := tracing.ChildSpan(sip.Ctx(), "stream-ingestion-buffer-range-key") defer sp.Finish() var err error @@ -791,7 +791,7 @@ func (r *rangeKeyBatcher) reset() { } func (sip *streamIngestionProcessor) flush() (*jobspb.ResolvedSpans, error) { - ctx, sp := tracing.ChildSpan(sip.Ctx, "stream-ingestion-flush") + ctx, sp := tracing.ChildSpan(sip.Ctx(), "stream-ingestion-flush") defer sp.Finish() flushedCheckpoints := jobspb.ResolvedSpans{ResolvedSpans: make([]jobspb.ResolvedSpan, 0)} diff --git a/pkg/sql/colexec/columnarizer.go b/pkg/sql/colexec/columnarizer.go index 09c18bf4ca8f..8f19b084dc14 100644 --- a/pkg/sql/colexec/columnarizer.go +++ b/pkg/sql/colexec/columnarizer.go @@ -58,6 +58,7 @@ type Columnarizer struct { colexecop.NonExplainable mode columnarizerMode + initialized bool helper colmem.SetAccountingHelper metadataAllocator *colmem.Allocator input execinfra.RowSource @@ -177,13 +178,10 @@ func newColumnarizer( // Init is part of the colexecop.Operator interface. func (c *Columnarizer) Init(ctx context.Context) { - if c.removedFromFlow { - return - } - if c.Ctx != nil { - // Init has already been called. + if c.removedFromFlow || c.initialized { return } + c.initialized = true c.accumulatedMeta = make([]execinfrapb.ProducerMetadata, 0, 1) ctx = c.StartInternal(ctx, "columnarizer" /* name */) c.input.Start(ctx) @@ -276,7 +274,7 @@ func (c *Columnarizer) DrainMeta() []execinfrapb.ProducerMetadata { // When this method returns, we no longer will have the reference to the // metadata, so we can release all memory from the metadata allocator. defer c.metadataAllocator.ReleaseAll() - if c.Ctx == nil { + if !c.initialized { // The columnarizer wasn't initialized, so the wrapped processors might // not have been started leaving them in an unsafe to drain state, so // we skip the draining. Mostly likely this happened because a panic was @@ -300,13 +298,6 @@ func (c *Columnarizer) Close(context.Context) error { return nil } c.helper.Release() - if c.Ctx == nil { - // The columnarizer wasn't initialized, so the wrapped processors might - // not have been started leaving them in a state unsafe for the - // InternalClose, so we skip that. Mostly likely this happened because a - // panic was encountered in Init. - return nil - } c.InternalClose() return nil } @@ -314,7 +305,7 @@ func (c *Columnarizer) Close(context.Context) error { func (c *Columnarizer) trailingMetaCallback() []execinfrapb.ProducerMetadata { // Close will call InternalClose(). Note that we don't return any trailing // metadata here because the columnarizers propagate it in DrainMeta. - if err := c.Close(c.Ctx); buildutil.CrdbTestBuild && err != nil { + if err := c.Close(c.Ctx()); buildutil.CrdbTestBuild && err != nil { // Close never returns an error. colexecerror.InternalError(errors.NewAssertionErrorWithWrappedErrf(err, "unexpected error from Columnarizer.Close")) } diff --git a/pkg/sql/colexec/materializer.go b/pkg/sql/colexec/materializer.go index 9051b919bb00..449da21e879c 100644 --- a/pkg/sql/colexec/materializer.go +++ b/pkg/sql/colexec/materializer.go @@ -335,18 +335,10 @@ func (m *Materializer) close() { if m.Closed { return } - if m.Ctx == nil { - // In some edge cases (like when Init of an operator above this - // materializer encounters a panic), the materializer might never be - // started, yet it still will attempt to close its Closers. This - // context is only used for logging purposes, so it is ok to grab - // the background context in order to prevent a NPE below. - m.Ctx = context.Background() - } // Make sure to call InternalClose() only after closing the closers - this // allows the closers to utilize the unfinished tracing span (if tracing is // enabled). - m.closers.CloseAndLogOnErr(m.Ctx, "materializer") + m.closers.CloseAndLogOnErr(m.Ctx(), "materializer") m.InternalClose() } diff --git a/pkg/sql/colflow/vectorized_flow.go b/pkg/sql/colflow/vectorized_flow.go index ec5aa8289076..3a8e7dc0ed21 100644 --- a/pkg/sql/colflow/vectorized_flow.go +++ b/pkg/sql/colflow/vectorized_flow.go @@ -384,18 +384,17 @@ func (f *vectorizedFlow) Cleanup(ctx context.Context) { // This cleans up all the memory and disk monitoring of the vectorized flow. f.creator.cleanup(ctx) - // TODO(yuzefovich): uncomment this once the assertion is no longer flaky. - //if buildutil.CrdbTestBuild && f.FlowBase.Started() && !f.FlowCtx.EvalCtx.SessionData().TestingVectorizeInjectPanics { - // // Check that all closers have been closed. Note that we don't check - // // this in case the flow was never started in the first place (it is ok - // // to not check this since closers haven't allocated any resources in - // // such a case). We also don't check when the panic injection is - // // enabled since then Close() might be legitimately not called (if a - // // panic is injected in Init() of the wrapped operator). - // if numClosed := atomic.LoadInt32(f.testingInfo.numClosed); numClosed != f.testingInfo.numClosers { - // colexecerror.InternalError(errors.AssertionFailedf("expected %d components to be closed, but found that only %d were", f.testingInfo.numClosers, numClosed)) - // } - //} + if buildutil.CrdbTestBuild && f.FlowBase.Started() && !f.FlowCtx.EvalCtx.SessionData().TestingVectorizeInjectPanics { + // Check that all closers have been closed. Note that we don't check + // this in case the flow was never started in the first place (it is ok + // to not check this since closers haven't allocated any resources in + // such a case). We also don't check when the panic injection is + // enabled since then Close() might be legitimately not called (if a + // panic is injected in Init() of the wrapped operator). + if numClosed := atomic.LoadInt32(f.testingInfo.numClosed); numClosed != f.testingInfo.numClosers { + colexecerror.InternalError(errors.AssertionFailedf("expected %d components to be closed, but found that only %d were", f.testingInfo.numClosers, numClosed)) + } + } f.tempStorage.Lock() created := f.tempStorage.path != "" diff --git a/pkg/sql/execinfra/processorsbase.go b/pkg/sql/execinfra/processorsbase.go index de0a709dbbe4..0f179a064521 100644 --- a/pkg/sql/execinfra/processorsbase.go +++ b/pkg/sql/execinfra/processorsbase.go @@ -353,10 +353,10 @@ type ProcessorBaseNoHelper struct { // has been closed. Closed bool - // Ctx and span contain the tracing state while the processor is active + // ctx and span contain the tracing state while the processor is active // (i.e. hasn't been closed). Initialized using flowCtx.Ctx (which should not be otherwise // used). - Ctx context.Context + ctx context.Context span *tracing.Span // origCtx is the context from which ctx was derived. InternalClose() resets // ctx to this. @@ -517,7 +517,7 @@ func (pb *ProcessorBaseNoHelper) MoveToDraining(err error) { // not permitted. if err != nil { logcrash.ReportOrPanic( - pb.Ctx, + pb.Ctx(), &pb.FlowCtx.Cfg.Settings.SV, "MoveToDraining called in state %s with err: %+v", pb.State, err) @@ -547,7 +547,7 @@ func (pb *ProcessorBaseNoHelper) MoveToDraining(err error) { func (pb *ProcessorBaseNoHelper) DrainHelper() *execinfrapb.ProducerMetadata { if pb.State == StateRunning { logcrash.ReportOrPanic( - pb.Ctx, + pb.Ctx(), &pb.FlowCtx.Cfg.Settings.SV, "drain helper called in StateRunning", ) @@ -658,7 +658,7 @@ func (pb *ProcessorBase) HijackExecStatsForTrace() func() *execinfrapb.Component func (pb *ProcessorBaseNoHelper) moveToTrailingMeta() { if pb.State == StateTrailingMeta || pb.State == StateExhausted { logcrash.ReportOrPanic( - pb.Ctx, + pb.Ctx(), &pb.FlowCtx.Cfg.Settings.SV, "moveToTrailingMeta called in state: %s", pb.State, @@ -681,10 +681,10 @@ func (pb *ProcessorBaseNoHelper) moveToTrailingMeta() { } } - if buildutil.CrdbTestBuild && pb.Ctx == nil { + if buildutil.CrdbTestBuild && pb.ctx == nil { panic( errors.AssertionFailedf( - "unexpected nil ProcessorBase.Ctx when draining. Was StartInternal called?", + "unexpected nil ProcessorBase.ctx when draining. Was StartInternal called?", ), ) } @@ -708,7 +708,7 @@ func (pb *ProcessorBaseNoHelper) moveToTrailingMeta() { // should continue processing other rows, with the awareness that the processor // might have been transitioned to the draining phase. func (pb *ProcessorBase) ProcessRowHelper(row rowenc.EncDatumRow) rowenc.EncDatumRow { - outRow, ok, err := pb.OutputHelper.ProcessRow(pb.Ctx, row) + outRow, ok, err := pb.OutputHelper.ProcessRow(pb.Ctx(), row) if err != nil { pb.MoveToDraining(err) return nil @@ -730,7 +730,7 @@ func (pb *ProcessorBaseNoHelper) Run(ctx context.Context) { panic("processor output is not set for emitting rows") } pb.self.Start(ctx) - Run(pb.Ctx, pb.self, pb.Output) + Run(pb.ctx, pb.self, pb.Output) } // ProcStateOpts contains fields used by the ProcessorBase's family of functions @@ -857,18 +857,27 @@ func ProcessorSpan(ctx context.Context, name string) (context.Context, *tracing. // so that the caller doesn't mistakenly use old ctx object. func (pb *ProcessorBaseNoHelper) StartInternal(ctx context.Context, name string) context.Context { pb.origCtx = ctx - pb.Ctx = ctx + pb.ctx = ctx noSpan := pb.FlowCtx != nil && pb.FlowCtx.Cfg != nil && pb.FlowCtx.Cfg.TestingKnobs.ProcessorNoTracingSpan if !noSpan { - pb.Ctx, pb.span = ProcessorSpan(ctx, name) + pb.ctx, pb.span = ProcessorSpan(ctx, name) if pb.span != nil && pb.span.IsVerbose() { pb.span.SetTag(execinfrapb.FlowIDTagKey, attribute.StringValue(pb.FlowCtx.ID.String())) pb.span.SetTag(execinfrapb.ProcessorIDTagKey, attribute.IntValue(int(pb.ProcessorID))) } } - pb.evalOrigCtx = pb.EvalCtx.SetDeprecatedContext(pb.Ctx) - return pb.Ctx + pb.evalOrigCtx = pb.EvalCtx.SetDeprecatedContext(pb.ctx) + return pb.ctx +} + +// Ctx is an accessor method for ctx which is guaranteed to return non-nil +// context even if StartInternal() hasn't been called. +func (pb *ProcessorBaseNoHelper) Ctx() context.Context { + if pb.ctx == nil { + return context.Background() + } + return pb.ctx } // InternalClose helps processors implement the RowSource interface, performing @@ -897,7 +906,7 @@ func (pb *ProcessorBaseNoHelper) InternalClose() bool { pb.span = nil // Reset the context so that any incidental uses after this point do not // access the finished span. - pb.Ctx = pb.origCtx + pb.ctx = pb.origCtx pb.EvalCtx.SetDeprecatedContext(pb.evalOrigCtx) return true } diff --git a/pkg/sql/rowexec/aggregator.go b/pkg/sql/rowexec/aggregator.go index 4ac11602237e..5952a29c4d23 100644 --- a/pkg/sql/rowexec/aggregator.go +++ b/pkg/sql/rowexec/aggregator.go @@ -349,17 +349,17 @@ func (ag *aggregatorBase) start(ctx context.Context, procName string) { func (ag *hashAggregator) close() { if ag.InternalClose() { - log.VEventf(ag.Ctx, 2, "exiting aggregator") + log.VEventf(ag.Ctx(), 2, "exiting aggregator") // If we have started emitting rows, bucketsIter will represent which // buckets are still open, since buckets are closed once their results are // emitted. if ag.bucketsIter == nil { for _, bucket := range ag.buckets { - bucket.close(ag.Ctx) + bucket.close(ag.Ctx()) } } else { for _, bucket := range ag.bucketsIter { - ag.buckets[bucket].close(ag.Ctx) + ag.buckets[bucket].close(ag.Ctx()) } } // Make sure to release any remaining memory under 'buckets'. @@ -368,25 +368,25 @@ func (ag *hashAggregator) close() { // buckets since the latter might be releasing some precisely tracked // memory, and if we were to close the accounts first, there would be // no memory to release for the buckets. - ag.bucketsAcc.Close(ag.Ctx) - ag.aggFuncsAcc.Close(ag.Ctx) - ag.MemMonitor.Stop(ag.Ctx) + ag.bucketsAcc.Close(ag.Ctx()) + ag.aggFuncsAcc.Close(ag.Ctx()) + ag.MemMonitor.Stop(ag.Ctx()) } } func (ag *orderedAggregator) close() { if ag.InternalClose() { - log.VEventf(ag.Ctx, 2, "exiting aggregator") + log.VEventf(ag.Ctx(), 2, "exiting aggregator") if ag.bucket != nil { - ag.bucket.close(ag.Ctx) + ag.bucket.close(ag.Ctx()) } // Note that we should be closing accounts only after closing the // bucket since the latter might be releasing some precisely tracked // memory, and if we were to close the accounts first, there would be // no memory to release for the bucket. - ag.bucketsAcc.Close(ag.Ctx) - ag.aggFuncsAcc.Close(ag.Ctx) - ag.MemMonitor.Stop(ag.Ctx) + ag.bucketsAcc.Close(ag.Ctx()) + ag.aggFuncsAcc.Close(ag.Ctx()) + ag.MemMonitor.Stop(ag.Ctx()) } } @@ -424,7 +424,7 @@ func (ag *hashAggregator) accumulateRows() ( return aggAccumulating, nil, meta } if row == nil { - log.VEvent(ag.Ctx, 1, "accumulation complete") + log.VEvent(ag.Ctx(), 1, "accumulation complete") ag.inputDone = true break } @@ -461,7 +461,7 @@ func (ag *hashAggregator) accumulateRows() ( // Note that, for simplicity, we're ignoring the overhead of the slice of // strings. - if err := ag.bucketsAcc.Grow(ag.Ctx, int64(len(ag.buckets))*memsize.String); err != nil { + if err := ag.bucketsAcc.Grow(ag.Ctx(), int64(len(ag.buckets))*memsize.String); err != nil { ag.MoveToDraining(err) return aggStateUnknown, nil, nil } @@ -493,7 +493,7 @@ func (ag *orderedAggregator) accumulateRows() ( return aggAccumulating, nil, meta } if row == nil { - log.VEvent(ag.Ctx, 1, "accumulation complete") + log.VEvent(ag.Ctx(), 1, "accumulation complete") ag.inputDone = true break } @@ -537,7 +537,7 @@ func (ag *orderedAggregator) accumulateRows() ( func (ag *aggregatorBase) getAggResults( bucket aggregateFuncs, ) (aggregatorState, rowenc.EncDatumRow, *execinfrapb.ProducerMetadata) { - defer bucket.close(ag.Ctx) + defer bucket.close(ag.Ctx()) for i, b := range bucket { result, err := b.Result() @@ -583,16 +583,16 @@ func (ag *hashAggregator) emitRow() ( // the columns specified by ag.orderedGroupCols, so we need to continue // accumulating the remaining rows. - if err := ag.arena.UnsafeReset(ag.Ctx); err != nil { + if err := ag.arena.UnsafeReset(ag.Ctx()); err != nil { ag.MoveToDraining(err) return aggStateUnknown, nil, nil } // Before we create a new 'buckets' map below, we need to "release" the // already accounted for memory of the current map. - ag.bucketsAcc.Shrink(ag.Ctx, int64(ag.alreadyAccountedFor)*memsize.MapEntryOverhead) + ag.bucketsAcc.Shrink(ag.Ctx(), int64(ag.alreadyAccountedFor)*memsize.MapEntryOverhead) // Note that, for simplicity, we're ignoring the overhead of the slice of // strings. - ag.bucketsAcc.Shrink(ag.Ctx, int64(len(ag.buckets))*memsize.String) + ag.bucketsAcc.Shrink(ag.Ctx(), int64(len(ag.buckets))*memsize.String) ag.bucketsIter = nil ag.buckets = make(map[string]aggregateFuncs) ag.bucketsLenGrowThreshold = hashAggregatorBucketsInitialLen @@ -659,7 +659,7 @@ func (ag *orderedAggregator) emitRow() ( // the columns specified by ag.orderedGroupCols, so we need to continue // accumulating the remaining rows. - if err := ag.arena.UnsafeReset(ag.Ctx); err != nil { + if err := ag.arena.UnsafeReset(ag.Ctx()); err != nil { ag.MoveToDraining(err) return aggStateUnknown, nil, nil } @@ -697,7 +697,7 @@ func (ag *hashAggregator) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetad case aggEmittingRows: ag.runningState, row, meta = ag.emitRow() default: - log.Fatalf(ag.Ctx, "unsupported state: %d", ag.runningState) + log.Fatalf(ag.Ctx(), "unsupported state: %d", ag.runningState) } if row == nil && meta == nil { @@ -719,7 +719,7 @@ func (ag *orderedAggregator) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMe case aggEmittingRows: ag.runningState, row, meta = ag.emitRow() default: - log.Fatalf(ag.Ctx, "unsupported state: %d", ag.runningState) + log.Fatalf(ag.Ctx(), "unsupported state: %d", ag.runningState) } if row == nil && meta == nil { @@ -784,7 +784,7 @@ func (ag *aggregatorBase) accumulateRowIntoBucket( canAdd := true if a.Distinct { canAdd, err = ag.funcs[i].isDistinct( - ag.Ctx, + ag.Ctx(), &ag.datumAlloc, groupKey, firstArg, @@ -797,7 +797,7 @@ func (ag *aggregatorBase) accumulateRowIntoBucket( if !canAdd { continue } - if err := bucket[i].Add(ag.Ctx, firstArg, otherArgs...); err != nil { + if err := bucket[i].Add(ag.Ctx(), firstArg, otherArgs...); err != nil { return err } } @@ -815,7 +815,7 @@ func (ag *hashAggregator) encode( // used by the aggregate functions (and accounted for accordingly), // this can lead to over-accounting which is acceptable. appendTo, err = row[colIdx].Fingerprint( - ag.Ctx, ag.inputTypes[colIdx], &ag.datumAlloc, appendTo, &ag.bucketsAcc, + ag.Ctx(), ag.inputTypes[colIdx], &ag.datumAlloc, appendTo, &ag.bucketsAcc, ) if err != nil { return appendTo, err @@ -841,7 +841,7 @@ func (ag *hashAggregator) accumulateRow(row rowenc.EncDatumRow) error { bucket, ok := ag.buckets[string(encoded)] if !ok { - s, err := ag.arena.AllocBytes(ag.Ctx, encoded) + s, err := ag.arena.AllocBytes(ag.Ctx(), encoded) if err != nil { return err } @@ -852,7 +852,7 @@ func (ag *hashAggregator) accumulateRow(row rowenc.EncDatumRow) error { ag.buckets[s] = bucket if len(ag.buckets) == ag.bucketsLenGrowThreshold { toAccountFor := ag.bucketsLenGrowThreshold - ag.alreadyAccountedFor - if err := ag.bucketsAcc.Grow(ag.Ctx, int64(toAccountFor)*memsize.MapEntryOverhead); err != nil { + if err := ag.bucketsAcc.Grow(ag.Ctx(), int64(toAccountFor)*memsize.MapEntryOverhead); err != nil { return err } ag.alreadyAccountedFor = ag.bucketsLenGrowThreshold @@ -952,13 +952,13 @@ func (a *aggregateFuncHolder) isDistinct( } func (ag *aggregatorBase) createAggregateFuncs() (aggregateFuncs, error) { - if err := ag.bucketsAcc.Grow(ag.Ctx, sizeOfAggregateFuncs+sizeOfAggregateFunc*int64(len(ag.funcs))); err != nil { + if err := ag.bucketsAcc.Grow(ag.Ctx(), sizeOfAggregateFuncs+sizeOfAggregateFunc*int64(len(ag.funcs))); err != nil { return nil, err } bucket := make(aggregateFuncs, len(ag.funcs)) for i, f := range ag.funcs { agg := f.create(ag.EvalCtx, f.arguments) - if err := ag.bucketsAcc.Grow(ag.Ctx, agg.Size()); err != nil { + if err := ag.bucketsAcc.Grow(ag.Ctx(), agg.Size()); err != nil { return nil, err } bucket[i] = agg diff --git a/pkg/sql/rowexec/countrows.go b/pkg/sql/rowexec/countrows.go index a99e2e97d385..111603f47653 100644 --- a/pkg/sql/rowexec/countrows.go +++ b/pkg/sql/rowexec/countrows.go @@ -88,7 +88,7 @@ func (ag *countAggregator) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMeta if row == nil { ret := make(rowenc.EncDatumRow, 1) ret[0] = rowenc.EncDatum{Datum: tree.NewDInt(tree.DInt(ag.count))} - rendered, _, err := ag.OutputHelper.ProcessRow(ag.Ctx, ret) + rendered, _, err := ag.OutputHelper.ProcessRow(ag.Ctx(), ret) // We're done as soon as we process our one output row, so we // transition into draining state. We will, however, return non-nil // error (if such occurs during rendering) separately below. diff --git a/pkg/sql/rowexec/distinct.go b/pkg/sql/rowexec/distinct.go index 00f3c2ecf661..0050760e3f15 100644 --- a/pkg/sql/rowexec/distinct.go +++ b/pkg/sql/rowexec/distinct.go @@ -189,7 +189,7 @@ func (d *distinct) encode(appendTo []byte, row rowenc.EncDatumRow) ([]byte, erro // the references to the row (and to the newly allocated datums) // shortly, it'll likely take some time before GC reclaims that memory, // so we choose the over-accounting route to be safe. - appendTo, err = datum.Fingerprint(d.Ctx, d.types[colIdx], &d.datumAlloc, appendTo, &d.memAcc) + appendTo, err = datum.Fingerprint(d.Ctx(), d.types[colIdx], &d.datumAlloc, appendTo, &d.memAcc) if err != nil { return nil, err } @@ -212,8 +212,8 @@ func (d *distinct) encode(appendTo []byte, row rowenc.EncDatumRow) ([]byte, erro func (d *distinct) close() { if d.InternalClose() { - d.memAcc.Close(d.Ctx) - d.MemMonitor.Stop(d.Ctx) + d.memAcc.Close(d.Ctx()) + d.MemMonitor.Stop(d.Ctx()) } } @@ -258,7 +258,7 @@ func (d *distinct) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetadata) { // allocated on it, which implies that UnsafeReset() is safe to call here. copy(d.lastGroupKey, row) d.haveLastGroupKey = true - if err := d.arena.UnsafeReset(d.Ctx); err != nil { + if err := d.arena.UnsafeReset(d.Ctx()); err != nil { d.MoveToDraining(err) break } @@ -282,7 +282,7 @@ func (d *distinct) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetadata) { } continue } - s, err := d.arena.AllocBytes(d.Ctx, encoding) + s, err := d.arena.AllocBytes(d.Ctx(), encoding) if err != nil { d.MoveToDraining(err) break diff --git a/pkg/sql/rowexec/filterer.go b/pkg/sql/rowexec/filterer.go index 00c9bbcbc979..7d086558b8f9 100644 --- a/pkg/sql/rowexec/filterer.go +++ b/pkg/sql/rowexec/filterer.go @@ -95,7 +95,7 @@ func (f *filtererProcessor) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMet } // Perform the actual filtering. - passes, err := f.filter.EvalFilter(f.Ctx, row) + passes, err := f.filter.EvalFilter(f.Ctx(), row) if err != nil { f.MoveToDraining(err) break diff --git a/pkg/sql/rowexec/hashjoiner.go b/pkg/sql/rowexec/hashjoiner.go index 855493b12ae2..5aedd42dfab3 100644 --- a/pkg/sql/rowexec/hashjoiner.go +++ b/pkg/sql/rowexec/hashjoiner.go @@ -158,7 +158,7 @@ func newHashJoiner( } return h, h.hashTable.Init( - h.Ctx, + h.Ctx(), shouldMarkRightSide(h.joinType), h.rightSource.OutputTypes(), h.eqCols[rightSide], @@ -190,7 +190,7 @@ func (h *hashJoiner) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetadata) case hjEmittingRightUnmatched: h.runningState, row, meta = h.emitRightUnmatched() default: - log.Fatalf(h.Ctx, "unsupported state: %d", h.runningState) + log.Fatalf(h.Ctx(), "unsupported state: %d", h.runningState) } if row == nil && meta == nil { @@ -236,14 +236,14 @@ func (h *hashJoiner) build() (hashJoinerState, rowenc.EncDatumRow, *execinfrapb. return hjStateUnknown, nil, h.DrainHelper() } // If hashTable is in-memory, pre-reserve the memory needed to mark. - if err = h.hashTable.ReserveMarkMemoryMaybe(h.Ctx); err != nil { + if err = h.hashTable.ReserveMarkMemoryMaybe(h.Ctx()); err != nil { h.MoveToDraining(err) return hjStateUnknown, nil, h.DrainHelper() } return hjReadingLeftSide, nil, nil } - err = h.hashTable.AddRow(h.Ctx, row) + err = h.hashTable.AddRow(h.Ctx(), row) // Regardless of the underlying row container (disk backed or in-memory // only), we cannot do anything about an error if it occurs. if err != nil { @@ -277,7 +277,7 @@ func (h *hashJoiner) readLeftSide() ( // hjEmittingRightUnmatched if unmatched rows on the right side need to // be emitted, otherwise finish. if shouldEmitUnmatchedRow(rightSide, h.joinType) { - i := h.hashTable.NewUnmarkedIterator(h.Ctx) + i := h.hashTable.NewUnmarkedIterator(h.Ctx()) i.Rewind() h.emittingRightUnmatchedState.iter = i return hjEmittingRightUnmatched, nil, nil @@ -291,14 +291,14 @@ func (h *hashJoiner) readLeftSide() ( h.probingRowState.row = row h.probingRowState.matched = false if h.probingRowState.iter == nil { - i, err := h.hashTable.NewBucketIterator(h.Ctx, row, h.eqCols[leftSide]) + i, err := h.hashTable.NewBucketIterator(h.Ctx(), row, h.eqCols[leftSide]) if err != nil { h.MoveToDraining(err) return hjStateUnknown, nil, h.DrainHelper() } h.probingRowState.iter = i } else { - if err := h.probingRowState.iter.Reset(h.Ctx, row); err != nil { + if err := h.probingRowState.iter.Reset(h.Ctx(), row); err != nil { h.MoveToDraining(err) return hjStateUnknown, nil, h.DrainHelper() } @@ -359,7 +359,7 @@ func (h *hashJoiner) probeRow() ( h.probingRowState.matched = true shouldEmit := true if shouldMarkRightSide(h.joinType) { - if i.IsMarked(h.Ctx) { + if i.IsMarked(h.Ctx()) { switch h.joinType { case descpb.RightSemiJoin: // The row from the right already had a match and was emitted @@ -376,7 +376,7 @@ func (h *hashJoiner) probeRow() ( // whether we have a corresponding unmarked row from the right. h.probingRowState.matched = false } - } else if err := i.Mark(h.Ctx); err != nil { + } else if err := i.Mark(h.Ctx()); err != nil { h.MoveToDraining(err) return hjStateUnknown, nil, h.DrainHelper() } @@ -449,16 +449,16 @@ func (h *hashJoiner) emitRightUnmatched() ( func (h *hashJoiner) close() { if h.InternalClose() { - h.hashTable.Close(h.Ctx) + h.hashTable.Close(h.Ctx()) if h.probingRowState.iter != nil { h.probingRowState.iter.Close() } if h.emittingRightUnmatchedState.iter != nil { h.emittingRightUnmatchedState.iter.Close() } - h.MemMonitor.Stop(h.Ctx) + h.MemMonitor.Stop(h.Ctx()) if h.diskMonitor != nil { - h.diskMonitor.Stop(h.Ctx) + h.diskMonitor.Stop(h.Ctx()) } } } diff --git a/pkg/sql/rowexec/inverted_filterer.go b/pkg/sql/rowexec/inverted_filterer.go index 96fdd9dd78e4..0c3bc13fefe4 100644 --- a/pkg/sql/rowexec/inverted_filterer.go +++ b/pkg/sql/rowexec/inverted_filterer.go @@ -169,7 +169,7 @@ func (ifr *invertedFilterer) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMe case ifrEmittingRows: ifr.runningState, row, meta = ifr.emitRow() default: - log.Fatalf(ifr.Ctx, "unsupported state: %d", ifr.runningState) + log.Fatalf(ifr.Ctx(), "unsupported state: %d", ifr.runningState) } if row == nil && meta == nil { continue @@ -194,9 +194,9 @@ func (ifr *invertedFilterer) readInput() (invertedFiltererState, *execinfrapb.Pr return ifrReadingInput, meta } if row == nil { - log.VEventf(ifr.Ctx, 1, "no more input rows") + log.VEventf(ifr.Ctx(), 1, "no more input rows") evalResult := ifr.invertedEval.evaluate() - ifr.rc.SetupForRead(ifr.Ctx, evalResult) + ifr.rc.SetupForRead(ifr.Ctx(), evalResult) // invertedEval had a single expression in the batch, and the results // for that expression are in evalResult[0]. ifr.evalResult = evalResult[0] @@ -245,7 +245,7 @@ func (ifr *invertedFilterer) readInput() (invertedFiltererState, *execinfrapb.Pr // evaluator. copy(ifr.keyRow, row[:ifr.invertedColIdx]) copy(ifr.keyRow[ifr.invertedColIdx:], row[ifr.invertedColIdx+1:]) - keyIndex, err := ifr.rc.AddRow(ifr.Ctx, ifr.keyRow) + keyIndex, err := ifr.rc.AddRow(ifr.Ctx(), ifr.keyRow) if err != nil { ifr.MoveToDraining(err) return ifrStateUnknown, ifr.DrainHelper() @@ -273,11 +273,11 @@ func (ifr *invertedFilterer) emitRow() ( } if ifr.resultIdx >= len(ifr.evalResult) { // We are done emitting all rows. - return drainFunc(ifr.rc.UnsafeReset(ifr.Ctx)) + return drainFunc(ifr.rc.UnsafeReset(ifr.Ctx())) } curRowIdx := ifr.resultIdx ifr.resultIdx++ - keyRow, err := ifr.rc.GetRow(ifr.Ctx, ifr.evalResult[curRowIdx], false /* skip */) + keyRow, err := ifr.rc.GetRow(ifr.Ctx(), ifr.evalResult[curRowIdx], false /* skip */) if err != nil { return drainFunc(err) } @@ -301,12 +301,12 @@ func (ifr *invertedFilterer) ConsumerClosed() { func (ifr *invertedFilterer) close() { if ifr.InternalClose() { - ifr.rc.Close(ifr.Ctx) + ifr.rc.Close(ifr.Ctx()) if ifr.MemMonitor != nil { - ifr.MemMonitor.Stop(ifr.Ctx) + ifr.MemMonitor.Stop(ifr.Ctx()) } if ifr.diskMonitor != nil { - ifr.diskMonitor.Stop(ifr.Ctx) + ifr.diskMonitor.Stop(ifr.Ctx()) } } } diff --git a/pkg/sql/rowexec/inverted_joiner.go b/pkg/sql/rowexec/inverted_joiner.go index 3eee3eec2ff8..dc825c00bf73 100644 --- a/pkg/sql/rowexec/inverted_joiner.go +++ b/pkg/sql/rowexec/inverted_joiner.go @@ -385,7 +385,7 @@ func (ij *invertedJoiner) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetad case ijEmittingRows: ij.runningState, row, meta = ij.emitRow() default: - log.Fatalf(ij.Ctx, "unsupported state: %d", ij.runningState) + log.Fatalf(ij.Ctx(), "unsupported state: %d", ij.runningState) } if row == nil && meta == nil { continue @@ -416,7 +416,7 @@ func (ij *invertedJoiner) readInput() (invertedJoinerState, *execinfrapb.Produce break } - expr, preFilterState, err := ij.datumsToInvertedExpr.Convert(ij.Ctx, row) + expr, preFilterState, err := ij.datumsToInvertedExpr.Convert(ij.Ctx(), row) if err != nil { ij.MoveToDraining(err) return ijStateUnknown, ij.DrainHelper() @@ -466,12 +466,12 @@ func (ij *invertedJoiner) readInput() (invertedJoinerState, *execinfrapb.Produce } if len(ij.inputRows) == 0 { - log.VEventf(ij.Ctx, 1, "no more input rows") + log.VEventf(ij.Ctx(), 1, "no more input rows") // We're done. ij.MoveToDraining(nil) return ijStateUnknown, ij.DrainHelper() } - log.VEventf(ij.Ctx, 1, "read %d input rows", len(ij.inputRows)) + log.VEventf(ij.Ctx(), 1, "read %d input rows", len(ij.inputRows)) spans, err := ij.batchedExprEval.init() if err != nil { @@ -495,9 +495,9 @@ func (ij *invertedJoiner) readInput() (invertedJoinerState, *execinfrapb.Produce return ijStateUnknown, ij.DrainHelper() } - log.VEventf(ij.Ctx, 1, "scanning %d spans", len(ij.indexSpans)) + log.VEventf(ij.Ctx(), 1, "scanning %d spans", len(ij.indexSpans)) if err = ij.fetcher.StartScan( - ij.Ctx, ij.indexSpans, nil, /* spanIDs */ + ij.Ctx(), ij.indexSpans, nil, /* spanIDs */ rowinfra.NoBytesLimit, rowinfra.NoRowLimit, ); err != nil { ij.MoveToDraining(err) @@ -508,11 +508,11 @@ func (ij *invertedJoiner) readInput() (invertedJoinerState, *execinfrapb.Produce } func (ij *invertedJoiner) performScan() (invertedJoinerState, *execinfrapb.ProducerMetadata) { - log.VEventf(ij.Ctx, 1, "joining rows") + log.VEventf(ij.Ctx(), 1, "joining rows") // Read the entire set of rows that are part of the scan. for { // Fetch the next row and copy it into the row container. - fetchedRow, _, err := ij.fetcher.NextRow(ij.Ctx) + fetchedRow, _, err := ij.fetcher.NextRow(ij.Ctx()) if err != nil { ij.MoveToDraining(scrub.UnwrapScrubError(err)) return ijStateUnknown, ij.DrainHelper() @@ -557,7 +557,7 @@ func (ij *invertedJoiner) performScan() (invertedJoinerState, *execinfrapb.Produ return ijStateUnknown, ij.DrainHelper() } if shouldAdd { - rowIdx, err := ij.indexRows.AddRow(ij.Ctx, fetchedRow) + rowIdx, err := ij.indexRows.AddRow(ij.Ctx(), fetchedRow) if err != nil { ij.MoveToDraining(err) return ijStateUnknown, ij.DrainHelper() @@ -569,8 +569,8 @@ func (ij *invertedJoiner) performScan() (invertedJoinerState, *execinfrapb.Produ } } ij.joinedRowIdx = ij.batchedExprEval.evaluate() - ij.indexRows.SetupForRead(ij.Ctx, ij.joinedRowIdx) - log.VEventf(ij.Ctx, 1, "done evaluating expressions") + ij.indexRows.SetupForRead(ij.Ctx(), ij.joinedRowIdx) + log.VEventf(ij.Ctx(), 1, "done evaluating expressions") return ijEmittingRows, nil } @@ -587,7 +587,7 @@ func (ij *invertedJoiner) emitRow() ( ) { // Finished processing the batch. if ij.emitCursor.inputRowIdx >= len(ij.joinedRowIdx) { - log.VEventf(ij.Ctx, 1, "done emitting rows") + log.VEventf(ij.Ctx(), 1, "done emitting rows") // Ready for another input batch. Reset state. ij.inputRows = ij.inputRows[:0] ij.batchedExprEval.reset() @@ -595,7 +595,7 @@ func (ij *invertedJoiner) emitRow() ( ij.emitCursor.outputRowIdx = 0 ij.emitCursor.inputRowIdx = 0 ij.emitCursor.seenMatch = false - if err := ij.indexRows.UnsafeReset(ij.Ctx); err != nil { + if err := ij.indexRows.UnsafeReset(ij.Ctx()); err != nil { ij.MoveToDraining(err) return ijStateUnknown, nil, ij.DrainHelper() } @@ -625,7 +625,7 @@ func (ij *invertedJoiner) emitRow() ( inputRow := ij.inputRows[ij.emitCursor.inputRowIdx] joinedRowIdx := ij.joinedRowIdx[ij.emitCursor.inputRowIdx][ij.emitCursor.outputRowIdx] - indexedRow, err := ij.indexRows.GetRow(ij.Ctx, joinedRowIdx, false /* skip */) + indexedRow, err := ij.indexRows.GetRow(ij.Ctx(), joinedRowIdx, false /* skip */) if err != nil { ij.MoveToDraining(err) return ijStateUnknown, nil, ij.DrainHelper() @@ -639,7 +639,7 @@ func (ij *invertedJoiner) emitRow() ( skipRemaining := func() error { for ; ij.emitCursor.outputRowIdx < len(ij.joinedRowIdx[ij.emitCursor.inputRowIdx]); ij.emitCursor.outputRowIdx++ { idx := ij.joinedRowIdx[ij.emitCursor.inputRowIdx][ij.emitCursor.outputRowIdx] - if _, err := ij.indexRows.GetRow(ij.Ctx, idx, true /* skip */); err != nil { + if _, err := ij.indexRows.GetRow(ij.Ctx(), idx, true /* skip */); err != nil { return err } } @@ -689,7 +689,7 @@ func (ij *invertedJoiner) render(lrow, rrow rowenc.EncDatumRow) (rowenc.EncDatum ij.combinedRow = append(ij.combinedRow[:0], lrow...) ij.combinedRow = append(ij.combinedRow, rrow...) if ij.onExprHelper.Expr != nil { - res, err := ij.onExprHelper.EvalFilter(ij.Ctx, ij.combinedRow) + res, err := ij.onExprHelper.EvalFilter(ij.Ctx(), ij.combinedRow) if !res || err != nil { return nil, err } @@ -740,14 +740,14 @@ func (ij *invertedJoiner) ConsumerClosed() { func (ij *invertedJoiner) close() { if ij.InternalClose() { if ij.fetcher != nil { - ij.fetcher.Close(ij.Ctx) + ij.fetcher.Close(ij.Ctx()) } if ij.indexRows != nil { - ij.indexRows.Close(ij.Ctx) + ij.indexRows.Close(ij.Ctx()) } - ij.MemMonitor.Stop(ij.Ctx) + ij.MemMonitor.Stop(ij.Ctx()) if ij.diskMonitor != nil { - ij.diskMonitor.Stop(ij.Ctx) + ij.diskMonitor.Stop(ij.Ctx()) } } } @@ -762,8 +762,8 @@ func (ij *invertedJoiner) execStatsForTrace() *execinfrapb.ComponentStats { if !ok { return nil } - ij.scanStats = execstats.GetScanStats(ij.Ctx, ij.ExecStatsTrace) - contentionTime, contentionEvents := execstats.GetCumulativeContentionTime(ij.Ctx, ij.ExecStatsTrace) + ij.scanStats = execstats.GetScanStats(ij.Ctx(), ij.ExecStatsTrace) + contentionTime, contentionEvents := execstats.GetCumulativeContentionTime(ij.Ctx(), ij.ExecStatsTrace) ret := execinfrapb.ComponentStats{ Inputs: []execinfrapb.InputStats{is}, KV: execinfrapb.KVStats{ @@ -790,7 +790,7 @@ func (ij *invertedJoiner) generateMeta() []execinfrapb.ProducerMetadata { meta.Metrics = execinfrapb.GetMetricsMeta() meta.Metrics.BytesRead = ij.fetcher.GetBytesRead() meta.Metrics.RowsRead = ij.rowsRead - if tfs := execinfra.GetLeafTxnFinalState(ij.Ctx, ij.FlowCtx.Txn); tfs != nil { + if tfs := execinfra.GetLeafTxnFinalState(ij.Ctx(), ij.FlowCtx.Txn); tfs != nil { trailingMeta = append(trailingMeta, execinfrapb.ProducerMetadata{LeafTxnFinalState: tfs}) } return trailingMeta diff --git a/pkg/sql/rowexec/joinerbase.go b/pkg/sql/rowexec/joinerbase.go index 836e68e0fff1..96ce2eeefbd4 100644 --- a/pkg/sql/rowexec/joinerbase.go +++ b/pkg/sql/rowexec/joinerbase.go @@ -170,7 +170,7 @@ func (jb *joinerBase) render(lrow, rrow rowenc.EncDatumRow) (rowenc.EncDatumRow, } else { combinedRow = jb.combine(lrow, rrow) } - res, err := jb.onCond.EvalFilter(jb.Ctx, combinedRow) + res, err := jb.onCond.EvalFilter(jb.Ctx(), combinedRow) if !res || err != nil { return nil, err } diff --git a/pkg/sql/rowexec/joinreader.go b/pkg/sql/rowexec/joinreader.go index cad624d9c0de..e8e31e053f32 100644 --- a/pkg/sql/rowexec/joinreader.go +++ b/pkg/sql/rowexec/joinreader.go @@ -739,7 +739,7 @@ func (jr *joinReader) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetadata) meta = jr.DrainHelper() jr.runningState = jrStateUnknown default: - log.Fatalf(jr.Ctx, "unsupported state: %d", jr.runningState) + log.Fatalf(jr.Ctx(), "unsupported state: %d", jr.runningState) } if row == nil && meta == nil { continue @@ -916,7 +916,7 @@ func (jr *joinReader) readInput() ( } if len(jr.scratchInputRows) == 0 { - log.VEventf(jr.Ctx, 1, "no more input rows") + log.VEventf(jr.Ctx(), 1, "no more input rows") if outRow != nil { return jrReadyToDrain, outRow, nil } @@ -924,7 +924,7 @@ func (jr *joinReader) readInput() ( jr.MoveToDraining(nil) return jrStateUnknown, nil, jr.DrainHelper() } - log.VEventf(jr.Ctx, 1, "read %d input rows", len(jr.scratchInputRows)) + log.VEventf(jr.Ctx(), 1, "read %d input rows", len(jr.scratchInputRows)) if jr.groupingState != nil && len(jr.scratchInputRows) > 0 { jr.updateGroupingStateForNonEmptyBatch() @@ -980,7 +980,7 @@ func (jr *joinReader) readInput() ( } } - log.VEventf(jr.Ctx, 1, "scanning %d spans", len(spans)) + log.VEventf(jr.Ctx(), 1, "scanning %d spans", len(spans)) // Note that the fetcher takes ownership of the spans slice - it will modify // it and perform the memory accounting. We don't care about the // modification here, but we want to be conscious about the memory @@ -998,7 +998,7 @@ func (jr *joinReader) readInput() ( } } if err = jr.fetcher.StartScan( - jr.Ctx, spans, spanIDs, bytesLimit, rowinfra.NoRowLimit, + jr.Ctx(), spans, spanIDs, bytesLimit, rowinfra.NoRowLimit, ); err != nil { jr.MoveToDraining(err) return jrStateUnknown, nil, jr.DrainHelper() @@ -1011,7 +1011,7 @@ func (jr *joinReader) readInput() ( func (jr *joinReader) performLookup() (joinReaderState, *execinfrapb.ProducerMetadata) { for { // Fetch the next row and tell the strategy to process it. - lookedUpRow, spanID, err := jr.fetcher.NextRow(jr.Ctx) + lookedUpRow, spanID, err := jr.fetcher.NextRow(jr.Ctx()) if err != nil { jr.MoveToDraining(scrub.UnwrapScrubError(err)) return jrStateUnknown, jr.DrainHelper() @@ -1023,7 +1023,7 @@ func (jr *joinReader) performLookup() (joinReaderState, *execinfrapb.ProducerMet jr.rowsRead++ jr.curBatchRowsRead++ - if nextState, err := jr.strategy.processLookedUpRow(jr.Ctx, lookedUpRow, spanID); err != nil { + if nextState, err := jr.strategy.processLookedUpRow(jr.Ctx(), lookedUpRow, spanID); err != nil { jr.MoveToDraining(err) return jrStateUnknown, jr.DrainHelper() } else if nextState != jrPerformingLookup { @@ -1056,13 +1056,13 @@ func (jr *joinReader) performLookup() (joinReaderState, *execinfrapb.ProducerMet sortSpans(spans, spanIDs) } - log.VEventf(jr.Ctx, 1, "scanning %d remote spans", len(spans)) + log.VEventf(jr.Ctx(), 1, "scanning %d remote spans", len(spans)) bytesLimit := rowinfra.GetDefaultBatchBytesLimit(jr.EvalCtx.TestingKnobs.ForceProductionValues) if !jr.shouldLimitBatches { bytesLimit = rowinfra.NoBytesLimit } if err := jr.fetcher.StartScan( - jr.Ctx, spans, spanIDs, bytesLimit, rowinfra.NoRowLimit, + jr.Ctx(), spans, spanIDs, bytesLimit, rowinfra.NoRowLimit, ); err != nil { jr.MoveToDraining(err) return jrStateUnknown, jr.DrainHelper() @@ -1071,8 +1071,8 @@ func (jr *joinReader) performLookup() (joinReaderState, *execinfrapb.ProducerMet } } - log.VEvent(jr.Ctx, 1, "done joining rows") - jr.strategy.prepareToEmit(jr.Ctx) + log.VEvent(jr.Ctx(), 1, "done joining rows") + jr.strategy.prepareToEmit(jr.Ctx()) // Check if the strategy spilled to disk and reduce the batch size if it // did. @@ -1103,7 +1103,7 @@ func (jr *joinReader) emitRow() ( rowenc.EncDatumRow, *execinfrapb.ProducerMetadata, ) { - rowToEmit, nextState, err := jr.strategy.nextRowToEmit(jr.Ctx) + rowToEmit, nextState, err := jr.strategy.nextRowToEmit(jr.Ctx()) if err != nil { jr.MoveToDraining(err) return jrStateUnknown, nil, jr.DrainHelper() @@ -1135,26 +1135,26 @@ func (jr *joinReader) ConsumerClosed() { func (jr *joinReader) close() { if jr.InternalClose() { if jr.fetcher != nil { - jr.fetcher.Close(jr.Ctx) + jr.fetcher.Close(jr.Ctx()) } if jr.usesStreamer { - jr.streamerInfo.budgetAcc.Close(jr.Ctx) - jr.streamerInfo.txnKVStreamerMemAcc.Close(jr.Ctx) - jr.streamerInfo.unlimitedMemMonitor.Stop(jr.Ctx) + jr.streamerInfo.budgetAcc.Close(jr.Ctx()) + jr.streamerInfo.txnKVStreamerMemAcc.Close(jr.Ctx()) + jr.streamerInfo.unlimitedMemMonitor.Stop(jr.Ctx()) if jr.streamerInfo.diskMonitor != nil { - jr.streamerInfo.diskMonitor.Stop(jr.Ctx) + jr.streamerInfo.diskMonitor.Stop(jr.Ctx()) } } - jr.strategy.close(jr.Ctx) - jr.memAcc.Close(jr.Ctx) + jr.strategy.close(jr.Ctx()) + jr.memAcc.Close(jr.Ctx()) if jr.limitedMemMonitor != nil { - jr.limitedMemMonitor.Stop(jr.Ctx) + jr.limitedMemMonitor.Stop(jr.Ctx()) } if jr.MemMonitor != nil { - jr.MemMonitor.Stop(jr.Ctx) + jr.MemMonitor.Stop(jr.Ctx()) } if jr.diskMonitor != nil { - jr.diskMonitor.Stop(jr.Ctx) + jr.diskMonitor.Stop(jr.Ctx()) } } } @@ -1170,8 +1170,8 @@ func (jr *joinReader) execStatsForTrace() *execinfrapb.ComponentStats { return nil } - jr.scanStats = execstats.GetScanStats(jr.Ctx, jr.ExecStatsTrace) - contentionTime, contentionEvents := execstats.GetCumulativeContentionTime(jr.Ctx, jr.ExecStatsTrace) + jr.scanStats = execstats.GetScanStats(jr.Ctx(), jr.ExecStatsTrace) + contentionTime, contentionEvents := execstats.GetCumulativeContentionTime(jr.Ctx(), jr.ExecStatsTrace) ret := &execinfrapb.ComponentStats{ Inputs: []execinfrapb.InputStats{is}, KV: execinfrapb.KVStats{ @@ -1206,7 +1206,7 @@ func (jr *joinReader) generateMeta() []execinfrapb.ProducerMetadata { meta.Metrics = execinfrapb.GetMetricsMeta() meta.Metrics.RowsRead = jr.rowsRead meta.Metrics.BytesRead = jr.fetcher.GetBytesRead() - if tfs := execinfra.GetLeafTxnFinalState(jr.Ctx, jr.txn); tfs != nil { + if tfs := execinfra.GetLeafTxnFinalState(jr.Ctx(), jr.txn); tfs != nil { trailingMeta = append(trailingMeta, execinfrapb.ProducerMetadata{LeafTxnFinalState: tfs}) } return trailingMeta diff --git a/pkg/sql/rowexec/joinreader_strategies.go b/pkg/sql/rowexec/joinreader_strategies.go index 657dfdcd9861..dcb42caf786a 100644 --- a/pkg/sql/rowexec/joinreader_strategies.go +++ b/pkg/sql/rowexec/joinreader_strategies.go @@ -187,7 +187,7 @@ func (s *joinReaderNoOrderingStrategy) generateRemoteSpans() (roachpb.Spans, []i return nil, nil, errors.AssertionFailedf("generateRemoteSpans can only be called for locality optimized lookup joins") } s.remoteSpansGenerated = true - return gen.generateRemoteSpans(s.Ctx, s.inputRows) + return gen.generateRemoteSpans(s.Ctx(), s.inputRows) } func (s *joinReaderNoOrderingStrategy) generatedRemoteSpans() bool { @@ -200,7 +200,7 @@ func (s *joinReaderNoOrderingStrategy) processLookupRows( s.inputRows = rows s.remoteSpansGenerated = false s.emitState.unmatchedInputRowIndicesInitialized = false - return s.generateSpans(s.Ctx, s.inputRows) + return s.generateSpans(s.Ctx(), s.inputRows) } func (s *joinReaderNoOrderingStrategy) processLookedUpRow( @@ -320,13 +320,13 @@ func (s *joinReaderNoOrderingStrategy) spilled() bool { return false } func (s *joinReaderNoOrderingStrategy) growMemoryAccount( memAcc *mon.BoundAccount, delta int64, ) error { - return addWorkmemHint(memAcc.Grow(s.Ctx, delta)) + return addWorkmemHint(memAcc.Grow(s.Ctx(), delta)) } func (s *joinReaderNoOrderingStrategy) resizeMemoryAccount( memAcc *mon.BoundAccount, oldSz, newSz int64, ) error { - return addWorkmemHint(memAcc.Resize(s.Ctx, oldSz, newSz)) + return addWorkmemHint(memAcc.Resize(s.Ctx(), oldSz, newSz)) } func (s *joinReaderNoOrderingStrategy) close(ctx context.Context) { @@ -405,7 +405,7 @@ func (s *joinReaderIndexJoinStrategy) processLookupRows( rows []rowenc.EncDatumRow, ) (roachpb.Spans, []int, error) { s.inputRows = rows - return s.generateSpans(s.Ctx, s.inputRows) + return s.generateSpans(s.Ctx(), s.inputRows) } func (s *joinReaderIndexJoinStrategy) processLookedUpRow( @@ -435,13 +435,13 @@ func (s *joinReaderIndexJoinStrategy) spilled() bool { func (s *joinReaderIndexJoinStrategy) growMemoryAccount( memAcc *mon.BoundAccount, delta int64, ) error { - return addWorkmemHint(memAcc.Grow(s.Ctx, delta)) + return addWorkmemHint(memAcc.Grow(s.Ctx(), delta)) } func (s *joinReaderIndexJoinStrategy) resizeMemoryAccount( memAcc *mon.BoundAccount, oldSz, newSz int64, ) error { - return addWorkmemHint(memAcc.Resize(s.Ctx, oldSz, newSz)) + return addWorkmemHint(memAcc.Resize(s.Ctx(), oldSz, newSz)) } func (s *joinReaderIndexJoinStrategy) close(ctx context.Context) { @@ -592,7 +592,7 @@ func (s *joinReaderOrderingStrategy) generateRemoteSpans() (roachpb.Spans, []int return nil, nil, errors.AssertionFailedf("generateRemoteSpans can only be called for locality optimized lookup joins") } s.remoteSpansGenerated = true - return gen.generateRemoteSpans(s.Ctx, s.inputRows) + return gen.generateRemoteSpans(s.Ctx(), s.inputRows) } func (s *joinReaderOrderingStrategy) generatedRemoteSpans() bool { @@ -642,7 +642,7 @@ func (s *joinReaderOrderingStrategy) processLookupRows( s.inputRows = rows s.remoteSpansGenerated = false - return s.generateSpans(s.Ctx, s.inputRows) + return s.generateSpans(s.Ctx(), s.inputRows) } func (s *joinReaderOrderingStrategy) processLookedUpRow( @@ -807,7 +807,7 @@ func (s *joinReaderOrderingStrategy) nextRowToEmit( lookedUpRow = s.emitCursor.notBufferedRow } else { lookedUpRow, err = s.lookedUpRows.GetRow( - s.Ctx, lookedUpRows[s.emitCursor.outputRowIdx], false, /* skip */ + s.Ctx(), lookedUpRows[s.emitCursor.outputRowIdx], false, /* skip */ ) } if err != nil { @@ -858,16 +858,16 @@ func (s *joinReaderOrderingStrategy) close(ctx context.Context) { func (s *joinReaderOrderingStrategy) growMemoryAccount( memAcc *mon.BoundAccount, delta int64, ) error { - if err := memAcc.Grow(s.Ctx, delta); err != nil { + if err := memAcc.Grow(s.Ctx(), delta); err != nil { // We don't have enough budget to account for the new size. Check // whether we can spill the looked up rows to disk to free up the // budget. - spilled, spillErr := s.lookedUpRows.SpillToDisk(s.Ctx) + spilled, spillErr := s.lookedUpRows.SpillToDisk(s.Ctx()) if !spilled || spillErr != nil { return addWorkmemHint(errors.CombineErrors(err, spillErr)) } // We freed up some budget, so try to perform the accounting again. - return addWorkmemHint(memAcc.Grow(s.Ctx, delta)) + return addWorkmemHint(memAcc.Grow(s.Ctx(), delta)) } return nil } @@ -879,16 +879,16 @@ func (s *joinReaderOrderingStrategy) growMemoryAccount( func (s *joinReaderOrderingStrategy) resizeMemoryAccount( memAcc *mon.BoundAccount, oldSz, newSz int64, ) error { - if err := memAcc.Resize(s.Ctx, oldSz, newSz); err != nil { + if err := memAcc.Resize(s.Ctx(), oldSz, newSz); err != nil { // We don't have enough budget to account for the new size. Check // whether we can spill the looked up rows to disk to free up the // budget. - spilled, spillErr := s.lookedUpRows.SpillToDisk(s.Ctx) + spilled, spillErr := s.lookedUpRows.SpillToDisk(s.Ctx()) if !spilled || spillErr != nil { return addWorkmemHint(errors.CombineErrors(err, spillErr)) } // We freed up some budget, so try to perform the accounting again. - return addWorkmemHint(memAcc.Resize(s.Ctx, oldSz, newSz)) + return addWorkmemHint(memAcc.Resize(s.Ctx(), oldSz, newSz)) } return nil } diff --git a/pkg/sql/rowexec/mergejoiner.go b/pkg/sql/rowexec/mergejoiner.go index 9b192fdbfbfb..4bf0d82dbe87 100644 --- a/pkg/sql/rowexec/mergejoiner.go +++ b/pkg/sql/rowexec/mergejoiner.go @@ -233,7 +233,7 @@ func (m *mergeJoiner) nextRow() (rowenc.EncDatumRow, *execinfrapb.ProducerMetada // TODO(paul): Investigate (with benchmarks) whether or not it's // worthwhile to only buffer one row from the right stream per batch // for semi-joins. - m.leftRows, m.rightRows, meta = m.streamMerger.NextBatch(m.Ctx, m.EvalCtx) + m.leftRows, m.rightRows, meta = m.streamMerger.NextBatch(m.Ctx(), m.EvalCtx) if meta != nil { return nil, meta } @@ -252,8 +252,8 @@ func (m *mergeJoiner) nextRow() (rowenc.EncDatumRow, *execinfrapb.ProducerMetada func (m *mergeJoiner) close() { if m.InternalClose() { - m.streamMerger.close(m.Ctx) - m.MemMonitor.Stop(m.Ctx) + m.streamMerger.close(m.Ctx()) + m.MemMonitor.Stop(m.Ctx()) } } diff --git a/pkg/sql/rowexec/processors_test.go b/pkg/sql/rowexec/processors_test.go index 883dde2419a9..87afeeb0d896 100644 --- a/pkg/sql/rowexec/processors_test.go +++ b/pkg/sql/rowexec/processors_test.go @@ -315,7 +315,8 @@ func TestAggregatorSpecAggregationEquals(t *testing.T) { func TestProcessorBaseContext(t *testing.T) { defer leaktest.AfterTest(t)() - ctx := context.Background() + // Use a custom context to distinguish it from the background one. + ctx := context.WithValue(context.Background(), struct{}{}, struct{}{}) st := cluster.MakeTestingClusterSettings() runTest := func(t *testing.T, f func(noop *noopProcessor)) { @@ -332,18 +333,22 @@ func TestProcessorBaseContext(t *testing.T) { if err != nil { t.Fatal(err) } + // Before Start we should get the background context. + if noop.Ctx() != context.Background() { + t.Fatalf("ProcessorBase.Ctx() didn't return the background context before Start") + } noop.Start(ctx) - origCtx := noop.Ctx + origCtx := noop.Ctx() // The context should be valid after Start but before Next is called in case // ConsumerDone or ConsumerClosed are called without calling Next. - if noop.Ctx == nil { + if noop.Ctx() == context.Background() { t.Fatalf("ProcessorBase.ctx not initialized") } f(noop) // The context should be reset after ConsumerClosed is called so that any // subsequent logging calls will not operate on closed spans. - if noop.Ctx != origCtx { + if noop.Ctx() != origCtx { t.Fatalf("ProcessorBase.ctx not reset on close") } } diff --git a/pkg/sql/rowexec/project_set.go b/pkg/sql/rowexec/project_set.go index 4bd52d9f5e3c..9ca6d7310d64 100644 --- a/pkg/sql/rowexec/project_set.go +++ b/pkg/sql/rowexec/project_set.go @@ -161,7 +161,7 @@ func (ps *projectSetProcessor) nextInputRow() ( // First, make sure to close its ValueGenerator from the previous // input row (if it exists). if ps.gens[i] != nil { - ps.gens[i].Close(ps.Ctx) + ps.gens[i].Close(ps.Ctx()) ps.gens[i] = nil } @@ -169,7 +169,7 @@ func (ps *projectSetProcessor) nextInputRow() ( ps.exprHelpers[i].Row = row ps.EvalCtx.IVarContainer = ps.exprHelpers[i] - gen, err := eval.GetGenerator(ps.Ctx, ps.EvalCtx, fn) + gen, err := eval.GetGenerator(ps.Ctx(), ps.EvalCtx, fn) if err != nil { return nil, nil, err } @@ -184,7 +184,7 @@ func (ps *projectSetProcessor) nextInputRow() ( // Store the generator before Start so that it'll be closed even if // Start returns an error. ps.gens[i] = gen - if err := gen.Start(ps.Ctx, ps.FlowCtx.Txn); err != nil { + if err := gen.Start(ps.Ctx(), ps.FlowCtx.Txn); err != nil { return nil, nil, err } } @@ -205,7 +205,7 @@ func (ps *projectSetProcessor) nextGeneratorValues() (newValAvail bool, err erro numCols := int(ps.spec.NumColsPerGen[i]) if !ps.done[i] { // Yes; check whether this source still has some values available. - hasVals, err := gen.Next(ps.Ctx) + hasVals, err := gen.Next(ps.Ctx()) if err != nil { return false, err } @@ -237,7 +237,7 @@ func (ps *projectSetProcessor) nextGeneratorValues() (newValAvail bool, err erro // Do we still need to produce the scalar value? (first row) if !ps.done[i] { // Yes. Produce it once, then indicate it's "done". - value, err := ps.exprHelpers[i].Eval(ps.Ctx, ps.rowBuffer) + value, err := ps.exprHelpers[i].Eval(ps.Ctx(), ps.rowBuffer) if err != nil { return false, err } @@ -320,7 +320,7 @@ func (ps *projectSetProcessor) close() { // InternalClose(). for i, gen := range ps.gens { if gen != nil { - gen.Close(ps.Ctx) + gen.Close(ps.Ctx()) ps.gens[i] = nil } } diff --git a/pkg/sql/rowexec/sample_aggregator.go b/pkg/sql/rowexec/sample_aggregator.go index f597be7ce925..7f8bd51615ba 100644 --- a/pkg/sql/rowexec/sample_aggregator.go +++ b/pkg/sql/rowexec/sample_aggregator.go @@ -205,9 +205,9 @@ func (s *sampleAggregator) Run(ctx context.Context) { func (s *sampleAggregator) close() { if s.InternalClose() { - s.memAcc.Close(s.Ctx) - s.tempMemAcc.Close(s.Ctx) - s.MemMonitor.Stop(s.Ctx) + s.memAcc.Close(s.Ctx()) + s.tempMemAcc.Close(s.Ctx()) + s.MemMonitor.Stop(s.Ctx()) } } diff --git a/pkg/sql/rowexec/sampler.go b/pkg/sql/rowexec/sampler.go index 5e89b80a08f7..a7840585bc18 100644 --- a/pkg/sql/rowexec/sampler.go +++ b/pkg/sql/rowexec/sampler.go @@ -478,8 +478,8 @@ func (s *samplerProcessor) sampleRow( func (s *samplerProcessor) close() { if s.InternalClose() { - s.memAcc.Close(s.Ctx) - s.MemMonitor.Stop(s.Ctx) + s.memAcc.Close(s.Ctx()) + s.MemMonitor.Stop(s.Ctx()) } } diff --git a/pkg/sql/rowexec/sorter.go b/pkg/sql/rowexec/sorter.go index 3472139c6148..e3588145f150 100644 --- a/pkg/sql/rowexec/sorter.go +++ b/pkg/sql/rowexec/sorter.go @@ -117,10 +117,10 @@ func (s *sorterBase) close() { if s.i != nil { s.i.Close() } - s.rows.Close(s.Ctx) - s.MemMonitor.Stop(s.Ctx) + s.rows.Close(s.Ctx()) + s.MemMonitor.Stop(s.Ctx()) if s.diskMonitor != nil { - s.diskMonitor.Stop(s.Ctx) + s.diskMonitor.Stop(s.Ctx()) } } } @@ -250,13 +250,13 @@ func (s *sortAllProcessor) fill() (ok bool, _ error) { break } - if err := s.rows.AddRow(s.Ctx, row); err != nil { + if err := s.rows.AddRow(s.Ctx(), row); err != nil { return false, err } } - s.rows.Sort(s.Ctx) + s.rows.Sort(s.Ctx()) - s.i = s.rows.NewFinalIterator(s.Ctx) + s.i = s.rows.NewFinalIterator(s.Ctx()) s.i.Rewind() return true, nil } @@ -423,7 +423,7 @@ func newSortChunksProcessor( ); err != nil { return nil, err } - proc.i = proc.rows.NewFinalIterator(proc.Ctx) + proc.i = proc.rows.NewFinalIterator(proc.Ctx()) return proc, nil } @@ -450,7 +450,7 @@ func (s *sortChunksProcessor) chunkCompleted( // if a metadata record was encountered). The caller is expected to drain when // this returns false. func (s *sortChunksProcessor) fill() (bool, error) { - ctx := s.Ctx + ctx := s.Ctx() var meta *execinfrapb.ProducerMetadata @@ -520,7 +520,7 @@ func (s *sortChunksProcessor) Start(ctx context.Context) { // Next is part of the RowSource interface. func (s *sortChunksProcessor) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetadata) { - ctx := s.Ctx + ctx := s.Ctx() for s.State == execinfra.StateRunning { ok, err := s.i.Valid() if err != nil { diff --git a/pkg/sql/rowexec/tablereader.go b/pkg/sql/rowexec/tablereader.go index 3df02374f832..4c43a2ac9abd 100644 --- a/pkg/sql/rowexec/tablereader.go +++ b/pkg/sql/rowexec/tablereader.go @@ -256,7 +256,7 @@ func TestingSetScannedRowProgressFrequency(val int64) func() { func (tr *tableReader) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetadata) { for tr.State == execinfra.StateRunning { if !tr.scanStarted { - err := tr.startScan(tr.Ctx) + err := tr.startScan(tr.Ctx()) if err != nil { tr.MoveToDraining(err) break @@ -271,7 +271,7 @@ func (tr *tableReader) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetadata return nil, meta } - row, _, err := tr.fetcher.NextRow(tr.Ctx) + row, _, err := tr.fetcher.NextRow(tr.Ctx()) if row == nil || err != nil { tr.MoveToDraining(err) break @@ -292,7 +292,7 @@ func (tr *tableReader) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetadata func (tr *tableReader) close() { if tr.InternalClose() { if tr.fetcher != nil { - tr.fetcher.Close(tr.Ctx) + tr.fetcher.Close(tr.Ctx()) } } } @@ -308,8 +308,8 @@ func (tr *tableReader) execStatsForTrace() *execinfrapb.ComponentStats { if !ok { return nil } - tr.scanStats = execstats.GetScanStats(tr.Ctx, tr.ExecStatsTrace) - contentionTime, contentionEvents := execstats.GetCumulativeContentionTime(tr.Ctx, tr.ExecStatsTrace) + tr.scanStats = execstats.GetScanStats(tr.Ctx(), tr.ExecStatsTrace) + contentionTime, contentionEvents := execstats.GetCumulativeContentionTime(tr.Ctx(), tr.ExecStatsTrace) ret := &execinfrapb.ComponentStats{ KV: execinfrapb.KVStats{ BytesRead: optional.MakeUint(uint64(tr.fetcher.GetBytesRead())), @@ -330,13 +330,13 @@ func (tr *tableReader) generateMeta() []execinfrapb.ProducerMetadata { if !tr.ignoreMisplannedRanges { nodeID, ok := tr.FlowCtx.NodeID.OptionalNodeID() if ok { - ranges := execinfra.MisplannedRanges(tr.Ctx, tr.SpansCopy, nodeID, tr.FlowCtx.Cfg.RangeCache) + ranges := execinfra.MisplannedRanges(tr.Ctx(), tr.SpansCopy, nodeID, tr.FlowCtx.Cfg.RangeCache) if ranges != nil { trailingMeta = append(trailingMeta, execinfrapb.ProducerMetadata{Ranges: ranges}) } } } - if tfs := execinfra.GetLeafTxnFinalState(tr.Ctx, tr.FlowCtx.Txn); tfs != nil { + if tfs := execinfra.GetLeafTxnFinalState(tr.Ctx(), tr.FlowCtx.Txn); tfs != nil { trailingMeta = append(trailingMeta, execinfrapb.ProducerMetadata{LeafTxnFinalState: tfs}) } diff --git a/pkg/sql/rowexec/windower.go b/pkg/sql/rowexec/windower.go index 95f52f44dff5..053b954f882e 100644 --- a/pkg/sql/rowexec/windower.go +++ b/pkg/sql/rowexec/windower.go @@ -225,7 +225,7 @@ func (w *windower) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetadata) { case windowerEmittingRows: w.runningState, row, meta = w.emitRow() default: - log.Fatalf(w.Ctx, "unsupported state: %d", w.runningState) + log.Fatalf(w.Ctx(), "unsupported state: %d", w.runningState) } if row == nil && meta == nil { @@ -247,16 +247,16 @@ func (w *windower) close() { if w.allRowsIterator != nil { w.allRowsIterator.Close() } - w.allRowsPartitioned.Close(w.Ctx) + w.allRowsPartitioned.Close(w.Ctx()) if w.partition != nil { - w.partition.Close(w.Ctx) + w.partition.Close(w.Ctx()) } for _, builtin := range w.builtins { - builtin.Close(w.Ctx, w.EvalCtx) + builtin.Close(w.Ctx(), w.EvalCtx) } - w.acc.Close(w.Ctx) - w.MemMonitor.Stop(w.Ctx) - w.diskMonitor.Stop(w.Ctx) + w.acc.Close(w.Ctx()) + w.MemMonitor.Stop(w.Ctx()) + w.diskMonitor.Stop(w.Ctx()) } } @@ -280,17 +280,17 @@ func (w *windower) accumulateRows() ( return windowerAccumulating, nil, meta } if row == nil { - log.VEvent(w.Ctx, 1, "accumulation complete") + log.VEvent(w.Ctx(), 1, "accumulation complete") w.inputDone = true // We need to sort all the rows based on partitionBy columns so that all // rows belonging to the same hash bucket are contiguous. - w.allRowsPartitioned.Sort(w.Ctx) + w.allRowsPartitioned.Sort(w.Ctx()) break } // The underlying row container will decode all datums as necessary, so we // don't need to worry about that. - if err := w.allRowsPartitioned.AddRow(w.Ctx, row); err != nil { + if err := w.allRowsPartitioned.AddRow(w.Ctx(), row); err != nil { w.MoveToDraining(err) return windowerStateUnknown, nil, w.DrainHelper() } @@ -313,7 +313,7 @@ func (w *windower) emitRow() (windowerState, rowenc.EncDatumRow, *execinfrapb.Pr return windowerStateUnknown, nil, w.DrainHelper() } - if err := w.computeWindowFunctions(w.Ctx, w.EvalCtx); err != nil { + if err := w.computeWindowFunctions(w.Ctx(), w.EvalCtx); err != nil { w.MoveToDraining(err) return windowerStateUnknown, nil, w.DrainHelper() } @@ -343,7 +343,7 @@ func (w *windower) emitRow() (windowerState, rowenc.EncDatumRow, *execinfrapb.Pr func (w *windower) spillAllRowsToDisk() error { if w.allRowsPartitioned != nil { if !w.allRowsPartitioned.UsingDisk() { - if err := w.allRowsPartitioned.SpillToDisk(w.Ctx); err != nil { + if err := w.allRowsPartitioned.SpillToDisk(w.Ctx()); err != nil { return err } } else { @@ -351,7 +351,7 @@ func (w *windower) spillAllRowsToDisk() error { // w.partition if possible. if w.partition != nil { if !w.partition.UsingDisk() { - if err := w.partition.SpillToDisk(w.Ctx); err != nil { + if err := w.partition.SpillToDisk(w.Ctx()); err != nil { return err } } @@ -365,12 +365,12 @@ func (w *windower) spillAllRowsToDisk() error { // error, it forces all rows to spill and attempts to grow acc by usage // one more time. func (w *windower) growMemAccount(acc *mon.BoundAccount, usage int64) error { - if err := acc.Grow(w.Ctx, usage); err != nil { + if err := acc.Grow(w.Ctx(), usage); err != nil { if sqlerrors.IsOutOfMemoryError(err) { if err := w.spillAllRowsToDisk(); err != nil { return err } - if err := acc.Grow(w.Ctx, usage); err != nil { + if err := acc.Grow(w.Ctx(), usage); err != nil { return err } } else { @@ -696,7 +696,7 @@ func (w *windower) computeWindowFunctions(ctx context.Context, evalCtx *eval.Con } } } - if err := w.partition.AddRow(w.Ctx, row); err != nil { + if err := w.partition.AddRow(w.Ctx(), row); err != nil { return err } } @@ -710,7 +710,7 @@ func (w *windower) computeWindowFunctions(ctx context.Context, evalCtx *eval.Con func (w *windower) populateNextOutputRow() (bool, error) { if w.partitionIdx < len(w.partitionSizes) { if w.allRowsIterator == nil { - w.allRowsIterator = w.allRowsPartitioned.NewUnmarkedIterator(w.Ctx) + w.allRowsIterator = w.allRowsPartitioned.NewUnmarkedIterator(w.Ctx()) w.allRowsIterator.Rewind() } // rowIdx is the index of the next row to be emitted from the diff --git a/pkg/sql/rowexec/zigzagjoiner.go b/pkg/sql/rowexec/zigzagjoiner.go index cb82fb6050dc..e231d524c62d 100644 --- a/pkg/sql/rowexec/zigzagjoiner.go +++ b/pkg/sql/rowexec/zigzagjoiner.go @@ -496,9 +496,9 @@ func (z *zigzagJoiner) setupInfo( func (z *zigzagJoiner) close() { if z.InternalClose() { for i := range z.infos { - z.infos[i].fetcher.Close(z.Ctx) + z.infos[i].fetcher.Close(z.Ctx()) } - log.VEventf(z.Ctx, 2, "exiting zigzag joiner run") + log.VEventf(z.Ctx(), 2, "exiting zigzag joiner run") } } @@ -793,17 +793,17 @@ func (z *zigzagJoiner) maybeFetchInitialRow() error { curInfo := &z.infos[z.side] err := curInfo.fetcher.StartScan( - z.Ctx, + z.Ctx(), roachpb.Spans{roachpb.Span{Key: curInfo.key, EndKey: curInfo.endKey}}, nil, /* spanIDs */ rowinfra.GetDefaultBatchBytesLimit(z.EvalCtx.TestingKnobs.ForceProductionValues), zigzagJoinerBatchSize, ) if err != nil { - log.Errorf(z.Ctx, "scan error: %s", err) + log.Errorf(z.Ctx(), "scan error: %s", err) return err } - fetchedRow, err := z.fetchRow(z.Ctx) + fetchedRow, err := z.fetchRow(z.Ctx()) if err != nil { return scrub.UnwrapScrubError(err) } @@ -821,7 +821,7 @@ func (z *zigzagJoiner) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetadata z.MoveToDraining(err) break } - row, err := z.nextRow(z.Ctx) + row, err := z.nextRow(z.Ctx()) if err != nil { z.MoveToDraining(err) break @@ -846,9 +846,9 @@ func (z *zigzagJoiner) ConsumerClosed() { // execStatsForTrace implements ProcessorBase.ExecStatsForTrace. func (z *zigzagJoiner) execStatsForTrace() *execinfrapb.ComponentStats { - z.scanStats = execstats.GetScanStats(z.Ctx, z.ExecStatsTrace) + z.scanStats = execstats.GetScanStats(z.Ctx(), z.ExecStatsTrace) - contentionTime, contentionEvents := execstats.GetCumulativeContentionTime(z.Ctx, z.ExecStatsTrace) + contentionTime, contentionEvents := execstats.GetCumulativeContentionTime(z.Ctx(), z.ExecStatsTrace) kvStats := execinfrapb.KVStats{ BytesRead: optional.MakeUint(uint64(z.getBytesRead())), ContentionTime: optional.MakeTimeValue(contentionTime), @@ -900,7 +900,7 @@ func (z *zigzagJoiner) generateMeta() []execinfrapb.ProducerMetadata { meta.Metrics = execinfrapb.GetMetricsMeta() meta.Metrics.BytesRead = z.getBytesRead() meta.Metrics.RowsRead = z.getRowsRead() - if tfs := execinfra.GetLeafTxnFinalState(z.Ctx, z.FlowCtx.Txn); tfs != nil { + if tfs := execinfra.GetLeafTxnFinalState(z.Ctx(), z.FlowCtx.Txn); tfs != nil { trailingMeta = append(trailingMeta, execinfrapb.ProducerMetadata{LeafTxnFinalState: tfs}) } return trailingMeta