Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add experimental/klauspost_snappy package, which might be more efficient than the google snappy package #12

Merged
merged 1 commit into from
Jul 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@
This respository contains go gRPC encoding wrappers for some useful compression
algorithms that are not available in google.golang.org/grpc.

* https://github.com/golang/snappy
* https://github.com/klauspost/compress/tree/master/zstd
* https://github.com/pierrec/lz4
* snappy - using https://github.com/golang/snappy
* zstd - using https://github.com/klauspost/compress/tree/master/zstd
* lz4 - using https://github.com/pierrec/lz4

The following algorithms also have experimental implementations, which have
not been tested as much as those above. These may be changed significantly, or
even removed from this library at a future point.

* https://github.com/klauspost/compress/tree/master/s2
* experimental/s2 - using https://github.com/klauspost/compress/tree/master/s2
* experimental/klauspost_snappy - using https://github.com/klauspost/compress/tree/master/s2
in snappy compatibility mode
98 changes: 98 additions & 0 deletions experimental/klauspost_snappy/klauspost_snappy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
*
* Copyright 2017 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

// Package experimental/klauspost_snappy is a wrapper for using
// github.com/klauspost/compress/s2 in snappy compatibility mode with gRPC.
// It might be more efficient than the top-level snappy package which makes
// use of github.com/golang/snappy.
//
// Note that this is registered under the same "snappy" name with gRPC, so
// only one of the two packages should be used at a time.
package klauspost_snappy

// This code is based upon the gzip wrapper in github.com/grpc/grpc-go:
// https://github.com/grpc/grpc-go/blob/master/encoding/gzip/gzip.go

import (
"io"
"io/ioutil"
"sync"

snappylib "github.com/klauspost/compress/s2"
"google.golang.org/grpc/encoding"
)

const Name = "snappy"

type compressor struct {
poolCompressor sync.Pool
poolDecompressor sync.Pool
}

type writer struct {
*snappylib.Writer
pool *sync.Pool
}

type reader struct {
*snappylib.Reader
pool *sync.Pool
}

func init() {
c := &compressor{}
c.poolCompressor.New = func() interface{} {
w := snappylib.NewWriter(ioutil.Discard, snappylib.WriterSnappyCompat())
return &writer{Writer: w, pool: &c.poolCompressor}
}
encoding.RegisterCompressor(c)
}

func (c *compressor) Compress(w io.Writer) (io.WriteCloser, error) {
z := c.poolCompressor.Get().(*writer)
z.Writer.Reset(w)
return z, nil
}

func (c *compressor) Decompress(r io.Reader) (io.Reader, error) {
z, inPool := c.poolDecompressor.Get().(*reader)
if !inPool {
newR := snappylib.NewReader(r, snappylib.ReaderAllocBlock(64 << 10))
return &reader{Reader: newR, pool: &c.poolDecompressor}, nil
}
z.Reset(r)
return z, nil
}

func (c *compressor) Name() string {
return Name
}

func (z *writer) Close() error {
err := z.Writer.Close()
z.pool.Put(z)
return err
}

func (z *reader) Read(p []byte) (n int, err error) {
n, err = z.Reader.Read(p)
if err == io.EOF {
z.pool.Put(z)
}
return n, err
}
100 changes: 100 additions & 0 deletions experimental/klauspost_snappy/klauspost_snappy_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
*
* Copyright 2021 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package klauspost_snappy

import (
"bytes"
"context"
"io/ioutil"
"net"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
"google.golang.org/grpc/encoding"
"google.golang.org/grpc/test/bufconn"

"github.com/mostynb/go-grpc-compression/internal/testserver"
)

const (
bufSize = 1024
message = "Message Request snappy"
)

func TestRegisteredCompression(t *testing.T) {
comp := encoding.GetCompressor(Name)
require.NotNil(t, comp)
assert.Equal(t, Name, comp.Name())

buf := bytes.NewBuffer(make([]byte, 0, bufSize))
wc, err := comp.Compress(buf)
require.NoError(t, err)

_, err = wc.Write([]byte(message))
require.NoError(t, err)
assert.NoError(t, wc.Close())

r, err := comp.Decompress(buf)
require.NoError(t, err)
expected, err := ioutil.ReadAll(r)
require.NoError(t, err)

assert.Equal(t, message, string(expected))
}

func TestRoundTrip(t *testing.T) {
lis := bufconn.Listen(bufSize)
t.Cleanup(func() {
assert.NoError(t, lis.Close())
})

done := make(chan struct{}, 1)

s := grpc.NewServer()
defer func() {
s.GracefulStop()
<-done
}()
testserver.RegisterTestServerServer(s, &testserver.EchoTestServer{})
go func() {
if err := s.Serve(lis); err != nil && err != grpc.ErrServerStopped {
t.Errorf("Server exited with error: %v", err)
}
done <- struct{}{}
}()

ctx := context.Background()
conn, err := grpc.DialContext(ctx, "bufnet",
grpc.WithContextDialer(func(context.Context, string) (net.Conn, error) {
return lis.Dial()
}),
grpc.WithDefaultCallOptions(grpc.UseCompressor(Name)),
grpc.WithInsecure())
require.NoError(t, err)
t.Cleanup(func() {
assert.NoError(t, conn.Close())
})

client := testserver.NewTestServerClient(conn)
resp, err := client.SendMessage(ctx, &testserver.MessageRequest{Request: message})
require.NoError(t, err)
assert.Equal(t, message, resp.Response)
}