Skip to content

Commit

Permalink
Batch<T> public api additions to unblock users (#2542)
Browse files Browse the repository at this point in the history
* Added a public ctor to Batch<T> to unblock user scenarios.

* Bug fixes and improvements.

* Unit test.

* CHANGELOG update

Co-authored-by: Cijo Thomas <[email protected]>
  • Loading branch information
CodeBlanch and cijothomas authored Oct 29, 2021
1 parent ce46d00 commit d9b2ea4
Show file tree
Hide file tree
Showing 5 changed files with 125 additions and 72 deletions.
2 changes: 2 additions & 0 deletions src/OpenTelemetry/.publicApi/net461/PublicAPI.Unshipped.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
OpenTelemetry.Batch<T>.Batch(T[] items, int count) -> void
OpenTelemetry.Batch<T>.Count.get -> long
OpenTelemetry.Trace.BatchExportActivityProcessorOptions
OpenTelemetry.Trace.BatchExportActivityProcessorOptions.BatchExportActivityProcessorOptions() -> void
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
OpenTelemetry.BaseExporter<T>.ForceFlush(int timeoutMilliseconds = -1) -> bool
OpenTelemetry.Batch<T>.Batch(T[] items, int count) -> void
OpenTelemetry.Batch<T>.Count.get -> long
OpenTelemetry.Trace.BatchExportActivityProcessorOptions
OpenTelemetry.Trace.BatchExportActivityProcessorOptions.BatchExportActivityProcessorOptions() -> void
override OpenTelemetry.BaseExportProcessor<T>.OnForceFlush(int timeoutMilliseconds) -> bool
Expand Down
156 changes: 87 additions & 69 deletions src/OpenTelemetry/Batch.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
using System.Collections.Generic;
using System.Diagnostics;
using OpenTelemetry.Internal;
using OpenTelemetry.Metrics;

namespace OpenTelemetry
{
Expand All @@ -32,40 +31,53 @@ namespace OpenTelemetry
{
private readonly T item;
private readonly CircularBuffer<T> circularBuffer;
private readonly T[] metrics;
private readonly T[] items;
private readonly long targetCount;

/// <summary>
/// Initializes a new instance of the <see cref="Batch{T}"/> struct.
/// </summary>
/// <param name="items">The items to store in the batch.</param>
/// <param name="count">The number of items in the batch.</param>
public Batch(T[] items, int count)
{
Guard.Null(items, nameof(items));
Guard.Range(count, nameof(count), 0, items.Length);

this.item = null;
this.circularBuffer = null;
this.items = items;
this.Count = this.targetCount = count;
}

internal Batch(T item)
{
Guard.Null(item, nameof(item));
Debug.Assert(item != null, $"{nameof(item)} was null.");

this.item = item;
this.circularBuffer = null;
this.metrics = null;
this.targetCount = 1;
this.items = null;
this.Count = this.targetCount = 1;
}

internal Batch(CircularBuffer<T> circularBuffer, int maxSize)
{
Debug.Assert(maxSize > 0, $"{nameof(maxSize)} should be a positive number.");
Guard.Null(circularBuffer, nameof(circularBuffer));
Debug.Assert(circularBuffer != null, $"{nameof(circularBuffer)} was null.");

this.item = null;
this.metrics = null;
this.items = null;
this.circularBuffer = circularBuffer;
this.targetCount = circularBuffer.RemovedCount + Math.Min(maxSize, circularBuffer.Count);
this.Count = Math.Min(maxSize, circularBuffer.Count);
this.targetCount = circularBuffer.RemovedCount + this.Count;
}

internal Batch(T[] metrics, int maxSize)
{
Debug.Assert(maxSize > 0, $"{nameof(maxSize)} should be a positive number.");
Guard.Null(metrics, nameof(metrics));
private delegate bool BatchEnumeratorMoveNextFunc(ref Enumerator enumerator);

this.item = null;
this.circularBuffer = null;
this.metrics = metrics;
this.targetCount = maxSize;
}
/// <summary>
/// Gets the count of items in the batch.
/// </summary>
public long Count { get; }

/// <inheritdoc/>
public void Dispose()
Expand All @@ -88,46 +100,91 @@ public Enumerator GetEnumerator()
{
return this.circularBuffer != null
? new Enumerator(this.circularBuffer, this.targetCount)
: this.metrics != null
? new Enumerator(this.metrics, this.targetCount)
: new Enumerator(this.item);
: this.item != null
? new Enumerator(this.item)
/* In the event someone uses default/new Batch() to create Batch we fallback to empty items mode. */
: new Enumerator(this.items ?? Array.Empty<T>(), this.targetCount);
}

/// <summary>
/// Enumerates the elements of a <see cref="Batch{T}"/>.
/// </summary>
public struct Enumerator : IEnumerator<T>
{
private static readonly BatchEnumeratorMoveNextFunc MoveNextSingleItem = (ref Enumerator enumerator) =>
{
if (enumerator.targetCount >= 0)
{
enumerator.Current = null;
return false;
}

enumerator.targetCount++;
return true;
};

private static readonly BatchEnumeratorMoveNextFunc MoveNextCircularBuffer = (ref Enumerator enumerator) =>
{
var circularBuffer = enumerator.circularBuffer;

if (circularBuffer.RemovedCount < enumerator.targetCount)
{
enumerator.Current = circularBuffer.Read();
return true;
}

enumerator.Current = null;
return false;
};

private static readonly BatchEnumeratorMoveNextFunc MoveNextArray = (ref Enumerator enumerator) =>
{
var items = enumerator.items;

if (enumerator.itemIndex < enumerator.targetCount)
{
enumerator.Current = items[enumerator.itemIndex++];
return true;
}

enumerator.Current = null;
return false;
};

private readonly CircularBuffer<T> circularBuffer;
private readonly T[] metrics;
private readonly T[] items;
private readonly BatchEnumeratorMoveNextFunc moveNextFunc;
private long targetCount;
private int metricIndex;
private int itemIndex;

internal Enumerator(T item)
{
this.Current = item;
this.circularBuffer = null;
this.metrics = null;
this.items = null;
this.targetCount = -1;
this.metricIndex = 0;
this.itemIndex = 0;
this.moveNextFunc = MoveNextSingleItem;
}

internal Enumerator(CircularBuffer<T> circularBuffer, long targetCount)
{
this.Current = null;
this.metrics = null;
this.items = null;
this.circularBuffer = circularBuffer;
this.targetCount = targetCount;
this.metricIndex = 0;
this.itemIndex = 0;
this.moveNextFunc = MoveNextCircularBuffer;
}

internal Enumerator(T[] metrics, long targetCount)
internal Enumerator(T[] items, long targetCount)
{
this.Current = null;
this.circularBuffer = null;
this.metrics = metrics;
this.items = items;
this.targetCount = targetCount;
this.metricIndex = 0;
this.itemIndex = 0;
this.moveNextFunc = MoveNextArray;
}

/// <inheritdoc/>
Expand All @@ -144,46 +201,7 @@ public void Dispose()
/// <inheritdoc/>
public bool MoveNext()
{
if (typeof(T) == typeof(Metric))
{
var metrics = this.metrics;

if (metrics != null)
{
if (this.metricIndex < this.targetCount)
{
this.Current = metrics[this.metricIndex];
this.metricIndex++;
return true;
}
}

this.Current = null;
return false;
}

var circularBuffer = this.circularBuffer;

if (circularBuffer == null)
{
if (this.targetCount >= 0)
{
this.Current = null;
return false;
}

this.targetCount++;
return true;
}

if (circularBuffer.RemovedCount < this.targetCount)
{
this.Current = circularBuffer.Read();
return true;
}

this.Current = null;
return false;
return this.moveNextFunc(ref this);
}

/// <inheritdoc/>
Expand Down
3 changes: 3 additions & 0 deletions src/OpenTelemetry/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@
* Added `BaseExporter.ForceFlush`.
([#2525](https://github.com/open-telemetry/opentelemetry-dotnet/pull/2525))

* Exposed public `Batch(T[] items, int count)` constructor on `Batch<T>` struct
([#2542](https://github.com/open-telemetry/opentelemetry-dotnet/pull/2542))

## 1.2.0-beta1

Released 2021-Oct-08
Expand Down
34 changes: 31 additions & 3 deletions test/OpenTelemetry.Tests/Trace/BatchTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ public class BatchTest
[Fact]
public void CheckConstructorExceptions()
{
Assert.Throws<ArgumentNullException>(() => new Batch<string>(null));

// Assert.Throws<ArgumentNullException>(() => new Batch<string>(null, 1));
Assert.Throws<ArgumentNullException>(() => new Batch<string>((string[])null, 0));
Assert.Throws<ArgumentOutOfRangeException>(() => new Batch<string>(Array.Empty<string>(), -1));
Assert.Throws<ArgumentOutOfRangeException>(() => new Batch<string>(Array.Empty<string>(), 1));
}

[Fact]
Expand Down Expand Up @@ -132,6 +132,34 @@ public void CheckEnumeratorResetException()
Assert.Throws<NotSupportedException>(() => enumerator.Reset());
}

[Fact]
public void DrainIntoNewBatchTest()
{
var circularBuffer = new CircularBuffer<string>(100);
circularBuffer.Add("a");
circularBuffer.Add("b");

Batch<string> batch = new Batch<string>(circularBuffer, 10);

Assert.Equal(2, batch.Count);

string[] storage = new string[10];
int selectedItemCount = 0;
foreach (string item in batch)
{
if (item == "b")
{
storage[selectedItemCount++] = item;
}
}

batch = new Batch<string>(storage, selectedItemCount);

Assert.Equal(1, batch.Count);

this.ValidateEnumerator(batch.GetEnumerator(), "b");
}

private void ValidateEnumerator(Batch<string>.Enumerator enumerator, string expected)
{
if (enumerator.Current != null)
Expand Down

0 comments on commit d9b2ea4

Please sign in to comment.