From 3b40441f0c7adf855317ffe70162b5175274cb77 Mon Sep 17 00:00:00 2001 From: Bogdan Drutu Date: Sun, 1 Sep 2019 09:20:41 -0700 Subject: [PATCH] Add a metric batcher which preallocates batch sizes. (#194) * Add a metric batcher which preallocates batch sizes. * Fix received/dropped logic. * Fix comments and add a new method that returns num dropped timeseries * Use recordDroppedTimeseries when possible * Rename ExportMetricsProtoAndReturnDropped to PushMetricsProto * Fix tests. --- equivalence_test.go | 12 ++- metrics_batcher.go | 91 +++++++++++++++++++++ metrics_proto.go | 184 +++++------------------------------------- metrics_proto_test.go | 14 +++- stackdriver.go | 8 +- stats.go | 77 ++++++++++++++++++ 6 files changed, 214 insertions(+), 172 deletions(-) create mode 100644 metrics_batcher.go diff --git a/equivalence_test.go b/equivalence_test.go index 38d5a01..ec5879c 100644 --- a/equivalence_test.go +++ b/equivalence_test.go @@ -102,18 +102,22 @@ func TestStatsAndMetricsEquivalence(t *testing.T) { Name: fmt.Sprintf("projects/%s", se.o.ProjectID), MetricDescriptor: sMD, } - pMDR, err := se.protoMetricDescriptorToCreateMetricDescriptorRequest(ctx, metricPbs[i], nil) + inMD, err := se.protoToMonitoringMetricDescriptor(metricPbs[i], nil) if err != nil { t.Errorf("#%d: Stats.protoMetricDescriptorToMetricDescriptor: %v", i, err) } + pMDR := &monitoringpb.CreateMetricDescriptorRequest{ + Name: fmt.Sprintf("projects/%s", se.o.ProjectID), + MetricDescriptor: inMD, + } if diff := cmpMDReq(pMDR, sMDR); diff != "" { t.Fatalf("MetricDescriptor Mismatch -FromMetricsPb +FromMetrics: %s", diff) } stss, _ := se.metricToMpbTs(ctx, metric) sctreql := se.combineTimeSeriesToCreateTimeSeriesRequest(stss) - tsl, _ := se.protoMetricToTimeSeries(ctx, se.getResource(nil, metricPbs[i], seenResources), metricPbs[i], nil) - pctreql := se.combineTimeSeriesToCreateTimeSeriesRequest(tsl) + allTss, _ := protoMetricToTimeSeries(ctx, se, se.getResource(nil, metricPbs[i], seenResources), 
metricPbs[i]) + pctreql := se.combineTimeSeriesToCreateTimeSeriesRequest(allTss) if diff := cmpTSReqs(pctreql, sctreql); diff != "" { t.Fatalf("TimeSeries Mismatch -FromMetricsPb +FromMetrics: %s", diff) } @@ -336,7 +340,7 @@ func TestEquivalenceStatsVsMetricsUploads(t *testing.T) { }) // Export the proto Metrics to the Stackdriver backend. - se.ExportMetricsProto(context.Background(), nil, nil, metricPbs) + se.PushMetricsProto(context.Background(), nil, nil, metricPbs) se.Flush() var stackdriverTimeSeriesFromMetricsPb []*monitoringpb.CreateTimeSeriesRequest diff --git a/metrics_batcher.go b/metrics_batcher.go new file mode 100644 index 0000000..54a92e9 --- /dev/null +++ b/metrics_batcher.go @@ -0,0 +1,91 @@ +// Copyright 2019, OpenCensus Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package stackdriver + +import ( + "context" + "fmt" + "strings" + + monitoring "cloud.google.com/go/monitoring/apiv3" + monitoringpb "google.golang.org/genproto/googleapis/monitoring/v3" +) + +type metricsBatcher struct { + projectID string + allReqs []*monitoringpb.CreateTimeSeriesRequest + allTss []*monitoringpb.TimeSeries + allErrs []error + // Counts all dropped TimeSeries by this exporter. 
+	droppedTimeSeries int
+}
+
+func newMetricsBatcher(projectID string) *metricsBatcher {
+	return &metricsBatcher{
+		projectID:         projectID,
+		allTss:            make([]*monitoringpb.TimeSeries, 0, maxTimeSeriesPerUpload),
+		droppedTimeSeries: 0,
+	}
+}
+
+func (mb *metricsBatcher) recordDroppedTimeseries(numTimeSeries int, err error) {
+	mb.droppedTimeSeries += numTimeSeries
+	mb.allErrs = append(mb.allErrs, err)
+}
+
+func (mb *metricsBatcher) addTimeSeries(ts *monitoringpb.TimeSeries) {
+	mb.allTss = append(mb.allTss, ts)
+	if len(mb.allTss) == maxTimeSeriesPerUpload {
+		mb.allReqs = append(mb.allReqs, &monitoringpb.CreateTimeSeriesRequest{
+			Name:       monitoring.MetricProjectPath(mb.projectID),
+			TimeSeries: mb.allTss,
+		})
+		mb.allTss = make([]*monitoringpb.TimeSeries, 0, maxTimeSeriesPerUpload)
+	}
+}
+
+func (mb *metricsBatcher) export(ctx context.Context, mc *monitoring.MetricClient) {
+	// Last batch, if any.
+	if len(mb.allTss) > 0 {
+		mb.allReqs = append(mb.allReqs, &monitoringpb.CreateTimeSeriesRequest{
+			Name:       monitoring.MetricProjectPath(mb.projectID),
+			TimeSeries: mb.allTss,
+		})
+	}
+
+	// Send create time series requests to Stackdriver.
+ for _, req := range mb.allReqs { + if err := createTimeSeries(ctx, mc, req); err != nil { + mb.recordDroppedTimeseries(len(req.TimeSeries), err) + } + } +} + +func (mb *metricsBatcher) finalError() error { + numErrors := len(mb.allErrs) + if numErrors == 0 { + return nil + } + + if numErrors == 1 { + return mb.allErrs[0] + } + + errMsgs := make([]string, 0, numErrors) + for _, err := range mb.allErrs { + errMsgs = append(errMsgs, err.Error()) + } + return fmt.Errorf("[%s]", strings.Join(errMsgs, "; ")) +} diff --git a/metrics_proto.go b/metrics_proto.go index 304884e..a34d949 100644 --- a/metrics_proto.go +++ b/metrics_proto.go @@ -24,12 +24,10 @@ import ( "errors" "fmt" "path" - "sort" "strings" "go.opencensus.io/resource" - monitoring "cloud.google.com/go/monitoring/apiv3" commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1" metricspb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1" resourcepb "github.com/census-instrumentation/opencensus-proto/gen-go/resource/v1" @@ -49,11 +47,11 @@ var percentileLabelKey = &metricspb.LabelKey{ var globalResource = &resource.Resource{Type: "global"} var domains = []string{"googleapis.com", "kubernetes.io", "istio.io"} -// ExportMetricsProto exports OpenCensus Metrics Proto to Stackdriver Monitoring synchronously, +// PushMetricsProto exports OpenCensus Metrics Proto to Stackdriver Monitoring synchronously, // without de-duping or adding proto metrics to the bundler. 
-func (se *statsExporter) ExportMetricsProto(ctx context.Context, node *commonpb.Node, rsc *resourcepb.Resource, metrics []*metricspb.Metric) error { +func (se *statsExporter) PushMetricsProto(ctx context.Context, node *commonpb.Node, rsc *resourcepb.Resource, metrics []*metricspb.Metric) (int, error) { if len(metrics) == 0 { - return errNilMetricOrMetricDescriptor + return 0, errNilMetricOrMetricDescriptor } ctx, cancel := se.o.newContextWithTimeout() @@ -62,9 +60,7 @@ func (se *statsExporter) ExportMetricsProto(ctx context.Context, node *commonpb. // Caches the resources seen so far seenResources := make(map[*resourcepb.Resource]*monitoredrespb.MonitoredResource) - var allReqs []*monitoringpb.CreateTimeSeriesRequest - var allTss []*monitoringpb.TimeSeries - var allErrs []error + mb := newMetricsBatcher(se.o.ProjectID) for _, metric := range metrics { if len(metric.GetTimeseries()) == 0 { // No TimeSeries to export, skip this metric. @@ -74,53 +70,23 @@ func (se *statsExporter) ExportMetricsProto(ctx context.Context, node *commonpb. if metric.GetMetricDescriptor().GetType() == metricspb.MetricDescriptor_SUMMARY { summaryMtcs := se.convertSummaryMetrics(metric) for _, summaryMtc := range summaryMtcs { - if err := se.createMetricDescriptor(ctx, summaryMtc, se.defaultLabels); err != nil { - allErrs = append(allErrs, err) + if err := se.createMetricDescriptor(ctx, summaryMtc); err != nil { + mb.recordDroppedTimeseries(len(summaryMtc.GetTimeseries()), err) continue } - if tss, err := se.protoMetricToTimeSeries(ctx, mappedRsc, summaryMtc, se.defaultLabels); err == nil { - allTss = append(tss, tss...) 
- } else { - allErrs = append(allErrs, err) - } + se.protoMetricToTimeSeries(ctx, mappedRsc, summaryMtc, mb) } } else { - if err := se.createMetricDescriptor(ctx, metric, se.defaultLabels); err != nil { - allErrs = append(allErrs, err) + if err := se.createMetricDescriptor(ctx, metric); err != nil { + mb.recordDroppedTimeseries(len(metric.GetTimeseries()), err) continue } - if tss, err := se.protoMetricToTimeSeries(ctx, mappedRsc, metric, se.defaultLabels); err == nil { - allTss = append(allTss, tss...) - } else { - allErrs = append(allErrs, err) - } - } - - if len(allTss) >= maxTimeSeriesPerUpload { // Max 200 time series per request - allReqs = append(allReqs, &monitoringpb.CreateTimeSeriesRequest{ - Name: monitoring.MetricProjectPath(se.o.ProjectID), - TimeSeries: allTss[0:maxTimeSeriesPerUpload], - }) - allTss = allTss[maxTimeSeriesPerUpload:] + se.protoMetricToTimeSeries(ctx, mappedRsc, metric, mb) } } - // Last batch, if any. - if len(allTss) > 0 { - allReqs = append(allReqs, &monitoringpb.CreateTimeSeriesRequest{ - Name: monitoring.MetricProjectPath(se.o.ProjectID), - TimeSeries: allTss, - }) - } - - // Send create time series requests to Stackdriver. 
- for _, req := range allReqs { - if err := createTimeSeries(ctx, se.c, req); err != nil { - allErrs = append(allErrs, err) - } - } - - return combineErrors(allErrs) + mb.export(ctx, se.c) + return mb.droppedTimeSeries, mb.finalError() } func (se *statsExporter) convertSummaryMetrics(summary *metricspb.Metric) []*metricspb.Metric { @@ -238,81 +204,6 @@ func (se *statsExporter) convertSummaryMetrics(summary *metricspb.Metric) []*met return metrics } -// metricSignature creates a unique signature consisting of a -// metric's type and its lexicographically sorted label values -// See https://github.com/census-ecosystem/opencensus-go-exporter-stackdriver/issues/120 -func metricSignature(metric *googlemetricpb.Metric) string { - labels := metric.GetLabels() - labelValues := make([]string, 0, len(labels)) - - for _, labelValue := range labels { - labelValues = append(labelValues, labelValue) - } - sort.Strings(labelValues) - return fmt.Sprintf("%s:%s", metric.GetType(), strings.Join(labelValues, ",")) -} - -func (se *statsExporter) combineTimeSeriesToCreateTimeSeriesRequest(ts []*monitoringpb.TimeSeries) (ctsreql []*monitoringpb.CreateTimeSeriesRequest) { - if len(ts) == 0 { - return nil - } - - // Since there are scenarios in which Metrics with the same Type - // can be bunched in the same TimeSeries, we have to ensure that - // we create a unique CreateTimeSeriesRequest with entirely unique Metrics - // per TimeSeries, lest we'll encounter: - // - // err: rpc error: code = InvalidArgument desc = One or more TimeSeries could not be written: - // Field timeSeries[2] had an invalid value: Duplicate TimeSeries encountered. - // Only one point can be written per TimeSeries per request.: timeSeries[2] - // - // This scenario happens when we are using the OpenCensus Agent in which multiple metrics - // are streamed by various client applications. 
- // See https://github.com/census-ecosystem/opencensus-go-exporter-stackdriver/issues/73 - uniqueTimeSeries := make([]*monitoringpb.TimeSeries, 0, len(ts)) - nonUniqueTimeSeries := make([]*monitoringpb.TimeSeries, 0, len(ts)) - seenMetrics := make(map[string]struct{}) - - for _, tti := range ts { - key := metricSignature(tti.Metric) - if _, alreadySeen := seenMetrics[key]; !alreadySeen { - uniqueTimeSeries = append(uniqueTimeSeries, tti) - seenMetrics[key] = struct{}{} - } else { - nonUniqueTimeSeries = append(nonUniqueTimeSeries, tti) - } - } - - // UniqueTimeSeries can be bunched up together - // While for each nonUniqueTimeSeries, we have - // to make a unique CreateTimeSeriesRequest. - ctsreql = append(ctsreql, &monitoringpb.CreateTimeSeriesRequest{ - Name: monitoring.MetricProjectPath(se.o.ProjectID), - TimeSeries: uniqueTimeSeries, - }) - - // Now recursively also combine the non-unique TimeSeries - // that were singly added to nonUniqueTimeSeries. - // The reason is that we need optimal combinations - // for optimal combinations because: - // * "a/b/c" - // * "a/b/c" - // * "x/y/z" - // * "a/b/c" - // * "x/y/z" - // * "p/y/z" - // * "d/y/z" - // - // should produce: - // CreateTimeSeries(uniqueTimeSeries) :: ["a/b/c", "x/y/z", "p/y/z", "d/y/z"] - // CreateTimeSeries(nonUniqueTimeSeries) :: ["a/b/c"] - // CreateTimeSeries(nonUniqueTimeSeries) :: ["a/b/c", "x/y/z"] - nonUniqueRequests := se.combineTimeSeriesToCreateTimeSeriesRequest(nonUniqueTimeSeries) - ctsreql = append(ctsreql, nonUniqueRequests...) 
- - return ctsreql -} - func (se *statsExporter) getResource(rsc *resourcepb.Resource, metric *metricspb.Metric, seenRscs map[*resourcepb.Resource]*monitoredrespb.MonitoredResource) *monitoredrespb.MonitoredResource { var resource = rsc if metric.Resource != nil { @@ -343,9 +234,9 @@ func resourcepbToResource(rsc *resourcepb.Resource) *resource.Resource { // protoMetricToTimeSeries converts a metric into a Stackdriver Monitoring v3 API CreateTimeSeriesRequest // but it doesn't invoke any remote API. -func (se *statsExporter) protoMetricToTimeSeries(ctx context.Context, mappedRsc *monitoredrespb.MonitoredResource, metric *metricspb.Metric, additionalLabels map[string]labelValue) ([]*monitoringpb.TimeSeries, error) { +func (se *statsExporter) protoMetricToTimeSeries(ctx context.Context, mappedRsc *monitoredrespb.MonitoredResource, metric *metricspb.Metric, mb *metricsBatcher) { if metric == nil || metric.MetricDescriptor == nil { - return nil, errNilMetricOrMetricDescriptor + mb.recordDroppedTimeseries(len(metric.GetTimeseries()), errNilMetricOrMetricDescriptor) } metricName := metric.GetMetricDescriptor().GetName() @@ -357,21 +248,21 @@ func (se *statsExporter) protoMetricToTimeSeries(ctx context.Context, mappedRsc labelKeys = append(labelKeys, sanitize(key.GetKey())) } - timeSeries := make([]*monitoringpb.TimeSeries, 0, len(metric.Timeseries)) for _, protoTimeSeries := range metric.Timeseries { sdPoints, err := se.protoTimeSeriesToMonitoringPoints(protoTimeSeries, metricKind) if err != nil { - return nil, err + mb.recordDroppedTimeseries(1, err) + continue } // Each TimeSeries has labelValues which MUST be correlated // with that from the MetricDescriptor - labels, err := labelsPerTimeSeries(additionalLabels, labelKeys, protoTimeSeries.GetLabelValues()) + labels, err := labelsPerTimeSeries(se.defaultLabels, labelKeys, protoTimeSeries.GetLabelValues()) if err != nil { - // TODO: (@odeke-em) perhaps log this error from labels extraction, if non-nil. 
+ mb.recordDroppedTimeseries(1, err) continue } - timeSeries = append(timeSeries, &monitoringpb.TimeSeries{ + mb.addTimeSeries(&monitoringpb.TimeSeries{ Metric: &googlemetricpb.Metric{ Type: metricType, Labels: labels, @@ -382,8 +273,6 @@ func (se *statsExporter) protoMetricToTimeSeries(ctx context.Context, mappedRsc Points: sdPoints, }) } - - return timeSeries, nil } func labelsPerTimeSeries(defaults map[string]labelValue, labelKeys []string, labelValues []*metricspb.LabelValue) (map[string]string, error) { @@ -409,25 +298,9 @@ func labelsPerTimeSeries(defaults map[string]labelValue, labelKeys []string, lab return labels, nil } -func (se *statsExporter) protoMetricDescriptorToCreateMetricDescriptorRequest(ctx context.Context, metric *metricspb.Metric, additionalLabels map[string]labelValue) (*monitoringpb.CreateMetricDescriptorRequest, error) { - // Otherwise, we encountered a cache-miss and - // should create the metric descriptor remotely. - inMD, err := se.protoToMonitoringMetricDescriptor(metric, additionalLabels) - if err != nil { - return nil, err - } - - cmrdesc := &monitoringpb.CreateMetricDescriptorRequest{ - Name: fmt.Sprintf("projects/%s", se.o.ProjectID), - MetricDescriptor: inMD, - } - - return cmrdesc, nil -} - // createMetricDescriptor creates a metric descriptor from the OpenCensus proto metric // and then creates it remotely using Stackdriver's API. -func (se *statsExporter) createMetricDescriptor(ctx context.Context, metric *metricspb.Metric, additionalLabels map[string]labelValue) error { +func (se *statsExporter) createMetricDescriptor(ctx context.Context, metric *metricspb.Metric) error { se.protoMu.Lock() defer se.protoMu.Unlock() @@ -438,7 +311,7 @@ func (se *statsExporter) createMetricDescriptor(ctx context.Context, metric *met // Otherwise, we encountered a cache-miss and // should create the metric descriptor remotely. 
- inMD, err := se.protoToMonitoringMetricDescriptor(metric, additionalLabels) + inMD, err := se.protoToMonitoringMetricDescriptor(metric, se.defaultLabels) if err != nil { return err } @@ -684,18 +557,3 @@ func protoMetricDescriptorTypeToMetricKind(m *metricspb.Metric) (googlemetricpb. return googlemetricpb.MetricDescriptor_METRIC_KIND_UNSPECIFIED, googlemetricpb.MetricDescriptor_VALUE_TYPE_UNSPECIFIED } } - -// combineErrors converts a list of errors into one error. -func combineErrors(errs []error) error { - numErrors := len(errs) - if numErrors == 1 { - return errs[0] - } else if numErrors > 1 { - errMsgs := make([]string, 0, numErrors) - for _, err := range errs { - errMsgs = append(errMsgs, err.Error()) - } - return fmt.Errorf("[%s]", strings.Join(errMsgs, "; ")) - } - return nil -} diff --git a/metrics_proto_test.go b/metrics_proto_test.go index b64535d..42473f8 100644 --- a/metrics_proto_test.go +++ b/metrics_proto_test.go @@ -142,7 +142,7 @@ func TestProtoMetricToCreateTimeSeriesRequest(t *testing.T) { if se == nil { se = new(statsExporter) } - tsl, err := se.protoMetricToTimeSeries(context.Background(), se.getResource(nil, tt.in, seenResources), tt.in, nil) + allTss, err := protoMetricToTimeSeries(context.Background(), se, se.getResource(nil, tt.in, seenResources), tt.in) if tt.wantErr != "" { if err == nil || !strings.Contains(err.Error(), tt.wantErr) { t.Errorf("#%d: unmatched error. Got\n\t%v\nWant\n\t%v", i, err, tt.wantErr) @@ -154,7 +154,7 @@ func TestProtoMetricToCreateTimeSeriesRequest(t *testing.T) { continue } - got := se.combineTimeSeriesToCreateTimeSeriesRequest(tsl) + got := se.combineTimeSeriesToCreateTimeSeriesRequest(allTss) // Our saving grace is serialization equality since some // unexported fields could be present in the various values. 
if diff := cmpTSReqs(got, tt.want); diff != "" { @@ -331,7 +331,7 @@ func TestProtoMetricWithDifferentResource(t *testing.T) { if se == nil { se = new(statsExporter) } - tsl, err := se.protoMetricToTimeSeries(context.Background(), se.getResource(nil, tt.in, seenResources), tt.in, nil) + allTss, err := protoMetricToTimeSeries(context.Background(), se, se.getResource(nil, tt.in, seenResources), tt.in) if tt.wantErr != "" { if err == nil || !strings.Contains(err.Error(), tt.wantErr) { t.Errorf("#%d: unmatched error. Got\n\t%v\nWant\n\t%v", i, err, tt.wantErr) @@ -343,7 +343,7 @@ func TestProtoMetricWithDifferentResource(t *testing.T) { continue } - got := se.combineTimeSeriesToCreateTimeSeriesRequest(tsl) + got := se.combineTimeSeriesToCreateTimeSeriesRequest(allTss) // Our saving grace is serialization equality since some // unexported fields could be present in the various values. if diff := cmpTSReqs(got, tt.want); diff != "" { @@ -848,3 +848,9 @@ func makePercentileValue(val, percentile float64) *metricspb.SummaryValue_Snapsh Percentile: percentile, } } + +func protoMetricToTimeSeries(ctx context.Context, se *statsExporter, mappedRsc *monitoredrespb.MonitoredResource, metric *metricspb.Metric) ([]*monitoringpb.TimeSeries, error) { + mb := newMetricsBatcher(se.o.ProjectID) + se.protoMetricToTimeSeries(ctx, mappedRsc, metric, mb) + return mb.allTss, mb.finalError() +} diff --git a/stackdriver.go b/stackdriver.go index a15972a..68ab4d1 100644 --- a/stackdriver.go +++ b/stackdriver.go @@ -351,7 +351,13 @@ func (e *Exporter) ExportView(vd *view.Data) { // ExportMetricsProto exports OpenCensus Metrics Proto to Stackdriver Monitoring synchronously, // without de-duping or adding proto metrics to the bundler. 
 func (e *Exporter) ExportMetricsProto(ctx context.Context, node *commonpb.Node, rsc *resourcepb.Resource, metrics []*metricspb.Metric) error {
-	return e.statsExporter.ExportMetricsProto(ctx, node, rsc, metrics)
+	_, err := e.statsExporter.PushMetricsProto(ctx, node, rsc, metrics)
+	return err
+}
+
+// PushMetricsProto is similar to ExportMetricsProto but returns the number of dropped timeseries.
+func (e *Exporter) PushMetricsProto(ctx context.Context, node *commonpb.Node, rsc *resourcepb.Resource, metrics []*metricspb.Metric) (int, error) {
+	return e.statsExporter.PushMetricsProto(ctx, node, rsc, metrics)
 }
 
 // ExportMetrics exports OpenCensus Metrics to Stackdriver Monitoring
diff --git a/stats.go b/stats.go
index 2528907..d1e5c6d 100644
--- a/stats.go
+++ b/stats.go
@@ -20,6 +20,7 @@ import (
 	"fmt"
 	"os"
 	"path"
+	"sort"
 	"strconv"
 	"strings"
 	"sync"
@@ -40,6 +41,7 @@ import (
 	distributionpb "google.golang.org/genproto/googleapis/api/distribution"
 	labelpb "google.golang.org/genproto/googleapis/api/label"
 	"google.golang.org/genproto/googleapis/api/metric"
+	googlemetricpb "google.golang.org/genproto/googleapis/api/metric"
 	metricpb "google.golang.org/genproto/googleapis/api/metric"
 	monitoredrespb "google.golang.org/genproto/googleapis/api/monitoredres"
 	monitoringpb "google.golang.org/genproto/googleapis/monitoring/v3"
@@ -394,6 +396,81 @@ func (e *statsExporter) displayName(suffix string) string {
 	return path.Join(displayNamePrefix, suffix)
 }
 
+func (e *statsExporter) combineTimeSeriesToCreateTimeSeriesRequest(ts []*monitoringpb.TimeSeries) (ctsreql []*monitoringpb.CreateTimeSeriesRequest) {
+	if len(ts) == 0 {
+		return nil
+	}
+
+	// Since there are scenarios in which Metrics with the same Type
+	// can be bunched in the same TimeSeries, we have to ensure that
+	// we create a unique CreateTimeSeriesRequest with entirely unique Metrics
+	// per TimeSeries, lest we'll encounter:
+	//
+	// err: rpc error: code = InvalidArgument desc = One or more TimeSeries could not be
written: + // Field timeSeries[2] had an invalid value: Duplicate TimeSeries encountered. + // Only one point can be written per TimeSeries per request.: timeSeries[2] + // + // This scenario happens when we are using the OpenCensus Agent in which multiple metrics + // are streamed by various client applications. + // See https://github.com/census-ecosystem/opencensus-go-exporter-stackdriver/issues/73 + uniqueTimeSeries := make([]*monitoringpb.TimeSeries, 0, len(ts)) + nonUniqueTimeSeries := make([]*monitoringpb.TimeSeries, 0, len(ts)) + seenMetrics := make(map[string]struct{}) + + for _, tti := range ts { + key := metricSignature(tti.Metric) + if _, alreadySeen := seenMetrics[key]; !alreadySeen { + uniqueTimeSeries = append(uniqueTimeSeries, tti) + seenMetrics[key] = struct{}{} + } else { + nonUniqueTimeSeries = append(nonUniqueTimeSeries, tti) + } + } + + // UniqueTimeSeries can be bunched up together + // While for each nonUniqueTimeSeries, we have + // to make a unique CreateTimeSeriesRequest. + ctsreql = append(ctsreql, &monitoringpb.CreateTimeSeriesRequest{ + Name: monitoring.MetricProjectPath(e.o.ProjectID), + TimeSeries: uniqueTimeSeries, + }) + + // Now recursively also combine the non-unique TimeSeries + // that were singly added to nonUniqueTimeSeries. + // The reason is that we need optimal combinations + // for optimal combinations because: + // * "a/b/c" + // * "a/b/c" + // * "x/y/z" + // * "a/b/c" + // * "x/y/z" + // * "p/y/z" + // * "d/y/z" + // + // should produce: + // CreateTimeSeries(uniqueTimeSeries) :: ["a/b/c", "x/y/z", "p/y/z", "d/y/z"] + // CreateTimeSeries(nonUniqueTimeSeries) :: ["a/b/c"] + // CreateTimeSeries(nonUniqueTimeSeries) :: ["a/b/c", "x/y/z"] + nonUniqueRequests := e.combineTimeSeriesToCreateTimeSeriesRequest(nonUniqueTimeSeries) + ctsreql = append(ctsreql, nonUniqueRequests...) 
+ + return ctsreql +} + +// metricSignature creates a unique signature consisting of a +// metric's type and its lexicographically sorted label values +// See https://github.com/census-ecosystem/opencensus-go-exporter-stackdriver/issues/120 +func metricSignature(metric *googlemetricpb.Metric) string { + labels := metric.GetLabels() + labelValues := make([]string, 0, len(labels)) + + for _, labelValue := range labels { + labelValues = append(labelValues, labelValue) + } + sort.Strings(labelValues) + return fmt.Sprintf("%s:%s", metric.GetType(), strings.Join(labelValues, ",")) +} + func newPoint(v *view.View, row *view.Row, start, end time.Time) *monitoringpb.Point { switch v.Aggregation.Type { case view.AggTypeLastValue: