From f49b6e31a0cf25316ca251f8d0cb74ffee606ef1 Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Fri, 30 Aug 2024 15:26:23 +0200 Subject: [PATCH 1/9] feature: manage pool directly in the plugin Signed-off-by: Valery Piashchynski --- .golangci.yml | 2 +- go.mod | 6 +++--- go.sum | 12 ++++++------ go.work.sum | 30 ++++++++++++++++++++++++++++++ plugin.go | 7 ++++++- protoc_plugins/go.mod | 4 +++- tests/go.mod | 17 ++++++++++------- tests/go.sum | 24 ++++++++++++------------ 8 files changed, 71 insertions(+), 31 deletions(-) diff --git a/.golangci.yml b/.golangci.yml index 104ca3d..e1b806c 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -38,7 +38,7 @@ linters: # All available linters list: Date: Fri, 30 Aug 2024 17:17:03 +0200 Subject: [PATCH 2/9] chore: support grpc codecV2 Signed-off-by: Valery Piashchynski --- codec/codec.go | 29 +++++++++++++++++++++++------ codec/codec_test.go | 19 ++++++++++++++----- go.mod | 2 +- plugin.go | 3 ++- server.go | 2 +- tests/grpc_plugin_gzip_test.go | 9 ++------- 6 files changed, 43 insertions(+), 21 deletions(-) diff --git a/codec/codec.go b/codec/codec.go index fe7ad19..85a6710 100644 --- a/codec/codec.go +++ b/codec/codec.go @@ -1,6 +1,10 @@ package codec -import "google.golang.org/grpc/encoding" +import ( + "google.golang.org/grpc/encoding" + "google.golang.org/grpc/mem" + "google.golang.org/protobuf/proto" +) type RawMessage []byte @@ -15,7 +19,7 @@ func (RawMessage) ProtoMessage() {} func (RawMessage) String() string { return rm } type Codec struct { - Base encoding.Codec + Base encoding.CodecV2 } // Marshal returns the wire format of v. rawMessages would be returned without encoding. @@ -24,17 +28,30 @@ func (c *Codec) Marshal(v any) ([]byte, error) { return raw, nil } - return c.Base.Marshal(v) + data, err := c.Base.Marshal(v) + if err != nil { + return nil, err + } + + return data.Materialize(), nil } // Unmarshal parses the wire format into v. rawMessages would not be unmarshalled. func (c *Codec) Unmarshal(data []byte, v any) error { - if raw, ok := v.(*RawMessage); ok { - *raw = data + switch msg := v.(type) { + case *RawMessage: + *msg = data return nil + case proto.Message: + err := proto.Unmarshal(data, msg) + if err != nil { + return err + } + default: + return c.Base.Unmarshal(mem.BufferSlice{mem.NewBuffer(&data, mem.DefaultBufferPool())}, v) } - return c.Base.Unmarshal(data, v) + return nil } func (c *Codec) Name() string { diff --git a/codec/codec_test.go b/codec/codec_test.go index e9a9ec2..d48e6b9 100644 --- a/codec/codec_test.go +++ b/codec/codec_test.go @@ -1,20 +1,29 @@ package codec import ( + "encoding/json" "testing" - json "github.com/goccy/go-json" "github.com/stretchr/testify/assert" + "google.golang.org/grpc/mem" ) type jsonCodec struct{} -func (jsonCodec) Marshal(v any) ([]byte, error) { - return json.Marshal(v) +func (jsonCodec) Marshal(v any) (mem.BufferSlice, error) { + data, err := json.Marshal(v) + if err != nil { + return nil, err + } + + buf := mem.NewBuffer(&data, mem.DefaultBufferPool()) + bs := mem.BufferSlice{buf} + return bs, nil } -func (jsonCodec) Unmarshal(data []byte, v any) error { - return json.Unmarshal(data, v) +func (jsonCodec) Unmarshal(data mem.BufferSlice, v any) error { + out := data.Materialize() + return json.Unmarshal(out, v) } func (jsonCodec) Name() string { diff --git a/go.mod b/go.mod index b1274f2..b04886d 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,6 @@ toolchain go1.23.0 require ( github.com/emicklei/proto v1.13.2 - github.com/goccy/go-json v0.10.3 github.com/prometheus/client_golang v1.20.2 github.com/roadrunner-server/api/v4 v4.16.0 github.com/roadrunner-server/endure/v2 v2.5.0 @@ -33,6 +32,7 @@ require ( github.com/go-logr/logr v1.4.2 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/go-ole/go-ole v1.3.0 // indirect + github.com/goccy/go-json v0.10.3 // indirect github.com/google/uuid v1.6.0 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect diff --git a/plugin.go b/plugin.go index d4551be..27877bf 100644 --- a/plugin.go +++ b/plugin.go @@ -67,9 +67,10 @@ func (p *Plugin) Init(cfg common.Configurer, log common.Logger, server common.Se if !cfg.Has(pluginName) { return errors.E(errors.Disabled) } + // register the codec encoding.RegisterCodec(&codec.Codec{ - Base: encoding.GetCodec(codec.Name), + Base: encoding.GetCodecV2(codec.Name), }) err := cfg.UnmarshalKey(pluginName, &p.config) diff --git a/server.go b/server.go index 8b44f3e..baa71ad 100644 --- a/server.go +++ b/server.go @@ -118,7 +118,7 @@ func (p *Plugin) serverOptions() ([]grpc.ServerOption, error) { var err error if p.config.EnableTLS() { - // if client CA is not empty we combine it with Cert and Key + // if client CA is not empty, we combine it with Cert and Key if p.config.TLS.RootCA != "" { cert, err = tls.LoadX509KeyPair(p.config.TLS.Cert, p.config.TLS.Key) if err != nil { diff --git a/tests/grpc_plugin_gzip_test.go b/tests/grpc_plugin_gzip_test.go index 303e811..3996903 100644 --- a/tests/grpc_plugin_gzip_test.go +++ b/tests/grpc_plugin_gzip_test.go @@ -35,7 +35,6 @@ func TestGrpcRqRsGzip(t *testing.T) { cfg := &config.Plugin{ Version: "2023.3.0", Path: "configs/.rr-grpc-rq.yaml", - Prefix: "rr", } err := cont.RegisterAll( @@ -112,7 +111,6 @@ func TestGrpcRqRsMultipleGzip(t *testing.T) { cfg := &config.Plugin{ Version: "2023.3.0", Path: "configs/.rr-grpc-rq-multiple.yaml", - Prefix: "rr", } err := cont.RegisterAll( @@ -206,7 +204,6 @@ func TestGrpcRqRsTLSGzip(t *testing.T) { cfg := &config.Plugin{ Version: "2023.3.0", Path: "configs/.rr-grpc-rq-tls.yaml", - Prefix: "rr", } err := cont.RegisterAll( @@ -291,7 +288,6 @@ func TestGrpcRqRsTLSRootCAGzip(t *testing.T) { cfg := &config.Plugin{ Version: "2023.3.0", Path: "configs/.rr-grpc-rq-tls-rootca.yaml", - Prefix: "rr", } err := cont.RegisterAll( @@ -356,7 +352,7 @@ func TestGrpcRqRsTLSRootCAGzip(t *testing.T) { MinVersion: tls.VersionTLS12, } - conn, err := grpc.Dial("127.0.0.1:9003", grpc.WithTransportCredentials(credentials.NewTLS(tlscfg)), grpc.WithDefaultCallOptions(grpc.UseCompressor("gzip"))) + conn, err := grpc.NewClient("127.0.0.1:9003", grpc.WithTransportCredentials(credentials.NewTLS(tlscfg)), grpc.WithDefaultCallOptions(grpc.UseCompressor("gzip"))) require.NoError(t, err) require.NotNil(t, conn) @@ -376,7 +372,6 @@ func TestGrpcRqRsTLS_WithResetGzip(t *testing.T) { cfg := &config.Plugin{ Version: "2023.3.0", Path: "configs/.rr-grpc-rq-tls.yaml", - Prefix: "rr", } err := cont.RegisterAll( @@ -442,7 +437,7 @@ func TestGrpcRqRsTLS_WithResetGzip(t *testing.T) { MinVersion: tls.VersionTLS12, } - conn, err := grpc.Dial("localhost:9002", grpc.WithTransportCredentials(credentials.NewTLS(tlscfg)), grpc.WithDefaultCallOptions(grpc.UseCompressor("gzip"))) + conn, err := grpc.NewClient("localhost:9002", grpc.WithTransportCredentials(credentials.NewTLS(tlscfg)), grpc.WithDefaultCallOptions(grpc.UseCompressor("gzip"))) require.NoError(t, err) require.NotNil(t, conn) From 8c818089604fc1c57ead6c10146e32b7ffcd4cb5 Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Fri, 30 Aug 2024 17:26:32 +0200 Subject: [PATCH 3/9] chore: close the connections Signed-off-by: Valery Piashchynski --- tests/grpc_plugin_gzip_test.go | 5 +++++ tests/grpc_plugin_test.go | 12 +++++++++++- 2 files changed, 16 insertions(+), 1 deletion(-) diff --git a/tests/grpc_plugin_gzip_test.go b/tests/grpc_plugin_gzip_test.go index 3996903..6234e38 100644 --- a/tests/grpc_plugin_gzip_test.go +++ b/tests/grpc_plugin_gzip_test.go @@ -99,6 +99,7 @@ func TestGrpcRqRsGzip(t *testing.T) { resp, err := client.Ping(context.Background(), &service.Message{Msg: "TOST"}) require.NoError(t, err) require.Equal(t, "TOST", resp.Msg) + _ = conn.Close() stopCh <- struct{}{} @@ -192,6 +193,7 @@ func TestGrpcRqRsMultipleGzip(t *testing.T) { err = watch.CloseSend() require.NoError(t, err) + _ = conn.Close() stopCh <- struct{}{} @@ -276,6 +278,7 @@ func TestGrpcRqRsTLSGzip(t *testing.T) { resp, err := client.Ping(context.Background(), &service.Message{Msg: "TOST"}) require.NoError(t, err) require.Equal(t, "TOST", resp.Msg) + _ = conn.Close() stopCh <- struct{}{} @@ -360,6 +363,7 @@ func TestGrpcRqRsTLSRootCAGzip(t *testing.T) { resp, err := client.Ping(context.Background(), &service.Message{Msg: "TOST"}) require.NoError(t, err) require.Equal(t, "TOST", resp.Msg) + _ = conn.Close() stopCh <- struct{}{} @@ -452,6 +456,7 @@ func TestGrpcRqRsTLS_WithResetGzip(t *testing.T) { resp2, err2 := client.Ping(context.Background(), &service.Message{Msg: "TOST"}) require.NoError(t, err2) require.Equal(t, "TOST", resp2.Msg) + _ = conn.Close() stopCh <- struct{}{} wg.Wait() diff --git a/tests/grpc_plugin_test.go b/tests/grpc_plugin_test.go index 44edf1b..688c57e 100644 --- a/tests/grpc_plugin_test.go +++ b/tests/grpc_plugin_test.go @@ -112,6 +112,7 @@ func TestGrpcInit(t *testing.T) { resp, err := client.Ping(context.Background(), &service.Message{Msg: "TOST"}) require.NoError(t, err) require.Equal(t, "TOST", resp.Msg) + _ = conn.Close() stopCh <- struct{}{} @@ -183,6 +184,7 @@ func TestGrpcOtel(t *testing.T) { resp, err := client.Ping(context.Background(), &service.Message{Msg: "TOST"}) require.NoError(t, err) require.Equal(t, "TOST", resp.Msg) + _ = conn.Close() stopCh <- struct{}{} wg.Wait() @@ -305,7 +307,6 @@ func TestGrpcInitDup2(t *testing.T) { cfg := &config.Plugin{ Version: "2023.3.0", Path: "configs/.rr-grpc-init-duplicate-2.yaml", - Prefix: "rr", } err := cont.RegisterAll( @@ -436,6 +437,7 @@ func TestGrpcInitMultiple(t *testing.T) { resp, err := client.Ping(context.Background(), &service.Message{Msg: "TOST"}) require.NoError(t, err) require.Equal(t, "TOST", resp.Msg) + _ = conn.Close() time.Sleep(time.Second) stopCh <- struct{}{} @@ -513,6 +515,7 @@ func TestGrpcRqRs(t *testing.T) { resp, err := client.Ping(context.Background(), &service.Message{Msg: "TOST"}) require.NoError(t, err) require.Equal(t, "TOST", resp.Msg) + _ = conn.Close() stopCh <- struct{}{} @@ -617,6 +620,7 @@ func TestGrpcRqRsException(t *testing.T) { require.Error(t, err) require.Equal(t, "rpc error: code = Internal desc = FOOOOOOOOOOOO", err.Error()) require.Nil(t, resp) + _ = conn.Close() stopCh <- struct{}{} @@ -710,6 +714,7 @@ func TestGrpcRqRsMultiple(t *testing.T) { err = watch.CloseSend() require.NoError(t, err) + _ = conn.Close() stopCh <- struct{}{} @@ -794,6 +799,7 @@ func TestGrpcRqRsTLS(t *testing.T) { resp, err := client.Ping(context.Background(), &service.Message{Msg: "TOST"}) require.NoError(t, err) require.Equal(t, "TOST", resp.Msg) + _ = conn.Close() stopCh <- struct{}{} @@ -878,6 +884,7 @@ func TestGrpcRqRsTLSRootCA(t *testing.T) { resp, err := client.Ping(context.Background(), &service.Message{Msg: "TOST"}) require.NoError(t, err) require.Equal(t, "TOST", resp.Msg) + _ = conn.Close() stopCh <- struct{}{} wg.Wait() @@ -969,6 +976,7 @@ func TestGrpcRqRsTLS_WithReset(t *testing.T) { resp2, err2 := client.Ping(context.Background(), &service.Message{Msg: "TOST"}) require.NoError(t, err2) require.Equal(t, "TOST", resp2.Msg) + _ = conn.Close() stopCh <- struct{}{} wg.Wait() @@ -1056,6 +1064,7 @@ func TestGRPCMetrics(t *testing.T) { assert.Contains(t, genericOut, `rr_grpc_request_total`) assert.Contains(t, genericOut, `rr_grpc_requests_queue`) + _ = conn.Close() close(sig) wg.Wait() @@ -1179,6 +1188,7 @@ func Test_GrpcRqOtlp(t *testing.T) { resp, err := client.Ping(context.Background(), &service.Message{Msg: "TOST"}) require.NoError(t, err) require.Equal(t, "TOST", resp.Msg) + _ = conn.Close() stopCh <- struct{}{} wg.Wait() From 07a34792397a6dd2a07205b1c284d67ffdade2fe Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Fri, 30 Aug 2024 17:50:29 +0200 Subject: [PATCH 4/9] chore: rotate ports for some tests Signed-off-by: Valery Piashchynski --- tests/configs/.rr-grpc-rq-multiple.yaml | 3 +-- tests/grpc_plugin_gzip_test.go | 2 +- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/tests/configs/.rr-grpc-rq-multiple.yaml b/tests/configs/.rr-grpc-rq-multiple.yaml index 73b2b36..c809717 100644 --- a/tests/configs/.rr-grpc-rq-multiple.yaml +++ b/tests/configs/.rr-grpc-rq-multiple.yaml @@ -15,7 +15,7 @@ logs: # GRPC service configuration grpc: # socket to listen - listen: "tcp://127.0.0.1:9001" + listen: "tcp://127.0.0.1:9002" # proto root file proto: @@ -61,6 +61,5 @@ grpc: # Usual workers pool configuration pool: num_workers: 2 - max_jobs: 0 allocate_timeout: 60s destroy_timeout: 60s diff --git a/tests/grpc_plugin_gzip_test.go b/tests/grpc_plugin_gzip_test.go index 6234e38..6bd9a4e 100644 --- a/tests/grpc_plugin_gzip_test.go +++ b/tests/grpc_plugin_gzip_test.go @@ -168,7 +168,7 @@ func TestGrpcRqRsMultipleGzip(t *testing.T) { time.Sleep(time.Second * 1) - conn, err := grpc.NewClient("127.0.0.1:9001", grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultCallOptions(grpc.UseCompressor("gzip"))) + conn, err := grpc.NewClient("127.0.0.1:9002", grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultCallOptions(grpc.UseCompressor("gzip"))) require.NoError(t, err) require.NotNil(t, conn) From c10f92bf1adaac7b0519b5c9820a2ee8ccf99a45 Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Fri, 30 Aug 2024 17:54:54 +0200 Subject: [PATCH 5/9] chore: register std codec name Signed-off-by: Valery Piashchynski --- plugin.go | 1 + 1 file changed, 1 insertion(+) diff --git a/plugin.go b/plugin.go index 27877bf..62f7722 100644 --- a/plugin.go +++ b/plugin.go @@ -68,6 +68,7 @@ func (p *Plugin) Init(cfg common.Configurer, log common.Logger, server common.Se return errors.E(errors.Disabled) } + encoding.RegisterCodecV2(encoding.GetCodecV2(codec.Name)) // register the codec encoding.RegisterCodec(&codec.Codec{ Base: encoding.GetCodecV2(codec.Name), From 242b4aa4934a103271a3b83e8db5edb7149990d4 Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Fri, 30 Aug 2024 17:56:54 +0200 Subject: [PATCH 6/9] chore: rotate ports Signed-off-by: Valery Piashchynski --- tests/configs/.rr-grpc-rq-multiple.yaml | 2 +- tests/grpc_plugin_gzip_test.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/configs/.rr-grpc-rq-multiple.yaml b/tests/configs/.rr-grpc-rq-multiple.yaml index c809717..95e82e6 100644 --- a/tests/configs/.rr-grpc-rq-multiple.yaml +++ b/tests/configs/.rr-grpc-rq-multiple.yaml @@ -15,7 +15,7 @@ logs: # GRPC service configuration grpc: # socket to listen - listen: "tcp://127.0.0.1:9002" + listen: "tcp://127.0.0.1:9003" # proto root file proto: diff --git a/tests/grpc_plugin_gzip_test.go b/tests/grpc_plugin_gzip_test.go index 6bd9a4e..543c80a 100644 --- a/tests/grpc_plugin_gzip_test.go +++ b/tests/grpc_plugin_gzip_test.go @@ -168,7 +168,7 @@ func TestGrpcRqRsMultipleGzip(t *testing.T) { time.Sleep(time.Second * 1) - conn, err := grpc.NewClient("127.0.0.1:9002", grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultCallOptions(grpc.UseCompressor("gzip"))) + conn, err := grpc.NewClient("127.0.0.1:9003", grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultCallOptions(grpc.UseCompressor("gzip"))) require.NoError(t, err) require.NotNil(t, conn) From 3b16f123f9cf1e8cbb126e96ee8ccf585bdcecbf Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Fri, 30 Aug 2024 18:10:40 +0200 Subject: [PATCH 7/9] chore: register codec only once Signed-off-by: Valery Piashchynski --- .golangci.yml | 1 - plugin.go | 13 +++++++------ tests/grpc_plugin_gzip_test.go | 1 - 3 files changed, 7 insertions(+), 8 deletions(-) diff --git a/.golangci.yml b/.golangci.yml index e1b806c..f712f66 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -40,7 +40,6 @@ linters: # All available linters list: Date: Fri, 30 Aug 2024 18:16:55 +0200 Subject: [PATCH 8/9] chore: update ports Signed-off-by: Valery Piashchynski --- tests/grpc_plugin_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/grpc_plugin_test.go b/tests/grpc_plugin_test.go index 688c57e..79ef767 100644 --- a/tests/grpc_plugin_test.go +++ b/tests/grpc_plugin_test.go @@ -689,7 +689,7 @@ func TestGrpcRqRsMultiple(t *testing.T) { time.Sleep(time.Second * 1) - conn, err := grpc.NewClient("127.0.0.1:9001", grpc.WithTransportCredentials(insecure.NewCredentials())) + conn, err := grpc.NewClient("127.0.0.1:9003", grpc.WithTransportCredentials(insecure.NewCredentials())) require.NoError(t, err) require.NotNil(t, conn) From b77b9439235f1b91668d8d2cda5479547a1585d9 Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Fri, 30 Aug 2024 20:39:04 +0200 Subject: [PATCH 9/9] chore: safety on dereferencing a pointer Signed-off-by: Valery Piashchynski --- codec/codec.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/codec/codec.go b/codec/codec.go index 85a6710..34f5cb9 100644 --- a/codec/codec.go +++ b/codec/codec.go @@ -40,14 +40,21 @@ func (c *Codec) Marshal(v any) ([]byte, error) { func (c *Codec) Unmarshal(data []byte, v any) error { switch msg := v.(type) { case *RawMessage: + if msg == nil { + return nil + } + + // assign data to v *msg = data return nil case proto.Message: + // for the regular proto message, just unmarshal it err := proto.Unmarshal(data, msg) if err != nil { return err } default: + // otherwise, use the base codec to unmarshal return c.Base.Unmarshal(mem.BufferSlice{mem.NewBuffer(&data, mem.DefaultBufferPool())}, v) }