Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Do Not Merge] Metrics Async Pull API #2517

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -198,19 +198,19 @@ private delegate Task WriteMetricsFunc(
/// <summary>
/// Serialize metrics to prometheus format.
/// </summary>
/// <param name="exporter"><see cref="PrometheusExporter"/>.</param>
/// <param name="metrics">Metrics to write.</param>
/// <param name="stream">Stream to write to.</param>
/// <param name="getUtcNowDateTimeOffset">Optional function to resolve the current date &amp; time.</param>
/// <returns><see cref="Task"/> to await the operation.</returns>
public static async Task WriteMetricsCollection(
this PrometheusExporter exporter,
Batch<Metric> metrics,
Stream stream,
Func<DateTimeOffset> getUtcNowDateTimeOffset)
{
byte[] buffer = ArrayPool<byte>.Shared.Rent(8192);
try
{
foreach (var metric in exporter.Metrics)
foreach (var metric in metrics)
{
if (!MetricInfoCache.TryGetValue(metric.Name, out MetricInfo metricInfo))
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,20 +157,28 @@ private void WorkerThread()

private async Task ProcessExportRequest(HttpListenerContext context)
{
try
this.exporter.OnPullExportAsync = async (metrics) =>
{
this.exporter.Collect(Timeout.Infinite);
try
{
context.Response.StatusCode = 200;
context.Response.ContentType = PrometheusMetricsFormatHelper.ContentType;

context.Response.StatusCode = 200;
context.Response.ContentType = PrometheusMetricsFormatHelper.ContentType;
await PrometheusExporterExtensions.WriteMetricsCollection(metrics, context.Response.OutputStream, this.exporter.Options.GetUtcNowDateTimeOffset).ConfigureAwait(false);

await this.exporter.WriteMetricsCollection(context.Response.OutputStream, this.exporter.Options.GetUtcNowDateTimeOffset).ConfigureAwait(false);
}
catch (Exception ex)
{
PrometheusExporterEventSource.Log.FailedExport(ex);
return ExportResult.Success;
}
catch (Exception ex)
{
context.Response.StatusCode = 500;
PrometheusExporterEventSource.Log.FailedExport(ex);
return ExportResult.Failure;
}
};

context.Response.StatusCode = 500;
try
{
await this.exporter.CollectAndPullAsync(Timeout.Infinite).ConfigureAwait(false);
}
finally
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,20 +69,28 @@ public async Task InvokeAsync(HttpContext httpContext)
return;
}

try
this.exporter.OnPullExportAsync = async (metrics) =>
{
this.exporter.Collect(Timeout.Infinite);

await WriteMetricsToResponse(this.exporter, response).ConfigureAwait(false);
}
catch (Exception ex)
{
if (!response.HasStarted)
try
{
response.StatusCode = 500;
await WriteMetricsToResponse(this.exporter, metrics, response).ConfigureAwait(false);
return ExportResult.Success;
}
catch (Exception ex)
{
if (!response.HasStarted)
{
response.StatusCode = 500;
}

PrometheusExporterEventSource.Log.FailedExport(ex);
PrometheusExporterEventSource.Log.FailedExport(ex);
return ExportResult.Failure;
}
};

try
{
await this.exporter.CollectAndPullAsync(Timeout.Infinite).ConfigureAwait(false);
}
finally
{
Expand All @@ -91,12 +99,12 @@ public async Task InvokeAsync(HttpContext httpContext)
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
internal static async Task WriteMetricsToResponse(PrometheusExporter exporter, HttpResponse response)
internal static async Task WriteMetricsToResponse(PrometheusExporter exporter, Batch<Metric> metrics, HttpResponse response)
{
response.StatusCode = 200;
response.ContentType = PrometheusMetricsFormatHelper.ContentType;

await exporter.WriteMetricsCollection(response.Body, exporter.Options.GetUtcNowDateTimeOffset).ConfigureAwait(false);
await PrometheusExporterExtensions.WriteMetricsCollection(metrics, response.Body, exporter.Options.GetUtcNowDateTimeOffset).ConfigureAwait(false);
}
}
}
Expand Down
33 changes: 26 additions & 7 deletions src/OpenTelemetry.Exporter.Prometheus/PrometheusExporter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

using System;
using System.Threading;
using System.Threading.Tasks;
using OpenTelemetry.Exporter.Prometheus;
using OpenTelemetry.Metrics;

Expand All @@ -30,10 +31,10 @@ public class PrometheusExporter : BaseExporter<Metric>, IPullMetricExporter
{
internal const string HttpListenerStartFailureExceptionMessage = "PrometheusExporter http listener could not be started.";
internal readonly PrometheusExporterOptions Options;
internal Batch<Metric> Metrics;
private readonly SemaphoreSlim semaphore = new SemaphoreSlim(1, 1);
private readonly PrometheusExporterMetricsHttpServer metricsHttpServer;
private Func<int, bool> funcCollect;
private Func<int, Task<bool>> funcCollectAndPullAsync;
private Func<Batch<Metric>, Task<ExportResult>> funcPullExportAsync;
private bool disposed;

/// <summary>
Expand All @@ -58,16 +59,34 @@ public PrometheusExporter(PrometheusExporterOptions options)
}
}

public Func<int, bool> Collect
public Func<int, Task<bool>> CollectAndPullAsync
{
get => this.funcCollect;
set => this.funcCollect = value;
get => this.funcCollectAndPullAsync;
set => this.funcCollectAndPullAsync = value;
}

internal Func<Batch<Metric>, Task<ExportResult>> OnPullExportAsync
{
get => this.funcPullExportAsync;
set => this.funcPullExportAsync = value;
}

public override ExportResult Export(in Batch<Metric> metrics)
{
this.Metrics = metrics;
return ExportResult.Success;
// todo: Implement PUSH export here.
throw new NotImplementedException();
}

public Task<ExportResult> PullAsync(Batch<Metric> metrics)
{
var pullFunc = this.funcPullExportAsync;
if (pullFunc == null)
{
// todo: Log an error.
return Task.FromResult(ExportResult.Failure);
}

return pullFunc(metrics);
}

internal bool TryEnterSemaphore()
Expand Down
39 changes: 35 additions & 4 deletions src/OpenTelemetry/Metrics/BaseExportingMetricReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using OpenTelemetry.Internal;

namespace OpenTelemetry.Metrics
Expand Down Expand Up @@ -53,15 +54,15 @@ public BaseExportingMetricReader(BaseExporter<Metric> exporter)
{
if (this.supportedExportModes.HasFlag(ExportModes.Push))
{
pullExporter.Collect = this.Collect;
pullExporter.CollectAndPullAsync = this.CollectAsync;
}
else
{
pullExporter.Collect = (timeoutMilliseconds) =>
pullExporter.CollectAndPullAsync = async (timeoutMilliseconds) =>
{
using (PullMetricScope.Begin())
{
return this.Collect(timeoutMilliseconds);
return await this.CollectAsync(timeoutMilliseconds).ConfigureAwait(false);
}
};
}
Expand All @@ -85,6 +86,19 @@ protected override bool ProcessMetrics(in Batch<Metric> metrics, int timeoutMill
return this.exporter.Export(metrics) == ExportResult.Success;
}

/// <inheritdoc/>
protected override async Task<bool> ProcessMetricsAsync(Batch<Metric> metrics, int timeoutMilliseconds)
{
if (PullMetricScope.IsPullAllowed && this.exporter is IPullMetricExporter pullMetricExporter)
{
// TODO: Do we need to consider timeout here?
return await pullMetricExporter.PullAsync(metrics).ConfigureAwait(false) == ExportResult.Success;
}

// TODO: Do we need to consider timeout here?
return this.exporter.Export(metrics) == ExportResult.Success;
}

/// <inheritdoc />
protected override bool OnCollect(int timeoutMilliseconds)
{
Expand All @@ -102,6 +116,23 @@ protected override bool OnCollect(int timeoutMilliseconds)
return false;
}

/// <inheritdoc />
protected override Task<bool> OnCollectAsync(int timeoutMilliseconds)
{
if (this.supportedExportModes.HasFlag(ExportModes.Push))
{
return base.OnCollectAsync(timeoutMilliseconds);
}

if (this.supportedExportModes.HasFlag(ExportModes.Pull) && PullMetricScope.IsPullAllowed)
{
return base.OnCollectAsync(timeoutMilliseconds);
}

// TODO: add some error log
return Task.FromResult(false);
}

/// <inheritdoc />
protected override bool OnShutdown(int timeoutMilliseconds)
{
Expand Down Expand Up @@ -134,7 +165,7 @@ protected override void Dispose(bool disposing)
{
if (this.exporter is IPullMetricExporter pullExporter)
{
pullExporter.Collect = null;
pullExporter.CollectAndPullAsync = null;
}

this.exporter.Dispose();
Expand Down
5 changes: 4 additions & 1 deletion src/OpenTelemetry/Metrics/IPullMetricExporter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
// </copyright>

using System;
using System.Threading.Tasks;

namespace OpenTelemetry.Metrics
{
Expand All @@ -23,6 +24,8 @@ namespace OpenTelemetry.Metrics
/// </summary>
public interface IPullMetricExporter
{
Func<int, bool> Collect { get; set; }
Func<int, Task<bool>> CollectAndPullAsync { get; set; }

Task<ExportResult> PullAsync(Batch<Metric> batch);
}
}
82 changes: 78 additions & 4 deletions src/OpenTelemetry/Metrics/MetricReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ public abstract class MetricReader : IDisposable
{
private const AggregationTemporality CumulativeAndDelta = AggregationTemporality.Cumulative | AggregationTemporality.Delta;
private readonly object newTaskLock = new object();
private readonly object onCollectLock = new object();
private readonly TaskCompletionSource<bool> shutdownTcs = new TaskCompletionSource<bool>();
private AggregationTemporality preferredAggregationTemporality = CumulativeAndDelta;
private AggregationTemporality supportedAggregationTemporality = CumulativeAndDelta;
Expand Down Expand Up @@ -106,16 +105,62 @@ public bool Collect(int timeoutMilliseconds = Timeout.Infinite)
var result = false;
try
{
lock (this.onCollectLock)
result = this.OnCollect(timeoutMilliseconds);
}
catch (Exception)
{
// TODO: OpenTelemetrySdkEventSource.Log.SpanProcessorException(nameof(this.Shutdown), ex);
}
finally
{
this.collectionTcs = null;
}

tcs.TrySetResult(result);
return result;
}

public async Task<bool> CollectAsync(int timeoutMilliseconds = Timeout.Infinite)
{
Guard.InvalidTimeout(timeoutMilliseconds, nameof(timeoutMilliseconds));

var shouldRunCollect = false;
var tcs = this.collectionTcs;

if (tcs == null)
{
lock (this.newTaskLock)
{
this.collectionTcs = null;
result = this.OnCollect(timeoutMilliseconds);
tcs = this.collectionTcs;

if (tcs == null)
{
shouldRunCollect = true;
tcs = new TaskCompletionSource<bool>();
this.collectionTcs = tcs;
}
}
}

if (!shouldRunCollect)
{
Task completedTask = await Task.WhenAny(tcs.Task, this.shutdownTcs.Task, Task.Delay(timeoutMilliseconds)).ConfigureAwait(false);
return completedTask == tcs.Task && tcs.Task.Result;
}

var result = false;
try
{
result = await this.OnCollectAsync(timeoutMilliseconds).ConfigureAwait(false);
}
catch (Exception)
{
// TODO: OpenTelemetrySdkEventSource.Log.SpanProcessorException(nameof(this.Shutdown), ex);
}
finally
{
this.collectionTcs = null;
}

tcs.TrySetResult(result);
return result;
Expand Down Expand Up @@ -188,6 +233,11 @@ internal virtual void SetParentProvider(BaseProvider parentProvider)
/// </returns>
protected abstract bool ProcessMetrics(in Batch<Metric> metrics, int timeoutMilliseconds);

protected virtual Task<bool> ProcessMetricsAsync(Batch<Metric> metrics, int timeoutMilliseconds)
{
return Task.FromResult(this.ProcessMetrics(metrics, timeoutMilliseconds));
}

/// <summary>
/// Called by <c>Collect</c>. This function should block the current
/// thread until metrics collection completed, shutdown signaled or
Expand Down Expand Up @@ -229,6 +279,30 @@ protected virtual bool OnCollect(int timeoutMilliseconds)
}
}

protected virtual Task<bool> OnCollectAsync(int timeoutMilliseconds)
{
var sw = Stopwatch.StartNew();

var collectMetric = this.ParentProvider.GetMetricCollect();
var metrics = collectMetric();

if (timeoutMilliseconds == Timeout.Infinite)
{
return this.ProcessMetricsAsync(metrics, Timeout.Infinite);
}
else
{
var timeout = timeoutMilliseconds - sw.ElapsedMilliseconds;

if (timeout <= 0)
{
return Task.FromResult(false);
}

return this.ProcessMetricsAsync(metrics, (int)timeout);
}
}

/// <summary>
/// Called by <c>Shutdown</c>. This function should block the current
/// thread until shutdown completed or timed out.
Expand Down
Loading