Skip to content

Commit

Permalink
colexec: change Close to take in a context
Browse files Browse the repository at this point in the history
Previously, all `Closer`s would use their own context (either captured
in `Init` or derived from the one in `Init`) in the implementation of
`Close` (for example, when they wanted to log something). However, due
to the way the draining of the wrapped row-by-row processors and
closing of `Closer`s is structured (the draining happens first), it was
possible for the captured context to have a tracing span which was
already `Finish`ed. This is so because the row-by-row processors derive
separate tracing spans and finish them automatically during draining
whereas the closure of `Closer`s happens later.

This commit fixes this issue by passing a context as an argument to
`Close` function, and most of the implementations now use that. Only
components that derive their own tracing span are allowed to use their
own context since they control when the span is finished.

Release note: None
  • Loading branch information
yuzefovich committed Feb 7, 2022
1 parent fc1957a commit 36f3597
Show file tree
Hide file tree
Showing 47 changed files with 211 additions and 187 deletions.
2 changes: 1 addition & 1 deletion pkg/sql/colexec/aggregators_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1152,7 +1152,7 @@ func benchmarkAggregateFunction(
break
}
}
if err = a.(colexecop.Closer).Close(); err != nil {
if err = a.(colexecop.Closer).Close(ctx); err != nil {
b.Fatal(err)
}
source.Reset(ctx)
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/colexec/colexecagg/default_agg_tmpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,9 +201,9 @@ func (a *default_AGGKINDAggAlloc) newAggFunc() AggregateFunc {
return f
}

func (a *default_AGGKINDAggAlloc) Close() error {
func (a *default_AGGKINDAggAlloc) Close(ctx context.Context) error {
for _, fn := range a.returnedFns {
fn.fn.Close(fn.ctx)
fn.fn.Close(ctx)
}
a.returnedFns = nil
return nil
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/colexec/colexecagg/hash_default_agg.eg.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions pkg/sql/colexec/colexecagg/ordered_default_agg.eg.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/sql/colexec/colexecargs/op_creation.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ var _ execinfra.Releasable = &NewColOperatorResult{}
// TestCleanupNoError releases the resources associated with this result and
// asserts that no error is returned. It should only be used in tests.
func (r *NewColOperatorResult) TestCleanupNoError(t testing.TB) {
require.NoError(t, r.ToClose.Close())
require.NoError(t, r.ToClose.Close(context.Background()))
}

var newColOperatorResultPool = sync.Pool{
Expand Down
5 changes: 2 additions & 3 deletions pkg/sql/colexec/colexecjoin/crossjoiner.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func (c *crossJoiner) Next() coldata.Batch {
}
willEmit := c.willEmit()
if willEmit == 0 {
if err := c.Close(); err != nil {
if err := c.Close(c.Ctx); err != nil {
colexecerror.InternalError(err)
}
c.done = true
Expand Down Expand Up @@ -462,8 +462,7 @@ func (b *crossJoinerBase) Reset(ctx context.Context) {
b.builderState.numEmittedTotal = 0
}

func (b *crossJoinerBase) Close() error {
ctx := b.initHelper.EnsureCtx()
func (b *crossJoinerBase) Close(ctx context.Context) error {
if b.rightTuples != nil {
return b.rightTuples.Close(ctx)
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/sql/colexec/colexecjoin/mergejoiner.go
Original file line number Diff line number Diff line change
Expand Up @@ -889,20 +889,20 @@ func (o *mergeJoinBase) completeRightBufferedGroup() {
o.finishRightBufferedGroup()
}

func (o *mergeJoinBase) Close() error {
func (o *mergeJoinBase) Close(ctx context.Context) error {
if !o.CloserHelper.Close() {
return nil
}
var lastErr error
for _, op := range []colexecop.Operator{o.left.source, o.right.source} {
if c, ok := op.(colexecop.Closer); ok {
if err := c.Close(); err != nil {
if err := c.Close(ctx); err != nil {
lastErr = err
}
}
}
if h := o.bufferedGroup.helper; h != nil {
if err := h.Close(); err != nil {
if err := h.Close(ctx); err != nil {
lastErr = err
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/colexec/colexectestutils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -440,7 +440,7 @@ func RunTestsWithOrderedCols(
// setting, the closing happens at the end of the query execution.
func closeIfCloser(t *testing.T, op colexecop.Operator) {
if c, ok := op.(colexecop.Closer); ok {
if err := c.Close(); err != nil {
if err := c.Close(context.Background()); err != nil {
t.Fatal(err)
}
}
Expand Down
10 changes: 5 additions & 5 deletions pkg/sql/colexec/colexecwindow/buffered_window.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ const (
// buffer all tuples from each partition.
type bufferedWindower interface {
Init(ctx context.Context)
Close()
Close(context.Context)

// seekNextPartition is called during the windowSeeking state on the current
// batch. It gives windowers a chance to perform any necessary pre-processing,
Expand Down Expand Up @@ -357,7 +357,7 @@ func (b *bufferedWindowOp) Next() coldata.Batch {
colexecerror.InternalError(
errors.AssertionFailedf("window operator in processing state without buffered rows"))
case windowFinished:
if err = b.Close(); err != nil {
if err = b.Close(b.Ctx); err != nil {
colexecerror.InternalError(err)
}
return coldata.ZeroBatch
Expand All @@ -369,16 +369,16 @@ func (b *bufferedWindowOp) Next() coldata.Batch {
}
}

func (b *bufferedWindowOp) Close() error {
func (b *bufferedWindowOp) Close(ctx context.Context) error {
if !b.CloserHelper.Close() || b.Ctx == nil {
// Either Close() has already been called or Init() was never called. In
// both cases there is nothing to do.
return nil
}
if err := b.bufferQueue.Close(b.EnsureCtx()); err != nil {
if err := b.bufferQueue.Close(ctx); err != nil {
return err
}
b.windower.Close()
b.windower.Close(ctx)
return nil
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/colexec/colexecwindow/count_rows_aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,12 @@ func (a *countRowsWindowAggregator) Init(ctx context.Context) {
}

// Close implements the bufferedWindower interface.
func (a *countRowsWindowAggregator) Close() {
func (a *countRowsWindowAggregator) Close(ctx context.Context) {
if !a.CloserHelper.Close() {
return
}
a.framer.close()
a.buffer.Close(a.EnsureCtx())
a.buffer.Close(ctx)
}

// processBatch implements the bufferedWindower interface.
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/colexec/colexecwindow/first_last_nth_value_tmpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,9 +201,9 @@ func (b *_OP_NAMEBase) Init(ctx context.Context) {
}

// Close implements the bufferedWindower interface.
func (b *_OP_NAMEBase) Close() {
func (b *_OP_NAMEBase) Close(ctx context.Context) {
if !b.CloserHelper.Close() {
return
}
b.buffer.Close(b.EnsureCtx())
b.buffer.Close(ctx)
}
4 changes: 2 additions & 2 deletions pkg/sql/colexec/colexecwindow/first_value.eg.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions pkg/sql/colexec/colexecwindow/lag.eg.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions pkg/sql/colexec/colexecwindow/last_value.eg.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions pkg/sql/colexec/colexecwindow/lead.eg.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions pkg/sql/colexec/colexecwindow/lead_lag_tmpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,11 +174,11 @@ func (b *_OP_NAMEBase) Init(ctx context.Context) {
}
}

func (b *_OP_NAMEBase) Close() {
func (b *_OP_NAMEBase) Close(ctx context.Context) {
if !b.CloserHelper.Close() {
return
}
b.buffer.Close(b.EnsureCtx())
b.buffer.Close(ctx)
}

// {{/*
Expand Down
Loading

0 comments on commit 36f3597

Please sign in to comment.