Skip to content

Commit

Permalink
feat(outputs.postgresql): Allow configuration of startup error handli…
Browse files Browse the repository at this point in the history
…ng (#15073)
  • Loading branch information
srebhan authored Mar 29, 2024
1 parent 4613c81 commit 5066980
Show file tree
Hide file tree
Showing 3 changed files with 130 additions and 32 deletions.
14 changes: 14 additions & 0 deletions plugins/outputs/postgresql/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,20 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.

[CONFIGURATION.md]: ../../../docs/CONFIGURATION.md#plugins

## Startup error behavior options <!-- @/docs/includes/startup_error_behavior.md -->

In addition to the plugin-specific and global configuration settings the plugin
supports options for specifying the behavior when experiencing startup errors
using the `startup_error_behavior` setting. Available values are:

- `error`: Telegraf with stop and exit in case of startup errors. This is the
default behavior.
- `ignore`: Telegraf will ignore startup errors for this plugin and disables it
but continues processing for all other plugins.
- `retry`: Telegraf will try to startup the plugin in every gather or write
cycle in case of startup errors. The plugin is disabled until
the startup succeeds.

## Secret-store support

This plugin supports secrets from secret-stores for the `connection` option.
Expand Down
72 changes: 40 additions & 32 deletions plugins/outputs/postgresql/postgresql.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/models"
"github.com/influxdata/telegraf/plugins/outputs"
"github.com/influxdata/telegraf/plugins/outputs/postgresql/sqltemplate"
Expand Down Expand Up @@ -72,33 +73,8 @@ type Postgresql struct {
tagsJSONColumn utils.Column
}

func init() {
outputs.Add("postgresql", func() telegraf.Output { return newPostgresql() })
}

func newPostgresql() *Postgresql {
p := &Postgresql{
Schema: "public",
TagTableSuffix: "_tag",
TagCacheSize: 100000,
Uint64Type: PgNumeric,
CreateTemplates: []*sqltemplate.Template{{}},
AddColumnTemplates: []*sqltemplate.Template{{}},
TagTableCreateTemplates: []*sqltemplate.Template{{}},
TagTableAddColumnTemplates: []*sqltemplate.Template{{}},
RetryMaxBackoff: config.Duration(time.Second * 15),
Logger: models.NewLogger("outputs", "postgresql", ""),
LogLevel: "warn",
}

_ = p.CreateTemplates[0].UnmarshalText([]byte(`CREATE TABLE {{ .table }} ({{ .columns }})`))
_ = p.AddColumnTemplates[0].UnmarshalText([]byte(`ALTER TABLE {{ .table }} ADD COLUMN IF NOT EXISTS {{ .columns|join ", ADD COLUMN IF NOT EXISTS " }}`))
_ = p.TagTableCreateTemplates[0].UnmarshalText([]byte(`CREATE TABLE {{ .table }} ({{ .columns }}, PRIMARY KEY (tag_id))`))
_ = p.TagTableAddColumnTemplates[0].UnmarshalText(
[]byte(`ALTER TABLE {{ .table }} ADD COLUMN IF NOT EXISTS {{ .columns|join ", ADD COLUMN IF NOT EXISTS " }}`),
)

return p
func (p *Postgresql) SampleConfig() string {
return sampleConfig
}

func (p *Postgresql) Init() error {
Expand Down Expand Up @@ -169,17 +145,18 @@ func (p *Postgresql) Init() error {
return nil
}

func (p *Postgresql) SampleConfig() string { return sampleConfig }

// Connect establishes a connection to the target database and prepares the cache
func (p *Postgresql) Connect() error {
// Yes, we're not supposed to store the context. However since we don't receive a context, we have to.
p.dbContext, p.dbContextCancel = context.WithCancel(context.Background())
var err error
p.db, err = pgxpool.ConnectConfig(p.dbContext, p.dbConfig)
if err != nil {
p.Logger.Errorf("Couldn't connect to server\n%v", err)
return err
p.dbContextCancel()
return &internal.StartupError{
Err: err,
Retry: true,
}
}
p.tableManager = NewTableManager(p)

Expand Down Expand Up @@ -234,7 +211,9 @@ func (p *Postgresql) Close() error {

// Die!
p.dbContextCancel()
p.db.Close()
if p.db != nil {
p.db.Close()
}
p.tableManager = nil
return nil
}
Expand Down Expand Up @@ -493,3 +472,32 @@ func (p *Postgresql) writeTagTable(ctx context.Context, db dbh, tableSource *Tab
ttsrc.UpdateCache()
return nil
}

func newPostgresql() *Postgresql {
p := &Postgresql{
Schema: "public",
TagTableSuffix: "_tag",
TagCacheSize: 100000,
Uint64Type: PgNumeric,
CreateTemplates: []*sqltemplate.Template{{}},
AddColumnTemplates: []*sqltemplate.Template{{}},
TagTableCreateTemplates: []*sqltemplate.Template{{}},
TagTableAddColumnTemplates: []*sqltemplate.Template{{}},
RetryMaxBackoff: config.Duration(time.Second * 15),
Logger: models.NewLogger("outputs", "postgresql", ""),
LogLevel: "warn",
}

_ = p.CreateTemplates[0].UnmarshalText([]byte(`CREATE TABLE {{ .table }} ({{ .columns }})`))
_ = p.AddColumnTemplates[0].UnmarshalText([]byte(`ALTER TABLE {{ .table }} ADD COLUMN IF NOT EXISTS {{ .columns|join ", ADD COLUMN IF NOT EXISTS " }}`))
_ = p.TagTableCreateTemplates[0].UnmarshalText([]byte(`CREATE TABLE {{ .table }} ({{ .columns }}, PRIMARY KEY (tag_id))`))
_ = p.TagTableAddColumnTemplates[0].UnmarshalText(
[]byte(`ALTER TABLE {{ .table }} ADD COLUMN IF NOT EXISTS {{ .columns|join ", ADD COLUMN IF NOT EXISTS " }}`),
)

return p
}

func init() {
outputs.Add("postgresql", func() telegraf.Output { return newPostgresql() })
}
76 changes: 76 additions & 0 deletions plugins/outputs/postgresql/postgresql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ import (

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/models"
"github.com/influxdata/telegraf/plugins/outputs/postgresql/utils"
"github.com/influxdata/telegraf/testutil"
)
Expand Down Expand Up @@ -272,6 +274,80 @@ func TestPostgresqlConnectIntegration(t *testing.T) {
require.EqualValues(t, 2, p.db.Stat().MaxConns())
}

func TestConnectionIssueAtStartup(t *testing.T) {
// Test case for https://github.com/influxdata/telegraf/issues/14365
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}

servicePort := "5432"
username := "postgres"
password := "postgres"
testDatabaseName := "telegraf_test"

container := testutil.Container{
Image: "postgres:alpine",
ExposedPorts: []string{servicePort},
Env: map[string]string{
"POSTGRES_USER": username,
"POSTGRES_PASSWORD": password,
"POSTGRES_DB": "telegraf_test",
},
WaitingFor: wait.ForAll(
// the database comes up twice, once right away, then again a second
// time after the docker entrypoint starts configuration
wait.ForLog("database system is ready to accept connections").WithOccurrence(2),
wait.ForListeningPort(nat.Port(servicePort)),
),
}
require.NoError(t, container.Start(), "failed to start container")
defer container.Terminate()

// Pause the container for connectivity issues
require.NoError(t, container.Pause())

// Create a model to be able to use the startup retry strategy
dsn := config.NewSecret([]byte(fmt.Sprintf(
"host=%s port=%s user=%s password=%s dbname=%s connect_timeout=1",
container.Address,
container.Ports[servicePort],
username,
password,
testDatabaseName,
)))
defer dsn.Destroy()
plugin := newPostgresql()
plugin.Connection = dsn
plugin.Logger = NewLogAccumulator(t)
plugin.LogLevel = "debug"
model := models.NewRunningOutput(
plugin,
&models.OutputConfig{
Name: "postgres",
StartupErrorBehavior: "retry",
},
1000, 1000,
)
require.NoError(t, model.Init())

// The connect call should succeed even though the table creation was not
// successful due to the "retry" strategy
require.NoError(t, model.Connect())

// Writing the metrics in this state should fail because we are not fully
// started up
metrics := testutil.MockMetrics()
for _, m := range metrics {
model.AddMetric(m)
}
require.ErrorIs(t, model.WriteBatch(), internal.ErrNotConnected)

// Unpause the container, now writes should succeed
require.NoError(t, container.Resume())
require.NoError(t, model.WriteBatch())
model.Close()
}

func newMetric(
t *testing.T,
suffix string,
Expand Down

0 comments on commit 5066980

Please sign in to comment.