Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix SerialPort memory leak when requests time out and no data is received #68014

Merged
merged 4 commits into from
Aug 10, 2022
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 100 additions & 17 deletions src/libraries/System.IO.Ports/src/System/IO/Ports/SerialStream.Unix.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

using System;
using System.Collections;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Runtime.InteropServices;
using System.IO.Ports;
Expand Down Expand Up @@ -31,8 +31,9 @@ internal sealed partial class SerialStream : Stream
private byte[] _tempBuf = new byte[1];
private Task _ioLoop;
private object _ioLoopLock = new object();
private ConcurrentQueue<SerialStreamIORequest> _readQueue = new ConcurrentQueue<SerialStreamIORequest>();
private ConcurrentQueue<SerialStreamIORequest> _writeQueue = new ConcurrentQueue<SerialStreamIORequest>();
private SynchronizedQueue<SerialStreamIORequest> _readQueue = new();
private SynchronizedQueue<SerialStreamIORequest> _writeQueue = new();
private bool _hasCancelledTasksToProcess;

private long _totalBytesRead;
private long TotalBytesAvailable => _totalBytesRead + BytesToRead;
Expand Down Expand Up @@ -353,6 +354,12 @@ internal byte ParityReplace
}
#pragma warning restore CA1822

private bool HasCancelledTasksToProcess
{
get => Volatile.Read(ref _hasCancelledTasksToProcess);
set => Volatile.Write(ref _hasCancelledTasksToProcess, value);
}

internal void DiscardInBuffer()
{
if (_handle == null) InternalResources.FileNotOpen();
Expand Down Expand Up @@ -433,7 +440,7 @@ public override Task<int> ReadAsync(byte[] array, int offset, int count, Cancell
return Task<int>.FromResult(0); // return immediately if no bytes requested; no need for overhead.

Memory<byte> buffer = new Memory<byte>(array, offset, count);
SerialStreamReadRequest result = new SerialStreamReadRequest(cancellationToken, buffer);
SerialStreamReadRequest result = new SerialStreamReadRequest(this, cancellationToken, buffer);
_readQueue.Enqueue(result);

EnsureIOLoopRunning();
Expand All @@ -449,7 +456,7 @@ public override ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken
if (buffer.IsEmpty)
return new ValueTask<int>(0);

SerialStreamReadRequest result = new SerialStreamReadRequest(cancellationToken, buffer);
SerialStreamReadRequest result = new SerialStreamReadRequest(this, cancellationToken, buffer);
_readQueue.Enqueue(result);

EnsureIOLoopRunning();
Expand All @@ -466,7 +473,7 @@ public override Task WriteAsync(byte[] array, int offset, int count, Cancellatio
return Task.CompletedTask; // return immediately if no bytes to write; no need for overhead.

ReadOnlyMemory<byte> buffer = new ReadOnlyMemory<byte>(array, offset, count);
SerialStreamWriteRequest result = new SerialStreamWriteRequest(cancellationToken, buffer);
SerialStreamWriteRequest result = new SerialStreamWriteRequest(this, cancellationToken, buffer);
_writeQueue.Enqueue(result);

EnsureIOLoopRunning();
Expand All @@ -482,7 +489,7 @@ public override ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationTo
if (buffer.IsEmpty)
return ValueTask.CompletedTask; // return immediately if no bytes to write; no need for overhead.

SerialStreamWriteRequest result = new SerialStreamWriteRequest(cancellationToken, buffer);
SerialStreamWriteRequest result = new SerialStreamWriteRequest(this, cancellationToken, buffer);
_writeQueue.Enqueue(result);

EnsureIOLoopRunning();
Expand Down Expand Up @@ -814,7 +821,7 @@ private unsafe int ProcessWrite(SerialStreamIORequest r)
}

// returns number of bytes read/written
private static int DoIORequest(ConcurrentQueue<SerialStreamIORequest> q, RequestProcessor op)
private static int DoIORequest(SynchronizedQueue<SerialStreamIORequest> q, RequestProcessor op)
{
// assumes dequeue-ing happens on a single thread
while (q.TryPeek(out SerialStreamIORequest r))
Expand Down Expand Up @@ -854,6 +861,13 @@ private void IOLoop()

while (IsOpen && !eofReceived && !_ioLoopFinished)
{
if (HasCancelledTasksToProcess)
{
HasCancelledTasksToProcess = false;
RemoveCompletedTasks(_readQueue);
RemoveCompletedTasks(_writeQueue);
}

bool hasPendingReads = !_readQueue.IsEmpty;
bool hasPendingWrites = !_writeQueue.IsEmpty;

Expand Down Expand Up @@ -964,6 +978,13 @@ private void IOLoop()
}
}

private static void RemoveCompletedTasks(SynchronizedQueue<SerialStreamIORequest> queue)
{
// assumes dequeue-ing happens on a single thread
while (queue.TryPeek(out var r) && r.IsCompleted)
queue.TryDequeue(out _);
}

private static SerialPinChange SignalsToPinChanges(Signals signals)
{
SerialPinChange pinChanges = default;
Expand Down Expand Up @@ -998,14 +1019,19 @@ private static Exception GetLastIOError()
private abstract class SerialStreamIORequest : TaskCompletionSource<int>
{
public bool IsCompleted => Task.IsCompleted;
private CancellationToken _cancellationToken;
private readonly SerialStream _parent;
private readonly CancellationTokenRegistration _cancellationTokenRegistration;

protected SerialStreamIORequest(CancellationToken ct)
protected SerialStreamIORequest(SerialStream parent, CancellationToken ct)
: base(TaskCreationOptions.RunContinuationsAsynchronously)
{
_cancellationToken = ct;
_cancellationTokenRegistration = ct.Register(s => ((TaskCompletionSource<int>)s).TrySetCanceled(), this);
_parent = parent;
_cancellationTokenRegistration = ct.Register(s =>
{
var request = (SerialStreamIORequest)s;
request.TrySetCanceled();
request._parent.HasCancelledTasksToProcess = true;
}, this);
}

internal void Complete(int numBytes)
Expand All @@ -1023,10 +1049,10 @@ internal void Complete(Exception exception)

private sealed class SerialStreamReadRequest : SerialStreamIORequest
{
public Memory<byte> Buffer { get; private set; }
public Memory<byte> Buffer { get; }

public SerialStreamReadRequest(CancellationToken ct, Memory<byte> buffer)
: base(ct)
public SerialStreamReadRequest(SerialStream parent, CancellationToken ct, Memory<byte> buffer)
: base(parent, ct)
{
Buffer = buffer;
}
Expand All @@ -1036,8 +1062,8 @@ private sealed class SerialStreamWriteRequest : SerialStreamIORequest
{
public ReadOnlyMemory<byte> Buffer { get; private set; }

public SerialStreamWriteRequest(CancellationToken ct, ReadOnlyMemory<byte> buffer)
: base(ct)
public SerialStreamWriteRequest(SerialStream parent, CancellationToken ct, ReadOnlyMemory<byte> buffer)
: base(parent, ct)
{
Buffer = buffer;
}
Expand All @@ -1053,5 +1079,62 @@ internal void ProcessBytes(int numBytes)
Buffer = Buffer.Slice(numBytes);
}
}

// Use a custom queue instead of ConcurrentQueue because ConcurrentQueue preserves segments for
// observation when using TryPeek(). These segments will not clear out references after a dequeue
// and as a result they hold on to SerialStreamIORequest instances so that they cannot be GC'ed.
// This in turn means that any buffers that the client supplied are not eligible for GC either.
bklop marked this conversation as resolved.
Show resolved Hide resolved
private sealed class SynchronizedQueue<T>
{
private readonly Queue<T> _queue = new();
private readonly object _lock = new();

public bool IsEmpty
{
get
{
lock (_lock)
{
return _queue.Count == 0;
}
}
}

public void Enqueue(T item)
{
lock (_lock)
{
_queue.Enqueue(item);
}
}

public bool TryPeek(out T result)
{
lock (_lock)
{
if (_queue.Count > 0)
{
result = _queue.Peek();
return true;
}
}
result = default;
return false;
}

public bool TryDequeue(out T result)
{
lock (_lock)
{
if (_queue.Count > 0)
{
result = _queue.Dequeue();
return true;
}
}
result = default;
return false;
}
}
}
}