Skip to content

Commit

Permalink
Updated with metrics supported by new events (from PR dotnet#1866).
Browse files Browse the repository at this point in the history
  • Loading branch information
kallayj committed Dec 6, 2023
1 parent 39d6130 commit e6f8872
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 2 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFrameworks>netstandard2.0;netstandard2.1;netcoreapp3.1;net5.0;net6.0;net7.0</TargetFrameworks>
<TargetFrameworks>netstandard2.0;netstandard2.1;net6.0;net7.0</TargetFrameworks>
<TargetFrameworks Condition=" '$(OS)' == 'Windows_NT' ">$(TargetFrameworks);net452;net461;net48</TargetFrameworks>
<TargetFrameworks Condition=" '$(OS)' == 'Windows_NT' AND '$(MSBuildRuntimeType)' != 'Core' ">$(TargetFrameworks);uap10.0</TargetFrameworks>
</PropertyGroup>
Expand Down
29 changes: 29 additions & 0 deletions Source/MQTTnet.Extensions.Metrics/MqttMetricsExtensions.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System;
using System.Diagnostics.Metrics;
using System.Threading.Tasks;
using MQTTnet.Server;
using MQTTnet.Diagnostics.Instrumentation;

Expand All @@ -24,6 +25,34 @@ public static void AddMetrics(this IMqttServerMetricSource server, Meter meter)
return Task.CompletedTask;
};

var deliveredMessageCounter = meter.CreateCounter<long>(
name: "mqtt.messages.delivered.count",
unit: "total",
description: "Cumulative total number of MQTTnet messages delivered to subscribers");

var droppedQueueFullMessageCounter = meter.CreateCounter<long>(
name: "mqtt.messages.dropped.queuefull.count",
unit: "total",
description: "Cumulative total number of MQTTnet messages dropped because the queue is full");

server.ApplicationMessageEnqueuedOrDroppedAsync += (ApplicationMessageEnqueuedEventArgs args) =>
{
if (args.IsDropped)
{
droppedQueueFullMessageCounter.Add(1);
}
else
{
deliveredMessageCounter.Add(1);
}
return Task.CompletedTask;
};
server.QueuedApplicationMessageOverwrittenAsync += (QueueMessageOverwrittenEventArgs args) =>
{
droppedQueueFullMessageCounter.Add(1);
return Task.CompletedTask;
};

var successfulConnectionsCounter = meter.CreateCounter<long>(
name: "mqtt.clients.connected.count",
unit: "total",
Expand Down
44 changes: 44 additions & 0 deletions Source/MQTTnet.Tests/Extensions/Metrics_Tests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ public async Task All_Metrics()
int clientCount = 0;
long connectedCount = 0;
long publishedCount = 0;
long deliveredCount = 0;
long droppedCount = 0;

using (var testEnvironment = CreateTestEnvironment())
using (Meter meter = new Meter("TEST"))
Expand All @@ -38,6 +40,8 @@ public async Task All_Metrics()
case "mqtt.clients.count":
case "mqtt.clients.connected.count":
case "mqtt.messages.published.count":
case "mqtt.messages.delivered.count":
case "mqtt.messages.dropped.queuefull.count":
listener.EnableMeasurementEvents(instrument);
break;
}
Expand Down Expand Up @@ -69,6 +73,12 @@ public async Task All_Metrics()
case "mqtt.messages.published.count":
publishedCount += measurement;
break;
case "mqtt.messages.delivered.count":
deliveredCount += measurement;
break;
case "mqtt.messages.dropped.queuefull.count":
droppedCount += measurement;
break;
}
};
meterListener.SetMeasurementEventCallback(longCallback);
Expand All @@ -83,13 +93,24 @@ public async Task All_Metrics()

await metricsSource.TriggerPublish();
Assert.AreEqual(1, publishedCount);

await metricsSource.TriggerEnqueueOrDrop(dropQueueFull: false);
Assert.AreEqual(1, deliveredCount);

await metricsSource.TriggerEnqueueOrDrop(dropQueueFull: true);
Assert.AreEqual(1, droppedCount);

await metricsSource.TriggerOverwriteMessage();
Assert.AreEqual(2, droppedCount);
}
}

private class TestMetricsSource : IMqttServerMetricSource
{
public event Func<ClientConnectedEventArgs, Task> ClientConnectedAsync;
public event Func<InterceptingPublishEventArgs, Task> InterceptingPublishAsync;
public event Func<ApplicationMessageEnqueuedEventArgs, Task> ApplicationMessageEnqueuedOrDroppedAsync;
public event Func<QueueMessageOverwrittenEventArgs, Task> QueuedApplicationMessageOverwrittenAsync;

public int GetActiveClientCount()
{
Expand Down Expand Up @@ -120,6 +141,29 @@ public Task TriggerPublish()
new Dictionary<string, object>());
return InterceptingPublishAsync(args);
}

public Task TriggerEnqueueOrDrop(bool dropQueueFull)
{
var args = new ApplicationMessageEnqueuedEventArgs(
"sender",
"receiver",
new MqttApplicationMessage(),
dropQueueFull);
return ApplicationMessageEnqueuedOrDroppedAsync(args);
}

public Task TriggerOverwriteMessage()
{
var args = new QueueMessageOverwrittenEventArgs(
"receiver",
new DummyPacket());
return QueuedApplicationMessageOverwrittenAsync(args);
}

private class DummyPacket : MqttPacket
{
public DummyPacket() {}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,7 @@ public interface IMqttServerMetricSource
int GetActiveClientCount();
event Func<ClientConnectedEventArgs, Task> ClientConnectedAsync;
event Func<InterceptingPublishEventArgs, Task> InterceptingPublishAsync;
event Func<ApplicationMessageEnqueuedEventArgs, Task> ApplicationMessageEnqueuedOrDroppedAsync;
event Func<QueueMessageOverwrittenEventArgs, Task> QueuedApplicationMessageOverwrittenAsync;
}
}
}

0 comments on commit e6f8872

Please sign in to comment.