Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

System.InvalidCastException thrown when using UsePostgreSqlOutbox ServiceCollection Extension #1837

24 changes: 12 additions & 12 deletions src/Paramore.Brighter.Inbox.Postgres/PostgresSqlInbox.cs
Original file line number Diff line number Diff line change
Expand Up @@ -49,19 +49,19 @@ public class PostgresSqlInbox : IAmAnInbox, IAmAnInboxAsync
/// thread specific storage
/// such as HTTPContext
/// </summary>
public bool ContinueOnCapturedContext { get; set; }
public bool ContinueOnCapturedContext { get; set; }

public PostgresSqlInbox(PostgresSqlInboxConfiguration postgresSqlInboxConfiguration)
{
_configuration = postgresSqlInboxConfiguration;
ContinueOnCapturedContext = false;
}


public void Add<T>(T command, string contextKey, int timeoutInMilliseconds = -1) where T : class, IRequest
{
var parameters = InitAddDbParameters(command, contextKey);

using (var connection = GetConnection())
{
connection.Open();
Expand All @@ -79,15 +79,15 @@ public void Add<T>(T command, string contextKey, int timeoutInMilliseconds = -1)
command.Id);
return;
}

throw;
}
}
}

public T Get<T>(Guid id, string contextKey, int timeoutInMilliseconds = -1) where T : class, IRequest
{
var sql = $"select * from {_configuration.InBoxTableName} where CommandId = @CommandId AND ContextKey = @ContextKey";
var sql = $"SELECT * FROM {_configuration.InBoxTableName} WHERE CommandId = @CommandId AND ContextKey = @ContextKey";
var parameters = new[]
{
CreateNpgsqlParameter("CommandId", id),
Expand Down Expand Up @@ -139,7 +139,7 @@ public async Task AddAsync<T>(T command, string contextKey, int timeoutInMillise

public async Task<T> GetAsync<T>(Guid id, string contextKey, int timeoutInMilliseconds = -1, CancellationToken cancellationToken = default(CancellationToken)) where T : class, IRequest
{
var sql = $"select * from {_configuration.InBoxTableName} where CommandId = @CommandId AND ContextKey = @ContextKey";
var sql = $"SELECT * FROM {_configuration.InBoxTableName} WHERE CommandId = @CommandId AND ContextKey = @ContextKey";

var parameters = new[]
{
Expand All @@ -164,7 +164,7 @@ public async Task AddAsync<T>(T command, string contextKey, int timeoutInMillise
CreateNpgsqlParameter("CommandId", id),
CreateNpgsqlParameter("ContextKey", contextKey)
};

return await ExecuteCommandAsync<bool>(
async command =>
{
Expand All @@ -177,12 +177,12 @@ public async Task AddAsync<T>(T command, string contextKey, int timeoutInMillise
parameters)
.ConfigureAwait(ContinueOnCapturedContext);
}

private NpgsqlConnection GetConnection()
{
return new NpgsqlConnection(_configuration.ConnectionString);
}

private NpgsqlParameter CreateNpgsqlParameter(string parametername, object value)
{
if (value != null)
Expand Down Expand Up @@ -251,7 +251,7 @@ private async Task<T> ExecuteCommandAsync<T>(
return item;
}
}

private TResult ReadCommand<TResult>(IDataReader dr, Guid commandId) where TResult : class, IRequest
{
if (dr.Read())
Expand Down
59 changes: 31 additions & 28 deletions src/Paramore.Brighter.Outbox.PostgreSql/PostgreSqlOutboxSync.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@ THE SOFTWARE. */

using System;
using System.Collections.Generic;
using Npgsql;
using System.Data;
using NpgsqlTypes;
using Paramore.Brighter.Logging;
using System.Text.Json;
using Microsoft.Extensions.Logging;
using Npgsql;
using NpgsqlTypes;
using Paramore.Brighter.Logging;

namespace Paramore.Brighter.Outbox.PostgreSql
{
Expand Down Expand Up @@ -97,10 +97,10 @@ public void Add(Message message, int outBoxTimeout = -1, IAmABoxTransactionConne
/// <param name="args">Additional parameters required for search, if any</param>
/// <returns>A list of dispatched messages</returns>
public IEnumerable<Message> DispatchedMessages(
double millisecondsDispatchedSince,
int pageSize = 100,
double millisecondsDispatchedSince,
int pageSize = 100,
int pageNumber = 1,
int outboxTimeout = -1,
int outboxTimeout = -1,
Dictionary<string, object> args = null)
{
using (var connection = GetConnection())
Expand Down Expand Up @@ -193,8 +193,8 @@ public void MarkDispatched(Guid id, DateTime? dispatchedAt = null, Dictionary<st
/// <param name="args">Additional parameters required for search, if any</param>
/// <returns>A list of messages that are outstanding for dispatch</returns>
public IEnumerable<Message> OutstandingMessages(
double millSecondsSinceSent,
int pageSize = 100,
double millSecondsSinceSent,
int pageSize = 100,
int pageNumber = 1,
Dictionary<string, object> args = null)
{
Expand Down Expand Up @@ -229,10 +229,11 @@ private void CreatePagedDispatchedCommand(NpgsqlCommand command, double millisec
int pageSize, int pageNumber)
{
var pagingSqlFormat =
"SELECT * FROM (SELECT ROW_NUMBER() OVER(ORDER BY Timestamp DESC) AS NUMBER, * FROM {0}) AS TBL WHERE DISPATCHED IS NOT NULL AND DISPATCHED < DATEADD(millisecond, @OutStandingSince, getdate()) AND NUMBER BETWEEN ((@PageNumber-1)*@PageSize+1) AND (@PageNumber*@PageSize) ORDER BY Timestamp DESC";
"SELECT * FROM (SELECT ROW_NUMBER() OVER(ORDER BY Timestamp DESC) AS NUMBER, * FROM {0}) AS TBL WHERE DISPATCHED IS NOT NULL AND DISPATCHED < (CURRENT_TIMESTAMP + (@OutstandingSince || ' millisecond')::INTERVAL) AND NUMBER BETWEEN ((@PageNumber-1)*@PageSize+1) AND (@PageNumber*@PageSize) ORDER BY Timestamp DESC";
var parameters = new[]
{
CreateNpgsqlParameter("PageNumber", pageNumber), CreateNpgsqlParameter("PageSize", pageSize),
CreateNpgsqlParameter("PageNumber", pageNumber),
CreateNpgsqlParameter("PageSize", pageSize),
CreateNpgsqlParameter("OutstandingSince", -1 * millisecondsDispatchedSince)
};

Expand All @@ -249,7 +250,8 @@ private void CreatePagedReadCommand(NpgsqlCommand command, PostgreSqlOutboxConfi
"SELECT * FROM (SELECT ROW_NUMBER() OVER(ORDER BY Timestamp DESC) AS NUMBER, * FROM {0}) AS TBL WHERE NUMBER BETWEEN ((@PageNumber-1)*@PageSize+1) AND (@PageNumber*@PageSize) ORDER BY Timestamp DESC";
var parameters = new[]
{
CreateNpgsqlParameter("PageNumber", pageNumber), CreateNpgsqlParameter("PageSize", pageSize)
CreateNpgsqlParameter("PageNumber", pageNumber),
CreateNpgsqlParameter("PageSize", pageSize)
};

var sql = string.Format(pagingSqlFormat, _configuration.OutboxTableName);
Expand All @@ -262,10 +264,11 @@ private void CreatePagedOutstandingCommand(NpgsqlCommand command, double milliSe
int pageNumber)
{
var pagingSqlFormat =
"SELECT * FROM (SELECT ROW_NUMBER() OVER(ORDER BY Timestamp ASC) AS NUMBER, * FROM {0} WHERE DISPATCHED IS NULL) AS TBL WHERE TIMESTAMP < DATEADD(millisecond, @OutStandingSince, getdate()) AND NUMBER BETWEEN ((@PageNumber-1)*@PageSize+1) AND (@PageNumber*@PageSize) ORDER BY Timestamp ASC";
"SELECT * FROM (SELECT ROW_NUMBER() OVER(ORDER BY Timestamp ASC) AS NUMBER, * FROM {0} WHERE DISPATCHED IS NULL) AS TBL WHERE TIMESTAMP < (CURRENT_TIMESTAMP + (@OutstandingSince || ' millisecond')::INTERVAL) AND NUMBER BETWEEN ((@PageNumber-1)*@PageSize+1) AND (@PageNumber*@PageSize) ORDER BY Timestamp ASC";
var parameters = new[]
{
CreateNpgsqlParameter("PageNumber", pageNumber), CreateNpgsqlParameter("PageSize", pageSize),
CreateNpgsqlParameter("PageNumber", pageNumber),
CreateNpgsqlParameter("PageSize", pageSize),
CreateNpgsqlParameter("OutstandingSince", milliSecondsSinceAdded)
};

Expand Down Expand Up @@ -319,7 +322,7 @@ private NpgsqlParameter[] InitAddDbParameters(Message message)
CreateNpgsqlParameter("CorrelationId", message.Header.CorrelationId),
CreateNpgsqlParameter("ReplyTo", message.Header.ReplyTo),
CreateNpgsqlParameter("ContentType", message.Header.ContentType),
CreateNpgsqlParameter("HeaderBag", bagjson),
CreateNpgsqlParameter("HeaderBag", bagjson),
CreateNpgsqlParameter("Body", message.Body.Value)
};
return parameters;
Expand Down Expand Up @@ -358,13 +361,13 @@ private Message MapAMessage(IDataReader dr)
var correlationId = GetCorrelationId(dr);
var replyTo = GetReplyTo(dr);
var contentType = GetContentType(dr);

var header = new MessageHeader(
messageId:id,
topic:topic,
messageType:messageType,
timeStamp:timeStamp,
handledCount:0,
messageId:id,
topic:topic,
messageType:messageType,
timeStamp:timeStamp,
handledCount:0,
delayedMilliseconds: 0,
correlationId: correlationId,
replyTo: replyTo,
Expand Down Expand Up @@ -402,17 +405,17 @@ private static Guid GetMessageId(IDataReader dr)
private string GetContentType(IDataReader dr)
{
var ordinal = dr.GetOrdinal("ContentType");
if (dr.IsDBNull(ordinal)) return null;
if (dr.IsDBNull(ordinal)) return null;

var replyTo = dr.GetString(ordinal);
return replyTo;
}

private string GetReplyTo(IDataReader dr)
{
var ordinal = dr.GetOrdinal("ReplyTo");
if (dr.IsDBNull(ordinal)) return null;
if (dr.IsDBNull(ordinal)) return null;

var replyTo = dr.GetString(ordinal);
return replyTo;
}
Expand All @@ -428,8 +431,8 @@ private static Dictionary<string, object> GetContextBag(IDataReader dr)
private Guid? GetCorrelationId(IDataReader dr)
{
var ordinal = dr.GetOrdinal("CorrelationId");
if (dr.IsDBNull(ordinal)) return null;
if (dr.IsDBNull(ordinal)) return null;

var correlationId = dr.GetGuid(ordinal);
return correlationId;
}
Expand All @@ -444,7 +447,7 @@ private static DateTime GetTimeStamp(IDataReader dr)
}

}


}

Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,12 @@ public static IBrighterBuilder UsePostgreSqlOutbox(
{
brighterBuilder.Services.AddSingleton<PostgreSqlOutboxConfiguration>(configuration);

brighterBuilder.Services.Add(new ServiceDescriptor(typeof(IAmAnOutboxSync<Message>), BuildDynamoDbOutbox, serviceLifetime));
brighterBuilder.Services.Add(new ServiceDescriptor(typeof(IAmAnOutboxAsync<Message>), BuildDynamoDbOutbox, serviceLifetime));

brighterBuilder.Services.Add(new ServiceDescriptor(typeof(IAmAnOutboxSync<Message>), BuildPostgreSqlOutboxSync, serviceLifetime));

return brighterBuilder;
}

private static PostgreSqlOutboxSync BuildDynamoDbOutbox(IServiceProvider provider)
private static PostgreSqlOutboxSync BuildPostgreSqlOutboxSync(IServiceProvider provider)
{
var config = provider.GetService<PostgreSqlOutboxConfiguration>();

Expand Down