From c47fe35c49049e4443ec9e41f966e8e5e255ac62 Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Thu, 25 Mar 2021 21:14:20 -0700 Subject: [PATCH] colserde: fix the edge case with nulls handling When serializing the data of Bool, Bytes, Int, and Float types when they don't have any nulls in the vector, we don't explicit specify the null bitmap. Previously, when deserializing such vectors with no nulls we would simply call `UnsetNulls` on the `coldata.Nulls` object that is currently present. However, it is possible that already present nulls object cannot support the desired batch length. This could lead to index out of bounds accesses. Note that in the vast majority of cases this likely doesn't happen in practice because we check `MaybeHasNulls`, and that would return `false` making us omit the null checking code. Release note (bug fix): Previously, CockroachDB could encounter an internal error in rare circumstances when executing queries via the vectorized engine that operate on columns of BOOL, BYTES, INT, and FLOAT types that have a mix of NULL and non-NULL values. --- pkg/col/coldata/nulls.go | 6 ++ pkg/col/coldata/vec.eg.go | 3 + pkg/col/coldata/vec_tmpl.go | 3 + pkg/col/colserde/arrowbatchconverter.go | 9 ++- pkg/col/colserde/arrowbatchconverter_test.go | 67 ++++++++++++-------- pkg/col/colserde/record_batch_test.go | 16 ++--- 6 files changed, 67 insertions(+), 37 deletions(-) diff --git a/pkg/col/coldata/nulls.go b/pkg/col/coldata/nulls.go index 826e49ad36b4..557a8afe8edf 100644 --- a/pkg/col/coldata/nulls.go +++ b/pkg/col/coldata/nulls.go @@ -311,6 +311,12 @@ func (n *Nulls) Slice(start int, end int) Nulls { return s } +// MaxNumElements returns the maximum number of elements that this Nulls can +// accommodate. +func (n *Nulls) MaxNumElements() int { + return len(n.nulls) * 8 +} + // NullBitmap returns the null bitmap. func (n *Nulls) NullBitmap() []byte { return n.nulls diff --git a/pkg/col/coldata/vec.eg.go b/pkg/col/coldata/vec.eg.go index 3285856e050b..db0e3d774ee2 100644 --- a/pkg/col/coldata/vec.eg.go +++ b/pkg/col/coldata/vec.eg.go @@ -1172,6 +1172,9 @@ func SetValueAt(v Vec, elem interface{}, rowIdx int) { // GetValueAt is an inefficient helper to get the value in a Vec when the type // is unknown. func GetValueAt(v Vec, rowIdx int) interface{} { + if v.Nulls().NullAt(rowIdx) { + return nil + } t := v.Type() switch v.CanonicalTypeFamily() { case types.BoolFamily: diff --git a/pkg/col/coldata/vec_tmpl.go b/pkg/col/coldata/vec_tmpl.go index 179ac44f630a..044404c2b836 100644 --- a/pkg/col/coldata/vec_tmpl.go +++ b/pkg/col/coldata/vec_tmpl.go @@ -267,6 +267,9 @@ func SetValueAt(v Vec, elem interface{}, rowIdx int) { // GetValueAt is an inefficient helper to get the value in a Vec when the type // is unknown. func GetValueAt(v Vec, rowIdx int) interface{} { + if v.Nulls().NullAt(rowIdx) { + return nil + } t := v.Type() switch v.CanonicalTypeFamily() { // {{range .}} diff --git a/pkg/col/colserde/arrowbatchconverter.go b/pkg/col/colserde/arrowbatchconverter.go index ffb227697074..5d763c2b4aec 100644 --- a/pkg/col/colserde/arrowbatchconverter.go +++ b/pkg/col/colserde/arrowbatchconverter.go @@ -483,6 +483,13 @@ func handleNulls(arr array.Interface, vec coldata.Vec, batchLength int) { // For types with the canonical type family of Bool, Bytes, Int, or // Float, when there are no nulls, we have a null bitmap with zero // length. - vec.Nulls().UnsetNulls() + if vec.Nulls().MaxNumElements() < batchLength { + // The current null bitmap doesn't have enough space, so we need to + // allocate a new one. + nulls := coldata.NewNulls(batchLength) + vec.SetNulls(&nulls) + } else { + vec.Nulls().UnsetNulls() + } } } diff --git a/pkg/col/colserde/arrowbatchconverter_test.go b/pkg/col/colserde/arrowbatchconverter_test.go index 69faf977462f..9f6a343275fe 100644 --- a/pkg/col/colserde/arrowbatchconverter_test.go +++ b/pkg/col/colserde/arrowbatchconverter_test.go @@ -61,35 +61,27 @@ func TestArrowBatchConverterRandom(t *testing.T) { coldata.AssertEquivalentBatches(t, expected, actual) } -// roundTripBatch is a helper function that round trips a batch through the -// ArrowBatchConverter and RecordBatchSerializer. Make sure to copy the input -// batch before passing it to this function to assert equality. +// roundTripBatch is a helper function that pushes the source batch through the +// ArrowBatchConverter and RecordBatchSerializer. The result is written to dest. func roundTripBatch( - b coldata.Batch, - c *colserde.ArrowBatchConverter, - r *colserde.RecordBatchSerializer, - typs []*types.T, -) (coldata.Batch, error) { + src, dest coldata.Batch, c *colserde.ArrowBatchConverter, r *colserde.RecordBatchSerializer, +) error { var buf bytes.Buffer - arrowDataIn, err := c.BatchToArrow(b) + arrowDataIn, err := c.BatchToArrow(src) if err != nil { - return nil, err + return err } - _, _, err = r.Serialize(&buf, arrowDataIn, b.Length()) + _, _, err = r.Serialize(&buf, arrowDataIn, src.Length()) if err != nil { - return nil, err + return err } var arrowDataOut []*array.Data batchLength, err := r.Deserialize(&arrowDataOut, buf.Bytes()) if err != nil { - return nil, err + return err } - actual := testAllocator.NewMemBatchWithFixedCapacity(typs, batchLength) - if err := c.ArrowToBatch(arrowDataOut, batchLength, actual); err != nil { - return nil, err - } - return actual, nil + return c.ArrowToBatch(arrowDataOut, batchLength, dest) } func TestRecordBatchRoundtripThroughBytes(t *testing.T) { @@ -98,26 +90,45 @@ func TestRecordBatchRoundtripThroughBytes(t *testing.T) { rng, _ := randutil.NewPseudoRand() for run := 0; run < 10; run++ { var typs []*types.T - var b coldata.Batch + var src coldata.Batch if rng.Float64() < 0.1 { // In 10% of cases we'll use a zero length schema. - b = testAllocator.NewMemBatchWithFixedCapacity(typs, rng.Intn(coldata.BatchSize())+1) - b.SetLength(b.Capacity()) + src = testAllocator.NewMemBatchWithFixedCapacity(typs, rng.Intn(coldata.BatchSize())+1) + src.SetLength(src.Capacity()) } else { - typs, b = randomBatch(testAllocator) + typs, src = randomBatch(testAllocator) } + dest := testAllocator.NewMemBatchWithMaxCapacity(typs) c, err := colserde.NewArrowBatchConverter(typs) require.NoError(t, err) r, err := colserde.NewRecordBatchSerializer(typs) require.NoError(t, err) - // Make a copy of the original batch because the converter modifies and - // casts data without copying for performance reasons. - expected := coldatatestutils.CopyBatch(b, typs, testColumnFactory) - actual, err := roundTripBatch(b, c, r, typs) - require.NoError(t, err) + // Reuse the same destination batch as well as the ArrowBatchConverter + // and RecordBatchSerializer in order to simulate how these things are + // used in the production setting. + for i := 0; i < 10; i++ { + require.NoError(t, roundTripBatch(src, dest, c, r)) + + coldata.AssertEquivalentBatches(t, src, dest) + // Check that we can actually read each tuple from the destination + // batch. + for _, vec := range dest.ColVecs() { + for tupleIdx := 0; tupleIdx < dest.Length(); tupleIdx++ { + coldata.GetValueAt(vec, tupleIdx) + } + } - coldata.AssertEquivalentBatches(t, expected, actual) + // Generate the new source batch. + nullProbability := rng.Float64() + if rng.Float64() < 0.1 { + // In some cases, make sure that there are no nulls at all. + nullProbability = 0 + } + capacity := rng.Intn(coldata.BatchSize()) + 1 + length := rng.Intn(capacity) + src = coldatatestutils.RandomBatch(testAllocator, rng, typs, capacity, length, nullProbability) + } } } diff --git a/pkg/col/colserde/record_batch_test.go b/pkg/col/colserde/record_batch_test.go index f66c2b04a90c..3a9091ef44ea 100644 --- a/pkg/col/colserde/record_batch_test.go +++ b/pkg/col/colserde/record_batch_test.go @@ -302,8 +302,9 @@ func TestRecordBatchSerializerDeserializeMemoryEstimate(t *testing.T) { rng, _ := randutil.NewPseudoRand() typs := []*types.T{types.Bytes} - b := testAllocator.NewMemBatchWithFixedCapacity(typs, coldata.BatchSize()) - bytesVec := b.ColVec(0).Bytes() + src := testAllocator.NewMemBatchWithMaxCapacity(typs) + dest := testAllocator.NewMemBatchWithMaxCapacity(typs) + bytesVec := src.ColVec(0).Bytes() maxValueLen := coldata.BytesInitialAllocationFactor * 8 value := make([]byte, maxValueLen) for i := 0; i < coldata.BatchSize(); i++ { @@ -312,17 +313,16 @@ func TestRecordBatchSerializerDeserializeMemoryEstimate(t *testing.T) { require.NoError(t, err) bytesVec.Set(i, value) } - b.SetLength(coldata.BatchSize()) - - originalMemorySize := colmem.GetBatchMemSize(b) + src.SetLength(coldata.BatchSize()) c, err := colserde.NewArrowBatchConverter(typs) require.NoError(t, err) r, err := colserde.NewRecordBatchSerializer(typs) require.NoError(t, err) - b, err = roundTripBatch(b, c, r, typs) - require.NoError(t, err) - newMemorySize := colmem.GetBatchMemSize(b) + require.NoError(t, roundTripBatch(src, dest, c, r)) + + originalMemorySize := colmem.GetBatchMemSize(src) + newMemorySize := colmem.GetBatchMemSize(dest) // We expect that the original and the new memory sizes are relatively close // to each other (do not differ by more than a third). We cannot guarantee