Skip to content

Commit

Permalink
EdgeHub: Fix recovering connections after connectivity is resumed ove…
Browse files Browse the repository at this point in the history
…r AMQP (#422)

* Open/Close cloud proxy

* Mark proxy inactive if open fails

* Fix ConnectionIdleTimeout to 5 secs

* Fix cloud token authenticator and logging

* Add tests

* Make value a constant

* Fix formatting

* Fix tests
  • Loading branch information
varunpuranik authored Oct 15, 2018
1 parent 9f500e4 commit 6069f7f
Show file tree
Hide file tree
Showing 12 changed files with 141 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ async Task<IClient> CreateDeviceClient(
{
client.SetProductInfo(newCredentials.ProductInfo);
}

await client.OpenAsync();
Events.CreateDeviceClientSuccess(transportSettings.GetTransportType(), OperationTimeoutMilliseconds, newCredentials.Identity);
return client;
}
Expand Down Expand Up @@ -299,7 +299,7 @@ async Task<string> GetNewToken(string iotHub, string id, string currentToken, II
}
else
{
Events.TokenNotUsable(iotHub, id, token);
Events.TokenNotUsable(iotHub, id, token);
}

bool newTokenGetterCreated = false;
Expand All @@ -311,7 +311,7 @@ async Task<string> GetNewToken(string iotHub, string id, string currentToken, II
Events.SafeCreateNewToken(id);
var taskCompletionSource = new TaskCompletionSource<string>();
this.tokenGetter = Option.Some(taskCompletionSource);
newTokenGetterCreated = true;
newTokenGetterCreated = true;
return taskCompletionSource;
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ namespace Microsoft.Azure.Devices.Edge.Hub.CloudProxy

public class CloudConnectionProvider : ICloudConnectionProvider
{
// Minimum value allowed by the SDK for Connection Idle timeout for AMQP Multiplexed connections.
static readonly TimeSpan MinAmqpConnectionMuxIdleTimeout = TimeSpan.FromSeconds(5);

static readonly IDictionary<UpstreamProtocol, TransportType> UpstreamProtocolTransportTypeMap = new Dictionary<UpstreamProtocol, TransportType>
{
[UpstreamProtocol.Amqp] = TransportType.Amqp_Tcp_Only,
Expand Down Expand Up @@ -75,7 +78,8 @@ internal static ITransportSettings[] GetTransportSettings(Option<UpstreamProtoco
AmqpConnectionPoolSettings = new AmqpConnectionPoolSettings
{
Pooling = true,
MaxPoolSize = (uint)connectionPoolSize
MaxPoolSize = (uint)connectionPoolSize,
ConnectionIdleTimeout = MinAmqpConnectionMuxIdleTimeout
}
}
};
Expand All @@ -98,19 +102,21 @@ internal static ITransportSettings[] GetTransportSettings(Option<UpstreamProtoco
AmqpConnectionPoolSettings = new AmqpConnectionPoolSettings
{
Pooling = true,
MaxPoolSize = (uint)connectionPoolSize
MaxPoolSize = (uint)connectionPoolSize,
ConnectionIdleTimeout = MinAmqpConnectionMuxIdleTimeout
}
},
new AmqpTransportSettings(TransportType.Amqp_WebSocket_Only)
{
AmqpConnectionPoolSettings = new AmqpConnectionPoolSettings
{
Pooling = true,
MaxPoolSize = (uint)connectionPoolSize
MaxPoolSize = (uint)connectionPoolSize,
ConnectionIdleTimeout = MinAmqpConnectionMuxIdleTimeout
}
}
});
}
}

public async Task<Try<ICloudConnection>> Connect(IClientCredentials identity, Action<string, CloudConnectionStatus> connectionStatusChangedHandler)
{
Expand Down Expand Up @@ -162,5 +168,5 @@ public static void ErrorCreatingCloudConnection(IIdentity identity, Exception ex
Log.LogWarning((int)EventIds.CloudConnectError, exception, $"Error creating cloud connection for client {identity.Id}");
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,18 @@ public async Task<bool> CloseAsync()
{
try
{
await this.client.CloseAsync();
// In offline scenario, sometimes the underlying connection has already closed and
// in that case calling CloseAsync throws. This needs to be fixed in the SDK, but meanwhile
// wrapping this.client.CloseAsync in a try/catch
try
{
await this.client.CloseAsync();
}
catch (Exception ex)
{
Events.ErrorClosingClient(this.clientId, ex);
}

await (this.cloudReceiver?.CloseAsync() ?? Task.CompletedTask);
this.timer.Disable();
Events.Closed(this);
Expand Down Expand Up @@ -389,7 +400,8 @@ enum EventIds
CloudReceiverNull,
ErrorOpening,
TimedOutClosing,
Initialized
Initialized,
ErrorClosingClient
}

public static void Closed(CloudProxy cloudProxy)
Expand Down Expand Up @@ -491,6 +503,11 @@ public static void Initialized(CloudProxy cloudProxy)
{
Log.LogInformation((int)EventIds.Initialized, Invariant($"Initialized cloud proxy {cloudProxy.id} for {cloudProxy.clientId}"));
}

public static void ErrorClosingClient(string clientId, Exception ex)
{
Log.LogDebug((int)EventIds.ErrorClosingClient, ex, Invariant($"Error closing client for {clientId}"));
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,19 @@ public void Dispose()

public Task<Twin> GetTwinAsync() => this.underlyingDeviceClient.GetTwinAsync();

public Task OpenAsync() => this.underlyingDeviceClient.OpenAsync();
public async Task OpenAsync()
{
try
{
await this.underlyingDeviceClient.OpenAsync();
}
catch (Exception)
{
this.isActive.Set(false);
throw;
}
}


public Task<Message> ReceiveAsync(TimeSpan receiveMessageTimeout) => this.underlyingDeviceClient.ReceiveAsync(receiveMessageTimeout);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,18 @@ public void Dispose()

public Task<Twin> GetTwinAsync() => this.underlyingModuleClient.GetTwinAsync();

public Task OpenAsync() => this.underlyingModuleClient.OpenAsync();
public async Task OpenAsync()
{
try
{
await this.underlyingModuleClient.OpenAsync();
}
catch(Exception)
{
this.isActive.Set(false);
throw;
}
}

public Task SendEventAsync(Message message) => this.underlyingModuleClient.SendEventAsync(message);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,20 +31,12 @@ public async Task<bool> AuthenticateAsync(IClientCredentials clientCredentials)
Try<ICloudProxy> cloudProxyTry = await this.connectionManager.CreateCloudConnectionAsync(clientCredentials);
if (cloudProxyTry.Success)
{
try
{
await cloudProxyTry.Value.OpenAsync();
Events.AuthenticatedWithIotHub(clientCredentials.Identity);
return true;
}
catch (Exception ex)
{
Events.ErrorValidatingTokenWithIoTHub(clientCredentials.Identity, ex);
}
Events.AuthenticatedWithIotHub(clientCredentials.Identity);
return true;
}
else
{
Events.ErrorGettingCloudProxy(clientCredentials.Identity, cloudProxyTry.Exception);
Events.ErrorValidatingTokenWithIoTHub(clientCredentials.Identity, cloudProxyTry.Exception);
}

return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,11 @@ await token.ForEachAsync(async t =>
case CloudConnectionStatus.Disconnected:
Events.InvokingCloudConnectionLostEvent(device.Identity);
this.CloudConnectionLost?.Invoke(this, device.Identity);
await device.CloudConnection.Filter(cp => cp.IsActive).ForEachAsync(cp =>
{
Events.CloudConnectionLostClosingClient(device.Identity);
return cp.CloseAsync();
});
break;

case CloudConnectionStatus.ConnectionEstablished:
Expand Down Expand Up @@ -406,7 +411,8 @@ enum EventIds
ProcessingTokenNearExpiryEvent,
InvokingCloudConnectionLostEvent,
InvokingCloudConnectionEstablishedEvent,
HandlingConnectionStatusChangedHandler
HandlingConnectionStatusChangedHandler,
CloudConnectionLostClosingClient
}

public static void NewCloudConnection(IIdentity identity, Try<ICloudConnection> cloudConnection)
Expand Down Expand Up @@ -462,6 +468,11 @@ public static void HandlingConnectionStatusChangedHandler(string deviceId, Cloud
{
Log.LogInformation((int)EventIds.HandlingConnectionStatusChangedHandler, Invariant($"Connection status for {deviceId} changed to {connectionStatus}"));
}

public static void CloudConnectionLostClosingClient(IIdentity identity)
{
Log.LogDebug((int)EventIds.CloudConnectionLostClosingClient, Invariant($"Cloud connection lost for {identity.Id}, closing client."));
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -98,15 +98,17 @@ public static IEnumerable<object[]> UpstreamProtocolTransportSettingsData()
AmqpConnectionPoolSettings = new AmqpConnectionPoolSettings
{
Pooling = true,
MaxPoolSize = 20
MaxPoolSize = 20,
ConnectionIdleTimeout = TimeSpan.FromSeconds(5)
}
},
new AmqpTransportSettings(TransportType.Amqp_WebSocket_Only)
{
AmqpConnectionPoolSettings = new AmqpConnectionPoolSettings
{
Pooling = true,
MaxPoolSize = 20
MaxPoolSize = 20,
ConnectionIdleTimeout = TimeSpan.FromSeconds(5)
}
}
}
Expand All @@ -123,7 +125,8 @@ public static IEnumerable<object[]> UpstreamProtocolTransportSettingsData()
AmqpConnectionPoolSettings = new AmqpConnectionPoolSettings
{
Pooling = true,
MaxPoolSize = 30
MaxPoolSize = 30,
ConnectionIdleTimeout = TimeSpan.FromSeconds(5)
}
}
}
Expand All @@ -140,7 +143,8 @@ public static IEnumerable<object[]> UpstreamProtocolTransportSettingsData()
AmqpConnectionPoolSettings = new AmqpConnectionPoolSettings
{
Pooling = true,
MaxPoolSize = 50
MaxPoolSize = 50,
ConnectionIdleTimeout = TimeSpan.FromSeconds(5)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,41 @@ public async Task UpdateInvalidIdentityWithTokenTest()
Assert.Equal(cloudProxy1, cloudConnection.CloudProxy.OrDefault());
}

[Fact]
[Unit]
public async Task InitializeAndGetCloudProxyTest()
{
string iothubHostName = "test.azure-devices.net";
string deviceId = "device1";

IClientCredentials GetClientCredentialsWithNonExpiringToken()
{
string token = TokenHelper.CreateSasToken(iothubHostName, DateTime.UtcNow.AddMinutes(10));
var identity = new DeviceIdentity(iothubHostName, deviceId);
return new TokenCredentials(identity, token, string.Empty);
}

IClient client = GetMockDeviceClient();
var deviceClientProvider = new Mock<IClientProvider>();
deviceClientProvider.Setup(dc => dc.Create(It.IsAny<IIdentity>(), It.IsAny<IAuthenticationMethod>(), It.IsAny<ITransportSettings[]>()))
.Returns(() => client);

var transportSettings = new ITransportSettings[] { new AmqpTransportSettings(TransportType.Amqp_Tcp_Only) };

var messageConverterProvider = new MessageConverterProvider(new Dictionary<Type, IMessageConverter> { [typeof(TwinCollection)] = Mock.Of<IMessageConverter>() });

var cloudConnection = new CloudConnection((_, __) => { }, transportSettings, messageConverterProvider, deviceClientProvider.Object, Mock.Of<ICloudListener>(), TokenProvider, DeviceScopeIdentitiesCache, TimeSpan.FromMinutes(60), true);

IClientCredentials clientCredentialsWithExpiringToken2 = GetClientCredentialsWithNonExpiringToken();
ICloudProxy cloudProxy = await cloudConnection.CreateOrUpdateAsync(clientCredentialsWithExpiringToken2);

// Wait for the task to complete
await Task.Delay(TimeSpan.FromSeconds(10));
Assert.Equal(cloudProxy, cloudConnection.CloudProxy.OrDefault());
Assert.True(cloudProxy.IsActive);
Mock.Get(client).Verify(c => c.OpenAsync(), Times.Once);
}

[Fact]
[Unit]
public async Task RefreshTokenTest()
Expand Down Expand Up @@ -421,6 +456,7 @@ static IClient GetMockDeviceClient()
deviceClient.Setup(dc => dc.CloseAsync())
.Callback(() => deviceClient.SetupGet(dc => dc.IsActive).Returns(false))
.Returns(Task.FromResult(true));
deviceClient.Setup(dc => dc.OpenAsync()).Returns(Task.CompletedTask);
return deviceClient.Object;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,27 @@ public async Task CloudProxyNullReceiverTest()
cloudProxy.StartListening();
}

[Fact]
public async Task TestCloseThrows()
{
// Arrange
var messageConverterProvider = Mock.Of<IMessageConverterProvider>();
string clientId = "d1";
var cloudListener = Mock.Of<ICloudListener>();
TimeSpan idleTimeout = TimeSpan.FromSeconds(60);
Action<string, CloudConnectionStatus> connectionStatusChangedHandler = (s, status) => { };
var client = new Mock<IClient>();
client.Setup(c => c.CloseAsync()).ThrowsAsync(new InvalidOperationException());
var cloudProxy = new CloudProxy(client.Object, messageConverterProvider, clientId, connectionStatusChangedHandler, cloudListener, idleTimeout, false);

// Act
bool result = await cloudProxy.CloseAsync();

// Assert.
Assert.True(result);
client.VerifyAll();
}

Task<ICloudProxy> GetCloudProxyWithConnectionStringKey(string connectionStringConfigKey) =>
GetCloudProxyWithConnectionStringKey(connectionStringConfigKey, Mock.Of<IEdgeHub>());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,7 @@ public async Task AuthenticateFailureTest()
// Arrange
var deviceIdentity = Mock.Of<IDeviceIdentity>(d => d.Id == "d1" && d.DeviceId == "d1");
IClientCredentials credentials = new TokenCredentials(deviceIdentity, Guid.NewGuid().ToString(), string.Empty);
var cloudProxy = Mock.Of<ICloudProxy>();
Mock.Get(cloudProxy).Setup(c => c.OpenAsync()).ThrowsAsync(new Exception("Unauthorized"));
var connectionManager = Mock.Of<IConnectionManager>(c => c.CreateCloudConnectionAsync(credentials) == Task.FromResult(Try.Success(cloudProxy)));
var connectionManager = Mock.Of<IConnectionManager>(c => c.CreateCloudConnectionAsync(credentials) == Task.FromResult(Try<ICloudProxy>.Failure(new TimeoutException())));
IAuthenticator cloudAuthenticator = new CloudTokenAuthenticator(connectionManager, IotHubHostName);

// Act
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,12 +88,9 @@ public async Task AuthenticateFromCacheTest()
public async Task NotAuthenticatedTest()
{
// Arrange
var cloudProxy = new Mock<ICloudProxy>();
cloudProxy.Setup(c => c.OpenAsync())
.Throws(new UnauthorizedException("Not authorized"));
var connectionManager = Mock.Of<IConnectionManager>(
c =>
c.CreateCloudConnectionAsync(It.IsAny<IClientCredentials>()) == Task.FromResult(Try.Success(cloudProxy.Object)));
c.CreateCloudConnectionAsync(It.IsAny<IClientCredentials>()) == Task.FromResult(Try<ICloudProxy>.Failure(new UnauthorizedException("Not authorized"))));

string iothubHostName = "iothub1.azure.net";
string callerProductInfo = "productInfo";
Expand All @@ -118,20 +115,16 @@ public async Task NotAuthenticatedTest()
Assert.False(isAuthenticated);
Mock.Verify(credentialsStore);
Mock.Verify(Mock.Get(connectionManager));
cloudProxy.VerifyAll();
}

[Unit]
[Fact]
public async Task CacheTokenExpiredNotAuthenticatedTest()
{
// Arrange
var cloudProxy = new Mock<ICloudProxy>();
cloudProxy.Setup(c => c.OpenAsync())
.Throws(new UnauthorizedException("Not authorized"));
var connectionManager = Mock.Of<IConnectionManager>(
c =>
c.CreateCloudConnectionAsync(It.IsAny<IClientCredentials>()) == Task.FromResult(Try.Success(cloudProxy.Object)));
c.CreateCloudConnectionAsync(It.IsAny<IClientCredentials>()) == Task.FromResult(Try<ICloudProxy>.Failure(new UnauthorizedException("Not authorized"))));

string iothubHostName = "iothub1.azure.net";
string callerProductInfo = "productInfo";
Expand All @@ -155,7 +148,6 @@ public async Task CacheTokenExpiredNotAuthenticatedTest()
Assert.False(isAuthenticated);
Mock.Verify(credentialsStore);
Mock.Verify(Mock.Get(connectionManager));
cloudProxy.VerifyAll();
}
}
}

0 comments on commit 6069f7f

Please sign in to comment.