Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat:support zstd compress and uncompressed #1701

Merged
merged 8 commits into from
Feb 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions .github/workflows/lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,15 @@ on:
branches:
- master
pull_request:

permissions:
# Required: allow read access to the content for analysis.
contents: read
# Optional: allow read access to pull request. Use with `only-new-issues` option.
pull-requests: read
# Optional: Allow write access to checks to allow the action to annotate code in the PR.
checks: write

jobs:
lint:
runs-on: ubuntu-latest
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ jobs:
strategy:
fail-fast: false
matrix:
go-version: [1.18.x, 1.19.x, 1.20.x, 1.21.x, 1.22.x]
go-version: [1.19.x, 1.20.x, 1.21.x, 1.22.x]
os: [ubuntu-latest, macos-latest, windows-latest]
runs-on: ${{ matrix.os }}
steps:
Expand Down
74 changes: 59 additions & 15 deletions fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (

"github.com/andybalholm/brotli"
"github.com/klauspost/compress/gzip"
"github.com/klauspost/compress/zstd"
"github.com/valyala/bytebufferpool"
)

Expand Down Expand Up @@ -370,6 +371,7 @@ const FSCompressedFileSuffix = ".fasthttp.gz"
var FSCompressedFileSuffixes = map[string]string{
"gzip": ".fasthttp.gz",
"br": ".fasthttp.br",
"zstd": ".fasthttp.zst",
}

// FSHandlerCacheDuration is the default expiration duration for inactive
Expand Down Expand Up @@ -460,7 +462,9 @@ func (fs *FS) initRequestHandler() {

compressedFileSuffixes := fs.CompressedFileSuffixes
if compressedFileSuffixes["br"] == "" || compressedFileSuffixes["gzip"] == "" ||
compressedFileSuffixes["br"] == compressedFileSuffixes["gzip"] {
compressedFileSuffixes["zstd"] == "" || compressedFileSuffixes["br"] == compressedFileSuffixes["gzip"] ||
compressedFileSuffixes["br"] == compressedFileSuffixes["zstd"] ||
compressedFileSuffixes["gzip"] == compressedFileSuffixes["zstd"] {
// Copy global map
compressedFileSuffixes = make(map[string]string, len(FSCompressedFileSuffixes))
for k, v := range FSCompressedFileSuffixes {
Expand All @@ -471,6 +475,7 @@ func (fs *FS) initRequestHandler() {
if fs.CompressedFileSuffix != "" {
compressedFileSuffixes["gzip"] = fs.CompressedFileSuffix
compressedFileSuffixes["br"] = FSCompressedFileSuffixes["br"]
compressedFileSuffixes["zstd"] = FSCompressedFileSuffixes["zstd"]
}

h := &fsHandler{
Expand Down Expand Up @@ -794,6 +799,7 @@ const (
defaultCacheKind CacheKind = iota
brotliCacheKind
gzipCacheKind
zstdCacheKind
)

func newCacheManager(fs *FS) cacheManager {
Expand Down Expand Up @@ -1032,14 +1038,19 @@ func (h *fsHandler) handleRequest(ctx *RequestCtx) {
fileEncoding := ""
byteRange := ctx.Request.Header.peek(strRange)
if len(byteRange) == 0 && h.compress {
if h.compressBrotli && ctx.Request.Header.HasAcceptEncodingBytes(strBr) {
switch {
case h.compressBrotli && ctx.Request.Header.HasAcceptEncodingBytes(strBr):
mustCompress = true
fileCacheKind = brotliCacheKind
fileEncoding = "br"
} else if ctx.Request.Header.HasAcceptEncodingBytes(strGzip) {
case ctx.Request.Header.HasAcceptEncodingBytes(strGzip):
mustCompress = true
fileCacheKind = gzipCacheKind
fileEncoding = "gzip"
case ctx.Request.Header.HasAcceptEncodingBytes(strZstd):
mustCompress = true
fileCacheKind = zstdCacheKind
fileEncoding = "zstd"
}
}

Expand Down Expand Up @@ -1097,10 +1108,13 @@ func (h *fsHandler) handleRequest(ctx *RequestCtx) {

hdr := &ctx.Response.Header
if ff.compressed {
if fileEncoding == "br" {
switch fileEncoding {
case "br":
hdr.SetContentEncodingBytes(strBr)
} else if fileEncoding == "gzip" {
case "gzip":
hdr.SetContentEncodingBytes(strGzip)
case "zstd":
hdr.SetContentEncodingBytes(strZstd)
}
}

Expand Down Expand Up @@ -1304,10 +1318,13 @@ nestedContinue:

if mustCompress {
var zbuf bytebufferpool.ByteBuffer
if fileEncoding == "br" {
switch fileEncoding {
case "br":
zbuf.B = AppendBrotliBytesLevel(zbuf.B, w.B, CompressDefaultCompression)
} else if fileEncoding == "gzip" {
case "gzip":
zbuf.B = AppendGzipBytesLevel(zbuf.B, w.B, CompressDefaultCompression)
case "zstd":
zbuf.B = AppendZstdBytesLevel(zbuf.B, w.B, CompressZstdDefault)
}
w = &zbuf
}
Expand Down Expand Up @@ -1406,20 +1423,28 @@ func (h *fsHandler) compressFileNolock(
}
return nil, errNoCreatePermission
}
if fileEncoding == "br" {
switch fileEncoding {
case "br":
zw := acquireStacklessBrotliWriter(zf, CompressDefaultCompression)
_, err = copyZeroAlloc(zw, f)
if err1 := zw.Flush(); err == nil {
err = err1
}
releaseStacklessBrotliWriter(zw, CompressDefaultCompression)
} else if fileEncoding == "gzip" {
case "gzip":
zw := acquireStacklessGzipWriter(zf, CompressDefaultCompression)
_, err = copyZeroAlloc(zw, f)
if err1 := zw.Flush(); err == nil {
err = err1
}
releaseStacklessGzipWriter(zw, CompressDefaultCompression)
case "zstd":
zw := acquireStacklessZstdWriter(zf, CompressZstdDefault)
_, err = copyZeroAlloc(zw, f)
if err1 := zw.Flush(); err == nil {
err = err1
}
releaseStacklessZstdWriter(zw, CompressZstdDefault)
}
_ = zf.Close()
_ = f.Close()
Expand All @@ -1443,20 +1468,28 @@ func (h *fsHandler) newCompressedFSFileCache(f fs.File, fileInfo fs.FileInfo, fi
err error
)

if fileEncoding == "br" {
switch fileEncoding {
case "br":
zw := acquireStacklessBrotliWriter(w, CompressDefaultCompression)
_, err = copyZeroAlloc(zw, f)
if err1 := zw.Flush(); err == nil {
err = err1
}
releaseStacklessBrotliWriter(zw, CompressDefaultCompression)
} else if fileEncoding == "gzip" {
case "gzip":
zw := acquireStacklessGzipWriter(w, CompressDefaultCompression)
_, err = copyZeroAlloc(zw, f)
if err1 := zw.Flush(); err == nil {
err = err1
}
releaseStacklessGzipWriter(zw, CompressDefaultCompression)
case "zstd":
zw := acquireStacklessZstdWriter(w, CompressZstdDefault)
_, err = copyZeroAlloc(zw, f)
if err1 := zw.Flush(); err == nil {
err = err1
}
releaseStacklessZstdWriter(zw, CompressZstdDefault)
}
defer func() { _ = f.Close() }()

Expand Down Expand Up @@ -1600,21 +1633,28 @@ func (h *fsHandler) newFSFile(f fs.File, fileInfo fs.FileInfo, compressed bool,
func readFileHeader(f io.Reader, compressed bool, fileEncoding string) ([]byte, error) {
r := f
var (
br *brotli.Reader
zr *gzip.Reader
br *brotli.Reader
zr *gzip.Reader
zsr *zstd.Decoder
)
if compressed {
var err error
if fileEncoding == "br" {
switch fileEncoding {
case "br":
if br, err = acquireBrotliReader(f); err != nil {
return nil, err
}
r = br
} else if fileEncoding == "gzip" {
case "gzip":
if zr, err = acquireGzipReader(f); err != nil {
return nil, err
}
r = zr
case "zstd":
if zsr, err = acquireZstdReader(f); err != nil {
return nil, err
}
r = zsr
}
}

Expand All @@ -1639,6 +1679,10 @@ func readFileHeader(f io.Reader, compressed bool, fileEncoding string) ([]byte,
releaseGzipReader(zr)
}

if zsr != nil {
releaseZstdReader(zsr)
}

return data, err
}

Expand Down
70 changes: 70 additions & 0 deletions http.go
Original file line number Diff line number Diff line change
Expand Up @@ -528,6 +528,23 @@ func (ctx *RequestCtx) RequestBodyStream() io.Reader {
return ctx.Request.bodyStream
}

func (req *Request) BodyUnzstd() ([]byte, error) {
return unzstdData(req.Body())
}

func (resp *Response) BodyUnzstd() ([]byte, error) {
return unzstdData(resp.Body())
}

func unzstdData(p []byte) ([]byte, error) {
var bb bytebufferpool.ByteBuffer
_, err := WriteUnzstd(&bb, p)
if err != nil {
return nil, err
}
return bb.B, nil
}

func inflateData(p []byte) ([]byte, error) {
var bb bytebufferpool.ByteBuffer
_, err := WriteInflate(&bb, p)
Expand All @@ -554,6 +571,8 @@ func (req *Request) BodyUncompressed() ([]byte, error) {
return req.BodyGunzip()
case "br":
return req.BodyUnbrotli()
case "zstd":
return req.BodyUnzstd()
default:
return nil, ErrContentEncodingUnsupported
}
Expand All @@ -574,6 +593,8 @@ func (resp *Response) BodyUncompressed() ([]byte, error) {
return resp.BodyGunzip()
case "br":
return resp.BodyUnbrotli()
case "zstd":
return resp.BodyUnzstd()
default:
return nil, ErrContentEncodingUnsupported
}
Expand Down Expand Up @@ -1849,6 +1870,55 @@ func (resp *Response) deflateBody(level int) error {
return nil
}

func (resp *Response) zstdBody(level int) error {
if len(resp.Header.ContentEncoding()) > 0 {
return nil
}

if !resp.Header.isCompressibleContentType() {
return nil
}

if resp.bodyStream != nil {
// Reset Content-Length to -1, since it is impossible
// to determine body size beforehand of streamed compression.
// For
resp.Header.SetContentLength(-1)

// Do not care about memory allocations here, since flate is slow
// and allocates a lot of memory by itself.
bs := resp.bodyStream
resp.bodyStream = NewStreamReader(func(sw *bufio.Writer) {
zw := acquireStacklessZstdWriter(sw, level)
fw := &flushWriter{
wf: zw,
bw: sw,
}
copyZeroAlloc(fw, bs) //nolint:errcheck
releaseStacklessZstdWriter(zw, level)
if bsc, ok := bs.(io.Closer); ok {
bsc.Close()
}
})
} else {
bodyBytes := resp.bodyBytes()
if len(bodyBytes) < minCompressLen {
return nil
}
w := responseBodyPool.Get()
w.B = AppendZstdBytesLevel(w.B, bodyBytes, level)

if resp.body != nil {
responseBodyPool.Put(resp.body)
}
resp.body = w
resp.bodyRaw = nil
}
resp.Header.SetContentEncodingBytes(strZstd)
resp.Header.addVaryBytes(strAcceptEncoding)
return nil
}

// Bodies with sizes smaller than minCompressLen aren't compressed at all.
const minCompressLen = 200

Expand Down
Binary file added request_body.zst
Binary file not shown.
9 changes: 7 additions & 2 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -523,10 +523,13 @@ func CompressHandler(h RequestHandler) RequestHandler {
func CompressHandlerLevel(h RequestHandler, level int) RequestHandler {
return func(ctx *RequestCtx) {
h(ctx)
if ctx.Request.Header.HasAcceptEncodingBytes(strGzip) {
switch {
case ctx.Request.Header.HasAcceptEncodingBytes(strGzip):
ctx.Response.gzipBody(level) //nolint:errcheck
} else if ctx.Request.Header.HasAcceptEncodingBytes(strDeflate) {
case ctx.Request.Header.HasAcceptEncodingBytes(strDeflate):
ctx.Response.deflateBody(level) //nolint:errcheck
case ctx.Request.Header.HasAcceptEncodingBytes(strZstd):
ctx.Response.zstdBody(level) //nolint:errcheck
}
}
}
Expand Down Expand Up @@ -559,6 +562,8 @@ func CompressHandlerBrotliLevel(h RequestHandler, brotliLevel, otherLevel int) R
ctx.Response.gzipBody(otherLevel) //nolint:errcheck
case ctx.Request.Header.HasAcceptEncodingBytes(strDeflate):
ctx.Response.deflateBody(otherLevel) //nolint:errcheck
case ctx.Request.Header.HasAcceptEncodingBytes(strZstd):
ctx.Response.zstdBody(otherLevel) //nolint:errcheck
}
}
}
Expand Down
1 change: 1 addition & 0 deletions strings.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ var (
strClose = []byte("close")
strGzip = []byte("gzip")
strBr = []byte("br")
strZstd = []byte("zstd")
strDeflate = []byte("deflate")
strKeepAlive = []byte("keep-alive")
strUpgrade = []byte("Upgrade")
Expand Down
Loading
Loading