Skip to content

Commit

Permalink
fix(inputs.postgresql_extensible): Use same timestamp for each gather
Browse files Browse the repository at this point in the history
Rather than using a different timestamp for each query, this uses a
single timestamp determined at the start of gather, for any and all
metrics generated during that gather.

While the timestamp might be off from when the actual queries are run,
it combines all metrics around a single timestamp to better understand
when data was captured.

fixes: #9737
  • Loading branch information
powersj committed May 24, 2024
1 parent aa45c7c commit f5a1223
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 8 deletions.
15 changes: 8 additions & 7 deletions plugins/inputs/postgresql_extensible/postgresql_extensible.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,17 +114,21 @@ func (p *Postgresql) Gather(acc telegraf.Accumulator) error {
dbVersion = 0
}

// set default timestamp to Now and use for all generated metrics during
// the same Gather call
timestamp := time.Now()

// We loop in order to process each query
// Query is not run if Database version does not match the query version.
for _, q := range p.Query {
if q.MinVersion <= dbVersion && (q.MaxVersion == 0 || q.MaxVersion > dbVersion) {
acc.AddError(p.gatherMetricsFromQuery(acc, q))
acc.AddError(p.gatherMetricsFromQuery(acc, q, timestamp))
}
}
return nil
}

func (p *Postgresql) gatherMetricsFromQuery(acc telegraf.Accumulator, q query) error {
func (p *Postgresql) gatherMetricsFromQuery(acc telegraf.Accumulator, q query, timestamp time.Time) error {
rows, err := p.service.DB.Query(q.Sqlquery)
if err != nil {
return err
Expand All @@ -139,7 +143,7 @@ func (p *Postgresql) gatherMetricsFromQuery(acc telegraf.Accumulator, q query) e
}

for rows.Next() {
if err := p.accRow(acc, rows, columns, q); err != nil {
if err := p.accRow(acc, rows, columns, q, timestamp); err != nil {
return err
}
}
Expand All @@ -150,7 +154,7 @@ type scanner interface {
Scan(dest ...interface{}) error
}

func (p *Postgresql) accRow(acc telegraf.Accumulator, row scanner, columns []string, q query) error {
func (p *Postgresql) accRow(acc telegraf.Accumulator, row scanner, columns []string, q query, timestamp time.Time) error {
// this is where we'll store the column name with its *interface{}
columnMap := make(map[string]*interface{})

Expand Down Expand Up @@ -188,9 +192,6 @@ func (p *Postgresql) accRow(acc telegraf.Accumulator, row scanner, columns []str
"db": dbname.String(),
}

// set default timestamp to Now
timestamp := time.Now()

fields := make(map[string]interface{})
for col, val := range columnMap {
p.Log.Debugf("Column: %s = %T: %v\n", col, *val, *val)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ func TestAccRow(t *testing.T) {
}
for _, tt := range tests {
q := query{Measurement: "pgTEST", additionalTags: make(map[string]bool)}
require.NoError(t, p.accRow(&acc, tt.fields, columns, q))
require.NoError(t, p.accRow(&acc, tt.fields, columns, q, time.Now()))
require.Len(t, acc.Metrics, 1)
metric := acc.Metrics[0]
require.Equal(t, tt.dbName, metric.Tags["db"])
Expand Down

0 comments on commit f5a1223

Please sign in to comment.