Skip to content

Commit

Permalink
1.0.8: EdgeAgent: Fix error handling in GetTwin (#1351)
Browse files Browse the repository at this point in the history
* EdgeAgent: Fix error handling in GetTwin (#1329)

* Fix EdgeAgentConnection

* Add test

* Add tests

* Fix test

* Fix test and cleanup

* Add when not fatal
  • Loading branch information
varunpuranik authored and myagley committed Jun 18, 2019
1 parent a1b77bf commit 2c4bc2a
Show file tree
Hide file tree
Showing 2 changed files with 245 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ namespace Microsoft.Azure.Devices.Edge.Agent.IoTHub
using Microsoft.Azure.Devices.Edge.Agent.Core.ConfigSources;
using Microsoft.Azure.Devices.Edge.Agent.Core.Requests;
using Microsoft.Azure.Devices.Edge.Agent.Core.Serde;
using Microsoft.Azure.Devices.Edge.Agent.IoTHub.Stream;
using Microsoft.Azure.Devices.Edge.Util;
using Microsoft.Azure.Devices.Edge.Util.Concurrency;
using Microsoft.Azure.Devices.Edge.Util.TransientFaultHandling;
Expand Down Expand Up @@ -163,14 +162,39 @@ await this.desiredProperties

// This method updates local state and should be called only after acquiring twinLock
async Task RefreshTwinAsync()
{
Events.TwinRefreshStart();
Option<Twin> twinOption = await this.GetTwinFromIoTHub();

await twinOption.ForEachAsync(
async twin =>
{
try
{
this.desiredProperties = Option.Some(twin.Properties.Desired);
this.reportedProperties = Option.Some(twin.Properties.Reported);
await this.UpdateDeploymentConfig(twin.Properties.Desired);
Events.TwinRefreshSuccess();
}
catch (Exception ex) when (!ex.IsFatal())
{
this.deploymentConfigInfo = Option.Some(new DeploymentConfigInfo(this.desiredProperties.Map(d => d.Version).GetOrElse(0), ex));
Events.TwinRefreshError(ex);
}
});
}

async Task<Option<Twin>> GetTwinFromIoTHub()
{
try
{
Events.TwinRefreshStart();

Events.GettingModuleClient();
IModuleClient moduleClient = await this.moduleConnection.GetOrCreateModuleClient();
Events.GotModuleClient();
async Task<Twin> GetTwinFunc()
{
Events.GettingModuleClient();
IModuleClient moduleClient = await this.moduleConnection.GetOrCreateModuleClient();
Events.GotModuleClient();
return await moduleClient.GetTwinAsync();
}

// if GetTwinAsync fails its possible that it might be due to transient network errors or because
// we are getting throttled by IoT Hub; if we didn't attempt a retry then this object would be
Expand All @@ -179,18 +203,14 @@ async Task RefreshTwinAsync()
// recover from this situation
var retryPolicy = new RetryPolicy(AllButFatalErrorDetectionStrategy, this.retryStrategy);
retryPolicy.Retrying += (_, args) => Events.RetryingGetTwin(args);
Twin twin = await retryPolicy.ExecuteAsync(moduleClient.GetTwinAsync);

Twin twin = await retryPolicy.ExecuteAsync(GetTwinFunc);
Events.GotTwin(twin);
this.desiredProperties = Option.Some(twin.Properties.Desired);
this.reportedProperties = Option.Some(twin.Properties.Reported);
await this.UpdateDeploymentConfig(twin.Properties.Desired);
Events.TwinRefreshSuccess();
return Option.Some(twin);
}
catch (Exception ex) when (!ex.IsFatal())
catch (Exception e) when (!e.IsFatal())
{
this.deploymentConfigInfo = Option.Some(new DeploymentConfigInfo(this.desiredProperties.Map(d => d.Version).GetOrElse(0), ex));
Events.TwinRefreshError(ex);
Events.ErrorGettingTwin(e);
return Option.None<Twin>();
}
}

Expand Down Expand Up @@ -344,6 +364,11 @@ public static void GotModuleClient()
Log.LogDebug((int)EventIds.GotModuleClient, "Got module client to refresh the twin");
}

public static void ErrorGettingTwin(Exception e)
{
Log.LogWarning((int)EventIds.RetryingGetTwin, e, "Error getting edge agent twin from IoTHub");
}

internal static void DesiredPropertiesUpdated()
{
Log.LogDebug((int)EventIds.DesiredPropertiesUpdated, "Edge agent desired properties updated callback invoked.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -605,7 +605,7 @@ public async Task GetDeploymentConfigInfoIncludesExceptionWhenSchemaVersionDoesN

[Fact]
[Unit]
public async Task GetDeploymentConfigInfoAsyncIncludesExceptionWhenGetTwinThrows()
public async Task GetDeploymentConfigInfoAsyncIDoesNotIncludeExceptionWhenGetTwinThrows()
{
// Arrange
var deviceClient = new Mock<IModuleClient>();
Expand Down Expand Up @@ -662,9 +662,7 @@ public async Task GetDeploymentConfigInfoAsyncIncludesExceptionWhenGetTwinThrows
Option<DeploymentConfigInfo> deploymentConfigInfo = await connection.GetDeploymentConfigInfoAsync();

// Assert
Assert.True(deploymentConfigInfo.HasValue);
Assert.True(deploymentConfigInfo.OrDefault().Exception.HasValue);
Assert.IsType<InvalidOperationException>(deploymentConfigInfo.OrDefault().Exception.OrDefault());
Assert.False(deploymentConfigInfo.HasValue);
}

[Fact]
Expand Down Expand Up @@ -1092,6 +1090,209 @@ public async Task EdgeAgentConnectionRefreshTest()
}
}

[Fact]
[Unit]
public async Task GetTwinFailureDoesNotUpdateState()
{
// Arrange
var moduleDeserializerTypes = new Dictionary<string, Type>
{
{ DockerType, typeof(DockerDesiredModule) }
};

var edgeAgentDeserializerTypes = new Dictionary<string, Type>
{
{ DockerType, typeof(EdgeAgentDockerModule) }
};

var edgeHubDeserializerTypes = new Dictionary<string, Type>
{
{ DockerType, typeof(EdgeHubDockerModule) }
};

var runtimeInfoDeserializerTypes = new Dictionary<string, Type>
{
{ DockerType, typeof(DockerRuntimeInfo) }
};

var deserializerTypes = new Dictionary<Type, IDictionary<string, Type>>
{
[typeof(IModule)] = moduleDeserializerTypes,
[typeof(IEdgeAgentModule)] = edgeAgentDeserializerTypes,
[typeof(IEdgeHubModule)] = edgeHubDeserializerTypes,
[typeof(IRuntimeInfo)] = runtimeInfoDeserializerTypes,
};

ISerde<DeploymentConfig> serde = new TypeSpecificSerDe<DeploymentConfig>(deserializerTypes);

var runtimeInfo = new DockerRuntimeInfo("docker", new DockerRuntimeConfig("1.0", null));
var edgeAgentDockerModule = new EdgeAgentDockerModule("docker", new DockerConfig("image", string.Empty), null, null);
var edgeHubDockerModule = new EdgeHubDockerModule(
"docker",
ModuleStatus.Running,
RestartPolicy.Always,
new DockerConfig("image", string.Empty),
null,
null);
var deploymentConfig = new DeploymentConfig(
"1.0",
runtimeInfo,
new SystemModules(edgeAgentDockerModule, edgeHubDockerModule),
new Dictionary<string, IModule>());
string deploymentConfigJson = serde.Serialize(deploymentConfig);
var twin = new Twin(new TwinProperties { Desired = new TwinCollection(deploymentConfigJson) });

var moduleClient = new Mock<IModuleClient>();
moduleClient.Setup(m => m.GetTwinAsync())
.ReturnsAsync(twin);
moduleClient.SetupGet(m => m.IsActive).Returns(true);

var moduleClientProvider = new Mock<IModuleClientProvider>();
moduleClientProvider.Setup(m => m.Create(It.IsAny<ConnectionStatusChangesHandler>()))
.ReturnsAsync(moduleClient.Object);

IEnumerable<IRequestHandler> requestHandlers = new List<IRequestHandler>();
var retryStrategy = new FixedInterval(3, TimeSpan.FromSeconds(2));

// Act
using (var edgeAgentConnection = new EdgeAgentConnection(moduleClientProvider.Object, serde, new RequestManager(requestHandlers, DefaultRequestTimeout), true, TimeSpan.FromSeconds(10), retryStrategy))
{
await Task.Delay(TimeSpan.FromSeconds(3));
Option<DeploymentConfigInfo> receivedDeploymentConfigInfo = await edgeAgentConnection.GetDeploymentConfigInfoAsync();

// Assert
Assert.True(receivedDeploymentConfigInfo.HasValue);
Assert.False(receivedDeploymentConfigInfo.OrDefault().Exception.HasValue);
Assert.Equal(deploymentConfig, receivedDeploymentConfigInfo.OrDefault().DeploymentConfig);

moduleClient.Setup(m => m.GetTwinAsync())
.ThrowsAsync(new ObjectDisposedException("Dummy obj disp"));

await Task.Delay(TimeSpan.FromSeconds(12));

// Act
receivedDeploymentConfigInfo = await edgeAgentConnection.GetDeploymentConfigInfoAsync();

// Assert
moduleClient.Verify(m => m.GetTwinAsync(), Times.Exactly(5));
Assert.True(receivedDeploymentConfigInfo.HasValue);
Assert.False(receivedDeploymentConfigInfo.OrDefault().Exception.HasValue);
Assert.Equal(deploymentConfig, receivedDeploymentConfigInfo.OrDefault().DeploymentConfig);
}
}

[Fact]
[Unit]
public async Task GetTwinRetryLogicGetsNewClient()
{
// Arrange
var moduleDeserializerTypes = new Dictionary<string, Type>
{
{ DockerType, typeof(DockerDesiredModule) }
};

var edgeAgentDeserializerTypes = new Dictionary<string, Type>
{
{ DockerType, typeof(EdgeAgentDockerModule) }
};

var edgeHubDeserializerTypes = new Dictionary<string, Type>
{
{ DockerType, typeof(EdgeHubDockerModule) }
};

var runtimeInfoDeserializerTypes = new Dictionary<string, Type>
{
{ DockerType, typeof(DockerRuntimeInfo) }
};

var deserializerTypes = new Dictionary<Type, IDictionary<string, Type>>
{
[typeof(IModule)] = moduleDeserializerTypes,
[typeof(IEdgeAgentModule)] = edgeAgentDeserializerTypes,
[typeof(IEdgeHubModule)] = edgeHubDeserializerTypes,
[typeof(IRuntimeInfo)] = runtimeInfoDeserializerTypes,
};

ISerde<DeploymentConfig> serde = new TypeSpecificSerDe<DeploymentConfig>(deserializerTypes);

var runtimeInfo = new DockerRuntimeInfo("docker", new DockerRuntimeConfig("1.0", null));
var edgeAgentDockerModule = new EdgeAgentDockerModule("docker", new DockerConfig("image", string.Empty), null, null);
var edgeHubDockerModule = new EdgeHubDockerModule(
"docker",
ModuleStatus.Running,
RestartPolicy.Always,
new DockerConfig("image", string.Empty),
null,
null);
var deploymentConfig = new DeploymentConfig(
"1.0",
runtimeInfo,
new SystemModules(edgeAgentDockerModule, edgeHubDockerModule),
new Dictionary<string, IModule>());
string deploymentConfigJson = serde.Serialize(deploymentConfig);
var twin = new Twin(new TwinProperties { Desired = new TwinCollection(deploymentConfigJson) });

var edgeHubDockerModule2 = new EdgeHubDockerModule(
"docker",
ModuleStatus.Running,
RestartPolicy.Always,
new DockerConfig("image2", string.Empty),
null,
null);
var deploymentConfig2 = new DeploymentConfig(
"1.0",
runtimeInfo,
new SystemModules(edgeAgentDockerModule, edgeHubDockerModule2),
new Dictionary<string, IModule>());
string deploymentConfigJson2 = serde.Serialize(deploymentConfig2);
var twin2 = new Twin(new TwinProperties { Desired = new TwinCollection(deploymentConfigJson2) });

var moduleClient = new Mock<IModuleClient>();
moduleClient.Setup(m => m.GetTwinAsync())
.ReturnsAsync(twin);
moduleClient.SetupGet(m => m.IsActive).Returns(true);

var moduleClientProvider = new Mock<IModuleClientProvider>();
moduleClientProvider.Setup(m => m.Create(It.IsAny<ConnectionStatusChangesHandler>()))
.ReturnsAsync(moduleClient.Object);

IEnumerable<IRequestHandler> requestHandlers = new List<IRequestHandler>();
var retryStrategy = new FixedInterval(3, TimeSpan.FromSeconds(2));

// Act
using (var edgeAgentConnection = new EdgeAgentConnection(moduleClientProvider.Object, serde, new RequestManager(requestHandlers, DefaultRequestTimeout), true, TimeSpan.FromSeconds(10), retryStrategy))
{
await Task.Delay(TimeSpan.FromSeconds(3));
Option<DeploymentConfigInfo> receivedDeploymentConfigInfo = await edgeAgentConnection.GetDeploymentConfigInfoAsync();

// Assert
Assert.True(receivedDeploymentConfigInfo.HasValue);
Assert.False(receivedDeploymentConfigInfo.OrDefault().Exception.HasValue);
Assert.Equal(deploymentConfig, receivedDeploymentConfigInfo.OrDefault().DeploymentConfig);
Assert.NotEqual(deploymentConfig2, receivedDeploymentConfigInfo.OrDefault().DeploymentConfig);

moduleClient.SetupSequence(m => m.GetTwinAsync())
.ThrowsAsync(new ObjectDisposedException("Dummy obj disp"))
.ThrowsAsync(new ObjectDisposedException("Dummy obj disp 2"))
.ReturnsAsync(twin2);

await Task.Delay(TimeSpan.FromSeconds(12));

// Assert
moduleClient.Verify(m => m.GetTwinAsync(), Times.Exactly(4));

// Act
receivedDeploymentConfigInfo = await edgeAgentConnection.GetDeploymentConfigInfoAsync();

// Assert
Assert.True(receivedDeploymentConfigInfo.HasValue);
Assert.False(receivedDeploymentConfigInfo.OrDefault().Exception.HasValue);
Assert.Equal(deploymentConfig2, receivedDeploymentConfigInfo.OrDefault().DeploymentConfig);
Assert.NotEqual(deploymentConfig, receivedDeploymentConfigInfo.OrDefault().DeploymentConfig);
}
}

[Theory]
[Unit]
[InlineData("1.0", null)]
Expand Down

0 comments on commit 2c4bc2a

Please sign in to comment.