Skip to content

Commit

Permalink
Mariadb/extending mysql for mariadb (#88)
Browse files Browse the repository at this point in the history
  • Loading branch information
naman47vyas authored Jul 18, 2024
1 parent 2bfe8e9 commit ed0e43a
Show file tree
Hide file tree
Showing 4 changed files with 94 additions and 40 deletions.
80 changes: 57 additions & 23 deletions receiver/mysqlreceiver/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,10 @@ type IndexIoWaitsStats struct {
type TableStats struct {
schema string
name string
rows int64
averageRowLength int64
dataLength int64
indexLength int64
rows sql.NullInt64
averageRowLength sql.NullInt64
dataLength sql.NullInt64
indexLength sql.NullInt64
}

type StatementEventStats struct {
Expand Down Expand Up @@ -230,7 +230,6 @@ func (c *mySQLClient) getVersion() (string, error) {
if err != nil {
return "", err
}

return version, nil
}

Expand All @@ -247,7 +246,6 @@ func (c *mySQLClient) getInnodbStats() (map[string]string, error) {
}

func (c *mySQLClient) getInnodbStatusStats() (map[string]int64, error, int) {

/*
RETURNS:
map[string]int64 :
Expand Down Expand Up @@ -282,6 +280,7 @@ func (c *mySQLClient) getInnodbStatusStats() (map[string]int64, error, int) {

// TODO: Suggest better value if there's an error for the metric.
if mysqlErr != nil {

err := fmt.Errorf("error querying the mysql db for innodb status %v", mysqlErr)
return nil, err, 0
}
Expand All @@ -301,6 +300,7 @@ func (c *mySQLClient) getInnodbStatusStats() (map[string]int64, error, int) {
var parserErrs error
parserErrs = nil
if total_errs > 0 {

errorString := flattenErrorMap(errs)
parserErrs = fmt.Errorf(errorString)
}
Expand All @@ -310,7 +310,7 @@ func (c *mySQLClient) getInnodbStatusStats() (map[string]int64, error, int) {

type NRows struct {
dbname string
totalRows int64
totalRows sql.NullInt64
}

func (c *mySQLClient) getTotalRows() ([]NRows, error) {
Expand All @@ -331,6 +331,7 @@ func (c *mySQLClient) getTotalRows() ([]NRows, error) {
var r NRows
err := rows.Scan(&r.dbname, &r.totalRows)
if err != nil {

return nil, err
}
nr = append(nr, r)
Expand All @@ -352,10 +353,16 @@ func (c *mySQLClient) getTableStats() ([]TableStats, error) {
var stats []TableStats
for rows.Next() {
var s TableStats
err := rows.Scan(&s.schema, &s.name,
&s.rows, &s.averageRowLength,
&s.dataLength, &s.indexLength)
err := rows.Scan(
&s.schema,
&s.name,
&s.rows,
&s.averageRowLength,
&s.dataLength,
&s.indexLength,
)
if err != nil {

return nil, err
}
stats = append(stats, s)
Expand All @@ -373,6 +380,7 @@ func (c *mySQLClient) getTableIoWaitsStats() ([]TableIoWaitsStats, error) {
"WHERE OBJECT_SCHEMA NOT IN ('mysql', 'performance_schema');"
rows, err := c.client.Query(query)
if err != nil {

return nil, err
}
defer rows.Close()
Expand All @@ -383,6 +391,7 @@ func (c *mySQLClient) getTableIoWaitsStats() ([]TableIoWaitsStats, error) {
&s.countDelete, &s.countFetch, &s.countInsert, &s.countUpdate,
&s.timeDelete, &s.timeFetch, &s.timeInsert, &s.timeUpdate)
if err != nil {

return nil, err
}
stats = append(stats, s)
Expand All @@ -401,6 +410,7 @@ func (c *mySQLClient) getIndexIoWaitsStats() ([]IndexIoWaitsStats, error) {

rows, err := c.client.Query(query)
if err != nil {

return nil, err
}
defer rows.Close()
Expand All @@ -411,6 +421,7 @@ func (c *mySQLClient) getIndexIoWaitsStats() ([]IndexIoWaitsStats, error) {
&s.countDelete, &s.countFetch, &s.countInsert, &s.countUpdate,
&s.timeDelete, &s.timeFetch, &s.timeInsert, &s.timeUpdate)
if err != nil {

return nil, err
}
stats = append(stats, s)
Expand All @@ -420,19 +431,32 @@ func (c *mySQLClient) getIndexIoWaitsStats() ([]IndexIoWaitsStats, error) {
}

func (c *mySQLClient) getStatementEventsStats() ([]StatementEventStats, error) {
query := fmt.Sprintf("SELECT ifnull(SCHEMA_NAME, 'NONE') as SCHEMA_NAME, DIGEST,"+
"LEFT(DIGEST_TEXT, %d) as DIGEST_TEXT, SUM_TIMER_WAIT, SUM_ERRORS,"+
"SUM_WARNINGS, SUM_ROWS_AFFECTED, SUM_ROWS_SENT, SUM_ROWS_EXAMINED,"+
"SUM_CREATED_TMP_DISK_TABLES, SUM_CREATED_TMP_TABLES, SUM_SORT_MERGE_PASSES,"+
"SUM_SORT_ROWS, SUM_NO_INDEX_USED , COUNT_STAR "+
"FROM performance_schema.events_statements_summary_by_digest "+
"WHERE SCHEMA_NAME NOT IN ('mysql', 'performance_schema', 'information_schema') "+
"AND last_seen > DATE_SUB(NOW(), INTERVAL %d SECOND) "+
"ORDER BY SUM_TIMER_WAIT DESC "+
"LIMIT %d",
c.statementEventsDigestTextLimit,
int64(c.statementEventsTimeLimit.Seconds()),
c.statementEventsLimit)
query := fmt.Sprintf(`
SELECT
IFNULL(SCHEMA_NAME, 'NONE') AS SCHEMA_NAME,
DIGEST,
LEFT(DIGEST_TEXT, %d) AS DIGEST_TEXT,
SUM_TIMER_WAIT,
SUM_ERRORS,
SUM_WARNINGS,
SUM_ROWS_AFFECTED,
SUM_ROWS_SENT,
SUM_ROWS_EXAMINED,
SUM_CREATED_TMP_DISK_TABLES,
SUM_CREATED_TMP_TABLES,
SUM_SORT_MERGE_PASSES,
SUM_SORT_ROWS,
SUM_NO_INDEX_USED,
COUNT_STAR
FROM
performance_schema.events_statements_summary_by_digest
WHERE
SCHEMA_NAME NOT IN ('performance_schema', 'information_schema')
AND last_seen > DATE_SUB(NOW(), INTERVAL %d SECOND)
ORDER BY
SUM_TIMER_WAIT DESC
LIMIT %d;
`, c.statementEventsDigestTextLimit, int64(c.statementEventsTimeLimit.Seconds()), c.statementEventsLimit)

rows, err := c.client.Query(query)
if err != nil {
Expand Down Expand Up @@ -462,6 +486,7 @@ func (c *mySQLClient) getStatementEventsStats() ([]StatementEventStats, error) {
&s.countStar,
)
if err != nil {

return nil, err
}
stats = append(stats, s)
Expand All @@ -474,6 +499,7 @@ func (c *mySQLClient) getTotalErrors() (int64, error) {

rows, err := c.client.Query(query)
if err != nil {

return -1, err
}

Expand All @@ -484,6 +510,7 @@ func (c *mySQLClient) getTotalErrors() (int64, error) {

err := rows.Scan(&ec)
if err != nil {

return -1, err
}
nerrors += ec
Expand All @@ -505,6 +532,7 @@ func (c *mySQLClient) getTableLockWaitEventStats() ([]tableLockWaitEventStats, e

rows, err := c.client.Query(query)
if err != nil {

return nil, err
}
defer rows.Close()
Expand All @@ -518,6 +546,7 @@ func (c *mySQLClient) getTableLockWaitEventStats() ([]tableLockWaitEventStats, e
&s.sumTimerReadNormal, &s.sumTimerReadWithSharedLocks, &s.sumTimerReadHighPriority, &s.sumTimerReadNoInsert, &s.sumTimerReadExternal,
&s.sumTimerWriteAllowWrite, &s.sumTimerWriteConcurrentInsert, &s.sumTimerWriteLowPriority, &s.sumTimerWriteNormal, &s.sumTimerWriteExternal)
if err != nil {

return nil, err
}
stats = append(stats, s)
Expand All @@ -529,23 +558,27 @@ func (c *mySQLClient) getTableLockWaitEventStats() ([]tableLockWaitEventStats, e
func (c *mySQLClient) getReplicaStatusStats() ([]ReplicaStatusStats, error) {
version, err := c.getVersion()
if err != nil {

return nil, err
}

if version < "8.0.22" {

return nil, nil
}

query := "SHOW REPLICA STATUS"
rows, err := c.client.Query(query)

if err != nil {

return nil, err
}

defer rows.Close()
cols, err := rows.Columns()
if err != nil {

return nil, err
}

Expand Down Expand Up @@ -682,6 +715,7 @@ func (c *mySQLClient) getReplicaStatusStats() ([]ReplicaStatusStats, error) {
err := rows.Scan(dest...)

if err != nil {

return nil, err
}
stats = append(stats, s)
Expand Down
5 changes: 2 additions & 3 deletions receiver/mysqlreceiver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
package mysqlreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/mysqlreceiver"

import (
"math"
"time"

"go.opentelemetry.io/collector/config/confignet"
Expand All @@ -17,9 +16,9 @@ import (
)

const (
defaultStatementEventsDigestTextLimit = 120
defaultStatementEventsDigestTextLimit = 1024
defaultStatementEventsLimit = 250
defaultStatementEventsTimeLimit = time.Duration(math.MaxInt64)
defaultStatementEventsTimeLimit = 24 * time.Hour
)

type Config struct {
Expand Down
21 changes: 15 additions & 6 deletions receiver/mysqlreceiver/scraper.go
Original file line number Diff line number Diff line change
Expand Up @@ -437,7 +437,9 @@ func (m *mySQLScraper) scrapeTotalRows(now pcommon.Timestamp, errs *scrapererror
return
}
for _, r := range nrows {
m.mb.RecordMysqlTotalRowsDataPoint(now, r.totalRows, r.dbname)
if r.totalRows.Valid {
m.mb.RecordMysqlTotalRowsDataPoint(now, r.totalRows.Int64, r.dbname)
}
}
}

Expand Down Expand Up @@ -481,10 +483,18 @@ func (m *mySQLScraper) scrapeTableStats(now pcommon.Timestamp, errs *scrapererro
for i := 0; i < len(tableStats); i++ {
s := tableStats[i]
// counts
m.mb.RecordMysqlTableRowsDataPoint(now, s.rows, s.name, s.schema)
m.mb.RecordMysqlTableAverageRowLengthDataPoint(now, s.averageRowLength, s.name, s.schema)
m.mb.RecordMysqlTableSizeDataPoint(now, s.dataLength, s.name, s.schema, metadata.AttributeTableSizeTypeData)
m.mb.RecordMysqlTableSizeDataPoint(now, s.indexLength, s.name, s.schema, metadata.AttributeTableSizeTypeIndex)
if s.rows.Valid {
m.mb.RecordMysqlTableRowsDataPoint(now, s.rows.Int64, s.name, s.schema)
}
if s.averageRowLength.Valid {
m.mb.RecordMysqlTableAverageRowLengthDataPoint(now, s.averageRowLength.Int64, s.name, s.schema)
}
if s.dataLength.Valid {
m.mb.RecordMysqlTableSizeDataPoint(now, s.dataLength.Int64, s.name, s.schema, metadata.AttributeTableSizeTypeData)
}
if s.indexLength.Valid {
m.mb.RecordMysqlTableSizeDataPoint(now, s.indexLength.Int64, s.name, s.schema, metadata.AttributeTableSizeTypeIndex)
}
}
}

Expand Down Expand Up @@ -580,7 +590,6 @@ func (m *mySQLScraper) scrapeStatementEventsStats(now pcommon.Timestamp, errs *s

func (m *mySQLScraper) scrapeTotalErrors(now pcommon.Timestamp, errs *scrapererror.ScrapeErrors) {
totalErrors, err := m.sqlclient.getTotalErrors()

if err != nil {
m.logger.Error("Failed to fetch total errors ", zap.Error(err))
errs.AddPartial(1, err)
Expand Down
28 changes: 20 additions & 8 deletions receiver/mysqlreceiver/scraper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,10 +285,7 @@ func (c *mockClient) getTotalRows() ([]NRows, error) {
fmt.Println(text[0])
fmt.Println(text[1])
s.dbname = text[0]
s.totalRows, err = strconv.ParseInt(text[1], 10, 64)
if err != nil {
return nil, err
}
s.totalRows = parseNullInt64(text[1])
stats = append(stats, s)
}

Expand All @@ -309,10 +306,14 @@ func (c *mockClient) getTableStats() ([]TableStats, error) {
text := strings.Split(scanner.Text(), "\t")
s.schema = text[0]
s.name = text[1]
s.rows, _ = parseInt(text[2])
s.averageRowLength, _ = parseInt(text[3])
s.dataLength, _ = parseInt(text[4])
s.indexLength, _ = parseInt(text[5])
s.rows = parseNullInt64(text[2])
s.averageRowLength = parseNullInt64(text[3])
s.dataLength = parseNullInt64(text[4])
s.indexLength = parseNullInt64(text[5])
// s.rows, _ = parseInt(text[2])
// s.averageRowLength, _ = parseInt(text[3])
// s.dataLength, _ = parseInt(text[4])
// s.indexLength, _ = parseInt(text[5])

stats = append(stats, s)
}
Expand Down Expand Up @@ -538,3 +539,14 @@ func (c *mockClient) getReplicaStatusStats() ([]ReplicaStatusStats, error) {
func (c *mockClient) Close() error {
return nil
}

func parseNullInt64(value string) sql.NullInt64 {
if value == "" {
return sql.NullInt64{Int64: 0, Valid: false}
}
i, err := strconv.ParseInt(value, 10, 64)
if err != nil {
return sql.NullInt64{Int64: 0, Valid: false}
}
return sql.NullInt64{Int64: i, Valid: true}
}

0 comments on commit ed0e43a

Please sign in to comment.