Skip to content

Commit

Permalink
Otlp Retry Part1 - Refactor ExportClients (#5335)
Browse files Browse the repository at this point in the history
  • Loading branch information
vishweshbankwar authored Feb 16, 2024
1 parent e4b08ac commit 5849d3a
Show file tree
Hide file tree
Showing 15 changed files with 121 additions and 102 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ namespace OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.ExportClie
/// <typeparam name="TRequest">Type of export request.</typeparam>
internal abstract class BaseOtlpGrpcExportClient<TRequest> : IExportClient<TRequest>
{
protected static readonly ExportClientGrpcResponse SuccessExportResponse = new ExportClientGrpcResponse(success: true, deadlineUtc: null, exception: null);

protected BaseOtlpGrpcExportClient(OtlpExporterOptions options)
{
Guard.ThrowIfNull(options);
Expand All @@ -38,7 +40,7 @@ protected BaseOtlpGrpcExportClient(OtlpExporterOptions options)
internal int TimeoutMilliseconds { get; }

/// <inheritdoc/>
public abstract bool SendExportRequest(TRequest request, CancellationToken cancellationToken = default);
public abstract ExportClientResponse SendExportRequest(TRequest request, CancellationToken cancellationToken = default);

/// <inheritdoc/>
public virtual bool Shutdown(int timeoutMilliseconds)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ namespace OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.ExportClie
/// <typeparam name="TRequest">Type of export request.</typeparam>
internal abstract class BaseOtlpHttpExportClient<TRequest> : IExportClient<TRequest>
{
private static readonly ExportClientHttpResponse SuccessExportResponse = new ExportClientHttpResponse(success: true, deadlineUtc: null, response: null, exception: null);

protected BaseOtlpHttpExportClient(OtlpExporterOptions options, HttpClient httpClient, string signalPath)
{
Guard.ThrowIfNull(options);
Expand All @@ -34,24 +36,33 @@ protected BaseOtlpHttpExportClient(OtlpExporterOptions options, HttpClient httpC
internal IReadOnlyDictionary<string, string> Headers { get; }

/// <inheritdoc/>
public bool SendExportRequest(TRequest request, CancellationToken cancellationToken = default)
public ExportClientResponse SendExportRequest(TRequest request, CancellationToken cancellationToken = default)
{
DateTime deadline = DateTime.UtcNow.AddMilliseconds(this.HttpClient.Timeout.TotalMilliseconds);
try
{
using var httpRequest = this.CreateHttpRequest(request);

using var httpResponse = this.SendHttpRequest(httpRequest, cancellationToken);

httpResponse?.EnsureSuccessStatusCode();
try
{
httpResponse.EnsureSuccessStatusCode();
}
catch (HttpRequestException ex)
{
return new ExportClientHttpResponse(success: false, deadlineUtc: deadline, response: httpResponse, ex);
}

// We do not need to return back response and deadline for successful response so using cached value.
return SuccessExportResponse;
}
catch (HttpRequestException ex)
{
OpenTelemetryProtocolExporterEventSource.Log.FailedToReachCollector(this.Endpoint, ex);

return false;
return new ExportClientHttpResponse(success: false, deadlineUtc: deadline, response: null, exception: ex);
}

return true;
}

/// <inheritdoc/>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

#nullable enable

namespace OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.ExportClient;

internal sealed class ExportClientGrpcResponse : ExportClientResponse
{
public ExportClientGrpcResponse(
bool success,
DateTime? deadlineUtc,
Exception? exception)
: base(success, deadlineUtc, exception)
{
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

#nullable enable

using System.Net;
#if NETFRAMEWORK
using System.Net.Http;
#endif
using System.Net.Http.Headers;

namespace OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.ExportClient;

internal sealed class ExportClientHttpResponse : ExportClientResponse
{
public ExportClientHttpResponse(
bool success,
DateTime? deadlineUtc,
HttpResponseMessage? response,
Exception? exception)
: base(success, deadlineUtc, exception)
{
this.Headers = response?.Headers;
this.StatusCode = response?.StatusCode;
}

public HttpResponseHeaders? Headers { get; }

public HttpStatusCode? StatusCode { get; }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

#nullable enable

namespace OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.ExportClient;

internal abstract class ExportClientResponse
{
protected ExportClientResponse(bool success, DateTime? deadlineUtc, Exception? exception)
{
this.Success = success;
this.Exception = exception;
this.DeadlineUtc = deadlineUtc;
}

public bool Success { get; }

public Exception? Exception { get; }

public DateTime? DeadlineUtc { get; }
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

#nullable enable

namespace OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.ExportClient;

/// <summary>Export client interface.</summary>
Expand All @@ -12,8 +14,8 @@ internal interface IExportClient<in TRequest>
/// </summary>
/// <param name="request">The request to send to the server.</param>
/// <param name="cancellationToken">An optional token for canceling the call.</param>
/// <returns>True if the request has been sent successfully, otherwise false.</returns>
bool SendExportRequest(TRequest request, CancellationToken cancellationToken = default);
/// <returns><see cref="ExportClientResponse"/>.</returns>
ExportClientResponse SendExportRequest(TRequest request, CancellationToken cancellationToken = default);

/// <summary>
/// Method for shutting down the export client.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,21 +26,22 @@ public OtlpGrpcLogExportClient(OtlpExporterOptions options, OtlpCollector.LogsSe
}

/// <inheritdoc/>
public override bool SendExportRequest(OtlpCollector.ExportLogsServiceRequest request, CancellationToken cancellationToken = default)
public override ExportClientResponse SendExportRequest(OtlpCollector.ExportLogsServiceRequest request, CancellationToken cancellationToken = default)
{
var deadline = DateTime.UtcNow.AddMilliseconds(this.TimeoutMilliseconds);

try
{
this.logsClient.Export(request, headers: this.Headers, deadline: deadline, cancellationToken: cancellationToken);

// We do not need to return back response and deadline for successful response so using cached value.
return SuccessExportResponse;
}
catch (RpcException ex)
{
OpenTelemetryProtocolExporterEventSource.Log.FailedToReachCollector(this.Endpoint, ex);

return false;
return new ExportClientGrpcResponse(success: false, deadlineUtc: deadline, exception: ex);
}

return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,21 +26,22 @@ public OtlpGrpcMetricsExportClient(OtlpExporterOptions options, OtlpCollector.Me
}

/// <inheritdoc/>
public override bool SendExportRequest(OtlpCollector.ExportMetricsServiceRequest request, CancellationToken cancellationToken = default)
public override ExportClientResponse SendExportRequest(OtlpCollector.ExportMetricsServiceRequest request, CancellationToken cancellationToken = default)
{
var deadline = DateTime.UtcNow.AddMilliseconds(this.TimeoutMilliseconds);

try
{
this.metricsClient.Export(request, headers: this.Headers, deadline: deadline, cancellationToken: cancellationToken);

// We do not need to return back response and deadline for successful response so using cached value.
return SuccessExportResponse;
}
catch (RpcException ex)
{
OpenTelemetryProtocolExporterEventSource.Log.FailedToReachCollector(this.Endpoint, ex);

return false;
return new ExportClientGrpcResponse(success: false, deadlineUtc: deadline, exception: ex);
}

return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,21 +26,22 @@ public OtlpGrpcTraceExportClient(OtlpExporterOptions options, OtlpCollector.Trac
}

/// <inheritdoc/>
public override bool SendExportRequest(OtlpCollector.ExportTraceServiceRequest request, CancellationToken cancellationToken = default)
public override ExportClientResponse SendExportRequest(OtlpCollector.ExportTraceServiceRequest request, CancellationToken cancellationToken = default)
{
var deadline = DateTime.UtcNow.AddMilliseconds(this.TimeoutMilliseconds);

try
{
this.traceClient.Export(request, headers: this.Headers, deadline: deadline, cancellationToken: cancellationToken);

// We do not need to return back response and deadline for successful response so using cached value.
return SuccessExportResponse;
}
catch (RpcException ex)
{
OpenTelemetryProtocolExporterEventSource.Log.FailedToReachCollector(this.Endpoint, ex);

return false;
return new ExportClientGrpcResponse(success: false, deadlineUtc: deadline, exception: ex);
}

return true;
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public override ExportResult Export(in Batch<LogRecord> logRecordBatch)
{
request = this.otlpLogRecordTransformer.BuildExportRequest(this.ProcessResource, logRecordBatch);

if (!this.exportClient.SendExportRequest(request))
if (!this.exportClient.SendExportRequest(request).Success)
{
return ExportResult.Failure;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public override ExportResult Export(in Batch<Metric> metrics)
{
request.AddMetrics(this.ProcessResource, metrics);

if (!this.exportClient.SendExportRequest(request))
if (!this.exportClient.SendExportRequest(request).Success)
{
return ExportResult.Failure;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public override ExportResult Export(in Batch<Activity> activityBatch)
{
request.AddBatch(this.sdkLimitOptions, this.ProcessResource, activityBatch);

if (!this.exportClient.SendExportRequest(request))
if (!this.exportClient.SendExportRequest(request).Success)
{
return ExportResult.Failure;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ void RunTest(Batch<Activity> batch)
var httpRequest = testHttpHandler.HttpRequestMessage;

// Assert
Assert.True(result);
Assert.True(result.Success);
Assert.NotNull(httpRequest);
Assert.Equal(HttpMethod.Post, httpRequest.Method);
Assert.Equal("http://localhost:4317/", httpRequest.RequestUri.AbsoluteUri);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,28 @@ internal class TestExportClient<T>(bool throwException = false) : IExportClient<

public bool ThrowException { get; set; } = throwException;

public bool SendExportRequest(T request, CancellationToken cancellationToken = default)
public ExportClientResponse SendExportRequest(T request, CancellationToken cancellationToken = default)
{
if (this.ThrowException)
{
throw new Exception("Exception thrown from SendExportRequest");
}

this.SendExportRequestCalled = true;
return true;
return new TestExportClientResponse(true, null, null);
}

public bool Shutdown(int timeoutMilliseconds)
{
this.ShutdownCalled = true;
return true;
}

private class TestExportClientResponse : ExportClientResponse
{
public TestExportClientResponse(bool success, DateTime? deadline, Exception exception)
: base(success, deadline, exception)
{
}
}
}

0 comments on commit 5849d3a

Please sign in to comment.