diff --git a/plugin/storage/grpc/config.go b/plugin/storage/grpc/config.go index 71599911cfc..3ce6a6dfcbb 100644 --- a/plugin/storage/grpc/config.go +++ b/plugin/storage/grpc/config.go @@ -4,18 +4,10 @@ package grpc import ( - "context" - "fmt" "time" - "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/config/configgrpc" "go.opentelemetry.io/collector/exporter/exporterhelper" - "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" - "go.opentelemetry.io/otel/trace" - "go.uber.org/zap" - "google.golang.org/grpc" "github.com/jaegertracing/jaeger/pkg/config/tlscfg" "github.com/jaegertracing/jaeger/pkg/tenancy" @@ -60,56 +52,4 @@ func (c *Configuration) TranslateToConfigV2() *ConfigV2 { type ClientPluginServices struct { shared.PluginServices Capabilities shared.PluginCapabilities - remoteConn *grpc.ClientConn -} - -// TODO move this to factory.go -func (c *ConfigV2) Build(logger *zap.Logger, tracerProvider trace.TracerProvider) (*ClientPluginServices, error) { - telset := component.TelemetrySettings{ - Logger: logger, - TracerProvider: tracerProvider, - } - newClientFn := func(opts ...grpc.DialOption) (conn *grpc.ClientConn, err error) { - return c.ToClientConn(context.Background(), componenttest.NewNopHost(), telset, opts...) - } - return newRemoteStorage(c, telset, newClientFn) -} - -type newClientFn func(opts ...grpc.DialOption) (*grpc.ClientConn, error) - -func newRemoteStorage(c *ConfigV2, telset component.TelemetrySettings, newClient newClientFn) (*ClientPluginServices, error) { - opts := []grpc.DialOption{ - grpc.WithStatsHandler(otelgrpc.NewClientHandler(otelgrpc.WithTracerProvider(telset.TracerProvider))), - } - if c.Auth != nil { - return nil, fmt.Errorf("authenticator is not supported") - } - - tenancyMgr := tenancy.NewManager(&c.Tenancy) - if tenancyMgr.Enabled { - opts = append(opts, grpc.WithUnaryInterceptor(tenancy.NewClientUnaryInterceptor(tenancyMgr))) - opts = append(opts, grpc.WithStreamInterceptor(tenancy.NewClientStreamInterceptor(tenancyMgr))) - } - - remoteConn, err := newClient(opts...) - if err != nil { - return nil, fmt.Errorf("error creating remote storage client: %w", err) - } - grpcClient := shared.NewGRPCClient(remoteConn) - return &ClientPluginServices{ - PluginServices: shared.PluginServices{ - Store: grpcClient, - ArchiveStore: grpcClient, - StreamingSpanWriter: grpcClient, - }, - Capabilities: grpcClient, - remoteConn: remoteConn, - }, nil -} - -func (c *ClientPluginServices) Close() error { - if c.remoteConn != nil { - return c.remoteConn.Close() - } - return nil } diff --git a/plugin/storage/grpc/config_test.go b/plugin/storage/grpc/config_test.go index b994c9a53f5..44fcbd7d331 100644 --- a/plugin/storage/grpc/config_test.go +++ b/plugin/storage/grpc/config_test.go @@ -4,26 +4,11 @@ package grpc import ( - "errors" "testing" "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "go.opentelemetry.io/collector/component" - "google.golang.org/grpc" ) -func TestBuildRemoteNewClientError(t *testing.T) { - // this is a silly test to verify handling of error from grpc.NewClient, which cannot be induced via params. - c := &ConfigV2{} - newClientFn := func(_ ...grpc.DialOption) (conn *grpc.ClientConn, err error) { - return nil, errors.New("test error") - } - _, err := newRemoteStorage(c, component.TelemetrySettings{}, newClientFn) - require.Error(t, err) - require.Contains(t, err.Error(), "error creating remote storage client") -} - func TestDefaultConfigV2(t *testing.T) { cfg := DefaultConfigV2() assert.NotEmpty(t, cfg.Timeout) diff --git a/plugin/storage/grpc/factory.go b/plugin/storage/grpc/factory.go index 2c53587da31..57b5da3092a 100644 --- a/plugin/storage/grpc/factory.go +++ b/plugin/storage/grpc/factory.go @@ -4,18 +4,25 @@ package grpc import ( + "context" "errors" "flag" "fmt" "io" "github.com/spf13/viper" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/trace" "go.uber.org/zap" + "google.golang.org/grpc" "github.com/jaegertracing/jaeger/pkg/metrics" + "github.com/jaegertracing/jaeger/pkg/tenancy" "github.com/jaegertracing/jaeger/plugin" + "github.com/jaegertracing/jaeger/plugin/storage/grpc/shared" "github.com/jaegertracing/jaeger/storage" "github.com/jaegertracing/jaeger/storage/dependencystore" "github.com/jaegertracing/jaeger/storage/spanstore" @@ -39,7 +46,8 @@ type Factory struct { configV1 Configuration configV2 *ConfigV2 - services *ClientPluginServices + services *ClientPluginServices + remoteConn *grpc.ClientConn } // NewFactory creates a new Factory. @@ -82,8 +90,16 @@ func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger) f.configV2 = f.configV1.TranslateToConfigV2() } + telset := component.TelemetrySettings{ + Logger: logger, + TracerProvider: f.tracerProvider, + } + newClientFn := func(opts ...grpc.DialOption) (conn *grpc.ClientConn, err error) { + return f.configV2.ToClientConn(context.Background(), componenttest.NewNopHost(), telset, opts...) + } + var err error - f.services, err = f.configV2.Build(logger, f.tracerProvider) + f.services, err = f.newRemoteStorage(telset, newClientFn) if err != nil { return fmt.Errorf("grpc storage builder failed to create a store: %w", err) } @@ -91,6 +107,39 @@ func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger) return nil } +type newClientFn func(opts ...grpc.DialOption) (*grpc.ClientConn, error) + +func (f *Factory) newRemoteStorage(telset component.TelemetrySettings, newClient newClientFn) (*ClientPluginServices, error) { + c := f.configV2 + opts := []grpc.DialOption{ + grpc.WithStatsHandler(otelgrpc.NewClientHandler(otelgrpc.WithTracerProvider(telset.TracerProvider))), + } + if c.Auth != nil { + return nil, fmt.Errorf("authenticator is not supported") + } + + tenancyMgr := tenancy.NewManager(&c.Tenancy) + if tenancyMgr.Enabled { + opts = append(opts, grpc.WithUnaryInterceptor(tenancy.NewClientUnaryInterceptor(tenancyMgr))) + opts = append(opts, grpc.WithStreamInterceptor(tenancy.NewClientStreamInterceptor(tenancyMgr))) + } + + remoteConn, err := newClient(opts...) + if err != nil { + return nil, fmt.Errorf("error creating remote storage client: %w", err) + } + f.remoteConn = remoteConn + grpcClient := shared.NewGRPCClient(remoteConn) + return &ClientPluginServices{ + PluginServices: shared.PluginServices{ + Store: grpcClient, + ArchiveStore: grpcClient, + StreamingSpanWriter: grpcClient, + }, + Capabilities: grpcClient, + }, nil +} + // CreateSpanReader implements storage.Factory func (f *Factory) CreateSpanReader() (spanstore.Reader, error) { return f.services.Store.SpanReader(), nil @@ -144,8 +193,8 @@ func (f *Factory) CreateArchiveSpanWriter() (spanstore.Writer, error) { // Close closes the resources held by the factory func (f *Factory) Close() error { var errs []error - if f.services != nil { - errs = append(errs, f.services.Close()) + if f.remoteConn != nil { + errs = append(errs, f.remoteConn.Close()) } errs = append(errs, f.configV1.RemoteTLS.Close()) return errors.Join(errs...) diff --git a/plugin/storage/grpc/factory_test.go b/plugin/storage/grpc/factory_test.go index db53d52afe6..9d72134327d 100644 --- a/plugin/storage/grpc/factory_test.go +++ b/plugin/storage/grpc/factory_test.go @@ -15,6 +15,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/configauth" "go.opentelemetry.io/collector/config/configgrpc" "go.opentelemetry.io/collector/exporter/exporterhelper" @@ -23,6 +24,7 @@ import ( "github.com/jaegertracing/jaeger/pkg/config" "github.com/jaegertracing/jaeger/pkg/metrics" + "github.com/jaegertracing/jaeger/pkg/tenancy" "github.com/jaegertracing/jaeger/plugin/storage/grpc/shared" "github.com/jaegertracing/jaeger/plugin/storage/grpc/shared/mocks" "github.com/jaegertracing/jaeger/storage" @@ -87,9 +89,7 @@ func makeFactory(t *testing.T) *Factory { f.InitFromViper(viper.New(), zap.NewNop()) require.NoError(t, f.Initialize(metrics.NullFactory, zap.NewNop())) - keepServices := f.services t.Cleanup(func() { - keepServices.Close() f.Close() }) @@ -118,6 +118,19 @@ func TestNewFactoryError(t *testing.T) { require.Error(t, err) assert.Contains(t, err.Error(), "authenticator") }) + + t.Run("client", func(t *testing.T) { + // this is a silly test to verify handling of error from grpc.NewClient, which cannot be induced via params. + f, err := NewFactoryWithConfig(ConfigV2{}, metrics.NullFactory, zap.NewNop()) + require.NoError(t, err) + t.Cleanup(func() { require.NoError(t, f.Close()) }) + newClientFn := func(_ ...grpc.DialOption) (conn *grpc.ClientConn, err error) { + return nil, errors.New("test error") + } + _, err = f.newRemoteStorage(component.TelemetrySettings{}, newClientFn) + require.Error(t, err) + require.Contains(t, err.Error(), "error creating remote storage client") + }) } func TestInitFactory(t *testing.T) { @@ -156,6 +169,9 @@ func TestGRPCStorageFactoryWithConfig(t *testing.T) { TimeoutSettings: exporterhelper.TimeoutSettings{ Timeout: 1 * time.Second, }, + Tenancy: tenancy.Options{ + Enabled: true, + }, } f, err := NewFactoryWithConfig(cfg, metrics.NullFactory, zap.NewNop()) require.NoError(t, err) diff --git a/plugin/storage/grpc/options_test.go b/plugin/storage/grpc/options_test.go index afa7d172de6..45665e3b06d 100644 --- a/plugin/storage/grpc/options_test.go +++ b/plugin/storage/grpc/options_test.go @@ -9,6 +9,9 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" + "go.uber.org/zap/zaptest/observer" "github.com/jaegertracing/jaeger/pkg/config" "github.com/jaegertracing/jaeger/pkg/tenancy" @@ -68,8 +71,12 @@ func TestFailedTLSFlags(t *testing.T) { "--grpc-storage.tls.cert=blah", // invalid unless tls.enabled=true }) require.NoError(t, err) - var cfg Configuration - err = v1InitFromViper(&cfg, v) - require.Error(t, err) - assert.Contains(t, err.Error(), "failed to parse gRPC storage TLS options") + f := NewFactory() + f.configV2 = nil + core, logs := observer.New(zap.NewAtomicLevelAt(zapcore.ErrorLevel)) + logger := zap.New(core, zap.WithFatalHook(zapcore.WriteThenPanic)) + require.Panics(t, func() { f.InitFromViper(v, logger) }) + require.Len(t, logs.All(), 1) + assert.Contains(t, logs.All()[0].Message, "unable to initialize gRPC storage factory") + assert.Contains(t, logs.All()[0].ContextMap()["error"], "failed to parse gRPC storage TLS options") }