Skip to content

Commit

Permalink
refactor: batch observer to be independent from metric types
Browse files Browse the repository at this point in the history
- BatchObservers do not have to be created with names
- BatchObservers do not accept instruments for their own
  • Loading branch information
legendecas committed Dec 2, 2020
1 parent 8933661 commit bf8ec5c
Show file tree
Hide file tree
Showing 8 changed files with 83 additions and 153 deletions.
2 changes: 1 addition & 1 deletion examples/metrics/metrics/observer.js
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ const cpuUsageMetric = meter.createValueObserver('cpu_usage_per_app', {
description: 'Example of sync value observer used with async batch observer',
});

meter.createBatchObserver('metric_batch_observer', (observerBatchResult) => {
meter.createBatchObserver((observerBatchResult) => {
Promise.all([
someAsyncMetrics(),
// simulate waiting
Expand Down
7 changes: 2 additions & 5 deletions packages/opentelemetry-api/src/metrics/Meter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import {
Counter,
ValueRecorder,
ValueObserver,
BatchObserver,
BatchMetricOptions,
UpDownCounter,
} from './Metric';
Expand Down Expand Up @@ -82,15 +81,13 @@ export interface Meter {
): ValueObserver;

/**
* Creates a new `BatchObserver` metric, can be used to update many metrics
* Creates a new `BatchObserver`, can be used to update many observer metrics
* at the same time and when operations needs to be async
* @param name the name of the metric.
* @param callback the batch observer callback
* @param [options] the metric batch options.
*/
createBatchObserver(
name: string,
callback: (batchObserverResult: BatchObserverResult) => void,
options?: BatchMetricOptions
): BatchObserver;
): void;
}
3 changes: 0 additions & 3 deletions packages/opentelemetry-api/src/metrics/Metric.ts
Original file line number Diff line number Diff line change
Expand Up @@ -166,9 +166,6 @@ export type UpDownSumObserver = BaseObserver;
/** Base interface for the SumObserver metrics. */
export type SumObserver = BaseObserver;

/** Base interface for the Batch Observer metrics. */
export type BatchObserver = Metric;

/**
* key-value pairs passed by the user.
*/
Expand Down
21 changes: 5 additions & 16 deletions packages/opentelemetry-api/src/metrics/NoopMeter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import {
Counter,
ValueRecorder,
ValueObserver,
BatchObserver,
UpDownCounter,
BaseObserver,
} from './Metric';
Expand Down Expand Up @@ -90,10 +89,9 @@ export class NoopMeter implements Meter {
* @param callback the batch observer callback
*/
createBatchObserver(
_name: string,
_callback: (batchObserverResult: BatchObserverResult) => void
): BatchObserver {
return NOOP_BATCH_OBSERVER_METRIC;
): void {
return;
}
}

Expand Down Expand Up @@ -131,24 +129,21 @@ export class NoopMetric<T> implements UnboundMetric<T> {
}
}

export class NoopCounterMetric
extends NoopMetric<BoundCounter>
export class NoopCounterMetric extends NoopMetric<BoundCounter>
implements Counter {
add(value: number, labels: Labels) {
this.bind(labels).add(value);
}
}

export class NoopValueRecorderMetric
extends NoopMetric<BoundValueRecorder>
export class NoopValueRecorderMetric extends NoopMetric<BoundValueRecorder>
implements ValueRecorder {
record(value: number, labels: Labels) {
this.bind(labels).record(value);
}
}

export class NoopBaseObserverMetric
extends NoopMetric<BoundBaseObserver>
export class NoopBaseObserverMetric extends NoopMetric<BoundBaseObserver>
implements BaseObserver {
observation() {
return {
Expand All @@ -158,10 +153,6 @@ export class NoopBaseObserverMetric
}
}

export class NoopBatchObserverMetric
extends NoopMetric<void>
implements BatchObserver {}

export class NoopBoundCounter implements BoundCounter {
add(_value: number): void {
return;
Expand Down Expand Up @@ -203,5 +194,3 @@ export const NOOP_UP_DOWN_SUM_OBSERVER_METRIC = new NoopBaseObserverMetric(
export const NOOP_SUM_OBSERVER_METRIC = new NoopBaseObserverMetric(
NOOP_BOUND_BASE_OBSERVER
);

export const NOOP_BATCH_OBSERVER_METRIC = new NoopBatchObserverMetric();
2 changes: 1 addition & 1 deletion packages/opentelemetry-metrics/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ const MemUsageMetric = meter.createValueObserver('mem_usage_per_app', {
description: 'Memory',
});

meter.createBatchObserver('metric_batch_observer', (observerBatchResult) => {
meter.createBatchObserver((observerBatchResult) => {
getSomeAsyncMetrics().then(metrics => {
observerBatchResult.observe({ app: 'myApp' }, [
cpuUsageMetric.observation(metrics.value1),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,57 +15,32 @@
*/

import * as api from '@opentelemetry/api';
import { InstrumentationLibrary } from '@opentelemetry/core';
import { Resource } from '@opentelemetry/resources';
import { Logger, NoopLogger } from '@opentelemetry/api';
import { BatchObserverResult } from './BatchObserverResult';
import { BoundObserver } from './BoundInstrument';
import { Batcher } from './export/Batcher';
import { MetricKind, MetricRecord } from './export/types';
import { Metric } from './Metric';
import { MetricRecord } from './export/types';

const NOOP_CALLBACK = () => {};
const MAX_TIMEOUT_UPDATE_MS = 500;

/** This is a SDK implementation of Batch Observer Metric. */
export class BatchObserverMetric
extends Metric<BoundObserver>
implements api.BatchObserver {
export class BatchObserver {
private _callback: (observerResult: api.BatchObserverResult) => void;
private _maxTimeoutUpdateMS: number;
private _logger: Logger;

constructor(
name: string,
options: api.BatchMetricOptions,
private readonly _batcher: Batcher,
resource: Resource,
instrumentationLibrary: InstrumentationLibrary,
callback?: (observerResult: api.BatchObserverResult) => void
) {
super(
name,
options,
MetricKind.BATCH_OBSERVER,
resource,
instrumentationLibrary
);
this._logger = options.logger ?? new NoopLogger();
this._maxTimeoutUpdateMS =
options.maxTimeoutUpdateMS ?? MAX_TIMEOUT_UPDATE_MS;
this._callback = callback || NOOP_CALLBACK;
}

protected _makeInstrument(labels: api.Labels): BoundObserver {
return new BoundObserver(
labels,
this._disabled,
this._valueType,
this._logger,
this._batcher.aggregatorFor(this._descriptor)
);
}

getMetricRecord(): Promise<MetricRecord[]> {
collect(): Promise<MetricRecord[]> {
this._logger.debug('getMetricRecord - start');
return new Promise((resolve, reject) => {
return new Promise(resolve => {
const observerResult = new BatchObserverResult();

// cancels after MAX_TIMEOUT_MS - no more waiting for results
Expand All @@ -74,14 +49,14 @@ export class BatchObserverMetric
// remove callback to prevent user from updating the values later if
// for any reason the observerBatchResult will be referenced
observerResult.onObserveCalled();
super.getMetricRecord().then(resolve, reject);
resolve();
this._logger.debug('getMetricRecord - timeout');
}, this._maxTimeoutUpdateMS);

// sets callback for each "observe" method
observerResult.onObserveCalled(() => {
clearTimeout(timer);
super.getMetricRecord().then(resolve, reject);
resolve();
this._logger.debug('getMetricRecord - end');
});

Expand Down
53 changes: 14 additions & 39 deletions packages/opentelemetry-metrics/src/Meter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,8 @@
import * as api from '@opentelemetry/api';
import { ConsoleLogger, InstrumentationLibrary } from '@opentelemetry/core';
import { Resource } from '@opentelemetry/resources';
import { BatchObserverMetric } from './BatchObserverMetric';
import { BatchObserver } from './BatchObserver';
import { BaseBoundInstrument } from './BoundInstrument';
import { MetricKind } from './export/types';
import { UpDownCounterMetric } from './UpDownCounterMetric';
import { CounterMetric } from './CounterMetric';
import { UpDownSumObserverMetric } from './UpDownSumObserverMetric';
Expand All @@ -37,6 +36,7 @@ import { NoopExporter } from './export/NoopExporter';
*/
export class Meter implements api.Meter {
private readonly _logger: api.Logger;
private readonly _batchObservers: BatchObserver[] = [];
private readonly _metrics = new Map<string, Metric<BaseBoundInstrument>>();
private readonly _batcher: Batcher;
private readonly _resource: Resource;
Expand Down Expand Up @@ -258,35 +258,20 @@ export class Meter implements api.Meter {

/**
* Creates a new batch observer metric.
* @param name the name of the metric.
* @param callback the batch observer callback
* @param [options] the metric batch options.
*/
createBatchObserver(
name: string,
callback: (observerResult: api.BatchObserverResult) => void,
options: api.BatchMetricOptions = {}
): api.BatchObserver {
if (!this._isValidName(name)) {
this._logger.warn(
`Invalid metric name ${name}. Defaulting to noop metric implementation.`
);
return api.NOOP_BATCH_OBSERVER_METRIC;
}
) {
const opt: api.BatchMetricOptions = {
logger: this._logger,
...DEFAULT_METRIC_OPTIONS,
...options,
};
const batchObserver = new BatchObserverMetric(
name,
opt,
this._batcher,
this._resource,
this._instrumentationLibrary,
callback
);
this._registerMetric(name, batchObserver);
const batchObserver = new BatchObserver(opt, callback);
this._batchObservers.push(batchObserver);
return batchObserver;
}

Expand All @@ -299,27 +284,17 @@ export class Meter implements api.Meter {
*/
async collect(): Promise<void> {
// call batch observers first
const batchObservers = Array.from(this._metrics.values())
.filter(metric => {
return metric.getKind() === MetricKind.BATCH_OBSERVER;
})
.map(metric => {
return metric.getMetricRecord();
});
await Promise.all(batchObservers).then(records => {
records.forEach(metrics => {
metrics.forEach(metric => this._batcher.process(metric));
});
});
const observations = Array.from(this._batchObservers.values()).map(
observer => {
return observer.collect();
}
);
await Promise.all(observations);

// after this all remaining metrics can be run
const metrics = Array.from(this._metrics.values())
.filter(metric => {
return metric.getKind() !== MetricKind.BATCH_OBSERVER;
})
.map(metric => {
return metric.getMetricRecord();
});
const metrics = Array.from(this._metrics.values()).map(metric => {
return metric.getMetricRecord();
});

await Promise.all(metrics).then(records => {
records.forEach(metrics => {
Expand Down
Loading

0 comments on commit bf8ec5c

Please sign in to comment.