Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add response read timeouts #10128

Merged
merged 16 commits into from
Feb 25, 2020
2 changes: 2 additions & 0 deletions sdk/core/Azure.Core/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
# Release History

## 1.1.0-preview.1 (Unreleased)

### Fixes and improvements
- Add OPTIONS and TRACE HTTP request methods.
- Add `TryTimeout` property to `RetryOptions` and apply if to stream reads.

## 1.0.2 (2020-01-10)

Expand Down
5 changes: 4 additions & 1 deletion sdk/core/Azure.Core/src/Pipeline/HttpClientTransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,10 @@ private static HttpClient CreateDefaultClient()
httpClientHandler.Proxy = webProxy;
}

return new HttpClient(httpClientHandler);
return new HttpClient(httpClientHandler)
{
Timeout = Timeout.InfiniteTimeSpan
};
}

private static HttpRequestMessage BuildRequestMessage(HttpMessage message)
Expand Down
4 changes: 3 additions & 1 deletion sdk/core/Azure.Core/src/Pipeline/HttpPipelineBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ public static HttpPipeline Build(ClientOptions options, HttpPipelinePolicy[] per
RetryOptions retryOptions = options.Retry;
policies.Add(new RetryPolicy(retryOptions.Mode, retryOptions.Delay, retryOptions.MaxDelay, retryOptions.MaxRetries));

policies.Add(new OperationTimeoutPolicy(options.Retry.TryTimeout));

policies.AddRange(perRetryPolicies);

policies.AddRange(options.PerRetryPolicies);
Expand All @@ -73,7 +75,7 @@ public static HttpPipeline Build(ClientOptions options, HttpPipelinePolicy[] per
diagnostics.LoggedHeaderNames.ToArray(), diagnostics.LoggedQueryParameters.ToArray()));
}

policies.Add(BufferResponsePolicy.Shared);
policies.Add(new ResponseBodyPolicy(options.Retry.TryTimeout));

policies.Add(new RequestActivityPolicy(isDistributedTracingEnabled, ClientDiagnostics.GetResourceProviderNamespace(options.GetType().Assembly)));

Expand Down
58 changes: 0 additions & 58 deletions sdk/core/Azure.Core/src/Pipeline/Internal/BufferResponsePolicy.cs

This file was deleted.

123 changes: 123 additions & 0 deletions sdk/core/Azure.Core/src/Pipeline/Internal/ReadTimeoutStream.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

namespace Azure.Core.Pipeline
{
internal class ReadTimeoutStream : ReadOnlyStream
heaths marked this conversation as resolved.
Show resolved Hide resolved
{
private readonly Stream _stream;
private TimeSpan _readTimeout;
private CancellationTokenSource _cancellationTokenSource;

public ReadTimeoutStream(Stream stream, TimeSpan readTimeout)
{
_stream = stream;
_readTimeout = readTimeout;
UpdateReadTimeout();
_cancellationTokenSource = new CancellationTokenSource();
}

public override int Read(byte[] buffer, int offset, int count)
{
return _stream.Read(buffer, offset, count);
}

public override async Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
var source = StartTimeout(cancellationToken, out bool dispose);
try
{
source.Token.Register(() =>
{
Console.WriteLine("Cancelled");
pakrym marked this conversation as resolved.
Show resolved Hide resolved
});
return await _stream.ReadAsync(buffer, offset, count, source.Token).ConfigureAwait(false);
}
finally
{
StopTimeout(source, dispose);
}
}

private CancellationTokenSource StartTimeout(CancellationToken additionalToken, out bool dispose)
AlexanderSher marked this conversation as resolved.
Show resolved Hide resolved
{
if (_cancellationTokenSource.IsCancellationRequested)
{
_cancellationTokenSource = new CancellationTokenSource();
}

CancellationTokenSource source;
if (additionalToken.CanBeCanceled)
{
source = CancellationTokenSource.CreateLinkedTokenSource(additionalToken, _cancellationTokenSource.Token);
dispose = true;
}
else
{
source = _cancellationTokenSource;
dispose = false;
}

_cancellationTokenSource.CancelAfter(_readTimeout);

return source;
}

private void StopTimeout(CancellationTokenSource source, bool dispose)
{
_cancellationTokenSource.CancelAfter(Timeout.InfiniteTimeSpan);
if (dispose)
{
source.Dispose();
}
}

public override long Seek(long offset, SeekOrigin origin)
{
return _stream.Seek(offset, origin);
}

public override bool CanRead => _stream.CanRead;
public override bool CanSeek => _stream.CanSeek;
public override long Length => _stream.Length;

public override long Position
{
get => _stream.Position;
set => _stream.Position = value;
}

public override int ReadTimeout
pakrym marked this conversation as resolved.
Show resolved Hide resolved
{
get => _readTimeout.Milliseconds;
pakrym marked this conversation as resolved.
Show resolved Hide resolved
set
{
_readTimeout = TimeSpan.FromMilliseconds(value);
UpdateReadTimeout();
}
}

private void UpdateReadTimeout()
{
try
{
_stream.ReadTimeout = _readTimeout.Milliseconds;
pakrym marked this conversation as resolved.
Show resolved Hide resolved
}
catch
{
// ignore
}
}

protected override void Dispose(bool disposing)
{
base.Dispose(disposing);
_stream.Dispose();
pakrym marked this conversation as resolved.
Show resolved Hide resolved
}
}
}
68 changes: 68 additions & 0 deletions sdk/core/Azure.Core/src/Pipeline/Internal/ResponseBodyPolicy.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

using System;
using System.IO;
using System.Threading.Tasks;

namespace Azure.Core.Pipeline
{
internal class ResponseBodyPolicy : HttpPipelinePolicy
pakrym marked this conversation as resolved.
Show resolved Hide resolved
{
private readonly TimeSpan _readTimeout;

public ResponseBodyPolicy(TimeSpan readTimeout)
{
_readTimeout = readTimeout;
}

public override async ValueTask ProcessAsync(HttpMessage message, ReadOnlyMemory<HttpPipelinePolicy> pipeline)
{
await ProcessAsync(message, pipeline, true).ConfigureAwait(false);
}

public override void Process(HttpMessage message, ReadOnlyMemory<HttpPipelinePolicy> pipeline)
{
ProcessAsync(message, pipeline, false).EnsureCompleted();
}

private async ValueTask ProcessAsync(HttpMessage message, ReadOnlyMemory<HttpPipelinePolicy> pipeline, bool async)
{
if (async)
{
await ProcessNextAsync(message, pipeline).ConfigureAwait(false);
}
else
{
ProcessNext(message, pipeline);
}

Stream? responseContentStream = message.Response.ContentStream;
if (responseContentStream == null || responseContentStream.CanSeek)
AlexanderSher marked this conversation as resolved.
Show resolved Hide resolved
{
return;
}

if (message.BufferResponse)
{
var bufferedStream = new MemoryStream();
if (async)
{
await responseContentStream.CopyToAsync(bufferedStream, message.CancellationToken).ConfigureAwait(false);
}
else
{
responseContentStream.CopyTo(bufferedStream, message.CancellationToken);
}

responseContentStream.Dispose();
bufferedStream.Position = 0;
message.Response.ContentStream = bufferedStream;
}
else
{
message.Response.ContentStream = new ReadTimeoutStream(responseContentStream, _readTimeout);
pakrym marked this conversation as resolved.
Show resolved Hide resolved
}
}
}
}
53 changes: 53 additions & 0 deletions sdk/core/Azure.Core/src/Pipeline/Internal/StreamExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

using System;
using System.Buffers;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using Azure.Core.Buffers;

namespace Azure.Core.Pipeline
{
internal static class StreamExtensions
{
internal const int DefaultCopyBufferSize = 81920;

public static async Task CopyToAsync(this Stream source, Stream destination, CancellationToken cancellationToken)
{
byte[] buffer = ArrayPool<byte>.Shared.Rent(DefaultCopyBufferSize);
try
{
while (true)
{
int bytesRead = await source.ReadAsync(buffer, 0, buffer.Length, cancellationToken).ConfigureAwait(false);
if (bytesRead == 0) break;
await destination.WriteAsync(new ReadOnlyMemory<byte>(buffer, 0, bytesRead), cancellationToken).ConfigureAwait(false);
}
}
finally
{
ArrayPool<byte>.Shared.Return(buffer);
}
}

public static void CopyTo(this Stream source, Stream destination, CancellationToken cancellationToken)
{
byte[] buffer = ArrayPool<byte>.Shared.Rent(DefaultCopyBufferSize);
try
{
int read;
while ((read = source.Read(buffer, 0, buffer.Length)) != 0)
{
cancellationToken.ThrowIfCancellationRequested();
destination.Write(buffer, 0, read);
}
}
finally
{
ArrayPool<byte>.Shared.Return(buffer);
}
}
}
}
Loading