Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FileStream rewrite: Use IValueTaskSource instead of TaskCompletionSource #50802

Merged
merged 19 commits into from
Apr 15, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
bbc28e9
Remove IFileStreamCompletionStrategy. This makes FileStreamCompletion…
carlossanlop Apr 3, 2021
7449c79
Use AwaitableProvider for AsyncWindowsFileStreamStrategy.
carlossanlop Apr 3, 2021
4ee6d26
No need for ManualResetValueTaskSource class. Remove it to get rid of…
carlossanlop Apr 6, 2021
c586540
Bring back ReadAsync/WriteAsync code to the strategy for simpler code…
carlossanlop Apr 6, 2021
84b2d30
FileStreamCompletionSource can now be nested in Net5CompatFileStreamS…
carlossanlop Apr 6, 2021
3f842c4
Remove duplicate error definitions, use centralized Interop.Errors.
carlossanlop Apr 6, 2021
302e2fd
Move shared mask values to FileStreamHelpers to avoid duplication.
carlossanlop Apr 6, 2021
f0af5d7
Use Interop.Errors also in SyncWindowsFileStreamStrategy.
carlossanlop Apr 6, 2021
47cb553
Move misplaced comment, move method location.
carlossanlop Apr 6, 2021
dc2fb7e
Slightly better comment.
carlossanlop Apr 6, 2021
21fe9af
Rename FileStreamAwaitableProvider to FileStreamValueTaskSource to ke…
carlossanlop Apr 6, 2021
877e28d
Bring back the raw pointer intOverlapped.
carlossanlop Apr 6, 2021
48a50b3
Rename MemoryAwaitableProvider to MemoryFileStreamValueTaskSource.
carlossanlop Apr 7, 2021
e61732b
Remove numBufferedBytes and avoid unnecessary allocation in Read(byte…
carlossanlop Apr 7, 2021
ba3e23e
Rename files of nested types.
carlossanlop Apr 7, 2021
6595f14
Nested async result codes in static class.
carlossanlop Apr 8, 2021
32bdb7b
Address feedback.
carlossanlop Apr 8, 2021
b05dab9
Address suggestions
carlossanlop Apr 9, 2021
d5344be
Bring back RunContinuationAsynchronously=true
carlossanlop Apr 15, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1100,10 +1100,10 @@
<Compile Include="$(CommonPath)Interop\Interop.ResultCode.cs">
<Link>Common\Interop\Interop.ResultCode.cs</Link>
</Compile>
<Compile Include="$(CommonPath)Interop\Interop.TimeZoneDisplayNameType.cs" Condition="'$(TargetsBrowser)' != 'true'" >
<Compile Include="$(CommonPath)Interop\Interop.TimeZoneDisplayNameType.cs" Condition="'$(TargetsBrowser)' != 'true'">
<Link>Common\Interop\Interop.TimeZoneDisplayNameType.cs</Link>
</Compile>
<Compile Include="$(CommonPath)Interop\Interop.TimeZoneInfo.cs" Condition="'$(TargetsBrowser)' != 'true'" >
<Compile Include="$(CommonPath)Interop\Interop.TimeZoneInfo.cs" Condition="'$(TargetsBrowser)' != 'true'">
<Link>Common\Interop\Interop.TimeZoneInfo.cs</Link>
</Compile>
<Compile Include="$(CommonPath)Interop\Interop.Utils.cs">
Expand Down Expand Up @@ -1652,8 +1652,9 @@
<Compile Include="$(MSBuildThisFileDirectory)System\IO\PathHelper.Windows.cs" />
<Compile Include="$(MSBuildThisFileDirectory)System\IO\PathInternal.Windows.cs" />
<Compile Include="$(MSBuildThisFileDirectory)System\IO\Strategies\AsyncWindowsFileStreamStrategy.cs" />
<Compile Include="$(MSBuildThisFileDirectory)System\IO\Strategies\AsyncWindowsFileStreamStrategy.ValueTaskSource.cs" />
<Compile Include="$(MSBuildThisFileDirectory)System\IO\Strategies\FileStreamHelpers.Windows.cs" />
<Compile Include="$(MSBuildThisFileDirectory)System\IO\Strategies\FileStreamCompletionSource.Win32.cs" />
<Compile Include="$(MSBuildThisFileDirectory)System\IO\Strategies\Net5CompatFileStreamStrategy.CompletionSource.Windows.cs" />
<Compile Include="$(MSBuildThisFileDirectory)System\IO\Strategies\Net5CompatFileStreamStrategy.Windows.cs" />
<Compile Include="$(MSBuildThisFileDirectory)System\IO\Strategies\SyncWindowsFileStreamStrategy.cs" />
<Compile Include="$(MSBuildThisFileDirectory)System\IO\Strategies\WindowsFileStreamStrategy.cs" />
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,251 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using System.Buffers;
using System.Diagnostics;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks.Sources;
using TaskSourceCodes = System.IO.Strategies.FileStreamHelpers.TaskSourceCodes;

namespace System.IO.Strategies
{
internal sealed partial class AsyncWindowsFileStreamStrategy : WindowsFileStreamStrategy
{
/// <summary>
/// Type that helps reduce allocations for FileStream.ReadAsync and FileStream.WriteAsync.
/// </summary>
private unsafe class ValueTaskSource : IValueTaskSource<int>, IValueTaskSource
jozkee marked this conversation as resolved.
Show resolved Hide resolved
{
internal static readonly IOCompletionCallback s_ioCallback = IOCallback;

private readonly AsyncWindowsFileStreamStrategy _strategy;

private ManualResetValueTaskSourceCore<int> _source; // mutable struct; do not make this readonly
private NativeOverlapped* _overlapped;
private CancellationTokenRegistration _cancellationRegistration;
private long _result; // Using long since this needs to be used in Interlocked APIs
#if DEBUG
private bool _cancellationHasBeenRegistered;
#endif

public static ValueTaskSource Create(
AsyncWindowsFileStreamStrategy strategy,
PreAllocatedOverlapped? preallocatedOverlapped,
ReadOnlyMemory<byte> memory)
{
// If the memory passed in is the strategy's internal buffer, we can use the base AwaitableProvider,
// which has a PreAllocatedOverlapped with the memory already pinned. Otherwise, we use the derived
// MemoryAwaitableProvider, which Retains the memory, which will result in less pinning in the case
// where the underlying memory is backed by pre-pinned buffers.
return preallocatedOverlapped != null &&
MemoryMarshal.TryGetArray(memory, out ArraySegment<byte> buffer) &&
preallocatedOverlapped.IsUserObject(buffer.Array) ?
new ValueTaskSource(strategy, preallocatedOverlapped, buffer.Array) :
new MemoryValueTaskSource(strategy, memory);
}

protected ValueTaskSource(
AsyncWindowsFileStreamStrategy strategy,
PreAllocatedOverlapped? preallocatedOverlapped,
byte[]? bytes)
{
_strategy = strategy;
_result = TaskSourceCodes.NoResult;

_source = default;
_source.RunContinuationsAsynchronously = true;

_overlapped = bytes != null &&
_strategy.CompareExchangeCurrentOverlappedOwner(this, null) == null ?
_strategy._fileHandle.ThreadPoolBinding!.AllocateNativeOverlapped(preallocatedOverlapped!) : // allocated when buffer was created, and buffer is non-null
_strategy._fileHandle.ThreadPoolBinding!.AllocateNativeOverlapped(s_ioCallback, this, bytes);

Debug.Assert(_overlapped != null, "AllocateNativeOverlapped returned null");
}

internal NativeOverlapped* Overlapped => _overlapped;
public ValueTaskSourceStatus GetStatus(short token) => _source.GetStatus(token);
public void OnCompleted(Action<object?> continuation, object? state, short token, ValueTaskSourceOnCompletedFlags flags) => _source.OnCompleted(continuation, state, token, flags);
void IValueTaskSource.GetResult(short token) => _source.GetResult(token);
int IValueTaskSource<int>.GetResult(short token) => _source.GetResult(token);
internal short Version => _source.Version;

internal void RegisterForCancellation(CancellationToken cancellationToken)
{
#if DEBUG
Debug.Assert(cancellationToken.CanBeCanceled);
Debug.Assert(!_cancellationHasBeenRegistered, "Cannot register for cancellation twice");
_cancellationHasBeenRegistered = true;
#endif

// Quick check to make sure the IO hasn't completed
if (_overlapped != null)
{
// Register the cancellation only if the IO hasn't completed
long packedResult = Interlocked.CompareExchange(ref _result, TaskSourceCodes.RegisteringCancellation, TaskSourceCodes.NoResult);
if (packedResult == TaskSourceCodes.NoResult)
{
_cancellationRegistration = cancellationToken.UnsafeRegister((s, token) => Cancel(token), this);

// Switch the result, just in case IO completed while we were setting the registration
packedResult = Interlocked.Exchange(ref _result, TaskSourceCodes.NoResult);
}
else if (packedResult != TaskSourceCodes.CompletedCallback)
{
// Failed to set the result, IO is in the process of completing
// Attempt to take the packed result
packedResult = Interlocked.Exchange(ref _result, TaskSourceCodes.NoResult);
}

// If we have a callback that needs to be completed
if ((packedResult != TaskSourceCodes.NoResult) && (packedResult != TaskSourceCodes.CompletedCallback) && (packedResult != TaskSourceCodes.RegisteringCancellation))
{
CompleteCallback((ulong)packedResult);
}
}
}

internal virtual void ReleaseNativeResource()
{
// Ensure that cancellation has been completed and cleaned up.
_cancellationRegistration.Dispose();

// Free the overlapped.
// NOTE: The cancellation must *NOT* be running at this point, or it may observe freed memory
// (this is why we disposed the registration above).
if (_overlapped != null)
{
_strategy._fileHandle.ThreadPoolBinding!.FreeNativeOverlapped(_overlapped);
_overlapped = null;
}

// Ensure we're no longer set as the current AwaitableProvider (we may not have been to begin with).
// Only one operation at a time is eligible to use the preallocated overlapped
_strategy.CompareExchangeCurrentOverlappedOwner(null, this);
carlossanlop marked this conversation as resolved.
Show resolved Hide resolved
}

private static void IOCallback(uint errorCode, uint numBytes, NativeOverlapped* pOverlapped)
{
// Extract the AwaitableProvider from the overlapped. The state in the overlapped
// will either be a AsyncWindowsFileStreamStrategy (in the case where the preallocated overlapped was used),
// in which case the operation being completed is its _currentOverlappedOwner, or it'll
// be directly the AwaitableProvider that's completing (in the case where the preallocated
// overlapped was already in use by another operation).
object? state = ThreadPoolBoundHandle.GetNativeOverlappedState(pOverlapped);
Debug.Assert(state is AsyncWindowsFileStreamStrategy or ValueTaskSource);
ValueTaskSource valueTaskSource = state switch
{
AsyncWindowsFileStreamStrategy strategy => strategy._currentOverlappedOwner!, // must be owned
_ => (ValueTaskSource)state
};
Debug.Assert(valueTaskSource != null);
Debug.Assert(valueTaskSource._overlapped == pOverlapped, "Overlaps don't match");

// Handle reading from & writing to closed pipes. While I'm not sure
// this is entirely necessary anymore, maybe it's possible for
// an async read on a pipe to be issued and then the pipe is closed,
// returning this error. This may very well be necessary.
ulong packedResult;
if (errorCode != 0 && errorCode != Interop.Errors.ERROR_BROKEN_PIPE && errorCode != Interop.Errors.ERROR_NO_DATA)
{
packedResult = ((ulong)TaskSourceCodes.ResultError | errorCode);
}
else
{
packedResult = ((ulong)TaskSourceCodes.ResultSuccess | numBytes);
}

// Stow the result so that other threads can observe it
// And, if no other thread is registering cancellation, continue
if (Interlocked.Exchange(ref valueTaskSource._result, (long)packedResult) == TaskSourceCodes.NoResult)
{
// Successfully set the state, attempt to take back the callback
if (Interlocked.Exchange(ref valueTaskSource._result, TaskSourceCodes.CompletedCallback) != TaskSourceCodes.NoResult)
{
// Successfully got the callback, finish the callback
valueTaskSource.CompleteCallback(packedResult);
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know you're copying this logic from what was there before, but it seems to be unnecessarily complex, with lots of interlocked operations for transitioning from one state to another to another. If we're looking for places to reduce overheads, revisiting this whole implementation is likely a good place to start.

// else: Some other thread stole the result, so now it is responsible to finish the callback
}
// else: Some other thread is registering a cancellation, so it *must* finish the callback
}

private void CompleteCallback(ulong packedResult)
{
CancellationToken cancellationToken = _cancellationRegistration.Token;

ReleaseNativeResource();

// Unpack the result and send it to the user
long result = (long)(packedResult & TaskSourceCodes.ResultMask);
if (result == TaskSourceCodes.ResultError)
{
int errorCode = unchecked((int)(packedResult & uint.MaxValue));
Exception e;
if (errorCode == Interop.Errors.ERROR_OPERATION_ABORTED)
{
CancellationToken ct = cancellationToken.IsCancellationRequested ? cancellationToken : new CancellationToken(canceled: true);
e = new OperationCanceledException(ct);
}
else
{
e = Win32Marshal.GetExceptionForWin32Error(errorCode);
}
e.SetCurrentStackTrace();
_source.SetException(e);
}
else
{
Debug.Assert(result == TaskSourceCodes.ResultSuccess, "Unknown result");
_source.SetResult((int)(packedResult & uint.MaxValue));
}
}

private void Cancel(CancellationToken token)
{
// WARNING: This may potentially be called under a lock (during cancellation registration)
Debug.Assert(_overlapped != null && GetStatus(Version) != ValueTaskSourceStatus.Succeeded, "IO should not have completed yet");

// If the handle is still valid, attempt to cancel the IO
if (!_strategy._fileHandle.IsInvalid &&
!Interop.Kernel32.CancelIoEx(_strategy._fileHandle, _overlapped))
{
int errorCode = Marshal.GetLastWin32Error();

// ERROR_NOT_FOUND is returned if CancelIoEx cannot find the request to cancel.
// This probably means that the IO operation has completed.
if (errorCode != Interop.Errors.ERROR_NOT_FOUND)
{
Exception e = new OperationCanceledException(SR.OperationCanceled, Win32Marshal.GetExceptionForWin32Error(errorCode), token);
e.SetCurrentStackTrace();
_source.SetException(e);
}
}
}
}

/// <summary>
/// Extends <see cref="ValueTaskSource"/> with to support disposing of a
/// <see cref="MemoryHandle"/> when the operation has completed. This should only be used
/// when memory doesn't wrap a byte[].
/// </summary>
private sealed class MemoryValueTaskSource : ValueTaskSource
{
private MemoryHandle _handle; // mutable struct; do not make this readonly

// this type handles the pinning, so bytes are null
internal unsafe MemoryValueTaskSource(AsyncWindowsFileStreamStrategy strategy, ReadOnlyMemory<byte> memory)
: base(strategy, null, null) // this type handles the pinning, so null is passed for bytes to the base
{
_handle = memory.Pin();
}

internal override void ReleaseNativeResource()
{
_handle.Dispose();
base.ReleaseNativeResource();
}
}
}
}
Loading