Commit

Refactor metric format
ocelotl committed May 10, 2022

Verified: this commit was signed with the committer's verified signature (ocelotl, Diego Hurtado).
1 parent a821311 commit a27fe4b
Showing 13 changed files with 438 additions and 867 deletions.
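
For orientation before the per-file diffs: the refactor moves each Metric's measurements out of a single per-metric point and into a data container (Sum, Gauge, or Histogram) that holds a sequence of data points. A minimal sketch of the new shape, with illustrative values; the import path and field names are taken from the diffs below, everything else is hypothetical:

from opentelemetry.sdk._metrics.export import (
    AggregationTemporality,
    Metric,
    NumberDataPoint,
    Sum,
)

# Hypothetical counter metric in the refactored format: data points live
# under Metric.data rather than in a per-metric "point" field.
metric = Metric(
    name="request_count",        # hypothetical instrument name
    description="",
    unit="1",
    data=Sum(
        data_points=[
            NumberDataPoint(
                attributes={"route": "/home"},   # hypothetical attributes
                start_time_unix_nano=1647626444152947792,
                time_unix_nano=1647626444153163239,
                value=3,
            )
        ],
        aggregation_temporality=AggregationTemporality.CUMULATIVE,
        is_monotonic=True,
    ),
)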
@@ -22,16 +22,11 @@
Aggregation,
DefaultAggregation,
_Aggregation,
_convert_aggregation_temporality,
_PointVarT,
_SumAggregation,
)
from opentelemetry.sdk._metrics._internal.export import AggregationTemporality
from opentelemetry.sdk._metrics._internal.measurement import Measurement
from opentelemetry.sdk._metrics._internal.point import Metric
from opentelemetry.sdk._metrics._internal.sdk_configuration import (
SdkConfiguration,
)
from opentelemetry.sdk._metrics._internal.point import DataPointT
from opentelemetry.sdk._metrics._internal.view import View

_logger = getLogger(__name__)
@@ -42,17 +37,12 @@ def __init__(
self,
view: View,
instrument: Instrument,
sdk_config: SdkConfiguration,
instrument_class_temporality: Dict[type, AggregationTemporality],
instrument_class_aggregation: Dict[type, Aggregation],
):
self._view = view
self._instrument = instrument
self._sdk_config = sdk_config
self._attributes_aggregation: Dict[frozenset, _Aggregation] = {}
self._attributes_previous_point: Dict[frozenset, _PointVarT] = {}
self._lock = Lock()
self._instrument_class_temporality = instrument_class_temporality
self._instrument_class_aggregation = instrument_class_aggregation
self._name = self._view._name or self._instrument.name
self._description = (
@@ -124,46 +114,15 @@ def consume_measurement(self, measurement: Measurement) -> None:

self._attributes_aggregation[attributes].aggregate(measurement)

def collect(self) -> Iterable[Metric]:
def collect(
self, aggregation_temporality: AggregationTemporality
) -> Iterable[DataPointT]:

data_points = []
with self._lock:
for (
attributes,
aggregation,
) in self._attributes_aggregation.items():

previous_point = self._attributes_previous_point.get(
attributes
)

current_point = aggregation.collect()

# pylint: disable=assignment-from-none

self._attributes_previous_point[
attributes
] = _convert_aggregation_temporality(
previous_point,
current_point,
AggregationTemporality.CUMULATIVE,
for aggregation in self._attributes_aggregation.values():
data_points.append(
aggregation.collect(aggregation_temporality)
)

if current_point is not None:

yield Metric(
attributes=dict(attributes),
description=self._description,
instrumentation_scope=(
self._instrument.instrumentation_scope
),
name=self._name,
resource=self._sdk_config.resource,
unit=self._instrument.unit,
point=_convert_aggregation_temporality(
previous_point,
current_point,
self._instrument_class_temporality[
self._instrument.__class__
],
),
)
return data_points
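
A quick illustration of the new call shape above, assuming a configured view_instrument_match for a sum-style instrument: the aggregation temporality is now chosen by the caller (MetricReaderStorage) instead of being stored per match, and collect returns bare data points rather than full Metric objects.

data_points = view_instrument_match.collect(AggregationTemporality.CUMULATIVE)
for data_point in data_points:
    # For a sum or last-value match, each entry is a NumberDataPoint.
    print(data_point.attributes, data_point.value)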
251 changes: 109 additions & 142 deletions opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/aggregation.py
@@ -14,7 +14,6 @@

from abc import ABC, abstractmethod
from bisect import bisect_left
from dataclasses import replace
from enum import IntEnum
from logging import getLogger
from math import inf
@@ -37,11 +36,15 @@
from opentelemetry.sdk._metrics._internal.point import (
Histogram as HistogramPoint,
)
from opentelemetry.sdk._metrics._internal.point import PointT, Sum
from opentelemetry.sdk._metrics._internal.point import (
HistogramDataPoint,
NumberDataPoint,
Sum,
)
from opentelemetry.util._time import _time_ns
from opentelemetry.util.types import Attributes

_PointVarT = TypeVar("_PointVarT", bound=PointT)
_DataPointVarT = TypeVar("_DataPointVarT", NumberDataPoint, HistogramDataPoint)

_logger = getLogger(__name__)

@@ -58,25 +61,32 @@ class AggregationTemporality(IntEnum):
CUMULATIVE = 2


class _Aggregation(ABC, Generic[_PointVarT]):
class _Aggregation(ABC, Generic[_DataPointVarT]):
def __init__(self, attributes: Attributes):
self._lock = Lock()
self._attributes = attributes
self._previous_point = None

@abstractmethod
def aggregate(self, measurement: Measurement) -> None:
pass

@abstractmethod
def collect(self) -> Optional[_PointVarT]:
def collect(
self,
aggregation_temporality: AggregationTemporality,
) -> Optional[_DataPointVarT]:
pass


class _DropAggregation(_Aggregation):
def aggregate(self, measurement: Measurement) -> None:
pass

def collect(self) -> Optional[_PointVarT]:
def collect(
self,
aggregation_temporality: AggregationTemporality,
) -> Optional[_DataPointVarT]:
pass


@@ -176,7 +186,9 @@ def aggregate(self, measurement: Measurement) -> None:
self._value = 0
self._value = self._value + measurement.value

def collect(self) -> Optional[Sum]:
def collect(
self, aggregation_temporality: AggregationTemporality
) -> Optional[NumberDataPoint]:
"""
Atomically return a point for the current value of the metric and
reset the aggregation value.
@@ -192,28 +204,50 @@ def collect(self) -> Optional[Sum]:
self._value = 0
self._start_time_unix_nano = now + 1

return Sum(
aggregation_temporality=AggregationTemporality.DELTA,
is_monotonic=self._instrument_is_monotonic,
start_time_unix_nano=start_time_unix_nano,
time_unix_nano=now,
value=value,
)
else:

with self._lock:
if self._value is None:
return None
value = self._value
self._value = None
with self._lock:
if self._value is None:
return None
value = self._value
self._value = None
start_time_unix_nano = self._start_time_unix_nano

return Sum(
aggregation_temporality=AggregationTemporality.CUMULATIVE,
is_monotonic=self._instrument_is_monotonic,
start_time_unix_nano=self._start_time_unix_nano,
current_point = NumberDataPoint(
attributes=self._attributes,
start_time_unix_nano=start_time_unix_nano,
time_unix_nano=now,
value=value,
)

if self._previous_point is None:
self._previous_point = current_point
return current_point

if self._instrument_temporality is aggregation_temporality:
# Output DELTA for a synchronous instrument
# Output CUMULATIVE for an asynchronous instrument
return current_point

if aggregation_temporality is AggregationTemporality.DELTA:
# Output temporality DELTA for an asynchronous instrument
value = current_point.value - self._previous_point.value
output_start_time_unix_nano = self._previous_point.time_unix_nano

else:
# Output CUMULATIVE for a synchronous instrument
value = current_point.value + self._previous_point.value
output_start_time_unix_nano = (
self._previous_point.start_time_unix_nano
)

return NumberDataPoint(
attributes=self._attributes,
start_time_unix_nano=output_start_time_unix_nano,
time_unix_nano=current_point.time_unix_nano,
value=value,
)


class _LastValueAggregation(_Aggregation[Gauge]):
def __init__(self, attributes: Attributes):
@@ -224,7 +258,10 @@ def aggregate(self, measurement: Measurement):
with self._lock:
self._value = measurement.value

def collect(self) -> Optional[Gauge]:
def collect(
self,
aggregation_temporality: AggregationTemporality,
) -> Optional[_DataPointVarT]:
"""
Atomically return a point for the current value of the metric.
"""
@@ -234,7 +271,9 @@ def collect(self) -> Optional[Gauge]:
value = self._value
self._value = None

return Gauge(
return NumberDataPoint(
attributes=self._attributes,
start_time_unix_nano=0,
time_unix_nano=_time_ns(),
value=value,
)
@@ -266,6 +305,11 @@ def __init__(
self._sum = 0
self._record_min_max = record_min_max
self._start_time_unix_nano = _time_ns()
# It is assumed that the "natural" aggregation temporality for a
# Histogram instrument is DELTA, like the "natural" aggregation
# temporality for a Counter is DELTA and the "natural" aggregation
# temporality for an ObservableCounter is CUMULATIVE.
self._instrument_temporality = AggregationTemporality.DELTA

def _get_empty_bucket_counts(self) -> List[int]:
return [0] * (len(self._boundaries) + 1)
@@ -282,165 +326,88 @@ def aggregate(self, measurement: Measurement) -> None:

self._bucket_counts[bisect_left(self._boundaries, value)] += 1

def collect(self) -> HistogramPoint:
def collect(
self,
aggregation_temporality: AggregationTemporality,
) -> Optional[_DataPointVarT]:
"""
Atomically return a point for the current value of the metric.
"""
now = _time_ns()

with self._lock:
value = self._bucket_counts
if not any(self._bucket_counts):
return None

bucket_counts = self._bucket_counts
start_time_unix_nano = self._start_time_unix_nano
histogram_sum = self._sum
histogram_max = self._max
histogram_min = self._min
sum_ = self._sum
max_ = self._max
min_ = self._min

self._bucket_counts = self._get_empty_bucket_counts()
self._start_time_unix_nano = now + 1
self._sum = 0
self._min = inf
self._max = -inf

return HistogramPoint(
aggregation_temporality=AggregationTemporality.DELTA,
bucket_counts=tuple(value),
explicit_bounds=self._boundaries,
max=histogram_max,
min=histogram_min,
current_point = HistogramDataPoint(
attributes=self._attributes,
start_time_unix_nano=start_time_unix_nano,
sum=histogram_sum,
time_unix_nano=now,
count=sum(bucket_counts),
sum=sum_,
bucket_counts=tuple(bucket_counts),
explicit_bounds=self._boundaries,
min=min_,
max=max_,
)


# pylint: disable=too-many-return-statements,too-many-branches
def _convert_aggregation_temporality(
previous_point: Optional[_PointVarT],
current_point: _PointVarT,
aggregation_temporality: AggregationTemporality,
) -> _PointVarT:
"""Converts `current_point` to the requested `aggregation_temporality`
given the `previous_point`.
`previous_point` must have `CUMULATIVE` temporality. `current_point` may
have `DELTA` or `CUMULATIVE` temporality.
The output point will have temporality `aggregation_temporality`. Since
`GAUGE` points have no temporality, they are returned unchanged.
"""

current_point_type = type(current_point)

if current_point_type is Gauge:
return current_point

if (
previous_point is not None
and current_point is not None
and type(previous_point) is not type(current_point)
):
_logger.warning(
"convert_aggregation_temporality called with mismatched "
"point types: %s and %s",
type(previous_point),
current_point_type,
)

return current_point

if current_point_type is Sum:
if previous_point is None:
# Output CUMULATIVE for a synchronous instrument
# There is no previous value, return the delta point as a
# cumulative
return replace(
current_point, aggregation_temporality=aggregation_temporality
)
if previous_point.aggregation_temporality is not (
AggregationTemporality.CUMULATIVE
):
raise Exception(
"previous_point aggregation temporality must be CUMULATIVE"
)

if current_point.aggregation_temporality is aggregation_temporality:
# Output DELTA for a synchronous instrument
# Output CUMULATIVE for an asynchronous instrument
if self._previous_point is None:
self._previous_point = current_point
return current_point

if aggregation_temporality is AggregationTemporality.DELTA:
# Output temporality DELTA for an asynchronous instrument
value = current_point.value - previous_point.value
output_start_time_unix_nano = previous_point.time_unix_nano

else:
# Output CUMULATIVE for a synchronous instrument
value = current_point.value + previous_point.value
output_start_time_unix_nano = previous_point.start_time_unix_nano

is_monotonic = (
previous_point.is_monotonic and current_point.is_monotonic
)

return Sum(
start_time_unix_nano=output_start_time_unix_nano,
time_unix_nano=current_point.time_unix_nano,
value=value,
aggregation_temporality=aggregation_temporality,
is_monotonic=is_monotonic,
)

if current_point_type is HistogramPoint:
if previous_point is None:
return replace(
current_point, aggregation_temporality=aggregation_temporality
)
if previous_point.aggregation_temporality is not (
AggregationTemporality.CUMULATIVE
):
raise Exception(
"previous_point aggregation temporality must be CUMULATIVE"
)

if current_point.aggregation_temporality is aggregation_temporality:
if self._instrument_temporality is aggregation_temporality:
return current_point

max_ = current_point.max
min_ = current_point.min

if aggregation_temporality is AggregationTemporality.CUMULATIVE:
start_time_unix_nano = previous_point.start_time_unix_nano
sum_ = current_point.sum + previous_point.sum
start_time_unix_nano = self._previous_point.start_time_unix_nano
sum_ = current_point.sum + self._previous_point.sum
# Only update min/max on delta -> cumulative
max_ = max(current_point.max, previous_point.max)
min_ = min(current_point.min, previous_point.min)
max_ = max(current_point.max, self._previous_point.max)
min_ = min(current_point.min, self._previous_point.min)
bucket_counts = [
curr_count + prev_count
for curr_count, prev_count in zip(
current_point.bucket_counts, previous_point.bucket_counts
current_point.bucket_counts,
self._previous_point.bucket_counts,
)
]
else:
start_time_unix_nano = previous_point.time_unix_nano
sum_ = current_point.sum - previous_point.sum
start_time_unix_nano = self._previous_point.time_unix_nano
sum_ = current_point.sum - self._previous_point.sum
bucket_counts = [
curr_count - prev_count
for curr_count, prev_count in zip(
current_point.bucket_counts, previous_point.bucket_counts
current_point.bucket_counts,
self._previous_point.bucket_counts,
)
]

return HistogramPoint(
aggregation_temporality=aggregation_temporality,
bucket_counts=bucket_counts,
explicit_bounds=current_point.explicit_bounds,
max=max_,
min=min_,
return HistogramDataPoint(
attributes=self._attributes,
start_time_unix_nano=start_time_unix_nano,
sum=sum_,
time_unix_nano=current_point.time_unix_nano,
count=sum(bucket_counts),
sum=sum_,
bucket_counts=tuple(bucket_counts),
explicit_bounds=current_point.explicit_bounds,
min=min_,
max=max_,
)
return None


class ExplicitBucketHistogramAggregation(Aggregation):
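
The temporality conversion that previously lived in _convert_aggregation_temporality is now folded into each aggregation's collect method, as shown above. A worked example with assumed values, mirroring the two branches of the new _SumAggregation.collect:

# Synchronous counter (natural temporality DELTA), CUMULATIVE requested:
previous_cumulative_value = 3    # value of the previously emitted point
current_delta_value = 4          # value accumulated since the last collection
cumulative_value = current_delta_value + previous_cumulative_value   # -> 7
# start_time_unix_nano is carried over from the previous point.

# Asynchronous observable counter (natural temporality CUMULATIVE), DELTA requested:
previous_total, current_total = 10, 14
delta_value = current_total - previous_total                          # -> 4
# start_time_unix_nano becomes the previous point's time_unix_nano.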
@@ -16,17 +16,31 @@
from threading import RLock
from typing import Dict, Iterable, List

from opentelemetry._metrics import Asynchronous, Instrument
from opentelemetry._metrics import (
Asynchronous,
Counter,
Instrument,
ObservableCounter,
)
from opentelemetry.sdk._metrics._internal._view_instrument_match import (
_ViewInstrumentMatch,
)
from opentelemetry.sdk._metrics._internal.aggregation import (
Aggregation,
ExplicitBucketHistogramAggregation,
_DropAggregation,
_ExplicitBucketHistogramAggregation,
_LastValueAggregation,
_SumAggregation,
)
from opentelemetry.sdk._metrics._internal.export import AggregationTemporality
from opentelemetry.sdk._metrics._internal.measurement import Measurement
from opentelemetry.sdk._metrics._internal.point import Metric
from opentelemetry.sdk._metrics._internal.point import (
Gauge,
Histogram,
Metric,
Sum,
)
from opentelemetry.sdk._metrics._internal.sdk_configuration import (
SdkConfiguration,
)
@@ -81,10 +95,6 @@ def _get_or_init_view_instrument_match(
_ViewInstrumentMatch(
view=_DEFAULT_VIEW,
instrument=instrument,
sdk_config=self._sdk_config,
instrument_class_temporality=(
self._instrument_class_temporality
),
instrument_class_aggregation=(
self._instrument_class_aggregation
),
@@ -117,10 +127,59 @@ def collect(self) -> Iterable[Metric]:
# for a single instrument.
with self._lock:
for (
view_instrument_matches
) in self._instrument_view_instrument_matches.values():
instrument,
view_instrument_matches,
) in self._instrument_view_instrument_matches.items():
aggregation_temporality = self._instrument_class_temporality[
instrument.__class__
]

for view_instrument_match in view_instrument_matches:
metrics.extend(view_instrument_match.collect())

if isinstance(
view_instrument_match._aggregation, _SumAggregation
):
data = Sum(
aggregation_temporality=aggregation_temporality,
data_points=view_instrument_match.collect(
aggregation_temporality
),
is_monotonic=isinstance(
instrument, (Counter, ObservableCounter)
),
)
elif isinstance(
view_instrument_match._aggregation,
_LastValueAggregation,
):
data = Gauge(
data_points=view_instrument_match.collect(
aggregation_temporality
)
)
elif isinstance(
view_instrument_match._aggregation,
_ExplicitBucketHistogramAggregation,
):
data = Histogram(
data_points=view_instrument_match.collect(
aggregation_temporality
),
aggregation_temporality=aggregation_temporality,
)
elif isinstance(
view_instrument_match._aggregation, _DropAggregation
):
continue

metrics.append(
Metric(
name=view_instrument_match._name,
description=view_instrument_match._description,
unit=view_instrument_match._instrument.unit,
data=data,
)
)

return metrics

@@ -140,10 +199,6 @@ def _handle_view_instrument_match(
new_view_instrument_match = _ViewInstrumentMatch(
view=view,
instrument=instrument,
sdk_config=self._sdk_config,
instrument_class_temporality=(
self._instrument_class_temporality
),
instrument_class_aggregation=(
self._instrument_class_aggregation
),
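
Restated as a standalone sketch (simplified from the collect hunk above, not the exact method body; the imports mirror the ones added at the top of this file), the reader storage now wraps the collected data points according to the aggregation kind of each match:

from opentelemetry._metrics import Counter, ObservableCounter
from opentelemetry.sdk._metrics._internal.aggregation import (
    _DropAggregation,
    _ExplicitBucketHistogramAggregation,
    _LastValueAggregation,
    _SumAggregation,
)
from opentelemetry.sdk._metrics._internal.point import Gauge, Histogram, Sum


def _wrap_data_points(aggregation, data_points, temporality, instrument):
    # Simplified restatement of the dispatch in MetricReaderStorage.collect.
    if isinstance(aggregation, _SumAggregation):
        return Sum(
            data_points=data_points,
            aggregation_temporality=temporality,
            is_monotonic=isinstance(instrument, (Counter, ObservableCounter)),
        )
    if isinstance(aggregation, _LastValueAggregation):
        return Gauge(data_points=data_points)
    if isinstance(aggregation, _ExplicitBucketHistogramAggregation):
        return Histogram(
            data_points=data_points,
            aggregation_temporality=temporality,
        )
    if isinstance(aggregation, _DropAggregation):
        return None  # dropped: no metric is emitted
    return None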
84 changes: 48 additions & 36 deletions opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/point.py
@@ -14,29 +14,37 @@

# pylint: disable=unused-import

from dataclasses import asdict, dataclass
from dataclasses import dataclass
from json import dumps
from typing import Sequence, Union

# This kind of import is needed to avoid Sphinx errors.
import opentelemetry.sdk._metrics._internal
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.util.instrumentation import InstrumentationScope
from opentelemetry.util.types import Attributes


@dataclass(frozen=True)
class NumberDataPoint:
"""Single data point in a timeseries that describes the time-varying scalar
value of a metric.
"""

attributes: Attributes
start_time_unix_nano: int
time_unix_nano: int
value: Union[int, float]


@dataclass(frozen=True)
class Sum:
"""Represents the type of a scalar metric that is calculated as a sum of
all reported measurements over a time interval."""

data_points: Sequence[NumberDataPoint]
aggregation_temporality: (
"opentelemetry.sdk._metrics.export.AggregationTemporality"
)
is_monotonic: bool
start_time_unix_nano: int
time_unix_nano: int
value: Union[int, float]


@dataclass(frozen=True)
@@ -45,61 +53,65 @@ class Gauge:
value for every data point. It should be used for an unknown
aggregation."""

data_points: Sequence[NumberDataPoint]


@dataclass(frozen=True)
class HistogramDataPoint:
"""Single data point in a timeseries that describes the time-varying scalar
value of a metric.
"""

attributes: Attributes
start_time_unix_nano: int
time_unix_nano: int
value: Union[int, float]
count: int
sum: Union[int, float]
bucket_counts: Sequence[int]
explicit_bounds: Sequence[float]
min: float
max: float


@dataclass(frozen=True)
class Histogram:
"""Represents the type of a metric that is calculated by aggregating as a
histogram of all reported measurements over a time interval."""

data_points: Sequence[HistogramDataPoint]
aggregation_temporality: (
"opentelemetry.sdk._metrics.export.AggregationTemporality"
)
bucket_counts: Sequence[int]
explicit_bounds: Sequence[float]
max: int
min: int
start_time_unix_nano: int
sum: Union[int, float]
time_unix_nano: int

def to_json(self) -> str:
return dumps(
{
"data_points": dumps(self.data_points),
"aggregation_temporality": self.aggregation_temporality,
}
)


PointT = Union[Sum, Gauge, Histogram]
DataT = Union[Sum, Gauge, Histogram]
DataPointT = Union[NumberDataPoint, HistogramDataPoint]


@dataclass(frozen=True)
class Metric:
"""Represents a metric point in the OpenTelemetry data model to be exported
Concrete metric types contain all the information as in the OTLP proto definitions
(https://github.com/open-telemetry/opentelemetry-proto/blob/b43e9b18b76abf3ee040164b55b9c355217151f3/opentelemetry/proto/metrics/v1/metrics.proto#L37) but are flattened as much as possible.
"""
"""Represents a metric point in the OpenTelemetry data model to be
exported."""

# common fields to all metric kinds
attributes: Attributes
description: str
instrumentation_scope: InstrumentationScope
name: str
resource: Resource
description: str
unit: str
point: PointT
"""Contains non-common fields for the given metric"""
data: DataT

def to_json(self) -> str:
return dumps(
{
"attributes": self.attributes if self.attributes else "",
"description": self.description if self.description else "",
"instrumentation_scope": repr(self.instrumentation_scope)
if self.instrumentation_scope
else "",
"name": self.name,
"resource": repr(self.resource.attributes)
if self.resource
else "",
"description": self.description if self.description else "",
"unit": self.unit if self.unit else "",
"point": asdict(self.point) if self.point else "",
"data": self.data.to_json(),
}
)
@@ -28,17 +28,20 @@

# The point module is not in the export directory to avoid a circular import.
from opentelemetry.sdk._metrics._internal.point import ( # noqa: F401
DataPointT,
DataT,
Gauge,
Histogram,
HistogramDataPoint,
Metric,
PointT,
NumberDataPoint,
Sum,
)

__all__ = []
for key, value in globals().copy().items():
if not key.startswith("_"):
if _version_info.minor == 6 and key == "PointT":
if _version_info.minor == 6 and key == "DataT":
continue
value.__module__ = __name__
__all__.append(key)
603 changes: 84 additions & 519 deletions opentelemetry-sdk/tests/metrics/test_aggregation.py

Large diffs are not rendered by default.

5 changes: 4 additions & 1 deletion opentelemetry-sdk/tests/metrics/test_import.py
@@ -46,15 +46,18 @@ def test_import_export(self):
from opentelemetry.sdk._metrics.export import ( # noqa: F401
AggregationTemporality,
ConsoleMetricExporter,
DataPointT,
DataT,
Gauge,
Histogram,
HistogramDataPoint,
InMemoryMetricReader,
Metric,
MetricExporter,
MetricExportResult,
MetricReader,
NumberDataPoint,
PeriodicExportingMetricReader,
PointT,
Sum,
)
except Exception as error:
28 changes: 16 additions & 12 deletions opentelemetry-sdk/tests/metrics/test_in_memory_metric_reader.py
@@ -21,10 +21,9 @@
AggregationTemporality,
InMemoryMetricReader,
Metric,
NumberDataPoint,
Sum,
)
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.util.instrumentation import InstrumentationScope


class TestInMemoryMetricReader(TestCase):
@@ -37,16 +36,18 @@ def test_no_metrics(self):

def test_converts_metrics_to_list(self):
metric = Metric(
attributes={"myattr": "baz"},
description="",
instrumentation_scope=InstrumentationScope("testmetrics"),
name="foo",
resource=Resource.create(),
description="",
unit="",
point=Sum(
start_time_unix_nano=1647626444152947792,
time_unix_nano=1647626444153163239,
value=72.3309814450449,
data=Sum(
data_points=[
NumberDataPoint(
attributes={"myattr": "baz"},
start_time_unix_nano=1647626444152947792,
time_unix_nano=1647626444153163239,
value=72.3309814450449,
)
],
aggregation_temporality=AggregationTemporality.CUMULATIVE,
is_monotonic=True,
),
@@ -77,5 +78,8 @@ def test_integration(self):
counter1.add(1, {"foo": "2"})

metrics = reader.get_metrics()
# should be 3 metrics, one from the observable gauge and one for each labelset from the counter
self.assertEqual(len(metrics), 3)
# should be 3 number data points, one from the observable gauge and one
# for each labelset from the counter
self.assertEqual(len(metrics), 2)
self.assertEqual(len(metrics[0].data.data_points), 2)
self.assertEqual(len(metrics[1].data.data_points), 1)
16 changes: 12 additions & 4 deletions opentelemetry-sdk/tests/metrics/test_metric_reader_storage.py
@@ -21,6 +21,9 @@
ObservableCounter,
UpDownCounter,
)
from opentelemetry.sdk._metrics._internal.aggregation import (
_LastValueAggregation,
)
from opentelemetry.sdk._metrics._internal.measurement import Measurement
from opentelemetry.sdk._metrics._internal.metric_reader_storage import (
_DEFAULT_VIEW,
@@ -106,9 +109,9 @@ def test_creates_view_instrument_matches(
def test_forwards_calls_to_view_instrument_match(
self, MockViewInstrumentMatch: Mock
):
view_instrument_match1 = Mock()
view_instrument_match2 = Mock()
view_instrument_match3 = Mock()
view_instrument_match1 = Mock(_aggregation=_LastValueAggregation({}))
view_instrument_match2 = Mock(_aggregation=_LastValueAggregation({}))
view_instrument_match3 = Mock(_aggregation=_LastValueAggregation({}))
MockViewInstrumentMatch.side_effect = [
view_instrument_match1,
view_instrument_match2,
@@ -163,7 +166,12 @@ def test_forwards_calls_to_view_instrument_match(
view_instrument_match1.collect.assert_called_once()
view_instrument_match2.collect.assert_called_once()
view_instrument_match3.collect.assert_called_once()
self.assertEqual(result, all_metrics)
self.assertEqual(result[0].data.data_points[0], all_metrics[0])
self.assertEqual(result[0].data.data_points[1], all_metrics[1])
self.assertEqual(result[1].data.data_points[0], all_metrics[2])
self.assertEqual(result[1].data.data_points[1], all_metrics[3])
self.assertEqual(result[2].data.data_points[0], all_metrics[4])
self.assertEqual(result[2].data.data_points[1], all_metrics[5])

@patch(
"opentelemetry.sdk._metrics._internal."
4 changes: 2 additions & 2 deletions opentelemetry-sdk/tests/metrics/test_metrics.py
@@ -513,11 +513,11 @@ def test_duplicate_instrument_aggregate_data(self):
self.assertEqual(metric_0.name, "counter")
self.assertEqual(metric_0.unit, "unit")
self.assertEqual(metric_0.description, "description")
self.assertEqual(metric_0.point.value, 3)
self.assertEqual(metric_0.data.data_points[0].value, 3)

metric_1 = metrics[1]

self.assertEqual(metric_1.name, "counter")
self.assertEqual(metric_1.unit, "unit")
self.assertEqual(metric_1.description, "description")
self.assertEqual(metric_1.point.value, 7)
self.assertEqual(metric_1.data.data_points[0].value, 7)
@@ -23,10 +23,10 @@
Metric,
MetricExporter,
MetricExportResult,
NumberDataPoint,
PeriodicExportingMetricReader,
Sum,
)
from opentelemetry.sdk.resources import Resource
from opentelemetry.test.concurrency_test import ConcurrencyTestBase
from opentelemetry.util._time import _time_ns

@@ -54,29 +54,34 @@ def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
metrics_list = [
Metric(
name="sum_name",
attributes={},
description="",
instrumentation_scope=None,
resource=Resource.create(),
unit="",
point=Sum(
start_time_unix_nano=_time_ns(),
time_unix_nano=_time_ns(),
value=2,
data=Sum(
data_points=[
NumberDataPoint(
attributes={},
start_time_unix_nano=_time_ns(),
time_unix_nano=_time_ns(),
value=2,
)
],
aggregation_temporality=1,
is_monotonic=True,
),
),
Metric(
name="gauge_name",
attributes={},
description="",
instrumentation_scope=None,
resource=Resource.create(),
unit="",
point=Gauge(
time_unix_nano=_time_ns(),
value=2,
data=Gauge(
data_points=[
NumberDataPoint(
attributes={},
start_time_unix_nano=_time_ns(),
time_unix_nano=_time_ns(),
value=2,
)
]
),
),
]
66 changes: 44 additions & 22 deletions opentelemetry-sdk/tests/metrics/test_point.py
@@ -14,22 +14,22 @@

from unittest import TestCase

from opentelemetry.sdk._metrics.export import Gauge, Histogram, Metric, Sum
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.util.instrumentation import InstrumentationScope
from opentelemetry.sdk._metrics.export import (
Gauge,
Histogram,
HistogramDataPoint,
Metric,
NumberDataPoint,
Sum,
)


def _create_metric(value):
def _create_metric(data):
return Metric(
attributes={"attr-key": "test-val"},
description="test-description",
instrumentation_scope=InstrumentationScope(
name="name", version="version"
),
name="test-name",
resource=Resource({"resource-key": "resource-val"}),
description="test-description",
unit="test-unit",
point=value,
data=data,
)


@@ -38,11 +38,16 @@ def test_sum(self):
self.maxDiff = None
point = _create_metric(
Sum(
data_points=[
NumberDataPoint(
attributes={"attr-key": "test-val"},
start_time_unix_nano=10,
time_unix_nano=20,
value=9,
)
],
aggregation_temporality=2,
is_monotonic=True,
start_time_unix_nano=10,
time_unix_nano=20,
value=9,
)
)
self.assertEqual(
@@ -51,7 +56,18 @@ def test_sum(self):
)

def test_gauge(self):
point = _create_metric(Gauge(time_unix_nano=40, value=20))
point = _create_metric(
Gauge(
data_points=[
NumberDataPoint(
attributes={"attr-key": "test-val"},
start_time_unix_nano=10,
time_unix_nano=20,
value=9,
)
]
)
)
self.assertEqual(
'{"attributes": {"attr-key": "test-val"}, "description": "test-description", "instrumentation_scope": "InstrumentationScope(name, version, None)", "name": "test-name", "resource": "BoundedAttributes({\'resource-key\': \'resource-val\'}, maxlen=None)", "unit": "test-unit", "point": {"time_unix_nano": 40, "value": 20}}',
point.to_json(),
@@ -60,14 +76,20 @@ def test_gauge(self):
def test_histogram(self):
point = _create_metric(
Histogram(
data_points=[
HistogramDataPoint(
attributes={"attr-key": "test-val"},
start_time_unix_nano=50,
time_unix_nano=60,
count=1,
sum=0.8,
bucket_counts=[0, 0, 1, 0],
explicit_bounds=[0.1, 0.5, 0.9, 1],
min=0.8,
max=0.8,
)
],
aggregation_temporality=1,
bucket_counts=[0, 0, 1, 0],
explicit_bounds=[0.1, 0.5, 0.9, 1],
max=0.8,
min=0.8,
start_time_unix_nano=50,
sum=0.8,
time_unix_nano=60,
)
)
self.maxDiff = None
68 changes: 18 additions & 50 deletions opentelemetry-sdk/tests/metrics/test_view_instrument_match.py
@@ -27,7 +27,7 @@
from opentelemetry.sdk._metrics._internal.sdk_configuration import (
SdkConfiguration,
)
from opentelemetry.sdk._metrics.export import AggregationTemporality, Metric
from opentelemetry.sdk._metrics.export import AggregationTemporality
from opentelemetry.sdk._metrics.view import (
DefaultAggregation,
DropAggregation,
@@ -63,12 +63,6 @@ def test_consume_measurement(self):
attribute_keys={"a", "c"},
),
instrument=instrument1,
sdk_config=self.sdk_configuration,
instrument_class_temporality=MagicMock(
**{
"__getitem__.return_value": AggregationTemporality.CUMULATIVE
}
),
instrument_class_aggregation=MagicMock(
**{"__getitem__.return_value": DefaultAggregation()}
),
@@ -110,12 +104,6 @@ def test_consume_measurement(self):
aggregation=self.mock_aggregation_factory,
),
instrument=instrument1,
sdk_config=self.sdk_configuration,
instrument_class_temporality=MagicMock(
**{
"__getitem__.return_value": AggregationTemporality.CUMULATIVE
}
),
instrument_class_aggregation=MagicMock(
**{"__getitem__.return_value": DefaultAggregation()}
),
@@ -147,12 +135,6 @@ def test_consume_measurement(self):
attribute_keys={},
),
instrument=instrument1,
sdk_config=self.sdk_configuration,
instrument_class_temporality=MagicMock(
**{
"__getitem__.return_value": AggregationTemporality.CUMULATIVE
}
),
instrument_class_aggregation=MagicMock(
**{"__getitem__.return_value": DefaultAggregation()}
),
@@ -177,12 +159,6 @@ def test_consume_measurement(self):
attribute_keys={},
),
instrument=instrument1,
sdk_config=self.sdk_configuration,
instrument_class_temporality=MagicMock(
**{
"__getitem__.return_value": AggregationTemporality.CUMULATIVE
}
),
instrument_class_aggregation=MagicMock(
**{"__getitem__.return_value": DefaultAggregation()}
),
@@ -196,24 +172,22 @@ def test_consume_measurement(self):
)

def test_collect(self):
instrument1 = Mock(
name="instrument1", description="description", unit="unit"
instrument1 = Counter(
"instrument1",
Mock(),
Mock(),
description="description",
unit="unit"
)
instrument1.instrumentation_scope = self.mock_instrumentation_scope
view_instrument_match = _ViewInstrumentMatch(
view=View(
instrument_name="instrument1",
name="name",
aggregation=self.mock_aggregation_factory,
aggregation=DefaultAggregation(),
attribute_keys={"a", "c"},
),
instrument=instrument1,
sdk_config=self.sdk_configuration,
instrument_class_temporality=MagicMock(
**{
"__getitem__.return_value": AggregationTemporality.CUMULATIVE
}
),
instrument_class_aggregation=MagicMock(
**{"__getitem__.return_value": DefaultAggregation()}
),
@@ -226,18 +200,18 @@ def test_collect(self):
attributes={"c": "d", "f": "g"},
)
)

number_data_points = view_instrument_match.collect(
AggregationTemporality.CUMULATIVE
)
self.assertEqual(len(number_data_points), 1)

number_data_point = number_data_points[0]

self.assertEqual(
next(view_instrument_match.collect()),
Metric(
attributes={"c": "d"},
description="description",
instrumentation_scope=self.mock_instrumentation_scope,
name="name",
resource=self.mock_resource,
unit="unit",
point=None,
),
number_data_point.attributes, frozenset({("c", "d")})
)
self.assertEqual(number_data_point.value, 0)

def test_setting_aggregation(self):
instrument1 = Counter(
@@ -256,12 +230,6 @@ def test_setting_aggregation(self):
attribute_keys={"a", "c"},
),
instrument=instrument1,
sdk_config=self.sdk_configuration,
instrument_class_temporality=MagicMock(
**{
"__getitem__.return_value": AggregationTemporality.CUMULATIVE
}
),
instrument_class_aggregation={Counter: LastValueAggregation()},
)
