From 47e1c5e22f1cc43c86d9c4be693d81d82dfdda70 Mon Sep 17 00:00:00 2001 From: Reiley Yang Date: Mon, 20 Sep 2021 14:08:09 -0700 Subject: [PATCH] Update ForceFlush behavior to meet with the latest spec requirements (#2388) --- src/OpenTelemetry/BaseExporter.cs | 10 +++---- src/OpenTelemetry/BaseProcessor.cs | 20 ++++++------- src/OpenTelemetry/CHANGELOG.md | 5 ++++ src/OpenTelemetry/CompositeProcessor.cs | 19 ++++--------- .../Metrics/CompositeMetricReader.cs | 5 ---- .../Metrics/MeterProviderExtensions.cs | 28 +++++++++---------- src/OpenTelemetry/Metrics/MeterProviderSdk.cs | 8 +++--- src/OpenTelemetry/Metrics/MetricReader.cs | 4 +-- .../Trace/TracerProviderExtensions.cs | 28 +++++++++---------- src/OpenTelemetry/Trace/TracerProviderSdk.cs | 4 +-- .../Trace/CompositeActivityProcessorTests.cs | 12 ++------ 11 files changed, 63 insertions(+), 80 deletions(-) diff --git a/src/OpenTelemetry/BaseExporter.cs b/src/OpenTelemetry/BaseExporter.cs index 9a281063225..9a3ac8dd4fb 100644 --- a/src/OpenTelemetry/BaseExporter.cs +++ b/src/OpenTelemetry/BaseExporter.cs @@ -62,8 +62,8 @@ public abstract class BaseExporter : IDisposable /// shutdown completed or timed out. /// /// - /// The number of milliseconds to wait, or Timeout.Infinite to - /// wait indefinitely. + /// The number (non-negative) of milliseconds to wait, or + /// Timeout.Infinite to wait indefinitely. /// /// /// Returns true when shutdown succeeded; otherwise, false. @@ -79,7 +79,7 @@ public bool Shutdown(int timeoutMilliseconds = Timeout.Infinite) { if (timeoutMilliseconds < 0 && timeoutMilliseconds != Timeout.Infinite) { - throw new ArgumentOutOfRangeException(nameof(timeoutMilliseconds), timeoutMilliseconds, "timeoutMilliseconds should be non-negative."); + throw new ArgumentOutOfRangeException(nameof(timeoutMilliseconds), timeoutMilliseconds, "timeoutMilliseconds should be non-negative or Timeout.Infinite."); } if (Interlocked.Increment(ref this.shutdownCount) > 1) @@ -110,8 +110,8 @@ public void Dispose() /// thread until shutdown completed or timed out. /// /// - /// The number of milliseconds to wait, or Timeout.Infinite to - /// wait indefinitely. + /// The number (non-negative) of milliseconds to wait, or + /// Timeout.Infinite to wait indefinitely. /// /// /// Returns true when shutdown succeeded; otherwise, false. diff --git a/src/OpenTelemetry/BaseProcessor.cs b/src/OpenTelemetry/BaseProcessor.cs index e8cf98ae0e3..c4734f6a828 100644 --- a/src/OpenTelemetry/BaseProcessor.cs +++ b/src/OpenTelemetry/BaseProcessor.cs @@ -68,8 +68,8 @@ public virtual void OnEnd(T data) /// completed, shutdown signaled or timed out. /// /// - /// The number of milliseconds to wait, or Timeout.Infinite to - /// wait indefinitely. + /// The number (non-negative) of milliseconds to wait, or + /// Timeout.Infinite to wait indefinitely. /// /// /// Returns true when flush succeeded; otherwise, false. @@ -84,7 +84,7 @@ public bool ForceFlush(int timeoutMilliseconds = Timeout.Infinite) { if (timeoutMilliseconds < 0 && timeoutMilliseconds != Timeout.Infinite) { - throw new ArgumentOutOfRangeException(nameof(timeoutMilliseconds), timeoutMilliseconds, "timeoutMilliseconds should be non-negative."); + throw new ArgumentOutOfRangeException(nameof(timeoutMilliseconds), timeoutMilliseconds, "timeoutMilliseconds should be non-negative or Timeout.Infinite."); } try @@ -103,8 +103,8 @@ public bool ForceFlush(int timeoutMilliseconds = Timeout.Infinite) /// shutdown completed or timed out. /// /// - /// The number of milliseconds to wait, or Timeout.Infinite to - /// wait indefinitely. + /// The number (non-negative) of milliseconds to wait, or + /// Timeout.Infinite to wait indefinitely. /// /// /// Returns true when shutdown succeeded; otherwise, false. @@ -120,7 +120,7 @@ public bool Shutdown(int timeoutMilliseconds = Timeout.Infinite) { if (timeoutMilliseconds < 0 && timeoutMilliseconds != Timeout.Infinite) { - throw new ArgumentOutOfRangeException(nameof(timeoutMilliseconds), timeoutMilliseconds, "timeoutMilliseconds should be non-negative."); + throw new ArgumentOutOfRangeException(nameof(timeoutMilliseconds), timeoutMilliseconds, "timeoutMilliseconds should be non-negative or Timeout.Infinite."); } if (Interlocked.Increment(ref this.shutdownCount) > 1) @@ -156,8 +156,8 @@ internal virtual void SetParentProvider(BaseProvider parentProvider) /// thread until flush completed, shutdown signaled or timed out. /// /// - /// The number of milliseconds to wait, or Timeout.Infinite to - /// wait indefinitely. + /// The number (non-negative) of milliseconds to wait, or + /// Timeout.Infinite to wait indefinitely. /// /// /// Returns true when flush succeeded; otherwise, false. @@ -177,8 +177,8 @@ protected virtual bool OnForceFlush(int timeoutMilliseconds) /// thread until shutdown completed or timed out. /// /// - /// The number of milliseconds to wait, or Timeout.Infinite to - /// wait indefinitely. + /// The number (non-negative) of milliseconds to wait, or + /// Timeout.Infinite to wait indefinitely. /// /// /// Returns true when shutdown succeeded; otherwise, false. diff --git a/src/OpenTelemetry/CHANGELOG.md b/src/OpenTelemetry/CHANGELOG.md index 8459599b179..0dd390ad1bd 100644 --- a/src/OpenTelemetry/CHANGELOG.md +++ b/src/OpenTelemetry/CHANGELOG.md @@ -2,6 +2,11 @@ ## Unreleased +* Changed `CompositeProcessor.OnForceFlush` to meet with the spec + requirement. Now the SDK will invoke `ForceFlush` on all registered + processors, even if there is a timeout. + ([#2388](https://github.com/open-telemetry/opentelemetry-dotnet/pull/2388)) + ## 1.2.0-alpha3 Released 2021-Sep-13 diff --git a/src/OpenTelemetry/CompositeProcessor.cs b/src/OpenTelemetry/CompositeProcessor.cs index cf2a2861ef6..65ccccffed8 100644 --- a/src/OpenTelemetry/CompositeProcessor.cs +++ b/src/OpenTelemetry/CompositeProcessor.cs @@ -95,37 +95,28 @@ public override void OnStart(T data) /// protected override bool OnForceFlush(int timeoutMilliseconds) { + var result = true; var cur = this.head; - var sw = Stopwatch.StartNew(); while (cur != null) { if (timeoutMilliseconds == Timeout.Infinite) { - _ = cur.Value.ForceFlush(Timeout.Infinite); + result = cur.Value.ForceFlush(Timeout.Infinite) && result; } else { var timeout = timeoutMilliseconds - sw.ElapsedMilliseconds; - if (timeout <= 0) - { - return false; - } - - var succeeded = cur.Value.ForceFlush((int)timeout); - - if (!succeeded) - { - return false; - } + // notify all the processors, even if we run overtime + result = cur.Value.ForceFlush((int)Math.Max(timeout, 0)) && result; } cur = cur.Next; } - return true; + return result; } /// diff --git a/src/OpenTelemetry/Metrics/CompositeMetricReader.cs b/src/OpenTelemetry/Metrics/CompositeMetricReader.cs index 6124ebb243e..f422095e84b 100644 --- a/src/OpenTelemetry/Metrics/CompositeMetricReader.cs +++ b/src/OpenTelemetry/Metrics/CompositeMetricReader.cs @@ -79,11 +79,6 @@ protected override bool ProcessMetrics(Batch metrics, int timeoutMillise /// protected override bool OnCollect(int timeoutMilliseconds = Timeout.Infinite) { - if (timeoutMilliseconds < 0 && timeoutMilliseconds != Timeout.Infinite) - { - throw new ArgumentOutOfRangeException(nameof(timeoutMilliseconds), timeoutMilliseconds, "timeoutMilliseconds should be non-negative."); - } - var result = true; var cur = this.head; var sw = Stopwatch.StartNew(); diff --git a/src/OpenTelemetry/Metrics/MeterProviderExtensions.cs b/src/OpenTelemetry/Metrics/MeterProviderExtensions.cs index b4d318ab9c4..be45f12f7ff 100644 --- a/src/OpenTelemetry/Metrics/MeterProviderExtensions.cs +++ b/src/OpenTelemetry/Metrics/MeterProviderExtensions.cs @@ -29,8 +29,8 @@ public static class MeterProviderExtensions /// /// MeterProviderSdk instance on which ForceFlush will be called. /// - /// The number of milliseconds to wait, or Timeout.Infinite to - /// wait indefinitely. + /// The number (non-negative) of milliseconds to wait, or + /// Timeout.Infinite to wait indefinitely. /// /// /// Returns true when force flush succeeded; otherwise, false. @@ -48,13 +48,13 @@ public static bool ForceFlush(this MeterProvider provider, int timeoutMillisecon throw new ArgumentNullException(nameof(provider)); } - if (provider is MeterProviderSdk meterProviderSdk) + if (timeoutMilliseconds < 0 && timeoutMilliseconds != Timeout.Infinite) { - if (timeoutMilliseconds < 0 && timeoutMilliseconds != Timeout.Infinite) - { - throw new ArgumentOutOfRangeException(nameof(timeoutMilliseconds), timeoutMilliseconds, "timeoutMilliseconds should be non-negative."); - } + throw new ArgumentOutOfRangeException(nameof(timeoutMilliseconds), timeoutMilliseconds, "timeoutMilliseconds should be non-negative or Timeout.Infinite."); + } + if (provider is MeterProviderSdk meterProviderSdk) + { try { return meterProviderSdk.OnForceFlush(timeoutMilliseconds); @@ -76,8 +76,8 @@ public static bool ForceFlush(this MeterProvider provider, int timeoutMillisecon /// /// MeterProviderSdk instance on which Shutdown will be called. /// - /// The number of milliseconds to wait, or Timeout.Infinite to - /// wait indefinitely. + /// The number (non-negative) of milliseconds to wait, or + /// Timeout.Infinite to wait indefinitely. /// /// /// Returns true when shutdown succeeded; otherwise, false. @@ -96,13 +96,13 @@ public static bool Shutdown(this MeterProvider provider, int timeoutMilliseconds throw new ArgumentNullException(nameof(provider)); } - if (provider is MeterProviderSdk meterProviderSdk) + if (timeoutMilliseconds < 0 && timeoutMilliseconds != Timeout.Infinite) { - if (timeoutMilliseconds < 0 && timeoutMilliseconds != Timeout.Infinite) - { - throw new ArgumentOutOfRangeException(nameof(timeoutMilliseconds), timeoutMilliseconds, "timeoutMilliseconds should be non-negative."); - } + throw new ArgumentOutOfRangeException(nameof(timeoutMilliseconds), timeoutMilliseconds, "timeoutMilliseconds should be non-negative or Timeout.Infinite."); + } + if (provider is MeterProviderSdk meterProviderSdk) + { if (Interlocked.Increment(ref meterProviderSdk.ShutdownCount) > 1) { return false; // shutdown already called diff --git a/src/OpenTelemetry/Metrics/MeterProviderSdk.cs b/src/OpenTelemetry/Metrics/MeterProviderSdk.cs index c4eb259b708..10c1326885c 100644 --- a/src/OpenTelemetry/Metrics/MeterProviderSdk.cs +++ b/src/OpenTelemetry/Metrics/MeterProviderSdk.cs @@ -187,8 +187,8 @@ internal Batch Collect() /// thread until flush completed or timed out. /// /// - /// The number of milliseconds to wait, or Timeout.Infinite to - /// wait indefinitely. + /// The number (non-negative) of milliseconds to wait, or + /// Timeout.Infinite to wait indefinitely. /// /// /// Returns true when flush succeeded; otherwise, false. @@ -208,8 +208,8 @@ internal bool OnForceFlush(int timeoutMilliseconds) /// thread until shutdown completed or timed out. /// /// - /// The number of milliseconds to wait, or Timeout.Infinite to - /// wait indefinitely. + /// The number (non-negative) of milliseconds to wait, or + /// Timeout.Infinite to wait indefinitely. /// /// /// Returns true when shutdown succeeded; otherwise, false. diff --git a/src/OpenTelemetry/Metrics/MetricReader.cs b/src/OpenTelemetry/Metrics/MetricReader.cs index cc11dd81588..fe300d75527 100644 --- a/src/OpenTelemetry/Metrics/MetricReader.cs +++ b/src/OpenTelemetry/Metrics/MetricReader.cs @@ -70,7 +70,7 @@ public bool Collect(int timeoutMilliseconds = Timeout.Infinite) { if (timeoutMilliseconds < 0 && timeoutMilliseconds != Timeout.Infinite) { - throw new ArgumentOutOfRangeException(nameof(timeoutMilliseconds), timeoutMilliseconds, "timeoutMilliseconds should be non-negative."); + throw new ArgumentOutOfRangeException(nameof(timeoutMilliseconds), timeoutMilliseconds, "timeoutMilliseconds should be non-negative or Timeout.Infinite."); } try @@ -106,7 +106,7 @@ public bool Shutdown(int timeoutMilliseconds = Timeout.Infinite) { if (timeoutMilliseconds < 0 && timeoutMilliseconds != Timeout.Infinite) { - throw new ArgumentOutOfRangeException(nameof(timeoutMilliseconds), timeoutMilliseconds, "timeoutMilliseconds should be non-negative."); + throw new ArgumentOutOfRangeException(nameof(timeoutMilliseconds), timeoutMilliseconds, "timeoutMilliseconds should be non-negative or Timeout.Infinite."); } if (Interlocked.Increment(ref this.shutdownCount) > 1) diff --git a/src/OpenTelemetry/Trace/TracerProviderExtensions.cs b/src/OpenTelemetry/Trace/TracerProviderExtensions.cs index b07a3ab54ec..a1b2ef9415d 100644 --- a/src/OpenTelemetry/Trace/TracerProviderExtensions.cs +++ b/src/OpenTelemetry/Trace/TracerProviderExtensions.cs @@ -49,8 +49,8 @@ public static TracerProvider AddProcessor(this TracerProvider provider, BaseProc /// /// TracerProviderSdk instance on which ForceFlush will be called. /// - /// The number of milliseconds to wait, or Timeout.Infinite to - /// wait indefinitely. + /// The number (non-negative) of milliseconds to wait, or + /// Timeout.Infinite to wait indefinitely. /// /// /// Returns true when force flush succeeded; otherwise, false. @@ -68,13 +68,13 @@ public static bool ForceFlush(this TracerProvider provider, int timeoutMilliseco throw new ArgumentNullException(nameof(provider)); } - if (provider is TracerProviderSdk tracerProviderSdk) + if (timeoutMilliseconds < 0 && timeoutMilliseconds != Timeout.Infinite) { - if (timeoutMilliseconds < 0 && timeoutMilliseconds != Timeout.Infinite) - { - throw new ArgumentOutOfRangeException(nameof(timeoutMilliseconds), timeoutMilliseconds, "timeoutMilliseconds should be non-negative."); - } + throw new ArgumentOutOfRangeException(nameof(timeoutMilliseconds), timeoutMilliseconds, "timeoutMilliseconds should be non-negative or Timeout.Infinite."); + } + if (provider is TracerProviderSdk tracerProviderSdk) + { try { return tracerProviderSdk.OnForceFlush(timeoutMilliseconds); @@ -95,8 +95,8 @@ public static bool ForceFlush(this TracerProvider provider, int timeoutMilliseco /// /// TracerProviderSdk instance on which Shutdown will be called. /// - /// The number of milliseconds to wait, or Timeout.Infinite to - /// wait indefinitely. + /// The number (non-negative) of milliseconds to wait, or + /// Timeout.Infinite to wait indefinitely. /// /// /// Returns true when shutdown succeeded; otherwise, false. @@ -115,13 +115,13 @@ public static bool Shutdown(this TracerProvider provider, int timeoutMillisecond throw new ArgumentNullException(nameof(provider)); } - if (provider is TracerProviderSdk tracerProviderSdk) + if (timeoutMilliseconds < 0 && timeoutMilliseconds != Timeout.Infinite) { - if (timeoutMilliseconds < 0 && timeoutMilliseconds != Timeout.Infinite) - { - throw new ArgumentOutOfRangeException(nameof(timeoutMilliseconds), timeoutMilliseconds, "timeoutMilliseconds should be non-negative."); - } + throw new ArgumentOutOfRangeException(nameof(timeoutMilliseconds), timeoutMilliseconds, "timeoutMilliseconds should be non-negative or Timeout.Infinite."); + } + if (provider is TracerProviderSdk tracerProviderSdk) + { if (Interlocked.Increment(ref tracerProviderSdk.ShutdownCount) > 1) { return false; // shutdown already called diff --git a/src/OpenTelemetry/Trace/TracerProviderSdk.cs b/src/OpenTelemetry/Trace/TracerProviderSdk.cs index 5ab91ef2425..f384995256b 100644 --- a/src/OpenTelemetry/Trace/TracerProviderSdk.cs +++ b/src/OpenTelemetry/Trace/TracerProviderSdk.cs @@ -265,8 +265,8 @@ internal bool OnForceFlush(int timeoutMilliseconds) /// thread until shutdown completed or timed out. /// /// - /// The number of milliseconds to wait, or Timeout.Infinite to - /// wait indefinitely. + /// The number (non-negative) of milliseconds to wait, or + /// Timeout.Infinite to wait indefinitely. /// /// /// Returns true when shutdown succeeded; otherwise, false. diff --git a/test/OpenTelemetry.Tests/Trace/CompositeActivityProcessorTests.cs b/test/OpenTelemetry.Tests/Trace/CompositeActivityProcessorTests.cs index 9446a1258a5..9d99bc1f681 100644 --- a/test/OpenTelemetry.Tests/Trace/CompositeActivityProcessorTests.cs +++ b/test/OpenTelemetry.Tests/Trace/CompositeActivityProcessorTests.cs @@ -101,16 +101,8 @@ public void CompositeActivityProcessor_ForceFlush(int timeout) { processor.ForceFlush(timeout); - if (timeout != 0) - { - Assert.True(p1.ForceFlushCalled); - Assert.True(p2.ForceFlushCalled); - } - else - { - Assert.False(p1.ForceFlushCalled); - Assert.False(p2.ForceFlushCalled); - } + Assert.True(p1.ForceFlushCalled); + Assert.True(p2.ForceFlushCalled); } } }