Skip to content

Commit

Permalink
Fixed Sqlite, removed transaction from dispatched, patched postgres o…
Browse files Browse the repository at this point in the history
…utbox (#2171)
  • Loading branch information
preardon authored Jul 1, 2022
1 parent 0d1f1bd commit 454599d
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 32 deletions.
2 changes: 0 additions & 2 deletions src/Paramore.Brighter.Outbox.MsSql/MsSqlOutbox.cs
Original file line number Diff line number Diff line change
Expand Up @@ -397,8 +397,6 @@ public async Task MarkDispatchedAsync(IEnumerable<Guid> ids, DateTime? dispatche
await connection.OpenAsync(cancellationToken).ConfigureAwait(ContinueOnCapturedContext);
using (var command = InitMarkDispatchedCommand(connection, ids, dispatchedAt))
{
if (_connectionProvider.HasOpenTransaction)
command.Transaction = _connectionProvider.GetTransaction();
await command.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(ContinueOnCapturedContext);
}
if (!_connectionProvider.IsSharedConnection)
Expand Down
2 changes: 0 additions & 2 deletions src/Paramore.Brighter.Outbox.MySql/MySqlOutboxSync.cs
Original file line number Diff line number Diff line change
Expand Up @@ -375,8 +375,6 @@ public async Task MarkDispatchedAsync(IEnumerable<Guid> ids, DateTime? dispatche
await connection.OpenAsync(cancellationToken).ConfigureAwait(ContinueOnCapturedContext);
using (var command = InitMarkDispatchedCommand(connection, ids, dispatchedAt))
{
if (_connectionProvider.HasOpenTransaction)
command.Transaction = _connectionProvider.GetTransaction();
await command.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(ContinueOnCapturedContext);
}
if (!_connectionProvider.IsSharedConnection)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ public void Add(Message message, int outBoxTimeout = -1, IAmABoxTransactionConne
{
using (var command = InitAddDbCommand(connection, parameters))
{
if (connectionProvider.HasOpenTransaction)
command.Transaction = connectionProvider.GetTransaction();
command.ExecuteNonQuery();
}
}
Expand Down
103 changes: 75 additions & 28 deletions src/Paramore.Brighter.Outbox.Sqlite/SqliteOutboxSync.cs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,8 @@ public void Add(Message message, int outBoxTimeout = -1, IAmABoxTransactionConne

try
{
if (connectionProvider.HasOpenTransaction) command.Transaction = connectionProvider.GetTransaction();
if (connectionProvider.HasOpenTransaction)
command.Transaction = connectionProvider.GetTransaction();
command.ExecuteNonQuery();
}
catch (SqliteException sqlException)
Expand All @@ -121,6 +122,11 @@ public void Add(Message message, int outBoxTimeout = -1, IAmABoxTransactionConne

throw;
}
finally
{
if(!connectionProvider.IsSharedConnection) connection.Dispose();
else if (!connectionProvider.HasOpenTransaction) connection.Close();
}
}
}

Expand All @@ -145,20 +151,27 @@ public void Add(Message message, int outBoxTimeout = -1, IAmABoxTransactionConne

try
{
if (connectionProvider.IsSharedConnection) command.Transaction = connectionProvider.GetTransaction();
if (connectionProvider.IsSharedConnection)
command.Transaction = connectionProvider.GetTransaction();
await command.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(ContinueOnCapturedContext);
}
catch (SqliteException sqlException)
{
if (IsExceptionUnqiueOrDuplicateIssue(sqlException))
{
s_logger.LogWarning("MsSqlOutbox: A duplicate Message with the MessageId {Id} was inserted into the Outbox, ignoring and continuing",
s_logger.LogWarning(
"MsSqlOutbox: A duplicate Message with the MessageId {Id} was inserted into the Outbox, ignoring and continuing",
message.Id);
return;
}

throw;
}
finally
{
if(!connectionProvider.IsSharedConnection) connection.Dispose();
else if (!connectionProvider.HasOpenTransaction) await connection.CloseAsync();
}
}
}

Expand Down Expand Up @@ -192,6 +205,10 @@ public IEnumerable<Message> DispatchedMessages(
{
messages.Add(MapAMessage(dbDataReader));
}
dbDataReader.Close();

if(!_connectionProvider.IsSharedConnection) connection.Dispose();
else if (!_connectionProvider.HasOpenTransaction) connection.Close();

return messages;
}
Expand Down Expand Up @@ -236,7 +253,7 @@ public Message Get(Guid messageId, int outBoxTimeout = -1)
public async Task<IEnumerable<Message>> GetAsync(IEnumerable<Guid> messageIds, int outBoxTimeout = -1,
CancellationToken cancellationToken = default(CancellationToken))
{
var connection = _connectionProvider.GetConnection();
var connection = await _connectionProvider.GetConnectionAsync();
using (var command = connection.CreateCommand())
{
var inClause = string.Join(",", messageIds.ToList().Select((s, i) => "'" + s + "'").ToArray());
Expand All @@ -248,16 +265,18 @@ public async Task<IEnumerable<Message>> GetAsync(IEnumerable<Guid> messageIds, i
await connection.OpenAsync(cancellationToken).ConfigureAwait(ContinueOnCapturedContext);

var messages = new List<Message>();
using (var dbDataReader = await command.ExecuteReaderAsync(cancellationToken)
.ConfigureAwait(ContinueOnCapturedContext))
{
var dbDataReader = await command.ExecuteReaderAsync(cancellationToken)
.ConfigureAwait(ContinueOnCapturedContext);

while (await dbDataReader.ReadAsync(cancellationToken).ConfigureAwait(ContinueOnCapturedContext))
{
messages.Add(MapAMessage(dbDataReader));
}
}

;
dbDataReader.Close();

if(!_connectionProvider.IsSharedConnection) connection.Dispose();
else if (!_connectionProvider.HasOpenTransaction) await connection.CloseAsync();

return messages;
}
}
Expand All @@ -278,16 +297,20 @@ public IList<Message> Get(int pageSize = 100, int pageNumber = 1, Dictionary<str

if (connection.State != ConnectionState.Open) connection.Open();

using (var dbDataReader = command.ExecuteReader())
{
var messages = new List<Message>();
while (dbDataReader.Read())
{
messages.Add(MapAMessage(dbDataReader));
}
var dbDataReader = command.ExecuteReader();

return messages;
var messages = new List<Message>();
while (dbDataReader.Read())
{
messages.Add(MapAMessage(dbDataReader));
}

dbDataReader.Close();

if (!_connectionProvider.IsSharedConnection) connection.Dispose();
else if (!_connectionProvider.HasOpenTransaction) connection.Close();

return messages;
}
}

Expand All @@ -310,17 +333,21 @@ public async Task<IList<Message>> GetAsync(
{
CreatePagedRead(command, pageSize, pageNumber);

if (connection.State != ConnectionState.Open) await connection.OpenAsync(cancellationToken).ConfigureAwait(ContinueOnCapturedContext);
if (connection.State != ConnectionState.Open)
await connection.OpenAsync(cancellationToken).ConfigureAwait(ContinueOnCapturedContext);

var messages = new List<Message>();
using (var dbDataReader = await command.ExecuteReaderAsync(cancellationToken)
.ConfigureAwait(ContinueOnCapturedContext))
var dbDataReader = await command.ExecuteReaderAsync(cancellationToken)
.ConfigureAwait(ContinueOnCapturedContext);

while (await dbDataReader.ReadAsync(cancellationToken).ConfigureAwait(ContinueOnCapturedContext))
{
while (await dbDataReader.ReadAsync(cancellationToken).ConfigureAwait(ContinueOnCapturedContext))
{
messages.Add(MapAMessage(dbDataReader));
}
messages.Add(MapAMessage(dbDataReader));
}
dbDataReader.Close();

if (!_connectionProvider.IsSharedConnection) connection.Dispose();
else if (!_connectionProvider.HasOpenTransaction) await connection.CloseAsync();

return messages;
}
Expand All @@ -343,13 +370,15 @@ public async Task MarkDispatchedAsync(IEnumerable<Guid> ids, DateTime? dispatche
Dictionary<string, object> args = null,
CancellationToken cancellationToken = default(CancellationToken))
{
var connection = _connectionProvider.GetConnection();
var connection = await _connectionProvider.GetConnectionAsync(cancellationToken);
if (connection.State != ConnectionState.Open)
await connection.OpenAsync(cancellationToken).ConfigureAwait(ContinueOnCapturedContext);
using (var command = InitMarkDispatchedCommand(connection, ids, dispatchedAt))
{
await command.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(ContinueOnCapturedContext);
}
if (!_connectionProvider.IsSharedConnection) connection.Dispose();
else if (!_connectionProvider.HasOpenTransaction) await connection.CloseAsync();
}

/// <summary>
Expand All @@ -365,6 +394,8 @@ public void MarkDispatched(Guid id, DateTime? dispatchedAt = null, Dictionary<st
{
command.ExecuteNonQuery();
}
if (!_connectionProvider.IsSharedConnection) connection.Dispose();
else if (!_connectionProvider.HasOpenTransaction) connection.Close();
}

/// <summary>
Expand Down Expand Up @@ -396,6 +427,10 @@ public IEnumerable<Message> OutstandingMessages(
{
messages.Add(MapAMessage(dbDataReader));
}
dbDataReader.Close();

if(!_connectionProvider.IsSharedConnection) connection.Dispose();
else if (!_connectionProvider.HasOpenTransaction) connection.Close();

return messages;
}
Expand All @@ -418,7 +453,7 @@ public async Task<IEnumerable<Message>> OutstandingMessagesAsync(
Dictionary<string, object> args = null,
CancellationToken cancellationToken = default)
{
var connection = _connectionProvider.GetConnection();
var connection = await _connectionProvider.GetConnectionAsync(cancellationToken);
using (var command = connection.CreateCommand())
{
CreatePagedOutstandingCommand(command, millSecondsSinceSent, pageSize, pageNumber);
Expand All @@ -432,6 +467,10 @@ public async Task<IEnumerable<Message>> OutstandingMessagesAsync(
{
messages.Add(MapAMessage(dbDataReader));
}
dbDataReader.Close();

if(!_connectionProvider.IsSharedConnection) connection.Dispose();
else if (!_connectionProvider.HasOpenTransaction) await connection.CloseAsync();
return messages;
}
}
Expand Down Expand Up @@ -500,6 +539,10 @@ private T ExecuteCommand<T>(Func<SqliteCommand, T> execute, string sql, int outb
if (outboxTimeout != -1) command.CommandTimeout = outboxTimeout;

var item = execute(command);

if(!_connectionProvider.IsSharedConnection) connection.Dispose();
else if (!_connectionProvider.HasOpenTransaction) connection.Close();

return item;
}
}
Expand All @@ -511,7 +554,7 @@ private async Task<T> ExecuteCommandAsync<T>(
CancellationToken cancellationToken = default(CancellationToken),
params SqliteParameter[] parameters)
{
var connection = _connectionProvider.GetConnection();
var connection = await _connectionProvider.GetConnectionAsync(cancellationToken);

using (var command = connection.CreateCommand())
{
Expand All @@ -522,6 +565,10 @@ private async Task<T> ExecuteCommandAsync<T>(
AddParamtersParamArrayToCollection(parameters, command);

var item = await execute(command).ConfigureAwait(ContinueOnCapturedContext);

if(!_connectionProvider.IsSharedConnection) connection.Dispose();
else if (!_connectionProvider.HasOpenTransaction) await connection.CloseAsync();

return item;
}
}
Expand Down

0 comments on commit 454599d

Please sign in to comment.