Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bug: Support async sns/sqs message publishing v9 #3269

Merged
merged 6 commits into from
Aug 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ THE SOFTWARE. */

using System;
using System.Collections.Generic;
using Amazon.SimpleNotificationService;
using Amazon.SimpleNotificationService.Model;
using Microsoft.Extensions.Logging;
using Paramore.Brighter.Logging;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,15 @@ THE SOFTWARE. */

using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;

namespace Paramore.Brighter.MessagingGateway.AWSSQS
{
/// <summary>
/// Class SqsMessageProducer.
/// </summary>
public class SqsMessageProducer : AWSMessagingGateway, IAmAMessageProducerSync
public class SqsMessageProducer : AWSMessagingGateway, IAmAMessageProducerSync, IAmAMessageProducerAsync
{
/// <summary>
/// How many outstanding messages may the outbox have before we terminate the programme with an OutboxLimitReached exception?
Expand Down Expand Up @@ -92,33 +93,42 @@ public bool ConfirmTopicExists(string topic = null)

return !string.IsNullOrEmpty(ChannelTopicArn);
}

/// <summary>
/// Sends the specified message.
/// </summary>
/// <param name="message">The message.</param>
public async Task SendAsync(Message message)
{
s_logger.LogDebug("SQSMessageProducer: Publishing message with topic {Topic} and id {Id} and message: {Request}",
message.Header.Topic, message.Id, message.Body);

ConfirmTopicExists(message.Header.Topic);

using (var client = _clientFactory.CreateSnsClient())
{
var publisher = new SqsMessagePublisher(ChannelTopicArn, client);
var messageId = await publisher.PublishAsync(message);
if (messageId != null)
{
s_logger.LogDebug(
"SQSMessageProducer: Published message with topic {Topic}, Brighter messageId {MessageId} and SNS messageId {SNSMessageId}",
message.Header.Topic, message.Id, messageId);
return;
}
}

throw new InvalidOperationException(
string.Format($"Failed to publish message with topic {message.Header.Topic} and id {message.Id} and message: {message.Body}"));
}

/// <summary>
/// Sends the specified message.
/// </summary>
/// <param name="message">The message.</param>
public void Send(Message message)
{
s_logger.LogDebug("SQSMessageProducer: Publishing message with topic {Topic} and id {Id} and message: {Request}",
message.Header.Topic, message.Id, message.Body);

ConfirmTopicExists(message.Header.Topic);

using (var client = _clientFactory.CreateSnsClient())
{
var publisher = new SqsMessagePublisher(ChannelTopicArn, client);
var messageId = publisher.Publish(message);
if (messageId != null)
{
s_logger.LogDebug(
"SQSMessageProducer: Published message with topic {Topic}, Brighter messageId {MessageId} and SNS messageId {SNSMessageId}",
message.Header.Topic, message.Id, messageId);
return;
}
}

throw new InvalidOperationException(
string.Format($"Failed to publish message with topic {message.Header.Topic} and id {message.Id} and message: {message.Body}"));
SendAsync(message).Wait();
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,10 @@ THE SOFTWARE. */

using System;
using System.Collections.Generic;
using System.Net.Mime;
using System.Text.Json;
using System.Threading.Tasks;
using Amazon.SimpleNotificationService;
using Amazon.SimpleNotificationService.Model;
using Paramore.Brighter.Transforms.Transformers;

namespace Paramore.Brighter.MessagingGateway.AWSSQS
{
Expand All @@ -42,7 +41,7 @@ public SqsMessagePublisher(string topicArn, AmazonSimpleNotificationServiceClien
_client = client;
}

public string Publish(Message message)
public async Task<string> PublishAsync(Message message)
{
var messageString = message.Body.Value;
var publishRequest = new PublishRequest(_topicArn, messageString);
Expand All @@ -65,13 +64,18 @@ public string Publish(Message message)
publishRequest.MessageAttributes = messageAttributes;


var response = _client.PublishAsync(publishRequest).GetAwaiter().GetResult();
var response = await _client.PublishAsync(publishRequest);
if (response.HttpStatusCode == System.Net.HttpStatusCode.OK || response.HttpStatusCode == System.Net.HttpStatusCode.Created || response.HttpStatusCode == System.Net.HttpStatusCode.Accepted)
{
return response.MessageId;
}

return null;
}

public string Publish(Message message)
{
return PublishAsync(message).GetAwaiter().GetResult();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,20 @@ public SqsMessageProducerSendTests()



[Fact]
public async Task When_posting_a_message_via_the_producer()
[Theory]
[InlineData(true)]
[InlineData(false)]
public async Task When_posting_a_message_via_the_producer(bool sendAsync)
{
//arrange
_messageProducer.Send(_message);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should probably have one version of the test that calls the sync method, and one version that calls the async method.

if (sendAsync)
{
_messageProducer.SendAsync(_message).Wait();
}
else
{
_messageProducer.Send(_message);
}

await Task.Delay(1000);

Expand Down
Loading