Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

colflow: fix recent misuse of two slices in the flow setup #56597

Merged
merged 1 commit into from
Nov 12, 2020
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 10 additions & 23 deletions pkg/sql/colflow/vectorized_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -519,11 +519,7 @@ type vectorizedFlowCreator struct {
numClosers int32
numClosed int32

scratch struct {
inputs []colexecbase.Operator
metadataSourcesQueue []execinfrapb.MetadataSource
toClose []colexecbase.Closer
}
inputsScratch []colexecbase.Operator
}

var _ execinfra.Releasable = &vectorizedFlowCreator{}
Expand Down Expand Up @@ -570,7 +566,7 @@ func newVectorizedFlowCreator(
releasables: creator.releasables,
diskQueueCfg: diskQueueCfg,
fdSemaphore: fdSemaphore,
scratch: creator.scratch,
inputsScratch: creator.inputsScratch,
}
return creator
}
Expand Down Expand Up @@ -606,15 +602,7 @@ func (s *vectorizedFlowCreator) Release() {
monitors: s.monitors[:0],
accounts: s.accounts[:0],
releasables: s.releasables[:0],
scratch: struct {
inputs []colexecbase.Operator
metadataSourcesQueue []execinfrapb.MetadataSource
toClose []colexecbase.Closer
}{
inputs: s.scratch.inputs[:0],
metadataSourcesQueue: s.scratch.metadataSourcesQueue[:0],
toClose: s.scratch.toClose[:0],
},
inputsScratch: s.inputsScratch[:0],
}
vectorizedFlowCreatorPool.Put(s)
}
Expand Down Expand Up @@ -667,6 +655,7 @@ func (s *vectorizedFlowCreator) newStreamingMemAccount(
// setupRemoteOutputStream sets up an Outbox that will operate according to
// the given StreamEndpointSpec. It will also drain all MetadataSources in the
// metadataSourcesQueue.
// NOTE: The caller must not reuse the metadataSourcesQueue and toClose.
func (s *vectorizedFlowCreator) setupRemoteOutputStream(
ctx context.Context,
flowCtx *execinfra.FlowCtx,
Expand Down Expand Up @@ -726,6 +715,7 @@ func (s *vectorizedFlowCreator) setupRemoteOutputStream(
// metadataSourcesQueue will always be fully consumed.
// NOTE: This method supports only BY_HASH routers. Callers should handle
// PASS_THROUGH routers separately.
// NOTE: The caller must not reuse the metadataSourcesQueue and toClose.
func (s *vectorizedFlowCreator) setupRouter(
ctx context.Context,
flowCtx *execinfra.FlowCtx,
Expand Down Expand Up @@ -942,7 +932,7 @@ func (s *vectorizedFlowCreator) setupInput(
// spec of pspec. The metadataSourcesQueue and toClose slices are fully consumed
// by either passing them to an outbox or HashRouter to be drained/closed, or
// storing them in streamIDToInputOp with the given op to be processed later.
// NOTE: The caller must not reuse the metadataSourcesQueue.
// NOTE: The caller must not reuse the metadataSourcesQueue and toClose.
func (s *vectorizedFlowCreator) setupOutput(
ctx context.Context,
flowCtx *execinfra.FlowCtx,
Expand All @@ -961,8 +951,6 @@ func (s *vectorizedFlowCreator) setupOutput(
op,
opOutputTypes,
output,
// Pass in a copy of the queue to reset metadataSourcesQueue for
// further appends without overwriting.
metadataSourcesQueue,
toClose,
factory,
Expand All @@ -979,8 +967,7 @@ func (s *vectorizedFlowCreator) setupOutput(
rootOperator: op, metadataSources: metadataSourcesQueue, toClose: toClose,
}
case execinfrapb.StreamEndpointSpec_REMOTE:
// Set up an Outbox. Note that we pass in a copy of metadataSourcesQueue
// so that we can reset it below and keep on writing to it.
// Set up an Outbox.
if s.recordingStats {
// If recording stats, we add a metadata source that will generate all
// stats data as metadata for the stats collectors created so far.
Expand Down Expand Up @@ -1089,12 +1076,12 @@ func (s *vectorizedFlowCreator) setupFlow(
// metadata from these sources is found, the metadataSourcesQueue should be
// added as part of one of the last unconnected inputDAGs in
// streamIDToInputOp. This is to avoid cycles.
metadataSourcesQueue := s.scratch.metadataSourcesQueue[:0]
var metadataSourcesQueue []execinfrapb.MetadataSource
// toClose is similar to metadataSourcesQueue with the difference that these
// components do not produce metadata and should be Closed even during
// non-graceful termination.
toClose := s.scratch.toClose[:0]
inputs := s.scratch.inputs[:0]
var toClose []colexecbase.Closer
inputs := s.inputsScratch[:0]
for i := range pspec.Input {
input, metadataSources, closers, localErr := s.setupInput(ctx, flowCtx, pspec.Input[i], opt, factory)
if localErr != nil {
Expand Down