Skip to content

Commit

Permalink
reimplement zstd compressor without sync.Pool
Browse files Browse the repository at this point in the history
The Encoder and Decoder types in github.com/klauspost/compress/zstd offer
EncodeAll and DecodeAll functions which can be used concurrently, which
should allow us to drop the complex finalizer setup required to avoid
leaking resources.

While go-grpc uses an io.Reader/io.Writer based API, it appears that it
uses buffers to compress/decompress data in independent chunks rather than
streaming large amounts of data. So this should line up with the EncodeAll
and DecodeAll functions mentioned above.
  • Loading branch information
mostynb committed Dec 2, 2020
1 parent 0443371 commit 5f24893
Showing 1 changed file with 52 additions and 92 deletions.
144 changes: 52 additions & 92 deletions zstd/zstd.go
Original file line number Diff line number Diff line change
@@ -1,139 +1,99 @@
/*
*
* Copyright 2017 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
// Copyright 2020 Mostyn Bramley-Moore.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package zstd is a wrapper for using github.com/klauspost/compress/zstd
// with gRPC.
package zstd

// This code is based upon the gzip wrapper in github.com/grpc/grpc-go:
// https://github.com/grpc/grpc-go/blob/master/encoding/gzip/gzip.go

import (
"bytes"
"io"
"io/ioutil"
"runtime"
"sync"

zstdlib "github.com/klauspost/compress/zstd"
"github.com/klauspost/compress/zstd"
"google.golang.org/grpc/encoding"
)

const Name = "zstd"

type compressor struct {
poolCompressor sync.Pool
poolDecompressor sync.Pool
}

type writer struct {
*zstdlib.Encoder
pool *sync.Pool
}

type reader struct {
*zstdlib.Decoder
pool *sync.Pool
encoder *zstd.Encoder
decoder *zstd.Decoder
}

func init() {
c := &compressor{}
c.poolCompressor.New = func() interface{} {
w, err := zstdlib.NewWriter(ioutil.Discard)
if err != nil {
panic(err)
}
writer := &writer{Encoder: w, pool: &c.poolCompressor}
runtime.SetFinalizer(writer, finalizeWriter)
return writer
enc, _ := zstd.NewWriter(nil)
dec, _ := zstd.NewReader(nil)
c := &compressor{
encoder: enc,
decoder: dec,
}
encoding.RegisterCompressor(c)
}

// SetLevel updates the registered compressor to use a particular compression
// level. NOTE: this function must only be called from an init function, and
// is not threadsafe.
func SetLevel(level zstdlib.EncoderLevel) error {
func SetLevel(level zstd.EncoderLevel) error {
c := encoding.GetCompressor(Name).(*compressor)
c.poolCompressor.New = func() interface{} {
w, err := zstdlib.NewWriter(ioutil.Discard,
zstdlib.WithEncoderLevel(level))
if err != nil {
return err
}

writer := &writer{Encoder: w, pool: &c.poolCompressor}
runtime.SetFinalizer(writer, finalizeWriter)
return writer

enc, err := zstd.NewWriter(nil, zstd.WithEncoderLevel(level))
if err != nil {
return err
}

c.encoder = enc
return nil
}

func (c *compressor) Compress(w io.Writer) (io.WriteCloser, error) {
z := c.poolCompressor.Get().(*writer)
z.Encoder.Reset(w)
return z, nil
return &zstdWriteCloser{
enc: c.encoder,
writer: w,
}, nil
}

func (c *compressor) Decompress(r io.Reader) (io.Reader, error) {
z, inPool := c.poolDecompressor.Get().(*reader)
if !inPool {
newZ, err := zstdlib.NewReader(r)
if err != nil {
return nil, err
}
reader := &reader{Decoder: newZ, pool: &c.poolDecompressor}
runtime.SetFinalizer(reader, finalizeReader)
return reader, nil
}
if err := z.Reset(r); err != nil {
c.poolDecompressor.Put(z)
return nil, err
}
return z, nil
type zstdWriteCloser struct {
enc *zstd.Encoder
writer io.Writer // Compressed data will be written here.
buf bytes.Buffer // Buffer uncompressed data here, compress on Close.
}

func (c *compressor) Name() string {
return Name
func (z *zstdWriteCloser) Write(p []byte) (int, error) {
return z.buf.Write(p)
}

func (z *writer) Close() error {
err := z.Encoder.Close()
z.pool.Put(z)
func (z *zstdWriteCloser) Close() error {
compressed := z.enc.EncodeAll(z.buf.Bytes(), nil)
_, err := io.Copy(z.writer, bytes.NewReader(compressed))
return err
}

func (z *reader) Read(p []byte) (n int, err error) {
n, err = z.Decoder.Read(p)
if err == io.EOF {
z.pool.Put(z)
func (c *compressor) Decompress(r io.Reader) (io.Reader, error) {
compressed, err := ioutil.ReadAll(r)
if err != nil {
return nil, err
}
return n, err
}

func finalizeReader(r *reader) {
if r.Decoder != nil {
r.Decoder.Close()
uncompressed, err := c.decoder.DecodeAll(compressed, nil)
if err != nil {
return nil, err
}

return bytes.NewReader(uncompressed), nil
}

func finalizeWriter(w *writer) {
if w.Encoder != nil {
w.Encoder.Close()
}
func (c *compressor) Name() string {
return Name
}

0 comments on commit 5f24893

Please sign in to comment.