diff --git a/adaptive/adaptive.go b/adaptive/adaptive.go index af44862..17c558b 100644 --- a/adaptive/adaptive.go +++ b/adaptive/adaptive.go @@ -11,3 +11,9 @@ type CompressInfo struct { } type ReportFunction func(CompressInfo) + +const ( + CompressTypeNone = uint8(0) + CompressTypeSnappy = uint8(1) + CompressTypeZstd = uint8(2) +) diff --git a/adaptive/reader.go b/adaptive/reader.go index 18c3083..13f40e5 100644 --- a/adaptive/reader.go +++ b/adaptive/reader.go @@ -21,12 +21,16 @@ import ( "github.com/wqshr12345/golib/compression" "github.com/wqshr12345/golib/compression/snappy" + "github.com/wqshr12345/golib/compression/zstd" ) func NewReader(r io.Reader, reportFunc ReportFunction) *Reader { + cmprs := make(map[uint8]compression.Decompressor) + cmprs[CompressTypeSnappy] = snappy.NewDecompressor() + cmprs[CompressTypeZstd] = zstd.NewDecompressor() return &Reader{ inR: r, - cmpr: snappy.NewDecompressor(), + cmprs: cmprs, reportFunc: reportFunc, pkgID: 0, } @@ -36,7 +40,7 @@ type Reader struct { // the data will be transported from inR. inR io.Reader - cmpr compression.Decompressor + cmprs map[uint8]compression.Decompressor err error @@ -103,7 +107,7 @@ func (r *Reader) fill() error { // TODO(wangqian): Use compressType to choose different decompressor. // TODO(wangqian): Should we avoid memory allocate every times? mid2Ts := time.Now().UnixNano() - r.oBuf = r.cmpr.Decompress(nil, compressedData) + r.oBuf = r.cmprs[uint8(compressType)].Decompress(nil, compressedData) endTs := time.Now().UnixNano() compressInfo := CompressInfo{ PkgId: r.pkgID, diff --git a/adaptive/writer.go b/adaptive/writer.go index 1e789a3..ca8e97b 100644 --- a/adaptive/writer.go +++ b/adaptive/writer.go @@ -21,12 +21,7 @@ import ( "github.com/wqshr12345/golib/compression" "github.com/wqshr12345/golib/compression/snappy" -) - -const ( - compressTypeNone = 0x00 - compressTypeSnappy = 0x01 - compressTypeZstd = 0x02 + "github.com/wqshr12345/golib/compression/zstd" ) /* @@ -54,7 +49,9 @@ type Writer struct { // the data will be transported to outW. outW io.Writer - cmpr compression.Compressor + cmprs map[uint8]compression.Compressor + + cmprType uint8 err error @@ -64,14 +61,20 @@ type Writer struct { compressInfo []CompressInfo } -func NewWriter(w io.Writer, bufSize int) *Writer { +func NewWriter(w io.Writer, bufSize int, cmprType uint8) *Writer { // if bufSize > maxOriginalDataSize { // panic("lands: the buffer is larger than the maximum of snappy's compression data.") // } + + cmprs := make(map[uint8]compression.Compressor) + cmprs[CompressTypeSnappy] = snappy.NewCompressor() + cmprs[CompressTypeZstd] = zstd.NewCompressor() + return &Writer{ - outW: w, - cmpr: snappy.NewCompressor(), - iBuf: make([]byte, 0, bufSize), + outW: w, + cmprs: cmprs, + cmprType: cmprType, + iBuf: make([]byte, 0, bufSize), } } @@ -112,10 +115,10 @@ func (w *Writer) write(p []byte) (nRet int, errRet error) { // TODO(wangqian): use a oBuf to avoid memory allocate. startTs := time.Now().UnixNano() - compressedData := w.cmpr.Compress(p) + compressedData := w.cmprs[w.cmprType].Compress(p) midTs := time.Now().UnixNano() dataLen := len(compressedData) - compressType := uint8(compressTypeSnappy) + compressType := w.cmprType oBuf[0] = compressType oBuf[1] = uint8(startTs >> 0) diff --git a/compression/zstd/decoder.go b/compression/zstd/decoder.go new file mode 100644 index 0000000..65ef046 --- /dev/null +++ b/compression/zstd/decoder.go @@ -0,0 +1,25 @@ +package zstd + +import ( + "github.com/klauspost/compress/zstd" +) + +func NewDecompressor() *ZstdDecompressor { + decoder, _ := zstd.NewReader(nil) + return &ZstdDecompressor{ + decoder: decoder, + } +} + +type ZstdDecompressor struct { + decoder *zstd.Decoder +} + +// this will return a original data. +func (c *ZstdDecompressor) Decompress(dst, src []byte) []byte { + ans, err := c.decoder.DecodeAll(src, dst) + if err != nil { + panic(err) + } + return ans +} diff --git a/compression/zstd/encoder.go b/compression/zstd/encoder.go new file mode 100644 index 0000000..134ee42 --- /dev/null +++ b/compression/zstd/encoder.go @@ -0,0 +1,21 @@ +package zstd + +import ( + "github.com/klauspost/compress/zstd" +) + +func NewCompressor() *ZstdCompressor { + encoder, _ := zstd.NewWriter(nil) + return &ZstdCompressor{ + encoder: encoder, + } +} + +type ZstdCompressor struct { + encoder *zstd.Encoder +} + +// this will return a zstd format data. +func (c *ZstdCompressor) Compress(src []byte) []byte { + return c.encoder.EncodeAll(src, make([]byte, 0, len(src))) +} diff --git a/go.mod b/go.mod index 47dfb3b..a95c8b2 100644 --- a/go.mod +++ b/go.mod @@ -4,9 +4,11 @@ go 1.20 require ( github.com/Azure/go-ntlmssp v0.0.0-20200615164410-66371956d46c + // github.com/DataDog/zstd v1.5.5 github.com/davecgh/go-spew v1.1.1 // indirect github.com/fatedier/kcp-go v2.0.4-0.20190803094908-fe8645b0a904+incompatible github.com/golang/snappy v0.0.4 + github.com/klauspost/compress v1.17.2 github.com/klauspost/reedsolomon v1.9.15 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/stretchr/testify v1.6.1 diff --git a/go.sum b/go.sum index 35d4bdc..db5cff8 100644 --- a/go.sum +++ b/go.sum @@ -30,6 +30,8 @@ github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5a github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/klauspost/compress v1.17.2 h1:RlWWUY/Dr4fL8qk9YG7DTZ7PDgME2V4csBXA8L/ixi4= +github.com/klauspost/compress v1.17.2/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= github.com/klauspost/cpuid/v2 v2.0.6 h1:dQ5ueTiftKxp0gyjKSx5+8BtPWkyQbd95m8Gys/RarI= github.com/klauspost/cpuid/v2 v2.0.6/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= github.com/klauspost/reedsolomon v1.9.15 h1:g2erWKD2M6rgnPf89fCji6jNlhMKMdXcuNHMW1SYCIo= diff --git a/io/io.go b/io/io.go index 1b1c4f4..5e4b973 100644 --- a/io/io.go +++ b/io/io.go @@ -113,9 +113,9 @@ func WithCompressionFromPool(rwc io.ReadWriteCloser) (out io.ReadWriteCloser, re return } -func WithAdaptiveEncoding(rwc io.ReadWriteCloser, reportFunc adaptive.ReportFunction, bufSize int) (out interfaces.ReadWriteCloseReportFlusher, recycle func()) { +func WithAdaptiveEncoding(rwc io.ReadWriteCloser, reportFunc adaptive.ReportFunction, bufSize int, compressType uint8) (out interfaces.ReadWriteCloseReportFlusher, recycle func()) { sr := adaptive.NewReader(rwc, reportFunc) - sw := adaptive.NewWriter(rwc, bufSize) + sw := adaptive.NewWriter(rwc, bufSize, compressType) out = WrapReadWriteCloseReportFlusher(sr, sw, func() error { err := sw.Close() err = rwc.Close()