Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expand metrics on pg_stat_replication to include lag expressed as time. #115

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
## Version 0.10.0 / 2020-02-23
## Version 0.10.0 / 2022-02-23

[full changelog](https://github.com/rnaveiras/postgres_exporter/compare/v0.9.0...v0.10.0)

Expand Down
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ Prometheus exporter for PostgreSQL server metrics.
| postgres_stat_database_xact_commit_total | Number of transactions in this database that have been committed | datname |
| postgres_stat_database_xact_rollback_total | Number of transactions in this database that have been rolled back | datname |
| postgres_stat_replication_lag_bytes | Replication Lag in bytes | application_name, client_addr, state, sync_state |
| postgres_stat_replication_flush_lag_seconds | Elapsed time during committed WALs from primary to the standby (WAL's has already been flushed but not yet applied). Reported from the primary node. *Only available on Posgres versions > 9x*. | application_name, client_addr, state, sync_state |
| postgres_stat_replication_replay_lag_seconds | Elapsed time during committed WALs from primary to the standby (fully committed in standby node). Reported from the primary node. *Only available on Posgres versions > 9x*. | application_name, client_addr, state, sync_state |
| postgres_stat_replication_write_lag_seconds | Elapsed time during committed WALs from primary to the standby (but not yet committed in the standby). Reported from the primary node. *Only available on Posgres versions > 9x*. | application_name, client_addr, state, sync_state |
| postgres_stat_vacuum_progress_heap_blks_scanned | Number of heap blocks scanned | pid, query_start, schemaname, datname, relname |
| postgres_stat_vacuum_progress_heap_blks_total | Total number of heap blocks in the table | pid, query_start, schemaname, datname, relname |
| postgres_stat_vacuum_progress_heap_blks_vacuumed | Number of heap blocks vacuumed | pid, query_start, schemaname, datname, relname |
Expand All @@ -75,8 +78,11 @@ Prometheus exporter for PostgreSQL server metrics.
| postgres_stat_user_indexes_scan_total | Number of times this index has been scanned | datname, schemaname, tablename, indexname |
| postgres_stat_user_indexes_tuple_read_total | Number of times tuples have been returned from scanning this index | datname, schemaname, tablename, indexname |
| postgres_stat_user_indexes_tuple_fetch_total | Number of live tuples fetched by scans on this index | datname, schemaname, tablename, indexname |
| postgres_wal_receiver_replay_lag_seconds | Replication lag measured in seconds on the standby. Measured as `EXTRACT (EPOCH FROM now()) - pg_last_xact_replay_timestamp()` | status |
| postgres_wal_receiver_replay_lag_bytes | Replication lag measured in bytes on the standby. Measured as `pg_wal_lsn_diff(pg_last_wal_receive_lsn(), pg_last_wal_replay_lsn())::float` | status |
| postgres_up | Whether the Postgres server is up | |


### Run

#### Passing in a libpq connection string
Expand Down
1 change: 1 addition & 0 deletions collector/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ func NewExporter(ctx context.Context, logger kitlog.Logger, connConfig *pgx.Conn
NewStatBgwriterScraper(),
NewStatDatabaseScraper(),
NewStatReplicationScraper(),
NewWalReceiverScraper(),
},
datnameScrapers: []Scraper{
NewStatVacuumProgressScraper(),
Expand Down
86 changes: 60 additions & 26 deletions collector/stat_replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,22 +14,7 @@ running the backup. In both connections (state=backup and state=streaming) the
pg_log_location_diff is null and it requires to be excluded */
const (
// Scrape query
statReplicationLagBytes9x = `
WITH pg_replication AS (
SELECT application_name
, client_addr
, state
, sync_state
, ( CASE when pg_is_in_recovery()
THEN pg_xlog_location_diff(pg_last_xlog_receive_location(), replay_location)::float
ELSE pg_xlog_location_diff(pg_current_xlog_location(), replay_location)::float
END
) AS pg_xlog_location_diff
FROM pg_stat_replication
)
SELECT * FROM pg_replication WHERE pg_xlog_location_diff IS NOT NULL /*postgres_exporter*/`

statReplicationLagBytes = `
statReplicationLag = `
WITH pg_replication AS (
SELECT application_name
, client_addr
Expand All @@ -40,13 +25,19 @@ WITH pg_replication AS (
ELSE pg_wal_lsn_diff(pg_current_wal_lsn(), replay_lsn)::float
END
) AS pg_xlog_location_diff
, COALESCE(EXTRACT (EPOCH FROM write_lag), 0) as write_lag_seconds
, COALESCE(EXTRACT (EPOCH FROM flush_lag), 0) as flush_lag_seconds
, COALESCE(EXTRACT (EPOCH FROM replay_lag), 0) as replay_lag_seconds
FROM pg_stat_replication
)
SELECT * FROM pg_replication WHERE pg_xlog_location_diff IS NOT NULL /*postgres_exporter*/`
)

type statReplicationScraper struct {
lagBytes *prometheus.Desc
lagBytes *prometheus.Desc
writeLag *prometheus.Desc
flushLag *prometheus.Desc
replayLag *prometheus.Desc
}

// NewStatReplicationScraper returns a new Scraper exposing postgres pg_stat_replication
Expand All @@ -58,6 +49,24 @@ func NewStatReplicationScraper() Scraper {
[]string{"application_name", "client_addr", "state", "sync_state"},
nil,
),
writeLag: prometheus.NewDesc(
"postgres_stat_replication_write_lag_seconds",
"write_lag as reported by the pg_stat_replication view converted to seconds",
[]string{"application_name", "client_addr", "state", "sync_state"},
nil,
),
flushLag: prometheus.NewDesc(
"postgres_stat_replication_flush_lag_seconds",
"flush_lag as reported by the pg_stat_replication view converted to seconds",
[]string{"application_name", "client_addr", "state", "sync_state"},
nil,
),
replayLag: prometheus.NewDesc(
"postgres_stat_replication_replay_lag_seconds",
"replay_lag as reported by the pg_stat_replication view converted to seconds",
[]string{"application_name", "client_addr", "state", "sync_state"},
nil,
),
}
}

Expand All @@ -69,11 +78,7 @@ func (c *statReplicationScraper) Scrape(ctx context.Context, conn *pgx.Conn, ver
var rows pgx.Rows
var err error

if version.Gte(10) {
rows, err = conn.Query(ctx, statReplicationLagBytes)
} else {
rows, err = conn.Query(ctx, statReplicationLagBytes9x)
}
rows, err = conn.Query(ctx, statReplicationLag)

if err != nil {
return err
Expand All @@ -82,18 +87,21 @@ func (c *statReplicationScraper) Scrape(ctx context.Context, conn *pgx.Conn, ver

var applicationName, state, syncState string
var clientAddr net.IP
var pgXlogLocationDiff float64
var pgXlogLocationDiff, writeLagSeconds, flushLagSeconds, replayLagSeconds float64

for rows.Next() {

if err := rows.Scan(&applicationName,
&clientAddr,
&state,
&syncState,
&pgXlogLocationDiff); err != nil {
&pgXlogLocationDiff,
&writeLagSeconds,
&flushLagSeconds,
&replayLagSeconds); err != nil {

return err
}

// postgres_stat_replication_lag_bytes
ch <- prometheus.MustNewConstMetric(c.lagBytes,
prometheus.GaugeValue,
Expand All @@ -102,8 +110,34 @@ func (c *statReplicationScraper) Scrape(ctx context.Context, conn *pgx.Conn, ver
clientAddr.String(),
state,
syncState)
}

// postgres_stat_replication_write_lag_seconds
ch <- prometheus.MustNewConstMetric(c.writeLag,
prometheus.GaugeValue,
writeLagSeconds,
applicationName,
clientAddr.String(),
state,
syncState)

// postgres_stat_replication_flush_lag_seconds
ch <- prometheus.MustNewConstMetric(c.flushLag,
prometheus.GaugeValue,
flushLagSeconds,
applicationName,
clientAddr.String(),
state,
syncState)

// postgres_stat_replication_replay_lag_seconds
ch <- prometheus.MustNewConstMetric(c.replayLag,
prometheus.GaugeValue,
replayLagSeconds,
applicationName,
clientAddr.String(),
state,
syncState)
}
err = rows.Err()
if err != nil {
return err
Expand Down
99 changes: 99 additions & 0 deletions collector/stat_wal_receiver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package collector

import (
"context"

pgx "github.com/jackc/pgx/v4"
"github.com/prometheus/client_golang/prometheus"
)

/* When pg_basebackup is running in stream mode, it opens a second connection
to the server and starts streaming the transaction log in parallel while
running the backup. In both connections (state=backup and state=streaming) the
pg_log_location_diff is null and it requires to be excluded */
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where is pg_log_location_diff referenced? We aren't using it anywhere in this code, are we? Maybe a third party library? It is definitely getting queried though. I tested the exporter with a streaming replication setup on my machine and I saw the following errors in the logs of the primary PostgreSQL instance:

2022-06-14 10:32:45.934 BST [94070] STATEMENT:  select * from pg_log_location_diff;
2022-06-14 10:32:47.403 BST [94070] ERROR:  relation "pg_log_location_diff" does not exist at character 15

Even Google has no clue what pg_log_location_diff is!!

const (
// Scrape query
statWalReceiver = `
WITH pg_wal_receiver AS (
SELECT status
, pg_wal_lsn_diff(pg_last_wal_receive_lsn(), pg_last_wal_replay_lsn())::float
as postgres_wal_receiver_replay_bytes
, (
CASE WHEN pg_last_wal_receive_lsn() = pg_last_wal_replay_lsn()
THEN 0
ELSE EXTRACT (EPOCH FROM now() - pg_last_xact_replay_timestamp())
END
) as postgres_wal_receiver_replay_lag
FROM pg_stat_wal_receiver
)
SELECT * FROM pg_wal_receiver WHERE postgres_wal_receiver_replay_lag IS NOT NULL /*postgres_exporter*/`
)

type statWalReceiverScraper struct {
walReceiverReplayBytes *prometheus.Desc
walReceiverReplayLag *prometheus.Desc
}

// NewStatWalReceiverScraper returns a new Scraper exposing postgres pg_stat_replication
func NewWalReceiverScraper() Scraper {
return &statWalReceiverScraper{
walReceiverReplayBytes: prometheus.NewDesc(
"postgres_wal_receiver_replay_lag_bytes",
"delay in standby wal replay bytes pg_wal_lsn_diff(pg_last_wal_receive_lsn(), pg_last_wal_replay_lsn())::float",
[]string{"status"},
nil,
),
walReceiverReplayLag: prometheus.NewDesc(
"postgres_wal_receiver_replay_lag_seconds",
"delay in standby wal replay seconds EXTRACT (EPOCH FROM now() - pg_last_xact_replay_timestamp()",
[]string{"status"},
nil,
),
}
}

func (c *statWalReceiverScraper) Name() string {
return "StatWalReceiverScraperr"
}

func (c *statWalReceiverScraper) Scrape(ctx context.Context, conn *pgx.Conn, version Version, ch chan<- prometheus.Metric) error {
var rows pgx.Rows
var err error

rows, err = conn.Query(ctx, statWalReceiver)

if err != nil {
return err
}
defer rows.Close()

var status string
var pgWalReceiverReplayBytes, pgWalReceiverReplayLag float64

for rows.Next() {

if err := rows.Scan(&status,
&pgWalReceiverReplayBytes,
&pgWalReceiverReplayLag); err != nil {

return err
}
// postgres_wal_receiver_replay_lag_bytes
ch <- prometheus.MustNewConstMetric(c.walReceiverReplayBytes,
prometheus.GaugeValue,
pgWalReceiverReplayBytes,
status)
// postgres_wal_receiver_replay_lag_seconds
ch <- prometheus.MustNewConstMetric(c.walReceiverReplayLag,
prometheus.GaugeValue,
pgWalReceiverReplayLag,
status)
}

err = rows.Err()
if err != nil {
return err
}

return nil
}