Skip to content

Commit

Permalink
Merge pull request #66 from glopesdev/issue-62
Browse files Browse the repository at this point in the history
Refactor serial transport to avoid event callbacks
  • Loading branch information
glopesdev authored Jul 7, 2023
2 parents ac2b856 + 8e092cf commit 5f0a8b9
Show file tree
Hide file tree
Showing 6 changed files with 114 additions and 108 deletions.
2 changes: 0 additions & 2 deletions Bonsai.Harp/AsyncDevice.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@ public AsyncDevice(string portName)
{
response = new Subject<HarpMessage>();
transport = new SerialTransport(portName, response);
transport.IgnoreErrors = true;
transport.Open();
}

internal AsyncDevice(string portName, bool leaveOpen)
Expand Down
4 changes: 2 additions & 2 deletions Bonsai.Harp/BufferedStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -70,14 +70,14 @@ public int PushBytes(int count)
if (writeOffset >= readOffset)
{
bytesWritten = Math.Min(readBuffer.Length - writeOffset, count);
serialStream.Read(readBuffer, writeOffset, bytesWritten);
bytesWritten = serialStream.Read(readBuffer, writeOffset, bytesWritten);
writeOffset = (writeOffset + bytesWritten) % readBuffer.Length;
count -= bytesWritten;
if (count == 0) return bytesWritten;
}

var remaining = Math.Min(readOffset - writeOffset, count);
serialStream.Read(readBuffer, writeOffset, remaining);
remaining = serialStream.Read(readBuffer, writeOffset, remaining);
writeOffset = (writeOffset + remaining) % readBuffer.Length;
return bytesWritten + remaining;
}
Expand Down
17 changes: 5 additions & 12 deletions Bonsai.Harp/Device.cs
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,6 @@ static IObservable<string> GetDeviceName(string portName, LedState ledState, Led
observer.OnCompleted);
transport = new SerialTransport(portName, messageObserver);
transport.IgnoreErrors = true;
transport.Open();

transport.Write(writeOpCtrl);
return transport;
Expand All @@ -266,7 +265,7 @@ public override IObservable<HarpMessage> Generate()
return Observable.Create<HarpMessage>(async (observer, cancellationToken) =>
{
var transport = await CreateTransportAsync(observer, cancellationToken);
var cleanup = Disposable.Create(() =>
return Disposable.Create(() =>
{
var writeOpCtrl = OperationControl.FromPayload(MessageType.Write, new OperationControlPayload(
OperationMode.Standby,
Expand All @@ -276,11 +275,8 @@ public override IObservable<HarpMessage> Generate()
OperationLed,
Heartbeat));
transport.Write(writeOpCtrl);
transport.Close();
});

return new CompositeDisposable(
cleanup,
transport);
});
}

Expand All @@ -301,7 +297,7 @@ public IObservable<HarpMessage> Generate(IObservable<HarpMessage> source)
observer.OnError,
observer.OnCompleted);

var cleanup = Disposable.Create(() =>
return Disposable.Create(() =>
{
var writeOpCtrl = OperationControl.FromPayload(MessageType.Write, new OperationControlPayload(
OperationMode.Standby,
Expand All @@ -311,12 +307,9 @@ public IObservable<HarpMessage> Generate(IObservable<HarpMessage> source)
OperationLed,
Heartbeat));
transport.Write(writeOpCtrl);
sourceDisposable.Dispose();
transport.Close();
});

return new CompositeDisposable(
cleanup,
sourceDisposable,
transport);
});
}

Expand Down
125 changes: 54 additions & 71 deletions Bonsai.Harp/FileDevice.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,75 +15,6 @@ namespace Bonsai.Harp
[Description("Produces a sequence of Harp messages from a previously recorded data file.")]
public class FileDevice : Source<HarpMessage>
{
readonly IObservable<HarpMessage> source;
readonly object captureLock = new object();
const int ReadBufferSize = 4096;

/// <summary>
/// Initializes a new instance of the <see cref="FileDevice"/> class.
/// </summary>
public FileDevice()
{
PlaybackRate = 1;
source = Observable.Create<HarpMessage>((observer, cancellationToken) =>
{
return Task.Factory.StartNew(() =>
{
lock (captureLock)
{
using (var stream = new FileStream(FileName, FileMode.Open))
using (var waitSignal = new ManualResetEvent(false))
{
double timestampOffset = 0;
var stopwatch = new Stopwatch();

var harpObserver = Observer.Create<HarpMessage>(
value =>
{
var playbackRate = PlaybackRate;
if (playbackRate.HasValue && value.TryGetTimestamp(out double timestamp))
{
timestamp *= 1000.0 / playbackRate.Value; //ms
if (!stopwatch.IsRunning ||
value.MessageType == MessageType.Write &&
value.Address == TimestampSeconds.Address &&
value.PayloadType == (PayloadType.Timestamp | TimestampSeconds.RegisterType))
{
stopwatch.Restart();
timestampOffset = timestamp;
}

var waitInterval = timestamp - timestampOffset - stopwatch.ElapsedMilliseconds;
if (waitInterval > 0)
{
waitSignal.WaitOne((int)waitInterval);
}
}

observer.OnNext(value);
},
observer.OnError,
observer.OnCompleted);
var transport = new StreamTransport(harpObserver);
transport.IgnoreErrors = IgnoreErrors;

long bytesToRead;
while (!cancellationToken.IsCancellationRequested &&
(bytesToRead = Math.Min(ReadBufferSize, stream.Length - stream.Position)) > 0)
{
transport.ReceiveData(stream, ReadBufferSize, (int)bytesToRead);
}
}
}
},
cancellationToken,
TaskCreationOptions.LongRunning,
TaskScheduler.Default);
})
.PublishReconnectable()
.RefCount();
}

/// <summary>
/// Gets or sets the path to the binary file containing Harp messages to playback.
/// </summary>
Expand All @@ -102,7 +33,7 @@ public FileDevice()
/// no rate is specified, playback will be done as fast as possible.
/// </summary>
[Description("The optional rate multiplier to either slowdown or speedup the playback. If no rate is specified, playback will be done as fast as possible.")]
public double? PlaybackRate { get; set; }
public double? PlaybackRate { get; set; } = 1;

/// <summary>
/// Opens the specified file name and returns the observable sequence of Harp messages
Expand All @@ -111,7 +42,59 @@ public FileDevice()
/// <returns>The observable sequence of Harp messages stored in the binary file.</returns>
public override IObservable<HarpMessage> Generate()
{
return source;
const int ReadBufferSize = 4096;
var fileName = FileName;
var ignoreErrors = IgnoreErrors;
return Observable.Create<HarpMessage>((observer, cancellationToken) =>
{
return Task.Factory.StartNew(() =>
{
using var stream = new FileStream(fileName, FileMode.Open);
using var waitSignal = new ManualResetEvent(false);
double timestampOffset = 0;
var stopwatch = new Stopwatch();

var harpObserver = Observer.Create<HarpMessage>(
value =>
{
var playbackRate = PlaybackRate;
if (playbackRate.HasValue && value.TryGetTimestamp(out double timestamp))
{
timestamp *= 1000.0 / playbackRate.Value; //ms
if (!stopwatch.IsRunning ||
value.MessageType == MessageType.Write &&
value.Address == TimestampSeconds.Address &&
value.PayloadType == (PayloadType.Timestamp | TimestampSeconds.RegisterType))
{
stopwatch.Restart();
timestampOffset = timestamp;
}

var waitInterval = timestamp - timestampOffset - stopwatch.ElapsedMilliseconds;
if (waitInterval > 0)
{
waitSignal.WaitOne((int)waitInterval);
}
}

observer.OnNext(value);
},
observer.OnError,
observer.OnCompleted);
var transport = new StreamTransport(harpObserver);
transport.IgnoreErrors = ignoreErrors;

long bytesToRead;
while (!cancellationToken.IsCancellationRequested &&
(bytesToRead = Math.Min(ReadBufferSize, stream.Length - stream.Position)) > 0)
{
transport.ReceiveData(stream, ReadBufferSize, (int)bytesToRead);
}
},
cancellationToken,
TaskCreationOptions.LongRunning,
TaskScheduler.Default);
});
}
}
}
57 changes: 39 additions & 18 deletions Bonsai.Harp/SerialTransport.cs
Original file line number Diff line number Diff line change
@@ -1,52 +1,73 @@
using System;
using System.IO.Ports;
using System.Threading;
using System.Threading.Tasks;

namespace Bonsai.Harp
{
class SerialTransport : StreamTransport, IDisposable
{
const int DefaultBaudRate = 1000000;
const int DefaultReadBufferSize = 1048576; // 2^20 = 1 MB
readonly CancellationTokenSource taskCancellation;
readonly SerialPort serialPort;
bool disposed;

public SerialTransport(string portName, IObserver<HarpMessage> observer)
: base(observer)
{
IgnoreErrors = true;
taskCancellation = new CancellationTokenSource();
serialPort = new SerialPort(portName, DefaultBaudRate, Parity.None, 8, StopBits.One);
serialPort.ReadBufferSize = DefaultReadBufferSize;
serialPort.Handshake = Handshake.RequestToSend;
serialPort.DataReceived += serialPort_DataReceived;
serialPort.ErrorReceived += serialPort_ErrorReceived;
RunAsync(taskCancellation.Token);
}

void serialPort_ErrorReceived(object sender, SerialErrorReceivedEventArgs e)
{
//TODO: Create exception with the error state and send to observer
}

public void Open()
Task RunAsync(CancellationToken cancellationToken)
{
serialPort.Open();
return Task.Factory.StartNew(() =>
{
using var cancellation = cancellationToken.Register(serialPort.Dispose);
while (!cancellationToken.IsCancellationRequested)
{
try
{
var bytesToRead = serialPort.BytesToRead;
if (bytesToRead == 0)
{
PushData(serialPort.BaseStream, serialPort.ReadBufferSize, count: 1);
bytesToRead = serialPort.BytesToRead;
}

ReceiveData(serialPort.BaseStream, serialPort.ReadBufferSize, bytesToRead);
}
catch (Exception ex)
{
if (!cancellationToken.IsCancellationRequested)
{
OnError(ex);
}
break;
}
}
},
cancellationToken,
TaskCreationOptions.LongRunning,
TaskScheduler.Default);
}

public void Write(HarpMessage input)
{
serialPort.Write(input.MessageBytes, 0, input.MessageBytes.Length);
}

void serialPort_DataReceived(object sender, SerialDataReceivedEventArgs e)
{
try { ReceiveData(serialPort.BaseStream, serialPort.ReadBufferSize, serialPort.BytesToRead); }
catch (InvalidOperationException) { }
}

public void Close()
{
if (!disposed)
if (!taskCancellation.IsCancellationRequested)
{
serialPort.Dispose();
disposed = true;
taskCancellation.Cancel();
taskCancellation.Dispose();
}
}

Expand Down
17 changes: 14 additions & 3 deletions Bonsai.Harp/StreamTransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,18 @@ static bool CheckType(byte type)
return true;
}

internal void PushData(Stream stream, int readBufferSize, int count)
{
bufferedStream = bufferedStream ?? new BufferedStream(stream, readBufferSize);
bufferedStream.PushBytes(count);
}

internal void ReceiveData(Stream stream, int readBufferSize, int bytesToRead)
{
try
{
bufferedStream = bufferedStream ?? new BufferedStream(stream, readBufferSize);
bufferedStream.PushBytes(bytesToRead);
PushData(stream, readBufferSize, bytesToRead);
bytesToRead = bufferedStream.BytesToRead;

while (bytesToRead > 0)
{
Expand Down Expand Up @@ -140,8 +146,13 @@ internal void ReceiveData(Stream stream, int readBufferSize, int bytesToRead)
}
catch (Exception ex)
{
observer.OnError(ex);
OnError(ex);
}
}

internal void OnError(Exception error)
{
observer.OnError(error);
}
}
}

0 comments on commit 5f0a8b9

Please sign in to comment.