From 8c434af83afb1b06ae5e83c1c147c05a269cb41a Mon Sep 17 00:00:00 2001 From: SergeyGaluzo <95932081+SergeyGaluzo@users.noreply.github.com> Date: Tue, 23 May 2023 15:01:49 -0700 Subject: [PATCH] Fixing logic in SqlQueueClient plus (#3308) * Ability to ignore input last updated * null is false * Added correct retry logic on get jobs plus honor return definition * removed check --- .../Features/Storage/SqlQueueClient.cs | 107 ++++++------------ .../Storage/SqlServerFhirDataStore.cs | 101 ++++++++++++++++- 2 files changed, 130 insertions(+), 78 deletions(-) diff --git a/src/Microsoft.Health.Fhir.SqlServer/Features/Storage/SqlQueueClient.cs b/src/Microsoft.Health.Fhir.SqlServer/Features/Storage/SqlQueueClient.cs index 04c56448b4..fa94080eef 100644 --- a/src/Microsoft.Health.Fhir.SqlServer/Features/Storage/SqlQueueClient.cs +++ b/src/Microsoft.Health.Fhir.SqlServer/Features/Storage/SqlQueueClient.cs @@ -5,6 +5,7 @@ using System; using System.Collections.Generic; +using System.Data; using System.Linq; using System.Threading; using System.Threading.Tasks; @@ -15,7 +16,6 @@ using Microsoft.Health.JobManagement; using Microsoft.Health.SqlServer.Features.Client; using Microsoft.Health.SqlServer.Features.Schema; -using Microsoft.Health.SqlServer.Features.Schema.Model; using Microsoft.Health.SqlServer.Features.Storage; using JobStatus = Microsoft.Health.JobManagement.JobStatus; @@ -28,8 +28,6 @@ public class SqlQueueClient : IQueueClient private readonly ISqlRetryService _sqlRetryService; private readonly ILogger _logger; - private static readonly GetJobsProcedure GetJobs = new GetJobsProcedure(); - public SqlQueueClient( SqlConnectionWrapperFactory sqlConnectionWrapperFactory, SchemaInformation schemaInformation, @@ -93,7 +91,7 @@ public async Task CancelJobByIdAsync(byte queueType, long jobId, CancellationTok public virtual async Task CompleteJobAsync(JobInfo jobInfo, bool requestCancellationOnFailure, CancellationToken cancellationToken) { - using SqlCommand sqlCommand = new SqlCommand(); + using var sqlCommand = new SqlCommand(); // cannot use VLatest as it incorrectly sends nulls sqlCommand.CommandType = System.Data.CommandType.StoredProcedure; @@ -153,7 +151,7 @@ public async Task> DequeueJobsAsync(byte queueType, public async Task DequeueAsync(byte queueType, string worker, int heartbeatTimeoutSec, CancellationToken cancellationToken, long? jobId = null) { - using SqlCommand sqlCommand = new SqlCommand(); + using var sqlCommand = new SqlCommand(); // cannot use VLatest as it incorrectly asks for optional @InputJobId sqlCommand.CommandText = "dbo.DequeueJob"; @@ -166,12 +164,7 @@ public async Task DequeueAsync(byte queueType, string worker, int heart sqlCommand.Parameters.AddWithValue("@InputJobId", jobId.Value); } - JobInfo jobInfo = await _sqlRetryService.ExecuteSqlDataReaderFirstRow( - sqlCommand, - JobInfoExtensions.LoadJobInfo, - _logger, - "DequeueAsync failed.", - cancellationToken); + JobInfo jobInfo = await _sqlRetryService.ExecuteSqlDataReaderFirstRow(sqlCommand, JobInfoExtensions.LoadJobInfo, _logger, "DequeueAsync failed.", cancellationToken); if (jobInfo != null) { jobInfo.QueueType = queueType; @@ -182,7 +175,7 @@ public async Task DequeueAsync(byte queueType, string worker, int heart public async Task> EnqueueAsync(byte queueType, string[] definitions, long? groupId, bool forceOneActiveJobGroup, bool isCompleted, CancellationToken cancellationToken) { - using SqlCommand sqlCommand = new SqlCommand(); + using var sqlCommand = new SqlCommand(); // cannot use VLatest as it does not understand optional parameters sqlCommand.CommandType = System.Data.CommandType.StoredProcedure; @@ -200,12 +193,7 @@ public async Task> EnqueueAsync(byte queueType, string[] try { - return await _sqlRetryService.ExecuteSqlDataReader( - sqlCommand, - JobInfoExtensions.LoadJobInfo, - _logger, - "EnqueueJobs failed.", - cancellationToken); + return await _sqlRetryService.ExecuteSqlDataReader(sqlCommand, JobInfoExtensions.LoadJobInfo, _logger, "EnqueueJobs failed.", cancellationToken); } catch (SqlException sqlEx) { @@ -220,54 +208,23 @@ public async Task> EnqueueAsync(byte queueType, string[] public async Task> GetJobByGroupIdAsync(byte queueType, long groupId, bool returnDefinition, CancellationToken cancellationToken) { - using SqlCommand sqlCommand = new SqlCommand(); - - GetJobs.PopulateCommand(sqlCommand, queueType, null, null, groupId, returnDefinition); - - return await _sqlRetryService.ExecuteSqlDataReader( - sqlCommand, - JobInfoExtensions.LoadJobInfo, - _logger, - "GetJobByGroupIdAsync failed.", - cancellationToken); + using var sqlCommand = new SqlCommand(); + PopulateGetJobsCommand(sqlCommand, queueType, null, null, groupId, returnDefinition); + return await _sqlRetryService.ExecuteSqlDataReader(sqlCommand, JobInfoExtensions.LoadJobInfo, _logger, "GetJobByGroupIdAsync failed.", cancellationToken); } public async Task GetJobByIdAsync(byte queueType, long jobId, bool returnDefinition, CancellationToken cancellationToken) { - using SqlCommand sqlCommand = new SqlCommand(); - - GetJobs.PopulateCommand(sqlCommand, queueType, jobId, null, null, returnDefinition); - - return await _sqlRetryService.ExecuteSqlDataReaderFirstRow( - sqlCommand, - JobInfoExtensions.LoadJobInfo, - _logger, - "GetJobByIdAsync failed.", - cancellationToken); + using var sqlCommand = new SqlCommand(); + PopulateGetJobsCommand(sqlCommand, queueType, jobId, returnDefinition: returnDefinition); + return await _sqlRetryService.ExecuteSqlDataReaderFirstRow(sqlCommand, JobInfoExtensions.LoadJobInfo, _logger, "GetJobByIdAsync failed.", cancellationToken); } public async Task> GetJobsByIdsAsync(byte queueType, long[] jobIds, bool returnDefinition, CancellationToken cancellationToken) { - try - { - using SqlConnectionWrapper sqlConnectionWrapper = await _sqlConnectionWrapperFactory.ObtainSqlConnectionWrapperAsync(cancellationToken, true); - using SqlCommandWrapper sqlCommandWrapper = sqlConnectionWrapper.CreateRetrySqlCommand(); - - VLatest.GetJobs.PopulateCommand(sqlCommandWrapper, queueType, null, jobIds.Select(i => new BigintListRow(i)), null, returnDefinition); - await using SqlDataReader sqlDataReader = await sqlCommandWrapper.ExecuteReaderAsync(cancellationToken); - - return await sqlDataReader.ReadJobInfosAsync(cancellationToken); - } - catch (Exception ex) - { - _logger.LogError(ex, "GetJobsByIdsAsync failed."); - if (ex.IsRetriable()) - { - throw new RetriableJobException(ex.Message, ex); - } - - throw; - } + using var cmd = new SqlCommand(); + PopulateGetJobsCommand(cmd, queueType, jobIds: jobIds, returnDefinition: returnDefinition); + return await _sqlRetryService.ExecuteSqlDataReader(cmd, JobInfoExtensions.LoadJobInfo, _logger, "GetJobsByIdsAsync failed.", cancellationToken); } public bool IsInitialized() @@ -302,29 +259,29 @@ public async Task PutJobHeartbeatAsync(JobInfo jobInfo, CancellationToken return cancel; } - // Class copied from src\Microsoft.Health.Fhir.SqlServer\Features\Schema\Model\VLatest.Generated.cs . - // Autogenerated version in VLatest.Generated.cs uses SqlCommandWrapper instead of SqlCommand as the input parameter, so here - // we have the version that uses SqlCommand since we are depreciating SqlCommandWrapper. - private class GetJobsProcedure : StoredProcedure + private static void PopulateGetJobsCommand(SqlCommand cmd, byte queueType, long? jobId = null, IEnumerable jobIds = null, long? groupId = null, bool? returnDefinition = null) { - private readonly ParameterDefinition _queueType = new ParameterDefinition("@QueueType", global::System.Data.SqlDbType.TinyInt, false); - private readonly ParameterDefinition _jobId = new ParameterDefinition("@JobId", global::System.Data.SqlDbType.BigInt, true); - private readonly BigintListTableValuedParameterDefinition _jobIds = new BigintListTableValuedParameterDefinition("@JobIds"); - private readonly ParameterDefinition _groupId = new ParameterDefinition("@GroupId", global::System.Data.SqlDbType.BigInt, true); + cmd.CommandType = CommandType.StoredProcedure; + cmd.CommandText = "dbo.GetJobs"; + cmd.Parameters.AddWithValue("@QueueType", queueType); + if (jobId.HasValue) + { + cmd.Parameters.AddWithValue("@JobId", jobId.Value); + } - internal GetJobsProcedure() - : base("dbo.GetJobs") + if (jobIds != null) + { + new BigintListTableValuedParameterDefinition("@JobIds").AddParameter(cmd.Parameters, jobIds.Select(_ => new BigintListRow(_))); + } + + if (groupId.HasValue) { + cmd.Parameters.AddWithValue("@GroupId", groupId.Value); } - public void PopulateCommand(SqlCommand sqlCommand, byte queueType, long? jobId, global::System.Collections.Generic.IEnumerable jobIds, long? groupId, bool? returnDefinition) + if (returnDefinition.HasValue) { - sqlCommand.CommandType = global::System.Data.CommandType.StoredProcedure; - sqlCommand.CommandText = "dbo.GetJobs"; - _queueType.AddParameter(sqlCommand.Parameters, queueType); - _jobId.AddParameter(sqlCommand.Parameters, jobId); - _jobIds.AddParameter(sqlCommand.Parameters, jobIds); - _groupId.AddParameter(sqlCommand.Parameters, groupId); + cmd.Parameters.AddWithValue("@ReturnDefinition", returnDefinition.Value); } } } diff --git a/src/Microsoft.Health.Fhir.SqlServer/Features/Storage/SqlServerFhirDataStore.cs b/src/Microsoft.Health.Fhir.SqlServer/Features/Storage/SqlServerFhirDataStore.cs index 01d999a697..efddf46bf4 100644 --- a/src/Microsoft.Health.Fhir.SqlServer/Features/Storage/SqlServerFhirDataStore.cs +++ b/src/Microsoft.Health.Fhir.SqlServer/Features/Storage/SqlServerFhirDataStore.cs @@ -53,6 +53,8 @@ internal class SqlServerFhirDataStore : IFhirDataStore, IProvideCapability private readonly SchemaInformation _schemaInformation; private readonly IModelInfoProvider _modelInfoProvider; private const string InitialVersion = "1"; + private static IgnoreInputLastUpdated _ignoreInputLastUpdated; + private static object _flagLocker = new object(); public SqlServerFhirDataStore( ISqlServerFhirModel model, @@ -84,6 +86,14 @@ public SqlServerFhirDataStore( _requestContextAccessor = EnsureArg.IsNotNull(requestContextAccessor, nameof(requestContextAccessor)); _memoryStreamManager = new RecyclableMemoryStreamManager(); + + if (_ignoreInputLastUpdated == null) + { + lock (_flagLocker) + { + _ignoreInputLastUpdated ??= new IgnoreInputLastUpdated(_sqlConnectionWrapperFactory); + } + } } public async Task> MergeAsync(IReadOnlyList resources, CancellationToken cancellationToken) @@ -264,9 +274,20 @@ internal async Task> MergeInternalAsync(I } } - var surrIdBase = ResourceSurrogateIdHelper.LastUpdatedToResourceSurrogateId(resource.LastModified.DateTime); - var surrId = surrIdBase + minSequenceId + index; - ReplaceVersionIdInMeta(resource); + long surrId; + if (_ignoreInputLastUpdated.IsEnabled()) + { + surrId = minSurrId + index; + resource.LastModified = new DateTimeOffset(ResourceSurrogateIdHelper.ResourceSurrogateIdToLastUpdated(surrId), TimeSpan.Zero); + ReplaceVersionIdAndLastUpdatedInMeta(resource); + } + else + { + var surrIdBase = ResourceSurrogateIdHelper.LastUpdatedToResourceSurrogateId(resource.LastModified.DateTime); + surrId = surrIdBase + minSequenceId + index; + ReplaceVersionIdInMeta(resource); + } + resource.ResourceSurrogateId = surrId; mergeWrappers.Add(new MergeResourceWrapper(resource, resourceExt.KeepHistory, hasVersionToCompare)); index++; @@ -484,6 +505,26 @@ private void ReplaceVersionIdInMeta(ResourceWrapper resourceWrapper) resourceWrapper.RawResource = new RawResource(rawResourceData, FhirResourceFormat.Json, true); } + private void ReplaceVersionIdAndLastUpdatedInMeta(ResourceWrapper resourceWrapper) + { + var date = GetJsonValue(resourceWrapper.RawResource.Data, "lastUpdated", false); + string rawResourceData; + if (resourceWrapper.Version == InitialVersion) // version is already correct + { + rawResourceData = resourceWrapper.RawResource.Data + .Replace($"\"lastUpdated\":\"{date}\"", $"\"lastUpdated\":\"{RemoveTrailingZerosFromMillisecondsForAGivenDate(resourceWrapper.LastModified)}\"", StringComparison.Ordinal); + } + else + { + var version = GetJsonValue(resourceWrapper.RawResource.Data, "versionId", false); + rawResourceData = resourceWrapper.RawResource.Data + .Replace($"\"versionId\":\"{version}\"", $"\"versionId\":\"{resourceWrapper.Version}\"", StringComparison.Ordinal) + .Replace($"\"lastUpdated\":\"{date}\"", $"\"lastUpdated\":\"{RemoveTrailingZerosFromMillisecondsForAGivenDate(resourceWrapper.LastModified)}\"", StringComparison.Ordinal); + } + + resourceWrapper.RawResource = new RawResource(rawResourceData, FhirResourceFormat.Json, true); + } + private bool ExistingRawResourceIsEqualToInput(ResourceWrapper input, ResourceWrapper existing) // call is not symmetrical, it assumes version = 1 on input. { var inputDate = GetJsonValue(input.RawResource.Data, "lastUpdated", false); @@ -616,5 +657,59 @@ public async Task UpdateSearchParameterIndicesAsync(ResourceWra { return await Task.FromResult((int?)null); } + + private class IgnoreInputLastUpdated + { + private SqlConnectionWrapperFactory _sqlConnectionWrapperFactory; + private bool _isEnabled; + private DateTime? _lastUpdated; + private object _databaseAccessLocker = new object(); + + public IgnoreInputLastUpdated(SqlConnectionWrapperFactory sqlConnectionWrapperFactory) + { + _sqlConnectionWrapperFactory = sqlConnectionWrapperFactory; + } + + public bool IsEnabled() + { + if (_lastUpdated.HasValue && (DateTime.UtcNow - _lastUpdated.Value).TotalSeconds < 600) + { + return _isEnabled; + } + + lock (_databaseAccessLocker) + { + if (_lastUpdated.HasValue && (DateTime.UtcNow - _lastUpdated.Value).TotalSeconds < 600) + { + return _isEnabled; + } + + var isEnabled = IsEnabledInDatabase(); + if (isEnabled.HasValue) + { + _isEnabled = isEnabled.Value; + _lastUpdated = DateTime.UtcNow; + } + } + + return _isEnabled; + } + + private bool? IsEnabledInDatabase() + { + try + { + using var conn = _sqlConnectionWrapperFactory.ObtainSqlConnectionWrapperAsync(CancellationToken.None, false).Result; + using var cmd = conn.CreateRetrySqlCommand(); + cmd.CommandText = "IF object_id('dbo.Parameters') IS NOT NULL SELECT Number FROM dbo.Parameters WHERE Id = 'MergeResources.IgnoreInputLastUpdated'"; // call can be made before store is initialized + var value = cmd.ExecuteScalarAsync(CancellationToken.None).Result; + return value != null && (double)value == 1; + } + catch (SqlException) + { + return null; + } + } + } } }