Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

colexec: introduce batches with dynamic capacity #52453

Merged
merged 3 commits into from
Aug 15, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/ccl/importccl/read_import_workload.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ func (w *WorkloadKVConverter) Worker(ctx context.Context, evalCtx *tree.EvalCont
}
var alloc sqlbase.DatumAlloc
var a bufalloc.ByteAllocator
cb := coldata.NewMemBatchWithSize(nil /* types */, 0 /* size */, coldata.StandardColumnFactory)
cb := coldata.NewMemBatchWithCapacity(nil /* typs */, 0 /* capacity */, coldata.StandardColumnFactory)

for {
batchIdx := int(atomic.AddInt64(&w.batchIdxAtomic, 1))
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/workloadccl/allccl/all_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ func hashTableInitialData(
h hash.Hash, data workload.BatchedTuples, a *bufalloc.ByteAllocator,
) error {
var scratch [8]byte
b := coldata.NewMemBatchWithSize(nil /* types */, 0 /* size */, coldata.StandardColumnFactory)
b := coldata.NewMemBatchWithCapacity(nil /* typs */, 0 /* capacity */, coldata.StandardColumnFactory)
for batchIdx := 0; batchIdx < data.NumBatches; batchIdx++ {
*a = (*a)[:0]
data.FillBatch(batchIdx, b, a)
Expand Down
90 changes: 58 additions & 32 deletions pkg/col/coldata/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ type Batch interface {
Length() int
// SetLength sets the number of values in the columns in the batch.
SetLength(int)
// Capacity returns the maximum number of values that can be stored in the
// columns in the batch. Note that this may be a lower bound: some of the
// Vecs could actually have a larger underlying capacity (for example, if
// they have been appended to).
Capacity() int
// Width returns the number of columns in the batch.
Width() int
// ColVec returns the ith Vec in this batch.
Expand Down Expand Up @@ -73,6 +78,9 @@ const defaultBatchSize = 1024
var batchSize int64 = defaultBatchSize

// BatchSize reports the maximum number of tuples that fit in a column batch.
// The value is read atomically from the package-level batchSize.
// TODO(yuzefovich): this function is treated almost as if it were a constant
// even though every call performs an atomic load. Think through whether that
// has a noticeable performance hit.
func BatchSize() int {
	size := atomic.LoadInt64(&batchSize)
	return int(size)
}
Expand All @@ -96,19 +104,18 @@ func ResetBatchSizeForTests() {
atomic.SwapInt64(&batchSize, defaultBatchSize)
}

// NewMemBatch allocates a new in-memory Batch. An unsupported type will create
// a placeholder Vec that may not be accessed.
// NewMemBatch allocates a new in-memory Batch.
// TODO(jordan): pool these allocations.
func NewMemBatch(typs []*types.T, factory ColumnFactory) Batch {
return NewMemBatchWithSize(typs, BatchSize(), factory)
return NewMemBatchWithCapacity(typs, BatchSize(), factory)
}

// NewMemBatchWithSize allocates a new in-memory Batch with the given column
// size. Use for operators that have a precisely-sized output batch.
func NewMemBatchWithSize(typs []*types.T, size int, factory ColumnFactory) Batch {
b := NewMemBatchNoCols(typs, size).(*MemBatch)
// NewMemBatchWithCapacity allocates a new in-memory Batch with the given
// capacity. Use for operators that have a precisely-sized output batch.
func NewMemBatchWithCapacity(typs []*types.T, capacity int, factory ColumnFactory) Batch {
b := NewMemBatchNoCols(typs, capacity).(*MemBatch)
for i, t := range typs {
b.b[i] = NewMemColumn(t, size, factory)
b.b[i] = NewMemColumn(t, capacity, factory)
if b.b[i].CanonicalTypeFamily() == types.BytesFamily {
b.bytesVecIdxs = append(b.bytesVecIdxs, i)
}
Expand All @@ -119,20 +126,21 @@ func NewMemBatchWithSize(typs []*types.T, size int, factory ColumnFactory) Batch
// NewMemBatchNoCols creates a "skeleton" of new in-memory Batch. It allocates
// memory for the selection vector but does *not* allocate any memory for the
// column vectors - those will have to be added separately.
func NewMemBatchNoCols(typs []*types.T, size int) Batch {
if max := math.MaxUint16; size > max {
panic(fmt.Sprintf(`batches cannot have length larger than %d; requested %d`, max, size))
func NewMemBatchNoCols(typs []*types.T, capacity int) Batch {
if max := math.MaxUint16; capacity > max {
panic(fmt.Sprintf(`batches cannot have capacity larger than %d; requested %d`, max, capacity))
}
b := &MemBatch{}
b.capacity = capacity
b.b = make([]Vec, len(typs))
b.sel = make([]int, size)
b.sel = make([]int, capacity)
return b
}

// ZeroBatch is a schema-less Batch of length 0.
var ZeroBatch = &zeroBatch{
MemBatch: NewMemBatchWithSize(
nil /* types */, 0 /* size */, StandardColumnFactory,
MemBatch: NewMemBatchWithCapacity(
nil /* typs */, 0 /* capacity */, StandardColumnFactory,
).(*MemBatch),
}

Expand All @@ -148,6 +156,10 @@ func (b *zeroBatch) Length() int {
return 0
}

// Capacity implements the Batch interface. It always reports 0 for the zero
// batch.
func (b *zeroBatch) Capacity() int {
return 0
}

// SetLength always panics: the length of the zero batch must not be changed,
// regardless of the value passed in.
func (b *zeroBatch) SetLength(int) {
panic("length should not be changed on zero batch")
}
Expand All @@ -170,24 +182,34 @@ func (b *zeroBatch) Reset([]*types.T, int, ColumnFactory) {

// MemBatch is an in-memory implementation of Batch.
type MemBatch struct {
// length of batch or sel in tuples
n int
// slice of columns in this batch.
// length is the length of batch or sel in tuples.
length int
// capacity is the maximum number of tuples that can be stored in this
// MemBatch.
capacity int
// b is the slice of columns in this batch.
b []Vec
// bytesVecIdxs stores the indices of all vectors of Bytes type in b. Bytes
// vectors require special handling, so rather than iterating over all
// vectors and checking whether they are of Bytes type we store this slice
// separately.
bytesVecIdxs []int
useSel bool
// if useSel is true, a selection vector from upstream. a selection vector is
// a list of selected column indexes in this memBatch's columns.
// sel is - if useSel is true - a selection vector from upstream. A
// selection vector is a list of selected tuple indices in this MemBatch's
// columns (tuples for which indices are not in sel are considered to be
// "not present").
sel []int
}

// Length implements the Batch interface.
func (m *MemBatch) Length() int {
return m.n
return m.length
}

// Capacity implements the Batch interface. It returns the value of the
// capacity field, i.e. the maximum number of tuples that can be stored in
// this MemBatch.
func (m *MemBatch) Capacity() int {
return m.capacity
}

// Width implements the Batch interface.
Expand Down Expand Up @@ -219,11 +241,11 @@ func (m *MemBatch) SetSelection(b bool) {
}

// SetLength implements the Batch interface.
func (m *MemBatch) SetLength(n int) {
m.n = n
if n > 0 {
func (m *MemBatch) SetLength(length int) {
m.length = length
if length > 0 {
for _, bytesVecIdx := range m.bytesVecIdxs {
m.b[bytesVecIdx].Bytes().UpdateOffsetsToBeNonDecreasing(n)
m.b[bytesVecIdx].Bytes().UpdateOffsetsToBeNonDecreasing(length)
}
}
}
Expand All @@ -247,30 +269,34 @@ func (m *MemBatch) ReplaceCol(col Vec, colIdx int) {

// Reset implements the Batch interface.
func (m *MemBatch) Reset(typs []*types.T, length int, factory ColumnFactory) {
// The columns are always sized the same as the selection vector, so use it as
// a shortcut for the capacity (like a go slice, the batch's `Length` could be
// shorter than the capacity). We could be more defensive and type switch
// every column to verify its capacity, but that doesn't seem necessary yet.
cannotReuse := m == nil || len(m.sel) < length || m.Width() < len(typs)
cannotReuse := m == nil || m.Capacity() < length || m.Width() < len(typs)
for i := 0; i < len(typs) && !cannotReuse; i++ {
// TODO(yuzefovich): change this when DatumVec is introduced.
// TODO(yuzefovich): requiring that types are "identical" might be an
// overkill - the vectors could have the same physical representation
// but non-identical types. Think through this more.
if !m.ColVec(i).Type().Identical(typs[i]) {
cannotReuse = true
break
}
}
if cannotReuse {
*m = *NewMemBatchWithSize(typs, length, factory).(*MemBatch)
*m = *NewMemBatchWithCapacity(typs, length, factory).(*MemBatch)
m.SetLength(length)
return
}
// Yay! We can reuse m. NB It's not specified in the Reset contract, but
// probably a good idea to keep all modifications below this line.
m.SetLength(length)
m.sel = m.sel[:length]
//
// Note that we're intentionally not calling m.SetLength() here because
// that would update offsets in the bytes vectors which is not necessary
// since those will get reset in ResetInternalBatch anyway.
m.length = length
m.b = m.b[:len(typs)]
for i := range m.b {
m.b[i].SetLength(length)
}
m.sel = m.sel[:length]
for i, idx := range m.bytesVecIdxs {
if idx >= len(typs) {
m.bytesVecIdxs = m.bytesVecIdxs[:i]
Expand Down
8 changes: 4 additions & 4 deletions pkg/col/coldata/batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,21 +86,21 @@ func TestBatchReset(t *testing.T) {
resetAndCheck(b, typsInt, 1, true)

// Exact length, reuse
b = coldata.NewMemBatchWithSize(typsInt, 1, coldata.StandardColumnFactory)
b = coldata.NewMemBatchWithCapacity(typsInt, 1, coldata.StandardColumnFactory)
resetAndCheck(b, typsInt, 1, true)

// Insufficient capacity, don't reuse
b = coldata.NewMemBatchWithSize(typsInt, 1, coldata.StandardColumnFactory)
b = coldata.NewMemBatchWithCapacity(typsInt, 1, coldata.StandardColumnFactory)
resetAndCheck(b, typsInt, 2, false)

// Selection vector gets reset
b = coldata.NewMemBatchWithSize(typsInt, 1, coldata.StandardColumnFactory)
b = coldata.NewMemBatchWithCapacity(typsInt, 1, coldata.StandardColumnFactory)
b.SetSelection(true)
b.Selection()[0] = 7
resetAndCheck(b, typsInt, 1, true)

// Nulls gets reset
b = coldata.NewMemBatchWithSize(typsInt, 1, coldata.StandardColumnFactory)
b = coldata.NewMemBatchWithCapacity(typsInt, 1, coldata.StandardColumnFactory)
b.ColVec(0).Nulls().SetNull(0)
resetAndCheck(b, typsInt, 1, true)

Expand Down
33 changes: 16 additions & 17 deletions pkg/col/coldata/vec.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,7 @@ type Vec interface {

// Capacity returns the capacity of the Go slice that is underlying
// this Vec. Note that if there is no "slice" (like in case of flat bytes),
// the "capacity" of such object is undefined, so is the behavior of this
// method.
// then the "capacity" of such an object is equal to the number of elements.
Capacity() int
}

Expand All @@ -149,7 +148,7 @@ type memColumn struct {

// ColumnFactory is an interface that can construct columns for Batches.
type ColumnFactory interface {
MakeColumn(t *types.T, n int) Column
MakeColumn(t *types.T, length int) Column
}

type defaultColumnFactory struct{}
Expand All @@ -158,44 +157,44 @@ type defaultColumnFactory struct{}
// explicitly supported by the vectorized engine (i.e. not datum-backed).
var StandardColumnFactory ColumnFactory = &defaultColumnFactory{}

func (cf *defaultColumnFactory) MakeColumn(t *types.T, n int) Column {
func (cf *defaultColumnFactory) MakeColumn(t *types.T, length int) Column {
switch canonicalTypeFamily := typeconv.TypeFamilyToCanonicalTypeFamily(t.Family()); canonicalTypeFamily {
case types.BoolFamily:
return make(Bools, n)
return make(Bools, length)
case types.BytesFamily:
return NewBytes(n)
return NewBytes(length)
case types.IntFamily:
switch t.Width() {
case 16:
return make(Int16s, n)
return make(Int16s, length)
case 32:
return make(Int32s, n)
return make(Int32s, length)
case 0, 64:
return make(Int64s, n)
return make(Int64s, length)
default:
panic(fmt.Sprintf("unexpected integer width: %d", t.Width()))
}
case types.FloatFamily:
return make(Float64s, n)
return make(Float64s, length)
case types.DecimalFamily:
return make(Decimals, n)
return make(Decimals, length)
case types.TimestampTZFamily:
return make(Times, n)
return make(Times, length)
case types.IntervalFamily:
return make(Durations, n)
return make(Durations, length)
default:
panic(fmt.Sprintf("StandardColumnFactory doesn't support %s", t))
}
}

// NewMemColumn returns a new memColumn, initialized with a length using the
// given column factory.
func NewMemColumn(t *types.T, n int, factory ColumnFactory) Vec {
func NewMemColumn(t *types.T, length int, factory ColumnFactory) Vec {
return &memColumn{
t: t,
canonicalTypeFamily: typeconv.TypeFamilyToCanonicalTypeFamily(t.Family()),
col: factory.MakeColumn(t, n),
nulls: NewNulls(n),
col: factory.MakeColumn(t, length),
nulls: NewNulls(length),
}
}

Expand Down Expand Up @@ -340,7 +339,7 @@ func (m *memColumn) Capacity() int {
case types.BoolFamily:
return cap(m.col.(Bools))
case types.BytesFamily:
panic("Capacity should not be called on Vec of Bytes type")
return m.Bytes().Len()
case types.IntFamily:
switch m.t.Width() {
case 16:
Expand Down
2 changes: 1 addition & 1 deletion pkg/col/coldatatestutils/random_testutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ func RandomBatch(
length int,
nullProbability float64,
) coldata.Batch {
batch := allocator.NewMemBatchWithSize(typs, capacity)
batch := allocator.NewMemBatchWithFixedCapacity(typs, capacity)
if length == 0 {
length = capacity
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/col/coldatatestutils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
func CopyBatch(
original coldata.Batch, typs []*types.T, factory coldata.ColumnFactory,
) coldata.Batch {
b := coldata.NewMemBatchWithSize(typs, original.Length(), factory)
b := coldata.NewMemBatchWithCapacity(typs, original.Length(), factory)
b.SetLength(original.Length())
for colIdx, col := range original.ColVecs() {
b.ColVec(colIdx).Copy(coldata.CopySliceArgs{
Expand Down
6 changes: 3 additions & 3 deletions pkg/col/colserde/arrowbatchconverter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func TestArrowBatchConverterRandom(t *testing.T) {

arrowData, err := c.BatchToArrow(b)
require.NoError(t, err)
actual := testAllocator.NewMemBatchWithSize(typs, b.Length())
actual := testAllocator.NewMemBatchWithFixedCapacity(typs, b.Length())
require.NoError(t, c.ArrowToBatch(arrowData, actual))

coldata.AssertEquivalentBatches(t, expected, actual)
Expand Down Expand Up @@ -85,7 +85,7 @@ func roundTripBatch(
if err := r.Deserialize(&arrowDataOut, buf.Bytes()); err != nil {
return nil, err
}
actual := testAllocator.NewMemBatchWithSize(typs, b.Length())
actual := testAllocator.NewMemBatchWithFixedCapacity(typs, b.Length())
if err := c.ArrowToBatch(arrowDataOut, actual); err != nil {
return nil, err
}
Expand Down Expand Up @@ -204,7 +204,7 @@ func BenchmarkArrowBatchConverter(b *testing.B) {
data, err := c.BatchToArrow(batch)
require.NoError(b, err)
testPrefix := fmt.Sprintf("%s/nullFraction=%0.2f", typ.String(), nullFraction)
result := testAllocator.NewMemBatch([]*types.T{typ})
result := testAllocator.NewMemBatchWithMaxCapacity([]*types.T{typ})
b.Run(testPrefix+"/ArrowToBatch", func(b *testing.B) {
b.SetBytes(numBytes[typIdx])
for i := 0; i < b.N; i++ {
Expand Down
8 changes: 4 additions & 4 deletions pkg/col/colserde/file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func TestFileRoundtrip(t *testing.T) {
// buffer.
for i := 0; i < 2; i++ {
func() {
roundtrip := testAllocator.NewMemBatchWithSize(typs, b.Length())
roundtrip := testAllocator.NewMemBatchWithFixedCapacity(typs, b.Length())
d, err := colserde.NewFileDeserializerFromBytes(typs, buf.Bytes())
require.NoError(t, err)
defer func() { require.NoError(t, d.Close()) }()
Expand Down Expand Up @@ -81,7 +81,7 @@ func TestFileRoundtrip(t *testing.T) {
// file.
for i := 0; i < 2; i++ {
func() {
roundtrip := testAllocator.NewMemBatchWithSize(typs, b.Length())
roundtrip := testAllocator.NewMemBatchWithFixedCapacity(typs, b.Length())
d, err := colserde.NewFileDeserializerFromPath(typs, path)
require.NoError(t, err)
defer func() { require.NoError(t, d.Close()) }()
Expand All @@ -107,7 +107,7 @@ func TestFileIndexing(t *testing.T) {
require.NoError(t, err)

for i := 0; i < numInts; i++ {
b := testAllocator.NewMemBatchWithSize(typs, batchSize)
b := testAllocator.NewMemBatchWithFixedCapacity(typs, batchSize)
b.SetLength(batchSize)
b.ColVec(0).Int64()[0] = int64(i)
require.NoError(t, s.AppendBatch(b))
Expand All @@ -120,7 +120,7 @@ func TestFileIndexing(t *testing.T) {
require.Equal(t, typs, d.Typs())
require.Equal(t, numInts, d.NumBatches())
for batchIdx := numInts - 1; batchIdx >= 0; batchIdx-- {
b := testAllocator.NewMemBatchWithSize(typs, batchSize)
b := testAllocator.NewMemBatchWithFixedCapacity(typs, batchSize)
require.NoError(t, d.GetBatch(batchIdx, b))
require.Equal(t, batchSize, b.Length())
require.Equal(t, 1, b.Width())
Expand Down
Loading