Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor serial transport to avoid event callbacks #66

Merged
merged 2 commits into from
Jul 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions Bonsai.Harp/AsyncDevice.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@ public AsyncDevice(string portName)
{
response = new Subject<HarpMessage>();
transport = new SerialTransport(portName, response);
transport.IgnoreErrors = true;
transport.Open();
}

internal AsyncDevice(string portName, bool leaveOpen)
Expand Down
4 changes: 2 additions & 2 deletions Bonsai.Harp/BufferedStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -70,14 +70,14 @@ public int PushBytes(int count)
if (writeOffset >= readOffset)
{
bytesWritten = Math.Min(readBuffer.Length - writeOffset, count);
serialStream.Read(readBuffer, writeOffset, bytesWritten);
bytesWritten = serialStream.Read(readBuffer, writeOffset, bytesWritten);
writeOffset = (writeOffset + bytesWritten) % readBuffer.Length;
count -= bytesWritten;
if (count == 0) return bytesWritten;
}

var remaining = Math.Min(readOffset - writeOffset, count);
serialStream.Read(readBuffer, writeOffset, remaining);
remaining = serialStream.Read(readBuffer, writeOffset, remaining);
writeOffset = (writeOffset + remaining) % readBuffer.Length;
return bytesWritten + remaining;
}
Expand Down
17 changes: 5 additions & 12 deletions Bonsai.Harp/Device.cs
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,6 @@ static IObservable<string> GetDeviceName(string portName, LedState ledState, Led
observer.OnCompleted);
transport = new SerialTransport(portName, messageObserver);
transport.IgnoreErrors = true;
transport.Open();

transport.Write(writeOpCtrl);
return transport;
Expand All @@ -266,7 +265,7 @@ public override IObservable<HarpMessage> Generate()
return Observable.Create<HarpMessage>(async (observer, cancellationToken) =>
{
var transport = await CreateTransportAsync(observer, cancellationToken);
var cleanup = Disposable.Create(() =>
return Disposable.Create(() =>
{
var writeOpCtrl = OperationControl.FromPayload(MessageType.Write, new OperationControlPayload(
OperationMode.Standby,
Expand All @@ -276,11 +275,8 @@ public override IObservable<HarpMessage> Generate()
OperationLed,
Heartbeat));
transport.Write(writeOpCtrl);
transport.Close();
});

return new CompositeDisposable(
cleanup,
transport);
});
}

Expand All @@ -301,7 +297,7 @@ public IObservable<HarpMessage> Generate(IObservable<HarpMessage> source)
observer.OnError,
observer.OnCompleted);

var cleanup = Disposable.Create(() =>
return Disposable.Create(() =>
{
var writeOpCtrl = OperationControl.FromPayload(MessageType.Write, new OperationControlPayload(
OperationMode.Standby,
Expand All @@ -311,12 +307,9 @@ public IObservable<HarpMessage> Generate(IObservable<HarpMessage> source)
OperationLed,
Heartbeat));
transport.Write(writeOpCtrl);
sourceDisposable.Dispose();
transport.Close();
});

return new CompositeDisposable(
cleanup,
sourceDisposable,
transport);
});
}

Expand Down
125 changes: 54 additions & 71 deletions Bonsai.Harp/FileDevice.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,75 +15,6 @@ namespace Bonsai.Harp
[Description("Produces a sequence of Harp messages from a previously recorded data file.")]
public class FileDevice : Source<HarpMessage>
{
readonly IObservable<HarpMessage> source;
readonly object captureLock = new object();
const int ReadBufferSize = 4096;

/// <summary>
/// Initializes a new instance of the <see cref="FileDevice"/> class.
/// </summary>
public FileDevice()
{
PlaybackRate = 1;
source = Observable.Create<HarpMessage>((observer, cancellationToken) =>
{
return Task.Factory.StartNew(() =>
{
lock (captureLock)
{
using (var stream = new FileStream(FileName, FileMode.Open))
using (var waitSignal = new ManualResetEvent(false))
{
double timestampOffset = 0;
var stopwatch = new Stopwatch();

var harpObserver = Observer.Create<HarpMessage>(
value =>
{
var playbackRate = PlaybackRate;
if (playbackRate.HasValue && value.TryGetTimestamp(out double timestamp))
{
timestamp *= 1000.0 / playbackRate.Value; //ms
if (!stopwatch.IsRunning ||
value.MessageType == MessageType.Write &&
value.Address == TimestampSeconds.Address &&
value.PayloadType == (PayloadType.Timestamp | TimestampSeconds.RegisterType))
{
stopwatch.Restart();
timestampOffset = timestamp;
}

var waitInterval = timestamp - timestampOffset - stopwatch.ElapsedMilliseconds;
if (waitInterval > 0)
{
waitSignal.WaitOne((int)waitInterval);
}
}

observer.OnNext(value);
},
observer.OnError,
observer.OnCompleted);
var transport = new StreamTransport(harpObserver);
transport.IgnoreErrors = IgnoreErrors;

long bytesToRead;
while (!cancellationToken.IsCancellationRequested &&
(bytesToRead = Math.Min(ReadBufferSize, stream.Length - stream.Position)) > 0)
{
transport.ReceiveData(stream, ReadBufferSize, (int)bytesToRead);
}
}
}
},
cancellationToken,
TaskCreationOptions.LongRunning,
TaskScheduler.Default);
})
.PublishReconnectable()
.RefCount();
}

/// <summary>
/// Gets or sets the path to the binary file containing Harp messages to playback.
/// </summary>
Expand All @@ -102,7 +33,7 @@ public FileDevice()
/// no rate is specified, playback will be done as fast as possible.
/// </summary>
[Description("The optional rate multiplier to either slowdown or speedup the playback. If no rate is specified, playback will be done as fast as possible.")]
public double? PlaybackRate { get; set; }
public double? PlaybackRate { get; set; } = 1;

/// <summary>
/// Opens the specified file name and returns the observable sequence of Harp messages
Expand All @@ -111,7 +42,59 @@ public FileDevice()
/// <returns>The observable sequence of Harp messages stored in the binary file.</returns>
public override IObservable<HarpMessage> Generate()
{
return source;
const int ReadBufferSize = 4096;
var fileName = FileName;
var ignoreErrors = IgnoreErrors;
return Observable.Create<HarpMessage>((observer, cancellationToken) =>
{
return Task.Factory.StartNew(() =>
{
using var stream = new FileStream(fileName, FileMode.Open);
using var waitSignal = new ManualResetEvent(false);
double timestampOffset = 0;
var stopwatch = new Stopwatch();

var harpObserver = Observer.Create<HarpMessage>(
value =>
{
var playbackRate = PlaybackRate;
if (playbackRate.HasValue && value.TryGetTimestamp(out double timestamp))
{
timestamp *= 1000.0 / playbackRate.Value; //ms
if (!stopwatch.IsRunning ||
value.MessageType == MessageType.Write &&
value.Address == TimestampSeconds.Address &&
value.PayloadType == (PayloadType.Timestamp | TimestampSeconds.RegisterType))
{
stopwatch.Restart();
timestampOffset = timestamp;
}

var waitInterval = timestamp - timestampOffset - stopwatch.ElapsedMilliseconds;
if (waitInterval > 0)
{
waitSignal.WaitOne((int)waitInterval);
}
}

observer.OnNext(value);
},
observer.OnError,
observer.OnCompleted);
var transport = new StreamTransport(harpObserver);
transport.IgnoreErrors = ignoreErrors;

long bytesToRead;
while (!cancellationToken.IsCancellationRequested &&
(bytesToRead = Math.Min(ReadBufferSize, stream.Length - stream.Position)) > 0)
{
transport.ReceiveData(stream, ReadBufferSize, (int)bytesToRead);
}
},
cancellationToken,
TaskCreationOptions.LongRunning,
TaskScheduler.Default);
});
}
}
}
57 changes: 39 additions & 18 deletions Bonsai.Harp/SerialTransport.cs
Original file line number Diff line number Diff line change
@@ -1,52 +1,73 @@
using System;
using System.IO.Ports;
using System.Threading;
using System.Threading.Tasks;

namespace Bonsai.Harp
{
class SerialTransport : StreamTransport, IDisposable
{
const int DefaultBaudRate = 1000000;
const int DefaultReadBufferSize = 1048576; // 2^20 = 1 MB
readonly CancellationTokenSource taskCancellation;
readonly SerialPort serialPort;
bool disposed;

public SerialTransport(string portName, IObserver<HarpMessage> observer)
: base(observer)
{
IgnoreErrors = true;
taskCancellation = new CancellationTokenSource();
serialPort = new SerialPort(portName, DefaultBaudRate, Parity.None, 8, StopBits.One);
serialPort.ReadBufferSize = DefaultReadBufferSize;
serialPort.Handshake = Handshake.RequestToSend;
serialPort.DataReceived += serialPort_DataReceived;
serialPort.ErrorReceived += serialPort_ErrorReceived;
RunAsync(taskCancellation.Token);
}

void serialPort_ErrorReceived(object sender, SerialErrorReceivedEventArgs e)
{
//TODO: Create exception with the error state and send to observer
}

public void Open()
Task RunAsync(CancellationToken cancellationToken)
{
serialPort.Open();
return Task.Factory.StartNew(() =>
{
using var cancellation = cancellationToken.Register(serialPort.Dispose);
while (!cancellationToken.IsCancellationRequested)
{
try
{
var bytesToRead = serialPort.BytesToRead;
if (bytesToRead == 0)
{
PushData(serialPort.BaseStream, serialPort.ReadBufferSize, count: 1);
bytesToRead = serialPort.BytesToRead;
}

ReceiveData(serialPort.BaseStream, serialPort.ReadBufferSize, bytesToRead);
}
catch (Exception ex)
{
if (!cancellationToken.IsCancellationRequested)
{
OnError(ex);
}
break;
}
}
},
cancellationToken,
TaskCreationOptions.LongRunning,
TaskScheduler.Default);
}

public void Write(HarpMessage input)
{
serialPort.Write(input.MessageBytes, 0, input.MessageBytes.Length);
}

void serialPort_DataReceived(object sender, SerialDataReceivedEventArgs e)
{
try { ReceiveData(serialPort.BaseStream, serialPort.ReadBufferSize, serialPort.BytesToRead); }
catch (InvalidOperationException) { }
}

public void Close()
{
if (!disposed)
if (!taskCancellation.IsCancellationRequested)
{
serialPort.Dispose();
disposed = true;
taskCancellation.Cancel();
taskCancellation.Dispose();
}
}

Expand Down
17 changes: 14 additions & 3 deletions Bonsai.Harp/StreamTransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,18 @@ static bool CheckType(byte type)
return true;
}

internal void PushData(Stream stream, int readBufferSize, int count)
{
bufferedStream = bufferedStream ?? new BufferedStream(stream, readBufferSize);
bufferedStream.PushBytes(count);
}

internal void ReceiveData(Stream stream, int readBufferSize, int bytesToRead)
{
try
{
bufferedStream = bufferedStream ?? new BufferedStream(stream, readBufferSize);
bufferedStream.PushBytes(bytesToRead);
PushData(stream, readBufferSize, bytesToRead);
bytesToRead = bufferedStream.BytesToRead;

while (bytesToRead > 0)
{
Expand Down Expand Up @@ -140,8 +146,13 @@ internal void ReceiveData(Stream stream, int readBufferSize, int bytesToRead)
}
catch (Exception ex)
{
observer.OnError(ex);
OnError(ex);
}
}

internal void OnError(Exception error)
{
observer.OnError(error);
}
}
}