colflow: release disk resources in hash router in all cases
Previously, it was possible for the disk-backed spilling queue used
by the hash router outputs to not be closed when the hash router exited.
Namely, this could occur if the router output was not fully exhausted
(i.e. it could still produce more batches, but the consumer of the
router output was satisfied and called `DrainMeta`). In such a scenario,
`routerOutputOp.closeLocked` was never called because neither of its
triggers fired: no zero-length batch was given to `addBatch`, and the
output was not canceled due to an error. The flow cleanup also didn't
save us because the router outputs are not added to the `ToClose` slice.

The bug is now fixed by closing the router output in `DrainMeta`. This
behavior is acceptable because the caller is not interested in any more
data, and closing the output can be done multiple times (it is a no-op
on all calls except for the first one). There is no regression test
because one is quite tricky to write given that the behavior of router
outputs is non-deterministic, and I don't think it's worth introducing
special knobs inside of `DrainMeta` / `Next` for this.
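
A minimal sketch of the idempotent-close pattern described above. It is not
the actual `routers.go` code: `spillQueue`, `output`, and the `releases`
counter are hypothetical stand-ins, and only the shape (close under the
mutex from `DrainMeta`, no-op on repeated calls) mirrors the fix.

```go
package main

import (
	"fmt"
	"sync"
)

// spillQueue is a hypothetical stand-in for the disk-backed spilling queue.
type spillQueue struct {
	closed   bool
	releases int // how many times resources were actually released
}

// Close releases resources on the first call and is a no-op afterwards.
func (q *spillQueue) Close() error {
	if q.closed {
		return nil
	}
	q.closed = true
	q.releases++
	return nil
}

type outputState int

const (
	outputRunning outputState = iota
	outputDraining
)

// output is a hypothetical, heavily simplified router output.
type output struct {
	mu    sync.Mutex
	state outputState
	data  *spillQueue
}

// closeLocked moves the output to the draining state and releases the
// queue. The caller must hold o.mu. It is safe to call multiple times.
func (o *output) closeLocked() {
	o.state = outputDraining
	_ = o.data.Close()
}

// DrainMeta signals that the consumer needs no more data, so the output
// is closed here even if it was never fully exhausted.
func (o *output) DrainMeta() {
	o.mu.Lock()
	defer o.mu.Unlock()
	o.closeLocked()
}

func main() {
	o := &output{data: &spillQueue{}}
	o.DrainMeta()
	o.DrainMeta() // second call is a no-op
	fmt.Println(o.data.closed, o.data.releases) // true 1
}
```

Because `Close` guards on `closed`, calling `DrainMeta` after `Next` has
already closed the output (or calling it twice) releases nothing extra.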

The impact of not closing the spilling queue is that a file descriptor
might leak until the node restarts. Although the temporary directory is
deleted on flow cleanup, the file behind the open descriptor keeps
occupying disk space, so the bug also leaks disk space, which is
likewise "fixed" only by a node restart.
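
The following sketch illustrates why deleting the temporary directory is
not enough. It assumes Linux or another Unix-like system, where a file
removed while still open stays allocated until its last descriptor is
closed. The function and file names are hypothetical and unrelated to the
real spilling queue.

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// leakedHandleStillWritable creates a file in a fresh temporary directory,
// deletes the whole directory while the file is still open, and then tries
// to write through the open handle. It reports whether that write succeeded.
// Assumption: Unix-like semantics; on other platforms RemoveAll may fail.
func leakedHandleStillWritable() (bool, error) {
	dir, err := os.MkdirTemp("", "spill-sketch")
	if err != nil {
		return false, err
	}
	f, err := os.Create(filepath.Join(dir, "queue.tmp"))
	if err != nil {
		os.RemoveAll(dir)
		return false, err
	}
	// Closing the handle is what finally lets the OS reclaim the space.
	defer f.Close()

	// Analogous to the flow cleanup deleting the temporary directory.
	if err := os.RemoveAll(dir); err != nil {
		return false, err
	}

	// The directory entry is gone, but the open descriptor still refers
	// to the file, so it can keep growing and occupying disk space.
	_, writeErr := f.Write([]byte("still occupying disk space"))
	return writeErr == nil, nil
}

func main() {
	ok, err := leakedHandleStillWritable()
	fmt.Println(ok, err)
}
```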

Release note: None
yuzefovich committed May 18, 2022
1 parent bdde4c6 commit 39944c3
Showing 1 changed file with 9 additions and 4 deletions.
13 changes: 9 additions & 4 deletions pkg/sql/colflow/routers.go
@@ -261,18 +261,21 @@ func (o *routerOutputOp) Next() coldata.Batch {
o.nextErrorLocked(err)
}
}
-// This is the last batch. closeLocked will set done to protect against
-// further calls to Next since this is allowed by the interface as well as
-// cleaning up and releasing possible disk infrastructure.
+// This is the last batch. closeLocked will set the state of the output
+// to draining to protect against further calls to Next since this is
+// allowed by the interface as well as cleaning up and releasing
+// possible disk infrastructure.
o.closeLocked(o.Ctx)
}
return b
}

func (o *routerOutputOp) DrainMeta() []execinfrapb.ProducerMetadata {
o.mu.Lock()
-o.mu.state = routerOutputOpDraining
o.maybeUnblockLocked()
+// The call to DrainMeta() indicates that the caller will no longer need any
+// more data from this output, so we can close it.
+o.closeLocked(o.Ctx)
o.mu.Unlock()
return o.drainCoordinator.drainMeta()
}
@@ -282,6 +285,8 @@ func (o *routerOutputOp) initWithHashRouter(r *HashRouter) {
o.drainCoordinator = r
}

+// closeLocked sets the state of the output to 'draining' as well as releases
+// possible disk infrastructure. It is safe to be called multiple times.
func (o *routerOutputOp) closeLocked(ctx context.Context) {
o.mu.state = routerOutputOpDraining
if err := o.mu.data.Close(ctx); err != nil {