Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement Kestrel Request PipeReader #7603

Merged
merged 29 commits into from
Feb 22, 2019
Merged
Show file tree
Hide file tree
Changes from 27 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
0a39f3f
Initial prototyping
jkotalik Feb 11, 2019
281efb4
Majority of refactor for HttpRequest
jkotalik Feb 12, 2019
df26a51
remove streams => bodyControl
jkotalik Feb 12, 2019
f0bca46
Things are generally working. TODO adding tests for everything
jkotalik Feb 13, 2019
9c5ca05
Rename and organization; Add Complete()
jkotalik Feb 13, 2019
879021a
Make things pipe like :D
jkotalik Feb 13, 2019
eb74db8
minor nits
jkotalik Feb 13, 2019
54e0f5d
updating tests
jkotalik Feb 13, 2019
e0a67d4
Start removing response pipe
jkotalik Feb 13, 2019
d3eb44a
Major refactor
jkotalik Feb 14, 2019
bfba404
Majority of refactor done. TODO tests
jkotalik Feb 14, 2019
3da4058
Remove InternalRequestBodyPipeReader
jkotalik Feb 14, 2019
9886fc0
Fix cancel
jkotalik Feb 14, 2019
aeaf6b1
bunch of tests
jkotalik Feb 15, 2019
f1ea1ec
minor nits
jkotalik Feb 15, 2019
196c72f
Handle Chunked.ReadAsync better
jkotalik Feb 15, 2019
a89e132
Renames for files and cleanup read/tryread logic
jkotalik Feb 15, 2019
118f8d1
make tests pass and some general cleanup
jkotalik Feb 15, 2019
af4234c
Add some more http1 tests
jkotalik Feb 15, 2019
d737360
RequestPipeReader tests
jkotalik Feb 15, 2019
d32295d
adding benchmarks even though they don't work
jkotalik Feb 15, 2019
20a6ad3
removing bad comments
jkotalik Feb 15, 2019
86ac73b
Wrap cancellation exceptions with TaskCanceledException
jkotalik Feb 16, 2019
1f35f3a
Majority of feedback.
jkotalik Feb 19, 2019
c9ac10d
nits and bad merge conflict resolution
jkotalik Feb 19, 2019
283396a
Log message
jkotalik Feb 22, 2019
c1799e1
Correct fix
jkotalik Feb 22, 2019
749537e
Don't complete the reader in Complete
jkotalik Feb 22, 2019
9f97ab5
Misread feedback
jkotalik Feb 22, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

17 changes: 3 additions & 14 deletions src/Servers/Kestrel/Core/src/Internal/Http/Http1Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,6 @@ public Http1Connection(HttpConnectionContext context)
_keepAliveTicks = ServerOptions.Limits.KeepAliveTimeout.Ticks;
_requestHeadersTimeoutTicks = ServerOptions.Limits.RequestHeadersTimeout.Ticks;

RequestBodyPipe = CreateRequestBodyPipe();

_http1Output = new Http1OutputProducer(
_context.Transport.Output,
_context.ConnectionId,
Expand All @@ -57,6 +55,7 @@ public Http1Connection(HttpConnectionContext context)

Input = _context.Transport.Input;
Output = _http1Output;
MemoryPool = _context.MemoryPool;
}

public PipeReader Input { get; }
Expand All @@ -67,6 +66,8 @@ public Http1Connection(HttpConnectionContext context)

public MinDataRate MinResponseDataRate { get; set; }

public MemoryPool<byte> MemoryPool { get; }

protected override void OnRequestProcessingEnded()
{
Input.Complete();
Expand Down Expand Up @@ -531,17 +532,5 @@ protected override bool TryParseRequest(ReadResult result, out bool endConnectio
}

void IRequestProcessor.Tick(DateTimeOffset now) { }

private Pipe CreateRequestBodyPipe()
=> new Pipe(new PipeOptions
(
pool: _context.MemoryPool,
readerScheduler: ServiceContext.Scheduler,
writerScheduler: PipeScheduler.Inline,
pauseWriterThreshold: 1,
resumeWriterThreshold: 1,
useSynchronizationContext: false,
minimumSegmentSize: KestrelMemoryPool.MinimumSegmentSize
));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,213 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.

using System;
using System.IO.Pipelines;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Connections;

namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http
{
public class Http1ContentLengthMessageBody : Http1MessageBody
{
private readonly long _contentLength;
private long _inputLength;
private ReadResult _readResult;
private bool _completed;
private int _userCanceled;

public Http1ContentLengthMessageBody(bool keepAlive, long contentLength, Http1Connection context)
: base(context)
{
RequestKeepAlive = keepAlive;
_contentLength = contentLength;
_inputLength = _contentLength;
}

public override async ValueTask<ReadResult> ReadAsync(CancellationToken cancellationToken = default)
{
ThrowIfCompleted();

if (_inputLength == 0)
{
_readResult = new ReadResult(default, isCanceled: false, isCompleted: true);
return _readResult;
}

TryStart();

// The while(true) loop is required because the Http1 connection calls CancelPendingRead to unblock
// the call to StartTimingReadAsync to check if the request timed out.
// However, if the user called CancelPendingRead, we want that to return a canceled ReadResult
// We internally track an int for that.
while (true)
{
// The issue is that TryRead can get a canceled read result
// which is unknown to StartTimingReadAsync.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think I understand this. Shouldn't TryRead check IsCanceled and throw a BadHttpRequestException too?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Try* API shouldn’t throw. Makes them unusable, so we should avoid it where possible

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I was trying to avoid throwing in Try methods. However, TryRead throws internally in pipes, so we should consider fixing that in pipes.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Try* API shouldn’t throw. Makes them unusable, so we should avoid it where possible

Like @jkotalik points out, TryRead is a little special. Errors also need to be raised from TryRead when the writer completes with an error. TryRead should only return false when the PipeReader is still active but there's no data available to currently read.

Copy link
Member

@JamesNK JamesNK Feb 19, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A little more info about the TryXXX pattern: TryXXX doesn't mean the method should never throw an exception. It means it shouldn't throw an exception for a certain use-case.

For example, int.TryParse will return false if the string cannot be parsed to an integer. But it will throw an ArgumentException if you give it an invalid NumberStyles value.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TryRead should only return false when the PipeReader is still active but there's no data available to currently read.

Sure, it has to throw sometimes in those rare cases where the input is invalid. I don't know that I agree with the above.

if (_context.RequestTimedOut)
{
BadHttpRequestException.Throw(RequestRejectionReason.RequestBodyTimeout);
}

try
{
var readAwaitable = _context.Input.ReadAsync(cancellationToken);
_readResult = await StartTimingReadAsync(readAwaitable, cancellationToken);
}
catch (ConnectionAbortedException ex)
{
throw new TaskCanceledException("The request was aborted", ex);
}

if (_context.RequestTimedOut)
{
BadHttpRequestException.Throw(RequestRejectionReason.RequestBodyTimeout);
}

// Make sure to handle when this is canceled here.
if (_readResult.IsCanceled)
{
if (Interlocked.Exchange(ref _userCanceled, 0) == 1)
{
// Ignore the readResult if it wasn't by the user.
break;
}
else
{
// Reset the timing read here for the next call to read.
StopTimingRead(0);
continue;
}
}

var readableBuffer = _readResult.Buffer;
var readableBufferLength = readableBuffer.Length;
StopTimingRead(readableBufferLength);

CheckCompletedReadResult(_readResult);

if (readableBufferLength > 0)
{
CreateReadResultFromConnectionReadResult();

break;
jkotalik marked this conversation as resolved.
Show resolved Hide resolved
}
}

return _readResult;
}

public override bool TryRead(out ReadResult readResult)
{
ThrowIfCompleted();

if (_inputLength == 0)
{
readResult = new ReadResult(default, isCanceled: false, isCompleted: true);
return true;
}

TryStart();

if (!_context.Input.TryRead(out _readResult))
{
readResult = default;
return false;
}

if (_readResult.IsCanceled)
{
if (Interlocked.Exchange(ref _userCanceled, 0) == 0)
{
// Cancellation wasn't by the user, return default ReadResult
readResult = default;
return false;
}
}

CreateReadResultFromConnectionReadResult();
jkotalik marked this conversation as resolved.
Show resolved Hide resolved

readResult = _readResult;

return true;
}

private void ThrowIfCompleted()
{
if (_completed)
{
throw new InvalidOperationException("Reading is not allowed after the reader was completed.");
}
}

private void CreateReadResultFromConnectionReadResult()
{
if (_readResult.Buffer.Length > _inputLength)
{
_readResult = new ReadResult(_readResult.Buffer.Slice(0, _inputLength), _readResult.IsCanceled, isCompleted: true);
}
else if (_readResult.Buffer.Length == _inputLength)
{
_readResult = new ReadResult(_readResult.Buffer, _readResult.IsCanceled, isCompleted: true);
}

if (_readResult.IsCompleted)
{
TryStop();
}
}

public override void AdvanceTo(SequencePosition consumed)
{
AdvanceTo(consumed, consumed);
}

public override void AdvanceTo(SequencePosition consumed, SequencePosition examined)
{
if (_inputLength == 0)
{
return;
}

var dataLength = _readResult.Buffer.Slice(_readResult.Buffer.Start, consumed).Length;

_inputLength -= dataLength;

_context.Input.AdvanceTo(consumed, examined);

OnDataRead(dataLength);
}

protected override void OnReadStarting()
{
if (_contentLength > _context.MaxRequestBodySize)
{
BadHttpRequestException.Throw(RequestRejectionReason.RequestBodyTooLarge);
}
}

public override void Complete(Exception exception)
{
_context.ReportApplicationError(exception);
_completed = true;
}

public override void OnWriterCompleted(Action<Exception, object> callback, object state)
{
// TODO make this work with ContentLength.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

File an issue for this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will once I merge this PR.

}

public override void CancelPendingRead()
{
Interlocked.Exchange(ref _userCanceled, 1);
_context.Input.CancelPendingRead();
}

protected override Task OnStopAsync()
{
Complete(null);
return Task.CompletedTask;
}
}
}
Loading