Skip to content

Commit

Permalink
sql: bust incorrect usages of evalCtx.Ctx()
Browse files Browse the repository at this point in the history
Previously, a bunch of callsites were incorrectly using evalCtx.Ctx()
instead of a more appropriate context. This was admittedly confusing,
and this commit does nothing to improve the confusion, though it does
fix the incorrect context usages.

Processors must never use evalCtx.Ctx() once they've been started - they
must always use the context embedded into their processor base.

Release note (bug fix): fix a crash that could occur when using logspy
tracing in some circumstances.
  • Loading branch information
jordanlewis committed Feb 21, 2019
1 parent 1387b4f commit 5b21532
Show file tree
Hide file tree
Showing 6 changed files with 13 additions and 13 deletions.
2 changes: 1 addition & 1 deletion pkg/sql/distsql_running.go
Original file line number Diff line number Diff line change
Expand Up @@ -709,7 +709,7 @@ func (dsp *DistSQLPlanner) planAndRunSubquery(
typ = sqlbase.ColTypeInfoFromColTypes(colTypes)
}
rows = rowcontainer.NewRowContainer(subqueryMemAccount, typ, 0)
defer rows.Close(evalCtx.Ctx())
defer rows.Close(ctx)

subqueryRowReceiver := NewRowResultWriter(rows)
subqueryRecv.resultWriter = subqueryRowReceiver
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/distsqlrun/mergejoiner.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ func (m *mergeJoiner) nextRow() (sqlbase.EncDatumRow, *ProducerMetadata) {
// TODO(paul): Investigate (with benchmarks) whether or not it's
// worthwhile to only buffer one row from the right stream per batch
// for semi-joins.
m.leftRows, m.rightRows, meta = m.streamMerger.NextBatch(m.evalCtx)
m.leftRows, m.rightRows, meta = m.streamMerger.NextBatch(m.Ctx, m.evalCtx)
if meta != nil {
return nil, meta
}
Expand All @@ -242,7 +242,7 @@ func (m *mergeJoiner) nextRow() (sqlbase.EncDatumRow, *ProducerMetadata) {

func (m *mergeJoiner) close() {
if m.InternalClose() {
ctx := m.evalCtx.Ctx()
ctx := m.Ctx
m.streamMerger.close(ctx)
m.MemMonitor.Stop(ctx)
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/sql/distsqlrun/sorter.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func (s *sorterBase) close() {
if s.i != nil {
s.i.Close()
}
ctx := s.evalCtx.Ctx()
ctx := s.Ctx
s.rows.Close(ctx)
s.MemMonitor.Stop(ctx)
if s.diskMonitor != nil {
Expand Down Expand Up @@ -522,7 +522,7 @@ func (s *sortChunksProcessor) chunkCompleted(
// if a metadata record was encountered). The caller is expected to drain when
// this returns false.
func (s *sortChunksProcessor) fill() (bool, error) {
ctx := s.evalCtx.Ctx()
ctx := s.Ctx

var meta *ProducerMetadata

Expand Down Expand Up @@ -592,6 +592,7 @@ func (s *sortChunksProcessor) Start(ctx context.Context) context.Context {

// Next is part of the RowSource interface.
func (s *sortChunksProcessor) Next() (sqlbase.EncDatumRow, *ProducerMetadata) {
ctx := s.Ctx
for s.State == StateRunning {
ok, err := s.i.Valid()
if err != nil {
Expand All @@ -600,7 +601,6 @@ func (s *sortChunksProcessor) Next() (sqlbase.EncDatumRow, *ProducerMetadata) {
}
// If we don't have an active chunk, clear and refill it.
if !ok {
ctx := s.evalCtx.Ctx()
if err := s.rows.UnsafeReset(ctx); err != nil {
s.MoveToDraining(err)
break
Expand Down
6 changes: 3 additions & 3 deletions pkg/sql/distsqlrun/stream_group_accumulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func (s *streamGroupAccumulator) start(ctx context.Context) {
// nextGroup returns the next group from the inputs. The returned slice is not safe
// to use after the next call to nextGroup.
func (s *streamGroupAccumulator) nextGroup(
evalCtx *tree.EvalContext,
ctx context.Context, evalCtx *tree.EvalContext,
) ([]sqlbase.EncDatumRow, *ProducerMetadata) {
if s.srcConsumed {
// If src has been exhausted, then we also must have advanced away from the
Expand All @@ -88,7 +88,7 @@ func (s *streamGroupAccumulator) nextGroup(
return s.curGroup, nil
}

if err := s.memAcc.Grow(evalCtx.Ctx(), int64(row.Size())); err != nil {
if err := s.memAcc.Grow(ctx, int64(row.Size())); err != nil {
return nil, &ProducerMetadata{Err: err}
}
row = s.rowAlloc.CopyRow(row)
Expand Down Expand Up @@ -117,7 +117,7 @@ func (s *streamGroupAccumulator) nextGroup(
n := len(s.curGroup)
ret := s.curGroup[:n:n]
s.curGroup = s.curGroup[:0]
s.memAcc.Empty(evalCtx.Ctx())
s.memAcc.Empty(ctx)
s.leftoverRow = row
return ret, nil
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/sql/distsqlrun/stream_merger.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,18 +49,18 @@ func (sm *streamMerger) start(ctx context.Context) {
// the right stream, all matching on the equality columns. One of the sets can
// be empty.
func (sm *streamMerger) NextBatch(
evalCtx *tree.EvalContext,
ctx context.Context, evalCtx *tree.EvalContext,
) ([]sqlbase.EncDatumRow, []sqlbase.EncDatumRow, *ProducerMetadata) {
if sm.leftGroup == nil {
var meta *ProducerMetadata
sm.leftGroup, meta = sm.left.nextGroup(evalCtx)
sm.leftGroup, meta = sm.left.nextGroup(ctx, evalCtx)
if meta != nil {
return nil, nil, meta
}
}
if sm.rightGroup == nil {
var meta *ProducerMetadata
sm.rightGroup, meta = sm.right.nextGroup(evalCtx)
sm.rightGroup, meta = sm.right.nextGroup(ctx, evalCtx)
if meta != nil {
return nil, nil, meta
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/executor_statement_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ func (ex *connExecutor) recordStatementSummary(
sessionAge := phaseTimes[plannerEndExecStmt].
Sub(phaseTimes[sessionInit]).Seconds()

log.Infof(planner.EvalContext().Ctx(),
log.Infof(ctx,
"query stats: %d rows, %d retries, "+
"parse %.2fµs (%.1f%%), "+
"plan %.2fµs (%.1f%%), "+
Expand Down

0 comments on commit 5b21532

Please sign in to comment.