Skip to content
This repository has been archived by the owner on Jul 30, 2024. It is now read-only.
/ NuGet.Jobs Public archive

Add retries to queries run by ImportAzureCdnStatistics #763

Merged
merged 8 commits into from
Jun 4, 2019
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 21 additions & 80 deletions src/NuGet.Jobs.Common/Extensions/DapperExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,108 +4,49 @@
using System.Collections.Generic;
using System.Threading.Tasks;
using Dapper;
using NuGet.Jobs;

// ReSharper disable once CheckNamespace
namespace System.Data.SqlClient
{
public static class DapperExtensions
{
    /// <summary>
    /// Runs a Dapper query against <paramref name="connection"/>, retrying on transient SQL failures.
    /// The query must be read-only: client timeouts are also retried, which could otherwise
    /// duplicate write operations.
    /// </summary>
    /// <typeparam name="T">Row type the query results are mapped to.</typeparam>
    /// <param name="connection">An open SQL connection.</param>
    /// <param name="sql">The SQL text to execute.</param>
    /// <param name="param">Optional Dapper parameter object.</param>
    /// <param name="transaction">Optional transaction to enlist the command in.</param>
    /// <param name="commandTimeout">Optional command timeout; truncated to whole seconds for Dapper.</param>
    /// <param name="commandType">Optional command type (text, stored procedure, ...).</param>
    /// <param name="maxRetries">Maximum number of attempts before the last failure propagates.</param>
    /// <returns>The mapped query results.</returns>
    public static Task<IEnumerable<T>> QueryWithRetryAsync<T>(
        this SqlConnection connection,
        string sql,
        object param = null,
        IDbTransaction transaction = null,
        TimeSpan? commandTimeout = null,
        CommandType? commandType = null,
        int maxRetries = SqlRetryUtility.DefaultMaxRetries)
    {
        return SqlRetryUtility.RetryReadOnlySql(
            () => connection.QueryAsync<T>(
                sql,
                param,
                transaction,
                // Dapper expects the timeout in whole seconds.
                (int?)commandTimeout?.TotalSeconds,
                commandType),
            maxRetries);
    }

    /// <summary>
    /// Runs a Dapper scalar query against <paramref name="connection"/>, retrying on transient SQL
    /// failures. The query must be read-only: client timeouts are also retried, which could
    /// otherwise duplicate write operations.
    /// </summary>
    /// <typeparam name="T">Type the scalar result is converted to.</typeparam>
    /// <param name="connection">An open SQL connection.</param>
    /// <param name="sql">The SQL text to execute.</param>
    /// <param name="param">Optional Dapper parameter object.</param>
    /// <param name="transaction">Optional transaction to enlist the command in.</param>
    /// <param name="commandTimeout">Optional command timeout; truncated to whole seconds for Dapper.</param>
    /// <param name="commandType">Optional command type (text, stored procedure, ...).</param>
    /// <param name="maxRetries">Maximum number of attempts before the last failure propagates.</param>
    /// <returns>The first cell of the first row, converted to <typeparamref name="T"/>.</returns>
    public static Task<T> ExecuteScalarWithRetryAsync<T>(
        this SqlConnection connection,
        string sql,
        object param = null,
        IDbTransaction transaction = null,
        TimeSpan? commandTimeout = null,
        CommandType? commandType = null,
        int maxRetries = SqlRetryUtility.DefaultMaxRetries)
    {
        return SqlRetryUtility.RetryReadOnlySql(
            () => connection.ExecuteScalarAsync<T>(
                sql,
                param,
                transaction,
                (int?)commandTimeout?.TotalSeconds,
                commandType),
            maxRetries);
    }
}
}
1 change: 1 addition & 0 deletions src/NuGet.Jobs.Common/NuGet.Jobs.Common.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
<Compile Include="JobRunner.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
<Compile Include="Properties\AssemblyInfo.*.cs" />
<Compile Include="SqlRetryUtility.cs" />
<Compile Include="StorageHelpers.cs" />
<Compile Include="Strings.Designer.cs" />
</ItemGroup>
Expand Down
87 changes: 87 additions & 0 deletions src/NuGet.Jobs.Common/SqlRetryUtility.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.

using System;
using System.Collections.Generic;
using System.Data.SqlClient;
using System.Linq;
using System.Threading.Tasks;

namespace NuGet.Jobs
{
public static class SqlRetryUtility
{
    /// <summary>Default number of attempts made before the last failure is allowed to propagate.</summary>
    public const int DefaultMaxRetries = 10;

    /// <summary>
    /// If a SQL query fails due to one of these exception numbers, it should always be retried.
    /// </summary>
    /// <remarks>
    /// There are many more retriable SQL exception numbers, but these are ones we've encountered in the past.
    /// If we encounter more in the future, we should add them here as well.
    /// </remarks>
    private static readonly IReadOnlyCollection<int> RetriableSqlExceptionNumbers = new[]
    {
        701, // Out of memory
        1204, // Lock issue
        1205, // Deadlock victim
        1222, // Lock request timeout
        8645, // Timeout waiting for memory resource
        8651, // Low memory condition
    };

    /// <summary>
    /// If a SQL query fails due to one of these exception numbers, it should always be retried if the query is read-only.
    /// </summary>
    /// <remarks>
    /// The exception numbers on this list that are not on <see cref="RetriableSqlExceptionNumbers"/> do not explicitly state that the query did not complete.
    /// Retrying them may cause duplicate write operations.
    /// </remarks>
    private static readonly IReadOnlyCollection<int> RetriableReadOnlySqlExceptionNumbers = RetriableSqlExceptionNumbers.Concat(new[]
    {
        -2, // Client timeout
    }).ToList();

    /// <summary>
    /// Executes <paramref name="executeSql"/>, retrying on the exception numbers that are safe
    /// to retry even for queries that write (the failure implies the statement did not complete).
    /// </summary>
    /// <param name="executeSql">Factory that starts the SQL operation; invoked once per attempt.</param>
    /// <param name="maxRetries">Maximum number of attempts before the last failure propagates.</param>
    public static Task RetrySql(
        Func<Task> executeSql,
        int maxRetries = DefaultMaxRetries)
    {
        // Adapt the plain-Task delegate to the generic core by returning a dummy value.
        return RetrySqlInternal(
            async () =>
            {
                await executeSql();
                return true;
            },
            RetriableSqlExceptionNumbers,
            maxRetries);
    }

    /// <summary>
    /// Executes <paramref name="executeSql"/>, retrying on the wider read-only list, which
    /// includes client timeouts that could duplicate writes if the query were not read-only.
    /// </summary>
    /// <param name="executeSql">Factory that starts the SQL operation; invoked once per attempt.</param>
    /// <param name="maxRetries">Maximum number of attempts before the last failure propagates.</param>
    public static Task<T> RetryReadOnlySql<T>(
        Func<Task<T>> executeSql,
        int maxRetries = DefaultMaxRetries)
    {
        return RetrySqlInternal(
            executeSql,
            RetriableReadOnlySqlExceptionNumbers,
            maxRetries);
    }

    private static async Task<T> RetrySqlInternal<T>(
        Func<Task<T>> executeSql,
        IReadOnlyCollection<int> retriableExceptionNumbers,
        int maxRetries)
    {
        for (int attempt = 0; attempt < maxRetries; attempt++)
        {
            try
            {
                // BUG FIX: the task must be awaited INSIDE the try block. The previous
                // implementation returned the unawaited Task, so SqlExceptions surfaced
                // asynchronously (the normal case for SQL queries) escaped the catch
                // and were never retried.
                return await executeSql();
            }
            catch (SqlException ex) when (attempt < maxRetries - 1 && retriableExceptionNumbers.Contains(ex.Number))
            {
                // Retriable failure with attempts remaining: loop around and try again.
                // On the final attempt the filter is false, so the exception propagates.
            }
        }

        // Unreachable when maxRetries >= 1 (the loop either returns or rethrows on the
        // last attempt); kept so all code paths return or throw.
        throw new InvalidOperationException("Failed to execute SQL call in retry loop.");
    }
}
}
115 changes: 63 additions & 52 deletions src/Stats.ImportAzureCdnStatistics/Warehouse.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
using System.Linq;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using NuGet.Jobs;
using Stats.AzureCdnLogs.Common;
using Stopwatch = System.Diagnostics.Stopwatch;

Expand Down Expand Up @@ -48,48 +49,53 @@ public async Task InsertDownloadFactsAsync(DataTable downloadFactsDataTable, str
_logger.LogDebug("Inserting into facts table...");
var stopwatch = Stopwatch.StartNew();

using (var connection = await _openStatisticsSqlConnectionAsync())
using (var transaction = connection.BeginTransaction(IsolationLevel.Snapshot))
try
{
try
await SqlRetryUtility.RetrySql(
() => RunInsertDownloadFactsQueryAsync(downloadFactsDataTable, logFileName));

stopwatch.Stop();
ApplicationInsightsHelper.TrackMetric("Insert facts duration (ms)", stopwatch.ElapsedMilliseconds, logFileName);
}
catch (Exception exception)
{
if (stopwatch.IsRunning)
{
var bulkCopy = new SqlBulkCopy(connection, SqlBulkCopyOptions.Default, transaction)
{
BatchSize = 25000,
DestinationTableName = downloadFactsDataTable.TableName,
BulkCopyTimeout = _defaultCommandTimeout
};
stopwatch.Stop();
}

// This avoids identity insert issues, as these are db-generated.
foreach (DataColumn column in downloadFactsDataTable.Columns)
{
bulkCopy.ColumnMappings.Add(column.ColumnName, column.ColumnName);
}
_logger.LogError("Failed to insert download facts for {LogFile}.", logFileName);
scottbommarito marked this conversation as resolved.
Show resolved Hide resolved

await bulkCopy.WriteToServerAsync(downloadFactsDataTable);
ApplicationInsightsHelper.TrackException(exception, logFileName);

transaction.Commit();
throw;
}

stopwatch.Stop();
ApplicationInsightsHelper.TrackMetric("Insert facts duration (ms)", stopwatch.ElapsedMilliseconds, logFileName);
}
catch (Exception exception)
_logger.LogDebug(" DONE");
}

/// <summary>
/// Bulk-copies the rows in <paramref name="downloadFactsDataTable"/> into the facts table,
/// inside a single snapshot-isolation transaction. Intended to be wrapped in SQL retry logic
/// by the caller.
/// </summary>
/// <param name="downloadFactsDataTable">Rows to insert; its TableName is the destination table.</param>
/// <param name="logFileName">Log file being imported; not used in this method — presumably kept for logging symmetry with callers. TODO confirm</param>
private async Task RunInsertDownloadFactsQueryAsync(DataTable downloadFactsDataTable, string logFileName)
{
    using (var connection = await _openStatisticsSqlConnectionAsync())
    using (var transaction = connection.BeginTransaction(IsolationLevel.Snapshot))
    {
        var bulkCopy = new SqlBulkCopy(connection, SqlBulkCopyOptions.Default, transaction)
        {
            BatchSize = 25000,
            DestinationTableName = downloadFactsDataTable.TableName,
            BulkCopyTimeout = _defaultCommandTimeout
        };

        // This avoids identity insert issues, as these are db-generated.
        foreach (DataColumn column in downloadFactsDataTable.Columns)
        {
            bulkCopy.ColumnMappings.Add(column.ColumnName, column.ColumnName);
        }

        await bulkCopy.WriteToServerAsync(downloadFactsDataTable);

        // If WriteToServerAsync throws, Commit is skipped and disposing the
        // transaction rolls it back automatically — no explicit Rollback needed.
        transaction.Commit();
    }
}

_logger.LogDebug(" DONE");
}

public async Task<DataTable> CreateAsync(IReadOnlyCollection<PackageStatistics> sourceData, string logFileName)
Expand Down Expand Up @@ -342,35 +348,40 @@ public async Task StoreLogFileAggregatesAsync(LogFileAggregates logFileAggregate
{
_logger.LogDebug("Storing log file aggregates...");

using (var connection = await _openStatisticsSqlConnectionAsync())
try
{
try
{
var command = connection.CreateCommand();
command.CommandText = "[dbo].[StoreLogFileAggregates]";
command.CommandTimeout = _defaultCommandTimeout;
command.CommandType = CommandType.StoredProcedure;

var parameterValue = CreateDataTableForLogFileAggregatesPackageDownloadsByDate(logFileAggregates);
var parameter = command.Parameters.AddWithValue("packageDownloadsByDate", parameterValue);
parameter.SqlDbType = SqlDbType.Structured;
parameter.TypeName = "[dbo].[LogFileAggregatesPackageDownloadsByDateTableType]";

await command.ExecuteNonQueryAsync();
}
catch (Exception exception)
{
_logger.LogError("Failed to insert log file aggregates for {LogFile}.", logFileAggregates.LogFileName);
await SqlRetryUtility.RetrySql(() => RunStoreLogFileAggregatesQueryAsync(logFileAggregates));
}
catch (Exception exception)
{
_logger.LogError("Failed to insert log file aggregates for {LogFile}.", logFileAggregates.LogFileName);
scottbommarito marked this conversation as resolved.
Show resolved Hide resolved

ApplicationInsightsHelper.TrackException(exception, logFileAggregates.LogFileName);
ApplicationInsightsHelper.TrackException(exception, logFileAggregates.LogFileName);

throw;
}
throw;
}

_logger.LogDebug(" DONE");
}

/// <summary>
/// Calls the [dbo].[StoreLogFileAggregates] stored procedure with the aggregates for a single
/// log file, passed as a table-valued parameter.
/// </summary>
/// <param name="logFileAggregates">Aggregated package downloads by date for one log file.</param>
private async Task RunStoreLogFileAggregatesQueryAsync(LogFileAggregates logFileAggregates)
{
    using (var connection = await _openStatisticsSqlConnectionAsync())
    // FIX: SqlCommand is IDisposable; dispose it instead of leaking it.
    using (var command = connection.CreateCommand())
    {
        command.CommandText = "[dbo].[StoreLogFileAggregates]";
        command.CommandTimeout = _defaultCommandTimeout;
        command.CommandType = CommandType.StoredProcedure;

        // The aggregates travel as a table-valued parameter matching the SQL table type below.
        var parameterValue = CreateDataTableForLogFileAggregatesPackageDownloadsByDate(logFileAggregates);
        var parameter = command.Parameters.AddWithValue("packageDownloadsByDate", parameterValue);
        parameter.SqlDbType = SqlDbType.Structured;
        parameter.TypeName = "[dbo].[LogFileAggregatesPackageDownloadsByDateTableType]";

        await command.ExecuteNonQueryAsync();
    }
}

public async Task<IReadOnlyCollection<string>> GetAlreadyAggregatedLogFilesAsync()
{
_logger.LogDebug("Retrieving already processed log files...");
Expand Down