Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

No longer fire _ClientAcknowledgedPublishPacketEvent_ for PUBREC packets (#1550) #1551

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/ReleaseNotes.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
* [Server] Fixed duplicated invocation of the event _ClientAcknowledgedPublishPacketAsync_ for QoS level 2 (#1550)
1 change: 1 addition & 0 deletions MQTTnet.sln.DotSettings
Original file line number Diff line number Diff line change
Expand Up @@ -242,5 +242,6 @@ See the LICENSE file in the project root for more information.</s:String>
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002ECSharp_002ECodeStyle_002ESettingsUpgrade_002EMigrateThisQualifierSettings/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/UserDictionary/Words/=PINGREQ/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/UserDictionary/Words/=PINGRESP/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/UserDictionary/Words/=PUBCOMP/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/UserDictionary/Words/=Tnet/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/UserDictionary/Words/=unsub/@EntryIndexedValue">True</s:Boolean></wpf:ResourceDictionary>
13 changes: 2 additions & 11 deletions Source/MQTTnet.Tests/Server/QoS_Tests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -99,25 +99,16 @@ public async Task Fire_Event_On_Client_Acknowledges_QoS_2()

await LongTestDelay();

Assert.AreEqual(2, eventArgs.Count);
Assert.AreEqual(1, eventArgs.Count);

var firstEvent = eventArgs[0];

Assert.IsNotNull(firstEvent);
Assert.IsNotNull(firstEvent.PublishPacket);
Assert.IsNotNull(firstEvent.AcknowledgePacket);
Assert.IsFalse(firstEvent.IsCompleted);
Assert.IsTrue(firstEvent.IsCompleted);

Assert.AreEqual("A", firstEvent.PublishPacket.Topic);

var secondEvent = eventArgs[1];

Assert.IsNotNull(secondEvent);
Assert.IsNotNull(secondEvent.PublishPacket);
Assert.IsNotNull(secondEvent.AcknowledgePacket);
Assert.IsTrue(secondEvent.IsCompleted);

Assert.AreEqual("A", secondEvent.PublishPacket.Topic);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,23 @@ namespace MQTTnet.Server
{
public sealed class ClientAcknowledgedPublishPacketEventArgs : EventArgs
{
public ClientAcknowledgedPublishPacketEventArgs(string clientId, IDictionary sessionItems, MqttPublishPacket publishPacket, MqttPacketWithIdentifier acknowledgePacket)
{
ClientId = clientId ?? throw new ArgumentNullException(nameof(clientId));
SessionItems = sessionItems ?? throw new ArgumentNullException(nameof(sessionItems));
PublishPacket = publishPacket ?? throw new ArgumentNullException(nameof(publishPacket));
AcknowledgePacket = acknowledgePacket ?? throw new ArgumentNullException(nameof(acknowledgePacket));
}

/// <summary>
/// Gets the packet which was used for acknowledge. This can be a PubAck or PubComp packet.
/// </summary>
public MqttPacketWithIdentifier AcknowledgePacket { get; internal set; }
public MqttPacketWithIdentifier AcknowledgePacket { get; }

/// <summary>
/// Gets the ID of the client which acknowledged a PUBLISH packet.
/// </summary>
public string ClientId { get; internal set; }
public string ClientId { get; }

/// <summary>
/// Gets whether the PUBLISH packet is fully acknowledged. This is the case for PUBACK (QoS 1) and PUBCOMP (QoS 2.
Expand All @@ -28,11 +36,11 @@ public sealed class ClientAcknowledgedPublishPacketEventArgs : EventArgs
/// <summary>
/// Gets the PUBLISH packet which was acknowledged.
/// </summary>
public MqttPublishPacket PublishPacket { get; internal set; }
public MqttPublishPacket PublishPacket { get; }

/// <summary>
/// Gets the session items which contain custom user data per session.
/// </summary>
public IDictionary SessionItems { get; internal set; }
public IDictionary SessionItems { get; }
}
}
23 changes: 6 additions & 17 deletions Source/MQTTnet/Server/Internal/MqttClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
using MQTTnet.Diagnostics;
using MQTTnet.Exceptions;
using MQTTnet.Formatter;
using MQTTnet.Implementations;
using MQTTnet.Internal;
using MQTTnet.PacketDispatcher;
using MQTTnet.Packets;
Expand Down Expand Up @@ -170,14 +169,7 @@ Task ClientAcknowledgedPublishPacket(MqttPublishPacket publishPacket, MqttPacket
{
if (_eventContainer.ClientAcknowledgedPublishPacketEvent.HasHandlers)
{
var eventArgs = new ClientAcknowledgedPublishPacketEventArgs
{
PublishPacket = publishPacket,
AcknowledgePacket = acknowledgePacket,
ClientId = Id,
SessionItems = Session.Items
};

var eventArgs = new ClientAcknowledgedPublishPacketEventArgs(Id, Session.Items, publishPacket, acknowledgePacket);
return _eventContainer.ClientAcknowledgedPublishPacketEvent.TryInvokeAsync(eventArgs, _logger);
}

Expand Down Expand Up @@ -272,17 +264,14 @@ async Task HandleIncomingPublishPacket(MqttPublishPacket publishPacket, Cancella
}
}

async Task HandleIncomingPubRecPacket(MqttPubRecPacket pubRecPacket)
Task HandleIncomingPubRecPacket(MqttPubRecPacket pubRecPacket)
{
var acknowledgedPublishPacket = Session.PeekAcknowledgePublishPacket(pubRecPacket.PacketIdentifier);

if (acknowledgedPublishPacket != null)
{
await ClientAcknowledgedPublishPacket(acknowledgedPublishPacket, pubRecPacket).ConfigureAwait(false);
}

// Do not fire the event _ClientAcknowledgedPublishPacket_ here because the QoS 2 process is only finished
// properly when the client has sent the PUBCOMP packet.
var pubRelPacket = _packetFactories.PubRel.Create(pubRecPacket, MqttApplicationMessageReceivedReasonCode.Success);
Session.EnqueueControlPacket(new MqttPacketBusItem(pubRelPacket));

return CompletedTask.Instance;
}

void HandleIncomingPubRelPacket(MqttPubRelPacket pubRelPacket)
Expand Down