diff --git a/equivalence_test.go b/equivalence_test.go index 73a07d4..5908e8e 100644 --- a/equivalence_test.go +++ b/equivalence_test.go @@ -27,10 +27,12 @@ import ( "google.golang.org/grpc" metricspb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1" + resourcepb "github.com/census-instrumentation/opencensus-proto/gen-go/resource/v1" "github.com/golang/protobuf/ptypes/empty" timestamp "github.com/golang/protobuf/ptypes/timestamp" googlemetricpb "google.golang.org/genproto/googleapis/api/metric" + monitoredrespb "google.golang.org/genproto/googleapis/api/monitoredres" monitoringpb "google.golang.org/genproto/googleapis/monitoring/v3" ) @@ -53,6 +55,7 @@ func TestStatsAndMetricsEquivalence(t *testing.T) { Unit: "ms", Type: metricspb.MetricDescriptor_CUMULATIVE_INT64, } + seenResources := make(map[*resourcepb.Resource]*monitoredrespb.MonitoredResource) // Generate some view.Data and metrics. var vdl []*view.Data @@ -107,7 +110,7 @@ func TestStatsAndMetricsEquivalence(t *testing.T) { vdl := []*view.Data{vd} sctreql := se.makeReq(vdl, maxTimeSeriesPerUpload) - tsl, _ := se.protoMetricToTimeSeries(ctx, nil, nil, metricPbs[i], nil) + tsl, _ := se.protoMetricToTimeSeries(ctx, nil, se.getResource(nil, metricPbs[i], seenResources), metricPbs[i], nil) pctreql := se.combineTimeSeriesToCreateTimeSeriesRequest(tsl) if diff := cmpTSReqs(pctreql, sctreql); diff != "" { t.Fatalf("TimeSeries Mismatch -FromMetrics +FromStats: %s", diff) diff --git a/metrics_proto.go b/metrics_proto.go index cfe4863..94823c9 100644 --- a/metrics_proto.go +++ b/metrics_proto.go @@ -35,6 +35,7 @@ import ( distributionpb "google.golang.org/genproto/googleapis/api/distribution" labelpb "google.golang.org/genproto/googleapis/api/label" googlemetricpb "google.golang.org/genproto/googleapis/api/metric" + monitoredrespb "google.golang.org/genproto/googleapis/api/monitoredres" monitoringpb "google.golang.org/genproto/googleapis/monitoring/v3" commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1" @@ -49,6 +50,7 @@ var percentileLabelKey = &metricspb.LabelKey{ Key: "percentile", Description: "the value at a given percentile of a distribution", } +var globalResource = &resource.Resource{Type: "global"} type metricProtoPayload struct { node *commonpb.Node @@ -99,6 +101,9 @@ func (se *statsExporter) ExportMetricsProtoSync(ctx context.Context, node *commo return errNilMetric } + // Caches the resources seen so far + seenResources := make(map[*resourcepb.Resource]*monitoredrespb.MonitoredResource) + additionalLabels := se.defaultLabels if additionalLabels == nil { // additionalLabels must be stateless because each node is different @@ -112,17 +117,18 @@ func (se *statsExporter) ExportMetricsProtoSync(ctx context.Context, node *commo var allTss []*monitoringpb.TimeSeries var allErrs []error for _, metric := range metrics { + mappedRsc := se.getResource(rsc, metric, seenResources) if metric.GetMetricDescriptor().GetType() == metricspb.MetricDescriptor_SUMMARY { summaryMtcs := se.convertSummaryMetrics(metric) for _, summaryMtc := range summaryMtcs { - if tss, err := se.protoMetricToTimeSeries(ctx, node, rsc, summaryMtc, additionalLabels); err == nil { + if tss, err := se.protoMetricToTimeSeries(ctx, node, mappedRsc, summaryMtc, additionalLabels); err == nil { allTss = append(tss, tss...) } else { allErrs = append(allErrs, err) } } } else { - if tss, err := se.protoMetricToTimeSeries(ctx, node, rsc, metric, additionalLabels); err == nil { + if tss, err := se.protoMetricToTimeSeries(ctx, node, mappedRsc, metric, additionalLabels); err == nil { allTss = append(allTss, tss...) } else { allErrs = append(allErrs, err) @@ -289,6 +295,9 @@ func (se *statsExporter) uploadMetricsProto(payloads []*metricProtoPayload) erro ) defer span.End() + // Caches the resources seen so far + seenResources := make(map[*resourcepb.Resource]*monitoredrespb.MonitoredResource) + for _, payload := range payloads { // Now create the metric descriptor remotely. if err := se.createMetricDescriptor(ctx, payload.metric, payload.additionalLabels); err != nil { @@ -299,7 +308,8 @@ func (se *statsExporter) uploadMetricsProto(payloads []*metricProtoPayload) erro var allTimeSeries []*monitoringpb.TimeSeries for _, payload := range payloads { - tsl, err := se.protoMetricToTimeSeries(ctx, payload.node, payload.resource, payload.metric, payload.additionalLabels) + mappedRsc := se.getResource(payload.resource, payload.metric, seenResources) + tsl, err := se.protoMetricToTimeSeries(ctx, payload.node, mappedRsc, payload.metric, payload.additionalLabels) if err != nil { span.SetStatus(trace.Status{Code: 2, Message: err.Error()}) return err @@ -402,11 +412,22 @@ func (se *statsExporter) combineTimeSeriesToCreateTimeSeriesRequest(ts []*monito return ctsreql } +func (se *statsExporter) getResource(rsc *resourcepb.Resource, metric *metricspb.Metric, seenRscs map[*resourcepb.Resource]*monitoredrespb.MonitoredResource) *monitoredrespb.MonitoredResource { + var resource = rsc + if metric.Resource != nil { + resource = metric.Resource + } + mappedRsc, ok := seenRscs[resource] + if !ok { + mappedRsc = se.o.MapResource(resourcepbToResource(resource)) + seenRscs[resource] = mappedRsc + } + return mappedRsc +} + func resourcepbToResource(rsc *resourcepb.Resource) *resource.Resource { if rsc == nil { - return &resource.Resource{ - Type: "global", - } + return globalResource } res := &resource.Resource{ Type: rsc.Type, @@ -421,18 +442,11 @@ func resourcepbToResource(rsc *resourcepb.Resource) *resource.Resource { // protoMetricToTimeSeries converts a metric into a Stackdriver Monitoring v3 API CreateTimeSeriesRequest // but it doesn't invoke any remote API. -func (se *statsExporter) protoMetricToTimeSeries(ctx context.Context, node *commonpb.Node, rsc *resourcepb.Resource, metric *metricspb.Metric, additionalLabels map[string]labelValue) ([]*monitoringpb.TimeSeries, error) { +func (se *statsExporter) protoMetricToTimeSeries(ctx context.Context, node *commonpb.Node, mappedRsc *monitoredrespb.MonitoredResource, metric *metricspb.Metric, additionalLabels map[string]labelValue) ([]*monitoringpb.TimeSeries, error) { if metric == nil { return nil, errNilMetric } - var resource = rsc - if metric.Resource != nil { - resource = metric.Resource - } - - mappedRes := se.o.MapResource(resourcepbToResource(resource)) - metricName, _, _, err := metricProseFromProto(metric) if err != nil { return nil, err @@ -460,7 +474,7 @@ func (se *statsExporter) protoMetricToTimeSeries(ctx context.Context, node *comm Type: metricType, Labels: labels, }, - Resource: mappedRes, + Resource: mappedRsc, Points: sdPoints, }) } diff --git a/metrics_proto_test.go b/metrics_proto_test.go index 73e7a38..7ebe7b9 100644 --- a/metrics_proto_test.go +++ b/metrics_proto_test.go @@ -135,12 +135,14 @@ func TestProtoMetricToCreateTimeSeriesRequest(t *testing.T) { }, } + seenResources := make(map[*resourcepb.Resource]*monitoredrespb.MonitoredResource) + for i, tt := range tests { se := tt.statsExporter if se == nil { se = new(statsExporter) } - tsl, err := se.protoMetricToTimeSeries(context.Background(), nil, nil, tt.in, nil) + tsl, err := se.protoMetricToTimeSeries(context.Background(), nil, se.getResource(nil, tt.in, seenResources), tt.in, nil) if tt.wantErr != "" { if err == nil || !strings.Contains(err.Error(), tt.wantErr) { t.Errorf("#%d: unmatched error. Got\n\t%v\nWant\n\t%v", i, err, tt.wantErr) @@ -171,6 +173,8 @@ func TestProtoMetricWithDifferentResource(t *testing.T) { Nanos: 100000997, } + seenResources := make(map[*resourcepb.Resource]*monitoredrespb.MonitoredResource) + tests := []struct { in *metricspb.Metric want []*monitoringpb.CreateTimeSeriesRequest @@ -321,7 +325,7 @@ func TestProtoMetricWithDifferentResource(t *testing.T) { if se == nil { se = new(statsExporter) } - tsl, err := se.protoMetricToTimeSeries(context.Background(), nil, nil, tt.in, nil) + tsl, err := se.protoMetricToTimeSeries(context.Background(), nil, se.getResource(nil, tt.in, seenResources), tt.in, nil) if tt.wantErr != "" { if err == nil || !strings.Contains(err.Error(), tt.wantErr) { t.Errorf("#%d: unmatched error. Got\n\t%v\nWant\n\t%v", i, err, tt.wantErr) @@ -340,6 +344,10 @@ func TestProtoMetricWithDifferentResource(t *testing.T) { t.Fatalf("Test %d failed. Unexpected CreateTimeSeriesRequests -got +want: %s", i, diff) } } + + if len(seenResources) != 2 { + t.Errorf("Should cache 2 resources, got %d", len(seenResources)) + } } func TestProtoToMonitoringMetricDescriptor(t *testing.T) {