Skip to content

Commit

Permalink
Merge pull request #47 from agoda-com/executeasync-with-cancellation-…
Browse files Browse the repository at this point in the history
…token

Fixed CancellationTokenSource Issue
  • Loading branch information
szaboopeeter authored Mar 5, 2024
2 parents 6f1c823 + 4e9dc6f commit d0c6daa
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 60 deletions.
22 changes: 0 additions & 22 deletions Agoda.Frameworks.DB.Tests/DbRepositoryTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -666,28 +666,6 @@ public void QueryMultipleAsync_Retry_Failure()
Assert.IsNotNull(_onQueryCompleteEvents[0].Error);
Assert.IsNotNull(_onQueryCompleteEvents[1].Error);
}

[Test]
public async Task ExecuteReaderAsync_CancellationToken_Cancelled()
{
var cancellationToken = new CancellationTokenSource(TimeSpan.FromDays(1));
cancellationToken.Cancel();
var maxAttemptCount = 2;
Assert.ThrowsAsync<TaskCanceledException>(async () => await _db.ExecuteReaderAsync<string>("mobile_ro", "db.v1.sp_foo", 1,
maxAttemptCount,
cancellationToken.Token,
new IDbDataParameter[]
{
new SqlParameter("@param1", "value1"),
new SqlParameter("@param2", "value2")
}, reader => { return Task.FromResult("");})
);

_dbResources.Verify(x => x.ChooseDb("mobile_ro").UpdateWeight(It.IsAny<string>(), false), Times.Exactly(maxAttemptCount));
}


protected class FakeStoredProc : IStoredProc<string, string>
{
public string DbName => "mobile_ro";
Expand Down
79 changes: 42 additions & 37 deletions Agoda.Frameworks.DB/DbRepository.cs
Original file line number Diff line number Diff line change
Expand Up @@ -303,61 +303,66 @@ public Task<T> ExecuteReaderAsync<T>(
string storedProc,
int timeoutSecs,
int maxAttemptCount,
CancellationToken token,
int taskCancellationTimeOutInMilliSecs,
IDbDataParameter[] parameters,
Func<SqlDataReader, Task<T>> callback)
{
return _dbResources.ChooseDb(database).ExecuteAsync(async (connectionStr, _) =>
{
var stopwatch = Stopwatch.StartNew();
Exception error = null;
try
using (var cancellationTokenSource = new CancellationTokenSource())
{
using (var connection = _generateConnection(connectionStr))
cancellationTokenSource.CancelAfter(taskCancellationTimeOutInMilliSecs);
try
{
if (connection is SqlConnection sqlConn)
using (var connection = _generateConnection(connectionStr))
{
await sqlConn.OpenAsync();
}
else
{
connection.Open();
}
SqlCommand sqlCommand = null;
try
{
sqlCommand = new SqlCommand(storedProc, connection as SqlConnection)
if (connection is SqlConnection sqlConn)
{
CommandType = CommandType.StoredProcedure,
CommandTimeout = timeoutSecs
};
sqlCommand.Parameters.AddRange(parameters);
using (var reader = await sqlCommand.ExecuteReaderAsync(token))
await sqlConn.OpenAsync(cancellationTokenSource.Token);
}
else
{
return await callback(reader);
connection.Open();
}
}
finally
{
if (sqlCommand != null)
SqlCommand sqlCommand = null;
try
{
sqlCommand.Parameters.Clear();
sqlCommand.Dispose();
sqlCommand = new SqlCommand(storedProc, connection as SqlConnection)
{
CommandType = CommandType.StoredProcedure,
CommandTimeout = timeoutSecs
};
sqlCommand.Parameters.AddRange(parameters);
using (var reader = await sqlCommand.ExecuteReaderAsync(cancellationTokenSource.Token))
{
return await callback(reader);
}
}
finally
{
if (sqlCommand != null)
{
sqlCommand.Parameters.Clear();
sqlCommand.Dispose();
}
}
}
}
catch (Exception e)
{
error = e;
throw;
}
finally
{
stopwatch.Stop();
RaiseOnExecuteReaderComplete(
database, storedProc, stopwatch.ElapsedMilliseconds, error);
}
}
catch (Exception e)
{
error = e;
throw;
}
finally
{
stopwatch.Stop();
RaiseOnExecuteReaderComplete(
database, storedProc, stopwatch.ElapsedMilliseconds, error);
}
}, ShouldRetry(maxAttemptCount), RaiseOnError);
}

Expand Down
2 changes: 1 addition & 1 deletion Agoda.Frameworks.DB/IDbRepository.cs
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ Task<T> ExecuteReaderAsync<T>(
string storedProc,
int timeoutSecs,
int maxAttemptCount,
CancellationToken token,
int taskCancellationTimeOutInMilliSecs,
IDbDataParameter[] parameters,
Func<SqlDataReader, Task<T>> callback);

Expand Down

0 comments on commit d0c6daa

Please sign in to comment.