From ed0e43a5c1db1682f5ea7ab3ca41b9d639340160 Mon Sep 17 00:00:00 2001 From: Naman Vyas <165130700+naman47vyas@users.noreply.github.com> Date: Thu, 18 Jul 2024 10:34:01 +0530 Subject: [PATCH] Mariadb/extending mysql for mariadb (#88) --- receiver/mysqlreceiver/client.go | 80 ++++++++++++++++++-------- receiver/mysqlreceiver/config.go | 5 +- receiver/mysqlreceiver/scraper.go | 21 +++++-- receiver/mysqlreceiver/scraper_test.go | 28 ++++++--- 4 files changed, 94 insertions(+), 40 deletions(-) diff --git a/receiver/mysqlreceiver/client.go b/receiver/mysqlreceiver/client.go index 05266eee0b42..845c279aeb10 100644 --- a/receiver/mysqlreceiver/client.go +++ b/receiver/mysqlreceiver/client.go @@ -65,10 +65,10 @@ type IndexIoWaitsStats struct { type TableStats struct { schema string name string - rows int64 - averageRowLength int64 - dataLength int64 - indexLength int64 + rows sql.NullInt64 + averageRowLength sql.NullInt64 + dataLength sql.NullInt64 + indexLength sql.NullInt64 } type StatementEventStats struct { @@ -230,7 +230,6 @@ func (c *mySQLClient) getVersion() (string, error) { if err != nil { return "", err } - return version, nil } @@ -247,7 +246,6 @@ func (c *mySQLClient) getInnodbStats() (map[string]string, error) { } func (c *mySQLClient) getInnodbStatusStats() (map[string]int64, error, int) { - /* RETURNS: map[string]int64 : @@ -282,6 +280,7 @@ func (c *mySQLClient) getInnodbStatusStats() (map[string]int64, error, int) { // TODO: Suggest better value if there's an error for the metric. if mysqlErr != nil { + err := fmt.Errorf("error querying the mysql db for innodb status %v", mysqlErr) return nil, err, 0 } @@ -301,6 +300,7 @@ func (c *mySQLClient) getInnodbStatusStats() (map[string]int64, error, int) { var parserErrs error parserErrs = nil if total_errs > 0 { + errorString := flattenErrorMap(errs) parserErrs = fmt.Errorf(errorString) } @@ -310,7 +310,7 @@ func (c *mySQLClient) getInnodbStatusStats() (map[string]int64, error, int) { type NRows struct { dbname string - totalRows int64 + totalRows sql.NullInt64 } func (c *mySQLClient) getTotalRows() ([]NRows, error) { @@ -331,6 +331,7 @@ func (c *mySQLClient) getTotalRows() ([]NRows, error) { var r NRows err := rows.Scan(&r.dbname, &r.totalRows) if err != nil { + return nil, err } nr = append(nr, r) @@ -352,10 +353,16 @@ func (c *mySQLClient) getTableStats() ([]TableStats, error) { var stats []TableStats for rows.Next() { var s TableStats - err := rows.Scan(&s.schema, &s.name, - &s.rows, &s.averageRowLength, - &s.dataLength, &s.indexLength) + err := rows.Scan( + &s.schema, + &s.name, + &s.rows, + &s.averageRowLength, + &s.dataLength, + &s.indexLength, + ) if err != nil { + return nil, err } stats = append(stats, s) @@ -373,6 +380,7 @@ func (c *mySQLClient) getTableIoWaitsStats() ([]TableIoWaitsStats, error) { "WHERE OBJECT_SCHEMA NOT IN ('mysql', 'performance_schema');" rows, err := c.client.Query(query) if err != nil { + return nil, err } defer rows.Close() @@ -383,6 +391,7 @@ func (c *mySQLClient) getTableIoWaitsStats() ([]TableIoWaitsStats, error) { &s.countDelete, &s.countFetch, &s.countInsert, &s.countUpdate, &s.timeDelete, &s.timeFetch, &s.timeInsert, &s.timeUpdate) if err != nil { + return nil, err } stats = append(stats, s) @@ -401,6 +410,7 @@ func (c *mySQLClient) getIndexIoWaitsStats() ([]IndexIoWaitsStats, error) { rows, err := c.client.Query(query) if err != nil { + return nil, err } defer rows.Close() @@ -411,6 +421,7 @@ func (c *mySQLClient) getIndexIoWaitsStats() ([]IndexIoWaitsStats, error) { &s.countDelete, &s.countFetch, &s.countInsert, &s.countUpdate, &s.timeDelete, &s.timeFetch, &s.timeInsert, &s.timeUpdate) if err != nil { + return nil, err } stats = append(stats, s) @@ -420,19 +431,32 @@ func (c *mySQLClient) getIndexIoWaitsStats() ([]IndexIoWaitsStats, error) { } func (c *mySQLClient) getStatementEventsStats() ([]StatementEventStats, error) { - query := fmt.Sprintf("SELECT ifnull(SCHEMA_NAME, 'NONE') as SCHEMA_NAME, DIGEST,"+ - "LEFT(DIGEST_TEXT, %d) as DIGEST_TEXT, SUM_TIMER_WAIT, SUM_ERRORS,"+ - "SUM_WARNINGS, SUM_ROWS_AFFECTED, SUM_ROWS_SENT, SUM_ROWS_EXAMINED,"+ - "SUM_CREATED_TMP_DISK_TABLES, SUM_CREATED_TMP_TABLES, SUM_SORT_MERGE_PASSES,"+ - "SUM_SORT_ROWS, SUM_NO_INDEX_USED , COUNT_STAR "+ - "FROM performance_schema.events_statements_summary_by_digest "+ - "WHERE SCHEMA_NAME NOT IN ('mysql', 'performance_schema', 'information_schema') "+ - "AND last_seen > DATE_SUB(NOW(), INTERVAL %d SECOND) "+ - "ORDER BY SUM_TIMER_WAIT DESC "+ - "LIMIT %d", - c.statementEventsDigestTextLimit, - int64(c.statementEventsTimeLimit.Seconds()), - c.statementEventsLimit) + query := fmt.Sprintf(` + SELECT + IFNULL(SCHEMA_NAME, 'NONE') AS SCHEMA_NAME, + DIGEST, + LEFT(DIGEST_TEXT, %d) AS DIGEST_TEXT, + SUM_TIMER_WAIT, + SUM_ERRORS, + SUM_WARNINGS, + SUM_ROWS_AFFECTED, + SUM_ROWS_SENT, + SUM_ROWS_EXAMINED, + SUM_CREATED_TMP_DISK_TABLES, + SUM_CREATED_TMP_TABLES, + SUM_SORT_MERGE_PASSES, + SUM_SORT_ROWS, + SUM_NO_INDEX_USED, + COUNT_STAR + FROM + performance_schema.events_statements_summary_by_digest + WHERE + SCHEMA_NAME NOT IN ('performance_schema', 'information_schema') + AND last_seen > DATE_SUB(NOW(), INTERVAL %d SECOND) + ORDER BY + SUM_TIMER_WAIT DESC + LIMIT %d; + `, c.statementEventsDigestTextLimit, int64(c.statementEventsTimeLimit.Seconds()), c.statementEventsLimit) rows, err := c.client.Query(query) if err != nil { @@ -462,6 +486,7 @@ func (c *mySQLClient) getStatementEventsStats() ([]StatementEventStats, error) { &s.countStar, ) if err != nil { + return nil, err } stats = append(stats, s) @@ -474,6 +499,7 @@ func (c *mySQLClient) getTotalErrors() (int64, error) { rows, err := c.client.Query(query) if err != nil { + return -1, err } @@ -484,6 +510,7 @@ func (c *mySQLClient) getTotalErrors() (int64, error) { err := rows.Scan(&ec) if err != nil { + return -1, err } nerrors += ec @@ -505,6 +532,7 @@ func (c *mySQLClient) getTableLockWaitEventStats() ([]tableLockWaitEventStats, e rows, err := c.client.Query(query) if err != nil { + return nil, err } defer rows.Close() @@ -518,6 +546,7 @@ func (c *mySQLClient) getTableLockWaitEventStats() ([]tableLockWaitEventStats, e &s.sumTimerReadNormal, &s.sumTimerReadWithSharedLocks, &s.sumTimerReadHighPriority, &s.sumTimerReadNoInsert, &s.sumTimerReadExternal, &s.sumTimerWriteAllowWrite, &s.sumTimerWriteConcurrentInsert, &s.sumTimerWriteLowPriority, &s.sumTimerWriteNormal, &s.sumTimerWriteExternal) if err != nil { + return nil, err } stats = append(stats, s) @@ -529,10 +558,12 @@ func (c *mySQLClient) getTableLockWaitEventStats() ([]tableLockWaitEventStats, e func (c *mySQLClient) getReplicaStatusStats() ([]ReplicaStatusStats, error) { version, err := c.getVersion() if err != nil { + return nil, err } if version < "8.0.22" { + return nil, nil } @@ -540,12 +571,14 @@ func (c *mySQLClient) getReplicaStatusStats() ([]ReplicaStatusStats, error) { rows, err := c.client.Query(query) if err != nil { + return nil, err } defer rows.Close() cols, err := rows.Columns() if err != nil { + return nil, err } @@ -682,6 +715,7 @@ func (c *mySQLClient) getReplicaStatusStats() ([]ReplicaStatusStats, error) { err := rows.Scan(dest...) if err != nil { + return nil, err } stats = append(stats, s) diff --git a/receiver/mysqlreceiver/config.go b/receiver/mysqlreceiver/config.go index ad7e7a0c8da4..c46317c8b577 100644 --- a/receiver/mysqlreceiver/config.go +++ b/receiver/mysqlreceiver/config.go @@ -4,7 +4,6 @@ package mysqlreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/mysqlreceiver" import ( - "math" "time" "go.opentelemetry.io/collector/config/confignet" @@ -17,9 +16,9 @@ import ( ) const ( - defaultStatementEventsDigestTextLimit = 120 + defaultStatementEventsDigestTextLimit = 1024 defaultStatementEventsLimit = 250 - defaultStatementEventsTimeLimit = time.Duration(math.MaxInt64) + defaultStatementEventsTimeLimit = 24 * time.Hour ) type Config struct { diff --git a/receiver/mysqlreceiver/scraper.go b/receiver/mysqlreceiver/scraper.go index 7df99256d5ee..3ae9412b7e81 100644 --- a/receiver/mysqlreceiver/scraper.go +++ b/receiver/mysqlreceiver/scraper.go @@ -437,7 +437,9 @@ func (m *mySQLScraper) scrapeTotalRows(now pcommon.Timestamp, errs *scrapererror return } for _, r := range nrows { - m.mb.RecordMysqlTotalRowsDataPoint(now, r.totalRows, r.dbname) + if r.totalRows.Valid { + m.mb.RecordMysqlTotalRowsDataPoint(now, r.totalRows.Int64, r.dbname) + } } } @@ -481,10 +483,18 @@ func (m *mySQLScraper) scrapeTableStats(now pcommon.Timestamp, errs *scrapererro for i := 0; i < len(tableStats); i++ { s := tableStats[i] // counts - m.mb.RecordMysqlTableRowsDataPoint(now, s.rows, s.name, s.schema) - m.mb.RecordMysqlTableAverageRowLengthDataPoint(now, s.averageRowLength, s.name, s.schema) - m.mb.RecordMysqlTableSizeDataPoint(now, s.dataLength, s.name, s.schema, metadata.AttributeTableSizeTypeData) - m.mb.RecordMysqlTableSizeDataPoint(now, s.indexLength, s.name, s.schema, metadata.AttributeTableSizeTypeIndex) + if s.rows.Valid { + m.mb.RecordMysqlTableRowsDataPoint(now, s.rows.Int64, s.name, s.schema) + } + if s.averageRowLength.Valid { + m.mb.RecordMysqlTableAverageRowLengthDataPoint(now, s.averageRowLength.Int64, s.name, s.schema) + } + if s.dataLength.Valid { + m.mb.RecordMysqlTableSizeDataPoint(now, s.dataLength.Int64, s.name, s.schema, metadata.AttributeTableSizeTypeData) + } + if s.indexLength.Valid { + m.mb.RecordMysqlTableSizeDataPoint(now, s.indexLength.Int64, s.name, s.schema, metadata.AttributeTableSizeTypeIndex) + } } } @@ -580,7 +590,6 @@ func (m *mySQLScraper) scrapeStatementEventsStats(now pcommon.Timestamp, errs *s func (m *mySQLScraper) scrapeTotalErrors(now pcommon.Timestamp, errs *scrapererror.ScrapeErrors) { totalErrors, err := m.sqlclient.getTotalErrors() - if err != nil { m.logger.Error("Failed to fetch total errors ", zap.Error(err)) errs.AddPartial(1, err) diff --git a/receiver/mysqlreceiver/scraper_test.go b/receiver/mysqlreceiver/scraper_test.go index c674906d6138..74e54e9ff850 100644 --- a/receiver/mysqlreceiver/scraper_test.go +++ b/receiver/mysqlreceiver/scraper_test.go @@ -285,10 +285,7 @@ func (c *mockClient) getTotalRows() ([]NRows, error) { fmt.Println(text[0]) fmt.Println(text[1]) s.dbname = text[0] - s.totalRows, err = strconv.ParseInt(text[1], 10, 64) - if err != nil { - return nil, err - } + s.totalRows = parseNullInt64(text[1]) stats = append(stats, s) } @@ -309,10 +306,14 @@ func (c *mockClient) getTableStats() ([]TableStats, error) { text := strings.Split(scanner.Text(), "\t") s.schema = text[0] s.name = text[1] - s.rows, _ = parseInt(text[2]) - s.averageRowLength, _ = parseInt(text[3]) - s.dataLength, _ = parseInt(text[4]) - s.indexLength, _ = parseInt(text[5]) + s.rows = parseNullInt64(text[2]) + s.averageRowLength = parseNullInt64(text[3]) + s.dataLength = parseNullInt64(text[4]) + s.indexLength = parseNullInt64(text[5]) + // s.rows, _ = parseInt(text[2]) + // s.averageRowLength, _ = parseInt(text[3]) + // s.dataLength, _ = parseInt(text[4]) + // s.indexLength, _ = parseInt(text[5]) stats = append(stats, s) } @@ -538,3 +539,14 @@ func (c *mockClient) getReplicaStatusStats() ([]ReplicaStatusStats, error) { func (c *mockClient) Close() error { return nil } + +func parseNullInt64(value string) sql.NullInt64 { + if value == "" { + return sql.NullInt64{Int64: 0, Valid: false} + } + i, err := strconv.ParseInt(value, 10, 64) + if err != nil { + return sql.NullInt64{Int64: 0, Valid: false} + } + return sql.NullInt64{Int64: i, Valid: true} +}