From e9bfb4a99ee6edf2213b3385fc25f002f406905d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alfonso=20Subiotto=20Marqu=C3=A9s?= Date: Mon, 16 Sep 2019 16:05:55 +0200 Subject: [PATCH] coldata: make coldata.BatchSize a package function This will allow us to vary the batchSize private variable int the future by providing a testing hook for external packages to modify the batch size at will. Release justification: Category 1 non-production code change. Release note: None --- pkg/col/coldata/batch.go | 12 +++- pkg/col/coldata/bytes.go | 4 +- pkg/col/coldata/nulls.go | 8 +-- pkg/col/coldata/nulls_test.go | 58 ++++++++--------- pkg/col/coldata/vec_test.go | 64 +++++++++---------- pkg/col/colserde/arrowbatchconverter.go | 4 +- pkg/col/colserde/arrowbatchconverter_test.go | 14 ++-- pkg/sql/colexec/aggregator.go | 8 +-- pkg/sql/colexec/aggregator_test.go | 20 +++--- pkg/sql/colexec/buffer.go | 2 +- pkg/sql/colexec/builtin_funcs_test.go | 20 +++--- pkg/sql/colexec/case.go | 4 +- pkg/sql/colexec/cast_tmpl.go | 4 +- pkg/sql/colexec/cfetcher.go | 6 +- pkg/sql/colexec/coalescer.go | 10 +-- pkg/sql/colexec/coalescer_test.go | 10 +-- pkg/sql/colexec/columnarizer.go | 8 +-- pkg/sql/colexec/columnarizer_test.go | 2 +- pkg/sql/colexec/const_tmpl.go | 2 +- pkg/sql/colexec/deselector_test.go | 6 +- pkg/sql/colexec/distinct_test.go | 8 +-- pkg/sql/colexec/distinct_tmpl.go | 4 +- .../execgen/cmd/execgen/projection_ops_gen.go | 6 +- pkg/sql/colexec/execpb/stats.go | 4 +- pkg/sql/colexec/hash_aggregator.go | 8 +-- pkg/sql/colexec/hashjoiner.go | 24 +++---- pkg/sql/colexec/hashjoiner_test.go | 10 +-- pkg/sql/colexec/hashjoiner_tmpl.go | 4 +- pkg/sql/colexec/like_ops_test.go | 8 +-- pkg/sql/colexec/mergejoiner.go | 12 ++-- pkg/sql/colexec/mergejoiner_test.go | 58 ++++++++--------- pkg/sql/colexec/mergejoiner_util.go | 2 +- pkg/sql/colexec/offset_test.go | 4 +- pkg/sql/colexec/orderedsynchronizer.go | 4 +- pkg/sql/colexec/orderedsynchronizer_test.go | 6 +- pkg/sql/colexec/ordinality.go | 2 +- pkg/sql/colexec/ordinality_test.go | 4 +- pkg/sql/colexec/projection_ops_test.go | 24 +++---- pkg/sql/colexec/random_testutils.go | 5 +- pkg/sql/colexec/routers.go | 10 +-- pkg/sql/colexec/routers_test.go | 45 ++++++------- pkg/sql/colexec/select_in_test.go | 14 ++-- pkg/sql/colexec/select_in_tmpl.go | 2 +- pkg/sql/colexec/selection_ops_test.go | 28 ++++---- pkg/sql/colexec/sort.go | 2 +- pkg/sql/colexec/sort_chunks.go | 4 +- pkg/sql/colexec/sort_chunks_test.go | 12 ++-- pkg/sql/colexec/sort_test.go | 20 +++--- pkg/sql/colexec/sorttopk.go | 6 +- pkg/sql/colexec/stats_test.go | 12 ++-- pkg/sql/colexec/unorderedsynchronizer_test.go | 8 +-- pkg/sql/colexec/utils_test.go | 14 ++-- pkg/sql/colexec/zerocolumns_tmpl.go | 2 +- pkg/sql/colflow/colrpc/colrpc_test.go | 4 +- pkg/sql/colflow/colrpc/inbox.go | 2 +- pkg/sql/colflow/colrpc/inbox_test.go | 2 +- .../colflow/vectorized_flow_shutdown_test.go | 2 +- 57 files changed, 327 insertions(+), 325 deletions(-) diff --git a/pkg/col/coldata/batch.go b/pkg/col/coldata/batch.go index 2f5bccb5fcca..295fa6a69fc9 100644 --- a/pkg/col/coldata/batch.go +++ b/pkg/col/coldata/batch.go @@ -54,14 +54,20 @@ type Batch interface { var _ Batch = &MemBatch{} +const maxBatchSize = 1024 + +var batchSize = uint16(1024) + // BatchSize is the maximum number of tuples that fit in a column batch. // TODO(jordan): tune -const BatchSize = 1024 +func BatchSize() uint16 { + return batchSize +} // NewMemBatch allocates a new in-memory Batch. // TODO(jordan): pool these allocations. func NewMemBatch(types []coltypes.T) Batch { - return NewMemBatchWithSize(types, BatchSize) + return NewMemBatchWithSize(types, int(BatchSize())) } // NewMemBatchWithSize allocates a new in-memory Batch with the given column @@ -133,7 +139,7 @@ func (m *MemBatch) SetLength(n uint16) { // AppendCol implements the Batch interface. func (m *MemBatch) AppendCol(t coltypes.T) { - m.b = append(m.b, NewMemColumn(t, BatchSize)) + m.b = append(m.b, NewMemColumn(t, int(BatchSize()))) } // Reset implements the Batch interface. diff --git a/pkg/col/coldata/bytes.go b/pkg/col/coldata/bytes.go index 3a1ee744526b..47a03a76fc53 100644 --- a/pkg/col/coldata/bytes.go +++ b/pkg/col/coldata/bytes.go @@ -65,7 +65,7 @@ func (b *Bytes) Len() int { return len(b.data) } -var zeroBytesColumn = make([][]byte, BatchSize) +var zeroBytesColumn = make([][]byte, BatchSize()) // Zero zeroes out the underlying bytes. func (b *Bytes) Zero() { @@ -330,7 +330,7 @@ func (b *flatBytes) Len() int { return len(b.offsets) } -var zeroInt32Slice = make([]int32, BatchSize) +var zeroInt32Slice = make([]int32, BatchSize()) // Zero zeroes out the underlying bytes. Note that this doesn't change the // length. Use this instead of Reset if you need to be able to Get zeroed byte diff --git a/pkg/col/coldata/nulls.go b/pkg/col/coldata/nulls.go index 134d5783b5f8..e58336fb149c 100644 --- a/pkg/col/coldata/nulls.go +++ b/pkg/col/coldata/nulls.go @@ -10,13 +10,13 @@ package coldata -// zeroedNulls is a zeroed out slice representing a bitmap of size BatchSize. +// zeroedNulls is a zeroed out slice representing a bitmap of size maxBatchSize. // This is copied to efficiently set all nulls. -var zeroedNulls [(BatchSize-1)/8 + 1]byte +var zeroedNulls [(maxBatchSize-1)/8 + 1]byte -// filledNulls is a slice representing a bitmap of size BatchSize with every +// filledNulls is a slice representing a bitmap of size maxBatchSize with every // single bit set. -var filledNulls [(BatchSize-1)/8 + 1]byte +var filledNulls [(maxBatchSize-1)/8 + 1]byte // bitMask[i] is a byte with a single bit set at i. var bitMask = [8]byte{0x1, 0x2, 0x4, 0x8, 0x10, 0x20, 0x40, 0x80} diff --git a/pkg/col/coldata/nulls_test.go b/pkg/col/coldata/nulls_test.go index 40d9d2b7f7ec..4a5a7400349a 100644 --- a/pkg/col/coldata/nulls_test.go +++ b/pkg/col/coldata/nulls_test.go @@ -28,13 +28,13 @@ var nulls5 Nulls var nulls10 Nulls // pos is a collection of interesting boundary indices to use in tests. -var pos = []uint64{0, 1, 63, 64, 65, BatchSize - 1, BatchSize} +var pos = []uint64{0, 1, 63, 64, 65, uint64(BatchSize()) - 1, uint64(BatchSize())} func init() { - nulls3 = NewNulls(BatchSize) - nulls5 = NewNulls(BatchSize) - nulls10 = NewNulls(BatchSize * 2) - for i := uint16(0); i < BatchSize; i++ { + nulls3 = NewNulls(int(BatchSize())) + nulls5 = NewNulls(int(BatchSize())) + nulls10 = NewNulls(int(BatchSize()) * 2) + for i := uint16(0); i < BatchSize(); i++ { if i%3 == 0 { nulls3.SetNull(i) } @@ -42,7 +42,7 @@ func init() { nulls5.SetNull(i) } } - for i := uint16(0); i < BatchSize*2; i++ { + for i := uint16(0); i < BatchSize()*2; i++ { if i%10 == 0 { nulls10.SetNull(i) } @@ -50,7 +50,7 @@ func init() { } func TestNullAt(t *testing.T) { - for i := uint16(0); i < BatchSize; i++ { + for i := uint16(0); i < BatchSize(); i++ { if i%3 == 0 { require.True(t, nulls3.NullAt(i)) } else { @@ -62,9 +62,9 @@ func TestNullAt(t *testing.T) { func TestSetNullRange(t *testing.T) { for _, start := range pos { for _, end := range pos { - n := NewNulls(BatchSize) + n := NewNulls(int(BatchSize())) n.SetNullRange(start, end) - for i := uint64(0); i < BatchSize; i++ { + for i := uint64(0); i < uint64(BatchSize()); i++ { expected := i >= start && i < end require.Equal(t, expected, n.NullAt64(i), "NullAt(%d) should be %t after SetNullRange(%d, %d)", i, expected, start, end) @@ -76,10 +76,10 @@ func TestSetNullRange(t *testing.T) { func TestUnsetNullRange(t *testing.T) { for _, start := range pos { for _, end := range pos { - n := NewNulls(BatchSize) + n := NewNulls(int(BatchSize())) n.SetNulls() n.UnsetNullRange(start, end) - for i := uint64(0); i < BatchSize; i++ { + for i := uint64(0); i < uint64(BatchSize()); i++ { notExpected := i >= start && i < end require.NotEqual(t, notExpected, n.NullAt64(i), "NullAt(%d) saw %t, expected %t, after SetNullRange(%d, %d)", i, n.NullAt64(i), !notExpected, start, end) @@ -89,8 +89,8 @@ func TestUnsetNullRange(t *testing.T) { } func TestSwapNulls(t *testing.T) { - n := NewNulls(BatchSize) - swapPos := []uint64{0, 1, 63, 64, 65, BatchSize - 1} + n := NewNulls(int(BatchSize())) + swapPos := []uint64{0, 1, 63, 64, 65, uint64(BatchSize()) - 1} idxInSwapPos := func(idx uint64) bool { for _, p := range swapPos { if p == idx { @@ -108,7 +108,7 @@ func TestSwapNulls(t *testing.T) { for _, i := range swapPos { for _, j := range swapPos { n.swap(i, j) - for k := uint64(0); k < BatchSize; k++ { + for k := uint64(0); k < uint64(BatchSize()); k++ { require.Equal(t, idxInSwapPos(k), n.NullAt64(k), "after swapping NULLS (%d, %d), NullAt(%d) saw %t, expected %t", i, j, k, n.NullAt64(k), idxInSwapPos(k)) } @@ -120,7 +120,7 @@ func TestSwapNulls(t *testing.T) { // Test that swapping null with not null changes things appropriately. n.UnsetNulls() swaps := map[uint64]uint64{ - 0: BatchSize - 1, + 0: uint64(BatchSize()) - 1, 1: 62, 2: 3, 63: 65, @@ -141,7 +141,7 @@ func TestSwapNulls(t *testing.T) { n.swap(i, j) require.Truef(t, n.NullAt64(i), "after swapping not null and null (%d, %d), found null=%t at %d", i, j, n.NullAt64(i), i) require.Truef(t, !n.NullAt64(j), "after swapping not null and null (%d, %d), found null=%t at %d", i, j, !n.NullAt64(j), j) - for k := uint64(0); k < BatchSize; k++ { + for k := uint64(0); k < uint64(BatchSize()); k++ { if idxInSwaps(k) { continue } @@ -160,7 +160,7 @@ func TestSwapNulls(t *testing.T) { for _, i := range swapPos { for _, j := range swapPos { n.swap(i, j) - for k := uint64(0); k < BatchSize; k++ { + for k := uint64(0); k < uint64(BatchSize()); k++ { require.Equal(t, idxInSwapPos(k), !n.NullAt64(k), "after swapping NULLS (%d, %d), NullAt(%d) saw %t, expected %t", i, j, k, !n.NullAt64(k), idxInSwapPos(k)) } @@ -171,9 +171,9 @@ func TestSwapNulls(t *testing.T) { func TestNullsTruncate(t *testing.T) { for _, size := range pos { - n := NewNulls(BatchSize) + n := NewNulls(int(BatchSize())) n.Truncate(uint16(size)) - for i := uint16(0); i < BatchSize; i++ { + for i := uint16(0); i < BatchSize(); i++ { expected := uint64(i) >= size require.Equal(t, expected, n.NullAt(i), "NullAt(%d) should be %t after Truncate(%d)", i, expected, size) @@ -183,10 +183,10 @@ func TestNullsTruncate(t *testing.T) { func TestUnsetNullsAfter(t *testing.T) { for _, size := range pos { - n := NewNulls(BatchSize) + n := NewNulls(int(BatchSize())) n.SetNulls() n.UnsetNullsAfter(uint16(size)) - for i := uint16(0); i < BatchSize; i++ { + for i := uint16(0); i < BatchSize(); i++ { expected := uint64(i) < size require.Equal(t, expected, n.NullAt(i), "NullAt(%d) should be %t after UnsetNullsAfter(%d)", i, expected, size) @@ -195,19 +195,19 @@ func TestUnsetNullsAfter(t *testing.T) { } func TestSetAndUnsetNulls(t *testing.T) { - n := NewNulls(BatchSize) - for i := uint16(0); i < BatchSize; i++ { + n := NewNulls(int(BatchSize())) + for i := uint16(0); i < BatchSize(); i++ { require.False(t, n.NullAt(i)) } n.SetNulls() - for i := uint16(0); i < BatchSize; i++ { + for i := uint16(0); i < BatchSize(); i++ { require.True(t, n.NullAt(i)) } - for i := uint16(0); i < BatchSize; i += 3 { + for i := uint16(0); i < BatchSize(); i += 3 { n.UnsetNull(i) } - for i := uint16(0); i < BatchSize; i++ { + for i := uint16(0); i < BatchSize(); i++ { if i%3 == 0 { require.False(t, n.NullAt(i)) } else { @@ -216,7 +216,7 @@ func TestSetAndUnsetNulls(t *testing.T) { } n.UnsetNulls() - for i := uint16(0); i < BatchSize; i++ { + for i := uint16(0); i < BatchSize(); i++ { require.False(t, n.NullAt(i)) } } @@ -264,7 +264,7 @@ func TestNullsSetWithSel(t *testing.T) { args := SliceArgs{ // Neither type nor the length here matter. Src: NewMemColumn(coltypes.Bool, 0), - Sel: make([]uint16, BatchSize), + Sel: make([]uint16, BatchSize()), } // Make a selection vector with every even index. (This turns nulls10 into // nulls5.) @@ -318,7 +318,7 @@ func TestSlice(t *testing.T) { } } // Ensure we haven't modified the receiver. - for i := uint16(0); i < BatchSize; i++ { + for i := uint16(0); i < BatchSize(); i++ { expected := i%3 == 0 require.Equal(t, expected, nulls3.NullAt(i)) } diff --git a/pkg/col/coldata/vec_test.go b/pkg/col/coldata/vec_test.go index 6b8cf282f59d..b5a8e212898d 100644 --- a/pkg/col/coldata/vec_test.go +++ b/pkg/col/coldata/vec_test.go @@ -24,10 +24,10 @@ func TestMemColumnSlice(t *testing.T) { rng, _ := randutil.NewPseudoRand() - c := NewMemColumn(coltypes.Int64, BatchSize) + c := NewMemColumn(coltypes.Int64, int(BatchSize())) ints := c.Int64() - for i := uint16(0); i < BatchSize; i++ { + for i := uint16(0); i < BatchSize(); i++ { ints[i] = int64(i) if i%2 == 0 { // Set every other value to null. @@ -38,8 +38,8 @@ func TestMemColumnSlice(t *testing.T) { startSlice := uint16(1) endSlice := uint16(0) for startSlice > endSlice { - startSlice = uint16(rng.Intn(BatchSize)) - endSlice = uint16(1 + rng.Intn(BatchSize)) + startSlice = uint16(rng.Intn(int(BatchSize()))) + endSlice = uint16(1 + rng.Intn(int(BatchSize()))) } slice := c.Slice(coltypes.Int64, uint64(startSlice), uint64(endSlice)) @@ -112,12 +112,12 @@ func TestNullRanges(t *testing.T) { }, } - c := NewMemColumn(coltypes.Int64, BatchSize) + c := NewMemColumn(coltypes.Int64, int(BatchSize())) for _, tc := range tcs { c.Nulls().UnsetNulls() c.Nulls().SetNullRange(tc.start, tc.end) - for i := uint64(0); i < BatchSize; i++ { + for i := uint64(0); i < uint64(BatchSize()); i++ { if i >= tc.start && i < tc.end { if !c.Nulls().NullAt64(i) { t.Fatalf("expected null at %d, start: %d end: %d", i, tc.start, tc.end) @@ -135,7 +135,7 @@ func TestAppend(t *testing.T) { // TODO(asubiotto): Test nulls. const typ = coltypes.Int64 - src := NewMemColumn(typ, BatchSize) + src := NewMemColumn(typ, int(BatchSize())) sel := make([]uint16, len(src.Int64())) for i := range sel { sel[i] = uint16(i) @@ -150,9 +150,9 @@ func TestAppend(t *testing.T) { name: "AppendSimple", args: SliceArgs{ // DestIdx must be specified to append to the end of dest. - DestIdx: BatchSize, + DestIdx: uint64(BatchSize()), }, - expectedLength: BatchSize * 2, + expectedLength: int(BatchSize()) * 2, }, { name: "AppendOverwriteSimple", @@ -160,7 +160,7 @@ func TestAppend(t *testing.T) { // DestIdx 0, the default value, will start appending at index 0. DestIdx: 0, }, - expectedLength: BatchSize, + expectedLength: int(BatchSize()), }, { name: "AppendOverwriteSlice", @@ -168,7 +168,7 @@ func TestAppend(t *testing.T) { // Start appending at index 10. DestIdx: 10, }, - expectedLength: BatchSize + 10, + expectedLength: int(BatchSize()) + 10, }, { name: "AppendSlice", @@ -196,7 +196,7 @@ func TestAppend(t *testing.T) { Sel: sel[:len(sel)/2], SrcEndIdx: uint64(len(sel) / 2), }, - expectedLength: 5 + (BatchSize / 2), + expectedLength: 5 + (int(BatchSize()) / 2), }, } @@ -205,10 +205,10 @@ func TestAppend(t *testing.T) { tc.args.ColType = typ if tc.args.SrcEndIdx == 0 { // SrcEndIdx is always required. - tc.args.SrcEndIdx = BatchSize + tc.args.SrcEndIdx = uint64(BatchSize()) } t.Run(tc.name, func(t *testing.T) { - dest := NewMemColumn(typ, BatchSize) + dest := NewMemColumn(typ, int(BatchSize())) dest.Append(tc.args) require.Equal(t, tc.expectedLength, len(dest.Int64())) }) @@ -219,7 +219,7 @@ func TestCopy(t *testing.T) { // TODO(asubiotto): Test nulls. const typ = coltypes.Int64 - src := NewMemColumn(typ, BatchSize) + src := NewMemColumn(typ, int(BatchSize())) srcInts := src.Int64() for i := range srcInts { srcInts[i] = int64(i + 1) @@ -257,11 +257,11 @@ func TestCopy(t *testing.T) { SliceArgs: SliceArgs{ // Use DestIdx 1 to make sure that it is respected. DestIdx: 1, - SrcEndIdx: BatchSize - 1, + SrcEndIdx: uint64(BatchSize()) - 1, }, }, // expectedSum uses sum of positive integers formula. - expectedSum: ((BatchSize - 1) * (BatchSize)) / 2, + expectedSum: ((int(BatchSize()) - 1) * (int(BatchSize()))) / 2, }, { name: "CopyWithSel", @@ -286,7 +286,7 @@ func TestCopy(t *testing.T) { tc.args.Src = src tc.args.ColType = typ t.Run(tc.name, func(t *testing.T) { - dest := NewMemColumn(typ, BatchSize) + dest := NewMemColumn(typ, int(BatchSize())) dest.Copy(tc.args) destInts := dest.Int64() firstNonZero := 0 @@ -307,7 +307,7 @@ func TestCopyNulls(t *testing.T) { const typ = coltypes.Int64 // Set up the destination vector. - dst := NewMemColumn(typ, BatchSize) + dst := NewMemColumn(typ, int(BatchSize())) dstInts := dst.Int64() for i := range dstInts { dstInts[i] = int64(1) @@ -318,7 +318,7 @@ func TestCopyNulls(t *testing.T) { } // Set up the source vector. - src := NewMemColumn(typ, BatchSize) + src := NewMemColumn(typ, int(BatchSize())) srcInts := src.Int64() for i := range srcInts { srcInts[i] = 2 @@ -353,7 +353,7 @@ func TestCopyNulls(t *testing.T) { } // Verify that the remaining elements in dst have not been touched. - for i := 10; i < BatchSize; i++ { + for i := 10; i < int(BatchSize()); i++ { require.True(t, dstInts[i] == 1, "data in dst outside copy range has been changed") require.True(t, !dst.Nulls().NullAt(uint16(i)), "no extra nulls were added") } @@ -364,7 +364,7 @@ func TestCopySelOnDestDoesNotUnsetOldNulls(t *testing.T) { // Set up the destination vector. It is all nulls except for a single // non-null at index 0. - dst := NewMemColumn(typ, BatchSize) + dst := NewMemColumn(typ, int(BatchSize())) dstInts := dst.Int64() for i := range dstInts { dstInts[i] = 1 @@ -373,7 +373,7 @@ func TestCopySelOnDestDoesNotUnsetOldNulls(t *testing.T) { dst.Nulls().UnsetNull(0) // Set up the source vector with two nulls. - src := NewMemColumn(typ, BatchSize) + src := NewMemColumn(typ, int(BatchSize())) srcInts := src.Int64() for i := range srcInts { srcInts[i] = 2 @@ -411,7 +411,7 @@ func TestCopySelOnDestDoesNotUnsetOldNulls(t *testing.T) { func BenchmarkAppend(b *testing.B) { const typ = coltypes.Int64 - src := NewMemColumn(typ, BatchSize) + src := NewMemColumn(typ, int(BatchSize())) sel := make([]uint16, len(src.Int64())) benchCases := []struct { @@ -433,14 +433,14 @@ func BenchmarkAppend(b *testing.B) { for _, bc := range benchCases { bc.args.Src = src bc.args.ColType = typ - bc.args.SrcEndIdx = BatchSize - dest := NewMemColumn(typ, BatchSize) + bc.args.SrcEndIdx = uint64(BatchSize()) + dest := NewMemColumn(typ, int(BatchSize())) b.Run(bc.name, func(b *testing.B) { - b.SetBytes(8 * BatchSize) + b.SetBytes(8 * int64(BatchSize())) for i := 0; i < b.N; i++ { dest.Append(bc.args) // "Reset" dest for another round. - dest.SetCol(dest.Int64()[:BatchSize]) + dest.SetCol(dest.Int64()[:BatchSize()]) } }) } @@ -449,7 +449,7 @@ func BenchmarkAppend(b *testing.B) { func BenchmarkCopy(b *testing.B) { const typ = coltypes.Int64 - src := NewMemColumn(typ, BatchSize) + src := NewMemColumn(typ, int(BatchSize())) sel := make([]uint16, len(src.Int64())) benchCases := []struct { @@ -473,10 +473,10 @@ func BenchmarkCopy(b *testing.B) { for _, bc := range benchCases { bc.args.Src = src bc.args.ColType = typ - bc.args.SrcEndIdx = BatchSize - dest := NewMemColumn(typ, BatchSize) + bc.args.SrcEndIdx = uint64(BatchSize()) + dest := NewMemColumn(typ, int(BatchSize())) b.Run(bc.name, func(b *testing.B) { - b.SetBytes(8 * BatchSize) + b.SetBytes(8 * int64(BatchSize())) for i := 0; i < b.N; i++ { dest.Copy(bc.args) } diff --git a/pkg/col/colserde/arrowbatchconverter.go b/pkg/col/colserde/arrowbatchconverter.go index d946e3a911a8..7d51dbea27d0 100644 --- a/pkg/col/colserde/arrowbatchconverter.go +++ b/pkg/col/colserde/arrowbatchconverter.go @@ -95,7 +95,7 @@ var supportedTypes = func() map[coltypes.T]struct{} { // BatchToArrow converts the first batch.Length elements of the batch into an // arrow []*array.Data. It is assumed that the batch is not larger than -// coldata.BatchSize. The returned []*array.Data may only be used until the +// coldata.BatchSize(). The returned []*array.Data may only be used until the // next call to BatchToArrow. func (c *ArrowBatchConverter) BatchToArrow(batch coldata.Batch) ([]*array.Data, error) { if batch.Width() != len(c.typs) { @@ -191,7 +191,7 @@ func (c *ArrowBatchConverter) BatchToArrow(batch coldata.Batch) ([]*array.Data, } // ArrowToBatch converts []*array.Data to a coldata.Batch. There must not be -// more than coldata.BatchSize elements in data. It's safe to call ArrowToBatch +// more than coldata.BatchSize() elements in data. It's safe to call ArrowToBatch // concurrently. // // The passed in batch is overwritten, but after this method returns it stays diff --git a/pkg/col/colserde/arrowbatchconverter_test.go b/pkg/col/colserde/arrowbatchconverter_test.go index 7485e8f360d9..a9cfece69a18 100644 --- a/pkg/col/colserde/arrowbatchconverter_test.go +++ b/pkg/col/colserde/arrowbatchconverter_test.go @@ -41,7 +41,7 @@ func randomBatch() ([]coltypes.T, coldata.Batch) { typs[i] = availableTyps[rng.Intn(len(availableTyps))] } - capacity := rng.Intn(coldata.BatchSize) + 1 + capacity := rng.Intn(int(coldata.BatchSize())) + 1 length := rng.Intn(capacity) b := colexec.RandomBatch(rng, typs, capacity, length, rng.Float64()) return typs, b @@ -51,7 +51,7 @@ func randomBatch() ([]coltypes.T, coldata.Batch) { // use the returned batch to assert equality, not as an input to a testing // function, since Copy simplifies the internals (e.g. if there are zero // elements to copy, copyBatch returns a zero-capacity batch, which is less -// interesting than testing a batch with a different capacity of BatchSize but +// interesting than testing a batch with a different capacity of BatchSize() but // zero elements). func copyBatch(original coldata.Batch) coldata.Batch { typs := make([]coltypes.T, original.Width()) @@ -82,7 +82,7 @@ func assertEqualBatches(t *testing.T, expected, actual coldata.Batch) { require.Equal(t, expected.Width(), actual.Width()) for colIdx := 0; colIdx < expected.Width(); colIdx++ { // Verify equality of ColVecs (this includes nulls). Since the coldata.Vec - // backing array is always of coldata.BatchSize due to the scratch batch + // backing array is always of coldata.BatchSize() due to the scratch batch // that the converter keeps around, the coldata.Vec needs to be sliced to // the first length elements to match on length, otherwise the check will // fail. @@ -200,10 +200,10 @@ func BenchmarkArrowBatchConverter(b *testing.B) { // numBytes corresponds 1:1 to typs and specifies how many bytes we are // converting on one iteration of the benchmark for the corresponding type in // typs. - numBytes := []int64{coldata.BatchSize, fixedLen * coldata.BatchSize, 8 * coldata.BatchSize} + numBytes := []int64{int64(coldata.BatchSize()), fixedLen * int64(coldata.BatchSize()), 8 * int64(coldata.BatchSize())} // Run a benchmark on every type we care about. for typIdx, typ := range typs { - batch := colexec.RandomBatch(rng, []coltypes.T{typ}, coldata.BatchSize, 0 /* length */, 0 /* nullProbability */) + batch := colexec.RandomBatch(rng, []coltypes.T{typ}, int(coldata.BatchSize()), 0 /* length */, 0 /* nullProbability */) if batch.Width() != 1 { b.Fatalf("unexpected batch width: %d", batch.Width()) } @@ -249,7 +249,7 @@ func BenchmarkArrowBatchConverter(b *testing.B) { if len(data) != 1 { b.Fatal("expected arrow batch of length 1") } - if data[0].Len() != coldata.BatchSize { + if data[0].Len() != int(coldata.BatchSize()) { b.Fatal("unexpected number of elements") } } @@ -272,7 +272,7 @@ func BenchmarkArrowBatchConverter(b *testing.B) { if result.Width() != 1 { b.Fatal("expected one column") } - if result.Length() != coldata.BatchSize { + if result.Length() != coldata.BatchSize() { b.Fatal("unexpected number of elements") } } diff --git a/pkg/sql/colexec/aggregator.go b/pkg/sql/colexec/aggregator.go index fd8ba8b6d673..9008f626cfd9 100644 --- a/pkg/sql/colexec/aggregator.go +++ b/pkg/sql/colexec/aggregator.go @@ -104,7 +104,7 @@ type orderedAggregator struct { // resumeIdx is the index at which the aggregation functions should start // writing to on the next iteration of Next(). resumeIdx int - // outputSize is col.BatchSize by default. + // outputSize is col.BatchSize() by default. outputSize int } @@ -243,11 +243,11 @@ func makeAggregateFuncs( } func (a *orderedAggregator) EstimateStaticMemoryUsage() int { - return EstimateBatchSizeBytes(a.outputTypes, coldata.BatchSize*2) + return EstimateBatchSizeBytes(a.outputTypes, int(coldata.BatchSize()*2)) } func (a *orderedAggregator) initWithOutputBatchSize(outputSize uint16) { - a.initWithInputAndOutputBatchSize(coldata.BatchSize, int(outputSize)) + a.initWithInputAndOutputBatchSize(int(coldata.BatchSize()), int(outputSize)) } func (a *orderedAggregator) initWithInputAndOutputBatchSize(inputSize, outputSize int) { @@ -265,7 +265,7 @@ func (a *orderedAggregator) initWithInputAndOutputBatchSize(inputSize, outputSiz } func (a *orderedAggregator) Init() { - a.initWithInputAndOutputBatchSize(coldata.BatchSize, coldata.BatchSize) + a.initWithInputAndOutputBatchSize(int(coldata.BatchSize()), int(coldata.BatchSize())) } func (a *orderedAggregator) Next(ctx context.Context) coldata.Batch { diff --git a/pkg/sql/colexec/aggregator_test.go b/pkg/sql/colexec/aggregator_test.go index 022f5b9f0165..bf67207da80d 100644 --- a/pkg/sql/colexec/aggregator_test.go +++ b/pkg/sql/colexec/aggregator_test.go @@ -39,7 +39,7 @@ type aggregatorTestCase struct { aggCols [][]uint32 input tuples expected tuples - // {output}BatchSize if not 0 are passed in to NewOrderedAggregator to + // {output}BatchSize() if not 0 are passed in to NewOrderedAggregator to // divide input/output batches. batchSize int outputBatchSize int @@ -115,10 +115,10 @@ func (tc *aggregatorTestCase) init() error { tc.colTypes = defaultColTyps } if tc.batchSize == 0 { - tc.batchSize = coldata.BatchSize + tc.batchSize = int(coldata.BatchSize()) } if tc.outputBatchSize == 0 { - tc.outputBatchSize = coldata.BatchSize + tc.outputBatchSize = int(coldata.BatchSize()) } return nil } @@ -466,13 +466,13 @@ func TestAggregatorRandom(t *testing.T) { // to make sure the aggregations are correct. rng, _ := randutil.NewPseudoRand() ctx := context.Background() - for _, groupSize := range []int{1, 2, coldata.BatchSize / 4, coldata.BatchSize / 2} { + for _, groupSize := range []int{1, 2, int(coldata.BatchSize()) / 4, int(coldata.BatchSize()) / 2} { for _, numInputBatches := range []int{1, 2, 64} { for _, hasNulls := range []bool{true, false} { for _, agg := range aggTypes { t.Run(fmt.Sprintf("%s/groupSize=%d/numInputBatches=%d/hasNulls=%t", agg.name, groupSize, numInputBatches, hasNulls), func(t *testing.T) { - nTuples := coldata.BatchSize * numInputBatches + nTuples := int(coldata.BatchSize()) * numInputBatches typs := []coltypes.T{coltypes.Int64, coltypes.Float64} cols := []coldata.Vec{ coldata.NewMemColumn(typs[0], nTuples), @@ -592,10 +592,10 @@ func TestAggregatorRandom(t *testing.T) { } i++ } - totalInputRows := numInputBatches * coldata.BatchSize + totalInputRows := numInputBatches * int(coldata.BatchSize()) nOutputRows := totalInputRows / groupSize - expBatches := (nOutputRows / coldata.BatchSize) - if nOutputRows%coldata.BatchSize != 0 { + expBatches := nOutputRows / int(coldata.BatchSize()) + if nOutputRows%int(coldata.BatchSize()) != 0 { expBatches++ } if i != expBatches { @@ -625,14 +625,14 @@ func BenchmarkAggregator(b *testing.B) { b.Run(fName, func(b *testing.B) { for _, agg := range aggTypes { for _, typ := range []coltypes.T{coltypes.Int64, coltypes.Decimal} { - for _, groupSize := range []int{1, 2, coldata.BatchSize / 2, coldata.BatchSize} { + for _, groupSize := range []int{1, 2, int(coldata.BatchSize()) / 2, int(coldata.BatchSize())} { for _, hasNulls := range []bool{false, true} { for _, numInputBatches := range []int{64} { b.Run(fmt.Sprintf("%s/%s/groupSize=%d/hasNulls=%t/numInputBatches=%d", agg.name, typ.String(), groupSize, hasNulls, numInputBatches), func(b *testing.B) { colTypes := []coltypes.T{coltypes.Int64, typ} - nTuples := numInputBatches * coldata.BatchSize + nTuples := numInputBatches * int(coldata.BatchSize()) cols := []coldata.Vec{coldata.NewMemColumn(coltypes.Int64, nTuples), coldata.NewMemColumn(typ, nTuples)} groups := cols[0].Int64() curGroup := -1 diff --git a/pkg/sql/colexec/buffer.go b/pkg/sql/colexec/buffer.go index 87e6f6ca6948..45f3b4de728f 100644 --- a/pkg/sql/colexec/buffer.go +++ b/pkg/sql/colexec/buffer.go @@ -38,7 +38,7 @@ func NewBufferOp(input Operator) Operator { return &bufferOp{ input: input, batch: &selectionBatch{ - sel: make([]uint16, coldata.BatchSize), + sel: make([]uint16, coldata.BatchSize()), }, } } diff --git a/pkg/sql/colexec/builtin_funcs_test.go b/pkg/sql/colexec/builtin_funcs_test.go index 103daa33dc04..13bec6b4cdc2 100644 --- a/pkg/sql/colexec/builtin_funcs_test.go +++ b/pkg/sql/colexec/builtin_funcs_test.go @@ -107,8 +107,8 @@ func benchmarkBuiltinFunctions(b *testing.B, useSelectionVector bool, hasNulls b batch := coldata.NewMemBatch([]coltypes.T{coltypes.Int64}) col := batch.ColVec(0).Int64() - for i := int64(0); i < coldata.BatchSize; i++ { - if float64(i) < coldata.BatchSize*selectivity { + for i := 0; i < int(coldata.BatchSize()); i++ { + if float64(i) < float64(coldata.BatchSize())*selectivity { col[i] = -1 } else { col[i] = 1 @@ -116,19 +116,19 @@ func benchmarkBuiltinFunctions(b *testing.B, useSelectionVector bool, hasNulls b } if hasNulls { - for i := 0; i < coldata.BatchSize; i++ { + for i := 0; i < int(coldata.BatchSize()); i++ { if rand.Float64() < nullProbability { batch.ColVec(0).Nulls().SetNull(uint16(i)) } } } - batch.SetLength(coldata.BatchSize) + batch.SetLength(coldata.BatchSize()) if useSelectionVector { batch.SetSelection(true) sel := batch.Selection() - for i := int64(0); i < coldata.BatchSize; i++ { + for i := 0; i < int(coldata.BatchSize()); i++ { sel[i] = uint16(i) } } @@ -147,7 +147,7 @@ func benchmarkBuiltinFunctions(b *testing.B, useSelectionVector bool, hasNulls b } op := NewBuiltinFunctionOperator(tctx, typedExpr.(*tree.FuncExpr), []types.T{*types.Int}, []int{0}, 1, source) - b.SetBytes(int64(8 * coldata.BatchSize)) + b.SetBytes(int64(8 * coldata.BatchSize())) b.ResetTimer() for i := 0; i < b.N; i++ { op.Next(ctx) @@ -176,12 +176,12 @@ func BenchmarkCompareSpecializedOperators(b *testing.B) { sCol := batch.ColVec(1).Int64() eCol := batch.ColVec(2).Int64() outCol := batch.ColVec(3).Bytes() - for i := 0; i < coldata.BatchSize; i++ { + for i := 0; i < int(coldata.BatchSize()); i++ { bCol.Set(i, []byte("hello there")) sCol[i] = 1 eCol[i] = 4 } - batch.SetLength(coldata.BatchSize) + batch.SetLength(coldata.BatchSize()) source := NewRepeatableBatchSource(batch) source.Init() @@ -220,7 +220,7 @@ func BenchmarkCompareSpecializedOperators(b *testing.B) { specOp.Init() b.Run("DefaultBuiltinOperator", func(b *testing.B) { - b.SetBytes(int64(len("hello there") * coldata.BatchSize)) + b.SetBytes(int64(len("hello there") * int(coldata.BatchSize()))) b.ResetTimer() for i := 0; i < b.N; i++ { defaultOp.Next(ctx) @@ -231,7 +231,7 @@ func BenchmarkCompareSpecializedOperators(b *testing.B) { }) b.Run("SpecializedSubstringOperator", func(b *testing.B) { - b.SetBytes(int64(len("hello there") * coldata.BatchSize)) + b.SetBytes(int64(len("hello there") * int(coldata.BatchSize()))) b.ResetTimer() for i := 0; i < b.N; i++ { specOp.Next(ctx) diff --git a/pkg/sql/colexec/case.go b/pkg/sql/colexec/case.go index 98398d302633..2060d4c0ae00 100644 --- a/pkg/sql/colexec/case.go +++ b/pkg/sql/colexec/case.go @@ -55,7 +55,7 @@ func (c *caseOp) Child(nth int) execinfra.OpNode { func (c *caseOp) EstimateStaticMemoryUsage() int { // We statically use a single selection vector, origSel. - return coldata.BatchSize * sizeOfInt16 + return int(coldata.BatchSize()) * sizeOfInt16 } // NewCaseOp returns an operator that runs a case statement. @@ -83,7 +83,7 @@ func NewCaseOp( thenIdxs: thenIdxs, outputIdx: outputIdx, typ: typ, - origSel: make([]uint16, coldata.BatchSize), + origSel: make([]uint16, coldata.BatchSize()), } } diff --git a/pkg/sql/colexec/cast_tmpl.go b/pkg/sql/colexec/cast_tmpl.go index 631e33c0b8ad..bc056912d84d 100644 --- a/pkg/sql/colexec/cast_tmpl.go +++ b/pkg/sql/colexec/cast_tmpl.go @@ -117,7 +117,7 @@ type castOpNullAny struct { var _ StaticMemoryOperator = &castOpNullAny{} func (c *castOpNullAny) EstimateStaticMemoryUsage() int { - return EstimateBatchSizeBytes([]coltypes.T{c.toType}, coldata.BatchSize) + return EstimateBatchSizeBytes([]coltypes.T{c.toType}, int(coldata.BatchSize())) } func (c *castOpNullAny) Init() { @@ -173,7 +173,7 @@ type castOp_FROMTYPE_TOTYPE struct { var _ StaticMemoryOperator = &castOp_FROMTYPE_TOTYPE{} func (c *castOp_FROMTYPE_TOTYPE) EstimateStaticMemoryUsage() int { - return EstimateBatchSizeBytes([]coltypes.T{c.toType}, coldata.BatchSize) + return EstimateBatchSizeBytes([]coltypes.T{c.toType}, int(coldata.BatchSize())) } func (c *castOp_FROMTYPE_TOTYPE) Init() { diff --git a/pkg/sql/colexec/cfetcher.go b/pkg/sql/colexec/cfetcher.go index 28c9ba18f1e3..5e21f1a7c920 100644 --- a/pkg/sql/colexec/cfetcher.go +++ b/pkg/sql/colexec/cfetcher.go @@ -288,7 +288,7 @@ func (rf *cFetcher) Init( } rf.machine.batch = coldata.NewMemBatch(typs) rf.machine.colvecs = rf.machine.batch.ColVecs() - rf.estimatedStaticMemoryUsage = EstimateBatchSizeBytes(typs, coldata.BatchSize) + rf.estimatedStaticMemoryUsage = EstimateBatchSizeBytes(typs, int(coldata.BatchSize())) var err error @@ -545,7 +545,7 @@ const ( const debugState = false // NextBatch processes keys until we complete one batch of rows, -// coldata.BatchSize in length, which are returned in columnar format as an +// coldata.BatchSize() in length, which are returned in columnar format as an // exec.Batch. The batch contains one Vec per table column, regardless of the // index used; columns that are not needed (as per neededCols) are empty. The // Batch should not be modified and is only valid until the next call. @@ -753,7 +753,7 @@ func (rf *cFetcher) NextBatch(ctx context.Context) (coldata.Batch, error) { } rf.machine.rowIdx++ rf.shiftState() - if rf.machine.rowIdx >= coldata.BatchSize { + if rf.machine.rowIdx >= coldata.BatchSize() { rf.pushState(stateResetBatch) rf.machine.batch.SetLength(rf.machine.rowIdx) rf.machine.rowIdx = 0 diff --git a/pkg/sql/colexec/coalescer.go b/pkg/sql/colexec/coalescer.go index 194ff9a51aed..fd65281180ae 100644 --- a/pkg/sql/colexec/coalescer.go +++ b/pkg/sql/colexec/coalescer.go @@ -18,7 +18,7 @@ import ( ) // coalescerOp consumes the input operator and coalesces the resulting batches -// to return full batches of coldata.BatchSize. +// to return full batches of coldata.BatchSize(). type coalescerOp struct { OneInputNode NonExplainable @@ -41,7 +41,7 @@ func NewCoalescerOp(input Operator, colTypes []coltypes.T) Operator { } func (p *coalescerOp) EstimateStaticMemoryUsage() int { - return 2 * EstimateBatchSizeBytes(p.inputTypes, coldata.BatchSize) + return 2 * EstimateBatchSizeBytes(p.inputTypes, int(coldata.BatchSize())) } func (p *coalescerOp) Init() { @@ -59,8 +59,8 @@ func (p *coalescerOp) Next(ctx context.Context) coldata.Batch { p.buffer = tempBatch p.buffer.SetLength(0) - for p.group.Length() < coldata.BatchSize { - leftover := coldata.BatchSize - p.group.Length() + for p.group.Length() < coldata.BatchSize() { + leftover := coldata.BatchSize() - p.group.Length() batch := p.input.Next(ctx) batchSize := batch.Length() @@ -112,7 +112,7 @@ func (p *coalescerOp) Next(ctx context.Context) coldata.Batch { if batchSize <= leftover { p.group.SetLength(p.group.Length() + batchSize) } else { - p.group.SetLength(coldata.BatchSize) + p.group.SetLength(coldata.BatchSize()) p.buffer.SetLength(batchSize - leftover) } } diff --git a/pkg/sql/colexec/coalescer_test.go b/pkg/sql/colexec/coalescer_test.go index 75d192428475..ca9f1a73a7e3 100644 --- a/pkg/sql/colexec/coalescer_test.go +++ b/pkg/sql/colexec/coalescer_test.go @@ -21,7 +21,7 @@ import ( func TestCoalescer(t *testing.T) { // Large tuple number for coalescing. - nRows := coldata.BatchSize*3 + 7 + nRows := int(coldata.BatchSize()*3 + 7) large := make(tuples, nRows) largeTypes := []coltypes.T{coltypes.Int64} @@ -60,7 +60,7 @@ func TestCoalescer(t *testing.T) { func BenchmarkCoalescer(b *testing.B) { ctx := context.Background() // The input operator to the coalescer returns a batch of random size from [1, - // coldata.BatchSize) each time. + // coldata.BatchSize()) each time. nCols := 4 sourceTypes := make([]coltypes.T, nCols) @@ -72,14 +72,14 @@ func BenchmarkCoalescer(b *testing.B) { for colIdx := 0; colIdx < nCols; colIdx++ { col := batch.ColVec(colIdx).Int64() - for i := int64(0); i < coldata.BatchSize; i++ { + for i := int64(0); i < int64(coldata.BatchSize()); i++ { col[i] = i } } for _, nBatches := range []int{1 << 1, 1 << 2, 1 << 4, 1 << 8, 1 << 12, 1 << 16} { - b.Run(fmt.Sprintf("rows=%d", nBatches*int(coldata.BatchSize)), func(b *testing.B) { - b.SetBytes(int64(8 * nBatches * int(coldata.BatchSize) * nCols)) + b.Run(fmt.Sprintf("rows=%d", nBatches*int(coldata.BatchSize())), func(b *testing.B) { + b.SetBytes(int64(8 * nBatches * int(coldata.BatchSize()) * nCols)) b.ResetTimer() for i := 0; i < b.N; i++ { source := newRandomLengthBatchSource(batch) diff --git a/pkg/sql/colexec/columnarizer.go b/pkg/sql/colexec/columnarizer.go index 289906f4fc5e..a1a3115133c3 100644 --- a/pkg/sql/colexec/columnarizer.go +++ b/pkg/sql/colexec/columnarizer.go @@ -24,7 +24,7 @@ import ( ) // Columnarizer turns an execinfra.RowSource input into an Operator output, by -// reading the input in chunks of size coldata.BatchSize and converting each +// reading the input in chunks of size coldata.BatchSize() and converting each // chunk into a coldata.Batch column by column. type Columnarizer struct { execinfra.ProcessorBase @@ -71,13 +71,13 @@ func NewColumnarizer( // EstimateStaticMemoryUsage is part of the StaticMemoryOperator // interface. func (c *Columnarizer) EstimateStaticMemoryUsage() int { - return EstimateBatchSizeBytes(c.typs, coldata.BatchSize) + return EstimateBatchSizeBytes(c.typs, int(coldata.BatchSize())) } // Init is part of the Operator interface. func (c *Columnarizer) Init() { c.batch = coldata.NewMemBatch(c.typs) - c.buffered = make(sqlbase.EncDatumRows, coldata.BatchSize) + c.buffered = make(sqlbase.EncDatumRows, coldata.BatchSize()) for i := range c.buffered { c.buffered[i] = make(sqlbase.EncDatumRow, len(c.typs)) } @@ -91,7 +91,7 @@ func (c *Columnarizer) Next(context.Context) coldata.Batch { // Buffer up n rows. nRows := uint16(0) columnTypes := c.OutputTypes() - for ; nRows < coldata.BatchSize; nRows++ { + for ; nRows < coldata.BatchSize(); nRows++ { row, meta := c.input.Next() if meta != nil { c.accumulatedMeta = append(c.accumulatedMeta, *meta) diff --git a/pkg/sql/colexec/columnarizer_test.go b/pkg/sql/colexec/columnarizer_test.go index 398b19347437..efb2d2331b47 100644 --- a/pkg/sql/colexec/columnarizer_test.go +++ b/pkg/sql/colexec/columnarizer_test.go @@ -30,7 +30,7 @@ func TestColumnarizerResetsInternalBatch(t *testing.T) { typs := []types.T{*types.Int} // There will be at least two batches of rows so that we can see whether the // internal batch is reset. - nRows := coldata.BatchSize * 2 + nRows := int(coldata.BatchSize()) * 2 nCols := len(typs) rows := sqlbase.MakeIntRows(nRows, nCols) input := execinfra.NewRepeatableRowSource(typs, rows) diff --git a/pkg/sql/colexec/const_tmpl.go b/pkg/sql/colexec/const_tmpl.go index 5093819b3bb7..9150086c008b 100644 --- a/pkg/sql/colexec/const_tmpl.go +++ b/pkg/sql/colexec/const_tmpl.go @@ -128,7 +128,7 @@ type constNullOp struct { var _ StaticMemoryOperator = &constNullOp{} func (c constNullOp) EstimateStaticMemoryUsage() int { - return EstimateBatchSizeBytes([]coltypes.T{c.typ}, coldata.BatchSize) + return EstimateBatchSizeBytes([]coltypes.T{c.typ}, int(coldata.BatchSize())) } func (c constNullOp) Init() { diff --git a/pkg/sql/colexec/deselector_test.go b/pkg/sql/colexec/deselector_test.go index 6a528f04843a..6374b627ec9c 100644 --- a/pkg/sql/colexec/deselector_test.go +++ b/pkg/sql/colexec/deselector_test.go @@ -86,16 +86,16 @@ func BenchmarkDeselector(b *testing.B) { for colIdx := 0; colIdx < nCols; colIdx++ { col := batch.ColVec(colIdx).Int64() - for i := 0; i < coldata.BatchSize; i++ { + for i := 0; i < int(coldata.BatchSize()); i++ { col[i] = int64(i) } } for _, probOfOmitting := range []float64{0.1, 0.9} { - sel := randomSel(rng, coldata.BatchSize, probOfOmitting) + sel := randomSel(rng, coldata.BatchSize(), probOfOmitting) batchLen := uint16(len(sel)) for _, nBatches := range []int{1 << 1, 1 << 2, 1 << 4, 1 << 8} { - b.Run(fmt.Sprintf("rows=%d/after selection=%d", nBatches*coldata.BatchSize, nBatches*int(batchLen)), func(b *testing.B) { + b.Run(fmt.Sprintf("rows=%d/after selection=%d", nBatches*int(coldata.BatchSize()), nBatches*int(batchLen)), func(b *testing.B) { // We're measuring the amount of data that is not selected out. b.SetBytes(int64(8 * nBatches * int(batchLen) * nCols)) batch.SetSelection(true) diff --git a/pkg/sql/colexec/distinct_test.go b/pkg/sql/colexec/distinct_test.go index daa6f5bbab21..123a4e5a0c72 100644 --- a/pkg/sql/colexec/distinct_test.go +++ b/pkg/sql/colexec/distinct_test.go @@ -119,7 +119,7 @@ func BenchmarkSortedDistinct(b *testing.B) { bCol := batch.ColVec(2).Int64() lastA := int64(0) lastB := int64(0) - for i := 0; i < coldata.BatchSize; i++ { + for i := 0; i < int(coldata.BatchSize()); i++ { // 1/4 chance of changing each distinct coldata. if rng.Float64() > 0.75 { lastA++ @@ -130,7 +130,7 @@ func BenchmarkSortedDistinct(b *testing.B) { aCol[i] = lastA bCol[i] = lastB } - batch.SetLength(coldata.BatchSize) + batch.SetLength(coldata.BatchSize()) source := NewRepeatableBatchSource(batch) source.Init() @@ -143,14 +143,14 @@ func BenchmarkSortedDistinct(b *testing.B) { for _, nulls := range []bool{false, true} { b.Run(fmt.Sprintf("nulls=%t", nulls), func(b *testing.B) { if nulls { - n := coldata.NewNulls(coldata.BatchSize) + n := coldata.NewNulls(int(coldata.BatchSize())) // Setting one value to null is enough to trigger the null handling // logic for the entire batch. n.SetNull(0) batch.ColVec(1).SetNulls(&n) batch.ColVec(2).SetNulls(&n) } - b.SetBytes(int64(8 * coldata.BatchSize * 3)) + b.SetBytes(int64(8 * coldata.BatchSize() * 3)) for i := 0; i < b.N; i++ { distinct.Next(ctx) } diff --git a/pkg/sql/colexec/distinct_tmpl.go b/pkg/sql/colexec/distinct_tmpl.go index 2e75fbedae86..6b42d4006d60 100644 --- a/pkg/sql/colexec/distinct_tmpl.go +++ b/pkg/sql/colexec/distinct_tmpl.go @@ -39,7 +39,7 @@ import ( func OrderedDistinctColsToOperators( input Operator, distinctCols []uint32, typs []coltypes.T, ) (Operator, []bool, error) { - distinctCol := make([]bool, coldata.BatchSize) + distinctCol := make([]bool, coldata.BatchSize()) // zero the boolean column on every iteration. input = fnOp{ OneInputNode: NewOneInputNode(input), @@ -61,7 +61,7 @@ func OrderedDistinctColsToOperators( } distinctChain := &distinctChainOps{ resettableOperator: r, - estimatedStaticMemoryUsage: EstimateBatchSizeBytes([]coltypes.T{coltypes.Bool}, coldata.BatchSize), + estimatedStaticMemoryUsage: EstimateBatchSizeBytes([]coltypes.T{coltypes.Bool}, int(coldata.BatchSize())), } return distinctChain, distinctCol, nil } diff --git a/pkg/sql/colexec/execgen/cmd/execgen/projection_ops_gen.go b/pkg/sql/colexec/execgen/cmd/execgen/projection_ops_gen.go index fe3e9d0d713b..4576a8c8be12 100644 --- a/pkg/sql/colexec/execgen/cmd/execgen/projection_ops_gen.go +++ b/pkg/sql/colexec/execgen/cmd/execgen/projection_ops_gen.go @@ -46,7 +46,7 @@ type {{template "opRConstName" .}} struct { } func (p {{template "opRConstName" .}}) EstimateStaticMemoryUsage() int { - return EstimateBatchSizeBytes([]coltypes.T{coltypes.{{.RetTyp}}}, coldata.BatchSize) + return EstimateBatchSizeBytes([]coltypes.T{coltypes.{{.RetTyp}}}, int(coldata.BatchSize())) } func (p {{template "opRConstName" .}}) Next(ctx context.Context) coldata.Batch { @@ -96,7 +96,7 @@ type {{template "opLConstName" .}} struct { } func (p {{template "opLConstName" .}}) EstimateStaticMemoryUsage() int { - return EstimateBatchSizeBytes([]coltypes.T{coltypes.{{.RetTyp}}}, coldata.BatchSize) + return EstimateBatchSizeBytes([]coltypes.T{coltypes.{{.RetTyp}}}, int(coldata.BatchSize())) } func (p {{template "opLConstName" .}}) Next(ctx context.Context) coldata.Batch { @@ -145,7 +145,7 @@ type {{template "opName" .}} struct { } func (p {{template "opName" .}}) EstimateStaticMemoryUsage() int { - return EstimateBatchSizeBytes([]coltypes.T{coltypes.{{.RetTyp}}}, coldata.BatchSize) + return EstimateBatchSizeBytes([]coltypes.T{coltypes.{{.RetTyp}}}, int(coldata.BatchSize())) } func (p {{template "opName" .}}) Next(ctx context.Context) coldata.Batch { diff --git a/pkg/sql/colexec/execpb/stats.go b/pkg/sql/colexec/execpb/stats.go index 30c98d38b003..f69cfdfc7a07 100644 --- a/pkg/sql/colexec/execpb/stats.go +++ b/pkg/sql/colexec/execpb/stats.go @@ -40,7 +40,7 @@ func (vs *VectorizedStats) Stats() map[string]string { } selectivity := float64(0) if vs.NumBatches > 0 { - selectivity = float64(vs.NumTuples) / float64(coldata.BatchSize*vs.NumBatches) + selectivity = float64(vs.NumTuples) / float64(int64(coldata.BatchSize())*vs.NumBatches) } return map[string]string{ batchesOutputTagSuffix: fmt.Sprintf("%d", vs.NumBatches), @@ -68,7 +68,7 @@ func (vs *VectorizedStats) StatsForQueryPlan() []string { } selectivity := float64(0) if vs.NumBatches > 0 { - selectivity = float64(vs.NumTuples) / float64(coldata.BatchSize*vs.NumBatches) + selectivity = float64(vs.NumTuples) / float64(int64(coldata.BatchSize())*vs.NumBatches) } return []string{ fmt.Sprintf("%s: %d", batchesOutputQueryPlanSuffix, vs.NumBatches), diff --git a/pkg/sql/colexec/hash_aggregator.go b/pkg/sql/colexec/hash_aggregator.go index 51006ebabf7c..3f425972c12b 100644 --- a/pkg/sql/colexec/hash_aggregator.go +++ b/pkg/sql/colexec/hash_aggregator.go @@ -121,7 +121,7 @@ func NewHashAggregator( return nil, err } - distinctCol := make([]bool, coldata.BatchSize) + distinctCol := make([]bool, coldata.BatchSize()) grouper := &hashGrouper{ builder: builder, @@ -153,7 +153,7 @@ type hashGrouper struct { ht *hashTable // sel is an ordered list of indices to select representing the input rows. - // This selection vector is much bigger than coldata.BatchSize and should be + // This selection vector is much bigger than coldata.BatchSize() and should be // batched with the hashGrouper operator. sel []uint64 // distinct represents whether each corresponding row is part of a new group. @@ -228,11 +228,11 @@ func (op *hashGrouper) Next(ctx context.Context) coldata.Batch { } // Create and return the next batch of input to a maximum size of - // coldata.BatchSize. The rows in the new batch is specified by the corresponding + // coldata.BatchSize(). The rows in the new batch is specified by the corresponding // slice in the selection vector. nSelected := uint16(0) - batchEnd := op.batchStart + uint64(coldata.BatchSize) + batchEnd := op.batchStart + uint64(coldata.BatchSize()) if batchEnd > op.ht.size { batchEnd = op.ht.size } diff --git a/pkg/sql/colexec/hashjoiner.go b/pkg/sql/colexec/hashjoiner.go index 80541c52e86e..5dba55ed4865 100644 --- a/pkg/sql/colexec/hashjoiner.go +++ b/pkg/sql/colexec/hashjoiner.go @@ -86,7 +86,7 @@ type hashJoinerSourceSpec struct { // hashJoinEqOp performs a hash join on the input tables equality columns. // It requires that the output for every input batch in the probe phase fits -// within col.BatchSize, otherwise the behavior is undefined. A join is performed +// within col.BatchSize(), otherwise the behavior is undefined. A join is performed // and there is no guarantee on the ordering of the output columns. // // Before the build phase, all equality and output columns from the build table @@ -157,7 +157,7 @@ type hashJoinerSourceSpec struct { // 3. Now, head stores the keyID of the first match in the build table for every // probe table key. ht.same is used to select all build key matches for each // probe key, which are added to the resulting batch. Output batching is done -// to ensure that each batch is at most col.BatchSize. +// to ensure that each batch is at most col.BatchSize(). // // In the case that an outer join on the probe table side is performed, every // single probe row is kept even if its groupID is 0. If a groupID of 0 is @@ -304,7 +304,7 @@ func (hj *hashJoinEqOp) initEmitting() { func (hj *hashJoinEqOp) emitUnmatched() { nResults := uint16(0) - for nResults < coldata.BatchSize && hj.emittingUnmatchedState.rowIdx < hj.ht.size { + for nResults < coldata.BatchSize() && hj.emittingUnmatchedState.rowIdx < hj.ht.size { if !hj.prober.buildRowMatched[hj.emittingUnmatchedState.rowIdx] { hj.prober.buildIdx[nResults] = hj.emittingUnmatchedState.rowIdx nResults++ @@ -493,14 +493,14 @@ func makeHashTable( bucketSize: bucketSize, - groupID: make([]uint64, coldata.BatchSize), - toCheck: make([]uint16, coldata.BatchSize), - differs: make([]bool, coldata.BatchSize), + groupID: make([]uint64, coldata.BatchSize()), + toCheck: make([]uint16, coldata.BatchSize()), + differs: make([]bool, coldata.BatchSize()), - headID: make([]uint64, coldata.BatchSize), + headID: make([]uint64, coldata.BatchSize()), keys: make([]coldata.Vec, len(eqCols)), - buckets: make([]uint64, coldata.BatchSize), + buckets: make([]uint64, coldata.BatchSize()), allowNullEquality: allowNullEquality, } @@ -634,7 +634,7 @@ func (builder *hashJoinBuilder) exec(ctx context.Context) { nKeyCols := len(builder.spec.eqCols) batchStart := uint64(0) for batchStart < builder.ht.size { - batchEnd := batchStart + coldata.BatchSize + batchEnd := batchStart + uint64(coldata.BatchSize()) if batchEnd > builder.ht.size { batchEnd = builder.ht.size } @@ -778,7 +778,7 @@ func makeHashJoinProber( var probeRowUnmatched []bool if probe.outer { - probeRowUnmatched = make([]bool, coldata.BatchSize) + probeRowUnmatched = make([]bool, coldata.BatchSize()) } return &hashJoinProber{ @@ -786,8 +786,8 @@ func makeHashJoinProber( batch: coldata.NewMemBatch(outColTypes), - buildIdx: make([]uint64, coldata.BatchSize), - probeIdx: make([]uint16, coldata.BatchSize), + buildIdx: make([]uint64, coldata.BatchSize()), + probeIdx: make([]uint16, coldata.BatchSize()), spec: probe, build: build, diff --git a/pkg/sql/colexec/hashjoiner_test.go b/pkg/sql/colexec/hashjoiner_test.go index 625299e65702..0cf5c418091e 100644 --- a/pkg/sql/colexec/hashjoiner_test.go +++ b/pkg/sql/colexec/hashjoiner_test.go @@ -774,12 +774,12 @@ func BenchmarkHashJoiner(b *testing.B) { for colIdx := 0; colIdx < nCols; colIdx++ { col := batch.ColVec(colIdx).Int64() - for i := 0; i < coldata.BatchSize; i++ { + for i := 0; i < int(coldata.BatchSize()); i++ { col[i] = int64(i) } } - batch.SetLength(coldata.BatchSize) + batch.SetLength(coldata.BatchSize()) for _, hasNulls := range []bool{false, true} { b.Run(fmt.Sprintf("nulls=%v", hasNulls), func(b *testing.B) { @@ -801,10 +801,10 @@ func BenchmarkHashJoiner(b *testing.B) { for _, buildDistinct := range []bool{true, false} { b.Run(fmt.Sprintf("distinct=%v", buildDistinct), func(b *testing.B) { for _, nBatches := range []int{1 << 1, 1 << 8, 1 << 12} { - b.Run(fmt.Sprintf("rows=%d", nBatches*coldata.BatchSize), func(b *testing.B) { - // 8 (bytes / int64) * nBatches (number of batches) * col.BatchSize (rows / + b.Run(fmt.Sprintf("rows=%d", nBatches*int(coldata.BatchSize())), func(b *testing.B) { + // 8 (bytes / int64) * nBatches (number of batches) * col.BatchSize() (rows / // batch) * nCols (number of columns / row) * 2 (number of sources). - b.SetBytes(int64(8 * nBatches * coldata.BatchSize * nCols * 2)) + b.SetBytes(int64(8 * nBatches * int(coldata.BatchSize()) * nCols * 2)) b.ResetTimer() for i := 0; i < b.N; i++ { leftSource := newFiniteBatchSource(batch, nBatches) diff --git a/pkg/sql/colexec/hashjoiner_tmpl.go b/pkg/sql/colexec/hashjoiner_tmpl.go index 8834b87a7322..077fa4d70022 100644 --- a/pkg/sql/colexec/hashjoiner_tmpl.go +++ b/pkg/sql/colexec/hashjoiner_tmpl.go @@ -224,7 +224,7 @@ func _COLLECT_RIGHT_OUTER( } for { - if nResults >= coldata.BatchSize { + if nResults >= coldata.BatchSize() { prober.prevBatch = batch return nResults } @@ -267,7 +267,7 @@ func _COLLECT_NO_OUTER( for i := uint16(0); i < batchSize; i++ { currentID := prober.ht.headID[i] for currentID != 0 { - if nResults >= coldata.BatchSize { + if nResults >= coldata.BatchSize() { prober.prevBatch = batch return nResults } diff --git a/pkg/sql/colexec/like_ops_test.go b/pkg/sql/colexec/like_ops_test.go index 5303b7bb3b95..64c4b34cee02 100644 --- a/pkg/sql/colexec/like_ops_test.go +++ b/pkg/sql/colexec/like_ops_test.go @@ -102,7 +102,7 @@ func BenchmarkLikeOps(b *testing.B) { batch := coldata.NewMemBatch([]coltypes.T{coltypes.Bytes}) col := batch.ColVec(0).Bytes() width := 64 - for i := 0; i < coldata.BatchSize; i++ { + for i := 0; i < int(coldata.BatchSize()); i++ { col.Set(i, randutil.RandBytes(rng, width)) } @@ -110,12 +110,12 @@ func BenchmarkLikeOps(b *testing.B) { // everything out. prefix := "abc" suffix := "xyz" - for i := 0; i < coldata.BatchSize/2; i++ { + for i := 0; i < int(coldata.BatchSize())/2; i++ { copy(col.Get(i)[:3], prefix) copy(col.Get(i)[width-3:], suffix) } - batch.SetLength(coldata.BatchSize) + batch.SetLength(coldata.BatchSize()) source := NewRepeatableBatchSource(batch) source.Init() @@ -148,7 +148,7 @@ func BenchmarkLikeOps(b *testing.B) { for _, tc := range testCases { b.Run(tc.name, func(b *testing.B) { tc.op.Init() - b.SetBytes(int64(width * coldata.BatchSize)) + b.SetBytes(int64(width * int(coldata.BatchSize()))) for i := 0; i < b.N; i++ { tc.op.Next(ctx) } diff --git a/pkg/sql/colexec/mergejoiner.go b/pkg/sql/colexec/mergejoiner.go index d7b7ee7f9d0c..df1c24439447 100644 --- a/pkg/sql/colexec/mergejoiner.go +++ b/pkg/sql/colexec/mergejoiner.go @@ -383,22 +383,22 @@ func (o *mergeJoinBase) EstimateStaticMemoryUsage() int { filterMemUsage = o.filter.EstimateStaticMemoryUsage() } return EstimateBatchSizeBytes( - o.getOutColTypes(), coldata.BatchSize, + o.getOutColTypes(), int(coldata.BatchSize()), ) + // base.output EstimateBatchSizeBytes( - o.left.sourceTypes, coldata.BatchSize, + o.left.sourceTypes, int(coldata.BatchSize()), ) + // base.proberState.lBufferedGroup EstimateBatchSizeBytes( - o.right.sourceTypes, coldata.BatchSize, + o.right.sourceTypes, int(coldata.BatchSize()), ) + // base.proberState.rBufferedGroup - 4*sizeOfGroup*coldata.BatchSize + // base.groups + 4*sizeOfGroup*int(coldata.BatchSize()) + // base.groups leftDistincter.EstimateStaticMemoryUsage() + // base.left.distincter rightDistincter.EstimateStaticMemoryUsage() + // base.right.distincter filterMemUsage } func (o *mergeJoinBase) Init() { - o.initWithOutputBatchSize(coldata.BatchSize) + o.initWithOutputBatchSize(coldata.BatchSize()) } func (o *mergeJoinBase) initWithOutputBatchSize(outBatchSize uint16) { @@ -418,7 +418,7 @@ func (o *mergeJoinBase) initWithOutputBatchSize(outBatchSize uint16) { o.builderState.lGroups = make([]group, 1) o.builderState.rGroups = make([]group, 1) - o.groups = makeGroupsBuffer(coldata.BatchSize) + o.groups = makeGroupsBuffer(int(coldata.BatchSize())) o.resetBuilderCrossProductState() if o.filter != nil { diff --git a/pkg/sql/colexec/mergejoiner_test.go b/pkg/sql/colexec/mergejoiner_test.go index 0c5e447c7f60..ba86048a0a48 100644 --- a/pkg/sql/colexec/mergejoiner_test.go +++ b/pkg/sql/colexec/mergejoiner_test.go @@ -44,7 +44,7 @@ type mjTestCase struct { func (tc *mjTestCase) Init() { if tc.outputBatchSize == 0 { - tc.outputBatchSize = coldata.BatchSize + tc.outputBatchSize = coldata.BatchSize() } if len(tc.leftDirections) == 0 { @@ -197,7 +197,7 @@ func TestMergeJoiner(t *testing.T) { expected: tuples{{1}, {1}, {2}, {2}, {2}, {3}, {4}}, }, { - description: "cross product test, batch size = 1024 (col.BatchSize)", + description: "cross product test, batch size = col.BatchSize()", leftTypes: []coltypes.T{coltypes.Int64}, rightTypes: []coltypes.T{coltypes.Int64}, leftTuples: tuples{{1}, {1}, {1}, {1}}, @@ -1472,10 +1472,10 @@ func TestMergeJoiner(t *testing.T) { func TestMergeJoinerMultiBatch(t *testing.T) { ctx := context.Background() for _, numInputBatches := range []int{1, 2, 16} { - for _, outBatchSize := range []uint16{1, 16, coldata.BatchSize} { + for _, outBatchSize := range []uint16{1, 16, coldata.BatchSize()} { t.Run(fmt.Sprintf("numInputBatches=%d", numInputBatches), func(t *testing.T) { - nTuples := coldata.BatchSize * numInputBatches + nTuples := int(coldata.BatchSize()) * numInputBatches typs := []coltypes.T{coltypes.Int64} cols := []coldata.Vec{coldata.NewMemColumn(typs[0], nTuples)} groups := cols[0].Int64() @@ -1535,11 +1535,11 @@ func TestMergeJoinerMultiBatch(t *testing.T) { // correctly. func TestMergeJoinerMultiBatchRuns(t *testing.T) { ctx := context.Background() - for _, groupSize := range []int{coldata.BatchSize / 8, coldata.BatchSize / 4, coldata.BatchSize / 2} { + for _, groupSize := range []int{int(coldata.BatchSize()) / 8, int(coldata.BatchSize()) / 4, int(coldata.BatchSize()) / 2} { for _, numInputBatches := range []int{1, 2, 16} { t.Run(fmt.Sprintf("groupSize=%d/numInputBatches=%d", groupSize, numInputBatches), func(t *testing.T) { - nTuples := coldata.BatchSize * numInputBatches + nTuples := int(coldata.BatchSize()) * numInputBatches typs := []coltypes.T{coltypes.Int64, coltypes.Int64} cols := []coldata.Vec{coldata.NewMemColumn(typs[0], nTuples), coldata.NewMemColumn(typs[1], nTuples)} for i := range cols[0].Int64() { @@ -1588,9 +1588,9 @@ func TestMergeJoinerMultiBatchRuns(t *testing.T) { i++ } - if count != groupSize*coldata.BatchSize*numInputBatches { + if count != groupSize*int(coldata.BatchSize())*numInputBatches { t.Fatalf("found count %d, expected count %d", - count, groupSize*coldata.BatchSize*numInputBatches) + count, groupSize*int(coldata.BatchSize())*numInputBatches) } }) } @@ -1602,12 +1602,12 @@ func TestMergeJoinerMultiBatchRuns(t *testing.T) { // correctly. func TestMergeJoinerLongMultiBatchCount(t *testing.T) { ctx := context.Background() - for _, groupSize := range []int{1, 2, coldata.BatchSize / 4, coldata.BatchSize / 2} { + for _, groupSize := range []int{1, 2, int(coldata.BatchSize()) / 4, int(coldata.BatchSize()) / 2} { for _, numInputBatches := range []int{1, 2, 16} { - for _, outBatchSize := range []uint16{1, 16, coldata.BatchSize} { + for _, outBatchSize := range []uint16{1, 16, coldata.BatchSize()} { t.Run(fmt.Sprintf("groupSize=%d/numInputBatches=%d", groupSize, numInputBatches), func(t *testing.T) { - nTuples := coldata.BatchSize * numInputBatches + nTuples := int(coldata.BatchSize()) * numInputBatches typs := []coltypes.T{coltypes.Int64} cols := []coldata.Vec{coldata.NewMemColumn(typs[0], nTuples)} groups := cols[0].Int64() @@ -1656,11 +1656,11 @@ func TestMergeJoinerLongMultiBatchCount(t *testing.T) { // correctly. func TestMergeJoinerMultiBatchCountRuns(t *testing.T) { ctx := context.Background() - for _, groupSize := range []int{coldata.BatchSize / 8, coldata.BatchSize / 4, coldata.BatchSize / 2} { + for _, groupSize := range []int{int(coldata.BatchSize()) / 8, int(coldata.BatchSize()) / 4, int(coldata.BatchSize()) / 2} { for _, numInputBatches := range []int{1, 2, 16} { t.Run(fmt.Sprintf("groupSize=%d/numInputBatches=%d", groupSize, numInputBatches), func(t *testing.T) { - nTuples := coldata.BatchSize * numInputBatches + nTuples := int(coldata.BatchSize()) * numInputBatches typs := []coltypes.T{coltypes.Int64} cols := []coldata.Vec{coldata.NewMemColumn(typs[0], nTuples)} groups := cols[0].Int64() @@ -1694,9 +1694,9 @@ func TestMergeJoinerMultiBatchCountRuns(t *testing.T) { for b := a.Next(ctx); b.Length() != 0; b = a.Next(ctx) { count += int(b.Length()) } - if count != groupSize*coldata.BatchSize*numInputBatches { + if count != groupSize*int(coldata.BatchSize())*numInputBatches { t.Fatalf("found count %d, expected count %d", - count, groupSize*coldata.BatchSize*numInputBatches) + count, groupSize*int(coldata.BatchSize())*numInputBatches) } }) } @@ -1782,7 +1782,7 @@ func TestMergeJoinerRandomized(t *testing.T) { for _, randomIncrement := range []int64{0, 1} { t.Run(fmt.Sprintf("numInputBatches=%dmaxRunLength=%dskipValues=%trandomIncrement=%d", numInputBatches, maxRunLength, skipValues, randomIncrement), func(t *testing.T) { - nTuples := coldata.BatchSize * numInputBatches + nTuples := int(coldata.BatchSize()) * numInputBatches typs := []coltypes.T{coltypes.Int64} lCols, rCols, exp := newBatchesOfRandIntRows(nTuples, typs, maxRunLength, skipValues, randomIncrement) leftSource := newChunkingBatchSource(typs, lCols, uint64(nTuples)) @@ -1839,12 +1839,12 @@ func TestMergeJoinerRandomized(t *testing.T) { func newBatchOfIntRows(nCols int, batch coldata.Batch) coldata.Batch { for colIdx := 0; colIdx < nCols; colIdx++ { col := batch.ColVec(colIdx).Int64() - for i := 0; i < coldata.BatchSize; i++ { + for i := 0; i < int(coldata.BatchSize()); i++ { col[i] = int64(i) } } - batch.SetLength(coldata.BatchSize) + batch.SetLength(coldata.BatchSize()) for colIdx := 0; colIdx < nCols; colIdx++ { vec := batch.ColVec(colIdx) @@ -1856,12 +1856,12 @@ func newBatchOfIntRows(nCols int, batch coldata.Batch) coldata.Batch { func newBatchOfRepeatedIntRows(nCols int, batch coldata.Batch, numRepeats int) coldata.Batch { for colIdx := 0; colIdx < nCols; colIdx++ { col := batch.ColVec(colIdx).Int64() - for i := 0; i < coldata.BatchSize; i++ { + for i := 0; i < int(coldata.BatchSize()); i++ { col[i] = int64((i + 1) / numRepeats) } } - batch.SetLength(coldata.BatchSize) + batch.SetLength(coldata.BatchSize()) for colIdx := 0; colIdx < nCols; colIdx++ { vec := batch.ColVec(colIdx) @@ -1883,10 +1883,10 @@ func BenchmarkMergeJoiner(b *testing.B) { // 1:1 join. for _, nBatches := range []int{1, 4, 16, 1024} { - b.Run(fmt.Sprintf("rows=%d", nBatches*coldata.BatchSize), func(b *testing.B) { - // 8 (bytes / int64) * nBatches (number of batches) * col.BatchSize (rows / + b.Run(fmt.Sprintf("rows=%d", nBatches*int(coldata.BatchSize())), func(b *testing.B) { + // 8 (bytes / int64) * nBatches (number of batches) * col.BatchSize() (rows / // batch) * nCols (number of columns / row) * 2 (number of sources). - b.SetBytes(int64(8 * nBatches * coldata.BatchSize * nCols * 2)) + b.SetBytes(int64(8 * nBatches * int(coldata.BatchSize()) * nCols * 2)) b.ResetTimer() for i := 0; i < b.N; i++ { leftSource := newFiniteBatchSource(newBatchOfIntRows(nCols, batch), nBatches) @@ -1924,10 +1924,10 @@ func BenchmarkMergeJoiner(b *testing.B) { // Groups on left side. for _, nBatches := range []int{1, 4, 16, 1024} { - b.Run(fmt.Sprintf("oneSideRepeat-rows=%d", nBatches*coldata.BatchSize), func(b *testing.B) { - // 8 (bytes / int64) * nBatches (number of batches) * col.BatchSize (rows / + b.Run(fmt.Sprintf("oneSideRepeat-rows=%d", nBatches*int(coldata.BatchSize())), func(b *testing.B) { + // 8 (bytes / int64) * nBatches (number of batches) * col.BatchSize() (rows / // batch) * nCols (number of columns / row) * 2 (number of sources). - b.SetBytes(int64(8 * nBatches * coldata.BatchSize * nCols * 2)) + b.SetBytes(int64(8 * nBatches * int(coldata.BatchSize()) * nCols * 2)) b.ResetTimer() for i := 0; i < b.N; i++ { leftSource := newFiniteBatchSource(newBatchOfRepeatedIntRows(nCols, batch, nBatches), nBatches) @@ -1965,11 +1965,11 @@ func BenchmarkMergeJoiner(b *testing.B) { // Groups on both sides. for _, nBatches := range []int{1, 4, 16, 32} { numRepeats := nBatches - b.Run(fmt.Sprintf("bothSidesRepeat-rows=%d", nBatches*coldata.BatchSize), func(b *testing.B) { + b.Run(fmt.Sprintf("bothSidesRepeat-rows=%d", nBatches*int(coldata.BatchSize())), func(b *testing.B) { - // 8 (bytes / int64) * nBatches (number of batches) * col.BatchSize (rows / + // 8 (bytes / int64) * nBatches (number of batches) * col.BatchSize() (rows / // batch) * nCols (number of columns / row) * 2 (number of sources). - b.SetBytes(int64(8 * nBatches * coldata.BatchSize * nCols * 2)) + b.SetBytes(int64(8 * nBatches * int(coldata.BatchSize()) * nCols * 2)) b.ResetTimer() for i := 0; i < b.N; i++ { leftSource := newFiniteBatchSource(newBatchOfRepeatedIntRows(nCols, batch, numRepeats), nBatches) diff --git a/pkg/sql/colexec/mergejoiner_util.go b/pkg/sql/colexec/mergejoiner_util.go index f03a7ca01539..0f123243ed75 100644 --- a/pkg/sql/colexec/mergejoiner_util.go +++ b/pkg/sql/colexec/mergejoiner_util.go @@ -209,7 +209,7 @@ func newMJBufferedGroup(types []coltypes.T) *mjBufferedGroup { colVecs: make([]coldata.Vec, len(types)), } for i, t := range types { - bg.colVecs[i] = coldata.NewMemColumn(t, coldata.BatchSize) + bg.colVecs[i] = coldata.NewMemColumn(t, int(coldata.BatchSize())) } return bg } diff --git a/pkg/sql/colexec/offset_test.go b/pkg/sql/colexec/offset_test.go index 5dd6f866c425..cf6f5ab180b8 100644 --- a/pkg/sql/colexec/offset_test.go +++ b/pkg/sql/colexec/offset_test.go @@ -63,13 +63,13 @@ func TestOffset(t *testing.T) { func BenchmarkOffset(b *testing.B) { ctx := context.Background() batch := coldata.NewMemBatch([]coltypes.T{coltypes.Int64, coltypes.Int64, coltypes.Int64}) - batch.SetLength(coldata.BatchSize) + batch.SetLength(coldata.BatchSize()) source := NewRepeatableBatchSource(batch) source.Init() o := NewOffsetOp(source, 1) // Set throughput proportional to size of the selection vector. - b.SetBytes(2 * coldata.BatchSize) + b.SetBytes(int64(2 * coldata.BatchSize())) for i := 0; i < b.N; i++ { o.(*offsetOp).Reset() o.Next(ctx) diff --git a/pkg/sql/colexec/orderedsynchronizer.go b/pkg/sql/colexec/orderedsynchronizer.go index bbb47efc3a48..29ed32a8432a 100644 --- a/pkg/sql/colexec/orderedsynchronizer.go +++ b/pkg/sql/colexec/orderedsynchronizer.go @@ -64,7 +64,7 @@ func NewOrderedSynchronizer( // EstimateStaticMemoryUsage implements the StaticMemoryOperator interface. func (o *OrderedSynchronizer) EstimateStaticMemoryUsage() int { - return EstimateBatchSizeBytes(o.columnTypes, coldata.BatchSize) + return EstimateBatchSizeBytes(o.columnTypes, int(coldata.BatchSize())) } // Next is part of the Operator interface. @@ -78,7 +78,7 @@ func (o *OrderedSynchronizer) Next(ctx context.Context) coldata.Batch { } o.output.ResetInternalBatch() outputIdx := uint16(0) - for outputIdx < coldata.BatchSize { + for outputIdx < coldata.BatchSize() { // Determine the batch with the smallest row. minBatch := -1 for i := range o.inputs { diff --git a/pkg/sql/colexec/orderedsynchronizer_test.go b/pkg/sql/colexec/orderedsynchronizer_test.go index 0b66ab8a68de..14e291854f7f 100644 --- a/pkg/sql/colexec/orderedsynchronizer_test.go +++ b/pkg/sql/colexec/orderedsynchronizer_test.go @@ -203,9 +203,9 @@ func BenchmarkOrderedSynchronizer(b *testing.B) { batches := make([]coldata.Batch, numInputs) for i := range batches { batches[i] = coldata.NewMemBatch([]coltypes.T{coltypes.Int64}) - batches[i].SetLength(coldata.BatchSize) + batches[i].SetLength(coldata.BatchSize()) } - for i := int64(0); i < coldata.BatchSize*numInputs; i++ { + for i := int64(0); i < int64(coldata.BatchSize())*numInputs; i++ { batch := batches[i%numInputs] batch.ColVec(0).Int64()[i/numInputs] = i } @@ -224,7 +224,7 @@ func BenchmarkOrderedSynchronizer(b *testing.B) { } op.Init() - b.SetBytes(int64(8 * coldata.BatchSize * numInputs)) + b.SetBytes(8 * int64(coldata.BatchSize()) * numInputs) b.ResetTimer() for i := 0; i < b.N; i++ { op.Next(ctx) diff --git a/pkg/sql/colexec/ordinality.go b/pkg/sql/colexec/ordinality.go index 6b610d46ee32..2f778fec0e28 100644 --- a/pkg/sql/colexec/ordinality.go +++ b/pkg/sql/colexec/ordinality.go @@ -33,7 +33,7 @@ type ordinalityOp struct { var _ StaticMemoryOperator = &ordinalityOp{} func (c *ordinalityOp) EstimateStaticMemoryUsage() int { - return EstimateBatchSizeBytes([]coltypes.T{coltypes.Int64}, coldata.BatchSize) + return EstimateBatchSizeBytes([]coltypes.T{coltypes.Int64}, int(coldata.BatchSize())) } const colNotAppended = -1 diff --git a/pkg/sql/colexec/ordinality_test.go b/pkg/sql/colexec/ordinality_test.go index 9536b3709b4c..7ffe9c312f19 100644 --- a/pkg/sql/colexec/ordinality_test.go +++ b/pkg/sql/colexec/ordinality_test.go @@ -53,13 +53,13 @@ func BenchmarkOrdinality(b *testing.B) { ctx := context.Background() batch := coldata.NewMemBatch([]coltypes.T{coltypes.Int64, coltypes.Int64, coltypes.Int64}) - batch.SetLength(coldata.BatchSize) + batch.SetLength(coldata.BatchSize()) source := NewRepeatableBatchSource(batch) source.Init() ordinality := NewOrdinalityOp(source) - b.SetBytes(int64(8 * coldata.BatchSize * batch.Width())) + b.SetBytes(int64(8 * int(coldata.BatchSize()) * batch.Width())) for i := 0; i < b.N; i++ { ordinality.Next(ctx) } diff --git a/pkg/sql/colexec/projection_ops_test.go b/pkg/sql/colexec/projection_ops_test.go index 0a54ffbfaa01..18100397f7e4 100644 --- a/pkg/sql/colexec/projection_ops_test.go +++ b/pkg/sql/colexec/projection_ops_test.go @@ -57,21 +57,21 @@ func benchmarkProjPlusInt64Int64ConstOp(b *testing.B, useSelectionVector bool, h batch := coldata.NewMemBatch([]coltypes.T{coltypes.Int64, coltypes.Int64}) col := batch.ColVec(0).Int64() - for i := int64(0); i < coldata.BatchSize; i++ { + for i := 0; i < int(coldata.BatchSize()); i++ { col[i] = 1 } if hasNulls { - for i := 0; i < coldata.BatchSize; i++ { + for i := 0; i < int(coldata.BatchSize()); i++ { if rand.Float64() < nullProbability { batch.ColVec(0).Nulls().SetNull(uint16(i)) } } } - batch.SetLength(coldata.BatchSize) + batch.SetLength(coldata.BatchSize()) if useSelectionVector { batch.SetSelection(true) sel := batch.Selection() - for i := int64(0); i < coldata.BatchSize; i++ { + for i := 0; i < int(coldata.BatchSize()); i++ { sel[i] = uint16(i) } } @@ -88,7 +88,7 @@ func benchmarkProjPlusInt64Int64ConstOp(b *testing.B, useSelectionVector bool, h } plusOp.Init() - b.SetBytes(int64(8 * coldata.BatchSize)) + b.SetBytes(int64(8 * coldata.BatchSize())) for i := 0; i < b.N; i++ { plusOp.Next(ctx) } @@ -191,14 +191,14 @@ func benchmarkProjOp( case coltypes.Int64: col1 := batch.ColVec(0).Int64() col2 := batch.ColVec(1).Int64() - for i := int64(0); i < coldata.BatchSize; i++ { + for i := 0; i < int(coldata.BatchSize()); i++ { col1[i] = 1 col2[i] = 1 } case coltypes.Int32: col1 := batch.ColVec(0).Int32() col2 := batch.ColVec(1).Int32() - for i := int32(0); i < coldata.BatchSize; i++ { + for i := 0; i < int(coldata.BatchSize()); i++ { col1[i] = 1 col2[i] = 1 } @@ -206,7 +206,7 @@ func benchmarkProjOp( b.Fatalf("unsupported type: %s", intType) } if hasNulls { - for i := 0; i < coldata.BatchSize; i++ { + for i := 0; i < int(coldata.BatchSize()); i++ { if rand.Float64() < nullProbability { batch.ColVec(0).Nulls().SetNull(uint16(i)) } @@ -215,12 +215,12 @@ func benchmarkProjOp( } } } - batch.SetLength(coldata.BatchSize) + batch.SetLength(coldata.BatchSize()) if useSelectionVector { batch.SetSelection(true) sel := batch.Selection() - for i := int64(0); i < coldata.BatchSize; i++ { - sel[i] = uint16(i) + for i := uint16(0); i < coldata.BatchSize(); i++ { + sel[i] = i } } source := NewRepeatableBatchSource(batch) @@ -229,7 +229,7 @@ func benchmarkProjOp( op := makeProjOp(source, intType) op.Init() - b.SetBytes(int64(8 * coldata.BatchSize * 2)) + b.SetBytes(int64(8 * coldata.BatchSize() * 2)) for i := 0; i < b.N; i++ { op.Next(ctx) } diff --git a/pkg/sql/colexec/random_testutils.go b/pkg/sql/colexec/random_testutils.go index 55d207c18e9c..9e1432c4ead7 100644 --- a/pkg/sql/colexec/random_testutils.go +++ b/pkg/sql/colexec/random_testutils.go @@ -160,7 +160,6 @@ func randomBatchWithSel( const ( defaultMaxSchemaLength = 8 - defaultBatchSize = coldata.BatchSize defaultNumBatches = 4 ) @@ -178,7 +177,7 @@ type RandomDataOpArgs struct { // MaxSchemaLength is the maximum length of the operator's schema, which will // be at least one type. MaxSchemaLength int - // BatchSize is the size of batches returned. + // BatchSize() is the size of batches returned. BatchSize int // NumBatches is the number of batches returned before the final, zero batch. NumBatches int @@ -211,7 +210,7 @@ func NewRandomDataOp(rng *rand.Rand, args RandomDataOpArgs) *RandomDataOp { var ( availableTyps = coltypes.AllTypes maxSchemaLength = defaultMaxSchemaLength - batchSize = defaultBatchSize + batchSize = int(coldata.BatchSize()) numBatches = defaultNumBatches ) if args.AvailableTyps != nil { diff --git a/pkg/sql/colexec/routers.go b/pkg/sql/colexec/routers.go index 56ca36ff81c3..cac1277a5df7 100644 --- a/pkg/sql/colexec/routers.go +++ b/pkg/sql/colexec/routers.go @@ -37,7 +37,7 @@ type routerOutput interface { // defaultRouterOutputBlockedThreshold is the number of unread values buffered // by the routerOutputOp after which the output is considered blocked. -const defaultRouterOutputBlockedThreshold = coldata.BatchSize * 2 +var defaultRouterOutputBlockedThreshold = int(coldata.BatchSize() * 2) type routerOutputOp struct { // input is a reference to our router. @@ -62,7 +62,7 @@ type routerOutputOp struct { } // These fields default to defaultRouterOutputBlockedThreshold and - // coldata.BatchSize but are modified by tests to test edge cases. + // coldata.BatchSize() but are modified by tests to test edge cases. // blockedThreshold is the number of buffered values above which we consider // a router output to be blocked. blockedThreshold int @@ -89,7 +89,7 @@ var _ Operator = &routerOutputOp{} // it. func newRouterOutputOp(types []coltypes.T, unblockedEventsChan chan<- struct{}) *routerOutputOp { return newRouterOutputOpWithBlockedThresholdAndBatchSize( - types, unblockedEventsChan, defaultRouterOutputBlockedThreshold, coldata.BatchSize, + types, unblockedEventsChan, defaultRouterOutputBlockedThreshold, int(coldata.BatchSize()), ) } @@ -341,10 +341,10 @@ func newHashRouterWithOutputs( outputs: outputs, unblockedEventsChan: unblockEventsChan, } - r.scratch.buckets = make([]uint64, coldata.BatchSize) + r.scratch.buckets = make([]uint64, coldata.BatchSize()) r.scratch.selections = make([][]uint16, len(outputs)) for i := range r.scratch.selections { - r.scratch.selections[i] = make([]uint16, 0, coldata.BatchSize) + r.scratch.selections[i] = make([]uint16, 0, coldata.BatchSize()) } return r } diff --git a/pkg/sql/colexec/routers_test.go b/pkg/sql/colexec/routers_test.go index c3345fa18917..9135962acd05 100644 --- a/pkg/sql/colexec/routers_test.go +++ b/pkg/sql/colexec/routers_test.go @@ -32,8 +32,8 @@ import ( // a one-column coltypes.Int64 batch where each element is its ordinal and an // accompanying selection vector that selects every index in tuples. func getDataAndFullSelection() (tuples, []uint16) { - data := make(tuples, coldata.BatchSize) - fullSelection := make([]uint16, coldata.BatchSize) + data := make(tuples, coldata.BatchSize()) + fullSelection := make([]uint16, coldata.BatchSize()) for i := range data { data[i] = tuple{i} fullSelection[i] = uint16(i) @@ -59,14 +59,14 @@ func TestRouterOutputAddBatch(t *testing.T) { name string }{ { - inputBatchSize: coldata.BatchSize, - outputBatchSize: coldata.BatchSize, + inputBatchSize: coldata.BatchSize(), + outputBatchSize: int(coldata.BatchSize()), blockedThreshold: defaultRouterOutputBlockedThreshold, selection: fullSelection, name: "OneBatch", }, { - inputBatchSize: coldata.BatchSize, + inputBatchSize: coldata.BatchSize(), outputBatchSize: 4, blockedThreshold: defaultRouterOutputBlockedThreshold, selection: fullSelection, @@ -74,14 +74,14 @@ func TestRouterOutputAddBatch(t *testing.T) { }, { inputBatchSize: 4, - outputBatchSize: coldata.BatchSize, + outputBatchSize: int(coldata.BatchSize()), blockedThreshold: defaultRouterOutputBlockedThreshold, selection: fullSelection, name: "MultipleInputBatchesLTOutputSize", }, { - inputBatchSize: coldata.BatchSize, - outputBatchSize: coldata.BatchSize, + inputBatchSize: coldata.BatchSize(), + outputBatchSize: int(coldata.BatchSize()), blockedThreshold: defaultRouterOutputBlockedThreshold, selection: fullSelection[:len(fullSelection)/4], name: "QuarterSelection", @@ -177,7 +177,7 @@ func TestRouterOutputNext(t *testing.T) { var wg sync.WaitGroup batchChan := make(chan coldata.Batch) o := newRouterOutputOp([]coltypes.T{coltypes.Int64}, unblockedEventsChan) - in := newOpTestInput(coldata.BatchSize, data, nil /* typs */) + in := newOpTestInput(coldata.BatchSize(), data, nil /* typs */) in.Init() wg.Add(1) go func() { @@ -255,7 +255,7 @@ func TestRouterOutputNext(t *testing.T) { ch := make(chan struct{}, 2) o := newRouterOutputOpWithBlockedThresholdAndBatchSize( - []coltypes.T{coltypes.Int64}, ch, blockThreshold, coldata.BatchSize, + []coltypes.T{coltypes.Int64}, ch, blockThreshold, int(coldata.BatchSize()), ) in := newOpTestInput(smallBatchSize, data, nil /* typs */) out := newOpTestOutput(o, expected) @@ -301,8 +301,8 @@ func TestRouterOutputRandom(t *testing.T) { rng, _ := randutil.NewPseudoRand() - const maxValues = coldata.BatchSize * 4 var ( + maxValues = int(coldata.BatchSize()) * 4 blockedThreshold = 1 + rng.Intn(maxValues-1) outputSize = 1 + rng.Intn(maxValues-1) ) @@ -437,14 +437,14 @@ func TestHashRouterComputesDestination(t *testing.T) { defer leaktest.AfterTest(t)() ctx := context.Background() - data := make(tuples, coldata.BatchSize) + data := make(tuples, coldata.BatchSize()) valsYetToSee := make(map[int64]struct{}) for i := range data { data[i] = tuple{i} valsYetToSee[int64(i)] = struct{}{} } - in := newOpTestInput(coldata.BatchSize, data, nil /* typs */) + in := newOpTestInput(coldata.BatchSize(), data, nil /* typs */) in.Init() var ( @@ -514,7 +514,7 @@ func TestHashRouterCancellation(t *testing.T) { // Never-ending input of 0s. batch := coldata.NewMemBatch([]coltypes.T{coltypes.Int64}) - batch.SetLength(coldata.BatchSize) + batch.SetLength(coldata.BatchSize()) in := NewRepeatableBatchSource(batch) unbufferedCh := make(chan struct{}) @@ -603,7 +603,7 @@ func TestHashRouterOneOutput(t *testing.T) { rng, _ := randutil.NewPseudoRand() - sel := randomSel(rng, coldata.BatchSize, rng.Float64()) + sel := randomSel(rng, coldata.BatchSize(), rng.Float64()) data, _ := getDataAndFullSelection() typs := []coltypes.T{coltypes.Int64} @@ -642,12 +642,9 @@ func TestHashRouterRandom(t *testing.T) { rng, _ := randutil.NewPseudoRand() - const ( - maxValues = coldata.BatchSize * 4 - maxOutputs = coldata.BatchSize - ) - var ( + maxValues = int(coldata.BatchSize()) * 4 + maxOutputs = int(coldata.BatchSize()) blockedThreshold = 1 + rng.Intn(maxValues-1) outputSize = 1 + rng.Intn(maxValues-1) numOutputs = 1 + rng.Intn(maxOutputs-1) @@ -780,7 +777,7 @@ func BenchmarkHashRouter(b *testing.B) { // Use only one type. Note: the more types you use, the more you inflate the // numbers. batch := coldata.NewMemBatch(types) - batch.SetLength(coldata.BatchSize) + batch.SetLength(coldata.BatchSize()) input := NewRepeatableBatchSource(batch) var wg sync.WaitGroup @@ -788,7 +785,7 @@ func BenchmarkHashRouter(b *testing.B) { for _, numInputBatches := range []int{2, 4, 8, 16} { b.Run(fmt.Sprintf("numOutputs=%d/numInputBatches=%d", numOutputs, numInputBatches), func(b *testing.B) { r, outputs := NewHashRouter(input, types, []int{0}, numOutputs) - b.SetBytes(8 * coldata.BatchSize * int64(numInputBatches)) + b.SetBytes(8 * int64(coldata.BatchSize()) * int64(numInputBatches)) // We expect distribution to not change. This is a sanity check that // we're resetting properly. var expectedDistribution []int @@ -821,8 +818,8 @@ func BenchmarkHashRouter(b *testing.B) { for i := range actualDistribution { sum += actualDistribution[i] } - if sum != numInputBatches*coldata.BatchSize { - b.Fatalf("unexpected sum %d, expected %d", sum, numInputBatches*coldata.BatchSize) + if sum != numInputBatches*int(coldata.BatchSize()) { + b.Fatalf("unexpected sum %d, expected %d", sum, numInputBatches*int(coldata.BatchSize())) } if expectedDistribution == nil { expectedDistribution = make([]int, len(actualDistribution)) diff --git a/pkg/sql/colexec/select_in_test.go b/pkg/sql/colexec/select_in_test.go index 9bd8b332ffb2..be4eab0bb826 100644 --- a/pkg/sql/colexec/select_in_test.go +++ b/pkg/sql/colexec/select_in_test.go @@ -95,8 +95,8 @@ func benchmarkSelectInInt64(b *testing.B, useSelectionVector bool, hasNulls bool batch := coldata.NewMemBatch([]coltypes.T{coltypes.Int64}) col1 := batch.ColVec(0).Int64() - for i := int64(0); i < coldata.BatchSize; i++ { - if float64(i) < coldata.BatchSize*selectivity { + for i := 0; i < int(coldata.BatchSize()); i++ { + if float64(i) < float64(coldata.BatchSize())*selectivity { col1[i] = -1 } else { col1[i] = 1 @@ -104,20 +104,20 @@ func benchmarkSelectInInt64(b *testing.B, useSelectionVector bool, hasNulls bool } if hasNulls { - for i := 0; i < coldata.BatchSize; i++ { + for i := 0; i < int(coldata.BatchSize()); i++ { if rand.Float64() < nullProbability { batch.ColVec(0).Nulls().SetNull(uint16(i)) } } } - batch.SetLength(coldata.BatchSize) + batch.SetLength(coldata.BatchSize()) if useSelectionVector { batch.SetSelection(true) sel := batch.Selection() - for i := int64(0); i < coldata.BatchSize; i++ { - sel[i] = uint16(i) + for i := uint16(0); i < coldata.BatchSize(); i++ { + sel[i] = i } } @@ -130,7 +130,7 @@ func benchmarkSelectInInt64(b *testing.B, useSelectionVector bool, hasNulls bool } inOp.Init() - b.SetBytes(int64(8 * coldata.BatchSize)) + b.SetBytes(int64(8 * coldata.BatchSize())) b.ResetTimer() for i := 0; i < b.N; i++ { inOp.Next(ctx) diff --git a/pkg/sql/colexec/select_in_tmpl.go b/pkg/sql/colexec/select_in_tmpl.go index 277fb838e466..8848260a38fb 100644 --- a/pkg/sql/colexec/select_in_tmpl.go +++ b/pkg/sql/colexec/select_in_tmpl.go @@ -141,7 +141,7 @@ type projectInOp_TYPE struct { var _ StaticMemoryOperator = &projectInOp_TYPE{} func (p *projectInOp_TYPE) EstimateStaticMemoryUsage() int { - return EstimateBatchSizeBytes([]coltypes.T{coltypes.Bool}, coldata.BatchSize) + return EstimateBatchSizeBytes([]coltypes.T{coltypes.Bool}, int(coldata.BatchSize())) } func fillDatumRow_TYPE(ct *types.T, datumTuple *tree.DTuple) ([]_GOTYPE, bool, error) { diff --git a/pkg/sql/colexec/selection_ops_test.go b/pkg/sql/colexec/selection_ops_test.go index 0fa639f0efa5..53cb2c6c45f2 100644 --- a/pkg/sql/colexec/selection_ops_test.go +++ b/pkg/sql/colexec/selection_ops_test.go @@ -134,26 +134,26 @@ func benchmarkSelLTInt64Int64ConstOp(b *testing.B, useSelectionVector bool, hasN batch := coldata.NewMemBatch([]coltypes.T{coltypes.Int64}) col := batch.ColVec(0).Int64() - for i := int64(0); i < coldata.BatchSize; i++ { - if float64(i) < coldata.BatchSize*selectivity { + for i := 0; i < int(coldata.BatchSize()); i++ { + if float64(i) < float64(coldata.BatchSize())*selectivity { col[i] = -1 } else { col[i] = 1 } } if hasNulls { - for i := 0; i < coldata.BatchSize; i++ { + for i := 0; i < int(coldata.BatchSize()); i++ { if rand.Float64() < nullProbability { batch.ColVec(0).Nulls().SetNull(uint16(i)) } } } - batch.SetLength(coldata.BatchSize) + batch.SetLength(coldata.BatchSize()) if useSelectionVector { batch.SetSelection(true) sel := batch.Selection() - for i := int64(0); i < coldata.BatchSize; i++ { - sel[i] = uint16(i) + for i := uint16(0); i < coldata.BatchSize(); i++ { + sel[i] = i } } source := NewRepeatableBatchSource(batch) @@ -168,7 +168,7 @@ func benchmarkSelLTInt64Int64ConstOp(b *testing.B, useSelectionVector bool, hasN } plusOp.Init() - b.SetBytes(int64(8 * coldata.BatchSize)) + b.SetBytes(int64(8 * coldata.BatchSize())) b.ResetTimer() for i := 0; i < b.N; i++ { plusOp.Next(ctx) @@ -191,15 +191,15 @@ func benchmarkSelLTInt64Int64Op(b *testing.B, useSelectionVector bool, hasNulls batch := coldata.NewMemBatch([]coltypes.T{coltypes.Int64, coltypes.Int64}) col1 := batch.ColVec(0).Int64() col2 := batch.ColVec(1).Int64() - for i := int64(0); i < coldata.BatchSize; i++ { - if float64(i) < coldata.BatchSize*selectivity { + for i := 0; i < int(coldata.BatchSize()); i++ { + if float64(i) < float64(coldata.BatchSize())*selectivity { col1[i], col2[i] = -1, 1 } else { col1[i], col2[i] = 1, -1 } } if hasNulls { - for i := 0; i < coldata.BatchSize; i++ { + for i := 0; i < int(coldata.BatchSize()); i++ { if rand.Float64() < nullProbability { batch.ColVec(0).Nulls().SetNull(uint16(i)) } @@ -208,12 +208,12 @@ func benchmarkSelLTInt64Int64Op(b *testing.B, useSelectionVector bool, hasNulls } } } - batch.SetLength(coldata.BatchSize) + batch.SetLength(coldata.BatchSize()) if useSelectionVector { batch.SetSelection(true) sel := batch.Selection() - for i := int64(0); i < coldata.BatchSize; i++ { - sel[i] = uint16(i) + for i := uint16(0); i < coldata.BatchSize(); i++ { + sel[i] = i } } source := NewRepeatableBatchSource(batch) @@ -228,7 +228,7 @@ func benchmarkSelLTInt64Int64Op(b *testing.B, useSelectionVector bool, hasNulls } plusOp.Init() - b.SetBytes(int64(8 * coldata.BatchSize * 2)) + b.SetBytes(int64(8 * coldata.BatchSize() * 2)) b.ResetTimer() for i := 0; i < b.N; i++ { plusOp.Next(ctx) diff --git a/pkg/sql/colexec/sort.go b/pkg/sql/colexec/sort.go index efaafb8e0dd1..51274c283baa 100644 --- a/pkg/sql/colexec/sort.go +++ b/pkg/sql/colexec/sort.go @@ -240,7 +240,7 @@ func (p *sortOp) Next(ctx context.Context) coldata.Batch { p.state = sortEmitting fallthrough case sortEmitting: - newEmitted := p.emitted + uint64(coldata.BatchSize) + newEmitted := p.emitted + uint64(coldata.BatchSize()) if newEmitted > p.input.getNumTuples() { newEmitted = p.input.getNumTuples() } diff --git a/pkg/sql/colexec/sort_chunks.go b/pkg/sql/colexec/sort_chunks.go index cb1e60921cd7..ab067e265344 100644 --- a/pkg/sql/colexec/sort_chunks.go +++ b/pkg/sql/colexec/sort_chunks.go @@ -185,7 +185,7 @@ type chunker struct { // buffered indicates the number of currently buffered tuples. buffered uint64 // bufferedColumns is a buffer to store tuples when a chunk is bigger than - // col.BatchSize or when the chunk is the last in the last read batch (we + // col.BatchSize() or when the chunk is the last in the last read batch (we // don't know yet where the end of such chunk is). bufferedColumns []coldata.Vec @@ -222,7 +222,7 @@ func (s *chunker) init() { for i := 0; i < len(s.inputTypes); i++ { s.bufferedColumns[i] = coldata.NewMemColumn(s.inputTypes[i], 0) } - s.partitionCol = make([]bool, coldata.BatchSize) + s.partitionCol = make([]bool, coldata.BatchSize()) s.chunks = make([]uint64, 0, 16) } diff --git a/pkg/sql/colexec/sort_chunks_test.go b/pkg/sql/colexec/sort_chunks_test.go index 49c2302fb81b..2553f932a385 100644 --- a/pkg/sql/colexec/sort_chunks_test.go +++ b/pkg/sql/colexec/sort_chunks_test.go @@ -261,17 +261,17 @@ func BenchmarkSortChunks(b *testing.B) { } b.Run( fmt.Sprintf("%s/rows=%d/cols=%d/matchLen=%d/avgChunkSize=%d", - sorterNames[sorterIdx], nBatches*coldata.BatchSize, nCols, matchLen, avgChunkSize), + sorterNames[sorterIdx], nBatches*int(coldata.BatchSize()), nCols, matchLen, avgChunkSize), func(b *testing.B) { - // 8 (bytes / int64) * nBatches (number of batches) * coldata.BatchSize (rows / + // 8 (bytes / int64) * nBatches (number of batches) * coldata.BatchSize() (rows / // batch) * nCols (number of columns / row). - b.SetBytes(int64(8 * nBatches * coldata.BatchSize * nCols)) + b.SetBytes(int64(8 * nBatches * int(coldata.BatchSize()) * nCols)) typs := make([]coltypes.T, nCols) for i := range typs { typs[i] = coltypes.Int64 } batch := coldata.NewMemBatch(typs) - batch.SetLength(coldata.BatchSize) + batch.SetLength(coldata.BatchSize()) ordCols := make([]execinfrapb.Ordering_Column, nCols) for i := range ordCols { ordCols[i].ColIdx = uint32(i) @@ -283,7 +283,7 @@ func BenchmarkSortChunks(b *testing.B) { col := batch.ColVec(i).Int64() col[0] = 0 - for j := 1; j < coldata.BatchSize; j++ { + for j := 1; j < int(coldata.BatchSize()); j++ { if i < matchLen { col[j] = col[j-1] if rng.Float64() < 1.0/float64(avgChunkSize) { @@ -294,7 +294,7 @@ func BenchmarkSortChunks(b *testing.B) { } } } - rowsTotal := nBatches * coldata.BatchSize + rowsTotal := nBatches * int(coldata.BatchSize()) b.ResetTimer() for n := 0; n < b.N; n++ { source := newFiniteChunksSource(batch, nBatches, matchLen) diff --git a/pkg/sql/colexec/sort_test.go b/pkg/sql/colexec/sort_test.go index 15e2b6eaf07f..3ae6e0a22f10 100644 --- a/pkg/sql/colexec/sort_test.go +++ b/pkg/sql/colexec/sort_test.go @@ -270,24 +270,24 @@ func BenchmarkSort(b *testing.B) { for _, nBatches := range []int{1 << 1, 1 << 4, 1 << 8} { for _, nCols := range []int{1, 2, 4} { for _, topK := range []bool{false, true} { - name := fmt.Sprintf("rows=%d/cols=%d/topK=%t", nBatches*int(coldata.BatchSize), nCols, topK) + name := fmt.Sprintf("rows=%d/cols=%d/topK=%t", nBatches*int(coldata.BatchSize()), nCols, topK) b.Run(name, func(b *testing.B) { - // 8 (bytes / int64) * nBatches (number of batches) * coldata.BatchSize (rows / + // 8 (bytes / int64) * nBatches (number of batches) * coldata.BatchSize() (rows / // batch) * nCols (number of columns / row). - b.SetBytes(int64(8 * nBatches * int(coldata.BatchSize) * nCols)) + b.SetBytes(int64(8 * nBatches * int(coldata.BatchSize()) * nCols)) typs := make([]coltypes.T, nCols) for i := range typs { typs[i] = coltypes.Int64 } batch := coldata.NewMemBatch(typs) - batch.SetLength(coldata.BatchSize) + batch.SetLength(coldata.BatchSize()) ordCols := make([]execinfrapb.Ordering_Column, nCols) for i := range ordCols { ordCols[i].ColIdx = uint32(i) ordCols[i].Direction = execinfrapb.Ordering_Column_Direction(rng.Int() % 2) col := batch.ColVec(i).Int64() - for j := 0; j < coldata.BatchSize; j++ { + for j := 0; j < int(coldata.BatchSize()); j++ { col[j] = rng.Int63() % int64((i*1024)+1) } } @@ -327,19 +327,19 @@ func BenchmarkAllSpooler(b *testing.B) { for _, nBatches := range []int{1 << 1, 1 << 4, 1 << 8} { for _, nCols := range []int{1, 2, 4} { - b.Run(fmt.Sprintf("rows=%d/cols=%d", nBatches*coldata.BatchSize, nCols), func(b *testing.B) { - // 8 (bytes / int64) * nBatches (number of batches) * col.BatchSize (rows / + b.Run(fmt.Sprintf("rows=%d/cols=%d", nBatches*int(coldata.BatchSize()), nCols), func(b *testing.B) { + // 8 (bytes / int64) * nBatches (number of batches) * col.BatchSize() (rows / // batch) * nCols (number of columns / row). - b.SetBytes(int64(8 * nBatches * coldata.BatchSize * nCols)) + b.SetBytes(int64(8 * nBatches * int(coldata.BatchSize()) * nCols)) typs := make([]coltypes.T, nCols) for i := range typs { typs[i] = coltypes.Int64 } batch := coldata.NewMemBatch(typs) - batch.SetLength(coldata.BatchSize) + batch.SetLength(coldata.BatchSize()) for i := 0; i < nCols; i++ { col := batch.ColVec(i).Int64() - for j := 0; j < coldata.BatchSize; j++ { + for j := 0; j < int(coldata.BatchSize()); j++ { col[j] = rng.Int63() % int64((i*1024)+1) } } diff --git a/pkg/sql/colexec/sorttopk.go b/pkg/sql/colexec/sorttopk.go index adfee9f699f4..b83031455c6b 100644 --- a/pkg/sql/colexec/sorttopk.go +++ b/pkg/sql/colexec/sorttopk.go @@ -87,7 +87,7 @@ func (t *topKSorter) Init() { typ := t.inputTypes[i] t.comparators[i] = GetVecComparator(typ, 2) } - t.output = coldata.NewMemBatchWithSize(t.inputTypes, coldata.BatchSize) + t.output = coldata.NewMemBatchWithSize(t.inputTypes, int(coldata.BatchSize())) } func (t *topKSorter) Next(ctx context.Context) coldata.Batch { @@ -196,8 +196,8 @@ func (t *topKSorter) emit() coldata.Batch { t.output.SetLength(0) return t.output } - if toEmit > coldata.BatchSize { - toEmit = coldata.BatchSize + if toEmit > coldata.BatchSize() { + toEmit = coldata.BatchSize() } for i := range t.inputTypes { vec := t.output.ColVec(i) diff --git a/pkg/sql/colexec/stats_test.go b/pkg/sql/colexec/stats_test.go index f234ce4aebb4..8bb6ddb5bbf0 100644 --- a/pkg/sql/colexec/stats_test.go +++ b/pkg/sql/colexec/stats_test.go @@ -26,7 +26,7 @@ import ( // TestNumBatches is a unit test for NumBatches field of VectorizedStats. func TestNumBatches(t *testing.T) { nBatches := 10 - noop := NewNoop(makeFiniteChunksSourceWithBatchSize(nBatches, coldata.BatchSize)) + noop := NewNoop(makeFiniteChunksSourceWithBatchSize(nBatches, int(coldata.BatchSize()))) vsc := NewVectorizedStatsCollector(noop, 0 /* id */, true /* isStall */, timeutil.NewStopWatch()) vsc.Init() for { @@ -65,14 +65,14 @@ func TestVectorizedStatsCollector(t *testing.T) { mjInputWatch := timeutil.NewTestStopWatch(timeSource.Now) leftSource := &timeAdvancingOperator{ - OneInputNode: NewOneInputNode(makeFiniteChunksSourceWithBatchSize(nBatches, coldata.BatchSize)), + OneInputNode: NewOneInputNode(makeFiniteChunksSourceWithBatchSize(nBatches, int(coldata.BatchSize()))), timeSource: timeSource, } leftInput := NewVectorizedStatsCollector(leftSource, 0 /* id */, true /* isStall */, timeutil.NewTestStopWatch(timeSource.Now)) leftInput.SetOutputWatch(mjInputWatch) rightSource := &timeAdvancingOperator{ - OneInputNode: NewOneInputNode(makeFiniteChunksSourceWithBatchSize(nBatches, coldata.BatchSize)), + OneInputNode: NewOneInputNode(makeFiniteChunksSourceWithBatchSize(nBatches, int(coldata.BatchSize()))), timeSource: timeSource, } rightInput := NewVectorizedStatsCollector(rightSource, 1 /* id */, true /* isStall */, timeutil.NewTestStopWatch(timeSource.Now)) @@ -101,7 +101,7 @@ func TestVectorizedStatsCollector(t *testing.T) { mjStatsCollector := NewVectorizedStatsCollector(timeAdvancingMergeJoiner, 2 /* id */, false /* isStall */, mjInputWatch) // The inputs are identical, so the merge joiner should output nBatches - // batches with each having coldata.BatchSize tuples. + // batches with each having coldata.BatchSize() tuples. mjStatsCollector.Init() batchCount := 0 for { @@ -109,14 +109,14 @@ func TestVectorizedStatsCollector(t *testing.T) { if b.Length() == 0 { break } - require.Equal(t, coldata.BatchSize, int(b.Length())) + require.Equal(t, coldata.BatchSize(), b.Length()) batchCount++ } mjStatsCollector.FinalizeStats() require.Equal(t, nBatches, batchCount) require.Equal(t, nBatches, int(mjStatsCollector.NumBatches)) - require.Equal(t, nBatches*coldata.BatchSize, int(mjStatsCollector.NumTuples)) + require.Equal(t, nBatches*int(coldata.BatchSize()), int(mjStatsCollector.NumTuples)) // Two inputs are advancing the time source for a total of 2 * nBatches // advances, but these do not count towards merge joiner execution time. // Merge joiner advances the time on its every non-empty batch totaling diff --git a/pkg/sql/colexec/unorderedsynchronizer_test.go b/pkg/sql/colexec/unorderedsynchronizer_test.go index 7388d3c51a47..a04b6a4d4a2e 100644 --- a/pkg/sql/colexec/unorderedsynchronizer_test.go +++ b/pkg/sql/colexec/unorderedsynchronizer_test.go @@ -43,7 +43,7 @@ func TestUnorderedSynchronizer(t *testing.T) { inputs := make([]Operator, numInputs) for i := range inputs { - source := NewRepeatableBatchSource(RandomBatch(rng, typs, coldata.BatchSize, 0 /* length */, rng.Float64())) + source := NewRepeatableBatchSource(RandomBatch(rng, typs, int(coldata.BatchSize()), 0 /* length */, rng.Float64())) source.ResetBatchesToReturn(numBatches) inputs[i] = source } @@ -130,14 +130,14 @@ func BenchmarkUnorderedSynchronizer(b *testing.B) { typs := []coltypes.T{coltypes.Int64} inputs := make([]Operator, numInputs) for i := range inputs { - batch := coldata.NewMemBatchWithSize(typs, coldata.BatchSize) - batch.SetLength(coldata.BatchSize) + batch := coldata.NewMemBatchWithSize(typs, int(coldata.BatchSize())) + batch.SetLength(coldata.BatchSize()) inputs[i] = NewRepeatableBatchSource(batch) } var wg sync.WaitGroup ctx, cancelFn := context.WithCancel(context.Background()) s := NewUnorderedSynchronizer(inputs, typs, &wg) - b.SetBytes(8 * coldata.BatchSize) + b.SetBytes(8 * int64(coldata.BatchSize())) b.ResetTimer() for i := 0; i < b.N; i++ { s.Next(ctx) diff --git a/pkg/sql/colexec/utils_test.go b/pkg/sql/colexec/utils_test.go index 39724b52a45c..136897449e28 100644 --- a/pkg/sql/colexec/utils_test.go +++ b/pkg/sql/colexec/utils_test.go @@ -305,7 +305,7 @@ func runTestsWithFn( ) { rng, _ := randutil.NewPseudoRand() - for _, batchSize := range []uint16{1, 2, 3, 16, 1024} { + for _, batchSize := range []uint16{1, uint16(math.Trunc(.002 * float64(coldata.BatchSize()))), uint16(math.Trunc(.003 * float64(coldata.BatchSize()))), uint16(math.Trunc(.016 * float64(coldata.BatchSize()))), coldata.BatchSize()} { for _, useSel := range []bool{false, true} { t.Run(fmt.Sprintf("batchSize=%d/sel=%t", batchSize, useSel), func(t *testing.T) { inputSources := make([]Operator, len(tups)) @@ -445,7 +445,7 @@ func (s *opTestInput) Init() { } s.batch = coldata.NewMemBatch(s.typs) - s.selection = make([]uint16, coldata.BatchSize) + s.selection = make([]uint16, coldata.BatchSize()) for i := range s.selection { s.selection[i] = uint16(i) } @@ -499,7 +499,7 @@ func (s *opTestInput) Next(context.Context) coldata.Batch { // than the max batch size, so the test will panic if this part of the slice // is accidentally accessed. for i := range s.selection[batchSize:] { - s.selection[int(batchSize)+i] = coldata.BatchSize + 1 + s.selection[int(batchSize)+i] = coldata.BatchSize() + 1 } s.batch.SetSelection(true) @@ -911,7 +911,7 @@ type randomLengthBatchSource struct { var _ Operator = &randomLengthBatchSource{} // newRandomLengthBatchSource returns a new Operator initialized to return a -// batch of random length between [1, col.BatchSize) forever. +// batch of random length between [1, col.BatchSize()) forever. func newRandomLengthBatchSource(batch coldata.Batch) *randomLengthBatchSource { return &randomLengthBatchSource{ internalBatch: batch, @@ -923,7 +923,7 @@ func (r *randomLengthBatchSource) Init() { } func (r *randomLengthBatchSource) Next(context.Context) coldata.Batch { - r.internalBatch.SetLength(uint16(randutil.RandIntInRange(r.rng, 1, int(coldata.BatchSize)))) + r.internalBatch.SetLength(uint16(randutil.RandIntInRange(r.rng, 1, int(coldata.BatchSize())))) return r.internalBatch } @@ -1067,7 +1067,7 @@ func TestRepeatableBatchSourceWithFixedSel(t *testing.T) { } // chunkingBatchSource is a batch source that takes unlimited-size columns and -// chunks them into BatchSize-sized chunks when Nexted. +// chunks them into BatchSize()-sized chunks when Nexted. type chunkingBatchSource struct { ZeroInputNode typs []coltypes.T @@ -1104,7 +1104,7 @@ func (c *chunkingBatchSource) Next(context.Context) coldata.Batch { if c.curIdx >= c.len { c.batch.SetLength(0) } - lastIdx := c.curIdx + coldata.BatchSize + lastIdx := c.curIdx + uint64(coldata.BatchSize()) if lastIdx > c.len { lastIdx = c.len } diff --git a/pkg/sql/colexec/zerocolumns_tmpl.go b/pkg/sql/colexec/zerocolumns_tmpl.go index c7250b067301..71c1c9310c72 100644 --- a/pkg/sql/colexec/zerocolumns_tmpl.go +++ b/pkg/sql/colexec/zerocolumns_tmpl.go @@ -33,6 +33,6 @@ var _ apd.Decimal // */}} // {{range .}} -var zero_TYPEColumn = make([]_GOTYPE, coldata.BatchSize) +var zero_TYPEColumn = make([]_GOTYPE, coldata.BatchSize()) // {{end}} diff --git a/pkg/sql/colflow/colrpc/colrpc_test.go b/pkg/sql/colflow/colrpc/colrpc_test.go index 93f6be893a3a..4b8a849263e0 100644 --- a/pkg/sql/colflow/colrpc/colrpc_test.go +++ b/pkg/sql/colflow/colrpc/colrpc_test.go @@ -501,7 +501,7 @@ func BenchmarkOutboxInbox(b *testing.B) { typs := []coltypes.T{coltypes.Int64} batch := coldata.NewMemBatch(typs) - batch.SetLength(coldata.BatchSize) + batch.SetLength(coldata.BatchSize()) input := colexec.NewRepeatableBatchSource(batch) @@ -520,7 +520,7 @@ func BenchmarkOutboxInbox(b *testing.B) { streamHandlerErrCh := handleStream(serverStream.Context(), inbox, serverStream, func() { close(serverStreamNotification.Donec) }) - b.SetBytes(8 * coldata.BatchSize) + b.SetBytes(8 * int64(coldata.BatchSize())) b.ResetTimer() for i := 0; i < b.N; i++ { inbox.Next(ctx) diff --git a/pkg/sql/colflow/colrpc/inbox.go b/pkg/sql/colflow/colrpc/inbox.go index bc780e1077de..948c859bb9f9 100644 --- a/pkg/sql/colflow/colrpc/inbox.go +++ b/pkg/sql/colflow/colrpc/inbox.go @@ -148,7 +148,7 @@ func NewInbox(typs []coltypes.T, streamID execinfrapb.StreamID) (*Inbox, error) // EstimateStaticMemoryUsage implements the StaticMemoryOperator interface. func (i *Inbox) EstimateStaticMemoryUsage() int { - return colexec.EstimateBatchSizeBytes(i.typs, coldata.BatchSize) + return colexec.EstimateBatchSizeBytes(i.typs, int(coldata.BatchSize())) } // maybeInitLocked calls Inbox.initLocked if the inbox is not initialized and diff --git a/pkg/sql/colflow/colrpc/inbox_test.go b/pkg/sql/colflow/colrpc/inbox_test.go index 462bbe88169c..ff4dda4b2c45 100644 --- a/pkg/sql/colflow/colrpc/inbox_test.go +++ b/pkg/sql/colflow/colrpc/inbox_test.go @@ -199,7 +199,7 @@ func TestInboxShutdown(t *testing.T) { nextSleep = time.Millisecond * time.Duration(rng.Intn(10)) runWithStreamSleep = time.Millisecond * time.Duration(rng.Intn(10)) typs = []coltypes.T{coltypes.Int64} - batch = colexec.RandomBatch(rng, typs, coldata.BatchSize, 0 /* length */, rng.Float64()) + batch = colexec.RandomBatch(rng, typs, int(coldata.BatchSize()), 0 /* length */, rng.Float64()) ) for _, runDrainMetaGoroutine := range []bool{false, true} { diff --git a/pkg/sql/colflow/vectorized_flow_shutdown_test.go b/pkg/sql/colflow/vectorized_flow_shutdown_test.go index 3f3ffcfe6b2b..e69ab79a5804 100644 --- a/pkg/sql/colflow/vectorized_flow_shutdown_test.go +++ b/pkg/sql/colflow/vectorized_flow_shutdown_test.go @@ -217,7 +217,7 @@ func TestVectorizedFlowShutdown(t *testing.T) { runOutboxInbox(ctxRemote, cancelRemote, hashRouterOutputs[i], inboxes[i], streamID, outboxMetadataSources) } else { batch := coldata.NewMemBatch(typs) - batch.SetLength(coldata.BatchSize) + batch.SetLength(coldata.BatchSize()) runOutboxInbox(ctxRemote, cancelRemote, colexec.NewRepeatableBatchSource(batch), inboxes[i], streamID, outboxMetadataSources) } streamID++