Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding Resource to MetricRecord #1209

Merged
merged 10 commits into from
Oct 7, 2020
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ def get_collector_point(metric_record: MetricRecord) -> metrics_pb2.Point:


def get_resource(metric_record: MetricRecord) -> resource_pb2.Resource:
resource_attributes = metric_record.instrument.meter.resource.attributes
resource_attributes = metric_record.resource.attributes
return resource_pb2.Resource(
type=infer_oc_resource_type(resource_attributes),
labels={k: str(v) for k, v in resource_attributes.items()},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,21 +100,36 @@ def test_get_collector_point(self):
"testName", "testDescription", "unit", float, ValueRecorder
)
result = metrics_exporter.get_collector_point(
MetricRecord(int_counter, self._key_labels, aggregator)
MetricRecord(
int_counter,
self._key_labels,
aggregator,
metrics.get_meter_provider().resource,
)
)
self.assertIsInstance(result, metrics_pb2.Point)
self.assertIsInstance(result.timestamp, Timestamp)
self.assertEqual(result.int64_value, 0)
aggregator.update(123.5)
aggregator.take_checkpoint()
result = metrics_exporter.get_collector_point(
MetricRecord(float_counter, self._key_labels, aggregator)
MetricRecord(
float_counter,
self._key_labels,
aggregator,
metrics.get_meter_provider().resource,
)
)
self.assertEqual(result.double_value, 123.5)
self.assertRaises(
TypeError,
metrics_exporter.get_collector_point(
MetricRecord(valuerecorder, self._key_labels, aggregator)
MetricRecord(
valuerecorder,
self._key_labels,
aggregator,
metrics.get_meter_provider().resource,
)
),
)

Expand All @@ -130,7 +145,10 @@ def test_export(self):
"testname", "testdesc", "unit", int, Counter, self._labels.keys(),
)
record = MetricRecord(
test_metric, self._key_labels, aggregate.SumAggregator(),
test_metric,
self._key_labels,
aggregate.SumAggregator(),
metrics.get_meter_provider().resource,
)

result = collector_exporter.export([record])
Expand All @@ -155,7 +173,12 @@ def test_translate_to_collector(self):
aggregator = aggregate.SumAggregator()
aggregator.update(123)
aggregator.take_checkpoint()
record = MetricRecord(test_metric, self._key_labels, aggregator,)
record = MetricRecord(
test_metric,
self._key_labels,
aggregator,
metrics.get_meter_provider().resource,
)
start_timestamp = Timestamp()
output_metrics = metrics_exporter.translate_to_collector(
[record], start_timestamp,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,11 +137,11 @@ def _translate_data(
# ValueObserver Gauge()
for sdk_metric in data:

if sdk_metric.instrument.meter.resource not in (
if sdk_metric.resource not in (
sdk_resource_instrumentation_library_metrics.keys()
):
sdk_resource_instrumentation_library_metrics[
sdk_metric.instrument.meter.resource
sdk_metric.resource
] = InstrumentationLibraryMetrics()

type_class = {
Expand Down Expand Up @@ -217,7 +217,7 @@ def _translate_data(
argument = type_class[value_type]["gauge"]["argument"]

sdk_resource_instrumentation_library_metrics[
sdk_metric.instrument.meter.resource
sdk_metric.resource
].metrics.append(
OTLPMetric(
**{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,20 +44,19 @@
class TestOTLPMetricExporter(TestCase):
def setUp(self):
self.exporter = OTLPMetricsExporter()

resource = SDKResource(OrderedDict([("a", 1), ("b", False)]))
self.counter_metric_record = MetricRecord(
Counter(
"a",
"b",
"c",
int,
MeterProvider(
resource=SDKResource(OrderedDict([("a", 1), ("b", False)]))
).get_meter(__name__),
MeterProvider(resource=resource,).get_meter(__name__),
("d",),
),
OrderedDict([("e", "f")]),
SumAggregator(),
resource,
)

def test_translate_metrics(self):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,10 @@ def test_shutdown(self):
def test_export(self):
with self._registry_register_patch:
record = MetricRecord(
self._test_metric, self._labels_key, SumAggregator(),
self._test_metric,
self._labels_key,
SumAggregator(),
get_meter_provider().resource,
)
exporter = PrometheusMetricsExporter()
result = exporter.export([record])
Expand All @@ -86,7 +89,9 @@ def test_min_max_sum_aggregator_to_prometheus(self):
aggregator.update(123)
aggregator.update(456)
aggregator.take_checkpoint()
record = MetricRecord(metric, key_labels, aggregator)
record = MetricRecord(
metric, key_labels, aggregator, get_meter_provider().resource
)
collector = CustomCollector("testprefix")
collector.add_metrics_data([record])
result_bytes = generate_latest(collector)
Expand All @@ -104,7 +109,9 @@ def test_counter_to_prometheus(self):
aggregator = SumAggregator()
aggregator.update(123)
aggregator.take_checkpoint()
record = MetricRecord(metric, key_labels, aggregator)
record = MetricRecord(
metric, key_labels, aggregator, get_meter_provider().resource
)
collector = CustomCollector("testprefix")
collector.add_metrics_data([record])

Expand Down Expand Up @@ -132,7 +139,9 @@ def test_invalid_metric(self):
)
labels = {"environment": "staging"}
key_labels = get_dict_as_key(labels)
record = MetricRecord(metric, key_labels, None)
record = MetricRecord(
metric, key_labels, None, get_meter_provider().resource
)
collector = CustomCollector("testprefix")
collector.add_metrics_data([record])
collector.collect()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@ def __init__(
instrumentation_info: "InstrumentationInfo",
):
self.instrumentation_info = instrumentation_info
self.batcher = Batcher(source.stateful)
self.batcher = Batcher(source.stateful, source.resource)
self.resource = source.resource
self.metrics = set()
self.observers = set()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

from opentelemetry import metrics as metrics_api
from opentelemetry.sdk.metrics.export.aggregate import Aggregator
from opentelemetry.sdk.resources import Resource


class MetricsExportResult(Enum):
Expand All @@ -30,10 +31,12 @@ def __init__(
instrument: metrics_api.InstrumentT,
labels: Tuple[Tuple[str, str]],
aggregator: Aggregator,
resource: Resource,
):
self.instrument = instrument
self.labels = labels
self.aggregator = aggregator
self.resource = resource


class MetricsExporter:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from typing import Sequence

from opentelemetry.sdk.metrics.export import MetricRecord
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.util import get_dict_as_key


Expand All @@ -26,13 +27,14 @@ class Batcher:
will be sent to an exporter for exporting.
"""

def __init__(self, stateful: bool):
def __init__(self, stateful: bool, resource: Resource):
self._batch_map = {}
# stateful=True indicates the batcher computes checkpoints from over
# the process lifetime. False indicates the batcher computes
# checkpoints which describe the updates of a single collection period
# (deltas)
self.stateful = stateful
self._resource = resource

def checkpoint_set(self) -> Sequence[MetricRecord]:
"""Returns a list of MetricRecords used for exporting.
Expand All @@ -46,7 +48,9 @@ def checkpoint_set(self) -> Sequence[MetricRecord]:
(instrument, aggregator_type, _, labels),
aggregator,
) in self._batch_map.items():
metric_records.append(MetricRecord(instrument, labels, aggregator))
metric_records.append(
MetricRecord(instrument, labels, aggregator, self._resource)
)
return metric_records

def finished_collection(self):
Expand Down
17 changes: 9 additions & 8 deletions opentelemetry-sdk/tests/metrics/export/test_export.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
)
from opentelemetry.sdk.metrics.export.batcher import Batcher
from opentelemetry.sdk.metrics.export.controller import PushController
from opentelemetry.sdk.resources import Resource


# pylint: disable=protected-access
Expand All @@ -49,7 +50,7 @@ def test_export(self):
)
labels = {"environment": "staging"}
aggregator = SumAggregator()
record = MetricRecord(metric, labels, aggregator)
record = MetricRecord(metric, labels, aggregator, meter.resource)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be good to have a test that tests to see if the resource created in the metric record matches the resource in the meterprovider.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated the ConsoleMetricsExporter to show the resource and updated the test to validate it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be better to remove resource from Meter: codeboten#9

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The spec does specify that a Meter should bind an API with a resource: https://github.com/open-telemetry/opentelemetry-specification/blob/master/specification/metrics/sdk.md#sdk-terminology, but I guess since we're not making use of the resource yet, it's ok to remove it here for now.

result = '{}(data="{}", labels="{}", value={})'.format(
ConsoleMetricsExporter.__name__,
metric,
Expand All @@ -64,7 +65,7 @@ def test_export(self):
class TestBatcher(unittest.TestCase):
def test_checkpoint_set(self):
meter = metrics.MeterProvider().get_meter(__name__)
batcher = Batcher(True)
batcher = Batcher(True, meter.resource)
aggregator = SumAggregator()
metric = metrics.Counter(
"available memory", "available memory", "bytes", int, meter
Expand All @@ -81,13 +82,13 @@ def test_checkpoint_set(self):
self.assertEqual(records[0].aggregator, aggregator)

def test_checkpoint_set_empty(self):
batcher = Batcher(True)
batcher = Batcher(True, Resource.create_empty())
records = batcher.checkpoint_set()
self.assertEqual(len(records), 0)

def test_finished_collection_stateless(self):
meter = metrics.MeterProvider().get_meter(__name__)
batcher = Batcher(False)
batcher = Batcher(False, meter.resource)
aggregator = SumAggregator()
metric = metrics.Counter(
"available memory", "available memory", "bytes", int, meter
Expand All @@ -102,7 +103,7 @@ def test_finished_collection_stateless(self):

def test_finished_collection_stateful(self):
meter = metrics.MeterProvider().get_meter(__name__)
batcher = Batcher(True)
batcher = Batcher(True, meter.resource)
aggregator = SumAggregator()
metric = metrics.Counter(
"available memory", "available memory", "bytes", int, meter
Expand All @@ -117,7 +118,7 @@ def test_finished_collection_stateful(self):

def test_batcher_process_exists(self):
meter = metrics.MeterProvider().get_meter(__name__)
batcher = Batcher(True)
batcher = Batcher(True, meter.resource)
aggregator = SumAggregator()
aggregator2 = SumAggregator()
metric = metrics.Counter(
Expand All @@ -138,7 +139,7 @@ def test_batcher_process_exists(self):

def test_batcher_process_not_exists(self):
meter = metrics.MeterProvider().get_meter(__name__)
batcher = Batcher(True)
batcher = Batcher(True, meter.resource)
aggregator = SumAggregator()
metric = metrics.Counter(
"available memory", "available memory", "bytes", int, meter
Expand All @@ -157,7 +158,7 @@ def test_batcher_process_not_exists(self):

def test_batcher_process_not_stateful(self):
meter = metrics.MeterProvider().get_meter(__name__)
batcher = Batcher(True)
batcher = Batcher(True, meter.resource)
aggregator = SumAggregator()
metric = metrics.Counter(
"available memory", "available memory", "bytes", int, meter
Expand Down