Skip to content

Commit

Permalink
Ensuring "Path" returns the right destination in the case of via-send…
Browse files Browse the repository at this point in the history
…er (#6941)

Fixing #6031 
In the case of Via-Sender defined by 
`viaSender = new MessageSender(connection, "path", "via")`,

1. Path will now point to wherever the amqp-link is created to.
2. viaEntityPath will point to via entity.
3. TransferDestinationPath will point to the final destination of the message
  • Loading branch information
nemakam authored Jul 17, 2019
1 parent b19a86c commit 9d63b5a
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 14 deletions.
36 changes: 22 additions & 14 deletions sdk/servicebus/Microsoft.Azure.ServiceBus/src/Core/MessageSender.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ public class MessageSender : ClientEntity, IMessageSender
readonly ActiveClientLinkManager clientLinkManager;
readonly ServiceBusDiagnosticSource diagnosticSource;
readonly bool isViaSender;
readonly string transferDestinationPath;

/// <summary>
/// Creates a new AMQP MessageSender.
Expand Down Expand Up @@ -152,7 +151,7 @@ internal MessageSender(

this.ServiceBusConnection = serviceBusConnection ?? throw new ArgumentNullException(nameof(serviceBusConnection));
this.Path = entityPath;
this.TransferDestinationPath = transferDestinationPath;
this.SendingLinkDestination = entityPath;
this.EntityType = entityType;
this.ServiceBusConnection.ThrowIfClosed();

Expand All @@ -177,7 +176,8 @@ internal MessageSender(
if (!string.IsNullOrWhiteSpace(transferDestinationPath))
{
this.isViaSender = true;
this.transferDestinationPath = transferDestinationPath;
this.TransferDestinationPath = transferDestinationPath;
this.ViaEntityPath = entityPath;
}

MessagingEventSource.Log.MessageSenderCreateStop(serviceBusConnection.Endpoint.Authority, entityPath, this.ClientId);
Expand All @@ -190,15 +190,21 @@ internal MessageSender(
public override IList<ServiceBusPlugin> RegisteredPlugins { get; } = new List<ServiceBusPlugin>();

/// <summary>
/// Gets the entity path of the MessageSender.
/// Gets the entity path of the MessageSender.
/// In the case of a via-sender, this returns the path of the via entity.
/// </summary>
public override string Path { get; }

/// <summary>
/// Gets the transfer destination path (send-via) of the MessageSender.
/// In the case of a via-sender, gets the final destination path of the messages; null otherwise.
/// </summary>
public string TransferDestinationPath { get; }

/// <summary>
/// In the case of a via-sender, the message is sent to <see cref="TransferDestinationPath"/> via <see cref="ViaEntityPath"/>; null otherwise.
/// </summary>
public string ViaEntityPath { get; }

/// <summary>
/// Duration after which individual operations will timeout.
/// </summary>
Expand All @@ -215,6 +221,8 @@ public override TimeSpan OperationTimeout

internal MessagingEntityType? EntityType { get; }

internal string SendingLinkDestination { get; set; }

ICbsTokenProvider CbsTokenProvider { get; }

FaultTolerantAmqpObject<SendingAmqpLink> SendLinkManager { get; }
Expand Down Expand Up @@ -672,36 +680,36 @@ async Task OnCancelScheduledMessageAsync(long sequenceNumber)

async Task<SendingAmqpLink> CreateLinkAsync(TimeSpan timeout)
{
MessagingEventSource.Log.AmqpSendLinkCreateStart(this.ClientId, this.EntityType, this.Path);
MessagingEventSource.Log.AmqpSendLinkCreateStart(this.ClientId, this.EntityType, this.SendingLinkDestination);

var amqpLinkSettings = new AmqpLinkSettings
{
Role = false,
InitialDeliveryCount = 0,
Target = new Target { Address = this.Path },
Target = new Target { Address = this.SendingLinkDestination },
Source = new Source { Address = this.ClientId },
};
if (this.EntityType != null)
{
amqpLinkSettings.AddProperty(AmqpClientConstants.EntityTypeName, (int)this.EntityType);
}

var endpointUri = new Uri(this.ServiceBusConnection.Endpoint, this.Path);
var endpointUri = new Uri(this.ServiceBusConnection.Endpoint, this.SendingLinkDestination);

string[] audience;
if (this.isViaSender)
{
var transferDestinationEndpointUri = new Uri(this.ServiceBusConnection.Endpoint, this.transferDestinationPath);
var transferDestinationEndpointUri = new Uri(this.ServiceBusConnection.Endpoint, this.TransferDestinationPath);
audience = new string[] { endpointUri.AbsoluteUri, transferDestinationEndpointUri.AbsoluteUri };
amqpLinkSettings.AddProperty(AmqpClientConstants.TransferDestinationAddress, this.transferDestinationPath);
amqpLinkSettings.AddProperty(AmqpClientConstants.TransferDestinationAddress, this.TransferDestinationPath);
}
else
{
audience = new string[] { endpointUri.AbsoluteUri };
}

string[] claims = {ClaimConstants.Send};
var amqpSendReceiveLinkCreator = new AmqpSendReceiveLinkCreator(this.Path, this.ServiceBusConnection, endpointUri, audience, claims, this.CbsTokenProvider, amqpLinkSettings, this.ClientId);
var amqpSendReceiveLinkCreator = new AmqpSendReceiveLinkCreator(this.SendingLinkDestination, this.ServiceBusConnection, endpointUri, audience, claims, this.CbsTokenProvider, amqpLinkSettings, this.ClientId);
Tuple<AmqpObject, DateTime> linkDetails = await amqpSendReceiveLinkCreator.CreateAndOpenAmqpLinkAsync().ConfigureAwait(false);

var sendingAmqpLink = (SendingAmqpLink) linkDetails.Item1;
Expand All @@ -720,7 +728,7 @@ async Task<SendingAmqpLink> CreateLinkAsync(TimeSpan timeout)

async Task<RequestResponseAmqpLink> CreateRequestResponseLinkAsync(TimeSpan timeout)
{
var entityPath = this.Path + '/' + AmqpClientConstants.ManagementAddress;
var entityPath = this.SendingLinkDestination + '/' + AmqpClientConstants.ManagementAddress;
var amqpLinkSettings = new AmqpLinkSettings();
amqpLinkSettings.AddProperty(AmqpClientConstants.EntityTypeName, AmqpClientConstants.EntityTypeManagement);

Expand All @@ -729,9 +737,9 @@ async Task<RequestResponseAmqpLink> CreateRequestResponseLinkAsync(TimeSpan time
string[] audience;
if (this.isViaSender)
{
var transferDestinationEndpointUri = new Uri(this.ServiceBusConnection.Endpoint, this.transferDestinationPath);
var transferDestinationEndpointUri = new Uri(this.ServiceBusConnection.Endpoint, this.TransferDestinationPath);
audience = new string[] { endpointUri.AbsoluteUri, transferDestinationEndpointUri.AbsoluteUri };
amqpLinkSettings.AddProperty(AmqpClientConstants.TransferDestinationAddress, this.transferDestinationPath);
amqpLinkSettings.AddProperty(AmqpClientConstants.TransferDestinationAddress, this.TransferDestinationPath);
}
else
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -593,6 +593,7 @@ namespace Microsoft.Azure.ServiceBus.Core
public override System.Collections.Generic.IList<Microsoft.Azure.ServiceBus.Core.ServiceBusPlugin> RegisteredPlugins { get; }
public override Microsoft.Azure.ServiceBus.ServiceBusConnection ServiceBusConnection { get; }
public string TransferDestinationPath { get; }
public string ViaEntityPath { get; }
public System.Threading.Tasks.Task CancelScheduledMessageAsync(long sequenceNumber) { }
protected override System.Threading.Tasks.Task OnClosingAsync() { }
public override void RegisterPlugin(Microsoft.Azure.ServiceBus.Core.ServiceBusPlugin serviceBusPlugin) { }
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

namespace Microsoft.Azure.ServiceBus.UnitTests
{
using Core;
using Xunit;

public class MessageSenderTests
{
private MessageSender viaSender;
private MessageSender nonViaSender;

public MessageSenderTests()
{
var builder = new ServiceBusConnectionStringBuilder("blah.com", "path", "key-name", "key-value");
var connection = new ServiceBusConnection(builder);
viaSender = new MessageSender(connection, "path", "via");
nonViaSender = new MessageSender(connection, "path");
}

[Fact]
[DisplayTestMethodName]
public void Path_reflects_actual_link_destination()
{
Assert.Equal("via", viaSender.Path);
Assert.Equal("path", nonViaSender.Path);
}

[Fact]
[DisplayTestMethodName]
public void TransferDestinationPath_should_be_final_destination_name()
{
Assert.Equal("path", viaSender.TransferDestinationPath);
Assert.Null(nonViaSender.TransferDestinationPath);
}

[Fact]
[DisplayTestMethodName]
public void ViaEntityPath_should_be_via_entity_name()
{
Assert.Equal("via", viaSender.ViaEntityPath);
Assert.Null(nonViaSender.ViaEntityPath);
}
}
}

0 comments on commit 9d63b5a

Please sign in to comment.