From 301e77b97dbb7d27c15431a9bbd057d641000569 Mon Sep 17 00:00:00 2001 From: legendecas Date: Tue, 10 May 2022 19:25:50 +0800 Subject: [PATCH 1/2] fix(sdk-metrics-base): misbehaving aggregation temporality selector tolerance --- .../src/state/TemporalMetricProcessor.ts | 11 +++- .../state/TemporalMetricProcessor.test.ts | 65 ++++++++++++++++--- .../test/util.ts | 5 ++ 3 files changed, 70 insertions(+), 11 deletions(-) diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/src/state/TemporalMetricProcessor.ts b/experimental/packages/opentelemetry-sdk-metrics-base/src/state/TemporalMetricProcessor.ts index c9131e22529..dc4e5b26ab8 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/src/state/TemporalMetricProcessor.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/src/state/TemporalMetricProcessor.ts @@ -35,6 +35,10 @@ interface LastReportedHistory { * The timestamp the data was reported. */ collectionTime: HrTime; + /** + * The AggregationTemporality used to aggregate reports. + */ + aggregationTemporality: AggregationTemporality; } /** @@ -69,7 +73,6 @@ export class TemporalMetricProcessor { sdkStartTime: HrTime, collectionTime: HrTime, ): Maybe { - const aggregationTemporality = collector.selectAggregationTemporality(instrumentDescriptor.type); // In case it's our first collection, default to start timestamp (see below for explanation). let lastCollectionTime = sdkStartTime; @@ -77,11 +80,13 @@ export class TemporalMetricProcessor { const unreportedAccumulations = this._getMergedUnreportedAccumulations(collector); let result = unreportedAccumulations; + let aggregationTemporality: AggregationTemporality; // Check our last report time. if (this._reportHistory.has(collector)) { // eslint-disable-next-line @typescript-eslint/no-non-null-assertion const last = this._reportHistory.get(collector)!; lastCollectionTime = last.collectionTime; + aggregationTemporality = last.aggregationTemporality; // Use aggregation temporality + instrument to determine if we do a merge or a diff of // previous. We have the following four scenarios: @@ -95,12 +100,16 @@ export class TemporalMetricProcessor { // for the next cumulative measurement. result = TemporalMetricProcessor.merge(last.accumulations, unreportedAccumulations, this._aggregator); } + } else { + // Call into user code to select aggregation temporality for the instrument. + aggregationTemporality = collector.selectAggregationTemporality(instrumentDescriptor.type); } // Update last reported (cumulative) accumulation. this._reportHistory.set(collector, { accumulations: result, collectionTime, + aggregationTemporality, }); // Metric data time span is determined as: diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/test/state/TemporalMetricProcessor.test.ts b/experimental/packages/opentelemetry-sdk-metrics-base/test/state/TemporalMetricProcessor.test.ts index 7a9710db153..253a4132b17 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/test/state/TemporalMetricProcessor.test.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/test/state/TemporalMetricProcessor.test.ts @@ -17,6 +17,7 @@ import * as api from '@opentelemetry/api'; import { hrTime } from '@opentelemetry/core'; import * as assert from 'assert'; +import * as sinon from 'sinon'; import { SumAggregator } from '../../src/aggregator'; import { AggregationTemporality } from '../../src/export/AggregationTemporality'; import { DataPointType } from '../../src/export/MetricData'; @@ -40,11 +41,17 @@ const cumulativeCollector1: MetricCollectorHandle = { const sdkStartTime = hrTime(); describe('TemporalMetricProcessor', () => { + afterEach(() => { + sinon.restore(); + }); + describe('buildMetrics', () => { describe('single delta collector', () => { const collectors = [ deltaCollector1 ]; it('should build metrics', () => { + const spy = sinon.spy(deltaCollector1, 'selectAggregationTemporality'); + const aggregator = new SumAggregator(); const deltaMetricStorage = new DeltaMetricProcessor(aggregator); const temporalMetricStorage = new TemporalMetricProcessor(aggregator); @@ -59,7 +66,10 @@ describe('TemporalMetricProcessor', () => { sdkStartTime, hrTime()); - assertMetricData(metric, DataPointType.SINGULAR); + assertMetricData(metric, + DataPointType.SINGULAR, + defaultInstrumentDescriptor, + AggregationTemporality.DELTA); assert.strictEqual(metric.dataPoints.length, 1); assertDataPoint(metric.dataPoints[0], {}, 1); } @@ -74,7 +84,10 @@ describe('TemporalMetricProcessor', () => { sdkStartTime, hrTime()); - assertMetricData(metric, DataPointType.SINGULAR); + assertMetricData(metric, + DataPointType.SINGULAR, + defaultInstrumentDescriptor, + AggregationTemporality.DELTA); assert.strictEqual(metric.dataPoints.length, 1); assertDataPoint(metric.dataPoints[0], {}, 2); } @@ -88,9 +101,15 @@ describe('TemporalMetricProcessor', () => { sdkStartTime, hrTime()); - assertMetricData(metric, DataPointType.SINGULAR); + assertMetricData(metric, + DataPointType.SINGULAR, + defaultInstrumentDescriptor, + AggregationTemporality.DELTA); assert.strictEqual(metric.dataPoints.length, 0); } + + // selectAggregationTemporality should be called only once. + assert.strictEqual(spy.callCount, 1); }); }); @@ -112,7 +131,10 @@ describe('TemporalMetricProcessor', () => { sdkStartTime, hrTime()); - assertMetricData(metric, DataPointType.SINGULAR); + assertMetricData(metric, + DataPointType.SINGULAR, + defaultInstrumentDescriptor, + AggregationTemporality.DELTA); assert.strictEqual(metric.dataPoints.length, 1); assertDataPoint(metric.dataPoints[0], {}, 1); } @@ -126,7 +148,10 @@ describe('TemporalMetricProcessor', () => { sdkStartTime, hrTime()); - assertMetricData(metric, DataPointType.SINGULAR); + assertMetricData(metric, + DataPointType.SINGULAR, + defaultInstrumentDescriptor, + AggregationTemporality.DELTA); assert.strictEqual(metric.dataPoints.length, 1); assertDataPoint(metric.dataPoints[0], {}, 1); } @@ -136,6 +161,8 @@ describe('TemporalMetricProcessor', () => { describe('single cumulative collector', () => { const collectors = [ cumulativeCollector1 ]; it('should build metrics', () => { + const spy = sinon.spy(cumulativeCollector1, 'selectAggregationTemporality'); + const aggregator = new SumAggregator(); const deltaMetricStorage = new DeltaMetricProcessor(aggregator); const temporalMetricStorage = new TemporalMetricProcessor(aggregator); @@ -150,7 +177,10 @@ describe('TemporalMetricProcessor', () => { sdkStartTime, hrTime()); - assertMetricData(metric, DataPointType.SINGULAR); + assertMetricData(metric, + DataPointType.SINGULAR, + defaultInstrumentDescriptor, + AggregationTemporality.CUMULATIVE); assert.strictEqual(metric.dataPoints.length, 1); assertDataPoint(metric.dataPoints[0], {}, 1); } @@ -165,10 +195,16 @@ describe('TemporalMetricProcessor', () => { sdkStartTime, hrTime()); - assertMetricData(metric, DataPointType.SINGULAR); + assertMetricData(metric, + DataPointType.SINGULAR, + defaultInstrumentDescriptor, + AggregationTemporality.CUMULATIVE); assert.strictEqual(metric.dataPoints.length, 1); assertDataPoint(metric.dataPoints[0], {}, 3); } + + // selectAggregationTemporality should be called only once. + assert.strictEqual(spy.callCount, 1); }); }); @@ -189,7 +225,10 @@ describe('TemporalMetricProcessor', () => { sdkStartTime, hrTime()); - assertMetricData(metric, DataPointType.SINGULAR); + assertMetricData(metric, + DataPointType.SINGULAR, + defaultInstrumentDescriptor, + AggregationTemporality.CUMULATIVE); assert.strictEqual(metric.dataPoints.length, 1); assertDataPoint(metric.dataPoints[0], {}, 1); } @@ -204,7 +243,10 @@ describe('TemporalMetricProcessor', () => { sdkStartTime, hrTime()); - assertMetricData(metric, DataPointType.SINGULAR); + assertMetricData(metric, + DataPointType.SINGULAR, + defaultInstrumentDescriptor, + AggregationTemporality.DELTA); assert.strictEqual(metric.dataPoints.length, 1); assertDataPoint(metric.dataPoints[0], {}, 3); } @@ -217,7 +259,10 @@ describe('TemporalMetricProcessor', () => { sdkStartTime, hrTime()); - assertMetricData(metric, DataPointType.SINGULAR); + assertMetricData(metric, + DataPointType.SINGULAR, + defaultInstrumentDescriptor, + AggregationTemporality.CUMULATIVE); assert.strictEqual(metric.dataPoints.length, 1); assertDataPoint(metric.dataPoints[0], {}, 3); } diff --git a/experimental/packages/opentelemetry-sdk-metrics-base/test/util.ts b/experimental/packages/opentelemetry-sdk-metrics-base/test/util.ts index fde6afd95d2..9bcc9c38e3f 100644 --- a/experimental/packages/opentelemetry-sdk-metrics-base/test/util.ts +++ b/experimental/packages/opentelemetry-sdk-metrics-base/test/util.ts @@ -29,6 +29,7 @@ import { Measurement } from '../src/Measurement'; import { isNotNullish } from '../src/utils'; import { HrTime } from '@opentelemetry/api'; import { Histogram } from '../src/aggregator/types'; +import { AggregationTemporality } from '../src/export/AggregationTemporality'; export const defaultResource = new Resource({ resourceKey: 'my-resource', @@ -71,6 +72,7 @@ export function assertMetricData( actual: unknown, dataPointType?: DataPointType, instrumentDescriptor: Partial | null = defaultInstrumentDescriptor, + aggregationTemporality?: AggregationTemporality, ): asserts actual is MetricData { const it = actual as MetricData; if (instrumentDescriptor != null) { @@ -81,6 +83,9 @@ export function assertMetricData( } else { assert(isNotNullish(DataPointType[it.dataPointType])); } + if (aggregationTemporality != null) { + assert.strictEqual(aggregationTemporality, it.aggregationTemporality); + } assert(Array.isArray(it.dataPoints)); } From 8bf7f3e757bfc407e9cd2faf1db7dd7394bd54dc Mon Sep 17 00:00:00 2001 From: legendecas Date: Tue, 10 May 2022 19:36:41 +0800 Subject: [PATCH 2/2] fixup! CHANGELOG --- CHANGELOG.md | 1 - experimental/CHANGELOG.md | 3 +++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 8b8c5fac1eb..dcff93ca762 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,7 +6,6 @@ All notable changes to this project will be documented in this file. ### :boom: Breaking Change -* feat(metrics): metric readers and exporters now select aggregation temporality based on instrument type [#2902](https://github.com/open-telemetry/opentelemetry-js/pull/2902) @seemk * chore: remove unused InstrumentationConfig#path [#2944](https://github.com/open-telemetry/opentelemetry-js/pull/2944) @flarna ### :rocket: (Enhancement) diff --git a/experimental/CHANGELOG.md b/experimental/CHANGELOG.md index 387777f69c2..c973057caca 100644 --- a/experimental/CHANGELOG.md +++ b/experimental/CHANGELOG.md @@ -6,9 +6,12 @@ All notable changes to experimental packages in this project will be documented ### :boom: Breaking Change +* feat(metrics): metric readers and exporters now select aggregation temporality based on instrument type #2902 @seemk + ### :rocket: (Enhancement) * feat(exporters): update proto version and use otlp-transformer #2929 @pichlermarc +* fix(sdk-metrics-base): misbehaving aggregation temporality selector tolerance #2958 @legendecas ### :bug: (Bug Fix)