diff --git a/Microsoft.Health.Fhir.sln b/Microsoft.Health.Fhir.sln index 6dd1c53acd..07efb84bc4 100644 --- a/Microsoft.Health.Fhir.sln +++ b/Microsoft.Health.Fhir.sln @@ -184,11 +184,13 @@ Project("{D954291E-2A0B-460D-934E-DC6B0785DB48}") = "Microsoft.Health.Fhir.Share EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Microsoft.Health.Fhir.R4.Web.UnitTests", "src\Microsoft.Health.Fhir.R4.Web.UnitTests\Microsoft.Health.Fhir.R4.Web.UnitTests.csproj", "{C834E05D-79CA-4983-8599-28AC098F755A}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Microsoft.Health.Fhir.R4B.Web.UnitTests", "src\Microsoft.Health.Fhir.R4B.Web.UnitTests\Microsoft.Health.Fhir.R4B.Web.UnitTests.csproj", "{6F000A06-6307-46FF-83FA-DD9FD2FD2AA5}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Microsoft.Health.Fhir.R4B.Web.UnitTests", "src\Microsoft.Health.Fhir.R4B.Web.UnitTests\Microsoft.Health.Fhir.R4B.Web.UnitTests.csproj", "{6F000A06-6307-46FF-83FA-DD9FD2FD2AA5}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Microsoft.Health.Fhir.R5.Web.UnitTests", "src\Microsoft.Health.Fhir.R5.Web.UnitTests\Microsoft.Health.Fhir.R5.Web.UnitTests.csproj", "{62E8CD81-91A9-4872-BC6E-9EBBED8D50FD}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Microsoft.Health.Fhir.R5.Web.UnitTests", "src\Microsoft.Health.Fhir.R5.Web.UnitTests\Microsoft.Health.Fhir.R5.Web.UnitTests.csproj", "{62E8CD81-91A9-4872-BC6E-9EBBED8D50FD}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Microsoft.Health.Fhir.Stu3.Web.UnitTests", "src\Microsoft.Health.Fhir.Stu3.Web.UnitTests\Microsoft.Health.Fhir.Stu3.Web.UnitTests.csproj", "{8F4858B3-A3CF-4130-B3B2-954CBA9FE780}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Microsoft.Health.Fhir.Stu3.Web.UnitTests", "src\Microsoft.Health.Fhir.Stu3.Web.UnitTests\Microsoft.Health.Fhir.Stu3.Web.UnitTests.csproj", "{8F4858B3-A3CF-4130-B3B2-954CBA9FE780}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "EventsReader", "tools\EventsReader\EventsReader.csproj", "{9B3DEBE5-5C1F-419F-BBE3-BA67D1C074A7}" EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution @@ -440,6 +442,10 @@ Global {8F4858B3-A3CF-4130-B3B2-954CBA9FE780}.Debug|Any CPU.Build.0 = Debug|Any CPU {8F4858B3-A3CF-4130-B3B2-954CBA9FE780}.Release|Any CPU.ActiveCfg = Release|Any CPU {8F4858B3-A3CF-4130-B3B2-954CBA9FE780}.Release|Any CPU.Build.0 = Release|Any CPU + {9B3DEBE5-5C1F-419F-BBE3-BA67D1C074A7}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {9B3DEBE5-5C1F-419F-BBE3-BA67D1C074A7}.Debug|Any CPU.Build.0 = Debug|Any CPU + {9B3DEBE5-5C1F-419F-BBE3-BA67D1C074A7}.Release|Any CPU.ActiveCfg = Release|Any CPU + {9B3DEBE5-5C1F-419F-BBE3-BA67D1C074A7}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -527,10 +533,11 @@ Global {6F000A06-6307-46FF-83FA-DD9FD2FD2AA5} = {323F60C6-936A-4C5B-AF6A-F27E93AA7B05} {62E8CD81-91A9-4872-BC6E-9EBBED8D50FD} = {323F60C6-936A-4C5B-AF6A-F27E93AA7B05} {8F4858B3-A3CF-4130-B3B2-954CBA9FE780} = {323F60C6-936A-4C5B-AF6A-F27E93AA7B05} + {9B3DEBE5-5C1F-419F-BBE3-BA67D1C074A7} = {B70945F4-01A6-4351-955B-C4A2943B5E3B} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution - SolutionGuid = {E370FB31-CF95-47D1-B1E1-863A77973FF8} RESX_SortFileContentOnSave = True + SolutionGuid = {E370FB31-CF95-47D1-B1E1-863A77973FF8} EndGlobalSection GlobalSection(SharedMSBuildProjectFiles) = preSolution test\Microsoft.Health.Fhir.Shared.Tests.E2E.Common\Microsoft.Health.Fhir.Shared.Tests.E2E.Common.projitems*{0478b687-7105-40f6-a2dc-81057890e944}*SharedItemsImports = 13 diff --git a/src/Microsoft.Health.Fhir.SqlServer/AssemblyInfo.cs b/src/Microsoft.Health.Fhir.SqlServer/AssemblyInfo.cs index 4bb5918d35..d34f610740 100644 --- a/src/Microsoft.Health.Fhir.SqlServer/AssemblyInfo.cs +++ b/src/Microsoft.Health.Fhir.SqlServer/AssemblyInfo.cs @@ -15,4 +15,5 @@ [assembly: InternalsVisibleTo("Microsoft.Health.Fhir.R4.Tests.E2E")] [assembly: InternalsVisibleTo("Microsoft.Health.Fhir.R4B.Tests.E2E")] [assembly: InternalsVisibleTo("Microsoft.Health.Fhir.R5.Tests.E2E")] +[assembly: InternalsVisibleTo("Microsoft.Health.Internal.Fhir.EventsReader")] [assembly: NeutralResourcesLanguage("en-us")] diff --git a/src/Microsoft.Health.Fhir.SqlServer/Features/ExceptionExtention.cs b/src/Microsoft.Health.Fhir.SqlServer/Features/ExceptionExtension.cs similarity index 99% rename from src/Microsoft.Health.Fhir.SqlServer/Features/ExceptionExtention.cs rename to src/Microsoft.Health.Fhir.SqlServer/Features/ExceptionExtension.cs index f7a1c421bd..a18bcf9969 100644 --- a/src/Microsoft.Health.Fhir.SqlServer/Features/ExceptionExtention.cs +++ b/src/Microsoft.Health.Fhir.SqlServer/Features/ExceptionExtension.cs @@ -6,7 +6,7 @@ namespace Microsoft.Health.Fhir.SqlServer.Features { - internal static class ExceptionExtention + internal static class ExceptionExtension { internal static bool IsRetriable(this Exception e) { diff --git a/src/Microsoft.Health.Fhir.SqlServer/Features/Operations/Import/SqlImportReindexer.cs b/src/Microsoft.Health.Fhir.SqlServer/Features/Operations/Import/SqlImportReindexer.cs index 4172b11ccd..ab3314dd67 100644 --- a/src/Microsoft.Health.Fhir.SqlServer/Features/Operations/Import/SqlImportReindexer.cs +++ b/src/Microsoft.Health.Fhir.SqlServer/Features/Operations/Import/SqlImportReindexer.cs @@ -209,7 +209,7 @@ private async Task TryLogEvent(string process, string status, string text, Cance { if (_store != null) { - await _store.TryLogEvent(process, status, text, null, cancellationToken); + await _store.StoreClient.TryLogEvent(process, status, text, null, cancellationToken); } } } diff --git a/src/Microsoft.Health.Fhir.SqlServer/Features/Operations/Import/SqlImporter.cs b/src/Microsoft.Health.Fhir.SqlServer/Features/Operations/Import/SqlImporter.cs index 1b8f371957..f56794306f 100644 --- a/src/Microsoft.Health.Fhir.SqlServer/Features/Operations/Import/SqlImporter.cs +++ b/src/Microsoft.Health.Fhir.SqlServer/Features/Operations/Import/SqlImporter.cs @@ -111,7 +111,7 @@ private void ImportResourcesInBuffer(List resources, List resource else { // dedup by last updated - var inputDedupped = goodResources.GroupBy(_ => _.ResourceWrapper.ToResourceDateKey(true)).Select(_ => _.First()).ToList(); + var inputDedupped = goodResources.GroupBy(_ => _.ResourceWrapper.ToResourceDateKey(_model.GetResourceTypeId, true)).Select(_ => _.First()).ToList(); // 2 paths: // 1 - if versions were specified on input then dups need to be checked within input and database @@ -155,7 +155,7 @@ private async Task ImportResourcesInBufferInternal(List resource // assume that only one unassigned version is provided for a given resource as we cannot guarantee processing order across parallel file streams anyway var inputDeduppedNoVersion = inputDedupped.Where(_ => !_.KeepVersion).GroupBy(_ => _.ResourceWrapper.ToResourceKey(true)).Select(_ => _.First()).ToList(); //// check whether record can fit - var currentDates = (await _store.GetAsync(inputDeduppedNoVersion.Select(_ => _.ResourceWrapper.ToResourceKey(true)).ToList(), cancellationToken)).ToDictionary(_ => _.ToResourceKey(true), _ => _.ToResourceDateKey()); + var currentDates = (await _store.GetAsync(inputDeduppedNoVersion.Select(_ => _.ResourceWrapper.ToResourceKey(true)).ToList(), cancellationToken)).ToDictionary(_ => _.ToResourceKey(true), _ => _.ToResourceDateKey(_model.GetResourceTypeId)); var inputDeduppedNoVersionForCheck = new List(); foreach (var resource in inputDeduppedNoVersion) { @@ -166,7 +166,7 @@ private async Task ImportResourcesInBufferInternal(List resource } } - var versionSlots = (await _store.GetResourceVervionsAsync(inputDeduppedNoVersionForCheck.Select(_ => _.ResourceWrapper.ToResourceDateKey()).ToList(), cancellationToken)).ToDictionary(_ => new ResourceKey(_.ResourceType, _.Id, null), _ => _); + var versionSlots = (await _store.StoreClient.GetResourceVersionsAsync(inputDeduppedNoVersionForCheck.Select(_ => _.ResourceWrapper.ToResourceDateKey(_model.GetResourceTypeId)).ToList(), cancellationToken)).ToDictionary(_ => new ResourceKey(_model.GetResourceTypeName(_.ResourceTypeId), _.Id, null), _ => _); foreach (var resource in inputDeduppedNoVersionForCheck) { var resourceKey = resource.ResourceWrapper.ToResourceKey(true); diff --git a/src/Microsoft.Health.Fhir.SqlServer/Features/Search/SqlServerSearchService.cs b/src/Microsoft.Health.Fhir.SqlServer/Features/Search/SqlServerSearchService.cs index d33c822f6d..8db4186f8a 100644 --- a/src/Microsoft.Health.Fhir.SqlServer/Features/Search/SqlServerSearchService.cs +++ b/src/Microsoft.Health.Fhir.SqlServer/Features/Search/SqlServerSearchService.cs @@ -666,32 +666,12 @@ private static (long StartId, long EndId) ReaderToSurrogateIdRange(SqlDataReader public override async Task> GetSurrogateIdRanges(string resourceType, long startId, long endId, int rangeSize, int numberOfRanges, bool up, CancellationToken cancellationToken) { - // TODO: this code will not set capacity for the result list! - var resourceTypeId = _model.GetResourceTypeId(resourceType); - List<(long StartId, long EndId)> searchList = null; - await _sqlRetryService.ExecuteSql( - async (cancellationToken, sqlException) => - { - using SqlConnection connection = await _sqlConnectionBuilder.GetSqlConnectionAsync(initialCatalog: null, cancellationToken: cancellationToken).ConfigureAwait(false); - using SqlCommand sqlCommand = connection.CreateCommand(); - connection.RetryLogicProvider = null; // To remove this line _sqlConnectionBuilder in healthcare-shared-components must be modified. - await connection.OpenAsync(cancellationToken); - - sqlCommand.CommandTimeout = GetReindexCommandTimeout(); - GetResourceSurrogateIdRanges.PopulateCommand(sqlCommand, resourceTypeId, startId, endId, rangeSize, numberOfRanges, up); - LogSqlCommand(sqlCommand); - - searchList = await _sqlRetryService.ExecuteSqlDataReader( - sqlCommand, - ReaderToSurrogateIdRange, - _logger, - $"{nameof(GetSurrogateIdRanges)} failed.", - cancellationToken); - return; - }, - cancellationToken); - return searchList; + using var sqlCommand = new SqlCommand(); + GetResourceSurrogateIdRanges.PopulateCommand(sqlCommand, resourceTypeId, startId, endId, rangeSize, numberOfRanges, up); + sqlCommand.CommandTimeout = GetReindexCommandTimeout(); + LogSqlCommand(sqlCommand); + return await sqlCommand.ExecuteReaderAsync(_sqlRetryService, ReaderToSurrogateIdRange, _logger, cancellationToken); } private static (short ResourceTypeId, string Name) ReaderGetUsedResourceTypes(SqlDataReader sqlDataReader) @@ -703,7 +683,7 @@ private static (short ResourceTypeId, string Name) ReaderGetUsedResourceTypes(Sq { using var sqlCommand = new SqlCommand("dbo.GetUsedResourceTypes") { CommandType = CommandType.StoredProcedure }; LogSqlCommand(sqlCommand); - return await _sqlRetryService.ExecuteSqlDataReader(sqlCommand, ReaderGetUsedResourceTypes, _logger, $"{nameof(GetUsedResourceTypes)} failed.", cancellationToken); + return await sqlCommand.ExecuteReaderAsync(_sqlRetryService, ReaderGetUsedResourceTypes, _logger, cancellationToken); } /// diff --git a/src/Microsoft.Health.Fhir.SqlServer/Features/Storage/ResourceDateKey.cs b/src/Microsoft.Health.Fhir.SqlServer/Features/Storage/ResourceDateKey.cs index b59dcdb1b1..32423107f9 100644 --- a/src/Microsoft.Health.Fhir.SqlServer/Features/Storage/ResourceDateKey.cs +++ b/src/Microsoft.Health.Fhir.SqlServer/Features/Storage/ResourceDateKey.cs @@ -5,26 +5,23 @@ using System; using EnsureThat; -using Microsoft.Health.Fhir.Core.Models; namespace Microsoft.Health.Fhir.Core.Features.Persistence { public class ResourceDateKey : IEquatable { - public ResourceDateKey(string resourceType, string id, long resourceSurrogateId, string versionId, bool isDeleted = false) + public ResourceDateKey(short resourceTypeId, string id, long resourceSurrogateId, string versionId, bool isDeleted = false) { - EnsureArg.IsNotNullOrEmpty(resourceType, nameof(resourceType)); EnsureArg.IsNotNullOrEmpty(id, nameof(id)); - EnsureArg.IsTrue(ModelInfoProvider.IsKnownResource(resourceType), nameof(resourceType)); - ResourceType = resourceType; + ResourceTypeId = resourceTypeId; Id = id; ResourceSurrogateId = resourceSurrogateId; VersionId = versionId; IsDeleted = isDeleted; } - public string ResourceType { get; } + public short ResourceTypeId { get; } public string Id { get; } @@ -46,7 +43,7 @@ public bool Equals(ResourceDateKey other) return true; } - return ResourceType == other.ResourceType && + return ResourceTypeId == other.ResourceTypeId && Id == other.Id && ResourceSurrogateId == other.ResourceSurrogateId && VersionId == other.VersionId && @@ -75,7 +72,7 @@ public override bool Equals(object obj) public override int GetHashCode() { - return HashCode.Combine(ResourceType, Id, ResourceSurrogateId, VersionId, IsDeleted); + return HashCode.Combine(ResourceTypeId, Id, ResourceSurrogateId, VersionId, IsDeleted); } } } diff --git a/src/Microsoft.Health.Fhir.SqlServer/Features/Storage/ResourceWrapperExtention.cs b/src/Microsoft.Health.Fhir.SqlServer/Features/Storage/ResourceWrapperExtention.cs index 49b31684a4..2bde02c83e 100644 --- a/src/Microsoft.Health.Fhir.SqlServer/Features/Storage/ResourceWrapperExtention.cs +++ b/src/Microsoft.Health.Fhir.SqlServer/Features/Storage/ResourceWrapperExtention.cs @@ -3,15 +3,16 @@ // Licensed under the MIT License (MIT). See LICENSE in the repo root for license information. // ------------------------------------------------------------------------------------------------- +using System; using Microsoft.Health.Fhir.Core.Features.Persistence; namespace Microsoft.Health.Fhir.SqlServer.Features.Storage { public static class ResourceWrapperExtention { - public static ResourceDateKey ToResourceDateKey(this ResourceWrapper wrapper, bool ignoreVersion = false) + public static ResourceDateKey ToResourceDateKey(this ResourceWrapper wrapper, Func getResourceTypeId, bool ignoreVersion = false) { - return new ResourceDateKey(wrapper.ResourceTypeName, wrapper.ResourceId, ResourceSurrogateIdHelper.LastUpdatedToResourceSurrogateId(wrapper.LastModified.DateTime), ignoreVersion ? null : wrapper.Version); + return new ResourceDateKey(getResourceTypeId(wrapper.ResourceTypeName), wrapper.ResourceId, ResourceSurrogateIdHelper.LastUpdatedToResourceSurrogateId(wrapper.LastModified.DateTime), ignoreVersion ? null : wrapper.Version); } } } diff --git a/src/Microsoft.Health.Fhir.SqlServer/Features/Storage/SqlQueueClient.cs b/src/Microsoft.Health.Fhir.SqlServer/Features/Storage/SqlQueueClient.cs index e3ddaa1ee8..0f4573412a 100644 --- a/src/Microsoft.Health.Fhir.SqlServer/Features/Storage/SqlQueueClient.cs +++ b/src/Microsoft.Health.Fhir.SqlServer/Features/Storage/SqlQueueClient.cs @@ -102,11 +102,11 @@ public virtual async Task CompleteJobAsync(JobInfo jobInfo, bool requestCancella await _sqlRetryService.ExecuteSql( sqlCommand, - async (sqlCommand, cancellationToken) => + async (cmd, cancel) => { try { - await sqlCommand.ExecuteNonQueryAsync(cancellationToken); + await cmd.ExecuteNonQueryAsync(cancel); } catch (SqlException sqlEx) { @@ -156,7 +156,8 @@ public async Task DequeueAsync(byte queueType, string worker, int heart sqlCommand.Parameters.AddWithValue("@InputJobId", jobId.Value); } - JobInfo jobInfo = await _sqlRetryService.ExecuteSqlDataReaderFirstRow(sqlCommand, JobInfoExtensions.LoadJobInfo, _logger, null, cancellationToken); + var jobInfos = await sqlCommand.ExecuteReaderAsync(_sqlRetryService, JobInfoExtensions.LoadJobInfo, _logger, cancellationToken); + var jobInfo = jobInfos.Count == 0 ? null : jobInfos[0]; if (jobInfo != null) { jobInfo.QueueType = queueType; @@ -180,7 +181,7 @@ public async Task> EnqueueAsync(byte queueType, string[] try { - return await _sqlRetryService.ExecuteSqlDataReader(sqlCommand, JobInfoExtensions.LoadJobInfo, _logger, null, cancellationToken); + return await sqlCommand.ExecuteReaderAsync(_sqlRetryService, JobInfoExtensions.LoadJobInfo, _logger, cancellationToken); } catch (SqlException sqlEx) { @@ -197,21 +198,22 @@ public async Task> GetJobByGroupIdAsync(byte queueType, l { using var sqlCommand = new SqlCommand(); PopulateGetJobsCommand(sqlCommand, queueType, null, null, groupId, returnDefinition); - return await _sqlRetryService.ExecuteSqlDataReader(sqlCommand, JobInfoExtensions.LoadJobInfo, _logger, "GetJobByGroupIdAsync failed.", cancellationToken); + return await sqlCommand.ExecuteReaderAsync(_sqlRetryService, JobInfoExtensions.LoadJobInfo, _logger, cancellationToken, "GetJobByGroupIdAsync failed."); } public async Task GetJobByIdAsync(byte queueType, long jobId, bool returnDefinition, CancellationToken cancellationToken) { using var sqlCommand = new SqlCommand(); PopulateGetJobsCommand(sqlCommand, queueType, jobId, returnDefinition: returnDefinition); - return await _sqlRetryService.ExecuteSqlDataReaderFirstRow(sqlCommand, JobInfoExtensions.LoadJobInfo, _logger, "GetJobByIdAsync failed.", cancellationToken); + var jobs = await sqlCommand.ExecuteReaderAsync(_sqlRetryService, JobInfoExtensions.LoadJobInfo, _logger, cancellationToken, "GetJobByIdAsync failed."); + return jobs.Count == 0 ? null : jobs[0]; } public async Task> GetJobsByIdsAsync(byte queueType, long[] jobIds, bool returnDefinition, CancellationToken cancellationToken) { using var cmd = new SqlCommand(); PopulateGetJobsCommand(cmd, queueType, jobIds: jobIds, returnDefinition: returnDefinition); - return await _sqlRetryService.ExecuteSqlDataReader(cmd, JobInfoExtensions.LoadJobInfo, _logger, "GetJobsByIdsAsync failed.", cancellationToken); + return await cmd.ExecuteReaderAsync(_sqlRetryService, JobInfoExtensions.LoadJobInfo, _logger, cancellationToken, "GetJobsByIdsAsync failed."); } public bool IsInitialized() diff --git a/src/Microsoft.Health.Fhir.SqlServer/Features/Storage/SqlRetry/ISqlRetryService.cs b/src/Microsoft.Health.Fhir.SqlServer/Features/Storage/SqlRetry/ISqlRetryService.cs index 1d2753329a..9dd9f53556 100644 --- a/src/Microsoft.Health.Fhir.SqlServer/Features/Storage/SqlRetry/ISqlRetryService.cs +++ b/src/Microsoft.Health.Fhir.SqlServer/Features/Storage/SqlRetry/ISqlRetryService.cs @@ -14,13 +14,12 @@ namespace Microsoft.Health.Fhir.SqlServer.Features.Storage { public interface ISqlRetryService { + Task TryLogEvent(string process, string status, string text, DateTime? startDate, CancellationToken cancellationToken); + Task ExecuteSql(Func action, CancellationToken cancellationToken); Task ExecuteSql(SqlCommand sqlCommand, Func action, ILogger logger, string logMessage, CancellationToken cancellationToken); - Task> ExecuteSqlDataReader(SqlCommand sqlCommand, Func readerToResult, ILogger logger, string logMessage, CancellationToken cancellationToken); - - Task ExecuteSqlDataReaderFirstRow(SqlCommand sqlCommand, Func readerToResult, ILogger logger, string logMessage, CancellationToken cancellationToken) - where TResult : class; + Task> ExecuteReaderAsync(SqlCommand sqlCommand, Func readerToResult, ILogger logger, string logMessage, CancellationToken cancellationToken); } } diff --git a/src/Microsoft.Health.Fhir.SqlServer/Features/Storage/SqlRetry/SqlCommandExtensions.cs b/src/Microsoft.Health.Fhir.SqlServer/Features/Storage/SqlRetry/SqlCommandExtensions.cs new file mode 100644 index 0000000000..bfd9d45b96 --- /dev/null +++ b/src/Microsoft.Health.Fhir.SqlServer/Features/Storage/SqlRetry/SqlCommandExtensions.cs @@ -0,0 +1,27 @@ +// ------------------------------------------------------------------------------------------------- +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License (MIT). See LICENSE in the repo root for license information. +// ------------------------------------------------------------------------------------------------- + +using System; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.Data.SqlClient; +using Microsoft.Extensions.Logging; + +namespace Microsoft.Health.Fhir.SqlServer.Features.Storage +{ + public static class SqlCommandExtensions + { + public static async Task ExecuteNonQueryAsync(this SqlCommand cmd, ISqlRetryService retryService, ILogger logger, CancellationToken cancellationToken, string logMessage = null) + { + await retryService.ExecuteSql(cmd, async (sql, cancel) => await sql.ExecuteNonQueryAsync(cancel), logger, logMessage, cancellationToken); + } + + public static async Task> ExecuteReaderAsync(this SqlCommand cmd, ISqlRetryService retryService, Func readerToResult, ILogger logger, CancellationToken cancellationToken, string logMessage = null) + { + return await retryService.ExecuteReaderAsync(cmd, readerToResult, logger, logMessage, cancellationToken); + } + } +} diff --git a/src/Microsoft.Health.Fhir.SqlServer/Features/Storage/SqlRetry/SqlRetryService.cs b/src/Microsoft.Health.Fhir.SqlServer/Features/Storage/SqlRetry/SqlRetryService.cs index 38a93b5140..3da843192e 100644 --- a/src/Microsoft.Health.Fhir.SqlServer/Features/Storage/SqlRetry/SqlRetryService.cs +++ b/src/Microsoft.Health.Fhir.SqlServer/Features/Storage/SqlRetry/SqlRetryService.cs @@ -60,13 +60,13 @@ private readonly HashSet _transientErrors SqlErrorCodes.QueryProcessorNoQueryPlan, // The query processor ran out of internal resources and could not produce a query plan. }; - private readonly ISqlConnectionBuilder _sqlConnectionBuilder; - private readonly SqlServerDataStoreConfiguration _sqlServerDataStoreConfiguration; + private ISqlConnectionBuilder _sqlConnectionBuilder; private readonly IsExceptionRetriable _defaultIsExceptionRetriable = DefaultIsExceptionRetriable; private readonly bool _defaultIsExceptionRetriableOff; private readonly IsExceptionRetriable _customIsExceptionRetriable; - private readonly int _maxRetries; - private readonly int _retryMillisecondsDelay; + private int _maxRetries; + private int _retryMillisecondsDelay; + private int _commandTimeout; /// /// Constructor that initializes this implementation of the ISqlRetryService interface. This class @@ -86,7 +86,7 @@ public SqlRetryService( EnsureArg.IsNotNull(sqlConnectionBuilder, nameof(sqlConnectionBuilder)); EnsureArg.IsNotNull(sqlRetryServiceOptions?.Value, nameof(sqlRetryServiceOptions)); EnsureArg.IsNotNull(sqlRetryServiceDelegateOptions, nameof(sqlRetryServiceDelegateOptions)); - _sqlServerDataStoreConfiguration = EnsureArg.IsNotNull(sqlServerDataStoreConfiguration?.Value, nameof(sqlServerDataStoreConfiguration)); + _commandTimeout = (int)EnsureArg.IsNotNull(sqlServerDataStoreConfiguration?.Value, nameof(sqlServerDataStoreConfiguration)).CommandTimeout.TotalSeconds; _sqlConnectionBuilder = sqlConnectionBuilder; @@ -107,6 +107,11 @@ public SqlRetryService( _customIsExceptionRetriable = sqlRetryServiceDelegateOptions.CustomIsExceptionRetriable; } + private SqlRetryService(ISqlConnectionBuilder sqlConnectionBuilder) + { + _sqlConnectionBuilder = sqlConnectionBuilder; + } + /// /// Defines a custom delegate that can be used instead of or in addition to IsExceptionRetriable method to examine if thrown /// exception represent a retriable error. @@ -116,6 +121,23 @@ public SqlRetryService( /// public delegate bool IsExceptionRetriable(Exception ex); + /// + /// Simplified class generator. + /// + /// Internal FHIR server interface used to create SqlConnection. + /// command timeout. + /// max retries. + /// retry milliseconds delay. + public static SqlRetryService GetInstance(ISqlConnectionBuilder sqlConnectionBuilder, int commandTimeout = 300, int maxRetries = 5, int retryMillisecondsDelay = 5000) + { + EnsureArg.IsNotNull(sqlConnectionBuilder, nameof(sqlConnectionBuilder)); + var service = new SqlRetryService(sqlConnectionBuilder); + service._commandTimeout = commandTimeout; + service._maxRetries = maxRetries; + service._retryMillisecondsDelay = retryMillisecondsDelay; + return service; + } + /// /// This method examines exception and determines if the exception represent an retriable error. /// In this case the code that caused the exception is executed again. @@ -243,13 +265,13 @@ public async Task ExecuteSql(SqlCommand sqlCommand, Func 0) { - await TryLogToDatabase($"Retry:{sqlCommand.CommandText}", "Warn", $"retries={retry} error={lastException}", start, cancellationToken); + await TryLogEvent($"Retry:{sqlCommand.CommandText}", "Warn", $"retries={retry} error={lastException}", start, cancellationToken); } return; @@ -265,7 +287,7 @@ public async Task ExecuteSql(SqlCommand sqlCommand, Func= _maxRetries) { logger.LogError(ex, $"Final attempt ({retry}): {logMessage}"); - await TryLogToDatabase($"Retry:{sqlCommand.CommandText}", "Error", $"retries={retry} error={lastException}", start, cancellationToken); + await TryLogEvent($"Retry:{sqlCommand.CommandText}", "Error", $"retries={retry} error={lastException}", start, cancellationToken); throw; } @@ -276,7 +298,7 @@ public async Task ExecuteSql(SqlCommand sqlCommand, Func> ExecuteSqlDataReader(SqlCommand sqlCommand, Func readerToResult, ILogger logger, string logMessage, bool allRows, CancellationToken cancellationToken) + private async Task> ExecuteSqlDataReader(SqlCommand sqlCommand, Func readerToResult, ILogger logger, string logMessage, bool allRows, CancellationToken cancellationToken) { EnsureArg.IsNotNull(sqlCommand, nameof(sqlCommand)); EnsureArg.IsNotNull(readerToResult, nameof(readerToResult)); @@ -320,34 +342,20 @@ await ExecuteSql( /// A task representing the asynchronous operation that returns all the rows that result from execution. The rows are translated by delegate /// into data type. /// When executing this method, if exception is thrown that is not retriable or if last retry fails, then same exception is thrown by this method. - public async Task> ExecuteSqlDataReader(SqlCommand sqlCommand, Func readerToResult, ILogger logger, string logMessage, CancellationToken cancellationToken) + public async Task> ExecuteReaderAsync(SqlCommand sqlCommand, Func readerToResult, ILogger logger, string logMessage, CancellationToken cancellationToken) { return await ExecuteSqlDataReader(sqlCommand, readerToResult, logger, logMessage, true, cancellationToken); } /// - /// Executes and reads the first row only. Translates the read row by using - /// into the returned data type. Retries execution of on SQL error or failed - /// SQL connection error. In the case if non-retriable exception or if the last retry failed tha same exception is thrown. + /// Tries logging an event to the EventLog table. /// - /// Defines data type for the returned SQL row. - /// Type used for the . - /// SQL command to be executed. - /// Translation delegate that translates the row returned by execution into the data type. - /// Logger used on first try error or retry error. - /// Message to be logged on error. + /// Name of the process. + /// Status. By default Warn and Error are logged automatically. Other stuses can be enabled in the Parameters table. + /// Message text. + /// Optional start date of the process. /// Cancellation token. - /// A task representing the asynchronous operation that returns the first row that results from execution. The row is translated by delegate - /// into data type. - /// When executing this method, if exception is thrown that is not retriable or if last retry fails, then same exception is thrown by this method. - public async Task ExecuteSqlDataReaderFirstRow(SqlCommand sqlCommand, Func readerToResult, ILogger logger, string logMessage, CancellationToken cancellationToken) - where TResult : class - { - List result = await ExecuteSqlDataReader(sqlCommand, readerToResult, logger, logMessage, false, cancellationToken); - return result.Count > 0 ? result[0] : null; - } - - private async Task TryLogToDatabase(string process, string status, string text, DateTime? startDate, CancellationToken cancellationToken) + public async Task TryLogEvent(string process, string status, string text, DateTime? startDate, CancellationToken cancellationToken) { try { diff --git a/src/Microsoft.Health.Fhir.SqlServer/Features/Storage/SqlServerFhirDataStore.cs b/src/Microsoft.Health.Fhir.SqlServer/Features/Storage/SqlServerFhirDataStore.cs index cee7b4eadb..d425e120a2 100644 --- a/src/Microsoft.Health.Fhir.SqlServer/Features/Storage/SqlServerFhirDataStore.cs +++ b/src/Microsoft.Health.Fhir.SqlServer/Features/Storage/SqlServerFhirDataStore.cs @@ -7,7 +7,6 @@ using System.Collections.Generic; using System.Data; using System.Globalization; -using System.IO; using System.Linq; using System.Security.Cryptography; using System.Threading; @@ -54,6 +53,7 @@ internal class SqlServerFhirDataStore : IFhirDataStore, IProvideCapability private readonly IBundleOrchestrator _bundleOrchestrator; private readonly CoreFeatureConfiguration _coreFeatures; private readonly ISqlRetryService _sqlRetryService; + private readonly SqlStoreClient _sqlStoreClient; private readonly SqlConnectionWrapperFactory _sqlConnectionWrapperFactory; private readonly ICompressedRawResourceConverter _compressedRawResourceConverter; private readonly ILogger _logger; @@ -84,6 +84,7 @@ public SqlServerFhirDataStore( _coreFeatures = EnsureArg.IsNotNull(coreFeatures?.Value, nameof(coreFeatures)); _bundleOrchestrator = EnsureArg.IsNotNull(bundleOrchestrator, nameof(bundleOrchestrator)); _sqlRetryService = EnsureArg.IsNotNull(sqlRetryService, nameof(sqlRetryService)); + _sqlStoreClient = new SqlStoreClient(_sqlRetryService, logger); _sqlConnectionWrapperFactory = EnsureArg.IsNotNull(sqlConnectionWrapperFactory, nameof(sqlConnectionWrapperFactory)); _compressedRawResourceConverter = EnsureArg.IsNotNull(compressedRawResourceConverter, nameof(compressedRawResourceConverter)); _logger = EnsureArg.IsNotNull(logger, nameof(logger)); @@ -102,6 +103,8 @@ public SqlServerFhirDataStore( } } + internal SqlStoreClient StoreClient => _sqlStoreClient; + internal static TimeSpan MergeResourcesTransactionHeartbeatPeriod => TimeSpan.FromSeconds(10); public async Task> MergeAsync(IReadOnlyList resources, CancellationToken cancellationToken) @@ -124,13 +127,13 @@ public async Task r.Wrapper.ToResourceKey(true)).Distinct().ToList(), cancellationToken)).ToDictionary(r => r.ToResourceKey(true), r => r); // assume that most likely case is that all resources should be updated - (var transactionId, var minSequenceId) = await MergeResourcesBeginTransactionAsync(resources.Count, cancellationToken); + (var transactionId, var minSequenceId) = await StoreClient.MergeResourcesBeginTransactionAsync(resources.Count, cancellationToken); var index = 0; var mergeWrappers = new List(); @@ -304,7 +307,7 @@ internal async Task 0) // do not call db with empty input { - await using (new Timer(async _ => await MergeResourcesPutTransactionHeartbeatAsync(transactionId, MergeResourcesTransactionHeartbeatPeriod, cancellationToken), null, TimeSpan.FromSeconds(RandomNumberGenerator.GetInt32(100) / 100.0 * MergeResourcesTransactionHeartbeatPeriod.TotalSeconds), MergeResourcesTransactionHeartbeatPeriod)) + await using (new Timer(async _ => await _sqlStoreClient.MergeResourcesPutTransactionHeartbeatAsync(transactionId, MergeResourcesTransactionHeartbeatPeriod, cancellationToken), null, TimeSpan.FromSeconds(RandomNumberGenerator.GetInt32(100) / 100.0 * MergeResourcesTransactionHeartbeatPeriod.TotalSeconds), MergeResourcesTransactionHeartbeatPeriod)) { var retries = 0; var timeoutRetries = 0; @@ -321,12 +324,12 @@ internal async Task await sql.ExecuteNonQueryAsync(cancellationToken), _logger, null, cancellationToken); - } - catch (Exception e) - { - _logger.LogWarning(e, $"Error from SQL database on {nameof(MergeResourcesPutTransactionHeartbeatAsync)}"); - } - } - - internal async Task TryLogEvent(string process, string status, string text, DateTime? startDate, CancellationToken cancellationToken) - { - try - { - using var conn = await _sqlConnectionWrapperFactory.ObtainSqlConnectionWrapperAsync(cancellationToken, false); - using var cmd = conn.CreateNonRetrySqlCommand(); - VLatest.LogEvent.PopulateCommand(cmd, process, status, null, null, null, null, startDate, text, null, null); - await cmd.ExecuteNonQueryAsync(cancellationToken); - } - catch - { - // do nothing; - } - } - public async Task UpsertAsync(ResourceWrapperOperation resource, CancellationToken cancellationToken) { bool isBundleOperation = _bundleOrchestrator.IsEnabled && resource.BundleOperationId != null; @@ -426,111 +400,7 @@ public async Task UpsertAsync(ResourceWrapperOperation resource, public async Task> GetAsync(IReadOnlyList keys, CancellationToken cancellationToken) { - if (keys == null || keys.Count == 0) - { - return new List(); - } - - using var cmd = new SqlCommand() { CommandText = "dbo.GetResources", CommandType = CommandType.StoredProcedure, CommandTimeout = 180 + (int)(2400.0 / 10000 * keys.Count) }; - var tvpRows = keys.Select(_ => new ResourceKeyListRow(_model.GetResourceTypeId(_.ResourceType), _.Id, _.VersionId == null ? null : int.TryParse(_.VersionId, out var version) ? version : int.MinValue)); - new ResourceKeyListTableValuedParameterDefinition("@ResourceKeys").AddParameter(cmd.Parameters, tvpRows); - var start = DateTime.UtcNow; - var timeoutRetries = 0; - while (true) - { - try - { - return await _sqlRetryService.ExecuteSqlDataReader(cmd, (reader) => { return ReadResourceWrapper(reader, false); }, _logger, null, cancellationToken); - } - catch (Exception e) - { - if (e.IsExecutionTimeout() && timeoutRetries++ < 3) - { - _logger.LogWarning(e, $"Error on {nameof(GetAsync)} timeoutRetries={{TimeoutRetries}}", timeoutRetries); - await TryLogEvent(nameof(GetAsync), "Warn", $"timeout retries={timeoutRetries}", start, cancellationToken); - await Task.Delay(5000, cancellationToken); - continue; - } - } - } - } - - private ResourceWrapper ReadResourceWrapper(SqlDataReader reader, bool readRequestMethod) - { - var resourceTypeId = reader.Read(VLatest.Resource.ResourceTypeId, 0); - var resourceId = reader.Read(VLatest.Resource.ResourceId, 1); - var resourceSurrogateId = reader.Read(VLatest.Resource.ResourceSurrogateId, 2); - var version = reader.Read(VLatest.Resource.Version, 3); - var isDeleted = reader.Read(VLatest.Resource.IsDeleted, 4); - var isHistory = reader.Read(VLatest.Resource.IsHistory, 5); - var rawResourceBytes = reader.GetSqlBytes(6).Value; - var isRawResourceMetaSet = reader.Read(VLatest.Resource.IsRawResourceMetaSet, 7); - var searchParamHash = reader.Read(VLatest.Resource.SearchParamHash, 8); - var requestMethod = readRequestMethod ? reader.Read(VLatest.Resource.RequestMethod, 9) : null; - - using var rawResourceStream = new MemoryStream(rawResourceBytes); - var rawResource = _compressedRawResourceConverter.ReadCompressedRawResource(rawResourceStream); - - return new ResourceWrapper( - resourceId, - version.ToString(CultureInfo.InvariantCulture), - _model.GetResourceTypeName(resourceTypeId), - new RawResource(rawResource, FhirResourceFormat.Json, isMetaSet: isRawResourceMetaSet), - readRequestMethod ? new ResourceRequest(requestMethod) : null, - new DateTimeOffset(ResourceSurrogateIdHelper.ResourceSurrogateIdToLastUpdated(resourceSurrogateId), TimeSpan.Zero), - isDeleted, - searchIndices: null, - compartmentIndices: null, - lastModifiedClaims: null, - searchParameterHash: searchParamHash, - resourceSurrogateId: resourceSurrogateId) - { - IsHistory = isHistory, - }; - } - - private ResourceDateKey ReadResourceDateKeyWrapper(SqlDataReader reader) - { - var resourceTypeId = reader.Read(VLatest.Resource.ResourceTypeId, 0); - var resourceId = reader.Read(VLatest.Resource.ResourceId, 1); - var resourceSurrogateId = reader.Read(VLatest.Resource.ResourceSurrogateId, 2); - var version = reader.Read(VLatest.Resource.Version, 3); - var isDeleted = reader.Read(VLatest.Resource.IsDeleted, 4); - - return new ResourceDateKey( - _model.GetResourceTypeName(resourceTypeId), - resourceId, - resourceSurrogateId, - version.ToString(CultureInfo.InvariantCulture), - isDeleted); - } - - public async Task> GetResourceVervionsAsync(IReadOnlyList keys, CancellationToken cancellationToken) - { - var resources = new List(); - if (keys == null || keys.Count == 0) - { - return resources; - } - - using var cmd = new SqlCommand() { CommandText = "dbo.GetResourceVersions", CommandType = CommandType.StoredProcedure, CommandTimeout = 180 + (int)(1200.0 / 10000 * keys.Count) }; - var tvpRows = keys.Select(_ => new ResourceDateKeyListRow(_model.GetResourceTypeId(_.ResourceType), _.Id, _.ResourceSurrogateId)); - new ResourceDateKeyListTableValuedParameterDefinition("@ResourceDateKeys").AddParameter(cmd.Parameters, tvpRows); - var table = VLatest.Resource; - resources = await _sqlRetryService.ExecuteSqlDataReader( - cmd, - (reader) => - { - var resourceTypeId = reader.Read(table.ResourceTypeId, 0); - var resourceId = reader.Read(table.ResourceId, 1); - var resourceSurrogateId = reader.Read(table.ResourceSurrogateId, 2); - var version = reader.Read(table.Version, 3); - return new ResourceDateKey(_model.GetResourceTypeName(resourceTypeId), resourceId, resourceSurrogateId, version.ToString(CultureInfo.InvariantCulture)); - }, - _logger, - null, - cancellationToken); - return resources; + return await _sqlStoreClient.GetAsync(keys, _model.GetResourceTypeId, _compressedRawResourceConverter.ReadCompressedRawResource, _model.GetResourceTypeName, cancellationToken); } public async Task GetAsync(ResourceKey key, CancellationToken cancellationToken) @@ -541,10 +411,7 @@ public async Task GetAsync(ResourceKey key, CancellationToken c public async Task HardDeleteAsync(ResourceKey key, bool keepCurrentVersion, CancellationToken cancellationToken) { - using var sqlConnectionWrapper = await _sqlConnectionWrapperFactory.ObtainSqlConnectionWrapperAsync(cancellationToken, true); - using var sqlCommandWrapper = sqlConnectionWrapper.CreateRetrySqlCommand(); - VLatest.HardDeleteResource.PopulateCommand(sqlCommandWrapper, _model.GetResourceTypeId(key.ResourceType), key.Id, keepCurrentVersion, _coreFeatures.SupportsResourceChangeCapture); - await sqlCommandWrapper.ExecuteNonQueryAsync(cancellationToken); + await _sqlStoreClient.HardDeleteAsync(_model.GetResourceTypeId(key.ResourceType), key.Id, keepCurrentVersion, _coreFeatures.SupportsResourceChangeCapture, cancellationToken); } public async Task BulkUpdateSearchParameterIndicesAsync(IReadOnlyCollection resources, CancellationToken cancellationToken) @@ -697,115 +564,9 @@ public void Build(ICapabilityStatementBuilder builder) } } - internal async Task MergeResourcesGetTransactionVisibilityAsync(CancellationToken cancellationToken) - { - using var cmd = new SqlCommand() { CommandText = "dbo.MergeResourcesGetTransactionVisibility", CommandType = CommandType.StoredProcedure }; - var transactionIdParam = new SqlParameter("@TransactionId", SqlDbType.BigInt) { Direction = ParameterDirection.Output }; - cmd.Parameters.Add(transactionIdParam); - await _sqlRetryService.ExecuteSql(cmd, async (sql, cancellationToken) => await sql.ExecuteNonQueryAsync(cancellationToken), _logger, null, cancellationToken); - return (long)transactionIdParam.Value; - } - - internal async Task<(long TransactionId, int Sequence)> MergeResourcesBeginTransactionAsync(int resourceVersionCount, CancellationToken cancellationToken, DateTime? heartbeatDate = null) - { - using var cmd = new SqlCommand() { CommandText = "dbo.MergeResourcesBeginTransaction", CommandType = CommandType.StoredProcedure }; - cmd.Parameters.AddWithValue("@Count", resourceVersionCount); - var transactionIdParam = new SqlParameter("@TransactionId", SqlDbType.BigInt) { Direction = ParameterDirection.Output }; - cmd.Parameters.Add(transactionIdParam); - var sequenceParam = new SqlParameter("@SequenceRangeFirstValue", SqlDbType.Int) { Direction = ParameterDirection.Output }; - cmd.Parameters.Add(sequenceParam); - if (heartbeatDate.HasValue) - { - cmd.Parameters.AddWithValue("@HeartbeatDate", heartbeatDate.Value); - } - - await _sqlRetryService.ExecuteSql(cmd, async (sql, cancellationToken) => await sql.ExecuteNonQueryAsync(cancellationToken), _logger, null, cancellationToken); - return ((long)transactionIdParam.Value, (int)sequenceParam.Value); - } - - internal async Task MergeResourcesDeleteInvisibleHistory(long transactionId, CancellationToken cancellationToken) - { - using var cmd = new SqlCommand() { CommandText = "dbo.MergeResourcesDeleteInvisibleHistory", CommandType = CommandType.StoredProcedure }; - cmd.Parameters.AddWithValue("@TransactionId", transactionId); - var affectedRowsParam = new SqlParameter("@affectedRows", SqlDbType.Int) { Direction = ParameterDirection.Output }; - cmd.Parameters.Add(affectedRowsParam); - await _sqlRetryService.ExecuteSql(cmd, async (sql, cancellationToken) => await sql.ExecuteNonQueryAsync(cancellationToken), _logger, null, cancellationToken); - return (int)affectedRowsParam.Value; - } - - internal async Task MergeResourcesCommitTransactionAsync(long transactionId, string failureReason, CancellationToken cancellationToken) - { - using var cmd = new SqlCommand() { CommandText = "dbo.MergeResourcesCommitTransaction", CommandType = CommandType.StoredProcedure }; - cmd.Parameters.AddWithValue("@TransactionId", transactionId); - if (failureReason != null) - { - cmd.Parameters.AddWithValue("@FailureReason", failureReason); - } - - await _sqlRetryService.ExecuteSql(cmd, async (sql, cancellationToken) => await sql.ExecuteNonQueryAsync(cancellationToken), _logger, null, cancellationToken); - } - - internal async Task MergeResourcesPutTransactionInvisibleHistoryAsync(long transactionId, CancellationToken cancellationToken) - { - using var cmd = new SqlCommand() { CommandText = "dbo.MergeResourcesPutTransactionInvisibleHistory", CommandType = CommandType.StoredProcedure }; - cmd.Parameters.AddWithValue("@TransactionId", transactionId); - await _sqlRetryService.ExecuteSql(cmd, async (sql, cancellationToken) => await sql.ExecuteNonQueryAsync(cancellationToken), _logger, null, cancellationToken); - } - - internal async Task MergeResourcesAdvanceTransactionVisibilityAsync(CancellationToken cancellationToken) - { - using var cmd = new SqlCommand() { CommandText = "dbo.MergeResourcesAdvanceTransactionVisibility", CommandType = CommandType.StoredProcedure }; - var affectedRowsParam = new SqlParameter("@AffectedRows", SqlDbType.Int) { Direction = ParameterDirection.Output }; - cmd.Parameters.Add(affectedRowsParam); - await _sqlRetryService.ExecuteSql(cmd, async (sql, cancellationToken) => await sql.ExecuteNonQueryAsync(cancellationToken), _logger, null, cancellationToken); - var affectedRows = (int)affectedRowsParam.Value; - return affectedRows; - } - - internal async Task> MergeResourcesGetTimeoutTransactionsAsync(int timeoutSec, CancellationToken cancellationToken) - { - using var cmd = new SqlCommand() { CommandText = "dbo.MergeResourcesGetTimeoutTransactions", CommandType = CommandType.StoredProcedure }; - cmd.Parameters.AddWithValue("@TimeoutSec", timeoutSec); - return await _sqlRetryService.ExecuteSqlDataReader(cmd, (reader) => { return reader.GetInt64(0); }, _logger, null, cancellationToken); - } - - internal async Task> GetTransactionsAsync(long startNotInclusiveTranId, long endInclusiveTranId, CancellationToken cancellationToken, DateTime? endDate = null) - { - using var cmd = new SqlCommand() { CommandText = "dbo.GetTransactions", CommandType = CommandType.StoredProcedure }; - cmd.Parameters.AddWithValue("@StartNotInclusiveTranId", startNotInclusiveTranId); - cmd.Parameters.AddWithValue("@EndInclusiveTranId", endInclusiveTranId); - if (endDate.HasValue) - { - cmd.Parameters.AddWithValue("@EndDate", endDate.Value); - } - - return await _sqlRetryService.ExecuteSqlDataReader( - cmd, - (reader) => - { - return (reader.Read(VLatest.Transactions.SurrogateIdRangeFirstValue, 0), - reader.Read(VLatest.Transactions.VisibleDate, 1), - reader.Read(VLatest.Transactions.InvisibleHistoryRemovedDate, 2)); - }, - _logger, - null, - cancellationToken); - } - internal async Task> GetResourcesByTransactionIdAsync(long transactionId, CancellationToken cancellationToken) { - using var cmd = new SqlCommand() { CommandText = "dbo.GetResourcesByTransactionId", CommandType = CommandType.StoredProcedure, CommandTimeout = 600 }; - cmd.Parameters.AddWithValue("@TransactionId", transactionId); - return await _sqlRetryService.ExecuteSqlDataReader(cmd, (reader) => { return ReadResourceWrapper(reader, true); }, _logger, null, cancellationToken); - } - - internal async Task> GetResourceDateKeysByTransactionIdAsync(long transactionId, CancellationToken cancellationToken) - { - using var cmd = new SqlCommand() { CommandText = "dbo.GetResourcesByTransactionId", CommandType = CommandType.StoredProcedure, CommandTimeout = 600 }; - cmd.Parameters.AddWithValue("@TransactionId", transactionId); - cmd.Parameters.AddWithValue("@IncludeHistory", true); - cmd.Parameters.AddWithValue("@ReturnResourceKeysOnly", true); - return await _sqlRetryService.ExecuteSqlDataReader(cmd, ReadResourceDateKeyWrapper, _logger, null, cancellationToken); + return await _sqlStoreClient.GetResourcesByTransactionIdAsync(transactionId, _compressedRawResourceConverter.ReadCompressedRawResource, _model.GetResourceTypeName, cancellationToken); } public async Task UpdateSearchParameterIndicesAsync(ResourceWrapper resource, WeakETag weakETag, CancellationToken cancellationToken) diff --git a/src/Microsoft.Health.Fhir.SqlServer/Features/Storage/SqlStoreClient.cs b/src/Microsoft.Health.Fhir.SqlServer/Features/Storage/SqlStoreClient.cs new file mode 100644 index 0000000000..aa40394f82 --- /dev/null +++ b/src/Microsoft.Health.Fhir.SqlServer/Features/Storage/SqlStoreClient.cs @@ -0,0 +1,281 @@ +// ------------------------------------------------------------------------------------------------- +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License (MIT). See LICENSE in the repo root for license information. +// ------------------------------------------------------------------------------------------------- + +using System; +using System.Collections.Generic; +using System.Data; +using System.Globalization; +using System.IO; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using EnsureThat; +using Microsoft.Data.SqlClient; +using Microsoft.Extensions.Logging; +using Microsoft.Health.Fhir.Core.Features.Persistence; +using Microsoft.Health.Fhir.Core.Models; +using Microsoft.Health.Fhir.SqlServer.Features.Schema.Model; +using Microsoft.Health.SqlServer.Features.Storage; +using Task = System.Threading.Tasks.Task; + +namespace Microsoft.Health.Fhir.SqlServer.Features.Storage +{ + /// + /// Lightweight SQL store client. + /// + /// class used in logger + internal class SqlStoreClient + { + private readonly ISqlRetryService _sqlRetryService; + private readonly ILogger _logger; + + public SqlStoreClient(ISqlRetryService sqlRetryService, ILogger logger) + { + _sqlRetryService = EnsureArg.IsNotNull(sqlRetryService, nameof(sqlRetryService)); + _logger = EnsureArg.IsNotNull(logger, nameof(logger)); + } + + public async Task HardDeleteAsync(short resourceTypeId, string resourceId, bool keepCurrentVersion, bool isResourceChangeCaptureEnabled, CancellationToken cancellationToken) + { + using var cmd = new SqlCommand() { CommandText = "dbo.HardDeleteResource", CommandType = CommandType.StoredProcedure }; + cmd.Parameters.AddWithValue("@ResourceTypeId", resourceTypeId); + cmd.Parameters.AddWithValue("@ResourceId", resourceId); + cmd.Parameters.AddWithValue("@KeepCurrentVersion", keepCurrentVersion); + cmd.Parameters.AddWithValue("@IsResourceChangeCaptureEnabled", isResourceChangeCaptureEnabled); + await cmd.ExecuteNonQueryAsync(_sqlRetryService, _logger, cancellationToken); + } + + internal async Task TryLogEvent(string process, string status, string text, DateTime? startDate, CancellationToken cancellationToken) + { + await _sqlRetryService.TryLogEvent(process, status, text, startDate, cancellationToken); + } + + public async Task> GetAsync(IReadOnlyList keys, Func getResourceTypeId, Func decompress, Func getResourceTypeName, CancellationToken cancellationToken) + { + if (keys == null || keys.Count == 0) + { + return new List(); + } + + using var cmd = new SqlCommand() { CommandText = "dbo.GetResources", CommandType = CommandType.StoredProcedure, CommandTimeout = 180 + (int)(2400.0 / 10000 * keys.Count) }; + var tvpRows = keys.Select(_ => new ResourceKeyListRow(getResourceTypeId(_.ResourceType), _.Id, _.VersionId == null ? null : int.TryParse(_.VersionId, out var version) ? version : int.MinValue)); + new ResourceKeyListTableValuedParameterDefinition("@ResourceKeys").AddParameter(cmd.Parameters, tvpRows); + var start = DateTime.UtcNow; + var timeoutRetries = 0; + while (true) + { + try + { + return await cmd.ExecuteReaderAsync(_sqlRetryService, (reader) => { return ReadResourceWrapper(reader, false, decompress, getResourceTypeName); }, _logger, cancellationToken); + } + catch (Exception e) + { + if (e.IsExecutionTimeout() && timeoutRetries++ < 3) + { + _logger.LogWarning(e, $"Error on {nameof(GetAsync)} timeoutRetries={{TimeoutRetries}}", timeoutRetries); + await TryLogEvent(nameof(GetAsync), "Warn", $"timeout retries={timeoutRetries}", start, cancellationToken); + await Task.Delay(5000, cancellationToken); + continue; + } + } + } + } + + public async Task> GetResourceVersionsAsync(IReadOnlyList keys, CancellationToken cancellationToken) + { + if (keys == null || keys.Count == 0) + { + return new List(); + } + + using var cmd = new SqlCommand() { CommandText = "dbo.GetResourceVersions", CommandType = CommandType.StoredProcedure, CommandTimeout = 180 + (int)(1200.0 / 10000 * keys.Count) }; + var tvpRows = keys.Select(_ => new ResourceDateKeyListRow(_.ResourceTypeId, _.Id, _.ResourceSurrogateId)); + new ResourceDateKeyListTableValuedParameterDefinition("@ResourceDateKeys").AddParameter(cmd.Parameters, tvpRows); + var table = VLatest.Resource; + var resources = await cmd.ExecuteReaderAsync( + _sqlRetryService, + (reader) => + { + var resourceTypeId = reader.Read(table.ResourceTypeId, 0); + var resourceId = reader.Read(table.ResourceId, 1); + var resourceSurrogateId = reader.Read(table.ResourceSurrogateId, 2); + var version = reader.Read(table.Version, 3); + return new ResourceDateKey(resourceTypeId, resourceId, resourceSurrogateId, version.ToString(CultureInfo.InvariantCulture)); + }, + _logger, + cancellationToken); + return resources; + } + + internal async Task> GetResourcesByTransactionIdAsync(long transactionId, Func decompress, Func getResourceTypeName, CancellationToken cancellationToken) + { + using var cmd = new SqlCommand() { CommandText = "dbo.GetResourcesByTransactionId", CommandType = CommandType.StoredProcedure, CommandTimeout = 600 }; + cmd.Parameters.AddWithValue("@TransactionId", transactionId); + return await cmd.ExecuteReaderAsync(_sqlRetryService, (reader) => { return ReadResourceWrapper(reader, true, decompress, getResourceTypeName); }, _logger, cancellationToken); + } + + private static ResourceWrapper ReadResourceWrapper(SqlDataReader reader, bool readRequestMethod, Func decompress, Func getResourceTypeName) + { + var resourceTypeId = reader.Read(VLatest.Resource.ResourceTypeId, 0); + var resourceId = reader.Read(VLatest.Resource.ResourceId, 1); + var resourceSurrogateId = reader.Read(VLatest.Resource.ResourceSurrogateId, 2); + var version = reader.Read(VLatest.Resource.Version, 3); + var isDeleted = reader.Read(VLatest.Resource.IsDeleted, 4); + var isHistory = reader.Read(VLatest.Resource.IsHistory, 5); + var rawResourceBytes = reader.GetSqlBytes(6).Value; + var isRawResourceMetaSet = reader.Read(VLatest.Resource.IsRawResourceMetaSet, 7); + var searchParamHash = reader.Read(VLatest.Resource.SearchParamHash, 8); + var requestMethod = readRequestMethod ? reader.Read(VLatest.Resource.RequestMethod, 9) : null; + + using var rawResourceStream = new MemoryStream(rawResourceBytes); + var rawResource = decompress(rawResourceStream); + + return new ResourceWrapper( + resourceId, + version.ToString(CultureInfo.InvariantCulture), + getResourceTypeName(resourceTypeId), + new RawResource(rawResource, FhirResourceFormat.Json, isMetaSet: isRawResourceMetaSet), + readRequestMethod ? new ResourceRequest(requestMethod) : null, + new DateTimeOffset(ResourceSurrogateIdHelper.ResourceSurrogateIdToLastUpdated(resourceSurrogateId), TimeSpan.Zero), + isDeleted, + searchIndices: null, + compartmentIndices: null, + lastModifiedClaims: null, + searchParameterHash: searchParamHash, + resourceSurrogateId: resourceSurrogateId) + { + IsHistory = isHistory, + }; + } + + internal async Task MergeResourcesPutTransactionHeartbeatAsync(long transactionId, TimeSpan heartbeatPeriod, CancellationToken cancellationToken) + { + try + { + using var cmd = new SqlCommand() { CommandText = "dbo.MergeResourcesPutTransactionHeartbeat", CommandType = CommandType.StoredProcedure, CommandTimeout = (heartbeatPeriod.Seconds / 3) + 1 }; // +1 to avoid = SQL default timeout value + cmd.Parameters.AddWithValue("@TransactionId", transactionId); + await cmd.ExecuteNonQueryAsync(_sqlRetryService, _logger, cancellationToken); + } + catch (Exception e) + { + _logger.LogWarning(e, $"Error from SQL database on {nameof(MergeResourcesPutTransactionHeartbeatAsync)}"); + } + } + + private ResourceDateKey ReadResourceDateKeyWrapper(SqlDataReader reader) + { + var resourceTypeId = reader.Read(VLatest.Resource.ResourceTypeId, 0); + var resourceId = reader.Read(VLatest.Resource.ResourceId, 1); + var resourceSurrogateId = reader.Read(VLatest.Resource.ResourceSurrogateId, 2); + var version = reader.Read(VLatest.Resource.Version, 3); + var isDeleted = reader.Read(VLatest.Resource.IsDeleted, 4); + + return new ResourceDateKey(resourceTypeId, resourceId, resourceSurrogateId, version.ToString(CultureInfo.InvariantCulture), isDeleted); + } + + internal async Task MergeResourcesGetTransactionVisibilityAsync(CancellationToken cancellationToken) + { + using var cmd = new SqlCommand() { CommandText = "dbo.MergeResourcesGetTransactionVisibility", CommandType = CommandType.StoredProcedure }; + var transactionIdParam = new SqlParameter("@TransactionId", SqlDbType.BigInt) { Direction = ParameterDirection.Output }; + cmd.Parameters.Add(transactionIdParam); + await cmd.ExecuteNonQueryAsync(_sqlRetryService, _logger, cancellationToken); + return (long)transactionIdParam.Value; + } + + internal async Task<(long TransactionId, int Sequence)> MergeResourcesBeginTransactionAsync(int resourceVersionCount, CancellationToken cancellationToken, DateTime? heartbeatDate = null) + { + using var cmd = new SqlCommand() { CommandText = "dbo.MergeResourcesBeginTransaction", CommandType = CommandType.StoredProcedure }; + cmd.Parameters.AddWithValue("@Count", resourceVersionCount); + var transactionIdParam = new SqlParameter("@TransactionId", SqlDbType.BigInt) { Direction = ParameterDirection.Output }; + cmd.Parameters.Add(transactionIdParam); + var sequenceParam = new SqlParameter("@SequenceRangeFirstValue", SqlDbType.Int) { Direction = ParameterDirection.Output }; + cmd.Parameters.Add(sequenceParam); + if (heartbeatDate.HasValue) + { + cmd.Parameters.AddWithValue("@HeartbeatDate", heartbeatDate.Value); + } + + await cmd.ExecuteNonQueryAsync(_sqlRetryService, _logger, cancellationToken); + return ((long)transactionIdParam.Value, (int)sequenceParam.Value); + } + + internal async Task MergeResourcesDeleteInvisibleHistory(long transactionId, CancellationToken cancellationToken) + { + using var cmd = new SqlCommand() { CommandText = "dbo.MergeResourcesDeleteInvisibleHistory", CommandType = CommandType.StoredProcedure }; + cmd.Parameters.AddWithValue("@TransactionId", transactionId); + var affectedRowsParam = new SqlParameter("@affectedRows", SqlDbType.Int) { Direction = ParameterDirection.Output }; + cmd.Parameters.Add(affectedRowsParam); + await cmd.ExecuteNonQueryAsync(_sqlRetryService, _logger, cancellationToken); + return (int)affectedRowsParam.Value; + } + + internal async Task MergeResourcesCommitTransactionAsync(long transactionId, string failureReason, CancellationToken cancellationToken) + { + using var cmd = new SqlCommand() { CommandText = "dbo.MergeResourcesCommitTransaction", CommandType = CommandType.StoredProcedure }; + cmd.Parameters.AddWithValue("@TransactionId", transactionId); + if (failureReason != null) + { + cmd.Parameters.AddWithValue("@FailureReason", failureReason); + } + + await cmd.ExecuteNonQueryAsync(_sqlRetryService, _logger, cancellationToken); + } + + internal async Task MergeResourcesPutTransactionInvisibleHistoryAsync(long transactionId, CancellationToken cancellationToken) + { + using var cmd = new SqlCommand() { CommandText = "dbo.MergeResourcesPutTransactionInvisibleHistory", CommandType = CommandType.StoredProcedure }; + cmd.Parameters.AddWithValue("@TransactionId", transactionId); + await cmd.ExecuteNonQueryAsync(_sqlRetryService, _logger, cancellationToken); + } + + internal async Task MergeResourcesAdvanceTransactionVisibilityAsync(CancellationToken cancellationToken) + { + using var cmd = new SqlCommand() { CommandText = "dbo.MergeResourcesAdvanceTransactionVisibility", CommandType = CommandType.StoredProcedure }; + var affectedRowsParam = new SqlParameter("@AffectedRows", SqlDbType.Int) { Direction = ParameterDirection.Output }; + cmd.Parameters.Add(affectedRowsParam); + await cmd.ExecuteNonQueryAsync(_sqlRetryService, _logger, cancellationToken); + var affectedRows = (int)affectedRowsParam.Value; + return affectedRows; + } + + internal async Task> MergeResourcesGetTimeoutTransactionsAsync(int timeoutSec, CancellationToken cancellationToken) + { + using var cmd = new SqlCommand() { CommandText = "dbo.MergeResourcesGetTimeoutTransactions", CommandType = CommandType.StoredProcedure }; + cmd.Parameters.AddWithValue("@TimeoutSec", timeoutSec); + return await cmd.ExecuteReaderAsync(_sqlRetryService, (reader) => { return reader.GetInt64(0); }, _logger, cancellationToken); + } + + internal async Task> GetTransactionsAsync(long startNotInclusiveTranId, long endInclusiveTranId, CancellationToken cancellationToken, DateTime? endDate = null) + { + using var cmd = new SqlCommand() { CommandText = "dbo.GetTransactions", CommandType = CommandType.StoredProcedure }; + cmd.Parameters.AddWithValue("@StartNotInclusiveTranId", startNotInclusiveTranId); + cmd.Parameters.AddWithValue("@EndInclusiveTranId", endInclusiveTranId); + if (endDate.HasValue) + { + cmd.Parameters.AddWithValue("@EndDate", endDate.Value); + } + + return await cmd.ExecuteReaderAsync( + _sqlRetryService, + (reader) => + { + return (reader.Read(VLatest.Transactions.SurrogateIdRangeFirstValue, 0), + reader.Read(VLatest.Transactions.VisibleDate, 1), + reader.Read(VLatest.Transactions.InvisibleHistoryRemovedDate, 2)); + }, + _logger, + cancellationToken); + } + + internal async Task> GetResourceDateKeysByTransactionIdAsync(long transactionId, CancellationToken cancellationToken) + { + using var cmd = new SqlCommand() { CommandText = "dbo.GetResourcesByTransactionId", CommandType = CommandType.StoredProcedure, CommandTimeout = 600 }; + cmd.Parameters.AddWithValue("@TransactionId", transactionId); + cmd.Parameters.AddWithValue("@IncludeHistory", true); + cmd.Parameters.AddWithValue("@ReturnResourceKeysOnly", true); + return await cmd.ExecuteReaderAsync(_sqlRetryService, ReadResourceDateKeyWrapper, _logger, cancellationToken); + } + } +} diff --git a/src/Microsoft.Health.Fhir.SqlServer/Features/Watchdogs/InvisibleHistoryCleanupWatchdog.cs b/src/Microsoft.Health.Fhir.SqlServer/Features/Watchdogs/InvisibleHistoryCleanupWatchdog.cs index 3fe70f65b0..4a2a009072 100644 --- a/src/Microsoft.Health.Fhir.SqlServer/Features/Watchdogs/InvisibleHistoryCleanupWatchdog.cs +++ b/src/Microsoft.Health.Fhir.SqlServer/Features/Watchdogs/InvisibleHistoryCleanupWatchdog.cs @@ -48,10 +48,10 @@ protected override async Task ExecuteAsync() { _logger.LogInformation($"{Name}: starting..."); var lastTranId = await GetLastCleanedUpTransactionId(); - var visibility = await _store.MergeResourcesGetTransactionVisibilityAsync(_cancellationToken); + var visibility = await _store.StoreClient.MergeResourcesGetTransactionVisibilityAsync(_cancellationToken); _logger.LogInformation($"{Name}: last cleaned up transaction={lastTranId} visibility={visibility}."); - var transToClean = await _store.GetTransactionsAsync(lastTranId, visibility, _cancellationToken, DateTime.UtcNow.AddDays((-1) * _retentionPeriodDays)); + var transToClean = await _store.StoreClient.GetTransactionsAsync(lastTranId, visibility, _cancellationToken, DateTime.UtcNow.AddDays((-1) * _retentionPeriodDays)); _logger.LogInformation($"{Name}: found transactions={transToClean.Count}."); if (transToClean.Count == 0) @@ -63,11 +63,11 @@ protected override async Task ExecuteAsync() var totalRows = 0; foreach (var tran in transToClean.Where(_ => !_.InvisibleHistoryRemovedDate.HasValue).OrderBy(_ => _.TransactionId)) { - var rows = await _store.MergeResourcesDeleteInvisibleHistory(tran.TransactionId, _cancellationToken); + var rows = await _store.StoreClient.MergeResourcesDeleteInvisibleHistory(tran.TransactionId, _cancellationToken); _logger.LogInformation($"{Name}: transaction={tran.TransactionId} removed rows={rows}."); totalRows += rows; - await _store.MergeResourcesPutTransactionInvisibleHistoryAsync(tran.TransactionId, _cancellationToken); + await _store.StoreClient.MergeResourcesPutTransactionInvisibleHistoryAsync(tran.TransactionId, _cancellationToken); } await UpdateLastCleanedUpTransactionId(transToClean.Max(_ => _.TransactionId)); diff --git a/src/Microsoft.Health.Fhir.SqlServer/Features/Watchdogs/TransactionWatchdog.cs b/src/Microsoft.Health.Fhir.SqlServer/Features/Watchdogs/TransactionWatchdog.cs index 812002458e..b6851d32ac 100644 --- a/src/Microsoft.Health.Fhir.SqlServer/Features/Watchdogs/TransactionWatchdog.cs +++ b/src/Microsoft.Health.Fhir.SqlServer/Features/Watchdogs/TransactionWatchdog.cs @@ -43,7 +43,7 @@ internal async Task StartAsync(CancellationToken cancellationToken) protected override async Task ExecuteAsync() { _logger.LogInformation("TransactionWatchdog starting..."); - var affectedRows = await _store.MergeResourcesAdvanceTransactionVisibilityAsync(_cancellationToken); + var affectedRows = await _store.StoreClient.MergeResourcesAdvanceTransactionVisibilityAsync(_cancellationToken); _logger.LogInformation("TransactionWatchdog advanced visibility on {Transactions} transactions.", affectedRows); if (affectedRows > 0) @@ -51,11 +51,11 @@ protected override async Task ExecuteAsync() return; } - var timeoutTransactions = await _store.MergeResourcesGetTimeoutTransactionsAsync((int)SqlServerFhirDataStore.MergeResourcesTransactionHeartbeatPeriod.TotalSeconds * 6, _cancellationToken); + var timeoutTransactions = await _store.StoreClient.MergeResourcesGetTimeoutTransactionsAsync((int)SqlServerFhirDataStore.MergeResourcesTransactionHeartbeatPeriod.TotalSeconds * 6, _cancellationToken); _logger.LogWarning("TransactionWatchdog found {Transactions} timed out transactions", timeoutTransactions.Count); if (timeoutTransactions.Count > 0) { - await _store.TryLogEvent("TransactionWatchdog", "Warn", $"found timed out transactions={timeoutTransactions.Count}", null, _cancellationToken); + await _store.StoreClient.TryLogEvent("TransactionWatchdog", "Warn", $"found timed out transactions={timeoutTransactions.Count}", null, _cancellationToken); } foreach (var tranId in timeoutTransactions) @@ -65,9 +65,9 @@ protected override async Task ExecuteAsync() var resources = await _store.GetResourcesByTransactionIdAsync(tranId, _cancellationToken); if (resources.Count == 0) { - await _store.MergeResourcesCommitTransactionAsync(tranId, "WD: 0 resources", _cancellationToken); + await _store.StoreClient.MergeResourcesCommitTransactionAsync(tranId, "WD: 0 resources", _cancellationToken); _logger.LogWarning("TransactionWatchdog committed transaction={Transaction}, resources=0", tranId); - await _store.TryLogEvent("TransactionWatchdog", "Warn", $"committed transaction={tranId}, resources=0", st, _cancellationToken); + await _store.StoreClient.TryLogEvent("TransactionWatchdog", "Warn", $"committed transaction={tranId}, resources=0", st, _cancellationToken); continue; } @@ -77,11 +77,11 @@ protected override async Task ExecuteAsync() } await _store.MergeResourcesWrapperAsync(tranId, false, resources.Select(_ => new MergeResourceWrapper(_, true, true)).ToList(), false, 0, _cancellationToken); - await _store.MergeResourcesCommitTransactionAsync(tranId, null, _cancellationToken); + await _store.StoreClient.MergeResourcesCommitTransactionAsync(tranId, null, _cancellationToken); _logger.LogWarning("TransactionWatchdog committed transaction={Transaction}, resources={Resources}", tranId, resources.Count); - await _store.TryLogEvent("TransactionWatchdog", "Warn", $"committed transaction={tranId}, resources={resources.Count}", st, _cancellationToken); + await _store.StoreClient.TryLogEvent("TransactionWatchdog", "Warn", $"committed transaction={tranId}, resources={resources.Count}", st, _cancellationToken); - affectedRows = await _store.MergeResourcesAdvanceTransactionVisibilityAsync(_cancellationToken); + affectedRows = await _store.StoreClient.MergeResourcesAdvanceTransactionVisibilityAsync(_cancellationToken); _logger.LogInformation("TransactionWatchdog advanced visibility on {Transactions} transactions.", affectedRows); } } diff --git a/test/Microsoft.Health.Fhir.Shared.Tests.Integration/Features/ChangeFeed/SqlServerFhirResourceChangeCaptureEnabledTests.cs b/test/Microsoft.Health.Fhir.Shared.Tests.Integration/Features/ChangeFeed/SqlServerFhirResourceChangeCaptureEnabledTests.cs index f87b4cad7d..4b28acf9bc 100644 --- a/test/Microsoft.Health.Fhir.Shared.Tests.Integration/Features/ChangeFeed/SqlServerFhirResourceChangeCaptureEnabledTests.cs +++ b/test/Microsoft.Health.Fhir.Shared.Tests.Integration/Features/ChangeFeed/SqlServerFhirResourceChangeCaptureEnabledTests.cs @@ -125,7 +125,7 @@ public async Task GivenChangeCaptureEnabledAndNoVersionPolicy_AfterUpdating_Invi // check 2 records exist Assert.Equal(2, await GetCount()); - await store.MergeResourcesAdvanceTransactionVisibilityAsync(CancellationToken.None); // this logic is invoked by WD normally + await store.StoreClient.MergeResourcesAdvanceTransactionVisibilityAsync(CancellationToken.None); // this logic is invoked by WD normally await Task.Delay(5000); // check only 1 record remains @@ -161,7 +161,7 @@ public async Task GivenChangeCaptureEnabledAndNoVersionPolicy_AfterHardDeleting_ // check 1 record exist Assert.Equal(1, await GetCount()); - await store.MergeResourcesAdvanceTransactionVisibilityAsync(CancellationToken.None); // this logic is invoked by WD normally + await store.StoreClient.MergeResourcesAdvanceTransactionVisibilityAsync(CancellationToken.None); // this logic is invoked by WD normally await Task.Delay(5000); // check no records @@ -174,8 +174,8 @@ public async Task GivenChangeCaptureEnabledAndNoVersionPolicy_AfterUpdating_Hist EnableDatabaseLogging(); var store = (SqlServerFhirDataStore)_fixture.DataStore; - await store.MergeResourcesAdvanceTransactionVisibilityAsync(CancellationToken.None); // this logic is invoked by WD normally - var startTranId = await store.MergeResourcesGetTransactionVisibilityAsync(CancellationToken.None); + await store.StoreClient.MergeResourcesAdvanceTransactionVisibilityAsync(CancellationToken.None); // this logic is invoked by WD normally + var startTranId = await store.StoreClient.MergeResourcesGetTransactionVisibilityAsync(CancellationToken.None); var create = await _fixture.Mediator.CreateResourceAsync(Samples.GetDefaultOrganization()); Assert.Equal("1", create.VersionId); @@ -189,8 +189,8 @@ public async Task GivenChangeCaptureEnabledAndNoVersionPolicy_AfterUpdating_Hist var bundle = history.ToPoco(); Assert.Single(bundle.Entry); - await store.MergeResourcesAdvanceTransactionVisibilityAsync(CancellationToken.None); // this logic is invoked by WD normally - var endTranId = await store.MergeResourcesGetTransactionVisibilityAsync(CancellationToken.None); + await store.StoreClient.MergeResourcesAdvanceTransactionVisibilityAsync(CancellationToken.None); // this logic is invoked by WD normally + var endTranId = await store.StoreClient.MergeResourcesGetTransactionVisibilityAsync(CancellationToken.None); // old style TODO: Remove once events switch to new style var changeStore = new SqlServerFhirResourceChangeDataStore(_fixture.SqlConnectionWrapperFactory, NullLogger.Instance, _fixture.SchemaInformation); @@ -204,14 +204,14 @@ public async Task GivenChangeCaptureEnabledAndNoVersionPolicy_AfterUpdating_Hist Assert.Equal(ResourceChangeTypeUpdated, change.ResourceChangeTypeId); // new style - var trans = await store.GetTransactionsAsync(startTranId, endTranId, CancellationToken.None); + var trans = await store.StoreClient.GetTransactionsAsync(startTranId, endTranId, CancellationToken.None); Assert.Equal(2, trans.Count); - var resourceKeys = await store.GetResourceDateKeysByTransactionIdAsync(trans[0].TransactionId, CancellationToken.None); + var resourceKeys = await store.StoreClient.GetResourceDateKeysByTransactionIdAsync(trans[0].TransactionId, CancellationToken.None); Assert.Single(resourceKeys); Assert.Equal(create.Id, resourceKeys[0].Id); Assert.Equal("1", resourceKeys[0].VersionId); Assert.False(resourceKeys[0].IsDeleted); - resourceKeys = await store.GetResourceDateKeysByTransactionIdAsync(trans[1].TransactionId, CancellationToken.None); + resourceKeys = await store.StoreClient.GetResourceDateKeysByTransactionIdAsync(trans[1].TransactionId, CancellationToken.None); Assert.Single(resourceKeys); Assert.Equal("2", resourceKeys[0].VersionId); Assert.False(resourceKeys[0].IsDeleted); diff --git a/test/Microsoft.Health.Fhir.Shared.Tests.Integration/Persistence/SqlRetryServiceTests.cs b/test/Microsoft.Health.Fhir.Shared.Tests.Integration/Persistence/SqlRetryServiceTests.cs index 60e31c1293..feabec7f6f 100644 --- a/test/Microsoft.Health.Fhir.Shared.Tests.Integration/Persistence/SqlRetryServiceTests.cs +++ b/test/Microsoft.Health.Fhir.Shared.Tests.Integration/Persistence/SqlRetryServiceTests.cs @@ -377,7 +377,7 @@ private async Task SingleConnectionRetryTest(Func testStor using var sqlCommand = new SqlCommand(); sqlCommand.CommandText = $"dbo.{storedProcedureName}"; - List result = await sqlRetryService.ExecuteSqlDataReader( + var result = await sqlRetryService.ExecuteReaderAsync( sqlCommand, testConnectionInitializationFailure ? ReaderToResult : ReaderToResultAndKillConnection, logger, @@ -415,7 +415,7 @@ private async Task AllConnectionRetriesTest(Func testStore try { _output.WriteLine($"{DateTime.Now:O}: Start executing ExecuteSqlDataReader."); - await sqlRetryService.ExecuteSqlDataReader( + await sqlRetryService.ExecuteReaderAsync( sqlCommand, testConnectionInitializationFailure ? ReaderToResult : ReaderToResultAndKillConnection, logger, @@ -463,12 +463,12 @@ private async Task SingleRetryTest(int sqlErrorNumber, Func result = await sqlRetryService.ExecuteSqlDataReader( - sqlCommand, + var result = await sqlCommand.ExecuteReaderAsync( + sqlRetryService, ReaderToResult, logger, - "log message", - CancellationToken.None); + CancellationToken.None, + "log message"); Assert.Equal(resultCount, result.Count); for (int i = 0; i < resultCount; i++) @@ -512,7 +512,7 @@ private async Task AllRetriesFailTest(int sqlErrorNumber, Func(() => sqlRetryService.ExecuteSqlDataReader( + ex = await Assert.ThrowsAsync(() => sqlRetryService.ExecuteReaderAsync( sqlCommand, ReaderToResult, logger, diff --git a/test/Microsoft.Health.Fhir.Shared.Tests.Integration/Persistence/SqlServerWatchdogTests.cs b/test/Microsoft.Health.Fhir.Shared.Tests.Integration/Persistence/SqlServerWatchdogTests.cs index 4667417e12..a1bff85ef2 100644 --- a/test/Microsoft.Health.Fhir.Shared.Tests.Integration/Persistence/SqlServerWatchdogTests.cs +++ b/test/Microsoft.Health.Fhir.Shared.Tests.Integration/Persistence/SqlServerWatchdogTests.cs @@ -158,7 +158,7 @@ public async Task RollTransactionForward() cts.CancelAfter(TimeSpan.FromSeconds(60)); var factory = CreateResourceWrapperFactory(); - var tran = await _fixture.SqlServerFhirDataStore.MergeResourcesBeginTransactionAsync(1, cts.Token, DateTime.UtcNow.AddHours(-1)); // register timed out + var tran = await _fixture.SqlServerFhirDataStore.StoreClient.MergeResourcesBeginTransactionAsync(1, cts.Token, DateTime.UtcNow.AddHours(-1)); // register timed out var patient = (Hl7.Fhir.Model.Patient)Samples.GetJsonSample("Patient").ToPoco(); patient.Id = Guid.NewGuid().ToString(); @@ -234,19 +234,19 @@ public async Task AdvanceVisibility() _testOutputHelper.WriteLine($"Acquired lease in {(DateTime.UtcNow - startTime).TotalSeconds} seconds."); // create 3 trans - var tran1 = await _fixture.SqlServerFhirDataStore.MergeResourcesBeginTransactionAsync(1, cts.Token, DateTime.UtcNow.AddHours(1)); - var tran2 = await _fixture.SqlServerFhirDataStore.MergeResourcesBeginTransactionAsync(1, cts.Token, DateTime.UtcNow.AddHours(1)); - var tran3 = await _fixture.SqlServerFhirDataStore.MergeResourcesBeginTransactionAsync(1, cts.Token, DateTime.UtcNow.AddHours(1)); - var visibility = await _fixture.SqlServerFhirDataStore.MergeResourcesGetTransactionVisibilityAsync(cts.Token); + var tran1 = await _fixture.SqlServerFhirDataStore.StoreClient.MergeResourcesBeginTransactionAsync(1, cts.Token, DateTime.UtcNow.AddHours(1)); + var tran2 = await _fixture.SqlServerFhirDataStore.StoreClient.MergeResourcesBeginTransactionAsync(1, cts.Token, DateTime.UtcNow.AddHours(1)); + var tran3 = await _fixture.SqlServerFhirDataStore.StoreClient.MergeResourcesBeginTransactionAsync(1, cts.Token, DateTime.UtcNow.AddHours(1)); + var visibility = await _fixture.SqlServerFhirDataStore.StoreClient.MergeResourcesGetTransactionVisibilityAsync(cts.Token); _testOutputHelper.WriteLine($"Visibility={visibility}"); Assert.Equal(-1, visibility); // commit 1 - await _fixture.SqlServerFhirDataStore.MergeResourcesCommitTransactionAsync(tran1.TransactionId, null, cts.Token); + await _fixture.SqlServerFhirDataStore.StoreClient.MergeResourcesCommitTransactionAsync(tran1.TransactionId, null, cts.Token); _testOutputHelper.WriteLine($"Tran1={tran1.TransactionId} committed."); startTime = DateTime.UtcNow; - while ((visibility = await _fixture.SqlServerFhirDataStore.MergeResourcesGetTransactionVisibilityAsync(cts.Token)) != tran1.TransactionId && (DateTime.UtcNow - startTime).TotalSeconds < 10) + while ((visibility = await _fixture.SqlServerFhirDataStore.StoreClient.MergeResourcesGetTransactionVisibilityAsync(cts.Token)) != tran1.TransactionId && (DateTime.UtcNow - startTime).TotalSeconds < 10) { await Task.Delay(TimeSpan.FromSeconds(0.1)); } @@ -255,11 +255,11 @@ public async Task AdvanceVisibility() Assert.Equal(tran1.TransactionId, visibility); // commit 3 - await _fixture.SqlServerFhirDataStore.MergeResourcesCommitTransactionAsync(tran3.TransactionId, null, cts.Token); + await _fixture.SqlServerFhirDataStore.StoreClient.MergeResourcesCommitTransactionAsync(tran3.TransactionId, null, cts.Token); _testOutputHelper.WriteLine($"Tran3={tran3.TransactionId} committed."); startTime = DateTime.UtcNow; - while ((visibility = await _fixture.SqlServerFhirDataStore.MergeResourcesGetTransactionVisibilityAsync(cts.Token)) != tran2.TransactionId && (DateTime.UtcNow - startTime).TotalSeconds < 10) + while ((visibility = await _fixture.SqlServerFhirDataStore.StoreClient.MergeResourcesGetTransactionVisibilityAsync(cts.Token)) != tran2.TransactionId && (DateTime.UtcNow - startTime).TotalSeconds < 10) { await Task.Delay(TimeSpan.FromSeconds(0.1)); } @@ -268,11 +268,11 @@ public async Task AdvanceVisibility() Assert.Equal(tran1.TransactionId, visibility); // remains t1 though t3 is committed. // commit 2 - await _fixture.SqlServerFhirDataStore.MergeResourcesCommitTransactionAsync(tran2.TransactionId, null, cts.Token); + await _fixture.SqlServerFhirDataStore.StoreClient.MergeResourcesCommitTransactionAsync(tran2.TransactionId, null, cts.Token); _testOutputHelper.WriteLine($"Tran2={tran2.TransactionId} committed."); startTime = DateTime.UtcNow; - while ((visibility = await _fixture.SqlServerFhirDataStore.MergeResourcesGetTransactionVisibilityAsync(cts.Token)) != tran3.TransactionId && (DateTime.UtcNow - startTime).TotalSeconds < 10) + while ((visibility = await _fixture.SqlServerFhirDataStore.StoreClient.MergeResourcesGetTransactionVisibilityAsync(cts.Token)) != tran3.TransactionId && (DateTime.UtcNow - startTime).TotalSeconds < 10) { await Task.Delay(TimeSpan.FromSeconds(0.1)); } diff --git a/tools/EventsReader/App.config b/tools/EventsReader/App.config new file mode 100644 index 0000000000..41a538e164 --- /dev/null +++ b/tools/EventsReader/App.config @@ -0,0 +1,6 @@ + + + + + + diff --git a/tools/EventsReader/EventsReader.csproj b/tools/EventsReader/EventsReader.csproj new file mode 100644 index 0000000000..922d39391a --- /dev/null +++ b/tools/EventsReader/EventsReader.csproj @@ -0,0 +1,19 @@ + + + + Exe + Microsoft.Health.Internal.Fhir.EventsReader + Microsoft.Health.Internal.Fhir.EventsReader + enable + true + + + + + + + + + + + diff --git a/tools/EventsReader/Program.cs b/tools/EventsReader/Program.cs new file mode 100644 index 0000000000..5ad79f483d --- /dev/null +++ b/tools/EventsReader/Program.cs @@ -0,0 +1,64 @@ +// ------------------------------------------------------------------------------------------------- +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License (MIT). See LICENSE in the repo root for license information. +// ------------------------------------------------------------------------------------------------- + +using System.Configuration; +using System.Diagnostics; +using Microsoft.Extensions.Logging.Abstractions; +using Microsoft.Health.Fhir.SqlServer.Features.Storage; +using Microsoft.Health.SqlServer; + +namespace Microsoft.Health.Internal.Fhir.EventsReader +{ + public static class Program + { + private static readonly string _connectionString = ConfigurationManager.ConnectionStrings["Database"].ConnectionString; + private static SqlRetryService _sqlRetryService; + private static SqlStoreClient _store; + + public static void Main() + { + ISqlConnectionBuilder iSqlConnectionBuilder = new SqlConnectionBuilder(_connectionString); + _sqlRetryService = SqlRetryService.GetInstance(iSqlConnectionBuilder); + _store = new SqlStoreClient(_sqlRetryService, NullLogger.Instance); + + var totalsKeys = 0L; + var totalTrans = 0; + var sw = Stopwatch.StartNew(); + var swReport = Stopwatch.StartNew(); + var visibility = 0L; + while (true) + { + Thread.Sleep(3000); + var currentVisibility = _store.MergeResourcesGetTransactionVisibilityAsync(CancellationToken.None).Result; + if (currentVisibility > visibility) + { + var transactions = _store.GetTransactionsAsync(visibility, currentVisibility, CancellationToken.None).Result; + Parallel.ForEach(transactions, new ParallelOptions { MaxDegreeOfParallelism = 64 }, transaction => + { + var keys = _store.GetResourceDateKeysByTransactionIdAsync(transaction.TransactionId, CancellationToken.None).Result; + Interlocked.Add(ref totalsKeys, keys.Count); + Interlocked.Increment(ref totalTrans); + + if (swReport.Elapsed.TotalSeconds > 60) + { + lock (swReport) + { + if (swReport.Elapsed.TotalSeconds > 60) + { + Console.WriteLine($"secs={(int)sw.Elapsed.TotalSeconds} trans={totalTrans} total={totalsKeys} speed={totalsKeys / 1000.0 / sw.Elapsed.TotalSeconds} K KPS."); + swReport.Restart(); + } + } + } + }); + + Console.WriteLine($"secs={(int)sw.Elapsed.TotalSeconds} trans={totalTrans} total={totalsKeys} speed={totalsKeys / 1000.0 / sw.Elapsed.TotalSeconds} K KPS."); + + visibility = currentVisibility; + } + } + } + } +} diff --git a/tools/EventsReader/SqlConnectionBuilder.cs b/tools/EventsReader/SqlConnectionBuilder.cs new file mode 100644 index 0000000000..e3231d0f95 --- /dev/null +++ b/tools/EventsReader/SqlConnectionBuilder.cs @@ -0,0 +1,27 @@ +// ------------------------------------------------------------------------------------------------- +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License (MIT). See LICENSE in the repo root for license information. +// ------------------------------------------------------------------------------------------------- + +using Microsoft.Data.SqlClient; +using Microsoft.Health.SqlServer; + +namespace Microsoft.Health.Internal.Fhir.EventsReader +{ + public class SqlConnectionBuilder : ISqlConnectionBuilder + { + private readonly string _connectionString; + + public SqlConnectionBuilder(string connectionString) + { + _connectionString = connectionString; + } + + #pragma warning disable CA2000 + public async Task GetSqlConnectionAsync(string initialCatalog = null, CancellationToken cancellationToken = default) + { + await Task.CompletedTask; + return new SqlConnection(_connectionString); + } + } +}