Skip to content

Commit

Permalink
Fixing logic in SqlQueueClient plus (#3308)
Browse files Browse the repository at this point in the history
* Ability to ignore input last updated

* null is false

* Added correct retry logic on get jobs plus honor return definition

* removed check
  • Loading branch information
SergeyGaluzo authored May 23, 2023
1 parent 7ff280a commit 8c434af
Show file tree
Hide file tree
Showing 2 changed files with 130 additions and 78 deletions.
107 changes: 32 additions & 75 deletions src/Microsoft.Health.Fhir.SqlServer/Features/Storage/SqlQueueClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

using System;
using System.Collections.Generic;
using System.Data;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
Expand All @@ -15,7 +16,6 @@
using Microsoft.Health.JobManagement;
using Microsoft.Health.SqlServer.Features.Client;
using Microsoft.Health.SqlServer.Features.Schema;
using Microsoft.Health.SqlServer.Features.Schema.Model;
using Microsoft.Health.SqlServer.Features.Storage;
using JobStatus = Microsoft.Health.JobManagement.JobStatus;

Expand All @@ -28,8 +28,6 @@ public class SqlQueueClient : IQueueClient
private readonly ISqlRetryService _sqlRetryService;
private readonly ILogger<SqlQueueClient> _logger;

private static readonly GetJobsProcedure GetJobs = new GetJobsProcedure();

public SqlQueueClient(
SqlConnectionWrapperFactory sqlConnectionWrapperFactory,
SchemaInformation schemaInformation,
Expand Down Expand Up @@ -93,7 +91,7 @@ public async Task CancelJobByIdAsync(byte queueType, long jobId, CancellationTok

public virtual async Task CompleteJobAsync(JobInfo jobInfo, bool requestCancellationOnFailure, CancellationToken cancellationToken)
{
using SqlCommand sqlCommand = new SqlCommand();
using var sqlCommand = new SqlCommand();

// cannot use VLatest as it incorrectly sends nulls
sqlCommand.CommandType = System.Data.CommandType.StoredProcedure;
Expand Down Expand Up @@ -153,7 +151,7 @@ public async Task<IReadOnlyCollection<JobInfo>> DequeueJobsAsync(byte queueType,

public async Task<JobInfo> DequeueAsync(byte queueType, string worker, int heartbeatTimeoutSec, CancellationToken cancellationToken, long? jobId = null)
{
using SqlCommand sqlCommand = new SqlCommand();
using var sqlCommand = new SqlCommand();

// cannot use VLatest as it incorrectly asks for optional @InputJobId
sqlCommand.CommandText = "dbo.DequeueJob";
Expand All @@ -166,12 +164,7 @@ public async Task<JobInfo> DequeueAsync(byte queueType, string worker, int heart
sqlCommand.Parameters.AddWithValue("@InputJobId", jobId.Value);
}

JobInfo jobInfo = await _sqlRetryService.ExecuteSqlDataReaderFirstRow(
sqlCommand,
JobInfoExtensions.LoadJobInfo,
_logger,
"DequeueAsync failed.",
cancellationToken);
JobInfo jobInfo = await _sqlRetryService.ExecuteSqlDataReaderFirstRow(sqlCommand, JobInfoExtensions.LoadJobInfo, _logger, "DequeueAsync failed.", cancellationToken);
if (jobInfo != null)
{
jobInfo.QueueType = queueType;
Expand All @@ -182,7 +175,7 @@ public async Task<JobInfo> DequeueAsync(byte queueType, string worker, int heart

public async Task<IReadOnlyList<JobInfo>> EnqueueAsync(byte queueType, string[] definitions, long? groupId, bool forceOneActiveJobGroup, bool isCompleted, CancellationToken cancellationToken)
{
using SqlCommand sqlCommand = new SqlCommand();
using var sqlCommand = new SqlCommand();

// cannot use VLatest as it does not understand optional parameters
sqlCommand.CommandType = System.Data.CommandType.StoredProcedure;
Expand All @@ -200,12 +193,7 @@ public async Task<IReadOnlyList<JobInfo>> EnqueueAsync(byte queueType, string[]

try
{
return await _sqlRetryService.ExecuteSqlDataReader(
sqlCommand,
JobInfoExtensions.LoadJobInfo,
_logger,
"EnqueueJobs failed.",
cancellationToken);
return await _sqlRetryService.ExecuteSqlDataReader(sqlCommand, JobInfoExtensions.LoadJobInfo, _logger, "EnqueueJobs failed.", cancellationToken);
}
catch (SqlException sqlEx)
{
Expand All @@ -220,54 +208,23 @@ public async Task<IReadOnlyList<JobInfo>> EnqueueAsync(byte queueType, string[]

public async Task<IReadOnlyList<JobInfo>> GetJobByGroupIdAsync(byte queueType, long groupId, bool returnDefinition, CancellationToken cancellationToken)
{
using SqlCommand sqlCommand = new SqlCommand();

GetJobs.PopulateCommand(sqlCommand, queueType, null, null, groupId, returnDefinition);

return await _sqlRetryService.ExecuteSqlDataReader(
sqlCommand,
JobInfoExtensions.LoadJobInfo,
_logger,
"GetJobByGroupIdAsync failed.",
cancellationToken);
using var sqlCommand = new SqlCommand();
PopulateGetJobsCommand(sqlCommand, queueType, null, null, groupId, returnDefinition);
return await _sqlRetryService.ExecuteSqlDataReader(sqlCommand, JobInfoExtensions.LoadJobInfo, _logger, "GetJobByGroupIdAsync failed.", cancellationToken);
}

public async Task<JobInfo> GetJobByIdAsync(byte queueType, long jobId, bool returnDefinition, CancellationToken cancellationToken)
{
using SqlCommand sqlCommand = new SqlCommand();

GetJobs.PopulateCommand(sqlCommand, queueType, jobId, null, null, returnDefinition);

return await _sqlRetryService.ExecuteSqlDataReaderFirstRow(
sqlCommand,
JobInfoExtensions.LoadJobInfo,
_logger,
"GetJobByIdAsync failed.",
cancellationToken);
using var sqlCommand = new SqlCommand();
PopulateGetJobsCommand(sqlCommand, queueType, jobId, returnDefinition: returnDefinition);
return await _sqlRetryService.ExecuteSqlDataReaderFirstRow(sqlCommand, JobInfoExtensions.LoadJobInfo, _logger, "GetJobByIdAsync failed.", cancellationToken);
}

public async Task<IReadOnlyList<JobInfo>> GetJobsByIdsAsync(byte queueType, long[] jobIds, bool returnDefinition, CancellationToken cancellationToken)
{
try
{
using SqlConnectionWrapper sqlConnectionWrapper = await _sqlConnectionWrapperFactory.ObtainSqlConnectionWrapperAsync(cancellationToken, true);
using SqlCommandWrapper sqlCommandWrapper = sqlConnectionWrapper.CreateRetrySqlCommand();

VLatest.GetJobs.PopulateCommand(sqlCommandWrapper, queueType, null, jobIds.Select(i => new BigintListRow(i)), null, returnDefinition);
await using SqlDataReader sqlDataReader = await sqlCommandWrapper.ExecuteReaderAsync(cancellationToken);

return await sqlDataReader.ReadJobInfosAsync(cancellationToken);
}
catch (Exception ex)
{
_logger.LogError(ex, "GetJobsByIdsAsync failed.");
if (ex.IsRetriable())
{
throw new RetriableJobException(ex.Message, ex);
}

throw;
}
using var cmd = new SqlCommand();
PopulateGetJobsCommand(cmd, queueType, jobIds: jobIds, returnDefinition: returnDefinition);
return await _sqlRetryService.ExecuteSqlDataReader(cmd, JobInfoExtensions.LoadJobInfo, _logger, "GetJobsByIdsAsync failed.", cancellationToken);
}

public bool IsInitialized()
Expand Down Expand Up @@ -302,29 +259,29 @@ public async Task<bool> PutJobHeartbeatAsync(JobInfo jobInfo, CancellationToken
return cancel;
}

// Class copied from src\Microsoft.Health.Fhir.SqlServer\Features\Schema\Model\VLatest.Generated.cs .
// Autogenerated version in VLatest.Generated.cs uses SqlCommandWrapper instead of SqlCommand as the input parameter, so here
// we have the version that uses SqlCommand since we are depreciating SqlCommandWrapper.
private class GetJobsProcedure : StoredProcedure
private static void PopulateGetJobsCommand(SqlCommand cmd, byte queueType, long? jobId = null, IEnumerable<long> jobIds = null, long? groupId = null, bool? returnDefinition = null)
{
private readonly ParameterDefinition<byte> _queueType = new ParameterDefinition<byte>("@QueueType", global::System.Data.SqlDbType.TinyInt, false);
private readonly ParameterDefinition<long?> _jobId = new ParameterDefinition<long?>("@JobId", global::System.Data.SqlDbType.BigInt, true);
private readonly BigintListTableValuedParameterDefinition _jobIds = new BigintListTableValuedParameterDefinition("@JobIds");
private readonly ParameterDefinition<long?> _groupId = new ParameterDefinition<long?>("@GroupId", global::System.Data.SqlDbType.BigInt, true);
cmd.CommandType = CommandType.StoredProcedure;
cmd.CommandText = "dbo.GetJobs";
cmd.Parameters.AddWithValue("@QueueType", queueType);
if (jobId.HasValue)
{
cmd.Parameters.AddWithValue("@JobId", jobId.Value);
}

internal GetJobsProcedure()
: base("dbo.GetJobs")
if (jobIds != null)
{
new BigintListTableValuedParameterDefinition("@JobIds").AddParameter(cmd.Parameters, jobIds.Select(_ => new BigintListRow(_)));
}

if (groupId.HasValue)
{
cmd.Parameters.AddWithValue("@GroupId", groupId.Value);
}

public void PopulateCommand(SqlCommand sqlCommand, byte queueType, long? jobId, global::System.Collections.Generic.IEnumerable<BigintListRow> jobIds, long? groupId, bool? returnDefinition)
if (returnDefinition.HasValue)
{
sqlCommand.CommandType = global::System.Data.CommandType.StoredProcedure;
sqlCommand.CommandText = "dbo.GetJobs";
_queueType.AddParameter(sqlCommand.Parameters, queueType);
_jobId.AddParameter(sqlCommand.Parameters, jobId);
_jobIds.AddParameter(sqlCommand.Parameters, jobIds);
_groupId.AddParameter(sqlCommand.Parameters, groupId);
cmd.Parameters.AddWithValue("@ReturnDefinition", returnDefinition.Value);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ internal class SqlServerFhirDataStore : IFhirDataStore, IProvideCapability
private readonly SchemaInformation _schemaInformation;
private readonly IModelInfoProvider _modelInfoProvider;
private const string InitialVersion = "1";
private static IgnoreInputLastUpdated _ignoreInputLastUpdated;
private static object _flagLocker = new object();

public SqlServerFhirDataStore(
ISqlServerFhirModel model,
Expand Down Expand Up @@ -84,6 +86,14 @@ public SqlServerFhirDataStore(
_requestContextAccessor = EnsureArg.IsNotNull(requestContextAccessor, nameof(requestContextAccessor));

_memoryStreamManager = new RecyclableMemoryStreamManager();

if (_ignoreInputLastUpdated == null)
{
lock (_flagLocker)
{
_ignoreInputLastUpdated ??= new IgnoreInputLastUpdated(_sqlConnectionWrapperFactory);
}
}
}

public async Task<IDictionary<ResourceKey, UpsertOutcome>> MergeAsync(IReadOnlyList<ResourceWrapperOperation> resources, CancellationToken cancellationToken)
Expand Down Expand Up @@ -264,9 +274,20 @@ internal async Task<Dictionary<ResourceKey, UpsertOutcome>> MergeInternalAsync(I
}
}

var surrIdBase = ResourceSurrogateIdHelper.LastUpdatedToResourceSurrogateId(resource.LastModified.DateTime);
var surrId = surrIdBase + minSequenceId + index;
ReplaceVersionIdInMeta(resource);
long surrId;
if (_ignoreInputLastUpdated.IsEnabled())
{
surrId = minSurrId + index;
resource.LastModified = new DateTimeOffset(ResourceSurrogateIdHelper.ResourceSurrogateIdToLastUpdated(surrId), TimeSpan.Zero);
ReplaceVersionIdAndLastUpdatedInMeta(resource);
}
else
{
var surrIdBase = ResourceSurrogateIdHelper.LastUpdatedToResourceSurrogateId(resource.LastModified.DateTime);
surrId = surrIdBase + minSequenceId + index;
ReplaceVersionIdInMeta(resource);
}

resource.ResourceSurrogateId = surrId;
mergeWrappers.Add(new MergeResourceWrapper(resource, resourceExt.KeepHistory, hasVersionToCompare));
index++;
Expand Down Expand Up @@ -484,6 +505,26 @@ private void ReplaceVersionIdInMeta(ResourceWrapper resourceWrapper)
resourceWrapper.RawResource = new RawResource(rawResourceData, FhirResourceFormat.Json, true);
}

private void ReplaceVersionIdAndLastUpdatedInMeta(ResourceWrapper resourceWrapper)
{
var date = GetJsonValue(resourceWrapper.RawResource.Data, "lastUpdated", false);
string rawResourceData;
if (resourceWrapper.Version == InitialVersion) // version is already correct
{
rawResourceData = resourceWrapper.RawResource.Data
.Replace($"\"lastUpdated\":\"{date}\"", $"\"lastUpdated\":\"{RemoveTrailingZerosFromMillisecondsForAGivenDate(resourceWrapper.LastModified)}\"", StringComparison.Ordinal);
}
else
{
var version = GetJsonValue(resourceWrapper.RawResource.Data, "versionId", false);
rawResourceData = resourceWrapper.RawResource.Data
.Replace($"\"versionId\":\"{version}\"", $"\"versionId\":\"{resourceWrapper.Version}\"", StringComparison.Ordinal)
.Replace($"\"lastUpdated\":\"{date}\"", $"\"lastUpdated\":\"{RemoveTrailingZerosFromMillisecondsForAGivenDate(resourceWrapper.LastModified)}\"", StringComparison.Ordinal);
}

resourceWrapper.RawResource = new RawResource(rawResourceData, FhirResourceFormat.Json, true);
}

private bool ExistingRawResourceIsEqualToInput(ResourceWrapper input, ResourceWrapper existing) // call is not symmetrical, it assumes version = 1 on input.
{
var inputDate = GetJsonValue(input.RawResource.Data, "lastUpdated", false);
Expand Down Expand Up @@ -616,5 +657,59 @@ public async Task<ResourceWrapper> UpdateSearchParameterIndicesAsync(ResourceWra
{
return await Task.FromResult((int?)null);
}

private class IgnoreInputLastUpdated
{
private SqlConnectionWrapperFactory _sqlConnectionWrapperFactory;
private bool _isEnabled;
private DateTime? _lastUpdated;
private object _databaseAccessLocker = new object();

public IgnoreInputLastUpdated(SqlConnectionWrapperFactory sqlConnectionWrapperFactory)
{
_sqlConnectionWrapperFactory = sqlConnectionWrapperFactory;
}

public bool IsEnabled()
{
if (_lastUpdated.HasValue && (DateTime.UtcNow - _lastUpdated.Value).TotalSeconds < 600)
{
return _isEnabled;
}

lock (_databaseAccessLocker)
{
if (_lastUpdated.HasValue && (DateTime.UtcNow - _lastUpdated.Value).TotalSeconds < 600)
{
return _isEnabled;
}

var isEnabled = IsEnabledInDatabase();
if (isEnabled.HasValue)
{
_isEnabled = isEnabled.Value;
_lastUpdated = DateTime.UtcNow;
}
}

return _isEnabled;
}

private bool? IsEnabledInDatabase()
{
try
{
using var conn = _sqlConnectionWrapperFactory.ObtainSqlConnectionWrapperAsync(CancellationToken.None, false).Result;
using var cmd = conn.CreateRetrySqlCommand();
cmd.CommandText = "IF object_id('dbo.Parameters') IS NOT NULL SELECT Number FROM dbo.Parameters WHERE Id = 'MergeResources.IgnoreInputLastUpdated'"; // call can be made before store is initialized
var value = cmd.ExecuteScalarAsync(CancellationToken.None).Result;
return value != null && (double)value == 1;
}
catch (SqlException)
{
return null;
}
}
}
}
}

0 comments on commit 8c434af

Please sign in to comment.