Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

802 client isnt being notified if message is not delivered #1568

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/ReleaseNotes.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,4 @@
* [Server] Fixed duplicated invocation of the event _ClientAcknowledgedPublishPacketAsync_ for QoS level 2 (#1550)
* [Server] Fixed issue in upgrading and downgrading of QoS levels for subscriptions and retained messages (#1560)
* [Server] Fixed memory leak when old sessions are discarded (#1553)
* [Server] When using MQTTv5 the reason code _NoMatchingSubscribers_ is returned when no client is subscribed to a published topic (#802)
4 changes: 2 additions & 2 deletions Source/MQTTnet.Tests/MQTTv5/Client_Tests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ public async Task Publish_QoS_1()
var result = await client.PublishStringAsync("a", "b", MqttQualityOfServiceLevel.AtLeastOnce);
await client.DisconnectAsync();

Assert.AreEqual(MqttClientPublishReasonCode.Success, result.ReasonCode);
Assert.AreEqual(MqttClientPublishReasonCode.NoMatchingSubscribers, result.ReasonCode);
}
}

Expand All @@ -169,7 +169,7 @@ public async Task Publish_QoS_2()
var result = await client.PublishStringAsync("a", "b", MqttQualityOfServiceLevel.ExactlyOnce);
await client.DisconnectAsync();

Assert.AreEqual(MqttClientPublishReasonCode.Success, result.ReasonCode);
Assert.AreEqual(MqttClientPublishReasonCode.NoMatchingSubscribers, result.ReasonCode);
}
}

Expand Down
58 changes: 58 additions & 0 deletions Source/MQTTnet.Tests/Server/Publish_Tests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

using System.Threading.Tasks;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using MQTTnet.Client;
using MQTTnet.Formatter;
using MQTTnet.Protocol;

namespace MQTTnet.Tests.Server
{
[TestClass]
public sealed class Publish_Tests : BaseTestClass
{
[TestMethod]
public async Task Return_Success_When_Subscribed()
{
using (var testEnvironment = CreateTestEnvironment(MqttProtocolVersion.V500))
{
await testEnvironment.StartServer();

var sender = await testEnvironment.ConnectClient();
var receiver = await testEnvironment.ConnectClient();

await receiver.SubscribeAsync("A");

// AtLeastOnce is required to get an ACK packet from the server.
var publishResult = await sender.PublishStringAsync("A", "Payload", MqttQualityOfServiceLevel.AtLeastOnce);

Assert.AreEqual(MqttClientPublishReasonCode.Success, publishResult.ReasonCode);

Assert.AreEqual(true, publishResult.IsSuccess);
}
}

[TestMethod]
public async Task Return_NoMatchingSubscribers_When_Not_Subscribed()
{
using (var testEnvironment = CreateTestEnvironment(MqttProtocolVersion.V500))
{
await testEnvironment.StartServer();

var sender = await testEnvironment.ConnectClient();
var receiver = await testEnvironment.ConnectClient();

await receiver.SubscribeAsync("A");

// AtLeastOnce is required to get an ACK packet from the server.
var publishResult = await sender.PublishStringAsync("B", "Payload", MqttQualityOfServiceLevel.AtLeastOnce);

Assert.AreEqual(MqttClientPublishReasonCode.NoMatchingSubscribers, publishResult.ReasonCode);

Assert.AreEqual(true, publishResult.IsSuccess);
}
}
}
}
31 changes: 20 additions & 11 deletions Source/MQTTnet/Client/Publishing/MqttClientPublishResult.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,29 +10,38 @@ namespace MQTTnet.Client
public sealed class MqttClientPublishResult
{
/// <summary>
/// Gets the packet identifier which was used for this publish.
/// Returns if the overall status of the publish is a success. This can be the reason code _Success_ or
/// _NoMatchingSubscribers_. _NoMatchingSubscribers_ usually indicates only that no other client is interested in the topic but overall transmit
/// to the server etc. was a success.
/// </summary>
public bool IsSuccess => ReasonCode == MqttClientPublishReasonCode.Success || ReasonCode == MqttClientPublishReasonCode.NoMatchingSubscribers;

/// <summary>
/// Gets the packet identifier which was used for this publish.
/// </summary>
public ushort? PacketIdentifier { get; set; }

/// <summary>
/// Gets or sets the reason code.
/// Hint: MQTT 5 feature only.
/// Gets or sets the reason code.
/// Hint: MQTT 5 feature only.
/// </summary>
public MqttClientPublishReasonCode ReasonCode { get; set; } = MqttClientPublishReasonCode.Success;

/// <summary>
/// Gets or sets the reason string.
/// Hint: MQTT 5 feature only.
/// Gets or sets the reason string.
/// Hint: MQTT 5 feature only.
/// </summary>
public string ReasonString { get; set; }

/// <summary>
/// Gets or sets the user properties.
/// In MQTT 5, user properties are basic UTF-8 string key-value pairs that you can append to almost every type of MQTT packet.
/// As long as you don’t exceed the maximum message size, you can use an unlimited number of user properties to add metadata to MQTT messages and pass information between publisher, broker, and subscriber.
/// The feature is very similar to the HTTP header concept.
/// Hint: MQTT 5 feature only.
/// Gets or sets the user properties.
/// In MQTT 5, user properties are basic UTF-8 string key-value pairs that you can append to almost every type of MQTT
/// packet.
/// As long as you don’t exceed the maximum message size, you can use an unlimited number of user properties to add
/// metadata to MQTT messages and pass information between publisher, broker, and subscriber.
/// The feature is very similar to the HTTP header concept.
/// Hint: MQTT 5 feature only.
/// </summary>
public IReadOnlyCollection<MqttUserProperty> UserProperties { get; internal set; }
}
}
}
38 changes: 19 additions & 19 deletions Source/MQTTnet/Formatter/MqttPubAckPacketFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@ namespace MQTTnet.Formatter
{
public sealed class MqttPubAckPacketFactory
{
public MqttPubAckPacket Create(MqttPublishPacket publishPacket, InterceptingPublishEventArgs interceptingPublishEventArgs)
public MqttPubAckPacket Create(
MqttPublishPacket publishPacket,
InterceptingPublishEventArgs interceptingPublishEventArgs,
DispatchApplicationMessageResult dispatchApplicationMessageResult)
{
if (publishPacket == null)
{
Expand All @@ -21,10 +24,19 @@ public MqttPubAckPacket Create(MqttPublishPacket publishPacket, InterceptingPubl

var pubAckPacket = new MqttPubAckPacket
{
PacketIdentifier = publishPacket.PacketIdentifier,
ReasonCode = MqttPubAckReasonCode.Success
PacketIdentifier = publishPacket.PacketIdentifier
};

if (dispatchApplicationMessageResult.MatchingSubscribersCount == 0)
{
// NoMatchingSubscribers is ONLY sent by the server!
pubAckPacket.ReasonCode = MqttPubAckReasonCode.NoMatchingSubscribers;
}
else
{
pubAckPacket.ReasonCode = MqttPubAckReasonCode.Success;
}

if (interceptingPublishEventArgs != null)
{
pubAckPacket.ReasonCode = (MqttPubAckReasonCode)(int)interceptingPublishEventArgs.Response.ReasonCode;
Expand All @@ -42,24 +54,12 @@ public MqttPubAckPacket Create(MqttApplicationMessageReceivedEventArgs applicati
throw new ArgumentNullException(nameof(applicationMessageReceivedEventArgs));
}

var pubAckPacket = Create(applicationMessageReceivedEventArgs.PublishPacket, applicationMessageReceivedEventArgs.ReasonCode);
pubAckPacket.UserProperties = applicationMessageReceivedEventArgs.ResponseUserProperties;
pubAckPacket.ReasonString = applicationMessageReceivedEventArgs.ResponseReasonString;

return pubAckPacket;
}

static MqttPubAckPacket Create(MqttPublishPacket publishPacket, MqttApplicationMessageReceivedReasonCode applicationMessageReceivedReasonCode)
{
if (publishPacket == null)
{
throw new ArgumentNullException(nameof(publishPacket));
}

var pubAckPacket = new MqttPubAckPacket
{
PacketIdentifier = publishPacket.PacketIdentifier,
ReasonCode = (MqttPubAckReasonCode)(int)applicationMessageReceivedReasonCode
PacketIdentifier = applicationMessageReceivedEventArgs.PublishPacket.PacketIdentifier,
ReasonCode = (MqttPubAckReasonCode)(int)applicationMessageReceivedEventArgs.ReasonCode,
UserProperties = applicationMessageReceivedEventArgs.ResponseUserProperties,
ReasonString = applicationMessageReceivedEventArgs.ResponseReasonString
};

return pubAckPacket;
Expand Down
10 changes: 9 additions & 1 deletion Source/MQTTnet/Formatter/MqttPubRecPacketFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,10 @@ public MqttPubRecPacket Create(MqttApplicationMessageReceivedEventArgs applicati
return pubRecPacket;
}

public MqttPacket Create(MqttPublishPacket publishPacket, InterceptingPublishEventArgs interceptingPublishEventArgs)
public MqttPacket Create(
MqttPublishPacket publishPacket,
InterceptingPublishEventArgs interceptingPublishEventArgs,
DispatchApplicationMessageResult dispatchApplicationMessageResult)
{
if (publishPacket == null)
{
Expand All @@ -38,6 +41,11 @@ public MqttPacket Create(MqttPublishPacket publishPacket, InterceptingPublishEve
ReasonCode = MqttPubRecReasonCode.Success
};

if (dispatchApplicationMessageResult.MatchingSubscribersCount == 0)
{
pubRecPacket.ReasonCode = MqttPubRecReasonCode.NoMatchingSubscribers;
}

if (interceptingPublishEventArgs != null)
{
pubRecPacket.ReasonCode = (MqttPubRecReasonCode)(int)interceptingPublishEventArgs.Response.ReasonCode;
Expand Down
5 changes: 5 additions & 0 deletions Source/MQTTnet/Protocol/MqttPubAckReasonCode.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,12 @@ namespace MQTTnet.Protocol
public enum MqttPubAckReasonCode
{
Success = 0,

/// <summary>
/// The message is accepted but there are no subscribers. This is sent only by the Server. If the Server knows that there are no matching subscribers, it MAY use this Reason Code instead of 0x00 (Success).
/// </summary>
NoMatchingSubscribers = 16,

UnspecifiedError = 128,
ImplementationSpecificError = 131,
NotAuthorized = 135,
Expand Down
16 changes: 16 additions & 0 deletions Source/MQTTnet/Server/Internal/DispatchApplicationMessageResult.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

namespace MQTTnet.Server
{
public sealed class DispatchApplicationMessageResult
{
public DispatchApplicationMessageResult(int matchingSubscribersCount)
{
MatchingSubscribersCount = matchingSubscribersCount;
}

public int MatchingSubscribersCount { get; }
}
}
7 changes: 4 additions & 3 deletions Source/MQTTnet/Server/Internal/MqttClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -237,9 +237,10 @@ async Task HandleIncomingPublishPacket(MqttPublishPacket publishPacket, Cancella
return;
}

DispatchApplicationMessageResult dispatchResult = null;
if (processPublish && applicationMessage != null)
{
await _sessionsManager.DispatchApplicationMessage(Id, applicationMessage).ConfigureAwait(false);
dispatchResult = await _sessionsManager.DispatchApplicationMessage(Id, applicationMessage).ConfigureAwait(false);
}

switch (publishPacket.QualityOfServiceLevel)
Expand All @@ -251,13 +252,13 @@ async Task HandleIncomingPublishPacket(MqttPublishPacket publishPacket, Cancella
}
case MqttQualityOfServiceLevel.AtLeastOnce:
{
var pubAckPacket = _packetFactories.PubAck.Create(publishPacket, interceptingPublishEventArgs);
var pubAckPacket = _packetFactories.PubAck.Create(publishPacket, interceptingPublishEventArgs, dispatchResult);
Session.EnqueueControlPacket(new MqttPacketBusItem(pubAckPacket));
break;
}
case MqttQualityOfServiceLevel.ExactlyOnce:
{
var pubRecPacket = _packetFactories.PubRec.Create(publishPacket, interceptingPublishEventArgs);
var pubRecPacket = _packetFactories.PubRec.Create(publishPacket, interceptingPublishEventArgs, dispatchResult);
Session.EnqueueControlPacket(new MqttPacketBusItem(pubRecPacket));
break;
}
Expand Down
25 changes: 12 additions & 13 deletions Source/MQTTnet/Server/Internal/MqttClientSessionsManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -127,21 +127,20 @@ public async Task DeleteSessionAsync(string clientId)

_logger.Verbose("Session for client '{0}' deleted.", clientId);
}

public async Task DispatchApplicationMessage(string senderId, MqttApplicationMessage applicationMessage)
public async Task<DispatchApplicationMessageResult> DispatchApplicationMessage(string senderId, MqttApplicationMessage applicationMessage)
{
var matchingSubscribersCount = 0;
try
{
if (applicationMessage.Retain)
{
await _retainedMessagesManager.UpdateMessage(senderId, applicationMessage).ConfigureAwait(false);
}

var deliveryCount = 0;

List<MqttSession> subscriberSessions;
lock (_sessionsManagementLock)
{
// only subscriber clients are of interest here.
subscriberSessions = _subscriberSessions.ToList();
}

Expand Down Expand Up @@ -187,17 +186,22 @@ public async Task DispatchApplicationMessage(string senderId, MqttApplicationMes
}

session.EnqueueDataPacket(new MqttPacketBusItem(newPublishPacket));
deliveryCount++;
matchingSubscribersCount++;

_logger.Verbose("Client '{0}': Queued PUBLISH packet with topic '{1}'.", session.Id, applicationMessage.Topic);
}

await FireApplicationMessageNotConsumedEvent(applicationMessage, deliveryCount, senderId);
if (matchingSubscribersCount == 0)
{
await FireApplicationMessageNotConsumedEvent(applicationMessage, senderId).ConfigureAwait(false);
}
}
catch (Exception exception)
{
_logger.Error(exception, "Unhandled exception while processing next queued application message.");
}

return new DispatchApplicationMessageResult(matchingSubscribersCount);
}

public void Dispose()
Expand Down Expand Up @@ -575,13 +579,8 @@ MqttSession CreateSession(string clientId, IDictionary sessionItems, bool isPers
return new MqttSession(clientId, isPersistent, sessionItems, _options, _eventContainer, _retainedMessagesManager, this);
}

async Task FireApplicationMessageNotConsumedEvent(MqttApplicationMessage applicationMessage, int deliveryCount, string senderId)
async Task FireApplicationMessageNotConsumedEvent(MqttApplicationMessage applicationMessage, string senderId)
{
if (deliveryCount > 0)
{
return;
}

if (!_eventContainer.ApplicationMessageNotConsumedEvent.HasHandlers)
{
return;
Expand Down