Skip to content

Commit

Permalink
Adds events for delivered and dropped messages.
Browse files Browse the repository at this point in the history
  • Loading branch information
kallayj committed Oct 26, 2023
1 parent 6fdee9e commit 989d6b4
Show file tree
Hide file tree
Showing 6 changed files with 76 additions and 3 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

using System;

namespace MQTTnet.Server
{
public sealed class ApplicationMessageEnqueuedEventArgs : EventArgs
{
public ApplicationMessageEnqueuedEventArgs(string senderClientId, string receiverClientId, MqttApplicationMessage applicationMessage, bool dropped)
{
SenderClientId = senderClientId ?? throw new ArgumentNullException( nameof(senderClientId));
ReceiverClientId = receiverClientId ?? throw new ArgumentNullException(nameof(receiverClientId));
DroppedQueueFull = dropped;
ApplicationMessage = applicationMessage ?? throw new ArgumentNullException(nameof(applicationMessage));
}
public string SenderClientId { get; }

public string ReceiverClientId { get; }

public bool DroppedQueueFull { get; }


public MqttApplicationMessage ApplicationMessage { get; }
}
}
18 changes: 18 additions & 0 deletions Source/MQTTnet/Server/Events/QueueMessageOverwrittenEventArgs.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

using System;

namespace MQTTnet.Server
{
public sealed class QueueMessageOverwrittenEventArgs : EventArgs
{
public QueueMessageOverwrittenEventArgs(string receiverClientId)
{
ReceiverClientId = receiverClientId ?? throw new ArgumentNullException(nameof(receiverClientId));
}

public string ReceiverClientId { get; }
}
}
7 changes: 6 additions & 1 deletion Source/MQTTnet/Server/Internal/MqttClientSessionsManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,12 @@ public async Task<DispatchApplicationMessageResult> DispatchApplicationMessage(

matchingSubscribersCount++;

session.EnqueueDataPacket(new MqttPacketBusItem(publishPacketCopy));
var result = session.EnqueueDataPacket(new MqttPacketBusItem(publishPacketCopy));
if (_eventContainer.ClientMessageEnqueuedOrDroppedEvent.HasHandlers)
{
var eventArgs = new ApplicationMessageEnqueuedEventArgs(senderId, session.Id, applicationMessage, result);
await _eventContainer.ClientMessageEnqueuedOrDroppedEvent.InvokeAsync(eventArgs).ConfigureAwait(false);
}
_logger.Verbose("Client '{0}': Queued PUBLISH packet with topic '{1}'", session.Id, applicationMessage.Topic);
}

Expand Down
3 changes: 3 additions & 0 deletions Source/MQTTnet/Server/Internal/MqttServerEventContainer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ public sealed class MqttServerEventContainer
public AsyncEvent<ClientUnsubscribedTopicEventArgs> ClientUnsubscribedTopicEvent { get; } = new AsyncEvent<ClientUnsubscribedTopicEventArgs>();

public AsyncEvent<InterceptingClientApplicationMessageEnqueueEventArgs> InterceptingClientEnqueueEvent { get; } = new AsyncEvent<InterceptingClientApplicationMessageEnqueueEventArgs>();
public AsyncEvent<ApplicationMessageEnqueuedEventArgs> ClientMessageEnqueuedOrDroppedEvent { get; } = new AsyncEvent<ApplicationMessageEnqueuedEventArgs>();

public AsyncEvent<QueueMessageOverwrittenEventArgs> QueueMessageOverwrittenEvent { get; } = new AsyncEvent<QueueMessageOverwrittenEventArgs>();

public AsyncEvent<InterceptingPacketEventArgs> InterceptingInboundPacketEvent { get; } = new AsyncEvent<InterceptingPacketEventArgs>();

Expand Down
12 changes: 10 additions & 2 deletions Source/MQTTnet/Server/Internal/MqttSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ namespace MQTTnet.Server
public sealed class MqttSession : IDisposable
{
readonly MqttClientSessionsManager _clientSessionsManager;
readonly MqttServerEventContainer _eventContainer;
readonly MqttPacketBus _packetBus = new MqttPacketBus();
readonly MqttPacketIdentifierProvider _packetIdentifierProvider = new MqttPacketIdentifierProvider();

Expand Down Expand Up @@ -45,6 +46,7 @@ public MqttSession(
_serverOptions = serverOptions ?? throw new ArgumentNullException(nameof(serverOptions));
_clientSessionsManager = clientSessionsManager ?? throw new ArgumentNullException(nameof(clientSessionsManager));

_eventContainer = eventContainer;
_subscriptionsManager = new MqttClientSubscriptionsManager(this, eventContainer, retainedMessagesManager, clientSessionsManager);
}

Expand Down Expand Up @@ -117,20 +119,25 @@ public void EnqueueControlPacket(MqttPacketBusItem packetBusItem)
_packetBus.EnqueueItem(packetBusItem, MqttPacketBusPartition.Control);
}

public void EnqueueDataPacket(MqttPacketBusItem packetBusItem)
public bool EnqueueDataPacket(MqttPacketBusItem packetBusItem)
{
if (_packetBus.ItemsCount(MqttPacketBusPartition.Data) >= _serverOptions.MaxPendingMessagesPerClient)
{
if (_serverOptions.PendingMessagesOverflowStrategy == MqttPendingMessagesOverflowStrategy.DropNewMessage)
{
return;
return false;
}

if (_serverOptions.PendingMessagesOverflowStrategy == MqttPendingMessagesOverflowStrategy.DropOldestQueuedMessage)
{
// Only drop from the data partition. Dropping from control partition might break the connection
// because the client does not receive PINGREQ packets etc. any longer.
_packetBus.DropFirstItem(MqttPacketBusPartition.Data);
if (_eventContainer.QueueMessageOverwrittenEvent.HasHandlers)
{
var eventArgs = new QueueMessageOverwrittenEventArgs(this.Id);
_eventContainer.QueueMessageOverwrittenEvent.InvokeAsync(eventArgs).ConfigureAwait(false);
}
}
}

Expand All @@ -147,6 +154,7 @@ public void EnqueueDataPacket(MqttPacketBusItem packetBusItem)
}

_packetBus.EnqueueItem(packetBusItem, MqttPacketBusPartition.Data);
return true;
}

public void EnqueueHealthPacket(MqttPacketBusItem packetBusItem)
Expand Down
12 changes: 12 additions & 0 deletions Source/MQTTnet/Server/MqttServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,18 @@ public event Func<InterceptingClientApplicationMessageEnqueueEventArgs, Task> In
remove => _eventContainer.InterceptingClientEnqueueEvent.RemoveHandler(value);
}

public event Func<ApplicationMessageEnqueuedEventArgs, Task> ClientMessageEnqueuedOrDroppedAsync
{
add => _eventContainer.ClientMessageEnqueuedOrDroppedEvent.AddHandler(value);
remove => _eventContainer.ClientMessageEnqueuedOrDroppedEvent.RemoveHandler(value);
}

public event Func<QueueMessageOverwrittenEventArgs, Task> QueueMessageOverwrittenEventAsync
{
add => _eventContainer.QueueMessageOverwrittenEvent.AddHandler(value);
remove => _eventContainer.QueueMessageOverwrittenEvent.RemoveHandler(value);
}

public event Func<InterceptingPacketEventArgs, Task> InterceptingInboundPacketAsync
{
add => _eventContainer.InterceptingInboundPacketEvent.AddHandler(value);
Expand Down

0 comments on commit 989d6b4

Please sign in to comment.