diff --git a/src/OpenTelemetry/BatchStruct.cs b/src/OpenTelemetry/BatchStruct.cs new file mode 100644 index 00000000000..6291af5e9d4 --- /dev/null +++ b/src/OpenTelemetry/BatchStruct.cs @@ -0,0 +1,129 @@ +// +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +using System; +using System.Collections; +using System.Collections.Generic; +using System.Diagnostics; +using OpenTelemetry.Internal; + +namespace OpenTelemetry +{ + /// + /// Stores a batch of completed objects to be exported. + /// + /// The type of object in the . + public readonly struct BatchStruct + where T : struct + { + private readonly T item; + private readonly CircularBufferStruct circularBuffer; + private readonly int maxSize; + + internal BatchStruct(T item) + { + this.item = item; + this.circularBuffer = null; + this.maxSize = 1; + } + + internal BatchStruct(CircularBufferStruct circularBuffer, int maxSize) + { + Debug.Assert(maxSize > 0, $"{nameof(maxSize)} should be a positive number."); + + this.item = default; + this.circularBuffer = circularBuffer ?? throw new ArgumentNullException(nameof(circularBuffer)); + this.maxSize = maxSize; + } + + /// + /// Returns an enumerator that iterates through the . + /// + /// . + public Enumerator GetEnumerator() + { + return this.circularBuffer != null + ? new Enumerator(this.circularBuffer, this.maxSize) + : new Enumerator(this.item); + } + + /// + /// Enumerates the elements of a . + /// + public struct Enumerator : IEnumerator + { + private readonly CircularBufferStruct circularBuffer; + private int count; + + internal Enumerator(T item) + { + this.Current = item; + this.circularBuffer = null; + this.count = -1; + } + + internal Enumerator(CircularBufferStruct circularBuffer, int maxSize) + { + this.Current = default; + this.circularBuffer = circularBuffer; + this.count = Math.Min(maxSize, circularBuffer.Count); + } + + /// + public T Current { get; private set; } + + /// + object IEnumerator.Current => this.Current; + + /// + public void Dispose() + { + } + + /// + public bool MoveNext() + { + var circularBuffer = this.circularBuffer; + + if (circularBuffer == null) + { + if (this.count >= 0) + { + this.Current = default; + return false; + } + + this.count++; + return true; + } + + if (this.count > 0) + { + this.Current = circularBuffer.Read(); + this.count--; + return true; + } + + this.Current = default; + return false; + } + + /// + public void Reset() + => throw new NotSupportedException(); + } + } +} diff --git a/test/Benchmarks/Program.cs b/test/Benchmarks/Program.cs index fe6760ea9f6..deb16323efa 100644 --- a/test/Benchmarks/Program.cs +++ b/test/Benchmarks/Program.cs @@ -22,6 +22,11 @@ internal static class Program public static void Main(string[] args) { BenchmarkSwitcher.FromAssembly(typeof(Program).Assembly).Run(args); + + /* + CircularBufferStress.EntryPoint(); + CircularBufferStructStress.EntryPoint(); + */ } } } diff --git a/test/Benchmarks/Stress/CircularBufferStress.cs b/test/Benchmarks/Stress/CircularBufferStress.cs new file mode 100644 index 00000000000..74b401a4e63 --- /dev/null +++ b/test/Benchmarks/Stress/CircularBufferStress.cs @@ -0,0 +1,123 @@ +// +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.Linq; +using System.Runtime.CompilerServices; +using System.Threading; +using System.Threading.Tasks; +using OpenTelemetry; +using OpenTelemetry.Internal; + +namespace OpenTelemetry.Benchmarks +{ + internal class CircularBufferStress + { + private static long cntEvents = 0; + + public static int EntryPoint() + { + var cntWriter = Environment.ProcessorCount; // configure the number of writers + var buffer = new CircularBuffer(500000); // configure the circular buffer capacity, change to a smaller number to test the congestion + long bound = 100000000L; // each writer will write [1, bound] + var statistics = new long[cntWriter]; + var retry = new long[cntWriter]; + long result = bound * (bound + 1) * cntWriter / 2; + + Console.WriteLine($"Inserting [0, {bound}] from {cntWriter} writers to a buffer with capacity={buffer.Capacity}."); + + Parallel.Invoke( + () => + { + var watch = new Stopwatch(); + + while (result != 0) + { + cntEvents = statistics.Sum(); + watch.Restart(); + Thread.Sleep(200); + watch.Stop(); + var nEvents = statistics.Sum(); + var nEventPerSecond = (int)((double)(nEvents - cntEvents) / ((double)watch.ElapsedMilliseconds / 1000.0)); + var cntRetry = retry.Sum(); + Console.Title = string.Format($"QueueSize: {buffer.Count}/{buffer.Capacity}, Retry: {cntRetry}, Enqueue: {nEvents}, Enqueue/s: {nEventPerSecond}, Result: {result}"); + } + }, () => + { + Parallel.For(0, statistics.Length, (i) => + { + statistics[i] = 0; + long num = 1; + + Console.WriteLine($"Writer {i} started."); + + while (true) + { + var item = new Item(num); + + while (!buffer.TryAdd(item, 0)) + { + retry[i]++; + } + + num += 1; + statistics[i]++; + + if (num > bound) + { + break; + } + } + + Console.WriteLine($"Writer {i} finished."); + }); + }, () => + { + Console.WriteLine($"Reader started."); + + while (true) + { + foreach (var item in new Batch(buffer, 100)) + { + result -= item.Value; + } + + if (result == 0) + { + break; + } + } + + Console.WriteLine($"Reader finished."); + }); + + Console.WriteLine("Succeeded!"); + return 0; + } + + internal class Item + { + internal Item(long value) + { + this.Value = value; + } + + public long Value { get; private set; } + } + } +} diff --git a/test/Benchmarks/Stress/CircularBufferStructStress.cs b/test/Benchmarks/Stress/CircularBufferStructStress.cs new file mode 100644 index 00000000000..e2c65fe13fb --- /dev/null +++ b/test/Benchmarks/Stress/CircularBufferStructStress.cs @@ -0,0 +1,123 @@ +// +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.Linq; +using System.Runtime.CompilerServices; +using System.Threading; +using System.Threading.Tasks; +using OpenTelemetry; +using OpenTelemetry.Internal; + +namespace OpenTelemetry.Benchmarks +{ + internal class CircularBufferStructStress + { + private static long cntEvents = 0; + + public static int EntryPoint() + { + var cntWriter = Environment.ProcessorCount; // configure the number of writers + var buffer = new CircularBufferStruct(500000); // configure the circular buffer capacity, change to a smaller number to test the congestion + long bound = 100000000L; // each writer will write [1, bound] + var statistics = new long[cntWriter]; + var retry = new long[cntWriter]; + long result = bound * (bound + 1) * cntWriter / 2; + + Console.WriteLine($"Inserting [0, {bound}] from {cntWriter} writers to a buffer with capacity={buffer.Capacity}."); + + Parallel.Invoke( + () => + { + var watch = new Stopwatch(); + + while (result != 0) + { + cntEvents = statistics.Sum(); + watch.Restart(); + Thread.Sleep(200); + watch.Stop(); + var nEvents = statistics.Sum(); + var nEventPerSecond = (int)((double)(nEvents - cntEvents) / ((double)watch.ElapsedMilliseconds / 1000.0)); + var cntRetry = retry.Sum(); + Console.Title = string.Format($"QueueSize: {buffer.Count}/{buffer.Capacity}, Retry: {cntRetry}, Enqueue: {nEvents}, Enqueue/s: {nEventPerSecond}, Result: {result}"); + } + }, () => + { + Parallel.For(0, statistics.Length, (i) => + { + statistics[i] = 0; + long num = 1; + + Console.WriteLine($"Writer {i} started."); + + while (true) + { + var item = new Item(num); + + while (!buffer.TryAdd(item, 0)) + { + retry[i]++; + } + + num += 1; + statistics[i]++; + + if (num > bound) + { + break; + } + } + + Console.WriteLine($"Writer {i} finished."); + }); + }, () => + { + Console.WriteLine($"Reader started."); + + while (true) + { + foreach (var item in new BatchStruct(buffer, 100)) + { + result -= item.Value; + } + + if (result == 0) + { + break; + } + } + + Console.WriteLine($"Reader finished."); + }); + + Console.WriteLine("Succeeded!"); + return 0; + } + + internal struct Item + { + internal Item(long value) + { + this.Value = value; + } + + public long Value { get; private set; } + } + } +}