From 26e87ab1965bd7cfedb2d1cd6f95f199e8aa87db Mon Sep 17 00:00:00 2001
From: Radu Berinde
Date: Thu, 12 Dec 2024 12:24:48 -0800
Subject: [PATCH] sstable: optimize allocation of uncompressed buffers

When we don't compress a data block, the `RawColumnWriter` makes a
clone of the data buffer. In the sysbench workload, we see this
allocation taking place for almost half of the blocks (suggesting that
the data isn't very compressible).

Use the buffer that we would have used for the compressed data instead
of allocating a new one.
---
 sstable/block/compression.go      | 15 +++++++++++++--
 sstable/block/compression_test.go |  3 +++
 sstable/colblk_writer.go          | 20 +++++++++++++-------
 sstable/layout.go                 |  2 +-
 sstable/rowblk_writer.go          | 20 +++++++++-----------
 sstable/writer_test.go            |  4 ++--
 6 files changed, 41 insertions(+), 23 deletions(-)

diff --git a/sstable/block/compression.go b/sstable/block/compression.go
index f6b91540ef..751aae26e6 100644
--- a/sstable/block/compression.go
+++ b/sstable/block/compression.go
@@ -208,6 +208,13 @@ func (b PhysicalBlock) Clone() PhysicalBlock {
 	return PhysicalBlock{data: data, trailer: b.trailer}
 }
 
+// CloneUsingBuf makes a copy of the block data, using the given slice if it has
+// enough capacity.
+func (b PhysicalBlock) CloneUsingBuf(buf []byte) (_ PhysicalBlock, newBuf []byte) {
+	newBuf = append(buf[:0], b.data...)
+	return PhysicalBlock{data: newBuf, trailer: b.trailer}, newBuf
+}
+
 // IsCompressed returns true if the block is compressed.
 func (b *PhysicalBlock) IsCompressed() bool {
 	return CompressionIndicator(b.trailer[0]) != NoCompressionIndicator
@@ -260,14 +267,18 @@ func CompressAndChecksum(
 }
 
 // compress compresses a sstable block, using dstBuf as the desired destination.
+//
+// The result is aliased to dstBuf if that buffer had enough capacity; otherwise
+// it is a newly-allocated buffer.
 func compress(
 	compression Compression, b []byte, dstBuf []byte,
 ) (indicator CompressionIndicator, compressed []byte) {
 	switch compression {
 	case SnappyCompression:
+		// snappy relies on the length of the buffer, not the capacity, to
+		// determine whether it needs to make an allocation.
+		dstBuf = dstBuf[:cap(dstBuf):cap(dstBuf)]
 		return SnappyCompressionIndicator, snappy.Encode(dstBuf, b)
-	case NoCompression:
-		return NoCompressionIndicator, b
 	case ZstdCompression:
 		if len(dstBuf) < binary.MaxVarintLen64 {
 			dstBuf = append(dstBuf, make([]byte, binary.MaxVarintLen64-len(dstBuf))...)
diff --git a/sstable/block/compression_test.go b/sstable/block/compression_test.go
index 23a950a52b..108a3888c3 100644
--- a/sstable/block/compression_test.go
+++ b/sstable/block/compression_test.go
@@ -23,6 +23,9 @@ func TestCompressionRoundtrip(t *testing.T) {
 	rng := rand.New(rand.NewPCG(0, seed))
 
 	for compression := DefaultCompression + 1; compression < NCompression; compression++ {
+		if compression == NoCompression {
+			continue
+		}
 		t.Run(compression.String(), func(t *testing.T) {
 			payload := make([]byte, 1+rng.IntN(10<<10 /* 10 KiB */))
 			for i := range payload {
diff --git a/sstable/colblk_writer.go b/sstable/colblk_writer.go
index ed32490f06..cb347cd765 100644
--- a/sstable/colblk_writer.go
+++ b/sstable/colblk_writer.go
@@ -390,7 +390,10 @@ func (w *RawColumnWriter) AddWithForceObsolete(
 	size := w.dataBlock.Size()
 	if shouldFlushWithoutLatestKV(size, w.pendingDataBlockSize, entriesWithoutKV, &w.dataFlush) {
 		// Flush the data block excluding the key we just added.
-		w.flushDataBlockWithoutNextKey(key.UserKey)
+		if err := w.flushDataBlockWithoutNextKey(key.UserKey); err != nil {
+			w.err = err
+			return err
+		}
 		// flushDataBlockWithoutNextKey reset the data block builder, and we can
 		// add the key to this next block now.
 		w.dataBlock.Add(key, valueStoredWithKey, valuePrefix, eval.kcmp, eval.isObsolete)
@@ -600,16 +603,19 @@ type compressedBlock struct {
 	blockBuf blockBuf
 }
 
-func (w *RawColumnWriter) flushDataBlockWithoutNextKey(nextKey []byte) {
+func (w *RawColumnWriter) flushDataBlockWithoutNextKey(nextKey []byte) error {
 	serializedBlock, lastKey := w.dataBlock.Finish(w.dataBlock.Rows()-1, w.pendingDataBlockSize)
 	w.maybeIncrementTombstoneDenseBlocks(len(serializedBlock))
 	// Compute the separator that will be written to the index block alongside
 	// this data block's end offset. It is the separator between the last key in
 	// the finished block and the [nextKey] that was excluded from the block.
 	w.separatorBuf = w.comparer.Separator(w.separatorBuf[:0], lastKey.UserKey, nextKey)
-	w.enqueueDataBlock(serializedBlock, lastKey, w.separatorBuf)
+	if err := w.enqueueDataBlock(serializedBlock, lastKey, w.separatorBuf); err != nil {
+		return err
+	}
 	w.dataBlock.Reset()
 	w.pendingDataBlockSize = 0
+	return nil
 }
 
 // maybeIncrementTombstoneDenseBlocks increments the number of tombstone dense
@@ -650,7 +656,7 @@ func (w *RawColumnWriter) enqueueDataBlock(
 	cb := compressedBlockPool.Get().(*compressedBlock)
 	cb.blockBuf.checksummer.Type = w.opts.Checksum
 	cb.physical = block.CompressAndChecksum(
-		&cb.blockBuf.compressedBuf,
+		&cb.blockBuf.dataBuf,
 		serializedBlock,
 		w.opts.Compression,
 		&cb.blockBuf.checksummer,
@@ -661,7 +667,7 @@
 		// it to the write queue to be asynchronously written to disk.
 		// TODO(jackson): Should we try to avoid this clone by tracking the
 		// lifetime of the DataBlockWriters?
-		cb.physical = cb.physical.Clone()
+		cb.physical, cb.blockBuf.dataBuf = cb.physical.CloneUsingBuf(cb.blockBuf.dataBuf)
 	}
 	return w.enqueuePhysicalBlock(cb, separator)
 }
@@ -1145,7 +1151,7 @@ func (w *RawColumnWriter) addDataBlock(b, sep []byte, bhp block.HandleWithProper
 	cb := compressedBlockPool.Get().(*compressedBlock)
 	cb.blockBuf.checksummer.Type = w.opts.Checksum
 	cb.physical = block.CompressAndChecksum(
-		&cb.blockBuf.compressedBuf,
+		&cb.blockBuf.dataBuf,
 		b,
 		w.opts.Compression,
 		&cb.blockBuf.checksummer,
@@ -1156,7 +1162,7 @@ func (w *RawColumnWriter) addDataBlock(b, sep []byte, bhp block.HandleWithProper
 		// it to the write queue to be asynchronously written to disk.
 		// TODO(jackson): Should we try to avoid this clone by tracking the
 		// lifetime of the DataBlockWriters?
-		cb.physical = cb.physical.Clone()
+		cb.physical, cb.blockBuf.dataBuf = cb.physical.CloneUsingBuf(cb.blockBuf.dataBuf)
 	}
 	if err := w.enqueuePhysicalBlock(cb, sep); err != nil {
 		return err
diff --git a/sstable/layout.go b/sstable/layout.go
index 09b674ab00..e05700fbac 100644
--- a/sstable/layout.go
+++ b/sstable/layout.go
@@ -770,7 +770,7 @@ func (w *layoutWriter) writeBlock(
 	b []byte, compression block.Compression, buf *blockBuf,
 ) (block.Handle, error) {
 	return w.writePrecompressedBlock(block.CompressAndChecksum(
-		&buf.compressedBuf, b, compression, &buf.checksummer))
+		&buf.dataBuf, b, compression, &buf.checksummer))
 }
 
 // writePrecompressedBlock writes a pre-compressed block and its
diff --git a/sstable/rowblk_writer.go b/sstable/rowblk_writer.go
index d257801371..27546338d8 100644
--- a/sstable/rowblk_writer.go
+++ b/sstable/rowblk_writer.go
@@ -461,19 +461,17 @@ type blockBuf struct {
 	// blockTrailerLen bytes, (5 * binary.MaxVarintLen64) bytes, and most
 	// likely large enough for a block handle with properties.
 	tmp [blockHandleLikelyMaxLen]byte
-	// compressedBuf is the destination buffer for compression. It is re-used over the
-	// lifetime of the blockBuf, avoiding the allocation of a temporary buffer for each block.
-	compressedBuf []byte
-	checksummer   block.Checksummer
+	// dataBuf is the destination buffer for compression, or (in some cases
+	// where compression is not used) for storing a copy of the data. It is
+	// re-used over the lifetime of the blockBuf, avoiding the allocation of a
+	// temporary buffer for each block.
+	dataBuf     []byte
+	checksummer block.Checksummer
 }
 
 func (b *blockBuf) clear() {
-	// We can't assign b.compressedBuf[:0] to compressedBuf because snappy relies
-	// on the length of the buffer, and not the capacity to determine if it needs
-	// to make an allocation.
-	*b = blockBuf{
-		compressedBuf: b.compressedBuf, checksummer: b.checksummer,
-	}
+	b.tmp = [blockHandleLikelyMaxLen]byte{}
+	b.dataBuf = b.dataBuf[:0]
 }
 
 // A dataBlockBuf holds all the state required to compress and write a data block to disk.
@@ -545,7 +543,7 @@ func (d *dataBlockBuf) finish() {
 }
 
 func (d *dataBlockBuf) compressAndChecksum(c block.Compression) {
-	d.physical = block.CompressAndChecksum(&d.compressedBuf, d.uncompressed, c, &d.checksummer)
+	d.physical = block.CompressAndChecksum(&d.dataBuf, d.uncompressed, c, &d.checksummer)
 }
 
 func (d *dataBlockBuf) shouldFlush(
diff --git a/sstable/writer_test.go b/sstable/writer_test.go
index 45c7748ca4..be01c02f2c 100644
--- a/sstable/writer_test.go
+++ b/sstable/writer_test.go
@@ -531,7 +531,7 @@ func TestBlockBufClear(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	b1 := &blockBuf{}
 	b1.tmp[0] = 1
-	b1.compressedBuf = make([]byte, 1)
+	b1.dataBuf = make([]byte, 1)
 	b1.clear()
 	testBlockBufClear(t, b1, &blockBuf{})
 }
@@ -539,7 +539,7 @@ func TestClearDataBlockBuf(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	d := newDataBlockBuf(1, block.ChecksumTypeCRC32c)
-	d.blockBuf.compressedBuf = make([]byte, 1)
+	d.blockBuf.dataBuf = make([]byte, 1)
 	d.dataBlock.Add(ikey("apple"), nil)
 	d.dataBlock.Add(ikey("banana"), nil)
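
For context on the pattern: CloneUsingBuf relies on Go's standard idiom for
recycling byte buffers, where appending into a zero-length reslice of an
existing buffer reuses its backing array whenever the capacity suffices, and
allocates only when it does not. A minimal standalone sketch of the idea
(illustrative names, not part of the patch):

package main

import "fmt"

// cloneUsingBuf copies src into buf, reusing buf's backing array when its
// capacity is sufficient. The caller keeps the returned slice as the new
// recycled buffer, just as enqueueDataBlock retains cb.blockBuf.dataBuf
// alongside cb.physical.
func cloneUsingBuf(buf, src []byte) []byte {
	// Appending into buf[:0] writes in place if cap(buf) >= len(src);
	// otherwise append allocates a larger array.
	return append(buf[:0], src...)
}

func main() {
	recycled := make([]byte, 0, 1024)
	block := []byte("some uncompressed block contents")

	recycled = cloneUsingBuf(recycled, block)
	// With 1 KiB of capacity available, no new allocation was needed.
	fmt.Printf("len=%d cap=%d\n", len(recycled), cap(recycled)) // len=32 cap=1024
}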
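
The reslicing dstBuf = dstBuf[:cap(dstBuf):cap(dstBuf)] added to compress
matters because snappy.Encode decides whether to allocate from the
destination's length, not its capacity. A small sketch of that behavior,
assuming the github.com/golang/snappy package:

package main

import (
	"fmt"

	"github.com/golang/snappy"
)

func main() {
	src := make([]byte, 4096)
	dst := make([]byte, snappy.MaxEncodedLen(len(src)))

	// Encode reuses dst when len(dst) >= MaxEncodedLen(len(src)).
	out := snappy.Encode(dst, src)
	fmt.Println(&out[0] == &dst[0]) // true: dst's backing array was reused

	// A zero-length view of the same array forces an allocation, because
	// Encode checks len(dst), not cap(dst). Hence compress reslices the
	// recycled buffer to its full capacity before calling Encode.
	out = snappy.Encode(dst[:0], src)
	fmt.Println(&out[0] == &dst[0]) // false: Encode allocated a new buffer
}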