From fa76024e4592be9470ef8df965f0f1e9433bc798 Mon Sep 17 00:00:00 2001 From: Reiley Yang Date: Thu, 20 Aug 2020 13:01:29 -0700 Subject: [PATCH] remove BatchingActivityProcessor --- .../Trace/BatchingActivityProcessor.cs | 344 ----------- .../Trace/BatchingActivityProcessorTests.cs | 571 ------------------ 2 files changed, 915 deletions(-) delete mode 100644 src/OpenTelemetry/Trace/BatchingActivityProcessor.cs delete mode 100644 test/OpenTelemetry.Tests/Trace/BatchingActivityProcessorTests.cs diff --git a/src/OpenTelemetry/Trace/BatchingActivityProcessor.cs b/src/OpenTelemetry/Trace/BatchingActivityProcessor.cs deleted file mode 100644 index 242feab2f18..00000000000 --- a/src/OpenTelemetry/Trace/BatchingActivityProcessor.cs +++ /dev/null @@ -1,344 +0,0 @@ -// -// Copyright The OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -// - -using System; -using System.Collections.Concurrent; -using System.Collections.Generic; -using System.Diagnostics; -using System.Threading; -using System.Threading.Tasks; -using OpenTelemetry.Internal; - -namespace OpenTelemetry.Trace -{ - /// - /// Implements processor that batches activities before calling exporter. - /// - public class BatchingActivityProcessor : ActivityProcessor - { - private const int DefaultMaxQueueSize = 2048; - private const int DefaultMaxExportBatchSize = 512; - private static readonly TimeSpan DefaultScheduledDelay = TimeSpan.FromMilliseconds(5000); - private static readonly TimeSpan DefaultExporterTimeout = TimeSpan.FromMilliseconds(30000); - - private readonly ConcurrentQueue exportQueue; - private readonly int maxQueueSize; - private readonly int maxExportBatchSize; - private readonly TimeSpan scheduledDelay; - private readonly TimeSpan exporterTimeout; - private readonly ActivityExporter exporter; - private readonly List batch = new List(); - private readonly SemaphoreSlim flushLock = new SemaphoreSlim(1); - private readonly System.Timers.Timer flushTimer; - private volatile int currentQueueSize; - private bool disposed; - - /// - /// Initializes a new instance of the class with default parameters: - /// - /// - /// maxQueueSize = 2048, - /// - /// - /// scheduledDelay = 5 sec, - /// - /// - /// exporterTimeout = 30 sec, - /// - /// - /// maxExportBatchSize = 512 - /// - /// - /// - /// Exporter instance. - public BatchingActivityProcessor(ActivityExporter exporter) - : this(exporter, DefaultMaxQueueSize, DefaultScheduledDelay, DefaultExporterTimeout, DefaultMaxExportBatchSize) - { - } - - /// - /// Initializes a new instance of the class with custom settings. - /// - /// Exporter instance. - /// Maximum queue size. After the size is reached activities are dropped by processor. - /// The delay between two consecutive exports. - /// Maximum allowed time to export data. - /// The maximum batch size of every export. It must be smaller or equal to maxQueueSize. - public BatchingActivityProcessor(ActivityExporter exporter, int maxQueueSize, TimeSpan scheduledDelay, TimeSpan exporterTimeout, int maxExportBatchSize) - { - if (maxQueueSize <= 0) - { - throw new ArgumentOutOfRangeException(nameof(maxQueueSize)); - } - - if (maxExportBatchSize <= 0 || maxExportBatchSize > maxQueueSize) - { - throw new ArgumentOutOfRangeException(nameof(maxExportBatchSize)); - } - - if (scheduledDelay <= TimeSpan.FromMilliseconds(0)) - { - throw new ArgumentOutOfRangeException(nameof(scheduledDelay)); - } - - this.exporter = exporter ?? throw new ArgumentNullException(nameof(exporter)); - this.maxQueueSize = maxQueueSize; - this.scheduledDelay = scheduledDelay; - this.exporterTimeout = exporterTimeout; - this.maxExportBatchSize = maxExportBatchSize; - - this.exportQueue = new ConcurrentQueue(); - - this.flushTimer = new System.Timers.Timer - { - AutoReset = false, - Enabled = true, - Interval = this.scheduledDelay.TotalMilliseconds, - }; - - this.flushTimer.Elapsed += async (sender, args) => - { - bool lockTaken = this.flushLock.Wait(0); - try - { - if (!lockTaken) - { - // If the lock was already held, it means a flush is already executing. - return; - } - - await this.FlushAsyncInternal(drain: false, lockAlreadyHeld: true, CancellationToken.None).ConfigureAwait(false); - } - catch (Exception ex) - { - OpenTelemetrySdkEventSource.Log.SpanProcessorException(nameof(System.Timers.Timer.Elapsed), ex); - } - finally - { - if (lockTaken) - { - this.flushLock.Release(); - } - } - }; - } - - /// - public override void OnEnd(Activity activity) - { - if (activity.Recorded) - { - // because of race-condition between checking the size and enqueueing, - // we might end up with a bit more activities than maxQueueSize. - // Let's just tolerate it to avoid extra synchronization. - if (this.currentQueueSize >= this.maxQueueSize) - { - OpenTelemetrySdkEventSource.Log.SpanProcessorQueueIsExhausted(); - return; - } - - var size = Interlocked.Increment(ref this.currentQueueSize); - - this.exportQueue.Enqueue(activity); - - if (size >= this.maxExportBatchSize) - { - bool lockTaken = this.flushLock.Wait(0); - if (lockTaken) - { - Task.Run(async () => - { - try - { - await this.FlushAsyncInternal(drain: false, lockAlreadyHeld: true, CancellationToken.None).ConfigureAwait(false); - } - catch (Exception ex) - { - OpenTelemetrySdkEventSource.Log.SpanProcessorException(nameof(this.OnEnd), ex); - } - finally - { - this.flushLock.Release(); - } - }); - return; - } - } - } - } - - /// - /// If the is canceled. - public override async Task ShutdownAsync(CancellationToken cancellationToken) - { - await this.FlushAsyncInternal(drain: true, lockAlreadyHeld: false, cancellationToken).ConfigureAwait(false); - - await this.exporter.ShutdownAsync(cancellationToken).ConfigureAwait(false); - - OpenTelemetrySdkEventSource.Log.ShutdownEvent(this.currentQueueSize); - } - - /// - /// If the is canceled. - public override async Task ForceFlushAsync(CancellationToken cancellationToken) - { - await this.FlushAsyncInternal(drain: true, lockAlreadyHeld: false, cancellationToken).ConfigureAwait(false); - - OpenTelemetrySdkEventSource.Log.ForceFlushCompleted(this.currentQueueSize); - } - - /// - /// Releases the unmanaged resources used by this class and optionally releases the managed resources. - /// - /// to release both managed and unmanaged resources; to release only unmanaged resources. - protected override void Dispose(bool disposing) - { - base.Dispose(disposing); - - if (disposing && !this.disposed) - { - if (this.exporter is IDisposable disposableExporter) - { - try - { - disposableExporter.Dispose(); - } - catch (Exception e) - { - OpenTelemetrySdkEventSource.Log.SpanProcessorException(nameof(this.Dispose), e); - } - } - - this.flushTimer.Dispose(); - this.flushLock.Dispose(); - this.disposed = true; - } - } - - private async Task FlushAsyncInternal(bool drain, bool lockAlreadyHeld, CancellationToken cancellationToken) - { - if (!lockAlreadyHeld) - { - await this.flushLock.WaitAsync(cancellationToken).ConfigureAwait(false); - } - - try - { - this.flushTimer.Enabled = false; - - var queueSize = this.currentQueueSize; - do - { - using var cts = new CancellationTokenSource(this.exporterTimeout); - - var linkedCTS = cancellationToken.CanBeCanceled - ? CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, cts.Token) - : null; - - int exportedCount; - using (linkedCTS) - { - try - { - exportedCount = await this.ExportBatchAsync(linkedCTS?.Token ?? cts.Token).ConfigureAwait(false); - } - catch (OperationCanceledException) - { - if (cancellationToken.IsCancellationRequested) - { - // User-supplied cancellation, bubble up the exception. - throw; - } - - OpenTelemetrySdkEventSource.Log.SpanExporterTimeout(Math.Min(queueSize, this.maxExportBatchSize)); - break; - } - } - - if (exportedCount == 0) - { - // Break out of drain loop if nothing is being exported, likely means there is an issue - // and we don't want to deadlock. - break; - } - - queueSize -= exportedCount; - } -#pragma warning disable SA1009 // Closing parenthesis should be spaced correctly -> Spacing is for readability - while ( - !cancellationToken.IsCancellationRequested && - ( - (drain && queueSize > 0) // If draining, keep looping until queue is empty. - || - (!drain && queueSize >= this.maxExportBatchSize) // If not draining, keep looping while there are batches ready. - )); -#pragma warning restore SA1009 // Closing parenthesis should be spaced correctly - } - catch (Exception ex) when (!(ex is OperationCanceledException)) - { - OpenTelemetrySdkEventSource.Log.SpanProcessorException(nameof(this.FlushAsyncInternal), ex); - } - finally - { - this.flushTimer.Enabled = true; - - if (!lockAlreadyHeld) - { - this.flushLock.Release(); - } - } - } - - private async Task ExportBatchAsync(CancellationToken cancellationToken) - { - try - { - if (this.exportQueue.TryDequeue(out var nextActivity)) - { - Interlocked.Decrement(ref this.currentQueueSize); - this.batch.Add(nextActivity); - } - else - { - // nothing in queue - return 0; - } - - while (this.batch.Count < this.maxExportBatchSize && this.exportQueue.TryDequeue(out nextActivity)) - { - Interlocked.Decrement(ref this.currentQueueSize); - this.batch.Add(nextActivity); - } - - var result = await this.exporter.ExportAsync(this.batch, cancellationToken).ConfigureAwait(false); - if (result != ExportResult.Success) - { - OpenTelemetrySdkEventSource.Log.ExporterErrorResult(result); - - // we do not support retries for now and leave it up to exporter - // as only exporter implementation knows how to retry: which items failed - // and what is the reasonable policy for that exporter. - } - - return this.batch.Count; - } - finally - { - this.batch.Clear(); - } - } - } -} diff --git a/test/OpenTelemetry.Tests/Trace/BatchingActivityProcessorTests.cs b/test/OpenTelemetry.Tests/Trace/BatchingActivityProcessorTests.cs deleted file mode 100644 index 85f9ba11a57..00000000000 --- a/test/OpenTelemetry.Tests/Trace/BatchingActivityProcessorTests.cs +++ /dev/null @@ -1,571 +0,0 @@ -// -// Copyright The OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -// - -using System; -using System.Collections.Generic; -using System.Diagnostics; -using System.Linq; -using System.Threading; -using System.Threading.Tasks; -using OpenTelemetry.Tests; -using Xunit; - -namespace OpenTelemetry.Trace.Tests -{ - public class BatchingActivityProcessorTests : IDisposable - { - private const string ActivityName1 = "MyActivityName/1"; - private const string ActivityName2 = "MyActivityName/2"; - private const string ActivitySourceName = "my.source"; - - private static readonly TimeSpan DefaultDelay = TimeSpan.FromMilliseconds(30); - private static readonly TimeSpan DefaultTimeout = TimeSpan.FromSeconds(2); - private static readonly ActivitySource Source = new ActivitySource(ActivitySourceName); - - [Fact] - public void ThrowsOnInvalidArguments() - { - Assert.Throws(() => new BatchingActivityProcessor(null)); - Assert.Throws(() => new BatchingActivityProcessor(new TestActivityExporter(null), 0, TimeSpan.FromSeconds(5), TimeSpan.FromSeconds(5), 0)); - Assert.Throws(() => new BatchingActivityProcessor(new TestActivityExporter(null), 2048, TimeSpan.FromSeconds(5), TimeSpan.FromSeconds(5), 0)); - Assert.Throws(() => new BatchingActivityProcessor(new TestActivityExporter(null), 512, TimeSpan.FromSeconds(5), TimeSpan.FromSeconds(5), 513)); - } - - [Fact] - public async Task ShutdownTwice() - { - using var activityProcessor = new BatchingActivityProcessor(new TestActivityExporter(null)); - await activityProcessor.ShutdownAsync(CancellationToken.None); - - // does not throw - await activityProcessor.ShutdownAsync(CancellationToken.None); - } - - [Fact] - public void DisposeTwice() - { - using var activityProcessor = new BatchingActivityProcessor(new TestActivityExporter(null)); - - activityProcessor.Dispose(); - - // does not throw - activityProcessor.Dispose(); - } - - [Fact] - public async Task ShutdownWithHugeScheduledDelay() - { - using var activityProcessor = - new BatchingActivityProcessor(new TestActivityExporter(null), 128, TimeSpan.FromMinutes(1), TimeSpan.FromSeconds(100), 32); - var sw = Stopwatch.StartNew(); - using (var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(100))) - { - cts.Token.ThrowIfCancellationRequested(); - await activityProcessor.ShutdownAsync(cts.Token).ConfigureAwait(false); - } - - sw.Stop(); - Assert.InRange(sw.Elapsed, TimeSpan.Zero, TimeSpan.FromMilliseconds(100)); - } - - [Fact] - public void CancelWithExporterTimeoutMilliseconds() - { - using var inMemoryEventListener = new InMemoryEventListener(); - var activityExporter = new TestActivityExporter(null, sleepMilliseconds: 5000); - using var activityProcessor = new BatchingActivityProcessor(activityExporter, 128, TimeSpan.FromMilliseconds(1000), TimeSpan.FromMilliseconds(0), 1); - using (var openTelemetrySdk = Sdk.CreateTracerProviderBuilder() - .AddSource(ActivitySourceName) - .SetSampler(new AlwaysOnSampler()) - .AddProcessor(activityProcessor) - .Build()) - { - var activity1 = this.CreateActivity(ActivityName1); - } // Force everything to flush out of the processor. - - Assert.Contains(inMemoryEventListener.Events, (e) => e.EventId == 23); - } - - [Fact] - public void ProcessorDoesNotSendRecordDecisionSpanToExporter() - { - var testSampler = new TestSampler(); - testSampler.SamplingAction = (samplingParameters) => - { - return new SamplingResult(SamplingDecision.Record); - }; - - int exportCalledCount = 0; - var activityExporter = new TestActivityExporter(_ => Interlocked.Increment(ref exportCalledCount)); - using var activityProcessor = new BatchingActivityProcessor(activityExporter, 128, DefaultDelay, DefaultTimeout, 1); - using var openTelemetrySdk = Sdk.CreateTracerProviderBuilder() - .AddSource(ActivitySourceName) - .AddProcessor(activityProcessor) - .SetSampler(testSampler) - .Build(); - - var activity1 = this.CreateNotSampledEndedActivity(ActivityName2); - - Assert.True(activity1.IsAllDataRequested); - Assert.Equal(ActivityTraceFlags.None, activity1.ActivityTraceFlags); - Assert.False(activity1.Recorded); - - var exported = this.WaitForActivities(activityExporter, 0, DefaultTimeout); - Assert.Equal(0, exportCalledCount); - - Assert.Empty(exported); - } - - [Fact] - public void ProcessorSendsRecordAndSampledDecisionSpanToExporter() - { - var testSampler = new TestSampler(); - testSampler.SamplingAction = (samplingParameters) => - { - return new SamplingResult(SamplingDecision.RecordAndSampled); - }; - - int exportCalledCount = 0; - var activityExporter = new TestActivityExporter(_ => Interlocked.Increment(ref exportCalledCount)); - using var activityProcessor = new BatchingActivityProcessor(activityExporter, 128, DefaultDelay, DefaultTimeout, 1); - using var openTelemetrySdk = Sdk.CreateTracerProviderBuilder() - .AddSource(ActivitySourceName) - .AddProcessor(activityProcessor) - .SetSampler(testSampler) - .Build(); - - var activity1 = this.CreateSampledEndedActivity(ActivityName1); - var activity2 = this.CreateNotSampledEndedActivity(ActivityName2); - - Assert.True(activity1.IsAllDataRequested); - Assert.Equal(ActivityTraceFlags.Recorded, activity1.ActivityTraceFlags); - Assert.True(activity1.Recorded); - - Assert.True(activity2.IsAllDataRequested); - Assert.Equal(ActivityTraceFlags.Recorded, activity2.ActivityTraceFlags); - Assert.True(activity2.Recorded); - - var exported = this.WaitForActivities(activityExporter, 2, DefaultTimeout); - Assert.Equal(2, exportCalledCount); - Assert.Equal(2, exported.Length); - } - - [Fact] - public void ExportDifferentSampledActivities() - { - var activityExporter = new TestActivityExporter(null); - using var activityProcessor = new BatchingActivityProcessor(activityExporter, 128, DefaultDelay, DefaultTimeout, 128); - using var openTelemetrySdk = Sdk.CreateTracerProviderBuilder() - .AddSource(ActivitySourceName) - .SetSampler(new AlwaysOnSampler()) - .AddProcessor(activityProcessor) - .Build(); - - var activity1 = this.CreateActivity(ActivityName1); - var activity2 = this.CreateActivity(ActivityName2); - - var exported = this.WaitForActivities(activityExporter, 2, DefaultTimeout); - - Assert.Equal(2, exported.Length); - Assert.Contains(activity1, exported); - Assert.Contains(activity2, exported); - } - - [Fact] - public void ExporterIsSlowerThanDelay() - { - var exportStartTimes = new List(); - var exportEndTimes = new List(); - var activityExporter = new TestActivityExporter(_ => - { - exportStartTimes.Add(Stopwatch.GetTimestamp()); - Thread.Sleep(50); - exportEndTimes.Add(Stopwatch.GetTimestamp()); - }); - - using var activityProcessor = new BatchingActivityProcessor(activityExporter, 128, TimeSpan.FromMilliseconds(30), DefaultTimeout, 10); - using var openTelemetrySdk = Sdk.CreateTracerProviderBuilder() - .AddSource(ActivitySourceName) - .SetSampler(new AlwaysOnSampler()) - .AddProcessor(activityProcessor) - .Build(); - - var activities = new List(); - for (int i = 0; i < 20; i++) - { - activities.Add(this.CreateActivity(i.ToString())); - } - - var exported = this.WaitForActivities(activityExporter, 20, TimeSpan.FromSeconds(2)); - - Assert.Equal(activities.Count, exported.Length); - Assert.InRange(exportStartTimes.Count, 2, 20); - - for (int i = 1; i < exportStartTimes.Count - 1; i++) - { - Assert.InRange(exportStartTimes[i], exportEndTimes[i - 1] + 1, exportStartTimes[i + 1] - 1); - } - } - - [Fact] - public void AddActivityAfterQueueIsExhausted() - { - int exportCalledCount = 0; - var activityExporter = new TestActivityExporter(_ => - { - Interlocked.Increment(ref exportCalledCount); - Thread.Sleep(50); - }); - using var activityProcessor = new BatchingActivityProcessor(activityExporter, 1, TimeSpan.FromMilliseconds(100), DefaultTimeout, 1); - using var openTelemetrySdk = Sdk.CreateTracerProviderBuilder() - .AddSource(ActivitySourceName) - .SetSampler(new AlwaysOnSampler()) - .AddProcessor(activityProcessor) - .Build(); - - var activities = new List(); - for (int i = 0; i < 20; i++) - { - activities.Add(this.CreateActivity(i.ToString())); - } - - var exported = this.WaitForActivities(activityExporter, 1, DefaultTimeout); - - Assert.Equal(1, exportCalledCount); - Assert.Single(exported); - Assert.Contains(activities.First(), exported); - } - - [Fact] - public void ExportMoreActivitiesThanTheMaxBatchSize() - { - var exporterCalled = new ManualResetEvent(false); - int exportCalledCount = 0; - var activityExporter = new TestActivityExporter(_ => - { - exporterCalled.Set(); - Interlocked.Increment(ref exportCalledCount); - }); - - using var activityProcessor = new BatchingActivityProcessor(activityExporter, 128, DefaultDelay, DefaultTimeout, 3); - using var openTelemetrySdk = Sdk.CreateTracerProviderBuilder() - .AddSource(ActivitySourceName) - .SetSampler(new AlwaysOnSampler()) - .AddProcessor(activityProcessor) - .Build(); - - var activity1 = this.CreateActivity(ActivityName1); - var activity2 = this.CreateActivity(ActivityName1); - var activity3 = this.CreateActivity(ActivityName1); - var activity4 = this.CreateActivity(ActivityName1); - var activity5 = this.CreateActivity(ActivityName1); - var activity6 = this.CreateActivity(ActivityName1); - - // wait for exporter to be called to stabilize tests on the build server - exporterCalled.WaitOne(TimeSpan.FromSeconds(10)); - - var exported = this.WaitForActivities(activityExporter, 6, DefaultTimeout); - - Assert.InRange(exportCalledCount, 2, 6); - - Assert.Equal(6, exported.Count()); - Assert.Contains(activity1, exported); - Assert.Contains(activity2, exported); - Assert.Contains(activity3, exported); - Assert.Contains(activity4, exported); - Assert.Contains(activity5, exported); - Assert.Contains(activity6, exported); - } - - [Fact] - public void ExportNotSampledActivities() - { - int exportCalledCount = 0; - var activityExporter = new TestActivityExporter(_ => Interlocked.Increment(ref exportCalledCount)); - using var activityProcessor = new BatchingActivityProcessor(activityExporter, 128, DefaultDelay, DefaultTimeout, 1); - using var openTelemetrySdk = Sdk.CreateTracerProviderBuilder() - .AddSource(ActivitySourceName) - .SetSampler(new ParentOrElseSampler(new AlwaysOffSampler())) - .AddProcessor(activityProcessor) - .Build(); - - var activity1 = this.CreateSampledEndedActivity(ActivityName1); - var activity2 = this.CreateNotSampledEndedActivity(ActivityName2); - - // Activities are recorded and exported in the same order as they are created, we test that a non - // sampled activity is not exported by creating and ending a sampled activity after a non sampled activity - // and checking that the first exported activity is the sampled activity (the non sampled did not get - // exported). - var exported = this.WaitForActivities(activityExporter, 1, DefaultTimeout); - Assert.Equal(1, exportCalledCount); - - // Need to check this because otherwise the variable activity1 is unused, other option is to not - // have a activity1 variable. - Assert.Single(exported); - Assert.Contains(activity1, exported); - } - - [Fact] - public void ProcessorDoesNotBlockOnExporter() - { - var resetEvent = new ManualResetEvent(false); - var activityExporter = new TestActivityExporter(_ => resetEvent.WaitOne(TimeSpan.FromSeconds(10))); - using var activityProcessor = new BatchingActivityProcessor(activityExporter, 128, DefaultDelay, DefaultTimeout, 128); - - using var openTelemetrySdk = Sdk.CreateTracerProviderBuilder() - .AddSource(ActivitySourceName) - .SetSampler(new AlwaysOnSampler()) - .AddProcessor(activityProcessor) - .Build(); - - var activity = Source.StartActivity("foo"); - - // does not block - var sw = Stopwatch.StartNew(); - activity?.Stop(); - sw.Stop(); - - Assert.InRange(sw.Elapsed, TimeSpan.Zero, TimeSpan.FromMilliseconds(100)); - - resetEvent.Set(); - - openTelemetrySdk.Dispose(); - - var exported = this.WaitForActivities(activityExporter, 1, DefaultTimeout); - - Assert.Single(exported); - } - - [Fact] - public async Task ShutdownOnNotEmptyQueueFullFlush() - { - const int batchSize = 75; - int exportCalledCount = 0; - var activityExporter = new TestActivityExporter(_ => Interlocked.Increment(ref exportCalledCount)); - using var activityProcessor = - new BatchingActivityProcessor(activityExporter, 128, DefaultDelay, DefaultTimeout, batchSize); - using (var openTelemetrySdk = Sdk.CreateTracerProviderBuilder() - .AddSource(ActivitySourceName) - .SetSampler(new AlwaysOnSampler()) - .AddProcessor(activityProcessor) - .Build()) - { - using var inMemoryEventListener = new InMemoryEventListener(); - var activities = new List(); - for (int i = 0; i < 100; i++) - { - activities.Add(this.CreateActivity(i.ToString())); - } - - Assert.True(activityExporter.ExportedActivities.Length < activities.Count); - using (var cts = new CancellationTokenSource(DefaultTimeout)) - { - await activityProcessor.ShutdownAsync(cts.Token); - } - - // Get the shutdown event. - // 2 is the EventId for OpenTelemetrySdkEventSource.ShutdownEvent - // TODO: Expose event ids as internal, so tests can access them more reliably. - var shutdownEvent = inMemoryEventListener.Events.Where((e) => e.EventId == 2).First(); - - int droppedCount = 0; - if (shutdownEvent != null) - { - // There is a single payload which is the number of items left in buffer at shutdown. - droppedCount = (int)shutdownEvent.Payload[0]; - } - - Assert.True(activityExporter.WasShutDown); - Assert.Equal(activities.Count, droppedCount + activityExporter.ExportedActivities.Length); - Assert.InRange(exportCalledCount, activities.Count / batchSize, activities.Count); - } - } - - [Fact] - public async Task ShutdownOnNotEmptyQueueNotFullFlush() - { - const int batchSize = 25; - int exportCalledCount = 0; - - var activityExporter = new TestActivityExporter(_ => Interlocked.Increment(ref exportCalledCount), 30000); - - using var activityProcessor = - new BatchingActivityProcessor(activityExporter, 128, DefaultDelay, DefaultTimeout, batchSize); - using (var openTelemetrySdk = Sdk.CreateTracerProviderBuilder() - .AddSource(ActivitySourceName) - .SetSampler(new AlwaysOnSampler()) - .AddProcessor(activityProcessor) - .Build()) - { - var activities = new List(); - for (int i = 0; i < 100; i++) - { - activities.Add(this.CreateActivity(i.ToString())); - } - - Assert.True(activityExporter.ExportedActivities.Length < activities.Count); - - // we won't be able to export all before cancellation will fire - using (var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(200))) - { - bool canceled; - try - { - await activityProcessor.ShutdownAsync(cts.Token); - canceled = false; - } - catch (OperationCanceledException) - { - canceled = true; - } - - Assert.True(canceled); - } - - var exportedCount = activityExporter.ExportedActivities.Length; - Assert.True(exportedCount < activities.Count); - } - } - - [Fact] - public async Task ForceFlushExportsAllData() - { - const int batchSize = 75; - int exportCalledCount = 0; - var activityExporter = new TestActivityExporter(_ => Interlocked.Increment(ref exportCalledCount)); - using var activityProcessor = new BatchingActivityProcessor(activityExporter, 128, DefaultDelay, DefaultTimeout, batchSize); - using (var openTelemetrySdk = Sdk.CreateTracerProviderBuilder() - .AddSource(ActivitySourceName) - .SetSampler(new AlwaysOnSampler()) - .AddProcessor(activityProcessor) - .Build()) - { - using var inMemoryEventListener = new InMemoryEventListener(); - var activities = new List(); - for (int i = 0; i < 100; i++) - { - activities.Add(this.CreateActivity(i.ToString())); - } - - Assert.True(activityExporter.ExportedActivities.Length < activities.Count); - using (var cts = new CancellationTokenSource(DefaultTimeout)) - { - await activityProcessor.ForceFlushAsync(cts.Token); - } - - // Get the shutdown event. - // 22 is the EventId for OpenTelemetrySdkEventSource.ForceFlushCompleted - // TODO: Expose event ids as internal, so tests can access them more reliably. - var flushEvent = inMemoryEventListener.Events.Where((e) => e.EventId == 22).First(); - - int droppedCount = 0; - if (flushEvent != null) - { - // There is a single payload which is the number of items left in buffer at shutdown. - droppedCount = (int)flushEvent.Payload[0]; - } - - Assert.Equal(activities.Count, activityExporter.ExportedActivities.Length + droppedCount); - Assert.InRange(exportCalledCount, activities.Count / batchSize, activities.Count); - } - } - - [Fact] - public void DisposeFlushes() - { - const int batchSize = 1; - int exportCalledCount = 0; - var activityExporter = new TestActivityExporter(_ => Interlocked.Increment(ref exportCalledCount), 100); - var activities = new List(); - using var inMemoryEventListener = new InMemoryEventListener(); - using (var batchingActivityProcessor = new BatchingActivityProcessor(activityExporter, 128, DefaultDelay, DefaultTimeout, batchSize)) - { - using var openTelemetrySdk = Sdk.CreateTracerProviderBuilder() - .AddSource(ActivitySourceName) - .SetSampler(new AlwaysOnSampler()) - .AddProcessor(batchingActivityProcessor) - .Build(); - for (int i = 0; i < 3; i++) - { - activities.Add(this.CreateActivity(i.ToString())); - } - - Assert.True(activityExporter.ExportedActivities.Length < activities.Count); - } - - // Get the shutdown event. - // 2 is the EventId for OpenTelemetrySdkEventSource.ShutdownEvent - // TODO: Expose event ids as internal, so tests can access them more reliably. - var shutdownEvent = inMemoryEventListener.Events.Where((e) => e.EventId == 2).First(); - - int droppedCount = 0; - if (shutdownEvent != null) - { - // There is a single payload which is the number of items left in buffer at shutdown. - droppedCount = (int)shutdownEvent.Payload[0]; - } - - Assert.True(activityExporter.WasShutDown); - Assert.Equal(activities.Count, activityExporter.ExportedActivities.Length + droppedCount); - Assert.Equal(activities.Count / batchSize, exportCalledCount); - } - - public void Dispose() - { - Activity.Current = null; - } - - private Activity CreateActivity(string activityName) - { - var activity = Source.StartActivity(activityName); - activity?.Stop(); - return activity; - } - - private Activity CreateSampledEndedActivity(string activityName) - { - var context = new ActivityContext(ActivityTraceId.CreateRandom(), ActivitySpanId.CreateRandom(), ActivityTraceFlags.Recorded); - - var activity = Source.StartActivity(activityName, ActivityKind.Internal, context); - activity.Stop(); - return activity; - } - - private Activity CreateNotSampledEndedActivity(string activityName) - { - var context = new ActivityContext(ActivityTraceId.CreateRandom(), ActivitySpanId.CreateRandom(), ActivityTraceFlags.None); - - var activity = Source.StartActivity(activityName, ActivityKind.Server, context); - activity?.Stop(); - return activity; - } - - private Activity[] WaitForActivities(TestActivityExporter exporter, int activityCount, TimeSpan timeout) - { - var sw = Stopwatch.StartNew(); - while (exporter.ExportedActivities.Length < activityCount && sw.Elapsed <= timeout) - { - Thread.Sleep(10); - } - - Assert.True( - exporter.ExportedActivities.Length >= activityCount, - $"Expected at least {activityCount}, got {exporter.ExportedActivities.Length}"); - - return exporter.ExportedActivities; - } - } -}