Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use IValueTaskSource in PipeStream on Windows #52695

Merged
merged 2 commits into from
Aug 12, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions src/libraries/System.IO.Pipes/src/System.IO.Pipes.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -94,18 +94,16 @@
<Compile Include="Microsoft\Win32\SafeHandles\SafePipeHandle.Windows.cs" />
<Compile Include="System\IO\Pipes\AnonymousPipeServerStreamAcl.cs" />
<Compile Include="System\IO\Pipes\AnonymousPipeServerStream.Windows.cs" />
<Compile Include="System\IO\Pipes\ConnectionCompletionSource.cs" />
<Compile Include="System\IO\Pipes\NamedPipeServerStreamAcl.cs" />
<Compile Include="System\IO\Pipes\NamedPipeClientStream.Windows.cs" />
<Compile Include="System\IO\Pipes\NamedPipeServerStream.Windows.cs" />
<Compile Include="System\IO\Pipes\PipeAccessRights.cs" />
<Compile Include="System\IO\Pipes\PipeAccessRule.cs" />
<Compile Include="System\IO\Pipes\PipeAuditRule.cs" />
<Compile Include="System\IO\Pipes\PipeCompletionSource.cs" />
<Compile Include="System\IO\Pipes\PipesAclExtensions.cs" />
<Compile Include="System\IO\Pipes\PipeSecurity.cs" />
<Compile Include="System\IO\Pipes\PipeStream.ValueTaskSource.cs" />
<Compile Include="System\IO\Pipes\PipeStream.Windows.cs" />
<Compile Include="System\IO\Pipes\ReadWriteCompletionSource.cs" />
</ItemGroup>
<!-- Windows : Win32 only -->
<ItemGroup Condition="'$(TargetsWindows)' == 'true'">
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@
// The .NET Foundation licenses this file to you under the MIT license.

using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Runtime.CompilerServices;
using System.Runtime.ExceptionServices;
using System.Runtime.InteropServices;
using System.Security.AccessControl;
using System.Security.Principal;
Expand All @@ -18,6 +17,8 @@ namespace System.IO.Pipes
/// </summary>
public sealed partial class NamedPipeServerStream : PipeStream
{
private ConnectionValueTaskSource? _reusableConnectionValueTaskSource; // reusable ConnectionValueTaskSource that is currently NOT being used

internal NamedPipeServerStream(
string pipeName,
PipeDirection direction,
Expand All @@ -41,6 +42,31 @@ internal NamedPipeServerStream(
Create(pipeName, direction, maxNumberOfServerInstances, transmissionMode, options, inBufferSize, outBufferSize, pipeSecurity, inheritability, additionalAccessRights);
}

protected override void Dispose(bool disposing)
{
try
{
Interlocked.Exchange(ref _reusableConnectionValueTaskSource, null)?.Dispose();
}
finally
{
base.Dispose(disposing);
}
}

internal override void TryToReuse(PipeValueTaskSource source)
{
base.TryToReuse(source);

if (source is ConnectionValueTaskSource connectionSource)
{
if (Interlocked.CompareExchange(ref _reusableConnectionValueTaskSource, connectionSource, null) is not null)
{
source._preallocatedOverlapped.Dispose();
}
}
}

private void Create(string pipeName, PipeDirection direction, int maxNumberOfServerInstances,
PipeTransmissionMode transmissionMode, PipeOptions options, int inBufferSize, int outBufferSize,
HandleInheritability inheritability)
Expand Down Expand Up @@ -140,7 +166,8 @@ public void WaitForConnection()

if (IsAsync)
{
WaitForConnectionCoreAsync(CancellationToken.None).GetAwaiter().GetResult();
ValueTask vt = WaitForConnectionCoreAsync(CancellationToken.None);
vt.AsTask().GetAwaiter().GetResult();
}
else
{
Expand Down Expand Up @@ -180,7 +207,7 @@ public Task WaitForConnectionAsync(CancellationToken cancellationToken)
this, cancellationToken, TaskCreationOptions.DenyChildAttach, TaskScheduler.Default);
}

return WaitForConnectionCoreAsync(cancellationToken);
return WaitForConnectionCoreAsync(cancellationToken).AsTask();
}

public void Disconnect()
Expand Down Expand Up @@ -293,50 +320,52 @@ internal ExecuteHelper(PipeStreamImpersonationWorker userCode, SafePipeHandle? h
}

// Async version of WaitForConnection. See the comments above for more info.
private unsafe Task WaitForConnectionCoreAsync(CancellationToken cancellationToken)
private unsafe ValueTask WaitForConnectionCoreAsync(CancellationToken cancellationToken)
{
CheckConnectOperationsServerWithHandle();
Debug.Assert(IsAsync);

if (!IsAsync)
{
throw new InvalidOperationException(SR.InvalidOperation_PipeNotAsync);
}

var completionSource = new ConnectionCompletionSource(this);

if (!Interop.Kernel32.ConnectNamedPipe(InternalHandle!, completionSource.Overlapped))
ConnectionValueTaskSource? vts = Interlocked.Exchange(ref _reusableConnectionValueTaskSource, null) ?? new ConnectionValueTaskSource(this);
try
{
int errorCode = Marshal.GetLastPInvokeError();

switch (errorCode)
vts.PrepareForOperation();
if (!Interop.Kernel32.ConnectNamedPipe(InternalHandle!, vts._overlapped))
{
case Interop.Errors.ERROR_IO_PENDING:
break;

// If we are here then the pipe is already connected, or there was an error
// so we should unpin and free the overlapped.
case Interop.Errors.ERROR_PIPE_CONNECTED:
// IOCompletitionCallback will not be called because we completed synchronously.
completionSource.ReleaseResources();
if (State == PipeState.Connected)
{
throw new InvalidOperationException(SR.InvalidOperation_PipeAlreadyConnected);
}
completionSource.SetCompletedSynchronously();

// We return a cached task instead of TaskCompletionSource's Task allowing the GC to collect it.
return Task.CompletedTask;

default:
completionSource.ReleaseResources();
throw Win32Marshal.GetExceptionForWin32Error(errorCode);
int errorCode = Marshal.GetLastPInvokeError();
switch (errorCode)
{
case Interop.Errors.ERROR_IO_PENDING:
// Common case: IO was initiated, completion will be handled by callback.
// Register for cancellation now that the operation has been initiated.
vts.RegisterForCancellation(cancellationToken);
break;

case Interop.Errors.ERROR_PIPE_CONNECTED:
// If we are here then the pipe is already connected.
// IOCompletitionCallback will not be called because we completed synchronously.
vts.Dispose();
if (State == PipeState.Connected)
{
return ValueTask.FromException(ExceptionDispatchInfo.SetCurrentStackTrace(new InvalidOperationException(SR.InvalidOperation_PipeAlreadyConnected)));
}
State = PipeState.Connected;
return ValueTask.CompletedTask;

default:
vts.Dispose();
return ValueTask.FromException(ExceptionDispatchInfo.SetCurrentStackTrace(Win32Marshal.GetExceptionForWin32Error(errorCode)));
}
}
}
catch
{
vts.Dispose();
throw;
}

// If we are here then connection is pending.
completionSource.RegisterForCancellation(cancellationToken);

return completionSource.Task;
// Completion handled by callback.
vts.FinishedScheduling();
return new ValueTask(vts, vts.Version);
}

private void CheckConnectOperationsServerWithHandle()
Expand Down
Loading