Skip to content

Commit

Permalink
fix(sdk-metrics-base): misbehaving aggregation temporality selector t…
Browse files Browse the repository at this point in the history
…olerance
  • Loading branch information
legendecas committed May 10, 2022
1 parent 1ee1b28 commit 301e77b
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ interface LastReportedHistory<T> {
* The timestamp the data was reported.
*/
collectionTime: HrTime;
/**
* The AggregationTemporality used to aggregate reports.
*/
aggregationTemporality: AggregationTemporality;
}

/**
Expand Down Expand Up @@ -69,19 +73,20 @@ export class TemporalMetricProcessor<T> {
sdkStartTime: HrTime,
collectionTime: HrTime,
): Maybe<MetricData> {
const aggregationTemporality = collector.selectAggregationTemporality(instrumentDescriptor.type);
// In case it's our first collection, default to start timestamp (see below for explanation).
let lastCollectionTime = sdkStartTime;

this._stashAccumulations(collectors, currentAccumulations);
const unreportedAccumulations = this._getMergedUnreportedAccumulations(collector);

let result = unreportedAccumulations;
let aggregationTemporality: AggregationTemporality;
// Check our last report time.
if (this._reportHistory.has(collector)) {
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
const last = this._reportHistory.get(collector)!;
lastCollectionTime = last.collectionTime;
aggregationTemporality = last.aggregationTemporality;

// Use aggregation temporality + instrument to determine if we do a merge or a diff of
// previous. We have the following four scenarios:
Expand All @@ -95,12 +100,16 @@ export class TemporalMetricProcessor<T> {
// for the next cumulative measurement.
result = TemporalMetricProcessor.merge(last.accumulations, unreportedAccumulations, this._aggregator);
}
} else {
// Call into user code to select aggregation temporality for the instrument.
aggregationTemporality = collector.selectAggregationTemporality(instrumentDescriptor.type);
}

// Update last reported (cumulative) accumulation.
this._reportHistory.set(collector, {
accumulations: result,
collectionTime,
aggregationTemporality,
});

// Metric data time span is determined as:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import * as api from '@opentelemetry/api';
import { hrTime } from '@opentelemetry/core';
import * as assert from 'assert';
import * as sinon from 'sinon';
import { SumAggregator } from '../../src/aggregator';
import { AggregationTemporality } from '../../src/export/AggregationTemporality';
import { DataPointType } from '../../src/export/MetricData';
Expand All @@ -40,11 +41,17 @@ const cumulativeCollector1: MetricCollectorHandle = {
const sdkStartTime = hrTime();

describe('TemporalMetricProcessor', () => {
afterEach(() => {
sinon.restore();
});

describe('buildMetrics', () => {
describe('single delta collector', () => {
const collectors = [ deltaCollector1 ];

it('should build metrics', () => {
const spy = sinon.spy(deltaCollector1, 'selectAggregationTemporality');

const aggregator = new SumAggregator();
const deltaMetricStorage = new DeltaMetricProcessor(aggregator);
const temporalMetricStorage = new TemporalMetricProcessor(aggregator);
Expand All @@ -59,7 +66,10 @@ describe('TemporalMetricProcessor', () => {
sdkStartTime,
hrTime());

assertMetricData(metric, DataPointType.SINGULAR);
assertMetricData(metric,
DataPointType.SINGULAR,
defaultInstrumentDescriptor,
AggregationTemporality.DELTA);
assert.strictEqual(metric.dataPoints.length, 1);
assertDataPoint(metric.dataPoints[0], {}, 1);
}
Expand All @@ -74,7 +84,10 @@ describe('TemporalMetricProcessor', () => {
sdkStartTime,
hrTime());

assertMetricData(metric, DataPointType.SINGULAR);
assertMetricData(metric,
DataPointType.SINGULAR,
defaultInstrumentDescriptor,
AggregationTemporality.DELTA);
assert.strictEqual(metric.dataPoints.length, 1);
assertDataPoint(metric.dataPoints[0], {}, 2);
}
Expand All @@ -88,9 +101,15 @@ describe('TemporalMetricProcessor', () => {
sdkStartTime,
hrTime());

assertMetricData(metric, DataPointType.SINGULAR);
assertMetricData(metric,
DataPointType.SINGULAR,
defaultInstrumentDescriptor,
AggregationTemporality.DELTA);
assert.strictEqual(metric.dataPoints.length, 0);
}

// selectAggregationTemporality should be called only once.
assert.strictEqual(spy.callCount, 1);
});
});

Expand All @@ -112,7 +131,10 @@ describe('TemporalMetricProcessor', () => {
sdkStartTime,
hrTime());

assertMetricData(metric, DataPointType.SINGULAR);
assertMetricData(metric,
DataPointType.SINGULAR,
defaultInstrumentDescriptor,
AggregationTemporality.DELTA);
assert.strictEqual(metric.dataPoints.length, 1);
assertDataPoint(metric.dataPoints[0], {}, 1);
}
Expand All @@ -126,7 +148,10 @@ describe('TemporalMetricProcessor', () => {
sdkStartTime,
hrTime());

assertMetricData(metric, DataPointType.SINGULAR);
assertMetricData(metric,
DataPointType.SINGULAR,
defaultInstrumentDescriptor,
AggregationTemporality.DELTA);
assert.strictEqual(metric.dataPoints.length, 1);
assertDataPoint(metric.dataPoints[0], {}, 1);
}
Expand All @@ -136,6 +161,8 @@ describe('TemporalMetricProcessor', () => {
describe('single cumulative collector', () => {
const collectors = [ cumulativeCollector1 ];
it('should build metrics', () => {
const spy = sinon.spy(cumulativeCollector1, 'selectAggregationTemporality');

const aggregator = new SumAggregator();
const deltaMetricStorage = new DeltaMetricProcessor(aggregator);
const temporalMetricStorage = new TemporalMetricProcessor(aggregator);
Expand All @@ -150,7 +177,10 @@ describe('TemporalMetricProcessor', () => {
sdkStartTime,
hrTime());

assertMetricData(metric, DataPointType.SINGULAR);
assertMetricData(metric,
DataPointType.SINGULAR,
defaultInstrumentDescriptor,
AggregationTemporality.CUMULATIVE);
assert.strictEqual(metric.dataPoints.length, 1);
assertDataPoint(metric.dataPoints[0], {}, 1);
}
Expand All @@ -165,10 +195,16 @@ describe('TemporalMetricProcessor', () => {
sdkStartTime,
hrTime());

assertMetricData(metric, DataPointType.SINGULAR);
assertMetricData(metric,
DataPointType.SINGULAR,
defaultInstrumentDescriptor,
AggregationTemporality.CUMULATIVE);
assert.strictEqual(metric.dataPoints.length, 1);
assertDataPoint(metric.dataPoints[0], {}, 3);
}

// selectAggregationTemporality should be called only once.
assert.strictEqual(spy.callCount, 1);
});
});

Expand All @@ -189,7 +225,10 @@ describe('TemporalMetricProcessor', () => {
sdkStartTime,
hrTime());

assertMetricData(metric, DataPointType.SINGULAR);
assertMetricData(metric,
DataPointType.SINGULAR,
defaultInstrumentDescriptor,
AggregationTemporality.CUMULATIVE);
assert.strictEqual(metric.dataPoints.length, 1);
assertDataPoint(metric.dataPoints[0], {}, 1);
}
Expand All @@ -204,7 +243,10 @@ describe('TemporalMetricProcessor', () => {
sdkStartTime,
hrTime());

assertMetricData(metric, DataPointType.SINGULAR);
assertMetricData(metric,
DataPointType.SINGULAR,
defaultInstrumentDescriptor,
AggregationTemporality.DELTA);
assert.strictEqual(metric.dataPoints.length, 1);
assertDataPoint(metric.dataPoints[0], {}, 3);
}
Expand All @@ -217,7 +259,10 @@ describe('TemporalMetricProcessor', () => {
sdkStartTime,
hrTime());

assertMetricData(metric, DataPointType.SINGULAR);
assertMetricData(metric,
DataPointType.SINGULAR,
defaultInstrumentDescriptor,
AggregationTemporality.CUMULATIVE);
assert.strictEqual(metric.dataPoints.length, 1);
assertDataPoint(metric.dataPoints[0], {}, 3);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import { Measurement } from '../src/Measurement';
import { isNotNullish } from '../src/utils';
import { HrTime } from '@opentelemetry/api';
import { Histogram } from '../src/aggregator/types';
import { AggregationTemporality } from '../src/export/AggregationTemporality';

export const defaultResource = new Resource({
resourceKey: 'my-resource',
Expand Down Expand Up @@ -71,6 +72,7 @@ export function assertMetricData(
actual: unknown,
dataPointType?: DataPointType,
instrumentDescriptor: Partial<InstrumentDescriptor> | null = defaultInstrumentDescriptor,
aggregationTemporality?: AggregationTemporality,
): asserts actual is MetricData {
const it = actual as MetricData;
if (instrumentDescriptor != null) {
Expand All @@ -81,6 +83,9 @@ export function assertMetricData(
} else {
assert(isNotNullish(DataPointType[it.dataPointType]));
}
if (aggregationTemporality != null) {
assert.strictEqual(aggregationTemporality, it.aggregationTemporality);
}
assert(Array.isArray(it.dataPoints));
}

Expand Down

0 comments on commit 301e77b

Please sign in to comment.