Skip to content

Commit

Permalink
EdgeHub: Twin manager (#660)
Browse files Browse the repository at this point in the history
* Twin Manager2 changes

* TwinManager2 refactor

* Reported properties Sync to cloud changes

* Add ctors

* ReportedProperties sync changes

* Cleanup

* Make Twin an option as well

* Add events

* Fix reported properties handling

* Fix logs

* Cleanup

* Fix twin store entity

* Fix reported properties validator

* Remove commented code

* More tests

* Split out types for easier tests

* Cleanup CloudSync

* More tests

* TwinStore tests

* More tests

* Fix merge

* More tests

* More checks

* More tests

* More tests

* Fix config and add test

* Cleanup

* Fix config

* Fix reported property updates

* Fix build

* fix test
  • Loading branch information
varunpuranik authored Jan 17, 2019
1 parent 127fec0 commit d99f8ff
Show file tree
Hide file tree
Showing 26 changed files with 2,622 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,4 @@
[assembly: InternalsVisibleTo("Microsoft.Azure.Devices.Edge.Hub.Core.Test")]
[assembly: InternalsVisibleTo("Microsoft.Azure.Devices.Edge.Hub.Mqtt.Test")]
[assembly: InternalsVisibleTo("Microsoft.Azure.Devices.Edge.Hub.E2E.Test")]
[assembly: InternalsVisibleTo("DynamicProxyGenAssembly2")]
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,11 @@ public Option<IReadOnlyDictionary<DeviceSubscription, bool>> GetSubscriptions(st
.Map(d => new ReadOnlyDictionary<DeviceSubscription, bool>(d.Subscriptions) as IReadOnlyDictionary<DeviceSubscription, bool>)
: Option.None<IReadOnlyDictionary<DeviceSubscription, bool>>();

public bool CheckClientSubscription(string id, DeviceSubscription subscription) =>
this.GetSubscriptions(id)
.Filter(s => s.TryGetValue(subscription, out bool isActive) && isActive)
.HasValue;

public async Task<Try<ICloudProxy>> CreateCloudConnectionAsync(IClientCredentials credentials)
{
Preconditions.CheckNotNull(credentials, nameof(credentials));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public class EdgeHubConnection : IConfigSource
readonly IIdentity edgeHubIdentity;
readonly ITwinManager twinManager;
readonly IMessageConverter<TwinCollection> twinCollectionMessageConverter;
readonly IMessageConverter<Twin> twinMessageConverter;
readonly IMessageConverter<Shared.Twin> twinMessageConverter;
readonly VersionInfo versionInfo;
readonly RouteFactory routeFactory;
readonly AsyncLock edgeHubConfigLock = new AsyncLock();
Expand All @@ -40,7 +40,7 @@ internal EdgeHubConnection(
ITwinManager twinManager,
RouteFactory routeFactory,
IMessageConverter<TwinCollection> twinCollectionMessageConverter,
IMessageConverter<Twin> twinMessageConverter,
IMessageConverter<Shared.Twin> twinMessageConverter,
VersionInfo versionInfo,
IDeviceScopeIdentitiesCache deviceScopeIdentitiesCache)
{
Expand All @@ -60,7 +60,7 @@ public static async Task<EdgeHubConnection> Create(
IConnectionManager connectionManager,
RouteFactory routeFactory,
IMessageConverter<TwinCollection> twinCollectionMessageConverter,
IMessageConverter<Twin> twinMessageConverter,
IMessageConverter<Shared.Twin> twinMessageConverter,
VersionInfo versionInfo,
IDeviceScopeIdentitiesCache deviceScopeIdentitiesCache)
{
Expand Down Expand Up @@ -198,7 +198,7 @@ async Task<Option<EdgeHubConfig>> GetConfigInternal()
try
{
IMessage message = await this.twinManager.GetTwinAsync(this.edgeHubIdentity.Id);
Twin twin = this.twinMessageConverter.FromMessage(message);
Shared.Twin twin = this.twinMessageConverter.FromMessage(message);
this.lastDesiredProperties = Option.Some(twin.Properties.Desired);
try
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ public interface IConnectionManager

Option<IReadOnlyDictionary<DeviceSubscription, bool>> GetSubscriptions(string id);

bool CheckClientSubscription(string id, DeviceSubscription subscription);

IEnumerable<IIdentity> GetConnectedClients();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@ namespace Microsoft.Azure.Devices.Edge.Hub.Core
public class TwinInfo
{
[JsonConstructor]
public TwinInfo(Twin twin, TwinCollection reportedPropertiesPatch)
public TwinInfo(Shared.Twin twin, TwinCollection reportedPropertiesPatch)
{
this.Twin = twin;
this.ReportedPropertiesPatch = reportedPropertiesPatch;
}

public Twin Twin { get; }
public Shared.Twin Twin { get; }

public TwinCollection ReportedPropertiesPatch { get; }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,13 @@ public class TwinManager : ITwinManager
const long TwinPropertyMinSafeValue = -4503599627370496; // -2^52. taken from IoTHub
const int TwinPropertyDocMaxLength = 8 * 1024; // 8K bytes. taken from IoTHub
readonly IMessageConverter<TwinCollection> twinCollectionConverter;
readonly IMessageConverter<Twin> twinConverter;
readonly IMessageConverter<Shared.Twin> twinConverter;
readonly IConnectionManager connectionManager;
readonly AsyncLock reportedPropertiesLock;
readonly AsyncLock twinLock;
readonly ActionBlock<IIdentity> actionBlock;

public TwinManager(IConnectionManager connectionManager, IMessageConverter<TwinCollection> twinCollectionConverter, IMessageConverter<Twin> twinConverter, Option<IEntityStore<string, TwinInfo>> twinStore)
public TwinManager(IConnectionManager connectionManager, IMessageConverter<TwinCollection> twinCollectionConverter, IMessageConverter<Shared.Twin> twinConverter, Option<IEntityStore<string, TwinInfo>> twinStore)
{
this.connectionManager = Preconditions.CheckNotNull(connectionManager, nameof(connectionManager));
this.twinCollectionConverter = Preconditions.CheckNotNull(twinCollectionConverter, nameof(twinCollectionConverter));
Expand All @@ -55,7 +55,7 @@ public static ITwinManager CreateTwinManager(
var twinManager = new TwinManager(
connectionManager,
messageConverterProvider.Get<TwinCollection>(),
messageConverterProvider.Get<Twin>(),
messageConverterProvider.Get<Shared.Twin>(),
storeProvider.Match(
s => Option.Some(s.GetEntityStore<string, TwinInfo>(Constants.TwinStorePartitionKey)),
() => Option.None<IEntityStore<string, TwinInfo>>()));
Expand Down Expand Up @@ -131,7 +131,7 @@ internal async Task<TwinInfo> GetTwinInfoWhenCloudOnlineAsync(string id, ICloudP
using (await this.twinLock.LockAsync())
{
IMessage twinMessage = await cp.GetTwinAsync();
Twin cloudTwin = this.twinConverter.FromMessage(twinMessage);
Shared.Twin cloudTwin = this.twinConverter.FromMessage(twinMessage);
Events.GotTwinFromCloudSuccess(id, cloudTwin.Properties.Desired.Version, cloudTwin.Properties.Reported.Version);
var newTwin = new TwinInfo(cloudTwin, null);
cached = newTwin;
Expand Down Expand Up @@ -461,7 +461,7 @@ await twinStore.Update(
Desired = new TwinCollection(),
Reported = reported
};
var twin = new Twin(twinProperties);
var twin = new Shared.Twin(twinProperties);
Events.UpdatedCachedReportedProperties(id, reported.Version, cloudVerified);
return new TwinInfo(twin, reported);
}
Expand Down
121 changes: 121 additions & 0 deletions edge-hub/src/Microsoft.Azure.Devices.Edge.Hub.Core/twin/CloudSync.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
// Copyright (c) Microsoft. All rights reserved.
namespace Microsoft.Azure.Devices.Edge.Hub.Core.Twin
{
using System;
using System.Threading.Tasks;
using Microsoft.Azure.Devices.Edge.Hub.Core.Cloud;
using Microsoft.Azure.Devices.Edge.Util;
using Microsoft.Azure.Devices.Shared;
using Microsoft.Extensions.Logging;

class CloudSync : ICloudSync
{
readonly IConnectionManager connectionManager;
readonly IMessageConverter<TwinCollection> twinCollectionConverter;
readonly IMessageConverter<Twin> twinConverter;

public CloudSync(
IConnectionManager connectionManager,
IMessageConverter<TwinCollection> twinCollectionConverter,
IMessageConverter<Twin> twinConverter)
{
this.connectionManager = connectionManager;
this.twinCollectionConverter = twinCollectionConverter;
this.twinConverter = twinConverter;
}

public async Task<Option<Twin>> GetTwin(string id)
{
try
{
Events.GettingTwin(id);
Option<ICloudProxy> cloudProxy = await this.connectionManager.GetCloudConnection(id);
Option<Twin> twin = await cloudProxy.Map(
async cp =>
{
IMessage twinMessage = await cp.GetTwinAsync();
Twin twinValue = this.twinConverter.FromMessage(twinMessage);
Events.GetTwinSucceeded(id);
return Option.Some(twinValue);
})
.GetOrElse(() => Task.FromResult(Option.None<Twin>()));
return twin;
}
catch (Exception ex)
{
Events.ErrorGettingTwin(id, ex);
return Option.None<Twin>();
}
}

public async Task<bool> UpdateReportedProperties(string id, TwinCollection patch)
{
try
{
Events.UpdatingReportedProperties(id);
Option<ICloudProxy> cloudProxy = await this.connectionManager.GetCloudConnection(id);
bool result = await cloudProxy.Map(
async cp =>
{
IMessage patchMessage = this.twinCollectionConverter.ToMessage(patch);
await cp.UpdateReportedPropertiesAsync(patchMessage);
Events.UpdatedReportedProperties(id);
return true;
})
.GetOrElse(() => Task.FromResult(false));
return result;
}
catch (Exception ex)
{
Events.ErrorUpdatingReportedProperties(id, ex);
return false;
}
}

static class Events
{
const int IdStart = HubCoreEventIds.TwinManager;
static readonly ILogger Log = Logger.Factory.CreateLogger<StoringTwinManager>();

enum EventIds
{
GettingTwin = IdStart + 70,
GetTwinSucceeded,
ErrorGettingTwin,
UpdatingReportedProperties,
UpdatedReportedProperties,
ErrorUpdatingReportedProperties
}

public static void ErrorUpdatingReportedProperties(string id, Exception ex)
{
Log.LogDebug((int)EventIds.ErrorUpdatingReportedProperties, ex, $"Error updating reported properties for {id}");
}

public static void UpdatedReportedProperties(string id)
{
Log.LogInformation((int)EventIds.UpdatedReportedProperties, $"Updated reported properties for {id}");
}

public static void UpdatingReportedProperties(string id)
{
Log.LogDebug((int)EventIds.UpdatingReportedProperties, $"Updating reported properties for {id}");
}

public static void ErrorGettingTwin(string id, Exception ex)
{
Log.LogWarning((int)EventIds.ErrorGettingTwin, ex, $"Error getting twin for {id}");
}

public static void GetTwinSucceeded(string id)
{
Log.LogDebug((int)EventIds.GetTwinSucceeded, $"Got twin for {id}");
}

public static void GettingTwin(string id)
{
Log.LogDebug((int)EventIds.GettingTwin, $"Getting twin for {id}");
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
// Copyright (c) Microsoft. All rights reserved.
namespace Microsoft.Azure.Devices.Edge.Hub.Core.Twin
{
using System.Threading.Tasks;
using Microsoft.Azure.Devices.Edge.Util;
using Microsoft.Azure.Devices.Shared;

interface ICloudSync
{
Task<Option<Twin>> GetTwin(string id);

Task<bool> UpdateReportedProperties(string id, TwinCollection patch);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
// Copyright (c) Microsoft. All rights reserved.
namespace Microsoft.Azure.Devices.Edge.Hub.Core.Twin
{
using System.Threading.Tasks;
using Microsoft.Azure.Devices.Shared;

interface IReportedPropertiesStore
{
Task Update(string id, TwinCollection patch);

void InitSyncToCloud(string id);

Task SyncToCloud(string id);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
// Copyright (c) Microsoft. All rights reserved.
namespace Microsoft.Azure.Devices.Edge.Hub.Core.Twin
{
using System.Threading.Tasks;
using Microsoft.Azure.Devices.Edge.Util;
using Microsoft.Azure.Devices.Shared;

interface ITwinStore
{
Task<Option<Twin>> Get(string id);

Task UpdateReportedProperties(string id, TwinCollection patch);

Task UpdateDesiredProperties(string id, TwinCollection patch);

Task Update(string id, Twin twin);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Copyright (c) Microsoft. All rights reserved.
namespace Microsoft.Azure.Devices.Edge.Hub.Core.Twin
{
using System.Threading.Tasks;
using Microsoft.Azure.Devices.Edge.Hub.Core.Cloud;
using Microsoft.Azure.Devices.Edge.Hub.Core.Device;
using Microsoft.Azure.Devices.Edge.Util;
using Microsoft.Azure.Devices.Shared;

public class PassThroughTwinManager : ITwinManager
{
readonly IConnectionManager connectionManager;
readonly IMessageConverter<Twin> twinConverter;

public PassThroughTwinManager(IConnectionManager connectionManager, IMessageConverterProvider messageConverterProvider)
{
Preconditions.CheckNotNull(messageConverterProvider, nameof(messageConverterProvider));
this.connectionManager = Preconditions.CheckNotNull(connectionManager, nameof(connectionManager));
this.twinConverter = messageConverterProvider.Get<Twin>();
}

public async Task<IMessage> GetTwinAsync(string id)
{
Preconditions.CheckNonWhiteSpace(id, nameof(id));
Option<ICloudProxy> cloudProxy = await this.connectionManager.GetCloudConnection(id);
IMessage twin = await cloudProxy
.Map(c => c.GetTwinAsync())
.GetOrElse(() => Task.FromResult(this.twinConverter.ToMessage(new Twin())));
return twin;
}

public Task UpdateDesiredPropertiesAsync(string id, IMessage twinCollection)
{
Preconditions.CheckNonWhiteSpace(id, nameof(id));
Option<IDeviceProxy> deviceProxy = this.connectionManager.GetDeviceConnection(id);
return deviceProxy.ForEachAsync(dp => dp.OnDesiredPropertyUpdates(twinCollection));
}

public async Task UpdateReportedPropertiesAsync(string id, IMessage twinCollection)
{
Preconditions.CheckNonWhiteSpace(id, nameof(id));
Option<ICloudProxy> cloudProxy = await this.connectionManager.GetCloudConnection(id);
await cloudProxy.ForEachAsync(cp => cp.UpdateReportedPropertiesAsync(twinCollection));
}
}
}
Loading

0 comments on commit d99f8ff

Please sign in to comment.