
Merge #60297
60297: colserde: precise slicing when deserializing arrow.Data r=yuzefovich a=yuzefovich

When we deserialize data from the Arrow format, we have a long `[]byte`
that contains several buffers within it (either 2 or 3, depending on the
encoding format). When there are 3 buffers, the second one holds the
offsets and the third one holds the actual data. Previously, when slicing
out the second buffer, we would not cap it, which resulted in that
buffer's capacity extending into the third buffer.
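
To make the capping concrete, here is a minimal, self-contained Go sketch (the buffer layout and sizes are invented for the example) showing how a plain slice expression leaks capacity into the neighboring buffer while the three-index (full) slice expression does not:

```go
package main

import "fmt"

func main() {
	// Pretend body holds two adjacent buffers laid out back to back:
	// an offsets buffer in body[0:8] and a data buffer in body[8:16].
	body := make([]byte, 16)

	// A plain two-index slice inherits the capacity of the underlying
	// array, so the offsets slice's capacity extends into the data buffer.
	offsets := body[0:8]
	fmt.Println(len(offsets), cap(offsets)) // 8 16

	// The three-index (full) slice expression also sets the capacity,
	// so the slice can no longer reach into the next buffer.
	offsetsCapped := body[0:8:8]
	fmt.Println(len(offsetsCapped), cap(offsetsCapped)) // 8 8
}
```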

This shouldn't create any issues from the perspective of GC (since both
buffers have the same lifecycle), but it does trip up our memory
accounting system when it estimates the footprint of vectors like Bytes
that use 3 buffers, because the accounting looks at the capacities of the
underlying slices. Effectively, we would double-count the third buffer.
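
To see where the double-counting comes from, consider a hypothetical helper (a stand-in for capacity-based accounting, not CockroachDB's actual code) that sums cap() across a vector's buffers:

```go
package main

import "fmt"

// footprint is a hypothetical stand-in for capacity-based memory
// accounting: it sums cap() over each buffer it is given.
func footprint(bufs ...[]byte) int {
	total := 0
	for _, b := range bufs {
		total += cap(b)
	}
	return total
}

func main() {
	// One 16-byte body: offsets in body[0:8], data in body[8:16].
	body := make([]byte, 16)

	// Uncapped slicing: the offsets slice's capacity overhangs into the
	// data buffer, so the data bytes are counted twice.
	fmt.Println(footprint(body[0:8], body[8:16])) // 24

	// Capped slicing: each buffer is counted exactly once.
	fmt.Println(footprint(body[0:8:8], body[8:16])) // 16
}
```

With uncapped slicing the 16-byte body is accounted as 24 bytes (the 8 data bytes show up both in the data buffer's capacity and in the offsets buffer's overhanging capacity); with capped slicing the estimate is the correct 16.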

This is now fixed by capping the slice for each of the buffers. As
a result, the memory estimate will likely become smaller after
round-tripping through a converter and a serializer (in the original
batch there might be extra capacity in the underlying slices that will
no longer be present after deserialization).

Release note: None

Co-authored-by: Yahor Yuzefovich <[email protected]>
craig[bot] and yuzefovich committed Feb 10, 2021
2 parents 39f954e + 4af6d45 commit 1577cac
Showing 2 changed files with 42 additions and 1 deletion.
6 changes: 5 additions & 1 deletion pkg/col/colserde/record_batch.go
@@ -284,7 +284,11 @@ func (s *RecordBatchSerializer) Deserialize(data *[]*array.Data, bytes []byte) (
 	buffers := make([]*memory.Buffer, s.numBuffers[fieldIdx])
 	for i := 0; i < s.numBuffers[fieldIdx]; i++ {
 		header.Buffers(&buf, bufferIdx)
-		bufData := bodyBytes[int(buf.Offset()):int(buf.Offset()+buf.Length())]
+		bufStart := buf.Offset()
+		bufEnd := bufStart + buf.Length()
+		// We need to cap the slice so that bufData's capacity doesn't
+		// extend into the data of the next buffer.
+		bufData := bodyBytes[bufStart:bufEnd:bufEnd]
 		buffers[i] = memory.NewBufferBytes(bufData)
 		bufferIdx++
 	}
37 changes: 37 additions & 0 deletions pkg/col/colserde/record_batch_test.go
@@ -25,9 +25,11 @@ import (
"github.com/apache/arrow/go/arrow/array"
"github.com/apache/arrow/go/arrow/memory"
"github.com/cockroachdb/apd/v2"
"github.com/cockroachdb/cockroach/pkg/col/coldata"
"github.com/cockroachdb/cockroach/pkg/col/colserde"
"github.com/cockroachdb/cockroach/pkg/col/typeconv"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/colmem"
"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/testutils"
@@ -293,6 +295,41 @@ func TestRecordBatchSerializerSerializeDeserializeRandom(t *testing.T) {
 	}
 }
 
+func TestRecordBatchSerializerDeserializeMemoryEstimate(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+
+	var err error
+	rng, _ := randutil.NewPseudoRand()
+
+	typs := []*types.T{types.Bytes}
+	b := testAllocator.NewMemBatchWithFixedCapacity(typs, coldata.BatchSize())
+	bytesVec := b.ColVec(0).Bytes()
+	maxValueLen := coldata.BytesInitialAllocationFactor * 8
+	value := make([]byte, maxValueLen)
+	for i := 0; i < coldata.BatchSize(); i++ {
+		value = value[:rng.Intn(maxValueLen)]
+		_, err = rng.Read(value)
+		require.NoError(t, err)
+		bytesVec.Set(i, value)
+	}
+	b.SetLength(coldata.BatchSize())
+
+	originalMemoryEstimate := colmem.GetBatchMemSize(b)
+
+	c, err := colserde.NewArrowBatchConverter(typs)
+	require.NoError(t, err)
+	r, err := colserde.NewRecordBatchSerializer(typs)
+	require.NoError(t, err)
+	b, err = roundTripBatch(b, c, r, typs)
+	require.NoError(t, err)
+
+	// We expect the original memory estimate to be no smaller than the
+	// current estimate because in the original batch the underlying flat
+	// []byte slice could have extra capacity that is no longer present
+	// after round-tripping.
+	require.GreaterOrEqual(t, originalMemoryEstimate, colmem.GetBatchMemSize(b))
+}
+
 func BenchmarkRecordBatchSerializerInt64(b *testing.B) {
 	rng, _ := randutil.NewPseudoRand()
 
