Skip to content

Commit

Permalink
feat(adaptive):add new compression algorithm——zstd.
Browse files Browse the repository at this point in the history
  • Loading branch information
wqshr12345 committed Nov 1, 2023
1 parent a30823b commit a2afc41
Show file tree
Hide file tree
Showing 8 changed files with 81 additions and 18 deletions.
6 changes: 6 additions & 0 deletions adaptive/adaptive.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,9 @@ type CompressInfo struct {
}

type ReportFunction func(CompressInfo)

const (
CompressTypeNone = uint8(0)
CompressTypeSnappy = uint8(1)
CompressTypeZstd = uint8(2)
)
10 changes: 7 additions & 3 deletions adaptive/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,16 @@ import (

"github.com/wqshr12345/golib/compression"
"github.com/wqshr12345/golib/compression/snappy"
"github.com/wqshr12345/golib/compression/zstd"
)

func NewReader(r io.Reader, reportFunc ReportFunction) *Reader {
cmprs := make(map[uint8]compression.Decompressor)
cmprs[CompressTypeSnappy] = snappy.NewDecompressor()
cmprs[CompressTypeZstd] = zstd.NewDecompressor()
return &Reader{
inR: r,
cmpr: snappy.NewDecompressor(),
cmprs: cmprs,
reportFunc: reportFunc,
pkgID: 0,
}
Expand All @@ -36,7 +40,7 @@ type Reader struct {
// the data will be transported from inR.
inR io.Reader

cmpr compression.Decompressor
cmprs map[uint8]compression.Decompressor

err error

Expand Down Expand Up @@ -103,7 +107,7 @@ func (r *Reader) fill() error {
// TODO(wangqian): Use compressType to choose different decompressor.
// TODO(wangqian): Should we avoid memory allocate every times?
mid2Ts := time.Now().UnixNano()
r.oBuf = r.cmpr.Decompress(nil, compressedData)
r.oBuf = r.cmprs[uint8(compressType)].Decompress(nil, compressedData)
endTs := time.Now().UnixNano()
compressInfo := CompressInfo{
PkgId: r.pkgID,
Expand Down
29 changes: 16 additions & 13 deletions adaptive/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,7 @@ import (

"github.com/wqshr12345/golib/compression"
"github.com/wqshr12345/golib/compression/snappy"
)

const (
compressTypeNone = 0x00
compressTypeSnappy = 0x01
compressTypeZstd = 0x02
"github.com/wqshr12345/golib/compression/zstd"
)

/*
Expand Down Expand Up @@ -54,7 +49,9 @@ type Writer struct {
// the data will be transported to outW.
outW io.Writer

cmpr compression.Compressor
cmprs map[uint8]compression.Compressor

cmprType uint8

err error

Expand All @@ -64,14 +61,20 @@ type Writer struct {
compressInfo []CompressInfo
}

func NewWriter(w io.Writer, bufSize int) *Writer {
func NewWriter(w io.Writer, bufSize int, cmprType uint8) *Writer {
// if bufSize > maxOriginalDataSize {
// panic("lands: the buffer is larger than the maximum of snappy's compression data.")
// }

cmprs := make(map[uint8]compression.Compressor)
cmprs[CompressTypeSnappy] = snappy.NewCompressor()
cmprs[CompressTypeZstd] = zstd.NewCompressor()

return &Writer{
outW: w,
cmpr: snappy.NewCompressor(),
iBuf: make([]byte, 0, bufSize),
outW: w,
cmprs: cmprs,
cmprType: cmprType,
iBuf: make([]byte, 0, bufSize),
}
}

Expand Down Expand Up @@ -112,10 +115,10 @@ func (w *Writer) write(p []byte) (nRet int, errRet error) {

// TODO(wangqian): use a oBuf to avoid memory allocate.
startTs := time.Now().UnixNano()
compressedData := w.cmpr.Compress(p)
compressedData := w.cmprs[w.cmprType].Compress(p)
midTs := time.Now().UnixNano()
dataLen := len(compressedData)
compressType := uint8(compressTypeSnappy)
compressType := w.cmprType

oBuf[0] = compressType
oBuf[1] = uint8(startTs >> 0)
Expand Down
25 changes: 25 additions & 0 deletions compression/zstd/decoder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package zstd

import (
"github.com/klauspost/compress/zstd"
)

func NewDecompressor() *ZstdDecompressor {
decoder, _ := zstd.NewReader(nil)
return &ZstdDecompressor{
decoder: decoder,
}
}

type ZstdDecompressor struct {
decoder *zstd.Decoder
}

// this will return a original data.
func (c *ZstdDecompressor) Decompress(dst, src []byte) []byte {
ans, err := c.decoder.DecodeAll(src, dst)
if err != nil {
panic(err)
}
return ans
}
21 changes: 21 additions & 0 deletions compression/zstd/encoder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package zstd

import (
"github.com/klauspost/compress/zstd"
)

func NewCompressor() *ZstdCompressor {
encoder, _ := zstd.NewWriter(nil)
return &ZstdCompressor{
encoder: encoder,
}
}

type ZstdCompressor struct {
encoder *zstd.Encoder
}

// this will return a zstd format data.
func (c *ZstdCompressor) Compress(src []byte) []byte {
return c.encoder.EncodeAll(src, make([]byte, 0, len(src)))
}
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@ go 1.20

require (
github.com/Azure/go-ntlmssp v0.0.0-20200615164410-66371956d46c
// github.com/DataDog/zstd v1.5.5
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/fatedier/kcp-go v2.0.4-0.20190803094908-fe8645b0a904+incompatible
github.com/golang/snappy v0.0.4
github.com/klauspost/compress v1.17.2
github.com/klauspost/reedsolomon v1.9.15 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/stretchr/testify v1.6.1
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5a
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/klauspost/compress v1.17.2 h1:RlWWUY/Dr4fL8qk9YG7DTZ7PDgME2V4csBXA8L/ixi4=
github.com/klauspost/compress v1.17.2/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
github.com/klauspost/cpuid/v2 v2.0.6 h1:dQ5ueTiftKxp0gyjKSx5+8BtPWkyQbd95m8Gys/RarI=
github.com/klauspost/cpuid/v2 v2.0.6/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
github.com/klauspost/reedsolomon v1.9.15 h1:g2erWKD2M6rgnPf89fCji6jNlhMKMdXcuNHMW1SYCIo=
Expand Down
4 changes: 2 additions & 2 deletions io/io.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,9 @@ func WithCompressionFromPool(rwc io.ReadWriteCloser) (out io.ReadWriteCloser, re
return
}

func WithAdaptiveEncoding(rwc io.ReadWriteCloser, reportFunc adaptive.ReportFunction, bufSize int) (out interfaces.ReadWriteCloseReportFlusher, recycle func()) {
func WithAdaptiveEncoding(rwc io.ReadWriteCloser, reportFunc adaptive.ReportFunction, bufSize int, compressType uint8) (out interfaces.ReadWriteCloseReportFlusher, recycle func()) {
sr := adaptive.NewReader(rwc, reportFunc)
sw := adaptive.NewWriter(rwc, bufSize)
sw := adaptive.NewWriter(rwc, bufSize, compressType)
out = WrapReadWriteCloseReportFlusher(sr, sw, func() error {
err := sw.Close()
err = rwc.Close()
Expand Down

0 comments on commit a2afc41

Please sign in to comment.