From 69aef4967f8f12304b46e237fc8cfc0c631d82be Mon Sep 17 00:00:00 2001 From: David Luna Date: Tue, 6 Feb 2024 15:52:14 +0100 Subject: [PATCH] chore: add views for host-metrics --- packages/opentelemetry-node/lib/metrics.js | 202 +++++++++++++++++++++ packages/opentelemetry-node/lib/sdk.js | 20 ++ 2 files changed, 222 insertions(+) create mode 100644 packages/opentelemetry-node/lib/metrics.js diff --git a/packages/opentelemetry-node/lib/metrics.js b/packages/opentelemetry-node/lib/metrics.js new file mode 100644 index 00000000..c8e806a8 --- /dev/null +++ b/packages/opentelemetry-node/lib/metrics.js @@ -0,0 +1,202 @@ +/** + * Type imports + */ +/** + * @typedef {import('@opentelemetry/api').HrTime} HrTime + */ +/** + * @template T + * @typedef {import('@opentelemetry/sdk-metrics/build/src/aggregator/types').Aggregator} Aggregator + */ +/** + * @template T + * @typedef {import('@opentelemetry/sdk-metrics/build/src/aggregator/types').AccumulationRecord} AccumulationRecord + */ +/** + * @typedef {import('@opentelemetry/sdk-metrics').LastValueAggregation} LastValueAggregation + * @typedef {import('@opentelemetry/sdk-metrics/build/src/aggregator/types').Accumulation} Accumulation + * @typedef {import('@opentelemetry/sdk-metrics').MetricDescriptor} MetricDescriptor + * @typedef {import('@opentelemetry/sdk-metrics').AggregationTemporality} AggregationTemporality + * @typedef {import('@opentelemetry/sdk-metrics').GaugeMetricData} GaugeMetricData + */ + +const {millisToHrTime, hrTimeToMicroseconds} = require('@opentelemetry/core'); +const {DataPointType} = require('@opentelemetry/sdk-metrics'); + +/** + * @class + * @implements {Accumulation} + */ +class CpuNumberValueAccumulation { + /** + * + * @param {HrTime} startTime + * @param {number} [current] + * @param {HrTime} [sampleTime] + */ + constructor(startTime, current, sampleTime) { + this.startTime = startTime; + this._current = current || 0; + this.sampleTime = sampleTime || millisToHrTime(Date.now()); + } + + /** + * @param {number} value + */ + record(value) { + console.log('record', value); + this._current = value; + this.sampleTime = millisToHrTime(Date.now()); + } + + /** + * @param {HrTime} startTime + */ + setStartTime(startTime) { + this.startTime = startTime; + } + + /** + * @returns {number} + */ + toPointValue() { + return this._current; + } +} + +/** + * @class + * @implements {Aggregator} + */ +class CpuUtilizationAggregation { + kind = 2; + + /** + * + * @param {HrTime} startTime + * @returns + */ + createAccumulation(startTime) { + return new CpuNumberValueAccumulation(startTime); + } + + /** + * Return the newly captured (delta) accumulation for CpuUtilizationAggregation. + * + * @param {CpuNumberValueAccumulation} previous + * @param {CpuNumberValueAccumulation} delta + * @returns {CpuNumberValueAccumulation} + */ + merge(previous, delta) { + // nanoseconds may lose precisions. + const latestAccumulation = + hrTimeToMicroseconds(delta.sampleTime) >= + hrTimeToMicroseconds(previous.sampleTime) + ? delta + : previous; + return new CpuNumberValueAccumulation( + previous.startTime, + latestAccumulation.toPointValue(), + latestAccumulation.sampleTime + ); + } + + /** + * A delta aggregation is not meaningful to CpuUtilizationAggregation, just return + * the newly captured (delta) accumulation for CpuUtilizationAggregation. + * + * @param {CpuNumberValueAccumulation} previous + * @param {CpuNumberValueAccumulation} current + * @returns {CpuNumberValueAccumulation} + */ + diff(previous, current) { + // nanoseconds may lose precisions. + const latestAccumulation = + hrTimeToMicroseconds(current.sampleTime) >= + hrTimeToMicroseconds(previous.sampleTime) + ? current + : previous; + return new CpuNumberValueAccumulation( + current.startTime, + latestAccumulation.toPointValue(), + latestAccumulation.sampleTime + ); + } + + /** + * + * @param {MetricDescriptor} descriptor + * @param {AggregationTemporality} aggregationTemporality + * @param {AccumulationRecord[]} accumulationByAttributes + * @param {HrTime} endTime + * @returns {GaugeMetricData | undefined} + */ + toMetricData( + descriptor, + aggregationTemporality, + accumulationByAttributes, + endTime + ) { + console.log( + 'toMetricData', + descriptor, + aggregationTemporality, + endTime + ); + // Accumulate bhy cpu state and calculate the average + /** @type {Map} */ + const stateValues = new Map(); + accumulationByAttributes.forEach(([attributes, accumulation]) => { + const key = `${attributes['system.cpu.state']}`; + const val = accumulation.toPointValue(); + if (stateValues.has(key)) { + stateValues.get(key).push(val); + } else { + stateValues.set(key, [val]); + } + }); + + const startTime = accumulationByAttributes[0][1].startTime; + + return { + descriptor, + aggregationTemporality, + dataPointType: DataPointType.GAUGE, + dataPoints: Array.from(stateValues.entries()).map( + ([state, values]) => { + return { + attributes: { + 'system.cpu.state': state, + 'system.cpu.logical_number': 0, + }, + startTime, + endTime, + value: + values.reduce((sum, val) => sum + val, 0) / + values.length, + }; + } + ), + }; + + // return { + // descriptor, + // aggregationTemporality, + // dataPointType: DataPointType.GAUGE, + // dataPoints: accumulationByAttributes.map( + // ([attributes, accumulation]) => { + // return { + // attributes, + // startTime: accumulation.startTime, + // endTime, + // value: accumulation.toPointValue(), + // }; + // } + // ), + // }; + } +} + +module.exports = { + CpuUtilizationAggregation, +}; diff --git a/packages/opentelemetry-node/lib/sdk.js b/packages/opentelemetry-node/lib/sdk.js index aa5987e8..9441f240 100644 --- a/packages/opentelemetry-node/lib/sdk.js +++ b/packages/opentelemetry-node/lib/sdk.js @@ -2,6 +2,7 @@ const { OTLPMetricExporter, } = require('@opentelemetry/exporter-metrics-otlp-proto'); const {metrics, NodeSDK} = require('@opentelemetry/sdk-node'); +const {View, DropAggregation} = require('@opentelemetry/sdk-metrics'); const { envDetectorSync, hostDetectorSync, @@ -17,6 +18,8 @@ const {setupLogger} = require('./logging'); const {distroDetectorSync} = require('./detector'); const {setupEnvironment, restoreEnvironment} = require('./environment'); +const {CpuUtilizationAggregation} = require('./metrics'); + /** * @typedef {Partial} PartialNodeSDKConfiguration */ @@ -74,6 +77,23 @@ class ElasticNodeSDK extends NodeSDK { exportIntervalMillis: metricsInterval, exportTimeoutMillis: metricsInterval, // TODO same val appropriate for timeout? }); + // Add views for `host-metrics` to avoid excess of data being sent to the server + defaultConfig.views = [ + // TODO: drop network metrics for now + new View({ + instrumentName: 'system.network.*', + aggregation: new DropAggregation(), + }), + new View({ + instrumentName: 'system.cpu.utilization', + aggregation: { + createAggregator(instrument) { + console.log('creating Aggregator', instrument); + return new CpuUtilizationAggregation(); + }, + }, + }), + ]; } const configuration = Object.assign(defaultConfig, opts);