Skip to content
This repository has been archived by the owner on Oct 3, 2023. It is now read-only.

Commit

Permalink
Add an experimental API that exports metrics proto synchronously (#178)
Browse files Browse the repository at this point in the history
* Add an experimental API that exports metrics proto synchronously

* Fix gofmt
  • Loading branch information
songy23 authored Aug 26, 2019
1 parent bfde7f1 commit 8a44032
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 0 deletions.
79 changes: 79 additions & 0 deletions metrics_proto.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,70 @@ func (se *statsExporter) ExportMetricsProto(ctx context.Context, node *commonpb.
return nil
}

// ExportMetricsProtoSync exports OpenCensus Metrics Proto to Stackdriver Monitoring synchronously,
// without de-duping or adding proto metrics to the bundler.
func (se *statsExporter) ExportMetricsProtoSync(ctx context.Context, node *commonpb.Node, rsc *resourcepb.Resource, metrics []*metricspb.Metric) error {
if len(metrics) == 0 {
return errNilMetric
}

additionalLabels := se.defaultLabels
if additionalLabels == nil {
// additionalLabels must be stateless because each node is different
additionalLabels = getDefaultLabelsFromNode(node)
}

ctx, cancel := se.o.newContextWithTimeout()
defer cancel()

var allReqs []*monitoringpb.CreateTimeSeriesRequest
var allTss []*monitoringpb.TimeSeries
var allErrs []error
for _, metric := range metrics {
if metric.GetMetricDescriptor().GetType() == metricspb.MetricDescriptor_SUMMARY {
summaryMtcs := se.convertSummaryMetrics(metric)
for _, summaryMtc := range summaryMtcs {
if tss, err := se.protoMetricToTimeSeries(ctx, node, rsc, summaryMtc, additionalLabels); err == nil {
allTss = append(tss, tss...)
} else {
allErrs = append(allErrs, err)
}
}
} else {
if tss, err := se.protoMetricToTimeSeries(ctx, node, rsc, metric, additionalLabels); err == nil {
allTss = append(allTss, tss...)
} else {
allErrs = append(allErrs, err)
}
}

if len(allTss) >= maxTimeSeriesPerUpload { // Max 200 time series per request
allReqs = append(allReqs, &monitoringpb.CreateTimeSeriesRequest{
Name: monitoring.MetricProjectPath(se.o.ProjectID),
TimeSeries: allTss[0:maxTimeSeriesPerUpload],
})
allTss = allTss[maxTimeSeriesPerUpload:]
}
}

// Last batch, if any.
if len(allTss) > 0 {
allReqs = append(allReqs, &monitoringpb.CreateTimeSeriesRequest{
Name: monitoring.MetricProjectPath(se.o.ProjectID),
TimeSeries: allTss,
})
}

// Send create time series requests to Stackdriver.
for _, req := range allReqs {
if err := createTimeSeries(ctx, se.c, req); err != nil {
allErrs = append(allErrs, err)
}
}

return combineErrors(allErrs)
}

func (se *statsExporter) convertSummaryMetrics(summary *metricspb.Metric) []*metricspb.Metric {
var metrics []*metricspb.Metric
var percentileTss []*metricspb.TimeSeries
Expand Down Expand Up @@ -724,3 +788,18 @@ func getDefaultLabelsFromNode(node *commonpb.Node) map[string]labelValue {
},
}
}

// combineErrors converts a list of errors into one error.
func combineErrors(errs []error) error {
numErrors := len(errs)
if numErrors == 1 {
return errs[0]
} else if numErrors > 1 {
errMsgs := make([]string, 0, numErrors)
for _, err := range errs {
errMsgs = append(errMsgs, err.Error())
}
return fmt.Errorf("[%s]", strings.Join(errMsgs, "; "))
}
return nil
}
8 changes: 8 additions & 0 deletions stackdriver.go
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,14 @@ func (e *Exporter) ExportMetricsProto(ctx context.Context, node *commonpb.Node,
return e.statsExporter.ExportMetricsProto(ctx, node, rsc, metrics)
}

// ExportMetricsProtoSync exports OpenCensus Metrics Proto to Stackdriver Monitoring synchronously,
// without de-duping or adding proto metrics to the bundler.
//
// Deprecated: experimental API for internal use at OpenTelemetry-Service only.
func (e *Exporter) ExportMetricsProtoSync(ctx context.Context, node *commonpb.Node, rsc *resourcepb.Resource, metrics []*metricspb.Metric) error {
return e.statsExporter.ExportMetricsProtoSync(ctx, node, rsc, metrics)
}

// ExportMetrics exports OpenCensus Metrics to Stackdriver Monitoring
func (e *Exporter) ExportMetrics(ctx context.Context, metrics []*metricdata.Metric) error {
return e.statsExporter.ExportMetrics(ctx, metrics)
Expand Down

0 comments on commit 8a44032

Please sign in to comment.