Skip to content

Commit

Permalink
sstable/block: move Compress, Decompress, DecompressedLen
Browse files Browse the repository at this point in the history
Move these compression routines from the sstable package into sstable/block.
Future work may also refactor these interfaces to hide more details of block
compression and decompression within the block package.
  • Loading branch information
jbowens committed Jul 31, 2024
1 parent 21672e3 commit 8af904f
Show file tree
Hide file tree
Showing 12 changed files with 142 additions and 159 deletions.
106 changes: 105 additions & 1 deletion sstable/block/compression.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,14 @@

package block

import "github.com/cockroachdb/errors"
import (
"encoding/binary"

"github.com/cockroachdb/errors"
"github.com/cockroachdb/pebble/internal/base"
"github.com/cockroachdb/pebble/internal/cache"
"github.com/golang/snappy"
)

// Compression is the per-block compression algorithm to use.
type Compression int
Expand Down Expand Up @@ -99,3 +106,100 @@ func (i CompressionIndicator) String() string {
panic(errors.Newf("sstable: unknown block type: %d", i))
}
}

// DecompressedLen returns the length of the provided block once decompressed,
// allowing the caller to allocate a buffer exactly sized to the decompressed
// payload. For some compression algorithms, the payload is prefixed with a
// varint encoding the length of the decompressed block. In such cases, a
// non-zero prefixLength is returned indicating the length of this prefix.
//
// For NoCompressionIndicator it returns (0, 0, nil). A corruption error is
// returned if the indicator is unknown, if the varint length prefix cannot be
// decoded, or if the decoded length does not fit in an int.
func DecompressedLen(
	algo CompressionIndicator, b []byte,
) (decompressedLen int, prefixLength int, err error) {
	switch algo {
	case NoCompressionIndicator:
		return 0, 0, nil
	case SnappyCompressionIndicator:
		l, err := snappy.DecodedLen(b)
		return l, 0, err
	case ZstdCompressionIndicator:
		// This will also be used by zlib, bzip2 and lz4 to retrieve the decodedLen
		// if we implement these algorithms in the future.
		decodedLenU64, varIntLen := binary.Uvarint(b)
		if varIntLen <= 0 {
			return 0, 0, base.CorruptionErrorf("pebble/table: compression block has invalid length")
		}
		// The prefix is read from the block itself, so it cannot be trusted. A
		// value larger than the platform's maximum int would wrap to a negative
		// (or truncated) length when converted, so report it as corruption
		// rather than handing a bogus size to the caller's allocator.
		const maxInt = int(^uint(0) >> 1)
		if decodedLenU64 > uint64(maxInt) {
			return 0, 0, base.CorruptionErrorf("pebble/table: compression block has invalid length")
		}
		return int(decodedLenU64), varIntLen, nil
	default:
		return 0, 0, base.CorruptionErrorf("pebble/table: unknown block compression: %d", errors.Safe(algo))
	}
}

// DecompressInto decodes the compressed payload using the algorithm named by
// algo, writing the output into buf. buf must be sized exactly to the
// decompressed payload; DecompressedLen reports that size. Only the Snappy and
// Zstd indicators are accepted; any other indicator yields a corruption error.
// Decoder failures are marked as corruption errors as well.
func DecompressInto(algo CompressionIndicator, compressed []byte, buf []byte) error {
	var (
		out       []byte
		decodeErr error
	)
	switch algo {
	case ZstdCompressionIndicator:
		out, decodeErr = decodeZstd(buf, compressed)
	case SnappyCompressionIndicator:
		out, decodeErr = snappy.Decode(buf, compressed)
	default:
		return base.CorruptionErrorf("pebble/table: unknown block compression: %d", errors.Safe(algo))
	}
	if decodeErr != nil {
		return base.MarkCorruptionError(decodeErr)
	}
	// Success requires that the decoder's output has the same length as buf
	// and, when non-empty, starts at the same address as buf. The length
	// comparison comes first so that &buf[0] is only evaluated when buf is
	// known to be non-empty.
	if len(out) == len(buf) && (len(out) == 0 || &out[0] == &buf[0]) {
		return nil
	}
	return base.CorruptionErrorf("pebble/table: decompressed into unexpected buffer: %p != %p",
		errors.Safe(out), errors.Safe(buf))
}

// Decompress decompresses an sstable block into a buffer obtained from
// `cache.Alloc` and returns the resulting value. NB: a (nil, nil) return means
// the block was not compressed, and the caller may use `b` directly. If
// decompression fails, the allocated value is released with `cache.Free`
// before the error is returned.
func Decompress(algo CompressionIndicator, b []byte) (*cache.Value, error) {
	if algo == NoCompressionIndicator {
		return nil, nil
	}
	// Determine the output size and how many leading bytes of b hold the
	// length prefix (zero when the algorithm has no such prefix).
	size, skip, err := DecompressedLen(algo, b)
	if err != nil {
		return nil, err
	}
	payload := b[skip:]
	// Allocate a destination of exactly the decoded size.
	v := cache.Alloc(size)
	err = DecompressInto(algo, payload, v.Buf())
	if err == nil {
		return v, nil
	}
	cache.Free(v)
	return nil, err
}

// Compress compresses an sstable block with the requested algorithm, using
// dstBuf as the desired destination, and returns the indicator describing the
// encoding of the returned bytes. For NoCompression, and for any algorithm
// other than Snappy or Zstd, b itself is returned with NoCompressionIndicator.
// Zstd output is prefixed with a uvarint holding len(b).
func Compress(
	compression Compression, b []byte, dstBuf []byte,
) (indicator CompressionIndicator, compressed []byte) {
	if compression == SnappyCompression {
		return SnappyCompressionIndicator, snappy.Encode(dstBuf, b)
	}
	if compression == NoCompression {
		return NoCompressionIndicator, b
	}

	// Make sure dstBuf can hold the largest possible uvarint, then write the
	// uncompressed length at its start. This happens before the algorithm
	// check below, so it also runs for algorithms that end up uncompressed.
	if missing := binary.MaxVarintLen64 - len(dstBuf); missing > 0 {
		dstBuf = append(dstBuf, make([]byte, missing)...)
	}
	prefixLen := binary.PutUvarint(dstBuf, uint64(len(b)))
	if compression != ZstdCompression {
		return NoCompressionIndicator, b
	}
	return ZstdCompressionIndicator, encodeZstd(dstBuf, prefixLen, b)
}
13 changes: 12 additions & 1 deletion sstable/compression_cgo.go → sstable/block/compression_cgo.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,25 @@
//go:build cgo
// +build cgo

package sstable
package block

import (
"bytes"

"github.com/DataDog/zstd"
)

// UseStandardZstdLib indicates whether the zstd implementation is a port of the
// official one in the facebook/zstd repository. It is true in cgo builds.
//
// This constant is only used in tests. Some tests rely on the reproducibility
// of SST files, but a custom implementation of zstd will produce different
// compression results, so those tests have to be disabled in such cases.
//
// We cannot always use the official facebook/zstd implementation since it
// relies on CGo.
const UseStandardZstdLib = true

// decodeZstd decompresses src with the Zstandard algorithm. The destination
// buffer must already be sufficiently sized, otherwise decodeZstd may error.
func decodeZstd(dst, src []byte) ([]byte, error) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,21 @@
//go:build !cgo
// +build !cgo

package sstable
package block

import "github.com/klauspost/compress/zstd"

// UseStandardZstdLib indicates whether the zstd implementation is a port of the
// official one in the facebook/zstd repository. It is false in non-cgo builds.
//
// This constant is only used in tests. Some tests rely on the reproducibility
// of SST files, but a custom implementation of zstd will produce different
// compression results, so those tests have to be disabled in such cases.
//
// We cannot always use the official facebook/zstd implementation since it
// relies on CGo.
const UseStandardZstdLib = false

// decodeZstd decompresses src with the Zstandard algorithm. The destination
// buffer must already be sufficiently sized, otherwise decodeZstd may error.
func decodeZstd(dst, src []byte) ([]byte, error) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// of this source code is governed by a BSD-style license that can be found in
// the LICENSE file.

package sstable
package block

import (
"encoding/binary"
Expand All @@ -11,7 +11,6 @@ import (
"time"

"github.com/cockroachdb/pebble/internal/cache"
"github.com/cockroachdb/pebble/sstable/block"
"github.com/stretchr/testify/require"
)

Expand All @@ -20,16 +19,16 @@ func TestCompressionRoundtrip(t *testing.T) {
t.Logf("seed %d", seed)
rng := rand.New(rand.NewSource(seed))

for compression := block.DefaultCompression + 1; compression < block.NCompression; compression++ {
for compression := DefaultCompression + 1; compression < NCompression; compression++ {
t.Run(compression.String(), func(t *testing.T) {
payload := make([]byte, rng.Intn(10<<10 /* 10 KiB */))
rng.Read(payload)
// Create a randomly-sized buffer to house the compressed output. If it's
// not sufficient, compressBlock should allocate one that is.
// not sufficient, Compress should allocate one that is.
compressedBuf := make([]byte, rng.Intn(1<<10 /* 1 KiB */))

btyp, compressed := compressBlock(compression, payload, compressedBuf)
v, err := decompressBlock(btyp, compressed)
btyp, compressed := Compress(compression, payload, compressedBuf)
v, err := Decompress(btyp, compressed)
require.NoError(t, err)
got := payload
if v != nil {
Expand All @@ -54,7 +53,7 @@ func TestDecompressionError(t *testing.T) {
fauxCompressed = fauxCompressed[:n+compressedPayloadLen]
rng.Read(fauxCompressed[n:])

v, err := decompressBlock(block.ZstdCompressionIndicator, fauxCompressed)
v, err := Decompress(ZstdCompressionIndicator, fauxCompressed)
t.Log(err)
require.Error(t, err)
require.Nil(t, v)
Expand Down
104 changes: 0 additions & 104 deletions sstable/compression.go

This file was deleted.

19 changes: 0 additions & 19 deletions sstable/compression_cgo_test.go

This file was deleted.

19 changes: 0 additions & 19 deletions sstable/compression_nocgo_test.go

This file was deleted.

2 changes: 1 addition & 1 deletion sstable/raw_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -1588,7 +1588,7 @@ func compressAndChecksum(
) (compressed []byte, trailer block.Trailer) {
// Compress the buffer, discarding the result if the improvement isn't at
// least 12.5%.
algo, compressed := compressBlock(compression, b, blockBuf.compressedBuf)
algo, compressed := block.Compress(compression, b, blockBuf.compressedBuf)
if algo != block.NoCompressionIndicator && cap(compressed) > cap(blockBuf.compressedBuf) {
blockBuf.compressedBuf = compressed[:cap(compressed)]
}
Expand Down
4 changes: 2 additions & 2 deletions sstable/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -469,14 +469,14 @@ func (r *Reader) readBlock(
decompressed = compressed
} else {
// Decode the length of the decompressed value.
decodedLen, prefixLen, err := decompressedLen(typ, compressed.Get())
decodedLen, prefixLen, err := block.DecompressedLen(typ, compressed.Get())
if err != nil {
compressed.Release()
return block.BufferHandle{}, err
}

decompressed = block.Alloc(decodedLen, bufferPool)
if err := decompressInto(typ, compressed.Get()[prefixLen:], decompressed.Get()); err != nil {
if err := block.DecompressInto(typ, compressed.Get()[prefixLen:], decompressed.Get()); err != nil {
compressed.Release()
return block.BufferHandle{}, err
}
Expand Down
Loading

0 comments on commit 8af904f

Please sign in to comment.