Skip to content

Commit

Permalink
Merge develop.
Browse files Browse the repository at this point in the history
  • Loading branch information
chkr1011 committed Oct 6, 2018
2 parents 03bef4c + c8b3066 commit ffc4f14
Show file tree
Hide file tree
Showing 7 changed files with 96 additions and 37 deletions.
2 changes: 1 addition & 1 deletion Build/MQTTnet.AspNetCore.nuspec
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
<copyright>Copyright Christian Kratky 2016-2018</copyright>
<tags>MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M ESP Smart Home Cities Automation Xamarin</tags>
<dependencies>
<dependency id="MQTTnet" version="2.8.3" />
<dependency id="MQTTnet" version="2.8.4" />
<dependency id="Microsoft.AspNetCore.Connections.Abstractions" version="2.1.0" />
<dependency id="Microsoft.AspNetCore.WebSockets" version="2.0.1" />
<dependency id="Microsoft.Extensions.Hosting.Abstractions" version="2.0.1" />
Expand Down
2 changes: 1 addition & 1 deletion Build/MQTTnet.Extensions.ManagedClient.nuspec
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
<copyright>Copyright Christian Kratky 2016-2018</copyright>
<tags>MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M ESP Smart Home Cities Automation Xamarin</tags>
<dependencies>
<dependency id="MQTTnet" version="2.8.3" />
<dependency id="MQTTnet" version="2.8.4" />
</dependencies>
</metadata>

Expand Down
2 changes: 1 addition & 1 deletion Build/MQTTnet.Extensions.Rpc.nuspec
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
<copyright>Copyright Christian Kratky 2016-2018</copyright>
<tags>MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M ESP Smart Home Cities Automation Xamarin</tags>
<dependencies>
<dependency id="MQTTnet" version="2.8.3" />
<dependency id="MQTTnet" version="2.8.4" />
</dependencies>
</metadata>

Expand Down
5 changes: 1 addition & 4 deletions Build/MQTTnet.nuspec
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,7 @@
<iconUrl>https://raw.githubusercontent.com/chkr1011/MQTTnet/master/Images/Logo_128x128.png</iconUrl>
<requireLicenseAcceptance>false</requireLicenseAcceptance>
<description>MQTTnet is a high performance .NET library for MQTT based communication. It provides a MQTT client and a MQTT server (broker).</description>
<releaseNotes>* [Core] Added all factory methods to the factory interface.
* [Core] Fixed an issue with cancellation token handling (thanks to @acrabb).
* [Server] Added a new overload for configuring the ASP.net integration (thanks to @JanEggers).
* [Server] Added a method for clearing all retained messages.
<releaseNotes>* [Client] Fixed a deadlock when an exception is fired while connecting (thanks to @malibVB).
</releaseNotes>
<copyright>Copyright Christian Kratky 2016-2018</copyright>
<tags>MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M ESP Smart Home Cities Automation Xamarin</tags>
Expand Down
6 changes: 5 additions & 1 deletion Source/MQTTnet/Adapter/MqttChannelAdapter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,9 @@ public async Task<MqttBasePacket> ReceivePacketAsync(TimeSpan timeout, Cancellat

return packet;
}
catch (OperationCanceledException)
{
}
catch (Exception exception)
{
if (IsWrappedException(exception))
Expand Down Expand Up @@ -237,7 +240,8 @@ private static void WrapException(Exception exception)
{
if (exception is IOException && exception.InnerException is SocketException socketException)
{
if (socketException.SocketErrorCode == SocketError.ConnectionAborted)
if (socketException.SocketErrorCode == SocketError.ConnectionAborted ||
socketException.SocketErrorCode == SocketError.OperationAborted)
{
throw new OperationCanceledException();
}
Expand Down
57 changes: 35 additions & 22 deletions Source/MQTTnet/Client/MqttClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public class MqttClient : IMqttClient
internal Task _keepAliveMessageSenderTask;
private IMqttChannelAdapter _adapter;
private bool _cleanDisconnectInitiated;
private TaskCompletionSource<bool> _disconnectReason;
private int _disconnectGate;

public MqttClient(IMqttClientAdapterFactory channelFactory, IMqttNetLogger logger)
{
Expand All @@ -54,12 +54,12 @@ public async Task<MqttClientConnectResult> ConnectAsync(IMqttClientOptions optio

try
{
_cancellationTokenSource = new CancellationTokenSource();
_disconnectReason = new TaskCompletionSource<bool>();
_options = options;
_packetIdentifierProvider.Reset();
_packetDispatcher.Reset();

_cancellationTokenSource = new CancellationTokenSource();
_disconnectGate = 0;
_adapter = _adapterFactory.CreateClientAdapter(options, _logger);

_logger.Verbose($"Trying to connect with server ({_options.ChannelOptions}).");
Expand Down Expand Up @@ -87,10 +87,12 @@ public async Task<MqttClientConnectResult> ConnectAsync(IMqttClientOptions optio
catch (Exception exception)
{
_logger.Error(exception, "Error while connecting with server.");
if (_disconnectReason.TrySetException(exception))

if (!DisconnectIsPending())
{
await DisconnectInternalAsync(null, exception).ConfigureAwait(false);
}

throw;
}
}
Expand All @@ -108,7 +110,7 @@ public async Task DisconnectAsync()
}
finally
{
if (_disconnectReason.TrySetCanceled())
if (!DisconnectIsPending())
{
await DisconnectInternalAsync(null, null).ConfigureAwait(false);
}
Expand Down Expand Up @@ -183,7 +185,7 @@ public Task PublishAsync(MqttApplicationMessage applicationMessage)

public void Dispose()
{
_cancellationTokenSource?.Cancel (false);
_cancellationTokenSource?.Cancel(false);
_cancellationTokenSource?.Dispose();
_cancellationTokenSource = null;

Expand Down Expand Up @@ -224,9 +226,10 @@ private void ThrowIfConnected(string message)

private async Task DisconnectInternalAsync(Task sender, Exception exception)
{
InitiateDisconnect();

var clientWasConnected = IsConnected;

InitiateDisconnect();

IsConnected = false;

try
Expand All @@ -247,7 +250,7 @@ private async Task DisconnectInternalAsync(Task sender, Exception exception)
}
finally
{
Dispose ();
Dispose();
_cleanDisconnectInitiated = false;

_logger.Info("Disconnected.");
Expand All @@ -261,16 +264,16 @@ private void InitiateDisconnect()
{
try
{
if (_cancellationTokenSource == null || _cancellationTokenSource.IsCancellationRequested)
if (_cancellationTokenSource?.IsCancellationRequested == true)
{
return;
}

_cancellationTokenSource.Cancel(false);
_cancellationTokenSource?.Cancel(false);
}
catch (Exception adapterException)
catch (Exception exception)
{
_logger.Warning(adapterException, "Error while initiating disconnect.");
_logger.Warning(exception, "Error while initiating disconnect.");
}
}
}
Expand Down Expand Up @@ -358,7 +361,7 @@ private async Task SendKeepAliveMessagesAsync(CancellationToken cancellationToke
_logger.Error(exception, "Unhandled exception while sending/receiving keep alive packets.");
}

if (_disconnectReason.TrySetException(exception))
if (!DisconnectIsPending())
{
await DisconnectInternalAsync(_keepAliveMessageSenderTask, exception).ConfigureAwait(false);
}
Expand All @@ -377,8 +380,10 @@ private async Task ReceivePacketsAsync(CancellationToken cancellationToken)
{
while (!cancellationToken.IsCancellationRequested)
{
var packet = await _adapter.ReceivePacketAsync(TimeSpan.Zero, cancellationToken).ConfigureAwait(false);
if (packet != null)
var packet = await _adapter.ReceivePacketAsync(TimeSpan.Zero, cancellationToken)
.ConfigureAwait(false);

if (packet != null && !cancellationToken.IsCancellationRequested)
{
await ProcessReceivedPacketAsync(packet, cancellationToken).ConfigureAwait(false);
}
Expand All @@ -393,7 +398,6 @@ private async Task ReceivePacketsAsync(CancellationToken cancellationToken)

if (exception is OperationCanceledException)
{
_logger.Verbose ("MQTT OperationCanceled exception while receiving packets.");
}
else if (exception is MqttCommunicationException)
{
Expand All @@ -406,7 +410,7 @@ private async Task ReceivePacketsAsync(CancellationToken cancellationToken)

_packetDispatcher.Dispatch(exception);

if (_disconnectReason.TrySetException(exception))
if (!DisconnectIsPending())
{
await DisconnectInternalAsync(_packetReceiverTask, exception).ConfigureAwait(false);
}
Expand Down Expand Up @@ -492,16 +496,20 @@ private async Task PublishExactlyOnce(MqttPublishPacket publishPacket, Cancellat

private void StartReceivingPackets(CancellationToken cancellationToken)
{
_packetReceiverTask = Task.Run(
_packetReceiverTask = Task.Factory.StartNew(
() => ReceivePacketsAsync(cancellationToken),
cancellationToken);
cancellationToken,
TaskCreationOptions.LongRunning,
TaskScheduler.Default);
}

private void StartSendingKeepAliveMessages(CancellationToken cancellationToken)
{
_keepAliveMessageSenderTask = Task.Run(
_keepAliveMessageSenderTask = Task.Factory.StartNew(
() => SendKeepAliveMessagesAsync(cancellationToken),
cancellationToken);
cancellationToken,
TaskCreationOptions.LongRunning,
TaskScheduler.Default);
}

private void FireApplicationMessageReceivedEvent(MqttPublishPacket publishPacket)
Expand Down Expand Up @@ -537,5 +545,10 @@ private static async Task WaitForTaskAsync(Task task, Task sender)
{
}
}

private bool DisconnectIsPending()
{
return Interlocked.CompareExchange(ref _disconnectGate, 1, 0) != 0;
}
}
}
59 changes: 52 additions & 7 deletions Tests/MQTTnet.Core.Tests/MqttServerTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using MQTTnet.Adapter;
using MQTTnet.Implementations;

namespace MQTTnet.Core.Tests
Expand Down Expand Up @@ -552,6 +553,51 @@ public async Task MqttServer_Body()
Assert.IsTrue(bodyIsMatching);
}

[TestMethod]
public async Task MqttServer_ConnectionDenied()
{
var server = new MqttFactory().CreateMqttServer();
var client = new MqttFactory().CreateMqttClient();

try
{
var options = new MqttServerOptionsBuilder().WithConnectionValidator(context =>
{
context.ReturnCode = MqttConnectReturnCode.ConnectionRefusedNotAuthorized;
}).Build();

await server.StartAsync(options);


var clientOptions = new MqttClientOptionsBuilder()
.WithTcpServer("localhost").Build();

try
{
await client.ConnectAsync(clientOptions);
Assert.Fail("An exception should be raised.");
}
catch (Exception exception)
{
if (exception is MqttConnectingFailedException)
{

}
else
{
Assert.Fail("Wrong exception.");
}
}
}
finally
{
await client.DisconnectAsync();
await server.StopAsync();

client.Dispose();
}
}

[TestMethod]
public async Task MqttServer_SameClientIdConnectDisconnectEventOrder()
{
Expand Down Expand Up @@ -623,25 +669,24 @@ private static async Task TestPublishAsync(
MqttQualityOfServiceLevel filterQualityOfServiceLevel,
int expectedReceivedMessagesCount)
{
var serverAdapter = new TestMqttServerAdapter();
var s = new MqttFactory().CreateMqttServer(new[] { serverAdapter }, new MqttNetLogger());
var s = new MqttFactory().CreateMqttServer();

var receivedMessagesCount = 0;
try
{
await s.StartAsync(new MqttServerOptions());

var c1 = await serverAdapter.ConnectTestClient("c1");
var c2 = await serverAdapter.ConnectTestClient("c2");

var c1 = new MqttFactory().CreateMqttClient();
c1.ApplicationMessageReceived += (_, __) => receivedMessagesCount++;

await c1.ConnectAsync(new MqttClientOptionsBuilder().WithTcpServer("localhost").Build());
await c1.SubscribeAsync(new TopicFilterBuilder().WithTopic(topicFilter).WithQualityOfServiceLevel(filterQualityOfServiceLevel).Build());

var c2 = new MqttFactory().CreateMqttClient();
await c2.ConnectAsync(new MqttClientOptionsBuilder().WithTcpServer("localhost").Build());
await c2.PublishAsync(builder => builder.WithTopic(topic).WithPayload(new byte[0]).WithQualityOfServiceLevel(qualityOfServiceLevel));

await Task.Delay(500);
await c1.UnsubscribeAsync(topicFilter);

await Task.Delay(500);
}
finally
Expand Down

0 comments on commit ffc4f14

Please sign in to comment.