Skip to content

Commit

Permalink
Change SendSettleMode for amqp Events Link to Unsettled (#167)
Browse files Browse the repository at this point in the history
* Fix SendSettleMode for amqp Events Link

* Fix test

* Add correct error message

* Choose right Settle modes for each link

* Fix test and build

* Fix test
  • Loading branch information
varunpuranik authored and myagley committed Aug 18, 2018
1 parent 2590d7e commit 93f13b8
Show file tree
Hide file tree
Showing 11 changed files with 115 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ namespace Microsoft.Azure.Devices.Edge.Hub.Amqp
using Microsoft.Azure.Amqp;
using Microsoft.Azure.Amqp.Framing;
using Microsoft.Azure.Devices.Common.Exceptions;
using Microsoft.Azure.Devices.Edge.Hub.Core;
using Microsoft.Azure.Devices.Edge.Util;
using Newtonsoft.Json;

Expand Down Expand Up @@ -34,13 +35,18 @@ static EdgeHubAmqpException GetEdgeHubAmqpException(Exception exception)
{
return edgeHubAmqpException;
}

var asUnAuthedException = exception.UnwindAs<UnauthorizedAccessException>();
if (asUnAuthedException != null)
else if (exception.UnwindAs<UnauthorizedAccessException>() != null)
{
return new EdgeHubAmqpException("Unauthorized access", ErrorCode.IotHubUnauthorizedAccess, exception);
}

else if (exception is EdgeHubMessageTooLargeException)
{
return new EdgeHubAmqpException(exception.Message, ErrorCode.MessageTooLarge);
}
else if (exception is InvalidOperationException)
{
return new EdgeHubAmqpException("Invalid action performed", ErrorCode.InvalidOperation);
}
return new EdgeHubAmqpException("Encountered server error", ErrorCode.ServerError, exception);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ public EventsLinkHandler(IReceivingAmqpLink link, Uri requestUri, IDictionary<st

public override LinkType Type => LinkType.Events;

protected override QualityOfService QualityOfService => QualityOfService.AtLeastOnce;

protected override async Task OnMessageReceived(AmqpMessage amqpMessage)
{
IList<AmqpMessage> messages = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ public MethodReceivingLinkHandler(IReceivingAmqpLink link, Uri requestUri, IDict

public override LinkType Type => LinkType.MethodReceiving;

protected override QualityOfService QualityOfService => QualityOfService.AtMostOnce;

public override string CorrelationId =>
AmqpConnectionUtils.GetCorrelationId(this.Link);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,33 @@ protected ReceivingLinkHandler(IReceivingAmqpLink link, Uri requestUri, IDiction

protected IReceivingAmqpLink ReceivingLink { get; }

protected abstract QualityOfService QualityOfService { get; }

protected override Task OnOpenAsync(TimeSpan timeout)
{
// TODO: Check if these can be set to null to use defaults and avoid bytes on the wire
// The Receiver will spontaneously settle all incoming transfers.
this.ReceivingLink.Settings.RcvSettleMode = (byte)ReceiverSettleMode.First;
// The Sender will send all deliveries settled to the receiver.
this.ReceivingLink.Settings.SndSettleMode = (byte)SenderSettleMode.Settled;
switch (this.QualityOfService)
{
case QualityOfService.ExactlyOnce:
// The receiver will only settle after sending the disposition to the sender and receiving a disposition indicating settlement of the delivery from the sender.
this.ReceivingLink.Settings.RcvSettleMode = (byte)ReceiverSettleMode.Second;
// SenderSettleMode.Unsettled (null as it is the default and to avoid bytes on the wire)
this.ReceivingLink.Settings.SndSettleMode = null;
break;

case QualityOfService.AtLeastOnce:
// The Receiver will spontaneously settle all incoming transfers.
this.ReceivingLink.Settings.RcvSettleMode = null;// Default ReceiverSettleMode.First;
// The Sender will send all deliveries unsettled to the receiver.
this.ReceivingLink.Settings.SndSettleMode = null; // Default SenderSettleMode.Unettled;
break;

case QualityOfService.AtMostOnce:
// The Receiver will spontaneously settle all incoming transfers.
this.ReceivingLink.Settings.RcvSettleMode = null;// Default ReceiverSettleMode.First;
// The Sender will send all deliveries unsettled to the receiver.
this.ReceivingLink.Settings.SndSettleMode = (byte)SenderSettleMode.Settled;
break;
}

this.ReceivingLink.RegisterMessageListener(m => this.sendMessageProcessor.Post(m));
this.ReceivingLink.SafeAddClosed((s, e) => this.OnReceiveLinkClosed()
Expand Down Expand Up @@ -63,12 +83,15 @@ internal async Task ProcessMessageAsync(AmqpMessage amqpMessage)
try
{
await this.OnMessageReceived(amqpMessage);
((IReceivingAmqpLink)this.Link).DisposeMessage(amqpMessage, AmqpConstants.AcceptedOutcome, true, true);
amqpMessage.Dispose();
}
catch (Exception e) when (!e.IsFatal())
{
Events.ErrorProcessingMessage(e, this);
((IReceivingAmqpLink)this.Link).DisposeMessage(amqpMessage, AmqpConstants.RejectedOutcome, true, true);
}
finally
{
amqpMessage.Dispose();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ public TwinReceivingLinkHandler(IReceivingAmqpLink link, Uri requestUri, IDictio

public override LinkType Type => LinkType.TwinReceiving;

protected override QualityOfService QualityOfService => QualityOfService.AtMostOnce;

public override string CorrelationId =>
AmqpConnectionUtils.GetCorrelationId(this.Link);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
// Copyright (c) Microsoft. All rights reserved.

namespace Microsoft.Azure.Devices.Edge.Hub.Core
{
using System;

public class EdgeHubMessageTooLargeException : Exception
{
public EdgeHubMessageTooLargeException(string message)
: base(message)
{ }

public EdgeHubMessageTooLargeException(string message, Exception innerException)
: base(message, innerException)
{ }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,8 @@
namespace Microsoft.Azure.Devices.Edge.Hub.Core.Routing
{
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Threading.Tasks;
using App.Metrics;
using App.Metrics.Counter;
Expand Down Expand Up @@ -105,7 +103,7 @@ static void ValidateMessageSize(IRoutingMessage messageToBeValidated)
long messageSize = messageToBeValidated.Size();
if (messageSize > MaxMessageSize)
{
throw new InvalidOperationException($"Message size exceeds maximum allowed size: got {messageSize}, limit {MaxMessageSize}");
throw new EdgeHubMessageTooLargeException($"Message size is {messageSize} bytes which is greater than the max size {MaxMessageSize} bytes allowed");
}
}

Expand Down Expand Up @@ -302,10 +300,7 @@ enum EventIds
ErrorRemovingSubscription,
ErrorAddingSubscription,
AddingSubscription,
RemovingSubscription,
InvokingMethod,
NoSubscription,
ClientNotFound
RemovingSubscription
}

public static void MethodCallReceived(string fromId, string toId, string correlationId)
Expand Down Expand Up @@ -362,21 +357,6 @@ public static void RemovingSubscription(string id, DeviceSubscription subscripti
{
Log.LogDebug((int)EventIds.RemovingSubscription, Invariant($"Removing subscription {subscription} for client {id}."));
}

public static void InvokingMethod(DirectMethodRequest methodRequest)
{
Log.LogDebug((int)EventIds.InvokingMethod, Invariant($"Invoking method {methodRequest.Name} on client {methodRequest.Id}."));
}

public static void NoSubscriptionForMethodInvocation(DirectMethodRequest methodRequest)
{
Log.LogWarning((int)EventIds.NoSubscription, Invariant($"Unable to invoke method {methodRequest.Name} on client {methodRequest.Id} because no subscription for methods for found."));
}

public static void NoDeviceProxyForMethodInvocation(DirectMethodRequest methodRequest)
{
Log.LogWarning((int)EventIds.ClientNotFound, Invariant($"Unable to invoke method {methodRequest.Name} as client {methodRequest.Id} is not connected."));
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ public TestReceivingLinkHandler(IReceivingAmqpLink link, Uri requestUri, IDictio

public override LinkType Type => LinkType.Events;

protected override QualityOfService QualityOfService => QualityOfService.AtLeastOnce;

public IList<AmqpMessage> ReceivedMessages { get; } = new List<AmqpMessage>();

protected override Task OnMessageReceived(AmqpMessage amqpMessage)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,18 +105,18 @@ public async Task EdgeHubChecksMessageSize()

var routingEdgeHub = new RoutingEdgeHub(router, messageConverter, connectionManager, twinManager, "testEdgeDevice", Mock.Of<IInvokeMethodHandler>());

await Assert.ThrowsAsync<InvalidOperationException>(() => routingEdgeHub.ProcessDeviceMessage(identity.Object, badMessage));
await Assert.ThrowsAsync<EdgeHubMessageTooLargeException>(() => routingEdgeHub.ProcessDeviceMessage(identity.Object, badMessage));

string badString = System.Text.Encoding.UTF8.GetString(new byte[300 * 1024], 0, 300 * 1024);
var badProperties = new Dictionary<string, string> { ["toolong"] = badString };

badMessage = new Message.Builder(new byte[1]).SetProperties(badProperties).Build();

await Assert.ThrowsAsync<InvalidOperationException>(() => routingEdgeHub.ProcessDeviceMessage(identity.Object, badMessage));
await Assert.ThrowsAsync<EdgeHubMessageTooLargeException>(() => routingEdgeHub.ProcessDeviceMessage(identity.Object, badMessage));

badMessage = new Message(new byte[1], new Dictionary<string, string>(), badProperties);

await Assert.ThrowsAsync<InvalidOperationException>(() => routingEdgeHub.ProcessDeviceMessage(identity.Object, badMessage));
await Assert.ThrowsAsync<EdgeHubMessageTooLargeException>(() => routingEdgeHub.ProcessDeviceMessage(identity.Object, badMessage));
}

[Fact]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ namespace Microsoft.Azure.Devices.Edge.Hub.E2E.Test
using System.Collections.Generic;
using System.Threading.Tasks;
using Microsoft.Azure.Devices.Client;
using Microsoft.Azure.Devices.Client.Exceptions;
using Microsoft.Azure.Devices.Edge.Util.Test.Common;
using Xunit;

Expand Down Expand Up @@ -113,5 +114,44 @@ async Task SendTelemetryMultipleInputsTest(ITransportSettings[] transportSetting
// wait for the connection to be closed on the Edge side
await Task.Delay(TimeSpan.FromSeconds(10));
}

[Theory]
[MemberData(nameof(TestSettings.TransportSettings), MemberType = typeof(TestSettings))]
async Task SendLargeMessageHandleExceptionTest(ITransportSettings[] transportSettings)
{
TestModule sender = null;

string edgeDeviceConnectionString = await SecretsHelper.GetSecretFromConfigKey("edgeCapableDeviceConnStrKey");
IotHubConnectionStringBuilder connectionStringBuilder = IotHubConnectionStringBuilder.Create(edgeDeviceConnectionString);
RegistryManager rm = RegistryManager.CreateFromConnectionString(edgeDeviceConnectionString);

try
{
sender = await TestModule.CreateAndConnect(rm, connectionStringBuilder.HostName, connectionStringBuilder.DeviceId, "sender1", transportSettings);

Exception ex = null;
try
{
// create a large message
var message = new Message(new byte[400 * 1000]);
await sender.SendMessageAsync("output1", message);
}
catch (Exception e)
{
ex = e;
}

Assert.NotNull(ex);
}
finally
{
if (rm != null)
{
await rm.CloseAsync();
}
}
// wait for the connection to be closed on the Edge side
await Task.Delay(TimeSpan.FromSeconds(10));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,11 @@ public async Task<int> SendMessagesByCountAsync(string output, int startIndex, i

public Task<int> SendMessagesForDurationAsync(string output, TimeSpan duration) => this.SendMessagesAsync(output, 0, int.MaxValue, duration, TimeSpan.Zero);

public Task SendMessageAsync(string output, Message message)
{
return this.moduleClient.SendEventAsync(output, message);
}

async Task<int> SendMessagesAsync(string output, int startIndex, int count, TimeSpan duration, TimeSpan sleepTime)
{
var s = new Stopwatch();
Expand Down

0 comments on commit 93f13b8

Please sign in to comment.