Skip to content

Commit

Permalink
EdgeHub: Process Subscriptions on device connect event (#455)
Browse files Browse the repository at this point in the history
* Fix handling re-subscriptions

* Add tests

* Fix test

* Add ToList()
  • Loading branch information
varunpuranik authored Oct 19, 2018
1 parent 96bc2de commit d8b9038
Show file tree
Hide file tree
Showing 15 changed files with 276 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ namespace Microsoft.Azure.Devices.Edge.Hub.CloudProxy
using System.Collections.Generic;
using System.Threading.Tasks;
using Microsoft.Azure.Devices.Client;
using Microsoft.Azure.Devices.Edge.Hub.Core;
using Microsoft.Azure.Devices.Edge.Hub.Core.Identity;
using Microsoft.Azure.Devices.Edge.Util;
using Microsoft.Azure.Devices.Edge.Util.Concurrency;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ namespace Microsoft.Azure.Devices.Edge.Hub.CloudProxy
{
using System.Threading.Tasks;
using Microsoft.Azure.Devices.Client;
using Microsoft.Azure.Devices.Edge.Hub.Core;
using Microsoft.Azure.Devices.Edge.Hub.Core.Identity;
using Microsoft.Azure.Devices.Edge.Util;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public ConnectionManager(

public IEnumerable<IIdentity> GetConnectedClients() =>
this.devices.Values
.Where(d => d.DeviceConnection.Map(dc => dc.IsActive).GetOrElse(false) && !d.Identity.Id.Equals($"{this.edgeDeviceId}/{this.edgeModuleId}"))
.Where(d => d.DeviceConnection.Map(dc => dc.IsActive).GetOrElse(false))
.Select(d => d.Identity);

public async Task AddDeviceConnection(IIdentity identity, IDeviceProxy deviceProxy)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ public ConnectionReauthenticator(
ICredentialsCache credentialsCache,
IDeviceScopeIdentitiesCache deviceScopeIdentitiesCache,
TimeSpan reauthenticateFrequency,
IIdentity edgeHubIdentity)
IIdentity edgeHubIdentity,
IDeviceConnectivityManager deviceConnectivityManager)
{
this.connectionManager = Preconditions.CheckNotNull(connectionManager, nameof(connectionManager));
this.authenticator = Preconditions.CheckNotNull(authenticator, nameof(authenticator));
Expand All @@ -38,18 +39,15 @@ public ConnectionReauthenticator(
this.deviceScopeIdentitiesCache = Preconditions.CheckNotNull(deviceScopeIdentitiesCache, nameof(deviceScopeIdentitiesCache));
this.timer = new Timer(reauthenticateFrequency.TotalMilliseconds);
this.timer.Elapsed += this.ReauthenticateConnections;
this.connectionManager.CloudConnectionEstablished += this.CloudConnectionEstablishedHandler;
deviceConnectivityManager.DeviceConnected += this.DeviceConnected;
this.deviceScopeIdentitiesCache.ServiceIdentityUpdated += this.HandleServiceIdentityUpdate;
this.deviceScopeIdentitiesCache.ServiceIdentityRemoved += this.HandleServiceIdentityRemove;
}

void CloudConnectionEstablishedHandler(object sender, IIdentity identity)
void DeviceConnected(object sender, EventArgs args)
{
if (this.edgeHubIdentity.Id.Equals(identity.Id))
{
Events.EdgeHubConnectionReestablished();
this.deviceScopeIdentitiesCache.InitiateCacheRefresh();
}
Events.EdgeHubConnectionReestablished();
this.deviceScopeIdentitiesCache.InitiateCacheRefresh();
}

public void Init()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
// Copyright (c) Microsoft. All rights reserved.
namespace Microsoft.Azure.Devices.Edge.Hub.CloudProxy
namespace Microsoft.Azure.Devices.Edge.Hub.Core
{
using System;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,10 @@ public TwinManager(IConnectionManager connectionManager, IMessageConverter<TwinC
this.actionBlock = new ActionBlock<IIdentity>(this.ProcessConnectionEstablishedForDevice);
}

public static ITwinManager CreateTwinManager(IConnectionManager connectionManager, IMessageConverterProvider messageConverterProvider, Option<IStoreProvider> storeProvider)
public static ITwinManager CreateTwinManager(
IConnectionManager connectionManager,
IMessageConverterProvider messageConverterProvider,
Option<IStoreProvider> storeProvider)
{
Preconditions.CheckNotNull(connectionManager, nameof(connectionManager));
Preconditions.CheckNotNull(messageConverterProvider, nameof(messageConverterProvider));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,17 @@ public class RoutingEdgeHub : IEdgeHub
const long MaxMessageSize = 256 * 1024; // matches IoTHub

public RoutingEdgeHub(Router router, Core.IMessageConverter<IRoutingMessage> messageConverter,
IConnectionManager connectionManager, ITwinManager twinManager, string edgeDeviceId, IInvokeMethodHandler invokeMethodHandler)
IConnectionManager connectionManager, ITwinManager twinManager, string edgeDeviceId,
IInvokeMethodHandler invokeMethodHandler,
IDeviceConnectivityManager deviceConnectivityManager)
{
this.router = Preconditions.CheckNotNull(router, nameof(router));
this.messageConverter = Preconditions.CheckNotNull(messageConverter, nameof(messageConverter));
this.connectionManager = Preconditions.CheckNotNull(connectionManager, nameof(connectionManager));
this.twinManager = Preconditions.CheckNotNull(twinManager, nameof(twinManager));
this.edgeDeviceId = Preconditions.CheckNonWhiteSpace(edgeDeviceId, nameof(edgeDeviceId));
this.invokeMethodHandler = Preconditions.CheckNotNull(invokeMethodHandler, nameof(invokeMethodHandler));
this.connectionManager.CloudConnectionEstablished += this.CloudConnectionEstablished;
deviceConnectivityManager.DeviceConnected += this.DeviceConnected;
}

public Task ProcessDeviceMessage(IIdentity identity, IMessage message)
Expand Down Expand Up @@ -214,16 +216,28 @@ internal async Task ProcessSubscription(string id, Option<ICloudProxy> cloudProx
}
}

async void CloudConnectionEstablished(object sender, IIdentity identity)
async void DeviceConnected(object sender, EventArgs eventArgs)
{
Events.DeviceConnectedProcessingSubscriptions();
try
{
Events.ProcessingSubscriptions(identity);
await this.ProcessSubscriptions(identity.Id);
IEnumerable<IIdentity> connectedClients = this.connectionManager.GetConnectedClients().ToList();
foreach (IIdentity identity in connectedClients)
{
try
{
Events.ProcessingSubscriptions(identity);
await this.ProcessSubscriptions(identity.Id);
}
catch (Exception e)
{
Events.ErrorProcessingSubscriptions(e, identity);
}
}
}
catch (Exception e)
{
Events.ErrorProcessingSubscriptions(e, identity);
Events.ErrorProcessingSubscriptions(e);
}
}

Expand Down Expand Up @@ -387,6 +401,16 @@ public static void ProcessingSubscription(string id, DeviceSubscription deviceSu
{
Log.LogInformation((int)EventIds.ProcessingSubscription, Invariant($"Processing subscription {deviceSubscription} for client {id}."));
}

internal static void DeviceConnectedProcessingSubscriptions()
{
Log.LogInformation((int)EventIds.ProcessingSubscription, Invariant($"Device connected to cloud, processing subscriptions for connected clients."));
}

internal static void ErrorProcessingSubscriptions(Exception e)
{
Log.LogWarning((int)EventIds.ProcessingSubscription, e, Invariant($"Error processing subscriptions for connected clients."));
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,7 @@ protected override void Load(ContainerBuilder builder)
var authenticatorTask = c.Resolve<Task<IAuthenticator>>();
var credentialsCacheTask = c.Resolve<Task<ICredentialsCache>>();
var deviceScopeIdentitiesCacheTask = c.Resolve<Task<IDeviceScopeIdentitiesCache>>();
var deviceConnectivityManager = c.Resolve<IDeviceConnectivityManager>();
IConnectionManager connectionManager = await connectionManagerTask;
IAuthenticator authenticator = await authenticatorTask;
ICredentialsCache credentialsCache = await credentialsCacheTask;
Expand All @@ -272,7 +273,8 @@ protected override void Load(ContainerBuilder builder)
credentialsCache,
deviceScopeIdentitiesCache,
TimeSpan.FromMinutes(5),
edgeHubCredentials.Identity);
edgeHubCredentials.Identity,
deviceConnectivityManager);
return connectionReauthenticator;
})
.As<Task<ConnectionReauthenticator>>()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -375,12 +375,13 @@ protected override void Load(ContainerBuilder builder)
var twinManagerTask = c.Resolve<Task<ITwinManager>>();
var invokeMethodHandlerTask = c.Resolve<Task<IInvokeMethodHandler>>();
var connectionManagerTask = c.Resolve<Task<IConnectionManager>>();
var deviceConnectivityManager = c.Resolve<IDeviceConnectivityManager>();
Router router = await routerTask;
ITwinManager twinManager = await twinManagerTask;
IConnectionManager connectionManager = await connectionManagerTask;
IInvokeMethodHandler invokeMethodHandler = await invokeMethodHandlerTask;
IEdgeHub hub = new RoutingEdgeHub(router, routingMessageConverter,
connectionManager, twinManager, this.edgeDeviceId, invokeMethodHandler);
connectionManager, twinManager, this.edgeDeviceId, invokeMethodHandler, deviceConnectivityManager);
return hub;
})
.As<Task<IEdgeHub>>()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ namespace Microsoft.Azure.Devices.Edge.Hub.CloudProxy.Test
using DotNetty.Transport.Channels;
using Microsoft.Azure.Devices.Client;
using Microsoft.Azure.Devices.Client.Exceptions;
using Microsoft.Azure.Devices.Edge.Hub.Core;
using Microsoft.Azure.Devices.Edge.Hub.Core.Identity;
using Microsoft.Azure.Devices.Edge.Util.Test.Common;
using Moq;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -635,7 +635,8 @@ public async Task GetConnectedClientsTest()
// Assert
Assert.NotNull(connectedClients);
List<IIdentity> connectedClientsList = connectedClients.ToList();
Assert.Equal(10, connectedClientsList.Count);
Assert.Equal(11, connectedClientsList.Count);
Assert.True(connectedClientsList.Any(c => c.Id.Equals($"{EdgeDeviceId}/{EdgeModuleId}")));

for (int i = 0; i < 10; i++)
{
Expand All @@ -654,7 +655,8 @@ public async Task GetConnectedClientsTest()
// Assert
Assert.NotNull(connectedClients);
connectedClientsList = connectedClients.ToList();
Assert.Equal(5, connectedClientsList.Count);
Assert.Equal(6, connectedClientsList.Count);
Assert.True(connectedClientsList.Any(c => c.Id.Equals($"{EdgeDeviceId}/{EdgeModuleId}")));

for (int i = 5; i < 10; i++)
{
Expand Down
Loading

0 comments on commit d8b9038

Please sign in to comment.