Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

EdgeAgent: Change twin refresh timer logic to loop #799

Merged
merged 13 commits into from
Feb 13, 2019
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ namespace Microsoft.Azure.Devices.Edge.Agent.IoTHub
{
using System;
using System.Threading.Tasks;
using System.Timers;
using Microsoft.Azure.Devices.Client;
using Microsoft.Azure.Devices.Edge.Agent.Core;
using Microsoft.Azure.Devices.Edge.Agent.Core.ConfigSources;
Expand All @@ -17,16 +16,17 @@ namespace Microsoft.Azure.Devices.Edge.Agent.IoTHub

public class EdgeAgentConnection : IEdgeAgentConnection
{
const string PingMethodName = "ping";
internal static readonly Version ExpectedSchemaVersion = new Version("1.0");
const string PingMethodName = "ping";
static readonly TimeSpan DefaultConfigRefreshFrequency = TimeSpan.FromHours(1);
static readonly Task<MethodResponse> PingMethodResponse = Task.FromResult(new MethodResponse(200));
static readonly TimeSpan DeviceClientInitializationWaitTime = TimeSpan.FromSeconds(5);

readonly AsyncLock twinLock = new AsyncLock();
readonly ISerde<DeploymentConfig> desiredPropertiesSerDe;
readonly Task initTask;
readonly RetryStrategy retryStrategy;
readonly Timer refreshTimer;
readonly PeriodicTask refreshTwinTask;

Option<IModuleClient> deviceClient;
TwinCollection desiredProperties;
Expand Down Expand Up @@ -64,8 +64,7 @@ internal EdgeAgentConnection(
this.reportedProperties = Option.None<TwinCollection>();
this.deviceClient = Option.None<IModuleClient>();
this.retryStrategy = Preconditions.CheckNotNull(retryStrategy, nameof(retryStrategy));
this.refreshTimer = new Timer(refreshConfigFrequency.TotalMilliseconds);
this.refreshTimer.Elapsed += (_, __) => this.RefreshTimerElapsed();
this.refreshTwinTask = new PeriodicTask(this.ForceRefreshTwin, refreshConfigFrequency, refreshConfigFrequency, Events.Log, "refresh twin config");
this.initTask = this.CreateAndInitDeviceClient(Preconditions.CheckNotNull(moduleClientProvider, nameof(moduleClientProvider)));

Events.TwinRefreshInit(refreshConfigFrequency);
Expand All @@ -82,7 +81,7 @@ public async Task<Option<DeploymentConfigInfo>> GetDeploymentConfigInfoAsync()
public void Dispose()
{
this.deviceClient.ForEach(d => d.Dispose());
this.refreshTimer?.Dispose();
this.refreshTwinTask.Dispose();
}

public async Task UpdateReportedPropertiesAsync(TwinCollection patch)
Expand Down Expand Up @@ -111,14 +110,6 @@ internal static void ValidateSchemaVersion(string schemaVersion)
}
}

async void RefreshTimerElapsed() => await this.RefreshTwinAsync();

void ResetRefreshTimer()
{
this.refreshTimer.Stop();
this.refreshTimer.Start();
}

async Task CreateAndInitDeviceClient(IModuleClientProvider moduleClientProvider)
{
using (await this.twinLock.LockAsync())
Expand Down Expand Up @@ -173,6 +164,7 @@ async Task OnDesiredPropertiesUpdated(TwinCollection desiredPropertiesPatch, obj
}
}

// This method updates local state and should be called only after acquiring twinLock
async Task RefreshTwinAsync()
{
try
Expand All @@ -193,7 +185,6 @@ async Task RefreshTwinAsync()
this.reportedProperties = Option.Some(twin.Properties.Reported);
await this.UpdateDeploymentConfig();
Events.TwinRefreshSuccess();
this.ResetRefreshTimer();
}
catch (Exception ex) when (!ex.IsFatal())
{
Expand All @@ -202,6 +193,14 @@ async Task RefreshTwinAsync()
}
}

async Task ForceRefreshTwin()
{
using (await this.twinLock.LockAsync())
{
await this.RefreshTwinAsync();
}
}

// This method updates local state and should be called only after acquiring twinLock
async Task ApplyPatchAsync(TwinCollection patch)
{
Expand All @@ -211,7 +210,6 @@ async Task ApplyPatchAsync(TwinCollection patch)
this.desiredProperties = new TwinCollection(mergedJson);
await this.UpdateDeploymentConfig();
Events.DesiredPropertiesPatchApplied();
this.ResetRefreshTimer();
}
catch (Exception ex) when (!ex.IsFatal())
{
Expand Down Expand Up @@ -270,8 +268,8 @@ async Task<bool> WaitForDeviceClientInitialization() =>

static class Events
{
public static readonly ILogger Log = Logger.Factory.CreateLogger<EdgeAgentConnection>();
const int IdStart = AgentEventIds.EdgeAgentConnection;
static readonly ILogger Log = Logger.Factory.CreateLogger<EdgeAgentConnection>();

enum EventIds
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1255,97 +1255,6 @@ public async Task EdgeAgentConnectionRefreshTest()
}
}

[Fact]
[Unit]
public async Task EdgeAgentConnectionRefreshTest_NoRefresh()
{
// Arrange
var moduleDeserializerTypes = new Dictionary<string, Type>
{
{ DockerType, typeof(DockerDesiredModule) }
};

var edgeAgentDeserializerTypes = new Dictionary<string, Type>
{
{ DockerType, typeof(EdgeAgentDockerModule) }
};

var edgeHubDeserializerTypes = new Dictionary<string, Type>
{
{ DockerType, typeof(EdgeHubDockerModule) }
};

var runtimeInfoDeserializerTypes = new Dictionary<string, Type>
{
{ DockerType, typeof(DockerRuntimeInfo) }
};

var deserializerTypes = new Dictionary<Type, IDictionary<string, Type>>
{
[typeof(IModule)] = moduleDeserializerTypes,
[typeof(IEdgeAgentModule)] = edgeAgentDeserializerTypes,
[typeof(IEdgeHubModule)] = edgeHubDeserializerTypes,
[typeof(IRuntimeInfo)] = runtimeInfoDeserializerTypes,
};

ISerde<DeploymentConfig> serde = new TypeSpecificSerDe<DeploymentConfig>(deserializerTypes);

var runtimeInfo = new DockerRuntimeInfo("docker", new DockerRuntimeConfig("1.0", null));
var edgeAgentDockerModule = new EdgeAgentDockerModule("docker", new DockerConfig("image", string.Empty), null, null);
var edgeHubDockerModule = new EdgeHubDockerModule(
"docker",
ModuleStatus.Running,
RestartPolicy.Always,
new DockerConfig("image", string.Empty),
null,
null);
var deploymentConfig = new DeploymentConfig(
"1.0",
runtimeInfo,
new SystemModules(edgeAgentDockerModule, edgeHubDockerModule),
new Dictionary<string, IModule>());
long version = 1;
string deploymentConfigJson = serde.Serialize(deploymentConfig);
JObject deploymentConfigJobject = JObject.Parse(deploymentConfigJson);
deploymentConfigJobject.Add("$version", JToken.Parse($"{version}"));
var twinCollection = new TwinCollection(deploymentConfigJobject, new JObject());
var twin = new Twin(new TwinProperties { Desired = new TwinCollection(deploymentConfigJson) });

DesiredPropertyUpdateCallback desiredPropertyUpdateCallback = null;
var moduleClient = new Mock<IModuleClient>();
moduleClient.Setup(m => m.GetTwinAsync())
.ReturnsAsync(twin);
moduleClient.Setup(m => m.SetDesiredPropertyUpdateCallbackAsync(It.IsAny<DesiredPropertyUpdateCallback>()))
.Callback<DesiredPropertyUpdateCallback>(d => desiredPropertyUpdateCallback = d)
.Returns(Task.CompletedTask);

var moduleClientProvider = new Mock<IModuleClientProvider>();
Func<IModuleClient, Task> updateModuleClient = null;
moduleClientProvider.Setup(m => m.Create(It.IsAny<ConnectionStatusChangesHandler>(), It.IsAny<Func<IModuleClient, Task>>()))
.Callback<ConnectionStatusChangesHandler, Func<IModuleClient, Task>>((c, f) => updateModuleClient = f)
.ReturnsAsync(moduleClient.Object);

// Act
using (var edgeAgentConnection = new EdgeAgentConnection(moduleClientProvider.Object, serde, TimeSpan.FromSeconds(5)))
{
await Task.Delay(TimeSpan.FromSeconds(0.5));
Assert.NotNull(updateModuleClient);

await updateModuleClient(moduleClient.Object);
Assert.NotNull(desiredPropertyUpdateCallback);

await Task.Delay(TimeSpan.FromSeconds(3));
JObject patchConfigJobject = JObject.Parse(deploymentConfigJson);
patchConfigJobject.Add("$version", JToken.Parse($"{version + 1}"));
var patchTwinCollection = new TwinCollection(deploymentConfigJobject, new JObject());
await desiredPropertyUpdateCallback(patchTwinCollection, null);
await Task.Delay(TimeSpan.FromSeconds(4));

// Assert
moduleClient.Verify(m => m.GetTwinAsync(), Times.Once);
}
}

[Theory]
[Unit]
[InlineData("1.0", null)]
Expand Down
108 changes: 108 additions & 0 deletions edge-util/src/Microsoft.Azure.Devices.Edge.Util/PeriodicTask.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
// Copyright (c) Microsoft. All rights reserved.
namespace Microsoft.Azure.Devices.Edge.Util
{
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;

public class PeriodicTask : IDisposable
{
readonly Func<CancellationToken, Task> work;
readonly TimeSpan frequency;
readonly TimeSpan startAfter;
readonly object stateLock = new object();
readonly ILogger logger;
readonly string operationName;
readonly Timer checkTimer;
readonly CancellationTokenSource cts = new CancellationTokenSource();

Task currentTask;

public PeriodicTask(
Func<CancellationToken, Task> work,
TimeSpan frequency,
TimeSpan startAfter,
ILogger logger,
string operationName)
{
Preconditions.CheckArgument(frequency > TimeSpan.Zero, "Frequency should be > 0");
Preconditions.CheckArgument(startAfter >= TimeSpan.Zero, "startAfter should be >= 0");

this.work = Preconditions.CheckNotNull(work, nameof(work));
this.frequency = frequency;
this.startAfter = startAfter;
this.logger = Preconditions.CheckNotNull(logger, nameof(logger));
this.operationName = Preconditions.CheckNonWhiteSpace(operationName, nameof(operationName));
this.currentTask = this.DoWork();
this.checkTimer = new Timer(this.EnsureWork, null, startAfter, frequency);
this.logger.LogInformation($"Started operation {this.operationName}");
}

public PeriodicTask(
Func<Task> work,
TimeSpan frequency,
TimeSpan startAfter,
ILogger logger,
string operationName)
: this(_ => Preconditions.CheckNotNull(work, nameof(work))(), frequency, startAfter, logger, operationName)
{
}

/// <summary>
/// Do not dispose the task here in case it hasn't completed.
/// </summary>
public void Dispose()
{
this.checkTimer?.Dispose();
this.cts?.Cancel();
this.cts?.Dispose();
}

/// <summary>
/// The current task should never complete, but in case it does, this makes sure it is started again.
/// </summary>
void EnsureWork(object state)
{
lock (this.stateLock)
{
if (this.currentTask == null || this.currentTask.IsCompleted)
{
this.logger.LogInformation($"Periodic operation {this.operationName}, is not running. Attempting to start again...");
this.currentTask = this.DoWork();
this.logger.LogInformation($"Started operation {this.operationName}");
}
}
}

async Task DoWork()
{
try
{
CancellationToken cancellationToken = this.cts.Token;
await Task.Delay(this.startAfter, cancellationToken);
while (!cancellationToken.IsCancellationRequested)
varunpuranik marked this conversation as resolved.
Show resolved Hide resolved
{
try
{
this.logger.LogInformation($"Starting periodic operation {this.operationName}...");
await this.work(cancellationToken);
this.logger.LogInformation($"Successfully completed periodic operation {this.operationName}");
}
catch (Exception e)
{
this.logger.LogWarning(e, $"Error in periodic operation {this.operationName}");
varunpuranik marked this conversation as resolved.
Show resolved Hide resolved
}

await Task.Delay(this.frequency, cancellationToken);
}

this.logger.LogDebug($"Periodic operation {this.operationName} cancelled");
}
catch (Exception ex)
{
this.logger.LogError(ex, $"Unexpected error in periodic operation {this.operationName}");
}
}
}
}
Loading