diff --git a/plugins/outputs/postgresql/README.md b/plugins/outputs/postgresql/README.md index 37df284d129f9..5481b3d5c2ad2 100644 --- a/plugins/outputs/postgresql/README.md +++ b/plugins/outputs/postgresql/README.md @@ -12,6 +12,20 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details. [CONFIGURATION.md]: ../../../docs/CONFIGURATION.md#plugins +## Startup error behavior options + +In addition to the plugin-specific and global configuration settings the plugin +supports options for specifying the behavior when experiencing startup errors +using the `startup_error_behavior` setting. Available values are: + +- `error`: Telegraf with stop and exit in case of startup errors. This is the + default behavior. +- `ignore`: Telegraf will ignore startup errors for this plugin and disables it + but continues processing for all other plugins. +- `retry`: Telegraf will try to startup the plugin in every gather or write + cycle in case of startup errors. The plugin is disabled until + the startup succeeds. + ## Secret-store support This plugin supports secrets from secret-stores for the `connection` option. diff --git a/plugins/outputs/postgresql/postgresql.go b/plugins/outputs/postgresql/postgresql.go index 641e65f98b8f1..b98de99b89580 100644 --- a/plugins/outputs/postgresql/postgresql.go +++ b/plugins/outputs/postgresql/postgresql.go @@ -17,6 +17,7 @@ import ( "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/config" + "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/models" "github.com/influxdata/telegraf/plugins/outputs" "github.com/influxdata/telegraf/plugins/outputs/postgresql/sqltemplate" @@ -72,33 +73,8 @@ type Postgresql struct { tagsJSONColumn utils.Column } -func init() { - outputs.Add("postgresql", func() telegraf.Output { return newPostgresql() }) -} - -func newPostgresql() *Postgresql { - p := &Postgresql{ - Schema: "public", - TagTableSuffix: "_tag", - TagCacheSize: 100000, - Uint64Type: PgNumeric, - CreateTemplates: []*sqltemplate.Template{{}}, - AddColumnTemplates: []*sqltemplate.Template{{}}, - TagTableCreateTemplates: []*sqltemplate.Template{{}}, - TagTableAddColumnTemplates: []*sqltemplate.Template{{}}, - RetryMaxBackoff: config.Duration(time.Second * 15), - Logger: models.NewLogger("outputs", "postgresql", ""), - LogLevel: "warn", - } - - _ = p.CreateTemplates[0].UnmarshalText([]byte(`CREATE TABLE {{ .table }} ({{ .columns }})`)) - _ = p.AddColumnTemplates[0].UnmarshalText([]byte(`ALTER TABLE {{ .table }} ADD COLUMN IF NOT EXISTS {{ .columns|join ", ADD COLUMN IF NOT EXISTS " }}`)) - _ = p.TagTableCreateTemplates[0].UnmarshalText([]byte(`CREATE TABLE {{ .table }} ({{ .columns }}, PRIMARY KEY (tag_id))`)) - _ = p.TagTableAddColumnTemplates[0].UnmarshalText( - []byte(`ALTER TABLE {{ .table }} ADD COLUMN IF NOT EXISTS {{ .columns|join ", ADD COLUMN IF NOT EXISTS " }}`), - ) - - return p +func (p *Postgresql) SampleConfig() string { + return sampleConfig } func (p *Postgresql) Init() error { @@ -169,8 +145,6 @@ func (p *Postgresql) Init() error { return nil } -func (p *Postgresql) SampleConfig() string { return sampleConfig } - // Connect establishes a connection to the target database and prepares the cache func (p *Postgresql) Connect() error { // Yes, we're not supposed to store the context. However since we don't receive a context, we have to. @@ -178,8 +152,11 @@ func (p *Postgresql) Connect() error { var err error p.db, err = pgxpool.ConnectConfig(p.dbContext, p.dbConfig) if err != nil { - p.Logger.Errorf("Couldn't connect to server\n%v", err) - return err + p.dbContextCancel() + return &internal.StartupError{ + Err: err, + Retry: true, + } } p.tableManager = NewTableManager(p) @@ -234,7 +211,9 @@ func (p *Postgresql) Close() error { // Die! p.dbContextCancel() - p.db.Close() + if p.db != nil { + p.db.Close() + } p.tableManager = nil return nil } @@ -493,3 +472,32 @@ func (p *Postgresql) writeTagTable(ctx context.Context, db dbh, tableSource *Tab ttsrc.UpdateCache() return nil } + +func newPostgresql() *Postgresql { + p := &Postgresql{ + Schema: "public", + TagTableSuffix: "_tag", + TagCacheSize: 100000, + Uint64Type: PgNumeric, + CreateTemplates: []*sqltemplate.Template{{}}, + AddColumnTemplates: []*sqltemplate.Template{{}}, + TagTableCreateTemplates: []*sqltemplate.Template{{}}, + TagTableAddColumnTemplates: []*sqltemplate.Template{{}}, + RetryMaxBackoff: config.Duration(time.Second * 15), + Logger: models.NewLogger("outputs", "postgresql", ""), + LogLevel: "warn", + } + + _ = p.CreateTemplates[0].UnmarshalText([]byte(`CREATE TABLE {{ .table }} ({{ .columns }})`)) + _ = p.AddColumnTemplates[0].UnmarshalText([]byte(`ALTER TABLE {{ .table }} ADD COLUMN IF NOT EXISTS {{ .columns|join ", ADD COLUMN IF NOT EXISTS " }}`)) + _ = p.TagTableCreateTemplates[0].UnmarshalText([]byte(`CREATE TABLE {{ .table }} ({{ .columns }}, PRIMARY KEY (tag_id))`)) + _ = p.TagTableAddColumnTemplates[0].UnmarshalText( + []byte(`ALTER TABLE {{ .table }} ADD COLUMN IF NOT EXISTS {{ .columns|join ", ADD COLUMN IF NOT EXISTS " }}`), + ) + + return p +} + +func init() { + outputs.Add("postgresql", func() telegraf.Output { return newPostgresql() }) +} diff --git a/plugins/outputs/postgresql/postgresql_test.go b/plugins/outputs/postgresql/postgresql_test.go index 0de2029f32894..d8e26a1f46406 100644 --- a/plugins/outputs/postgresql/postgresql_test.go +++ b/plugins/outputs/postgresql/postgresql_test.go @@ -18,7 +18,9 @@ import ( "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/config" + "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/metric" + "github.com/influxdata/telegraf/models" "github.com/influxdata/telegraf/plugins/outputs/postgresql/utils" "github.com/influxdata/telegraf/testutil" ) @@ -272,6 +274,80 @@ func TestPostgresqlConnectIntegration(t *testing.T) { require.EqualValues(t, 2, p.db.Stat().MaxConns()) } +func TestConnectionIssueAtStartup(t *testing.T) { + // Test case for https://github.com/influxdata/telegraf/issues/14365 + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + servicePort := "5432" + username := "postgres" + password := "postgres" + testDatabaseName := "telegraf_test" + + container := testutil.Container{ + Image: "postgres:alpine", + ExposedPorts: []string{servicePort}, + Env: map[string]string{ + "POSTGRES_USER": username, + "POSTGRES_PASSWORD": password, + "POSTGRES_DB": "telegraf_test", + }, + WaitingFor: wait.ForAll( + // the database comes up twice, once right away, then again a second + // time after the docker entrypoint starts configuration + wait.ForLog("database system is ready to accept connections").WithOccurrence(2), + wait.ForListeningPort(nat.Port(servicePort)), + ), + } + require.NoError(t, container.Start(), "failed to start container") + defer container.Terminate() + + // Pause the container for connectivity issues + require.NoError(t, container.Pause()) + + // Create a model to be able to use the startup retry strategy + dsn := config.NewSecret([]byte(fmt.Sprintf( + "host=%s port=%s user=%s password=%s dbname=%s connect_timeout=1", + container.Address, + container.Ports[servicePort], + username, + password, + testDatabaseName, + ))) + defer dsn.Destroy() + plugin := newPostgresql() + plugin.Connection = dsn + plugin.Logger = NewLogAccumulator(t) + plugin.LogLevel = "debug" + model := models.NewRunningOutput( + plugin, + &models.OutputConfig{ + Name: "postgres", + StartupErrorBehavior: "retry", + }, + 1000, 1000, + ) + require.NoError(t, model.Init()) + + // The connect call should succeed even though the table creation was not + // successful due to the "retry" strategy + require.NoError(t, model.Connect()) + + // Writing the metrics in this state should fail because we are not fully + // started up + metrics := testutil.MockMetrics() + for _, m := range metrics { + model.AddMetric(m) + } + require.ErrorIs(t, model.WriteBatch(), internal.ErrNotConnected) + + // Unpause the container, now writes should succeed + require.NoError(t, container.Resume()) + require.NoError(t, model.WriteBatch()) + model.Close() +} + func newMetric( t *testing.T, suffix string,