From e9658c7c24144606bc1f10e86d6dbd59c133feed Mon Sep 17 00:00:00 2001 From: Sam Rumley Date: Thu, 11 Nov 2021 16:43:29 +0000 Subject: [PATCH 1/3] - Fix bug in UsePostgreSqlOutbox ServiceCollection extension regarding the DI registration of the IAmAnOutboxAsync interface. - Fix bugs in PostgreSql syntax for PostgreSqlOutboxSync.CreatePagedDispatchedCommand, PostgreSqlOutboxSync.CreatePagedOutstandingCommand and PostgreSqlOutboxSync.InitMarkDispatchedCommand. --- .../PostgreSqlOutboxSync.cs | 52 +++++++++---------- .../ServiceCollectionExtensions.cs | 7 ++- 2 files changed, 29 insertions(+), 30 deletions(-) diff --git a/src/Paramore.Brighter.Outbox.PostgreSql/PostgreSqlOutboxSync.cs b/src/Paramore.Brighter.Outbox.PostgreSql/PostgreSqlOutboxSync.cs index 67f98c057d..b168ed40dc 100644 --- a/src/Paramore.Brighter.Outbox.PostgreSql/PostgreSqlOutboxSync.cs +++ b/src/Paramore.Brighter.Outbox.PostgreSql/PostgreSqlOutboxSync.cs @@ -25,12 +25,12 @@ THE SOFTWARE. */ using System; using System.Collections.Generic; -using Npgsql; using System.Data; -using NpgsqlTypes; -using Paramore.Brighter.Logging; using System.Text.Json; using Microsoft.Extensions.Logging; +using Npgsql; +using NpgsqlTypes; +using Paramore.Brighter.Logging; namespace Paramore.Brighter.Outbox.PostgreSql { @@ -97,10 +97,10 @@ public void Add(Message message, int outBoxTimeout = -1, IAmABoxTransactionConne /// Additional parameters required for search, if any /// A list of dispatched messages public IEnumerable DispatchedMessages( - double millisecondsDispatchedSince, - int pageSize = 100, + double millisecondsDispatchedSince, + int pageSize = 100, int pageNumber = 1, - int outboxTimeout = -1, + int outboxTimeout = -1, Dictionary args = null) { using (var connection = GetConnection()) @@ -193,8 +193,8 @@ public void MarkDispatched(Guid id, DateTime? dispatchedAt = null, DictionaryAdditional parameters required for search, if any /// A list of messages that are outstanding for dispatch public IEnumerable OutstandingMessages( - double millSecondsSinceSent, - int pageSize = 100, + double millSecondsSinceSent, + int pageSize = 100, int pageNumber = 1, Dictionary args = null) { @@ -229,7 +229,7 @@ private void CreatePagedDispatchedCommand(NpgsqlCommand command, double millisec int pageSize, int pageNumber) { var pagingSqlFormat = - "SELECT * FROM (SELECT ROW_NUMBER() OVER(ORDER BY Timestamp DESC) AS NUMBER, * FROM {0}) AS TBL WHERE DISPATCHED IS NOT NULL AND DISPATCHED < DATEADD(millisecond, @OutStandingSince, getdate()) AND NUMBER BETWEEN ((@PageNumber-1)*@PageSize+1) AND (@PageNumber*@PageSize) ORDER BY Timestamp DESC"; + "SELECT * FROM (SELECT ROW_NUMBER() OVER(ORDER BY Timestamp DESC) AS NUMBER, * FROM {0}) AS TBL WHERE DISPATCHED IS NOT NULL AND DISPATCHED < (CURRENT_TIMESTAMP + (@OutStandingSince || ' millisecond')::INTERVAL) AND NUMBER BETWEEN ((@PageNumber-1)*@PageSize+1) AND (@PageNumber*@PageSize) ORDER BY Timestamp DESC"; var parameters = new[] { CreateNpgsqlParameter("PageNumber", pageNumber), CreateNpgsqlParameter("PageSize", pageSize), @@ -262,7 +262,7 @@ private void CreatePagedOutstandingCommand(NpgsqlCommand command, double milliSe int pageNumber) { var pagingSqlFormat = - "SELECT * FROM (SELECT ROW_NUMBER() OVER(ORDER BY Timestamp ASC) AS NUMBER, * FROM {0} WHERE DISPATCHED IS NULL) AS TBL WHERE TIMESTAMP < DATEADD(millisecond, @OutStandingSince, getdate()) AND NUMBER BETWEEN ((@PageNumber-1)*@PageSize+1) AND (@PageNumber*@PageSize) ORDER BY Timestamp ASC"; + "SELECT * FROM (SELECT ROW_NUMBER() OVER(ORDER BY Timestamp ASC) AS NUMBER, * FROM {0} WHERE DISPATCHED IS NULL) AS TBL WHERE TIMESTAMP < (CURRENT_TIMESTAMP + (@OutStandingSince || ' millisecond')::INTERVAL) AND NUMBER BETWEEN ((@PageNumber-1)*@PageSize+1) AND (@PageNumber*@PageSize) ORDER BY Timestamp ASC"; var parameters = new[] { CreateNpgsqlParameter("PageNumber", pageNumber), CreateNpgsqlParameter("PageSize", pageSize), @@ -319,7 +319,7 @@ private NpgsqlParameter[] InitAddDbParameters(Message message) CreateNpgsqlParameter("CorrelationId", message.Header.CorrelationId), CreateNpgsqlParameter("ReplyTo", message.Header.ReplyTo), CreateNpgsqlParameter("ContentType", message.Header.ContentType), - CreateNpgsqlParameter("HeaderBag", bagjson), + CreateNpgsqlParameter("HeaderBag", bagjson), CreateNpgsqlParameter("Body", message.Body.Value) }; return parameters; @@ -330,7 +330,7 @@ private NpgsqlCommand InitMarkDispatchedCommand(NpgsqlConnection connection, Gui { var command = connection.CreateCommand(); var sql = - $"UPDATE {_configuration.OutboxTableName} SET Dispatched = @DispatchedAt WHERE MessageId = @mMessageId"; + $"UPDATE {_configuration.OutboxTableName} SET Dispatched = @DispatchedAt WHERE MessageId = @MessageId"; command.CommandText = sql; command.Parameters.Add(CreateNpgsqlParameter("MessageId", messageId)); command.Parameters.Add(CreateNpgsqlParameter("DispatchedAt", dispatchedAt)); @@ -358,13 +358,13 @@ private Message MapAMessage(IDataReader dr) var correlationId = GetCorrelationId(dr); var replyTo = GetReplyTo(dr); var contentType = GetContentType(dr); - + var header = new MessageHeader( - messageId:id, - topic:topic, - messageType:messageType, - timeStamp:timeStamp, - handledCount:0, + messageId:id, + topic:topic, + messageType:messageType, + timeStamp:timeStamp, + handledCount:0, delayedMilliseconds: 0, correlationId: correlationId, replyTo: replyTo, @@ -402,8 +402,8 @@ private static Guid GetMessageId(IDataReader dr) private string GetContentType(IDataReader dr) { var ordinal = dr.GetOrdinal("ContentType"); - if (dr.IsDBNull(ordinal)) return null; - + if (dr.IsDBNull(ordinal)) return null; + var replyTo = dr.GetString(ordinal); return replyTo; } @@ -411,8 +411,8 @@ private string GetContentType(IDataReader dr) private string GetReplyTo(IDataReader dr) { var ordinal = dr.GetOrdinal("ReplyTo"); - if (dr.IsDBNull(ordinal)) return null; - + if (dr.IsDBNull(ordinal)) return null; + var replyTo = dr.GetString(ordinal); return replyTo; } @@ -428,8 +428,8 @@ private static Dictionary GetContextBag(IDataReader dr) private Guid? GetCorrelationId(IDataReader dr) { var ordinal = dr.GetOrdinal("CorrelationId"); - if (dr.IsDBNull(ordinal)) return null; - + if (dr.IsDBNull(ordinal)) return null; + var correlationId = dr.GetGuid(ordinal); return correlationId; } @@ -444,7 +444,7 @@ private static DateTime GetTimeStamp(IDataReader dr) } } - - + + } diff --git a/src/Paramore.Brighter.Outbox.PostgreSql/ServiceCollectionExtensions.cs b/src/Paramore.Brighter.Outbox.PostgreSql/ServiceCollectionExtensions.cs index 0fd4de80f0..f8ff53093e 100644 --- a/src/Paramore.Brighter.Outbox.PostgreSql/ServiceCollectionExtensions.cs +++ b/src/Paramore.Brighter.Outbox.PostgreSql/ServiceCollectionExtensions.cs @@ -11,13 +11,12 @@ public static IBrighterBuilder UsePostgreSqlOutbox( { brighterBuilder.Services.AddSingleton(configuration); - brighterBuilder.Services.Add(new ServiceDescriptor(typeof(IAmAnOutboxSync), BuildDynamoDbOutbox, serviceLifetime)); - brighterBuilder.Services.Add(new ServiceDescriptor(typeof(IAmAnOutboxAsync), BuildDynamoDbOutbox, serviceLifetime)); - + brighterBuilder.Services.Add(new ServiceDescriptor(typeof(IAmAnOutboxSync), BuildPostgreSqlOutboxSync, serviceLifetime)); + return brighterBuilder; } - private static PostgreSqlOutboxSync BuildDynamoDbOutbox(IServiceProvider provider) + private static PostgreSqlOutboxSync BuildPostgreSqlOutboxSync(IServiceProvider provider) { var config = provider.GetService(); From 744aaf6056374e99a9efe4981031c546bc491521 Mon Sep 17 00:00:00 2001 From: Sam Rumley Date: Mon, 15 Nov 2021 23:12:33 +0000 Subject: [PATCH 2/3] Fixed PostgreSqlInbox NpgsqlParameter.ParameterName and inline parameter name combinations which have do not have the exact same casing. Npgsql 6.0 no longer matches in a case-insensitive manner. https://github.com/npgsql/npgsql/issues/4027 --- .../PostgresSqlInbox.cs | 28 +++++++++---------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/src/Paramore.Brighter.Inbox.Postgres/PostgresSqlInbox.cs b/src/Paramore.Brighter.Inbox.Postgres/PostgresSqlInbox.cs index 065553b881..691d0afa11 100644 --- a/src/Paramore.Brighter.Inbox.Postgres/PostgresSqlInbox.cs +++ b/src/Paramore.Brighter.Inbox.Postgres/PostgresSqlInbox.cs @@ -49,19 +49,19 @@ public class PostgresSqlInbox : IAmAnInbox, IAmAnInboxAsync /// thread specific storage /// such as HTTPContext /// - public bool ContinueOnCapturedContext { get; set; } - + public bool ContinueOnCapturedContext { get; set; } + public PostgresSqlInbox(PostgresSqlInboxConfiguration postgresSqlInboxConfiguration) { _configuration = postgresSqlInboxConfiguration; ContinueOnCapturedContext = false; } - - + + public void Add(T command, string contextKey, int timeoutInMilliseconds = -1) where T : class, IRequest { var parameters = InitAddDbParameters(command, contextKey); - + using (var connection = GetConnection()) { connection.Open(); @@ -79,7 +79,7 @@ public void Add(T command, string contextKey, int timeoutInMilliseconds = -1) command.Id); return; } - + throw; } } @@ -87,7 +87,7 @@ public void Add(T command, string contextKey, int timeoutInMilliseconds = -1) public T Get(Guid id, string contextKey, int timeoutInMilliseconds = -1) where T : class, IRequest { - var sql = $"select * from {_configuration.InBoxTableName} where CommandId = @commandId AND ContextKey = @contextKey"; + var sql = $"SELECT * FROM {_configuration.InBoxTableName} WHERE CommandId = @CommandId AND ContextKey = @ContextKey"; var parameters = new[] { CreateNpgsqlParameter("CommandId", id), @@ -99,7 +99,7 @@ public T Get(Guid id, string contextKey, int timeoutInMilliseconds = -1) wher public bool Exists(Guid id, string contextKey, int timeoutInMilliseconds = -1) where T : class, IRequest { - var sql = $"SELECT DISTINCT CommandId FROM {_configuration.InBoxTableName} WHERE CommandId = @commandId AND ContextKey = @contextKey FETCH FIRST 1 ROWS ONLY"; + var sql = $"SELECT DISTINCT CommandId FROM {_configuration.InBoxTableName} WHERE CommandId = @CommandId AND ContextKey = @ContextKey FETCH FIRST 1 ROWS ONLY"; var parameters = new[] { CreateNpgsqlParameter("CommandId", id), @@ -139,7 +139,7 @@ public async Task AddAsync(T command, string contextKey, int timeoutInMillise public async Task GetAsync(Guid id, string contextKey, int timeoutInMilliseconds = -1, CancellationToken cancellationToken = default(CancellationToken)) where T : class, IRequest { - var sql = $"select * from {_configuration.InBoxTableName} where CommandId = @commandId AND ContextKey = @contextKey"; + var sql = $"SELECT * FROM {_configuration.InBoxTableName} WHERE CommandId = @CommandId AND ContextKey = @ContextKey"; var parameters = new[] { @@ -158,13 +158,13 @@ public async Task AddAsync(T command, string contextKey, int timeoutInMillise public async Task ExistsAsync(Guid id, string contextKey, int timeoutInMilliseconds = -1, CancellationToken cancellationToken = default(CancellationToken)) where T : class, IRequest { - var sql = $"SELECT DISTINCT CommandId FROM {_configuration.InBoxTableName} WHERE CommandId = @commandId AND ContextKey = @contextKey FETCH FIRST 1 ROWS ONLY"; + var sql = $"SELECT DISTINCT CommandId FROM {_configuration.InBoxTableName} WHERE CommandId = @CommandId AND ContextKey = @ContextKey FETCH FIRST 1 ROWS ONLY"; var parameters = new[] { CreateNpgsqlParameter("CommandId", id), CreateNpgsqlParameter("ContextKey", contextKey) }; - + return await ExecuteCommandAsync( async command => { @@ -177,12 +177,12 @@ public async Task AddAsync(T command, string contextKey, int timeoutInMillise parameters) .ConfigureAwait(ContinueOnCapturedContext); } - + private NpgsqlConnection GetConnection() { return new NpgsqlConnection(_configuration.ConnectionString); } - + private NpgsqlParameter CreateNpgsqlParameter(string parametername, object value) { if (value != null) @@ -251,7 +251,7 @@ private async Task ExecuteCommandAsync( return item; } } - + private TResult ReadCommand(IDataReader dr, Guid commandId) where TResult : class, IRequest { if (dr.Read()) From d70dab9dd10f11d2abb220a5bf8781ce4fa132b3 Mon Sep 17 00:00:00 2001 From: Sam Rumley Date: Tue, 16 Nov 2021 01:09:16 +0000 Subject: [PATCH 3/3] Fixed PostgreSqlOutboxSync NpgsqlParameter.ParameterName and inline parameter name combinations which have do not have the exact same casing. Npgsql 6.0 no longer matches in a case-insensitive manner. https://github.com/npgsql/npgsql/issues/4027 --- .../PostgreSqlOutboxSync.cs | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/src/Paramore.Brighter.Outbox.PostgreSql/PostgreSqlOutboxSync.cs b/src/Paramore.Brighter.Outbox.PostgreSql/PostgreSqlOutboxSync.cs index b168ed40dc..8bf59bfaa7 100644 --- a/src/Paramore.Brighter.Outbox.PostgreSql/PostgreSqlOutboxSync.cs +++ b/src/Paramore.Brighter.Outbox.PostgreSql/PostgreSqlOutboxSync.cs @@ -229,10 +229,11 @@ private void CreatePagedDispatchedCommand(NpgsqlCommand command, double millisec int pageSize, int pageNumber) { var pagingSqlFormat = - "SELECT * FROM (SELECT ROW_NUMBER() OVER(ORDER BY Timestamp DESC) AS NUMBER, * FROM {0}) AS TBL WHERE DISPATCHED IS NOT NULL AND DISPATCHED < (CURRENT_TIMESTAMP + (@OutStandingSince || ' millisecond')::INTERVAL) AND NUMBER BETWEEN ((@PageNumber-1)*@PageSize+1) AND (@PageNumber*@PageSize) ORDER BY Timestamp DESC"; + "SELECT * FROM (SELECT ROW_NUMBER() OVER(ORDER BY Timestamp DESC) AS NUMBER, * FROM {0}) AS TBL WHERE DISPATCHED IS NOT NULL AND DISPATCHED < (CURRENT_TIMESTAMP + (@OutstandingSince || ' millisecond')::INTERVAL) AND NUMBER BETWEEN ((@PageNumber-1)*@PageSize+1) AND (@PageNumber*@PageSize) ORDER BY Timestamp DESC"; var parameters = new[] { - CreateNpgsqlParameter("PageNumber", pageNumber), CreateNpgsqlParameter("PageSize", pageSize), + CreateNpgsqlParameter("PageNumber", pageNumber), + CreateNpgsqlParameter("PageSize", pageSize), CreateNpgsqlParameter("OutstandingSince", -1 * millisecondsDispatchedSince) }; @@ -249,7 +250,8 @@ private void CreatePagedReadCommand(NpgsqlCommand command, PostgreSqlOutboxConfi "SELECT * FROM (SELECT ROW_NUMBER() OVER(ORDER BY Timestamp DESC) AS NUMBER, * FROM {0}) AS TBL WHERE NUMBER BETWEEN ((@PageNumber-1)*@PageSize+1) AND (@PageNumber*@PageSize) ORDER BY Timestamp DESC"; var parameters = new[] { - CreateNpgsqlParameter("PageNumber", pageNumber), CreateNpgsqlParameter("PageSize", pageSize) + CreateNpgsqlParameter("PageNumber", pageNumber), + CreateNpgsqlParameter("PageSize", pageSize) }; var sql = string.Format(pagingSqlFormat, _configuration.OutboxTableName); @@ -262,10 +264,11 @@ private void CreatePagedOutstandingCommand(NpgsqlCommand command, double milliSe int pageNumber) { var pagingSqlFormat = - "SELECT * FROM (SELECT ROW_NUMBER() OVER(ORDER BY Timestamp ASC) AS NUMBER, * FROM {0} WHERE DISPATCHED IS NULL) AS TBL WHERE TIMESTAMP < (CURRENT_TIMESTAMP + (@OutStandingSince || ' millisecond')::INTERVAL) AND NUMBER BETWEEN ((@PageNumber-1)*@PageSize+1) AND (@PageNumber*@PageSize) ORDER BY Timestamp ASC"; + "SELECT * FROM (SELECT ROW_NUMBER() OVER(ORDER BY Timestamp ASC) AS NUMBER, * FROM {0} WHERE DISPATCHED IS NULL) AS TBL WHERE TIMESTAMP < (CURRENT_TIMESTAMP + (@OutstandingSince || ' millisecond')::INTERVAL) AND NUMBER BETWEEN ((@PageNumber-1)*@PageSize+1) AND (@PageNumber*@PageSize) ORDER BY Timestamp ASC"; var parameters = new[] { - CreateNpgsqlParameter("PageNumber", pageNumber), CreateNpgsqlParameter("PageSize", pageSize), + CreateNpgsqlParameter("PageNumber", pageNumber), + CreateNpgsqlParameter("PageSize", pageSize), CreateNpgsqlParameter("OutstandingSince", milliSecondsSinceAdded) };