Skip to content

Commit

Permalink
Refactor MetricReader (#2385)
Browse files Browse the repository at this point in the history
  • Loading branch information
reyang authored Sep 20, 2021
1 parent 58054d1 commit 1d63b31
Show file tree
Hide file tree
Showing 5 changed files with 71 additions and 106 deletions.
10 changes: 2 additions & 8 deletions docs/metrics/extending-the-sdk/MyReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,9 @@ public MyReader(string name = "MyReader")
this.name = name;
}

protected override bool OnCollect(Batch<Metric> metrics, int timeoutMilliseconds)
protected override bool ProcessMetrics(Batch<Metric> metrics, int timeoutMilliseconds)
{
Console.WriteLine($"{this.name}.OnCollect(metrics={metrics}, timeoutMilliseconds={timeoutMilliseconds})");
return true;
}

protected override bool OnForceFlush(int timeoutMilliseconds)
{
Console.WriteLine($"{this.name}.OnForceFlush(timeoutMilliseconds={timeoutMilliseconds})");
Console.WriteLine($"{this.name}.ProcessMetrics(metrics={metrics}, timeoutMilliseconds={timeoutMilliseconds})");
return true;
}

Expand Down
9 changes: 1 addition & 8 deletions src/OpenTelemetry/Metrics/BaseExportingMetricReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -54,18 +54,11 @@ internal override void SetParentProvider(BaseProvider parentProvider)
}

/// <inheritdoc/>
protected override bool OnCollect(Batch<Metric> metrics, int timeoutMilliseconds)
protected override bool ProcessMetrics(Batch<Metric> metrics, int timeoutMilliseconds)
{
return this.exporter.Export(metrics) == ExportResult.Success;
}

/// <inheritdoc/>
protected override bool OnForceFlush(int timeoutMilliseconds)
{
// TODO: need to hammer this out
return true;
}

/// <inheritdoc />
protected override bool OnShutdown(int timeoutMilliseconds)
{
Expand Down
55 changes: 13 additions & 42 deletions src/OpenTelemetry/Metrics/CompositeMetricReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -69,72 +69,43 @@ public CompositeMetricReader AddReader(MetricReader reader)
}

/// <inheritdoc/>
public override bool Collect(int timeoutMilliseconds = Timeout.Infinite)
{
var cur = this.head;
var result = true;
var sw = Stopwatch.StartNew();

while (cur != null)
{
if (timeoutMilliseconds == Timeout.Infinite)
{
result = cur.Value.Collect(Timeout.Infinite) && result;
}
else
{
var timeout = timeoutMilliseconds - sw.ElapsedMilliseconds;

// notify all the readers, even if we run overtime
result = cur.Value.Collect((int)Math.Max(timeout, 0)) && result;
}

cur = cur.Next;
}

return result;
}

protected override bool OnCollect(Batch<Metric> metrics, int timeoutMilliseconds)
protected override bool ProcessMetrics(Batch<Metric> metrics, int timeoutMilliseconds)
{
// CompositeMetricReader delegates the work to its underlying readers,
// so CompositeMetricReader.OnCollect should never be called.
// so CompositeMetricReader.ProcessMetrics should never be called.
throw new NotImplementedException();
}

/// <inheritdoc/>
protected override bool OnForceFlush(int timeoutMilliseconds)
protected override bool OnCollect(int timeoutMilliseconds = Timeout.Infinite)
{
if (timeoutMilliseconds < 0 && timeoutMilliseconds != Timeout.Infinite)
{
throw new ArgumentOutOfRangeException(nameof(timeoutMilliseconds), timeoutMilliseconds, "timeoutMilliseconds should be non-negative.");
}

var result = true;
var cur = this.head;
var sw = Stopwatch.StartNew();

while (cur != null)
{
if (timeoutMilliseconds == Timeout.Infinite)
{
_ = cur.Value.ForceFlush(Timeout.Infinite);
result = cur.Value.Collect(Timeout.Infinite) && result;
}
else
{
var timeout = timeoutMilliseconds - sw.ElapsedMilliseconds;

if (timeout <= 0)
{
return false;
}

var succeeded = cur.Value.ForceFlush((int)timeout);

if (!succeeded)
{
return false;
}
// notify all the readers, even if we run overtime
result = cur.Value.Collect((int)Math.Max(timeout, 0)) && result;
}

cur = cur.Next;
}

return true;
return result;
}

/// <inheritdoc/>
Expand Down
2 changes: 1 addition & 1 deletion src/OpenTelemetry/Metrics/MeterProviderSdk.cs
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ internal Batch<Metric> Collect()
/// </remarks>
internal bool OnForceFlush(int timeoutMilliseconds)
{
return this.reader?.ForceFlush(timeoutMilliseconds) ?? true;
return this.reader?.Collect(timeoutMilliseconds) ?? true;
}

/// <summary>
Expand Down
101 changes: 54 additions & 47 deletions src/OpenTelemetry/Metrics/MetricReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -49,50 +49,24 @@ public AggregationTemporality SupportedAggregationTemporality
}
}

public virtual bool Collect(int timeoutMilliseconds = Timeout.Infinite)
{
var sw = Stopwatch.StartNew();

var collectMetric = this.ParentProvider.GetMetricCollect();
var metricsCollected = collectMetric();

if (timeoutMilliseconds == Timeout.Infinite)
{
this.OnCollect(metricsCollected, Timeout.Infinite);
}
else
{
var timeout = timeoutMilliseconds - sw.ElapsedMilliseconds;

if (timeout <= 0)
{
return false;
}

return this.OnCollect(metricsCollected, (int)timeout);
}

return true;
}

/// <summary>
/// Flushes the processor, blocks the current thread until flush
/// completed, shutdown signaled or timed out.
/// Attempts to collect the metrics, blocks the current thread until
/// metrics collection completed or timed out.
/// </summary>
/// <param name="timeoutMilliseconds">
/// The number of milliseconds to wait, or <c>Timeout.Infinite</c> to
/// wait indefinitely.
/// The number (non-negative) of milliseconds to wait, or
/// <c>Timeout.Infinite</c> to wait indefinitely.
/// </param>
/// <returns>
/// Returns <c>true</c> when flush succeeded; otherwise, <c>false</c>.
/// Returns <c>true</c> when metrics collection succeeded; otherwise, <c>false</c>.
/// </returns>
/// <exception cref="System.ArgumentOutOfRangeException">
/// Thrown when the <c>timeoutMilliseconds</c> is smaller than -1.
/// </exception>
/// <remarks>
/// This function guarantees thread-safety.
/// </remarks>
public bool ForceFlush(int timeoutMilliseconds = Timeout.Infinite)
public bool Collect(int timeoutMilliseconds = Timeout.Infinite)
{
if (timeoutMilliseconds < 0 && timeoutMilliseconds != Timeout.Infinite)
{
Expand All @@ -101,11 +75,11 @@ public bool ForceFlush(int timeoutMilliseconds = Timeout.Infinite)

try
{
return this.OnForceFlush(timeoutMilliseconds);
return this.OnCollect(timeoutMilliseconds);
}
catch (Exception)
{
// TODO: OpenTelemetrySdkEventSource.Log.SpanProcessorException(nameof(this.ForceFlush), ex);
// TODO: OpenTelemetrySdkEventSource.Log.SpanProcessorException(nameof(this.Collect), ex);
return false;
}
}
Expand All @@ -115,8 +89,8 @@ public bool ForceFlush(int timeoutMilliseconds = Timeout.Infinite)
/// shutdown completed or timed out.
/// </summary>
/// <param name="timeoutMilliseconds">
/// The number of milliseconds to wait, or <c>Timeout.Infinite</c> to
/// wait indefinitely.
/// The number (non-negative) of milliseconds to wait, or
/// <c>Timeout.Infinite</c> to wait indefinitely.
/// </param>
/// <returns>
/// Returns <c>true</c> when shutdown succeeded; otherwise, <c>false</c>.
Expand Down Expand Up @@ -163,36 +137,69 @@ internal virtual void SetParentProvider(BaseProvider parentProvider)
this.ParentProvider = parentProvider;
}

protected abstract bool OnCollect(Batch<Metric> metrics, int timeoutMilliseconds);
/// <summary>
/// Processes a batch of metrics.
/// </summary>
/// <param name="metrics">Batch of metrics to be processed.</param>
/// <param name="timeoutMilliseconds">
/// The number (non-negative) of milliseconds to wait, or
/// <c>Timeout.Infinite</c> to wait indefinitely.
/// </param>
/// <returns>
/// Returns <c>true</c> when metrics processing succeeded; otherwise,
/// <c>false</c>.
/// </returns>
protected abstract bool ProcessMetrics(Batch<Metric> metrics, int timeoutMilliseconds);

/// <summary>
/// Called by <c>ForceFlush</c>. This function should block the current
/// thread until flush completed, shutdown signaled or timed out.
/// Called by <c>Collect</c>. This function should block the current
/// thread until metrics collection completed, shutdown signaled or
/// timed out.
/// </summary>
/// <param name="timeoutMilliseconds">
/// The number of milliseconds to wait, or <c>Timeout.Infinite</c> to
/// wait indefinitely.
/// The number (non-negative) of milliseconds to wait, or
/// <c>Timeout.Infinite</c> to wait indefinitely.
/// </param>
/// <returns>
/// Returns <c>true</c> when flush succeeded; otherwise, <c>false</c>.
/// Returns <c>true</c> when metrics collection succeeded; otherwise,
/// <c>false</c>.
/// </returns>
/// <remarks>
/// This function is called synchronously on the thread which called
/// <c>ForceFlush</c>. This function should be thread-safe, and should
/// <c>Collect</c>. This function should be thread-safe, and should
/// not throw exceptions.
/// </remarks>
protected virtual bool OnForceFlush(int timeoutMilliseconds)
protected virtual bool OnCollect(int timeoutMilliseconds)
{
return true;
var sw = Stopwatch.StartNew();

var collectMetric = this.ParentProvider.GetMetricCollect();
var metrics = collectMetric();

if (timeoutMilliseconds == Timeout.Infinite)
{
return this.ProcessMetrics(metrics, Timeout.Infinite);
}
else
{
var timeout = timeoutMilliseconds - sw.ElapsedMilliseconds;

if (timeout <= 0)
{
return false;
}

return this.ProcessMetrics(metrics, (int)timeout);
}
}

/// <summary>
/// Called by <c>Shutdown</c>. This function should block the current
/// thread until shutdown completed or timed out.
/// </summary>
/// <param name="timeoutMilliseconds">
/// The number of milliseconds to wait, or <c>Timeout.Infinite</c> to
/// wait indefinitely.
/// The number (non-negative) of milliseconds to wait, or
/// <c>Timeout.Infinite</c> to wait indefinitely.
/// </param>
/// <returns>
/// Returns <c>true</c> when shutdown succeeded; otherwise, <c>false</c>.
Expand Down

0 comments on commit 1d63b31

Please sign in to comment.