Skip to content

Commit

Permalink
feat(mysqlreceiver): add mysql.replica.time_behind_master and mysql.r…
Browse files Browse the repository at this point in the history
…eplica.sql_delay metrics

Signed-off-by: Dominik Rosiek <[email protected]>
  • Loading branch information
Dominik Rosiek committed Oct 11, 2022
1 parent 8bb7951 commit b19c140
Show file tree
Hide file tree
Showing 10 changed files with 684 additions and 89 deletions.
16 changes: 16 additions & 0 deletions .chloggen/drosiek-mysql-slave-metrics.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: mysqlreceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: add mysql.replica.time_behind_source and mysql.replica.sql_delay metrics

# One or more tracking issues related to the change
issues: [14138]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
288 changes: 288 additions & 0 deletions receiver/mysqlreceiver/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ package mysqlreceiver // import "github.com/open-telemetry/opentelemetry-collect

import (
"database/sql"
"errors"
"fmt"
"strings"

// registers the mysql driver
"github.com/go-sql-driver/mysql"
Expand All @@ -28,6 +30,7 @@ type client interface {
getInnodbStats() (map[string]string, error)
getTableIoWaitsStats() ([]TableIoWaitsStats, error)
getIndexIoWaitsStats() ([]IndexIoWaitsStats, error)
getReplicaStatusStats() ([]ReplicaStatusStats, error)
Close() error
}

Expand Down Expand Up @@ -58,6 +61,69 @@ type IndexIoWaitsStats struct {
index string
}

// ReplicaStatusStats holds the columns of a single row of replica status
// output, as filled in by getReplicaStatusStats from SHOW REPLICA STATUS or,
// as a fallback, SHOW SLAVE STATUS. Fields are named with source/replica
// terminology; the legacy master_*/slave_* column names are scanned into the
// same fields.
type ReplicaStatusStats struct {
replicaIOState string
sourceHost string
sourceUser string
sourcePort int64
connectRetry int64
sourceLogFile string
readSourceLogPos int64
relayLogFile string
relayLogPos int64
relaySourceLogFile string
replicaIORunning string
replicaSQLRunning string
replicateDoDB string
replicateIgnoreDB string
replicateDoTable string
replicateIgnoreTable string
replicateWildDoTable string
replicateWildIgnoreTable string
lastErrno int64
lastError string
skipCounter int64
execSourceLogPos int64
relayLogSpace int64
untilCondition string
untilLogFile string
// NOTE(review): unlike the other *LogPos fields this one is a string, not
// an int64 — confirm whether that is intentional.
untilLogPos string
sourceSSLAllowed string
sourceSSLCAFile string
sourceSSLCAPath string
sourceSSLCert string
sourceSSLCipher string
sourceSSLKey string
// secondsBehindSource is scanned from seconds_behind_source or
// seconds_behind_master; sql.NullInt64 allows the column to be NULL.
secondsBehindSource sql.NullInt64
sourceSSLVerifyServerCert string
lastIOErrno int64
lastIOError string
lastSQLErrno int64
lastSQLError string
replicateIgnoreServerIds string
sourceServerID int64
sourceUUID string
sourceInfoFile string
// sqlDelay is scanned from the sql_delay column.
sqlDelay int64
// sqlRemainingDelay is scanned from sql_remaining_delay; sql.NullInt64
// allows the column to be NULL.
sqlRemainingDelay sql.NullInt64
replicaSQLRunningState string
sourceRetryCount int64
sourceBind string
lastIOErrorTimestamp string
lastSQLErrorTimestamp string
sourceSSLCrl string
sourceSSLCrlpath string
retrievedGtidSet string
executedGtidSet string
autoPosition string
replicateRewriteDB string
channelName string
sourceTLSVersion string
sourcePublicKeyPath string
getSourcePublicKey int64
networkNamespace string
}

var _ client = (*mySQLClient)(nil)

func newMySQLClient(conf *Config) client {
Expand Down Expand Up @@ -152,6 +218,228 @@ func (c *mySQLClient) getIndexIoWaitsStats() ([]IndexIoWaitsStats, error) {
return stats, nil
}

// getReplicaStatusStats returns one ReplicaStatusStats per row reported by
// SHOW REPLICA STATUS. If the server rejects that statement with a syntax
// error (MySQL error number 1064), the deprecated SHOW SLAVE STATUS statement
// is issued instead.
//
// Result columns are matched by name, case-insensitively, accepting both the
// source/replica spellings and the legacy master/slave spellings. A row that
// contains a column name not known to this function yields an error. Any
// query, scan or row-iteration error is returned to the caller with a nil
// slice.
func (c *mySQLClient) getReplicaStatusStats() ([]ReplicaStatusStats, error) {
	rows, err := c.client.Query("SHOW REPLICA STATUS")
	if err != nil {
		var me *mysql.MySQLError
		if !errors.As(err, &me) || me.Number != 1064 {
			return nil, err
		}

		// fall back to the deprecated command for older versions
		rows, err = c.client.Query("SHOW SLAVE STATUS")
		if err != nil {
			return nil, err
		}
	}
	defer rows.Close()

	cols, err := rows.Columns()
	if err != nil {
		return nil, err
	}

	var stats []ReplicaStatusStats
	// dest is reused for every row; it is refilled with pointers into the
	// current row's struct before each Scan.
	dest := make([]interface{}, 0, len(cols))
	for rows.Next() {
		var s ReplicaStatusStats
		dest = dest[:0]
		for _, col := range cols {
			target := replicaStatusScanTarget(&s, col)
			if target == nil {
				return nil, fmt.Errorf("unknown column name %s for replica status", col)
			}
			dest = append(dest, target)
		}
		if err := rows.Scan(dest...); err != nil {
			return nil, err
		}
		stats = append(stats, s)
	}
	// rows.Next also returns false when iteration fails; surface that error
	// instead of returning a silently truncated result.
	if err := rows.Err(); err != nil {
		return nil, err
	}

	return stats, nil
}

// replicaStatusScanTarget returns a pointer to the field of s that the replica
// status column col should be scanned into, or nil if col is not recognized.
// The comparison is case-insensitive.
func replicaStatusScanTarget(s *ReplicaStatusStats, col string) interface{} {
	switch strings.ToLower(col) {
	case "replica_io_state", "slave_io_state":
		return &s.replicaIOState
	case "source_host", "master_host":
		return &s.sourceHost
	case "source_user", "master_user":
		return &s.sourceUser
	case "source_port", "master_port":
		return &s.sourcePort
	case "connect_retry":
		return &s.connectRetry
	case "source_log_file", "master_log_file":
		return &s.sourceLogFile
	case "read_source_log_pos", "read_master_log_pos":
		return &s.readSourceLogPos
	case "relay_log_file":
		return &s.relayLogFile
	case "relay_log_pos":
		return &s.relayLogPos
	case "relay_source_log_file", "relay_master_log_file":
		return &s.relaySourceLogFile
	case "replica_io_running", "slave_io_running":
		return &s.replicaIORunning
	case "replica_sql_running", "slave_sql_running":
		return &s.replicaSQLRunning
	case "replicate_do_db":
		return &s.replicateDoDB
	case "replicate_ignore_db":
		return &s.replicateIgnoreDB
	case "replicate_do_table":
		return &s.replicateDoTable
	case "replicate_ignore_table":
		return &s.replicateIgnoreTable
	case "replicate_wild_do_table":
		return &s.replicateWildDoTable
	case "replicate_wild_ignore_table":
		return &s.replicateWildIgnoreTable
	case "last_errno":
		return &s.lastErrno
	case "last_error":
		return &s.lastError
	case "skip_counter":
		return &s.skipCounter
	case "exec_source_log_pos", "exec_master_log_pos":
		return &s.execSourceLogPos
	case "relay_log_space":
		return &s.relayLogSpace
	case "until_condition":
		return &s.untilCondition
	case "until_log_file":
		return &s.untilLogFile
	case "until_log_pos":
		return &s.untilLogPos
	case "source_ssl_allowed", "master_ssl_allowed":
		return &s.sourceSSLAllowed
	case "source_ssl_ca_file", "master_ssl_ca_file":
		return &s.sourceSSLCAFile
	case "source_ssl_ca_path", "master_ssl_ca_path":
		return &s.sourceSSLCAPath
	case "source_ssl_cert", "master_ssl_cert":
		return &s.sourceSSLCert
	case "source_ssl_cipher", "master_ssl_cipher":
		return &s.sourceSSLCipher
	case "source_ssl_key", "master_ssl_key":
		return &s.sourceSSLKey
	case "seconds_behind_source", "seconds_behind_master":
		return &s.secondsBehindSource
	case "source_ssl_verify_server_cert", "master_ssl_verify_server_cert":
		return &s.sourceSSLVerifyServerCert
	case "last_io_errno":
		return &s.lastIOErrno
	case "last_io_error":
		return &s.lastIOError
	case "last_sql_errno":
		return &s.lastSQLErrno
	case "last_sql_error":
		return &s.lastSQLError
	case "replicate_ignore_server_ids":
		return &s.replicateIgnoreServerIds
	case "source_server_id", "master_server_id":
		return &s.sourceServerID
	case "source_uuid", "master_uuid":
		return &s.sourceUUID
	case "source_info_file", "master_info_file":
		return &s.sourceInfoFile
	case "sql_delay":
		return &s.sqlDelay
	case "sql_remaining_delay":
		return &s.sqlRemainingDelay
	case "replica_sql_running_state", "slave_sql_running_state":
		return &s.replicaSQLRunningState
	case "source_retry_count", "master_retry_count":
		return &s.sourceRetryCount
	case "source_bind", "master_bind":
		return &s.sourceBind
	case "last_io_error_timestamp":
		return &s.lastIOErrorTimestamp
	case "last_sql_error_timestamp":
		return &s.lastSQLErrorTimestamp
	case "source_ssl_crl", "master_ssl_crl":
		return &s.sourceSSLCrl
	case "source_ssl_crlpath", "master_ssl_crlpath":
		return &s.sourceSSLCrlpath
	case "retrieved_gtid_set":
		return &s.retrievedGtidSet
	case "executed_gtid_set":
		return &s.executedGtidSet
	case "auto_position":
		return &s.autoPosition
	case "replicate_rewrite_db":
		return &s.replicateRewriteDB
	case "channel_name":
		return &s.channelName
	case "source_tls_version", "master_tls_version":
		return &s.sourceTLSVersion
	// The master_* spellings of the next two columns are accepted so that the
	// SHOW SLAVE STATUS fallback does not fail on servers that report them.
	case "source_public_key_path", "master_public_key_path":
		return &s.sourcePublicKeyPath
	case "get_source_public_key", "get_master_public_key":
		return &s.getSourcePublicKey
	case "network_namespace":
		return &s.networkNamespace
	default:
		return nil
	}
}

func Query(c mySQLClient, query string) (map[string]string, error) {
rows, err := c.client.Query(query)
if err != nil {
Expand Down
2 changes: 2 additions & 0 deletions receiver/mysqlreceiver/documentation.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ These are the metrics available for this scraper.
| **mysql.log_operations** | The number of InnoDB log operations. | 1 | Sum(Int) | <ul> <li>log_operations</li> </ul> |
| **mysql.operations** | The number of InnoDB operations. | 1 | Sum(Int) | <ul> <li>operations</li> </ul> |
| **mysql.page_operations** | The number of InnoDB page operations. | 1 | Sum(Int) | <ul> <li>page_operations</li> </ul> |
| mysql.replica.sql_delay | The number of seconds that the replica must lag the source. | s | Sum(Int) | <ul> </ul> |
| mysql.replica.time_behind_source | The number of seconds the replica lags behind the source, i.e. how “late” the replica is. | s | Sum(Int) | <ul> </ul> |
| **mysql.row_locks** | The number of InnoDB row locks. | 1 | Sum(Int) | <ul> <li>row_locks</li> </ul> |
| **mysql.row_operations** | The number of InnoDB row operations. | 1 | Sum(Int) | <ul> <li>row_operations</li> </ul> |
| **mysql.sorts** | The number of MySQL sorts. | 1 | Sum(Int) | <ul> <li>sorts</li> </ul> |
Expand Down
Loading

0 comments on commit b19c140

Please sign in to comment.