Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Hotfix/restore cdb export #3465

Merged
merged 6 commits into from
Aug 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,14 @@ namespace Microsoft.Health.Fhir.Core.Features.Operations.Export;
/// </summary>
public interface ILegacyExportOperationDataStore
{
/// <summary>
/// Creates a new export job.
/// </summary>
/// <param name="jobRecord">The job record.</param>
/// <param name="cancellationToken">The cancellation token.</param>
/// <returns>An instance of the export job.</returns>
Task<ExportJobOutcome> CreateLegacyExportJobAsync(ExportJobRecord jobRecord, CancellationToken cancellationToken);

/// <summary>
/// Gets an export job by id.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ protected FhirOperationDataStoreBase(IQueueClient queueClient, ILoggerFactory lo
};
}

public async Task<ExportJobOutcome> CreateExportJobAsync(ExportJobRecord jobRecord, CancellationToken cancellationToken)
public virtual async Task<ExportJobOutcome> CreateExportJobAsync(ExportJobRecord jobRecord, CancellationToken cancellationToken)
{
var clone = jobRecord.Clone();
clone.QueuedTime = DateTime.Parse("1900-01-01");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,5 +61,10 @@ public class CosmosDataStoreConfiguration
/// Options to determine if the parallel query execution is needed across physical partitions to speed up the selective queries
/// </summary>
public CosmosDataStoreParallelQueryOptions ParallelQueryOptions { get; } = new CosmosDataStoreParallelQueryOptions { MaxQueryConcurrency = 500 };

/// <summary>
/// When True, jobs will be processed by a QueueClient framework when available.
/// </summary>
public bool UseQueueClientJobs { get; set; }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
using Microsoft.Azure.Cosmos.Scripts;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Microsoft.Health.Core.Extensions;
using Microsoft.Health.Extensions.DependencyInjection;
using Microsoft.Health.Fhir.Core.Extensions;
using Microsoft.Health.Fhir.Core.Features.Operations;
Expand All @@ -28,16 +29,23 @@
using Microsoft.Health.Fhir.CosmosDb.Features.Storage.StoredProcedures.AcquireExportJobs;
using Microsoft.Health.Fhir.CosmosDb.Features.Storage.StoredProcedures.AcquireReindexJobs;
using Microsoft.Health.JobManagement;
using Newtonsoft.Json;
using JobConflictException = Microsoft.Health.Fhir.Core.Features.Operations.JobConflictException;

namespace Microsoft.Health.Fhir.CosmosDb.Features.Storage.Operations
{
public sealed class CosmosFhirOperationDataStore : FhirOperationDataStoreBase, ILegacyExportOperationDataStore
{
private const string HashParameterName = "@hash";

private static readonly string GetJobByHashQuery =
$"SELECT TOP 1 * FROM ROOT r WHERE r.{JobRecordProperties.JobRecord}.{JobRecordProperties.Hash} = {HashParameterName} AND r.{JobRecordProperties.JobRecord}.{JobRecordProperties.Status} IN ('{OperationStatus.Queued}', '{OperationStatus.Running}') ORDER BY r.{KnownDocumentProperties.Timestamp} ASC";

private static readonly string CheckActiveJobsByStatusQuery =
$"SELECT TOP 1 * FROM ROOT r WHERE r.{JobRecordProperties.JobRecord}.{JobRecordProperties.Status} IN ('{OperationStatus.Queued}', '{OperationStatus.Running}', '{OperationStatus.Paused}')";

private readonly IScoped<Container> _containerScope;
private readonly CosmosDataStoreConfiguration _cosmosDataStoreConfiguration;
private readonly RetryExceptionPolicyFactory _retryExceptionPolicyFactory;
private readonly ICosmosQueryFactory _queryFactory;
private readonly ILogger _logger;
Expand Down Expand Up @@ -74,6 +82,7 @@ public CosmosFhirOperationDataStore(
EnsureArg.IsNotNull(logger, nameof(logger));

_containerScope = containerScope;
_cosmosDataStoreConfiguration = cosmosDataStoreConfiguration;
_retryExceptionPolicyFactory = retryExceptionPolicyFactory;
_queryFactory = queryFactory;
_logger = logger;
Expand All @@ -88,6 +97,18 @@ public CosmosFhirOperationDataStore(

private string CollectionId { get; }

public override async Task<ExportJobOutcome> CreateExportJobAsync(ExportJobRecord jobRecord, CancellationToken cancellationToken)
{
if (_cosmosDataStoreConfiguration.UseQueueClientJobs)
{
return await base.CreateExportJobAsync(jobRecord, cancellationToken);
}

// try old job records
var oldJobs = (ILegacyExportOperationDataStore)this;
return await oldJobs.CreateLegacyExportJobAsync(jobRecord, cancellationToken);
}

public override async Task<ExportJobOutcome> GetExportJobByIdAsync(string id, CancellationToken cancellationToken)
{
if (IsLegacyJob(id))
Expand Down Expand Up @@ -146,6 +167,44 @@ async Task<IReadOnlyCollection<ExportJobOutcome>> ILegacyExportOperationDataStor
}
}

async Task<ExportJobOutcome> ILegacyExportOperationDataStore.CreateLegacyExportJobAsync(ExportJobRecord jobRecord, CancellationToken cancellationToken)
{
EnsureArg.IsNotNull(jobRecord, nameof(jobRecord));

var hashObject = new { jobRecord.RequestUri, jobRecord.RequestorClaims };
var hash = JsonConvert.SerializeObject(hashObject).ComputeHash();

ExportJobOutcome resultFromHash = await GetExportJobByHashAsync(hash, cancellationToken);
if (resultFromHash != null)
{
return resultFromHash;
}

jobRecord.Hash = hash;

var cosmosExportJob = new CosmosExportJobRecordWrapper(jobRecord);

try
{
var result = await _containerScope.Value.CreateItemAsync(
cosmosExportJob,
new PartitionKey(CosmosDbExportConstants.ExportJobPartitionKey),
cancellationToken: cancellationToken);

return new ExportJobOutcome(jobRecord, WeakETag.FromVersionId(result.Resource.ETag));
}
catch (CosmosException dce)
{
if (dce.IsRequestRateExceeded())
{
throw;
}

_logger.LogError(dce, "Failed to create an export job.");
throw;
}
}

async Task<ExportJobOutcome> ILegacyExportOperationDataStore.GetLegacyExportJobByIdAsync(string id, CancellationToken cancellationToken)
{
EnsureArg.IsNotNullOrWhiteSpace(id, nameof(id));
Expand Down Expand Up @@ -386,5 +445,42 @@ public override async Task<ReindexJobWrapper> UpdateReindexJobAsync(ReindexJobRe
throw;
}
}

private async Task<ExportJobOutcome> GetExportJobByHashAsync(string hash, CancellationToken cancellationToken)
{
EnsureArg.IsNotNullOrWhiteSpace(hash, nameof(hash));

try
{
var query = _queryFactory.Create<CosmosExportJobRecordWrapper>(
_containerScope.Value,
new CosmosQueryContext(
new QueryDefinition(GetJobByHashQuery)
.WithParameter(HashParameterName, hash),
new QueryRequestOptions { PartitionKey = new PartitionKey(CosmosDbExportConstants.ExportJobPartitionKey) }));

FeedResponse<CosmosExportJobRecordWrapper> result = await query.ExecuteNextAsync(cancellationToken);

if (result.Count == 1)
{
// We found an existing job that matches the hash.
CosmosExportJobRecordWrapper wrapper = result.First();

return new ExportJobOutcome(wrapper.JobRecord, WeakETag.FromVersionId(wrapper.ETag));
}

return null;
}
catch (CosmosException dce)
{
if (dce.IsRequestRateExceeded())
{
throw;
}

_logger.LogError(dce, "Failed to get an export job by hash.");
throw;
}
}
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/*************************************************************
/***************************************************************
This migration script takes care of increasing precision and scale
for Decimal value fields in below mentioned tables and table types to (36,18 ) from (18,6)

Expand Down Expand Up @@ -302,15 +302,15 @@ ALTER PROCEDURE dbo.MergeResources
,@TokenTexts dbo.TokenTextList READONLY
,@StringSearchParams dbo.StringSearchParamList READONLY
,@UriSearchParams dbo.UriSearchParamList READONLY
,@NumberSearchParams dbo.Tmp_NumberSearchParamList READONLY
,@QuantitySearchParams dbo.Tmp_QuantitySearchParamList READONLY
,@NumberSearchParams dbo.NumberSearchParamList READONLY
,@QuantitySearchParams dbo.QuantitySearchParamList READONLY
brendankowitz marked this conversation as resolved.
Show resolved Hide resolved
,@DateTimeSearchParms dbo.DateTimeSearchParamList READONLY
,@ReferenceTokenCompositeSearchParams dbo.ReferenceTokenCompositeSearchParamList READONLY
,@TokenTokenCompositeSearchParams dbo.TokenTokenCompositeSearchParamList READONLY
,@TokenDateTimeCompositeSearchParams dbo.TokenDateTimeCompositeSearchParamList READONLY
,@TokenQuantityCompositeSearchParams dbo.Tmp_TokenQuantityCompositeSearchParamList READONLY
,@TokenQuantityCompositeSearchParams dbo.TokenQuantityCompositeSearchParamList READONLY
,@TokenStringCompositeSearchParams dbo.TokenStringCompositeSearchParamList READONLY
,@TokenNumberNumberCompositeSearchParams dbo.Tmp_TokenNumberNumberCompositeSearchParamList READONLY
,@TokenNumberNumberCompositeSearchParams dbo.TokenNumberNumberCompositeSearchParamList READONLY
AS
set nocount on
DECLARE @st datetime = getUTCdate()
Expand Down Expand Up @@ -881,27 +881,51 @@ END CATCH
DECLARE @message varchar(100) = '';
IF (@origNumberSearchParamRows != @tmpNumberSearchParamRows)
BEGIN
set @message = CONCAT(@origNumberSearchParamRows, @tmpNumberSearchParamRows,'NumberSearchParam rows does not match')
EXECUTE dbo.LogEvent @Process='Verification',@Status=@message;
THROW 5000, @message, 25;
SELECT @origNumberSearchParamRows = count(*)
FROM NumberSearchParam WHERE IsHistory = 0

IF (@origNumberSearchParamRows != @tmpNumberSearchParamRows)
BEGIN
set @message = CONCAT(@origNumberSearchParamRows,' ', @tmpNumberSearchParamRows,' NumberSearchParam rows does not match')
EXECUTE dbo.LogEvent @Process='Verification',@Status=@message;
THROW 50001, @message, 25;
END
END
IF (@origQuantitySearchParamRows != @tmpQuantitySearchParamRows)
BEGIN
set @message = CONCAT(@origQuantitySearchParamRows, @tmpQuantitySearchParamRows,'QuantitySearchParam rows does not match')
EXECUTE dbo.LogEvent @Process='Verification',@Status=@message;
THROW 5000, @message, 25;
END
BEGIN
SELECT @origQuantitySearchParamRows = count(*)
FROM QuantitySearchParam WHERE IsHistory = 0

IF (@origQuantitySearchParamRows != @tmpQuantitySearchParamRows)
BEGIN
set @message = CONCAT(@origQuantitySearchParamRows, ' ',@tmpQuantitySearchParamRows,' QuantitySearchParam rows does not match')
EXECUTE dbo.LogEvent @Process='Verification',@Status=@message;
THROW 50001, @message, 25;
END
END
IF (@origTokenNumberNumberCompositeSearchParamRows != @tmpTokenNumberNumberCompositeSearchParamRows)
BEGIN
set @message = CONCAT(@origTokenNumberNumberCompositeSearchParamRows, @tmpTokenNumberNumberCompositeSearchParamRows,'TokenNumberNumberCompositeSearchParam rows does not match')
EXECUTE dbo.LogEvent @Process='Verification',@Status=@message;
THROW 5000, @message, 25;
SELECT @origTokenNumberNumberCompositeSearchParamRows = count(*)
FROM TokenNumberNumberCompositeSearchParam WHERE IsHistory = 0

IF (@origTokenNumberNumberCompositeSearchParamRows != @tmpTokenNumberNumberCompositeSearchParamRows)
BEGIN
set @message = CONCAT(@origTokenNumberNumberCompositeSearchParamRows, ' ',@tmpTokenNumberNumberCompositeSearchParamRows,' TokenNumberNumberCompositeSearchParam rows does not match')
EXECUTE dbo.LogEvent @Process='Verification',@Status=@message;
THROW 50001, @message, 25;
END
END
IF (@origTokenQuantityCompositeSearchParamRows != @tmpTokenQuantityCompositeSearchParamRows)
BEGIN
set @message = CONCAT(@origTokenQuantityCompositeSearchParamRows, @tmpTokenQuantityCompositeSearchParamRows,'TokenQuantityCompositeSearchParam rows does not match')
EXECUTE dbo.LogEvent @Process='Verification',@Status=@message;
THROW 5000, @message, 25;
SELECT @origTokenQuantityCompositeSearchParamRows = count(*)
FROM TokenQuantityCompositeSearchParam WHERE IsHistory = 0

IF (@origTokenQuantityCompositeSearchParamRows != @tmpTokenQuantityCompositeSearchParamRows)
BEGIN
set @message = CONCAT(@origTokenQuantityCompositeSearchParamRows, ' ',@tmpTokenQuantityCompositeSearchParamRows,' TokenQuantityCompositeSearchParam rows does not match')
EXECUTE dbo.LogEvent @Process='Verification',@Status=@message;
THROW 50001, @message, 25;
END
END

EXECUTE dbo.LogEvent @Process='Verification',@Status='End'
Expand All @@ -911,6 +935,7 @@ END CATCH
INSERT INTO dbo.Parameters (Id, Char) SELECT 'Alter Transaction', 'LogEvent'
EXECUTE dbo.LogEvent @Process='Alter Transaction',@Status='Start'
BEGIN TRANSACTION
DROP PROCEDURE dbo.MergeResources;
DROP TYPE dbo.NumberSearchParamList
DROP TYPE dbo.QuantitySearchParamList
DROP TYPE dbo.TokenNumberNumberCompositeSearchParamList
Expand Down Expand Up @@ -965,7 +990,7 @@ END CATCH
HighValue2 DECIMAL (36, 18) NULL);

EXECUTE(
'ALTER PROCEDURE dbo.MergeResources
'CREATE PROCEDURE dbo.MergeResources
-- This stored procedure can be used for:
-- 1. Ordinary put with single version per resource in input
-- 2. Put with history preservation (multiple input versions per resource)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ public CosmosDbFhirStorageTestsFixture()
DatabaseId = Environment.GetEnvironmentVariable("CosmosDb:DatabaseId") ?? "FhirTests",
AllowDatabaseCreation = true,
PreferredLocations = Environment.GetEnvironmentVariable("CosmosDb:PreferredLocations")?.Split(';', StringSplitOptions.RemoveEmptyEntries),
UseQueueClientJobs = true,
};

_cosmosCollectionConfiguration = new CosmosCollectionConfiguration
Expand Down