-
Notifications
You must be signed in to change notification settings - Fork 3.8k
/
Copy pathspan_assembler_tmpl.go
354 lines (309 loc) · 12.4 KB
/
span_assembler_tmpl.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
// Copyright 2021 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
// {{/*
// +build execgen_template
//
// This file is the execgen template for span_encoder.eg.go. It's formatted in a
// special way, so it's both valid Go and a valid text/template input. This
// permits editing this file with editor support.
//
// */}}
package colexecjoin
import (
"sync"
"unsafe"
"github.com/cockroachdb/cockroach/pkg/col/coldata"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/colmem"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/encoding"
)
// NewColSpanAssembler returns a ColSpanAssembler operator that is able to
// generate lookup spans from input batches. The given size limit determines
// how many bytes should be allocated for span keys for each batch of spans.
func NewColSpanAssembler(
codec keys.SQLCodec,
allocator *colmem.Allocator,
table catalog.TableDescriptor,
index catalog.Index,
inputTypes []*types.T,
neededCols util.FastIntSet,
sizeLimit int,
) ColSpanAssembler {
colFamStartKeys, colFamEndKeys := getColFamilyEncodings(neededCols, table, index)
// Add span encoders to encode each primary key column as bytes. The
// ColSpanAssembler will later append these together to form valid spans.
var spanEncoders []spanEncoder
for i := 0; i < index.NumKeyColumns(); i++ {
asc := index.GetKeyColumnDirection(i) == descpb.IndexDescriptor_ASC
spanEncoders = append(spanEncoders, newSpanEncoder(allocator, inputTypes[i], asc, i))
}
b := spanAssemblerPool.Get().(*spanAssemblerBase)
base := spanAssemblerBase{
allocator: allocator,
spans: b.spans[:0],
keyPrefix: rowenc.MakeIndexKeyPrefix(codec, table, index.GetID()),
spanEncoders: spanEncoders,
spanCols: make([]*coldata.Bytes, len(spanEncoders)),
colFamStartKeys: colFamStartKeys,
colFamEndKeys: colFamEndKeys,
sizeLimit: sizeLimit,
}
if len(colFamStartKeys) == 0 {
return &spanAssemblerNoColFamily{spanAssemblerBase: base}
}
return &spanAssemblerWithColFamily{spanAssemblerBase: base}
}
var spanAssemblerPool = sync.Pool{
New: func() interface{} {
return &spanAssemblerBase{}
},
}
// ColSpanAssembler is a utility operator that generates a series of spans from
// input batches which can be used to perform an index join or lookup join.
type ColSpanAssembler interface {
execinfra.Releasable
// ConsumeBatch generates lookup spans from input batches and stores them to
// later be returned by GetSpans. Once the size limit has been reached,
// ConsumeBatch returns true. Note that it is up to the caller to choose
// whether to respect the size limit.
ConsumeBatch(coldata.Batch) (reachedLimit bool)
// GetSpans returns the set of spans that have been generated so far. The
// returned Spans object is still owned by the SpanAssembler, so subsequent
// calls to GetSpans will invalidate the spans returned by previous calls. A
// caller that wishes to hold on to spans over the course of multiple calls
// should perform a shallow copy of the Spans. GetSpans will return nil if it
// is called before ConsumeBatch.
GetSpans() roachpb.Spans
// Close closes the ColSpanAssembler operator.
Close()
}
// spanAssemblerBase extracts common fields between the SpanAssembler operators.
type spanAssemblerBase struct {
allocator *colmem.Allocator
// spans is the list of spans that have been assembled so far. spans is owned
// and reset upon each call to GetSpans by the SpanAssembler operator.
spans roachpb.Spans
// keyPrefix contains the portion of the encoding that is shared between all
// spans - namely, table, etc.
keyPrefix roachpb.Key
// scratchKey is a scratch space used to append the key prefix and the key
// column encodings. It is reused for each span.
scratchKey roachpb.Key
// spanEncoders is an ordered list of utility operators that encode each key
// column in vectorized fashion.
spanEncoders []spanEncoder
// spanCols is used to iterate through the input columns that contain the
// key encodings during span construction.
spanCols []*coldata.Bytes
// colFamStartKeys and colFamEndKeys is the list of start and end key suffixes
// for the column families that should be scanned. The spans will be split to
// scan over each family individually. Note that it is not necessarily
// possible to break a span into family scans.
colFamStartKeys, colFamEndKeys []roachpb.Key
// maxSpansLength tracks the largest length the spans field reaches, so that
// all the Span objects can be deeply reset upon a call to close.
maxSpansLength int
// sizeLimit is a limit on the number of bytes that can be allocated for the
// span keys before the spans are returned. It is a field instead of a
// constant so that it can be modified in tests.
sizeLimit int
// shouldReset tracks whether ConsumeBatch needs to reset the spans slice
// before appending to it.
shouldReset bool
}
// {{range .}}
type _OP_STRING struct {
spanAssemblerBase
}
var _ ColSpanAssembler = &_OP_STRING{}
// ConsumeBatch implements the ColSpanAssembler interface.
func (op *_OP_STRING) ConsumeBatch(batch coldata.Batch) (reachedLimit bool) {
n := batch.Length()
if n == 0 {
return false /* reachedLimit */
}
if op.shouldReset {
if len(op.spans) > op.maxSpansLength {
// Ensure that all initialized spans are deeply reset upon a call to Close.
op.maxSpansLength = len(op.spans)
}
op.spans = op.spans[:0]
}
op.shouldReset = false
for i := range op.spanEncoders {
op.spanCols[i] = op.spanEncoders[i].next(batch)
}
// TODO(drewk): currently we have to allocate each key individually because
// makeKVBatchFetcherWithSendFunc doesn't copy the underlying bytes of the
// spans. Consider making the batch fetcher perform a deep copy so we can
// store all keys in a flat byte slice, and reset and reuse it for each batch.
var spanBytes int
oldSpansCap := cap(op.spans)
for i := 0; i < n; i++ {
// Every key has a prefix encoding the table, index, etc.
op.scratchKey = append(op.scratchKey[:0], op.keyPrefix...)
for j := range op.spanCols {
// The encoding for each primary key column has previously been
// calculated and stored in an input column.
op.scratchKey = append(op.scratchKey, op.spanCols[j].Get(i)...)
}
// {{if .WithColFamilies}}
constructSpans(true)
// {{else}}
constructSpans(false)
// {{end}}
}
spansSizeInc := (cap(op.spans) - oldSpansCap) * int(spanSize)
op.allocator.AdjustMemoryUsage(int64(spanBytes + spansSizeInc))
return spanBytes >= op.sizeLimit
}
// {{end}}
const spanSize = unsafe.Sizeof(roachpb.Span{})
// GetSpans implements the ColSpanAssembler interface.
func (b *spanAssemblerBase) GetSpans() roachpb.Spans {
if b.shouldReset {
// No spans have been generated since the last call to GetSpans.
return nil
}
b.shouldReset = true
return b.spans
}
// Close implements the ColSpanAssembler interface.
func (b *spanAssemblerBase) Close() {
for i := range b.spanEncoders {
b.spanEncoders[i].close()
}
}
// Release implements the ColSpanAssembler interface.
func (b *spanAssemblerBase) Release() {
for i := range b.spanCols {
// Release references to input columns.
b.spanCols[i] = nil
}
b.spans = b.spans[:b.maxSpansLength]
for i := range b.spans {
// Deeply reset all spans that were initialized during execution.
b.spans[i] = roachpb.Span{}
}
*b = spanAssemblerBase{
spans: b.spans[:0],
}
spanAssemblerPool.Put(b)
}
// execgen:inline
// execgen:template<hasFamilies>
func constructSpans(hasFamilies bool) {
if hasFamilies {
// The span for each row can be split into a series of column family spans,
// which have the column family ID as a suffix. Individual column family
// spans can be served as Get requests, which are more efficient than Scan
// requests.
for j := range op.colFamStartKeys {
var span roachpb.Span
span.Key = make(roachpb.Key, 0, len(op.scratchKey)+len(op.colFamStartKeys[j]))
span.Key = append(span.Key, op.scratchKey...)
span.Key = append(span.Key, op.colFamStartKeys[j]...)
spanBytes += len(span.Key)
// The end key may be nil, in which case the span is a point lookup.
if len(op.colFamEndKeys[j]) > 0 {
span.EndKey = make(roachpb.Key, 0, len(op.scratchKey)+len(op.colFamEndKeys[j]))
span.EndKey = append(span.EndKey, op.scratchKey...)
span.EndKey = append(span.EndKey, op.colFamEndKeys[j]...)
spanBytes += len(span.EndKey)
}
op.spans = append(op.spans, span)
}
} else {
// The spans cannot be split into column family spans, so there will be
// exactly one span for each input row.
var span roachpb.Span
span.Key = make(roachpb.Key, 0, len(op.scratchKey))
span.Key = append(span.Key, op.scratchKey...)
spanBytes += len(span.Key)
span.EndKey = make(roachpb.Key, 0, len(op.scratchKey)+1)
span.EndKey = append(span.EndKey, op.scratchKey...)
// TODO(drewk): change this to use PrefixEnd() when interleaved indexes are
// permanently removed. Keep it this way for now to allow testing
// against the row engine, even though the vectorized index joiner doesn't
// allow interleaved indexes.
span.EndKey = encoding.EncodeInterleavedSentinel(span.EndKey)
spanBytes += len(span.EndKey)
op.spans = append(op.spans, span)
}
}
// getColFamilyEncodings returns two lists of keys of the same length. Each pair
// of keys at the same index corresponds to the suffixes of the start and end
// keys of a span over a specific column family (or adjacent column families).
// If the returned lists are empty, the spans cannot be split into separate
// family spans.
func getColFamilyEncodings(
neededCols util.FastIntSet, table catalog.TableDescriptor, index catalog.Index,
) (startKeys, endKeys []roachpb.Key) {
familyIDs := rowenc.NeededColumnFamilyIDs(neededCols, table, index)
if !canSplitSpans(len(familyIDs), table, index) {
return nil, nil
}
for i, familyID := range familyIDs {
var key roachpb.Key
key = keys.MakeFamilyKey(key, uint32(familyID))
if i > 0 && familyID-1 == familyIDs[i-1] && endKeys != nil {
// This column family is adjacent to the previous one. We can merge
// the two spans into one.
endKeys[len(endKeys)-1] = key.PrefixEnd()
} else {
startKeys = append(startKeys, key)
endKeys = append(endKeys, nil)
}
}
return startKeys, endKeys
}
// canSplitSpans returns true if the spans that will be generated by the
// SpanAssembler operator can be split into spans over individual column
// families. For index joins, either all spans can be split or none can because
// the lookup columns are never nullable (null values prevent the index key from
// being fully knowable).
func canSplitSpans(numNeededFamilies int, table catalog.TableDescriptor, index catalog.Index) bool {
// We can only split a span into separate family specific point lookups if:
//
// * The table is not a special system table. (System tables claim to have
// column families, but actually do not, since they're written to with
// raw KV puts in a "legacy" way.)
if table.GetID() > 0 && table.GetID() < keys.MaxReservedDescID {
return false
}
// * The index either has just 1 family (so we'll make a GetRequest) or we
// need fewer than every column family in the table (otherwise we'd just
// make a big ScanRequest).
numFamilies := len(table.GetFamilies())
if numFamilies > 1 && numNeededFamilies == numFamilies {
return false
}
// Other requirements that are always satisfied by index joins, and therefore
// do not need to be checked:
// * The index is unique.
// * The index is fully constrained.
// * If we're looking at a secondary index...
// * The index constraint must not contain null, since that would cause the
// index key to not be completely knowable.
// * The index cannot be inverted.
// * The index must store some columns.
// * The index is a new enough version.
//
// We've passed all the conditions, and should be able to safely split this
// span into multiple column-family-specific spans.
return true
}