Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expose response PipeWriter in Kestrel #7110

Merged
merged 33 commits into from
Feb 9, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
9cefd66
Expose response PipeWriter in Kestrel
jkotalik Jan 18, 2019
0e1301b
small cleanup
jkotalik Jan 30, 2019
53af11d
Make tests pass
jkotalik Jan 30, 2019
b3c101e
Making WriteAsync use GetMemory
jkotalik Jan 30, 2019
9324538
nit
jkotalik Jan 30, 2019
89284d6
update response writing extensions
jkotalik Feb 4, 2019
d75c9e4
Add inner stream/pipe to all adapters
jkotalik Feb 4, 2019
9781e9e
Clarify chunking code
jkotalik Feb 4, 2019
eae4448
Implement Complete()
jkotalik Feb 4, 2019
7b0db9e
Use bool for checking chunked status
jkotalik Feb 4, 2019
df0412c
Handle wrapping logic for dispose
jkotalik Feb 5, 2019
ba59c4c
Make HttpResponseWriteAsync good and react in tests. TODO many many t…
jkotalik Feb 5, 2019
258485e
Pass in memory pool to streampipewriter
jkotalik Feb 5, 2019
1e3c50a
Improve WriteOnlyPipeStream and small nit
jkotalik Feb 6, 2019
0dc4803
Attempted fix for BodyWrapper stream and check nonresponsebody writes…
jkotalik Feb 6, 2019
e308e5f
Fix a few tests for Advance scenarios, disable a few response compres…
jkotalik Feb 6, 2019
532c63d
Add tests for response writing with encoding
jkotalik Feb 6, 2019
2b22796
Shim StartAsync
jkotalik Feb 6, 2019
5b213c2
Add some comments
jkotalik Feb 6, 2019
7d9647a
Avoid throwing in writes as much as possible and general cleanup
jkotalik Feb 6, 2019
7d821b4
Trying to figure out what to do with pipeWriter.Complete for Http2.
jkotalik Feb 6, 2019
a2796fc
Don't actually write data
jkotalik Feb 7, 2019
8714576
Some feedback
jkotalik Feb 7, 2019
c1c0247
Offline feedback
jkotalik Feb 7, 2019
1ee90a8
Renames
jkotalik Feb 8, 2019
177d34d
IsCompletedSuccessfully
jkotalik Feb 8, 2019
1a7b2c2
Feedback
jkotalik Feb 8, 2019
67023fa
Only dispose the stream pipe wrapper and more tests
jkotalik Feb 8, 2019
45f3739
Use kestrel's minimum segment size
jkotalik Feb 8, 2019
4d16db0
Bad rebase
jkotalik Feb 8, 2019
db79af5
Feedback
jkotalik Feb 8, 2019
31a6dab
whoops
jkotalik Feb 8, 2019
312f42a
Fix test
jkotalik Feb 8, 2019
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.

using System;
using System.IO.Pipelines;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
Expand Down Expand Up @@ -60,8 +61,73 @@ public static class HttpResponseWritingExtensions
throw new ArgumentNullException(nameof(encoding));
}

byte[] data = encoding.GetBytes(text);
return response.Body.WriteAsync(data, 0, data.Length, cancellationToken);
// Need to call StartAsync before GetMemory/GetSpan
if (!response.HasStarted)
{
var startAsyncTask = response.StartAsync(cancellationToken);
if (!startAsyncTask.IsCompletedSuccessfully)
{
return StartAndWriteAsyncAwaited(response, text, encoding, cancellationToken, startAsyncTask);
}
}

Write(response, text, encoding);

var flushAsyncTask = response.BodyPipe.FlushAsync(cancellationToken);
if (flushAsyncTask.IsCompletedSuccessfully)
{
// Most implementations of ValueTask reset state in GetResult, so call it before returning a completed task.
flushAsyncTask.GetAwaiter().GetResult();
return Task.CompletedTask;
}

return flushAsyncTask.AsTask();
}

private static async Task StartAndWriteAsyncAwaited(this HttpResponse response, string text, Encoding encoding, CancellationToken cancellationToken, Task startAsyncTask)
{
await startAsyncTask;
Write(response, text, encoding);
await response.BodyPipe.FlushAsync(cancellationToken);
}

private static void Write(this HttpResponse response, string text, Encoding encoding)
{
var pipeWriter = response.BodyPipe;
var encodedLength = encoding.GetByteCount(text);
var destination = pipeWriter.GetSpan(encodedLength);

if (encodedLength <= destination.Length)
{
// Just call Encoding.GetBytes if everything will fit into a single segment.
var bytesWritten = encoding.GetBytes(text, destination);
pipeWriter.Advance(bytesWritten);
}
else
{
WriteMutliSegmentEncoded(pipeWriter, text, encoding, destination, encodedLength);
}
}

private static void WriteMutliSegmentEncoded(PipeWriter writer, string text, Encoding encoding, Span<byte> destination, int encodedLength)
{
var encoder = encoding.GetEncoder();
var source = text.AsSpan();
var completed = false;
var totalBytesUsed = 0;

// This may be a bug, but encoder.Convert returns completed = true for UTF7 too early.
// Therefore, we check encodedLength - totalBytesUsed too.
while (!completed || encodedLength - totalBytesUsed != 0)
{
encoder.Convert(source, destination, flush: source.Length == 0, out var charsUsed, out var bytesUsed, out completed);
totalBytesUsed += bytesUsed;

writer.Advance(bytesUsed);
source = source.Slice(charsUsed);

destination = writer.GetSpan(encodedLength - totalBytesUsed);
}
}
}
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.

using System;
using System.IO;
using System.IO.Pipelines;
using System.IO.Pipelines.Tests;
using System.Text;
using System.Threading.Tasks;
using Xunit;

Expand All @@ -28,6 +32,65 @@ public async Task WritingText_MultipleWrites()
Assert.Equal(22, context.Response.Body.Length);
}

[Theory]
[MemberData(nameof(Encodings))]
public async Task WritingTextThatRequiresMultipleSegmentsWorks(Encoding encoding)
{
// Need to change the StreamPipeWriter with a capped MemoryPool
var memoryPool = new TestMemoryPool(maxBufferSize: 16);
var outputStream = new MemoryStream();
var streamPipeWriter = new StreamPipeWriter(outputStream, minimumSegmentSize: 0, memoryPool);

HttpContext context = new DefaultHttpContext();
context.Response.BodyPipe = streamPipeWriter;

var inputString = "昨日すき焼きを食べました";
var expected = encoding.GetBytes(inputString);
await context.Response.WriteAsync(inputString, encoding);

outputStream.Position = 0;
var actual = new byte[expected.Length];
jkotalik marked this conversation as resolved.
Show resolved Hide resolved
var length = outputStream.Read(actual);

var res1 = encoding.GetString(actual);
var res2 = encoding.GetString(expected);
Assert.Equal(expected.Length, length);
Assert.Equal(expected, actual);
streamPipeWriter.Complete();
}

[Theory]
[MemberData(nameof(Encodings))]
public async Task WritingTextWithPassedInEncodingWorks(Encoding encoding)
{
HttpContext context = CreateRequest();

var inputString = "昨日すき焼きを食べました";
var expected = encoding.GetBytes(inputString);
await context.Response.WriteAsync(inputString, encoding);

context.Response.Body.Position = 0;
var actual = new byte[expected.Length * 2];
var length = context.Response.Body.Read(actual);

var actualShortened = new byte[length];
Array.Copy(actual, actualShortened, length);

Assert.Equal(expected.Length, length);
Assert.Equal(expected, actualShortened);
}

public static TheoryData<Encoding> Encodings =>
new TheoryData<Encoding>
{
{ Encoding.ASCII },
{ Encoding.BigEndianUnicode },
{ Encoding.Unicode },
{ Encoding.UTF32 },
{ Encoding.UTF7 },
{ Encoding.UTF8 }
};

private HttpContext CreateRequest()
{
HttpContext context = new DefaultHttpContext();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@
<TargetFramework>netcoreapp3.0</TargetFramework>
</PropertyGroup>

<ItemGroup>
<ProjectReference Include="..\..\Http\test\Microsoft.AspNetCore.Http.Tests.csproj" />
</ItemGroup>

<ItemGroup>
<Reference Include="Microsoft.AspNetCore.Http" />
</ItemGroup>
Expand Down
2 changes: 1 addition & 1 deletion src/Http/Http.Features/src/FeatureReferences.cs
Original file line number Diff line number Diff line change
Expand Up @@ -95,4 +95,4 @@ private TFeature UpdateCached<TFeature, TState>(ref TFeature cached, TState stat
public TFeature Fetch<TFeature>(ref TFeature cached, Func<IFeatureCollection, TFeature> factory)
where TFeature : class => Fetch(ref cached, Collection, factory);
}
}
}
2 changes: 1 addition & 1 deletion src/Http/Http/src/Internal/DefaultHttpResponse.cs
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ public override Task StartAsync(CancellationToken cancellationToken = default)
return HttpResponseFeature.Body.FlushAsync(cancellationToken);
}

return HttpResponseStartFeature.StartAsync();
return HttpResponseStartFeature.StartAsync(cancellationToken);
}

struct FeatureInterfaces
Expand Down
13 changes: 7 additions & 6 deletions src/Http/Http/src/ReadOnlyPipeStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ namespace System.IO.Pipelines
/// </summary>
public class ReadOnlyPipeStream : Stream
{
private readonly PipeReader _pipeReader;
private bool _allowSynchronousIO = true;

/// <summary>
Expand All @@ -33,7 +32,7 @@ public ReadOnlyPipeStream(PipeReader pipeReader) :
public ReadOnlyPipeStream(PipeReader pipeReader, bool allowSynchronousIO)
{
_allowSynchronousIO = allowSynchronousIO;
_pipeReader = pipeReader;
InnerPipeReader = pipeReader;
}

/// <inheritdoc />
Expand Down Expand Up @@ -62,6 +61,8 @@ public override int WriteTimeout
set => throw new NotSupportedException();
}

public PipeReader InnerPipeReader { get; }

/// <inheritdoc />
public override void Write(byte[] buffer, int offset, int count)
=> throw new NotSupportedException();
Expand Down Expand Up @@ -160,7 +161,7 @@ private async ValueTask<int> ReadAsyncInternal(Memory<byte> buffer, Cancellation
{
while (true)
{
var result = await _pipeReader.ReadAsync(cancellationToken);
var result = await InnerPipeReader.ReadAsync(cancellationToken);
var readableBuffer = result.Buffer;
var readableBufferLength = readableBuffer.Length;

Expand All @@ -186,7 +187,7 @@ private async ValueTask<int> ReadAsyncInternal(Memory<byte> buffer, Cancellation
}
finally
{
_pipeReader.AdvanceTo(consumed);
InnerPipeReader.AdvanceTo(consumed);
}
}
}
Expand All @@ -211,7 +212,7 @@ private async Task CopyToAsyncInternal(Stream destination, CancellationToken can
{
while (true)
{
var result = await _pipeReader.ReadAsync(cancellationToken);
var result = await InnerPipeReader.ReadAsync(cancellationToken);
var readableBuffer = result.Buffer;
var readableBufferLength = readableBuffer.Length;

Expand All @@ -232,7 +233,7 @@ private async Task CopyToAsyncInternal(Stream destination, CancellationToken can
}
finally
{
_pipeReader.AdvanceTo(readableBuffer.End);
InnerPipeReader.AdvanceTo(readableBuffer.End);
}
}
}
Expand Down
8 changes: 3 additions & 5 deletions src/Http/Http/src/StreamPipeReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ public class StreamPipeReader : PipeReader, IDisposable
{
private readonly int _minimumSegmentSize;
private readonly int _minimumReadThreshold;
private readonly Stream _readingStream;
private readonly MemoryPool<byte> _pool;

private CancellationTokenSource _internalTokenSource;
Expand All @@ -42,15 +41,14 @@ public StreamPipeReader(Stream readingStream)
{
}


/// <summary>
/// Creates a new StreamPipeReader.
/// </summary>
/// <param name="readingStream">The stream to read from.</param>
/// <param name="options">The options to use.</param>
public StreamPipeReader(Stream readingStream, StreamPipeReaderOptions options)
{
_readingStream = readingStream ?? throw new ArgumentNullException(nameof(readingStream));
InnerStream = readingStream ?? throw new ArgumentNullException(nameof(readingStream));

if (options == null)
{
Expand All @@ -70,7 +68,7 @@ public StreamPipeReader(Stream readingStream, StreamPipeReaderOptions options)
/// <summary>
/// Gets the inner stream that is being read from.
/// </summary>
public Stream InnerStream => _readingStream;
public Stream InnerStream { get; }

/// <inheritdoc />
public override void AdvanceTo(SequencePosition consumed)
Expand Down Expand Up @@ -235,7 +233,7 @@ public override async ValueTask<ReadResult> ReadAsync(CancellationToken cancella
{
AllocateReadTail();
#if NETCOREAPP3_0
var length = await _readingStream.ReadAsync(_readTail.AvailableMemory.Slice(_readTail.End), tokenSource.Token);
var length = await InnerStream.ReadAsync(_readTail.AvailableMemory.Slice(_readTail.End), tokenSource.Token);
#elif NETSTANDARD2_0
jkotalik marked this conversation as resolved.
Show resolved Hide resolved
if (!MemoryMarshal.TryGetArray<byte>(_readTail.AvailableMemory.Slice(_readTail.End), out var arraySegment))
{
Expand Down
15 changes: 7 additions & 8 deletions src/Http/Http/src/StreamPipeWriter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ namespace System.IO.Pipelines
public class StreamPipeWriter : PipeWriter, IDisposable
{
private readonly int _minimumSegmentSize;
private readonly Stream _writingStream;
private int _bytesWritten;

private List<CompletedBuffer> _completedSegments;
Expand Down Expand Up @@ -53,14 +52,14 @@ public StreamPipeWriter(Stream writingStream) : this(writingStream, 4096)
public StreamPipeWriter(Stream writingStream, int minimumSegmentSize, MemoryPool<byte> pool = null)
{
_minimumSegmentSize = minimumSegmentSize;
_writingStream = writingStream;
InnerStream = writingStream;
_pool = pool ?? MemoryPool<byte>.Shared;
}

/// <summary>
/// Gets the inner stream that is being written to.
/// </summary>
public Stream InnerStream => _writingStream;
public Stream InnerStream { get; }

/// <inheritdoc />
public override void Advance(int count)
Expand Down Expand Up @@ -180,10 +179,10 @@ private async ValueTask<FlushResult> FlushAsyncInternal(CancellationToken cancel
{
var segment = _completedSegments[0];
#if NETCOREAPP3_0
await _writingStream.WriteAsync(segment.Buffer.Slice(0, segment.Length), localToken);
await InnerStream.WriteAsync(segment.Buffer.Slice(0, segment.Length), localToken);
#elif NETSTANDARD2_0
MemoryMarshal.TryGetArray<byte>(segment.Buffer, out var arraySegment);
await _writingStream.WriteAsync(arraySegment.Array, 0, segment.Length, localToken);
await InnerStream.WriteAsync(arraySegment.Array, 0, segment.Length, localToken);
#else
#error Target frameworks need to be updated.
#endif
Expand All @@ -196,18 +195,18 @@ private async ValueTask<FlushResult> FlushAsyncInternal(CancellationToken cancel
if (!_currentSegment.IsEmpty)
{
#if NETCOREAPP3_0
await _writingStream.WriteAsync(_currentSegment.Slice(0, _position), localToken);
await InnerStream.WriteAsync(_currentSegment.Slice(0, _position), localToken);
#elif NETSTANDARD2_0
MemoryMarshal.TryGetArray<byte>(_currentSegment, out var arraySegment);
await _writingStream.WriteAsync(arraySegment.Array, 0, _position, localToken);
await InnerStream.WriteAsync(arraySegment.Array, 0, _position, localToken);
#else
#error Target frameworks need to be updated.
#endif
_bytesWritten -= _position;
_position = 0;
}

await _writingStream.FlushAsync(localToken);
await InnerStream.FlushAsync(localToken);

return new FlushResult(isCanceled: false, _isCompleted);
}
Expand Down
Loading