Skip to content

Commit

Permalink
Add metric_version option to mysql input (influxdata#3954)
Browse files Browse the repository at this point in the history
  • Loading branch information
danielnelson authored Apr 2, 2018
1 parent 64b2396 commit 82448a9
Show file tree
Hide file tree
Showing 4 changed files with 435 additions and 33 deletions.
22 changes: 12 additions & 10 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,16 @@

### Release Notes

- The `mysql` input plugin has been updated to convert values to the
correct data type. This may cause a `field type error` when inserting into
InfluxDB due the change of types. It is recommended to drop the `mysql`,
`mysql_variables`, and `mysql_innodb`:
```
DROP MEASUREMENT mysql
DROP MEASUREMENT mysql_variables
DROP MEASUREMENT mysql_innodb
```
- The `mysql` input plugin has been updated fix a number of type convertion
issues. This may cause a `field type error` when inserting into InfluxDB due
the change of types.

To address this we have introduced a new `metric_version` option to control
enabling the new format. For in depth recommendations on upgrading please
reference the [mysql plugin documentation](./plugins/inputs/mysql/README.md#metric-version).

It is encouraged to migrate to the new model when possible as the old version
is deprecated and will be removed in a future version.

- The `postgresql` plugins now defaults to using a persistent connection to the database.
In environments where TCP connections are terminated the `max_lifetime`
Expand All @@ -26,7 +27,8 @@
is set. It is encouraged to enable this option when possible as the old
ordering is deprecated.

- The `httpjson` is now deprecated, please migrate to the new `http` input.
- The new `http` input configured with `data_format = "json"` can perform the
same task as the, now deprecated, `httpjson` input.


### New Inputs
Expand Down
93 changes: 88 additions & 5 deletions plugins/inputs/mysql/README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# MySQL Input plugin
# MySQL Input Plugin

This plugin gathers the statistic data from MySQL server

Expand All @@ -18,9 +18,9 @@ This plugin gathers the statistic data from MySQL server
* File events statistics
* Table schema statistics

## Configuration
### Configuration

```
```toml
# Read metrics from one or many mysql servers
[[inputs.mysql]]
## specify servers via a url matching:
Expand Down Expand Up @@ -81,14 +81,97 @@ This plugin gathers the statistic data from MySQL server
#
## Some queries we may want to run less often (such as SHOW GLOBAL VARIABLES)
interval_slow = "30m"

## Optional SSL Config (will be used if tls=custom parameter specified in server uri)
ssl_ca = "/etc/telegraf/ca.pem"
ssl_cert = "/etc/telegraf/cert.pem"
ssl_key = "/etc/telegraf/key.pem"
```

## Measurements & Fields
#### Metric Version

When `metric_version = 2`, a variety of field type issues are corrected as well
as naming inconsistencies. If you have existing data on the original version
enabling this feature will cause a `field type error` when inserted into
InfluxDB due to the change of types. For this reason, you should keep the
`metric_version` unset until you are ready to migrate to the new format.

If preserving your old data is not required you may wish to drop conflicting
measurements:
```
DROP SERIES from mysql
DROP SERIES from mysql_variables
DROP SERIES from mysql_innodb
```

Otherwise, migration can be performed using the following steps:

1. Duplicate your `mysql` plugin configuration and add a `name_suffix` and
`metric_version = 2`, this will result in collection using both the old and new
style concurrently:
```toml
[[inputs.mysql]]
servers = ["tcp(127.0.0.1:3306)/"]

[[inputs.mysql]]
name_override = "_2"
metric_version = 2

servers = ["tcp(127.0.0.1:3306)/"]
```

2. Upgrade all affected Telegraf clients to version >=1.6.

New measurements will be created with the `name_suffix`, for example::
- `mysql_v2`
- `mysql_variables_v2`

3. Update charts, alerts, and other supporting code to the new format.
4. You can now remove the old `mysql` plugin configuration and remove old
measurements.

If you wish to remove the `name_suffix` you may use Kapacitor to copy the
historical data to the default name. Do this only after retiring the old
measurement name.

1. Use the techinique described above to write to multiple locations:
```toml
[[inputs.mysql]]
servers = ["tcp(127.0.0.1:3306)/"]
metric_version = 2

[[inputs.mysql]]
name_override = "_2"
metric_version = 2

servers = ["tcp(127.0.0.1:3306)/"]
```
2. Create a TICKScript to copy the historical data:
```
dbrp "telegraf"."autogen"
batch
|query('''
SELECT * FROM "telegraf"."autogen"."mysql_v2"
''')
.period(5m)
.every(5m)
|influxDBOut()
.database('telegraf')
.retentionPolicy('autogen')
.measurement('mysql')
```
3. Define a task for your script:
```sh
kapacitor define copy-measurement -tick copy-measurement.task
```
4. Run the task over the data you would like to migrate:
```sh
kapacitor replay-live batch -start 2018-03-30T20:00:00Z -stop 2018-04-01T12:00:00Z -rec-time -task copy-measurement
```
5. Verify copied data and repeat for other measurements.

### Metrics:
* Global statuses - all numeric and boolean values of `SHOW GLOBAL STATUSES`
* Global variables - all numeric and boolean values of `SHOW GLOBAL VARIABLES`
* Slave status - metrics from `SHOW SLAVE STATUS` the metrics are gathered when
Expand Down
158 changes: 140 additions & 18 deletions plugins/inputs/mysql/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/inputs/mysql/v1"

"github.com/go-sql-driver/mysql"
)
Expand Down Expand Up @@ -40,6 +41,7 @@ type Mysql struct {
SSLCA string `toml:"ssl_ca"`
SSLCert string `toml:"ssl_cert"`
SSLKey string `toml:"ssl_key"`
MetricVersion int `toml:"metric_version"`
}

var sampleConfig = `
Expand All @@ -52,6 +54,20 @@ var sampleConfig = `
#
## If no servers are specified, then localhost is used as the host.
servers = ["tcp(127.0.0.1:3306)/"]
## Selects the metric output format.
##
## This option exists to maintain backwards compatibility, if you have
## existing metrics do not set or change this value until you are ready to
## migrate to the new format.
##
## If you do not have existing metrics from this plugin set to the latest
## version.
##
## Telegraf >=1.6: metric_version = 2
## <1.6: metric_version = 1 (or unset)
metric_version = 2
## the limits for metrics form perf_events_statements
perf_events_statements_digest_text_limit = 120
perf_events_statements_limit = 250
Expand Down Expand Up @@ -541,7 +557,7 @@ func (m *Mysql) gatherGlobalVariables(db *sql.DB, serv string, acc telegraf.Accu
fields[key] = string(val)
tags[key] = string(val)
}
if value, ok := parseValue(val); ok {
if value, ok := m.parseValue(val); ok {
fields[key] = value
}
// Send 20 fields at a time
Expand Down Expand Up @@ -593,7 +609,7 @@ func (m *Mysql) gatherSlaveStatuses(db *sql.DB, serv string, acc telegraf.Accumu
// range over columns, and try to parse values
for i, col := range cols {
col = strings.ToLower(col)
if value, ok := parseValue(*vals[i].(*sql.RawBytes)); ok {
if value, ok := m.parseValue(*vals[i].(*sql.RawBytes)); ok {
fields["slave_"+col] = value
}
}
Expand Down Expand Up @@ -662,10 +678,75 @@ func (m *Mysql) gatherGlobalStatuses(db *sql.DB, serv string, acc telegraf.Accum
return err
}

key = strings.ToLower(key)
if m.MetricVersion < 2 {
var found bool
for _, mapped := range v1.Mappings {
if strings.HasPrefix(key, mapped.OnServer) {
// convert numeric values to integer
i, _ := strconv.Atoi(string(val))
fields[mapped.InExport+key[len(mapped.OnServer):]] = i
found = true
}
}
// Send 20 fields at a time
if len(fields) >= 20 {
acc.AddFields("mysql", fields, tags)
fields = make(map[string]interface{})
}
if found {
continue
}

if value, ok := parseValue(val); ok {
fields[key] = value
// search for specific values
switch key {
case "Queries":
i, err := strconv.ParseInt(string(val), 10, 64)
if err != nil {
acc.AddError(fmt.Errorf("E! Error mysql: parsing %s int value (%s)", key, err))
} else {
fields["queries"] = i
}
case "Questions":
i, err := strconv.ParseInt(string(val), 10, 64)
if err != nil {
acc.AddError(fmt.Errorf("E! Error mysql: parsing %s int value (%s)", key, err))
} else {
fields["questions"] = i
}
case "Slow_queries":
i, err := strconv.ParseInt(string(val), 10, 64)
if err != nil {
acc.AddError(fmt.Errorf("E! Error mysql: parsing %s int value (%s)", key, err))
} else {
fields["slow_queries"] = i
}
case "Connections":
i, err := strconv.ParseInt(string(val), 10, 64)
if err != nil {
acc.AddError(fmt.Errorf("E! Error mysql: parsing %s int value (%s)", key, err))
} else {
fields["connections"] = i
}
case "Syncs":
i, err := strconv.ParseInt(string(val), 10, 64)
if err != nil {
acc.AddError(fmt.Errorf("E! Error mysql: parsing %s int value (%s)", key, err))
} else {
fields["syncs"] = i
}
case "Uptime":
i, err := strconv.ParseInt(string(val), 10, 64)
if err != nil {
acc.AddError(fmt.Errorf("E! Error mysql: parsing %s int value (%s)", key, err))
} else {
fields["uptime"] = i
}
}
} else {
key = strings.ToLower(key)
if value, ok := m.parseValue(val); ok {
fields[key] = value
}
}

// Send 20 fields at a time
Expand Down Expand Up @@ -820,7 +901,11 @@ func (m *Mysql) GatherProcessListStatuses(db *sql.DB, serv string, acc telegraf.
for s, c := range stateCounts {
fields[newNamespace("threads", s)] = c
}
acc.AddFields("mysql_process_list", fields, tags)
if m.MetricVersion < 2 {
acc.AddFields("mysql_info_schema", fields, tags)
} else {
acc.AddFields("mysql_process_list", fields, tags)
}
return nil
}

Expand Down Expand Up @@ -1033,7 +1118,11 @@ func (m *Mysql) gatherInfoSchemaAutoIncStatuses(db *sql.DB, serv string, acc tel
fields["auto_increment_column"] = incValue
fields["auto_increment_column_max"] = maxInt

acc.AddFields("mysql_table_schema", fields, tags)
if m.MetricVersion < 2 {
acc.AddFields("mysql_info_schema", fields, tags)
} else {
acc.AddFields("mysql_table_schema", fields, tags)
}
}
return nil
}
Expand All @@ -1059,7 +1148,7 @@ func (m *Mysql) gatherInnoDBMetrics(db *sql.DB, serv string, acc telegraf.Accumu
return err
}
key = strings.ToLower(key)
if value, ok := parseValue(val); ok {
if value, ok := m.parseValue(val); ok {
fields[key] = value
}
// Send 20 fields at a time
Expand Down Expand Up @@ -1430,31 +1519,64 @@ func (m *Mysql) gatherTableSchema(db *sql.DB, serv string, acc telegraf.Accumula
tags["schema"] = tableSchema
tags["table"] = tableName

acc.AddFields("mysql_table_schema",
map[string]interface{}{"rows": tableRows}, tags)
if m.MetricVersion < 2 {
acc.AddFields(newNamespace("info_schema", "table_rows"),
map[string]interface{}{"value": tableRows}, tags)

acc.AddFields("mysql_table_schema",
map[string]interface{}{"data_length": dataLength}, tags)
dlTags := copyTags(tags)
dlTags["component"] = "data_length"
acc.AddFields(newNamespace("info_schema", "table_size", "data_length"),
map[string]interface{}{"value": dataLength}, dlTags)

acc.AddFields("mysql_table_schema",
map[string]interface{}{"index_length": indexLength}, tags)
ilTags := copyTags(tags)
ilTags["component"] = "index_length"
acc.AddFields(newNamespace("info_schema", "table_size", "index_length"),
map[string]interface{}{"value": indexLength}, ilTags)

acc.AddFields("mysql_table_schema",
map[string]interface{}{"data_free": dataFree}, tags)
dfTags := copyTags(tags)
dfTags["component"] = "data_free"
acc.AddFields(newNamespace("info_schema", "table_size", "data_free"),
map[string]interface{}{"value": dataFree}, dfTags)
} else {
acc.AddFields("mysql_table_schema",
map[string]interface{}{"rows": tableRows}, tags)

acc.AddFields("mysql_table_schema",
map[string]interface{}{"data_length": dataLength}, tags)

acc.AddFields("mysql_table_schema",
map[string]interface{}{"index_length": indexLength}, tags)

acc.AddFields("mysql_table_schema",
map[string]interface{}{"data_free": dataFree}, tags)
}

versionTags := copyTags(tags)
versionTags["type"] = tableType
versionTags["engine"] = engine
versionTags["row_format"] = rowFormat
versionTags["create_options"] = createOptions

acc.AddFields("mysql_table_schema_version",
map[string]interface{}{"table_version": version}, versionTags)
if m.MetricVersion < 2 {
acc.AddFields(newNamespace("info_schema", "table_version"),
map[string]interface{}{"value": version}, versionTags)
} else {
acc.AddFields("mysql_table_schema_version",
map[string]interface{}{"table_version": version}, versionTags)
}
}
}
return nil
}

func (m *Mysql) parseValue(value sql.RawBytes) (interface{}, bool) {
if m.MetricVersion < 2 {
return v1.ParseValue(value)
} else {
return parseValue(value)
}
}

// parseValue can be used to convert values such as "ON","OFF","Yes","No" to 0,1
func parseValue(value sql.RawBytes) (interface{}, bool) {
if bytes.EqualFold(value, []byte("YES")) || bytes.Compare(value, []byte("ON")) == 0 {
Expand Down
Loading

0 comments on commit 82448a9

Please sign in to comment.