From 40e57d1759ef11dcd76c73b5d358ecb9abc6fadf Mon Sep 17 00:00:00 2001
From: Yahor Yuzefovich
Date: Tue, 15 Nov 2022 18:43:32 -0800
Subject: [PATCH] sql: audit all processors to make their closure bullet-proof

This commit replaces all usages of the `ProcessorBaseNoHelper.Ctx`
field with a call to the newly-introduced `Ctx()` method, which returns
a background context if the processor hasn't been started. This change
makes all processors respect the contract of the `colexecop.Closer`
interface, which says that `Close` must be safe to call even if `Init`
hasn't been performed (in the context of processors this means that
`Columnarizer.Init` wasn't called, meaning that `Processor.Start`
wasn't either).

Initially, I attempted to fix this in #91446 by putting the protection
into the columnarizer, but that led to broken assumptions since we
wouldn't close all of the closers that we expected to (in particular,
the materializer that is the input to the wrapped row-by-row processor
wouldn't be closed). This commit takes a different approach and should
fix the issue for good without introducing any flakiness.

As a result, this commit fixes a rarely-hit issue in which the
aggregator and the zigzag joiner attempt to log during closure even
though they were never started (something we see occasionally from
Sentry). The issue is quite rare, so no release note seems appropriate.

Release note: None
---
 pkg/ccl/backupccl/backup_processor.go         |  2 +-
 pkg/ccl/backupccl/restore_data_processor.go   |  4 +-
 .../backupccl/restore_data_processor_test.go  |  9 +--
 .../changefeedccl/changefeed_processors.go    | 44 +++++++--------
 .../stream_ingestion_frontier_processor.go    | 12 ++--
 .../stream_ingestion_processor.go             | 12 ++--
 pkg/sql/colexec/columnarizer.go               | 12 ++--
 pkg/sql/colexec/materializer.go               | 10 +---
 pkg/sql/execinfra/processorsbase.go           | 37 +++++++-----
 pkg/sql/rowexec/aggregator.go                 | 56 +++++++++----------
 pkg/sql/rowexec/countrows.go                  |  2 +-
 pkg/sql/rowexec/distinct.go                   | 10 ++--
 pkg/sql/rowexec/hashjoiner.go                 | 24 ++++----
 pkg/sql/rowexec/inverted_filterer.go          | 18 +++---
 pkg/sql/rowexec/inverted_joiner.go            | 44 +++++++--------
 pkg/sql/rowexec/joinreader.go                 | 50 ++++++++---------
 pkg/sql/rowexec/joinreader_strategies.go      | 32 +++++------
 pkg/sql/rowexec/mergejoiner.go                |  6 +-
 pkg/sql/rowexec/processors_test.go            | 13 +++--
 pkg/sql/rowexec/project_set.go                |  8 +--
 pkg/sql/rowexec/sample_aggregator.go          |  6 +-
 pkg/sql/rowexec/sampler.go                    |  4 +-
 pkg/sql/rowexec/sorter.go                     | 18 +++---
 pkg/sql/rowexec/tablereader.go                | 14 ++---
 pkg/sql/rowexec/windower.go                   | 34 +++++------
 pkg/sql/rowexec/zigzagjoiner.go               | 18 +++---
 26 files changed, 249 insertions(+), 250 deletions(-)

diff --git a/pkg/ccl/backupccl/backup_processor.go b/pkg/ccl/backupccl/backup_processor.go
index 20dbec394963..20924052bb22 100644
--- a/pkg/ccl/backupccl/backup_processor.go
+++ b/pkg/ccl/backupccl/backup_processor.go
@@ -211,7 +211,7 @@ func (bp *backupDataProcessor) close() {
 	bp.cancelAndWaitForWorker()
 	bp.agg.Close()
 	if bp.InternalClose() {
-		bp.memAcc.Close(bp.Ctx)
+		bp.memAcc.Close(bp.Ctx())
 	}
 }
 
diff --git a/pkg/ccl/backupccl/restore_data_processor.go b/pkg/ccl/backupccl/restore_data_processor.go
index 35c76fea4645..808b2fc6f82b 100644
--- a/pkg/ccl/backupccl/restore_data_processor.go
+++ b/pkg/ccl/backupccl/restore_data_processor.go
@@ -549,8 +549,8 @@ func (rd *restoreDataProcessor) Next() (rowenc.EncDatumRow, *execinfrapb.Produce
 		prog.ProgressDetails = *details
 	case meta := <-rd.metaCh:
 		return nil, meta
-	case <-rd.Ctx.Done():
-		rd.MoveToDraining(rd.Ctx.Err())
+	case 
<-rd.Ctx().Done(): + rd.MoveToDraining(rd.Ctx().Err()) return nil, rd.DrainHelper() } diff --git a/pkg/ccl/backupccl/restore_data_processor_test.go b/pkg/ccl/backupccl/restore_data_processor_test.go index 903d37ac51d7..f54e1cdd6c32 100644 --- a/pkg/ccl/backupccl/restore_data_processor_test.go +++ b/pkg/ccl/backupccl/restore_data_processor_test.go @@ -383,8 +383,7 @@ func runTestIngest(t *testing.T, init func(*cluster.Settings)) { } expectedKVs := slurpSSTablesLatestKey(t, filepath.Join(dir, "foo"), slurp, srcPrefix, newPrefix) - mockRestoreDataProcessor, err := newTestingRestoreDataProcessor(ctx, &evalCtx, &flowCtx, - mockRestoreDataSpec) + mockRestoreDataProcessor, err := newTestingRestoreDataProcessor(&evalCtx, &flowCtx, mockRestoreDataSpec) require.NoError(t, err) ssts := make(chan mergedSST, 1) require.NoError(t, mockRestoreDataProcessor.openSSTs(ctx, restoreSpanEntry, ssts)) @@ -427,15 +426,11 @@ func runTestIngest(t *testing.T, init func(*cluster.Settings)) { } func newTestingRestoreDataProcessor( - ctx context.Context, - evalCtx *eval.Context, - flowCtx *execinfra.FlowCtx, - spec execinfrapb.RestoreDataSpec, + evalCtx *eval.Context, flowCtx *execinfra.FlowCtx, spec execinfrapb.RestoreDataSpec, ) (*restoreDataProcessor, error) { rd := &restoreDataProcessor{ ProcessorBase: execinfra.ProcessorBase{ ProcessorBaseNoHelper: execinfra.ProcessorBaseNoHelper{ - Ctx: ctx, EvalCtx: evalCtx, }, }, diff --git a/pkg/ccl/changefeedccl/changefeed_processors.go b/pkg/ccl/changefeedccl/changefeed_processors.go index d447a7a6d523..c2d327162c8b 100644 --- a/pkg/ccl/changefeedccl/changefeed_processors.go +++ b/pkg/ccl/changefeedccl/changefeed_processors.go @@ -448,20 +448,20 @@ func (ca *changeAggregator) close() { } if ca.eventConsumer != nil { if err := ca.eventConsumer.Close(); err != nil { - log.Warningf(ca.Ctx, "error closing event consumer: %s", err) + log.Warningf(ca.Ctx(), "error closing event consumer: %s", err) } } if ca.sink != nil { if err := ca.sink.Close(); err != nil { - log.Warningf(ca.Ctx, `error closing sink. goroutines may have leaked: %v`, err) + log.Warningf(ca.Ctx(), `error closing sink. goroutines may have leaked: %v`, err) } } - ca.memAcc.Close(ca.Ctx) + ca.memAcc.Close(ca.Ctx()) if ca.kvFeedMemMon != nil { - ca.kvFeedMemMon.Stop(ca.Ctx) + ca.kvFeedMemMon.Stop(ca.Ctx()) } - ca.MemMonitor.Stop(ca.Ctx) + ca.MemMonitor.Stop(ca.Ctx()) ca.InternalClose() } @@ -505,7 +505,7 @@ func (ca *changeAggregator) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMet // kvFeed, sends off this event to the event consumer, and flushes the sink // if necessary. 
func (ca *changeAggregator) tick() error { - event, err := ca.eventProducer.Get(ca.Ctx) + event, err := ca.eventProducer.Get(ca.Ctx()) if err != nil { return err } @@ -520,16 +520,16 @@ func (ca *changeAggregator) tick() error { ca.sliMetrics.AdmitLatency.RecordValue(timeutil.Since(event.Timestamp().GoTime()).Nanoseconds()) } ca.recentKVCount++ - return ca.eventConsumer.ConsumeEvent(ca.Ctx, event) + return ca.eventConsumer.ConsumeEvent(ca.Ctx(), event) case kvevent.TypeResolved: a := event.DetachAlloc() - a.Release(ca.Ctx) + a.Release(ca.Ctx()) resolved := event.Resolved() if ca.knobs.FilterSpanWithMutation == nil || !ca.knobs.FilterSpanWithMutation(&resolved) { return ca.noteResolvedSpan(resolved) } case kvevent.TypeFlush: - return ca.sink.Flush(ca.Ctx) + return ca.sink.Flush(ca.Ctx()) } return nil @@ -572,7 +572,7 @@ func (ca *changeAggregator) flushFrontier() error { // otherwise, we could lose buffered messages and violate the // at-least-once guarantee. This is also true for checkpointing the // resolved spans in the job progress. - if err := ca.sink.Flush(ca.Ctx); err != nil { + if err := ca.sink.Flush(ca.Ctx()); err != nil { return err } @@ -1000,11 +1000,11 @@ func (cf *changeFrontier) close() { } if cf.sink != nil { if err := cf.sink.Close(); err != nil { - log.Warningf(cf.Ctx, `error closing sink. goroutines may have leaked: %v`, err) + log.Warningf(cf.Ctx(), `error closing sink. goroutines may have leaked: %v`, err) } } - cf.memAcc.Close(cf.Ctx) - cf.MemMonitor.Stop(cf.Ctx) + cf.memAcc.Close(cf.Ctx()) + cf.MemMonitor.Stop(cf.Ctx()) } } @@ -1108,7 +1108,7 @@ func (cf *changeFrontier) noteAggregatorProgress(d rowenc.EncDatum) error { // job progress update closure, but it currently doesn't pass along the info // we'd need to do it that way. 
if !resolved.Timestamp.IsEmpty() && resolved.Timestamp.Less(cf.highWaterAtStart) { - logcrash.ReportOrPanic(cf.Ctx, &cf.flowCtx.Cfg.Settings.SV, + logcrash.ReportOrPanic(cf.Ctx(), &cf.flowCtx.Cfg.Settings.SV, `got a span level timestamp %s for %s that is less than the initial high-water %s`, redact.Safe(resolved.Timestamp), resolved.Span, redact.Safe(cf.highWaterAtStart)) continue @@ -1210,7 +1210,7 @@ func (cf *changeFrontier) maybeCheckpointJob( if err != nil { return false, err } - cf.js.checkpointCompleted(cf.Ctx, timeutil.Since(checkpointStart)) + cf.js.checkpointCompleted(cf.Ctx(), timeutil.Since(checkpointStart)) return updated, nil } @@ -1228,7 +1228,7 @@ func (cf *changeFrontier) checkpointJobProgress( var updateSkipped error if cf.js.job != nil { - if err := cf.js.job.Update(cf.Ctx, nil, func( + if err := cf.js.job.Update(cf.Ctx(), nil, func( txn *kv.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater, ) error { // If we're unable to update the job due to the job state, such as during @@ -1253,8 +1253,8 @@ func (cf *changeFrontier) checkpointJobProgress( if !changefeedbase.ActiveProtectedTimestampsEnabled.Get(&cf.flowCtx.Cfg.Settings.SV) { timestampManager = cf.deprecatedManageProtectedTimestamps } - if err := timestampManager(cf.Ctx, txn, changefeedProgress); err != nil { - log.Warningf(cf.Ctx, "error managing protected timestamp record: %v", err) + if err := timestampManager(cf.Ctx(), txn, changefeedProgress); err != nil { + log.Warningf(cf.Ctx(), "error managing protected timestamp record: %v", err) return err } @@ -1282,7 +1282,7 @@ func (cf *changeFrontier) checkpointJobProgress( } if updateSkipped != nil { - log.Warningf(cf.Ctx, "skipping changefeed checkpoint: %s", updateSkipped) + log.Warningf(cf.Ctx(), "skipping changefeed checkpoint: %s", updateSkipped) return false, nil } @@ -1381,7 +1381,7 @@ func (cf *changeFrontier) maybeEmitResolved(newResolved hlc.Timestamp) error { if !shouldEmit { return nil } - if err := emitResolvedTimestamp(cf.Ctx, cf.encoder, cf.sink, newResolved); err != nil { + if err := emitResolvedTimestamp(cf.Ctx(), cf.encoder, cf.sink, newResolved); err != nil { return err } cf.lastEmitResolved = newResolved.GoTime() @@ -1420,13 +1420,13 @@ func (cf *changeFrontier) maybeLogBehindSpan(frontierChanged bool) { description = fmt.Sprintf("job %d", cf.spec.JobID) } if frontierChanged && cf.slowLogEveryN.ShouldProcess(now) { - log.Infof(cf.Ctx, "%s new resolved timestamp %s is behind by %s", + log.Infof(cf.Ctx(), "%s new resolved timestamp %s is behind by %s", description, frontier, resolvedBehind) } if cf.slowLogEveryN.ShouldProcess(now) { s := cf.frontier.PeekFrontierSpan() - log.Infof(cf.Ctx, "%s span %s is behind by %s", description, s, resolvedBehind) + log.Infof(cf.Ctx(), "%s span %s is behind by %s", description, s, resolvedBehind) } } diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_frontier_processor.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_frontier_processor.go index 9ba472fe4e5e..e6a949bebcbe 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_frontier_processor.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_frontier_processor.go @@ -297,13 +297,13 @@ func (sf *streamIngestionFrontier) Next() ( if err := sf.maybeUpdatePartitionProgress(); err != nil { // Updating the partition progress isn't a fatal error. 
- log.Errorf(sf.Ctx, "failed to update partition progress: %+v", err) + log.Errorf(sf.Ctx(), "failed to update partition progress: %+v", err) } // Send back a row to the job so that it can update the progress. select { - case <-sf.Ctx.Done(): - sf.MoveToDraining(sf.Ctx.Err()) + case <-sf.Ctx().Done(): + sf.MoveToDraining(sf.Ctx().Err()) return nil, sf.DrainHelper() // Send the latest persisted highwater in the heartbeat to the source cluster // as even with retries we will never request an earlier row than it, and @@ -314,7 +314,7 @@ func (sf *streamIngestionFrontier) Next() ( case <-sf.heartbeatSender.stoppedChan: err := sf.heartbeatSender.wait() if err != nil { - log.Errorf(sf.Ctx, "heartbeat sender exited with error: %s", err) + log.Errorf(sf.Ctx(), "heartbeat sender exited with error: %s", err) } sf.MoveToDraining(err) return nil, sf.DrainHelper() @@ -325,7 +325,7 @@ func (sf *streamIngestionFrontier) Next() ( func (sf *streamIngestionFrontier) close() { if err := sf.heartbeatSender.stop(); err != nil { - log.Errorf(sf.Ctx, "heartbeat sender exited with error: %s", err) + log.Errorf(sf.Ctx(), "heartbeat sender exited with error: %s", err) } if sf.InternalClose() { sf.metrics.RunningCount.Dec(1) @@ -391,7 +391,7 @@ func (sf *streamIngestionFrontier) noteResolvedTimestamps( // maybeUpdatePartitionProgress polls the frontier and updates the job progress with // partition-specific information to track the status of each partition. func (sf *streamIngestionFrontier) maybeUpdatePartitionProgress() error { - ctx := sf.Ctx + ctx := sf.Ctx() updateFreq := JobCheckpointFrequency.Get(&sf.flowCtx.Cfg.Settings.SV) if updateFreq == 0 || timeutil.Since(sf.lastPartitionUpdate) < updateFreq { return nil diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go index 5f399d2b1ee1..dc85085807b1 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go @@ -404,10 +404,10 @@ func (sip *streamIngestionProcessor) close() { } for _, client := range sip.streamPartitionClients { - _ = client.Close(sip.Ctx) + _ = client.Close(sip.Ctx()) } if sip.batcher != nil { - sip.batcher.Close(sip.Ctx) + sip.batcher.Close(sip.Ctx()) } if sip.maxFlushRateTimer != nil { sip.maxFlushRateTimer.Stop() @@ -538,7 +538,7 @@ func (sip *streamIngestionProcessor) consumeEvents() (*jobspb.ResolvedSpans, err if streamingKnobs, ok := sip.FlowCtx.TestingKnobs().StreamingTestingKnobs.(*sql.StreamingTestingKnobs); ok { if streamingKnobs != nil && streamingKnobs.RunAfterReceivingEvent != nil { - if err := streamingKnobs.RunAfterReceivingEvent(sip.Ctx); err != nil { + if err := streamingKnobs.RunAfterReceivingEvent(sip.Ctx()); err != nil { return nil, err } } @@ -602,7 +602,7 @@ func (sip *streamIngestionProcessor) bufferSST(sst *roachpb.RangeFeedSSTable) er // careful with checkpoints: we can only send checkpoint whose TS >= SST batch TS // after the full SST gets ingested. 
- _, sp := tracing.ChildSpan(sip.Ctx, "stream-ingestion-buffer-sst") + _, sp := tracing.ChildSpan(sip.Ctx(), "stream-ingestion-buffer-sst") defer sp.Finish() return streamingccl.ScanSST(sst, sst.Span, func(keyVal storage.MVCCKeyValue) error { @@ -652,7 +652,7 @@ func (sip *streamIngestionProcessor) bufferDelRange(delRange *roachpb.RangeFeedD func (sip *streamIngestionProcessor) bufferRangeKeyVal( rangeKeyVal storage.MVCCRangeKeyValue, ) error { - _, sp := tracing.ChildSpan(sip.Ctx, "stream-ingestion-buffer-range-key") + _, sp := tracing.ChildSpan(sip.Ctx(), "stream-ingestion-buffer-range-key") defer sp.Finish() var err error @@ -790,7 +790,7 @@ func (r *rangeKeyBatcher) reset() { } func (sip *streamIngestionProcessor) flush() (*jobspb.ResolvedSpans, error) { - ctx, sp := tracing.ChildSpan(sip.Ctx, "stream-ingestion-flush") + ctx, sp := tracing.ChildSpan(sip.Ctx(), "stream-ingestion-flush") defer sp.Finish() flushedCheckpoints := jobspb.ResolvedSpans{ResolvedSpans: make([]jobspb.ResolvedSpan, 0)} diff --git a/pkg/sql/colexec/columnarizer.go b/pkg/sql/colexec/columnarizer.go index 1db4bc8099be..d578edfac314 100644 --- a/pkg/sql/colexec/columnarizer.go +++ b/pkg/sql/colexec/columnarizer.go @@ -58,6 +58,7 @@ type Columnarizer struct { colexecop.NonExplainable mode columnarizerMode + initialized bool helper colmem.SetAccountingHelper metadataAllocator *colmem.Allocator input execinfra.RowSource @@ -171,13 +172,10 @@ func newColumnarizer( // Init is part of the colexecop.Operator interface. func (c *Columnarizer) Init(ctx context.Context) { - if c.removedFromFlow { - return - } - if c.Ctx != nil { - // Init has already been called. + if c.removedFromFlow || c.initialized { return } + c.initialized = true c.accumulatedMeta = make([]execinfrapb.ProducerMetadata, 0, 1) ctx = c.StartInternalNoSpan(ctx) c.input.Start(ctx) @@ -270,7 +268,7 @@ func (c *Columnarizer) DrainMeta() []execinfrapb.ProducerMetadata { // When this method returns, we no longer will have the reference to the // metadata, so we can release all memory from the metadata allocator. defer c.metadataAllocator.ReleaseAll() - if c.Ctx == nil { + if !c.initialized { // The columnarizer wasn't initialized, so the wrapped processors might // not have been started leaving them in an unsafe to drain state, so // we skip the draining. Mostly likely this happened because a panic was @@ -301,7 +299,7 @@ func (c *Columnarizer) Close(context.Context) error { func (c *Columnarizer) trailingMetaCallback() []execinfrapb.ProducerMetadata { // Close will call InternalClose(). Note that we don't return any trailing // metadata here because the columnarizers propagate it in DrainMeta. - if err := c.Close(c.Ctx); buildutil.CrdbTestBuild && err != nil { + if err := c.Close(c.Ctx()); buildutil.CrdbTestBuild && err != nil { // Close never returns an error. colexecerror.InternalError(errors.NewAssertionErrorWithWrappedErrf(err, "unexpected error from Columnarizer.Close")) } diff --git a/pkg/sql/colexec/materializer.go b/pkg/sql/colexec/materializer.go index 6cdf9a2711e3..cfbd462dadc0 100644 --- a/pkg/sql/colexec/materializer.go +++ b/pkg/sql/colexec/materializer.go @@ -352,18 +352,10 @@ func (m *Materializer) close() { if m.Closed { return } - if m.Ctx == nil { - // In some edge cases (like when Init of an operator above this - // materializer encounters a panic), the materializer might never be - // started, yet it still will attempt to close its Closers. 
This - // context is only used for logging purposes, so it is ok to grab - // the background context in order to prevent a NPE below. - m.Ctx = context.Background() - } // Make sure to call InternalClose() only after closing the closers - this // allows the closers to utilize the unfinished tracing span (if tracing is // enabled). - m.closers.CloseAndLogOnErr(m.Ctx, "materializer") + m.closers.CloseAndLogOnErr(m.Ctx(), "materializer") m.InternalClose() } diff --git a/pkg/sql/execinfra/processorsbase.go b/pkg/sql/execinfra/processorsbase.go index e92b23f62605..38636ebf65f2 100644 --- a/pkg/sql/execinfra/processorsbase.go +++ b/pkg/sql/execinfra/processorsbase.go @@ -357,10 +357,10 @@ type ProcessorBaseNoHelper struct { // has been closed. Closed bool - // Ctx and span contain the tracing state while the processor is active + // ctx and span contain the tracing state while the processor is active // (i.e. hasn't been closed). Initialized using flowCtx.Ctx (which should not be otherwise // used). - Ctx context.Context + ctx context.Context span *tracing.Span // origCtx is the context from which ctx was derived. InternalClose() resets // ctx to this. @@ -518,7 +518,7 @@ func (pb *ProcessorBaseNoHelper) MoveToDraining(err error) { // not permitted. if err != nil { logcrash.ReportOrPanic( - pb.Ctx, + pb.Ctx(), &pb.FlowCtx.Cfg.Settings.SV, "MoveToDraining called in state %s with err: %+v", pb.State, err) @@ -548,7 +548,7 @@ func (pb *ProcessorBaseNoHelper) MoveToDraining(err error) { func (pb *ProcessorBaseNoHelper) DrainHelper() *execinfrapb.ProducerMetadata { if pb.State == StateRunning { logcrash.ReportOrPanic( - pb.Ctx, + pb.Ctx(), &pb.FlowCtx.Cfg.Settings.SV, "drain helper called in StateRunning", ) @@ -659,7 +659,7 @@ func (pb *ProcessorBase) HijackExecStatsForTrace() func() *execinfrapb.Component func (pb *ProcessorBaseNoHelper) moveToTrailingMeta() { if pb.State == StateTrailingMeta || pb.State == StateExhausted { logcrash.ReportOrPanic( - pb.Ctx, + pb.Ctx(), &pb.FlowCtx.Cfg.Settings.SV, "moveToTrailingMeta called in state: %s", pb.State, @@ -682,10 +682,10 @@ func (pb *ProcessorBaseNoHelper) moveToTrailingMeta() { } } - if buildutil.CrdbTestBuild && pb.Ctx == nil { + if buildutil.CrdbTestBuild && pb.ctx == nil { panic( errors.AssertionFailedf( - "unexpected nil ProcessorBase.Ctx when draining. Was StartInternal called?", + "unexpected nil ProcessorBase.ctx when draining. Was StartInternal called?", ), ) } @@ -709,7 +709,7 @@ func (pb *ProcessorBaseNoHelper) moveToTrailingMeta() { // should continue processing other rows, with the awareness that the processor // might have been transitioned to the draining phase. 
func (pb *ProcessorBase) ProcessRowHelper(row rowenc.EncDatumRow) rowenc.EncDatumRow { - outRow, ok, err := pb.OutputHelper.ProcessRow(pb.Ctx, row) + outRow, ok, err := pb.OutputHelper.ProcessRow(pb.Ctx(), row) if err != nil { pb.MoveToDraining(err) return nil @@ -731,7 +731,7 @@ func (pb *ProcessorBaseNoHelper) Run(ctx context.Context) { panic("processor output is not set for emitting rows") } pb.self.Start(ctx) - Run(pb.Ctx, pb.self, pb.Output) + Run(pb.ctx, pb.self, pb.Output) } // ProcStateOpts contains fields used by the ProcessorBase's family of functions @@ -872,16 +872,25 @@ func (pb *ProcessorBaseNoHelper) startImpl( ) context.Context { pb.origCtx = ctx if createSpan { - pb.Ctx, pb.span = ProcessorSpan(ctx, spanName) + pb.ctx, pb.span = ProcessorSpan(ctx, spanName) if pb.span != nil && pb.span.IsVerbose() { pb.span.SetTag(execinfrapb.FlowIDTagKey, attribute.StringValue(pb.FlowCtx.ID.String())) pb.span.SetTag(execinfrapb.ProcessorIDTagKey, attribute.IntValue(int(pb.ProcessorID))) } } else { - pb.Ctx = ctx + pb.ctx = ctx } - pb.EvalCtx.Context = pb.Ctx - return pb.Ctx + pb.EvalCtx.Context = pb.ctx + return pb.ctx +} + +// Ctx is an accessor method for ctx which is guaranteed to return non-nil +// context even if StartInternal() hasn't been called. +func (pb *ProcessorBaseNoHelper) Ctx() context.Context { + if pb.ctx == nil { + return context.Background() + } + return pb.ctx } // InternalClose helps processors implement the RowSource interface, performing @@ -938,7 +947,7 @@ func (pb *ProcessorBaseNoHelper) InternalCloseEx(onClose func()) bool { pb.span = nil // Reset the context so that any incidental uses after this point do not // access the finished span. - pb.Ctx = pb.origCtx + pb.ctx = pb.origCtx pb.EvalCtx.Context = pb.origCtx return true } diff --git a/pkg/sql/rowexec/aggregator.go b/pkg/sql/rowexec/aggregator.go index f742d31be420..2146bf7e8b23 100644 --- a/pkg/sql/rowexec/aggregator.go +++ b/pkg/sql/rowexec/aggregator.go @@ -345,17 +345,17 @@ func (ag *aggregatorBase) start(ctx context.Context, procName string) { func (ag *hashAggregator) close() { if ag.InternalClose() { - log.VEventf(ag.Ctx, 2, "exiting aggregator") + log.VEventf(ag.Ctx(), 2, "exiting aggregator") // If we have started emitting rows, bucketsIter will represent which // buckets are still open, since buckets are closed once their results are // emitted. if ag.bucketsIter == nil { for _, bucket := range ag.buckets { - bucket.close(ag.Ctx) + bucket.close(ag.Ctx()) } } else { for _, bucket := range ag.bucketsIter { - ag.buckets[bucket].close(ag.Ctx) + ag.buckets[bucket].close(ag.Ctx()) } } // Make sure to release any remaining memory under 'buckets'. @@ -364,25 +364,25 @@ func (ag *hashAggregator) close() { // buckets since the latter might be releasing some precisely tracked // memory, and if we were to close the accounts first, there would be // no memory to release for the buckets. 
- ag.bucketsAcc.Close(ag.Ctx) - ag.aggFuncsAcc.Close(ag.Ctx) - ag.MemMonitor.Stop(ag.Ctx) + ag.bucketsAcc.Close(ag.Ctx()) + ag.aggFuncsAcc.Close(ag.Ctx()) + ag.MemMonitor.Stop(ag.Ctx()) } } func (ag *orderedAggregator) close() { if ag.InternalClose() { - log.VEventf(ag.Ctx, 2, "exiting aggregator") + log.VEventf(ag.Ctx(), 2, "exiting aggregator") if ag.bucket != nil { - ag.bucket.close(ag.Ctx) + ag.bucket.close(ag.Ctx()) } // Note that we should be closing accounts only after closing the // bucket since the latter might be releasing some precisely tracked // memory, and if we were to close the accounts first, there would be // no memory to release for the bucket. - ag.bucketsAcc.Close(ag.Ctx) - ag.aggFuncsAcc.Close(ag.Ctx) - ag.MemMonitor.Stop(ag.Ctx) + ag.bucketsAcc.Close(ag.Ctx()) + ag.aggFuncsAcc.Close(ag.Ctx()) + ag.MemMonitor.Stop(ag.Ctx()) } } @@ -420,7 +420,7 @@ func (ag *hashAggregator) accumulateRows() ( return aggAccumulating, nil, meta } if row == nil { - log.VEvent(ag.Ctx, 1, "accumulation complete") + log.VEvent(ag.Ctx(), 1, "accumulation complete") ag.inputDone = true break } @@ -457,7 +457,7 @@ func (ag *hashAggregator) accumulateRows() ( // Note that, for simplicity, we're ignoring the overhead of the slice of // strings. - if err := ag.bucketsAcc.Grow(ag.Ctx, int64(len(ag.buckets))*memsize.String); err != nil { + if err := ag.bucketsAcc.Grow(ag.Ctx(), int64(len(ag.buckets))*memsize.String); err != nil { ag.MoveToDraining(err) return aggStateUnknown, nil, nil } @@ -489,7 +489,7 @@ func (ag *orderedAggregator) accumulateRows() ( return aggAccumulating, nil, meta } if row == nil { - log.VEvent(ag.Ctx, 1, "accumulation complete") + log.VEvent(ag.Ctx(), 1, "accumulation complete") ag.inputDone = true break } @@ -533,7 +533,7 @@ func (ag *orderedAggregator) accumulateRows() ( func (ag *aggregatorBase) getAggResults( bucket aggregateFuncs, ) (aggregatorState, rowenc.EncDatumRow, *execinfrapb.ProducerMetadata) { - defer bucket.close(ag.Ctx) + defer bucket.close(ag.Ctx()) for i, b := range bucket { result, err := b.Result() @@ -579,16 +579,16 @@ func (ag *hashAggregator) emitRow() ( // the columns specified by ag.orderedGroupCols, so we need to continue // accumulating the remaining rows. - if err := ag.arena.UnsafeReset(ag.Ctx); err != nil { + if err := ag.arena.UnsafeReset(ag.Ctx()); err != nil { ag.MoveToDraining(err) return aggStateUnknown, nil, nil } // Before we create a new 'buckets' map below, we need to "release" the // already accounted for memory of the current map. - ag.bucketsAcc.Shrink(ag.Ctx, int64(ag.alreadyAccountedFor)*memsize.MapEntryOverhead) + ag.bucketsAcc.Shrink(ag.Ctx(), int64(ag.alreadyAccountedFor)*memsize.MapEntryOverhead) // Note that, for simplicity, we're ignoring the overhead of the slice of // strings. - ag.bucketsAcc.Shrink(ag.Ctx, int64(len(ag.buckets))*memsize.String) + ag.bucketsAcc.Shrink(ag.Ctx(), int64(len(ag.buckets))*memsize.String) ag.bucketsIter = nil ag.buckets = make(map[string]aggregateFuncs) ag.bucketsLenGrowThreshold = hashAggregatorBucketsInitialLen @@ -655,7 +655,7 @@ func (ag *orderedAggregator) emitRow() ( // the columns specified by ag.orderedGroupCols, so we need to continue // accumulating the remaining rows. 
- if err := ag.arena.UnsafeReset(ag.Ctx); err != nil { + if err := ag.arena.UnsafeReset(ag.Ctx()); err != nil { ag.MoveToDraining(err) return aggStateUnknown, nil, nil } @@ -693,7 +693,7 @@ func (ag *hashAggregator) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetad case aggEmittingRows: ag.runningState, row, meta = ag.emitRow() default: - log.Fatalf(ag.Ctx, "unsupported state: %d", ag.runningState) + log.Fatalf(ag.Ctx(), "unsupported state: %d", ag.runningState) } if row == nil && meta == nil { @@ -715,7 +715,7 @@ func (ag *orderedAggregator) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMe case aggEmittingRows: ag.runningState, row, meta = ag.emitRow() default: - log.Fatalf(ag.Ctx, "unsupported state: %d", ag.runningState) + log.Fatalf(ag.Ctx(), "unsupported state: %d", ag.runningState) } if row == nil && meta == nil { @@ -780,7 +780,7 @@ func (ag *aggregatorBase) accumulateRowIntoBucket( canAdd := true if a.Distinct { canAdd, err = ag.funcs[i].isDistinct( - ag.Ctx, + ag.Ctx(), &ag.datumAlloc, groupKey, firstArg, @@ -793,7 +793,7 @@ func (ag *aggregatorBase) accumulateRowIntoBucket( if !canAdd { continue } - if err := bucket[i].Add(ag.Ctx, firstArg, otherArgs...); err != nil { + if err := bucket[i].Add(ag.Ctx(), firstArg, otherArgs...); err != nil { return err } } @@ -811,7 +811,7 @@ func (ag *hashAggregator) encode( // used by the aggregate functions (and accounted for accordingly), // this can lead to over-accounting which is acceptable. appendTo, err = row[colIdx].Fingerprint( - ag.Ctx, ag.inputTypes[colIdx], &ag.datumAlloc, appendTo, &ag.bucketsAcc, + ag.Ctx(), ag.inputTypes[colIdx], &ag.datumAlloc, appendTo, &ag.bucketsAcc, ) if err != nil { return appendTo, err @@ -837,7 +837,7 @@ func (ag *hashAggregator) accumulateRow(row rowenc.EncDatumRow) error { bucket, ok := ag.buckets[string(encoded)] if !ok { - s, err := ag.arena.AllocBytes(ag.Ctx, encoded) + s, err := ag.arena.AllocBytes(ag.Ctx(), encoded) if err != nil { return err } @@ -848,7 +848,7 @@ func (ag *hashAggregator) accumulateRow(row rowenc.EncDatumRow) error { ag.buckets[s] = bucket if len(ag.buckets) == ag.bucketsLenGrowThreshold { toAccountFor := ag.bucketsLenGrowThreshold - ag.alreadyAccountedFor - if err := ag.bucketsAcc.Grow(ag.Ctx, int64(toAccountFor)*memsize.MapEntryOverhead); err != nil { + if err := ag.bucketsAcc.Grow(ag.Ctx(), int64(toAccountFor)*memsize.MapEntryOverhead); err != nil { return err } ag.alreadyAccountedFor = ag.bucketsLenGrowThreshold @@ -948,13 +948,13 @@ func (a *aggregateFuncHolder) isDistinct( } func (ag *aggregatorBase) createAggregateFuncs() (aggregateFuncs, error) { - if err := ag.bucketsAcc.Grow(ag.Ctx, sizeOfAggregateFuncs+sizeOfAggregateFunc*int64(len(ag.funcs))); err != nil { + if err := ag.bucketsAcc.Grow(ag.Ctx(), sizeOfAggregateFuncs+sizeOfAggregateFunc*int64(len(ag.funcs))); err != nil { return nil, err } bucket := make(aggregateFuncs, len(ag.funcs)) for i, f := range ag.funcs { agg := f.create(ag.EvalCtx, f.arguments) - if err := ag.bucketsAcc.Grow(ag.Ctx, agg.Size()); err != nil { + if err := ag.bucketsAcc.Grow(ag.Ctx(), agg.Size()); err != nil { return nil, err } bucket[i] = agg diff --git a/pkg/sql/rowexec/countrows.go b/pkg/sql/rowexec/countrows.go index 675542f6b7cd..5fe23396a1bf 100644 --- a/pkg/sql/rowexec/countrows.go +++ b/pkg/sql/rowexec/countrows.go @@ -86,7 +86,7 @@ func (ag *countAggregator) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMeta if row == nil { ret := make(rowenc.EncDatumRow, 1) ret[0] = rowenc.EncDatum{Datum: tree.NewDInt(tree.DInt(ag.count))} - 
rendered, _, err := ag.OutputHelper.ProcessRow(ag.Ctx, ret) + rendered, _, err := ag.OutputHelper.ProcessRow(ag.Ctx(), ret) // We're done as soon as we process our one output row, so we // transition into draining state. We will, however, return non-nil // error (if such occurs during rendering) separately below. diff --git a/pkg/sql/rowexec/distinct.go b/pkg/sql/rowexec/distinct.go index 41a75656f563..d63dfe23f7f1 100644 --- a/pkg/sql/rowexec/distinct.go +++ b/pkg/sql/rowexec/distinct.go @@ -189,7 +189,7 @@ func (d *distinct) encode(appendTo []byte, row rowenc.EncDatumRow) ([]byte, erro // the references to the row (and to the newly allocated datums) // shortly, it'll likely take some time before GC reclaims that memory, // so we choose the over-accounting route to be safe. - appendTo, err = datum.Fingerprint(d.Ctx, d.types[colIdx], &d.datumAlloc, appendTo, &d.memAcc) + appendTo, err = datum.Fingerprint(d.Ctx(), d.types[colIdx], &d.datumAlloc, appendTo, &d.memAcc) if err != nil { return nil, err } @@ -212,8 +212,8 @@ func (d *distinct) encode(appendTo []byte, row rowenc.EncDatumRow) ([]byte, erro func (d *distinct) close() { if d.InternalClose() { - d.memAcc.Close(d.Ctx) - d.MemMonitor.Stop(d.Ctx) + d.memAcc.Close(d.Ctx()) + d.MemMonitor.Stop(d.Ctx()) } } @@ -258,7 +258,7 @@ func (d *distinct) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetadata) { // allocated on it, which implies that UnsafeReset() is safe to call here. copy(d.lastGroupKey, row) d.haveLastGroupKey = true - if err := d.arena.UnsafeReset(d.Ctx); err != nil { + if err := d.arena.UnsafeReset(d.Ctx()); err != nil { d.MoveToDraining(err) break } @@ -282,7 +282,7 @@ func (d *distinct) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetadata) { } continue } - s, err := d.arena.AllocBytes(d.Ctx, encoding) + s, err := d.arena.AllocBytes(d.Ctx(), encoding) if err != nil { d.MoveToDraining(err) break diff --git a/pkg/sql/rowexec/hashjoiner.go b/pkg/sql/rowexec/hashjoiner.go index fc6a8685d261..62c9b6c97b04 100644 --- a/pkg/sql/rowexec/hashjoiner.go +++ b/pkg/sql/rowexec/hashjoiner.go @@ -157,7 +157,7 @@ func newHashJoiner( } return h, h.hashTable.Init( - h.Ctx, + h.Ctx(), shouldMarkRightSide(h.joinType), h.rightSource.OutputTypes(), h.eqCols[rightSide], @@ -189,7 +189,7 @@ func (h *hashJoiner) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetadata) case hjEmittingRightUnmatched: h.runningState, row, meta = h.emitRightUnmatched() default: - log.Fatalf(h.Ctx, "unsupported state: %d", h.runningState) + log.Fatalf(h.Ctx(), "unsupported state: %d", h.runningState) } if row == nil && meta == nil { @@ -235,14 +235,14 @@ func (h *hashJoiner) build() (hashJoinerState, rowenc.EncDatumRow, *execinfrapb. return hjStateUnknown, nil, h.DrainHelper() } // If hashTable is in-memory, pre-reserve the memory needed to mark. - if err = h.hashTable.ReserveMarkMemoryMaybe(h.Ctx); err != nil { + if err = h.hashTable.ReserveMarkMemoryMaybe(h.Ctx()); err != nil { h.MoveToDraining(err) return hjStateUnknown, nil, h.DrainHelper() } return hjReadingLeftSide, nil, nil } - err = h.hashTable.AddRow(h.Ctx, row) + err = h.hashTable.AddRow(h.Ctx(), row) // Regardless of the underlying row container (disk backed or in-memory // only), we cannot do anything about an error if it occurs. if err != nil { @@ -276,7 +276,7 @@ func (h *hashJoiner) readLeftSide() ( // hjEmittingRightUnmatched if unmatched rows on the right side need to // be emitted, otherwise finish. 
if shouldEmitUnmatchedRow(rightSide, h.joinType) { - i := h.hashTable.NewUnmarkedIterator(h.Ctx) + i := h.hashTable.NewUnmarkedIterator(h.Ctx()) i.Rewind() h.emittingRightUnmatchedState.iter = i return hjEmittingRightUnmatched, nil, nil @@ -290,14 +290,14 @@ func (h *hashJoiner) readLeftSide() ( h.probingRowState.row = row h.probingRowState.matched = false if h.probingRowState.iter == nil { - i, err := h.hashTable.NewBucketIterator(h.Ctx, row, h.eqCols[leftSide]) + i, err := h.hashTable.NewBucketIterator(h.Ctx(), row, h.eqCols[leftSide]) if err != nil { h.MoveToDraining(err) return hjStateUnknown, nil, h.DrainHelper() } h.probingRowState.iter = i } else { - if err := h.probingRowState.iter.Reset(h.Ctx, row); err != nil { + if err := h.probingRowState.iter.Reset(h.Ctx(), row); err != nil { h.MoveToDraining(err) return hjStateUnknown, nil, h.DrainHelper() } @@ -358,7 +358,7 @@ func (h *hashJoiner) probeRow() ( h.probingRowState.matched = true shouldEmit := true if shouldMarkRightSide(h.joinType) { - if i.IsMarked(h.Ctx) { + if i.IsMarked(h.Ctx()) { switch h.joinType { case descpb.RightSemiJoin: // The row from the right already had a match and was emitted @@ -375,7 +375,7 @@ func (h *hashJoiner) probeRow() ( // whether we have a corresponding unmarked row from the right. h.probingRowState.matched = false } - } else if err := i.Mark(h.Ctx); err != nil { + } else if err := i.Mark(h.Ctx()); err != nil { h.MoveToDraining(err) return hjStateUnknown, nil, h.DrainHelper() } @@ -448,16 +448,16 @@ func (h *hashJoiner) emitRightUnmatched() ( func (h *hashJoiner) close() { if h.InternalClose() { - h.hashTable.Close(h.Ctx) + h.hashTable.Close(h.Ctx()) if h.probingRowState.iter != nil { h.probingRowState.iter.Close() } if h.emittingRightUnmatchedState.iter != nil { h.emittingRightUnmatchedState.iter.Close() } - h.MemMonitor.Stop(h.Ctx) + h.MemMonitor.Stop(h.Ctx()) if h.diskMonitor != nil { - h.diskMonitor.Stop(h.Ctx) + h.diskMonitor.Stop(h.Ctx()) } } } diff --git a/pkg/sql/rowexec/inverted_filterer.go b/pkg/sql/rowexec/inverted_filterer.go index 7cc464e4c762..c50cbcdddc25 100644 --- a/pkg/sql/rowexec/inverted_filterer.go +++ b/pkg/sql/rowexec/inverted_filterer.go @@ -169,7 +169,7 @@ func (ifr *invertedFilterer) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMe case ifrEmittingRows: ifr.runningState, row, meta = ifr.emitRow() default: - log.Fatalf(ifr.Ctx, "unsupported state: %d", ifr.runningState) + log.Fatalf(ifr.Ctx(), "unsupported state: %d", ifr.runningState) } if row == nil && meta == nil { continue @@ -194,9 +194,9 @@ func (ifr *invertedFilterer) readInput() (invertedFiltererState, *execinfrapb.Pr return ifrReadingInput, meta } if row == nil { - log.VEventf(ifr.Ctx, 1, "no more input rows") + log.VEventf(ifr.Ctx(), 1, "no more input rows") evalResult := ifr.invertedEval.evaluate() - ifr.rc.SetupForRead(ifr.Ctx, evalResult) + ifr.rc.SetupForRead(ifr.Ctx(), evalResult) // invertedEval had a single expression in the batch, and the results // for that expression are in evalResult[0]. ifr.evalResult = evalResult[0] @@ -245,7 +245,7 @@ func (ifr *invertedFilterer) readInput() (invertedFiltererState, *execinfrapb.Pr // evaluator. 
copy(ifr.keyRow, row[:ifr.invertedColIdx]) copy(ifr.keyRow[ifr.invertedColIdx:], row[ifr.invertedColIdx+1:]) - keyIndex, err := ifr.rc.AddRow(ifr.Ctx, ifr.keyRow) + keyIndex, err := ifr.rc.AddRow(ifr.Ctx(), ifr.keyRow) if err != nil { ifr.MoveToDraining(err) return ifrStateUnknown, ifr.DrainHelper() @@ -273,11 +273,11 @@ func (ifr *invertedFilterer) emitRow() ( } if ifr.resultIdx >= len(ifr.evalResult) { // We are done emitting all rows. - return drainFunc(ifr.rc.UnsafeReset(ifr.Ctx)) + return drainFunc(ifr.rc.UnsafeReset(ifr.Ctx())) } curRowIdx := ifr.resultIdx ifr.resultIdx++ - keyRow, err := ifr.rc.GetRow(ifr.Ctx, ifr.evalResult[curRowIdx], false /* skip */) + keyRow, err := ifr.rc.GetRow(ifr.Ctx(), ifr.evalResult[curRowIdx], false /* skip */) if err != nil { return drainFunc(err) } @@ -301,12 +301,12 @@ func (ifr *invertedFilterer) ConsumerClosed() { func (ifr *invertedFilterer) close() { if ifr.InternalClose() { - ifr.rc.Close(ifr.Ctx) + ifr.rc.Close(ifr.Ctx()) if ifr.MemMonitor != nil { - ifr.MemMonitor.Stop(ifr.Ctx) + ifr.MemMonitor.Stop(ifr.Ctx()) } if ifr.diskMonitor != nil { - ifr.diskMonitor.Stop(ifr.Ctx) + ifr.diskMonitor.Stop(ifr.Ctx()) } } } diff --git a/pkg/sql/rowexec/inverted_joiner.go b/pkg/sql/rowexec/inverted_joiner.go index 2dc24f211e7b..a946d0e8947b 100644 --- a/pkg/sql/rowexec/inverted_joiner.go +++ b/pkg/sql/rowexec/inverted_joiner.go @@ -385,7 +385,7 @@ func (ij *invertedJoiner) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetad case ijEmittingRows: ij.runningState, row, meta = ij.emitRow() default: - log.Fatalf(ij.Ctx, "unsupported state: %d", ij.runningState) + log.Fatalf(ij.Ctx(), "unsupported state: %d", ij.runningState) } if row == nil && meta == nil { continue @@ -416,7 +416,7 @@ func (ij *invertedJoiner) readInput() (invertedJoinerState, *execinfrapb.Produce break } - expr, preFilterState, err := ij.datumsToInvertedExpr.Convert(ij.Ctx, row) + expr, preFilterState, err := ij.datumsToInvertedExpr.Convert(ij.Ctx(), row) if err != nil { ij.MoveToDraining(err) return ijStateUnknown, ij.DrainHelper() @@ -466,12 +466,12 @@ func (ij *invertedJoiner) readInput() (invertedJoinerState, *execinfrapb.Produce } if len(ij.inputRows) == 0 { - log.VEventf(ij.Ctx, 1, "no more input rows") + log.VEventf(ij.Ctx(), 1, "no more input rows") // We're done. ij.MoveToDraining(nil) return ijStateUnknown, ij.DrainHelper() } - log.VEventf(ij.Ctx, 1, "read %d input rows", len(ij.inputRows)) + log.VEventf(ij.Ctx(), 1, "read %d input rows", len(ij.inputRows)) spans, err := ij.batchedExprEval.init() if err != nil { @@ -495,9 +495,9 @@ func (ij *invertedJoiner) readInput() (invertedJoinerState, *execinfrapb.Produce return ijStateUnknown, ij.DrainHelper() } - log.VEventf(ij.Ctx, 1, "scanning %d spans", len(ij.indexSpans)) + log.VEventf(ij.Ctx(), 1, "scanning %d spans", len(ij.indexSpans)) if err = ij.fetcher.StartScan( - ij.Ctx, ij.indexSpans, nil, /* spanIDs */ + ij.Ctx(), ij.indexSpans, nil, /* spanIDs */ rowinfra.NoBytesLimit, rowinfra.NoRowLimit, ); err != nil { ij.MoveToDraining(err) @@ -508,11 +508,11 @@ func (ij *invertedJoiner) readInput() (invertedJoinerState, *execinfrapb.Produce } func (ij *invertedJoiner) performScan() (invertedJoinerState, *execinfrapb.ProducerMetadata) { - log.VEventf(ij.Ctx, 1, "joining rows") + log.VEventf(ij.Ctx(), 1, "joining rows") // Read the entire set of rows that are part of the scan. for { // Fetch the next row and copy it into the row container. 
- fetchedRow, _, err := ij.fetcher.NextRow(ij.Ctx) + fetchedRow, _, err := ij.fetcher.NextRow(ij.Ctx()) if err != nil { ij.MoveToDraining(scrub.UnwrapScrubError(err)) return ijStateUnknown, ij.DrainHelper() @@ -557,7 +557,7 @@ func (ij *invertedJoiner) performScan() (invertedJoinerState, *execinfrapb.Produ return ijStateUnknown, ij.DrainHelper() } if shouldAdd { - rowIdx, err := ij.indexRows.AddRow(ij.Ctx, fetchedRow) + rowIdx, err := ij.indexRows.AddRow(ij.Ctx(), fetchedRow) if err != nil { ij.MoveToDraining(err) return ijStateUnknown, ij.DrainHelper() @@ -569,8 +569,8 @@ func (ij *invertedJoiner) performScan() (invertedJoinerState, *execinfrapb.Produ } } ij.joinedRowIdx = ij.batchedExprEval.evaluate() - ij.indexRows.SetupForRead(ij.Ctx, ij.joinedRowIdx) - log.VEventf(ij.Ctx, 1, "done evaluating expressions") + ij.indexRows.SetupForRead(ij.Ctx(), ij.joinedRowIdx) + log.VEventf(ij.Ctx(), 1, "done evaluating expressions") return ijEmittingRows, nil } @@ -587,7 +587,7 @@ func (ij *invertedJoiner) emitRow() ( ) { // Finished processing the batch. if ij.emitCursor.inputRowIdx >= len(ij.joinedRowIdx) { - log.VEventf(ij.Ctx, 1, "done emitting rows") + log.VEventf(ij.Ctx(), 1, "done emitting rows") // Ready for another input batch. Reset state. ij.inputRows = ij.inputRows[:0] ij.batchedExprEval.reset() @@ -595,7 +595,7 @@ func (ij *invertedJoiner) emitRow() ( ij.emitCursor.outputRowIdx = 0 ij.emitCursor.inputRowIdx = 0 ij.emitCursor.seenMatch = false - if err := ij.indexRows.UnsafeReset(ij.Ctx); err != nil { + if err := ij.indexRows.UnsafeReset(ij.Ctx()); err != nil { ij.MoveToDraining(err) return ijStateUnknown, nil, ij.DrainHelper() } @@ -625,7 +625,7 @@ func (ij *invertedJoiner) emitRow() ( inputRow := ij.inputRows[ij.emitCursor.inputRowIdx] joinedRowIdx := ij.joinedRowIdx[ij.emitCursor.inputRowIdx][ij.emitCursor.outputRowIdx] - indexedRow, err := ij.indexRows.GetRow(ij.Ctx, joinedRowIdx, false /* skip */) + indexedRow, err := ij.indexRows.GetRow(ij.Ctx(), joinedRowIdx, false /* skip */) if err != nil { ij.MoveToDraining(err) return ijStateUnknown, nil, ij.DrainHelper() @@ -639,7 +639,7 @@ func (ij *invertedJoiner) emitRow() ( skipRemaining := func() error { for ; ij.emitCursor.outputRowIdx < len(ij.joinedRowIdx[ij.emitCursor.inputRowIdx]); ij.emitCursor.outputRowIdx++ { idx := ij.joinedRowIdx[ij.emitCursor.inputRowIdx][ij.emitCursor.outputRowIdx] - if _, err := ij.indexRows.GetRow(ij.Ctx, idx, true /* skip */); err != nil { + if _, err := ij.indexRows.GetRow(ij.Ctx(), idx, true /* skip */); err != nil { return err } } @@ -740,14 +740,14 @@ func (ij *invertedJoiner) ConsumerClosed() { func (ij *invertedJoiner) close() { if ij.InternalClose() { if ij.fetcher != nil { - ij.fetcher.Close(ij.Ctx) + ij.fetcher.Close(ij.Ctx()) } if ij.indexRows != nil { - ij.indexRows.Close(ij.Ctx) + ij.indexRows.Close(ij.Ctx()) } - ij.MemMonitor.Stop(ij.Ctx) + ij.MemMonitor.Stop(ij.Ctx()) if ij.diskMonitor != nil { - ij.diskMonitor.Stop(ij.Ctx) + ij.diskMonitor.Stop(ij.Ctx()) } } } @@ -762,14 +762,14 @@ func (ij *invertedJoiner) execStatsForTrace() *execinfrapb.ComponentStats { if !ok { return nil } - ij.scanStats = execstats.GetScanStats(ij.Ctx, ij.ExecStatsTrace) + ij.scanStats = execstats.GetScanStats(ij.Ctx(), ij.ExecStatsTrace) ret := execinfrapb.ComponentStats{ Inputs: []execinfrapb.InputStats{is}, KV: execinfrapb.KVStats{ BytesRead: optional.MakeUint(uint64(ij.fetcher.GetBytesRead())), TuplesRead: fis.NumTuples, KVTime: fis.WaitTime, - ContentionTime: 
optional.MakeTimeValue(execstats.GetCumulativeContentionTime(ij.Ctx, ij.ExecStatsTrace)), + ContentionTime: optional.MakeTimeValue(execstats.GetCumulativeContentionTime(ij.Ctx(), ij.ExecStatsTrace)), BatchRequestsIssued: optional.MakeUint(uint64(ij.fetcher.GetBatchRequestsIssued())), }, Exec: execinfrapb.ExecStats{ @@ -788,7 +788,7 @@ func (ij *invertedJoiner) generateMeta() []execinfrapb.ProducerMetadata { meta.Metrics = execinfrapb.GetMetricsMeta() meta.Metrics.BytesRead = ij.fetcher.GetBytesRead() meta.Metrics.RowsRead = ij.rowsRead - if tfs := execinfra.GetLeafTxnFinalState(ij.Ctx, ij.FlowCtx.Txn); tfs != nil { + if tfs := execinfra.GetLeafTxnFinalState(ij.Ctx(), ij.FlowCtx.Txn); tfs != nil { trailingMeta = append(trailingMeta, execinfrapb.ProducerMetadata{LeafTxnFinalState: tfs}) } return trailingMeta diff --git a/pkg/sql/rowexec/joinreader.go b/pkg/sql/rowexec/joinreader.go index 22f6295e2cca..883ba873d5c7 100644 --- a/pkg/sql/rowexec/joinreader.go +++ b/pkg/sql/rowexec/joinreader.go @@ -738,7 +738,7 @@ func (jr *joinReader) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetadata) meta = jr.DrainHelper() jr.runningState = jrStateUnknown default: - log.Fatalf(jr.Ctx, "unsupported state: %d", jr.runningState) + log.Fatalf(jr.Ctx(), "unsupported state: %d", jr.runningState) } if row == nil && meta == nil { continue @@ -915,7 +915,7 @@ func (jr *joinReader) readInput() ( } if len(jr.scratchInputRows) == 0 { - log.VEventf(jr.Ctx, 1, "no more input rows") + log.VEventf(jr.Ctx(), 1, "no more input rows") if outRow != nil { return jrReadyToDrain, outRow, nil } @@ -923,7 +923,7 @@ func (jr *joinReader) readInput() ( jr.MoveToDraining(nil) return jrStateUnknown, nil, jr.DrainHelper() } - log.VEventf(jr.Ctx, 1, "read %d input rows", len(jr.scratchInputRows)) + log.VEventf(jr.Ctx(), 1, "read %d input rows", len(jr.scratchInputRows)) if jr.groupingState != nil && len(jr.scratchInputRows) > 0 { jr.updateGroupingStateForNonEmptyBatch() @@ -979,7 +979,7 @@ func (jr *joinReader) readInput() ( } } - log.VEventf(jr.Ctx, 1, "scanning %d spans", len(spans)) + log.VEventf(jr.Ctx(), 1, "scanning %d spans", len(spans)) // Note that the fetcher takes ownership of the spans slice - it will modify // it and perform the memory accounting. We don't care about the // modification here, but we want to be conscious about the memory @@ -997,7 +997,7 @@ func (jr *joinReader) readInput() ( } } if err = jr.fetcher.StartScan( - jr.Ctx, spans, spanIDs, bytesLimit, rowinfra.NoRowLimit, + jr.Ctx(), spans, spanIDs, bytesLimit, rowinfra.NoRowLimit, ); err != nil { jr.MoveToDraining(err) return jrStateUnknown, nil, jr.DrainHelper() @@ -1010,7 +1010,7 @@ func (jr *joinReader) readInput() ( func (jr *joinReader) performLookup() (joinReaderState, *execinfrapb.ProducerMetadata) { for { // Fetch the next row and tell the strategy to process it. 
- lookedUpRow, spanID, err := jr.fetcher.NextRow(jr.Ctx) + lookedUpRow, spanID, err := jr.fetcher.NextRow(jr.Ctx()) if err != nil { jr.MoveToDraining(scrub.UnwrapScrubError(err)) return jrStateUnknown, jr.DrainHelper() @@ -1022,7 +1022,7 @@ func (jr *joinReader) performLookup() (joinReaderState, *execinfrapb.ProducerMet jr.rowsRead++ jr.curBatchRowsRead++ - if nextState, err := jr.strategy.processLookedUpRow(jr.Ctx, lookedUpRow, spanID); err != nil { + if nextState, err := jr.strategy.processLookedUpRow(jr.Ctx(), lookedUpRow, spanID); err != nil { jr.MoveToDraining(err) return jrStateUnknown, jr.DrainHelper() } else if nextState != jrPerformingLookup { @@ -1055,13 +1055,13 @@ func (jr *joinReader) performLookup() (joinReaderState, *execinfrapb.ProducerMet sortSpans(spans, spanIDs) } - log.VEventf(jr.Ctx, 1, "scanning %d remote spans", len(spans)) + log.VEventf(jr.Ctx(), 1, "scanning %d remote spans", len(spans)) bytesLimit := rowinfra.GetDefaultBatchBytesLimit(jr.EvalCtx.TestingKnobs.ForceProductionValues) if !jr.shouldLimitBatches { bytesLimit = rowinfra.NoBytesLimit } if err := jr.fetcher.StartScan( - jr.Ctx, spans, spanIDs, bytesLimit, rowinfra.NoRowLimit, + jr.Ctx(), spans, spanIDs, bytesLimit, rowinfra.NoRowLimit, ); err != nil { jr.MoveToDraining(err) return jrStateUnknown, jr.DrainHelper() @@ -1070,8 +1070,8 @@ func (jr *joinReader) performLookup() (joinReaderState, *execinfrapb.ProducerMet } } - log.VEvent(jr.Ctx, 1, "done joining rows") - jr.strategy.prepareToEmit(jr.Ctx) + log.VEvent(jr.Ctx(), 1, "done joining rows") + jr.strategy.prepareToEmit(jr.Ctx()) // Check if the strategy spilled to disk and reduce the batch size if it // did. @@ -1102,7 +1102,7 @@ func (jr *joinReader) emitRow() ( rowenc.EncDatumRow, *execinfrapb.ProducerMetadata, ) { - rowToEmit, nextState, err := jr.strategy.nextRowToEmit(jr.Ctx) + rowToEmit, nextState, err := jr.strategy.nextRowToEmit(jr.Ctx()) if err != nil { jr.MoveToDraining(err) return jrStateUnknown, nil, jr.DrainHelper() @@ -1134,26 +1134,26 @@ func (jr *joinReader) ConsumerClosed() { func (jr *joinReader) close() { if jr.InternalClose() { if jr.fetcher != nil { - jr.fetcher.Close(jr.Ctx) + jr.fetcher.Close(jr.Ctx()) } if jr.usesStreamer { - jr.streamerInfo.budgetAcc.Close(jr.Ctx) - jr.streamerInfo.txnKVStreamerMemAcc.Close(jr.Ctx) - jr.streamerInfo.unlimitedMemMonitor.Stop(jr.Ctx) + jr.streamerInfo.budgetAcc.Close(jr.Ctx()) + jr.streamerInfo.txnKVStreamerMemAcc.Close(jr.Ctx()) + jr.streamerInfo.unlimitedMemMonitor.Stop(jr.Ctx()) if jr.streamerInfo.diskMonitor != nil { - jr.streamerInfo.diskMonitor.Stop(jr.Ctx) + jr.streamerInfo.diskMonitor.Stop(jr.Ctx()) } } - jr.strategy.close(jr.Ctx) - jr.memAcc.Close(jr.Ctx) + jr.strategy.close(jr.Ctx()) + jr.memAcc.Close(jr.Ctx()) if jr.limitedMemMonitor != nil { - jr.limitedMemMonitor.Stop(jr.Ctx) + jr.limitedMemMonitor.Stop(jr.Ctx()) } if jr.MemMonitor != nil { - jr.MemMonitor.Stop(jr.Ctx) + jr.MemMonitor.Stop(jr.Ctx()) } if jr.diskMonitor != nil { - jr.diskMonitor.Stop(jr.Ctx) + jr.diskMonitor.Stop(jr.Ctx()) } } } @@ -1169,14 +1169,14 @@ func (jr *joinReader) execStatsForTrace() *execinfrapb.ComponentStats { return nil } - jr.scanStats = execstats.GetScanStats(jr.Ctx, jr.ExecStatsTrace) + jr.scanStats = execstats.GetScanStats(jr.Ctx(), jr.ExecStatsTrace) ret := &execinfrapb.ComponentStats{ Inputs: []execinfrapb.InputStats{is}, KV: execinfrapb.KVStats{ BytesRead: optional.MakeUint(uint64(jr.fetcher.GetBytesRead())), TuplesRead: fis.NumTuples, KVTime: fis.WaitTime, - ContentionTime: 
optional.MakeTimeValue(execstats.GetCumulativeContentionTime(jr.Ctx, jr.ExecStatsTrace)), + ContentionTime: optional.MakeTimeValue(execstats.GetCumulativeContentionTime(jr.Ctx(), jr.ExecStatsTrace)), BatchRequestsIssued: optional.MakeUint(uint64(jr.fetcher.GetBatchRequestsIssued())), }, Output: jr.OutputHelper.Stats(), @@ -1203,7 +1203,7 @@ func (jr *joinReader) generateMeta() []execinfrapb.ProducerMetadata { meta.Metrics = execinfrapb.GetMetricsMeta() meta.Metrics.RowsRead = jr.rowsRead meta.Metrics.BytesRead = jr.fetcher.GetBytesRead() - if tfs := execinfra.GetLeafTxnFinalState(jr.Ctx, jr.txn); tfs != nil { + if tfs := execinfra.GetLeafTxnFinalState(jr.Ctx(), jr.txn); tfs != nil { trailingMeta = append(trailingMeta, execinfrapb.ProducerMetadata{LeafTxnFinalState: tfs}) } return trailingMeta diff --git a/pkg/sql/rowexec/joinreader_strategies.go b/pkg/sql/rowexec/joinreader_strategies.go index 657dfdcd9861..dcb42caf786a 100644 --- a/pkg/sql/rowexec/joinreader_strategies.go +++ b/pkg/sql/rowexec/joinreader_strategies.go @@ -187,7 +187,7 @@ func (s *joinReaderNoOrderingStrategy) generateRemoteSpans() (roachpb.Spans, []i return nil, nil, errors.AssertionFailedf("generateRemoteSpans can only be called for locality optimized lookup joins") } s.remoteSpansGenerated = true - return gen.generateRemoteSpans(s.Ctx, s.inputRows) + return gen.generateRemoteSpans(s.Ctx(), s.inputRows) } func (s *joinReaderNoOrderingStrategy) generatedRemoteSpans() bool { @@ -200,7 +200,7 @@ func (s *joinReaderNoOrderingStrategy) processLookupRows( s.inputRows = rows s.remoteSpansGenerated = false s.emitState.unmatchedInputRowIndicesInitialized = false - return s.generateSpans(s.Ctx, s.inputRows) + return s.generateSpans(s.Ctx(), s.inputRows) } func (s *joinReaderNoOrderingStrategy) processLookedUpRow( @@ -320,13 +320,13 @@ func (s *joinReaderNoOrderingStrategy) spilled() bool { return false } func (s *joinReaderNoOrderingStrategy) growMemoryAccount( memAcc *mon.BoundAccount, delta int64, ) error { - return addWorkmemHint(memAcc.Grow(s.Ctx, delta)) + return addWorkmemHint(memAcc.Grow(s.Ctx(), delta)) } func (s *joinReaderNoOrderingStrategy) resizeMemoryAccount( memAcc *mon.BoundAccount, oldSz, newSz int64, ) error { - return addWorkmemHint(memAcc.Resize(s.Ctx, oldSz, newSz)) + return addWorkmemHint(memAcc.Resize(s.Ctx(), oldSz, newSz)) } func (s *joinReaderNoOrderingStrategy) close(ctx context.Context) { @@ -405,7 +405,7 @@ func (s *joinReaderIndexJoinStrategy) processLookupRows( rows []rowenc.EncDatumRow, ) (roachpb.Spans, []int, error) { s.inputRows = rows - return s.generateSpans(s.Ctx, s.inputRows) + return s.generateSpans(s.Ctx(), s.inputRows) } func (s *joinReaderIndexJoinStrategy) processLookedUpRow( @@ -435,13 +435,13 @@ func (s *joinReaderIndexJoinStrategy) spilled() bool { func (s *joinReaderIndexJoinStrategy) growMemoryAccount( memAcc *mon.BoundAccount, delta int64, ) error { - return addWorkmemHint(memAcc.Grow(s.Ctx, delta)) + return addWorkmemHint(memAcc.Grow(s.Ctx(), delta)) } func (s *joinReaderIndexJoinStrategy) resizeMemoryAccount( memAcc *mon.BoundAccount, oldSz, newSz int64, ) error { - return addWorkmemHint(memAcc.Resize(s.Ctx, oldSz, newSz)) + return addWorkmemHint(memAcc.Resize(s.Ctx(), oldSz, newSz)) } func (s *joinReaderIndexJoinStrategy) close(ctx context.Context) { @@ -592,7 +592,7 @@ func (s *joinReaderOrderingStrategy) generateRemoteSpans() (roachpb.Spans, []int return nil, nil, errors.AssertionFailedf("generateRemoteSpans can only be called for locality optimized lookup joins") } 
s.remoteSpansGenerated = true - return gen.generateRemoteSpans(s.Ctx, s.inputRows) + return gen.generateRemoteSpans(s.Ctx(), s.inputRows) } func (s *joinReaderOrderingStrategy) generatedRemoteSpans() bool { @@ -642,7 +642,7 @@ func (s *joinReaderOrderingStrategy) processLookupRows( s.inputRows = rows s.remoteSpansGenerated = false - return s.generateSpans(s.Ctx, s.inputRows) + return s.generateSpans(s.Ctx(), s.inputRows) } func (s *joinReaderOrderingStrategy) processLookedUpRow( @@ -807,7 +807,7 @@ func (s *joinReaderOrderingStrategy) nextRowToEmit( lookedUpRow = s.emitCursor.notBufferedRow } else { lookedUpRow, err = s.lookedUpRows.GetRow( - s.Ctx, lookedUpRows[s.emitCursor.outputRowIdx], false, /* skip */ + s.Ctx(), lookedUpRows[s.emitCursor.outputRowIdx], false, /* skip */ ) } if err != nil { @@ -858,16 +858,16 @@ func (s *joinReaderOrderingStrategy) close(ctx context.Context) { func (s *joinReaderOrderingStrategy) growMemoryAccount( memAcc *mon.BoundAccount, delta int64, ) error { - if err := memAcc.Grow(s.Ctx, delta); err != nil { + if err := memAcc.Grow(s.Ctx(), delta); err != nil { // We don't have enough budget to account for the new size. Check // whether we can spill the looked up rows to disk to free up the // budget. - spilled, spillErr := s.lookedUpRows.SpillToDisk(s.Ctx) + spilled, spillErr := s.lookedUpRows.SpillToDisk(s.Ctx()) if !spilled || spillErr != nil { return addWorkmemHint(errors.CombineErrors(err, spillErr)) } // We freed up some budget, so try to perform the accounting again. - return addWorkmemHint(memAcc.Grow(s.Ctx, delta)) + return addWorkmemHint(memAcc.Grow(s.Ctx(), delta)) } return nil } @@ -879,16 +879,16 @@ func (s *joinReaderOrderingStrategy) growMemoryAccount( func (s *joinReaderOrderingStrategy) resizeMemoryAccount( memAcc *mon.BoundAccount, oldSz, newSz int64, ) error { - if err := memAcc.Resize(s.Ctx, oldSz, newSz); err != nil { + if err := memAcc.Resize(s.Ctx(), oldSz, newSz); err != nil { // We don't have enough budget to account for the new size. Check // whether we can spill the looked up rows to disk to free up the // budget. - spilled, spillErr := s.lookedUpRows.SpillToDisk(s.Ctx) + spilled, spillErr := s.lookedUpRows.SpillToDisk(s.Ctx()) if !spilled || spillErr != nil { return addWorkmemHint(errors.CombineErrors(err, spillErr)) } // We freed up some budget, so try to perform the accounting again. - return addWorkmemHint(memAcc.Resize(s.Ctx, oldSz, newSz)) + return addWorkmemHint(memAcc.Resize(s.Ctx(), oldSz, newSz)) } return nil } diff --git a/pkg/sql/rowexec/mergejoiner.go b/pkg/sql/rowexec/mergejoiner.go index 827215321b03..16c7b06484b3 100644 --- a/pkg/sql/rowexec/mergejoiner.go +++ b/pkg/sql/rowexec/mergejoiner.go @@ -232,7 +232,7 @@ func (m *mergeJoiner) nextRow() (rowenc.EncDatumRow, *execinfrapb.ProducerMetada // TODO(paul): Investigate (with benchmarks) whether or not it's // worthwhile to only buffer one row from the right stream per batch // for semi-joins. 
- m.leftRows, m.rightRows, meta = m.streamMerger.NextBatch(m.Ctx, m.EvalCtx) + m.leftRows, m.rightRows, meta = m.streamMerger.NextBatch(m.Ctx(), m.EvalCtx) if meta != nil { return nil, meta } @@ -251,8 +251,8 @@ func (m *mergeJoiner) nextRow() (rowenc.EncDatumRow, *execinfrapb.ProducerMetada func (m *mergeJoiner) close() { if m.InternalClose() { - m.streamMerger.close(m.Ctx) - m.MemMonitor.Stop(m.Ctx) + m.streamMerger.close(m.Ctx()) + m.MemMonitor.Stop(m.Ctx()) } } diff --git a/pkg/sql/rowexec/processors_test.go b/pkg/sql/rowexec/processors_test.go index b9126391bd97..07e7be0c6571 100644 --- a/pkg/sql/rowexec/processors_test.go +++ b/pkg/sql/rowexec/processors_test.go @@ -315,7 +315,8 @@ func TestAggregatorSpecAggregationEquals(t *testing.T) { func TestProcessorBaseContext(t *testing.T) { defer leaktest.AfterTest(t)() - ctx := context.Background() + // Use a custom context to distinguish it from the background one. + ctx := context.WithValue(context.Background(), struct{}{}, struct{}{}) st := cluster.MakeTestingClusterSettings() runTest := func(t *testing.T, f func(noop *noopProcessor)) { @@ -331,18 +332,22 @@ func TestProcessorBaseContext(t *testing.T) { if err != nil { t.Fatal(err) } + // Before Start we should get the background context. + if noop.Ctx() != context.Background() { + t.Fatalf("ProcessorBase.Ctx() didn't return the background context before Start") + } noop.Start(ctx) - origCtx := noop.Ctx + origCtx := noop.Ctx() // The context should be valid after Start but before Next is called in case // ConsumerDone or ConsumerClosed are called without calling Next. - if noop.Ctx == nil { + if noop.Ctx() == context.Background() { t.Fatalf("ProcessorBase.ctx not initialized") } f(noop) // The context should be reset after ConsumerClosed is called so that any // subsequent logging calls will not operate on closed spans. - if noop.Ctx != origCtx { + if noop.Ctx() != origCtx { t.Fatalf("ProcessorBase.ctx not reset on close") } } diff --git a/pkg/sql/rowexec/project_set.go b/pkg/sql/rowexec/project_set.go index 9aacf6891448..370ad58449e0 100644 --- a/pkg/sql/rowexec/project_set.go +++ b/pkg/sql/rowexec/project_set.go @@ -159,7 +159,7 @@ func (ps *projectSetProcessor) nextInputRow() ( // First, make sure to close its ValueGenerator from the previous // input row (if it exists). if ps.gens[i] != nil { - ps.gens[i].Close(ps.Ctx) + ps.gens[i].Close(ps.Ctx()) ps.gens[i] = nil } @@ -182,7 +182,7 @@ func (ps *projectSetProcessor) nextInputRow() ( // Store the generator before Start so that it'll be closed even if // Start returns an error. ps.gens[i] = gen - if err := gen.Start(ps.Ctx, ps.FlowCtx.Txn); err != nil { + if err := gen.Start(ps.Ctx(), ps.FlowCtx.Txn); err != nil { return nil, nil, err } } @@ -203,7 +203,7 @@ func (ps *projectSetProcessor) nextGeneratorValues() (newValAvail bool, err erro numCols := int(ps.spec.NumColsPerGen[i]) if !ps.done[i] { // Yes; check whether this source still has some values available. 
diff --git a/pkg/sql/rowexec/project_set.go b/pkg/sql/rowexec/project_set.go
index 9aacf6891448..370ad58449e0 100644
--- a/pkg/sql/rowexec/project_set.go
+++ b/pkg/sql/rowexec/project_set.go
@@ -159,7 +159,7 @@ func (ps *projectSetProcessor) nextInputRow() (
 		// First, make sure to close its ValueGenerator from the previous
 		// input row (if it exists).
 		if ps.gens[i] != nil {
-			ps.gens[i].Close(ps.Ctx)
+			ps.gens[i].Close(ps.Ctx())
 			ps.gens[i] = nil
 		}
 
@@ -182,7 +182,7 @@ func (ps *projectSetProcessor) nextInputRow() (
 		// Store the generator before Start so that it'll be closed even if
 		// Start returns an error.
 		ps.gens[i] = gen
-		if err := gen.Start(ps.Ctx, ps.FlowCtx.Txn); err != nil {
+		if err := gen.Start(ps.Ctx(), ps.FlowCtx.Txn); err != nil {
 			return nil, nil, err
 		}
 	}
@@ -203,7 +203,7 @@ func (ps *projectSetProcessor) nextGeneratorValues() (newValAvail bool, err erro
 		numCols := int(ps.spec.NumColsPerGen[i])
 		if !ps.done[i] {
 			// Yes; check whether this source still has some values available.
-			hasVals, err := gen.Next(ps.Ctx)
+			hasVals, err := gen.Next(ps.Ctx())
 			if err != nil {
 				return false, err
 			}
@@ -314,7 +314,7 @@ func (ps *projectSetProcessor) close() {
 	ps.InternalCloseEx(func() {
 		for _, gen := range ps.gens {
 			if gen != nil {
-				gen.Close(ps.Ctx)
+				gen.Close(ps.Ctx())
 			}
 		}
 	})
diff --git a/pkg/sql/rowexec/sample_aggregator.go b/pkg/sql/rowexec/sample_aggregator.go
index cba01aacd3dc..67ebf28a1be5 100644
--- a/pkg/sql/rowexec/sample_aggregator.go
+++ b/pkg/sql/rowexec/sample_aggregator.go
@@ -205,9 +205,9 @@ func (s *sampleAggregator) Run(ctx context.Context) {
 
 func (s *sampleAggregator) close() {
 	if s.InternalClose() {
-		s.memAcc.Close(s.Ctx)
-		s.tempMemAcc.Close(s.Ctx)
-		s.MemMonitor.Stop(s.Ctx)
+		s.memAcc.Close(s.Ctx())
+		s.tempMemAcc.Close(s.Ctx())
+		s.MemMonitor.Stop(s.Ctx())
 	}
 }
 
diff --git a/pkg/sql/rowexec/sampler.go b/pkg/sql/rowexec/sampler.go
index 365349fbeae9..c92e33208e5d 100644
--- a/pkg/sql/rowexec/sampler.go
+++ b/pkg/sql/rowexec/sampler.go
@@ -478,8 +478,8 @@ func (s *samplerProcessor) sampleRow(
 
 func (s *samplerProcessor) close() {
 	if s.InternalClose() {
-		s.memAcc.Close(s.Ctx)
-		s.MemMonitor.Stop(s.Ctx)
+		s.memAcc.Close(s.Ctx())
+		s.MemMonitor.Stop(s.Ctx())
 	}
 }
 
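These close() bodies all release in the same order: accounts carved out of a monitor are closed first, and only then is the monitor itself stopped, since a monitor stopped while reservations are still outstanding signals a leak. A self-contained sketch of that ordering; `monitor` and `boundAccount` here are hypothetical stand-ins chosen for illustration, not the `mon` package's real types:

    package main

    import "fmt"

    // monitor is a hypothetical stand-in for a memory monitor: stopping it
    // while bytes are still reserved is treated as a leak.
    type monitor struct{ reserved int64 }

    func (m *monitor) Stop() {
        if m.reserved != 0 {
            panic(fmt.Sprintf("leak: %d bytes still reserved at Stop", m.reserved))
        }
    }

    // boundAccount is a hypothetical stand-in for an account that reserves
    // bytes from its parent monitor.
    type boundAccount struct {
        mon  *monitor
        used int64
    }

    func (a *boundAccount) Grow(delta int64) { a.used += delta; a.mon.reserved += delta }

    // Close returns everything the account reserved back to the monitor.
    func (a *boundAccount) Close() { a.mon.reserved -= a.used; a.used = 0 }

    func main() {
        m := &monitor{}
        memAcc := boundAccount{mon: m}
        tempMemAcc := boundAccount{mon: m}
        memAcc.Grow(64)
        tempMemAcc.Grow(32)

        // Mirrors the sampleAggregator.close ordering above: accounts first,
        // monitor last.
        memAcc.Close()
        tempMemAcc.Close()
        m.Stop() // safe: nothing outstanding
    }

Under the Ctx() fallback, this whole sequence now also works for a processor that was constructed but never started.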
diff --git a/pkg/sql/rowexec/sorter.go b/pkg/sql/rowexec/sorter.go
index 3d2bd1734b59..3fc9a882ad04 100644
--- a/pkg/sql/rowexec/sorter.go
+++ b/pkg/sql/rowexec/sorter.go
@@ -117,10 +117,10 @@ func (s *sorterBase) close() {
 		if s.i != nil {
 			s.i.Close()
 		}
-		s.rows.Close(s.Ctx)
-		s.MemMonitor.Stop(s.Ctx)
+		s.rows.Close(s.Ctx())
+		s.MemMonitor.Stop(s.Ctx())
 		if s.diskMonitor != nil {
-			s.diskMonitor.Stop(s.Ctx)
+			s.diskMonitor.Stop(s.Ctx())
 		}
 	}
 }
@@ -250,13 +250,13 @@ func (s *sortAllProcessor) fill() (ok bool, _ error) {
 			break
 		}
 
-		if err := s.rows.AddRow(s.Ctx, row); err != nil {
+		if err := s.rows.AddRow(s.Ctx(), row); err != nil {
 			return false, err
 		}
 	}
-	s.rows.Sort(s.Ctx)
+	s.rows.Sort(s.Ctx())
 
-	s.i = s.rows.NewFinalIterator(s.Ctx)
+	s.i = s.rows.NewFinalIterator(s.Ctx())
 	s.i.Rewind()
 	return true, nil
 }
@@ -421,7 +421,7 @@ func newSortChunksProcessor(
 	); err != nil {
 		return nil, err
 	}
-	proc.i = proc.rows.NewFinalIterator(proc.Ctx)
+	proc.i = proc.rows.NewFinalIterator(proc.Ctx())
 	return proc, nil
 }
 
@@ -448,7 +448,7 @@ func (s *sortChunksProcessor) chunkCompleted(
 // if a metadata record was encountered). The caller is expected to drain when
 // this returns false.
 func (s *sortChunksProcessor) fill() (bool, error) {
-	ctx := s.Ctx
+	ctx := s.Ctx()
 
 	var meta *execinfrapb.ProducerMetadata
 
@@ -518,7 +518,7 @@ func (s *sortChunksProcessor) Start(ctx context.Context) {
 
 // Next is part of the RowSource interface.
 func (s *sortChunksProcessor) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetadata) {
-	ctx := s.Ctx
+	ctx := s.Ctx()
 	for s.State == execinfra.StateRunning {
 		ok, err := s.i.Valid()
 		if err != nil {
diff --git a/pkg/sql/rowexec/tablereader.go b/pkg/sql/rowexec/tablereader.go
index 2ea71b40f80b..40745a862d94 100644
--- a/pkg/sql/rowexec/tablereader.go
+++ b/pkg/sql/rowexec/tablereader.go
@@ -254,7 +254,7 @@ func TestingSetScannedRowProgressFrequency(val int64) func() {
 func (tr *tableReader) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetadata) {
 	for tr.State == execinfra.StateRunning {
 		if !tr.scanStarted {
-			err := tr.startScan(tr.Ctx)
+			err := tr.startScan(tr.Ctx())
 			if err != nil {
 				tr.MoveToDraining(err)
 				break
@@ -269,7 +269,7 @@ func (tr *tableReader) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetadata
 			return nil, meta
 		}
 
-		row, _, err := tr.fetcher.NextRow(tr.Ctx)
+		row, _, err := tr.fetcher.NextRow(tr.Ctx())
 		if row == nil || err != nil {
 			tr.MoveToDraining(err)
 			break
@@ -290,7 +290,7 @@ func (tr *tableReader) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetadata
 func (tr *tableReader) close() {
 	if tr.InternalClose() {
 		if tr.fetcher != nil {
-			tr.fetcher.Close(tr.Ctx)
+			tr.fetcher.Close(tr.Ctx())
 		}
 	}
 }
@@ -306,13 +306,13 @@ func (tr *tableReader) execStatsForTrace() *execinfrapb.ComponentStats {
 	if !ok {
 		return nil
 	}
-	tr.scanStats = execstats.GetScanStats(tr.Ctx, tr.ExecStatsTrace)
+	tr.scanStats = execstats.GetScanStats(tr.Ctx(), tr.ExecStatsTrace)
 	ret := &execinfrapb.ComponentStats{
 		KV: execinfrapb.KVStats{
 			BytesRead:           optional.MakeUint(uint64(tr.fetcher.GetBytesRead())),
 			TuplesRead:          is.NumTuples,
 			KVTime:              is.WaitTime,
-			ContentionTime:      optional.MakeTimeValue(execstats.GetCumulativeContentionTime(tr.Ctx, tr.ExecStatsTrace)),
+			ContentionTime:      optional.MakeTimeValue(execstats.GetCumulativeContentionTime(tr.Ctx(), tr.ExecStatsTrace)),
 			BatchRequestsIssued: optional.MakeUint(uint64(tr.fetcher.GetBatchRequestsIssued())),
 		},
 		Output: tr.OutputHelper.Stats(),
@@ -326,13 +326,13 @@ func (tr *tableReader) generateMeta() []execinfrapb.ProducerMetadata {
 	if !tr.ignoreMisplannedRanges {
 		nodeID, ok := tr.FlowCtx.NodeID.OptionalNodeID()
 		if ok {
-			ranges := execinfra.MisplannedRanges(tr.Ctx, tr.SpansCopy, nodeID, tr.FlowCtx.Cfg.RangeCache)
+			ranges := execinfra.MisplannedRanges(tr.Ctx(), tr.SpansCopy, nodeID, tr.FlowCtx.Cfg.RangeCache)
 			if ranges != nil {
 				trailingMeta = append(trailingMeta, execinfrapb.ProducerMetadata{Ranges: ranges})
 			}
 		}
 	}
-	if tfs := execinfra.GetLeafTxnFinalState(tr.Ctx, tr.FlowCtx.Txn); tfs != nil {
+	if tfs := execinfra.GetLeafTxnFinalState(tr.Ctx(), tr.FlowCtx.Txn); tfs != nil {
 		trailingMeta = append(trailingMeta, execinfrapb.ProducerMetadata{LeafTxnFinalState: tfs})
 	}
 
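Two call styles coexist after this change: sortChunksProcessor.Next and fill hoist ctx := s.Ctx() once per call, while tableReader.Next goes through tr.Ctx() at every use. Hoisting is the slightly cheaper shape in row-at-a-time loops, since the accessor's fallback check runs once instead of once per row; a small sketch of the hoisted form, using the same hypothetical stand-in as the earlier sketch:

    package main

    import "context"

    // rowSource is a hypothetical stand-in for a processor embedding the
    // Ctx() accessor sketched earlier.
    type rowSource struct {
        ctx  context.Context // nil until started
        left int             // rows remaining; stands in for real input
    }

    func (s *rowSource) Ctx() context.Context {
        if s.ctx == nil {
            return context.Background()
        }
        return s.ctx
    }

    func (s *rowSource) fetch(ctx context.Context) bool {
        _ = ctx // a real fetcher would thread ctx into its KV reads
        if s.left == 0 {
            return false
        }
        s.left--
        return true
    }

    // drain mirrors the hoisted style of sortChunksProcessor.Next above: one
    // accessor call, reused for every row.
    func (s *rowSource) drain() {
        ctx := s.Ctx()
        for s.fetch(ctx) {
        }
    }

    func main() {
        (&rowSource{left: 3}).drain()
    }

Hoisting is safe in these methods because they only run after Start, so the context cannot change mid-loop.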
diff --git a/pkg/sql/rowexec/windower.go b/pkg/sql/rowexec/windower.go
index 773e4029166b..687c19ed549d 100644
--- a/pkg/sql/rowexec/windower.go
+++ b/pkg/sql/rowexec/windower.go
@@ -223,7 +223,7 @@ func (w *windower) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetadata) {
 		case windowerEmittingRows:
 			w.runningState, row, meta = w.emitRow()
 		default:
-			log.Fatalf(w.Ctx, "unsupported state: %d", w.runningState)
+			log.Fatalf(w.Ctx(), "unsupported state: %d", w.runningState)
 		}
 
 		if row == nil && meta == nil {
@@ -245,16 +245,16 @@ func (w *windower) close() {
 		if w.allRowsIterator != nil {
 			w.allRowsIterator.Close()
 		}
-		w.allRowsPartitioned.Close(w.Ctx)
+		w.allRowsPartitioned.Close(w.Ctx())
 		if w.partition != nil {
-			w.partition.Close(w.Ctx)
+			w.partition.Close(w.Ctx())
 		}
 		for _, builtin := range w.builtins {
-			builtin.Close(w.Ctx, w.EvalCtx)
+			builtin.Close(w.Ctx(), w.EvalCtx)
 		}
-		w.acc.Close(w.Ctx)
-		w.MemMonitor.Stop(w.Ctx)
-		w.diskMonitor.Stop(w.Ctx)
+		w.acc.Close(w.Ctx())
+		w.MemMonitor.Stop(w.Ctx())
+		w.diskMonitor.Stop(w.Ctx())
 	}
 }
 
@@ -278,17 +278,17 @@ func (w *windower) accumulateRows() (
 		return windowerAccumulating, nil, meta
 	}
 	if row == nil {
-		log.VEvent(w.Ctx, 1, "accumulation complete")
+		log.VEvent(w.Ctx(), 1, "accumulation complete")
 		w.inputDone = true
 		// We need to sort all the rows based on partitionBy columns so that all
 		// rows belonging to the same hash bucket are contiguous.
-		w.allRowsPartitioned.Sort(w.Ctx)
+		w.allRowsPartitioned.Sort(w.Ctx())
 		break
 	}
 
 	// The underlying row container will decode all datums as necessary, so we
 	// don't need to worry about that.
-	if err := w.allRowsPartitioned.AddRow(w.Ctx, row); err != nil {
+	if err := w.allRowsPartitioned.AddRow(w.Ctx(), row); err != nil {
 		w.MoveToDraining(err)
 		return windowerStateUnknown, nil, w.DrainHelper()
 	}
@@ -311,7 +311,7 @@ func (w *windower) emitRow() (windowerState, rowenc.EncDatumRow, *execinfrapb.Pr
 		return windowerStateUnknown, nil, w.DrainHelper()
 	}
 
-	if err := w.computeWindowFunctions(w.Ctx, w.EvalCtx); err != nil {
+	if err := w.computeWindowFunctions(w.Ctx(), w.EvalCtx); err != nil {
 		w.MoveToDraining(err)
 		return windowerStateUnknown, nil, w.DrainHelper()
 	}
@@ -341,7 +341,7 @@ func (w *windower) emitRow() (windowerState, rowenc.EncDatumRow, *execinfrapb.Pr
 func (w *windower) spillAllRowsToDisk() error {
 	if w.allRowsPartitioned != nil {
 		if !w.allRowsPartitioned.UsingDisk() {
-			if err := w.allRowsPartitioned.SpillToDisk(w.Ctx); err != nil {
+			if err := w.allRowsPartitioned.SpillToDisk(w.Ctx()); err != nil {
 				return err
 			}
 		} else {
@@ -349,7 +349,7 @@ func (w *windower) spillAllRowsToDisk() error {
 			// w.partition if possible.
 			if w.partition != nil {
 				if !w.partition.UsingDisk() {
-					if err := w.partition.SpillToDisk(w.Ctx); err != nil {
+					if err := w.partition.SpillToDisk(w.Ctx()); err != nil {
 						return err
 					}
 				}
@@ -363,12 +363,12 @@
 // error, it forces all rows to spill and attempts to grow acc by usage
 // one more time.
 func (w *windower) growMemAccount(acc *mon.BoundAccount, usage int64) error {
-	if err := acc.Grow(w.Ctx, usage); err != nil {
+	if err := acc.Grow(w.Ctx(), usage); err != nil {
 		if sqlerrors.IsOutOfMemoryError(err) {
 			if err := w.spillAllRowsToDisk(); err != nil {
 				return err
 			}
-			if err := acc.Grow(w.Ctx, usage); err != nil {
+			if err := acc.Grow(w.Ctx(), usage); err != nil {
 				return err
 			}
 		} else {
@@ -694,7 +694,7 @@ func (w *windower) computeWindowFunctions(ctx context.Context, evalCtx *eval.Con
 				}
 			}
 		}
-		if err := w.partition.AddRow(w.Ctx, row); err != nil {
+		if err := w.partition.AddRow(w.Ctx(), row); err != nil {
 			return err
 		}
 	}
@@ -708,7 +708,7 @@ func (w *windower) computeWindowFunctions(ctx context.Context, evalCtx *eval.Con
 func (w *windower) populateNextOutputRow() (bool, error) {
 	if w.partitionIdx < len(w.partitionSizes) {
 		if w.allRowsIterator == nil {
-			w.allRowsIterator = w.allRowsPartitioned.NewUnmarkedIterator(w.Ctx)
+			w.allRowsIterator = w.allRowsPartitioned.NewUnmarkedIterator(w.Ctx())
 			w.allRowsIterator.Rewind()
 		}
 		// rowIdx is the index of the next row to be emitted from the
diff --git a/pkg/sql/rowexec/zigzagjoiner.go b/pkg/sql/rowexec/zigzagjoiner.go
index fdeaeb43781e..25c9927d10b0 100644
--- a/pkg/sql/rowexec/zigzagjoiner.go
+++ b/pkg/sql/rowexec/zigzagjoiner.go
@@ -493,9 +493,9 @@ func (z *zigzagJoiner) setupInfo(
 
 func (z *zigzagJoiner) close() {
 	if z.InternalClose() {
 		for i := range z.infos {
-			z.infos[i].fetcher.Close(z.Ctx)
+			z.infos[i].fetcher.Close(z.Ctx())
 		}
-		log.VEventf(z.Ctx, 2, "exiting zigzag joiner run")
+		log.VEventf(z.Ctx(), 2, "exiting zigzag joiner run")
 	}
 }
 
@@ -790,17 +790,17 @@ func (z *zigzagJoiner) maybeFetchInitialRow() error {
 	curInfo := &z.infos[z.side]
 	err := curInfo.fetcher.StartScan(
-		z.Ctx,
+		z.Ctx(),
 		roachpb.Spans{roachpb.Span{Key: curInfo.key, EndKey: curInfo.endKey}},
 		nil, /* spanIDs */
 		rowinfra.GetDefaultBatchBytesLimit(z.EvalCtx.TestingKnobs.ForceProductionValues),
 		zigzagJoinerBatchSize,
 	)
 	if err != nil {
-		log.Errorf(z.Ctx, "scan error: %s", err)
+		log.Errorf(z.Ctx(), "scan error: %s", err)
 		return err
 	}
-	fetchedRow, err := z.fetchRow(z.Ctx)
+	fetchedRow, err := z.fetchRow(z.Ctx())
 	if err != nil {
 		return scrub.UnwrapScrubError(err)
 	}
@@ -818,7 +818,7 @@ func (z *zigzagJoiner) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetadata
 			z.MoveToDraining(err)
 			break
 		}
-		row, err := z.nextRow(z.Ctx)
+		row, err := z.nextRow(z.Ctx())
 		if err != nil {
 			z.MoveToDraining(err)
 			break
@@ -843,11 +843,11 @@ func (z *zigzagJoiner) ConsumerClosed() {
 
 // execStatsForTrace implements ProcessorBase.ExecStatsForTrace.
 func (z *zigzagJoiner) execStatsForTrace() *execinfrapb.ComponentStats {
-	z.scanStats = execstats.GetScanStats(z.Ctx, z.ExecStatsTrace)
+	z.scanStats = execstats.GetScanStats(z.Ctx(), z.ExecStatsTrace)
 	kvStats := execinfrapb.KVStats{
 		BytesRead:           optional.MakeUint(uint64(z.getBytesRead())),
-		ContentionTime:      optional.MakeTimeValue(execstats.GetCumulativeContentionTime(z.Ctx, z.ExecStatsTrace)),
+		ContentionTime:      optional.MakeTimeValue(execstats.GetCumulativeContentionTime(z.Ctx(), z.ExecStatsTrace)),
 		BatchRequestsIssued: optional.MakeUint(uint64(z.getBatchRequestsIssued())),
 	}
 	execstats.PopulateKVMVCCStats(&kvStats, &z.scanStats)
@@ -895,7 +895,7 @@ func (z *zigzagJoiner) generateMeta() []execinfrapb.ProducerMetadata {
 	meta.Metrics = execinfrapb.GetMetricsMeta()
 	meta.Metrics.BytesRead = z.getBytesRead()
 	meta.Metrics.RowsRead = z.getRowsRead()
-	if tfs := execinfra.GetLeafTxnFinalState(z.Ctx, z.FlowCtx.Txn); tfs != nil {
+	if tfs := execinfra.GetLeafTxnFinalState(z.Ctx(), z.FlowCtx.Txn); tfs != nil {
 		trailingMeta = append(trailingMeta, execinfrapb.ProducerMetadata{LeafTxnFinalState: tfs})
 	}
 	return trailingMeta
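The generateMeta implementations touched by this patch (tableReader's above, zigzagJoiner's here) reduce to one shape: compute each optional piece of trailing metadata with the possibly-background context and append it only when present. A pared-down sketch with a hypothetical type in place of execinfrapb.ProducerMetadata:

    package main

    import "fmt"

    // producerMetadata is a hypothetical, pared-down stand-in for
    // execinfrapb.ProducerMetadata: every field is optional.
    type producerMetadata struct {
        misplannedRanges []string
        leafTxnState     string
    }

    // collectTrailingMeta appends one metadata record per piece that actually
    // exists, mirroring the nil checks around MisplannedRanges and
    // GetLeafTxnFinalState above.
    func collectTrailingMeta(misplanned []string, leafState string) []producerMetadata {
        var trailing []producerMetadata
        if len(misplanned) > 0 {
            trailing = append(trailing, producerMetadata{misplannedRanges: misplanned})
        }
        if leafState != "" {
            trailing = append(trailing, producerMetadata{leafTxnState: leafState})
        }
        return trailing
    }

    func main() {
        meta := collectTrailingMeta(nil, "leaf-txn-final-state")
        fmt.Println(len(meta)) // 1: only the leaf txn state was present
    }

Because trailing metadata is emitted while draining, these paths can run on a processor that was closed without ever starting, which is exactly the case the Ctx() fallback covers.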