Skip to content

Commit

Permalink
feat(hub-svc): add support and tests for configurations export (#2250)
Browse files Browse the repository at this point in the history
  • Loading branch information
David R. Williamson authored Dec 15, 2021
1 parent 5245797 commit d22f9f3
Show file tree
Hide file tree
Showing 12 changed files with 511 additions and 252 deletions.
29 changes: 0 additions & 29 deletions e2e/test/helpers/ImportExportDevicesHelpers.cs

This file was deleted.

30 changes: 30 additions & 0 deletions e2e/test/helpers/ImportExportHelpers.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using System.Collections.Generic;
using System.IO;
using System.Text;
using Newtonsoft.Json;

namespace Microsoft.Azure.Devices.E2ETests.Helpers
{
internal static class ImportExportHelpers
{
/// <summary>
/// Makes a stream compatible for writing to a storage blob of serialized, newline-delimited rows of the specified objects.
/// </summary>
/// <param name="items">The objects to serialize.</param>
public static Stream BuildImportStream<T>(IReadOnlyList<T> items)
{
var itemsFileSb = new StringBuilder();

foreach (T item in items)
{
itemsFileSb.AppendLine(JsonConvert.SerializeObject(item));
}

byte[] itemsFileInBytes = Encoding.Default.GetBytes(itemsFileSb.ToString());
return new MemoryStream(itemsFileInBytes);
}
}
}
308 changes: 206 additions & 102 deletions e2e/test/iothub/service/RegistryManagerExportDevicesTests.cs

Large diffs are not rendered by default.

167 changes: 114 additions & 53 deletions e2e/test/iothub/service/RegistryManagerImportDevicesTests.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using System;
using System.Collections.Generic;
using System.IO;
Expand Down Expand Up @@ -46,23 +47,28 @@ public async Task RegistryManager_ImportDevices(StorageAuthenticationType storag
{
// arrange

string deviceId = $"{nameof(RegistryManager_ImportDevices)}-device-{StorageContainer.GetRandomSuffix(4)}";
string devicesFileName = $"{nameof(RegistryManager_ImportDevices)}-{StorageContainer.GetRandomSuffix(4)}.txt";
using var registryManager = RegistryManager.CreateFromConnectionString(TestConfiguration.IoTHub.ConnectionString);
const string idPrefix = nameof(RegistryManager_ImportDevices);

string deviceId = $"{idPrefix}-device-{StorageContainer.GetRandomSuffix(4)}";
string configId = $"{idPrefix}-config-{StorageContainer.GetRandomSuffix(4)}".ToLower(); // Configuration Id characters must be all lower-case.
Logger.Trace($"Using Ids {deviceId} and {configId}.");

Logger.Trace($"Using deviceId {deviceId}.");
string devicesFileName = $"{idPrefix}-devices-{StorageContainer.GetRandomSuffix(4)}.txt";
string configsFileName = $"{idPrefix}-configs-{StorageContainer.GetRandomSuffix(4)}.txt";

using RegistryManager registryManager = RegistryManager.CreateFromConnectionString(TestConfiguration.IoTHub.ConnectionString);

try
{
string containerName = StorageContainer.BuildContainerName(nameof(RegistryManager_ImportDevices));
using StorageContainer storageContainer = await StorageContainer.GetInstanceAsync(containerName).ConfigureAwait(false);
Logger.Trace($"Using container {storageContainer.Uri}");
Logger.Trace($"Using devices container {storageContainer.Uri}");

Uri containerUri = storageAuthenticationType == StorageAuthenticationType.KeyBased
? storageContainer.SasUri
: storageContainer.Uri;

using Stream devicesStream = ImportExportDevicesHelpers.BuildDevicesStream(
using Stream devicesStream = ImportExportHelpers.BuildImportStream(
new List<ExportImportDevice>
{
new ExportImportDevice(
Expand All @@ -74,54 +80,44 @@ public async Task RegistryManager_ImportDevices(StorageAuthenticationType storag
});
await UploadFileAndConfirmAsync(storageContainer, devicesStream, devicesFileName).ConfigureAwait(false);

// act

JobProperties importJobResponse = null;
int tryCount = 0;
while (true)
{
try
using Stream configsStream = ImportExportHelpers.BuildImportStream(
new List<ImportConfiguration>
{
ManagedIdentity identity = null;
if (isUserAssignedMsi)
new ImportConfiguration(configId)
{
string userAssignedMsiResourceId = TestConfiguration.IoTHub.UserAssignedMsiResourceId;
identity = new ManagedIdentity
ImportMode = ConfigurationImportMode.CreateOrUpdateIfMatchETag,
Priority = 3,
Labels = { { "labelName", "labelValue" } },
TargetCondition = "*",
Content =
{
userAssignedIdentity = userAssignedMsiResourceId
};
}

importJobResponse = await registryManager
.ImportDevicesAsync(
JobProperties.CreateForImportJob(
containerUri.ToString(),
containerUri.ToString(),
devicesFileName,
storageAuthenticationType))
.ConfigureAwait(false);
break;
}
// Concurrent jobs can be rejected, so implement a retry mechanism to handle conflicts with other tests
catch (JobQuotaExceededException) when (++tryCount < MaxIterationWait)
{
Logger.Trace($"JobQuotaExceededException... waiting.");
await Task.Delay(s_waitDuration).ConfigureAwait(false);
continue;
}
}
DeviceContent = { { "properties.desired.x", 5L } },
},
Metrics =
{
Queries = { { "successfullyConfigured", "select deviceId from devices where properties.reported.x = 5" } }
},
},
});
await UploadFileAndConfirmAsync(storageContainer, configsStream, configsFileName).ConfigureAwait(false);

// wait for job to complete
for (int i = 0; i < MaxIterationWait; ++i)
{
await Task.Delay(1000).ConfigureAwait(false);
importJobResponse = await registryManager.GetJobAsync(importJobResponse.JobId).ConfigureAwait(false);
Logger.Trace($"Job {importJobResponse.JobId} is {importJobResponse.Status} with progress {importJobResponse.Progress}%");
if (!s_incompleteJobs.Contains(importJobResponse.Status))
ManagedIdentity identity = isUserAssignedMsi
? new ManagedIdentity
{
break;
UserAssignedIdentity = TestConfiguration.IoTHub.UserAssignedMsiResourceId
}
}
: null;

// act

JobProperties importJobResponse = await CreateAndWaitForJobAsync(
storageAuthenticationType,
devicesFileName,
configsFileName,
registryManager,
containerUri,
identity)
.ConfigureAwait(false);

// assert

Expand All @@ -130,38 +126,48 @@ public async Task RegistryManager_ImportDevices(StorageAuthenticationType storag

// should not throw due to 404, but device may not immediately appear in registry
Device device = null;
Configuration config = null;
for (int i = 0; i < MaxIterationWait; ++i)
{
await Task.Delay(s_waitDuration).ConfigureAwait(false);
try
{
device = await registryManager.GetDeviceAsync(deviceId).ConfigureAwait(false);
config = await registryManager.GetConfigurationAsync(configId).ConfigureAwait(false);
break;
}
catch (Exception ex)
{
Logger.Trace($"Could not find device on iteration {i} due to [{ex.Message}]");
Logger.Trace($"Could not find device/config on iteration {i} due to [{ex.Message}]");
}
}
if (device == null)
{
Assert.Fail($"Device {deviceId} not found in registry manager");
}
if (config == null)
{
Assert.Fail($"Config {configId} not found in registry manager");
}
}
finally
{
try
{
await registryManager.RemoveDeviceAsync(deviceId).ConfigureAwait(false);
await registryManager.RemoveConfigurationAsync(configId).ConfigureAwait(false);
}
catch (Exception ex)
{
Logger.Trace($"Failed to clean up device/config due to {ex}");
}
catch { }
}
}

private static async Task UploadFileAndConfirmAsync(StorageContainer storageContainer, Stream devicesFile, string fileName)
private static async Task UploadFileAndConfirmAsync(StorageContainer storageContainer, Stream fileContents, string fileName)
{
CloudBlockBlob cloudBlob = storageContainer.CloudBlobContainer.GetBlockBlobReference(fileName);
await cloudBlob.UploadFromStreamAsync(devicesFile).ConfigureAwait(false);
await cloudBlob.UploadFromStreamAsync(fileContents).ConfigureAwait(false);

// wait for blob to be written
bool foundBlob = false;
Expand All @@ -174,7 +180,62 @@ private static async Task UploadFileAndConfirmAsync(StorageContainer storageCont
break;
}
}
foundBlob.Should().BeTrue($"Failed to find {fileName} in storage container, required for test.");
foundBlob.Should().BeTrue($"Failed to find {fileName} in storage container - required for test.");
}

private async Task<JobProperties> CreateAndWaitForJobAsync(
StorageAuthenticationType storageAuthenticationType,
string devicesFileName,
string configsFileName,
RegistryManager registryManager,
Uri containerUri,
ManagedIdentity identity)
{
int tryCount = 0;
JobProperties importJobResponse = null;

JobProperties jobProperties = JobProperties.CreateForImportJob(
containerUri.ToString(),
containerUri.ToString(),
devicesFileName,
storageAuthenticationType,
identity);
jobProperties.ConfigurationsBlobName = configsFileName;
jobProperties.IncludeConfigurations = true;

while (tryCount < MaxIterationWait)
{
try
{
importJobResponse = await registryManager.ImportDevicesAsync(jobProperties).ConfigureAwait(false);
if (!string.IsNullOrWhiteSpace(importJobResponse.FailureReason))
{
Logger.Trace($"Job failed due to {importJobResponse.FailureReason}");
}
break;
}
// Concurrent jobs can be rejected, so implement a retry mechanism to handle conflicts with other tests
catch (JobQuotaExceededException) when (++tryCount < MaxIterationWait)
{
Logger.Trace($"JobQuotaExceededException... waiting.");
await Task.Delay(s_waitDuration).ConfigureAwait(false);
continue;
}
}

// wait for job to complete
for (int i = 0; i < MaxIterationWait; ++i)
{
await Task.Delay(1000).ConfigureAwait(false);
importJobResponse = await registryManager.GetJobAsync(importJobResponse?.JobId).ConfigureAwait(false);
Logger.Trace($"Job {importJobResponse.JobId} is {importJobResponse.Status} with progress {importJobResponse.Progress}%");
if (!s_incompleteJobs.Contains(importJobResponse.Status))
{
break;
}
}

return importJobResponse;
}
}
}
Loading

0 comments on commit d22f9f3

Please sign in to comment.