Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(inputs.mongodb): add an option to bypass connection errors on start #11629

Merged
merged 27 commits into from
Sep 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
b623edb
fix(inputs/mongodb): add an option to bypass connection errors on init
papey Aug 6, 2022
f54547f
Move connection logic in Start func
papey Aug 8, 2022
3d45ac5
Prefer %w when formatting errors
papey Aug 10, 2022
534a95f
Change log level from info to error
papey Aug 10, 2022
9981b91
Chore: replace init with start in docs
papey Aug 10, 2022
7d94083
Fix: add start to test
papey Aug 10, 2022
b3830f8
Add test that assert for error when option is not set
papey Aug 10, 2022
664d451
Group together internal vars
papey Aug 10, 2022
f1f0206
Migrate tests to testutil.Container
papey Aug 10, 2022
35f16ca
Fix: ensure start on integration tests
papey Aug 10, 2022
4998099
Remove useless containers in tests
papey Aug 13, 2022
e7b050c
Add ping wrapper to mongo db server struct
papey Aug 13, 2022
dc1ed4e
Add ignore unreachable hosts to Gather() func
papey Aug 13, 2022
601f484
Chore: remove ugly line return
papey Aug 13, 2022
97e94d5
Fix: add missing err check
papey Aug 13, 2022
d0ae2fd
Fix: typo
papey Aug 22, 2022
2194d53
Chore: remove empty comment lines
papey Aug 22, 2022
c8b23d2
Reduce timeout in ping function
papey Aug 23, 2022
b0071c0
Change unreachable servers config type and name to handle various cases
papey Aug 23, 2022
b087bdc
Chore: s/connect/ping
papey Aug 23, 2022
0e63599
Chore: s/default/error
papey Aug 23, 2022
6ba7824
Chore: better error on behavior check
papey Aug 23, 2022
35335eb
Refacto: prefer single liner
papey Aug 30, 2022
29f46b1
Replace retry with skip for clarity
papey Aug 30, 2022
6c490d8
Fix: docs
papey Aug 30, 2022
22b0dd5
Add debug log message in case of ping error
papey Aug 31, 2022
3f2bc9f
Chore: harmonize error logs
papey Sep 1, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions plugins/inputs/mongodb/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,12 @@ All MongoDB server versions from 2.6 and higher are supported.
# tls_key = "/etc/telegraf/key.pem"
## Use TLS but skip chain & host verification
# insecure_skip_verify = false

## Specifies plugin behavior regarding disconnected servers
## Available choices:
## - error: telegraf will return an error on startup if one of the servers is unreachable
## - skip: telegraf will skip unreachable servers on both startup and gather
# disconnected_servers_behavior = "error"
```

### Permissions
Expand Down
64 changes: 46 additions & 18 deletions plugins/inputs/mongodb/mongodb.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"go.mongodb.org/mongo-driver/mongo/readpref"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal/choice"
tlsint "github.com/influxdata/telegraf/plugins/common/tls"
"github.com/influxdata/telegraf/plugins/inputs"
)
Expand All @@ -25,19 +26,23 @@ import (
//go:embed sample.conf
var sampleConfig string

var DisconnectedServersBehaviors = []string{"error", "skip"}

type MongoDB struct {
Servers []string
Ssl Ssl
GatherClusterStatus bool
GatherPerdbStats bool
GatherColStats bool
GatherTopStat bool
ColStatsDbs []string
Servers []string
Ssl Ssl
GatherClusterStatus bool
GatherPerdbStats bool
GatherColStats bool
GatherTopStat bool
DisconnectedServersBehavior string
ColStatsDbs []string
tlsint.ClientConfig

Log telegraf.Logger `toml:"-"`

clients []*Server
clients []*Server
tlsConfig *tls.Config
}

type Ssl struct {
Expand All @@ -50,10 +55,17 @@ func (*MongoDB) SampleConfig() string {
}

func (m *MongoDB) Init() error {
var tlsConfig *tls.Config
if m.DisconnectedServersBehavior == "" {
m.DisconnectedServersBehavior = "error"
}

if err := choice.Check(m.DisconnectedServersBehavior, DisconnectedServersBehaviors); err != nil {
return fmt.Errorf("disconnected_servers_behavior: %w", err)
}

if m.Ssl.Enabled {
// Deprecated TLS config
tlsConfig = &tls.Config{
m.tlsConfig = &tls.Config{
InsecureSkipVerify: m.ClientConfig.InsecureSkipVerify,
}
if len(m.Ssl.CaCerts) == 0 {
Expand All @@ -66,10 +78,10 @@ func (m *MongoDB) Init() error {
return fmt.Errorf("failed to parse root certificate")
}
}
tlsConfig.RootCAs = roots
m.tlsConfig.RootCAs = roots
} else {
var err error
tlsConfig, err = m.ClientConfig.TLSConfig()
m.tlsConfig, err = m.ClientConfig.TLSConfig()
if err != nil {
return err
}
Expand All @@ -79,6 +91,11 @@ func (m *MongoDB) Init() error {
m.Servers = []string{"mongodb://127.0.0.1:27017"}
}

return nil
}

// Start runs after Init and sets up the MongoDB connections
func (m *MongoDB) Start() error {
for _, connURL := range m.Servers {
if !strings.HasPrefix(connURL, "mongodb://") && !strings.HasPrefix(connURL, "mongodb+srv://") {
// Preserve backwards compatibility for hostnames without a
Expand All @@ -89,28 +106,32 @@ func (m *MongoDB) Init() error {

u, err := url.Parse(connURL)
if err != nil {
return fmt.Errorf("unable to parse connection URL: %q", err)
return fmt.Errorf("unable to parse connection URL: %w", err)
}

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel() //nolint:revive

opts := options.Client().ApplyURI(connURL)
if tlsConfig != nil {
opts.TLSConfig = tlsConfig
if m.tlsConfig != nil {
opts.TLSConfig = m.tlsConfig
}
if opts.ReadPreference == nil {
opts.ReadPreference = readpref.Nearest()
}

client, err := mongo.Connect(ctx, opts)
if err != nil {
return fmt.Errorf("unable to connect to MongoDB: %q", err)
return fmt.Errorf("unable to connect to MongoDB: %w", err)
}

err = client.Ping(ctx, opts.ReadPreference)
if err != nil {
return fmt.Errorf("unable to connect to MongoDB: %s", err)
if m.DisconnectedServersBehavior == "error" {
return fmt.Errorf("unable to ping MongoDB: %w", err)
}

m.Log.Errorf("unable to ping MongoDB: %w", err)
}

server := &Server{
Expand All @@ -132,9 +153,16 @@ func (m *MongoDB) Gather(acc telegraf.Accumulator) error {
wg.Add(1)
go func(srv *Server) {
defer wg.Done()
if m.DisconnectedServersBehavior == "skip" {
if err := srv.ping(); err != nil {
Hipska marked this conversation as resolved.
Show resolved Hide resolved
m.Log.Debugf("failed to ping server: %w", err)
return
}
}

err := srv.gatherData(acc, m.GatherClusterStatus, m.GatherPerdbStats, m.GatherColStats, m.GatherTopStat, m.ColStatsDbs)
if err != nil {
m.Log.Errorf("failed to gather data: %q", err)
m.Log.Errorf("failed to gather data: %w", err)
}
}(client)
}
Expand Down
7 changes: 7 additions & 0 deletions plugins/inputs/mongodb/mongodb_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,13 @@ func IsAuthorization(err error) bool {
return strings.Contains(err.Error(), "not authorized")
}

// ping issues a Ping against the server's client using a nil read preference
// and a one-second deadline. It returns whatever error Ping reports, or nil
// when the ping succeeds in time.
func (s *Server) ping() error {
	pingCtx, release := context.WithTimeout(context.Background(), time.Second)
	// Always release the timeout context's resources, even on early success.
	defer release()

	return s.client.Ping(pingCtx, nil)
}

func (s *Server) authLog(err error) {
if IsAuthorization(err) {
s.Log.Debug(err.Error())
Expand Down
54 changes: 54 additions & 0 deletions plugins/inputs/mongodb/mongodb_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
)

var ServicePort = "27017"
var unreachableMongoEndpoint = "mongodb://user:[email protected]:27017/nop"

func createTestServer(t *testing.T) *testutil.Container {
container := testutil.Container{
Expand Down Expand Up @@ -46,6 +47,8 @@ func TestGetDefaultTagsIntegration(t *testing.T) {
}
err := m.Init()
require.NoError(t, err)
err = m.Start()
require.NoError(t, err)

server := m.clients[0]

Expand Down Expand Up @@ -81,6 +84,8 @@ func TestAddDefaultStatsIntegration(t *testing.T) {
}
err := m.Init()
require.NoError(t, err)
err = m.Start()
require.NoError(t, err)

server := m.clients[0]

Expand All @@ -97,6 +102,55 @@ func TestAddDefaultStatsIntegration(t *testing.T) {
}
}

func TestSkipBehaviorIntegration(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}

m := &MongoDB{
Log: &testutil.CaptureLogger{},
Servers: []string{unreachableMongoEndpoint},
}

m.DisconnectedServersBehavior = "skip"
err := m.Init()
require.NoError(t, err)
err = m.Start()
require.NoError(t, err)

var acc testutil.Accumulator
err = m.Gather(&acc)
require.NoError(t, err)
require.NotContains(t, m.Log.(*testutil.CaptureLogger).LastError, "failed to gather data: ")
}

func TestErrorBehaviorIntegration(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}

m := &MongoDB{
Log: &testutil.CaptureLogger{},
Servers: []string{unreachableMongoEndpoint},
}

err := m.Init()
require.NoError(t, err)
err = m.Start()
require.Error(t, err)

// set to skip to bypass start error
m.DisconnectedServersBehavior = "skip"
err = m.Start()
require.NoError(t, err)
m.DisconnectedServersBehavior = "error"

var acc testutil.Accumulator
err = m.Gather(&acc)
require.NoError(t, err)
require.Contains(t, m.Log.(*testutil.CaptureLogger).LastError, "failed to gather data: ")
}

func TestPoolStatsVersionCompatibility(t *testing.T) {
tests := []struct {
name string
Expand Down
6 changes: 6 additions & 0 deletions plugins/inputs/mongodb/sample.conf
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,9 @@
# tls_key = "/etc/telegraf/key.pem"
## Use TLS but skip chain & host verification
# insecure_skip_verify = false

## Specifies plugin behavior regarding disconnected servers
## Available choices:
## - error: telegraf will return an error on startup if one of the servers is unreachable
## - skip: telegraf will skip unreachable servers on both startup and gather
# disconnected_servers_behavior = "error"