diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpMessageConverter.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpMessageConverter.cs index 7a2bb39a862fd..30158a24d09ff 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpMessageConverter.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpMessageConverter.cs @@ -99,25 +99,32 @@ public virtual AmqpMessage BuildAmqpBatchFromMessages( batchEnvelope.MessageFormat = AmqpConstants.AmqpBatchedMessageFormat; } - if (firstMessage?.Properties.MessageId != null) + if ((firstMessage?.Sections & SectionFlag.Properties) > 0) { - batchEnvelope.Properties.MessageId = firstMessage.Properties.MessageId; - } - if (firstMessage?.Properties.GroupId != null) - { - batchEnvelope.Properties.GroupId = firstMessage.Properties.GroupId; - } + if (firstMessage?.Properties.MessageId != null) + { + batchEnvelope.Properties.MessageId = firstMessage.Properties.MessageId; + } - if (firstMessage?.MessageAnnotations.Map[AmqpMessageConstants.PartitionKeyName] != null) - { - batchEnvelope.MessageAnnotations.Map[AmqpMessageConstants.PartitionKeyName] = - firstMessage.MessageAnnotations.Map[AmqpMessageConstants.PartitionKeyName]; + if (firstMessage?.Properties.GroupId != null) + { + batchEnvelope.Properties.GroupId = firstMessage.Properties.GroupId; + } } - if (firstMessage?.MessageAnnotations.Map[AmqpMessageConstants.ViaPartitionKeyName] != null) + if ((firstMessage?.Sections & SectionFlag.MessageAnnotations) > 0) { - batchEnvelope.MessageAnnotations.Map[AmqpMessageConstants.ViaPartitionKeyName] = - firstMessage.MessageAnnotations.Map[AmqpMessageConstants.ViaPartitionKeyName]; + if (firstMessage?.MessageAnnotations.Map[AmqpMessageConstants.PartitionKeyName] != null) + { + batchEnvelope.MessageAnnotations.Map[AmqpMessageConstants.PartitionKeyName] = + firstMessage.MessageAnnotations.Map[AmqpMessageConstants.PartitionKeyName]; + } + + if (firstMessage?.MessageAnnotations.Map[AmqpMessageConstants.ViaPartitionKeyName] != null) + { + batchEnvelope.MessageAnnotations.Map[AmqpMessageConstants.ViaPartitionKeyName] = + firstMessage.MessageAnnotations.Map[AmqpMessageConstants.ViaPartitionKeyName]; + } } batchEnvelope.Batchable = true; @@ -172,12 +179,13 @@ public virtual AmqpMessage SBMessageToAmqpMessage(ServiceBusMessage sbMessage) var message = AmqpAnnotatedMessageConverter.ToAmqpMessage(annotatedMessage); // Reset the resultant TTL as there is special handling for Service Bus - message.Header.Ttl = null; + if ((message.Sections & SectionFlag.Header) > 0) + message.Header.Ttl = null; // If TTL is set, it is used to calculate AbsoluteExpiryTime and CreationTime TimeSpan ttl = annotatedMessage.GetTimeToLive(); - if (ttl != TimeSpan.MaxValue) - { + if (ttl != TimeSpan.MaxValue) + { message.Header.Ttl = (uint)ttl.TotalMilliseconds; message.Properties.CreationTime = DateTime.UtcNow; @@ -189,9 +197,9 @@ public virtual AmqpMessage SBMessageToAmqpMessage(ServiceBusMessage sbMessage) { message.Properties.AbsoluteExpiryTime = AmqpConstants.MaxAbsoluteExpiryTime; } - } - else - { + } + else + { if (annotatedMessage.Properties.CreationTime.HasValue) { message.Properties.CreationTime = annotatedMessage.Properties.CreationTime.Value.UtcDateTime; @@ -200,9 +208,9 @@ public virtual AmqpMessage SBMessageToAmqpMessage(ServiceBusMessage sbMessage) { message.Properties.AbsoluteExpiryTime = annotatedMessage.Properties.AbsoluteExpiryTime.Value.UtcDateTime; } - } + } - return message; + return message; } public virtual ServiceBusReceivedMessage AmqpMessageToSBReceivedMessage(AmqpMessage amqpMessage, bool isPeeked = false) @@ -214,6 +222,15 @@ public virtual ServiceBusReceivedMessage AmqpMessageToSBReceivedMessage(AmqpMess annotatedMessage.Header.DeliveryCount = isPeeked ? annotatedMessage.Header.DeliveryCount : annotatedMessage.Header.DeliveryCount + 1; } + if (annotatedMessage.HasSection(AmqpMessageSection.MessageAnnotations) && + annotatedMessage.MessageAnnotations.TryGetValue(AmqpMessageConstants.LockedUntilName, out object lockedUntil)) + { + if (lockedUntil is DateTime lockedUntilDateTime && lockedUntilDateTime >= DateTimeOffset.MaxValue.UtcDateTime) + { + annotatedMessage.MessageAnnotations[AmqpMessageConstants.LockedUntilName] = DateTimeOffset.MaxValue.UtcDateTime; + } + } + ServiceBusReceivedMessage sbMessage = new ServiceBusReceivedMessage(annotatedMessage); if (GuidUtilities.TryParseGuidBytes(amqpMessage.DeliveryTag, out Guid lockToken)) { @@ -299,95 +316,5 @@ internal static bool TryGetAmqpObjectFromNetObject(object netObject, MappingType return amqpObject != null; } - - private static bool TryGetNetObjectFromAmqpObject(object amqpObject, MappingType mappingType, out object netObject) - { - netObject = null; - if (amqpObject == null) - { - return true; - } - - switch (SerializationUtilities.GetTypeId(amqpObject)) - { - case PropertyValueType.Byte: - case PropertyValueType.SByte: - case PropertyValueType.Int16: - case PropertyValueType.Int32: - case PropertyValueType.Int64: - case PropertyValueType.UInt16: - case PropertyValueType.UInt32: - case PropertyValueType.UInt64: - case PropertyValueType.Single: - case PropertyValueType.Double: - case PropertyValueType.Boolean: - case PropertyValueType.Decimal: - case PropertyValueType.Char: - case PropertyValueType.Guid: - case PropertyValueType.DateTime: - case PropertyValueType.String: - netObject = amqpObject; - break; - case PropertyValueType.Unknown: - if (amqpObject is AmqpSymbol amqpObjectAsAmqpSymbol) - { - netObject = (amqpObjectAsAmqpSymbol).Value; - } - else if (amqpObject is ArraySegment amqpObjectAsArraySegment) - { - ArraySegment binValue = amqpObjectAsArraySegment; - if (binValue.Count == binValue.Array.Length) - { - netObject = binValue.Array; - } - else - { - var buffer = new byte[binValue.Count]; - Buffer.BlockCopy(binValue.Array, binValue.Offset, buffer, 0, binValue.Count); - netObject = buffer; - } - } - else if (amqpObject is DescribedType amqpObjectAsDescribedType) - { - if (amqpObjectAsDescribedType.Descriptor is AmqpSymbol) - { - var amqpSymbol = (AmqpSymbol)amqpObjectAsDescribedType.Descriptor; - if (amqpSymbol.Equals((AmqpSymbol)AmqpMessageConstants.UriName)) - { - netObject = new Uri((string)amqpObjectAsDescribedType.Value); - } - else if (amqpSymbol.Equals((AmqpSymbol)AmqpMessageConstants.TimeSpanName)) - { - netObject = new TimeSpan((long)amqpObjectAsDescribedType.Value); - } - else if (amqpSymbol.Equals((AmqpSymbol)AmqpMessageConstants.DateTimeOffsetName)) - { - netObject = new DateTimeOffset(new DateTime((long)amqpObjectAsDescribedType.Value, DateTimeKind.Utc)); - } - } - } - else if (mappingType == MappingType.ApplicationProperty) - { - throw new SerializationException(Resources.FailedToSerializeUnsupportedType.FormatForUser(amqpObject.GetType().FullName)); - } - else if (amqpObject is AmqpMap map) - { - var dictionary = new Dictionary(); - foreach (var pair in map) - { - dictionary.Add(pair.Key.ToString(), pair.Value); - } - - netObject = dictionary; - } - else - { - netObject = amqpObject; - } - break; - } - - return netObject != null; - } } }