Skip to content

Commit

Permalink
feat(io):extract sth into common && add new stream compression
Browse files Browse the repository at this point in the history
interface:WithCompression2.
  • Loading branch information
wqshr12345 committed Nov 6, 2023
1 parent a2afc41 commit a7131e6
Show file tree
Hide file tree
Showing 6 changed files with 76 additions and 17 deletions.
11 changes: 6 additions & 5 deletions adaptive/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,16 @@ import (
"io"
"time"

"github.com/wqshr12345/golib/common"
"github.com/wqshr12345/golib/compression"
"github.com/wqshr12345/golib/compression/snappy"
"github.com/wqshr12345/golib/compression/zstd"
)

func NewReader(r io.Reader, reportFunc ReportFunction) *Reader {
func NewReader(r io.Reader, reportFunc common.ReportFunction) *Reader {
cmprs := make(map[uint8]compression.Decompressor)
cmprs[CompressTypeSnappy] = snappy.NewDecompressor()
cmprs[CompressTypeZstd] = zstd.NewDecompressor()
cmprs[common.CompressTypeSnappy] = snappy.NewDecompressor()
cmprs[common.CompressTypeZstd] = zstd.NewDecompressor()
return &Reader{
inR: r,
cmprs: cmprs,
Expand All @@ -50,7 +51,7 @@ type Reader struct {
// oBuf[start:] represent valid data.
start int

reportFunc ReportFunction
reportFunc common.ReportFunction

pkgID int
}
Expand Down Expand Up @@ -109,7 +110,7 @@ func (r *Reader) fill() error {
mid2Ts := time.Now().UnixNano()
r.oBuf = r.cmprs[uint8(compressType)].Decompress(nil, compressedData)
endTs := time.Now().UnixNano()
compressInfo := CompressInfo{
compressInfo := common.CompressInfo{
PkgId: r.pkgID,
DataLen: len(r.oBuf),
CompressType: int(compressType),
Expand Down
11 changes: 6 additions & 5 deletions adaptive/writer.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2019 wqshr12345, [email protected]
// Copyright 2023 wqshr12345, [email protected]
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -19,6 +19,7 @@ import (
"io"
"time"

"github.com/wqshr12345/golib/common"
"github.com/wqshr12345/golib/compression"
"github.com/wqshr12345/golib/compression/snappy"
"github.com/wqshr12345/golib/compression/zstd"
Expand Down Expand Up @@ -58,7 +59,7 @@ type Writer struct {
// iBuf is a buffer for the incoming unadaptiveencoded data.If it's nil, we will not buffer the incoming data.
iBuf []byte

compressInfo []CompressInfo
compressInfo []common.CompressInfo
}

func NewWriter(w io.Writer, bufSize int, cmprType uint8) *Writer {
Expand All @@ -67,8 +68,8 @@ func NewWriter(w io.Writer, bufSize int, cmprType uint8) *Writer {
// }

cmprs := make(map[uint8]compression.Compressor)
cmprs[CompressTypeSnappy] = snappy.NewCompressor()
cmprs[CompressTypeZstd] = zstd.NewCompressor()
cmprs[common.CompressTypeSnappy] = snappy.NewCompressor()
cmprs[common.CompressTypeZstd] = zstd.NewCompressor()

return &Writer{
outW: w,
Expand Down Expand Up @@ -168,7 +169,7 @@ func (w *Writer) Flush() error {
return w.err
}

func (w *Writer) Report(info CompressInfo) error {
func (w *Writer) Report(info common.CompressInfo) error {
// TODO(wangqian):Do more things.
w.compressInfo = append(w.compressInfo, info)
return nil
Expand Down
2 changes: 1 addition & 1 deletion adaptive/adaptive.go → common/common.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package adaptive
package common

type CompressInfo struct {
PkgId int
Expand Down
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ go 1.20

require (
github.com/Azure/go-ntlmssp v0.0.0-20200615164410-66371956d46c
// github.com/DataDog/zstd v1.5.5
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/fatedier/kcp-go v2.0.4-0.20190803094908-fe8645b0a904+incompatible
github.com/golang/snappy v0.0.4
Expand Down
18 changes: 16 additions & 2 deletions interfaces/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@ package interfaces
import (
"io"

"github.com/wqshr12345/golib/adaptive"
"github.com/wqshr12345/golib/common"
)

type Reporter interface {
Report(adaptive.CompressInfo) error
Report(common.CompressInfo) error
}

type ReadReporter interface {
Expand All @@ -19,6 +19,11 @@ type Flusher interface {
Flush() error
}

type WriteFlusher interface {
io.Writer
Flusher
}

type WriteFlusherReporter interface {
io.Writer
Flusher
Expand All @@ -30,3 +35,12 @@ type ReadWriteCloseReportFlusher interface {
WriteFlusherReporter
io.Closer
}

type WriterFlusher2 interface {
Write2(p []byte) (n int, cmprInfo common.CompressInfo, isNil bool, err error)
Flush2() (cmprInfo common.CompressInfo, err error)
}

type Reader2 interface {
Read2(p []byte) (n int, cmprInfo common.CompressInfo, isNil bool, err error)
}
50 changes: 47 additions & 3 deletions io/io.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@ import (
"sync"

"github.com/golang/snappy"
"github.com/klauspost/compress/zstd"
"github.com/wqshr12345/golib/adaptive"
"github.com/wqshr12345/golib/asyncio"
"github.com/wqshr12345/golib/common"
"github.com/wqshr12345/golib/crypto"
"github.com/wqshr12345/golib/interfaces"
"github.com/wqshr12345/golib/pool"
Expand Down Expand Up @@ -113,7 +115,18 @@ func WithCompressionFromPool(rwc io.ReadWriteCloser) (out io.ReadWriteCloser, re
return
}

func WithAdaptiveEncoding(rwc io.ReadWriteCloser, reportFunc adaptive.ReportFunction, bufSize int, compressType uint8) (out interfaces.ReadWriteCloseReportFlusher, recycle func()) {
func WithCompression2(rwc io.ReadWriteCloser, compressType uint8) (out interfaces.ReadWriteCloseReportFlusher) {
sr, _ := zstd.NewReader(rwc)
sw, _ := zstd.NewWriter(rwc)
return WrapReadWriteCloseReportFlusher3(sr, sw, func() error {
err := sw.Close()
err = rwc.Close()
return err
})
}

// use normal compression interface.
func WithAdaptiveEncoding(rwc io.ReadWriteCloser, reportFunc common.ReportFunction, bufSize int, compressType uint8) (out interfaces.ReadWriteCloseReportFlusher, recycle func()) {
sr := adaptive.NewReader(rwc, reportFunc)
sw := adaptive.NewWriter(rwc, bufSize, compressType)
out = WrapReadWriteCloseReportFlusher(sr, sw, func() error {
Expand Down Expand Up @@ -185,6 +198,28 @@ func WrapReadWriteCloseReportFlusher(r io.Reader, w interfaces.WriteFlusherRepor
}
}

type MockWriteFlusherReporter2 struct {
w interfaces.WriteFlusher
}

func NewMockWriteFlusherReporter2(w interfaces.WriteFlusher) *MockWriteFlusherReporter2 {
return &MockWriteFlusherReporter2{
w: w,
}
}

func (m *MockWriteFlusherReporter2) Write(p []byte) (n int, err error) {
return m.w.Write(p)
}

func (m *MockWriteFlusherReporter2) Flush() error {
return m.w.Flush()
}

func (m *MockWriteFlusherReporter2) Report(info common.CompressInfo) error {
return nil
}

type MockWriteFlusherReporter struct {
w io.Writer
}
Expand All @@ -203,7 +238,7 @@ func (m *MockWriteFlusherReporter) Flush() error {
return nil
}

func (m *MockWriteFlusherReporter) Report(info adaptive.CompressInfo) error {
func (m *MockWriteFlusherReporter) Report(info common.CompressInfo) error {
return nil
}

Expand All @@ -216,6 +251,15 @@ func WrapReadWriteCloseReportFlusher2(rwc io.ReadWriteCloser) interfaces.ReadWri
}
}

func WrapReadWriteCloseReportFlusher3(r io.Reader, w interfaces.WriteFlusher, closeFn func() error) interfaces.ReadWriteCloseReportFlusher {
return &ReadWriteCloseReportFlusher{
r: r,
w: NewMockWriteFlusherReporter2(w),
closeFn: closeFn,
closed: false,
}
}

func (rwcrf *ReadWriteCloseReportFlusher) Read(p []byte) (n int, err error) {
return rwcrf.r.Read(p)
}
Expand All @@ -239,7 +283,7 @@ func (rwcrf *ReadWriteCloseReportFlusher) Close() error {
return nil
}

func (rwcrf *ReadWriteCloseReportFlusher) Report(info adaptive.CompressInfo) error {
func (rwcrf *ReadWriteCloseReportFlusher) Report(info common.CompressInfo) error {
return rwcrf.w.Report(info)
}

Expand Down

0 comments on commit a7131e6

Please sign in to comment.