Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ClientModel Prototype: Move response buffering to transport #41651

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
3da9e8b
initial rework
annelo-msft Jan 29, 2024
ee0d905
continued refactor
annelo-msft Jan 29, 2024
fa7f392
WIP
annelo-msft Jan 29, 2024
0264e53
nits
annelo-msft Jan 29, 2024
4123619
fix and add some tests
annelo-msft Jan 29, 2024
2c41bfc
update
annelo-msft Jan 30, 2024
1cefea4
Merge remote-tracking branch 'upstream/feature/core-experiment' into …
annelo-msft Jan 30, 2024
f9120d1
update
annelo-msft Jan 30, 2024
a773379
Merge remote-tracking branch 'upstream/feature/core-experiment' into …
annelo-msft Jan 30, 2024
e53febc
fix
annelo-msft Jan 30, 2024
3ba62f7
fix
annelo-msft Jan 30, 2024
c926c83
fix
annelo-msft Jan 30, 2024
10c6c29
fix
annelo-msft Jan 30, 2024
85231fe
Add messages to Debug.Asserts; update test to opt-out of buffering
annelo-msft Jan 30, 2024
cc76d49
plumb through network timeout from clientoptions
annelo-msft Jan 30, 2024
698315a
more test fixes
annelo-msft Jan 30, 2024
0d93a04
move template call into Azure.Core base type
annelo-msft Jan 30, 2024
9383605
fix and address tests that rely on transport not buffering
annelo-msft Jan 30, 2024
433bd23
set NetworkTimeout in Pipeline.Send
annelo-msft Jan 30, 2024
46166fc
add functional tests for buffering
annelo-msft Jan 31, 2024
628e195
update
annelo-msft Jan 31, 2024
4c5492b
refactor
annelo-msft Jan 31, 2024
543e8f9
reorg
annelo-msft Feb 2, 2024
b322734
refactor
annelo-msft Feb 2, 2024
c3bf692
nits
annelo-msft Feb 2, 2024
84a8d7f
nit
annelo-msft Feb 2, 2024
d23785e
rewwork a bit
annelo-msft Feb 3, 2024
b01095a
rewwork a bit
annelo-msft Feb 3, 2024
9ac2475
Merge remote-tracking branch 'upstream/feature/core-experiment' into …
annelo-msft Feb 10, 2024
3b76dfc
updates
annelo-msft Feb 12, 2024
c0dc49b
fix
annelo-msft Feb 12, 2024
700015b
bug fix
annelo-msft Feb 12, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions sdk/core/Azure.Core/src/ClientOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ namespace Azure.Core
/// </summary>
public abstract class ClientOptions : ClientPipelineOptions
{
internal static readonly TimeSpan DefaultNetworkTimeout = TimeSpan.FromSeconds(100);

private HttpPipelineTransport _transport;
internal bool IsCustomTransportSet { get; private set; }

Expand Down
1 change: 1 addition & 0 deletions sdk/core/Azure.Core/src/HttpMessage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ public HttpMessage(Request request, ResponseClassifier responseClassifier)
Argument.AssertNotNull(request, nameof(request));

ResponseClassifier = responseClassifier;
NetworkTimeout = request.NetworkTimeout ?? ClientOptions.DefaultNetworkTimeout;
}

/// <summary>
Expand Down
5 changes: 3 additions & 2 deletions sdk/core/Azure.Core/src/Pipeline/DisposableHttpPipeline.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,9 @@ public sealed class DisposableHttpPipeline : HttpPipeline, IDisposable
/// <param name="policies">Policies to be invoked as part of the pipeline in order.</param>
/// <param name="responseClassifier">The response classifier to be used in invocations.</param>
/// <param name="isTransportOwnedInternally"> </param>
internal DisposableHttpPipeline(HttpPipelineTransport transport, int perCallIndex, int perRetryIndex, HttpPipelinePolicy[] policies, ResponseClassifier responseClassifier, bool isTransportOwnedInternally)
: base(transport, perCallIndex, perRetryIndex, policies, responseClassifier)
/// <param name="networkTimeout"></param>
internal DisposableHttpPipeline(HttpPipelineTransport transport, int perCallIndex, int perRetryIndex, HttpPipelinePolicy[] policies, ResponseClassifier responseClassifier, bool isTransportOwnedInternally, TimeSpan networkTimeout)
: base(transport, perCallIndex, perRetryIndex, policies, responseClassifier, networkTimeout)
{
this.isTransportOwnedInternally = isTransportOwnedInternally;
}
Expand Down
30 changes: 26 additions & 4 deletions sdk/core/Azure.Core/src/Pipeline/HttpPipeline.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ public class HttpPipeline
/// </summary>
private readonly int _perRetryIndex;

private readonly TimeSpan _networkTimeout;
annelo-msft marked this conversation as resolved.
Show resolved Hide resolved

/// <summary>
/// Creates a new instance of <see cref="HttpPipeline"/> with the provided transport, policies and response classifier.
/// </summary>
Expand All @@ -59,14 +61,16 @@ public HttpPipeline(HttpPipelineTransport transport, HttpPipelinePolicy[]? polic
policies.CopyTo(all, 0);

_pipeline = all;
_networkTimeout = ClientOptions.DefaultNetworkTimeout;
}

internal HttpPipeline(
HttpPipelineTransport transport,
int perCallIndex,
int perRetryIndex,
HttpPipelinePolicy[] pipeline,
ResponseClassifier responseClassifier)
ResponseClassifier responseClassifier,
TimeSpan? networkTimeout)
{
ResponseClassifier = responseClassifier ?? throw new ArgumentNullException(nameof(responseClassifier));

Expand All @@ -77,6 +81,9 @@ internal HttpPipeline(

_perCallIndex = perCallIndex;
_perRetryIndex = perRetryIndex;

_networkTimeout = networkTimeout ?? ClientOptions.DefaultNetworkTimeout;

_internallyConstructed = true;
}

Expand All @@ -85,14 +92,22 @@ internal HttpPipeline(
/// </summary>
/// <returns>The request.</returns>
public Request CreateRequest()
=> _transport.CreateRequest();
{
Request request = _transport.CreateRequest();
request.NetworkTimeout = _networkTimeout;
return request;
}

/// <summary>
/// Creates a new <see cref="HttpMessage"/> instance.
/// </summary>
/// <returns>The message.</returns>
public HttpMessage CreateMessage()
=> new(CreateRequest(), ResponseClassifier);
{
Request request = CreateRequest();
HttpMessage message = new(request, ResponseClassifier);
return message;
}

/// <summary>
/// </summary>
Expand All @@ -109,7 +124,8 @@ public HttpMessage CreateMessage(RequestContext? context)
/// <returns>The message.</returns>
public HttpMessage CreateMessage(RequestContext? context, ResponseClassifier? classifier = default)
{
HttpMessage message = new(CreateRequest(), classifier ?? ResponseClassifier);
Request request = CreateRequest();
HttpMessage message = new(request, classifier ?? ResponseClassifier);

if (context != null)
{
Expand All @@ -134,6 +150,11 @@ public ValueTask SendAsync(HttpMessage message, CancellationToken cancellationTo
{
message.SetCancellationToken(cancellationToken);
message.ProcessingStartTime = DateTimeOffset.UtcNow;

// We must set NetworkTimeout here because the documentation for
// HttpMessage states that if a user sets this value to null, the
// pipeline will use the value set on ClientOptions.
message.NetworkTimeout ??= _networkTimeout;
AddHttpMessageProperties(message);

if (message.Policies == null || message.Policies.Count == 0)
Expand Down Expand Up @@ -169,6 +190,7 @@ public void Send(HttpMessage message, CancellationToken cancellationToken)
{
message.SetCancellationToken(cancellationToken);
message.ProcessingStartTime = DateTimeOffset.UtcNow;
message.NetworkTimeout ??= _networkTimeout;
AddHttpMessageProperties(message);

if (message.Policies == null || message.Policies.Count == 0)
Expand Down
16 changes: 7 additions & 9 deletions sdk/core/Azure.Core/src/Pipeline/HttpPipelineBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public static HttpPipeline Build(
((List<HttpPipelinePolicy>)pipelineOptions.PerRetryPolicies).AddRange(perRetryPolicies);
var result = BuildInternal(pipelineOptions, null);

return new HttpPipeline(result.Transport, result.PerCallIndex, result.PerRetryIndex, result.Policies, result.Classifier);
return new HttpPipeline(result.Transport, result.PerCallIndex, result.PerRetryIndex, result.Policies, result.Classifier, result.NetworkTimeout);
}

/// <summary>
Expand All @@ -63,7 +63,7 @@ public static DisposableHttpPipeline Build(ClientOptions options, HttpPipelinePo
((List<HttpPipelinePolicy>)pipelineOptions.PerCallPolicies).AddRange(perCallPolicies);
((List<HttpPipelinePolicy>)pipelineOptions.PerRetryPolicies).AddRange(perRetryPolicies);
var result = BuildInternal(pipelineOptions, transportOptions);
return new DisposableHttpPipeline(result.Transport, result.PerCallIndex, result.PerRetryIndex, result.Policies, result.Classifier, result.IsTransportOwned);
return new DisposableHttpPipeline(result.Transport, result.PerCallIndex, result.PerRetryIndex, result.Policies, result.Classifier, result.IsTransportOwned, result.NetworkTimeout);
}

/// <summary>
Expand All @@ -74,7 +74,7 @@ public static DisposableHttpPipeline Build(ClientOptions options, HttpPipelinePo
public static HttpPipeline Build(HttpPipelineOptions options)
{
var result = BuildInternal(options, null);
return new HttpPipeline(result.Transport, result.PerCallIndex, result.PerRetryIndex, result.Policies, result.Classifier);
return new HttpPipeline(result.Transport, result.PerCallIndex, result.PerRetryIndex, result.Policies, result.Classifier, result.NetworkTimeout);
}

/// <summary>
Expand All @@ -87,17 +87,17 @@ public static DisposableHttpPipeline Build(HttpPipelineOptions options, HttpPipe
{
Argument.AssertNotNull(transportOptions, nameof(transportOptions));
var result = BuildInternal(options, transportOptions);
return new DisposableHttpPipeline(result.Transport, result.PerCallIndex, result.PerRetryIndex, result.Policies, result.Classifier, result.IsTransportOwned);
return new DisposableHttpPipeline(result.Transport, result.PerCallIndex, result.PerRetryIndex, result.Policies, result.Classifier, result.IsTransportOwned, result.NetworkTimeout);
}

internal static (ResponseClassifier Classifier, HttpPipelineTransport Transport, int PerCallIndex, int PerRetryIndex, HttpPipelinePolicy[] Policies, bool IsTransportOwned) BuildInternal(
internal static (ResponseClassifier Classifier, HttpPipelineTransport Transport, int PerCallIndex, int PerRetryIndex, HttpPipelinePolicy[] Policies, bool IsTransportOwned, TimeSpan NetworkTimeout) BuildInternal(
HttpPipelineOptions buildOptions,
HttpPipelineTransportOptions? defaultTransportOptions)
{
Argument.AssertNotNull(buildOptions.PerCallPolicies, nameof(buildOptions.PerCallPolicies));
Argument.AssertNotNull(buildOptions.PerRetryPolicies, nameof(buildOptions.PerRetryPolicies));

var policies = new List<HttpPipelinePolicy>(8 +
var policies = new List<HttpPipelinePolicy>(7 +
(buildOptions.ClientOptions.Policies?.Count ?? 0) +
buildOptions.PerCallPolicies.Count +
buildOptions.PerRetryPolicies.Count);
Expand Down Expand Up @@ -181,8 +181,6 @@ void AddNonNullPolicies(HttpPipelinePolicy[] policiesToAdd)
policies.Add(new LoggingPolicy(diagnostics.IsLoggingContentEnabled, diagnostics.LoggedContentSizeLimit, sanitizer, assemblyName));
}

policies.Add(new ResponseBodyPolicy(buildOptions.ClientOptions.Retry.NetworkTimeout));

policies.Add(new RequestActivityPolicy(isDistributedTracingEnabled, ClientDiagnostics.GetResourceProviderNamespace(buildOptions.ClientOptions.GetType().Assembly), sanitizer));

AddUserPolicies(HttpPipelinePosition.BeforeTransport);
Expand All @@ -209,7 +207,7 @@ void AddNonNullPolicies(HttpPipelinePolicy[] policiesToAdd)

buildOptions.ResponseClassifier ??= ResponseClassifier.Shared;

return (buildOptions.ResponseClassifier, transport, perCallIndex, perRetryIndex, policies.ToArray(), isTransportInternallyCreated);
return (buildOptions.ResponseClassifier, transport, perCallIndex, perRetryIndex, policies.ToArray(), isTransportInternallyCreated, buildOptions.ClientOptions.Retry.NetworkTimeout);
}

// internal for testing
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

using System;
using System.ClientModel.Primitives;
using System.Threading.Tasks;

namespace Azure.Core.Pipeline;

public partial class HttpPipelineTransport
{
private class AzureCorePipelineTransport : PipelineTransport
{
private readonly HttpPipelineTransport _transport;

public AzureCorePipelineTransport(HttpPipelineTransport transport)
{
_transport = transport;
}

protected override PipelineMessage CreateMessageCore()
{
Request request = _transport.CreateRequest();
return new HttpMessage(request, ResponseClassifier.Shared);
}

protected override void ProcessCore(PipelineMessage message)
{
HttpMessage httpMessage = AssertHttpMessage(message);
_transport.Process(httpMessage);
}

protected override async ValueTask ProcessCoreAsync(PipelineMessage message)
{
HttpMessage httpMessage = AssertHttpMessage(message);
await _transport.ProcessAsync(httpMessage).ConfigureAwait(false);
}

private static HttpMessage AssertHttpMessage(PipelineMessage message)
{
if (message is not HttpMessage httpMessage)
{
throw new InvalidOperationException($"Invalid type for PipelineMessage: '{message?.GetType()}'.");
}

return httpMessage;
}
}
}
24 changes: 22 additions & 2 deletions sdk/core/Azure.Core/src/Pipeline/HttpPipelineTransport.cs
Original file line number Diff line number Diff line change
@@ -1,16 +1,36 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

using System.Threading;
using System.ClientModel.Primitives;
using System.Threading.Tasks;

namespace Azure.Core.Pipeline
{
/// <summary>
/// Represents an HTTP pipeline transport used to send HTTP requests and receive responses.
/// </summary>
public abstract class HttpPipelineTransport
public abstract partial class HttpPipelineTransport
{
private readonly PipelineTransport _transport;

/// <summary>
/// TBD.
/// </summary>
protected HttpPipelineTransport()
{
_transport = new AzureCorePipelineTransport(this);
}

internal void ProcessInternal(HttpMessage message)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This effectively retroactively implements the template method and allows us to use ClientModel PipelineTransport validations in the Azure.Core abstract transport type. This gives us buffering in the default pipeline even for custom subtypes of HttpPipelineTransport.

{
_transport.Process(message);
}

internal async ValueTask ProcessInternalAsync(HttpMessage message)
{
await _transport.ProcessAsync(message).ConfigureAwait(false);
}

/// <summary>
/// Sends the request contained by the <paramref name="message"/> and sets the <see cref="HttpMessage.Response"/> property to received response synchronously.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,20 @@ public override async ValueTask ProcessAsync(HttpMessage message, ReadOnlyMemory
{
Debug.Assert(pipeline.IsEmpty);

await _transport.ProcessAsync(message).ConfigureAwait(false);
await _transport.ProcessInternalAsync(message).ConfigureAwait(false);

message.Response.RequestFailedDetailsParser = _errorParser;
message.Response.Sanitizer = _sanitizer;

// TODO: I think we can remove the call to below?
message.Response.SetIsError(message.ResponseClassifier.IsErrorResponse(message));
}

public override void Process(HttpMessage message, ReadOnlyMemory<HttpPipelinePolicy> pipeline)
{
Debug.Assert(pipeline.IsEmpty);

_transport.Process(message);
_transport.ProcessInternal(message);

message.Response.RequestFailedDetailsParser = _errorParser;
message.Response.Sanitizer = _sanitizer;
Expand Down
94 changes: 0 additions & 94 deletions sdk/core/Azure.Core/src/Pipeline/Internal/ResponseBodyPolicy.cs

This file was deleted.

Loading