Skip to content

Commit

Permalink
feat: metric aggregation temporality controls (#2902)
Browse files Browse the repository at this point in the history
* feat: aggregation temporality controls

* chore: update CHANGELOG

* refactor: move named otlp temporality selectors to OTLPMetricExporterBase

* fix: revert accidental protos change

* refactor: rename preferredAggregationTemporality to temporalityPreference

* doc: better changelog wording

* fix: remove unnecessary files
  • Loading branch information
seemk authored May 10, 2022
1 parent 858f6ce commit 1ee1b28
Show file tree
Hide file tree
Showing 44 changed files with 221 additions and 133 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ All notable changes to this project will be documented in this file.

### :boom: Breaking Change

* feat(metrics): metric readers and exporters now select aggregation temporality based on instrument type [#2902](https://github.com/open-telemetry/opentelemetry-js/pull/2902) @seemk
* chore: remove unused InstrumentationConfig#path [#2944](https://github.com/open-telemetry/opentelemetry-js/pull/2944) @flarna

### :rocket: (Enhancement)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,11 @@
*/

import {
defaultExporterTemporality,
defaultOptions,
OTLPMetricExporterBase,
OTLPMetricExporterOptions
} from '@opentelemetry/exporter-metrics-otlp-http';
import { AggregationTemporality, ResourceMetrics } from '@opentelemetry/sdk-metrics-base';
import { ResourceMetrics } from '@opentelemetry/sdk-metrics-base';
import {
OTLPGRPCExporterConfigNode,
OTLPGRPCExporterNodeBase,
Expand All @@ -34,9 +33,7 @@ import { createExportMetricsServiceRequest, IExportMetricsServiceRequest } from
const DEFAULT_COLLECTOR_URL = 'localhost:4317';


class OTLPMetricExporterProxy extends OTLPGRPCExporterNodeBase<ResourceMetrics,
IExportMetricsServiceRequest> {
protected readonly _aggregationTemporality: AggregationTemporality;
class OTLPMetricExporterProxy extends OTLPGRPCExporterNodeBase<ResourceMetrics, IExportMetricsServiceRequest> {

constructor(config: OTLPGRPCExporterConfigNode & OTLPMetricExporterOptions= defaultOptions) {
super(config);
Expand All @@ -45,7 +42,6 @@ class OTLPMetricExporterProxy extends OTLPGRPCExporterNodeBase<ResourceMetrics,
for (const [k, v] of Object.entries(headers)) {
this.metadata.set(k, v);
}
this._aggregationTemporality = config.aggregationTemporality ?? defaultExporterTemporality;
}

getServiceProtoPath(): string {
Expand All @@ -67,7 +63,7 @@ class OTLPMetricExporterProxy extends OTLPGRPCExporterNodeBase<ResourceMetrics,
}

convert(metrics: ResourceMetrics[]): IExportMetricsServiceRequest {
return createExportMetricsServiceRequest(metrics, this._aggregationTemporality);
return createExportMetricsServiceRequest(metrics);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ const testOTLPMetricExporter = (params: TestParams) =>
url: 'grpcs://' + address,
credentials,
metadata: params.metadata,
aggregationTemporality: AggregationTemporality.CUMULATIVE
temporalityPreference: AggregationTemporality.CUMULATIVE
});

setUp();
Expand Down Expand Up @@ -182,15 +182,15 @@ const testOTLPMetricExporter = (params: TestParams) =>
headers: {
foo: 'bar',
},
aggregationTemporality: AggregationTemporality.CUMULATIVE
temporalityPreference: AggregationTemporality.CUMULATIVE
});
const args = warnStub.args[0];
assert.strictEqual(args[0], 'Headers cannot be set when using grpc');
});
it('should warn about path in url', () => {
collectorExporter = new OTLPMetricExporter({
url: `http://${address}/v1/metrics`,
aggregationTemporality: AggregationTemporality.CUMULATIVE
temporalityPreference: AggregationTemporality.CUMULATIVE
});
const args = warnStub.args[0];
assert.strictEqual(
Expand Down Expand Up @@ -262,7 +262,7 @@ describe('OTLPMetricExporter - node (getDefaultUrl)', () => {
const url = 'http://foo.bar.com';
const collectorExporter = new OTLPMetricExporter({
url,
aggregationTemporality: AggregationTemporality.CUMULATIVE
temporalityPreference: AggregationTemporality.CUMULATIVE
});
setTimeout(() => {
assert.strictEqual(collectorExporter._otlpExporter.url, 'foo.bar.com');
Expand Down Expand Up @@ -307,7 +307,7 @@ describe('when configuring via environment', () => {
envSource.OTEL_EXPORTER_OTLP_METRICS_HEADERS = 'foo=boo';
const collectorExporter = new OTLPMetricExporter({
metadata,
aggregationTemporality: AggregationTemporality.CUMULATIVE
temporalityPreference: AggregationTemporality.CUMULATIVE
});
assert.deepStrictEqual(collectorExporter._otlpExporter.metadata?.get('foo'), ['boo']);
assert.deepStrictEqual(collectorExporter._otlpExporter.metadata?.get('bar'), ['foo']);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,19 @@ import { Resource } from '@opentelemetry/resources';
import * as assert from 'assert';
import * as grpc from '@grpc/grpc-js';
import { VERSION } from '@opentelemetry/core';
import { ExplicitBucketHistogramAggregation, MeterProvider, MetricReader } from '@opentelemetry/sdk-metrics-base';
import {
AggregationTemporality,
ExplicitBucketHistogramAggregation,
MeterProvider,
MetricReader,
} from '@opentelemetry/sdk-metrics-base';
import { IKeyValue, IMetric, IResource } from '@opentelemetry/otlp-transformer';

export class TestMetricReader extends MetricReader {
class TestMetricReader extends MetricReader {
selectAggregationTemporality() {
return AggregationTemporality.CUMULATIVE;
}

protected onForceFlush(): Promise<void> {
return Promise.resolve(undefined);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,51 @@
*/

import { ExportResult } from '@opentelemetry/core';
import { AggregationTemporality, PushMetricExporter, ResourceMetrics } from '@opentelemetry/sdk-metrics-base';
import {
AggregationTemporality,
AggregationTemporalitySelector,
InstrumentType,
PushMetricExporter,
ResourceMetrics
} from '@opentelemetry/sdk-metrics-base';
import { defaultOptions, OTLPMetricExporterOptions } from './OTLPMetricExporterOptions';
import { OTLPExporterBase } from '@opentelemetry/otlp-exporter-base';
import { IExportMetricsServiceRequest } from '@opentelemetry/otlp-transformer';

export const CumulativeTemporalitySelector: AggregationTemporalitySelector = () => AggregationTemporality.CUMULATIVE;

export const DeltaTemporalitySelector: AggregationTemporalitySelector = (instrumentType: InstrumentType) => {
switch (instrumentType) {
case InstrumentType.COUNTER:
case InstrumentType.OBSERVABLE_COUNTER:
case InstrumentType.HISTOGRAM:
case InstrumentType.OBSERVABLE_GAUGE:
return AggregationTemporality.DELTA;
case InstrumentType.UP_DOWN_COUNTER:
case InstrumentType.OBSERVABLE_UP_DOWN_COUNTER:
return AggregationTemporality.CUMULATIVE;
}
};

function chooseTemporalitySelector(temporalityPreference?: AggregationTemporality): AggregationTemporalitySelector {
if (temporalityPreference === AggregationTemporality.DELTA) {
return DeltaTemporalitySelector;
}

return CumulativeTemporalitySelector;
}

export class OTLPMetricExporterBase<T extends OTLPExporterBase<OTLPMetricExporterOptions,
ResourceMetrics,
IExportMetricsServiceRequest>>
implements PushMetricExporter {
public _otlpExporter: T;
protected _preferredAggregationTemporality: AggregationTemporality;
protected _aggregationTemporalitySelector: AggregationTemporalitySelector;

constructor(exporter: T,
config: OTLPMetricExporterOptions = defaultOptions) {
this._otlpExporter = exporter;
this._preferredAggregationTemporality = config.aggregationTemporality ?? AggregationTemporality.CUMULATIVE;
this._aggregationTemporalitySelector = chooseTemporalitySelector(config.temporalityPreference);
}

export(metrics: ResourceMetrics, resultCallback: (result: ExportResult) => void): void {
Expand All @@ -45,7 +74,7 @@ implements PushMetricExporter {
return Promise.resolve();
}

getPreferredAggregationTemporality(): AggregationTemporality {
return this._preferredAggregationTemporality;
selectAggregationTemporality(instrumentType: InstrumentType): AggregationTemporality {
return this._aggregationTemporalitySelector(instrumentType);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import { AggregationTemporality } from '@opentelemetry/sdk-metrics-base';
import { OTLPExporterConfigBase } from '@opentelemetry/otlp-exporter-base';

export interface OTLPMetricExporterOptions extends OTLPExporterConfigBase {
aggregationTemporality?: AggregationTemporality
temporalityPreference?: AggregationTemporality
}
export const defaultExporterTemporality = AggregationTemporality.CUMULATIVE;
export const defaultOptions = {aggregationTemporality: defaultExporterTemporality};
export const defaultOptions = {temporalityPreference: defaultExporterTemporality};
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@
* limitations under the License.
*/

import { AggregationTemporality, ResourceMetrics } from '@opentelemetry/sdk-metrics-base';
import { ResourceMetrics } from '@opentelemetry/sdk-metrics-base';
import { baggageUtils, getEnv } from '@opentelemetry/core';
import { defaultExporterTemporality, defaultOptions, OTLPMetricExporterOptions } from '../../OTLPMetricExporterOptions';
import { defaultOptions, OTLPMetricExporterOptions } from '../../OTLPMetricExporterOptions';
import { OTLPMetricExporterBase } from '../../OTLPMetricExporterBase';
import {
appendResourcePathToUrlIfNotPresent,
Expand All @@ -28,9 +28,7 @@ import { createExportMetricsServiceRequest, IExportMetricsServiceRequest } from
const DEFAULT_COLLECTOR_RESOURCE_PATH = '/v1/metrics';
const DEFAULT_COLLECTOR_URL = `http://localhost:4318${DEFAULT_COLLECTOR_RESOURCE_PATH}`;

class OTLPExporterBrowserProxy extends OTLPExporterBrowserBase<ResourceMetrics,
IExportMetricsServiceRequest> {
protected readonly _aggregationTemporality: AggregationTemporality;
class OTLPExporterBrowserProxy extends OTLPExporterBrowserBase<ResourceMetrics, IExportMetricsServiceRequest> {

constructor(config: OTLPMetricExporterOptions & OTLPExporterConfigBase = defaultOptions) {
super(config);
Expand All @@ -40,7 +38,6 @@ class OTLPExporterBrowserProxy extends OTLPExporterBrowserBase<ResourceMetrics,
getEnv().OTEL_EXPORTER_OTLP_METRICS_HEADERS
)
);
this._aggregationTemporality = config.aggregationTemporality ?? defaultExporterTemporality;
}

getDefaultUrl(config: OTLPExporterConfigBase): string {
Expand All @@ -54,10 +51,7 @@ class OTLPExporterBrowserProxy extends OTLPExporterBrowserBase<ResourceMetrics,
}

convert(metrics: ResourceMetrics[]): IExportMetricsServiceRequest {
return createExportMetricsServiceRequest(
metrics,
this._aggregationTemporality
);
return createExportMetricsServiceRequest(metrics);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@
* limitations under the License.
*/

import { ResourceMetrics, AggregationTemporality } from '@opentelemetry/sdk-metrics-base';
import { ResourceMetrics } from '@opentelemetry/sdk-metrics-base';
import { getEnv, baggageUtils} from '@opentelemetry/core';
import { defaultExporterTemporality, defaultOptions, OTLPMetricExporterOptions } from '../../OTLPMetricExporterOptions';
import { defaultOptions, OTLPMetricExporterOptions } from '../../OTLPMetricExporterOptions';
import { OTLPMetricExporterBase } from '../../OTLPMetricExporterBase';
import {
appendResourcePathToUrlIfNotPresent,
Expand All @@ -28,9 +28,7 @@ import { createExportMetricsServiceRequest, IExportMetricsServiceRequest } from
const DEFAULT_COLLECTOR_RESOURCE_PATH = '/v1/metrics';
const DEFAULT_COLLECTOR_URL = `http://localhost:4318${DEFAULT_COLLECTOR_RESOURCE_PATH}`;

class OTLPExporterNodeProxy extends OTLPExporterNodeBase<ResourceMetrics,
IExportMetricsServiceRequest> {
protected readonly _aggregationTemporality: AggregationTemporality;
class OTLPExporterNodeProxy extends OTLPExporterNodeBase<ResourceMetrics, IExportMetricsServiceRequest> {

constructor(config: OTLPExporterNodeConfigBase & OTLPMetricExporterOptions = defaultOptions) {
super(config);
Expand All @@ -40,14 +38,10 @@ class OTLPExporterNodeProxy extends OTLPExporterNodeBase<ResourceMetrics,
getEnv().OTEL_EXPORTER_OTLP_METRICS_HEADERS
)
);
this._aggregationTemporality = config.aggregationTemporality ?? defaultExporterTemporality;
}

convert(metrics: ResourceMetrics[]): IExportMetricsServiceRequest {
return createExportMetricsServiceRequest(
metrics,
this._aggregationTemporality
);
return createExportMetricsServiceRequest(metrics);
}

getDefaultUrl(config: OTLPExporterNodeConfigBase): string {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ describe('OTLPMetricExporter - web', () => {
beforeEach(() => {
collectorExporter = new OTLPMetricExporter({
url: 'http://foo.bar.com',
aggregationTemporality: AggregationTemporality.CUMULATIVE
temporalityPreference: AggregationTemporality.CUMULATIVE
});
});
it('should successfully send metrics using sendBeacon', done => {
Expand Down Expand Up @@ -191,7 +191,7 @@ describe('OTLPMetricExporter - web', () => {
(window.navigator as any).sendBeacon = false;
collectorExporter = new OTLPMetricExporter({
url: 'http://foo.bar.com',
aggregationTemporality: AggregationTemporality.CUMULATIVE
temporalityPreference: AggregationTemporality.CUMULATIVE
});
// Overwrites the start time to make tests consistent
Object.defineProperty(collectorExporter, '_startTime', {
Expand Down Expand Up @@ -316,7 +316,7 @@ describe('OTLPMetricExporter - web', () => {
beforeEach(() => {
collectorExporterConfig = {
headers: customHeaders,
aggregationTemporality: AggregationTemporality.CUMULATIVE
temporalityPreference: AggregationTemporality.CUMULATIVE
};
server = sinon.fakeServer.create();
});
Expand Down Expand Up @@ -408,7 +408,7 @@ describe('when configuring via environment', () => {
envSource.OTEL_EXPORTER_OTLP_HEADERS = 'foo=bar';
const collectorExporter = new OTLPMetricExporter({
headers: {},
aggregationTemporality: AggregationTemporality.CUMULATIVE
temporalityPreference: AggregationTemporality.CUMULATIVE
});
assert.strictEqual(collectorExporter['_otlpExporter']['_headers'].foo, 'bar');
envSource.OTEL_EXPORTER_OTLP_HEADERS = '';
Expand All @@ -418,7 +418,7 @@ describe('when configuring via environment', () => {
envSource.OTEL_EXPORTER_OTLP_METRICS_HEADERS = 'foo=boo';
const collectorExporter = new OTLPMetricExporter({
headers: {},
aggregationTemporality: AggregationTemporality.CUMULATIVE
temporalityPreference: AggregationTemporality.CUMULATIVE
});
assert.strictEqual(collectorExporter['_otlpExporter']['_headers'].foo, 'boo');
assert.strictEqual(collectorExporter['_otlpExporter']['_headers'].bar, 'foo');
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import { Resource } from '@opentelemetry/resources';
import * as assert from 'assert';
import { InstrumentationLibrary, VERSION } from '@opentelemetry/core';
import {
AggregationTemporality,
ExplicitBucketHistogramAggregation,
MeterProvider,
MetricReader
Expand All @@ -43,14 +44,18 @@ if (typeof Buffer === 'undefined') {
};
}

export class TestMetricReader extends MetricReader {
class TestMetricReader extends MetricReader {
protected onForceFlush(): Promise<void> {
return Promise.resolve(undefined);
}

protected onShutdown(): Promise<void> {
return Promise.resolve(undefined);
}

selectAggregationTemporality() {
return AggregationTemporality.CUMULATIVE;
}
}

const defaultResource = Resource.default().merge(new Resource({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ describe('OTLPMetricExporter - node with json over http', () => {
url: 'http://foo.bar.com',
keepAlive: true,
httpAgentOptions: { keepAliveMsecs: 2000 },
aggregationTemporality: AggregationTemporality.CUMULATIVE
temporalityPreference: AggregationTemporality.CUMULATIVE
};

collectorExporter = new OTLPMetricExporter(collectorExporterConfig);
Expand Down Expand Up @@ -329,7 +329,7 @@ describe('OTLPMetricExporter - node with json over http', () => {
const url = 'http://foo.bar.com';
const collectorExporter = new OTLPMetricExporter({
url,
aggregationTemporality: AggregationTemporality.CUMULATIVE
temporalityPreference: AggregationTemporality.CUMULATIVE
});
setTimeout(() => {
assert.strictEqual(collectorExporter._otlpExporter.url, url);
Expand Down
Loading

0 comments on commit 1ee1b28

Please sign in to comment.