From 39944c36e91553794b762d24f214684f134bbc4a Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Wed, 18 May 2022 23:49:08 +0000 Subject: [PATCH] colflow: release disk resources in hash router in all cases Previously, it was possible for the disk-backed spilling queue used by the hash router outputs to not be closed when the hash router exited. Namely, this could occur if the router output was not fully exhausted (i.e. it could still produce more batches, but the consumer of the router output was satisfied and called `DrainMeta`). In such a scenario, `routerOutput.closeLocked` was never called because a zero-length batch was never given to `addBatch` nor the output was canceled due to an error. The flow cleanup also didn't save us because the router outputs are not added into `ToClose` slice. The bug is now fixed by closing the router output in `DrainMeta`. This behavior is acceptable because the caller is not interested in any more data, and closing the output can be done multiple times (it is a no-op on all calls except for the first one). There is no regression test since it's quite tricky to come up with given that the behavior of router outputs is non-deterministic, and I don't think it's worth introducing special knobs inside of `DrainMeta` / `Next` for this. The impact of not closing the spilling queue is that it might lead to leaking a file descriptor until the node restarts. Although the temporary directory is deleted on the flow cleanup, the bug would result in a leak of the disk space which is also "fixed" by the node restarts. Release note: None --- pkg/sql/colflow/routers.go | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/pkg/sql/colflow/routers.go b/pkg/sql/colflow/routers.go index eb2aac12acdb..af3dd6a98bc0 100644 --- a/pkg/sql/colflow/routers.go +++ b/pkg/sql/colflow/routers.go @@ -261,9 +261,10 @@ func (o *routerOutputOp) Next() coldata.Batch { o.nextErrorLocked(err) } } - // This is the last batch. closeLocked will set done to protect against - // further calls to Next since this is allowed by the interface as well as - // cleaning up and releasing possible disk infrastructure. + // This is the last batch. closeLocked will set the state of the output + // to draining to protect against further calls to Next since this is + // allowed by the interface as well as cleaning up and releasing + // possible disk infrastructure. o.closeLocked(o.Ctx) } return b @@ -271,8 +272,10 @@ func (o *routerOutputOp) Next() coldata.Batch { func (o *routerOutputOp) DrainMeta() []execinfrapb.ProducerMetadata { o.mu.Lock() - o.mu.state = routerOutputOpDraining o.maybeUnblockLocked() + // The call to DrainMeta() indicates that the caller will no longer need any + // more data from this output, so we can close it. + o.closeLocked(o.Ctx) o.mu.Unlock() return o.drainCoordinator.drainMeta() } @@ -282,6 +285,8 @@ func (o *routerOutputOp) initWithHashRouter(r *HashRouter) { o.drainCoordinator = r } +// closeLocked sets the state of the output to 'draining' as well as releases +// possible disk infrastructure. It is safe to be called multiple times. func (o *routerOutputOp) closeLocked(ctx context.Context) { o.mu.state = routerOutputOpDraining if err := o.mu.data.Close(ctx); err != nil {