Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

System.InvalidCastException thrown when using UsePostgreSqlOutbox ServiceCollection Extension #1837

52 changes: 26 additions & 26 deletions src/Paramore.Brighter.Outbox.PostgreSql/PostgreSqlOutboxSync.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@ THE SOFTWARE. */

using System;
using System.Collections.Generic;
using Npgsql;
using System.Data;
using NpgsqlTypes;
using Paramore.Brighter.Logging;
using System.Text.Json;
using Microsoft.Extensions.Logging;
using Npgsql;
using NpgsqlTypes;
using Paramore.Brighter.Logging;

namespace Paramore.Brighter.Outbox.PostgreSql
{
Expand Down Expand Up @@ -97,10 +97,10 @@ public void Add(Message message, int outBoxTimeout = -1, IAmABoxTransactionConne
/// <param name="args">Additional parameters required for search, if any</param>
/// <returns>A list of dispatched messages</returns>
public IEnumerable<Message> DispatchedMessages(
double millisecondsDispatchedSince,
int pageSize = 100,
double millisecondsDispatchedSince,
int pageSize = 100,
int pageNumber = 1,
int outboxTimeout = -1,
int outboxTimeout = -1,
Dictionary<string, object> args = null)
{
using (var connection = GetConnection())
Expand Down Expand Up @@ -193,8 +193,8 @@ public void MarkDispatched(Guid id, DateTime? dispatchedAt = null, Dictionary<st
/// <param name="args">Additional parameters required for search, if any</param>
/// <returns>A list of messages that are outstanding for dispatch</returns>
public IEnumerable<Message> OutstandingMessages(
double millSecondsSinceSent,
int pageSize = 100,
double millSecondsSinceSent,
int pageSize = 100,
int pageNumber = 1,
Dictionary<string, object> args = null)
{
Expand Down Expand Up @@ -229,7 +229,7 @@ private void CreatePagedDispatchedCommand(NpgsqlCommand command, double millisec
int pageSize, int pageNumber)
{
var pagingSqlFormat =
"SELECT * FROM (SELECT ROW_NUMBER() OVER(ORDER BY Timestamp DESC) AS NUMBER, * FROM {0}) AS TBL WHERE DISPATCHED IS NOT NULL AND DISPATCHED < DATEADD(millisecond, @OutStandingSince, getdate()) AND NUMBER BETWEEN ((@PageNumber-1)*@PageSize+1) AND (@PageNumber*@PageSize) ORDER BY Timestamp DESC";
"SELECT * FROM (SELECT ROW_NUMBER() OVER(ORDER BY Timestamp DESC) AS NUMBER, * FROM {0}) AS TBL WHERE DISPATCHED IS NOT NULL AND DISPATCHED < (CURRENT_TIMESTAMP + (@OutStandingSince || ' millisecond')::INTERVAL) AND NUMBER BETWEEN ((@PageNumber-1)*@PageSize+1) AND (@PageNumber*@PageSize) ORDER BY Timestamp DESC";
var parameters = new[]
{
CreateNpgsqlParameter("PageNumber", pageNumber), CreateNpgsqlParameter("PageSize", pageSize),
Expand Down Expand Up @@ -262,7 +262,7 @@ private void CreatePagedOutstandingCommand(NpgsqlCommand command, double milliSe
int pageNumber)
{
var pagingSqlFormat =
"SELECT * FROM (SELECT ROW_NUMBER() OVER(ORDER BY Timestamp ASC) AS NUMBER, * FROM {0} WHERE DISPATCHED IS NULL) AS TBL WHERE TIMESTAMP < DATEADD(millisecond, @OutStandingSince, getdate()) AND NUMBER BETWEEN ((@PageNumber-1)*@PageSize+1) AND (@PageNumber*@PageSize) ORDER BY Timestamp ASC";
"SELECT * FROM (SELECT ROW_NUMBER() OVER(ORDER BY Timestamp ASC) AS NUMBER, * FROM {0} WHERE DISPATCHED IS NULL) AS TBL WHERE TIMESTAMP < (CURRENT_TIMESTAMP + (@OutStandingSince || ' millisecond')::INTERVAL) AND NUMBER BETWEEN ((@PageNumber-1)*@PageSize+1) AND (@PageNumber*@PageSize) ORDER BY Timestamp ASC";
var parameters = new[]
{
CreateNpgsqlParameter("PageNumber", pageNumber), CreateNpgsqlParameter("PageSize", pageSize),
Expand Down Expand Up @@ -319,7 +319,7 @@ private NpgsqlParameter[] InitAddDbParameters(Message message)
CreateNpgsqlParameter("CorrelationId", message.Header.CorrelationId),
CreateNpgsqlParameter("ReplyTo", message.Header.ReplyTo),
CreateNpgsqlParameter("ContentType", message.Header.ContentType),
CreateNpgsqlParameter("HeaderBag", bagjson),
CreateNpgsqlParameter("HeaderBag", bagjson),
CreateNpgsqlParameter("Body", message.Body.Value)
};
return parameters;
Expand All @@ -330,7 +330,7 @@ private NpgsqlCommand InitMarkDispatchedCommand(NpgsqlConnection connection, Gui
{
var command = connection.CreateCommand();
var sql =
$"UPDATE {_configuration.OutboxTableName} SET Dispatched = @DispatchedAt WHERE MessageId = @mMessageId";
$"UPDATE {_configuration.OutboxTableName} SET Dispatched = @DispatchedAt WHERE MessageId = @MessageId";
command.CommandText = sql;
command.Parameters.Add(CreateNpgsqlParameter("MessageId", messageId));
command.Parameters.Add(CreateNpgsqlParameter("DispatchedAt", dispatchedAt));
Expand Down Expand Up @@ -358,13 +358,13 @@ private Message MapAMessage(IDataReader dr)
var correlationId = GetCorrelationId(dr);
var replyTo = GetReplyTo(dr);
var contentType = GetContentType(dr);

var header = new MessageHeader(
messageId:id,
topic:topic,
messageType:messageType,
timeStamp:timeStamp,
handledCount:0,
messageId:id,
topic:topic,
messageType:messageType,
timeStamp:timeStamp,
handledCount:0,
delayedMilliseconds: 0,
correlationId: correlationId,
replyTo: replyTo,
Expand Down Expand Up @@ -402,17 +402,17 @@ private static Guid GetMessageId(IDataReader dr)
private string GetContentType(IDataReader dr)
{
var ordinal = dr.GetOrdinal("ContentType");
if (dr.IsDBNull(ordinal)) return null;
if (dr.IsDBNull(ordinal)) return null;

var replyTo = dr.GetString(ordinal);
return replyTo;
}

private string GetReplyTo(IDataReader dr)
{
var ordinal = dr.GetOrdinal("ReplyTo");
if (dr.IsDBNull(ordinal)) return null;
if (dr.IsDBNull(ordinal)) return null;

var replyTo = dr.GetString(ordinal);
return replyTo;
}
Expand All @@ -428,8 +428,8 @@ private static Dictionary<string, object> GetContextBag(IDataReader dr)
private Guid? GetCorrelationId(IDataReader dr)
{
var ordinal = dr.GetOrdinal("CorrelationId");
if (dr.IsDBNull(ordinal)) return null;
if (dr.IsDBNull(ordinal)) return null;

var correlationId = dr.GetGuid(ordinal);
return correlationId;
}
Expand All @@ -444,7 +444,7 @@ private static DateTime GetTimeStamp(IDataReader dr)
}

}


}

Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,12 @@ public static IBrighterBuilder UsePostgreSqlOutbox(
{
brighterBuilder.Services.AddSingleton<PostgreSqlOutboxConfiguration>(configuration);

brighterBuilder.Services.Add(new ServiceDescriptor(typeof(IAmAnOutboxSync<Message>), BuildDynamoDbOutbox, serviceLifetime));
brighterBuilder.Services.Add(new ServiceDescriptor(typeof(IAmAnOutboxAsync<Message>), BuildDynamoDbOutbox, serviceLifetime));

brighterBuilder.Services.Add(new ServiceDescriptor(typeof(IAmAnOutboxSync<Message>), BuildPostgreSqlOutboxSync, serviceLifetime));

return brighterBuilder;
}

private static PostgreSqlOutboxSync BuildDynamoDbOutbox(IServiceProvider provider)
private static PostgreSqlOutboxSync BuildPostgreSqlOutboxSync(IServiceProvider provider)
{
var config = provider.GetService<PostgreSqlOutboxConfiguration>();

Expand Down