From e260b95c2ee30d8a8a39403a33587975805d7f7f Mon Sep 17 00:00:00 2001 From: glopesdev Date: Wed, 5 Jul 2023 17:54:22 +0100 Subject: [PATCH 1/2] Allow playing back multiple files simultaneously --- Bonsai.Harp/FileDevice.cs | 125 ++++++++++++++++---------------------- 1 file changed, 54 insertions(+), 71 deletions(-) diff --git a/Bonsai.Harp/FileDevice.cs b/Bonsai.Harp/FileDevice.cs index c4f25b7..719003b 100644 --- a/Bonsai.Harp/FileDevice.cs +++ b/Bonsai.Harp/FileDevice.cs @@ -15,75 +15,6 @@ namespace Bonsai.Harp [Description("Produces a sequence of Harp messages from a previously recorded data file.")] public class FileDevice : Source { - readonly IObservable source; - readonly object captureLock = new object(); - const int ReadBufferSize = 4096; - - /// - /// Initializes a new instance of the class. - /// - public FileDevice() - { - PlaybackRate = 1; - source = Observable.Create((observer, cancellationToken) => - { - return Task.Factory.StartNew(() => - { - lock (captureLock) - { - using (var stream = new FileStream(FileName, FileMode.Open)) - using (var waitSignal = new ManualResetEvent(false)) - { - double timestampOffset = 0; - var stopwatch = new Stopwatch(); - - var harpObserver = Observer.Create( - value => - { - var playbackRate = PlaybackRate; - if (playbackRate.HasValue && value.TryGetTimestamp(out double timestamp)) - { - timestamp *= 1000.0 / playbackRate.Value; //ms - if (!stopwatch.IsRunning || - value.MessageType == MessageType.Write && - value.Address == TimestampSeconds.Address && - value.PayloadType == (PayloadType.Timestamp | TimestampSeconds.RegisterType)) - { - stopwatch.Restart(); - timestampOffset = timestamp; - } - - var waitInterval = timestamp - timestampOffset - stopwatch.ElapsedMilliseconds; - if (waitInterval > 0) - { - waitSignal.WaitOne((int)waitInterval); - } - } - - observer.OnNext(value); - }, - observer.OnError, - observer.OnCompleted); - var transport = new StreamTransport(harpObserver); - transport.IgnoreErrors = IgnoreErrors; - - long bytesToRead; - while (!cancellationToken.IsCancellationRequested && - (bytesToRead = Math.Min(ReadBufferSize, stream.Length - stream.Position)) > 0) - { - transport.ReceiveData(stream, ReadBufferSize, (int)bytesToRead); - } - } - } - }, - cancellationToken, - TaskCreationOptions.LongRunning, - TaskScheduler.Default); - }) - .PublishReconnectable() - .RefCount(); - } - /// /// Gets or sets the path to the binary file containing Harp messages to playback. /// @@ -102,7 +33,7 @@ public FileDevice() /// no rate is specified, playback will be done as fast as possible. /// [Description("The optional rate multiplier to either slowdown or speedup the playback. If no rate is specified, playback will be done as fast as possible.")] - public double? PlaybackRate { get; set; } + public double? PlaybackRate { get; set; } = 1; /// /// Opens the specified file name and returns the observable sequence of Harp messages @@ -111,7 +42,59 @@ public FileDevice() /// The observable sequence of Harp messages stored in the binary file. public override IObservable Generate() { - return source; + const int ReadBufferSize = 4096; + var fileName = FileName; + var ignoreErrors = IgnoreErrors; + return Observable.Create((observer, cancellationToken) => + { + return Task.Factory.StartNew(() => + { + using var stream = new FileStream(fileName, FileMode.Open); + using var waitSignal = new ManualResetEvent(false); + double timestampOffset = 0; + var stopwatch = new Stopwatch(); + + var harpObserver = Observer.Create( + value => + { + var playbackRate = PlaybackRate; + if (playbackRate.HasValue && value.TryGetTimestamp(out double timestamp)) + { + timestamp *= 1000.0 / playbackRate.Value; //ms + if (!stopwatch.IsRunning || + value.MessageType == MessageType.Write && + value.Address == TimestampSeconds.Address && + value.PayloadType == (PayloadType.Timestamp | TimestampSeconds.RegisterType)) + { + stopwatch.Restart(); + timestampOffset = timestamp; + } + + var waitInterval = timestamp - timestampOffset - stopwatch.ElapsedMilliseconds; + if (waitInterval > 0) + { + waitSignal.WaitOne((int)waitInterval); + } + } + + observer.OnNext(value); + }, + observer.OnError, + observer.OnCompleted); + var transport = new StreamTransport(harpObserver); + transport.IgnoreErrors = ignoreErrors; + + long bytesToRead; + while (!cancellationToken.IsCancellationRequested && + (bytesToRead = Math.Min(ReadBufferSize, stream.Length - stream.Position)) > 0) + { + transport.ReceiveData(stream, ReadBufferSize, (int)bytesToRead); + } + }, + cancellationToken, + TaskCreationOptions.LongRunning, + TaskScheduler.Default); + }); } } } From 8e092cf0f00d179b58148fd9adc2b8c127aaadd8 Mon Sep 17 00:00:00 2001 From: glopesdev Date: Wed, 5 Jul 2023 18:01:33 +0100 Subject: [PATCH 2/2] Refactor serial transport to avoid event callbacks --- Bonsai.Harp/AsyncDevice.cs | 2 -- Bonsai.Harp/BufferedStream.cs | 4 +-- Bonsai.Harp/Device.cs | 17 +++------- Bonsai.Harp/SerialTransport.cs | 57 +++++++++++++++++++++++----------- Bonsai.Harp/StreamTransport.cs | 17 ++++++++-- 5 files changed, 60 insertions(+), 37 deletions(-) diff --git a/Bonsai.Harp/AsyncDevice.cs b/Bonsai.Harp/AsyncDevice.cs index 84746f2..95e65fe 100644 --- a/Bonsai.Harp/AsyncDevice.cs +++ b/Bonsai.Harp/AsyncDevice.cs @@ -24,8 +24,6 @@ public AsyncDevice(string portName) { response = new Subject(); transport = new SerialTransport(portName, response); - transport.IgnoreErrors = true; - transport.Open(); } internal AsyncDevice(string portName, bool leaveOpen) diff --git a/Bonsai.Harp/BufferedStream.cs b/Bonsai.Harp/BufferedStream.cs index c7dbf7d..e554c26 100644 --- a/Bonsai.Harp/BufferedStream.cs +++ b/Bonsai.Harp/BufferedStream.cs @@ -70,14 +70,14 @@ public int PushBytes(int count) if (writeOffset >= readOffset) { bytesWritten = Math.Min(readBuffer.Length - writeOffset, count); - serialStream.Read(readBuffer, writeOffset, bytesWritten); + bytesWritten = serialStream.Read(readBuffer, writeOffset, bytesWritten); writeOffset = (writeOffset + bytesWritten) % readBuffer.Length; count -= bytesWritten; if (count == 0) return bytesWritten; } var remaining = Math.Min(readOffset - writeOffset, count); - serialStream.Read(readBuffer, writeOffset, remaining); + remaining = serialStream.Read(readBuffer, writeOffset, remaining); writeOffset = (writeOffset + remaining) % readBuffer.Length; return bytesWritten + remaining; } diff --git a/Bonsai.Harp/Device.cs b/Bonsai.Harp/Device.cs index 2d64d79..4d0253f 100644 --- a/Bonsai.Harp/Device.cs +++ b/Bonsai.Harp/Device.cs @@ -246,7 +246,6 @@ static IObservable GetDeviceName(string portName, LedState ledState, Led observer.OnCompleted); transport = new SerialTransport(portName, messageObserver); transport.IgnoreErrors = true; - transport.Open(); transport.Write(writeOpCtrl); return transport; @@ -266,7 +265,7 @@ public override IObservable Generate() return Observable.Create(async (observer, cancellationToken) => { var transport = await CreateTransportAsync(observer, cancellationToken); - var cleanup = Disposable.Create(() => + return Disposable.Create(() => { var writeOpCtrl = OperationControl.FromPayload(MessageType.Write, new OperationControlPayload( OperationMode.Standby, @@ -276,11 +275,8 @@ public override IObservable Generate() OperationLed, Heartbeat)); transport.Write(writeOpCtrl); + transport.Close(); }); - - return new CompositeDisposable( - cleanup, - transport); }); } @@ -301,7 +297,7 @@ public IObservable Generate(IObservable source) observer.OnError, observer.OnCompleted); - var cleanup = Disposable.Create(() => + return Disposable.Create(() => { var writeOpCtrl = OperationControl.FromPayload(MessageType.Write, new OperationControlPayload( OperationMode.Standby, @@ -311,12 +307,9 @@ public IObservable Generate(IObservable source) OperationLed, Heartbeat)); transport.Write(writeOpCtrl); + sourceDisposable.Dispose(); + transport.Close(); }); - - return new CompositeDisposable( - cleanup, - sourceDisposable, - transport); }); } diff --git a/Bonsai.Harp/SerialTransport.cs b/Bonsai.Harp/SerialTransport.cs index 810d676..d6186c6 100644 --- a/Bonsai.Harp/SerialTransport.cs +++ b/Bonsai.Harp/SerialTransport.cs @@ -1,5 +1,7 @@ using System; using System.IO.Ports; +using System.Threading; +using System.Threading.Tasks; namespace Bonsai.Harp { @@ -7,27 +9,52 @@ class SerialTransport : StreamTransport, IDisposable { const int DefaultBaudRate = 1000000; const int DefaultReadBufferSize = 1048576; // 2^20 = 1 MB + readonly CancellationTokenSource taskCancellation; readonly SerialPort serialPort; - bool disposed; public SerialTransport(string portName, IObserver observer) : base(observer) { + IgnoreErrors = true; + taskCancellation = new CancellationTokenSource(); serialPort = new SerialPort(portName, DefaultBaudRate, Parity.None, 8, StopBits.One); serialPort.ReadBufferSize = DefaultReadBufferSize; serialPort.Handshake = Handshake.RequestToSend; - serialPort.DataReceived += serialPort_DataReceived; - serialPort.ErrorReceived += serialPort_ErrorReceived; + RunAsync(taskCancellation.Token); } - void serialPort_ErrorReceived(object sender, SerialErrorReceivedEventArgs e) - { - //TODO: Create exception with the error state and send to observer - } - - public void Open() + Task RunAsync(CancellationToken cancellationToken) { serialPort.Open(); + return Task.Factory.StartNew(() => + { + using var cancellation = cancellationToken.Register(serialPort.Dispose); + while (!cancellationToken.IsCancellationRequested) + { + try + { + var bytesToRead = serialPort.BytesToRead; + if (bytesToRead == 0) + { + PushData(serialPort.BaseStream, serialPort.ReadBufferSize, count: 1); + bytesToRead = serialPort.BytesToRead; + } + + ReceiveData(serialPort.BaseStream, serialPort.ReadBufferSize, bytesToRead); + } + catch (Exception ex) + { + if (!cancellationToken.IsCancellationRequested) + { + OnError(ex); + } + break; + } + } + }, + cancellationToken, + TaskCreationOptions.LongRunning, + TaskScheduler.Default); } public void Write(HarpMessage input) @@ -35,18 +62,12 @@ public void Write(HarpMessage input) serialPort.Write(input.MessageBytes, 0, input.MessageBytes.Length); } - void serialPort_DataReceived(object sender, SerialDataReceivedEventArgs e) - { - try { ReceiveData(serialPort.BaseStream, serialPort.ReadBufferSize, serialPort.BytesToRead); } - catch (InvalidOperationException) { } - } - public void Close() { - if (!disposed) + if (!taskCancellation.IsCancellationRequested) { - serialPort.Dispose(); - disposed = true; + taskCancellation.Cancel(); + taskCancellation.Dispose(); } } diff --git a/Bonsai.Harp/StreamTransport.cs b/Bonsai.Harp/StreamTransport.cs index 8461e18..c7e6b34 100644 --- a/Bonsai.Harp/StreamTransport.cs +++ b/Bonsai.Harp/StreamTransport.cs @@ -51,12 +51,18 @@ static bool CheckType(byte type) return true; } + internal void PushData(Stream stream, int readBufferSize, int count) + { + bufferedStream = bufferedStream ?? new BufferedStream(stream, readBufferSize); + bufferedStream.PushBytes(count); + } + internal void ReceiveData(Stream stream, int readBufferSize, int bytesToRead) { try { - bufferedStream = bufferedStream ?? new BufferedStream(stream, readBufferSize); - bufferedStream.PushBytes(bytesToRead); + PushData(stream, readBufferSize, bytesToRead); + bytesToRead = bufferedStream.BytesToRead; while (bytesToRead > 0) { @@ -140,8 +146,13 @@ internal void ReceiveData(Stream stream, int readBufferSize, int bytesToRead) } catch (Exception ex) { - observer.OnError(ex); + OnError(ex); } } + + internal void OnError(Exception error) + { + observer.OnError(error); + } } }