Skip to content

Commit

Permalink
Avoid to call MqttPacketBuffer.Join() when SendPacketAsync
Browse files Browse the repository at this point in the history
  • Loading branch information
xljiulang committed Nov 24, 2022
1 parent e88b280 commit a2127c6
Showing 1 changed file with 10 additions and 12 deletions.
22 changes: 10 additions & 12 deletions Source/MQTTnet.AspnetCore/MqttConnectionContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,23 +8,23 @@
using MQTTnet.AspNetCore.Client.Tcp;
using MQTTnet.Exceptions;
using MQTTnet.Formatter;
using MQTTnet.Internal;
using MQTTnet.Packets;
using System;
using System.IO.Pipelines;
using System.Security.Cryptography.X509Certificates;
using System.Threading;
using System.Threading.Tasks;
using MQTTnet.Internal;

namespace MQTTnet.AspNetCore
{
public sealed class MqttConnectionContext : IMqttChannelAdapter
{
readonly AsyncLock _writerLock = new AsyncLock();

PipeReader _input;
PipeWriter _output;

public MqttConnectionContext(MqttPacketFormatterAdapter packetFormatterAdapter, ConnectionContext connection)
{
PacketFormatterAdapter = packetFormatterAdapter ?? throw new ArgumentNullException(nameof(packetFormatterAdapter));
Expand Down Expand Up @@ -62,7 +62,7 @@ public string Endpoint
public X509Certificate2 ClientCertificate => Http?.HttpContext?.Connection?.ClientCertificate;

public ConnectionContext Connection { get; }

public MqttPacketFormatterAdapter PacketFormatterAdapter { get; }

public long BytesSent { get; set; }
Expand Down Expand Up @@ -169,19 +169,17 @@ public void ResetStatistics()

public async Task SendPacketAsync(MqttPacket packet, CancellationToken cancellationToken)
{
var formatter = PacketFormatterAdapter;

using (await _writerLock.EnterAsync(cancellationToken).ConfigureAwait(false))
{
var buffer = formatter.Encode(packet);
var msg = buffer.Join().AsMemory();
var output = _output;
var result = await output.WriteAsync(msg, cancellationToken).ConfigureAwait(false);
if (result.IsCompleted)
var buffer = PacketFormatterAdapter.Encode(packet);
await _output.WriteAsync(buffer.Packet, cancellationToken).ConfigureAwait(false);

if (buffer.Payload.Count > 0)
{
BytesSent += msg.Length;
await _output.WriteAsync(buffer.Payload, cancellationToken).ConfigureAwait(false);
}

BytesSent += buffer.Length;
PacketFormatterAdapter.Cleanup();
}
}
Expand Down

0 comments on commit a2127c6

Please sign in to comment.