From 9d63b5ae69d5ae6739c6ec27d3eea884e0aa97d8 Mon Sep 17 00:00:00 2001 From: Neeraj Makam Date: Tue, 16 Jul 2019 17:40:02 -0700 Subject: [PATCH] Ensuring "Path" returns the right destination in the case of via-sender (#6941) Fixing #6031 In the case of Via-Sender defined by `viaSender = new MessageSender(connection, "path", "via")`, 1. Path will now point to wherever the amqp-link is created to. 2. viaEntityPath will point to via entity. 3. TransferDestinationPath will point to the final destination of the message --- .../src/Core/MessageSender.cs | 36 +++++++++------ ...rovals.ApproveAzureServiceBus.approved.txt | 1 + .../tests/MessageSenderTests.cs | 46 +++++++++++++++++++ 3 files changed, 69 insertions(+), 14 deletions(-) create mode 100644 sdk/servicebus/Microsoft.Azure.ServiceBus/tests/MessageSenderTests.cs diff --git a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Core/MessageSender.cs b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Core/MessageSender.cs index 64dcf7a0c0e94..3e7ddcda5e748 100644 --- a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Core/MessageSender.cs +++ b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Core/MessageSender.cs @@ -40,7 +40,6 @@ public class MessageSender : ClientEntity, IMessageSender readonly ActiveClientLinkManager clientLinkManager; readonly ServiceBusDiagnosticSource diagnosticSource; readonly bool isViaSender; - readonly string transferDestinationPath; /// /// Creates a new AMQP MessageSender. @@ -152,7 +151,7 @@ internal MessageSender( this.ServiceBusConnection = serviceBusConnection ?? throw new ArgumentNullException(nameof(serviceBusConnection)); this.Path = entityPath; - this.TransferDestinationPath = transferDestinationPath; + this.SendingLinkDestination = entityPath; this.EntityType = entityType; this.ServiceBusConnection.ThrowIfClosed(); @@ -177,7 +176,8 @@ internal MessageSender( if (!string.IsNullOrWhiteSpace(transferDestinationPath)) { this.isViaSender = true; - this.transferDestinationPath = transferDestinationPath; + this.TransferDestinationPath = transferDestinationPath; + this.ViaEntityPath = entityPath; } MessagingEventSource.Log.MessageSenderCreateStop(serviceBusConnection.Endpoint.Authority, entityPath, this.ClientId); @@ -190,15 +190,21 @@ internal MessageSender( public override IList RegisteredPlugins { get; } = new List(); /// - /// Gets the entity path of the MessageSender. + /// Gets the entity path of the MessageSender. + /// In the case of a via-sender, this returns the path of the via entity. /// public override string Path { get; } /// - /// Gets the transfer destination path (send-via) of the MessageSender. + /// In the case of a via-sender, gets the final destination path of the messages; null otherwise. /// public string TransferDestinationPath { get; } + /// + /// In the case of a via-sender, the message is sent to via ; null otherwise. + /// + public string ViaEntityPath { get; } + /// /// Duration after which individual operations will timeout. /// @@ -215,6 +221,8 @@ public override TimeSpan OperationTimeout internal MessagingEntityType? EntityType { get; } + internal string SendingLinkDestination { get; set; } + ICbsTokenProvider CbsTokenProvider { get; } FaultTolerantAmqpObject SendLinkManager { get; } @@ -672,13 +680,13 @@ async Task OnCancelScheduledMessageAsync(long sequenceNumber) async Task CreateLinkAsync(TimeSpan timeout) { - MessagingEventSource.Log.AmqpSendLinkCreateStart(this.ClientId, this.EntityType, this.Path); + MessagingEventSource.Log.AmqpSendLinkCreateStart(this.ClientId, this.EntityType, this.SendingLinkDestination); var amqpLinkSettings = new AmqpLinkSettings { Role = false, InitialDeliveryCount = 0, - Target = new Target { Address = this.Path }, + Target = new Target { Address = this.SendingLinkDestination }, Source = new Source { Address = this.ClientId }, }; if (this.EntityType != null) @@ -686,14 +694,14 @@ async Task CreateLinkAsync(TimeSpan timeout) amqpLinkSettings.AddProperty(AmqpClientConstants.EntityTypeName, (int)this.EntityType); } - var endpointUri = new Uri(this.ServiceBusConnection.Endpoint, this.Path); + var endpointUri = new Uri(this.ServiceBusConnection.Endpoint, this.SendingLinkDestination); string[] audience; if (this.isViaSender) { - var transferDestinationEndpointUri = new Uri(this.ServiceBusConnection.Endpoint, this.transferDestinationPath); + var transferDestinationEndpointUri = new Uri(this.ServiceBusConnection.Endpoint, this.TransferDestinationPath); audience = new string[] { endpointUri.AbsoluteUri, transferDestinationEndpointUri.AbsoluteUri }; - amqpLinkSettings.AddProperty(AmqpClientConstants.TransferDestinationAddress, this.transferDestinationPath); + amqpLinkSettings.AddProperty(AmqpClientConstants.TransferDestinationAddress, this.TransferDestinationPath); } else { @@ -701,7 +709,7 @@ async Task CreateLinkAsync(TimeSpan timeout) } string[] claims = {ClaimConstants.Send}; - var amqpSendReceiveLinkCreator = new AmqpSendReceiveLinkCreator(this.Path, this.ServiceBusConnection, endpointUri, audience, claims, this.CbsTokenProvider, amqpLinkSettings, this.ClientId); + var amqpSendReceiveLinkCreator = new AmqpSendReceiveLinkCreator(this.SendingLinkDestination, this.ServiceBusConnection, endpointUri, audience, claims, this.CbsTokenProvider, amqpLinkSettings, this.ClientId); Tuple linkDetails = await amqpSendReceiveLinkCreator.CreateAndOpenAmqpLinkAsync().ConfigureAwait(false); var sendingAmqpLink = (SendingAmqpLink) linkDetails.Item1; @@ -720,7 +728,7 @@ async Task CreateLinkAsync(TimeSpan timeout) async Task CreateRequestResponseLinkAsync(TimeSpan timeout) { - var entityPath = this.Path + '/' + AmqpClientConstants.ManagementAddress; + var entityPath = this.SendingLinkDestination + '/' + AmqpClientConstants.ManagementAddress; var amqpLinkSettings = new AmqpLinkSettings(); amqpLinkSettings.AddProperty(AmqpClientConstants.EntityTypeName, AmqpClientConstants.EntityTypeManagement); @@ -729,9 +737,9 @@ async Task CreateRequestResponseLinkAsync(TimeSpan time string[] audience; if (this.isViaSender) { - var transferDestinationEndpointUri = new Uri(this.ServiceBusConnection.Endpoint, this.transferDestinationPath); + var transferDestinationEndpointUri = new Uri(this.ServiceBusConnection.Endpoint, this.TransferDestinationPath); audience = new string[] { endpointUri.AbsoluteUri, transferDestinationEndpointUri.AbsoluteUri }; - amqpLinkSettings.AddProperty(AmqpClientConstants.TransferDestinationAddress, this.transferDestinationPath); + amqpLinkSettings.AddProperty(AmqpClientConstants.TransferDestinationAddress, this.TransferDestinationPath); } else { diff --git a/sdk/servicebus/Microsoft.Azure.ServiceBus/tests/API/ApiApprovals.ApproveAzureServiceBus.approved.txt b/sdk/servicebus/Microsoft.Azure.ServiceBus/tests/API/ApiApprovals.ApproveAzureServiceBus.approved.txt index a4b2947ea5a12..c1b51775d8fee 100644 --- a/sdk/servicebus/Microsoft.Azure.ServiceBus/tests/API/ApiApprovals.ApproveAzureServiceBus.approved.txt +++ b/sdk/servicebus/Microsoft.Azure.ServiceBus/tests/API/ApiApprovals.ApproveAzureServiceBus.approved.txt @@ -593,6 +593,7 @@ namespace Microsoft.Azure.ServiceBus.Core public override System.Collections.Generic.IList RegisteredPlugins { get; } public override Microsoft.Azure.ServiceBus.ServiceBusConnection ServiceBusConnection { get; } public string TransferDestinationPath { get; } + public string ViaEntityPath { get; } public System.Threading.Tasks.Task CancelScheduledMessageAsync(long sequenceNumber) { } protected override System.Threading.Tasks.Task OnClosingAsync() { } public override void RegisterPlugin(Microsoft.Azure.ServiceBus.Core.ServiceBusPlugin serviceBusPlugin) { } diff --git a/sdk/servicebus/Microsoft.Azure.ServiceBus/tests/MessageSenderTests.cs b/sdk/servicebus/Microsoft.Azure.ServiceBus/tests/MessageSenderTests.cs new file mode 100644 index 0000000000000..0e02d41455222 --- /dev/null +++ b/sdk/servicebus/Microsoft.Azure.ServiceBus/tests/MessageSenderTests.cs @@ -0,0 +1,46 @@ +// Copyright (c) Microsoft. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +namespace Microsoft.Azure.ServiceBus.UnitTests +{ + using Core; + using Xunit; + + public class MessageSenderTests + { + private MessageSender viaSender; + private MessageSender nonViaSender; + + public MessageSenderTests() + { + var builder = new ServiceBusConnectionStringBuilder("blah.com", "path", "key-name", "key-value"); + var connection = new ServiceBusConnection(builder); + viaSender = new MessageSender(connection, "path", "via"); + nonViaSender = new MessageSender(connection, "path"); + } + + [Fact] + [DisplayTestMethodName] + public void Path_reflects_actual_link_destination() + { + Assert.Equal("via", viaSender.Path); + Assert.Equal("path", nonViaSender.Path); + } + + [Fact] + [DisplayTestMethodName] + public void TransferDestinationPath_should_be_final_destination_name() + { + Assert.Equal("path", viaSender.TransferDestinationPath); + Assert.Null(nonViaSender.TransferDestinationPath); + } + + [Fact] + [DisplayTestMethodName] + public void ViaEntityPath_should_be_via_entity_name() + { + Assert.Equal("via", viaSender.ViaEntityPath); + Assert.Null(nonViaSender.ViaEntityPath); + } + } +}