Retries on async calls and timeouts #3336

Merged · 6 commits · Jun 7, 2023
@@ -15,7 +15,8 @@ internal static bool IsRetriable(this Exception e)
|| HasInternalSqlErrorPattern(str)
|| HasDatabaseAvailabilityPattern(str)
|| HasDatabaseOverloadPattern(str)
- || HasDeadlockErrorPattern(str);
+ || HasDeadlockErrorPattern(str)
+ || HasIncorrectAsyncCallPattern(str);
}

internal static bool IsExecutionTimeout(this Exception e)
@@ -103,5 +104,11 @@ private static bool HasDatabaseOverloadPattern(string str)

////The request limit for the database is 200 and has been reached.
}

+ // TODO: Remove when source of this exception is identified
+ private static bool HasIncorrectAsyncCallPattern(string str)
+ {
+ return str.Contains("This method may not be called when another read operation is pending", StringComparison.OrdinalIgnoreCase);
+ }
}
}
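The new HasIncorrectAsyncCallPattern check means exceptions carrying the "another read operation is pending" message are now treated as retriable. A hedged sketch of the intended effect (the exception instance below is constructed by hand for illustration, not taken from a real failure):

```csharp
using System;

// Illustrative only: an exception whose text matches the new pattern.
var ex = new InvalidOperationException(
    "This method may not be called when another read operation is pending.");

// After this change IsRetriable() matches the message text and returns true,
// so the import loop retries the batch instead of failing the job.
Console.WriteLine(ex.IsRetriable());
```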
@@ -89,6 +89,7 @@ public async Task<ImportProcessingProgress> Import(Channel<ImportResource> input
private void ImportResourcesInBuffer(List<ImportResource> resources, List<string> errors, ImportMode importMode, CancellationToken cancellationToken, ref long succeededCount, ref long failedCount, ref long processedBytes)
{
var retries = 0;
+ var timeoutRetries = 0;
var loaded = new List<ImportResource>();
var conflicts = new List<ImportResource>();
while (true)
@@ -99,7 +100,7 @@ private void ImportResourcesInBuffer(List<ImportResource> resources, List<string
mergeStart = DateTime.UtcNow;
loaded = new List<ImportResource>();
conflicts = new List<ImportResource>();
- ImportResourcesInBufferMain(resources, loaded, conflicts, importMode, cancellationToken).Wait();
+ ImportResourcesInBufferMain(resources, loaded, conflicts, importMode, timeoutRetries, cancellationToken).Wait();
break;
}
catch (Exception e)
@@ -109,23 +110,20 @@ private void ImportResourcesInBuffer(List<ImportResource> resources, List<string
var isExecutionTimeout = false;
if ((sqlEx != null && sqlEx.Number == SqlErrorCodes.Conflict && retries++ < 30)
|| (isRetriable = e.IsRetriable()) // this should allow to deal with intermittent database errors.
- || ((isExecutionTimeout = e.IsExecutionTimeout()) && retries++ < 3)) // timeouts happen once in a while on highly loaded databases.
+ || ((isExecutionTimeout = e.IsExecutionTimeout()) && timeoutRetries++ < 3)) // timeouts happen once in a while on highly loaded databases.
{
- _logger.LogWarning(e, $"Error on {nameof(ImportResourcesInBufferMain)} retries={{Retries}}", retries);
+ _logger.LogWarning(e, $"Error on {nameof(ImportResourcesInBufferMain)} retries={{Retries}} timeoutRetries={{TimeoutRetries}}", retries, timeoutRetries);
if (isRetriable || isExecutionTimeout) // others are logged in SQL by merge stored procedure
{
- _store.TryLogEvent(nameof(ImportResourcesInBufferMain), "Warn", $"retries={retries} error={e}", mergeStart, cancellationToken).Wait();
+ _store.TryLogEvent(nameof(ImportResourcesInBufferMain), "Warn", $"retries={retries} timeoutRetries={timeoutRetries} error={e}", mergeStart, cancellationToken).Wait();
}

Task.Delay(5000, cancellationToken);
continue;
}

- _logger.LogError(e, $"Error on {nameof(ImportResourcesInBufferMain)} retries={{Retries}}", retries);
- if (sqlEx != null)
- {
- _store.TryLogEvent(nameof(ImportResourcesInBufferMain), "Error", $"retries={retries} error={e}", mergeStart, cancellationToken).Wait();
- }
+ _logger.LogError(e, $"Error on {nameof(ImportResourcesInBufferMain)} retries={{Retries}} timeoutRetries={{TimeoutRetries}}", retries, timeoutRetries);
+ _store.TryLogEvent(nameof(ImportResourcesInBufferMain), "Error", $"retries={retries} timeoutRetries={timeoutRetries} error={e}", mergeStart, cancellationToken).Wait();

throw;
}
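The loop above now keeps two budgets: retries for conflicts and other retriable errors, and timeoutRetries for SQL execution timeouts, which is also passed down so the data store can widen its command timeout on each attempt. A simplified sketch of that structure (conflict handling and logging are omitted, and MergeAsync is a placeholder for the actual merge call):

```csharp
// Simplified sketch of the retry structure; not the repository's exact code.
var retries = 0;        // budget for conflicts and other retriable errors
var timeoutRetries = 0; // separate budget for SQL execution timeouts
while (true)
{
    try
    {
        await MergeAsync(timeoutRetries, cancellationToken); // placeholder call; the counter widens the SQL command timeout
        break;
    }
    catch (Exception e) when (e.IsExecutionTimeout() && timeoutRetries++ < 3)
    {
        await Task.Delay(5000, cancellationToken); // back off, then retry with a longer timeout
    }
    catch (Exception e) when (e.IsRetriable())
    {
        retries++; // intermittent database errors are retried
        await Task.Delay(5000, cancellationToken);
    }
}
```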
@@ -143,15 +141,15 @@ private void ImportResourcesInBuffer(List<ImportResource> resources, List<string
resources.Clear();
}

- private async Task ImportResourcesInBufferMain(List<ImportResource> resources, List<ImportResource> loaded, List<ImportResource> conflicts, ImportMode importMode, CancellationToken cancellationToken)
+ private async Task ImportResourcesInBufferMain(List<ImportResource> resources, List<ImportResource> loaded, List<ImportResource> conflicts, ImportMode importMode, int timeoutRetries, CancellationToken cancellationToken)
{
var goodResources = resources.Where(r => string.IsNullOrEmpty(r.ImportError)).ToList();
if (importMode == ImportMode.InitialLoad)
{
var inputDedupped = goodResources.GroupBy(_ => _.ResourceWrapper.ToResourceKey(true)).Select(_ => _.OrderBy(_ => _.ResourceWrapper.LastModified).First()).ToList();
var current = new HashSet<ResourceKey>((await _store.GetAsync(inputDedupped.Select(_ => _.ResourceWrapper.ToResourceKey(true)).ToList(), cancellationToken)).Select(_ => _.ToResourceKey(true)));
loaded.AddRange(inputDedupped.Where(i => !current.TryGetValue(i.ResourceWrapper.ToResourceKey(true), out _)));
- await MergeResourcesAsync(loaded, cancellationToken);
+ await MergeResourcesAsync(loaded, timeoutRetries, cancellationToken);
}
else
{
@@ -163,7 +161,7 @@ private async Task ImportResourcesInBufferMain(List<ImportResource> resources, L
var inputDeduppedWithVersions = inputDedupped.Where(_ => _.KeepVersion).GroupBy(_ => _.ResourceWrapper.ToResourceKey()).Select(_ => _.First()).ToList();
var currentKeys = new HashSet<ResourceKey>((await _store.GetAsync(inputDeduppedWithVersions.Select(_ => _.ResourceWrapper.ToResourceKey()).ToList(), cancellationToken)).Select(_ => _.ToResourceKey()));
loaded.AddRange(inputDeduppedWithVersions.Where(i => !currentKeys.TryGetValue(i.ResourceWrapper.ToResourceKey(), out _)).OrderBy(_ => _.ResourceWrapper.ResourceId).ThenByDescending(_ => _.ResourceWrapper.LastModified)); // sorting is used in merge to set isHistory
- await MergeResourcesAsync(loaded, cancellationToken);
+ await MergeResourcesAsync(loaded, timeoutRetries, cancellationToken);

// 2 - if versions were not specified they have to be assigned as next based on union of input and database.
// assume that only one unassigned version is provided for a given resource as we cannot guarantee processing order across parallel file streams anyway
@@ -198,16 +196,16 @@ private async Task ImportResourcesInBufferMain(List<ImportResource> resources, L
}

var inputDeduppedNoVersionNoConflict = inputDeduppedNoVersion.Except(conflicts); // some resources might get version assigned
- await MergeResourcesAsync(inputDeduppedNoVersionNoConflict.Where(_ => _.KeepVersion), cancellationToken);
- await MergeResourcesAsync(inputDeduppedNoVersionNoConflict.Where(_ => !_.KeepVersion), cancellationToken);
+ await MergeResourcesAsync(inputDeduppedNoVersionNoConflict.Where(_ => _.KeepVersion), timeoutRetries, cancellationToken);
+ await MergeResourcesAsync(inputDeduppedNoVersionNoConflict.Where(_ => !_.KeepVersion), timeoutRetries, cancellationToken);
loaded.AddRange(inputDeduppedNoVersionNoConflict);
}
}

- private async Task MergeResourcesAsync(IEnumerable<ImportResource> resources, CancellationToken cancellationToken)
+ private async Task MergeResourcesAsync(IEnumerable<ImportResource> resources, int timeoutRetries, CancellationToken cancellationToken)
{
var input = resources.Select(_ => new ResourceWrapperOperation(_.ResourceWrapper, true, true, null, requireETagOnUpdate: false, keepVersion: _.KeepVersion, bundleOperationId: null)).ToList();
- await _store.MergeInternalAsync(input, cancellationToken);
+ await _store.MergeInternalAsync(input, timeoutRetries, cancellationToken);
}

private void AppendErrorsToBuffer(IEnumerable<ImportResource> dups, IEnumerable<ImportResource> conflicts, List<string> importErrorBuffer)
@@ -114,7 +114,7 @@ public async Task<IDictionary<DataStoreOperationIdentifier, DataStoreOperationOu
try
{
mergeStart = DateTime.UtcNow;
- var results = await MergeInternalAsync(resources, cancellationToken);
+ var results = await MergeInternalAsync(resources, 0, cancellationToken); // TODO: Pass correct retries value once we start supporting retries
return results;
}
catch (Exception e)
@@ -140,18 +140,15 @@ public async Task<IDictionary<DataStoreOperationIdentifier, DataStoreOperationOu
}

_logger.LogError(e, $"Error from SQL database on {nameof(MergeAsync)} retries={{Retries}}", retries);
- if (sqlEx != null)
- {
- await TryLogEvent(nameof(MergeAsync), "Error", $"retries={retries}, error={sqlEx}", mergeStart, cancellationToken);
- }
+ await TryLogEvent(nameof(MergeAsync), "Error", $"retries={retries}, error={sqlEx}", mergeStart, cancellationToken);

throw trueEx;
}
}
}

// Split in a separate method to allow special logic in $import.
- internal async Task<IDictionary<DataStoreOperationIdentifier, DataStoreOperationOutcome>> MergeInternalAsync(IReadOnlyList<ResourceWrapperOperation> resources, CancellationToken cancellationToken)
+ internal async Task<IDictionary<DataStoreOperationIdentifier, DataStoreOperationOutcome>> MergeInternalAsync(IReadOnlyList<ResourceWrapperOperation> resources, int timeoutRetries, CancellationToken cancellationToken)
{
var results = new Dictionary<DataStoreOperationIdentifier, DataStoreOperationOutcome>();
if (resources == null || resources.Count == 0)
@@ -319,7 +316,7 @@ internal async Task<IDictionary<DataStoreOperationIdentifier, DataStoreOperation
RaiseExceptionOnConflict: true,
IsResourceChangeCaptureEnabled: _coreFeatures.SupportsResourceChangeCapture,
tableValuedParameters: _mergeResourcesTvpGeneratorVLatest.Generate(mergeWrappers));
- cmd.CommandTimeout = 300 + (int)(3600.0 / 10000 * mergeWrappers.Count);
+ cmd.CommandTimeout = 300 + (int)(3600.0 / 10000 * (timeoutRetries + 1) * mergeWrappers.Count);
await cmd.ExecuteNonQueryAsync(cancellationToken);
}
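For a sense of scale, the widened timeout formula above grows with both batch size and the number of timeout retries. A quick illustrative calculation (the helper below just mirrors the formula; it is not code from the repository):

```csharp
using System;

// Mirrors the CommandTimeout formula above, for illustration only:
// 300 s base plus 3600 s per 10,000 resources, scaled by (timeoutRetries + 1).
static int MergeTimeoutSeconds(int timeoutRetries, int resourceCount)
    => 300 + (int)(3600.0 / 10000 * (timeoutRetries + 1) * resourceCount);

Console.WriteLine(MergeTimeoutSeconds(0, 10000)); // 3900 s on the first attempt
Console.WriteLine(MergeTimeoutSeconds(1, 10000)); // 7500 s after one execution timeout
Console.WriteLine(MergeTimeoutSeconds(2, 10000)); // 11100 s after two
```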

Expand Down