Skip to content

Commit

Permalink
feat(sdk-metrics): align MetricReader with specification and other la…
Browse files Browse the repository at this point in the history
…nguage implementations (#3225)

Co-authored-by: Daniel Dyla <[email protected]>
  • Loading branch information
pichlermarc and dyladan authored Sep 12, 2022
1 parent be24d69 commit b73c29d
Show file tree
Hide file tree
Showing 15 changed files with 262 additions and 122 deletions.
1 change: 1 addition & 0 deletions experimental/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ All notable changes to experimental packages in this project will be documented
* `NoopObservableMetric`
* `NoopObservableUpDownCounterMetric`
* `NoopUpDownCounterMetric`
* feat(sdk-metrics): align MetricReader with specification and other language implementations [#3225](https://github.com/open-telemetry/opentelemetry-js/pull/3225) @pichlermarc
* chore(sdk-metrics): remove accidental export of the SDK `Meter` class [#3243](https://github.com/open-telemetry/opentelemetry-js/pull/3243) @pichlermarc

### :rocket: (Enhancement)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@ import * as assert from 'assert';
import * as grpc from '@grpc/grpc-js';
import { VERSION } from '@opentelemetry/core';
import {
Aggregation,
AggregationTemporality,
ExplicitBucketHistogramAggregation,
MeterProvider,
MetricReader,
Expand All @@ -30,14 +28,6 @@ import {
import { IKeyValue, IMetric, IResource } from '@opentelemetry/otlp-transformer';

class TestMetricReader extends MetricReader {
selectAggregation() {
return Aggregation.Default();
}

selectAggregationTemporality() {
return AggregationTemporality.CUMULATIVE;
}

protected onForceFlush(): Promise<void> {
return Promise.resolve(undefined);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@ import { Resource } from '@opentelemetry/resources';
import * as assert from 'assert';
import { InstrumentationScope, VERSION } from '@opentelemetry/core';
import {
Aggregation,
AggregationTemporality,
ExplicitBucketHistogramAggregation,
MeterProvider,
MetricReader,
Expand Down Expand Up @@ -57,14 +55,6 @@ class TestMetricReader extends MetricReader {
protected onShutdown(): Promise<void> {
return Promise.resolve(undefined);
}

selectAggregation() {
return Aggregation.Default();
}

selectAggregationTemporality() {
return AggregationTemporality.CUMULATIVE;
}
}

export const HISTOGRAM_AGGREGATION_VIEW = new View({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@ import {
import { Resource } from '@opentelemetry/resources';
import * as assert from 'assert';
import {
Aggregation,
AggregationTemporality,
ExplicitBucketHistogramAggregation,
MeterProvider,
MetricReader,
Expand All @@ -35,14 +33,6 @@ import { IExportMetricsServiceRequest, IKeyValue, IMetric } from '@opentelemetry
import { Stream } from 'stream';

export class TestMetricReader extends MetricReader {
selectAggregation() {
return Aggregation.Default();
}

selectAggregationTemporality() {
return AggregationTemporality.CUMULATIVE;
}

protected onForceFlush(): Promise<void> {
return Promise.resolve(undefined);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,10 @@ export class PrometheusExporter extends MetricReader {
* @param callback Callback to be called after a server was started
*/
constructor(config: ExporterConfig = {}, callback?: () => void) {
super();
super({
aggregationSelector: _instrumentType => Aggregation.Default(),
aggregationTemporalitySelector: _instrumentType => AggregationTemporality.CUMULATIVE
});
this._host =
config.host ||
process.env.OTEL_EXPORTER_PROMETHEUS_HOST ||
Expand Down Expand Up @@ -90,14 +93,6 @@ export class PrometheusExporter extends MetricReader {
}
}

selectAggregation(): Aggregation {
return Aggregation.Default();
}

selectAggregationTemporality(): AggregationTemporality {
return AggregationTemporality.CUMULATIVE;
}

override async onForceFlush(): Promise<void> {
/** do nothing */
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,10 @@ const attributes = {

class TestMetricReader extends MetricReader {
constructor() {
super();
}

selectAggregationTemporality() {
return AggregationTemporality.CUMULATIVE;
}

selectAggregation() {
return Aggregation.Default();
super({
aggregationTemporalitySelector: _instrumentType => AggregationTemporality.CUMULATIVE,
aggregationSelector: _instrumentType => Aggregation.Default()
});
}

async onForceFlush() {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,6 @@ export type AggregationSelector = (instrumentType: InstrumentType) => Aggregatio
* Aggregation temporality selector based on metric instrument types.
*/
export type AggregationTemporalitySelector = (instrumentType: InstrumentType) => AggregationTemporality;

export const DEFAULT_AGGREGATION_SELECTOR: AggregationSelector = _instrumentType => Aggregation.Default();
export const DEFAULT_AGGREGATION_TEMPORALITY_SELECTOR: AggregationTemporalitySelector = _instrumentType => AggregationTemporality.CUMULATIVE;
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import {
ExportResult,
} from '@opentelemetry/core';
import { InstrumentType } from '../InstrumentDescriptor';
import { Aggregation } from '../view/Aggregation';

/**
* An interface that allows different metric services to export recorded data
Expand All @@ -44,7 +45,13 @@ export interface PushMetricExporter {
* Select the {@link AggregationTemporality} for the given
* {@link InstrumentType} for this exporter.
*/
selectAggregationTemporality(instrumentType: InstrumentType): AggregationTemporality;
selectAggregationTemporality?(instrumentType: InstrumentType): AggregationTemporality;

/**
* Select the {@link Aggregation} for the given
* {@link InstrumentType} for this exporter.
*/
selectAggregation?(instrumentType: InstrumentType): Aggregation;

/**
* Returns a promise which resolves when the last exportation is completed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,32 @@ import { MetricProducer } from './MetricProducer';
import { CollectionResult } from './MetricData';
import { callWithTimeout } from '../utils';
import { InstrumentType } from '../InstrumentDescriptor';
import { CollectionOptions, ForceFlushOptions, ShutdownOptions } from '../types';
import {
CollectionOptions,
ForceFlushOptions,
ShutdownOptions
} from '../types';
import { Aggregation } from '../view/Aggregation';
import {
AggregationSelector,
AggregationTemporalitySelector,
DEFAULT_AGGREGATION_SELECTOR,
DEFAULT_AGGREGATION_TEMPORALITY_SELECTOR
} from './AggregationSelector';

export interface MetricReaderOptions {
/**
* Aggregation selector based on metric instrument types. If no views are
* configured for a metric instrument, a per-metric-reader aggregation is
* selected with this selector.
*/
aggregationSelector?: AggregationSelector;
/**
* Aggregation temporality selector based on metric instrument types. If
* not configured, cumulative is used for all instruments.
*/
aggregationTemporalitySelector?: AggregationTemporalitySelector;
}

/**
* A registered reader of metrics that, when linked to a {@link MetricProducer}, offers global
Expand All @@ -33,6 +57,15 @@ export abstract class MetricReader {
private _shutdown = false;
// MetricProducer used by this instance.
private _metricProducer?: MetricProducer;
private readonly _aggregationTemporalitySelector: AggregationTemporalitySelector;
private readonly _aggregationSelector: AggregationSelector;

constructor(options?: MetricReaderOptions) {
this._aggregationSelector = options?.aggregationSelector ??
DEFAULT_AGGREGATION_SELECTOR;
this._aggregationTemporalitySelector = options?.aggregationTemporalitySelector ??
DEFAULT_AGGREGATION_TEMPORALITY_SELECTOR;
}

/**
* Set the {@link MetricProducer} used by this instance.
Expand All @@ -51,13 +84,17 @@ export abstract class MetricReader {
* Select the {@link Aggregation} for the given {@link InstrumentType} for this
* reader.
*/
abstract selectAggregation(instrumentType: InstrumentType): Aggregation;
selectAggregation(instrumentType: InstrumentType): Aggregation {
return this._aggregationSelector(instrumentType);
}

/**
* Select the {@link AggregationTemporality} for the given
* {@link InstrumentType} for this reader.
*/
abstract selectAggregationTemporality(instrumentType: InstrumentType): AggregationTemporality;
selectAggregationTemporality(instrumentType: InstrumentType): AggregationTemporality {
return this._aggregationTemporalitySelector(instrumentType);
}

/**
* Handle once the SDK has initialized this {@link MetricReader}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,13 @@ import {
unrefTimer
} from '@opentelemetry/core';
import { MetricReader } from './MetricReader';
import { AggregationTemporality } from './AggregationTemporality';
import { InstrumentType } from '../InstrumentDescriptor';
import { PushMetricExporter } from './MetricExporter';
import { callWithTimeout, TimeoutError } from '../utils';
import { Aggregation } from '../view/Aggregation';
import { AggregationSelector } from './AggregationSelector';
import {
callWithTimeout,
TimeoutError
} from '../utils';

export type PeriodicExportingMetricReaderOptions = {
/**
* Aggregation selector based on metric instrument types. If no views are
* configured for a metric instrument, a per-metric-reader aggregation is
* selected with this selector.
*/
aggregationSelector?: AggregationSelector;
/**
* The backing exporter for the metric reader.
*/
Expand All @@ -50,21 +43,21 @@ export type PeriodicExportingMetricReaderOptions = {
exportTimeoutMillis?: number;
};

const DEFAULT_AGGREGATION_SELECTOR: AggregationSelector = Aggregation.Default;

/**
* {@link MetricReader} which collects metrics based on a user-configurable time interval, and passes the metrics to
* the configured {@link MetricExporter}
* the configured {@link PushMetricExporter}
*/
export class PeriodicExportingMetricReader extends MetricReader {
private _interval?: ReturnType<typeof setInterval>;
private _exporter: PushMetricExporter;
private readonly _exportInterval: number;
private readonly _exportTimeout: number;
private readonly _aggregationSelector: AggregationSelector;

constructor(options: PeriodicExportingMetricReaderOptions) {
super();
super({
aggregationSelector: options.exporter.selectAggregation?.bind(options.exporter),
aggregationTemporalitySelector: options.exporter.selectAggregationTemporality?.bind(options.exporter)
});

if (options.exportIntervalMillis !== undefined && options.exportIntervalMillis <= 0) {
throw Error('exportIntervalMillis must be greater than 0');
Expand All @@ -83,7 +76,6 @@ export class PeriodicExportingMetricReader extends MetricReader {
this._exportInterval = options.exportIntervalMillis ?? 60000;
this._exportTimeout = options.exportTimeoutMillis ?? 30000;
this._exporter = options.exporter;
this._aggregationSelector = options.aggregationSelector ?? DEFAULT_AGGREGATION_SELECTOR;
}

private async _runOnce(): Promise<void> {
Expand All @@ -98,9 +90,9 @@ export class PeriodicExportingMetricReader extends MetricReader {
if (result.code !== ExportResultCode.SUCCESS) {
reject(
result.error ??
new Error(
`PeriodicExportingMetricReader: metrics export failed (error ${result.error})`
)
new Error(
`PeriodicExportingMetricReader: metrics export failed (error ${result.error})`
)
);
} else {
resolve();
Expand Down Expand Up @@ -137,18 +129,4 @@ export class PeriodicExportingMetricReader extends MetricReader {

await this._exporter.shutdown();
}

/**
* @inheritdoc
*/
selectAggregation(instrumentType: InstrumentType): Aggregation {
return this._aggregationSelector(instrumentType);
}

/**
* @inheritdoc
*/
selectAggregationTemporality(instrumentType: InstrumentType): AggregationTemporality {
return this._exporter.selectAggregationTemporality(instrumentType);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ export {

export {
MetricReader,
MetricReaderOptions
} from './export/MetricReader';

export {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,23 @@ import * as assert from 'assert';
import * as sinon from 'sinon';
import { MeterProvider } from '../../src/MeterProvider';
import { assertRejects } from '../test-utils';
import { emptyResourceMetrics, TestMetricProducer } from './TestMetricProducer';
import {
emptyResourceMetrics,
TestMetricProducer
} from './TestMetricProducer';
import { TestMetricReader } from './TestMetricReader';
import {
Aggregation,
AggregationTemporality
} from '../../src';
import {
DEFAULT_AGGREGATION_SELECTOR,
DEFAULT_AGGREGATION_TEMPORALITY_SELECTOR,
} from '../../src/export/AggregationSelector';
import {
assertAggregationSelector,
assertAggregationTemporalitySelector
} from './utils';

describe('MetricReader', () => {
describe('setMetricProducer', () => {
Expand Down Expand Up @@ -80,4 +95,34 @@ describe('MetricReader', () => {
await reader.shutdown();
});
});

describe('selectAggregation', () => {
it('should override default when not provided with a selector', () => {
assertAggregationSelector(new TestMetricReader(), DEFAULT_AGGREGATION_SELECTOR);
assertAggregationSelector(new TestMetricReader({}), DEFAULT_AGGREGATION_SELECTOR);
});

it('should override default when provided with a selector', () => {
const reader = new TestMetricReader({
aggregationSelector: _instrumentType => Aggregation.Sum()
});
assertAggregationSelector(reader, _instrumentType => Aggregation.Sum());
reader.shutdown();
});
});

describe('selectAggregationTemporality', () => {
it('should override default when not provided with a selector', () => {
assertAggregationTemporalitySelector(new TestMetricReader(), DEFAULT_AGGREGATION_TEMPORALITY_SELECTOR);
assertAggregationTemporalitySelector(new TestMetricReader({}), DEFAULT_AGGREGATION_TEMPORALITY_SELECTOR);
});

it('should override default when provided with a selector', () => {
const reader = new TestMetricReader({
aggregationTemporalitySelector: _instrumentType => AggregationTemporality.DELTA
});
assertAggregationTemporalitySelector(reader, _instrumentType => AggregationTemporality.DELTA);
reader.shutdown();
});
});
});
Loading

0 comments on commit b73c29d

Please sign in to comment.