diff --git a/pkg/sql/colflow/routers.go b/pkg/sql/colflow/routers.go index eb2aac12acdb..af3dd6a98bc0 100644 --- a/pkg/sql/colflow/routers.go +++ b/pkg/sql/colflow/routers.go @@ -261,9 +261,10 @@ func (o *routerOutputOp) Next() coldata.Batch { o.nextErrorLocked(err) } } - // This is the last batch. closeLocked will set done to protect against - // further calls to Next since this is allowed by the interface as well as - // cleaning up and releasing possible disk infrastructure. + // This is the last batch. closeLocked will set the state of the output + // to draining to protect against further calls to Next since this is + // allowed by the interface as well as cleaning up and releasing + // possible disk infrastructure. o.closeLocked(o.Ctx) } return b @@ -271,8 +272,10 @@ func (o *routerOutputOp) Next() coldata.Batch { func (o *routerOutputOp) DrainMeta() []execinfrapb.ProducerMetadata { o.mu.Lock() - o.mu.state = routerOutputOpDraining o.maybeUnblockLocked() + // The call to DrainMeta() indicates that the caller will no longer need any + // more data from this output, so we can close it. + o.closeLocked(o.Ctx) o.mu.Unlock() return o.drainCoordinator.drainMeta() } @@ -282,6 +285,8 @@ func (o *routerOutputOp) initWithHashRouter(r *HashRouter) { o.drainCoordinator = r } +// closeLocked sets the state of the output to 'draining' as well as releases +// possible disk infrastructure. It is safe to be called multiple times. func (o *routerOutputOp) closeLocked(ctx context.Context) { o.mu.state = routerOutputOpDraining if err := o.mu.data.Close(ctx); err != nil {