Skip to content

Commit

Permalink
Moves Cosmos Export to QueueClient (#3169)
Browse files Browse the repository at this point in the history
* Moves Cosmos Export to QueueClient
* Existing Cosmos Export jobs will continue to run under the old model (LegacyExportJobWorkerBackgroundService) to avoid disruption or file name changes

Warning:
* File names in Cosmos export will change for new jobs
* Existing FHIR Server - Cosmos installations will need to enable the `TaskHosting__Enabled` setting to ensure new export jobs run
  • Loading branch information
brendankowitz authored Jun 5, 2023
1 parent b6be3a1 commit e261f9c
Show file tree
Hide file tree
Showing 49 changed files with 694 additions and 729 deletions.
2 changes: 1 addition & 1 deletion samples/templates/default-azuredeploy-docker.json
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@
"SqlServer__Initialize": "[equals(parameters('solutionType'),'FhirServerSqlServer')]",
"SqlServer__SchemaOptions__AutomaticUpdatesEnabled": "[if(equals(parameters('sqlSchemaAutomaticUpdatesEnabled'),'auto'), true(), false())]",
"DataStore": "[if(equals(parameters('solutionType'),'FhirServerCosmosDB'), 'CosmosDb', 'SqlServer')]",
"TaskHosting__Enabled": "[if(equals(parameters('solutionType'),'FhirServerCosmosDB'), false(), parameters('enableImport'))]",
"TaskHosting__Enabled": "[true()]",
"TaskHosting__MaxRunningTaskCount": "[parameters('backgroundTaskCount')]",
"FhirServer__Operations__IntegrationDataStore__StorageAccountUri": "[if(parameters('enableImport'), concat('https://', variables('storageAccountName'), variables('blobStorageUri')), 'null')]",
"FhirServer__Operations__Export__Enabled": "[parameters('enableExport')]",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,27 +14,27 @@
namespace Microsoft.Health.Fhir.Api.Features.Operations.Export
{
/// <summary>
/// The background service used to host the <see cref="ExportJobWorker"/>.
/// The background service used to host the <see cref="LegacyExportJobWorker"/>.
/// </summary>
public class ExportJobWorkerBackgroundService : BackgroundService
public class LegacyExportJobWorkerBackgroundService : BackgroundService
{
private readonly ExportJobWorker _exportJobWorker;
private readonly LegacyExportJobWorker _legacyExportJobWorker;
private readonly ExportJobConfiguration _exportJobConfiguration;

public ExportJobWorkerBackgroundService(ExportJobWorker exportJobWorker, IOptions<ExportJobConfiguration> exportJobConfiguration)
public LegacyExportJobWorkerBackgroundService(LegacyExportJobWorker legacyExportJobWorker, IOptions<ExportJobConfiguration> exportJobConfiguration)
{
EnsureArg.IsNotNull(exportJobWorker, nameof(exportJobWorker));
EnsureArg.IsNotNull(legacyExportJobWorker, nameof(legacyExportJobWorker));
EnsureArg.IsNotNull(exportJobConfiguration?.Value, nameof(exportJobConfiguration));

_exportJobWorker = exportJobWorker;
_legacyExportJobWorker = legacyExportJobWorker;
_exportJobConfiguration = exportJobConfiguration.Value;
}

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
if (_exportJobConfiguration.Enabled)
{
await _exportJobWorker.ExecuteAsync(stoppingToken);
await _legacyExportJobWorker.ExecuteAsync(stoppingToken);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
using Microsoft.Health.Fhir.Core.Features.Persistence;
using Microsoft.Health.Fhir.Core.Messages.Export;
using Microsoft.Health.Fhir.Core.Messages.Storage;
using Microsoft.Health.Fhir.Core.UnitTests.Extensions;
using Microsoft.Health.Fhir.Tests.Common;
using Microsoft.Health.Test.Utilities;
using NSubstitute;
Expand All @@ -32,12 +33,12 @@ public class ExportJobWorkerTests
private static readonly TimeSpan DefaultJobHeartbeatTimeoutThreshold = TimeSpan.FromMinutes(10);
private static readonly TimeSpan DefaultJobPollingFrequency = TimeSpan.FromMilliseconds(100);

private readonly IFhirOperationDataStore _fhirOperationDataStore = Substitute.For<IFhirOperationDataStore>();
private readonly ILegacyExportOperationDataStore _fhirOperationDataStore = Substitute.For<ILegacyExportOperationDataStore>();
private readonly ExportJobConfiguration _exportJobConfiguration = new ExportJobConfiguration();
private readonly Func<IExportJobTask> _exportJobTaskFactory = Substitute.For<Func<IExportJobTask>>();
private readonly IExportJobTask _task = Substitute.For<IExportJobTask>();

private readonly ExportJobWorker _exportJobWorker;
private readonly LegacyExportJobWorker _legacyExportJobWorker;

private readonly CancellationTokenSource _cancellationTokenSource = new CancellationTokenSource();
private readonly CancellationToken _cancellationToken;
Expand All @@ -49,16 +50,14 @@ public ExportJobWorkerTests()
_exportJobConfiguration.JobPollingFrequency = DefaultJobPollingFrequency;

_exportJobTaskFactory().Returns(_task);
var scopedOperationDataStore = Substitute.For<IScoped<IFhirOperationDataStore>>();
scopedOperationDataStore.Value.Returns(_fhirOperationDataStore);

_exportJobWorker = new ExportJobWorker(
() => scopedOperationDataStore,
_legacyExportJobWorker = new LegacyExportJobWorker(
() => _fhirOperationDataStore.CreateMockScope(),
Options.Create(_exportJobConfiguration),
_exportJobTaskFactory,
NullLogger<ExportJobWorker>.Instance);
NullLogger<LegacyExportJobWorker>.Instance);

_exportJobWorker.Handle(new StorageInitializedNotification(), CancellationToken.None);
_legacyExportJobWorker.Handle(new StorageInitializedNotification(), CancellationToken.None);
_cancellationToken = _cancellationTokenSource.Token;
}

Expand All @@ -71,7 +70,7 @@ public async Task GivenThereIsNoRunningJob_WhenExecuted_ThenATaskShouldBeCreated

_cancellationTokenSource.CancelAfter(DefaultJobPollingFrequency);

await _exportJobWorker.ExecuteAsync(_cancellationToken);
await _legacyExportJobWorker.ExecuteAsync(_cancellationToken);

_exportJobTaskFactory().Received(1);
}
Expand All @@ -87,7 +86,7 @@ public async Task GivenTheNumberOfRunningJobExceedsThreshold_WhenExecuted_ThenAT

_cancellationTokenSource.CancelAfter(DefaultJobPollingFrequency * 2);

await _exportJobWorker.ExecuteAsync(_cancellationToken);
await _legacyExportJobWorker.ExecuteAsync(_cancellationToken);

_exportJobTaskFactory.Received(1);
}
Expand Down Expand Up @@ -131,33 +130,33 @@ public async Task GivenTheNumberOfRunningJobDoesNotExceedThreshold_WhenExecuted_
// In case the task was not called, cancel the worker after certain period of time.
_cancellationTokenSource.CancelAfter(DefaultJobPollingFrequency * 3);

await _exportJobWorker.ExecuteAsync(_cancellationToken);
await _legacyExportJobWorker.ExecuteAsync(_cancellationToken);

Assert.True(isSecondJobCalled);
}

[Fact]
public async Task GivenAcquireExportJobThrowsException_WhenExecuted_ThenWeHaveADelayBeforeWeRetry()
{
_fhirOperationDataStore.AcquireExportJobsAsync(
_fhirOperationDataStore.AcquireLegacyExportJobsAsync(
DefaultMaximumNumberOfConcurrentJobAllowed,
DefaultJobHeartbeatTimeoutThreshold,
_cancellationToken)
.ThrowsForAnyArgs<Exception>();

_cancellationTokenSource.CancelAfter(DefaultJobPollingFrequency * 1.25);

await _exportJobWorker.ExecuteAsync(_cancellationToken);
await _legacyExportJobWorker.ExecuteAsync(_cancellationToken);

// Assert that we received only one call to AcquireLegacyExportJobsAsync
await _fhirOperationDataStore.ReceivedWithAnyArgs(1).AcquireExportJobsAsync(Arg.Any<ushort>(), Arg.Any<TimeSpan>(), Arg.Any<CancellationToken>());
await _fhirOperationDataStore.ReceivedWithAnyArgs(1).AcquireLegacyExportJobsAsync(Arg.Any<ushort>(), Arg.Any<TimeSpan>(), Arg.Any<CancellationToken>());
}

[Fact]
public async Task GivenOperationIsCancelled_WhenExecuted_ThenWeExitTheLoop()
{
ExportJobOutcome job = CreateExportJobOutcome();
_fhirOperationDataStore.AcquireExportJobsAsync(
_fhirOperationDataStore.AcquireLegacyExportJobsAsync(
Arg.Any<ushort>(),
Arg.Any<TimeSpan>(),
_cancellationToken)
Expand All @@ -167,10 +166,10 @@ public async Task GivenOperationIsCancelled_WhenExecuted_ThenWeExitTheLoop()
return new[] { job };
});

await _exportJobWorker.ExecuteAsync(_cancellationToken);
await _legacyExportJobWorker.ExecuteAsync(_cancellationToken);

// Assert that we received only one call to AcquireLegacyExportJobsAsync
await _fhirOperationDataStore.ReceivedWithAnyArgs(1).AcquireExportJobsAsync(Arg.Any<ushort>(), Arg.Any<TimeSpan>(), Arg.Any<CancellationToken>());
await _fhirOperationDataStore.ReceivedWithAnyArgs(1).AcquireLegacyExportJobsAsync(Arg.Any<ushort>(), Arg.Any<TimeSpan>(), Arg.Any<CancellationToken>());
}

private void SetupOperationDataStore(
Expand All @@ -189,7 +188,7 @@ private void SetupOperationDataStore(
jobPollingFrequency = DefaultJobPollingFrequency;
}

_fhirOperationDataStore.AcquireExportJobsAsync(
_fhirOperationDataStore.AcquireLegacyExportJobsAsync(
maximumNumberOfConcurrentJobsAllowed,
jobHeartbeatTimeoutThreshold.Value,
_cancellationToken)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,21 @@
using EnsureThat;
using Microsoft.Health.Core.Extensions;

namespace Microsoft.Health.Fhir.CosmosDb.Features.Storage.Queues;
namespace Microsoft.Health.Fhir.Core.Extensions;

/// <summary>
/// The DateTime is truncated to millisecond precision, turned into its 100ns ticks representation, and then left bit-shifted by 3.
/// </summary>
/// <remarks>
/// In the SQL provider, the resource surrogate ID is actually the last modified datetime with a "uniquifier" added to it by the database.
/// </remarks>
internal static class IdHelper
{
private const int ShiftFactor = 3;

internal static readonly DateTime MaxDateTime = new DateTime(long.MaxValue >> ShiftFactor, DateTimeKind.Utc).TruncateToMillisecond().AddTicks(-1);

public static long DateToId(DateTime dateTime)
public static long DateToId(this DateTime dateTime)
{
EnsureArg.IsLte(dateTime, MaxDateTime, nameof(dateTime));
long id = dateTime.TruncateToMillisecond().Ticks << ShiftFactor;
Expand All @@ -25,7 +31,7 @@ public static long DateToId(DateTime dateTime)
return id;
}

public static DateTime IdToDate(long resourceSurrogateId)
public static DateTime IdToDate(this long resourceSurrogateId)
{
var dateTime = new DateTime(resourceSurrogateId >> ShiftFactor, DateTimeKind.Utc);
return dateTime.TruncateToMillisecond();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ public async Task ExecuteAsync(ExportJobRecord exportJobRecord, WeakETag weakETa
_exportJobRecord.RestartCount++;
}

// The intial list of query parameters will not have a continutation token. We will add that later if we get one back
// The initial list of query parameters will not have a continuation token. We will add that later if we get one back
// from the search result.
// As Till is a new property, QueuedTime is being used as a backup in case Till doesn't exist in the job record.
var tillTime = _exportJobRecord.Till != null ? _exportJobRecord.Till : new PartialDateTime(_exportJobRecord.QueuedTime);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// -------------------------------------------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License (MIT). See LICENSE in the repo root for license information.
// -------------------------------------------------------------------------------------------------

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Health.Fhir.Core.Features.Operations.Export.Models;
using Microsoft.Health.Fhir.Core.Features.Persistence;

namespace Microsoft.Health.Fhir.Core.Features.Operations.Export;

/// <summary>
/// Provides access to the legacy export job data.
/// </summary>
/// <remarks>
/// <see cref="LegacyExportJobWorker"/> resolves this store through a scoped factory and calls
/// <see cref="AcquireLegacyExportJobsAsync"/> to pick up the jobs it should run.
/// </remarks>
public interface ILegacyExportOperationDataStore
{
/// <summary>
/// Gets a legacy export job by id.
/// </summary>
/// <param name="id">The id of the job.</param>
/// <param name="cancellationToken">The cancellation token.</param>
/// <returns>The outcome of the existing export job.</returns>
/// <exception cref="JobNotFoundException">Thrown when no job with the specified <paramref name="id"/> is found.</exception>
Task<ExportJobOutcome> GetLegacyExportJobByIdAsync(string id, CancellationToken cancellationToken);

/// <summary>
/// Updates an existing legacy export job.
/// </summary>
/// <param name="jobRecord">The job record.</param>
/// <param name="eTag">The eTag used for optimistic concurrency.</param>
/// <param name="cancellationToken">The cancellation token.</param>
/// <returns>The outcome of the updated export job.</returns>
Task<ExportJobOutcome> UpdateLegacyExportJobAsync(ExportJobRecord jobRecord, WeakETag eTag, CancellationToken cancellationToken);

/// <summary>
/// Acquires legacy export jobs.
/// </summary>
/// <param name="numberOfJobsToAcquire">The number of jobs to acquire.</param>
/// <param name="jobHeartbeatTimeoutThreshold">
/// The job heartbeat timeout threshold.
/// NOTE(review): presumably jobs whose heartbeat is older than this are treated as stale and can be re-acquired - confirm against the implementation.
/// </param>
/// <param name="cancellationToken">The cancellation token.</param>
/// <returns>A collection of the acquired export jobs.</returns>
Task<IReadOnlyCollection<ExportJobOutcome>> AcquireLegacyExportJobsAsync(ushort numberOfJobsToAcquire, TimeSpan jobHeartbeatTimeoutThreshold, CancellationToken cancellationToken);
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,17 @@ namespace Microsoft.Health.Fhir.Core.Features.Operations.Export
/// <summary>
/// The worker responsible for running the export job tasks.
/// </summary>
public class ExportJobWorker : INotificationHandler<StorageInitializedNotification>
public class LegacyExportJobWorker : INotificationHandler<StorageInitializedNotification>
{
private readonly Func<IScoped<IFhirOperationDataStore>> _fhirOperationDataStoreFactory;
private readonly Func<IScoped<ILegacyExportOperationDataStore>> _fhirOperationDataStoreFactory;
private readonly ExportJobConfiguration _exportJobConfiguration;
private readonly Func<IExportJobTask> _exportJobTaskFactory;
private readonly ILogger _logger;
private bool _storageReady;

private const int MaximumDelayInSeconds = 3600;

public ExportJobWorker(Func<IScoped<IFhirOperationDataStore>> fhirOperationDataStoreFactory, IOptions<ExportJobConfiguration> exportJobConfiguration, Func<IExportJobTask> exportJobTaskFactory, ILogger<ExportJobWorker> logger)
public LegacyExportJobWorker(Func<IScoped<ILegacyExportOperationDataStore>> fhirOperationDataStoreFactory, IOptions<ExportJobConfiguration> exportJobConfiguration, Func<IExportJobTask> exportJobTaskFactory, ILogger<LegacyExportJobWorker> logger)
{
EnsureArg.IsNotNull(fhirOperationDataStoreFactory, nameof(fhirOperationDataStoreFactory));
EnsureArg.IsNotNull(exportJobConfiguration?.Value, nameof(exportJobConfiguration));
Expand Down Expand Up @@ -61,20 +61,19 @@ public async Task ExecuteAsync(CancellationToken cancellationToken)
// Get list of available jobs.
if (runningTasks.Count < _exportJobConfiguration.MaximumNumberOfConcurrentJobsAllowedPerInstance)
{
using (IScoped<IFhirOperationDataStore> store = _fhirOperationDataStoreFactory())
using IScoped<ILegacyExportOperationDataStore> store = _fhirOperationDataStoreFactory();
ushort numberOfJobsToAcquire = (ushort)(_exportJobConfiguration.MaximumNumberOfConcurrentJobsAllowedPerInstance - runningTasks.Count);

IReadOnlyCollection<ExportJobOutcome> jobs = await store.Value.AcquireLegacyExportJobsAsync(
numberOfJobsToAcquire,
_exportJobConfiguration.JobHeartbeatTimeoutThreshold,
cancellationToken);

foreach (ExportJobOutcome job in jobs)
{
ushort numberOfJobsToAcquire = (ushort)(_exportJobConfiguration.MaximumNumberOfConcurrentJobsAllowedPerInstance - runningTasks.Count);
IReadOnlyCollection<ExportJobOutcome> jobs = await store.Value.AcquireExportJobsAsync(
numberOfJobsToAcquire,
_exportJobConfiguration.JobHeartbeatTimeoutThreshold,
cancellationToken);

foreach (ExportJobOutcome job in jobs)
{
_logger.LogTrace("Picked up job: {JobId}.", job.JobRecord.Id);

runningTasks.Add(_exportJobTaskFactory().ExecuteAsync(job.JobRecord, job.ETag, cancellationToken));
}
_logger.LogTrace("Picked up job: {JobId}.", job.JobRecord.Id);

runningTasks.Add(_exportJobTaskFactory().ExecuteAsync(job.JobRecord, job.ETag, cancellationToken));
}
}

Expand Down
Loading

0 comments on commit e261f9c

Please sign in to comment.