Skip to content

Commit

Permalink
pass start_timestamp to the timeseries in OC exporter
Browse files Browse the repository at this point in the history
  • Loading branch information
aabmass committed Jul 24, 2020
1 parent 935280c commit 5c08285
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@
"""OpenCensus Collector Metrics Exporter."""

import logging
from typing import Sequence
from typing import Optional, Sequence

import grpc
from google.protobuf.timestamp_pb2 import Timestamp
from opencensus.proto.agent.metrics.v1 import (
metrics_service_pb2,
metrics_service_pb2_grpc,
Expand Down Expand Up @@ -65,6 +66,8 @@ def __init__(
self.client = client

self.node = utils.get_node(service_name, host_name)
self.exporter_start_time_proto = Timestamp()
self.exporter_start_time_proto.GetCurrentTime()

def export(
self, metric_records: Sequence[MetricRecord]
Expand All @@ -89,16 +92,21 @@ def shutdown(self) -> None:
def generate_metrics_requests(
self, metrics: Sequence[MetricRecord]
) -> metrics_service_pb2.ExportMetricsServiceRequest:
collector_metrics = translate_to_collector(metrics)
collector_metrics = translate_to_collector(
metrics, self.exporter_start_time_proto
)
service_request = metrics_service_pb2.ExportMetricsServiceRequest(
node=self.node, metrics=collector_metrics
)
# for testing, will delete
print(collector_metrics)
yield service_request


# pylint: disable=too-many-branches
def translate_to_collector(
metric_records: Sequence[MetricRecord],
start_timestamp: Optional[Timestamp],
) -> Sequence[metrics_pb2.Metric]:
collector_metrics = []
for metric_record in metric_records:
Expand All @@ -121,9 +129,15 @@ def translate_to_collector(
label_keys=label_keys,
)

is_cumulative = metric_descriptor.type in (
metrics_pb2.MetricDescriptor.CUMULATIVE_INT64,
metrics_pb2.MetricDescriptor.CUMULATIVE_DOUBLE,
metrics_pb2.MetricDescriptor.CUMULATIVE_DISTRIBUTION,
)
timeseries = metrics_pb2.TimeSeries(
label_values=label_values,
points=[get_collector_point(metric_record)],
start_timestamp=(start_timestamp if is_cumulative else None),
)
collector_metrics.append(
metrics_pb2.Metric(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,10 @@ def test_translate_to_collector(self):
aggregator.update(123)
aggregator.take_checkpoint()
record = MetricRecord(test_metric, self._key_labels, aggregator,)
output_metrics = metrics_exporter.translate_to_collector([record])
start_timestamp = Timestamp()
output_metrics = metrics_exporter.translate_to_collector(
[record], start_timestamp,
)
self.assertEqual(len(output_metrics), 1)
self.assertIsInstance(output_metrics[0], metrics_pb2.Metric)
self.assertEqual(output_metrics[0].metric_descriptor.name, "testname")
Expand All @@ -169,6 +172,9 @@ def test_translate_to_collector(self):
)
self.assertEqual(len(output_metrics[0].timeseries), 1)
self.assertEqual(len(output_metrics[0].timeseries[0].label_values), 1)
self.assertEqual(
output_metrics[0].timeseries[0].start_timestamp, start_timestamp
)
self.assertEqual(
output_metrics[0].timeseries[0].label_values[0].has_value, True
)
Expand Down

0 comments on commit 5c08285

Please sign in to comment.