diff --git a/.github/workflows/ReleaseNotes.md b/.github/workflows/ReleaseNotes.md index fe734924a..0fdf73c0f 100644 --- a/.github/workflows/ReleaseNotes.md +++ b/.github/workflows/ReleaseNotes.md @@ -1,7 +1,7 @@ * [Client] Added support for custom CA chain validation (#1851, thanks to @rido-min). -* [Client] Added support for custom CA chain validation (#1851, thanks to @rido-min). * [Client] Fixed handling of unobserved tasks exceptions (#1871). * [Client] Fixed not specified ReasonCode when using _SendExtendedAuthenticationExchangeDataAsync_ (#1882, thanks to @rido-min). * [Server] Fixed not working _UpdateRetainedMessageAsync_ public api (#1858, thanks to @kimdiego2098). * [Server] Added support for custom DISCONNECT packets when stopping the server or disconnect a client (BREAKING CHANGE!, #1846). * [Server] Added new property to stop the server from accepting new connections even if it is running (#1846). +* [Server] Added new events for delivered and dropped messages (#1866, thanks to @kallayj). \ No newline at end of file diff --git a/Source/MQTTnet/Internal/MqttPacketBus.cs b/Source/MQTTnet/Internal/MqttPacketBus.cs index 002e3d58b..b8c5b363c 100644 --- a/Source/MQTTnet/Internal/MqttPacketBus.cs +++ b/Source/MQTTnet/Internal/MqttPacketBus.cs @@ -104,17 +104,22 @@ public void Dispose() _signal.Dispose(); } - public void DropFirstItem(MqttPacketBusPartition partition) + public MqttPacketBusItem DropFirstItem(MqttPacketBusPartition partition) { lock (_syncRoot) { var partitionInstance = _partitions[(int)partition]; - if (partitionInstance.Any()) + if (partitionInstance.Count > 0) { + var firstItem = partitionInstance.First.Value; partitionInstance.RemoveFirst(); + + return firstItem; } } + + return null; } public void EnqueueItem(MqttPacketBusItem item, MqttPacketBusPartition partition) diff --git a/Source/MQTTnet/Server/Events/ApplicationMessageEnqueuedEventArgs.cs b/Source/MQTTnet/Server/Events/ApplicationMessageEnqueuedEventArgs.cs new file mode 100644 index 000000000..518c0a58e --- /dev/null +++ b/Source/MQTTnet/Server/Events/ApplicationMessageEnqueuedEventArgs.cs @@ -0,0 +1,27 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. +// See the LICENSE file in the project root for more information. + +using System; + +namespace MQTTnet.Server +{ + public sealed class ApplicationMessageEnqueuedEventArgs : EventArgs + { + public ApplicationMessageEnqueuedEventArgs(string senderClientId, string receiverClientId, MqttApplicationMessage applicationMessage, bool isDropped) + { + SenderClientId = senderClientId ?? throw new ArgumentNullException( nameof(senderClientId)); + ReceiverClientId = receiverClientId ?? throw new ArgumentNullException(nameof(receiverClientId)); + ApplicationMessage = applicationMessage ?? throw new ArgumentNullException(nameof(applicationMessage)); + IsDropped = isDropped; + } + + public string SenderClientId { get; } + + public string ReceiverClientId { get; } + + public bool IsDropped { get; } + + public MqttApplicationMessage ApplicationMessage { get; } + } +} \ No newline at end of file diff --git a/Source/MQTTnet/Server/Events/QueueMessageOverwrittenEventArgs.cs b/Source/MQTTnet/Server/Events/QueueMessageOverwrittenEventArgs.cs new file mode 100644 index 000000000..161e50548 --- /dev/null +++ b/Source/MQTTnet/Server/Events/QueueMessageOverwrittenEventArgs.cs @@ -0,0 +1,22 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. +// See the LICENSE file in the project root for more information. + +using System; +using MQTTnet.Packets; + +namespace MQTTnet.Server +{ + public sealed class QueueMessageOverwrittenEventArgs : EventArgs + { + public QueueMessageOverwrittenEventArgs(string receiverClientId, MqttPacket packet) + { + ReceiverClientId = receiverClientId ?? throw new ArgumentNullException(nameof(receiverClientId)); + Packet = packet ?? throw new ArgumentNullException(nameof(packet)); + } + + public MqttPacket Packet { get; } + + public string ReceiverClientId { get; } + } +} \ No newline at end of file diff --git a/Source/MQTTnet/Server/Internal/EnqueueDataPacketResult.cs b/Source/MQTTnet/Server/Internal/EnqueueDataPacketResult.cs new file mode 100644 index 000000000..742cb7ad6 --- /dev/null +++ b/Source/MQTTnet/Server/Internal/EnqueueDataPacketResult.cs @@ -0,0 +1,12 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. +// See the LICENSE file in the project root for more information. + +namespace MQTTnet.Server +{ + public enum EnqueueDataPacketResult + { + Enqueued, + Dropped + } +} \ No newline at end of file diff --git a/Source/MQTTnet/Server/Internal/MqttClientSessionsManager.cs b/Source/MQTTnet/Server/Internal/MqttClientSessionsManager.cs index 19fc18a69..4f6864b88 100644 --- a/Source/MQTTnet/Server/Internal/MqttClientSessionsManager.cs +++ b/Source/MQTTnet/Server/Internal/MqttClientSessionsManager.cs @@ -244,7 +244,14 @@ public async Task DispatchApplicationMessage( matchingSubscribersCount++; - session.EnqueueDataPacket(new MqttPacketBusItem(publishPacketCopy)); + var result = session.EnqueueDataPacket(new MqttPacketBusItem(publishPacketCopy)); + + if (_eventContainer.ApplicationMessageEnqueuedOrDroppedEvent.HasHandlers) + { + var eventArgs = new ApplicationMessageEnqueuedEventArgs(senderId, session.Id, applicationMessage, result == EnqueueDataPacketResult.Dropped); + await _eventContainer.ApplicationMessageEnqueuedOrDroppedEvent.InvokeAsync(eventArgs).ConfigureAwait(false); + } + _logger.Verbose("Client '{0}': Queued PUBLISH packet with topic '{1}'", session.Id, applicationMessage.Topic); } diff --git a/Source/MQTTnet/Server/Internal/MqttServerEventContainer.cs b/Source/MQTTnet/Server/Internal/MqttServerEventContainer.cs index 5f06b6d92..1ffbf88db 100644 --- a/Source/MQTTnet/Server/Internal/MqttServerEventContainer.cs +++ b/Source/MQTTnet/Server/Internal/MqttServerEventContainer.cs @@ -22,6 +22,10 @@ public sealed class MqttServerEventContainer public AsyncEvent ClientUnsubscribedTopicEvent { get; } = new AsyncEvent(); public AsyncEvent InterceptingClientEnqueueEvent { get; } = new AsyncEvent(); + + public AsyncEvent ApplicationMessageEnqueuedOrDroppedEvent { get; } = new AsyncEvent(); + + public AsyncEvent QueuedApplicationMessageOverwrittenEvent { get; } = new AsyncEvent(); public AsyncEvent InterceptingInboundPacketEvent { get; } = new AsyncEvent(); diff --git a/Source/MQTTnet/Server/Internal/MqttSession.cs b/Source/MQTTnet/Server/Internal/MqttSession.cs index e16b65349..40f1b4f00 100644 --- a/Source/MQTTnet/Server/Internal/MqttSession.cs +++ b/Source/MQTTnet/Server/Internal/MqttSession.cs @@ -18,6 +18,7 @@ namespace MQTTnet.Server public sealed class MqttSession : IDisposable { readonly MqttClientSessionsManager _clientSessionsManager; + readonly MqttServerEventContainer _eventContainer; readonly MqttPacketBus _packetBus = new MqttPacketBus(); readonly MqttPacketIdentifierProvider _packetIdentifierProvider = new MqttPacketIdentifierProvider(); @@ -44,6 +45,7 @@ public MqttSession( _connectPacket = connectPacket ?? throw new ArgumentNullException(nameof(connectPacket)); _serverOptions = serverOptions ?? throw new ArgumentNullException(nameof(serverOptions)); _clientSessionsManager = clientSessionsManager ?? throw new ArgumentNullException(nameof(clientSessionsManager)); + _eventContainer = eventContainer ?? throw new ArgumentNullException(nameof(eventContainer)); _subscriptionsManager = new MqttClientSubscriptionsManager(this, eventContainer, retainedMessagesManager, clientSessionsManager); } @@ -117,20 +119,25 @@ public void EnqueueControlPacket(MqttPacketBusItem packetBusItem) _packetBus.EnqueueItem(packetBusItem, MqttPacketBusPartition.Control); } - public void EnqueueDataPacket(MqttPacketBusItem packetBusItem) + public EnqueueDataPacketResult EnqueueDataPacket(MqttPacketBusItem packetBusItem) { if (_packetBus.ItemsCount(MqttPacketBusPartition.Data) >= _serverOptions.MaxPendingMessagesPerClient) { if (_serverOptions.PendingMessagesOverflowStrategy == MqttPendingMessagesOverflowStrategy.DropNewMessage) { - return; + return EnqueueDataPacketResult.Dropped; } if (_serverOptions.PendingMessagesOverflowStrategy == MqttPendingMessagesOverflowStrategy.DropOldestQueuedMessage) { // Only drop from the data partition. Dropping from control partition might break the connection // because the client does not receive PINGREQ packets etc. any longer. - _packetBus.DropFirstItem(MqttPacketBusPartition.Data); + var firstItem = _packetBus.DropFirstItem(MqttPacketBusPartition.Data); + if (firstItem != null && _eventContainer.QueuedApplicationMessageOverwrittenEvent.HasHandlers) + { + var eventArgs = new QueueMessageOverwrittenEventArgs(Id, firstItem.Packet); + _eventContainer.QueuedApplicationMessageOverwrittenEvent.InvokeAsync(eventArgs).ConfigureAwait(false); + } } } @@ -147,6 +154,7 @@ public void EnqueueDataPacket(MqttPacketBusItem packetBusItem) } _packetBus.EnqueueItem(packetBusItem, MqttPacketBusPartition.Data); + return EnqueueDataPacketResult.Enqueued; } public void EnqueueHealthPacket(MqttPacketBusItem packetBusItem) diff --git a/Source/MQTTnet/Server/MqttServer.cs b/Source/MQTTnet/Server/MqttServer.cs index 016b167d8..19ba2244e 100644 --- a/Source/MQTTnet/Server/MqttServer.cs +++ b/Source/MQTTnet/Server/MqttServer.cs @@ -51,6 +51,12 @@ public MqttServer(MqttServerOptions options, IEnumerable ada _keepAliveMonitor = new MqttServerKeepAliveMonitor(options, _clientSessionsManager, _rootLogger); } + public event Func ApplicationMessageEnqueuedOrDroppedAsync + { + add => _eventContainer.ApplicationMessageEnqueuedOrDroppedEvent.AddHandler(value); + remove => _eventContainer.ApplicationMessageEnqueuedOrDroppedEvent.RemoveHandler(value); + } + public event Func ApplicationMessageNotConsumedAsync { add => _eventContainer.ApplicationMessageNotConsumedEvent.AddHandler(value); @@ -135,6 +141,12 @@ public event Func PreparingSessionAsync remove => _eventContainer.PreparingSessionEvent.RemoveHandler(value); } + public event Func QueuedApplicationMessageOverwrittenAsync + { + add => _eventContainer.QueuedApplicationMessageOverwrittenEvent.AddHandler(value); + remove => _eventContainer.QueuedApplicationMessageOverwrittenEvent.RemoveHandler(value); + } + public event Func RetainedMessageChangedAsync { add => _eventContainer.RetainedMessageChangedEvent.AddHandler(value);