Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add response read timeouts #10128

Merged
merged 16 commits into from
Feb 25, 2020
2 changes: 2 additions & 0 deletions sdk/core/Azure.Core/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
# Release History

## 1.1.0-preview.1 (Unreleased)

### Fixes and improvements
- Add OPTIONS and TRACE HTTP request methods.
- Add `NetworkTimeout` property to `RetryOptions` and apply it to network operations like sending request or reading from the response stream.

## 1.0.2 (2020-01-10)

Expand Down
1 change: 1 addition & 0 deletions sdk/core/Azure.Core/api/Azure.Core.netstandard2.0.cs
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,7 @@ internal RetryOptions() { }
public System.TimeSpan MaxDelay { [System.Runtime.CompilerServices.CompilerGeneratedAttribute] get { throw null; } [System.Runtime.CompilerServices.CompilerGeneratedAttribute] set { } }
public int MaxRetries { [System.Runtime.CompilerServices.CompilerGeneratedAttribute] get { throw null; } [System.Runtime.CompilerServices.CompilerGeneratedAttribute] set { } }
public Azure.Core.RetryMode Mode { [System.Runtime.CompilerServices.CompilerGeneratedAttribute] get { throw null; } [System.Runtime.CompilerServices.CompilerGeneratedAttribute] set { } }
public System.TimeSpan NetworkTimeout { [System.Runtime.CompilerServices.CompilerGeneratedAttribute] get { throw null; } [System.Runtime.CompilerServices.CompilerGeneratedAttribute] set { } }
}
public abstract partial class TokenCredential
{
Expand Down
2 changes: 1 addition & 1 deletion sdk/core/Azure.Core/src/Pipeline/HttpPipelineBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public static HttpPipeline Build(ClientOptions options, HttpPipelinePolicy[] per
diagnostics.LoggedHeaderNames.ToArray(), diagnostics.LoggedQueryParameters.ToArray()));
}

policies.Add(BufferResponsePolicy.Shared);
policies.Add(new ResponseBodyPolicy(options.Retry.NetworkTimeout));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we check if the timeout was changed, and if not, use the shared policy


policies.Add(new RequestActivityPolicy(isDistributedTracingEnabled, ClientDiagnostics.GetResourceProviderNamespace(options.GetType().Assembly)));

Expand Down
58 changes: 0 additions & 58 deletions sdk/core/Azure.Core/src/Pipeline/Internal/BufferResponsePolicy.cs

This file was deleted.

128 changes: 128 additions & 0 deletions sdk/core/Azure.Core/src/Pipeline/Internal/ReadTimeoutStream.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

namespace Azure.Core.Pipeline
{
/// <summary>
/// Read-only Stream that will throw a <see cref="OperationCanceledException"/> if it has to wait longer than a configurable timeout to read more data
/// </summary>
internal class ReadTimeoutStream : ReadOnlyStream
heaths marked this conversation as resolved.
Show resolved Hide resolved
{
private readonly Stream _stream;
private TimeSpan _readTimeout;
private CancellationTokenSource _cancellationTokenSource;

public ReadTimeoutStream(Stream stream, TimeSpan readTimeout)
{
_stream = stream;
_readTimeout = readTimeout;
UpdateReadTimeout();
_cancellationTokenSource = new CancellationTokenSource();
}

public override int Read(byte[] buffer, int offset, int count)
{
return _stream.Read(buffer, offset, count);
}

public override async Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
var source = StartTimeout(cancellationToken, out bool dispose);
try
{
return await _stream.ReadAsync(buffer, offset, count, source.Token).ConfigureAwait(false);
}
finally
{
StopTimeout(source, dispose);
}
}

private CancellationTokenSource StartTimeout(CancellationToken additionalToken, out bool dispose)
AlexanderSher marked this conversation as resolved.
Show resolved Hide resolved
{
if (_cancellationTokenSource.IsCancellationRequested)
{
_cancellationTokenSource = new CancellationTokenSource();
}

CancellationTokenSource source;
if (additionalToken.CanBeCanceled)
{
source = CancellationTokenSource.CreateLinkedTokenSource(additionalToken, _cancellationTokenSource.Token);
dispose = true;
}
else
{
source = _cancellationTokenSource;
dispose = false;
}

_cancellationTokenSource.CancelAfter(_readTimeout);

return source;
}

private void StopTimeout(CancellationTokenSource source, bool dispose)
{
_cancellationTokenSource.CancelAfter(Timeout.InfiniteTimeSpan);
if (dispose)
{
source.Dispose();
}
}

public override long Seek(long offset, SeekOrigin origin)
{
return _stream.Seek(offset, origin);
}

public override bool CanRead => _stream.CanRead;
public override bool CanSeek => _stream.CanSeek;
public override long Length => _stream.Length;

public override long Position
{
get => _stream.Position;
set => _stream.Position = value;
}

public override int ReadTimeout
pakrym marked this conversation as resolved.
Show resolved Hide resolved
{
get => (int) _readTimeout.TotalMilliseconds;
set
{
_readTimeout = TimeSpan.FromMilliseconds(value);
UpdateReadTimeout();
}
}

private void UpdateReadTimeout()
{
try
{
_stream.ReadTimeout = (int) _readTimeout.TotalMilliseconds;
}
catch
{
// ignore
}
}

public override void Close()
{
_stream.Close();
}

protected override void Dispose(bool disposing)
{
base.Dispose(disposing);
_stream.Dispose();
pakrym marked this conversation as resolved.
Show resolved Hide resolved
_cancellationTokenSource.Dispose();
}
}
}
128 changes: 128 additions & 0 deletions sdk/core/Azure.Core/src/Pipeline/Internal/ResponseBodyPolicy.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

using System;
using System.Buffers;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using Azure.Core.Buffers;

namespace Azure.Core.Pipeline
{
/// <summary>
/// Pipeline policy to buffer response content or add a timeout to response content managed by the client
/// </summary>
internal class ResponseBodyPolicy : HttpPipelinePolicy
pakrym marked this conversation as resolved.
Show resolved Hide resolved
{
private const int DefaultCopyBufferSize = 81920;
pakrym marked this conversation as resolved.
Show resolved Hide resolved

private readonly TimeSpan _networkTimeout;

public ResponseBodyPolicy(TimeSpan networkTimeout)
{
_networkTimeout = networkTimeout;
}

public override async ValueTask ProcessAsync(HttpMessage message, ReadOnlyMemory<HttpPipelinePolicy> pipeline)
{
await ProcessAsync(message, pipeline, true).ConfigureAwait(false);
}

public override void Process(HttpMessage message, ReadOnlyMemory<HttpPipelinePolicy> pipeline)
{
ProcessAsync(message, pipeline, false).EnsureCompleted();
}

private async ValueTask ProcessAsync(HttpMessage message, ReadOnlyMemory<HttpPipelinePolicy> pipeline, bool async)
{
CancellationToken oldToken = message.CancellationToken;
using CancellationTokenSource cts = CancellationTokenSource.CreateLinkedTokenSource(oldToken);

cts.CancelAfter(_networkTimeout);

try
{
message.CancellationToken = cts.Token;
if (async)
{
await ProcessNextAsync(message, pipeline).ConfigureAwait(false);
}
else
{
ProcessNext(message, pipeline);
}
}
finally
{
message.CancellationToken = oldToken;
cts.CancelAfter(Timeout.Infinite);
}

Stream? responseContentStream = message.Response.ContentStream;
if (responseContentStream == null || responseContentStream.CanSeek)
AlexanderSher marked this conversation as resolved.
Show resolved Hide resolved
{
return;
}

if (message.BufferResponse)
{
var bufferedStream = new MemoryStream();
if (async)
{
await CopyToAsync(responseContentStream, bufferedStream, cts).ConfigureAwait(false);
}
else
{
CopyTo(responseContentStream, bufferedStream, message.CancellationToken);
}

responseContentStream.Dispose();
bufferedStream.Position = 0;
message.Response.ContentStream = bufferedStream;
}
else if (_networkTimeout != Timeout.InfiniteTimeSpan)
{
message.Response.ContentStream = new ReadTimeoutStream(responseContentStream, _networkTimeout);
}
}

private async Task CopyToAsync(Stream source, Stream destination, CancellationTokenSource cancellationTokenSource)
{
byte[] buffer = ArrayPool<byte>.Shared.Rent(DefaultCopyBufferSize);
try
{
while (true)
{
cancellationTokenSource.CancelAfter(_networkTimeout);
int bytesRead = await source.ReadAsync(buffer, 0, buffer.Length, cancellationTokenSource.Token).ConfigureAwait(false);
if (bytesRead == 0) break;
await destination.WriteAsync(new ReadOnlyMemory<byte>(buffer, 0, bytesRead), cancellationTokenSource.Token).ConfigureAwait(false);
}
}
finally
{
cancellationTokenSource.CancelAfter(Timeout.InfiniteTimeSpan);
ArrayPool<byte>.Shared.Return(buffer);
}
}

private static void CopyTo(Stream source, Stream destination, CancellationToken cancellationToken)
{
byte[] buffer = ArrayPool<byte>.Shared.Rent(DefaultCopyBufferSize);
try
{
int read;
while ((read = source.Read(buffer, 0, buffer.Length)) != 0)
{
cancellationToken.ThrowIfCancellationRequested();
destination.Write(buffer, 0, read);
}
}
finally
{
ArrayPool<byte>.Shared.Return(buffer);
}
}
}
}
5 changes: 5 additions & 0 deletions sdk/core/Azure.Core/src/RetryOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,5 +35,10 @@ internal RetryOptions()
/// The approach to use for calculating retry delays.
/// </summary>
public RetryMode Mode { get; set; } = RetryMode.Exponential;

/// <summary>
/// The timeout applied to an individual network operations.
/// </summary>
public TimeSpan NetworkTimeout { get; set; } = TimeSpan.FromSeconds(100);
}
}
11 changes: 7 additions & 4 deletions sdk/core/Azure.Core/src/Shared/RetriableStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -90,14 +90,17 @@ public override async Task<int> ReadAsync(byte[] buffer, int offset, int count,
}
catch (Exception e)
{
await RetryAsync(e, true).ConfigureAwait(false);
await RetryAsync(e, true, cancellationToken).ConfigureAwait(false);
}
}
}

private async Task RetryAsync(Exception exception, bool async)
private async Task RetryAsync(Exception exception, bool async, CancellationToken cancellationToken)
{
if (!_responseClassifier.IsRetriableException(exception))
bool isNonCustomerCancelledException = exception is OperationCanceledException &&
!cancellationToken.IsCancellationRequested;

if (!_responseClassifier.IsRetriableException(exception) && !isNonCustomerCancelledException)
{
ExceptionDispatchInfo.Capture(exception).Throw();
}
Expand Down Expand Up @@ -132,7 +135,7 @@ public override int Read(byte[] buffer, int offset, int count)
}
catch (Exception e)
{
RetryAsync(e, false).EnsureCompleted();
RetryAsync(e, false, default).EnsureCompleted();
}
}
}
Expand Down
Loading