Skip to content

Commit

Permalink
Remove pass thru behavior
Browse files Browse the repository at this point in the history
* Require calling CopyTo to copy content to response stream
* Use PagedByteBuffer to buffer content in memory
  • Loading branch information
pranavkm committed Apr 16, 2019
1 parent 692569d commit fa081f6
Show file tree
Hide file tree
Showing 14 changed files with 872 additions and 359 deletions.
178 changes: 85 additions & 93 deletions src/Http/WebUtilities/src/FileBufferingWriteStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

using System;
using System.Buffers;
using System.Diagnostics;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
Expand All @@ -19,30 +20,23 @@ namespace Microsoft.AspNetCore.WebUtilities
/// a temporary file on disk.
/// </para>
/// <para>
/// The <see cref="FileBufferingWriteStream"/> performs opportunistic writes to the wrapping stream
/// when asychronous operation such as <see cref="WriteAsync(byte[], int, int, CancellationToken)"/> or <see cref="FlushAsync(CancellationToken)"/>
/// are performed.
/// Consumers of this API can invoke <see cref="M:Stream.CopyToAsync" /> to copy the results to the HTTP Response Stream.
/// </para>
/// </summary>
public sealed class FileBufferingWriteStream : Stream
{
private const int MaxRentedBufferSize = 1024 * 1024; // 1MB
private const int DefaultMemoryThreshold = 30 * 1024; // 30k
private const int DefaultMemoryThreshold = 32 * 1024; // 32k

private readonly Stream _writeStream;
private readonly int _memoryThreshold;
private readonly long? _bufferLimit;
private readonly Func<string> _tempFileDirectoryAccessor;
private readonly ArrayPool<byte> _bytePool;
private readonly byte[] _rentedBuffer;

/// <summary>
/// Initializes a new instance of <see cref="FileBufferingWriteStream"/>.
/// </summary>
/// <param name="writeStream">The <see cref="Stream"/> to write buffered contents to.</param>
/// <param name="memoryThreshold">
/// The maximum amount of memory in bytes to allocate before switching to a file on disk.
/// Defaults to 30kb.
/// Defaults to 32kb.
/// </param>
/// <param name="bufferLimit">
/// The maximum amouont of bytes that the <see cref="FileBufferingWriteStream"/> is allowed to buffer.
Expand All @@ -52,24 +46,10 @@ public sealed class FileBufferingWriteStream : Stream
/// uses the value returned by <see cref="Path.GetTempPath"/>.
/// </param>
public FileBufferingWriteStream(
Stream writeStream,
int memoryThreshold = DefaultMemoryThreshold,
long? bufferLimit = null,
Func<string> tempFileDirectoryAccessor = null)
: this(writeStream, memoryThreshold, bufferLimit, tempFileDirectoryAccessor, ArrayPool<byte>.Shared)
{

}

internal FileBufferingWriteStream(
Stream writeStream,
int memoryThreshold,
long? bufferLimit,
Func<string> tempFileDirectoryAccessor,
ArrayPool<byte> bytePool)
{
_writeStream = writeStream ?? throw new ArgumentNullException(nameof(writeStream));

if (memoryThreshold < 0)
{
throw new ArgumentOutOfRangeException(nameof(memoryThreshold));
Expand All @@ -84,18 +64,7 @@ internal FileBufferingWriteStream(
_memoryThreshold = memoryThreshold;
_bufferLimit = bufferLimit;
_tempFileDirectoryAccessor = tempFileDirectoryAccessor ?? AspNetCoreTempDirectory.TempDirectoryFactory;
_bytePool = bytePool;

if (memoryThreshold < MaxRentedBufferSize)
{
_rentedBuffer = bytePool.Rent(memoryThreshold);
MemoryStream = new MemoryStream(_rentedBuffer);
MemoryStream.SetLength(0);
}
else
{
MemoryStream = new MemoryStream();
}
PagedByteBuffer = new PagedByteBuffer(ArrayPool<byte>.Shared);
}

/// <inheritdoc />
Expand All @@ -117,9 +86,9 @@ public override long Position
set => throw new NotSupportedException();
}

internal long BufferedLength => MemoryStream.Length + (FileStream?.Length ?? 0);
internal long BufferedLength => PagedByteBuffer.Length + (FileStream?.Length ?? 0);

internal MemoryStream MemoryStream { get; }
internal PagedByteBuffer PagedByteBuffer { get; }

internal FileStream FileStream { get; private set; }

Expand All @@ -144,22 +113,27 @@ public override void Write(byte[] buffer, int offset, int count)

if (_bufferLimit.HasValue && _bufferLimit - BufferedLength < count)
{
DiposeInternal();
Dispose();
throw new IOException("Buffer limit exceeded.");
}

var availableMemory = _memoryThreshold - MemoryStream.Position;
if (count <= availableMemory)
// Allow buffering in memory if we're below the memory threshold once the current buffer is written.
var allowMemoryBuffer = (_memoryThreshold - count) >= PagedByteBuffer.Length;
if (allowMemoryBuffer)
{
// Buffer content in the MemoryStream if it has capacity.
MemoryStream.Write(buffer, offset, count);
PagedByteBuffer.Add(buffer, offset, count);
Debug.Assert(PagedByteBuffer.Length <= _memoryThreshold);
}
else
{
// If the MemoryStream is incapable of accomodating the content to be written
// spool to disk.
EnsureFileStream();
CopyContent(MemoryStream, FileStream);

// Spool memory content to disk and clear in memory buffers. We no longer need to hold on to it
PagedByteBuffer.CopyTo(FileStream, clearBuffers: true);

FileStream.Write(buffer, offset, count);
}
}
Expand All @@ -170,36 +144,86 @@ public override async Task WriteAsync(byte[] buffer, int offset, int count, Canc
ThrowArgumentException(buffer, offset, count);
ThrowIfDisposed();

// If we have the opportunity to go async, write the buffered content to the response.
await FlushAsync(cancellationToken);
await _writeStream.WriteAsync(buffer, offset, count, cancellationToken);
if (_bufferLimit.HasValue && _bufferLimit - BufferedLength < count)
{
Dispose();
throw new IOException("Buffer limit exceeded.");
}

// Allow buffering in memory if we're below the memory threshold once the current buffer is written.
var allowMemoryBuffer = (_memoryThreshold - count) >= PagedByteBuffer.Length;
if (allowMemoryBuffer)
{
// Buffer content in the MemoryStream if it has capacity.
PagedByteBuffer.Add(buffer, offset, count);
Debug.Assert(PagedByteBuffer.Length <= _memoryThreshold);
}
else
{
// If the MemoryStream is incapable of accomodating the content to be written
// spool to disk.
EnsureFileStream();

// Spool memory content to disk and clear in memory buffers. We no longer need to hold on to it
await PagedByteBuffer.CopyToAsync(FileStream, clearBuffers: true, cancellationToken);

await FileStream.WriteAsync(buffer, offset, count, cancellationToken);
}
}

public override void Flush()
{
// Do nothing.
}

/// <inheritdoc />
public override void SetLength(long value) => throw new NotSupportedException();

/// <inheritdoc />
// In the ordinary case, we expect this to throw if the target is the HttpResponse Body
// and disallows synchronous writes. We do not need to optimize for this.
public override void Flush()
/// <summary>
/// Copies buffered content to <paramref name="destination"/>.
/// </summary>
/// <param name="destination">The <see cref="Stream" /> to copy to.</param>
/// <param name="bufferSize">The size of the buffer.</param>
/// <param name="cancellationToken">The <see cref="CancellationToken" />.</param>
/// <returns>A <see cref="Task" /> that represents the asynchronous copy operation.</returns>
/// <remarks>
/// Users of this API do not need to reset the <see cref="Position" /> of this instance, prior to copying content.
/// </remarks>
public override async Task CopyToAsync(Stream destination, int bufferSize, CancellationToken cancellationToken)
{
// When not null, FileStream always has "older" spooled content. The PagedByteBuffer always has "newer"
// unspooled content. Copy the FileStream content first when available.
if (FileStream != null)
{
CopyContent(FileStream, _writeStream);
FileStream.Position = 0;
await FileStream.CopyToAsync(destination, bufferSize, cancellationToken);
}

CopyContent(MemoryStream, _writeStream);
// Copy memory content, but do not clear the buffers. We want multiple invocations of CopyTo \ CopyToAsync
// on this instance to behave the same.
await PagedByteBuffer.CopyToAsync(destination, clearBuffers: false, cancellationToken);
}

/// <inheritdoc />
public override async Task FlushAsync(CancellationToken cancellationToken)
/// <summary>
/// Copies buffered content to <paramref name="destination"/>.
/// </summary>
/// <param name="destination">The <see cref="Stream" /> to copy to.</param>
/// <param name="bufferSize">The size of the buffer.</param>
/// <remarks>
/// Users of this API do not need to reset the <see cref="Position" /> of this instance, prior to copying content.
/// </remarks>
public override void CopyTo(Stream destination, int bufferSize)
{
// See comments under CopyToAsync for an explanation for the order of execution.
if (FileStream != null)
{
await CopyContentAsync(FileStream, _writeStream, cancellationToken);
FileStream.Position = 0;
FileStream.CopyTo(destination, bufferSize);
}

await CopyContentAsync(MemoryStream, _writeStream, cancellationToken);
// Copy memory content, but do not clear the buffers. We want multiple invocations of CopyTo \ CopyToAsync
// on this instance to behave the same.
PagedByteBuffer.CopyTo(destination, clearBuffers: false);
}

/// <inheritdoc />
Expand All @@ -208,39 +232,21 @@ protected override void Dispose(bool disposing)
if (!Disposed)
{
Disposed = true;
Flush();

DiposeInternal();
PagedByteBuffer.Dispose();
FileStream?.Dispose();
}
}

private void DiposeInternal()
{
Disposed = true;
_bytePool.Return(_rentedBuffer);
MemoryStream.Dispose();
FileStream?.Dispose();
}

/// <inheritdoc />
public override async ValueTask DisposeAsync()
{
if (!Disposed)
{
Disposed = true;
try
{
await FlushAsync();
}
finally
{
if (_rentedBuffer != null)
{
_bytePool.Return(_rentedBuffer);
}
await MemoryStream.DisposeAsync();
await (FileStream?.DisposeAsync() ?? default);
}

PagedByteBuffer.Dispose();
await (FileStream?.DisposeAsync() ?? default);
}
}

Expand Down Expand Up @@ -290,19 +296,5 @@ private static void ThrowArgumentException(byte[] buffer, int offset, int count)
throw new ArgumentOutOfRangeException(nameof(offset));
}
}

private static void CopyContent(Stream source, Stream destination)
{
source.Position = 0;
source.CopyTo(destination);
source.SetLength(0);
}

private static async Task CopyContentAsync(Stream source, Stream destination, CancellationToken cancellationToken)
{
source.Position = 0;
await source.CopyToAsync(destination, cancellationToken);
source.SetLength(0);
}
}
}
Loading

0 comments on commit fa081f6

Please sign in to comment.