Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SqlStoreClient class, SqlRetryService signatures plus #3427

Merged
merged 25 commits into from
Jul 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 11 additions & 4 deletions Microsoft.Health.Fhir.sln
Original file line number Diff line number Diff line change
Expand Up @@ -184,11 +184,13 @@ Project("{D954291E-2A0B-460D-934E-DC6B0785DB48}") = "Microsoft.Health.Fhir.Share
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Microsoft.Health.Fhir.R4.Web.UnitTests", "src\Microsoft.Health.Fhir.R4.Web.UnitTests\Microsoft.Health.Fhir.R4.Web.UnitTests.csproj", "{C834E05D-79CA-4983-8599-28AC098F755A}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Microsoft.Health.Fhir.R4B.Web.UnitTests", "src\Microsoft.Health.Fhir.R4B.Web.UnitTests\Microsoft.Health.Fhir.R4B.Web.UnitTests.csproj", "{6F000A06-6307-46FF-83FA-DD9FD2FD2AA5}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Microsoft.Health.Fhir.R4B.Web.UnitTests", "src\Microsoft.Health.Fhir.R4B.Web.UnitTests\Microsoft.Health.Fhir.R4B.Web.UnitTests.csproj", "{6F000A06-6307-46FF-83FA-DD9FD2FD2AA5}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Microsoft.Health.Fhir.R5.Web.UnitTests", "src\Microsoft.Health.Fhir.R5.Web.UnitTests\Microsoft.Health.Fhir.R5.Web.UnitTests.csproj", "{62E8CD81-91A9-4872-BC6E-9EBBED8D50FD}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Microsoft.Health.Fhir.R5.Web.UnitTests", "src\Microsoft.Health.Fhir.R5.Web.UnitTests\Microsoft.Health.Fhir.R5.Web.UnitTests.csproj", "{62E8CD81-91A9-4872-BC6E-9EBBED8D50FD}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Microsoft.Health.Fhir.Stu3.Web.UnitTests", "src\Microsoft.Health.Fhir.Stu3.Web.UnitTests\Microsoft.Health.Fhir.Stu3.Web.UnitTests.csproj", "{8F4858B3-A3CF-4130-B3B2-954CBA9FE780}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Microsoft.Health.Fhir.Stu3.Web.UnitTests", "src\Microsoft.Health.Fhir.Stu3.Web.UnitTests\Microsoft.Health.Fhir.Stu3.Web.UnitTests.csproj", "{8F4858B3-A3CF-4130-B3B2-954CBA9FE780}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "EventsReader", "tools\EventsReader\EventsReader.csproj", "{9B3DEBE5-5C1F-419F-BBE3-BA67D1C074A7}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Expand Down Expand Up @@ -440,6 +442,10 @@ Global
{8F4858B3-A3CF-4130-B3B2-954CBA9FE780}.Debug|Any CPU.Build.0 = Debug|Any CPU
{8F4858B3-A3CF-4130-B3B2-954CBA9FE780}.Release|Any CPU.ActiveCfg = Release|Any CPU
{8F4858B3-A3CF-4130-B3B2-954CBA9FE780}.Release|Any CPU.Build.0 = Release|Any CPU
{9B3DEBE5-5C1F-419F-BBE3-BA67D1C074A7}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{9B3DEBE5-5C1F-419F-BBE3-BA67D1C074A7}.Debug|Any CPU.Build.0 = Debug|Any CPU
{9B3DEBE5-5C1F-419F-BBE3-BA67D1C074A7}.Release|Any CPU.ActiveCfg = Release|Any CPU
{9B3DEBE5-5C1F-419F-BBE3-BA67D1C074A7}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down Expand Up @@ -527,10 +533,11 @@ Global
{6F000A06-6307-46FF-83FA-DD9FD2FD2AA5} = {323F60C6-936A-4C5B-AF6A-F27E93AA7B05}
{62E8CD81-91A9-4872-BC6E-9EBBED8D50FD} = {323F60C6-936A-4C5B-AF6A-F27E93AA7B05}
{8F4858B3-A3CF-4130-B3B2-954CBA9FE780} = {323F60C6-936A-4C5B-AF6A-F27E93AA7B05}
{9B3DEBE5-5C1F-419F-BBE3-BA67D1C074A7} = {B70945F4-01A6-4351-955B-C4A2943B5E3B}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {E370FB31-CF95-47D1-B1E1-863A77973FF8}
RESX_SortFileContentOnSave = True
SolutionGuid = {E370FB31-CF95-47D1-B1E1-863A77973FF8}
EndGlobalSection
GlobalSection(SharedMSBuildProjectFiles) = preSolution
test\Microsoft.Health.Fhir.Shared.Tests.E2E.Common\Microsoft.Health.Fhir.Shared.Tests.E2E.Common.projitems*{0478b687-7105-40f6-a2dc-81057890e944}*SharedItemsImports = 13
Expand Down
1 change: 1 addition & 0 deletions src/Microsoft.Health.Fhir.SqlServer/AssemblyInfo.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,5 @@
[assembly: InternalsVisibleTo("Microsoft.Health.Fhir.R4.Tests.E2E")]
[assembly: InternalsVisibleTo("Microsoft.Health.Fhir.R4B.Tests.E2E")]
[assembly: InternalsVisibleTo("Microsoft.Health.Fhir.R5.Tests.E2E")]
[assembly: InternalsVisibleTo("Microsoft.Health.Internal.Fhir.EventsReader")]
[assembly: NeutralResourcesLanguage("en-us")]
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

namespace Microsoft.Health.Fhir.SqlServer.Features
{
internal static class ExceptionExtention
internal static class ExceptionExtension
{
internal static bool IsRetriable(this Exception e)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ private async Task TryLogEvent(string process, string status, string text, Cance
{
if (_store != null)
{
await _store.TryLogEvent(process, status, text, null, cancellationToken);
await _store.StoreClient.TryLogEvent(process, status, text, null, cancellationToken);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ private void ImportResourcesInBuffer(List<ImportResource> resources, List<string
}

_logger.LogError(e, $"Error on {nameof(ImportResourcesInBufferInternal)} retries={{Retries}}", retries);
_store.TryLogEvent(nameof(ImportResourcesInBufferInternal), "Error", $"retries={retries} error={e}", null, cancellationToken).Wait();
_store.StoreClient.TryLogEvent(nameof(ImportResourcesInBufferInternal), "Error", $"retries={retries} error={e}", null, cancellationToken).Wait();

throw;
}
Expand Down Expand Up @@ -142,7 +142,7 @@ private async Task ImportResourcesInBufferInternal(List<ImportResource> resource
else
{
// dedup by last updated
var inputDedupped = goodResources.GroupBy(_ => _.ResourceWrapper.ToResourceDateKey(true)).Select(_ => _.First()).ToList();
var inputDedupped = goodResources.GroupBy(_ => _.ResourceWrapper.ToResourceDateKey(_model.GetResourceTypeId, true)).Select(_ => _.First()).ToList();

// 2 paths:
// 1 - if versions were specified on input then dups need to be checked within input and database
Expand All @@ -155,7 +155,7 @@ private async Task ImportResourcesInBufferInternal(List<ImportResource> resource
// assume that only one unassigned version is provided for a given resource as we cannot guarantee processing order across parallel file streams anyway
var inputDeduppedNoVersion = inputDedupped.Where(_ => !_.KeepVersion).GroupBy(_ => _.ResourceWrapper.ToResourceKey(true)).Select(_ => _.First()).ToList();
//// check whether record can fit
var currentDates = (await _store.GetAsync(inputDeduppedNoVersion.Select(_ => _.ResourceWrapper.ToResourceKey(true)).ToList(), cancellationToken)).ToDictionary(_ => _.ToResourceKey(true), _ => _.ToResourceDateKey());
var currentDates = (await _store.GetAsync(inputDeduppedNoVersion.Select(_ => _.ResourceWrapper.ToResourceKey(true)).ToList(), cancellationToken)).ToDictionary(_ => _.ToResourceKey(true), _ => _.ToResourceDateKey(_model.GetResourceTypeId));
var inputDeduppedNoVersionForCheck = new List<ImportResource>();
foreach (var resource in inputDeduppedNoVersion)
{
Expand All @@ -166,7 +166,7 @@ private async Task ImportResourcesInBufferInternal(List<ImportResource> resource
}
}

var versionSlots = (await _store.GetResourceVervionsAsync(inputDeduppedNoVersionForCheck.Select(_ => _.ResourceWrapper.ToResourceDateKey()).ToList(), cancellationToken)).ToDictionary(_ => new ResourceKey(_.ResourceType, _.Id, null), _ => _);
var versionSlots = (await _store.StoreClient.GetResourceVersionsAsync(inputDeduppedNoVersionForCheck.Select(_ => _.ResourceWrapper.ToResourceDateKey(_model.GetResourceTypeId)).ToList(), cancellationToken)).ToDictionary(_ => new ResourceKey(_model.GetResourceTypeName(_.ResourceTypeId), _.Id, null), _ => _);
foreach (var resource in inputDeduppedNoVersionForCheck)
{
var resourceKey = resource.ResourceWrapper.ToResourceKey(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -666,32 +666,12 @@ private static (long StartId, long EndId) ReaderToSurrogateIdRange(SqlDataReader

public override async Task<IReadOnlyList<(long StartId, long EndId)>> GetSurrogateIdRanges(string resourceType, long startId, long endId, int rangeSize, int numberOfRanges, bool up, CancellationToken cancellationToken)
{
// TODO: this code will not set capacity for the result list!

var resourceTypeId = _model.GetResourceTypeId(resourceType);
List<(long StartId, long EndId)> searchList = null;
await _sqlRetryService.ExecuteSql(
async (cancellationToken, sqlException) =>
{
using SqlConnection connection = await _sqlConnectionBuilder.GetSqlConnectionAsync(initialCatalog: null, cancellationToken: cancellationToken).ConfigureAwait(false);
using SqlCommand sqlCommand = connection.CreateCommand();
connection.RetryLogicProvider = null; // To remove this line _sqlConnectionBuilder in healthcare-shared-components must be modified.
await connection.OpenAsync(cancellationToken);

sqlCommand.CommandTimeout = GetReindexCommandTimeout();
GetResourceSurrogateIdRanges.PopulateCommand(sqlCommand, resourceTypeId, startId, endId, rangeSize, numberOfRanges, up);
LogSqlCommand(sqlCommand);

searchList = await _sqlRetryService.ExecuteSqlDataReader(
sqlCommand,
ReaderToSurrogateIdRange,
_logger,
$"{nameof(GetSurrogateIdRanges)} failed.",
cancellationToken);
return;
},
cancellationToken);
return searchList;
using var sqlCommand = new SqlCommand();
GetResourceSurrogateIdRanges.PopulateCommand(sqlCommand, resourceTypeId, startId, endId, rangeSize, numberOfRanges, up);
sqlCommand.CommandTimeout = GetReindexCommandTimeout();
LogSqlCommand(sqlCommand);
return await sqlCommand.ExecuteReaderAsync(_sqlRetryService, ReaderToSurrogateIdRange, _logger, cancellationToken);
}

private static (short ResourceTypeId, string Name) ReaderGetUsedResourceTypes(SqlDataReader sqlDataReader)
Expand All @@ -703,7 +683,7 @@ private static (short ResourceTypeId, string Name) ReaderGetUsedResourceTypes(Sq
{
using var sqlCommand = new SqlCommand("dbo.GetUsedResourceTypes") { CommandType = CommandType.StoredProcedure };
LogSqlCommand(sqlCommand);
return await _sqlRetryService.ExecuteSqlDataReader(sqlCommand, ReaderGetUsedResourceTypes, _logger, $"{nameof(GetUsedResourceTypes)} failed.", cancellationToken);
return await sqlCommand.ExecuteReaderAsync(_sqlRetryService, ReaderGetUsedResourceTypes, _logger, cancellationToken);
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,26 +5,23 @@

using System;
using EnsureThat;
using Microsoft.Health.Fhir.Core.Models;

namespace Microsoft.Health.Fhir.Core.Features.Persistence
{
public class ResourceDateKey : IEquatable<ResourceDateKey>
{
public ResourceDateKey(string resourceType, string id, long resourceSurrogateId, string versionId, bool isDeleted = false)
public ResourceDateKey(short resourceTypeId, string id, long resourceSurrogateId, string versionId, bool isDeleted = false)
{
EnsureArg.IsNotNullOrEmpty(resourceType, nameof(resourceType));
EnsureArg.IsNotNullOrEmpty(id, nameof(id));
EnsureArg.IsTrue(ModelInfoProvider.IsKnownResource(resourceType), nameof(resourceType));

ResourceType = resourceType;
ResourceTypeId = resourceTypeId;
Id = id;
ResourceSurrogateId = resourceSurrogateId;
VersionId = versionId;
IsDeleted = isDeleted;
}

public string ResourceType { get; }
public short ResourceTypeId { get; }

public string Id { get; }

Expand All @@ -46,7 +43,7 @@ public bool Equals(ResourceDateKey other)
return true;
}

return ResourceType == other.ResourceType &&
return ResourceTypeId == other.ResourceTypeId &&
Id == other.Id &&
ResourceSurrogateId == other.ResourceSurrogateId &&
VersionId == other.VersionId &&
Expand Down Expand Up @@ -75,7 +72,7 @@ public override bool Equals(object obj)

public override int GetHashCode()
{
return HashCode.Combine(ResourceType, Id, ResourceSurrogateId, VersionId, IsDeleted);
return HashCode.Combine(ResourceTypeId, Id, ResourceSurrogateId, VersionId, IsDeleted);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,16 @@
// Licensed under the MIT License (MIT). See LICENSE in the repo root for license information.
// -------------------------------------------------------------------------------------------------

using System;
using Microsoft.Health.Fhir.Core.Features.Persistence;

namespace Microsoft.Health.Fhir.SqlServer.Features.Storage
{
public static class ResourceWrapperExtention
{
public static ResourceDateKey ToResourceDateKey(this ResourceWrapper wrapper, bool ignoreVersion = false)
public static ResourceDateKey ToResourceDateKey(this ResourceWrapper wrapper, Func<string, short> getResourceTypeId, bool ignoreVersion = false)
{
return new ResourceDateKey(wrapper.ResourceTypeName, wrapper.ResourceId, ResourceSurrogateIdHelper.LastUpdatedToResourceSurrogateId(wrapper.LastModified.DateTime), ignoreVersion ? null : wrapper.Version);
return new ResourceDateKey(getResourceTypeId(wrapper.ResourceTypeName), wrapper.ResourceId, ResourceSurrogateIdHelper.LastUpdatedToResourceSurrogateId(wrapper.LastModified.DateTime), ignoreVersion ? null : wrapper.Version);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -102,11 +102,11 @@ public virtual async Task CompleteJobAsync(JobInfo jobInfo, bool requestCancella

await _sqlRetryService.ExecuteSql(
sqlCommand,
async (sqlCommand, cancellationToken) =>
async (cmd, cancel) =>
{
try
{
await sqlCommand.ExecuteNonQueryAsync(cancellationToken);
await cmd.ExecuteNonQueryAsync(cancel);
}
catch (SqlException sqlEx)
{
Expand Down Expand Up @@ -156,7 +156,8 @@ public async Task<JobInfo> DequeueAsync(byte queueType, string worker, int heart
sqlCommand.Parameters.AddWithValue("@InputJobId", jobId.Value);
}

JobInfo jobInfo = await _sqlRetryService.ExecuteSqlDataReaderFirstRow(sqlCommand, JobInfoExtensions.LoadJobInfo, _logger, null, cancellationToken);
var jobInfos = await sqlCommand.ExecuteReaderAsync(_sqlRetryService, JobInfoExtensions.LoadJobInfo, _logger, cancellationToken);
var jobInfo = jobInfos.Count == 0 ? null : jobInfos[0];
if (jobInfo != null)
{
jobInfo.QueueType = queueType;
Expand All @@ -180,7 +181,7 @@ public async Task<IReadOnlyList<JobInfo>> EnqueueAsync(byte queueType, string[]

try
{
return await _sqlRetryService.ExecuteSqlDataReader(sqlCommand, JobInfoExtensions.LoadJobInfo, _logger, null, cancellationToken);
return await sqlCommand.ExecuteReaderAsync(_sqlRetryService, JobInfoExtensions.LoadJobInfo, _logger, cancellationToken);
}
catch (SqlException sqlEx)
{
Expand All @@ -197,21 +198,22 @@ public async Task<IReadOnlyList<JobInfo>> GetJobByGroupIdAsync(byte queueType, l
{
using var sqlCommand = new SqlCommand();
PopulateGetJobsCommand(sqlCommand, queueType, null, null, groupId, returnDefinition);
return await _sqlRetryService.ExecuteSqlDataReader(sqlCommand, JobInfoExtensions.LoadJobInfo, _logger, "GetJobByGroupIdAsync failed.", cancellationToken);
return await sqlCommand.ExecuteReaderAsync(_sqlRetryService, JobInfoExtensions.LoadJobInfo, _logger, cancellationToken, "GetJobByGroupIdAsync failed.");
}

public async Task<JobInfo> GetJobByIdAsync(byte queueType, long jobId, bool returnDefinition, CancellationToken cancellationToken)
{
using var sqlCommand = new SqlCommand();
PopulateGetJobsCommand(sqlCommand, queueType, jobId, returnDefinition: returnDefinition);
return await _sqlRetryService.ExecuteSqlDataReaderFirstRow(sqlCommand, JobInfoExtensions.LoadJobInfo, _logger, "GetJobByIdAsync failed.", cancellationToken);
var jobs = await sqlCommand.ExecuteReaderAsync(_sqlRetryService, JobInfoExtensions.LoadJobInfo, _logger, cancellationToken, "GetJobByIdAsync failed.");
return jobs.Count == 0 ? null : jobs[0];
}

public async Task<IReadOnlyList<JobInfo>> GetJobsByIdsAsync(byte queueType, long[] jobIds, bool returnDefinition, CancellationToken cancellationToken)
{
using var cmd = new SqlCommand();
PopulateGetJobsCommand(cmd, queueType, jobIds: jobIds, returnDefinition: returnDefinition);
return await _sqlRetryService.ExecuteSqlDataReader(cmd, JobInfoExtensions.LoadJobInfo, _logger, "GetJobsByIdsAsync failed.", cancellationToken);
return await cmd.ExecuteReaderAsync(_sqlRetryService, JobInfoExtensions.LoadJobInfo, _logger, cancellationToken, "GetJobsByIdsAsync failed.");
}

public bool IsInitialized()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,12 @@ namespace Microsoft.Health.Fhir.SqlServer.Features.Storage
{
public interface ISqlRetryService
{
Task TryLogEvent(string process, string status, string text, DateTime? startDate, CancellationToken cancellationToken);

Task ExecuteSql(Func<CancellationToken, SqlException, Task> action, CancellationToken cancellationToken);

Task ExecuteSql<TLogger>(SqlCommand sqlCommand, Func<SqlCommand, CancellationToken, Task> action, ILogger<TLogger> logger, string logMessage, CancellationToken cancellationToken);

Task<List<TResult>> ExecuteSqlDataReader<TResult, TLogger>(SqlCommand sqlCommand, Func<SqlDataReader, TResult> readerToResult, ILogger<TLogger> logger, string logMessage, CancellationToken cancellationToken);

Task<TResult> ExecuteSqlDataReaderFirstRow<TResult, TLogger>(SqlCommand sqlCommand, Func<SqlDataReader, TResult> readerToResult, ILogger<TLogger> logger, string logMessage, CancellationToken cancellationToken)
where TResult : class;
Task<IReadOnlyList<TResult>> ExecuteReaderAsync<TResult, TLogger>(SqlCommand sqlCommand, Func<SqlDataReader, TResult> readerToResult, ILogger<TLogger> logger, string logMessage, CancellationToken cancellationToken);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// -------------------------------------------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License (MIT). See LICENSE in the repo root for license information.
// -------------------------------------------------------------------------------------------------

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Data.SqlClient;
using Microsoft.Extensions.Logging;

namespace Microsoft.Health.Fhir.SqlServer.Features.Storage
{
public static class SqlCommandExtensions
{
public static async Task ExecuteNonQueryAsync<TLogger>(this SqlCommand cmd, ISqlRetryService retryService, ILogger<TLogger> logger, CancellationToken cancellationToken, string logMessage = null)
{
await retryService.ExecuteSql(cmd, async (sql, cancel) => await sql.ExecuteNonQueryAsync(cancel), logger, logMessage, cancellationToken);
}

public static async Task<IReadOnlyList<TResult>> ExecuteReaderAsync<TResult, TLogger>(this SqlCommand cmd, ISqlRetryService retryService, Func<SqlDataReader, TResult> readerToResult, ILogger<TLogger> logger, CancellationToken cancellationToken, string logMessage = null)
{
return await retryService.ExecuteReaderAsync(cmd, readerToResult, logger, logMessage, cancellationToken);
}
}
}
Loading