From ddc97f99eb33a236f14a303e0fa688919d55e33b Mon Sep 17 00:00:00 2001 From: Max-Cheng Date: Sat, 20 Jan 2024 21:57:38 +0800 Subject: [PATCH 1/8] feat:support zstd compress and uncompressed --- fs.go | 46 ++++++++++++++- http.go | 70 +++++++++++++++++++++++ server.go | 4 ++ strings.go | 1 + zstd.go | 158 +++++++++++++++++++++++++++++++++++++++++++++++++++ zstd_test.go | 102 +++++++++++++++++++++++++++++++++ 6 files changed, 378 insertions(+), 3 deletions(-) create mode 100644 zstd.go create mode 100644 zstd_test.go diff --git a/fs.go b/fs.go index 57260d1bf2..d9488f14fb 100644 --- a/fs.go +++ b/fs.go @@ -4,6 +4,7 @@ import ( "bytes" "errors" "fmt" + "github.com/klauspost/compress/zstd" "html" "io" "io/fs" @@ -370,6 +371,7 @@ const FSCompressedFileSuffix = ".fasthttp.gz" var FSCompressedFileSuffixes = map[string]string{ "gzip": ".fasthttp.gz", "br": ".fasthttp.br", + "zstd": ".fasthttp.zst", } // FSHandlerCacheDuration is the default expiration duration for inactive @@ -460,7 +462,9 @@ func (fs *FS) initRequestHandler() { compressedFileSuffixes := fs.CompressedFileSuffixes if compressedFileSuffixes["br"] == "" || compressedFileSuffixes["gzip"] == "" || - compressedFileSuffixes["br"] == compressedFileSuffixes["gzip"] { + compressedFileSuffixes["zstd"] == "" || compressedFileSuffixes["br"] == compressedFileSuffixes["gzip"] || + compressedFileSuffixes["br"] == compressedFileSuffixes["zstd"] || + compressedFileSuffixes["gzip"] == compressedFileSuffixes["zstd"] { // Copy global map compressedFileSuffixes = make(map[string]string, len(FSCompressedFileSuffixes)) for k, v := range FSCompressedFileSuffixes { @@ -471,6 +475,7 @@ func (fs *FS) initRequestHandler() { if fs.CompressedFileSuffix != "" { compressedFileSuffixes["gzip"] = fs.CompressedFileSuffix compressedFileSuffixes["br"] = FSCompressedFileSuffixes["br"] + compressedFileSuffixes["zstd"] = FSCompressedFileSuffixes["zstd"] } h := &fsHandler{ @@ -794,6 +799,7 @@ const ( defaultCacheKind CacheKind = iota brotliCacheKind gzipCacheKind + zstdCacheKind ) func newCacheManager(fs *FS) cacheManager { @@ -1040,6 +1046,11 @@ func (h *fsHandler) handleRequest(ctx *RequestCtx) { mustCompress = true fileCacheKind = gzipCacheKind fileEncoding = "gzip" + } else if ctx.Request.Header.HasAcceptEncodingBytes(strZstd) { + mustCompress = true + fileCacheKind = zstdCacheKind + fileEncoding = "zstd" + } } @@ -1101,6 +1112,8 @@ func (h *fsHandler) handleRequest(ctx *RequestCtx) { hdr.SetContentEncodingBytes(strBr) } else if fileEncoding == "gzip" { hdr.SetContentEncodingBytes(strGzip) + } else if fileEncoding == "zstd" { + hdr.SetContentEncodingBytes(strZstd) } } @@ -1308,6 +1321,8 @@ nestedContinue: zbuf.B = AppendBrotliBytesLevel(zbuf.B, w.B, CompressDefaultCompression) } else if fileEncoding == "gzip" { zbuf.B = AppendGzipBytesLevel(zbuf.B, w.B, CompressDefaultCompression) + } else if fileEncoding == "zstd" { + zbuf.B = AppendZstdBytesLevel(zbuf.B, w.B, CompressZstdDefault) } w = &zbuf } @@ -1420,6 +1435,14 @@ func (h *fsHandler) compressFileNolock( err = err1 } releaseStacklessGzipWriter(zw, CompressDefaultCompression) + } else if fileEncoding == "zstd" { + zw := acquireStacklessZstdWriter(zf, CompressZstdDefault) + _, err = copyZeroAlloc(zw, f) + if err1 := zw.Flush(); err == nil { + err = err1 + } + + releaseStacklessZstdWriter(zw, CompressZstdDefault) } _ = zf.Close() _ = f.Close() @@ -1457,6 +1480,13 @@ func (h *fsHandler) newCompressedFSFileCache(f fs.File, fileInfo fs.FileInfo, fi err = err1 } releaseStacklessGzipWriter(zw, CompressDefaultCompression) + 
} else if fileEncoding == "zstd" { + zw := acquireStacklessZstdWriter(w, CompressZstdDefault) + _, err = copyZeroAlloc(zw, f) + if err1 := zw.Flush(); err == nil { + err = err1 + } + releaseStacklessZstdWriter(zw, CompressZstdDefault) } defer func() { _ = f.Close() }() @@ -1600,8 +1630,9 @@ func (h *fsHandler) newFSFile(f fs.File, fileInfo fs.FileInfo, compressed bool, func readFileHeader(f io.Reader, compressed bool, fileEncoding string) ([]byte, error) { r := f var ( - br *brotli.Reader - zr *gzip.Reader + br *brotli.Reader + zr *gzip.Reader + zsr *zstd.Decoder ) if compressed { var err error @@ -1615,6 +1646,11 @@ func readFileHeader(f io.Reader, compressed bool, fileEncoding string) ([]byte, return nil, err } r = zr + } else if fileEncoding == "zstd" { + if zsr, err = acquireZstdReader(f); err != nil { + return nil, err + } + r = zsr } } @@ -1639,6 +1675,10 @@ func readFileHeader(f io.Reader, compressed bool, fileEncoding string) ([]byte, releaseGzipReader(zr) } + if zsr != nil { + releaseZstdReader(zsr) + } + return data, err } diff --git a/http.go b/http.go index 74d66cb319..c2395e4778 100644 --- a/http.go +++ b/http.go @@ -528,6 +528,23 @@ func (ctx *RequestCtx) RequestBodyStream() io.Reader { return ctx.Request.bodyStream } +func (req *Request) BodyUnzstd() ([]byte, error) { + return unzstdData(req.Body()) +} + +func (resp *Response) BodyUnzstd() ([]byte, error) { + return unzstdData(resp.Body()) +} + +func unzstdData(p []byte) ([]byte, error) { + var bb bytebufferpool.ByteBuffer + _, err := WriteUnzstd(&bb, p) + if err != nil { + return nil, err + } + return bb.B, nil +} + func inflateData(p []byte) ([]byte, error) { var bb bytebufferpool.ByteBuffer _, err := WriteInflate(&bb, p) @@ -554,6 +571,8 @@ func (req *Request) BodyUncompressed() ([]byte, error) { return req.BodyGunzip() case "br": return req.BodyUnbrotli() + case "zstd": + return req.BodyUnzstd() default: return nil, ErrContentEncodingUnsupported } @@ -574,6 +593,8 @@ func (resp *Response) BodyUncompressed() ([]byte, error) { return resp.BodyGunzip() case "br": return resp.BodyUnbrotli() + case "zstd": + return resp.BodyUnzstd() default: return nil, ErrContentEncodingUnsupported } @@ -1849,6 +1870,55 @@ func (resp *Response) deflateBody(level int) error { return nil } +func (resp *Response) zstdBody(level int) error { + if len(resp.Header.ContentEncoding()) > 0 { + return nil + } + + if !resp.Header.isCompressibleContentType() { + return nil + } + + if resp.bodyStream != nil { + // Reset Content-Length to -1, since it is impossible + // to determine body size beforehand of streamed compression. + // For + resp.Header.SetContentLength(-1) + + // Do not care about memory allocations here, since flate is slow + // and allocates a lot of memory by itself. + bs := resp.bodyStream + resp.bodyStream = NewStreamReader(func(sw *bufio.Writer) { + zw := acquireStacklessZstdWriter(sw, level) + fw := &flushWriter{ + wf: zw, + bw: sw, + } + copyZeroAlloc(fw, bs) + releaseStacklessZstdWriter(zw, level) + if bsc, ok := bs.(io.Closer); ok { + bsc.Close() + } + }) + } else { + bodyBytes := resp.bodyBytes() + if len(bodyBytes) < minCompressLen { + return nil + } + w := responseBodyPool.Get() + w.B = AppendZstdBytesLevel(w.B, bodyBytes, level) + + if resp.body != nil { + responseBodyPool.Put(resp.body) + } + resp.body = w + resp.bodyRaw = nil + } + resp.Header.SetContentEncodingBytes(strZstd) + resp.Header.addVaryBytes(strAcceptEncoding) + return nil +} + // Bodies with sizes smaller than minCompressLen aren't compressed at all. 
const minCompressLen = 200 diff --git a/server.go b/server.go index e3593cdf87..bdfbcc6d66 100644 --- a/server.go +++ b/server.go @@ -527,6 +527,8 @@ func CompressHandlerLevel(h RequestHandler, level int) RequestHandler { ctx.Response.gzipBody(level) //nolint:errcheck } else if ctx.Request.Header.HasAcceptEncodingBytes(strDeflate) { ctx.Response.deflateBody(level) //nolint:errcheck + } else if ctx.Request.Header.HasAcceptEncodingBytes(strZstd) { + ctx.Response.zstdBody(level) //nolint:errcheck } } } @@ -559,6 +561,8 @@ func CompressHandlerBrotliLevel(h RequestHandler, brotliLevel, otherLevel int) R ctx.Response.gzipBody(otherLevel) //nolint:errcheck case ctx.Request.Header.HasAcceptEncodingBytes(strDeflate): ctx.Response.deflateBody(otherLevel) //nolint:errcheck + case ctx.Request.Header.HasAcceptEncodingBytes(strZstd): + ctx.Response.zstdBody(otherLevel) //nolint:errcheck } } } diff --git a/strings.go b/strings.go index 33746780a0..a9e4072d4d 100644 --- a/strings.go +++ b/strings.go @@ -72,6 +72,7 @@ var ( strClose = []byte("close") strGzip = []byte("gzip") strBr = []byte("br") + strZstd = []byte("zstd") strDeflate = []byte("deflate") strKeepAlive = []byte("keep-alive") strUpgrade = []byte("Upgrade") diff --git a/zstd.go b/zstd.go new file mode 100644 index 0000000000..065078528a --- /dev/null +++ b/zstd.go @@ -0,0 +1,158 @@ +package fasthttp + +import ( + "fmt" + "github.com/klauspost/compress/zstd" + "github.com/valyala/fasthttp/stackless" + "io" + "sync" +) + +const ( + CompressZstdSpeedNotSet = iota + CompressZstdBestSpeed + CompressZstdDefault + CompressZstdSpeedBetter + CompressZstdBestCompression +) + +var ( + zstdDecoderPool sync.Pool + stacklessZstdWriterPoolMap = newStacklessZstdWriterPoolMap() +) + +func newStacklessZstdWriterPoolMap() []*sync.Pool { + // Initialize pools for all the compression levels defined + // in https://github.com/klauspost/compress/blob/v1.17.4/zstd/encoder_options.go#L146 + // Compression levels are normalized with normalizeCompressLevel, + // so the fit [0..7]. 
+ p := make([]*sync.Pool, 6) + for i := range p { + p[i] = &sync.Pool{} + } + return p +} + +func acquireZstdReader(r io.Reader) (*zstd.Decoder, error) { + v := zstdDecoderPool.Get() + if v == nil { + return zstd.NewReader(r) + } + zr := v.(*zstd.Decoder) + if err := zr.Reset(r); err != nil { + return nil, err + } + return zr, nil +} + +func releaseZstdReader(zr *zstd.Decoder) { + zstdDecoderPool.Put(zr) +} + +var zstdEncoderPool sync.Pool + +func acquireZstdWriter(w io.Writer, level int) (*zstd.Encoder, error) { + v := zstdEncoderPool.Get() + if v == nil { + return zstd.NewWriter(w, zstd.WithEncoderLevel(zstd.EncoderLevel(level))) + } + zw := v.(*zstd.Encoder) + zw.Reset(w) + return zw, nil +} + +func releaseZstdWriter(zw *zstd.Encoder) { + zw.Close() + zstdEncoderPool.Put(zw) +} + +func acquireStacklessZstdWriter(w io.Writer, compressLevel int) stackless.Writer { + nLevel := normalizeZstdCompressLevel(compressLevel) + p := stacklessZstdWriterPoolMap[nLevel] + v := p.Get() + if v == nil { + return stackless.NewWriter(w, func(w io.Writer) stackless.Writer { + return acquireRealZstdWriter(w, compressLevel) + }) + } + sw := v.(stackless.Writer) + sw.Reset(w) + return sw + +} + +func releaseStacklessZstdWriter(zf stackless.Writer, zstdDefault int) { + zf.Close() + nLevel := normalizeZstdCompressLevel(zstdDefault) + p := stacklessZstdWriterPoolMap[nLevel] + p.Put(zf) +} + +func acquireRealZstdWriter(w io.Writer, level int) stackless.Writer { + nLevel := normalizeZstdCompressLevel(level) + p := stacklessZstdWriterPoolMap[nLevel] + v := p.Get() + if v == nil { + zw, err := acquireZstdWriter(w, level) + if err != nil { + panic(err) + } + return zw + } + zw := v.(*zstd.Encoder) + zw.Reset(w) + return zw +} + +func AppendZstdBytesLevel(dst, src []byte, level int) []byte { + w := &byteSliceWriter{dst} + WriteZstdLevel(w, src, level) //nolint:errcheck + return w.b +} + +func WriteZstdLevel(w io.Writer, src []byte, level int) (int, error) { + zw := acquireStacklessZstdWriter(w, level) + n, err := zw.Write(src) + releaseStacklessZstdWriter(zw, level) + return n, err +} + +// AppendZstdBytes appends zstd src to dst and returns the resulting dst. +func AppendZstdBytes(dst, src []byte) []byte { + return AppendZstdBytesLevel(dst, src, CompressBrotliDefaultCompression) +} + +// WriteUnzstd writes unzstd p to w and returns the number of uncompressed +// bytes written to w. +func WriteUnzstd(w io.Writer, p []byte) (int, error) { + r := &byteSliceReader{p} + zr, err := acquireZstdReader(r) + if err != nil { + return 0, err + } + n, err := copyZeroAlloc(w, zr) + releaseZstdReader(zr) + nn := int(n) + if int64(nn) != n { + return 0, fmt.Errorf("too much data unzstd: %d", n) + } + return nn, err +} + +// AppendUnzstdBytes appends unzstd src to dst and returns the resulting dst. +func AppendUnzstdBytes(dst, src []byte) ([]byte, error) { + w := &byteSliceWriter{dst} + _, err := WriteUnzstd(w, src) + return w.b, err +} + +// normalizes compression level into [0..7], so it could be used as an index +// in *PoolMap. 
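+//
+// For zstd specifically the valid inputs are the CompressZstd* constants
+// (CompressZstdSpeedNotSet through CompressZstdBestCompression); anything
+// outside that range falls back to CompressZstdDefault, so the resulting
+// index always stays inside the pool map.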
+func normalizeZstdCompressLevel(level int) int { + // -2 is the lowest compression level - CompressHuffmanOnly + // 9 is the highest compression level - CompressBestCompression + if level < CompressZstdSpeedNotSet || level > CompressZstdBestCompression { + level = CompressZstdDefault + } + return level +} diff --git a/zstd_test.go b/zstd_test.go new file mode 100644 index 0000000000..dc0c45f339 --- /dev/null +++ b/zstd_test.go @@ -0,0 +1,102 @@ +package fasthttp + +import ( + "bytes" + "fmt" + "io" + "testing" +) + +func TestZstdBytesSerial(t *testing.T) { + t.Parallel() + + if err := testZstdBytes(); err != nil { + t.Fatal(err) + } +} + +func TestZstdBytesConcurrent(t *testing.T) { + t.Parallel() + + if err := testConcurrent(10, testZstdBytes); err != nil { + t.Fatal(err) + } +} + +func testZstdBytes() error { + for _, s := range compressTestcases { + if err := testZstdBytesSingleCase(s); err != nil { + return err + } + } + return nil +} + +func testZstdBytesSingleCase(s string) error { + prefix := []byte("foobar") + ZstdpedS := AppendZstdBytes(prefix, []byte(s)) + if !bytes.Equal(ZstdpedS[:len(prefix)], prefix) { + return fmt.Errorf("unexpected prefix when compressing %q: %q. Expecting %q", s, ZstdpedS[:len(prefix)], prefix) + } + + unZstdedS, err := AppendUnzstdBytes(prefix, ZstdpedS[len(prefix):]) + if err != nil { + return fmt.Errorf("unexpected error when uncompressing %q: %w", s, err) + } + if !bytes.Equal(unZstdedS[:len(prefix)], prefix) { + return fmt.Errorf("unexpected prefix when uncompressing %q: %q. Expecting %q", s, unZstdedS[:len(prefix)], prefix) + } + unZstdedS = unZstdedS[len(prefix):] + if string(unZstdedS) != s { + return fmt.Errorf("unexpected uncompressed string %q. Expecting %q", unZstdedS, s) + } + return nil +} + +func TestZstdCompressSerial(t *testing.T) { + t.Parallel() + + if err := testZstdCompress(); err != nil { + t.Fatal(err) + } +} + +func TestZstdCompressConcurrent(t *testing.T) { + t.Parallel() + + if err := testConcurrent(10, testZstdCompress); err != nil { + t.Fatal(err) + } +} + +func testZstdCompress() error { + for _, s := range compressTestcases { + if err := testZstdCompressSingleCase(s); err != nil { + return err + } + } + return nil +} + +func testZstdCompressSingleCase(s string) error { + var buf bytes.Buffer + zw := acquireStacklessZstdWriter(&buf, CompressZstdDefault) + if _, err := zw.Write([]byte(s)); err != nil { + return fmt.Errorf("unexpected error: %w. s=%q", err, s) + } + releaseStacklessZstdWriter(zw, CompressZstdDefault) + + zr, err := acquireZstdReader(&buf) + if err != nil { + return fmt.Errorf("unexpected error: %w. s=%q", err, s) + } + body, err := io.ReadAll(zr) + if err != nil { + return fmt.Errorf("unexpected error: %w. s=%q", err, s) + } + if string(body) != s { + return fmt.Errorf("unexpected string after decompression: %q. 
Expecting %q", body, s) + } + releaseZstdReader(zr) + return nil +} From 724fd41b86fb6dd85f978d844f75f30314d68a1f Mon Sep 17 00:00:00 2001 From: Max-Cheng Date: Sat, 20 Jan 2024 22:57:40 +0800 Subject: [PATCH 2/8] fix:real & stackless write using different pool to avoid get stackless.writer --- fs.go | 1 - zstd.go | 73 ++++++++++++++++++++++++++++++++++++++++----------------- 2 files changed, 51 insertions(+), 23 deletions(-) diff --git a/fs.go b/fs.go index d9488f14fb..31799de1b1 100644 --- a/fs.go +++ b/fs.go @@ -1441,7 +1441,6 @@ func (h *fsHandler) compressFileNolock( if err1 := zw.Flush(); err == nil { err = err1 } - releaseStacklessZstdWriter(zw, CompressZstdDefault) } _ = zf.Close() diff --git a/zstd.go b/zstd.go index 065078528a..df021706a4 100644 --- a/zstd.go +++ b/zstd.go @@ -1,8 +1,10 @@ package fasthttp import ( + "bytes" "fmt" "github.com/klauspost/compress/zstd" + "github.com/valyala/bytebufferpool" "github.com/valyala/fasthttp/stackless" "io" "sync" @@ -18,21 +20,11 @@ const ( var ( zstdDecoderPool sync.Pool - stacklessZstdWriterPoolMap = newStacklessZstdWriterPoolMap() + zstdEncoderPool sync.Pool + realZstdWriterPoolMap = newCompressWriterPoolMap() + stacklessZstdWriterPoolMap = newCompressWriterPoolMap() ) -func newStacklessZstdWriterPoolMap() []*sync.Pool { - // Initialize pools for all the compression levels defined - // in https://github.com/klauspost/compress/blob/v1.17.4/zstd/encoder_options.go#L146 - // Compression levels are normalized with normalizeCompressLevel, - // so the fit [0..7]. - p := make([]*sync.Pool, 6) - for i := range p { - p[i] = &sync.Pool{} - } - return p -} - func acquireZstdReader(r io.Reader) (*zstd.Decoder, error) { v := zstdDecoderPool.Get() if v == nil { @@ -49,8 +41,6 @@ func releaseZstdReader(zr *zstd.Decoder) { zstdDecoderPool.Put(zr) } -var zstdEncoderPool sync.Pool - func acquireZstdWriter(w io.Writer, level int) (*zstd.Encoder, error) { v := zstdEncoderPool.Get() if v == nil { @@ -88,9 +78,9 @@ func releaseStacklessZstdWriter(zf stackless.Writer, zstdDefault int) { p.Put(zf) } -func acquireRealZstdWriter(w io.Writer, level int) stackless.Writer { +func acquireRealZstdWriter(w io.Writer, level int) *zstd.Encoder { nLevel := normalizeZstdCompressLevel(level) - p := stacklessZstdWriterPoolMap[nLevel] + p := realZstdWriterPoolMap[nLevel] v := p.Get() if v == nil { zw, err := acquireZstdWriter(w, level) @@ -104,17 +94,56 @@ func acquireRealZstdWriter(w io.Writer, level int) stackless.Writer { return zw } +func releaseRealZstdWrter(zw *zstd.Encoder, level int) { + zw.Close() + nLevel := normalizeZstdCompressLevel(level) + p := realZstdWriterPoolMap[nLevel] + p.Put(zw) +} + func AppendZstdBytesLevel(dst, src []byte, level int) []byte { w := &byteSliceWriter{dst} WriteZstdLevel(w, src, level) //nolint:errcheck return w.b } -func WriteZstdLevel(w io.Writer, src []byte, level int) (int, error) { - zw := acquireStacklessZstdWriter(w, level) - n, err := zw.Write(src) - releaseStacklessZstdWriter(zw, level) - return n, err +func WriteZstdLevel(w io.Writer, p []byte, level int) (int, error) { + switch w.(type) { + case *byteSliceWriter, + *bytes.Buffer, + *bytebufferpool.ByteBuffer: + ctx := &compressCtx{ + w: w, + p: p, + level: level, + } + stacklessWriteZstd(ctx) + return len(p), nil + default: + zw := acquireStacklessZstdWriter(w, level) + n, err := zw.Write(p) + releaseStacklessZstdWriter(zw, level) + return n, err + } +} + +var ( + stacklessWriteZstdOnce sync.Once + stacklessWriteZstdFunc func(ctx any) bool +) + +func stacklessWriteZstd(ctx 
any) { + stacklessWriteZstdOnce.Do(func() { + stacklessWriteZstdFunc = stackless.NewFunc(nonblockingWriteZstd) + }) + stacklessWriteZstdFunc(ctx) +} + +func nonblockingWriteZstd(ctxv any) { + ctx := ctxv.(*compressCtx) + zw := acquireRealZstdWriter(ctx.w, ctx.level) + zw.Write(ctx.p) //nolint:errcheck + releaseRealZstdWrter(zw, ctx.level) } // AppendZstdBytes appends zstd src to dst and returns the resulting dst. From 16627c1793409913d4bd0cf1d521fae7a2c118ef Mon Sep 17 00:00:00 2001 From: Max-Cheng Date: Wed, 31 Jan 2024 22:01:16 +0800 Subject: [PATCH 3/8] fix:zstd normalize compress level --- zstd.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/zstd.go b/zstd.go index df021706a4..4218db4595 100644 --- a/zstd.go +++ b/zstd.go @@ -108,6 +108,7 @@ func AppendZstdBytesLevel(dst, src []byte, level int) []byte { } func WriteZstdLevel(w io.Writer, p []byte, level int) (int, error) { + level = normalizeZstdCompressLevel(level) switch w.(type) { case *byteSliceWriter, *bytes.Buffer, @@ -178,8 +179,6 @@ func AppendUnzstdBytes(dst, src []byte) ([]byte, error) { // normalizes compression level into [0..7], so it could be used as an index // in *PoolMap. func normalizeZstdCompressLevel(level int) int { - // -2 is the lowest compression level - CompressHuffmanOnly - // 9 is the highest compression level - CompressBestCompression if level < CompressZstdSpeedNotSet || level > CompressZstdBestCompression { level = CompressZstdDefault } From 94ae58c366539551e43692df966191f5e841e4ec Mon Sep 17 00:00:00 2001 From: Max-Cheng Date: Sun, 11 Feb 2024 11:22:23 +0800 Subject: [PATCH 4/8] Change empty string checks to be more idiomatic (#1684) --- fs.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fs.go b/fs.go index 31799de1b1..4e5be6ca24 100644 --- a/fs.go +++ b/fs.go @@ -4,7 +4,6 @@ import ( "bytes" "errors" "fmt" - "github.com/klauspost/compress/zstd" "html" "io" "io/fs" @@ -19,6 +18,7 @@ import ( "github.com/andybalholm/brotli" "github.com/klauspost/compress/gzip" + "github.com/klauspost/compress/zstd" "github.com/valyala/bytebufferpool" ) From a22ee6f6a0d01cccf27565a268210269ab99e6a9 Mon Sep 17 00:00:00 2001 From: Max-Cheng Date: Sun, 11 Feb 2024 11:22:59 +0800 Subject: [PATCH 5/8] chore:lint fix and rebase with master --- fs.go | 1 - http.go | 2 +- zstd.go | 8 ++++---- 3 files changed, 5 insertions(+), 6 deletions(-) diff --git a/fs.go b/fs.go index 4e5be6ca24..c2ea512292 100644 --- a/fs.go +++ b/fs.go @@ -1050,7 +1050,6 @@ func (h *fsHandler) handleRequest(ctx *RequestCtx) { mustCompress = true fileCacheKind = zstdCacheKind fileEncoding = "zstd" - } } diff --git a/http.go b/http.go index c2395e4778..e078809421 100644 --- a/http.go +++ b/http.go @@ -1894,7 +1894,7 @@ func (resp *Response) zstdBody(level int) error { wf: zw, bw: sw, } - copyZeroAlloc(fw, bs) + copyZeroAlloc(fw, bs) //nolint:errcheck releaseStacklessZstdWriter(zw, level) if bsc, ok := bs.(io.Closer); ok { bsc.Close() diff --git a/zstd.go b/zstd.go index 4218db4595..a54d2569b4 100644 --- a/zstd.go +++ b/zstd.go @@ -3,11 +3,12 @@ package fasthttp import ( "bytes" "fmt" + "io" + "sync" + "github.com/klauspost/compress/zstd" "github.com/valyala/bytebufferpool" "github.com/valyala/fasthttp/stackless" - "io" - "sync" ) const ( @@ -51,7 +52,7 @@ func acquireZstdWriter(w io.Writer, level int) (*zstd.Encoder, error) { return zw, nil } -func releaseZstdWriter(zw *zstd.Encoder) { +func releaseZstdWriter(zw *zstd.Encoder) { //nolint:unused zw.Close() zstdEncoderPool.Put(zw) } @@ -68,7 +69,6 @@ func 
acquireStacklessZstdWriter(w io.Writer, compressLevel int) stackless.Writer sw := v.(stackless.Writer) sw.Reset(w) return sw - } func releaseStacklessZstdWriter(zf stackless.Writer, zstdDefault int) { From 37ee77fb6b455c81048aedbebb76eb6c3ba83ba7 Mon Sep 17 00:00:00 2001 From: Max-Cheng Date: Thu, 8 Feb 2024 00:09:41 +0800 Subject: [PATCH 6/8] chore:remove 1.18 test & upgrade compress version --- .github/workflows/test.yml | 2 +- request_body.zst | Bin 0 -> 31 bytes 2 files changed, 1 insertion(+), 1 deletion(-) create mode 100644 request_body.zst diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 5b0430d896..8cbf1fe738 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -9,7 +9,7 @@ jobs: strategy: fail-fast: false matrix: - go-version: [1.18.x, 1.19.x, 1.20.x, 1.21.x, 1.22.x] + go-version: [1.19.x, 1.20.x, 1.21.x] os: [ubuntu-latest, macos-latest, windows-latest] runs-on: ${{ matrix.os }} steps: diff --git a/request_body.zst b/request_body.zst new file mode 100644 index 0000000000000000000000000000000000000000..ea95e7301da9e039017dc7d051385b3fd3e95d9c GIT binary patch literal 31 ncmdPcs{fZIVj=@WWPWLpLQ!gAX=-taLQ;N8CD)SKYu^F@wn_|u literal 0 HcmV?d00001 From bbdeff80977ca43ca8fe5e1b02982dee1a243ad8 Mon Sep 17 00:00:00 2001 From: Max-Cheng Date: Thu, 8 Feb 2024 21:36:53 +0800 Subject: [PATCH 7/8] fix:error default compress level --- zstd.go | 3 ++- zstd_test.go | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/zstd.go b/zstd.go index a54d2569b4..15ab2463d0 100644 --- a/zstd.go +++ b/zstd.go @@ -8,6 +8,7 @@ import ( "github.com/klauspost/compress/zstd" "github.com/valyala/bytebufferpool" + "github.com/valyala/fasthttp/stackless" ) @@ -149,7 +150,7 @@ func nonblockingWriteZstd(ctxv any) { // AppendZstdBytes appends zstd src to dst and returns the resulting dst. func AppendZstdBytes(dst, src []byte) []byte { - return AppendZstdBytesLevel(dst, src, CompressBrotliDefaultCompression) + return AppendZstdBytesLevel(dst, src, CompressZstdDefault) } // WriteUnzstd writes unzstd p to w and returns the number of uncompressed diff --git a/zstd_test.go b/zstd_test.go index dc0c45f339..620ef7d708 100644 --- a/zstd_test.go +++ b/zstd_test.go @@ -38,7 +38,7 @@ func testZstdBytesSingleCase(s string) error { if !bytes.Equal(ZstdpedS[:len(prefix)], prefix) { return fmt.Errorf("unexpected prefix when compressing %q: %q. Expecting %q", s, ZstdpedS[:len(prefix)], prefix) } - + unZstdedS, err := AppendUnzstdBytes(prefix, ZstdpedS[len(prefix):]) if err != nil { return fmt.Errorf("unexpected error when uncompressing %q: %w", s, err) From 6484a90b7eece87968a7308d3e3f3ddd97633732 Mon Sep 17 00:00:00 2001 From: Erik Dubbelboer Date: Wed, 21 Feb 2024 06:08:40 +0100 Subject: [PATCH 8/8] Fix lint --- .github/workflows/lint.yml | 9 ++++++++ .github/workflows/test.yml | 2 +- fs.go | 42 ++++++++++++++++++++++---------------- server.go | 7 ++++--- zstd.go | 1 - zstd_test.go | 2 +- 6 files changed, 39 insertions(+), 24 deletions(-) diff --git a/.github/workflows/lint.yml b/.github/workflows/lint.yml index 48972fd64f..c250bf70e5 100644 --- a/.github/workflows/lint.yml +++ b/.github/workflows/lint.yml @@ -4,6 +4,15 @@ on: branches: - master pull_request: + +permissions: + # Required: allow read access to the content for analysis. + contents: read + # Optional: allow read access to pull request. Use with `only-new-issues` option. + pull-requests: read + # Optional: Allow write access to checks to allow the action to annotate code in the PR. 
+ checks: write + jobs: lint: runs-on: ubuntu-latest diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 8cbf1fe738..5cfc4e5edb 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -9,7 +9,7 @@ jobs: strategy: fail-fast: false matrix: - go-version: [1.19.x, 1.20.x, 1.21.x] + go-version: [1.19.x, 1.20.x, 1.21.x, 1.22.x] os: [ubuntu-latest, macos-latest, windows-latest] runs-on: ${{ matrix.os }} steps: diff --git a/fs.go b/fs.go index c2ea512292..4f2bbbf606 100644 --- a/fs.go +++ b/fs.go @@ -1038,15 +1038,16 @@ func (h *fsHandler) handleRequest(ctx *RequestCtx) { fileEncoding := "" byteRange := ctx.Request.Header.peek(strRange) if len(byteRange) == 0 && h.compress { - if h.compressBrotli && ctx.Request.Header.HasAcceptEncodingBytes(strBr) { + switch { + case h.compressBrotli && ctx.Request.Header.HasAcceptEncodingBytes(strBr): mustCompress = true fileCacheKind = brotliCacheKind fileEncoding = "br" - } else if ctx.Request.Header.HasAcceptEncodingBytes(strGzip) { + case ctx.Request.Header.HasAcceptEncodingBytes(strGzip): mustCompress = true fileCacheKind = gzipCacheKind fileEncoding = "gzip" - } else if ctx.Request.Header.HasAcceptEncodingBytes(strZstd) { + case ctx.Request.Header.HasAcceptEncodingBytes(strZstd): mustCompress = true fileCacheKind = zstdCacheKind fileEncoding = "zstd" @@ -1107,11 +1108,12 @@ func (h *fsHandler) handleRequest(ctx *RequestCtx) { hdr := &ctx.Response.Header if ff.compressed { - if fileEncoding == "br" { + switch fileEncoding { + case "br": hdr.SetContentEncodingBytes(strBr) - } else if fileEncoding == "gzip" { + case "gzip": hdr.SetContentEncodingBytes(strGzip) - } else if fileEncoding == "zstd" { + case "zstd": hdr.SetContentEncodingBytes(strZstd) } } @@ -1316,11 +1318,12 @@ nestedContinue: if mustCompress { var zbuf bytebufferpool.ByteBuffer - if fileEncoding == "br" { + switch fileEncoding { + case "br": zbuf.B = AppendBrotliBytesLevel(zbuf.B, w.B, CompressDefaultCompression) - } else if fileEncoding == "gzip" { + case "gzip": zbuf.B = AppendGzipBytesLevel(zbuf.B, w.B, CompressDefaultCompression) - } else if fileEncoding == "zstd" { + case "zstd": zbuf.B = AppendZstdBytesLevel(zbuf.B, w.B, CompressZstdDefault) } w = &zbuf @@ -1420,21 +1423,22 @@ func (h *fsHandler) compressFileNolock( } return nil, errNoCreatePermission } - if fileEncoding == "br" { + switch fileEncoding { + case "br": zw := acquireStacklessBrotliWriter(zf, CompressDefaultCompression) _, err = copyZeroAlloc(zw, f) if err1 := zw.Flush(); err == nil { err = err1 } releaseStacklessBrotliWriter(zw, CompressDefaultCompression) - } else if fileEncoding == "gzip" { + case "gzip": zw := acquireStacklessGzipWriter(zf, CompressDefaultCompression) _, err = copyZeroAlloc(zw, f) if err1 := zw.Flush(); err == nil { err = err1 } releaseStacklessGzipWriter(zw, CompressDefaultCompression) - } else if fileEncoding == "zstd" { + case "zstd": zw := acquireStacklessZstdWriter(zf, CompressZstdDefault) _, err = copyZeroAlloc(zw, f) if err1 := zw.Flush(); err == nil { @@ -1464,21 +1468,22 @@ func (h *fsHandler) newCompressedFSFileCache(f fs.File, fileInfo fs.FileInfo, fi err error ) - if fileEncoding == "br" { + switch fileEncoding { + case "br": zw := acquireStacklessBrotliWriter(w, CompressDefaultCompression) _, err = copyZeroAlloc(zw, f) if err1 := zw.Flush(); err == nil { err = err1 } releaseStacklessBrotliWriter(zw, CompressDefaultCompression) - } else if fileEncoding == "gzip" { + case "gzip": zw := acquireStacklessGzipWriter(w, CompressDefaultCompression) _, err = 
copyZeroAlloc(zw, f) if err1 := zw.Flush(); err == nil { err = err1 } releaseStacklessGzipWriter(zw, CompressDefaultCompression) - } else if fileEncoding == "zstd" { + case "zstd": zw := acquireStacklessZstdWriter(w, CompressZstdDefault) _, err = copyZeroAlloc(zw, f) if err1 := zw.Flush(); err == nil { @@ -1634,17 +1639,18 @@ func readFileHeader(f io.Reader, compressed bool, fileEncoding string) ([]byte, ) if compressed { var err error - if fileEncoding == "br" { + switch fileEncoding { + case "br": if br, err = acquireBrotliReader(f); err != nil { return nil, err } r = br - } else if fileEncoding == "gzip" { + case "gzip": if zr, err = acquireGzipReader(f); err != nil { return nil, err } r = zr - } else if fileEncoding == "zstd" { + case "zstd": if zsr, err = acquireZstdReader(f); err != nil { return nil, err } diff --git a/server.go b/server.go index bdfbcc6d66..426351b383 100644 --- a/server.go +++ b/server.go @@ -523,11 +523,12 @@ func CompressHandler(h RequestHandler) RequestHandler { func CompressHandlerLevel(h RequestHandler, level int) RequestHandler { return func(ctx *RequestCtx) { h(ctx) - if ctx.Request.Header.HasAcceptEncodingBytes(strGzip) { + switch { + case ctx.Request.Header.HasAcceptEncodingBytes(strGzip): ctx.Response.gzipBody(level) //nolint:errcheck - } else if ctx.Request.Header.HasAcceptEncodingBytes(strDeflate) { + case ctx.Request.Header.HasAcceptEncodingBytes(strDeflate): ctx.Response.deflateBody(level) //nolint:errcheck - } else if ctx.Request.Header.HasAcceptEncodingBytes(strZstd) { + case ctx.Request.Header.HasAcceptEncodingBytes(strZstd): ctx.Response.zstdBody(level) //nolint:errcheck } } diff --git a/zstd.go b/zstd.go index 15ab2463d0..226a126326 100644 --- a/zstd.go +++ b/zstd.go @@ -8,7 +8,6 @@ import ( "github.com/klauspost/compress/zstd" "github.com/valyala/bytebufferpool" - "github.com/valyala/fasthttp/stackless" ) diff --git a/zstd_test.go b/zstd_test.go index 620ef7d708..dc0c45f339 100644 --- a/zstd_test.go +++ b/zstd_test.go @@ -38,7 +38,7 @@ func testZstdBytesSingleCase(s string) error { if !bytes.Equal(ZstdpedS[:len(prefix)], prefix) { return fmt.Errorf("unexpected prefix when compressing %q: %q. Expecting %q", s, ZstdpedS[:len(prefix)], prefix) } - + unZstdedS, err := AppendUnzstdBytes(prefix, ZstdpedS[len(prefix):]) if err != nil { return fmt.Errorf("unexpected error when uncompressing %q: %w", s, err)
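
Taken together, the series gives zstd the same public surface gzip and brotli already have: AppendZstdBytes/AppendZstdBytesLevel, WriteZstdLevel, WriteUnzstd/AppendUnzstdBytes, the Request/Response BodyUnzstd helpers, and zstd awareness in BodyUncompressed, the FS handler and the CompressHandler variants. A minimal round-trip sketch using only the exported helpers added here (a usage sketch, not part of the patch):

package main

import (
	"bytes"
	"fmt"
	"log"

	"github.com/valyala/fasthttp"
)

func main() {
	src := bytes.Repeat([]byte("fasthttp speaks zstd now. "), 32)

	// Compress with the default level added by this series; levels outside
	// the CompressZstd* range fall back to CompressZstdDefault via
	// normalizeZstdCompressLevel.
	compressed := fasthttp.AppendZstdBytesLevel(nil, src, fasthttp.CompressZstdDefault)

	// Decompress and verify the round trip.
	plain, err := fasthttp.AppendUnzstdBytes(nil, compressed)
	if err != nil {
		log.Fatalf("unzstd: %v", err)
	}
	fmt.Printf("%d -> %d bytes, round trip ok: %v\n",
		len(src), len(compressed), bytes.Equal(plain, src))
}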