Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for iterating over query results #132

Merged
merged 6 commits into from
Oct 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,16 @@ jobs:
startup_sql:
- 'SET lock_timeout = 1000'
- 'SET idle_in_transaction_session_timeout = 100'
# iterator is an optional mechanism to iterate over a series of values, e.g. multiple databases
iterator:
# sql is the SQL to execute to retrieve the list of values to iterate over -
# query result must be a single column
sql: 'SELECT database_name FROM databases'
# placeholder should be present in the original query and not also used as an environment variable
# e.g. {{PLACEHOLDER}} - it will be replaced by the values retrieved by the query
placeholder: PLACEHOLDER
# label is the label name to which the iterator value gets assigned
label: database
# queries is a map of Metric/Query mappings
queries:
# name is prefixed with sql_ and used as the metric name
Expand Down
9 changes: 9 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ type Job struct {
Connections []string `yaml:"connections"`
Queries []*Query `yaml:"queries"`
StartupSQL []string `yaml:"startup_sql"` // SQL executed on startup
Iterator Iterator `yaml:"iterator"` // Iterator configuration
}

type connection struct {
Expand All @@ -162,6 +163,7 @@ type connection struct {
database string
user string
tokenExpirationTime time.Time
iteratorValues []string
}

// Query is an SQL query that is executed on a connection
Expand All @@ -180,3 +182,10 @@ type Query struct {
Query string `yaml:"query"` // a literal query
QueryRef string `yaml:"query_ref"` // references a query in the query map
}

// Iterator is a mechanism to repeat queries from a job based on the results of another query
type Iterator struct {
SQL string `yaml:"sql"` // SQL to execute to retrieve iterator values
Placeholder string `yaml:"placeholder"` // Placeholder in query to be replaced
Label string `yaml:"label"` // Label to assign iterator values to
}
56 changes: 51 additions & 5 deletions job.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,12 @@ func (j *Job) Init(logger log.Logger, queries map[string]string) error {
// try to satisfy prometheus naming restrictions
name := MetricNameRE.ReplaceAllString("sql_"+q.Name, "")
help := q.Help

// append the iterator label if it is set
if j.Iterator.Label != "" {
q.Labels = append(q.Labels, j.Iterator.Label)
}

// prepare a new metrics descriptor
//
// the tricky part here is that the *order* of labels has to match the
Expand Down Expand Up @@ -428,6 +434,36 @@ func (j *Job) runOnceConnection(conn *connection, done chan int) {
return
}

// execute iterator SQL
if j.Iterator.SQL != "" {
level.Debug(j.log).Log("msg", "IteratorSQL", "Query:", j.Iterator.SQL)
rows, err := conn.conn.Queryx(j.Iterator.SQL)
if err != nil {
level.Warn(j.log).Log("msg", "Failed to run iterator query", "err", err, "host", conn.host)
j.markFailed(conn)
// we don't have the query name yet.
failedQueryCounter.WithLabelValues(j.Name, "").Inc()
return
}

defer rows.Close()

var ivs []string
for rows.Next() {
var value string
err := rows.Scan(&value)
if err != nil {
level.Warn(j.log).Log("msg", "Failed to read iterator values", "err", err, "host", conn.host)
j.markFailed(conn)
// we don't have the query name yet.
failedQueryCounter.WithLabelValues(j.Name, "").Inc()
return
}
ivs = append(ivs, value)
}
conn.iteratorValues = ivs
}

for _, q := range j.Queries {
if q == nil {
continue
Expand All @@ -437,11 +473,21 @@ func (j *Job) runOnceConnection(conn *connection, done chan int) {
level.Warn(q.log).Log("msg", "Skipping query. Collector is nil")
continue
}
level.Debug(q.log).Log("msg", "Running Query")
// execute the query on the connection
if err := q.Run(conn); err != nil {
level.Warn(q.log).Log("msg", "Failed to run query", "err", err)
continue
// repeat query with iterator values if set and the query has the iterator placeholder
if conn.iteratorValues != nil && q.HasIterator(j.Iterator.Placeholder) {
level.Debug(q.log).Log("msg", "Running Iterator Query")
// execute the query with iterator on the connection
if err := q.RunIterator(conn, j.Iterator.Placeholder, conn.iteratorValues, j.Iterator.Label); err != nil {
level.Warn(q.log).Log("msg", "Failed to run query", "err", err)
continue
}
} else {
level.Debug(q.log).Log("msg", "Running Query")
// execute the query on the connection
if err := q.Run(conn); err != nil {
level.Warn(q.log).Log("msg", "Failed to run query", "err", err)
continue
}
}
level.Debug(q.log).Log("msg", "Query finished")
updated++
Expand Down
4 changes: 2 additions & 2 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ import (
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/prometheus/common/version"
prom_collectors_version "github.com/prometheus/client_golang/prometheus/collectors/version"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/prometheus/common/version"
_ "go.uber.org/automaxprocs"
)

Expand Down
97 changes: 93 additions & 4 deletions query.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"fmt"
"strconv"
"strings"
"time"

"github.com/go-kit/log"
Expand Down Expand Up @@ -50,7 +51,7 @@ func (q *Query) Run(conn *connection) error {
failedScrapes.WithLabelValues(conn.driver, conn.host, conn.database, conn.user, q.jobName, q.Name).Set(1.0)
continue
}
m, err := q.updateMetrics(conn, res)
m, err := q.updateMetrics(conn, res, "", "")
if err != nil {
level.Error(q.log).Log("msg", "Failed to update metrics", "err", err, "host", conn.host, "db", conn.database)
failedScrapes.WithLabelValues(conn.driver, conn.host, conn.database, conn.user, q.jobName, q.Name).Set(1.0)
Expand All @@ -77,8 +78,90 @@ func (q *Query) Run(conn *connection) error {
return nil
}

// RunIterator runs the query for each iterator value on a single connection
func (q *Query) RunIterator(conn *connection, ph string, ivs []string, il string) error {
if q.log == nil {
q.log = log.NewNopLogger()
}
queryCounter.WithLabelValues(q.jobName, q.Name).Inc()
if q.desc == nil {
failedQueryCounter.WithLabelValues(q.jobName, q.Name).Inc()
return fmt.Errorf("metrics descriptor is nil")
}
if q.Query == "" {
failedQueryCounter.WithLabelValues(q.jobName, q.Name).Inc()
return fmt.Errorf("query is empty")
}
if conn == nil || conn.conn == nil {
failedQueryCounter.WithLabelValues(q.jobName, q.Name).Inc()
return fmt.Errorf("db connection not initialized (should not happen)")
}

// execute query for each iterator value
now := time.Now()
metrics := make([]prometheus.Metric, 0, len(q.metrics))
updated := 0
for _, iv := range ivs {
rows, err := conn.conn.Queryx(q.ReplaceIterator(ph, iv))
if err != nil {
failedScrapes.WithLabelValues(conn.driver, conn.host, conn.database, conn.user, q.jobName, q.Name).Set(1.0)
failedQueryCounter.WithLabelValues(q.jobName, q.Name).Inc()
return err
}
defer rows.Close()

for rows.Next() {
res := make(map[string]interface{})
err := rows.MapScan(res)
if err != nil {
level.Error(q.log).Log("msg", "Failed to scan", "err", err, "host", conn.host, "db", conn.database)
failedScrapes.WithLabelValues(conn.driver, conn.host, conn.database, conn.user, q.jobName, q.Name).Set(1.0)
continue
}
m, err := q.updateMetrics(conn, res, iv, il)
if err != nil {
level.Error(q.log).Log("msg", "Failed to update metrics", "err", err, "host", conn.host, "db", conn.database)
failedScrapes.WithLabelValues(conn.driver, conn.host, conn.database, conn.user, q.jobName, q.Name).Set(1.0)
continue
}
metrics = append(metrics, m...)
updated++
failedScrapes.WithLabelValues(conn.driver, conn.host, conn.database, conn.user, q.jobName, q.Name).Set(0.0)
}
}

duration := time.Since(now)
queryDurationHistogram.WithLabelValues(q.jobName, q.Name).Observe(duration.Seconds())

if updated < 1 {
if q.AllowZeroRows {
failedScrapes.WithLabelValues(conn.driver, conn.host, conn.database, conn.user, q.jobName, q.Name).Set(0.0)
} else {
return fmt.Errorf("zero rows returned")
}
}

// update the metrics cache
q.Lock()
q.metrics[conn] = metrics
q.Unlock()

return nil
}

// HasIterator returns true if the query contains the given placeholder
func (q *Query) HasIterator(ph string) bool {
return strings.Contains(q.Query, ph)
}

// ReplaceIterator replaces a given placeholder with an iterator value and returns a new query
func (q *Query) ReplaceIterator(ph string, iv string) string {
iteratorReplacer := strings.NewReplacer(fmt.Sprint("{{", ph, "}}"), iv)
return iteratorReplacer.Replace(q.Query)
}

// updateMetrics parses the result set and returns a slice of const metrics
func (q *Query) updateMetrics(conn *connection, res map[string]interface{}) ([]prometheus.Metric, error) {
func (q *Query) updateMetrics(conn *connection, res map[string]interface{}, iv string, il string) ([]prometheus.Metric, error) {
// if no value were defined to be parsed, return immediately
if len(q.Values) == 0 {
level.Debug(q.log).Log("msg", "No values defined in configuration, skipping metric update")
Expand All @@ -87,7 +170,7 @@ func (q *Query) updateMetrics(conn *connection, res map[string]interface{}) ([]p
updated := 0
metrics := make([]prometheus.Metric, 0, len(q.Values))
for _, valueName := range q.Values {
m, err := q.updateMetric(conn, res, valueName)
m, err := q.updateMetric(conn, res, valueName, iv, il)
if err != nil {
level.Error(q.log).Log(
"msg", "Failed to update metric",
Expand All @@ -108,7 +191,7 @@ func (q *Query) updateMetrics(conn *connection, res map[string]interface{}) ([]p
}

// updateMetrics parses a single row and returns a const metric
func (q *Query) updateMetric(conn *connection, res map[string]interface{}, valueName string) (prometheus.Metric, error) {
func (q *Query) updateMetric(conn *connection, res map[string]interface{}, valueName string, iv string, il string) (prometheus.Metric, error) {
var value float64
if i, ok := res[valueName]; ok {
switch f := i.(type) {
Expand Down Expand Up @@ -154,6 +237,12 @@ func (q *Query) updateMetric(conn *connection, res map[string]interface{}, value
// added below
labels := make([]string, 0, len(q.Labels)+5)
for _, label := range q.Labels {
// append iterator value to the labels
if label == il && iv != "" {
labels = append(labels, iv)
continue
}

// we need to fill every spot in the slice or the key->value mapping
// won't match up in the end.
//
Expand Down
Loading