Skip to content

Commit

Permalink
*,sstable: upgrade zstd to v1.5.6
Browse files Browse the repository at this point in the history
Upgrade zstd to v1.5.6. Due to changes in the mechanics of zstd.Decompress
function, this required a change to using a new zstd.DecompressInto entrypoint
that guarantees it'll deserialize into the provided destination buffer, never
allocating a new buffer.

Close #1706.
  • Loading branch information
jbowens committed Mar 19, 2024
1 parent 9d0109c commit 7dc1646
Show file tree
Hide file tree
Showing 9 changed files with 38 additions and 22 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
module github.com/cockroachdb/pebble

require (
github.com/DataDog/zstd v1.4.5
github.com/DataDog/zstd v1.5.6-0.20230824185856-869dae002e5e
github.com/HdrHistogram/hdrhistogram-go v1.1.2
github.com/cespare/xxhash/v2 v2.2.0
github.com/cockroachdb/datadriven v1.0.3-0.20230413201302-be42291fc80f
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7
gioui.org v0.0.0-20210308172011-57750fc8a0a6/go.mod h1:RSH6KIUZ0p2xy5zHDxgAM4zumjgTw83q2ge/PI+yyw8=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
github.com/DataDog/zstd v1.4.5 h1:EndNeuB0l9syBZhut0wns3gV1hL8zX8LIu6ZiVHWLIQ=
github.com/DataDog/zstd v1.4.5/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo=
github.com/DataDog/zstd v1.5.6-0.20230824185856-869dae002e5e h1:ZIWapoIRN1VqT8GR8jAwb1Ie9GyehWjVcGh32Y2MznE=
github.com/DataDog/zstd v1.5.6-0.20230824185856-869dae002e5e/go.mod h1:g4AWEaM3yOg3HYfnJ3YIawPnVdXJh9QME85blwSAmyw=
github.com/GoogleCloudPlatform/cloudsql-proxy v0.0.0-20190129172621-c8b1d7a94ddf/go.mod h1:aJ4qN3TfrelA6NZ6AXsXRfmEVaYin3EDbSPJrKS8OXo=
github.com/HdrHistogram/hdrhistogram-go v1.1.2 h1:5IcZpTvzydCQeHzK4Ef/D5rrSqwxob0t8PQPMybUNFM=
github.com/HdrHistogram/hdrhistogram-go v1.1.2/go.mod h1:yDgFjdqOqDEKOvasDdhWNXYg9BVp4O+o5f6V/ehm6Oo=
Expand Down
16 changes: 10 additions & 6 deletions sstable/compression.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,23 +33,27 @@ func decompressedLen(blockType blockType, b []byte) (int, int, error) {
}
}

func decompressInto(blockType blockType, compressed []byte, buf []byte) ([]byte, error) {
// decompressInto decompresses compressed into buf. The buf slice must have the
// exact size as the decompressed value.
func decompressInto(blockType blockType, compressed []byte, buf []byte) error {
var result []byte
var err error
switch blockType {
case snappyCompressionBlockType:
result, err = snappy.Decode(buf, compressed)
case zstdCompressionBlockType:
result, err = decodeZstd(buf, compressed)
default:
return base.CorruptionErrorf("pebble/table: unknown block compression: %d", errors.Safe(blockType))
}
if err != nil {
return nil, base.MarkCorruptionError(err)
return base.MarkCorruptionError(err)
}
if len(result) != 0 && (len(result) != len(buf) || &result[0] != &buf[0]) {
return nil, base.CorruptionErrorf("pebble/table: decompressed into unexpected buffer: %p != %p",
if len(result) != len(buf) || (len(result) > 0 && &result[0] != &buf[0]) {
return base.CorruptionErrorf("pebble/table: decompressed into unexpected buffer: %p != %p",
errors.Safe(result), errors.Safe(buf))
}
return result, nil
return nil
}

// decompressBlock decompresses an SST block, with manually-allocated space.
Expand All @@ -68,7 +72,7 @@ func decompressBlock(blockType blockType, b []byte) (*cache.Value, error) {
// Allocate sufficient space from the cache.
decoded := cache.Alloc(decodedLen)
decodedBuf := decoded.Buf()
if _, err := decompressInto(blockType, b, decodedBuf); err != nil {
if err := decompressInto(blockType, b, decodedBuf); err != nil {
cache.Free(decoded)
return nil, err
}
Expand Down
14 changes: 9 additions & 5 deletions sstable/compression_cgo.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,15 @@ import (
"github.com/DataDog/zstd"
)

// decodeZstd decompresses b with the Zstandard algorithm.
// It reuses the preallocated capacity of decodedBuf if it is sufficient.
// On success, it returns the decoded byte slice.
func decodeZstd(decodedBuf, b []byte) ([]byte, error) {
return zstd.Decompress(decodedBuf, b)
// decodeZstd decompresses src with the Zstandard algorithm. The destination
// buffer must already be sufficiently sized, otherwise decodeZstd may error.
func decodeZstd(dst, src []byte) ([]byte, error) {
n, err := zstd.DecompressInto(dst, src)
// NB: zstd.DecompressInto may return n < 0 if err != nil.
if err != nil {
return nil, err
}
return dst[:n], nil
}

// encodeZstd compresses b with the Zstandard algorithm at default compression
Expand Down
9 changes: 4 additions & 5 deletions sstable/compression_nocgo.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,12 @@ package sstable

import "github.com/klauspost/compress/zstd"

// decodeZstd decompresses b with the Zstandard algorithm.
// It reuses the preallocated capacity of decodedBuf if it is sufficient.
// On success, it returns the decoded byte slice.
func decodeZstd(decodedBuf, b []byte) ([]byte, error) {
// decodeZstd decompresses src with the Zstandard algorithm. The destination
// buffer must already be sufficiently sized, otherwise decodeZstd may error.
func decodeZstd(dst, src []byte) ([]byte, error) {
decoder, _ := zstd.NewReader(nil)
defer decoder.Close()
return decoder.DecodeAll(b, decodedBuf[:0])
return decoder.DecodeAll(src, dst[:0])
}

// encodeZstd compresses b with the Zstandard algorithm at default compression
Expand Down
2 changes: 1 addition & 1 deletion sstable/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -692,7 +692,7 @@ func (r *Reader) readBlock(
} else {
decompressed = cacheValueOrBuf{v: cache.Alloc(decodedLen)}
}
if _, err := decompressInto(typ, compressed.get()[prefixLen:], decompressed.get()); err != nil {
if err := decompressInto(typ, compressed.get()[prefixLen:], decompressed.get()); err != nil {
compressed.release()
return bufferHandle{}, err
}
Expand Down
5 changes: 3 additions & 2 deletions sstable/suffix_rewriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -535,8 +535,9 @@ func readBlockBuf(r *Reader, bh BlockHandle, buf []byte) ([]byte, []byte, error)
if cap(buf) < decompressedLen {
buf = make([]byte, decompressedLen)
}
res, err := decompressInto(typ, raw[prefix:], buf[:decompressedLen])
return res, buf, err
dst := buf[:decompressedLen]
err = decompressInto(typ, raw[prefix:], dst)
return dst, buf, err
}

// memReader is a thin wrapper around a []byte such that it can be passed to
Expand Down
Binary file modified sstable/testdata/h.zstd-compression.sst
Binary file not shown.
8 changes: 8 additions & 0 deletions sstable/writer_fixture_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,14 @@ func runTestFixtureOutput(fixture TestFixtureInfo) error {
return err
}

var rewrite bool
for _, arg := range os.Args {
rewrite = rewrite || arg == "--rewrite"
}
if rewrite {
return os.WriteFile(filepath.Join("testdata", fixture.Filename), got, 0644)
}

if !bytes.Equal(got, want) {
i := 0
for ; i < len(got) && i < len(want) && got[i] == want[i]; i++ {
Expand Down

0 comments on commit 7dc1646

Please sign in to comment.