Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Core 2.0 Prototype: Merge "move buffering" and "response.Content" to a single PR #41907

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
54 commits
Select commit Hold shift + click to select a range
3da9e8b
initial rework
annelo-msft Jan 29, 2024
ee0d905
continued refactor
annelo-msft Jan 29, 2024
fa7f392
WIP
annelo-msft Jan 29, 2024
0264e53
nits
annelo-msft Jan 29, 2024
4123619
fix and add some tests
annelo-msft Jan 29, 2024
2c41bfc
update
annelo-msft Jan 30, 2024
1cefea4
Merge remote-tracking branch 'upstream/feature/core-experiment' into …
annelo-msft Jan 30, 2024
f9120d1
update
annelo-msft Jan 30, 2024
a773379
Merge remote-tracking branch 'upstream/feature/core-experiment' into …
annelo-msft Jan 30, 2024
e53febc
fix
annelo-msft Jan 30, 2024
3ba62f7
fix
annelo-msft Jan 30, 2024
c926c83
fix
annelo-msft Jan 30, 2024
10c6c29
fix
annelo-msft Jan 30, 2024
85231fe
Add messages to Debug.Asserts; update test to opt-out of buffering
annelo-msft Jan 30, 2024
cc76d49
plumb through network timeout from clientoptions
annelo-msft Jan 30, 2024
698315a
more test fixes
annelo-msft Jan 30, 2024
0d93a04
move template call into Azure.Core base type
annelo-msft Jan 30, 2024
9383605
fix and address tests that rely on transport not buffering
annelo-msft Jan 30, 2024
433bd23
set NetworkTimeout in Pipeline.Send
annelo-msft Jan 30, 2024
46166fc
add functional tests for buffering
annelo-msft Jan 31, 2024
628e195
update
annelo-msft Jan 31, 2024
4c5492b
refactor
annelo-msft Jan 31, 2024
543e8f9
reorg
annelo-msft Feb 2, 2024
b322734
refactor
annelo-msft Feb 2, 2024
c3bf692
nits
annelo-msft Feb 2, 2024
84a8d7f
nit
annelo-msft Feb 2, 2024
d23785e
rewwork a bit
annelo-msft Feb 3, 2024
b01095a
rewwork a bit
annelo-msft Feb 3, 2024
9ac2475
Merge remote-tracking branch 'upstream/feature/core-experiment' into …
annelo-msft Feb 10, 2024
3b76dfc
updates
annelo-msft Feb 12, 2024
c0dc49b
fix
annelo-msft Feb 12, 2024
2128de2
initial move
annelo-msft Feb 12, 2024
f53d74d
clientmodel tests
annelo-msft Feb 12, 2024
6c775fa
Azure.Core implementation
annelo-msft Feb 12, 2024
f15202f
export API
annelo-msft Feb 12, 2024
700015b
bug fix
annelo-msft Feb 12, 2024
bc6614e
Merge branch 'core2-move-buffering-to-transport' into core2-content-b…
annelo-msft Feb 12, 2024
856d1a6
updates
annelo-msft Feb 12, 2024
adce0c9
fix internal timeout property issue
annelo-msft Feb 12, 2024
77f4d79
bug fix
annelo-msft Feb 12, 2024
47a3fd0
fix where dispose is called before buffer
annelo-msft Feb 12, 2024
444e8e5
bug fix
annelo-msft Feb 12, 2024
5b34a15
fix
annelo-msft Feb 12, 2024
a741614
updates from clientmodel PR
annelo-msft Feb 12, 2024
e879da7
instrumentation for debugging async issue
annelo-msft Feb 13, 2024
8d05e53
remove instrumentation
annelo-msft Feb 13, 2024
2d7d0d5
bug fix and remove more instrumentation
annelo-msft Feb 13, 2024
756783d
updates
annelo-msft Feb 13, 2024
2915cef
more cleanup
annelo-msft Feb 13, 2024
b673c67
update
annelo-msft Feb 13, 2024
86f748d
updates
annelo-msft Feb 13, 2024
2542998
keep in sync with PR to main
annelo-msft Feb 13, 2024
5ff5bc2
updates
annelo-msft Feb 13, 2024
09fe56b
add test for async exception factory
annelo-msft Feb 13, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion sdk/core/Azure.Core/api/Azure.Core.net461.cs
Original file line number Diff line number Diff line change
Expand Up @@ -232,20 +232,23 @@ public partial class RequestFailedException : System.ClientModel.ClientResultExc
public RequestFailedException(string message) : base (default(System.ClientModel.Primitives.PipelineResponse), default(System.Exception)) { }
public RequestFailedException(string message, System.Exception? innerException) : base (default(System.ClientModel.Primitives.PipelineResponse), default(System.Exception)) { }
public string? ErrorCode { get { throw null; } }
public static System.Threading.Tasks.ValueTask<Azure.RequestFailedException> CreateAsync(Azure.Response response, Azure.Core.RequestFailedDetailsParser? parser = null, System.Exception? innerException = null) { throw null; }
public override void GetObjectData(System.Runtime.Serialization.SerializationInfo info, System.Runtime.Serialization.StreamingContext context) { }
public new Azure.Response? GetRawResponse() { throw null; }
}
public abstract partial class Response : System.ClientModel.Primitives.PipelineResponse
{
protected Response() { }
public abstract string ClientRequestId { get; set; }
public virtual new System.BinaryData Content { get { throw null; } }
public override System.BinaryData Content { get { throw null; } }
public virtual new Azure.Core.ResponseHeaders Headers { get { throw null; } }
protected internal abstract bool ContainsHeader(string name);
protected internal abstract System.Collections.Generic.IEnumerable<Azure.Core.HttpHeader> EnumerateHeaders();
public static Azure.Response<T> FromValue<T>(T value, Azure.Response response) { throw null; }
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
protected override System.ClientModel.Primitives.PipelineResponseHeaders GetHeadersCore() { throw null; }
public override System.BinaryData ReadContent(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public override System.Threading.Tasks.ValueTask<System.BinaryData> ReadContentAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
protected sealed override void SetIsErrorCore(bool isError) { }
public override string ToString() { throw null; }
Expand Down
5 changes: 4 additions & 1 deletion sdk/core/Azure.Core/api/Azure.Core.net472.cs
Original file line number Diff line number Diff line change
Expand Up @@ -232,20 +232,23 @@ public partial class RequestFailedException : System.ClientModel.ClientResultExc
public RequestFailedException(string message) : base (default(System.ClientModel.Primitives.PipelineResponse), default(System.Exception)) { }
public RequestFailedException(string message, System.Exception? innerException) : base (default(System.ClientModel.Primitives.PipelineResponse), default(System.Exception)) { }
public string? ErrorCode { get { throw null; } }
public static System.Threading.Tasks.ValueTask<Azure.RequestFailedException> CreateAsync(Azure.Response response, Azure.Core.RequestFailedDetailsParser? parser = null, System.Exception? innerException = null) { throw null; }
public override void GetObjectData(System.Runtime.Serialization.SerializationInfo info, System.Runtime.Serialization.StreamingContext context) { }
public new Azure.Response? GetRawResponse() { throw null; }
}
public abstract partial class Response : System.ClientModel.Primitives.PipelineResponse
{
protected Response() { }
public abstract string ClientRequestId { get; set; }
public virtual new System.BinaryData Content { get { throw null; } }
public override System.BinaryData Content { get { throw null; } }
public virtual new Azure.Core.ResponseHeaders Headers { get { throw null; } }
protected internal abstract bool ContainsHeader(string name);
protected internal abstract System.Collections.Generic.IEnumerable<Azure.Core.HttpHeader> EnumerateHeaders();
public static Azure.Response<T> FromValue<T>(T value, Azure.Response response) { throw null; }
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
protected override System.ClientModel.Primitives.PipelineResponseHeaders GetHeadersCore() { throw null; }
public override System.BinaryData ReadContent(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public override System.Threading.Tasks.ValueTask<System.BinaryData> ReadContentAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
protected sealed override void SetIsErrorCore(bool isError) { }
public override string ToString() { throw null; }
Expand Down
5 changes: 4 additions & 1 deletion sdk/core/Azure.Core/api/Azure.Core.net6.0.cs
Original file line number Diff line number Diff line change
Expand Up @@ -232,20 +232,23 @@ public partial class RequestFailedException : System.ClientModel.ClientResultExc
public RequestFailedException(string message) : base (default(System.ClientModel.Primitives.PipelineResponse), default(System.Exception)) { }
public RequestFailedException(string message, System.Exception? innerException) : base (default(System.ClientModel.Primitives.PipelineResponse), default(System.Exception)) { }
public string? ErrorCode { get { throw null; } }
public static System.Threading.Tasks.ValueTask<Azure.RequestFailedException> CreateAsync(Azure.Response response, Azure.Core.RequestFailedDetailsParser? parser = null, System.Exception? innerException = null) { throw null; }
public override void GetObjectData(System.Runtime.Serialization.SerializationInfo info, System.Runtime.Serialization.StreamingContext context) { }
public new Azure.Response? GetRawResponse() { throw null; }
}
public abstract partial class Response : System.ClientModel.Primitives.PipelineResponse
{
protected Response() { }
public abstract string ClientRequestId { get; set; }
public virtual new System.BinaryData Content { get { throw null; } }
public override System.BinaryData Content { get { throw null; } }
public virtual new Azure.Core.ResponseHeaders Headers { get { throw null; } }
protected internal abstract bool ContainsHeader(string name);
protected internal abstract System.Collections.Generic.IEnumerable<Azure.Core.HttpHeader> EnumerateHeaders();
public static Azure.Response<T> FromValue<T>(T value, Azure.Response response) { throw null; }
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
protected override System.ClientModel.Primitives.PipelineResponseHeaders GetHeadersCore() { throw null; }
public override System.BinaryData ReadContent(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public override System.Threading.Tasks.ValueTask<System.BinaryData> ReadContentAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
protected sealed override void SetIsErrorCore(bool isError) { }
public override string ToString() { throw null; }
Expand Down
5 changes: 4 additions & 1 deletion sdk/core/Azure.Core/api/Azure.Core.netstandard2.0.cs
Original file line number Diff line number Diff line change
Expand Up @@ -232,20 +232,23 @@ public partial class RequestFailedException : System.ClientModel.ClientResultExc
public RequestFailedException(string message) : base (default(System.ClientModel.Primitives.PipelineResponse), default(System.Exception)) { }
public RequestFailedException(string message, System.Exception? innerException) : base (default(System.ClientModel.Primitives.PipelineResponse), default(System.Exception)) { }
public string? ErrorCode { get { throw null; } }
public static System.Threading.Tasks.ValueTask<Azure.RequestFailedException> CreateAsync(Azure.Response response, Azure.Core.RequestFailedDetailsParser? parser = null, System.Exception? innerException = null) { throw null; }
public override void GetObjectData(System.Runtime.Serialization.SerializationInfo info, System.Runtime.Serialization.StreamingContext context) { }
public new Azure.Response? GetRawResponse() { throw null; }
}
public abstract partial class Response : System.ClientModel.Primitives.PipelineResponse
{
protected Response() { }
public abstract string ClientRequestId { get; set; }
public virtual new System.BinaryData Content { get { throw null; } }
public override System.BinaryData Content { get { throw null; } }
public virtual new Azure.Core.ResponseHeaders Headers { get { throw null; } }
protected internal abstract bool ContainsHeader(string name);
protected internal abstract System.Collections.Generic.IEnumerable<Azure.Core.HttpHeader> EnumerateHeaders();
public static Azure.Response<T> FromValue<T>(T value, Azure.Response response) { throw null; }
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
protected override System.ClientModel.Primitives.PipelineResponseHeaders GetHeadersCore() { throw null; }
public override System.BinaryData ReadContent(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public override System.Threading.Tasks.ValueTask<System.BinaryData> ReadContentAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
protected sealed override void SetIsErrorCore(bool isError) { }
public override string ToString() { throw null; }
Expand Down
2 changes: 2 additions & 0 deletions sdk/core/Azure.Core/src/ClientOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ namespace Azure.Core
/// </summary>
public abstract class ClientOptions : ClientPipelineOptions
{
internal static readonly TimeSpan DefaultNetworkTimeout = TimeSpan.FromSeconds(100);

private HttpPipelineTransport _transport;
internal bool IsCustomTransportSet { get; private set; }

Expand Down
1 change: 1 addition & 0 deletions sdk/core/Azure.Core/src/HttpMessage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ public HttpMessage(Request request, ResponseClassifier responseClassifier)
Argument.AssertNotNull(request, nameof(request));

ResponseClassifier = responseClassifier;
NetworkTimeout = request.NetworkTimeout ?? ClientOptions.DefaultNetworkTimeout;
}

/// <summary>
Expand Down
80 changes: 80 additions & 0 deletions sdk/core/Azure.Core/src/Internal/AzureBaseBuffersExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ namespace Azure.Core.Buffers
{
internal static class AzureBaseBuffersExtensions
{
// Same value as Stream.CopyTo uses by default
private const int DefaultCopyBufferSize = 81920;

public static async Task WriteAsync(this Stream stream, ReadOnlyMemory<byte> buffer, CancellationToken cancellation = default)
{
Argument.AssertNotNull(stream, nameof(stream));
Expand Down Expand Up @@ -87,5 +90,82 @@ public static async Task WriteAsync(this Stream stream, ReadOnlySequence<byte> b
ArrayPool<byte>.Shared.Return(array);
}
}

public static async Task CopyToAsync(this Stream source, Stream destination, CancellationToken cancellationToken)
{
//using CancellationTokenSource cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
//cts.CancelAfter(timeout);

//// If cancellation is possible (whether due to network timeout or a user cancellation token being passed), then
//// register callback to dispose the stream on cancellation.
//if (timeout != Timeout.InfiniteTimeSpan || cancellationToken.CanBeCanceled)
//{
// cts.Token.Register(state => ((Stream?)state)?.Dispose(), source);
//}

byte[] buffer = ArrayPool<byte>.Shared.Rent(DefaultCopyBufferSize);

try
{
while (true)
{
#pragma warning disable CA1835 // ReadAsync(Memory<>) overload is not available in all targets
int bytesRead = await source.ReadAsync(buffer, 0, buffer.Length, cancellationToken).ConfigureAwait(false);
#pragma warning restore // ReadAsync(Memory<>) overload is not available in all targets
if (bytesRead == 0)
break;
await destination.WriteAsync(new ReadOnlyMemory<byte>(buffer, 0, bytesRead), cancellationToken).ConfigureAwait(false);
}
}
//catch (Exception ex) when (ex is ObjectDisposedException
// or IOException
// or OperationCanceledException
// or NotSupportedException)
//{
// CancellationHelper.ThrowIfCancellationRequestedOrTimeout(cancellationToken, cts.Token, ex, timeout);
// throw;
//}
finally
{
ArrayPool<byte>.Shared.Return(buffer);
}
}

public static void CopyTo(this Stream source, Stream destination, CancellationToken cancellationToken)
{
//using CancellationTokenSource cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
//cts.CancelAfter(timeout);

//// If cancellation is possible (whether due to network timeout or a user cancellation token being passed), then
//// register callback to dispose the stream on cancellation.
//if (timeout != Timeout.InfiniteTimeSpan || cancellationToken.CanBeCanceled)
//{
// cts.Token.Register(state => ((Stream?)state)?.Dispose(), source);
//}

byte[] buffer = ArrayPool<byte>.Shared.Rent(DefaultCopyBufferSize);

try
{
int read;
while ((read = source.Read(buffer, 0, buffer.Length)) != 0)
{
cancellationToken.ThrowIfCancellationRequested();
destination.Write(buffer, 0, read);
}
}
//catch (Exception ex) when (ex is ObjectDisposedException
// or IOException
// or OperationCanceledException
// or NotSupportedException)
//{
// CancellationHelper.ThrowIfCancellationRequestedOrTimeout(cancellationToken, cts.Token, ex, timeout);
// throw;
//}
finally
{
ArrayPool<byte>.Shared.Return(buffer);
}
}
}
}
5 changes: 3 additions & 2 deletions sdk/core/Azure.Core/src/Pipeline/DisposableHttpPipeline.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,9 @@ public sealed class DisposableHttpPipeline : HttpPipeline, IDisposable
/// <param name="policies">Policies to be invoked as part of the pipeline in order.</param>
/// <param name="responseClassifier">The response classifier to be used in invocations.</param>
/// <param name="isTransportOwnedInternally"> </param>
internal DisposableHttpPipeline(HttpPipelineTransport transport, int perCallIndex, int perRetryIndex, HttpPipelinePolicy[] policies, ResponseClassifier responseClassifier, bool isTransportOwnedInternally)
: base(transport, perCallIndex, perRetryIndex, policies, responseClassifier)
/// <param name="networkTimeout"></param>
internal DisposableHttpPipeline(HttpPipelineTransport transport, int perCallIndex, int perRetryIndex, HttpPipelinePolicy[] policies, ResponseClassifier responseClassifier, bool isTransportOwnedInternally, TimeSpan networkTimeout)
: base(transport, perCallIndex, perRetryIndex, policies, responseClassifier, networkTimeout)
{
this.isTransportOwnedInternally = isTransportOwnedInternally;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

using System;
using System.ClientModel.Primitives;
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using System.IO;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;

namespace Azure.Core.Pipeline
{
Expand Down Expand Up @@ -41,6 +44,21 @@ public override Stream? ContentStream
set => _pipelineResponse.ContentStream = value;
}

public override BinaryData Content
{
get
{
ResetContentStreamPosition(_pipelineResponse);
return _pipelineResponse.Content;
}
}

public override BinaryData ReadContent(CancellationToken cancellationToken = default)
=> _pipelineResponse.ReadContent(cancellationToken);

public override async ValueTask<BinaryData> ReadContentAsync(CancellationToken cancellationToken = default)
=> await base.ReadContentAsync(cancellationToken).ConfigureAwait(false);

protected internal override bool ContainsHeader(string name)
=> _pipelineResponse.Headers.TryGetValue(name, out _);

Expand All @@ -61,8 +79,24 @@ protected internal override bool TryGetHeaderValues(string name, [NotNullWhen(tr
public override void Dispose()
{
PipelineResponse response = _pipelineResponse;
ResetContentStreamPosition(response);
response?.Dispose();
}

private void ResetContentStreamPosition(PipelineResponse response)
{
if (response.ContentStream is MemoryStream stream && stream.Position != 0)
{
// Azure.Core Response has a contract that ContentStream can be read
// without setting position back to 0. This means if ReadContent is
// called after such a read, the buffer will contain empty BinaryData.

// So that the ClientModel response implementations don't throw,
// set the position back to 0 if Azure.Core Response default
// ReadContent was called.
stream.Position = 0;
}
}
}
}
}
4 changes: 2 additions & 2 deletions sdk/core/Azure.Core/src/Pipeline/HttpClientTransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -99,11 +99,11 @@ public override async ValueTask ProcessAsync(HttpMessage message)
{
if (message.HasResponse)
{
throw new RequestFailedException(message.Response, e.InnerException);
throw await RequestFailedException.CreateAsync(message.Response, innerException: e.InnerException).ConfigureAwait(false);
}
else
{
throw new RequestFailedException(e.Message, e.InnerException);
throw new RequestFailedException(e.Message, innerException: e.InnerException);
}
}
}
Expand Down
Loading
Loading