diff --git a/pkg/ccl/backupccl/backup_processor.go b/pkg/ccl/backupccl/backup_processor.go index 27026582776c..6db37a7f8e87 100644 --- a/pkg/ccl/backupccl/backup_processor.go +++ b/pkg/ccl/backupccl/backup_processor.go @@ -227,7 +227,7 @@ func (bp *backupDataProcessor) Next() (rowenc.EncDatumRow, *execinfrapb.Producer func (bp *backupDataProcessor) close() { bp.cancelAndWaitForWorker() if bp.InternalClose() { - bp.memAcc.Close(bp.Ctx) + bp.memAcc.Close(bp.Ctx()) } } diff --git a/pkg/ccl/backupccl/restore_data_processor.go b/pkg/ccl/backupccl/restore_data_processor.go index 438c32da178e..51d92b0d4413 100644 --- a/pkg/ccl/backupccl/restore_data_processor.go +++ b/pkg/ccl/backupccl/restore_data_processor.go @@ -568,8 +568,8 @@ func (rd *restoreDataProcessor) Next() (rowenc.EncDatumRow, *execinfrapb.Produce prog.ProgressDetails = *details case meta := <-rd.metaCh: return nil, meta - case <-rd.Ctx.Done(): - rd.MoveToDraining(rd.Ctx.Err()) + case <-rd.Ctx().Done(): + rd.MoveToDraining(rd.Ctx().Err()) return nil, rd.DrainHelper() } diff --git a/pkg/ccl/backupccl/restore_data_processor_test.go b/pkg/ccl/backupccl/restore_data_processor_test.go index 82449036b0fd..7f5f211aae56 100644 --- a/pkg/ccl/backupccl/restore_data_processor_test.go +++ b/pkg/ccl/backupccl/restore_data_processor_test.go @@ -374,8 +374,7 @@ func runTestIngest(t *testing.T, init func(*cluster.Settings)) { } expectedKVs := slurpSSTablesLatestKey(t, filepath.Join(dir, "foo"), slurp, srcPrefix, newPrefix) - mockRestoreDataProcessor, err := newTestingRestoreDataProcessor(ctx, &evalCtx, &flowCtx, - mockRestoreDataSpec) + mockRestoreDataProcessor, err := newTestingRestoreDataProcessor(&evalCtx, &flowCtx, mockRestoreDataSpec) require.NoError(t, err) ssts := make(chan mergedSST, 1) require.NoError(t, mockRestoreDataProcessor.openSSTs(ctx, restoreSpanEntry, ssts)) @@ -417,15 +416,11 @@ func runTestIngest(t *testing.T, init func(*cluster.Settings)) { } func newTestingRestoreDataProcessor( - ctx context.Context, - evalCtx *tree.EvalContext, - flowCtx *execinfra.FlowCtx, - spec execinfrapb.RestoreDataSpec, + evalCtx *tree.EvalContext, flowCtx *execinfra.FlowCtx, spec execinfrapb.RestoreDataSpec, ) (*restoreDataProcessor, error) { rd := &restoreDataProcessor{ ProcessorBase: execinfra.ProcessorBase{ ProcessorBaseNoHelper: execinfra.ProcessorBaseNoHelper{ - Ctx: ctx, EvalCtx: evalCtx, }, }, diff --git a/pkg/ccl/changefeedccl/changefeed_processors.go b/pkg/ccl/changefeedccl/changefeed_processors.go index eb7e940786e0..3d7fa51e198a 100644 --- a/pkg/ccl/changefeedccl/changefeed_processors.go +++ b/pkg/ccl/changefeedccl/changefeed_processors.go @@ -484,15 +484,15 @@ func (ca *changeAggregator) close() { } if ca.sink != nil { if err := ca.sink.Close(); err != nil { - log.Warningf(ca.Ctx, `error closing sink. goroutines may have leaked: %v`, err) + log.Warningf(ca.Ctx(), `error closing sink. goroutines may have leaked: %v`, err) } } - ca.memAcc.Close(ca.Ctx) + ca.memAcc.Close(ca.Ctx()) if ca.kvFeedMemMon != nil { - ca.kvFeedMemMon.Stop(ca.Ctx) + ca.kvFeedMemMon.Stop(ca.Ctx()) } - ca.MemMonitor.Stop(ca.Ctx) + ca.MemMonitor.Stop(ca.Ctx()) ca.InternalClose() } @@ -536,7 +536,7 @@ func (ca *changeAggregator) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMet // kvFeed, sends off this event to the event consumer, and flushes the sink // if necessary. func (ca *changeAggregator) tick() error { - event, err := ca.eventProducer.Get(ca.Ctx) + event, err := ca.eventProducer.Get(ca.Ctx()) if err != nil { return err } @@ -551,16 +551,16 @@ func (ca *changeAggregator) tick() error { ca.sliMetrics.AdmitLatency.RecordValue(timeutil.Since(event.Timestamp().GoTime()).Nanoseconds()) } ca.recentKVCount++ - return ca.eventConsumer.ConsumeEvent(ca.Ctx, event) + return ca.eventConsumer.ConsumeEvent(ca.Ctx(), event) case kvevent.TypeResolved: a := event.DetachAlloc() - a.Release(ca.Ctx) + a.Release(ca.Ctx()) resolved := event.Resolved() if ca.knobs.ShouldSkipResolved == nil || !ca.knobs.ShouldSkipResolved(resolved) { return ca.noteResolvedSpan(resolved) } case kvevent.TypeFlush: - return ca.sink.Flush(ca.Ctx) + return ca.sink.Flush(ca.Ctx()) } return nil @@ -601,7 +601,7 @@ func (ca *changeAggregator) flushFrontier() error { // otherwise, we could lose buffered messages and violate the // at-least-once guarantee. This is also true for checkpointing the // resolved spans in the job progress. - if err := ca.sink.Flush(ca.Ctx); err != nil { + if err := ca.sink.Flush(ca.Ctx()); err != nil { return err } @@ -626,7 +626,7 @@ func (ca *changeAggregator) flushFrontier() error { func (ca *changeAggregator) emitResolved(batch jobspb.ResolvedSpans) error { // TODO(smiskin): Remove post-22.2 - if !ca.flowCtx.Cfg.Settings.Version.IsActive(ca.Ctx, clusterversion.ChangefeedIdleness) { + if !ca.flowCtx.Cfg.Settings.Version.IsActive(ca.Ctx(), clusterversion.ChangefeedIdleness) { for _, resolved := range batch.ResolvedSpans { resolvedBytes, err := protoutil.Marshal(&resolved) if err != nil { @@ -1414,11 +1414,11 @@ func (cf *changeFrontier) close() { } if cf.sink != nil { if err := cf.sink.Close(); err != nil { - log.Warningf(cf.Ctx, `error closing sink. goroutines may have leaked: %v`, err) + log.Warningf(cf.Ctx(), `error closing sink. goroutines may have leaked: %v`, err) } } - cf.memAcc.Close(cf.Ctx) - cf.MemMonitor.Stop(cf.Ctx) + cf.memAcc.Close(cf.Ctx()) + cf.MemMonitor.Stop(cf.Ctx()) } } @@ -1506,7 +1506,7 @@ func (cf *changeFrontier) noteAggregatorProgress(d rowenc.EncDatum) error { } var resolvedSpans jobspb.ResolvedSpans - if cf.flowCtx.Cfg.Settings.Version.IsActive(cf.Ctx, clusterversion.ChangefeedIdleness) { + if cf.flowCtx.Cfg.Settings.Version.IsActive(cf.Ctx(), clusterversion.ChangefeedIdleness) { if err := protoutil.Unmarshal([]byte(*raw), &resolvedSpans); err != nil { return errors.NewAssertionErrorWithWrappedErrf(err, `unmarshalling aggregator progress update: %x`, raw) @@ -1534,7 +1534,7 @@ func (cf *changeFrontier) noteAggregatorProgress(d rowenc.EncDatum) error { // job progress update closure, but it currently doesn't pass along the info // we'd need to do it that way. if !resolved.Timestamp.IsEmpty() && resolved.Timestamp.Less(cf.highWaterAtStart) { - logcrash.ReportOrPanic(cf.Ctx, &cf.flowCtx.Cfg.Settings.SV, + logcrash.ReportOrPanic(cf.Ctx(), &cf.flowCtx.Cfg.Settings.SV, `got a span level timestamp %s for %s that is less than the initial high-water %s`, redact.Safe(resolved.Timestamp), resolved.Span, redact.Safe(cf.highWaterAtStart)) continue @@ -1636,7 +1636,7 @@ func (cf *changeFrontier) maybeCheckpointJob( if err != nil { return false, err } - cf.js.checkpointCompleted(cf.Ctx, timeutil.Since(checkpointStart)) + cf.js.checkpointCompleted(cf.Ctx(), timeutil.Since(checkpointStart)) return updated, nil } @@ -1652,7 +1652,7 @@ func (cf *changeFrontier) checkpointJobProgress( } cf.metrics.FrontierUpdates.Inc(1) var updateSkipped error - if err := cf.js.job.Update(cf.Ctx, nil, func( + if err := cf.js.job.Update(cf.Ctx(), nil, func( txn *kv.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater, ) error { // If we're unable to update the job due to the job state, such as during @@ -1677,8 +1677,8 @@ func (cf *changeFrontier) checkpointJobProgress( if !changefeedbase.ActiveProtectedTimestampsEnabled.Get(&cf.flowCtx.Cfg.Settings.SV) { timestampManager = cf.deprecatedManageProtectedTimestamps } - if err := timestampManager(cf.Ctx, txn, changefeedProgress); err != nil { - log.Warningf(cf.Ctx, "error managing protected timestamp record: %v", err) + if err := timestampManager(cf.Ctx(), txn, changefeedProgress); err != nil { + log.Warningf(cf.Ctx(), "error managing protected timestamp record: %v", err) return err } @@ -1702,7 +1702,7 @@ func (cf *changeFrontier) checkpointJobProgress( } if updateSkipped != nil { - log.Warningf(cf.Ctx, "skipping changefeed checkpoint: %s", updateSkipped) + log.Warningf(cf.Ctx(), "skipping changefeed checkpoint: %s", updateSkipped) return false, nil } @@ -1795,7 +1795,7 @@ func (cf *changeFrontier) maybeEmitResolved(newResolved hlc.Timestamp) error { if !shouldEmit { return nil } - if err := emitResolvedTimestamp(cf.Ctx, cf.encoder, cf.sink, newResolved); err != nil { + if err := emitResolvedTimestamp(cf.Ctx(), cf.encoder, cf.sink, newResolved); err != nil { return err } cf.lastEmitResolved = newResolved.GoTime() @@ -1834,13 +1834,13 @@ func (cf *changeFrontier) maybeLogBehindSpan(frontierChanged bool) { description = fmt.Sprintf("job %d", cf.spec.JobID) } if frontierChanged { - log.Infof(cf.Ctx, "%s new resolved timestamp %s is behind by %s", + log.Infof(cf.Ctx(), "%s new resolved timestamp %s is behind by %s", description, frontier, resolvedBehind) } if cf.slowLogEveryN.ShouldProcess(now) { s := cf.frontier.PeekFrontierSpan() - log.Infof(cf.Ctx, "%s span %s is behind by %s", description, s, resolvedBehind) + log.Infof(cf.Ctx(), "%s span %s is behind by %s", description, s, resolvedBehind) } } diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_frontier_processor.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_frontier_processor.go index 56d3d4671eb1..ca1008b7dd9b 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_frontier_processor.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_frontier_processor.go @@ -255,7 +255,7 @@ func (sf *streamIngestionFrontier) Next() ( if err := sf.maybeUpdatePartitionProgress(); err != nil { // Updating the partition progress isn't a fatal error. - log.Errorf(sf.Ctx, "failed to update partition progress: %+v", err) + log.Errorf(sf.Ctx(), "failed to update partition progress: %+v", err) } var frontierChanged bool @@ -268,8 +268,8 @@ func (sf *streamIngestionFrontier) Next() ( // Send back a row to the job so that it can update the progress. newResolvedTS := sf.frontier.Frontier() select { - case <-sf.Ctx.Done(): - sf.MoveToDraining(sf.Ctx.Err()) + case <-sf.Ctx().Done(): + sf.MoveToDraining(sf.Ctx().Err()) return nil, sf.DrainHelper() // Send the frontier update in the heartbeat to the source cluster. case sf.heartbeatSender.frontierUpdates <- newResolvedTS: @@ -301,7 +301,7 @@ func (sf *streamIngestionFrontier) Next() ( func (sf *streamIngestionFrontier) ConsumerClosed() { if sf.InternalClose() { if err := sf.heartbeatSender.stop(); err != nil { - log.Errorf(sf.Ctx, "heartbeatSender exited with error: %s", err.Error()) + log.Errorf(sf.Ctx(), "heartbeatSender exited with error: %s", err.Error()) } } } @@ -349,7 +349,7 @@ func (sf *streamIngestionFrontier) noteResolvedTimestamps( // maybeUpdatePartitionProgress polls the frontier and updates the job progress with // partition-specific information to track the status of each partition. func (sf *streamIngestionFrontier) maybeUpdatePartitionProgress() error { - ctx := sf.Ctx + ctx := sf.Ctx() updateFreq := PartitionProgressFrequency.Get(&sf.flowCtx.Cfg.Settings.SV) if updateFreq == 0 || timeutil.Since(sf.lastPartitionUpdate) < updateFreq { return nil diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go index af72e401ff67..9c09bb120080 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go @@ -322,7 +322,7 @@ func (sip *streamIngestionProcessor) close() { _ = client.Close() } if sip.batcher != nil { - sip.batcher.Close(sip.Ctx) + sip.batcher.Close(sip.Ctx()) } if sip.maxFlushRateTimer != nil { sip.maxFlushRateTimer.Stop() @@ -476,7 +476,7 @@ func (sip *streamIngestionProcessor) consumeEvents() (*jobspb.ResolvedSpans, err if streamingKnobs, ok := sip.FlowCtx.TestingKnobs().StreamingTestingKnobs.(*sql.StreamingTestingKnobs); ok { if streamingKnobs != nil { if streamingKnobs.RunAfterReceivingEvent != nil { - streamingKnobs.RunAfterReceivingEvent(sip.Ctx) + streamingKnobs.RunAfterReceivingEvent(sip.Ctx()) } } } @@ -503,13 +503,13 @@ func (sip *streamIngestionProcessor) consumeEvents() (*jobspb.ResolvedSpans, err return sip.flush() case streamingccl.GenerationEvent: - log.Info(sip.Ctx, "GenerationEvent received") + log.Info(sip.Ctx(), "GenerationEvent received") select { case <-sip.cutoverCh: sip.internalDrained = true return nil, nil - case <-sip.Ctx.Done(): - return nil, sip.Ctx.Err() + case <-sip.Ctx().Done(): + return nil, sip.Ctx().Err() } default: return nil, errors.Newf("unknown streaming event type %v", event.Type()) @@ -552,7 +552,7 @@ func (sip *streamIngestionProcessor) bufferKV(event partitionEvent) error { } func (sip *streamIngestionProcessor) bufferCheckpoint(event partitionEvent) error { - log.Infof(sip.Ctx, "got checkpoint %v", event.GetResolved()) + log.Infof(sip.Ctx(), "got checkpoint %v", event.GetResolved()) resolvedTimePtr := event.GetResolved() if resolvedTimePtr == nil { return errors.New("checkpoint event expected to have a resolved timestamp") @@ -574,13 +574,13 @@ func (sip *streamIngestionProcessor) flush() (*jobspb.ResolvedSpans, error) { totalSize := 0 for _, kv := range sip.curBatch { - if err := sip.batcher.AddMVCCKey(sip.Ctx, kv.Key, kv.Value); err != nil { + if err := sip.batcher.AddMVCCKey(sip.Ctx(), kv.Key, kv.Value); err != nil { return nil, errors.Wrapf(err, "adding key %+v", kv) } totalSize += len(kv.Key.Key) + len(kv.Value) } - if err := sip.batcher.Flush(sip.Ctx); err != nil { + if err := sip.batcher.Flush(sip.Ctx()); err != nil { return nil, errors.Wrap(err, "flushing") } sip.metrics.Flushes.Inc(1) @@ -605,7 +605,7 @@ func (sip *streamIngestionProcessor) flush() (*jobspb.ResolvedSpans, error) { sip.lastFlushTime = timeutil.Now() sip.bufferedCheckpoints = make(map[string]hlc.Timestamp) - return &flushedCheckpoints, sip.batcher.Reset(sip.Ctx) + return &flushedCheckpoints, sip.batcher.Reset(sip.Ctx()) } func init() { diff --git a/pkg/sql/colexec/columnarizer.go b/pkg/sql/colexec/columnarizer.go index 7151b580bbf4..54f1c57973a8 100644 --- a/pkg/sql/colexec/columnarizer.go +++ b/pkg/sql/colexec/columnarizer.go @@ -53,10 +53,11 @@ type Columnarizer struct { execinfra.ProcessorBaseNoHelper colexecop.NonExplainable - mode columnarizerMode - helper colmem.SetAccountingHelper - input execinfra.RowSource - da tree.DatumAlloc + mode columnarizerMode + initialized bool + helper colmem.SetAccountingHelper + input execinfra.RowSource + da tree.DatumAlloc batch coldata.Batch vecs coldata.TypedVecs @@ -131,7 +132,7 @@ func newColumnarizer( // Close will call InternalClose(). Note that we don't return // any trailing metadata here because the columnarizers // propagate it in DrainMeta. - if err := c.Close(c.Ctx); buildutil.CrdbTestBuild && err != nil { + if err := c.Close(c.Ctx()); buildutil.CrdbTestBuild && err != nil { // Close never returns an error. colexecerror.InternalError(errors.NewAssertionErrorWithWrappedErrf(err, "unexpected error from Columnarizer.Close")) } @@ -145,13 +146,10 @@ func newColumnarizer( // Init is part of the colexecop.Operator interface. func (c *Columnarizer) Init(ctx context.Context) { - if c.removedFromFlow { - return - } - if c.Ctx != nil { - // Init has already been called. + if c.removedFromFlow || c.initialized { return } + c.initialized = true c.accumulatedMeta = make([]execinfrapb.ProducerMetadata, 0, 1) ctx = c.StartInternalNoSpan(ctx) c.input.Start(ctx) @@ -241,7 +239,7 @@ func (c *Columnarizer) DrainMeta() []execinfrapb.ProducerMetadata { // We no longer need the batch. c.batch = nil c.helper.ReleaseMemory() - if c.Ctx == nil { + if !c.initialized { // The columnarizer wasn't initialized, so the wrapped processors might // not have been started leaving them in an unsafe to drain state, so // we skip the draining. Mostly likely this happened because a panic was diff --git a/pkg/sql/colexec/materializer.go b/pkg/sql/colexec/materializer.go index 98a4a462aeb8..37285f151f13 100644 --- a/pkg/sql/colexec/materializer.go +++ b/pkg/sql/colexec/materializer.go @@ -311,15 +311,7 @@ func (m *Materializer) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetadata func (m *Materializer) close() { if m.InternalClose() { - if m.Ctx == nil { - // In some edge cases (like when Init of an operator above this - // materializer encounters a panic), the materializer might never be - // started, yet it still will attempt to close its Closers. This - // context is only used for logging purposes, so it is ok to grab - // the background context in order to prevent a NPE below. - m.Ctx = context.Background() - } - m.closers.CloseAndLogOnErr(m.Ctx, "materializer") + m.closers.CloseAndLogOnErr(m.Ctx(), "materializer") } } diff --git a/pkg/sql/execinfra/metadata_test_receiver.go b/pkg/sql/execinfra/metadata_test_receiver.go index b29afb6da476..0da183bead50 100644 --- a/pkg/sql/execinfra/metadata_test_receiver.go +++ b/pkg/sql/execinfra/metadata_test_receiver.go @@ -212,7 +212,7 @@ func (mtr *MetadataTestReceiver) Next() (rowenc.EncDatumRow, *execinfrapb.Produc // We don't use ProcessorBase.ProcessRowHelper() here because we need // special handling for errors: this proc never starts draining in order for // it to be as unintrusive as possible. - outRow, ok, err := mtr.OutputHelper.ProcessRow(mtr.Ctx, row) + outRow, ok, err := mtr.OutputHelper.ProcessRow(mtr.Ctx(), row) if err != nil { mtr.trailingMeta = append(mtr.trailingMeta, execinfrapb.ProducerMetadata{Err: err}) continue diff --git a/pkg/sql/execinfra/processorsbase.go b/pkg/sql/execinfra/processorsbase.go index f432062dfca7..336594df5bdc 100644 --- a/pkg/sql/execinfra/processorsbase.go +++ b/pkg/sql/execinfra/processorsbase.go @@ -367,10 +367,10 @@ type ProcessorBaseNoHelper struct { // has been closed. Closed bool - // Ctx and span contain the tracing state while the processor is active + // ctx and span contain the tracing state while the processor is active // (i.e. hasn't been closed). Initialized using flowCtx.Ctx (which should not be otherwise // used). - Ctx context.Context + ctx context.Context span *tracing.Span // origCtx is the context from which ctx was derived. InternalClose() resets // ctx to this. @@ -528,7 +528,7 @@ func (pb *ProcessorBaseNoHelper) MoveToDraining(err error) { // not permitted. if err != nil { logcrash.ReportOrPanic( - pb.Ctx, + pb.Ctx(), &pb.FlowCtx.Cfg.Settings.SV, "MoveToDraining called in state %s with err: %+v", pb.State, err) @@ -558,7 +558,7 @@ func (pb *ProcessorBaseNoHelper) MoveToDraining(err error) { func (pb *ProcessorBaseNoHelper) DrainHelper() *execinfrapb.ProducerMetadata { if pb.State == StateRunning { logcrash.ReportOrPanic( - pb.Ctx, + pb.Ctx(), &pb.FlowCtx.Cfg.Settings.SV, "drain helper called in StateRunning", ) @@ -669,7 +669,7 @@ func (pb *ProcessorBase) HijackExecStatsForTrace() func() *execinfrapb.Component func (pb *ProcessorBaseNoHelper) moveToTrailingMeta() { if pb.State == StateTrailingMeta || pb.State == StateExhausted { logcrash.ReportOrPanic( - pb.Ctx, + pb.Ctx(), &pb.FlowCtx.Cfg.Settings.SV, "moveToTrailingMeta called in state: %s", pb.State, @@ -692,10 +692,10 @@ func (pb *ProcessorBaseNoHelper) moveToTrailingMeta() { } } - if buildutil.CrdbTestBuild && pb.Ctx == nil { + if buildutil.CrdbTestBuild && pb.ctx == nil { panic( errors.AssertionFailedf( - "unexpected nil ProcessorBase.Ctx when draining. Was StartInternal called?", + "unexpected nil ProcessorBase.ctx when draining. Was StartInternal called?", ), ) } @@ -719,7 +719,7 @@ func (pb *ProcessorBaseNoHelper) moveToTrailingMeta() { // should continue processing other rows, with the awareness that the processor // might have been transitioned to the draining phase. func (pb *ProcessorBase) ProcessRowHelper(row rowenc.EncDatumRow) rowenc.EncDatumRow { - outRow, ok, err := pb.OutputHelper.ProcessRow(pb.Ctx, row) + outRow, ok, err := pb.OutputHelper.ProcessRow(pb.Ctx(), row) if err != nil { pb.MoveToDraining(err) return nil @@ -751,7 +751,7 @@ func (pb *ProcessorBaseNoHelper) Run(ctx context.Context) { panic("processor output is not set for emitting rows") } pb.self.Start(ctx) - Run(pb.Ctx, pb.self, pb.Output) + Run(pb.ctx, pb.self, pb.Output) } // ProcStateOpts contains fields used by the ProcessorBase's family of functions @@ -868,9 +868,11 @@ func ProcessorSpan(ctx context.Context, name string) (context.Context, *tracing. // // It is likely that this method is called from RowSource.Start implementation, // and the recommended layout is the following: -// ctx = pb.StartInternal(ctx, name) -// < inputs >.Start(ctx) // if there are any inputs-RowSources to pb -// < other initialization > +// +// ctx = pb.StartInternal(ctx, name) +// < inputs >.Start(ctx) // if there are any inputs-RowSources to pb +// < other initialization > +// // so that the caller doesn't mistakenly use old ctx object. func (pb *ProcessorBaseNoHelper) StartInternal(ctx context.Context, name string) context.Context { return pb.startImpl(ctx, true /* createSpan */, name) @@ -890,16 +892,25 @@ func (pb *ProcessorBaseNoHelper) startImpl( ) context.Context { pb.origCtx = ctx if createSpan { - pb.Ctx, pb.span = ProcessorSpan(ctx, spanName) + pb.ctx, pb.span = ProcessorSpan(ctx, spanName) if pb.span != nil && pb.span.IsVerbose() { pb.span.SetTag(execinfrapb.FlowIDTagKey, attribute.StringValue(pb.FlowCtx.ID.String())) pb.span.SetTag(execinfrapb.ProcessorIDTagKey, attribute.IntValue(int(pb.ProcessorID))) } } else { - pb.Ctx = ctx + pb.ctx = ctx + } + pb.EvalCtx.Context = pb.ctx + return pb.ctx +} + +// Ctx is an accessor method for ctx which is guaranteed to return non-nil +// context even if StartInternal() hasn't been called. +func (pb *ProcessorBaseNoHelper) Ctx() context.Context { + if pb.ctx == nil { + return context.Background() } - pb.EvalCtx.Context = pb.Ctx - return pb.Ctx + return pb.ctx } // InternalClose helps processors implement the RowSource interface, performing @@ -956,7 +967,7 @@ func (pb *ProcessorBaseNoHelper) InternalCloseEx(onClose func()) bool { pb.span = nil // Reset the context so that any incidental uses after this point do not // access the finished span. - pb.Ctx = pb.origCtx + pb.ctx = pb.origCtx pb.EvalCtx.Context = pb.origCtx return true } diff --git a/pkg/sql/rowexec/aggregator.go b/pkg/sql/rowexec/aggregator.go index c0ed8fc3aeca..6e0fd5b99f64 100644 --- a/pkg/sql/rowexec/aggregator.go +++ b/pkg/sql/rowexec/aggregator.go @@ -341,17 +341,17 @@ func (ag *aggregatorBase) start(ctx context.Context, procName string) { func (ag *hashAggregator) close() { if ag.InternalClose() { - log.VEventf(ag.Ctx, 2, "exiting aggregator") + log.VEventf(ag.Ctx(), 2, "exiting aggregator") // If we have started emitting rows, bucketsIter will represent which // buckets are still open, since buckets are closed once their results are // emitted. if ag.bucketsIter == nil { for _, bucket := range ag.buckets { - bucket.close(ag.Ctx) + bucket.close(ag.Ctx()) } } else { for _, bucket := range ag.bucketsIter { - ag.buckets[bucket].close(ag.Ctx) + ag.buckets[bucket].close(ag.Ctx()) } } // Make sure to release any remaining memory under 'buckets'. @@ -360,25 +360,25 @@ func (ag *hashAggregator) close() { // buckets since the latter might be releasing some precisely tracked // memory, and if we were to close the accounts first, there would be // no memory to release for the buckets. - ag.bucketsAcc.Close(ag.Ctx) - ag.aggFuncsAcc.Close(ag.Ctx) - ag.MemMonitor.Stop(ag.Ctx) + ag.bucketsAcc.Close(ag.Ctx()) + ag.aggFuncsAcc.Close(ag.Ctx()) + ag.MemMonitor.Stop(ag.Ctx()) } } func (ag *orderedAggregator) close() { if ag.InternalClose() { - log.VEventf(ag.Ctx, 2, "exiting aggregator") + log.VEventf(ag.Ctx(), 2, "exiting aggregator") if ag.bucket != nil { - ag.bucket.close(ag.Ctx) + ag.bucket.close(ag.Ctx()) } // Note that we should be closing accounts only after closing the // bucket since the latter might be releasing some precisely tracked // memory, and if we were to close the accounts first, there would be // no memory to release for the bucket. - ag.bucketsAcc.Close(ag.Ctx) - ag.aggFuncsAcc.Close(ag.Ctx) - ag.MemMonitor.Stop(ag.Ctx) + ag.bucketsAcc.Close(ag.Ctx()) + ag.aggFuncsAcc.Close(ag.Ctx()) + ag.MemMonitor.Stop(ag.Ctx()) } } @@ -416,7 +416,7 @@ func (ag *hashAggregator) accumulateRows() ( return aggAccumulating, nil, meta } if row == nil { - log.VEvent(ag.Ctx, 1, "accumulation complete") + log.VEvent(ag.Ctx(), 1, "accumulation complete") ag.inputDone = true break } @@ -453,7 +453,7 @@ func (ag *hashAggregator) accumulateRows() ( // Note that, for simplicity, we're ignoring the overhead of the slice of // strings. - if err := ag.bucketsAcc.Grow(ag.Ctx, int64(len(ag.buckets))*memsize.String); err != nil { + if err := ag.bucketsAcc.Grow(ag.Ctx(), int64(len(ag.buckets))*memsize.String); err != nil { ag.MoveToDraining(err) return aggStateUnknown, nil, nil } @@ -485,7 +485,7 @@ func (ag *orderedAggregator) accumulateRows() ( return aggAccumulating, nil, meta } if row == nil { - log.VEvent(ag.Ctx, 1, "accumulation complete") + log.VEvent(ag.Ctx(), 1, "accumulation complete") ag.inputDone = true break } @@ -529,7 +529,7 @@ func (ag *orderedAggregator) accumulateRows() ( func (ag *aggregatorBase) getAggResults( bucket aggregateFuncs, ) (aggregatorState, rowenc.EncDatumRow, *execinfrapb.ProducerMetadata) { - defer bucket.close(ag.Ctx) + defer bucket.close(ag.Ctx()) for i, b := range bucket { result, err := b.Result() @@ -575,16 +575,16 @@ func (ag *hashAggregator) emitRow() ( // the columns specified by ag.orderedGroupCols, so we need to continue // accumulating the remaining rows. - if err := ag.arena.UnsafeReset(ag.Ctx); err != nil { + if err := ag.arena.UnsafeReset(ag.Ctx()); err != nil { ag.MoveToDraining(err) return aggStateUnknown, nil, nil } // Before we create a new 'buckets' map below, we need to "release" the // already accounted for memory of the current map. - ag.bucketsAcc.Shrink(ag.Ctx, int64(ag.alreadyAccountedFor)*memsize.MapEntryOverhead) + ag.bucketsAcc.Shrink(ag.Ctx(), int64(ag.alreadyAccountedFor)*memsize.MapEntryOverhead) // Note that, for simplicity, we're ignoring the overhead of the slice of // strings. - ag.bucketsAcc.Shrink(ag.Ctx, int64(len(ag.buckets))*memsize.String) + ag.bucketsAcc.Shrink(ag.Ctx(), int64(len(ag.buckets))*memsize.String) ag.bucketsIter = nil ag.buckets = make(map[string]aggregateFuncs) ag.bucketsLenGrowThreshold = hashAggregatorBucketsInitialLen @@ -651,7 +651,7 @@ func (ag *orderedAggregator) emitRow() ( // the columns specified by ag.orderedGroupCols, so we need to continue // accumulating the remaining rows. - if err := ag.arena.UnsafeReset(ag.Ctx); err != nil { + if err := ag.arena.UnsafeReset(ag.Ctx()); err != nil { ag.MoveToDraining(err) return aggStateUnknown, nil, nil } @@ -689,7 +689,7 @@ func (ag *hashAggregator) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetad case aggEmittingRows: ag.runningState, row, meta = ag.emitRow() default: - log.Fatalf(ag.Ctx, "unsupported state: %d", ag.runningState) + log.Fatalf(ag.Ctx(), "unsupported state: %d", ag.runningState) } if row == nil && meta == nil { @@ -711,7 +711,7 @@ func (ag *orderedAggregator) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMe case aggEmittingRows: ag.runningState, row, meta = ag.emitRow() default: - log.Fatalf(ag.Ctx, "unsupported state: %d", ag.runningState) + log.Fatalf(ag.Ctx(), "unsupported state: %d", ag.runningState) } if row == nil && meta == nil { @@ -776,7 +776,7 @@ func (ag *aggregatorBase) accumulateRowIntoBucket( canAdd := true if a.Distinct { canAdd, err = ag.funcs[i].isDistinct( - ag.Ctx, + ag.Ctx(), &ag.datumAlloc, groupKey, firstArg, @@ -789,7 +789,7 @@ func (ag *aggregatorBase) accumulateRowIntoBucket( if !canAdd { continue } - if err := bucket[i].Add(ag.Ctx, firstArg, otherArgs...); err != nil { + if err := bucket[i].Add(ag.Ctx(), firstArg, otherArgs...); err != nil { return err } } @@ -807,7 +807,7 @@ func (ag *hashAggregator) encode( // used by the aggregate functions (and accounted for accordingly), // this can lead to over-accounting which is acceptable. appendTo, err = row[colIdx].Fingerprint( - ag.Ctx, ag.inputTypes[colIdx], &ag.datumAlloc, appendTo, &ag.bucketsAcc, + ag.Ctx(), ag.inputTypes[colIdx], &ag.datumAlloc, appendTo, &ag.bucketsAcc, ) if err != nil { return appendTo, err @@ -833,7 +833,7 @@ func (ag *hashAggregator) accumulateRow(row rowenc.EncDatumRow) error { bucket, ok := ag.buckets[string(encoded)] if !ok { - s, err := ag.arena.AllocBytes(ag.Ctx, encoded) + s, err := ag.arena.AllocBytes(ag.Ctx(), encoded) if err != nil { return err } @@ -844,7 +844,7 @@ func (ag *hashAggregator) accumulateRow(row rowenc.EncDatumRow) error { ag.buckets[s] = bucket if len(ag.buckets) == ag.bucketsLenGrowThreshold { toAccountFor := ag.bucketsLenGrowThreshold - ag.alreadyAccountedFor - if err := ag.bucketsAcc.Grow(ag.Ctx, int64(toAccountFor)*memsize.MapEntryOverhead); err != nil { + if err := ag.bucketsAcc.Grow(ag.Ctx(), int64(toAccountFor)*memsize.MapEntryOverhead); err != nil { return err } ag.alreadyAccountedFor = ag.bucketsLenGrowThreshold @@ -944,13 +944,13 @@ func (a *aggregateFuncHolder) isDistinct( } func (ag *aggregatorBase) createAggregateFuncs() (aggregateFuncs, error) { - if err := ag.bucketsAcc.Grow(ag.Ctx, sizeOfAggregateFuncs+sizeOfAggregateFunc*int64(len(ag.funcs))); err != nil { + if err := ag.bucketsAcc.Grow(ag.Ctx(), sizeOfAggregateFuncs+sizeOfAggregateFunc*int64(len(ag.funcs))); err != nil { return nil, err } bucket := make(aggregateFuncs, len(ag.funcs)) for i, f := range ag.funcs { agg := f.create(ag.EvalCtx, f.arguments) - if err := ag.bucketsAcc.Grow(ag.Ctx, agg.Size()); err != nil { + if err := ag.bucketsAcc.Grow(ag.Ctx(), agg.Size()); err != nil { return nil, err } bucket[i] = agg diff --git a/pkg/sql/rowexec/countrows.go b/pkg/sql/rowexec/countrows.go index 551655abbf52..e88bf7edf47c 100644 --- a/pkg/sql/rowexec/countrows.go +++ b/pkg/sql/rowexec/countrows.go @@ -85,7 +85,7 @@ func (ag *countAggregator) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMeta if row == nil { ret := make(rowenc.EncDatumRow, 1) ret[0] = rowenc.EncDatum{Datum: tree.NewDInt(tree.DInt(ag.count))} - rendered, _, err := ag.OutputHelper.ProcessRow(ag.Ctx, ret) + rendered, _, err := ag.OutputHelper.ProcessRow(ag.Ctx(), ret) // We're done as soon as we process our one output row, so we // transition into draining state. We will, however, return non-nil // error (if such occurs during rendering) separately below. diff --git a/pkg/sql/rowexec/distinct.go b/pkg/sql/rowexec/distinct.go index 4f01e1717173..98160909150d 100644 --- a/pkg/sql/rowexec/distinct.go +++ b/pkg/sql/rowexec/distinct.go @@ -187,7 +187,7 @@ func (d *distinct) encode(appendTo []byte, row rowenc.EncDatumRow) ([]byte, erro // the references to the row (and to the newly allocated datums) // shortly, it'll likely take some time before GC reclaims that memory, // so we choose the over-accounting route to be safe. - appendTo, err = datum.Fingerprint(d.Ctx, d.types[colIdx], &d.datumAlloc, appendTo, &d.memAcc) + appendTo, err = datum.Fingerprint(d.Ctx(), d.types[colIdx], &d.datumAlloc, appendTo, &d.memAcc) if err != nil { return nil, err } @@ -210,8 +210,8 @@ func (d *distinct) encode(appendTo []byte, row rowenc.EncDatumRow) ([]byte, erro func (d *distinct) close() { if d.InternalClose() { - d.memAcc.Close(d.Ctx) - d.MemMonitor.Stop(d.Ctx) + d.memAcc.Close(d.Ctx()) + d.MemMonitor.Stop(d.Ctx()) } } @@ -256,7 +256,7 @@ func (d *distinct) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetadata) { // allocated on it, which implies that UnsafeReset() is safe to call here. copy(d.lastGroupKey, row) d.haveLastGroupKey = true - if err := d.arena.UnsafeReset(d.Ctx); err != nil { + if err := d.arena.UnsafeReset(d.Ctx()); err != nil { d.MoveToDraining(err) break } @@ -280,7 +280,7 @@ func (d *distinct) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetadata) { } continue } - s, err := d.arena.AllocBytes(d.Ctx, encoding) + s, err := d.arena.AllocBytes(d.Ctx(), encoding) if err != nil { d.MoveToDraining(err) break diff --git a/pkg/sql/rowexec/hashjoiner.go b/pkg/sql/rowexec/hashjoiner.go index e69de84a96be..28239de2a34a 100644 --- a/pkg/sql/rowexec/hashjoiner.go +++ b/pkg/sql/rowexec/hashjoiner.go @@ -155,7 +155,7 @@ func newHashJoiner( } return h, h.hashTable.Init( - h.Ctx, + h.Ctx(), shouldMarkRightSide(h.joinType), h.rightSource.OutputTypes(), h.eqCols[rightSide], @@ -187,7 +187,7 @@ func (h *hashJoiner) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetadata) case hjEmittingRightUnmatched: h.runningState, row, meta = h.emitRightUnmatched() default: - log.Fatalf(h.Ctx, "unsupported state: %d", h.runningState) + log.Fatalf(h.Ctx(), "unsupported state: %d", h.runningState) } if row == nil && meta == nil { @@ -233,14 +233,14 @@ func (h *hashJoiner) build() (hashJoinerState, rowenc.EncDatumRow, *execinfrapb. return hjStateUnknown, nil, h.DrainHelper() } // If hashTable is in-memory, pre-reserve the memory needed to mark. - if err = h.hashTable.ReserveMarkMemoryMaybe(h.Ctx); err != nil { + if err = h.hashTable.ReserveMarkMemoryMaybe(h.Ctx()); err != nil { h.MoveToDraining(err) return hjStateUnknown, nil, h.DrainHelper() } return hjReadingLeftSide, nil, nil } - err = h.hashTable.AddRow(h.Ctx, row) + err = h.hashTable.AddRow(h.Ctx(), row) // Regardless of the underlying row container (disk backed or in-memory // only), we cannot do anything about an error if it occurs. if err != nil { @@ -274,7 +274,7 @@ func (h *hashJoiner) readLeftSide() ( // hjEmittingRightUnmatched if unmatched rows on the right side need to // be emitted, otherwise finish. if shouldEmitUnmatchedRow(rightSide, h.joinType) { - i := h.hashTable.NewUnmarkedIterator(h.Ctx) + i := h.hashTable.NewUnmarkedIterator(h.Ctx()) i.Rewind() h.emittingRightUnmatchedState.iter = i return hjEmittingRightUnmatched, nil, nil @@ -288,14 +288,14 @@ func (h *hashJoiner) readLeftSide() ( h.probingRowState.row = row h.probingRowState.matched = false if h.probingRowState.iter == nil { - i, err := h.hashTable.NewBucketIterator(h.Ctx, row, h.eqCols[leftSide]) + i, err := h.hashTable.NewBucketIterator(h.Ctx(), row, h.eqCols[leftSide]) if err != nil { h.MoveToDraining(err) return hjStateUnknown, nil, h.DrainHelper() } h.probingRowState.iter = i } else { - if err := h.probingRowState.iter.Reset(h.Ctx, row); err != nil { + if err := h.probingRowState.iter.Reset(h.Ctx(), row); err != nil { h.MoveToDraining(err) return hjStateUnknown, nil, h.DrainHelper() } @@ -356,7 +356,7 @@ func (h *hashJoiner) probeRow() ( h.probingRowState.matched = true shouldEmit := true if shouldMarkRightSide(h.joinType) { - if i.IsMarked(h.Ctx) { + if i.IsMarked(h.Ctx()) { switch h.joinType { case descpb.RightSemiJoin: // The row from the right already had a match and was emitted @@ -373,7 +373,7 @@ func (h *hashJoiner) probeRow() ( // whether we have a corresponding unmarked row from the right. h.probingRowState.matched = false } - } else if err := i.Mark(h.Ctx); err != nil { + } else if err := i.Mark(h.Ctx()); err != nil { h.MoveToDraining(err) return hjStateUnknown, nil, h.DrainHelper() } @@ -446,16 +446,16 @@ func (h *hashJoiner) emitRightUnmatched() ( func (h *hashJoiner) close() { if h.InternalClose() { - h.hashTable.Close(h.Ctx) + h.hashTable.Close(h.Ctx()) if h.probingRowState.iter != nil { h.probingRowState.iter.Close() } if h.emittingRightUnmatchedState.iter != nil { h.emittingRightUnmatchedState.iter.Close() } - h.MemMonitor.Stop(h.Ctx) + h.MemMonitor.Stop(h.Ctx()) if h.diskMonitor != nil { - h.diskMonitor.Stop(h.Ctx) + h.diskMonitor.Stop(h.Ctx()) } } } diff --git a/pkg/sql/rowexec/inverted_filterer.go b/pkg/sql/rowexec/inverted_filterer.go index e4907520e424..cc5ddb569e74 100644 --- a/pkg/sql/rowexec/inverted_filterer.go +++ b/pkg/sql/rowexec/inverted_filterer.go @@ -167,7 +167,7 @@ func (ifr *invertedFilterer) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMe case ifrEmittingRows: ifr.runningState, row, meta = ifr.emitRow() default: - log.Fatalf(ifr.Ctx, "unsupported state: %d", ifr.runningState) + log.Fatalf(ifr.Ctx(), "unsupported state: %d", ifr.runningState) } if row == nil && meta == nil { continue @@ -192,9 +192,9 @@ func (ifr *invertedFilterer) readInput() (invertedFiltererState, *execinfrapb.Pr return ifrReadingInput, meta } if row == nil { - log.VEventf(ifr.Ctx, 1, "no more input rows") + log.VEventf(ifr.Ctx(), 1, "no more input rows") evalResult := ifr.invertedEval.evaluate() - ifr.rc.SetupForRead(ifr.Ctx, evalResult) + ifr.rc.SetupForRead(ifr.Ctx(), evalResult) // invertedEval had a single expression in the batch, and the results // for that expression are in evalResult[0]. ifr.evalResult = evalResult[0] @@ -243,7 +243,7 @@ func (ifr *invertedFilterer) readInput() (invertedFiltererState, *execinfrapb.Pr // evaluator. copy(ifr.keyRow, row[:ifr.invertedColIdx]) copy(ifr.keyRow[ifr.invertedColIdx:], row[ifr.invertedColIdx+1:]) - keyIndex, err := ifr.rc.AddRow(ifr.Ctx, ifr.keyRow) + keyIndex, err := ifr.rc.AddRow(ifr.Ctx(), ifr.keyRow) if err != nil { ifr.MoveToDraining(err) return ifrStateUnknown, ifr.DrainHelper() @@ -271,11 +271,11 @@ func (ifr *invertedFilterer) emitRow() ( } if ifr.resultIdx >= len(ifr.evalResult) { // We are done emitting all rows. - return drainFunc(ifr.rc.UnsafeReset(ifr.Ctx)) + return drainFunc(ifr.rc.UnsafeReset(ifr.Ctx())) } curRowIdx := ifr.resultIdx ifr.resultIdx++ - keyRow, err := ifr.rc.GetRow(ifr.Ctx, ifr.evalResult[curRowIdx], false /* skip */) + keyRow, err := ifr.rc.GetRow(ifr.Ctx(), ifr.evalResult[curRowIdx], false /* skip */) if err != nil { return drainFunc(err) } @@ -299,12 +299,12 @@ func (ifr *invertedFilterer) ConsumerClosed() { func (ifr *invertedFilterer) close() { if ifr.InternalClose() { - ifr.rc.Close(ifr.Ctx) + ifr.rc.Close(ifr.Ctx()) if ifr.MemMonitor != nil { - ifr.MemMonitor.Stop(ifr.Ctx) + ifr.MemMonitor.Stop(ifr.Ctx()) } if ifr.diskMonitor != nil { - ifr.diskMonitor.Stop(ifr.Ctx) + ifr.diskMonitor.Stop(ifr.Ctx()) } } } diff --git a/pkg/sql/rowexec/inverted_joiner.go b/pkg/sql/rowexec/inverted_joiner.go index 1b2c74371212..2274f54ecc3b 100644 --- a/pkg/sql/rowexec/inverted_joiner.go +++ b/pkg/sql/rowexec/inverted_joiner.go @@ -385,7 +385,7 @@ func (ij *invertedJoiner) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetad case ijEmittingRows: ij.runningState, row, meta = ij.emitRow() default: - log.Fatalf(ij.Ctx, "unsupported state: %d", ij.runningState) + log.Fatalf(ij.Ctx(), "unsupported state: %d", ij.runningState) } if row == nil && meta == nil { continue @@ -416,7 +416,7 @@ func (ij *invertedJoiner) readInput() (invertedJoinerState, *execinfrapb.Produce break } - expr, preFilterState, err := ij.datumsToInvertedExpr.Convert(ij.Ctx, row) + expr, preFilterState, err := ij.datumsToInvertedExpr.Convert(ij.Ctx(), row) if err != nil { ij.MoveToDraining(err) return ijStateUnknown, ij.DrainHelper() @@ -475,12 +475,12 @@ func (ij *invertedJoiner) readInput() (invertedJoinerState, *execinfrapb.Produce } if len(ij.inputRows) == 0 { - log.VEventf(ij.Ctx, 1, "no more input rows") + log.VEventf(ij.Ctx(), 1, "no more input rows") // We're done. ij.MoveToDraining(nil) return ijStateUnknown, ij.DrainHelper() } - log.VEventf(ij.Ctx, 1, "read %d input rows", len(ij.inputRows)) + log.VEventf(ij.Ctx(), 1, "read %d input rows", len(ij.inputRows)) spans, err := ij.batchedExprEval.init() if err != nil { @@ -504,9 +504,9 @@ func (ij *invertedJoiner) readInput() (invertedJoinerState, *execinfrapb.Produce return ijStateUnknown, ij.DrainHelper() } - log.VEventf(ij.Ctx, 1, "scanning %d spans", len(ij.indexSpans)) + log.VEventf(ij.Ctx(), 1, "scanning %d spans", len(ij.indexSpans)) if err = ij.fetcher.StartScan( - ij.Ctx, ij.FlowCtx.Txn, ij.indexSpans, rowinfra.NoBytesLimit, rowinfra.NoRowLimit, + ij.Ctx(), ij.FlowCtx.Txn, ij.indexSpans, rowinfra.NoBytesLimit, rowinfra.NoRowLimit, ij.FlowCtx.TraceKV, ij.EvalCtx.TestingKnobs.ForceProductionBatchSizes, ); err != nil { ij.MoveToDraining(err) @@ -517,11 +517,11 @@ func (ij *invertedJoiner) readInput() (invertedJoinerState, *execinfrapb.Produce } func (ij *invertedJoiner) performScan() (invertedJoinerState, *execinfrapb.ProducerMetadata) { - log.VEventf(ij.Ctx, 1, "joining rows") + log.VEventf(ij.Ctx(), 1, "joining rows") // Read the entire set of rows that are part of the scan. for { // Fetch the next row and copy it into the row container. - ok, err := ij.fetcher.NextRowInto(ij.Ctx, ij.row, ij.colIdxMap) + ok, err := ij.fetcher.NextRowInto(ij.Ctx(), ij.row, ij.colIdxMap) if err != nil { ij.MoveToDraining(scrub.UnwrapScrubError(err)) return ijStateUnknown, ij.DrainHelper() @@ -574,7 +574,7 @@ func (ij *invertedJoiner) performScan() (invertedJoinerState, *execinfrapb.Produ return ijStateUnknown, ij.DrainHelper() } if shouldAdd { - rowIdx, err := ij.indexRows.AddRow(ij.Ctx, ij.indexRow) + rowIdx, err := ij.indexRows.AddRow(ij.Ctx(), ij.indexRow) if err != nil { ij.MoveToDraining(err) return ijStateUnknown, ij.DrainHelper() @@ -586,8 +586,8 @@ func (ij *invertedJoiner) performScan() (invertedJoinerState, *execinfrapb.Produ } } ij.joinedRowIdx = ij.batchedExprEval.evaluate() - ij.indexRows.SetupForRead(ij.Ctx, ij.joinedRowIdx) - log.VEventf(ij.Ctx, 1, "done evaluating expressions") + ij.indexRows.SetupForRead(ij.Ctx(), ij.joinedRowIdx) + log.VEventf(ij.Ctx(), 1, "done evaluating expressions") return ijEmittingRows, nil } @@ -604,7 +604,7 @@ func (ij *invertedJoiner) emitRow() ( ) { // Finished processing the batch. if ij.emitCursor.inputRowIdx >= len(ij.joinedRowIdx) { - log.VEventf(ij.Ctx, 1, "done emitting rows") + log.VEventf(ij.Ctx(), 1, "done emitting rows") // Ready for another input batch. Reset state. ij.inputRows = ij.inputRows[:0] ij.batchedExprEval.reset() @@ -612,7 +612,7 @@ func (ij *invertedJoiner) emitRow() ( ij.emitCursor.outputRowIdx = 0 ij.emitCursor.inputRowIdx = 0 ij.emitCursor.seenMatch = false - if err := ij.indexRows.UnsafeReset(ij.Ctx); err != nil { + if err := ij.indexRows.UnsafeReset(ij.Ctx()); err != nil { ij.MoveToDraining(err) return ijStateUnknown, nil, ij.DrainHelper() } @@ -642,7 +642,7 @@ func (ij *invertedJoiner) emitRow() ( inputRow := ij.inputRows[ij.emitCursor.inputRowIdx] joinedRowIdx := ij.joinedRowIdx[ij.emitCursor.inputRowIdx][ij.emitCursor.outputRowIdx] - indexedRow, err := ij.indexRows.GetRow(ij.Ctx, joinedRowIdx, false /* skip */) + indexedRow, err := ij.indexRows.GetRow(ij.Ctx(), joinedRowIdx, false /* skip */) if err != nil { ij.MoveToDraining(err) return ijStateUnknown, nil, ij.DrainHelper() @@ -657,7 +657,7 @@ func (ij *invertedJoiner) emitRow() ( skipRemaining := func() error { for ; ij.emitCursor.outputRowIdx < len(ij.joinedRowIdx[ij.emitCursor.inputRowIdx]); ij.emitCursor.outputRowIdx++ { idx := ij.joinedRowIdx[ij.emitCursor.inputRowIdx][ij.emitCursor.outputRowIdx] - if _, err := ij.indexRows.GetRow(ij.Ctx, idx, true /* skip */); err != nil { + if _, err := ij.indexRows.GetRow(ij.Ctx(), idx, true /* skip */); err != nil { return err } } @@ -760,14 +760,14 @@ func (ij *invertedJoiner) ConsumerClosed() { func (ij *invertedJoiner) close() { if ij.InternalClose() { if ij.fetcher != nil { - ij.fetcher.Close(ij.Ctx) + ij.fetcher.Close(ij.Ctx()) } if ij.indexRows != nil { - ij.indexRows.Close(ij.Ctx) + ij.indexRows.Close(ij.Ctx()) } - ij.MemMonitor.Stop(ij.Ctx) + ij.MemMonitor.Stop(ij.Ctx()) if ij.diskMonitor != nil { - ij.diskMonitor.Stop(ij.Ctx) + ij.diskMonitor.Stop(ij.Ctx()) } } } @@ -782,14 +782,14 @@ func (ij *invertedJoiner) execStatsForTrace() *execinfrapb.ComponentStats { if !ok { return nil } - ij.scanStats = execinfra.GetScanStats(ij.Ctx, ij.ExecStatsTrace) + ij.scanStats = execinfra.GetScanStats(ij.Ctx(), ij.ExecStatsTrace) ret := execinfrapb.ComponentStats{ Inputs: []execinfrapb.InputStats{is}, KV: execinfrapb.KVStats{ BytesRead: optional.MakeUint(uint64(ij.fetcher.GetBytesRead())), TuplesRead: fis.NumTuples, KVTime: fis.WaitTime, - ContentionTime: optional.MakeTimeValue(execinfra.GetCumulativeContentionTime(ij.Ctx, ij.ExecStatsTrace)), + ContentionTime: optional.MakeTimeValue(execinfra.GetCumulativeContentionTime(ij.Ctx(), ij.ExecStatsTrace)), }, Exec: execinfrapb.ExecStats{ MaxAllocatedMem: optional.MakeUint(uint64(ij.MemMonitor.MaximumBytes())), @@ -807,7 +807,7 @@ func (ij *invertedJoiner) generateMeta() []execinfrapb.ProducerMetadata { meta.Metrics = execinfrapb.GetMetricsMeta() meta.Metrics.BytesRead = ij.fetcher.GetBytesRead() meta.Metrics.RowsRead = ij.rowsRead - if tfs := execinfra.GetLeafTxnFinalState(ij.Ctx, ij.FlowCtx.Txn); tfs != nil { + if tfs := execinfra.GetLeafTxnFinalState(ij.Ctx(), ij.FlowCtx.Txn); tfs != nil { trailingMeta = append(trailingMeta, execinfrapb.ProducerMetadata{LeafTxnFinalState: tfs}) } return trailingMeta diff --git a/pkg/sql/rowexec/joinreader.go b/pkg/sql/rowexec/joinreader.go index 40128979be6f..147c3b2117e3 100644 --- a/pkg/sql/rowexec/joinreader.go +++ b/pkg/sql/rowexec/joinreader.go @@ -673,7 +673,7 @@ func (jr *joinReader) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetadata) meta = jr.DrainHelper() jr.runningState = jrStateUnknown default: - log.Fatalf(jr.Ctx, "unsupported state: %d", jr.runningState) + log.Fatalf(jr.Ctx(), "unsupported state: %d", jr.runningState) } if row == nil && meta == nil { continue @@ -816,7 +816,7 @@ func (jr *joinReader) readInput() ( } if len(jr.scratchInputRows) == 0 { - log.VEventf(jr.Ctx, 1, "no more input rows") + log.VEventf(jr.Ctx(), 1, "no more input rows") if outRow != nil { return jrReadyToDrain, outRow, nil } @@ -824,7 +824,7 @@ func (jr *joinReader) readInput() ( jr.MoveToDraining(nil) return jrStateUnknown, nil, jr.DrainHelper() } - log.VEventf(jr.Ctx, 1, "read %d input rows", len(jr.scratchInputRows)) + log.VEventf(jr.Ctx(), 1, "read %d input rows", len(jr.scratchInputRows)) if jr.groupingState != nil && len(jr.scratchInputRows) > 0 { jr.updateGroupingStateForNonEmptyBatch() @@ -875,7 +875,7 @@ func (jr *joinReader) readInput() ( sort.Sort(spans) } - log.VEventf(jr.Ctx, 1, "scanning %d spans", len(spans)) + log.VEventf(jr.Ctx(), 1, "scanning %d spans", len(spans)) // Note that the fetcher takes ownership of the spans slice - it will modify // it and perform the memory accounting. We don't care about the // modification here, but we want to be conscious about the memory @@ -883,12 +883,12 @@ func (jr *joinReader) readInput() ( // joinReaderStrategy doesn't account for any memory used by the spans. if jr.usesStreamer { var kvBatchFetcher *row.TxnKVStreamer - kvBatchFetcher, err = row.NewTxnKVStreamer(jr.Ctx, jr.streamerInfo.Streamer, spans, jr.keyLocking) + kvBatchFetcher, err = row.NewTxnKVStreamer(jr.Ctx(), jr.streamerInfo.Streamer, spans, jr.keyLocking) if err != nil { jr.MoveToDraining(err) return jrStateUnknown, nil, jr.DrainHelper() } - err = jr.fetcher.StartScanFrom(jr.Ctx, kvBatchFetcher, jr.FlowCtx.TraceKV) + err = jr.fetcher.StartScanFrom(jr.Ctx(), kvBatchFetcher, jr.FlowCtx.TraceKV) } else { var bytesLimit rowinfra.BytesLimit if !jr.shouldLimitBatches { @@ -900,7 +900,7 @@ func (jr *joinReader) readInput() ( } } err = jr.fetcher.StartScan( - jr.Ctx, jr.txn, spans, bytesLimit, rowinfra.NoRowLimit, + jr.Ctx(), jr.txn, spans, bytesLimit, rowinfra.NoRowLimit, jr.FlowCtx.TraceKV, jr.EvalCtx.TestingKnobs.ForceProductionBatchSizes, ) } @@ -931,7 +931,7 @@ func (jr *joinReader) performLookup() (joinReaderState, *execinfrapb.ProducerMet } // Fetch the next row and tell the strategy to process it. - lookedUpRow, err := jr.fetcher.NextRow(jr.Ctx) + lookedUpRow, err := jr.fetcher.NextRow(jr.Ctx()) if err != nil { jr.MoveToDraining(scrub.UnwrapScrubError(err)) return jrStateUnknown, jr.DrainHelper() @@ -943,7 +943,7 @@ func (jr *joinReader) performLookup() (joinReaderState, *execinfrapb.ProducerMet jr.rowsRead++ jr.curBatchRowsRead++ - if nextState, err := jr.strategy.processLookedUpRow(jr.Ctx, lookedUpRow, key); err != nil { + if nextState, err := jr.strategy.processLookedUpRow(jr.Ctx(), lookedUpRow, key); err != nil { jr.MoveToDraining(err) return jrStateUnknown, jr.DrainHelper() } else if nextState != jrPerformingLookup { @@ -970,13 +970,13 @@ func (jr *joinReader) performLookup() (joinReaderState, *execinfrapb.ProducerMet // collection phase. sort.Sort(spans) - log.VEventf(jr.Ctx, 1, "scanning %d remote spans", len(spans)) + log.VEventf(jr.Ctx(), 1, "scanning %d remote spans", len(spans)) bytesLimit := rowinfra.DefaultBatchBytesLimit if !jr.shouldLimitBatches { bytesLimit = rowinfra.NoBytesLimit } if err := jr.fetcher.StartScan( - jr.Ctx, jr.txn, spans, bytesLimit, rowinfra.NoRowLimit, + jr.Ctx(), jr.txn, spans, bytesLimit, rowinfra.NoRowLimit, jr.FlowCtx.TraceKV, jr.EvalCtx.TestingKnobs.ForceProductionBatchSizes, ); err != nil { jr.MoveToDraining(err) @@ -986,8 +986,8 @@ func (jr *joinReader) performLookup() (joinReaderState, *execinfrapb.ProducerMet } } - log.VEvent(jr.Ctx, 1, "done joining rows") - jr.strategy.prepareToEmit(jr.Ctx) + log.VEvent(jr.Ctx(), 1, "done joining rows") + jr.strategy.prepareToEmit(jr.Ctx()) return jrEmittingRows, nil } @@ -999,7 +999,7 @@ func (jr *joinReader) emitRow() ( rowenc.EncDatumRow, *execinfrapb.ProducerMetadata, ) { - rowToEmit, nextState, err := jr.strategy.nextRowToEmit(jr.Ctx) + rowToEmit, nextState, err := jr.strategy.nextRowToEmit(jr.Ctx()) if err != nil { jr.MoveToDraining(err) return jrStateUnknown, nil, jr.DrainHelper() @@ -1056,31 +1056,31 @@ func (jr *joinReader) ConsumerClosed() { func (jr *joinReader) close() { if jr.InternalClose() { if jr.fetcher != nil { - jr.fetcher.Close(jr.Ctx) + jr.fetcher.Close(jr.Ctx()) } if jr.usesStreamer { // We have to cleanup the streamer after closing the fetcher because // the latter might release some memory tracked by the budget of the // streamer. if jr.streamerInfo.Streamer != nil { - jr.streamerInfo.Streamer.Close(jr.Ctx) + jr.streamerInfo.Streamer.Close(jr.Ctx()) } - jr.streamerInfo.budgetAcc.Close(jr.Ctx) - jr.streamerInfo.unlimitedMemMonitor.Stop(jr.Ctx) + jr.streamerInfo.budgetAcc.Close(jr.Ctx()) + jr.streamerInfo.unlimitedMemMonitor.Stop(jr.Ctx()) if jr.streamerInfo.diskMonitor != nil { - jr.streamerInfo.diskMonitor.Stop(jr.Ctx) + jr.streamerInfo.diskMonitor.Stop(jr.Ctx()) } } - jr.strategy.close(jr.Ctx) - jr.memAcc.Close(jr.Ctx) + jr.strategy.close(jr.Ctx()) + jr.memAcc.Close(jr.Ctx()) if jr.limitedMemMonitor != nil { - jr.limitedMemMonitor.Stop(jr.Ctx) + jr.limitedMemMonitor.Stop(jr.Ctx()) } if jr.MemMonitor != nil { - jr.MemMonitor.Stop(jr.Ctx) + jr.MemMonitor.Stop(jr.Ctx()) } if jr.diskMonitor != nil { - jr.diskMonitor.Stop(jr.Ctx) + jr.diskMonitor.Stop(jr.Ctx()) } } } @@ -1096,14 +1096,14 @@ func (jr *joinReader) execStatsForTrace() *execinfrapb.ComponentStats { return nil } - jr.scanStats = execinfra.GetScanStats(jr.Ctx, jr.ExecStatsTrace) + jr.scanStats = execinfra.GetScanStats(jr.Ctx(), jr.ExecStatsTrace) ret := &execinfrapb.ComponentStats{ Inputs: []execinfrapb.InputStats{is}, KV: execinfrapb.KVStats{ BytesRead: optional.MakeUint(uint64(jr.fetcher.GetBytesRead())), TuplesRead: fis.NumTuples, KVTime: fis.WaitTime, - ContentionTime: optional.MakeTimeValue(execinfra.GetCumulativeContentionTime(jr.Ctx, jr.ExecStatsTrace)), + ContentionTime: optional.MakeTimeValue(execinfra.GetCumulativeContentionTime(jr.Ctx(), jr.ExecStatsTrace)), }, Output: jr.OutputHelper.Stats(), } @@ -1129,7 +1129,7 @@ func (jr *joinReader) generateMeta() []execinfrapb.ProducerMetadata { meta.Metrics = execinfrapb.GetMetricsMeta() meta.Metrics.RowsRead = jr.rowsRead meta.Metrics.BytesRead = jr.fetcher.GetBytesRead() - if tfs := execinfra.GetLeafTxnFinalState(jr.Ctx, jr.txn); tfs != nil { + if tfs := execinfra.GetLeafTxnFinalState(jr.Ctx(), jr.txn); tfs != nil { trailingMeta = append(trailingMeta, execinfrapb.ProducerMetadata{LeafTxnFinalState: tfs}) } return trailingMeta diff --git a/pkg/sql/rowexec/joinreader_strategies.go b/pkg/sql/rowexec/joinreader_strategies.go index 74a74bb09c37..8081c9895bf1 100644 --- a/pkg/sql/rowexec/joinreader_strategies.go +++ b/pkg/sql/rowexec/joinreader_strategies.go @@ -194,7 +194,7 @@ func (s *joinReaderNoOrderingStrategy) generateRemoteSpans() (roachpb.Spans, err return nil, errors.AssertionFailedf("generateRemoteSpans can only be called for locality optimized lookup joins") } s.remoteSpansGenerated = true - return gen.generateRemoteSpans(s.Ctx, s.inputRows) + return gen.generateRemoteSpans(s.Ctx(), s.inputRows) } func (s *joinReaderNoOrderingStrategy) generatedRemoteSpans() bool { @@ -207,7 +207,7 @@ func (s *joinReaderNoOrderingStrategy) processLookupRows( s.inputRows = rows s.remoteSpansGenerated = false s.emitState.unmatchedInputRowIndicesInitialized = false - return s.generateSpans(s.Ctx, s.inputRows) + return s.generateSpans(s.Ctx(), s.inputRows) } func (s *joinReaderNoOrderingStrategy) processLookedUpRow( @@ -327,13 +327,13 @@ func (s *joinReaderNoOrderingStrategy) spilled() bool { return false } func (s *joinReaderNoOrderingStrategy) growMemoryAccount( memAcc *mon.BoundAccount, delta int64, ) error { - return addWorkmemHint(memAcc.Grow(s.Ctx, delta)) + return addWorkmemHint(memAcc.Grow(s.Ctx(), delta)) } func (s *joinReaderNoOrderingStrategy) resizeMemoryAccount( memAcc *mon.BoundAccount, oldSz, newSz int64, ) error { - return addWorkmemHint(memAcc.Resize(s.Ctx, oldSz, newSz)) + return addWorkmemHint(memAcc.Resize(s.Ctx(), oldSz, newSz)) } func (s *joinReaderNoOrderingStrategy) close(ctx context.Context) { @@ -416,7 +416,7 @@ func (s *joinReaderIndexJoinStrategy) processLookupRows( rows []rowenc.EncDatumRow, ) (roachpb.Spans, error) { s.inputRows = rows - return s.generateSpans(s.Ctx, s.inputRows) + return s.generateSpans(s.Ctx(), s.inputRows) } func (s *joinReaderIndexJoinStrategy) processLookedUpRow( @@ -446,13 +446,13 @@ func (s *joinReaderIndexJoinStrategy) spilled() bool { func (s *joinReaderIndexJoinStrategy) growMemoryAccount( memAcc *mon.BoundAccount, delta int64, ) error { - return addWorkmemHint(memAcc.Grow(s.Ctx, delta)) + return addWorkmemHint(memAcc.Grow(s.Ctx(), delta)) } func (s *joinReaderIndexJoinStrategy) resizeMemoryAccount( memAcc *mon.BoundAccount, oldSz, newSz int64, ) error { - return addWorkmemHint(memAcc.Resize(s.Ctx, oldSz, newSz)) + return addWorkmemHint(memAcc.Resize(s.Ctx(), oldSz, newSz)) } func (s *joinReaderIndexJoinStrategy) close(ctx context.Context) { @@ -595,7 +595,7 @@ func (s *joinReaderOrderingStrategy) generateRemoteSpans() (roachpb.Spans, error return nil, errors.AssertionFailedf("generateRemoteSpans can only be called for locality optimized lookup joins") } s.remoteSpansGenerated = true - return gen.generateRemoteSpans(s.Ctx, s.inputRows) + return gen.generateRemoteSpans(s.Ctx(), s.inputRows) } func (s *joinReaderOrderingStrategy) generatedRemoteSpans() bool { @@ -645,7 +645,7 @@ func (s *joinReaderOrderingStrategy) processLookupRows( s.inputRows = rows s.remoteSpansGenerated = false - return s.generateSpans(s.Ctx, s.inputRows) + return s.generateSpans(s.Ctx(), s.inputRows) } func (s *joinReaderOrderingStrategy) processLookedUpRow( @@ -771,7 +771,7 @@ func (s *joinReaderOrderingStrategy) nextRowToEmit( return nil, jrEmittingRows, nil } - lookedUpRow, err := s.lookedUpRows.GetRow(s.Ctx, lookedUpRowIdx, false /* skip */) + lookedUpRow, err := s.lookedUpRows.GetRow(s.Ctx(), lookedUpRowIdx, false /* skip */) if err != nil { return nil, jrStateUnknown, err } @@ -820,16 +820,16 @@ func (s *joinReaderOrderingStrategy) close(ctx context.Context) { func (s *joinReaderOrderingStrategy) growMemoryAccount( memAcc *mon.BoundAccount, delta int64, ) error { - if err := memAcc.Grow(s.Ctx, delta); err != nil { + if err := memAcc.Grow(s.Ctx(), delta); err != nil { // We don't have enough budget to account for the new size. Check // whether we can spill the looked up rows to disk to free up the // budget. - spilled, spillErr := s.lookedUpRows.SpillToDisk(s.Ctx) + spilled, spillErr := s.lookedUpRows.SpillToDisk(s.Ctx()) if !spilled || spillErr != nil { return addWorkmemHint(errors.CombineErrors(err, spillErr)) } // We freed up some budget, so try to perform the accounting again. - return addWorkmemHint(memAcc.Grow(s.Ctx, delta)) + return addWorkmemHint(memAcc.Grow(s.Ctx(), delta)) } return nil } @@ -841,16 +841,16 @@ func (s *joinReaderOrderingStrategy) growMemoryAccount( func (s *joinReaderOrderingStrategy) resizeMemoryAccount( memAcc *mon.BoundAccount, oldSz, newSz int64, ) error { - if err := memAcc.Resize(s.Ctx, oldSz, newSz); err != nil { + if err := memAcc.Resize(s.Ctx(), oldSz, newSz); err != nil { // We don't have enough budget to account for the new size. Check // whether we can spill the looked up rows to disk to free up the // budget. - spilled, spillErr := s.lookedUpRows.SpillToDisk(s.Ctx) + spilled, spillErr := s.lookedUpRows.SpillToDisk(s.Ctx()) if !spilled || spillErr != nil { return addWorkmemHint(errors.CombineErrors(err, spillErr)) } // We freed up some budget, so try to perform the accounting again. - return addWorkmemHint(memAcc.Resize(s.Ctx, oldSz, newSz)) + return addWorkmemHint(memAcc.Resize(s.Ctx(), oldSz, newSz)) } return nil } diff --git a/pkg/sql/rowexec/mergejoiner.go b/pkg/sql/rowexec/mergejoiner.go index 2d634bb3dc8f..fb4ad0bbcb0d 100644 --- a/pkg/sql/rowexec/mergejoiner.go +++ b/pkg/sql/rowexec/mergejoiner.go @@ -230,7 +230,7 @@ func (m *mergeJoiner) nextRow() (rowenc.EncDatumRow, *execinfrapb.ProducerMetada // TODO(paul): Investigate (with benchmarks) whether or not it's // worthwhile to only buffer one row from the right stream per batch // for semi-joins. - m.leftRows, m.rightRows, meta = m.streamMerger.NextBatch(m.Ctx, m.EvalCtx) + m.leftRows, m.rightRows, meta = m.streamMerger.NextBatch(m.Ctx(), m.EvalCtx) if meta != nil { return nil, meta } @@ -249,8 +249,8 @@ func (m *mergeJoiner) nextRow() (rowenc.EncDatumRow, *execinfrapb.ProducerMetada func (m *mergeJoiner) close() { if m.InternalClose() { - m.streamMerger.close(m.Ctx) - m.MemMonitor.Stop(m.Ctx) + m.streamMerger.close(m.Ctx()) + m.MemMonitor.Stop(m.Ctx()) } } diff --git a/pkg/sql/rowexec/processors_test.go b/pkg/sql/rowexec/processors_test.go index 567c17a0e851..1b5788aee83e 100644 --- a/pkg/sql/rowexec/processors_test.go +++ b/pkg/sql/rowexec/processors_test.go @@ -342,7 +342,8 @@ func TestAggregatorSpecAggregationEquals(t *testing.T) { func TestProcessorBaseContext(t *testing.T) { defer leaktest.AfterTest(t)() - ctx := context.Background() + // Use a custom context to distinguish it from the background one. + ctx := context.WithValue(context.Background(), struct{}{}, struct{}{}) st := cluster.MakeTestingClusterSettings() runTest := func(t *testing.T, f func(noop *noopProcessor)) { @@ -358,18 +359,22 @@ func TestProcessorBaseContext(t *testing.T) { if err != nil { t.Fatal(err) } + // Before Start we should get the background context. + if noop.Ctx() != context.Background() { + t.Fatalf("ProcessorBase.Ctx() didn't return the background context before Start") + } noop.Start(ctx) - origCtx := noop.Ctx + origCtx := noop.Ctx() // The context should be valid after Start but before Next is called in case // ConsumerDone or ConsumerClosed are called without calling Next. - if noop.Ctx == nil { + if noop.Ctx() == context.Background() { t.Fatalf("ProcessorBase.ctx not initialized") } f(noop) // The context should be reset after ConsumerClosed is called so that any // subsequent logging calls will not operate on closed spans. - if noop.Ctx != origCtx { + if noop.Ctx() != origCtx { t.Fatalf("ProcessorBase.ctx not reset on close") } } diff --git a/pkg/sql/rowexec/project_set.go b/pkg/sql/rowexec/project_set.go index 50c3c9405c38..7e005e4d23d1 100644 --- a/pkg/sql/rowexec/project_set.go +++ b/pkg/sql/rowexec/project_set.go @@ -147,7 +147,7 @@ func (ps *projectSetProcessor) nextInputRow() ( // First, make sure to close its ValueGenerator from the previous // input row (if it exists). if ps.gens[i] != nil { - ps.gens[i].Close(ps.Ctx) + ps.gens[i].Close(ps.Ctx()) ps.gens[i] = nil } @@ -165,7 +165,7 @@ func (ps *projectSetProcessor) nextInputRow() ( // Store the generator before Start so that it'll be closed even if // Start returns an error. ps.gens[i] = gen - if err := gen.Start(ps.Ctx, ps.FlowCtx.Txn); err != nil { + if err := gen.Start(ps.Ctx(), ps.FlowCtx.Txn); err != nil { return nil, nil, err } } @@ -186,7 +186,7 @@ func (ps *projectSetProcessor) nextGeneratorValues() (newValAvail bool, err erro numCols := int(ps.spec.NumColsPerGen[i]) if !ps.done[i] { // Yes; check whether this source still has some values available. - hasVals, err := gen.Next(ps.Ctx) + hasVals, err := gen.Next(ps.Ctx()) if err != nil { return false, err } @@ -297,7 +297,7 @@ func (ps *projectSetProcessor) close() { ps.InternalCloseEx(func() { for _, gen := range ps.gens { if gen != nil { - gen.Close(ps.Ctx) + gen.Close(ps.Ctx()) } } }) diff --git a/pkg/sql/rowexec/sample_aggregator.go b/pkg/sql/rowexec/sample_aggregator.go index 3ce0079c9d94..48f56d8a6b1a 100644 --- a/pkg/sql/rowexec/sample_aggregator.go +++ b/pkg/sql/rowexec/sample_aggregator.go @@ -203,9 +203,9 @@ func (s *sampleAggregator) Run(ctx context.Context) { func (s *sampleAggregator) close() { if s.InternalClose() { - s.memAcc.Close(s.Ctx) - s.tempMemAcc.Close(s.Ctx) - s.MemMonitor.Stop(s.Ctx) + s.memAcc.Close(s.Ctx()) + s.tempMemAcc.Close(s.Ctx()) + s.MemMonitor.Stop(s.Ctx()) } } diff --git a/pkg/sql/rowexec/sampler.go b/pkg/sql/rowexec/sampler.go index 365349fbeae9..c92e33208e5d 100644 --- a/pkg/sql/rowexec/sampler.go +++ b/pkg/sql/rowexec/sampler.go @@ -478,8 +478,8 @@ func (s *samplerProcessor) sampleRow( func (s *samplerProcessor) close() { if s.InternalClose() { - s.memAcc.Close(s.Ctx) - s.MemMonitor.Stop(s.Ctx) + s.memAcc.Close(s.Ctx()) + s.MemMonitor.Stop(s.Ctx()) } } diff --git a/pkg/sql/rowexec/sorter.go b/pkg/sql/rowexec/sorter.go index dd7e4ea5a252..c4ce3fb7afc1 100644 --- a/pkg/sql/rowexec/sorter.go +++ b/pkg/sql/rowexec/sorter.go @@ -116,10 +116,10 @@ func (s *sorterBase) close() { if s.i != nil { s.i.Close() } - s.rows.Close(s.Ctx) - s.MemMonitor.Stop(s.Ctx) + s.rows.Close(s.Ctx()) + s.MemMonitor.Stop(s.Ctx()) if s.diskMonitor != nil { - s.diskMonitor.Stop(s.Ctx) + s.diskMonitor.Stop(s.Ctx()) } } } @@ -422,7 +422,7 @@ func newSortChunksProcessor( ); err != nil { return nil, err } - proc.i = proc.rows.NewFinalIterator(proc.Ctx) + proc.i = proc.rows.NewFinalIterator(proc.Ctx()) return proc, nil } @@ -449,7 +449,7 @@ func (s *sortChunksProcessor) chunkCompleted( // if a metadata record was encountered). The caller is expected to drain when // this returns false. func (s *sortChunksProcessor) fill() (bool, error) { - ctx := s.Ctx + ctx := s.Ctx() var meta *execinfrapb.ProducerMetadata @@ -519,7 +519,7 @@ func (s *sortChunksProcessor) Start(ctx context.Context) { // Next is part of the RowSource interface. func (s *sortChunksProcessor) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetadata) { - ctx := s.Ctx + ctx := s.Ctx() for s.State == execinfra.StateRunning { ok, err := s.i.Valid() if err != nil { diff --git a/pkg/sql/rowexec/tablereader.go b/pkg/sql/rowexec/tablereader.go index ebcaaa7ccbcf..71abb77c0e31 100644 --- a/pkg/sql/rowexec/tablereader.go +++ b/pkg/sql/rowexec/tablereader.go @@ -250,7 +250,7 @@ func TestingSetScannedRowProgressFrequency(val int64) func() { func (tr *tableReader) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetadata) { for tr.State == execinfra.StateRunning { if !tr.scanStarted { - err := tr.startScan(tr.Ctx) + err := tr.startScan(tr.Ctx()) if err != nil { tr.MoveToDraining(err) break @@ -265,7 +265,7 @@ func (tr *tableReader) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetadata return nil, meta } - row, err := tr.fetcher.NextRow(tr.Ctx) + row, err := tr.fetcher.NextRow(tr.Ctx()) if row == nil || err != nil { tr.MoveToDraining(err) break @@ -286,7 +286,7 @@ func (tr *tableReader) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetadata func (tr *tableReader) close() { if tr.InternalClose() { if tr.fetcher != nil { - tr.fetcher.Close(tr.Ctx) + tr.fetcher.Close(tr.Ctx()) } } } @@ -302,13 +302,13 @@ func (tr *tableReader) execStatsForTrace() *execinfrapb.ComponentStats { if !ok { return nil } - tr.scanStats = execinfra.GetScanStats(tr.Ctx, tr.ExecStatsTrace) + tr.scanStats = execinfra.GetScanStats(tr.Ctx(), tr.ExecStatsTrace) ret := &execinfrapb.ComponentStats{ KV: execinfrapb.KVStats{ BytesRead: optional.MakeUint(uint64(tr.fetcher.GetBytesRead())), TuplesRead: is.NumTuples, KVTime: is.WaitTime, - ContentionTime: optional.MakeTimeValue(execinfra.GetCumulativeContentionTime(tr.Ctx, tr.ExecStatsTrace)), + ContentionTime: optional.MakeTimeValue(execinfra.GetCumulativeContentionTime(tr.Ctx(), tr.ExecStatsTrace)), }, Output: tr.OutputHelper.Stats(), } @@ -321,13 +321,13 @@ func (tr *tableReader) generateMeta() []execinfrapb.ProducerMetadata { if !tr.ignoreMisplannedRanges { nodeID, ok := tr.FlowCtx.NodeID.OptionalNodeID() if ok { - ranges := execinfra.MisplannedRanges(tr.Ctx, tr.SpansCopy, nodeID, tr.FlowCtx.Cfg.RangeCache) + ranges := execinfra.MisplannedRanges(tr.Ctx(), tr.SpansCopy, nodeID, tr.FlowCtx.Cfg.RangeCache) if ranges != nil { trailingMeta = append(trailingMeta, execinfrapb.ProducerMetadata{Ranges: ranges}) } } } - if tfs := execinfra.GetLeafTxnFinalState(tr.Ctx, tr.FlowCtx.Txn); tfs != nil { + if tfs := execinfra.GetLeafTxnFinalState(tr.Ctx(), tr.FlowCtx.Txn); tfs != nil { trailingMeta = append(trailingMeta, execinfrapb.ProducerMetadata{LeafTxnFinalState: tfs}) } diff --git a/pkg/sql/rowexec/windower.go b/pkg/sql/rowexec/windower.go index cbd0e63631b7..f4120fb4a8a7 100644 --- a/pkg/sql/rowexec/windower.go +++ b/pkg/sql/rowexec/windower.go @@ -222,7 +222,7 @@ func (w *windower) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetadata) { case windowerEmittingRows: w.runningState, row, meta = w.emitRow() default: - log.Fatalf(w.Ctx, "unsupported state: %d", w.runningState) + log.Fatalf(w.Ctx(), "unsupported state: %d", w.runningState) } if row == nil && meta == nil { @@ -244,16 +244,16 @@ func (w *windower) close() { if w.allRowsIterator != nil { w.allRowsIterator.Close() } - w.allRowsPartitioned.Close(w.Ctx) + w.allRowsPartitioned.Close(w.Ctx()) if w.partition != nil { - w.partition.Close(w.Ctx) + w.partition.Close(w.Ctx()) } for _, builtin := range w.builtins { - builtin.Close(w.Ctx, w.EvalCtx) + builtin.Close(w.Ctx(), w.EvalCtx) } - w.acc.Close(w.Ctx) - w.MemMonitor.Stop(w.Ctx) - w.diskMonitor.Stop(w.Ctx) + w.acc.Close(w.Ctx()) + w.MemMonitor.Stop(w.Ctx()) + w.diskMonitor.Stop(w.Ctx()) } } @@ -277,17 +277,17 @@ func (w *windower) accumulateRows() ( return windowerAccumulating, nil, meta } if row == nil { - log.VEvent(w.Ctx, 1, "accumulation complete") + log.VEvent(w.Ctx(), 1, "accumulation complete") w.inputDone = true // We need to sort all the rows based on partitionBy columns so that all // rows belonging to the same hash bucket are contiguous. - w.allRowsPartitioned.Sort(w.Ctx) + w.allRowsPartitioned.Sort(w.Ctx()) break } // The underlying row container will decode all datums as necessary, so we // don't need to worry about that. - if err := w.allRowsPartitioned.AddRow(w.Ctx, row); err != nil { + if err := w.allRowsPartitioned.AddRow(w.Ctx(), row); err != nil { w.MoveToDraining(err) return windowerStateUnknown, nil, w.DrainHelper() } @@ -310,7 +310,7 @@ func (w *windower) emitRow() (windowerState, rowenc.EncDatumRow, *execinfrapb.Pr return windowerStateUnknown, nil, w.DrainHelper() } - if err := w.computeWindowFunctions(w.Ctx, w.EvalCtx); err != nil { + if err := w.computeWindowFunctions(w.Ctx(), w.EvalCtx); err != nil { w.MoveToDraining(err) return windowerStateUnknown, nil, w.DrainHelper() } @@ -340,7 +340,7 @@ func (w *windower) emitRow() (windowerState, rowenc.EncDatumRow, *execinfrapb.Pr func (w *windower) spillAllRowsToDisk() error { if w.allRowsPartitioned != nil { if !w.allRowsPartitioned.UsingDisk() { - if err := w.allRowsPartitioned.SpillToDisk(w.Ctx); err != nil { + if err := w.allRowsPartitioned.SpillToDisk(w.Ctx()); err != nil { return err } } else { @@ -348,7 +348,7 @@ func (w *windower) spillAllRowsToDisk() error { // w.partition if possible. if w.partition != nil { if !w.partition.UsingDisk() { - if err := w.partition.SpillToDisk(w.Ctx); err != nil { + if err := w.partition.SpillToDisk(w.Ctx()); err != nil { return err } } @@ -362,12 +362,12 @@ func (w *windower) spillAllRowsToDisk() error { // error, it forces all rows to spill and attempts to grow acc by usage // one more time. func (w *windower) growMemAccount(acc *mon.BoundAccount, usage int64) error { - if err := acc.Grow(w.Ctx, usage); err != nil { + if err := acc.Grow(w.Ctx(), usage); err != nil { if sqlerrors.IsOutOfMemoryError(err) { if err := w.spillAllRowsToDisk(); err != nil { return err } - if err := acc.Grow(w.Ctx, usage); err != nil { + if err := acc.Grow(w.Ctx(), usage); err != nil { return err } } else { @@ -693,7 +693,7 @@ func (w *windower) computeWindowFunctions(ctx context.Context, evalCtx *tree.Eva } } } - if err := w.partition.AddRow(w.Ctx, row); err != nil { + if err := w.partition.AddRow(w.Ctx(), row); err != nil { return err } } @@ -707,7 +707,7 @@ func (w *windower) computeWindowFunctions(ctx context.Context, evalCtx *tree.Eva func (w *windower) populateNextOutputRow() (bool, error) { if w.partitionIdx < len(w.partitionSizes) { if w.allRowsIterator == nil { - w.allRowsIterator = w.allRowsPartitioned.NewUnmarkedIterator(w.Ctx) + w.allRowsIterator = w.allRowsPartitioned.NewUnmarkedIterator(w.Ctx()) w.allRowsIterator.Rewind() } // rowIdx is the index of the next row to be emitted from the diff --git a/pkg/sql/rowexec/zigzagjoiner.go b/pkg/sql/rowexec/zigzagjoiner.go index 1e4616cad812..7bd7c5d22bcf 100644 --- a/pkg/sql/rowexec/zigzagjoiner.go +++ b/pkg/sql/rowexec/zigzagjoiner.go @@ -518,9 +518,9 @@ func (z *zigzagJoiner) setupInfo( func (z *zigzagJoiner) close() { if z.InternalClose() { for i := range z.infos { - z.infos[i].fetcher.Close(z.Ctx) + z.infos[i].fetcher.Close(z.Ctx()) } - log.VEventf(z.Ctx, 2, "exiting zigzag joiner run") + log.VEventf(z.Ctx(), 2, "exiting zigzag joiner run") } } @@ -942,7 +942,7 @@ func (z *zigzagJoiner) maybeFetchInitialRow() error { curInfo := z.infos[z.side] err := curInfo.fetcher.StartScan( - z.Ctx, + z.Ctx(), z.FlowCtx.Txn, roachpb.Spans{roachpb.Span{Key: curInfo.key, EndKey: curInfo.endKey}}, rowinfra.DefaultBatchBytesLimit, @@ -951,10 +951,10 @@ func (z *zigzagJoiner) maybeFetchInitialRow() error { z.EvalCtx.TestingKnobs.ForceProductionBatchSizes, ) if err != nil { - log.Errorf(z.Ctx, "scan error: %s", err) + log.Errorf(z.Ctx(), "scan error: %s", err) return err } - fetchedRow, err := z.fetchRow(z.Ctx) + fetchedRow, err := z.fetchRow(z.Ctx()) if err != nil { return scrub.UnwrapScrubError(err) } @@ -972,7 +972,7 @@ func (z *zigzagJoiner) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetadata z.MoveToDraining(err) break } - row, err := z.nextRow(z.Ctx, z.FlowCtx.Txn) + row, err := z.nextRow(z.Ctx(), z.FlowCtx.Txn) if err != nil { z.MoveToDraining(err) break @@ -997,11 +997,11 @@ func (z *zigzagJoiner) ConsumerClosed() { // execStatsForTrace implements ProcessorBase.ExecStatsForTrace. func (z *zigzagJoiner) execStatsForTrace() *execinfrapb.ComponentStats { - z.scanStats = execinfra.GetScanStats(z.Ctx, z.ExecStatsTrace) + z.scanStats = execinfra.GetScanStats(z.Ctx(), z.ExecStatsTrace) kvStats := execinfrapb.KVStats{ BytesRead: optional.MakeUint(uint64(z.getBytesRead())), - ContentionTime: optional.MakeTimeValue(execinfra.GetCumulativeContentionTime(z.Ctx, z.ExecStatsTrace)), + ContentionTime: optional.MakeTimeValue(execinfra.GetCumulativeContentionTime(z.Ctx(), z.ExecStatsTrace)), } execinfra.PopulateKVMVCCStats(&kvStats, &z.scanStats) for i := range z.infos { @@ -1040,7 +1040,7 @@ func (z *zigzagJoiner) generateMeta() []execinfrapb.ProducerMetadata { meta.Metrics = execinfrapb.GetMetricsMeta() meta.Metrics.BytesRead = z.getBytesRead() meta.Metrics.RowsRead = z.getRowsRead() - if tfs := execinfra.GetLeafTxnFinalState(z.Ctx, z.FlowCtx.Txn); tfs != nil { + if tfs := execinfra.GetLeafTxnFinalState(z.Ctx(), z.FlowCtx.Txn); tfs != nil { trailingMeta = append(trailingMeta, execinfrapb.ProducerMetadata{LeafTxnFinalState: tfs}) } return trailingMeta