-
Notifications
You must be signed in to change notification settings - Fork 3.8k
/
Copy pathcfetcher_setup.go
334 lines (316 loc) · 12.3 KB
/
cfetcher_setup.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
// Copyright 2021 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
package colfetcher
import (
"context"
"sync"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/colexec/colexecargs"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/physicalplan"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util"
)
// cFetcherTableArgs describes the information about the index we're fetching
// from. Note that only columns that need to be fetched (i.e. requested by the
// caller) are included in the internal state.
//
// Objects of this type are pooled: populateTableArgs obtains them from
// cFetcherTableArgsPool and Release puts them back.
type cFetcherTableArgs struct {
	// desc is the descriptor of the table being read.
	desc catalog.TableDescriptor
	// index is the index of the table that is being read.
	index catalog.Index
	// ColIdxMap is a mapping from ColumnID of each column to its ordinal. Only
	// needed columns are present.
	//
	// It is populated by keepOnlyNeededColumns with the positions of the
	// columns within cols.
	ColIdxMap catalog.TableColMap
	// isSecondaryIndex is true when index is not the primary index.
	isSecondaryIndex bool
	// cols are all needed columns of the table that are present in the index.
	// The system columns, if requested, are at the end of cols.
	cols []catalog.Column
	// typs are the types of only needed columns from the table. typs[i] is the
	// type of cols[i].
	typs []*types.T
}
// cFetcherTableArgsPool recycles cFetcherTableArgs objects so that their
// cols and typs slices can be reused. Objects are returned to the pool by
// cFetcherTableArgs.Release.
var cFetcherTableArgsPool = sync.Pool{
	New: func() interface{} { return new(cFetcherTableArgs) },
}
// Release resets the receiver, keeping the backing arrays of cols and typs
// for reuse, and puts it back into cFetcherTableArgsPool. The receiver must
// not be used after this call.
func (a *cFetcherTableArgs) Release() {
	cols, typs := a.cols, a.typs
	// Drop the references to the column descriptors so that the pooled object
	// doesn't keep them alive.
	for i := range cols {
		cols[i] = nil
	}
	// Zero out everything and then restore the (now empty) slices. The types
	// are small objects, so the typs slice is only truncated and not deeply
	// reset.
	*a = cFetcherTableArgs{}
	a.cols = cols[:0]
	a.typs = typs[:0]
	cFetcherTableArgsPool.Put(a)
}
// populateTypes sets a.typs to the types of the given columns, in the same
// order. The existing backing array of a.typs is reused when it has enough
// capacity; otherwise a new slice is allocated.
func (a *cFetcherTableArgs) populateTypes(cols []catalog.Column) {
	numCols := len(cols)
	if numCols <= cap(a.typs) {
		a.typs = a.typs[:numCols]
	} else {
		a.typs = make([]*types.T, numCols)
	}
	for i, col := range cols {
		a.typs[i] = col.GetType()
	}
}
// populateTableArgs takes a cFetcherTableArgs object from the pool and fills
// all of its fields except for ColIdxMap. All columns accessible from the
// index (i.e. present in the key or value part) are included in the result;
// use keepOnlyNeededColumns afterwards to prune the columns that are not
// needed.
//
// The columns are taken from the table descriptor (readable columns when
// visibility is ScanVisibilityPublicAndNotPublic, public columns otherwise).
// If invertedCol is non-nil, it replaces the table column with the same ID.
// If hasSystemColumns is true, the system columns are appended at the end.
//
// If index is a secondary index that doesn't contain all of those columns,
// the inaccessible columns are pruned away. In that case a non-nil idxMap is
// returned which remaps ordinals referring to columns of the whole table to
// the positions among only the accessible columns, and post is adjusted
// accordingly. Entries of idxMap that correspond to inaccessible columns have
// an undefined value.
//
// For example, say the table has 4 columns (@1, @2, @3, @4), but only 2
// columns are present in the index we're reading from (@3, @1). The returned
// table args then only contain columns (@1, @3), and
//   idxMap = [0, x, 1, x] (where 'x' indicates an undefined value).
// Although @3 appears earlier than @1 in the index, the columns are visited in
// the order of their ordinals in the table, so @1 gets the 0th slot and @3 gets
// the 1st slot.
//
// The types of the resulting columns are hydrated before returning; the error
// of that hydration (if any) is returned together with the populated args.
func populateTableArgs(
	ctx context.Context,
	flowCtx *execinfra.FlowCtx,
	table catalog.TableDescriptor,
	index catalog.Index,
	invertedCol catalog.Column,
	visibility execinfrapb.ScanVisibility,
	hasSystemColumns bool,
	post *execinfrapb.PostProcessSpec,
	helper *colexecargs.ExprHelper,
) (_ *cFetcherTableArgs, idxMap []int, _ error) {
	args := cFetcherTableArgsPool.Get().(*cFetcherTableArgs)

	// Gather the columns of the table, reusing the slice of the pooled object.
	cols := args.cols[:0]
	if visibility != execinfra.ScanVisibilityPublicAndNotPublic {
		cols = append(cols, table.PublicColumns()...)
	} else {
		cols = append(cols, table.ReadableColumns()...)
	}
	// Substitute the inverted column for its counterpart in the table.
	if invertedCol != nil {
		for i := range cols {
			if cols[i].GetID() != invertedCol.GetID() {
				continue
			}
			cols[i] = invertedCol
			break
		}
	}
	// The system columns, when requested, always go last.
	var numSystemCols int
	if hasSystemColumns {
		systemCols := table.SystemColumns()
		numSystemCols = len(systemCols)
		cols = append(cols, systemCols...)
	}

	isSecondaryIndex := !index.Primary()
	if isSecondaryIndex {
		// A secondary index might not contain all of the columns, so find the
		// set of columns that are available from it.
		availableColIDs := index.CollectKeyColumnIDs()
		availableColIDs.UnionWith(index.CollectSecondaryStoredColumnIDs())
		availableColIDs.UnionWith(index.CollectKeySuffixColumnIDs())
		if availableColIDs.Len() < len(cols)-numSystemCols {
			// Render expressions refer to the columns of the whole table, so
			// the types of all columns are needed to process them before the
			// remapping.
			if post.RenderExprs != nil {
				args.populateTypes(cols)
			}
			// Compact cols in place keeping only the available columns (and
			// the system ones), recording the new position of each kept
			// column in idxMap.
			idxMap = make([]int, len(cols))
			numKept := 0
			for i := range cols {
				//gcassert:bce
				id := cols[i].GetID()
				isSystemCol := hasSystemColumns && i >= len(cols)-numSystemCols
				if !availableColIDs.Contains(id) && !isSystemCol {
					continue
				}
				idxMap[i] = numKept
				cols[numKept] = cols[i]
				numKept++
			}
			cols = cols[:numKept]
			err := remapPostProcessSpec(flowCtx, post, idxMap, helper, args.typs)
			if err != nil {
				return nil, nil, err
			}
		}
	}

	// Overwrite the pooled object, keeping its typs slice for reuse.
	reusedTyps := args.typs
	*args = cFetcherTableArgs{
		desc:             table,
		index:            index,
		isSecondaryIndex: isSecondaryIndex,
		cols:             cols,
		typs:             reusedTyps,
	}
	args.populateTypes(cols)

	// Before we can safely use types from the table descriptor, we need to
	// make sure they are hydrated. In row execution engine it is done during
	// the processor initialization, but neither ColBatchScan nor cFetcher are
	// processors, so we need to do the hydration ourselves.
	resolver := flowCtx.TypeResolverFactory.NewTypeResolver(flowCtx.Txn)
	err := resolver.HydrateTypeSlice(ctx, args.typs)
	return args, idxMap, err
}
// keepOnlyNeededColumns prunes from tableArgs all columns that are not listed
// in neededColumns and populates tableArgs.ColIdxMap. post is updated to refer
// to the new ordinals of the columns.
//
// If we're reading from a secondary index that doesn't cover all columns of
// the table, idxMap must be non-nil and must describe the remapping to apply
// to the ordinals in neededColumns (see populateTableArgs). A non-nil idxMap
// is modified in place.
//
// If flowCtx.TraceKV is true, then all columns are considered as needed, and
// neededColumns is ignored.
func keepOnlyNeededColumns(
	flowCtx *execinfra.FlowCtx,
	tableArgs *cFetcherTableArgs,
	idxMap []int,
	neededColumns []uint32,
	post *execinfrapb.PostProcessSpec,
	helper *colexecargs.ExprHelper,
) error {
	// Pruning only happens when the tracing is not enabled and some of the
	// available columns are not needed.
	shouldPrune := !flowCtx.TraceKV && len(neededColumns) < len(tableArgs.cols)
	if shouldPrune {
		// Collect the positions (within tableArgs.cols) of the needed columns.
		var neededSet util.FastIntSet
		for i := range neededColumns {
			pos := int(neededColumns[i])
			if idxMap != nil {
				pos = idxMap[pos]
			}
			neededSet.Add(pos)
		}

		// A non-nil idxMap can be reused: its length is equal to the number of
		// columns in the whole table, which is sufficient since we are reading
		// from the secondary index. It doesn't need to be reset because the
		// needed positions are overwritten below.
		if idxMap == nil {
			idxMap = make([]int, len(tableArgs.typs))
		}

		// Assign the new ordinals to the needed columns.
		//
		// When a non-nil idxMap was passed in, this applies a projection on
		// top of the projection that is already in place. Consider
		//   idxMap = [0, x, 1, x] (where 'x' indicates an undefined value)
		// and
		//   neededColumns = [2].
		// Only columns @1 and @3 are present in the secondary index while only
		// @3 is actually needed. neededColumns[0] = 2 has been remapped to 1
		// above, so neededSet is {1}, and the post-processing spec already
		// refers to this column as having index 1. Since the column with
		// index 0 is being pruned away, the post-processing stage will see a
		// single column, so the map has to become
		//   idxMap = [x, 0, x, x]
		// for the spec to be remapped to the correct ordinal below.
		newPos := 0
		for oldPos, ok := neededSet.Next(0); ok; oldPos, ok = neededSet.Next(oldPos + 1) {
			idxMap[oldPos] = newPos
			newPos++
		}
		if err := remapPostProcessSpec(flowCtx, post, idxMap, helper, tableArgs.typs); err != nil {
			return err
		}

		// Finally, compact the columns and their types in place so that only
		// the needed ones remain.
		cols, typs := tableArgs.cols, tableArgs.typs
		numKept := 0
		for oldPos, ok := neededSet.Next(0); ok; oldPos, ok = neededSet.Next(oldPos + 1) {
			cols[numKept] = cols[oldPos]
			typs[numKept] = typs[oldPos]
			numKept++
		}
		tableArgs.cols = cols[:numKept]
		tableArgs.typs = typs[:numKept]
	}

	// Record the position of each remaining column by its ID.
	for pos, col := range tableArgs.cols {
		tableArgs.ColIdxMap.Set(col.GetID(), pos)
	}
	return nil
}
// remapPostProcessSpec updates post so that all IndexedVars refer to the new
// ordinals according to idxMap.
//
// For example, say we have idxMap = [0, 0, 1, 2, 0, 0] and a render expression
// like '(@1 + @4) / @3', then it'll be updated into '(@1 + @3) / @2'. Such an
// idxMap indicates that the table has 6 columns and only 3 of them (0th, 2nd,
// 3rd) are needed.
//
// If post is a projection, post.OutputColumns are remapped; otherwise, if
// post has render expressions, the IndexedVars inside of them are remapped.
// In all other cases post is left unchanged.
//
// typsBeforeRemapping need to contain all the types of columns before the
// mapping of idxMap was applied. These will only be used if post.RenderExprs is
// not nil.
//
// If flowCtx.PreserveFlowSpecs is true, then this method updates post to store
// the original output columns or render expressions. Notably, in order to not
// corrupt the flow specs that have been scheduled to run on the remote nodes,
// this method will allocate fresh slices instead of updating the old slices in
// place (the flow specs for the remote nodes have shallow copies of this
// PostProcessSpec).
// NB: it is ok that we're modifying the specs - we are in the flow setup path
// which occurs **after** we have sent out SetupFlowRequest RPCs. In other
// words, every node must have gotten the unmodified version of the spec and is
// now free to modify it as it pleases.
//
// An error is returned only if a render expression cannot be processed by
// helper; the expression that failed is left untouched in post.
func remapPostProcessSpec(
	flowCtx *execinfra.FlowCtx,
	post *execinfrapb.PostProcessSpec,
	idxMap []int,
	helper *colexecargs.ExprHelper,
	typsBeforeRemapping []*types.T,
) error {
	if post.Projection {
		outputColumns := post.OutputColumns
		if flowCtx.PreserveFlowSpecs && post.OriginalOutputColumns == nil {
			// This is the first time we're modifying this PostProcessSpec, but
			// we've been asked to preserve the specs, so we have to set the
			// original output columns. We are also careful to allocate a new
			// slice to populate the updated projection.
			post.OriginalOutputColumns = outputColumns
			post.OutputColumns = make([]uint32, len(outputColumns))
		}
		for i, colIdx := range outputColumns {
			post.OutputColumns[i] = uint32(idxMap[colIdx])
		}
	} else if post.RenderExprs != nil {
		renderExprs := post.RenderExprs
		if flowCtx.PreserveFlowSpecs && post.OriginalRenderExprs == nil {
			// This is the first time we're modifying this PostProcessSpec, but
			// we've been asked to preserve the specs, so we have to set the
			// original render expressions. We are also careful to allocate a
			// new slice to populate the updated render expressions.
			post.OriginalRenderExprs = renderExprs
			post.RenderExprs = make([]execinfrapb.Expression, len(renderExprs))
		}
		for i := range renderExprs {
			// Make sure that the render expression is deserialized if we
			// are on the remote node.
			//
			// It is ok to use the evalCtx of the flowCtx since it won't be
			// mutated (we are not evaluating the expressions).
			expr, err := helper.ProcessExpr(renderExprs[i], flowCtx.EvalCtx, typsBeforeRemapping)
			if err != nil {
				return err
			}
			// Remap the expression that was just processed rather than
			// renderExprs[i].LocalExpr: when a fresh post.RenderExprs slice
			// has been allocated above, renderExprs[i] is a different element
			// from post.RenderExprs[i], and its LocalExpr is not updated by
			// the processing (it is nil if the expression has not been
			// deserialized yet).
			post.RenderExprs[i].LocalExpr = physicalplan.RemapIVarsInTypedExpr(expr, idxMap)
		}
	}
	return nil
}