Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

release-21.1: colserde: fix the edge case with nulls handling #62915

Merged
merged 1 commit into from
May 13, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions pkg/col/coldata/nulls.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,12 @@ func (n *Nulls) Slice(start int, end int) Nulls {
return s
}

// MaxNumElements reports how many elements the null bitmap currently backing
// n has room for. The result is derived purely from the bitmap's length in
// bytes, with every byte accounting for eight elements.
func (n *Nulls) MaxNumElements() int {
	const elementsPerByte = 8
	return elementsPerByte * len(n.nulls)
}

// NullBitmap returns the null bitmap.
func (n *Nulls) NullBitmap() []byte {
return n.nulls
Expand Down
3 changes: 3 additions & 0 deletions pkg/col/coldata/vec.eg.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions pkg/col/coldata/vec_tmpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,9 @@ func SetValueAt(v Vec, elem interface{}, rowIdx int) {
// GetValueAt is an inefficient helper to get the value in a Vec when the type
// is unknown.
func GetValueAt(v Vec, rowIdx int) interface{} {
if v.Nulls().NullAt(rowIdx) {
return nil
}
t := v.Type()
switch v.CanonicalTypeFamily() {
// {{range .}}
Expand Down
18 changes: 8 additions & 10 deletions pkg/col/coldatatestutils/random_testutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -380,18 +380,16 @@ func (o *RandomDataOp) Next(context.Context) coldata.Batch {
selProbability float64
nullProbability float64
)
if o.selection {
selProbability = o.rng.Float64()
}
if o.nulls && o.rng.Float64() > 0.1 {
// Even if nulls are desired, in 10% of cases create a batch with no
// nulls at all.
nullProbability = o.rng.Float64()
}
for {
if o.selection {
selProbability = o.rng.Float64()
}
if o.nulls {
nullProbability = o.rng.Float64()
}

b := RandomBatchWithSel(o.allocator, o.rng, o.typs, o.batchSize, nullProbability, selProbability)
if !o.selection {
b.SetSelection(false)
}
if b.Length() == 0 {
// Don't return a zero-length batch until we return o.numBatches batches.
continue
Expand Down
14 changes: 13 additions & 1 deletion pkg/col/colserde/arrowbatchconverter.go
Original file line number Diff line number Diff line change
Expand Up @@ -483,6 +483,18 @@ func handleNulls(arr array.Interface, vec coldata.Vec, batchLength int) {
// For types with the canonical type family of Bool, Bytes, Int, or
// Float, when there are no nulls, we have a null bitmap with zero
// length.
vec.Nulls().UnsetNulls()
if vec.Nulls().MaxNumElements() < batchLength {
// The current null bitmap doesn't have enough space, so we need to
// allocate a new one.
//
// Note that this has likely occurred because on the previous batch
// there were some nulls and we replaced the null bitmap with the
// arrowBitmap which happened to be of insufficient capacity for the
// current batch.
nulls := coldata.NewNulls(batchLength)
vec.SetNulls(&nulls)
} else {
vec.Nulls().UnsetNulls()
}
}
}
67 changes: 39 additions & 28 deletions pkg/col/colserde/arrowbatchconverter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,35 +61,27 @@ func TestArrowBatchConverterRandom(t *testing.T) {
coldata.AssertEquivalentBatches(t, expected, actual)
}

// roundTripBatch is a helper function that round trips a batch through the
// ArrowBatchConverter and RecordBatchSerializer. Make sure to copy the input
// batch before passing it to this function to assert equality.
// roundTripBatch is a helper function that pushes the source batch through the
// ArrowBatchConverter and RecordBatchSerializer. The result is written to dest.
func roundTripBatch(
b coldata.Batch,
c *colserde.ArrowBatchConverter,
r *colserde.RecordBatchSerializer,
typs []*types.T,
) (coldata.Batch, error) {
src, dest coldata.Batch, c *colserde.ArrowBatchConverter, r *colserde.RecordBatchSerializer,
) error {
var buf bytes.Buffer
arrowDataIn, err := c.BatchToArrow(b)
arrowDataIn, err := c.BatchToArrow(src)
if err != nil {
return nil, err
return err
}
_, _, err = r.Serialize(&buf, arrowDataIn, b.Length())
_, _, err = r.Serialize(&buf, arrowDataIn, src.Length())
if err != nil {
return nil, err
return err
}

var arrowDataOut []*array.Data
batchLength, err := r.Deserialize(&arrowDataOut, buf.Bytes())
if err != nil {
return nil, err
return err
}
actual := testAllocator.NewMemBatchWithFixedCapacity(typs, batchLength)
if err := c.ArrowToBatch(arrowDataOut, batchLength, actual); err != nil {
return nil, err
}
return actual, nil
return c.ArrowToBatch(arrowDataOut, batchLength, dest)
}

func TestRecordBatchRoundtripThroughBytes(t *testing.T) {
Expand All @@ -98,26 +90,45 @@ func TestRecordBatchRoundtripThroughBytes(t *testing.T) {
rng, _ := randutil.NewPseudoRand()
for run := 0; run < 10; run++ {
var typs []*types.T
var b coldata.Batch
var src coldata.Batch
if rng.Float64() < 0.1 {
// In 10% of cases we'll use a zero length schema.
b = testAllocator.NewMemBatchWithFixedCapacity(typs, rng.Intn(coldata.BatchSize())+1)
b.SetLength(b.Capacity())
src = testAllocator.NewMemBatchWithFixedCapacity(typs, rng.Intn(coldata.BatchSize())+1)
src.SetLength(src.Capacity())
} else {
typs, b = randomBatch(testAllocator)
typs, src = randomBatch(testAllocator)
}
dest := testAllocator.NewMemBatchWithMaxCapacity(typs)
c, err := colserde.NewArrowBatchConverter(typs)
require.NoError(t, err)
r, err := colserde.NewRecordBatchSerializer(typs)
require.NoError(t, err)

// Make a copy of the original batch because the converter modifies and
// casts data without copying for performance reasons.
expected := coldatatestutils.CopyBatch(b, typs, testColumnFactory)
actual, err := roundTripBatch(b, c, r, typs)
require.NoError(t, err)
// Reuse the same destination batch as well as the ArrowBatchConverter
// and RecordBatchSerializer in order to simulate how these things are
// used in the production setting.
for i := 0; i < 10; i++ {
require.NoError(t, roundTripBatch(src, dest, c, r))

coldata.AssertEquivalentBatches(t, src, dest)
// Check that we can actually read each tuple from the destination
// batch.
for _, vec := range dest.ColVecs() {
for tupleIdx := 0; tupleIdx < dest.Length(); tupleIdx++ {
coldata.GetValueAt(vec, tupleIdx)
}
}

coldata.AssertEquivalentBatches(t, expected, actual)
// Generate the new source batch.
nullProbability := rng.Float64()
if rng.Float64() < 0.1 {
// In some cases, make sure that there are no nulls at all.
nullProbability = 0
}
capacity := rng.Intn(coldata.BatchSize()) + 1
length := rng.Intn(capacity)
src = coldatatestutils.RandomBatch(testAllocator, rng, typs, capacity, length, nullProbability)
}
}
}

Expand Down
16 changes: 8 additions & 8 deletions pkg/col/colserde/record_batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,8 +302,9 @@ func TestRecordBatchSerializerDeserializeMemoryEstimate(t *testing.T) {
rng, _ := randutil.NewPseudoRand()

typs := []*types.T{types.Bytes}
b := testAllocator.NewMemBatchWithFixedCapacity(typs, coldata.BatchSize())
bytesVec := b.ColVec(0).Bytes()
src := testAllocator.NewMemBatchWithMaxCapacity(typs)
dest := testAllocator.NewMemBatchWithMaxCapacity(typs)
bytesVec := src.ColVec(0).Bytes()
maxValueLen := coldata.BytesInitialAllocationFactor * 8
value := make([]byte, maxValueLen)
for i := 0; i < coldata.BatchSize(); i++ {
Expand All @@ -312,17 +313,16 @@ func TestRecordBatchSerializerDeserializeMemoryEstimate(t *testing.T) {
require.NoError(t, err)
bytesVec.Set(i, value)
}
b.SetLength(coldata.BatchSize())

originalMemorySize := colmem.GetBatchMemSize(b)
src.SetLength(coldata.BatchSize())

c, err := colserde.NewArrowBatchConverter(typs)
require.NoError(t, err)
r, err := colserde.NewRecordBatchSerializer(typs)
require.NoError(t, err)
b, err = roundTripBatch(b, c, r, typs)
require.NoError(t, err)
newMemorySize := colmem.GetBatchMemSize(b)
require.NoError(t, roundTripBatch(src, dest, c, r))

originalMemorySize := colmem.GetBatchMemSize(src)
newMemorySize := colmem.GetBatchMemSize(dest)

// We expect that the original and the new memory sizes are relatively close
// to each other (do not differ by more than a third). We cannot guarantee
Expand Down
25 changes: 13 additions & 12 deletions pkg/sql/colcontainer/diskqueue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,20 +104,22 @@ func TestDiskQueue(t *testing.T) {
require.NoError(t, err)
require.Equal(t, 1, len(directories))

// Run verification.
// Run verification. We reuse the same batch to dequeue into
// since that is the common pattern.
dest := coldata.NewMemBatch(typs, testColumnFactory)
for {
b := op.Next(ctx)
require.NoError(t, q.Enqueue(ctx, b))
if b.Length() == 0 {
src := op.Next(ctx)
require.NoError(t, q.Enqueue(ctx, src))
if src.Length() == 0 {
break
}
if rng.Float64() < dequeuedProbabilityBeforeAllEnqueuesAreDone {
if ok, err := q.Dequeue(ctx, b); !ok {
if ok, err := q.Dequeue(ctx, dest); !ok {
t.Fatal("queue incorrectly considered empty")
} else if err != nil {
t.Fatal(err)
}
coldata.AssertEquivalentBatches(t, batches[0], b)
coldata.AssertEquivalentBatches(t, batches[0], dest)
batches = batches[1:]
}
}
Expand All @@ -127,25 +129,24 @@ func TestDiskQueue(t *testing.T) {
}
for i := 0; i < numReadIterations; i++ {
batchIdx := 0
b := coldata.NewMemBatch(typs, testColumnFactory)
for batchIdx < len(batches) {
if ok, err := q.Dequeue(ctx, b); !ok {
if ok, err := q.Dequeue(ctx, dest); !ok {
t.Fatal("queue incorrectly considered empty")
} else if err != nil {
t.Fatal(err)
}
coldata.AssertEquivalentBatches(t, batches[batchIdx], b)
coldata.AssertEquivalentBatches(t, batches[batchIdx], dest)
batchIdx++
}

if testReuseCache {
// Trying to Enqueue after a Dequeue should return an error in these
// CacheModes.
require.Error(t, q.Enqueue(ctx, b))
require.Error(t, q.Enqueue(ctx, dest))
}

if ok, err := q.Dequeue(ctx, b); ok {
if b.Length() != 0 {
if ok, err := q.Dequeue(ctx, dest); ok {
if dest.Length() != 0 {
t.Fatal("queue should be empty")
}
} else if err != nil {
Expand Down