From 1280ee00a30b55d9e4b3495ccf37b76111324824 Mon Sep 17 00:00:00 2001 From: naman47vyas Date: Wed, 24 Jul 2024 16:43:42 +0530 Subject: [PATCH] client method for row ops from performance schema --- go.mod | 6 +- go.sum | 6 + receiver/mysqlreceiver/client.go | 59 ++++ receiver/mysqlreceiver/documentation.md | 32 +++ .../internal/metadata/generated_config.go | 16 ++ .../metadata/generated_config_test.go | 8 + .../internal/metadata/generated_metrics.go | 256 ++++++++++++++++++ .../metadata/generated_metrics_test.go | 72 +++++ .../internal/metadata/testdata/config.yaml | 16 ++ receiver/mysqlreceiver/metadata.yaml | 43 ++- receiver/mysqlreceiver/scraper.go | 30 +- 11 files changed, 541 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index 599ee7c32317..dcbcdc4b852c 100644 --- a/go.mod +++ b/go.mod @@ -8,7 +8,9 @@ module github.com/open-telemetry/opentelemetry-collector-contrib // For the OpenTelemetry Collector Contrib distribution specifically, see // https://github.com/open-telemetry/opentelemetry-collector-releases/tree/main/distributions/otelcol-contrib -go 1.21.0 +go 1.22.2 + +toolchain go1.22.5 require ( github.com/open-telemetry/opentelemetry-collector-contrib/connector/countconnector v0.102.0 @@ -511,6 +513,7 @@ require ( github.com/jpillora/backoff v1.0.0 // indirect github.com/json-iterator/go v1.1.12 // indirect github.com/julienschmidt/httprouter v1.3.0 // indirect + github.com/k0kubun/pp v3.0.1+incompatible // indirect github.com/kardianos/osext v0.0.0-20190222173326-2bc1f35cddc0 // indirect github.com/karrick/godirwalk v1.17.0 // indirect github.com/klauspost/compress v1.17.8 // indirect @@ -539,6 +542,7 @@ require ( github.com/mattn/go-sqlite3 v1.14.22 // indirect github.com/microsoft/ApplicationInsights-Go v0.4.4 // indirect github.com/microsoft/go-mssqldb v1.7.2 // indirect + github.com/middleware-labs/innoParser v0.0.0-20240508090457-8c2fa2246395 // indirect github.com/miekg/dns v1.1.58 // indirect github.com/mistifyio/go-zfs v2.1.2-0.20190413222219-f784269be439+incompatible // indirect github.com/mitchellh/copystructure v1.2.0 // indirect diff --git a/go.sum b/go.sum index 1ae022316b6b..48018aacd9ea 100644 --- a/go.sum +++ b/go.sum @@ -1729,6 +1729,10 @@ github.com/julienschmidt/httprouter v1.3.0 h1:U0609e9tgbseu3rBINet9P48AI/D3oJs4d github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM= github.com/jung-kurt/gofpdf v1.0.0/go.mod h1:7Id9E/uU8ce6rXgefFLlgrJj/GYY22cpxn+r32jIOes= github.com/jung-kurt/gofpdf v1.0.3-0.20190309125859-24315acbbda5/go.mod h1:7Id9E/uU8ce6rXgefFLlgrJj/GYY22cpxn+r32jIOes= +github.com/k0kubun/colorstring v0.0.0-20150214042306-9440f1994b88 h1:uC1QfSlInpQF+M0ao65imhwqKnz3Q2z/d8PWZRMQvDM= +github.com/k0kubun/colorstring v0.0.0-20150214042306-9440f1994b88/go.mod h1:3w7q1U84EfirKl04SVQ/s7nPm1ZPhiXd34z40TNz36k= +github.com/k0kubun/pp v3.0.1+incompatible h1:3tqvf7QgUnZ5tXO6pNAZlrvHgl6DvifjDrd9g2S9Z40= +github.com/k0kubun/pp v3.0.1+incompatible/go.mod h1:GWse8YhT0p8pT4ir3ZgBbfZild3tgzSScAn6HmfYukg= github.com/kardianos/osext v0.0.0-20190222173326-2bc1f35cddc0 h1:iQTw/8FWTuc7uiaSepXwyf3o52HaUYcV+Tu66S3F5GA= github.com/kardianos/osext v0.0.0-20190222173326-2bc1f35cddc0/go.mod h1:1NbS8ALrpOvjt0rHPNLyCIeMtbizbir8U//inJ+zuB8= github.com/karrick/godirwalk v1.17.0 h1:b4kY7nqDdioR/6qnbHQyDvmA17u5G1cZ6J+CZXwSWoI= @@ -1836,6 +1840,8 @@ github.com/microsoft/ApplicationInsights-Go v0.4.4 h1:G4+H9WNs6ygSCe6sUyxRc2U81T github.com/microsoft/ApplicationInsights-Go v0.4.4/go.mod h1:fKRUseBqkw6bDiXTs3ESTiU/4YTIHsQS4W3fP2ieF4U= 
 github.com/microsoft/go-mssqldb v1.7.2 h1:CHkFJiObW7ItKTJfHo1QX7QBBD1iV+mn1eOyRP3b/PA=
 github.com/microsoft/go-mssqldb v1.7.2/go.mod h1:kOvZKUdrhhFQmxLZqbwUV0rHkNkZpthMITIb2Ko1IoA=
+github.com/middleware-labs/innoParser v0.0.0-20240508090457-8c2fa2246395 h1:e66QAbgGATZ550Iu3uCTrGF+F2UCeP5QwhAvXzvavn0=
+github.com/middleware-labs/innoParser v0.0.0-20240508090457-8c2fa2246395/go.mod h1:K2Iq9MJAEQyQO+ZXQHraf1zxZgS+bRgv/D6p+ClJWRM=
 github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg=
 github.com/miekg/dns v1.1.26/go.mod h1:bPDLeHnStXmXAq1m/Ch/hvfNHr14JKNPMBo3VZKjuso=
 github.com/miekg/dns v1.1.41/go.mod h1:p6aan82bvRIyn+zDIv9xYNUpwa73JcSh9BKwknJysuI=
diff --git a/receiver/mysqlreceiver/client.go b/receiver/mysqlreceiver/client.go
index 845c279aeb10..b0c3d6c7ba44 100644
--- a/receiver/mysqlreceiver/client.go
+++ b/receiver/mysqlreceiver/client.go
@@ -29,6 +29,7 @@ type client interface {
 	getInnodbStatusStats() (map[string]int64, error, int)
 	getTotalRows() ([]NRows, error)
 	getTotalErrors() (int64, error)
+	getRowOperationStats() (RowOperationStats, error)
 	Close() error
 }
 
@@ -40,6 +41,12 @@ type mySQLClient struct {
 	statementEventsTimeLimit time.Duration
 }
 
+type RowOperationStats struct {
+	rowsRead     int64
+	rowsUpdated  int64
+	rowsDeleted  int64
+	rowsInserted int64
+}
 type IoWaitsStats struct {
 	schema string
 	name   string
@@ -233,6 +240,58 @@ func (c *mySQLClient) getVersion() (string, error) {
 	return version, nil
 }
 
+func (c *mySQLClient) getRowOperationStats() (RowOperationStats, error) {
+	query := "SELECT SUBSTRING_INDEX(DIGEST_TEXT, ' ', 1) AS statement_type, " +
+		"SUM(SUM_ROWS_AFFECTED) AS rows_affected, " +
+		"SUM(SUM_ROWS_SENT) AS rows_sent " +
+		"FROM performance_schema.events_statements_summary_by_digest " +
+		"WHERE DIGEST_TEXT LIKE 'SELECT%' " +
+		"OR DIGEST_TEXT LIKE 'INSERT%' " +
+		"OR DIGEST_TEXT LIKE 'UPDATE%' " +
+		"OR DIGEST_TEXT LIKE 'DELETE%' " +
+		"GROUP BY statement_type;"
+
+	rows, err := c.client.Query(query)
+	rowOpsStats := new(RowOperationStats)
+	/*
+		+----------------+---------------+-----------+
+		| statement_type | rows_affected | rows_sent |
+		+----------------+---------------+-----------+
+		| SELECT         |             0 |    246533 |
+		| UPDATE         |           395 |         0 |
+		| DELETE         |             0 |         0 |
+		| INSERT         |           400 |         0 |
+		+----------------+---------------+-----------+
+	*/
+	if err != nil {
+		return *rowOpsStats, err
+	}
+
+	defer rows.Close()
+
+	for rows.Next() {
+		var rowsAffected int64
+		var rowsSent int64
+		var statementType string
+		err := rows.Scan(&statementType, &rowsAffected, &rowsSent)
+
+		if err != nil {
+			return *rowOpsStats, err
+		}
+
+		if statementType == "SELECT" {
+			rowOpsStats.rowsRead = rowsSent
+		} else if statementType == "UPDATE" {
+			rowOpsStats.rowsUpdated = rowsAffected
+		} else if statementType == "DELETE" {
+			rowOpsStats.rowsDeleted = rowsAffected
+		} else if statementType == "INSERT" {
+			rowOpsStats.rowsInserted = rowsAffected
+		}
+	}
+	return *rowOpsStats, nil
+}
+
 // getGlobalStats queries the db for global status metrics.
func (c *mySQLClient) getGlobalStats() (map[string]string, error) { q := "SHOW GLOBAL STATUS;" diff --git a/receiver/mysqlreceiver/documentation.md b/receiver/mysqlreceiver/documentation.md index c9785ecc5e3e..066b81aee7b9 100644 --- a/receiver/mysqlreceiver/documentation.md +++ b/receiver/mysqlreceiver/documentation.md @@ -286,6 +286,38 @@ The number of InnoDB page operations. | ---- | ----------- | ------ | | operation | The page operation types. | Str: ``created``, ``read``, ``written`` | +### mysql.performance.rows_deleted + +The number of rows deleted in the database as per the performance schema. + +| Unit | Metric Type | Value Type | Aggregation Temporality | Monotonic | +| ---- | ----------- | ---------- | ----------------------- | --------- | +| {row} | Sum | Int | Cumulative | true | + +### mysql.performance.rows_inserted + +The number of rows inserted in the database as per the performance schema. + +| Unit | Metric Type | Value Type | Aggregation Temporality | Monotonic | +| ---- | ----------- | ---------- | ----------------------- | --------- | +| {row} | Sum | Int | Cumulative | true | + +### mysql.performance.rows_read + +The number of rows read in the database as per the performance schema. + +| Unit | Metric Type | Value Type | Aggregation Temporality | Monotonic | +| ---- | ----------- | ---------- | ----------------------- | --------- | +| {row} | Sum | Int | Cumulative | true | + +### mysql.performance.rows_updated + +The number of rows updated in the database as per the performance schema. + +| Unit | Metric Type | Value Type | Aggregation Temporality | Monotonic | +| ---- | ----------- | ---------- | ----------------------- | --------- | +| {row} | Sum | Int | Cumulative | true | + ### mysql.prepared_statements The number of times each type of prepared statement command has been issued. 
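
The four mysql.performance.rows_* metrics documented above are all derived from one aggregate query against performance_schema.events_statements_summary_by_digest. Below is a minimal standalone sketch for cross-checking the values the receiver reports, assuming a locally reachable server and an account with SELECT on performance_schema; the DSN and output formatting are illustrative assumptions, not part of the receiver.

// rowops_check.go — throwaway verification sketch, not receiver code.
package main

import (
	"database/sql"
	"fmt"
	"log"

	_ "github.com/go-sql-driver/mysql" // MySQL driver also used by the receiver
)

func main() {
	// Assumed DSN; adjust credentials/host for your environment.
	db, err := sql.Open("mysql", "otel:otel@tcp(127.0.0.1:3306)/")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Same aggregation the client method performs: bucket statement digests by
	// their leading keyword and sum rows sent (reads) and rows affected (writes).
	const q = `SELECT SUBSTRING_INDEX(DIGEST_TEXT, ' ', 1) AS statement_type,
	                  SUM(SUM_ROWS_AFFECTED) AS rows_affected,
	                  SUM(SUM_ROWS_SENT)     AS rows_sent
	           FROM performance_schema.events_statements_summary_by_digest
	           WHERE DIGEST_TEXT LIKE 'SELECT%' OR DIGEST_TEXT LIKE 'INSERT%'
	              OR DIGEST_TEXT LIKE 'UPDATE%' OR DIGEST_TEXT LIKE 'DELETE%'
	           GROUP BY statement_type`

	rows, err := db.Query(q)
	if err != nil {
		log.Fatal(err)
	}
	defer rows.Close()

	for rows.Next() {
		var stmt string
		var affected, sent int64
		if err := rows.Scan(&stmt, &affected, &sent); err != nil {
			log.Fatal(err)
		}
		// SELECT maps to mysql.performance.rows_read via rows_sent;
		// INSERT/UPDATE/DELETE map to their metrics via rows_affected.
		fmt.Printf("%-6s rows_affected=%d rows_sent=%d\n", stmt, affected, sent)
	}
	if err := rows.Err(); err != nil {
		log.Fatal(err)
	}
}

Because events_statements_summary_by_digest accumulates counters until the server restarts or the table is truncated, the derived values behave as cumulative, monotonic sums, which matches the aggregation temporality declared for these metrics.
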
diff --git a/receiver/mysqlreceiver/internal/metadata/generated_config.go b/receiver/mysqlreceiver/internal/metadata/generated_config.go index 1bd198ae10db..198ed08a113f 100644 --- a/receiver/mysqlreceiver/internal/metadata/generated_config.go +++ b/receiver/mysqlreceiver/internal/metadata/generated_config.go @@ -54,6 +54,10 @@ type MetricsConfig struct { MysqlOpenedResources MetricConfig `mapstructure:"mysql.opened_resources"` MysqlOperations MetricConfig `mapstructure:"mysql.operations"` MysqlPageOperations MetricConfig `mapstructure:"mysql.page_operations"` + MysqlPerformanceRowsDeleted MetricConfig `mapstructure:"mysql.performance.rows_deleted"` + MysqlPerformanceRowsInserted MetricConfig `mapstructure:"mysql.performance.rows_inserted"` + MysqlPerformanceRowsRead MetricConfig `mapstructure:"mysql.performance.rows_read"` + MysqlPerformanceRowsUpdated MetricConfig `mapstructure:"mysql.performance.rows_updated"` MysqlPreparedStatements MetricConfig `mapstructure:"mysql.prepared_statements"` MysqlQueryClientCount MetricConfig `mapstructure:"mysql.query.client.count"` MysqlQueryCount MetricConfig `mapstructure:"mysql.query.count"` @@ -164,6 +168,18 @@ func DefaultMetricsConfig() MetricsConfig { MysqlPageOperations: MetricConfig{ Enabled: true, }, + MysqlPerformanceRowsDeleted: MetricConfig{ + Enabled: true, + }, + MysqlPerformanceRowsInserted: MetricConfig{ + Enabled: true, + }, + MysqlPerformanceRowsRead: MetricConfig{ + Enabled: true, + }, + MysqlPerformanceRowsUpdated: MetricConfig{ + Enabled: true, + }, MysqlPreparedStatements: MetricConfig{ Enabled: true, }, diff --git a/receiver/mysqlreceiver/internal/metadata/generated_config_test.go b/receiver/mysqlreceiver/internal/metadata/generated_config_test.go index 26c0901d22ed..9983e6e2da12 100644 --- a/receiver/mysqlreceiver/internal/metadata/generated_config_test.go +++ b/receiver/mysqlreceiver/internal/metadata/generated_config_test.go @@ -51,6 +51,10 @@ func TestMetricsBuilderConfig(t *testing.T) { MysqlOpenedResources: MetricConfig{Enabled: true}, MysqlOperations: MetricConfig{Enabled: true}, MysqlPageOperations: MetricConfig{Enabled: true}, + MysqlPerformanceRowsDeleted: MetricConfig{Enabled: true}, + MysqlPerformanceRowsInserted: MetricConfig{Enabled: true}, + MysqlPerformanceRowsRead: MetricConfig{Enabled: true}, + MysqlPerformanceRowsUpdated: MetricConfig{Enabled: true}, MysqlPreparedStatements: MetricConfig{Enabled: true}, MysqlQueryClientCount: MetricConfig{Enabled: true}, MysqlQueryCount: MetricConfig{Enabled: true}, @@ -116,6 +120,10 @@ func TestMetricsBuilderConfig(t *testing.T) { MysqlOpenedResources: MetricConfig{Enabled: false}, MysqlOperations: MetricConfig{Enabled: false}, MysqlPageOperations: MetricConfig{Enabled: false}, + MysqlPerformanceRowsDeleted: MetricConfig{Enabled: false}, + MysqlPerformanceRowsInserted: MetricConfig{Enabled: false}, + MysqlPerformanceRowsRead: MetricConfig{Enabled: false}, + MysqlPerformanceRowsUpdated: MetricConfig{Enabled: false}, MysqlPreparedStatements: MetricConfig{Enabled: false}, MysqlQueryClientCount: MetricConfig{Enabled: false}, MysqlQueryCount: MetricConfig{Enabled: false}, diff --git a/receiver/mysqlreceiver/internal/metadata/generated_metrics.go b/receiver/mysqlreceiver/internal/metadata/generated_metrics.go index 286c0f508d60..7920587c0fc8 100644 --- a/receiver/mysqlreceiver/internal/metadata/generated_metrics.go +++ b/receiver/mysqlreceiver/internal/metadata/generated_metrics.go @@ -2380,6 +2380,210 @@ func newMetricMysqlPageOperations(cfg MetricConfig) metricMysqlPageOperations { 
return m } +type metricMysqlPerformanceRowsDeleted struct { + data pmetric.Metric // data buffer for generated metric. + config MetricConfig // metric config provided by user. + capacity int // max observed number of data points added to the metric. +} + +// init fills mysql.performance.rows_deleted metric with initial data. +func (m *metricMysqlPerformanceRowsDeleted) init() { + m.data.SetName("mysql.performance.rows_deleted") + m.data.SetDescription("The number of rows deleted in the database as per the performance schema.") + m.data.SetUnit("{row}") + m.data.SetEmptySum() + m.data.Sum().SetIsMonotonic(true) + m.data.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative) +} + +func (m *metricMysqlPerformanceRowsDeleted) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64) { + if !m.config.Enabled { + return + } + dp := m.data.Sum().DataPoints().AppendEmpty() + dp.SetStartTimestamp(start) + dp.SetTimestamp(ts) + dp.SetIntValue(val) +} + +// updateCapacity saves max length of data point slices that will be used for the slice capacity. +func (m *metricMysqlPerformanceRowsDeleted) updateCapacity() { + if m.data.Sum().DataPoints().Len() > m.capacity { + m.capacity = m.data.Sum().DataPoints().Len() + } +} + +// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points. +func (m *metricMysqlPerformanceRowsDeleted) emit(metrics pmetric.MetricSlice) { + if m.config.Enabled && m.data.Sum().DataPoints().Len() > 0 { + m.updateCapacity() + m.data.MoveTo(metrics.AppendEmpty()) + m.init() + } +} + +func newMetricMysqlPerformanceRowsDeleted(cfg MetricConfig) metricMysqlPerformanceRowsDeleted { + m := metricMysqlPerformanceRowsDeleted{config: cfg} + if cfg.Enabled { + m.data = pmetric.NewMetric() + m.init() + } + return m +} + +type metricMysqlPerformanceRowsInserted struct { + data pmetric.Metric // data buffer for generated metric. + config MetricConfig // metric config provided by user. + capacity int // max observed number of data points added to the metric. +} + +// init fills mysql.performance.rows_inserted metric with initial data. +func (m *metricMysqlPerformanceRowsInserted) init() { + m.data.SetName("mysql.performance.rows_inserted") + m.data.SetDescription("The number of rows inserted in the database as per the performance schema.") + m.data.SetUnit("{row}") + m.data.SetEmptySum() + m.data.Sum().SetIsMonotonic(true) + m.data.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative) +} + +func (m *metricMysqlPerformanceRowsInserted) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64) { + if !m.config.Enabled { + return + } + dp := m.data.Sum().DataPoints().AppendEmpty() + dp.SetStartTimestamp(start) + dp.SetTimestamp(ts) + dp.SetIntValue(val) +} + +// updateCapacity saves max length of data point slices that will be used for the slice capacity. +func (m *metricMysqlPerformanceRowsInserted) updateCapacity() { + if m.data.Sum().DataPoints().Len() > m.capacity { + m.capacity = m.data.Sum().DataPoints().Len() + } +} + +// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points. 
+func (m *metricMysqlPerformanceRowsInserted) emit(metrics pmetric.MetricSlice) { + if m.config.Enabled && m.data.Sum().DataPoints().Len() > 0 { + m.updateCapacity() + m.data.MoveTo(metrics.AppendEmpty()) + m.init() + } +} + +func newMetricMysqlPerformanceRowsInserted(cfg MetricConfig) metricMysqlPerformanceRowsInserted { + m := metricMysqlPerformanceRowsInserted{config: cfg} + if cfg.Enabled { + m.data = pmetric.NewMetric() + m.init() + } + return m +} + +type metricMysqlPerformanceRowsRead struct { + data pmetric.Metric // data buffer for generated metric. + config MetricConfig // metric config provided by user. + capacity int // max observed number of data points added to the metric. +} + +// init fills mysql.performance.rows_read metric with initial data. +func (m *metricMysqlPerformanceRowsRead) init() { + m.data.SetName("mysql.performance.rows_read") + m.data.SetDescription("The number of rows read in the database as per the performance schema.") + m.data.SetUnit("{row}") + m.data.SetEmptySum() + m.data.Sum().SetIsMonotonic(true) + m.data.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative) +} + +func (m *metricMysqlPerformanceRowsRead) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64) { + if !m.config.Enabled { + return + } + dp := m.data.Sum().DataPoints().AppendEmpty() + dp.SetStartTimestamp(start) + dp.SetTimestamp(ts) + dp.SetIntValue(val) +} + +// updateCapacity saves max length of data point slices that will be used for the slice capacity. +func (m *metricMysqlPerformanceRowsRead) updateCapacity() { + if m.data.Sum().DataPoints().Len() > m.capacity { + m.capacity = m.data.Sum().DataPoints().Len() + } +} + +// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points. +func (m *metricMysqlPerformanceRowsRead) emit(metrics pmetric.MetricSlice) { + if m.config.Enabled && m.data.Sum().DataPoints().Len() > 0 { + m.updateCapacity() + m.data.MoveTo(metrics.AppendEmpty()) + m.init() + } +} + +func newMetricMysqlPerformanceRowsRead(cfg MetricConfig) metricMysqlPerformanceRowsRead { + m := metricMysqlPerformanceRowsRead{config: cfg} + if cfg.Enabled { + m.data = pmetric.NewMetric() + m.init() + } + return m +} + +type metricMysqlPerformanceRowsUpdated struct { + data pmetric.Metric // data buffer for generated metric. + config MetricConfig // metric config provided by user. + capacity int // max observed number of data points added to the metric. +} + +// init fills mysql.performance.rows_updated metric with initial data. +func (m *metricMysqlPerformanceRowsUpdated) init() { + m.data.SetName("mysql.performance.rows_updated") + m.data.SetDescription("The number of rows updated in the database as per the performance schema.") + m.data.SetUnit("{row}") + m.data.SetEmptySum() + m.data.Sum().SetIsMonotonic(true) + m.data.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative) +} + +func (m *metricMysqlPerformanceRowsUpdated) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64) { + if !m.config.Enabled { + return + } + dp := m.data.Sum().DataPoints().AppendEmpty() + dp.SetStartTimestamp(start) + dp.SetTimestamp(ts) + dp.SetIntValue(val) +} + +// updateCapacity saves max length of data point slices that will be used for the slice capacity. 
+func (m *metricMysqlPerformanceRowsUpdated) updateCapacity() { + if m.data.Sum().DataPoints().Len() > m.capacity { + m.capacity = m.data.Sum().DataPoints().Len() + } +} + +// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points. +func (m *metricMysqlPerformanceRowsUpdated) emit(metrics pmetric.MetricSlice) { + if m.config.Enabled && m.data.Sum().DataPoints().Len() > 0 { + m.updateCapacity() + m.data.MoveTo(metrics.AppendEmpty()) + m.init() + } +} + +func newMetricMysqlPerformanceRowsUpdated(cfg MetricConfig) metricMysqlPerformanceRowsUpdated { + m := metricMysqlPerformanceRowsUpdated{config: cfg} + if cfg.Enabled { + m.data = pmetric.NewMetric() + m.init() + } + return m +} + type metricMysqlPreparedStatements struct { data pmetric.Metric // data buffer for generated metric. config MetricConfig // metric config provided by user. @@ -3909,6 +4113,10 @@ type MetricsBuilder struct { metricMysqlOpenedResources metricMysqlOpenedResources metricMysqlOperations metricMysqlOperations metricMysqlPageOperations metricMysqlPageOperations + metricMysqlPerformanceRowsDeleted metricMysqlPerformanceRowsDeleted + metricMysqlPerformanceRowsInserted metricMysqlPerformanceRowsInserted + metricMysqlPerformanceRowsRead metricMysqlPerformanceRowsRead + metricMysqlPerformanceRowsUpdated metricMysqlPerformanceRowsUpdated metricMysqlPreparedStatements metricMysqlPreparedStatements metricMysqlQueryClientCount metricMysqlQueryClientCount metricMysqlQueryCount metricMysqlQueryCount @@ -3981,6 +4189,10 @@ func NewMetricsBuilder(mbc MetricsBuilderConfig, settings receiver.Settings, opt metricMysqlOpenedResources: newMetricMysqlOpenedResources(mbc.Metrics.MysqlOpenedResources), metricMysqlOperations: newMetricMysqlOperations(mbc.Metrics.MysqlOperations), metricMysqlPageOperations: newMetricMysqlPageOperations(mbc.Metrics.MysqlPageOperations), + metricMysqlPerformanceRowsDeleted: newMetricMysqlPerformanceRowsDeleted(mbc.Metrics.MysqlPerformanceRowsDeleted), + metricMysqlPerformanceRowsInserted: newMetricMysqlPerformanceRowsInserted(mbc.Metrics.MysqlPerformanceRowsInserted), + metricMysqlPerformanceRowsRead: newMetricMysqlPerformanceRowsRead(mbc.Metrics.MysqlPerformanceRowsRead), + metricMysqlPerformanceRowsUpdated: newMetricMysqlPerformanceRowsUpdated(mbc.Metrics.MysqlPerformanceRowsUpdated), metricMysqlPreparedStatements: newMetricMysqlPreparedStatements(mbc.Metrics.MysqlPreparedStatements), metricMysqlQueryClientCount: newMetricMysqlQueryClientCount(mbc.Metrics.MysqlQueryClientCount), metricMysqlQueryCount: newMetricMysqlQueryCount(mbc.Metrics.MysqlQueryCount), @@ -4111,6 +4323,10 @@ func (mb *MetricsBuilder) EmitForResource(rmo ...ResourceMetricsOption) { mb.metricMysqlOpenedResources.emit(ils.Metrics()) mb.metricMysqlOperations.emit(ils.Metrics()) mb.metricMysqlPageOperations.emit(ils.Metrics()) + mb.metricMysqlPerformanceRowsDeleted.emit(ils.Metrics()) + mb.metricMysqlPerformanceRowsInserted.emit(ils.Metrics()) + mb.metricMysqlPerformanceRowsRead.emit(ils.Metrics()) + mb.metricMysqlPerformanceRowsUpdated.emit(ils.Metrics()) mb.metricMysqlPreparedStatements.emit(ils.Metrics()) mb.metricMysqlQueryClientCount.emit(ils.Metrics()) mb.metricMysqlQueryCount.emit(ils.Metrics()) @@ -4410,6 +4626,46 @@ func (mb *MetricsBuilder) RecordMysqlPageOperationsDataPoint(ts pcommon.Timestam return nil } +// RecordMysqlPerformanceRowsDeletedDataPoint adds a data point to mysql.performance.rows_deleted metric. 
+func (mb *MetricsBuilder) RecordMysqlPerformanceRowsDeletedDataPoint(ts pcommon.Timestamp, inputVal string) error { + val, err := strconv.ParseInt(inputVal, 10, 64) + if err != nil { + return fmt.Errorf("failed to parse int64 for MysqlPerformanceRowsDeleted, value was %s: %w", inputVal, err) + } + mb.metricMysqlPerformanceRowsDeleted.recordDataPoint(mb.startTime, ts, val) + return nil +} + +// RecordMysqlPerformanceRowsInsertedDataPoint adds a data point to mysql.performance.rows_inserted metric. +func (mb *MetricsBuilder) RecordMysqlPerformanceRowsInsertedDataPoint(ts pcommon.Timestamp, inputVal string) error { + val, err := strconv.ParseInt(inputVal, 10, 64) + if err != nil { + return fmt.Errorf("failed to parse int64 for MysqlPerformanceRowsInserted, value was %s: %w", inputVal, err) + } + mb.metricMysqlPerformanceRowsInserted.recordDataPoint(mb.startTime, ts, val) + return nil +} + +// RecordMysqlPerformanceRowsReadDataPoint adds a data point to mysql.performance.rows_read metric. +func (mb *MetricsBuilder) RecordMysqlPerformanceRowsReadDataPoint(ts pcommon.Timestamp, inputVal string) error { + val, err := strconv.ParseInt(inputVal, 10, 64) + if err != nil { + return fmt.Errorf("failed to parse int64 for MysqlPerformanceRowsRead, value was %s: %w", inputVal, err) + } + mb.metricMysqlPerformanceRowsRead.recordDataPoint(mb.startTime, ts, val) + return nil +} + +// RecordMysqlPerformanceRowsUpdatedDataPoint adds a data point to mysql.performance.rows_updated metric. +func (mb *MetricsBuilder) RecordMysqlPerformanceRowsUpdatedDataPoint(ts pcommon.Timestamp, inputVal string) error { + val, err := strconv.ParseInt(inputVal, 10, 64) + if err != nil { + return fmt.Errorf("failed to parse int64 for MysqlPerformanceRowsUpdated, value was %s: %w", inputVal, err) + } + mb.metricMysqlPerformanceRowsUpdated.recordDataPoint(mb.startTime, ts, val) + return nil +} + // RecordMysqlPreparedStatementsDataPoint adds a data point to mysql.prepared_statements metric. 
func (mb *MetricsBuilder) RecordMysqlPreparedStatementsDataPoint(ts pcommon.Timestamp, inputVal string, preparedStatementsCommandAttributeValue AttributePreparedStatementsCommand) error { val, err := strconv.ParseInt(inputVal, 10, 64) diff --git a/receiver/mysqlreceiver/internal/metadata/generated_metrics_test.go b/receiver/mysqlreceiver/internal/metadata/generated_metrics_test.go index 38488180fb65..00217e2a07b2 100644 --- a/receiver/mysqlreceiver/internal/metadata/generated_metrics_test.go +++ b/receiver/mysqlreceiver/internal/metadata/generated_metrics_test.go @@ -168,6 +168,22 @@ func TestMetricsBuilder(t *testing.T) { allMetricsCount++ mb.RecordMysqlPageOperationsDataPoint(ts, "1", AttributePageOperationsCreated) + defaultMetricsCount++ + allMetricsCount++ + mb.RecordMysqlPerformanceRowsDeletedDataPoint(ts, "1") + + defaultMetricsCount++ + allMetricsCount++ + mb.RecordMysqlPerformanceRowsInsertedDataPoint(ts, "1") + + defaultMetricsCount++ + allMetricsCount++ + mb.RecordMysqlPerformanceRowsReadDataPoint(ts, "1") + + defaultMetricsCount++ + allMetricsCount++ + mb.RecordMysqlPerformanceRowsUpdatedDataPoint(ts, "1") + defaultMetricsCount++ allMetricsCount++ mb.RecordMysqlPreparedStatementsDataPoint(ts, "1", AttributePreparedStatementsCommandExecute) @@ -733,6 +749,62 @@ func TestMetricsBuilder(t *testing.T) { attrVal, ok := dp.Attributes().Get("operation") assert.True(t, ok) assert.EqualValues(t, "created", attrVal.Str()) + case "mysql.performance.rows_deleted": + assert.False(t, validatedMetrics["mysql.performance.rows_deleted"], "Found a duplicate in the metrics slice: mysql.performance.rows_deleted") + validatedMetrics["mysql.performance.rows_deleted"] = true + assert.Equal(t, pmetric.MetricTypeSum, ms.At(i).Type()) + assert.Equal(t, 1, ms.At(i).Sum().DataPoints().Len()) + assert.Equal(t, "The number of rows deleted in the database as per the performance schema.", ms.At(i).Description()) + assert.Equal(t, "{row}", ms.At(i).Unit()) + assert.Equal(t, true, ms.At(i).Sum().IsMonotonic()) + assert.Equal(t, pmetric.AggregationTemporalityCumulative, ms.At(i).Sum().AggregationTemporality()) + dp := ms.At(i).Sum().DataPoints().At(0) + assert.Equal(t, start, dp.StartTimestamp()) + assert.Equal(t, ts, dp.Timestamp()) + assert.Equal(t, pmetric.NumberDataPointValueTypeInt, dp.ValueType()) + assert.Equal(t, int64(1), dp.IntValue()) + case "mysql.performance.rows_inserted": + assert.False(t, validatedMetrics["mysql.performance.rows_inserted"], "Found a duplicate in the metrics slice: mysql.performance.rows_inserted") + validatedMetrics["mysql.performance.rows_inserted"] = true + assert.Equal(t, pmetric.MetricTypeSum, ms.At(i).Type()) + assert.Equal(t, 1, ms.At(i).Sum().DataPoints().Len()) + assert.Equal(t, "The number of rows inserted in the database as per the performance schema.", ms.At(i).Description()) + assert.Equal(t, "{row}", ms.At(i).Unit()) + assert.Equal(t, true, ms.At(i).Sum().IsMonotonic()) + assert.Equal(t, pmetric.AggregationTemporalityCumulative, ms.At(i).Sum().AggregationTemporality()) + dp := ms.At(i).Sum().DataPoints().At(0) + assert.Equal(t, start, dp.StartTimestamp()) + assert.Equal(t, ts, dp.Timestamp()) + assert.Equal(t, pmetric.NumberDataPointValueTypeInt, dp.ValueType()) + assert.Equal(t, int64(1), dp.IntValue()) + case "mysql.performance.rows_read": + assert.False(t, validatedMetrics["mysql.performance.rows_read"], "Found a duplicate in the metrics slice: mysql.performance.rows_read") + validatedMetrics["mysql.performance.rows_read"] = true + assert.Equal(t, 
pmetric.MetricTypeSum, ms.At(i).Type())
+				assert.Equal(t, 1, ms.At(i).Sum().DataPoints().Len())
+				assert.Equal(t, "The number of rows read in the database as per the performance schema.", ms.At(i).Description())
+				assert.Equal(t, "{row}", ms.At(i).Unit())
+				assert.Equal(t, true, ms.At(i).Sum().IsMonotonic())
+				assert.Equal(t, pmetric.AggregationTemporalityCumulative, ms.At(i).Sum().AggregationTemporality())
+				dp := ms.At(i).Sum().DataPoints().At(0)
+				assert.Equal(t, start, dp.StartTimestamp())
+				assert.Equal(t, ts, dp.Timestamp())
+				assert.Equal(t, pmetric.NumberDataPointValueTypeInt, dp.ValueType())
+				assert.Equal(t, int64(1), dp.IntValue())
+			case "mysql.performance.rows_updated":
+				assert.False(t, validatedMetrics["mysql.performance.rows_updated"], "Found a duplicate in the metrics slice: mysql.performance.rows_updated")
+				validatedMetrics["mysql.performance.rows_updated"] = true
+				assert.Equal(t, pmetric.MetricTypeSum, ms.At(i).Type())
+				assert.Equal(t, 1, ms.At(i).Sum().DataPoints().Len())
+				assert.Equal(t, "The number of rows updated in the database as per the performance schema.", ms.At(i).Description())
+				assert.Equal(t, "{row}", ms.At(i).Unit())
+				assert.Equal(t, true, ms.At(i).Sum().IsMonotonic())
+				assert.Equal(t, pmetric.AggregationTemporalityCumulative, ms.At(i).Sum().AggregationTemporality())
+				dp := ms.At(i).Sum().DataPoints().At(0)
+				assert.Equal(t, start, dp.StartTimestamp())
+				assert.Equal(t, ts, dp.Timestamp())
+				assert.Equal(t, pmetric.NumberDataPointValueTypeInt, dp.ValueType())
+				assert.Equal(t, int64(1), dp.IntValue())
 			case "mysql.prepared_statements":
 				assert.False(t, validatedMetrics["mysql.prepared_statements"], "Found a duplicate in the metrics slice: mysql.prepared_statements")
 				validatedMetrics["mysql.prepared_statements"] = true
diff --git a/receiver/mysqlreceiver/internal/metadata/testdata/config.yaml b/receiver/mysqlreceiver/internal/metadata/testdata/config.yaml
index 16c9239376f5..32ff583cf7fc 100644
--- a/receiver/mysqlreceiver/internal/metadata/testdata/config.yaml
+++ b/receiver/mysqlreceiver/internal/metadata/testdata/config.yaml
@@ -53,6 +53,14 @@ all_set:
       enabled: true
     mysql.page_operations:
       enabled: true
+    mysql.performance.rows_deleted:
+      enabled: true
+    mysql.performance.rows_inserted:
+      enabled: true
+    mysql.performance.rows_read:
+      enabled: true
+    mysql.performance.rows_updated:
+      enabled: true
     mysql.prepared_statements:
       enabled: true
     mysql.query.client.count:
@@ -168,6 +176,14 @@ none_set:
       enabled: false
     mysql.page_operations:
       enabled: false
+    mysql.performance.rows_deleted:
+      enabled: false
+    mysql.performance.rows_inserted:
+      enabled: false
+    mysql.performance.rows_read:
+      enabled: false
+    mysql.performance.rows_updated:
+      enabled: false
     mysql.prepared_statements:
       enabled: false
     mysql.query.client.count:
diff --git a/receiver/mysqlreceiver/metadata.yaml b/receiver/mysqlreceiver/metadata.yaml
index 1c20cf2bfd11..58b7e6ea4689 100644
--- a/receiver/mysqlreceiver/metadata.yaml
+++ b/receiver/mysqlreceiver/metadata.yaml
@@ -631,7 +631,7 @@ metrics:
       aggregation_temporality: cumulative
 
 
-  #DBM metrics 
+  #DBM metrics
   mysql.innodb.rows_deleted:
     enabled: true
     description: Rate at which rows are being deleted in InnoDB.
@@ -672,6 +672,47 @@ metrics:
       monotonic: false
       aggregation_temporality: cumulative
 
+  # Row operation metrics from the performance schema, since the InnoDB row stats have proven to be unreliable
+  mysql.performance.rows_inserted:
+    enabled: true
+    description: The number of rows inserted in the database as per the performance schema.
+    unit: '{row}'
+    sum:
+      value_type: int
+      input_type: string
+      monotonic: true
+      aggregation_temporality: cumulative
+
+  mysql.performance.rows_read:
+    enabled: true
+    description: The number of rows read in the database as per the performance schema.
+    unit: '{row}'
+    sum:
+      value_type: int
+      input_type: string
+      monotonic: true
+      aggregation_temporality: cumulative
+
+  mysql.performance.rows_updated:
+    enabled: true
+    description: The number of rows updated in the database as per the performance schema.
+    unit: '{row}'
+    sum:
+      value_type: int
+      input_type: string
+      monotonic: true
+      aggregation_temporality: cumulative
+
+  mysql.performance.rows_deleted:
+    enabled: true
+    description: The number of rows deleted in the database as per the performance schema.
+    unit: '{row}'
+    sum:
+      value_type: int
+      input_type: string
+      monotonic: true
+      aggregation_temporality: cumulative
+
   mysql.total_rows:
     enabled: true
     description: Total rows in the mysql db
diff --git a/receiver/mysqlreceiver/scraper.go b/receiver/mysqlreceiver/scraper.go
index 3ae9412b7e81..8b6b1fdd25a5 100644
--- a/receiver/mysqlreceiver/scraper.go
+++ b/receiver/mysqlreceiver/scraper.go
@@ -16,6 +16,7 @@ import (
 	"go.opentelemetry.io/collector/receiver/scrapererror"
 	"go.uber.org/zap"
 
+	"github.com/k0kubun/pp"
 	"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/mysqlreceiver/internal/metadata"
 )
 
@@ -95,7 +96,6 @@ func (m *mySQLScraper) scrape(context.Context) (pmetric.Metrics, error) {
 	m.scrapeIndexIoWaitsStats(now, errs)
 
 	// collect table size metrics.
-
 	m.scrapeTableStats(now, errs)
 
 	// collect performance event statements metrics.
@@ -114,6 +114,10 @@ func (m *mySQLScraper) scrape(context.Context) (pmetric.Metrics, error) {
 	// collect total errors
 	m.scrapeTotalErrors(now, errs)
 
+	// collect row operation stats from the performance schema, since the
+	// InnoDB row stats are sometimes unreliable
+	m.scrapeRowOperationStats(now, errs)
+
 	m.scraperInnodbMetricsForDBM(now, errs)
 
 	rb := m.mb.NewResourceBuilder()
@@ -130,6 +134,30 @@ func (m *mySQLScraper) scrape(context.Context) (pmetric.Metrics, error) {
 	return m.mb.Emit(), errs.Combine()
 }
 
+func (m *mySQLScraper) scrapeRowOperationStats(now pcommon.Timestamp, errs *scrapererror.ScrapeErrors) {
+	rowOperationStats, err := m.sqlclient.getRowOperationStats()
+	if err != nil {
+		pp.Print(err)
+		m.logger.Error("Failed to fetch row operation stats from performance schema", zap.Error(err))
+		errs.AddPartial(4, err)
+		return
+	}
+	rowsDeleted := strconv.FormatInt(rowOperationStats.rowsDeleted, 10)
+	rowsInserted := strconv.FormatInt(rowOperationStats.rowsInserted, 10)
+	rowsUpdated := strconv.FormatInt(rowOperationStats.rowsUpdated, 10)
+	rowsRead := strconv.FormatInt(rowOperationStats.rowsRead, 10)
+
+	pp.Println(rowsDeleted)
+	pp.Println(rowsInserted)
+	pp.Println(rowsUpdated)
+	pp.Println(rowsRead)
+
+	m.mb.RecordMysqlPerformanceRowsDeletedDataPoint(now, rowsDeleted)
+	m.mb.RecordMysqlPerformanceRowsInsertedDataPoint(now, rowsInserted)
+	m.mb.RecordMysqlPerformanceRowsUpdatedDataPoint(now, rowsUpdated)
+	m.mb.RecordMysqlPerformanceRowsReadDataPoint(now, rowsRead)
+}
+
 func (m *mySQLScraper) scrapeGlobalStats(now pcommon.Timestamp, errs *scrapererror.ScrapeErrors) {
 	globalStats, err := m.sqlclient.getGlobalStats()
 	if err != nil {