From 4af6d45cc3d0f24454d1dfc294a56ca28656309c Mon Sep 17 00:00:00 2001
From: Yahor Yuzefovich
Date: Mon, 8 Feb 2021 22:59:38 -0800
Subject: [PATCH] colserde: precise slicing when deserializing arrow.Data

When we are deserializing data from the Arrow format, we have a long
`[]byte` that contains several buffers within it (either 2 or 3,
depending on the encoding format). When we have 3 buffers, the second
one is used for offsets and the third one is the actual data.
Previously, when slicing out the second buffer we would not cap it,
which would result in that buffer's capacity extending into the third
buffer. This shouldn't create any issues from the perspective of GC
(since the lifecycle of both buffers is the same), but it does trip up
our memory accounting system when it estimates the footprint of
vectors like Bytes that use 3 buffers, because we look at the
capacities of the underlying data. Effectively, we would be
double-counting the third buffer.

This is now fixed by capping the slice for each of the buffers. As
a result, the memory estimate will likely become smaller after
round-tripping through a converter and a serializer (in the original
batch there might be extra capacity in the underlying slices that will
no longer be present after deserialization).

Release note: None
---
 pkg/col/colserde/record_batch.go      |  6 ++++-
 pkg/col/colserde/record_batch_test.go | 37 +++++++++++++++++++++++++++
 2 files changed, 42 insertions(+), 1 deletion(-)

diff --git a/pkg/col/colserde/record_batch.go b/pkg/col/colserde/record_batch.go
index 0ef0f31b5ebe..7834507447b2 100644
--- a/pkg/col/colserde/record_batch.go
+++ b/pkg/col/colserde/record_batch.go
@@ -284,7 +284,11 @@ func (s *RecordBatchSerializer) Deserialize(data *[]*array.Data, bytes []byte) (
 		buffers := make([]*memory.Buffer, s.numBuffers[fieldIdx])
 		for i := 0; i < s.numBuffers[fieldIdx]; i++ {
 			header.Buffers(&buf, bufferIdx)
-			bufData := bodyBytes[int(buf.Offset()):int(buf.Offset()+buf.Length())]
+			bufStart := buf.Offset()
+			bufEnd := bufStart + buf.Length()
+			// We need to cap the slice so that bufData's capacity doesn't
+			// extend into the data of the next buffer.
+			bufData := bodyBytes[bufStart:bufEnd:bufEnd]
 			buffers[i] = memory.NewBufferBytes(bufData)
 			bufferIdx++
 		}
diff --git a/pkg/col/colserde/record_batch_test.go b/pkg/col/colserde/record_batch_test.go
index c16ba1f61c0a..c0cbe5b8d66c 100644
--- a/pkg/col/colserde/record_batch_test.go
+++ b/pkg/col/colserde/record_batch_test.go
@@ -25,9 +25,11 @@ import (
 	"github.com/apache/arrow/go/arrow/array"
 	"github.com/apache/arrow/go/arrow/memory"
 	"github.com/cockroachdb/apd/v2"
+	"github.com/cockroachdb/cockroach/pkg/col/coldata"
 	"github.com/cockroachdb/cockroach/pkg/col/colserde"
 	"github.com/cockroachdb/cockroach/pkg/col/typeconv"
 	"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
+	"github.com/cockroachdb/cockroach/pkg/sql/colmem"
 	"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
 	"github.com/cockroachdb/cockroach/pkg/sql/types"
 	"github.com/cockroachdb/cockroach/pkg/testutils"
@@ -293,6 +295,41 @@ func TestRecordBatchSerializerSerializeDeserializeRandom(t *testing.T) {
 	}
 }
 
+func TestRecordBatchSerializerDeserializeMemoryEstimate(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+
+	var err error
+	rng, _ := randutil.NewPseudoRand()
+
+	typs := []*types.T{types.Bytes}
+	b := testAllocator.NewMemBatchWithFixedCapacity(typs, coldata.BatchSize())
+	bytesVec := b.ColVec(0).Bytes()
+	maxValueLen := coldata.BytesInitialAllocationFactor * 8
+	value := make([]byte, maxValueLen)
+	for i := 0; i < coldata.BatchSize(); i++ {
+		value = value[:rng.Intn(maxValueLen)]
+		_, err = rng.Read(value)
+		require.NoError(t, err)
+		bytesVec.Set(i, value)
+	}
+	b.SetLength(coldata.BatchSize())
+
+	originalMemoryEstimate := colmem.GetBatchMemSize(b)
+
+	c, err := colserde.NewArrowBatchConverter(typs)
+	require.NoError(t, err)
+	r, err := colserde.NewRecordBatchSerializer(typs)
+	require.NoError(t, err)
+	b, err = roundTripBatch(b, c, r, typs)
+	require.NoError(t, err)
+
+	// We expect the original memory estimate to be no smaller than the
+	// current estimate because in the original case the underlying flat
+	// []byte slice could have extra capacity which will not be present
+	// after round-tripping.
+	require.GreaterOrEqual(t, originalMemoryEstimate, colmem.GetBatchMemSize(b))
+}
+
 func BenchmarkRecordBatchSerializerInt64(b *testing.B) {
 	rng, _ := randutil.NewPseudoRand()
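
For readers less familiar with Go's full slice expression b[low:high:max],
below is a minimal standalone sketch (not part of the patch) of the
capacity behavior the fix relies on. The two-buffer byte layout is a
made-up stand-in for the real Arrow body, where buffer offsets and
lengths come from the flatbuffers header:

// demo.go: shows how a two-index slice's capacity bleeds into adjacent
// bytes of the backing array, and how the third index caps it.
package main

import "fmt"

func main() {
	// Pretend bodyBytes holds two adjacent buffers: an offsets buffer in
	// bytes [0, 4) followed by a data buffer in bytes [4, 10). This
	// layout is hypothetical, for illustration only.
	bodyBytes := []byte{0, 1, 2, 3, 'h', 'e', 'l', 'l', 'o', '!'}
	bufStart, bufEnd := 0, 4

	// Two-index slicing: len is 4, but cap extends to the end of the
	// backing array, i.e. into the data buffer. A capacity-based memory
	// estimate would count those 6 data bytes twice.
	uncapped := bodyBytes[bufStart:bufEnd]
	fmt.Println(len(uncapped), cap(uncapped)) // prints: 4 10

	// Full slice expression: the third index caps the capacity at
	// bufEnd, so the slice's footprint is exactly its own 4 bytes.
	capped := bodyBytes[bufStart:bufEnd:bufEnd]
	fmt.Println(len(capped), cap(capped)) // prints: 4 4
}

In the patched Deserialize, bodyBytes[bufStart:bufEnd:bufEnd] plays the
role of capped above, which is why the capacity-based estimate no longer
sees the next buffer's bytes.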