Skip to content

Commit

Permalink
Added userstats to mysql input plugin (influxdata#2137)
Browse files Browse the repository at this point in the history
* Added GatherUserStatistics, row Uptime in gatherGlobalStatuses, and version fields & tags

* Updated README file

* pulling in latest from master

* ran go fmt to fix formatting

* fix unreachable code

* few fixes

* cleaning up and applying suggestions from sparrc
  • Loading branch information
gioxchopper authored and sparrc committed Jan 13, 2017
1 parent b89c45b commit 0c9da09
Show file tree
Hide file tree
Showing 2 changed files with 206 additions and 1 deletion.
30 changes: 30 additions & 0 deletions plugins/inputs/mysql/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ This plugin gathers the statistic data from MySQL server
* Slave statuses
* Binlog size
* Process list
* User Statistics
* Info schema auto increment columns
* Table I/O waits
* Index I/O waits
Expand Down Expand Up @@ -44,6 +45,9 @@ This plugin gathers the statistic data from MySQL server
## gather thread state counts from INFORMATION_SCHEMA.PROCESSLIST
gather_process_list = true
#
## gather thread state counts from INFORMATION_SCHEMA.USER_STATISTICS
gather_user_statistics = true
#
## gather auto_increment columns and max values from information schema
gather_info_schema_auto_inc = true
#
Expand Down Expand Up @@ -89,6 +93,30 @@ Requires to be turned on in configuration.
* binary_files_count(int, number)
* Process list - connection metrics from processlist for each user. It has the following tags
* connections(int, number)
* User Statistics - connection metrics from user statistics for each user. It has the following fields
* access_denied
* binlog_bytes_written
* busy_time
* bytes_received
* bytes_sent
* commit_transactions
* concurrent_connections
* connected_time
* cpu_time
* denied_connections
* empty_queries
* hostlost_connections
* other_commands
* rollback_transactions
* rows_fetched
* rows_updated
* select_commands
* server
* table_rows_read
* total_connections
* total_ssl_connections
* update_commands
* user
* Perf Table IO waits - total count and time of I/O waits event for each table
and process. It has following fields:
* table_io_waits_total_fetch(float, number)
Expand Down Expand Up @@ -158,6 +186,8 @@ The unit of fields varies by the tags.
* server (the host name from which the metrics are gathered)
* Process list measurement has following tags
* user (username for whom the metrics are gathered)
* User Statistics measurement has following tags
* user (username for whom the metrics are gathered)
* Perf table IO waits measurement has following tags
* schema
* name (object name for event or process)
Expand Down
177 changes: 176 additions & 1 deletion plugins/inputs/mysql/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type Mysql struct {
PerfEventsStatementsTimeLimit int64 `toml:"perf_events_statemetns_time_limit"`
TableSchemaDatabases []string `toml:"table_schema_databases"`
GatherProcessList bool `toml:"gather_process_list"`
GatherUserStatistics bool `toml:"gather_user_statistics"`
GatherInfoSchemaAutoInc bool `toml:"gather_info_schema_auto_inc"`
GatherSlaveStatus bool `toml:"gather_slave_status"`
GatherBinaryLogs bool `toml:"gather_binary_logs"`
Expand Down Expand Up @@ -60,6 +61,9 @@ var sampleConfig = `
## gather thread state counts from INFORMATION_SCHEMA.PROCESSLIST
gather_process_list = true
#
## gather thread state counts from INFORMATION_SCHEMA.USER_STATISTICS
gather_user_statistics = true
#
## gather auto_increment columns and max values from information schema
gather_info_schema_auto_inc = true
#
Expand Down Expand Up @@ -415,6 +419,10 @@ const (
WHERE ID != connection_id()
GROUP BY command,state
ORDER BY null`
infoSchemaUserStatisticsQuery = `
SELECT *,count(*)
FROM information_schema.user_statistics
GROUP BY user`
infoSchemaAutoIncQuery = `
SELECT table_schema, table_name, column_name, auto_increment,
CAST(pow(2, case data_type
Expand Down Expand Up @@ -530,7 +538,6 @@ const (
table_name
FROM information_schema.tables
WHERE table_schema = 'performance_schema' AND table_name = ?
`
)

Expand Down Expand Up @@ -582,6 +589,13 @@ func (m *Mysql) gatherServer(serv string, acc telegraf.Accumulator) error {
}
}

if m.GatherUserStatistics {
err = m.GatherUserStatisticsStatuses(db, serv, acc)
if err != nil {
return err
}
}

if m.GatherSlaveStatus {
err = m.gatherSlaveStatuses(db, serv, acc)
if err != nil {
Expand Down Expand Up @@ -669,6 +683,11 @@ func (m *Mysql) gatherGlobalVariables(db *sql.DB, serv string, acc telegraf.Accu
return err
}
key = strings.ToLower(key)
// parse mysql version and put into field and tag
if strings.Contains(key, "version") {
fields[key] = string(val)
tags[key] = string(val)
}
// parse value, if it is numeric then save, otherwise ignore
if floatVal, ok := parseValue(val); ok {
fields[key] = floatVal
Expand Down Expand Up @@ -854,6 +873,12 @@ func (m *Mysql) gatherGlobalStatuses(db *sql.DB, serv string, acc telegraf.Accum
return err
}
fields["syncs"] = i
case "Uptime":
i, err := strconv.ParseInt(string(val.([]byte)), 10, 64)
if err != nil {
return err
}
fields["uptime"] = i
}
}
// Send any remaining fields
Expand Down Expand Up @@ -884,6 +909,74 @@ func (m *Mysql) gatherGlobalStatuses(db *sql.DB, serv string, acc telegraf.Accum
}
}

// gather connection metrics from user_statistics for each user
if m.GatherUserStatistics {
conn_rows, err := db.Query("select user, total_connections, concurrent_connections, connected_time, busy_time, cpu_time, bytes_received, bytes_sent, binlog_bytes_written, rows_fetched, rows_updated, table_rows_read, select_commands, update_commands, other_commands, commit_transactions, rollback_transactions, denied_connections, lost_connections, access_denied, empty_queries, total_ssl_connections FROM INFORMATION_SCHEMA.USER_STATISTICS GROUP BY user")

for conn_rows.Next() {
var user string
var total_connections int64
var concurrent_connections int64
var connected_time int64
var busy_time int64
var cpu_time int64
var bytes_received int64
var bytes_sent int64
var binlog_bytes_written int64
var rows_fetched int64
var rows_updated int64
var table_rows_read int64
var select_commands int64
var update_commands int64
var other_commands int64
var commit_transactions int64
var rollback_transactions int64
var denied_connections int64
var lost_connections int64
var access_denied int64
var empty_queries int64
var total_ssl_connections int64

err = conn_rows.Scan(&user, &total_connections, &concurrent_connections,
&connected_time, &busy_time, &cpu_time, &bytes_received, &bytes_sent, &binlog_bytes_written,
&rows_fetched, &rows_updated, &table_rows_read, &select_commands, &update_commands, &other_commands,
&commit_transactions, &rollback_transactions, &denied_connections, &lost_connections, &access_denied,
&empty_queries, &total_ssl_connections,
)

if err != nil {
return err
}

tags := map[string]string{"server": servtag, "user": user}
fields := map[string]interface{}{
"total_connections": total_connections,
"concurrent_connections": concurrent_connections,
"connected_time": connected_time,
"busy_time": busy_time,
"cpu_time": cpu_time,
"bytes_received": bytes_received,
"bytes_sent": bytes_sent,
"binlog_bytes_written": binlog_bytes_written,
"rows_fetched": rows_fetched,
"rows_updated": rows_updated,
"table_rows_read": table_rows_read,
"select_commands": select_commands,
"update_commands": update_commands,
"other_commands": other_commands,
"commit_transactions": commit_transactions,
"rollback_transactions": rollback_transactions,
"denied_connections": denied_connections,
"lost_connections": lost_connections,
"access_denied": access_denied,
"empty_queries": empty_queries,
"total_ssl_connections": total_ssl_connections,
}

acc.AddFields("mysql_user_stats", fields, tags)
}
}

return nil
}

Expand Down Expand Up @@ -932,6 +1025,88 @@ func (m *Mysql) GatherProcessListStatuses(db *sql.DB, serv string, acc telegraf.
return nil
}

// GatherUserStatistics can be used to collect metrics on each running command
// and its state with its running count
func (m *Mysql) GatherUserStatisticsStatuses(db *sql.DB, serv string, acc telegraf.Accumulator) error {
// run query
rows, err := db.Query(infoSchemaUserStatisticsQuery)
if err != nil {
return err
}
defer rows.Close()
var (
user string
total_connections int64
concurrent_connections int64
connected_time int64
busy_time int64
cpu_time int64
bytes_received int64
bytes_sent int64
binlog_bytes_written int64
rows_fetched int64
rows_updated int64
table_rows_read int64
select_commands int64
update_commands int64
other_commands int64
commit_transactions int64
rollback_transactions int64
denied_connections int64
lost_connections int64
access_denied int64
empty_queries int64
total_ssl_connections int64
count uint32
)

var servtag string
servtag, err = parseDSN(serv)
if err != nil {
servtag = "localhost"
}

for rows.Next() {
err = rows.Scan(&user, &total_connections, &concurrent_connections,
&connected_time, &busy_time, &cpu_time, &bytes_received, &bytes_sent, &binlog_bytes_written,
&rows_fetched, &rows_updated, &table_rows_read, &select_commands, &update_commands, &other_commands,
&commit_transactions, &rollback_transactions, &denied_connections, &lost_connections, &access_denied,
&empty_queries, &total_ssl_connections, &count,
)
if err != nil {
return err
}

tags := map[string]string{"server": servtag, "user": user}
fields := map[string]interface{}{

"total_connections": total_connections,
"concurrent_connections": concurrent_connections,
"connected_time": connected_time,
"busy_time": busy_time,
"cpu_time": cpu_time,
"bytes_received": bytes_received,
"bytes_sent": bytes_sent,
"binlog_bytes_written": binlog_bytes_written,
"rows_fetched": rows_fetched,
"rows_updated": rows_updated,
"table_rows_read": table_rows_read,
"select_commands": select_commands,
"update_commands": update_commands,
"other_commands": other_commands,
"commit_transactions": commit_transactions,
"rollback_transactions": rollback_transactions,
"denied_connections": denied_connections,
"lost_connections": lost_connections,
"access_denied": access_denied,
"empty_queries": empty_queries,
"total_ssl_connections": total_ssl_connections,
}
acc.AddFields("mysql_user_stats", fields, tags)
}
return nil
}

// gatherPerfTableIOWaits can be used to get total count and time
// of I/O wait event for each table and process
func (m *Mysql) gatherPerfTableIOWaits(db *sql.DB, serv string, acc telegraf.Accumulator) error {
Expand Down

0 comments on commit 0c9da09

Please sign in to comment.