Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactoring and Add test code for compress #622

Merged
merged 18 commits into from
Aug 24, 2020
20 changes: 12 additions & 8 deletions internal/compress/gob.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,22 @@ package compress

import (
"bytes"
"encoding/gob"
"io"
"reflect"

"github.com/vdaas/vald/internal/compress/gob"
"github.com/vdaas/vald/internal/errors"
)

type gobCompressor struct {
transcoder gob.Transcoder
}

// NewGob returns a Compressor implemented using gob.
func NewGob(opts ...GobOption) (Compressor, error) {
hlts2 marked this conversation as resolved.
Show resolved Hide resolved
c := new(gobCompressor)
c := &gobCompressor{
transcoder: gob.New(),
}
for _, opt := range append(defaultGobOpts, opts...) {
if err := opt(c); err != nil {
return nil, errors.ErrOptionFailed(err, reflect.ValueOf(opt))
Expand All @@ -42,7 +46,7 @@ func NewGob(opts ...GobOption) (Compressor, error) {

func (g *gobCompressor) CompressVector(vector []float32) ([]byte, error) {
buf := new(bytes.Buffer)
err := gob.NewEncoder(buf).Encode(vector)
err := g.transcoder.NewEncoder(buf).Encode(vector)
if err != nil {
return nil, err
}
Expand All @@ -52,7 +56,7 @@ func (g *gobCompressor) CompressVector(vector []float32) ([]byte, error) {

func (g *gobCompressor) DecompressVector(bs []byte) ([]float32, error) {
var vector []float32
err := gob.NewDecoder(bytes.NewBuffer(bs)).Decode(&vector)
err := g.transcoder.NewDecoder(bytes.NewBuffer(bs)).Decode(&vector)
if err != nil {
return nil, err
}
Expand All @@ -63,20 +67,20 @@ func (g *gobCompressor) DecompressVector(bs []byte) ([]float32, error) {
func (g *gobCompressor) Reader(src io.ReadCloser) (io.ReadCloser, error) {
return &gobReader{
src: src,
decoder: gob.NewDecoder(src),
decoder: g.transcoder.NewDecoder(src),
}, nil
}

func (g *gobCompressor) Writer(dst io.WriteCloser) (io.WriteCloser, error) {
return &gobWriter{
dst: dst,
encoder: gob.NewEncoder(dst),
encoder: g.transcoder.NewEncoder(dst),
}, nil
}

type gobReader struct {
src io.ReadCloser
decoder *gob.Decoder
decoder gob.Decoder
}

func (gr *gobReader) Read(p []byte) (n int, err error) {
Expand All @@ -94,7 +98,7 @@ func (gr *gobReader) Close() error {

type gobWriter struct {
dst io.WriteCloser
encoder *gob.Encoder
encoder gob.Encoder
}

func (gw *gobWriter) Write(p []byte) (n int, err error) {
Expand Down
54 changes: 54 additions & 0 deletions internal/compress/gob/gob.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
//
// Copyright (C) 2019-2020 Vdaas.org Vald team ( kpango, rinx, kmrmt )
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
package gob
hlts2 marked this conversation as resolved.
Show resolved Hide resolved

import (
"encoding/gob"
"io"
)

// Encoder represents an interface for Encoder of gob.
type Encoder interface {
Encode(e interface{}) error
}
kpango marked this conversation as resolved.
Show resolved Hide resolved

// Decoder represents an interface for Decoder of gob.
type Decoder interface {
Decode(e interface{}) error
}

// Transcoder is an interface to create Encoder and Decoder implementation.
type Transcoder interface {
NewEncoder(w io.Writer) Encoder
NewDecoder(r io.Reader) Decoder
}

type transcoder struct{}

// New returns Transcoder implementation.
func New() Transcoder {
return new(transcoder)
}

// NewEncoder returns Encoder implementation.
func (*transcoder) NewEncoder(w io.Writer) Encoder {
return gob.NewEncoder(w)
}

// NewDecoder returns Decoder implementation.
func (*transcoder) NewDecoder(r io.Reader) Decoder {
return gob.NewDecoder(r)
}
54 changes: 54 additions & 0 deletions internal/compress/gob/gob_mock.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
//
// Copyright (C) 2019-2020 Vdaas.org Vald team ( kpango, rinx, kmrmt )
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
package gob
hlts2 marked this conversation as resolved.
Show resolved Hide resolved

import "io"

// MockEncoder represents mock struct of Encoder.
type MockEncoder struct {
EncodeFunc func(e interface{}) error
}

// Encode calls EncodeFunc.
func (m *MockEncoder) Encode(e interface{}) error {
return m.EncodeFunc(e)
}

// MockDecoder represents mock struct of Decoder.
type MockDecoder struct {
DecodeFunc func(e interface{}) error
}

// Decode calls DecodeFunc.
func (m *MockDecoder) Decode(e interface{}) error {
return m.DecodeFunc(e)
}

// MockTranscoder represents mock struct of Transcoder.
type MockTranscoder struct {
NewEncoderFunc func(w io.Writer) Encoder
NewDecoderFunc func(r io.Reader) Decoder
}

// NewEncoder calls NewEncoderFunc.
func (m *MockTranscoder) NewEncoder(w io.Writer) Encoder {
return m.NewEncoderFunc(w)
}

// NewDecoder calls NewEncoderFunc.
func (m *MockTranscoder) NewDecoder(r io.Reader) Decoder {
return m.NewDecoderFunc(r)
}
24 changes: 18 additions & 6 deletions internal/compress/gzip.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,21 @@ import (
"io"
"reflect"

"github.com/klauspost/compress/gzip"
"github.com/vdaas/vald/internal/compress/gzip"
"github.com/vdaas/vald/internal/errors"
)

type gzipCompressor struct {
gobc Compressor
compressionLevel int
gzip gzip.Gzip
}

// NewGzip returns Compressor implementation.
func NewGzip(opts ...GzipOption) (Compressor, error) {
c := new(gzipCompressor)
c := &gzipCompressor{
gzip: gzip.New(),
}
for _, opt := range append(defaultGzipOpts, opts...) {
if err := opt(c); err != nil {
return nil, errors.ErrOptionFailed(err, reflect.ValueOf(opt))
Expand All @@ -42,9 +46,10 @@ func NewGzip(opts ...GzipOption) (Compressor, error) {
return c, nil
}

// CompressVector Compress the data and returns an error if compression fails.
func (g *gzipCompressor) CompressVector(vector []float32) ([]byte, error) {
buf := new(bytes.Buffer)
gw, err := gzip.NewWriterLevel(buf, g.compressionLevel)
gw, err := g.gzip.NewWriterLevel(buf, g.compressionLevel)
if err != nil {
return nil, err
}
Expand All @@ -67,9 +72,10 @@ func (g *gzipCompressor) CompressVector(vector []float32) ([]byte, error) {
return buf.Bytes(), nil
}

// CompressVector Decompress the compressed data and returns an error if decompression fails.
func (g *gzipCompressor) DecompressVector(bs []byte) ([]float32, error) {
buf := new(bytes.Buffer)
gr, err := gzip.NewReader(bytes.NewBuffer(bs))
gr, err := g.gzip.NewReader(bytes.NewBuffer(bs))
if err != nil {
return nil, err
}
Expand All @@ -87,8 +93,9 @@ func (g *gzipCompressor) DecompressVector(bs []byte) ([]float32, error) {
return vec, nil
}

// Reader returns io.ReadCloser implementation.
func (g *gzipCompressor) Reader(src io.ReadCloser) (io.ReadCloser, error) {
r, err := gzip.NewReader(src)
r, err := g.gzip.NewReader(src)
if err != nil {
return nil, err
}
Expand All @@ -99,8 +106,9 @@ func (g *gzipCompressor) Reader(src io.ReadCloser) (io.ReadCloser, error) {
}, nil
}

// Writer returns io.WriteCloser implementation.
func (g *gzipCompressor) Writer(dst io.WriteCloser) (io.WriteCloser, error) {
w, err := gzip.NewWriterLevel(dst, g.compressionLevel)
w, err := g.gzip.NewWriterLevel(dst, g.compressionLevel)
if err != nil {
return nil, err
}
Expand All @@ -116,10 +124,12 @@ type gzipReader struct {
r io.ReadCloser
}

// Read reads up to len(p) bytes into p.
func (g *gzipReader) Read(p []byte) (n int, err error) {
return g.r.Read(p)
}

// Close closes src and r.
func (g *gzipReader) Close() (err error) {
err = g.r.Close()
if err != nil {
Expand All @@ -134,10 +144,12 @@ type gzipWriter struct {
w io.WriteCloser
}

// Write writes len(p) bytes from p
func (g *gzipWriter) Write(p []byte) (n int, err error) {
return g.w.Write(p)
}

// Close closes dst and w.
func (g *gzipWriter) Close() (err error) {
err = g.w.Close()
if err != nil {
Expand Down
69 changes: 69 additions & 0 deletions internal/compress/gzip/gzip.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
//
// Copyright (C) 2019-2020 Vdaas.org Vald team ( kpango, rinx, kmrmt )
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
package gzip

import (
"io"

"github.com/klauspost/compress/gzip"
)

// These constants are copied from the gzip package.
const (
NoCompression = gzip.NoCompression
BestSpeed = gzip.BestSpeed
BestCompression = gzip.BestCompression
DefaultCompression = gzip.DefaultCompression
ConstantCompression = gzip.ConstantCompression
HuffmanOnly = gzip.HuffmanOnly
)

// Reader represents an interface for Reader of gzip.
type Reader interface {
io.ReadCloser
Reset(r io.Reader) error
Multistream(ok bool)
}

// Writer represents an interface for Writer of gzip.
type Writer interface {
io.WriteCloser
Reset(w io.Writer)
Flush() error
}

// Gzip is an interface to create Writer and Reader implementation.
type Gzip interface {
NewReader(r io.Reader) (Reader, error)
NewWriterLevel(w io.Writer, level int) (Writer, error)
}

type compress struct{}

// New returns Gzip implementation.
func New() Gzip {
return new(compress)
}

// NewWriterLevel returns Writer implementation.
func (*compress) NewWriterLevel(w io.Writer, level int) (Writer, error) {
return gzip.NewWriterLevel(w, level)
}

// NewReader returns Reader implementation.
func (*compress) NewReader(r io.Reader) (Reader, error) {
return gzip.NewReader(r)
}
Loading