From 1c23c702ae68f924d9867f5deda387d8ca21beb0 Mon Sep 17 00:00:00 2001 From: Liudmila Molkova Date: Tue, 1 Oct 2019 20:26:23 -0700 Subject: [PATCH] Fix batching tests and processor improvements --- .../Trace/Export/BatchingSpanProcessor.cs | 20 +- .../Impl/Testing/Export/TestExporter.cs | 3 +- .../Export/BatchingSpanProcessorTests.cs | 315 +++++++++--------- 3 files changed, 173 insertions(+), 165 deletions(-) diff --git a/src/OpenTelemetry/Trace/Export/BatchingSpanProcessor.cs b/src/OpenTelemetry/Trace/Export/BatchingSpanProcessor.cs index f173238af6a..88477a84dd2 100644 --- a/src/OpenTelemetry/Trace/Export/BatchingSpanProcessor.cs +++ b/src/OpenTelemetry/Trace/Export/BatchingSpanProcessor.cs @@ -38,7 +38,7 @@ public class BatchingSpanProcessor : SpanProcessor, IDisposable private readonly TimeSpan scheduleDelay; private readonly Task worker; private CancellationTokenSource cts; - private int currentQueueSize; + private volatile int currentQueueSize; private bool stopping = false; /// @@ -113,6 +113,7 @@ public override void OnEnd(Span span) } Interlocked.Increment(ref this.currentQueueSize); + this.exportQueue.Enqueue(span); } @@ -130,9 +131,7 @@ public override async Task ShutdownAsync(CancellationToken cancellationToken) // if there are more items, continue until cancellation token allows while (this.currentQueueSize > 0 && !cancellationToken.IsCancellationRequested) { - Debug.WriteLine($"!!! {DateTime.UtcNow:o} export " + this.currentQueueSize); await this.ExportBatchAsync(cancellationToken).ConfigureAwait(false); - Debug.WriteLine($"!!! {DateTime.UtcNow:o} export finished" + this.currentQueueSize); } // there is no point in waiting for a worker task if cancellation happens @@ -161,16 +160,19 @@ private async Task ExportBatchAsync(CancellationToken cancellationToken) return; } - var nextBatchSize = Math.Min(this.currentQueueSize, this.maxExportBatchSize); - - if (nextBatchSize == 0) + List batch = null; + if (this.exportQueue.TryDequeue(out var nextSpan)) + { + Interlocked.Decrement(ref this.currentQueueSize); + batch = new List { nextSpan }; + } + else { + // nothing in queue return; } - var batch = new List(nextBatchSize); - - while (batch.Count < nextBatchSize && this.exportQueue.TryDequeue(out var nextSpan)) + while (batch.Count < this.maxExportBatchSize && this.exportQueue.TryDequeue(out nextSpan)) { Interlocked.Decrement(ref this.currentQueueSize); batch.Add(nextSpan); diff --git a/test/OpenTelemetry.Tests/Impl/Testing/Export/TestExporter.cs b/test/OpenTelemetry.Tests/Impl/Testing/Export/TestExporter.cs index e76cc546c91..fd41537dab7 100644 --- a/test/OpenTelemetry.Tests/Impl/Testing/Export/TestExporter.cs +++ b/test/OpenTelemetry.Tests/Impl/Testing/Export/TestExporter.cs @@ -14,12 +14,11 @@ // limitations under the License. // -using System.Collections.Concurrent; - namespace OpenTelemetry.Testing.Export { using System; using System.Collections.Generic; + using System.Collections.Concurrent; using System.Threading; using System.Threading.Tasks; using OpenTelemetry.Trace; diff --git a/test/OpenTelemetry.Tests/Impl/Trace/Export/BatchingSpanProcessorTests.cs b/test/OpenTelemetry.Tests/Impl/Trace/Export/BatchingSpanProcessorTests.cs index ae4c4f042a2..bd874ff8804 100644 --- a/test/OpenTelemetry.Tests/Impl/Trace/Export/BatchingSpanProcessorTests.cs +++ b/test/OpenTelemetry.Tests/Impl/Trace/Export/BatchingSpanProcessorTests.cs @@ -34,16 +34,10 @@ public class BatchingSpanProcessorTest : IDisposable private const string SpanName1 = "MySpanName/1"; private const string SpanName2 = "MySpanName/2"; - private TestExporter spanExporter = new TestExporter(null); - private BatchingSpanProcessor spanProcessor; private static readonly TimeSpan DefaultDelay = TimeSpan.FromMilliseconds(30); private static readonly TimeSpan DefaultTimeout = TimeSpan.FromSeconds(1); - public BatchingSpanProcessorTest() - { - spanProcessor = new BatchingSpanProcessor(spanExporter, 128, DefaultDelay, 128); - } - private Span CreateSampledEndedSpan(string spanName) + private Span CreateSampledEndedSpan(string spanName, SpanProcessor spanProcessor) { var sampledActivity = new Activity(spanName); sampledActivity.ActivityTraceFlags |= ActivityTraceFlags.Recorded; @@ -62,7 +56,7 @@ private Span CreateSampledEndedSpan(string spanName) return span; } - private Span CreateNotSampledEndedSpan(string spanName) + private Span CreateNotSampledEndedSpan(string spanName, SpanProcessor spanProcessor) { var notSampledActivity = new Activity(spanName); notSampledActivity.SetIdFormat(ActivityIdFormat.W3C); @@ -92,40 +86,50 @@ public void ThrowsOnInvalidArguments() [Fact] public async Task ShutdownTwice() { - spanProcessor = new BatchingSpanProcessor(new NoopSpanExporter()); + using (var spanProcessor = new BatchingSpanProcessor(new NoopSpanExporter())) + { - await spanProcessor.ShutdownAsync(CancellationToken.None); + await spanProcessor.ShutdownAsync(CancellationToken.None); - // does not throw - await spanProcessor.ShutdownAsync(CancellationToken.None); + // does not throw + await spanProcessor.ShutdownAsync(CancellationToken.None); + } } [Fact] public async Task ShutdownWithHugeScheduleDelay() { - spanProcessor = new BatchingSpanProcessor(new NoopSpanExporter(), 128, TimeSpan.FromMinutes(1), 32); - - var sw = Stopwatch.StartNew(); - using (var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(100))) + using (var spanProcessor = + new BatchingSpanProcessor(new NoopSpanExporter(), 128, TimeSpan.FromMinutes(1), 32)) { - cts.Token.ThrowIfCancellationRequested(); - await spanProcessor.ShutdownAsync(cts.Token).ConfigureAwait(false); + + var sw = Stopwatch.StartNew(); + using (var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(100))) + { + cts.Token.ThrowIfCancellationRequested(); + await spanProcessor.ShutdownAsync(cts.Token).ConfigureAwait(false); + } + + sw.Stop(); + Assert.InRange(sw.Elapsed, TimeSpan.Zero, TimeSpan.FromMilliseconds(100)); } - sw.Stop(); - Assert.InRange(sw.Elapsed, TimeSpan.Zero, TimeSpan.FromMilliseconds(100)); } [Fact] public void ExportDifferentSampledSpans() { - var span1 = CreateSampledEndedSpan(SpanName1); - var span2 = CreateSampledEndedSpan(SpanName2); + var spanExporter = new TestExporter(null); + using (var spanProcessor = new BatchingSpanProcessor(spanExporter, 128, DefaultDelay, 128)) + { + var span1 = CreateSampledEndedSpan(SpanName1, spanProcessor); + var span2 = CreateSampledEndedSpan(SpanName2, spanProcessor); - var exported = WaitForSpans(spanExporter, 2, DefaultTimeout); + var exported = WaitForSpans(spanExporter, 2, DefaultTimeout); - Assert.Equal(2, exported.Length); - Assert.Contains(span1, exported); - Assert.Contains(span2, exported); + Assert.Equal(2, exported.Length); + Assert.Contains(span1, exported); + Assert.Contains(span2, exported); + } } [Fact] @@ -133,28 +137,30 @@ public void ExporterIsSlowerThanDelay() { var exportStartTimes = new List(); var exportEndTimes = new List(); - spanExporter = new TestExporter(_ => + var spanExporter = new TestExporter(_ => { exportStartTimes.Add(Stopwatch.GetTimestamp()); Thread.Sleep(50); exportEndTimes.Add(Stopwatch.GetTimestamp()); }); - spanProcessor = new BatchingSpanProcessor(spanExporter, 128, TimeSpan.FromMilliseconds(30), 2); - var spans = new List(); - for (int i = 0; i < 20; i++) + using (var spanProcessor = new BatchingSpanProcessor(spanExporter, 128, TimeSpan.FromMilliseconds(30), 2)) { - spans.Add(CreateSampledEndedSpan(i.ToString())); - } + var spans = new List(); + for (int i = 0; i < 20; i++) + { + spans.Add(CreateSampledEndedSpan(i.ToString(), spanProcessor)); + } - var exported = WaitForSpans(spanExporter, 20, TimeSpan.FromSeconds(2)); + var exported = WaitForSpans(spanExporter, 20, TimeSpan.FromSeconds(2)); - Assert.Equal(spans.Count, exported.Length); - Assert.InRange(exportStartTimes.Count, 10, 20); + Assert.Equal(spans.Count, exported.Length); + Assert.InRange(exportStartTimes.Count, 10, 20); - for (int i = 1; i < exportStartTimes.Count - 1; i ++) - { - Assert.InRange(exportStartTimes[i], exportEndTimes[i - 1] + 1, exportStartTimes[i + 1] - 1); + for (int i = 1; i < exportStartTimes.Count - 1; i++) + { + Assert.InRange(exportStartTimes[i], exportEndTimes[i - 1] + 1, exportStartTimes[i + 1] - 1); + } } } @@ -162,45 +168,48 @@ public void ExporterIsSlowerThanDelay() public void AddSpanAfterQueueIsExhausted() { int exportCalledCount = 0; - spanExporter = new TestExporter(_ => Interlocked.Increment(ref exportCalledCount)); - spanProcessor = new BatchingSpanProcessor(spanExporter, 1, TimeSpan.FromMilliseconds(100), 1); - - var spans = new List(); - for (int i = 0; i < 20; i ++) + var spanExporter = new TestExporter(_ => Interlocked.Increment(ref exportCalledCount)); + using (var spanProcessor = new BatchingSpanProcessor(spanExporter, 1, TimeSpan.FromMilliseconds(100), 1)) { - spans.Add(CreateSampledEndedSpan(i.ToString())); - } + var spans = new List(); + for (int i = 0; i < 20; i++) + { + spans.Add(CreateSampledEndedSpan(i.ToString(), spanProcessor)); + } - var exported = WaitForSpans(spanExporter, 1, DefaultTimeout); + var exported = WaitForSpans(spanExporter, 1, DefaultTimeout); - Assert.Equal(1, exportCalledCount); - Assert.InRange(exported.Length, 1,2); - Assert.Contains(spans.First(), exported); + Assert.Equal(1, exportCalledCount); + Assert.InRange(exported.Length, 1, 2); + Assert.Contains(spans.First(), exported); + } } [Fact] public void ExportMoreSpansThanTheMaxBatchSize() { int exportCalledCount = 0; - spanExporter = new TestExporter(_ => Interlocked.Increment(ref exportCalledCount)); - spanProcessor = new BatchingSpanProcessor(spanExporter, 128, DefaultDelay, 3); - var span1 = CreateSampledEndedSpan(SpanName1); - var span2 = CreateSampledEndedSpan(SpanName1); - var span3 = CreateSampledEndedSpan(SpanName1); - var span4 = CreateSampledEndedSpan(SpanName1); - var span5 = CreateSampledEndedSpan(SpanName1); - var span6 = CreateSampledEndedSpan(SpanName1); - - var exported = WaitForSpans(spanExporter, 6, DefaultTimeout); - Assert.Equal(2, exportCalledCount); - - Assert.Equal(6, exported.Count()); - Assert.Contains(span1, exported); - Assert.Contains(span2, exported); - Assert.Contains(span3, exported); - Assert.Contains(span4, exported); - Assert.Contains(span5, exported); - Assert.Contains(span6, exported); + var spanExporter = new TestExporter(_ => Interlocked.Increment(ref exportCalledCount)); + using (var spanProcessor = new BatchingSpanProcessor(spanExporter, 128, DefaultDelay, 3)) + { + var span1 = CreateSampledEndedSpan(SpanName1, spanProcessor); + var span2 = CreateSampledEndedSpan(SpanName1, spanProcessor); + var span3 = CreateSampledEndedSpan(SpanName1, spanProcessor); + var span4 = CreateSampledEndedSpan(SpanName1, spanProcessor); + var span5 = CreateSampledEndedSpan(SpanName1, spanProcessor); + var span6 = CreateSampledEndedSpan(SpanName1, spanProcessor); + + var exported = WaitForSpans(spanExporter, 6, DefaultTimeout); + Assert.InRange(exportCalledCount, 2, 6); + + Assert.Equal(6, exported.Count()); + Assert.Contains(span1, exported); + Assert.Contains(span2, exported); + Assert.Contains(span3, exported); + Assert.Contains(span4, exported); + Assert.Contains(span5, exported); + Assert.Contains(span6, exported); + } } @@ -208,55 +217,58 @@ public void ExportMoreSpansThanTheMaxBatchSize() public void ExportNotSampledSpans() { int exportCalledCount = 0; - spanExporter = new TestExporter(_ => Interlocked.Increment(ref exportCalledCount)); - spanProcessor = new BatchingSpanProcessor(spanExporter, 128, DefaultDelay, 3); - - var span1 = CreateNotSampledEndedSpan(SpanName1); - var span2 = CreateSampledEndedSpan(SpanName2); - // Spans are recorded and exported in the same order as they are ended, we test that a non - // sampled span is not exported by creating and ending a sampled span after a non sampled span - // and checking that the first exported span is the sampled span (the non sampled did not get - // exported). - var exported = WaitForSpans(spanExporter, 1, DefaultTimeout); - Assert.Equal(1, exportCalledCount); - - // Need to check this because otherwise the variable span1 is unused, other option is to not - // have a span1 variable. - Assert.Single(exported); - Assert.Contains(span2, exported); + var spanExporter = new TestExporter(_ => Interlocked.Increment(ref exportCalledCount)); + using (var spanProcessor = new BatchingSpanProcessor(spanExporter, 128, DefaultDelay, 3)) + { + var span1 = CreateNotSampledEndedSpan(SpanName1, spanProcessor); + var span2 = CreateSampledEndedSpan(SpanName2, spanProcessor); + // Spans are recorded and exported in the same order as they are ended, we test that a non + // sampled span is not exported by creating and ending a sampled span after a non sampled span + // and checking that the first exported span is the sampled span (the non sampled did not get + // exported). + var exported = WaitForSpans(spanExporter, 1, DefaultTimeout); + Assert.Equal(1, exportCalledCount); + + // Need to check this because otherwise the variable span1 is unused, other option is to not + // have a span1 variable. + Assert.Single(exported); + Assert.Contains(span2, exported); + } } [Fact] public void ProcessorDoesNotBlockOnExporter() { - spanExporter = new TestExporter( _ => Thread.Sleep(500)); - - spanProcessor = new BatchingSpanProcessor(spanExporter); - - var sampledActivity = new Activity("foo"); - sampledActivity.ActivityTraceFlags |= ActivityTraceFlags.Recorded; - sampledActivity.SetIdFormat(ActivityIdFormat.W3C); - sampledActivity.Start(); - var span = - new Span( - sampledActivity, - Tracestate.Empty, - SpanKind.Internal, - TraceConfig.Default, - spanProcessor, - PreciseTimestamp.GetUtcNow(), - default); - - // does not block - var sw = Stopwatch.StartNew(); - span.End(); - sw.Stop(); - - Assert.InRange(sw.Elapsed, TimeSpan.Zero, TimeSpan.FromMilliseconds(100)); - - var exported = WaitForSpans(spanExporter, 1, DefaultTimeout); + var spanExporter = new TestExporter( _ => Thread.Sleep(500)); - Assert.Single(exported); + using (var spanProcessor = new BatchingSpanProcessor(spanExporter, 128, DefaultDelay, 128)) + { + var sampledActivity = new Activity("foo"); + sampledActivity.ActivityTraceFlags |= ActivityTraceFlags.Recorded; + sampledActivity.SetIdFormat(ActivityIdFormat.W3C); + sampledActivity.Start(); + var span = + new Span( + sampledActivity, + Tracestate.Empty, + SpanKind.Internal, + TraceConfig.Default, + spanProcessor, + PreciseTimestamp.GetUtcNow(), + default); + + // does not block + var sw = Stopwatch.StartNew(); + span.End(); + Debug.WriteLine($"[{PreciseTimestamp.GetUtcNow():o}] span.end"); + sw.Stop(); + + Assert.InRange(sw.Elapsed, TimeSpan.Zero, TimeSpan.FromMilliseconds(100)); + + var exported = WaitForSpans(spanExporter, 1, DefaultTimeout); + + Assert.Single(exported); + } } [Fact] @@ -264,23 +276,25 @@ public async Task ShutdownOnNotEmptyQueueFullFlush() { const int batchSize = 2; int exportCalledCount = 0; - spanExporter = new TestExporter(_ => Interlocked.Increment(ref exportCalledCount)); - spanProcessor = new BatchingSpanProcessor(spanExporter, 128, TimeSpan.FromMilliseconds(100), batchSize); - - var spans = new List(); - for (int i = 0; i < 100; i++) + var spanExporter = new TestExporter(_ => Interlocked.Increment(ref exportCalledCount)); + using (var spanProcessor = + new BatchingSpanProcessor(spanExporter, 128, TimeSpan.FromMilliseconds(100), batchSize)) { - spans.Add(CreateSampledEndedSpan(i.ToString())); - } + var spans = new List(); + for (int i = 0; i < 100; i++) + { + spans.Add(CreateSampledEndedSpan(i.ToString(), spanProcessor)); + } - Assert.True(spanExporter.ExportedSpans.Length < spans.Count); - using (var cts = new CancellationTokenSource(DefaultTimeout)) - { - await spanProcessor.ShutdownAsync(cts.Token); + Assert.True(spanExporter.ExportedSpans.Length < spans.Count); + using (var cts = new CancellationTokenSource(DefaultTimeout)) + { + await spanProcessor.ShutdownAsync(cts.Token); + } + + Assert.Equal(spans.Count, spanExporter.ExportedSpans.Length); + Assert.InRange(exportCalledCount, spans.Count / batchSize, spans.Count); } - - Assert.Equal(spans.Count, spanExporter.ExportedSpans.Length); - Assert.InRange(exportCalledCount, spans.Count / batchSize, spans.Count); } [Fact] @@ -291,31 +305,32 @@ public async Task ShutdownOnNotEmptyQueueNotFullFlush() // we'll need about 1.5 sec to export all spans // we export 100 spans in batches of 2, each export takes 30ms, in one thread - spanExporter = new TestExporter(_ => + var spanExporter = new TestExporter(_ => { Interlocked.Increment(ref exportCalledCount); Thread.Sleep(30); }); - spanProcessor = new BatchingSpanProcessor(spanExporter, 128, TimeSpan.FromMilliseconds(100), batchSize); - - var spans = new List(); - for (int i = 0; i < 100; i++) + using (var spanProcessor = + new BatchingSpanProcessor(spanExporter, 128, TimeSpan.FromMilliseconds(100), batchSize)) { - spans.Add(CreateSampledEndedSpan(i.ToString())); - } + var spans = new List(); + for (int i = 0; i < 100; i++) + { + spans.Add(CreateSampledEndedSpan(i.ToString(), spanProcessor)); + } - Assert.True(spanExporter.ExportedSpans.Length < spans.Count); + Assert.True(spanExporter.ExportedSpans.Length < spans.Count); - // we won't bs able to export all before cancellation will fire - using (var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(200))) - { - await spanProcessor.ShutdownAsync(cts.Token); - } + // we won't bs able to export all before cancellation will fire + using (var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(200))) + { + await spanProcessor.ShutdownAsync(cts.Token); + } - var exportedCount = spanExporter.ExportedSpans.Length; - Assert.True(exportedCount < spans.Count); - Assert.True(exportedCount / batchSize >= exportCalledCount); + var exportedCount = spanExporter.ExportedSpans.Length; + Assert.True(exportedCount < spans.Count); + } } [Fact] @@ -323,13 +338,13 @@ public void DisposeFlushes() { const int batchSize = 2; int exportCalledCount = 0; - spanExporter = new TestExporter(_ => Interlocked.Increment(ref exportCalledCount)); + var spanExporter = new TestExporter(_ => Interlocked.Increment(ref exportCalledCount)); var spans = new List(); - using (spanProcessor = new BatchingSpanProcessor(spanExporter, 128, TimeSpan.FromMilliseconds(100), batchSize)) + using (var spanProcessor = new BatchingSpanProcessor(spanExporter, 128, TimeSpan.FromMilliseconds(100), batchSize)) { for (int i = 0; i < 100; i++) { - spans.Add(CreateSampledEndedSpan(i.ToString())); + spans.Add(CreateSampledEndedSpan(i.ToString(), spanProcessor)); } Assert.True(spanExporter.ExportedSpans.Length < spans.Count); } @@ -340,14 +355,6 @@ public void DisposeFlushes() public void Dispose() { - using (var cts = new CancellationTokenSource()) - { - var t = spanProcessor.ShutdownAsync(cts.Token); - cts.Cancel(true); - - t.ContinueWith(_ => { }).GetAwaiter().GetResult(); - } - Activity.Current = null; } @@ -361,9 +368,9 @@ private Span[] WaitForSpans(TestExporter exporter, int spanCount, TimeSpan timeo return true; } - Thread.Sleep(0); + Thread.Sleep(10); return false; - }, timeout + TimeSpan.FromMilliseconds(20)), + }, timeout), $"Expected at least {spanCount}, got {exporter.ExportedSpans.Length}"); return exporter.ExportedSpans;