colserde: fix the edge case with nulls handling
When serializing data of the Bool, Bytes, Int, and Float types, if the
vector doesn't contain any nulls, we don't explicitly specify the null
bitmap. Previously, when deserializing such vectors with no nulls, we
would simply call `UnsetNulls` on the `coldata.Nulls` object that is
currently present. However, it is possible that the already-present
nulls object cannot accommodate the desired batch length, which could
lead to index-out-of-bounds accesses. Note that in the vast majority of
cases this likely doesn't happen in practice because we check
`MaybeHasNulls`, and that would return `false`, making us skip the
null-checking code.
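
Purely for illustration (this sketch is not part of the patch), the hazard
amounts to probing a null bitmap past the length it was allocated for.
Assuming `NewNulls(8)` backs the bitmap with a single byte, code like the
following would panic:

    package main

    import "github.com/cockroachdb/cockroach/pkg/col/coldata"

    func main() {
        // A Nulls sized for 8 elements is backed by a single byte.
        nulls := coldata.NewNulls(8)
        // UnsetNulls clears the bits but does not grow the bitmap.
        nulls.UnsetNulls()
        // Probing element 8 reads byte 1 of a 1-byte slice and panics with
        // an index-out-of-range error. Callers normally consult
        // MaybeHasNulls first, which is why the panic was rarely reachable.
        _ = nulls.NullAt(8)
    }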

Release note (bug fix): Previously, CockroachDB could encounter an
internal error in rare circumstances when executing queries via the
vectorized engine that operate on columns of BOOL, BYTES, INT, and FLOAT
types that have a mix of NULL and non-NULL values.
yuzefovich committed Mar 26, 2021
1 parent 1c9289e commit d2b3d79
Showing 8 changed files with 88 additions and 59 deletions.
6 changes: 6 additions & 0 deletions pkg/col/coldata/nulls.go
@@ -311,6 +311,12 @@ func (n *Nulls) Slice(start int, end int) Nulls {
return s
}

// MaxNumElements returns the maximum number of elements that this Nulls can
// accommodate.
func (n *Nulls) MaxNumElements() int {
return len(n.nulls) * 8
}

// NullBitmap returns the null bitmap.
func (n *Nulls) NullBitmap() []byte {
return n.nulls
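
As a quick sketch of the new accessor's contract (illustrative, not from the
diff): the capacity is derived from the length of the backing byte slice, so
it is rounded up to the next multiple of eight, assuming `NewNulls` allocates
ceil(n/8) bytes.

    package main

    import (
        "fmt"

        "github.com/cockroachdb/cockroach/pkg/col/coldata"
    )

    func main() {
        // A Nulls built for 10 elements is backed by 2 bytes of bitmap,
        // so it reports room for up to 16 elements.
        n := coldata.NewNulls(10)
        fmt.Println(n.MaxNumElements()) // 16
    }
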
3 changes: 3 additions & 0 deletions pkg/col/coldata/vec.eg.go

Some generated files are not rendered by default.

3 changes: 3 additions & 0 deletions pkg/col/coldata/vec_tmpl.go
@@ -267,6 +267,9 @@ func SetValueAt(v Vec, elem interface{}, rowIdx int) {
// GetValueAt is an inefficient helper to get the value in a Vec when the type
// is unknown.
func GetValueAt(v Vec, rowIdx int) interface{} {
if v.Nulls().NullAt(rowIdx) {
return nil
}
t := v.Type()
switch v.CanonicalTypeFamily() {
// {{range .}}
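
For context, here is a hypothetical use of the helper with the new guard in
place (not part of the diff; it assumes the coldata constructors of this era,
`NewMemColumn` with `StandardColumnFactory`):

    package main

    import (
        "fmt"

        "github.com/cockroachdb/cockroach/pkg/col/coldata"
        "github.com/cockroachdb/cockroach/pkg/sql/types"
    )

    func main() {
        vec := coldata.NewMemColumn(types.Int, 2 /* capacity */, coldata.StandardColumnFactory)
        vec.Int64()[0] = 42
        vec.Nulls().SetNull(1)
        fmt.Println(coldata.GetValueAt(vec, 0)) // 42
        // With the guard above, a NULL slot yields nil instead of whatever
        // stale value happens to be in the data column.
        fmt.Println(coldata.GetValueAt(vec, 1)) // <nil>
    }
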
18 changes: 8 additions & 10 deletions pkg/col/coldatatestutils/random_testutils.go
@@ -380,18 +380,16 @@ func (o *RandomDataOp) Next(context.Context) coldata.Batch {
selProbability float64
nullProbability float64
)
if o.selection {
selProbability = o.rng.Float64()
}
if o.nulls && o.rng.Float64() > 0.1 {
// Even if nulls are desired, in 10% of cases create a batch with no
// nulls at all.
nullProbability = o.rng.Float64()
}
for {
if o.selection {
selProbability = o.rng.Float64()
}
if o.nulls {
nullProbability = o.rng.Float64()
}

b := RandomBatchWithSel(o.allocator, o.rng, o.typs, o.batchSize, nullProbability, selProbability)
if !o.selection {
b.SetSelection(false)
}
if b.Length() == 0 {
// Don't return a zero-length batch until we return o.numBatches batches.
continue
9 changes: 8 additions & 1 deletion pkg/col/colserde/arrowbatchconverter.go
@@ -483,6 +483,13 @@ func handleNulls(arr array.Interface, vec coldata.Vec, batchLength int) {
// For types with the canonical type family of Bool, Bytes, Int, or
// Float, when there are no nulls, we have a null bitmap with zero
// length.
vec.Nulls().UnsetNulls()
if vec.Nulls().MaxNumElements() < batchLength {
// The current null bitmap doesn't have enough space, so we need to
// allocate a new one.
nulls := coldata.NewNulls(batchLength)
vec.SetNulls(&nulls)
} else {
vec.Nulls().UnsetNulls()
}
}
}
67 changes: 39 additions & 28 deletions pkg/col/colserde/arrowbatchconverter_test.go
@@ -61,35 +61,27 @@ func TestArrowBatchConverterRandom(t *testing.T) {
coldata.AssertEquivalentBatches(t, expected, actual)
}

// roundTripBatch is a helper function that round trips a batch through the
// ArrowBatchConverter and RecordBatchSerializer. Make sure to copy the input
// batch before passing it to this function to assert equality.
// roundTripBatch is a helper function that pushes the source batch through the
// ArrowBatchConverter and RecordBatchSerializer. The result is written to dest.
func roundTripBatch(
b coldata.Batch,
c *colserde.ArrowBatchConverter,
r *colserde.RecordBatchSerializer,
typs []*types.T,
) (coldata.Batch, error) {
src, dest coldata.Batch, c *colserde.ArrowBatchConverter, r *colserde.RecordBatchSerializer,
) error {
var buf bytes.Buffer
arrowDataIn, err := c.BatchToArrow(b)
arrowDataIn, err := c.BatchToArrow(src)
if err != nil {
return nil, err
return err
}
_, _, err = r.Serialize(&buf, arrowDataIn, b.Length())
_, _, err = r.Serialize(&buf, arrowDataIn, src.Length())
if err != nil {
return nil, err
return err
}

var arrowDataOut []*array.Data
batchLength, err := r.Deserialize(&arrowDataOut, buf.Bytes())
if err != nil {
return nil, err
return err
}
actual := testAllocator.NewMemBatchWithFixedCapacity(typs, batchLength)
if err := c.ArrowToBatch(arrowDataOut, batchLength, actual); err != nil {
return nil, err
}
return actual, nil
return c.ArrowToBatch(arrowDataOut, batchLength, dest)
}

func TestRecordBatchRoundtripThroughBytes(t *testing.T) {
@@ -98,26 +90,45 @@ func TestRecordBatchRoundtripThroughBytes(t *testing.T) {
rng, _ := randutil.NewPseudoRand()
for run := 0; run < 10; run++ {
var typs []*types.T
var b coldata.Batch
var src coldata.Batch
if rng.Float64() < 0.1 {
// In 10% of cases we'll use a zero length schema.
b = testAllocator.NewMemBatchWithFixedCapacity(typs, rng.Intn(coldata.BatchSize())+1)
b.SetLength(b.Capacity())
src = testAllocator.NewMemBatchWithFixedCapacity(typs, rng.Intn(coldata.BatchSize())+1)
src.SetLength(src.Capacity())
} else {
typs, b = randomBatch(testAllocator)
typs, src = randomBatch(testAllocator)
}
dest := testAllocator.NewMemBatchWithMaxCapacity(typs)
c, err := colserde.NewArrowBatchConverter(typs)
require.NoError(t, err)
r, err := colserde.NewRecordBatchSerializer(typs)
require.NoError(t, err)

// Make a copy of the original batch because the converter modifies and
// casts data without copying for performance reasons.
expected := coldatatestutils.CopyBatch(b, typs, testColumnFactory)
actual, err := roundTripBatch(b, c, r, typs)
require.NoError(t, err)
// Reuse the same destination batch as well as the ArrowBatchConverter
// and RecordBatchSerializer in order to simulate how these things are
// used in the production setting.
for i := 0; i < 10; i++ {
require.NoError(t, roundTripBatch(src, dest, c, r))

coldata.AssertEquivalentBatches(t, src, dest)
// Check that we can actually read each tuple from the destination
// batch.
for _, vec := range dest.ColVecs() {
for tupleIdx := 0; tupleIdx < dest.Length(); tupleIdx++ {
coldata.GetValueAt(vec, tupleIdx)
}
}

coldata.AssertEquivalentBatches(t, expected, actual)
// Generate the new source batch.
nullProbability := rng.Float64()
if rng.Float64() < 0.1 {
// In some cases, make sure that there are no nulls at all.
nullProbability = 0
}
capacity := rng.Intn(coldata.BatchSize()) + 1
length := rng.Intn(capacity)
src = coldatatestutils.RandomBatch(testAllocator, rng, typs, capacity, length, nullProbability)
}
}
}

16 changes: 8 additions & 8 deletions pkg/col/colserde/record_batch_test.go
@@ -302,8 +302,9 @@ func TestRecordBatchSerializerDeserializeMemoryEstimate(t *testing.T) {
rng, _ := randutil.NewPseudoRand()

typs := []*types.T{types.Bytes}
b := testAllocator.NewMemBatchWithFixedCapacity(typs, coldata.BatchSize())
bytesVec := b.ColVec(0).Bytes()
src := testAllocator.NewMemBatchWithMaxCapacity(typs)
dest := testAllocator.NewMemBatchWithMaxCapacity(typs)
bytesVec := src.ColVec(0).Bytes()
maxValueLen := coldata.BytesInitialAllocationFactor * 8
value := make([]byte, maxValueLen)
for i := 0; i < coldata.BatchSize(); i++ {
@@ -312,17 +313,16 @@ func TestRecordBatchSerializerDeserializeMemoryEstimate(t *testing.T) {
require.NoError(t, err)
bytesVec.Set(i, value)
}
b.SetLength(coldata.BatchSize())

originalMemorySize := colmem.GetBatchMemSize(b)
src.SetLength(coldata.BatchSize())

c, err := colserde.NewArrowBatchConverter(typs)
require.NoError(t, err)
r, err := colserde.NewRecordBatchSerializer(typs)
require.NoError(t, err)
b, err = roundTripBatch(b, c, r, typs)
require.NoError(t, err)
newMemorySize := colmem.GetBatchMemSize(b)
require.NoError(t, roundTripBatch(src, dest, c, r))

originalMemorySize := colmem.GetBatchMemSize(src)
newMemorySize := colmem.GetBatchMemSize(dest)

// We expect that the original and the new memory sizes are relatively close
// to each other (do not differ by more than a third). We cannot guarantee
25 changes: 13 additions & 12 deletions pkg/sql/colcontainer/diskqueue_test.go
@@ -104,20 +104,22 @@ func TestDiskQueue(t *testing.T) {
require.NoError(t, err)
require.Equal(t, 1, len(directories))

// Run verification.
// Run verification. We reuse the same batch to dequeue into
// since that is the common pattern.
dest := coldata.NewMemBatch(typs, testColumnFactory)
for {
b := op.Next(ctx)
require.NoError(t, q.Enqueue(ctx, b))
if b.Length() == 0 {
src := op.Next(ctx)
require.NoError(t, q.Enqueue(ctx, src))
if src.Length() == 0 {
break
}
if rng.Float64() < dequeuedProbabilityBeforeAllEnqueuesAreDone {
if ok, err := q.Dequeue(ctx, b); !ok {
if ok, err := q.Dequeue(ctx, dest); !ok {
t.Fatal("queue incorrectly considered empty")
} else if err != nil {
t.Fatal(err)
}
coldata.AssertEquivalentBatches(t, batches[0], b)
coldata.AssertEquivalentBatches(t, batches[0], dest)
batches = batches[1:]
}
}
@@ -127,25 +129,24 @@
}
for i := 0; i < numReadIterations; i++ {
batchIdx := 0
b := coldata.NewMemBatch(typs, testColumnFactory)
for batchIdx < len(batches) {
if ok, err := q.Dequeue(ctx, b); !ok {
if ok, err := q.Dequeue(ctx, dest); !ok {
t.Fatal("queue incorrectly considered empty")
} else if err != nil {
t.Fatal(err)
}
coldata.AssertEquivalentBatches(t, batches[batchIdx], b)
coldata.AssertEquivalentBatches(t, batches[batchIdx], dest)
batchIdx++
}

if testReuseCache {
// Trying to Enqueue after a Dequeue should return an error in these
// CacheModes.
require.Error(t, q.Enqueue(ctx, b))
require.Error(t, q.Enqueue(ctx, dest))
}

if ok, err := q.Dequeue(ctx, b); ok {
if b.Length() != 0 {
if ok, err := q.Dequeue(ctx, dest); ok {
if dest.Length() != 0 {
t.Fatal("queue should be empty")
}
} else if err != nil {
