-
Notifications
You must be signed in to change notification settings - Fork 782
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Otlp] Add persistent storage transmission handler #5460
Merged
utpilla
merged 21 commits into
open-telemetry:main
from
vishweshbankwar:vibankwa/add-persistent-storage-transmission-handler
Mar 27, 2024
Merged
Changes from all commits
Commits
Show all changes
21 commits
Select commit
Hold shift + click to select a range
199107e
Add persistent storage transmission handler
vishweshbankwar ca19618
refactor + forceflush
vishweshbankwar b027f4a
Merge branch 'main' into vibankwa/add-persistent-storage-transmission…
vishweshbankwar c4416dd
refactor
vishweshbankwar c974427
Merge branch 'main' into vibankwa/add-persistent-storage-transmission…
vishweshbankwar f95cc8d
error log + todo
vishweshbankwar 8a3d22b
code comment
vishweshbankwar e2dc333
refactor
vishweshbankwar e64a9fe
clean up
vishweshbankwar 6930c21
review
vishweshbankwar 1fb0d4c
Fix formatting
vishweshbankwar cb396f0
Update src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementatio…
vishweshbankwar 175bb7c
Update src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementatio…
vishweshbankwar 4072729
feedback
vishweshbankwar 1da01b9
Merge branch 'main' into vibankwa/add-persistent-storage-transmission…
vishweshbankwar 45d685f
fix format
vishweshbankwar 764dad4
review
vishweshbankwar de0e028
retry period
vishweshbankwar f83fa77
revert and add todo
vishweshbankwar 4c198f6
rmv using
vishweshbankwar 40204ab
Merge branch 'main' into vibankwa/add-persistent-storage-transmission…
vishweshbankwar File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
186 changes: 186 additions & 0 deletions
186
...yProtocol/Implementation/Transmission/OtlpExporterPersistentStorageTransmissionHandler.cs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,186 @@ | ||
// Copyright The OpenTelemetry Authors | ||
// SPDX-License-Identifier: Apache-2.0 | ||
|
||
#nullable enable | ||
|
||
using System.Diagnostics; | ||
using Google.Protobuf; | ||
using OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.ExportClient; | ||
using OpenTelemetry.PersistentStorage.Abstractions; | ||
using OpenTelemetry.PersistentStorage.FileSystem; | ||
using OpenTelemetry.Proto.Collector.Logs.V1; | ||
using OpenTelemetry.Proto.Collector.Metrics.V1; | ||
using OpenTelemetry.Proto.Collector.Trace.V1; | ||
|
||
namespace OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.Transmission; | ||
|
||
internal sealed class OtlpExporterPersistentStorageTransmissionHandler<TRequest> : OtlpExporterTransmissionHandler<TRequest>, IDisposable | ||
{ | ||
private const int RetryIntervalInMilliseconds = 60000; | ||
private readonly ManualResetEvent shutdownEvent = new(false); | ||
private readonly ManualResetEvent dataExportNotification = new(false); | ||
private readonly AutoResetEvent exportEvent = new(false); | ||
private readonly Thread thread; | ||
private readonly PersistentBlobProvider persistentBlobProvider; | ||
private readonly Func<byte[], TRequest> requestFactory; | ||
private bool disposed; | ||
|
||
public OtlpExporterPersistentStorageTransmissionHandler(IExportClient<TRequest> exportClient, double timeoutMilliseconds, Func<byte[], TRequest> requestFactory, string storagePath) | ||
: this(new FileBlobProvider(storagePath), exportClient, timeoutMilliseconds, requestFactory) | ||
{ | ||
} | ||
|
||
internal OtlpExporterPersistentStorageTransmissionHandler(PersistentBlobProvider persistentBlobProvider, IExportClient<TRequest> exportClient, double timeoutMilliseconds, Func<byte[], TRequest> requestFactory) | ||
: base(exportClient, timeoutMilliseconds) | ||
{ | ||
Debug.Assert(persistentBlobProvider != null, "persistentBlobProvider was null"); | ||
Debug.Assert(requestFactory != null, "requestFactory was null"); | ||
|
||
this.persistentBlobProvider = persistentBlobProvider!; | ||
this.requestFactory = requestFactory!; | ||
|
||
this.thread = new Thread(this.RetryStoredRequests) | ||
vishweshbankwar marked this conversation as resolved.
Show resolved
Hide resolved
|
||
{ | ||
Name = $"OtlpExporter Persistent Retry Storage - {typeof(TRequest)}", | ||
IsBackground = true, | ||
}; | ||
|
||
this.thread.Start(); | ||
} | ||
|
||
// Used for test. | ||
internal bool InitiateAndWaitForRetryProcess(int timeOutMilliseconds) | ||
{ | ||
this.exportEvent.Set(); | ||
|
||
return this.dataExportNotification.WaitOne(timeOutMilliseconds); | ||
} | ||
|
||
protected override bool OnSubmitRequestFailure(TRequest request, ExportClientResponse response) | ||
{ | ||
if (RetryHelper.ShouldRetryRequest(request, response, OtlpRetry.InitialBackoffMilliseconds, out _)) | ||
{ | ||
byte[]? data = null; | ||
if (request is ExportTraceServiceRequest traceRequest) | ||
{ | ||
data = traceRequest.ToByteArray(); | ||
} | ||
else if (request is ExportMetricsServiceRequest metricsRequest) | ||
{ | ||
data = metricsRequest.ToByteArray(); | ||
} | ||
else if (request is ExportLogsServiceRequest logsRequest) | ||
{ | ||
data = logsRequest.ToByteArray(); | ||
} | ||
vishweshbankwar marked this conversation as resolved.
Show resolved
Hide resolved
|
||
else | ||
{ | ||
Debug.Fail("Unexpected request type encountered"); | ||
data = null; | ||
} | ||
|
||
if (data != null) | ||
{ | ||
return this.persistentBlobProvider.TryCreateBlob(data, out _); | ||
} | ||
} | ||
|
||
return false; | ||
} | ||
|
||
protected override void OnShutdown(int timeoutMilliseconds) | ||
{ | ||
var sw = timeoutMilliseconds == Timeout.Infinite ? null : Stopwatch.StartNew(); | ||
|
||
try | ||
{ | ||
this.shutdownEvent.Set(); | ||
} | ||
catch (ObjectDisposedException) | ||
{ | ||
// Dispose was called before shutdown. | ||
} | ||
|
||
this.thread.Join(timeoutMilliseconds); | ||
|
||
if (sw != null) | ||
{ | ||
var timeout = timeoutMilliseconds - sw.ElapsedMilliseconds; | ||
|
||
base.OnShutdown((int)Math.Max(timeout, 0)); | ||
} | ||
else | ||
{ | ||
base.OnShutdown(timeoutMilliseconds); | ||
} | ||
} | ||
|
||
protected override void Dispose(bool disposing) | ||
utpilla marked this conversation as resolved.
Show resolved
Hide resolved
|
||
{ | ||
if (!this.disposed) | ||
{ | ||
if (disposing) | ||
{ | ||
this.shutdownEvent.Dispose(); | ||
this.exportEvent.Dispose(); | ||
this.dataExportNotification.Dispose(); | ||
(this.persistentBlobProvider as IDisposable)?.Dispose(); | ||
} | ||
|
||
this.disposed = true; | ||
} | ||
} | ||
|
||
private void RetryStoredRequests() | ||
{ | ||
var handles = new WaitHandle[] { this.shutdownEvent, this.exportEvent }; | ||
while (true) | ||
{ | ||
try | ||
{ | ||
var index = WaitHandle.WaitAny(handles, RetryIntervalInMilliseconds); | ||
if (index == 0) | ||
{ | ||
// Shutdown signaled | ||
break; | ||
} | ||
|
||
int fileCount = 0; | ||
|
||
// TODO: Run maintenance job. | ||
// Transmit 10 files at a time. | ||
while (fileCount < 10 && !this.shutdownEvent.WaitOne(0)) | ||
{ | ||
if (!this.persistentBlobProvider.TryGetBlob(out var blob)) | ||
{ | ||
break; | ||
} | ||
|
||
if (blob.TryLease((int)this.TimeoutMilliseconds) && blob.TryRead(out var data)) | ||
{ | ||
var deadlineUtc = DateTime.UtcNow.AddMilliseconds(this.TimeoutMilliseconds); | ||
var request = this.requestFactory.Invoke(data); | ||
if (this.TryRetryRequest(request, deadlineUtc, out var response) || !RetryHelper.ShouldRetryRequest(request, response, OtlpRetry.InitialBackoffMilliseconds, out var retryInfo)) | ||
{ | ||
blob.TryDelete(); | ||
} | ||
|
||
// TODO: extend the lease period based on the response from server on retryAfter. | ||
} | ||
|
||
fileCount++; | ||
} | ||
|
||
// Set and reset the handle to notify export and wait for next signal. | ||
// This is used for InitiateAndWaitForRetryProcess. | ||
this.dataExportNotification.Set(); | ||
this.dataExportNotification.Reset(); | ||
} | ||
catch (Exception ex) | ||
{ | ||
OpenTelemetryProtocolExporterEventSource.Log.RetryStoredRequestException(ex); | ||
return; | ||
} | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not for this PR but it would be nice to also log the telemetry signal that was being sent. We should do it for other methods in this class too.