Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(inputs.sql): Add 'disconnected_servers_behavior' field in the configuration #13289

Merged
merged 4 commits into from
May 22, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions plugins/inputs/sql/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,12 @@ to use them.
# connection_max_open = 0
# connection_max_idle = auto

## Specifies plugin behavior regarding disconnected servers
## Available choices :
## - error: telegraf will return an error on startup if one the servers is unreachable
## - ignore: telegraf will ignore unreachable servers on both startup and gather
# disconnected_servers_behavior = "error"

[[inputs.sql.query]]
## Query to perform on the server
query="SELECT user,state,latency,score FROM Scoreboard WHERE application > 0"
Expand Down
6 changes: 6 additions & 0 deletions plugins/inputs/sql/sample.conf
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,12 @@
# connection_max_open = 0
# connection_max_idle = auto

## Specifies plugin behavior regarding disconnected servers
## Available choices :
## - error: telegraf will return an error on startup if one the servers is unreachable
## - ignore: telegraf will ignore unreachable servers on both startup and gather
# disconnected_servers_behavior = "error"

[[inputs.sql.query]]
## Query to perform on the server
query="SELECT user,state,latency,score FROM Scoreboard WHERE application > 0"
Expand Down
88 changes: 71 additions & 17 deletions plugins/inputs/sql/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ var sampleConfig string

const magicIdleCount = -int(^uint(0) >> 1)

var disconnectedServersBehavior = []string{"error", "ignore"}

type Query struct {
Query string `toml:"query"`
Script string `toml:"query_script"`
Expand Down Expand Up @@ -205,18 +207,20 @@ func (q *Query) parse(acc telegraf.Accumulator, rows *dbsql.Rows, t time.Time) (
}

type SQL struct {
Driver string `toml:"driver"`
Dsn config.Secret `toml:"dsn"`
Timeout config.Duration `toml:"timeout"`
MaxIdleTime config.Duration `toml:"connection_max_idle_time"`
MaxLifetime config.Duration `toml:"connection_max_life_time"`
MaxOpenConnections int `toml:"connection_max_open"`
MaxIdleConnections int `toml:"connection_max_idle"`
Queries []Query `toml:"query"`
Log telegraf.Logger `toml:"-"`

driverName string
db *dbsql.DB
Driver string `toml:"driver"`
Dsn config.Secret `toml:"dsn"`
Timeout config.Duration `toml:"timeout"`
MaxIdleTime config.Duration `toml:"connection_max_idle_time"`
MaxLifetime config.Duration `toml:"connection_max_life_time"`
MaxOpenConnections int `toml:"connection_max_open"`
MaxIdleConnections int `toml:"connection_max_idle"`
Queries []Query `toml:"query"`
Log telegraf.Logger `toml:"-"`
DisconnectedServersBehavior string `toml:"disconnected_servers_behavior"`

driverName string
db *dbsql.DB
serverConnected bool
}

func (*SQL) SampleConfig() string {
Expand Down Expand Up @@ -351,22 +355,30 @@ func (s *SQL) Init() error {
return fmt.Errorf("driver %q not supported use one of %v", s.Driver, availDrivers)
}

if s.DisconnectedServersBehavior == "" {
s.DisconnectedServersBehavior = "error"
}

if !choice.Contains(s.DisconnectedServersBehavior, disconnectedServersBehavior) {
return fmt.Errorf("%q is not a valid value for disconnected_servers_behavior", s.DisconnectedServersBehavior)
}

return nil
}

func (s *SQL) Start(_ telegraf.Accumulator) error {
var err error

func (s *SQL) setupConnection() error {
// Connect to the database server
dsnSecret, err := s.Dsn.Get()
if err != nil {
return fmt.Errorf("getting DSN failed: %w", err)
}
dsn := string(dsnSecret)
config.ReleaseSecret(dsnSecret)

s.Log.Debug("Connecting...")
s.db, err = dbsql.Open(s.driverName, dsn)
if err != nil {
// should return since the error is most likely with invalid DSN string format
return err
}

Expand All @@ -375,16 +387,31 @@ func (s *SQL) Start(_ telegraf.Accumulator) error {
s.db.SetConnMaxLifetime(time.Duration(s.MaxLifetime))
s.db.SetMaxOpenConns(s.MaxOpenConnections)
s.db.SetMaxIdleConns(s.MaxIdleConnections)
return nil
}

func (s *SQL) ping() error {
// Test if the connection can be established
s.Log.Debug("Testing connectivity...")
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(s.Timeout))
err = s.db.PingContext(ctx)
err := s.db.PingContext(ctx)
cancel()
if err != nil {
return fmt.Errorf("connecting to database failed: %w", err)
if s.DisconnectedServersBehavior == "error" {
return fmt.Errorf("unable to connect to database: %w", err)
}
s.Log.Errorf("unable to connect to database: %s", err)
return nil
neelayu marked this conversation as resolved.
Show resolved Hide resolved
}
s.serverConnected = true
return nil
}

func (s *SQL) prepareStatements() bool {
if !s.serverConnected {
s.Log.Debug("connection to server not established yet, skipping prepare statement")
return false
}
neelayu marked this conversation as resolved.
Show resolved Hide resolved
// Prepare the statements
for i, q := range s.Queries {
s.Log.Debugf("Preparing statement %q...", q.Query)
Expand All @@ -402,6 +429,18 @@ func (s *SQL) Start(_ telegraf.Accumulator) error {
}
s.Queries[i].statement = stmt
}
return true
}

func (s *SQL) Start(_ telegraf.Accumulator) error {
if err := s.setupConnection(); err != nil {
return err
}

if err := s.ping(); err != nil {
return err
}
neelayu marked this conversation as resolved.
Show resolved Hide resolved
_ = s.prepareStatements()
neelayu marked this conversation as resolved.
Show resolved Hide resolved

return nil
}
Expand All @@ -425,6 +464,21 @@ func (s *SQL) Stop() {
}

func (s *SQL) Gather(acc telegraf.Accumulator) error {
statementsPrepared := true
// during plugin startup, it is possible that the server was not reachable.
// we try pinging the server in this collection cycle.
// we are only concerned with `prepareStatements` function to complete(return true), just once.
if !s.serverConnected {
if err := s.ping(); err != nil {
acc.AddError(err)
}
statementsPrepared = s.prepareStatements()
}
// no need to proceed if statements are not prepared
if !statementsPrepared {
return nil
}
neelayu marked this conversation as resolved.
Show resolved Hide resolved

var wg sync.WaitGroup
tstart := time.Now()
for _, query := range s.Queries {
Expand Down