diff --git a/receiver/mysqlreceiver/client.go b/receiver/mysqlreceiver/client.go index 00f789c69476..76a06cfcf7bf 100644 --- a/receiver/mysqlreceiver/client.go +++ b/receiver/mysqlreceiver/client.go @@ -7,6 +7,7 @@ import ( "context" "database/sql" "fmt" + "strconv" "strings" "time" @@ -30,6 +31,7 @@ type client interface { getTotalRows() ([]NRows, error) getTotalErrors() (int64, error) getRowOperationStats() (RowOperationStats, error) + getActiveConnections() (int64, error) Close() error } @@ -570,6 +572,25 @@ func (c *mySQLClient) getTotalErrors() (int64, error) { return nerrors, nil } +func (c *mySQLClient) getActiveConnections() (int64, error) { + query := "SHOW STATUS WHERE `variable_name` = 'Threads_connected'" + + var varName string + var value string + + err := c.client.QueryRow(query).Scan(&varName, &value) + if err != nil { + return -1, fmt.Errorf("failed to scan active connections: %w", err) + } + + connections, err := strconv.ParseInt(value, 10, 64) + if err != nil { + return -1, fmt.Errorf("failed to parse active connections count: %w", err) + } + + return connections, nil +} + func (c *mySQLClient) getTableLockWaitEventStats() ([]tableLockWaitEventStats, error) { query := "SELECT OBJECT_SCHEMA, OBJECT_NAME, COUNT_READ_NORMAL, COUNT_READ_WITH_SHARED_LOCKS," + "COUNT_READ_HIGH_PRIORITY, COUNT_READ_NO_INSERT, COUNT_READ_EXTERNAL, COUNT_WRITE_ALLOW_WRITE," + diff --git a/receiver/mysqlreceiver/documentation.md b/receiver/mysqlreceiver/documentation.md index 066b81aee7b9..f99d3d9e1502 100644 --- a/receiver/mysqlreceiver/documentation.md +++ b/receiver/mysqlreceiver/documentation.md @@ -98,6 +98,14 @@ The number of times each type of command has been executed. | ---- | ----------- | ------ | | command | The command types. | Str: ``delete``, ``insert``, ``select``, ``update`` | +### mysql.connection.active.count + +The numner of active connections to the MySQL server + +| Unit | Metric Type | Value Type | +| ---- | ----------- | ---------- | +| 1 | Gauge | Int | + ### mysql.connection.count The number of connection attempts (successful or not) to the MySQL server. diff --git a/receiver/mysqlreceiver/internal/metadata/generated_config.go b/receiver/mysqlreceiver/internal/metadata/generated_config.go index 198ed08a113f..646393dd0fb3 100644 --- a/receiver/mysqlreceiver/internal/metadata/generated_config.go +++ b/receiver/mysqlreceiver/internal/metadata/generated_config.go @@ -36,6 +36,7 @@ type MetricsConfig struct { MysqlBufferPoolUsage MetricConfig `mapstructure:"mysql.buffer_pool.usage"` MysqlClientNetworkIo MetricConfig `mapstructure:"mysql.client.network.io"` MysqlCommands MetricConfig `mapstructure:"mysql.commands"` + MysqlConnectionActiveCount MetricConfig `mapstructure:"mysql.connection.active.count"` MysqlConnectionCount MetricConfig `mapstructure:"mysql.connection.count"` MysqlConnectionErrors MetricConfig `mapstructure:"mysql.connection.errors"` MysqlDoubleWrites MetricConfig `mapstructure:"mysql.double_writes"` @@ -114,6 +115,9 @@ func DefaultMetricsConfig() MetricsConfig { MysqlCommands: MetricConfig{ Enabled: true, }, + MysqlConnectionActiveCount: MetricConfig{ + Enabled: true, + }, MysqlConnectionCount: MetricConfig{ Enabled: true, }, diff --git a/receiver/mysqlreceiver/internal/metadata/generated_config_test.go b/receiver/mysqlreceiver/internal/metadata/generated_config_test.go index 9983e6e2da12..25da3070ca8d 100644 --- a/receiver/mysqlreceiver/internal/metadata/generated_config_test.go +++ b/receiver/mysqlreceiver/internal/metadata/generated_config_test.go @@ -33,6 +33,7 @@ func TestMetricsBuilderConfig(t *testing.T) { MysqlBufferPoolUsage: MetricConfig{Enabled: true}, MysqlClientNetworkIo: MetricConfig{Enabled: true}, MysqlCommands: MetricConfig{Enabled: true}, + MysqlConnectionActiveCount: MetricConfig{Enabled: true}, MysqlConnectionCount: MetricConfig{Enabled: true}, MysqlConnectionErrors: MetricConfig{Enabled: true}, MysqlDoubleWrites: MetricConfig{Enabled: true}, @@ -102,6 +103,7 @@ func TestMetricsBuilderConfig(t *testing.T) { MysqlBufferPoolUsage: MetricConfig{Enabled: false}, MysqlClientNetworkIo: MetricConfig{Enabled: false}, MysqlCommands: MetricConfig{Enabled: false}, + MysqlConnectionActiveCount: MetricConfig{Enabled: false}, MysqlConnectionCount: MetricConfig{Enabled: false}, MysqlConnectionErrors: MetricConfig{Enabled: false}, MysqlDoubleWrites: MetricConfig{Enabled: false}, diff --git a/receiver/mysqlreceiver/internal/metadata/generated_metrics.go b/receiver/mysqlreceiver/internal/metadata/generated_metrics.go index 7920587c0fc8..5912c3a8c9dc 100644 --- a/receiver/mysqlreceiver/internal/metadata/generated_metrics.go +++ b/receiver/mysqlreceiver/internal/metadata/generated_metrics.go @@ -1430,6 +1430,55 @@ func newMetricMysqlCommands(cfg MetricConfig) metricMysqlCommands { return m } +type metricMysqlConnectionActiveCount struct { + data pmetric.Metric // data buffer for generated metric. + config MetricConfig // metric config provided by user. + capacity int // max observed number of data points added to the metric. +} + +// init fills mysql.connection.active.count metric with initial data. +func (m *metricMysqlConnectionActiveCount) init() { + m.data.SetName("mysql.connection.active.count") + m.data.SetDescription("The numner of active connections to the MySQL server") + m.data.SetUnit("1") + m.data.SetEmptyGauge() +} + +func (m *metricMysqlConnectionActiveCount) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64) { + if !m.config.Enabled { + return + } + dp := m.data.Gauge().DataPoints().AppendEmpty() + dp.SetStartTimestamp(start) + dp.SetTimestamp(ts) + dp.SetIntValue(val) +} + +// updateCapacity saves max length of data point slices that will be used for the slice capacity. +func (m *metricMysqlConnectionActiveCount) updateCapacity() { + if m.data.Gauge().DataPoints().Len() > m.capacity { + m.capacity = m.data.Gauge().DataPoints().Len() + } +} + +// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points. +func (m *metricMysqlConnectionActiveCount) emit(metrics pmetric.MetricSlice) { + if m.config.Enabled && m.data.Gauge().DataPoints().Len() > 0 { + m.updateCapacity() + m.data.MoveTo(metrics.AppendEmpty()) + m.init() + } +} + +func newMetricMysqlConnectionActiveCount(cfg MetricConfig) metricMysqlConnectionActiveCount { + m := metricMysqlConnectionActiveCount{config: cfg} + if cfg.Enabled { + m.data = pmetric.NewMetric() + m.init() + } + return m +} + type metricMysqlConnectionCount struct { data pmetric.Metric // data buffer for generated metric. config MetricConfig // metric config provided by user. @@ -4095,6 +4144,7 @@ type MetricsBuilder struct { metricMysqlBufferPoolUsage metricMysqlBufferPoolUsage metricMysqlClientNetworkIo metricMysqlClientNetworkIo metricMysqlCommands metricMysqlCommands + metricMysqlConnectionActiveCount metricMysqlConnectionActiveCount metricMysqlConnectionCount metricMysqlConnectionCount metricMysqlConnectionErrors metricMysqlConnectionErrors metricMysqlDoubleWrites metricMysqlDoubleWrites @@ -4171,6 +4221,7 @@ func NewMetricsBuilder(mbc MetricsBuilderConfig, settings receiver.Settings, opt metricMysqlBufferPoolUsage: newMetricMysqlBufferPoolUsage(mbc.Metrics.MysqlBufferPoolUsage), metricMysqlClientNetworkIo: newMetricMysqlClientNetworkIo(mbc.Metrics.MysqlClientNetworkIo), metricMysqlCommands: newMetricMysqlCommands(mbc.Metrics.MysqlCommands), + metricMysqlConnectionActiveCount: newMetricMysqlConnectionActiveCount(mbc.Metrics.MysqlConnectionActiveCount), metricMysqlConnectionCount: newMetricMysqlConnectionCount(mbc.Metrics.MysqlConnectionCount), metricMysqlConnectionErrors: newMetricMysqlConnectionErrors(mbc.Metrics.MysqlConnectionErrors), metricMysqlDoubleWrites: newMetricMysqlDoubleWrites(mbc.Metrics.MysqlDoubleWrites), @@ -4305,6 +4356,7 @@ func (mb *MetricsBuilder) EmitForResource(rmo ...ResourceMetricsOption) { mb.metricMysqlBufferPoolUsage.emit(ils.Metrics()) mb.metricMysqlClientNetworkIo.emit(ils.Metrics()) mb.metricMysqlCommands.emit(ils.Metrics()) + mb.metricMysqlConnectionActiveCount.emit(ils.Metrics()) mb.metricMysqlConnectionCount.emit(ils.Metrics()) mb.metricMysqlConnectionErrors.emit(ils.Metrics()) mb.metricMysqlDoubleWrites.emit(ils.Metrics()) @@ -4456,6 +4508,11 @@ func (mb *MetricsBuilder) RecordMysqlCommandsDataPoint(ts pcommon.Timestamp, inp return nil } +// RecordMysqlConnectionActiveCountDataPoint adds a data point to mysql.connection.active.count metric. +func (mb *MetricsBuilder) RecordMysqlConnectionActiveCountDataPoint(ts pcommon.Timestamp, val int64) { + mb.metricMysqlConnectionActiveCount.recordDataPoint(mb.startTime, ts, val) +} + // RecordMysqlConnectionCountDataPoint adds a data point to mysql.connection.count metric. func (mb *MetricsBuilder) RecordMysqlConnectionCountDataPoint(ts pcommon.Timestamp, inputVal string) error { val, err := strconv.ParseInt(inputVal, 10, 64) diff --git a/receiver/mysqlreceiver/internal/metadata/generated_metrics_test.go b/receiver/mysqlreceiver/internal/metadata/generated_metrics_test.go index 00217e2a07b2..d6b4bbc0b801 100644 --- a/receiver/mysqlreceiver/internal/metadata/generated_metrics_test.go +++ b/receiver/mysqlreceiver/internal/metadata/generated_metrics_test.go @@ -99,6 +99,10 @@ func TestMetricsBuilder(t *testing.T) { allMetricsCount++ mb.RecordMysqlCommandsDataPoint(ts, "1", AttributeCommandDelete) + defaultMetricsCount++ + allMetricsCount++ + mb.RecordMysqlConnectionActiveCountDataPoint(ts, 1) + defaultMetricsCount++ allMetricsCount++ mb.RecordMysqlConnectionCountDataPoint(ts, "1") @@ -440,6 +444,18 @@ func TestMetricsBuilder(t *testing.T) { attrVal, ok := dp.Attributes().Get("command") assert.True(t, ok) assert.EqualValues(t, "delete", attrVal.Str()) + case "mysql.connection.active.count": + assert.False(t, validatedMetrics["mysql.connection.active.count"], "Found a duplicate in the metrics slice: mysql.connection.active.count") + validatedMetrics["mysql.connection.active.count"] = true + assert.Equal(t, pmetric.MetricTypeGauge, ms.At(i).Type()) + assert.Equal(t, 1, ms.At(i).Gauge().DataPoints().Len()) + assert.Equal(t, "The numner of active connections to the MySQL server", ms.At(i).Description()) + assert.Equal(t, "1", ms.At(i).Unit()) + dp := ms.At(i).Gauge().DataPoints().At(0) + assert.Equal(t, start, dp.StartTimestamp()) + assert.Equal(t, ts, dp.Timestamp()) + assert.Equal(t, pmetric.NumberDataPointValueTypeInt, dp.ValueType()) + assert.Equal(t, int64(1), dp.IntValue()) case "mysql.connection.count": assert.False(t, validatedMetrics["mysql.connection.count"], "Found a duplicate in the metrics slice: mysql.connection.count") validatedMetrics["mysql.connection.count"] = true diff --git a/receiver/mysqlreceiver/internal/metadata/testdata/config.yaml b/receiver/mysqlreceiver/internal/metadata/testdata/config.yaml index 32ff583cf7fc..0b6db7f91eff 100644 --- a/receiver/mysqlreceiver/internal/metadata/testdata/config.yaml +++ b/receiver/mysqlreceiver/internal/metadata/testdata/config.yaml @@ -17,6 +17,8 @@ all_set: enabled: true mysql.commands: enabled: true + mysql.connection.active.count: + enabled: true mysql.connection.count: enabled: true mysql.connection.errors: @@ -140,6 +142,8 @@ none_set: enabled: false mysql.commands: enabled: false + mysql.connection.active.count: + enabled: false mysql.connection.count: enabled: false mysql.connection.errors: diff --git a/receiver/mysqlreceiver/metadata.yaml b/receiver/mysqlreceiver/metadata.yaml index 40faad297612..769185b76ab3 100644 --- a/receiver/mysqlreceiver/metadata.yaml +++ b/receiver/mysqlreceiver/metadata.yaml @@ -486,6 +486,14 @@ metrics: monotonic: false aggregation_temporality: cumulative attributes: [schema, table_name, write_lock_type] + + mysql.connection.active.count: + enabled: true + description: The numner of active connections to the MySQL server + unit: 1 + gauge: + value_type: int + mysql.connection.count: enabled: true description: The number of connection attempts (successful or not) to the MySQL server. diff --git a/receiver/mysqlreceiver/scraper.go b/receiver/mysqlreceiver/scraper.go index 02c271395188..21f0c29c87c0 100644 --- a/receiver/mysqlreceiver/scraper.go +++ b/receiver/mysqlreceiver/scraper.go @@ -16,6 +16,7 @@ import ( "go.opentelemetry.io/collector/receiver/scrapererror" "go.uber.org/zap" + "github.com/k0kubun/pp" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/mysqlreceiver/internal/metadata" ) @@ -118,6 +119,8 @@ func (m *mySQLScraper) scrape(context.Context) (pmetric.Metrics, error) { m.scraperInnodbMetricsForDBM(now, errs) + m.scrapeActiveConnections(now, errs) + rb := m.mb.NewResourceBuilder() version, err := m.sqlclient.getVersion() @@ -677,6 +680,16 @@ func (m *mySQLScraper) scrapeReplicaStatusStats(now pcommon.Timestamp) { } } +func (m *mySQLScraper) scrapeActiveConnections(now pcommon.Timestamp, errs *scrapererror.ScrapeErrors) { + activeConnections, err := m.sqlclient.getActiveConnections() + pp.Println("Trying to scrape active connections") + if err != nil { + m.logger.Info("Failed to fetch active connections", zap.Error(err)) + return + } + m.mb.RecordMysqlConnectionActiveCountDataPoint(now, activeConnections) +} + func addPartialIfError(errors *scrapererror.ScrapeErrors, err error) { if err != nil { errors.AddPartial(1, err) diff --git a/receiver/mysqlreceiver/scraper_test.go b/receiver/mysqlreceiver/scraper_test.go index edd42d05c779..b9f0e2dbac86 100644 --- a/receiver/mysqlreceiver/scraper_test.go +++ b/receiver/mysqlreceiver/scraper_test.go @@ -73,6 +73,7 @@ func TestScrape(t *testing.T) { totalRowsFile: "total_rows_stats", totalErrorsFile: "total_error_stats", rowOperationsStatsFile: "row_operations_status", + activeConnectionsFile: "active_connections", } scraper.renameCommands = true @@ -121,6 +122,7 @@ func TestScrape(t *testing.T) { totalRowsFile: "total_rows_empty", totalErrorsFile: "total_errors_empty", rowOperationsStatsFile: "row_operations_status_empty", + activeConnectionsFile: "active_connections_empty", } actualMetrics, scrapeErr := scraper.scrape(context.Background()) @@ -163,6 +165,7 @@ type mockClient struct { totalRowsFile string totalErrorsFile string rowOperationsStatsFile string + activeConnectionsFile string } func readFile(fname string) (map[string]string, error) { @@ -197,6 +200,42 @@ func (c *mockClient) getInnodbStats() (map[string]string, error) { return readFile(c.innodbStatsFile) } +// getActiveConnections implements client. +func (c *mockClient) getActiveConnections() (int64, error) { + // Open test data file + file, err := os.Open(filepath.Join("testdata", "scraper", c.rowOperationsStatsFile+".txt")) + if err != nil { + return -1, fmt.Errorf("failed to open test data file: %w", err) + } + defer file.Close() + + // Create scanner to read test data + scanner := bufio.NewScanner(file) + + // Find the Threads_connected line + for scanner.Scan() { + line := scanner.Text() + if strings.Contains(line, "Threads_connected") { + // Split the line by whitespace and get the value + fields := strings.Fields(line) + if len(fields) >= 2 { + // Parse the value to int64 + connections, err := strconv.ParseInt(fields[len(fields)-1], 10, 64) + if err != nil { + return -1, fmt.Errorf("failed to parse connection count from test data: %w", err) + } + return connections, nil + } + } + } + + if err := scanner.Err(); err != nil { + return -1, fmt.Errorf("error reading test data: %w", err) + } + + return -1, fmt.Errorf("Threads_connected value not found in test data") +} + func (c *mockClient) getRowOperationStats() (RowOperationStats, error) { rowOpsStats := new(RowOperationStats) file, err := os.Open(filepath.Join("testdata", "scraper", c.rowOperationsStatsFile+".txt")) diff --git a/receiver/mysqlreceiver/testdata/scraper/active_connections.txt b/receiver/mysqlreceiver/testdata/scraper/active_connections.txt new file mode 100644 index 000000000000..52218bdc18af --- /dev/null +++ b/receiver/mysqlreceiver/testdata/scraper/active_connections.txt @@ -0,0 +1 @@ +Threads_connected 1 \ No newline at end of file diff --git a/receiver/mysqlreceiver/testdata/scraper/active_connections_empty.txt b/receiver/mysqlreceiver/testdata/scraper/active_connections_empty.txt new file mode 100644 index 000000000000..e69de29bb2d1