Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add timestamp column support to postgresql_extensible #8602

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions plugins/inputs/postgresql_extensible/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,17 @@ The example below has two queries are specified, with the following parameters:
# defined tags. The values in these columns must be of a string-type,
# a number-type or a blob-type.
#
# The timestamp field is used to override the data points timestamp value. By
# default, all rows inserted with current time. By setting a timestamp column,
# the row will be inserted with that column's value.
#
# Structure :
# [[inputs.postgresql_extensible.query]]
# sqlquery string
# version string
# withdbname boolean
# tagvalue string (coma separated)
# timestamp string
[[inputs.postgresql_extensible.query]]
sqlquery="SELECT * FROM pg_stat_database where datname"
version=901
Expand Down
30 changes: 29 additions & 1 deletion plugins/inputs/postgresql_extensible/postgresql_extensible.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"io/ioutil"
"os"
"strings"
"time"

_ "github.com/jackc/pgx/stdlib"

Expand All @@ -19,6 +20,7 @@ type Postgresql struct {
postgresql.Service
Databases []string
AdditionalTags []string
Timestamp string
Query query
Debug bool

Expand All @@ -32,6 +34,7 @@ type query []struct {
Withdbname bool
Tagvalue string
Measurement string
Timestamp string
}

var ignoredColumns = map[string]bool{"stats_reset": true}
Expand Down Expand Up @@ -82,13 +85,23 @@ var sampleConfig = `
## The script option can be used to specify the .sql file path.
## If script and sqlquery options specified at same time, sqlquery will be used
##
## the tagvalue field is used to define custom tags (separated by comas).
## the query is expected to return columns which match the names of the
## defined tags. The values in these columns must be of a string-type,
## a number-type or a blob-type.
##
## The timestamp field is used to override the data points timestamp value. By
## default, all rows inserted with current time. By setting a timestamp column,
## the row will be inserted with that column's value.
##
## Structure :
## [[inputs.postgresql_extensible.query]]
## sqlquery string
## version string
## withdbname boolean
## tagvalue string (comma separated)
## measurement string
## timestamp string
[[inputs.postgresql_extensible.query]]
sqlquery="SELECT * FROM pg_stat_database"
version=901
Expand Down Expand Up @@ -150,6 +163,7 @@ func (p *Postgresql) Gather(acc telegraf.Accumulator) error {
query string
tag_value string
meas_name string
timestamp string
columns []string
)

Expand All @@ -164,6 +178,7 @@ func (p *Postgresql) Gather(acc telegraf.Accumulator) error {
for i := range p.Query {
sql_query = p.Query[i].Sqlquery
tag_value = p.Query[i].Tagvalue
timestamp = p.Query[i].Timestamp

if p.Query[i].Measurement != "" {
meas_name = p.Query[i].Measurement
Expand Down Expand Up @@ -206,6 +221,8 @@ func (p *Postgresql) Gather(acc telegraf.Accumulator) error {
}
}

p.Timestamp = timestamp

for rows.Next() {
err = p.accRow(meas_name, rows, acc, columns)
if err != nil {
Expand All @@ -228,6 +245,7 @@ func (p *Postgresql) accRow(meas_name string, row scanner, acc telegraf.Accumula
columnVars []interface{}
dbname bytes.Buffer
tagAddress string
timestamp time.Time
)

// this is where we'll store the column name with its *interface{}
Expand Down Expand Up @@ -269,6 +287,9 @@ func (p *Postgresql) accRow(meas_name string, row scanner, acc telegraf.Accumula
"db": dbname.String(),
}

// set default timestamp to Now
timestamp = time.Now()

fields := make(map[string]interface{})
COLUMN:
for col, val := range columnMap {
Expand All @@ -278,6 +299,13 @@ COLUMN:
continue
}

if col == p.Timestamp {
if v, ok := (*val).(time.Time); ok {
timestamp = v
}
Comment on lines +303 to +305
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

might want to consider string fields that need to be parsed as timestamps, but this could be handled later.

continue
}

for _, tag := range p.AdditionalTags {
if col != tag {
continue
Expand All @@ -301,7 +329,7 @@ COLUMN:
fields[col] = *val
}
}
acc.AddFields(meas_name, fields, tags)
acc.AddFields(meas_name, fields, tags, timestamp)
return nil
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"errors"
"fmt"
"testing"
"time"

"github.com/influxdata/telegraf/plugins/inputs/postgresql"
"github.com/influxdata/telegraf/testutil"
Expand Down Expand Up @@ -126,6 +127,13 @@ func TestPostgresqlQueryOutputTests(t *testing.T) {
assert.True(t, found)
assert.Equal(t, true, v)
},
"SELECT timestamp'1980-07-23' as ts, true AS myvalue": func(acc *testutil.Accumulator) {
expectedTime := time.Date(1980, 7, 23, 0, 0, 0, 0, time.UTC)
v, found := acc.BoolField(measurement, "myvalue")
assert.True(t, found)
assert.Equal(t, true, v)
assert.True(t, acc.HasTimestamp(measurement, expectedTime))
},
}

for q, assertions := range examples {
Expand All @@ -134,6 +142,7 @@ func TestPostgresqlQueryOutputTests(t *testing.T) {
Version: 901,
Withdbname: false,
Tagvalue: "",
Timestamp: "ts",
}})
assertions(acc)
}
Expand Down