From 1e7354bd81cbddfcfde7456a65cc02c727bf9c73 Mon Sep 17 00:00:00 2001
From: Yahor Yuzefovich
Date: Wed, 10 Feb 2021 14:25:29 -0800
Subject: [PATCH] colserde: do not cap byte slice for the last buffer when
 deserializing

We recently merged a change in which we cap each of the slices for the
buffers, which was needed to get a better memory estimate. However, now
we might be under-estimating the footprint if the whole `bodyBytes` has
a lot of unused capacity.

Consider the following example in which we have 3 buffers in the
serialized representation:
len(bodyBytes) == 10, cap(bodyBytes) == 20
len(buffer1) == 0, len(buffer2) == 1, len(buffer3) == 9.

Before the original fix, our estimate would be 20 (the capacity of the
second buffer) + 19 (the capacity of the third buffer) == 39 - a huge
over-estimate. With the original fix but without this commit: 1 + 9 ==
10 - a huge under-estimate. With this commit: 1 + 19 == 20 - exactly
what we want.

Release note: None
---
 pkg/col/colserde/record_batch.go      | 12 +++++++-----
 pkg/col/colserde/record_batch_test.go | 17 +++++++++++------
 2 files changed, 18 insertions(+), 11 deletions(-)

diff --git a/pkg/col/colserde/record_batch.go b/pkg/col/colserde/record_batch.go
index 7834507447b2..ceeddef3be72 100644
--- a/pkg/col/colserde/record_batch.go
+++ b/pkg/col/colserde/record_batch.go
@@ -284,11 +284,13 @@ func (s *RecordBatchSerializer) Deserialize(data *[]*array.Data, bytes []byte) (
 		buffers := make([]*memory.Buffer, s.numBuffers[fieldIdx])
 		for i := 0; i < s.numBuffers[fieldIdx]; i++ {
 			header.Buffers(&buf, bufferIdx)
-			bufStart := buf.Offset()
-			bufEnd := bufStart + buf.Length()
-			// We need to cap the slice so that bufData's capacity doesn't
-			// extend into the data of the next buffer.
-			bufData := bodyBytes[bufStart:bufEnd:bufEnd]
+			bufData := bodyBytes[buf.Offset() : buf.Offset()+buf.Length()]
+			if i < len(buffers)-1 {
+				// We need to cap the slice so that bufData's capacity doesn't
+				// extend into the data of the next buffer if this buffer is
+				// not the last one (meaning there is a next buffer).
+				bufData = bufData[:buf.Length():buf.Length()]
+			}
 			buffers[i] = memory.NewBufferBytes(bufData)
 			bufferIdx++
 		}
diff --git a/pkg/col/colserde/record_batch_test.go b/pkg/col/colserde/record_batch_test.go
index c0cbe5b8d66c..f66c2b04a90c 100644
--- a/pkg/col/colserde/record_batch_test.go
+++ b/pkg/col/colserde/record_batch_test.go
@@ -314,7 +314,7 @@ func TestRecordBatchSerializerDeserializeMemoryEstimate(t *testing.T) {
 	}
 	b.SetLength(coldata.BatchSize())
 
-	originalMemoryEstimate := colmem.GetBatchMemSize(b)
+	originalMemorySize := colmem.GetBatchMemSize(b)
 	c, err := colserde.NewArrowBatchConverter(typs)
 	require.NoError(t, err)
 
@@ -322,12 +322,17 @@
 	require.NoError(t, err)
 	b, err = roundTripBatch(b, c, r, typs)
 	require.NoError(t, err)
+	newMemorySize := colmem.GetBatchMemSize(b)
 
-	// We expect that the original memory estimate to be no smaller than the
-	// current estimate because in the original case the underlying flat []byte
-	// slice could have extra capacity which will not be present after
-	// round-tripping.
-	require.GreaterOrEqual(t, originalMemoryEstimate, colmem.GetBatchMemSize(b))
+	// We expect the original and the new memory sizes to be relatively close
+	// to each other (differing by no more than a third). We cannot guarantee
+	// a more precise bound here because the capacities of the underlying
+	// []byte slices are unpredictable. However, this check is sufficient to
+	// ensure that we don't double count memory under `Bytes.data`.
+	const maxDeviation = float64(0.33)
+	deviation := math.Abs(float64(originalMemorySize-newMemorySize) / float64(originalMemorySize))
+	require.GreaterOrEqualf(t, maxDeviation, deviation,
+		"new memory size %d is too far away from original %d", newMemorySize, originalMemorySize)
 }
 
 func BenchmarkRecordBatchSerializerInt64(b *testing.B) {
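For reference, the capping in record_batch.go relies on Go's three-index
("full slice") expressions, where s[low:high:max] sets the result's capacity
to max-low. The following standalone sketch (not part of the patch)
reproduces the 3-buffer example from the commit message; the backing
allocation and the offsets/lengths tables are hypothetical stand-ins for the
serialized layout:

package main

import "fmt"

func main() {
	// Hypothetical stand-in for bodyBytes: 10 bytes of data backed by an
	// allocation with 20 bytes of capacity.
	backing := make([]byte, 20)
	bodyBytes := backing[:10]

	// Three buffers laid out back-to-back, as in the commit message:
	// lengths 0, 1, and 9.
	offsets := []int{0, 0, 1}
	lengths := []int{0, 1, 9}

	total := 0
	for i := range lengths {
		bufData := bodyBytes[offsets[i] : offsets[i]+lengths[i]]
		if i < len(lengths)-1 {
			// Full slice expression: cap the slice so that its capacity
			// stops at its length instead of extending into the bytes of
			// the next buffer.
			bufData = bufData[:lengths[i]:lengths[i]]
		}
		fmt.Printf("buffer%d: len=%d cap=%d\n", i+1, len(bufData), cap(bufData))
		total += cap(bufData)
	}
	fmt.Println("capacity-based estimate:", total) // prints 20
}

Running it prints capacities 0, 1, and 19, so only the last (uncapped)
buffer absorbs bodyBytes' unused capacity and the capacity-based estimate
totals 20, matching the 1 + 19 == 20 figure above.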