Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sb cross receiver samples #39514

Merged
merged 7 commits into from
Oct 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "samples", "samples", "{8B8C
samples\Sample12_ManagingRules.md = samples\Sample12_ManagingRules.md
samples\Sample13_AdvancedConfiguration.md = samples\Sample13_AdvancedConfiguration.md
samples\Sample14_AMQPMessage.md = samples\Sample14_AMQPMessage.md
samples\Sample15_MockingClientTypes.md = samples\Sample15_MockingClientTypes.md
samples\Sample16_CrossReceiverMessageSettlement.md = samples\Sample16_CrossReceiverMessageSettlement.md
EndProjectSection
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Azure.Core.Amqp", "..\..\core\Azure.Core.Amqp\src\Azure.Core.Amqp.csproj", "{2ADA26CA-77E5-4793-927A-A6185FD8AA29}"
Expand Down
2 changes: 2 additions & 0 deletions sdk/servicebus/Azure.Messaging.ServiceBus/samples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,5 @@ description: Samples for the Azure.Messaging.ServiceBus client library
- [Managing rules](https://github.com/Azure/azure-sdk-for-net/blob/main/sdk/servicebus/Azure.Messaging.ServiceBus/samples/Sample12_ManagingRules.md)
- [Advanced configuration](https://github.com/Azure/azure-sdk-for-net/blob/main/sdk/servicebus/Azure.Messaging.ServiceBus/samples/Sample13_AdvancedConfiguration.md)
- [Interact with the AMQP message](https://github.com/Azure/azure-sdk-for-net/blob/main/sdk/servicebus/Azure.Messaging.ServiceBus/samples/Sample14_AMQPMessage.md)
- [Mocking Client Types](https://github.com/Azure/azure-sdk-for-net/blob/main/sdk/servicebus/Azure.Messaging.ServiceBus/samples/Sample15_MockingClientTypes.md)
- [Cross-receiver message settlement]
JoshLove-msft marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
# Cross Receiver Message Settlement

The message settlement APIs on the `ServiceBusReceiver` require passing in the
`ServiceBusReceivedMessage`. This can be limiting for scenarios in which the message needs to be persisted and then
settled across process boundaries. There are two strategies that can be used for settling a message with a different
receiver. The recommended strategy depends on the specific application scenario.

## Storing the entire `ServiceBusReceivedMessage`

If it is necessary to store the entire message and then rehydrate it in another process, use the following strategy.
First we can get the raw AMQP message bytes and lock token as shown below:
*Note: The lock token is not
actually part of the AMQP message so it needs to stored separately from the AMQP bytes.*

```C# Snippet:ServiceBusWriteReceivedMessage
var client1 = new ServiceBusClient(connectionString);
ServiceBusSender sender = client1.CreateSender(queueName);

var message = new ServiceBusMessage("some message");
await sender.SendMessageAsync(message);

ServiceBusReceiver receiver1 = client1.CreateReceiver(queueName);
ServiceBusReceivedMessage receivedMessage = await receiver1.ReceiveMessageAsync();
ReadOnlyMemory<byte> amqpMessageBytes = receivedMessage.GetRawAmqpMessage().ToBytes().ToMemory();
ReadOnlyMemory<byte> lockTokenBytes = Guid.Parse(receivedMessage.LockToken).ToByteArray();
```

In order to rehydrate the message in another process, we would do the following:

```C# Snippet:ServiceBusReadReceivedMessage
AmqpAnnotatedMessage amqpMessage = AmqpAnnotatedMessage.FromBytes(new BinaryData(amqpMessageBytes));
ServiceBusReceivedMessage rehydratedMessage = ServiceBusReceivedMessage.FromAmqpMessage(amqpMessage, new BinaryData(lockTokenBytes));

var client2 = new ServiceBusClient(connectionString);
ServiceBusReceiver receiver2 = client2.CreateReceiver(queueName);
await receiver2.CompleteMessageAsync(rehydratedMessage);
```

## Storing only the lock token

If the entire message is not needed when settling the message in a different process, you can simply preserve the
lock token. In the example below, we store off the lock token using its GUID bytes. You can also simply store a
string if that is easier for your scenario.

```C# Snippet:ServiceBusWriteReceivedMessageLockToken
var client1 = new ServiceBusClient(connectionString);
ServiceBusSender sender = client1.CreateSender(queueName);

var message = new ServiceBusMessage("some message");
await sender.SendMessageAsync(message);

ServiceBusReceiver receiver1 = client1.CreateReceiver(queueName);
ServiceBusReceivedMessage receivedMessage = await receiver1.ReceiveMessageAsync();
ReadOnlyMemory<byte> lockTokenBytes = Guid.Parse(receivedMessage.LockToken).ToByteArray();
```

In order to rehydrate the message in another process using the lock token, we would do the following:
JoshLove-msft marked this conversation as resolved.
Show resolved Hide resolved
*Note: Because we only stored the lock token, when we rehydrate the message all of the properties of the
`ServiceBusReceivedMessage` other than `LockToken` will have default values.*

```C# Snippet:ServiceBusReadReceivedMessageLockToken
ServiceBusReceivedMessage rehydratedMessage = ServiceBusModelFactory.ServiceBusReceivedMessage(lockTokenGuid: new Guid(lockTokenBytes.ToArray()));

var client2 = new ServiceBusClient(connectionString);
ServiceBusReceiver receiver2 = client2.CreateReceiver(queueName);
await receiver2.CompleteMessageAsync(rehydratedMessage);
```
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Azure.Core.Amqp;
using NUnit.Framework;

namespace Azure.Messaging.ServiceBus.Tests.Samples
{
public class Sample16_CrossReceiverMessageSettlement : ServiceBusLiveTestBase
{
[Test]
public async Task RehydrateReceivedMessageUsingRawBytes()
{
await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: false))
{
#if SNIPPET
string connectionString = "<connection_string>";
string queueName = "<queue_name>";
#else
string connectionString = TestEnvironment.ServiceBusConnectionString;
string queueName = scope.QueueName;
#endif

#region Snippet:ServiceBusWriteReceivedMessage

var client1 = new ServiceBusClient(connectionString);
ServiceBusSender sender = client1.CreateSender(queueName);

var message = new ServiceBusMessage("some message");
await sender.SendMessageAsync(message);

ServiceBusReceiver receiver1 = client1.CreateReceiver(queueName);
ServiceBusReceivedMessage receivedMessage = await receiver1.ReceiveMessageAsync();
ReadOnlyMemory<byte> amqpMessageBytes = receivedMessage.GetRawAmqpMessage().ToBytes().ToMemory();
ReadOnlyMemory<byte> lockTokenBytes = Guid.Parse(receivedMessage.LockToken).ToByteArray();
#endregion

#region Snippet:ServiceBusReadReceivedMessage
AmqpAnnotatedMessage amqpMessage = AmqpAnnotatedMessage.FromBytes(new BinaryData(amqpMessageBytes));
ServiceBusReceivedMessage rehydratedMessage = ServiceBusReceivedMessage.FromAmqpMessage(amqpMessage, new BinaryData(lockTokenBytes));

var client2 = new ServiceBusClient(connectionString);
ServiceBusReceiver receiver2 = client2.CreateReceiver(queueName);
await receiver2.CompleteMessageAsync(rehydratedMessage);
#endregion

Assert.AreEqual("some message", rehydratedMessage.Body.ToString());
}
}

[Test]
public async Task RehydrateReceivedMessageUsingLockToken()
{
await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: false))
{
#if SNIPPET
string connectionString = "<connection_string>";
string queueName = "<queue_name>";
#else
string connectionString = TestEnvironment.ServiceBusConnectionString;
string queueName = scope.QueueName;
#endif

#region Snippet:ServiceBusWriteReceivedMessageLockToken

var client1 = new ServiceBusClient(connectionString);
ServiceBusSender sender = client1.CreateSender(queueName);

var message = new ServiceBusMessage("some message");
await sender.SendMessageAsync(message);

ServiceBusReceiver receiver1 = client1.CreateReceiver(queueName);
ServiceBusReceivedMessage receivedMessage = await receiver1.ReceiveMessageAsync();
ReadOnlyMemory<byte> lockTokenBytes = Guid.Parse(receivedMessage.LockToken).ToByteArray();
#endregion

#region Snippet:ServiceBusReadReceivedMessageLockToken

ServiceBusReceivedMessage rehydratedMessage = ServiceBusModelFactory.ServiceBusReceivedMessage(lockTokenGuid: new Guid(lockTokenBytes.ToArray()));

var client2 = new ServiceBusClient(connectionString);
ServiceBusReceiver receiver2 = client2.CreateReceiver(queueName);
await receiver2.CompleteMessageAsync(rehydratedMessage);
#endregion
}
}
}
}