Skip to content

Commit

Permalink
refactor(adaptive): Drop snappy's Reader and Writer used in adaptive re…
Browse files Browse the repository at this point in the history
…ader/writer, replace them with snappy's Decode and Encode; change interface to report compression info.
  • Loading branch information
wqshr12345 committed Oct 24, 2023
1 parent b6bb931 commit daf952c
Show file tree
Hide file tree
Showing 7 changed files with 184 additions and 93 deletions.
16 changes: 16 additions & 0 deletions adaptive/adaptive.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package adaptive

// CompressInfo holds the measurements collected for one adaptive-encoded
// package. Reader.fill builds one value per decoded package and hands it to
// the configured ReportFunction.
type CompressInfo struct {
	// pkgId identifies the package.
	// NOTE(review): nothing visible in this package assigns it yet — confirm.
	// compressType is the compression type byte taken from the package header.
	pkgId, compressType int

	// All durations are differences of time.Now().UnixNano() values, i.e.
	// nanoseconds.
	//   compressTime:   writer-side time spent compressing (midTs - startTs).
	//   tranportTime:   gap between the writer's post-compression timestamp
	//                   and the reader's clock just before decompression, so
	//                   it mixes two clocks. (The spelling is kept because
	//                   other code refers to the field by this name.)
	//   decompressTime: reader-side time spent decompressing.
	compressTime, tranportTime, decompressTime int64

	// compressRatio is the decompressed length divided by the compressed
	// length of the package.
	compressRatio float64
}

// ReportFunction is the callback a Reader invokes with the statistics of every
// package it has decoded.
type ReportFunction func(info CompressInfo)

// Reporter is implemented by types that accept per-package compression
// statistics, such as Writer.
type Reporter interface {
	// Report delivers the statistics of one package to the receiver.
	Report(info CompressInfo)
}
121 changes: 64 additions & 57 deletions adaptive/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,84 +17,49 @@ package adaptive
import (
"errors"
"io"
"time"

"github.com/golang/snappy"
"github.com/wqshr12345/golib/compression"
"github.com/wqshr12345/golib/compression/snappy"
)

func NewReader(r io.Reader) *Reader {
func NewReader(r io.Reader, reportFunc ReportFunction) *Reader {
return &Reader{
inR: r,
cmprR: snappy.NewReader(r),
iBuf: make([]byte, packageHeaderSize),
inR: r,
cmpr: snappy.NewDecompressor(),
reportFunc: reportFunc,
}
}

type Reader struct {

// the data will be transported from inR.
inR io.Reader

cmprR io.Reader
cmpr compression.Decompressor

err error

//iBuf is a buffer for the incoming adaptiveencoded header.
iBuf []byte

// Note(wangqian):The actually decompressed data is stored in cmprR's buf.

dataOff int

dataLen int

compressType byte

timestamp int64
// buffered decompressed data.
oBuf []byte

// Note(wangqian): We should not use a oBuf, because here we will not do some decompress operations.We should not use memR as well, because in read logic,adaptive reader is behind the snappy reader.
}

func (r *Reader) Reset(reader io.Reader) {
r.inR = reader
r.cmprR = snappy.NewReader(reader)
r.iBuf = make([]byte, packageHeaderSize)
r.dataOff = 0
r.dataLen = 0
r.compressType = 0
r.timestamp = 0
r.err = nil
// oBuf[start:] represent valid data.
start int

reportFunc ReportFunction
}

func (r *Reader) Read(p []byte) (n int, err error) {
// 1. read package header to iBuf.

// Based on the premise that a package with an adaptive encoding format must encapsulate a Snappy chunk
if r.dataOff > r.dataLen {
return 0, errors.New("LANDS: off large than len")
func (r *Reader) Read(p []byte) (int, error) {
if r.err != nil {
return 0, r.err
}
if r.dataOff == r.dataLen {
if !r.readFull(r.iBuf, true) {
return 0, r.err
}
r.compressType = r.iBuf[0]
r.timestamp = int64(r.iBuf[1]) | int64(r.iBuf[2])<<8 | int64(r.iBuf[3])<<16 | int64(r.iBuf[4])<<24 | int64(r.iBuf[5])<<32 | int64(r.iBuf[6])<<40 | int64(r.iBuf[7])<<48 | int64(r.iBuf[8])<<56
r.dataLen = int(r.iBuf[9]) | int(r.iBuf[10])<<8 | int(r.iBuf[11])<<16 | int(r.iBuf[12])<<24
r.dataOff = 0
}

// 2. read package header to dataBuf.

switch r.compressType {
case compressTypeSnappy:
if n, r.err = r.cmprR.Read(p); err != nil {
return n, r.err
}
r.dataOff += n
default:
panic("LANDS: unsupported compression type")
if err := r.fill(); err != nil {
return 0, err
}
return n, r.err

n := copy(p, r.oBuf[r.start:])
r.start += n
return n, nil
}

func (r *Reader) readFull(p []byte, allowEOF bool) (ok bool) {
Expand All @@ -106,3 +71,45 @@ func (r *Reader) readFull(p []byte, allowEOF bool) (ok bool) {
}
return true
}

// fill makes sure oBuf holds unread decompressed data. When the previous
// package has been consumed completely it reads the next package from inR,
// decompresses it into oBuf and resets start to 0.
//
// Package layout (all integers little-endian):
//
//	byte  0      compression type
//	bytes 1-8    startTs, taken by the writer before compressing (UnixNano)
//	bytes 9-16   midTs, taken by the writer after compressing (UnixNano)
//	bytes 17-20  length of the compressed payload that follows the header
//
// After a package has been decoded, its statistics are passed to reportFunc
// if one was configured.
//
// It returns r.err when the header or payload cannot be read. If the
// decompressor panics with an error (for example on a corrupt payload), that
// error is stored in r.err and returned instead of crashing the caller.
func (r *Reader) fill() error {
	if r.start > len(r.oBuf) {
		panic("LANDS: start > end")
	}
	if r.start < len(r.oBuf) {
		// There is still buffered data the caller has not consumed.
		return nil
	}

	// le decodes a little-endian integer of up to 8 bytes.
	le := func(b []byte) (v int64) {
		for i := len(b) - 1; i >= 0; i-- {
			v = v<<8 | int64(b[i])
		}
		return v
	}

	// 1. read package header. EOF is acceptable here: it means the stream
	// ended on a package boundary.
	iBuf := make([]byte, packageHeaderSize)
	if !r.readFull(iBuf, true) {
		return r.err
	}
	compressType := iBuf[0]
	startTs := le(iBuf[1:9])
	midTs := le(iBuf[9:17])
	dataLen := int(le(iBuf[17:21]))

	// 2. read compressed data. EOF in the middle of a package is an error.
	compressedData := make([]byte, dataLen)
	if !r.readFull(compressedData, false) {
		return r.err
	}

	// 3. decompress compressed data.
	// TODO(wangqian): Use compressType to choose different decompressor.
	// TODO(wangqian): Should we avoid memory allocate every times?
	mid2Ts := time.Now().UnixNano()
	oBuf, err := r.safeDecompress(compressedData)
	endTs := time.Now().UnixNano()
	if err != nil {
		// Make the failure sticky so later Read calls report it as well.
		r.err = err
		return err
	}
	r.oBuf = oBuf
	r.start = 0

	// A Reader may be created without a report callback; only report when
	// there is somebody listening.
	if r.reportFunc != nil {
		r.reportFunc(CompressInfo{
			compressType:   int(compressType),
			compressTime:   midTs - startTs,
			tranportTime:   mid2Ts - midTs,
			decompressTime: endTs - mid2Ts,
			compressRatio:  float64(len(r.oBuf)) / float64(dataLen),
		})
	}
	return nil
}

// safeDecompress runs the decompressor and converts a panic carrying an error
// value into a returned error. Runtime errors (programming bugs such as a nil
// dereference) and non-error panic values are re-raised unchanged.
func (r *Reader) safeDecompress(src []byte) (dst []byte, err error) {
	defer func() {
		rec := recover()
		if rec == nil {
			return
		}
		if _, isRuntime := rec.(interface{ RuntimeError() }); isRuntime {
			panic(rec)
		}
		e, ok := rec.(error)
		if !ok {
			panic(rec)
		}
		dst, err = nil, e
	}()
	return r.cmpr.Decompress(nil, src), nil
}
81 changes: 47 additions & 34 deletions adaptive/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ import (
"io"
"time"

"github.com/golang/snappy"
"github.com/wqshr12345/golib/compression"
"github.com/wqshr12345/golib/compression/snappy"
)

const (
Expand All @@ -36,13 +37,13 @@ Adaptive Encoding Format:
// TODO(wangqian): extract the const parameter to a config file.
const (
// magicHeader = "LANDS"
packageTimestampSize = 8
packageTimestampSize = 16 // startTs + midTs
packageCompressTypeSize = 1
packageDataLenSize = 4

maxOriginalDataSize = 65536

// temoparily is work only on Snappy.
// temporarily not used, because there is no reusable oBuf in the reader.
maxCompressedDataSize = 76490

packageHeaderSize = packageTimestampSize + packageCompressTypeSize + packageDataLenSize
Expand All @@ -53,28 +54,24 @@ type Writer struct {
// the data will be transported to outW.
outW io.Writer

// only used to compress data, we will not transport data to w2.
cmprW io.Writer
cmpr compression.Compressor

err error

// iBuf is a buffer for the incoming unadaptiveencoded data.If it's nil, we will not buffer the incoming data.
iBuf []byte

// oBuf is a buffer for the outgoing adaptiveencoded header.
// Note(wangqian): The actually compressed data is stored in cmprW.
oBuf []byte
compressInfo []CompressInfo
}

func NewWriter(w io.Writer, bufSize int) *Writer {
if bufSize > maxOriginalDataSize {
panic("lands: the buffer is larger than the maximum of snappy's compression data.")
}
return &Writer{
outW: w,
cmprW: snappy.NewWriter(w),
iBuf: make([]byte, 0, bufSize),
oBuf: make([]byte, packageHeaderSize),
outW: w,
cmpr: snappy.NewCompressor(),
iBuf: make([]byte, 0, bufSize),
}
}

Expand Down Expand Up @@ -107,36 +104,47 @@ func (w *Writer) Write(p []byte) (nRet int, errRet error) {

// construct an adaptive encoding format package from the data in p.
func (w *Writer) write(p []byte) (nRet int, errRet error) {

if w.err != nil {
return 0, w.err
}

oBuf := make([]byte, packageHeaderSize)

// TODO(wangqian): use a oBuf to avoid memory allocate.
startTs := time.Now().UnixNano()
compressedData := w.cmpr.Compress(p)
midTs := time.Now().UnixNano()
dataLen := len(compressedData)
compressType := uint8(compressTypeSnappy)
timestamp := time.Now().UnixNano()

// Note(wangqian): We should use original data len instead of compressed data len.
dataLen := len(p)

w.oBuf[0] = compressType
w.oBuf[1] = uint8(timestamp >> 0)
w.oBuf[2] = uint8(timestamp >> 8)
w.oBuf[3] = uint8(timestamp >> 16)
w.oBuf[4] = uint8(timestamp >> 24)
w.oBuf[5] = uint8(timestamp >> 32)
w.oBuf[6] = uint8(timestamp >> 40)
w.oBuf[7] = uint8(timestamp >> 48)
w.oBuf[8] = uint8(timestamp >> 56)
w.oBuf[9] = uint8(dataLen >> 0)
w.oBuf[10] = uint8(dataLen >> 8)
w.oBuf[11] = uint8(dataLen >> 16)
w.oBuf[12] = uint8(dataLen >> 24)

if _, err := w.outW.Write(w.oBuf[:packageHeaderSize]); err != nil {

oBuf[0] = compressType
oBuf[1] = uint8(startTs >> 0)
oBuf[2] = uint8(startTs >> 8)
oBuf[3] = uint8(startTs >> 16)
oBuf[4] = uint8(startTs >> 24)
oBuf[5] = uint8(startTs >> 32)
oBuf[6] = uint8(startTs >> 40)
oBuf[7] = uint8(startTs >> 48)
oBuf[8] = uint8(startTs >> 56)
oBuf[9] = uint8(midTs >> 0)
oBuf[10] = uint8(midTs >> 8)
oBuf[11] = uint8(midTs >> 16)
oBuf[12] = uint8(midTs >> 24)
oBuf[13] = uint8(midTs >> 32)
oBuf[14] = uint8(midTs >> 40)
oBuf[15] = uint8(midTs >> 48)
oBuf[16] = uint8(midTs >> 56)
oBuf[17] = uint8(dataLen >> 0)
oBuf[18] = uint8(dataLen >> 8)
oBuf[19] = uint8(dataLen >> 16)
oBuf[20] = uint8(dataLen >> 24)

if _, err := w.outW.Write(oBuf); err != nil {
w.err = err
return nRet, err
}

if _, err := w.cmprW.Write(p); err != nil {
if _, err := w.outW.Write(compressedData); err != nil {
w.err = err
return nRet, err
}
Expand All @@ -157,6 +165,11 @@ func (w *Writer) Flush() error {
return w.err
}

// Report implements Reporter: it appends info to the writer's compressInfo
// history.
// NOTE(review): nothing visible here trims compressInfo, so it grows by one
// entry per call — confirm whether a bound is needed.
func (w *Writer) Report(info CompressInfo) {
// TODO(wangqian): Do more with the reported info than just recording it.
w.compressInfo = append(w.compressInfo, info)
}

func (w *Writer) Close() error {
w.Flush()
ret := w.err
Expand Down
9 changes: 9 additions & 0 deletions compression/compression.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package compression

// Compressor turns a block of original data into its compressed form.
type Compressor interface {
	// Compress returns the compressed representation of src.
	Compress(src []byte) []byte
}

// Decompressor turns a compressed block back into the original data.
type Decompressor interface {
	// Decompress returns the decompressed form of src. dst is a destination
	// buffer offered to the implementation; the snappy implementation forwards
	// it to snappy.Decode, and callers may pass nil.
	Decompress(dst, src []byte) []byte
}
21 changes: 21 additions & 0 deletions compression/snappy/decoder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package snappy

import (
"github.com/golang/snappy"
)

// NewDecompressor returns a ready-to-use snappy decompressor.
func NewDecompressor() *SnappyDecompressor {
	return new(SnappyDecompressor)
}

// SnappyDecompressor decodes snappy block-format data. It carries no state.
type SnappyDecompressor struct{}

// Decompress decodes the snappy-encoded src and returns the original data.
// dst is handed to snappy.Decode as the destination buffer. Because the
// method has no error result, a decoding failure is raised as a panic carrying
// the snappy error.
func (c *SnappyDecompressor) Decompress(dst, src []byte) []byte {
	decoded, decodeErr := snappy.Decode(dst, src)
	if decodeErr == nil {
		return decoded
	}
	panic(decodeErr)
}
17 changes: 17 additions & 0 deletions compression/snappy/encoder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package snappy

import (
"github.com/golang/snappy"
)

// NewCompressor returns a ready-to-use snappy compressor.
func NewCompressor() *SnappyCompressor {
	return new(SnappyCompressor)
}

// SnappyCompressor encodes data into the snappy block format. It carries no
// state.
type SnappyCompressor struct{}

// Compress returns src encoded in the snappy block format. A nil destination
// is passed to snappy.Encode, so the result is a freshly allocated slice.
func (c *SnappyCompressor) Compress(src []byte) []byte {
	encoded := snappy.Encode(nil, src)
	return encoded
}
12 changes: 10 additions & 2 deletions io/io.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,8 @@ func WithCompressionFromPool(rwc io.ReadWriteCloser) (out io.ReadWriteCloser, re
return
}

func WithAdaptiveEncoding(rwc io.ReadWriteCloser, bufSize int) (out io.ReadWriteCloser, recycle func()) {
sr := adaptive.NewReader(rwc)
func WithAdaptiveEncoding(rwc io.ReadWriteCloser, reportFunc adaptive.ReportFunction, bufSize int) (out io.ReadWriteCloser, recycle func()) {
sr := adaptive.NewReader(rwc, reportFunc)
sw := adaptive.NewWriter(rwc, bufSize)
out = WrapReadWriteCloser(sr, sw, func() error {
err := sw.Close()
Expand Down Expand Up @@ -139,3 +139,11 @@ func (rwc *ReadWriteCloser) Close() error {
}
return nil
}

// Report forwards info to the writer held in rwc.w, which must implement
// adaptive.Reporter. It panics when the writer does not.
func (rwc *ReadWriteCloser) Report(info adaptive.CompressInfo) {
	reporter, isReporter := rwc.w.(adaptive.Reporter)
	if !isReporter {
		panic("the writer does not implement adaptive.Reporter interface")
	}
	reporter.Report(info)
}

0 comments on commit daf952c

Please sign in to comment.