From 5f2489304fdefc331e2803e6574af3b8c8f4f00a Mon Sep 17 00:00:00 2001 From: Mostyn Bramley-Moore Date: Wed, 2 Dec 2020 08:10:14 +0100 Subject: [PATCH] reimplement zstd compressor without sync.Pool The Encoder and Decoder types in github.com/klauspost/compress/zstd offer EncodeAll and DecodeAll functions which can be used concurrently, which should allow us to drop the complex finalizer setup required to avoid leaking resources. While go-grpc uses an io.Reader/io.Writer based API, it appears that it uses buffers to compress/decompress data in independent chunks rather than streaming large amounts of data. So this should line up with the EncodeAll and DecodeAll functions mentioned above. --- zstd/zstd.go | 144 +++++++++++++++++++-------------------------------- 1 file changed, 52 insertions(+), 92 deletions(-) diff --git a/zstd/zstd.go b/zstd/zstd.go index 22dd13a..aff2386 100644 --- a/zstd/zstd.go +++ b/zstd/zstd.go @@ -1,65 +1,43 @@ -/* - * - * Copyright 2017 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ +// Copyright 2020 Mostyn Bramley-Moore. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. // Package zstd is a wrapper for using github.com/klauspost/compress/zstd // with gRPC. package zstd -// This code is based upon the gzip wrapper in github.com/grpc/grpc-go: -// https://github.com/grpc/grpc-go/blob/master/encoding/gzip/gzip.go - import ( + "bytes" "io" "io/ioutil" - "runtime" - "sync" - zstdlib "github.com/klauspost/compress/zstd" + "github.com/klauspost/compress/zstd" "google.golang.org/grpc/encoding" ) const Name = "zstd" type compressor struct { - poolCompressor sync.Pool - poolDecompressor sync.Pool -} - -type writer struct { - *zstdlib.Encoder - pool *sync.Pool -} - -type reader struct { - *zstdlib.Decoder - pool *sync.Pool + encoder *zstd.Encoder + decoder *zstd.Decoder } func init() { - c := &compressor{} - c.poolCompressor.New = func() interface{} { - w, err := zstdlib.NewWriter(ioutil.Discard) - if err != nil { - panic(err) - } - writer := &writer{Encoder: w, pool: &c.poolCompressor} - runtime.SetFinalizer(writer, finalizeWriter) - return writer + enc, _ := zstd.NewWriter(nil) + dec, _ := zstd.NewReader(nil) + c := &compressor{ + encoder: enc, + decoder: dec, } encoding.RegisterCompressor(c) } @@ -67,73 +45,55 @@ func init() { // SetLevel updates the registered compressor to use a particular compression // level. NOTE: this function must only be called from an init function, and // is not threadsafe. -func SetLevel(level zstdlib.EncoderLevel) error { +func SetLevel(level zstd.EncoderLevel) error { c := encoding.GetCompressor(Name).(*compressor) - c.poolCompressor.New = func() interface{} { - w, err := zstdlib.NewWriter(ioutil.Discard, - zstdlib.WithEncoderLevel(level)) - if err != nil { - return err - } - - writer := &writer{Encoder: w, pool: &c.poolCompressor} - runtime.SetFinalizer(writer, finalizeWriter) - return writer + + enc, err := zstd.NewWriter(nil, zstd.WithEncoderLevel(level)) + if err != nil { + return err } + c.encoder = enc return nil } func (c *compressor) Compress(w io.Writer) (io.WriteCloser, error) { - z := c.poolCompressor.Get().(*writer) - z.Encoder.Reset(w) - return z, nil + return &zstdWriteCloser{ + enc: c.encoder, + writer: w, + }, nil } -func (c *compressor) Decompress(r io.Reader) (io.Reader, error) { - z, inPool := c.poolDecompressor.Get().(*reader) - if !inPool { - newZ, err := zstdlib.NewReader(r) - if err != nil { - return nil, err - } - reader := &reader{Decoder: newZ, pool: &c.poolDecompressor} - runtime.SetFinalizer(reader, finalizeReader) - return reader, nil - } - if err := z.Reset(r); err != nil { - c.poolDecompressor.Put(z) - return nil, err - } - return z, nil +type zstdWriteCloser struct { + enc *zstd.Encoder + writer io.Writer // Compressed data will be written here. + buf bytes.Buffer // Buffer uncompressed data here, compress on Close. } -func (c *compressor) Name() string { - return Name +func (z *zstdWriteCloser) Write(p []byte) (int, error) { + return z.buf.Write(p) } -func (z *writer) Close() error { - err := z.Encoder.Close() - z.pool.Put(z) +func (z *zstdWriteCloser) Close() error { + compressed := z.enc.EncodeAll(z.buf.Bytes(), nil) + _, err := io.Copy(z.writer, bytes.NewReader(compressed)) return err } -func (z *reader) Read(p []byte) (n int, err error) { - n, err = z.Decoder.Read(p) - if err == io.EOF { - z.pool.Put(z) +func (c *compressor) Decompress(r io.Reader) (io.Reader, error) { + compressed, err := ioutil.ReadAll(r) + if err != nil { + return nil, err } - return n, err -} -func finalizeReader(r *reader) { - if r.Decoder != nil { - r.Decoder.Close() + uncompressed, err := c.decoder.DecodeAll(compressed, nil) + if err != nil { + return nil, err } + + return bytes.NewReader(uncompressed), nil } -func finalizeWriter(w *writer) { - if w.Encoder != nil { - w.Encoder.Close() - } +func (c *compressor) Name() string { + return Name }