From 1f587697687799908e9e371a72eb99188e8400ab Mon Sep 17 00:00:00 2001 From: cx <1249843194@qq.com> Date: Thu, 12 Sep 2024 20:03:02 +0800 Subject: [PATCH] Prevent infinite loop in gRPC tracing during span storage. fixes: #5971 Signed-off-by: cx <1249843194@qq.com> --- plugin/storage/grpc/factory.go | 34 +++++++++++-------- plugin/storage/grpc/factory_test.go | 2 +- plugin/storage/grpc/shared/grpc_client.go | 16 ++++----- .../storage/grpc/shared/grpc_client_test.go | 2 +- 4 files changed, 30 insertions(+), 24 deletions(-) diff --git a/plugin/storage/grpc/factory.go b/plugin/storage/grpc/factory.go index 90df7128881..749bdba1154 100644 --- a/plugin/storage/grpc/factory.go +++ b/plugin/storage/grpc/factory.go @@ -49,8 +49,8 @@ type Factory struct { configV1 Configuration configV2 *ConfigV2 - services *ClientPluginServices - remoteConn *grpc.ClientConn + services *ClientPluginServices + remoteConns []*grpc.ClientConn } // NewFactory creates a new Factory. @@ -101,8 +101,8 @@ func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger) return noopmetric.NewMeterProvider() }, } - newClientFn := func(opts ...grpc.DialOption) (conn *grpc.ClientConn, err error) { - return f.configV2.ToClientConn(context.Background(), componenttest.NewNopHost(), telset, opts...) + newClientFn := func(telSettings component.TelemetrySettings, opts ...grpc.DialOption) (conn *grpc.ClientConn, err error) { + return f.configV2.ToClientConn(context.Background(), componenttest.NewNopHost(), telSettings, opts...) } var err error @@ -114,13 +114,11 @@ func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger) return nil } -type newClientFn func(opts ...grpc.DialOption) (*grpc.ClientConn, error) +type newClientFn func(telSettings component.TelemetrySettings, opts ...grpc.DialOption) (*grpc.ClientConn, error) func (f *Factory) newRemoteStorage(telset component.TelemetrySettings, newClient newClientFn) (*ClientPluginServices, error) { c := f.configV2 - opts := []grpc.DialOption{ - grpc.WithStatsHandler(otelgrpc.NewClientHandler(otelgrpc.WithTracerProvider(telset.TracerProvider))), - } + opts := make([]grpc.DialOption, 0) if c.Auth != nil { return nil, fmt.Errorf("authenticator is not supported") } @@ -131,12 +129,20 @@ func (f *Factory) newRemoteStorage(telset component.TelemetrySettings, newClient opts = append(opts, grpc.WithStreamInterceptor(tenancy.NewClientStreamInterceptor(tenancyMgr))) } - remoteConn, err := newClient(opts...) + noTraceConn, err := newClient(telset, opts...) if err != nil { - return nil, fmt.Errorf("error creating remote storage client: %w", err) + return nil, fmt.Errorf("error creating remote storage client without tracing: %w", err) } - f.remoteConn = remoteConn - grpcClient := shared.NewGRPCClient(remoteConn) + f.remoteConns = append(f.remoteConns, noTraceConn) + + opts = append(opts, grpc.WithStatsHandler(otelgrpc.NewClientHandler(otelgrpc.WithTracerProvider(telset.TracerProvider)))) + traceConn, err := newClient(telset, opts...) + if err != nil { + return nil, fmt.Errorf("error creating remote storage client with tracing: %w", err) + } + f.remoteConns = append(f.remoteConns, traceConn) + + grpcClient := shared.NewGRPCClient(traceConn, noTraceConn) return &ClientPluginServices{ PluginServices: shared.PluginServices{ Store: grpcClient, @@ -200,8 +206,8 @@ func (f *Factory) CreateArchiveSpanWriter() (spanstore.Writer, error) { // Close closes the resources held by the factory func (f *Factory) Close() error { var errs []error - if f.remoteConn != nil { - errs = append(errs, f.remoteConn.Close()) + for i := range f.remoteConns { + errs = append(errs, f.remoteConns[i].Close()) } errs = append(errs, f.configV1.RemoteTLS.Close()) return errors.Join(errs...) diff --git a/plugin/storage/grpc/factory_test.go b/plugin/storage/grpc/factory_test.go index 9d72134327d..0ac6a66e90e 100644 --- a/plugin/storage/grpc/factory_test.go +++ b/plugin/storage/grpc/factory_test.go @@ -124,7 +124,7 @@ func TestNewFactoryError(t *testing.T) { f, err := NewFactoryWithConfig(ConfigV2{}, metrics.NullFactory, zap.NewNop()) require.NoError(t, err) t.Cleanup(func() { require.NoError(t, f.Close()) }) - newClientFn := func(_ ...grpc.DialOption) (conn *grpc.ClientConn, err error) { + newClientFn := func(_ component.TelemetrySettings, _ ...grpc.DialOption) (conn *grpc.ClientConn, err error) { return nil, errors.New("test error") } _, err = f.newRemoteStorage(component.TelemetrySettings{}, newClientFn) diff --git a/plugin/storage/grpc/shared/grpc_client.go b/plugin/storage/grpc/shared/grpc_client.go index ee0e910003b..725eff543d3 100644 --- a/plugin/storage/grpc/shared/grpc_client.go +++ b/plugin/storage/grpc/shared/grpc_client.go @@ -46,15 +46,15 @@ type GRPCClient struct { streamWriterClient storage_v1.StreamingSpanWriterPluginClient } -func NewGRPCClient(c *grpc.ClientConn) *GRPCClient { +func NewGRPCClient(withTraceConn *grpc.ClientConn, withoutTraceConn *grpc.ClientConn) *GRPCClient { return &GRPCClient{ - readerClient: storage_v1.NewSpanReaderPluginClient(c), - writerClient: storage_v1.NewSpanWriterPluginClient(c), - archiveReaderClient: storage_v1.NewArchiveSpanReaderPluginClient(c), - archiveWriterClient: storage_v1.NewArchiveSpanWriterPluginClient(c), - capabilitiesClient: storage_v1.NewPluginCapabilitiesClient(c), - depsReaderClient: storage_v1.NewDependenciesReaderPluginClient(c), - streamWriterClient: storage_v1.NewStreamingSpanWriterPluginClient(c), + readerClient: storage_v1.NewSpanReaderPluginClient(withTraceConn), + writerClient: storage_v1.NewSpanWriterPluginClient(withoutTraceConn), + archiveReaderClient: storage_v1.NewArchiveSpanReaderPluginClient(withTraceConn), + archiveWriterClient: storage_v1.NewArchiveSpanWriterPluginClient(withoutTraceConn), + capabilitiesClient: storage_v1.NewPluginCapabilitiesClient(withTraceConn), + depsReaderClient: storage_v1.NewDependenciesReaderPluginClient(withTraceConn), + streamWriterClient: storage_v1.NewStreamingSpanWriterPluginClient(withoutTraceConn), } } diff --git a/plugin/storage/grpc/shared/grpc_client_test.go b/plugin/storage/grpc/shared/grpc_client_test.go index 51e56df6bb5..da8d82ce4a4 100644 --- a/plugin/storage/grpc/shared/grpc_client_test.go +++ b/plugin/storage/grpc/shared/grpc_client_test.go @@ -104,7 +104,7 @@ func withGRPCClient(fn func(r *grpcClientTest)) { func TestNewGRPCClient(t *testing.T) { conn := &grpc.ClientConn{} - client := NewGRPCClient(conn) + client := NewGRPCClient(conn, conn) assert.NotNil(t, client) assert.Implements(t, (*storage_v1.SpanReaderPluginClient)(nil), client.readerClient)