From b812cba6e0f1095b49570815be430e910002a989 Mon Sep 17 00:00:00 2001 From: zinefer Date: Wed, 9 May 2018 10:22:00 -0600 Subject: [PATCH] create statements metricset for the metricbeat postgresql module --- .../postgresql/statements/_meta/data.json | 54 +++++++++++ .../postgresql/statements/_meta/docs.asciidoc | 1 + .../postgresql/statements/_meta/fields.yml | 91 ++++++++++++++++++ .../module/postgresql/statements/data.go | 47 ++++++++++ .../postgresql/statements/statements.go | 57 +++++++++++ .../statements/statments_integration_test.go | 94 +++++++++++++++++++ 6 files changed, 344 insertions(+) create mode 100644 metricbeat/module/postgresql/statements/_meta/data.json create mode 100644 metricbeat/module/postgresql/statements/_meta/docs.asciidoc create mode 100644 metricbeat/module/postgresql/statements/_meta/fields.yml create mode 100644 metricbeat/module/postgresql/statements/data.go create mode 100644 metricbeat/module/postgresql/statements/statements.go create mode 100644 metricbeat/module/postgresql/statements/statments_integration_test.go diff --git a/metricbeat/module/postgresql/statements/_meta/data.json b/metricbeat/module/postgresql/statements/_meta/data.json new file mode 100644 index 000000000000..6b7a1359d7b2 --- /dev/null +++ b/metricbeat/module/postgresql/statements/_meta/data.json @@ -0,0 +1,54 @@ +{ + "@timestamp": "2017-10-12T08:05:34.853Z", + "beat": { + "hostname": "host.example.com", + "name": "host.example.com" + }, + "metricset": { + "host": "postgresql:5432", + "module": "postgresql", + "name": "statements", + "rtt": 115 + }, + "postgresql": { + "statements": { + "user": { + "id": 10 + }, + "database": { + "oid": "12407" + }, + "query": { + "id": "3240664890", + "text": "SELECT pg_sleep(?);", + "calls": "2", + "rows": "2", + "time": { + "total": 120066.497, + "min": 60029.533, + "max": 60036.964, + "mean": 60033.2485, + "stddev": 3.71549999999843 + }, + "memory": { + "shared": { + "hit": 0, + "read": 0, + "dirtied": 0, + "written": 0, + }, + "local": { + "hit": 0, + "read": 0, + "dirtied": 0, + "written": 0, + }, + "temp": { + "read": 0, + "written": 0, + } + } + } + } + } +} \ No newline at end of file diff --git a/metricbeat/module/postgresql/statements/_meta/docs.asciidoc b/metricbeat/module/postgresql/statements/_meta/docs.asciidoc new file mode 100644 index 000000000000..91b482808eaf --- /dev/null +++ b/metricbeat/module/postgresql/statements/_meta/docs.asciidoc @@ -0,0 +1 @@ +This is the `statements` metricset of the PostgreSQL module. diff --git a/metricbeat/module/postgresql/statements/_meta/fields.yml b/metricbeat/module/postgresql/statements/_meta/fields.yml new file mode 100644 index 000000000000..31c341fb279c --- /dev/null +++ b/metricbeat/module/postgresql/statements/_meta/fields.yml @@ -0,0 +1,91 @@ +- name: statements + type: group + description: > + One document per query per user per database, showing information related + invocation of that query, such as cpu usage and total time. Collected by + querying pg_stat_statements. + release: ga + fields: + - name: user.id + type: long + description: > + OID of the user logged into the backend that ran the query. + - name: database.oid + type: long + description: > + OID of the database the query was run on. + - name: query.id + type: long + description: > + ID of the statement. + - name: query.text + description: > + Query text + - name: query.calls + type: long + description: > + Number of times the query has been run. + - name: query.rows + type: long + description: > + Total number of rows returned by query. + - name: time.total + type: long + description: > + Total number of milliseconds spent running query. + - name: time.min + type: long + description: > + Minimum number of milliseconds spent running query. + - name: time.max + type: long + description: > + Maximum number of milliseconds spent running query. + - name: time.mean + type: long + description: > + Mean number of milliseconds spent running query. + - name: time.stddev + type: long + description: > + Population standard deviation of time spent running query, in milliseconds. + - name: memory.shared.hit + type: long + description: > + Total number of shared block cache hits by the query. + - name: memory.shared.read + type: long + description: > + Total number of shared block cache read by the query. + - name: memory.shared.dirtied + type: long + description: > + Total number of shared block cache dirtied by the query. + - name: memory.shared.written + type: long + description: > + Total number of shared block cache written by the query. + - name: memory.local.hit + type: long + description: > + Total number of local block cache hits by the query. + - name: memory.local.read + type: long + description: > + Total number of local block cache read by the query. + - name: memory.local.dirtied + type: long + description: > + Total number of local block cache dirtied by the query. + - name: memory.local.written + type: long + description: > + Total number of local block cache written by the query. + - name: memory.temp.read + type: long + description: > + Total number of temp block cache read by the query. + - name: memory.temp.written + type: long + description: > + Total number of temp block cache written by the query. \ No newline at end of file diff --git a/metricbeat/module/postgresql/statements/data.go b/metricbeat/module/postgresql/statements/data.go new file mode 100644 index 000000000000..89a4067a81ce --- /dev/null +++ b/metricbeat/module/postgresql/statements/data.go @@ -0,0 +1,47 @@ +package statements + +import ( + s "github.com/elastic/beats/libbeat/common/schema" + c "github.com/elastic/beats/libbeat/common/schema/mapstrstr" +) + +// Based on: https://www.postgresql.org/docs/9.6/static/pgstatstatements.html +var schema = s.Schema{ + "user": s.Object{ + "id": c.Int("userid"), + }, + "database": s.Object{ + "oid": c.Int("dbid"), + }, + "query": s.Object{ + "id": c.Str("queryid"), + "text": c.Str("query"), + "calls": c.Int("datid"), + "rows": c.Int("rows"), + "time": s.Object{ + "total": s.Object{"ms": c.Float("total_time")}, + "min": s.Object{"ms": c.Float("min_time")}, + "max": s.Object{"ms": c.Float("max_time")}, + "mean": s.Object{"ms": c.Float("mean_time")}, + "stddev": s.Object{"ms": c.Float("stddev_time")}, + }, + "memory": s.Object{ + "shared": s.Object{ + "hit": c.Int("shared_blks_hit"), + "read": c.Int("shared_blks_read"), + "dirtied": c.Int("shared_blks_dirtied"), + "written": c.Int("shared_blks_written"), + }, + "local": s.Object{ + "hit": c.Int("local_blks_hit"), + "read": c.Int("local_blks_read"), + "dirtied": c.Int("local_blks_dirtied"), + "written": c.Int("local_blks_written"), + }, + "temp": s.Object{ + "read": c.Int("temp_blks_read"), + "written": c.Int("temp_blks_written"), + }, + }, + }, +} diff --git a/metricbeat/module/postgresql/statements/statements.go b/metricbeat/module/postgresql/statements/statements.go new file mode 100644 index 000000000000..838c623df37a --- /dev/null +++ b/metricbeat/module/postgresql/statements/statements.go @@ -0,0 +1,57 @@ +package statements + +import ( + "database/sql" + + "github.com/pkg/errors" + + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/metricbeat/mb" + "github.com/elastic/beats/metricbeat/module/postgresql" + + // Register postgresql database/sql driver + _ "github.com/lib/pq" +) + +// init registers the MetricSet with the central registry. +// The New method will be called after the setup of the module and before starting to fetch data +func init() { + mb.Registry.MustAddMetricSet("postgresql", "statements", New, + mb.WithHostParser(postgresql.ParseURL), + mb.DefaultMetricSet(), + ) +} + +// MetricSet type defines all fields of the Postgresql MetricSet +type MetricSet struct { + mb.BaseMetricSet +} + +// New create a new instance of the MetricSet +// Part of new is also setting up the configuration by processing additional +// configuration entries if needed. +func New(base mb.BaseMetricSet) (mb.MetricSet, error) { + return &MetricSet{BaseMetricSet: base}, nil +} + +// Fetch implements the data gathering and data conversion to the right format. +func (m *MetricSet) Fetch() ([]common.MapStr, error) { + db, err := sql.Open("postgres", m.HostData().URI) + if err != nil { + return nil, err + } + defer db.Close() + + results, err := postgresql.QueryStats(db, "SELECT * FROM pg_stat_statements") + if err != nil { + return nil, errors.Wrap(err, "QueryStats") + } + + events := []common.MapStr{} + for _, result := range results { + data, _ := schema.Apply(result) + events = append(events, data) + } + + return events, nil +} diff --git a/metricbeat/module/postgresql/statements/statments_integration_test.go b/metricbeat/module/postgresql/statements/statments_integration_test.go new file mode 100644 index 000000000000..15e672477a18 --- /dev/null +++ b/metricbeat/module/postgresql/statements/statments_integration_test.go @@ -0,0 +1,94 @@ +// +build integration + +package statements + +import ( + "testing" + + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/tests/compose" + mbtest "github.com/elastic/beats/metricbeat/mb/testing" + "github.com/elastic/beats/metricbeat/module/postgresql" + + "github.com/stretchr/testify/assert" +) + +func TestFetch(t *testing.T) { + compose.EnsureUp(t, "postgresql") + + f := mbtest.NewEventsFetcher(t, getConfig()) + events, err := f.Fetch() + if !assert.NoError(t, err) { + t.FailNow() + } + + assert.True(t, len(events) > 0) + event := events[0] + + t.Logf("%s/%s event: %+v", f.Module().Name(), f.Name(), event) + + // Check event fields + assert.Contains(t, event, "user") + assert.Contains(t, event["user"].(common.MapStr), "id") + + assert.Contains(t, event, "database") + db_oid := event["database"].(common.MapStr)["oid"].(int64) + assert.True(t, db_oid > 0) + + assert.Contains(t, event, "query") + query := event["query"].(common.MapStr) + assert.Contains(t, query, "id") + assert.Contains(t, query, "text") + assert.Contains(t, query, "calls") + assert.Contains(t, query, "rows") + + assert.Contains(t, query, "time") + time := query["time"].(common.MapStr) + assert.Contains(t, time, "total") + assert.Contains(t, time, "min") + assert.Contains(t, time, "max") + assert.Contains(t, time, "mean") + assert.Contains(t, time, "stddev") + + assert.Contains(t, query["memory"], "shared") + memory := query["memory"].(common.MapStr) + + assert.Contains(t, memory, "shared") + shared := memory["shared"].(common.MapStr) + assert.Contains(t, shared, "hit") + assert.Contains(t, shared, "read") + assert.Contains(t, shared, "dirtied") + assert.Contains(t, shared, "written") + + assert.Contains(t, memory, "local") + local := memory["local"].(common.MapStr) + assert.Contains(t, local, "hit") + assert.Contains(t, local, "read") + assert.Contains(t, local, "dirtied") + assert.Contains(t, local, "written") + + assert.Contains(t, memory, "temp") + temp := memory["temp"].(common.MapStr) + assert.Contains(t, temp, "read") + assert.Contains(t, temp, "written") +} + +func TestData(t *testing.T) { + compose.EnsureUp(t, "postgresql") + f := mbtest.NewEventsFetcher(t, getConfig()) + + err := mbtest.WriteEvents(f, t) + if err != nil { + t.Fatal("write", err) + } +} + +func getConfig() map[string]interface{} { + return map[string]interface{}{ + "module": "postgresql", + "metricsets": []string{"statements"}, + "hosts": []string{postgresql.GetEnvDSN()}, + "username": postgresql.GetEnvUsername(), + "password": postgresql.GetEnvPassword(), + } +}