Skip to content

Commit

Permalink
Finalize streaming implementation
Browse files Browse the repository at this point in the history
- Extract StreamedParameterCollection
- Implement StreamCollection reading
- Adapt tests

References #8
  • Loading branch information
andreashuber-lawo committed Jan 7, 2016
1 parent 72aec27 commit 09fb6d2
Show file tree
Hide file tree
Showing 10 changed files with 244 additions and 44 deletions.
5 changes: 4 additions & 1 deletion Lawo.EmberPlusSharp/Glow/GlowLogInterpreter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ public sealed class GlowLogInterpreter
private readonly Dictionary<int, IInvocationResult> pendingInvocations =
new Dictionary<int, IInvocationResult>();

private readonly StreamedParameterCollection streamedParameters = new StreamedParameterCollection();
private readonly S101LogReader reader;

////////////////////////////////////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -69,8 +70,10 @@ public void ApplyPayload()
{
using (var stream = new MemoryStream(payload))
using (var emberReader = new EmberReader(stream))
using (var dummyWriter = new EmberWriter(Stream.Null))
{
this.root.Read(emberReader, this.pendingInvocations);
this.root.Read(emberReader, this.pendingInvocations, this.streamedParameters);
this.root.WriteRequest(dummyWriter, this.streamedParameters);
this.root.SetComplete();
this.root.UpdateRequestState(true);
}
Expand Down
1 change: 1 addition & 0 deletions Lawo.EmberPlusSharp/Lawo.EmberPlusSharp.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@
<Compile Include="Model\Result1.cs" />
<Compile Include="Model\Result.cs" />
<Compile Include="Model\ResultBase.cs" />
<Compile Include="Model\StreamedParameterCollection.cs" />
<Compile Include="Model\ValueParameter.cs" />
<Compile Include="Model\ReferenceParameter.cs" />
<Compile Include="Model\NullableParameter.cs" />
Expand Down
24 changes: 4 additions & 20 deletions Lawo.EmberPlusSharp/Model/Consumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ namespace Lawo.EmberPlusSharp.Model
/// <typeparam name="TRoot">The type of the root of the object tree that will mirror the state of the tree published
/// by the provider.</typeparam>
/// <threadsafety static="true" instance="false"/>
public sealed class Consumer<TRoot> : IMonitoredConnection, IInvocationCollection, IStreamedParameterCollection
public sealed class Consumer<TRoot> : IMonitoredConnection, IInvocationCollection
where TRoot : Root<TRoot>
{
private static readonly EmberData EmberDataCommand = new EmberData(0x01, 0x0A, 0x02);
Expand All @@ -35,9 +35,7 @@ public sealed class Consumer<TRoot> : IMonitoredConnection, IInvocationCollectio
private readonly Dictionary<int, IInvocationResult> pendingInvocations =
new Dictionary<int, IInvocationResult>();

private readonly Dictionary<int, HashSet<IStreamedParameter>> streamedParameters =
new Dictionary<int, HashSet<IStreamedParameter>>();

private readonly StreamedParameterCollection streamedParameters = new StreamedParameterCollection();
private readonly S101Client client;
private readonly int queryChildrenTimeout;
private readonly S101Message emberDataMessage;
Expand Down Expand Up @@ -205,20 +203,6 @@ int IInvocationCollection.Add(IInvocationResult invocationResult)
return this.lastInvocationId;
}

void IStreamedParameterCollection.Add(IStreamedParameter parameter)
{
var streamIdentifier = (int)parameter.StreamIdentifier;
HashSet<IStreamedParameter> group;

if (!this.streamedParameters.TryGetValue(streamIdentifier, out group))
{
group = new HashSet<IStreamedParameter>();
this.streamedParameters.Add(streamIdentifier, group);
}

group.Add(parameter);
}

private Consumer(S101Client client, int timeout, byte slot)
{
this.client = client;
Expand Down Expand Up @@ -431,7 +415,7 @@ private void ApplyChange(MessageReceivedEventArgs args)
using (var stream = new MemoryStream(payload))
using (var reader = new EmberReader(stream))
{
this.root.Read(reader, this.pendingInvocations);
this.root.Read(reader, this.pendingInvocations, this.streamedParameters);
}
}

Expand All @@ -441,7 +425,7 @@ private bool WriteRequest(out MemoryStream stream)
using (stream = new MemoryStream())
using (var writer = new EmberWriter(stream))
{
return this.root.WriteRequest(writer, this);
return this.root.WriteRequest(writer, this.streamedParameters);
}
}

Expand Down
6 changes: 6 additions & 0 deletions Lawo.EmberPlusSharp/Model/EnumParameter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,12 @@ internal sealed override void WriteValue(EmberWriter writer, TEnum? value)
writer.WriteValue(GlowParameterContents.Value.OuterId, FastEnum.ToInt64(value.Value));
}

internal sealed override TEnum? AssertValueType(object value)
{
var integer = value as long?;
return integer.HasValue ? FastEnum.ToEnum<TEnum>(integer.Value) : base.AssertValueType(value);
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////

private EnumParameter()
Expand Down
204 changes: 195 additions & 9 deletions Lawo.EmberPlusSharp/Model/Root.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ namespace Lawo.EmberPlusSharp.Model
using System;
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using System.Globalization;
using System.Text;

using Ember;
Expand Down Expand Up @@ -59,7 +60,10 @@ void IParent.AppendPath(StringBuilder builder)

internal event EventHandler<EventArgs> HasChangesSet;

internal void Read(EmberReader reader, IDictionary<int, IInvocationResult> pendingInvocations)
internal void Read(
EmberReader reader,
IDictionary<int, IInvocationResult> pendingInvocations,
IReadOnlyDictionary<int, IEnumerable<IStreamedParameter>> streamedParameters)
{
reader.ReadAndAssertOuter(GlowGlobal.Root.OuterId);

Expand All @@ -71,6 +75,9 @@ internal void Read(EmberReader reader, IDictionary<int, IInvocationResult> pendi
case GlowInvocationResult.InnerNumber:
ReadInvocationResult(reader, pendingInvocations);
break;
case GlowStreamCollection.InnerNumber:
ReadStreamCollection(reader, streamedParameters);
break;
default:
reader.Skip();
break;
Expand Down Expand Up @@ -159,7 +166,21 @@ protected Root()

////////////////////////////////////////////////////////////////////////////////////////////////////////////////

private static void ReadInvocationResult(EmberReader reader, IDictionary<int, IInvocationResult> pendingInvocations)
private void ReadQualifiedChild(EmberReader reader, ElementType actualType)
{
reader.ReadAndAssertOuter(GlowQualifiedNode.Path.OuterId);
var path = reader.AssertAndReadContentsAsInt32Array();

if (path.Length == 0)
{
throw new ModelException("Invalid path for a qualified element.");
}

this.ReadQualifiedChild(reader, actualType, path, 0);
}

private static void ReadInvocationResult(
EmberReader reader, IDictionary<int, IInvocationResult> pendingInvocations)
{
int invocationId = 0;
bool success = true;
Expand All @@ -169,7 +190,7 @@ private static void ReadInvocationResult(EmberReader reader, IDictionary<int, II
switch (reader.GetContextSpecificOuterNumber())
{
case GlowInvocationResult.InvocationId.OuterNumber:
invocationId = (int)reader.AssertAndReadContentsAsInt32();
invocationId = reader.AssertAndReadContentsAsInt32();
break;
case GlowInvocationResult.Success.OuterNumber:
success = reader.ReadContentsAsBoolean();
Expand All @@ -195,17 +216,182 @@ private static void ReadInvocationResult(EmberReader reader, IDictionary<int, II
}
}

private void ReadQualifiedChild(EmberReader reader, ElementType actualType)
private static void ReadStreamCollection(
EmberReader reader, IReadOnlyDictionary<int, IEnumerable<IStreamedParameter>> streamedParameters)
{
reader.ReadAndAssertOuter(GlowQualifiedNode.Path.OuterId);
var path = reader.AssertAndReadContentsAsInt32Array();
while (reader.Read() && (reader.InnerNumber != InnerNumber.EndContainer))
{
switch (reader.GetContextSpecificOuterNumber())
{
case GlowStreamCollection.StreamEntry.OuterNumber:
ReadStreamEntry(reader, streamedParameters);
break;
default:
reader.Skip();
break;
}
}
}

if (path.Length == 0)
private static void ReadStreamEntry(
EmberReader reader, IReadOnlyDictionary<int, IEnumerable<IStreamedParameter>> streamedParameters)
{
reader.AssertInnerNumber(GlowStreamEntry.InnerNumber);
int? identifier = null;
object rawValue = null;

while (reader.Read() && (reader.InnerNumber != InnerNumber.EndContainer))
{
throw new ModelException("Invalid path for a qualified element.");
switch (reader.GetContextSpecificOuterNumber())
{
case GlowStreamEntry.StreamIdentifier.OuterNumber:
identifier = reader.AssertAndReadContentsAsInt32();
break;
case GlowStreamEntry.StreamValue.OuterNumber:
rawValue = reader.ReadContentsAsObject();
break;
default:
reader.Skip();
break;
}
}

this.ReadQualifiedChild(reader, actualType, path, 0);
IEnumerable<IStreamedParameter> group;

if (identifier.HasValue && streamedParameters.TryGetValue(identifier.Value, out group) &&
(rawValue != null))
{
foreach (var parameter in group)
{
var value = ExtractValue(parameter, rawValue);

try
{
parameter.Value = value;
}
catch (ArgumentException ex)
{
throw CreateTypeMismatchException(value, parameter.Type, ex);
}
}
}
}

private static object ExtractValue(IStreamedParameter parameter, object rawValue)
{
if (parameter.StreamDescriptor.HasValue)
{
var rawArray = rawValue as byte[];

if (rawArray == null)
{
throw CreateTypeMismatchException(rawValue, ParameterType.Octets, null);
}
else
{
return BitConvert(parameter.StreamDescriptor.Value, rawArray);
}
}
else
{
return rawValue;
}
}

private static ModelException CreateTypeMismatchException(object value, ParameterType type, Exception ex)
{
const string Format = "Read parameter value {0} while expecting to read a value of type {1}.";
return new ModelException(string.Format(CultureInfo.InvariantCulture, Format, value, type), ex);
}

private static object BitConvert(StreamDescription descriptor, byte[] rawArray)
{
int offset;
var array = GetArray(descriptor, rawArray, out offset);

try
{
switch (descriptor.Format)
{
case StreamFormat.Byte:
return (long)array[offset];
case StreamFormat.UInt16BigEndian:
case StreamFormat.UInt16LittleEndian:
return (long)BitConverter.ToUInt16(array, offset);
case StreamFormat.UInt32BigEndian:
case StreamFormat.UInt32LittleEndian:
return (long)BitConverter.ToUInt32(array, offset);
case StreamFormat.UInt64BigEndian:
case StreamFormat.UInt64LittleEndian:
return (long)BitConverter.ToUInt64(array, offset);
case StreamFormat.SByte:
return (long)unchecked((sbyte)array[offset]);
case StreamFormat.Int16BigEndian:
case StreamFormat.Int16LittleEndian:
return (long)BitConverter.ToInt16(array, offset);
case StreamFormat.Int32BigEndian:
case StreamFormat.Int32LittleEndian:
return (long)BitConverter.ToInt32(array, offset);
case StreamFormat.Int64BigEndian:
case StreamFormat.Int64LittleEndian:
return BitConverter.ToInt64(array, offset);
case StreamFormat.Float32BigEndian:
case StreamFormat.Float32LittleEndian:
return (double)BitConverter.ToSingle(array, offset);
case StreamFormat.Float64BigEndian:
case StreamFormat.Float64LittleEndian:
return BitConverter.ToDouble(array, offset);
default:
const string Format = "Unexpected stream format: {0}.";
throw new ModelException(
string.Format(CultureInfo.InvariantCulture, Format, descriptor.Format));
}
}
catch (ArgumentException ex)
{
throw CreateOffsetException(descriptor, ex);
}
catch (IndexOutOfRangeException ex)
{
throw CreateOffsetException(descriptor, ex);
}
}

private static byte[] GetArray(StreamDescription descriptor, byte[] rawArray, out int offset)
{
offset = Math.Max(0, Math.Min(descriptor.Offset, rawArray.Length));

if (AreBytesReverse(descriptor.Format))
{
var count = Math.Min(1 << (int)descriptor.Format & 6, rawArray.Length - offset);
var result = new byte[count];
Array.Copy(rawArray, offset, result, 0, count);
Array.Reverse(result);
offset = 0;
return result;
}
else
{
return rawArray;
}
}

private static bool AreBytesReverse(StreamFormat format)
{
switch (format)
{
case StreamFormat.Byte:
case StreamFormat.SByte:
return false;
default:
return (((int)format & 1) == 1) != BitConverter.IsLittleEndian;
}
}

private static ModelException CreateOffsetException(StreamDescription descriptor, Exception ex)
{
const string Format = "Offset {0} is out of range.";
return new ModelException(string.Format(CultureInfo.InvariantCulture, Format, descriptor.Offset), ex);
}
}
}
28 changes: 28 additions & 0 deletions Lawo.EmberPlusSharp/Model/StreamedParameterCollection.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// <copyright>Copyright 2012-2015 Lawo AG (http://www.lawo.com).</copyright>
// Distributed under the Boost Software License, Version 1.0.
// (See accompanying file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

namespace Lawo.EmberPlusSharp.Model
{
using System.Collections.Generic;

internal sealed class StreamedParameterCollection :
Dictionary<int, IEnumerable<IStreamedParameter>>, IStreamedParameterCollection
{
void IStreamedParameterCollection.Add(IStreamedParameter parameter)
{
var streamIdentifier = (int)parameter.StreamIdentifier;
IEnumerable<IStreamedParameter> group;

if (!this.TryGetValue(streamIdentifier, out group))
{
group = new HashSet<IStreamedParameter>();
this.Add(streamIdentifier, group);
}

((HashSet<IStreamedParameter>)group).Add(parameter);
}
}
}
Loading

0 comments on commit 09fb6d2

Please sign in to comment.