diff --git a/pkg/ccl/backupccl/backup_processor.go b/pkg/ccl/backupccl/backup_processor.go index 20dbec394963..20924052bb22 100644 --- a/pkg/ccl/backupccl/backup_processor.go +++ b/pkg/ccl/backupccl/backup_processor.go @@ -211,7 +211,7 @@ func (bp *backupDataProcessor) close() { bp.cancelAndWaitForWorker() bp.agg.Close() if bp.InternalClose() { - bp.memAcc.Close(bp.Ctx) + bp.memAcc.Close(bp.Ctx()) } } diff --git a/pkg/ccl/backupccl/restore_data_processor.go b/pkg/ccl/backupccl/restore_data_processor.go index 35c76fea4645..808b2fc6f82b 100644 --- a/pkg/ccl/backupccl/restore_data_processor.go +++ b/pkg/ccl/backupccl/restore_data_processor.go @@ -549,8 +549,8 @@ func (rd *restoreDataProcessor) Next() (rowenc.EncDatumRow, *execinfrapb.Produce prog.ProgressDetails = *details case meta := <-rd.metaCh: return nil, meta - case <-rd.Ctx.Done(): - rd.MoveToDraining(rd.Ctx.Err()) + case <-rd.Ctx().Done(): + rd.MoveToDraining(rd.Ctx().Err()) return nil, rd.DrainHelper() } diff --git a/pkg/ccl/backupccl/restore_data_processor_test.go b/pkg/ccl/backupccl/restore_data_processor_test.go index 903d37ac51d7..f54e1cdd6c32 100644 --- a/pkg/ccl/backupccl/restore_data_processor_test.go +++ b/pkg/ccl/backupccl/restore_data_processor_test.go @@ -383,8 +383,7 @@ func runTestIngest(t *testing.T, init func(*cluster.Settings)) { } expectedKVs := slurpSSTablesLatestKey(t, filepath.Join(dir, "foo"), slurp, srcPrefix, newPrefix) - mockRestoreDataProcessor, err := newTestingRestoreDataProcessor(ctx, &evalCtx, &flowCtx, - mockRestoreDataSpec) + mockRestoreDataProcessor, err := newTestingRestoreDataProcessor(&evalCtx, &flowCtx, mockRestoreDataSpec) require.NoError(t, err) ssts := make(chan mergedSST, 1) require.NoError(t, mockRestoreDataProcessor.openSSTs(ctx, restoreSpanEntry, ssts)) @@ -427,15 +426,11 @@ func runTestIngest(t *testing.T, init func(*cluster.Settings)) { } func newTestingRestoreDataProcessor( - ctx context.Context, - evalCtx *eval.Context, - flowCtx *execinfra.FlowCtx, - spec execinfrapb.RestoreDataSpec, + evalCtx *eval.Context, flowCtx *execinfra.FlowCtx, spec execinfrapb.RestoreDataSpec, ) (*restoreDataProcessor, error) { rd := &restoreDataProcessor{ ProcessorBase: execinfra.ProcessorBase{ ProcessorBaseNoHelper: execinfra.ProcessorBaseNoHelper{ - Ctx: ctx, EvalCtx: evalCtx, }, }, diff --git a/pkg/ccl/changefeedccl/changefeed_processors.go b/pkg/ccl/changefeedccl/changefeed_processors.go index d447a7a6d523..c2d327162c8b 100644 --- a/pkg/ccl/changefeedccl/changefeed_processors.go +++ b/pkg/ccl/changefeedccl/changefeed_processors.go @@ -448,20 +448,20 @@ func (ca *changeAggregator) close() { } if ca.eventConsumer != nil { if err := ca.eventConsumer.Close(); err != nil { - log.Warningf(ca.Ctx, "error closing event consumer: %s", err) + log.Warningf(ca.Ctx(), "error closing event consumer: %s", err) } } if ca.sink != nil { if err := ca.sink.Close(); err != nil { - log.Warningf(ca.Ctx, `error closing sink. goroutines may have leaked: %v`, err) + log.Warningf(ca.Ctx(), `error closing sink. goroutines may have leaked: %v`, err) } } - ca.memAcc.Close(ca.Ctx) + ca.memAcc.Close(ca.Ctx()) if ca.kvFeedMemMon != nil { - ca.kvFeedMemMon.Stop(ca.Ctx) + ca.kvFeedMemMon.Stop(ca.Ctx()) } - ca.MemMonitor.Stop(ca.Ctx) + ca.MemMonitor.Stop(ca.Ctx()) ca.InternalClose() } @@ -505,7 +505,7 @@ func (ca *changeAggregator) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMet // kvFeed, sends off this event to the event consumer, and flushes the sink // if necessary. func (ca *changeAggregator) tick() error { - event, err := ca.eventProducer.Get(ca.Ctx) + event, err := ca.eventProducer.Get(ca.Ctx()) if err != nil { return err } @@ -520,16 +520,16 @@ func (ca *changeAggregator) tick() error { ca.sliMetrics.AdmitLatency.RecordValue(timeutil.Since(event.Timestamp().GoTime()).Nanoseconds()) } ca.recentKVCount++ - return ca.eventConsumer.ConsumeEvent(ca.Ctx, event) + return ca.eventConsumer.ConsumeEvent(ca.Ctx(), event) case kvevent.TypeResolved: a := event.DetachAlloc() - a.Release(ca.Ctx) + a.Release(ca.Ctx()) resolved := event.Resolved() if ca.knobs.FilterSpanWithMutation == nil || !ca.knobs.FilterSpanWithMutation(&resolved) { return ca.noteResolvedSpan(resolved) } case kvevent.TypeFlush: - return ca.sink.Flush(ca.Ctx) + return ca.sink.Flush(ca.Ctx()) } return nil @@ -572,7 +572,7 @@ func (ca *changeAggregator) flushFrontier() error { // otherwise, we could lose buffered messages and violate the // at-least-once guarantee. This is also true for checkpointing the // resolved spans in the job progress. - if err := ca.sink.Flush(ca.Ctx); err != nil { + if err := ca.sink.Flush(ca.Ctx()); err != nil { return err } @@ -1000,11 +1000,11 @@ func (cf *changeFrontier) close() { } if cf.sink != nil { if err := cf.sink.Close(); err != nil { - log.Warningf(cf.Ctx, `error closing sink. goroutines may have leaked: %v`, err) + log.Warningf(cf.Ctx(), `error closing sink. goroutines may have leaked: %v`, err) } } - cf.memAcc.Close(cf.Ctx) - cf.MemMonitor.Stop(cf.Ctx) + cf.memAcc.Close(cf.Ctx()) + cf.MemMonitor.Stop(cf.Ctx()) } } @@ -1108,7 +1108,7 @@ func (cf *changeFrontier) noteAggregatorProgress(d rowenc.EncDatum) error { // job progress update closure, but it currently doesn't pass along the info // we'd need to do it that way. if !resolved.Timestamp.IsEmpty() && resolved.Timestamp.Less(cf.highWaterAtStart) { - logcrash.ReportOrPanic(cf.Ctx, &cf.flowCtx.Cfg.Settings.SV, + logcrash.ReportOrPanic(cf.Ctx(), &cf.flowCtx.Cfg.Settings.SV, `got a span level timestamp %s for %s that is less than the initial high-water %s`, redact.Safe(resolved.Timestamp), resolved.Span, redact.Safe(cf.highWaterAtStart)) continue @@ -1210,7 +1210,7 @@ func (cf *changeFrontier) maybeCheckpointJob( if err != nil { return false, err } - cf.js.checkpointCompleted(cf.Ctx, timeutil.Since(checkpointStart)) + cf.js.checkpointCompleted(cf.Ctx(), timeutil.Since(checkpointStart)) return updated, nil } @@ -1228,7 +1228,7 @@ func (cf *changeFrontier) checkpointJobProgress( var updateSkipped error if cf.js.job != nil { - if err := cf.js.job.Update(cf.Ctx, nil, func( + if err := cf.js.job.Update(cf.Ctx(), nil, func( txn *kv.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater, ) error { // If we're unable to update the job due to the job state, such as during @@ -1253,8 +1253,8 @@ func (cf *changeFrontier) checkpointJobProgress( if !changefeedbase.ActiveProtectedTimestampsEnabled.Get(&cf.flowCtx.Cfg.Settings.SV) { timestampManager = cf.deprecatedManageProtectedTimestamps } - if err := timestampManager(cf.Ctx, txn, changefeedProgress); err != nil { - log.Warningf(cf.Ctx, "error managing protected timestamp record: %v", err) + if err := timestampManager(cf.Ctx(), txn, changefeedProgress); err != nil { + log.Warningf(cf.Ctx(), "error managing protected timestamp record: %v", err) return err } @@ -1282,7 +1282,7 @@ func (cf *changeFrontier) checkpointJobProgress( } if updateSkipped != nil { - log.Warningf(cf.Ctx, "skipping changefeed checkpoint: %s", updateSkipped) + log.Warningf(cf.Ctx(), "skipping changefeed checkpoint: %s", updateSkipped) return false, nil } @@ -1381,7 +1381,7 @@ func (cf *changeFrontier) maybeEmitResolved(newResolved hlc.Timestamp) error { if !shouldEmit { return nil } - if err := emitResolvedTimestamp(cf.Ctx, cf.encoder, cf.sink, newResolved); err != nil { + if err := emitResolvedTimestamp(cf.Ctx(), cf.encoder, cf.sink, newResolved); err != nil { return err } cf.lastEmitResolved = newResolved.GoTime() @@ -1420,13 +1420,13 @@ func (cf *changeFrontier) maybeLogBehindSpan(frontierChanged bool) { description = fmt.Sprintf("job %d", cf.spec.JobID) } if frontierChanged && cf.slowLogEveryN.ShouldProcess(now) { - log.Infof(cf.Ctx, "%s new resolved timestamp %s is behind by %s", + log.Infof(cf.Ctx(), "%s new resolved timestamp %s is behind by %s", description, frontier, resolvedBehind) } if cf.slowLogEveryN.ShouldProcess(now) { s := cf.frontier.PeekFrontierSpan() - log.Infof(cf.Ctx, "%s span %s is behind by %s", description, s, resolvedBehind) + log.Infof(cf.Ctx(), "%s span %s is behind by %s", description, s, resolvedBehind) } } diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_frontier_processor.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_frontier_processor.go index 9ba472fe4e5e..e6a949bebcbe 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_frontier_processor.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_frontier_processor.go @@ -297,13 +297,13 @@ func (sf *streamIngestionFrontier) Next() ( if err := sf.maybeUpdatePartitionProgress(); err != nil { // Updating the partition progress isn't a fatal error. - log.Errorf(sf.Ctx, "failed to update partition progress: %+v", err) + log.Errorf(sf.Ctx(), "failed to update partition progress: %+v", err) } // Send back a row to the job so that it can update the progress. select { - case <-sf.Ctx.Done(): - sf.MoveToDraining(sf.Ctx.Err()) + case <-sf.Ctx().Done(): + sf.MoveToDraining(sf.Ctx().Err()) return nil, sf.DrainHelper() // Send the latest persisted highwater in the heartbeat to the source cluster // as even with retries we will never request an earlier row than it, and @@ -314,7 +314,7 @@ func (sf *streamIngestionFrontier) Next() ( case <-sf.heartbeatSender.stoppedChan: err := sf.heartbeatSender.wait() if err != nil { - log.Errorf(sf.Ctx, "heartbeat sender exited with error: %s", err) + log.Errorf(sf.Ctx(), "heartbeat sender exited with error: %s", err) } sf.MoveToDraining(err) return nil, sf.DrainHelper() @@ -325,7 +325,7 @@ func (sf *streamIngestionFrontier) Next() ( func (sf *streamIngestionFrontier) close() { if err := sf.heartbeatSender.stop(); err != nil { - log.Errorf(sf.Ctx, "heartbeat sender exited with error: %s", err) + log.Errorf(sf.Ctx(), "heartbeat sender exited with error: %s", err) } if sf.InternalClose() { sf.metrics.RunningCount.Dec(1) @@ -391,7 +391,7 @@ func (sf *streamIngestionFrontier) noteResolvedTimestamps( // maybeUpdatePartitionProgress polls the frontier and updates the job progress with // partition-specific information to track the status of each partition. func (sf *streamIngestionFrontier) maybeUpdatePartitionProgress() error { - ctx := sf.Ctx + ctx := sf.Ctx() updateFreq := JobCheckpointFrequency.Get(&sf.flowCtx.Cfg.Settings.SV) if updateFreq == 0 || timeutil.Since(sf.lastPartitionUpdate) < updateFreq { return nil diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go index 5f399d2b1ee1..dc85085807b1 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go @@ -404,10 +404,10 @@ func (sip *streamIngestionProcessor) close() { } for _, client := range sip.streamPartitionClients { - _ = client.Close(sip.Ctx) + _ = client.Close(sip.Ctx()) } if sip.batcher != nil { - sip.batcher.Close(sip.Ctx) + sip.batcher.Close(sip.Ctx()) } if sip.maxFlushRateTimer != nil { sip.maxFlushRateTimer.Stop() @@ -538,7 +538,7 @@ func (sip *streamIngestionProcessor) consumeEvents() (*jobspb.ResolvedSpans, err if streamingKnobs, ok := sip.FlowCtx.TestingKnobs().StreamingTestingKnobs.(*sql.StreamingTestingKnobs); ok { if streamingKnobs != nil && streamingKnobs.RunAfterReceivingEvent != nil { - if err := streamingKnobs.RunAfterReceivingEvent(sip.Ctx); err != nil { + if err := streamingKnobs.RunAfterReceivingEvent(sip.Ctx()); err != nil { return nil, err } } @@ -602,7 +602,7 @@ func (sip *streamIngestionProcessor) bufferSST(sst *roachpb.RangeFeedSSTable) er // careful with checkpoints: we can only send checkpoint whose TS >= SST batch TS // after the full SST gets ingested. - _, sp := tracing.ChildSpan(sip.Ctx, "stream-ingestion-buffer-sst") + _, sp := tracing.ChildSpan(sip.Ctx(), "stream-ingestion-buffer-sst") defer sp.Finish() return streamingccl.ScanSST(sst, sst.Span, func(keyVal storage.MVCCKeyValue) error { @@ -652,7 +652,7 @@ func (sip *streamIngestionProcessor) bufferDelRange(delRange *roachpb.RangeFeedD func (sip *streamIngestionProcessor) bufferRangeKeyVal( rangeKeyVal storage.MVCCRangeKeyValue, ) error { - _, sp := tracing.ChildSpan(sip.Ctx, "stream-ingestion-buffer-range-key") + _, sp := tracing.ChildSpan(sip.Ctx(), "stream-ingestion-buffer-range-key") defer sp.Finish() var err error @@ -790,7 +790,7 @@ func (r *rangeKeyBatcher) reset() { } func (sip *streamIngestionProcessor) flush() (*jobspb.ResolvedSpans, error) { - ctx, sp := tracing.ChildSpan(sip.Ctx, "stream-ingestion-flush") + ctx, sp := tracing.ChildSpan(sip.Ctx(), "stream-ingestion-flush") defer sp.Finish() flushedCheckpoints := jobspb.ResolvedSpans{ResolvedSpans: make([]jobspb.ResolvedSpan, 0)} diff --git a/pkg/sql/colexec/columnarizer.go b/pkg/sql/colexec/columnarizer.go index 1db4bc8099be..d578edfac314 100644 --- a/pkg/sql/colexec/columnarizer.go +++ b/pkg/sql/colexec/columnarizer.go @@ -58,6 +58,7 @@ type Columnarizer struct { colexecop.NonExplainable mode columnarizerMode + initialized bool helper colmem.SetAccountingHelper metadataAllocator *colmem.Allocator input execinfra.RowSource @@ -171,13 +172,10 @@ func newColumnarizer( // Init is part of the colexecop.Operator interface. func (c *Columnarizer) Init(ctx context.Context) { - if c.removedFromFlow { - return - } - if c.Ctx != nil { - // Init has already been called. + if c.removedFromFlow || c.initialized { return } + c.initialized = true c.accumulatedMeta = make([]execinfrapb.ProducerMetadata, 0, 1) ctx = c.StartInternalNoSpan(ctx) c.input.Start(ctx) @@ -270,7 +268,7 @@ func (c *Columnarizer) DrainMeta() []execinfrapb.ProducerMetadata { // When this method returns, we no longer will have the reference to the // metadata, so we can release all memory from the metadata allocator. defer c.metadataAllocator.ReleaseAll() - if c.Ctx == nil { + if !c.initialized { // The columnarizer wasn't initialized, so the wrapped processors might // not have been started leaving them in an unsafe to drain state, so // we skip the draining. Mostly likely this happened because a panic was @@ -301,7 +299,7 @@ func (c *Columnarizer) Close(context.Context) error { func (c *Columnarizer) trailingMetaCallback() []execinfrapb.ProducerMetadata { // Close will call InternalClose(). Note that we don't return any trailing // metadata here because the columnarizers propagate it in DrainMeta. - if err := c.Close(c.Ctx); buildutil.CrdbTestBuild && err != nil { + if err := c.Close(c.Ctx()); buildutil.CrdbTestBuild && err != nil { // Close never returns an error. colexecerror.InternalError(errors.NewAssertionErrorWithWrappedErrf(err, "unexpected error from Columnarizer.Close")) } diff --git a/pkg/sql/colexec/materializer.go b/pkg/sql/colexec/materializer.go index 6cdf9a2711e3..cfbd462dadc0 100644 --- a/pkg/sql/colexec/materializer.go +++ b/pkg/sql/colexec/materializer.go @@ -352,18 +352,10 @@ func (m *Materializer) close() { if m.Closed { return } - if m.Ctx == nil { - // In some edge cases (like when Init of an operator above this - // materializer encounters a panic), the materializer might never be - // started, yet it still will attempt to close its Closers. This - // context is only used for logging purposes, so it is ok to grab - // the background context in order to prevent a NPE below. - m.Ctx = context.Background() - } // Make sure to call InternalClose() only after closing the closers - this // allows the closers to utilize the unfinished tracing span (if tracing is // enabled). - m.closers.CloseAndLogOnErr(m.Ctx, "materializer") + m.closers.CloseAndLogOnErr(m.Ctx(), "materializer") m.InternalClose() } diff --git a/pkg/sql/execinfra/processorsbase.go b/pkg/sql/execinfra/processorsbase.go index e92b23f62605..38636ebf65f2 100644 --- a/pkg/sql/execinfra/processorsbase.go +++ b/pkg/sql/execinfra/processorsbase.go @@ -357,10 +357,10 @@ type ProcessorBaseNoHelper struct { // has been closed. Closed bool - // Ctx and span contain the tracing state while the processor is active + // ctx and span contain the tracing state while the processor is active // (i.e. hasn't been closed). Initialized using flowCtx.Ctx (which should not be otherwise // used). - Ctx context.Context + ctx context.Context span *tracing.Span // origCtx is the context from which ctx was derived. InternalClose() resets // ctx to this. @@ -518,7 +518,7 @@ func (pb *ProcessorBaseNoHelper) MoveToDraining(err error) { // not permitted. if err != nil { logcrash.ReportOrPanic( - pb.Ctx, + pb.Ctx(), &pb.FlowCtx.Cfg.Settings.SV, "MoveToDraining called in state %s with err: %+v", pb.State, err) @@ -548,7 +548,7 @@ func (pb *ProcessorBaseNoHelper) MoveToDraining(err error) { func (pb *ProcessorBaseNoHelper) DrainHelper() *execinfrapb.ProducerMetadata { if pb.State == StateRunning { logcrash.ReportOrPanic( - pb.Ctx, + pb.Ctx(), &pb.FlowCtx.Cfg.Settings.SV, "drain helper called in StateRunning", ) @@ -659,7 +659,7 @@ func (pb *ProcessorBase) HijackExecStatsForTrace() func() *execinfrapb.Component func (pb *ProcessorBaseNoHelper) moveToTrailingMeta() { if pb.State == StateTrailingMeta || pb.State == StateExhausted { logcrash.ReportOrPanic( - pb.Ctx, + pb.Ctx(), &pb.FlowCtx.Cfg.Settings.SV, "moveToTrailingMeta called in state: %s", pb.State, @@ -682,10 +682,10 @@ func (pb *ProcessorBaseNoHelper) moveToTrailingMeta() { } } - if buildutil.CrdbTestBuild && pb.Ctx == nil { + if buildutil.CrdbTestBuild && pb.ctx == nil { panic( errors.AssertionFailedf( - "unexpected nil ProcessorBase.Ctx when draining. Was StartInternal called?", + "unexpected nil ProcessorBase.ctx when draining. Was StartInternal called?", ), ) } @@ -709,7 +709,7 @@ func (pb *ProcessorBaseNoHelper) moveToTrailingMeta() { // should continue processing other rows, with the awareness that the processor // might have been transitioned to the draining phase. func (pb *ProcessorBase) ProcessRowHelper(row rowenc.EncDatumRow) rowenc.EncDatumRow { - outRow, ok, err := pb.OutputHelper.ProcessRow(pb.Ctx, row) + outRow, ok, err := pb.OutputHelper.ProcessRow(pb.Ctx(), row) if err != nil { pb.MoveToDraining(err) return nil @@ -731,7 +731,7 @@ func (pb *ProcessorBaseNoHelper) Run(ctx context.Context) { panic("processor output is not set for emitting rows") } pb.self.Start(ctx) - Run(pb.Ctx, pb.self, pb.Output) + Run(pb.ctx, pb.self, pb.Output) } // ProcStateOpts contains fields used by the ProcessorBase's family of functions @@ -872,16 +872,25 @@ func (pb *ProcessorBaseNoHelper) startImpl( ) context.Context { pb.origCtx = ctx if createSpan { - pb.Ctx, pb.span = ProcessorSpan(ctx, spanName) + pb.ctx, pb.span = ProcessorSpan(ctx, spanName) if pb.span != nil && pb.span.IsVerbose() { pb.span.SetTag(execinfrapb.FlowIDTagKey, attribute.StringValue(pb.FlowCtx.ID.String())) pb.span.SetTag(execinfrapb.ProcessorIDTagKey, attribute.IntValue(int(pb.ProcessorID))) } } else { - pb.Ctx = ctx + pb.ctx = ctx } - pb.EvalCtx.Context = pb.Ctx - return pb.Ctx + pb.EvalCtx.Context = pb.ctx + return pb.ctx +} + +// Ctx is an accessor method for ctx which is guaranteed to return non-nil +// context even if StartInternal() hasn't been called. +func (pb *ProcessorBaseNoHelper) Ctx() context.Context { + if pb.ctx == nil { + return context.Background() + } + return pb.ctx } // InternalClose helps processors implement the RowSource interface, performing @@ -938,7 +947,7 @@ func (pb *ProcessorBaseNoHelper) InternalCloseEx(onClose func()) bool { pb.span = nil // Reset the context so that any incidental uses after this point do not // access the finished span. - pb.Ctx = pb.origCtx + pb.ctx = pb.origCtx pb.EvalCtx.Context = pb.origCtx return true } diff --git a/pkg/sql/rowexec/aggregator.go b/pkg/sql/rowexec/aggregator.go index f742d31be420..2146bf7e8b23 100644 --- a/pkg/sql/rowexec/aggregator.go +++ b/pkg/sql/rowexec/aggregator.go @@ -345,17 +345,17 @@ func (ag *aggregatorBase) start(ctx context.Context, procName string) { func (ag *hashAggregator) close() { if ag.InternalClose() { - log.VEventf(ag.Ctx, 2, "exiting aggregator") + log.VEventf(ag.Ctx(), 2, "exiting aggregator") // If we have started emitting rows, bucketsIter will represent which // buckets are still open, since buckets are closed once their results are // emitted. if ag.bucketsIter == nil { for _, bucket := range ag.buckets { - bucket.close(ag.Ctx) + bucket.close(ag.Ctx()) } } else { for _, bucket := range ag.bucketsIter { - ag.buckets[bucket].close(ag.Ctx) + ag.buckets[bucket].close(ag.Ctx()) } } // Make sure to release any remaining memory under 'buckets'. @@ -364,25 +364,25 @@ func (ag *hashAggregator) close() { // buckets since the latter might be releasing some precisely tracked // memory, and if we were to close the accounts first, there would be // no memory to release for the buckets. - ag.bucketsAcc.Close(ag.Ctx) - ag.aggFuncsAcc.Close(ag.Ctx) - ag.MemMonitor.Stop(ag.Ctx) + ag.bucketsAcc.Close(ag.Ctx()) + ag.aggFuncsAcc.Close(ag.Ctx()) + ag.MemMonitor.Stop(ag.Ctx()) } } func (ag *orderedAggregator) close() { if ag.InternalClose() { - log.VEventf(ag.Ctx, 2, "exiting aggregator") + log.VEventf(ag.Ctx(), 2, "exiting aggregator") if ag.bucket != nil { - ag.bucket.close(ag.Ctx) + ag.bucket.close(ag.Ctx()) } // Note that we should be closing accounts only after closing the // bucket since the latter might be releasing some precisely tracked // memory, and if we were to close the accounts first, there would be // no memory to release for the bucket. - ag.bucketsAcc.Close(ag.Ctx) - ag.aggFuncsAcc.Close(ag.Ctx) - ag.MemMonitor.Stop(ag.Ctx) + ag.bucketsAcc.Close(ag.Ctx()) + ag.aggFuncsAcc.Close(ag.Ctx()) + ag.MemMonitor.Stop(ag.Ctx()) } } @@ -420,7 +420,7 @@ func (ag *hashAggregator) accumulateRows() ( return aggAccumulating, nil, meta } if row == nil { - log.VEvent(ag.Ctx, 1, "accumulation complete") + log.VEvent(ag.Ctx(), 1, "accumulation complete") ag.inputDone = true break } @@ -457,7 +457,7 @@ func (ag *hashAggregator) accumulateRows() ( // Note that, for simplicity, we're ignoring the overhead of the slice of // strings. - if err := ag.bucketsAcc.Grow(ag.Ctx, int64(len(ag.buckets))*memsize.String); err != nil { + if err := ag.bucketsAcc.Grow(ag.Ctx(), int64(len(ag.buckets))*memsize.String); err != nil { ag.MoveToDraining(err) return aggStateUnknown, nil, nil } @@ -489,7 +489,7 @@ func (ag *orderedAggregator) accumulateRows() ( return aggAccumulating, nil, meta } if row == nil { - log.VEvent(ag.Ctx, 1, "accumulation complete") + log.VEvent(ag.Ctx(), 1, "accumulation complete") ag.inputDone = true break } @@ -533,7 +533,7 @@ func (ag *orderedAggregator) accumulateRows() ( func (ag *aggregatorBase) getAggResults( bucket aggregateFuncs, ) (aggregatorState, rowenc.EncDatumRow, *execinfrapb.ProducerMetadata) { - defer bucket.close(ag.Ctx) + defer bucket.close(ag.Ctx()) for i, b := range bucket { result, err := b.Result() @@ -579,16 +579,16 @@ func (ag *hashAggregator) emitRow() ( // the columns specified by ag.orderedGroupCols, so we need to continue // accumulating the remaining rows. - if err := ag.arena.UnsafeReset(ag.Ctx); err != nil { + if err := ag.arena.UnsafeReset(ag.Ctx()); err != nil { ag.MoveToDraining(err) return aggStateUnknown, nil, nil } // Before we create a new 'buckets' map below, we need to "release" the // already accounted for memory of the current map. - ag.bucketsAcc.Shrink(ag.Ctx, int64(ag.alreadyAccountedFor)*memsize.MapEntryOverhead) + ag.bucketsAcc.Shrink(ag.Ctx(), int64(ag.alreadyAccountedFor)*memsize.MapEntryOverhead) // Note that, for simplicity, we're ignoring the overhead of the slice of // strings. - ag.bucketsAcc.Shrink(ag.Ctx, int64(len(ag.buckets))*memsize.String) + ag.bucketsAcc.Shrink(ag.Ctx(), int64(len(ag.buckets))*memsize.String) ag.bucketsIter = nil ag.buckets = make(map[string]aggregateFuncs) ag.bucketsLenGrowThreshold = hashAggregatorBucketsInitialLen @@ -655,7 +655,7 @@ func (ag *orderedAggregator) emitRow() ( // the columns specified by ag.orderedGroupCols, so we need to continue // accumulating the remaining rows. - if err := ag.arena.UnsafeReset(ag.Ctx); err != nil { + if err := ag.arena.UnsafeReset(ag.Ctx()); err != nil { ag.MoveToDraining(err) return aggStateUnknown, nil, nil } @@ -693,7 +693,7 @@ func (ag *hashAggregator) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetad case aggEmittingRows: ag.runningState, row, meta = ag.emitRow() default: - log.Fatalf(ag.Ctx, "unsupported state: %d", ag.runningState) + log.Fatalf(ag.Ctx(), "unsupported state: %d", ag.runningState) } if row == nil && meta == nil { @@ -715,7 +715,7 @@ func (ag *orderedAggregator) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMe case aggEmittingRows: ag.runningState, row, meta = ag.emitRow() default: - log.Fatalf(ag.Ctx, "unsupported state: %d", ag.runningState) + log.Fatalf(ag.Ctx(), "unsupported state: %d", ag.runningState) } if row == nil && meta == nil { @@ -780,7 +780,7 @@ func (ag *aggregatorBase) accumulateRowIntoBucket( canAdd := true if a.Distinct { canAdd, err = ag.funcs[i].isDistinct( - ag.Ctx, + ag.Ctx(), &ag.datumAlloc, groupKey, firstArg, @@ -793,7 +793,7 @@ func (ag *aggregatorBase) accumulateRowIntoBucket( if !canAdd { continue } - if err := bucket[i].Add(ag.Ctx, firstArg, otherArgs...); err != nil { + if err := bucket[i].Add(ag.Ctx(), firstArg, otherArgs...); err != nil { return err } } @@ -811,7 +811,7 @@ func (ag *hashAggregator) encode( // used by the aggregate functions (and accounted for accordingly), // this can lead to over-accounting which is acceptable. appendTo, err = row[colIdx].Fingerprint( - ag.Ctx, ag.inputTypes[colIdx], &ag.datumAlloc, appendTo, &ag.bucketsAcc, + ag.Ctx(), ag.inputTypes[colIdx], &ag.datumAlloc, appendTo, &ag.bucketsAcc, ) if err != nil { return appendTo, err @@ -837,7 +837,7 @@ func (ag *hashAggregator) accumulateRow(row rowenc.EncDatumRow) error { bucket, ok := ag.buckets[string(encoded)] if !ok { - s, err := ag.arena.AllocBytes(ag.Ctx, encoded) + s, err := ag.arena.AllocBytes(ag.Ctx(), encoded) if err != nil { return err } @@ -848,7 +848,7 @@ func (ag *hashAggregator) accumulateRow(row rowenc.EncDatumRow) error { ag.buckets[s] = bucket if len(ag.buckets) == ag.bucketsLenGrowThreshold { toAccountFor := ag.bucketsLenGrowThreshold - ag.alreadyAccountedFor - if err := ag.bucketsAcc.Grow(ag.Ctx, int64(toAccountFor)*memsize.MapEntryOverhead); err != nil { + if err := ag.bucketsAcc.Grow(ag.Ctx(), int64(toAccountFor)*memsize.MapEntryOverhead); err != nil { return err } ag.alreadyAccountedFor = ag.bucketsLenGrowThreshold @@ -948,13 +948,13 @@ func (a *aggregateFuncHolder) isDistinct( } func (ag *aggregatorBase) createAggregateFuncs() (aggregateFuncs, error) { - if err := ag.bucketsAcc.Grow(ag.Ctx, sizeOfAggregateFuncs+sizeOfAggregateFunc*int64(len(ag.funcs))); err != nil { + if err := ag.bucketsAcc.Grow(ag.Ctx(), sizeOfAggregateFuncs+sizeOfAggregateFunc*int64(len(ag.funcs))); err != nil { return nil, err } bucket := make(aggregateFuncs, len(ag.funcs)) for i, f := range ag.funcs { agg := f.create(ag.EvalCtx, f.arguments) - if err := ag.bucketsAcc.Grow(ag.Ctx, agg.Size()); err != nil { + if err := ag.bucketsAcc.Grow(ag.Ctx(), agg.Size()); err != nil { return nil, err } bucket[i] = agg diff --git a/pkg/sql/rowexec/countrows.go b/pkg/sql/rowexec/countrows.go index 675542f6b7cd..5fe23396a1bf 100644 --- a/pkg/sql/rowexec/countrows.go +++ b/pkg/sql/rowexec/countrows.go @@ -86,7 +86,7 @@ func (ag *countAggregator) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMeta if row == nil { ret := make(rowenc.EncDatumRow, 1) ret[0] = rowenc.EncDatum{Datum: tree.NewDInt(tree.DInt(ag.count))} - rendered, _, err := ag.OutputHelper.ProcessRow(ag.Ctx, ret) + rendered, _, err := ag.OutputHelper.ProcessRow(ag.Ctx(), ret) // We're done as soon as we process our one output row, so we // transition into draining state. We will, however, return non-nil // error (if such occurs during rendering) separately below. diff --git a/pkg/sql/rowexec/distinct.go b/pkg/sql/rowexec/distinct.go index 41a75656f563..d63dfe23f7f1 100644 --- a/pkg/sql/rowexec/distinct.go +++ b/pkg/sql/rowexec/distinct.go @@ -189,7 +189,7 @@ func (d *distinct) encode(appendTo []byte, row rowenc.EncDatumRow) ([]byte, erro // the references to the row (and to the newly allocated datums) // shortly, it'll likely take some time before GC reclaims that memory, // so we choose the over-accounting route to be safe. - appendTo, err = datum.Fingerprint(d.Ctx, d.types[colIdx], &d.datumAlloc, appendTo, &d.memAcc) + appendTo, err = datum.Fingerprint(d.Ctx(), d.types[colIdx], &d.datumAlloc, appendTo, &d.memAcc) if err != nil { return nil, err } @@ -212,8 +212,8 @@ func (d *distinct) encode(appendTo []byte, row rowenc.EncDatumRow) ([]byte, erro func (d *distinct) close() { if d.InternalClose() { - d.memAcc.Close(d.Ctx) - d.MemMonitor.Stop(d.Ctx) + d.memAcc.Close(d.Ctx()) + d.MemMonitor.Stop(d.Ctx()) } } @@ -258,7 +258,7 @@ func (d *distinct) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetadata) { // allocated on it, which implies that UnsafeReset() is safe to call here. copy(d.lastGroupKey, row) d.haveLastGroupKey = true - if err := d.arena.UnsafeReset(d.Ctx); err != nil { + if err := d.arena.UnsafeReset(d.Ctx()); err != nil { d.MoveToDraining(err) break } @@ -282,7 +282,7 @@ func (d *distinct) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetadata) { } continue } - s, err := d.arena.AllocBytes(d.Ctx, encoding) + s, err := d.arena.AllocBytes(d.Ctx(), encoding) if err != nil { d.MoveToDraining(err) break diff --git a/pkg/sql/rowexec/hashjoiner.go b/pkg/sql/rowexec/hashjoiner.go index fc6a8685d261..62c9b6c97b04 100644 --- a/pkg/sql/rowexec/hashjoiner.go +++ b/pkg/sql/rowexec/hashjoiner.go @@ -157,7 +157,7 @@ func newHashJoiner( } return h, h.hashTable.Init( - h.Ctx, + h.Ctx(), shouldMarkRightSide(h.joinType), h.rightSource.OutputTypes(), h.eqCols[rightSide], @@ -189,7 +189,7 @@ func (h *hashJoiner) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetadata) case hjEmittingRightUnmatched: h.runningState, row, meta = h.emitRightUnmatched() default: - log.Fatalf(h.Ctx, "unsupported state: %d", h.runningState) + log.Fatalf(h.Ctx(), "unsupported state: %d", h.runningState) } if row == nil && meta == nil { @@ -235,14 +235,14 @@ func (h *hashJoiner) build() (hashJoinerState, rowenc.EncDatumRow, *execinfrapb. return hjStateUnknown, nil, h.DrainHelper() } // If hashTable is in-memory, pre-reserve the memory needed to mark. - if err = h.hashTable.ReserveMarkMemoryMaybe(h.Ctx); err != nil { + if err = h.hashTable.ReserveMarkMemoryMaybe(h.Ctx()); err != nil { h.MoveToDraining(err) return hjStateUnknown, nil, h.DrainHelper() } return hjReadingLeftSide, nil, nil } - err = h.hashTable.AddRow(h.Ctx, row) + err = h.hashTable.AddRow(h.Ctx(), row) // Regardless of the underlying row container (disk backed or in-memory // only), we cannot do anything about an error if it occurs. if err != nil { @@ -276,7 +276,7 @@ func (h *hashJoiner) readLeftSide() ( // hjEmittingRightUnmatched if unmatched rows on the right side need to // be emitted, otherwise finish. if shouldEmitUnmatchedRow(rightSide, h.joinType) { - i := h.hashTable.NewUnmarkedIterator(h.Ctx) + i := h.hashTable.NewUnmarkedIterator(h.Ctx()) i.Rewind() h.emittingRightUnmatchedState.iter = i return hjEmittingRightUnmatched, nil, nil @@ -290,14 +290,14 @@ func (h *hashJoiner) readLeftSide() ( h.probingRowState.row = row h.probingRowState.matched = false if h.probingRowState.iter == nil { - i, err := h.hashTable.NewBucketIterator(h.Ctx, row, h.eqCols[leftSide]) + i, err := h.hashTable.NewBucketIterator(h.Ctx(), row, h.eqCols[leftSide]) if err != nil { h.MoveToDraining(err) return hjStateUnknown, nil, h.DrainHelper() } h.probingRowState.iter = i } else { - if err := h.probingRowState.iter.Reset(h.Ctx, row); err != nil { + if err := h.probingRowState.iter.Reset(h.Ctx(), row); err != nil { h.MoveToDraining(err) return hjStateUnknown, nil, h.DrainHelper() } @@ -358,7 +358,7 @@ func (h *hashJoiner) probeRow() ( h.probingRowState.matched = true shouldEmit := true if shouldMarkRightSide(h.joinType) { - if i.IsMarked(h.Ctx) { + if i.IsMarked(h.Ctx()) { switch h.joinType { case descpb.RightSemiJoin: // The row from the right already had a match and was emitted @@ -375,7 +375,7 @@ func (h *hashJoiner) probeRow() ( // whether we have a corresponding unmarked row from the right. h.probingRowState.matched = false } - } else if err := i.Mark(h.Ctx); err != nil { + } else if err := i.Mark(h.Ctx()); err != nil { h.MoveToDraining(err) return hjStateUnknown, nil, h.DrainHelper() } @@ -448,16 +448,16 @@ func (h *hashJoiner) emitRightUnmatched() ( func (h *hashJoiner) close() { if h.InternalClose() { - h.hashTable.Close(h.Ctx) + h.hashTable.Close(h.Ctx()) if h.probingRowState.iter != nil { h.probingRowState.iter.Close() } if h.emittingRightUnmatchedState.iter != nil { h.emittingRightUnmatchedState.iter.Close() } - h.MemMonitor.Stop(h.Ctx) + h.MemMonitor.Stop(h.Ctx()) if h.diskMonitor != nil { - h.diskMonitor.Stop(h.Ctx) + h.diskMonitor.Stop(h.Ctx()) } } } diff --git a/pkg/sql/rowexec/inverted_filterer.go b/pkg/sql/rowexec/inverted_filterer.go index 7cc464e4c762..c50cbcdddc25 100644 --- a/pkg/sql/rowexec/inverted_filterer.go +++ b/pkg/sql/rowexec/inverted_filterer.go @@ -169,7 +169,7 @@ func (ifr *invertedFilterer) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMe case ifrEmittingRows: ifr.runningState, row, meta = ifr.emitRow() default: - log.Fatalf(ifr.Ctx, "unsupported state: %d", ifr.runningState) + log.Fatalf(ifr.Ctx(), "unsupported state: %d", ifr.runningState) } if row == nil && meta == nil { continue @@ -194,9 +194,9 @@ func (ifr *invertedFilterer) readInput() (invertedFiltererState, *execinfrapb.Pr return ifrReadingInput, meta } if row == nil { - log.VEventf(ifr.Ctx, 1, "no more input rows") + log.VEventf(ifr.Ctx(), 1, "no more input rows") evalResult := ifr.invertedEval.evaluate() - ifr.rc.SetupForRead(ifr.Ctx, evalResult) + ifr.rc.SetupForRead(ifr.Ctx(), evalResult) // invertedEval had a single expression in the batch, and the results // for that expression are in evalResult[0]. ifr.evalResult = evalResult[0] @@ -245,7 +245,7 @@ func (ifr *invertedFilterer) readInput() (invertedFiltererState, *execinfrapb.Pr // evaluator. copy(ifr.keyRow, row[:ifr.invertedColIdx]) copy(ifr.keyRow[ifr.invertedColIdx:], row[ifr.invertedColIdx+1:]) - keyIndex, err := ifr.rc.AddRow(ifr.Ctx, ifr.keyRow) + keyIndex, err := ifr.rc.AddRow(ifr.Ctx(), ifr.keyRow) if err != nil { ifr.MoveToDraining(err) return ifrStateUnknown, ifr.DrainHelper() @@ -273,11 +273,11 @@ func (ifr *invertedFilterer) emitRow() ( } if ifr.resultIdx >= len(ifr.evalResult) { // We are done emitting all rows. - return drainFunc(ifr.rc.UnsafeReset(ifr.Ctx)) + return drainFunc(ifr.rc.UnsafeReset(ifr.Ctx())) } curRowIdx := ifr.resultIdx ifr.resultIdx++ - keyRow, err := ifr.rc.GetRow(ifr.Ctx, ifr.evalResult[curRowIdx], false /* skip */) + keyRow, err := ifr.rc.GetRow(ifr.Ctx(), ifr.evalResult[curRowIdx], false /* skip */) if err != nil { return drainFunc(err) } @@ -301,12 +301,12 @@ func (ifr *invertedFilterer) ConsumerClosed() { func (ifr *invertedFilterer) close() { if ifr.InternalClose() { - ifr.rc.Close(ifr.Ctx) + ifr.rc.Close(ifr.Ctx()) if ifr.MemMonitor != nil { - ifr.MemMonitor.Stop(ifr.Ctx) + ifr.MemMonitor.Stop(ifr.Ctx()) } if ifr.diskMonitor != nil { - ifr.diskMonitor.Stop(ifr.Ctx) + ifr.diskMonitor.Stop(ifr.Ctx()) } } } diff --git a/pkg/sql/rowexec/inverted_joiner.go b/pkg/sql/rowexec/inverted_joiner.go index 2dc24f211e7b..a946d0e8947b 100644 --- a/pkg/sql/rowexec/inverted_joiner.go +++ b/pkg/sql/rowexec/inverted_joiner.go @@ -385,7 +385,7 @@ func (ij *invertedJoiner) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetad case ijEmittingRows: ij.runningState, row, meta = ij.emitRow() default: - log.Fatalf(ij.Ctx, "unsupported state: %d", ij.runningState) + log.Fatalf(ij.Ctx(), "unsupported state: %d", ij.runningState) } if row == nil && meta == nil { continue @@ -416,7 +416,7 @@ func (ij *invertedJoiner) readInput() (invertedJoinerState, *execinfrapb.Produce break } - expr, preFilterState, err := ij.datumsToInvertedExpr.Convert(ij.Ctx, row) + expr, preFilterState, err := ij.datumsToInvertedExpr.Convert(ij.Ctx(), row) if err != nil { ij.MoveToDraining(err) return ijStateUnknown, ij.DrainHelper() @@ -466,12 +466,12 @@ func (ij *invertedJoiner) readInput() (invertedJoinerState, *execinfrapb.Produce } if len(ij.inputRows) == 0 { - log.VEventf(ij.Ctx, 1, "no more input rows") + log.VEventf(ij.Ctx(), 1, "no more input rows") // We're done. ij.MoveToDraining(nil) return ijStateUnknown, ij.DrainHelper() } - log.VEventf(ij.Ctx, 1, "read %d input rows", len(ij.inputRows)) + log.VEventf(ij.Ctx(), 1, "read %d input rows", len(ij.inputRows)) spans, err := ij.batchedExprEval.init() if err != nil { @@ -495,9 +495,9 @@ func (ij *invertedJoiner) readInput() (invertedJoinerState, *execinfrapb.Produce return ijStateUnknown, ij.DrainHelper() } - log.VEventf(ij.Ctx, 1, "scanning %d spans", len(ij.indexSpans)) + log.VEventf(ij.Ctx(), 1, "scanning %d spans", len(ij.indexSpans)) if err = ij.fetcher.StartScan( - ij.Ctx, ij.indexSpans, nil, /* spanIDs */ + ij.Ctx(), ij.indexSpans, nil, /* spanIDs */ rowinfra.NoBytesLimit, rowinfra.NoRowLimit, ); err != nil { ij.MoveToDraining(err) @@ -508,11 +508,11 @@ func (ij *invertedJoiner) readInput() (invertedJoinerState, *execinfrapb.Produce } func (ij *invertedJoiner) performScan() (invertedJoinerState, *execinfrapb.ProducerMetadata) { - log.VEventf(ij.Ctx, 1, "joining rows") + log.VEventf(ij.Ctx(), 1, "joining rows") // Read the entire set of rows that are part of the scan. for { // Fetch the next row and copy it into the row container. - fetchedRow, _, err := ij.fetcher.NextRow(ij.Ctx) + fetchedRow, _, err := ij.fetcher.NextRow(ij.Ctx()) if err != nil { ij.MoveToDraining(scrub.UnwrapScrubError(err)) return ijStateUnknown, ij.DrainHelper() @@ -557,7 +557,7 @@ func (ij *invertedJoiner) performScan() (invertedJoinerState, *execinfrapb.Produ return ijStateUnknown, ij.DrainHelper() } if shouldAdd { - rowIdx, err := ij.indexRows.AddRow(ij.Ctx, fetchedRow) + rowIdx, err := ij.indexRows.AddRow(ij.Ctx(), fetchedRow) if err != nil { ij.MoveToDraining(err) return ijStateUnknown, ij.DrainHelper() @@ -569,8 +569,8 @@ func (ij *invertedJoiner) performScan() (invertedJoinerState, *execinfrapb.Produ } } ij.joinedRowIdx = ij.batchedExprEval.evaluate() - ij.indexRows.SetupForRead(ij.Ctx, ij.joinedRowIdx) - log.VEventf(ij.Ctx, 1, "done evaluating expressions") + ij.indexRows.SetupForRead(ij.Ctx(), ij.joinedRowIdx) + log.VEventf(ij.Ctx(), 1, "done evaluating expressions") return ijEmittingRows, nil } @@ -587,7 +587,7 @@ func (ij *invertedJoiner) emitRow() ( ) { // Finished processing the batch. if ij.emitCursor.inputRowIdx >= len(ij.joinedRowIdx) { - log.VEventf(ij.Ctx, 1, "done emitting rows") + log.VEventf(ij.Ctx(), 1, "done emitting rows") // Ready for another input batch. Reset state. ij.inputRows = ij.inputRows[:0] ij.batchedExprEval.reset() @@ -595,7 +595,7 @@ func (ij *invertedJoiner) emitRow() ( ij.emitCursor.outputRowIdx = 0 ij.emitCursor.inputRowIdx = 0 ij.emitCursor.seenMatch = false - if err := ij.indexRows.UnsafeReset(ij.Ctx); err != nil { + if err := ij.indexRows.UnsafeReset(ij.Ctx()); err != nil { ij.MoveToDraining(err) return ijStateUnknown, nil, ij.DrainHelper() } @@ -625,7 +625,7 @@ func (ij *invertedJoiner) emitRow() ( inputRow := ij.inputRows[ij.emitCursor.inputRowIdx] joinedRowIdx := ij.joinedRowIdx[ij.emitCursor.inputRowIdx][ij.emitCursor.outputRowIdx] - indexedRow, err := ij.indexRows.GetRow(ij.Ctx, joinedRowIdx, false /* skip */) + indexedRow, err := ij.indexRows.GetRow(ij.Ctx(), joinedRowIdx, false /* skip */) if err != nil { ij.MoveToDraining(err) return ijStateUnknown, nil, ij.DrainHelper() @@ -639,7 +639,7 @@ func (ij *invertedJoiner) emitRow() ( skipRemaining := func() error { for ; ij.emitCursor.outputRowIdx < len(ij.joinedRowIdx[ij.emitCursor.inputRowIdx]); ij.emitCursor.outputRowIdx++ { idx := ij.joinedRowIdx[ij.emitCursor.inputRowIdx][ij.emitCursor.outputRowIdx] - if _, err := ij.indexRows.GetRow(ij.Ctx, idx, true /* skip */); err != nil { + if _, err := ij.indexRows.GetRow(ij.Ctx(), idx, true /* skip */); err != nil { return err } } @@ -740,14 +740,14 @@ func (ij *invertedJoiner) ConsumerClosed() { func (ij *invertedJoiner) close() { if ij.InternalClose() { if ij.fetcher != nil { - ij.fetcher.Close(ij.Ctx) + ij.fetcher.Close(ij.Ctx()) } if ij.indexRows != nil { - ij.indexRows.Close(ij.Ctx) + ij.indexRows.Close(ij.Ctx()) } - ij.MemMonitor.Stop(ij.Ctx) + ij.MemMonitor.Stop(ij.Ctx()) if ij.diskMonitor != nil { - ij.diskMonitor.Stop(ij.Ctx) + ij.diskMonitor.Stop(ij.Ctx()) } } } @@ -762,14 +762,14 @@ func (ij *invertedJoiner) execStatsForTrace() *execinfrapb.ComponentStats { if !ok { return nil } - ij.scanStats = execstats.GetScanStats(ij.Ctx, ij.ExecStatsTrace) + ij.scanStats = execstats.GetScanStats(ij.Ctx(), ij.ExecStatsTrace) ret := execinfrapb.ComponentStats{ Inputs: []execinfrapb.InputStats{is}, KV: execinfrapb.KVStats{ BytesRead: optional.MakeUint(uint64(ij.fetcher.GetBytesRead())), TuplesRead: fis.NumTuples, KVTime: fis.WaitTime, - ContentionTime: optional.MakeTimeValue(execstats.GetCumulativeContentionTime(ij.Ctx, ij.ExecStatsTrace)), + ContentionTime: optional.MakeTimeValue(execstats.GetCumulativeContentionTime(ij.Ctx(), ij.ExecStatsTrace)), BatchRequestsIssued: optional.MakeUint(uint64(ij.fetcher.GetBatchRequestsIssued())), }, Exec: execinfrapb.ExecStats{ @@ -788,7 +788,7 @@ func (ij *invertedJoiner) generateMeta() []execinfrapb.ProducerMetadata { meta.Metrics = execinfrapb.GetMetricsMeta() meta.Metrics.BytesRead = ij.fetcher.GetBytesRead() meta.Metrics.RowsRead = ij.rowsRead - if tfs := execinfra.GetLeafTxnFinalState(ij.Ctx, ij.FlowCtx.Txn); tfs != nil { + if tfs := execinfra.GetLeafTxnFinalState(ij.Ctx(), ij.FlowCtx.Txn); tfs != nil { trailingMeta = append(trailingMeta, execinfrapb.ProducerMetadata{LeafTxnFinalState: tfs}) } return trailingMeta diff --git a/pkg/sql/rowexec/joinreader.go b/pkg/sql/rowexec/joinreader.go index 22f6295e2cca..883ba873d5c7 100644 --- a/pkg/sql/rowexec/joinreader.go +++ b/pkg/sql/rowexec/joinreader.go @@ -738,7 +738,7 @@ func (jr *joinReader) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetadata) meta = jr.DrainHelper() jr.runningState = jrStateUnknown default: - log.Fatalf(jr.Ctx, "unsupported state: %d", jr.runningState) + log.Fatalf(jr.Ctx(), "unsupported state: %d", jr.runningState) } if row == nil && meta == nil { continue @@ -915,7 +915,7 @@ func (jr *joinReader) readInput() ( } if len(jr.scratchInputRows) == 0 { - log.VEventf(jr.Ctx, 1, "no more input rows") + log.VEventf(jr.Ctx(), 1, "no more input rows") if outRow != nil { return jrReadyToDrain, outRow, nil } @@ -923,7 +923,7 @@ func (jr *joinReader) readInput() ( jr.MoveToDraining(nil) return jrStateUnknown, nil, jr.DrainHelper() } - log.VEventf(jr.Ctx, 1, "read %d input rows", len(jr.scratchInputRows)) + log.VEventf(jr.Ctx(), 1, "read %d input rows", len(jr.scratchInputRows)) if jr.groupingState != nil && len(jr.scratchInputRows) > 0 { jr.updateGroupingStateForNonEmptyBatch() @@ -979,7 +979,7 @@ func (jr *joinReader) readInput() ( } } - log.VEventf(jr.Ctx, 1, "scanning %d spans", len(spans)) + log.VEventf(jr.Ctx(), 1, "scanning %d spans", len(spans)) // Note that the fetcher takes ownership of the spans slice - it will modify // it and perform the memory accounting. We don't care about the // modification here, but we want to be conscious about the memory @@ -997,7 +997,7 @@ func (jr *joinReader) readInput() ( } } if err = jr.fetcher.StartScan( - jr.Ctx, spans, spanIDs, bytesLimit, rowinfra.NoRowLimit, + jr.Ctx(), spans, spanIDs, bytesLimit, rowinfra.NoRowLimit, ); err != nil { jr.MoveToDraining(err) return jrStateUnknown, nil, jr.DrainHelper() @@ -1010,7 +1010,7 @@ func (jr *joinReader) readInput() ( func (jr *joinReader) performLookup() (joinReaderState, *execinfrapb.ProducerMetadata) { for { // Fetch the next row and tell the strategy to process it. - lookedUpRow, spanID, err := jr.fetcher.NextRow(jr.Ctx) + lookedUpRow, spanID, err := jr.fetcher.NextRow(jr.Ctx()) if err != nil { jr.MoveToDraining(scrub.UnwrapScrubError(err)) return jrStateUnknown, jr.DrainHelper() @@ -1022,7 +1022,7 @@ func (jr *joinReader) performLookup() (joinReaderState, *execinfrapb.ProducerMet jr.rowsRead++ jr.curBatchRowsRead++ - if nextState, err := jr.strategy.processLookedUpRow(jr.Ctx, lookedUpRow, spanID); err != nil { + if nextState, err := jr.strategy.processLookedUpRow(jr.Ctx(), lookedUpRow, spanID); err != nil { jr.MoveToDraining(err) return jrStateUnknown, jr.DrainHelper() } else if nextState != jrPerformingLookup { @@ -1055,13 +1055,13 @@ func (jr *joinReader) performLookup() (joinReaderState, *execinfrapb.ProducerMet sortSpans(spans, spanIDs) } - log.VEventf(jr.Ctx, 1, "scanning %d remote spans", len(spans)) + log.VEventf(jr.Ctx(), 1, "scanning %d remote spans", len(spans)) bytesLimit := rowinfra.GetDefaultBatchBytesLimit(jr.EvalCtx.TestingKnobs.ForceProductionValues) if !jr.shouldLimitBatches { bytesLimit = rowinfra.NoBytesLimit } if err := jr.fetcher.StartScan( - jr.Ctx, spans, spanIDs, bytesLimit, rowinfra.NoRowLimit, + jr.Ctx(), spans, spanIDs, bytesLimit, rowinfra.NoRowLimit, ); err != nil { jr.MoveToDraining(err) return jrStateUnknown, jr.DrainHelper() @@ -1070,8 +1070,8 @@ func (jr *joinReader) performLookup() (joinReaderState, *execinfrapb.ProducerMet } } - log.VEvent(jr.Ctx, 1, "done joining rows") - jr.strategy.prepareToEmit(jr.Ctx) + log.VEvent(jr.Ctx(), 1, "done joining rows") + jr.strategy.prepareToEmit(jr.Ctx()) // Check if the strategy spilled to disk and reduce the batch size if it // did. @@ -1102,7 +1102,7 @@ func (jr *joinReader) emitRow() ( rowenc.EncDatumRow, *execinfrapb.ProducerMetadata, ) { - rowToEmit, nextState, err := jr.strategy.nextRowToEmit(jr.Ctx) + rowToEmit, nextState, err := jr.strategy.nextRowToEmit(jr.Ctx()) if err != nil { jr.MoveToDraining(err) return jrStateUnknown, nil, jr.DrainHelper() @@ -1134,26 +1134,26 @@ func (jr *joinReader) ConsumerClosed() { func (jr *joinReader) close() { if jr.InternalClose() { if jr.fetcher != nil { - jr.fetcher.Close(jr.Ctx) + jr.fetcher.Close(jr.Ctx()) } if jr.usesStreamer { - jr.streamerInfo.budgetAcc.Close(jr.Ctx) - jr.streamerInfo.txnKVStreamerMemAcc.Close(jr.Ctx) - jr.streamerInfo.unlimitedMemMonitor.Stop(jr.Ctx) + jr.streamerInfo.budgetAcc.Close(jr.Ctx()) + jr.streamerInfo.txnKVStreamerMemAcc.Close(jr.Ctx()) + jr.streamerInfo.unlimitedMemMonitor.Stop(jr.Ctx()) if jr.streamerInfo.diskMonitor != nil { - jr.streamerInfo.diskMonitor.Stop(jr.Ctx) + jr.streamerInfo.diskMonitor.Stop(jr.Ctx()) } } - jr.strategy.close(jr.Ctx) - jr.memAcc.Close(jr.Ctx) + jr.strategy.close(jr.Ctx()) + jr.memAcc.Close(jr.Ctx()) if jr.limitedMemMonitor != nil { - jr.limitedMemMonitor.Stop(jr.Ctx) + jr.limitedMemMonitor.Stop(jr.Ctx()) } if jr.MemMonitor != nil { - jr.MemMonitor.Stop(jr.Ctx) + jr.MemMonitor.Stop(jr.Ctx()) } if jr.diskMonitor != nil { - jr.diskMonitor.Stop(jr.Ctx) + jr.diskMonitor.Stop(jr.Ctx()) } } } @@ -1169,14 +1169,14 @@ func (jr *joinReader) execStatsForTrace() *execinfrapb.ComponentStats { return nil } - jr.scanStats = execstats.GetScanStats(jr.Ctx, jr.ExecStatsTrace) + jr.scanStats = execstats.GetScanStats(jr.Ctx(), jr.ExecStatsTrace) ret := &execinfrapb.ComponentStats{ Inputs: []execinfrapb.InputStats{is}, KV: execinfrapb.KVStats{ BytesRead: optional.MakeUint(uint64(jr.fetcher.GetBytesRead())), TuplesRead: fis.NumTuples, KVTime: fis.WaitTime, - ContentionTime: optional.MakeTimeValue(execstats.GetCumulativeContentionTime(jr.Ctx, jr.ExecStatsTrace)), + ContentionTime: optional.MakeTimeValue(execstats.GetCumulativeContentionTime(jr.Ctx(), jr.ExecStatsTrace)), BatchRequestsIssued: optional.MakeUint(uint64(jr.fetcher.GetBatchRequestsIssued())), }, Output: jr.OutputHelper.Stats(), @@ -1203,7 +1203,7 @@ func (jr *joinReader) generateMeta() []execinfrapb.ProducerMetadata { meta.Metrics = execinfrapb.GetMetricsMeta() meta.Metrics.RowsRead = jr.rowsRead meta.Metrics.BytesRead = jr.fetcher.GetBytesRead() - if tfs := execinfra.GetLeafTxnFinalState(jr.Ctx, jr.txn); tfs != nil { + if tfs := execinfra.GetLeafTxnFinalState(jr.Ctx(), jr.txn); tfs != nil { trailingMeta = append(trailingMeta, execinfrapb.ProducerMetadata{LeafTxnFinalState: tfs}) } return trailingMeta diff --git a/pkg/sql/rowexec/joinreader_strategies.go b/pkg/sql/rowexec/joinreader_strategies.go index 657dfdcd9861..dcb42caf786a 100644 --- a/pkg/sql/rowexec/joinreader_strategies.go +++ b/pkg/sql/rowexec/joinreader_strategies.go @@ -187,7 +187,7 @@ func (s *joinReaderNoOrderingStrategy) generateRemoteSpans() (roachpb.Spans, []i return nil, nil, errors.AssertionFailedf("generateRemoteSpans can only be called for locality optimized lookup joins") } s.remoteSpansGenerated = true - return gen.generateRemoteSpans(s.Ctx, s.inputRows) + return gen.generateRemoteSpans(s.Ctx(), s.inputRows) } func (s *joinReaderNoOrderingStrategy) generatedRemoteSpans() bool { @@ -200,7 +200,7 @@ func (s *joinReaderNoOrderingStrategy) processLookupRows( s.inputRows = rows s.remoteSpansGenerated = false s.emitState.unmatchedInputRowIndicesInitialized = false - return s.generateSpans(s.Ctx, s.inputRows) + return s.generateSpans(s.Ctx(), s.inputRows) } func (s *joinReaderNoOrderingStrategy) processLookedUpRow( @@ -320,13 +320,13 @@ func (s *joinReaderNoOrderingStrategy) spilled() bool { return false } func (s *joinReaderNoOrderingStrategy) growMemoryAccount( memAcc *mon.BoundAccount, delta int64, ) error { - return addWorkmemHint(memAcc.Grow(s.Ctx, delta)) + return addWorkmemHint(memAcc.Grow(s.Ctx(), delta)) } func (s *joinReaderNoOrderingStrategy) resizeMemoryAccount( memAcc *mon.BoundAccount, oldSz, newSz int64, ) error { - return addWorkmemHint(memAcc.Resize(s.Ctx, oldSz, newSz)) + return addWorkmemHint(memAcc.Resize(s.Ctx(), oldSz, newSz)) } func (s *joinReaderNoOrderingStrategy) close(ctx context.Context) { @@ -405,7 +405,7 @@ func (s *joinReaderIndexJoinStrategy) processLookupRows( rows []rowenc.EncDatumRow, ) (roachpb.Spans, []int, error) { s.inputRows = rows - return s.generateSpans(s.Ctx, s.inputRows) + return s.generateSpans(s.Ctx(), s.inputRows) } func (s *joinReaderIndexJoinStrategy) processLookedUpRow( @@ -435,13 +435,13 @@ func (s *joinReaderIndexJoinStrategy) spilled() bool { func (s *joinReaderIndexJoinStrategy) growMemoryAccount( memAcc *mon.BoundAccount, delta int64, ) error { - return addWorkmemHint(memAcc.Grow(s.Ctx, delta)) + return addWorkmemHint(memAcc.Grow(s.Ctx(), delta)) } func (s *joinReaderIndexJoinStrategy) resizeMemoryAccount( memAcc *mon.BoundAccount, oldSz, newSz int64, ) error { - return addWorkmemHint(memAcc.Resize(s.Ctx, oldSz, newSz)) + return addWorkmemHint(memAcc.Resize(s.Ctx(), oldSz, newSz)) } func (s *joinReaderIndexJoinStrategy) close(ctx context.Context) { @@ -592,7 +592,7 @@ func (s *joinReaderOrderingStrategy) generateRemoteSpans() (roachpb.Spans, []int return nil, nil, errors.AssertionFailedf("generateRemoteSpans can only be called for locality optimized lookup joins") } s.remoteSpansGenerated = true - return gen.generateRemoteSpans(s.Ctx, s.inputRows) + return gen.generateRemoteSpans(s.Ctx(), s.inputRows) } func (s *joinReaderOrderingStrategy) generatedRemoteSpans() bool { @@ -642,7 +642,7 @@ func (s *joinReaderOrderingStrategy) processLookupRows( s.inputRows = rows s.remoteSpansGenerated = false - return s.generateSpans(s.Ctx, s.inputRows) + return s.generateSpans(s.Ctx(), s.inputRows) } func (s *joinReaderOrderingStrategy) processLookedUpRow( @@ -807,7 +807,7 @@ func (s *joinReaderOrderingStrategy) nextRowToEmit( lookedUpRow = s.emitCursor.notBufferedRow } else { lookedUpRow, err = s.lookedUpRows.GetRow( - s.Ctx, lookedUpRows[s.emitCursor.outputRowIdx], false, /* skip */ + s.Ctx(), lookedUpRows[s.emitCursor.outputRowIdx], false, /* skip */ ) } if err != nil { @@ -858,16 +858,16 @@ func (s *joinReaderOrderingStrategy) close(ctx context.Context) { func (s *joinReaderOrderingStrategy) growMemoryAccount( memAcc *mon.BoundAccount, delta int64, ) error { - if err := memAcc.Grow(s.Ctx, delta); err != nil { + if err := memAcc.Grow(s.Ctx(), delta); err != nil { // We don't have enough budget to account for the new size. Check // whether we can spill the looked up rows to disk to free up the // budget. - spilled, spillErr := s.lookedUpRows.SpillToDisk(s.Ctx) + spilled, spillErr := s.lookedUpRows.SpillToDisk(s.Ctx()) if !spilled || spillErr != nil { return addWorkmemHint(errors.CombineErrors(err, spillErr)) } // We freed up some budget, so try to perform the accounting again. - return addWorkmemHint(memAcc.Grow(s.Ctx, delta)) + return addWorkmemHint(memAcc.Grow(s.Ctx(), delta)) } return nil } @@ -879,16 +879,16 @@ func (s *joinReaderOrderingStrategy) growMemoryAccount( func (s *joinReaderOrderingStrategy) resizeMemoryAccount( memAcc *mon.BoundAccount, oldSz, newSz int64, ) error { - if err := memAcc.Resize(s.Ctx, oldSz, newSz); err != nil { + if err := memAcc.Resize(s.Ctx(), oldSz, newSz); err != nil { // We don't have enough budget to account for the new size. Check // whether we can spill the looked up rows to disk to free up the // budget. - spilled, spillErr := s.lookedUpRows.SpillToDisk(s.Ctx) + spilled, spillErr := s.lookedUpRows.SpillToDisk(s.Ctx()) if !spilled || spillErr != nil { return addWorkmemHint(errors.CombineErrors(err, spillErr)) } // We freed up some budget, so try to perform the accounting again. - return addWorkmemHint(memAcc.Resize(s.Ctx, oldSz, newSz)) + return addWorkmemHint(memAcc.Resize(s.Ctx(), oldSz, newSz)) } return nil } diff --git a/pkg/sql/rowexec/mergejoiner.go b/pkg/sql/rowexec/mergejoiner.go index 827215321b03..16c7b06484b3 100644 --- a/pkg/sql/rowexec/mergejoiner.go +++ b/pkg/sql/rowexec/mergejoiner.go @@ -232,7 +232,7 @@ func (m *mergeJoiner) nextRow() (rowenc.EncDatumRow, *execinfrapb.ProducerMetada // TODO(paul): Investigate (with benchmarks) whether or not it's // worthwhile to only buffer one row from the right stream per batch // for semi-joins. - m.leftRows, m.rightRows, meta = m.streamMerger.NextBatch(m.Ctx, m.EvalCtx) + m.leftRows, m.rightRows, meta = m.streamMerger.NextBatch(m.Ctx(), m.EvalCtx) if meta != nil { return nil, meta } @@ -251,8 +251,8 @@ func (m *mergeJoiner) nextRow() (rowenc.EncDatumRow, *execinfrapb.ProducerMetada func (m *mergeJoiner) close() { if m.InternalClose() { - m.streamMerger.close(m.Ctx) - m.MemMonitor.Stop(m.Ctx) + m.streamMerger.close(m.Ctx()) + m.MemMonitor.Stop(m.Ctx()) } } diff --git a/pkg/sql/rowexec/processors_test.go b/pkg/sql/rowexec/processors_test.go index b9126391bd97..07e7be0c6571 100644 --- a/pkg/sql/rowexec/processors_test.go +++ b/pkg/sql/rowexec/processors_test.go @@ -315,7 +315,8 @@ func TestAggregatorSpecAggregationEquals(t *testing.T) { func TestProcessorBaseContext(t *testing.T) { defer leaktest.AfterTest(t)() - ctx := context.Background() + // Use a custom context to distinguish it from the background one. + ctx := context.WithValue(context.Background(), struct{}{}, struct{}{}) st := cluster.MakeTestingClusterSettings() runTest := func(t *testing.T, f func(noop *noopProcessor)) { @@ -331,18 +332,22 @@ func TestProcessorBaseContext(t *testing.T) { if err != nil { t.Fatal(err) } + // Before Start we should get the background context. + if noop.Ctx() != context.Background() { + t.Fatalf("ProcessorBase.Ctx() didn't return the background context before Start") + } noop.Start(ctx) - origCtx := noop.Ctx + origCtx := noop.Ctx() // The context should be valid after Start but before Next is called in case // ConsumerDone or ConsumerClosed are called without calling Next. - if noop.Ctx == nil { + if noop.Ctx() == context.Background() { t.Fatalf("ProcessorBase.ctx not initialized") } f(noop) // The context should be reset after ConsumerClosed is called so that any // subsequent logging calls will not operate on closed spans. - if noop.Ctx != origCtx { + if noop.Ctx() != origCtx { t.Fatalf("ProcessorBase.ctx not reset on close") } } diff --git a/pkg/sql/rowexec/project_set.go b/pkg/sql/rowexec/project_set.go index 9aacf6891448..370ad58449e0 100644 --- a/pkg/sql/rowexec/project_set.go +++ b/pkg/sql/rowexec/project_set.go @@ -159,7 +159,7 @@ func (ps *projectSetProcessor) nextInputRow() ( // First, make sure to close its ValueGenerator from the previous // input row (if it exists). if ps.gens[i] != nil { - ps.gens[i].Close(ps.Ctx) + ps.gens[i].Close(ps.Ctx()) ps.gens[i] = nil } @@ -182,7 +182,7 @@ func (ps *projectSetProcessor) nextInputRow() ( // Store the generator before Start so that it'll be closed even if // Start returns an error. ps.gens[i] = gen - if err := gen.Start(ps.Ctx, ps.FlowCtx.Txn); err != nil { + if err := gen.Start(ps.Ctx(), ps.FlowCtx.Txn); err != nil { return nil, nil, err } } @@ -203,7 +203,7 @@ func (ps *projectSetProcessor) nextGeneratorValues() (newValAvail bool, err erro numCols := int(ps.spec.NumColsPerGen[i]) if !ps.done[i] { // Yes; check whether this source still has some values available. - hasVals, err := gen.Next(ps.Ctx) + hasVals, err := gen.Next(ps.Ctx()) if err != nil { return false, err } @@ -314,7 +314,7 @@ func (ps *projectSetProcessor) close() { ps.InternalCloseEx(func() { for _, gen := range ps.gens { if gen != nil { - gen.Close(ps.Ctx) + gen.Close(ps.Ctx()) } } }) diff --git a/pkg/sql/rowexec/sample_aggregator.go b/pkg/sql/rowexec/sample_aggregator.go index cba01aacd3dc..67ebf28a1be5 100644 --- a/pkg/sql/rowexec/sample_aggregator.go +++ b/pkg/sql/rowexec/sample_aggregator.go @@ -205,9 +205,9 @@ func (s *sampleAggregator) Run(ctx context.Context) { func (s *sampleAggregator) close() { if s.InternalClose() { - s.memAcc.Close(s.Ctx) - s.tempMemAcc.Close(s.Ctx) - s.MemMonitor.Stop(s.Ctx) + s.memAcc.Close(s.Ctx()) + s.tempMemAcc.Close(s.Ctx()) + s.MemMonitor.Stop(s.Ctx()) } } diff --git a/pkg/sql/rowexec/sampler.go b/pkg/sql/rowexec/sampler.go index 365349fbeae9..c92e33208e5d 100644 --- a/pkg/sql/rowexec/sampler.go +++ b/pkg/sql/rowexec/sampler.go @@ -478,8 +478,8 @@ func (s *samplerProcessor) sampleRow( func (s *samplerProcessor) close() { if s.InternalClose() { - s.memAcc.Close(s.Ctx) - s.MemMonitor.Stop(s.Ctx) + s.memAcc.Close(s.Ctx()) + s.MemMonitor.Stop(s.Ctx()) } } diff --git a/pkg/sql/rowexec/sorter.go b/pkg/sql/rowexec/sorter.go index 3d2bd1734b59..3fc9a882ad04 100644 --- a/pkg/sql/rowexec/sorter.go +++ b/pkg/sql/rowexec/sorter.go @@ -117,10 +117,10 @@ func (s *sorterBase) close() { if s.i != nil { s.i.Close() } - s.rows.Close(s.Ctx) - s.MemMonitor.Stop(s.Ctx) + s.rows.Close(s.Ctx()) + s.MemMonitor.Stop(s.Ctx()) if s.diskMonitor != nil { - s.diskMonitor.Stop(s.Ctx) + s.diskMonitor.Stop(s.Ctx()) } } } @@ -250,13 +250,13 @@ func (s *sortAllProcessor) fill() (ok bool, _ error) { break } - if err := s.rows.AddRow(s.Ctx, row); err != nil { + if err := s.rows.AddRow(s.Ctx(), row); err != nil { return false, err } } - s.rows.Sort(s.Ctx) + s.rows.Sort(s.Ctx()) - s.i = s.rows.NewFinalIterator(s.Ctx) + s.i = s.rows.NewFinalIterator(s.Ctx()) s.i.Rewind() return true, nil } @@ -421,7 +421,7 @@ func newSortChunksProcessor( ); err != nil { return nil, err } - proc.i = proc.rows.NewFinalIterator(proc.Ctx) + proc.i = proc.rows.NewFinalIterator(proc.Ctx()) return proc, nil } @@ -448,7 +448,7 @@ func (s *sortChunksProcessor) chunkCompleted( // if a metadata record was encountered). The caller is expected to drain when // this returns false. func (s *sortChunksProcessor) fill() (bool, error) { - ctx := s.Ctx + ctx := s.Ctx() var meta *execinfrapb.ProducerMetadata @@ -518,7 +518,7 @@ func (s *sortChunksProcessor) Start(ctx context.Context) { // Next is part of the RowSource interface. func (s *sortChunksProcessor) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetadata) { - ctx := s.Ctx + ctx := s.Ctx() for s.State == execinfra.StateRunning { ok, err := s.i.Valid() if err != nil { diff --git a/pkg/sql/rowexec/tablereader.go b/pkg/sql/rowexec/tablereader.go index 2ea71b40f80b..40745a862d94 100644 --- a/pkg/sql/rowexec/tablereader.go +++ b/pkg/sql/rowexec/tablereader.go @@ -254,7 +254,7 @@ func TestingSetScannedRowProgressFrequency(val int64) func() { func (tr *tableReader) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetadata) { for tr.State == execinfra.StateRunning { if !tr.scanStarted { - err := tr.startScan(tr.Ctx) + err := tr.startScan(tr.Ctx()) if err != nil { tr.MoveToDraining(err) break @@ -269,7 +269,7 @@ func (tr *tableReader) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetadata return nil, meta } - row, _, err := tr.fetcher.NextRow(tr.Ctx) + row, _, err := tr.fetcher.NextRow(tr.Ctx()) if row == nil || err != nil { tr.MoveToDraining(err) break @@ -290,7 +290,7 @@ func (tr *tableReader) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetadata func (tr *tableReader) close() { if tr.InternalClose() { if tr.fetcher != nil { - tr.fetcher.Close(tr.Ctx) + tr.fetcher.Close(tr.Ctx()) } } } @@ -306,13 +306,13 @@ func (tr *tableReader) execStatsForTrace() *execinfrapb.ComponentStats { if !ok { return nil } - tr.scanStats = execstats.GetScanStats(tr.Ctx, tr.ExecStatsTrace) + tr.scanStats = execstats.GetScanStats(tr.Ctx(), tr.ExecStatsTrace) ret := &execinfrapb.ComponentStats{ KV: execinfrapb.KVStats{ BytesRead: optional.MakeUint(uint64(tr.fetcher.GetBytesRead())), TuplesRead: is.NumTuples, KVTime: is.WaitTime, - ContentionTime: optional.MakeTimeValue(execstats.GetCumulativeContentionTime(tr.Ctx, tr.ExecStatsTrace)), + ContentionTime: optional.MakeTimeValue(execstats.GetCumulativeContentionTime(tr.Ctx(), tr.ExecStatsTrace)), BatchRequestsIssued: optional.MakeUint(uint64(tr.fetcher.GetBatchRequestsIssued())), }, Output: tr.OutputHelper.Stats(), @@ -326,13 +326,13 @@ func (tr *tableReader) generateMeta() []execinfrapb.ProducerMetadata { if !tr.ignoreMisplannedRanges { nodeID, ok := tr.FlowCtx.NodeID.OptionalNodeID() if ok { - ranges := execinfra.MisplannedRanges(tr.Ctx, tr.SpansCopy, nodeID, tr.FlowCtx.Cfg.RangeCache) + ranges := execinfra.MisplannedRanges(tr.Ctx(), tr.SpansCopy, nodeID, tr.FlowCtx.Cfg.RangeCache) if ranges != nil { trailingMeta = append(trailingMeta, execinfrapb.ProducerMetadata{Ranges: ranges}) } } } - if tfs := execinfra.GetLeafTxnFinalState(tr.Ctx, tr.FlowCtx.Txn); tfs != nil { + if tfs := execinfra.GetLeafTxnFinalState(tr.Ctx(), tr.FlowCtx.Txn); tfs != nil { trailingMeta = append(trailingMeta, execinfrapb.ProducerMetadata{LeafTxnFinalState: tfs}) } diff --git a/pkg/sql/rowexec/windower.go b/pkg/sql/rowexec/windower.go index 773e4029166b..687c19ed549d 100644 --- a/pkg/sql/rowexec/windower.go +++ b/pkg/sql/rowexec/windower.go @@ -223,7 +223,7 @@ func (w *windower) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetadata) { case windowerEmittingRows: w.runningState, row, meta = w.emitRow() default: - log.Fatalf(w.Ctx, "unsupported state: %d", w.runningState) + log.Fatalf(w.Ctx(), "unsupported state: %d", w.runningState) } if row == nil && meta == nil { @@ -245,16 +245,16 @@ func (w *windower) close() { if w.allRowsIterator != nil { w.allRowsIterator.Close() } - w.allRowsPartitioned.Close(w.Ctx) + w.allRowsPartitioned.Close(w.Ctx()) if w.partition != nil { - w.partition.Close(w.Ctx) + w.partition.Close(w.Ctx()) } for _, builtin := range w.builtins { - builtin.Close(w.Ctx, w.EvalCtx) + builtin.Close(w.Ctx(), w.EvalCtx) } - w.acc.Close(w.Ctx) - w.MemMonitor.Stop(w.Ctx) - w.diskMonitor.Stop(w.Ctx) + w.acc.Close(w.Ctx()) + w.MemMonitor.Stop(w.Ctx()) + w.diskMonitor.Stop(w.Ctx()) } } @@ -278,17 +278,17 @@ func (w *windower) accumulateRows() ( return windowerAccumulating, nil, meta } if row == nil { - log.VEvent(w.Ctx, 1, "accumulation complete") + log.VEvent(w.Ctx(), 1, "accumulation complete") w.inputDone = true // We need to sort all the rows based on partitionBy columns so that all // rows belonging to the same hash bucket are contiguous. - w.allRowsPartitioned.Sort(w.Ctx) + w.allRowsPartitioned.Sort(w.Ctx()) break } // The underlying row container will decode all datums as necessary, so we // don't need to worry about that. - if err := w.allRowsPartitioned.AddRow(w.Ctx, row); err != nil { + if err := w.allRowsPartitioned.AddRow(w.Ctx(), row); err != nil { w.MoveToDraining(err) return windowerStateUnknown, nil, w.DrainHelper() } @@ -311,7 +311,7 @@ func (w *windower) emitRow() (windowerState, rowenc.EncDatumRow, *execinfrapb.Pr return windowerStateUnknown, nil, w.DrainHelper() } - if err := w.computeWindowFunctions(w.Ctx, w.EvalCtx); err != nil { + if err := w.computeWindowFunctions(w.Ctx(), w.EvalCtx); err != nil { w.MoveToDraining(err) return windowerStateUnknown, nil, w.DrainHelper() } @@ -341,7 +341,7 @@ func (w *windower) emitRow() (windowerState, rowenc.EncDatumRow, *execinfrapb.Pr func (w *windower) spillAllRowsToDisk() error { if w.allRowsPartitioned != nil { if !w.allRowsPartitioned.UsingDisk() { - if err := w.allRowsPartitioned.SpillToDisk(w.Ctx); err != nil { + if err := w.allRowsPartitioned.SpillToDisk(w.Ctx()); err != nil { return err } } else { @@ -349,7 +349,7 @@ func (w *windower) spillAllRowsToDisk() error { // w.partition if possible. if w.partition != nil { if !w.partition.UsingDisk() { - if err := w.partition.SpillToDisk(w.Ctx); err != nil { + if err := w.partition.SpillToDisk(w.Ctx()); err != nil { return err } } @@ -363,12 +363,12 @@ func (w *windower) spillAllRowsToDisk() error { // error, it forces all rows to spill and attempts to grow acc by usage // one more time. func (w *windower) growMemAccount(acc *mon.BoundAccount, usage int64) error { - if err := acc.Grow(w.Ctx, usage); err != nil { + if err := acc.Grow(w.Ctx(), usage); err != nil { if sqlerrors.IsOutOfMemoryError(err) { if err := w.spillAllRowsToDisk(); err != nil { return err } - if err := acc.Grow(w.Ctx, usage); err != nil { + if err := acc.Grow(w.Ctx(), usage); err != nil { return err } } else { @@ -694,7 +694,7 @@ func (w *windower) computeWindowFunctions(ctx context.Context, evalCtx *eval.Con } } } - if err := w.partition.AddRow(w.Ctx, row); err != nil { + if err := w.partition.AddRow(w.Ctx(), row); err != nil { return err } } @@ -708,7 +708,7 @@ func (w *windower) computeWindowFunctions(ctx context.Context, evalCtx *eval.Con func (w *windower) populateNextOutputRow() (bool, error) { if w.partitionIdx < len(w.partitionSizes) { if w.allRowsIterator == nil { - w.allRowsIterator = w.allRowsPartitioned.NewUnmarkedIterator(w.Ctx) + w.allRowsIterator = w.allRowsPartitioned.NewUnmarkedIterator(w.Ctx()) w.allRowsIterator.Rewind() } // rowIdx is the index of the next row to be emitted from the diff --git a/pkg/sql/rowexec/zigzagjoiner.go b/pkg/sql/rowexec/zigzagjoiner.go index fdeaeb43781e..25c9927d10b0 100644 --- a/pkg/sql/rowexec/zigzagjoiner.go +++ b/pkg/sql/rowexec/zigzagjoiner.go @@ -493,9 +493,9 @@ func (z *zigzagJoiner) setupInfo( func (z *zigzagJoiner) close() { if z.InternalClose() { for i := range z.infos { - z.infos[i].fetcher.Close(z.Ctx) + z.infos[i].fetcher.Close(z.Ctx()) } - log.VEventf(z.Ctx, 2, "exiting zigzag joiner run") + log.VEventf(z.Ctx(), 2, "exiting zigzag joiner run") } } @@ -790,17 +790,17 @@ func (z *zigzagJoiner) maybeFetchInitialRow() error { curInfo := &z.infos[z.side] err := curInfo.fetcher.StartScan( - z.Ctx, + z.Ctx(), roachpb.Spans{roachpb.Span{Key: curInfo.key, EndKey: curInfo.endKey}}, nil, /* spanIDs */ rowinfra.GetDefaultBatchBytesLimit(z.EvalCtx.TestingKnobs.ForceProductionValues), zigzagJoinerBatchSize, ) if err != nil { - log.Errorf(z.Ctx, "scan error: %s", err) + log.Errorf(z.Ctx(), "scan error: %s", err) return err } - fetchedRow, err := z.fetchRow(z.Ctx) + fetchedRow, err := z.fetchRow(z.Ctx()) if err != nil { return scrub.UnwrapScrubError(err) } @@ -818,7 +818,7 @@ func (z *zigzagJoiner) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetadata z.MoveToDraining(err) break } - row, err := z.nextRow(z.Ctx) + row, err := z.nextRow(z.Ctx()) if err != nil { z.MoveToDraining(err) break @@ -843,11 +843,11 @@ func (z *zigzagJoiner) ConsumerClosed() { // execStatsForTrace implements ProcessorBase.ExecStatsForTrace. func (z *zigzagJoiner) execStatsForTrace() *execinfrapb.ComponentStats { - z.scanStats = execstats.GetScanStats(z.Ctx, z.ExecStatsTrace) + z.scanStats = execstats.GetScanStats(z.Ctx(), z.ExecStatsTrace) kvStats := execinfrapb.KVStats{ BytesRead: optional.MakeUint(uint64(z.getBytesRead())), - ContentionTime: optional.MakeTimeValue(execstats.GetCumulativeContentionTime(z.Ctx, z.ExecStatsTrace)), + ContentionTime: optional.MakeTimeValue(execstats.GetCumulativeContentionTime(z.Ctx(), z.ExecStatsTrace)), BatchRequestsIssued: optional.MakeUint(uint64(z.getBatchRequestsIssued())), } execstats.PopulateKVMVCCStats(&kvStats, &z.scanStats) @@ -895,7 +895,7 @@ func (z *zigzagJoiner) generateMeta() []execinfrapb.ProducerMetadata { meta.Metrics = execinfrapb.GetMetricsMeta() meta.Metrics.BytesRead = z.getBytesRead() meta.Metrics.RowsRead = z.getRowsRead() - if tfs := execinfra.GetLeafTxnFinalState(z.Ctx, z.FlowCtx.Txn); tfs != nil { + if tfs := execinfra.GetLeafTxnFinalState(z.Ctx(), z.FlowCtx.Txn); tfs != nil { trailingMeta = append(trailingMeta, execinfrapb.ProducerMetadata{LeafTxnFinalState: tfs}) } return trailingMeta