diff --git a/.github/workflows/ReleaseNotes.md b/.github/workflows/ReleaseNotes.md index d499d16e0..f6deec47a 100644 --- a/.github/workflows/ReleaseNotes.md +++ b/.github/workflows/ReleaseNotes.md @@ -4,6 +4,7 @@ * [Client] Added support for _RemoteCertificateValidationCallback_ for .NET 4.5.2, 4.6.1 and 4.8 (#1806, thanks to @troky). * [Client] Fixed wrong logging of obsolete feature when connection was not successful (#1801, thanks to @ramonsmits). * [Client] Fixed _NullReferenceException_ when performing several actions when not connected (#1800, thanks to @ramonsmits). +* [RpcClient] Added support for passing custom parameters to topic generation context (#1798, thanks to @Temppus). * [Server] Fixed _NullReferenceException_ in retained messages management (#1762, thanks to @logicaloud). * [Server] Exposed new option which allows disabling packet fragmentation (#1753). * [Server] Expired sessions will no longer be used when a client connects (#1756). diff --git a/Source/MQTTnet.Extensions.Rpc/IMqttRpcClient.cs b/Source/MQTTnet.Extensions.Rpc/IMqttRpcClient.cs index 8a2528990..b43459689 100644 --- a/Source/MQTTnet.Extensions.Rpc/IMqttRpcClient.cs +++ b/Source/MQTTnet.Extensions.Rpc/IMqttRpcClient.cs @@ -1,4 +1,5 @@ using System; +using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; using MQTTnet.Protocol; @@ -7,8 +8,8 @@ namespace MQTTnet.Extensions.Rpc { public interface IMqttRpcClient : IDisposable { - Task ExecuteAsync(TimeSpan timeout, string methodName, byte[] payload, MqttQualityOfServiceLevel qualityOfServiceLevel); + Task ExecuteAsync(TimeSpan timeout, string methodName, byte[] payload, MqttQualityOfServiceLevel qualityOfServiceLevel, IDictionary parameters = null); - Task ExecuteAsync(string methodName, byte[] payload, MqttQualityOfServiceLevel qualityOfServiceLevel, CancellationToken cancellationToken = default); + Task ExecuteAsync(string methodName, byte[] payload, MqttQualityOfServiceLevel qualityOfServiceLevel, IDictionary parameters = null, CancellationToken cancellationToken = default); } } \ No newline at end of file diff --git a/Source/MQTTnet.Extensions.Rpc/MqttRpcClient.cs b/Source/MQTTnet.Extensions.Rpc/MqttRpcClient.cs index aab296c99..663e23846 100644 --- a/Source/MQTTnet.Extensions.Rpc/MqttRpcClient.cs +++ b/Source/MQTTnet.Extensions.Rpc/MqttRpcClient.cs @@ -4,6 +4,7 @@ using System; using System.Collections.Concurrent; +using System.Collections.Generic; using System.Linq; using System.Threading; using System.Threading.Tasks; @@ -42,13 +43,13 @@ public void Dispose() _waitingCalls.Clear(); } - public async Task ExecuteAsync(TimeSpan timeout, string methodName, byte[] payload, MqttQualityOfServiceLevel qualityOfServiceLevel) + public async Task ExecuteAsync(TimeSpan timeout, string methodName, byte[] payload, MqttQualityOfServiceLevel qualityOfServiceLevel, IDictionary parameters = null) { using (var timeoutToken = new CancellationTokenSource(timeout)) { try { - return await ExecuteAsync(methodName, payload, qualityOfServiceLevel, timeoutToken.Token).ConfigureAwait(false); + return await ExecuteAsync(methodName, payload, qualityOfServiceLevel, parameters, timeoutToken.Token).ConfigureAwait(false); } catch (OperationCanceledException exception) { @@ -62,14 +63,14 @@ public async Task ExecuteAsync(TimeSpan timeout, string methodName, byte } } - public async Task ExecuteAsync(string methodName, byte[] payload, MqttQualityOfServiceLevel qualityOfServiceLevel, CancellationToken cancellationToken = default) + public async Task ExecuteAsync(string methodName, byte[] payload, MqttQualityOfServiceLevel qualityOfServiceLevel, IDictionary parameters = null, CancellationToken cancellationToken = default) { if (methodName == null) { throw new ArgumentNullException(nameof(methodName)); } - var context = new TopicGenerationContext(_mqttClient, _options, methodName, qualityOfServiceLevel); + var context = new TopicGenerationContext(_mqttClient, _options, methodName, parameters, qualityOfServiceLevel); var topicNames = _options.TopicGenerationStrategy.CreateRpcTopics(context); var requestTopic = topicNames.RequestTopic; diff --git a/Source/MQTTnet.Extensions.Rpc/MqttRpcClientExtensions.cs b/Source/MQTTnet.Extensions.Rpc/MqttRpcClientExtensions.cs index 8fa10468d..a0b0ebfee 100644 --- a/Source/MQTTnet.Extensions.Rpc/MqttRpcClientExtensions.cs +++ b/Source/MQTTnet.Extensions.Rpc/MqttRpcClientExtensions.cs @@ -3,6 +3,7 @@ // See the LICENSE file in the project root for more information. using System; +using System.Collections.Generic; using System.Text; using System.Threading.Tasks; using MQTTnet.Protocol; @@ -11,13 +12,13 @@ namespace MQTTnet.Extensions.Rpc { public static class MqttRpcClientExtensions { - public static Task ExecuteAsync(this IMqttRpcClient client, TimeSpan timeout, string methodName, string payload, MqttQualityOfServiceLevel qualityOfServiceLevel) + public static Task ExecuteAsync(this IMqttRpcClient client, TimeSpan timeout, string methodName, string payload, MqttQualityOfServiceLevel qualityOfServiceLevel, IDictionary parameters = null) { if (client == null) throw new ArgumentNullException(nameof(client)); var buffer = Encoding.UTF8.GetBytes(payload ?? string.Empty); - return client.ExecuteAsync(timeout, methodName, buffer, qualityOfServiceLevel); + return client.ExecuteAsync(timeout, methodName, buffer, qualityOfServiceLevel, parameters); } } } \ No newline at end of file diff --git a/Source/MQTTnet.Extensions.Rpc/Options/TopicGeneration/TopicGenerationContext.cs b/Source/MQTTnet.Extensions.Rpc/Options/TopicGeneration/TopicGenerationContext.cs index 03898bb87..a556825a5 100644 --- a/Source/MQTTnet.Extensions.Rpc/Options/TopicGeneration/TopicGenerationContext.cs +++ b/Source/MQTTnet.Extensions.Rpc/Options/TopicGeneration/TopicGenerationContext.cs @@ -3,6 +3,7 @@ // See the LICENSE file in the project root for more information. using System; +using System.Collections.Generic; using MQTTnet.Client; using MQTTnet.Protocol; @@ -10,9 +11,11 @@ namespace MQTTnet.Extensions.Rpc { public sealed class TopicGenerationContext { - public TopicGenerationContext(IMqttClient mqttClient, MqttRpcClientOptions options, string methodName, MqttQualityOfServiceLevel qualityOfServiceLevel) + public TopicGenerationContext(IMqttClient mqttClient, MqttRpcClientOptions options, string methodName, + IDictionary parameters, MqttQualityOfServiceLevel qualityOfServiceLevel) { MethodName = methodName ?? throw new ArgumentNullException(nameof(methodName)); + Parameters = parameters; QualityOfServiceLevel = qualityOfServiceLevel; MqttClient = mqttClient ?? throw new ArgumentNullException(nameof(mqttClient)); Options = options ?? throw new ArgumentNullException(nameof(options)); @@ -20,6 +23,8 @@ public TopicGenerationContext(IMqttClient mqttClient, MqttRpcClientOptions optio public string MethodName { get; } + public IDictionary Parameters { get; } + public IMqttClient MqttClient { get; } public MqttRpcClientOptions Options { get; } diff --git a/Source/MQTTnet.Tests/Extensions/Rpc_Tests.cs b/Source/MQTTnet.Tests/Extensions/Rpc_Tests.cs index b48d50537..36bca5e4d 100644 --- a/Source/MQTTnet.Tests/Extensions/Rpc_Tests.cs +++ b/Source/MQTTnet.Tests/Extensions/Rpc_Tests.cs @@ -3,6 +3,7 @@ // See the LICENSE file in the project root for more information. using System; +using System.Collections.Generic; using System.Text; using System.Threading.Tasks; using Microsoft.VisualStudio.TestTools.UnitTesting; @@ -45,6 +46,34 @@ public async Task Execute_Success_MQTT_V5_Mixed_Clients() } } + [TestMethod] + public async Task Execute_Success_Parameters_Propagated_Correctly() + { + var paramValue = "123"; + var parameters = new Dictionary + { + { TestParametersTopicGenerationStrategy.ExpectedParamName, "123" }, + }; + + using (var testEnvironment = CreateTestEnvironment()) + { + await testEnvironment.StartServer(); + + var responseSender = await testEnvironment.ConnectClient(new MqttClientOptionsBuilder()); + await responseSender.SubscribeAsync($"MQTTnet.RPC/+/ping/{paramValue}"); + + responseSender.ApplicationMessageReceivedAsync += e => responseSender.PublishStringAsync(e.ApplicationMessage.Topic + "/response", "pong"); + + using (var rpcClient = await testEnvironment.ConnectRpcClient(new MqttRpcClientOptionsBuilder() + .WithTopicGenerationStrategy(new TestParametersTopicGenerationStrategy()).Build())) + { + var response = await rpcClient.ExecuteAsync(TimeSpan.FromSeconds(5), "ping", "", MqttQualityOfServiceLevel.AtMostOnce, parameters); + + Assert.AreEqual("pong", Encoding.UTF8.GetString(response)); + } + } + } + [TestMethod] public Task Execute_Success_With_QoS_0() { @@ -222,5 +251,32 @@ public MqttRpcTopicPair CreateRpcTopics(TopicGenerationContext context) }; } } + + class TestParametersTopicGenerationStrategy : IMqttRpcClientTopicGenerationStrategy + { + internal const string ExpectedParamName = "test_param_name"; + + public MqttRpcTopicPair CreateRpcTopics(TopicGenerationContext context) + { + if (context.Parameters == null) + { + throw new InvalidOperationException("Parameters dictionary expected to be not null"); + } + + if (!context.Parameters.TryGetValue(ExpectedParamName, out var paramValue)) + { + throw new InvalidOperationException($"Parameter with name {ExpectedParamName} not present"); + } + + var requestTopic = $"MQTTnet.RPC/{Guid.NewGuid():N}/{context.MethodName}/{paramValue}"; + var responseTopic = requestTopic + "/response"; + + return new MqttRpcTopicPair + { + RequestTopic = requestTopic, + ResponseTopic = responseTopic + }; + } + } } } \ No newline at end of file