Skip to content
This repository has been archived by the owner on Oct 3, 2023. It is now read-only.

Commit

Permalink
Cache seen resources for each export request (#179)
Browse files Browse the repository at this point in the history
* Cache seen resources for each export request

* Make global resource a constant
  • Loading branch information
songy23 authored and bogdandrutu committed Aug 28, 2019
1 parent 8a44032 commit ffafe44
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 18 deletions.
5 changes: 4 additions & 1 deletion equivalence_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,12 @@ import (
"google.golang.org/grpc"

metricspb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1"
resourcepb "github.com/census-instrumentation/opencensus-proto/gen-go/resource/v1"

"github.com/golang/protobuf/ptypes/empty"
timestamp "github.com/golang/protobuf/ptypes/timestamp"
googlemetricpb "google.golang.org/genproto/googleapis/api/metric"
monitoredrespb "google.golang.org/genproto/googleapis/api/monitoredres"
monitoringpb "google.golang.org/genproto/googleapis/monitoring/v3"
)

Expand All @@ -53,6 +55,7 @@ func TestStatsAndMetricsEquivalence(t *testing.T) {
Unit: "ms",
Type: metricspb.MetricDescriptor_CUMULATIVE_INT64,
}
seenResources := make(map[*resourcepb.Resource]*monitoredrespb.MonitoredResource)

// Generate some view.Data and metrics.
var vdl []*view.Data
Expand Down Expand Up @@ -107,7 +110,7 @@ func TestStatsAndMetricsEquivalence(t *testing.T) {

vdl := []*view.Data{vd}
sctreql := se.makeReq(vdl, maxTimeSeriesPerUpload)
tsl, _ := se.protoMetricToTimeSeries(ctx, nil, nil, metricPbs[i], nil)
tsl, _ := se.protoMetricToTimeSeries(ctx, nil, se.getResource(nil, metricPbs[i], seenResources), metricPbs[i], nil)
pctreql := se.combineTimeSeriesToCreateTimeSeriesRequest(tsl)
if diff := cmpTSReqs(pctreql, sctreql); diff != "" {
t.Fatalf("TimeSeries Mismatch -FromMetrics +FromStats: %s", diff)
Expand Down
44 changes: 29 additions & 15 deletions metrics_proto.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
distributionpb "google.golang.org/genproto/googleapis/api/distribution"
labelpb "google.golang.org/genproto/googleapis/api/label"
googlemetricpb "google.golang.org/genproto/googleapis/api/metric"
monitoredrespb "google.golang.org/genproto/googleapis/api/monitoredres"
monitoringpb "google.golang.org/genproto/googleapis/monitoring/v3"

commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1"
Expand All @@ -49,6 +50,7 @@ var percentileLabelKey = &metricspb.LabelKey{
Key: "percentile",
Description: "the value at a given percentile of a distribution",
}
var globalResource = &resource.Resource{Type: "global"}

type metricProtoPayload struct {
node *commonpb.Node
Expand Down Expand Up @@ -99,6 +101,9 @@ func (se *statsExporter) ExportMetricsProtoSync(ctx context.Context, node *commo
return errNilMetric
}

// Caches the resources seen so far
seenResources := make(map[*resourcepb.Resource]*monitoredrespb.MonitoredResource)

additionalLabels := se.defaultLabels
if additionalLabels == nil {
// additionalLabels must be stateless because each node is different
Expand All @@ -112,17 +117,18 @@ func (se *statsExporter) ExportMetricsProtoSync(ctx context.Context, node *commo
var allTss []*monitoringpb.TimeSeries
var allErrs []error
for _, metric := range metrics {
mappedRsc := se.getResource(rsc, metric, seenResources)
if metric.GetMetricDescriptor().GetType() == metricspb.MetricDescriptor_SUMMARY {
summaryMtcs := se.convertSummaryMetrics(metric)
for _, summaryMtc := range summaryMtcs {
if tss, err := se.protoMetricToTimeSeries(ctx, node, rsc, summaryMtc, additionalLabels); err == nil {
if tss, err := se.protoMetricToTimeSeries(ctx, node, mappedRsc, summaryMtc, additionalLabels); err == nil {
allTss = append(tss, tss...)
} else {
allErrs = append(allErrs, err)
}
}
} else {
if tss, err := se.protoMetricToTimeSeries(ctx, node, rsc, metric, additionalLabels); err == nil {
if tss, err := se.protoMetricToTimeSeries(ctx, node, mappedRsc, metric, additionalLabels); err == nil {
allTss = append(allTss, tss...)
} else {
allErrs = append(allErrs, err)
Expand Down Expand Up @@ -289,6 +295,9 @@ func (se *statsExporter) uploadMetricsProto(payloads []*metricProtoPayload) erro
)
defer span.End()

// Caches the resources seen so far
seenResources := make(map[*resourcepb.Resource]*monitoredrespb.MonitoredResource)

for _, payload := range payloads {
// Now create the metric descriptor remotely.
if err := se.createMetricDescriptor(ctx, payload.metric, payload.additionalLabels); err != nil {
Expand All @@ -299,7 +308,8 @@ func (se *statsExporter) uploadMetricsProto(payloads []*metricProtoPayload) erro

var allTimeSeries []*monitoringpb.TimeSeries
for _, payload := range payloads {
tsl, err := se.protoMetricToTimeSeries(ctx, payload.node, payload.resource, payload.metric, payload.additionalLabels)
mappedRsc := se.getResource(payload.resource, payload.metric, seenResources)
tsl, err := se.protoMetricToTimeSeries(ctx, payload.node, mappedRsc, payload.metric, payload.additionalLabels)
if err != nil {
span.SetStatus(trace.Status{Code: 2, Message: err.Error()})
return err
Expand Down Expand Up @@ -402,11 +412,22 @@ func (se *statsExporter) combineTimeSeriesToCreateTimeSeriesRequest(ts []*monito
return ctsreql
}

func (se *statsExporter) getResource(rsc *resourcepb.Resource, metric *metricspb.Metric, seenRscs map[*resourcepb.Resource]*monitoredrespb.MonitoredResource) *monitoredrespb.MonitoredResource {
var resource = rsc
if metric.Resource != nil {
resource = metric.Resource
}
mappedRsc, ok := seenRscs[resource]
if !ok {
mappedRsc = se.o.MapResource(resourcepbToResource(resource))
seenRscs[resource] = mappedRsc
}
return mappedRsc
}

func resourcepbToResource(rsc *resourcepb.Resource) *resource.Resource {
if rsc == nil {
return &resource.Resource{
Type: "global",
}
return globalResource
}
res := &resource.Resource{
Type: rsc.Type,
Expand All @@ -421,18 +442,11 @@ func resourcepbToResource(rsc *resourcepb.Resource) *resource.Resource {

// protoMetricToTimeSeries converts a metric into a Stackdriver Monitoring v3 API CreateTimeSeriesRequest
// but it doesn't invoke any remote API.
func (se *statsExporter) protoMetricToTimeSeries(ctx context.Context, node *commonpb.Node, rsc *resourcepb.Resource, metric *metricspb.Metric, additionalLabels map[string]labelValue) ([]*monitoringpb.TimeSeries, error) {
func (se *statsExporter) protoMetricToTimeSeries(ctx context.Context, node *commonpb.Node, mappedRsc *monitoredrespb.MonitoredResource, metric *metricspb.Metric, additionalLabels map[string]labelValue) ([]*monitoringpb.TimeSeries, error) {
if metric == nil {
return nil, errNilMetric
}

var resource = rsc
if metric.Resource != nil {
resource = metric.Resource
}

mappedRes := se.o.MapResource(resourcepbToResource(resource))

metricName, _, _, err := metricProseFromProto(metric)
if err != nil {
return nil, err
Expand Down Expand Up @@ -460,7 +474,7 @@ func (se *statsExporter) protoMetricToTimeSeries(ctx context.Context, node *comm
Type: metricType,
Labels: labels,
},
Resource: mappedRes,
Resource: mappedRsc,
Points: sdPoints,
})
}
Expand Down
12 changes: 10 additions & 2 deletions metrics_proto_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,12 +135,14 @@ func TestProtoMetricToCreateTimeSeriesRequest(t *testing.T) {
},
}

seenResources := make(map[*resourcepb.Resource]*monitoredrespb.MonitoredResource)

for i, tt := range tests {
se := tt.statsExporter
if se == nil {
se = new(statsExporter)
}
tsl, err := se.protoMetricToTimeSeries(context.Background(), nil, nil, tt.in, nil)
tsl, err := se.protoMetricToTimeSeries(context.Background(), nil, se.getResource(nil, tt.in, seenResources), tt.in, nil)
if tt.wantErr != "" {
if err == nil || !strings.Contains(err.Error(), tt.wantErr) {
t.Errorf("#%d: unmatched error. Got\n\t%v\nWant\n\t%v", i, err, tt.wantErr)
Expand Down Expand Up @@ -171,6 +173,8 @@ func TestProtoMetricWithDifferentResource(t *testing.T) {
Nanos: 100000997,
}

seenResources := make(map[*resourcepb.Resource]*monitoredrespb.MonitoredResource)

tests := []struct {
in *metricspb.Metric
want []*monitoringpb.CreateTimeSeriesRequest
Expand Down Expand Up @@ -321,7 +325,7 @@ func TestProtoMetricWithDifferentResource(t *testing.T) {
if se == nil {
se = new(statsExporter)
}
tsl, err := se.protoMetricToTimeSeries(context.Background(), nil, nil, tt.in, nil)
tsl, err := se.protoMetricToTimeSeries(context.Background(), nil, se.getResource(nil, tt.in, seenResources), tt.in, nil)
if tt.wantErr != "" {
if err == nil || !strings.Contains(err.Error(), tt.wantErr) {
t.Errorf("#%d: unmatched error. Got\n\t%v\nWant\n\t%v", i, err, tt.wantErr)
Expand All @@ -340,6 +344,10 @@ func TestProtoMetricWithDifferentResource(t *testing.T) {
t.Fatalf("Test %d failed. Unexpected CreateTimeSeriesRequests -got +want: %s", i, diff)
}
}

if len(seenResources) != 2 {
t.Errorf("Should cache 2 resources, got %d", len(seenResources))
}
}

func TestProtoToMonitoringMetricDescriptor(t *testing.T) {
Expand Down

0 comments on commit ffafe44

Please sign in to comment.