Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[jaeger-v2] Add support for GRPC storarge #5228

Merged
merged 8 commits into from
Feb 27, 2024
35 changes: 35 additions & 0 deletions cmd/jaeger/grpc_config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
service:
extensions: [jaeger_storage, jaeger_query]
pipelines:
traces:
receivers: [otlp]
processors: [batch]
exporters: [jaeger_storage_exporter]

extensions:
jaeger_query:
trace_storage: external-storage
trace_storage_archive: external-storage-archive
ui_config: ./cmd/jaeger/config-ui.json

jaeger_storage:
grpc:
external-storage:
server: localhost:17271
connection-timeout: 5s
external-storage-archive:
server: localhost:17281
connection-timeout: 5s

receivers:
otlp:
protocols:
grpc:
http:

processors:
batch:

exporters:
jaeger_storage_exporter:
trace_storage: external-storage
3 changes: 3 additions & 0 deletions cmd/jaeger/internal/extension/jaegerstorage/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,14 @@ import (
esCfg "github.com/jaegertracing/jaeger/pkg/es/config"
memoryCfg "github.com/jaegertracing/jaeger/pkg/memory/config"
badgerCfg "github.com/jaegertracing/jaeger/plugin/storage/badger"
grpcCfg "github.com/jaegertracing/jaeger/plugin/storage/grpc/config"
)

// Config has the configuration for jaeger-query,
type Config struct {
Memory map[string]memoryCfg.Configuration `mapstructure:"memory"`
Badger map[string]badgerCfg.NamespaceConfig `mapstructure:"badger"`
GRPC map[string]grpcCfg.Configuration `mapstructure:"grpc"`
Elasticsearch map[string]esCfg.Configuration `mapstructure:"elasticsearch"`
// TODO add other storage types here
// TODO how will this work with 3rd party storage implementations?
Expand All @@ -29,6 +31,7 @@ type MemoryStorage struct {

func (cfg *Config) Validate() error {
emptyCfg := createDefaultConfig().(*Config)
//nolint:govet // The remoteRPCClient field in GRPC.Configuration contains error type
if reflect.DeepEqual(*cfg, *emptyCfg) {
return fmt.Errorf("%s: no storage type present in config", ID)
} else {
Expand Down
9 changes: 9 additions & 0 deletions cmd/jaeger/internal/extension/jaegerstorage/extension.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import (
"github.com/jaegertracing/jaeger/plugin/storage/badger"
badgerCfg "github.com/jaegertracing/jaeger/plugin/storage/badger"
"github.com/jaegertracing/jaeger/plugin/storage/es"
"github.com/jaegertracing/jaeger/plugin/storage/grpc"
grpcCfg "github.com/jaegertracing/jaeger/plugin/storage/grpc/config"
"github.com/jaegertracing/jaeger/plugin/storage/memory"
"github.com/jaegertracing/jaeger/storage"
)
Expand Down Expand Up @@ -109,6 +111,12 @@ func (s *storageExt) Start(ctx context.Context, host component.Host) error {
cfg: s.config.Badger,
builder: badger.NewFactoryWithConfig,
}
grpcStarter := &starter[grpcCfg.Configuration, *grpc.Factory]{
ext: s,
storageKind: "grpc",
cfg: s.config.GRPC,
builder: grpc.NewFactoryWithConfig,
}
esStarter := &starter[esCfg.Configuration, *es.Factory]{
ext: s,
storageKind: "elasticsearch",
Expand All @@ -119,6 +127,7 @@ func (s *storageExt) Start(ctx context.Context, host component.Host) error {
builders := []func(ctx context.Context, host component.Host) error{
memStarter.build,
badgerStarter.build,
grpcStarter.build,
esStarter.build,
// TODO add support for other backends
}
Expand Down
9 changes: 7 additions & 2 deletions plugin/storage/grpc/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ type Configuration struct {
pluginHealthCheck *time.Ticker
pluginHealthCheckDone chan bool
pluginRPCClient plugin.ClientProtocol
remoteConn *grpc.ClientConn
}

// ClientPluginServices defines services plugin can expose and its capabilities
Expand Down Expand Up @@ -78,6 +79,9 @@ func (c *Configuration) Close() error {
c.pluginHealthCheck.Stop()
c.pluginHealthCheckDone <- true
}
if c.remoteConn != nil {
c.remoteConn.Close()
}

return c.RemoteTLS.Close()
}
Expand Down Expand Up @@ -106,12 +110,13 @@ func (c *Configuration) buildRemote(logger *zap.Logger, tracerProvider trace.Tra
opts = append(opts, grpc.WithUnaryInterceptor(tenancy.NewClientUnaryInterceptor(tenancyMgr)))
opts = append(opts, grpc.WithStreamInterceptor(tenancy.NewClientStreamInterceptor(tenancyMgr)))
}
conn, err := grpc.DialContext(ctx, c.RemoteServerAddr, opts...)
var err error
c.remoteConn, err = grpc.DialContext(ctx, c.RemoteServerAddr, opts...)
if err != nil {
return nil, fmt.Errorf("error connecting to remote storage: %w", err)
}

grpcClient := shared.NewGRPCClient(conn)
grpcClient := shared.NewGRPCClient(c.remoteConn)
return &ClientPluginServices{
PluginServices: shared.PluginServices{
Store: grpcClient,
Expand Down
15 changes: 15 additions & 0 deletions plugin/storage/grpc/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,21 @@ func NewFactory() *Factory {
return &Factory{}
}

// NewFactoryWithConfig is used from jaeger(v2).
func NewFactoryWithConfig(
cfg config.Configuration,
metricsFactory metrics.Factory,
logger *zap.Logger,
) (*Factory, error) {
f := NewFactory()
f.InitFromOptions(Options{Configuration: cfg})
err := f.Initialize(metricsFactory, logger)
if err != nil {
return nil, err
}
return f, nil
}

// AddFlags implements plugin.Configurable
func (f *Factory) AddFlags(flagSet *flag.FlagSet) {
f.options.AddFlags(flagSet)
Expand Down
27 changes: 27 additions & 0 deletions plugin/storage/grpc/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,17 @@ package grpc

import (
"errors"
"log"
"net"
"testing"
"time"

"github.com/spf13/viper"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
"google.golang.org/grpc"

"github.com/jaegertracing/jaeger/pkg/config"
"github.com/jaegertracing/jaeger/pkg/metrics"
Expand Down Expand Up @@ -143,6 +147,29 @@ func TestGRPCStorageFactory(t *testing.T) {
assert.Equal(t, f.store.DependencyReader(), depReader)
}

func TestGRPCStorageFactoryWithConfig(t *testing.T) {
cfg := grpcConfig.Configuration{}
_, err := NewFactoryWithConfig(cfg, metrics.NullFactory, zap.NewNop())
require.ErrorContains(t, err, "grpc-plugin builder failed to create a store: error connecting to remote storage")

lis, err := net.Listen("tcp", ":0")
require.NoError(t, err, "failed to listen")

s := grpc.NewServer()
go func() {
if err := s.Serve(lis); err != nil {
log.Fatalf("Server exited with error: %v", err)
}
}()
defer s.Stop()

cfg.RemoteServerAddr = lis.Addr().String()
cfg.RemoteConnectTimeout = 1 * time.Second
f, err := NewFactoryWithConfig(cfg, metrics.NullFactory, zap.NewNop())
require.NoError(t, err)
yurishkuro marked this conversation as resolved.
Show resolved Hide resolved
require.NoError(t, f.Close())
}

func TestGRPCStorageFactory_Capabilities(t *testing.T) {
f := NewFactory()
v := viper.New()
Expand Down
11 changes: 8 additions & 3 deletions plugin/storage/integration/grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,10 @@ func (s *gRPCServer) Restart() error {

type GRPCStorageIntegrationTestSuite struct {
StorageIntegration
logger *zap.Logger
flags []string
server *gRPCServer
logger *zap.Logger
flags []string
factory *grpc.Factory
server *gRPCServer
}

func (s *GRPCStorageIntegrationTestSuite) initialize() error {
Expand All @@ -120,6 +121,7 @@ func (s *GRPCStorageIntegrationTestSuite) initialize() error {
if err := f.Initialize(metrics.NullFactory, s.logger); err != nil {
return err
}
s.factory = f

if s.SpanWriter, err = f.CreateSpanWriter(); err != nil {
return err
Expand All @@ -140,6 +142,9 @@ func (s *GRPCStorageIntegrationTestSuite) refresh() error {
}

func (s *GRPCStorageIntegrationTestSuite) cleanUp() error {
if err := s.factory.Close(); err != nil {
return err
}
return s.initialize()
}

Expand Down
Loading