Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Fix Broken OTEL Sink #2567

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
69d0510
Raise logging level
hkfgo Oct 31, 2024
859fb13
Add logging on gauge callback
hkfgo Oct 31, 2024
d24a57e
wait longer to collect scraper image logs
hkfgo Oct 31, 2024
f7b4f6b
turn on debug logging on collector
hkfgo Oct 31, 2024
8e28b67
Fail CI fast and show collector logs
hkfgo Oct 31, 2024
4b04676
Fail CI fast and show collector logs
hkfgo Nov 1, 2024
a98ff34
raise metric expiration time window
hkfgo Nov 1, 2024
36ec1b6
more logging
hkfgo Nov 1, 2024
0781565
keep shortening timeout
hkfgo Nov 1, 2024
27ac16a
make gauge reader run more frequently
hkfgo Nov 1, 2024
ae62446
Add console exporter
hkfgo Nov 1, 2024
9c8285e
add test gauge
hkfgo Nov 1, 2024
8d70e68
remove add transient call
hkfgo Nov 1, 2024
72d0bdd
forego OTEL system metrics
hkfgo Nov 6, 2024
90c3f81
revert to earliest sink version
hkfgo Nov 6, 2024
544c3b3
remove otel namespace
hkfgo Nov 6, 2024
a7f664e
remove otel namespace
hkfgo Nov 6, 2024
6ffd48e
rollback some CI params
hkfgo Nov 6, 2024
f7ee7e6
move to later version of test logger
hkfgo Nov 7, 2024
b204257
move to later version of test logger
hkfgo Nov 7, 2024
2050a8a
move to later version of test logger
hkfgo Nov 7, 2024
2659610
move to later version of test logger
hkfgo Nov 7, 2024
801bb01
Init Prometheus with logger
hkfgo Nov 7, 2024
91b03a6
enable otel system metrics again
hkfgo Nov 7, 2024
012f6a4
log labels being pushed
hkfgo Nov 7, 2024
f9e29b8
log labels being pushed
hkfgo Nov 7, 2024
1daa4cc
try providing all labels in OTEL sink
hkfgo Nov 7, 2024
d85255c
Don't show collector logs
hkfgo Nov 7, 2024
43fe8fb
revert irrelevant chagnes
hkfgo Nov 8, 2024
3fab511
remove unused imports
hkfgo Nov 11, 2024
bbd679e
revert integration test changes
hkfgo Nov 11, 2024
22c592b
fight resharper
hkfgo Nov 11, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
using System.Collections.Generic;
using System.Diagnostics.Metrics;
using System.Linq;
using System.Threading.Channels;
using System.Threading;
using System.Threading.Tasks;
using GuardNet;
using Microsoft.Extensions.Logging;
Expand All @@ -17,11 +17,11 @@ namespace Promitor.Integrations.Sinks.OpenTelemetry
public class OpenTelemetryCollectorMetricSink : MetricSink, IMetricSink
{
private readonly ILogger<OpenTelemetryCollectorMetricSink> _logger;
private static readonly Meter azureMonitorMeter = new("Promitor.Scraper.Metrics.AzureMonitor", "1.0");
private static readonly Meter azureMonitorMeter = new Meter("Promitor.Scraper.Metrics.AzureMonitor", "1.0");

public MetricSinkType Type => MetricSinkType.OpenTelemetryCollector;

public OpenTelemetryCollectorMetricSink(IMetricsDeclarationProvider metricsDeclarationProvider, ILogger<OpenTelemetryCollectorMetricSink> logger)
public OpenTelemetryCollectorMetricSink(IMetricsDeclarationProvider metricsDeclarationProvider, ILogger<OpenTelemetryCollectorMetricSink> logger)
: base(metricsDeclarationProvider, logger)
{
Guard.NotNull(logger, nameof(logger));
Expand All @@ -40,66 +40,53 @@ public async Task ReportMetricAsync(string metricName, string metricDescription,
foreach (var measuredMetric in scrapeResult.MetricValues)
{
var metricValue = measuredMetric.Value ?? 0;

var metricLabels = DetermineLabels(metricName, scrapeResult, measuredMetric);

var reportMetricTask = ReportMetricAsync(metricName, metricDescription, metricValue, metricLabels);

reportMetricTasks.Add(reportMetricTask);
}

await Task.WhenAll(reportMetricTasks);
}

private readonly ConcurrentDictionary<string, ObservableGauge<double>> _gauges = new();
private readonly ConcurrentDictionary<string, Channel<Measurement<double>>> _measurements = new();
private readonly ConcurrentDictionary<string, ObservableGauge<double>> _gauges = new ConcurrentDictionary<string, ObservableGauge<double>>();
private readonly ConcurrentDictionary<string, HashSet<Measurement<double>>> _measurements = new ConcurrentDictionary<string, HashSet<Measurement<double>>>();

public async Task ReportMetricAsync(string metricName, string metricDescription, double metricValue, Dictionary<string, string> labels)
public Task ReportMetricAsync(string metricName, string metricDescription, double metricValue, Dictionary<string, string> labels)
{
Guard.NotNullOrEmpty(metricName, nameof(metricName));

// TODO: Move to factory instead?
// TODO: Move to factory instead?
if (_gauges.ContainsKey(metricName) == false)
{
InitializeNewMetric(metricName, metricDescription);
}

var composedTags = labels.Select(kvp => new KeyValuePair<string, object?>(kvp.Key, kvp.Value)).ToArray();
var newMeasurement = new Measurement<double>(metricValue, composedTags);
var channelWriter = _measurements[metricName].Writer;
await channelWriter.WriteAsync(newMeasurement);

_logger.LogTrace("Metric {MetricName} with value {MetricValue} was pushed to OpenTelemetry Collector", metricName, metricValue);
_measurements[metricName].Add(newMeasurement);

_logger.LogTrace("Metric {MetricName} with value {MetricValue} and labels {Labels} was pushed to OpenTelemetry Collector", metricName, metricValue, composedTags);

return Task.CompletedTask;
}

private void InitializeNewMetric(string metricName, string metricDescription)
{
var gauge = azureMonitorMeter.CreateObservableGauge(metricName, description: metricDescription, observeValues: () => ReportMeasurementsForMetricAsync(metricName).Result);
var gauge = azureMonitorMeter.CreateObservableGauge<double>(metricName, description: metricDescription, observeValues: () => ReportMeasurementsForMetric(metricName));
_gauges.TryAdd(metricName, gauge);

_measurements.TryAdd(metricName, CreateNewMeasurementChannel());
_measurements.TryAdd(metricName, []);
}

private async Task<IEnumerable<Measurement<double>>> ReportMeasurementsForMetricAsync(string metricName)
private IEnumerable<Measurement<double>> ReportMeasurementsForMetric(string metricName)
{
List<Measurement<double>> measurementsToReport = new List<Measurement<double>>();
var channel = _measurements[metricName];

var totalCount = channel.Reader.Count;
var readItems = 0;
do
{
var item = await channel.Reader.ReadAsync();
measurementsToReport.Add(item);
readItems++;
}
while (readItems < totalCount);
var recordedMeasurements = _measurements[metricName];

return measurementsToReport;
}
var measurementsToReport = Interlocked.Exchange(ref recordedMeasurements, []);

static Channel<Measurement<double>> CreateNewMeasurementChannel()
{
return Channel.CreateUnbounded<Measurement<double>>();
return measurementsToReport;
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ internal PrometheusClient CreateForOpenTelemetryCollector(IConfiguration configu
var baseUri = configuration["OpenTelemetry:Collector:Uri"];
var metricNamespace = configuration["OpenTelemetry:Collector:MetricNamespace"];

_logger.LogInformation("Creating Prometheus client for {BaseUri}/metrics with metric namespace {metricNamespace}", baseUri, metricNamespace);
_logger.LogWarning("Creating Prometheus client for {BaseUri}/metrics with metric namespace {metricNamespace}", baseUri, metricNamespace);

return new PrometheusClient(baseUri, "/metrics", metricNamespace, _logger);
}
Expand Down
Loading