Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(sdk-metrics-base): misbehaving aggregation temporality selector tolerance #2958

Merged
merged 5 commits into from
May 12, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ All notable changes to this project will be documented in this file.

### :boom: Breaking Change

* feat(metrics): metric readers and exporters now select aggregation temporality based on instrument type [#2902](https://github.com/open-telemetry/opentelemetry-js/pull/2902) @seemk
* chore: remove unused InstrumentationConfig#path [#2944](https://github.com/open-telemetry/opentelemetry-js/pull/2944) @flarna

### :rocket: (Enhancement)
Expand Down
3 changes: 3 additions & 0 deletions experimental/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,12 @@ All notable changes to experimental packages in this project will be documented

### :boom: Breaking Change

* feat(metrics): metric readers and exporters now select aggregation temporality based on instrument type #2902 @seemk

### :rocket: (Enhancement)

* feat(exporters): update proto version and use otlp-transformer #2929 @pichlermarc
* fix(sdk-metrics-base): misbehaving aggregation temporality selector tolerance #2958 @legendecas

### :bug: (Bug Fix)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ interface LastReportedHistory<T> {
* The timestamp the data was reported.
*/
collectionTime: HrTime;
/**
* The AggregationTemporality used to aggregate reports.
*/
aggregationTemporality: AggregationTemporality;
}

/**
Expand Down Expand Up @@ -69,19 +73,20 @@ export class TemporalMetricProcessor<T> {
sdkStartTime: HrTime,
collectionTime: HrTime,
): Maybe<MetricData> {
const aggregationTemporality = collector.selectAggregationTemporality(instrumentDescriptor.type);
// In case it's our first collection, default to start timestamp (see below for explanation).
let lastCollectionTime = sdkStartTime;

this._stashAccumulations(collectors, currentAccumulations);
const unreportedAccumulations = this._getMergedUnreportedAccumulations(collector);

let result = unreportedAccumulations;
let aggregationTemporality: AggregationTemporality;
// Check our last report time.
if (this._reportHistory.has(collector)) {
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
const last = this._reportHistory.get(collector)!;
lastCollectionTime = last.collectionTime;
aggregationTemporality = last.aggregationTemporality;

// Use aggregation temporality + instrument to determine if we do a merge or a diff of
// previous. We have the following four scenarios:
Expand All @@ -95,12 +100,16 @@ export class TemporalMetricProcessor<T> {
// for the next cumulative measurement.
result = TemporalMetricProcessor.merge(last.accumulations, unreportedAccumulations, this._aggregator);
}
} else {
// Call into user code to select aggregation temporality for the instrument.
aggregationTemporality = collector.selectAggregationTemporality(instrumentDescriptor.type);
}

// Update last reported (cumulative) accumulation.
this._reportHistory.set(collector, {
accumulations: result,
collectionTime,
aggregationTemporality,
});

// Metric data time span is determined as:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import * as api from '@opentelemetry/api';
import { hrTime } from '@opentelemetry/core';
import * as assert from 'assert';
import * as sinon from 'sinon';
import { SumAggregator } from '../../src/aggregator';
import { AggregationTemporality } from '../../src/export/AggregationTemporality';
import { DataPointType } from '../../src/export/MetricData';
Expand All @@ -40,11 +41,17 @@ const cumulativeCollector1: MetricCollectorHandle = {
const sdkStartTime = hrTime();

describe('TemporalMetricProcessor', () => {
afterEach(() => {
sinon.restore();
});

describe('buildMetrics', () => {
describe('single delta collector', () => {
const collectors = [ deltaCollector1 ];

it('should build metrics', () => {
const spy = sinon.spy(deltaCollector1, 'selectAggregationTemporality');

const aggregator = new SumAggregator();
const deltaMetricStorage = new DeltaMetricProcessor(aggregator);
const temporalMetricStorage = new TemporalMetricProcessor(aggregator);
Expand All @@ -59,7 +66,10 @@ describe('TemporalMetricProcessor', () => {
sdkStartTime,
hrTime());

assertMetricData(metric, DataPointType.SINGULAR);
assertMetricData(metric,
DataPointType.SINGULAR,
defaultInstrumentDescriptor,
AggregationTemporality.DELTA);
assert.strictEqual(metric.dataPoints.length, 1);
assertDataPoint(metric.dataPoints[0], {}, 1);
}
Expand All @@ -74,7 +84,10 @@ describe('TemporalMetricProcessor', () => {
sdkStartTime,
hrTime());

assertMetricData(metric, DataPointType.SINGULAR);
assertMetricData(metric,
DataPointType.SINGULAR,
defaultInstrumentDescriptor,
AggregationTemporality.DELTA);
assert.strictEqual(metric.dataPoints.length, 1);
assertDataPoint(metric.dataPoints[0], {}, 2);
}
Expand All @@ -88,9 +101,15 @@ describe('TemporalMetricProcessor', () => {
sdkStartTime,
hrTime());

assertMetricData(metric, DataPointType.SINGULAR);
assertMetricData(metric,
DataPointType.SINGULAR,
defaultInstrumentDescriptor,
AggregationTemporality.DELTA);
assert.strictEqual(metric.dataPoints.length, 0);
}

// selectAggregationTemporality should be called only once.
assert.strictEqual(spy.callCount, 1);
});
});

Expand All @@ -112,7 +131,10 @@ describe('TemporalMetricProcessor', () => {
sdkStartTime,
hrTime());

assertMetricData(metric, DataPointType.SINGULAR);
assertMetricData(metric,
DataPointType.SINGULAR,
defaultInstrumentDescriptor,
AggregationTemporality.DELTA);
assert.strictEqual(metric.dataPoints.length, 1);
assertDataPoint(metric.dataPoints[0], {}, 1);
}
Expand All @@ -126,7 +148,10 @@ describe('TemporalMetricProcessor', () => {
sdkStartTime,
hrTime());

assertMetricData(metric, DataPointType.SINGULAR);
assertMetricData(metric,
DataPointType.SINGULAR,
defaultInstrumentDescriptor,
AggregationTemporality.DELTA);
assert.strictEqual(metric.dataPoints.length, 1);
assertDataPoint(metric.dataPoints[0], {}, 1);
}
Expand All @@ -136,6 +161,8 @@ describe('TemporalMetricProcessor', () => {
describe('single cumulative collector', () => {
const collectors = [ cumulativeCollector1 ];
it('should build metrics', () => {
const spy = sinon.spy(cumulativeCollector1, 'selectAggregationTemporality');

const aggregator = new SumAggregator();
const deltaMetricStorage = new DeltaMetricProcessor(aggregator);
const temporalMetricStorage = new TemporalMetricProcessor(aggregator);
Expand All @@ -150,7 +177,10 @@ describe('TemporalMetricProcessor', () => {
sdkStartTime,
hrTime());

assertMetricData(metric, DataPointType.SINGULAR);
assertMetricData(metric,
DataPointType.SINGULAR,
defaultInstrumentDescriptor,
AggregationTemporality.CUMULATIVE);
assert.strictEqual(metric.dataPoints.length, 1);
assertDataPoint(metric.dataPoints[0], {}, 1);
}
Expand All @@ -165,10 +195,16 @@ describe('TemporalMetricProcessor', () => {
sdkStartTime,
hrTime());

assertMetricData(metric, DataPointType.SINGULAR);
assertMetricData(metric,
DataPointType.SINGULAR,
defaultInstrumentDescriptor,
AggregationTemporality.CUMULATIVE);
assert.strictEqual(metric.dataPoints.length, 1);
assertDataPoint(metric.dataPoints[0], {}, 3);
}

// selectAggregationTemporality should be called only once.
assert.strictEqual(spy.callCount, 1);
});
});

Expand All @@ -189,7 +225,10 @@ describe('TemporalMetricProcessor', () => {
sdkStartTime,
hrTime());

assertMetricData(metric, DataPointType.SINGULAR);
assertMetricData(metric,
DataPointType.SINGULAR,
defaultInstrumentDescriptor,
AggregationTemporality.CUMULATIVE);
assert.strictEqual(metric.dataPoints.length, 1);
assertDataPoint(metric.dataPoints[0], {}, 1);
}
Expand All @@ -204,7 +243,10 @@ describe('TemporalMetricProcessor', () => {
sdkStartTime,
hrTime());

assertMetricData(metric, DataPointType.SINGULAR);
assertMetricData(metric,
DataPointType.SINGULAR,
defaultInstrumentDescriptor,
AggregationTemporality.DELTA);
assert.strictEqual(metric.dataPoints.length, 1);
assertDataPoint(metric.dataPoints[0], {}, 3);
}
Expand All @@ -217,7 +259,10 @@ describe('TemporalMetricProcessor', () => {
sdkStartTime,
hrTime());

assertMetricData(metric, DataPointType.SINGULAR);
assertMetricData(metric,
DataPointType.SINGULAR,
defaultInstrumentDescriptor,
AggregationTemporality.CUMULATIVE);
assert.strictEqual(metric.dataPoints.length, 1);
assertDataPoint(metric.dataPoints[0], {}, 3);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import { Measurement } from '../src/Measurement';
import { isNotNullish } from '../src/utils';
import { HrTime } from '@opentelemetry/api';
import { Histogram } from '../src/aggregator/types';
import { AggregationTemporality } from '../src/export/AggregationTemporality';

export const defaultResource = new Resource({
resourceKey: 'my-resource',
Expand Down Expand Up @@ -71,6 +72,7 @@ export function assertMetricData(
actual: unknown,
dataPointType?: DataPointType,
instrumentDescriptor: Partial<InstrumentDescriptor> | null = defaultInstrumentDescriptor,
aggregationTemporality?: AggregationTemporality,
): asserts actual is MetricData {
const it = actual as MetricData;
if (instrumentDescriptor != null) {
Expand All @@ -81,6 +83,9 @@ export function assertMetricData(
} else {
assert(isNotNullish(DataPointType[it.dataPointType]));
}
if (aggregationTemporality != null) {
assert.strictEqual(aggregationTemporality, it.aggregationTemporality);
}
assert(Array.isArray(it.dataPoints));
}

Expand Down