diff --git a/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/OpenTelemetryProtocolExporterEventSource.cs b/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/OpenTelemetryProtocolExporterEventSource.cs index ba9ffaa993d..625e8e3ecc5 100644 --- a/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/OpenTelemetryProtocolExporterEventSource.cs +++ b/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/OpenTelemetryProtocolExporterEventSource.cs @@ -40,6 +40,15 @@ public void TrySubmitRequestException(Exception ex) } } + [NonEvent] + public void RetryStoredRequestException(Exception ex) + { + if (Log.IsEnabled(EventLevel.Error, EventKeywords.All)) + { + this.RetryStoredRequestException(ex.ToInvariantString()); + } + } + [Event(2, Message = "Exporter failed send data to collector to {0} endpoint. Data will not be sent. Exception: {1}", Level = EventLevel.Error)] public void FailedToReachCollector(string rawCollectorUri, string ex) { @@ -94,6 +103,12 @@ public void TrySubmitRequestException(string ex) this.WriteEvent(12, ex); } + [Event(13, Message = "Error while attempting to re-transmit data from disk. Message: '{0}'", Level = EventLevel.Error)] + public void RetryStoredRequestException(string ex) + { + this.WriteEvent(13, ex); + } + void IConfigurationExtensionsLogger.LogInvalidConfigurationValue(string key, string value) { this.InvalidConfigurationValue(key, value); diff --git a/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/Transmission/OtlpExporterPersistentStorageTransmissionHandler.cs b/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/Transmission/OtlpExporterPersistentStorageTransmissionHandler.cs new file mode 100644 index 00000000000..e8ed3da211d --- /dev/null +++ b/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/Transmission/OtlpExporterPersistentStorageTransmissionHandler.cs @@ -0,0 +1,186 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +#nullable enable + +using System.Diagnostics; +using Google.Protobuf; +using OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.ExportClient; +using OpenTelemetry.PersistentStorage.Abstractions; +using OpenTelemetry.PersistentStorage.FileSystem; +using OpenTelemetry.Proto.Collector.Logs.V1; +using OpenTelemetry.Proto.Collector.Metrics.V1; +using OpenTelemetry.Proto.Collector.Trace.V1; + +namespace OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.Transmission; + +internal sealed class OtlpExporterPersistentStorageTransmissionHandler : OtlpExporterTransmissionHandler, IDisposable +{ + private const int RetryIntervalInMilliseconds = 60000; + private readonly ManualResetEvent shutdownEvent = new(false); + private readonly ManualResetEvent dataExportNotification = new(false); + private readonly AutoResetEvent exportEvent = new(false); + private readonly Thread thread; + private readonly PersistentBlobProvider persistentBlobProvider; + private readonly Func requestFactory; + private bool disposed; + + public OtlpExporterPersistentStorageTransmissionHandler(IExportClient exportClient, double timeoutMilliseconds, Func requestFactory, string storagePath) + : this(new FileBlobProvider(storagePath), exportClient, timeoutMilliseconds, requestFactory) + { + } + + internal OtlpExporterPersistentStorageTransmissionHandler(PersistentBlobProvider persistentBlobProvider, IExportClient exportClient, double timeoutMilliseconds, Func requestFactory) + : base(exportClient, timeoutMilliseconds) + { + Debug.Assert(persistentBlobProvider != null, "persistentBlobProvider was null"); + Debug.Assert(requestFactory != null, "requestFactory was null"); + + this.persistentBlobProvider = persistentBlobProvider!; + this.requestFactory = requestFactory!; + + this.thread = new Thread(this.RetryStoredRequests) + { + Name = $"OtlpExporter Persistent Retry Storage - {typeof(TRequest)}", + IsBackground = true, + }; + + this.thread.Start(); + } + + // Used for test. + internal bool InitiateAndWaitForRetryProcess(int timeOutMilliseconds) + { + this.exportEvent.Set(); + + return this.dataExportNotification.WaitOne(timeOutMilliseconds); + } + + protected override bool OnSubmitRequestFailure(TRequest request, ExportClientResponse response) + { + if (RetryHelper.ShouldRetryRequest(request, response, OtlpRetry.InitialBackoffMilliseconds, out _)) + { + byte[]? data = null; + if (request is ExportTraceServiceRequest traceRequest) + { + data = traceRequest.ToByteArray(); + } + else if (request is ExportMetricsServiceRequest metricsRequest) + { + data = metricsRequest.ToByteArray(); + } + else if (request is ExportLogsServiceRequest logsRequest) + { + data = logsRequest.ToByteArray(); + } + else + { + Debug.Fail("Unexpected request type encountered"); + data = null; + } + + if (data != null) + { + return this.persistentBlobProvider.TryCreateBlob(data, out _); + } + } + + return false; + } + + protected override void OnShutdown(int timeoutMilliseconds) + { + var sw = timeoutMilliseconds == Timeout.Infinite ? null : Stopwatch.StartNew(); + + try + { + this.shutdownEvent.Set(); + } + catch (ObjectDisposedException) + { + // Dispose was called before shutdown. + } + + this.thread.Join(timeoutMilliseconds); + + if (sw != null) + { + var timeout = timeoutMilliseconds - sw.ElapsedMilliseconds; + + base.OnShutdown((int)Math.Max(timeout, 0)); + } + else + { + base.OnShutdown(timeoutMilliseconds); + } + } + + protected override void Dispose(bool disposing) + { + if (!this.disposed) + { + if (disposing) + { + this.shutdownEvent.Dispose(); + this.exportEvent.Dispose(); + this.dataExportNotification.Dispose(); + (this.persistentBlobProvider as IDisposable)?.Dispose(); + } + + this.disposed = true; + } + } + + private void RetryStoredRequests() + { + var handles = new WaitHandle[] { this.shutdownEvent, this.exportEvent }; + while (true) + { + try + { + var index = WaitHandle.WaitAny(handles, RetryIntervalInMilliseconds); + if (index == 0) + { + // Shutdown signaled + break; + } + + int fileCount = 0; + + // TODO: Run maintenance job. + // Transmit 10 files at a time. + while (fileCount < 10 && !this.shutdownEvent.WaitOne(0)) + { + if (!this.persistentBlobProvider.TryGetBlob(out var blob)) + { + break; + } + + if (blob.TryLease((int)this.TimeoutMilliseconds) && blob.TryRead(out var data)) + { + var deadlineUtc = DateTime.UtcNow.AddMilliseconds(this.TimeoutMilliseconds); + var request = this.requestFactory.Invoke(data); + if (this.TryRetryRequest(request, deadlineUtc, out var response) || !RetryHelper.ShouldRetryRequest(request, response, OtlpRetry.InitialBackoffMilliseconds, out var retryInfo)) + { + blob.TryDelete(); + } + + // TODO: extend the lease period based on the response from server on retryAfter. + } + + fileCount++; + } + + // Set and reset the handle to notify export and wait for next signal. + // This is used for InitiateAndWaitForRetryProcess. + this.dataExportNotification.Set(); + this.dataExportNotification.Reset(); + } + catch (Exception ex) + { + OpenTelemetryProtocolExporterEventSource.Log.RetryStoredRequestException(ex); + return; + } + } + } +} diff --git a/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/Transmission/OtlpExporterTransmissionHandler.cs b/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/Transmission/OtlpExporterTransmissionHandler.cs index 3300c8f6352..2b56e16dd7e 100644 --- a/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/Transmission/OtlpExporterTransmissionHandler.cs +++ b/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/Transmission/OtlpExporterTransmissionHandler.cs @@ -9,7 +9,7 @@ namespace OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.Transmission; -internal class OtlpExporterTransmissionHandler +internal class OtlpExporterTransmissionHandler : IDisposable { public OtlpExporterTransmissionHandler(IExportClient exportClient, double timeoutMilliseconds) { @@ -80,6 +80,13 @@ public bool Shutdown(int timeoutMilliseconds) return this.ExportClient.Shutdown(timeoutMilliseconds); } + /// + public void Dispose() + { + this.Dispose(true); + GC.SuppressFinalize(this); + } + /// /// Fired when the transmission handler is shutdown. /// @@ -122,4 +129,16 @@ protected bool TryRetryRequest(TRequest request, DateTime deadlineUtc, out Expor return true; } + + /// + /// Releases the unmanaged resources used by this class and optionally + /// releases the managed resources. + /// + /// + /// to release both managed and unmanaged resources; + /// to release only unmanaged resources. + /// + protected virtual void Dispose(bool disposing) + { + } } diff --git a/test/OpenTelemetry.Exporter.OpenTelemetryProtocol.Tests/MockCollectorIntegrationTests.cs b/test/OpenTelemetry.Exporter.OpenTelemetryProtocol.Tests/MockCollectorIntegrationTests.cs index 3029519eac2..e90201341c6 100644 --- a/test/OpenTelemetry.Exporter.OpenTelemetryProtocol.Tests/MockCollectorIntegrationTests.cs +++ b/test/OpenTelemetry.Exporter.OpenTelemetryProtocol.Tests/MockCollectorIntegrationTests.cs @@ -4,6 +4,7 @@ #if !NETFRAMEWORK using System.Diagnostics; using System.Net; +using Google.Protobuf; using Grpc.Core; using Microsoft.AspNetCore.Builder; using Microsoft.AspNetCore.Hosting; @@ -13,7 +14,10 @@ using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; using OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation; +using OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.ExportClient; +using OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.Transmission; using OpenTelemetry.Metrics; +using OpenTelemetry.PersistentStorage.Abstractions; using OpenTelemetry.Proto.Collector.Trace.V1; using OpenTelemetry.Tests; using OpenTelemetry.Trace; @@ -280,6 +284,270 @@ public async Task HttpRetryTests(bool useRetryTransmissionHandler, ExportResult Assert.Equal(expectedResult, exportResult); } + [Theory] + [InlineData(true, ExportResult.Success, HttpStatusCode.ServiceUnavailable)] + [InlineData(true, ExportResult.Success, HttpStatusCode.BadGateway)] + [InlineData(true, ExportResult.Success, HttpStatusCode.GatewayTimeout)] + [InlineData(true, ExportResult.Failure, HttpStatusCode.BadRequest)] + [InlineData(true, ExportResult.Success, HttpStatusCode.TooManyRequests)] + [InlineData(false, ExportResult.Failure, HttpStatusCode.ServiceUnavailable)] + [InlineData(false, ExportResult.Failure, HttpStatusCode.BadGateway)] + [InlineData(false, ExportResult.Failure, HttpStatusCode.GatewayTimeout)] + [InlineData(false, ExportResult.Failure, HttpStatusCode.TooManyRequests)] + [InlineData(false, ExportResult.Failure, HttpStatusCode.BadRequest)] + public async Task HttpPersistentStorageRetryTests(bool usePersistentStorageTransmissionHandler, ExportResult expectedResult, HttpStatusCode initialHttpStatusCode) + { + var testHttpPort = Interlocked.Increment(ref httpPort); + + using var host = await new HostBuilder() + .ConfigureWebHostDefaults(webBuilder => webBuilder + .ConfigureKestrel(options => + { + options.ListenLocalhost(testHttpPort, listenOptions => listenOptions.Protocols = Microsoft.AspNetCore.Server.Kestrel.Core.HttpProtocols.Http1); + }) + .ConfigureServices(services => + { + services.AddSingleton(new MockCollectorHttpState()); + }) + .ConfigureLogging(loggingBuilder => loggingBuilder.ClearProviders()) + .Configure(app => + { + app.UseRouting(); + + app.UseEndpoints(endpoints => + { + endpoints.MapGet( + "/MockCollector/SetResponseCodes/{responseCodesCsv}", + (MockCollectorHttpState collectorState, string responseCodesCsv) => + { + var codes = responseCodesCsv.Split(",").Select(x => int.Parse(x)).ToArray(); + collectorState.SetStatusCodes(codes); + }); + + endpoints.MapPost("/v1/traces", async ctx => + { + var state = ctx.RequestServices.GetRequiredService(); + ctx.Response.StatusCode = (int)state.NextStatus(); + + await ctx.Response.WriteAsync("Request Received."); + }); + }); + })) + .StartAsync(); + + using var httpClient = new HttpClient() { BaseAddress = new Uri($"http://localhost:{testHttpPort}") }; + + var codes = new[] { initialHttpStatusCode, HttpStatusCode.OK }; + await httpClient.GetAsync($"/MockCollector/SetResponseCodes/{string.Join(",", codes.Select(x => (int)x))}"); + + var endpoint = new Uri($"http://localhost:{testHttpPort}/v1/traces"); + + var exporterOptions = new OtlpExporterOptions() { Endpoint = endpoint, TimeoutMilliseconds = 20000 }; + + var exportClient = new OtlpHttpTraceExportClient(exporterOptions, new HttpClient()); + + // TODO: update this to configure via experimental environment variable. + OtlpExporterTransmissionHandler transmissionHandler; + MockFileProvider mockProvider = null; + if (usePersistentStorageTransmissionHandler) + { + mockProvider = new MockFileProvider(); + transmissionHandler = new OtlpExporterPersistentStorageTransmissionHandler( + mockProvider, + exportClient, + exporterOptions.TimeoutMilliseconds, + (byte[] data) => + { + var request = new ExportTraceServiceRequest(); + request.MergeFrom(data); + return request; + }); + } + else + { + transmissionHandler = new OtlpExporterTransmissionHandler(exportClient, exporterOptions.TimeoutMilliseconds); + } + + var otlpExporter = new OtlpTraceExporter(exporterOptions, new(), new(), transmissionHandler); + + var activitySourceName = "otel.http.persistent.storage.retry.test"; + using var source = new ActivitySource(activitySourceName); + + using var tracerProvider = Sdk.CreateTracerProviderBuilder() + .AddSource(activitySourceName) + .Build(); + + using var activity = source.StartActivity("HttpPersistentStorageRetryTest"); + activity.Stop(); + using var batch = new Batch([activity], 1); + + var exportResult = otlpExporter.Export(batch); + + Assert.Equal(expectedResult, exportResult); + + if (usePersistentStorageTransmissionHandler) + { + if (exportResult == ExportResult.Success) + { + Assert.Single(mockProvider.TryGetBlobs()); + + // Force Retry + Assert.True((transmissionHandler as OtlpExporterPersistentStorageTransmissionHandler).InitiateAndWaitForRetryProcess(-1)); + + Assert.False(mockProvider.TryGetBlob(out _)); + } + else + { + Assert.Empty(mockProvider.TryGetBlobs()); + } + } + else + { + Assert.Null(mockProvider); + } + + transmissionHandler.Shutdown(0); + + transmissionHandler.Dispose(); + } + + // For `Grpc.Core.StatusCode.DeadlineExceeded` + // See https://github.com/open-telemetry/opentelemetry-dotnet/issues/5436. + [Theory] + [InlineData(true, ExportResult.Success, Grpc.Core.StatusCode.Unavailable)] + [InlineData(true, ExportResult.Success, Grpc.Core.StatusCode.Cancelled)] + [InlineData(true, ExportResult.Success, Grpc.Core.StatusCode.Aborted)] + [InlineData(true, ExportResult.Success, Grpc.Core.StatusCode.OutOfRange)] + [InlineData(true, ExportResult.Success, Grpc.Core.StatusCode.DataLoss)] + [InlineData(true, ExportResult.Failure, Grpc.Core.StatusCode.Internal)] + [InlineData(true, ExportResult.Failure, Grpc.Core.StatusCode.InvalidArgument)] + [InlineData(true, ExportResult.Failure, Grpc.Core.StatusCode.Unimplemented)] + [InlineData(true, ExportResult.Failure, Grpc.Core.StatusCode.FailedPrecondition)] + [InlineData(true, ExportResult.Failure, Grpc.Core.StatusCode.PermissionDenied)] + [InlineData(true, ExportResult.Failure, Grpc.Core.StatusCode.Unauthenticated)] + [InlineData(true, ExportResult.Success, Grpc.Core.StatusCode.DeadlineExceeded)] + [InlineData(false, ExportResult.Failure, Grpc.Core.StatusCode.Unavailable)] + [InlineData(false, ExportResult.Failure, Grpc.Core.StatusCode.Cancelled)] + [InlineData(false, ExportResult.Failure, Grpc.Core.StatusCode.Aborted)] + [InlineData(false, ExportResult.Failure, Grpc.Core.StatusCode.OutOfRange)] + [InlineData(false, ExportResult.Failure, Grpc.Core.StatusCode.DataLoss)] + [InlineData(false, ExportResult.Failure, Grpc.Core.StatusCode.Internal)] + [InlineData(false, ExportResult.Failure, Grpc.Core.StatusCode.InvalidArgument)] + [InlineData(false, ExportResult.Failure, Grpc.Core.StatusCode.FailedPrecondition)] + [InlineData(false, ExportResult.Failure, Grpc.Core.StatusCode.DeadlineExceeded)] + public async Task GrpcPersistentStorageRetryTests(bool usePersistentStorageTransmissionHandler, ExportResult expectedResult, Grpc.Core.StatusCode initialgrpcStatusCode) + { + var testGrpcPort = Interlocked.Increment(ref gRPCPort); + var testHttpPort = Interlocked.Increment(ref httpPort); + + using var host = await new HostBuilder() + .ConfigureWebHostDefaults(webBuilder => webBuilder + .ConfigureKestrel(options => + { + options.ListenLocalhost(testHttpPort, listenOptions => listenOptions.Protocols = Microsoft.AspNetCore.Server.Kestrel.Core.HttpProtocols.Http1); + options.ListenLocalhost(testGrpcPort, listenOptions => listenOptions.Protocols = Microsoft.AspNetCore.Server.Kestrel.Core.HttpProtocols.Http2); + }) + .ConfigureServices(services => + { + services.AddSingleton(new MockCollectorState()); + services.AddGrpc(); + }) + .ConfigureLogging(loggingBuilder => loggingBuilder.ClearProviders()) + .Configure(app => + { + app.UseRouting(); + + app.UseEndpoints(endpoints => + { + endpoints.MapGet( + "/MockCollector/SetResponseCodes/{responseCodesCsv}", + (MockCollectorState collectorState, string responseCodesCsv) => + { + var codes = responseCodesCsv.Split(",").Select(x => int.Parse(x)).ToArray(); + collectorState.SetStatusCodes(codes); + }); + + endpoints.MapGrpcService(); + }); + })) + .StartAsync(); + + using var httpClient = new HttpClient() { BaseAddress = new Uri($"http://localhost:{testHttpPort}") }; + + var codes = new[] { initialgrpcStatusCode, Grpc.Core.StatusCode.OK }; + await httpClient.GetAsync($"/MockCollector/SetResponseCodes/{string.Join(",", codes.Select(x => (int)x))}"); + + var endpoint = new Uri($"http://localhost:{testGrpcPort}"); + + var exporterOptions = new OtlpExporterOptions() { Endpoint = endpoint, TimeoutMilliseconds = 20000 }; + + var exportClient = new OtlpGrpcTraceExportClient(exporterOptions); + + // TODO: update this to configure via experimental environment variable. + OtlpExporterTransmissionHandler transmissionHandler; + MockFileProvider mockProvider = null; + if (usePersistentStorageTransmissionHandler) + { + mockProvider = new MockFileProvider(); + transmissionHandler = new OtlpExporterPersistentStorageTransmissionHandler( + mockProvider, + exportClient, + exporterOptions.TimeoutMilliseconds, + (byte[] data) => + { + var request = new ExportTraceServiceRequest(); + request.MergeFrom(data); + return request; + }); + } + else + { + transmissionHandler = new OtlpExporterTransmissionHandler(exportClient, exporterOptions.TimeoutMilliseconds); + } + + var otlpExporter = new OtlpTraceExporter(exporterOptions, new(), new(), transmissionHandler); + + var activitySourceName = "otel.grpc.persistent.storage.retry.test"; + using var source = new ActivitySource(activitySourceName); + + using var tracerProvider = Sdk.CreateTracerProviderBuilder() + .AddSource(activitySourceName) + .Build(); + + using var activity = source.StartActivity("GrpcPersistentStorageRetryTest"); + activity.Stop(); + using var batch = new Batch([activity], 1); + + var exportResult = otlpExporter.Export(batch); + + Assert.Equal(expectedResult, exportResult); + + if (usePersistentStorageTransmissionHandler) + { + if (exportResult == ExportResult.Success) + { + Assert.Single(mockProvider.TryGetBlobs()); + + // Force Retry + Assert.True((transmissionHandler as OtlpExporterPersistentStorageTransmissionHandler).InitiateAndWaitForRetryProcess(-1)); + + Assert.False(mockProvider.TryGetBlob(out _)); + } + else + { + Assert.Empty(mockProvider.TryGetBlobs()); + } + } + else + { + Assert.Null(mockProvider); + } + + transmissionHandler.Shutdown(0); + + transmissionHandler.Dispose(); + } + private class MockCollectorState { private Grpc.Core.StatusCode[] statusCodes = { }; @@ -338,5 +606,73 @@ public override Task Export(ExportTraceServiceReques return Task.FromResult(new ExportTraceServiceResponse()); } } + + private class MockFileProvider : PersistentBlobProvider + { + private readonly List mockStorage = new(); + + public IEnumerable TryGetBlobs() => this.mockStorage.AsEnumerable(); + + protected override IEnumerable OnGetBlobs() + { + return this.mockStorage.AsEnumerable(); + } + + protected override bool OnTryCreateBlob(byte[] buffer, int leasePeriodMilliseconds, out PersistentBlob blob) + { + blob = new MockFileBlob(this.mockStorage); + return blob.TryWrite(buffer); + } + + protected override bool OnTryCreateBlob(byte[] buffer, out PersistentBlob blob) + { + blob = new MockFileBlob(this.mockStorage); + return blob.TryWrite(buffer); + } + + protected override bool OnTryGetBlob(out PersistentBlob blob) + { + blob = this.GetBlobs().FirstOrDefault(); + + return blob != null; + } + } + + private class MockFileBlob : PersistentBlob + { + private readonly List mockStorage; + + private byte[] buffer = Array.Empty(); + + public MockFileBlob(List mockStorage) + { + this.mockStorage = mockStorage; + } + + protected override bool OnTryRead(out byte[] buffer) + { + buffer = this.buffer; + + return true; + } + + protected override bool OnTryWrite(byte[] buffer, int leasePeriodMilliseconds = 0) + { + this.buffer = buffer; + this.mockStorage.Add(this); + + return true; + } + + protected override bool OnTryLease(int leasePeriodMilliseconds) + { + return true; + } + + protected override bool OnTryDelete() + { + return this.mockStorage.Remove(this); + } + } } #endif