Skip to content

Commit

Permalink
Merge pull request #351 from WildernessLabs/command-control
Browse files Browse the repository at this point in the history
This adds the implementation for the command and control feature of Meadow.Cloud. The purpose of this feature is to allow a user to publish command messages to their Meadow devices and have their devices respond accordingly.
  • Loading branch information
stevenkuhn authored Aug 7, 2023
2 parents 5682e26 + 836cc0b commit afff8fb
Show file tree
Hide file tree
Showing 6 changed files with 566 additions and 7 deletions.
3 changes: 3 additions & 0 deletions .github/workflows/ci-build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -65,3 +65,6 @@ jobs:

- name: Build Meadow.Core
run: dotnet build -c Release Meadow.Core/source/Meadow.Core.sln

- name: Test Core.Unit.Tests
run: dotnet test -c Release --no-build Meadow.Core/source/Tests/Core.Unit.Tests/Core.Unit.Tests.csproj
2 changes: 1 addition & 1 deletion source/Meadow.Core/Configuration/MeadowUpdateSettings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ internal class MeadowUpdateSettings : IUpdateSettings
public string UpdateServer { get; set; } = "mqtt-01.meadowcloud.co";
public int UpdatePort { get; set; } = 1883;
public string Organization { get; set; } = "Default organization";
public string RootTopic { get; set; } = "{OID}/ota/{ID}";
public string RootTopic { get; set; } = "{OID}/ota/{ID};{OID}/commands/{ID}";
public int CloudConnectRetrySeconds { get; set; } = 15;
public bool UseAuthentication { get; set; } = true;
}
2 changes: 2 additions & 0 deletions source/Meadow.Core/MeadowOS.cs
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,8 @@ Type FindDeviceType(Type type)
var meadowCloudService = new MeadowCloudService(MeadowCloudSettings);
Resolver.Services.Add<IMeadowCloudService>(meadowCloudService);

Resolver.Services.Add<ICommandService>(updateService);

Resolver.Log.Info($"Update Service is {(UpdateSettings.Enabled ? "enabled" : "disabled")}.");
if (UpdateSettings.Enabled)
{
Expand Down
108 changes: 104 additions & 4 deletions source/Meadow.Core/Update/UpdateService.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using Meadow.Hardware;
using Meadow.Cloud;
using Meadow.Hardware;
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Client.Connecting;
Expand All @@ -7,6 +8,8 @@
using MQTTnet.Client.Receiving;
using MQTTnet.Exceptions;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.IO.Compression;
Expand All @@ -17,7 +20,6 @@
using System.Text;
using System.Text.Json;
using System.Text.Json.Serialization;
using System.Text.RegularExpressions;
using System.Threading;
using System.Threading.Tasks;

Expand All @@ -28,7 +30,7 @@ namespace Meadow.Update;
/// <summary>
/// The default Meadow implementation of IUpdateService
/// </summary>
public class UpdateService : IUpdateService
public class UpdateService : IUpdateService, ICommandService
{
/// <summary>
/// Retry period the service will use to attempt network reconnection
Expand Down Expand Up @@ -127,6 +129,13 @@ private void Initialize()
{
Resolver.Log.Debug("MQTT message received");
if (f.ApplicationMessage.Topic.EndsWith($"/commands/{Resolver.Device.Information.UniqueID}", StringComparison.OrdinalIgnoreCase))
{
Resolver.Log.Trace("Meadow command received");
ProcessPublishedCommand(f.ApplicationMessage);
return Task.CompletedTask;
}
var json = Encoding.UTF8.GetString(f.ApplicationMessage.Payload);
var opts = new JsonSerializerOptions
Expand Down Expand Up @@ -237,7 +246,9 @@ private async void UpdateStateMachine()
{
Resolver.Log.Debug("Creating MQTT client options");
var builder = new MqttClientOptionsBuilder()
.WithTcpServer(Config.UpdateServer, Config.UpdatePort);
.WithTcpServer(Config.UpdateServer, Config.UpdatePort)
.WithProtocolVersion(MQTTnet.Formatter.MqttProtocolVersion.V500)
.WithCommunicationTimeout(TimeSpan.FromSeconds(30));

if (Config.UseAuthentication)
{
Expand Down Expand Up @@ -651,4 +662,93 @@ public void ApplyUpdate(UpdateInfo updateInfo)

State = UpdateState.Idle;
}

private readonly ConcurrentDictionary<string, (Type CommandType, Action<object> Action)> CommandSubscriptions = new();
private const string UntypedCommandTypeName = "<<<MEADOWCOMMAND>>>";

void ICommandService.Subscribe(Action<MeadowCommand> action)
{
CommandSubscriptions[UntypedCommandTypeName] = (CommandType: typeof(MeadowCommand), Action: x => action((MeadowCommand)x));
}

void ICommandService.Subscribe<T>(Action<T> action)
{
var commandTypeName = typeof(T).Name;
CommandSubscriptions[commandTypeName.ToUpperInvariant()] = (CommandType: typeof(T), Action: x => action((T)x));
}

void ICommandService.Unsubscribe()
{
CommandSubscriptions.TryRemove(UntypedCommandTypeName, out _);
}

void ICommandService.Unsubscribe<T>()
{
var commandTypeName = typeof(T).Name;
CommandSubscriptions.TryRemove(commandTypeName.ToUpperInvariant(), out _);
}

internal void ProcessPublishedCommand(MqttApplicationMessage message)
{
if (message.UserProperties == null)
{
Resolver.Log.Error("Unable to process published command without a command name.");
return;
}

var properties = message.UserProperties.ToDictionary(x => x.Name.ToUpperInvariant(), x => x.Value);

if (!properties.TryGetValue("COMMANDNAME", out string commandName) ||
string.IsNullOrWhiteSpace(commandName))
{
Resolver.Log.Error("Unable to process published command without a command name.");
return;
}

var jsonSerializerOptions = new JsonSerializerOptions(JsonSerializerDefaults.Web);

// First attempt to run the untyped command subscription, Action<MeadowCommand>, if available.
if (CommandSubscriptions.TryGetValue(UntypedCommandTypeName, out (Type CommandType, Action<object> Action) value))
{
Resolver.Log.Trace($"Processing generic Meadow command with command name '{commandName}'...");

IReadOnlyDictionary<string, object>? arguments;
try
{
arguments = message.Payload != null
? JsonSerializer.Deserialize<IReadOnlyDictionary<string, object>>(message.Payload, jsonSerializerOptions)
: null;
}
catch (JsonException ex)
{
Resolver.Log.Error($"Unable to deserialize command arguments: {ex.Message}");
return;
}

var command = new MeadowCommand(commandName, arguments);
value.Action(command);
}

// Then attempt to run the typed command subscription, Action<T> where T : ICommand, new(),
// if available. Also prevent user from running the untyped command subscription.
if (CommandSubscriptions.TryGetValue(commandName.ToUpperInvariant(), out value))
{
Resolver.Log.Trace($"Processing Meadow command of type '{value.CommandType.Name}'...");

object command;
try
{
command = message.Payload != null
? JsonSerializer.Deserialize(message.Payload, value.CommandType, jsonSerializerOptions) ?? Activator.CreateInstance(value.CommandType)
: Activator.CreateInstance(value.CommandType);
}
catch (JsonException ex)
{
Resolver.Log.Error($"Unable to deserialize command arguments: {ex.Message}");
return;
}

value.Action(command);
}
}
}
4 changes: 2 additions & 2 deletions source/Tests/Core.Unit.Tests/Core.Unit.Tests.csproj
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk">
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>netcoreapp3.1</TargetFramework>
<TargetFramework>net7.0</TargetFramework>

<IsPackable>false</IsPackable>
</PropertyGroup>
Expand Down
Loading

0 comments on commit afff8fb

Please sign in to comment.