Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor exporter - step 10 #1135

Merged
merged 6 commits into from
Aug 22, 2020
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions docs/trace/building-your-own-exporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@

* To export telemetry to a specific destination, custom exporters must be
written.
* Exporters should inherit from `ActivityExporter` and implement `ExportAsync`
and `ShutdownAsync` methods. `ActivityExporter` is part of the [OpenTelemetry
* Exporters should inherit from `ActivityExporter` and implement `Export` and
`Shutdown` methods. `ActivityExporter` is part of the [OpenTelemetry
Package](https://www.nuget.org/packages/opentelemetry).
* Depending on user's choice and load on the application, `ExportAsync` may get
* Depending on user's choice and load on the application, `Export` may get
called with zero or more activities.
* Exporters will only receive sampled-in and ended activities.
* Exporters must not throw.
Expand Down
11 changes: 5 additions & 6 deletions docs/trace/building-your-own-processor/MyActivityProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,14 @@ public override void OnEnd(Activity activity)
Console.WriteLine($"{this}.OnEnd");
}

public override Task ForceFlushAsync(CancellationToken cancellationToken)
public override bool ForceFlush(int timeoutMillis = Timeout.Infinite)
{
Console.WriteLine($"{this}.ForceFlushAsync");
return Task.CompletedTask;
Console.WriteLine($"{this}.ForceFlush");
return true;
}

public override Task ShutdownAsync(CancellationToken cancellationToken)
public override void Shutdown(int timeoutMillis = Timeout.Infinite)
{
Console.WriteLine($"{this}.ShutdownAsync");
return Task.CompletedTask;
Console.WriteLine($"{this}.Shutdown");
}
}
6 changes: 2 additions & 4 deletions docs/trace/building-your-own-sampler/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,11 @@
critical code path.

```csharp
class MySampler : Sampler
internal class MySampler : Sampler
{
public override SamplingResult ShouldSample(in SamplingParameters samplingParameters)
{
var shouldSample = true;

return new SamplingResult(shouldSample);
return new SamplingResult(SamplingDecision.RecordAndSampled);
}
}
```
3 changes: 2 additions & 1 deletion src/OpenTelemetry.Exporter.Jaeger/JaegerExporter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using OpenTelemetry.Exporter.Jaeger.Implementation;
using OpenTelemetry.Resources;
using OpenTelemetry.Trace;
Expand Down Expand Up @@ -61,7 +62,7 @@ public override ExportResult Export(in Batch<Activity> activityBatch)
}

/// <inheritdoc/>
public override void Shutdown()
public override void Shutdown(int timeoutMillis = Timeout.Infinite)
{
this.JaegerAgentUdpBatcher.FlushAsync(default).GetAwaiter().GetResult();
}
Expand Down
3 changes: 2 additions & 1 deletion src/OpenTelemetry/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@
[#1094](https://github.com/open-telemetry/opentelemetry-dotnet/pull/1094)
[#1113](https://github.com/open-telemetry/opentelemetry-dotnet/pull/1113)
[#1127](https://github.com/open-telemetry/opentelemetry-dotnet/pull/1127)
[#1129](https://github.com/open-telemetry/opentelemetry-dotnet/pull/1129))
[#1129](https://github.com/open-telemetry/opentelemetry-dotnet/pull/1129)
[#1135](https://github.com/open-telemetry/opentelemetry-dotnet/pull/1135))

## 0.4.0-beta.2

Expand Down
10 changes: 8 additions & 2 deletions src/OpenTelemetry/Trace/ActivityExporter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

using System;
using System.Diagnostics;
using System.Threading;

namespace OpenTelemetry.Trace
{
Expand Down Expand Up @@ -48,9 +49,14 @@ public abstract class ActivityExporter : IDisposable
public abstract ExportResult Export(in Batch<Activity> batch);

/// <summary>
/// Shuts down the exporter.
/// Attempts to shutdown the exporter, blocks the current thread until
/// shutdown completed or timed out.
/// </summary>
public virtual void Shutdown()
/// <param name="timeoutMillis">
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Should we spell it out fully timeoutMilliseconds?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That name is coming from the spec.

@cijothomas if we want be C# idiomatic, I will use timeoutMilliseconds across the entire repo (current many places are already using the timeoutMillis convention from the spec.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've changed all places to use timeoutMilliseconds.

/// The number of milliseconds to wait, or <c>Timeout.Infinite</c> to
/// wait indefinitely.
/// </param>
public virtual void Shutdown(int timeoutMillis = Timeout.Infinite)
{
}

Expand Down
38 changes: 19 additions & 19 deletions src/OpenTelemetry/Trace/ActivityProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,31 +46,31 @@ public virtual void OnEnd(Activity activity)
}

/// <summary>
/// Shuts down Activity processor asynchronously.
/// Flushes the <see cref="ActivityProcessor"/>, blocks the current
/// thread until flush completed, shutdown signaled or timed out.
/// </summary>
/// <param name="cancellationToken">Cancellation token.</param>
/// <returns>Returns <see cref="Task"/>.</returns>
public virtual Task ShutdownAsync(CancellationToken cancellationToken)
/// <param name="timeoutMillis">
/// The number of milliseconds to wait, or <c>Timeout.Infinite</c> to
/// wait indefinitely.
/// </param>
/// <returns>
/// Returns <c>true</c> when flush completed; otherwise, <c>false</c>.
/// </returns>
public virtual bool ForceFlush(int timeoutMillis = Timeout.Infinite)
{
#if NET452
return Task.FromResult(0);
#else
return Task.CompletedTask;
#endif
return true;
}

/// <summary>
/// Flushes all activities that have not yet been processed.
/// Attempts to shutdown the processor, blocks the current thread until
/// shutdown completed or timed out.
/// </summary>
/// <param name="cancellationToken">Cancellation token.</param>
/// <returns>Returns <see cref="Task"/>.</returns>
public virtual Task ForceFlushAsync(CancellationToken cancellationToken)
/// <param name="timeoutMillis">
/// The number of milliseconds to wait, or <c>Timeout.Infinite</c> to
/// wait indefinitely.
/// </param>
public virtual void Shutdown(int timeoutMillis = Timeout.Infinite)
{
#if NET452
return Task.FromResult(0);
#else
return Task.CompletedTask;
#endif
}

/// <inheritdoc/>
Expand All @@ -91,7 +91,7 @@ protected virtual void Dispose(bool disposing)
{
try
{
this.ShutdownAsync(CancellationToken.None).GetAwaiter().GetResult();
this.Shutdown();
}
catch (Exception ex)
{
Expand Down
22 changes: 3 additions & 19 deletions src/OpenTelemetry/Trace/BatchExportActivityProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ public override void OnEnd(Activity activity)
/// <returns>
/// Returns <c>true</c> when flush completed; otherwise, <c>false</c>.
/// </returns>
public bool ForceFlush(int timeoutMillis = Timeout.Infinite)
public override bool ForceFlush(int timeoutMillis = Timeout.Infinite)
{
if (timeoutMillis < 0 && timeoutMillis != Timeout.Infinite)
{
Expand Down Expand Up @@ -210,23 +210,15 @@ public bool ForceFlush(int timeoutMillis = Timeout.Infinite)
}
}

/// <inheritdoc/>
/// <exception cref="OperationCanceledException">If the <paramref name="cancellationToken"/> is canceled.</exception>
public override Task ForceFlushAsync(CancellationToken cancellationToken)
{
// TODO
throw new NotImplementedException();
}

/// <summary>
/// Attempt to drain the queue and shutdown the exporter, blocks the
/// Attempts to drain the queue and shutdown the exporter, blocks the
/// current thread until shutdown completed or timed out.
/// </summary>
/// <param name="timeoutMillis">
/// The number of milliseconds to wait, or <c>Timeout.Infinite</c> to
/// wait indefinitely.
/// </param>
public void Shutdown(int timeoutMillis = Timeout.Infinite)
public override void Shutdown(int timeoutMillis = Timeout.Infinite)
{
if (timeoutMillis < 0 && timeoutMillis != Timeout.Infinite)
{
Expand All @@ -242,14 +234,6 @@ public void Shutdown(int timeoutMillis = Timeout.Infinite)
}
}

/// <inheritdoc/>
/// <exception cref="OperationCanceledException">If the <paramref name="cancellationToken"/> is canceled.</exception>
public override Task ShutdownAsync(CancellationToken cancellationToken)
{
// TODO
throw new NotImplementedException();
}

/// <summary>
/// Releases the unmanaged resources used by this class and optionally releases the managed resources.
/// </summary>
Expand Down
71 changes: 58 additions & 13 deletions src/OpenTelemetry/Trace/CompositeActivityProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ public CompositeActivityProcessor AddProcessor(ActivityProcessor processor)
return this;
}

/// <inheritdoc/>
public override void OnEnd(Activity activity)
{
var cur = this.head;
Expand All @@ -80,6 +81,7 @@ public override void OnEnd(Activity activity)
}
}

/// <inheritdoc/>
public override void OnStart(Activity activity)
{
var cur = this.head;
Expand All @@ -91,32 +93,75 @@ public override void OnStart(Activity activity)
}
}

public override Task ShutdownAsync(CancellationToken cancellationToken)
/// <inheritdoc/>
public override bool ForceFlush(int timeoutMillis = Timeout.Infinite)
{
if (timeoutMillis < 0 && timeoutMillis != Timeout.Infinite)
{
throw new ArgumentOutOfRangeException(nameof(timeoutMillis));
}

var cur = this.head;
var task = cur.Value.ShutdownAsync(cancellationToken);

for (cur = cur.Next; cur != null; cur = cur.Next)
var sw = Stopwatch.StartNew();

while (cur != null)
{
var processor = cur.Value;
task = task.ContinueWith(t => processor.ShutdownAsync(cancellationToken));
if (timeoutMillis == Timeout.Infinite)
{
var succeeded = cur.Value.ForceFlush(Timeout.Infinite);
}
else
{
var timeout = (long)timeoutMillis - sw.ElapsedMilliseconds;

if (timeout <= 0)
{
return false;
}

var succeeded = cur.Value.ForceFlush((int)timeout);

if (!succeeded)
{
return false;
}
}

cur = cur.Next;
}

return task;
return true;
}

public override Task ForceFlushAsync(CancellationToken cancellationToken)
/// <inheritdoc/>
public override void Shutdown(int timeoutMillis = Timeout.Infinite)
{
if (timeoutMillis < 0 && timeoutMillis != Timeout.Infinite)
{
throw new ArgumentOutOfRangeException(nameof(timeoutMillis));
}

var cur = this.head;
var task = cur.Value.ForceFlushAsync(cancellationToken);

for (cur = cur.Next; cur != null; cur = cur.Next)
var sw = Stopwatch.StartNew();

while (cur != null)
{
var processor = cur.Value;
task = task.ContinueWith(t => processor.ForceFlushAsync(cancellationToken));
}
if (timeoutMillis == Timeout.Infinite)
{
cur.Value.Shutdown(Timeout.Infinite);
}
else
{
var timeout = (long)timeoutMillis - sw.ElapsedMilliseconds;

return task;
// notify all the processors, even if we run overtime
cur.Value.Shutdown((int)Math.Max(timeout, 0));
}

cur = cur.Next;
}
}

protected override void Dispose(bool disposing)
Expand Down
9 changes: 2 additions & 7 deletions src/OpenTelemetry/Trace/ReentrantExportActivityProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -53,19 +53,14 @@ public override void OnEnd(Activity activity)
}

/// <inheritdoc />
public override Task ShutdownAsync(CancellationToken cancellationToken)
public override void Shutdown(int timeoutMillis = Timeout.Infinite)
{
if (!this.stopped)
{
// TODO: pass down the timeout to exporter
this.exporter.Shutdown();
this.stopped = true;
}

#if NET452
return Task.FromResult(0);
#else
return Task.CompletedTask;
#endif
}

/// <summary>
Expand Down
19 changes: 5 additions & 14 deletions test/OpenTelemetry.Exporter.Jaeger.Tests/TestActivityProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -53,24 +53,15 @@ public override void OnEnd(Activity activity)
this.EndAction?.Invoke(activity);
}

public override Task ShutdownAsync(CancellationToken cancellationToken)
public override bool ForceFlush(int timeoutMillis = Timeout.Infinite)
{
this.ShutdownCalled = true;
#if NET452
return Task.FromResult(0);
#else
return Task.CompletedTask;
#endif
this.ForceFlushCalled = true;
return true;
}

public override Task ForceFlushAsync(CancellationToken cancellationToken)
public override void Shutdown(int timeoutMillis = Timeout.Infinite)
{
this.ForceFlushCalled = true;
#if NET452
return Task.FromResult(0);
#else
return Task.CompletedTask;
#endif
this.ShutdownCalled = true;
}

protected override void Dispose(bool disposing)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,24 +53,15 @@ public override void OnEnd(Activity span)
this.EndAction?.Invoke(span);
}

public override Task ShutdownAsync(CancellationToken cancellationToken)
public override bool ForceFlush(int timeoutMillis = Timeout.Infinite)
{
this.ShutdownCalled = true;
#if NET452
return Task.FromResult(0);
#else
return Task.CompletedTask;
#endif
this.ForceFlushCalled = true;
return true;
}

public override Task ForceFlushAsync(CancellationToken cancellationToken)
public override void Shutdown(int timeoutMillis = Timeout.Infinite)
{
this.ForceFlushCalled = true;
#if NET452
return Task.FromResult(0);
#else
return Task.CompletedTask;
#endif
this.ShutdownCalled = true;
}

protected override void Dispose(bool disposing)
Expand Down
Loading