Skip to content

Commit

Permalink
Merge #60451
Browse files Browse the repository at this point in the history
60451: colserde: do not cap byte slice for the last buffer when deserializing r=yuzefovich a=yuzefovich

We recently merged a change in which we cap each of the slices for
buffers which was needed to have better memory estimate. However, now we
might be under-estimating the footprint if the whole `bodyBytes` has
a lot of unused capacity.

Consider the following example when we have 3 buffers in the serialized
representation:
len(bodyBytes) == 10, cap(bodyBytes) == 20
len(buffer1) == 0, len(buffer2) == 1, len(buffer3) == 9.

Before the original fix, our estimate would be 20 (the capacity of the
second buffer) + 19 (the capacity of the third buffer) == 39 - huge
over-estimate. With the original fix but without this commit: 1 + 9 ==
10 - huge under-estimate. With this commit: 1 + 19 == 20 - exactly what
we want.

Release note: None

Co-authored-by: Yahor Yuzefovich <[email protected]>
  • Loading branch information
craig[bot] and yuzefovich committed Feb 13, 2021
2 parents ba1a144 + 1e7354b commit 9969862
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 11 deletions.
12 changes: 7 additions & 5 deletions pkg/col/colserde/record_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,11 +284,13 @@ func (s *RecordBatchSerializer) Deserialize(data *[]*array.Data, bytes []byte) (
buffers := make([]*memory.Buffer, s.numBuffers[fieldIdx])
for i := 0; i < s.numBuffers[fieldIdx]; i++ {
header.Buffers(&buf, bufferIdx)
bufStart := buf.Offset()
bufEnd := bufStart + buf.Length()
// We need to cap the slice so that bufData's capacity doesn't
// extend into the data of the next buffer.
bufData := bodyBytes[bufStart:bufEnd:bufEnd]
bufData := bodyBytes[buf.Offset() : buf.Offset()+buf.Length()]
if i < len(buffers)-1 {
// We need to cap the slice so that bufData's capacity doesn't
// extend into the data of the next buffer if this buffer is not
// the last one (meaning there is that next buffer).
bufData = bufData[:buf.Length():buf.Length()]
}
buffers[i] = memory.NewBufferBytes(bufData)
bufferIdx++
}
Expand Down
17 changes: 11 additions & 6 deletions pkg/col/colserde/record_batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,20 +314,25 @@ func TestRecordBatchSerializerDeserializeMemoryEstimate(t *testing.T) {
}
b.SetLength(coldata.BatchSize())

originalMemoryEstimate := colmem.GetBatchMemSize(b)
originalMemorySize := colmem.GetBatchMemSize(b)

c, err := colserde.NewArrowBatchConverter(typs)
require.NoError(t, err)
r, err := colserde.NewRecordBatchSerializer(typs)
require.NoError(t, err)
b, err = roundTripBatch(b, c, r, typs)
require.NoError(t, err)
newMemorySize := colmem.GetBatchMemSize(b)

// We expect that the original memory estimate to be no smaller than the
// current estimate because in the original case the underlying flat []byte
// slice could have extra capacity which will not be present after
// round-tripping.
require.GreaterOrEqual(t, originalMemoryEstimate, colmem.GetBatchMemSize(b))
// We expect that the original and the new memory sizes are relatively close
// to each other (do not differ by more than a third). We cannot guarantee
// more precise bound here because the capacities of the underlying []byte
// slices is unpredictable. However, this check is sufficient to ensure that
// we don't double count memory under `Bytes.data`.
const maxDeviation = float64(0.33)
deviation := math.Abs(float64(originalMemorySize-newMemorySize) / (float64(originalMemorySize)))
require.GreaterOrEqualf(t, maxDeviation, deviation,
"new memory size %d is too far away from original %d", newMemorySize, originalMemorySize)
}

func BenchmarkRecordBatchSerializerInt64(b *testing.B) {
Expand Down

0 comments on commit 9969862

Please sign in to comment.