Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

1592 bug of mqttbufferreader #1593

Merged
merged 7 commits into from
Nov 29, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/ReleaseNotes.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
* [Core] Fixed a memory leak in _AsyncSignal_ implementation (#1586, thanks to @mario-zelger).
* [Core] Fixed an issue with bounds handling in _MqttBufferReader_ (#1593).
* [Server] Fix not properly reset statistics (#1587, thanks to @damikun).
* [Server] Now using an empty string as the sender client ID for injected application messages (#1583, thanks to @xljiulang).
43 changes: 43 additions & 0 deletions Source/MQTTnet.Benchmarks/MqttBufferReaderBenchmark.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

using System;
using System.Text;
using BenchmarkDotNet.Attributes;
using BenchmarkDotNet.Jobs;
using MQTTnet.Formatter;

namespace MQTTnet.Benchmarks
{
[SimpleJob(RuntimeMoniker.Net60)]
[MemoryDiagnoser]
public class MqttBufferReaderBenchmark
{
byte[] _buffer;
int _bufferLength;

[GlobalSetup]
public void GlobalSetup()
{
var writer = new MqttBufferWriter(1024, 1024);
writer.WriteString("hgfjkdfkjlghfdjghdfljkdfhgdlkjfshgsldkfjghsdflkjghdsflkjhrstiuoghlkfjbhnfbutghjoiöjhklötnbhtroliöuhbjntluiobkjzbhtdrlskbhtruhjkfthgbkftgjhgfiklhotriuöhbjtrsioöbtrsötrhträhtrühjtriüoätrhjtsrölbktrbnhtrulöbionhströloubinströoliubhnsöotrunbtöroisntröointröioujhgötiohjgötorshjnbgtöorihbnjtröoihbjntröobntröoibntrjhötrohjbtröoihntröoibnrtoiöbtrjnboöitrhjtnriohötrhjtöroihjtroöihjtroösibntsroönbotöirsbntöoihjntröoihntroöbtrboöitrnhoöitrhjntröoishbnjtröosbhtröbntriohjtröoijtöoitbjöotibjnhöotirhbjntroöibhnjrtoöibnhtroöibnhtörsbnhtöoirbnhtöroibntoörhjnbträöbtrbträbtrbtirbätrsibohjntrsöiobthnjiohjsrtoib");

_buffer = writer.GetBuffer();
_bufferLength = writer.Length;
}

[Benchmark]
public void Use_Span()
{
var span = _buffer.AsSpan(0, _bufferLength);
Encoding.UTF8.GetString(span);
}

[Benchmark]
public void Use_Encoding()
{
Encoding.UTF8.GetString(_buffer, 0, _bufferLength);
}
}
}
6 changes: 5 additions & 1 deletion Source/MQTTnet.Benchmarks/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ public static void Main(string[] args)
Console.WriteLine("d = UnsubscribeBenchmark");
Console.WriteLine("e = MessageDeliveryBenchmark");
Console.WriteLine("f = AsyncLockBenchmark");
Console.WriteLine("g = MqttBufferReaderBenchmark");

var pressedKey = Console.ReadKey(true);
switch (pressedKey.KeyChar)
Expand Down Expand Up @@ -79,9 +80,12 @@ public static void Main(string[] args)
case 'f':
BenchmarkRunner.Run<AsyncLockBenchmark>();
break;
case 'g':
BenchmarkRunner.Run<MqttBufferReaderBenchmark>();
break;
}

Console.ReadLine();
}
}
}
}
18 changes: 5 additions & 13 deletions Source/MQTTnet.Tests/Client/ManagedMqttClient_Tests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,12 @@ public async Task Receive_While_Not_Cleanly_Disconnected()

await managedClient.StartAsync(managedClientOptions);
await LongTestDelay();
await LongTestDelay();

// Send test data.
await senderClient.PublishStringAsync("topic1");
await LongTestDelay();
await LongTestDelay();

receivedMessages.AssertReceivedCountEquals(1);

Expand Down Expand Up @@ -329,13 +331,7 @@ public async Task Subscriptions_Are_Cleared_At_Logout()
await testEnvironment.StartServer().ConfigureAwait(false);

var sendingClient = await testEnvironment.ConnectClient().ConfigureAwait(false);
await sendingClient.PublishAsync(
new MqttApplicationMessage
{
Topic = "topic",
Payload = new byte[] { 1 },
Retain = true
});
await sendingClient.PublishStringAsync("topic", "A", retain: true);

// Wait a bit for the retained message to be available
await LongTestDelay();
Expand All @@ -357,18 +353,15 @@ await sendingClient.PublishAsync(

await managedClient.SubscribeAsync("topic");

await managedClient.StartAsync(new ManagedMqttClientOptionsBuilder().WithClientOptions(clientOptions).WithAutoReconnectDelay(TimeSpan.FromSeconds(1)).Build());

await managedClient.StartAsync(new ManagedMqttClientOptionsBuilder().WithClientOptions(clientOptions).WithAutoReconnectDelay(TimeSpan.FromSeconds(0.5)).Build());
await LongTestDelay();

Assert.AreEqual(1, receivedManagedMessages.Count);

await managedClient.StopAsync();

await LongTestDelay();

await managedClient.StartAsync(new ManagedMqttClientOptionsBuilder().WithClientOptions(clientOptions).WithAutoReconnectDelay(TimeSpan.FromSeconds(1)).Build());

await managedClient.StartAsync(new ManagedMqttClientOptionsBuilder().WithClientOptions(clientOptions).WithAutoReconnectDelay(TimeSpan.FromSeconds(0.5)).Build());
await LongTestDelay();

// After reconnect and then some delay, the retained message must not be received,
Expand All @@ -377,7 +370,6 @@ await sendingClient.PublishAsync(

// Make sure that it gets received after subscribing again.
await managedClient.SubscribeAsync("topic");

await LongTestDelay();

Assert.AreEqual(2, receivedManagedMessages.Count);
Expand Down
110 changes: 110 additions & 0 deletions Source/MQTTnet.Tests/Formatter/MqttBufferReader_Tests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

using Microsoft.VisualStudio.TestTools.UnitTesting;
using MQTTnet.Exceptions;
using MQTTnet.Formatter;

namespace MQTTnet.Tests.Formatter
{
[TestClass]
public sealed class MqttBufferReader_Tests
{
[TestMethod]
[ExpectedException(typeof(MqttProtocolViolationException), "Expected at least 4 bytes but there are only 3 bytes")]
public void Fire_Exception_If_Not_Enough_Data()
{
var buffer = new byte[] { 0, 1, 2 };

var reader = new MqttBufferReader();

reader.SetBuffer(buffer, 0, 3);

// 1 byte is missing.
reader.ReadFourByteInteger();
}

[TestMethod]
[ExpectedException(typeof(MqttProtocolViolationException), "Expected at least 4 bytes but there are only 3 bytes")]
public void Fire_Exception_If_Not_Enough_Data_With_Longer_Buffer()
{
var buffer = new byte[] { 0, 1, 2, 3, 4, 5, 6 };

var reader = new MqttBufferReader();

reader.SetBuffer(buffer, 0, 3);

// 1 byte is missing.
reader.ReadFourByteInteger();
}

[TestMethod]
public void Is_EndOfStream_Without_Buffer()
{
var reader = new MqttBufferReader();

Assert.IsTrue(reader.EndOfStream);
Assert.AreEqual(0, reader.BytesLeft);
}

[TestMethod]
public void Read_Remaining_Data_From_Larger_Buffer()
{
var buffer = new byte[] { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 };

var reader = new MqttBufferReader();

// The used buffer contains more data than used!
reader.SetBuffer(buffer, 0, 5);

// This should only read 5 bytes even if more data is in the buffer
// due to custom bounds.
var remainingData = reader.ReadRemainingData();

Assert.IsTrue(reader.EndOfStream);
Assert.AreEqual(0, reader.BytesLeft);
Assert.AreEqual(5, remainingData.Length);
}

[TestMethod]
public void Report_Correct_Length_For_Full_Buffer()
{
var buffer = new byte[] { 5, 6, 7, 8, 9 };

var reader = new MqttBufferReader();
reader.SetBuffer(buffer, 0, 5);

Assert.IsFalse(reader.EndOfStream);
Assert.AreEqual(5, reader.BytesLeft);
}

[TestMethod]
public void Report_Correct_Length_For_Partial_End_Buffer()
{
var buffer = new byte[] { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 };

var reader = new MqttBufferReader();

// The used buffer contains more data than used!
reader.SetBuffer(buffer, 5, 5);

Assert.IsFalse(reader.EndOfStream);
Assert.AreEqual(5, reader.BytesLeft);
}

[TestMethod]
public void Report_Correct_Length_For_Partial_Start_Buffer()
{
var buffer = new byte[] { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 };

var reader = new MqttBufferReader();

// The used buffer contains more data than used!
reader.SetBuffer(buffer, 0, 5);

Assert.IsFalse(reader.EndOfStream);
Assert.AreEqual(5, reader.BytesLeft);
}
}
}
4 changes: 2 additions & 2 deletions Source/MQTTnet.Tests/MQTTv5/Server_Tests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@ public async Task Will_Message_Send()
await c1.SubscribeAsync(new MqttTopicFilterBuilder().WithTopic("#").Build());

var c2 = await testEnvironment.ConnectClient(clientOptions);
c2.Dispose(); // Dispose will not send a DISCONNECT pattern first so the will message must be sent.
c2.Dispose(); // Dispose will not send a DISCONNECT packet first so the will message must be sent.

await Task.Delay(1000);
await LongTestDelay();

Assert.AreEqual(1, receivedMessagesCount);
}
Expand Down
33 changes: 27 additions & 6 deletions Source/MQTTnet.Tests/Protocol_Tests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,27 +8,48 @@
namespace MQTTnet.Tests
{
[TestClass]
public class Protocol_Tests
public sealed class Protocol_Tests
{
[TestMethod]
public void Encode_Four_Byte_Integer()
{
var writer = new MqttBufferWriter(4, 4);

for (uint value = 0; value < 268435455; value++)
{
writer.WriteVariableByteInteger(value);

var buffer = writer.GetBuffer();

var reader = new MqttBufferReader();
reader.SetBuffer(buffer, 0, writer.Length);
var checkValue = reader.ReadVariableByteInteger();

Assert.AreEqual(value, checkValue);


writer.Reset(0);
}
}

[TestMethod]
public void Encode_Two_Byte_Integer()
{
var writer = new MqttBufferWriter(2, 2);

for (ushort value = 0; value < ushort.MaxValue; value++)
{
writer.WriteTwoByteInteger(value);

var buffer = writer.GetBuffer();

var reader = new MqttBufferReader();
reader.SetBuffer(buffer, 0, writer.Length);
var checkValue = reader.ReadTwoByteInteger();

Assert.AreEqual(value, checkValue);

writer.Reset(0);
}
}
}
}
}
2 changes: 2 additions & 0 deletions Source/MQTTnet.Tests/Server/QoS_Tests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ public async Task Preserve_Message_Order_For_Queued_Messages()
var dummyClient = await testEnvironment.ConnectClient(o => o.WithClientId("A").WithCleanSession(false));
await dummyClient.SubscribeAsync("#", MqttQualityOfServiceLevel.AtLeastOnce);
dummyClient.Dispose();

await LongTestDelay();

// Now inject messages which are appended to the queue of the client.
await server.InjectApplicationMessage("T", "0", qualityOfServiceLevel: MqttQualityOfServiceLevel.AtLeastOnce);
Expand Down
Loading