diff --git a/src/ReverseProxy/Abstractions/Telemetry/IOperationContext.cs b/src/ReverseProxy/Abstractions/Telemetry/IOperationContext.cs deleted file mode 100644 index e46bf2429..000000000 --- a/src/ReverseProxy/Abstractions/Telemetry/IOperationContext.cs +++ /dev/null @@ -1,20 +0,0 @@ -// Copyright (c) Microsoft Corporation. -// Licensed under the MIT License. - -namespace Microsoft.ReverseProxy.Abstractions.Telemetry -{ - /// - /// Provides contextual information for an ongoing operation. - /// Operation contexts support nesting, and the current context - /// can be obtained from . - /// - public interface IOperationContext - { - /// - /// Sets a property on the current operation context. - /// - /// Property key. - /// Property value. - void SetProperty(string key, string value); - } -} diff --git a/src/ReverseProxy/Abstractions/Telemetry/IOperationLogger.cs b/src/ReverseProxy/Abstractions/Telemetry/IOperationLogger.cs deleted file mode 100644 index e757b9314..000000000 --- a/src/ReverseProxy/Abstractions/Telemetry/IOperationLogger.cs +++ /dev/null @@ -1,52 +0,0 @@ -// Copyright (c) Microsoft Corporation. -// Licensed under the MIT License. - -using System; -using System.Threading.Tasks; - -namespace Microsoft.ReverseProxy.Abstractions.Telemetry -{ - /// - /// Provides methods to log telemetry for the execution of chunks of - /// synchronous or asynchronous operations. - /// - public interface IOperationLogger - { - /// - /// Gets the context for the current operation. - /// - IOperationContext Context { get; } - - /// - /// Tracks the execution of the provided - /// as an operation named . - /// - /// Name of the operation. - /// Action to execute. - void Execute(string operationName, Action action); - - /// - /// Tracks the execution of the provided - /// as an operation named . - /// - /// Name of the operation. - /// Action to execute. - TResult Execute(string operationName, Func func); - - /// - /// Tracks the asynchronous execution of the provided - /// as an operation named . - /// - /// Name of the operation. - /// Action to execute. - Task ExecuteAsync(string operationName, Func func); - - /// - /// Tracks the asynchronous execution of the provided - /// as an operation named . - /// - /// Name of the operation. - /// Action to execute. - Task ExecuteAsync(string operationName, Func> func); - } -} diff --git a/src/ReverseProxy/Abstractions/Time/IMonotonicTimer.cs b/src/ReverseProxy/Abstractions/Time/IMonotonicTimer.cs deleted file mode 100644 index 6b5cce0ed..000000000 --- a/src/ReverseProxy/Abstractions/Time/IMonotonicTimer.cs +++ /dev/null @@ -1,28 +0,0 @@ -// Copyright (c) Microsoft Corporation. -// Licensed under the MIT License. - -using System; -using System.Threading; -using System.Threading.Tasks; - -namespace Microsoft.ReverseProxy.Abstractions.Time -{ - /// - /// Abstraction for measuring time that is monotonically increasing. - /// - public interface IMonotonicTimer - { - /// - /// Gets the current time (elapsed, relative to the creation of this timer). - /// - TimeSpan CurrentTime { get; } - - /// - /// Creates a task that completes when CurrentTime >= expiryTime. - /// - /// Time at which the returned task will be completed. - /// Cancelation token for the created task. - /// A task which completes at . - Task DelayUntil(TimeSpan expiryTime, CancellationToken cancelationToken); - } -} diff --git a/src/ReverseProxy/Configuration/DependencyInjection/BuilderExtensions/IReverseProxyBuilderExtensions.cs b/src/ReverseProxy/Configuration/DependencyInjection/BuilderExtensions/IReverseProxyBuilderExtensions.cs index 7ad3b643e..18086493a 100644 --- a/src/ReverseProxy/Configuration/DependencyInjection/BuilderExtensions/IReverseProxyBuilderExtensions.cs +++ b/src/ReverseProxy/Configuration/DependencyInjection/BuilderExtensions/IReverseProxyBuilderExtensions.cs @@ -4,8 +4,6 @@ using Microsoft.AspNetCore.Routing; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection.Extensions; -using Microsoft.ReverseProxy.Abstractions.Telemetry; -using Microsoft.ReverseProxy.Abstractions.Time; using Microsoft.ReverseProxy.RuntimeModel; using Microsoft.ReverseProxy.Service; using Microsoft.ReverseProxy.Service.Config; @@ -15,7 +13,6 @@ using Microsoft.ReverseProxy.Service.Proxy.Infrastructure; using Microsoft.ReverseProxy.Service.Routing; using Microsoft.ReverseProxy.Service.SessionAffinity; -using Microsoft.ReverseProxy.Telemetry; using Microsoft.ReverseProxy.Utilities; using System.Linq; @@ -23,13 +20,6 @@ namespace Microsoft.ReverseProxy.Configuration.DependencyInjection { internal static class IReverseProxyBuilderExtensions { - public static IReverseProxyBuilder AddTelemetryShims(this IReverseProxyBuilder builder) - { - // NOTE: Consumers of ReverseProxy are expected to replace these with their own classes - builder.Services.TryAddSingleton(typeof(IOperationLogger<>), typeof(TextOperationLogger<>)); - return builder; - } - public static IReverseProxyBuilder AddConfigBuilder(this IReverseProxyBuilder builder) { builder.Services.TryAddSingleton(); @@ -43,7 +33,6 @@ public static IReverseProxyBuilder AddRuntimeStateManagers(this IReverseProxyBui builder.Services.TryAddSingleton(); builder.Services.TryAddSingleton(); builder.Services.TryAddSingleton(); - builder.Services.TryAddSingleton(); builder.Services.TryAddSingleton(); return builder; } @@ -64,13 +53,6 @@ public static IReverseProxyBuilder AddProxy(this IReverseProxyBuilder builder) return builder; } - public static IReverseProxyBuilder AddBackgroundWorkers(this IReverseProxyBuilder builder) - { - builder.Services.TryAddSingleton(); - - return builder; - } - public static IReverseProxyBuilder AddSessionAffinityProvider(this IReverseProxyBuilder builder) { builder.Services.TryAddEnumerable(new[] { diff --git a/src/ReverseProxy/Configuration/DependencyInjection/ReverseProxyServiceCollectionExtensions.cs b/src/ReverseProxy/Configuration/DependencyInjection/ReverseProxyServiceCollectionExtensions.cs index d8f626cb5..887dbea3a 100644 --- a/src/ReverseProxy/Configuration/DependencyInjection/ReverseProxyServiceCollectionExtensions.cs +++ b/src/ReverseProxy/Configuration/DependencyInjection/ReverseProxyServiceCollectionExtensions.cs @@ -9,6 +9,7 @@ using Microsoft.ReverseProxy.Configuration.DependencyInjection; using Microsoft.ReverseProxy.Service; using Microsoft.ReverseProxy.Service.Proxy; +using Microsoft.ReverseProxy.Utilities; namespace Microsoft.Extensions.DependencyInjection { @@ -23,6 +24,7 @@ public static class ReverseProxyServiceCollectionExtensions /// public static IServiceCollection AddHttpProxy(this IServiceCollection services) { + services.TryAddSingleton(); services.TryAddSingleton(); return services; } @@ -34,15 +36,13 @@ public static IReverseProxyBuilder AddReverseProxy(this IServiceCollection servi { var builder = new ReverseProxyBuilder(services); builder - .AddTelemetryShims() .AddConfigBuilder() .AddRuntimeStateManagers() .AddConfigManager() .AddSessionAffinityProvider() .AddActiveHealthChecks() .AddPassiveHealthCheck() - .AddProxy() - .AddBackgroundWorkers(); + .AddProxy(); services.AddDataProtection(); services.AddAuthorization(); diff --git a/src/ReverseProxy/Microsoft.ReverseProxy.csproj b/src/ReverseProxy/Microsoft.ReverseProxy.csproj index dca7e8659..cd7410f30 100644 --- a/src/ReverseProxy/Microsoft.ReverseProxy.csproj +++ b/src/ReverseProxy/Microsoft.ReverseProxy.csproj @@ -5,6 +5,7 @@ net5.0;netcoreapp3.1 Library Microsoft.ReverseProxy + true diff --git a/src/ReverseProxy/Middleware/ProxyInvokerMiddleware.cs b/src/ReverseProxy/Middleware/ProxyInvokerMiddleware.cs index 9ffefcbd9..0831084da 100644 --- a/src/ReverseProxy/Middleware/ProxyInvokerMiddleware.cs +++ b/src/ReverseProxy/Middleware/ProxyInvokerMiddleware.cs @@ -2,13 +2,11 @@ // Licensed under the MIT License. using System; -using System.Text; -using System.Threading; using System.Threading.Tasks; using Microsoft.AspNetCore.Http; using Microsoft.Extensions.Logging; -using Microsoft.ReverseProxy.Abstractions.Telemetry; using Microsoft.ReverseProxy.Service.Proxy; +using Microsoft.ReverseProxy.Telemetry; using Microsoft.ReverseProxy.Utilities; namespace Microsoft.ReverseProxy.Middleware @@ -21,19 +19,16 @@ internal class ProxyInvokerMiddleware private readonly IRandomFactory _randomFactory; private readonly RequestDelegate _next; // Unused, this middleware is always terminal private readonly ILogger _logger; - private readonly IOperationLogger _operationLogger; private readonly IHttpProxy _httpProxy; public ProxyInvokerMiddleware( RequestDelegate next, ILogger logger, - IOperationLogger operationLogger, IHttpProxy httpProxy, IRandomFactory randomFactory) { _next = next ?? throw new ArgumentNullException(nameof(next)); _logger = logger ?? throw new ArgumentNullException(nameof(logger)); - _operationLogger = operationLogger ?? throw new ArgumentNullException(nameof(operationLogger)); _httpProxy = httpProxy ?? throw new ArgumentNullException(nameof(httpProxy)); _randomFactory = randomFactory ?? throw new ArgumentNullException(nameof(randomFactory)); } @@ -85,9 +80,9 @@ public async Task Invoke(HttpContext context) cluster.ConcurrencyCounter.Increment(); destination.ConcurrencyCounter.Increment(); - await _operationLogger.ExecuteAsync( - "ReverseProxy.Proxy", - () => _httpProxy.ProxyAsync(context, destinationConfig.Address, reverseProxyFeature.ClusterConfig.HttpClient, proxyOptions)); + ProxyTelemetry.Log.ProxyInvoke(cluster.ClusterId, routeConfig.Route.RouteId, destination.DestinationId); + + await _httpProxy.ProxyAsync(context, destinationConfig.Address, reverseProxyFeature.ClusterConfig.HttpClient, proxyOptions); } finally { diff --git a/src/ReverseProxy/Service/HealthChecks/ActiveHealthCheckMonitor.cs b/src/ReverseProxy/Service/HealthChecks/ActiveHealthCheckMonitor.cs index 4b8fc54ed..1e5560c86 100644 --- a/src/ReverseProxy/Service/HealthChecks/ActiveHealthCheckMonitor.cs +++ b/src/ReverseProxy/Service/HealthChecks/ActiveHealthCheckMonitor.cs @@ -4,13 +4,11 @@ using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; using Microsoft.ReverseProxy.Abstractions; -using Microsoft.ReverseProxy.Abstractions.Telemetry; using Microsoft.ReverseProxy.RuntimeModel; using Microsoft.ReverseProxy.Service.Management; using Microsoft.ReverseProxy.Utilities; using System; using System.Collections.Generic; -using System.Net; using System.Net.Http; using System.Runtime.ExceptionServices; using System.Threading; diff --git a/src/ReverseProxy/Service/Proxy/HttpProxy.cs b/src/ReverseProxy/Service/Proxy/HttpProxy.cs index 372b8821b..194302be7 100644 --- a/src/ReverseProxy/Service/Proxy/HttpProxy.cs +++ b/src/ReverseProxy/Service/Proxy/HttpProxy.cs @@ -18,6 +18,8 @@ using Microsoft.Extensions.Primitives; using Microsoft.Net.Http.Headers; using Microsoft.ReverseProxy.Service.RuntimeModel.Transforms; +using Microsoft.ReverseProxy.Telemetry; +using Microsoft.ReverseProxy.Utilities; namespace Microsoft.ReverseProxy.Service.Proxy { @@ -32,10 +34,12 @@ internal class HttpProxy : IHttpProxy }; private readonly ILogger _logger; + private readonly IClock _clock; - public HttpProxy(ILogger logger) + public HttpProxy(ILogger logger, IClock clock) { _logger = logger ?? throw new ArgumentNullException(nameof(logger)); + _clock = clock ?? throw new ArgumentNullException(nameof(_clock)); } /// @@ -93,142 +97,153 @@ public async Task ProxyAsync( _ = destinationPrefix ?? throw new ArgumentNullException(nameof(destinationPrefix)); _ = httpClient ?? throw new ArgumentNullException(nameof(httpClient)); _ = proxyOptions ?? throw new ArgumentNullException(nameof(proxyOptions)); - var requestAborted = context.RequestAborted; - // :: Step 1: Create outgoing HttpRequestMessage - var upgradeFeature = context.Features.Get(); - var isUpgradeRequest = (upgradeFeature?.IsUpgradableRequest ?? false) - // Mitigate https://github.com/microsoft/reverse-proxy/issues/255, IIS considers all requests upgradeable. - && string.Equals("WebSocket", context.Request.Headers[HeaderNames.Upgrade], StringComparison.OrdinalIgnoreCase); + ProxyTelemetry.Log.ProxyStart(destinationPrefix); + try + { + var requestAborted = context.RequestAborted; - var destinationRequest = CreateRequestMessage(context, destinationPrefix, isUpgradeRequest, proxyOptions); + // :: Step 1: Create outgoing HttpRequestMessage + var upgradeFeature = context.Features.Get(); + var isUpgradeRequest = (upgradeFeature?.IsUpgradableRequest ?? false) + // Mitigate https://github.com/microsoft/reverse-proxy/issues/255, IIS considers all requests upgradeable. + && string.Equals("WebSocket", context.Request.Headers[HeaderNames.Upgrade], StringComparison.OrdinalIgnoreCase); - var isClientHttp2 = ProtocolHelper.IsHttp2(context.Request.Protocol); + var destinationRequest = CreateRequestMessage(context, destinationPrefix, isUpgradeRequest, proxyOptions); - // NOTE: We heuristically assume gRPC-looking requests may require streaming semantics. - // See https://github.com/microsoft/reverse-proxy/issues/118 for design discussion. - var isStreamingRequest = isClientHttp2 && ProtocolHelper.IsGrpcContentType(context.Request.ContentType); + var isClientHttp2 = ProtocolHelper.IsHttp2(context.Request.Protocol); - // :: Step 2: Setup copy of request body (background) Client --► Proxy --► Destination - // Note that we must do this before step (3) because step (3) may also add headers to the HttpContent that we set up here. - var requestContent = SetupRequestBodyCopy(context.Request, destinationRequest, isStreamingRequest, requestAborted); + // NOTE: We heuristically assume gRPC-looking requests may require streaming semantics. + // See https://github.com/microsoft/reverse-proxy/issues/118 for design discussion. + var isStreamingRequest = isClientHttp2 && ProtocolHelper.IsGrpcContentType(context.Request.ContentType); - // :: Step 3: Copy request headers Client --► Proxy --► Destination - CopyRequestHeaders(context, destinationRequest, proxyOptions.Transforms); + // :: Step 2: Setup copy of request body (background) Client --► Proxy --► Destination + // Note that we must do this before step (3) because step (3) may also add headers to the HttpContent that we set up here. + var requestContent = SetupRequestBodyCopy(context.Request, destinationRequest, isStreamingRequest, requestAborted); - // :: Step 4: Send the outgoing request using HttpClient - HttpResponseMessage destinationResponse; - var requestTimeoutSource = CancellationTokenSource.CreateLinkedTokenSource(requestAborted); - requestTimeoutSource.CancelAfter(proxyOptions.RequestTimeout); - var requestTimeoutToken = requestTimeoutSource.Token; - try - { - destinationResponse = await httpClient.SendAsync(destinationRequest, requestTimeoutToken); - } - catch (OperationCanceledException canceledException) - { - if (!requestAborted.IsCancellationRequested && requestTimeoutToken.IsCancellationRequested) + // :: Step 3: Copy request headers Client --► Proxy --► Destination + CopyRequestHeaders(context, destinationRequest, proxyOptions.Transforms); + + // :: Step 4: Send the outgoing request using HttpClient + HttpResponseMessage destinationResponse; + var requestTimeoutSource = CancellationTokenSource.CreateLinkedTokenSource(requestAborted); + requestTimeoutSource.CancelAfter(proxyOptions.RequestTimeout); + var requestTimeoutToken = requestTimeoutSource.Token; + try { - ReportProxyError(context, ProxyError.RequestTimedOut, canceledException); - context.Response.StatusCode = StatusCodes.Status504GatewayTimeout; - return; + ProxyTelemetry.Log.ProxyStage(ProxyStage.SendAsyncStart); + destinationResponse = await httpClient.SendAsync(destinationRequest, requestTimeoutToken); + ProxyTelemetry.Log.ProxyStage(ProxyStage.SendAsyncStop); } + catch (OperationCanceledException canceledException) + { + if (!requestAborted.IsCancellationRequested && requestTimeoutToken.IsCancellationRequested) + { + ReportProxyError(context, ProxyError.RequestTimedOut, canceledException); + context.Response.StatusCode = StatusCodes.Status504GatewayTimeout; + return; + } - ReportProxyError(context, ProxyError.RequestCanceled, canceledException); - context.Response.StatusCode = StatusCodes.Status502BadGateway; - return; - } - catch (Exception requestException) - { - await HandleRequestFailureAsync(context, requestContent, requestException); - return; - } - finally - { - requestTimeoutSource.Dispose(); - } - - // Detect connection downgrade, which may be problematic for e.g. gRPC. - if (isClientHttp2 && destinationResponse.Version.Major != 2) - { - // TODO: Do something on connection downgrade... - Log.HttpDowngradeDetected(_logger); - } + ReportProxyError(context, ProxyError.RequestCanceled, canceledException); + context.Response.StatusCode = StatusCodes.Status502BadGateway; + return; + } + catch (Exception requestException) + { + await HandleRequestFailureAsync(context, requestContent, requestException); + return; + } + finally + { + requestTimeoutSource.Dispose(); + } - // Assert that, if we are proxying content to the destination, it must have started by now - // (since HttpClient.SendAsync has already completed asynchronously). - // If this check fails, there is a coding defect which would otherwise - // cause us to wait forever in step 9, so fail fast here. - if (requestContent != null && !requestContent.Started) - { - // TODO: HttpClient might not need to read the body in some scenarios, such as an early auth failure with Expect: 100-continue. - throw new InvalidOperationException("Proxying the Client request body to the Destination server hasn't started. This is a coding defect."); - } + // Detect connection downgrade, which may be problematic for e.g. gRPC. + if (isClientHttp2 && destinationResponse.Version.Major != 2) + { + // TODO: Do something on connection downgrade... + Log.HttpDowngradeDetected(_logger); + } - // :: Step 5: Copy response status line Client ◄-- Proxy ◄-- Destination - // :: Step 6: Copy response headers Client ◄-- Proxy ◄-- Destination - CopyResponseStatusAndHeaders(destinationResponse, context, proxyOptions.Transforms.ResponseHeaderTransforms); + // Assert that, if we are proxying content to the destination, it must have started by now + // (since HttpClient.SendAsync has already completed asynchronously). + // If this check fails, there is a coding defect which would otherwise + // cause us to wait forever in step 9, so fail fast here. + if (requestContent != null && !requestContent.Started) + { + // TODO: HttpClient might not need to read the body in some scenarios, such as an early auth failure with Expect: 100-continue. + throw new InvalidOperationException("Proxying the Client request body to the Destination server hasn't started. This is a coding defect."); + } - // :: Step 7-A: Check for a 101 upgrade response, this takes care of WebSockets as well as any other upgradeable protocol. - if (destinationResponse.StatusCode == HttpStatusCode.SwitchingProtocols) - { - await HandleUpgradedResponse(context, upgradeFeature, destinationResponse, requestAborted); - return; - } + // :: Step 5: Copy response status line Client ◄-- Proxy ◄-- Destination + // :: Step 6: Copy response headers Client ◄-- Proxy ◄-- Destination + CopyResponseStatusAndHeaders(destinationResponse, context, proxyOptions.Transforms.ResponseHeaderTransforms); - // NOTE: it may *seem* wise to call `context.Response.StartAsync()` at this point - // since it looks like we are ready to send back response headers - // (and this might help reduce extra delays while we wait to receive the body from the destination). - // HOWEVER, this would produce the wrong result if it turns out that there is no content - // from the destination -- instead of sending headers and terminating the stream at once, - // we would send headers thinking a body may be coming, and there is none. - // This is problematic on gRPC connections when the destination server encounters an error, - // in which case it immediately returns the response headers and trailing headers, but no content, - // and clients misbehave if the initial headers response does not indicate stream end. + // :: Step 7-A: Check for a 101 upgrade response, this takes care of WebSockets as well as any other upgradeable protocol. + if (destinationResponse.StatusCode == HttpStatusCode.SwitchingProtocols) + { + await HandleUpgradedResponse(context, upgradeFeature, destinationResponse, requestAborted); + return; + } - // :: Step 7-B: Copy response body Client ◄-- Proxy ◄-- Destination - var (responseBodyCopyResult, responseBodyException) = await CopyResponseBodyAsync(destinationResponse.Content, context.Response.Body, requestAborted); + // NOTE: it may *seem* wise to call `context.Response.StartAsync()` at this point + // since it looks like we are ready to send back response headers + // (and this might help reduce extra delays while we wait to receive the body from the destination). + // HOWEVER, this would produce the wrong result if it turns out that there is no content + // from the destination -- instead of sending headers and terminating the stream at once, + // we would send headers thinking a body may be coming, and there is none. + // This is problematic on gRPC connections when the destination server encounters an error, + // in which case it immediately returns the response headers and trailing headers, but no content, + // and clients misbehave if the initial headers response does not indicate stream end. - if (responseBodyCopyResult != StreamCopyResult.Success) - { - await HandleResponseBodyErrorAsync(context, requestContent, responseBodyCopyResult, responseBodyException); - return; - } + // :: Step 7-B: Copy response body Client ◄-- Proxy ◄-- Destination + var (responseBodyCopyResult, responseBodyException) = await CopyResponseBodyAsync(destinationResponse.Content, context.Response.Body, requestAborted); - // :: Step 8: Copy response trailer headers and finish response Client ◄-- Proxy ◄-- Destination - CopyResponseTrailingHeaders(destinationResponse, context, proxyOptions.Transforms.ResponseTrailerTransforms); + if (responseBodyCopyResult != StreamCopyResult.Success) + { + await HandleResponseBodyErrorAsync(context, requestContent, responseBodyCopyResult, responseBodyException); + return; + } - if (isStreamingRequest) - { - // NOTE: We must call `CompleteAsync` so that Kestrel will flush all bytes to the client. - // In the case where there was no response body, - // this is also when headers and trailing headers are sent to the client. - // Without this, the client might wait forever waiting for response bytes, - // while we might wait forever waiting for request bytes, - // leading to a stuck connection and no way to make progress. - await context.Response.CompleteAsync(); - } + // :: Step 8: Copy response trailer headers and finish response Client ◄-- Proxy ◄-- Destination + CopyResponseTrailingHeaders(destinationResponse, context, proxyOptions.Transforms.ResponseTrailerTransforms); - // :: Step 9: Wait for completion of step 2: copying request body Client --► Proxy --► Destination - if (requestContent != null) - { - var (requestBodyCopyResult, requestBodyException) = await requestContent.ConsumptionTask; + if (isStreamingRequest) + { + // NOTE: We must call `CompleteAsync` so that Kestrel will flush all bytes to the client. + // In the case where there was no response body, + // this is also when headers and trailing headers are sent to the client. + // Without this, the client might wait forever waiting for response bytes, + // while we might wait forever waiting for request bytes, + // leading to a stuck connection and no way to make progress. + await context.Response.CompleteAsync(); + } - if (requestBodyCopyResult != StreamCopyResult.Success) + // :: Step 9: Wait for completion of step 2: copying request body Client --► Proxy --► Destination + if (requestContent != null) { - // The response succeeded. If there was a request body error then it was probably because the client or destination decided - // to cancel it. Report as low severity. + var (requestBodyCopyResult, requestBodyException) = await requestContent.ConsumptionTask; - var error = requestBodyCopyResult switch + if (requestBodyCopyResult != StreamCopyResult.Success) { - StreamCopyResult.InputError => ProxyError.RequestBodyClient, - StreamCopyResult.OutputError => ProxyError.RequestBodyDestination, - StreamCopyResult.Canceled => ProxyError.RequestBodyCanceled, - _ => throw new NotImplementedException(requestBodyCopyResult.ToString()) - }; - ReportProxyError(context, error, requestBodyException); + // The response succeeded. If there was a request body error then it was probably because the client or destination decided + // to cancel it. Report as low severity. + + var error = requestBodyCopyResult switch + { + StreamCopyResult.InputError => ProxyError.RequestBodyClient, + StreamCopyResult.OutputError => ProxyError.RequestBodyDestination, + StreamCopyResult.Canceled => ProxyError.RequestBodyCanceled, + _ => throw new NotImplementedException(requestBodyCopyResult.ToString()) + }; + ReportProxyError(context, error, requestBodyException); + } } } + finally + { + ProxyTelemetry.Log.ProxyStop(context.Response.StatusCode); + } } private HttpRequestMessage CreateRequestMessage(HttpContext context, string destinationAddress, bool isUpgradeRequest, RequestProxyOptions proxyOptions) @@ -375,6 +390,7 @@ private StreamCopyHttpContent SetupRequestBodyCopy(HttpRequest request, HttpRequ requestContent = new StreamCopyHttpContent( source: request.Body, autoFlushHttpClientOutgoingStream: isStreamingRequest, + clock: _clock, cancellation: cancellation); destinationRequest.Content = requestContent; } @@ -443,7 +459,7 @@ static void AddHeader(HttpRequestMessage request, string headerName, StringValue // https://github.com/dotnet/aspnetcore/issues/26461 if (string.Equals(headerName, HeaderNames.Cookie, StringComparison.OrdinalIgnoreCase) && value.Count > 1) { - value = String.Join("; ", value); + value = string.Join("; ", value); } if (value.Count == 1) @@ -544,6 +560,8 @@ private static void CopyResponseStatusAndHeaders(HttpResponseMessage source, Htt private async Task HandleUpgradedResponse(HttpContext context, IHttpUpgradeFeature upgradeFeature, HttpResponseMessage destinationResponse, CancellationToken longCancellation) { + ProxyTelemetry.Log.ProxyStage(ProxyStage.ResponseUpgrade); + // SocketHttpHandler and similar transports always provide an HttpContent object, even if it's empty. // Note as of 5.0 HttpResponse.Content never returns null. // https://github.com/dotnet/runtime/blame/8fc68f626a11d646109a758cb0fc70a0aa7826f1/src/libraries/System.Net.Http/src/System/Net/Http/HttpResponseMessage.cs#L46 @@ -560,8 +578,8 @@ private async Task HandleUpgradedResponse(HttpContext context, IHttpUpgradeFeatu using var abortTokenSource = CancellationTokenSource.CreateLinkedTokenSource(longCancellation); - var requestTask = StreamCopier.CopyAsync(clientStream, destinationStream, abortTokenSource.Token); - var responseTask = StreamCopier.CopyAsync(destinationStream, clientStream, abortTokenSource.Token); + var requestTask = StreamCopier.CopyAsync(isRequest: true, clientStream, destinationStream, _clock, abortTokenSource.Token); + var responseTask = StreamCopier.CopyAsync(isRequest: false, destinationStream, clientStream, _clock, abortTokenSource.Token); // Make sure we report the first failure. var firstTask = await Task.WhenAny(requestTask, responseTask); @@ -609,7 +627,7 @@ void ReportResult(HttpContext context, bool reqeuest, StreamCopyResult result, E if (destinationResponseContent != null) { using var destinationResponseStream = await destinationResponseContent.ReadAsStreamAsync(); - return await StreamCopier.CopyAsync(destinationResponseStream, clientResponseStream, cancellation); + return await StreamCopier.CopyAsync(isRequest: false, destinationResponseStream, clientResponseStream, _clock, cancellation); } return (StreamCopyResult.Success, null); @@ -749,6 +767,7 @@ private void ReportProxyError(HttpContext context, ProxyError error, Exception e { context.Features.Set(new ProxyErrorFeature(error, ex)); Log.ErrorProxying(_logger, error, ex); + ProxyTelemetry.Log.ProxyFailed(error); } private static void ResetOrAbort(HttpContext context, bool isCancelled) diff --git a/src/ReverseProxy/Service/Proxy/ProxyError.cs b/src/ReverseProxy/Service/Proxy/ProxyError.cs index 706580dd8..29f0211e1 100644 --- a/src/ReverseProxy/Service/Proxy/ProxyError.cs +++ b/src/ReverseProxy/Service/Proxy/ProxyError.cs @@ -6,7 +6,7 @@ namespace Microsoft.ReverseProxy.Service.Proxy /// /// Errors reported when proxying a request to the destination. /// - public enum ProxyError + public enum ProxyError : int { /// /// No error. diff --git a/src/ReverseProxy/Service/Proxy/StreamCopier.cs b/src/ReverseProxy/Service/Proxy/StreamCopier.cs index aadc4ef7b..e786aebae 100644 --- a/src/ReverseProxy/Service/Proxy/StreamCopier.cs +++ b/src/ReverseProxy/Service/Proxy/StreamCopier.cs @@ -3,9 +3,12 @@ using System; using System.Buffers; +using System.Diagnostics; using System.IO; using System.Threading; using System.Threading.Tasks; +using Microsoft.ReverseProxy.Telemetry; +using Microsoft.ReverseProxy.Utilities; namespace Microsoft.ReverseProxy.Service.Proxy { @@ -22,18 +25,38 @@ internal static class StreamCopier /// Based on Microsoft.AspNetCore.Http.StreamCopyOperationInternal.CopyToAsync. /// See: . /// - public static async Task<(StreamCopyResult, Exception)> CopyAsync(Stream input, Stream output, CancellationToken cancellation) + public static async Task<(StreamCopyResult, Exception)> CopyAsync(bool isRequest, Stream input, Stream output, IClock clock, CancellationToken cancellation) { _ = input ?? throw new ArgumentNullException(nameof(input)); _ = output ?? throw new ArgumentNullException(nameof(output)); + var telemetryEnabled = ProxyTelemetry.Log.IsEnabled(); + // TODO: Consider System.IO.Pipelines for better perf (e.g. reads during writes) var buffer = ArrayPool.Shared.Rent(DefaultBufferSize); - long iops = 0; - long totalBytes = 0; var reading = true; + + long contentLength = 0; + long iops = 0; + long readTime = 0; + long writeTime = 0; + long firstReadTime = -1; + try { + long lastTime = 0; + long nextTransferringEvent = 0; + long stopwatchTicksBetweenTransferringEvents = 0; + + if (telemetryEnabled) + { + ProxyTelemetry.Log.ProxyStage(isRequest ? ProxyStage.RequestContentTransferStart : ProxyStage.ResponseContentTransferStart); + + stopwatchTicksBetweenTransferringEvents = Stopwatch.Frequency; // 1 second + lastTime = clock.GetStopwatchTimestamp(); + nextTransferringEvent = lastTime + stopwatchTicksBetweenTransferringEvents; + } + while (true) { if (cancellation.IsCancellationRequested) @@ -41,9 +64,29 @@ internal static class StreamCopier return (StreamCopyResult.Canceled, new OperationCanceledException(cancellation)); } - iops++; reading = true; - var read = await input.ReadAsync(buffer.AsMemory(), cancellation); + var read = 0; + try + { + read = await input.ReadAsync(buffer.AsMemory(), cancellation); + } + finally + { + if (telemetryEnabled) + { + contentLength += read; + iops++; + + var readStop = clock.GetStopwatchTimestamp(); + var currentReadTime = readStop - lastTime; + lastTime = readStop; + readTime += currentReadTime; + if (firstReadTime == -1) + { + firstReadTime = currentReadTime; + } + } + } // End of the source stream. if (read == 0) @@ -57,9 +100,32 @@ internal static class StreamCopier } reading = false; - await output.WriteAsync(buffer.AsMemory(0, read), cancellation); + try + { + await output.WriteAsync(buffer.AsMemory(0, read), cancellation); + } + finally + { + if (telemetryEnabled) + { + var writeStop = clock.GetStopwatchTimestamp(); + writeTime += writeStop - lastTime; + lastTime = writeStop; + if (lastTime >= nextTransferringEvent) + { + ProxyTelemetry.Log.ContentTransferring( + isRequest, + contentLength, + iops, + StopwatchTicksToDateTimeTicks(readTime), + StopwatchTicksToDateTimeTicks(writeTime)); - totalBytes += read; + // Avoid attributing the time taken by logging ContentTransferring to the next read call + lastTime = clock.GetStopwatchTimestamp(); + nextTransferringEvent = lastTime + stopwatchTicksBetweenTransferringEvents; + } + } + } } } catch (OperationCanceledException oex) @@ -74,6 +140,23 @@ internal static class StreamCopier { // We can afford the perf impact of clearArray == true since we only do this twice per request. ArrayPool.Shared.Return(buffer, clearArray: true); + + if (telemetryEnabled) + { + ProxyTelemetry.Log.ContentTransferred( + isRequest, + contentLength, + iops, + StopwatchTicksToDateTimeTicks(readTime), + StopwatchTicksToDateTimeTicks(writeTime), + StopwatchTicksToDateTimeTicks(Math.Max(0, firstReadTime))); + } + } + + static long StopwatchTicksToDateTimeTicks(long stopwatchTicks) + { + var dateTimeTicksPerStopwatchTick = (double)TimeSpan.TicksPerSecond / Stopwatch.Frequency; + return (long)(stopwatchTicks * dateTimeTicksPerStopwatchTick); } } } diff --git a/src/ReverseProxy/Service/Proxy/StreamCopyHttpContent.cs b/src/ReverseProxy/Service/Proxy/StreamCopyHttpContent.cs index be1374e99..dea093509 100644 --- a/src/ReverseProxy/Service/Proxy/StreamCopyHttpContent.cs +++ b/src/ReverseProxy/Service/Proxy/StreamCopyHttpContent.cs @@ -7,6 +7,7 @@ using System.Net.Http; using System.Threading; using System.Threading.Tasks; +using Microsoft.ReverseProxy.Utilities; namespace Microsoft.ReverseProxy.Service.Proxy { @@ -38,14 +39,16 @@ internal class StreamCopyHttpContent : HttpContent { private readonly Stream _source; private readonly bool _autoFlushHttpClientOutgoingStream; + private readonly IClock _clock; // Note this is the long token that should only be canceled in the event of an error, not timed out. private readonly CancellationToken _cancellation; private readonly TaskCompletionSource<(StreamCopyResult, Exception)> _tcs = new TaskCompletionSource<(StreamCopyResult, Exception)>(TaskCreationOptions.RunContinuationsAsynchronously); - public StreamCopyHttpContent(Stream source, bool autoFlushHttpClientOutgoingStream, CancellationToken cancellation) + public StreamCopyHttpContent(Stream source, bool autoFlushHttpClientOutgoingStream, IClock clock, CancellationToken cancellation) { _source = source ?? throw new ArgumentNullException(nameof(source)); _autoFlushHttpClientOutgoingStream = autoFlushHttpClientOutgoingStream; + _clock = clock ?? throw new ArgumentNullException(nameof(clock)); _cancellation = cancellation; } @@ -148,7 +151,7 @@ protected override async Task SerializeToStreamAsync(Stream stream, TransportCon return; } - var (result, error) = await StreamCopier.CopyAsync(_source, stream, _cancellation); + var (result, error) = await StreamCopier.CopyAsync(isRequest: true, _source, stream, _clock, _cancellation); _tcs.TrySetResult((result, error)); // Check for errors that weren't the result of the destination failing. // We have to throw something here so the transport knows the body is incomplete. diff --git a/src/ReverseProxy/Telemetry/NullOperationContext.cs b/src/ReverseProxy/Telemetry/NullOperationContext.cs deleted file mode 100644 index 3816fd68c..000000000 --- a/src/ReverseProxy/Telemetry/NullOperationContext.cs +++ /dev/null @@ -1,19 +0,0 @@ -// Copyright (c) Microsoft Corporation. -// Licensed under the MIT License. - -using Microsoft.ReverseProxy.Abstractions.Telemetry; - -namespace Microsoft.ReverseProxy.Telemetry -{ - /// - /// Implementation of - /// which doesn't do anything. - /// - public class NullOperationContext : IOperationContext - { - /// - public void SetProperty(string key, string value) - { - } - } -} diff --git a/src/ReverseProxy/Telemetry/NullOperationLogger.cs b/src/ReverseProxy/Telemetry/NullOperationLogger.cs deleted file mode 100644 index 8a94d833d..000000000 --- a/src/ReverseProxy/Telemetry/NullOperationLogger.cs +++ /dev/null @@ -1,43 +0,0 @@ -// Copyright (c) Microsoft Corporation. -// Licensed under the MIT License. - -using System; -using System.Threading.Tasks; -using Microsoft.ReverseProxy.Abstractions.Telemetry; - -namespace Microsoft.ReverseProxy.Telemetry -{ - /// - /// Implementation of - /// which doesn't log anything. - /// - public class NullOperationLogger : IOperationLogger - { - /// - public IOperationContext Context => new NullOperationContext(); - - /// - public void Execute(string operationName, Action action) - { - action(); - } - - /// - public TResult Execute(string operationName, Func func) - { - return func(); - } - - /// - public async Task ExecuteAsync(string operationName, Func func) - { - await func(); - } - - /// - public async Task ExecuteAsync(string operationName, Func> func) - { - return await func(); - } - } -} diff --git a/src/ReverseProxy/Telemetry/ProxyStage.cs b/src/ReverseProxy/Telemetry/ProxyStage.cs new file mode 100644 index 000000000..653e43ebd --- /dev/null +++ b/src/ReverseProxy/Telemetry/ProxyStage.cs @@ -0,0 +1,14 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +namespace Microsoft.ReverseProxy.Telemetry +{ + internal enum ProxyStage : int + { + SendAsyncStart = 1, + SendAsyncStop, + RequestContentTransferStart, + ResponseContentTransferStart, + ResponseUpgrade, + } +} diff --git a/src/ReverseProxy/Telemetry/ProxyTelemetry.cs b/src/ReverseProxy/Telemetry/ProxyTelemetry.cs new file mode 100644 index 000000000..4d7cfe1e2 --- /dev/null +++ b/src/ReverseProxy/Telemetry/ProxyTelemetry.cs @@ -0,0 +1,204 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +using System; +using System.Diagnostics; +using System.Diagnostics.Tracing; +using System.Threading; +using Microsoft.ReverseProxy.Service.Proxy; + +namespace Microsoft.ReverseProxy.Telemetry +{ + internal sealed class ProxyTelemetry : EventSource + { + public static readonly ProxyTelemetry Log = new ProxyTelemetry(); + + private IncrementingPollingCounter _startedRequestsPerSecondCounter; + private PollingCounter _startedRequestsCounter; + private PollingCounter _currentRequestsCounter; + private PollingCounter _failedRequestsCounter; + + private long _startedRequests; + private long _stoppedRequests; + private long _failedRequests; + + private ProxyTelemetry() + : base("Microsoft.ReverseProxy") + { } + + [Event(1, Level = EventLevel.Informational)] + public void ProxyStart(string destinationPrefix) + { + Interlocked.Increment(ref _startedRequests); + + if (IsEnabled(EventLevel.Informational, EventKeywords.All)) + { + WriteEvent(eventId: 1, destinationPrefix); + } + } + + [Event(2, Level = EventLevel.Informational)] + public void ProxyStop(int statusCode) + { + Interlocked.Increment(ref _stoppedRequests); + + if (IsEnabled(EventLevel.Informational, EventKeywords.All)) + { + WriteEvent(eventId: 2, statusCode); + } + } + + [Event(3, Level = EventLevel.Informational)] + public void ProxyFailed(ProxyError error) + { + Interlocked.Increment(ref _failedRequests); + + if (IsEnabled(EventLevel.Informational, EventKeywords.All)) + { + Debug.Assert(sizeof(ProxyError) == sizeof(int), "Backing type of ProxyError MUST NOT be changed"); + WriteEvent(eventId: 3, (int)error); + } + } + + [Event(4, Level = EventLevel.Informational)] + public void ProxyStage(ProxyStage stage) + { + if (IsEnabled(EventLevel.Informational, EventKeywords.All)) + { + Debug.Assert(sizeof(ProxyStage) == sizeof(int), "Backing type of ProxyStage MUST NOT be changed"); + WriteEvent(eventId: 4, (int)stage); + } + } + + [Event(5, Level = EventLevel.Informational)] + public void ContentTransferring(bool isRequest, long contentLength, long iops, long readTime, long writeTime) + { + if (IsEnabled(EventLevel.Informational, EventKeywords.All)) + { + WriteEvent(eventId: 5, isRequest, contentLength, iops, readTime, writeTime); + } + } + + [Event(6, Level = EventLevel.Informational)] + public void ContentTransferred(bool isRequest, long contentLength, long iops, long readTime, long writeTime, long firstReadTime) + { + if (IsEnabled(EventLevel.Informational, EventKeywords.All)) + { + WriteEvent(eventId: 6, isRequest, contentLength, iops, readTime, writeTime, firstReadTime); + } + } + + [Event(7, Level = EventLevel.Informational)] + public void ProxyInvoke(string clusterId, string routeId, string destinationId) + { + if (IsEnabled(EventLevel.Informational, EventKeywords.All)) + { + WriteEvent(eventId: 7, clusterId, routeId, destinationId); + } + } + + + protected override void OnEventCommand(EventCommandEventArgs command) + { + if (command.Command == EventCommand.Enable) + { + _startedRequestsCounter ??= new PollingCounter("requests-started", this, () => Volatile.Read(ref _startedRequests)) + { + DisplayName = "Requests Started", + }; + + _startedRequestsPerSecondCounter ??= new IncrementingPollingCounter("requests-started-rate", this, () => Volatile.Read(ref _startedRequests)) + { + DisplayName = "Requests Started Rate", + DisplayRateTimeScale = TimeSpan.FromSeconds(1) + }; + + _failedRequestsCounter ??= new PollingCounter("requests-failed", this, () => Volatile.Read(ref _failedRequests)) + { + DisplayName = "Requests Failed" + }; + + _currentRequestsCounter ??= new PollingCounter("current-requests", this, () => -Volatile.Read(ref _stoppedRequests) + Volatile.Read(ref _startedRequests)) + { + DisplayName = "Current Requests" + }; + } + } + + + [NonEvent] + private unsafe void WriteEvent(int eventId, bool arg1, long arg2, long arg3, long arg4, long arg5) + { + const int NumEventDatas = 5; + var descrs = stackalloc EventData[NumEventDatas]; + + descrs[0] = new EventData + { + DataPointer = (IntPtr)(&arg1), + Size = sizeof(int) // EventSource defines bool as a 32-bit type + }; + descrs[1] = new EventData + { + DataPointer = (IntPtr)(&arg2), + Size = sizeof(long) + }; + descrs[2] = new EventData + { + DataPointer = (IntPtr)(&arg3), + Size = sizeof(long) + }; + descrs[3] = new EventData + { + DataPointer = (IntPtr)(&arg4), + Size = sizeof(long) + }; + descrs[4] = new EventData + { + DataPointer = (IntPtr)(&arg5), + Size = sizeof(long) + }; + + WriteEventCore(eventId, NumEventDatas, descrs); + } + + [NonEvent] + private unsafe void WriteEvent(int eventId, bool arg1, long arg2, long arg3, long arg4, long arg5, long arg6) + { + const int NumEventDatas = 6; + var descrs = stackalloc EventData[NumEventDatas]; + + descrs[0] = new EventData + { + DataPointer = (IntPtr)(&arg1), + Size = sizeof(int) // EventSource defines bool as a 32-bit type + }; + descrs[1] = new EventData + { + DataPointer = (IntPtr)(&arg2), + Size = sizeof(long) + }; + descrs[2] = new EventData + { + DataPointer = (IntPtr)(&arg3), + Size = sizeof(long) + }; + descrs[3] = new EventData + { + DataPointer = (IntPtr)(&arg4), + Size = sizeof(long) + }; + descrs[4] = new EventData + { + DataPointer = (IntPtr)(&arg5), + Size = sizeof(long) + }; + descrs[5] = new EventData + { + DataPointer = (IntPtr)(&arg6), + Size = sizeof(long) + }; + + WriteEventCore(eventId, NumEventDatas, descrs); + } + } +} diff --git a/src/ReverseProxy/Telemetry/TextOperationLogger.cs b/src/ReverseProxy/Telemetry/TextOperationLogger.cs deleted file mode 100644 index 7fdc7e940..000000000 --- a/src/ReverseProxy/Telemetry/TextOperationLogger.cs +++ /dev/null @@ -1,136 +0,0 @@ -// Copyright (c) Microsoft Corporation. -// Licensed under the MIT License. - -using System; -using System.Threading.Tasks; -using Microsoft.Extensions.Logging; -using Microsoft.ReverseProxy.Abstractions.Telemetry; -using Microsoft.ReverseProxy.Utilities; - -namespace Microsoft.ReverseProxy.Telemetry -{ - /// - /// Default implementation of - /// which logs activity start / end events as Information messages. - /// - public class TextOperationLogger : IOperationLogger - { - private readonly ILogger _logger; - - /// - /// Initializes a new instance of the class. - /// - /// Logger where text messages will be logger. - public TextOperationLogger(ILogger logger) - { - _logger = logger ?? throw new ArgumentNullException(nameof(logger)); - } - - /// - // TODO: Implement this. - public IOperationContext Context => null; - - /// - public void Execute(string operationName, Action action) - { - var stopwatch = ValueStopwatch.StartNew(); - try - { - Log.OperationStarted(_logger, operationName); - action(); - Log.OperationEnded(_logger, operationName, stopwatch.Elapsed.TotalMilliseconds); - } - catch (Exception ex) - { - Log.OperationFailed(_logger, operationName, stopwatch.Elapsed.TotalMilliseconds, ex.Message); - throw; - } - } - - /// - public TResult Execute(string operationName, Func func) - { - var stopwatch = ValueStopwatch.StartNew(); - try - { - Log.OperationStarted(_logger, operationName); - var res = func(); - Log.OperationEnded(_logger, operationName, stopwatch.Elapsed.TotalMilliseconds); - return res; - } - catch (Exception ex) - { - Log.OperationFailed(_logger, operationName, stopwatch.Elapsed.TotalMilliseconds, ex.Message); - throw; - } - } - - /// - public async Task ExecuteAsync(string operationName, Func action) - { - var stopwatch = ValueStopwatch.StartNew(); - try - { - Log.OperationStarted(_logger, operationName); - await action(); - Log.OperationEnded(_logger, operationName, stopwatch.Elapsed.TotalMilliseconds); - } - catch (Exception ex) - { - Log.OperationFailed(_logger, operationName, stopwatch.Elapsed.TotalMilliseconds, ex.Message); - throw; - } - } - - /// - public async Task ExecuteAsync(string operationName, Func> func) - { - var stopwatch = ValueStopwatch.StartNew(); - try - { - Log.OperationStarted(_logger, operationName); - var res = await func(); - Log.OperationEnded(_logger, operationName, stopwatch.Elapsed.TotalMilliseconds); - return res; - } - catch (Exception ex) - { - Log.OperationFailed(_logger, operationName, stopwatch.Elapsed.TotalMilliseconds, ex.Message); - throw; - } - } - - private static class Log - { - private static readonly Action _operationStarted = LoggerMessage.Define( - LogLevel.Information, - EventIds.OperationStarted, - "Operation started: {operationName}"); - - private static readonly Action _operationEnded = LoggerMessage.Define( - LogLevel.Information, - EventIds.OperationEnded, - "Operation ended: {operationName}, {operationDuration}ms, success"); - - private static readonly Action _operationFailed = LoggerMessage.Define( - LogLevel.Information, - EventIds.OperationFailed, - "Operation ended: {operationName}, {operationDuration}ms, error: {operationError}"); - - public static void OperationStarted(ILogger logger, string operationName) - { - _operationStarted(logger, operationName, null); - } - - public static void OperationEnded(ILogger logger, string operationName, double operationDuration) - { - _operationEnded(logger, operationName, operationDuration, null); - } - - public static void OperationFailed(ILogger logger, string operationName, double operationDuration, string operationError) - { - _operationFailed(logger, operationName, operationDuration, operationError, null); - } - } - } -} diff --git a/src/ReverseProxy/Utilities/UptimeClock.cs b/src/ReverseProxy/Utilities/Clock.cs similarity index 59% rename from src/ReverseProxy/Utilities/UptimeClock.cs rename to src/ReverseProxy/Utilities/Clock.cs index 68c2a6ee5..26cc45026 100644 --- a/src/ReverseProxy/Utilities/UptimeClock.cs +++ b/src/ReverseProxy/Utilities/Clock.cs @@ -2,11 +2,14 @@ // Licensed under the MIT License. using System; +using System.Diagnostics; namespace Microsoft.ReverseProxy.Utilities { - internal class UptimeClock : IUptimeClock + internal sealed class Clock : IClock { public long TickCount => Environment.TickCount64; + + public long GetStopwatchTimestamp() => Stopwatch.GetTimestamp(); } } diff --git a/src/ReverseProxy/Utilities/IUptimeClock.cs b/src/ReverseProxy/Utilities/IClock.cs similarity index 56% rename from src/ReverseProxy/Utilities/IUptimeClock.cs rename to src/ReverseProxy/Utilities/IClock.cs index 18dbccf17..79520b7dc 100644 --- a/src/ReverseProxy/Utilities/IUptimeClock.cs +++ b/src/ReverseProxy/Utilities/IClock.cs @@ -4,10 +4,12 @@ namespace Microsoft.ReverseProxy.Utilities { /// - /// Measures the time passed since the application start. + /// Abstraction over time providers (Environment.TickCount64, Stopwatch.GetTimestamp) /// - internal interface IUptimeClock + internal interface IClock { long TickCount { get; } + + long GetStopwatchTimestamp(); } } diff --git a/src/ReverseProxy/Utilities/MonotonicTimer.cs b/src/ReverseProxy/Utilities/MonotonicTimer.cs deleted file mode 100644 index b837e3332..000000000 --- a/src/ReverseProxy/Utilities/MonotonicTimer.cs +++ /dev/null @@ -1,58 +0,0 @@ -// Copyright (c) Microsoft Corporation. -// Licensed under the MIT License. - -using System; -using System.Diagnostics; -using System.Threading; -using System.Threading.Tasks; -using Microsoft.ReverseProxy.Abstractions.Time; - -namespace Microsoft.ReverseProxy.Utilities -{ - /// - /// Provides a way to measure time in a monotonic fashion, immune to any system clock changes. - /// The time is measured from the moment the class is instantiated. - /// - public sealed class MonotonicTimer : IMonotonicTimer - { - /// - /// Specifies the minimum granularity of a scheduling tick. Larger values produce less precise scheduling. Smaller values - /// produce unnecessary scheduling events, wasting CPU cycles and/or power. - /// - private static readonly TimeSpan _minimalInterval = TimeSpan.FromMilliseconds(0.1); - - /// - /// Use a System.Diagnostics.StopWatch to measure time. Even though it has a poorer resolution, it serves this purpose very well. - /// - private readonly Stopwatch _timeProvider; - - /// - /// Initializes a new instance of the class. - /// - public MonotonicTimer() - { - _timeProvider = Stopwatch.StartNew(); - } - - /// - public TimeSpan CurrentTime => _timeProvider.Elapsed; - - /// - public async Task DelayUntil(TimeSpan expiryTime, CancellationToken cancellationToken) - { - // Note: this implementation could be improved by coalescing related expirations. For example, if there's a When(12:00 noon) and When(12:30pm), then - // the second When doesn't need to start allocating Task.Delay timers until after the first expires. - for (; ;) - { - var now = CurrentTime; - if (now >= expiryTime) - { - return; - } - - var delay = TimeUtil.Max(expiryTime - now, _minimalInterval); - await Task.Delay(delay, cancellationToken); - } - } - } -} diff --git a/src/ReverseProxy/Utilities/MonotonicTimerExtensions.cs b/src/ReverseProxy/Utilities/MonotonicTimerExtensions.cs deleted file mode 100644 index 73f54bc4e..000000000 --- a/src/ReverseProxy/Utilities/MonotonicTimerExtensions.cs +++ /dev/null @@ -1,64 +0,0 @@ -// Copyright (c) Microsoft Corporation. -// Licensed under the MIT License. - -using System; -using System.Threading; -using System.Threading.Tasks; -using Microsoft.ReverseProxy.Abstractions.Time; - -namespace Microsoft.ReverseProxy.Utilities -{ - /// - /// Extension methods for . - /// - public static class MonotonicTimerExtensions - { - /// - /// Creates a task that completes after . - /// - /// instance. - /// How much time to delay for. - /// Cancellation token. - /// A task which completes when the delay has elapsed. - public static async Task Delay(this IMonotonicTimer timer, TimeSpan delay, CancellationToken cancellation) - { - if (timer == null) - { - throw new ArgumentNullException(nameof(timer)); - } - - await timer.DelayUntil(timer.CurrentTime + delay, cancellation); - } - - /// - /// Operates like but supporting specifying a custom timer. - /// - /// Token to cancel after expiration is complete. - /// Timeout after which the cancellationTokenSource will be canceled. - /// Timer to perform the measurement of time for determining when to cancel. - public static async void CancelAfter(this CancellationTokenSource cancellationTokenSource, TimeSpan timeout, IMonotonicTimer timer) - { - if (timer == null) - { - throw new ArgumentNullException(nameof(timer)); - } - - try - { - await timer.Delay(timeout, cancellationTokenSource.Token); - cancellationTokenSource.Cancel(); - } - catch (ObjectDisposedException) - { - // Ignore disposed cancellation tokens. Indicates cancellation is no longer needed. Unfortunately CTS's don't give a good - // way to safely check async disposal, so must rely on exception handling instead. - } - catch (OperationCanceledException) - { - // It cts was canceled, then there's no need for us to cancel the token. Return successfully. - // Note that we can't avoid this situation in advance as we strongly desire here to retain the 'void' returning - // interface that cts.CancelAfter(ts) has. - } - } - } -} diff --git a/test/ReverseProxy.Tests/Common/DelegatingStream.cs b/test/ReverseProxy.Tests/Common/DelegatingStream.cs new file mode 100644 index 000000000..dfc2f5f81 --- /dev/null +++ b/test/ReverseProxy.Tests/Common/DelegatingStream.cs @@ -0,0 +1,186 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +using System; +using System.Diagnostics; +using System.IO; +using System.Threading; +using System.Threading.Tasks; + +namespace Microsoft.ReverseProxy.Common.Tests +{ + // Forwards all calls to an inner stream except where overridden in a derived class. + internal abstract class DelegatingStream : Stream + { + private readonly Stream _innerStream; + + #region Properties + + public override bool CanRead + { + get { return _innerStream.CanRead; } + } + + public override bool CanSeek + { + get { return _innerStream.CanSeek; } + } + + public override bool CanWrite + { + get { return _innerStream.CanWrite; } + } + + public override long Length + { + get { return _innerStream.Length; } + } + + public override long Position + { + get { return _innerStream.Position; } + set { _innerStream.Position = value; } + } + + public override int ReadTimeout + { + get { return _innerStream.ReadTimeout; } + set { _innerStream.ReadTimeout = value; } + } + + public override bool CanTimeout + { + get { return _innerStream.CanTimeout; } + } + + public override int WriteTimeout + { + get { return _innerStream.WriteTimeout; } + set { _innerStream.WriteTimeout = value; } + } + + #endregion Properties + + protected DelegatingStream(Stream innerStream) + { + Debug.Assert(innerStream != null); + _innerStream = innerStream; + } + + protected override void Dispose(bool disposing) + { + if (disposing) + { + _innerStream.Dispose(); + } + base.Dispose(disposing); + } + + public override ValueTask DisposeAsync() + { + return _innerStream.DisposeAsync(); + } + + #region Read + + public override long Seek(long offset, SeekOrigin origin) + { + return _innerStream.Seek(offset, origin); + } + + public override int Read(byte[] buffer, int offset, int count) + { + return _innerStream.Read(buffer, offset, count); + } + + public override int Read(Span buffer) + { + return _innerStream.Read(buffer); + } + + public override int ReadByte() + { + return _innerStream.ReadByte(); + } + + public override Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) + { + return _innerStream.ReadAsync(buffer, offset, count, cancellationToken); + } + + public override ValueTask ReadAsync(Memory buffer, CancellationToken cancellationToken = default) + { + return _innerStream.ReadAsync(buffer, cancellationToken); + } + + public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, object state) + { + return _innerStream.BeginRead(buffer, offset, count, callback, state); + } + + public override int EndRead(IAsyncResult asyncResult) + { + return _innerStream.EndRead(asyncResult); + } + + #endregion Read + + #region Write + + public override void Flush() + { + _innerStream.Flush(); + } + + public override Task FlushAsync(CancellationToken cancellationToken) + { + return _innerStream.FlushAsync(cancellationToken); + } + + public override void SetLength(long value) + { + _innerStream.SetLength(value); + } + + public override void Write(byte[] buffer, int offset, int count) + { + _innerStream.Write(buffer, offset, count); + } + + public override void Write(ReadOnlySpan buffer) + { + _innerStream.Write(buffer); + } + + public override void WriteByte(byte value) + { + _innerStream.WriteByte(value); + } + + public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) + { + return _innerStream.WriteAsync(buffer, offset, count, cancellationToken); + } + + public override ValueTask WriteAsync(ReadOnlyMemory buffer, CancellationToken cancellationToken = default) + { + return _innerStream.WriteAsync(buffer, cancellationToken); + } + + public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback callback, object state) + { + return _innerStream.BeginWrite(buffer, offset, count, callback, state); + } + + public override void EndWrite(IAsyncResult asyncResult) + { + _innerStream.EndWrite(asyncResult); + } + + public override Task CopyToAsync(Stream destination, int bufferSize, CancellationToken cancellationToken) + { + return _innerStream.CopyToAsync(destination, bufferSize, cancellationToken); + } + #endregion Write + } +} diff --git a/test/ReverseProxy.Tests/Common/EventAssertExtensions.cs b/test/ReverseProxy.Tests/Common/EventAssertExtensions.cs new file mode 100644 index 000000000..9f5abb765 --- /dev/null +++ b/test/ReverseProxy.Tests/Common/EventAssertExtensions.cs @@ -0,0 +1,70 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +using System; +using System.Collections.Generic; +using System.Diagnostics.Tracing; +using System.Linq; +using Microsoft.ReverseProxy.Telemetry; +using Xunit; + +namespace Microsoft.ReverseProxy.Common.Tests +{ + internal static class EventAssertExtensions + { + public static (ProxyStage Stage, DateTime TimeStamp)[] GetProxyStages(this List events) + { + return events + .Where(e => e.EventName == "ProxyStage") + .Select(e => + { + var stage = (ProxyStage)Assert.Single(e.Payload); + Assert.InRange(stage, ProxyStage.SendAsyncStart, ProxyStage.ResponseUpgrade); + return (stage, e.TimeStamp); + }) + .ToArray(); + } + + public static void AssertContainProxyStages(this List events, bool hasRequestContent = true, bool upgrade = false) + { + var stages = new List() + { + ProxyStage.SendAsyncStart, + ProxyStage.SendAsyncStop, + ProxyStage.ResponseContentTransferStart, + }; + + if (hasRequestContent) + { + stages.Add(ProxyStage.RequestContentTransferStart); + } + + if (upgrade) + { + stages.Add(ProxyStage.ResponseUpgrade); + } + + events.AssertContainProxyStages(stages.ToArray()); + } + + public static void AssertContainProxyStages(this List events, ProxyStage[] expectedStages) + { + var proxyStages = events.GetProxyStages() + .Select(s => s.Stage) + .ToArray(); + + var presentStages = proxyStages.ToHashSet(); + + Assert.Equal(presentStages.Count, proxyStages.Length); + + foreach (var expectedStage in expectedStages) + { + Assert.Contains(expectedStage, presentStages); + } + + presentStages.RemoveWhere(s => expectedStages.Contains(s)); + + Assert.Empty(presentStages); + } + } +} diff --git a/test/ReverseProxy.Tests/Common/ManualClock.cs b/test/ReverseProxy.Tests/Common/ManualClock.cs new file mode 100644 index 000000000..77609ce14 --- /dev/null +++ b/test/ReverseProxy.Tests/Common/ManualClock.cs @@ -0,0 +1,18 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +using System; +using System.Diagnostics; +using Microsoft.ReverseProxy.Utilities; + +namespace Microsoft.ReverseProxy.Common.Tests +{ + internal class ManualClock : IClock + { + public TimeSpan Time { get; set; } + + public long TickCount => (long)Time.TotalMilliseconds; + + public long GetStopwatchTimestamp() => (long)(Time.TotalSeconds * Stopwatch.Frequency); + } +} diff --git a/test/ReverseProxy.Tests/Common/SingleThreadedTaskScheduler.cs b/test/ReverseProxy.Tests/Common/SingleThreadedTaskScheduler.cs deleted file mode 100644 index ed138dcb4..000000000 --- a/test/ReverseProxy.Tests/Common/SingleThreadedTaskScheduler.cs +++ /dev/null @@ -1,152 +0,0 @@ -// Copyright (c) Microsoft Corporation. -// Licensed under the MIT License. - -using System; -using System.Collections.Generic; -using System.Threading; -using System.Threading.Tasks; -using Microsoft.ReverseProxy.Utilities; - -namespace Microsoft.ReverseProxy.Common.Tests -{ - /// - /// Task scheduler that only schedules execution of one task at a time. - /// - /// - /// This task scheduler included the ability to wait for all scheduled tasks to complete. This is intended for testing - /// where repeatability is paramount. Use with the in testing time-related policies - /// for repeatable time related tests that don't have to wait for wall clock time to elapse (e.g. simulating minutes or hours - /// in milliseconds of real time; testing of timeout and rate limiting policies). - /// - /// Note that while this task scheduler only runs one task at a time, this doesn't prevent async methods from running - /// concurrently. Async methods allocate one task between every pair of blocking awaits. If two async methods are running, they - /// end up taking turns, where one is running until an await blocks the async method, allowing the second one a turn to run until - /// it blocks, back and forth. - /// - public class SingleThreadedTaskScheduler : TaskScheduler - { - private readonly object _lockObject = new object(); - private readonly Queue _taskQueue = new Queue(); - private bool _schedulerIsRunning = false; - - // An indication to the scheduler to either attempt or not attempt to execute any scheduled tasks upon scheduling. - // If the value is true, a call to TaskScheduler.Run() will queue the task on the current scheduler and attempt to execute - // the tasks on the queue in the order they were put on the queue. - // If the value is false, a call to TaskScheduler.Run() will queue the task on the current scheduler, but not execute it. - private bool _suspendScheduler = false; - - /// - /// Gets or sets callback to invoke whenever the scheduler goes idle. This gives listeners a chance to create more work to schedule before the scheduler loop terminates. - /// - public Action OnIdle { get; set; } - - /// - /// Helper function to run an async method until completion. - /// This should be used in cases when you have exercised the main action and don't care about dangling work on the scheduler. - /// - public void RunUntilComplete(Func func) - { - lock (_lockObject) - { - if (_schedulerIsRunning) - { - throw new ArgumentException("Synchronous execution is not supported if already being executed."); - } - - var flag = false; - _suspendScheduler = true; - - try - { - var ignore = this.Run(async () => { await func(); flag = true; }); - } - finally - { - _suspendScheduler = false; - } - - ExecuteTasksUntil(() => flag); - - if (!flag) - { - throw new ArgumentException("Task did not execute synchronously. Check for any non-single threaded work."); - } - } - } - - /// - protected override void QueueTask(Task task) - { - lock (_lockObject) - { - _taskQueue.Enqueue(task); - - ExecuteTasksUntil(() => false); - } - } - - /// - protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued) - { - return false; - } - - /// - protected override IEnumerable GetScheduledTasks() - { - return _taskQueue; - } - - /// - /// Runs tasks in single threaded fashion until queue is emptied or the condition of the predicate is false. - /// - /// - /// It's critical that this method is not invoked with itself as the current scheduler. It must run on another scheduler (such - /// as the default scheduler), or a deadlock will occur. - /// - private void ExecuteTasksUntil(Func predicate) - { - if (!Monitor.IsEntered(_lockObject)) - { - throw new ArgumentException("Must hold lock when scheduling."); - } - - // Prevent reentrancy. Reentrancy can lead to stack overflow and difficulty when debugging. - if (_schedulerIsRunning || _suspendScheduler) - { - return; - } - - try - { - _schedulerIsRunning = true; - while (predicate() == false) - { - if (_taskQueue.Count == 0) - { - if (OnIdle != null) - { - OnIdle(); - - // OnIdle handler may have created more tasks. Try finding more work to execute. - if (_taskQueue.Count != 0) - { - continue; - } - } - - // No remaining work to execute. Return. - return; - } - - var nextTask = _taskQueue.Dequeue(); - TryExecuteTask(nextTask); - } - } - finally - { - _schedulerIsRunning = false; - } - } - } -} diff --git a/test/ReverseProxy.Tests/Common/TestEventListener.cs b/test/ReverseProxy.Tests/Common/TestEventListener.cs new file mode 100644 index 000000000..b91a677f4 --- /dev/null +++ b/test/ReverseProxy.Tests/Common/TestEventListener.cs @@ -0,0 +1,42 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +using System; +using System.Collections.Generic; +using System.Diagnostics.Tracing; +using System.Threading; + +namespace Microsoft.ReverseProxy.Common.Tests +{ + internal static class TestEventListener + { + private static readonly AsyncLocal> _eventsAsyncLocal = new AsyncLocal>(); + private static readonly InternalEventListener _listener = new InternalEventListener(); + + public static List Collect() + { + return _eventsAsyncLocal.Value = new List(); + } + + private sealed class InternalEventListener : EventListener + { + protected override void OnEventSourceCreated(EventSource eventSource) + { + if (eventSource.Name == "Microsoft.ReverseProxy") + { + EnableEvents(eventSource, EventLevel.LogAlways, EventKeywords.All); + } + } + + protected override void OnEventWritten(EventWrittenEventArgs eventData) + { + if (eventData.EventId == 0) + { + throw new Exception($"EventSource error received: {eventData.Payload[0]}"); + } + + _eventsAsyncLocal.Value?.Add(eventData); + } + } + } +} diff --git a/test/ReverseProxy.Tests/Common/VirtualMonotonicTimer.cs b/test/ReverseProxy.Tests/Common/VirtualMonotonicTimer.cs deleted file mode 100644 index 4226c37b1..000000000 --- a/test/ReverseProxy.Tests/Common/VirtualMonotonicTimer.cs +++ /dev/null @@ -1,147 +0,0 @@ -// Copyright (c) Microsoft Corporation. -// Licensed under the MIT License. - -using System; -using System.Collections.Generic; -using System.Linq; -using System.Threading; -using System.Threading.Tasks; -using Microsoft.ReverseProxy.Abstractions.Time; - -namespace Microsoft.ReverseProxy.Common.Tests -{ - /// - /// Simulation analog to MonotonicTimer, used for testing. - /// - /// - /// This timer doesn't track real time, but instead tracks virtual. Time only advances when - /// the method is called. - /// - /// - public class VirtualMonotonicTimer : IMonotonicTimer - { - private readonly SortedList _delayItems = new SortedList(); - - /// - /// Initializes a new instance of the class. - /// - /// Initial value for current time. Zero if not specified. - public VirtualMonotonicTimer(TimeSpan? initialTime = null) - { - CurrentTime = initialTime ?? TimeSpan.Zero; - } - - /// - public TimeSpan CurrentTime { get; private set; } - - /// - /// Advances time by the specified amount. - /// - /// How much to advance by. - public void AdvanceClockBy(TimeSpan howMuch) - { - AdvanceClockTo(CurrentTime + howMuch); - } - - /// - /// Advances time to the specified point. - /// - /// Advances until it equals . - public void AdvanceClockTo(TimeSpan targetTime) - { - if (targetTime < CurrentTime) - { - throw new InvalidOperationException("Time should not flow backwards"); - } - - // Signal any delays that have expired by advancing the clock. - while (_delayItems.Count > 0 && _delayItems.ElementAt(0).Key <= targetTime) - { - AdvanceStep(); - } - - CurrentTime = targetTime; - } - - /// - /// Creates a task that completes when CurrentTime >= expiryTime. - /// - /// Time at which the returned task will be completed. - /// Cancellation token for the created task. - /// A task which completed at . - public async Task DelayUntil(TimeSpan expiryTime, CancellationToken cancelationToken) - { - if (expiryTime <= CurrentTime) - { - return; - } - - var delayTask = new DelayItem - { - When = expiryTime, - Signal = new TaskCompletionSource(cancelationToken), - }; - - var task = delayTask.Signal.Task; - - // Note: sorted list doesn't allow duplicates, so increment expiry until unique. - while (_delayItems.ContainsKey(expiryTime)) - { - expiryTime += TimeSpan.FromTicks(1); - } - - _delayItems.Add(expiryTime, delayTask); - - using (cancelationToken.Register(() => CancelTask(delayTask))) - { - await task; - } - } - - /// - /// Advances time to schedule the next item of work. - /// - /// True if any timers were found and signaled. - public bool AdvanceStep() - { - if (_delayItems.Count > 0) - { - var next = _delayItems.ElementAt(0); - CurrentTime = next.Key; - - // Note: this will unfortunately have O(N) cost. However, this code is only used for testing right now, and the list is generally short. If that - // ever changes, suggest finding a priority queue / heap data structure for .Net (core libraries are lacking this data structure). - _delayItems.RemoveAt(0); - - // Unblock the task. It's no longer asleep. - next.Value.Signal.TrySetResult(0); - - // Note that TPL normally schedules tasks synchronously. When used with - // the SingleThreadedTaskScheduler, we can assume all tasks have completed by the - // time SetResult returns, provided that AdvanceClockTo was invoked outside of the task scheduler - // loop. - return true; - } - - return false; - } - - private void CancelTask(DelayItem delayTask) - { - var i = _delayItems.IndexOfValue(delayTask); - if (i != -1) - { - _delayItems.RemoveAt(i); - } - - delayTask.Signal.TrySetCanceled(); - } - - private class DelayItem - { - public TimeSpan When { get; set; } - - public TaskCompletionSource Signal { get; set; } - } - } -} diff --git a/test/ReverseProxy.Tests/Middleware/LoadBalancerMiddlewareTests.cs b/test/ReverseProxy.Tests/Middleware/LoadBalancerMiddlewareTests.cs index 37b379868..daff4aedd 100644 --- a/test/ReverseProxy.Tests/Middleware/LoadBalancerMiddlewareTests.cs +++ b/test/ReverseProxy.Tests/Middleware/LoadBalancerMiddlewareTests.cs @@ -8,12 +8,10 @@ using Microsoft.AspNetCore.Routing; using Microsoft.AspNetCore.Routing.Patterns; using Microsoft.ReverseProxy.Abstractions; -using Microsoft.ReverseProxy.Abstractions.Telemetry; using Microsoft.ReverseProxy.Common.Tests; using Microsoft.ReverseProxy.RuntimeModel; using Microsoft.ReverseProxy.Service.Management; using Microsoft.ReverseProxy.Service.Proxy; -using Microsoft.ReverseProxy.Telemetry; using Moq; using Xunit; @@ -23,7 +21,6 @@ public class LoadBalancerMiddlewareTests : TestAutoMockBase { public LoadBalancerMiddlewareTests() { - Provide, TextOperationLogger>(); Provide(context => Task.CompletedTask); } diff --git a/test/ReverseProxy.Tests/Middleware/ProxyInvokerMiddlewareTests.cs b/test/ReverseProxy.Tests/Middleware/ProxyInvokerMiddlewareTests.cs index 23306f714..1530c28fb 100644 --- a/test/ReverseProxy.Tests/Middleware/ProxyInvokerMiddlewareTests.cs +++ b/test/ReverseProxy.Tests/Middleware/ProxyInvokerMiddlewareTests.cs @@ -9,12 +9,10 @@ using Microsoft.AspNetCore.Routing; using Microsoft.AspNetCore.Routing.Patterns; using Microsoft.ReverseProxy.Abstractions; -using Microsoft.ReverseProxy.Abstractions.Telemetry; using Microsoft.ReverseProxy.Common.Tests; using Microsoft.ReverseProxy.RuntimeModel; using Microsoft.ReverseProxy.Service.Management; using Microsoft.ReverseProxy.Service.Proxy; -using Microsoft.ReverseProxy.Telemetry; using Moq; using Xunit; @@ -22,11 +20,6 @@ namespace Microsoft.ReverseProxy.Middleware.Tests { public class ProxyInvokerMiddlewareTests : TestAutoMockBase { - public ProxyInvokerMiddlewareTests() - { - Provide, TextOperationLogger>(); - } - [Fact] public void Constructor_Works() { @@ -36,6 +29,8 @@ public void Constructor_Works() [Fact] public async Task Invoke_Works() { + var events = TestEventListener.Collect(); + // Arrange var httpContext = new DefaultHttpContext(); httpContext.Request.Method = "GET"; @@ -111,6 +106,12 @@ public async Task Invoke_Works() // Assert Mock().Verify(); + + var invoke = Assert.Single(events, e => e.EventName == "ProxyInvoke"); + Assert.Equal(3, invoke.Payload.Count); + Assert.Equal(cluster1.ClusterId, (string)invoke.Payload[0]); + Assert.Equal(routeConfig.Route.RouteId, (string)invoke.Payload[1]); + Assert.Equal(destination1.DestinationId, (string)invoke.Payload[2]); } [Fact] diff --git a/test/ReverseProxy.Tests/Service/Management/EntityActionSchedulerTests.cs b/test/ReverseProxy.Tests/Service/Management/EntityActionSchedulerTests.cs index e65d85550..5a40d010a 100644 --- a/test/ReverseProxy.Tests/Service/Management/EntityActionSchedulerTests.cs +++ b/test/ReverseProxy.Tests/Service/Management/EntityActionSchedulerTests.cs @@ -81,7 +81,7 @@ public void Schedule_AutoStartDisabledRunOnceEnabled_StartsManuallyAndRunsEachRe timerFactory.VerifyTimer(1, Period1); timerFactory.FireTimer(1); - + Assert.Same(entity1, lastInvokedEntity); VerifyEntities(scheduler, entity0); @@ -224,7 +224,7 @@ public void ChangePeriod_TimerStartedPeriodChangedAfterFirstCall_PeriodChangedBe private void VerifyEntities(EntityActionScheduler scheduler, params Entity[] entities) { var actualCount = 0; - foreach(var entity in entities) + foreach (var entity in entities) { Assert.True(scheduler.IsScheduled(entity)); actualCount++; diff --git a/test/ReverseProxy.Tests/Service/Proxy/HttpProxyTests.cs b/test/ReverseProxy.Tests/Service/Proxy/HttpProxyTests.cs index 42b17131f..435c7b30f 100644 --- a/test/ReverseProxy.Tests/Service/Proxy/HttpProxyTests.cs +++ b/test/ReverseProxy.Tests/Service/Proxy/HttpProxyTests.cs @@ -3,6 +3,7 @@ using System; using System.Collections.Generic; +using System.Diagnostics.Tracing; using System.IO; using System.IO.Pipelines; using System.Linq; @@ -15,7 +16,9 @@ using Microsoft.AspNetCore.Http.Features; using Microsoft.Extensions.DependencyInjection; using Microsoft.Net.Http.Headers; +using Microsoft.ReverseProxy.Common.Tests; using Microsoft.ReverseProxy.Service.RuntimeModel.Transforms; +using Microsoft.ReverseProxy.Telemetry; using Microsoft.ReverseProxy.Utilities; using Moq; using Xunit; @@ -43,6 +46,8 @@ public void Constructor_Works() [Fact] public async Task ProxyAsync_NormalRequest_Works() { + var events = TestEventListener.Collect(); + var httpContext = new DefaultHttpContext(); httpContext.Request.Method = "POST"; httpContext.Request.Scheme = "http"; @@ -105,11 +110,16 @@ public async Task ProxyAsync_NormalRequest_Works() proxyResponseStream.Position = 0; var proxyResponseText = StreamToString(proxyResponseStream); Assert.Equal("response content", proxyResponseText); + + AssertProxyStartStop(events, destinationPrefix, httpContext.Response.StatusCode); + events.AssertContainProxyStages(); } [Fact] public async Task ProxyAsync_NormalRequestWithTransforms_Works() { + var events = TestEventListener.Collect(); + var httpContext = new DefaultHttpContext(); httpContext.Request.Method = "POST"; httpContext.Request.Protocol = "http/2"; @@ -201,11 +211,16 @@ public async Task ProxyAsync_NormalRequestWithTransforms_Works() proxyResponseStream.Position = 0; var proxyResponseText = StreamToString(proxyResponseStream); Assert.Equal("response content", proxyResponseText); + + AssertProxyStartStop(events, destinationPrefix, httpContext.Response.StatusCode); + events.AssertContainProxyStages(); } [Fact] public async Task ProxyAsync_NormalRequestWithCopyRequestHeadersDisabled_Works() { + var events = TestEventListener.Collect(); + var httpContext = new DefaultHttpContext(); httpContext.Request.Method = "POST"; httpContext.Request.Scheme = "http"; @@ -292,11 +307,16 @@ public async Task ProxyAsync_NormalRequestWithCopyRequestHeadersDisabled_Works() proxyResponseStream.Position = 0; var proxyResponseText = StreamToString(proxyResponseStream); Assert.Equal("response content", proxyResponseText); + + AssertProxyStartStop(events, destinationPrefix, httpContext.Response.StatusCode); + events.AssertContainProxyStages(); } [Fact] public async Task ProxyAsync_NormalRequestWithExistingForwarders_Appends() { + var events = TestEventListener.Collect(); + var httpContext = new DefaultHttpContext(); httpContext.Request.Method = "GET"; httpContext.Request.Scheme = "http"; @@ -348,7 +368,7 @@ public async Task ProxyAsync_NormalRequestWithExistingForwarders_Appends() Assert.Null(request.Content); - var response = new HttpResponseMessage((HttpStatusCode)234); + var response = new HttpResponseMessage((HttpStatusCode)234) { Content = new ByteArrayContent(Array.Empty()) }; return response; }); @@ -360,12 +380,17 @@ public async Task ProxyAsync_NormalRequestWithExistingForwarders_Appends() await sut.ProxyAsync(httpContext, destinationPrefix, client, proxyOptions); Assert.Equal(234, httpContext.Response.StatusCode); + + AssertProxyStartStop(events, destinationPrefix, httpContext.Response.StatusCode); + events.AssertContainProxyStages(hasRequestContent: false); } // Tests proxying an upgradeable request. [Fact] public async Task ProxyAsync_UpgradableRequest_Works() { + var events = TestEventListener.Collect(); + var httpContext = new DefaultHttpContext(); httpContext.Request.Method = "GET"; httpContext.Request.Scheme = "http"; @@ -428,6 +453,9 @@ public async Task ProxyAsync_UpgradableRequest_Works() upstreamStream.WriteStream.Position = 0; var sentToUpstream = StreamToString(upstreamStream.WriteStream); Assert.Equal("request content", sentToUpstream); + + AssertProxyStartStop(events, destinationPrefix, httpContext.Response.StatusCode); + events.AssertContainProxyStages(upgrade: true); } // Tests proxying an upgradeable request where the destination refused to upgrade. @@ -435,6 +463,8 @@ public async Task ProxyAsync_UpgradableRequest_Works() [Fact] public async Task ProxyAsync_UpgradableRequestFailsToUpgrade_ProxiesResponse() { + var events = TestEventListener.Collect(); + var httpContext = new DefaultHttpContext(); httpContext.Request.Method = "GET"; httpContext.Request.Scheme = "https"; @@ -488,6 +518,9 @@ public async Task ProxyAsync_UpgradableRequestFailsToUpgrade_ProxiesResponse() proxyResponseStream.Position = 0; var proxyResponseText = StreamToString(proxyResponseStream); Assert.Equal("response content", proxyResponseText); + + AssertProxyStartStop(events, destinationPrefix, httpContext.Response.StatusCode); + events.AssertContainProxyStages(hasRequestContent: false, upgrade: false); } [Theory] @@ -510,6 +543,8 @@ public async Task ProxyAsync_UpgradableRequestFailsToUpgrade_ProxiesResponse() // [InlineData("CONNECT", "HTTP/1.1", "")] Blocked in HttpUtilities.GetHttpMethod public async Task ProxyAsync_RequetsWithoutBodies_NoHttpContent(string method, string protocol, string headers) { + var events = TestEventListener.Collect(); + var httpContext = new DefaultHttpContext(); httpContext.Request.Method = method; httpContext.Request.Protocol = protocol; @@ -538,6 +573,9 @@ public async Task ProxyAsync_RequetsWithoutBodies_NoHttpContent(string method, s await sut.ProxyAsync(httpContext, destinationPrefix, client, new RequestProxyOptions()); Assert.Equal(StatusCodes.Status200OK, httpContext.Response.StatusCode); + + AssertProxyStartStop(events, destinationPrefix, httpContext.Response.StatusCode); + events.AssertContainProxyStages(hasRequestContent: false); } [Theory] @@ -553,6 +591,8 @@ public async Task ProxyAsync_RequetsWithoutBodies_NoHttpContent(string method, s [InlineData("Delete", "HTTP/2", "expect:100-continue")] public async Task ProxyAsync_RequetsWithBodies_HasHttpContent(string method, string protocol, string headers) { + var events = TestEventListener.Collect(); + var httpContext = new DefaultHttpContext(); httpContext.Request.Method = method; httpContext.Request.Protocol = protocol; @@ -583,11 +623,16 @@ public async Task ProxyAsync_RequetsWithBodies_HasHttpContent(string method, str await sut.ProxyAsync(httpContext, destinationPrefix, client, new RequestProxyOptions()); Assert.Equal(StatusCodes.Status200OK, httpContext.Response.StatusCode); + + AssertProxyStartStop(events, destinationPrefix, httpContext.Response.StatusCode); + events.AssertContainProxyStages(); } [Fact] public async Task ProxyAsync_RequestWithCookieHeaders() { + var events = TestEventListener.Collect(); + // This is an invalid format per spec but may happen due to https://github.com/dotnet/aspnetcore/issues/26461 var cookies = new [] { "testA=A_Cookie", "testB=B_Cookie", "testC=C_Cookie" }; var httpContext = new DefaultHttpContext(); @@ -600,7 +645,7 @@ public async Task ProxyAsync_RequestWithCookieHeaders() (HttpRequestMessage request, CancellationToken cancellationToken) => { // "testA=A_Cookie; testB=B_Cookie; testC=C_Cookie" - var expectedCookieString = String.Join("; ", cookies); + var expectedCookieString = string.Join("; ", cookies); Assert.Equal(new Version(2, 0), request.Version); Assert.Equal("GET", request.Method.Method, StringComparer.OrdinalIgnoreCase); @@ -618,11 +663,16 @@ public async Task ProxyAsync_RequestWithCookieHeaders() Assert.Null(httpContext.Features.Get()); Assert.Equal(StatusCodes.Status200OK, httpContext.Response.StatusCode); + + AssertProxyStartStop(events, destinationPrefix, httpContext.Response.StatusCode); + events.AssertContainProxyStages(hasRequestContent: false); } [Fact] public async Task ProxyAsync_OptionsWithVersion() { + var events = TestEventListener.Collect(); + // Use any non-default value var version = new Version(5, 5); #if NET @@ -659,11 +709,16 @@ public async Task ProxyAsync_OptionsWithVersion() Assert.Null(httpContext.Features.Get()); Assert.Equal(StatusCodes.Status200OK, httpContext.Response.StatusCode); + + AssertProxyStartStop(events, destinationPrefix, httpContext.Response.StatusCode); + events.AssertContainProxyStages(hasRequestContent: false); } [Fact] public async Task ProxyAsync_OptionsWithVersion_Transformed() { + var events = TestEventListener.Collect(); + // Use any non-default value var version = new Version(5, 5); var transformedVersion = new Version(6, 6); @@ -717,11 +772,16 @@ public async Task ProxyAsync_OptionsWithVersion_Transformed() Assert.Null(httpContext.Features.Get()); Assert.Equal(StatusCodes.Status200OK, httpContext.Response.StatusCode); + + AssertProxyStartStop(events, destinationPrefix, httpContext.Response.StatusCode); + events.AssertContainProxyStages(hasRequestContent: false); } [Fact] public async Task ProxyAsync_UnableToConnect_Returns502() { + var events = TestEventListener.Collect(); + var httpContext = new DefaultHttpContext(); httpContext.Request.Method = "GET"; httpContext.Request.Host = new HostString("example.com:3456"); @@ -744,11 +804,16 @@ public async Task ProxyAsync_UnableToConnect_Returns502() var errorFeature = httpContext.Features.Get(); Assert.Equal(ProxyError.Request, errorFeature.Error); Assert.IsType(errorFeature.Exception); + + AssertProxyStartFailedStop(events, destinationPrefix, httpContext.Response.StatusCode, errorFeature.Error); + events.AssertContainProxyStages(new[] { ProxyStage.SendAsyncStart }); } [Fact] public async Task ProxyAsync_UnableToConnectWithBody_Returns502() { + var events = TestEventListener.Collect(); + var httpContext = new DefaultHttpContext(); httpContext.Request.Method = "POST"; httpContext.Request.Host = new HostString("example.com:3456"); @@ -773,11 +838,16 @@ public async Task ProxyAsync_UnableToConnectWithBody_Returns502() var errorFeature = httpContext.Features.Get(); Assert.Equal(ProxyError.Request, errorFeature.Error); Assert.IsType(errorFeature.Exception); + + AssertProxyStartFailedStop(events, destinationPrefix, httpContext.Response.StatusCode, errorFeature.Error); + events.AssertContainProxyStages(new[] { ProxyStage.SendAsyncStart }); } [Fact] public async Task ProxyAsync_RequestTimedOut_Returns504() { + var events = TestEventListener.Collect(); + var httpContext = new DefaultHttpContext(); httpContext.Request.Method = "GET"; httpContext.Request.Host = new HostString("example.com:3456"); @@ -807,11 +877,16 @@ public async Task ProxyAsync_RequestTimedOut_Returns504() var errorFeature = httpContext.Features.Get(); Assert.Equal(ProxyError.RequestTimedOut, errorFeature.Error); Assert.IsType(errorFeature.Exception); + + AssertProxyStartFailedStop(events, destinationPrefix, httpContext.Response.StatusCode, errorFeature.Error); + events.AssertContainProxyStages(new[] { ProxyStage.SendAsyncStart }); } [Fact] public async Task ProxyAsync_RequestCanceled_Returns502() { + var events = TestEventListener.Collect(); + var httpContext = new DefaultHttpContext(); httpContext.Request.Method = "GET"; httpContext.Request.Host = new HostString("example.com:3456"); @@ -836,11 +911,16 @@ public async Task ProxyAsync_RequestCanceled_Returns502() var errorFeature = httpContext.Features.Get(); Assert.Equal(ProxyError.RequestCanceled, errorFeature.Error); Assert.IsType(errorFeature.Exception); + + AssertProxyStartFailedStop(events, destinationPrefix, httpContext.Response.StatusCode, errorFeature.Error); + events.AssertContainProxyStages(new[] { ProxyStage.SendAsyncStart }); } [Fact] public async Task ProxyAsync_RequestWithBodyTimedOut_Returns504() { + var events = TestEventListener.Collect(); + var httpContext = new DefaultHttpContext(); httpContext.Request.Method = "POST"; httpContext.Request.Host = new HostString("example.com:3456"); @@ -872,11 +952,16 @@ public async Task ProxyAsync_RequestWithBodyTimedOut_Returns504() var errorFeature = httpContext.Features.Get(); Assert.Equal(ProxyError.RequestTimedOut, errorFeature.Error); Assert.IsType(errorFeature.Exception); + + AssertProxyStartFailedStop(events, destinationPrefix, httpContext.Response.StatusCode, errorFeature.Error); + events.AssertContainProxyStages(new[] { ProxyStage.SendAsyncStart }); } [Fact] public async Task ProxyAsync_RequestWithBodyCanceled_Returns502() { + var events = TestEventListener.Collect(); + var httpContext = new DefaultHttpContext(); httpContext.Request.Method = "POST"; httpContext.Request.Host = new HostString("example.com:3456"); @@ -903,11 +988,16 @@ public async Task ProxyAsync_RequestWithBodyCanceled_Returns502() var errorFeature = httpContext.Features.Get(); Assert.Equal(ProxyError.RequestCanceled, errorFeature.Error); Assert.IsType(errorFeature.Exception); + + AssertProxyStartFailedStop(events, destinationPrefix, httpContext.Response.StatusCode, errorFeature.Error); + events.AssertContainProxyStages(new[] { ProxyStage.SendAsyncStart }); } [Fact] public async Task ProxyAsync_RequestBodyClientErrorBeforeResponseError_Returns400() { + var events = TestEventListener.Collect(); + var httpContext = new DefaultHttpContext(); httpContext.Request.Method = "POST"; httpContext.Request.Host = new HostString("example.com:3456"); @@ -934,11 +1024,19 @@ public async Task ProxyAsync_RequestBodyClientErrorBeforeResponseError_Returns40 var errorFeature = httpContext.Features.Get(); Assert.Equal(ProxyError.RequestBodyClient, errorFeature.Error); Assert.IsType(errorFeature.Exception); + + AssertProxyStartFailedStop(events, destinationPrefix, httpContext.Response.StatusCode, errorFeature.Error); + events.AssertContainProxyStages(new[] { + ProxyStage.SendAsyncStart, + ProxyStage.RequestContentTransferStart + }); } [Fact] public async Task ProxyAsync_RequestBodyDestinationErrorBeforeResponseError_Returns502() { + var events = TestEventListener.Collect(); + var httpContext = new DefaultHttpContext(); httpContext.Request.Method = "POST"; httpContext.Request.Host = new HostString("example.com:3456"); @@ -965,11 +1063,19 @@ public async Task ProxyAsync_RequestBodyDestinationErrorBeforeResponseError_Retu var errorFeature = httpContext.Features.Get(); Assert.Equal(ProxyError.RequestBodyDestination, errorFeature.Error); Assert.IsType(errorFeature.Exception); + + AssertProxyStartFailedStop(events, destinationPrefix, httpContext.Response.StatusCode, errorFeature.Error); + events.AssertContainProxyStages(new[] { + ProxyStage.SendAsyncStart, + ProxyStage.RequestContentTransferStart + }); } [Fact] public async Task ProxyAsync_RequestBodyCanceledBeforeResponseError_Returns502() { + var events = TestEventListener.Collect(); + var httpContext = new DefaultHttpContext(); httpContext.Request.Method = "POST"; httpContext.Request.Host = new HostString("example.com:3456"); @@ -1001,11 +1107,16 @@ public async Task ProxyAsync_RequestBodyCanceledBeforeResponseError_Returns502() var errorFeature = httpContext.Features.Get(); Assert.Equal(ProxyError.RequestBodyCanceled, errorFeature.Error); Assert.IsType(errorFeature.Exception); + + AssertProxyStartFailedStop(events, destinationPrefix, httpContext.Response.StatusCode, errorFeature.Error); + events.AssertContainProxyStages(new[] { ProxyStage.SendAsyncStart }); } [Fact] public async Task ProxyAsync_ResponseBodyDestionationErrorFirstRead_Returns502() { + var events = TestEventListener.Collect(); + var httpContext = new DefaultHttpContext(); httpContext.Request.Method = "GET"; httpContext.Request.Host = new HostString("example.com:3456"); @@ -1034,11 +1145,16 @@ public async Task ProxyAsync_ResponseBodyDestionationErrorFirstRead_Returns502() var errorFeature = httpContext.Features.Get(); Assert.Equal(ProxyError.ResponseBodyDestination, errorFeature.Error); Assert.IsType(errorFeature.Exception); + + AssertProxyStartFailedStop(events, destinationPrefix, httpContext.Response.StatusCode, errorFeature.Error); + events.AssertContainProxyStages(hasRequestContent: false); } [Fact] public async Task ProxyAsync_ResponseBodyDestionationErrorSecondRead_Aborted() { + var events = TestEventListener.Collect(); + var httpContext = new DefaultHttpContext(); httpContext.Request.Method = "GET"; httpContext.Request.Host = new HostString("example.com:3456"); @@ -1069,11 +1185,16 @@ public async Task ProxyAsync_ResponseBodyDestionationErrorSecondRead_Aborted() var errorFeature = httpContext.Features.Get(); Assert.Equal(ProxyError.ResponseBodyDestination, errorFeature.Error); Assert.IsType(errorFeature.Exception); + + AssertProxyStartFailedStop(events, destinationPrefix, httpContext.Response.StatusCode, errorFeature.Error); + events.AssertContainProxyStages(hasRequestContent: false); } [Fact] public async Task ProxyAsync_ResponseBodyClientError_Aborted() { + var events = TestEventListener.Collect(); + var httpContext = new DefaultHttpContext(); httpContext.Request.Method = "GET"; httpContext.Request.Host = new HostString("example.com:3456"); @@ -1103,11 +1224,16 @@ public async Task ProxyAsync_ResponseBodyClientError_Aborted() var errorFeature = httpContext.Features.Get(); Assert.Equal(ProxyError.ResponseBodyClient, errorFeature.Error); Assert.IsType(errorFeature.Exception); + + AssertProxyStartFailedStop(events, destinationPrefix, httpContext.Response.StatusCode, errorFeature.Error); + events.AssertContainProxyStages(hasRequestContent: false); } [Fact] public async Task ProxyAsync_ResponseBodyCancelled_502() { + var events = TestEventListener.Collect(); + var httpContext = new DefaultHttpContext(); httpContext.Request.Method = "GET"; httpContext.Request.Host = new HostString("example.com:3456"); @@ -1138,11 +1264,16 @@ public async Task ProxyAsync_ResponseBodyCancelled_502() var errorFeature = httpContext.Features.Get(); Assert.Equal(ProxyError.ResponseBodyCanceled, errorFeature.Error); Assert.IsType(errorFeature.Exception); + + AssertProxyStartFailedStop(events, destinationPrefix, httpContext.Response.StatusCode, errorFeature.Error); + events.AssertContainProxyStages(hasRequestContent: false); } [Fact] public async Task ProxyAsync_ResponseBodyCancelledAfterStart_Aborted() { + var events = TestEventListener.Collect(); + var httpContext = new DefaultHttpContext(); httpContext.Request.Method = "GET"; httpContext.Request.Host = new HostString("example.com:3456"); @@ -1173,11 +1304,16 @@ public async Task ProxyAsync_ResponseBodyCancelledAfterStart_Aborted() var errorFeature = httpContext.Features.Get(); Assert.Equal(ProxyError.ResponseBodyCanceled, errorFeature.Error); Assert.IsType(errorFeature.Exception); + + AssertProxyStartFailedStop(events, destinationPrefix, httpContext.Response.StatusCode, errorFeature.Error); + events.AssertContainProxyStages(hasRequestContent: false); } [Fact] public async Task ProxyAsync_RequestBodyCanceledAfterResponse_Reported() { + var events = TestEventListener.Collect(); + var waitTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); var httpContext = new DefaultHttpContext(); httpContext.Request.Method = "POST"; @@ -1216,11 +1352,16 @@ public async Task ProxyAsync_RequestBodyCanceledAfterResponse_Reported() var errorFeature = httpContext.Features.Get(); Assert.Equal(ProxyError.RequestBodyCanceled, errorFeature.Error); Assert.IsType(errorFeature.Exception); + + AssertProxyStartFailedStop(events, destinationPrefix, httpContext.Response.StatusCode, errorFeature.Error); + events.AssertContainProxyStages(); } [Fact] public async Task ProxyAsync_RequestBodyClientErrorAfterResponse_Reported() { + var events = TestEventListener.Collect(); + var waitTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); var httpContext = new DefaultHttpContext(); httpContext.Request.Method = "POST"; @@ -1252,11 +1393,16 @@ public async Task ProxyAsync_RequestBodyClientErrorAfterResponse_Reported() var errorFeature = httpContext.Features.Get(); Assert.Equal(ProxyError.RequestBodyClient, errorFeature.Error); Assert.IsType(errorFeature.Exception); + + AssertProxyStartFailedStop(events, destinationPrefix, httpContext.Response.StatusCode, errorFeature.Error); + events.AssertContainProxyStages(); } [Fact] public async Task ProxyAsync_RequestBodyDestinationErrorAfterResponse_Reported() { + var events = TestEventListener.Collect(); + var waitTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); var httpContext = new DefaultHttpContext(); httpContext.Request.Method = "POST"; @@ -1288,11 +1434,16 @@ public async Task ProxyAsync_RequestBodyDestinationErrorAfterResponse_Reported() var errorFeature = httpContext.Features.Get(); Assert.Equal(ProxyError.RequestBodyDestination, errorFeature.Error); Assert.IsType(errorFeature.Exception); + + AssertProxyStartFailedStop(events, destinationPrefix, httpContext.Response.StatusCode, errorFeature.Error); + events.AssertContainProxyStages(); } [Fact] public async Task ProxyAsync_UpgradableRequest_RequestBodyCopyError_CancelsResponseBody() { + var events = TestEventListener.Collect(); + var httpContext = new DefaultHttpContext(); httpContext.Request.Method = "GET"; httpContext.Request.Scheme = "http"; @@ -1310,6 +1461,7 @@ public async Task ProxyAsync_UpgradableRequest_RequestBodyCopyError_CancelsRespo upgradeFeatureMock.Setup(u => u.UpgradeAsync()).ReturnsAsync(downstreamStream); httpContext.Features.Set(upgradeFeatureMock.Object); + var destinationPrefix = "https://localhost/"; var sut = CreateProxy(); var client = MockHttpHandler.CreateClient( (HttpRequestMessage request, CancellationToken cancellationToken) => @@ -1332,17 +1484,22 @@ public async Task ProxyAsync_UpgradableRequest_RequestBodyCopyError_CancelsRespo return Task.FromResult(response); }); - await sut.ProxyAsync(httpContext, "https://localhost/", client, new RequestProxyOptions()); + await sut.ProxyAsync(httpContext, destinationPrefix, client, new RequestProxyOptions()); Assert.Equal(StatusCodes.Status101SwitchingProtocols, httpContext.Response.StatusCode); var errorFeature = httpContext.Features.Get(); Assert.Equal(ProxyError.UpgradeRequestClient, errorFeature.Error); Assert.IsType(errorFeature.Exception); + + AssertProxyStartFailedStop(events, destinationPrefix, httpContext.Response.StatusCode, errorFeature.Error); + events.AssertContainProxyStages(upgrade: true); } [Fact] public async Task ProxyAsync_UpgradableRequest_ResponseBodyCopyError_CancelsRequestBody() { + var events = TestEventListener.Collect(); + var httpContext = new DefaultHttpContext(); httpContext.Request.Method = "GET"; httpContext.Request.Scheme = "http"; @@ -1365,6 +1522,7 @@ public async Task ProxyAsync_UpgradableRequest_ResponseBodyCopyError_CancelsRequ upgradeFeatureMock.Setup(u => u.UpgradeAsync()).ReturnsAsync(downstreamStream); httpContext.Features.Set(upgradeFeatureMock.Object); + var destinationPrefix = "https://localhost/"; var sut = CreateProxy(); var client = MockHttpHandler.CreateClient( (HttpRequestMessage request, CancellationToken cancellationToken) => @@ -1382,12 +1540,45 @@ public async Task ProxyAsync_UpgradableRequest_ResponseBodyCopyError_CancelsRequ return Task.FromResult(response); }); - await sut.ProxyAsync(httpContext, "https://localhost/", client, new RequestProxyOptions()); + await sut.ProxyAsync(httpContext, destinationPrefix, client, new RequestProxyOptions()); Assert.Equal(StatusCodes.Status101SwitchingProtocols, httpContext.Response.StatusCode); var errorFeature = httpContext.Features.Get(); Assert.Equal(ProxyError.UpgradeResponseDestination, errorFeature.Error); Assert.IsType(errorFeature.Exception); + + AssertProxyStartFailedStop(events, destinationPrefix, httpContext.Response.StatusCode, errorFeature.Error); + events.AssertContainProxyStages(upgrade: true); + } + + private static void AssertProxyStartStop(List events, string destinationPrefix, int statusCode) + { + AssertProxyStartFailedStop(events, destinationPrefix, statusCode, error: null); + } + + private static void AssertProxyStartFailedStop(List events, string destinationPrefix, int statusCode, ProxyError? error) + { + var start = Assert.Single(events, e => e.EventName == "ProxyStart"); + var prefixActual = (string)Assert.Single(start.Payload); + Assert.Equal(destinationPrefix, prefixActual); + + var stop = Assert.Single(events, e => e.EventName == "ProxyStop"); + var statusActual = (int)Assert.Single(stop.Payload); + Assert.Equal(statusCode, statusActual); + Assert.True(start.TimeStamp <= stop.TimeStamp); + + if (error is null) + { + Assert.DoesNotContain(events, e => e.EventName == "ProxyFailed"); + } + else + { + var failed = Assert.Single(events, e => e.EventName == "ProxyFailed"); + var errorActual = (ProxyError)Assert.Single(failed.Payload); + Assert.Equal(error.Value, errorActual); + Assert.True(start.TimeStamp <= failed.TimeStamp); + Assert.True(failed.TimeStamp <= stop.TimeStamp); + } } private static MemoryStream StringToStream(string text) @@ -1820,7 +2011,7 @@ public override Task WriteAsync(byte[] buffer, int offset, int count, Cancellati private class TestRequestParametersTransform : RequestParametersTransform { - private Action _transformation; + private readonly Action _transformation; public TestRequestParametersTransform(Action transformation) { diff --git a/test/ReverseProxy.Tests/Service/Proxy/StreamCopierTests.cs b/test/ReverseProxy.Tests/Service/Proxy/StreamCopierTests.cs index 620829435..4bdb17cf5 100644 --- a/test/ReverseProxy.Tests/Service/Proxy/StreamCopierTests.cs +++ b/test/ReverseProxy.Tests/Service/Proxy/StreamCopierTests.cs @@ -2,61 +2,265 @@ // Licensed under the MIT License. using System; +using System.Collections.Generic; +using System.Diagnostics.Tracing; using System.IO; using System.Linq; using System.Threading; using System.Threading.Tasks; using Microsoft.ReverseProxy.Common.Tests; +using Microsoft.ReverseProxy.Telemetry; +using Microsoft.ReverseProxy.Utilities; using Xunit; namespace Microsoft.ReverseProxy.Service.Proxy.Tests { public class StreamCopierTests : TestAutoMockBase { - [Fact] - public async Task CopyAsync_Works() + [Theory] + [InlineData(true)] + [InlineData(false)] + public async Task CopyAsync_Works(bool isRequest) { + var events = TestEventListener.Collect(); + const int SourceSize = (128 * 1024) - 3; var sourceBytes = Enumerable.Range(0, SourceSize).Select(i => (byte)(i % 256)).ToArray(); var source = new MemoryStream(sourceBytes); var destination = new MemoryStream(); - await StreamCopier.CopyAsync(source, destination, CancellationToken.None); + await StreamCopier.CopyAsync(isRequest, source, destination, new Clock(), CancellationToken.None); Assert.Equal(sourceBytes, destination.ToArray()); + + AssertContentTransferred(events, isRequest, SourceSize); } - [Fact] - public async Task SourceThrows_Reported() + [Theory] + [InlineData(true)] + [InlineData(false)] + public async Task SourceThrows_Reported(bool isRequest) { - var source = new ThrowStream(); + var events = TestEventListener.Collect(); + + var clock = new ManualClock(); + var sourceWaitTime = TimeSpan.FromMilliseconds(12345); + var source = new SlowStream(new ThrowStream(), clock, sourceWaitTime); var destination = new MemoryStream(); - var (result, error) = await StreamCopier.CopyAsync(source, destination, CancellationToken.None); + var (result, error) = await StreamCopier.CopyAsync(isRequest, source, destination, clock, CancellationToken.None); Assert.Equal(StreamCopyResult.InputError, result); Assert.IsAssignableFrom(error); + + AssertContentTransferred(events, isRequest, + contentLength: 0, + iops: 1, + firstReadTime: sourceWaitTime, + readTime: sourceWaitTime, + writeTime: TimeSpan.Zero); } - [Fact] - public async Task DestinationThrows_Reported() + [Theory] + [InlineData(true)] + [InlineData(false)] + public async Task DestinationThrows_Reported(bool isRequest) { - var source = new MemoryStream(new byte[10]); - var destination = new ThrowStream(); + var events = TestEventListener.Collect(); + + const int SourceSize = 10; + const int BytesPerRead = 3; - var (result, error) = await StreamCopier.CopyAsync(source, destination, CancellationToken.None); + var clock = new ManualClock(); + var sourceWaitTime = TimeSpan.FromMilliseconds(12345); + var destinationWaitTime = TimeSpan.FromMilliseconds(42); + var source = new SlowStream(new MemoryStream(new byte[SourceSize]), clock, sourceWaitTime) { MaxBytesPerRead = BytesPerRead }; + var destination = new SlowStream(new ThrowStream(), clock, destinationWaitTime); + + var (result, error) = await StreamCopier.CopyAsync(isRequest, source, destination, clock, CancellationToken.None); Assert.Equal(StreamCopyResult.OutputError, result); Assert.IsAssignableFrom(error); + + AssertContentTransferred(events, isRequest, + contentLength: BytesPerRead, + iops: 1, + firstReadTime: sourceWaitTime, + readTime: sourceWaitTime, + writeTime: destinationWaitTime); } - [Fact] - public async Task Cancelled_Reported() + [Theory] + [InlineData(true)] + [InlineData(false)] + public async Task Cancelled_Reported(bool isRequest) { + var events = TestEventListener.Collect(); + var source = new MemoryStream(new byte[10]); var destination = new MemoryStream(); - var (result, error) = await StreamCopier.CopyAsync(source, destination, new CancellationToken(canceled: true)); + var (result, error) = await StreamCopier.CopyAsync(isRequest, source, destination, new Clock(), new CancellationToken(canceled: true)); Assert.Equal(StreamCopyResult.Canceled, result); Assert.IsAssignableFrom(error); + + AssertContentTransferred(events, isRequest, + contentLength: 0, + iops: 0, + firstReadTime: TimeSpan.Zero, + readTime: TimeSpan.Zero, + writeTime: TimeSpan.Zero); + } + + [Theory] + [InlineData(true)] + [InlineData(false)] + public async Task SlowStreams_TelemetryReportsCorrectTime(bool isRequest) + { + var events = TestEventListener.Collect(); + + const int SourceSize = 3; + var sourceBytes = new byte[SourceSize]; + var source = new MemoryStream(sourceBytes); + var destination = new MemoryStream(); + + var clock = new ManualClock(); + var sourceWaitTime = TimeSpan.FromMilliseconds(12345); + var destinationWaitTime = TimeSpan.FromMilliseconds(42); + + await StreamCopier.CopyAsync( + isRequest, + new SlowStream(source, clock, sourceWaitTime), + new SlowStream(destination, clock, destinationWaitTime), + clock, + CancellationToken.None); + + Assert.Equal(sourceBytes, destination.ToArray()); + + AssertContentTransferred(events, isRequest, SourceSize, + iops: SourceSize + 1, + firstReadTime: sourceWaitTime, + readTime: (SourceSize + 1) * sourceWaitTime, + writeTime: SourceSize * destinationWaitTime); + } + + [Theory] + [InlineData(true)] + [InlineData(false)] + public async Task LongContentTransfer_TelemetryReportsTransferringEvents(bool isRequest) + { + var events = TestEventListener.Collect(); + + const int SourceSize = 123; + var sourceBytes = new byte[SourceSize]; + var source = new MemoryStream(sourceBytes); + var destination = new MemoryStream(); + + var clock = new ManualClock(); + var sourceWaitTime = TimeSpan.FromMilliseconds(789); // Every second read triggers ContentTransferring + var destinationWaitTime = TimeSpan.FromMilliseconds(42); + + const int BytesPerRead = 3; + var contentReads = (int)Math.Ceiling((double)SourceSize / BytesPerRead); + + await StreamCopier.CopyAsync( + isRequest, + new SlowStream(source, clock, sourceWaitTime) { MaxBytesPerRead = BytesPerRead }, + new SlowStream(destination, clock, destinationWaitTime), + clock, + CancellationToken.None); + + Assert.Equal(sourceBytes, destination.ToArray()); + + AssertContentTransferred(events, isRequest, SourceSize, + iops: contentReads + 1, + firstReadTime: sourceWaitTime, + readTime: (contentReads + 1) * sourceWaitTime, + writeTime: contentReads * destinationWaitTime); + + var transferringEvents = events.Where(e => e.EventName == "ContentTransferring").ToArray(); + Assert.Equal(contentReads / 2, transferringEvents.Length); + + for (var i = 0; i < transferringEvents.Length; i++) + { + var payload = transferringEvents[i].Payload; + Assert.Equal(5, payload.Count); + + Assert.Equal(isRequest, (bool)payload[0]); + + var contentLength = (long)payload[1]; + + var iops = (long)payload[2]; + Assert.Equal((i + 1) * 2, iops); + + if (contentLength % BytesPerRead == 0) + { + Assert.Equal(iops * BytesPerRead, contentLength); + } + else + { + Assert.Equal(transferringEvents.Length - 1, i); + Assert.Equal(SourceSize, contentLength); + } + + var readTime = new TimeSpan((long)payload[3]); + Assert.Equal(iops * sourceWaitTime, readTime, new ApproximateTimeSpanComparer()); + + var writeTime = new TimeSpan((long)payload[4]); + Assert.Equal(iops * destinationWaitTime, writeTime, new ApproximateTimeSpanComparer()); + } + } + + private static void AssertContentTransferred( + List events, + bool isRequest, + long contentLength, + long? iops = null, + TimeSpan? firstReadTime = null, + TimeSpan? readTime = null, + TimeSpan? writeTime = null) + { + var contentTransferred = Assert.Single(events, e => e.EventName == "ContentTransferred"); + var payload = contentTransferred.Payload; + Assert.Equal(6, payload.Count); + + Assert.Equal(isRequest, (bool)payload[0]); + Assert.Equal(contentLength, (long)payload[1]); + + var actualIops = (long)payload[2]; + if (iops.HasValue) + { + Assert.Equal(iops.Value, actualIops); + } + else + { + Assert.InRange(actualIops, 1, contentLength + 1); + } + + if (readTime.HasValue) + { + Assert.Equal(readTime.Value, new TimeSpan((long)payload[3]), new ApproximateTimeSpanComparer()); + } + + if (writeTime.HasValue) + { + Assert.Equal(writeTime.Value, new TimeSpan((long)payload[4]), new ApproximateTimeSpanComparer()); + } + + if (firstReadTime.HasValue) + { + Assert.Equal(firstReadTime.Value, new TimeSpan((long)payload[5]), new ApproximateTimeSpanComparer()); + + if (readTime.HasValue) + { + Assert.True(firstReadTime.Value <= readTime.Value); + } + } + + var stages = events.GetProxyStages(); + + var startStage = isRequest ? ProxyStage.RequestContentTransferStart : ProxyStage.ResponseContentTransferStart; + var startTime = Assert.Single(stages, s => s.Stage == startStage).TimeStamp; + + Assert.True(startTime <= contentTransferred.TimeStamp); } private class ThrowStream : Stream @@ -101,5 +305,43 @@ public override void Write(byte[] buffer, int offset, int count) throw new IOException("Fake connection issue"); } } + + private class SlowStream : DelegatingStream + { + private readonly TimeSpan _waitTime; + private readonly ManualClock _clock; + + public int MaxBytesPerRead { get; set; } = 1; + + public SlowStream(Stream innerStream, ManualClock clock, TimeSpan waitTime) + : base(innerStream) + { + _clock = clock; + _waitTime = waitTime; + } + + public override ValueTask ReadAsync(Memory buffer, CancellationToken cancellationToken = default) + { + _clock.Time += _waitTime; + return base.ReadAsync(buffer.Slice(0, Math.Min(buffer.Length, MaxBytesPerRead)), cancellationToken); + } + + public override ValueTask WriteAsync(ReadOnlyMemory buffer, CancellationToken cancellationToken = default) + { + _clock.Time += _waitTime; + return base.WriteAsync(buffer, cancellationToken); + } + } + + private class ApproximateTimeSpanComparer : IEqualityComparer + { + public TimeSpan Precision { get; set; } = TimeSpan.FromMilliseconds(0.1); + + public bool Equals(TimeSpan x, TimeSpan y) => x > y + ? x - y <= Precision + : y - x <= Precision; + + public int GetHashCode(TimeSpan obj) => 42; + } } } diff --git a/test/ReverseProxy.Tests/Service/Proxy/StreamCopyHttpContentTests.cs b/test/ReverseProxy.Tests/Service/Proxy/StreamCopyHttpContentTests.cs index ff5b4d509..d3d2a4e60 100644 --- a/test/ReverseProxy.Tests/Service/Proxy/StreamCopyHttpContentTests.cs +++ b/test/ReverseProxy.Tests/Service/Proxy/StreamCopyHttpContentTests.cs @@ -8,6 +8,7 @@ using System.Reflection; using System.Threading; using System.Threading.Tasks; +using Microsoft.ReverseProxy.Utilities; using Moq; using Xunit; @@ -25,7 +26,7 @@ public async Task CopyToAsync_InvokesStreamCopier() var source = new MemoryStream(sourceBytes); var destination = new MemoryStream(); - var sut = new StreamCopyHttpContent(source, autoFlushHttpClientOutgoingStream: false, CancellationToken.None); + var sut = new StreamCopyHttpContent(source, autoFlushHttpClientOutgoingStream: false, new Clock(), CancellationToken.None); // Act & Assert Assert.False(sut.ConsumptionTask.IsCompleted); @@ -50,7 +51,7 @@ public async Task CopyToAsync_AutoFlushing(bool autoFlush, int expectedFlushes) var destination = new MemoryStream(); var flushCountingDestination = new FlushCountingStream(destination); - var sut = new StreamCopyHttpContent(source, autoFlushHttpClientOutgoingStream: autoFlush, CancellationToken.None); + var sut = new StreamCopyHttpContent(source, autoFlushHttpClientOutgoingStream: autoFlush, new Clock(), CancellationToken.None); // Act & Assert Assert.False(sut.ConsumptionTask.IsCompleted); @@ -72,7 +73,7 @@ public async Task CopyToAsync_AsyncSequencing() source.Setup(s => s.ReadAsync(It.IsAny>(), It.IsAny())).Returns(() => new ValueTask(tcs.Task)); var destination = new MemoryStream(); - var sut = new StreamCopyHttpContent(source.Object, autoFlushHttpClientOutgoingStream: false, CancellationToken.None); + var sut = new StreamCopyHttpContent(source.Object, autoFlushHttpClientOutgoingStream: false, new Clock(), CancellationToken.None); // Act & Assert Assert.False(sut.ConsumptionTask.IsCompleted); @@ -93,7 +94,7 @@ public Task ReadAsStreamAsync_Throws() // Arrange var source = new MemoryStream(); var destination = new MemoryStream(); - var sut = new StreamCopyHttpContent(source, autoFlushHttpClientOutgoingStream: false, CancellationToken.None); + var sut = new StreamCopyHttpContent(source, autoFlushHttpClientOutgoingStream: false, new Clock(), CancellationToken.None); // Act Func func = () => sut.ReadAsStreamAsync(); @@ -107,7 +108,7 @@ public void AllowDuplex_ReturnsTrue() { // Arrange var source = new MemoryStream(); - var sut = new StreamCopyHttpContent(source, autoFlushHttpClientOutgoingStream: false, CancellationToken.None); + var sut = new StreamCopyHttpContent(source, autoFlushHttpClientOutgoingStream: false, new Clock(), CancellationToken.None); // Assert // This is an internal property that HttpClient and friends use internally and which must be true