Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Moves Cosmos Export to QueueClient #3169

Merged
merged 4 commits into from
Jun 5, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion samples/templates/default-azuredeploy-docker.json
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@
"SqlServer__Initialize": "[equals(parameters('solutionType'),'FhirServerSqlServer')]",
"SqlServer__SchemaOptions__AutomaticUpdatesEnabled": "[if(equals(parameters('sqlSchemaAutomaticUpdatesEnabled'),'auto'), true(), false())]",
"DataStore": "[if(equals(parameters('solutionType'),'FhirServerCosmosDB'), 'CosmosDb', 'SqlServer')]",
"TaskHosting__Enabled": "[if(equals(parameters('solutionType'),'FhirServerCosmosDB'), false(), parameters('enableImport'))]",
"TaskHosting__Enabled": "[true()]",
"TaskHosting__MaxRunningTaskCount": "[parameters('backgroundTaskCount')]",
"FhirServer__Operations__IntegrationDataStore__StorageAccountUri": "[if(parameters('enableImport'), concat('https://', variables('storageAccountName'), variables('blobStorageUri')), 'null')]",
"FhirServer__Operations__Export__Enabled": "[parameters('enableExport')]",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,27 +14,27 @@
namespace Microsoft.Health.Fhir.Api.Features.Operations.Export
{
/// <summary>
/// The background service used to host the <see cref="ExportJobWorker"/>.
/// The background service used to host the <see cref="LegacyExportJobWorker"/>.
/// </summary>
public class ExportJobWorkerBackgroundService : BackgroundService
public class LegacyExportJobWorkerBackgroundService : BackgroundService
{
private readonly ExportJobWorker _exportJobWorker;
private readonly LegacyExportJobWorker _legacyExportJobWorker;
private readonly ExportJobConfiguration _exportJobConfiguration;

public ExportJobWorkerBackgroundService(ExportJobWorker exportJobWorker, IOptions<ExportJobConfiguration> exportJobConfiguration)
public LegacyExportJobWorkerBackgroundService(LegacyExportJobWorker legacyExportJobWorker, IOptions<ExportJobConfiguration> exportJobConfiguration)
{
EnsureArg.IsNotNull(exportJobWorker, nameof(exportJobWorker));
EnsureArg.IsNotNull(legacyExportJobWorker, nameof(legacyExportJobWorker));
EnsureArg.IsNotNull(exportJobConfiguration?.Value, nameof(exportJobConfiguration));

_exportJobWorker = exportJobWorker;
_legacyExportJobWorker = legacyExportJobWorker;
_exportJobConfiguration = exportJobConfiguration.Value;
}

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
if (_exportJobConfiguration.Enabled)
{
await _exportJobWorker.ExecuteAsync(stoppingToken);
await _legacyExportJobWorker.ExecuteAsync(stoppingToken);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
using Microsoft.Health.Fhir.Core.Features.Persistence;
using Microsoft.Health.Fhir.Core.Messages.Export;
using Microsoft.Health.Fhir.Core.Messages.Storage;
using Microsoft.Health.Fhir.Core.UnitTests.Extensions;
using Microsoft.Health.Fhir.Tests.Common;
using Microsoft.Health.Test.Utilities;
using NSubstitute;
Expand All @@ -32,12 +33,12 @@ public class ExportJobWorkerTests
private static readonly TimeSpan DefaultJobHeartbeatTimeoutThreshold = TimeSpan.FromMinutes(10);
private static readonly TimeSpan DefaultJobPollingFrequency = TimeSpan.FromMilliseconds(100);

private readonly IFhirOperationDataStore _fhirOperationDataStore = Substitute.For<IFhirOperationDataStore>();
private readonly ILegacyExportOperationDataStore _fhirOperationDataStore = Substitute.For<ILegacyExportOperationDataStore>();
private readonly ExportJobConfiguration _exportJobConfiguration = new ExportJobConfiguration();
private readonly Func<IExportJobTask> _exportJobTaskFactory = Substitute.For<Func<IExportJobTask>>();
private readonly IExportJobTask _task = Substitute.For<IExportJobTask>();

private readonly ExportJobWorker _exportJobWorker;
private readonly LegacyExportJobWorker _legacyExportJobWorker;

private readonly CancellationTokenSource _cancellationTokenSource = new CancellationTokenSource();
private readonly CancellationToken _cancellationToken;
Expand All @@ -49,16 +50,14 @@ public ExportJobWorkerTests()
_exportJobConfiguration.JobPollingFrequency = DefaultJobPollingFrequency;

_exportJobTaskFactory().Returns(_task);
var scopedOperationDataStore = Substitute.For<IScoped<IFhirOperationDataStore>>();
scopedOperationDataStore.Value.Returns(_fhirOperationDataStore);

_exportJobWorker = new ExportJobWorker(
() => scopedOperationDataStore,
_legacyExportJobWorker = new LegacyExportJobWorker(
() => _fhirOperationDataStore.CreateMockScope(),
Options.Create(_exportJobConfiguration),
_exportJobTaskFactory,
NullLogger<ExportJobWorker>.Instance);
NullLogger<LegacyExportJobWorker>.Instance);

_exportJobWorker.Handle(new StorageInitializedNotification(), CancellationToken.None);
_legacyExportJobWorker.Handle(new StorageInitializedNotification(), CancellationToken.None);
_cancellationToken = _cancellationTokenSource.Token;
}

Expand All @@ -71,7 +70,7 @@ public async Task GivenThereIsNoRunningJob_WhenExecuted_ThenATaskShouldBeCreated

_cancellationTokenSource.CancelAfter(DefaultJobPollingFrequency);

await _exportJobWorker.ExecuteAsync(_cancellationToken);
await _legacyExportJobWorker.ExecuteAsync(_cancellationToken);

_exportJobTaskFactory().Received(1);
}
Expand All @@ -87,7 +86,7 @@ public async Task GivenTheNumberOfRunningJobExceedsThreshold_WhenExecuted_ThenAT

_cancellationTokenSource.CancelAfter(DefaultJobPollingFrequency * 2);

await _exportJobWorker.ExecuteAsync(_cancellationToken);
await _legacyExportJobWorker.ExecuteAsync(_cancellationToken);

_exportJobTaskFactory.Received(1);
}
Expand Down Expand Up @@ -131,33 +130,33 @@ public async Task GivenTheNumberOfRunningJobDoesNotExceedThreshold_WhenExecuted_
// In case the task was not called, cancel the worker after certain period of time.
_cancellationTokenSource.CancelAfter(DefaultJobPollingFrequency * 3);

await _exportJobWorker.ExecuteAsync(_cancellationToken);
await _legacyExportJobWorker.ExecuteAsync(_cancellationToken);

Assert.True(isSecondJobCalled);
}

[Fact]
public async Task GivenAcquireExportJobThrowsException_WhenExecuted_ThenWeHaveADelayBeforeWeRetry()
{
_fhirOperationDataStore.AcquireExportJobsAsync(
_fhirOperationDataStore.AcquireLegacyExportJobsAsync(
DefaultMaximumNumberOfConcurrentJobAllowed,
DefaultJobHeartbeatTimeoutThreshold,
_cancellationToken)
.ThrowsForAnyArgs<Exception>();

_cancellationTokenSource.CancelAfter(DefaultJobPollingFrequency * 1.25);

await _exportJobWorker.ExecuteAsync(_cancellationToken);
await _legacyExportJobWorker.ExecuteAsync(_cancellationToken);

// Assert that we received only one call to AcquireExportJobsAsync
await _fhirOperationDataStore.ReceivedWithAnyArgs(1).AcquireExportJobsAsync(Arg.Any<ushort>(), Arg.Any<TimeSpan>(), Arg.Any<CancellationToken>());
await _fhirOperationDataStore.ReceivedWithAnyArgs(1).AcquireLegacyExportJobsAsync(Arg.Any<ushort>(), Arg.Any<TimeSpan>(), Arg.Any<CancellationToken>());
}

[Fact]
public async Task GivenOperationIsCancelled_WhenExecuted_ThenWeExitTheLoop()
{
ExportJobOutcome job = CreateExportJobOutcome();
_fhirOperationDataStore.AcquireExportJobsAsync(
_fhirOperationDataStore.AcquireLegacyExportJobsAsync(
Arg.Any<ushort>(),
Arg.Any<TimeSpan>(),
_cancellationToken)
Expand All @@ -167,10 +166,10 @@ public async Task GivenOperationIsCancelled_WhenExecuted_ThenWeExitTheLoop()
return new[] { job };
});

await _exportJobWorker.ExecuteAsync(_cancellationToken);
await _legacyExportJobWorker.ExecuteAsync(_cancellationToken);

// Assert that we received only one call to AcquireExportJobsAsync
await _fhirOperationDataStore.ReceivedWithAnyArgs(1).AcquireExportJobsAsync(Arg.Any<ushort>(), Arg.Any<TimeSpan>(), Arg.Any<CancellationToken>());
await _fhirOperationDataStore.ReceivedWithAnyArgs(1).AcquireLegacyExportJobsAsync(Arg.Any<ushort>(), Arg.Any<TimeSpan>(), Arg.Any<CancellationToken>());
}

private void SetupOperationDataStore(
Expand All @@ -189,7 +188,7 @@ private void SetupOperationDataStore(
jobPollingFrequency = DefaultJobPollingFrequency;
}

_fhirOperationDataStore.AcquireExportJobsAsync(
_fhirOperationDataStore.AcquireLegacyExportJobsAsync(
maximumNumberOfConcurrentJobsAllowed,
jobHeartbeatTimeoutThreshold.Value,
_cancellationToken)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,21 @@
using EnsureThat;
using Microsoft.Health.Core.Extensions;

namespace Microsoft.Health.Fhir.CosmosDb.Features.Storage.Queues;
namespace Microsoft.Health.Fhir.Core.Extensions;

/// <summary>
/// The DateTime is truncated to millisecond precision, turned into its 100ns ticks representation, and then left bit-shifted by 3.
/// </summary>
/// <remarks>
/// In the SQL provider, the resource surrogate ID is actually the last modified datetime with a "uniquifier" added to it by the database.
/// </remarks>
internal static class IdHelper
{
private const int ShiftFactor = 3;

internal static readonly DateTime MaxDateTime = new DateTime(long.MaxValue >> ShiftFactor, DateTimeKind.Utc).TruncateToMillisecond().AddTicks(-1);

public static long DateToId(DateTime dateTime)
public static long DateToId(this DateTime dateTime)
{
EnsureArg.IsLte(dateTime, MaxDateTime, nameof(dateTime));
long id = dateTime.TruncateToMillisecond().Ticks << ShiftFactor;
Expand All @@ -25,7 +31,7 @@ public static long DateToId(DateTime dateTime)
return id;
}

public static DateTime IdToDate(long resourceSurrogateId)
public static DateTime IdToDate(this long resourceSurrogateId)
{
var dateTime = new DateTime(resourceSurrogateId >> ShiftFactor, DateTimeKind.Utc);
return dateTime.TruncateToMillisecond();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ public async Task ExecuteAsync(ExportJobRecord exportJobRecord, WeakETag weakETa
_exportJobRecord.RestartCount++;
}

// The intial list of query parameters will not have a continutation token. We will add that later if we get one back
// The initial list of query parameters will not have a continuation token. We will add that later if we get one back
// from the search result.
// As Till is a new property QueuedTime is being used as a backup incase Till doesn't exist in the job record.
var tillTime = _exportJobRecord.Till != null ? _exportJobRecord.Till : new PartialDateTime(_exportJobRecord.QueuedTime);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// -------------------------------------------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License (MIT). See LICENSE in the repo root for license information.
// -------------------------------------------------------------------------------------------------

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Health.Fhir.Core.Features.Operations.Export.Models;
using Microsoft.Health.Fhir.Core.Features.Persistence;

namespace Microsoft.Health.Fhir.Core.Features.Operations.Export;

/// <summary>
/// Provides access to the legacy export job data.
/// </summary>
public interface ILegacyExportOperationDataStore
{
/// <summary>
/// Gets an export job by id.
/// </summary>
/// <param name="id">The id of the job.</param>
/// <param name="cancellationToken">The cancellation token.</param>
/// <returns>An instance of the existing export job.</returns>
/// <exception cref="JobNotFoundException"> thrown when the specific <paramref name="id"/> is not found. </exception>
Task<ExportJobOutcome> GetLegacyExportJobByIdAsync(string id, CancellationToken cancellationToken);

/// <summary>
/// Updates an existing export job.
/// </summary>
/// <param name="jobRecord">The job record.</param>
/// <param name="eTag">The eTag used for optimistic concurrency.</param>
/// <param name="cancellationToken">The cancellation token.</param>
/// <returns>An instance of the updated export job.</returns>
Task<ExportJobOutcome> UpdateLegacyExportJobAsync(ExportJobRecord jobRecord, WeakETag eTag, CancellationToken cancellationToken);

/// <summary>
/// Acquires export jobs.
/// </summary>
/// <param name="numberOfJobsToAcquire">The number of jobs to acquire.</param>
/// <param name="jobHeartbeatTimeoutThreshold">The job heartbeat timeout threshold.</param>
/// <param name="cancellationToken">The cancellation token.</param>
/// <returns>A list of acquired export job.</returns>
Task<IReadOnlyCollection<ExportJobOutcome>> AcquireLegacyExportJobsAsync(ushort numberOfJobsToAcquire, TimeSpan jobHeartbeatTimeoutThreshold, CancellationToken cancellationToken);
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,17 @@ namespace Microsoft.Health.Fhir.Core.Features.Operations.Export
/// <summary>
/// The worker responsible for running the export job tasks.
/// </summary>
public class ExportJobWorker : INotificationHandler<StorageInitializedNotification>
public class LegacyExportJobWorker : INotificationHandler<StorageInitializedNotification>
{
private readonly Func<IScoped<IFhirOperationDataStore>> _fhirOperationDataStoreFactory;
private readonly Func<IScoped<ILegacyExportOperationDataStore>> _fhirOperationDataStoreFactory;
private readonly ExportJobConfiguration _exportJobConfiguration;
private readonly Func<IExportJobTask> _exportJobTaskFactory;
private readonly ILogger _logger;
private bool _storageReady;

private const int MaximumDelayInSeconds = 3600;

public ExportJobWorker(Func<IScoped<IFhirOperationDataStore>> fhirOperationDataStoreFactory, IOptions<ExportJobConfiguration> exportJobConfiguration, Func<IExportJobTask> exportJobTaskFactory, ILogger<ExportJobWorker> logger)
public LegacyExportJobWorker(Func<IScoped<ILegacyExportOperationDataStore>> fhirOperationDataStoreFactory, IOptions<ExportJobConfiguration> exportJobConfiguration, Func<IExportJobTask> exportJobTaskFactory, ILogger<LegacyExportJobWorker> logger)
{
EnsureArg.IsNotNull(fhirOperationDataStoreFactory, nameof(fhirOperationDataStoreFactory));
EnsureArg.IsNotNull(exportJobConfiguration?.Value, nameof(exportJobConfiguration));
Expand Down Expand Up @@ -61,20 +61,19 @@ public async Task ExecuteAsync(CancellationToken cancellationToken)
// Get list of available jobs.
if (runningTasks.Count < _exportJobConfiguration.MaximumNumberOfConcurrentJobsAllowedPerInstance)
{
using (IScoped<IFhirOperationDataStore> store = _fhirOperationDataStoreFactory())
using IScoped<ILegacyExportOperationDataStore> store = _fhirOperationDataStoreFactory();
ushort numberOfJobsToAcquire = (ushort)(_exportJobConfiguration.MaximumNumberOfConcurrentJobsAllowedPerInstance - runningTasks.Count);

IReadOnlyCollection<ExportJobOutcome> jobs = await store.Value.AcquireLegacyExportJobsAsync(
numberOfJobsToAcquire,
_exportJobConfiguration.JobHeartbeatTimeoutThreshold,
cancellationToken);

foreach (ExportJobOutcome job in jobs)
{
ushort numberOfJobsToAcquire = (ushort)(_exportJobConfiguration.MaximumNumberOfConcurrentJobsAllowedPerInstance - runningTasks.Count);
IReadOnlyCollection<ExportJobOutcome> jobs = await store.Value.AcquireExportJobsAsync(
numberOfJobsToAcquire,
_exportJobConfiguration.JobHeartbeatTimeoutThreshold,
cancellationToken);

foreach (ExportJobOutcome job in jobs)
{
_logger.LogTrace("Picked up job: {JobId}.", job.JobRecord.Id);

runningTasks.Add(_exportJobTaskFactory().ExecuteAsync(job.JobRecord, job.ETag, cancellationToken));
}
_logger.LogTrace("Picked up job: {JobId}.", job.JobRecord.Id);

runningTasks.Add(_exportJobTaskFactory().ExecuteAsync(job.JobRecord, job.ETag, cancellationToken));
}
}

Expand Down
Loading