Skip to content

Commit

Permalink
Fix issue with Max DateTime (#34430)
Browse files Browse the repository at this point in the history
  • Loading branch information
JoshLove-msft authored Feb 21, 2023
1 parent b5a0180 commit 5a0d69b
Showing 1 changed file with 39 additions and 112 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -99,25 +99,32 @@ public virtual AmqpMessage BuildAmqpBatchFromMessages(
batchEnvelope.MessageFormat = AmqpConstants.AmqpBatchedMessageFormat;
}

if (firstMessage?.Properties.MessageId != null)
if ((firstMessage?.Sections & SectionFlag.Properties) > 0)
{
batchEnvelope.Properties.MessageId = firstMessage.Properties.MessageId;
}
if (firstMessage?.Properties.GroupId != null)
{
batchEnvelope.Properties.GroupId = firstMessage.Properties.GroupId;
}
if (firstMessage?.Properties.MessageId != null)
{
batchEnvelope.Properties.MessageId = firstMessage.Properties.MessageId;
}

if (firstMessage?.MessageAnnotations.Map[AmqpMessageConstants.PartitionKeyName] != null)
{
batchEnvelope.MessageAnnotations.Map[AmqpMessageConstants.PartitionKeyName] =
firstMessage.MessageAnnotations.Map[AmqpMessageConstants.PartitionKeyName];
if (firstMessage?.Properties.GroupId != null)
{
batchEnvelope.Properties.GroupId = firstMessage.Properties.GroupId;
}
}

if (firstMessage?.MessageAnnotations.Map[AmqpMessageConstants.ViaPartitionKeyName] != null)
if ((firstMessage?.Sections & SectionFlag.MessageAnnotations) > 0)
{
batchEnvelope.MessageAnnotations.Map[AmqpMessageConstants.ViaPartitionKeyName] =
firstMessage.MessageAnnotations.Map[AmqpMessageConstants.ViaPartitionKeyName];
if (firstMessage?.MessageAnnotations.Map[AmqpMessageConstants.PartitionKeyName] != null)
{
batchEnvelope.MessageAnnotations.Map[AmqpMessageConstants.PartitionKeyName] =
firstMessage.MessageAnnotations.Map[AmqpMessageConstants.PartitionKeyName];
}

if (firstMessage?.MessageAnnotations.Map[AmqpMessageConstants.ViaPartitionKeyName] != null)
{
batchEnvelope.MessageAnnotations.Map[AmqpMessageConstants.ViaPartitionKeyName] =
firstMessage.MessageAnnotations.Map[AmqpMessageConstants.ViaPartitionKeyName];
}
}

batchEnvelope.Batchable = true;
Expand Down Expand Up @@ -172,12 +179,13 @@ public virtual AmqpMessage SBMessageToAmqpMessage(ServiceBusMessage sbMessage)
var message = AmqpAnnotatedMessageConverter.ToAmqpMessage(annotatedMessage);

// Reset the resultant TTL as there is special handling for Service Bus
message.Header.Ttl = null;
if ((message.Sections & SectionFlag.Header) > 0)
message.Header.Ttl = null;

// If TTL is set, it is used to calculate AbsoluteExpiryTime and CreationTime
TimeSpan ttl = annotatedMessage.GetTimeToLive();
if (ttl != TimeSpan.MaxValue)
{
if (ttl != TimeSpan.MaxValue)
{
message.Header.Ttl = (uint)ttl.TotalMilliseconds;
message.Properties.CreationTime = DateTime.UtcNow;

Expand All @@ -189,9 +197,9 @@ public virtual AmqpMessage SBMessageToAmqpMessage(ServiceBusMessage sbMessage)
{
message.Properties.AbsoluteExpiryTime = AmqpConstants.MaxAbsoluteExpiryTime;
}
}
else
{
}
else
{
if (annotatedMessage.Properties.CreationTime.HasValue)
{
message.Properties.CreationTime = annotatedMessage.Properties.CreationTime.Value.UtcDateTime;
Expand All @@ -200,9 +208,9 @@ public virtual AmqpMessage SBMessageToAmqpMessage(ServiceBusMessage sbMessage)
{
message.Properties.AbsoluteExpiryTime = annotatedMessage.Properties.AbsoluteExpiryTime.Value.UtcDateTime;
}
}
}

return message;
return message;
}

public virtual ServiceBusReceivedMessage AmqpMessageToSBReceivedMessage(AmqpMessage amqpMessage, bool isPeeked = false)
Expand All @@ -214,6 +222,15 @@ public virtual ServiceBusReceivedMessage AmqpMessageToSBReceivedMessage(AmqpMess
annotatedMessage.Header.DeliveryCount = isPeeked ? annotatedMessage.Header.DeliveryCount : annotatedMessage.Header.DeliveryCount + 1;
}

if (annotatedMessage.HasSection(AmqpMessageSection.MessageAnnotations) &&
annotatedMessage.MessageAnnotations.TryGetValue(AmqpMessageConstants.LockedUntilName, out object lockedUntil))
{
if (lockedUntil is DateTime lockedUntilDateTime && lockedUntilDateTime >= DateTimeOffset.MaxValue.UtcDateTime)
{
annotatedMessage.MessageAnnotations[AmqpMessageConstants.LockedUntilName] = DateTimeOffset.MaxValue.UtcDateTime;
}
}

ServiceBusReceivedMessage sbMessage = new ServiceBusReceivedMessage(annotatedMessage);
if (GuidUtilities.TryParseGuidBytes(amqpMessage.DeliveryTag, out Guid lockToken))
{
Expand Down Expand Up @@ -299,95 +316,5 @@ internal static bool TryGetAmqpObjectFromNetObject(object netObject, MappingType

return amqpObject != null;
}

private static bool TryGetNetObjectFromAmqpObject(object amqpObject, MappingType mappingType, out object netObject)
{
netObject = null;
if (amqpObject == null)
{
return true;
}

switch (SerializationUtilities.GetTypeId(amqpObject))
{
case PropertyValueType.Byte:
case PropertyValueType.SByte:
case PropertyValueType.Int16:
case PropertyValueType.Int32:
case PropertyValueType.Int64:
case PropertyValueType.UInt16:
case PropertyValueType.UInt32:
case PropertyValueType.UInt64:
case PropertyValueType.Single:
case PropertyValueType.Double:
case PropertyValueType.Boolean:
case PropertyValueType.Decimal:
case PropertyValueType.Char:
case PropertyValueType.Guid:
case PropertyValueType.DateTime:
case PropertyValueType.String:
netObject = amqpObject;
break;
case PropertyValueType.Unknown:
if (amqpObject is AmqpSymbol amqpObjectAsAmqpSymbol)
{
netObject = (amqpObjectAsAmqpSymbol).Value;
}
else if (amqpObject is ArraySegment<byte> amqpObjectAsArraySegment)
{
ArraySegment<byte> binValue = amqpObjectAsArraySegment;
if (binValue.Count == binValue.Array.Length)
{
netObject = binValue.Array;
}
else
{
var buffer = new byte[binValue.Count];
Buffer.BlockCopy(binValue.Array, binValue.Offset, buffer, 0, binValue.Count);
netObject = buffer;
}
}
else if (amqpObject is DescribedType amqpObjectAsDescribedType)
{
if (amqpObjectAsDescribedType.Descriptor is AmqpSymbol)
{
var amqpSymbol = (AmqpSymbol)amqpObjectAsDescribedType.Descriptor;
if (amqpSymbol.Equals((AmqpSymbol)AmqpMessageConstants.UriName))
{
netObject = new Uri((string)amqpObjectAsDescribedType.Value);
}
else if (amqpSymbol.Equals((AmqpSymbol)AmqpMessageConstants.TimeSpanName))
{
netObject = new TimeSpan((long)amqpObjectAsDescribedType.Value);
}
else if (amqpSymbol.Equals((AmqpSymbol)AmqpMessageConstants.DateTimeOffsetName))
{
netObject = new DateTimeOffset(new DateTime((long)amqpObjectAsDescribedType.Value, DateTimeKind.Utc));
}
}
}
else if (mappingType == MappingType.ApplicationProperty)
{
throw new SerializationException(Resources.FailedToSerializeUnsupportedType.FormatForUser(amqpObject.GetType().FullName));
}
else if (amqpObject is AmqpMap map)
{
var dictionary = new Dictionary<string, object>();
foreach (var pair in map)
{
dictionary.Add(pair.Key.ToString(), pair.Value);
}

netObject = dictionary;
}
else
{
netObject = amqpObject;
}
break;
}

return netObject != null;
}
}
}

0 comments on commit 5a0d69b

Please sign in to comment.