Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fall back to management link when settling non-session message #37704

Merged
merged 3 commits into from
Jul 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -445,7 +445,7 @@ await receiver.CompleteInternalAsync(
/// </summary>
///
/// <param name="lockToken">The lockToken of the <see cref="ServiceBusReceivedMessage"/> to complete.</param>
/// <param name="timeout"></param>
/// <param name="timeout">The timeout for the operation.</param>
private async Task CompleteInternalAsync(
Guid lockToken,
TimeSpan timeout)
Expand All @@ -460,20 +460,28 @@ await DisposeMessageRequestResponseAsync(
SessionId).ConfigureAwait(false);
return;
}
await DisposeMessageAsync(lockToken, AmqpConstants.AcceptedOutcome, timeout).ConfigureAwait(false);
await DisposeMessageAsync(lockToken, AmqpConstants.AcceptedOutcome, DispositionStatus.Completed, timeout).ConfigureAwait(false);
}

/// <summary>
/// Settles a <see cref="ServiceBusReceivedMessage"/> using a lock token.
/// </summary>
///
/// <param name="lockToken">The lockToken of the <see cref="ServiceBusReceivedMessage"/> to complete.</param>
/// <param name="outcome"></param>
/// <param name="timeout"></param>
/// <param name="outcome">The outcome of the message - used when disposing over receive link.</param>
/// <param name="disposition">The disposition of the message - used when disposing over the management link.</param>
/// <param name="timeout">The timeout for the operation.</param>
/// <param name="propertiesToModify">Properties to modify when deadlettering, deferring, or abandoning.</param>
/// <param name="deadLetterReason">Dead letter reason. Only valid when deadlettering.</param>
/// <param name="deadLetterDescription">Dead letter description. Only valid when deadlettering.</param>
private async Task DisposeMessageAsync(
Guid lockToken,
Outcome outcome,
TimeSpan timeout)
DispositionStatus disposition,
TimeSpan timeout,
IDictionary<string, object> propertiesToModify = null,
string deadLetterReason = null,
string deadLetterDescription = null)
{
byte[] bufferForLockToken = ArrayPool<byte>.Shared.Rent(SizeOfGuidInBytes);
GuidUtilities.WriteGuidToBuffer(lockToken, bufferForLockToken.AsSpan(0, SizeOfGuidInBytes));
Expand Down Expand Up @@ -508,7 +516,21 @@ private async Task DisposeMessageAsync(
{
if (error.Condition.Equals(AmqpErrorCode.NotFound))
{
ThrowLockLostException();
if (_isSessionReceiver)
{
ThrowLockLostException();
}

// The message was not found on the link which can occur as a result of a reconnect.
// Attempt to settle the message over the management link.
await DisposeMessageRequestResponseAsync(
lockToken,
timeout,
disposition,
propertiesToModify: propertiesToModify,
deadLetterReason: deadLetterReason,
deadLetterDescription: deadLetterDescription).ConfigureAwait(false);
return;
}

throw error.ToMessagingContractException();
Expand Down Expand Up @@ -587,7 +609,7 @@ await receiver.DeferInternalAsync(
/// <summary>Indicates that the receiver wants to defer the processing for the message.</summary>
///
/// <param name="lockToken">The lock token of the <see cref="ServiceBusMessage" />.</param>
/// <param name="timeout"></param>
/// <param name="timeout">The timeout for the operation.</param>
/// <param name="propertiesToModify">The properties of the message to modify while deferring the message.</param>
///
private Task DeferInternalAsync(
Expand All @@ -605,7 +627,12 @@ private Task DeferInternalAsync(
SessionId,
propertiesToModify);
}
return DisposeMessageAsync(lockToken, GetDeferOutcome(propertiesToModify), timeout);
return DisposeMessageAsync(
lockToken,
GetDeferOutcome(propertiesToModify),
DispositionStatus.Defered,
timeout,
propertiesToModify: propertiesToModify);
}

/// <summary>
Expand Down Expand Up @@ -645,7 +672,7 @@ await receiver.AbandonInternalAsync(
/// </summary>
///
/// <param name="lockToken">The lock token of the corresponding message to abandon.</param>
/// <param name="timeout"></param>
/// <param name="timeout">The timeout for the operation.</param>
/// <param name="propertiesToModify">The properties of the message to modify while abandoning the message.</param>
private Task AbandonInternalAsync(
Guid lockToken,
Expand All @@ -662,7 +689,12 @@ private Task AbandonInternalAsync(
SessionId,
propertiesToModify);
}
return DisposeMessageAsync(lockToken, GetAbandonOutcome(propertiesToModify), timeout);
return DisposeMessageAsync(
lockToken,
GetAbandonOutcome(propertiesToModify),
DispositionStatus.Abandoned,
timeout,
propertiesToModify: propertiesToModify);
}

/// <summary>
Expand Down Expand Up @@ -710,7 +742,7 @@ await receiver.DeadLetterInternalAsync(
/// </summary>
///
/// <param name="lockToken">The lock token of the corresponding message to dead-letter.</param>
/// <param name="timeout"></param>
/// <param name="timeout">The timeout for the operation.</param>
/// <param name="propertiesToModify">The properties of the message to modify while moving to subqueue.</param>
/// <param name="deadLetterReason">The reason for dead-lettering the message.</param>
/// <param name="deadLetterErrorDescription">The error description for dead-lettering the message.</param>
Expand Down Expand Up @@ -740,7 +772,11 @@ internal virtual Task DeadLetterInternalAsync(
return DisposeMessageAsync(
lockToken,
GetRejectedOutcome(propertiesToModify, deadLetterReason, deadLetterErrorDescription),
timeout);
DispositionStatus.Suspended,
timeout,
propertiesToModify,
deadLetterReason,
deadLetterErrorDescription);
}

private static Rejected GetRejectedOutcome(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,106 @@ public async Task PeekSingleMessage()
}
}

[Test]
public async Task CanRenewWithSeparateReceiver()
{
await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: false))
{
await using var client = new ServiceBusClient(TestEnvironment.ServiceBusConnectionString);
ServiceBusSender sender = client.CreateSender(scope.QueueName);
await sender.SendMessageAsync(ServiceBusTestUtilities.GetMessage());
var receiver1 = client.CreateReceiver(scope.QueueName);
var message1 = await receiver1.ReceiveMessageAsync();
await receiver1.RenewMessageLockAsync(message1);

var receiver2 = client.CreateReceiver(scope.QueueName);
await receiver2.RenewMessageLockAsync(message1);
await receiver2.CompleteMessageAsync(message1);
}
}

[Test]
public async Task CanCompleteAfterLinkReconnect()
{
await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: false))
{
await using var client = new ServiceBusClient(TestEnvironment.ServiceBusConnectionString);
var sender = client.CreateSender(scope.QueueName);
var receiver = client.CreateReceiver(scope.QueueName);
await sender.SendMessageAsync(ServiceBusTestUtilities.GetMessage());

var message = await receiver.ReceiveMessageAsync();

SimulateNetworkFailure(client);

await receiver.CompleteMessageAsync(message);
}
}

[Test]
public async Task CanAbandonAfterLinkReconnect()
{
await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: false))
{
await using var client = new ServiceBusClient(TestEnvironment.ServiceBusConnectionString);
var sender = client.CreateSender(scope.QueueName);
var receiver = client.CreateReceiver(scope.QueueName);
await sender.SendMessageAsync(ServiceBusTestUtilities.GetMessage());

var message = await receiver.ReceiveMessageAsync();

SimulateNetworkFailure(client);

await receiver.AbandonMessageAsync(message, new Dictionary<string, object>{{ "test key", "test value" }});
message = await receiver.ReceiveMessageAsync();
Assert.AreEqual("test value", message.ApplicationProperties["test key"]);
}
}

[Test]
public async Task CanDeferAfterLinkReconnect()
{
await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: false))
{
await using var client = new ServiceBusClient(TestEnvironment.ServiceBusConnectionString);
var sender = client.CreateSender(scope.QueueName);
var receiver = client.CreateReceiver(scope.QueueName);
await sender.SendMessageAsync(ServiceBusTestUtilities.GetMessage());

var message = await receiver.ReceiveMessageAsync();

SimulateNetworkFailure(client);

await receiver.DeferMessageAsync(message, new Dictionary<string, object>{{ "test key", "test value" }});
message = await receiver.ReceiveDeferredMessageAsync(message.SequenceNumber);
Assert.AreEqual("test value", message.ApplicationProperties["test key"]);
}
}

[Test]
public async Task CanDeadLetterAfterLinkReconnect()
{
await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: false))
{
await using var client = new ServiceBusClient(TestEnvironment.ServiceBusConnectionString);
var sender = client.CreateSender(scope.QueueName);
var receiver = client.CreateReceiver(scope.QueueName);
await sender.SendMessageAsync(ServiceBusTestUtilities.GetMessage());

var message = await receiver.ReceiveMessageAsync();

SimulateNetworkFailure(client);

await receiver.DeadLetterMessageAsync(message, new Dictionary<string, object>{{ "test key", "test value" }}, "test reason", "test description");

var dlqReceiver = client.CreateReceiver(scope.QueueName, new ServiceBusReceiverOptions { SubQueue = SubQueue.DeadLetter });
var dlqMessage = await dlqReceiver.ReceiveMessageAsync();
Assert.AreEqual("test reason", dlqMessage.DeadLetterReason);
Assert.AreEqual("test description", dlqMessage.DeadLetterErrorDescription);
Assert.AreEqual("test value", dlqMessage.ApplicationProperties["test key"]);
}
}

[Test]
public async Task PeekMessagesWithACustomIdentifier()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1064,5 +1064,85 @@ public async Task OpenSessionIsNotClosedWhenAcceptNextSessionTimesOut(bool enabl
Assert.IsNotNull(message);
}
}

[Test]
public async Task CannotCompleteAfterLinkReconnect()
{
await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: true))
{
await using var client = new ServiceBusClient(TestEnvironment.ServiceBusConnectionString);
var sender = client.CreateSender(scope.QueueName);
await sender.SendMessageAsync(ServiceBusTestUtilities.GetMessage("session"));
var receiver = await client.AcceptNextSessionAsync(scope.QueueName);

var message = await receiver.ReceiveMessageAsync();

SimulateNetworkFailure(client);
Assert.That(
async () => await receiver.CompleteMessageAsync(message),
Throws.InstanceOf<ServiceBusException>().And.Property(nameof(ServiceBusException.Reason))
.EqualTo(ServiceBusFailureReason.SessionLockLost));
}
}

[Test]
public async Task CanAbandonAfterLinkReconnect()
{
await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: true))
{
await using var client = new ServiceBusClient(TestEnvironment.ServiceBusConnectionString);
var sender = client.CreateSender(scope.QueueName);
await sender.SendMessageAsync(ServiceBusTestUtilities.GetMessage("session"));
var receiver = await client.AcceptNextSessionAsync(scope.QueueName);

var message = await receiver.ReceiveMessageAsync();

SimulateNetworkFailure(client);
Assert.That(
async () => await receiver.AbandonMessageAsync(message),
Throws.InstanceOf<ServiceBusException>().And.Property(nameof(ServiceBusException.Reason))
.EqualTo(ServiceBusFailureReason.SessionLockLost));
}
}

[Test]
public async Task CannotDeferAfterLinkReconnect()
{
await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: true))
{
await using var client = new ServiceBusClient(TestEnvironment.ServiceBusConnectionString);
var sender = client.CreateSender(scope.QueueName);
await sender.SendMessageAsync(ServiceBusTestUtilities.GetMessage("session"));
var receiver = await client.AcceptNextSessionAsync(scope.QueueName);

var message = await receiver.ReceiveMessageAsync();

SimulateNetworkFailure(client);
Assert.That(
async () => await receiver.DeferMessageAsync(message),
Throws.InstanceOf<ServiceBusException>().And.Property(nameof(ServiceBusException.Reason))
.EqualTo(ServiceBusFailureReason.SessionLockLost));
}
}

[Test]
public async Task CannotDeadLetterAfterLinkReconnect()
{
await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: true))
{
await using var client = new ServiceBusClient(TestEnvironment.ServiceBusConnectionString);
var sender = client.CreateSender(scope.QueueName);
await sender.SendMessageAsync(ServiceBusTestUtilities.GetMessage("session"));
var receiver = await client.AcceptNextSessionAsync(scope.QueueName);

var message = await receiver.ReceiveMessageAsync();

SimulateNetworkFailure(client);
Assert.That(
async () => await receiver.DeadLetterMessageAsync(message),
Throws.InstanceOf<ServiceBusException>().And.Property(nameof(ServiceBusException.Reason))
.EqualTo(ServiceBusFailureReason.SessionLockLost));
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -297,14 +297,7 @@ public async Task TransactionThrowsWhenOperationsOfDifferentPartitionsAreInSameT

await receiver.CompleteMessageAsync(receivedMessage1);

// the service seems to abandon the message that
// triggered the InvalidOperationException
// in the transaction
Assert.That(
async () =>
await receiver.CompleteMessageAsync(receivedMessage2), Throws.InstanceOf<ServiceBusException>()
.And.Property(nameof(ServiceBusException.Reason))
.EqualTo(ServiceBusFailureReason.MessageLockLost));
await receiver.CompleteMessageAsync(receivedMessage2);
m-redding marked this conversation as resolved.
Show resolved Hide resolved
}
}

Expand Down