Skip to content

Commit

Permalink
Add tests for remote gRPC
Browse files Browse the repository at this point in the history
We also created a RegisterServer method on StorageGRPCPlugin to
register be able to easily create remote GRPC servers.
  • Loading branch information
cevian committed Nov 12, 2021
1 parent 2dc5b4d commit 8172504
Show file tree
Hide file tree
Showing 3 changed files with 151 additions and 20 deletions.
17 changes: 17 additions & 0 deletions plugin/storage/grpc/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,3 +55,20 @@ func TestRemoteOptionsWithFlags(t *testing.T) {
assert.Equal(t, opts.Configuration.RemoteTLS.Enabled, true)
assert.Equal(t, opts.Configuration.RemoteConnectTimeout, 60*time.Second)
}

func TestRemoteOptionsNoTLSWithFlags(t *testing.T) {
opts := &Options{}
v, command := config.Viperize(opts.AddFlags)
err := command.ParseFlags([]string{
"--grpc-storage.server=localhost:2001",
"--grpc-storage.tls.enabled=false",
"--grpc-storage.connection-timeout=60s",
})
assert.NoError(t, err)
opts.InitFromViper(v)

assert.Equal(t, opts.Configuration.PluginBinary, "")
assert.Equal(t, opts.Configuration.RemoteServerAddr, "localhost:2001")
assert.Equal(t, opts.Configuration.RemoteTLS.Enabled, false)
assert.Equal(t, opts.Configuration.RemoteConnectTimeout, 60*time.Second)
}
9 changes: 7 additions & 2 deletions plugin/storage/grpc/shared/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ type StorageGRPCPlugin struct {
ArchiveImpl ArchiveStoragePlugin
}

// GRPCServer implements plugin.GRPCPlugin. It is used by go-plugin to create a grpc plugin server.
func (p *StorageGRPCPlugin) GRPCServer(broker *plugin.GRPCBroker, s *grpc.Server) error {
// RegisterServer registers the plugin with the server
func (p *StorageGRPCPlugin) RegisterServer(s *grpc.Server) error {
server := &grpcServer{
Impl: p.Impl,
ArchiveImpl: p.ArchiveImpl,
Expand All @@ -51,6 +51,11 @@ func (p *StorageGRPCPlugin) GRPCServer(broker *plugin.GRPCBroker, s *grpc.Server
return nil
}

// GRPCServer implements plugin.GRPCPlugin. It is used by go-plugin to create a grpc plugin server.
func (p *StorageGRPCPlugin) GRPCServer(broker *plugin.GRPCBroker, s *grpc.Server) error {
return p.RegisterServer(s)
}

// GRPCClient implements plugin.GRPCPlugin. It is used by go-plugin to create a grpc plugin client.
func (*StorageGRPCPlugin) GRPCClient(ctx context.Context, broker *plugin.GRPCBroker, c *grpc.ClientConn) (interface{}, error) {
return NewGRPCClient(c), nil
Expand Down
145 changes: 127 additions & 18 deletions plugin/storage/integration/grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,45 +18,103 @@
package integration

import (
"net"
"os"
"sync"
"testing"

"github.com/jaegertracing/jaeger/plugin/storage/grpc/shared"
"github.com/jaegertracing/jaeger/plugin/storage/memory"
"github.com/stretchr/testify/require"
"github.com/uber/jaeger-lib/metrics"
"go.uber.org/zap"

googleGRPC "google.golang.org/grpc"

"github.com/jaegertracing/jaeger/pkg/config"
"github.com/jaegertracing/jaeger/pkg/testutils"
"github.com/jaegertracing/jaeger/plugin/storage/grpc"
"github.com/jaegertracing/jaeger/storage/dependencystore"
"github.com/jaegertracing/jaeger/storage/spanstore"
)

const defaultPluginBinaryPath = "../../../examples/memstore-plugin/memstore-plugin"

type gRPCServer struct {
errChan chan error
server *googleGRPC.Server
wg sync.WaitGroup
}

func newgRPCServer() (*gRPCServer, error) {
return &gRPCServer{errChan: make(chan error, 1)}, nil
}

func (s *gRPCServer) Restart() error {
//stop the server if one already exists
if s.server != nil {
s.server.GracefulStop()
s.wg.Wait()
select {
case err := <-s.errChan:
return err
default:
}
}

memStorePlugin := &memoryStorePlugin{
store: memory.NewStore(),
archiveStore: memory.NewStore(),
}

s.server = googleGRPC.NewServer()
queryPlugin := shared.StorageGRPCPlugin{
Impl: memStorePlugin,
ArchiveImpl: memStorePlugin,
}

err := queryPlugin.RegisterServer(s.server)
if err != nil {
return err
}

listener, err := net.Listen("tcp", "localhost:2001")
if err != nil {
return err
}
s.wg.Add(1)
go func() {
defer s.wg.Done()
err = s.server.Serve(listener)
if err != nil {
select {
case s.errChan <- err:
default:
}
}
}()
return nil
}

type GRPCStorageIntegrationTestSuite struct {
StorageIntegration
logger *zap.Logger
pluginBinaryPath string
pluginConfigPath string
logger *zap.Logger
flags []string
server *gRPCServer
}

func (s *GRPCStorageIntegrationTestSuite) initialize() error {
s.logger, _ = testutils.NewLogger()

if s.server != nil {
if err := s.server.Restart(); err != nil {
return err
}
}

f := grpc.NewFactory()
v, command := config.Viperize(f.AddFlags)
flags := []string{
"--grpc-storage-plugin.binary",
s.pluginBinaryPath,
"--grpc-storage-plugin.log-level",
"debug",
}
if s.pluginConfigPath != "" {
flags = append(flags,
"--grpc-storage-plugin.configuration-file",
s.pluginConfigPath,
)
}
err := command.ParseFlags(flags)
err := command.ParseFlags(s.flags)
if err != nil {
return err
}
Expand Down Expand Up @@ -87,6 +145,31 @@ func (s *GRPCStorageIntegrationTestSuite) cleanUp() error {
return s.initialize()
}

type memoryStorePlugin struct {
store *memory.Store
archiveStore *memory.Store
}

func (ns *memoryStorePlugin) DependencyReader() dependencystore.Reader {
return ns.store
}

func (ns *memoryStorePlugin) SpanReader() spanstore.Reader {
return ns.store
}

func (ns *memoryStorePlugin) SpanWriter() spanstore.Writer {
return ns.store
}

func (ns *memoryStorePlugin) ArchiveSpanReader() spanstore.Reader {
return ns.archiveStore
}

func (ns *memoryStorePlugin) ArchiveSpanWriter() spanstore.Writer {
return ns.archiveStore
}

func TestGRPCStorage(t *testing.T) {
binaryPath := os.Getenv("PLUGIN_BINARY_PATH")
if binaryPath == "" {
Expand All @@ -97,9 +180,35 @@ func TestGRPCStorage(t *testing.T) {
if configPath == "" {
t.Log("PLUGIN_CONFIG_PATH env var not set")
}

flags := []string{
"--grpc-storage-plugin.binary", binaryPath,
"--grpc-storage-plugin.log-level", "debug",
}
if configPath != "" {
flags = append(flags,
"--grpc-storage-plugin.configuration-file", configPath,
)
}

s := &GRPCStorageIntegrationTestSuite{
flags: flags,
}
require.NoError(t, s.initialize())
s.IntegrationTestAll(t)
}

func TestGRPCRemoteStorage(t *testing.T) {
flags := []string{
"--grpc-storage.server=localhost:2001",
"--grpc-storage.tls.enabled=false",
}
server, err := newgRPCServer()
require.NoError(t, err)

s := &GRPCStorageIntegrationTestSuite{
pluginBinaryPath: binaryPath,
pluginConfigPath: configPath,
flags: flags,
server: server,
}
require.NoError(t, s.initialize())
s.IntegrationTestAll(t)
Expand Down

0 comments on commit 8172504

Please sign in to comment.