From a7131e6d378d28a473769bd8aaba7f4025d6a0e1 Mon Sep 17 00:00:00 2001 From: Wang Qian <1498953301@qq.com> Date: Mon, 6 Nov 2023 19:35:16 +0800 Subject: [PATCH] feat(io):extract sth into common && add new stream compression interface:WithCompression2. --- adaptive/reader.go | 11 +++--- adaptive/writer.go | 11 +++--- adaptive/adaptive.go => common/common.go | 2 +- go.mod | 1 - interfaces/common.go | 18 ++++++++- io/io.go | 50 ++++++++++++++++++++++-- 6 files changed, 76 insertions(+), 17 deletions(-) rename adaptive/adaptive.go => common/common.go (95%) diff --git a/adaptive/reader.go b/adaptive/reader.go index 13f40e5..1a99534 100644 --- a/adaptive/reader.go +++ b/adaptive/reader.go @@ -19,15 +19,16 @@ import ( "io" "time" + "github.com/wqshr12345/golib/common" "github.com/wqshr12345/golib/compression" "github.com/wqshr12345/golib/compression/snappy" "github.com/wqshr12345/golib/compression/zstd" ) -func NewReader(r io.Reader, reportFunc ReportFunction) *Reader { +func NewReader(r io.Reader, reportFunc common.ReportFunction) *Reader { cmprs := make(map[uint8]compression.Decompressor) - cmprs[CompressTypeSnappy] = snappy.NewDecompressor() - cmprs[CompressTypeZstd] = zstd.NewDecompressor() + cmprs[common.CompressTypeSnappy] = snappy.NewDecompressor() + cmprs[common.CompressTypeZstd] = zstd.NewDecompressor() return &Reader{ inR: r, cmprs: cmprs, @@ -50,7 +51,7 @@ type Reader struct { // oBuf[start:] represent valid data. start int - reportFunc ReportFunction + reportFunc common.ReportFunction pkgID int } @@ -109,7 +110,7 @@ func (r *Reader) fill() error { mid2Ts := time.Now().UnixNano() r.oBuf = r.cmprs[uint8(compressType)].Decompress(nil, compressedData) endTs := time.Now().UnixNano() - compressInfo := CompressInfo{ + compressInfo := common.CompressInfo{ PkgId: r.pkgID, DataLen: len(r.oBuf), CompressType: int(compressType), diff --git a/adaptive/writer.go b/adaptive/writer.go index ca8e97b..8f70233 100644 --- a/adaptive/writer.go +++ b/adaptive/writer.go @@ -1,4 +1,4 @@ -// Copyright 2019 wqshr12345, wqshr12345@gmail.com +// Copyright 2023 wqshr12345, wqshr12345@gmail.com // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -19,6 +19,7 @@ import ( "io" "time" + "github.com/wqshr12345/golib/common" "github.com/wqshr12345/golib/compression" "github.com/wqshr12345/golib/compression/snappy" "github.com/wqshr12345/golib/compression/zstd" @@ -58,7 +59,7 @@ type Writer struct { // iBuf is a buffer for the incoming unadaptiveencoded data.If it's nil, we will not buffer the incoming data. iBuf []byte - compressInfo []CompressInfo + compressInfo []common.CompressInfo } func NewWriter(w io.Writer, bufSize int, cmprType uint8) *Writer { @@ -67,8 +68,8 @@ func NewWriter(w io.Writer, bufSize int, cmprType uint8) *Writer { // } cmprs := make(map[uint8]compression.Compressor) - cmprs[CompressTypeSnappy] = snappy.NewCompressor() - cmprs[CompressTypeZstd] = zstd.NewCompressor() + cmprs[common.CompressTypeSnappy] = snappy.NewCompressor() + cmprs[common.CompressTypeZstd] = zstd.NewCompressor() return &Writer{ outW: w, @@ -168,7 +169,7 @@ func (w *Writer) Flush() error { return w.err } -func (w *Writer) Report(info CompressInfo) error { +func (w *Writer) Report(info common.CompressInfo) error { // TODO(wangqian):Do more things. w.compressInfo = append(w.compressInfo, info) return nil diff --git a/adaptive/adaptive.go b/common/common.go similarity index 95% rename from adaptive/adaptive.go rename to common/common.go index 17c558b..0518d93 100644 --- a/adaptive/adaptive.go +++ b/common/common.go @@ -1,4 +1,4 @@ -package adaptive +package common type CompressInfo struct { PkgId int diff --git a/go.mod b/go.mod index a95c8b2..96d8e43 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,6 @@ go 1.20 require ( github.com/Azure/go-ntlmssp v0.0.0-20200615164410-66371956d46c - // github.com/DataDog/zstd v1.5.5 github.com/davecgh/go-spew v1.1.1 // indirect github.com/fatedier/kcp-go v2.0.4-0.20190803094908-fe8645b0a904+incompatible github.com/golang/snappy v0.0.4 diff --git a/interfaces/common.go b/interfaces/common.go index 5c95514..bb5f86e 100644 --- a/interfaces/common.go +++ b/interfaces/common.go @@ -3,11 +3,11 @@ package interfaces import ( "io" - "github.com/wqshr12345/golib/adaptive" + "github.com/wqshr12345/golib/common" ) type Reporter interface { - Report(adaptive.CompressInfo) error + Report(common.CompressInfo) error } type ReadReporter interface { @@ -19,6 +19,11 @@ type Flusher interface { Flush() error } +type WriteFlusher interface { + io.Writer + Flusher +} + type WriteFlusherReporter interface { io.Writer Flusher @@ -30,3 +35,12 @@ type ReadWriteCloseReportFlusher interface { WriteFlusherReporter io.Closer } + +type WriterFlusher2 interface { + Write2(p []byte) (n int, cmprInfo common.CompressInfo, isNil bool, err error) + Flush2() (cmprInfo common.CompressInfo, err error) +} + +type Reader2 interface { + Read2(p []byte) (n int, cmprInfo common.CompressInfo, isNil bool, err error) +} diff --git a/io/io.go b/io/io.go index 5e4b973..8d2586d 100644 --- a/io/io.go +++ b/io/io.go @@ -19,8 +19,10 @@ import ( "sync" "github.com/golang/snappy" + "github.com/klauspost/compress/zstd" "github.com/wqshr12345/golib/adaptive" "github.com/wqshr12345/golib/asyncio" + "github.com/wqshr12345/golib/common" "github.com/wqshr12345/golib/crypto" "github.com/wqshr12345/golib/interfaces" "github.com/wqshr12345/golib/pool" @@ -113,7 +115,18 @@ func WithCompressionFromPool(rwc io.ReadWriteCloser) (out io.ReadWriteCloser, re return } -func WithAdaptiveEncoding(rwc io.ReadWriteCloser, reportFunc adaptive.ReportFunction, bufSize int, compressType uint8) (out interfaces.ReadWriteCloseReportFlusher, recycle func()) { +func WithCompression2(rwc io.ReadWriteCloser, compressType uint8) (out interfaces.ReadWriteCloseReportFlusher) { + sr, _ := zstd.NewReader(rwc) + sw, _ := zstd.NewWriter(rwc) + return WrapReadWriteCloseReportFlusher3(sr, sw, func() error { + err := sw.Close() + err = rwc.Close() + return err + }) +} + +// use normal compression interface. +func WithAdaptiveEncoding(rwc io.ReadWriteCloser, reportFunc common.ReportFunction, bufSize int, compressType uint8) (out interfaces.ReadWriteCloseReportFlusher, recycle func()) { sr := adaptive.NewReader(rwc, reportFunc) sw := adaptive.NewWriter(rwc, bufSize, compressType) out = WrapReadWriteCloseReportFlusher(sr, sw, func() error { @@ -185,6 +198,28 @@ func WrapReadWriteCloseReportFlusher(r io.Reader, w interfaces.WriteFlusherRepor } } +type MockWriteFlusherReporter2 struct { + w interfaces.WriteFlusher +} + +func NewMockWriteFlusherReporter2(w interfaces.WriteFlusher) *MockWriteFlusherReporter2 { + return &MockWriteFlusherReporter2{ + w: w, + } +} + +func (m *MockWriteFlusherReporter2) Write(p []byte) (n int, err error) { + return m.w.Write(p) +} + +func (m *MockWriteFlusherReporter2) Flush() error { + return m.w.Flush() +} + +func (m *MockWriteFlusherReporter2) Report(info common.CompressInfo) error { + return nil +} + type MockWriteFlusherReporter struct { w io.Writer } @@ -203,7 +238,7 @@ func (m *MockWriteFlusherReporter) Flush() error { return nil } -func (m *MockWriteFlusherReporter) Report(info adaptive.CompressInfo) error { +func (m *MockWriteFlusherReporter) Report(info common.CompressInfo) error { return nil } @@ -216,6 +251,15 @@ func WrapReadWriteCloseReportFlusher2(rwc io.ReadWriteCloser) interfaces.ReadWri } } +func WrapReadWriteCloseReportFlusher3(r io.Reader, w interfaces.WriteFlusher, closeFn func() error) interfaces.ReadWriteCloseReportFlusher { + return &ReadWriteCloseReportFlusher{ + r: r, + w: NewMockWriteFlusherReporter2(w), + closeFn: closeFn, + closed: false, + } +} + func (rwcrf *ReadWriteCloseReportFlusher) Read(p []byte) (n int, err error) { return rwcrf.r.Read(p) } @@ -239,7 +283,7 @@ func (rwcrf *ReadWriteCloseReportFlusher) Close() error { return nil } -func (rwcrf *ReadWriteCloseReportFlusher) Report(info adaptive.CompressInfo) error { +func (rwcrf *ReadWriteCloseReportFlusher) Report(info common.CompressInfo) error { return rwcrf.w.Report(info) }