Skip to content

Commit

Permalink
Core 2.0 Prototype: Merge "move buffering" and "response.Content" to …
Browse files Browse the repository at this point in the history
…a single PR (#41907)

* initial rework

* continued refactor

* WIP

* nits

* fix and add some tests

* update

* update

* fix

* fix

* fix

* fix

* Add messages to Debug.Asserts; update test to opt-out of buffering

* plumb through network timeout from clientoptions

* more test fixes

* move template call into Azure.Core base type

* fix and address tests that rely on transport not buffering

* set NetworkTimeout in Pipeline.Send

* add functional tests for buffering

* update

* refactor

* reorg

* refactor

* nits

* nit

* rewwork a bit

* rewwork a bit

* updates

* fix

* initial move

* clientmodel tests

* Azure.Core implementation

* export API

* bug fix

* updates

* fix internal timeout property issue

* bug fix

* fix where dispose is called before buffer

* bug fix

* fix

* updates from clientmodel PR

* instrumentation for debugging async issue

* remove instrumentation

* bug fix and remove more instrumentation

* updates

* more cleanup

* update

* updates

* keep in sync with PR to main

* updates

* add test for async exception factory
  • Loading branch information
annelo-msft authored Feb 13, 2024
1 parent b3b1425 commit 5ede910
Show file tree
Hide file tree
Showing 50 changed files with 1,454 additions and 998 deletions.
5 changes: 4 additions & 1 deletion sdk/core/Azure.Core/api/Azure.Core.net461.cs
Original file line number Diff line number Diff line change
Expand Up @@ -232,20 +232,23 @@ public partial class RequestFailedException : System.ClientModel.ClientResultExc
public RequestFailedException(string message) : base (default(System.ClientModel.Primitives.PipelineResponse), default(System.Exception)) { }
public RequestFailedException(string message, System.Exception? innerException) : base (default(System.ClientModel.Primitives.PipelineResponse), default(System.Exception)) { }
public string? ErrorCode { get { throw null; } }
public static System.Threading.Tasks.ValueTask<Azure.RequestFailedException> CreateAsync(Azure.Response response, Azure.Core.RequestFailedDetailsParser? parser = null, System.Exception? innerException = null) { throw null; }
public override void GetObjectData(System.Runtime.Serialization.SerializationInfo info, System.Runtime.Serialization.StreamingContext context) { }
public new Azure.Response? GetRawResponse() { throw null; }
}
public abstract partial class Response : System.ClientModel.Primitives.PipelineResponse
{
protected Response() { }
public abstract string ClientRequestId { get; set; }
public virtual new System.BinaryData Content { get { throw null; } }
public override System.BinaryData Content { get { throw null; } }
public virtual new Azure.Core.ResponseHeaders Headers { get { throw null; } }
protected internal abstract bool ContainsHeader(string name);
protected internal abstract System.Collections.Generic.IEnumerable<Azure.Core.HttpHeader> EnumerateHeaders();
public static Azure.Response<T> FromValue<T>(T value, Azure.Response response) { throw null; }
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
protected override System.ClientModel.Primitives.PipelineResponseHeaders GetHeadersCore() { throw null; }
public override System.BinaryData ReadContent(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public override System.Threading.Tasks.ValueTask<System.BinaryData> ReadContentAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
protected sealed override void SetIsErrorCore(bool isError) { }
public override string ToString() { throw null; }
Expand Down
5 changes: 4 additions & 1 deletion sdk/core/Azure.Core/api/Azure.Core.net472.cs
Original file line number Diff line number Diff line change
Expand Up @@ -232,20 +232,23 @@ public partial class RequestFailedException : System.ClientModel.ClientResultExc
public RequestFailedException(string message) : base (default(System.ClientModel.Primitives.PipelineResponse), default(System.Exception)) { }
public RequestFailedException(string message, System.Exception? innerException) : base (default(System.ClientModel.Primitives.PipelineResponse), default(System.Exception)) { }
public string? ErrorCode { get { throw null; } }
public static System.Threading.Tasks.ValueTask<Azure.RequestFailedException> CreateAsync(Azure.Response response, Azure.Core.RequestFailedDetailsParser? parser = null, System.Exception? innerException = null) { throw null; }
public override void GetObjectData(System.Runtime.Serialization.SerializationInfo info, System.Runtime.Serialization.StreamingContext context) { }
public new Azure.Response? GetRawResponse() { throw null; }
}
public abstract partial class Response : System.ClientModel.Primitives.PipelineResponse
{
protected Response() { }
public abstract string ClientRequestId { get; set; }
public virtual new System.BinaryData Content { get { throw null; } }
public override System.BinaryData Content { get { throw null; } }
public virtual new Azure.Core.ResponseHeaders Headers { get { throw null; } }
protected internal abstract bool ContainsHeader(string name);
protected internal abstract System.Collections.Generic.IEnumerable<Azure.Core.HttpHeader> EnumerateHeaders();
public static Azure.Response<T> FromValue<T>(T value, Azure.Response response) { throw null; }
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
protected override System.ClientModel.Primitives.PipelineResponseHeaders GetHeadersCore() { throw null; }
public override System.BinaryData ReadContent(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public override System.Threading.Tasks.ValueTask<System.BinaryData> ReadContentAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
protected sealed override void SetIsErrorCore(bool isError) { }
public override string ToString() { throw null; }
Expand Down
5 changes: 4 additions & 1 deletion sdk/core/Azure.Core/api/Azure.Core.net6.0.cs
Original file line number Diff line number Diff line change
Expand Up @@ -232,20 +232,23 @@ public partial class RequestFailedException : System.ClientModel.ClientResultExc
public RequestFailedException(string message) : base (default(System.ClientModel.Primitives.PipelineResponse), default(System.Exception)) { }
public RequestFailedException(string message, System.Exception? innerException) : base (default(System.ClientModel.Primitives.PipelineResponse), default(System.Exception)) { }
public string? ErrorCode { get { throw null; } }
public static System.Threading.Tasks.ValueTask<Azure.RequestFailedException> CreateAsync(Azure.Response response, Azure.Core.RequestFailedDetailsParser? parser = null, System.Exception? innerException = null) { throw null; }
public override void GetObjectData(System.Runtime.Serialization.SerializationInfo info, System.Runtime.Serialization.StreamingContext context) { }
public new Azure.Response? GetRawResponse() { throw null; }
}
public abstract partial class Response : System.ClientModel.Primitives.PipelineResponse
{
protected Response() { }
public abstract string ClientRequestId { get; set; }
public virtual new System.BinaryData Content { get { throw null; } }
public override System.BinaryData Content { get { throw null; } }
public virtual new Azure.Core.ResponseHeaders Headers { get { throw null; } }
protected internal abstract bool ContainsHeader(string name);
protected internal abstract System.Collections.Generic.IEnumerable<Azure.Core.HttpHeader> EnumerateHeaders();
public static Azure.Response<T> FromValue<T>(T value, Azure.Response response) { throw null; }
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
protected override System.ClientModel.Primitives.PipelineResponseHeaders GetHeadersCore() { throw null; }
public override System.BinaryData ReadContent(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public override System.Threading.Tasks.ValueTask<System.BinaryData> ReadContentAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
protected sealed override void SetIsErrorCore(bool isError) { }
public override string ToString() { throw null; }
Expand Down
5 changes: 4 additions & 1 deletion sdk/core/Azure.Core/api/Azure.Core.netstandard2.0.cs
Original file line number Diff line number Diff line change
Expand Up @@ -232,20 +232,23 @@ public partial class RequestFailedException : System.ClientModel.ClientResultExc
public RequestFailedException(string message) : base (default(System.ClientModel.Primitives.PipelineResponse), default(System.Exception)) { }
public RequestFailedException(string message, System.Exception? innerException) : base (default(System.ClientModel.Primitives.PipelineResponse), default(System.Exception)) { }
public string? ErrorCode { get { throw null; } }
public static System.Threading.Tasks.ValueTask<Azure.RequestFailedException> CreateAsync(Azure.Response response, Azure.Core.RequestFailedDetailsParser? parser = null, System.Exception? innerException = null) { throw null; }
public override void GetObjectData(System.Runtime.Serialization.SerializationInfo info, System.Runtime.Serialization.StreamingContext context) { }
public new Azure.Response? GetRawResponse() { throw null; }
}
public abstract partial class Response : System.ClientModel.Primitives.PipelineResponse
{
protected Response() { }
public abstract string ClientRequestId { get; set; }
public virtual new System.BinaryData Content { get { throw null; } }
public override System.BinaryData Content { get { throw null; } }
public virtual new Azure.Core.ResponseHeaders Headers { get { throw null; } }
protected internal abstract bool ContainsHeader(string name);
protected internal abstract System.Collections.Generic.IEnumerable<Azure.Core.HttpHeader> EnumerateHeaders();
public static Azure.Response<T> FromValue<T>(T value, Azure.Response response) { throw null; }
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
protected override System.ClientModel.Primitives.PipelineResponseHeaders GetHeadersCore() { throw null; }
public override System.BinaryData ReadContent(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public override System.Threading.Tasks.ValueTask<System.BinaryData> ReadContentAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
protected sealed override void SetIsErrorCore(bool isError) { }
public override string ToString() { throw null; }
Expand Down
2 changes: 2 additions & 0 deletions sdk/core/Azure.Core/src/ClientOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ namespace Azure.Core
/// </summary>
public abstract class ClientOptions : ClientPipelineOptions
{
internal static readonly TimeSpan DefaultNetworkTimeout = TimeSpan.FromSeconds(100);

private HttpPipelineTransport _transport;
internal bool IsCustomTransportSet { get; private set; }

Expand Down
1 change: 1 addition & 0 deletions sdk/core/Azure.Core/src/HttpMessage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ public HttpMessage(Request request, ResponseClassifier responseClassifier)
Argument.AssertNotNull(request, nameof(request));

ResponseClassifier = responseClassifier;
NetworkTimeout = request.NetworkTimeout ?? ClientOptions.DefaultNetworkTimeout;
}

/// <summary>
Expand Down
80 changes: 80 additions & 0 deletions sdk/core/Azure.Core/src/Internal/AzureBaseBuffersExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ namespace Azure.Core.Buffers
{
internal static class AzureBaseBuffersExtensions
{
// Same value as Stream.CopyTo uses by default
private const int DefaultCopyBufferSize = 81920;

public static async Task WriteAsync(this Stream stream, ReadOnlyMemory<byte> buffer, CancellationToken cancellation = default)
{
Argument.AssertNotNull(stream, nameof(stream));
Expand Down Expand Up @@ -87,5 +90,82 @@ public static async Task WriteAsync(this Stream stream, ReadOnlySequence<byte> b
ArrayPool<byte>.Shared.Return(array);
}
}

public static async Task CopyToAsync(this Stream source, Stream destination, CancellationToken cancellationToken)
{
//using CancellationTokenSource cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
//cts.CancelAfter(timeout);

//// If cancellation is possible (whether due to network timeout or a user cancellation token being passed), then
//// register callback to dispose the stream on cancellation.
//if (timeout != Timeout.InfiniteTimeSpan || cancellationToken.CanBeCanceled)
//{
// cts.Token.Register(state => ((Stream?)state)?.Dispose(), source);
//}

byte[] buffer = ArrayPool<byte>.Shared.Rent(DefaultCopyBufferSize);

try
{
while (true)
{
#pragma warning disable CA1835 // ReadAsync(Memory<>) overload is not available in all targets
int bytesRead = await source.ReadAsync(buffer, 0, buffer.Length, cancellationToken).ConfigureAwait(false);
#pragma warning restore // ReadAsync(Memory<>) overload is not available in all targets
if (bytesRead == 0)
break;
await destination.WriteAsync(new ReadOnlyMemory<byte>(buffer, 0, bytesRead), cancellationToken).ConfigureAwait(false);
}
}
//catch (Exception ex) when (ex is ObjectDisposedException
// or IOException
// or OperationCanceledException
// or NotSupportedException)
//{
// CancellationHelper.ThrowIfCancellationRequestedOrTimeout(cancellationToken, cts.Token, ex, timeout);
// throw;
//}
finally
{
ArrayPool<byte>.Shared.Return(buffer);
}
}

public static void CopyTo(this Stream source, Stream destination, CancellationToken cancellationToken)
{
//using CancellationTokenSource cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
//cts.CancelAfter(timeout);

//// If cancellation is possible (whether due to network timeout or a user cancellation token being passed), then
//// register callback to dispose the stream on cancellation.
//if (timeout != Timeout.InfiniteTimeSpan || cancellationToken.CanBeCanceled)
//{
// cts.Token.Register(state => ((Stream?)state)?.Dispose(), source);
//}

byte[] buffer = ArrayPool<byte>.Shared.Rent(DefaultCopyBufferSize);

try
{
int read;
while ((read = source.Read(buffer, 0, buffer.Length)) != 0)
{
cancellationToken.ThrowIfCancellationRequested();
destination.Write(buffer, 0, read);
}
}
//catch (Exception ex) when (ex is ObjectDisposedException
// or IOException
// or OperationCanceledException
// or NotSupportedException)
//{
// CancellationHelper.ThrowIfCancellationRequestedOrTimeout(cancellationToken, cts.Token, ex, timeout);
// throw;
//}
finally
{
ArrayPool<byte>.Shared.Return(buffer);
}
}
}
}
5 changes: 3 additions & 2 deletions sdk/core/Azure.Core/src/Pipeline/DisposableHttpPipeline.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,9 @@ public sealed class DisposableHttpPipeline : HttpPipeline, IDisposable
/// <param name="policies">Policies to be invoked as part of the pipeline in order.</param>
/// <param name="responseClassifier">The response classifier to be used in invocations.</param>
/// <param name="isTransportOwnedInternally"> </param>
internal DisposableHttpPipeline(HttpPipelineTransport transport, int perCallIndex, int perRetryIndex, HttpPipelinePolicy[] policies, ResponseClassifier responseClassifier, bool isTransportOwnedInternally)
: base(transport, perCallIndex, perRetryIndex, policies, responseClassifier)
/// <param name="networkTimeout"></param>
internal DisposableHttpPipeline(HttpPipelineTransport transport, int perCallIndex, int perRetryIndex, HttpPipelinePolicy[] policies, ResponseClassifier responseClassifier, bool isTransportOwnedInternally, TimeSpan networkTimeout)
: base(transport, perCallIndex, perRetryIndex, policies, responseClassifier, networkTimeout)
{
this.isTransportOwnedInternally = isTransportOwnedInternally;
}
Expand Down
34 changes: 34 additions & 0 deletions sdk/core/Azure.Core/src/Pipeline/HttpClientTransport.Response.cs
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

using System;
using System.ClientModel.Primitives;
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using System.IO;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;

namespace Azure.Core.Pipeline
{
Expand Down Expand Up @@ -41,6 +44,21 @@ public override Stream? ContentStream
set => _pipelineResponse.ContentStream = value;
}

public override BinaryData Content
{
get
{
ResetContentStreamPosition(_pipelineResponse);
return _pipelineResponse.Content;
}
}

public override BinaryData ReadContent(CancellationToken cancellationToken = default)
=> _pipelineResponse.ReadContent(cancellationToken);

public override async ValueTask<BinaryData> ReadContentAsync(CancellationToken cancellationToken = default)
=> await base.ReadContentAsync(cancellationToken).ConfigureAwait(false);

protected internal override bool ContainsHeader(string name)
=> _pipelineResponse.Headers.TryGetValue(name, out _);

Expand All @@ -61,8 +79,24 @@ protected internal override bool TryGetHeaderValues(string name, [NotNullWhen(tr
public override void Dispose()
{
PipelineResponse response = _pipelineResponse;
ResetContentStreamPosition(response);
response?.Dispose();
}

private void ResetContentStreamPosition(PipelineResponse response)
{
if (response.ContentStream is MemoryStream stream && stream.Position != 0)
{
// Azure.Core Response has a contract that ContentStream can be read
// without setting position back to 0. This means if ReadContent is
// called after such a read, the buffer will contain empty BinaryData.

// So that the ClientModel response implementations don't throw,
// set the position back to 0 if Azure.Core Response default
// ReadContent was called.
stream.Position = 0;
}
}
}
}
}
4 changes: 2 additions & 2 deletions sdk/core/Azure.Core/src/Pipeline/HttpClientTransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -99,11 +99,11 @@ public override async ValueTask ProcessAsync(HttpMessage message)
{
if (message.HasResponse)
{
throw new RequestFailedException(message.Response, e.InnerException);
throw await RequestFailedException.CreateAsync(message.Response, innerException: e.InnerException).ConfigureAwait(false);
}
else
{
throw new RequestFailedException(e.Message, e.InnerException);
throw new RequestFailedException(e.Message, innerException: e.InnerException);
}
}
}
Expand Down
Loading

0 comments on commit 5ede910

Please sign in to comment.