Skip to content

Commit

Permalink
Update arg validation (Azure#16885)
Browse files Browse the repository at this point in the history
* Update arg validation

* Update validation for AMQP models

* API updates

* Perf enhancements

* PR fb
  • Loading branch information
JoshLove-msft authored Nov 12, 2020
1 parent 0a025d3 commit ed8416f
Show file tree
Hide file tree
Showing 23 changed files with 199 additions and 86 deletions.
1 change: 1 addition & 0 deletions sdk/core/Azure.Core.Amqp/src/AmqpAddress.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ public struct AmqpAddress : IEquatable<AmqpAddress>
/// <param name="address">The address.</param>
public AmqpAddress(string address)
{
Argument.AssertNotNull(address, nameof(address));
_address = address;
}

Expand Down
3 changes: 2 additions & 1 deletion sdk/core/Azure.Core.Amqp/src/AmqpMessageBody.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ public class AmqpMessageBody
/// <param name="data">The data sections.</param>
public AmqpMessageBody(IEnumerable<ReadOnlyMemory<byte>> data)
{
_data = data ?? Enumerable.Empty<ReadOnlyMemory<byte>>();
Argument.AssertNotNull(data, nameof(data));
_data = data;
BodyType = AmqpMessageBodyType.Data;
}

Expand Down
7 changes: 4 additions & 3 deletions sdk/core/Azure.Core.Amqp/src/AmqpMessageId.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ namespace Azure.Core.Amqp
/// </summary>
public struct AmqpMessageId : IEquatable<AmqpMessageId>
{
private readonly string? _messageIdString;
private readonly string _messageIdString;

/// <summary>
/// Initializes a new <see cref="AmqpMessageId"/> using the provided
Expand All @@ -22,6 +22,7 @@ public struct AmqpMessageId : IEquatable<AmqpMessageId>
/// <param name="messageId">The message Id.</param>
public AmqpMessageId(string messageId)
{
Argument.AssertNotNull(messageId, nameof(messageId));
_messageIdString = messageId;
}

Expand All @@ -30,7 +31,7 @@ public AmqpMessageId(string messageId)
/// </summary>
///
/// <returns>A <see cref="string"/> from the value of this instance.</returns>
public override string ToString() => _messageIdString!;
public override string ToString() => _messageIdString;

/// <summary>
/// Determines whether the provided object is equal to the current object.
Expand Down Expand Up @@ -69,7 +70,7 @@ public override int GetHashCode()
/// <see cref="AmqpMessageId"/>; otherwise, <see langword="false" />.
/// </returns>
public bool Equals(AmqpMessageId other) =>
other.Equals(_messageIdString!);
other.Equals(_messageIdString);

/// <summary>
/// Determines whether the provided <see cref="string"/> is equal to the current instance.
Expand Down
9 changes: 9 additions & 0 deletions sdk/core/Azure.Core.Amqp/tests/AmqpAddressTests.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

using System;
using NUnit.Framework;

namespace Azure.Core.Amqp.Tests
Expand All @@ -17,5 +18,13 @@ public void CanCreateFromString()
Assert.True(address.Equals((object)new AmqpAddress("address")));
Assert.False(address.Equals(new AmqpMessageId("messageId2")));
}

[Test]
public void CannotCreateFromNullAddress()
{
Assert.That(
() => new AmqpAddress(null),
Throws.InstanceOf<ArgumentNullException>());
}
}
}
12 changes: 7 additions & 5 deletions sdk/core/Azure.Core.Amqp/tests/AmqpDataMessageBodyTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
// Licensed under the MIT License.

using System;
using System.Collections.Generic;
using NUnit.Framework;

namespace Azure.Core.Amqp.Tests
Expand All @@ -16,11 +15,14 @@ public void CanCreateDataBody()
Assert.AreEqual(AmqpMessageBodyType.Data, body.BodyType);
Assert.IsTrue(body.TryGetData(out var data));
Assert.NotNull(data);
}

body = new AmqpMessageBody(null);
Assert.AreEqual(AmqpMessageBodyType.Data, body.BodyType);
Assert.IsTrue(body.TryGetData(out data));
Assert.NotNull(data);
[Test]
public void CannotCreateFromNullBody()
{
Assert.That(
() => new AmqpMessageBody(null),
Throws.InstanceOf<ArgumentNullException>());
}
}
}
8 changes: 8 additions & 0 deletions sdk/core/Azure.Core.Amqp/tests/AmqpMessageIdTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,5 +27,13 @@ public void CanCreateFromString()
Assert.False(messageId.Equals(Guid.NewGuid()));
Assert.False(messageId.Equals((object)"messageId"));
}

[Test]
public void CannotCreateFromNullMessageId()
{
Assert.That(
() => new AmqpMessageId(null),
Throws.InstanceOf<ArgumentNullException>());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,7 @@ protected ServiceBusSender() { }
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
public override int GetHashCode() { throw null; }
public virtual System.Threading.Tasks.Task<long> ScheduleMessageAsync(Azure.Messaging.ServiceBus.ServiceBusMessage message, System.DateTimeOffset scheduledEnqueueTime, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public virtual System.Threading.Tasks.Task<long[]> ScheduleMessagesAsync(System.Collections.Generic.IEnumerable<Azure.Messaging.ServiceBus.ServiceBusMessage> messages, System.DateTimeOffset scheduledEnqueueTime, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public virtual System.Threading.Tasks.Task<System.Collections.Generic.IReadOnlyList<long>> ScheduleMessagesAsync(System.Collections.Generic.IEnumerable<Azure.Messaging.ServiceBus.ServiceBusMessage> messages, System.DateTimeOffset scheduledEnqueueTime, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public virtual System.Threading.Tasks.Task SendMessageAsync(Azure.Messaging.ServiceBus.ServiceBusMessage message, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public virtual System.Threading.Tasks.Task SendMessagesAsync(Azure.Messaging.ServiceBus.ServiceBusMessageBatch messageBatch, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public virtual System.Threading.Tasks.Task SendMessagesAsync(System.Collections.Generic.IEnumerable<Azure.Messaging.ServiceBus.ServiceBusMessage> messages, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
Expand Down Expand Up @@ -404,7 +404,7 @@ public ServiceBusSessionProcessorOptions() { }
public int MaxConcurrentSessions { get { throw null; } set { } }
public int PrefetchCount { get { throw null; } set { } }
public Azure.Messaging.ServiceBus.ReceiveMode ReceiveMode { get { throw null; } set { } }
public string[] SessionIds { get { throw null; } set { } }
public System.Collections.Generic.IList<string> SessionIds { get { throw null; } }
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
public override bool Equals(object obj) { throw null; }
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1159,7 +1159,7 @@ internal async Task SetStateInternal(
/// Throws if the messages have not been deferred.</returns>
/// <seealso cref="DeferAsync"/>
public override async Task<IReadOnlyList<ServiceBusReceivedMessage>> ReceiveDeferredMessagesAsync(
IList<long> sequenceNumbers,
IReadOnlyList<long> sequenceNumbers,
CancellationToken cancellationToken = default)
{
IReadOnlyList<ServiceBusReceivedMessage> messages = null;
Expand All @@ -1173,7 +1173,7 @@ await _retryPolicy.RunOperation(
}

internal virtual async Task<IReadOnlyList<ServiceBusReceivedMessage>> ReceiveDeferredMessagesAsyncInternal(
long[] sequenceNumbers,
IReadOnlyList<long> sequenceNumbers,
TimeSpan timeout)
{
var messages = new List<ServiceBusReceivedMessage>();
Expand Down
12 changes: 6 additions & 6 deletions sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpSender.cs
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ internal virtual async Task SendBatchInternalAsync(
/// <param name="messages">The list of messages to send.</param>
/// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request to cancel the operation.</param>
public override async Task SendAsync(
IList<ServiceBusMessage> messages,
IReadOnlyList<ServiceBusMessage> messages,
CancellationToken cancellationToken)
{
AmqpMessage messageFactory() => AmqpMessageConverter.BatchSBMessagesAsAmqpMessage(messages);
Expand Down Expand Up @@ -374,8 +374,8 @@ private void OnManagementLinkClosed(object managementLink, EventArgs e) =>
/// <param name="messages"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public override async Task<long[]> ScheduleMessagesAsync(
IList<ServiceBusMessage> messages,
public override async Task<IReadOnlyList<long>> ScheduleMessagesAsync(
IReadOnlyList<ServiceBusMessage> messages,
CancellationToken cancellationToken = default)
{
long[] seqNumbers = null;
Expand All @@ -399,7 +399,7 @@ await _retryPolicy.RunOperation(async (timeout) =>
/// <param name="cancellationToken"></param>
/// <returns></returns>
internal async Task<long[]> ScheduleMessageInternalAsync(
IList<ServiceBusMessage> messages,
IReadOnlyList<ServiceBusMessage> messages,
TimeSpan timeout,
CancellationToken cancellationToken = default)
{
Expand Down Expand Up @@ -492,7 +492,7 @@ internal async Task<long[]> ScheduleMessageInternalAsync(
/// <param name="cancellationToken"></param>
/// <returns></returns>
public override async Task CancelScheduledMessagesAsync(
long[] sequenceNumbers,
IReadOnlyList<long> sequenceNumbers,
CancellationToken cancellationToken = default)
{
Task cancelMessageTask = _retryPolicy.RunOperation(async (timeout) =>
Expand All @@ -515,7 +515,7 @@ await CancelScheduledMessageInternalAsync(
/// <param name="cancellationToken"></param>
/// <returns></returns>
internal async Task CancelScheduledMessageInternalAsync(
long[] sequenceNumbers,
IReadOnlyList<long> sequenceNumbers,
TimeSpan timeout,
CancellationToken cancellationToken = default)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ public abstract Task DeadLetterAsync(
/// Throws if the messages have not been deferred.</returns>
/// <seealso cref="DeferAsync"/>
public abstract Task<IReadOnlyList<ServiceBusReceivedMessage>> ReceiveDeferredMessagesAsync(
IList<long> sequenceNumbers,
IReadOnlyList<long> sequenceNumbers,
CancellationToken cancellationToken = default);

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public abstract ValueTask<TransportMessageBatch> CreateMessageBatchAsync(
/// <param name="messages">The list of messages to send.</param>
/// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request to cancel the operation.</param>
public abstract Task SendAsync(
IList<ServiceBusMessage> messages,
IReadOnlyList<ServiceBusMessage> messages,
CancellationToken cancellationToken);

/// <summary>
Expand All @@ -79,8 +79,8 @@ public abstract Task SendBatchAsync(
/// <param name="messages"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public abstract Task<long[]> ScheduleMessagesAsync(
IList<ServiceBusMessage> messages,
public abstract Task<IReadOnlyList<long>> ScheduleMessagesAsync(
IReadOnlyList<ServiceBusMessage> messages,
CancellationToken cancellationToken = default);

/// <summary>
Expand All @@ -90,7 +90,7 @@ public abstract Task<long[]> ScheduleMessagesAsync(
/// <param name="cancellationToken"></param>
/// <returns></returns>
public abstract Task CancelScheduledMessagesAsync(
long[] sequenceNumbers,
IReadOnlyList<long> sequenceNumbers,
CancellationToken cancellationToken = default);

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ public void ReceiveDeferredMessageStartCore(string identifier, int messageCount,
}

[NonEvent]
public virtual void ReceiveDeferredMessageStart(string identifier, IList<long> sequenceNumbers)
public virtual void ReceiveDeferredMessageStart(string identifier, IReadOnlyList<long> sequenceNumbers)
{
if (IsEnabled())
{
Expand Down Expand Up @@ -371,12 +371,12 @@ public virtual void ScheduleMessagesException(string identifier, string exceptio
}

[NonEvent]
public virtual void CancelScheduledMessagesStart(string identifier, long[] sequenceNumbers)
public virtual void CancelScheduledMessagesStart(string identifier, IReadOnlyList<long> sequenceNumbers)
{
if (IsEnabled())
{
var formattedSequenceNumbers = StringUtility.GetFormattedSequenceNumbers(sequenceNumbers);
CancelScheduledMessagesStartCore(identifier, sequenceNumbers.Length, formattedSequenceNumbers);
CancelScheduledMessagesStartCore(identifier, sequenceNumbers.Count, formattedSequenceNumbers);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,21 @@ public static ServiceBusReceivedMessage ServiceBusReceivedMessage(
DateTimeOffset enqueuedTime = default)
{
var amqpMessage = new AmqpAnnotatedMessage(new AmqpMessageBody(new ReadOnlyMemory<byte>[] { body }));
amqpMessage.Properties.CorrelationId = new AmqpMessageId(correlationId);

if (correlationId != default)
{
amqpMessage.Properties.CorrelationId = new AmqpMessageId(correlationId);
}
amqpMessage.Properties.Subject = subject;
amqpMessage.Properties.To = new AmqpAddress(to);
if (to != default)
{
amqpMessage.Properties.To = new AmqpAddress(to);
}
amqpMessage.Properties.ContentType = contentType;
amqpMessage.Properties.ReplyTo = new AmqpAddress(replyTo);
if (replyTo != default)
{
amqpMessage.Properties.ReplyTo = new AmqpAddress(replyTo);
}
amqpMessage.MessageAnnotations[AmqpMessageConstants.ScheduledEnqueueTimeUtcName] = scheduledEnqueueTime.UtcDateTime;

if (messageId != default)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using System.Collections.Generic;
using System.ComponentModel;
using System.Diagnostics.CodeAnalysis;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Azure.Core.Amqp;
Expand Down Expand Up @@ -94,7 +95,7 @@ internal ServiceBusSessionProcessor(
true,
plugins,
options.ToProcessorOptions(),
options.SessionIds,
options.SessionIds.ToArray(),
options.MaxConcurrentSessions,
options.MaxConcurrentCallsPerSession);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// Licensed under the MIT License.

using System;
using System.Collections.Generic;
using System.ComponentModel;
using Azure.Core;

Expand Down Expand Up @@ -126,12 +127,12 @@ public int MaxConcurrentCallsPerSession
private int _maxConcurrentCallsPerSessions = 1;

/// <summary>
/// Gets or sets an optional list of session IDs to scope
/// the <see cref="ServiceBusSessionProcessor"/> to. If left
/// blank, the processor will not be limited to any specific
/// Gets an optional list of session IDs to scope
/// the <see cref="ServiceBusSessionProcessor"/> to. If the list is
/// left empty, the processor will not be limited to any specific
/// session IDs.
/// </summary>
public string[] SessionIds { get; set; }
public IList<string> SessionIds { get; } = new List<string>();

/// <summary>
/// Determines whether the specified <see cref="System.Object" /> is equal to this instance.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,8 @@ private async Task<IReadOnlyList<ServiceBusReceivedMessage>> PeekMessagesInterna
CancellationToken cancellationToken)
{
Argument.AssertNotDisposed(IsClosed, nameof(ServiceBusReceiver));
Argument.AssertAtLeast(maxMessages, 1, nameof(maxMessages));

cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
Logger.PeekMessageStart(Identifier, sequenceNumber, maxMessages);
using DiagnosticScope scope = ScopeFactory.CreateScope(
Expand Down Expand Up @@ -898,11 +900,21 @@ public virtual async Task<IReadOnlyList<ServiceBusReceivedMessage>> ReceiveDefer
CancellationToken cancellationToken = default)
{
Argument.AssertNotDisposed(IsClosed, nameof(ServiceBusReceiver));
Argument.AssertNotNullOrEmpty(sequenceNumbers, nameof(sequenceNumbers));
Argument.AssertNotNull(sequenceNumbers, nameof(sequenceNumbers));
cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
var sequenceNumbersList = sequenceNumbers.ToList();

Logger.ReceiveDeferredMessageStart(Identifier, sequenceNumbersList);
IReadOnlyList<long> sequenceList = sequenceNumbers switch
{
IReadOnlyList<long> alreadyList => alreadyList,
_ => sequenceNumbers.ToList()
};

if (sequenceList.Count == 0)
{
return Array.Empty<ServiceBusReceivedMessage>();
}

Logger.ReceiveDeferredMessageStart(Identifier, sequenceList);
using DiagnosticScope scope = ScopeFactory.CreateScope(DiagnosticProperty.ReceiveDeferredActivityName);
scope.AddAttribute(
DiagnosticProperty.SequenceNumbersAttribute,
Expand All @@ -913,7 +925,7 @@ public virtual async Task<IReadOnlyList<ServiceBusReceivedMessage>> ReceiveDefer
try
{
deferredMessages = await InnerReceiver.ReceiveDeferredMessagesAsync(
sequenceNumbersList,
sequenceList,
cancellationToken).ConfigureAwait(false);
}
catch (Exception exception)
Expand Down
Loading

0 comments on commit ed8416f

Please sign in to comment.