Skip to content

Commit

Permalink
EdgeAgent: Change twin refresh timer logic to loop (#799)
Browse files Browse the repository at this point in the history
* EA Loop

* Use periodic task

* Add tests

* Cleanup

* Clean up

* Cleanup

* Remove unnecessary test because of change in behavior.

* Cleanup

* Add logic around RefreshTwin
  • Loading branch information
varunpuranik authored Feb 13, 2019
1 parent 408f70d commit cb7af40
Show file tree
Hide file tree
Showing 4 changed files with 222 additions and 108 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ namespace Microsoft.Azure.Devices.Edge.Agent.IoTHub
{
using System;
using System.Threading.Tasks;
using System.Timers;
using Microsoft.Azure.Devices.Client;
using Microsoft.Azure.Devices.Edge.Agent.Core;
using Microsoft.Azure.Devices.Edge.Agent.Core.ConfigSources;
Expand All @@ -17,16 +16,17 @@ namespace Microsoft.Azure.Devices.Edge.Agent.IoTHub

public class EdgeAgentConnection : IEdgeAgentConnection
{
const string PingMethodName = "ping";
internal static readonly Version ExpectedSchemaVersion = new Version("1.0");
const string PingMethodName = "ping";
static readonly TimeSpan DefaultConfigRefreshFrequency = TimeSpan.FromHours(1);
static readonly Task<MethodResponse> PingMethodResponse = Task.FromResult(new MethodResponse(200));
static readonly TimeSpan DeviceClientInitializationWaitTime = TimeSpan.FromSeconds(5);

readonly AsyncLock twinLock = new AsyncLock();
readonly ISerde<DeploymentConfig> desiredPropertiesSerDe;
readonly Task initTask;
readonly RetryStrategy retryStrategy;
readonly Timer refreshTimer;
readonly PeriodicTask refreshTwinTask;

Option<IModuleClient> deviceClient;
TwinCollection desiredProperties;
Expand Down Expand Up @@ -64,8 +64,7 @@ internal EdgeAgentConnection(
this.reportedProperties = Option.None<TwinCollection>();
this.deviceClient = Option.None<IModuleClient>();
this.retryStrategy = Preconditions.CheckNotNull(retryStrategy, nameof(retryStrategy));
this.refreshTimer = new Timer(refreshConfigFrequency.TotalMilliseconds);
this.refreshTimer.Elapsed += (_, __) => this.RefreshTimerElapsed();
this.refreshTwinTask = new PeriodicTask(this.ForceRefreshTwin, refreshConfigFrequency, refreshConfigFrequency, Events.Log, "refresh twin config");
this.initTask = this.CreateAndInitDeviceClient(Preconditions.CheckNotNull(moduleClientProvider, nameof(moduleClientProvider)));

Events.TwinRefreshInit(refreshConfigFrequency);
Expand All @@ -82,7 +81,7 @@ public async Task<Option<DeploymentConfigInfo>> GetDeploymentConfigInfoAsync()
public void Dispose()
{
this.deviceClient.ForEach(d => d.Dispose());
this.refreshTimer?.Dispose();
this.refreshTwinTask.Dispose();
}

public async Task UpdateReportedPropertiesAsync(TwinCollection patch)
Expand Down Expand Up @@ -111,14 +110,6 @@ internal static void ValidateSchemaVersion(string schemaVersion)
}
}

async void RefreshTimerElapsed() => await this.RefreshTwinAsync();

void ResetRefreshTimer()
{
this.refreshTimer.Stop();
this.refreshTimer.Start();
}

async Task CreateAndInitDeviceClient(IModuleClientProvider moduleClientProvider)
{
using (await this.twinLock.LockAsync())
Expand Down Expand Up @@ -173,6 +164,7 @@ async Task OnDesiredPropertiesUpdated(TwinCollection desiredPropertiesPatch, obj
}
}

// This method updates local state and should be called only after acquiring twinLock
async Task RefreshTwinAsync()
{
try
Expand All @@ -193,7 +185,6 @@ async Task RefreshTwinAsync()
this.reportedProperties = Option.Some(twin.Properties.Reported);
await this.UpdateDeploymentConfig();
Events.TwinRefreshSuccess();
this.ResetRefreshTimer();
}
catch (Exception ex) when (!ex.IsFatal())
{
Expand All @@ -202,6 +193,14 @@ async Task RefreshTwinAsync()
}
}

async Task ForceRefreshTwin()
{
using (await this.twinLock.LockAsync())
{
await this.RefreshTwinAsync();
}
}

// This method updates local state and should be called only after acquiring twinLock
async Task ApplyPatchAsync(TwinCollection patch)
{
Expand All @@ -211,7 +210,6 @@ async Task ApplyPatchAsync(TwinCollection patch)
this.desiredProperties = new TwinCollection(mergedJson);
await this.UpdateDeploymentConfig();
Events.DesiredPropertiesPatchApplied();
this.ResetRefreshTimer();
}
catch (Exception ex) when (!ex.IsFatal())
{
Expand Down Expand Up @@ -270,8 +268,8 @@ async Task<bool> WaitForDeviceClientInitialization() =>

static class Events
{
public static readonly ILogger Log = Logger.Factory.CreateLogger<EdgeAgentConnection>();
const int IdStart = AgentEventIds.EdgeAgentConnection;
static readonly ILogger Log = Logger.Factory.CreateLogger<EdgeAgentConnection>();

enum EventIds
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1255,97 +1255,6 @@ public async Task EdgeAgentConnectionRefreshTest()
}
}

[Fact]
[Unit]
public async Task EdgeAgentConnectionRefreshTest_NoRefresh()
{
// Arrange
var moduleDeserializerTypes = new Dictionary<string, Type>
{
{ DockerType, typeof(DockerDesiredModule) }
};

var edgeAgentDeserializerTypes = new Dictionary<string, Type>
{
{ DockerType, typeof(EdgeAgentDockerModule) }
};

var edgeHubDeserializerTypes = new Dictionary<string, Type>
{
{ DockerType, typeof(EdgeHubDockerModule) }
};

var runtimeInfoDeserializerTypes = new Dictionary<string, Type>
{
{ DockerType, typeof(DockerRuntimeInfo) }
};

var deserializerTypes = new Dictionary<Type, IDictionary<string, Type>>
{
[typeof(IModule)] = moduleDeserializerTypes,
[typeof(IEdgeAgentModule)] = edgeAgentDeserializerTypes,
[typeof(IEdgeHubModule)] = edgeHubDeserializerTypes,
[typeof(IRuntimeInfo)] = runtimeInfoDeserializerTypes,
};

ISerde<DeploymentConfig> serde = new TypeSpecificSerDe<DeploymentConfig>(deserializerTypes);

var runtimeInfo = new DockerRuntimeInfo("docker", new DockerRuntimeConfig("1.0", null));
var edgeAgentDockerModule = new EdgeAgentDockerModule("docker", new DockerConfig("image", string.Empty), null, null);
var edgeHubDockerModule = new EdgeHubDockerModule(
"docker",
ModuleStatus.Running,
RestartPolicy.Always,
new DockerConfig("image", string.Empty),
null,
null);
var deploymentConfig = new DeploymentConfig(
"1.0",
runtimeInfo,
new SystemModules(edgeAgentDockerModule, edgeHubDockerModule),
new Dictionary<string, IModule>());
long version = 1;
string deploymentConfigJson = serde.Serialize(deploymentConfig);
JObject deploymentConfigJobject = JObject.Parse(deploymentConfigJson);
deploymentConfigJobject.Add("$version", JToken.Parse($"{version}"));
var twinCollection = new TwinCollection(deploymentConfigJobject, new JObject());
var twin = new Twin(new TwinProperties { Desired = new TwinCollection(deploymentConfigJson) });

DesiredPropertyUpdateCallback desiredPropertyUpdateCallback = null;
var moduleClient = new Mock<IModuleClient>();
moduleClient.Setup(m => m.GetTwinAsync())
.ReturnsAsync(twin);
moduleClient.Setup(m => m.SetDesiredPropertyUpdateCallbackAsync(It.IsAny<DesiredPropertyUpdateCallback>()))
.Callback<DesiredPropertyUpdateCallback>(d => desiredPropertyUpdateCallback = d)
.Returns(Task.CompletedTask);

var moduleClientProvider = new Mock<IModuleClientProvider>();
Func<IModuleClient, Task> updateModuleClient = null;
moduleClientProvider.Setup(m => m.Create(It.IsAny<ConnectionStatusChangesHandler>(), It.IsAny<Func<IModuleClient, Task>>()))
.Callback<ConnectionStatusChangesHandler, Func<IModuleClient, Task>>((c, f) => updateModuleClient = f)
.ReturnsAsync(moduleClient.Object);

// Act
using (var edgeAgentConnection = new EdgeAgentConnection(moduleClientProvider.Object, serde, TimeSpan.FromSeconds(5)))
{
await Task.Delay(TimeSpan.FromSeconds(0.5));
Assert.NotNull(updateModuleClient);

await updateModuleClient(moduleClient.Object);
Assert.NotNull(desiredPropertyUpdateCallback);

await Task.Delay(TimeSpan.FromSeconds(3));
JObject patchConfigJobject = JObject.Parse(deploymentConfigJson);
patchConfigJobject.Add("$version", JToken.Parse($"{version + 1}"));
var patchTwinCollection = new TwinCollection(deploymentConfigJobject, new JObject());
await desiredPropertyUpdateCallback(patchTwinCollection, null);
await Task.Delay(TimeSpan.FromSeconds(4));

// Assert
moduleClient.Verify(m => m.GetTwinAsync(), Times.Once);
}
}

[Theory]
[Unit]
[InlineData("1.0", null)]
Expand Down
108 changes: 108 additions & 0 deletions edge-util/src/Microsoft.Azure.Devices.Edge.Util/PeriodicTask.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
// Copyright (c) Microsoft. All rights reserved.
namespace Microsoft.Azure.Devices.Edge.Util
{
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;

public class PeriodicTask : IDisposable
{
readonly Func<CancellationToken, Task> work;
readonly TimeSpan frequency;
readonly TimeSpan startAfter;
readonly object stateLock = new object();
readonly ILogger logger;
readonly string operationName;
readonly Timer checkTimer;
readonly CancellationTokenSource cts = new CancellationTokenSource();

Task currentTask;

public PeriodicTask(
Func<CancellationToken, Task> work,
TimeSpan frequency,
TimeSpan startAfter,
ILogger logger,
string operationName)
{
Preconditions.CheckArgument(frequency > TimeSpan.Zero, "Frequency should be > 0");
Preconditions.CheckArgument(startAfter >= TimeSpan.Zero, "startAfter should be >= 0");

this.work = Preconditions.CheckNotNull(work, nameof(work));
this.frequency = frequency;
this.startAfter = startAfter;
this.logger = Preconditions.CheckNotNull(logger, nameof(logger));
this.operationName = Preconditions.CheckNonWhiteSpace(operationName, nameof(operationName));
this.currentTask = this.DoWork();
this.checkTimer = new Timer(this.EnsureWork, null, startAfter, frequency);
this.logger.LogInformation($"Started operation {this.operationName}");
}

public PeriodicTask(
Func<Task> work,
TimeSpan frequency,
TimeSpan startAfter,
ILogger logger,
string operationName)
: this(_ => Preconditions.CheckNotNull(work, nameof(work))(), frequency, startAfter, logger, operationName)
{
}

/// <summary>
/// Do not dispose the task here in case it hasn't completed.
/// </summary>
public void Dispose()
{
this.checkTimer?.Dispose();
this.cts?.Cancel();
this.cts?.Dispose();
}

/// <summary>
/// The current task should never complete, but in case it does, this makes sure it is started again.
/// </summary>
void EnsureWork(object state)
{
lock (this.stateLock)
{
if (this.currentTask == null || this.currentTask.IsCompleted)
{
this.logger.LogInformation($"Periodic operation {this.operationName}, is not running. Attempting to start again...");
this.currentTask = this.DoWork();
this.logger.LogInformation($"Started operation {this.operationName}");
}
}
}

async Task DoWork()
{
try
{
CancellationToken cancellationToken = this.cts.Token;
await Task.Delay(this.startAfter, cancellationToken);
while (!cancellationToken.IsCancellationRequested)
{
try
{
this.logger.LogInformation($"Starting periodic operation {this.operationName}...");
await this.work(cancellationToken);
this.logger.LogInformation($"Successfully completed periodic operation {this.operationName}");
}
catch (Exception e)
{
this.logger.LogWarning(e, $"Error in periodic operation {this.operationName}");
}

await Task.Delay(this.frequency, cancellationToken);
}

this.logger.LogDebug($"Periodic operation {this.operationName} cancelled");
}
catch (Exception ex)
{
this.logger.LogError(ex, $"Unexpected error in periodic operation {this.operationName}");
}
}
}
}
Loading

0 comments on commit cb7af40

Please sign in to comment.