diff --git a/pkg/sql/distsql_running.go b/pkg/sql/distsql_running.go index c58e3f77e770..a0e3caacaf9b 100644 --- a/pkg/sql/distsql_running.go +++ b/pkg/sql/distsql_running.go @@ -707,7 +707,7 @@ func (dsp *DistSQLPlanner) planAndRunSubquery( typ = sqlbase.ColTypeInfoFromColTypes(colTypes) } rows = rowcontainer.NewRowContainer(subqueryMemAccount, typ, 0) - defer rows.Close(evalCtx.Ctx()) + defer rows.Close(ctx) subqueryRowReceiver := NewRowResultWriter(rows) subqueryRecv.resultWriter = subqueryRowReceiver diff --git a/pkg/sql/distsqlrun/mergejoiner.go b/pkg/sql/distsqlrun/mergejoiner.go index bc8c0a97e2ff..6a0c7beea503 100644 --- a/pkg/sql/distsqlrun/mergejoiner.go +++ b/pkg/sql/distsqlrun/mergejoiner.go @@ -226,7 +226,7 @@ func (m *mergeJoiner) nextRow() (sqlbase.EncDatumRow, *ProducerMetadata) { // TODO(paul): Investigate (with benchmarks) whether or not it's // worthwhile to only buffer one row from the right stream per batch // for semi-joins. - m.leftRows, m.rightRows, meta = m.streamMerger.NextBatch(m.evalCtx) + m.leftRows, m.rightRows, meta = m.streamMerger.NextBatch(m.Ctx, m.evalCtx) if meta != nil { return nil, meta } @@ -242,7 +242,7 @@ func (m *mergeJoiner) nextRow() (sqlbase.EncDatumRow, *ProducerMetadata) { func (m *mergeJoiner) close() { if m.InternalClose() { - ctx := m.evalCtx.Ctx() + ctx := m.Ctx m.streamMerger.close(ctx) m.MemMonitor.Stop(ctx) } diff --git a/pkg/sql/distsqlrun/sorter.go b/pkg/sql/distsqlrun/sorter.go index f4a4e04ac9bc..f2d887747703 100644 --- a/pkg/sql/distsqlrun/sorter.go +++ b/pkg/sql/distsqlrun/sorter.go @@ -142,7 +142,7 @@ func (s *sorterBase) close() { if s.i != nil { s.i.Close() } - ctx := s.evalCtx.Ctx() + ctx := s.Ctx s.rows.Close(ctx) s.MemMonitor.Stop(ctx) if s.diskMonitor != nil { @@ -522,7 +522,7 @@ func (s *sortChunksProcessor) chunkCompleted( // if a metadata record was encountered). The caller is expected to drain when // this returns false. func (s *sortChunksProcessor) fill() (bool, error) { - ctx := s.evalCtx.Ctx() + ctx := s.Ctx var meta *ProducerMetadata @@ -592,6 +592,7 @@ func (s *sortChunksProcessor) Start(ctx context.Context) context.Context { // Next is part of the RowSource interface. func (s *sortChunksProcessor) Next() (sqlbase.EncDatumRow, *ProducerMetadata) { + ctx := s.Ctx for s.State == StateRunning { ok, err := s.i.Valid() if err != nil { @@ -600,7 +601,6 @@ func (s *sortChunksProcessor) Next() (sqlbase.EncDatumRow, *ProducerMetadata) { } // If we don't have an active chunk, clear and refill it. if !ok { - ctx := s.evalCtx.Ctx() if err := s.rows.UnsafeReset(ctx); err != nil { s.MoveToDraining(err) break diff --git a/pkg/sql/distsqlrun/stream_group_accumulator.go b/pkg/sql/distsqlrun/stream_group_accumulator.go index 1fd29ed61bed..82eae66abb96 100644 --- a/pkg/sql/distsqlrun/stream_group_accumulator.go +++ b/pkg/sql/distsqlrun/stream_group_accumulator.go @@ -65,7 +65,7 @@ func (s *streamGroupAccumulator) start(ctx context.Context) { // nextGroup returns the next group from the inputs. The returned slice is not safe // to use after the next call to nextGroup. func (s *streamGroupAccumulator) nextGroup( - evalCtx *tree.EvalContext, + ctx context.Context, evalCtx *tree.EvalContext, ) ([]sqlbase.EncDatumRow, *ProducerMetadata) { if s.srcConsumed { // If src has been exhausted, then we also must have advanced away from the @@ -88,7 +88,7 @@ func (s *streamGroupAccumulator) nextGroup( return s.curGroup, nil } - if err := s.memAcc.Grow(evalCtx.Ctx(), int64(row.Size())); err != nil { + if err := s.memAcc.Grow(ctx, int64(row.Size())); err != nil { return nil, &ProducerMetadata{Err: err} } row = s.rowAlloc.CopyRow(row) @@ -117,7 +117,7 @@ func (s *streamGroupAccumulator) nextGroup( n := len(s.curGroup) ret := s.curGroup[:n:n] s.curGroup = s.curGroup[:0] - s.memAcc.Empty(evalCtx.Ctx()) + s.memAcc.Empty(ctx) s.leftoverRow = row return ret, nil } diff --git a/pkg/sql/distsqlrun/stream_merger.go b/pkg/sql/distsqlrun/stream_merger.go index ee4306224aeb..b969aa034367 100644 --- a/pkg/sql/distsqlrun/stream_merger.go +++ b/pkg/sql/distsqlrun/stream_merger.go @@ -49,18 +49,18 @@ func (sm *streamMerger) start(ctx context.Context) { // the right stream, all matching on the equality columns. One of the sets can // be empty. func (sm *streamMerger) NextBatch( - evalCtx *tree.EvalContext, + ctx context.Context, evalCtx *tree.EvalContext, ) ([]sqlbase.EncDatumRow, []sqlbase.EncDatumRow, *ProducerMetadata) { if sm.leftGroup == nil { var meta *ProducerMetadata - sm.leftGroup, meta = sm.left.nextGroup(evalCtx) + sm.leftGroup, meta = sm.left.nextGroup(ctx, evalCtx) if meta != nil { return nil, nil, meta } } if sm.rightGroup == nil { var meta *ProducerMetadata - sm.rightGroup, meta = sm.right.nextGroup(evalCtx) + sm.rightGroup, meta = sm.right.nextGroup(ctx, evalCtx) if meta != nil { return nil, nil, meta } diff --git a/pkg/sql/executor_statement_metrics.go b/pkg/sql/executor_statement_metrics.go index da2c8f2331a5..d4bd37437466 100644 --- a/pkg/sql/executor_statement_metrics.go +++ b/pkg/sql/executor_statement_metrics.go @@ -169,7 +169,7 @@ func (ex *connExecutor) recordStatementSummary( sessionAge := phaseTimes[plannerEndExecStmt]. Sub(phaseTimes[sessionInit]).Seconds() - log.Infof(planner.EvalContext().Ctx(), + log.Infof(ctx, "query stats: %d rows, %d retries, "+ "parse %.2fµs (%.1f%%), "+ "plan %.2fµs (%.1f%%), "+