Skip to content

Commit

Permalink
Add new .NET Core async methods. Fixes #642
Browse files Browse the repository at this point in the history
  • Loading branch information
bgrainger committed Jun 28, 2019
1 parent 7cad67e commit 170523d
Show file tree
Hide file tree
Showing 5 changed files with 136 additions and 33 deletions.
6 changes: 6 additions & 0 deletions src/MySqlConnector/MySql.Data.MySqlClient/MySqlCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,12 @@ protected override void Dispose(bool disposing)
m_isDisposed = true;
}

public Task DisposeAsync()
{
Dispose();
return Utility.CompletedTask;
}

/// <summary>
/// Registers <see cref="Cancel"/> as a callback with <paramref name="token"/> if cancellation is supported.
/// </summary>
Expand Down
50 changes: 38 additions & 12 deletions src/MySqlConnector/MySql.Data.MySqlClient/MySqlConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -226,11 +226,13 @@ private void TakeSessionFrom(MySqlConnection other)
#endif

public override void Close() => CloseAsync(changeState: true, IOBehavior.Synchronous, CancellationToken.None).GetAwaiter().GetResult();
public Task CloseAsync(CancellationToken cancellationToken = default) => CloseAsync(changeState: true, IOBehavior.Asynchronous, cancellationToken);
public Task CloseAsync() => CloseAsync(changeState: true, SimpleAsyncIOBehavior, CancellationToken.None);
public Task CloseAsync(CancellationToken cancellationToken) => CloseAsync(changeState: true, SimpleAsyncIOBehavior, cancellationToken);
internal Task CloseAsync(IOBehavior ioBehavior, CancellationToken cancellationToken) => CloseAsync(changeState: true, ioBehavior, cancellationToken);

public override void ChangeDatabase(string databaseName) => ChangeDatabaseAsync(IOBehavior.Synchronous, databaseName, CancellationToken.None).GetAwaiter().GetResult();
public Task ChangeDatabaseAsync(string databaseName) => ChangeDatabaseAsync(IOBehavior.Asynchronous, databaseName, CancellationToken.None);
public Task ChangeDatabaseAsync(string databaseName, CancellationToken cancellationToken) => ChangeDatabaseAsync(IOBehavior.Asynchronous, databaseName, cancellationToken);
public Task ChangeDatabaseAsync(string databaseName) => ChangeDatabaseAsync(AsyncIOBehavior, databaseName, CancellationToken.None);
public Task ChangeDatabaseAsync(string databaseName, CancellationToken cancellationToken) => ChangeDatabaseAsync(AsyncIOBehavior, databaseName, cancellationToken);

private async Task ChangeDatabaseAsync(IOBehavior ioBehavior, string databaseName, CancellationToken cancellationToken)
{
Expand All @@ -239,7 +241,7 @@ private async Task ChangeDatabaseAsync(IOBehavior ioBehavior, string databaseNam
if (State != ConnectionState.Open)
throw new InvalidOperationException("Connection is not open.");

CloseDatabase();
await CloseDatabaseAsync(ioBehavior, cancellationToken).ConfigureAwait(false);

using (var initDatabasePayload = InitDatabasePayload.Create(databaseName))
await m_session.SendAsync(initDatabasePayload, ioBehavior, cancellationToken).ConfigureAwait(false);
Expand All @@ -251,8 +253,8 @@ private async Task ChangeDatabaseAsync(IOBehavior ioBehavior, string databaseNam
public new MySqlCommand CreateCommand() => (MySqlCommand) base.CreateCommand();

public bool Ping() => PingAsync(IOBehavior.Synchronous, CancellationToken.None).GetAwaiter().GetResult();
public Task<bool> PingAsync() => PingAsync((m_connectionSettings?.ForceSynchronous ?? false) ? IOBehavior.Synchronous : IOBehavior.Asynchronous, CancellationToken.None).AsTask();
public Task<bool> PingAsync(CancellationToken cancellationToken) => PingAsync((m_connectionSettings?.ForceSynchronous ?? false) ? IOBehavior.Synchronous : IOBehavior.Asynchronous, cancellationToken).AsTask();
public Task<bool> PingAsync() => PingAsync(SimpleAsyncIOBehavior, CancellationToken.None).AsTask();
public Task<bool> PingAsync(CancellationToken cancellationToken) => PingAsync(SimpleAsyncIOBehavior, cancellationToken).AsTask();

private async ValueTask<bool> PingAsync(IOBehavior ioBehavior, CancellationToken cancellationToken)
{
Expand Down Expand Up @@ -421,6 +423,18 @@ protected override void Dispose(bool disposing)
}
}

public async Task DisposeAsync()
{
try
{
await CloseAsync(changeState: true, SimpleAsyncIOBehavior, CancellationToken.None).ConfigureAwait(false);
}
finally
{
m_isDisposed = true;
}
}

internal ServerSession Session
{
get
Expand Down Expand Up @@ -538,6 +552,9 @@ internal async Task<CachedProcedure> GetCachedProcedure(IOBehavior ioBehavior, s
internal bool TreatTinyAsBoolean => m_connectionSettings.TreatTinyAsBoolean;
internal IOBehavior AsyncIOBehavior => GetConnectionSettings().ForceSynchronous ? IOBehavior.Synchronous : IOBehavior.Asynchronous;

// Defaults to IOBehavior.Synchronous if the connection hasn't been opened yet; only use if it's a no-op for a closed connection.
internal IOBehavior SimpleAsyncIOBehavior => (m_connectionSettings?.ForceSynchronous ?? false) ? IOBehavior.Synchronous : IOBehavior.Asynchronous;

internal MySqlSslMode SslMode => m_connectionSettings.SslMode;

internal bool HasActiveReader => m_activeReader != null;
Expand Down Expand Up @@ -682,7 +699,8 @@ private async Task DoCloseAsync(bool changeState, IOBehavior ioBehavior, Cancell
if (m_enlistedTransaction is object)
{
// make sure all DB work is done
m_activeReader?.Dispose();
if (m_activeReader is object)
await m_activeReader.DisposeAsync(ioBehavior, cancellationToken).ConfigureAwait(false);
m_activeReader = null;

// This connection is being closed, so create a new MySqlConnection that will own the ServerSession
Expand Down Expand Up @@ -721,7 +739,7 @@ private async Task DoCloseAsync(bool changeState, IOBehavior ioBehavior, Cancell
{
try
{
CloseDatabase();
await CloseDatabaseAsync(ioBehavior, cancellationToken).ConfigureAwait(false);
}
finally
{
Expand All @@ -740,13 +758,21 @@ private async Task DoCloseAsync(bool changeState, IOBehavior ioBehavior, Cancell
}
}

private void CloseDatabase()
private Task CloseDatabaseAsync(IOBehavior ioBehavior, CancellationToken cancellationToken)
{
m_cachedProcedures = null;
m_activeReader?.Dispose();
if (CurrentTransaction != null && m_session.IsConnected)
if (m_activeReader is null && CurrentTransaction is null)
return Utility.CompletedTask;
return DoCloseDatabaseAsync(ioBehavior, cancellationToken);
}

private async Task DoCloseDatabaseAsync(IOBehavior ioBehavior, CancellationToken cancellationToken)
{
if (m_activeReader is object)
await m_activeReader.DisposeAsync(ioBehavior, cancellationToken).ConfigureAwait(false);
if (CurrentTransaction is object && m_session.IsConnected)
{
CurrentTransaction.Dispose();
await CurrentTransaction.DisposeAsync(ioBehavior, cancellationToken).ConfigureAwait(false);
CurrentTransaction = null;
}
}
Expand Down
15 changes: 7 additions & 8 deletions src/MySqlConnector/MySql.Data.MySqlClient/MySqlDataReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -274,10 +274,7 @@ public override DataTable GetSchemaTable()
return m_schemaTable;
}

public override void Close()
{
DoClose();
}
public override void Close() => DisposeAsync(IOBehavior.Synchronous, CancellationToken.None).GetAwaiter().GetResult();
#endif

public ReadOnlyCollection<DbColumn> GetColumnSchema()
Expand Down Expand Up @@ -307,14 +304,16 @@ protected override void Dispose(bool disposing)
try
{
if (disposing)
DoClose();
DisposeAsync(IOBehavior.Synchronous, CancellationToken.None).GetAwaiter().GetResult();
}
finally
{
base.Dispose(disposing);
}
}

public Task DisposeAsync() => DisposeAsync(Connection?.AsyncIOBehavior ?? IOBehavior.Asynchronous, CancellationToken.None);

internal IMySqlCommand Command { get; private set; }
internal MySqlConnection Connection => Command?.Connection;
internal ServerSession Session => Command?.Connection.Session;
Expand Down Expand Up @@ -451,7 +450,7 @@ private MySqlDataReader(CommandListPosition commandListPosition, ICommandPayload
m_resultSet = new ResultSet(this);
}

private void DoClose()
internal async Task DisposeAsync(IOBehavior ioBehavior, CancellationToken cancellationToken)
{
if (!m_closed)
{
Expand All @@ -462,7 +461,7 @@ private void DoClose()
Command.Connection.Session.SetTimeout(Constants.InfiniteTimeout);
try
{
while (NextResultAsync(IOBehavior.Synchronous, CancellationToken.None).GetAwaiter().GetResult())
while (await NextResultAsync(ioBehavior, cancellationToken).ConfigureAwait(false))
{
}
}
Expand All @@ -481,7 +480,7 @@ private void DoClose()
if ((m_behavior & CommandBehavior.CloseConnection) != 0)
{
(Command as IDisposable)?.Dispose();
connection.Close();
await connection.CloseAsync(ioBehavior, cancellationToken).ConfigureAwait(false);
}
Command = null;
}
Expand Down
40 changes: 27 additions & 13 deletions src/MySqlConnector/MySql.Data.MySqlClient/MySqlTransaction.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using System.Threading;
using System.Threading.Tasks;
using MySqlConnector.Protocol.Serialization;
using MySqlConnector.Utilities;

namespace MySql.Data.MySqlClient
{
Expand Down Expand Up @@ -72,26 +73,39 @@ protected override void Dispose(bool disposing)
try
{
if (disposing)
{
m_isDisposed = true;
if (Connection?.CurrentTransaction == this)
{
if (Connection.State == ConnectionState.Open && Connection.Session.IsConnected)
{
using (var cmd = new MySqlCommand("rollback", Connection, this))
cmd.ExecuteNonQuery();
}
Connection.CurrentTransaction = null;
}
Connection = null;
}
DisposeAsync(IOBehavior.Synchronous, CancellationToken.None).GetAwaiter().GetResult();
}
finally
{
base.Dispose(disposing);
}
}

public Task DisposeAsync() => DisposeAsync(Connection?.AsyncIOBehavior ?? IOBehavior.Asynchronous, CancellationToken.None);

internal Task DisposeAsync(IOBehavior ioBehavior, CancellationToken cancellationToken)
{
m_isDisposed = true;
if (Connection?.CurrentTransaction == this)
return DoDisposeAsync(ioBehavior, cancellationToken);
Connection = null;
return Utility.CompletedTask;
}

private async Task DoDisposeAsync(IOBehavior ioBehavior, CancellationToken cancellationToken)
{
if (Connection?.CurrentTransaction == this)
{
if (Connection.State == ConnectionState.Open && Connection.Session.IsConnected)
{
using (var cmd = new MySqlCommand("rollback", Connection, this))
await cmd.ExecuteNonQueryAsync(ioBehavior, cancellationToken).ConfigureAwait(false);
}
Connection.CurrentTransaction = null;
}
Connection = null;
}

internal MySqlTransaction(MySqlConnection connection, IsolationLevel isolationLevel)
{
Connection = connection;
Expand Down
58 changes: 58 additions & 0 deletions tests/SideBySide/Transaction.cs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,25 @@ public async Task CommitAsync()
var results = await m_connection.QueryAsync<int>(@"select value from transactions_test order by value;").ConfigureAwait(false);
Assert.Equal(new[] { 1, 2 }, results);
}

[Fact]
public async Task CommitDisposeAsync()
{
await m_connection.ExecuteAsync("delete from transactions_test").ConfigureAwait(false);
MySqlTransaction trans = null;
try
{
trans = await m_connection.BeginTransactionAsync().ConfigureAwait(false);
await m_connection.ExecuteAsync("insert into transactions_test values(1), (2)", transaction: trans).ConfigureAwait(false);
await trans.CommitAsync().ConfigureAwait(false);
}
finally
{
await trans.DisposeAsync().ConfigureAwait(false);
}
var results = await m_connection.QueryAsync<int>(@"select value from transactions_test order by value;").ConfigureAwait(false);
Assert.Equal(new[] { 1, 2 }, results);
}
#endif

[Fact]
Expand Down Expand Up @@ -77,6 +96,25 @@ public async Task RollbackAsync()
var results = await m_connection.QueryAsync<int>(@"select value from transactions_test order by value;").ConfigureAwait(false);
Assert.Equal(new int[0], results);
}

[Fact]
public async Task RollbackDisposeAsync()
{
await m_connection.ExecuteAsync("delete from transactions_test").ConfigureAwait(false);
MySqlTransaction trans = null;
try
{
trans = await m_connection.BeginTransactionAsync().ConfigureAwait(false);
await m_connection.ExecuteAsync("insert into transactions_test values(1), (2)", transaction: trans).ConfigureAwait(false);
await trans.RollbackAsync().ConfigureAwait(false);
}
finally
{
await trans.DisposeAsync().ConfigureAwait(false);
}
var results = await m_connection.QueryAsync<int>(@"select value from transactions_test order by value;").ConfigureAwait(false);
Assert.Equal(new int[0], results);
}
#endif

[Fact]
Expand All @@ -91,6 +129,26 @@ public void NoCommit()
Assert.Equal(new int[0], results);
}

#if !BASELINE
[Fact]
public async Task DisposeAsync()
{
await m_connection.ExecuteAsync("delete from transactions_test").ConfigureAwait(false);
MySqlTransaction trans = null;
try
{
trans = await m_connection.BeginTransactionAsync().ConfigureAwait(false);
await m_connection.ExecuteAsync("insert into transactions_test values(1), (2)", transaction: trans).ConfigureAwait(false);
}
finally
{
await trans.DisposeAsync().ConfigureAwait(false);
}
var results = await m_connection.QueryAsync<int>(@"select value from transactions_test order by value;").ConfigureAwait(false);
Assert.Equal(new int[0], results);
}
#endif

readonly TransactionFixture m_database;
readonly MySqlConnection m_connection;
}
Expand Down

0 comments on commit 170523d

Please sign in to comment.