From 308a7519695d912cb0356f39209690d514396632 Mon Sep 17 00:00:00 2001 From: Klaus Post Date: Thu, 24 Feb 2022 03:21:43 -0800 Subject: [PATCH] Refactor zstd decoder (#498) # TLDR; * Streams can now be decoded without goroutines using `WithDecoderConcurrency(1)`. * `WithDecoderConcurrency(4)` is now the default. If you need more concurrent `DecodeAll` operations, use `WithDecoderConcurrency(0)`. Goroutines exit when streams have finished reading (either error or EOF). Designed and tested to be compatible, but test before committing to the upgrade. # Changes Goroutines will now only be created on demand, and `WithDecoderConcurrency(1)` is now strictly synchronized. Decompression will typically be about 2x faster when using multiple goroutines, and will prepare input for the upstream reader asynchronously to reads. This can lead to ~3x faster input in total than using no goroutines. The default is now `WithDecoderConcurrency(4)` (or fewer, if GOMAXPROCS is lower). Beyond 4, there is little benefit for streaming decompression. * No goroutines created, unless streaming, and auto-closed at error/EOF. * Synchronous stream decoding with `WithDecoderConcurrency(1)`. * Split sequence decoding/execution, making streams up to 50% faster. * Simplified error flow. * Speedup on streams. * More consistent error reporting. * Improved error detection/compliance with reference decoder. * Improved test coverage. 
Fixes #477 --- huff0/bitreader.go | 121 +------- huff0/decompress.go | 63 ++-- zstd/bitreader.go | 4 + zstd/blockdec.go | 421 ++++++++++++--------------- zstd/bytebuf.go | 3 + zstd/decoder.go | 619 +++++++++++++++++++++++++++++++--------- zstd/decoder_options.go | 24 +- zstd/decoder_test.go | 453 ++++++++++++++++++----------- zstd/dict_test.go | 66 +++++ zstd/encoder_test.go | 4 +- zstd/framedec.go | 202 ++----------- zstd/fuzz.go | 11 + zstd/fuzz_none.go | 11 + zstd/history.go | 46 ++- zstd/seqdec.go | 285 +++++++++++++++--- zstd/testdata/bad.zip | Bin 3241 -> 6116 bytes zstd/testdata/good.zip | Bin 3914 -> 2848 bytes zstd/zstd.go | 4 + 18 files changed, 1447 insertions(+), 890 deletions(-) create mode 100644 zstd/fuzz.go create mode 100644 zstd/fuzz_none.go diff --git a/huff0/bitreader.go b/huff0/bitreader.go index a4979e8868..03562db16f 100644 --- a/huff0/bitreader.go +++ b/huff0/bitreader.go @@ -8,115 +8,10 @@ package huff0 import ( "encoding/binary" "errors" + "fmt" "io" ) -// bitReader reads a bitstream in reverse. -// The last set bit indicates the start of the stream and is used -// for aligning the input. -type bitReader struct { - in []byte - off uint // next byte to read is at in[off - 1] - value uint64 - bitsRead uint8 -} - -// init initializes and resets the bit reader. -func (b *bitReader) init(in []byte) error { - if len(in) < 1 { - return errors.New("corrupt stream: too short") - } - b.in = in - b.off = uint(len(in)) - // The highest bit of the last byte indicates where to start - v := in[len(in)-1] - if v == 0 { - return errors.New("corrupt stream, did not find end of stream") - } - b.bitsRead = 64 - b.value = 0 - if len(in) >= 8 { - b.fillFastStart() - } else { - b.fill() - b.fill() - } - b.bitsRead += 8 - uint8(highBit32(uint32(v))) - return nil -} - -// peekBitsFast requires that at least one bit is requested every time. -// There are no checks if the buffer is filled. 
-func (b *bitReader) peekBitsFast(n uint8) uint16 { - const regMask = 64 - 1 - v := uint16((b.value << (b.bitsRead & regMask)) >> ((regMask + 1 - n) & regMask)) - return v -} - -// fillFast() will make sure at least 32 bits are available. -// There must be at least 4 bytes available. -func (b *bitReader) fillFast() { - if b.bitsRead < 32 { - return - } - - // 2 bounds checks. - v := b.in[b.off-4 : b.off] - v = v[:4] - low := (uint32(v[0])) | (uint32(v[1]) << 8) | (uint32(v[2]) << 16) | (uint32(v[3]) << 24) - b.value = (b.value << 32) | uint64(low) - b.bitsRead -= 32 - b.off -= 4 -} - -func (b *bitReader) advance(n uint8) { - b.bitsRead += n -} - -// fillFastStart() assumes the bitreader is empty and there is at least 8 bytes to read. -func (b *bitReader) fillFastStart() { - // Do single re-slice to avoid bounds checks. - b.value = binary.LittleEndian.Uint64(b.in[b.off-8:]) - b.bitsRead = 0 - b.off -= 8 -} - -// fill() will make sure at least 32 bits are available. -func (b *bitReader) fill() { - if b.bitsRead < 32 { - return - } - if b.off > 4 { - v := b.in[b.off-4:] - v = v[:4] - low := (uint32(v[0])) | (uint32(v[1]) << 8) | (uint32(v[2]) << 16) | (uint32(v[3]) << 24) - b.value = (b.value << 32) | uint64(low) - b.bitsRead -= 32 - b.off -= 4 - return - } - for b.off > 0 { - b.value = (b.value << 8) | uint64(b.in[b.off-1]) - b.bitsRead -= 8 - b.off-- - } -} - -// finished returns true if all bits have been read from the bit stream. -func (b *bitReader) finished() bool { - return b.off == 0 && b.bitsRead >= 64 -} - -// close the bitstream and returns an error if out-of-buffer reads occurred. -func (b *bitReader) close() error { - // Release reference. - b.in = nil - if b.bitsRead > 64 { - return io.ErrUnexpectedEOF - } - return nil -} - // bitReader reads a bitstream in reverse. // The last set bit indicates the start of the stream and is used // for aligning the input. 
@@ -213,10 +108,17 @@ func (b *bitReaderBytes) finished() bool { return b.off == 0 && b.bitsRead >= 64 } +func (b *bitReaderBytes) remaining() uint { + return b.off*8 + uint(64-b.bitsRead) +} + // close the bitstream and returns an error if out-of-buffer reads occurred. func (b *bitReaderBytes) close() error { // Release reference. b.in = nil + if b.remaining() > 0 { + return fmt.Errorf("corrupt input: %d bits remain on stream", b.remaining()) + } if b.bitsRead > 64 { return io.ErrUnexpectedEOF } @@ -318,10 +220,17 @@ func (b *bitReaderShifted) finished() bool { return b.off == 0 && b.bitsRead >= 64 } +func (b *bitReaderShifted) remaining() uint { + return b.off*8 + uint(64-b.bitsRead) +} + // close the bitstream and returns an error if out-of-buffer reads occurred. func (b *bitReaderShifted) close() error { // Release reference. b.in = nil + if b.remaining() > 0 { + return fmt.Errorf("corrupt input: %d bits remain on stream", b.remaining()) + } if b.bitsRead > 64 { return io.ErrUnexpectedEOF } diff --git a/huff0/decompress.go b/huff0/decompress.go index 2668b64d37..3ae7d46771 100644 --- a/huff0/decompress.go +++ b/huff0/decompress.go @@ -741,6 +741,7 @@ func (d *Decoder) Decompress4X(dst, src []byte) ([]byte, error) { } var br [4]bitReaderShifted + // Decode "jump table" start := 6 for i := 0; i < 3; i++ { length := int(src[i*2]) | (int(src[i*2+1]) << 8) @@ -865,30 +866,18 @@ func (d *Decoder) Decompress4X(dst, src []byte) ([]byte, error) { } // Decode remaining. 
+ remainBytes := dstEvery - (decoded / 4) for i := range br { offset := dstEvery * i + endsAt := offset + remainBytes + if endsAt > len(out) { + endsAt = len(out) + } br := &br[i] - bitsLeft := br.off*8 + uint(64-br.bitsRead) + bitsLeft := br.remaining() for bitsLeft > 0 { br.fill() - if false && br.bitsRead >= 32 { - if br.off >= 4 { - v := br.in[br.off-4:] - v = v[:4] - low := (uint32(v[0])) | (uint32(v[1]) << 8) | (uint32(v[2]) << 16) | (uint32(v[3]) << 24) - br.value = (br.value << 32) | uint64(low) - br.bitsRead -= 32 - br.off -= 4 - } else { - for br.off > 0 { - br.value = (br.value << 8) | uint64(br.in[br.off-1]) - br.bitsRead -= 8 - br.off-- - } - } - } - // end inline... - if offset >= len(out) { + if offset >= endsAt { d.bufs.Put(buf) return nil, errors.New("corruption detected: stream overrun 4") } @@ -902,6 +891,10 @@ func (d *Decoder) Decompress4X(dst, src []byte) ([]byte, error) { out[offset] = uint8(v >> 8) offset++ } + if offset != endsAt { + d.bufs.Put(buf) + return nil, fmt.Errorf("corruption detected: short output block %d, end %d != %d", i, offset, endsAt) + } decoded += offset - dstEvery*i err = br.close() if err != nil { @@ -1091,10 +1084,16 @@ func (d *Decoder) decompress4X8bit(dst, src []byte) ([]byte, error) { } // Decode remaining. + // Decode remaining. + remainBytes := dstEvery - (decoded / 4) for i := range br { offset := dstEvery * i + endsAt := offset + remainBytes + if endsAt > len(out) { + endsAt = len(out) + } br := &br[i] - bitsLeft := int(br.off*8) + int(64-br.bitsRead) + bitsLeft := br.remaining() for bitsLeft > 0 { if br.finished() { d.bufs.Put(buf) @@ -1117,7 +1116,7 @@ func (d *Decoder) decompress4X8bit(dst, src []byte) ([]byte, error) { } } // end inline... 
- if offset >= len(out) { + if offset >= endsAt { d.bufs.Put(buf) return nil, errors.New("corruption detected: stream overrun 4") } @@ -1126,10 +1125,14 @@ func (d *Decoder) decompress4X8bit(dst, src []byte) ([]byte, error) { v := single[uint8(br.value>>shift)].entry nBits := uint8(v) br.advance(nBits) - bitsLeft -= int(nBits) + bitsLeft -= uint(nBits) out[offset] = uint8(v >> 8) offset++ } + if offset != endsAt { + d.bufs.Put(buf) + return nil, fmt.Errorf("corruption detected: short output block %d, end %d != %d", i, offset, endsAt) + } decoded += offset - dstEvery*i err = br.close() if err != nil { @@ -1315,10 +1318,15 @@ func (d *Decoder) decompress4X8bitExactly(dst, src []byte) ([]byte, error) { } // Decode remaining. + remainBytes := dstEvery - (decoded / 4) for i := range br { offset := dstEvery * i + endsAt := offset + remainBytes + if endsAt > len(out) { + endsAt = len(out) + } br := &br[i] - bitsLeft := int(br.off*8) + int(64-br.bitsRead) + bitsLeft := br.remaining() for bitsLeft > 0 { if br.finished() { d.bufs.Put(buf) @@ -1341,7 +1349,7 @@ func (d *Decoder) decompress4X8bitExactly(dst, src []byte) ([]byte, error) { } } // end inline... 
- if offset >= len(out) { + if offset >= endsAt { d.bufs.Put(buf) return nil, errors.New("corruption detected: stream overrun 4") } @@ -1350,10 +1358,15 @@ func (d *Decoder) decompress4X8bitExactly(dst, src []byte) ([]byte, error) { v := single[br.peekByteFast()].entry nBits := uint8(v) br.advance(nBits) - bitsLeft -= int(nBits) + bitsLeft -= uint(nBits) out[offset] = uint8(v >> 8) offset++ } + if offset != endsAt { + d.bufs.Put(buf) + return nil, fmt.Errorf("corruption detected: short output block %d, end %d != %d", i, offset, endsAt) + } + decoded += offset - dstEvery*i err = br.close() if err != nil { diff --git a/zstd/bitreader.go b/zstd/bitreader.go index 753d17df63..d7cd15ba29 100644 --- a/zstd/bitreader.go +++ b/zstd/bitreader.go @@ -7,6 +7,7 @@ package zstd import ( "encoding/binary" "errors" + "fmt" "io" "math/bits" ) @@ -132,6 +133,9 @@ func (b *bitReader) remain() uint { func (b *bitReader) close() error { // Release reference. b.in = nil + if !b.finished() { + return fmt.Errorf("%d extra bits on block, should be 0", b.remain()) + } if b.bitsRead > 64 { return io.ErrUnexpectedEOF } diff --git a/zstd/blockdec.go b/zstd/blockdec.go index dc587b2c94..e5a38d1408 100644 --- a/zstd/blockdec.go +++ b/zstd/blockdec.go @@ -76,16 +76,25 @@ type blockDec struct { // Window size of the block. WindowSize uint64 - history chan *history - input chan struct{} - result chan decodeOutput - err error - decWG sync.WaitGroup + err error + + // Check against this crc + checkCRC []byte // Frame to use for singlethreaded decoding. // Should not be used by the decoder itself since parent may be another frame. localFrame *frameDec + sequence []seqVals + + async struct { + newHist *history + literals []byte + seqData []byte + seqSize int // Size of uncompressed sequences + fcs uint64 + } + // Block is RLE, this is the size. 
RLESize uint32 tmp [4]byte @@ -108,13 +117,8 @@ func (b *blockDec) String() string { func newBlockDec(lowMem bool) *blockDec { b := blockDec{ - lowMem: lowMem, - result: make(chan decodeOutput, 1), - input: make(chan struct{}, 1), - history: make(chan *history, 1), + lowMem: lowMem, } - b.decWG.Add(1) - go b.startDecoder() return &b } @@ -137,6 +141,12 @@ func (b *blockDec) reset(br byteBuffer, windowSize uint64) error { case blockTypeReserved: return ErrReservedBlockType case blockTypeRLE: + if cSize > maxCompressedBlockSize || cSize > int(b.WindowSize) { + if debugDecoder { + printf("rle block too big: csize:%d block: %+v\n", uint64(cSize), b) + } + return ErrWindowSizeExceeded + } b.RLESize = uint32(cSize) if b.lowMem { maxSize = cSize @@ -158,6 +168,13 @@ func (b *blockDec) reset(br byteBuffer, windowSize uint64) error { return ErrCompressedSizeTooBig } case blockTypeRaw: + if cSize > maxCompressedBlockSize || cSize > int(b.WindowSize) { + if debugDecoder { + printf("rle block too big: csize:%d block: %+v\n", uint64(cSize), b) + } + return ErrWindowSizeExceeded + } + b.RLESize = 0 // We do not need a destination for raw blocks. maxSize = -1 @@ -192,85 +209,14 @@ func (b *blockDec) sendErr(err error) { b.Last = true b.Type = blockTypeReserved b.err = err - b.input <- struct{}{} } // Close will release resources. // Closed blockDec cannot be reset. func (b *blockDec) Close() { - close(b.input) - close(b.history) - close(b.result) - b.decWG.Wait() -} - -// decodeAsync will prepare decoding the block when it receives input. -// This will separate output and history. 
-func (b *blockDec) startDecoder() { - defer b.decWG.Done() - for range b.input { - //println("blockDec: Got block input") - switch b.Type { - case blockTypeRLE: - if cap(b.dst) < int(b.RLESize) { - if b.lowMem { - b.dst = make([]byte, b.RLESize) - } else { - b.dst = make([]byte, maxBlockSize) - } - } - o := decodeOutput{ - d: b, - b: b.dst[:b.RLESize], - err: nil, - } - v := b.data[0] - for i := range o.b { - o.b[i] = v - } - hist := <-b.history - hist.append(o.b) - b.result <- o - case blockTypeRaw: - o := decodeOutput{ - d: b, - b: b.data, - err: nil, - } - hist := <-b.history - hist.append(o.b) - b.result <- o - case blockTypeCompressed: - b.dst = b.dst[:0] - err := b.decodeCompressed(nil) - o := decodeOutput{ - d: b, - b: b.dst, - err: err, - } - if debugDecoder { - println("Decompressed to", len(b.dst), "bytes, error:", err) - } - b.result <- o - case blockTypeReserved: - // Used for returning errors. - <-b.history - b.result <- decodeOutput{ - d: b, - b: nil, - err: b.err, - } - default: - panic("Invalid block type") - } - if debugDecoder { - println("blockDec: Finished block") - } - } } -// decodeAsync will prepare decoding the block when it receives the history. -// If history is provided, it will not fetch it from the channel. +// decodeBuf func (b *blockDec) decodeBuf(hist *history) error { switch b.Type { case blockTypeRLE: @@ -293,14 +239,23 @@ func (b *blockDec) decodeBuf(hist *history) error { return nil case blockTypeCompressed: saved := b.dst - b.dst = hist.b - hist.b = nil + // Append directly to history + if hist.ignoreBuffer == 0 { + b.dst = hist.b + hist.b = nil + } else { + b.dst = b.dst[:0] + } err := b.decodeCompressed(hist) if debugDecoder { println("Decompressed to total", len(b.dst), "bytes, hash:", xxhash.Sum64(b.dst), "error:", err) } - hist.b = b.dst - b.dst = saved + if hist.ignoreBuffer == 0 { + hist.b = b.dst + b.dst = saved + } else { + hist.appendKeep(b.dst) + } return err case blockTypeReserved: // Used for returning errors. 
@@ -310,30 +265,18 @@ func (b *blockDec) decodeBuf(hist *history) error { } } -// decodeCompressed will start decompressing a block. -// If no history is supplied the decoder will decodeAsync as much as possible -// before fetching from blockDec.history -func (b *blockDec) decodeCompressed(hist *history) error { - in := b.data - delayedHistory := hist == nil - - if delayedHistory { - // We must always grab history. - defer func() { - if hist == nil { - <-b.history - } - }() - } +func (b *blockDec) decodeLiterals(in []byte, hist *history) (remain []byte, err error) { // There must be at least one byte for Literals_Block_Type and one for Sequences_Section_Header if len(in) < 2 { - return ErrBlockTooSmall + return in, ErrBlockTooSmall } + litType := literalsBlockType(in[0] & 3) var litRegenSize int var litCompSize int sizeFormat := (in[0] >> 2) & 3 var fourStreams bool + var literals []byte switch litType { case literalsBlockRaw, literalsBlockRLE: switch sizeFormat { @@ -349,7 +292,7 @@ func (b *blockDec) decodeCompressed(hist *history) error { // Regenerated_Size uses 20 bits (0-1048575). Literals_Section_Header uses 3 bytes. if len(in) < 3 { println("too small: litType:", litType, " sizeFormat", sizeFormat, len(in)) - return ErrBlockTooSmall + return in, ErrBlockTooSmall } litRegenSize = int(in[0]>>4) + (int(in[1]) << 4) + (int(in[2]) << 12) in = in[3:] @@ -360,7 +303,7 @@ func (b *blockDec) decodeCompressed(hist *history) error { // Both Regenerated_Size and Compressed_Size use 10 bits (0-1023). 
if len(in) < 3 { println("too small: litType:", litType, " sizeFormat", sizeFormat, len(in)) - return ErrBlockTooSmall + return in, ErrBlockTooSmall } n := uint64(in[0]>>4) + (uint64(in[1]) << 4) + (uint64(in[2]) << 12) litRegenSize = int(n & 1023) @@ -371,7 +314,7 @@ func (b *blockDec) decodeCompressed(hist *history) error { fourStreams = true if len(in) < 4 { println("too small: litType:", litType, " sizeFormat", sizeFormat, len(in)) - return ErrBlockTooSmall + return in, ErrBlockTooSmall } n := uint64(in[0]>>4) + (uint64(in[1]) << 4) + (uint64(in[2]) << 12) + (uint64(in[3]) << 20) litRegenSize = int(n & 16383) @@ -381,7 +324,7 @@ func (b *blockDec) decodeCompressed(hist *history) error { fourStreams = true if len(in) < 5 { println("too small: litType:", litType, " sizeFormat", sizeFormat, len(in)) - return ErrBlockTooSmall + return in, ErrBlockTooSmall } n := uint64(in[0]>>4) + (uint64(in[1]) << 4) + (uint64(in[2]) << 12) + (uint64(in[3]) << 20) + (uint64(in[4]) << 28) litRegenSize = int(n & 262143) @@ -392,13 +335,15 @@ func (b *blockDec) decodeCompressed(hist *history) error { if debugDecoder { println("literals type:", litType, "litRegenSize:", litRegenSize, "litCompSize:", litCompSize, "sizeFormat:", sizeFormat, "4X:", fourStreams) } - var literals []byte - var huff *huff0.Scratch + if litRegenSize > int(b.WindowSize) || litRegenSize > maxCompressedBlockSize { + return in, ErrWindowSizeExceeded + } + switch litType { case literalsBlockRaw: if len(in) < litRegenSize { println("too small: litType:", litType, " sizeFormat", sizeFormat, "remain:", len(in), "want:", litRegenSize) - return ErrBlockTooSmall + return in, ErrBlockTooSmall } literals = in[:litRegenSize] in = in[litRegenSize:] @@ -406,7 +351,7 @@ func (b *blockDec) decodeCompressed(hist *history) error { case literalsBlockRLE: if len(in) < 1 { println("too small: litType:", litType, " sizeFormat", sizeFormat, "remain:", len(in), "want:", 1) - return ErrBlockTooSmall + return in, ErrBlockTooSmall } if 
cap(b.literalBuf) < litRegenSize { if b.lowMem { @@ -417,7 +362,6 @@ func (b *blockDec) decodeCompressed(hist *history) error { b.literalBuf = make([]byte, litRegenSize) } else { b.literalBuf = make([]byte, litRegenSize, maxCompressedLiteralSize) - } } } @@ -433,7 +377,7 @@ func (b *blockDec) decodeCompressed(hist *history) error { case literalsBlockTreeless: if len(in) < litCompSize { println("too small: litType:", litType, " sizeFormat", sizeFormat, "remain:", len(in), "want:", litCompSize) - return ErrBlockTooSmall + return in, ErrBlockTooSmall } // Store compressed literals, so we defer decoding until we get history. literals = in[:litCompSize] @@ -441,31 +385,65 @@ func (b *blockDec) decodeCompressed(hist *history) error { if debugDecoder { printf("Found %d compressed literals\n", litCompSize) } + huff := hist.huffTree + if huff == nil { + return in, errors.New("literal block was treeless, but no history was defined") + } + // Ensure we have space to store it. + if cap(b.literalBuf) < litRegenSize { + if b.lowMem { + b.literalBuf = make([]byte, 0, litRegenSize) + } else { + b.literalBuf = make([]byte, 0, maxCompressedLiteralSize) + } + } + var err error + // Use our out buffer. 
+ huff.MaxDecodedSize = maxCompressedBlockSize + if fourStreams { + literals, err = huff.Decoder().Decompress4X(b.literalBuf[:0:litRegenSize], literals) + } else { + literals, err = huff.Decoder().Decompress1X(b.literalBuf[:0:litRegenSize], literals) + } + // Make sure we don't leak our literals buffer + if err != nil { + println("decompressing literals:", err) + return in, err + } + if len(literals) != litRegenSize { + return in, fmt.Errorf("literal output size mismatch want %d, got %d", litRegenSize, len(literals)) + } + case literalsBlockCompressed: if len(in) < litCompSize { println("too small: litType:", litType, " sizeFormat", sizeFormat, "remain:", len(in), "want:", litCompSize) - return ErrBlockTooSmall + return in, ErrBlockTooSmall } literals = in[:litCompSize] in = in[litCompSize:] - huff = huffDecoderPool.Get().(*huff0.Scratch) - var err error // Ensure we have space to store it. if cap(b.literalBuf) < litRegenSize { if b.lowMem { b.literalBuf = make([]byte, 0, litRegenSize) } else { - b.literalBuf = make([]byte, 0, maxCompressedLiteralSize) + b.literalBuf = make([]byte, 0, maxCompressedBlockSize) } } - if huff == nil { - huff = &huff0.Scratch{} + huff := hist.huffTree + if huff == nil || (hist.dict != nil && huff == hist.dict.litEnc) { + huff = huffDecoderPool.Get().(*huff0.Scratch) + if huff == nil { + huff = &huff0.Scratch{} + } } + var err error huff, literals, err = huff0.ReadTable(literals, huff) if err != nil { println("reading huffman table:", err) - return err + return in, err } + hist.huffTree = huff + huff.MaxDecodedSize = maxCompressedBlockSize // Use our out buffer. 
if fourStreams { literals, err = huff.Decoder().Decompress4X(b.literalBuf[:0:litRegenSize], literals) @@ -474,24 +452,52 @@ func (b *blockDec) decodeCompressed(hist *history) error { } if err != nil { println("decoding compressed literals:", err) - return err + return in, err } // Make sure we don't leak our literals buffer if len(literals) != litRegenSize { - return fmt.Errorf("literal output size mismatch want %d, got %d", litRegenSize, len(literals)) + return in, fmt.Errorf("literal output size mismatch want %d, got %d", litRegenSize, len(literals)) } if debugDecoder { printf("Decompressed %d literals into %d bytes\n", litCompSize, litRegenSize) } } + hist.decoders.literals = literals + return in, nil +} + +// decodeCompressed will start decompressing a block. +func (b *blockDec) decodeCompressed(hist *history) error { + in := b.data + in, err := b.decodeLiterals(in, hist) + if err != nil { + return err + } + err = b.prepareSequences(in, hist) + if err != nil { + return err + } + if hist.decoders.nSeqs == 0 { + b.dst = append(b.dst, hist.decoders.literals...) 
+ return nil + } + err = hist.decoders.decodeSync(hist) + if err != nil { + return err + } + b.dst = hist.decoders.out + hist.recentOffsets = hist.decoders.prevOffset + return nil +} +func (b *blockDec) prepareSequences(in []byte, hist *history) (err error) { // Decode Sequences // https://github.com/facebook/zstd/blob/dev/doc/zstd_compression_format.md#sequences-section if len(in) < 1 { return ErrBlockTooSmall } + var nSeqs int seqHeader := in[0] - nSeqs := 0 switch { case seqHeader == 0: in = in[1:] @@ -512,7 +518,8 @@ func (b *blockDec) decodeCompressed(hist *history) error { in = in[3:] } - var seqs = &sequenceDecs{} + var seqs = &hist.decoders + seqs.nSeqs = nSeqs if nSeqs > 0 { if len(in) < 1 { return ErrBlockTooSmall @@ -541,6 +548,9 @@ func (b *blockDec) decodeCompressed(hist *history) error { } switch mode { case compModePredefined: + if seq.fse != nil && !seq.fse.preDefined { + fseDecoderPool.Put(seq.fse) + } seq.fse = &fsePredef[i] case compModeRLE: if br.remain() < 1 { @@ -548,34 +558,36 @@ func (b *blockDec) decodeCompressed(hist *history) error { } v := br.Uint8() br.advance(1) - dec := fseDecoderPool.Get().(*fseDecoder) + if seq.fse == nil || seq.fse.preDefined { + seq.fse = fseDecoderPool.Get().(*fseDecoder) + } symb, err := decSymbolValue(v, symbolTableX[i]) if err != nil { printf("RLE Transform table (%v) error: %v", tableIndex(i), err) return err } - dec.setRLE(symb) - seq.fse = dec + seq.fse.setRLE(symb) if debugDecoder { printf("RLE set to %+v, code: %v", symb, v) } case compModeFSE: println("Reading table for", tableIndex(i)) - dec := fseDecoderPool.Get().(*fseDecoder) - err := dec.readNCount(&br, uint16(maxTableSymbol[i])) + if seq.fse == nil || seq.fse.preDefined { + seq.fse = fseDecoderPool.Get().(*fseDecoder) + } + err := seq.fse.readNCount(&br, uint16(maxTableSymbol[i])) if err != nil { println("Read table error:", err) return err } - err = dec.transform(symbolTableX[i]) + err = seq.fse.transform(symbolTableX[i]) if err != nil { 
println("Transform table error:", err) return err } if debugDecoder { - println("Read table ok", "symbolLen:", dec.symbolLen) + println("Read table ok", "symbolLen:", seq.fse.symbolLen) } - seq.fse = dec case compModeRepeat: seq.repeat = true } @@ -585,140 +597,87 @@ func (b *blockDec) decodeCompressed(hist *history) error { } in = br.unread() } - - // Wait for history. - // All time spent after this is critical since it is strictly sequential. - if hist == nil { - hist = <-b.history - if hist.error { - return ErrDecoderClosed - } - } - - // Decode treeless literal block. - if litType == literalsBlockTreeless { - // TODO: We could send the history early WITHOUT the stream history. - // This would allow decoding treeless literals before the byte history is available. - // Silencia stats: Treeless 4393, with: 32775, total: 37168, 11% treeless. - // So not much obvious gain here. - - if hist.huffTree == nil { - return errors.New("literal block was treeless, but no history was defined") - } - // Ensure we have space to store it. - if cap(b.literalBuf) < litRegenSize { - if b.lowMem { - b.literalBuf = make([]byte, 0, litRegenSize) - } else { - b.literalBuf = make([]byte, 0, maxCompressedLiteralSize) - } - } - var err error - // Use our out buffer. 
- huff = hist.huffTree - if fourStreams { - literals, err = huff.Decoder().Decompress4X(b.literalBuf[:0:litRegenSize], literals) - } else { - literals, err = huff.Decoder().Decompress1X(b.literalBuf[:0:litRegenSize], literals) - } - // Make sure we don't leak our literals buffer - if err != nil { - println("decompressing literals:", err) - return err - } - if len(literals) != litRegenSize { - return fmt.Errorf("literal output size mismatch want %d, got %d", litRegenSize, len(literals)) - } - } else { - if hist.huffTree != nil && huff != nil { - if hist.dict == nil || hist.dict.litEnc != hist.huffTree { - huffDecoderPool.Put(hist.huffTree) - } - hist.huffTree = nil - } - } - if huff != nil { - hist.huffTree = huff - } if debugDecoder { - println("Final literals:", len(literals), "hash:", xxhash.Sum64(literals), "and", nSeqs, "sequences.") + println("Literals:", len(seqs.literals), "hash:", xxhash.Sum64(seqs.literals), "and", seqs.nSeqs, "sequences.") } if nSeqs == 0 { - // Decompressed content is defined entirely as Literals Section content. - b.dst = append(b.dst, literals...) - if delayedHistory { - hist.append(literals) + if len(b.sequence) > 0 { + b.sequence = b.sequence[:0] } return nil } - - seqs, err := seqs.mergeHistory(&hist.decoders) - if err != nil { - return err + br := seqs.br + if br == nil { + br = &bitReader{} } - if debugDecoder { - println("History merged ok") - } - br := &bitReader{} if err := br.init(in); err != nil { return err } - // TODO: Investigate if sending history without decoders are faster. - // This would allow the sequences to be decoded async and only have to construct stream history. - // If only recent offsets were not transferred, this would be an obvious win. - // Also, if first 3 sequences don't reference recent offsets, all sequences can be decoded. 
+ if err := seqs.initialize(br, hist, b.dst); err != nil { + println("initializing sequences:", err) + return err + } + return nil +} + +func (b *blockDec) decodeSequences(hist *history) error { + if cap(b.sequence) < hist.decoders.nSeqs { + if b.lowMem { + b.sequence = make([]seqVals, 0, hist.decoders.nSeqs) + } else { + b.sequence = make([]seqVals, 0, 0x7F00+0xffff) + } + } + b.sequence = b.sequence[:hist.decoders.nSeqs] + if hist.decoders.nSeqs == 0 { + return nil + } + hist.decoders.prevOffset = hist.recentOffsets + err := hist.decoders.decode(b.sequence) + hist.recentOffsets = hist.decoders.prevOffset + return err +} +func (b *blockDec) executeSequences(hist *history) error { hbytes := hist.b if len(hbytes) > hist.windowSize { hbytes = hbytes[len(hbytes)-hist.windowSize:] - // We do not need history any more. + // We do not need history anymore. if hist.dict != nil { hist.dict.content = nil } } - - if err := seqs.initialize(br, hist, literals, b.dst); err != nil { - println("initializing sequences:", err) - return err - } - - err = seqs.decode(nSeqs, br, hbytes) + hist.decoders.windowSize = hist.windowSize + hist.decoders.out = b.dst[:0] + err := hist.decoders.execute(b.sequence, hbytes) if err != nil { return err } - if !br.finished() { - return fmt.Errorf("%d extra bits on block, should be 0", br.remain()) - } + return b.updateHistory(hist) +} - err = br.close() - if err != nil { - printf("Closing sequences: %v, %+v\n", err, *br) - } +func (b *blockDec) updateHistory(hist *history) error { if len(b.data) > maxCompressedBlockSize { return fmt.Errorf("compressed block size too large (%d)", len(b.data)) } // Set output and release references. - b.dst = seqs.out - seqs.out, seqs.literals, seqs.hist = nil, nil, nil + b.dst = hist.decoders.out + hist.recentOffsets = hist.decoders.prevOffset - if !delayedHistory { - // If we don't have delayed history, no need to update. 
- hist.recentOffsets = seqs.prevOffset - return nil - } if b.Last { // if last block we don't care about history. println("Last block, no history returned") hist.b = hist.b[:0] return nil + } else { + hist.append(b.dst) + if debugDecoder { + println("Finished block with ", len(b.sequence), "sequences. Added", len(b.dst), "to history, now length", len(hist.b)) + } } - hist.append(b.dst) - hist.recentOffsets = seqs.prevOffset - if debugDecoder { - println("Finished block with literals:", len(literals), "and", nSeqs, "sequences.") - } + hist.decoders.out, hist.decoders.literals = nil, nil return nil } diff --git a/zstd/bytebuf.go b/zstd/bytebuf.go index aab71c6cf8..b80191e4b1 100644 --- a/zstd/bytebuf.go +++ b/zstd/bytebuf.go @@ -113,6 +113,9 @@ func (r *readerWrapper) readBig(n int, dst []byte) ([]byte, error) { func (r *readerWrapper) readByte() (byte, error) { n2, err := r.r.Read(r.tmp[:1]) if err != nil { + if err == io.EOF { + err = io.ErrUnexpectedEOF + } return 0, err } if n2 != 1 { diff --git a/zstd/decoder.go b/zstd/decoder.go index f430f58b57..310edbec25 100644 --- a/zstd/decoder.go +++ b/zstd/decoder.go @@ -5,9 +5,13 @@ package zstd import ( - "errors" + "bytes" + "context" + "encoding/binary" "io" "sync" + + "github.com/klauspost/compress/zstd/internal/xxhash" ) // Decoder provides decoding of zstandard streams. @@ -22,12 +26,19 @@ type Decoder struct { // Unreferenced decoders, ready for use. decoders chan *blockDec - // Streams ready to be decoded. - stream chan decodeStream - // Current read position used for Reader functionality. current decoderState + // sync stream decoding + syncStream struct { + decodedFrame uint64 + br readerWrapper + enabled bool + inFrame bool + } + + frame *frameDec + // Custom dictionaries. // Always uses copies. dicts map[uint32]dict @@ -46,7 +57,10 @@ type decoderState struct { output chan decodeOutput // cancel remaining output. 
- cancel chan struct{} + cancel context.CancelFunc + + // crc of current frame + crc *xxhash.Digest flushed bool } @@ -81,7 +95,7 @@ func NewReader(r io.Reader, opts ...DOption) (*Decoder, error) { return nil, err } } - d.current.output = make(chan decodeOutput, d.o.concurrent) + d.current.crc = xxhash.New() d.current.flushed = true if r == nil { @@ -130,7 +144,7 @@ func (d *Decoder) Read(p []byte) (int, error) { break } if !d.nextBlock(n == 0) { - return n, nil + return n, d.current.err } } } @@ -195,25 +209,29 @@ func (d *Decoder) Reset(r io.Reader) error { } return nil } - - if d.stream == nil { - d.stream = make(chan decodeStream, 1) - d.streamWg.Add(1) - go d.startStreamDecoder(d.stream) - } - // Remove current block. + d.stashDecoder() d.current.decodeOutput = decodeOutput{} d.current.err = nil - d.current.cancel = make(chan struct{}) d.current.flushed = false d.current.d = nil - d.stream <- decodeStream{ - r: r, - output: d.current.output, - cancel: d.current.cancel, + // Ensure no-one else is still running... + d.streamWg.Wait() + if d.frame == nil { + d.frame = newFrameDec(d.o) + } + + if d.o.concurrent == 1 { + return d.startSyncDecoder(r) } + + d.current.output = make(chan decodeOutput, d.o.concurrent) + ctx, cancel := context.WithCancel(context.Background()) + d.current.cancel = cancel + d.streamWg.Add(1) + go d.startStreamDecoder(ctx, r, d.current.output) + return nil } @@ -221,7 +239,7 @@ func (d *Decoder) Reset(r io.Reader) error { func (d *Decoder) drainOutput() { if d.current.cancel != nil { println("cancelling current") - close(d.current.cancel) + d.current.cancel() d.current.cancel = nil } if d.current.d != nil { @@ -243,12 +261,9 @@ func (d *Decoder) drainOutput() { } d.decoders <- v.d } - if v.err == errEndOfStream { - println("current flushed") - d.current.flushed = true - return - } } + d.current.output = nil + d.current.flushed = true } // WriteTo writes data to w until there's no more data to write or when an error occurs. 
@@ -287,7 +302,7 @@ func (d *Decoder) WriteTo(w io.Writer) (int64, error) { // DecodeAll can be used concurrently. // The Decoder concurrency limits will be respected. func (d *Decoder) DecodeAll(input, dst []byte) ([]byte, error) { - if d.current.err == ErrDecoderClosed { + if d.decoders == nil { return dst, ErrDecoderClosed } @@ -300,6 +315,9 @@ func (d *Decoder) DecodeAll(input, dst []byte) ([]byte, error) { } frame.rawInput = nil frame.bBuf = nil + if frame.history.decoders.br != nil { + frame.history.decoders.br.in = nil + } d.decoders <- block }() frame.bBuf = input @@ -307,27 +325,31 @@ func (d *Decoder) DecodeAll(input, dst []byte) ([]byte, error) { for { frame.history.reset() err := frame.reset(&frame.bBuf) - if err == io.EOF { - if debugDecoder { - println("frame reset return EOF") + if err != nil { + if err == io.EOF { + if debugDecoder { + println("frame reset return EOF") + } + return dst, nil } - return dst, nil + return dst, err } if frame.DictionaryID != nil { dict, ok := d.dicts[*frame.DictionaryID] if !ok { return nil, ErrUnknownDictionary } + if debugDecoder { + println("setting dict", frame.DictionaryID) + } frame.history.setDict(&dict) } - if err != nil { - return dst, err - } + if frame.FrameContentSize > d.o.maxDecodedSize-uint64(len(dst)) { return dst, ErrDecoderSizeExceeded } if frame.FrameContentSize > 0 && frame.FrameContentSize < 1<<30 { - // Never preallocate moe than 1 GB up front. + // Never preallocate more than 1 GB up front. if cap(dst)-len(dst) < int(frame.FrameContentSize) { dst2 := make([]byte, len(dst), len(dst)+int(frame.FrameContentSize)) copy(dst2, dst) @@ -368,33 +390,170 @@ func (d *Decoder) DecodeAll(input, dst []byte) ([]byte, error) { // If non-blocking mode is used the returned boolean will be false // if no data was available without blocking. 
func (d *Decoder) nextBlock(blocking bool) (ok bool) { - if d.current.d != nil { - if debugDecoder { - printf("re-adding current decoder %p", d.current.d) - } - d.decoders <- d.current.d - d.current.d = nil - } if d.current.err != nil { // Keep error state. - return blocking + return false } + d.current.b = d.current.b[:0] + // SYNC: + if d.syncStream.enabled { + if !blocking { + return false + } + ok = d.nextBlockSync() + if !ok { + d.stashDecoder() + } + return ok + } + + //ASYNC: + d.stashDecoder() if blocking { - d.current.decodeOutput = <-d.current.output + d.current.decodeOutput, ok = <-d.current.output } else { select { - case d.current.decodeOutput = <-d.current.output: + case d.current.decodeOutput, ok = <-d.current.output: default: return false } } + if !ok { + // This should not happen, so signal error state... + d.current.err = io.ErrUnexpectedEOF + return false + } + next := d.current.decodeOutput + if next.d != nil && next.d.async.newHist != nil { + d.current.crc.Reset() + } if debugDecoder { - println("got", len(d.current.b), "bytes, error:", d.current.err) + var tmp [4]byte + binary.LittleEndian.PutUint32(tmp[:], uint32(xxhash.Sum64(next.b))) + println("got", len(d.current.b), "bytes, error:", d.current.err, "data crc:", tmp) } + + if len(next.b) > 0 { + n, err := d.current.crc.Write(next.b) + if err == nil { + if n != len(next.b) { + d.current.err = io.ErrShortWrite + } + } + } + if next.err == nil && next.d != nil && len(next.d.checkCRC) != 0 { + got := d.current.crc.Sum64() + var tmp [4]byte + binary.LittleEndian.PutUint32(tmp[:], uint32(got)) + if !bytes.Equal(tmp[:], next.d.checkCRC) && !ignoreCRC { + if debugDecoder { + println("CRC Check Failed:", tmp[:], " (got) !=", next.d.checkCRC, "(on stream)") + } + d.current.err = ErrCRCMismatch + } else { + if debugDecoder { + println("CRC ok", tmp[:]) + } + } + } + return true } +func (d *Decoder) nextBlockSync() (ok bool) { + if d.current.d == nil { + d.current.d = <-d.decoders + } + for 
len(d.current.b) == 0 { + if !d.syncStream.inFrame { + d.frame.history.reset() + d.current.err = d.frame.reset(&d.syncStream.br) + if d.current.err != nil { + return false + } + if d.frame.DictionaryID != nil { + dict, ok := d.dicts[*d.frame.DictionaryID] + if !ok { + d.current.err = ErrUnknownDictionary + return false + } else { + d.frame.history.setDict(&dict) + } + } + if d.frame.WindowSize > d.o.maxDecodedSize || d.frame.WindowSize > d.o.maxWindowSize { + d.current.err = ErrDecoderSizeExceeded + return false + } + + d.syncStream.decodedFrame = 0 + d.syncStream.inFrame = true + } + d.current.err = d.frame.next(d.current.d) + if d.current.err != nil { + return false + } + d.frame.history.ensureBlock() + if debugDecoder { + println("History trimmed:", len(d.frame.history.b), "decoded already:", d.syncStream.decodedFrame) + } + histBefore := len(d.frame.history.b) + d.current.err = d.current.d.decodeBuf(&d.frame.history) + + if d.current.err != nil { + println("error after:", d.current.err) + return false + } + d.current.b = d.frame.history.b[histBefore:] + if debugDecoder { + println("history after:", len(d.frame.history.b)) + } + + // Check frame size (before CRC) + d.syncStream.decodedFrame += uint64(len(d.current.b)) + if d.frame.FrameContentSize > 0 && d.syncStream.decodedFrame > d.frame.FrameContentSize { + if debugDecoder { + printf("DecodedFrame (%d) > FrameContentSize (%d)\n", d.syncStream.decodedFrame, d.frame.FrameContentSize) + } + d.current.err = ErrFrameSizeExceeded + return false + } + + // Check FCS + if d.current.d.Last && d.frame.FrameContentSize > 0 && d.syncStream.decodedFrame != d.frame.FrameContentSize { + if debugDecoder { + printf("DecodedFrame (%d) != FrameContentSize (%d)\n", d.syncStream.decodedFrame, d.frame.FrameContentSize) + } + d.current.err = ErrFrameSizeMismatch + return false + } + + // Update/Check CRC + if d.frame.HasCheckSum { + d.frame.crc.Write(d.current.b) + if d.current.d.Last { + d.current.err = d.frame.checkCRC() + if 
d.current.err != nil { + println("CRC error:", d.current.err) + return false + } + } + } + d.syncStream.inFrame = !d.current.d.Last + } + return true +} + +func (d *Decoder) stashDecoder() { + if d.current.d != nil { + if debugDecoder { + printf("re-adding current decoder %p", d.current.d) + } + d.decoders <- d.current.d + d.current.d = nil + } +} + // Close will release all resources. // It is NOT possible to reuse the decoder after this. func (d *Decoder) Close() { @@ -402,10 +561,10 @@ func (d *Decoder) Close() { return } d.drainOutput() - if d.stream != nil { - close(d.stream) + if d.current.cancel != nil { + d.current.cancel() d.streamWg.Wait() - d.stream = nil + d.current.cancel = nil } if d.decoders != nil { close(d.decoders) @@ -456,100 +615,304 @@ type decodeOutput struct { err error } -type decodeStream struct { - r io.Reader - - // Blocks ready to be written to output. - output chan decodeOutput - - // cancel reading from the input - cancel chan struct{} +func (d *Decoder) startSyncDecoder(r io.Reader) error { + d.frame.history.reset() + d.syncStream.br = readerWrapper{r: r} + d.syncStream.inFrame = false + d.syncStream.enabled = true + d.syncStream.decodedFrame = 0 + return nil } -// errEndOfStream indicates that everything from the stream was read. -var errEndOfStream = errors.New("end-of-stream") - // Create Decoder: -// Spawn n block decoders. These accept tasks to decode a block. -// Create goroutine that handles stream processing, this will send history to decoders as they are available. -// Decoders update the history as they decode. -// When a block is returned: -// a) history is sent to the next decoder, -// b) content written to CRC. -// c) return data to WRITER. -// d) wait for next block to return data. -// Once WRITTEN, the decoders reused by the writer frame decoder for re-use. -func (d *Decoder) startStreamDecoder(inStream chan decodeStream) { +// ASYNC: +// Spawn 4 go routines. +// 0: Read frames and decode blocks. 
+// 1: Decode block and literals. Receives hufftree and seqdecs, returns seqdecs and huff tree. +// 2: Wait for recentOffsets if needed. Decode sequences, send recentOffsets. +// 3: Wait for stream history, execute sequences, send stream history. +func (d *Decoder) startStreamDecoder(ctx context.Context, r io.Reader, output chan decodeOutput) { defer d.streamWg.Done() - frame := newFrameDec(d.o) - for stream := range inStream { - if debugDecoder { - println("got new stream") + br := readerWrapper{r: r} + + var seqPrepare = make(chan *blockDec, d.o.concurrent) + var seqDecode = make(chan *blockDec, d.o.concurrent) + var seqExecute = make(chan *blockDec, d.o.concurrent) + + // Async 1: Prepare blocks... + go func() { + var hist history + var hasErr bool + for block := range seqPrepare { + if hasErr { + if block != nil { + seqDecode <- block + } + continue + } + if block.async.newHist != nil { + if debugDecoder { + println("Async 1: new history") + } + hist.reset() + if block.async.newHist.dict != nil { + hist.setDict(block.async.newHist.dict) + } + } + if block.err != nil || block.Type != blockTypeCompressed { + hasErr = block.err != nil + seqDecode <- block + continue + } + + remain, err := block.decodeLiterals(block.data, &hist) + block.err = err + hasErr = block.err != nil + if err == nil { + block.async.literals = hist.decoders.literals + block.async.seqData = remain + } else if debugDecoder { + println("decodeLiterals error:", err) + } + seqDecode <- block } - br := readerWrapper{r: stream.r} - decodeStream: - for { - frame.history.reset() - err := frame.reset(&br) - if debugDecoder && err != nil { - println("Frame decoder returned", err) + close(seqDecode) + }() + + // Async 2: Decode sequences... 
+ go func() { + var hist history + var hasErr bool + + for block := range seqDecode { + if hasErr { + if block != nil { + seqExecute <- block + } + continue } - if err == nil && frame.DictionaryID != nil { - dict, ok := d.dicts[*frame.DictionaryID] - if !ok { - err = ErrUnknownDictionary - } else { - frame.history.setDict(&dict) + if block.async.newHist != nil { + if debugDecoder { + println("Async 2: new history, recent:", block.async.newHist.recentOffsets) + } + hist.decoders = block.async.newHist.decoders + hist.recentOffsets = block.async.newHist.recentOffsets + if block.async.newHist.dict != nil { + hist.setDict(block.async.newHist.dict) } } - if err != nil { - stream.output <- decodeOutput{ - err: err, + if block.err != nil || block.Type != blockTypeCompressed { + hasErr = block.err != nil + seqExecute <- block + continue + } + + hist.decoders.literals = block.async.literals + block.err = block.prepareSequences(block.async.seqData, &hist) + if debugDecoder && block.err != nil { + println("prepareSequences returned:", block.err) + } + hasErr = block.err != nil + if block.err == nil { + block.err = block.decodeSequences(&hist) + if debugDecoder && block.err != nil { + println("decodeSequences returned:", block.err) } - break + hasErr = block.err != nil + // block.async.sequence = hist.decoders.seq[:hist.decoders.nSeqs] + block.async.seqSize = hist.decoders.seqSize } - if debugDecoder { - println("starting frame decoder") - } - - // This goroutine will forward history between frames. - frame.frameDone.Add(1) - frame.initAsync() - - go frame.startDecoder(stream.output) - decodeFrame: - // Go through all blocks of the frame. - for { - dec := <-d.decoders - select { - case <-stream.cancel: - if !frame.sendErr(dec, io.EOF) { - // To not let the decoder dangle, send it back. - stream.output <- decodeOutput{d: dec} + seqExecute <- block + } + close(seqExecute) + }() + + var wg sync.WaitGroup + wg.Add(1) + + // Async 3: Execute sequences... 
+ frameHistCache := d.frame.history.b + go func() { + var hist history + var decodedFrame uint64 + var fcs uint64 + var hasErr bool + for block := range seqExecute { + out := decodeOutput{err: block.err, d: block} + if block.err != nil || hasErr { + hasErr = true + output <- out + continue + } + if block.async.newHist != nil { + if debugDecoder { + println("Async 3: new history") + } + hist.windowSize = block.async.newHist.windowSize + hist.allocFrameBuffer = block.async.newHist.allocFrameBuffer + if block.async.newHist.dict != nil { + hist.setDict(block.async.newHist.dict) + } + + if cap(hist.b) < hist.allocFrameBuffer { + if cap(frameHistCache) >= hist.allocFrameBuffer { + hist.b = frameHistCache + } else { + hist.b = make([]byte, 0, hist.allocFrameBuffer) + println("Alloc history sized", hist.allocFrameBuffer) + } + } + hist.b = hist.b[:0] + fcs = block.async.fcs + decodedFrame = 0 + } + do := decodeOutput{err: block.err, d: block} + switch block.Type { + case blockTypeRLE: + if debugDecoder { + println("add rle block length:", block.RLESize) + } + + if cap(block.dst) < int(block.RLESize) { + if block.lowMem { + block.dst = make([]byte, block.RLESize) + } else { + block.dst = make([]byte, maxBlockSize) } - break decodeStream - default: } - err := frame.next(dec) - switch err { - case io.EOF: - // End of current frame, no error - println("EOF on next block") - break decodeFrame - case nil: - continue - default: - println("block decoder returned", err) - break decodeStream + block.dst = block.dst[:block.RLESize] + v := block.data[0] + for i := range block.dst { + block.dst[i] = v + } + hist.append(block.dst) + do.b = block.dst + case blockTypeRaw: + if debugDecoder { + println("add raw block length:", len(block.data)) + } + hist.append(block.data) + do.b = block.data + case blockTypeCompressed: + if debugDecoder { + println("execute with history length:", len(hist.b), "window:", hist.windowSize) + } + hist.decoders.seqSize = block.async.seqSize + 
hist.decoders.literals = block.async.literals + do.err = block.executeSequences(&hist) + hasErr = do.err != nil + if debugDecoder && hasErr { + println("executeSequences returned:", do.err) + } + do.b = block.dst + } + if !hasErr { + decodedFrame += uint64(len(do.b)) + if fcs > 0 && decodedFrame > fcs { + println("fcs exceeded", block.Last, fcs, decodedFrame) + do.err = ErrFrameSizeExceeded + hasErr = true + } else if block.Last && fcs > 0 && decodedFrame != fcs { + do.err = ErrFrameSizeMismatch + hasErr = true + } else { + println("fcs ok", block.Last, fcs, decodedFrame) + } + } + output <- do + } + close(output) + frameHistCache = hist.b + wg.Done() + if debugDecoder { + println("decoder goroutines finished") + } + }() + +decodeStream: + for { + frame := d.frame + if debugDecoder { + println("New frame...") + } + var historySent bool + frame.history.reset() + err := frame.reset(&br) + if debugDecoder && err != nil { + println("Frame decoder returned", err) + } + if err == nil && frame.DictionaryID != nil { + dict, ok := d.dicts[*frame.DictionaryID] + if !ok { + err = ErrUnknownDictionary + } else { + frame.history.setDict(&dict) + } + } + if err == nil && d.frame.WindowSize > d.o.maxWindowSize { + err = ErrDecoderSizeExceeded + } + if err != nil { + select { + case <-ctx.Done(): + case dec := <-d.decoders: + dec.sendErr(err) + seqPrepare <- dec + } + break decodeStream + } + + // Go through all blocks of the frame. + for { + var dec *blockDec + select { + case <-ctx.Done(): + break decodeStream + case dec = <-d.decoders: + // Once we have a decoder, we MUST return it. 
+ } + err := frame.next(dec) + if !historySent { + h := frame.history + if debugDecoder { + println("Alloc History:", h.allocFrameBuffer) + } + dec.async.newHist = &h + dec.async.fcs = frame.FrameContentSize + historySent = true + } else { + dec.async.newHist = nil + } + if debugDecoder && err != nil { + println("next block returned error:", err) + } + dec.err = err + dec.checkCRC = nil + if dec.Last && frame.HasCheckSum && err == nil { + crc, err := frame.rawInput.readSmall(4) + if err != nil { + println("CRC missing?", err) + dec.err = err } + var tmp [4]byte + copy(tmp[:], crc) + dec.checkCRC = tmp[:] + if debugDecoder { + println("found crc to check:", dec.checkCRC) + } + } + err = dec.err + last := dec.Last + seqPrepare <- dec + if err != nil { + break decodeStream + } + if last { + break } - // All blocks have started decoding, check if there are more frames. - println("waiting for done") - frame.frameDone.Wait() - println("done waiting...") } - frame.frameDone.Wait() - println("Sending EOS") - stream.output <- decodeOutput{err: errEndOfStream} } + close(seqPrepare) + wg.Wait() + d.frame.history.b = frameHistCache } diff --git a/zstd/decoder_options.go b/zstd/decoder_options.go index 95cc9b8b81..fd05c9bb01 100644 --- a/zstd/decoder_options.go +++ b/zstd/decoder_options.go @@ -28,6 +28,9 @@ func (o *decoderOptions) setDefault() { concurrent: runtime.GOMAXPROCS(0), maxWindowSize: MaxWindowSize, } + if o.concurrent > 4 { + o.concurrent = 4 + } o.maxDecodedSize = 1 << 63 } @@ -37,16 +40,25 @@ func WithDecoderLowmem(b bool) DOption { return func(o *decoderOptions) error { o.lowMem = b; return nil } } -// WithDecoderConcurrency will set the concurrency, -// meaning the maximum number of decoders to run concurrently. -// The value supplied must be at least 1. -// By default this will be set to GOMAXPROCS. +// WithDecoderConcurrency sets the number of created decoders. 
+// When decoding block with DecodeAll, this will limit the number +// of possible concurrently running decodes. +// When decoding streams, this will limit the number of +// inflight blocks. +// When decoding streams and setting maximum to 1, +// no async decoding will be done. +// When a value of 0 is provided GOMAXPROCS will be used. +// By default this will be set to 4 or GOMAXPROCS, whatever is lower. func WithDecoderConcurrency(n int) DOption { return func(o *decoderOptions) error { - if n <= 0 { + if n < 0 { return errors.New("concurrency must be at least 1") } - o.concurrent = n + if n == 0 { + o.concurrent = runtime.GOMAXPROCS(0) + } else { + o.concurrent = n + } return nil } } diff --git a/zstd/decoder_test.go b/zstd/decoder_test.go index 5af4c406ca..5eeebada6c 100644 --- a/zstd/decoder_test.go +++ b/zstd/decoder_test.go @@ -200,13 +200,19 @@ func TestErrorWriter(t *testing.T) { } func TestNewDecoder(t *testing.T) { - defer timeout(60 * time.Second)() - testDecoderFile(t, "testdata/decoder.zip") - dec, err := NewReader(nil) - if err != nil { - t.Fatal(err) + for _, n := range []int{1, 4} { + t.Run(fmt.Sprintf("cpu-%d", n), func(t *testing.T) { + newFn := func() (*Decoder, error) { + return NewReader(nil, WithDecoderConcurrency(n)) + } + testDecoderFile(t, "testdata/decoder.zip", newFn) + dec, err := newFn() + if err != nil { + t.Fatal(err) + } + testDecoderDecodeAll(t, "testdata/decoder.zip", dec) + }) } - testDecoderDecodeAll(t, "testdata/decoder.zip", dec) } func TestNewDecoderMemory(t *testing.T) { @@ -384,26 +390,53 @@ func TestNewDecoderFrameSize(t *testing.T) { } func TestNewDecoderGood(t *testing.T) { - defer timeout(30 * time.Second)() - testDecoderFile(t, "testdata/good.zip") - dec, err := NewReader(nil) - if err != nil { - t.Fatal(err) + for _, n := range []int{1, 4} { + t.Run(fmt.Sprintf("cpu-%d", n), func(t *testing.T) { + newFn := func() (*Decoder, error) { + return NewReader(nil, WithDecoderConcurrency(n)) + } + testDecoderFile(t, 
"testdata/good.zip", newFn) + dec, err := newFn() + if err != nil { + t.Fatal(err) + } + testDecoderDecodeAll(t, "testdata/good.zip", dec) + }) } - testDecoderDecodeAll(t, "testdata/good.zip", dec) } func TestNewDecoderBad(t *testing.T) { - defer timeout(10 * time.Second)() - dec, err := NewReader(nil) - if err != nil { - t.Fatal(err) + var errMap = make(map[string]string) + if true { + t.Run("Reader-4", func(t *testing.T) { + newFn := func() (*Decoder, error) { + return NewReader(nil, WithDecoderConcurrency(4)) + } + testDecoderFileBad(t, "testdata/bad.zip", newFn, errMap) + + }) + t.Run("Reader-1", func(t *testing.T) { + newFn := func() (*Decoder, error) { + return NewReader(nil, WithDecoderConcurrency(1)) + } + testDecoderFileBad(t, "testdata/bad.zip", newFn, errMap) + }) } - testDecoderDecodeAllError(t, "testdata/bad.zip", dec) + t.Run("DecodeAll", func(t *testing.T) { + defer timeout(10 * time.Second)() + dec, err := NewReader(nil) + if err != nil { + t.Fatal(err) + } + testDecoderDecodeAllError(t, "testdata/bad.zip", dec, errMap) + }) } func TestNewDecoderLarge(t *testing.T) { - testDecoderFile(t, "testdata/large.zip") + newFn := func() (*Decoder, error) { + return NewReader(nil) + } + testDecoderFile(t, "testdata/large.zip", newFn) dec, err := NewReader(nil) if err != nil { t.Fatal(err) @@ -425,7 +458,7 @@ func TestNewReaderRead(t *testing.T) { } func TestNewDecoderBig(t *testing.T) { - if testing.Short() { + if testing.Short() || isRaceTest { t.SkipNow() } file := "testdata/zstd-10kfiles.zip" @@ -433,7 +466,10 @@ func TestNewDecoderBig(t *testing.T) { t.Skip("To run extended tests, download https://files.klauspost.com/compress/zstd-10kfiles.zip \n" + "and place it in " + file + "\n" + "Running it requires about 5GB of RAM") } - testDecoderFile(t, file) + newFn := func() (*Decoder, error) { + return NewReader(nil) + } + testDecoderFile(t, file, newFn) dec, err := NewReader(nil) if err != nil { t.Fatal(err) @@ -442,7 +478,7 @@ func TestNewDecoderBig(t 
*testing.T) { } func TestNewDecoderBigFile(t *testing.T) { - if testing.Short() { + if testing.Short() || isRaceTest { t.SkipNow() } file := "testdata/enwik9.zst" @@ -461,6 +497,7 @@ func TestNewDecoderBigFile(t *testing.T) { if err != nil { t.Fatal(err) } + defer dec.Close() n, err := io.Copy(ioutil.Discard, dec) if err != nil { t.Fatal(err) @@ -945,7 +982,7 @@ func TestDecoderMultiFrameReset(t *testing.T) { } } -func testDecoderFile(t *testing.T, fn string) { +func testDecoderFile(t *testing.T, fn string, newDec func() (*Decoder, error)) { data, err := ioutil.ReadFile(fn) if err != nil { t.Fatal(err) @@ -967,7 +1004,7 @@ func testDecoderFile(t *testing.T, fn string) { want[tt.Name+".zst"], _ = ioutil.ReadAll(r) } - dec, err := NewReader(nil) + dec, err := newDec() if err != nil { t.Error(err) return @@ -978,24 +1015,49 @@ func testDecoderFile(t *testing.T, fn string) { continue } t.Run("Reader-"+tt.Name, func(t *testing.T) { + defer timeout(10 * time.Second)() r, err := tt.Open() if err != nil { t.Error(err) return } - defer r.Close() - err = dec.Reset(r) + data, err := ioutil.ReadAll(r) + r.Close() if err != nil { t.Error(err) return } - got, err := ioutil.ReadAll(dec) + err = dec.Reset(ioutil.NopCloser(bytes.NewBuffer(data))) + if err != nil { + t.Error(err) + return + } + var got []byte + var gotError error + var wg sync.WaitGroup + wg.Add(1) + go func() { + got, gotError = ioutil.ReadAll(dec) + wg.Done() + }() + + // This decode should not interfere with the stream... 
+ gotDecAll, err := dec.DecodeAll(data, nil) if err != nil { t.Error(err) if err != ErrCRCMismatch { + wg.Wait() return } } + wg.Wait() + if gotError != nil { + t.Error(err) + if err != ErrCRCMismatch { + return + } + } + wantB := want[tt.Name] if !bytes.Equal(wantB, got) { if len(wantB)+len(got) < 1000 { @@ -1015,11 +1077,93 @@ func testDecoderFile(t *testing.T, fn string) { t.Error("Output mismatch") return } + if !bytes.Equal(wantB, gotDecAll) { + if len(wantB)+len(got) < 1000 { + t.Logf(" got: %v\nwant: %v", got, wantB) + } else { + fileName, _ := filepath.Abs(filepath.Join("testdata", t.Name()+"-want.bin")) + _ = os.MkdirAll(filepath.Dir(fileName), os.ModePerm) + err := ioutil.WriteFile(fileName, wantB, os.ModePerm) + t.Log("Wrote file", fileName, err) + + fileName, _ = filepath.Abs(filepath.Join("testdata", t.Name()+"-got.bin")) + _ = os.MkdirAll(filepath.Dir(fileName), os.ModePerm) + err = ioutil.WriteFile(fileName, got, os.ModePerm) + t.Log("Wrote file", fileName, err) + } + t.Logf("Length, want: %d, got: %d", len(wantB), len(got)) + t.Error("DecodeAll Output mismatch") + } t.Log(len(got), "bytes returned, matches input, ok!") }) } } +func testDecoderFileBad(t *testing.T, fn string, newDec func() (*Decoder, error), errMap map[string]string) { + data, err := ioutil.ReadFile(fn) + if err != nil { + t.Fatal(err) + } + zr, err := zip.NewReader(bytes.NewReader(data), int64(len(data))) + if err != nil { + t.Fatal(err) + } + var want = make(map[string][]byte) + for _, tt := range zr.File { + if strings.HasSuffix(tt.Name, ".zst") { + continue + } + r, err := tt.Open() + if err != nil { + t.Fatal(err) + return + } + want[tt.Name+".zst"], _ = ioutil.ReadAll(r) + } + + dec, err := newDec() + if err != nil { + t.Error(err) + return + } + defer dec.Close() + for _, tt := range zr.File { + t.Run(tt.Name, func(t *testing.T) { + defer timeout(10 * time.Second)() + r, err := tt.Open() + if err != nil { + t.Error(err) + return + } + defer r.Close() + err = dec.Reset(r) + if 
err != nil { + t.Error(err) + return + } + got, err := ioutil.ReadAll(dec) + if err == nil { + want := errMap[tt.Name] + if want == "" { + want = "" + } + t.Error("Did not get expected error", want, "- got ", len(got), "bytes") + return + } + if errMap[tt.Name] == "" { + errMap[tt.Name] = err.Error() + } else { + want := errMap[tt.Name] + if want != err.Error() { + t.Errorf("error mismatch, prev run got %s, now got %s", want, err.Error()) + } + return + } + t.Log("got error", err) + }) + } +} + func BenchmarkDecoder_DecoderSmall(b *testing.B) { fn := "testdata/benchdecoder.zip" data, err := ioutil.ReadFile(fn) @@ -1128,100 +1272,108 @@ func BenchmarkDecoder_DecodeAll(b *testing.B) { } } -func BenchmarkDecoder_DecodeAllParallel(b *testing.B) { - fn := "testdata/benchdecoder.zip" - data, err := ioutil.ReadFile(fn) - if err != nil { - b.Fatal(err) - } - zr, err := zip.NewReader(bytes.NewReader(data), int64(len(data))) - if err != nil { - b.Fatal(err) - } - dec, err := NewReader(nil) - if err != nil { - b.Fatal(err) - return - } - defer dec.Close() - for _, tt := range zr.File { - if !strings.HasSuffix(tt.Name, ".zst") { - continue +func BenchmarkDecoder_DecodeAllFiles(b *testing.B) { + filepath.Walk("../testdata/", func(path string, info os.FileInfo, err error) error { + if info.IsDir() || info.Size() < 100 { + return nil } - b.Run(tt.Name, func(b *testing.B) { - r, err := tt.Open() - if err != nil { - b.Fatal(err) - } - defer r.Close() - in, err := ioutil.ReadAll(r) + b.Run(filepath.Base(path), func(b *testing.B) { + raw, err := ioutil.ReadFile(path) if err != nil { - b.Fatal(err) - } - got, err := dec.DecodeAll(in, nil) - if err != nil { - b.Fatal(err) + b.Error(err) } - b.SetBytes(int64(len(got))) - b.ReportAllocs() - b.ResetTimer() - b.RunParallel(func(pb *testing.PB) { - got := make([]byte, len(got)) - for pb.Next() { - _, err = dec.DecodeAll(in, got[:0]) + for i := SpeedFastest; i <= SpeedBestCompression; i++ { + if testing.Short() && i > SpeedFastest { + break 
+ } + b.Run(i.String(), func(b *testing.B) { + enc, err := NewWriter(nil, WithEncoderLevel(i), WithSingleSegment(true)) if err != nil { - b.Fatal(err) + b.Error(err) } - } - }) + encoded := enc.EncodeAll(raw, nil) + if err != nil { + b.Error(err) + } + dec, err := NewReader(nil, WithDecoderConcurrency(1)) + if err != nil { + b.Error(err) + } + decoded, err := dec.DecodeAll(encoded, nil) + if err != nil { + b.Error(err) + } + b.SetBytes(int64(len(raw))) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + decoded, err = dec.DecodeAll(encoded, decoded[:0]) + if err != nil { + b.Error(err) + } + } + b.ReportMetric(100*float64(len(encoded))/float64(len(raw)), "pct") + }) + } }) - } + return nil + }) } -/* -func BenchmarkDecoder_DecodeAllCgo(b *testing.B) { - fn := "testdata/benchdecoder.zip" - data, err := ioutil.ReadFile(fn) - if err != nil { - b.Fatal(err) - } - zr, err := zip.NewReader(bytes.NewReader(data), int64(len(data))) - if err != nil { - b.Fatal(err) - } - for _, tt := range zr.File { - if !strings.HasSuffix(tt.Name, ".zst") { - continue +func BenchmarkDecoder_DecodeAllFilesP(b *testing.B) { + filepath.Walk("../testdata/", func(path string, info os.FileInfo, err error) error { + if info.IsDir() || info.Size() < 100 { + return nil } - b.Run(tt.Name, func(b *testing.B) { - tt := tt - r, err := tt.Open() - if err != nil { - b.Fatal(err) - } - defer r.Close() - in, err := ioutil.ReadAll(r) - if err != nil { - b.Fatal(err) - } - got, err := zstd.Decompress(nil, in) + b.Run(filepath.Base(path), func(b *testing.B) { + raw, err := ioutil.ReadFile(path) if err != nil { - b.Fatal(err) + b.Error(err) } - b.SetBytes(int64(len(got))) - b.ReportAllocs() - b.ResetTimer() - for i := 0; i < b.N; i++ { - got, err = zstd.Decompress(got, in) - if err != nil { - b.Fatal(err) + for i := SpeedFastest; i <= SpeedBestCompression; i++ { + if testing.Short() && i > SpeedFastest { + break } + b.Run(i.String(), func(b *testing.B) { + enc, err := NewWriter(nil, 
WithEncoderLevel(i), WithSingleSegment(true)) + if err != nil { + b.Error(err) + } + encoded := enc.EncodeAll(raw, nil) + if err != nil { + b.Error(err) + } + dec, err := NewReader(nil, WithDecoderConcurrency(0)) + if err != nil { + b.Error(err) + } + _, err = dec.DecodeAll(encoded, nil) + if err != nil { + b.Error(err) + } + + b.SetBytes(int64(len(raw))) + b.ReportAllocs() + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { + buf := make([]byte, len(raw)) + var err error + for pb.Next() { + buf, err = dec.DecodeAll(encoded, buf[:0]) + if err != nil { + b.Error(err) + } + } + }) + b.ReportMetric(100*float64(len(encoded))/float64(len(raw)), "pct") + }) } }) - } + return nil + }) } -func BenchmarkDecoder_DecodeAllParallelCgo(b *testing.B) { +func BenchmarkDecoder_DecodeAllParallel(b *testing.B) { fn := "testdata/benchdecoder.zip" data, err := ioutil.ReadFile(fn) if err != nil { @@ -1231,6 +1383,12 @@ func BenchmarkDecoder_DecodeAllParallelCgo(b *testing.B) { if err != nil { b.Fatal(err) } + dec, err := NewReader(nil, WithDecoderConcurrency(runtime.GOMAXPROCS(0))) + if err != nil { + b.Fatal(err) + return + } + defer dec.Close() for _, tt := range zr.File { if !strings.HasSuffix(tt.Name, ".zst") { continue @@ -1245,7 +1403,7 @@ func BenchmarkDecoder_DecodeAllParallelCgo(b *testing.B) { if err != nil { b.Fatal(err) } - got, err := zstd.Decompress(nil, in) + got, err := dec.DecodeAll(in, nil) if err != nil { b.Fatal(err) } @@ -1255,7 +1413,7 @@ func BenchmarkDecoder_DecodeAllParallelCgo(b *testing.B) { b.RunParallel(func(pb *testing.PB) { got := make([]byte, len(got)) for pb.Next() { - got, err = zstd.Decompress(got, in) + _, err = dec.DecodeAll(in, got[:0]) if err != nil { b.Fatal(err) } @@ -1265,63 +1423,6 @@ func BenchmarkDecoder_DecodeAllParallelCgo(b *testing.B) { } } -func BenchmarkDecoderSilesiaCgo(b *testing.B) { - fn := "testdata/silesia.tar.zst" - data, err := ioutil.ReadFile(fn) - if err != nil { - if os.IsNotExist(err) { - b.Skip("Missing 
testdata/silesia.tar.zst") - return - } - b.Fatal(err) - } - dec := zstd.NewReader(bytes.NewBuffer(data)) - n, err := io.Copy(ioutil.Discard, dec) - if err != nil { - b.Fatal(err) - } - - b.SetBytes(n) - b.ReportAllocs() - b.ResetTimer() - for i := 0; i < b.N; i++ { - dec := zstd.NewReader(bytes.NewBuffer(data)) - _, err := io.CopyN(ioutil.Discard, dec, n) - if err != nil { - b.Fatal(err) - } - } -} -func BenchmarkDecoderEnwik9Cgo(b *testing.B) { - fn := "testdata/enwik9-1.zst" - data, err := ioutil.ReadFile(fn) - if err != nil { - if os.IsNotExist(err) { - b.Skip("Missing " + fn) - return - } - b.Fatal(err) - } - dec := zstd.NewReader(bytes.NewBuffer(data)) - n, err := io.Copy(ioutil.Discard, dec) - if err != nil { - b.Fatal(err) - } - - b.SetBytes(n) - b.ReportAllocs() - b.ResetTimer() - for i := 0; i < b.N; i++ { - dec := zstd.NewReader(bytes.NewBuffer(data)) - _, err := io.CopyN(ioutil.Discard, dec, n) - if err != nil { - b.Fatal(err) - } - } -} - -*/ - func BenchmarkDecoderSilesia(b *testing.B) { fn := "testdata/silesia.tar.zst" data, err := ioutil.ReadFile(fn) @@ -1362,7 +1463,7 @@ func BenchmarkDecoderSilesia(b *testing.B) { } func BenchmarkDecoderEnwik9(b *testing.B) { - fn := "testdata/enwik9-1.zst" + fn := "testdata/enwik8.zst" data, err := ioutil.ReadFile(fn) if err != nil { if os.IsNotExist(err) { @@ -1476,7 +1577,7 @@ func testDecoderDecodeAll(t *testing.T, fn string, dec *Decoder) { }() } -func testDecoderDecodeAllError(t *testing.T, fn string, dec *Decoder) { +func testDecoderDecodeAllError(t *testing.T, fn string, dec *Decoder, errMap map[string]string) { data, err := ioutil.ReadFile(fn) if err != nil { t.Fatal(err) @@ -1493,9 +1594,8 @@ func testDecoderDecodeAllError(t *testing.T, fn string, dec *Decoder) { continue } wg.Add(1) - t.Run("DecodeAll-"+tt.Name, func(t *testing.T) { + t.Run(tt.Name, func(t *testing.T) { defer wg.Done() - t.Parallel() r, err := tt.Open() if err != nil { t.Fatal(err) @@ -1504,10 +1604,20 @@ func 
testDecoderDecodeAllError(t *testing.T, fn string, dec *Decoder) { if err != nil { t.Fatal(err) } - // make a buffer that is too small. - _, err = dec.DecodeAll(in, make([]byte, 0, 200)) + // make a buffer that is small. + got, err := dec.DecodeAll(in, make([]byte, 0, 20)) if err == nil { - t.Error("Did not get expected error") + t.Error("Did not get expected error, got", len(got), "bytes") + return + } + if errMap[tt.Name] == "" { + t.Error("cannot check error") + } else { + want := errMap[tt.Name] + if want != err.Error() { + t.Errorf("error mismatch, prev run got %s, now got %s", want, err.Error()) + } + return } }) } @@ -1674,6 +1784,9 @@ func TestResetNil(t *testing.T) { } func timeout(after time.Duration) (cancel func()) { + if isRaceTest { + return func() {} + } c := time.After(after) cc := make(chan struct{}) go func() { diff --git a/zstd/dict_test.go b/zstd/dict_test.go index bbc6b8d343..424107052a 100644 --- a/zstd/dict_test.go +++ b/zstd/dict_test.go @@ -411,3 +411,69 @@ func TestDecoder_MoreDicts(t *testing.T) { }) } } + +func TestDecoder_MoreDicts2(t *testing.T) { + // All files have CRC + // https://files.klauspost.com/compress/zstd-dict-tests.zip + fn := "testdata/zstd-dict-tests.zip" + data, err := ioutil.ReadFile(fn) + if err != nil { + t.Skip("extended dict test not found.") + } + zr, err := zip.NewReader(bytes.NewReader(data), int64(len(data))) + if err != nil { + t.Fatal(err) + } + + var dicts [][]byte + for _, tt := range zr.File { + if !strings.HasSuffix(tt.Name, ".dict") { + continue + } + func() { + r, err := tt.Open() + if err != nil { + t.Fatal(err) + } + defer r.Close() + in, err := ioutil.ReadAll(r) + if err != nil { + t.Fatal(err) + } + dicts = append(dicts, in) + }() + } + dec, err := NewReader(nil, WithDecoderConcurrency(2), WithDecoderDicts(dicts...)) + if err != nil { + t.Fatal(err) + return + } + defer dec.Close() + for i, tt := range zr.File { + if !strings.HasSuffix(tt.Name, ".zst") { + continue + } + if testing.Short() && i > 50 
{ + continue + } + t.Run("decodeall-"+tt.Name, func(t *testing.T) { + r, err := tt.Open() + if err != nil { + t.Fatal(err) + } + defer r.Close() + in, err := ioutil.ReadAll(r) + if err != nil { + t.Fatal(err) + } + got, err := dec.DecodeAll(in, nil) + if err != nil { + t.Fatal(err) + } + _, err = dec.DecodeAll(in, got[:0]) + if err != nil { + t.Fatal(err) + } + }) + } +} diff --git a/zstd/encoder_test.go b/zstd/encoder_test.go index b437225e4f..12bf207a15 100644 --- a/zstd/encoder_test.go +++ b/zstd/encoder_test.go @@ -497,8 +497,8 @@ func TestEncoder_EncoderHTML(t *testing.T) { } func TestEncoder_EncoderEnwik9(t *testing.T) { - testEncoderRoundtrip(t, "./testdata/enwik9.zst", []byte{0x28, 0xfa, 0xf4, 0x30, 0xca, 0x4b, 0x64, 0x12}) - testEncoderRoundtripWriter(t, "./testdata/enwik9.zst", []byte{0x28, 0xfa, 0xf4, 0x30, 0xca, 0x4b, 0x64, 0x12}) + //testEncoderRoundtrip(t, "./testdata/enwik9.zst", []byte{0x28, 0xfa, 0xf4, 0x30, 0xca, 0x4b, 0x64, 0x12}) + //testEncoderRoundtripWriter(t, "./testdata/enwik9.zst", []byte{0x28, 0xfa, 0xf4, 0x30, 0xca, 0x4b, 0x64, 0x12}) } // test roundtrip using io.ReaderFrom interface. diff --git a/zstd/framedec.go b/zstd/framedec.go index 989c79f8c3..29c3176b05 100644 --- a/zstd/framedec.go +++ b/zstd/framedec.go @@ -8,23 +8,17 @@ import ( "bytes" "encoding/hex" "errors" - "hash" "io" - "sync" "github.com/klauspost/compress/zstd/internal/xxhash" ) type frameDec struct { - o decoderOptions - crc hash.Hash64 - offset int64 + o decoderOptions + crc *xxhash.Digest WindowSize uint64 - // In order queue of blocks being decoded. - decoding chan *blockDec - // Frame history passed between blocks history history @@ -34,15 +28,10 @@ type frameDec struct { bBuf byteBuf FrameContentSize uint64 - frameDone sync.WaitGroup DictionaryID *uint32 HasCheckSum bool SingleSegment bool - - // asyncRunning indicates whether the async routine processes input on 'decoding'. 
- asyncRunningMu sync.Mutex - asyncRunning bool } const ( @@ -229,9 +218,10 @@ func (d *frameDec) reset(br byteBuffer) error { d.FrameContentSize = uint64(d1) | (uint64(d2) << 32) } if debugDecoder { - println("field size bits:", v, "fcsSize:", fcsSize, "FrameContentSize:", d.FrameContentSize, hex.EncodeToString(b[:fcsSize]), "singleseg:", d.SingleSegment, "window:", d.WindowSize) + println("Read FCS:", d.FrameContentSize) } } + // Move this to shared. d.HasCheckSum = fhd&(1<<2) != 0 if d.HasCheckSum { @@ -264,10 +254,16 @@ func (d *frameDec) reset(br byteBuffer) error { } d.history.windowSize = int(d.WindowSize) if d.o.lowMem && d.history.windowSize < maxBlockSize { - d.history.maxSize = d.history.windowSize * 2 + d.history.allocFrameBuffer = d.history.windowSize * 2 + // TODO: Maybe use FrameContent size } else { - d.history.maxSize = d.history.windowSize + maxBlockSize + d.history.allocFrameBuffer = d.history.windowSize + maxBlockSize } + + if debugDecoder { + println("Frame: Dict:", d.DictionaryID, "FrameContentSize:", d.FrameContentSize, "singleseg:", d.SingleSegment, "window:", d.WindowSize, "crc:", d.HasCheckSum) + } + // history contains input - maybe we do something d.rawInput = br return nil @@ -276,49 +272,18 @@ func (d *frameDec) reset(br byteBuffer) error { // next will start decoding the next block from stream. func (d *frameDec) next(block *blockDec) error { if debugDecoder { - printf("decoding new block %p:%p", block, block.data) + println("decoding new block") } err := block.reset(d.rawInput, d.WindowSize) if err != nil { println("block error:", err) // Signal the frame decoder we have a problem. 
- d.sendErr(block, err) + block.sendErr(err) return err } - block.input <- struct{}{} - if debugDecoder { - println("next block:", block) - } - d.asyncRunningMu.Lock() - defer d.asyncRunningMu.Unlock() - if !d.asyncRunning { - return nil - } - if block.Last { - // We indicate the frame is done by sending io.EOF - d.decoding <- block - return io.EOF - } - d.decoding <- block return nil } -// sendEOF will queue an error block on the frame. -// This will cause the frame decoder to return when it encounters the block. -// Returns true if the decoder was added. -func (d *frameDec) sendErr(block *blockDec, err error) bool { - d.asyncRunningMu.Lock() - defer d.asyncRunningMu.Unlock() - if !d.asyncRunning { - return false - } - - println("sending error", err.Error()) - block.sendErr(err) - d.decoding <- block - return true -} - // checkCRC will check the checksum if the frame has one. // Will return ErrCRCMismatch if crc check failed, otherwise nil. func (d *frameDec) checkCRC() error { @@ -340,7 +305,7 @@ func (d *frameDec) checkCRC() error { return err } - if !bytes.Equal(tmp[:], want) { + if !bytes.Equal(tmp[:], want) && !ignoreCRC { if debugDecoder { println("CRC Check Failed:", tmp[:], "!=", want) } @@ -352,131 +317,13 @@ func (d *frameDec) checkCRC() error { return nil } -func (d *frameDec) initAsync() { - if !d.o.lowMem && !d.SingleSegment { - // set max extra size history to 2MB. - d.history.maxSize = d.history.windowSize + maxBlockSize - } - // re-alloc if more than one extra block size. - if d.o.lowMem && cap(d.history.b) > d.history.maxSize+maxBlockSize { - d.history.b = make([]byte, 0, d.history.maxSize) - } - if cap(d.history.b) < d.history.maxSize { - d.history.b = make([]byte, 0, d.history.maxSize) - } - if cap(d.decoding) < d.o.concurrent { - d.decoding = make(chan *blockDec, d.o.concurrent) - } - if debugDecoder { - h := d.history - printf("history init. 
len: %d, cap: %d", len(h.b), cap(h.b)) - } - d.asyncRunningMu.Lock() - d.asyncRunning = true - d.asyncRunningMu.Unlock() -} - -// startDecoder will start decoding blocks and write them to the writer. -// The decoder will stop as soon as an error occurs or at end of frame. -// When the frame has finished decoding the *bufio.Reader -// containing the remaining input will be sent on frameDec.frameDone. -func (d *frameDec) startDecoder(output chan decodeOutput) { - written := int64(0) - - defer func() { - d.asyncRunningMu.Lock() - d.asyncRunning = false - d.asyncRunningMu.Unlock() - - // Drain the currently decoding. - d.history.error = true - flushdone: - for { - select { - case b := <-d.decoding: - b.history <- &d.history - output <- <-b.result - default: - break flushdone - } - } - println("frame decoder done, signalling done") - d.frameDone.Done() - }() - // Get decoder for first block. - block := <-d.decoding - block.history <- &d.history - for { - var next *blockDec - // Get result - r := <-block.result - if r.err != nil { - println("Result contained error", r.err) - output <- r - return - } - if debugDecoder { - println("got result, from ", d.offset, "to", d.offset+int64(len(r.b))) - d.offset += int64(len(r.b)) - } - if !block.Last { - // Send history to next block - select { - case next = <-d.decoding: - if debugDecoder { - println("Sending ", len(d.history.b), "bytes as history") - } - next.history <- &d.history - default: - // Wait until we have sent the block, so - // other decoders can potentially get the decoder. - next = nil - } - } - - // Add checksum, async to decoding. 
- if d.HasCheckSum { - n, err := d.crc.Write(r.b) - if err != nil { - r.err = err - if n != len(r.b) { - r.err = io.ErrShortWrite - } - output <- r - return - } - } - written += int64(len(r.b)) - if d.SingleSegment && uint64(written) > d.FrameContentSize { - println("runDecoder: single segment and", uint64(written), ">", d.FrameContentSize) - r.err = ErrFrameSizeExceeded - output <- r - return - } - if block.Last { - r.err = d.checkCRC() - output <- r - return - } - output <- r - if next == nil { - // There was no decoder available, we wait for one now that we have sent to the writer. - if debugDecoder { - println("Sending ", len(d.history.b), " bytes as history") - } - next = <-d.decoding - next.history <- &d.history - } - block = next - } -} - // runDecoder will create a sync decoder that will decode a block of data. func (d *frameDec) runDecoder(dst []byte, dec *blockDec) ([]byte, error) { saved := d.history.b // We use the history for output to avoid copying it. d.history.b = dst + d.history.ignoreBuffer = len(dst) // Store input length, so we only check new data. 
crcStart := len(dst) var err error @@ -489,7 +336,7 @@ func (d *frameDec) runDecoder(dst []byte, dec *blockDec) ([]byte, error) { println("next block:", dec) } err = dec.decodeBuf(&d.history) - if err != nil || dec.Last { + if err != nil { break } if uint64(len(d.history.b)) > d.o.maxDecodedSize { @@ -501,10 +348,23 @@ func (d *frameDec) runDecoder(dst []byte, dec *blockDec) ([]byte, error) { err = ErrFrameSizeExceeded break } + if d.FrameContentSize > 0 && uint64(len(d.history.b)-crcStart) > d.FrameContentSize { + println("runDecoder: FrameContentSize exceeded", uint64(len(d.history.b)-crcStart), ">", d.FrameContentSize) + err = ErrFrameSizeExceeded + break + } + if dec.Last { + break + } + if debugDecoder && d.FrameContentSize > 0 { + println("runDecoder: FrameContentSize", uint64(len(d.history.b)-crcStart), "<=", d.FrameContentSize) + } } dst = d.history.b if err == nil { - if d.HasCheckSum { + if d.FrameContentSize > 0 && uint64(len(d.history.b)-crcStart) != d.FrameContentSize { + err = ErrFrameSizeMismatch + } else if d.HasCheckSum { var n int n, err = d.crc.Write(dst[crcStart:]) if err == nil { diff --git a/zstd/fuzz.go b/zstd/fuzz.go new file mode 100644 index 0000000000..fda8a74228 --- /dev/null +++ b/zstd/fuzz.go @@ -0,0 +1,11 @@ +//go:build gofuzz +// +build gofuzz + +// Copyright 2019+ Klaus Post. All rights reserved. +// License information can be found in the LICENSE file. +// Based on work by Yann Collet, released under BSD License. + +package zstd + +// ignoreCRC can be used for fuzz testing to ignore CRC values... +const ignoreCRC = true diff --git a/zstd/fuzz_none.go b/zstd/fuzz_none.go new file mode 100644 index 0000000000..0515b201cc --- /dev/null +++ b/zstd/fuzz_none.go @@ -0,0 +1,11 @@ +//go:build !gofuzz +// +build !gofuzz + +// Copyright 2019+ Klaus Post. All rights reserved. +// License information can be found in the LICENSE file. +// Based on work by Yann Collet, released under BSD License. 
+ +package zstd + +// ignoreCRC can be used for fuzz testing to ignore CRC values... +const ignoreCRC = false diff --git a/zstd/history.go b/zstd/history.go index f783e32d25..28b40153cc 100644 --- a/zstd/history.go +++ b/zstd/history.go @@ -10,20 +10,31 @@ import ( // history contains the information transferred between blocks. type history struct { - b []byte - huffTree *huff0.Scratch - recentOffsets [3]int + // Literal decompression + huffTree *huff0.Scratch + + // Sequence decompression decoders sequenceDecs - windowSize int - maxSize int - error bool - dict *dict + recentOffsets [3]int + + // History buffer... + b []byte + + // ignoreBuffer is meant to ignore a number of bytes + // when checking for matches in history + ignoreBuffer int + + windowSize int + allocFrameBuffer int // needed? + error bool + dict *dict } // reset will reset the history to initial state of a frame. // The history must already have been initialized to the desired size. func (h *history) reset() { h.b = h.b[:0] + h.ignoreBuffer = 0 h.error = false h.recentOffsets = [3]int{1, 4, 8} if f := h.decoders.litLengths.fse; f != nil && !f.preDefined { @@ -35,7 +46,7 @@ func (h *history) reset() { if f := h.decoders.matchLengths.fse; f != nil && !f.preDefined { fseDecoderPool.Put(f) } - h.decoders = sequenceDecs{} + h.decoders = sequenceDecs{br: h.decoders.br} if h.huffTree != nil { if h.dict == nil || h.dict.litEnc != h.huffTree { huffDecoderPool.Put(h.huffTree) @@ -54,6 +65,7 @@ func (h *history) setDict(dict *dict) { h.decoders.litLengths = dict.llDec h.decoders.offsets = dict.ofDec h.decoders.matchLengths = dict.mlDec + h.decoders.dict = dict.content h.recentOffsets = dict.offsets h.huffTree = dict.litEnc } @@ -83,6 +95,24 @@ func (h *history) append(b []byte) { copy(h.b[h.windowSize-len(b):], b) } +// ensureBlock will ensure there is space for at least one block... 
+func (h *history) ensureBlock() { + if cap(h.b) < h.allocFrameBuffer { + h.b = make([]byte, 0, h.allocFrameBuffer) + return + } + + avail := cap(h.b) - len(h.b) + if avail >= h.windowSize || avail > maxCompressedBlockSize { + return + } + // Move data down so we only have window size left. + // We know we have less than window size in b at this point. + discard := len(h.b) - h.windowSize + copy(h.b, h.b[discard:]) + h.b = h.b[:h.windowSize] +} + // append bytes to history without ever discarding anything. func (h *history) appendKeep(b []byte) { h.b = append(h.b, b...) diff --git a/zstd/seqdec.go b/zstd/seqdec.go index bc731e4cb6..e367281465 100644 --- a/zstd/seqdec.go +++ b/zstd/seqdec.go @@ -20,6 +20,10 @@ type seq struct { llCode, mlCode, ofCode uint8 } +type seqVals struct { + ll, ml, mo int +} + func (s seq) String() string { if s.offset <= 3 { if s.offset == 0 { @@ -61,16 +65,18 @@ type sequenceDecs struct { offsets sequenceDec matchLengths sequenceDec prevOffset [3]int - hist []byte dict []byte literals []byte out []byte + nSeqs int + br *bitReader + seqSize int windowSize int maxBits uint8 } // initialize all 3 decoders from the stream input. 
-func (s *sequenceDecs) initialize(br *bitReader, hist *history, literals, out []byte) error { +func (s *sequenceDecs) initialize(br *bitReader, hist *history, out []byte) error { if err := s.litLengths.init(br); err != nil { return errors.New("litLengths:" + err.Error()) } @@ -80,8 +86,7 @@ func (s *sequenceDecs) initialize(br *bitReader, hist *history, literals, out [] if err := s.matchLengths.init(br); err != nil { return errors.New("matchLengths:" + err.Error()) } - s.literals = literals - s.hist = hist.b + s.br = br s.prevOffset = hist.recentOffsets s.maxBits = s.litLengths.fse.maxBits + s.offsets.fse.maxBits + s.matchLengths.fse.maxBits s.windowSize = hist.windowSize @@ -94,11 +99,238 @@ func (s *sequenceDecs) initialize(br *bitReader, hist *history, literals, out [] } // decode sequences from the stream with the provided history. -func (s *sequenceDecs) decode(seqs int, br *bitReader, hist []byte) error { +func (s *sequenceDecs) decode(seqs []seqVals) error { + br := s.br + + // Grab full sizes tables, to avoid bounds checks. + llTable, mlTable, ofTable := s.litLengths.fse.dt[:maxTablesize], s.matchLengths.fse.dt[:maxTablesize], s.offsets.fse.dt[:maxTablesize] + llState, mlState, ofState := s.litLengths.state.state, s.matchLengths.state.state, s.offsets.state.state + s.seqSize = 0 + litRemain := len(s.literals) + + for i := range seqs { + var ll, mo, ml int + if br.off > 4+((maxOffsetBits+16+16)>>3) { + // inlined function: + // ll, mo, ml = s.nextFast(br, llState, mlState, ofState) + + // Final will not read from stream. + var llB, mlB, moB uint8 + ll, llB = llState.final() + ml, mlB = mlState.final() + mo, moB = ofState.final() + + // extra bits are stored in reverse order. 
+ br.fillFast() + mo += br.getBits(moB) + if s.maxBits > 32 { + br.fillFast() + } + ml += br.getBits(mlB) + ll += br.getBits(llB) + + if moB > 1 { + s.prevOffset[2] = s.prevOffset[1] + s.prevOffset[1] = s.prevOffset[0] + s.prevOffset[0] = mo + } else { + // mo = s.adjustOffset(mo, ll, moB) + // Inlined for rather big speedup + if ll == 0 { + // There is an exception though, when current sequence's literals_length = 0. + // In this case, repeated offsets are shifted by one, so an offset_value of 1 means Repeated_Offset2, + // an offset_value of 2 means Repeated_Offset3, and an offset_value of 3 means Repeated_Offset1 - 1_byte. + mo++ + } + + if mo == 0 { + mo = s.prevOffset[0] + } else { + var temp int + if mo == 3 { + temp = s.prevOffset[0] - 1 + } else { + temp = s.prevOffset[mo] + } + + if temp == 0 { + // 0 is not valid; input is corrupted; force offset to 1 + println("WARNING: temp was 0") + temp = 1 + } + + if mo != 1 { + s.prevOffset[2] = s.prevOffset[1] + } + s.prevOffset[1] = s.prevOffset[0] + s.prevOffset[0] = temp + mo = temp + } + } + br.fillFast() + } else { + if br.overread() { + printf("reading sequence %d, exceeded available data\n", i) + return io.ErrUnexpectedEOF + } + ll, mo, ml = s.next(br, llState, mlState, ofState) + br.fill() + } + + if debugSequences { + println("Seq", i, "Litlen:", ll, "mo:", mo, "(abs) ml:", ml) + } + // Evaluate. + // We might be doing this async, so do it early. 
+ if mo == 0 && ml > 0 { + return fmt.Errorf("zero matchoff and matchlen (%d) > 0", ml) + } + if ml > maxMatchLen { + return fmt.Errorf("match len (%d) bigger than max allowed length", ml) + } + s.seqSize += ll + ml + if s.seqSize > maxBlockSize { + return fmt.Errorf("output (%d) bigger than max block size", s.seqSize) + } + litRemain -= ll + if litRemain < 0 { + return fmt.Errorf("unexpected literal count, want %d bytes, but only %d is available", ll, litRemain+ll) + } + seqs[i] = seqVals{ + ll: ll, + ml: ml, + mo: mo, + } + if i == len(seqs)-1 { + // This is the last sequence, so we shouldn't update state. + break + } + + // Manually inlined, ~ 5-20% faster + // Update all 3 states at once. Approx 20% faster. + nBits := llState.nbBits() + mlState.nbBits() + ofState.nbBits() + if nBits == 0 { + llState = llTable[llState.newState()&maxTableMask] + mlState = mlTable[mlState.newState()&maxTableMask] + ofState = ofTable[ofState.newState()&maxTableMask] + } else { + bits := br.get32BitsFast(nBits) + lowBits := uint16(bits >> ((ofState.nbBits() + mlState.nbBits()) & 31)) + llState = llTable[(llState.newState()+lowBits)&maxTableMask] + + lowBits = uint16(bits >> (ofState.nbBits() & 31)) + lowBits &= bitMask[mlState.nbBits()&15] + mlState = mlTable[(mlState.newState()+lowBits)&maxTableMask] + + lowBits = uint16(bits) & bitMask[ofState.nbBits()&15] + ofState = ofTable[(ofState.newState()+lowBits)&maxTableMask] + } + } + s.seqSize += litRemain + if s.seqSize > maxBlockSize { + return fmt.Errorf("output (%d) bigger than max block size", s.seqSize) + } + err := br.close() + if err != nil { + printf("Closing sequences: %v, %+v\n", err, *br) + } + return err +} + +// execute will execute the decoded sequence with the provided history. +// The sequence must be evaluated before being sent. +func (s *sequenceDecs) execute(seqs []seqVals, hist []byte) error { + // Ensure we have enough output size... 
+ if len(s.out)+s.seqSize > cap(s.out) { + addBytes := s.seqSize + len(s.out) + s.out = append(s.out, make([]byte, addBytes)...) + s.out = s.out[:len(s.out)-addBytes] + } + + if debugDecoder { + printf("Execute %d seqs with hist %d, dict %d, literals: %d bytes\n", len(seqs), len(hist), len(s.dict), len(s.literals)) + } + + for _, seq := range seqs { + // Add literals + s.out = append(s.out, s.literals[:seq.ll]...) + s.literals = s.literals[seq.ll:] + out := s.out + + // Copy form dictionary... + if seq.mo > len(s.out)+len(hist) || seq.mo > s.windowSize { + if len(s.dict) == 0 { + return fmt.Errorf("match offset (%d) bigger than current history (%d)", seq.mo, len(s.out)+len(hist)) + } + + // we may be in dictionary. + dictO := len(s.dict) - (seq.mo - (len(s.out) + len(hist))) + if dictO < 0 || dictO >= len(s.dict) { + return fmt.Errorf("match offset (%d) bigger than current history+dict (%d)", seq.mo, len(s.out)+len(hist)+len(s.dict)) + } + end := dictO + seq.ml + if end > len(s.dict) { + out = append(out, s.dict[dictO:]...) + seq.mo -= len(s.dict) - dictO + seq.ml -= len(s.dict) - dictO + } else { + s.out = append(out, s.dict[dictO:end]...) + continue + } + } + + // Copy from history. + if v := seq.mo - len(s.out); v > 0 { + // v is the start position in history from end. + start := len(hist) - v + if seq.ml > v { + // Some goes into current block. + // Copy remainder of history + out = append(out, hist[start:]...) + seq.mo -= v + seq.ml -= v + } else { + s.out = append(out, hist[start:start+seq.ml]...) + continue + } + } + // We must be in current buffer now + if seq.ml > 0 { + start := len(s.out) - seq.mo + if seq.ml <= len(s.out)-start { + // No overlap + s.out = append(out, s.out[start:start+seq.ml]...) + continue + } else { + // Overlapping copy + // Extend destination slice and copy one byte at the time. + out = out[:len(out)+seq.ml] + src := out[start : start+seq.ml] + // Destination is the space we just added. 
+ dst := out[len(out)-seq.ml:] + dst = dst[:len(src)] + for i := range src { + dst[i] = src[i] + } + } + } + s.out = out + } + // Add final literals + s.out = append(s.out, s.literals...) + + return nil +} + +// decode sequences from the stream with the provided history. +func (s *sequenceDecs) decodeSync(history *history) error { + br := s.br + seqs := s.nSeqs startSize := len(s.out) // Grab full sizes tables, to avoid bounds checks. llTable, mlTable, ofTable := s.litLengths.fse.dt[:maxTablesize], s.matchLengths.fse.dt[:maxTablesize], s.offsets.fse.dt[:maxTablesize] llState, mlState, ofState := s.litLengths.state.state, s.matchLengths.state.state, s.offsets.state.state + hist := history.b[history.ignoreBuffer:] for i := seqs - 1; i >= 0; i-- { if br.overread() { @@ -151,7 +383,7 @@ func (s *sequenceDecs) decode(seqs int, br *bitReader, hist []byte) error { if temp == 0 { // 0 is not valid; input is corrupted; force offset to 1 - println("temp was 0") + println("WARNING: temp was 0") temp = 1 } @@ -233,15 +465,15 @@ func (s *sequenceDecs) decode(seqs int, br *bitReader, hist []byte) error { // TODO: Blocks without history could be made to ignore this completely. if v := mo - len(s.out); v > 0 { // v is the start position in history from end. - start := len(s.hist) - v + start := len(hist) - v if ml > v { // Some goes into current block. // Copy remainder of history - out = append(out, s.hist[start:]...) + out = append(out, hist[start:]...) mo -= v ml -= v } else { - out = append(out, s.hist[start:start+ml]...) + out = append(out, hist[start:start+ml]...) ml = 0 } } @@ -293,7 +525,7 @@ func (s *sequenceDecs) decode(seqs int, br *bitReader, hist []byte) error { // Add final literals s.out = append(s.out, s.literals...) - return nil + return br.close() } // update states, at least 27 bits must be available. 
@@ -457,36 +689,3 @@ func (s *sequenceDecs) adjustOffset(offset, litLen int, offsetB uint8) int { s.prevOffset[0] = temp return temp } - -// mergeHistory will merge history. -func (s *sequenceDecs) mergeHistory(hist *sequenceDecs) (*sequenceDecs, error) { - for i := uint(0); i < 3; i++ { - var sNew, sHist *sequenceDec - switch i { - default: - // same as "case 0": - sNew = &s.litLengths - sHist = &hist.litLengths - case 1: - sNew = &s.offsets - sHist = &hist.offsets - case 2: - sNew = &s.matchLengths - sHist = &hist.matchLengths - } - if sNew.repeat { - if sHist.fse == nil { - return nil, fmt.Errorf("sequence stream %d, repeat requested, but no history", i) - } - continue - } - if sNew.fse == nil { - return nil, fmt.Errorf("sequence stream %d, no fse found", i) - } - if sHist.fse != nil && !sHist.fse.preDefined { - fseDecoderPool.Put(sHist.fse) - } - sHist.fse = sNew.fse - } - return hist, nil -} diff --git a/zstd/testdata/bad.zip b/zstd/testdata/bad.zip index 32ae49986601a8db1eec2bcc50da7a1d6b3545e1..2c283f197e6a1a9baf975fd49e65d32b1ca1b567 100644 GIT binary patch literal 6116 zcmb7I4OCQB9)BaCK#2q>pq9_|TM^&)2e>+@l&O{0jGDR1eP1vKm?36_qT>eIW*gbZ zW}z9bd35cA+GE+LwUMG?$6CYckg1uumZoWH>FHSRz4Lf;-#lOEG{^Hl3=e-F_kaI) z=Kf~(%%tQTq8C9BknU}%7CR*h1AN$!BB+`G`+Su5Do`|hy_Odjt3*v;n99W zRkZl={^5D?>UG~gaBu(wqC%~Y5rmgiM_-fo+*1L<6(sCLHHfGc4g>v;pM2XgfEsAW z$1NJkdwOwmQFl70#2Ept}&k#f% z#cY^I=#hDoUrT``sL&*sHZW1FDe;Y1dg8=R`^mFl@h8tzcRhK(blo&J8S0_JlVBZP zOEq4!ZNK=KC5l>SiNd{)U#~d>j+GAi;@%eP*!7D?TXMsbdj>CFx42_y83_@n5QC{( ztB329Km^Y^37_b-b-D(3J3`ijFd5dJ2QOEhzs;IIJU~5EmW=|-#?|wG+sN7_*Fy`a zFg{!vsBydlphq=8snSVv6VSVZqapR_hkk&AMWDhHXyZbD7LUCp5A9KWM<%{};o9G| zf#%-Y52p?EH;0YXwX?dd_u#8+p#P6;U=z8owA@-b`P3R~915UCjRCUAF*GGghAyfC z&v1qzDjdrzhQvvdz#2eS1=VWvB8ng=3aKw7#01VHl=0}O@&l+NokZ%`to3Y?(un#MD-DlxKR zEyZ$95NFH$zQ?j&0R}763=j-ff$iZHRaXUFrWM+-*d7>M zd}uJ((YV;tomOz%;!Y)wLhkyF5 znxKYJ6os8G2>5-U{>xa4-omqcNJUYHldA&28k$ZsiY!Yi!>f!&3#tq`HBpyXRgLa_ 
z^z&i8XSpY*f~E5CDNQAA4ElZAxT7?hUT+Vx4pbgcj4Eq_&T>EyRguzwLh*t@0iHE@ z9dPhvY_Bdp)Rdo;ycd)mp^og!obNwP{#Pk$UtLyKbEM?0^rRiLl6og4^(K3JGRWom zA9`#%kY@OOkDtw0HVi6H?@<>iO421t0=lHiltBZQRWuE@2k;b2OB$=n96V349cXE4 z`u^A7-dp5tOX;}NTaeN*$6J-s;rH%s%&kbGZDIo+n(Om&WM5_(j3Ke z0;MRtXc$n2qC{DTIua#vk}jx}AdA*^+YW1E$FP~VF1eA7DG)?x>s;UPs*8u(;KdUa zQh=SLIXIVs%CQoVBw7Qy3>1N;01!n<;dz6DPiX5wkF8-^)J8U%0aOlN+A%?yes_ZT!wNC6!;7ZkW2#Ppmju(^~uC zpqs0x=RX;)Wj*zTqD$o7&Q(S0`~73Y;Ss^-FYGyXWZ{CBYMRn#oZOMOe9VT`Kc8J& zIO0FKCkF5DIFY_?=<)g2^na#REPVLw@2Gg`xVNxrTyyU)!GN-#K7Jv)Vd%Hv z`De>A9z56g$@XEFijqFs_j*qHLygO_Zrb9{IdH?{87Vmv&i;AW!lN@5Byanq=GV_F zC(dtZf1)7MdnAbn^Q(pgPmX$b-3`U*FMn70SGDeb@3O@=UfwzKn{&rM3!m83`RUy& z9v*dg+W$&6&Tr0tf9#^Io6?$3f0Z@mTl&DER}VKS^TInyFAX>^uO5}Zmj3pR^V=tl zeE@Uh@_`}I*V;Q*Y3zmqlV)T#3+%_|=$Djc)$qvs^4jeX(ZVW$0-)aI^|O%sn_uB-1} zr;lmb9C`EfEaUUrA6PfHmg-E^HX3t>&&A7f}7mYmwo zaD~vOwhS*CZc)q0;LKPEZ|X-NgL(b55|$Y}h!a`_M`GS_wq9qB!pi4ja9a{`sym|-R-Xiew7F^< z+|7YALSYB@Iw?>HayoFPChTCR8{Fx@Ek9w3-ynsSK$^nl+@lj_{tHZci3BOk48P}a z2}-zYdmwWnvB}Qt7Mif<+&+m#D9r30nXu#o07QZuU;qFB diff --git a/zstd/testdata/good.zip b/zstd/testdata/good.zip index 1b4c16cf8ec44946ca9f02cae9674e00a8fdda64..f6c230bc5d5b3e9d2d4ff61de33f30f411c2bc1a 100644 GIT binary patch delta 138 zcmX>lw?J&eV&=)5Y(kTjSVA}7W?90x*@;7kd9oCr5F_X0MsD%RJUqHg$CxKu^GHrU z#-qTvOcjs$S-MWG zCUWc7+!LK^Z+K5l>#Tp%BIYip^<~Y*pe+v*mJGtfdW$o;^_QPx4yFD!O+r2o=Y-*?ewMn~naAp6ZhB?a_PSx|SSg`^Y z_7G2fn0{f~QlMuR0I?2+r|NP03KU|D3mD@6gW@O5fPullz<`B;Ja6Lkr3;zf1ST<{ zPZ=2)Dg*v6*t~{Sk69Q?%AEX`L)-?GDg(S3nM4?HCrDtp0D%G!pJIk38j!xpXE^kj zI9MkC;V`WSB|&I%N-PHj4#H6E$rNNHBiPVX#Ku@wDZ4kH2Le@6fj#H2E z^5k4jBgSu&7jT*}X|Ybe0i+^XCkt>{G0kF~?89Zwbewf^E0;ABBirOdTxRvUYzz!( zMTxno@x|Gh1?XLx@I;3&HV{~B5Nj;N!K^@sFfhmf;TIsw Ifdj+?0IWn~00000 diff --git a/zstd/zstd.go b/zstd/zstd.go index ef1d49a009..0b0c2571dd 100644 --- a/zstd/zstd.go +++ b/zstd/zstd.go @@ -75,6 +75,10 @@ var ( // This is only returned if SingleSegment is specified on the frame. 
ErrFrameSizeExceeded = errors.New("frame size exceeded") + // ErrFrameSizeMismatch is returned if the stated frame size does not match the expected size. + // This is only returned if SingleSegment is specified on the frame. + ErrFrameSizeMismatch = errors.New("frame size does not match size on stream") + // ErrCRCMismatch is returned if CRC mismatches. ErrCRCMismatch = errors.New("CRC check failed")