Skip to content

Commit

Permalink
Merge pull request #23 from everesio/label_dimensions_inconsistent
Browse files Browse the repository at this point in the history
Fill missing metrics labels with empty string to avoid label dimensions inconsistent failure
  • Loading branch information
frodenas authored Aug 23, 2018
2 parents de048a0 + 0d41153 commit 49c2fbd
Show file tree
Hide file tree
Showing 4 changed files with 280 additions and 28 deletions.
32 changes: 32 additions & 0 deletions collectors/fnv.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package collectors

const separatorByte = 255

// https://github.com/prometheus/client_golang/blob/master/prometheus/fnv.go
// Inline and byte-free variant of hash/fnv's fnv64a.

const (
offset64 = 14695981039346656037
prime64 = 1099511628211
)

// hashNew initializies a new fnv64a hash value.
func hashNew() uint64 {
return offset64
}

// hashAdd adds a string to a fnv64a hash value, returning the updated hash.
func hashAdd(h uint64, s string) uint64 {
for i := 0; i < len(s); i++ {
h ^= uint64(s[i])
h *= prime64
}
return h
}

// hashAddByte adds a byte to a fnv64a hash value, returning the updated hash.
func hashAddByte(h uint64, b byte) uint64 {
h ^= uint64(b)
h *= prime64
return h
}
40 changes: 13 additions & 27 deletions collectors/monitoring_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ type MonitoringCollector struct {
lastScrapeErrorMetric prometheus.Gauge
lastScrapeTimestampMetric prometheus.Gauge
lastScrapeDurationSecondsMetric prometheus.Gauge
collectorFillMissingLabels bool
}

func NewMonitoringCollector(
Expand All @@ -35,6 +36,7 @@ func NewMonitoringCollector(
metricsInterval time.Duration,
metricsOffset time.Duration,
monitoringService *monitoring.Service,
collectorFillMissingLabels bool,
) (*MonitoringCollector, error) {
apiCallsTotalMetric := prometheus.NewCounter(
prometheus.CounterOpts{
Expand Down Expand Up @@ -108,6 +110,7 @@ func NewMonitoringCollector(
lastScrapeErrorMetric: lastScrapeErrorMetric,
lastScrapeTimestampMetric: lastScrapeTimestampMetric,
lastScrapeDurationSecondsMetric: lastScrapeDurationSecondsMetric,
collectorFillMissingLabels: collectorFillMissingLabels,
}

return monitoringCollector, nil
Expand Down Expand Up @@ -231,8 +234,14 @@ func (c *MonitoringCollector) reportTimeSeriesMetrics(
var metricValue float64
var metricValueType prometheus.ValueType
var newestTSPoint *monitoring.Point
var metricDesc *prometheus.Desc

timeSeriesMetrics := &TimeSeriesMetrics{
metricDescriptor: metricDescriptor,
ch: ch,
fillMissingLabels: c.collectorFillMissingLabels,
constMetrics: make(map[string][]ConstMetric),
histogramMetrics: make(map[string][]HistogramMetric),
}
for _, timeSeries := range page.TimeSeries {
newestEndTime := time.Unix(0, 0)
for _, point := range timeSeries.Points {
Expand All @@ -245,7 +254,6 @@ func (c *MonitoringCollector) reportTimeSeriesMetrics(
newestTSPoint = point
}
}

labelKeys := []string{"unit"}
labelValues := []string{metricDescriptor.Unit}

Expand All @@ -263,17 +271,6 @@ func (c *MonitoringCollector) reportTimeSeriesMetrics(
labelValues = append(labelValues, value)
}

// The metric name to report is composed by the 3 parts:
// 1. namespace is a constant prefix (stackdriver)
// 2. subsystem is the monitored resource type (ie gce_instance)
// 3. name is the metric type (ie compute.googleapis.com/instance/cpu/usage_time)
metricDesc = prometheus.NewDesc(
prometheus.BuildFQName("stackdriver", utils.NormalizeMetricName(timeSeries.Resource.Type), utils.NormalizeMetricName(timeSeries.Metric.Type)),
metricDescriptor.Description,
labelKeys,
prometheus.Labels{},
)

switch timeSeries.MetricKind {
case "GAUGE":
metricValueType = prometheus.GaugeValue
Expand All @@ -299,13 +296,7 @@ func (c *MonitoringCollector) reportTimeSeriesMetrics(
dist := newestTSPoint.Value.DistributionValue
buckets, err := c.generateHistogramBuckets(dist)
if err == nil {
ch <- prometheus.MustNewConstHistogram(
metricDesc,
uint64(dist.Count),
dist.Mean*float64(dist.Count), // Stackdriver does not provide the sum, but we can fake it
buckets,
labelValues...,
)
timeSeriesMetrics.CollectNewConstHistogram(timeSeries, labelKeys, dist, buckets, labelValues)
} else {
log.Debugf("Discarding resource %s metric %s: %s", timeSeries.Resource.Type, timeSeries.Metric.Type, err)
}
Expand All @@ -315,14 +306,9 @@ func (c *MonitoringCollector) reportTimeSeriesMetrics(
continue
}

ch <- prometheus.MustNewConstMetric(
metricDesc,
metricValueType,
metricValue,
labelValues...,
)
timeSeriesMetrics.CollectNewConstMetric(timeSeries, labelKeys, metricValueType, metricValue, labelValues)
}

timeSeriesMetrics.Complete()
return nil
}

Expand Down
230 changes: 230 additions & 0 deletions collectors/monitoring_metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,230 @@
package collectors

import (
"github.com/prometheus/client_golang/prometheus"
"google.golang.org/api/monitoring/v3"

"github.com/frodenas/stackdriver_exporter/utils"
"sort"
)

func buildFQName(timeSeries *monitoring.TimeSeries) string {
// The metric name to report is composed by the 3 parts:
// 1. namespace is a constant prefix (stackdriver)
// 2. subsystem is the monitored resource type (ie gce_instance)
// 3. name is the metric type (ie compute.googleapis.com/instance/cpu/usage_time)
return prometheus.BuildFQName("stackdriver", utils.NormalizeMetricName(timeSeries.Resource.Type), utils.NormalizeMetricName(timeSeries.Metric.Type))
}

type TimeSeriesMetrics struct {
metricDescriptor *monitoring.MetricDescriptor
ch chan<- prometheus.Metric

fillMissingLabels bool
constMetrics map[string][]ConstMetric
histogramMetrics map[string][]HistogramMetric
}

func (t *TimeSeriesMetrics) newMetricDesc(fqName string, labelKeys []string) *prometheus.Desc {
return prometheus.NewDesc(
fqName,
t.metricDescriptor.Description,
labelKeys,
prometheus.Labels{},
)
}

type ConstMetric struct {
fqName string
labelKeys []string
valueType prometheus.ValueType
value float64
labelValues []string

keysHash uint64
}

type HistogramMetric struct {
fqName string
labelKeys []string
dist *monitoring.Distribution
buckets map[float64]uint64
labelValues []string

keysHash uint64
}

func (t *TimeSeriesMetrics) CollectNewConstHistogram(timeSeries *monitoring.TimeSeries, labelKeys []string, dist *monitoring.Distribution, buckets map[float64]uint64, labelValues []string) {
fqName := buildFQName(timeSeries)

if t.fillMissingLabels {
vs, ok := t.histogramMetrics[fqName]
if !ok {
vs = make([]HistogramMetric, 0)
}
v := HistogramMetric{
fqName: fqName,
labelKeys: labelKeys,
dist: dist,
buckets: buckets,
labelValues: labelValues,

keysHash: hashLabelKeys(labelKeys),
}
t.histogramMetrics[fqName] = append(vs, v)
return
}
t.ch <- t.newConstHistogram(fqName, labelKeys, dist, buckets, labelValues)
}

func (t *TimeSeriesMetrics) newConstHistogram(fqName string, labelKeys []string, dist *monitoring.Distribution, buckets map[float64]uint64, labelValues []string) prometheus.Metric {
return prometheus.MustNewConstHistogram(
t.newMetricDesc(fqName, labelKeys),
uint64(dist.Count),
dist.Mean*float64(dist.Count), // Stackdriver does not provide the sum, but we can fake it
buckets,
labelValues...,
)
}

func (t *TimeSeriesMetrics) CollectNewConstMetric(timeSeries *monitoring.TimeSeries, labelKeys []string, metricValueType prometheus.ValueType, metricValue float64, labelValues []string) {
fqName := buildFQName(timeSeries)

if t.fillMissingLabels {
vs, ok := t.constMetrics[fqName]
if !ok {
vs = make([]ConstMetric, 0)
}
v := ConstMetric{
fqName: fqName,
labelKeys: labelKeys,
valueType: metricValueType,
value: metricValue,
labelValues: labelValues,

keysHash: hashLabelKeys(labelKeys),
}
t.constMetrics[fqName] = append(vs, v)
return
}
t.ch <- t.newConstMetric(fqName, labelKeys, metricValueType, metricValue, labelValues)
}

func (t *TimeSeriesMetrics) newConstMetric(fqName string, labelKeys []string, metricValueType prometheus.ValueType, metricValue float64, labelValues []string) prometheus.Metric {
return prometheus.MustNewConstMetric(
t.newMetricDesc(fqName, labelKeys),
metricValueType,
metricValue,
labelValues...,
)
}

func hashLabelKeys(labelKeys []string) uint64 {
dh := hashNew()
sortedKeys := make([]string, len(labelKeys))
copy(sortedKeys, labelKeys)
sort.Strings(sortedKeys)
for _, key := range sortedKeys {
dh = hashAdd(dh, key)
dh = hashAddByte(dh, separatorByte)
}
return dh
}

func (t *TimeSeriesMetrics) Complete() {
t.completeConstMetrics()
t.completeHistogramMetrics()
}

func (t *TimeSeriesMetrics) completeConstMetrics() {
for _, vs := range t.constMetrics {
if len(vs) > 1 {
var needFill bool
for i := 1; i < len(vs); i++ {
if vs[0].keysHash != vs[i].keysHash {
needFill = true
}
}
if needFill {
vs = fillConstMetricsLabels(vs)
}
}

for _, v := range vs {
t.ch <- t.newConstMetric(v.fqName, v.labelKeys, v.valueType, v.value, v.labelValues)
}
}
}

func (t *TimeSeriesMetrics) completeHistogramMetrics() {
for _, vs := range t.histogramMetrics {
if len(vs) > 1 {
var needFill bool
for i := 1; i < len(vs); i++ {
if vs[0].keysHash != vs[i].keysHash {
needFill = true
}
}
if needFill {
vs = fillHistogramMetricsLabels(vs)
}
}
for _, v := range vs {
t.ch <- t.newConstHistogram(v.fqName, v.labelKeys, v.dist, v.buckets, v.labelValues)
}
}
}

func fillConstMetricsLabels(metrics []ConstMetric) []ConstMetric {
allKeys := make(map[string]struct{})
for _, metric := range metrics {
for _, key := range metric.labelKeys {
allKeys[key] = struct{}{}
}
}
result := make([]ConstMetric, len(metrics))
for i, metric := range metrics {
if len(metric.labelKeys) != len(allKeys) {
metricKeys := make(map[string]struct{})
for _, key := range metric.labelKeys {
metricKeys[key] = struct{}{}
}
for key := range allKeys {
if _, ok := metricKeys[key]; !ok {
metric.labelKeys = append(metric.labelKeys, key)
metric.labelValues = append(metric.labelValues, "")
}
}
}
result[i] = metric
}

return result
}

func fillHistogramMetricsLabels(metrics []HistogramMetric) []HistogramMetric {
allKeys := make(map[string]struct{})
for _, metric := range metrics {
for _, key := range metric.labelKeys {
allKeys[key] = struct{}{}
}
}
result := make([]HistogramMetric, len(metrics))
for i, metric := range metrics {
if len(metric.labelKeys) != len(allKeys) {
metricKeys := make(map[string]struct{})
for _, key := range metric.labelKeys {
metricKeys[key] = struct{}{}
}
for key := range allKeys {
if _, ok := metricKeys[key]; !ok {
metric.labelKeys = append(metric.labelKeys, key)
metric.labelValues = append(metric.labelValues, "")
}
}
}
result[i] = metric
}

return result
}
6 changes: 5 additions & 1 deletion stackdriver_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ var (
metricsPath = kingpin.Flag(
"web.telemetry-path", "Path under which to expose Prometheus metrics ($STACKDRIVER_EXPORTER_WEB_TELEMETRY_PATH).",
).Envar("STACKDRIVER_EXPORTER_WEB_TELEMETRY_PATH").Default("/metrics").String()

collectorFillMissingLabels = kingpin.Flag(
"collector.fill-missing-labels", "Fill missing metrics labels with empty string to avoid label dimensions inconsistent failure ($STACKDRIVER_EXPORTER_COLLECTOR_FILL_MISSING_LABELS).",
).Envar("STACKDRIVER_EXPORTER_COLLECTOR_FILL_MISSING_LABELS").Default("true").Bool()
)

func init() {
Expand Down Expand Up @@ -89,7 +93,7 @@ func main() {
os.Exit(1)
}

monitoringCollector, err := collectors.NewMonitoringCollector(*projectID, metricsTypePrefixes, *monitoringMetricsInterval, *monitoringMetricsOffset, monitoringService)
monitoringCollector, err := collectors.NewMonitoringCollector(*projectID, metricsTypePrefixes, *monitoringMetricsInterval, *monitoringMetricsOffset, monitoringService, *collectorFillMissingLabels)
if err != nil {
log.Error(err)
os.Exit(1)
Expand Down

0 comments on commit 49c2fbd

Please sign in to comment.