-
Notifications
You must be signed in to change notification settings - Fork 3.9k
/
Copy pathop_creation.go
118 lines (108 loc) · 4.63 KB
/
op_creation.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
// Copyright 2020 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
package colexec
import (
"context"
"sync"
"github.com/cockroachdb/cockroach/pkg/col/coldata"
"github.com/cockroachdb/cockroach/pkg/sql/colcontainer"
"github.com/cockroachdb/cockroach/pkg/sql/colexecbase"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/marusama/semaphore"
)
// TestNewColOperator is a hook that lets tests call NewColOperator through the
// colexec package. It is injected at test time and is always an alias of
// builder.NewColOperator.
var TestNewColOperator func(
	ctx context.Context,
	flowCtx *execinfra.FlowCtx,
	args *NewColOperatorArgs,
) (r *NewColOperatorResult, err error)
// NewColOperatorArgs is a helper struct that encompasses all of the input
// arguments to NewColOperator call.
type NewColOperatorArgs struct {
// Spec is the processor spec handed to NewColOperator (presumably the spec
// of the processor the resulting operator stands in for -- TODO confirm
// against NewColOperator).
Spec *execinfrapb.ProcessorSpec
// Inputs holds the operators supplied as inputs to the call (presumably
// one per input of Spec -- TODO confirm).
Inputs []colexecbase.Operator
// StreamingMemAccount is a bound memory account. Per the
// UseStreamingMemAccountForBuffering knob below, it is used for buffering
// operators only when that knob is set.
StreamingMemAccount *mon.BoundAccount
// ProcessorConstructor is a constructor for processors.
// NOTE(review): how NewColOperator uses it is not visible in this file.
ProcessorConstructor execinfra.ProcessorConstructor
// DiskQueueCfg is the disk queue configuration (presumably consumed by
// disk-backed operators -- TODO confirm).
DiskQueueCfg colcontainer.DiskQueueCfg
// FDSemaphore is the semaphore from which users of a PartitionedDiskQueue
// acquire file descriptors (see DelegateFDAcquisitions below).
FDSemaphore semaphore.Semaphore
// ExprHelper is an expression helper.
// NOTE(review): its exact role is not visible in this file.
ExprHelper *ExprHelper
// Factory is the column factory (presumably used to create the columns of
// newly allocated batches -- TODO confirm).
Factory coldata.ColumnFactory
// TestingKnobs groups the knobs below, which alter how operators are
// planned.
TestingKnobs struct {
// SpillingCallbackFn will be called when the spilling from an in-memory to
// disk-backed operator occurs. It should only be set in tests.
SpillingCallbackFn func()
// NumForcedRepartitions specifies a number of "repartitions" that a
// disk-backed operator should be forced to perform. "Repartition" can mean
// different things depending on the operator (for example, for hash joiner
// it is dividing original partition into multiple new partitions; for
// sorter it is merging already created partitions into new one before
// proceeding to the next partition from the input).
NumForcedRepartitions int
// UseStreamingMemAccountForBuffering specifies whether to use
// StreamingMemAccount when creating buffering operators and should only be
// set to 'true' in tests. The idea behind this flag is reducing the number
// of memory accounts and monitors we need to close, so we plumbed it into
// the planning code so that it doesn't create extra memory monitoring
// infrastructure (and so that we could use testMemAccount defined in
// main_test.go).
UseStreamingMemAccountForBuffering bool
// DiskSpillingDisabled specifies whether only in-memory operators should
// be created.
DiskSpillingDisabled bool
// DelegateFDAcquisitions should be observed by users of a
// PartitionedDiskQueue. During normal operations, these should acquire the
// maximum number of file descriptors they will use from FDSemaphore up
// front. Setting this testing knob to true disables that behavior and
// lets the PartitionedDiskQueue interact with the semaphore as partitions
// are opened/closed, which ensures that the number of open files never
// exceeds what is expected.
DelegateFDAcquisitions bool
}
}
// NewColOperatorResult is a helper struct that encompasses all of the return
// values of NewColOperator call.
//
// Instances are recycled through newColOperatorResultPool: one is obtained
// with GetNewColOperatorResult and handed back with Release, which resets Op
// and KVReader to their zero values and truncates every slice to zero length.
type NewColOperatorResult struct {
// Op is the operator produced by the NewColOperator call.
Op colexecbase.Operator
// KVReader is a KV reader associated with the result.
// NOTE(review): when it is set is not visible in this file.
KVReader execinfra.KVReader
// ColumnTypes holds column types (presumably the output schema of Op --
// TODO confirm).
ColumnTypes []*types.T
// MetadataSources holds metadata sources associated with the result
// (presumably to be drained by the caller -- TODO confirm).
MetadataSources []execinfrapb.MetadataSource
// ToClose is a slice of components that need to be Closed.
ToClose []colexecbase.Closer
// OpMonitors and OpAccounts hold memory monitors and bound accounts
// (presumably those created while planning Op -- TODO confirm).
OpMonitors []*mon.BytesMonitor
OpAccounts []*mon.BoundAccount
// Releasables holds the components on which Release calls Release() before
// this object is put back into the pool.
Releasables []execinfra.Releasable
}
// Compile-time check that *NewColOperatorResult implements
// execinfra.Releasable.
var _ execinfra.Releasable = (*NewColOperatorResult)(nil)
// newColOperatorResultPool recycles NewColOperatorResult objects. Release puts
// them back, and GetNewColOperatorResult takes them out; a zero-valued object
// is allocated when the pool is empty.
var newColOperatorResultPool = sync.Pool{
	New: func() interface{} { return new(NewColOperatorResult) },
}
// GetNewColOperatorResult returns a NewColOperatorResult taken from
// newColOperatorResultPool. The returned object is either freshly allocated or
// one that was previously handed back via Release.
func GetNewColOperatorResult() *NewColOperatorResult {
	pooled := newColOperatorResultPool.Get()
	return pooled.(*NewColOperatorResult)
}
// Release implements the execinfra.Releasable interface.
//
// It calls Release on every element of r.Releasables, resets r to an empty
// state (Op and KVReader are zeroed; all slices are emptied but keep their
// capacity for reuse), and puts r back into newColOperatorResultPool. Since
// the object may be handed out again by GetNewColOperatorResult, r should not
// be used after Release returns.
func (r *NewColOperatorResult) Release() {
	for _, releasable := range r.Releasables {
		releasable.Release()
	}
	// Explicitly unset every slot of the slices before truncating them below.
	// Slicing to [:0] keeps the backing arrays, so without this the pooled
	// object would keep references to the old elements alive (preventing them
	// from being garbage collected) until the slots happen to be overwritten
	// by later appends.
	for i := range r.ColumnTypes {
		r.ColumnTypes[i] = nil
	}
	for i := range r.MetadataSources {
		r.MetadataSources[i] = nil
	}
	for i := range r.ToClose {
		r.ToClose[i] = nil
	}
	for i := range r.OpMonitors {
		r.OpMonitors[i] = nil
	}
	for i := range r.OpAccounts {
		r.OpAccounts[i] = nil
	}
	for i := range r.Releasables {
		r.Releasables[i] = nil
	}
	*r = NewColOperatorResult{
		ColumnTypes:     r.ColumnTypes[:0],
		MetadataSources: r.MetadataSources[:0],
		ToClose:         r.ToClose[:0],
		OpMonitors:      r.OpMonitors[:0],
		OpAccounts:      r.OpAccounts[:0],
		Releasables:     r.Releasables[:0],
	}
	newColOperatorResultPool.Put(r)
}