Skip to content

Commit

Permalink
Merge pull request cockroachdb#62916 from yuzefovich/backport20.2-62642
Browse files Browse the repository at this point in the history
release-20.2: colserde: fix the edge case with nulls handling
  • Loading branch information
yuzefovich authored Apr 1, 2021
2 parents aa2d3bb + d202caf commit eab7ded
Show file tree
Hide file tree
Showing 7 changed files with 85 additions and 49 deletions.
6 changes: 6 additions & 0 deletions pkg/col/coldata/nulls.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,12 @@ func (n *Nulls) Slice(start int, end int) Nulls {
return s
}

// MaxNumElements returns the maximum number of elements that this Nulls can
// accommodate given the current size of its bitmap.
func (n *Nulls) MaxNumElements() int {
	// The bitmap stores one bit of null-ness per element, so each byte
	// covers 8 elements.
	const elementsPerByte = 8
	return elementsPerByte * len(n.nulls)
}

// NullBitmap returns the null bitmap.
func (n *Nulls) NullBitmap() []byte {
return n.nulls
Expand Down
3 changes: 3 additions & 0 deletions pkg/col/coldata/vec.eg.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions pkg/col/coldata/vec_tmpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,9 @@ func SetValueAt(v Vec, elem interface{}, rowIdx int) {
// GetValueAt is an inefficient helper to get the value in a Vec when the type
// is unknown.
func GetValueAt(v Vec, rowIdx int) interface{} {
if v.Nulls().NullAt(rowIdx) {
return nil
}
t := v.Type()
switch v.CanonicalTypeFamily() {
// {{range .}}
Expand Down
18 changes: 8 additions & 10 deletions pkg/col/coldatatestutils/random_testutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,18 +365,16 @@ func (o *RandomDataOp) Next(context.Context) coldata.Batch {
selProbability float64
nullProbability float64
)
if o.selection {
selProbability = o.rng.Float64()
}
if o.nulls && o.rng.Float64() > 0.1 {
// Even if nulls are desired, in 10% of cases create a batch with no
// nulls at all.
nullProbability = o.rng.Float64()
}
for {
if o.selection {
selProbability = o.rng.Float64()
}
if o.nulls {
nullProbability = o.rng.Float64()
}

b := RandomBatchWithSel(o.allocator, o.rng, o.typs, o.batchSize, nullProbability, selProbability)
if !o.selection {
b.SetSelection(false)
}
if b.Length() == 0 {
// Don't return a zero-length batch until we return o.numBatches batches.
continue
Expand Down
17 changes: 16 additions & 1 deletion pkg/col/colserde/arrowbatchconverter.go
Original file line number Diff line number Diff line change
Expand Up @@ -394,7 +394,22 @@ func (c *ArrowBatchConverter) ArrowToBatch(data []*array.Data, b coldata.Batch)
if len(arrowBitmap) != 0 {
vec.Nulls().SetNullBitmap(arrowBitmap, n)
} else {
vec.Nulls().UnsetNulls()
// For types with the canonical type family of Bool, Bytes, Int, or
// Float, when there are no nulls, we have a null bitmap with zero
// length.
if vec.Nulls().MaxNumElements() < n {
// The current null bitmap doesn't have enough space, so we need
// to allocate a new one.
//
// Note that this has likely occurred because on the previous
// batch there were some nulls and we replaced the null bitmap
// with the arrowBitmap which happened to be of insufficient
// capacity for the current batch.
nulls := coldata.NewNulls(n)
vec.SetNulls(&nulls)
} else {
vec.Nulls().UnsetNulls()
}
}
b.SetSelection(false)
}
Expand Down
61 changes: 36 additions & 25 deletions pkg/col/colserde/arrowbatchconverter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,54 +61,65 @@ func TestArrowBatchConverterRandom(t *testing.T) {
coldata.AssertEquivalentBatches(t, expected, actual)
}

// roundTripBatch is a helper function that round trips a batch through the
// ArrowBatchConverter and RecordBatchSerializer and asserts that the output
// batch is equal to the input batch. Make sure to copy the input batch before
// passing it to this function to assert equality.
// roundTripBatch is a helper function that pushes the source batch through the
// ArrowBatchConverter and RecordBatchSerializer. The result is written to dest.
func roundTripBatch(
b coldata.Batch,
c *colserde.ArrowBatchConverter,
r *colserde.RecordBatchSerializer,
typs []*types.T,
) (coldata.Batch, error) {
src, dest coldata.Batch, c *colserde.ArrowBatchConverter, r *colserde.RecordBatchSerializer,
) error {
var buf bytes.Buffer
arrowDataIn, err := c.BatchToArrow(b)
arrowDataIn, err := c.BatchToArrow(src)
if err != nil {
return nil, err
return err
}
_, _, err = r.Serialize(&buf, arrowDataIn)
if err != nil {
return nil, err
return err
}

var arrowDataOut []*array.Data
if err := r.Deserialize(&arrowDataOut, buf.Bytes()); err != nil {
return nil, err
return err
}
actual := testAllocator.NewMemBatchWithFixedCapacity(typs, b.Length())
if err := c.ArrowToBatch(arrowDataOut, actual); err != nil {
return nil, err
}
return actual, nil
return c.ArrowToBatch(arrowDataOut, dest)
}

func TestRecordBatchRoundtripThroughBytes(t *testing.T) {
defer leaktest.AfterTest(t)()
rng, _ := randutil.NewPseudoRand()

for run := 0; run < 10; run++ {
typs, b := randomBatch(testAllocator)
typs, src := randomBatch(testAllocator)
dest := testAllocator.NewMemBatchWithMaxCapacity(typs)
c, err := colserde.NewArrowBatchConverter(typs)
require.NoError(t, err)
r, err := colserde.NewRecordBatchSerializer(typs)
require.NoError(t, err)

// Make a copy of the original batch because the converter modifies and
// casts data without copying for performance reasons.
expected := coldatatestutils.CopyBatch(b, typs, testColumnFactory)
actual, err := roundTripBatch(b, c, r, typs)
require.NoError(t, err)
// Reuse the same destination batch as well as the ArrowBatchConverter
// and RecordBatchSerializer in order to simulate how these things are
// used in the production setting.
for i := 0; i < 10; i++ {
require.NoError(t, roundTripBatch(src, dest, c, r))

coldata.AssertEquivalentBatches(t, src, dest)
// Check that we can actually read each tuple from the destination
// batch.
for _, vec := range dest.ColVecs() {
for tupleIdx := 0; tupleIdx < dest.Length(); tupleIdx++ {
coldata.GetValueAt(vec, tupleIdx)
}
}

coldata.AssertEquivalentBatches(t, expected, actual)
// Generate the new source batch.
nullProbability := rng.Float64()
if rng.Float64() < 0.1 {
// In some cases, make sure that there are no nulls at all.
nullProbability = 0
}
capacity := rng.Intn(coldata.BatchSize()) + 1
length := rng.Intn(capacity)
src = coldatatestutils.RandomBatch(testAllocator, rng, typs, capacity, length, nullProbability)
}
}
}

Expand Down
26 changes: 13 additions & 13 deletions pkg/sql/colcontainer/diskqueue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,21 +104,22 @@ func TestDiskQueue(t *testing.T) {
require.NoError(t, err)
require.Equal(t, 1, len(directories))

// Run verification.
ctx := context.Background()
// Run verification. We reuse the same batch to dequeue into
// since that is the common pattern.
dest := coldata.NewMemBatch(typs, testColumnFactory)
for {
b := op.Next(ctx)
require.NoError(t, q.Enqueue(ctx, b))
if b.Length() == 0 {
src := op.Next(ctx)
require.NoError(t, q.Enqueue(ctx, src))
if src.Length() == 0 {
break
}
if rng.Float64() < dequeuedProbabilityBeforeAllEnqueuesAreDone {
if ok, err := q.Dequeue(ctx, b); !ok {
if ok, err := q.Dequeue(ctx, dest); !ok {
t.Fatal("queue incorrectly considered empty")
} else if err != nil {
t.Fatal(err)
}
coldata.AssertEquivalentBatches(t, batches[0], b)
coldata.AssertEquivalentBatches(t, batches[0], dest)
batches = batches[1:]
}
}
Expand All @@ -128,25 +129,24 @@ func TestDiskQueue(t *testing.T) {
}
for i := 0; i < numReadIterations; i++ {
batchIdx := 0
b := coldata.NewMemBatch(typs, testColumnFactory)
for batchIdx < len(batches) {
if ok, err := q.Dequeue(ctx, b); !ok {
if ok, err := q.Dequeue(ctx, dest); !ok {
t.Fatal("queue incorrectly considered empty")
} else if err != nil {
t.Fatal(err)
}
coldata.AssertEquivalentBatches(t, batches[batchIdx], b)
coldata.AssertEquivalentBatches(t, batches[batchIdx], dest)
batchIdx++
}

if testReuseCache {
// Trying to Enqueue after a Dequeue should return an error in these
// CacheModes.
require.Error(t, q.Enqueue(ctx, b))
require.Error(t, q.Enqueue(ctx, dest))
}

if ok, err := q.Dequeue(ctx, b); ok {
if b.Length() != 0 {
if ok, err := q.Dequeue(ctx, dest); ok {
if dest.Length() != 0 {
t.Fatal("queue should be empty")
}
} else if err != nil {
Expand Down

0 comments on commit eab7ded

Please sign in to comment.