Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ClientModel: Add transport and buffering policy to System.ClientModel #41223

Merged
merged 42 commits into from
Jan 16, 2024
Merged
Show file tree
Hide file tree
Changes from 39 commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
9899b8b
initial addition of files
annelo-msft Jan 8, 2024
e23ee5e
pull buffering policy from this PR
annelo-msft Jan 8, 2024
ea0fb9a
Initial tests for BinaryContent and export API
annelo-msft Jan 8, 2024
a4481cf
increase code coverage
annelo-msft Jan 9, 2024
e362d57
misc tidy
annelo-msft Jan 9, 2024
b299747
Merge remote-tracking branch 'upstream/main' into clientmodel-add-pip…
annelo-msft Jan 9, 2024
18eb719
Updates from API renames
annelo-msft Jan 9, 2024
8d8d858
Add message property bag tests
annelo-msft Jan 9, 2024
1e8d228
backup of WIP
annelo-msft Jan 9, 2024
36e4440
Pipeline options tests and fix
annelo-msft Jan 10, 2024
487dbaa
pipeline create tests
annelo-msft Jan 10, 2024
ca95d86
PipelinePolicy update and test
annelo-msft Jan 10, 2024
f76b0ae
Add tests for StreamExtensions
annelo-msft Jan 10, 2024
9226b06
nit
annelo-msft Jan 10, 2024
9c6bb79
pr fb
annelo-msft Jan 10, 2024
f3aa986
PR feedback on ArrayBackedPropertyBag
annelo-msft Jan 10, 2024
1309c75
pr fb
annelo-msft Jan 10, 2024
352903a
pr fb
annelo-msft Jan 11, 2024
3abe73f
Merge remote-tracking branch 'upstream/main' into clientmodel-add-pip…
annelo-msft Jan 12, 2024
844944e
pr fb
annelo-msft Jan 12, 2024
5eb413c
initial move of transport files
annelo-msft Jan 12, 2024
1d0e4e1
updates
annelo-msft Jan 12, 2024
282fd20
WIP: add tests
annelo-msft Jan 12, 2024
afccd91
add tests
annelo-msft Jan 12, 2024
1543e05
export API
annelo-msft Jan 12, 2024
ecacf77
update
annelo-msft Jan 12, 2024
142fc9b
Merge remote-tracking branch 'upstream/main' into clientmodel-add-tra…
annelo-msft Jan 12, 2024
49c7acd
export API after merge
annelo-msft Jan 12, 2024
d02225e
updates
annelo-msft Jan 12, 2024
dd0152c
remove tests dependent on retry policy
annelo-msft Jan 12, 2024
e7a54fb
Add tests for internal header implementations
annelo-msft Jan 12, 2024
2be9e8b
pr fb: quick rename
annelo-msft Jan 13, 2024
cdb0d31
pr fb: buffering policy shared instance
annelo-msft Jan 13, 2024
7de8fbc
pr fb
annelo-msft Jan 13, 2024
c1ee9f1
pr fb: Add TaskExtensions to get EnsureCompleted method
annelo-msft Jan 13, 2024
4185d95
fix
annelo-msft Jan 13, 2024
533aaad
make BufferResponse and NetworkTimeout publicly settable on PipelineM…
annelo-msft Jan 13, 2024
113d12d
pr fb
annelo-msft Jan 16, 2024
71f0057
pr fb
annelo-msft Jan 16, 2024
a65a238
pr fb
annelo-msft Jan 16, 2024
70fdcf9
pr fb
annelo-msft Jan 16, 2024
b788a6a
Merge remote-tracking branch 'upstream/main' into clientmodel-add-tra…
annelo-msft Jan 16, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions sdk/core/System.ClientModel/api/System.ClientModel.net6.0.cs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,19 @@ public ClientPipelineOptions() { }
public System.ClientModel.Primitives.PipelineTransport? Transport { get { throw null; } set { } }
public void AddPolicy(System.ClientModel.Primitives.PipelinePolicy policy, System.ClientModel.Primitives.PipelinePosition position) { }
}
public partial class HttpClientPipelineTransport : System.ClientModel.Primitives.PipelineTransport, System.IDisposable
{
public static readonly System.ClientModel.Primitives.HttpClientPipelineTransport Shared;
public HttpClientPipelineTransport() { }
public HttpClientPipelineTransport(System.Net.Http.HttpClient client) { }
protected override System.ClientModel.Primitives.PipelineMessage CreateMessageCore() { throw null; }
public void Dispose() { }
protected virtual void Dispose(bool disposing) { }
protected virtual void OnReceivedResponse(System.ClientModel.Primitives.PipelineMessage message, System.Net.Http.HttpResponseMessage httpResponse) { }
annelo-msft marked this conversation as resolved.
Show resolved Hide resolved
protected virtual void OnSendingRequest(System.ClientModel.Primitives.PipelineMessage message, System.Net.Http.HttpRequestMessage httpRequest) { }
protected sealed override void ProcessCore(System.ClientModel.Primitives.PipelineMessage message) { }
protected sealed override System.Threading.Tasks.ValueTask ProcessCoreAsync(System.ClientModel.Primitives.PipelineMessage message) { throw null; }
}
public partial interface IJsonModel<out T> : System.ClientModel.Primitives.IPersistableModel<T>
{
T Create(ref System.Text.Json.Utf8JsonReader reader, System.ClientModel.Primitives.ModelReaderWriterOptions options);
Expand Down Expand Up @@ -102,7 +115,9 @@ public PersistableModelProxyAttribute([System.Diagnostics.CodeAnalysis.Dynamical
public partial class PipelineMessage : System.IDisposable
{
protected internal PipelineMessage(System.ClientModel.Primitives.PipelineRequest request) { }
public bool BufferResponse { get { throw null; } set { } }
public System.Threading.CancellationToken CancellationToken { get { throw null; } set { } }
public System.TimeSpan? NetworkTimeout { get { throw null; } set { } }
public System.ClientModel.Primitives.PipelineRequest Request { get { throw null; } }
public System.ClientModel.Primitives.PipelineResponse? Response { get { throw null; } protected internal set { } }
public void Dispose() { }
Expand Down Expand Up @@ -176,4 +191,10 @@ public sealed override void Process(System.ClientModel.Primitives.PipelineMessag
protected abstract void ProcessCore(System.ClientModel.Primitives.PipelineMessage message);
protected abstract System.Threading.Tasks.ValueTask ProcessCoreAsync(System.ClientModel.Primitives.PipelineMessage message);
}
public partial class ResponseBufferingPolicy : System.ClientModel.Primitives.PipelinePolicy
{
public ResponseBufferingPolicy() { }
public sealed override void Process(System.ClientModel.Primitives.PipelineMessage message, System.Collections.Generic.IReadOnlyList<System.ClientModel.Primitives.PipelinePolicy> pipeline, int currentIndex) { }
public sealed override System.Threading.Tasks.ValueTask ProcessAsync(System.ClientModel.Primitives.PipelineMessage message, System.Collections.Generic.IReadOnlyList<System.ClientModel.Primitives.PipelinePolicy> pipeline, int currentIndex) { throw null; }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,19 @@ public ClientPipelineOptions() { }
public System.ClientModel.Primitives.PipelineTransport? Transport { get { throw null; } set { } }
public void AddPolicy(System.ClientModel.Primitives.PipelinePolicy policy, System.ClientModel.Primitives.PipelinePosition position) { }
}
public partial class HttpClientPipelineTransport : System.ClientModel.Primitives.PipelineTransport, System.IDisposable
{
public static readonly System.ClientModel.Primitives.HttpClientPipelineTransport Shared;
public HttpClientPipelineTransport() { }
public HttpClientPipelineTransport(System.Net.Http.HttpClient client) { }
protected override System.ClientModel.Primitives.PipelineMessage CreateMessageCore() { throw null; }
public void Dispose() { }
protected virtual void Dispose(bool disposing) { }
protected virtual void OnReceivedResponse(System.ClientModel.Primitives.PipelineMessage message, System.Net.Http.HttpResponseMessage httpResponse) { }
protected virtual void OnSendingRequest(System.ClientModel.Primitives.PipelineMessage message, System.Net.Http.HttpRequestMessage httpRequest) { }
protected sealed override void ProcessCore(System.ClientModel.Primitives.PipelineMessage message) { }
protected sealed override System.Threading.Tasks.ValueTask ProcessCoreAsync(System.ClientModel.Primitives.PipelineMessage message) { throw null; }
}
public partial interface IJsonModel<out T> : System.ClientModel.Primitives.IPersistableModel<T>
{
T Create(ref System.Text.Json.Utf8JsonReader reader, System.ClientModel.Primitives.ModelReaderWriterOptions options);
Expand Down Expand Up @@ -101,7 +114,9 @@ public PersistableModelProxyAttribute(System.Type proxyType) { }
public partial class PipelineMessage : System.IDisposable
{
protected internal PipelineMessage(System.ClientModel.Primitives.PipelineRequest request) { }
public bool BufferResponse { get { throw null; } set { } }
public System.Threading.CancellationToken CancellationToken { get { throw null; } set { } }
public System.TimeSpan? NetworkTimeout { get { throw null; } set { } }
public System.ClientModel.Primitives.PipelineRequest Request { get { throw null; } }
public System.ClientModel.Primitives.PipelineResponse? Response { get { throw null; } protected internal set { } }
public void Dispose() { }
Expand Down Expand Up @@ -175,4 +190,10 @@ public sealed override void Process(System.ClientModel.Primitives.PipelineMessag
protected abstract void ProcessCore(System.ClientModel.Primitives.PipelineMessage message);
protected abstract System.Threading.Tasks.ValueTask ProcessCoreAsync(System.ClientModel.Primitives.PipelineMessage message);
}
public partial class ResponseBufferingPolicy : System.ClientModel.Primitives.PipelinePolicy
{
public ResponseBufferingPolicy() { }
public sealed override void Process(System.ClientModel.Primitives.PipelineMessage message, System.Collections.Generic.IReadOnlyList<System.ClientModel.Primitives.PipelinePolicy> pipeline, int currentIndex) { }
public sealed override System.Threading.Tasks.ValueTask ProcessAsync(System.ClientModel.Primitives.PipelineMessage message, System.Collections.Generic.IReadOnlyList<System.ClientModel.Primitives.PipelinePolicy> pipeline, int currentIndex) { throw null; }
}
}
47 changes: 47 additions & 0 deletions sdk/core/System.ClientModel/src/Internal/CancellationHelper.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

using System.Threading;
using System.Threading.Tasks;

namespace System.ClientModel.Internal;

internal class CancellationHelper
annelo-msft marked this conversation as resolved.
Show resolved Hide resolved
{
#region CancellationToken helpers

/// <summary>The default message used by <see cref="OperationCanceledException"/>.</summary>
private static readonly string s_cancellationMessage = new OperationCanceledException().Message; // use same message as the default ctor

/// <summary>Determines whether to wrap an <see cref="Exception"/> in a cancellation exception.</summary>
/// <param name="exception">The exception.</param>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> that may have triggered the exception.</param>
/// <returns>true if the exception should be wrapped; otherwise, false.</returns>
public static bool ShouldWrapInOperationCanceledException(Exception exception, CancellationToken cancellationToken) =>
!(exception is OperationCanceledException) && cancellationToken.IsCancellationRequested;

/// <summary>Throws a cancellation exception if cancellation has been requested via <paramref name="cancellationToken"/>.</summary>
/// <param name="cancellationToken">The token to check for a cancellation request.</param>
public static void ThrowIfCancellationRequested(CancellationToken cancellationToken)
{
if (cancellationToken.IsCancellationRequested)
{
ThrowOperationCanceledException(innerException: null, cancellationToken);
}
}

/// <summary>Throws a cancellation exception.</summary>
/// <param name="innerException">The inner exception to wrap. May be null.</param>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> that triggered the cancellation.</param>
private static void ThrowOperationCanceledException(Exception? innerException, CancellationToken cancellationToken) =>
throw CreateOperationCanceledException(innerException, cancellationToken);

public static Exception CreateOperationCanceledException(Exception? innerException, CancellationToken cancellationToken, string? message = null) =>
#if NET6_0_OR_GREATER
new TaskCanceledException(message ?? s_cancellationMessage, innerException, cancellationToken); // TCE for compatibility with other handlers that use TaskCompletionSource.TrySetCanceled()
#else
new TaskCanceledException(message ?? s_cancellationMessage, innerException);
#endif

#endregion
}
198 changes: 198 additions & 0 deletions sdk/core/System.ClientModel/src/Internal/ReadTimeoutStream.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,198 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

using System.ClientModel.Primitives;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

namespace System.ClientModel.Internal;

/// <summary>
/// Read-only Stream that will throw a <see cref="OperationCanceledException"/> if it has to wait longer than a configurable timeout to read more data
/// </summary>
internal class ReadTimeoutStream : Stream
{
private readonly Stream _stream;
private TimeSpan _readTimeout;
private CancellationTokenSource _cancellationTokenSource = null!;

public ReadTimeoutStream(Stream stream, TimeSpan readTimeout)
{
_stream = stream;
_readTimeout = readTimeout;

UpdateReadTimeout();
InitializeTokenSource();
}

public override bool CanRead => _stream.CanRead;

public override bool CanSeek => _stream.CanSeek;

public override bool CanWrite => false;

public override long Length => _stream.Length;

public override long Position
{
get => _stream.Position;
set => _stream.Position = value;
}

public override int ReadTimeout
{
get => (int)_readTimeout.TotalMilliseconds;
set
{
_readTimeout = TimeSpan.FromMilliseconds(value);
UpdateReadTimeout();
}
}

public override void Close()
=> _stream.Close();

public override void Flush()
{
// Flush is allowed on read-only stream
}

public override int Read(byte[] buffer, int offset, int count)
{
var source = StartTimeout(default, out bool dispose);
try
{
return _stream.Read(buffer, offset, count);
annelo-msft marked this conversation as resolved.
Show resolved Hide resolved
}
// We dispose stream on timeout so catch and check if cancellation token was cancelled
catch (IOException ex)
{
ResponseBufferingPolicy.ThrowIfCancellationRequestedOrTimeout(default, source.Token, ex, _readTimeout);
throw;
}
// We dispose stream on timeout so catch and check if cancellation token was cancelled
catch (ObjectDisposedException ex)
{
ResponseBufferingPolicy.ThrowIfCancellationRequestedOrTimeout(default, source.Token, ex, _readTimeout);
throw;
}
catch (OperationCanceledException ex)
{
ResponseBufferingPolicy.ThrowIfCancellationRequestedOrTimeout(default, source.Token, ex, _readTimeout);
throw;
}
finally
{
StopTimeout(source, dispose);
}
}

public override async Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
var source = StartTimeout(cancellationToken, out bool dispose);
try
{
#pragma warning disable CA1835 // ReadAsync(Memory<>) overload is not available in all targets
return await _stream.ReadAsync(buffer, offset, count, source.Token).ConfigureAwait(false);
#pragma warning restore // ReadAsync(Memory<>) overload is not available in all targets
}
// We dispose stream on timeout so catch and check if cancellation token was cancelled
catch (IOException ex)
{
ResponseBufferingPolicy.ThrowIfCancellationRequestedOrTimeout(cancellationToken, source.Token, ex, _readTimeout);
throw;
}
// We dispose stream on timeout so catch and check if cancellation token was cancelled
catch (ObjectDisposedException ex)
{
ResponseBufferingPolicy.ThrowIfCancellationRequestedOrTimeout(cancellationToken, source.Token, ex, _readTimeout);
throw;
}
catch (OperationCanceledException ex)
{
ResponseBufferingPolicy.ThrowIfCancellationRequestedOrTimeout(cancellationToken, source.Token, ex, _readTimeout);
throw;
}
finally
{
StopTimeout(source, dispose);
}
}

public override long Seek(long offset, SeekOrigin origin)
=> _stream.Seek(offset, origin);

public override void SetLength(long value)
=> throw new NotSupportedException();

public override void Write(byte[] buffer, int offset, int count)
=> throw new NotSupportedException();

private CancellationTokenSource StartTimeout(CancellationToken additionalToken, out bool dispose)
{
if (_cancellationTokenSource.IsCancellationRequested)
{
InitializeTokenSource();
}

CancellationTokenSource source;
if (additionalToken.CanBeCanceled)
{
source = CancellationTokenSource.CreateLinkedTokenSource(additionalToken, _cancellationTokenSource.Token);
dispose = true;
}
else
{
source = _cancellationTokenSource;
dispose = false;
}

_cancellationTokenSource.CancelAfter(_readTimeout);

return source;
}

private void InitializeTokenSource()
{
_cancellationTokenSource = new CancellationTokenSource();
_cancellationTokenSource.Token.Register(static state => ((ReadTimeoutStream)state!).DisposeStream(), this);
}

private void StopTimeout(CancellationTokenSource source, bool dispose)
{
_cancellationTokenSource.CancelAfter(Timeout.InfiniteTimeSpan);
if (dispose)
{
source.Dispose();
}
}

private void UpdateReadTimeout()
{
try
{
if (_stream.CanTimeout)
{
_stream.ReadTimeout = (int)_readTimeout.TotalMilliseconds;
}
}
catch
{
// ignore
}
}

protected override void Dispose(bool disposing)
{
base.Dispose(disposing);

_stream.Dispose();
_cancellationTokenSource.Dispose();
}

private void DisposeStream()
{
_stream.Dispose();
}
}
Loading
Loading