From c86a3de3f6678f682efea0f9efa68d0d46d5286f Mon Sep 17 00:00:00 2001 From: Matvey Arye Date: Wed, 10 Nov 2021 08:46:41 -0500 Subject: [PATCH] Add remote gRPC option for storage plugin Adds the option to host a gRPC storage API on a remote endpoint using regular gRPC. Previously the plugin system only supported local socket connections through the go-hashicorp plugin system. Signed-off-by: Matvey Arye --- plugin/storage/grpc/config/config.go | 59 +++++++++++++++++++++-- plugin/storage/grpc/options.go | 27 +++++++++-- plugin/storage/grpc/options_test.go | 22 +++++++++ plugin/storage/grpc/shared/grpc_client.go | 12 +++++ plugin/storage/grpc/shared/plugin.go | 9 +--- 5 files changed, 114 insertions(+), 15 deletions(-) diff --git a/plugin/storage/grpc/config/config.go b/plugin/storage/grpc/config/config.go index 72a04a422ee..9424dfcd455 100644 --- a/plugin/storage/grpc/config/config.go +++ b/plugin/storage/grpc/config/config.go @@ -15,24 +15,32 @@ package config import ( + "context" "fmt" "os/exec" "runtime" + "time" "github.com/grpc-ecosystem/grpc-opentracing/go/otgrpc" "github.com/hashicorp/go-hclog" "github.com/hashicorp/go-plugin" "github.com/opentracing/opentracing-go" "google.golang.org/grpc" + "google.golang.org/grpc/credentials" "github.com/jaegertracing/jaeger/plugin/storage/grpc/shared" ) // Configuration describes the options to customize the storage behavior. type Configuration struct { - PluginBinary string `yaml:"binary" mapstructure:"binary"` - PluginConfigurationFile string `yaml:"configuration-file" mapstructure:"configuration_file"` - PluginLogLevel string `yaml:"log-level" mapstructure:"log_level"` + PluginBinary string `yaml:"binary" mapstructure:"binary"` + PluginConfigurationFile string `yaml:"configuration-file" mapstructure:"configuration_file"` + PluginLogLevel string `yaml:"log-level" mapstructure:"log_level"` + RemoteServerAddr string `yaml:"server" mapstructure:"server"` + RemoteTLS bool `yaml:"tls" mapstructure:"tls"` + RemoteCAFile string `yaml:"cafile" mapstructure:"cafile"` + RemoteServerHostOverride string `yaml:"server-host-override" mapstructure:"server-host-override"` + RemoteConnectTimeout time.Duration `yaml:"connection-timeout" mapstructure:"connection-timeout"` } // ClientPluginServices defines services plugin can expose and its capabilities @@ -48,6 +56,51 @@ type PluginBuilder interface { // Build instantiates a PluginServices func (c *Configuration) Build() (*ClientPluginServices, error) { + if c.PluginBinary != "" { + return c.buildPlugin() + } else { + return c.BuildRemote() + } +} + +func (c *Configuration) BuildRemote() (*ClientPluginServices, error) { + opts := []grpc.DialOption{ + grpc.WithUnaryInterceptor(otgrpc.OpenTracingClientInterceptor(opentracing.GlobalTracer())), + grpc.WithStreamInterceptor(otgrpc.OpenTracingStreamClientInterceptor(opentracing.GlobalTracer())), + grpc.WithBlock(), + } + var err error + if c.RemoteTLS { + if c.RemoteCAFile == "" { + return nil, fmt.Errorf("ca file is required with TLS") + } + creds, err := credentials.NewClientTLSFromFile(c.RemoteCAFile, c.RemoteServerHostOverride) + if err != nil { + return nil, fmt.Errorf("failed to create TLS credentials %w", err) + } + opts = append(opts, grpc.WithTransportCredentials(creds)) + } else { + opts = append(opts, grpc.WithInsecure()) + } + + ctx, cancel := context.WithTimeout(context.Background(), c.RemoteConnectTimeout) + defer cancel() + conn, err := grpc.DialContext(ctx, c.RemoteServerAddr, opts...) + if err != nil { + return nil, fmt.Errorf("error connecting to Promscale GRPC server: %w", err) + } + + grpcClient := shared.NewGRPCClient(conn) + return &ClientPluginServices{ + PluginServices: shared.PluginServices{ + Store: grpcClient, + ArchiveStore: grpcClient, + }, + Capabilities: grpcClient, + }, nil +} + +func (c *Configuration) buildPlugin() (*ClientPluginServices, error) { // #nosec G204 cmd := exec.Command(c.PluginBinary, "--config", c.PluginConfigurationFile) diff --git a/plugin/storage/grpc/options.go b/plugin/storage/grpc/options.go index 7938a9dde9c..47803cb6963 100644 --- a/plugin/storage/grpc/options.go +++ b/plugin/storage/grpc/options.go @@ -16,6 +16,7 @@ package grpc import ( "flag" + "time" "github.com/spf13/viper" @@ -23,10 +24,16 @@ import ( ) const ( - pluginBinary = "grpc-storage-plugin.binary" - pluginConfigurationFile = "grpc-storage-plugin.configuration-file" - pluginLogLevel = "grpc-storage-plugin.log-level" - defaultPluginLogLevel = "warn" + pluginBinary = "grpc-storage-plugin.binary" + pluginConfigurationFile = "grpc-storage-plugin.configuration-file" + pluginLogLevel = "grpc-storage-plugin.log-level" + pluginServer = "grpc-storage-plugin.server" + pluginTLS = "grpc-storage-plugin.tls" + pluginCAFile = "grpc-storage-plugin.ca-file" + pluginServerHostOverride = "grpc-storage-plugin.server-host-override" + pluginConnectionTimeout = "grpc-storage-plugin.connection-timeout" + defaultPluginLogLevel = "warn" + defaultConnectionTimeout = time.Duration(5 * time.Second) ) // Options contains GRPC plugins configs and provides the ability @@ -40,6 +47,12 @@ func (opt *Options) AddFlags(flagSet *flag.FlagSet) { flagSet.String(pluginBinary, "", "The location of the plugin binary") flagSet.String(pluginConfigurationFile, "", "A path pointing to the plugin's configuration file, made available to the plugin with the --config arg") flagSet.String(pluginLogLevel, defaultPluginLogLevel, "Set the log level of the plugin's logger") + flagSet.String(pluginServer, "", "The server address for the remote gRPC server") + flagSet.Bool(pluginTLS, false, "Whether to use TLS for the remote connection") + flagSet.String(pluginCAFile, "", "The CA file for the remote connection") + flagSet.String(pluginServerHostOverride, "", "The server host override for the remote connection") + flagSet.Duration(pluginConnectionTimeout, defaultConnectionTimeout, "The connection timeout for connecting to the remote server") + } // InitFromViper initializes Options with properties from viper @@ -47,4 +60,10 @@ func (opt *Options) InitFromViper(v *viper.Viper) { opt.Configuration.PluginBinary = v.GetString(pluginBinary) opt.Configuration.PluginConfigurationFile = v.GetString(pluginConfigurationFile) opt.Configuration.PluginLogLevel = v.GetString(pluginLogLevel) + opt.Configuration.RemoteServerAddr = v.GetString(pluginServer) + opt.Configuration.RemoteTLS = v.GetBool(pluginTLS) + opt.Configuration.RemoteCAFile = v.GetString(pluginCAFile) + opt.Configuration.RemoteServerHostOverride = v.GetString(pluginServerHostOverride) + opt.Configuration.RemoteConnectTimeout = v.GetDuration(pluginConnectionTimeout) + } diff --git a/plugin/storage/grpc/options_test.go b/plugin/storage/grpc/options_test.go index 5e563c5c15c..bc2046dd362 100644 --- a/plugin/storage/grpc/options_test.go +++ b/plugin/storage/grpc/options_test.go @@ -16,6 +16,7 @@ package grpc import ( "testing" + "time" "github.com/stretchr/testify/assert" @@ -37,3 +38,24 @@ func TestOptionsWithFlags(t *testing.T) { assert.Equal(t, opts.Configuration.PluginConfigurationFile, "config.json") assert.Equal(t, opts.Configuration.PluginLogLevel, "debug") } + +func TestRemoteOptionsWithFlags(t *testing.T) { + opts := &Options{} + v, command := config.Viperize(opts.AddFlags) + err := command.ParseFlags([]string{ + "--grpc-storage-plugin.server=localhost:2001", + "--grpc-storage-plugin.tls=true", + "--grpc-storage-plugin.ca-file=cafile", + "--grpc-storage-plugin.server-host-override=example.com", + "--grpc-storage-plugin.connection-timeout=60s", + }) + assert.NoError(t, err) + opts.InitFromViper(v) + + assert.Equal(t, opts.Configuration.PluginBinary, "") + assert.Equal(t, opts.Configuration.RemoteServerAddr, "localhost:2001") + assert.Equal(t, opts.Configuration.RemoteTLS, true) + assert.Equal(t, opts.Configuration.RemoteCAFile, "cafile") + assert.Equal(t, opts.Configuration.RemoteServerHostOverride, "example.com") + assert.Equal(t, opts.Configuration.RemoteConnectTimeout, 60*time.Second) +} diff --git a/plugin/storage/grpc/shared/grpc_client.go b/plugin/storage/grpc/shared/grpc_client.go index 33d166de717..2e1af1a246c 100644 --- a/plugin/storage/grpc/shared/grpc_client.go +++ b/plugin/storage/grpc/shared/grpc_client.go @@ -20,6 +20,7 @@ import ( "io" "time" + "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" @@ -53,6 +54,17 @@ type grpcClient struct { depsReaderClient storage_v1.DependenciesReaderPluginClient } +func NewGRPCClient(c *grpc.ClientConn) *grpcClient { + return &grpcClient{ + readerClient: storage_v1.NewSpanReaderPluginClient(c), + writerClient: storage_v1.NewSpanWriterPluginClient(c), + archiveReaderClient: storage_v1.NewArchiveSpanReaderPluginClient(c), + archiveWriterClient: storage_v1.NewArchiveSpanWriterPluginClient(c), + capabilitiesClient: storage_v1.NewPluginCapabilitiesClient(c), + depsReaderClient: storage_v1.NewDependenciesReaderPluginClient(c), + } +} + // ContextUpgradeFunc is a functional type that can be composed to upgrade context type ContextUpgradeFunc func(ctx context.Context) context.Context diff --git a/plugin/storage/grpc/shared/plugin.go b/plugin/storage/grpc/shared/plugin.go index 6ac3c2f8c6e..2027c3fe380 100644 --- a/plugin/storage/grpc/shared/plugin.go +++ b/plugin/storage/grpc/shared/plugin.go @@ -53,12 +53,5 @@ func (p *StorageGRPCPlugin) GRPCServer(broker *plugin.GRPCBroker, s *grpc.Server // GRPCClient implements plugin.GRPCPlugin. It is used by go-plugin to create a grpc plugin client. func (*StorageGRPCPlugin) GRPCClient(ctx context.Context, broker *plugin.GRPCBroker, c *grpc.ClientConn) (interface{}, error) { - return &grpcClient{ - readerClient: storage_v1.NewSpanReaderPluginClient(c), - writerClient: storage_v1.NewSpanWriterPluginClient(c), - archiveReaderClient: storage_v1.NewArchiveSpanReaderPluginClient(c), - archiveWriterClient: storage_v1.NewArchiveSpanWriterPluginClient(c), - capabilitiesClient: storage_v1.NewPluginCapabilitiesClient(c), - depsReaderClient: storage_v1.NewDependenciesReaderPluginClient(c), - }, nil + return NewGRPCClient(c), nil }