From 1d63b31bf554fc8d269e22970c21f27c96ca197f Mon Sep 17 00:00:00 2001 From: Reiley Yang Date: Mon, 20 Sep 2021 11:50:56 -0700 Subject: [PATCH] Refactor MetricReader (#2385) --- docs/metrics/extending-the-sdk/MyReader.cs | 10 +- .../Metrics/BaseExportingMetricReader.cs | 9 +- .../Metrics/CompositeMetricReader.cs | 55 +++------- src/OpenTelemetry/Metrics/MeterProviderSdk.cs | 2 +- src/OpenTelemetry/Metrics/MetricReader.cs | 101 ++++++++++-------- 5 files changed, 71 insertions(+), 106 deletions(-) diff --git a/docs/metrics/extending-the-sdk/MyReader.cs b/docs/metrics/extending-the-sdk/MyReader.cs index 74208f26f1c..84cb64eabb0 100644 --- a/docs/metrics/extending-the-sdk/MyReader.cs +++ b/docs/metrics/extending-the-sdk/MyReader.cs @@ -27,15 +27,9 @@ public MyReader(string name = "MyReader") this.name = name; } - protected override bool OnCollect(Batch metrics, int timeoutMilliseconds) + protected override bool ProcessMetrics(Batch metrics, int timeoutMilliseconds) { - Console.WriteLine($"{this.name}.OnCollect(metrics={metrics}, timeoutMilliseconds={timeoutMilliseconds})"); - return true; - } - - protected override bool OnForceFlush(int timeoutMilliseconds) - { - Console.WriteLine($"{this.name}.OnForceFlush(timeoutMilliseconds={timeoutMilliseconds})"); + Console.WriteLine($"{this.name}.ProcessMetrics(metrics={metrics}, timeoutMilliseconds={timeoutMilliseconds})"); return true; } diff --git a/src/OpenTelemetry/Metrics/BaseExportingMetricReader.cs b/src/OpenTelemetry/Metrics/BaseExportingMetricReader.cs index b1c73f744f5..31e6f61016c 100644 --- a/src/OpenTelemetry/Metrics/BaseExportingMetricReader.cs +++ b/src/OpenTelemetry/Metrics/BaseExportingMetricReader.cs @@ -54,18 +54,11 @@ internal override void SetParentProvider(BaseProvider parentProvider) } /// - protected override bool OnCollect(Batch metrics, int timeoutMilliseconds) + protected override bool ProcessMetrics(Batch metrics, int timeoutMilliseconds) { return this.exporter.Export(metrics) == ExportResult.Success; } - /// - protected override bool OnForceFlush(int timeoutMilliseconds) - { - // TODO: need to hammer this out - return true; - } - /// protected override bool OnShutdown(int timeoutMilliseconds) { diff --git a/src/OpenTelemetry/Metrics/CompositeMetricReader.cs b/src/OpenTelemetry/Metrics/CompositeMetricReader.cs index d9fc6c0cd49..6124ebb243e 100644 --- a/src/OpenTelemetry/Metrics/CompositeMetricReader.cs +++ b/src/OpenTelemetry/Metrics/CompositeMetricReader.cs @@ -69,42 +69,22 @@ public CompositeMetricReader AddReader(MetricReader reader) } /// - public override bool Collect(int timeoutMilliseconds = Timeout.Infinite) - { - var cur = this.head; - var result = true; - var sw = Stopwatch.StartNew(); - - while (cur != null) - { - if (timeoutMilliseconds == Timeout.Infinite) - { - result = cur.Value.Collect(Timeout.Infinite) && result; - } - else - { - var timeout = timeoutMilliseconds - sw.ElapsedMilliseconds; - - // notify all the readers, even if we run overtime - result = cur.Value.Collect((int)Math.Max(timeout, 0)) && result; - } - - cur = cur.Next; - } - - return result; - } - - protected override bool OnCollect(Batch metrics, int timeoutMilliseconds) + protected override bool ProcessMetrics(Batch metrics, int timeoutMilliseconds) { // CompositeMetricReader delegates the work to its underlying readers, - // so CompositeMetricReader.OnCollect should never be called. + // so CompositeMetricReader.ProcessMetrics should never be called. throw new NotImplementedException(); } /// - protected override bool OnForceFlush(int timeoutMilliseconds) + protected override bool OnCollect(int timeoutMilliseconds = Timeout.Infinite) { + if (timeoutMilliseconds < 0 && timeoutMilliseconds != Timeout.Infinite) + { + throw new ArgumentOutOfRangeException(nameof(timeoutMilliseconds), timeoutMilliseconds, "timeoutMilliseconds should be non-negative."); + } + + var result = true; var cur = this.head; var sw = Stopwatch.StartNew(); @@ -112,29 +92,20 @@ protected override bool OnForceFlush(int timeoutMilliseconds) { if (timeoutMilliseconds == Timeout.Infinite) { - _ = cur.Value.ForceFlush(Timeout.Infinite); + result = cur.Value.Collect(Timeout.Infinite) && result; } else { var timeout = timeoutMilliseconds - sw.ElapsedMilliseconds; - if (timeout <= 0) - { - return false; - } - - var succeeded = cur.Value.ForceFlush((int)timeout); - - if (!succeeded) - { - return false; - } + // notify all the readers, even if we run overtime + result = cur.Value.Collect((int)Math.Max(timeout, 0)) && result; } cur = cur.Next; } - return true; + return result; } /// diff --git a/src/OpenTelemetry/Metrics/MeterProviderSdk.cs b/src/OpenTelemetry/Metrics/MeterProviderSdk.cs index ef5e9e8b07d..c4eb259b708 100644 --- a/src/OpenTelemetry/Metrics/MeterProviderSdk.cs +++ b/src/OpenTelemetry/Metrics/MeterProviderSdk.cs @@ -200,7 +200,7 @@ internal Batch Collect() /// internal bool OnForceFlush(int timeoutMilliseconds) { - return this.reader?.ForceFlush(timeoutMilliseconds) ?? true; + return this.reader?.Collect(timeoutMilliseconds) ?? true; } /// diff --git a/src/OpenTelemetry/Metrics/MetricReader.cs b/src/OpenTelemetry/Metrics/MetricReader.cs index d3bf0ee4148..cc11dd81588 100644 --- a/src/OpenTelemetry/Metrics/MetricReader.cs +++ b/src/OpenTelemetry/Metrics/MetricReader.cs @@ -49,42 +49,16 @@ public AggregationTemporality SupportedAggregationTemporality } } - public virtual bool Collect(int timeoutMilliseconds = Timeout.Infinite) - { - var sw = Stopwatch.StartNew(); - - var collectMetric = this.ParentProvider.GetMetricCollect(); - var metricsCollected = collectMetric(); - - if (timeoutMilliseconds == Timeout.Infinite) - { - this.OnCollect(metricsCollected, Timeout.Infinite); - } - else - { - var timeout = timeoutMilliseconds - sw.ElapsedMilliseconds; - - if (timeout <= 0) - { - return false; - } - - return this.OnCollect(metricsCollected, (int)timeout); - } - - return true; - } - /// - /// Flushes the processor, blocks the current thread until flush - /// completed, shutdown signaled or timed out. + /// Attempts to collect the metrics, blocks the current thread until + /// metrics collection completed or timed out. /// /// - /// The number of milliseconds to wait, or Timeout.Infinite to - /// wait indefinitely. + /// The number (non-negative) of milliseconds to wait, or + /// Timeout.Infinite to wait indefinitely. /// /// - /// Returns true when flush succeeded; otherwise, false. + /// Returns true when metrics collection succeeded; otherwise, false. /// /// /// Thrown when the timeoutMilliseconds is smaller than -1. @@ -92,7 +66,7 @@ public virtual bool Collect(int timeoutMilliseconds = Timeout.Infinite) /// /// This function guarantees thread-safety. /// - public bool ForceFlush(int timeoutMilliseconds = Timeout.Infinite) + public bool Collect(int timeoutMilliseconds = Timeout.Infinite) { if (timeoutMilliseconds < 0 && timeoutMilliseconds != Timeout.Infinite) { @@ -101,11 +75,11 @@ public bool ForceFlush(int timeoutMilliseconds = Timeout.Infinite) try { - return this.OnForceFlush(timeoutMilliseconds); + return this.OnCollect(timeoutMilliseconds); } catch (Exception) { - // TODO: OpenTelemetrySdkEventSource.Log.SpanProcessorException(nameof(this.ForceFlush), ex); + // TODO: OpenTelemetrySdkEventSource.Log.SpanProcessorException(nameof(this.Collect), ex); return false; } } @@ -115,8 +89,8 @@ public bool ForceFlush(int timeoutMilliseconds = Timeout.Infinite) /// shutdown completed or timed out. /// /// - /// The number of milliseconds to wait, or Timeout.Infinite to - /// wait indefinitely. + /// The number (non-negative) of milliseconds to wait, or + /// Timeout.Infinite to wait indefinitely. /// /// /// Returns true when shutdown succeeded; otherwise, false. @@ -163,27 +137,60 @@ internal virtual void SetParentProvider(BaseProvider parentProvider) this.ParentProvider = parentProvider; } - protected abstract bool OnCollect(Batch metrics, int timeoutMilliseconds); + /// + /// Processes a batch of metrics. + /// + /// Batch of metrics to be processed. + /// + /// The number (non-negative) of milliseconds to wait, or + /// Timeout.Infinite to wait indefinitely. + /// + /// + /// Returns true when metrics processing succeeded; otherwise, + /// false. + /// + protected abstract bool ProcessMetrics(Batch metrics, int timeoutMilliseconds); /// - /// Called by ForceFlush. This function should block the current - /// thread until flush completed, shutdown signaled or timed out. + /// Called by Collect. This function should block the current + /// thread until metrics collection completed, shutdown signaled or + /// timed out. /// /// - /// The number of milliseconds to wait, or Timeout.Infinite to - /// wait indefinitely. + /// The number (non-negative) of milliseconds to wait, or + /// Timeout.Infinite to wait indefinitely. /// /// - /// Returns true when flush succeeded; otherwise, false. + /// Returns true when metrics collection succeeded; otherwise, + /// false. /// /// /// This function is called synchronously on the thread which called - /// ForceFlush. This function should be thread-safe, and should + /// Collect. This function should be thread-safe, and should /// not throw exceptions. /// - protected virtual bool OnForceFlush(int timeoutMilliseconds) + protected virtual bool OnCollect(int timeoutMilliseconds) { - return true; + var sw = Stopwatch.StartNew(); + + var collectMetric = this.ParentProvider.GetMetricCollect(); + var metrics = collectMetric(); + + if (timeoutMilliseconds == Timeout.Infinite) + { + return this.ProcessMetrics(metrics, Timeout.Infinite); + } + else + { + var timeout = timeoutMilliseconds - sw.ElapsedMilliseconds; + + if (timeout <= 0) + { + return false; + } + + return this.ProcessMetrics(metrics, (int)timeout); + } } /// @@ -191,8 +198,8 @@ protected virtual bool OnForceFlush(int timeoutMilliseconds) /// thread until shutdown completed or timed out. /// /// - /// The number of milliseconds to wait, or Timeout.Infinite to - /// wait indefinitely. + /// The number (non-negative) of milliseconds to wait, or + /// Timeout.Infinite to wait indefinitely. /// /// /// Returns true when shutdown succeeded; otherwise, false.