Skip to content

Commit

Permalink
feat: upgrade to MQTTnet 4.2.0.706
Browse files Browse the repository at this point in the history
  • Loading branch information
BEagle1984 committed May 9, 2023
1 parent c80f652 commit 3e27d13
Show file tree
Hide file tree
Showing 7 changed files with 37 additions and 18 deletions.
2 changes: 1 addition & 1 deletion Directory.Build.props
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<Project>
<PropertyGroup Label="Package information">
<BaseVersionSuffix></BaseVersionSuffix>
<BaseVersion>4.2.2$(BaseVersionSuffix)</BaseVersion>
<BaseVersion>4.3.0$(BaseVersionSuffix)</BaseVersion>
<DatabasePackagesRevision>1</DatabasePackagesRevision>
<DatabasePackagesVersionSuffix>$(BaseVersionSuffix)</DatabasePackagesVersionSuffix>
</PropertyGroup>
Expand Down
4 changes: 3 additions & 1 deletion docs/releases.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@ uid: releases

# Releases

## [4.2.2](https://github.com/BEagle1984/silverback/releases/tag/v4.2.2)
## [4.3.0](https://github.com/BEagle1984/silverback/releases/tag/v4.3.0)

### What's new

* Upgrade to [MQTTnet 4.2.0.706](https://github.com/chkr1011/MQTTnet/releases/tag/v4.2.0.706)
* Upgrade to [Confluent.Kafka 2.1.1](https://github.com/confluentinc/confluent-kafka-dotnet/releases/tag/v2.1.1)
* Upgrade to Newtonsoft.Json 13.0.3
* Add `ReasonCode` and `ReasonString` to `MqttProduceException` message

## [4.2.1](https://github.com/BEagle1984/silverback/releases/tag/v4.2.1)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Diagnostics;
using MQTTnet.Packets;
using Silverback.Messaging.Configuration.Mqtt;
using Silverback.Util;

Expand Down Expand Up @@ -55,11 +56,10 @@ public MockedMqttClient(IInMemoryMqttBroker broker, IMockedMqttOptions mockOptio
/// <inheritdoc cref="IMqttClient.DisconnectedAsync" />
public event Func<MqttClientDisconnectedEventArgs, Task>? DisconnectedAsync;

#pragma warning disable CS0067
/// <inheritdoc cref="IMqttClient.InspectPackage" />
public event Func<InspectMqttPacketEventArgs, Task>? InspectPackage;
#pragma warning disable CS0067 // Event is never used
/// <inheritdoc cref="IMqttClient.InspectPacketAsync" />
public event Func<InspectMqttPacketEventArgs, Task>? InspectPacketAsync;
#pragma warning restore CS0067

/// <summary>
/// Gets a value indicating whether the client is connected and a message handler is bound to it.
/// </summary>
Expand Down Expand Up @@ -116,7 +116,17 @@ public async Task DisconnectAsync(MqttClientDisconnectOptions options, Cancellat
IsConnected = false;

if (DisconnectedAsync != null)
await DisconnectedAsync.Invoke(new MqttClientDisconnectedEventArgs()).ConfigureAwait(false);
{
var eventArgs = new MqttClientDisconnectedEventArgs(
true,
new MqttClientConnectResult(),
MqttClientDisconnectReason.NormalDisconnection,
string.Empty,
null,
null);

await DisconnectedAsync.Invoke(eventArgs).ConfigureAwait(false);
}
}

/// <inheritdoc cref="IMqttClient.SubscribeAsync" />
Expand All @@ -129,7 +139,12 @@ public Task<MqttClientSubscribeResult> SubscribeAsync(

_broker.Subscribe(ClientId, options.TopicFilters.Select(filter => filter.Topic).ToList());

return Task.FromResult(new MqttClientSubscribeResult());
var result = new MqttClientSubscribeResult(
42,
options.TopicFilters.Select(filter => new MqttClientSubscribeResultItem(filter, MqttClientSubscribeResultCode.GrantedQoS0)).ToArray(),
string.Empty,
Array.Empty<MqttUserProperty>());
return Task.FromResult(result);
}

/// <inheritdoc cref="IMqttClient.UnsubscribeAsync" />
Expand All @@ -142,7 +157,12 @@ public Task<MqttClientUnsubscribeResult> UnsubscribeAsync(

_broker.Unsubscribe(ClientId, options.TopicFilters);

return Task.FromResult(new MqttClientUnsubscribeResult());
var result = new MqttClientUnsubscribeResult(
42,
options.TopicFilters.Select(filter => new MqttClientUnsubscribeResultItem(filter, MqttClientUnsubscribeResultCode.Success)).ToArray(),
string.Empty,
Array.Empty<MqttUserProperty>());
return Task.FromResult(result);
}

/// <inheritdoc cref="IMqttClient.PublishAsync" />
Expand All @@ -158,10 +178,7 @@ public async Task<MqttClientPublishResult> PublishAsync(

await _broker.PublishAsync(ClientId, applicationMessage, Options).ConfigureAwait(false);

return new MqttClientPublishResult
{
ReasonCode = MqttClientPublishReasonCode.Success
};
return new MqttClientPublishResult(null, MqttClientPublishReasonCode.Success, string.Empty, Array.Empty<MqttUserProperty>());
}

/// <inheritdoc cref="IMqttClient.PingAsync" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ internal async Task HandleMessageAsync(ConsumedApplicationMessage message)
throw new InvalidOperationException("The message has been processed already.");

await HandleMessageAsync(
message.ApplicationMessage.Payload,
message.ApplicationMessage.PayloadSegment.ToArray(),
headers,
message.ApplicationMessage.Topic,
new MqttMessageIdentifier(Endpoint.Configuration.ClientId, message.Id))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ private async Task PublishToTopicAsync(
var mqttApplicationMessage = new MqttApplicationMessage
{
Topic = queuedMessage.ActualEndpointName,
Payload = queuedMessage.MessageBytes,
PayloadSegment = queuedMessage.MessageBytes,
QualityOfServiceLevel = Endpoint.QualityOfServiceLevel,
Retain = Endpoint.Retain,
MessageExpiryInterval = Endpoint.MessageExpiryInterval
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ This package contains an implementation of Silverback.Integration for MQTT.</Des
</PropertyGroup>

<ItemGroup>
<PackageReference Include="MQTTnet" Version="4.1.4.563" />
<PackageReference Include="MQTTnet" Version="4.2.0.706" />
</ItemGroup>

<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -532,7 +532,7 @@ public async Task MovePolicy_ToOtherTopic_MessageMoved()

var otherTopicMessages = Helper.GetMessages("e2e/other");
otherTopicMessages.Count.Should().Be(1);
otherTopicMessages[0].Payload.Should()
otherTopicMessages[0].PayloadSegment.ToArray().Should()
.BeEquivalentTo(Helper.Spy.InboundEnvelopes[0].RawMessage.ReReadAll());
}

Expand Down Expand Up @@ -643,7 +643,7 @@ public async Task MovePolicy_ToOtherTopicAfterRetry_MessageRetriedAndMoved()

var otherTopicMessages = Helper.GetMessages("e2e/other");
otherTopicMessages.Count.Should().Be(1);
otherTopicMessages[0].Payload.Should()
otherTopicMessages[0].PayloadSegment.ToArray().Should()
.BeEquivalentTo(Helper.Spy.InboundEnvelopes[0].RawMessage.ReReadAll());
}
}
Expand Down

0 comments on commit 3e27d13

Please sign in to comment.