Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[jaeger-v2] Add support for GRPC storarge #5228

Merged
merged 8 commits into from
Feb 27, 2024
35 changes: 35 additions & 0 deletions cmd/jaeger/grpc_config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
service:
extensions: [jaeger_storage, jaeger_query]
pipelines:
traces:
receivers: [otlp]
processors: [batch]
exporters: [jaeger_storage_exporter]

extensions:
jaeger_query:
trace_storage: external-storage
trace_storage_archive: external-storage-archive
ui_config: ./cmd/jaeger/config-ui.json

jaeger_storage:
grpc:
external-storage:
server: localhost:17271
connection-timeout: 5s
external-storage-archive:
server: localhost:17281
connection-timeout: 5s

receivers:
otlp:
protocols:
grpc:
http:

processors:
batch:

exporters:
jaeger_storage_exporter:
trace_storage: external-storage
3 changes: 3 additions & 0 deletions cmd/jaeger/internal/extension/jaegerstorage/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,14 @@ import (
esCfg "github.com/jaegertracing/jaeger/pkg/es/config"
memoryCfg "github.com/jaegertracing/jaeger/pkg/memory/config"
badgerCfg "github.com/jaegertracing/jaeger/plugin/storage/badger"
grpcCfg "github.com/jaegertracing/jaeger/plugin/storage/grpc/config"
)

// Config has the configuration for jaeger-query,
type Config struct {
Memory map[string]memoryCfg.Configuration `mapstructure:"memory"`
Badger map[string]badgerCfg.NamespaceConfig `mapstructure:"badger"`
GRPC map[string]grpcCfg.Configuration `mapstructure:"grpc"`
Elasticsearch map[string]esCfg.Configuration `mapstructure:"elasticsearch"`
// TODO add other storage types here
// TODO how will this work with 3rd party storage implementations?
Expand All @@ -29,6 +31,7 @@ type MemoryStorage struct {

func (cfg *Config) Validate() error {
emptyCfg := createDefaultConfig().(*Config)
//nolint:govet // The remoteRPCClient field in GRPC.Configuration contains error type
if reflect.DeepEqual(*cfg, *emptyCfg) {
return fmt.Errorf("%s: no storage type present in config", ID)
} else {
Expand Down
9 changes: 9 additions & 0 deletions cmd/jaeger/internal/extension/jaegerstorage/extension.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import (
"github.com/jaegertracing/jaeger/plugin/storage/badger"
badgerCfg "github.com/jaegertracing/jaeger/plugin/storage/badger"
"github.com/jaegertracing/jaeger/plugin/storage/es"
"github.com/jaegertracing/jaeger/plugin/storage/grpc"
grpcCfg "github.com/jaegertracing/jaeger/plugin/storage/grpc/config"
"github.com/jaegertracing/jaeger/plugin/storage/memory"
"github.com/jaegertracing/jaeger/storage"
)
Expand Down Expand Up @@ -109,6 +111,12 @@ func (s *storageExt) Start(ctx context.Context, host component.Host) error {
cfg: s.config.Badger,
builder: badger.NewFactoryWithConfig,
}
grpcStarter := &starter[grpcCfg.Configuration, *grpc.Factory]{
ext: s,
storageKind: "grpc",
cfg: s.config.GRPC,
builder: grpc.NewFactoryWithConfig,
}
esStarter := &starter[esCfg.Configuration, *es.Factory]{
ext: s,
storageKind: "elasticsearch",
Expand All @@ -119,6 +127,7 @@ func (s *storageExt) Start(ctx context.Context, host component.Host) error {
builders := []func(ctx context.Context, host component.Host) error{
memStarter.build,
badgerStarter.build,
grpcStarter.build,
esStarter.build,
// TODO add support for other backends
}
Expand Down
9 changes: 7 additions & 2 deletions plugin/storage/grpc/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ type Configuration struct {
pluginHealthCheck *time.Ticker
pluginHealthCheckDone chan bool
pluginRPCClient plugin.ClientProtocol
remoteConn *grpc.ClientConn
}

// ClientPluginServices defines services plugin can expose and its capabilities
Expand Down Expand Up @@ -78,6 +79,9 @@ func (c *Configuration) Close() error {
c.pluginHealthCheck.Stop()
c.pluginHealthCheckDone <- true
}
if c.remoteConn != nil {
c.remoteConn.Close()
}

return c.RemoteTLS.Close()
}
Expand Down Expand Up @@ -106,12 +110,13 @@ func (c *Configuration) buildRemote(logger *zap.Logger, tracerProvider trace.Tra
opts = append(opts, grpc.WithUnaryInterceptor(tenancy.NewClientUnaryInterceptor(tenancyMgr)))
opts = append(opts, grpc.WithStreamInterceptor(tenancy.NewClientStreamInterceptor(tenancyMgr)))
}
conn, err := grpc.DialContext(ctx, c.RemoteServerAddr, opts...)
var err error
c.remoteConn, err = grpc.DialContext(ctx, c.RemoteServerAddr, opts...)
if err != nil {
return nil, fmt.Errorf("error connecting to remote storage: %w", err)
}

grpcClient := shared.NewGRPCClient(conn)
grpcClient := shared.NewGRPCClient(c.remoteConn)
return &ClientPluginServices{
PluginServices: shared.PluginServices{
Store: grpcClient,
Expand Down
15 changes: 15 additions & 0 deletions plugin/storage/grpc/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,21 @@ func NewFactory() *Factory {
return &Factory{}
}

// NewFactoryWithConfig is used from jaeger(v2).
func NewFactoryWithConfig(
cfg config.Configuration,
metricsFactory metrics.Factory,
logger *zap.Logger,
) (*Factory, error) {
f := NewFactory()
f.InitFromOptions(Options{Configuration: cfg})
err := f.Initialize(metricsFactory, logger)
if err != nil {
return nil, err
}
return f, nil
}

// AddFlags implements plugin.Configurable
func (f *Factory) AddFlags(flagSet *flag.FlagSet) {
f.options.AddFlags(flagSet)
Expand Down
32 changes: 32 additions & 0 deletions plugin/storage/grpc/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,17 @@ package grpc

import (
"errors"
"log"
"net"
"testing"
"time"

"github.com/spf13/viper"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
"google.golang.org/grpc"

"github.com/jaegertracing/jaeger/pkg/config"
"github.com/jaegertracing/jaeger/pkg/metrics"
Expand Down Expand Up @@ -143,6 +147,34 @@ func TestGRPCStorageFactory(t *testing.T) {
assert.Equal(t, f.store.DependencyReader(), depReader)
}

func TestGRPCStorageFactoryWithConfig(t *testing.T) {
cfg := grpcConfig.Configuration{}
_, err := NewFactoryWithConfig(cfg, metrics.NullFactory, zap.NewNop())
require.ErrorContains(t, err, "grpc-plugin builder failed to create a store: error connecting to remote storage")

lis, err := net.Listen("tcp", ":0")
require.NoError(t, err, "failed to listen")

s := grpc.NewServer()
go func() {
if err := s.Serve(lis); err != nil {
log.Fatalf("Server exited with error: %v", err)
}
}()
defer s.Stop()

cfg.RemoteServerAddr = lis.Addr().String()
cfg.RemoteConnectTimeout = 1 * time.Second
f, err := NewFactoryWithConfig(cfg, metrics.NullFactory, zap.NewNop())
defer func() {
err := f.Close()
if err != nil {
log.Fatalf("Client exited with error: %v", err)
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
err := f.Close()
if err != nil {
log.Fatalf("Client exited with error: %v", err)
}
require.NoError(t, f.Close())

}()
require.NoError(t, err)
yurishkuro marked this conversation as resolved.
Show resolved Hide resolved
}

func TestGRPCStorageFactory_Capabilities(t *testing.T) {
f := NewFactory()
v := viper.New()
Expand Down
13 changes: 10 additions & 3 deletions plugin/storage/integration/grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,14 +96,20 @@ func (s *gRPCServer) Restart() error {

type GRPCStorageIntegrationTestSuite struct {
StorageIntegration
logger *zap.Logger
flags []string
server *gRPCServer
logger *zap.Logger
flags []string
factory *grpc.Factory
server *gRPCServer
}

func (s *GRPCStorageIntegrationTestSuite) initialize() error {
s.logger, _ = testutils.NewLogger()

if s.factory != nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how would it be not nil at this point if the factory is only created in L119? Is it from another run of the test? If so we should close it as part of the clean-up, not as part of starting a new test

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, my initial thought was because it restarts the server when initializing, so I think it makes sense if I put it there too. But, moving it to the clean-up process does make things clearer. I've changed it.

if err := s.factory.Close(); err != nil {
return err
}
}
if s.server != nil {
if err := s.server.Restart(); err != nil {
return err
Expand All @@ -120,6 +126,7 @@ func (s *GRPCStorageIntegrationTestSuite) initialize() error {
if err := f.Initialize(metrics.NullFactory, s.logger); err != nil {
return err
}
s.factory = f

if s.SpanWriter, err = f.CreateSpanWriter(); err != nil {
return err
Expand Down
Loading