Skip to content

Commit

Permalink
colflow: fix recent misuse of two slices in the flow setup
Browse files Browse the repository at this point in the history
We've recently added the reusing of metadataSourcesQueue and toClose
slices in order to reduce some allocations. However, the components that
are using those slices don't make a deep copy, and as a result, we
introduced a bug in which we were breaking the current contract. This
commit fixes the issue by going back to the old method (with slight
difference in that we currently delay any allocations unlike previously
when we allocated a slice with capacity of 1).

Release note: None (no release with this bug)
  • Loading branch information
yuzefovich committed Nov 12, 2020
1 parent ba363c7 commit c554914
Showing 1 changed file with 10 additions and 23 deletions.
33 changes: 10 additions & 23 deletions pkg/sql/colflow/vectorized_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -519,11 +519,7 @@ type vectorizedFlowCreator struct {
numClosers int32
numClosed int32

scratch struct {
inputs []colexecbase.Operator
metadataSourcesQueue []execinfrapb.MetadataSource
toClose []colexecbase.Closer
}
inputsScratch []colexecbase.Operator
}

var _ execinfra.Releasable = &vectorizedFlowCreator{}
Expand Down Expand Up @@ -570,7 +566,7 @@ func newVectorizedFlowCreator(
releasables: creator.releasables,
diskQueueCfg: diskQueueCfg,
fdSemaphore: fdSemaphore,
scratch: creator.scratch,
inputsScratch: creator.inputsScratch,
}
return creator
}
Expand Down Expand Up @@ -606,15 +602,7 @@ func (s *vectorizedFlowCreator) Release() {
monitors: s.monitors[:0],
accounts: s.accounts[:0],
releasables: s.releasables[:0],
scratch: struct {
inputs []colexecbase.Operator
metadataSourcesQueue []execinfrapb.MetadataSource
toClose []colexecbase.Closer
}{
inputs: s.scratch.inputs[:0],
metadataSourcesQueue: s.scratch.metadataSourcesQueue[:0],
toClose: s.scratch.toClose[:0],
},
inputsScratch: s.inputsScratch[:0],
}
vectorizedFlowCreatorPool.Put(s)
}
Expand Down Expand Up @@ -667,6 +655,7 @@ func (s *vectorizedFlowCreator) newStreamingMemAccount(
// setupRemoteOutputStream sets up an Outbox that will operate according to
// the given StreamEndpointSpec. It will also drain all MetadataSources in the
// metadataSourcesQueue.
// NOTE: The caller must not reuse the metadataSourcesQueue and toClose.
func (s *vectorizedFlowCreator) setupRemoteOutputStream(
ctx context.Context,
flowCtx *execinfra.FlowCtx,
Expand Down Expand Up @@ -726,6 +715,7 @@ func (s *vectorizedFlowCreator) setupRemoteOutputStream(
// metadataSourcesQueue will always be fully consumed.
// NOTE: This method supports only BY_HASH routers. Callers should handle
// PASS_THROUGH routers separately.
// NOTE: The caller must not reuse the metadataSourcesQueue and toClose.
func (s *vectorizedFlowCreator) setupRouter(
ctx context.Context,
flowCtx *execinfra.FlowCtx,
Expand Down Expand Up @@ -942,7 +932,7 @@ func (s *vectorizedFlowCreator) setupInput(
// spec of pspec. The metadataSourcesQueue and toClose slices are fully consumed
// by either passing them to an outbox or HashRouter to be drained/closed, or
// storing them in streamIDToInputOp with the given op to be processed later.
// NOTE: The caller must not reuse the metadataSourcesQueue.
// NOTE: The caller must not reuse the metadataSourcesQueue and toClose.
func (s *vectorizedFlowCreator) setupOutput(
ctx context.Context,
flowCtx *execinfra.FlowCtx,
Expand All @@ -961,8 +951,6 @@ func (s *vectorizedFlowCreator) setupOutput(
op,
opOutputTypes,
output,
// Pass in a copy of the queue to reset metadataSourcesQueue for
// further appends without overwriting.
metadataSourcesQueue,
toClose,
factory,
Expand All @@ -979,8 +967,7 @@ func (s *vectorizedFlowCreator) setupOutput(
rootOperator: op, metadataSources: metadataSourcesQueue, toClose: toClose,
}
case execinfrapb.StreamEndpointSpec_REMOTE:
// Set up an Outbox. Note that we pass in a copy of metadataSourcesQueue
// so that we can reset it below and keep on writing to it.
// Set up an Outbox.
if s.recordingStats {
// If recording stats, we add a metadata source that will generate all
// stats data as metadata for the stats collectors created so far.
Expand Down Expand Up @@ -1089,12 +1076,12 @@ func (s *vectorizedFlowCreator) setupFlow(
// metadata from these sources is found, the metadataSourcesQueue should be
// added as part of one of the last unconnected inputDAGs in
// streamIDToInputOp. This is to avoid cycles.
metadataSourcesQueue := s.scratch.metadataSourcesQueue[:0]
var metadataSourcesQueue []execinfrapb.MetadataSource
// toClose is similar to metadataSourcesQueue with the difference that these
// components do not produce metadata and should be Closed even during
// non-graceful termination.
toClose := s.scratch.toClose[:0]
inputs := s.scratch.inputs[:0]
var toClose []colexecbase.Closer
inputs := s.inputsScratch[:0]
for i := range pspec.Input {
input, metadataSources, closers, localErr := s.setupInput(ctx, flowCtx, pspec.Input[i], opt, factory)
if localErr != nil {
Expand Down

0 comments on commit c554914

Please sign in to comment.