diff --git a/src/Microsoft.Health.Fhir.SqlServer/Features/ExceptionExtention.cs b/src/Microsoft.Health.Fhir.SqlServer/Features/ExceptionExtention.cs index 4b29487af9..5d0bfd38d2 100644 --- a/src/Microsoft.Health.Fhir.SqlServer/Features/ExceptionExtention.cs +++ b/src/Microsoft.Health.Fhir.SqlServer/Features/ExceptionExtention.cs @@ -15,7 +15,8 @@ internal static bool IsRetriable(this Exception e) || HasInternalSqlErrorPattern(str) || HasDatabaseAvailabilityPattern(str) || HasDatabaseOverloadPattern(str) - || HasDeadlockErrorPattern(str); + || HasDeadlockErrorPattern(str) + || HasIncorrectAsyncCallPattern(str); } internal static bool IsExecutionTimeout(this Exception e) @@ -103,5 +104,11 @@ private static bool HasDatabaseOverloadPattern(string str) ////The request limit for the database is 200 and has been reached. } + + // TODO: Remove when source of this exception is identified + private static bool HasIncorrectAsyncCallPattern(string str) + { + return str.Contains("This method may not be called when another read operation is pending", StringComparison.OrdinalIgnoreCase); + } } } diff --git a/src/Microsoft.Health.Fhir.SqlServer/Features/Operations/Import/SqlImporter.cs b/src/Microsoft.Health.Fhir.SqlServer/Features/Operations/Import/SqlImporter.cs index 77d1efbb56..6ee29a60b2 100644 --- a/src/Microsoft.Health.Fhir.SqlServer/Features/Operations/Import/SqlImporter.cs +++ b/src/Microsoft.Health.Fhir.SqlServer/Features/Operations/Import/SqlImporter.cs @@ -89,6 +89,7 @@ public async Task Import(Channel input private void ImportResourcesInBuffer(List resources, List errors, ImportMode importMode, CancellationToken cancellationToken, ref long succeededCount, ref long failedCount, ref long processedBytes) { var retries = 0; + var timeoutRetries = 0; var loaded = new List(); var conflicts = new List(); while (true) @@ -99,7 +100,7 @@ private void ImportResourcesInBuffer(List resources, List(); conflicts = new List(); - ImportResourcesInBufferMain(resources, loaded, conflicts, importMode, cancellationToken).Wait(); + ImportResourcesInBufferMain(resources, loaded, conflicts, importMode, timeoutRetries, cancellationToken).Wait(); break; } catch (Exception e) @@ -109,23 +110,20 @@ private void ImportResourcesInBuffer(List resources, List resources, List resources, List loaded, List conflicts, ImportMode importMode, CancellationToken cancellationToken) + private async Task ImportResourcesInBufferMain(List resources, List loaded, List conflicts, ImportMode importMode, int timeoutRetries, CancellationToken cancellationToken) { var goodResources = resources.Where(r => string.IsNullOrEmpty(r.ImportError)).ToList(); if (importMode == ImportMode.InitialLoad) @@ -151,7 +149,7 @@ private async Task ImportResourcesInBufferMain(List resources, L var inputDedupped = goodResources.GroupBy(_ => _.ResourceWrapper.ToResourceKey(true)).Select(_ => _.OrderBy(_ => _.ResourceWrapper.LastModified).First()).ToList(); var current = new HashSet((await _store.GetAsync(inputDedupped.Select(_ => _.ResourceWrapper.ToResourceKey(true)).ToList(), cancellationToken)).Select(_ => _.ToResourceKey(true))); loaded.AddRange(inputDedupped.Where(i => !current.TryGetValue(i.ResourceWrapper.ToResourceKey(true), out _))); - await MergeResourcesAsync(loaded, cancellationToken); + await MergeResourcesAsync(loaded, timeoutRetries, cancellationToken); } else { @@ -163,7 +161,7 @@ private async Task ImportResourcesInBufferMain(List resources, L var inputDeduppedWithVersions = inputDedupped.Where(_ => _.KeepVersion).GroupBy(_ => _.ResourceWrapper.ToResourceKey()).Select(_ => _.First()).ToList(); var currentKeys = new HashSet((await _store.GetAsync(inputDeduppedWithVersions.Select(_ => _.ResourceWrapper.ToResourceKey()).ToList(), cancellationToken)).Select(_ => _.ToResourceKey())); loaded.AddRange(inputDeduppedWithVersions.Where(i => !currentKeys.TryGetValue(i.ResourceWrapper.ToResourceKey(), out _)).OrderBy(_ => _.ResourceWrapper.ResourceId).ThenByDescending(_ => _.ResourceWrapper.LastModified)); // sorting is used in merge to set isHistory - await MergeResourcesAsync(loaded, cancellationToken); + await MergeResourcesAsync(loaded, timeoutRetries, cancellationToken); // 2 - if versions were not specified they have to be assigned as next based on union of input and database. // assume that only one unassigned version is provided for a given resource as we cannot guarantee processing order across parallel file streams anyway @@ -198,16 +196,16 @@ private async Task ImportResourcesInBufferMain(List resources, L } var inputDeduppedNoVersionNoConflict = inputDeduppedNoVersion.Except(conflicts); // some resources might get version assigned - await MergeResourcesAsync(inputDeduppedNoVersionNoConflict.Where(_ => _.KeepVersion), cancellationToken); - await MergeResourcesAsync(inputDeduppedNoVersionNoConflict.Where(_ => !_.KeepVersion), cancellationToken); + await MergeResourcesAsync(inputDeduppedNoVersionNoConflict.Where(_ => _.KeepVersion), timeoutRetries, cancellationToken); + await MergeResourcesAsync(inputDeduppedNoVersionNoConflict.Where(_ => !_.KeepVersion), timeoutRetries, cancellationToken); loaded.AddRange(inputDeduppedNoVersionNoConflict); } } - private async Task MergeResourcesAsync(IEnumerable resources, CancellationToken cancellationToken) + private async Task MergeResourcesAsync(IEnumerable resources, int timeoutRetries, CancellationToken cancellationToken) { var input = resources.Select(_ => new ResourceWrapperOperation(_.ResourceWrapper, true, true, null, requireETagOnUpdate: false, keepVersion: _.KeepVersion, bundleOperationId: null)).ToList(); - await _store.MergeInternalAsync(input, cancellationToken); + await _store.MergeInternalAsync(input, timeoutRetries, cancellationToken); } private void AppendErrorsToBuffer(IEnumerable dups, IEnumerable conflicts, List importErrorBuffer) diff --git a/src/Microsoft.Health.Fhir.SqlServer/Features/Storage/SqlServerFhirDataStore.cs b/src/Microsoft.Health.Fhir.SqlServer/Features/Storage/SqlServerFhirDataStore.cs index b7e9fcc2ce..f78832c606 100644 --- a/src/Microsoft.Health.Fhir.SqlServer/Features/Storage/SqlServerFhirDataStore.cs +++ b/src/Microsoft.Health.Fhir.SqlServer/Features/Storage/SqlServerFhirDataStore.cs @@ -114,7 +114,7 @@ public async Task> MergeInternalAsync(IReadOnlyList resources, CancellationToken cancellationToken) + internal async Task> MergeInternalAsync(IReadOnlyList resources, int timeoutRetries, CancellationToken cancellationToken) { var results = new Dictionary(); if (resources == null || resources.Count == 0) @@ -319,7 +316,7 @@ internal async Task