From d99f8ff085799092fb665466ae7ed7beeffceea3 Mon Sep 17 00:00:00 2001 From: Varun Puranik Date: Thu, 17 Jan 2019 10:37:34 -0800 Subject: [PATCH] EdgeHub: Twin manager (#660) * Twin Manager2 changes * TwinManager2 refactor * Reported properties Sync to cloud changes * Add ctors * ReportedProperties sync changes * Cleanup * Make Twin an option as well * Add events * Fix reported properties handling * Fix logs * Cleanup * Fix twin store entity * Fix reported properties validator * Remove commented code * More tests * Split out types for easier tests * Cleanup CloudSync * More tests * TwinStore tests * More tests * Fix merge * More tests * More checks * More tests * More tests * Fix config and add test * Cleanup * Fix config * Fix reported property updates * Fix build * fix test --- .../AssemblyInfo.cs | 1 + .../ConnectionManager.cs | 5 + .../EdgeHubConnection.cs | 8 +- .../IConnectionManager.cs | 2 + .../TwinInfo.cs | 4 +- .../TwinManager.cs | 10 +- .../twin/CloudSync.cs | 121 ++++ .../twin/ICloudSync.cs | 14 + .../twin/IReportedPropertiesStore.cs | 15 + .../twin/ITwinStore.cs | 18 + .../twin/PassThroughTwinManager.cs | 46 ++ .../twin/ReportedPropertiesStore.cs | 192 ++++++ .../twin/ReportedPropertiesValidator.cs | 117 ++++ .../twin/StoringTwinManager.cs | 323 +++++++++ .../twin/TwinStore.cs | 167 +++++ .../twin/TwinStoreEntity.cs | 47 ++ .../DependencyManager.cs | 18 +- .../modules/RoutingModule.cs | 56 +- .../EdgeHubConnectionTest.cs | 8 +- .../TwinManagerTest.cs | 46 +- .../twin/ReportedPropertiesStoreTest.cs | 116 ++++ .../twin/ReportedPropertiesValidatorTest.cs | 128 ++++ .../twin/StoringTwinManagerTest.cs | 624 ++++++++++++++++++ .../twin/TwinStoreEntityTest.cs | 120 ++++ .../twin/TwinStoreTest.cs | 459 +++++++++++++ .../DependencyManager.cs | 5 +- 26 files changed, 2622 insertions(+), 48 deletions(-) create mode 100644 edge-hub/src/Microsoft.Azure.Devices.Edge.Hub.Core/twin/CloudSync.cs create mode 100644 edge-hub/src/Microsoft.Azure.Devices.Edge.Hub.Core/twin/ICloudSync.cs create mode 100644 edge-hub/src/Microsoft.Azure.Devices.Edge.Hub.Core/twin/IReportedPropertiesStore.cs create mode 100644 edge-hub/src/Microsoft.Azure.Devices.Edge.Hub.Core/twin/ITwinStore.cs create mode 100644 edge-hub/src/Microsoft.Azure.Devices.Edge.Hub.Core/twin/PassThroughTwinManager.cs create mode 100644 edge-hub/src/Microsoft.Azure.Devices.Edge.Hub.Core/twin/ReportedPropertiesStore.cs create mode 100644 edge-hub/src/Microsoft.Azure.Devices.Edge.Hub.Core/twin/ReportedPropertiesValidator.cs create mode 100644 edge-hub/src/Microsoft.Azure.Devices.Edge.Hub.Core/twin/StoringTwinManager.cs create mode 100644 edge-hub/src/Microsoft.Azure.Devices.Edge.Hub.Core/twin/TwinStore.cs create mode 100644 edge-hub/src/Microsoft.Azure.Devices.Edge.Hub.Core/twin/TwinStoreEntity.cs create mode 100644 edge-hub/test/Microsoft.Azure.Devices.Edge.Hub.Core.Test/twin/ReportedPropertiesStoreTest.cs create mode 100644 edge-hub/test/Microsoft.Azure.Devices.Edge.Hub.Core.Test/twin/ReportedPropertiesValidatorTest.cs create mode 100644 edge-hub/test/Microsoft.Azure.Devices.Edge.Hub.Core.Test/twin/StoringTwinManagerTest.cs create mode 100644 edge-hub/test/Microsoft.Azure.Devices.Edge.Hub.Core.Test/twin/TwinStoreEntityTest.cs create mode 100644 edge-hub/test/Microsoft.Azure.Devices.Edge.Hub.Core.Test/twin/TwinStoreTest.cs diff --git a/edge-hub/src/Microsoft.Azure.Devices.Edge.Hub.Core/AssemblyInfo.cs b/edge-hub/src/Microsoft.Azure.Devices.Edge.Hub.Core/AssemblyInfo.cs index 5ac2d7eff4f..2cb1f660a26 100644 --- a/edge-hub/src/Microsoft.Azure.Devices.Edge.Hub.Core/AssemblyInfo.cs +++ b/edge-hub/src/Microsoft.Azure.Devices.Edge.Hub.Core/AssemblyInfo.cs @@ -4,3 +4,4 @@ [assembly: InternalsVisibleTo("Microsoft.Azure.Devices.Edge.Hub.Core.Test")] [assembly: InternalsVisibleTo("Microsoft.Azure.Devices.Edge.Hub.Mqtt.Test")] [assembly: InternalsVisibleTo("Microsoft.Azure.Devices.Edge.Hub.E2E.Test")] +[assembly: InternalsVisibleTo("DynamicProxyGenAssembly2")] diff --git a/edge-hub/src/Microsoft.Azure.Devices.Edge.Hub.Core/ConnectionManager.cs b/edge-hub/src/Microsoft.Azure.Devices.Edge.Hub.Core/ConnectionManager.cs index 182d669e1fa..eb02d06e357 100644 --- a/edge-hub/src/Microsoft.Azure.Devices.Edge.Hub.Core/ConnectionManager.cs +++ b/edge-hub/src/Microsoft.Azure.Devices.Edge.Hub.Core/ConnectionManager.cs @@ -121,6 +121,11 @@ public Option> GetSubscriptions(st .Map(d => new ReadOnlyDictionary(d.Subscriptions) as IReadOnlyDictionary) : Option.None>(); + public bool CheckClientSubscription(string id, DeviceSubscription subscription) => + this.GetSubscriptions(id) + .Filter(s => s.TryGetValue(subscription, out bool isActive) && isActive) + .HasValue; + public async Task> CreateCloudConnectionAsync(IClientCredentials credentials) { Preconditions.CheckNotNull(credentials, nameof(credentials)); diff --git a/edge-hub/src/Microsoft.Azure.Devices.Edge.Hub.Core/EdgeHubConnection.cs b/edge-hub/src/Microsoft.Azure.Devices.Edge.Hub.Core/EdgeHubConnection.cs index afe3415a3f7..a50886271c5 100644 --- a/edge-hub/src/Microsoft.Azure.Devices.Edge.Hub.Core/EdgeHubConnection.cs +++ b/edge-hub/src/Microsoft.Azure.Devices.Edge.Hub.Core/EdgeHubConnection.cs @@ -27,7 +27,7 @@ public class EdgeHubConnection : IConfigSource readonly IIdentity edgeHubIdentity; readonly ITwinManager twinManager; readonly IMessageConverter twinCollectionMessageConverter; - readonly IMessageConverter twinMessageConverter; + readonly IMessageConverter twinMessageConverter; readonly VersionInfo versionInfo; readonly RouteFactory routeFactory; readonly AsyncLock edgeHubConfigLock = new AsyncLock(); @@ -40,7 +40,7 @@ internal EdgeHubConnection( ITwinManager twinManager, RouteFactory routeFactory, IMessageConverter twinCollectionMessageConverter, - IMessageConverter twinMessageConverter, + IMessageConverter twinMessageConverter, VersionInfo versionInfo, IDeviceScopeIdentitiesCache deviceScopeIdentitiesCache) { @@ -60,7 +60,7 @@ public static async Task Create( IConnectionManager connectionManager, RouteFactory routeFactory, IMessageConverter twinCollectionMessageConverter, - IMessageConverter twinMessageConverter, + IMessageConverter twinMessageConverter, VersionInfo versionInfo, IDeviceScopeIdentitiesCache deviceScopeIdentitiesCache) { @@ -198,7 +198,7 @@ async Task> GetConfigInternal() try { IMessage message = await this.twinManager.GetTwinAsync(this.edgeHubIdentity.Id); - Twin twin = this.twinMessageConverter.FromMessage(message); + Shared.Twin twin = this.twinMessageConverter.FromMessage(message); this.lastDesiredProperties = Option.Some(twin.Properties.Desired); try { diff --git a/edge-hub/src/Microsoft.Azure.Devices.Edge.Hub.Core/IConnectionManager.cs b/edge-hub/src/Microsoft.Azure.Devices.Edge.Hub.Core/IConnectionManager.cs index 7075890ee6d..69296fd1e51 100644 --- a/edge-hub/src/Microsoft.Azure.Devices.Edge.Hub.Core/IConnectionManager.cs +++ b/edge-hub/src/Microsoft.Azure.Devices.Edge.Hub.Core/IConnectionManager.cs @@ -45,6 +45,8 @@ public interface IConnectionManager Option> GetSubscriptions(string id); + bool CheckClientSubscription(string id, DeviceSubscription subscription); + IEnumerable GetConnectedClients(); } } diff --git a/edge-hub/src/Microsoft.Azure.Devices.Edge.Hub.Core/TwinInfo.cs b/edge-hub/src/Microsoft.Azure.Devices.Edge.Hub.Core/TwinInfo.cs index e51270dc8b8..989d3ba88e0 100644 --- a/edge-hub/src/Microsoft.Azure.Devices.Edge.Hub.Core/TwinInfo.cs +++ b/edge-hub/src/Microsoft.Azure.Devices.Edge.Hub.Core/TwinInfo.cs @@ -7,13 +7,13 @@ namespace Microsoft.Azure.Devices.Edge.Hub.Core public class TwinInfo { [JsonConstructor] - public TwinInfo(Twin twin, TwinCollection reportedPropertiesPatch) + public TwinInfo(Shared.Twin twin, TwinCollection reportedPropertiesPatch) { this.Twin = twin; this.ReportedPropertiesPatch = reportedPropertiesPatch; } - public Twin Twin { get; } + public Shared.Twin Twin { get; } public TwinCollection ReportedPropertiesPatch { get; } } diff --git a/edge-hub/src/Microsoft.Azure.Devices.Edge.Hub.Core/TwinManager.cs b/edge-hub/src/Microsoft.Azure.Devices.Edge.Hub.Core/TwinManager.cs index a455328fdb1..173fd90f04d 100644 --- a/edge-hub/src/Microsoft.Azure.Devices.Edge.Hub.Core/TwinManager.cs +++ b/edge-hub/src/Microsoft.Azure.Devices.Edge.Hub.Core/TwinManager.cs @@ -25,13 +25,13 @@ public class TwinManager : ITwinManager const long TwinPropertyMinSafeValue = -4503599627370496; // -2^52. taken from IoTHub const int TwinPropertyDocMaxLength = 8 * 1024; // 8K bytes. taken from IoTHub readonly IMessageConverter twinCollectionConverter; - readonly IMessageConverter twinConverter; + readonly IMessageConverter twinConverter; readonly IConnectionManager connectionManager; readonly AsyncLock reportedPropertiesLock; readonly AsyncLock twinLock; readonly ActionBlock actionBlock; - public TwinManager(IConnectionManager connectionManager, IMessageConverter twinCollectionConverter, IMessageConverter twinConverter, Option> twinStore) + public TwinManager(IConnectionManager connectionManager, IMessageConverter twinCollectionConverter, IMessageConverter twinConverter, Option> twinStore) { this.connectionManager = Preconditions.CheckNotNull(connectionManager, nameof(connectionManager)); this.twinCollectionConverter = Preconditions.CheckNotNull(twinCollectionConverter, nameof(twinCollectionConverter)); @@ -55,7 +55,7 @@ public static ITwinManager CreateTwinManager( var twinManager = new TwinManager( connectionManager, messageConverterProvider.Get(), - messageConverterProvider.Get(), + messageConverterProvider.Get(), storeProvider.Match( s => Option.Some(s.GetEntityStore(Constants.TwinStorePartitionKey)), () => Option.None>())); @@ -131,7 +131,7 @@ internal async Task GetTwinInfoWhenCloudOnlineAsync(string id, ICloudP using (await this.twinLock.LockAsync()) { IMessage twinMessage = await cp.GetTwinAsync(); - Twin cloudTwin = this.twinConverter.FromMessage(twinMessage); + Shared.Twin cloudTwin = this.twinConverter.FromMessage(twinMessage); Events.GotTwinFromCloudSuccess(id, cloudTwin.Properties.Desired.Version, cloudTwin.Properties.Reported.Version); var newTwin = new TwinInfo(cloudTwin, null); cached = newTwin; @@ -461,7 +461,7 @@ await twinStore.Update( Desired = new TwinCollection(), Reported = reported }; - var twin = new Twin(twinProperties); + var twin = new Shared.Twin(twinProperties); Events.UpdatedCachedReportedProperties(id, reported.Version, cloudVerified); return new TwinInfo(twin, reported); } diff --git a/edge-hub/src/Microsoft.Azure.Devices.Edge.Hub.Core/twin/CloudSync.cs b/edge-hub/src/Microsoft.Azure.Devices.Edge.Hub.Core/twin/CloudSync.cs new file mode 100644 index 00000000000..25b9b1f5781 --- /dev/null +++ b/edge-hub/src/Microsoft.Azure.Devices.Edge.Hub.Core/twin/CloudSync.cs @@ -0,0 +1,121 @@ +// Copyright (c) Microsoft. All rights reserved. +namespace Microsoft.Azure.Devices.Edge.Hub.Core.Twin +{ + using System; + using System.Threading.Tasks; + using Microsoft.Azure.Devices.Edge.Hub.Core.Cloud; + using Microsoft.Azure.Devices.Edge.Util; + using Microsoft.Azure.Devices.Shared; + using Microsoft.Extensions.Logging; + + class CloudSync : ICloudSync + { + readonly IConnectionManager connectionManager; + readonly IMessageConverter twinCollectionConverter; + readonly IMessageConverter twinConverter; + + public CloudSync( + IConnectionManager connectionManager, + IMessageConverter twinCollectionConverter, + IMessageConverter twinConverter) + { + this.connectionManager = connectionManager; + this.twinCollectionConverter = twinCollectionConverter; + this.twinConverter = twinConverter; + } + + public async Task> GetTwin(string id) + { + try + { + Events.GettingTwin(id); + Option cloudProxy = await this.connectionManager.GetCloudConnection(id); + Option twin = await cloudProxy.Map( + async cp => + { + IMessage twinMessage = await cp.GetTwinAsync(); + Twin twinValue = this.twinConverter.FromMessage(twinMessage); + Events.GetTwinSucceeded(id); + return Option.Some(twinValue); + }) + .GetOrElse(() => Task.FromResult(Option.None())); + return twin; + } + catch (Exception ex) + { + Events.ErrorGettingTwin(id, ex); + return Option.None(); + } + } + + public async Task UpdateReportedProperties(string id, TwinCollection patch) + { + try + { + Events.UpdatingReportedProperties(id); + Option cloudProxy = await this.connectionManager.GetCloudConnection(id); + bool result = await cloudProxy.Map( + async cp => + { + IMessage patchMessage = this.twinCollectionConverter.ToMessage(patch); + await cp.UpdateReportedPropertiesAsync(patchMessage); + Events.UpdatedReportedProperties(id); + return true; + }) + .GetOrElse(() => Task.FromResult(false)); + return result; + } + catch (Exception ex) + { + Events.ErrorUpdatingReportedProperties(id, ex); + return false; + } + } + + static class Events + { + const int IdStart = HubCoreEventIds.TwinManager; + static readonly ILogger Log = Logger.Factory.CreateLogger(); + + enum EventIds + { + GettingTwin = IdStart + 70, + GetTwinSucceeded, + ErrorGettingTwin, + UpdatingReportedProperties, + UpdatedReportedProperties, + ErrorUpdatingReportedProperties + } + + public static void ErrorUpdatingReportedProperties(string id, Exception ex) + { + Log.LogDebug((int)EventIds.ErrorUpdatingReportedProperties, ex, $"Error updating reported properties for {id}"); + } + + public static void UpdatedReportedProperties(string id) + { + Log.LogInformation((int)EventIds.UpdatedReportedProperties, $"Updated reported properties for {id}"); + } + + public static void UpdatingReportedProperties(string id) + { + Log.LogDebug((int)EventIds.UpdatingReportedProperties, $"Updating reported properties for {id}"); + } + + public static void ErrorGettingTwin(string id, Exception ex) + { + Log.LogWarning((int)EventIds.ErrorGettingTwin, ex, $"Error getting twin for {id}"); + } + + public static void GetTwinSucceeded(string id) + { + Log.LogDebug((int)EventIds.GetTwinSucceeded, $"Got twin for {id}"); + } + + public static void GettingTwin(string id) + { + Log.LogDebug((int)EventIds.GettingTwin, $"Getting twin for {id}"); + } + } + } +} diff --git a/edge-hub/src/Microsoft.Azure.Devices.Edge.Hub.Core/twin/ICloudSync.cs b/edge-hub/src/Microsoft.Azure.Devices.Edge.Hub.Core/twin/ICloudSync.cs new file mode 100644 index 00000000000..0290e8bf420 --- /dev/null +++ b/edge-hub/src/Microsoft.Azure.Devices.Edge.Hub.Core/twin/ICloudSync.cs @@ -0,0 +1,14 @@ +// Copyright (c) Microsoft. All rights reserved. +namespace Microsoft.Azure.Devices.Edge.Hub.Core.Twin +{ + using System.Threading.Tasks; + using Microsoft.Azure.Devices.Edge.Util; + using Microsoft.Azure.Devices.Shared; + + interface ICloudSync + { + Task> GetTwin(string id); + + Task UpdateReportedProperties(string id, TwinCollection patch); + } +} diff --git a/edge-hub/src/Microsoft.Azure.Devices.Edge.Hub.Core/twin/IReportedPropertiesStore.cs b/edge-hub/src/Microsoft.Azure.Devices.Edge.Hub.Core/twin/IReportedPropertiesStore.cs new file mode 100644 index 00000000000..83ea16e09f1 --- /dev/null +++ b/edge-hub/src/Microsoft.Azure.Devices.Edge.Hub.Core/twin/IReportedPropertiesStore.cs @@ -0,0 +1,15 @@ +// Copyright (c) Microsoft. All rights reserved. +namespace Microsoft.Azure.Devices.Edge.Hub.Core.Twin +{ + using System.Threading.Tasks; + using Microsoft.Azure.Devices.Shared; + + interface IReportedPropertiesStore + { + Task Update(string id, TwinCollection patch); + + void InitSyncToCloud(string id); + + Task SyncToCloud(string id); + } +} diff --git a/edge-hub/src/Microsoft.Azure.Devices.Edge.Hub.Core/twin/ITwinStore.cs b/edge-hub/src/Microsoft.Azure.Devices.Edge.Hub.Core/twin/ITwinStore.cs new file mode 100644 index 00000000000..b5047c932f0 --- /dev/null +++ b/edge-hub/src/Microsoft.Azure.Devices.Edge.Hub.Core/twin/ITwinStore.cs @@ -0,0 +1,18 @@ +// Copyright (c) Microsoft. All rights reserved. +namespace Microsoft.Azure.Devices.Edge.Hub.Core.Twin +{ + using System.Threading.Tasks; + using Microsoft.Azure.Devices.Edge.Util; + using Microsoft.Azure.Devices.Shared; + + interface ITwinStore + { + Task> Get(string id); + + Task UpdateReportedProperties(string id, TwinCollection patch); + + Task UpdateDesiredProperties(string id, TwinCollection patch); + + Task Update(string id, Twin twin); + } +} diff --git a/edge-hub/src/Microsoft.Azure.Devices.Edge.Hub.Core/twin/PassThroughTwinManager.cs b/edge-hub/src/Microsoft.Azure.Devices.Edge.Hub.Core/twin/PassThroughTwinManager.cs new file mode 100644 index 00000000000..6145737faf5 --- /dev/null +++ b/edge-hub/src/Microsoft.Azure.Devices.Edge.Hub.Core/twin/PassThroughTwinManager.cs @@ -0,0 +1,46 @@ +// Copyright (c) Microsoft. All rights reserved. +namespace Microsoft.Azure.Devices.Edge.Hub.Core.Twin +{ + using System.Threading.Tasks; + using Microsoft.Azure.Devices.Edge.Hub.Core.Cloud; + using Microsoft.Azure.Devices.Edge.Hub.Core.Device; + using Microsoft.Azure.Devices.Edge.Util; + using Microsoft.Azure.Devices.Shared; + + public class PassThroughTwinManager : ITwinManager + { + readonly IConnectionManager connectionManager; + readonly IMessageConverter twinConverter; + + public PassThroughTwinManager(IConnectionManager connectionManager, IMessageConverterProvider messageConverterProvider) + { + Preconditions.CheckNotNull(messageConverterProvider, nameof(messageConverterProvider)); + this.connectionManager = Preconditions.CheckNotNull(connectionManager, nameof(connectionManager)); + this.twinConverter = messageConverterProvider.Get(); + } + + public async Task GetTwinAsync(string id) + { + Preconditions.CheckNonWhiteSpace(id, nameof(id)); + Option cloudProxy = await this.connectionManager.GetCloudConnection(id); + IMessage twin = await cloudProxy + .Map(c => c.GetTwinAsync()) + .GetOrElse(() => Task.FromResult(this.twinConverter.ToMessage(new Twin()))); + return twin; + } + + public Task UpdateDesiredPropertiesAsync(string id, IMessage twinCollection) + { + Preconditions.CheckNonWhiteSpace(id, nameof(id)); + Option deviceProxy = this.connectionManager.GetDeviceConnection(id); + return deviceProxy.ForEachAsync(dp => dp.OnDesiredPropertyUpdates(twinCollection)); + } + + public async Task UpdateReportedPropertiesAsync(string id, IMessage twinCollection) + { + Preconditions.CheckNonWhiteSpace(id, nameof(id)); + Option cloudProxy = await this.connectionManager.GetCloudConnection(id); + await cloudProxy.ForEachAsync(cp => cp.UpdateReportedPropertiesAsync(twinCollection)); + } + } +} diff --git a/edge-hub/src/Microsoft.Azure.Devices.Edge.Hub.Core/twin/ReportedPropertiesStore.cs b/edge-hub/src/Microsoft.Azure.Devices.Edge.Hub.Core/twin/ReportedPropertiesStore.cs new file mode 100644 index 00000000000..5c900ecb84b --- /dev/null +++ b/edge-hub/src/Microsoft.Azure.Devices.Edge.Hub.Core/twin/ReportedPropertiesStore.cs @@ -0,0 +1,192 @@ +// Copyright (c) Microsoft. All rights reserved. +namespace Microsoft.Azure.Devices.Edge.Hub.Core.Twin +{ + using System; + using System.Collections.Generic; + using System.Linq; + using System.Threading.Tasks; + using Microsoft.Azure.Devices.Edge.Storage; + using Microsoft.Azure.Devices.Edge.Util; + using Microsoft.Azure.Devices.Shared; + using Microsoft.Extensions.Logging; + using Nito.AsyncEx; + + class ReportedPropertiesStore : IReportedPropertiesStore + { + static readonly TimeSpan DefaultSyncFrequency = TimeSpan.FromSeconds(5); + readonly IEntityStore twinStore; + readonly ICloudSync cloudSync; + readonly AsyncLockProvider lockProvider = new AsyncLockProvider(10); + readonly AsyncAutoResetEvent syncToCloudSignal = new AsyncAutoResetEvent(false); + readonly HashSet syncToCloudClients = new HashSet(StringComparer.Ordinal); + readonly object syncToCloudSetLock = new object(); + readonly Task syncToCloudTask; + readonly TimeSpan syncFrequency; + + public ReportedPropertiesStore(IEntityStore twinStore, ICloudSync cloudSync, Option syncFrequency) + { + this.twinStore = twinStore; + this.cloudSync = cloudSync; + this.syncFrequency = syncFrequency.GetOrElse(DefaultSyncFrequency); + this.syncToCloudTask = this.SyncToCloud(); + } + + public async Task Update(string id, TwinCollection patch) + { + using (await this.lockProvider.GetLock(id).LockAsync()) + { + Events.StoringReportedPropertiesInStore(id, patch); + await this.twinStore.PutOrUpdate( + id, + new TwinStoreEntity(patch), + twinInfo => + { + Events.UpdatingReportedPropertiesInStore(id, patch); + TwinCollection updatedReportedProperties = twinInfo.ReportedPropertiesPatch + .Map(reportedProperties => new TwinCollection(JsonEx.Merge(reportedProperties, patch, /*treatNullAsDelete*/ false))) + .GetOrElse(() => patch); + return new TwinStoreEntity(twinInfo.Twin, Option.Maybe(updatedReportedProperties)); + }); + } + } + + public void InitSyncToCloud(string id) + { + lock (this.syncToCloudSetLock) + { + this.syncToCloudClients.Add(id); + } + + this.syncToCloudSignal.Set(); + } + + public async Task SyncToCloud(string id) + { + Events.SyncingReportedPropertiesToCloud(id); + Option twinWithReportedProperties = (await this.twinStore.Get(id)) + .Filter(t => t.ReportedPropertiesPatch.HasValue); + if (twinWithReportedProperties.HasValue) + { + using (await this.lockProvider.GetLock(id).LockAsync()) + { + Events.StoredReportedPropertiesFound(id); + Option twinInfo = await this.twinStore.Get(id); + await twinInfo.ForEachAsync( + async ti => + { + await ti.ReportedPropertiesPatch.ForEachAsync( + async reportedPropertiesPatch => + { + bool result = await this.cloudSync.UpdateReportedProperties(id, reportedPropertiesPatch); + if (result) + { + Events.UpdateReportedPropertiesSucceeded(id); + await this.twinStore.Update( + id, + t => new TwinStoreEntity(t.Twin, Option.None())); + twinInfo = await this.twinStore.Get(id); + } + else + { + Events.UpdateReportedPropertiesFailed(id); + } + }); + }); + } + } + else + { + Events.DoneSyncingReportedProperties(id); + } + } + + async Task SyncToCloud() + { + while (true) + { + try + { + // Take a snapshot of clients to process + IEnumerable clientsToProcess; + lock (this.syncToCloudSetLock) + { + clientsToProcess = this.syncToCloudClients.ToList(); + this.syncToCloudClients.Clear(); + } + + foreach (string id in clientsToProcess) + { + await this.SyncToCloud(id); + } + } + catch (Exception e) + { + Events.ErrorSyncingReportedPropertiesToCloud(e); + } + + // Wait for syncfrequency to avoid looping too fast, + // then wait for the signal indicating more work is ready + await Task.Delay(this.syncFrequency); + await this.syncToCloudSignal.WaitAsync(); + } + } + + static class Events + { + const int IdStart = HubCoreEventIds.TwinManager; + static readonly ILogger Log = Logger.Factory.CreateLogger(); + + enum EventIds + { + DoneSyncingReportedProperties = IdStart + 30, + ErrorSyncingReportedPropertiesToCloud, + StoringReportedPropertiesInStore, + UpdatingReportedPropertiesInStore, + SyncingReportedPropertiesToCloud, + StoredReportedPropertiesFound, + UpdateReportedPropertiesSucceeded, + UpdateReportedPropertiesFailed + } + + public static void UpdateReportedPropertiesFailed(string id) + { + Log.LogWarning((int)EventIds.UpdateReportedPropertiesFailed, $"Updating reported properties failed {id}"); + } + + public static void UpdateReportedPropertiesSucceeded(string id) + { + Log.LogDebug((int)EventIds.UpdateReportedPropertiesSucceeded, $"Updated reported properties for {id}"); + } + + public static void StoredReportedPropertiesFound(string id) + { + Log.LogDebug((int)EventIds.StoredReportedPropertiesFound, $"Found stored reported properties for {id} to sync to cloud"); + } + + public static void SyncingReportedPropertiesToCloud(string id) + { + Log.LogDebug((int)EventIds.SyncingReportedPropertiesToCloud, $"Syncing stored reported properties to cloud in {id}"); + } + + public static void UpdatingReportedPropertiesInStore(string id, TwinCollection patch) + { + Log.LogDebug((int)EventIds.UpdatingReportedPropertiesInStore, $"Updating reported properties in store with version {patch.Version} for {id}"); + } + + public static void StoringReportedPropertiesInStore(string id, TwinCollection patch) + { + Log.LogDebug((int)EventIds.StoringReportedPropertiesInStore, $"Storing reported properties in store for {id} with version {patch.Version}"); + } + + public static void DoneSyncingReportedProperties(string id) + { + Log.LogInformation((int)EventIds.DoneSyncingReportedProperties, $"Done syncing reported properties for {id}"); + } + + internal static void ErrorSyncingReportedPropertiesToCloud(Exception e) + { + Log.LogWarning((int)EventIds.ErrorSyncingReportedPropertiesToCloud, e, $"Error in pump to sync reported properties to cloud"); + } + } + } +} diff --git a/edge-hub/src/Microsoft.Azure.Devices.Edge.Hub.Core/twin/ReportedPropertiesValidator.cs b/edge-hub/src/Microsoft.Azure.Devices.Edge.Hub.Core/twin/ReportedPropertiesValidator.cs new file mode 100644 index 00000000000..ca7c058d7cf --- /dev/null +++ b/edge-hub/src/Microsoft.Azure.Devices.Edge.Hub.Core/twin/ReportedPropertiesValidator.cs @@ -0,0 +1,117 @@ +// Copyright (c) Microsoft. All rights reserved. +namespace Microsoft.Azure.Devices.Edge.Hub.Core.Twin +{ + using System; + using System.Text; + using JetBrains.Annotations; + using Microsoft.Azure.Devices.Edge.Util; + using Microsoft.Azure.Devices.Shared; + using Newtonsoft.Json.Linq; + + public class ReportedPropertiesValidator : IValidator + { + const int TwinPropertyMaxDepth = 5; // taken from IoTHub + const int TwinPropertyValueMaxLength = 4096; // bytes. taken from IoTHub + const long TwinPropertyMaxSafeValue = 4503599627370495; // (2^52) - 1. taken from IoTHub + const long TwinPropertyMinSafeValue = -4503599627370496; // -2^52. taken from IoTHub + const int TwinPropertyDocMaxLength = 8 * 1024; // 8K bytes. taken from IoTHub + + public void Validate(TwinCollection reportedProperties) + { + Preconditions.CheckNotNull(reportedProperties, nameof(reportedProperties)); + JToken reportedPropertiesJToken = JToken.Parse(reportedProperties.ToJson()); + ValidateTwinProperties(reportedPropertiesJToken, 1); + ValidateTwinCollectionSize(reportedProperties); + } + + static void ValidateTwinProperties(JToken properties, int currentDepth) + { + foreach (JProperty kvp in ((JObject)properties).Properties()) + { + ValidatePropertyNameAndLength(kvp.Name); + + ValidateValueType(kvp.Name, kvp.Value); + + string s = kvp.Value.ToString(); + ValidatePropertyValueLength(kvp.Name, s); + + if ((kvp.Value is JValue) && (kvp.Value.Type is JTokenType.Integer)) + { + ValidateIntegerValue(kvp.Name, (long)kvp.Value); + } + + if ((kvp.Value != null) && (kvp.Value is JObject)) + { + if (currentDepth > TwinPropertyMaxDepth) + { + throw new InvalidOperationException($"Nested depth of twin property exceeds {TwinPropertyMaxDepth}"); + } + + // do validation recursively + ValidateTwinProperties(kvp.Value, currentDepth + 1); + } + } + } + + static void ValidatePropertyNameAndLength(string name) + { + if (name == null) + { + throw new ArgumentNullException(nameof(name)); + } + + if (Encoding.UTF8.GetByteCount(name) > TwinPropertyValueMaxLength) + { + string truncated = name.Substring(0, 10); + throw new InvalidOperationException($"Length of property name {truncated}.. exceeds maximum length of {TwinPropertyValueMaxLength}"); + } + + for (int index = 0; index < name.Length; index++) + { + char ch = name[index]; + // $ is reserved for service properties like $metadata, $version etc. + // However, $ is already a reserved character in Mongo, so we need to substitute it with another character like #. + // So we're also reserving # for service side usage. + if (char.IsControl(ch) || ch == '.' || ch == '$' || ch == '#' || char.IsWhiteSpace(ch)) + { + throw new InvalidOperationException($"Property name {name} contains invalid character '{ch}'"); + } + } + } + + static void ValidatePropertyValueLength(string name, string value) + { + int valueByteCount = value != null ? Encoding.UTF8.GetByteCount(value) : 0; + if (valueByteCount > TwinPropertyValueMaxLength) + { + throw new InvalidOperationException($"Value associated with property name {name} has length {valueByteCount} that exceeds maximum length of {TwinPropertyValueMaxLength}"); + } + } + + [AssertionMethod] + static void ValidateIntegerValue(string name, long value) + { + if (value > TwinPropertyMaxSafeValue || value < TwinPropertyMinSafeValue) + { + throw new InvalidOperationException($"Property {name} has an out of bound value. Valid values are between {TwinPropertyMinSafeValue} and {TwinPropertyMaxSafeValue}"); + } + } + + static void ValidateValueType(string property, JToken value) + { + if (!JsonEx.IsValidToken(value)) + { + throw new InvalidOperationException($"Property {property} has a value of unsupported type. Valid types are integer, float, string, bool, null and nested object"); + } + } + + static void ValidateTwinCollectionSize(TwinCollection collection) + { + long size = Encoding.UTF8.GetByteCount(collection.ToJson()); + if (size > TwinPropertyDocMaxLength) + { + throw new InvalidOperationException($"Twin properties size {size} exceeds maximum {TwinPropertyDocMaxLength}"); + } + } + } +} diff --git a/edge-hub/src/Microsoft.Azure.Devices.Edge.Hub.Core/twin/StoringTwinManager.cs b/edge-hub/src/Microsoft.Azure.Devices.Edge.Hub.Core/twin/StoringTwinManager.cs new file mode 100644 index 00000000000..a2b02dbcc29 --- /dev/null +++ b/edge-hub/src/Microsoft.Azure.Devices.Edge.Hub.Core/twin/StoringTwinManager.cs @@ -0,0 +1,323 @@ +// Copyright (c) Microsoft. All rights reserved. +namespace Microsoft.Azure.Devices.Edge.Hub.Core.Twin +{ + using System; + using System.Collections.Concurrent; + using System.Collections.Generic; + using System.Threading.Tasks; + using Microsoft.Azure.Devices.Edge.Hub.Core.Device; + using Microsoft.Azure.Devices.Edge.Hub.Core.Identity; + using Microsoft.Azure.Devices.Edge.Storage; + using Microsoft.Azure.Devices.Edge.Util; + using Microsoft.Azure.Devices.Shared; + using Microsoft.Extensions.Logging; + + public class StoringTwinManager : ITwinManager + { + static readonly TimeSpan DefaultMinTwinSyncPeriod = TimeSpan.FromMinutes(2); + readonly IMessageConverter twinCollectionConverter; + readonly IMessageConverter twinConverter; + readonly IConnectionManager connectionManager; + readonly IValidator reportedPropertiesValidator; + readonly IReportedPropertiesStore reportedPropertiesStore; + readonly AsyncLockProvider twinStoreLock = new AsyncLockProvider(10); + readonly AsyncLockProvider reportedPropertiesStoreLock = new AsyncLockProvider(10); + readonly ITwinStore twinStore; + readonly ICloudSync cloudSync; + readonly TimeSpan minTwinSyncPeriod; + readonly ConcurrentDictionary twinSyncTime = new ConcurrentDictionary(); + + internal StoringTwinManager( + IConnectionManager connectionManager, + IMessageConverter twinCollectionConverter, + IMessageConverter twinConverter, + IValidator reportedPropertiesValidator, + ITwinStore twinStore, + IReportedPropertiesStore reportedPropertiesStore, + ICloudSync cloudSync, + IDeviceConnectivityManager deviceConnectivityManager, + TimeSpan minTwinSyncPeriod) + { + Preconditions.CheckNotNull(twinStore, nameof(twinStore)); + Preconditions.CheckNotNull(deviceConnectivityManager, nameof(deviceConnectivityManager)); + this.connectionManager = Preconditions.CheckNotNull(connectionManager, nameof(connectionManager)); + this.twinCollectionConverter = Preconditions.CheckNotNull(twinCollectionConverter, nameof(twinCollectionConverter)); + this.twinConverter = Preconditions.CheckNotNull(twinConverter, nameof(twinConverter)); + this.cloudSync = Preconditions.CheckNotNull(cloudSync, nameof(cloudSync)); + this.twinStore = Preconditions.CheckNotNull(twinStore, nameof(twinStore)); + this.reportedPropertiesStore = Preconditions.CheckNotNull(reportedPropertiesStore, nameof(reportedPropertiesStore)); + this.reportedPropertiesValidator = reportedPropertiesValidator; + this.minTwinSyncPeriod = minTwinSyncPeriod; + + deviceConnectivityManager.DeviceConnected += (_, __) => this.DeviceConnectedCallback(); + } + + public static ITwinManager Create( + IConnectionManager connectionManager, + IMessageConverterProvider messageConverterProvider, + IEntityStore entityStore, + IDeviceConnectivityManager deviceConnectivityManager, + IValidator reportedPropertiesValidator, + Option minTwinSyncPeriod, + Option reportedPropertiesSyncFrequency) + { + Preconditions.CheckNotNull(connectionManager, nameof(connectionManager)); + Preconditions.CheckNotNull(messageConverterProvider, nameof(messageConverterProvider)); + Preconditions.CheckNotNull(entityStore, nameof(entityStore)); + Preconditions.CheckNotNull(deviceConnectivityManager, nameof(deviceConnectivityManager)); + Preconditions.CheckNotNull(reportedPropertiesValidator, nameof(reportedPropertiesValidator)); + + IMessageConverter twinCollectionConverter = messageConverterProvider.Get(); + IMessageConverter twinConverter = messageConverterProvider.Get(); + ICloudSync cloudSync = new CloudSync(connectionManager, twinCollectionConverter, twinConverter); + var twinManager = new StoringTwinManager( + connectionManager, + twinCollectionConverter, + twinConverter, + reportedPropertiesValidator, + new TwinStore(entityStore), + new ReportedPropertiesStore(entityStore, cloudSync, reportedPropertiesSyncFrequency), + cloudSync, + deviceConnectivityManager, + minTwinSyncPeriod.GetOrElse(DefaultMinTwinSyncPeriod)); + + return twinManager; + } + + public async Task GetTwinAsync(string id) + { + Preconditions.CheckNotNull(id, nameof(id)); + + Option twinOption = await this.cloudSync.GetTwin(id); + Twin twin = await twinOption + .Map( + async t => + { + Events.GotTwinFromCloud(id); + await this.StoreTwinInStore(id, t); + return t; + }) + .GetOrElse( + async () => + { + Events.GettingTwinFromStore(id); + Option storedTwin = await this.twinStore.Get(id); + return storedTwin.GetOrElse(() => new Twin()); + }); + return this.twinConverter.ToMessage(twin); + } + + public async Task UpdateDesiredPropertiesAsync(string id, IMessage twinCollection) + { + Preconditions.CheckNotNull(id, nameof(id)); + Preconditions.CheckNotNull(twinCollection, nameof(twinCollection)); + + TwinCollection patch = this.twinCollectionConverter.FromMessage(twinCollection); + Events.UpdatingDesiredProperties(id, patch); + + Option storeTwin = await this.twinStore.Get(id); + await storeTwin + .Filter(t => t.Properties?.Desired?.Version + 1 != patch.Version) + .Map(t => this.SyncTwinAndSendDesiredPropertyUpdates(id, t)) + .GetOrElse( + async () => + { + await this.twinStore.UpdateDesiredProperties(id, patch); + await this.SendPatchToDevice(id, twinCollection); + }); + } + + public async Task UpdateReportedPropertiesAsync(string id, IMessage twinCollection) + { + Preconditions.CheckNotNull(id, nameof(id)); + Preconditions.CheckNotNull(twinCollection, nameof(twinCollection)); + + Events.UpdatingReportedProperties(id); + TwinCollection patch = this.twinCollectionConverter.FromMessage(twinCollection); + this.reportedPropertiesValidator.Validate(patch); + using (await this.reportedPropertiesStoreLock.GetLock(id).LockAsync()) + { + await this.twinStore.UpdateReportedProperties(id, patch); + await this.reportedPropertiesStore.Update(id, patch); + } + + this.reportedPropertiesStore.InitSyncToCloud(id); + } + + async void DeviceConnectedCallback() + { + try + { + Events.HandlingDeviceConnectedCallback(); + IEnumerable connectedClients = this.connectionManager.GetConnectedClients(); + foreach (IIdentity client in connectedClients) + { + string id = client.Id; + try + { + await this.reportedPropertiesStore.SyncToCloud(id); + await this.HandleDesiredPropertiesUpdates(id); + } + catch (Exception e) + { + Events.ErrorHandlingDeviceConnected(e, id); + } + } + } + catch (Exception ex) + { + Events.ErrorInDeviceConnectedCallback(ex); + } + } + + async Task SyncTwinAndSendDesiredPropertyUpdates(string id, Twin storeTwin) + { + Option twinOption = await this.cloudSync.GetTwin(id); + await twinOption.ForEachAsync( + async cloudTwin => + { + Events.UpdatingTwinOnDeviceConnect(id); + await this.StoreTwinInStore(id, cloudTwin); + + string diffPatch = JsonEx.Diff(storeTwin.Properties.Desired, cloudTwin.Properties.Desired); + if (!string.IsNullOrWhiteSpace(diffPatch)) + { + var patch = new TwinCollection(diffPatch); + IMessage patchMessage = this.twinCollectionConverter.ToMessage(patch); + await this.SendPatchToDevice(id, patchMessage); + } + }); + } + + async Task HandleDesiredPropertiesUpdates(string id) + { + Option storeTwin = await this.twinStore.Get(id); + if (!storeTwin.HasValue && !this.connectionManager.CheckClientSubscription(id, DeviceSubscription.DesiredPropertyUpdates)) + { + Events.NoTwinUsage(id); + } + else + { + await storeTwin.ForEachAsync( + async twin => + { + if (!this.twinSyncTime.TryGetValue(id, out DateTime syncTime) || + DateTime.UtcNow - syncTime > this.minTwinSyncPeriod) + { + await this.SyncTwinAndSendDesiredPropertyUpdates(id, twin); + } + else + { + Events.TwinSyncedRecently(id, syncTime, this.minTwinSyncPeriod); + } + }); + } + } + + Task SendPatchToDevice(string id, IMessage twinCollection) + { + Events.SendDesiredPropertyUpdates(id); + bool hasDesiredPropertyUpdatesSubscription = this.connectionManager.CheckClientSubscription(id, DeviceSubscription.DesiredPropertyUpdates); + if (hasDesiredPropertyUpdatesSubscription) + { + Events.SendDesiredPropertyUpdates(id); + Option deviceProxyOption = this.connectionManager.GetDeviceConnection(id); + return deviceProxyOption.ForEachAsync(deviceProxy => deviceProxy.OnDesiredPropertyUpdates(twinCollection)); + } + + Events.SendDesiredPropertyUpdates(id); + return Task.CompletedTask; + } + + async Task StoreTwinInStore(string id, Twin twin) + { + using (await this.twinStoreLock.GetLock(id).LockAsync()) + { + await this.twinStore.Update(id, twin); + this.twinSyncTime.AddOrUpdate(id, DateTime.UtcNow, (_, __) => DateTime.UtcNow); + } + } + + static class Events + { + const int IdStart = HubCoreEventIds.TwinManager; + static readonly ILogger Log = Logger.Factory.CreateLogger(); + + enum EventIds + { + ErrorInDeviceConnectedCallback = IdStart, + StoringTwinManagerCreated, + NoTwinUsage, + UpdatingTwinOnDeviceConnect, + SendDesiredPropertyUpdates, + UpdatingDesiredProperties, + GettingTwinFromStore, + GotTwinFromTwin, + TwinSyncedRecently, + UpdatingReportedProperties, + ErrorHandlingDeviceConnected, + HandlingDeviceConnectedCallback + } + + public static void ErrorInDeviceConnectedCallback(Exception ex) + { + Log.LogWarning((int)EventIds.ErrorInDeviceConnectedCallback, ex, "Error in device connected callback"); + } + + public static void StoringTwinManagerCreated() + { + Log.LogInformation((int)EventIds.StoringTwinManagerCreated, $"Storing twin manager created"); + } + + public static void HandlingDeviceConnectedCallback() + { + Log.LogInformation((int)EventIds.HandlingDeviceConnectedCallback, $"Received device connected callback"); + } + + public static void ErrorHandlingDeviceConnected(Exception ex, string id) + { + Log.LogWarning((int)EventIds.ErrorHandlingDeviceConnected, ex, $"Error handling device connected event for {id}"); + } + + public static void UpdatingReportedProperties(string id) + { + Log.LogDebug((int)EventIds.UpdatingReportedProperties, $"Updating reported properties for {id}"); + } + + public static void UpdatingDesiredProperties(string id, TwinCollection patch) + { + Log.LogDebug((int)EventIds.UpdatingDesiredProperties, $"Received desired property updates for {id} with version {patch.Version}"); + } + + public static void SendDesiredPropertyUpdates(string id) + { + Log.LogDebug((int)EventIds.SendDesiredPropertyUpdates, $"Sending desired property updates to {id}"); + } + + public static void GettingTwinFromStore(string id) + { + Log.LogDebug((int)EventIds.GettingTwinFromStore, $"Getting twin for {id} from store"); + } + + public static void GotTwinFromCloud(string id) + { + Log.LogDebug((int)EventIds.GotTwinFromTwin, $"Got twin for {id} from cloud"); + } + + public static void TwinSyncedRecently(string id, DateTime syncTime, TimeSpan timeSpan) + { + Log.LogDebug((int)EventIds.TwinSyncedRecently, $"Twin for {id} synced at {syncTime} which is sooner than twin sync period {timeSpan.TotalSeconds} secs, skipping syncing twin"); + } + + public static void NoTwinUsage(string id) + { + Log.LogDebug((int)EventIds.NoTwinUsage, $"Not syncing twin on device connect for {id} as the twin does not exist in the store and client does not subscribe to twin change notifications"); + } + + public static void UpdatingTwinOnDeviceConnect(string id) + { + Log.LogDebug((int)EventIds.UpdatingTwinOnDeviceConnect, $"Updated twin for {id} on device connect event"); + } + } + } +} diff --git a/edge-hub/src/Microsoft.Azure.Devices.Edge.Hub.Core/twin/TwinStore.cs b/edge-hub/src/Microsoft.Azure.Devices.Edge.Hub.Core/twin/TwinStore.cs new file mode 100644 index 00000000000..f8d7d2e7f3b --- /dev/null +++ b/edge-hub/src/Microsoft.Azure.Devices.Edge.Hub.Core/twin/TwinStore.cs @@ -0,0 +1,167 @@ +// Copyright (c) Microsoft. All rights reserved. +namespace Microsoft.Azure.Devices.Edge.Hub.Core.Twin +{ + using System.Threading.Tasks; + using Microsoft.Azure.Devices.Edge.Storage; + using Microsoft.Azure.Devices.Edge.Util; + using Microsoft.Azure.Devices.Shared; + using Microsoft.Extensions.Logging; + + class TwinStore : ITwinStore + { + readonly IEntityStore twinEntityStore; + + public TwinStore(IEntityStore twinEntityStore) + { + this.twinEntityStore = twinEntityStore; + } + + public async Task> Get(string id) + { + Preconditions.CheckNonWhiteSpace(id, nameof(id)); + Option twinStoreEntity = await this.twinEntityStore.Get(id); + return twinStoreEntity.FlatMap(t => t.Twin); + } + + public async Task UpdateReportedProperties(string id, TwinCollection patch) + { + Preconditions.CheckNonWhiteSpace(id, nameof(id)); + Preconditions.CheckNotNull(patch, nameof(patch)); + Events.UpdatingReportedProperties(id); + await this.twinEntityStore.PutOrUpdate( + id, + new TwinStoreEntity(new Twin { Properties = new TwinProperties { Reported = patch } }), + twinInfo => + { + twinInfo.Twin + .ForEach( + twin => + { + TwinProperties twinProperties = twin.Properties ?? new TwinProperties(); + TwinCollection reportedProperties = twinProperties.Reported ?? new TwinCollection(); + string mergedReportedPropertiesString = JsonEx.Merge(reportedProperties, patch, /* treatNullAsDelete */ true); + twinProperties.Reported = new TwinCollection(mergedReportedPropertiesString); + twin.Properties = twinProperties; + Events.MergedReportedProperties(id); + }); + return twinInfo; + }); + } + + public async Task UpdateDesiredProperties(string id, TwinCollection patch) + { + Events.UpdatingDesiredProperties(id); + Preconditions.CheckNotNull(patch, nameof(patch)); + Option storedTwin = await this.Get(id); + if (storedTwin.HasValue) + { + await this.twinEntityStore.Update( + id, + twinInfo => + { + twinInfo.Twin + .ForEach( + twin => + { + TwinProperties twinProperties = twin.Properties ?? new TwinProperties(); + TwinCollection desiredProperties = twinProperties.Desired ?? new TwinCollection(); + if (desiredProperties.Version + 1 == patch.Version) + { + string mergedDesiredPropertiesString = JsonEx.Merge(desiredProperties, patch, /* treatNullAsDelete */ true); + twinProperties.Desired = new TwinCollection(mergedDesiredPropertiesString); + twin.Properties = twinProperties; + Events.MergedDesiredProperties(id); + } + else + { + Events.DesiredPropertiesVersionMismatch(id, desiredProperties.Version, patch.Version); + } + }); + + return twinInfo; + }); + } + else + { + Events.NoTwinForDesiredPropertiesPatch(id); + } + } + + public async Task Update(string id, Twin twin) + { + Events.UpdatingTwin(id); + Preconditions.CheckNotNull(twin, nameof(twin)); + await this.twinEntityStore.PutOrUpdate( + id, + new TwinStoreEntity(twin), + twinInfo => + { + return twinInfo.Twin + .Filter( + storedTwin => storedTwin.Properties?.Desired?.Version < twin.Properties.Desired.Version + && storedTwin.Properties?.Reported?.Version < twin.Properties.Reported.Version) + .Map(_ => new TwinStoreEntity(Option.Some(twin), twinInfo.ReportedPropertiesPatch)) + .GetOrElse(twinInfo); + }); + Events.DoneUpdatingTwin(id); + } + + static class Events + { + const int IdStart = HubCoreEventIds.TwinManager; + static readonly ILogger Log = Logger.Factory.CreateLogger(); + + enum EventIds + { + MergedReportedProperties = IdStart + 50, + MergedDesiredProperties, + UpdatingDesiredProperties, + DoneUpdatingTwin, + UpdatingTwin, + UpdatingReportedProperties, + NoTwinForDesiredPropertiesPatch, + DesiredPropertiesVersionMismatch + } + + public static void UpdatingReportedProperties(string id) + { + Log.LogDebug((int)EventIds.UpdatingReportedProperties, $"Updating reported properties for {id}"); + } + + public static void DoneUpdatingTwin(string id) + { + Log.LogDebug((int)EventIds.DoneUpdatingTwin, $"Updated twin in store for {id}"); + } + + public static void UpdatingTwin(string id) + { + Log.LogDebug((int)EventIds.UpdatingTwin, $"Updating twin in store for {id}"); + } + + public static void MergedDesiredProperties(string id) + { + Log.LogDebug((int)EventIds.MergedDesiredProperties, $"Merged desired properties for {id} in store"); + } + + public static void UpdatingDesiredProperties(string id) + { + Log.LogDebug((int)EventIds.UpdatingDesiredProperties, $"Updating desired properties for {id}"); + } + + public static void MergedReportedProperties(string id) + { + Log.LogDebug((int)EventIds.MergedReportedProperties, $"Merged reported properties in store for {id}"); + } + + public static void NoTwinForDesiredPropertiesPatch(string id) + { + Log.LogInformation((int)EventIds.NoTwinForDesiredPropertiesPatch, $"Cannot store desired properties patch for {id} in store as twin was not found"); + } + + public static void DesiredPropertiesVersionMismatch(string id, long desiredPropertiesVersion, long patchVersion) + { + Log.LogInformation((int)EventIds.DesiredPropertiesVersionMismatch, $"Skipped updating the desired properties for {id} because patch version {patchVersion} cannot be applied on current version {desiredPropertiesVersion}"); + } + } + } +} diff --git a/edge-hub/src/Microsoft.Azure.Devices.Edge.Hub.Core/twin/TwinStoreEntity.cs b/edge-hub/src/Microsoft.Azure.Devices.Edge.Hub.Core/twin/TwinStoreEntity.cs new file mode 100644 index 00000000000..484702848a3 --- /dev/null +++ b/edge-hub/src/Microsoft.Azure.Devices.Edge.Hub.Core/twin/TwinStoreEntity.cs @@ -0,0 +1,47 @@ +// Copyright (c) Microsoft. All rights reserved. +namespace Microsoft.Azure.Devices.Edge.Hub.Core.Twin +{ + using Microsoft.Azure.Devices.Edge.Util; + using Microsoft.Azure.Devices.Edge.Util.Json; + using Microsoft.Azure.Devices.Shared; + using Newtonsoft.Json; + + public class TwinStoreEntity + { + public TwinStoreEntity() + : this(Option.None(), Option.None()) + { + } + + public TwinStoreEntity(Twin twin) + : this(Option.Maybe(twin), Option.None()) + { + } + + public TwinStoreEntity(TwinCollection reportedPropertiesPatch) + : this(Option.None(), Option.Maybe(reportedPropertiesPatch)) + { + } + + [JsonConstructor] + public TwinStoreEntity(Twin twin, TwinCollection reportedPropertiesPatch) + { + this.Twin = Option.Maybe(twin); + this.ReportedPropertiesPatch = reportedPropertiesPatch?.Count != 0 + ? Option.Some(reportedPropertiesPatch) + : Option.None(); + } + + public TwinStoreEntity(Option twin, Option reportedPropertiesPatch) + { + this.Twin = twin; + this.ReportedPropertiesPatch = reportedPropertiesPatch; + } + + [JsonConverter(typeof(OptionConverter))] + public Option Twin { get; } + + [JsonConverter(typeof(OptionConverter))] + public Option ReportedPropertiesPatch { get; } + } +} diff --git a/edge-hub/src/Microsoft.Azure.Devices.Edge.Hub.Service/DependencyManager.cs b/edge-hub/src/Microsoft.Azure.Devices.Edge.Hub.Service/DependencyManager.cs index 468436f6c5f..6d2e642c8ac 100644 --- a/edge-hub/src/Microsoft.Azure.Devices.Edge.Hub.Service/DependencyManager.cs +++ b/edge-hub/src/Microsoft.Azure.Devices.Edge.Hub.Service/DependencyManager.cs @@ -123,6 +123,13 @@ void RegisterRoutingModule(ContainerBuilder builder, (bool isEnabled, bool usePe bool closeCloudConnectionOnIdleTimeout = this.configuration.GetValue("CloseCloudConnectionOnIdleTimeout", true); int cloudOperationTimeoutSecs = this.configuration.GetValue("CloudOperationTimeoutSecs", 20); TimeSpan cloudOperationTimeout = TimeSpan.FromSeconds(cloudOperationTimeoutSecs); + Option minTwinSyncPeriod = this.GetConfigurationValueIfExists("MinTwinSyncPeriodSecs") + .Map(s => TimeSpan.FromSeconds(s)); + Option reportedPropertiesSyncFrequency = this.GetConfigurationValueIfExists("ReportedPropertiesSyncFrequencySecs") + .Map(s => TimeSpan.FromSeconds(s)); + bool useV1TwinManager = this.GetConfigurationValueIfExists("TwinManagerVersion") + .Map(v => v.Equals("v1", StringComparison.OrdinalIgnoreCase)) + .GetOrElse(false); builder.RegisterModule( new RoutingModule( @@ -141,7 +148,10 @@ void RegisterRoutingModule(ContainerBuilder builder, (bool isEnabled, bool usePe maxConnectedClients, cloudConnectionIdleTimeout, closeCloudConnectionOnIdleTimeout, - cloudOperationTimeout)); + cloudOperationTimeout, + minTwinSyncPeriod, + reportedPropertiesSyncFrequency, + useV1TwinManager)); } void RegisterCommonModule(ContainerBuilder builder, bool optimizeForPerformance, (bool isEnabled, bool usePersistentStorage, StoreAndForwardConfiguration config, string storagePath) storeAndForward) @@ -218,5 +228,11 @@ Option GetConfigurationValueIfExists(string key) var value = this.configuration.GetValue(key); return EqualityComparer.Default.Equals(value, default(T)) ? Option.None() : Option.Some(value); } + + Option GetConfigurationValueIfExists(string key) + { + long value = this.configuration.GetValue(key, long.MinValue); + return value == long.MinValue ? Option.None() : Option.Some(value); + } } } diff --git a/edge-hub/src/Microsoft.Azure.Devices.Edge.Hub.Service/modules/RoutingModule.cs b/edge-hub/src/Microsoft.Azure.Devices.Edge.Hub.Service/modules/RoutingModule.cs index 9f83bc495a1..becbf1d7b01 100644 --- a/edge-hub/src/Microsoft.Azure.Devices.Edge.Hub.Service/modules/RoutingModule.cs +++ b/edge-hub/src/Microsoft.Azure.Devices.Edge.Hub.Service/modules/RoutingModule.cs @@ -14,6 +14,7 @@ namespace Microsoft.Azure.Devices.Edge.Hub.Service.Modules using Microsoft.Azure.Devices.Edge.Hub.Core.Identity; using Microsoft.Azure.Devices.Edge.Hub.Core.Routing; using Microsoft.Azure.Devices.Edge.Hub.Core.Storage; + using Microsoft.Azure.Devices.Edge.Hub.Core.Twin; using Microsoft.Azure.Devices.Edge.Storage; using Microsoft.Azure.Devices.Edge.Util; using Microsoft.Azure.Devices.Edge.Util.TransientFaultHandling; @@ -43,6 +44,9 @@ public class RoutingModule : Module readonly TimeSpan cloudConnectionIdleTimeout; readonly bool closeCloudConnectionOnIdleTimeout; readonly TimeSpan operationTimeout; + readonly Option minTwinSyncPeriod; + readonly Option reportedPropertiesSyncFrequency; + readonly bool useV1TwinManager; public RoutingModule( string iotHubName, @@ -60,7 +64,10 @@ public RoutingModule( int maxConnectedClients, TimeSpan cloudConnectionIdleTimeout, bool closeCloudConnectionOnIdleTimeout, - TimeSpan operationTimeout) + TimeSpan operationTimeout, + Option minTwinSyncPeriod, + Option reportedPropertiesSyncFrequency, + bool useV1TwinManager) { this.iotHubName = Preconditions.CheckNonWhiteSpace(iotHubName, nameof(iotHubName)); this.edgeDeviceId = Preconditions.CheckNonWhiteSpace(edgeDeviceId, nameof(edgeDeviceId)); @@ -78,6 +85,9 @@ public RoutingModule( this.cloudConnectionIdleTimeout = cloudConnectionIdleTimeout; this.closeCloudConnectionOnIdleTimeout = closeCloudConnectionOnIdleTimeout; this.operationTimeout = operationTimeout; + this.minTwinSyncPeriod = minTwinSyncPeriod; + this.reportedPropertiesSyncFrequency = reportedPropertiesSyncFrequency; + this.useV1TwinManager = useV1TwinManager; } protected override void Load(ContainerBuilder builder) @@ -280,9 +290,19 @@ protected override void Load(ContainerBuilder builder) builder.Register( async c => { - var messageConverterProvider = c.Resolve(); - IConnectionManager connectionManager = await c.Resolve>(); - return TwinManager.CreateTwinManager(connectionManager, messageConverterProvider, Option.None()); + if (this.useV1TwinManager) + { + var messageConverterProvider = c.Resolve(); + IConnectionManager connectionManager = await c.Resolve>(); + ITwinManager twinManager = new PassThroughTwinManager(connectionManager, messageConverterProvider); + return twinManager; + } + else + { + var messageConverterProvider = c.Resolve(); + IConnectionManager connectionManager = await c.Resolve>(); + return TwinManager.CreateTwinManager(connectionManager, messageConverterProvider, Option.None()); + } }) .As>() .SingleInstance(); @@ -365,10 +385,30 @@ protected override void Load(ContainerBuilder builder) builder.Register( async c => { - var dbStoreProvider = c.Resolve(); - var messageConverterProvider = c.Resolve(); - IConnectionManager connectionManager = await c.Resolve>(); - return TwinManager.CreateTwinManager(connectionManager, messageConverterProvider, Option.Some(new StoreProvider(dbStoreProvider))); + if (this.useV1TwinManager) + { + var dbStoreProvider = c.Resolve(); + var messageConverterProvider = c.Resolve(); + IConnectionManager connectionManager = await c.Resolve>(); + return TwinManager.CreateTwinManager(connectionManager, messageConverterProvider, Option.Some(new StoreProvider(dbStoreProvider))); + } + else + { + var storeProvider = c.Resolve(); + var messageConverterProvider = c.Resolve(); + var deviceConnectivityManager = c.Resolve(); + IConnectionManager connectionManager = await c.Resolve>(); + IEntityStore entityStore = storeProvider.GetEntityStore("EdgeTwin"); + ITwinManager twinManager = StoringTwinManager.Create( + connectionManager, + messageConverterProvider, + entityStore, + deviceConnectivityManager, + new ReportedPropertiesValidator(), + this.minTwinSyncPeriod, + this.reportedPropertiesSyncFrequency); + return twinManager; + } }) .As>() .SingleInstance(); diff --git a/edge-hub/test/Microsoft.Azure.Devices.Edge.Hub.Core.Test/EdgeHubConnectionTest.cs b/edge-hub/test/Microsoft.Azure.Devices.Edge.Hub.Core.Test/EdgeHubConnectionTest.cs index 6cac1368c60..4936ea72483 100644 --- a/edge-hub/test/Microsoft.Azure.Devices.Edge.Hub.Core.Test/EdgeHubConnectionTest.cs +++ b/edge-hub/test/Microsoft.Azure.Devices.Edge.Hub.Core.Test/EdgeHubConnectionTest.cs @@ -53,7 +53,7 @@ public async Task HandleMethodInvocationTest() var twinManager = Mock.Of(); var routeFactory = new EdgeRouteFactory(Mock.Of()); var twinCollectionMessageConverter = Mock.Of>(); - var twinMessageConverter = Mock.Of>(); + var twinMessageConverter = Mock.Of>(); var versionInfo = new VersionInfo("1.0", "1", "123"); var deviceScopeIdentitiesCache = new Mock(); deviceScopeIdentitiesCache.Setup(d => d.RefreshServiceIdentities(It.IsAny>())).Returns(Task.CompletedTask); @@ -87,7 +87,7 @@ public async Task HandleMethodInvocationBadInputTest() var twinManager = Mock.Of(); var routeFactory = new EdgeRouteFactory(Mock.Of()); var twinCollectionMessageConverter = Mock.Of>(); - var twinMessageConverter = Mock.Of>(); + var twinMessageConverter = Mock.Of>(); var versionInfo = new VersionInfo("1.0", "1", "123"); var deviceScopeIdentitiesCache = new Mock(); deviceScopeIdentitiesCache.Setup(d => d.RefreshServiceIdentities(It.IsAny>())).Returns(Task.CompletedTask); @@ -118,7 +118,7 @@ public async Task HandleMethodInvocationServerErrorTest() var twinManager = Mock.Of(); var routeFactory = new EdgeRouteFactory(Mock.Of()); var twinCollectionMessageConverter = Mock.Of>(); - var twinMessageConverter = Mock.Of>(); + var twinMessageConverter = Mock.Of>(); var versionInfo = new VersionInfo("1.0", "1", "123"); var deviceScopeIdentitiesCache = new Mock(); deviceScopeIdentitiesCache.Setup(d => d.RefreshServiceIdentities(It.IsAny>())).ThrowsAsync(new Exception("Foo")); @@ -152,7 +152,7 @@ public async Task HandleMethodInvocationInvalidMethodNameTest() var twinManager = Mock.Of(); var routeFactory = new EdgeRouteFactory(Mock.Of()); var twinCollectionMessageConverter = Mock.Of>(); - var twinMessageConverter = Mock.Of>(); + var twinMessageConverter = Mock.Of>(); var versionInfo = new VersionInfo("1.0", "1", "123"); var deviceScopeIdentitiesCache = new Mock(); deviceScopeIdentitiesCache.Setup(d => d.RefreshServiceIdentities(It.IsAny>())).Returns(Task.CompletedTask); diff --git a/edge-hub/test/Microsoft.Azure.Devices.Edge.Hub.Core.Test/TwinManagerTest.cs b/edge-hub/test/Microsoft.Azure.Devices.Edge.Hub.Core.Test/TwinManagerTest.cs index 69eda521a22..1409c8a3d6b 100644 --- a/edge-hub/test/Microsoft.Azure.Devices.Edge.Hub.Core.Test/TwinManagerTest.cs +++ b/edge-hub/test/Microsoft.Azure.Devices.Edge.Hub.Core.Test/TwinManagerTest.cs @@ -24,7 +24,7 @@ public class TwinManagerTest { readonly Option> twinStore; readonly IMessageConverter twinCollectionMessageConverter; - readonly IMessageConverter twinMessageConverter; + readonly IMessageConverter twinMessageConverter; public TwinManagerTest() { @@ -54,7 +54,7 @@ public void TwinManagerConstructorWithValidArgumentsSucceeds() public async void GetTwinWhenCloudOnlineTwinNotStoredSuccess() { // Arrange - var twin = new Twin("d1") { Version = 1 }; + var twin = new Shared.Twin("d1") { Version = 1 }; IMessage twinMessage = this.twinMessageConverter.ToMessage(twin); var mockProxy = new Mock(); @@ -108,7 +108,7 @@ await twinManager.ExecuteOnTwinStoreResultAsync( public async void GetTwinWhenCloudOfflineSuccess() { // Arrange - var twin = new Twin("d1") { Version = 1 }; + var twin = new Shared.Twin("d1") { Version = 1 }; IMessage twinMessage = this.twinMessageConverter.ToMessage(twin); var mockProxy = new Mock(); @@ -148,7 +148,7 @@ await twinManager.ExecuteOnTwinStoreResultAsync( public async void GetTwinPassthroughWhenTwinNotStoredSuccess() { // Arrange - var twin = new Twin("d1") { Version = 1 }; + var twin = new Shared.Twin("d1") { Version = 1 }; IMessage twinMessage = this.twinMessageConverter.ToMessage(twin); var mockProxy = new Mock(); @@ -174,7 +174,7 @@ public async void GetTwinPassthroughWhenTwinNotStoredSuccess() public async void UpdateDesiredPropertiesWhenTwinStoredVersionPlus1Success() { // Arrange - var twin = new Twin("d1") { Version = 1 }; + var twin = new Shared.Twin("d1") { Version = 1 }; twin.Properties.Desired = new TwinCollection() { ["name"] = "original", @@ -230,7 +230,7 @@ await twinManager.ExecuteOnTwinStoreResultAsync( public async void UpdateDesiredPropertiesWhenTwinNotStoredVersionPlus1Success() { // Arrange - var twin = new Twin("d1") { Version = 1 }; + var twin = new Shared.Twin("d1") { Version = 1 }; IMessage twinMessage = this.twinMessageConverter.ToMessage(twin); bool getTwinCalled = false; @@ -349,7 +349,7 @@ public async void UpdateReportedPropertiesPassthroughSuccess() public async void UpdateReportedPropertiesWhenCloudOnlineTwinNotStoredSuccess() { // Arrange - var twin = new Twin("d1") { Version = 1 }; + var twin = new Shared.Twin("d1") { Version = 1 }; twin.Properties.Reported = new TwinCollection() { ["name"] = "oldvalue", @@ -549,7 +549,7 @@ await twinManager.ExecuteOnTwinStoreResultAsync( public async void UpdateReportedPropertiesWhenCloudOfflineTwinStoredSuccess() { // Arrange - var twin = new Twin("d1") { Version = 1 }; + var twin = new Shared.Twin("d1") { Version = 1 }; twin.Properties.Reported = new TwinCollection() { ["name"] = "oldvalue" @@ -656,7 +656,7 @@ await twinManager.ExecuteOnTwinStoreResultAsync( public async void UpdateReportedPropertiesWhenCloudOfflineMalformedPropertiesThrows() { // Arrange - var twin = new Twin("d1") { Version = 1 }; + var twin = new Shared.Twin("d1") { Version = 1 }; twin.Properties.Reported = new TwinCollection() { ["name"] = "oldvalue", @@ -742,7 +742,7 @@ await twinManager.ExecuteOnTwinStoreResultAsync( public async void UpdateReportedPropertiesWhenCloudOfflineTooLargeCollectionThrows() { // Arrange - var twin = new Twin("d1") { Version = 1 }; + var twin = new Shared.Twin("d1") { Version = 1 }; twin.Properties.Reported = new TwinCollection() { ["name"] = "oldvalue", @@ -885,7 +885,7 @@ public async void UpdateDesiredPropertiesWhenDeviceProxyOfflineThrows() public async void GetTwinDoesNotOverwriteSavedReportedPropertiesSuccess() { // Arrange - var twin = new Twin("d1") { Version = 1 }; + var twin = new Shared.Twin("d1") { Version = 1 }; IMessage twinMessage = this.twinMessageConverter.ToMessage(twin); var mockProxy = new Mock(); @@ -991,7 +991,7 @@ await twinManager.ExecuteOnTwinStoreResultAsync( public async void GetTwinWhenStorePutFailsReturnsLastKnownSuccess() { // Arrange - var twin = new Twin("d1") { Version = 1 }; + var twin = new Shared.Twin("d1") { Version = 1 }; IMessage twinMessage = this.twinMessageConverter.ToMessage(twin); var mockProxy = new Mock(); @@ -1043,7 +1043,7 @@ public async void GetTwinWhenStorePutFailsReturnsLastKnownSuccess() public async void UpdateReportedPropertiesWhenStoreThrowsFailure() { // Arrange - var twin = new Twin("d1") { Version = 1 }; + var twin = new Shared.Twin("d1") { Version = 1 }; IMessage twinMessage = this.twinMessageConverter.ToMessage(twin); var mockProxy = new Mock(); @@ -1077,7 +1077,7 @@ public async void UpdateReportedPropertiesWhenStoreThrowsFailure() public async void GetTwinRejectsLowerVersionTwinsSuccess() { // Arrange - setup twin with version - var twin = new Twin("d1") { Version = 1 }; + var twin = new Shared.Twin("d1") { Version = 1 }; twin.Version = 32; IMessage twinMessage = this.twinMessageConverter.ToMessage(twin); @@ -1094,7 +1094,7 @@ public async void GetTwinRejectsLowerVersionTwinsSuccess() // Act - call get twin to cache twin IMessage received = await twinManager.GetTwinAsync(deviceId); - Twin cached = null; + Shared.Twin cached = null; await twinManager.ExecuteOnTwinStoreResultAsync( deviceId, t => @@ -1135,7 +1135,7 @@ await twinManager.ExecuteOnTwinStoreResultAsync( public async void GetTwinDoesNotGeneratesDesiredPropertyUpdateIfNotSusbribedSuccess() { // Arrange - setup twin with version - var twin = new Twin("d1") { Version = 1 }; + var twin = new Shared.Twin("d1") { Version = 1 }; twin.Properties.Desired = new TwinCollection { ["$version"] = "32" }; IMessage twinMessage = this.twinMessageConverter.ToMessage(twin); @@ -1159,7 +1159,7 @@ public async void GetTwinDoesNotGeneratesDesiredPropertyUpdateIfNotSusbribedSucc // Act - call get twin to cache twin IMessage received = await twinManager.GetTwinAsync(deviceId); - Twin cached = null; + Shared.Twin cached = null; await twinManager.ExecuteOnTwinStoreResultAsync( deviceId, t => @@ -1204,7 +1204,7 @@ public async void DesiredPropertyFetchesTwinWithCallbackSuccess() { // Arrange - make a twin with a version string deviceId = "device20"; - var twin = new Twin(deviceId); + var twin = new Shared.Twin(deviceId); twin.Version = 32; twin.Properties.Desired = new TwinCollection { @@ -1240,7 +1240,7 @@ public async void DesiredPropertyFetchesTwinWithCallbackSuccess() // Act - cache a twin await twinManager.GetTwinAsync(deviceId); - Twin cached = null; + Shared.Twin cached = null; await twinManager.ExecuteOnTwinStoreResultAsync( deviceId, t => @@ -1283,7 +1283,7 @@ await twinManager.ExecuteOnTwinStoreResultAsync( public async void ConnectionReestablishedReportedPropertiesSyncSuccess() { // Arrange - var twin = new Twin("d1") { Version = 1 }; + var twin = new Shared.Twin("d1") { Version = 1 }; IMessage twinMessage = this.twinMessageConverter.ToMessage(twin); var mockCloudProxy = new Mock(); @@ -1393,7 +1393,7 @@ public async void ConnectionReestablishedGetTwinWithDesiredPropertyUpdateSuccess { // Arrange string deviceId = "device22"; - var twin = new Twin(deviceId); + var twin = new Shared.Twin(deviceId); IMessage twinMessage = this.twinMessageConverter.ToMessage(twin); var mockCloudProxy = new Mock(); @@ -1440,7 +1440,7 @@ public async void ConnectionReestablishedGetTwinWithDesiredPropertyUpdateSuccess // Arrange var identity = Mock.Of(i => i.Id == deviceId); - twin = new Twin(); + twin = new Shared.Twin(); twin.Version = 33; twin.Properties.Desired = new TwinCollection() { @@ -1471,7 +1471,7 @@ public async void ConnectionReestablishedGetTwinWithDesiredPropertyUpdateSuccess public async void ConnectionReestablishedDoesNotSyncReportedPropertiesWhenEmptySuccess() { // Arrange - var twin = new Twin("d1") { Version = 1 }; + var twin = new Shared.Twin("d1") { Version = 1 }; IMessage twinMessage = this.twinMessageConverter.ToMessage(twin); var mockCloudProxy = new Mock(); diff --git a/edge-hub/test/Microsoft.Azure.Devices.Edge.Hub.Core.Test/twin/ReportedPropertiesStoreTest.cs b/edge-hub/test/Microsoft.Azure.Devices.Edge.Hub.Core.Test/twin/ReportedPropertiesStoreTest.cs new file mode 100644 index 00000000000..8d032c99503 --- /dev/null +++ b/edge-hub/test/Microsoft.Azure.Devices.Edge.Hub.Core.Test/twin/ReportedPropertiesStoreTest.cs @@ -0,0 +1,116 @@ +// Copyright (c) Microsoft. All rights reserved. +namespace Microsoft.Azure.Devices.Edge.Hub.Core.Test.Twin +{ + using System; + using System.Collections.Generic; + using System.Threading.Tasks; + using Microsoft.Azure.Devices.Edge.Hub.Core.Twin; + using Microsoft.Azure.Devices.Edge.Storage; + using Microsoft.Azure.Devices.Edge.Util; + using Microsoft.Azure.Devices.Edge.Util.Test.Common; + using Microsoft.Azure.Devices.Shared; + using Moq; + using Xunit; + + [Unit] + public class ReportedPropertiesStoreTest + { + [Fact] + public async Task UpdateTest() + { + // Arrange + string id = "d1"; + IEntityStore rpEntityStore = GetReportedPropertiesEntityStore(); + + TwinCollection receivedReportedProperties = null; + var cloudSync = new Mock(); + cloudSync.Setup(c => c.UpdateReportedProperties(id, It.IsAny())) + .Callback((s, collection) => receivedReportedProperties = collection) + .ReturnsAsync(true); + + var reportedPropertiesStore = new ReportedPropertiesStore(rpEntityStore, cloudSync.Object, Option.None()); + + var rbase = new TwinCollection + { + ["p1"] = "v1", + ["p2"] = "v2" + }; + + // Act + await reportedPropertiesStore.Update(id, rbase); + await reportedPropertiesStore.SyncToCloud(id); + + // Assert + Assert.NotNull(receivedReportedProperties); + Assert.Equal(receivedReportedProperties.ToJson(), rbase.ToJson()); + } + + [Fact] + public async Task SyncToCloudTest() + { + // Arrange + string id = "d1"; + IEntityStore rpEntityStore = GetReportedPropertiesEntityStore(); + + var receivedReportedProperties = new List(); + var cloudSync = new Mock(); + cloudSync.Setup(c => c.UpdateReportedProperties(id, It.IsAny())) + .Callback((s, collection) => receivedReportedProperties.Add(collection)) + .ReturnsAsync(true); + + var reportedPropertiesStore = new ReportedPropertiesStore(rpEntityStore, cloudSync.Object, Option.None()); + + var rp1 = new TwinCollection + { + ["p1"] = "v1", + ["p2"] = "v2" + }; + + var rp2 = new TwinCollection + { + ["p1"] = "v12", + ["p3"] = "v3" + }; + + var rp3 = new TwinCollection + { + ["p1"] = "v13", + ["p3"] = "v32" + }; + + var rp4 = new TwinCollection + { + ["p1"] = "v14", + ["p4"] = "v4" + }; + + // Act + await reportedPropertiesStore.Update(id, rp1); + reportedPropertiesStore.InitSyncToCloud(id); + + await reportedPropertiesStore.Update(id, rp2); + reportedPropertiesStore.InitSyncToCloud(id); + + await reportedPropertiesStore.Update(id, rp3); + reportedPropertiesStore.InitSyncToCloud(id); + + await reportedPropertiesStore.Update(id, rp4); + reportedPropertiesStore.InitSyncToCloud(id); + + // Assert + await Task.Delay(TimeSpan.FromSeconds(7)); + + cloudSync.Verify(c => c.UpdateReportedProperties(id, It.IsAny()), Times.Once); + Assert.Equal(1, receivedReportedProperties.Count); + Assert.Equal(receivedReportedProperties[0].ToJson(), "{\"p1\":\"v14\",\"p2\":\"v2\",\"p3\":\"v32\",\"p4\":\"v4\"}"); + } + + static IEntityStore GetReportedPropertiesEntityStore() + { + var dbStoreProvider = new InMemoryDbStoreProvider(); + var entityStoreProvider = new StoreProvider(dbStoreProvider); + IEntityStore entityStore = entityStoreProvider.GetEntityStore($"rp{Guid.NewGuid()}"); + return entityStore; + } + } +} diff --git a/edge-hub/test/Microsoft.Azure.Devices.Edge.Hub.Core.Test/twin/ReportedPropertiesValidatorTest.cs b/edge-hub/test/Microsoft.Azure.Devices.Edge.Hub.Core.Test/twin/ReportedPropertiesValidatorTest.cs new file mode 100644 index 00000000000..e344081041e --- /dev/null +++ b/edge-hub/test/Microsoft.Azure.Devices.Edge.Hub.Core.Test/twin/ReportedPropertiesValidatorTest.cs @@ -0,0 +1,128 @@ +// Copyright (c) Microsoft. All rights reserved. + +namespace Microsoft.Azure.Devices.Edge.Hub.Core.Test.Twin +{ + using System; + using System.Collections.Generic; + using Microsoft.Azure.Devices.Edge.Hub.Core.Twin; + using Microsoft.Azure.Devices.Edge.Util.Test.Common; + using Microsoft.Azure.Devices.Shared; + using Newtonsoft.Json; + using Xunit; + + public class ReportedPropertiesValidatorTest + { + static IEnumerable GetTwinCollections() + { + string longString = new string('*', 5000); + + yield return new object[] + { + new TwinCollection(JsonConvert.SerializeObject(new + { + level1 = new + { + level2 = new + { + level3 = new + { + level4 = new + { + level5 = new { } + } + } + } + } + })), + null + }; + + yield return new object[] + { + new TwinCollection(JsonConvert.SerializeObject(new + { + ok = "ok", + level = new + { + ok = "ok", + s = longString + } + })), + typeof(InvalidOperationException) + }; + + yield return new object[] + { + new TwinCollection(JsonConvert.SerializeObject(new + { + level = new + { + number = -4503599627370497 + } + })), + typeof(InvalidOperationException) + }; + + yield return new object[] + { + new TwinCollection(JsonConvert.SerializeObject(new + { + level1 = new + { + level2 = new + { + level3 = new + { + level4 = new + { + level5 = new + { + level6 = new { } + } + } + } + } + } + })), + typeof(InvalidOperationException) + }; + + yield return new object[] + { + new TwinCollection(JsonConvert.SerializeObject(new + { + array = new[] { 0, 1, 2 } + })), + typeof(InvalidOperationException) + }; + + yield return new object[] + { + new TwinCollection(JsonConvert.SerializeObject(new + { + tooBig = new byte[10 * 1024] + })), + typeof(InvalidOperationException) + }; + } + + [Unit] + [Theory] + [MemberData(nameof(GetTwinCollections))] + public void ValidateReportedPropertiesTest(TwinCollection twinCollection, Type expectedExceptionType) + { + // Arrange + var reportedPropertiesValidator = new ReportedPropertiesValidator(); + + // Act/Assert + if (expectedExceptionType == null) + { + reportedPropertiesValidator.Validate(twinCollection); + } + else + { + Assert.Throws(expectedExceptionType, () => reportedPropertiesValidator.Validate(twinCollection)); + } + } + } +} diff --git a/edge-hub/test/Microsoft.Azure.Devices.Edge.Hub.Core.Test/twin/StoringTwinManagerTest.cs b/edge-hub/test/Microsoft.Azure.Devices.Edge.Hub.Core.Test/twin/StoringTwinManagerTest.cs new file mode 100644 index 00000000000..6fcc71f1260 --- /dev/null +++ b/edge-hub/test/Microsoft.Azure.Devices.Edge.Hub.Core.Test/twin/StoringTwinManagerTest.cs @@ -0,0 +1,624 @@ +// Copyright (c) Microsoft. All rights reserved. +namespace Microsoft.Azure.Devices.Edge.Hub.Core.Test.Twin +{ + using System; + using System.Threading.Tasks; + using Microsoft.Azure.Devices.Edge.Hub.CloudProxy; + using Microsoft.Azure.Devices.Edge.Hub.Core.Device; + using Microsoft.Azure.Devices.Edge.Hub.Core.Identity; + using Microsoft.Azure.Devices.Edge.Hub.Core.Twin; + using Microsoft.Azure.Devices.Edge.Util; + using Microsoft.Azure.Devices.Edge.Util.Test.Common; + using Microsoft.Azure.Devices.Shared; + using Moq; + using Xunit; + + [Unit] + public class StoringTwinManagerTest + { + [Fact] + public async Task GetTwinTest() + { + // Arrange + string id = "d1"; + + var desired1 = new TwinCollection + { + ["p1"] = "vp1", + ["p3"] = "v3", + ["$version"] = 1 + }; + + var reported1 = new TwinCollection + { + ["p1"] = "vp1", + ["p3"] = "v3", + ["$version"] = 1 + }; + + var twin1 = new Twin + { + Properties = new TwinProperties + { + Desired = desired1, + Reported = reported1 + } + }; + + var cloudSync = new Mock(); + cloudSync.SetupSequence(c => c.GetTwin(id)) + .ReturnsAsync(Option.None()) + .ReturnsAsync(Option.Some(twin1)) + .ReturnsAsync(Option.None()); + + Twin receivedTwin = null; + var twinStore = new Mock(); + twinStore.Setup(c => c.Update(id, It.IsAny())) + .Callback((s, t) => receivedTwin = t) + .Returns(Task.CompletedTask); + + twinStore.Setup(c => c.Get(id)) + .ReturnsAsync(() => Option.Maybe(receivedTwin)); + + var twinMessageConverter = new TwinMessageConverter(); + var connectionManager = Mock.Of(); + var twinCollectionConverter = Mock.Of>(); + var reportedPropertiesValidator = Mock.Of>(); + var reportedPropertiesStore = Mock.Of(); + var deviceConnectivityManager = Mock.Of(); + + var twinManager = new StoringTwinManager( + connectionManager, + twinCollectionConverter, + twinMessageConverter, + reportedPropertiesValidator, + twinStore.Object, + reportedPropertiesStore, + cloudSync.Object, + deviceConnectivityManager, + TimeSpan.FromMinutes(10)); + + // Act + IMessage twinMessage = await twinManager.GetTwinAsync(id); + + // Assert + Assert.NotNull(twinMessage); + Twin twin = twinMessageConverter.FromMessage(twinMessage); + Assert.NotNull(twin); + Assert.Equal("{\"deviceId\":null,\"etag\":null,\"version\":null,\"properties\":{\"desired\":{},\"reported\":{}}}", twin.ToJson()); + + // Act + twinMessage = await twinManager.GetTwinAsync(id); + + // Assert + Assert.NotNull(twinMessage); + twin = twinMessageConverter.FromMessage(twinMessage); + Assert.NotNull(twin); + Assert.NotNull(receivedTwin); + Assert.Equal(receivedTwin.ToJson(), twin.ToJson()); + Assert.Equal(twin1.ToJson(), twin.ToJson()); + + // Act + twinMessage = await twinManager.GetTwinAsync(id); + + // Assert + Assert.NotNull(twinMessage); + twin = twinMessageConverter.FromMessage(twinMessage); + Assert.NotNull(twin); + Assert.NotNull(receivedTwin); + Assert.Equal(receivedTwin.ToJson(), twin.ToJson()); + Assert.Equal(twin1.ToJson(), twin.ToJson()); + } + + [Fact] + public async Task UpdateReportedPropertiesTest() + { + string id = "d1"; + + var reported1 = new TwinCollection + { + ["p1"] = "vp1", + ["p3"] = "v3" + }; + + TwinCollection receivedTwinPatch = null; + var twinStore = new Mock(MockBehavior.Strict); + twinStore.Setup(c => c.UpdateReportedProperties(id, It.IsAny())) + .Callback((s, t) => receivedTwinPatch = t) + .Returns(Task.CompletedTask); + + TwinCollection receivedTwinPatch2 = null; + var reportedPropertiesStore = new Mock(MockBehavior.Strict); + reportedPropertiesStore.Setup(r => r.InitSyncToCloud(id)); + reportedPropertiesStore.Setup(r => r.Update(id, It.IsAny())) + .Callback((s, t) => receivedTwinPatch2 = t) + .Returns(Task.CompletedTask); + + var cloudSync = Mock.Of(); + var twinMessageConverter = new TwinMessageConverter(); + var connectionManager = Mock.Of(); + var twinCollectionConverter = new TwinCollectionMessageConverter(); + var reportedPropertiesValidator = Mock.Of>(); + var deviceConnectivityManager = Mock.Of(); + + var twinManager = new StoringTwinManager( + connectionManager, + twinCollectionConverter, + twinMessageConverter, + reportedPropertiesValidator, + twinStore.Object, + reportedPropertiesStore.Object, + cloudSync, + deviceConnectivityManager, + TimeSpan.FromMinutes(10)); + + IMessage reportedPropertiesMessage = twinCollectionConverter.ToMessage(reported1); + + // Act + await twinManager.UpdateReportedPropertiesAsync(id, reportedPropertiesMessage); + + // Assert + twinStore.VerifyAll(); + reportedPropertiesStore.VerifyAll(); + + Assert.NotNull(receivedTwinPatch); + Assert.NotNull(receivedTwinPatch2); + Assert.Equal(reported1.ToJson(), receivedTwinPatch.ToJson()); + Assert.Equal(reported1.ToJson(), receivedTwinPatch2.ToJson()); + } + + [Fact] + public async Task UpdateDesiredPropertiesTest() + { + string id = "d1"; + + var desired0 = new TwinCollection + { + ["p0"] = "vp0", + ["$version"] = 0 + }; + + var reported0 = new TwinCollection + { + ["p0"] = "vp0", + ["$version"] = 0 + }; + + var twinBase = new Twin + { + Properties = new TwinProperties + { + Reported = reported0, + Desired = desired0 + } + }; + + var desired1 = new TwinCollection + { + ["p1"] = "vp1", + ["p3"] = "v3", + ["$version"] = 1 + }; + + TwinCollection receivedTwinPatch = null; + var twinStore = new Mock(MockBehavior.Strict); + twinStore.Setup(c => c.UpdateDesiredProperties(id, It.IsAny())) + .Callback((s, t) => receivedTwinPatch = t) + .Returns(Task.CompletedTask); + + twinStore.Setup(c => c.Get(id)) + .ReturnsAsync(Option.Some(twinBase)); + + var reportedPropertiesStore = new Mock(MockBehavior.Strict); + + IMessage receivedTwinPatchMessage = null; + var deviceProxy = new Mock(); + deviceProxy.Setup(d => d.OnDesiredPropertyUpdates(It.IsAny())) + .Callback(m => receivedTwinPatchMessage = m) + .Returns(Task.CompletedTask); + + var cloudSync = Mock.Of(); + var twinMessageConverter = new TwinMessageConverter(); + var twinCollectionConverter = new TwinCollectionMessageConverter(); + var reportedPropertiesValidator = Mock.Of>(); + var deviceConnectivityManager = Mock.Of(); + + var connectionManager = Mock.Of( + c => + c.CheckClientSubscription(id, DeviceSubscription.DesiredPropertyUpdates) + && c.GetDeviceConnection(id) == Option.Some(deviceProxy.Object)); + + var twinManager = new StoringTwinManager( + connectionManager, + twinCollectionConverter, + twinMessageConverter, + reportedPropertiesValidator, + twinStore.Object, + reportedPropertiesStore.Object, + cloudSync, + deviceConnectivityManager, + TimeSpan.FromMinutes(10)); + + IMessage desiredPropertiesMessage = twinCollectionConverter.ToMessage(desired1); + + // Act + await twinManager.UpdateDesiredPropertiesAsync(id, desiredPropertiesMessage); + + // Assert + twinStore.VerifyAll(); + reportedPropertiesStore.VerifyAll(); + + Assert.NotNull(receivedTwinPatch); + Assert.NotNull(receivedTwinPatchMessage); + Assert.Equal(desired1.ToJson(), receivedTwinPatch.ToJson()); + TwinCollection receivedTwinPatch2 = twinCollectionConverter.FromMessage(receivedTwinPatchMessage); + Assert.Equal(desired1.ToJson(), receivedTwinPatch2.ToJson()); + } + + [Fact] + public async Task UpdateDesiredPropertiesWithIncorrectPatchTest() + { + string id = "d1"; + + var desired0 = new TwinCollection + { + ["p0"] = "vp0", + ["$version"] = 0 + }; + + var reported0 = new TwinCollection + { + ["p0"] = "vp0", + ["$version"] = 0 + }; + + var twinBase = new Twin + { + Properties = new TwinProperties + { + Reported = reported0, + Desired = desired0 + } + }; + + var desired2 = new TwinCollection + { + ["p1"] = "vp1", + ["p2"] = "v2", + ["p3"] = "v3", + ["$version"] = 2 + }; + + var reported2 = new TwinCollection + { + ["p2"] = "vp2", + ["$version"] = 2 + }; + + var twin2 = new Twin + { + Properties = new TwinProperties + { + Reported = reported2, + Desired = desired2 + } + }; + + var desired2Patch = new TwinCollection + { + ["p1"] = "vp1", + ["$version"] = 2 + }; + + var twinStore = new Mock(MockBehavior.Strict); + twinStore.Setup(c => c.Get(id)) + .ReturnsAsync(Option.Some(twinBase)); + + Twin storedTwin = null; + twinStore.Setup(c => c.Update(id, It.IsAny())) + .Callback((s, t) => storedTwin = t) + .Returns(Task.CompletedTask); + + var reportedPropertiesStore = new Mock(MockBehavior.Strict); + + IMessage receivedTwinPatchMessage = null; + var deviceProxy = new Mock(); + deviceProxy.Setup(d => d.OnDesiredPropertyUpdates(It.IsAny())) + .Callback(m => receivedTwinPatchMessage = m) + .Returns(Task.CompletedTask); + + var cloudSync = Mock.Of(c => c.GetTwin(id) == Task.FromResult(Option.Some(twin2))); + var twinMessageConverter = new TwinMessageConverter(); + var twinCollectionConverter = new TwinCollectionMessageConverter(); + var reportedPropertiesValidator = Mock.Of>(); + var deviceConnectivityManager = Mock.Of(); + + var connectionManager = Mock.Of( + c => + c.CheckClientSubscription(id, DeviceSubscription.DesiredPropertyUpdates) + && c.GetDeviceConnection(id) == Option.Some(deviceProxy.Object)); + + var twinManager = new StoringTwinManager( + connectionManager, + twinCollectionConverter, + twinMessageConverter, + reportedPropertiesValidator, + twinStore.Object, + reportedPropertiesStore.Object, + cloudSync, + deviceConnectivityManager, + TimeSpan.FromMinutes(10)); + + IMessage desiredPropertiesMessage = twinCollectionConverter.ToMessage(desired2Patch); + + // Act + await twinManager.UpdateDesiredPropertiesAsync(id, desiredPropertiesMessage); + + // Assert + twinStore.VerifyAll(); + reportedPropertiesStore.VerifyAll(); + + Assert.NotNull(storedTwin); + Assert.NotNull(receivedTwinPatchMessage); + Assert.Equal(twin2.ToJson(), storedTwin.ToJson()); + TwinCollection receivedTwinPatch2 = twinCollectionConverter.FromMessage(receivedTwinPatchMessage); + Assert.Equal("{\"p0\":null,\"$version\":2,\"p1\":\"vp1\",\"p2\":\"v2\",\"p3\":\"v3\"}", receivedTwinPatch2.ToJson()); + } + + [Fact] + public async Task DeviceConnectionTest() + { + string id = "d1"; + var identity = Mock.Of(i => i.Id == id); + + var desired0 = new TwinCollection + { + ["p0"] = "vp0", + ["$version"] = 0 + }; + + var reported0 = new TwinCollection + { + ["p0"] = "vp0", + ["$version"] = 0 + }; + + var twinBase = new Twin + { + Properties = new TwinProperties + { + Reported = reported0, + Desired = desired0 + } + }; + + var desired2 = new TwinCollection + { + ["p1"] = "vp1", + ["p2"] = "v2", + ["p3"] = "v3", + ["$version"] = 2 + }; + + var reported2 = new TwinCollection + { + ["p2"] = "vp2", + ["$version"] = 2 + }; + + var twin2 = new Twin + { + Properties = new TwinProperties + { + Reported = reported2, + Desired = desired2 + } + }; + + var twinStore = new Mock(MockBehavior.Strict); + twinStore.Setup(c => c.Get(id)) + .ReturnsAsync(Option.Some(twinBase)); + + Twin storedTwin = null; + twinStore.Setup(c => c.Update(id, It.IsAny())) + .Callback((s, t) => storedTwin = t) + .Returns(Task.CompletedTask); + + var reportedPropertiesStore = new Mock(MockBehavior.Strict); + reportedPropertiesStore.Setup(r => r.SyncToCloud(id)) + .Returns(Task.CompletedTask); + + IMessage receivedTwinPatchMessage = null; + var deviceProxy = new Mock(MockBehavior.Strict); + deviceProxy.Setup(d => d.OnDesiredPropertyUpdates(It.IsAny())) + .Callback(m => receivedTwinPatchMessage = m) + .Returns(Task.CompletedTask); + + var cloudSync = new Mock(MockBehavior.Strict); + cloudSync.Setup(c => c.GetTwin(id)) + .ReturnsAsync(Option.Some(twin2)); + var twinMessageConverter = new TwinMessageConverter(); + var twinCollectionConverter = new TwinCollectionMessageConverter(); + var reportedPropertiesValidator = Mock.Of>(); + + var connectionManager = Mock.Of( + c => + c.CheckClientSubscription(id, DeviceSubscription.DesiredPropertyUpdates) + && c.GetDeviceConnection(id) == Option.Some(deviceProxy.Object) + && c.GetConnectedClients() == new[] { identity }); + + var deviceConnectivityManager = new Mock(); + + var twinManager = new StoringTwinManager( + connectionManager, + twinCollectionConverter, + twinMessageConverter, + reportedPropertiesValidator, + twinStore.Object, + reportedPropertiesStore.Object, + cloudSync.Object, + deviceConnectivityManager.Object, + TimeSpan.FromMinutes(10)); + + // Act + deviceConnectivityManager.Raise(d => d.DeviceConnected += null, this, new EventArgs()); + + // Assert + await Task.Delay(TimeSpan.FromSeconds(3)); + twinStore.VerifyAll(); + reportedPropertiesStore.VerifyAll(); + cloudSync.VerifyAll(); + deviceProxy.VerifyAll(); + Mock.Get(connectionManager).VerifyAll(); + + Assert.NotNull(storedTwin); + Assert.NotNull(receivedTwinPatchMessage); + Assert.Equal(twin2.ToJson(), storedTwin.ToJson()); + TwinCollection receivedTwinPatch2 = twinCollectionConverter.FromMessage(receivedTwinPatchMessage); + Assert.Equal("{\"p0\":null,\"$version\":2,\"p1\":\"vp1\",\"p2\":\"v2\",\"p3\":\"v3\"}", receivedTwinPatch2.ToJson()); + } + + [Fact] + public async Task DeviceConnectionNoSubscriptionTest() + { + string id = "d1"; + var identity = Mock.Of(i => i.Id == id); + + var twinStore = new Mock(MockBehavior.Strict); + twinStore.Setup(c => c.Get(id)) + .ReturnsAsync(Option.None()); + + var reportedPropertiesStore = new Mock(MockBehavior.Strict); + reportedPropertiesStore.Setup(r => r.SyncToCloud(id)) + .Returns(Task.CompletedTask); + + var deviceProxy = new Mock(MockBehavior.Strict); + var cloudSync = new Mock(MockBehavior.Strict); + var twinMessageConverter = new TwinMessageConverter(); + var twinCollectionConverter = new TwinCollectionMessageConverter(); + var reportedPropertiesValidator = Mock.Of>(); + + var connectionManager = Mock.Of( + c => + c.CheckClientSubscription(id, DeviceSubscription.DesiredPropertyUpdates) == false + && c.GetConnectedClients() == new[] { identity }); + + var deviceConnectivityManager = new Mock(); + + var twinManager = new StoringTwinManager( + connectionManager, + twinCollectionConverter, + twinMessageConverter, + reportedPropertiesValidator, + twinStore.Object, + reportedPropertiesStore.Object, + cloudSync.Object, + deviceConnectivityManager.Object, + TimeSpan.FromMinutes(10)); + + // Act + deviceConnectivityManager.Raise(d => d.DeviceConnected += null, this, new EventArgs()); + + // Assert + await Task.Delay(TimeSpan.FromSeconds(3)); + twinStore.VerifyAll(); + reportedPropertiesStore.VerifyAll(); + Mock.Get(connectionManager).VerifyAll(); + cloudSync.VerifyAll(); + deviceProxy.VerifyAll(); + } + + [Fact] + public async Task DeviceConnectionSyncPeriodTest() + { + string id = "d1"; + var identity = Mock.Of(i => i.Id == id); + + var desired2 = new TwinCollection + { + ["p1"] = "vp1", + ["p2"] = "v2", + ["p3"] = "v3", + ["$version"] = 2 + }; + + var reported2 = new TwinCollection + { + ["p2"] = "vp2", + ["$version"] = 2 + }; + + var twin2 = new Twin + { + Properties = new TwinProperties + { + Reported = reported2, + Desired = desired2 + } + }; + + var twinStore = new Mock(MockBehavior.Strict); + twinStore.Setup(c => c.Get(id)) + .ReturnsAsync(Option.Some(twin2)); + + Twin storedTwin = null; + twinStore.Setup(c => c.Update(id, It.IsAny())) + .Callback((s, t) => storedTwin = t) + .Returns(Task.CompletedTask); + + var reportedPropertiesStore = new Mock(MockBehavior.Strict); + reportedPropertiesStore.Setup(r => r.SyncToCloud(id)) + .Returns(Task.CompletedTask); + + var deviceProxy = new Mock(MockBehavior.Strict); + + var cloudSync = new Mock(MockBehavior.Strict); + cloudSync.Setup(c => c.GetTwin(id)) + .ReturnsAsync(Option.Some(twin2)); + var twinMessageConverter = new TwinMessageConverter(); + var twinCollectionConverter = new TwinCollectionMessageConverter(); + var reportedPropertiesValidator = Mock.Of>(); + + var connectionManager = Mock.Of( + c => + c.CheckClientSubscription(id, DeviceSubscription.DesiredPropertyUpdates) + && c.GetDeviceConnection(id) == Option.Some(deviceProxy.Object) + && c.GetConnectedClients() == new[] { identity }); + + var deviceConnectivityManager = new Mock(); + + var twinManager = new StoringTwinManager( + connectionManager, + twinCollectionConverter, + twinMessageConverter, + reportedPropertiesValidator, + twinStore.Object, + reportedPropertiesStore.Object, + cloudSync.Object, + deviceConnectivityManager.Object, + TimeSpan.FromMinutes(10)); + + // Act + IMessage getTwinMessage = await twinManager.GetTwinAsync(id); + + // Assert + Assert.NotNull(getTwinMessage); + Twin getTwin = twinMessageConverter.FromMessage(getTwinMessage); + Assert.NotNull(getTwin); + Assert.Equal(twin2.ToJson(), getTwin.ToJson()); + + // Act + deviceConnectivityManager.Raise(d => d.DeviceConnected += null, this, new EventArgs()); + + // Assert + await Task.Delay(TimeSpan.FromSeconds(3)); + twinStore.VerifyAll(); + reportedPropertiesStore.VerifyAll(); + deviceProxy.VerifyAll(); + cloudSync.Verify(c => c.GetTwin(id), Times.AtMostOnce); + + Assert.NotNull(storedTwin); + Assert.Equal(twin2.ToJson(), storedTwin.ToJson()); + } + } +} diff --git a/edge-hub/test/Microsoft.Azure.Devices.Edge.Hub.Core.Test/twin/TwinStoreEntityTest.cs b/edge-hub/test/Microsoft.Azure.Devices.Edge.Hub.Core.Test/twin/TwinStoreEntityTest.cs new file mode 100644 index 00000000000..2890964f18b --- /dev/null +++ b/edge-hub/test/Microsoft.Azure.Devices.Edge.Hub.Core.Test/twin/TwinStoreEntityTest.cs @@ -0,0 +1,120 @@ +// Copyright (c) Microsoft. All rights reserved. + +namespace Microsoft.Azure.Devices.Edge.Hub.Core.Test.Twin +{ + using Microsoft.Azure.Devices.Edge.Hub.Core.Twin; + using Microsoft.Azure.Devices.Edge.Storage; + using Microsoft.Azure.Devices.Edge.Util.Test.Common; + using Microsoft.Azure.Devices.Shared; + using Newtonsoft.Json; + using Xunit; + + [Unit] + public class TwinStoreEntityTest + { + [Fact] + public void RoundtripEmptyTest() + { + var twinStoreEntity = new TwinStoreEntity(); + string json = JsonConvert.SerializeObject(twinStoreEntity); + var deserializedObject = JsonConvert.DeserializeObject(json); + Assert.False(deserializedObject.Twin.HasValue); + Assert.False(deserializedObject.ReportedPropertiesPatch.HasValue); + } + + [Fact] + public void RoundtripReportedPropertiesPatchTest() + { + var reportedProperties = new TwinCollection(); + reportedProperties["P1"] = "v1"; + reportedProperties["P2"] = "v2"; + + var twinStoreEntity = new TwinStoreEntity(reportedProperties); + string json = JsonConvert.SerializeObject(twinStoreEntity); + var deserializedObject = JsonConvert.DeserializeObject(json); + + Assert.False(deserializedObject.Twin.HasValue); + Assert.True(deserializedObject.ReportedPropertiesPatch.HasValue); + Assert.Equal("v1", (string)deserializedObject.ReportedPropertiesPatch.OrDefault()["P1"]); + Assert.Equal("v2", (string)deserializedObject.ReportedPropertiesPatch.OrDefault()["P2"]); + } + + [Fact] + public void RoundtripTwinTest() + { + var reportedProperties = new TwinCollection(); + reportedProperties["P1"] = "v1"; + var desiredProperties = new TwinCollection(); + desiredProperties["P2"] = "v2"; + var twin = new Twin(new TwinProperties { Desired = desiredProperties, Reported = reportedProperties }); + var twinStoreEntity = new TwinStoreEntity(twin); + string json = JsonConvert.SerializeObject(twinStoreEntity); + var deserializedObject = JsonConvert.DeserializeObject(json); + + Assert.False(deserializedObject.ReportedPropertiesPatch.HasValue); + Assert.True(deserializedObject.Twin.HasValue); + Assert.Equal("v1", (string)deserializedObject.Twin.OrDefault().Properties.Reported["P1"]); + Assert.Equal("v2", (string)deserializedObject.Twin.OrDefault().Properties.Desired["P2"]); + } + + [Fact] + public void BackwardCompatTest() + { + // Arrange + var twinReportedProperties = new TwinCollection(); + twinReportedProperties["P1"] = "v1"; + var twinDesiredProperties = new TwinCollection(); + twinDesiredProperties["P2"] = "v2"; + var twin = new Twin(new TwinProperties { Desired = twinDesiredProperties, Reported = twinReportedProperties }); + + var reportedProperties = new TwinCollection(); + reportedProperties["r1"] = "rv1"; + + // Act + var twinInfo1 = new TwinInfo(twin, reportedProperties); + string json1 = twinInfo1.ToJson(); + var twinStoreEntity1 = json1.FromJson(); + + // Assert + Assert.NotNull(twinStoreEntity1); + Assert.True(twinStoreEntity1.Twin.HasValue); + Assert.Equal("v1", (string)twinStoreEntity1.Twin.OrDefault().Properties.Reported["P1"]); + Assert.Equal("v2", (string)twinStoreEntity1.Twin.OrDefault().Properties.Desired["P2"]); + Assert.True(twinStoreEntity1.ReportedPropertiesPatch.HasValue); + Assert.Equal("rv1", (string)twinStoreEntity1.ReportedPropertiesPatch.OrDefault()["r1"]); + + // Act + var twinInfo2 = new TwinInfo(twin, null); + string json2 = twinInfo2.ToJson(); + var twinStoreEntity2 = json2.FromJson(); + + // Assert + Assert.NotNull(twinStoreEntity2); + Assert.True(twinStoreEntity2.Twin.HasValue); + Assert.Equal("v1", (string)twinStoreEntity2.Twin.OrDefault().Properties.Reported["P1"]); + Assert.Equal("v2", (string)twinStoreEntity2.Twin.OrDefault().Properties.Desired["P2"]); + Assert.False(twinStoreEntity2.ReportedPropertiesPatch.HasValue); + + // Act + var twinInfo3 = new TwinInfo(null, reportedProperties); + string json3 = twinInfo3.ToJson(); + var twinStoreEntity3 = json3.FromJson(); + + // Assert + Assert.NotNull(twinStoreEntity3); + Assert.False(twinStoreEntity3.Twin.HasValue); + Assert.True(twinStoreEntity3.ReportedPropertiesPatch.HasValue); + Assert.Equal("rv1", (string)twinStoreEntity3.ReportedPropertiesPatch.OrDefault()["r1"]); + + // Act + var twinInfo4 = new TwinInfo(null, null); + string json4 = twinInfo4.ToJson(); + var twinStoreEntity4 = json4.FromJson(); + + // Assert + Assert.NotNull(twinStoreEntity4); + Assert.False(twinStoreEntity4.Twin.HasValue); + Assert.False(twinStoreEntity4.ReportedPropertiesPatch.HasValue); + } + } +} diff --git a/edge-hub/test/Microsoft.Azure.Devices.Edge.Hub.Core.Test/twin/TwinStoreTest.cs b/edge-hub/test/Microsoft.Azure.Devices.Edge.Hub.Core.Test/twin/TwinStoreTest.cs new file mode 100644 index 00000000000..6cdf664ae3a --- /dev/null +++ b/edge-hub/test/Microsoft.Azure.Devices.Edge.Hub.Core.Test/twin/TwinStoreTest.cs @@ -0,0 +1,459 @@ +// Copyright (c) Microsoft. All rights reserved. +namespace Microsoft.Azure.Devices.Edge.Hub.Core.Test.Twin +{ + using System; + using System.Threading.Tasks; + using Microsoft.Azure.Devices.Edge.Hub.Core.Twin; + using Microsoft.Azure.Devices.Edge.Storage; + using Microsoft.Azure.Devices.Edge.Util; + using Microsoft.Azure.Devices.Edge.Util.Test.Common; + using Microsoft.Azure.Devices.Shared; + using Xunit; + + [Unit] + public class TwinStoreTest + { + [Fact] + public async Task GetTwinTest() + { + // Arrange + IEntityStore twinEntityStore = GetTwinEntityStore(); + await twinEntityStore.Put( + "d1", + new TwinStoreEntity( + new Twin + { + Properties = new TwinProperties + { + Desired = new TwinCollection + { + ["dp1"] = "d1" + }, + Reported = new TwinCollection + { + ["rp1"] = "r1" + } + } + })); + + await twinEntityStore.Put( + "d2", + new TwinStoreEntity( + new TwinCollection + { + ["rp2"] = "d2" + })); + + await twinEntityStore.Put( + "d3", + new TwinStoreEntity()); + + await twinEntityStore.Put( + "d4", + new TwinStoreEntity( + new Twin + { + Properties = new TwinProperties + { + Desired = new TwinCollection + { + ["dp4"] = "d4" + }, + Reported = new TwinCollection + { + ["rp4"] = "r4" + } + } + }, + new TwinCollection + { + ["rp4"] = "r4" + })); + ITwinStore twinStore = new TwinStore(twinEntityStore); + + // Act + Option t1 = await twinStore.Get("d1"); + + // Assert + Assert.True(t1.HasValue); + Assert.Equal(t1.OrDefault().Properties.Desired.ToJson(), "{\"dp1\":\"d1\"}"); + Assert.Equal(t1.OrDefault().Properties.Reported.ToJson(), "{\"rp1\":\"r1\"}"); + + // Act + Option t2 = await twinStore.Get("d2"); + + // Assert + Assert.False(t2.HasValue); + + // Act + Option t3 = await twinStore.Get("d3"); + + // Assert + Assert.False(t3.HasValue); + + // Act + Option t4 = await twinStore.Get("d4"); + + // Assert + Assert.True(t4.HasValue); + Assert.Equal(t4.OrDefault().Properties.Desired.ToJson(), "{\"dp4\":\"d4\"}"); + Assert.Equal(t4.OrDefault().Properties.Reported.ToJson(), "{\"rp4\":\"r4\"}"); + } + + [Fact] + public async Task UpdatedReportedPropertiesTest() + { + // Arrange + IEntityStore twinEntityStore = GetTwinEntityStore(); + ITwinStore twinStore = new TwinStore(twinEntityStore); + + var rbase = new TwinCollection + { + ["p1"] = "v1", + ["p2"] = "v2" + }; + + var rpatch1 = new TwinCollection + { + ["p1"] = "vp1", + ["p3"] = "v3" + }; + + var rpatch2 = new TwinCollection + { + ["p1"] = "vp1", + ["p3"] = new + { + p31 = "v31" + } + }; + + var rpatch3 = new TwinCollection + { + ["p2"] = "vp2", + ["p3"] = new + { + p31 = "v32" + } + }; + + string id = "d1"; + + // Act + await twinStore.UpdateReportedProperties(id, rbase); + + // Assert + Option twin = await twinStore.Get(id); + Assert.True(twin.HasValue); + Assert.Equal(twin.OrDefault().Properties.Reported.ToJson(), "{\"p1\":\"v1\",\"p2\":\"v2\"}"); + + // Act + await twinStore.UpdateReportedProperties(id, rpatch1); + + // Assert + twin = await twinStore.Get(id); + Assert.True(twin.HasValue); + Assert.Equal(twin.OrDefault().Properties.Reported.ToJson(), "{\"p1\":\"vp1\",\"p2\":\"v2\",\"p3\":\"v3\"}"); + + // Act + await twinStore.UpdateReportedProperties(id, rpatch2); + + // Assert + twin = await twinStore.Get(id); + Assert.True(twin.HasValue); + Assert.Equal(twin.OrDefault().Properties.Reported.ToJson(), "{\"p1\":\"vp1\",\"p2\":\"v2\",\"p3\":{\"p31\":\"v31\"}}"); + + // Act + await twinStore.UpdateReportedProperties(id, rpatch3); + + // Assert + twin = await twinStore.Get(id); + Assert.True(twin.HasValue); + Assert.Equal(twin.OrDefault().Properties.Reported.ToJson(), "{\"p1\":\"vp1\",\"p2\":\"vp2\",\"p3\":{\"p31\":\"v32\"}}"); + } + + [Fact] + public async Task UpdatedDesiredPropertiesTest() + { + // Arrange + IEntityStore twinEntityStore = GetTwinEntityStore(); + ITwinStore twinStore = new TwinStore(twinEntityStore); + + var dbase = new TwinCollection + { + ["p1"] = "v1", + ["p2"] = "v2", + ["$version"] = 0 + }; + + var basetwin = new Twin + { + Properties = new TwinProperties + { + Desired = dbase + } + }; + + var dpatch1 = new TwinCollection + { + ["p1"] = "vp1", + ["p3"] = "v3", + ["$version"] = 1 + }; + + var dpatch2 = new TwinCollection + { + ["p1"] = "vp1", + ["p3"] = new + { + p31 = "v31" + }, + ["$version"] = 2 + }; + + var dpatch3 = new TwinCollection + { + ["p2"] = "vp2", + ["p3"] = new + { + p31 = "v32" + }, + ["$version"] = 3 + }; + + var dpatch4 = new TwinCollection + { + ["p2"] = "vp4", + ["p3"] = new + { + p31 = "v50" + }, + ["$version"] = 3 + }; + + string id = "d1"; + + // Act + await twinStore.Update(id, basetwin); + + // Assert + Option twin = await twinStore.Get(id); + Assert.True(twin.HasValue); + Assert.Equal(twin.OrDefault().Properties.Desired.ToJson(), "{\"p1\":\"v1\",\"p2\":\"v2\",\"$version\":0}"); + + // Act + await twinStore.UpdateDesiredProperties(id, dpatch1); + + // Assert + twin = await twinStore.Get(id); + Assert.True(twin.HasValue); + Assert.Equal(twin.OrDefault().Properties.Desired.ToJson(), "{\"p1\":\"vp1\",\"p2\":\"v2\",\"$version\":1,\"p3\":\"v3\"}"); + + // Act + await twinStore.UpdateDesiredProperties(id, dpatch2); + + // Assert + twin = await twinStore.Get(id); + Assert.True(twin.HasValue); + Assert.Equal(twin.OrDefault().Properties.Desired.ToJson(), "{\"p1\":\"vp1\",\"p2\":\"v2\",\"$version\":2,\"p3\":{\"p31\":\"v31\"}}"); + + // Act + await twinStore.UpdateDesiredProperties(id, dpatch3); + + // Assert + twin = await twinStore.Get(id); + Assert.True(twin.HasValue); + Assert.Equal(twin.OrDefault().Properties.Desired.ToJson(), "{\"p1\":\"vp1\",\"p2\":\"vp2\",\"$version\":3,\"p3\":{\"p31\":\"v32\"}}"); + + // Act + await twinStore.UpdateDesiredProperties(id, dpatch4); + + // Assert + twin = await twinStore.Get(id); + Assert.True(twin.HasValue); + Assert.Equal(twin.OrDefault().Properties.Desired.ToJson(), "{\"p1\":\"vp1\",\"p2\":\"vp2\",\"$version\":3,\"p3\":{\"p31\":\"v32\"}}"); + } + + [Fact] + public async Task UpdatedTest() + { + // Arrange + IEntityStore twinEntityStore = GetTwinEntityStore(); + ITwinStore twinStore = new TwinStore(twinEntityStore); + + var dbase = new TwinCollection + { + ["p1"] = "v1", + ["p2"] = "v2", + ["$version"] = 0 + }; + + var rbase = new TwinCollection + { + ["p1"] = "v1", + ["p2"] = "v2", + ["$version"] = 0 + }; + + var basetwin = new Twin + { + Properties = new TwinProperties + { + Desired = dbase, + Reported = rbase + } + }; + + var desired1 = new TwinCollection + { + ["p1"] = "vp1", + ["p3"] = "v3", + ["$version"] = 1 + }; + + var reported1 = new TwinCollection + { + ["p1"] = "vp1", + ["p3"] = "v3", + ["$version"] = 1 + }; + + var twin1 = new Twin + { + Properties = new TwinProperties + { + Desired = desired1, + Reported = reported1 + } + }; + + var desired2 = new TwinCollection + { + ["p2"] = "vp2", + ["p3"] = "v3", + ["$version"] = 2 + }; + + var reported2 = new TwinCollection + { + ["p1"] = "vp1", + ["p2"] = "vp3", + ["$version"] = 2 + }; + + var twin2 = new Twin + { + Properties = new TwinProperties + { + Desired = desired2, + Reported = reported2 + } + }; + + var desired3 = new TwinCollection + { + ["p2"] = "v10", + ["p3"] = "vp3", + ["$version"] = 3 + }; + + var reported3 = new TwinCollection + { + ["p1"] = "vp1", + ["p3"] = "v10", + ["$version"] = 3 + }; + + var twin3 = new Twin + { + Properties = new TwinProperties + { + Desired = desired3, + Reported = reported1 + } + }; + + var twin4 = new Twin + { + Properties = new TwinProperties + { + Desired = desired2, + Reported = reported3 + } + }; + + var twin5 = new Twin + { + Properties = new TwinProperties + { + Desired = desired2, + Reported = reported2 + } + }; + + string id = "d1"; + + // Act + await twinStore.Update(id, basetwin); + + // Assert + Option twin = await twinStore.Get(id); + Assert.True(twin.HasValue); + Assert.Equal(twin.OrDefault().Properties.Desired.ToJson(), "{\"p1\":\"v1\",\"p2\":\"v2\",\"$version\":0}"); + Assert.Equal(twin.OrDefault().Properties.Reported.ToJson(), "{\"p1\":\"v1\",\"p2\":\"v2\",\"$version\":0}"); + + // Act + await twinStore.Update(id, twin1); + + // Assert + twin = await twinStore.Get(id); + Assert.True(twin.HasValue); + Assert.Equal(twin.OrDefault().Properties.Desired.ToJson(), "{\"p1\":\"vp1\",\"p3\":\"v3\",\"$version\":1}"); + Assert.Equal(twin.OrDefault().Properties.Reported.ToJson(), "{\"p1\":\"vp1\",\"p3\":\"v3\",\"$version\":1}"); + + // Act + await twinStore.Update(id, twin2); + + // Assert + twin = await twinStore.Get(id); + Assert.True(twin.HasValue); + Assert.Equal(twin.OrDefault().Properties.Desired.ToJson(), "{\"p2\":\"vp2\",\"p3\":\"v3\",\"$version\":2}"); + Assert.Equal(twin.OrDefault().Properties.Reported.ToJson(), "{\"p1\":\"vp1\",\"p2\":\"vp3\",\"$version\":2}"); + + // Act + await twinStore.Update(id, twin3); + + // Assert + twin = await twinStore.Get(id); + Assert.True(twin.HasValue); + Assert.Equal(twin.OrDefault().Properties.Desired.ToJson(), "{\"p2\":\"vp2\",\"p3\":\"v3\",\"$version\":2}"); + Assert.Equal(twin.OrDefault().Properties.Reported.ToJson(), "{\"p1\":\"vp1\",\"p2\":\"vp3\",\"$version\":2}"); + + // Act + await twinStore.Update(id, twin4); + + // Assert + twin = await twinStore.Get(id); + Assert.True(twin.HasValue); + Assert.Equal(twin.OrDefault().Properties.Desired.ToJson(), "{\"p2\":\"vp2\",\"p3\":\"v3\",\"$version\":2}"); + Assert.Equal(twin.OrDefault().Properties.Reported.ToJson(), "{\"p1\":\"vp1\",\"p2\":\"vp3\",\"$version\":2}"); + + // Act + await twinStore.Update(id, twin5); + + // Assert + twin = await twinStore.Get(id); + Assert.True(twin.HasValue); + Assert.Equal(twin.OrDefault().Properties.Desired.ToJson(), "{\"p2\":\"vp2\",\"p3\":\"v3\",\"$version\":2}"); + Assert.Equal(twin.OrDefault().Properties.Reported.ToJson(), "{\"p1\":\"vp1\",\"p2\":\"vp3\",\"$version\":2}"); + } + + static IEntityStore GetTwinEntityStore() + { + var dbStoreProvider = new InMemoryDbStoreProvider(); + var entityStoreProvider = new StoreProvider(dbStoreProvider); + IEntityStore entityStore = entityStoreProvider.GetEntityStore($"twin{Guid.NewGuid()}"); + return entityStore; + } + } +} diff --git a/edge-hub/test/Microsoft.Azure.Devices.Edge.Hub.E2E.Test/DependencyManager.cs b/edge-hub/test/Microsoft.Azure.Devices.Edge.Hub.E2E.Test/DependencyManager.cs index 0a2e41efe19..b47f545b9c2 100644 --- a/edge-hub/test/Microsoft.Azure.Devices.Edge.Hub.E2E.Test/DependencyManager.cs +++ b/edge-hub/test/Microsoft.Azure.Devices.Edge.Hub.E2E.Test/DependencyManager.cs @@ -133,7 +133,10 @@ public void Register(ContainerBuilder builder) 101, TimeSpan.FromSeconds(3600), true, - TimeSpan.FromSeconds(20))); + TimeSpan.FromSeconds(20), + Option.None(), + Option.None(), + false)); builder.RegisterModule(new HttpModule()); builder.RegisterModule(new MqttModule(mqttSettingsConfiguration.Object, topics, this.serverCertificate, false, false, false));