Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle overlapping metrics from different jobs in prometheus exporter #1096

Merged
merged 5 commits into from
Jun 16, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions exporter/prometheusexporter/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@
package prometheusexporter

import (
"bytes"
"context"
"errors"

metricspb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1"
// TODO: once this repository has been transferred to the
// official census-ecosystem location, update this import path.
"github.com/orijtech/prometheus-go-metrics-exporter"
Expand All @@ -42,12 +44,40 @@ func (pe *prometheusExporter) Start(_ context.Context, _ component.Host) error {
}

func (pe *prometheusExporter) ConsumeMetricsData(ctx context.Context, md consumerdata.MetricsData) error {
merged := make(map[string]*metricspb.Metric)
for _, metric := range md.Metrics {
merge(merged, metric)
}
for _, metric := range merged {
_ = pe.exporter.ExportMetric(ctx, md.Node, md.Resource, metric)
}
return nil
}

// The underlying exporter overwrites timeseries when there are conflicting metric signatures.
// Therefore, we need to merge timeseries that share a metric signature into a single metric before sending.
func merge(m map[string]*metricspb.Metric, metric *metricspb.Metric) {
liamawhite marked this conversation as resolved.
Show resolved Hide resolved
key := metricSignature(metric)
current, ok := m[key]
if !ok {
m[key] = metric
return
}
current.Timeseries = append(current.Timeseries, metric.Timeseries...)
}

// Unique identifier of a given promtheus metric
// Assumes label keys are always in the same order
func metricSignature(metric *metricspb.Metric) string {
var buf bytes.Buffer
buf.WriteString(metric.GetMetricDescriptor().GetName())
labelKeys := metric.GetMetricDescriptor().GetLabelKeys()
for _, labelKey := range labelKeys {
buf.WriteString("-" + labelKey.Key)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are dashes allowed in metric names and label keys? If yes then this can result in the same metric signature for different metrics. The following 2 different metrics produce the same signature:

  1. Name: abc
    Label key: d-ef
    Signature: abc-d-ef
  2. Name: abc-d
    Label key: ef
    Signature: abc-d-ef

Copy link
Contributor Author

@liamawhite liamawhite Jun 11, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think its underscores only? This signature function is a copy-paste from the exporter. I guess it doesn't hurt to change it though, would you like me to do so?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doc seems to indicate dashes aren't allowed in metric names or labels in prometheus.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@tigrannajaryan I think we're good to merge then 🙂

}
return buf.String()
}

// Shutdown stops the exporter and is invoked during shutdown.
func (pe *prometheusExporter) Shutdown(context.Context) error {
return pe.shutdownFunc()
Expand Down
137 changes: 93 additions & 44 deletions exporter/prometheusexporter/prometheus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@ package prometheusexporter

import (
"context"
"fmt"
"io/ioutil"
"net/http"
"strings"
"testing"

metricspb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1"
Expand Down Expand Up @@ -87,57 +89,104 @@ func TestPrometheusExporter_endToEnd(t *testing.T) {

assert.NotNil(t, consumer)

var metric1 = &metricspb.Metric{
MetricDescriptor: &metricspb.MetricDescriptor{
Name: "this/one/there(where)",
Description: "Extra ones",
Unit: "1",
Type: metricspb.MetricDescriptor_CUMULATIVE_INT64,
LabelKeys: []*metricspb.LabelKey{
{Key: "os", Description: "Operating system"},
{Key: "arch", Description: "Architecture"},
for delta := 0; delta <= 20; delta += 10 {
consumer.ConsumeMetricsData(context.Background(), consumerdata.MetricsData{Metrics: metricBuilder(int64(delta))})

res, err := http.Get("http://localhost:7777/metrics")
if err != nil {
t.Fatalf("Failed to perform a scrape: %v", err)
}
if g, w := res.StatusCode, 200; g != w {
t.Errorf("Mismatched HTTP response status code: Got: %d Want: %d", g, w)
}
blob, _ := ioutil.ReadAll(res.Body)
_ = res.Body.Close()
want := []string{
`# HELP test_this_one_there_where_ Extra ones`,
`# TYPE test_this_one_there_where_ counter`,
fmt.Sprintf(`test_this_one_there_where_{arch="x86",code="one",foo="bar",os="windows"} %v`, 99+delta),
fmt.Sprintf(`test_this_one_there_where_{arch="x86",code="one",foo="bar",os="linux"} %v`, 100+delta),
}

for _, w := range want {
if !strings.Contains(string(blob), w) {
t.Errorf("Missing %v from response:\n%v", w, string(blob))
}
}
}
}

func metricBuilder(delta int64) []*metricspb.Metric {
return []*metricspb.Metric{
{
MetricDescriptor: &metricspb.MetricDescriptor{
Name: "this/one/there(where)",
Description: "Extra ones",
Unit: "1",
Type: metricspb.MetricDescriptor_CUMULATIVE_INT64,
LabelKeys: []*metricspb.LabelKey{
{Key: "os", Description: "Operating system"},
{Key: "arch", Description: "Architecture"},
},
},
},
Timeseries: []*metricspb.TimeSeries{
{
StartTimestamp: &timestamp.Timestamp{
Seconds: 1543160298,
Nanos: 100000090,
Timeseries: []*metricspb.TimeSeries{
{
StartTimestamp: &timestamp.Timestamp{
Seconds: 1543160298,
Nanos: 100000090,
},
LabelValues: []*metricspb.LabelValue{
{Value: "windows"},
{Value: "x86"},
},
Points: []*metricspb.Point{
{
Timestamp: &timestamp.Timestamp{
Seconds: 1543160298,
Nanos: 100000997,
},
Value: &metricspb.Point_Int64Value{
Int64Value: 99 + delta,
},
},
},
},
LabelValues: []*metricspb.LabelValue{
{Value: "windows"},
{Value: "x86"},
},
},
{
MetricDescriptor: &metricspb.MetricDescriptor{
Name: "this/one/there(where)",
Description: "Extra ones",
Unit: "1",
Type: metricspb.MetricDescriptor_CUMULATIVE_INT64,
LabelKeys: []*metricspb.LabelKey{
{Key: "os", Description: "Operating system"},
{Key: "arch", Description: "Architecture"},
},
Points: []*metricspb.Point{
{
Timestamp: &timestamp.Timestamp{
Seconds: 1543160298,
Nanos: 100000997,
},
Value: &metricspb.Point_Int64Value{
Int64Value: 99,
},
Timeseries: []*metricspb.TimeSeries{
{
StartTimestamp: &timestamp.Timestamp{
Seconds: 1543160298,
Nanos: 100000090,
},
LabelValues: []*metricspb.LabelValue{
{Value: "linux"},
{Value: "x86"},
},
Points: []*metricspb.Point{
{
Timestamp: &timestamp.Timestamp{
Seconds: 1543160298,
Nanos: 100000997,
},
Value: &metricspb.Point_Int64Value{
Int64Value: 100 + delta,
},
},
},
},
},
},
}
consumer.ConsumeMetricsData(context.Background(), consumerdata.MetricsData{Metrics: []*metricspb.Metric{metric1}})

res, err := http.Get("http://localhost:7777/metrics")
if err != nil {
t.Fatalf("Failed to perform a scrape: %v", err)
}
if g, w := res.StatusCode, 200; g != w {
t.Errorf("Mismatched HTTP response status code: Got: %d Want: %d", g, w)
}
blob, _ := ioutil.ReadAll(res.Body)
_ = res.Body.Close()
want := `# HELP test_this_one_there_where_ Extra ones
# TYPE test_this_one_there_where_ counter
test_this_one_there_where_{arch="x86",code="one",foo="bar",os="windows"} 99
`
if got := string(blob); got != want {
t.Errorf("Response mismatch\nGot:\n%s\n\nWant:\n%s", got, want)
}
}