Skip to content

Commit

Permalink
Merge pull request #2 from BrighterCommand/master
Browse files Browse the repository at this point in the history
Merge from original master
  • Loading branch information
honkuan86 authored Mar 26, 2021
2 parents 71908af + 965491b commit 4b3a6e5
Show file tree
Hide file tree
Showing 19 changed files with 133 additions and 52 deletions.
14 changes: 13 additions & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ jobs:

package:
runs-on: ubuntu-latest
timeout-minutes: 5
timeout-minutes: 10
needs: [test]

steps:
Expand All @@ -101,6 +101,18 @@ jobs:
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
REPOSITORY_OWNER: ${{ github.repository_owner }}

release:
if: startsWith(github.ref, 'refs/tags')
runs-on: ubuntu-latest
timeout-minutes: 5
needs: [package]
steps:
- uses: actions/download-artifact@v2
with:
name: nuget packages
- name: Push generated package to NuGet
run: dotnet nuget push **/*.nupkg --api-key ${{ secrets.NUGET_KEY }} --skip-duplicate

memory:
runs-on: ubuntu-latest
Expand Down
3 changes: 3 additions & 0 deletions CONTRIBUTORS.md
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,9 @@ Brighter contributors (sorted alphabeticaly)
**[Tarun Pothulapati](https://github.com/Pothulapati)**
* PostgreSQL Outbox

**[Paul Reardon](https://github.com/preardon)**
* Improvements to ASB Transport

**[tmschlot](https://github.com/tmschlot)**
* Various fixes

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="AWSSDK.DynamoDBv2" Version="3.5.4.35" />
<PackageReference Include="AWSSDK.DynamoDBv2" Version="3.5.4.37" />
</ItemGroup>

</Project>
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<TargetFrameworks>netstandard2.0</TargetFrameworks>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="AWSSDK.DynamoDBv2" Version="3.5.4.35" />
<PackageReference Include="AWSSDK.DynamoDBv2" Version="3.5.4.37" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\Paramore.Brighter\Paramore.Brighter.csproj" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
<ItemGroup>
<PackageReference Include="Polly" Version="7.2.1" />
<PackageReference Include="Polly.Contrib.WaitAndRetry" Version="1.1.1" />
<PackageReference Include="AWSSDK.SimpleNotificationService" Version="3.5.1.49" />
<PackageReference Include="AWSSDK.SQS" Version="3.5.1.25" />
<PackageReference Include="AWSSDK.SimpleNotificationService" Version="3.5.1.50" />
<PackageReference Include="AWSSDK.SQS" Version="3.5.1.27" />
</ItemGroup>
</Project>
55 changes: 21 additions & 34 deletions src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsMessageCreator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -48,44 +48,31 @@ public Message CreateMessage(Amazon.SQS.Model.Message sqsMessage)
timeStamp = ReadTimestamp(sqsMessage);
replyTo = ReadReplyTo(sqsMessage);
receiptHandle = ReadReceiptHandle(sqsMessage);

if (false == (topic.Success && messageId.Success && contentType.Success && correlationId.Success
&& handledCount.Success && messageType.Success && timeStamp.Success
&& receiptHandle.Success))
{
return FailureMessage(topic, messageId);
}
else
{
var messageHeader = timeStamp.Success
? new MessageHeader(messageId.Result, topic.Result, messageType.Result, timeStamp.Result, handledCount.Result, 0)
: new MessageHeader(messageId.Result, topic.Result, messageType.Result);

if (correlationId.Success)
messageHeader.CorrelationId = correlationId.Result;
var messageHeader = timeStamp.Success
? new MessageHeader(messageId.Result, topic.Result, messageType.Result, timeStamp.Result, handledCount.Result, 0)
: new MessageHeader(messageId.Result, topic.Result, messageType.Result);

if (replyTo.Success)
messageHeader.ReplyTo = replyTo.Result;
if (correlationId.Success)
messageHeader.CorrelationId = correlationId.Result;

if (contentType.Success)
messageHeader.ContentType = contentType.Result;

message = new Message(messageHeader, new MessageBody(sqsMessage.Body));

//deserialize the bag
var bag = ReadMessageBag(sqsMessage);
foreach (var key in bag.Keys)
{
message.Header.Bag.Add(key, bag[key]);
}
if (replyTo.Success)
messageHeader.ReplyTo = replyTo.Result;

if (contentType.Success)
messageHeader.ContentType = contentType.Result;

message = new Message(messageHeader, new MessageBody(sqsMessage.Body));

if(receiptHandle.Success)
message.Header.Bag.Add("ReceiptHandle", ((Amazon.SQS.Model.Message)sqsMessage).ReceiptHandle);
//deserialize the bag
var bag = ReadMessageBag(sqsMessage);
foreach (var key in bag.Keys)
{
message.Header.Bag.Add(key, bag[key]);
}



if(receiptHandle.Success)
message.Header.Bag.Add("ReceiptHandle", ((Amazon.SQS.Model.Message)sqsMessage).ReceiptHandle);
}
catch (Exception e)
{
Expand Down Expand Up @@ -144,7 +131,7 @@ private HeaderResult<MessageType> ReadMessageType(Amazon.SQS.Model.Message sqsMe
return new HeaderResult<MessageType>(messageType, true);
}
}
return new HeaderResult<MessageType>(MessageType.MT_UNACCEPTABLE, false);
return new HeaderResult<MessageType>(MessageType.MT_EVENT, true);
}

private HeaderResult<int> ReadHandledCount(Amazon.SQS.Model.Message sqsMessage)
Expand Down Expand Up @@ -189,7 +176,7 @@ private HeaderResult<Guid> ReadMessageId(Amazon.SQS.Model.Message sqsMessage)
return new HeaderResult<Guid>(messageId, true);
}
}
return new HeaderResult<Guid>(Guid.Empty, false);
return new HeaderResult<Guid>(Guid.Empty, true);
}

private HeaderResult<string> ReadTopic(Amazon.SQS.Model.Message sqsMessage)
Expand All @@ -201,7 +188,7 @@ private HeaderResult<string> ReadTopic(Amazon.SQS.Model.Message sqsMessage)
var topic = arnElements[(int)ARNAmazonSNS.TopicName];
return new HeaderResult<string>(topic, true);
}
return new HeaderResult<string>(String.Empty, false);
return new HeaderResult<string>(String.Empty, true);
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,17 @@
{
public class AzureServiceBusConfiguration
{
public AzureServiceBusConfiguration(string connectionString)
public AzureServiceBusConfiguration(string connectionString, bool ackOnRead = false )
{
ConnectionString = connectionString;
AckOnRead = ackOnRead;
}

public string ConnectionString { get; }

/// <summary>
/// When set to true this will Chanage RecieveMode from ReceiveAndDelete to PeekAndLock
/// </summary>
public bool AckOnRead{ get; }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,12 @@ public class AzureServiceBusConsumer : IAmAMessageConsumer
private bool _subscriptionCreated;
private static readonly Lazy<ILog> _logger = new Lazy<ILog>(LogProvider.For<AzureServiceBusConsumer>);
private readonly OnMissingChannel _makeChannel;
private readonly ReceiveMode _receiveMode;

private const string _lockTokenKey = "LockToken";

public AzureServiceBusConsumer(string topicName, string subscriptionName, IAmAMessageProducer messageProducer, IManagementClientWrapper managementClientWrapper,
IMessageReceiverProvider messageReceiverProvider, int batchSize = 10, OnMissingChannel makeChannels = OnMissingChannel.Create)
IMessageReceiverProvider messageReceiverProvider, int batchSize = 10, ReceiveMode receiveMode = ReceiveMode.ReceiveAndDelete, OnMissingChannel makeChannels = OnMissingChannel.Create)
{
_subscriptionName = subscriptionName;
_topicName = topicName;
Expand All @@ -30,16 +33,17 @@ public AzureServiceBusConsumer(string topicName, string subscriptionName, IAmAMe
_messageReceiverProvider = messageReceiverProvider;
_batchSize = batchSize;
_makeChannel = makeChannels;
_receiveMode = receiveMode;

GetMessageReceiverProvider();
}

private void GetMessageReceiverProvider()
{
_logger.Value.Info($"Getting message receiver provider for topic {_topicName} and subscription {_subscriptionName}...");
_logger.Value.Info($"Getting message receiver provider for topic {_topicName} and subscription {_subscriptionName} with recieve Mode {_receiveMode}...");
try
{
_messageReceiver = _messageReceiverProvider.Get(_topicName, _subscriptionName, ReceiveMode.ReceiveAndDelete);
_messageReceiver = _messageReceiverProvider.Get(_topicName, _subscriptionName, _receiveMode);
}
catch (Exception e)
{
Expand Down Expand Up @@ -97,7 +101,9 @@ private Message MapToBrighterMessage(IBrokeredMessageWrapper azureServiceBusMess
var messageBody = System.Text.Encoding.Default.GetString(azureServiceBusMessage.MessageBodyValue ?? Array.Empty<byte>());
_logger.Value.Debug($"Received message from topic {_topicName} via subscription {_subscriptionName} with body {messageBody}.");
MessageType messageType = GetMessageType(azureServiceBusMessage);
var message = new Message(new MessageHeader(Guid.NewGuid(), _topicName, messageType), new MessageBody(messageBody));
var headers = new MessageHeader(Guid.NewGuid(), _topicName, messageType);
if(_receiveMode.Equals(ReceiveMode.PeekLock)) headers.Bag.Add(_lockTokenKey, azureServiceBusMessage.LockToken);
var message = new Message(headers, new MessageBody(messageBody));
return message;
}

Expand Down Expand Up @@ -169,7 +175,26 @@ public void Requeue(Message message, int delayMilliseconds)

public void Acknowledge(Message message)
{
//Not implemented as we use ReceiveMode.ReceiveAndDelete (Brighter will call this method anyway)
//Only ACK if ReceiveMode is Peek
if (_receiveMode.Equals(ReceiveMode.PeekLock))
{
try
{
EnsureSubscription();
var lockToken = message.Header.Bag[_lockTokenKey].ToString();

if (string.IsNullOrEmpty(lockToken))
throw new Exception($"LockToken for message with id {message.Id} is null or empty");
_logger.Value.Debug($"Acknowledging Message with Id {message.Id} Lock Token : {lockToken}");

_messageReceiver.Complete(lockToken).Wait();
}
catch(Exception ex)
{
_logger.Value.ErrorException($"Error completing message with id {message.Id}", ex);
throw;
}
}
}

public void Reject(Message message)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using Paramore.Brighter.MessagingGateway.AzureServiceBus.AzureServiceBusWrappers;
using Microsoft.Azure.ServiceBus;
using Paramore.Brighter.MessagingGateway.AzureServiceBus.AzureServiceBusWrappers;

namespace Paramore.Brighter.MessagingGateway.AzureServiceBus
{
Expand All @@ -18,7 +19,7 @@ public IAmAMessageConsumer Create(Subscription subscription)
return new AzureServiceBusConsumer(subscription.RoutingKey, subscription.ChannelName,
new AzureServiceBusMessageProducer(nameSpaceManagerWrapper,
new TopicClientProvider(_configuration)), nameSpaceManagerWrapper,
new MessageReceiverProvider(_configuration));
new MessageReceiverProvider(_configuration), receiveMode: _configuration.AckOnRead ? ReceiveMode.PeekLock : ReceiveMode.ReceiveAndDelete);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,7 @@ public BrokeredMessageWrapper(Microsoft.Azure.ServiceBus.Message brokeredMessage
public byte[] MessageBodyValue => _brokeredMessage.Body;

public IDictionary<string, object> UserProperties => _brokeredMessage.UserProperties;

public string LockToken => _brokeredMessage.SystemProperties.LockToken;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ public interface IBrokeredMessageWrapper
{
byte[] MessageBodyValue { get; }

IDictionary<string, object> UserProperties { get; }
IDictionary<string, object> UserProperties { get; }

string LockToken { get; }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ namespace Paramore.Brighter.MessagingGateway.AzureServiceBus.AzureServiceBusWrap
public interface IMessageReceiverWrapper
{
Task<IEnumerable<IBrokeredMessageWrapper>> Receive(int batchSize, TimeSpan serverWaitTime);

Task Complete(string lockToken);

void Close();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,11 @@ public void CreateSubscription(string topicName, string subscriptionName, int ma
{
Logger.Value.Info($"Creating subscription {subscriptionName} for topic {topicName}...");

if (!TopicExists(topicName))
{
CreateTopic(topicName);
}

var subscriptionDescription = new SubscriptionDescription(topicName, subscriptionName)
{
MaxDeliveryCount = maxDeliveryCount
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ public void Close()
Logger.Value.Warn("MessageReceiver connection stopped");
}

public async Task Complete(string lockToken)
{
await _messageReceiver.CompleteAsync(lockToken).ConfigureAwait(false);
}

public bool IsClosedOrClosing => _messageReceiver.IsClosedOrClosing;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<PackageTags>awssqs;AMQP;Command;Event;Service Activator;Decoupled;Invocation;Messaging;Remote;Command Dispatcher;Command Processor;Request;Service;Task Queue;Work Queue;Retry;Circuit Breaker;Availability</PackageTags>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.Azure.ServiceBus" Version="4.1.3" />
<PackageReference Include="Microsoft.Azure.ServiceBus" Version="5.1.2" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\Paramore.Brighter\Paramore.Brighter.csproj" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,6 @@
<ProjectReference Include="..\Paramore.Brighter\Paramore.Brighter.csproj" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="Confluent.Kafka" Version="1.6.2" />
<PackageReference Include="Confluent.Kafka" Version="1.6.3" />
</ItemGroup>
</Project>
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<TargetFrameworks>netstandard2.0</TargetFrameworks>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="AWSSDK.DynamoDBv2" Version="3.5.4.35" />
<PackageReference Include="AWSSDK.DynamoDBv2" Version="3.5.4.37" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\Paramore.Brighter.DynamoDb.Extensions\Paramore.Brighter.DynamoDb.Extensions.csproj" />
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Text;
using System.Threading.Tasks;
Expand Down Expand Up @@ -382,5 +383,35 @@ public void When_a_subscription_does_not_exist_and_Missing_is_set_to_Validate_a_

Assert.Throws<ChannelFailureException>(() => azureServiceBusConsumerValidate.Receive(400));
}

[Fact]
public void When_ackOnRead_is_Set_and_ack_fails_then_exception_is_thrown()
{
var brokeredMessageList = new List<IBrokeredMessageWrapper>();
var message1 = new Mock<IBrokeredMessageWrapper>();
var mockMessageReceiver = new Mock<IMessageReceiverProvider>();

mockMessageReceiver.Setup(x => x.Get("topic", "subscription", ReceiveMode.PeekLock)).Returns(_messageReceiver.Object);

var lockToken = Guid.NewGuid().ToString();

message1.Setup(x => x.MessageBodyValue).Returns((byte[])null);
message1.Setup(m => m.UserProperties).Returns(new Dictionary<string, object>() { { "MessageType", "MT_EVENT" } });
message1.Setup(m => m.LockToken).Returns(lockToken);

brokeredMessageList.Add(message1.Object);

_messageReceiver.Setup(x => x.Receive(10, TimeSpan.FromMilliseconds(400))).Returns(Task.FromResult<IEnumerable<IBrokeredMessageWrapper>>(brokeredMessageList));
_messageReceiver.Setup(x => x.Complete(lockToken)).Throws(new Exception());

var azureServiceBusConsumer = new AzureServiceBusConsumer("topic", "subscription", _mockMessageProducer.Object,
_nameSpaceManagerWrapper.Object, mockMessageReceiver.Object, makeChannels: OnMissingChannel.Create, receiveMode: ReceiveMode.PeekLock);

Message[] result = azureServiceBusConsumer.Receive(400);

var msg = result.First();

Assert.Throws<Exception>(() => azureServiceBusConsumer.Acknowledge(msg));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
<PackageReference Include="FluentAssertions" Version="5.10.3" />
<PackageReference Include="FakeItEasy" Version="6.2.1" />
<PackageReference Include="FakeItEasy" Version="7.0.0" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.9.1" />
<PackageReference Include="Newtonsoft.Json" Version="12.0.3" />
<PackageReference Include="Serilog" Version="2.10.0" />
Expand Down

0 comments on commit 4b3a6e5

Please sign in to comment.