-
Notifications
You must be signed in to change notification settings - Fork 3.8k
/
batch.go
336 lines (298 loc) · 10.3 KB
/
batch.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
// Copyright 2018 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
package coldata
import (
"fmt"
"math"
"strings"
"sync/atomic"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/errors"
)
// Batch is the type that columnar operators receive and produce. It
// represents a set of column vectors (partial data columns) as well as
// metadata about a batch, like the selection vector (which rows in the column
// batch are selected).
type Batch interface {
	// Length returns the number of values in the columns in the batch.
	Length() int
	// SetLength sets the number of values in the columns in the batch.
	SetLength(int)
	// Capacity returns the maximum number of values that can be stored in the
	// columns in the batch. Note that it could be a lower bound meaning some
	// of the Vecs could actually have larger underlying capacity (for example,
	// if they have been appended to).
	Capacity() int
	// Width returns the number of columns in the batch.
	Width() int
	// ColVec returns the ith Vec in this batch.
	ColVec(i int) Vec
	// ColVecs returns all of the underlying Vecs in this batch.
	ColVecs() []Vec
	// Selection returns the selection vector on this batch, or nil if the
	// batch is not using one. A selection vector is a densely-packed list of
	// the indices in each column that have not been filtered out by a previous
	// step.
	Selection() []int
	// SetSelection sets whether this batch is using its selection vector or
	// not.
	SetSelection(bool)
	// AppendCol appends the given Vec to this batch as its new last column.
	AppendCol(Vec)
	// ReplaceCol replaces the current Vec at the provided index with the
	// provided Vec. The original and the replacement vectors *must* be of the
	// same type.
	ReplaceCol(Vec, int)
	// Reset modifies the caller in-place to have the given length and columns
	// with the given types. If it's possible, Reset will reuse the existing
	// columns and allocations, invalidating existing references to the Batch or
	// its Vecs. However, Reset does _not_ zero out the column data.
	//
	// NOTE: Reset can allocate a new Batch, so when calling from the vectorized
	// engine consider either allocating a new Batch explicitly via
	// colexec.Allocator or calling ResetInternalBatch.
	Reset(typs []*types.T, length int, factory ColumnFactory)
	// ResetInternalBatch resets a batch and its underlying Vecs for reuse. It's
	// important for callers to call ResetInternalBatch if they own internal
	// batches that they reuse, as not doing this could result in correctness
	// or memory blowup issues.
	ResetInternalBatch()
	// String returns a pretty representation of this batch.
	String() string
}

// Compile-time assertion that *MemBatch satisfies the Batch interface.
var _ Batch = &MemBatch{}
// defaultBatchSize is the batch size in effect unless a test overrides it.
// TODO(jordan): tune.
const defaultBatchSize = 1024

// batchSize holds the current batch size. It is read and written with the
// sync/atomic functions, so it must never be accessed directly.
var batchSize = int64(defaultBatchSize)

// BatchSize is the maximum number of tuples that fit in a column batch.
func BatchSize() int {
	current := atomic.LoadInt64(&batchSize)
	return int(current)
}
// MaxBatchSize is the maximum acceptable size of batches.
const MaxBatchSize = 4096

// SetBatchSizeForTests modifies the batchSize variable. It should only be
// used in tests.
//
// An error is returned, and batchSize is left unchanged, when newBatchSize is
// not positive or is greater than MaxBatchSize. A non-positive size is
// rejected because it can never describe a usable batch: a negative capacity
// would make the selection-vector allocation in NewMemBatchNoCols panic.
func SetBatchSizeForTests(newBatchSize int) error {
	if newBatchSize <= 0 {
		return errors.Errorf("batch size %d must be positive", newBatchSize)
	}
	if newBatchSize > MaxBatchSize {
		return errors.Errorf("batch size %d greater than maximum allowed batch size %d", newBatchSize, MaxBatchSize)
	}
	// The previous value is not needed, so a plain atomic store suffices.
	atomic.StoreInt64(&batchSize, int64(newBatchSize))
	return nil
}
// ResetBatchSizeForTests restores the batchSize variable to defaultBatchSize,
// undoing any earlier SetBatchSizeForTests call. It should only be used in
// tests.
func ResetBatchSizeForTests() {
	atomic.StoreInt64(&batchSize, defaultBatchSize)
}
// NewMemBatch allocates a new in-memory Batch whose capacity is the current
// BatchSize(), with one column per entry of typs created via factory.
// TODO(jordan): pool these allocations.
func NewMemBatch(typs []*types.T, factory ColumnFactory) Batch {
	capacity := BatchSize()
	return NewMemBatchWithCapacity(typs, capacity, factory)
}
// NewMemBatchWithCapacity allocates a new in-memory Batch with the given
// column size. Use for operators that have a precisely-sized output batch.
//
// One column of the given capacity is created per entry of typs, and the
// positions of all columns whose canonical type family is Bytes are recorded
// in bytesVecIdxs.
func NewMemBatchWithCapacity(typs []*types.T, capacity int, factory ColumnFactory) Batch {
	batch := NewMemBatchNoCols(typs, capacity).(*MemBatch)
	for colIdx, typ := range typs {
		col := NewMemColumn(typ, capacity, factory)
		batch.b[colIdx] = col
		if col.CanonicalTypeFamily() == types.BytesFamily {
			batch.bytesVecIdxs = append(batch.bytesVecIdxs, colIdx)
		}
	}
	return batch
}
// NewMemBatchNoCols creates a "skeleton" of a new in-memory Batch. It
// allocates memory for the selection vector but does *not* allocate any memory
// for the column vectors - those will have to be added separately. The column
// slice is sized to len(typs) and left filled with nil Vecs.
//
// It panics if capacity exceeds math.MaxUint16.
func NewMemBatchNoCols(typs []*types.T, capacity int) Batch {
	if capacity > math.MaxUint16 {
		panic(fmt.Sprintf(`batches cannot have length larger than %d; requested %d`, math.MaxUint16, capacity))
	}
	return &MemBatch{
		capacity: capacity,
		b:        make([]Vec, len(typs)),
		sel:      make([]int, capacity),
	}
}
// ZeroBatch is a schema-less Batch of length 0.
var ZeroBatch = &zeroBatch{
	MemBatch: NewMemBatchWithCapacity(nil /* typs */, 0 /* capacity */, StandardColumnFactory).(*MemBatch),
}

// zeroBatch is a wrapper around MemBatch that prohibits modifications of the
// batch: the mutating methods overridden below panic instead of delegating to
// the embedded MemBatch. Methods that are not overridden here are promoted
// from the embedded MemBatch unchanged.
type zeroBatch struct {
	*MemBatch
}

var _ Batch = &zeroBatch{}

// Length always reports 0 for the zero batch.
func (*zeroBatch) Length() int { return 0 }

// Capacity always reports 0 for the zero batch.
func (*zeroBatch) Capacity() int { return 0 }

// SetLength panics: the length of the zero batch is fixed.
func (*zeroBatch) SetLength(int) {
	panic("length should not be changed on zero batch")
}

// SetSelection panics: the zero batch's selection cannot be toggled.
func (*zeroBatch) SetSelection(bool) {
	panic("selection should not be changed on zero batch")
}

// AppendCol panics: the zero batch must stay schema-less.
func (*zeroBatch) AppendCol(Vec) {
	panic("no columns should be appended to zero batch")
}

// ReplaceCol panics: the zero batch has no columns to replace.
func (*zeroBatch) ReplaceCol(Vec, int) {
	panic("no columns should be replaced in zero batch")
}

// Reset panics: the zero batch must not be repurposed.
func (*zeroBatch) Reset([]*types.T, int, ColumnFactory) {
	panic("zero batch should not be reset")
}
// MemBatch is an in-memory implementation of Batch.
type MemBatch struct {
	// length is the length of batch or sel in tuples.
	length int
	// capacity is the maximum number of tuples that can be stored in this
	// MemBatch.
	capacity int
	// b is the slice of columns in this batch.
	b []Vec
	// bytesVecIdxs stores the indices of all vectors of Bytes type in b. Bytes
	// vectors require special handling, so rather than iterating over all
	// vectors and checking whether they are of Bytes type we store this slice
	// separately. Indices are appended in increasing order by
	// NewMemBatchWithCapacity and AppendCol, and Reset relies on that ordering
	// when it truncates this slice.
	bytesVecIdxs []int
	// useSel indicates whether sel is in effect. When it is false, Selection
	// returns nil regardless of the contents of sel.
	useSel bool
	// sel is - if useSel is true - a selection vector from upstream. A
	// selection vector is a list of selected tuple indices in this MemBatch's
	// columns (tuples for which indices are not in sel are considered to be
	// "not present").
	sel []int
}
// Length implements the Batch interface. It returns the value most recently
// stored in the length field (by SetLength or Reset).
func (m *MemBatch) Length() int {
	return m.length
}
// Capacity implements the Batch interface. It returns the capacity recorded
// when the batch was constructed; it is not recomputed from the column
// vectors.
func (m *MemBatch) Capacity() int {
	return m.capacity
}
// Width implements the Batch interface. It returns the number of entries in
// the column slice, including any that are still nil.
func (m *MemBatch) Width() int {
	return len(m.b)
}
// ColVec implements the Batch interface. It returns the column at position i
// and panics (slice index out of range) if i is not within [0, Width()).
func (m *MemBatch) ColVec(i int) Vec {
	return m.b[i]
}
// ColVecs implements the Batch interface. The returned slice is the batch's
// own column slice, not a copy, so writes to its elements are visible through
// the batch.
func (m *MemBatch) ColVecs() []Vec {
	return m.b
}
// Selection implements the Batch interface. It returns the selection vector
// only while selection is enabled via SetSelection(true); otherwise it
// returns nil.
func (m *MemBatch) Selection() []int {
	if m.useSel {
		return m.sel
	}
	return nil
}
// SetSelection implements the Batch interface. It only toggles whether the
// selection vector is in use; the contents of the selection vector itself are
// left untouched.
func (m *MemBatch) SetSelection(b bool) {
	m.useSel = b
}
// SetLength implements the Batch interface. Besides recording the new length,
// for a positive length it asks every Bytes vector tracked in bytesVecIdxs to
// update its offsets via UpdateOffsetsToBeNonDecreasing.
func (m *MemBatch) SetLength(length int) {
	m.length = length
	if length <= 0 {
		// Nothing to fix up in the Bytes vectors for an empty batch.
		return
	}
	for _, idx := range m.bytesVecIdxs {
		m.b[idx].Bytes().UpdateOffsetsToBeNonDecreasing(length)
	}
}
// AppendCol implements the Batch interface. The new column becomes the last
// one in the batch; if its canonical type family is Bytes, its position is
// also recorded in bytesVecIdxs.
func (m *MemBatch) AppendCol(col Vec) {
	isBytes := col.CanonicalTypeFamily() == types.BytesFamily
	if isBytes {
		// len(m.b) is the index the column is about to occupy.
		newIdx := len(m.b)
		m.bytesVecIdxs = append(m.bytesVecIdxs, newIdx)
	}
	m.b = append(m.b, col)
}
// ReplaceCol implements the Batch interface. It stores col at position colIdx,
// panicking if that position already holds a vector whose type is not
// identical to col's. An empty (nil) position accepts a vector of any type.
//
// NOTE(review): unlike AppendCol, this method does not touch bytesVecIdxs, so
// a Bytes vector placed into a previously nil position is not tracked there -
// confirm that this is intended.
func (m *MemBatch) ReplaceCol(col Vec, colIdx int) {
	if prev := m.b[colIdx]; prev != nil && !prev.Type().Identical(col.Type()) {
		panic(fmt.Sprintf("unexpected replacement: original vector is %s "+
			"whereas the replacement is %s", prev.Type(), col.Type()))
	}
	m.b[colIdx] = col
}
// Reset implements the Batch interface.
//
// The existing allocations are reused only when the batch's capacity is at
// least length, it has at least len(typs) columns, and the first len(typs)
// columns have types identical to typs. Otherwise the receiver is overwritten
// with a freshly allocated batch of capacity length.
//
// NOTE(review): a nil receiver is classified as not reusable, but the
// overwrite below still dereferences m, so a nil receiver would panic -
// confirm that callers never pass one.
func (m *MemBatch) Reset(typs []*types.T, length int, factory ColumnFactory) {
	reusable := m != nil && m.Capacity() >= length && m.Width() >= len(typs)
	if reusable {
		for i, typ := range typs {
			// TODO(yuzefovich): change this when DatumVec is introduced.
			// TODO(yuzefovich): requiring that types are "identical" might be an
			// overkill - the vectors could have the same physical representation
			// but non-identical types. Think through this more.
			if !m.ColVec(i).Type().Identical(typ) {
				reusable = false
				break
			}
		}
	}
	if !reusable {
		fresh := NewMemBatchWithCapacity(typs, length, factory).(*MemBatch)
		*m = *fresh
		m.SetLength(length)
		return
	}
	// Yay! We can reuse m. NB It's not specified in the Reset contract, but
	// probably a good idea to keep all modifications below this line.
	//
	// Note that we're intentionally not calling m.SetLength() here because
	// that would update offsets in the bytes vectors which is not necessary
	// since those will get reset in ResetInternalBatch anyway.
	m.length = length
	m.b = m.b[:len(typs)]
	for _, vec := range m.b {
		vec.SetLength(length)
	}
	m.sel = m.sel[:length]
	// Drop the tracked Bytes columns that were cut off by the truncation of
	// m.b above. This cuts at the first index past the new width, so it
	// depends on bytesVecIdxs being in increasing order.
	for pos, colIdx := range m.bytesVecIdxs {
		if colIdx >= len(typs) {
			m.bytesVecIdxs = m.bytesVecIdxs[:pos]
			break
		}
	}
	m.ResetInternalBatch()
}
// ResetInternalBatch implements the Batch interface. It disables the
// selection vector, unsets the nulls of every column whose canonical type
// family is not Unknown, and resets every Bytes vector tracked in
// bytesVecIdxs. The batch's length is left as is.
func (m *MemBatch) ResetInternalBatch() {
	m.SetSelection(false)
	for i := range m.b {
		// NOTE(review): Unknown-family vectors are skipped - presumably they
		// have no nulls state to clear; confirm.
		if m.b[i].CanonicalTypeFamily() == types.UnknownFamily {
			continue
		}
		m.b[i].Nulls().UnsetNulls()
	}
	for _, idx := range m.bytesVecIdxs {
		m.b[idx].Bytes().Reset()
	}
}
// String returns a pretty representation of this batch: one bracketed,
// comma-separated line per tuple, each preceded by a newline, with a trailing
// newline at the end. A batch of length zero is rendered as
// "[zero-length batch]".
//
// When a selection vector is in use, the i-th printed tuple is the one at
// index Selection()[i] in the columns, so that only the selected tuples are
// shown (length counts tuples in the selection vector in that case). Without
// a selection vector the first Length() tuples are printed.
func (m *MemBatch) String() string {
	if m.Length() == 0 {
		return "[zero-length batch]"
	}
	sel := m.Selection()
	vecs := m.ColVecs()
	// strs is reused for every row to avoid a per-row allocation.
	strs := make([]string, len(vecs))
	var builder strings.Builder
	for i := 0; i < m.Length(); i++ {
		// Translate the logical tuple position into the physical index in the
		// column vectors.
		rowIdx := i
		if sel != nil {
			rowIdx = sel[i]
		}
		builder.WriteString("\n[")
		for colIdx, v := range vecs {
			strs[colIdx] = fmt.Sprintf("%v", GetValueAt(v, rowIdx))
		}
		builder.WriteString(strings.Join(strs, ", "))
		builder.WriteString("]")
	}
	builder.WriteString("\n")
	return builder.String()
}