Skip to content

Commit

Permalink
Add response read timeouts (#10169)
Browse files Browse the repository at this point in the history
  • Loading branch information
pakrym authored Feb 26, 2020
1 parent 94a6962 commit 29fd275
Show file tree
Hide file tree
Showing 12 changed files with 594 additions and 112 deletions.
2 changes: 2 additions & 0 deletions sdk/core/Azure.Core/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
# Release History

## 1.1.0-preview.1 (Unreleased)

### Fixes and improvements
- Add OPTIONS and TRACE HTTP request methods.
- Add `NetworkTimeout` property to `RetryOptions` and apply it to network operations like sending request or reading from the response stream.

## 1.0.2 (2020-01-10)

Expand Down
1 change: 1 addition & 0 deletions sdk/core/Azure.Core/api/Azure.Core.netstandard2.0.cs
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,7 @@ internal RetryOptions() { }
public System.TimeSpan MaxDelay { [System.Runtime.CompilerServices.CompilerGeneratedAttribute] get { throw null; } [System.Runtime.CompilerServices.CompilerGeneratedAttribute] set { } }
public int MaxRetries { [System.Runtime.CompilerServices.CompilerGeneratedAttribute] get { throw null; } [System.Runtime.CompilerServices.CompilerGeneratedAttribute] set { } }
public Azure.Core.RetryMode Mode { [System.Runtime.CompilerServices.CompilerGeneratedAttribute] get { throw null; } [System.Runtime.CompilerServices.CompilerGeneratedAttribute] set { } }
public System.TimeSpan NetworkTimeout { [System.Runtime.CompilerServices.CompilerGeneratedAttribute] get { throw null; } [System.Runtime.CompilerServices.CompilerGeneratedAttribute] set { } }
}
public abstract partial class TokenCredential
{
Expand Down
2 changes: 1 addition & 1 deletion sdk/core/Azure.Core/src/Pipeline/HttpPipelineBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public static HttpPipeline Build(ClientOptions options, HttpPipelinePolicy[] per
diagnostics.LoggedHeaderNames.ToArray(), diagnostics.LoggedQueryParameters.ToArray()));
}

policies.Add(BufferResponsePolicy.Shared);
policies.Add(new ResponseBodyPolicy(options.Retry.NetworkTimeout));

policies.Add(new RequestActivityPolicy(isDistributedTracingEnabled, ClientDiagnostics.GetResourceProviderNamespace(options.GetType().Assembly)));

Expand Down
58 changes: 0 additions & 58 deletions sdk/core/Azure.Core/src/Pipeline/Internal/BufferResponsePolicy.cs

This file was deleted.

128 changes: 128 additions & 0 deletions sdk/core/Azure.Core/src/Pipeline/Internal/ReadTimeoutStream.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

namespace Azure.Core.Pipeline
{
/// <summary>
/// Read-only Stream that will throw a <see cref="OperationCanceledException"/> if it has to wait longer than a configurable timeout to read more data
/// </summary>
internal class ReadTimeoutStream : ReadOnlyStream
{
private readonly Stream _stream;
private TimeSpan _readTimeout;
private CancellationTokenSource _cancellationTokenSource;

public ReadTimeoutStream(Stream stream, TimeSpan readTimeout)
{
_stream = stream;
_readTimeout = readTimeout;
UpdateReadTimeout();
_cancellationTokenSource = new CancellationTokenSource();
}

public override int Read(byte[] buffer, int offset, int count)
{
return _stream.Read(buffer, offset, count);
}

public override async Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
var source = StartTimeout(cancellationToken, out bool dispose);
try
{
return await _stream.ReadAsync(buffer, offset, count, source.Token).ConfigureAwait(false);
}
finally
{
StopTimeout(source, dispose);
}
}

private CancellationTokenSource StartTimeout(CancellationToken additionalToken, out bool dispose)
{
if (_cancellationTokenSource.IsCancellationRequested)
{
_cancellationTokenSource = new CancellationTokenSource();
}

CancellationTokenSource source;
if (additionalToken.CanBeCanceled)
{
source = CancellationTokenSource.CreateLinkedTokenSource(additionalToken, _cancellationTokenSource.Token);
dispose = true;
}
else
{
source = _cancellationTokenSource;
dispose = false;
}

_cancellationTokenSource.CancelAfter(_readTimeout);

return source;
}

private void StopTimeout(CancellationTokenSource source, bool dispose)
{
_cancellationTokenSource.CancelAfter(Timeout.InfiniteTimeSpan);
if (dispose)
{
source.Dispose();
}
}

public override long Seek(long offset, SeekOrigin origin)
{
return _stream.Seek(offset, origin);
}

public override bool CanRead => _stream.CanRead;
public override bool CanSeek => _stream.CanSeek;
public override long Length => _stream.Length;

public override long Position
{
get => _stream.Position;
set => _stream.Position = value;
}

public override int ReadTimeout
{
get => (int) _readTimeout.TotalMilliseconds;
set
{
_readTimeout = TimeSpan.FromMilliseconds(value);
UpdateReadTimeout();
}
}

private void UpdateReadTimeout()
{
try
{
_stream.ReadTimeout = (int) _readTimeout.TotalMilliseconds;
}
catch
{
// ignore
}
}

public override void Close()
{
_stream.Close();
}

protected override void Dispose(bool disposing)
{
base.Dispose(disposing);
_stream.Dispose();
_cancellationTokenSource.Dispose();
}
}
}
129 changes: 129 additions & 0 deletions sdk/core/Azure.Core/src/Pipeline/Internal/ResponseBodyPolicy.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

using System;
using System.Buffers;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using Azure.Core.Buffers;

namespace Azure.Core.Pipeline
{
/// <summary>
/// Pipeline policy to buffer response content or add a timeout to response content managed by the client
/// </summary>
internal class ResponseBodyPolicy : HttpPipelinePolicy
{
// Same value as Stream.CopyTo uses by default
private const int DefaultCopyBufferSize = 81920;

private readonly TimeSpan _networkTimeout;

public ResponseBodyPolicy(TimeSpan networkTimeout)
{
_networkTimeout = networkTimeout;
}

public override async ValueTask ProcessAsync(HttpMessage message, ReadOnlyMemory<HttpPipelinePolicy> pipeline)
{
await ProcessAsync(message, pipeline, true).ConfigureAwait(false);
}

public override void Process(HttpMessage message, ReadOnlyMemory<HttpPipelinePolicy> pipeline)
{
ProcessAsync(message, pipeline, false).EnsureCompleted();
}

private async ValueTask ProcessAsync(HttpMessage message, ReadOnlyMemory<HttpPipelinePolicy> pipeline, bool async)
{
CancellationToken oldToken = message.CancellationToken;
using CancellationTokenSource cts = CancellationTokenSource.CreateLinkedTokenSource(oldToken);

cts.CancelAfter(_networkTimeout);

try
{
message.CancellationToken = cts.Token;
if (async)
{
await ProcessNextAsync(message, pipeline).ConfigureAwait(false);
}
else
{
ProcessNext(message, pipeline);
}
}
finally
{
message.CancellationToken = oldToken;
cts.CancelAfter(Timeout.Infinite);
}

Stream? responseContentStream = message.Response.ContentStream;
if (responseContentStream == null || responseContentStream.CanSeek)
{
return;
}

if (message.BufferResponse)
{
var bufferedStream = new MemoryStream();
if (async)
{
await CopyToAsync(responseContentStream, bufferedStream, cts).ConfigureAwait(false);
}
else
{
CopyTo(responseContentStream, bufferedStream, message.CancellationToken);
}

responseContentStream.Dispose();
bufferedStream.Position = 0;
message.Response.ContentStream = bufferedStream;
}
else if (_networkTimeout != Timeout.InfiniteTimeSpan)
{
message.Response.ContentStream = new ReadTimeoutStream(responseContentStream, _networkTimeout);
}
}

private async Task CopyToAsync(Stream source, Stream destination, CancellationTokenSource cancellationTokenSource)
{
byte[] buffer = ArrayPool<byte>.Shared.Rent(DefaultCopyBufferSize);
try
{
while (true)
{
cancellationTokenSource.CancelAfter(_networkTimeout);
int bytesRead = await source.ReadAsync(buffer, 0, buffer.Length, cancellationTokenSource.Token).ConfigureAwait(false);
if (bytesRead == 0) break;
await destination.WriteAsync(new ReadOnlyMemory<byte>(buffer, 0, bytesRead), cancellationTokenSource.Token).ConfigureAwait(false);
}
}
finally
{
cancellationTokenSource.CancelAfter(Timeout.InfiniteTimeSpan);
ArrayPool<byte>.Shared.Return(buffer);
}
}

private static void CopyTo(Stream source, Stream destination, CancellationToken cancellationToken)
{
byte[] buffer = ArrayPool<byte>.Shared.Rent(DefaultCopyBufferSize);
try
{
int read;
while ((read = source.Read(buffer, 0, buffer.Length)) != 0)
{
cancellationToken.ThrowIfCancellationRequested();
destination.Write(buffer, 0, read);
}
}
finally
{
ArrayPool<byte>.Shared.Return(buffer);
}
}
}
}
5 changes: 5 additions & 0 deletions sdk/core/Azure.Core/src/RetryOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,5 +35,10 @@ internal RetryOptions()
/// The approach to use for calculating retry delays.
/// </summary>
public RetryMode Mode { get; set; } = RetryMode.Exponential;

/// <summary>
/// The timeout applied to an individual network operations.
/// </summary>
public TimeSpan NetworkTimeout { get; set; } = TimeSpan.FromSeconds(100);
}
}
11 changes: 7 additions & 4 deletions sdk/core/Azure.Core/src/Shared/RetriableStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -90,14 +90,17 @@ public override async Task<int> ReadAsync(byte[] buffer, int offset, int count,
}
catch (Exception e)
{
await RetryAsync(e, true).ConfigureAwait(false);
await RetryAsync(e, true, cancellationToken).ConfigureAwait(false);
}
}
}

private async Task RetryAsync(Exception exception, bool async)
private async Task RetryAsync(Exception exception, bool async, CancellationToken cancellationToken)
{
if (!_responseClassifier.IsRetriableException(exception))
bool isNonCustomerCancelledException = exception is OperationCanceledException &&
!cancellationToken.IsCancellationRequested;

if (!_responseClassifier.IsRetriableException(exception) && !isNonCustomerCancelledException)
{
ExceptionDispatchInfo.Capture(exception).Throw();
}
Expand Down Expand Up @@ -132,7 +135,7 @@ public override int Read(byte[] buffer, int offset, int count)
}
catch (Exception e)
{
RetryAsync(e, false).EnsureCompleted();
RetryAsync(e, false, default).EnsureCompleted();
}
}
}
Expand Down
Loading

0 comments on commit 29fd275

Please sign in to comment.