Skip to content

Commit

Permalink
[receiver/postgresqlreceiver] add schema attribute to postgresqlrecei…
Browse files Browse the repository at this point in the history
…ver (#30142)

**Description:** 

Changes the postgresql statistics receiver so that a table or indices
schema is stored in it's own attribute.

Currently the schema is stored as table name for *some* of the
statistics, while others completely ignore what schema a statistic comes
out of.

The code will allow computing things like sequential vs index scans
against a table without having to pick apart the resource attribute.

**Link to tracking Issue:**  #29559 

**Testing:** I've built the docker container for otel contrib and have
been running the result for a few days to observe that the attributes
are properly propagated.

**Documentation:** Documentation for the new field was added to the
metadata.yaml and documentation.md
  • Loading branch information
cultpony authored Jan 22, 2024
1 parent 82484a2 commit 5fce801
Show file tree
Hide file tree
Showing 26 changed files with 9,127 additions and 194 deletions.
29 changes: 29 additions & 0 deletions .chloggen/pgcol-add-schema-attribute.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: breaking

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: postgresqlreceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: add schema attribute to postgresqlreceiver

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [ 29559 ]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: |
Adds a new resource attribute to the PSQL receiver to store the schema of the table or index
Existing table attributes are adjusted to not include the schema, which was inconsistently used
# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [ user ]
38 changes: 22 additions & 16 deletions receiver/postgresqlreceiver/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,7 @@ func (c *postgreSQLClient) getDatabaseSize(ctx context.Context, databases []stri
// tableStats contains a result for a row of the getDatabaseTableMetrics result
type tableStats struct {
database string
schema string
table string
live int64
dead int64
Expand All @@ -277,7 +278,7 @@ type tableStats struct {
}

func (c *postgreSQLClient) getDatabaseTableMetrics(ctx context.Context, db string) (map[tableIdentifier]tableStats, error) {
query := `SELECT schemaname || '.' || relname AS table,
query := `SELECT schemaname as schema, relname AS table,
n_live_tup AS live,
n_dead_tup AS dead,
n_tup_ins AS ins,
Expand All @@ -296,15 +297,16 @@ func (c *postgreSQLClient) getDatabaseTableMetrics(ctx context.Context, db strin
return nil, err
}
for rows.Next() {
var table string
var schema, table string
var live, dead, ins, upd, del, hotUpd, seqScans, tableSize, vacuumCount int64
err = rows.Scan(&table, &live, &dead, &ins, &upd, &del, &hotUpd, &seqScans, &tableSize, &vacuumCount)
err = rows.Scan(&schema, &table, &live, &dead, &ins, &upd, &del, &hotUpd, &seqScans, &tableSize, &vacuumCount)
if err != nil {
errors = multierr.Append(errors, err)
continue
}
ts[tableKey(db, table)] = tableStats{
ts[tableKey(db, schema, table)] = tableStats{
database: db,
schema: schema,
table: table,
live: live,
inserts: ins,
Expand All @@ -321,6 +323,7 @@ func (c *postgreSQLClient) getDatabaseTableMetrics(ctx context.Context, db strin

type tableIOStats struct {
database string
schema string
table string
heapRead int64
heapHit int64
Expand All @@ -333,7 +336,7 @@ type tableIOStats struct {
}

func (c *postgreSQLClient) getBlocksReadByTable(ctx context.Context, db string) (map[tableIdentifier]tableIOStats, error) {
query := `SELECT schemaname || '.' || relname AS table,
query := `SELECT schemaname as schema, relname AS table,
coalesce(heap_blks_read, 0) AS heap_read,
coalesce(heap_blks_hit, 0) AS heap_hit,
coalesce(idx_blks_read, 0) AS idx_read,
Expand All @@ -351,15 +354,16 @@ func (c *postgreSQLClient) getBlocksReadByTable(ctx context.Context, db string)
return nil, err
}
for rows.Next() {
var table string
var schema, table string
var heapRead, heapHit, idxRead, idxHit, toastRead, toastHit, tidxRead, tidxHit int64
err = rows.Scan(&table, &heapRead, &heapHit, &idxRead, &idxHit, &toastRead, &toastHit, &tidxRead, &tidxHit)
err = rows.Scan(&schema, &table, &heapRead, &heapHit, &idxRead, &idxHit, &toastRead, &toastHit, &tidxRead, &tidxHit)
if err != nil {
errors = multierr.Append(errors, err)
continue
}
tios[tableKey(db, table)] = tableIOStats{
tios[tableKey(db, schema, table)] = tableIOStats{
database: db,
schema: schema,
table: table,
heapRead: heapRead,
heapHit: heapHit,
Expand All @@ -377,13 +381,14 @@ func (c *postgreSQLClient) getBlocksReadByTable(ctx context.Context, db string)
type indexStat struct {
index string
table string
schema string
database string
size int64
scans int64
}

func (c *postgreSQLClient) getIndexStats(ctx context.Context, database string) (map[indexIdentifer]indexStat, error) {
query := `SELECT relname, indexrelname,
query := `SELECT schemaname, relname, indexrelname,
pg_relation_size(indexrelid) AS index_size,
idx_scan
FROM pg_stat_user_indexes;`
Expand All @@ -399,17 +404,18 @@ func (c *postgreSQLClient) getIndexStats(ctx context.Context, database string) (
var errs []error
for rows.Next() {
var (
table, index string
schema, table, index string
indexSize, indexScans int64
)
err := rows.Scan(&table, &index, &indexSize, &indexScans)
err := rows.Scan(&schema, &table, &index, &indexSize, &indexScans)
if err != nil {
errs = append(errs, err)
continue
}
stats[indexKey(database, table, index)] = indexStat{
stats[indexKey(database, schema, table, index)] = indexStat{
index: index,
table: table,
schema: schema,
database: database,
size: indexSize,
scans: indexScans,
Expand Down Expand Up @@ -642,10 +648,10 @@ func filterQueryByDatabases(baseQuery string, databases []string, groupBy bool)
return baseQuery + ";"
}

func tableKey(database, table string) tableIdentifier {
return tableIdentifier(fmt.Sprintf("%s|%s", database, table))
func tableKey(database, schema, table string) tableIdentifier {
return tableIdentifier(fmt.Sprintf("%s|%s|%s", database, schema, table))
}

func indexKey(database, table, index string) indexIdentifer {
return indexIdentifer(fmt.Sprintf("%s|%s|%s", database, table, index))
func indexKey(database, schema, table, index string) indexIdentifer {
return indexIdentifer(fmt.Sprintf("%s|%s|%s|%s", database, schema, table, index))
}
3 changes: 2 additions & 1 deletion receiver/postgresqlreceiver/documentation.md
Original file line number Diff line number Diff line change
Expand Up @@ -317,4 +317,5 @@ This metric requires WAL to be enabled with at least one replica.
| ---- | ----------- | ------ | ------- |
| postgresql.database.name | The name of the database. | Any Str | true |
| postgresql.index.name | The name of the index on a table. | Any Str | true |
| postgresql.table.name | The schema name followed by the table name. | Any Str | true |
| postgresql.schema.name | The schema name. | Any Str | true |
| postgresql.table.name | The table name. | Any Str | true |
9 changes: 9 additions & 0 deletions receiver/postgresqlreceiver/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,27 @@ import (
"github.com/testcontainers/testcontainers-go/wait"
"go.opentelemetry.io/collector/component"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/common/testutil"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/scraperinttest"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/pmetrictest"
)

const postgresqlPort = "5432"

func TestIntegration(t *testing.T) {
defer testutil.SetFeatureGateForTest(t, separateSchemaAttrGate, false)()
t.Run("single_db", integrationTest("single_db", []string{"otel"}))
t.Run("multi_db", integrationTest("multi_db", []string{"otel", "otel2"}))
t.Run("all_db", integrationTest("all_db", []string{}))
}

func TestIntegrationWithSeparateSchemaAttr(t *testing.T) {
defer testutil.SetFeatureGateForTest(t, separateSchemaAttrGate, true)()
t.Run("single_db_schemaattr", integrationTest("single_db_schemaattr", []string{"otel"}))
t.Run("multi_db_schemaattr", integrationTest("multi_db_schemaattr", []string{"otel", "otel2"}))
t.Run("all_db_schemaattr", integrationTest("all_db_schemaattr", []string{}))
}

func integrationTest(name string, databases []string) func(*testing.T) {
expectedFile := filepath.Join("testdata", "integration", "expected_"+name+".yaml")
return scraperinttest.NewIntegrationTest(
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ all_set:
enabled: true
postgresql.index.name:
enabled: true
postgresql.schema.name:
enabled: true
postgresql.table.name:
enabled: true
none_set:
Expand Down Expand Up @@ -123,5 +125,7 @@ none_set:
enabled: false
postgresql.index.name:
enabled: false
postgresql.schema.name:
enabled: false
postgresql.table.name:
enabled: false
6 changes: 5 additions & 1 deletion receiver/postgresqlreceiver/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,12 @@ resource_attributes:
description: The name of the database.
enabled: true
type: string
postgresql.schema.name:
description: The schema name.
enabled: true
type: string
postgresql.table.name:
description: The schema name followed by the table name.
description: The table name.
enabled: true
type: string
postgresql.index.name:
Expand Down
Loading

0 comments on commit 5fce801

Please sign in to comment.