-
Notifications
You must be signed in to change notification settings - Fork 3.8k
/
Copy pathindex_join.go
321 lines (288 loc) · 10.8 KB
/
index_join.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
// Copyright 2021 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
package colfetcher
import (
"context"
"sort"
"time"
"github.com/cockroachdb/cockroach/pkg/col/coldata"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/colexec/colexecspan"
"github.com/cockroachdb/cockroach/pkg/sql/colexecerror"
"github.com/cockroachdb/cockroach/pkg/sql/colexecop"
"github.com/cockroachdb/cockroach/pkg/sql/colmem"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/errors"
)
// ColIndexJoin operators are used to execute index joins (lookup joins that
// scan the primary index and discard input rows).
//
// Output is produced by alternating between two phases (see indexJoinState):
// input batches are converted into lookup spans by spanAssembler, and those
// spans are then scanned by rf, whose batches are returned from Next.
type ColIndexJoin struct {
colexecop.InitHelper
colexecop.OneInputNode
// state is the current phase of the state machine driven by Next.
state indexJoinState
// spanAssembler is used to construct the lookup spans for each input batch.
spanAssembler colexecspan.ColSpanAssembler
// flowCtx provides the transaction, the TraceKV flag and the testing knobs
// passed to rf.StartScan, and the transaction consulted by DrainMeta.
flowCtx *execinfra.FlowCtx
// rf scans the spans produced by spanAssembler; its batches are the
// operator's output.
rf *cFetcher
// tracingSpan is the child span started in Init (execinfra.ProcessorSpan
// decides whether tracing is enabled, so it may be nil). It is finished and
// cleared in Close.
tracingSpan *tracing.Span
// mu guards rowsRead. GetBytesRead also holds it while reading from rf's
// fetcher.
mu struct {
syncutil.Mutex
// rowsRead contains the number of total rows this ColIndexJoin has
// returned so far.
rowsRead int64
}
// ResultTypes is the slice of resulting column types from this operator.
// It should be used rather than the slice of column types from the scanned
// table because the scan might synthesize additional implicit system columns.
ResultTypes []*types.T
// maintainOrdering is true when the index join is required to maintain its
// input ordering, in which case the ordering of the spans cannot be changed.
maintainOrdering bool
}
// Compile-time assertions that *ColIndexJoin implements these interfaces.
var _ colexecop.KVReader = &ColIndexJoin{}
var _ execinfra.Releasable = &ColIndexJoin{}
var _ colexecop.ClosableOperator = &ColIndexJoin{}
// Init initializes a ColIndexJoin.
//
// The work below is performed only when s.InitHelper.Init reports that
// initialization should proceed (presumably only on the first call). In that
// case a child tracing span named "colindexjoin" is attached to s.Ctx, and the
// input operator is initialized with the caller's ctx (not with s.Ctx).
func (s *ColIndexJoin) Init(ctx context.Context) {
	if proceed := s.InitHelper.Init(ctx); proceed {
		// A dedicated child span ensures that the contention events found in
		// the recording are attributable to this operator's cFetcher only.
		// ProcessorSpan checks on its own whether tracing is enabled.
		s.Ctx, s.tracingSpan = execinfra.ProcessorSpan(s.Ctx, "colindexjoin")
		s.Input.Init(ctx)
	}
}
// indexJoinState identifies the phase of the state machine driven by
// ColIndexJoin.Next.
type indexJoinState uint8
const (
// indexJoinConstructingSpans reads input batches into the span assembler
// until it reports its memory limit or the input is exhausted, and then
// starts a scan over the assembled spans. It is the zero value, so a newly
// constructed ColIndexJoin starts in this state.
indexJoinConstructingSpans indexJoinState = iota
// indexJoinScanning returns the fetcher's batches; an empty batch sends the
// machine back to indexJoinConstructingSpans.
indexJoinScanning
// indexJoinDone is terminal: no lookups remain and Next returns
// coldata.ZeroBatch.
indexJoinDone
)
// Next is part of the Operator interface.
//
// Next drives the state machine described by indexJoinState:
//   - indexJoinConstructingSpans feeds input batches to the span assembler
//     until ConsumeBatch reports that its memory limit was reached or the
//     input yields a zero-length batch, then starts a scan over the assembled
//     spans (sorted first unless maintainOrdering is set);
//   - indexJoinScanning returns the fetcher's batches until it yields a
//     zero-length batch, at which point more spans are constructed;
//   - indexJoinDone returns coldata.ZeroBatch.
//
// Errors from the fetcher are reported through colexecerror.InternalError.
func (s *ColIndexJoin) Next() coldata.Batch {
	for {
		switch s.state {
		case indexJoinConstructingSpans:
			var spans roachpb.Spans
			var rowCount int
			for batch := s.Input.Next(); ; batch = s.Input.Next() {
				// Because index joins discard input rows, we do not have to
				// maintain a reference to input tuples after span generation.
				n := batch.Length()
				rowCount += n
				if n == 0 || s.spanAssembler.ConsumeBatch(batch) {
					// Reached the memory limit or the end of the input.
					spans = s.spanAssembler.GetSpans()
					break
				}
			}
			if len(spans) == 0 {
				// No lookups left to perform.
				s.state = indexJoinDone
				continue
			}
			if !s.maintainOrdering {
				// Sort the spans when !maintainOrdering. This allows lower layers to
				// optimize iteration over the data. Note that the looked up rows are
				// output unchanged, in the retrieval order, so it is not safe to do
				// this when maintainOrdering is true (the ordering to be maintained
				// may be different than the ordering in the index).
				sort.Sort(spans)
			}
			// Index joins will always return exactly one output row per input row.
			s.rf.setEstimatedRowCount(uint64(rowCount))
			// GetBytesRead reads s.rf.fetcher while holding s.mu and relies on
			// that mutex to keep fetcher initialization from racing with it.
			// StartScan is where the fetcher is started for each set of spans
			// (NOTE(review): confirm cFetcher.StartScan assigns rf.fetcher), so
			// it must run under s.mu too. The closure's defer releases the
			// mutex even if StartScan panics, so a later GetBytesRead (e.g.
			// from DrainMeta) cannot deadlock.
			var err error
			func() {
				s.mu.Lock()
				defer s.mu.Unlock()
				err = s.rf.StartScan(
					s.flowCtx.Txn, spans, false /* limitBatches */, 0 /* limitHint */, s.flowCtx.TraceKV,
					s.flowCtx.EvalCtx.TestingKnobs.ForceProductionBatchSizes,
				)
			}()
			if err != nil {
				colexecerror.InternalError(err)
			}
			s.state = indexJoinScanning
		case indexJoinScanning:
			batch, err := s.rf.NextBatch(s.Ctx)
			if err != nil {
				colexecerror.InternalError(err)
			}
			if batch.Selection() != nil {
				colexecerror.InternalError(
					errors.AssertionFailedf("unexpected selection vector on the batch coming from CFetcher"))
			}
			n := batch.Length()
			if n == 0 {
				// The current set of spans is exhausted; build the next one.
				s.state = indexJoinConstructingSpans
				continue
			}
			s.mu.Lock()
			s.mu.rowsRead += int64(n)
			s.mu.Unlock()
			return batch
		case indexJoinDone:
			return coldata.ZeroBatch
		}
	}
}
// DrainMeta is part of the colexecop.MetadataSource interface.
//
// The returned slice holds, in order: the leaf transaction's final state (only
// when GetLeafTxnFinalState returns one), a metrics record with the bytes and
// rows read by this operator, and the trace data (only when GetTraceData
// returns some).
func (s *ColIndexJoin) DrainMeta() []execinfrapb.ProducerMetadata {
	var meta []execinfrapb.ProducerMetadata
	leafState := execinfra.GetLeafTxnFinalState(s.Ctx, s.flowCtx.Txn)
	if leafState != nil {
		meta = append(meta, execinfrapb.ProducerMetadata{LeafTxnFinalState: leafState})
	}
	producerMeta := execinfrapb.GetProducerMeta()
	metrics := execinfrapb.GetMetricsMeta()
	metrics.BytesRead = s.GetBytesRead()
	metrics.RowsRead = s.GetRowsRead()
	producerMeta.Metrics = metrics
	meta = append(meta, *producerMeta)
	traceData := execinfra.GetTraceData(s.Ctx)
	if traceData != nil {
		meta = append(meta, execinfrapb.ProducerMetadata{TraceData: traceData})
	}
	return meta
}
// GetBytesRead is part of the colexecop.KVReader interface.
//
// It returns the byte count reported by s.rf.fetcher, read while holding s.mu.
// The fetcher is started by StartScan, which is invoked from Next rather than
// from Init, so s.rf.fetcher may still be nil when no scan has begun.
// NOTE(review): returning 0 in that case relies on the fetcher's
// GetBytesRead tolerating a nil receiver — confirm.
func (s *ColIndexJoin) GetBytesRead() int64 {
	s.mu.Lock()
	defer s.mu.Unlock()
	kvFetcher := s.rf.fetcher
	return kvFetcher.GetBytesRead()
}
// GetRowsRead is part of the colexecop.KVReader interface. It returns the
// number of rows Next has emitted so far, read under s.mu.
func (s *ColIndexJoin) GetRowsRead() int64 {
	s.mu.Lock()
	rows := s.mu.rowsRead
	s.mu.Unlock()
	return rows
}
// GetCumulativeContentionTime is part of the colexecop.KVReader interface. It
// delegates to execinfra.GetCumulativeContentionTime using s.Ctx, which after
// Init carries this operator's child tracing span (if one was started).
func (s *ColIndexJoin) GetCumulativeContentionTime() time.Duration {
	opCtx := s.Ctx
	return execinfra.GetCumulativeContentionTime(opCtx)
}
// NewColIndexJoin creates a new ColIndexJoin operator.
//
// The operator looks up, in the index at ordinal spec.IndexIdx of
// table.ActiveIndexes(), the rows whose spans are assembled from the batches
// of input. inputTypes is handed to the span assembler (presumably the types
// of input's columns). post determines which table columns are needed.
//
// An error is returned when the NodeID is uninitialized, when spec carries a
// lookup, remote lookup, or ON expression (unsupported for index joins), when
// spec.IndexIdx is out of range, when the post-processing spec or the column
// types cannot be processed, or when the fetcher cannot be initialized.
func NewColIndexJoin(
	ctx context.Context,
	allocator *colmem.Allocator,
	flowCtx *execinfra.FlowCtx,
	evalCtx *tree.EvalContext,
	semaCtx *tree.SemaContext,
	input colexecop.Operator,
	spec *execinfrapb.JoinReaderSpec,
	post *execinfrapb.PostProcessSpec,
	inputTypes []*types.T,
) (*ColIndexJoin, error) {
	// NB: we hit this with a zero NodeID (but !ok) with multi-tenancy.
	if nodeID, ok := flowCtx.NodeID.OptionalNodeID(); nodeID == 0 && ok {
		return nil, errors.Errorf("attempting to create a ColIndexJoin with uninitialized NodeID")
	}
	if !spec.LookupExpr.Empty() {
		return nil, errors.AssertionFailedf("non-empty lookup expressions are not supported for index joins")
	}
	if !spec.RemoteLookupExpr.Empty() {
		return nil, errors.AssertionFailedf("non-empty remote lookup expressions are not supported for index joins")
	}
	if !spec.OnExpr.Empty() {
		return nil, errors.AssertionFailedf("non-empty ON expressions are not supported for index joins")
	}

	// TODO(ajwerner): The need to construct an immutable here
	// indicates that we're probably doing this wrong. Instead we should be
	// just setting the ID and Version in the spec or something like that and
	// retrieving the hydrated immutable from cache.
	table := spec.BuildTableDescriptor()
	cols := table.PublicColumns()
	if spec.Visibility == execinfra.ScanVisibilityPublicAndNotPublic {
		cols = table.DeletableColumns()
	}
	columnIdxMap := catalog.ColumnIDToOrdinalMap(cols)
	typs := catalog.ColumnTypesWithVirtualCol(cols, nil /* virtualCol */)

	// Add all requested system columns to the output.
	if spec.HasSystemColumns {
		for _, sysCol := range table.SystemColumns() {
			typs = append(typs, sysCol.GetType())
			columnIdxMap.Set(sysCol.GetID(), columnIdxMap.Len())
		}
	}

	// Before we can safely use types from the table descriptor, we need to
	// make sure they are hydrated. In row execution engine it is done during
	// the processor initialization, but neither ColIndexJoin nor cFetcher are
	// processors, so we need to do the hydration ourselves.
	resolver := flowCtx.TypeResolverFactory.NewTypeResolver(evalCtx.Txn)
	if err := resolver.HydrateTypeSlice(ctx, typs); err != nil {
		return nil, err
	}

	activeIndexes := table.ActiveIndexes()
	indexIdx := int(spec.IndexIdx)
	if indexIdx >= len(activeIndexes) {
		return nil, errors.Errorf("invalid indexIdx %d", indexIdx)
	}
	index := activeIndexes[indexIdx]

	// Retrieve the set of columns that the index join needs to fetch.
	var neededColumns util.FastIntSet
	if post.OutputColumns != nil {
		for _, neededColumn := range post.OutputColumns {
			neededColumns.Add(int(neededColumn))
		}
	} else {
		proc := &execinfra.ProcOutputHelper{}
		if err := proc.Init(post, typs, semaCtx, evalCtx); err != nil {
			// This constructor reports failures through its error result, as
			// every other path above does; do not panic here.
			return nil, err
		}
		neededColumns = proc.NeededColumns()
	}

	fetcher, err := initCFetcher(
		flowCtx, allocator, table, index, neededColumns, columnIdxMap, nil, /* virtualColumn */
		cFetcherArgs{
			visibility:        spec.Visibility,
			lockingStrength:   spec.LockingStrength,
			lockingWaitPolicy: spec.LockingWaitPolicy,
			hasSystemColumns:  spec.HasSystemColumns,
			memoryLimit:       execinfra.GetWorkMemLimit(flowCtx),
		},
	)
	if err != nil {
		return nil, err
	}

	// Allow 1/16 of the operator's working memory for the span key bytes.
	batchSizeLimit := int(float64(execinfra.GetWorkMemLimit(flowCtx)) * defaultBatchSizeLimitRatio)
	spanAssembler := colexecspan.NewColSpanAssembler(
		flowCtx.Codec(), allocator, table, index, inputTypes, neededColumns, batchSizeLimit)

	return &ColIndexJoin{
		OneInputNode:     colexecop.NewOneInputNode(input),
		flowCtx:          flowCtx,
		rf:               fetcher,
		spanAssembler:    spanAssembler,
		ResultTypes:      typs,
		maintainOrdering: spec.MaintainOrdering,
	}, nil
}
// defaultBatchSizeLimitRatio is the share of the operator's working-memory
// limit that may be spent on the bytes backing the assembled span keys. It is
// one sixteenth (0.0625).
var defaultBatchSizeLimitRatio = 1 / 16.0
// Release implements the execinfra.Releasable interface. It releases the
// fetcher first and then the span assembler.
func (s *ColIndexJoin) Release() {
	fetcher, assembler := s.rf, s.spanAssembler
	fetcher.Release()
	assembler.Release()
}
// Close implements the colexecop.Closer interface.
//
// It finishes the tracing span started in Init, if there is one, and clears
// the field so that a repeated Close does not finish it twice. It then closes
// the span assembler. It always returns nil.
func (s *ColIndexJoin) Close() error {
	if span := s.tracingSpan; span != nil {
		span.Finish()
		s.tracingSpan = nil
	}
	s.spanAssembler.Close()
	return nil
}