diff --git a/plugin/storage/grpc/options_test.go b/plugin/storage/grpc/options_test.go index 0f4833ebe839..7f83a8744984 100644 --- a/plugin/storage/grpc/options_test.go +++ b/plugin/storage/grpc/options_test.go @@ -55,3 +55,20 @@ func TestRemoteOptionsWithFlags(t *testing.T) { assert.Equal(t, opts.Configuration.RemoteTLS.Enabled, true) assert.Equal(t, opts.Configuration.RemoteConnectTimeout, 60*time.Second) } + +func TestRemoteOptionsNoTLSWithFlags(t *testing.T) { + opts := &Options{} + v, command := config.Viperize(opts.AddFlags) + err := command.ParseFlags([]string{ + "--grpc-storage.server=localhost:2001", + "--grpc-storage.tls.enabled=false", + "--grpc-storage.connection-timeout=60s", + }) + assert.NoError(t, err) + opts.InitFromViper(v) + + assert.Equal(t, opts.Configuration.PluginBinary, "") + assert.Equal(t, opts.Configuration.RemoteServerAddr, "localhost:2001") + assert.Equal(t, opts.Configuration.RemoteTLS.Enabled, false) + assert.Equal(t, opts.Configuration.RemoteConnectTimeout, 60*time.Second) +} diff --git a/plugin/storage/grpc/shared/plugin.go b/plugin/storage/grpc/shared/plugin.go index 2027c3fe3807..b1cb9eef009f 100644 --- a/plugin/storage/grpc/shared/plugin.go +++ b/plugin/storage/grpc/shared/plugin.go @@ -36,8 +36,8 @@ type StorageGRPCPlugin struct { ArchiveImpl ArchiveStoragePlugin } -// GRPCServer implements plugin.GRPCPlugin. It is used by go-plugin to create a grpc plugin server. -func (p *StorageGRPCPlugin) GRPCServer(broker *plugin.GRPCBroker, s *grpc.Server) error { +// RegisterServer registers the plugin with the server +func (p *StorageGRPCPlugin) RegisterServer(s *grpc.Server) error { server := &grpcServer{ Impl: p.Impl, ArchiveImpl: p.ArchiveImpl, @@ -51,6 +51,11 @@ func (p *StorageGRPCPlugin) GRPCServer(broker *plugin.GRPCBroker, s *grpc.Server return nil } +// GRPCServer implements plugin.GRPCPlugin. It is used by go-plugin to create a grpc plugin server. +func (p *StorageGRPCPlugin) GRPCServer(broker *plugin.GRPCBroker, s *grpc.Server) error { + return p.RegisterServer(s) +} + // GRPCClient implements plugin.GRPCPlugin. It is used by go-plugin to create a grpc plugin client. func (*StorageGRPCPlugin) GRPCClient(ctx context.Context, broker *plugin.GRPCBroker, c *grpc.ClientConn) (interface{}, error) { return NewGRPCClient(c), nil diff --git a/plugin/storage/integration/grpc_test.go b/plugin/storage/integration/grpc_test.go index 2e16b6ec960d..186078f05a77 100644 --- a/plugin/storage/integration/grpc_test.go +++ b/plugin/storage/integration/grpc_test.go @@ -18,45 +18,103 @@ package integration import ( + "net" "os" + "sync" "testing" + "github.com/jaegertracing/jaeger/plugin/storage/grpc/shared" + "github.com/jaegertracing/jaeger/plugin/storage/memory" "github.com/stretchr/testify/require" "github.com/uber/jaeger-lib/metrics" "go.uber.org/zap" + googleGRPC "google.golang.org/grpc" + "github.com/jaegertracing/jaeger/pkg/config" "github.com/jaegertracing/jaeger/pkg/testutils" "github.com/jaegertracing/jaeger/plugin/storage/grpc" + "github.com/jaegertracing/jaeger/storage/dependencystore" + "github.com/jaegertracing/jaeger/storage/spanstore" ) const defaultPluginBinaryPath = "../../../examples/memstore-plugin/memstore-plugin" +type gRPCServer struct { + errChan chan error + server *googleGRPC.Server + wg sync.WaitGroup +} + +func newgRPCServer() (*gRPCServer, error) { + return &gRPCServer{errChan: make(chan error, 1)}, nil +} + +func (s *gRPCServer) Restart() error { + //stop the server if one already exists + if s.server != nil { + s.server.GracefulStop() + s.wg.Wait() + select { + case err := <-s.errChan: + return err + default: + } + } + + memStorePlugin := &memoryStorePlugin{ + store: memory.NewStore(), + archiveStore: memory.NewStore(), + } + + s.server = googleGRPC.NewServer() + queryPlugin := shared.StorageGRPCPlugin{ + Impl: memStorePlugin, + ArchiveImpl: memStorePlugin, + } + + err := queryPlugin.RegisterServer(s.server) + if err != nil { + return err + } + + listener, err := net.Listen("tcp", "localhost:2001") + if err != nil { + return err + } + s.wg.Add(1) + go func() { + defer s.wg.Done() + err = s.server.Serve(listener) + if err != nil { + select { + case s.errChan <- err: + default: + } + } + }() + return nil +} + type GRPCStorageIntegrationTestSuite struct { StorageIntegration - logger *zap.Logger - pluginBinaryPath string - pluginConfigPath string + logger *zap.Logger + flags []string + server *gRPCServer } func (s *GRPCStorageIntegrationTestSuite) initialize() error { s.logger, _ = testutils.NewLogger() + if s.server != nil { + if err := s.server.Restart(); err != nil { + return err + } + } + f := grpc.NewFactory() v, command := config.Viperize(f.AddFlags) - flags := []string{ - "--grpc-storage-plugin.binary", - s.pluginBinaryPath, - "--grpc-storage-plugin.log-level", - "debug", - } - if s.pluginConfigPath != "" { - flags = append(flags, - "--grpc-storage-plugin.configuration-file", - s.pluginConfigPath, - ) - } - err := command.ParseFlags(flags) + err := command.ParseFlags(s.flags) if err != nil { return err } @@ -87,6 +145,31 @@ func (s *GRPCStorageIntegrationTestSuite) cleanUp() error { return s.initialize() } +type memoryStorePlugin struct { + store *memory.Store + archiveStore *memory.Store +} + +func (ns *memoryStorePlugin) DependencyReader() dependencystore.Reader { + return ns.store +} + +func (ns *memoryStorePlugin) SpanReader() spanstore.Reader { + return ns.store +} + +func (ns *memoryStorePlugin) SpanWriter() spanstore.Writer { + return ns.store +} + +func (ns *memoryStorePlugin) ArchiveSpanReader() spanstore.Reader { + return ns.archiveStore +} + +func (ns *memoryStorePlugin) ArchiveSpanWriter() spanstore.Writer { + return ns.archiveStore +} + func TestGRPCStorage(t *testing.T) { binaryPath := os.Getenv("PLUGIN_BINARY_PATH") if binaryPath == "" { @@ -97,9 +180,35 @@ func TestGRPCStorage(t *testing.T) { if configPath == "" { t.Log("PLUGIN_CONFIG_PATH env var not set") } + + flags := []string{ + "--grpc-storage-plugin.binary", binaryPath, + "--grpc-storage-plugin.log-level", "debug", + } + if configPath != "" { + flags = append(flags, + "--grpc-storage-plugin.configuration-file", configPath, + ) + } + + s := &GRPCStorageIntegrationTestSuite{ + flags: flags, + } + require.NoError(t, s.initialize()) + s.IntegrationTestAll(t) +} + +func TestGRPCRemoteStorage(t *testing.T) { + flags := []string{ + "--grpc-storage.server=localhost:2001", + "--grpc-storage.tls.enabled=false", + } + server, err := newgRPCServer() + require.NoError(t, err) + s := &GRPCStorageIntegrationTestSuite{ - pluginBinaryPath: binaryPath, - pluginConfigPath: configPath, + flags: flags, + server: server, } require.NoError(t, s.initialize()) s.IntegrationTestAll(t)