Skip to content

Commit

Permalink
Add out-of-frame byte support to S101Reader, S101Writer and S101Clien…
Browse files Browse the repository at this point in the history
…t and adapt tests accordingly

References #2
  • Loading branch information
andreashuber-lawo committed Nov 8, 2015
1 parent f031e6b commit 6a7fa34
Show file tree
Hide file tree
Showing 13 changed files with 233 additions and 58 deletions.
1 change: 1 addition & 0 deletions Lawo.EmberPlusSharp/Lawo.EmberPlusSharp.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@
<Compile Include="S101\EventInfo.cs" />
<Compile Include="S101\LogNames.cs" />
<Compile Include="Glow\GlowLogConverter.cs" />
<Compile Include="S101\OutOfFrameByteReceivedEventArgs.cs" />
<Compile Include="S101\S101Logger.cs" />
<Compile Include="Model\Node.cs" />
<Compile Include="Model\OctetstringParameter.cs" />
Expand Down
15 changes: 10 additions & 5 deletions Lawo.EmberPlusSharp/S101/DeframingStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

namespace Lawo.EmberPlusSharp.S101
{
using System;
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using System.Threading;
Expand All @@ -19,6 +20,7 @@ namespace Lawo.EmberPlusSharp.S101
/// <threadsafety static="true" instance="false"/>
internal sealed class DeframingStream : BufferStream
{
private readonly Action<byte> outOfFrameByteReceived;
private State state;
private ushort crc = 0xFFFF;
private readonly Queue<byte> decodedQueue = new Queue<byte>();
Expand Down Expand Up @@ -55,8 +57,9 @@ public sealed override async Task<int> ReadAsync(
////////////////////////////////////////////////////////////////////////////////////////////////////////////////

/// <summary>Initializes a new instance of the <see cref="DeframingStream"/> class.</summary>
internal DeframingStream(ReadBuffer readBuffer) : base(readBuffer, null)
internal DeframingStream(ReadBuffer readBuffer, Action<byte> outOfFrameByteReceived) : base(readBuffer, null)
{
this.outOfFrameByteReceived = outOfFrameByteReceived;
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////
Expand All @@ -68,13 +71,15 @@ private bool ReadByte(ReadBuffer readBuffer, byte[] buffer, ref int index)
switch (this.state)
{
case State.BeforeFrame:
if (currentByte != Frame.BeginOfFrame)
if (currentByte == Frame.BeginOfFrame)
{
this.state = State.AfterFrame;
throw new S101Exception("Unexpected byte while looking for BOF.");
this.state = State.InFrame;
}
else
{
this.outOfFrameByteReceived(currentByte);
}

this.state = State.InFrame;
break;
case State.InFrame:
if (currentByte < Frame.InvalidStart)
Expand Down
19 changes: 4 additions & 15 deletions Lawo.EmberPlusSharp/S101/EventInfo.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,22 +14,11 @@ namespace Lawo.EmberPlusSharp.S101
[SuppressMessage("Microsoft.Performance", "CA1815:OverrideEqualsAndOperatorEqualsOnValueTypes", Justification = "There's no point in comparing instances of this type.")]
public struct EventInfo
{
private readonly DateTime? timeUtc;
private readonly int? number;

////////////////////////////////////////////////////////////////////////////////////////////////////////////////

/// <summary>Gets the time when the event was logged.</summary>
public DateTime? TimeUtc
{
get { return this.timeUtc; }
}
public DateTime? TimeUtc { get; }

/// <summary>Gets the number of the event.</summary>
public int? Number
{
get { return this.number; }
}
public int? Number { get; }

////////////////////////////////////////////////////////////////////////////////////////////////////////////////

Expand All @@ -39,8 +28,8 @@ internal EventInfo(DateTime? timeUtc) : this(timeUtc, null)

internal EventInfo(DateTime? timeUtc, int? number)
{
this.timeUtc = timeUtc;
this.number = number;
this.TimeUtc = timeUtc;
this.Number = number;
}
}
}
18 changes: 12 additions & 6 deletions Lawo.EmberPlusSharp/S101/MessageDecodingStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@

namespace Lawo.EmberPlusSharp.S101
{
using System;
using System.Globalization;
using System.Threading;
using System.Threading.Tasks;

using Lawo.IO;
using IO;

/// <summary>Transparently decodes a single S101 message.</summary>
/// <remarks>
Expand All @@ -29,6 +30,7 @@ internal sealed class MessageDecodingStream : NonSeekableStream
private DeframingStream deframingStream;
private readonly ReadBuffer deframedBuffer;
private readonly byte[] discardBuffer;
private readonly Action<byte> outOfFrameByteReceived;
private S101Message message;

////////////////////////////////////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -70,9 +72,12 @@ public sealed override Task<int> ReadAsync(
////////////////////////////////////////////////////////////////////////////////////////////////////////////////

internal static async Task<MessageDecodingStream> CreateAsync(
ReadBuffer rawBuffer, byte[] discardBuffer, CancellationToken cancellationToken)
ReadBuffer rawBuffer,
byte[] discardBuffer,
Action<byte> outOfFrameByteReceived,
CancellationToken cancellationToken)
{
var result = new MessageDecodingStream(rawBuffer, discardBuffer);
var result = new MessageDecodingStream(rawBuffer, discardBuffer, outOfFrameByteReceived);
var newMessage = await S101Message.ReadFromAsync(result.deframedBuffer, cancellationToken);

if ((newMessage != null) && newMessage.CanHaveMultiplePackets &&
Expand All @@ -93,10 +98,10 @@ internal S101Message Message

////////////////////////////////////////////////////////////////////////////////////////////////////////////////

private MessageDecodingStream(ReadBuffer rawBuffer, byte[] discardBuffer)
private MessageDecodingStream(ReadBuffer rawBuffer, byte[] discardBuffer, Action<byte> outOfFrameByteReceived)
{
this.rawBuffer = rawBuffer;
this.deframingStream = new DeframingStream(this.rawBuffer);
this.deframingStream = new DeframingStream(this.rawBuffer, outOfFrameByteReceived);

// This buffer is kept small in size, because a new one is allocated for each message.
// This has the effect that only the bytes of reads <= MessageHeaderMaxLength bytes are actually copied into
Expand All @@ -109,6 +114,7 @@ private MessageDecodingStream(ReadBuffer rawBuffer, byte[] discardBuffer)
this.deframedBuffer =
new ReadBuffer((ReadAsyncCallback)this.ReadDeframedAsync, Constants.MessageHeaderMaxLength);
this.discardBuffer = discardBuffer;
this.outOfFrameByteReceived = outOfFrameByteReceived;
}

private async Task<int> ReadFromCurrentPacketAsync(
Expand All @@ -121,7 +127,7 @@ private async Task<int> ReadFromCurrentPacketAsync(
this.message.CanHaveMultiplePackets && ((this.message.PacketFlags & PacketFlags.LastPacket) == 0))
{
this.deframingStream.Dispose();
this.deframingStream = new DeframingStream(this.rawBuffer);
this.deframingStream = new DeframingStream(this.rawBuffer, this.outOfFrameByteReceived);
this.ValidateMessage(await S101Message.ReadFromAsync(this.deframedBuffer, cancellationToken));
}

Expand Down
27 changes: 22 additions & 5 deletions Lawo.EmberPlusSharp/S101/MessageEncodingStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,11 @@ public sealed override async Task DisposeAsync(CancellationToken cancellationTok
{
if (this.message.CanHaveMultiplePackets)
{
await this.CreateFramingStreamAsync(
await this.DisposeAndCreateFramingStreamAsync(
PacketFlags.EmptyPacket | PacketFlags.LastPacket, cancellationToken);
}

await this.unframedBuffer.FlushAsync(cancellationToken);
await this.framingStream.DisposeAsync(cancellationToken);
await this.DisposeFramingStream(cancellationToken);
await this.rawBuffer.FlushAsync(cancellationToken);
await base.DisposeAsync(cancellationToken);
}
Expand All @@ -73,7 +72,7 @@ public sealed override async Task WriteAsync(
{
if (this.framingStream.TotalCount >= MaxFrameLength)
{
await this.CreateFramingStreamAsync(PacketFlags.None, cancellationToken);
await this.DisposeAndCreateFramingStreamAsync(PacketFlags.None, cancellationToken);
}

var countToWrite = Math.Min(count, MaxFrameLength - this.framingStream.TotalCount);
Expand All @@ -92,6 +91,14 @@ public sealed override async Task FlushAsync(CancellationToken cancellationToken

////////////////////////////////////////////////////////////////////////////////////////////////////////////////

internal async Task WriteOutOfFrameByteAsync(byte value, CancellationToken cancellationToken)
{
await this.DisposeFramingStream(cancellationToken);
await this.rawBuffer.ReserveAsync(1, cancellationToken);
this.rawBuffer[this.rawBuffer.Count++] = value;
await this.CreateFramingStream(PacketFlags.None, cancellationToken);
}

internal static async Task<MessageEncodingStream> CreateAsync(
WriteBuffer rawBuffer, S101Message message, CancellationToken cancellationToken)
{
Expand All @@ -114,10 +121,20 @@ private MessageEncodingStream(WriteBuffer rawBuffer, FramingStream framingStream
this.message = message;
}

private async Task CreateFramingStreamAsync(PacketFlags packetFlags, CancellationToken cancellationToken)
private async Task DisposeAndCreateFramingStreamAsync(PacketFlags packetFlags, CancellationToken cancellationToken)
{
await DisposeFramingStream(cancellationToken);
await CreateFramingStream(packetFlags, cancellationToken);
}

private async Task DisposeFramingStream(CancellationToken cancellationToken)
{
await this.unframedBuffer.FlushAsync(cancellationToken);
await this.framingStream.DisposeAsync(cancellationToken);
}

private async Task CreateFramingStream(PacketFlags packetFlags, CancellationToken cancellationToken)
{
this.framingStream = await FramingStream.CreateAsync(this.rawBuffer, cancellationToken);
this.message.PacketFlags = packetFlags;
await this.message.WriteToAsync(this.unframedBuffer, cancellationToken);
Expand Down
33 changes: 33 additions & 0 deletions Lawo.EmberPlusSharp/S101/OutOfFrameByteReceivedEventArgs.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// <copyright>Copyright 2012-2015 Lawo AG (http://www.lawo.com).</copyright>
// Distributed under the Boost Software License, Version 1.0.
// (See accompanying file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

namespace Lawo.EmberPlusSharp.S101
{
using System;

/// <summary>Provides the data for the <see cref="S101Reader.OutOfFrameByteReceived"/> and
/// <see cref="S101Client.OutOfFrameByteReceived"/> events.</summary>
/// <threadsafety static="true" instance="false"/>
public sealed class OutOfFrameByteReceivedEventArgs : EventArgs
{
private readonly byte value;

////////////////////////////////////////////////////////////////////////////////////////////////////////////////

/// <summary>Gets the message.</summary>
public byte Value
{
get { return this.value; }
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////

internal OutOfFrameByteReceivedEventArgs(byte value)
{
this.value = value;
}
}
}
18 changes: 18 additions & 0 deletions Lawo.EmberPlusSharp/S101/S101Client.cs
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,14 @@ public Task SendMessageAsync(S101Message message, byte[] payload)
return this.SendMessageAsyncCore(message, payload);
}

/// <summary>Sends <paramref name="value"/> as an out-of-frame byte.</summary>
/// <param name="value">The byte to write.</param>
/// <exception cref="ArgumentException"><paramref name="value"/> equals <c>0xFE</c>.</exception>
public Task SendOutOfFrameByte(byte value)
{
return this.writer.WriteOutOfFrameByte(value, this.source.Token);
}

/// <summary>See <see cref="IDisposable.Dispose"/>.</summary>
/// <remarks>Cancels all communication currently in progress and calls <see cref="IDisposable.Dispose"/> on the
/// connection object passed to the constructor.</remarks>
Expand All @@ -238,6 +246,9 @@ public void Dispose()
/// command.</summary>
public event EventHandler<MessageReceivedEventArgs> EmberDataReceived;

/// <summary>Occurs when an out-of-frame byte has been received.</summary>
public event EventHandler<OutOfFrameByteReceivedEventArgs> OutOfFrameByteReceived;

/// <summary>Occurs when the connection to the provider has been lost.</summary>
/// <remarks>
/// <para>This event is raised in the following situations:
Expand Down Expand Up @@ -275,6 +286,7 @@ private async void ReadLoop(IDisposable connection, S101Reader reader)
this.source.Token.Register(() => disposed.SetResult(true));

await this.EnqueueLogOperation(() => this.logger.LogEvent("StartingReadLoop"));
reader.OutOfFrameByteReceived += this.OnOutOfFrameByteReceived;
Exception exception;
bool remote = false;

Expand Down Expand Up @@ -304,6 +316,7 @@ private async void ReadLoop(IDisposable connection, S101Reader reader)
}
finally
{
reader.OutOfFrameByteReceived -= this.OnOutOfFrameByteReceived;
this.DisposeCore(false);

if (connection != null)
Expand Down Expand Up @@ -437,6 +450,11 @@ await this.SendMessageAsyncCore(
}
}

private void OnOutOfFrameByteReceived(object sender, OutOfFrameByteReceivedEventArgs e)
{
this.OnEvent(this.OutOfFrameByteReceived, e);
}

private void OnEvent<TEventArgs>(EventHandler<TEventArgs> handler, TEventArgs args) where TEventArgs : EventArgs
{
if (handler != null)
Expand Down
18 changes: 16 additions & 2 deletions Lawo.EmberPlusSharp/S101/S101Reader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ public sealed class S101Reader
////////////////////////////////////////////////////////////////////////////////////////////////////////////////

/// <summary>Initializes a new instance of the <see cref="S101Reader"/> class by calling
/// <see cref="S101Reader(ReadAsyncCallback, int)">S101Reader(<paramref name="readAsync"/>, 8192)</see>.</summary>
/// <see cref="S101Reader(ReadAsyncCallback, int)">S101Reader(<paramref name="readAsync"/>, 8192)</see>.
/// </summary>
[CLSCompliant(false)]
public S101Reader(ReadAsyncCallback readAsync) : this(readAsync, Constants.PhysicalStreamBufferSize)
{
Expand Down Expand Up @@ -143,6 +144,9 @@ public bool IsAnotherMessageAvailable
}
}

/// <summary>Occurs when an out-of-frame byte has been received.</summary>
public event EventHandler<OutOfFrameByteReceivedEventArgs> OutOfFrameByteReceived;

////////////////////////////////////////////////////////////////////////////////////////////////////////////////

private async Task DisposeCoreAsync(CancellationToken cancellationToken)
Expand Down Expand Up @@ -171,10 +175,20 @@ private async Task<bool> ReadCoreAsync(CancellationToken cancellationToken)
}

this.stream = await MessageDecodingStream.CreateAsync(
this.readBuffer, this.discardBuffer, cancellationToken);
this.readBuffer, this.discardBuffer, this.OnOutOfFrameByteReceived, cancellationToken);
return this.stream.Message != null;
}

private void OnOutOfFrameByteReceived(byte value)
{
var handler = this.OutOfFrameByteReceived;

if (handler != null)
{
handler(this, new OutOfFrameByteReceivedEventArgs(value));
}
}

private void AssertNotDisposed()
{
if (this.disposed)
Expand Down
Loading

0 comments on commit 6a7fa34

Please sign in to comment.