Skip to content

Commit

Permalink
Merge branch 'main' into issues3550
Browse files Browse the repository at this point in the history
  • Loading branch information
ocelotl authored Feb 8, 2024
2 parents f4719c9 + b9ee532 commit 15be2e2
Show file tree
Hide file tree
Showing 6 changed files with 387 additions and 104 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

- Logs: ObservedTimestamp field is missing in console exporter output
([#3564](https://github.com/open-telemetry/opentelemetry-python/pull/3564))
- Fix explicit bucket histogram aggregation
([#3429](https://github.com/open-telemetry/opentelemetry-python/pull/3429))
- Add `code.lineno`, `code.function` and `code.filepath` to all logs
([#3645](https://github.com/open-telemetry/opentelemetry-python/pull/3645))
- Add Synchronous Gauge instrument
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -463,22 +463,16 @@ def _get_attributes(record: logging.LogRecord) -> Attributes:
attributes[SpanAttributes.CODE_LINENO] = record.lineno

if record.exc_info:
exc_type = ""
message = ""
stack_trace = ""
exctype, value, tb = record.exc_info
if exctype is not None:
exc_type = exctype.__name__
attributes[SpanAttributes.EXCEPTION_TYPE] = exctype.__name__
if value is not None and value.args:
message = value.args[0]
attributes[SpanAttributes.EXCEPTION_MESSAGE] = value.args[0]
if tb is not None:
# https://github.com/open-telemetry/opentelemetry-specification/blob/9fa7c656b26647b27e485a6af7e38dc716eba98a/specification/trace/semantic_conventions/exceptions.md#stacktrace-representation
stack_trace = "".join(
attributes[SpanAttributes.EXCEPTION_STACKTRACE] = "".join(
traceback.format_exception(*record.exc_info)
)
attributes[SpanAttributes.EXCEPTION_TYPE] = exc_type
attributes[SpanAttributes.EXCEPTION_MESSAGE] = message
attributes[SpanAttributes.EXCEPTION_STACKTRACE] = stack_trace
return attributes

def _translate(self, record: logging.LogRecord) -> LogRecord:
Expand Down
187 changes: 104 additions & 83 deletions opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/aggregation.py
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,7 @@ class _ExplicitBucketHistogramAggregation(_Aggregation[HistogramPoint]):
def __init__(
self,
attributes: Attributes,
instrument_aggregation_temporality: AggregationTemporality,
start_time_unix_nano: int,
boundaries: Sequence[float] = (
0.0,
Expand All @@ -398,33 +399,43 @@ def __init__(
record_min_max: bool = True,
):
super().__init__(attributes)

self._boundaries = tuple(boundaries)
self._bucket_counts = self._get_empty_bucket_counts()
self._record_min_max = record_min_max
self._min = inf
self._max = -inf
self._sum = 0
self._record_min_max = record_min_max

self._start_time_unix_nano = start_time_unix_nano
# It is assumed that the "natural" aggregation temporality for a
# Histogram instrument is DELTA, like the "natural" aggregation
# temporality for a Counter is DELTA and the "natural" aggregation
# temporality for an ObservableCounter is CUMULATIVE.
self._instrument_aggregation_temporality = AggregationTemporality.DELTA
self._instrument_aggregation_temporality = (
instrument_aggregation_temporality
)

self._current_value = None

self._previous_collection_start_nano = self._start_time_unix_nano
self._previous_cumulative_value = self._get_empty_bucket_counts()
self._previous_min = inf
self._previous_max = -inf
self._previous_sum = 0

def _get_empty_bucket_counts(self) -> List[int]:
return [0] * (len(self._boundaries) + 1)

def aggregate(self, measurement: Measurement) -> None:
with self._lock:
if self._current_value is None:
self._current_value = self._get_empty_bucket_counts()

value = measurement.value
value = measurement.value

if self._record_min_max:
self._min = min(self._min, value)
self._max = max(self._max, value)
self._sum += value

self._sum += value
if self._record_min_max:
self._min = min(self._min, value)
self._max = max(self._max, value)

self._bucket_counts[bisect_left(self._boundaries, value)] += 1
self._current_value[bisect_left(self._boundaries, value)] += 1

def collect(
self,
Expand All @@ -434,84 +445,78 @@ def collect(
"""
Atomically return a point for the current value of the metric.
"""
with self._lock:
if not any(self._bucket_counts):
return None

bucket_counts = self._bucket_counts
start_time_unix_nano = self._start_time_unix_nano
with self._lock:
current_value = self._current_value
sum_ = self._sum
max_ = self._max
min_ = self._min
max_ = self._max

self._bucket_counts = self._get_empty_bucket_counts()
self._start_time_unix_nano = collection_start_nano
self._current_value = None
self._sum = 0
self._min = inf
self._max = -inf

current_point = HistogramDataPoint(
attributes=self._attributes,
start_time_unix_nano=start_time_unix_nano,
time_unix_nano=collection_start_nano,
count=sum(bucket_counts),
sum=sum_,
bucket_counts=tuple(bucket_counts),
explicit_bounds=self._boundaries,
min=min_,
max=max_,
)
if (
self._instrument_aggregation_temporality
is AggregationTemporality.DELTA
):
# This happens when the corresponding instrument for this
# aggregation is synchronous.
if (
collection_aggregation_temporality
is AggregationTemporality.DELTA
):

if self._previous_point is None or (
self._instrument_aggregation_temporality
is collection_aggregation_temporality
):
self._previous_point = current_point
return current_point
if current_value is None:
return None

max_ = current_point.max
min_ = current_point.min
previous_collection_start_nano = (
self._previous_collection_start_nano
)
self._previous_collection_start_nano = (
collection_start_nano
)

if (
collection_aggregation_temporality
is AggregationTemporality.CUMULATIVE
):
start_time_unix_nano = self._previous_point.start_time_unix_nano
sum_ = current_point.sum + self._previous_point.sum
# Only update min/max on delta -> cumulative
max_ = max(current_point.max, self._previous_point.max)
min_ = min(current_point.min, self._previous_point.min)
bucket_counts = [
curr_count + prev_count
for curr_count, prev_count in zip(
current_point.bucket_counts,
self._previous_point.bucket_counts,
)
]
else:
start_time_unix_nano = self._previous_point.time_unix_nano
sum_ = current_point.sum - self._previous_point.sum
bucket_counts = [
curr_count - prev_count
for curr_count, prev_count in zip(
current_point.bucket_counts,
self._previous_point.bucket_counts,
return HistogramDataPoint(
attributes=self._attributes,
start_time_unix_nano=previous_collection_start_nano,
time_unix_nano=collection_start_nano,
count=sum(current_value),
sum=sum_,
bucket_counts=tuple(current_value),
explicit_bounds=self._boundaries,
min=min_,
max=max_,
)

if current_value is None:
current_value = self._get_empty_bucket_counts()

self._previous_cumulative_value = [
current_value_element + previous_cumulative_value_element
for (
current_value_element,
previous_cumulative_value_element,
) in zip(current_value, self._previous_cumulative_value)
]
self._previous_min = min(min_, self._previous_min)
self._previous_max = max(max_, self._previous_max)
self._previous_sum = sum_ + self._previous_sum

return HistogramDataPoint(
attributes=self._attributes,
start_time_unix_nano=self._start_time_unix_nano,
time_unix_nano=collection_start_nano,
count=sum(self._previous_cumulative_value),
sum=self._previous_sum,
bucket_counts=tuple(self._previous_cumulative_value),
explicit_bounds=self._boundaries,
min=self._previous_min,
max=self._previous_max,
)
]

current_point = HistogramDataPoint(
attributes=self._attributes,
start_time_unix_nano=start_time_unix_nano,
time_unix_nano=current_point.time_unix_nano,
count=sum(bucket_counts),
sum=sum_,
bucket_counts=tuple(bucket_counts),
explicit_bounds=current_point.explicit_bounds,
min=min_,
max=max_,
)
self._previous_point = current_point
return current_point
return None


# pylint: disable=protected-access
Expand Down Expand Up @@ -1100,7 +1105,11 @@ def _create_aggregation(

if isinstance(instrument, Histogram):
return _ExplicitBucketHistogramAggregation(
attributes, start_time_unix_nano
attributes,
instrument_aggregation_temporality=(
AggregationTemporality.DELTA
),
start_time_unix_nano=start_time_unix_nano,
)

if isinstance(instrument, ObservableGauge):
Expand Down Expand Up @@ -1179,8 +1188,18 @@ def _create_aggregation(
attributes: Attributes,
start_time_unix_nano: int,
) -> _Aggregation:

instrument_aggregation_temporality = AggregationTemporality.UNSPECIFIED
if isinstance(instrument, Synchronous):
instrument_aggregation_temporality = AggregationTemporality.DELTA
elif isinstance(instrument, Asynchronous):
instrument_aggregation_temporality = (
AggregationTemporality.CUMULATIVE
)

return _ExplicitBucketHistogramAggregation(
attributes,
instrument_aggregation_temporality,
start_time_unix_nano,
self._boundaries,
self._record_min_max,
Expand All @@ -1200,16 +1219,18 @@ def _create_aggregation(
start_time_unix_nano: int,
) -> _Aggregation:

temporality = AggregationTemporality.UNSPECIFIED
instrument_aggregation_temporality = AggregationTemporality.UNSPECIFIED
if isinstance(instrument, Synchronous):
temporality = AggregationTemporality.DELTA
instrument_aggregation_temporality = AggregationTemporality.DELTA
elif isinstance(instrument, Asynchronous):
temporality = AggregationTemporality.CUMULATIVE
instrument_aggregation_temporality = (
AggregationTemporality.CUMULATIVE
)

return _SumAggregation(
attributes,
isinstance(instrument, (Counter, ObservableCounter)),
temporality,
instrument_aggregation_temporality,
start_time_unix_nano,
)

Expand Down
Loading

0 comments on commit 15be2e2

Please sign in to comment.