Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for subscribing to Meadow.Cloud commands #351

Merged
merged 8 commits into from
Aug 7, 2023
3 changes: 3 additions & 0 deletions .github/workflows/ci-build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -65,3 +65,6 @@ jobs:

- name: Build Meadow.Core
run: dotnet build -c Release Meadow.Core/source/Meadow.Core.sln

- name: Test Core.Unit.Tests
run: dotnet test -c Release --no-build Meadow.Core/source/Tests/Core.Unit.Tests/Core.Unit.Tests.csproj
2 changes: 1 addition & 1 deletion source/Meadow.Core/Configuration/MeadowUpdateSettings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ internal class MeadowUpdateSettings : IUpdateSettings
public string UpdateServer { get; set; } = "mqtt-01.meadowcloud.co";
public int UpdatePort { get; set; } = 1883;
public string Organization { get; set; } = "Default organization";
public string RootTopic { get; set; } = "{OID}/ota/{ID}";
public string RootTopic { get; set; } = "{OID}/ota/{ID};{OID}/commands/{ID}";
public int CloudConnectRetrySeconds { get; set; } = 15;
public bool UseAuthentication { get; set; } = true;
}
2 changes: 2 additions & 0 deletions source/Meadow.Core/MeadowOS.cs
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,8 @@ Type FindDeviceType(Type type)
var meadowCloudService = new MeadowCloudService(MeadowCloudSettings);
Resolver.Services.Add<IMeadowCloudService>(meadowCloudService);

Resolver.Services.Add<ICommandService>(updateService);

Resolver.Log.Info($"Update Service is {(UpdateSettings.Enabled ? "enabled" : "disabled")}.");
if (UpdateSettings.Enabled)
{
Expand Down
108 changes: 104 additions & 4 deletions source/Meadow.Core/Update/UpdateService.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using Meadow.Hardware;
using Meadow.Cloud;
using Meadow.Hardware;
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Client.Connecting;
Expand All @@ -7,6 +8,8 @@
using MQTTnet.Client.Receiving;
using MQTTnet.Exceptions;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.IO.Compression;
Expand All @@ -17,7 +20,6 @@
using System.Text;
using System.Text.Json;
using System.Text.Json.Serialization;
using System.Text.RegularExpressions;
using System.Threading;
using System.Threading.Tasks;

Expand All @@ -28,7 +30,7 @@ namespace Meadow.Update;
/// <summary>
/// The default Meadow implementation of IUpdateService
/// </summary>
public class UpdateService : IUpdateService
public class UpdateService : IUpdateService, ICommandService
{
/// <summary>
/// Retry period the service will use to attempt network reconnection
Expand Down Expand Up @@ -127,6 +129,13 @@ private void Initialize()
{
Resolver.Log.Debug("MQTT message received");

if (f.ApplicationMessage.Topic.EndsWith($"/commands/{Resolver.Device.Information.UniqueID}", StringComparison.OrdinalIgnoreCase))
{
Resolver.Log.Trace("Meadow command received");
ProcessPublishedCommand(f.ApplicationMessage);
return Task.CompletedTask;
}

var json = Encoding.UTF8.GetString(f.ApplicationMessage.Payload);

var opts = new JsonSerializerOptions
Expand Down Expand Up @@ -237,7 +246,9 @@ private async void UpdateStateMachine()
{
Resolver.Log.Debug("Creating MQTT client options");
var builder = new MqttClientOptionsBuilder()
.WithTcpServer(Config.UpdateServer, Config.UpdatePort);
.WithTcpServer(Config.UpdateServer, Config.UpdatePort)
.WithProtocolVersion(MQTTnet.Formatter.MqttProtocolVersion.V500)
.WithCommunicationTimeout(TimeSpan.FromSeconds(30));

if (Config.UseAuthentication)
{
Expand Down Expand Up @@ -651,4 +662,93 @@ public void ApplyUpdate(UpdateInfo updateInfo)

State = UpdateState.Idle;
}

private readonly ConcurrentDictionary<string, (Type CommandType, Action<object> Action)> CommandSubscriptions = new();
private const string UntypedCommandTypeName = "<<<MEADOWCOMMAND>>>";

void ICommandService.Subscribe(Action<MeadowCommand> action)
{
CommandSubscriptions[UntypedCommandTypeName] = (CommandType: typeof(MeadowCommand), Action: x => action((MeadowCommand)x));
}

void ICommandService.Subscribe<T>(Action<T> action)
{
stevenkuhn marked this conversation as resolved.
Show resolved Hide resolved
var commandTypeName = typeof(T).Name;
CommandSubscriptions[commandTypeName.ToUpperInvariant()] = (CommandType: typeof(T), Action: x => action((T)x));
}

void ICommandService.Unsubscribe()
{
CommandSubscriptions.TryRemove(UntypedCommandTypeName, out _);
}

void ICommandService.Unsubscribe<T>()
{
var commandTypeName = typeof(T).Name;
CommandSubscriptions.TryRemove(commandTypeName.ToUpperInvariant(), out _);
}

internal void ProcessPublishedCommand(MqttApplicationMessage message)
{
if (message.UserProperties == null)
{
Resolver.Log.Error("Unable to process published command without a command name.");
return;
}

var properties = message.UserProperties.ToDictionary(x => x.Name.ToUpperInvariant(), x => x.Value);

if (!properties.TryGetValue("COMMANDNAME", out string commandName) ||
string.IsNullOrWhiteSpace(commandName))
{
Resolver.Log.Error("Unable to process published command without a command name.");
return;
}

var jsonSerializerOptions = new JsonSerializerOptions(JsonSerializerDefaults.Web);

// First attempt to run the untyped command subscription, Action<MeadowCommand>, if available.
if (CommandSubscriptions.TryGetValue(UntypedCommandTypeName, out (Type CommandType, Action<object> Action) value))
{
Resolver.Log.Trace($"Processing generic Meadow command with command name '{commandName}'...");

IReadOnlyDictionary<string, object>? arguments;
try
{
arguments = message.Payload != null
? JsonSerializer.Deserialize<IReadOnlyDictionary<string, object>>(message.Payload, jsonSerializerOptions)
: null;
}
catch (JsonException ex)
{
Resolver.Log.Error($"Unable to deserialize command arguments: {ex.Message}");
return;
}

var command = new MeadowCommand(commandName, arguments);
value.Action(command);
}

// Then attempt to run the typed command subscription, Action<T> where T : ICommand, new(),
// if available. Also prevent user from running the untyped command subscription.
if (CommandSubscriptions.TryGetValue(commandName.ToUpperInvariant(), out value))
{
Resolver.Log.Trace($"Processing Meadow command of type '{value.CommandType.Name}'...");

object command;
try
{
command = message.Payload != null
? JsonSerializer.Deserialize(message.Payload, value.CommandType, jsonSerializerOptions) ?? Activator.CreateInstance(value.CommandType)
: Activator.CreateInstance(value.CommandType);
}
catch (JsonException ex)
{
Resolver.Log.Error($"Unable to deserialize command arguments: {ex.Message}");
return;
}

value.Action(command);
}
}
}
4 changes: 2 additions & 2 deletions source/Tests/Core.Unit.Tests/Core.Unit.Tests.csproj
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk">
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>netcoreapp3.1</TargetFramework>
<TargetFramework>net7.0</TargetFramework>

<IsPackable>false</IsPackable>
</PropertyGroup>
Expand Down
Loading
Loading