Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(sdk-metrics): prevent per-reader storages from keeping unreported accumulations in memory #4163

Merged
merged 13 commits into from
Oct 10, 2023
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ For experimental package changes, see the [experimental CHANGELOG](experimental/

* fix(sdk-trace-base): BatchSpanProcessor flushes when `maxExportBatchSize` is reached [#3958](https://github.com/open-telemetry/opentelemetry-js/pull/3958) @nordfjord
* fix(sdk-metrics): allow instrument names to contain '/' [#4155](https://github.com/open-telemetry/opentelemetry-js/pull/4155)
* fix(sdk-metrics): prevent per-reader storages from keeping unreported accumulations in memory [#4163](https://github.com/open-telemetry/opentelemetry-js/pull/4163) @pichlermarc
* fixes a memory leak which occurred when two or more `MetricReader` instances are registered to a `MeterProvider`
* fix(sdk-metrics): do not report empty scopes and metrics [#4135](https://github.com/open-telemetry/opentelemetry-js/pull/4135) @pichlermarc
* Instruments that were created, but did not have measurements will not be exported anymore
* Meters (Scopes) that were created, but did not have any instruments with measurements under them will not be exported anymore.
Expand Down
10 changes: 6 additions & 4 deletions packages/sdk-metrics/src/state/AsyncMetricStorage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,15 @@ export class AsyncMetricStorage<T extends Maybe<Accumulation>>
constructor(
_instrumentDescriptor: InstrumentDescriptor,
aggregator: Aggregator<T>,
private _attributesProcessor: AttributesProcessor
private _attributesProcessor: AttributesProcessor,
collectorHandles: MetricCollectorHandle[]
) {
super(_instrumentDescriptor);
this._deltaMetricStorage = new DeltaMetricProcessor(aggregator);
this._temporalMetricStorage = new TemporalMetricProcessor(aggregator);
this._temporalMetricStorage = new TemporalMetricProcessor(
aggregator,
collectorHandles
);
}

record(measurements: AttributeHashMap<number>, observationTime: HrTime) {
Expand All @@ -66,14 +70,12 @@ export class AsyncMetricStorage<T extends Maybe<Accumulation>>
*/
collect(
collector: MetricCollectorHandle,
collectors: MetricCollectorHandle[],
collectionTime: HrTime
): Maybe<MetricData> {
const accumulations = this._deltaMetricStorage.collect();

return this._temporalMetricStorage.buildMetrics(
collector,
collectors,
this._instrumentDescriptor,
accumulations,
collectionTime
Expand Down
15 changes: 7 additions & 8 deletions packages/sdk-metrics/src/state/MeterSharedState.ts
Original file line number Diff line number Diff line change
Expand Up @@ -96,11 +96,7 @@ export class MeterSharedState {

const metricDataList = storages
.map(metricStorage => {
return metricStorage.collect(
collector,
this._meterProviderSharedState.metricCollectors,
collectionTime
);
return metricStorage.collect(collector, collectionTime);
})
.filter(isNotNullish);

Expand Down Expand Up @@ -145,7 +141,8 @@ export class MeterSharedState {
const viewStorage = new MetricStorageType(
viewDescriptor,
aggregator,
view.attributesProcessor
view.attributesProcessor,
this._meterProviderSharedState.metricCollectors
) as R;
this.metricStorageRegistry.register(viewStorage);
return viewStorage;
Expand All @@ -169,7 +166,8 @@ export class MeterSharedState {
const storage = new MetricStorageType(
descriptor,
aggregator,
AttributesProcessor.Noop()
AttributesProcessor.Noop(),
[collector]
) as R;
this.metricStorageRegistry.registerForCollector(collector, storage);
return storage;
Expand All @@ -191,6 +189,7 @@ interface MetricStorageConstructor {
new (
instrumentDescriptor: InstrumentDescriptor,
aggregator: Aggregator<Maybe<Accumulation>>,
attributesProcessor: AttributesProcessor
attributesProcessor: AttributesProcessor,
collectors: MetricCollectorHandle[]
): MetricStorage;
}
1 change: 0 additions & 1 deletion packages/sdk-metrics/src/state/MetricStorage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ export abstract class MetricStorage {
*/
abstract collect(
collector: MetricCollectorHandle,
collectors: MetricCollectorHandle[],
collectionTime: HrTime
): Maybe<MetricData>;

Expand Down
10 changes: 6 additions & 4 deletions packages/sdk-metrics/src/state/SyncMetricStorage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,15 @@ export class SyncMetricStorage<T extends Maybe<Accumulation>>
constructor(
instrumentDescriptor: InstrumentDescriptor,
aggregator: Aggregator<T>,
private _attributesProcessor: AttributesProcessor
private _attributesProcessor: AttributesProcessor,
collectorHandles: MetricCollectorHandle[]
) {
super(instrumentDescriptor);
this._deltaMetricStorage = new DeltaMetricProcessor(aggregator);
this._temporalMetricStorage = new TemporalMetricProcessor(aggregator);
this._temporalMetricStorage = new TemporalMetricProcessor(
aggregator,
collectorHandles
);
}

record(
Expand All @@ -66,14 +70,12 @@ export class SyncMetricStorage<T extends Maybe<Accumulation>>
*/
collect(
collector: MetricCollectorHandle,
collectors: MetricCollectorHandle[],
collectionTime: HrTime
): Maybe<MetricData> {
const accumulations = this._deltaMetricStorage.collect();

return this._temporalMetricStorage.buildMetrics(
collector,
collectors,
this._instrumentDescriptor,
accumulations,
collectionTime
Expand Down
26 changes: 15 additions & 11 deletions packages/sdk-metrics/src/state/TemporalMetricProcessor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,14 @@ export class TemporalMetricProcessor<T extends Maybe<Accumulation>> {
LastReportedHistory<T>
>();

constructor(private _aggregator: Aggregator<T>) {}
constructor(
private _aggregator: Aggregator<T>,
collectorHandles: MetricCollectorHandle[]
) {
collectorHandles.forEach(handle => {
this._unreportedAccumulations.set(handle, []);
});
}

/**
* Builds the {@link MetricData} streams to report against a specific MetricCollector.
Expand All @@ -74,12 +81,11 @@ export class TemporalMetricProcessor<T extends Maybe<Accumulation>> {
*/
buildMetrics(
collector: MetricCollectorHandle,
collectors: MetricCollectorHandle[],
instrumentDescriptor: InstrumentDescriptor,
currentAccumulations: AttributeHashMap<T>,
collectionTime: HrTime
): Maybe<MetricData> {
this._stashAccumulations(collectors, currentAccumulations);
this._stashAccumulations(currentAccumulations);
const unreportedAccumulations =
this._getMergedUnreportedAccumulations(collector);

Expand Down Expand Up @@ -148,18 +154,16 @@ export class TemporalMetricProcessor<T extends Maybe<Accumulation>> {
);
}

private _stashAccumulations(
collectors: MetricCollectorHandle[],
currentAccumulation: AttributeHashMap<T>
) {
collectors.forEach(it => {
let stash = this._unreportedAccumulations.get(it);
private _stashAccumulations(currentAccumulation: AttributeHashMap<T>) {
const registeredCollectors = this._unreportedAccumulations.keys();
for (const collector of registeredCollectors) {
let stash = this._unreportedAccumulations.get(collector);
if (stash === undefined) {
stash = [];
this._unreportedAccumulations.set(it, stash);
this._unreportedAccumulations.set(collector, stash);
}
stash.push(currentAccumulation);
});
}
}

private _getMergedUnreportedAccumulations(collector: MetricCollectorHandle) {
Expand Down
Loading