diff --git a/src/OpenTelemetry/Trace/BatchingActivityProcessor.cs b/src/OpenTelemetry/Trace/BatchingActivityProcessor.cs
deleted file mode 100644
index 242feab2f18..00000000000
--- a/src/OpenTelemetry/Trace/BatchingActivityProcessor.cs
+++ /dev/null
@@ -1,344 +0,0 @@
-//
-// Copyright The OpenTelemetry Authors
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-//
-
-using System;
-using System.Collections.Concurrent;
-using System.Collections.Generic;
-using System.Diagnostics;
-using System.Threading;
-using System.Threading.Tasks;
-using OpenTelemetry.Internal;
-
-namespace OpenTelemetry.Trace
-{
- ///
- /// Implements processor that batches activities before calling exporter.
- ///
- public class BatchingActivityProcessor : ActivityProcessor
- {
- private const int DefaultMaxQueueSize = 2048;
- private const int DefaultMaxExportBatchSize = 512;
- private static readonly TimeSpan DefaultScheduledDelay = TimeSpan.FromMilliseconds(5000);
- private static readonly TimeSpan DefaultExporterTimeout = TimeSpan.FromMilliseconds(30000);
-
- private readonly ConcurrentQueue exportQueue;
- private readonly int maxQueueSize;
- private readonly int maxExportBatchSize;
- private readonly TimeSpan scheduledDelay;
- private readonly TimeSpan exporterTimeout;
- private readonly ActivityExporter exporter;
- private readonly List batch = new List();
- private readonly SemaphoreSlim flushLock = new SemaphoreSlim(1);
- private readonly System.Timers.Timer flushTimer;
- private volatile int currentQueueSize;
- private bool disposed;
-
- ///
- /// Initializes a new instance of the class with default parameters:
- ///
- /// -
- /// maxQueueSize = 2048,
- ///
- /// -
- /// scheduledDelay = 5 sec,
- ///
- /// -
- /// exporterTimeout = 30 sec,
- ///
- /// -
- /// maxExportBatchSize = 512
- ///
- ///
- ///
- /// Exporter instance.
- public BatchingActivityProcessor(ActivityExporter exporter)
- : this(exporter, DefaultMaxQueueSize, DefaultScheduledDelay, DefaultExporterTimeout, DefaultMaxExportBatchSize)
- {
- }
-
- ///
- /// Initializes a new instance of the class with custom settings.
- ///
- /// Exporter instance.
- /// Maximum queue size. After the size is reached activities are dropped by processor.
- /// The delay between two consecutive exports.
- /// Maximum allowed time to export data.
- /// The maximum batch size of every export. It must be smaller or equal to maxQueueSize.
- public BatchingActivityProcessor(ActivityExporter exporter, int maxQueueSize, TimeSpan scheduledDelay, TimeSpan exporterTimeout, int maxExportBatchSize)
- {
- if (maxQueueSize <= 0)
- {
- throw new ArgumentOutOfRangeException(nameof(maxQueueSize));
- }
-
- if (maxExportBatchSize <= 0 || maxExportBatchSize > maxQueueSize)
- {
- throw new ArgumentOutOfRangeException(nameof(maxExportBatchSize));
- }
-
- if (scheduledDelay <= TimeSpan.FromMilliseconds(0))
- {
- throw new ArgumentOutOfRangeException(nameof(scheduledDelay));
- }
-
- this.exporter = exporter ?? throw new ArgumentNullException(nameof(exporter));
- this.maxQueueSize = maxQueueSize;
- this.scheduledDelay = scheduledDelay;
- this.exporterTimeout = exporterTimeout;
- this.maxExportBatchSize = maxExportBatchSize;
-
- this.exportQueue = new ConcurrentQueue();
-
- this.flushTimer = new System.Timers.Timer
- {
- AutoReset = false,
- Enabled = true,
- Interval = this.scheduledDelay.TotalMilliseconds,
- };
-
- this.flushTimer.Elapsed += async (sender, args) =>
- {
- bool lockTaken = this.flushLock.Wait(0);
- try
- {
- if (!lockTaken)
- {
- // If the lock was already held, it means a flush is already executing.
- return;
- }
-
- await this.FlushAsyncInternal(drain: false, lockAlreadyHeld: true, CancellationToken.None).ConfigureAwait(false);
- }
- catch (Exception ex)
- {
- OpenTelemetrySdkEventSource.Log.SpanProcessorException(nameof(System.Timers.Timer.Elapsed), ex);
- }
- finally
- {
- if (lockTaken)
- {
- this.flushLock.Release();
- }
- }
- };
- }
-
- ///
- public override void OnEnd(Activity activity)
- {
- if (activity.Recorded)
- {
- // because of race-condition between checking the size and enqueueing,
- // we might end up with a bit more activities than maxQueueSize.
- // Let's just tolerate it to avoid extra synchronization.
- if (this.currentQueueSize >= this.maxQueueSize)
- {
- OpenTelemetrySdkEventSource.Log.SpanProcessorQueueIsExhausted();
- return;
- }
-
- var size = Interlocked.Increment(ref this.currentQueueSize);
-
- this.exportQueue.Enqueue(activity);
-
- if (size >= this.maxExportBatchSize)
- {
- bool lockTaken = this.flushLock.Wait(0);
- if (lockTaken)
- {
- Task.Run(async () =>
- {
- try
- {
- await this.FlushAsyncInternal(drain: false, lockAlreadyHeld: true, CancellationToken.None).ConfigureAwait(false);
- }
- catch (Exception ex)
- {
- OpenTelemetrySdkEventSource.Log.SpanProcessorException(nameof(this.OnEnd), ex);
- }
- finally
- {
- this.flushLock.Release();
- }
- });
- return;
- }
- }
- }
- }
-
- ///
- /// If the is canceled.
- public override async Task ShutdownAsync(CancellationToken cancellationToken)
- {
- await this.FlushAsyncInternal(drain: true, lockAlreadyHeld: false, cancellationToken).ConfigureAwait(false);
-
- await this.exporter.ShutdownAsync(cancellationToken).ConfigureAwait(false);
-
- OpenTelemetrySdkEventSource.Log.ShutdownEvent(this.currentQueueSize);
- }
-
- ///
- /// If the is canceled.
- public override async Task ForceFlushAsync(CancellationToken cancellationToken)
- {
- await this.FlushAsyncInternal(drain: true, lockAlreadyHeld: false, cancellationToken).ConfigureAwait(false);
-
- OpenTelemetrySdkEventSource.Log.ForceFlushCompleted(this.currentQueueSize);
- }
-
- ///
- /// Releases the unmanaged resources used by this class and optionally releases the managed resources.
- ///
- /// to release both managed and unmanaged resources; to release only unmanaged resources.
- protected override void Dispose(bool disposing)
- {
- base.Dispose(disposing);
-
- if (disposing && !this.disposed)
- {
- if (this.exporter is IDisposable disposableExporter)
- {
- try
- {
- disposableExporter.Dispose();
- }
- catch (Exception e)
- {
- OpenTelemetrySdkEventSource.Log.SpanProcessorException(nameof(this.Dispose), e);
- }
- }
-
- this.flushTimer.Dispose();
- this.flushLock.Dispose();
- this.disposed = true;
- }
- }
-
- private async Task FlushAsyncInternal(bool drain, bool lockAlreadyHeld, CancellationToken cancellationToken)
- {
- if (!lockAlreadyHeld)
- {
- await this.flushLock.WaitAsync(cancellationToken).ConfigureAwait(false);
- }
-
- try
- {
- this.flushTimer.Enabled = false;
-
- var queueSize = this.currentQueueSize;
- do
- {
- using var cts = new CancellationTokenSource(this.exporterTimeout);
-
- var linkedCTS = cancellationToken.CanBeCanceled
- ? CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, cts.Token)
- : null;
-
- int exportedCount;
- using (linkedCTS)
- {
- try
- {
- exportedCount = await this.ExportBatchAsync(linkedCTS?.Token ?? cts.Token).ConfigureAwait(false);
- }
- catch (OperationCanceledException)
- {
- if (cancellationToken.IsCancellationRequested)
- {
- // User-supplied cancellation, bubble up the exception.
- throw;
- }
-
- OpenTelemetrySdkEventSource.Log.SpanExporterTimeout(Math.Min(queueSize, this.maxExportBatchSize));
- break;
- }
- }
-
- if (exportedCount == 0)
- {
- // Break out of drain loop if nothing is being exported, likely means there is an issue
- // and we don't want to deadlock.
- break;
- }
-
- queueSize -= exportedCount;
- }
-#pragma warning disable SA1009 // Closing parenthesis should be spaced correctly -> Spacing is for readability
- while (
- !cancellationToken.IsCancellationRequested &&
- (
- (drain && queueSize > 0) // If draining, keep looping until queue is empty.
- ||
- (!drain && queueSize >= this.maxExportBatchSize) // If not draining, keep looping while there are batches ready.
- ));
-#pragma warning restore SA1009 // Closing parenthesis should be spaced correctly
- }
- catch (Exception ex) when (!(ex is OperationCanceledException))
- {
- OpenTelemetrySdkEventSource.Log.SpanProcessorException(nameof(this.FlushAsyncInternal), ex);
- }
- finally
- {
- this.flushTimer.Enabled = true;
-
- if (!lockAlreadyHeld)
- {
- this.flushLock.Release();
- }
- }
- }
-
- private async Task ExportBatchAsync(CancellationToken cancellationToken)
- {
- try
- {
- if (this.exportQueue.TryDequeue(out var nextActivity))
- {
- Interlocked.Decrement(ref this.currentQueueSize);
- this.batch.Add(nextActivity);
- }
- else
- {
- // nothing in queue
- return 0;
- }
-
- while (this.batch.Count < this.maxExportBatchSize && this.exportQueue.TryDequeue(out nextActivity))
- {
- Interlocked.Decrement(ref this.currentQueueSize);
- this.batch.Add(nextActivity);
- }
-
- var result = await this.exporter.ExportAsync(this.batch, cancellationToken).ConfigureAwait(false);
- if (result != ExportResult.Success)
- {
- OpenTelemetrySdkEventSource.Log.ExporterErrorResult(result);
-
- // we do not support retries for now and leave it up to exporter
- // as only exporter implementation knows how to retry: which items failed
- // and what is the reasonable policy for that exporter.
- }
-
- return this.batch.Count;
- }
- finally
- {
- this.batch.Clear();
- }
- }
- }
-}
diff --git a/test/OpenTelemetry.Tests/Trace/BatchingActivityProcessorTests.cs b/test/OpenTelemetry.Tests/Trace/BatchingActivityProcessorTests.cs
deleted file mode 100644
index 85f9ba11a57..00000000000
--- a/test/OpenTelemetry.Tests/Trace/BatchingActivityProcessorTests.cs
+++ /dev/null
@@ -1,571 +0,0 @@
-//
-// Copyright The OpenTelemetry Authors
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-//
-
-using System;
-using System.Collections.Generic;
-using System.Diagnostics;
-using System.Linq;
-using System.Threading;
-using System.Threading.Tasks;
-using OpenTelemetry.Tests;
-using Xunit;
-
-namespace OpenTelemetry.Trace.Tests
-{
- public class BatchingActivityProcessorTests : IDisposable
- {
- private const string ActivityName1 = "MyActivityName/1";
- private const string ActivityName2 = "MyActivityName/2";
- private const string ActivitySourceName = "my.source";
-
- private static readonly TimeSpan DefaultDelay = TimeSpan.FromMilliseconds(30);
- private static readonly TimeSpan DefaultTimeout = TimeSpan.FromSeconds(2);
- private static readonly ActivitySource Source = new ActivitySource(ActivitySourceName);
-
- [Fact]
- public void ThrowsOnInvalidArguments()
- {
- Assert.Throws(() => new BatchingActivityProcessor(null));
- Assert.Throws(() => new BatchingActivityProcessor(new TestActivityExporter(null), 0, TimeSpan.FromSeconds(5), TimeSpan.FromSeconds(5), 0));
- Assert.Throws(() => new BatchingActivityProcessor(new TestActivityExporter(null), 2048, TimeSpan.FromSeconds(5), TimeSpan.FromSeconds(5), 0));
- Assert.Throws(() => new BatchingActivityProcessor(new TestActivityExporter(null), 512, TimeSpan.FromSeconds(5), TimeSpan.FromSeconds(5), 513));
- }
-
- [Fact]
- public async Task ShutdownTwice()
- {
- using var activityProcessor = new BatchingActivityProcessor(new TestActivityExporter(null));
- await activityProcessor.ShutdownAsync(CancellationToken.None);
-
- // does not throw
- await activityProcessor.ShutdownAsync(CancellationToken.None);
- }
-
- [Fact]
- public void DisposeTwice()
- {
- using var activityProcessor = new BatchingActivityProcessor(new TestActivityExporter(null));
-
- activityProcessor.Dispose();
-
- // does not throw
- activityProcessor.Dispose();
- }
-
- [Fact]
- public async Task ShutdownWithHugeScheduledDelay()
- {
- using var activityProcessor =
- new BatchingActivityProcessor(new TestActivityExporter(null), 128, TimeSpan.FromMinutes(1), TimeSpan.FromSeconds(100), 32);
- var sw = Stopwatch.StartNew();
- using (var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(100)))
- {
- cts.Token.ThrowIfCancellationRequested();
- await activityProcessor.ShutdownAsync(cts.Token).ConfigureAwait(false);
- }
-
- sw.Stop();
- Assert.InRange(sw.Elapsed, TimeSpan.Zero, TimeSpan.FromMilliseconds(100));
- }
-
- [Fact]
- public void CancelWithExporterTimeoutMilliseconds()
- {
- using var inMemoryEventListener = new InMemoryEventListener();
- var activityExporter = new TestActivityExporter(null, sleepMilliseconds: 5000);
- using var activityProcessor = new BatchingActivityProcessor(activityExporter, 128, TimeSpan.FromMilliseconds(1000), TimeSpan.FromMilliseconds(0), 1);
- using (var openTelemetrySdk = Sdk.CreateTracerProviderBuilder()
- .AddSource(ActivitySourceName)
- .SetSampler(new AlwaysOnSampler())
- .AddProcessor(activityProcessor)
- .Build())
- {
- var activity1 = this.CreateActivity(ActivityName1);
- } // Force everything to flush out of the processor.
-
- Assert.Contains(inMemoryEventListener.Events, (e) => e.EventId == 23);
- }
-
- [Fact]
- public void ProcessorDoesNotSendRecordDecisionSpanToExporter()
- {
- var testSampler = new TestSampler();
- testSampler.SamplingAction = (samplingParameters) =>
- {
- return new SamplingResult(SamplingDecision.Record);
- };
-
- int exportCalledCount = 0;
- var activityExporter = new TestActivityExporter(_ => Interlocked.Increment(ref exportCalledCount));
- using var activityProcessor = new BatchingActivityProcessor(activityExporter, 128, DefaultDelay, DefaultTimeout, 1);
- using var openTelemetrySdk = Sdk.CreateTracerProviderBuilder()
- .AddSource(ActivitySourceName)
- .AddProcessor(activityProcessor)
- .SetSampler(testSampler)
- .Build();
-
- var activity1 = this.CreateNotSampledEndedActivity(ActivityName2);
-
- Assert.True(activity1.IsAllDataRequested);
- Assert.Equal(ActivityTraceFlags.None, activity1.ActivityTraceFlags);
- Assert.False(activity1.Recorded);
-
- var exported = this.WaitForActivities(activityExporter, 0, DefaultTimeout);
- Assert.Equal(0, exportCalledCount);
-
- Assert.Empty(exported);
- }
-
- [Fact]
- public void ProcessorSendsRecordAndSampledDecisionSpanToExporter()
- {
- var testSampler = new TestSampler();
- testSampler.SamplingAction = (samplingParameters) =>
- {
- return new SamplingResult(SamplingDecision.RecordAndSampled);
- };
-
- int exportCalledCount = 0;
- var activityExporter = new TestActivityExporter(_ => Interlocked.Increment(ref exportCalledCount));
- using var activityProcessor = new BatchingActivityProcessor(activityExporter, 128, DefaultDelay, DefaultTimeout, 1);
- using var openTelemetrySdk = Sdk.CreateTracerProviderBuilder()
- .AddSource(ActivitySourceName)
- .AddProcessor(activityProcessor)
- .SetSampler(testSampler)
- .Build();
-
- var activity1 = this.CreateSampledEndedActivity(ActivityName1);
- var activity2 = this.CreateNotSampledEndedActivity(ActivityName2);
-
- Assert.True(activity1.IsAllDataRequested);
- Assert.Equal(ActivityTraceFlags.Recorded, activity1.ActivityTraceFlags);
- Assert.True(activity1.Recorded);
-
- Assert.True(activity2.IsAllDataRequested);
- Assert.Equal(ActivityTraceFlags.Recorded, activity2.ActivityTraceFlags);
- Assert.True(activity2.Recorded);
-
- var exported = this.WaitForActivities(activityExporter, 2, DefaultTimeout);
- Assert.Equal(2, exportCalledCount);
- Assert.Equal(2, exported.Length);
- }
-
- [Fact]
- public void ExportDifferentSampledActivities()
- {
- var activityExporter = new TestActivityExporter(null);
- using var activityProcessor = new BatchingActivityProcessor(activityExporter, 128, DefaultDelay, DefaultTimeout, 128);
- using var openTelemetrySdk = Sdk.CreateTracerProviderBuilder()
- .AddSource(ActivitySourceName)
- .SetSampler(new AlwaysOnSampler())
- .AddProcessor(activityProcessor)
- .Build();
-
- var activity1 = this.CreateActivity(ActivityName1);
- var activity2 = this.CreateActivity(ActivityName2);
-
- var exported = this.WaitForActivities(activityExporter, 2, DefaultTimeout);
-
- Assert.Equal(2, exported.Length);
- Assert.Contains(activity1, exported);
- Assert.Contains(activity2, exported);
- }
-
- [Fact]
- public void ExporterIsSlowerThanDelay()
- {
- var exportStartTimes = new List();
- var exportEndTimes = new List();
- var activityExporter = new TestActivityExporter(_ =>
- {
- exportStartTimes.Add(Stopwatch.GetTimestamp());
- Thread.Sleep(50);
- exportEndTimes.Add(Stopwatch.GetTimestamp());
- });
-
- using var activityProcessor = new BatchingActivityProcessor(activityExporter, 128, TimeSpan.FromMilliseconds(30), DefaultTimeout, 10);
- using var openTelemetrySdk = Sdk.CreateTracerProviderBuilder()
- .AddSource(ActivitySourceName)
- .SetSampler(new AlwaysOnSampler())
- .AddProcessor(activityProcessor)
- .Build();
-
- var activities = new List();
- for (int i = 0; i < 20; i++)
- {
- activities.Add(this.CreateActivity(i.ToString()));
- }
-
- var exported = this.WaitForActivities(activityExporter, 20, TimeSpan.FromSeconds(2));
-
- Assert.Equal(activities.Count, exported.Length);
- Assert.InRange(exportStartTimes.Count, 2, 20);
-
- for (int i = 1; i < exportStartTimes.Count - 1; i++)
- {
- Assert.InRange(exportStartTimes[i], exportEndTimes[i - 1] + 1, exportStartTimes[i + 1] - 1);
- }
- }
-
- [Fact]
- public void AddActivityAfterQueueIsExhausted()
- {
- int exportCalledCount = 0;
- var activityExporter = new TestActivityExporter(_ =>
- {
- Interlocked.Increment(ref exportCalledCount);
- Thread.Sleep(50);
- });
- using var activityProcessor = new BatchingActivityProcessor(activityExporter, 1, TimeSpan.FromMilliseconds(100), DefaultTimeout, 1);
- using var openTelemetrySdk = Sdk.CreateTracerProviderBuilder()
- .AddSource(ActivitySourceName)
- .SetSampler(new AlwaysOnSampler())
- .AddProcessor(activityProcessor)
- .Build();
-
- var activities = new List();
- for (int i = 0; i < 20; i++)
- {
- activities.Add(this.CreateActivity(i.ToString()));
- }
-
- var exported = this.WaitForActivities(activityExporter, 1, DefaultTimeout);
-
- Assert.Equal(1, exportCalledCount);
- Assert.Single(exported);
- Assert.Contains(activities.First(), exported);
- }
-
- [Fact]
- public void ExportMoreActivitiesThanTheMaxBatchSize()
- {
- var exporterCalled = new ManualResetEvent(false);
- int exportCalledCount = 0;
- var activityExporter = new TestActivityExporter(_ =>
- {
- exporterCalled.Set();
- Interlocked.Increment(ref exportCalledCount);
- });
-
- using var activityProcessor = new BatchingActivityProcessor(activityExporter, 128, DefaultDelay, DefaultTimeout, 3);
- using var openTelemetrySdk = Sdk.CreateTracerProviderBuilder()
- .AddSource(ActivitySourceName)
- .SetSampler(new AlwaysOnSampler())
- .AddProcessor(activityProcessor)
- .Build();
-
- var activity1 = this.CreateActivity(ActivityName1);
- var activity2 = this.CreateActivity(ActivityName1);
- var activity3 = this.CreateActivity(ActivityName1);
- var activity4 = this.CreateActivity(ActivityName1);
- var activity5 = this.CreateActivity(ActivityName1);
- var activity6 = this.CreateActivity(ActivityName1);
-
- // wait for exporter to be called to stabilize tests on the build server
- exporterCalled.WaitOne(TimeSpan.FromSeconds(10));
-
- var exported = this.WaitForActivities(activityExporter, 6, DefaultTimeout);
-
- Assert.InRange(exportCalledCount, 2, 6);
-
- Assert.Equal(6, exported.Count());
- Assert.Contains(activity1, exported);
- Assert.Contains(activity2, exported);
- Assert.Contains(activity3, exported);
- Assert.Contains(activity4, exported);
- Assert.Contains(activity5, exported);
- Assert.Contains(activity6, exported);
- }
-
- [Fact]
- public void ExportNotSampledActivities()
- {
- int exportCalledCount = 0;
- var activityExporter = new TestActivityExporter(_ => Interlocked.Increment(ref exportCalledCount));
- using var activityProcessor = new BatchingActivityProcessor(activityExporter, 128, DefaultDelay, DefaultTimeout, 1);
- using var openTelemetrySdk = Sdk.CreateTracerProviderBuilder()
- .AddSource(ActivitySourceName)
- .SetSampler(new ParentOrElseSampler(new AlwaysOffSampler()))
- .AddProcessor(activityProcessor)
- .Build();
-
- var activity1 = this.CreateSampledEndedActivity(ActivityName1);
- var activity2 = this.CreateNotSampledEndedActivity(ActivityName2);
-
- // Activities are recorded and exported in the same order as they are created, we test that a non
- // sampled activity is not exported by creating and ending a sampled activity after a non sampled activity
- // and checking that the first exported activity is the sampled activity (the non sampled did not get
- // exported).
- var exported = this.WaitForActivities(activityExporter, 1, DefaultTimeout);
- Assert.Equal(1, exportCalledCount);
-
- // Need to check this because otherwise the variable activity1 is unused, other option is to not
- // have a activity1 variable.
- Assert.Single(exported);
- Assert.Contains(activity1, exported);
- }
-
- [Fact]
- public void ProcessorDoesNotBlockOnExporter()
- {
- var resetEvent = new ManualResetEvent(false);
- var activityExporter = new TestActivityExporter(_ => resetEvent.WaitOne(TimeSpan.FromSeconds(10)));
- using var activityProcessor = new BatchingActivityProcessor(activityExporter, 128, DefaultDelay, DefaultTimeout, 128);
-
- using var openTelemetrySdk = Sdk.CreateTracerProviderBuilder()
- .AddSource(ActivitySourceName)
- .SetSampler(new AlwaysOnSampler())
- .AddProcessor(activityProcessor)
- .Build();
-
- var activity = Source.StartActivity("foo");
-
- // does not block
- var sw = Stopwatch.StartNew();
- activity?.Stop();
- sw.Stop();
-
- Assert.InRange(sw.Elapsed, TimeSpan.Zero, TimeSpan.FromMilliseconds(100));
-
- resetEvent.Set();
-
- openTelemetrySdk.Dispose();
-
- var exported = this.WaitForActivities(activityExporter, 1, DefaultTimeout);
-
- Assert.Single(exported);
- }
-
- [Fact]
- public async Task ShutdownOnNotEmptyQueueFullFlush()
- {
- const int batchSize = 75;
- int exportCalledCount = 0;
- var activityExporter = new TestActivityExporter(_ => Interlocked.Increment(ref exportCalledCount));
- using var activityProcessor =
- new BatchingActivityProcessor(activityExporter, 128, DefaultDelay, DefaultTimeout, batchSize);
- using (var openTelemetrySdk = Sdk.CreateTracerProviderBuilder()
- .AddSource(ActivitySourceName)
- .SetSampler(new AlwaysOnSampler())
- .AddProcessor(activityProcessor)
- .Build())
- {
- using var inMemoryEventListener = new InMemoryEventListener();
- var activities = new List();
- for (int i = 0; i < 100; i++)
- {
- activities.Add(this.CreateActivity(i.ToString()));
- }
-
- Assert.True(activityExporter.ExportedActivities.Length < activities.Count);
- using (var cts = new CancellationTokenSource(DefaultTimeout))
- {
- await activityProcessor.ShutdownAsync(cts.Token);
- }
-
- // Get the shutdown event.
- // 2 is the EventId for OpenTelemetrySdkEventSource.ShutdownEvent
- // TODO: Expose event ids as internal, so tests can access them more reliably.
- var shutdownEvent = inMemoryEventListener.Events.Where((e) => e.EventId == 2).First();
-
- int droppedCount = 0;
- if (shutdownEvent != null)
- {
- // There is a single payload which is the number of items left in buffer at shutdown.
- droppedCount = (int)shutdownEvent.Payload[0];
- }
-
- Assert.True(activityExporter.WasShutDown);
- Assert.Equal(activities.Count, droppedCount + activityExporter.ExportedActivities.Length);
- Assert.InRange(exportCalledCount, activities.Count / batchSize, activities.Count);
- }
- }
-
- [Fact]
- public async Task ShutdownOnNotEmptyQueueNotFullFlush()
- {
- const int batchSize = 25;
- int exportCalledCount = 0;
-
- var activityExporter = new TestActivityExporter(_ => Interlocked.Increment(ref exportCalledCount), 30000);
-
- using var activityProcessor =
- new BatchingActivityProcessor(activityExporter, 128, DefaultDelay, DefaultTimeout, batchSize);
- using (var openTelemetrySdk = Sdk.CreateTracerProviderBuilder()
- .AddSource(ActivitySourceName)
- .SetSampler(new AlwaysOnSampler())
- .AddProcessor(activityProcessor)
- .Build())
- {
- var activities = new List();
- for (int i = 0; i < 100; i++)
- {
- activities.Add(this.CreateActivity(i.ToString()));
- }
-
- Assert.True(activityExporter.ExportedActivities.Length < activities.Count);
-
- // we won't be able to export all before cancellation will fire
- using (var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(200)))
- {
- bool canceled;
- try
- {
- await activityProcessor.ShutdownAsync(cts.Token);
- canceled = false;
- }
- catch (OperationCanceledException)
- {
- canceled = true;
- }
-
- Assert.True(canceled);
- }
-
- var exportedCount = activityExporter.ExportedActivities.Length;
- Assert.True(exportedCount < activities.Count);
- }
- }
-
- [Fact]
- public async Task ForceFlushExportsAllData()
- {
- const int batchSize = 75;
- int exportCalledCount = 0;
- var activityExporter = new TestActivityExporter(_ => Interlocked.Increment(ref exportCalledCount));
- using var activityProcessor = new BatchingActivityProcessor(activityExporter, 128, DefaultDelay, DefaultTimeout, batchSize);
- using (var openTelemetrySdk = Sdk.CreateTracerProviderBuilder()
- .AddSource(ActivitySourceName)
- .SetSampler(new AlwaysOnSampler())
- .AddProcessor(activityProcessor)
- .Build())
- {
- using var inMemoryEventListener = new InMemoryEventListener();
- var activities = new List();
- for (int i = 0; i < 100; i++)
- {
- activities.Add(this.CreateActivity(i.ToString()));
- }
-
- Assert.True(activityExporter.ExportedActivities.Length < activities.Count);
- using (var cts = new CancellationTokenSource(DefaultTimeout))
- {
- await activityProcessor.ForceFlushAsync(cts.Token);
- }
-
- // Get the shutdown event.
- // 22 is the EventId for OpenTelemetrySdkEventSource.ForceFlushCompleted
- // TODO: Expose event ids as internal, so tests can access them more reliably.
- var flushEvent = inMemoryEventListener.Events.Where((e) => e.EventId == 22).First();
-
- int droppedCount = 0;
- if (flushEvent != null)
- {
- // There is a single payload which is the number of items left in buffer at shutdown.
- droppedCount = (int)flushEvent.Payload[0];
- }
-
- Assert.Equal(activities.Count, activityExporter.ExportedActivities.Length + droppedCount);
- Assert.InRange(exportCalledCount, activities.Count / batchSize, activities.Count);
- }
- }
-
- [Fact]
- public void DisposeFlushes()
- {
- const int batchSize = 1;
- int exportCalledCount = 0;
- var activityExporter = new TestActivityExporter(_ => Interlocked.Increment(ref exportCalledCount), 100);
- var activities = new List();
- using var inMemoryEventListener = new InMemoryEventListener();
- using (var batchingActivityProcessor = new BatchingActivityProcessor(activityExporter, 128, DefaultDelay, DefaultTimeout, batchSize))
- {
- using var openTelemetrySdk = Sdk.CreateTracerProviderBuilder()
- .AddSource(ActivitySourceName)
- .SetSampler(new AlwaysOnSampler())
- .AddProcessor(batchingActivityProcessor)
- .Build();
- for (int i = 0; i < 3; i++)
- {
- activities.Add(this.CreateActivity(i.ToString()));
- }
-
- Assert.True(activityExporter.ExportedActivities.Length < activities.Count);
- }
-
- // Get the shutdown event.
- // 2 is the EventId for OpenTelemetrySdkEventSource.ShutdownEvent
- // TODO: Expose event ids as internal, so tests can access them more reliably.
- var shutdownEvent = inMemoryEventListener.Events.Where((e) => e.EventId == 2).First();
-
- int droppedCount = 0;
- if (shutdownEvent != null)
- {
- // There is a single payload which is the number of items left in buffer at shutdown.
- droppedCount = (int)shutdownEvent.Payload[0];
- }
-
- Assert.True(activityExporter.WasShutDown);
- Assert.Equal(activities.Count, activityExporter.ExportedActivities.Length + droppedCount);
- Assert.Equal(activities.Count / batchSize, exportCalledCount);
- }
-
- public void Dispose()
- {
- Activity.Current = null;
- }
-
- private Activity CreateActivity(string activityName)
- {
- var activity = Source.StartActivity(activityName);
- activity?.Stop();
- return activity;
- }
-
- private Activity CreateSampledEndedActivity(string activityName)
- {
- var context = new ActivityContext(ActivityTraceId.CreateRandom(), ActivitySpanId.CreateRandom(), ActivityTraceFlags.Recorded);
-
- var activity = Source.StartActivity(activityName, ActivityKind.Internal, context);
- activity.Stop();
- return activity;
- }
-
- private Activity CreateNotSampledEndedActivity(string activityName)
- {
- var context = new ActivityContext(ActivityTraceId.CreateRandom(), ActivitySpanId.CreateRandom(), ActivityTraceFlags.None);
-
- var activity = Source.StartActivity(activityName, ActivityKind.Server, context);
- activity?.Stop();
- return activity;
- }
-
- private Activity[] WaitForActivities(TestActivityExporter exporter, int activityCount, TimeSpan timeout)
- {
- var sw = Stopwatch.StartNew();
- while (exporter.ExportedActivities.Length < activityCount && sw.Elapsed <= timeout)
- {
- Thread.Sleep(10);
- }
-
- Assert.True(
- exporter.ExportedActivities.Length >= activityCount,
- $"Expected at least {activityCount}, got {exporter.ExportedActivities.Length}");
-
- return exporter.ExportedActivities;
- }
- }
-}