Skip to content

Commit

Permalink
Merge branch 'main' into dbulfin/preview
Browse files Browse the repository at this point in the history
  • Loading branch information
dylanbulfinMS committed Mar 1, 2022
2 parents 52d5542 + ea504ae commit e861cb4
Show file tree
Hide file tree
Showing 8 changed files with 39 additions and 32 deletions.
2 changes: 1 addition & 1 deletion .github/CODEOWNERS
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
# Track1 .NET Azure IoT Hub and DPS SDKs

* @drwill-ms @timtay-microsoft @abhipsaMisra @azabbasi @jamdavi @andyk-ms @brycewang-microsoft
* @drwill-ms @timtay-microsoft @abhipsaMisra @azabbasi @andyk-ms @brycewang-microsoft
18 changes: 9 additions & 9 deletions iothub/device/src/DeviceClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ public void SetRetryPolicy(IRetryPolicy retryPolicy)
/// </summary>
/// <remarks>
/// You cannot reject or abandon messages over MQTT protocol.
/// For more details, see https://docs.microsoft.com/azure/iot-hub/iot-hub-devguide-messages-c2d#the-cloud-to-device-message-life-cycle.
/// For more details, see https://docs.microsoft.com/en-us/azure/iot-hub/iot-hub-devguide-messages-c2d#the-cloud-to-device-message-life-cycle.
/// </remarks>
/// <param name="cancellationToken">A cancellation token to cancel the operation.</param>
/// <exception cref="IotHubCommunicationException">Thrown when the operation has been canceled. The inner exception will be <see cref="OperationCanceledException"/>.</exception>
Expand All @@ -333,7 +333,7 @@ public void SetRetryPolicy(IRetryPolicy retryPolicy)
/// </summary>
/// <remarks>
/// You cannot reject or abandon messages over MQTT protocol.
/// For more details, see https://docs.microsoft.com/azure/iot-hub/iot-hub-devguide-messages-c2d#the-cloud-to-device-message-life-cycle.
/// For more details, see https://docs.microsoft.com/en-us/azure/iot-hub/iot-hub-devguide-messages-c2d#the-cloud-to-device-message-life-cycle.
/// </remarks>
/// <returns>The received message or null if there was no message until the specified time has elapsed.</returns>
public Task<Message> ReceiveAsync(TimeSpan timeout) => InternalClient.ReceiveAsync(timeout);
Expand Down Expand Up @@ -383,7 +383,7 @@ public Task SetReceiveMessageHandlerAsync(ReceiveMessageCallback messageHandler,
/// </summary>
/// <remarks>
/// You cannot abandon a message over MQTT protocol.
/// For more details, see https://docs.microsoft.com/azure/iot-hub/iot-hub-devguide-messages-c2d#the-cloud-to-device-message-life-cycle.
/// For more details, see https://docs.microsoft.com/en-us/azure/iot-hub/iot-hub-devguide-messages-c2d#the-cloud-to-device-message-life-cycle.
/// </remarks>
/// <param name="lockToken">The message lockToken.</param>
public Task AbandonAsync(string lockToken) => InternalClient.AbandonAsync(lockToken);
Expand All @@ -393,7 +393,7 @@ public Task SetReceiveMessageHandlerAsync(ReceiveMessageCallback messageHandler,
/// </summary>
/// <remarks>
/// You cannot abandon a message over MQTT protocol.
/// For more details, see https://docs.microsoft.com/azure/iot-hub/iot-hub-devguide-messages-c2d#the-cloud-to-device-message-life-cycle.
/// For more details, see https://docs.microsoft.com/en-us/azure/iot-hub/iot-hub-devguide-messages-c2d#the-cloud-to-device-message-life-cycle.
/// </remarks>
/// <param name="lockToken">The message lockToken.</param>
/// <param name="cancellationToken">A cancellation token to cancel the operation.</param>
Expand All @@ -405,7 +405,7 @@ public Task SetReceiveMessageHandlerAsync(ReceiveMessageCallback messageHandler,
/// </summary>
/// <remarks>
/// You cannot abandon a message over MQTT protocol.
/// For more details, see https://docs.microsoft.com/azure/iot-hub/iot-hub-devguide-messages-c2d#the-cloud-to-device-message-life-cycle.
/// For more details, see https://docs.microsoft.com/en-us/azure/iot-hub/iot-hub-devguide-messages-c2d#the-cloud-to-device-message-life-cycle.
/// </remarks>
/// <param name="message">The message to abandon.</param>
public Task AbandonAsync(Message message) => InternalClient.AbandonAsync(message);
Expand All @@ -415,7 +415,7 @@ public Task SetReceiveMessageHandlerAsync(ReceiveMessageCallback messageHandler,
/// </summary>
/// <remarks>
/// You cannot abandon a message over MQTT protocol.
/// For more details, see https://docs.microsoft.com/azure/iot-hub/iot-hub-devguide-messages-c2d#the-cloud-to-device-message-life-cycle.
/// For more details, see https://docs.microsoft.com/en-us/azure/iot-hub/iot-hub-devguide-messages-c2d#the-cloud-to-device-message-life-cycle.
/// </remarks>
/// <param name="message">The message to abandon.</param>
/// <param name="cancellationToken">A cancellation token to cancel the operation.</param>
Expand All @@ -427,7 +427,7 @@ public Task SetReceiveMessageHandlerAsync(ReceiveMessageCallback messageHandler,
/// </summary>
/// <remarks>
/// You cannot reject a message over MQTT protocol.
/// For more details, see https://docs.microsoft.com/azure/iot-hub/iot-hub-devguide-messages-c2d#the-cloud-to-device-message-life-cycle.
/// For more details, see https://docs.microsoft.com/en-us/azure/iot-hub/iot-hub-devguide-messages-c2d#the-cloud-to-device-message-life-cycle.
/// </remarks>
/// <param name="lockToken">The message lockToken.</param>
public Task RejectAsync(string lockToken) => InternalClient.RejectAsync(lockToken);
Expand All @@ -449,7 +449,7 @@ public Task SetReceiveMessageHandlerAsync(ReceiveMessageCallback messageHandler,
/// </summary>
/// <remarks>
/// You cannot reject a message over MQTT protocol.
/// For more details, see https://docs.microsoft.com/azure/iot-hub/iot-hub-devguide-messages-c2d#the-cloud-to-device-message-life-cycle.
/// For more details, see https://docs.microsoft.com/en-us/azure/iot-hub/iot-hub-devguide-messages-c2d#the-cloud-to-device-message-life-cycle.
/// </remarks>
/// <param name="message">The message.</param>
public Task RejectAsync(Message message) => InternalClient.RejectAsync(message);
Expand All @@ -459,7 +459,7 @@ public Task SetReceiveMessageHandlerAsync(ReceiveMessageCallback messageHandler,
/// </summary>
/// <remarks>
/// You cannot reject a message over MQTT protocol.
/// For more details, see https://docs.microsoft.com/azure/iot-hub/iot-hub-devguide-messages-c2d#the-cloud-to-device-message-life-cycle.
/// For more details, see https://docs.microsoft.com/en-us/azure/iot-hub/iot-hub-devguide-messages-c2d#the-cloud-to-device-message-life-cycle.
/// </remarks>
/// <param name="message">The message to reject.</param>
/// <param name="cancellationToken">A cancellation token to cancel the operation.</param>
Expand Down
3 changes: 2 additions & 1 deletion iothub/device/src/Transport/Amqp/AmqpConnectionHolder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@
using Microsoft.Azure.Devices.Client.Exceptions;
using Microsoft.Azure.Devices.Client.Extensions;
using Microsoft.Azure.Devices.Client.Transport.AmqpIot;
using Microsoft.Azure.Devices.Shared;
using System.Collections.Generic;
using System.Linq;

namespace Microsoft.Azure.Devices.Client.Transport.Amqp
{
Expand Down
2 changes: 1 addition & 1 deletion iothub/device/src/Transport/Amqp/AmqpTransportHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ internal class AmqpTransportHandler : TransportHandler
protected AmqpUnit _amqpUnit;
private readonly Action<TwinCollection> _onDesiredStatePatchListener;
private readonly object _lock = new object();
private readonly ConcurrentDictionary<string, TaskCompletionSource<AmqpMessage>> _twinResponseCompletions = new ConcurrentDictionary<string, TaskCompletionSource<AmqpMessage>>();
private readonly ConcurrentDictionary<string, TaskCompletionSource<Twin>> _twinResponseCompletions = new ConcurrentDictionary<string, TaskCompletionSource<Twin>>();
private bool _closed;

static AmqpTransportHandler()
Expand Down
36 changes: 20 additions & 16 deletions iothub/device/src/Transport/Mqtt/MqttTransportHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@
using Newtonsoft.Json;

#if NET5_0

using TaskCompletionSource = System.Threading.Tasks.TaskCompletionSource;

#else
using TaskCompletionSource = Microsoft.Azure.Devices.Shared.TaskCompletionSource;
#endif
Expand Down Expand Up @@ -112,7 +114,8 @@ internal sealed class MqttTransportHandler : TransportHandler, IMqttIotHubEventH
private readonly Func<IPAddress[], int, Task<IChannel>> _channelFactory;
private readonly Queue<string> _completionQueue;
private readonly MqttIotHubAdapterFactory _mqttIotHubAdapterFactory;
private readonly QualityOfService _qos;
private readonly QualityOfService _qosSendPacketToService;
private readonly QualityOfService _qosReceivePacketFromService;
private readonly bool _retainMessagesAcrossSessions;
private readonly object _syncRoot = new object();
private readonly RetryPolicy _closeRetryPolicy;
Expand Down Expand Up @@ -173,7 +176,8 @@ internal MqttTransportHandler(
_deviceboundMessageFilter = string.Format(CultureInfo.InvariantCulture, DeviceBoundMessagesTopicFilter, iotHubConnectionString.DeviceId);
_deviceboundMessagePrefix = string.Format(CultureInfo.InvariantCulture, DeviceBoundMessagesTopicPrefix, iotHubConnectionString.DeviceId);

_qos = settings.PublishToServerQoS;
_qosSendPacketToService = settings.PublishToServerQoS;
_qosReceivePacketFromService = settings.ReceivingQoS;

// If the CleanSession flag is set to false, C2D messages will be retained across device sessions, i.e. the device
// will receive the C2D messages that were sent to it while it was disconnected.
Expand Down Expand Up @@ -299,7 +303,7 @@ public override async Task<Message> ReceiveAsync(CancellationToken cancellationT
}

await WaitUntilC2dMessageArrivesAsync(cancellationToken).ConfigureAwait(false);
return ProcessMessage();
return ProcessC2dMessage();
}
finally
{
Expand Down Expand Up @@ -338,7 +342,7 @@ public override async Task<Message> ReceiveAsync(TimeoutHelper timeoutHelper)
using var cts = new CancellationTokenSource(timeout);

await WaitUntilC2dMessageArrivesAsync(cts.Token).ConfigureAwait(false);
return ProcessMessage();
return ProcessC2dMessage();
}
finally
{
Expand All @@ -347,20 +351,20 @@ public override async Task<Message> ReceiveAsync(TimeoutHelper timeoutHelper)
}
}

private Message ProcessMessage()
private Message ProcessC2dMessage()
{
Message message = null;

try
{
if (Logging.IsEnabled)
Logging.Enter(this, message, $"Will begin processing received C2D message, queue size={_messageQueue.Count}", nameof(ProcessMessage));
Logging.Enter(this, message, $"Will begin processing received C2D message, queue size={_messageQueue.Count}", nameof(ProcessC2dMessage));

lock (_syncRoot)
{
if (_messageQueue.TryDequeue(out message))
{
if (_qos == QualityOfService.AtLeastOnce)
if (_qosReceivePacketFromService == QualityOfService.AtLeastOnce)
{
_completionQueue.Enqueue(message.LockToken);
}
Expand All @@ -374,7 +378,7 @@ private Message ProcessMessage()
finally
{
if (Logging.IsEnabled)
Logging.Exit(this, message, $"Processed received C2D message with Id={message?.MessageId}", nameof(ProcessMessage));
Logging.Exit(this, message, $"Processed received C2D message with Id={message?.MessageId}", nameof(ProcessC2dMessage));
}
}

Expand All @@ -398,7 +402,7 @@ public override async Task CompleteAsync(string lockToken, CancellationToken can
cancellationToken.ThrowIfCancellationRequested();
EnsureValidState();

if (_qos == QualityOfService.AtMostOnce)
if (_qosReceivePacketFromService == QualityOfService.AtMostOnce)
{
throw new IotHubException("Complete is not allowed for QoS 0.", isTransient: false);
}
Expand Down Expand Up @@ -565,7 +569,7 @@ private async Task HandleIncomingMessagesAsync()
if (Logging.IsEnabled)
Logging.Enter(this, "Process C2D message via callback", nameof(HandleIncomingMessagesAsync));

Message message = ProcessMessage();
Message message = ProcessC2dMessage();

// We are intentionally not awaiting _deviceMessageReceivedListener callback.
// This is a user-supplied callback that isn't required to be awaited by us. We can simply invoke it and continue.
Expand Down Expand Up @@ -652,7 +656,7 @@ private async Task HandleIncomingEventMessageAsync(Message message)
// Add the endpoint as a SystemProperty
message.SystemProperties.Add(MessageSystemPropertyNames.InputName, inputName);

if (_qos == QualityOfService.AtLeastOnce)
if (_qosReceivePacketFromService == QualityOfService.AtLeastOnce)
{
lock (_syncRoot)
{
Expand Down Expand Up @@ -833,7 +837,7 @@ public override async Task EnableMethodsAsync(CancellationToken cancellationToke
// Codes_SRS_CSHARP_MQTT_TRANSPORT_18_001: `EnableMethodsAsync` shall subscribe using the '$iothub/methods/POST/' topic filter.
// Codes_SRS_CSHARP_MQTT_TRANSPORT_18_002: `EnableMethodsAsync` shall wait for a SUBACK for the subscription request.
// Codes_SRS_CSHARP_MQTT_TRANSPORT_18_003: `EnableMethodsAsync` shall return failure if the subscription request fails.
await _channel.WriteAsync(new SubscribePacket(0, new SubscriptionRequest(MethodPostTopicFilter, QualityOfService.AtMostOnce))).ConfigureAwait(true);
await _channel.WriteAsync(new SubscribePacket(0, new SubscriptionRequest(MethodPostTopicFilter, _qosReceivePacketFromService))).ConfigureAwait(true);
}

public override async Task DisableMethodsAsync(CancellationToken cancellationToken)
Expand All @@ -855,7 +859,7 @@ public override async Task EnableEventReceiveAsync(bool isAnEdgeModule, Cancella
// Codes_SRS_CSHARP_MQTT_TRANSPORT_33_021: `EnableEventReceiveAsync` shall subscribe using the 'devices/{0}/modules/{1}/' topic filter.
// Codes_SRS_CSHARP_MQTT_TRANSPORT_33_022: `EnableEventReceiveAsync` shall wait for a SUBACK for the subscription request.
// Codes_SRS_CSHARP_MQTT_TRANSPORT_33_023: `EnableEventReceiveAsync` shall return failure if the subscription request fails.
await _channel.WriteAsync(new SubscribePacket(0, new SubscriptionRequest(_receiveEventMessageFilter, _qos))).ConfigureAwait(true);
await _channel.WriteAsync(new SubscribePacket(0, new SubscriptionRequest(_receiveEventMessageFilter, _qosReceivePacketFromService))).ConfigureAwait(true);
}

public override async Task DisableEventReceiveAsync(bool isAnEdgeModule, CancellationToken cancellationToken)
Expand Down Expand Up @@ -897,7 +901,7 @@ public override async Task EnableTwinPatchAsync(CancellationToken cancellationTo
// Codes_SRS_CSHARP_MQTT_TRANSPORT_18_010: `EnableTwinPatchAsync` shall subscribe using the '$iothub/twin/PATCH/properties/desired/#' topic filter.
// Codes_SRS_CSHARP_MQTT_TRANSPORT_18_011: `EnableTwinPatchAsync` shall wait for a SUBACK on the subscription request.
// Codes_SRS_CSHARP_MQTT_TRANSPORT_18_012: `EnableTwinPatchAsync` shall return failure if the subscription request fails.
await _channel.WriteAsync(new SubscribePacket(0, new SubscriptionRequest(TwinPatchTopicFilter, QualityOfService.AtMostOnce))).ConfigureAwait(true);
await _channel.WriteAsync(new SubscribePacket(0, new SubscriptionRequest(TwinPatchTopicFilter, _qosReceivePacketFromService))).ConfigureAwait(true);

if (Logging.IsEnabled)
Logging.Exit(this, cancellationToken, nameof(EnableTwinPatchAsync));
Expand Down Expand Up @@ -1076,7 +1080,7 @@ private async Task SubscribeCloudToDeviceMessagesAsync()
if (TryStateTransition(TransportState.Open, TransportState.Subscribing))
{
await _channel
.WriteAsync(new SubscribePacket(0, new SubscriptionRequest(_deviceboundMessageFilter, QualityOfService.AtLeastOnce)))
.WriteAsync(new SubscribePacket(0, new SubscriptionRequest(_deviceboundMessageFilter, _qosReceivePacketFromService)))
.ConfigureAwait(true);

if (TryStateTransition(TransportState.Subscribing, TransportState.Receiving)
Expand All @@ -1095,7 +1099,7 @@ private Task SubscribeTwinResponsesAsync()
0,
new SubscriptionRequest(
TwinResponseTopicFilter,
QualityOfService.AtMostOnce)));
_qosReceivePacketFromService)));
}

private bool ParseResponseTopic(string topicName, out int status)
Expand Down
3 changes: 2 additions & 1 deletion iothub/service/src/JobProperties.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ namespace Microsoft.Azure.Devices
{
/// <summary>
/// Contains properties of a Job.
/// See online <a href="https://docs.microsoft.com/rest/api/iothub/service/createimportexportjob">documentation</a> for more infomration.
/// See online <a href="https://docs.microsoft.com/en-us/rest/api/iothub/service/createimportexportjob">documentation</a>
/// for more infomration.
/// </summary>
public class JobProperties
{
Expand Down
2 changes: 1 addition & 1 deletion iothub/service/src/Microsoft.Azure.Devices.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,6 @@
<PackageReference Include="System.Diagnostics.TraceSource" Version="4.3.0" />
<PackageReference Include="System.Diagnostics.Contracts" Version="4.3.0" />
<PackageReference Include="Microsoft.Rest.ClientRuntime" Version="2.3.21" />
<PackageReference Include="Azure.Core" Version="1.9.0" />
<PackageReference Include="Azure.Core" Version="1.22.0" />
</ItemGroup>
</Project>
Loading

0 comments on commit e861cb4

Please sign in to comment.