From 6eca6d4e4c3cf63a2b80ab0b95e4292f916d0437 Mon Sep 17 00:00:00 2001 From: Chengzhong Wu Date: Wed, 29 Jun 2022 01:42:37 +0800 Subject: [PATCH] feat(sdk-metrics-base): detect resets on async metrics (#2990) --- experimental/CHANGELOG.md | 4 + .../test/node/CollectorMetricExporter.test.ts | 4 +- .../src/Instruments.ts | 3 +- .../src/aggregator/Drop.ts | 1 - .../src/aggregator/Histogram.ts | 16 +- .../src/aggregator/LastValue.ts | 17 +- .../src/aggregator/Sum.ts | 37 +- .../src/aggregator/types.ts | 5 +- .../src/state/AsyncMetricStorage.ts | 6 +- .../src/state/DeltaMetricProcessor.ts | 29 +- .../src/state/HashMap.ts | 9 + .../src/state/MeterProviderSharedState.ts | 4 +- .../src/state/MeterSharedState.ts | 3 +- .../src/state/MetricStorage.ts | 1 - .../src/state/MultiWritableMetricStorage.ts | 6 +- .../src/state/ObservableRegistry.ts | 15 +- .../src/state/SyncMetricStorage.ts | 6 +- .../src/state/TemporalMetricProcessor.ts | 48 ++- .../src/state/WritableMetricStorage.ts | 6 +- .../src/view/Aggregation.ts | 16 +- .../test/aggregator/Drop.test.ts | 2 - .../test/aggregator/Histogram.test.ts | 32 +- .../test/aggregator/LastValue.test.ts | 60 ++- .../test/aggregator/Sum.test.ts | 70 +++- .../test/state/AsyncMetricStorage.test.ts | 374 ++++++++++++++++-- .../test/state/DeltaMetricProcessor.test.ts | 26 +- .../test/state/MetricStorageRegistry.test.ts | 1 - .../state/MultiWritableMetricStorage.test.ts | 5 +- .../test/state/SyncMetricStorage.test.ts | 67 ++-- .../state/TemporalMetricProcessor.test.ts | 82 ++-- .../test/util.ts | 8 +- 31 files changed, 680 insertions(+), 283 deletions(-) diff --git a/experimental/CHANGELOG.md b/experimental/CHANGELOG.md index dd203fccef8..a76f3910474 100644 --- a/experimental/CHANGELOG.md +++ b/experimental/CHANGELOG.md @@ -62,6 +62,10 @@ All notable changes to experimental packages in this project will be documented * fix(sdk-metrics-base): misbehaving aggregation temporality selector tolerance #2958 @legendecas * feat(trace-otlp-grpc): configure security with env vars #2827 @svetlanabrennan * feat(sdk-metrics-base): async instruments callback timeout #2742 @legendecas +* feat(sdk-metrics-base): detect resets on async metrics #2990 @legendecas + * Added monotonicity support in SumAggregator. + * Added reset and gaps detection for async metric instruments. + * Fixed the start time and end time of an exported metric with regarding to resets and gaps. ### :bug: (Bug Fix) diff --git a/experimental/packages/opentelemetry-exporter-metrics-otlp-http/test/node/CollectorMetricExporter.test.ts b/experimental/packages/opentelemetry-exporter-metrics-otlp-http/test/node/CollectorMetricExporter.test.ts index c6607f5d644..c1b16cd2784 100644 --- a/experimental/packages/opentelemetry-exporter-metrics-otlp-http/test/node/CollectorMetricExporter.test.ts +++ b/experimental/packages/opentelemetry-exporter-metrics-otlp-http/test/node/CollectorMetricExporter.test.ts @@ -306,8 +306,8 @@ describe('OTLPMetricExporter - node with json over http', () => { assert.ok(typeof metric3 !== 'undefined', "histogram doesn't exist"); ensureHistogramIsCorrect( metric3, - core.hrTimeToNanoseconds(metrics.scopeMetrics[0].metrics[1].dataPoints[0].endTime), - core.hrTimeToNanoseconds(metrics.scopeMetrics[0].metrics[1].dataPoints[0].startTime), + core.hrTimeToNanoseconds(metrics.scopeMetrics[0].metrics[2].dataPoints[0].endTime), + core.hrTimeToNanoseconds(metrics.scopeMetrics[0].metrics[2].dataPoints[0].startTime), [0, 100], [0, 2, 0] ); diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/src/Instruments.ts b/experimental/packages/opentelemetry-sdk-metrics-base/src/Instruments.ts index 3cd1f0f15ba..5706059691a 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/src/Instruments.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/src/Instruments.ts @@ -17,6 +17,7 @@ import * as api from '@opentelemetry/api'; import * as metrics from '@opentelemetry/api-metrics'; import { ObservableCallback } from '@opentelemetry/api-metrics'; +import { hrTime } from '@opentelemetry/core'; import { InstrumentDescriptor } from './InstrumentDescriptor'; import { ObservableRegistry } from './state/ObservableRegistry'; import { AsyncWritableMetricStorage, WritableMetricStorage } from './state/WritableMetricStorage'; @@ -31,7 +32,7 @@ export class SyncInstrument { ); value = Math.trunc(value); } - this._writableMetricStorage.record(value, attributes, context); + this._writableMetricStorage.record(value, attributes, context, hrTime()); } } diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/src/aggregator/Drop.ts b/experimental/packages/opentelemetry-sdk-metrics-base/src/aggregator/Drop.ts index 56ef74e554a..183a579531c 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/src/aggregator/Drop.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/src/aggregator/Drop.ts @@ -45,7 +45,6 @@ export class DropAggregator implements Aggregator { _descriptor: InstrumentDescriptor, _aggregationTemporality: AggregationTemporality, _accumulationByAttributes: AccumulationRecord[], - _startTime: HrTime, _endTime: HrTime): Maybe { return undefined; } diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/src/aggregator/Histogram.ts b/experimental/packages/opentelemetry-sdk-metrics-base/src/aggregator/Histogram.ts index 03639e92074..60a1d484d45 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/src/aggregator/Histogram.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/src/aggregator/Histogram.ts @@ -45,6 +45,7 @@ function createNewEmptyCheckpoint(boundaries: number[]): Histogram { export class HistogramAccumulation implements Accumulation { constructor( + public startTime: HrTime, private readonly _boundaries: number[], private _recordMinMax = true, private _current: Histogram = createNewEmptyCheckpoint(_boundaries) @@ -70,6 +71,10 @@ export class HistogramAccumulation implements Accumulation { this._current.buckets.counts[this._boundaries.length] += 1; } + setStartTime(startTime: HrTime): void { + this.startTime = startTime; + } + toPointValue(): Histogram { return this._current; } @@ -88,8 +93,8 @@ export class HistogramAggregator implements Aggregator { */ constructor(private readonly _boundaries: number[], private readonly _recordMinMax: boolean) {} - createAccumulation() { - return new HistogramAccumulation(this._boundaries, this._recordMinMax); + createAccumulation(startTime: HrTime) { + return new HistogramAccumulation(startTime, this._boundaries, this._recordMinMax); } /** @@ -125,7 +130,7 @@ export class HistogramAggregator implements Aggregator { } } - return new HistogramAccumulation(previousValue.buckets.boundaries, this._recordMinMax, { + return new HistogramAccumulation(previous.startTime, previousValue.buckets.boundaries, this._recordMinMax, { buckets: { boundaries: previousValue.buckets.boundaries, counts: mergedCounts, @@ -153,7 +158,7 @@ export class HistogramAggregator implements Aggregator { diffedCounts[idx] = currentCounts[idx] - previousCounts[idx]; } - return new HistogramAccumulation(previousValue.buckets.boundaries, this._recordMinMax, { + return new HistogramAccumulation(current.startTime, previousValue.buckets.boundaries, this._recordMinMax, { buckets: { boundaries: previousValue.buckets.boundaries, counts: diffedCounts, @@ -170,7 +175,6 @@ export class HistogramAggregator implements Aggregator { descriptor: InstrumentDescriptor, aggregationTemporality: AggregationTemporality, accumulationByAttributes: AccumulationRecord[], - startTime: HrTime, endTime: HrTime): Maybe { return { descriptor, @@ -179,7 +183,7 @@ export class HistogramAggregator implements Aggregator { dataPoints: accumulationByAttributes.map(([attributes, accumulation]) => { return { attributes, - startTime, + startTime: accumulation.startTime, endTime, value: accumulation.toPointValue(), }; diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/src/aggregator/LastValue.ts b/experimental/packages/opentelemetry-sdk-metrics-base/src/aggregator/LastValue.ts index a6ec0365644..0905b03daf9 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/src/aggregator/LastValue.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/src/aggregator/LastValue.ts @@ -23,13 +23,17 @@ import { Maybe } from '../utils'; import { AggregationTemporality } from '../export/AggregationTemporality'; export class LastValueAccumulation implements Accumulation { - constructor(private _current: number = 0, public sampleTime: HrTime = [0, 0]) {} + constructor(public startTime: HrTime, private _current: number = 0, public sampleTime: HrTime = [0, 0]) {} record(value: number): void { this._current = value; this.sampleTime = hrTime(); } + setStartTime(startTime: HrTime): void { + this.startTime = startTime; + } + toPointValue(): LastValue { return this._current; } @@ -39,8 +43,8 @@ export class LastValueAccumulation implements Accumulation { export class LastValueAggregator implements Aggregator { public kind: AggregatorKind.LAST_VALUE = AggregatorKind.LAST_VALUE; - createAccumulation() { - return new LastValueAccumulation(); + createAccumulation(startTime: HrTime) { + return new LastValueAccumulation(startTime); } /** @@ -51,7 +55,7 @@ export class LastValueAggregator implements Aggregator { merge(previous: LastValueAccumulation, delta: LastValueAccumulation): LastValueAccumulation { // nanoseconds may lose precisions. const latestAccumulation = hrTimeToMicroseconds(delta.sampleTime) >= hrTimeToMicroseconds(previous.sampleTime) ? delta : previous; - return new LastValueAccumulation(latestAccumulation.toPointValue(), latestAccumulation.sampleTime); + return new LastValueAccumulation(previous.startTime, latestAccumulation.toPointValue(), latestAccumulation.sampleTime); } /** @@ -63,14 +67,13 @@ export class LastValueAggregator implements Aggregator { diff(previous: LastValueAccumulation, current: LastValueAccumulation): LastValueAccumulation { // nanoseconds may lose precisions. const latestAccumulation = hrTimeToMicroseconds(current.sampleTime) >= hrTimeToMicroseconds(previous.sampleTime) ? current : previous; - return new LastValueAccumulation(latestAccumulation.toPointValue(), latestAccumulation.sampleTime); + return new LastValueAccumulation(current.startTime, latestAccumulation.toPointValue(), latestAccumulation.sampleTime); } toMetricData( descriptor: InstrumentDescriptor, aggregationTemporality: AggregationTemporality, accumulationByAttributes: AccumulationRecord[], - startTime: HrTime, endTime: HrTime): Maybe { return { descriptor, @@ -79,7 +82,7 @@ export class LastValueAggregator implements Aggregator { dataPoints: accumulationByAttributes.map(([attributes, accumulation]) => { return { attributes, - startTime, + startTime: accumulation.startTime, endTime, value: accumulation.toPointValue(), }; diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/src/aggregator/Sum.ts b/experimental/packages/opentelemetry-sdk-metrics-base/src/aggregator/Sum.ts index 3b8647d730d..1482841106a 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/src/aggregator/Sum.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/src/aggregator/Sum.ts @@ -22,12 +22,19 @@ import { Maybe } from '../utils'; import { AggregationTemporality } from '../export/AggregationTemporality'; export class SumAccumulation implements Accumulation { - constructor(private _current: number = 0) {} + constructor(public startTime: HrTime, public monotonic: boolean, private _current: number = 0, public reset = false) {} record(value: number): void { + if (this.monotonic && value < 0) { + return; + } this._current += value; } + setStartTime(startTime: HrTime): void { + this.startTime = startTime; + } + toPointValue(): Sum { return this._current; } @@ -37,29 +44,45 @@ export class SumAccumulation implements Accumulation { export class SumAggregator implements Aggregator { public kind: AggregatorKind.SUM = AggregatorKind.SUM; - createAccumulation() { - return new SumAccumulation(); + constructor (public monotonic: boolean) {} + + createAccumulation(startTime: HrTime) { + return new SumAccumulation(startTime, this.monotonic); } /** * Returns the result of the merge of the given accumulations. */ merge(previous: SumAccumulation, delta: SumAccumulation): SumAccumulation { - return new SumAccumulation(previous.toPointValue() + delta.toPointValue()); + const prevPv = previous.toPointValue(); + const deltaPv = delta.toPointValue(); + if (delta.reset) { + return new SumAccumulation(delta.startTime, this.monotonic, deltaPv, delta.reset); + } + return new SumAccumulation(previous.startTime, this.monotonic, prevPv + deltaPv); } /** * Returns a new DELTA aggregation by comparing two cumulative measurements. */ diff(previous: SumAccumulation, current: SumAccumulation): SumAccumulation { - return new SumAccumulation(current.toPointValue() - previous.toPointValue()); + const prevPv = previous.toPointValue(); + const currPv = current.toPointValue(); + /** + * If the SumAggregator is a monotonic one and the previous point value is + * greater than the current one, a reset is deemed to be happened. + * Return the current point value to prevent the value from been reset. + */ + if (this.monotonic && (prevPv > currPv)) { + return new SumAccumulation(current.startTime, this.monotonic, currPv, true); + } + return new SumAccumulation(current.startTime, this.monotonic, currPv - prevPv); } toMetricData( descriptor: InstrumentDescriptor, aggregationTemporality: AggregationTemporality, accumulationByAttributes: AccumulationRecord[], - startTime: HrTime, endTime: HrTime): Maybe { return { descriptor, @@ -68,7 +91,7 @@ export class SumAggregator implements Aggregator { dataPoints: accumulationByAttributes.map(([attributes, accumulation]) => { return { attributes, - startTime, + startTime: accumulation.startTime, endTime, value: accumulation.toPointValue(), }; diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/src/aggregator/types.ts b/experimental/packages/opentelemetry-sdk-metrics-base/src/aggregator/types.ts index bd7c51f3ebd..827a2af47ba 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/src/aggregator/types.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/src/aggregator/types.ts @@ -68,6 +68,7 @@ export interface Histogram { * An Aggregator accumulation state. */ export interface Accumulation { + setStartTime(startTime: HrTime): void; record(value: number): void; } @@ -84,7 +85,7 @@ export interface Aggregator { /** * Create a clean state of accumulation. */ - createAccumulation(): T; + createAccumulation(startTime: HrTime): T; /** * Returns the result of the merge of the given accumulations. @@ -112,13 +113,11 @@ export interface Aggregator { * * @param descriptor the metric instrument descriptor. * @param accumulationByAttributes the array of attributes and accumulation pairs. - * @param startTime the start time of the metric data. * @param endTime the end time of the metric data. * @return the {@link MetricData} that this {@link Aggregator} will produce. */ toMetricData(descriptor: InstrumentDescriptor, aggregationTemporality: AggregationTemporality, accumulationByAttributes: AccumulationRecord[], - startTime: HrTime, endTime: HrTime): Maybe; } diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/src/state/AsyncMetricStorage.ts b/experimental/packages/opentelemetry-sdk-metrics-base/src/state/AsyncMetricStorage.ts index c8c9aedc1d8..6742bf884cb 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/src/state/AsyncMetricStorage.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/src/state/AsyncMetricStorage.ts @@ -46,12 +46,12 @@ export class AsyncMetricStorage> extends MetricSto this._temporalMetricStorage = new TemporalMetricProcessor(aggregator); } - record(measurements: AttributeHashMap) { + record(measurements: AttributeHashMap, observationTime: HrTime) { const processed = new AttributeHashMap(); Array.from(measurements.entries()).forEach(([attributes, value]) => { processed.set(this._attributesProcessor.process(attributes), value); }); - this._deltaMetricStorage.batchCumulate(processed); + this._deltaMetricStorage.batchCumulate(processed, observationTime); } /** @@ -64,7 +64,6 @@ export class AsyncMetricStorage> extends MetricSto collect( collector: MetricCollectorHandle, collectors: MetricCollectorHandle[], - sdkStartTime: HrTime, collectionTime: HrTime, ): Maybe { const accumulations = this._deltaMetricStorage.collect(); @@ -74,7 +73,6 @@ export class AsyncMetricStorage> extends MetricSto collectors, this._instrumentDescriptor, accumulations, - sdkStartTime, collectionTime ); } diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/src/state/DeltaMetricProcessor.ts b/experimental/packages/opentelemetry-sdk-metrics-base/src/state/DeltaMetricProcessor.ts index 8ce7d641b3e..22c25edab0b 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/src/state/DeltaMetricProcessor.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/src/state/DeltaMetricProcessor.ts @@ -14,7 +14,7 @@ * limitations under the License. */ -import { Context } from '@opentelemetry/api'; +import { Context, HrTime } from '@opentelemetry/api'; import { MetricAttributes } from '@opentelemetry/api-metrics'; import { Maybe } from '../utils'; import { Accumulation, Aggregator } from '../aggregator/types'; @@ -35,31 +35,36 @@ export class DeltaMetricProcessor> { constructor(private _aggregator: Aggregator) {} - /** Bind an efficient storage handle for a set of attributes. */ - private bind(attributes: MetricAttributes) { - return this._activeCollectionStorage.getOrDefault(attributes, () => this._aggregator.createAccumulation()); - } - - record(value: number, attributes: MetricAttributes, _context: Context) { - const accumulation = this.bind(attributes); + record(value: number, attributes: MetricAttributes, _context: Context, collectionTime: HrTime) { + const accumulation = this._activeCollectionStorage.getOrDefault( + attributes, + () => this._aggregator.createAccumulation(collectionTime) + ); accumulation?.record(value); } - batchCumulate(measurements: AttributeHashMap) { + batchCumulate(measurements: AttributeHashMap, collectionTime: HrTime) { Array.from(measurements.entries()).forEach(([attributes, value, hashCode]) => { - let accumulation = this._aggregator.createAccumulation(); + const accumulation = this._aggregator.createAccumulation(collectionTime); accumulation?.record(value); + let delta = accumulation; if (this._cumulativeMemoStorage.has(attributes, hashCode)) { + // has() returned true, previous is present. // eslint-disable-next-line @typescript-eslint/no-non-null-assertion const previous = this._cumulativeMemoStorage.get(attributes, hashCode)!; - accumulation = this._aggregator.diff(previous, accumulation); + delta = this._aggregator.diff(previous, accumulation); } + // Save the current record and the delta record. this._cumulativeMemoStorage.set(attributes, accumulation, hashCode); - this._activeCollectionStorage.set(attributes, accumulation, hashCode); + this._activeCollectionStorage.set(attributes, delta, hashCode); }); } + /** + * Returns a collection of delta metrics. Start time is the when first + * time event collected. + */ collect() { const unreportedDelta = this._activeCollectionStorage; this._activeCollectionStorage = new AttributeHashMap(); diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/src/state/HashMap.ts b/experimental/packages/opentelemetry-sdk-metrics-base/src/state/HashMap.ts index b48551feaba..0f8efd37940 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/src/state/HashMap.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/src/state/HashMap.ts @@ -58,6 +58,15 @@ export class HashMap { return this._valueMap.has(hashCode); } + *keys(): IterableIterator<[KeyType, HashCodeType]> { + const keyIterator = this._keyMap.entries(); + let next = keyIterator.next(); + while (next.done !== true) { + yield [ next.value[1], next.value[0]]; + next = keyIterator.next(); + } + } + *entries(): IterableIterator<[KeyType, ValueType, HashCodeType]> { const valueIterator = this._valueMap.entries(); let next = valueIterator.next(); diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/src/state/MeterProviderSharedState.ts b/experimental/packages/opentelemetry-sdk-metrics-base/src/state/MeterProviderSharedState.ts index 536d5fa2ec8..87fe540ee56 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/src/state/MeterProviderSharedState.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/src/state/MeterProviderSharedState.ts @@ -14,8 +14,7 @@ * limitations under the License. */ -import { HrTime } from '@opentelemetry/api'; -import { hrTime, InstrumentationScope } from '@opentelemetry/core'; +import { InstrumentationScope } from '@opentelemetry/core'; import { Resource } from '@opentelemetry/resources'; import { instrumentationScopeId } from '../utils'; import { ViewRegistry } from '../view/ViewRegistry'; @@ -27,7 +26,6 @@ import { MetricCollector } from './MetricCollector'; */ export class MeterProviderSharedState { viewRegistry = new ViewRegistry(); - readonly sdkStartTime: HrTime = hrTime(); metricCollectors: MetricCollector[] = []; diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/src/state/MeterSharedState.ts b/experimental/packages/opentelemetry-sdk-metrics-base/src/state/MeterSharedState.ts index b8feefaddd2..ed96aca444a 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/src/state/MeterSharedState.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/src/state/MeterSharedState.ts @@ -80,13 +80,12 @@ export class MeterSharedState { * 1. Call all observable callbacks first. * 2. Collect metric result for the collector. */ - const errors = await this.observableRegistry.observe(options?.timeoutMillis); + const errors = await this.observableRegistry.observe(collectionTime, options?.timeoutMillis); const metricDataList = Array.from(this._metricStorageRegistry.getStorages()) .map(metricStorage => { return metricStorage.collect( collector, this._meterProviderSharedState.metricCollectors, - this._meterProviderSharedState.sdkStartTime, collectionTime); }) .filter(isNotNullish); diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/src/state/MetricStorage.ts b/experimental/packages/opentelemetry-sdk-metrics-base/src/state/MetricStorage.ts index 2d92aa16839..8e24da668ab 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/src/state/MetricStorage.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/src/state/MetricStorage.ts @@ -38,7 +38,6 @@ export abstract class MetricStorage { abstract collect( collector: MetricCollectorHandle, collectors: MetricCollectorHandle[], - sdkStartTime: HrTime, collectionTime: HrTime, ): Maybe; diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/src/state/MultiWritableMetricStorage.ts b/experimental/packages/opentelemetry-sdk-metrics-base/src/state/MultiWritableMetricStorage.ts index e5f31e378ec..323ef839477 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/src/state/MultiWritableMetricStorage.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/src/state/MultiWritableMetricStorage.ts @@ -14,7 +14,7 @@ * limitations under the License. */ -import { Context } from '@opentelemetry/api'; +import { Context, HrTime } from '@opentelemetry/api'; import { MetricAttributes } from '@opentelemetry/api-metrics'; import { WritableMetricStorage } from './WritableMetricStorage'; @@ -24,9 +24,9 @@ import { WritableMetricStorage } from './WritableMetricStorage'; export class MultiMetricStorage implements WritableMetricStorage { constructor(private readonly _backingStorages: WritableMetricStorage[]) {} - record(value: number, attributes: MetricAttributes, context: Context) { + record(value: number, attributes: MetricAttributes, context: Context, recordTime: HrTime) { this._backingStorages.forEach(it => { - it.record(value, attributes, context); + it.record(value, attributes, context, recordTime); }); } } diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/src/state/ObservableRegistry.ts b/experimental/packages/opentelemetry-sdk-metrics-base/src/state/ObservableRegistry.ts index 005828283f8..c74b9f9ab9f 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/src/state/ObservableRegistry.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/src/state/ObservableRegistry.ts @@ -15,6 +15,7 @@ */ import * as api from '@opentelemetry/api'; +import { HrTime } from '@opentelemetry/api'; import { BatchObservableCallback, Observable, ObservableCallback } from '@opentelemetry/api-metrics'; import { isObservableInstrument, ObservableInstrument } from '../Instruments'; import { BatchObservableResultImpl, ObservableResultImpl } from '../ObservableResult'; @@ -89,9 +90,9 @@ export class ObservableRegistry { /** * @returns a promise of rejected reasons for invoking callbacks. */ - async observe(timeoutMillis?: number): Promise { - const callbackFutures = this._observeCallbacks(timeoutMillis); - const batchCallbackFutures = this._observeBatchCallbacks(timeoutMillis); + async observe(collectionTime: HrTime, timeoutMillis?: number): Promise { + const callbackFutures = this._observeCallbacks(collectionTime, timeoutMillis); + const batchCallbackFutures = this._observeBatchCallbacks(collectionTime, timeoutMillis); const results = await PromiseAllSettled([...callbackFutures, ...batchCallbackFutures]); @@ -100,7 +101,7 @@ export class ObservableRegistry { return rejections; } - private _observeCallbacks(timeoutMillis?: number) { + private _observeCallbacks(observationTime: HrTime, timeoutMillis?: number) { return this._callbacks .map(async ({ callback, instrument }) => { const observableResult = new ObservableResultImpl(instrument._descriptor); @@ -110,12 +111,12 @@ export class ObservableRegistry { } await callPromise; instrument._metricStorages.forEach(metricStorage => { - metricStorage.record(observableResult._buffer); + metricStorage.record(observableResult._buffer, observationTime); }); }); } - private _observeBatchCallbacks(timeoutMillis?: number) { + private _observeBatchCallbacks(observationTime: HrTime, timeoutMillis?: number) { return this._batchCallbacks .map(async ({ callback, instruments }) => { const observableResult = new BatchObservableResultImpl(); @@ -130,7 +131,7 @@ export class ObservableRegistry { return; } instrument._metricStorages.forEach(metricStorage => { - metricStorage.record(buffer); + metricStorage.record(buffer, observationTime); }); }); }); diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/src/state/SyncMetricStorage.ts b/experimental/packages/opentelemetry-sdk-metrics-base/src/state/SyncMetricStorage.ts index 06a69846306..26d5fec1abc 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/src/state/SyncMetricStorage.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/src/state/SyncMetricStorage.ts @@ -46,9 +46,9 @@ export class SyncMetricStorage> extends MetricStor this._temporalMetricStorage = new TemporalMetricProcessor(aggregator); } - record(value: number, attributes: MetricAttributes, context: Context) { + record(value: number, attributes: MetricAttributes, context: Context, recordTime: HrTime) { attributes = this._attributesProcessor.process(attributes, context); - this._deltaMetricStorage.record(value, attributes, context); + this._deltaMetricStorage.record(value, attributes, context, recordTime); } /** @@ -60,7 +60,6 @@ export class SyncMetricStorage> extends MetricStor collect( collector: MetricCollectorHandle, collectors: MetricCollectorHandle[], - sdkStartTime: HrTime, collectionTime: HrTime, ): Maybe { const accumulations = this._deltaMetricStorage.collect(); @@ -70,7 +69,6 @@ export class SyncMetricStorage> extends MetricStor collectors, this._instrumentDescriptor, accumulations, - sdkStartTime, collectionTime ); } diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/src/state/TemporalMetricProcessor.ts b/experimental/packages/opentelemetry-sdk-metrics-base/src/state/TemporalMetricProcessor.ts index f0bd55fe3d4..6c648ac1172 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/src/state/TemporalMetricProcessor.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/src/state/TemporalMetricProcessor.ts @@ -15,7 +15,7 @@ */ import { HrTime } from '@opentelemetry/api'; -import { AccumulationRecord, Aggregator } from '../aggregator/types'; +import { Accumulation, AccumulationRecord, Aggregator } from '../aggregator/types'; import { MetricData } from '../export/MetricData'; import { InstrumentDescriptor } from '../InstrumentDescriptor'; import { AggregationTemporality } from '../export/AggregationTemporality'; @@ -26,7 +26,7 @@ import { AttributeHashMap } from './HashMap'; /** * Remembers what was presented to a specific exporter. */ -interface LastReportedHistory { +interface LastReportedHistory> { /** * The last accumulation of metric data. */ @@ -47,7 +47,7 @@ interface LastReportedHistory { * Provides unique reporting for each collectors. Allows synchronous collection * of metrics and reports given temporality values. */ -export class TemporalMetricProcessor { +export class TemporalMetricProcessor> { private _unreportedAccumulations = new Map[]>(); private _reportHistory = new Map>(); @@ -61,7 +61,6 @@ export class TemporalMetricProcessor { * @param instrumentationScope The instrumentation scope that generated these metrics. * @param instrumentDescriptor The instrumentation descriptor that these metrics generated with. * @param currentAccumulations The current accumulation of metric data from instruments. - * @param sdkStartTime The sdk start timestamp. * @param collectionTime The current collection timestamp. * @returns The {@link MetricData} points or `null`. */ @@ -70,12 +69,8 @@ export class TemporalMetricProcessor { collectors: MetricCollectorHandle[], instrumentDescriptor: InstrumentDescriptor, currentAccumulations: AttributeHashMap, - sdkStartTime: HrTime, collectionTime: HrTime, ): Maybe { - // In case it's our first collection, default to start timestamp (see below for explanation). - let lastCollectionTime = sdkStartTime; - this._stashAccumulations(collectors, currentAccumulations); const unreportedAccumulations = this._getMergedUnreportedAccumulations(collector); @@ -85,7 +80,7 @@ export class TemporalMetricProcessor { if (this._reportHistory.has(collector)) { // eslint-disable-next-line @typescript-eslint/no-non-null-assertion const last = this._reportHistory.get(collector)!; - lastCollectionTime = last.collectionTime; + const lastCollectionTime = last.collectionTime; aggregationTemporality = last.aggregationTemporality; // Use aggregation temporality + instrument to determine if we do a merge or a diff of @@ -96,14 +91,16 @@ export class TemporalMetricProcessor { // Cumulative records are converted to delta recording with DeltaMetricProcessor. // Here we merge with our last record to get a cumulative aggregation. // 3. Delta Aggregation + Delta recording - // Do nothing here. + // Calibrate the startTime of metric streams to be the reader's lastCollectionTime. // 4. Delta Aggregation + Cumulative recording. // Cumulative records are converted to delta recording with DeltaMetricProcessor. - // Do nothing here. + // Calibrate the startTime of metric streams to be the reader's lastCollectionTime. if (aggregationTemporality === AggregationTemporality.CUMULATIVE) { // We need to make sure the current delta recording gets merged into the previous cumulative // for the next cumulative recording. result = TemporalMetricProcessor.merge(last.accumulations, unreportedAccumulations, this._aggregator); + } else { + result = TemporalMetricProcessor.calibrateStartTime(last.accumulations, unreportedAccumulations, lastCollectionTime); } } else { // Call into user code to select aggregation temporality for the instrument. @@ -117,14 +114,10 @@ export class TemporalMetricProcessor { aggregationTemporality, }); - // Metric data time span is determined as: - // 1. Cumulative Aggregation time span: (sdkStartTime, collectionTime] - // 2. Delta Aggregation time span: (lastCollectionTime, collectionTime] return this._aggregator.toMetricData( instrumentDescriptor, aggregationTemporality, AttributesMapToAccumulationRecords(result), - /* startTime */ aggregationTemporality === AggregationTemporality.CUMULATIVE ? sdkStartTime : lastCollectionTime, /* endTime */ collectionTime); } @@ -152,19 +145,38 @@ export class TemporalMetricProcessor { return result; } - static merge(last: AttributeHashMap, current: AttributeHashMap, aggregator: Aggregator) { + static merge>(last: AttributeHashMap, current: AttributeHashMap, aggregator: Aggregator) { const result = last; const iterator = current.entries(); let next = iterator.next(); while (next.done !== true) { const [key, record, hash] = next.value; - const lastAccumulation = last.get(key, hash) ?? aggregator.createAccumulation(); - result.set(key, aggregator.merge(lastAccumulation, record), hash); + if (last.has(key, hash)) { + const lastAccumulation = last.get(key, hash); + // last.has() returned true, lastAccumulation is present. + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + const accumulation = aggregator.merge(lastAccumulation!, record); + result.set(key, accumulation, hash); + } else { + result.set(key, record, hash); + } next = iterator.next(); } return result; } + + /** + * Calibrate the reported metric streams' startTime to lastCollectionTime. Leaves + * the new stream to be the initial observation time unchanged. + */ + static calibrateStartTime>(last: AttributeHashMap, current: AttributeHashMap, lastCollectionTime: HrTime) { + for (const [key, hash] of last.keys()) { + const currentAccumulation = current.get(key, hash); + currentAccumulation?.setStartTime(lastCollectionTime); + } + return current; + } } // TypeScript complains about converting 3 elements tuple to AccumulationRecord. diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/src/state/WritableMetricStorage.ts b/experimental/packages/opentelemetry-sdk-metrics-base/src/state/WritableMetricStorage.ts index 132754705f0..8bfdbcfde12 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/src/state/WritableMetricStorage.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/src/state/WritableMetricStorage.ts @@ -14,7 +14,7 @@ * limitations under the License. */ -import { Context } from '@opentelemetry/api'; +import { Context, HrTime } from '@opentelemetry/api'; import { MetricAttributes } from '@opentelemetry/api-metrics'; import { AttributeHashMap } from './HashMap'; @@ -26,7 +26,7 @@ import { AttributeHashMap } from './HashMap'; */ export interface WritableMetricStorage { /** Records a measurement. */ - record(value: number, attributes: MetricAttributes, context: Context): void; + record(value: number, attributes: MetricAttributes, context: Context, recordTime: HrTime): void; } /** @@ -37,5 +37,5 @@ export interface WritableMetricStorage { */ export interface AsyncWritableMetricStorage { /** Records a batch of measurements. */ - record(measurements: AttributeHashMap): void; + record(measurements: AttributeHashMap, observationTime: HrTime): void; } diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/src/view/Aggregation.ts b/experimental/packages/opentelemetry-sdk-metrics-base/src/view/Aggregation.ts index adeb4739378..ed6cdbb50ad 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/src/view/Aggregation.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/src/view/Aggregation.ts @@ -63,9 +63,19 @@ export class DropAggregation extends Aggregation { * The default sum aggregation. */ export class SumAggregation extends Aggregation { - private static DEFAULT_INSTANCE = new SumAggregator(); - createAggregator(_instrument: InstrumentDescriptor) { - return SumAggregation.DEFAULT_INSTANCE; + private static MONOTONIC_INSTANCE = new SumAggregator(true); + private static NON_MONOTONIC_INSTANCE = new SumAggregator(false); + createAggregator(instrument: InstrumentDescriptor) { + switch (instrument.type) { + case InstrumentType.COUNTER: + case InstrumentType.OBSERVABLE_COUNTER: + case InstrumentType.HISTOGRAM: { + return SumAggregation.MONOTONIC_INSTANCE; + } + default: { + return SumAggregation.NON_MONOTONIC_INSTANCE; + } + } } } diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/test/aggregator/Drop.test.ts b/experimental/packages/opentelemetry-sdk-metrics-base/test/aggregator/Drop.test.ts index d378a165c98..5f83b69aedc 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/test/aggregator/Drop.test.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/test/aggregator/Drop.test.ts @@ -51,14 +51,12 @@ describe('DropAggregator', () => { it('no exceptions', () => { const aggregator = new DropAggregator(); - const startTime: HrTime = [0, 0]; const endTime: HrTime = [1, 1]; assert.strictEqual(aggregator.toMetricData( defaultInstrumentDescriptor, AggregationTemporality.CUMULATIVE, [[{}, undefined]], - startTime, endTime, ), undefined); }); diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/test/aggregator/Histogram.test.ts b/experimental/packages/opentelemetry-sdk-metrics-base/test/aggregator/Histogram.test.ts index eea2e609fd3..b1f6e05c0a3 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/test/aggregator/Histogram.test.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/test/aggregator/Histogram.test.ts @@ -25,7 +25,7 @@ describe('HistogramAggregator', () => { describe('createAccumulation', () => { it('no exceptions on createAccumulation', () => { const aggregator = new HistogramAggregator([1, 10, 100], true); - const accumulation = aggregator.createAccumulation(); + const accumulation = aggregator.createAccumulation([0, 0]); assert(accumulation instanceof HistogramAccumulation); }); }); @@ -33,15 +33,15 @@ describe('HistogramAggregator', () => { describe('merge', () => { it('no exceptions', () => { const aggregator = new HistogramAggregator([1, 10, 100], true); - const prev = aggregator.createAccumulation(); + const prev = aggregator.createAccumulation([0, 0]); prev.record(0); prev.record(1); - const delta = aggregator.createAccumulation(); + const delta = aggregator.createAccumulation([1, 1]); delta.record(2); delta.record(11); - const expected = aggregator.createAccumulation(); + const expected = aggregator.createAccumulation([0, 0]); // replay actions on prev expected.record(0); expected.record(1); @@ -56,11 +56,11 @@ describe('HistogramAggregator', () => { describe('diff', () => { it('no exceptions', () => { const aggregator = new HistogramAggregator([1, 10, 100], true); - const prev = aggregator.createAccumulation(); + const prev = aggregator.createAccumulation([0, 0]); prev.record(0); prev.record(1); - const curr = aggregator.createAccumulation(); + const curr = aggregator.createAccumulation([1, 1]); // replay actions on prev curr.record(0); curr.record(1); @@ -68,7 +68,7 @@ describe('HistogramAggregator', () => { curr.record(2); curr.record(11); - const expected = new HistogramAccumulation([1, 10, 100], true, { + const expected = new HistogramAccumulation([1, 1], [1, 10, 100], true, { buckets: { boundaries: [1, 10, 100], counts: [0, 1, 1, 0], @@ -88,12 +88,11 @@ describe('HistogramAggregator', () => { it('transform without exception', () => { const aggregator = new HistogramAggregator([1, 10, 100], true); - const accumulation = aggregator.createAccumulation(); - accumulation.record(0); - accumulation.record(1); - const startTime: HrTime = [0, 0]; const endTime: HrTime = [1, 1]; + const accumulation = aggregator.createAccumulation(startTime); + accumulation.record(0); + accumulation.record(1); const expected: MetricData = { descriptor: defaultInstrumentDescriptor, @@ -122,7 +121,6 @@ describe('HistogramAggregator', () => { defaultInstrumentDescriptor, AggregationTemporality.CUMULATIVE, [[{}, accumulation]], - startTime, endTime, ), expected); }); @@ -132,11 +130,19 @@ describe('HistogramAggregator', () => { describe('HistogramAccumulation', () => { describe('record', () => { it('no exceptions on record', () => { - const accumulation = new HistogramAccumulation([1, 10, 100]); + const accumulation = new HistogramAccumulation([0, 0], [1, 10, 100]); for (const value of commonValues) { accumulation.record(value); } }); }); + + describe('setStartTime', () => { + it('should set start time', () => { + const accumulation = new HistogramAccumulation([0, 0], [1, 10, 100]); + accumulation.setStartTime([1, 1]); + assert.deepStrictEqual(accumulation.startTime, [1, 1]); + }); + }); }); diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/test/aggregator/LastValue.test.ts b/experimental/packages/opentelemetry-sdk-metrics-base/test/aggregator/LastValue.test.ts index 63df2357c8e..420cabde654 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/test/aggregator/LastValue.test.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/test/aggregator/LastValue.test.ts @@ -25,7 +25,7 @@ describe('LastValueAggregator', () => { describe('createAccumulation', () => { it('no exceptions on createAccumulation', () => { const aggregator = new LastValueAggregator(); - const accumulation = aggregator.createAccumulation(); + const accumulation = aggregator.createAccumulation([0, 0]); assert(accumulation instanceof LastValueAccumulation); }); }); @@ -33,19 +33,21 @@ describe('LastValueAggregator', () => { describe('merge', () => { it('no exceptions', () => { const aggregator = new LastValueAggregator(); - const prev = aggregator.createAccumulation(); - const delta = aggregator.createAccumulation(); + const prev = aggregator.createAccumulation([0, 0]); + const delta = aggregator.createAccumulation([1, 1]); prev.record(2); delta.record(3); - assert.deepStrictEqual(aggregator.merge(prev, delta), delta); + const expected = new LastValueAccumulation([0, 0], 3, delta.sampleTime); + + assert.deepStrictEqual(aggregator.merge(prev, delta), expected); }); it('return the newly sampled accumulation', async () => { const aggregator = new LastValueAggregator(); - const accumulation1 = aggregator.createAccumulation(); - const accumulation2 = aggregator.createAccumulation(); + const accumulation1 = aggregator.createAccumulation([0, 0]); + const accumulation2 = aggregator.createAccumulation([1, 1]); accumulation1.record(2); await sleep(1); @@ -54,27 +56,33 @@ describe('LastValueAggregator', () => { await sleep(1); accumulation1.record(4); - assert.deepStrictEqual(aggregator.merge(accumulation1, accumulation2), accumulation1); - assert.deepStrictEqual(aggregator.merge(accumulation2, accumulation1), accumulation1); + assert.deepStrictEqual( + aggregator.merge(accumulation1, accumulation2), + new LastValueAccumulation(accumulation1.startTime, 4, accumulation1.sampleTime)); + assert.deepStrictEqual( + aggregator.merge(accumulation2, accumulation1), + new LastValueAccumulation(accumulation2.startTime, 4, accumulation1.sampleTime)); }); }); describe('diff', () => { it('no exceptions', () => { const aggregator = new LastValueAggregator(); - const prev = aggregator.createAccumulation(); - const curr = aggregator.createAccumulation(); + const prev = aggregator.createAccumulation([0, 0]); + const curr = aggregator.createAccumulation([1, 1]); prev.record(2); curr.record(3); - assert.deepStrictEqual(aggregator.diff(prev, curr), curr); + const expected = new LastValueAccumulation([1, 1], 3, curr.sampleTime); + + assert.deepStrictEqual(aggregator.diff(prev, curr), expected); }); it('return the newly sampled accumulation', async () => { const aggregator = new LastValueAggregator(); - const accumulation1 = aggregator.createAccumulation(); - const accumulation2 = aggregator.createAccumulation(); + const accumulation1 = aggregator.createAccumulation([0, 0]); + const accumulation2 = aggregator.createAccumulation([1, 1]); accumulation1.record(2); accumulation2.record(3); @@ -82,8 +90,12 @@ describe('LastValueAggregator', () => { await sleep(1); accumulation1.record(4); - assert.deepStrictEqual(aggregator.diff(accumulation1, accumulation2), accumulation1); - assert.deepStrictEqual(aggregator.diff(accumulation2, accumulation1), accumulation1); + assert.deepStrictEqual( + aggregator.diff(accumulation1, accumulation2), + new LastValueAccumulation(accumulation2.startTime, 4, accumulation1.sampleTime)); + assert.deepStrictEqual( + aggregator.diff(accumulation2, accumulation1), + new LastValueAccumulation(accumulation1.startTime, 4, accumulation1.sampleTime)); }); }); @@ -91,15 +103,14 @@ describe('LastValueAggregator', () => { it('transform without exception', () => { const aggregator = new LastValueAggregator(); - const accumulation = aggregator.createAccumulation(); + const startTime: HrTime = [0, 0]; + const endTime: HrTime = [1, 1]; + const accumulation = aggregator.createAccumulation(startTime); accumulation.record(1); accumulation.record(2); accumulation.record(1); accumulation.record(4); - const startTime: HrTime = [0, 0]; - const endTime: HrTime = [1, 1]; - const expected: MetricData = { descriptor: defaultInstrumentDescriptor, aggregationTemporality: AggregationTemporality.CUMULATIVE, @@ -117,7 +128,6 @@ describe('LastValueAggregator', () => { defaultInstrumentDescriptor, AggregationTemporality.CUMULATIVE, [[{}, accumulation]], - startTime, endTime, ), expected); }); @@ -127,11 +137,19 @@ describe('LastValueAggregator', () => { describe('LastValueAccumulation', () => { describe('record', () => { it('no exceptions on record', () => { - const accumulation = new LastValueAccumulation(); + const accumulation = new LastValueAccumulation([0, 0]); for (const value of commonValues) { accumulation.record(value); } }); }); + + describe('setStartTime', () => { + it('should set start time', () => { + const accumulation = new LastValueAccumulation([0, 0]); + accumulation.setStartTime([1, 1]); + assert.deepStrictEqual(accumulation.startTime, [1, 1]); + }); + }); }); diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/test/aggregator/Sum.test.ts b/experimental/packages/opentelemetry-sdk-metrics-base/test/aggregator/Sum.test.ts index 64db048d4e7..ac3df809a64 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/test/aggregator/Sum.test.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/test/aggregator/Sum.test.ts @@ -24,37 +24,36 @@ import { commonValues, defaultInstrumentDescriptor } from '../util'; describe('SumAggregator', () => { describe('createAccumulation', () => { it('no exceptions on createAccumulation', () => { - const aggregator = new SumAggregator(); - const accumulation = aggregator.createAccumulation(); + const aggregator = new SumAggregator(true); + const accumulation = aggregator.createAccumulation([0, 0]); assert(accumulation instanceof SumAccumulation); }); }); describe('merge', () => { it('no exceptions', () => { - const aggregator = new SumAggregator(); - const prev = aggregator.createAccumulation(); + const aggregator = new SumAggregator(true); + const prev = aggregator.createAccumulation([0, 0]); prev.record(1); prev.record(2); - const delta = aggregator.createAccumulation(); + const delta = aggregator.createAccumulation([1, 1]); delta.record(3); delta.record(4); - const expected = aggregator.createAccumulation(); - expected.record(1 + 2 + 3 + 4); + const expected = new SumAccumulation([0, 0], true, 1 + 2 + 3 + 4); assert.deepStrictEqual(aggregator.merge(prev, delta), expected); }); }); describe('diff', () => { - it('no exceptions', () => { - const aggregator = new SumAggregator(); - const prev = aggregator.createAccumulation(); + it('non-monotonic', () => { + const aggregator = new SumAggregator(false); + const prev = aggregator.createAccumulation([0, 0]); prev.record(1); prev.record(2); - const curr = aggregator.createAccumulation(); + const curr = aggregator.createAccumulation([1, 1]); // replay actions performed on prev curr.record(1); curr.record(2); @@ -62,21 +61,34 @@ describe('SumAggregator', () => { curr.record(3); curr.record(4); - const expected = aggregator.createAccumulation(); - expected.record(3 + 4); + const expected = new SumAccumulation([1, 1], false, 3 + 4); + assert.deepStrictEqual(aggregator.diff(prev, curr), expected); + }); + + it('monotonic', () => { + const aggregator = new SumAggregator(true); + const prev = aggregator.createAccumulation([0, 0]); + prev.record(10); + + // Create a new record that indicates a reset. + const curr = aggregator.createAccumulation([1, 1]); + curr.record(3); + + // Diff result detected reset. + const expected = new SumAccumulation([1, 1], true, 3, true); assert.deepStrictEqual(aggregator.diff(prev, curr), expected); }); }); describe('toMetricData', () => { it('transform without exception', () => { - const aggregator = new SumAggregator(); - const accumulation = aggregator.createAccumulation(); - accumulation.record(1); - accumulation.record(2); + const aggregator = new SumAggregator(true); const startTime: HrTime = [0, 0]; const endTime: HrTime = [1, 1]; + const accumulation = aggregator.createAccumulation(startTime); + accumulation.record(1); + accumulation.record(2); const expected: MetricData = { descriptor: defaultInstrumentDescriptor, @@ -95,7 +107,6 @@ describe('SumAggregator', () => { defaultInstrumentDescriptor, AggregationTemporality.CUMULATIVE, [[{}, accumulation]], - startTime, endTime, ), expected); }); @@ -105,11 +116,28 @@ describe('SumAggregator', () => { describe('SumAccumulation', () => { describe('record', () => { it('no exceptions on record', () => { - const accumulation = new SumAccumulation(); + for (const monotonic of [true, false]) { + const accumulation = new SumAccumulation([0, 0], monotonic); - for (const value of commonValues) { - accumulation.record(value); + for (const value of commonValues) { + accumulation.record(value); + } } }); + + it('should ignore negative values on monotonic sum', () => { + const accumulation = new SumAccumulation([0, 0], true); + accumulation.record(1); + accumulation.record(-1); + assert.strictEqual(accumulation.toPointValue(), 1); + }); + }); + + describe('setStartTime', () => { + it('should set start time', () => { + const accumulation = new SumAccumulation([0, 0], true); + accumulation.setStartTime([1, 1]); + assert.deepStrictEqual(accumulation.startTime, [1, 1]); + }); }); }); diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/test/state/AsyncMetricStorage.test.ts b/experimental/packages/opentelemetry-sdk-metrics-base/test/state/AsyncMetricStorage.test.ts index f65b642898f..dc0e7fe0722 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/test/state/AsyncMetricStorage.test.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/test/state/AsyncMetricStorage.test.ts @@ -14,7 +14,6 @@ * limitations under the License. */ -import { hrTime } from '@opentelemetry/core'; import * as assert from 'assert'; import { SumAggregator } from '../../src/aggregator'; @@ -26,6 +25,7 @@ import { NoopAttributesProcessor } from '../../src/view/AttributesProcessor'; import { ObservableRegistry } from '../../src/state/ObservableRegistry'; import { assertMetricData, assertDataPoint, defaultInstrumentDescriptor, ObservableCallbackDelegate } from '../util'; import { ObservableInstrument } from '../../src/Instruments'; +import { HrTime } from '@opentelemetry/api'; const deltaCollector: MetricCollectorHandle = { selectAggregationTemporality: () => AggregationTemporality.DELTA, @@ -35,8 +35,6 @@ const cumulativeCollector: MetricCollectorHandle = { selectAggregationTemporality: () => AggregationTemporality.CUMULATIVE, }; -const sdkStartTime = hrTime(); - describe('AsyncMetricStorage', () => { describe('collect', () => { describe('Delta Collector', () => { @@ -46,7 +44,7 @@ describe('AsyncMetricStorage', () => { const observableRegistry = new ObservableRegistry(); const metricStorage = new AsyncMetricStorage( defaultInstrumentDescriptor, - new SumAggregator(), + new SumAggregator(true), new NoopAttributesProcessor(), ); const observable = new ObservableInstrument( @@ -63,29 +61,29 @@ describe('AsyncMetricStorage', () => { observableResult.observe(3, { key: '3' }); }); { - await observableRegistry.observe(); + const collectionTime: HrTime = [0, 0]; + await observableRegistry.observe(collectionTime); const metric = metricStorage.collect( deltaCollector, collectors, - sdkStartTime, - hrTime()); + collectionTime); assertMetricData(metric, DataPointType.SINGULAR); assert.strictEqual(metric.dataPoints.length, 3); - assertDataPoint(metric.dataPoints[0], { key: '1' }, 1); - assertDataPoint(metric.dataPoints[1], { key: '2' }, 2); - assertDataPoint(metric.dataPoints[2], { key: '3' }, 3); + assertDataPoint(metric.dataPoints[0], { key: '1' }, 1, collectionTime, collectionTime); + assertDataPoint(metric.dataPoints[1], { key: '2' }, 2, collectionTime, collectionTime); + assertDataPoint(metric.dataPoints[2], { key: '3' }, 3, collectionTime, collectionTime); } delegate.setDelegate(observableResult => {}); // The attributes should not be memorized if no measurement was reported. { - await observableRegistry.observe(); + const collectionTime: HrTime = [1, 1]; + await observableRegistry.observe(collectionTime); const metric = metricStorage.collect( deltaCollector, collectors, - sdkStartTime, - hrTime()); + collectionTime); assertMetricData(metric, DataPointType.SINGULAR); assert.strictEqual(metric.dataPoints.length, 0); @@ -97,19 +95,166 @@ describe('AsyncMetricStorage', () => { observableResult.observe(6, { key: '3' }); }); { - await observableRegistry.observe(); + const collectionTime: HrTime = [2, 2]; + await observableRegistry.observe(collectionTime); const metric = metricStorage.collect( deltaCollector, - [deltaCollector], - sdkStartTime, - hrTime()); + collectors, + collectionTime); assertMetricData(metric, DataPointType.SINGULAR); assert.strictEqual(metric.dataPoints.length, 3); - // all values were diffed. - assertDataPoint(metric.dataPoints[0], { key: '1' }, 3); - assertDataPoint(metric.dataPoints[1], { key: '2' }, 3); - assertDataPoint(metric.dataPoints[2], { key: '3' }, 3); + // All values were diffed. StartTime is being reset for gaps. + assertDataPoint(metric.dataPoints[0], { key: '1' }, 3, collectionTime, collectionTime); + assertDataPoint(metric.dataPoints[1], { key: '2' }, 3, collectionTime, collectionTime); + assertDataPoint(metric.dataPoints[2], { key: '3' }, 3, collectionTime, collectionTime); + } + }); + + it('should detect resets for monotonic sum metrics', async () => { + const delegate = new ObservableCallbackDelegate(); + const observableRegistry = new ObservableRegistry(); + const metricStorage = new AsyncMetricStorage( + defaultInstrumentDescriptor, + new SumAggregator(true), + new NoopAttributesProcessor(), + ); + const observable = new ObservableInstrument( + defaultInstrumentDescriptor, + [metricStorage], + observableRegistry + ); + + observableRegistry.addCallback(delegate.getCallback(), observable); + + // Observe a metric + delegate.setDelegate(observableResult => { + observableResult.observe(100, { key: '1' }); + }); + let lastCollectionTime: HrTime; + { + const collectionTime: HrTime = [0, 0]; + await observableRegistry.observe(collectionTime); + const metric = metricStorage.collect( + deltaCollector, + collectors, + collectionTime); + + assertMetricData(metric, DataPointType.SINGULAR); + assert.strictEqual(metric.dataPoints.length, 1); + assertDataPoint(metric.dataPoints[0], { key: '1' }, 100, collectionTime, collectionTime); + lastCollectionTime = collectionTime; + } + + // Observe a drop on the metric + delegate.setDelegate(observableResult => { + observableResult.observe(1, { key: '1' }); + }); + // The result data should not be diff-ed to be a negative value + { + const collectionTime: HrTime = [1, 1]; + await observableRegistry.observe(collectionTime); + const metric = metricStorage.collect( + deltaCollector, + collectors, + collectionTime); + + assertMetricData(metric, DataPointType.SINGULAR); + assert.strictEqual(metric.dataPoints.length, 1); + assertDataPoint(metric.dataPoints[0], { key: '1' }, 1, lastCollectionTime, collectionTime); + lastCollectionTime = collectionTime; + } + + // Observe a new data point + delegate.setDelegate(observableResult => { + observableResult.observe(50, { key: '1' }); + }); + // The result data should now be a delta to the previous collection + { + const collectionTime: HrTime = [2, 2]; + await observableRegistry.observe(collectionTime); + const metric = metricStorage.collect( + deltaCollector, + collectors, + collectionTime); + + assertMetricData(metric, DataPointType.SINGULAR); + assert.strictEqual(metric.dataPoints.length, 1); + assertDataPoint(metric.dataPoints[0], { key: '1' }, 49, lastCollectionTime, collectionTime); + } + }); + + it('should not detect resets and gaps for non-monotonic sum metrics', async () => { + const delegate = new ObservableCallbackDelegate(); + const observableRegistry = new ObservableRegistry(); + const metricStorage = new AsyncMetricStorage( + defaultInstrumentDescriptor, + new SumAggregator(false), + new NoopAttributesProcessor(), + ); + const observable = new ObservableInstrument( + defaultInstrumentDescriptor, + [metricStorage], + observableRegistry + ); + + observableRegistry.addCallback(delegate.getCallback(), observable); + + // Observe a metric + delegate.setDelegate(observableResult => { + observableResult.observe(100, { key: '1' }); + }); + let lastCollectionTime: HrTime; + { + const collectionTime: HrTime = [0, 0]; + await observableRegistry.observe(collectionTime); + const metric = metricStorage.collect( + deltaCollector, + collectors, + collectionTime); + + assertMetricData(metric, DataPointType.SINGULAR); + assert.strictEqual(metric.dataPoints.length, 1); + assertDataPoint(metric.dataPoints[0], { key: '1' }, 100, collectionTime, collectionTime); + lastCollectionTime = collectionTime; + } + + // Observe a drop on the metric + delegate.setDelegate(observableResult => { + observableResult.observe(1, { key: '1' }); + }); + // The result data should be a delta to the previous collection + { + const collectionTime: HrTime = [0, 0]; + await observableRegistry.observe(collectionTime); + const metric = metricStorage.collect( + deltaCollector, + collectors, + collectionTime); + + assertMetricData(metric, DataPointType.SINGULAR); + assert.strictEqual(metric.dataPoints.length, 1); + assertDataPoint(metric.dataPoints[0], { key: '1' }, -99, lastCollectionTime, collectionTime); + lastCollectionTime = collectionTime; + + } + + // Observe a new data point + delegate.setDelegate(observableResult => { + observableResult.observe(50, { key: '1' }); + }); + // The result data should be a delta to the previous collection + { + const collectionTime: HrTime = [2, 2]; + await observableRegistry.observe(collectionTime); + const metric = metricStorage.collect( + deltaCollector, + collectors, + collectionTime); + + assertMetricData(metric, DataPointType.SINGULAR); + assert.strictEqual(metric.dataPoints.length, 1); + assertDataPoint(metric.dataPoints[0], { key: '1' }, 49, lastCollectionTime, collectionTime); } }); }); @@ -121,7 +266,7 @@ describe('AsyncMetricStorage', () => { const observableRegistry = new ObservableRegistry(); const metricStorage = new AsyncMetricStorage( defaultInstrumentDescriptor, - new SumAggregator(), + new SumAggregator(true), new NoopAttributesProcessor(), ); const observable = new ObservableInstrument( @@ -137,36 +282,38 @@ describe('AsyncMetricStorage', () => { observableResult.observe(2, { key: '2' }); observableResult.observe(3, { key: '3' }); }); + let startTime: HrTime; { - await observableRegistry.observe(); + const collectionTime: HrTime = [0, 0]; + await observableRegistry.observe(collectionTime); const metric = metricStorage.collect( cumulativeCollector, collectors, - sdkStartTime, - hrTime()); + collectionTime); + startTime = collectionTime; assertMetricData(metric, DataPointType.SINGULAR); assert.strictEqual(metric.dataPoints.length, 3); - assertDataPoint(metric.dataPoints[0], { key: '1' }, 1); - assertDataPoint(metric.dataPoints[1], { key: '2' }, 2); - assertDataPoint(metric.dataPoints[2], { key: '3' }, 3); + assertDataPoint(metric.dataPoints[0], { key: '1' }, 1, startTime, collectionTime); + assertDataPoint(metric.dataPoints[1], { key: '2' }, 2, startTime, collectionTime); + assertDataPoint(metric.dataPoints[2], { key: '3' }, 3, startTime, collectionTime); } delegate.setDelegate(observableResult => {}); // The attributes should be memorized even if no measurement was reported. { - await observableRegistry.observe(); + const collectionTime: HrTime = [1, 1]; + await observableRegistry.observe(collectionTime); const metric = metricStorage.collect( cumulativeCollector, collectors, - sdkStartTime, - hrTime()); + collectionTime); assertMetricData(metric, DataPointType.SINGULAR); assert.strictEqual(metric.dataPoints.length, 3); - assertDataPoint(metric.dataPoints[0], { key: '1' }, 1); - assertDataPoint(metric.dataPoints[1], { key: '2' }, 2); - assertDataPoint(metric.dataPoints[2], { key: '3' }, 3); + assertDataPoint(metric.dataPoints[0], { key: '1' }, 1, startTime, collectionTime); + assertDataPoint(metric.dataPoints[1], { key: '2' }, 2, startTime, collectionTime); + assertDataPoint(metric.dataPoints[2], { key: '3' }, 3, startTime, collectionTime); } delegate.setDelegate(observableResult => { @@ -175,18 +322,165 @@ describe('AsyncMetricStorage', () => { observableResult.observe(6, { key: '3' }); }); { - await observableRegistry.observe(); + const collectionTime: HrTime = [2, 2]; + await observableRegistry.observe(collectionTime); const metric = metricStorage.collect( cumulativeCollector, collectors, - sdkStartTime, - hrTime()); + collectionTime); assertMetricData(metric, DataPointType.SINGULAR); assert.strictEqual(metric.dataPoints.length, 3); - assertDataPoint(metric.dataPoints[0], { key: '1' }, 4); - assertDataPoint(metric.dataPoints[1], { key: '2' }, 5); - assertDataPoint(metric.dataPoints[2], { key: '3' }, 6); + assertDataPoint(metric.dataPoints[0], { key: '1' }, 4, startTime, collectionTime); + assertDataPoint(metric.dataPoints[1], { key: '2' }, 5, startTime, collectionTime); + assertDataPoint(metric.dataPoints[2], { key: '3' }, 6, startTime, collectionTime); + } + }); + + it('should collect monotonic metrics with resets and gaps', async () => { + const delegate = new ObservableCallbackDelegate(); + const observableRegistry = new ObservableRegistry(); + const metricStorage = new AsyncMetricStorage( + defaultInstrumentDescriptor, + new SumAggregator(true), + new NoopAttributesProcessor(), + ); + const observable = new ObservableInstrument( + defaultInstrumentDescriptor, + [metricStorage], + observableRegistry + ); + + observableRegistry.addCallback(delegate.getCallback(), observable); + + // Observe a metric + delegate.setDelegate(observableResult => { + observableResult.observe(100, { key: '1' }); + }); + let startTime: HrTime; + { + const collectionTime: HrTime = [0, 0]; + await observableRegistry.observe(collectionTime); + const metric = metricStorage.collect( + cumulativeCollector, + collectors, + collectionTime); + + startTime = collectionTime; + assertMetricData(metric, DataPointType.SINGULAR); + assert.strictEqual(metric.dataPoints.length, 1); + assertDataPoint(metric.dataPoints[0], { key: '1' }, 100, startTime, collectionTime); + } + + // Observe a drop on the metric + delegate.setDelegate(observableResult => { + observableResult.observe(1, { key: '1' }); + }); + // The result data should not be diff-ed to be a negative value + { + const collectionTime: HrTime = [1, 1]; + await observableRegistry.observe(collectionTime); + const metric = metricStorage.collect( + cumulativeCollector, + collectors, + collectionTime); + + assertMetricData(metric, DataPointType.SINGULAR); + assert.strictEqual(metric.dataPoints.length, 1); + // The startTime should be reset. + assertDataPoint(metric.dataPoints[0], { key: '1' }, 1, collectionTime, collectionTime); + startTime = collectionTime; + } + + // Observe a new data point + delegate.setDelegate(observableResult => { + observableResult.observe(50, { key: '1' }); + }); + // The result data should now be a delta to the previous collection + { + const collectionTime: HrTime = [2, 2]; + await observableRegistry.observe(collectionTime); + const metric = metricStorage.collect( + cumulativeCollector, + collectors, + collectionTime); + + assertMetricData(metric, DataPointType.SINGULAR); + assert.strictEqual(metric.dataPoints.length, 1); + assertDataPoint(metric.dataPoints[0], { key: '1' }, 50, startTime, collectionTime); + } + }); + + it('should collect non-monotonic metrics with resets and gaps', async () => { + const delegate = new ObservableCallbackDelegate(); + const observableRegistry = new ObservableRegistry(); + const metricStorage = new AsyncMetricStorage( + defaultInstrumentDescriptor, + new SumAggregator(false), + new NoopAttributesProcessor(), + ); + const observable = new ObservableInstrument( + defaultInstrumentDescriptor, + [metricStorage], + observableRegistry + ); + + observableRegistry.addCallback(delegate.getCallback(), observable); + + // Observe a metric + delegate.setDelegate(observableResult => { + observableResult.observe(100, { key: '1' }); + }); + let startTime: HrTime; + { + const collectionTime: HrTime = [0, 0]; + await observableRegistry.observe(collectionTime); + const metric = metricStorage.collect( + cumulativeCollector, + collectors, + collectionTime); + + startTime = collectionTime; + assertMetricData(metric, DataPointType.SINGULAR); + assert.strictEqual(metric.dataPoints.length, 1); + assertDataPoint(metric.dataPoints[0], { key: '1' }, 100, startTime, collectionTime); + } + + // Observe a drop on the metric + delegate.setDelegate(observableResult => { + observableResult.observe(1, { key: '1' }); + }); + // The result data should be a delta to the previous collection + { + const collectionTime: HrTime = [1, 1]; + await observableRegistry.observe(collectionTime); + const metric = metricStorage.collect( + cumulativeCollector, + collectors, + collectionTime); + + assertMetricData(metric, DataPointType.SINGULAR); + assert.strictEqual(metric.dataPoints.length, 1); + // No reset on the value or the startTime + assertDataPoint(metric.dataPoints[0], { key: '1' }, 1, startTime, collectionTime); + } + + // Observe a new data point + delegate.setDelegate(observableResult => { + observableResult.observe(50, { key: '1' }); + }); + // The result data should be a delta to the previous collection + { + const collectionTime: HrTime = [2, 2]; + await observableRegistry.observe(collectionTime); + const metric = metricStorage.collect( + cumulativeCollector, + collectors, + collectionTime); + + assertMetricData(metric, DataPointType.SINGULAR); + assert.strictEqual(metric.dataPoints.length, 1); + assertDataPoint(metric.dataPoints[0], { key: '1' }, 50, startTime, collectionTime); } }); }); diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/test/state/DeltaMetricProcessor.test.ts b/experimental/packages/opentelemetry-sdk-metrics-base/test/state/DeltaMetricProcessor.test.ts index acada414afb..2bff4bec716 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/test/state/DeltaMetricProcessor.test.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/test/state/DeltaMetricProcessor.test.ts @@ -28,17 +28,17 @@ describe('DeltaMetricProcessor', () => { for (const value of commonValues) { for (const attributes of commonAttributes) { - metricProcessor.record(value, attributes, api.context.active()); + metricProcessor.record(value, attributes, api.context.active(), [0, 0]); } } }); it('no exceptions on record with no-drop aggregator', () => { - const metricProcessor = new DeltaMetricProcessor(new SumAggregator()); + const metricProcessor = new DeltaMetricProcessor(new SumAggregator(true)); for (const value of commonValues) { for (const attributes of commonAttributes) { - metricProcessor.record(value, attributes, api.context.active()); + metricProcessor.record(value, attributes, api.context.active(), [0, 0]); } } }); @@ -54,11 +54,11 @@ describe('DeltaMetricProcessor', () => { measurements.set(attributes, value); } } - metricProcessor.batchCumulate(measurements); + metricProcessor.batchCumulate(measurements, [0, 0]); }); it('no exceptions on record with no-drop aggregator', () => { - const metricProcessor = new DeltaMetricProcessor(new SumAggregator()); + const metricProcessor = new DeltaMetricProcessor(new SumAggregator(true)); const measurements = new AttributeHashMap(); for (const value of commonValues) { @@ -66,16 +66,16 @@ describe('DeltaMetricProcessor', () => { measurements.set(attributes, value); } } - metricProcessor.batchCumulate(measurements); + metricProcessor.batchCumulate(measurements, [0, 0]); }); it('should compute the diff of accumulations', () => { - const metricProcessor = new DeltaMetricProcessor(new SumAggregator()); + const metricProcessor = new DeltaMetricProcessor(new SumAggregator(true)); { const measurements = new AttributeHashMap(); measurements.set({}, 10); - metricProcessor.batchCumulate(measurements); + metricProcessor.batchCumulate(measurements, [0, 0]); const accumulations = metricProcessor.collect(); const accumulation = accumulations.get({}); assert.strictEqual(accumulation?.toPointValue(), 10); @@ -84,7 +84,7 @@ describe('DeltaMetricProcessor', () => { { const measurements = new AttributeHashMap(); measurements.set({}, 21); - metricProcessor.batchCumulate(measurements); + metricProcessor.batchCumulate(measurements, [0, 0]); const accumulations = metricProcessor.collect(); const accumulation = accumulations.get({}); assert.strictEqual(accumulation?.toPointValue(), 11); @@ -94,11 +94,11 @@ describe('DeltaMetricProcessor', () => { describe('collect', () => { it('should export', () => { - const metricProcessor = new DeltaMetricProcessor(new SumAggregator()); + const metricProcessor = new DeltaMetricProcessor(new SumAggregator(true)); - metricProcessor.record(1, { attribute: '1' }, api.ROOT_CONTEXT); - metricProcessor.record(2, { attribute: '1' }, api.ROOT_CONTEXT); - metricProcessor.record(1, { attribute: '2' }, api.ROOT_CONTEXT); + metricProcessor.record(1, { attribute: '1' }, api.ROOT_CONTEXT, [0, 0]); + metricProcessor.record(2, { attribute: '1' }, api.ROOT_CONTEXT, [1, 1]); + metricProcessor.record(1, { attribute: '2' }, api.ROOT_CONTEXT, [2, 2]); let accumulations = metricProcessor.collect(); assert.strictEqual(accumulations.size, 2); diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/test/state/MetricStorageRegistry.test.ts b/experimental/packages/opentelemetry-sdk-metrics-base/test/state/MetricStorageRegistry.test.ts index 575127c9fb9..e3defd2c924 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/test/state/MetricStorageRegistry.test.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/test/state/MetricStorageRegistry.test.ts @@ -33,7 +33,6 @@ import { class TestMetricStorage extends MetricStorage { collect(collector: MetricCollectorHandle, collectors: MetricCollectorHandle[], - sdkStartTime: HrTime, collectionTime: HrTime, ): Maybe { return undefined; diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/test/state/MultiWritableMetricStorage.test.ts b/experimental/packages/opentelemetry-sdk-metrics-base/test/state/MultiWritableMetricStorage.test.ts index d202895b691..e91c952e69d 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/test/state/MultiWritableMetricStorage.test.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/test/state/MultiWritableMetricStorage.test.ts @@ -16,6 +16,7 @@ import * as api from '@opentelemetry/api'; import { MetricAttributes } from '@opentelemetry/api-metrics'; +import { hrTime } from '@opentelemetry/core'; import * as assert from 'assert'; import { MultiMetricStorage } from '../../src/state/MultiWritableMetricStorage'; import { WritableMetricStorage } from '../../src/state/WritableMetricStorage'; @@ -28,7 +29,7 @@ describe('MultiMetricStorage', () => { for (const value of commonValues) { for (const attribute of commonAttributes) { - metricStorage.record(value, attribute, api.context.active()); + metricStorage.record(value, attribute, api.context.active(), [0, 0]); } } }); @@ -50,7 +51,7 @@ describe('MultiMetricStorage', () => { for (const attributes of commonAttributes) { const context = api.context.active(); expectedMeasurements.push({ value, attributes, context }); - metricStorage.record(value, attributes, context); + metricStorage.record(value, attributes, context, hrTime()); } } diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/test/state/SyncMetricStorage.test.ts b/experimental/packages/opentelemetry-sdk-metrics-base/test/state/SyncMetricStorage.test.ts index 72d090b961c..1a95d4c12a3 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/test/state/SyncMetricStorage.test.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/test/state/SyncMetricStorage.test.ts @@ -15,7 +15,6 @@ */ import * as api from '@opentelemetry/api'; -import { hrTime } from '@opentelemetry/core'; import * as assert from 'assert'; import { SumAggregator } from '../../src/aggregator'; @@ -34,16 +33,18 @@ const cumulativeCollector: MetricCollectorHandle = { selectAggregationTemporality: () => AggregationTemporality.CUMULATIVE, }; -const sdkStartTime = hrTime(); - describe('SyncMetricStorage', () => { describe('record', () => { it('no exceptions on record', () => { - const metricStorage = new SyncMetricStorage(defaultInstrumentDescriptor, new SumAggregator(), new NoopAttributesProcessor()); + const metricStorage = new SyncMetricStorage( + defaultInstrumentDescriptor, + new SumAggregator(true), + new NoopAttributesProcessor() + ); for (const value of commonValues) { for (const attributes of commonAttributes) { - metricStorage.record(value, attributes, api.context.active()); + metricStorage.record(value, attributes, api.context.active(), [0, 0]); } } }); @@ -53,20 +54,23 @@ describe('SyncMetricStorage', () => { describe('Delta Collector', () => { const collectors = [deltaCollector]; it('should collect and reset memos', async () => { - const metricStorage = new SyncMetricStorage(defaultInstrumentDescriptor, new SumAggregator(), new NoopAttributesProcessor()); - metricStorage.record(1, {}, api.context.active()); - metricStorage.record(2, {}, api.context.active()); - metricStorage.record(3, {}, api.context.active()); + const metricStorage = new SyncMetricStorage( + defaultInstrumentDescriptor, + new SumAggregator(true), + new NoopAttributesProcessor() + ); + metricStorage.record(1, {}, api.context.active(), [0, 0]); + metricStorage.record(2, {}, api.context.active(), [1, 1]); + metricStorage.record(3, {}, api.context.active(), [2, 2]); { const metric = metricStorage.collect( deltaCollector, collectors, - sdkStartTime, - hrTime()); + [3, 3]); assertMetricData(metric, DataPointType.SINGULAR); assert.strictEqual(metric.dataPoints.length, 1); - assertDataPoint(metric.dataPoints[0], {}, 6); + assertDataPoint(metric.dataPoints[0], {}, 6, [0, 0], [3, 3]); } // The attributes should not be memorized. @@ -74,24 +78,22 @@ describe('SyncMetricStorage', () => { const metric = metricStorage.collect( deltaCollector, collectors, - sdkStartTime, - hrTime()); + [4, 4]); assertMetricData(metric, DataPointType.SINGULAR); assert.strictEqual(metric.dataPoints.length, 0); } - metricStorage.record(1, {}, api.context.active()); + metricStorage.record(1, {}, api.context.active(), [5, 5]); { const metric = metricStorage.collect( deltaCollector, [deltaCollector], - sdkStartTime, - hrTime()); + [6, 6]); assertMetricData(metric, DataPointType.SINGULAR); assert.strictEqual(metric.dataPoints.length, 1); - assertDataPoint(metric.dataPoints[0], {}, 1); + assertDataPoint(metric.dataPoints[0], {}, 1, [5, 5], [6, 6]); } }); }); @@ -99,20 +101,23 @@ describe('SyncMetricStorage', () => { describe('Cumulative Collector', () => { const collectors = [cumulativeCollector]; it('should collect cumulative metrics', async () => { - const metricStorage = new SyncMetricStorage(defaultInstrumentDescriptor, new SumAggregator(), new NoopAttributesProcessor()); - metricStorage.record(1, {}, api.context.active()); - metricStorage.record(2, {}, api.context.active()); - metricStorage.record(3, {}, api.context.active()); + const metricStorage = new SyncMetricStorage( + defaultInstrumentDescriptor, + new SumAggregator(true), + new NoopAttributesProcessor() + ); + metricStorage.record(1, {}, api.context.active(), [0, 0]); + metricStorage.record(2, {}, api.context.active(), [1, 1]); + metricStorage.record(3, {}, api.context.active(), [2, 2]); { const metric = metricStorage.collect( cumulativeCollector, collectors, - sdkStartTime, - hrTime()); + [3, 3]); assertMetricData(metric, DataPointType.SINGULAR); assert.strictEqual(metric.dataPoints.length, 1); - assertDataPoint(metric.dataPoints[0], {}, 6); + assertDataPoint(metric.dataPoints[0], {}, 6, [0, 0], [3, 3]); } // The attributes should be memorized. @@ -120,25 +125,23 @@ describe('SyncMetricStorage', () => { const metric = metricStorage.collect( cumulativeCollector, collectors, - sdkStartTime, - hrTime()); + [4, 4]); assertMetricData(metric, DataPointType.SINGULAR); assert.strictEqual(metric.dataPoints.length, 1); - assertDataPoint(metric.dataPoints[0], {}, 6); + assertDataPoint(metric.dataPoints[0], {}, 6, [0, 0], [4, 4]); } - metricStorage.record(1, {}, api.context.active()); + metricStorage.record(1, {}, api.context.active(), [5, 5]); { const metric = metricStorage.collect( cumulativeCollector, collectors, - sdkStartTime, - hrTime()); + [6, 6]); assertMetricData(metric, DataPointType.SINGULAR); assert.strictEqual(metric.dataPoints.length, 1); - assertDataPoint(metric.dataPoints[0], {}, 7); + assertDataPoint(metric.dataPoints[0], {}, 7, [0, 0], [6, 6]); } }); }); diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/test/state/TemporalMetricProcessor.test.ts b/experimental/packages/opentelemetry-sdk-metrics-base/test/state/TemporalMetricProcessor.test.ts index 253a4132b17..5595d23f257 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/test/state/TemporalMetricProcessor.test.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/test/state/TemporalMetricProcessor.test.ts @@ -15,7 +15,6 @@ */ import * as api from '@opentelemetry/api'; -import { hrTime } from '@opentelemetry/core'; import * as assert from 'assert'; import * as sinon from 'sinon'; import { SumAggregator } from '../../src/aggregator'; @@ -38,8 +37,6 @@ const cumulativeCollector1: MetricCollectorHandle = { selectAggregationTemporality: () => AggregationTemporality.CUMULATIVE, }; -const sdkStartTime = hrTime(); - describe('TemporalMetricProcessor', () => { afterEach(() => { sinon.restore(); @@ -49,47 +46,46 @@ describe('TemporalMetricProcessor', () => { describe('single delta collector', () => { const collectors = [ deltaCollector1 ]; - it('should build metrics', () => { + it('should build delta recording metrics', () => { const spy = sinon.spy(deltaCollector1, 'selectAggregationTemporality'); - const aggregator = new SumAggregator(); + const aggregator = new SumAggregator(true); const deltaMetricStorage = new DeltaMetricProcessor(aggregator); const temporalMetricStorage = new TemporalMetricProcessor(aggregator); - deltaMetricStorage.record(1, {}, api.context.active()); + deltaMetricStorage.record(1, {}, api.context.active(), [1, 1]); { const metric = temporalMetricStorage.buildMetrics( deltaCollector1, collectors, defaultInstrumentDescriptor, deltaMetricStorage.collect(), - sdkStartTime, - hrTime()); + [2, 2]); assertMetricData(metric, DataPointType.SINGULAR, defaultInstrumentDescriptor, AggregationTemporality.DELTA); assert.strictEqual(metric.dataPoints.length, 1); - assertDataPoint(metric.dataPoints[0], {}, 1); + assertDataPoint(metric.dataPoints[0], {}, 1, [1, 1], [2, 2]); } - deltaMetricStorage.record(2, {}, api.context.active()); + deltaMetricStorage.record(2, {}, api.context.active(), [3, 3]); { const metric = temporalMetricStorage.buildMetrics( deltaCollector1, collectors, defaultInstrumentDescriptor, deltaMetricStorage.collect(), - sdkStartTime, - hrTime()); + [4, 4]); assertMetricData(metric, DataPointType.SINGULAR, defaultInstrumentDescriptor, AggregationTemporality.DELTA); assert.strictEqual(metric.dataPoints.length, 1); - assertDataPoint(metric.dataPoints[0], {}, 2); + // Time span: (lastCollectionTime, collectionTime) + assertDataPoint(metric.dataPoints[0], {}, 2, [2, 2], [4, 4]); } { @@ -98,8 +94,7 @@ describe('TemporalMetricProcessor', () => { collectors, defaultInstrumentDescriptor, deltaMetricStorage.collect(), - sdkStartTime, - hrTime()); + [5, 5]); assertMetricData(metric, DataPointType.SINGULAR, @@ -116,27 +111,26 @@ describe('TemporalMetricProcessor', () => { describe('two delta collector', () => { const collectors = [ deltaCollector1, deltaCollector2 ]; - it('should build metrics', () => { - const aggregator = new SumAggregator(); + it('should build delta recording metrics', () => { + const aggregator = new SumAggregator(true); const deltaMetricStorage = new DeltaMetricProcessor(aggregator); const temporalMetricStorage = new TemporalMetricProcessor(aggregator); - deltaMetricStorage.record(1, {}, api.context.active()); + deltaMetricStorage.record(1, {}, api.context.active(), [1, 1]); { const metric = temporalMetricStorage.buildMetrics( deltaCollector1, collectors, defaultInstrumentDescriptor, deltaMetricStorage.collect(), - sdkStartTime, - hrTime()); + [2, 2]); assertMetricData(metric, DataPointType.SINGULAR, defaultInstrumentDescriptor, AggregationTemporality.DELTA); assert.strictEqual(metric.dataPoints.length, 1); - assertDataPoint(metric.dataPoints[0], {}, 1); + assertDataPoint(metric.dataPoints[0], {}, 1, [1, 1], [2, 2]); } { @@ -145,62 +139,59 @@ describe('TemporalMetricProcessor', () => { collectors, defaultInstrumentDescriptor, deltaMetricStorage.collect(), - sdkStartTime, - hrTime()); + [3, 3]); assertMetricData(metric, DataPointType.SINGULAR, defaultInstrumentDescriptor, AggregationTemporality.DELTA); assert.strictEqual(metric.dataPoints.length, 1); - assertDataPoint(metric.dataPoints[0], {}, 1); + assertDataPoint(metric.dataPoints[0], {}, 1, [1, 1], [3, 3]); } }); }); describe('single cumulative collector', () => { const collectors = [ cumulativeCollector1 ]; - it('should build metrics', () => { + it('should build delta recording metrics', () => { const spy = sinon.spy(cumulativeCollector1, 'selectAggregationTemporality'); - const aggregator = new SumAggregator(); + const aggregator = new SumAggregator(true); const deltaMetricStorage = new DeltaMetricProcessor(aggregator); const temporalMetricStorage = new TemporalMetricProcessor(aggregator); - deltaMetricStorage.record(1, {}, api.context.active()); + deltaMetricStorage.record(1, {}, api.context.active(), [1, 1]); { const metric = temporalMetricStorage.buildMetrics( cumulativeCollector1, collectors, defaultInstrumentDescriptor, deltaMetricStorage.collect(), - sdkStartTime, - hrTime()); + [2, 2]); assertMetricData(metric, DataPointType.SINGULAR, defaultInstrumentDescriptor, AggregationTemporality.CUMULATIVE); assert.strictEqual(metric.dataPoints.length, 1); - assertDataPoint(metric.dataPoints[0], {}, 1); + assertDataPoint(metric.dataPoints[0], {}, 1, [1, 1], [2, 2]); } - deltaMetricStorage.record(2, {}, api.context.active()); + deltaMetricStorage.record(2, {}, api.context.active(), [3, 3]); { const metric = temporalMetricStorage.buildMetrics( cumulativeCollector1, collectors, defaultInstrumentDescriptor, deltaMetricStorage.collect(), - sdkStartTime, - hrTime()); + [4, 4]); assertMetricData(metric, DataPointType.SINGULAR, defaultInstrumentDescriptor, AggregationTemporality.CUMULATIVE); assert.strictEqual(metric.dataPoints.length, 1); - assertDataPoint(metric.dataPoints[0], {}, 3); + assertDataPoint(metric.dataPoints[0], {}, 3, [1, 1], [4, 4]); } // selectAggregationTemporality should be called only once. @@ -210,45 +201,43 @@ describe('TemporalMetricProcessor', () => { describe('cumulative collector with delta collector', () => { const collectors = [ deltaCollector1, cumulativeCollector1 ]; - it('should build metrics', () => { - const aggregator = new SumAggregator(); + it('should build delta recording metrics', () => { + const aggregator = new SumAggregator(true); const deltaMetricStorage = new DeltaMetricProcessor(aggregator); const temporalMetricStorage = new TemporalMetricProcessor(aggregator); - deltaMetricStorage.record(1, {}, api.context.active()); + deltaMetricStorage.record(1, {}, api.context.active(), [1, 1]); { const metric = temporalMetricStorage.buildMetrics( cumulativeCollector1, collectors, defaultInstrumentDescriptor, deltaMetricStorage.collect(), - sdkStartTime, - hrTime()); + [2, 2]); assertMetricData(metric, DataPointType.SINGULAR, defaultInstrumentDescriptor, AggregationTemporality.CUMULATIVE); assert.strictEqual(metric.dataPoints.length, 1); - assertDataPoint(metric.dataPoints[0], {}, 1); + assertDataPoint(metric.dataPoints[0], {}, 1, [1, 1], [2, 2]); } - deltaMetricStorage.record(2, {}, api.context.active()); + deltaMetricStorage.record(2, {}, api.context.active(), [3, 3]); { const metric = temporalMetricStorage.buildMetrics( deltaCollector1, collectors, defaultInstrumentDescriptor, deltaMetricStorage.collect(), - sdkStartTime, - hrTime()); + [4, 4]); assertMetricData(metric, DataPointType.SINGULAR, defaultInstrumentDescriptor, AggregationTemporality.DELTA); assert.strictEqual(metric.dataPoints.length, 1); - assertDataPoint(metric.dataPoints[0], {}, 3); + assertDataPoint(metric.dataPoints[0], {}, 3, [1, 1], [4, 4]); } { const metric = temporalMetricStorage.buildMetrics( @@ -256,15 +245,14 @@ describe('TemporalMetricProcessor', () => { collectors, defaultInstrumentDescriptor, deltaMetricStorage.collect(), - sdkStartTime, - hrTime()); + [5, 5]); assertMetricData(metric, DataPointType.SINGULAR, defaultInstrumentDescriptor, AggregationTemporality.CUMULATIVE); assert.strictEqual(metric.dataPoints.length, 1); - assertDataPoint(metric.dataPoints[0], {}, 3); + assertDataPoint(metric.dataPoints[0], {}, 3, [1, 1], [5, 5]); } }); }); diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/test/util.ts b/experimental/packages/opentelemetry-sdk-metrics-base/test/util.ts index c889d6723ab..87c6184e375 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/test/util.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/test/util.ts @@ -112,16 +112,16 @@ export function assertDataPoint( assert.deepStrictEqual(it.attributes, attributes); assert.deepStrictEqual(it.value, point); if (startTime) { - assert.deepStrictEqual(it.startTime, startTime); + assert.deepStrictEqual(it.startTime, startTime, 'startTime should be equal'); } else { assert(Array.isArray(it.startTime)); - assert.strictEqual(it.startTime.length, 2); + assert.strictEqual(it.startTime.length, 2, 'startTime should be equal'); } if (endTime) { - assert.deepStrictEqual(it.endTime, endTime); + assert.deepStrictEqual(it.endTime, endTime, 'endTime should be equal'); } else { assert(Array.isArray(it.endTime)); - assert.strictEqual(it.endTime.length, 2); + assert.strictEqual(it.endTime.length, 2, 'endTime should be equal'); } }