-
Notifications
You must be signed in to change notification settings - Fork 28
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Deadlock when disposing high-frequency serial stream #1437
Comments
One very performant but incomplete solution to the issue is to detect contention on the lock between the Unfortunately we were not able to find a way to generalize the contention detection to full deadlock detection. Contention is necessary, but not sufficient, for deadlock. If there is no deadlock we face the danger of never closing the port, or closing it only on the next callback, which may arrive much later. Basically, if there is contention without a deadlock, the best approach would be to simply call There are important limitations with this approach:
Code below: public static IObservable<string> ReadLine(string portName, string newLine)
{
return Observable.Create<string>(observer =>
{
var data = string.Empty;
Action dispose = default;
var connection = SerialPortManager.ReserveConnection(portName);
SerialDataReceivedEventHandler dataReceivedHandler;
var serialPort = connection.SerialPort;
var baseStream = connection.SerialPort.BaseStream;
dataReceivedHandler = (sender, e) =>
{
switch (e.EventType)
{
case SerialData.Eof: observer.OnCompleted(); break;
case SerialData.Chars:
default:
if (serialPort.IsOpen && serialPort.BytesToRead > 0)
{
data += serialPort.ReadExisting();
var lines = data.Split(new[] { newLine }, StringSplitOptions.None);
for (int i = 0; i < lines.Length; i++)
{
if (i == lines.Length - 1) data = lines[i];
else observer.OnNext(lines[i]);
if (dispose != null)
{
// If we reach this branch, we were in deadlock
// and the dispose thread delegated to us the
// responsibility of disposing the serial port.
dispose();
break;
}
}
}
break;
}
};
connection.SerialPort.DataReceived += dataReceivedHandler;
return Disposable.Create(() =>
{
connection.SerialPort.DataReceived -= dataReceivedHandler;
// The SerialPort class holds a lock on base stream to
// ensure synchronization between calls to Dispose and
// calls to DataReceived handler.
if (Monitor.TryEnter(baseStream))
{
try { connection.Dispose(); }
finally { Monitor.Exit(baseStream); }
}
else
{
// If we reach this branch, we are in deadlock iff the
// data received thread is paused at the OnNext call.
// Note we should not need a memory barrier here since both
// threads are already inside a lock (observer lock).
Console.WriteLine("deadlock!");
dispose = connection.Dispose;
}
});
});
} |
An improved version of the above logic can be designed that will ensure disposal of the shared resource. Basically the missing piece is to still delegate disposal to the data callback, but spin a new task that double-checks that the port is disposed. If it is not, the new task will try to take the lock again, and so forth until it either takes the lock and disposes it itself, or the blocked data thread does it. This ensures that the port dispose method is called at least once. Expand for detailed solutionpublic static IObservable<string> ReadLine(string portName, string newLine)
{
return Observable.Create<string>(observer =>
{
var data = string.Empty;
Action disposeAction = default;
var connection = SerialPortManager.ReserveConnection(portName);
SerialDataReceivedEventHandler dataReceivedHandler;
var serialPort = connection.SerialPort;
var baseStream = connection.SerialPort.BaseStream;
dataReceivedHandler = (sender, e) =>
{
try
{
switch (e.EventType)
{
case SerialData.Eof: observer.OnCompleted(); break;
case SerialData.Chars:
default:
if (serialPort.IsOpen && serialPort.BytesToRead > 0)
{
data += serialPort.ReadExisting();
var lines = data.Split(new[] { newLine }, StringSplitOptions.None);
for (int i = 0; i < lines.Length; i++)
{
if (i == lines.Length - 1) data = lines[i];
else observer.OnNext(lines[i]);
}
}
break;
}
}
finally
{
// Access to the shared variable can be reordered so
// we need a call to volatile read here
var dispose = Volatile.Read(ref disposeAction);
if (dispose != null)
{
// If we reach this branch, we might be in deadlock
// so we share the responsibility of disposing the
// serial port.
dispose();
disposeAction = null;
}
}
};
connection.SerialPort.DataReceived += dataReceivedHandler;
return Disposable.Create(() =>
{
connection.SerialPort.DataReceived -= dataReceivedHandler;
// Arm the dispose call. We should not need a memory barrier here
// since both threads are already sharing a lock.
disposeAction = connection.Dispose;
void TryDispose()
{
// We do an async spin lock until someone can dispose the serial port.
// Since the dispose call is idempotent it is enough to guarantee
// at-least-once semantics
var dispose = Volatile.Read(ref disposeAction);
if (dispose == null) return;
// The SerialPort class holds a lock on base stream to
// ensure synchronization between calls to Dispose and
// calls to DataReceived handler
if (Monitor.TryEnter(baseStream))
{
// If we enter the critical section we can go ahead and
// dispose the serial port
try
{
dispose();
disposeAction = null;
}
finally { Monitor.Exit(baseStream); }
}
else
{
// If we reach this branch we may be in deadlock so we
// need to release this thread
DefaultScheduler.Instance.Schedule(TryDispose);
}
}
// Run the spin lock
TryDispose();
});
});
} However, it's not clear there is any benefit to all the code complexity given that we still need to spin another task anyway and there is no guarantee the serial port will be disposed synchronous with cancellation. It might be easier to just call dispose on the serial port asynchronously anyway: DefaultScheduler.Instance.Schedule(connection.Dispose); |
This is a reproduction of the issue first reported in bonsai-rx/harp#62. We have now confirmed the same problem is also present in the system serial port observable stream, and indeed it is caused by a deadlock on accessing the inner
serialStream
object simultaneously with cancelling an observable either on completion or on error.The issue likely stems from the fact that the
SerialPort
class has a lock in place to prevent concurrentDispose
calls andDataReceived
notifications. The observable wrappers also have their own locks to prevent concurrent notification and cancellation of subscriptions. Likely the issue stems from each thread holding their own lock.Several possibilities might be considered for resolving this:
Dispose
call in a separate task / thread (would work but would change the currently synchronous semantics of subscription cancellation)The text was updated successfully, but these errors were encountered: