From 1f51d8c57ae11767631363ecbf339ae0539bf776 Mon Sep 17 00:00:00 2001 From: Marc Pichler Date: Tue, 26 Sep 2023 11:25:01 +0200 Subject: [PATCH 1/6] fix(sdk-metrics): prevent per-reader storages from keeping unreported accumulations in memory --- examples/otlp-exporter-node/test.js | 0 .../src/state/AsyncMetricStorage.ts | 6 ++- .../sdk-metrics/src/state/MeterSharedState.ts | 10 ++--- .../sdk-metrics/src/state/MetricStorage.ts | 11 +++++- .../src/state/SyncMetricStorage.ts | 6 ++- .../src/state/TemporalMetricProcessor.ts | 25 ++++++++----- .../test/state/AsyncMetricStorage.test.ts | 37 +++++++------------ .../test/state/MetricStorageRegistry.test.ts | 3 +- .../test/state/SyncMetricStorage.test.ts | 31 ++++------------ .../state/TemporalMetricProcessor.test.ts | 23 +++--------- 10 files changed, 66 insertions(+), 86 deletions(-) create mode 100644 examples/otlp-exporter-node/test.js diff --git a/examples/otlp-exporter-node/test.js b/examples/otlp-exporter-node/test.js new file mode 100644 index 0000000000..e69de29bb2 diff --git a/packages/sdk-metrics/src/state/AsyncMetricStorage.ts b/packages/sdk-metrics/src/state/AsyncMetricStorage.ts index 286874987c..a5cc005e89 100644 --- a/packages/sdk-metrics/src/state/AsyncMetricStorage.ts +++ b/packages/sdk-metrics/src/state/AsyncMetricStorage.ts @@ -66,17 +66,19 @@ export class AsyncMetricStorage> */ collect( collector: MetricCollectorHandle, - collectors: MetricCollectorHandle[], collectionTime: HrTime ): Maybe { const accumulations = this._deltaMetricStorage.collect(); return this._temporalMetricStorage.buildMetrics( collector, - collectors, this._instrumentDescriptor, accumulations, collectionTime ); } + + registerCollector(collector: MetricCollectorHandle): void { + this._temporalMetricStorage.registerCollector(collector); + } } diff --git a/packages/sdk-metrics/src/state/MeterSharedState.ts b/packages/sdk-metrics/src/state/MeterSharedState.ts index 099a21c0d7..f5ea6fd8ed 100644 --- a/packages/sdk-metrics/src/state/MeterSharedState.ts +++ b/packages/sdk-metrics/src/state/MeterSharedState.ts @@ -91,11 +91,7 @@ export class MeterSharedState { this.metricStorageRegistry.getStorages(collector) ) .map(metricStorage => { - return metricStorage.collect( - collector, - this._meterProviderSharedState.metricCollectors, - collectionTime - ); + return metricStorage.collect(collector, collectionTime); }) .filter(isNotNullish); @@ -137,6 +133,9 @@ export class MeterSharedState { aggregator, view.attributesProcessor ) as R; + for (const collector of this._meterProviderSharedState.metricCollectors) { + viewStorage.registerCollector(collector); + } this.metricStorageRegistry.register(viewStorage); return viewStorage; }); @@ -161,6 +160,7 @@ export class MeterSharedState { aggregator, AttributesProcessor.Noop() ) as R; + storage.registerCollector(collector); this.metricStorageRegistry.registerForCollector(collector, storage); return storage; } diff --git a/packages/sdk-metrics/src/state/MetricStorage.ts b/packages/sdk-metrics/src/state/MetricStorage.ts index 5d02437f58..3fa2ee334c 100644 --- a/packages/sdk-metrics/src/state/MetricStorage.ts +++ b/packages/sdk-metrics/src/state/MetricStorage.ts @@ -39,10 +39,19 @@ export abstract class MetricStorage { */ abstract collect( collector: MetricCollectorHandle, - collectors: MetricCollectorHandle[], collectionTime: HrTime ): Maybe; + /** + * Registers a collector that this storage will be used with. Failing to register + * a collector will result in dropped metrics. + * + * Note: Memory pressure may build if a collector is registered but does not collect + * on this instance. Once registered, ensure that `collect()` is called in reasonable intervals. + * @param collector + */ + abstract registerCollector(collector: MetricCollectorHandle): void; + getInstrumentDescriptor(): Readonly { return this._instrumentDescriptor; } diff --git a/packages/sdk-metrics/src/state/SyncMetricStorage.ts b/packages/sdk-metrics/src/state/SyncMetricStorage.ts index 0648b12728..257b9adffd 100644 --- a/packages/sdk-metrics/src/state/SyncMetricStorage.ts +++ b/packages/sdk-metrics/src/state/SyncMetricStorage.ts @@ -66,17 +66,19 @@ export class SyncMetricStorage> */ collect( collector: MetricCollectorHandle, - collectors: MetricCollectorHandle[], collectionTime: HrTime ): Maybe { const accumulations = this._deltaMetricStorage.collect(); return this._temporalMetricStorage.buildMetrics( collector, - collectors, this._instrumentDescriptor, accumulations, collectionTime ); } + + registerCollector(collector: MetricCollectorHandle): void { + this._temporalMetricStorage.registerCollector(collector); + } } diff --git a/packages/sdk-metrics/src/state/TemporalMetricProcessor.ts b/packages/sdk-metrics/src/state/TemporalMetricProcessor.ts index 2b9c5dbbaa..d64921955f 100644 --- a/packages/sdk-metrics/src/state/TemporalMetricProcessor.ts +++ b/packages/sdk-metrics/src/state/TemporalMetricProcessor.ts @@ -63,6 +63,14 @@ export class TemporalMetricProcessor> { constructor(private _aggregator: Aggregator) {} + registerCollector(collector: MetricCollectorHandle) { + let stash = this._unreportedAccumulations.get(collector); + if (stash === undefined) { + stash = []; + this._unreportedAccumulations.set(collector, stash); + } + } + /** * Builds the {@link MetricData} streams to report against a specific MetricCollector. * @param collector The information of the MetricCollector. @@ -74,12 +82,11 @@ export class TemporalMetricProcessor> { */ buildMetrics( collector: MetricCollectorHandle, - collectors: MetricCollectorHandle[], instrumentDescriptor: InstrumentDescriptor, currentAccumulations: AttributeHashMap, collectionTime: HrTime ): Maybe { - this._stashAccumulations(collectors, currentAccumulations); + this._stashAccumulations(currentAccumulations); const unreportedAccumulations = this._getMergedUnreportedAccumulations(collector); @@ -141,18 +148,16 @@ export class TemporalMetricProcessor> { ); } - private _stashAccumulations( - collectors: MetricCollectorHandle[], - currentAccumulation: AttributeHashMap - ) { - collectors.forEach(it => { - let stash = this._unreportedAccumulations.get(it); + private _stashAccumulations(currentAccumulation: AttributeHashMap) { + const registeredCollectors = this._unreportedAccumulations.keys(); + for (const collector of registeredCollectors) { + let stash = this._unreportedAccumulations.get(collector); if (stash === undefined) { stash = []; - this._unreportedAccumulations.set(it, stash); + this._unreportedAccumulations.set(collector, stash); } stash.push(currentAccumulation); - }); + } } private _getMergedUnreportedAccumulations(collector: MetricCollectorHandle) { diff --git a/packages/sdk-metrics/test/state/AsyncMetricStorage.test.ts b/packages/sdk-metrics/test/state/AsyncMetricStorage.test.ts index 9fe742ca02..f1cea48f03 100644 --- a/packages/sdk-metrics/test/state/AsyncMetricStorage.test.ts +++ b/packages/sdk-metrics/test/state/AsyncMetricStorage.test.ts @@ -43,7 +43,6 @@ const cumulativeCollector: MetricCollectorHandle = { describe('AsyncMetricStorage', () => { describe('collect', () => { describe('Delta Collector', () => { - const collectors = [deltaCollector]; it('should collect and reset memos', async () => { const delegate = new ObservableCallbackDelegate(); const observableRegistry = new ObservableRegistry(); @@ -52,6 +51,8 @@ describe('AsyncMetricStorage', () => { new SumAggregator(true), new NoopAttributesProcessor() ); + metricStorage.registerCollector(deltaCollector); + const observable = new ObservableInstrument( defaultInstrumentDescriptor, [metricStorage], @@ -70,7 +71,6 @@ describe('AsyncMetricStorage', () => { await observableRegistry.observe(collectionTime); const metric = metricStorage.collect( deltaCollector, - collectors, collectionTime ); @@ -104,11 +104,7 @@ describe('AsyncMetricStorage', () => { { const collectionTime: HrTime = [1, 1]; await observableRegistry.observe(collectionTime); - const metric = metricStorage.collect( - deltaCollector, - collectors, - collectionTime - ); + const metric = metricStorage.collect(deltaCollector, collectionTime); assertMetricData(metric, DataPointType.SUM); assert.strictEqual(metric.dataPoints.length, 0); @@ -124,7 +120,6 @@ describe('AsyncMetricStorage', () => { await observableRegistry.observe(collectionTime); const metric = metricStorage.collect( deltaCollector, - collectors, collectionTime ); @@ -163,6 +158,8 @@ describe('AsyncMetricStorage', () => { new SumAggregator(true), new NoopAttributesProcessor() ); + metricStorage.registerCollector(deltaCollector); + const observable = new ObservableInstrument( defaultInstrumentDescriptor, [metricStorage], @@ -181,7 +178,6 @@ describe('AsyncMetricStorage', () => { await observableRegistry.observe(collectionTime); const metric = metricStorage.collect( deltaCollector, - collectors, collectionTime ); @@ -207,7 +203,6 @@ describe('AsyncMetricStorage', () => { await observableRegistry.observe(collectionTime); const metric = metricStorage.collect( deltaCollector, - collectors, collectionTime ); @@ -233,7 +228,6 @@ describe('AsyncMetricStorage', () => { await observableRegistry.observe(collectionTime); const metric = metricStorage.collect( deltaCollector, - collectors, collectionTime ); @@ -257,6 +251,8 @@ describe('AsyncMetricStorage', () => { new SumAggregator(false), new NoopAttributesProcessor() ); + metricStorage.registerCollector(deltaCollector); + const observable = new ObservableInstrument( defaultInstrumentDescriptor, [metricStorage], @@ -275,7 +271,6 @@ describe('AsyncMetricStorage', () => { await observableRegistry.observe(collectionTime); const metric = metricStorage.collect( deltaCollector, - collectors, collectionTime ); @@ -301,7 +296,6 @@ describe('AsyncMetricStorage', () => { await observableRegistry.observe(collectionTime); const metric = metricStorage.collect( deltaCollector, - collectors, collectionTime ); @@ -327,7 +321,6 @@ describe('AsyncMetricStorage', () => { await observableRegistry.observe(collectionTime); const metric = metricStorage.collect( deltaCollector, - collectors, collectionTime ); @@ -345,7 +338,6 @@ describe('AsyncMetricStorage', () => { }); describe('Cumulative Collector', () => { - const collectors = [cumulativeCollector]; it('should collect cumulative metrics', async () => { const delegate = new ObservableCallbackDelegate(); const observableRegistry = new ObservableRegistry(); @@ -354,6 +346,8 @@ describe('AsyncMetricStorage', () => { new SumAggregator(true), new NoopAttributesProcessor() ); + metricStorage.registerCollector(cumulativeCollector); + const observable = new ObservableInstrument( defaultInstrumentDescriptor, [metricStorage], @@ -373,7 +367,6 @@ describe('AsyncMetricStorage', () => { await observableRegistry.observe(collectionTime); const metric = metricStorage.collect( cumulativeCollector, - collectors, collectionTime ); @@ -410,7 +403,6 @@ describe('AsyncMetricStorage', () => { await observableRegistry.observe(collectionTime); const metric = metricStorage.collect( cumulativeCollector, - collectors, collectionTime ); @@ -449,7 +441,6 @@ describe('AsyncMetricStorage', () => { await observableRegistry.observe(collectionTime); const metric = metricStorage.collect( cumulativeCollector, - collectors, collectionTime ); @@ -487,6 +478,8 @@ describe('AsyncMetricStorage', () => { new SumAggregator(true), new NoopAttributesProcessor() ); + metricStorage.registerCollector(cumulativeCollector); + const observable = new ObservableInstrument( defaultInstrumentDescriptor, [metricStorage], @@ -505,7 +498,6 @@ describe('AsyncMetricStorage', () => { await observableRegistry.observe(collectionTime); const metric = metricStorage.collect( cumulativeCollector, - collectors, collectionTime ); @@ -531,7 +523,6 @@ describe('AsyncMetricStorage', () => { await observableRegistry.observe(collectionTime); const metric = metricStorage.collect( cumulativeCollector, - collectors, collectionTime ); @@ -558,7 +549,6 @@ describe('AsyncMetricStorage', () => { await observableRegistry.observe(collectionTime); const metric = metricStorage.collect( cumulativeCollector, - collectors, collectionTime ); @@ -582,6 +572,8 @@ describe('AsyncMetricStorage', () => { new SumAggregator(false), new NoopAttributesProcessor() ); + metricStorage.registerCollector(cumulativeCollector); + const observable = new ObservableInstrument( defaultInstrumentDescriptor, [metricStorage], @@ -600,7 +592,6 @@ describe('AsyncMetricStorage', () => { await observableRegistry.observe(collectionTime); const metric = metricStorage.collect( cumulativeCollector, - collectors, collectionTime ); @@ -626,7 +617,6 @@ describe('AsyncMetricStorage', () => { await observableRegistry.observe(collectionTime); const metric = metricStorage.collect( cumulativeCollector, - collectors, collectionTime ); @@ -652,7 +642,6 @@ describe('AsyncMetricStorage', () => { await observableRegistry.observe(collectionTime); const metric = metricStorage.collect( cumulativeCollector, - collectors, collectionTime ); diff --git a/packages/sdk-metrics/test/state/MetricStorageRegistry.test.ts b/packages/sdk-metrics/test/state/MetricStorageRegistry.test.ts index 26a48a0ba3..f9fe744b97 100644 --- a/packages/sdk-metrics/test/state/MetricStorageRegistry.test.ts +++ b/packages/sdk-metrics/test/state/MetricStorageRegistry.test.ts @@ -33,11 +33,12 @@ import { class TestMetricStorage extends MetricStorage { collect( collector: MetricCollectorHandle, - collectors: MetricCollectorHandle[], collectionTime: HrTime ): Maybe { return undefined; } + + registerCollector(collector: MetricCollectorHandle): void {} } describe('MetricStorageRegistry', () => { diff --git a/packages/sdk-metrics/test/state/SyncMetricStorage.test.ts b/packages/sdk-metrics/test/state/SyncMetricStorage.test.ts index 8e568be19e..fdd4e99903 100644 --- a/packages/sdk-metrics/test/state/SyncMetricStorage.test.ts +++ b/packages/sdk-metrics/test/state/SyncMetricStorage.test.ts @@ -58,22 +58,19 @@ describe('SyncMetricStorage', () => { describe('collect', () => { describe('Delta Collector', () => { - const collectors = [deltaCollector]; it('should collect and reset memos', async () => { const metricStorage = new SyncMetricStorage( defaultInstrumentDescriptor, new SumAggregator(true), new NoopAttributesProcessor() ); + metricStorage.registerCollector(deltaCollector); + metricStorage.record(1, {}, api.context.active(), [0, 0]); metricStorage.record(2, {}, api.context.active(), [1, 1]); metricStorage.record(3, {}, api.context.active(), [2, 2]); { - const metric = metricStorage.collect( - deltaCollector, - collectors, - [3, 3] - ); + const metric = metricStorage.collect(deltaCollector, [3, 3]); assertMetricData(metric, DataPointType.SUM); assert.strictEqual(metric.dataPoints.length, 1); @@ -82,11 +79,7 @@ describe('SyncMetricStorage', () => { // The attributes should not be memorized. { - const metric = metricStorage.collect( - deltaCollector, - collectors, - [4, 4] - ); + const metric = metricStorage.collect(deltaCollector, [4, 4]); assertMetricData(metric, DataPointType.SUM); assert.strictEqual(metric.dataPoints.length, 0); @@ -94,11 +87,7 @@ describe('SyncMetricStorage', () => { metricStorage.record(1, {}, api.context.active(), [5, 5]); { - const metric = metricStorage.collect( - deltaCollector, - [deltaCollector], - [6, 6] - ); + const metric = metricStorage.collect(deltaCollector, [6, 6]); assertMetricData(metric, DataPointType.SUM); assert.strictEqual(metric.dataPoints.length, 1); @@ -108,22 +97,18 @@ describe('SyncMetricStorage', () => { }); describe('Cumulative Collector', () => { - const collectors = [cumulativeCollector]; it('should collect cumulative metrics', async () => { const metricStorage = new SyncMetricStorage( defaultInstrumentDescriptor, new SumAggregator(true), new NoopAttributesProcessor() ); + metricStorage.registerCollector(cumulativeCollector); metricStorage.record(1, {}, api.context.active(), [0, 0]); metricStorage.record(2, {}, api.context.active(), [1, 1]); metricStorage.record(3, {}, api.context.active(), [2, 2]); { - const metric = metricStorage.collect( - cumulativeCollector, - collectors, - [3, 3] - ); + const metric = metricStorage.collect(cumulativeCollector, [3, 3]); assertMetricData(metric, DataPointType.SUM); assert.strictEqual(metric.dataPoints.length, 1); @@ -134,7 +119,6 @@ describe('SyncMetricStorage', () => { { const metric = metricStorage.collect( cumulativeCollector, - collectors, [4, 4] ); @@ -147,7 +131,6 @@ describe('SyncMetricStorage', () => { { const metric = metricStorage.collect( cumulativeCollector, - collectors, [6, 6] ); diff --git a/packages/sdk-metrics/test/state/TemporalMetricProcessor.test.ts b/packages/sdk-metrics/test/state/TemporalMetricProcessor.test.ts index 8d0f165cd0..cd33c5bfc6 100644 --- a/packages/sdk-metrics/test/state/TemporalMetricProcessor.test.ts +++ b/packages/sdk-metrics/test/state/TemporalMetricProcessor.test.ts @@ -48,7 +48,6 @@ describe('TemporalMetricProcessor', () => { describe('buildMetrics', () => { describe('single delta collector', () => { - const collectors = [deltaCollector1]; it('should build delta recording metrics', () => { const spy = sinon.spy(deltaCollector1, 'selectAggregationTemporality'); @@ -56,12 +55,11 @@ describe('TemporalMetricProcessor', () => { const aggregator = new SumAggregator(true); const deltaMetricStorage = new DeltaMetricProcessor(aggregator); const temporalMetricStorage = new TemporalMetricProcessor(aggregator); - + temporalMetricStorage.registerCollector(deltaCollector1); deltaMetricStorage.record(1, {}, api.context.active(), [1, 1]); { const metric = temporalMetricStorage.buildMetrics( deltaCollector1, - collectors, defaultInstrumentDescriptor, deltaMetricStorage.collect(), [2, 2] @@ -81,7 +79,6 @@ describe('TemporalMetricProcessor', () => { { const metric = temporalMetricStorage.buildMetrics( deltaCollector1, - collectors, defaultInstrumentDescriptor, deltaMetricStorage.collect(), [4, 4] @@ -101,7 +98,6 @@ describe('TemporalMetricProcessor', () => { { const metric = temporalMetricStorage.buildMetrics( deltaCollector1, - collectors, defaultInstrumentDescriptor, deltaMetricStorage.collect(), [5, 5] @@ -122,18 +118,17 @@ describe('TemporalMetricProcessor', () => { }); describe('two delta collector', () => { - const collectors = [deltaCollector1, deltaCollector2]; - it('should build delta recording metrics', () => { const aggregator = new SumAggregator(true); const deltaMetricStorage = new DeltaMetricProcessor(aggregator); const temporalMetricStorage = new TemporalMetricProcessor(aggregator); + temporalMetricStorage.registerCollector(deltaCollector1); + temporalMetricStorage.registerCollector(deltaCollector2); deltaMetricStorage.record(1, {}, api.context.active(), [1, 1]); { const metric = temporalMetricStorage.buildMetrics( deltaCollector1, - collectors, defaultInstrumentDescriptor, deltaMetricStorage.collect(), [2, 2] @@ -152,7 +147,6 @@ describe('TemporalMetricProcessor', () => { { const metric = temporalMetricStorage.buildMetrics( deltaCollector2, - collectors, defaultInstrumentDescriptor, deltaMetricStorage.collect(), [3, 3] @@ -171,7 +165,6 @@ describe('TemporalMetricProcessor', () => { }); describe('single cumulative collector', () => { - const collectors = [cumulativeCollector1]; it('should build delta recording metrics', () => { const spy = sinon.spy( cumulativeCollector1, @@ -181,12 +174,11 @@ describe('TemporalMetricProcessor', () => { const aggregator = new SumAggregator(true); const deltaMetricStorage = new DeltaMetricProcessor(aggregator); const temporalMetricStorage = new TemporalMetricProcessor(aggregator); - + temporalMetricStorage.registerCollector(cumulativeCollector1); deltaMetricStorage.record(1, {}, api.context.active(), [1, 1]); { const metric = temporalMetricStorage.buildMetrics( cumulativeCollector1, - collectors, defaultInstrumentDescriptor, deltaMetricStorage.collect(), [2, 2] @@ -206,7 +198,6 @@ describe('TemporalMetricProcessor', () => { { const metric = temporalMetricStorage.buildMetrics( cumulativeCollector1, - collectors, defaultInstrumentDescriptor, deltaMetricStorage.collect(), [4, 4] @@ -228,17 +219,17 @@ describe('TemporalMetricProcessor', () => { }); describe('cumulative collector with delta collector', () => { - const collectors = [deltaCollector1, cumulativeCollector1]; it('should build delta recording metrics', () => { const aggregator = new SumAggregator(true); const deltaMetricStorage = new DeltaMetricProcessor(aggregator); const temporalMetricStorage = new TemporalMetricProcessor(aggregator); + temporalMetricStorage.registerCollector(cumulativeCollector1); + temporalMetricStorage.registerCollector(deltaCollector1); deltaMetricStorage.record(1, {}, api.context.active(), [1, 1]); { const metric = temporalMetricStorage.buildMetrics( cumulativeCollector1, - collectors, defaultInstrumentDescriptor, deltaMetricStorage.collect(), [2, 2] @@ -258,7 +249,6 @@ describe('TemporalMetricProcessor', () => { { const metric = temporalMetricStorage.buildMetrics( deltaCollector1, - collectors, defaultInstrumentDescriptor, deltaMetricStorage.collect(), [4, 4] @@ -276,7 +266,6 @@ describe('TemporalMetricProcessor', () => { { const metric = temporalMetricStorage.buildMetrics( cumulativeCollector1, - collectors, defaultInstrumentDescriptor, deltaMetricStorage.collect(), [5, 5] From a10ddf1820a72fb97438ed4777af768ddbc882cc Mon Sep 17 00:00:00 2001 From: Marc Pichler Date: Tue, 26 Sep 2023 13:49:39 +0200 Subject: [PATCH 2/6] fix(sdk-metrics): move adding collector handles to the constructor --- .../src/state/AsyncMetricStorage.ts | 12 ++++----- .../sdk-metrics/src/state/MeterSharedState.ts | 13 +++++----- .../sdk-metrics/src/state/MetricStorage.ts | 10 -------- .../src/state/SyncMetricStorage.ts | 12 ++++----- .../src/state/TemporalMetricProcessor.ts | 15 ++++++----- .../test/state/AsyncMetricStorage.test.ts | 24 +++++++++--------- .../test/state/SyncMetricStorage.test.ts | 11 ++++---- .../state/TemporalMetricProcessor.test.ts | 25 +++++++++++-------- 8 files changed, 58 insertions(+), 64 deletions(-) diff --git a/packages/sdk-metrics/src/state/AsyncMetricStorage.ts b/packages/sdk-metrics/src/state/AsyncMetricStorage.ts index a5cc005e89..6bebafdc1f 100644 --- a/packages/sdk-metrics/src/state/AsyncMetricStorage.ts +++ b/packages/sdk-metrics/src/state/AsyncMetricStorage.ts @@ -42,11 +42,15 @@ export class AsyncMetricStorage> constructor( _instrumentDescriptor: InstrumentDescriptor, aggregator: Aggregator, - private _attributesProcessor: AttributesProcessor + private _attributesProcessor: AttributesProcessor, + collectorHandles: MetricCollectorHandle[] ) { super(_instrumentDescriptor); this._deltaMetricStorage = new DeltaMetricProcessor(aggregator); - this._temporalMetricStorage = new TemporalMetricProcessor(aggregator); + this._temporalMetricStorage = new TemporalMetricProcessor( + aggregator, + collectorHandles + ); } record(measurements: AttributeHashMap, observationTime: HrTime) { @@ -77,8 +81,4 @@ export class AsyncMetricStorage> collectionTime ); } - - registerCollector(collector: MetricCollectorHandle): void { - this._temporalMetricStorage.registerCollector(collector); - } } diff --git a/packages/sdk-metrics/src/state/MeterSharedState.ts b/packages/sdk-metrics/src/state/MeterSharedState.ts index f5ea6fd8ed..9ae67ecb8c 100644 --- a/packages/sdk-metrics/src/state/MeterSharedState.ts +++ b/packages/sdk-metrics/src/state/MeterSharedState.ts @@ -131,11 +131,9 @@ export class MeterSharedState { const viewStorage = new MetricStorageType( viewDescriptor, aggregator, - view.attributesProcessor + view.attributesProcessor, + this._meterProviderSharedState.metricCollectors ) as R; - for (const collector of this._meterProviderSharedState.metricCollectors) { - viewStorage.registerCollector(collector); - } this.metricStorageRegistry.register(viewStorage); return viewStorage; }); @@ -158,9 +156,9 @@ export class MeterSharedState { const storage = new MetricStorageType( descriptor, aggregator, - AttributesProcessor.Noop() + AttributesProcessor.Noop(), + [collector] ) as R; - storage.registerCollector(collector); this.metricStorageRegistry.registerForCollector(collector, storage); return storage; } @@ -181,6 +179,7 @@ interface MetricStorageConstructor { new ( instrumentDescriptor: InstrumentDescriptor, aggregator: Aggregator>, - attributesProcessor: AttributesProcessor + attributesProcessor: AttributesProcessor, + collectors: MetricCollectorHandle[] ): MetricStorage; } diff --git a/packages/sdk-metrics/src/state/MetricStorage.ts b/packages/sdk-metrics/src/state/MetricStorage.ts index 3fa2ee334c..32a48391da 100644 --- a/packages/sdk-metrics/src/state/MetricStorage.ts +++ b/packages/sdk-metrics/src/state/MetricStorage.ts @@ -42,16 +42,6 @@ export abstract class MetricStorage { collectionTime: HrTime ): Maybe; - /** - * Registers a collector that this storage will be used with. Failing to register - * a collector will result in dropped metrics. - * - * Note: Memory pressure may build if a collector is registered but does not collect - * on this instance. Once registered, ensure that `collect()` is called in reasonable intervals. - * @param collector - */ - abstract registerCollector(collector: MetricCollectorHandle): void; - getInstrumentDescriptor(): Readonly { return this._instrumentDescriptor; } diff --git a/packages/sdk-metrics/src/state/SyncMetricStorage.ts b/packages/sdk-metrics/src/state/SyncMetricStorage.ts index 257b9adffd..bb546e1271 100644 --- a/packages/sdk-metrics/src/state/SyncMetricStorage.ts +++ b/packages/sdk-metrics/src/state/SyncMetricStorage.ts @@ -41,11 +41,15 @@ export class SyncMetricStorage> constructor( instrumentDescriptor: InstrumentDescriptor, aggregator: Aggregator, - private _attributesProcessor: AttributesProcessor + private _attributesProcessor: AttributesProcessor, + collectorHandles: MetricCollectorHandle[] ) { super(instrumentDescriptor); this._deltaMetricStorage = new DeltaMetricProcessor(aggregator); - this._temporalMetricStorage = new TemporalMetricProcessor(aggregator); + this._temporalMetricStorage = new TemporalMetricProcessor( + aggregator, + collectorHandles + ); } record( @@ -77,8 +81,4 @@ export class SyncMetricStorage> collectionTime ); } - - registerCollector(collector: MetricCollectorHandle): void { - this._temporalMetricStorage.registerCollector(collector); - } } diff --git a/packages/sdk-metrics/src/state/TemporalMetricProcessor.ts b/packages/sdk-metrics/src/state/TemporalMetricProcessor.ts index d64921955f..2a2861dda5 100644 --- a/packages/sdk-metrics/src/state/TemporalMetricProcessor.ts +++ b/packages/sdk-metrics/src/state/TemporalMetricProcessor.ts @@ -61,14 +61,13 @@ export class TemporalMetricProcessor> { LastReportedHistory >(); - constructor(private _aggregator: Aggregator) {} - - registerCollector(collector: MetricCollectorHandle) { - let stash = this._unreportedAccumulations.get(collector); - if (stash === undefined) { - stash = []; - this._unreportedAccumulations.set(collector, stash); - } + constructor( + private _aggregator: Aggregator, + collectorHandles: MetricCollectorHandle[] + ) { + collectorHandles.forEach(handle => { + this._unreportedAccumulations.set(handle, []); + }); } /** diff --git a/packages/sdk-metrics/test/state/AsyncMetricStorage.test.ts b/packages/sdk-metrics/test/state/AsyncMetricStorage.test.ts index f1cea48f03..fc4f75b872 100644 --- a/packages/sdk-metrics/test/state/AsyncMetricStorage.test.ts +++ b/packages/sdk-metrics/test/state/AsyncMetricStorage.test.ts @@ -49,9 +49,9 @@ describe('AsyncMetricStorage', () => { const metricStorage = new AsyncMetricStorage( defaultInstrumentDescriptor, new SumAggregator(true), - new NoopAttributesProcessor() + new NoopAttributesProcessor(), + [deltaCollector] ); - metricStorage.registerCollector(deltaCollector); const observable = new ObservableInstrument( defaultInstrumentDescriptor, @@ -156,9 +156,9 @@ describe('AsyncMetricStorage', () => { const metricStorage = new AsyncMetricStorage( defaultInstrumentDescriptor, new SumAggregator(true), - new NoopAttributesProcessor() + new NoopAttributesProcessor(), + [deltaCollector] ); - metricStorage.registerCollector(deltaCollector); const observable = new ObservableInstrument( defaultInstrumentDescriptor, @@ -249,9 +249,9 @@ describe('AsyncMetricStorage', () => { const metricStorage = new AsyncMetricStorage( defaultInstrumentDescriptor, new SumAggregator(false), - new NoopAttributesProcessor() + new NoopAttributesProcessor(), + [deltaCollector] ); - metricStorage.registerCollector(deltaCollector); const observable = new ObservableInstrument( defaultInstrumentDescriptor, @@ -344,9 +344,9 @@ describe('AsyncMetricStorage', () => { const metricStorage = new AsyncMetricStorage( defaultInstrumentDescriptor, new SumAggregator(true), - new NoopAttributesProcessor() + new NoopAttributesProcessor(), + [cumulativeCollector] ); - metricStorage.registerCollector(cumulativeCollector); const observable = new ObservableInstrument( defaultInstrumentDescriptor, @@ -476,9 +476,9 @@ describe('AsyncMetricStorage', () => { const metricStorage = new AsyncMetricStorage( defaultInstrumentDescriptor, new SumAggregator(true), - new NoopAttributesProcessor() + new NoopAttributesProcessor(), + [cumulativeCollector] ); - metricStorage.registerCollector(cumulativeCollector); const observable = new ObservableInstrument( defaultInstrumentDescriptor, @@ -570,9 +570,9 @@ describe('AsyncMetricStorage', () => { const metricStorage = new AsyncMetricStorage( defaultInstrumentDescriptor, new SumAggregator(false), - new NoopAttributesProcessor() + new NoopAttributesProcessor(), + [cumulativeCollector] ); - metricStorage.registerCollector(cumulativeCollector); const observable = new ObservableInstrument( defaultInstrumentDescriptor, diff --git a/packages/sdk-metrics/test/state/SyncMetricStorage.test.ts b/packages/sdk-metrics/test/state/SyncMetricStorage.test.ts index fdd4e99903..32d3e10b57 100644 --- a/packages/sdk-metrics/test/state/SyncMetricStorage.test.ts +++ b/packages/sdk-metrics/test/state/SyncMetricStorage.test.ts @@ -45,7 +45,8 @@ describe('SyncMetricStorage', () => { const metricStorage = new SyncMetricStorage( defaultInstrumentDescriptor, new SumAggregator(true), - new NoopAttributesProcessor() + new NoopAttributesProcessor(), + [] ); for (const value of commonValues) { @@ -62,9 +63,9 @@ describe('SyncMetricStorage', () => { const metricStorage = new SyncMetricStorage( defaultInstrumentDescriptor, new SumAggregator(true), - new NoopAttributesProcessor() + new NoopAttributesProcessor(), + [deltaCollector] ); - metricStorage.registerCollector(deltaCollector); metricStorage.record(1, {}, api.context.active(), [0, 0]); metricStorage.record(2, {}, api.context.active(), [1, 1]); @@ -101,9 +102,9 @@ describe('SyncMetricStorage', () => { const metricStorage = new SyncMetricStorage( defaultInstrumentDescriptor, new SumAggregator(true), - new NoopAttributesProcessor() + new NoopAttributesProcessor(), + [cumulativeCollector] ); - metricStorage.registerCollector(cumulativeCollector); metricStorage.record(1, {}, api.context.active(), [0, 0]); metricStorage.record(2, {}, api.context.active(), [1, 1]); metricStorage.record(3, {}, api.context.active(), [2, 2]); diff --git a/packages/sdk-metrics/test/state/TemporalMetricProcessor.test.ts b/packages/sdk-metrics/test/state/TemporalMetricProcessor.test.ts index cd33c5bfc6..c116db6991 100644 --- a/packages/sdk-metrics/test/state/TemporalMetricProcessor.test.ts +++ b/packages/sdk-metrics/test/state/TemporalMetricProcessor.test.ts @@ -54,8 +54,9 @@ describe('TemporalMetricProcessor', () => { const aggregator = new SumAggregator(true); const deltaMetricStorage = new DeltaMetricProcessor(aggregator); - const temporalMetricStorage = new TemporalMetricProcessor(aggregator); - temporalMetricStorage.registerCollector(deltaCollector1); + const temporalMetricStorage = new TemporalMetricProcessor(aggregator, [ + deltaCollector1, + ]); deltaMetricStorage.record(1, {}, api.context.active(), [1, 1]); { const metric = temporalMetricStorage.buildMetrics( @@ -121,9 +122,10 @@ describe('TemporalMetricProcessor', () => { it('should build delta recording metrics', () => { const aggregator = new SumAggregator(true); const deltaMetricStorage = new DeltaMetricProcessor(aggregator); - const temporalMetricStorage = new TemporalMetricProcessor(aggregator); - temporalMetricStorage.registerCollector(deltaCollector1); - temporalMetricStorage.registerCollector(deltaCollector2); + const temporalMetricStorage = new TemporalMetricProcessor(aggregator, [ + deltaCollector1, + deltaCollector2, + ]); deltaMetricStorage.record(1, {}, api.context.active(), [1, 1]); { @@ -173,8 +175,10 @@ describe('TemporalMetricProcessor', () => { const aggregator = new SumAggregator(true); const deltaMetricStorage = new DeltaMetricProcessor(aggregator); - const temporalMetricStorage = new TemporalMetricProcessor(aggregator); - temporalMetricStorage.registerCollector(cumulativeCollector1); + const temporalMetricStorage = new TemporalMetricProcessor(aggregator, [ + cumulativeCollector1, + ]); + deltaMetricStorage.record(1, {}, api.context.active(), [1, 1]); { const metric = temporalMetricStorage.buildMetrics( @@ -222,9 +226,10 @@ describe('TemporalMetricProcessor', () => { it('should build delta recording metrics', () => { const aggregator = new SumAggregator(true); const deltaMetricStorage = new DeltaMetricProcessor(aggregator); - const temporalMetricStorage = new TemporalMetricProcessor(aggregator); - temporalMetricStorage.registerCollector(cumulativeCollector1); - temporalMetricStorage.registerCollector(deltaCollector1); + const temporalMetricStorage = new TemporalMetricProcessor(aggregator, [ + cumulativeCollector1, + deltaCollector1, + ]); deltaMetricStorage.record(1, {}, api.context.active(), [1, 1]); { From 6b0b62f3159befdb565469774811565026608184 Mon Sep 17 00:00:00 2001 From: Marc Pichler Date: Tue, 26 Sep 2023 13:53:21 +0200 Subject: [PATCH 3/6] fix: lint --- .../test/state/AsyncMetricStorage.test.ts | 40 ++++--------------- .../test/state/SyncMetricStorage.test.ts | 10 +---- .../state/TemporalMetricProcessor.test.ts | 1 - 3 files changed, 10 insertions(+), 41 deletions(-) diff --git a/packages/sdk-metrics/test/state/AsyncMetricStorage.test.ts b/packages/sdk-metrics/test/state/AsyncMetricStorage.test.ts index fc4f75b872..38a25df39b 100644 --- a/packages/sdk-metrics/test/state/AsyncMetricStorage.test.ts +++ b/packages/sdk-metrics/test/state/AsyncMetricStorage.test.ts @@ -69,10 +69,7 @@ describe('AsyncMetricStorage', () => { { const collectionTime: HrTime = [0, 0]; await observableRegistry.observe(collectionTime); - const metric = metricStorage.collect( - deltaCollector, - collectionTime - ); + const metric = metricStorage.collect(deltaCollector, collectionTime); assertMetricData(metric, DataPointType.SUM); assert.strictEqual(metric.dataPoints.length, 3); @@ -118,10 +115,7 @@ describe('AsyncMetricStorage', () => { { const collectionTime: HrTime = [2, 2]; await observableRegistry.observe(collectionTime); - const metric = metricStorage.collect( - deltaCollector, - collectionTime - ); + const metric = metricStorage.collect(deltaCollector, collectionTime); assertMetricData(metric, DataPointType.SUM); assert.strictEqual(metric.dataPoints.length, 3); @@ -176,10 +170,7 @@ describe('AsyncMetricStorage', () => { { const collectionTime: HrTime = [0, 0]; await observableRegistry.observe(collectionTime); - const metric = metricStorage.collect( - deltaCollector, - collectionTime - ); + const metric = metricStorage.collect(deltaCollector, collectionTime); assertMetricData(metric, DataPointType.SUM); assert.strictEqual(metric.dataPoints.length, 1); @@ -201,10 +192,7 @@ describe('AsyncMetricStorage', () => { { const collectionTime: HrTime = [1, 1]; await observableRegistry.observe(collectionTime); - const metric = metricStorage.collect( - deltaCollector, - collectionTime - ); + const metric = metricStorage.collect(deltaCollector, collectionTime); assertMetricData(metric, DataPointType.SUM); assert.strictEqual(metric.dataPoints.length, 1); @@ -226,10 +214,7 @@ describe('AsyncMetricStorage', () => { { const collectionTime: HrTime = [2, 2]; await observableRegistry.observe(collectionTime); - const metric = metricStorage.collect( - deltaCollector, - collectionTime - ); + const metric = metricStorage.collect(deltaCollector, collectionTime); assertMetricData(metric, DataPointType.SUM); assert.strictEqual(metric.dataPoints.length, 1); @@ -269,10 +254,7 @@ describe('AsyncMetricStorage', () => { { const collectionTime: HrTime = [0, 0]; await observableRegistry.observe(collectionTime); - const metric = metricStorage.collect( - deltaCollector, - collectionTime - ); + const metric = metricStorage.collect(deltaCollector, collectionTime); assertMetricData(metric, DataPointType.SUM); assert.strictEqual(metric.dataPoints.length, 1); @@ -294,10 +276,7 @@ describe('AsyncMetricStorage', () => { { const collectionTime: HrTime = [0, 0]; await observableRegistry.observe(collectionTime); - const metric = metricStorage.collect( - deltaCollector, - collectionTime - ); + const metric = metricStorage.collect(deltaCollector, collectionTime); assertMetricData(metric, DataPointType.SUM); assert.strictEqual(metric.dataPoints.length, 1); @@ -319,10 +298,7 @@ describe('AsyncMetricStorage', () => { { const collectionTime: HrTime = [2, 2]; await observableRegistry.observe(collectionTime); - const metric = metricStorage.collect( - deltaCollector, - collectionTime - ); + const metric = metricStorage.collect(deltaCollector, collectionTime); assertMetricData(metric, DataPointType.SUM); assert.strictEqual(metric.dataPoints.length, 1); diff --git a/packages/sdk-metrics/test/state/SyncMetricStorage.test.ts b/packages/sdk-metrics/test/state/SyncMetricStorage.test.ts index 32d3e10b57..5f59aa6197 100644 --- a/packages/sdk-metrics/test/state/SyncMetricStorage.test.ts +++ b/packages/sdk-metrics/test/state/SyncMetricStorage.test.ts @@ -118,10 +118,7 @@ describe('SyncMetricStorage', () => { // The attributes should be memorized. { - const metric = metricStorage.collect( - cumulativeCollector, - [4, 4] - ); + const metric = metricStorage.collect(cumulativeCollector, [4, 4]); assertMetricData(metric, DataPointType.SUM); assert.strictEqual(metric.dataPoints.length, 1); @@ -130,10 +127,7 @@ describe('SyncMetricStorage', () => { metricStorage.record(1, {}, api.context.active(), [5, 5]); { - const metric = metricStorage.collect( - cumulativeCollector, - [6, 6] - ); + const metric = metricStorage.collect(cumulativeCollector, [6, 6]); assertMetricData(metric, DataPointType.SUM); assert.strictEqual(metric.dataPoints.length, 1); diff --git a/packages/sdk-metrics/test/state/TemporalMetricProcessor.test.ts b/packages/sdk-metrics/test/state/TemporalMetricProcessor.test.ts index c116db6991..5f5d0785ab 100644 --- a/packages/sdk-metrics/test/state/TemporalMetricProcessor.test.ts +++ b/packages/sdk-metrics/test/state/TemporalMetricProcessor.test.ts @@ -48,7 +48,6 @@ describe('TemporalMetricProcessor', () => { describe('buildMetrics', () => { describe('single delta collector', () => { - it('should build delta recording metrics', () => { const spy = sinon.spy(deltaCollector1, 'selectAggregationTemporality'); From a71ad744bae34098e7f42c19da407c6805c9e171 Mon Sep 17 00:00:00 2001 From: Marc Pichler Date: Tue, 26 Sep 2023 13:58:33 +0200 Subject: [PATCH 4/6] fix: changelog --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index ad5c780983..9b3e8dc8d7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,8 @@ For experimental package changes, see the [experimental CHANGELOG](experimental/ ### :bug: (Bug Fix) * fix(sdk-metrics): allow instrument names to contain '/' [#4155](https://github.com/open-telemetry/opentelemetry-js/pull/4155) +* fix(sdk-metrics): prevent per-reader storages from keeping unreported accumulations in memory [#4163](https://github.com/open-telemetry/opentelemetry-js/pull/4163) @pichlermarc + * fixes a memory leak which occurred when two or more `MetricReader` instances are registered to a `MeterProvider` ### :books: (Refine Doc) From fedd5ceb6216a900a869f80c052e87de69c45d48 Mon Sep 17 00:00:00 2001 From: Marc Pichler Date: Tue, 26 Sep 2023 13:59:42 +0200 Subject: [PATCH 5/6] fix: cleanup --- examples/otlp-exporter-node/test.js | 0 1 file changed, 0 insertions(+), 0 deletions(-) delete mode 100644 examples/otlp-exporter-node/test.js diff --git a/examples/otlp-exporter-node/test.js b/examples/otlp-exporter-node/test.js deleted file mode 100644 index e69de29bb2..0000000000 From d56ed321b9d96b22a2bef14fefbba8407f457233 Mon Sep 17 00:00:00 2001 From: Marc Pichler Date: Tue, 10 Oct 2023 15:18:15 +0200 Subject: [PATCH 6/6] fix: remove unused test code --- packages/sdk-metrics/test/state/MetricStorageRegistry.test.ts | 2 -- 1 file changed, 2 deletions(-) diff --git a/packages/sdk-metrics/test/state/MetricStorageRegistry.test.ts b/packages/sdk-metrics/test/state/MetricStorageRegistry.test.ts index f9fe744b97..55ef806511 100644 --- a/packages/sdk-metrics/test/state/MetricStorageRegistry.test.ts +++ b/packages/sdk-metrics/test/state/MetricStorageRegistry.test.ts @@ -37,8 +37,6 @@ class TestMetricStorage extends MetricStorage { ): Maybe { return undefined; } - - registerCollector(collector: MetricCollectorHandle): void {} } describe('MetricStorageRegistry', () => {