Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(sdk-metrics-base): detect resets on async metrics #2990

Merged
merged 3 commits into from
Jun 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions experimental/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ All notable changes to experimental packages in this project will be documented
* fix(sdk-metrics-base): misbehaving aggregation temporality selector tolerance #2958 @legendecas
* feat(trace-otlp-grpc): configure security with env vars #2827 @svetlanabrennan
* feat(sdk-metrics-base): async instruments callback timeout #2742 @legendecas
* feat(sdk-metrics-base): detect resets on async metrics #2990 @legendecas
* Added monotonicity support in SumAggregator.
* Added reset and gaps detection for async metric instruments.
* Fixed the start time and end time of an exported metric with regarding to resets and gaps.

### :bug: (Bug Fix)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -306,8 +306,8 @@ describe('OTLPMetricExporter - node with json over http', () => {
assert.ok(typeof metric3 !== 'undefined', "histogram doesn't exist");
ensureHistogramIsCorrect(
metric3,
core.hrTimeToNanoseconds(metrics.scopeMetrics[0].metrics[1].dataPoints[0].endTime),
core.hrTimeToNanoseconds(metrics.scopeMetrics[0].metrics[1].dataPoints[0].startTime),
core.hrTimeToNanoseconds(metrics.scopeMetrics[0].metrics[2].dataPoints[0].endTime),
core.hrTimeToNanoseconds(metrics.scopeMetrics[0].metrics[2].dataPoints[0].startTime),
[0, 100],
[0, 2, 0]
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import * as api from '@opentelemetry/api';
import * as metrics from '@opentelemetry/api-metrics';
import { ObservableCallback } from '@opentelemetry/api-metrics';
import { hrTime } from '@opentelemetry/core';
import { InstrumentDescriptor } from './InstrumentDescriptor';
import { ObservableRegistry } from './state/ObservableRegistry';
import { AsyncWritableMetricStorage, WritableMetricStorage } from './state/WritableMetricStorage';
Expand All @@ -31,7 +32,7 @@ export class SyncInstrument {
);
value = Math.trunc(value);
}
this._writableMetricStorage.record(value, attributes, context);
this._writableMetricStorage.record(value, attributes, context, hrTime());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ export class DropAggregator implements Aggregator<undefined> {
_descriptor: InstrumentDescriptor,
_aggregationTemporality: AggregationTemporality,
_accumulationByAttributes: AccumulationRecord<undefined>[],
_startTime: HrTime,
_endTime: HrTime): Maybe<MetricData> {
return undefined;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ function createNewEmptyCheckpoint(boundaries: number[]): Histogram {

export class HistogramAccumulation implements Accumulation {
constructor(
public startTime: HrTime,
private readonly _boundaries: number[],
private _recordMinMax = true,
private _current: Histogram = createNewEmptyCheckpoint(_boundaries)
Expand All @@ -70,6 +71,10 @@ export class HistogramAccumulation implements Accumulation {
this._current.buckets.counts[this._boundaries.length] += 1;
}

setStartTime(startTime: HrTime): void {
this.startTime = startTime;
}

toPointValue(): Histogram {
return this._current;
}
Expand All @@ -88,8 +93,8 @@ export class HistogramAggregator implements Aggregator<HistogramAccumulation> {
*/
constructor(private readonly _boundaries: number[], private readonly _recordMinMax: boolean) {}

createAccumulation() {
return new HistogramAccumulation(this._boundaries, this._recordMinMax);
createAccumulation(startTime: HrTime) {
return new HistogramAccumulation(startTime, this._boundaries, this._recordMinMax);
}

/**
Expand Down Expand Up @@ -125,7 +130,7 @@ export class HistogramAggregator implements Aggregator<HistogramAccumulation> {
}
}

return new HistogramAccumulation(previousValue.buckets.boundaries, this._recordMinMax, {
return new HistogramAccumulation(previous.startTime, previousValue.buckets.boundaries, this._recordMinMax, {
buckets: {
boundaries: previousValue.buckets.boundaries,
counts: mergedCounts,
Expand Down Expand Up @@ -153,7 +158,7 @@ export class HistogramAggregator implements Aggregator<HistogramAccumulation> {
diffedCounts[idx] = currentCounts[idx] - previousCounts[idx];
}

return new HistogramAccumulation(previousValue.buckets.boundaries, this._recordMinMax, {
return new HistogramAccumulation(current.startTime, previousValue.buckets.boundaries, this._recordMinMax, {
buckets: {
boundaries: previousValue.buckets.boundaries,
counts: diffedCounts,
Expand All @@ -170,7 +175,6 @@ export class HistogramAggregator implements Aggregator<HistogramAccumulation> {
descriptor: InstrumentDescriptor,
aggregationTemporality: AggregationTemporality,
accumulationByAttributes: AccumulationRecord<HistogramAccumulation>[],
startTime: HrTime,
endTime: HrTime): Maybe<HistogramMetricData> {
return {
descriptor,
Expand All @@ -179,7 +183,7 @@ export class HistogramAggregator implements Aggregator<HistogramAccumulation> {
dataPoints: accumulationByAttributes.map(([attributes, accumulation]) => {
return {
attributes,
startTime,
startTime: accumulation.startTime,
endTime,
value: accumulation.toPointValue(),
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,17 @@ import { Maybe } from '../utils';
import { AggregationTemporality } from '../export/AggregationTemporality';

export class LastValueAccumulation implements Accumulation {
constructor(private _current: number = 0, public sampleTime: HrTime = [0, 0]) {}
constructor(public startTime: HrTime, private _current: number = 0, public sampleTime: HrTime = [0, 0]) {}

record(value: number): void {
this._current = value;
this.sampleTime = hrTime();
}

setStartTime(startTime: HrTime): void {
this.startTime = startTime;
}

toPointValue(): LastValue {
return this._current;
}
Expand All @@ -39,8 +43,8 @@ export class LastValueAccumulation implements Accumulation {
export class LastValueAggregator implements Aggregator<LastValueAccumulation> {
public kind: AggregatorKind.LAST_VALUE = AggregatorKind.LAST_VALUE;

createAccumulation() {
return new LastValueAccumulation();
createAccumulation(startTime: HrTime) {
return new LastValueAccumulation(startTime);
}

/**
Expand All @@ -51,7 +55,7 @@ export class LastValueAggregator implements Aggregator<LastValueAccumulation> {
merge(previous: LastValueAccumulation, delta: LastValueAccumulation): LastValueAccumulation {
// nanoseconds may lose precisions.
const latestAccumulation = hrTimeToMicroseconds(delta.sampleTime) >= hrTimeToMicroseconds(previous.sampleTime) ? delta : previous;
return new LastValueAccumulation(latestAccumulation.toPointValue(), latestAccumulation.sampleTime);
return new LastValueAccumulation(previous.startTime, latestAccumulation.toPointValue(), latestAccumulation.sampleTime);
}

/**
Expand All @@ -63,14 +67,13 @@ export class LastValueAggregator implements Aggregator<LastValueAccumulation> {
diff(previous: LastValueAccumulation, current: LastValueAccumulation): LastValueAccumulation {
// nanoseconds may lose precisions.
const latestAccumulation = hrTimeToMicroseconds(current.sampleTime) >= hrTimeToMicroseconds(previous.sampleTime) ? current : previous;
return new LastValueAccumulation(latestAccumulation.toPointValue(), latestAccumulation.sampleTime);
return new LastValueAccumulation(current.startTime, latestAccumulation.toPointValue(), latestAccumulation.sampleTime);
}

toMetricData(
descriptor: InstrumentDescriptor,
aggregationTemporality: AggregationTemporality,
accumulationByAttributes: AccumulationRecord<LastValueAccumulation>[],
startTime: HrTime,
endTime: HrTime): Maybe<SingularMetricData> {
return {
descriptor,
Expand All @@ -79,7 +82,7 @@ export class LastValueAggregator implements Aggregator<LastValueAccumulation> {
dataPoints: accumulationByAttributes.map(([attributes, accumulation]) => {
return {
attributes,
startTime,
startTime: accumulation.startTime,
endTime,
value: accumulation.toPointValue(),
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,19 @@ import { Maybe } from '../utils';
import { AggregationTemporality } from '../export/AggregationTemporality';

export class SumAccumulation implements Accumulation {
constructor(private _current: number = 0) {}
constructor(public startTime: HrTime, public monotonic: boolean, private _current: number = 0, public reset = false) {}

record(value: number): void {
if (this.monotonic && value < 0) {
return;
}
this._current += value;
}

setStartTime(startTime: HrTime): void {
this.startTime = startTime;
}

toPointValue(): Sum {
return this._current;
}
Expand All @@ -37,29 +44,45 @@ export class SumAccumulation implements Accumulation {
export class SumAggregator implements Aggregator<SumAccumulation> {
public kind: AggregatorKind.SUM = AggregatorKind.SUM;

createAccumulation() {
return new SumAccumulation();
constructor (public monotonic: boolean) {}
dyladan marked this conversation as resolved.
Show resolved Hide resolved

createAccumulation(startTime: HrTime) {
return new SumAccumulation(startTime, this.monotonic);
}

/**
* Returns the result of the merge of the given accumulations.
*/
merge(previous: SumAccumulation, delta: SumAccumulation): SumAccumulation {
return new SumAccumulation(previous.toPointValue() + delta.toPointValue());
const prevPv = previous.toPointValue();
const deltaPv = delta.toPointValue();
if (delta.reset) {
return new SumAccumulation(delta.startTime, this.monotonic, deltaPv, delta.reset);
}
return new SumAccumulation(previous.startTime, this.monotonic, prevPv + deltaPv);
}

/**
* Returns a new DELTA aggregation by comparing two cumulative measurements.
*/
diff(previous: SumAccumulation, current: SumAccumulation): SumAccumulation {
return new SumAccumulation(current.toPointValue() - previous.toPointValue());
const prevPv = previous.toPointValue();
const currPv = current.toPointValue();
/**
* If the SumAggregator is a monotonic one and the previous point value is
* greater than the current one, a reset is deemed to be happened.
* Return the current point value to prevent the value from been reset.
*/
if (this.monotonic && (prevPv > currPv)) {
return new SumAccumulation(current.startTime, this.monotonic, currPv, true);
}
return new SumAccumulation(current.startTime, this.monotonic, currPv - prevPv);
}

toMetricData(
descriptor: InstrumentDescriptor,
aggregationTemporality: AggregationTemporality,
accumulationByAttributes: AccumulationRecord<SumAccumulation>[],
startTime: HrTime,
endTime: HrTime): Maybe<SingularMetricData> {
return {
descriptor,
pichlermarc marked this conversation as resolved.
Show resolved Hide resolved
Expand All @@ -68,7 +91,7 @@ export class SumAggregator implements Aggregator<SumAccumulation> {
dataPoints: accumulationByAttributes.map(([attributes, accumulation]) => {
return {
attributes,
startTime,
startTime: accumulation.startTime,
endTime,
value: accumulation.toPointValue(),
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ export interface Histogram {
* An Aggregator accumulation state.
*/
export interface Accumulation {
setStartTime(startTime: HrTime): void;
record(value: number): void;
}

Expand All @@ -84,7 +85,7 @@ export interface Aggregator<T> {
/**
* Create a clean state of accumulation.
*/
createAccumulation(): T;
createAccumulation(startTime: HrTime): T;

/**
* Returns the result of the merge of the given accumulations.
Expand Down Expand Up @@ -112,13 +113,11 @@ export interface Aggregator<T> {
*
* @param descriptor the metric instrument descriptor.
* @param accumulationByAttributes the array of attributes and accumulation pairs.
* @param startTime the start time of the metric data.
* @param endTime the end time of the metric data.
* @return the {@link MetricData} that this {@link Aggregator} will produce.
*/
toMetricData(descriptor: InstrumentDescriptor,
aggregationTemporality: AggregationTemporality,
accumulationByAttributes: AccumulationRecord<T>[],
startTime: HrTime,
endTime: HrTime): Maybe<MetricData>;
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,12 @@ export class AsyncMetricStorage<T extends Maybe<Accumulation>> extends MetricSto
this._temporalMetricStorage = new TemporalMetricProcessor(aggregator);
}

record(measurements: AttributeHashMap<number>) {
record(measurements: AttributeHashMap<number>, observationTime: HrTime) {
const processed = new AttributeHashMap<number>();
Array.from(measurements.entries()).forEach(([attributes, value]) => {
processed.set(this._attributesProcessor.process(attributes), value);
});
this._deltaMetricStorage.batchCumulate(processed);
this._deltaMetricStorage.batchCumulate(processed, observationTime);
}

/**
Expand All @@ -64,7 +64,6 @@ export class AsyncMetricStorage<T extends Maybe<Accumulation>> extends MetricSto
collect(
collector: MetricCollectorHandle,
collectors: MetricCollectorHandle[],
sdkStartTime: HrTime,
collectionTime: HrTime,
): Maybe<MetricData> {
const accumulations = this._deltaMetricStorage.collect();
Expand All @@ -74,7 +73,6 @@ export class AsyncMetricStorage<T extends Maybe<Accumulation>> extends MetricSto
collectors,
this._instrumentDescriptor,
accumulations,
sdkStartTime,
collectionTime
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

import { Context } from '@opentelemetry/api';
import { Context, HrTime } from '@opentelemetry/api';
import { MetricAttributes } from '@opentelemetry/api-metrics';
import { Maybe } from '../utils';
import { Accumulation, Aggregator } from '../aggregator/types';
Expand All @@ -35,31 +35,36 @@ export class DeltaMetricProcessor<T extends Maybe<Accumulation>> {

constructor(private _aggregator: Aggregator<T>) {}

/** Bind an efficient storage handle for a set of attributes. */
private bind(attributes: MetricAttributes) {
return this._activeCollectionStorage.getOrDefault(attributes, () => this._aggregator.createAccumulation());
}

record(value: number, attributes: MetricAttributes, _context: Context) {
const accumulation = this.bind(attributes);
record(value: number, attributes: MetricAttributes, _context: Context, collectionTime: HrTime) {
const accumulation = this._activeCollectionStorage.getOrDefault(
attributes,
() => this._aggregator.createAccumulation(collectionTime)
);
accumulation?.record(value);
dyladan marked this conversation as resolved.
Show resolved Hide resolved
}

batchCumulate(measurements: AttributeHashMap<number>) {
batchCumulate(measurements: AttributeHashMap<number>, collectionTime: HrTime) {
Array.from(measurements.entries()).forEach(([attributes, value, hashCode]) => {
let accumulation = this._aggregator.createAccumulation();
const accumulation = this._aggregator.createAccumulation(collectionTime);
accumulation?.record(value);
let delta = accumulation;
if (this._cumulativeMemoStorage.has(attributes, hashCode)) {
// has() returned true, previous is present.
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
const previous = this._cumulativeMemoStorage.get(attributes, hashCode)!;
accumulation = this._aggregator.diff(previous, accumulation);
delta = this._aggregator.diff(previous, accumulation);
}

// Save the current record and the delta record.
this._cumulativeMemoStorage.set(attributes, accumulation, hashCode);
this._activeCollectionStorage.set(attributes, accumulation, hashCode);
this._activeCollectionStorage.set(attributes, delta, hashCode);
});
}

/**
* Returns a collection of delta metrics. Start time is the when first
* time event collected.
*/
collect() {
const unreportedDelta = this._activeCollectionStorage;
this._activeCollectionStorage = new AttributeHashMap();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,15 @@ export class HashMap<KeyType, ValueType, HashCodeType> {
return this._valueMap.has(hashCode);
}

*keys(): IterableIterator<[KeyType, HashCodeType]> {
const keyIterator = this._keyMap.entries();
let next = keyIterator.next();
while (next.done !== true) {
yield [ next.value[1], next.value[0]];
next = keyIterator.next();
}
}

*entries(): IterableIterator<[KeyType, ValueType, HashCodeType]> {
const valueIterator = this._valueMap.entries();
let next = valueIterator.next();
Expand Down
Loading