Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Batch<T> public api additions to unblock users #2542

Merged
merged 5 commits into from
Oct 29, 2021
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/OpenTelemetry/.publicApi/net461/PublicAPI.Unshipped.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
OpenTelemetry.Batch<T>.Batch(T[] items, int count) -> void
OpenTelemetry.Batch<T>.Count.get -> long
OpenTelemetry.Trace.BatchExportActivityProcessorOptions
OpenTelemetry.Trace.BatchExportActivityProcessorOptions.BatchExportActivityProcessorOptions() -> void
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
OpenTelemetry.BaseExporter<T>.ForceFlush(int timeoutMilliseconds = -1) -> bool
OpenTelemetry.Batch<T>.Batch(T[] items, int count) -> void
OpenTelemetry.Batch<T>.Count.get -> long
OpenTelemetry.Trace.BatchExportActivityProcessorOptions
OpenTelemetry.Trace.BatchExportActivityProcessorOptions.BatchExportActivityProcessorOptions() -> void
override OpenTelemetry.BaseExportProcessor<T>.OnForceFlush(int timeoutMilliseconds) -> bool
Expand Down
156 changes: 87 additions & 69 deletions src/OpenTelemetry/Batch.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
using System.Collections.Generic;
using System.Diagnostics;
using OpenTelemetry.Internal;
using OpenTelemetry.Metrics;

namespace OpenTelemetry
{
Expand All @@ -32,40 +31,53 @@ namespace OpenTelemetry
{
private readonly T item;
private readonly CircularBuffer<T> circularBuffer;
private readonly T[] metrics;
private readonly T[] items;
private readonly long targetCount;

/// <summary>
/// Initializes a new instance of the <see cref="Batch{T}"/> struct
/// from an array of items.
/// </summary>
/// <param name="items">The array holding the items of the batch.</param>
/// <param name="count">How many entries of <paramref name="items"/> belong to the batch.</param>
/// <exception cref="ArgumentNullException"><paramref name="items"/> is <see langword="null"/>.</exception>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="count"/> is negative or exceeds the length of <paramref name="items"/>.</exception>
public Batch(T[] items, int count)
{
    Guard.Null(items, nameof(items));
    Guard.Range(count, nameof(count), 0, items.Length);

    // Array mode: the single-item and circular-buffer sources stay unset.
    this.items = items;
    this.circularBuffer = null;
    this.item = null;

    this.targetCount = count;
    this.Count = count;
}

internal Batch(T item)
{
Guard.Null(item, nameof(item));
Debug.Assert(item != null, $"{nameof(item)} was null.");

this.item = item;
this.circularBuffer = null;
this.metrics = null;
this.targetCount = 1;
this.items = null;
this.Count = this.targetCount = 1;
}

internal Batch(CircularBuffer<T> circularBuffer, int maxSize)
{
Debug.Assert(maxSize > 0, $"{nameof(maxSize)} should be a positive number.");
Guard.Null(circularBuffer, nameof(circularBuffer));
Debug.Assert(circularBuffer != null, $"{nameof(circularBuffer)} was null.");

this.item = null;
this.metrics = null;
this.items = null;
this.circularBuffer = circularBuffer;
this.targetCount = circularBuffer.RemovedCount + Math.Min(maxSize, circularBuffer.Count);
this.Count = Math.Min(maxSize, circularBuffer.Count);
this.targetCount = circularBuffer.RemovedCount + this.Count;
}

internal Batch(T[] metrics, int maxSize)
{
Debug.Assert(maxSize > 0, $"{nameof(maxSize)} should be a positive number.");
Guard.Null(metrics, nameof(metrics));
// Signature shared by the per-mode MoveNext implementations; the enumerator is passed by ref so they can update its state.
private delegate bool BatchEnumeratorMoveNextFunc(ref Enumerator enumerator);

this.item = null;
this.circularBuffer = null;
this.metrics = metrics;
this.targetCount = maxSize;
}
/// <summary>
/// Gets the count of items in the batch.
/// </summary>
/// <remarks>
/// The property is get-only, so the value is fixed when the batch is constructed.
/// </remarks>
public long Count { get; }

/// <inheritdoc/>
public void Dispose()
Expand All @@ -88,46 +100,91 @@ public Enumerator GetEnumerator()
{
return this.circularBuffer != null
? new Enumerator(this.circularBuffer, this.targetCount)
: this.metrics != null
? new Enumerator(this.metrics, this.targetCount)
: new Enumerator(this.item);
: this.item != null
? new Enumerator(this.item)
/* If someone creates a Batch via default/new Batch(), we fall back to empty items mode. */
: new Enumerator(this.items ?? Array.Empty<T>(), this.targetCount);
}

/// <summary>
/// Enumerates the elements of a <see cref="Batch{T}"/>.
/// </summary>
public struct Enumerator : IEnumerator<T>
{
// Single-item mode: the enumerator is created with the item already in Current and
// targetCount at -1, so the first call succeeds; every later call clears Current
// and reports the end of the batch.
private static readonly BatchEnumeratorMoveNextFunc MoveNextSingleItem = (ref Enumerator enumerator) =>
{
    if (enumerator.targetCount < 0)
    {
        enumerator.targetCount++;
        return true;
    }

    enumerator.Current = null;
    return false;
};

// Circular-buffer mode: keep reading items until the buffer's RemovedCount
// reaches the enumerator's targetCount, then clear Current and stop.
private static readonly BatchEnumeratorMoveNextFunc MoveNextCircularBuffer = (ref Enumerator enumerator) =>
{
    var buffer = enumerator.circularBuffer;

    if (buffer.RemovedCount >= enumerator.targetCount)
    {
        enumerator.Current = null;
        return false;
    }

    enumerator.Current = buffer.Read();
    return true;
};

private static readonly BatchEnumeratorMoveNextFunc MoveNextArray = (ref Enumerator enumerator) =>
{
var items = enumerator.items;

if (enumerator.itemIndex < enumerator.targetCount)
cijothomas marked this conversation as resolved.
Show resolved Hide resolved
{
enumerator.Current = items[enumerator.itemIndex++];
return true;
}

enumerator.Current = null;
return false;
};

private readonly CircularBuffer<T> circularBuffer;
private readonly T[] metrics;
private readonly T[] items;
private readonly BatchEnumeratorMoveNextFunc moveNextFunc;
private long targetCount;
private int metricIndex;
private int itemIndex;

internal Enumerator(T item)
{
this.Current = item;
this.circularBuffer = null;
this.metrics = null;
this.items = null;
this.targetCount = -1;
this.metricIndex = 0;
this.itemIndex = 0;
this.moveNextFunc = MoveNextSingleItem;
}

internal Enumerator(CircularBuffer<T> circularBuffer, long targetCount)
{
this.Current = null;
this.metrics = null;
this.items = null;
this.circularBuffer = circularBuffer;
this.targetCount = targetCount;
this.metricIndex = 0;
this.itemIndex = 0;
this.moveNextFunc = MoveNextCircularBuffer;
}

internal Enumerator(T[] metrics, long targetCount)
internal Enumerator(T[] items, long targetCount)
{
this.Current = null;
this.circularBuffer = null;
this.metrics = metrics;
this.items = items;
this.targetCount = targetCount;
this.metricIndex = 0;
this.itemIndex = 0;
this.moveNextFunc = MoveNextArray;
}

/// <inheritdoc/>
Expand All @@ -144,46 +201,7 @@ public void Dispose()
/// <inheritdoc/>
public bool MoveNext()
{
if (typeof(T) == typeof(Metric))
{
var metrics = this.metrics;

if (metrics != null)
{
if (this.metricIndex < this.targetCount)
{
this.Current = metrics[this.metricIndex];
this.metricIndex++;
return true;
}
}

this.Current = null;
return false;
}

var circularBuffer = this.circularBuffer;

if (circularBuffer == null)
{
if (this.targetCount >= 0)
{
this.Current = null;
return false;
}

this.targetCount++;
return true;
}

if (circularBuffer.RemovedCount < this.targetCount)
{
this.Current = circularBuffer.Read();
return true;
}

this.Current = null;
return false;
return this.moveNextFunc(ref this);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm hoping this delegate approach is faster, but I'm not sure. It needs to be benchmarked.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Yun-Ting could you help with the benchmark part?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Of course, I'm working on it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The benchmark summaries are below.
With If statements: https://github.com/Yun-Ting/opentelemetry-dotnet/blob/Yun-Ting/oldMoveNext/test/OpenTelemetry.Tests/Trace/BatchTestMoveNextBenchmarks.cs

// * Summary *
BenchmarkDotNet=v0.13.1, OS=Windows 10.0.19043.1288 (21H1/May2021Update)
Intel Xeon CPU E5-1650 v4 3.60GHz, 1 CPU, 12 logical and 6 physical cores
.NET SDK=5.0.402
[Host] : .NET 5.0.11 (5.0.1121.47308), X64 RyuJIT [AttachedDebugger]
DefaultJob : .NET 5.0.11 (5.0.1121.47308), X64 RyuJIT

Method Mean Error StdDev Gen 0 Allocated
SingleElement 26.18 ns 0.514 ns 0.900 ns - -
MoveNextCircularBuffer 220.11 ns 4.389 ns 11.562 ns 0.0138 112 B
MoveNextArray 47.21 ns 0.984 ns 2.097 ns 0.0081 64 B

And with delegate methods: https://github.com/Yun-Ting/opentelemetry-dotnet/blob/Yun-Ting/newMoveNext/test/OpenTelemetry.Tests/Trace/BatchTestMoveNextBenchmarks.cs
// * Summary *

Method Mean Error StdDev Gen 0 Allocated
SingleElement 26.59 ns 0.527 ns 1.002 ns - -
MoveNextCircularBuffer 219.28 ns 4.341 ns 8.769 ns 0.0141 112 B
MoveNextArray 54.77 ns 1.114 ns 2.146 ns 0.0081 64 B

With the delegate methods, MoveNextArray is slightly slower than with the if statements.
I think it is caused by:

this.Current = metrics[this.metricIndex];
this.metricIndex++;

vs

enumerator.Current = items[enumerator.itemIndex++];

But I'm quite surprised to learn that a one-line assignment with an inline increment is slower than assigning and incrementing on two separate lines.
For SingleElement and MoveNextCircularBuffer, the results are almost identical.
I'm voting for the delegate methods for the sake of code readability.

}

/// <inheritdoc/>
Expand Down
34 changes: 31 additions & 3 deletions test/OpenTelemetry.Tests/Trace/BatchTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ public class BatchTest
[Fact]
public void CheckConstructorExceptions()
{
Assert.Throws<ArgumentNullException>(() => new Batch<string>(null));

// Assert.Throws<ArgumentNullException>(() => new Batch<string>(null, 1));
Assert.Throws<ArgumentNullException>(() => new Batch<string>((string[])null, 0));
Assert.Throws<ArgumentOutOfRangeException>(() => new Batch<string>(Array.Empty<string>(), -1));
Assert.Throws<ArgumentOutOfRangeException>(() => new Batch<string>(Array.Empty<string>(), 1));
}

[Fact]
Expand Down Expand Up @@ -132,6 +132,34 @@ public void CheckEnumeratorResetException()
Assert.Throws<NotSupportedException>(() => enumerator.Reset());
}

[Fact]
public void DrainIntoNewBatchTest()
{
    // Start from a buffer-backed batch holding two items.
    var buffer = new CircularBuffer<string>(100);
    buffer.Add("a");
    buffer.Add("b");

    var sourceBatch = new Batch<string>(buffer, 10);

    Assert.Equal(2, sourceBatch.Count);

    // Drain the batch, keeping only the "b" item in a plain array.
    var kept = new string[10];
    var keptCount = 0;
    foreach (var candidate in sourceBatch)
    {
        if (candidate != "b")
        {
            continue;
        }

        kept[keptCount] = candidate;
        keptCount++;
    }

    // Wrap the filtered items in a new array-backed batch.
    var filteredBatch = new Batch<string>(kept, keptCount);

    Assert.Equal(1, filteredBatch.Count);

    this.ValidateEnumerator(filteredBatch.GetEnumerator(), "b");
}

private void ValidateEnumerator(Batch<string>.Enumerator enumerator, string expected)
{
if (enumerator.Current != null)
Expand Down