Skip to content

Commit

Permalink
[Otlp] Add persistent storage transmission handler (#5460)
Browse files Browse the repository at this point in the history
  • Loading branch information
vishweshbankwar authored Mar 27, 2024
1 parent 1a607c8 commit af57de2
Show file tree
Hide file tree
Showing 4 changed files with 557 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,15 @@ public void TrySubmitRequestException(Exception ex)
}
}

/// <summary>
/// Reports an exception raised while re-transmitting stored requests.
/// </summary>
/// <remarks>
/// The exception is only converted to a string (via <c>ToInvariantString</c>)
/// when Error-level events are enabled, so nothing is formatted when no
/// listener is attached at that level.
/// </remarks>
/// <param name="ex">The exception to report.</param>
[NonEvent]
public void RetryStoredRequestException(Exception ex)
{
    // Bail out early when nobody is listening at Error level.
    if (!Log.IsEnabled(EventLevel.Error, EventKeywords.All))
    {
        return;
    }

    // Forward to the string overload, which writes the actual event.
    this.RetryStoredRequestException(ex.ToInvariantString());
}

[Event(2, Message = "Exporter failed send data to collector to {0} endpoint. Data will not be sent. Exception: {1}", Level = EventLevel.Error)]
public void FailedToReachCollector(string rawCollectorUri, string ex)
{
Expand Down Expand Up @@ -94,6 +103,12 @@ public void TrySubmitRequestException(string ex)
this.WriteEvent(12, ex);
}

/// <summary>
/// Writes event 13 (Error level): a failure occurred while attempting to
/// re-transmit data from disk.
/// </summary>
/// <param name="ex">Text substituted for <c>{0}</c> in the event message.</param>
[Event(13, Message = "Error while attempting to re-transmit data from disk. Message: '{0}'", Level = EventLevel.Error)]
public void RetryStoredRequestException(string ex)
    => this.WriteEvent(13, ex);

void IConfigurationExtensionsLogger.LogInvalidConfigurationValue(string key, string value)
{
this.InvalidConfigurationValue(key, value);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

#nullable enable

using System.Diagnostics;
using Google.Protobuf;
using OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.ExportClient;
using OpenTelemetry.PersistentStorage.Abstractions;
using OpenTelemetry.PersistentStorage.FileSystem;
using OpenTelemetry.Proto.Collector.Logs.V1;
using OpenTelemetry.Proto.Collector.Metrics.V1;
using OpenTelemetry.Proto.Collector.Trace.V1;

namespace OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.Transmission;

/// <summary>
/// Transmission handler that writes failed, retryable export requests to a
/// <see cref="PersistentBlobProvider"/> and re-transmits them from a dedicated
/// background thread.
/// </summary>
/// <typeparam name="TRequest">Type of the export request.</typeparam>
internal sealed class OtlpExporterPersistentStorageTransmissionHandler<TRequest> : OtlpExporterTransmissionHandler<TRequest>, IDisposable
{
    // Maximum time the retry thread sleeps between two retry cycles.
    private const int RetryIntervalInMilliseconds = 60000;

    // Upper bound on the number of blobs attempted in a single retry cycle.
    private const int MaxBlobsPerRetryCycle = 10;

    // Signaled by OnShutdown to stop the retry thread.
    private readonly ManualResetEvent shutdownEvent = new(false);

    // Pulsed at the end of every retry cycle; observed by InitiateAndWaitForRetryProcess.
    private readonly ManualResetEvent dataExportNotification = new(false);

    // Signaled to start a retry cycle without waiting for the interval to elapse.
    private readonly AutoResetEvent exportEvent = new(false);

    private readonly Thread thread;
    private readonly PersistentBlobProvider persistentBlobProvider;
    private readonly Func<byte[], TRequest> requestFactory;
    private bool disposed;

    /// <summary>
    /// Initializes a new instance of the
    /// <see cref="OtlpExporterPersistentStorageTransmissionHandler{TRequest}"/> class
    /// backed by a <see cref="FileBlobProvider"/> created for <paramref name="storagePath"/>.
    /// </summary>
    /// <param name="exportClient">Client used to send requests.</param>
    /// <param name="timeoutMilliseconds">Timeout, in milliseconds, passed to the base handler.</param>
    /// <param name="requestFactory">Rebuilds a request from the bytes read back from storage.</param>
    /// <param name="storagePath">Path handed to the <see cref="FileBlobProvider"/>.</param>
    public OtlpExporterPersistentStorageTransmissionHandler(IExportClient<TRequest> exportClient, double timeoutMilliseconds, Func<byte[], TRequest> requestFactory, string storagePath)
        : this(new FileBlobProvider(storagePath), exportClient, timeoutMilliseconds, requestFactory)
    {
    }

    /// <summary>
    /// Initializes a new instance of the
    /// <see cref="OtlpExporterPersistentStorageTransmissionHandler{TRequest}"/> class
    /// and starts the background retry thread.
    /// </summary>
    /// <param name="persistentBlobProvider">Storage used for failed requests.</param>
    /// <param name="exportClient">Client used to send requests.</param>
    /// <param name="timeoutMilliseconds">Timeout, in milliseconds, passed to the base handler.</param>
    /// <param name="requestFactory">Rebuilds a request from the bytes read back from storage.</param>
    internal OtlpExporterPersistentStorageTransmissionHandler(PersistentBlobProvider persistentBlobProvider, IExportClient<TRequest> exportClient, double timeoutMilliseconds, Func<byte[], TRequest> requestFactory)
        : base(exportClient, timeoutMilliseconds)
    {
        Debug.Assert(persistentBlobProvider != null, "persistentBlobProvider was null");
        Debug.Assert(requestFactory != null, "requestFactory was null");

        this.persistentBlobProvider = persistentBlobProvider!;
        this.requestFactory = requestFactory!;

        this.thread = new Thread(this.RetryStoredRequests)
        {
            Name = $"OtlpExporter Persistent Retry Storage - {typeof(TRequest)}",
            IsBackground = true,
        };

        this.thread.Start();
    }

    /// <summary>
    /// Used for test. Triggers a retry cycle and waits for it to complete.
    /// </summary>
    /// <param name="timeOutMilliseconds">Maximum time to wait, in milliseconds.</param>
    /// <returns>
    /// <see langword="true"/> if the end-of-cycle notification was observed
    /// within the timeout; otherwise <see langword="false"/>.
    /// </returns>
    internal bool InitiateAndWaitForRetryProcess(int timeOutMilliseconds)
    {
        this.exportEvent.Set();

        return this.dataExportNotification.WaitOne(timeOutMilliseconds);
    }

    /// <summary>
    /// Serializes a failed request and stores it as a blob when the failure is retryable.
    /// </summary>
    /// <param name="request">The request that failed.</param>
    /// <param name="response">The response describing the failure.</param>
    /// <returns>
    /// <see langword="true"/> if the request was written to storage;
    /// <see langword="false"/> if the failure is not retryable, the request
    /// type is not recognized, or the blob could not be created.
    /// </returns>
    protected override bool OnSubmitRequestFailure(TRequest request, ExportClientResponse response)
    {
        if (!RetryHelper.ShouldRetryRequest(request, response, OtlpRetry.InitialBackoffMilliseconds, out _))
        {
            return false;
        }

        byte[]? data = request switch
        {
            ExportTraceServiceRequest traceRequest => traceRequest.ToByteArray(),
            ExportMetricsServiceRequest metricsRequest => metricsRequest.ToByteArray(),
            ExportLogsServiceRequest logsRequest => logsRequest.ToByteArray(),
            _ => null,
        };

        if (data == null)
        {
            Debug.Fail("Unexpected request type encountered");
            return false;
        }

        return this.persistentBlobProvider.TryCreateBlob(data, out _);
    }

    /// <summary>
    /// Signals the retry thread to stop, waits for it to exit, and then shuts
    /// down the base handler with whatever time remains.
    /// </summary>
    /// <param name="timeoutMilliseconds">
    /// Overall shutdown budget in milliseconds, or <see cref="Timeout.Infinite"/>
    /// to wait indefinitely.
    /// </param>
    protected override void OnShutdown(int timeoutMilliseconds)
    {
        // No stopwatch is needed for an infinite timeout: there is no budget to track.
        var sw = timeoutMilliseconds == Timeout.Infinite ? null : Stopwatch.StartNew();

        try
        {
            this.shutdownEvent.Set();
        }
        catch (ObjectDisposedException)
        {
            // Dispose was called before shutdown.
        }

        this.thread.Join(timeoutMilliseconds);

        if (sw != null)
        {
            // Give the base handler only the part of the budget the join did not use.
            var timeout = timeoutMilliseconds - sw.ElapsedMilliseconds;

            base.OnShutdown((int)Math.Max(timeout, 0));
        }
        else
        {
            base.OnShutdown(timeoutMilliseconds);
        }
    }

    /// <summary>
    /// Disposes the wait handles and, when it implements
    /// <see cref="IDisposable"/>, the blob provider.
    /// </summary>
    /// <param name="disposing">
    /// <see langword="true"/> to release managed resources;
    /// <see langword="false"/> to release only unmanaged resources.
    /// </param>
    protected override void Dispose(bool disposing)
    {
        if (!this.disposed)
        {
            if (disposing)
            {
                this.shutdownEvent.Dispose();
                this.exportEvent.Dispose();
                this.dataExportNotification.Dispose();
                (this.persistentBlobProvider as IDisposable)?.Dispose();
            }

            this.disposed = true;
        }

        // Keep the dispose chain intact so resources added to the base class are released too.
        base.Dispose(disposing);
    }

    /// <summary>
    /// Body of the background thread. Each cycle starts when
    /// <see cref="exportEvent"/> is signaled or the retry interval elapses,
    /// and attempts to re-transmit up to <see cref="MaxBlobsPerRetryCycle"/>
    /// stored blobs. The loop exits when <see cref="shutdownEvent"/> is signaled.
    /// </summary>
    private void RetryStoredRequests()
    {
        var handles = new WaitHandle[] { this.shutdownEvent, this.exportEvent };
        while (true)
        {
            try
            {
                var index = WaitHandle.WaitAny(handles, RetryIntervalInMilliseconds);
                if (index == 0)
                {
                    // Shutdown signaled
                    break;
                }

                int fileCount = 0;

                // TODO: Run maintenance job.
                // Transmit a bounded number of files per cycle, stopping early on shutdown.
                while (fileCount < MaxBlobsPerRetryCycle && !this.shutdownEvent.WaitOne(0))
                {
                    if (!this.persistentBlobProvider.TryGetBlob(out var blob))
                    {
                        // Nothing (more) to retry in this cycle.
                        break;
                    }

                    if (blob.TryLease((int)this.TimeoutMilliseconds) && blob.TryRead(out var data))
                    {
                        var deadlineUtc = DateTime.UtcNow.AddMilliseconds(this.TimeoutMilliseconds);
                        var request = this.requestFactory.Invoke(data);

                        // Delete the blob when it was delivered, or when the failure
                        // is not retryable and keeping it would serve no purpose.
                        if (this.TryRetryRequest(request, deadlineUtc, out var response)
                            || !RetryHelper.ShouldRetryRequest(request, response, OtlpRetry.InitialBackoffMilliseconds, out _))
                        {
                            blob.TryDelete();
                        }

                        // TODO: extend the lease period based on the response from server on retryAfter.
                    }

                    // Counted even when the lease or read failed, so a cycle always terminates.
                    fileCount++;
                }

                // Set and reset the handle to notify export and wait for next signal.
                // This is used for InitiateAndWaitForRetryProcess.
                this.dataExportNotification.Set();
                this.dataExportNotification.Reset();
            }
            catch (Exception ex)
            {
                // NOTE(review): any exception ends the retry thread for the lifetime
                // of this handler; confirm this is intended for failures such as a
                // blob that the request factory cannot parse.
                OpenTelemetryProtocolExporterEventSource.Log.RetryStoredRequestException(ex);
                return;
            }
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

namespace OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.Transmission;

internal class OtlpExporterTransmissionHandler<TRequest>
internal class OtlpExporterTransmissionHandler<TRequest> : IDisposable
{
public OtlpExporterTransmissionHandler(IExportClient<TRequest> exportClient, double timeoutMilliseconds)
{
Expand Down Expand Up @@ -80,6 +80,13 @@ public bool Shutdown(int timeoutMilliseconds)
return this.ExportClient.Shutdown(timeoutMilliseconds);
}

/// <inheritdoc/>
public void Dispose()
{
    // Release resources through the overridable Dispose(bool) hook.
    this.Dispose(disposing: true);

    // Cleanup already ran, so the finalizer (if a derived type declares one) can be skipped.
    GC.SuppressFinalize(this);
}

/// <summary>
/// Fired when the transmission handler is shutdown.
/// </summary>
Expand Down Expand Up @@ -122,4 +129,16 @@ protected bool TryRetryRequest(TRequest request, DateTime deadlineUtc, out Expor

return true;
}

/// <summary>
/// Releases the unmanaged resources used by this class and optionally
/// releases the managed resources.
/// </summary>
/// <remarks>
/// The base implementation does nothing. Derived handlers that own
/// disposable resources override this method to release them.
/// </remarks>
/// <param name="disposing">
/// <see langword="true"/> to release both managed and unmanaged resources;
/// <see langword="false"/> to release only unmanaged resources.
/// </param>
protected virtual void Dispose(bool disposing)
{
}
}
Loading

0 comments on commit af57de2

Please sign in to comment.