Skip to content

Commit

Permalink
Squashed commit of the following:
Browse files Browse the repository at this point in the history
commit 27592f1
Author: Christian <[email protected]>
Date:   Sat Jan 8 15:36:35 2022 +0100

    Update MQTTnet.nuspec

commit c6f4b59
Author: Christian <[email protected]>
Date:   Sat Jan 8 15:35:47 2022 +0100

    Increase delay for Keep Alive checks to decrease CPU load.

commit b362b2a
Author: Christian <[email protected]>
Date:   Sat Jan 8 14:35:25 2022 +0100

    Make logger parameters generic.

commit 7a72a8d
Author: Christian <[email protected]>
Date:   Sat Jan 8 14:07:57 2022 +0100

    Update MQTTnet.nuspec

commit bd40e10
Author: Günther Foidl <[email protected]>
Date:   Sat Jan 8 14:00:39 2022 +0100

    Elide repeated Action-delegate allocation by caching it (#1324)

    * Elide repeated Action-delegate allocation by caching it

    Here C# compiler has too little knowledge about the usage of the delegate, we know better so cache it on our own to avoid the repeated allocation.

    * Use conditional compilation

commit 4b44a2b
Author: Christian <[email protected]>
Date:   Fri Jan 7 18:01:03 2022 +0100

    Remove appveyor.

commit 215f24a
Author: Christian <[email protected]>
Date:   Fri Jan 7 17:52:16 2022 +0100

    Create CODE-OF-CONDUCT.md
  • Loading branch information
chkr1011 committed Jan 8, 2022
1 parent 9489ec0 commit 69fd82e
Show file tree
Hide file tree
Showing 11 changed files with 174 additions and 34 deletions.
4 changes: 2 additions & 2 deletions Source/MQTTnet/Client/MqttClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -489,7 +489,7 @@ async Task TrySendKeepAliveMessagesAsync(CancellationToken cancellationToken)
_logger.Verbose("Start sending keep alive packets.");

var keepAlivePeriod = Options.KeepAlivePeriod;

while (!cancellationToken.IsCancellationRequested)
{
// Values described here: [MQTT-3.1.2-24].
Expand All @@ -504,7 +504,7 @@ async Task TrySendKeepAliveMessagesAsync(CancellationToken cancellationToken)
// due to some edge cases and was buggy in the past. Now we wait several ms because the
// min keep alive value is one second so that the server will wait 1.5 seconds for a PING
// packet.
await Task.Delay(TimeSpan.FromMilliseconds(100), cancellationToken).ConfigureAwait(false);
await Task.Delay(250, cancellationToken).ConfigureAwait(false);
}
}
catch (Exception exception)
Expand Down
2 changes: 2 additions & 0 deletions Source/MQTTnet/Diagnostics/Logger/IMqttNetLogger.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ namespace MQTTnet.Diagnostics
{
public interface IMqttNetLogger
{
bool IsEnabled { get; }

void Publish(MqttNetLogLevel logLevel, string source, string message, object[] parameters, Exception exception);
}
}
2 changes: 2 additions & 0 deletions Source/MQTTnet/Diagnostics/Logger/MqttNetEventLogger.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ public MqttNetEventLogger(string logId = null)

public string LogId { get; }

public bool IsEnabled { get; set; } = true;

public void Publish(MqttNetLogLevel level, string source, string message, object[] parameters, Exception exception)
{
var eventHandler = LogMessagePublished;
Expand Down
2 changes: 2 additions & 0 deletions Source/MQTTnet/Diagnostics/Logger/MqttNetNullLogger.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ namespace MQTTnet.Diagnostics
/// </summary>
public sealed class MqttNetNullLogger : IMqttNetLogger
{
public bool IsEnabled { get; }

public void Publish(MqttNetLogLevel logLevel, string source, string message, object[] parameters, Exception exception)
{
}
Expand Down
2 changes: 2 additions & 0 deletions Source/MQTTnet/Diagnostics/Logger/MqttNetSourceLogger.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ public MqttNetSourceLogger(IMqttNetLogger logger, string source)
_source = source;
}

public bool IsEnabled => _logger.IsEnabled;

public void Publish(MqttNetLogLevel logLevel, string message, object[] parameters, Exception exception)
{
_logger.Publish(logLevel, _source, message, parameters, exception);
Expand Down
140 changes: 125 additions & 15 deletions Source/MQTTnet/Diagnostics/Logger/MqttNetSourceLoggerExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,66 +4,176 @@ namespace MQTTnet.Diagnostics
{
public static class MqttNetSourceLoggerExtensions
{
/*
* The logger uses generic parameters in order to avoid boxing of parameter values like integers etc.
*/

public static MqttNetSourceLogger WithSource(this IMqttNetLogger logger, string source)
{
if (logger == null) throw new ArgumentNullException(nameof(logger));

return new MqttNetSourceLogger(logger, source);
}
public static void Verbose(this MqttNetSourceLogger logger, string message, params object[] parameters)

public static void Verbose<TParameter1>(this MqttNetSourceLogger logger, string message, TParameter1 parameter1)
{
logger.Publish(MqttNetLogLevel.Verbose, message, parameters, null);
if (!logger.IsEnabled)
{
return;
}

logger.Publish(MqttNetLogLevel.Verbose, message, new object[] {parameter1}, null);
}


public static void Verbose<TParameter1, TParameter2>(this MqttNetSourceLogger logger, string message, TParameter1 parameter1, TParameter2 parameter2)
{
if (!logger.IsEnabled)
{
return;
}

logger.Publish(MqttNetLogLevel.Verbose, message, new object[] {parameter1, parameter2}, null);
}

public static void Verbose<TParameter1, TParameter2, TParameter3>(this MqttNetSourceLogger logger, string message, TParameter1 parameter1, TParameter2 parameter2,
TParameter3 parameter3)
{
if (!logger.IsEnabled)
{
return;
}

logger.Publish(MqttNetLogLevel.Verbose, message, new object[] {parameter1, parameter2, parameter3}, null);
}

public static void Verbose(this MqttNetSourceLogger logger, string message)
{
if (!logger.IsEnabled)
{
return;
}

logger.Publish(MqttNetLogLevel.Verbose, message, null, null);
}

public static void Info(this MqttNetSourceLogger logger, string message, params object[] parameters)
public static void Info<TParameter1>(this MqttNetSourceLogger logger, string message, TParameter1 parameter1)
{
logger.Publish(MqttNetLogLevel.Info, message, parameters, null);
if (!logger.IsEnabled)
{
return;
}

logger.Publish(MqttNetLogLevel.Info, message, new object[] {parameter1}, null);
}

public static void Info<TParameter1, TParameter2>(this MqttNetSourceLogger logger, string message, TParameter1 parameter1, TParameter2 parameter2)
{
if (!logger.IsEnabled)
{
return;
}

logger.Publish(MqttNetLogLevel.Info, message, new object[] {parameter1, parameter2}, null);
}

public static void Info(this MqttNetSourceLogger logger, string message)
{
if (!logger.IsEnabled)
{
return;
}

logger.Publish(MqttNetLogLevel.Info, message, null, null);
}

public static void Warning(this MqttNetSourceLogger logger, Exception exception, string message, params object[] parameters)
public static void Warning<TParameter1>(this MqttNetSourceLogger logger, Exception exception, string message, TParameter1 parameter1)
{
if (!logger.IsEnabled)
{
return;
}

logger.Publish(MqttNetLogLevel.Warning, message, new object[] {parameter1}, exception);
}

public static void Warning<TParameter1, TParameter2>(this MqttNetSourceLogger logger, Exception exception, string message, TParameter1 parameter1, TParameter2 parameter2)
{
logger.Publish(MqttNetLogLevel.Warning, message, parameters, exception);
if (!logger.IsEnabled)
{
return;
}

logger.Publish(MqttNetLogLevel.Warning, message, new object[] {parameter1, parameter2}, exception);
}

public static void Warning(this MqttNetSourceLogger logger, Exception exception, string message)
{
if (!logger.IsEnabled)
{
return;
}

logger.Publish(MqttNetLogLevel.Warning, message, null, exception);
}

public static void Warning(this MqttNetSourceLogger logger, string message, params object[] parameters)
public static void Warning<TParameter1>(this MqttNetSourceLogger logger, string message, TParameter1 parameter1)
{
logger.Publish(MqttNetLogLevel.Warning, message, parameters, null);
if (!logger.IsEnabled)
{
return;
}

logger.Publish(MqttNetLogLevel.Warning, message, new object[] {parameter1}, null);
}

public static void Warning(this MqttNetSourceLogger logger, string message)
{
if (!logger.IsEnabled)
{
return;
}

logger.Publish(MqttNetLogLevel.Warning, message, null, null);
}

public static void Error(this MqttNetSourceLogger logger, Exception exception, string message, params object[] parameters)
public static void Error<TParameter1>(this MqttNetSourceLogger logger, Exception exception, string message, TParameter1 parameter1)
{
logger.Publish(MqttNetLogLevel.Error, message, parameters, exception);
if (!logger.IsEnabled)
{
return;
}

logger.Publish(MqttNetLogLevel.Error, message, new object[] {parameter1}, exception);
}

public static void Error<TParameter1, TParameter2>(this MqttNetSourceLogger logger, Exception exception, string message, TParameter1 parameter1, TParameter2 parameter2)
{
if (!logger.IsEnabled)
{
return;
}

logger.Publish(MqttNetLogLevel.Error, message, new object[] {parameter1, parameter2}, exception);
}

public static void Error(this MqttNetSourceLogger logger, Exception exception, string message)
{
if (!logger.IsEnabled)
{
return;
}

logger.Publish(MqttNetLogLevel.Error, message, null, exception);
}

public static void Error(this MqttNetSourceLogger logger, string message)
{
if (!logger.IsEnabled)
{
return;
}

logger.Publish(MqttNetLogLevel.Error, message, null, null);
}
}
}
}
10 changes: 8 additions & 2 deletions Source/MQTTnet/Implementations/CrossPlatformSocket.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
using System;
using System;
using System.IO;
using System.Net;
using System.Net.Sockets;
Expand All @@ -11,25 +11,31 @@ namespace MQTTnet.Implementations
public sealed class CrossPlatformSocket : IDisposable
{
readonly Socket _socket;
readonly Action _socketDisposeAction;

NetworkStream _networkStream;

public CrossPlatformSocket(AddressFamily addressFamily)
{
_socket = new Socket(addressFamily, SocketType.Stream, ProtocolType.Tcp);
_socketDisposeAction = _socket.Dispose;
}

public CrossPlatformSocket()
{
// Having this constructor is important because avoiding the address family as parameter
// will make use of dual mode in the .net framework.
_socket = new Socket(SocketType.Stream, ProtocolType.Tcp);

_socketDisposeAction = _socket.Dispose;
}

public CrossPlatformSocket(Socket socket)
{
_socket = socket ?? throw new ArgumentNullException(nameof(socket));
_networkStream = new NetworkStream(socket, true);

_socketDisposeAction = _socket.Dispose;
}

public bool NoDelay
Expand Down Expand Up @@ -112,7 +118,7 @@ public async Task ConnectAsync(string host, int port, CancellationToken cancella
_networkStream?.Dispose();

// Workaround for: https://github.com/dotnet/corefx/issues/24430
using (cancellationToken.Register(() => _socket.Dispose()))
using (cancellationToken.Register(_socketDisposeAction))
{
cancellationToken.ThrowIfCancellationRequested();

Expand Down
36 changes: 25 additions & 11 deletions Source/MQTTnet/Implementations/MqttTcpChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,24 @@ public sealed class MqttTcpChannel : IMqttChannel
{
readonly IMqttClientOptions _clientOptions;
readonly MqttClientTcpOptions _tcpOptions;
readonly Action _disposeAction;

Stream _stream;

public MqttTcpChannel(IMqttClientOptions clientOptions)
public MqttTcpChannel()
{
_disposeAction = Dispose;
}

public MqttTcpChannel(IMqttClientOptions clientOptions) : this()
{
_clientOptions = clientOptions ?? throw new ArgumentNullException(nameof(clientOptions));
_tcpOptions = (MqttClientTcpOptions) clientOptions.ChannelOptions;

IsSecureConnection = clientOptions.ChannelOptions?.TlsOptions?.UseTls == true;
}

public MqttTcpChannel(Stream stream, string endpoint, X509Certificate2 clientCertificate)
public MqttTcpChannel(Stream stream, string endpoint, X509Certificate2 clientCertificate) : this()
{
_stream = stream ?? throw new ArgumentNullException(nameof(stream));

Expand Down Expand Up @@ -150,11 +156,15 @@ public async Task<int> ReadAsync(byte[] buffer, int offset, int count, Cancellat
return 0;
}

#if NETCOREAPP3_0_OR_GREATER || NETSTANDARD2_1_OR_GREATER
return await stream.ReadAsync(buffer.AsMemory(offset, count), cancellationToken).ConfigureAwait(false);
#else
// Workaround for: https://github.com/dotnet/corefx/issues/24430
using (cancellationToken.Register(Dispose))
using (cancellationToken.Register(_disposeAction))
{
return await stream.ReadAsync(buffer, offset, count, cancellationToken).ConfigureAwait(false);
}
#endif
}
catch (ObjectDisposedException)
{
Expand All @@ -178,18 +188,22 @@ public async Task WriteAsync(byte[] buffer, int offset, int count, CancellationT

try
{
// Workaround for: https://github.com/dotnet/corefx/issues/24430
using (cancellationToken.Register(Dispose))
{
var stream = _stream;
var stream = _stream;

if (stream == null)
{
throw new MqttCommunicationException("The TCP connection is closed.");
}
if (stream == null)
{
throw new MqttCommunicationException("The TCP connection is closed.");
}

#if NETCOREAPP3_0_OR_GREATER || NETSTANDARD2_1_OR_GREATER
await stream.WriteAsync(buffer.AsMemory(offset, count), cancellationToken).ConfigureAwait(false);
#else
// Workaround for: https://github.com/dotnet/corefx/issues/24430
using (cancellationToken.Register(_disposeAction))
{
await stream.WriteAsync(buffer, offset, count, cancellationToken).ConfigureAwait(false);
}
#endif
}
catch (ObjectDisposedException)
{
Expand Down
4 changes: 2 additions & 2 deletions Tests/MQTTnet.Core.Tests/Logger/Logger_Tests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public void Root_Log_Messages()

childLogger.Verbose("Verbose");
childLogger.Info("Info");
childLogger.Warning(null, "Warning");
childLogger.Warning((Exception)null, "Warning");
childLogger.Error(null, "Error");

Assert.AreEqual(4, logMessagesCount);
Expand All @@ -55,7 +55,7 @@ public void Use_Custom_Log_Id()

childLogger.Verbose("Verbose");
childLogger.Info("Info");
childLogger.Warning(null, "Warning");
childLogger.Warning((Exception)null, "Warning");
childLogger.Error(null, "Error");
}
}
Expand Down
2 changes: 1 addition & 1 deletion Tests/MQTTnet.Core.Tests/Logger/SourceLogger_Tests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public void Log_With_Source()
};

var sourceLogger = logger.WithSource("The_Source");
sourceLogger.Info("MESSAGE", null, null);
sourceLogger.Info("MESSAGE", (object)null, (object)null);

Assert.AreEqual("The_Source", logMessage.Source);
}
Expand Down
Loading

0 comments on commit 69fd82e

Please sign in to comment.