Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add pgbouncer input plugin #3918

Merged
merged 3 commits into from
Aug 1, 2018
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,7 @@ configuration options.
* [openldap](./plugins/inputs/openldap)
* [opensmtpd](./plugins/inputs/opensmtpd)
* [pf](./plugins/inputs/pf)
* [pgbouncer](./plugins/inputs/pgbouncer)
* [phpfpm](./plugins/inputs/phpfpm)
* [phusion passenger](./plugins/inputs/passenger)
* [ping](./plugins/inputs/ping)
Expand Down
7 changes: 7 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,13 @@ services:
image: memcached
ports:
- "11211:11211"
pgbouncer:
image: mbed/pgbouncer
environment:
PG_ENV_POSTGRESQL_USER: pgbouncer
PG_ENV_POSTGRESQL_PASS: pgbouncer
ports:
- "6432:6432"
postgres:
image: postgres:alpine
ports:
Expand Down
1 change: 1 addition & 0 deletions plugins/inputs/all/all.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ import (
_ "github.com/influxdata/telegraf/plugins/inputs/opensmtpd"
_ "github.com/influxdata/telegraf/plugins/inputs/passenger"
_ "github.com/influxdata/telegraf/plugins/inputs/pf"
_ "github.com/influxdata/telegraf/plugins/inputs/pgbouncer"
_ "github.com/influxdata/telegraf/plugins/inputs/phpfpm"
_ "github.com/influxdata/telegraf/plugins/inputs/ping"
_ "github.com/influxdata/telegraf/plugins/inputs/postfix"
Expand Down
21 changes: 21 additions & 0 deletions plugins/inputs/pgbouncer/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# PgBouncer plugin

This PgBouncer plugin provides metrics for your PgBouncer load balancer.

More information about the meaning of these metrics can be found in the [PgBouncer Documentation](https://pgbouncer.github.io/usage.html)

## Configuration
Specify address via a url matching:

`postgres://[pqgotest[:password]]@localhost[/dbname]?sslmode=[disable|verify-ca|verify-full]`

All connection parameters are optional.

Without the dbname parameter, the driver will default to a database with the same name as the user.
This dbname is just for instantiating a connection with the server and doesn't restrict the databases we are trying to grab metrics for.

### Configuration example
```
[[inputs.pgbouncer]]
address = "postgres://telegraf@localhost/pgbouncer"
```
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add details about the measurement/tags/fields you are collecting, check the EXAMPLE_README.md

181 changes: 181 additions & 0 deletions plugins/inputs/pgbouncer/pgbouncer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
package pgbouncer

import (
"bytes"
"github.com/influxdata/telegraf/plugins/inputs/postgresql"

// register in driver.
_ "github.com/jackc/pgx/stdlib"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/inputs"
)

type PgBouncer struct {
postgresql.Service
}

var ignoredColumns = map[string]bool{"user": true, "database": true, "pool_mode": true,
"avg_req": true, "avg_recv": true, "avg_sent": true, "avg_query": true,
}

var sampleConfig = `
## specify address via a url matching:
## postgres://[pqgotest[:password]]@localhost[/dbname]\
## ?sslmode=[disable|verify-ca|verify-full]
## or a simple string:
## host=localhost user=pqotest password=... sslmode=... dbname=app_production
##
## All connection parameters are optional.
##
address = "host=localhost user=pgbouncer sslmode=disable"
`

func (p *PgBouncer) SampleConfig() string {
return sampleConfig
}

func (p *PgBouncer) Description() string {
return "Read metrics from one or many pgbouncer servers"
}

func (p *PgBouncer) Gather(acc telegraf.Accumulator) error {
var (
err error
query string
columns []string
)

query = `SHOW STATS`

rows, err := p.DB.Query(query)
if err != nil {
return err
}

defer rows.Close()

// grab the column information from the result
if columns, err = rows.Columns(); err != nil {
return err
}

for rows.Next() {
tags, columnMap, err := p.accRow(rows, acc, columns)

if err != nil {
return err
}

fields := make(map[string]interface{})
for col, val := range columnMap {
_, ignore := ignoredColumns[col]
if !ignore {
fields[col] = *val
}
}
acc.AddFields("pgbouncer", fields, tags)
}

query = `SHOW POOLS`

poolRows, err := p.DB.Query(query)
if err != nil {
return err
}

defer poolRows.Close()

// grab the column information from the result
if columns, err = poolRows.Columns(); err != nil {
return err
}

for poolRows.Next() {
tags, columnMap, err := p.accRow(poolRows, acc, columns)
if err != nil {
return err
}

switch (*columnMap["user"]).(type) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of this switch, maybe do:

    if s, ok := (*columnMap["pool_mode"]).(string); ok && s != "" {
        tags["user"] = s
    }

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, you are right

case string:
tags["user"] = (*columnMap["user"]).(string)
}

switch (*columnMap["pool_mode"]).(type) {
case string:
tags["pool_mode"] = (*columnMap["pool_mode"]).(string)
}

fields := make(map[string]interface{})
for col, val := range columnMap {
_, ignore := ignoredColumns[col]
if !ignore {
fields[col] = *val
}
}
acc.AddFields("pgbouncer_pools", fields, tags)
}

return poolRows.Err()
}

type scanner interface {
Scan(dest ...interface{}) error
}

func (p *PgBouncer) accRow(row scanner, acc telegraf.Accumulator, columns []string) (map[string]string,
map[string]*interface{}, error) {
var columnVars []interface{}
var dbname bytes.Buffer

// this is where we'll store the column name with its *interface{}
columnMap := make(map[string]*interface{})

for _, column := range columns {
columnMap[column] = new(interface{})
}

// populate the array of interface{} with the pointers in the right order
for i := 0; i < len(columnMap); i++ {
columnVars = append(columnVars, columnMap[columns[i]])
}

// deconstruct array of variables and send to Scan
err := row.Scan(columnVars...)

if err != nil {
return nil, nil, err
}
if columnMap["database"] != nil {
// extract the database name from the column map
dbname.WriteString((*columnMap["database"]).(string))
} else {
dbname.WriteString("postgres")
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is too general purpose of a way to parse the stats. There are other columns we would probably want as tags such as pool_mode and user and I think it may be difficult to use a single function to parse all commands.

When new stats are added we will want to have a chance to decide if they are fields or tags, so I think they should be included manually, this does require a bit more upkeep but once a string is added as a field we can't switch it easily.

Unfortunately, I don't think we can specify the exact columns we are interested in, it would really nice to just be able to say select database,total_requests,... from stats;

I think we should return a map[string]interface{} from this function and have a function for each query to assign tags and call AddFields. For SHOW STATS the only tag would be database, and for SHOW POOLS you would have database, user, and pool_mode as tags.

Optionally, consider not even reporting avg_req | avg_recv | avg_sent | avg_query since these can all be calculated using the difference in totals.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's fixed


var tagAddress string
tagAddress, err = p.SanitizedAddress()
if err != nil {
return nil, nil, err
}

// Return basic tags and the mapped columns
return map[string]string{"server": tagAddress, "db": dbname.String()}, columnMap, nil
}

func init() {
inputs.Add("pgbouncer", func() telegraf.Input {
return &PgBouncer{
Service: postgresql.Service{
MaxIdle: 1,
MaxOpen: 1,
MaxLifetime: internal.Duration{
Duration: 0,
},
IsPgBouncer: true,
},
}
})
}
66 changes: 66 additions & 0 deletions plugins/inputs/pgbouncer/pgbouncer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package pgbouncer

import (
"fmt"
"github.com/influxdata/telegraf/plugins/inputs/postgresql"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"testing"
)

func TestPgBouncerGeneratesMetrics(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}

p := &PgBouncer{
Service: postgresql.Service{
Address: fmt.Sprintf(
"host=%s user=pgbouncer password=pgbouncer dbname=pgbouncer port=6432 sslmode=disable",
testutil.GetLocalHost(),
),
IsPgBouncer: true,
},
}

var acc testutil.Accumulator
require.NoError(t, p.Start(&acc))
require.NoError(t, p.Gather(&acc))

intMetrics := []string{
"total_requests",
"total_received",
"total_sent",
"total_query_time",
"avg_req",
"avg_recv",
"avg_sent",
"avg_query",
"cl_active",
"cl_waiting",
"sv_active",
"sv_idle",
"sv_used",
"sv_tested",
"sv_login",
"maxwait",
}

int32Metrics := []string{}

metricsCounted := 0

for _, metric := range intMetrics {
assert.True(t, acc.HasInt64Field("pgbouncer", metric))
metricsCounted++
}

for _, metric := range int32Metrics {
assert.True(t, acc.HasInt32Field("pgbouncer", metric))
metricsCounted++
}

assert.True(t, metricsCounted > 0)
assert.Equal(t, len(intMetrics)+len(int32Metrics), metricsCounted)
}
1 change: 1 addition & 0 deletions plugins/inputs/postgresql/postgresql.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ func init() {
MaxLifetime: internal.Duration{
Duration: 0,
},
IsPgBouncer: false,
},
}
})
Expand Down
1 change: 1 addition & 0 deletions plugins/inputs/postgresql/postgresql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ func TestPostgresqlGeneratesMetrics(t *testing.T) {
"host=%s user=postgres sslmode=disable",
testutil.GetLocalHost(),
),
IsPgBouncer: false,
},
Databases: []string{"postgres"},
}
Expand Down
33 changes: 32 additions & 1 deletion plugins/inputs/postgresql/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ package postgresql
import (
"database/sql"
"fmt"
"github.com/jackc/pgx"
"github.com/jackc/pgx/pgtype"
"github.com/jackc/pgx/stdlib"
"net"
"net/url"
"regexp"
Expand Down Expand Up @@ -90,6 +93,7 @@ type Service struct {
MaxOpen int
MaxLifetime internal.Duration
DB *sql.DB
IsPgBouncer bool
}

// Start starts the ServiceInput's service, whatever that may be
Expand All @@ -100,7 +104,34 @@ func (p *Service) Start(telegraf.Accumulator) (err error) {
p.Address = localhost
}

if p.DB, err = sql.Open("pgx", p.Address); err != nil {
connectionString := p.Address

// Specific support to make it work with PgBouncer too
// See https://github.com/influxdata/telegraf/issues/3253#issuecomment-357505343
if p.IsPgBouncer {
d := &stdlib.DriverConfig{
ConnConfig: pgx.ConnConfig{
PreferSimpleProtocol: true,
RuntimeParams: map[string]string{
"client_encoding": "UTF8",
},
CustomConnInfo: func(c *pgx.Conn) (*pgtype.ConnInfo, error) {
info := c.ConnInfo.DeepCopy()
info.RegisterDataType(pgtype.DataType{
Value: &pgtype.OIDValue{},
Name: "int8OID",
OID: pgtype.Int8OID,
})

return info, nil
},
},
}
stdlib.RegisterDriverConfig(d)
connectionString = d.ConnectionString(p.Address)
}

if p.DB, err = sql.Open("pgx", connectionString); err != nil {
return err
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,7 @@ func init() {
MaxLifetime: internal.Duration{
Duration: 0,
},
IsPgBouncer: false,
},
}
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ func queryRunner(t *testing.T, q query) *testutil.Accumulator {
"host=%s user=postgres sslmode=disable",
testutil.GetLocalHost(),
),
IsPgBouncer: false,
},
Databases: []string{"postgres"},
Query: q,
Expand Down