Skip to content

Commit

Permalink
fix: exit on grpc plugin crash (#3604)
Browse files Browse the repository at this point in the history
Signed-off-by: Johannes Würbach <[email protected]>

Co-authored-by: Yuri Shkuro <[email protected]>
  • Loading branch information
johanneswuerbach and yurishkuro authored Mar 29, 2022
1 parent f02f79f commit 07ca602
Showing 1 changed file with 38 additions and 2 deletions.
40 changes: 38 additions & 2 deletions plugin/storage/grpc/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ import (
"github.com/jaegertracing/jaeger/plugin/storage/grpc/shared"
)

// pluginHealthCheckInterval is the period of the ticker that drives the
// periodic plugin ping started by startPluginHealthCheck.
var pluginHealthCheckInterval = time.Minute

// Configuration describes the options to customize the storage behavior.
type Configuration struct {
PluginBinary string `yaml:"binary" mapstructure:"binary"`
Expand All @@ -42,6 +44,10 @@ type Configuration struct {
RemoteServerAddr string `yaml:"server" mapstructure:"server"`
RemoteTLS tlscfg.Options
RemoteConnectTimeout time.Duration `yaml:"connection-timeout" mapstructure:"connection-timeout"`

pluginHealthCheck *time.Ticker
pluginHealthCheckDone chan bool
pluginRPCClient plugin.ClientProtocol
}

// ClientPluginServices defines services plugin can expose and its capabilities
Expand All @@ -59,13 +65,18 @@ type PluginBuilder interface {
// Build instantiates a PluginServices. When PluginBinary is non-empty the
// services are built by buildPlugin, which runs that binary; otherwise they
// are built by buildRemote. The logger is passed through to whichever builder
// is used.
func (c *Configuration) Build(logger *zap.Logger) (*ClientPluginServices, error) {
	if c.PluginBinary != "" {
		return c.buildPlugin(logger)
	}
	return c.buildRemote(logger)
}

// Close stops the plugin health check, if one was started, and then closes
// the remote TLS options, returning the error from that close.
func (c *Configuration) Close() error {
	if ticker := c.pluginHealthCheck; ticker != nil {
		ticker.Stop()
		// The done channel is unbuffered, so this send blocks until the
		// health-check goroutine receives it.
		c.pluginHealthCheckDone <- true
	}
	return c.RemoteTLS.Close()
}

Expand Down Expand Up @@ -103,7 +114,7 @@ func (c *Configuration) buildRemote(logger *zap.Logger) (*ClientPluginServices,
}, nil
}

func (c *Configuration) buildPlugin() (*ClientPluginServices, error) {
func (c *Configuration) buildPlugin(logger *zap.Logger) (*ClientPluginServices, error) {
// #nosec G204
cmd := exec.Command(c.PluginBinary, "--config", c.PluginConfigurationFile)

Expand Down Expand Up @@ -154,6 +165,10 @@ func (c *Configuration) buildPlugin() (*ClientPluginServices, error) {
raw, shared.StoragePluginIdentifier)
}

if err := c.startPluginHealthCheck(rpcClient, logger); err != nil {
return nil, fmt.Errorf("initial plugin health check failed: %w", err)
}

return &ClientPluginServices{
PluginServices: shared.PluginServices{
Store: storagePlugin,
Expand All @@ -162,3 +177,24 @@ func (c *Configuration) buildPlugin() (*ClientPluginServices, error) {
Capabilities: capabilities,
}, nil
}

// startPluginHealthCheck pings the plugin once and, if that succeeds, starts a
// background goroutine that pings it again every pluginHealthCheckInterval.
// A failed periodic ping is reported through logger.Fatal. The goroutine
// returns when a value is received on c.pluginHealthCheckDone, which Close
// sends after stopping the ticker.
//
// The returned error is the result of the initial ping. When it is non-nil no
// ticker or goroutine has been started, so a failed build leaves nothing
// running that could later call logger.Fatal.
func (c *Configuration) startPluginHealthCheck(rpcClient plugin.ClientProtocol, logger *zap.Logger) error {
	c.pluginRPCClient = rpcClient

	// Ping before allocating anything so the error path has nothing to clean up.
	if err := rpcClient.Ping(); err != nil {
		return err
	}

	done := make(chan bool)
	ticker := time.NewTicker(pluginHealthCheckInterval)
	c.pluginHealthCheckDone = done
	c.pluginHealthCheck = ticker

	// The goroutine uses the local ticker, done channel and client rather than
	// re-reading the Configuration fields on every iteration.
	go func() {
		for {
			select {
			case <-done:
				return
			case <-ticker.C:
				if err := rpcClient.Ping(); err != nil {
					logger.Fatal("plugin health check failed", zap.Error(err))
				}
			}
		}
	}()

	return nil
}

0 comments on commit 07ca602

Please sign in to comment.