From 3124bf677d1b6d54eb768b2c99cf3a8e8e8a0563 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Tue, 22 Oct 2024 13:44:46 -0400 Subject: [PATCH 1/8] Use proper zstd decoder pool for binlog event compression handling Signed-off-by: Matt Lord --- go/mysql/binlog_event_compression.go | 38 +++++++++++++++++++++------- 1 file changed, 29 insertions(+), 9 deletions(-) diff --git a/go/mysql/binlog_event_compression.go b/go/mysql/binlog_event_compression.go index 378698bc64b..e89095f0793 100644 --- a/go/mysql/binlog_event_compression.go +++ b/go/mysql/binlog_event_compression.go @@ -89,7 +89,7 @@ var ( // allocations and GC overhead so this pool allows us to handle // concurrent cases better while still scaling to 0 when there's no // usage. - statefulDecoderPool sync.Pool + statefulDecoderPool = &decoderPool{} ) func init() { @@ -98,7 +98,7 @@ func init() { if err != nil { // Should only happen e.g. due to ENOMEM log.Errorf("Error creating stateless decoder: %v", err) } - statefulDecoderPool = sync.Pool{ + statefulDecoderPool.pool = sync.Pool{ New: func() any { d, err := zstd.NewReader(nil, zstd.WithDecoderMaxMemory(zstdInMemoryDecompressorMaxSize)) if err != nil { // Should only happen e.g. due to ENOMEM @@ -304,12 +304,9 @@ func (tp *TransactionPayload) decompress() error { // larger payloads. if tp.uncompressedSize > zstdInMemoryDecompressorMaxSize { in := bytes.NewReader(tp.payload) - streamDecoder := statefulDecoderPool.Get().(*zstd.Decoder) - if streamDecoder == nil { - return vterrors.New(vtrpcpb.Code_INTERNAL, "failed to create stateful stream decoder") - } - if err := streamDecoder.Reset(in); err != nil { - return vterrors.Wrap(err, "error resetting stateful stream decoder") + streamDecoder, err := statefulDecoderPool.Get(in) + if err != nil { + return err } compressedTrxPayloadsUsingStream.Add(1) tp.reader = streamDecoder @@ -341,7 +338,7 @@ func (tp *TransactionPayload) Close() { switch reader := tp.reader.(type) { case *zstd.Decoder: if err := reader.Reset(nil); err == nil || err == io.EOF { - readersPool.Put(reader) + statefulDecoderPool.Put(reader) } default: reader = nil @@ -368,3 +365,26 @@ func (tp *TransactionPayload) GetNextEvent() (BinlogEvent, error) { //func (tp *TransactionPayload) Events() iter.Seq[BinlogEvent] { // return tp.iterator //} + +// decoderPool manages a sync.Pool of *zstd.Decoders. +type decoderPool struct { + pool sync.Pool +} + +// Get gets a new *zstd.Decoder. +func (dp *decoderPool) Get(r io.Reader) (*zstd.Decoder, error) { + decoder, ok := dp.pool.Get().(*zstd.Decoder) + if !ok || decoder == nil { + return nil, vterrors.New(vtrpcpb.Code_INTERNAL, "failed to create stateful stream decoder") + } + if err := decoder.Reset(r); err != nil { + return nil, vterrors.Wrap(err, "error resetting stateful stream decoder") + } + return decoder, nil +} + +func (dp *decoderPool) Put(sd *zstd.Decoder) { + if err := sd.Reset(nil); err == nil || err == io.EOF { + dp.pool.Put(sd) + } +} From bf0e91d01491895816ce8ad3e98f93fa3efb4ffa Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Tue, 22 Oct 2024 14:01:56 -0400 Subject: [PATCH 2/8] Address review comment and cleanup Signed-off-by: Matt Lord --- go/mysql/binlog_event_compression.go | 38 +++++++++++----------------- 1 file changed, 15 insertions(+), 23 deletions(-) diff --git a/go/mysql/binlog_event_compression.go b/go/mysql/binlog_event_compression.go index e89095f0793..b6b76822e8a 100644 --- a/go/mysql/binlog_event_compression.go +++ b/go/mysql/binlog_event_compression.go @@ -19,13 +19,13 @@ package mysql import ( "bytes" "encoding/binary" + "fmt" "io" "sync" "github.com/klauspost/compress/zstd" "vitess.io/vitess/go/stats" - "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/vterrors" vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" @@ -96,16 +96,7 @@ func init() { var err error statelessDecoder, err = zstd.NewReader(nil, zstd.WithDecoderConcurrency(0)) if err != nil { // Should only happen e.g. due to ENOMEM - log.Errorf("Error creating stateless decoder: %v", err) - } - statefulDecoderPool.pool = sync.Pool{ - New: func() any { - d, err := zstd.NewReader(nil, zstd.WithDecoderMaxMemory(zstdInMemoryDecompressorMaxSize)) - if err != nil { // Should only happen e.g. due to ENOMEM - log.Errorf("Error creating stateful decoder: %v", err) - } - return d - }, + panic(fmt.Errorf("failed to create stateless decoder: %v", err)) } } @@ -314,9 +305,6 @@ func (tp *TransactionPayload) decompress() error { } // Process smaller payloads using only in-memory buffers. - if statelessDecoder == nil { // Should never happen - return vterrors.New(vtrpcpb.Code_INTERNAL, "failed to create stateless decoder") - } decompressedBytes := make([]byte, 0, tp.uncompressedSize) // Perform a single pre-allocation decompressedBytes, err := statelessDecoder.DecodeAll(tp.payload, decompressedBytes[:0]) if err != nil { @@ -371,20 +359,24 @@ type decoderPool struct { pool sync.Pool } -// Get gets a new *zstd.Decoder. -func (dp *decoderPool) Get(r io.Reader) (*zstd.Decoder, error) { - decoder, ok := dp.pool.Get().(*zstd.Decoder) - if !ok || decoder == nil { - return nil, vterrors.New(vtrpcpb.Code_INTERNAL, "failed to create stateful stream decoder") +// Get gets a pooled OR new *zstd.Decoder. +func (dp *decoderPool) Get(reader io.Reader) (*zstd.Decoder, error) { + decoder := dp.pool.Get().(*zstd.Decoder) + if decoder == nil { + d, err := zstd.NewReader(nil, zstd.WithDecoderMaxMemory(zstdInMemoryDecompressorMaxSize)) + if err != nil { // Should only happen e.g. due to ENOMEM + return nil, vterrors.New(vtrpcpb.Code_INTERNAL, "failed to create stateful stream decoder") + } + decoder = d } - if err := decoder.Reset(r); err != nil { + if err := decoder.Reset(reader); err != nil { return nil, vterrors.Wrap(err, "error resetting stateful stream decoder") } return decoder, nil } -func (dp *decoderPool) Put(sd *zstd.Decoder) { - if err := sd.Reset(nil); err == nil || err == io.EOF { - dp.pool.Put(sd) +func (dp *decoderPool) Put(decoder *zstd.Decoder) { + if err := decoder.Reset(nil); err == nil || err == io.EOF { + dp.pool.Put(decoder) } } From f2e6ad12d3b019e6ab60f393be2b566c80af8842 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Tue, 22 Oct 2024 14:39:07 -0400 Subject: [PATCH 3/8] Add unit test Signed-off-by: Matt Lord --- go/mysql/binlog_event_compression.go | 11 ++-- go/mysql/binlog_event_compression_test.go | 63 +++++++++++++++++++++++ 2 files changed, 68 insertions(+), 6 deletions(-) create mode 100644 go/mysql/binlog_event_compression_test.go diff --git a/go/mysql/binlog_event_compression.go b/go/mysql/binlog_event_compression.go index b6b76822e8a..b1fffbc5284 100644 --- a/go/mysql/binlog_event_compression.go +++ b/go/mysql/binlog_event_compression.go @@ -325,11 +325,8 @@ func (tp *TransactionPayload) decompress() error { func (tp *TransactionPayload) Close() { switch reader := tp.reader.(type) { case *zstd.Decoder: - if err := reader.Reset(nil); err == nil || err == io.EOF { - statefulDecoderPool.Put(reader) - } + statefulDecoderPool.Put(reader) default: - reader = nil } tp.iterator = nil } @@ -361,8 +358,10 @@ type decoderPool struct { // Get gets a pooled OR new *zstd.Decoder. func (dp *decoderPool) Get(reader io.Reader) (*zstd.Decoder, error) { - decoder := dp.pool.Get().(*zstd.Decoder) - if decoder == nil { + var decoder *zstd.Decoder + if pooled := dp.pool.Get(); pooled != nil { + decoder = pooled.(*zstd.Decoder) + } else { d, err := zstd.NewReader(nil, zstd.WithDecoderMaxMemory(zstdInMemoryDecompressorMaxSize)) if err != nil { // Should only happen e.g. due to ENOMEM return nil, vterrors.New(vtrpcpb.Code_INTERNAL, "failed to create stateful stream decoder") diff --git a/go/mysql/binlog_event_compression_test.go b/go/mysql/binlog_event_compression_test.go new file mode 100644 index 00000000000..36ff68609b7 --- /dev/null +++ b/go/mysql/binlog_event_compression_test.go @@ -0,0 +1,63 @@ +/* +Copyright 2024 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package mysql + +import ( + "bytes" + "io" + "testing" + + "github.com/klauspost/compress/zstd" + "github.com/stretchr/testify/require" +) + +func TestDecoderPool(t *testing.T) { + type args struct { + r io.Reader + } + tests := []struct { + name string + reader io.Reader + wantErr bool + }{ + { + name: "happy path", + reader: bytes.NewReader([]byte{0x68, 0x61, 0x70, 0x70, 0x79}), + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + decoder, err := statefulDecoderPool.Get(tt.reader) + require.NoError(t, err) + require.NotNil(t, decoder) + require.IsType(t, &zstd.Decoder{}, decoder) + statefulDecoderPool.Put(decoder) + decoder2, err := statefulDecoderPool.Get(tt.reader) + require.NoError(t, err) + require.NotNil(t, decoder2) + require.IsType(t, &zstd.Decoder{}, decoder) + statefulDecoderPool.Put(decoder) + require.True(t, (decoder2 == decoder)) + statefulDecoderPool.Put(decoder2) + decoder3, err := statefulDecoderPool.Get(tt.reader) + require.NoError(t, err) + require.IsType(t, &zstd.Decoder{}, decoder) + statefulDecoderPool.Put(decoder) + require.True(t, (decoder3 == decoder2)) + }) + } +} From aa2d87f68afbdf4fdcb2551fc302b1bb5efb1f31 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Tue, 22 Oct 2024 15:14:52 -0400 Subject: [PATCH 4/8] Deflake unit test Signed-off-by: Matt Lord --- go/mysql/binlog_event_compression_test.go | 49 +++++++++++++++-------- 1 file changed, 32 insertions(+), 17 deletions(-) diff --git a/go/mysql/binlog_event_compression_test.go b/go/mysql/binlog_event_compression_test.go index 36ff68609b7..b54932dada6 100644 --- a/go/mysql/binlog_event_compression_test.go +++ b/go/mysql/binlog_event_compression_test.go @@ -41,23 +41,38 @@ func TestDecoderPool(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - decoder, err := statefulDecoderPool.Get(tt.reader) - require.NoError(t, err) - require.NotNil(t, decoder) - require.IsType(t, &zstd.Decoder{}, decoder) - statefulDecoderPool.Put(decoder) - decoder2, err := statefulDecoderPool.Get(tt.reader) - require.NoError(t, err) - require.NotNil(t, decoder2) - require.IsType(t, &zstd.Decoder{}, decoder) - statefulDecoderPool.Put(decoder) - require.True(t, (decoder2 == decoder)) - statefulDecoderPool.Put(decoder2) - decoder3, err := statefulDecoderPool.Get(tt.reader) - require.NoError(t, err) - require.IsType(t, &zstd.Decoder{}, decoder) - statefulDecoderPool.Put(decoder) - require.True(t, (decoder3 == decoder2)) + // It's not guaranteed that we get the same decoder back from the pool + // that we just put in, so we use a loop and ensure that it worked at + // least one of the times. Without doing this the test would be flaky. + used := false + + for i := 0; i < 20; i++ { + decoder, err := statefulDecoderPool.Get(tt.reader) + require.NoError(t, err) + require.NotNil(t, decoder) + require.IsType(t, &zstd.Decoder{}, decoder) + statefulDecoderPool.Put(decoder) + + decoder2, err := statefulDecoderPool.Get(tt.reader) + require.NoError(t, err) + require.NotNil(t, decoder2) + require.IsType(t, &zstd.Decoder{}, decoder) + if decoder2 == decoder { + used = true + } + statefulDecoderPool.Put(decoder2) + + decoder3, err := statefulDecoderPool.Get(tt.reader) + require.NoError(t, err) + require.NotNil(t, &zstd.Decoder{}, decoder3) + require.IsType(t, &zstd.Decoder{}, decoder3) + if decoder3 == decoder || decoder3 == decoder2 { + used = true + } + statefulDecoderPool.Put(decoder) + } + + require.True(t, used) }) } } From 0340b02e006d6f3bf2d205f1f3a3b8b3d17895b4 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Tue, 22 Oct 2024 15:17:27 -0400 Subject: [PATCH 5/8] Minor change from self review Signed-off-by: Matt Lord --- go/mysql/binlog_event_compression.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/mysql/binlog_event_compression.go b/go/mysql/binlog_event_compression.go index b1fffbc5284..0eb454aa0f9 100644 --- a/go/mysql/binlog_event_compression.go +++ b/go/mysql/binlog_event_compression.go @@ -364,7 +364,7 @@ func (dp *decoderPool) Get(reader io.Reader) (*zstd.Decoder, error) { } else { d, err := zstd.NewReader(nil, zstd.WithDecoderMaxMemory(zstdInMemoryDecompressorMaxSize)) if err != nil { // Should only happen e.g. due to ENOMEM - return nil, vterrors.New(vtrpcpb.Code_INTERNAL, "failed to create stateful stream decoder") + return nil, vterrors.Wrap(err, "failed to create stateful stream decoder") } decoder = d } From 1d8db4373e889debec48fe70789c83533d3e090a Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Tue, 22 Oct 2024 15:38:26 -0400 Subject: [PATCH 6/8] Address review comments Signed-off-by: Matt Lord --- go/mysql/binlog_event_compression.go | 10 ++++++++-- go/mysql/binlog_event_compression_test.go | 12 ++++++------ 2 files changed, 14 insertions(+), 8 deletions(-) diff --git a/go/mysql/binlog_event_compression.go b/go/mysql/binlog_event_compression.go index 0eb454aa0f9..fc77a88ecf3 100644 --- a/go/mysql/binlog_event_compression.go +++ b/go/mysql/binlog_event_compression.go @@ -358,9 +358,15 @@ type decoderPool struct { // Get gets a pooled OR new *zstd.Decoder. func (dp *decoderPool) Get(reader io.Reader) (*zstd.Decoder, error) { - var decoder *zstd.Decoder + var ( + decoder *zstd.Decoder + ok bool + ) if pooled := dp.pool.Get(); pooled != nil { - decoder = pooled.(*zstd.Decoder) + decoder, ok = pooled.(*zstd.Decoder) + if !ok { + return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "[BUG] expected *zstd.Decoder but got %T", pooled) + } } else { d, err := zstd.NewReader(nil, zstd.WithDecoderMaxMemory(zstdInMemoryDecompressorMaxSize)) if err != nil { // Should only happen e.g. due to ENOMEM diff --git a/go/mysql/binlog_event_compression_test.go b/go/mysql/binlog_event_compression_test.go index b54932dada6..fc6ecc46efa 100644 --- a/go/mysql/binlog_event_compression_test.go +++ b/go/mysql/binlog_event_compression_test.go @@ -44,7 +44,7 @@ func TestDecoderPool(t *testing.T) { // It's not guaranteed that we get the same decoder back from the pool // that we just put in, so we use a loop and ensure that it worked at // least one of the times. Without doing this the test would be flaky. - used := false + poolingUsed := false for i := 0; i < 20; i++ { decoder, err := statefulDecoderPool.Get(tt.reader) @@ -56,9 +56,9 @@ func TestDecoderPool(t *testing.T) { decoder2, err := statefulDecoderPool.Get(tt.reader) require.NoError(t, err) require.NotNil(t, decoder2) - require.IsType(t, &zstd.Decoder{}, decoder) + require.IsType(t, &zstd.Decoder{}, decoder2) if decoder2 == decoder { - used = true + poolingUsed = true } statefulDecoderPool.Put(decoder2) @@ -67,12 +67,12 @@ func TestDecoderPool(t *testing.T) { require.NotNil(t, &zstd.Decoder{}, decoder3) require.IsType(t, &zstd.Decoder{}, decoder3) if decoder3 == decoder || decoder3 == decoder2 { - used = true + poolingUsed = true } - statefulDecoderPool.Put(decoder) + statefulDecoderPool.Put(decoder3) } - require.True(t, used) + require.True(t, poolingUsed) }) } } From 3a12360bb4270cb2300701f4083af9a864fc6670 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Tue, 22 Oct 2024 15:53:17 -0400 Subject: [PATCH 7/8] Address review comments part 2 Signed-off-by: Matt Lord --- go/mysql/binlog_event_compression_test.go | 19 ++++++++----------- 1 file changed, 8 insertions(+), 11 deletions(-) diff --git a/go/mysql/binlog_event_compression_test.go b/go/mysql/binlog_event_compression_test.go index fc6ecc46efa..4c6418a1aa0 100644 --- a/go/mysql/binlog_event_compression_test.go +++ b/go/mysql/binlog_event_compression_test.go @@ -26,8 +26,10 @@ import ( ) func TestDecoderPool(t *testing.T) { - type args struct { - r io.Reader + validateDecoder := func(t *testing.T, err error, decoder *zstd.Decoder) { + require.NoError(t, err) + require.NotNil(t, decoder) + require.IsType(t, &zstd.Decoder{}, decoder) } tests := []struct { name string @@ -39,6 +41,7 @@ func TestDecoderPool(t *testing.T) { reader: bytes.NewReader([]byte{0x68, 0x61, 0x70, 0x70, 0x79}), }, } + for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { // It's not guaranteed that we get the same decoder back from the pool @@ -48,24 +51,18 @@ func TestDecoderPool(t *testing.T) { for i := 0; i < 20; i++ { decoder, err := statefulDecoderPool.Get(tt.reader) - require.NoError(t, err) - require.NotNil(t, decoder) - require.IsType(t, &zstd.Decoder{}, decoder) + validateDecoder(t, err, decoder) statefulDecoderPool.Put(decoder) decoder2, err := statefulDecoderPool.Get(tt.reader) - require.NoError(t, err) - require.NotNil(t, decoder2) - require.IsType(t, &zstd.Decoder{}, decoder2) + validateDecoder(t, err, decoder2) if decoder2 == decoder { poolingUsed = true } statefulDecoderPool.Put(decoder2) decoder3, err := statefulDecoderPool.Get(tt.reader) - require.NoError(t, err) - require.NotNil(t, &zstd.Decoder{}, decoder3) - require.IsType(t, &zstd.Decoder{}, decoder3) + validateDecoder(t, err, decoder3) if decoder3 == decoder || decoder3 == decoder2 { poolingUsed = true } From e7191f0be5a6a26420ef39128ba3e2f47d997833 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Tue, 22 Oct 2024 15:56:39 -0400 Subject: [PATCH 8/8] Go back to logging on failed stateless decoder creation And do it as a JiT allocation Signed-off-by: Matt Lord --- go/mysql/binlog_event_compression.go | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/go/mysql/binlog_event_compression.go b/go/mysql/binlog_event_compression.go index fc77a88ecf3..1cb38d5cb16 100644 --- a/go/mysql/binlog_event_compression.go +++ b/go/mysql/binlog_event_compression.go @@ -19,7 +19,6 @@ package mysql import ( "bytes" "encoding/binary" - "fmt" "io" "sync" @@ -92,14 +91,6 @@ var ( statefulDecoderPool = &decoderPool{} ) -func init() { - var err error - statelessDecoder, err = zstd.NewReader(nil, zstd.WithDecoderConcurrency(0)) - if err != nil { // Should only happen e.g. due to ENOMEM - panic(fmt.Errorf("failed to create stateless decoder: %v", err)) - } -} - type TransactionPayload struct { size uint64 compressionType uint64 @@ -305,6 +296,13 @@ func (tp *TransactionPayload) decompress() error { } // Process smaller payloads using only in-memory buffers. + if statelessDecoder == nil { // Should only need to be done once + var err error + statelessDecoder, err = zstd.NewReader(nil, zstd.WithDecoderConcurrency(0)) + if err != nil { // Should only happen e.g. due to ENOMEM + return vterrors.Wrap(err, "failed to create stateless decoder") + } + } decompressedBytes := make([]byte, 0, tp.uncompressedSize) // Perform a single pre-allocation decompressedBytes, err := statelessDecoder.DecodeAll(tp.payload, decompressedBytes[:0]) if err != nil {