Skip to content

Commit

Permalink
fix(inputs.mongodb): add an option to bypass connection errors on sta…
Browse files Browse the repository at this point in the history
…rt (#11629)
  • Loading branch information
papey authored Sep 7, 2022
1 parent f238df2 commit e46f90e
Show file tree
Hide file tree
Showing 5 changed files with 119 additions and 18 deletions.
6 changes: 6 additions & 0 deletions plugins/inputs/mongodb/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,12 @@ All MongoDB server versions from 2.6 and higher are supported.
# tls_key = "/etc/telegraf/key.pem"
## Use TLS but skip chain & host verification
# insecure_skip_verify = false

## Specifies plugin behavior regarding disconnected servers
## Available choices :
## - error: telegraf will return an error on startup if one the servers is unreachable
## - skip: telegraf will skip unreachable servers on both startup and gather
# disconnected_servers_behavior = "error"
```

### Permissions
Expand Down
64 changes: 46 additions & 18 deletions plugins/inputs/mongodb/mongodb.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"go.mongodb.org/mongo-driver/mongo/readpref"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal/choice"
tlsint "github.com/influxdata/telegraf/plugins/common/tls"
"github.com/influxdata/telegraf/plugins/inputs"
)
Expand All @@ -25,19 +26,23 @@ import (
//go:embed sample.conf
var sampleConfig string

var DisconnectedServersBehaviors = []string{"error", "skip"}

type MongoDB struct {
Servers []string
Ssl Ssl
GatherClusterStatus bool
GatherPerdbStats bool
GatherColStats bool
GatherTopStat bool
ColStatsDbs []string
Servers []string
Ssl Ssl
GatherClusterStatus bool
GatherPerdbStats bool
GatherColStats bool
GatherTopStat bool
DisconnectedServersBehavior string
ColStatsDbs []string
tlsint.ClientConfig

Log telegraf.Logger `toml:"-"`

clients []*Server
clients []*Server
tlsConfig *tls.Config
}

type Ssl struct {
Expand All @@ -50,10 +55,17 @@ func (*MongoDB) SampleConfig() string {
}

func (m *MongoDB) Init() error {
var tlsConfig *tls.Config
if m.DisconnectedServersBehavior == "" {
m.DisconnectedServersBehavior = "error"
}

if err := choice.Check(m.DisconnectedServersBehavior, DisconnectedServersBehaviors); err != nil {
return fmt.Errorf("disconnected_servers_behavior: %w", err)
}

if m.Ssl.Enabled {
// Deprecated TLS config
tlsConfig = &tls.Config{
m.tlsConfig = &tls.Config{
InsecureSkipVerify: m.ClientConfig.InsecureSkipVerify,
}
if len(m.Ssl.CaCerts) == 0 {
Expand All @@ -66,10 +78,10 @@ func (m *MongoDB) Init() error {
return fmt.Errorf("failed to parse root certificate")
}
}
tlsConfig.RootCAs = roots
m.tlsConfig.RootCAs = roots
} else {
var err error
tlsConfig, err = m.ClientConfig.TLSConfig()
m.tlsConfig, err = m.ClientConfig.TLSConfig()
if err != nil {
return err
}
Expand All @@ -79,6 +91,11 @@ func (m *MongoDB) Init() error {
m.Servers = []string{"mongodb://127.0.0.1:27017"}
}

return nil
}

// Start runs after init and setup mongodb connections
func (m *MongoDB) Start() error {
for _, connURL := range m.Servers {
if !strings.HasPrefix(connURL, "mongodb://") && !strings.HasPrefix(connURL, "mongodb+srv://") {
// Preserve backwards compatibility for hostnames without a
Expand All @@ -89,28 +106,32 @@ func (m *MongoDB) Init() error {

u, err := url.Parse(connURL)
if err != nil {
return fmt.Errorf("unable to parse connection URL: %q", err)
return fmt.Errorf("unable to parse connection URL: %w", err)
}

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel() //nolint:revive

opts := options.Client().ApplyURI(connURL)
if tlsConfig != nil {
opts.TLSConfig = tlsConfig
if m.tlsConfig != nil {
opts.TLSConfig = m.tlsConfig
}
if opts.ReadPreference == nil {
opts.ReadPreference = readpref.Nearest()
}

client, err := mongo.Connect(ctx, opts)
if err != nil {
return fmt.Errorf("unable to connect to MongoDB: %q", err)
return fmt.Errorf("unable to connect to MongoDB: %w", err)
}

err = client.Ping(ctx, opts.ReadPreference)
if err != nil {
return fmt.Errorf("unable to connect to MongoDB: %s", err)
if m.DisconnectedServersBehavior == "error" {
return fmt.Errorf("unable to ping MongoDB: %w", err)
}

m.Log.Errorf("unable to ping MongoDB: %w", err)
}

server := &Server{
Expand All @@ -132,9 +153,16 @@ func (m *MongoDB) Gather(acc telegraf.Accumulator) error {
wg.Add(1)
go func(srv *Server) {
defer wg.Done()
if m.DisconnectedServersBehavior == "skip" {
if err := srv.ping(); err != nil {
m.Log.Debugf("failed to ping server: %w", err)
return
}
}

err := srv.gatherData(acc, m.GatherClusterStatus, m.GatherPerdbStats, m.GatherColStats, m.GatherTopStat, m.ColStatsDbs)
if err != nil {
m.Log.Errorf("failed to gather data: %q", err)
m.Log.Errorf("failed to gather data: %w", err)
}
}(client)
}
Expand Down
7 changes: 7 additions & 0 deletions plugins/inputs/mongodb/mongodb_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,13 @@ func IsAuthorization(err error) bool {
return strings.Contains(err.Error(), "not authorized")
}

func (s *Server) ping() error {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()

return s.client.Ping(ctx, nil)
}

func (s *Server) authLog(err error) {
if IsAuthorization(err) {
s.Log.Debug(err.Error())
Expand Down
54 changes: 54 additions & 0 deletions plugins/inputs/mongodb/mongodb_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
)

var ServicePort = "27017"
var unreachableMongoEndpoint = "mongodb://user:[email protected]:27017/nop"

func createTestServer(t *testing.T) *testutil.Container {
container := testutil.Container{
Expand Down Expand Up @@ -46,6 +47,8 @@ func TestGetDefaultTagsIntegration(t *testing.T) {
}
err := m.Init()
require.NoError(t, err)
err = m.Start()
require.NoError(t, err)

server := m.clients[0]

Expand Down Expand Up @@ -81,6 +84,8 @@ func TestAddDefaultStatsIntegration(t *testing.T) {
}
err := m.Init()
require.NoError(t, err)
err = m.Start()
require.NoError(t, err)

server := m.clients[0]

Expand All @@ -97,6 +102,55 @@ func TestAddDefaultStatsIntegration(t *testing.T) {
}
}

func TestSkipBehaviorIntegration(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}

m := &MongoDB{
Log: &testutil.CaptureLogger{},
Servers: []string{unreachableMongoEndpoint},
}

m.DisconnectedServersBehavior = "skip"
err := m.Init()
require.NoError(t, err)
err = m.Start()
require.NoError(t, err)

var acc testutil.Accumulator
err = m.Gather(&acc)
require.NoError(t, err)
require.NotContains(t, m.Log.(*testutil.CaptureLogger).LastError, "failed to gather data: ")
}

func TestErrorBehaviorIntegration(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}

m := &MongoDB{
Log: &testutil.CaptureLogger{},
Servers: []string{unreachableMongoEndpoint},
}

err := m.Init()
require.NoError(t, err)
err = m.Start()
require.Error(t, err)

// set to skip to bypass start error
m.DisconnectedServersBehavior = "skip"
err = m.Start()
require.NoError(t, err)
m.DisconnectedServersBehavior = "error"

var acc testutil.Accumulator
err = m.Gather(&acc)
require.NoError(t, err)
require.Contains(t, m.Log.(*testutil.CaptureLogger).LastError, "failed to gather data: ")
}

func TestPoolStatsVersionCompatibility(t *testing.T) {
tests := []struct {
name string
Expand Down
6 changes: 6 additions & 0 deletions plugins/inputs/mongodb/sample.conf
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,9 @@
# tls_key = "/etc/telegraf/key.pem"
## Use TLS but skip chain & host verification
# insecure_skip_verify = false

## Specifies plugin behavior regarding disconnected servers
## Available choices :
## - error: telegraf will return an error on startup if one the servers is unreachable
## - skip: telegraf will skip unreachable servers on both startup and gather
# disconnected_servers_behavior = "error"

0 comments on commit e46f90e

Please sign in to comment.