Skip to content

Commit

Permalink
Clean up gRPC storage config (#5877)
Browse files Browse the repository at this point in the history
## Which problem is this PR solving?
- Config.go contained business logic that belongs to the factory

## Description of the changes
- move logic to factory
- add unit tests for 100% coverage

## How was this change tested?
- CI

Signed-off-by: Yuri Shkuro <[email protected]>
  • Loading branch information
yurishkuro authored Aug 23, 2024
1 parent 865dd5d commit 922faf0
Show file tree
Hide file tree
Showing 5 changed files with 82 additions and 85 deletions.
60 changes: 0 additions & 60 deletions plugin/storage/grpc/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,10 @@
package grpc

import (
"context"
"fmt"
"time"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config/configgrpc"
"go.opentelemetry.io/collector/exporter/exporterhelper"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
"google.golang.org/grpc"

"github.com/jaegertracing/jaeger/pkg/config/tlscfg"
"github.com/jaegertracing/jaeger/pkg/tenancy"
Expand Down Expand Up @@ -60,56 +52,4 @@ func (c *Configuration) TranslateToConfigV2() *ConfigV2 {
type ClientPluginServices struct {
shared.PluginServices
Capabilities shared.PluginCapabilities
remoteConn *grpc.ClientConn
}

// TODO move this to factory.go
func (c *ConfigV2) Build(logger *zap.Logger, tracerProvider trace.TracerProvider) (*ClientPluginServices, error) {
telset := component.TelemetrySettings{
Logger: logger,
TracerProvider: tracerProvider,
}
newClientFn := func(opts ...grpc.DialOption) (conn *grpc.ClientConn, err error) {
return c.ToClientConn(context.Background(), componenttest.NewNopHost(), telset, opts...)
}
return newRemoteStorage(c, telset, newClientFn)
}

type newClientFn func(opts ...grpc.DialOption) (*grpc.ClientConn, error)

func newRemoteStorage(c *ConfigV2, telset component.TelemetrySettings, newClient newClientFn) (*ClientPluginServices, error) {
opts := []grpc.DialOption{
grpc.WithStatsHandler(otelgrpc.NewClientHandler(otelgrpc.WithTracerProvider(telset.TracerProvider))),
}
if c.Auth != nil {
return nil, fmt.Errorf("authenticator is not supported")
}

tenancyMgr := tenancy.NewManager(&c.Tenancy)
if tenancyMgr.Enabled {
opts = append(opts, grpc.WithUnaryInterceptor(tenancy.NewClientUnaryInterceptor(tenancyMgr)))
opts = append(opts, grpc.WithStreamInterceptor(tenancy.NewClientStreamInterceptor(tenancyMgr)))
}

remoteConn, err := newClient(opts...)
if err != nil {
return nil, fmt.Errorf("error creating remote storage client: %w", err)
}
grpcClient := shared.NewGRPCClient(remoteConn)
return &ClientPluginServices{
PluginServices: shared.PluginServices{
Store: grpcClient,
ArchiveStore: grpcClient,
StreamingSpanWriter: grpcClient,
},
Capabilities: grpcClient,
remoteConn: remoteConn,
}, nil
}

func (c *ClientPluginServices) Close() error {
if c.remoteConn != nil {
return c.remoteConn.Close()
}
return nil
}
15 changes: 0 additions & 15 deletions plugin/storage/grpc/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,26 +4,11 @@
package grpc

import (
"errors"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
"google.golang.org/grpc"
)

func TestBuildRemoteNewClientError(t *testing.T) {
// this is a silly test to verify handling of error from grpc.NewClient, which cannot be induced via params.
c := &ConfigV2{}
newClientFn := func(_ ...grpc.DialOption) (conn *grpc.ClientConn, err error) {
return nil, errors.New("test error")
}
_, err := newRemoteStorage(c, component.TelemetrySettings{}, newClientFn)
require.Error(t, err)
require.Contains(t, err.Error(), "error creating remote storage client")
}

func TestDefaultConfigV2(t *testing.T) {
cfg := DefaultConfigV2()
assert.NotEmpty(t, cfg.Timeout)
Expand Down
57 changes: 53 additions & 4 deletions plugin/storage/grpc/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,25 @@
package grpc

import (
"context"
"errors"
"flag"
"fmt"
"io"

"github.com/spf13/viper"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
"google.golang.org/grpc"

"github.com/jaegertracing/jaeger/pkg/metrics"
"github.com/jaegertracing/jaeger/pkg/tenancy"
"github.com/jaegertracing/jaeger/plugin"
"github.com/jaegertracing/jaeger/plugin/storage/grpc/shared"
"github.com/jaegertracing/jaeger/storage"
"github.com/jaegertracing/jaeger/storage/dependencystore"
"github.com/jaegertracing/jaeger/storage/spanstore"
Expand All @@ -39,7 +46,8 @@ type Factory struct {
configV1 Configuration
configV2 *ConfigV2

services *ClientPluginServices
services *ClientPluginServices
remoteConn *grpc.ClientConn
}

// NewFactory creates a new Factory.
Expand Down Expand Up @@ -82,15 +90,56 @@ func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger)
f.configV2 = f.configV1.TranslateToConfigV2()
}

telset := component.TelemetrySettings{
Logger: logger,
TracerProvider: f.tracerProvider,
}
newClientFn := func(opts ...grpc.DialOption) (conn *grpc.ClientConn, err error) {
return f.configV2.ToClientConn(context.Background(), componenttest.NewNopHost(), telset, opts...)
}

var err error
f.services, err = f.configV2.Build(logger, f.tracerProvider)
f.services, err = f.newRemoteStorage(telset, newClientFn)
if err != nil {
return fmt.Errorf("grpc storage builder failed to create a store: %w", err)
}
logger.Info("Remote storage configuration", zap.Any("configuration", f.configV2))
return nil
}

type newClientFn func(opts ...grpc.DialOption) (*grpc.ClientConn, error)

func (f *Factory) newRemoteStorage(telset component.TelemetrySettings, newClient newClientFn) (*ClientPluginServices, error) {
c := f.configV2
opts := []grpc.DialOption{
grpc.WithStatsHandler(otelgrpc.NewClientHandler(otelgrpc.WithTracerProvider(telset.TracerProvider))),
}
if c.Auth != nil {
return nil, fmt.Errorf("authenticator is not supported")
}

tenancyMgr := tenancy.NewManager(&c.Tenancy)
if tenancyMgr.Enabled {
opts = append(opts, grpc.WithUnaryInterceptor(tenancy.NewClientUnaryInterceptor(tenancyMgr)))
opts = append(opts, grpc.WithStreamInterceptor(tenancy.NewClientStreamInterceptor(tenancyMgr)))
}

remoteConn, err := newClient(opts...)
if err != nil {
return nil, fmt.Errorf("error creating remote storage client: %w", err)
}
f.remoteConn = remoteConn
grpcClient := shared.NewGRPCClient(remoteConn)
return &ClientPluginServices{
PluginServices: shared.PluginServices{
Store: grpcClient,
ArchiveStore: grpcClient,
StreamingSpanWriter: grpcClient,
},
Capabilities: grpcClient,
}, nil
}

// CreateSpanReader implements storage.Factory
func (f *Factory) CreateSpanReader() (spanstore.Reader, error) {
return f.services.Store.SpanReader(), nil
Expand Down Expand Up @@ -144,8 +193,8 @@ func (f *Factory) CreateArchiveSpanWriter() (spanstore.Writer, error) {
// Close closes the resources held by the factory
func (f *Factory) Close() error {
var errs []error
if f.services != nil {
errs = append(errs, f.services.Close())
if f.remoteConn != nil {
errs = append(errs, f.remoteConn.Close())
}
errs = append(errs, f.configV1.RemoteTLS.Close())
return errors.Join(errs...)
Expand Down
20 changes: 18 additions & 2 deletions plugin/storage/grpc/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configauth"
"go.opentelemetry.io/collector/config/configgrpc"
"go.opentelemetry.io/collector/exporter/exporterhelper"
Expand All @@ -23,6 +24,7 @@ import (

"github.com/jaegertracing/jaeger/pkg/config"
"github.com/jaegertracing/jaeger/pkg/metrics"
"github.com/jaegertracing/jaeger/pkg/tenancy"
"github.com/jaegertracing/jaeger/plugin/storage/grpc/shared"
"github.com/jaegertracing/jaeger/plugin/storage/grpc/shared/mocks"
"github.com/jaegertracing/jaeger/storage"
Expand Down Expand Up @@ -87,9 +89,7 @@ func makeFactory(t *testing.T) *Factory {
f.InitFromViper(viper.New(), zap.NewNop())
require.NoError(t, f.Initialize(metrics.NullFactory, zap.NewNop()))

keepServices := f.services
t.Cleanup(func() {
keepServices.Close()
f.Close()
})

Expand Down Expand Up @@ -118,6 +118,19 @@ func TestNewFactoryError(t *testing.T) {
require.Error(t, err)
assert.Contains(t, err.Error(), "authenticator")
})

t.Run("client", func(t *testing.T) {
// this is a silly test to verify handling of error from grpc.NewClient, which cannot be induced via params.
f, err := NewFactoryWithConfig(ConfigV2{}, metrics.NullFactory, zap.NewNop())
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, f.Close()) })
newClientFn := func(_ ...grpc.DialOption) (conn *grpc.ClientConn, err error) {
return nil, errors.New("test error")
}
_, err = f.newRemoteStorage(component.TelemetrySettings{}, newClientFn)
require.Error(t, err)
require.Contains(t, err.Error(), "error creating remote storage client")
})
}

func TestInitFactory(t *testing.T) {
Expand Down Expand Up @@ -156,6 +169,9 @@ func TestGRPCStorageFactoryWithConfig(t *testing.T) {
TimeoutSettings: exporterhelper.TimeoutSettings{
Timeout: 1 * time.Second,
},
Tenancy: tenancy.Options{
Enabled: true,
},
}
f, err := NewFactoryWithConfig(cfg, metrics.NullFactory, zap.NewNop())
require.NoError(t, err)
Expand Down
15 changes: 11 additions & 4 deletions plugin/storage/grpc/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"go.uber.org/zap/zaptest/observer"

"github.com/jaegertracing/jaeger/pkg/config"
"github.com/jaegertracing/jaeger/pkg/tenancy"
Expand Down Expand Up @@ -68,8 +71,12 @@ func TestFailedTLSFlags(t *testing.T) {
"--grpc-storage.tls.cert=blah", // invalid unless tls.enabled=true
})
require.NoError(t, err)
var cfg Configuration
err = v1InitFromViper(&cfg, v)
require.Error(t, err)
assert.Contains(t, err.Error(), "failed to parse gRPC storage TLS options")
f := NewFactory()
f.configV2 = nil
core, logs := observer.New(zap.NewAtomicLevelAt(zapcore.ErrorLevel))
logger := zap.New(core, zap.WithFatalHook(zapcore.WriteThenPanic))
require.Panics(t, func() { f.InitFromViper(v, logger) })
require.Len(t, logs.All(), 1)
assert.Contains(t, logs.All()[0].Message, "unable to initialize gRPC storage factory")
assert.Contains(t, logs.All()[0].ContextMap()["error"], "failed to parse gRPC storage TLS options")
}

0 comments on commit 922faf0

Please sign in to comment.