diff --git a/src/Promitor.Integrations.Sinks.OpenTelemetry/OpenTelemetryCollectorMetricSink.cs b/src/Promitor.Integrations.Sinks.OpenTelemetry/OpenTelemetryCollectorMetricSink.cs index 03ddf8df3..cba785007 100644 --- a/src/Promitor.Integrations.Sinks.OpenTelemetry/OpenTelemetryCollectorMetricSink.cs +++ b/src/Promitor.Integrations.Sinks.OpenTelemetry/OpenTelemetryCollectorMetricSink.cs @@ -3,7 +3,7 @@ using System.Collections.Generic; using System.Diagnostics.Metrics; using System.Linq; -using System.Threading.Channels; +using System.Threading; using System.Threading.Tasks; using GuardNet; using Microsoft.Extensions.Logging; @@ -17,11 +17,11 @@ namespace Promitor.Integrations.Sinks.OpenTelemetry public class OpenTelemetryCollectorMetricSink : MetricSink, IMetricSink { private readonly ILogger _logger; - private static readonly Meter azureMonitorMeter = new("Promitor.Scraper.Metrics.AzureMonitor", "1.0"); + private static readonly Meter azureMonitorMeter = new Meter("Promitor.Scraper.Metrics.AzureMonitor", "1.0"); public MetricSinkType Type => MetricSinkType.OpenTelemetryCollector; - public OpenTelemetryCollectorMetricSink(IMetricsDeclarationProvider metricsDeclarationProvider, ILogger logger) + public OpenTelemetryCollectorMetricSink(IMetricsDeclarationProvider metricsDeclarationProvider, ILogger logger) : base(metricsDeclarationProvider, logger) { Guard.NotNull(logger, nameof(logger)); @@ -40,24 +40,24 @@ public async Task ReportMetricAsync(string metricName, string metricDescription, foreach (var measuredMetric in scrapeResult.MetricValues) { var metricValue = measuredMetric.Value ?? 0; - var metricLabels = DetermineLabels(metricName, scrapeResult, measuredMetric); var reportMetricTask = ReportMetricAsync(metricName, metricDescription, metricValue, metricLabels); + reportMetricTasks.Add(reportMetricTask); } await Task.WhenAll(reportMetricTasks); } - private readonly ConcurrentDictionary> _gauges = new(); - private readonly ConcurrentDictionary>> _measurements = new(); + private readonly ConcurrentDictionary> _gauges = new ConcurrentDictionary>(); + private readonly ConcurrentDictionary>> _measurements = new ConcurrentDictionary>>(); - public async Task ReportMetricAsync(string metricName, string metricDescription, double metricValue, Dictionary labels) + public Task ReportMetricAsync(string metricName, string metricDescription, double metricValue, Dictionary labels) { Guard.NotNullOrEmpty(metricName, nameof(metricName)); - // TODO: Move to factory instead? + // TODO: Move to factory instead? if (_gauges.ContainsKey(metricName) == false) { InitializeNewMetric(metricName, metricDescription); @@ -65,41 +65,28 @@ public async Task ReportMetricAsync(string metricName, string metricDescription, var composedTags = labels.Select(kvp => new KeyValuePair(kvp.Key, kvp.Value)).ToArray(); var newMeasurement = new Measurement(metricValue, composedTags); - var channelWriter = _measurements[metricName].Writer; - await channelWriter.WriteAsync(newMeasurement); - - _logger.LogTrace("Metric {MetricName} with value {MetricValue} was pushed to OpenTelemetry Collector", metricName, metricValue); + _measurements[metricName].Add(newMeasurement); + + _logger.LogTrace("Metric {MetricName} with value {MetricValue} and labels {Labels} was pushed to OpenTelemetry Collector", metricName, metricValue, composedTags); + + return Task.CompletedTask; } private void InitializeNewMetric(string metricName, string metricDescription) { - var gauge = azureMonitorMeter.CreateObservableGauge(metricName, description: metricDescription, observeValues: () => ReportMeasurementsForMetricAsync(metricName).Result); + var gauge = azureMonitorMeter.CreateObservableGauge(metricName, description: metricDescription, observeValues: () => ReportMeasurementsForMetric(metricName)); _gauges.TryAdd(metricName, gauge); - _measurements.TryAdd(metricName, CreateNewMeasurementChannel()); + _measurements.TryAdd(metricName, []); } - private async Task>> ReportMeasurementsForMetricAsync(string metricName) + private IEnumerable> ReportMeasurementsForMetric(string metricName) { - List> measurementsToReport = new List>(); - var channel = _measurements[metricName]; - - var totalCount = channel.Reader.Count; - var readItems = 0; - do - { - var item = await channel.Reader.ReadAsync(); - measurementsToReport.Add(item); - readItems++; - } - while (readItems < totalCount); + var recordedMeasurements = _measurements[metricName]; - return measurementsToReport; - } + var measurementsToReport = Interlocked.Exchange(ref recordedMeasurements, []); - static Channel> CreateNewMeasurementChannel() - { - return Channel.CreateUnbounded>(); + return measurementsToReport; } } -} +} \ No newline at end of file diff --git a/src/Promitor.Tests.Integration/Clients/PrometheusClientFactory.cs b/src/Promitor.Tests.Integration/Clients/PrometheusClientFactory.cs index eb5feeccf..d5827a3d6 100644 --- a/src/Promitor.Tests.Integration/Clients/PrometheusClientFactory.cs +++ b/src/Promitor.Tests.Integration/Clients/PrometheusClientFactory.cs @@ -17,7 +17,7 @@ internal PrometheusClient CreateForOpenTelemetryCollector(IConfiguration configu var baseUri = configuration["OpenTelemetry:Collector:Uri"]; var metricNamespace = configuration["OpenTelemetry:Collector:MetricNamespace"]; - _logger.LogInformation("Creating Prometheus client for {BaseUri}/metrics with metric namespace {metricNamespace}", baseUri, metricNamespace); + _logger.LogWarning("Creating Prometheus client for {BaseUri}/metrics with metric namespace {metricNamespace}", baseUri, metricNamespace); return new PrometheusClient(baseUri, "/metrics", metricNamespace, _logger); }