Skip to content

Commit

Permalink
feat(sdk-metrics-base): sum aggregator monotonicity
Browse files Browse the repository at this point in the history
  • Loading branch information
legendecas committed May 25, 2022
1 parent ba3e320 commit 5769113
Show file tree
Hide file tree
Showing 9 changed files with 215 additions and 24 deletions.
1 change: 1 addition & 0 deletions experimental/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ All notable changes to experimental packages in this project will be documented
* fix(sdk-metrics-base): misbehaving aggregation temporality selector tolerance #2958 @legendecas
* feat(trace-otlp-grpc): configure security with env vars #2827 @svetlanabrennan
* feat(sdk-metrics-base): async instruments callback timeout #2742 @legendecas
* feat(sdk-metrics-base): add sum aggregator monotonicity support #2990 @legendecas

### :bug: (Bug Fix)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ export class SumAccumulation implements Accumulation {
export class SumAggregator implements Aggregator<SumAccumulation> {
public kind: AggregatorKind.SUM = AggregatorKind.SUM;

constructor (public monotonic: boolean) {}

createAccumulation() {
return new SumAccumulation();
}
Expand All @@ -52,6 +54,16 @@ export class SumAggregator implements Aggregator<SumAccumulation> {
* Returns a new DELTA aggregation by comparing two cumulative measurements.
*/
diff(previous: SumAccumulation, current: SumAccumulation): SumAccumulation {
const prevPv = previous.toPointValue();
const currPv = current.toPointValue();
/**
* If the SumAggregator is a monotonic one and the previous point value is
* greater than the current one, a reset or gap is deemed to happened.
* Return the current point value to prevent the value from been reset.
*/
if (this.monotonic && (prevPv > currPv)) {
return new SumAccumulation(currPv);
}
return new SumAccumulation(current.toPointValue() - previous.toPointValue());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,16 +47,18 @@ export class DeltaMetricProcessor<T extends Maybe<Accumulation>> {

batchCumulate(measurements: AttributeHashMap<number>) {
Array.from(measurements.entries()).forEach(([attributes, value, hashCode]) => {
let accumulation = this._aggregator.createAccumulation();
const accumulation = this._aggregator.createAccumulation();
accumulation?.record(value);
let delta = accumulation;
if (this._cumulativeMemoStorage.has(attributes, hashCode)) {
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
const previous = this._cumulativeMemoStorage.get(attributes, hashCode)!;
accumulation = this._aggregator.diff(previous, accumulation);
delta = this._aggregator.diff(previous, accumulation);
}

// Save the current record and the delta record.
this._cumulativeMemoStorage.set(attributes, accumulation, hashCode);
this._activeCollectionStorage.set(attributes, accumulation, hashCode);
this._activeCollectionStorage.set(attributes, delta, hashCode);
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,19 @@ export class DropAggregation extends Aggregation {
* The default sum aggregation.
*/
export class SumAggregation extends Aggregation {
private static DEFAULT_INSTANCE = new SumAggregator();
createAggregator(_instrument: InstrumentDescriptor) {
return SumAggregation.DEFAULT_INSTANCE;
private static MONOTONIC_INSTANCE = new SumAggregator(true);
private static NON_MONOTONIC_INSTANCE = new SumAggregator(false);
createAggregator(instrument: InstrumentDescriptor) {
switch (instrument.type) {
case InstrumentType.COUNTER:
case InstrumentType.OBSERVABLE_COUNTER:
case InstrumentType.HISTOGRAM: {
return SumAggregation.MONOTONIC_INSTANCE;
}
default: {
return SumAggregation.NON_MONOTONIC_INSTANCE;
}
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,15 @@ import { commonValues, defaultInstrumentDescriptor } from '../util';
describe('SumAggregator', () => {
describe('createAccumulation', () => {
it('no exceptions on createAccumulation', () => {
const aggregator = new SumAggregator();
const aggregator = new SumAggregator(true);
const accumulation = aggregator.createAccumulation();
assert(accumulation instanceof SumAccumulation);
});
});

describe('merge', () => {
it('no exceptions', () => {
const aggregator = new SumAggregator();
const aggregator = new SumAggregator(true);
const prev = aggregator.createAccumulation();
prev.record(1);
prev.record(2);
Expand All @@ -48,8 +48,8 @@ describe('SumAggregator', () => {
});

describe('diff', () => {
it('no exceptions', () => {
const aggregator = new SumAggregator();
it('non-monotonic', () => {
const aggregator = new SumAggregator(false);
const prev = aggregator.createAccumulation();
prev.record(1);
prev.record(2);
Expand All @@ -66,11 +66,25 @@ describe('SumAggregator', () => {
expected.record(3 + 4);
assert.deepStrictEqual(aggregator.diff(prev, curr), expected);
});

it('monotonic', () => {
const aggregator = new SumAggregator(true);
const prev = aggregator.createAccumulation();
prev.record(10);

// A new record with the value been reset.
const curr = aggregator.createAccumulation();
curr.record(3);

const expected = aggregator.createAccumulation();
expected.record(3);
assert.deepStrictEqual(aggregator.diff(prev, curr), expected);
});
});

describe('toMetricData', () => {
it('transform without exception', () => {
const aggregator = new SumAggregator();
const aggregator = new SumAggregator(true);
const accumulation = aggregator.createAccumulation();
accumulation.record(1);
accumulation.record(2);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ describe('AsyncMetricStorage', () => {
const observableRegistry = new ObservableRegistry();
const metricStorage = new AsyncMetricStorage(
defaultInstrumentDescriptor,
new SumAggregator(),
new SumAggregator(true),
new NoopAttributesProcessor(),
);
const observable = new ObservableInstrument(
Expand Down Expand Up @@ -112,6 +112,146 @@ describe('AsyncMetricStorage', () => {
assertDataPoint(metric.dataPoints[2], { key: '3' }, 3);
}
});

it('should detect resets and gaps for monotonic sum metrics', async () => {
const delegate = new ObservableCallbackDelegate();
const observableRegistry = new ObservableRegistry();
const metricStorage = new AsyncMetricStorage(
defaultInstrumentDescriptor,
new SumAggregator(true),
new NoopAttributesProcessor(),
);
const observable = new ObservableInstrument(
defaultInstrumentDescriptor,
[metricStorage],
observableRegistry
);

observableRegistry.addCallback(delegate.getCallback(), observable);

// Observe a metric
delegate.setDelegate(observableResult => {
observableResult.observe(100, { key: '1' });
});
{
await observableRegistry.observe();
const metric = metricStorage.collect(
deltaCollector,
collectors,
sdkStartTime,
hrTime());

assertMetricData(metric, DataPointType.SINGULAR);
assert.strictEqual(metric.dataPoints.length, 1);
assertDataPoint(metric.dataPoints[0], { key: '1' }, 100);
}

// Observe a drop on the metric
delegate.setDelegate(observableResult => {
observableResult.observe(1, { key: '1' });
});
// The result data should not be diff-ed to be a negative value
{
await observableRegistry.observe();
const metric = metricStorage.collect(
deltaCollector,
collectors,
sdkStartTime,
hrTime());

assertMetricData(metric, DataPointType.SINGULAR);
assert.strictEqual(metric.dataPoints.length, 1);
assertDataPoint(metric.dataPoints[0], { key: '1' }, 1);
}

// Observe a new data point
delegate.setDelegate(observableResult => {
observableResult.observe(50, { key: '1' });
});
// The result data should now be a delta to the previous collection
{
await observableRegistry.observe();
const metric = metricStorage.collect(
deltaCollector,
[deltaCollector],
sdkStartTime,
hrTime());

assertMetricData(metric, DataPointType.SINGULAR);
assert.strictEqual(metric.dataPoints.length, 1);
assertDataPoint(metric.dataPoints[0], { key: '1' }, 49);
}
});

it('should not detect resets and gaps for non-monotonic sum metrics', async () => {
const delegate = new ObservableCallbackDelegate();
const observableRegistry = new ObservableRegistry();
const metricStorage = new AsyncMetricStorage(
defaultInstrumentDescriptor,
new SumAggregator(false),
new NoopAttributesProcessor(),
);
const observable = new ObservableInstrument(
defaultInstrumentDescriptor,
[metricStorage],
observableRegistry
);

observableRegistry.addCallback(delegate.getCallback(), observable);

// Observe a metric
delegate.setDelegate(observableResult => {
observableResult.observe(100, { key: '1' });
});
{
await observableRegistry.observe();
const metric = metricStorage.collect(
deltaCollector,
collectors,
sdkStartTime,
hrTime());

assertMetricData(metric, DataPointType.SINGULAR);
assert.strictEqual(metric.dataPoints.length, 1);
assertDataPoint(metric.dataPoints[0], { key: '1' }, 100);
}

// Observe a drop on the metric
delegate.setDelegate(observableResult => {
observableResult.observe(1, { key: '1' });
});
// The result data should be a delta to the previous collection
{
await observableRegistry.observe();
const metric = metricStorage.collect(
deltaCollector,
collectors,
sdkStartTime,
hrTime());

assertMetricData(metric, DataPointType.SINGULAR);
assert.strictEqual(metric.dataPoints.length, 1);
assertDataPoint(metric.dataPoints[0], { key: '1' }, -99);
}

// Observe a new data point
delegate.setDelegate(observableResult => {
observableResult.observe(50, { key: '1' });
});
// The result data should be a delta to the previous collection
{
await observableRegistry.observe();
const metric = metricStorage.collect(
deltaCollector,
[deltaCollector],
sdkStartTime,
hrTime());

assertMetricData(metric, DataPointType.SINGULAR);
assert.strictEqual(metric.dataPoints.length, 1);
assertDataPoint(metric.dataPoints[0], { key: '1' }, 49);
}
});
});

describe('Cumulative Collector', () => {
Expand All @@ -121,7 +261,7 @@ describe('AsyncMetricStorage', () => {
const observableRegistry = new ObservableRegistry();
const metricStorage = new AsyncMetricStorage(
defaultInstrumentDescriptor,
new SumAggregator(),
new SumAggregator(true),
new NoopAttributesProcessor(),
);
const observable = new ObservableInstrument(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ describe('DeltaMetricProcessor', () => {
});

it('no exceptions on record with no-drop aggregator', () => {
const metricProcessor = new DeltaMetricProcessor(new SumAggregator());
const metricProcessor = new DeltaMetricProcessor(new SumAggregator(true));

for (const value of commonValues) {
for (const attributes of commonAttributes) {
Expand All @@ -58,7 +58,7 @@ describe('DeltaMetricProcessor', () => {
});

it('no exceptions on record with no-drop aggregator', () => {
const metricProcessor = new DeltaMetricProcessor(new SumAggregator());
const metricProcessor = new DeltaMetricProcessor(new SumAggregator(true));

const measurements = new AttributeHashMap<number>();
for (const value of commonValues) {
Expand All @@ -70,7 +70,7 @@ describe('DeltaMetricProcessor', () => {
});

it('should compute the diff of accumulations', () => {
const metricProcessor = new DeltaMetricProcessor(new SumAggregator());
const metricProcessor = new DeltaMetricProcessor(new SumAggregator(true));

{
const measurements = new AttributeHashMap<number>();
Expand All @@ -94,7 +94,7 @@ describe('DeltaMetricProcessor', () => {

describe('collect', () => {
it('should export', () => {
const metricProcessor = new DeltaMetricProcessor(new SumAggregator());
const metricProcessor = new DeltaMetricProcessor(new SumAggregator(true));

metricProcessor.record(1, { attribute: '1' }, api.ROOT_CONTEXT);
metricProcessor.record(2, { attribute: '1' }, api.ROOT_CONTEXT);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,11 @@ const sdkStartTime = hrTime();
describe('SyncMetricStorage', () => {
describe('record', () => {
it('no exceptions on record', () => {
const metricStorage = new SyncMetricStorage(defaultInstrumentDescriptor, new SumAggregator(), new NoopAttributesProcessor());
const metricStorage = new SyncMetricStorage(
defaultInstrumentDescriptor,
new SumAggregator(true),
new NoopAttributesProcessor()
);

for (const value of commonValues) {
for (const attributes of commonAttributes) {
Expand All @@ -53,7 +57,11 @@ describe('SyncMetricStorage', () => {
describe('Delta Collector', () => {
const collectors = [deltaCollector];
it('should collect and reset memos', async () => {
const metricStorage = new SyncMetricStorage(defaultInstrumentDescriptor, new SumAggregator(), new NoopAttributesProcessor());
const metricStorage = new SyncMetricStorage(
defaultInstrumentDescriptor,
new SumAggregator(true),
new NoopAttributesProcessor()
);
metricStorage.record(1, {}, api.context.active());
metricStorage.record(2, {}, api.context.active());
metricStorage.record(3, {}, api.context.active());
Expand Down Expand Up @@ -99,7 +107,11 @@ describe('SyncMetricStorage', () => {
describe('Cumulative Collector', () => {
const collectors = [cumulativeCollector];
it('should collect cumulative metrics', async () => {
const metricStorage = new SyncMetricStorage(defaultInstrumentDescriptor, new SumAggregator(), new NoopAttributesProcessor());
const metricStorage = new SyncMetricStorage(
defaultInstrumentDescriptor,
new SumAggregator(true),
new NoopAttributesProcessor()
);
metricStorage.record(1, {}, api.context.active());
metricStorage.record(2, {}, api.context.active());
metricStorage.record(3, {}, api.context.active());
Expand Down
Loading

0 comments on commit 5769113

Please sign in to comment.