Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix SerialPort memory leak when requests time out and no data is received #68014

Merged
merged 4 commits into from
Aug 10, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
170 changes: 126 additions & 44 deletions src/libraries/System.IO.Ports/src/System/IO/Ports/SerialStream.Unix.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

using System;
using System.Collections;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Runtime.InteropServices;
using System.IO.Ports;
Expand Down Expand Up @@ -31,8 +31,15 @@ internal sealed partial class SerialStream : Stream
private byte[] _tempBuf = new byte[1];
private Task _ioLoop;
private object _ioLoopLock = new object();
private ConcurrentQueue<SerialStreamIORequest> _readQueue = new ConcurrentQueue<SerialStreamIORequest>();
private ConcurrentQueue<SerialStreamIORequest> _writeQueue = new ConcurrentQueue<SerialStreamIORequest>();
private bool _hasCancelledTasksToProcess;
// Use a Queue with locking instead of ConcurrentQueue because ConcurrentQueue preserves segments for
// observation when using TryPeek(). These segments will not clear out references after a dequeue
// and as a result they hold on to SerialStreamIORequest instances so that they cannot be GC'ed.
// This in turn means that any buffers that the client supplied are not eligible for GC either.
private readonly Queue<SerialStreamIORequest> _readQueue = new();
private readonly object _readQueueLock = new();
private readonly Queue<SerialStreamIORequest> _writeQueue = new();
private readonly object _writeQueueLock = new();

private long _totalBytesRead;
private long TotalBytesAvailable => _totalBytesRead + BytesToRead;
Expand Down Expand Up @@ -353,6 +360,12 @@ internal byte ParityReplace
}
#pragma warning restore CA1822

private bool HasCancelledTasksToProcess
{
get => Volatile.Read(ref _hasCancelledTasksToProcess);
set => Volatile.Write(ref _hasCancelledTasksToProcess, value);
}

internal void DiscardInBuffer()
{
if (_handle == null) InternalResources.FileNotOpen();
Expand Down Expand Up @@ -386,7 +399,7 @@ public override void Flush()
if (_handle == null) InternalResources.FileNotOpen();

SpinWait sw = default;
while (!_writeQueue.IsEmpty)
while (!IsWriteQueueEmpty())
{
sw.SpinOnce();
}
Expand Down Expand Up @@ -433,8 +446,11 @@ public override Task<int> ReadAsync(byte[] array, int offset, int count, Cancell
return Task<int>.FromResult(0); // return immediately if no bytes requested; no need for overhead.

Memory<byte> buffer = new Memory<byte>(array, offset, count);
SerialStreamReadRequest result = new SerialStreamReadRequest(cancellationToken, buffer);
_readQueue.Enqueue(result);
SerialStreamReadRequest result = new SerialStreamReadRequest(this, cancellationToken, buffer);
lock (_readQueueLock)
{
_readQueue.Enqueue(result);
}

EnsureIOLoopRunning();

Expand All @@ -449,8 +465,11 @@ public override ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken
if (buffer.IsEmpty)
return new ValueTask<int>(0);

SerialStreamReadRequest result = new SerialStreamReadRequest(cancellationToken, buffer);
_readQueue.Enqueue(result);
SerialStreamReadRequest result = new SerialStreamReadRequest(this, cancellationToken, buffer);
lock (_readQueueLock)
{
_readQueue.Enqueue(result);
}

EnsureIOLoopRunning();

Expand All @@ -466,8 +485,11 @@ public override Task WriteAsync(byte[] array, int offset, int count, Cancellatio
return Task.CompletedTask; // return immediately if no bytes to write; no need for overhead.

ReadOnlyMemory<byte> buffer = new ReadOnlyMemory<byte>(array, offset, count);
SerialStreamWriteRequest result = new SerialStreamWriteRequest(cancellationToken, buffer);
_writeQueue.Enqueue(result);
SerialStreamWriteRequest result = new SerialStreamWriteRequest(this, cancellationToken, buffer);
lock (_writeQueueLock)
{
_writeQueue.Enqueue(result);
}

EnsureIOLoopRunning();

Expand All @@ -482,8 +504,11 @@ public override ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationTo
if (buffer.IsEmpty)
return ValueTask.CompletedTask; // return immediately if no bytes to write; no need for overhead.

SerialStreamWriteRequest result = new SerialStreamWriteRequest(cancellationToken, buffer);
_writeQueue.Enqueue(result);
SerialStreamWriteRequest result = new SerialStreamWriteRequest(this, cancellationToken, buffer);
lock (_writeQueueLock)
{
_writeQueue.Enqueue(result);
}

EnsureIOLoopRunning();

Expand Down Expand Up @@ -664,18 +689,24 @@ private void EnsureIOLoopRunning()

private void FinishPendingIORequests(Interop.ErrorInfo? error = null)
{
while (_readQueue.TryDequeue(out SerialStreamIORequest r))
lock (_readQueueLock)
{
r.Complete(error.HasValue ?
Interop.GetIOException(error.Value) :
InternalResources.FileNotOpenException());
while (_readQueue.TryDequeue(out SerialStreamIORequest r))
{
r.Complete(error.HasValue ?
Interop.GetIOException(error.Value) :
InternalResources.FileNotOpenException());
}
}

while (_writeQueue.TryDequeue(out SerialStreamIORequest r))
lock (_writeQueueLock)
{
r.Complete(error.HasValue ?
Interop.GetIOException(error.Value) :
InternalResources.FileNotOpenException());
while (_writeQueue.TryDequeue(out SerialStreamIORequest r))
{
r.Complete(error.HasValue ?
Interop.GetIOException(error.Value) :
InternalResources.FileNotOpenException());
}
}
}

Expand Down Expand Up @@ -812,30 +843,43 @@ private unsafe int ProcessWrite(SerialStreamIORequest r)
}

// returns number of bytes read/written
private static int DoIORequest(ConcurrentQueue<SerialStreamIORequest> q, RequestProcessor op)
private static int DoIORequest(Queue<SerialStreamIORequest> q, object queueLock, RequestProcessor op)
{
// assumes dequeue-ing happens on a single thread
while (q.TryPeek(out SerialStreamIORequest r))
while (TryPeekNextRequest(out SerialStreamIORequest r))
{
if (r.IsCompleted)
{
q.TryDequeue(out _);
// take another item since we haven't processed anything
continue;
}

int ret = op(r);
Debug.Assert(ret >= 0);

if (r.IsCompleted)
{
q.TryDequeue(out _);
lock (queueLock)
{
q.TryDequeue(out _);
}
}

return ret;
}

return 0;

bool TryPeekNextRequest(out SerialStreamIORequest r)
{
lock (queueLock)
{
while (q.TryPeek(out r))
{
if (!r.IsCompleted)
{
return true;
}
q.TryDequeue(out _);
}
}
r = default;
return false;
}
}

private void IOLoop()
Expand All @@ -852,8 +896,15 @@ private void IOLoop()

while (IsOpen && !eofReceived && !_ioLoopFinished)
{
bool hasPendingReads = !_readQueue.IsEmpty;
bool hasPendingWrites = !_writeQueue.IsEmpty;
if (HasCancelledTasksToProcess)
{
HasCancelledTasksToProcess = false;
RemoveCompletedTasks(_readQueue, _readQueueLock);
RemoveCompletedTasks(_writeQueue, _writeQueueLock);
}

bool hasPendingReads = !IsReadQueueEmpty();
bool hasPendingWrites = !IsWriteQueueEmpty();

bool hasPendingIO = hasPendingReads || hasPendingWrites;
bool isIdle = IsNoEventRegistered() && !hasPendingIO;
Expand All @@ -875,7 +926,7 @@ private void IOLoop()
lock (_ioLoopLock)
{
// double check we are done under lock
if (IsNoEventRegistered() && _readQueue.IsEmpty && _writeQueue.IsEmpty)
if (IsNoEventRegistered() && IsReadQueueEmpty() && IsWriteQueueEmpty())
{
_ioLoop = null;
break;
Expand Down Expand Up @@ -915,13 +966,13 @@ private void IOLoop()

if (events.HasFlag(Interop.PollEvents.POLLIN))
{
int bytesRead = DoIORequest(_readQueue, _processReadDelegate);
int bytesRead = DoIORequest(_readQueue, _readQueueLock, _processReadDelegate);
_totalBytesRead += bytesRead;
}

if (events.HasFlag(Interop.PollEvents.POLLOUT))
{
DoIORequest(_writeQueue, _processWriteDelegate);
DoIORequest(_writeQueue, _writeQueueLock, _processWriteDelegate);
}
}

Expand Down Expand Up @@ -961,6 +1012,32 @@ private void IOLoop()
}
}

private static void RemoveCompletedTasks(Queue<SerialStreamIORequest> queue, object queueLock)
krwq marked this conversation as resolved.
Show resolved Hide resolved
{
// assumes dequeue-ing happens on a single thread
lock (queueLock)
{
while (queue.TryPeek(out var r) && r.IsCompleted)
queue.TryDequeue(out _);
}
}

private bool IsReadQueueEmpty()
{
lock (_readQueueLock)
{
return _readQueue.Count == 0;
}
}

private bool IsWriteQueueEmpty()
{
lock (_writeQueueLock)
{
return _writeQueue.Count == 0;
}
}

private void NotifyPinChanges(Signals signals)
{
if (signals.HasFlag(Signals.SignalCts))
Expand Down Expand Up @@ -999,14 +1076,19 @@ private static Exception GetLastIOError()
private abstract class SerialStreamIORequest : TaskCompletionSource<int>
{
public bool IsCompleted => Task.IsCompleted;
private CancellationToken _cancellationToken;
private readonly SerialStream _parent;
private readonly CancellationTokenRegistration _cancellationTokenRegistration;

protected SerialStreamIORequest(CancellationToken ct)
protected SerialStreamIORequest(SerialStream parent, CancellationToken ct)
: base(TaskCreationOptions.RunContinuationsAsynchronously)
{
_cancellationToken = ct;
_cancellationTokenRegistration = ct.Register(s => ((TaskCompletionSource<int>)s).TrySetCanceled(), this);
_parent = parent;
_cancellationTokenRegistration = ct.Register(s =>
{
var request = (SerialStreamIORequest)s;
request.TrySetCanceled();
request._parent.HasCancelledTasksToProcess = true;
}, this);
}

internal void Complete(int numBytes)
Expand All @@ -1024,10 +1106,10 @@ internal void Complete(Exception exception)

private sealed class SerialStreamReadRequest : SerialStreamIORequest
{
public Memory<byte> Buffer { get; private set; }
public Memory<byte> Buffer { get; }

public SerialStreamReadRequest(CancellationToken ct, Memory<byte> buffer)
: base(ct)
public SerialStreamReadRequest(SerialStream parent, CancellationToken ct, Memory<byte> buffer)
: base(parent, ct)
{
Buffer = buffer;
}
Expand All @@ -1037,8 +1119,8 @@ private sealed class SerialStreamWriteRequest : SerialStreamIORequest
{
public ReadOnlyMemory<byte> Buffer { get; private set; }

public SerialStreamWriteRequest(CancellationToken ct, ReadOnlyMemory<byte> buffer)
: base(ct)
public SerialStreamWriteRequest(SerialStream parent, CancellationToken ct, ReadOnlyMemory<byte> buffer)
: base(parent, ct)
{
Buffer = buffer;
}
Expand Down