diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Primitives/EventProcessorTests.MainProcessingLoop.cs b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Primitives/EventProcessorTests.MainProcessingLoop.cs index 516bfb31f53ba..5b1a12fed5e5b 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Primitives/EventProcessorTests.MainProcessingLoop.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Primitives/EventProcessorTests.MainProcessingLoop.cs @@ -151,8 +151,9 @@ public async Task BackgroundProcessingToleratesPartitionIdQueryFailure() var mockProcessor = new Mock>(65, "consumerGroup", "namespace", "eventHub", Mock.Of(), default(EventProcessorOptions)) { CallBase = true }; mockConnection - .Setup(conn => conn.GetPartitionIdsAsync(It.IsAny(), It.IsAny())) - .Throws(expectedException); + .SetupSequence(conn => conn.GetPartitionIdsAsync(It.IsAny(), It.IsAny())) + .Throws(expectedException) + .ReturnsAsync(new[] { "0", "1" }); mockProcessor .Setup(processor => processor.CreateConnection()) diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Producer/EventHubBufferedProducerClientTests.cs b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Producer/EventHubBufferedProducerClientTests.cs index d9938d22aeba0..cb674d977e9a7 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Producer/EventHubBufferedProducerClientTests.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Producer/EventHubBufferedProducerClientTests.cs @@ -1813,16 +1813,27 @@ public async Task EnqueueEventsAsyncEnqueuesForAutomaticRouting() try { - await mockBufferedProducer.Object.EnqueueEventsAsync(events, cancellationSource.Token); + var enqueuedCount = await mockBufferedProducer.Object.EnqueueEventsAsync(events, cancellationSource.Token); Assert.That(mockBufferedProducer.Object.ActivePartitionStateMap.TryGetValue(partitionId, out var partitionPublisher), Is.True, "A publisher should have been registered for the partition."); + Assert.That(enqueuedCount, Is.EqualTo(events.Length), "The return value should indicate that the correct number of events were enqueued."); + Assert.That(mockBufferedProducer.Object.TotalBufferedEventCount, Is.EqualTo(events.Length), "The total event count should indicate that the correct number of events were enqueued."); + Assert.That(mockBufferedProducer.Object.GetBufferedEventCount(partitionId), Is.EqualTo(events.Length), "The partition event count should indicate that the correct number of events were enqueued."); + var readEventCount = 0; - while (partitionPublisher.TryReadEvent(out var readEvent)) + while (readEventCount < events.Length) { - ++readEventCount; - Assert.That(events.SingleOrDefault(item => item.EventBody.ToString() == readEvent.EventBody.ToString()), Is.Not.Null, $"The event with body: [{ readEvent.EventBody }] was not in the source."); - Assert.That(readEvent.GetRawAmqpMessage().GetPartitionKey(null), Is.Null, $"The partition key should not have been set for the event with body: [{ readEvent.EventBody }]."); + if (partitionPublisher.TryReadEvent(out var readEvent)) + { + ++readEventCount; + + Assert.That(events.SingleOrDefault(item => item.EventBody.ToString() == readEvent.EventBody.ToString()), Is.Not.Null, $"The event with body: [{ readEvent.EventBody }] was not in the source."); + Assert.That(readEvent.GetRawAmqpMessage().GetPartitionKey(null), Is.Null, $"The partition key should not have been set for the event with body: [{ readEvent.EventBody }]."); + } + + cancellationSource.Token.ThrowIfCancellationRequested(); + await Task.Delay(50); } Assert.That(readEventCount, Is.EqualTo(events.Length), "The number of events read should match the source length."); @@ -1881,17 +1892,27 @@ public async Task EnqueueEventsAsyncEnqueuesWithAPartitionKey() try { var options = new EnqueueEventOptions { PartitionKey = partitionKey }; - await mockBufferedProducer.Object.EnqueueEventsAsync(events, options, cancellationSource.Token); + var enqueuedCount = await mockBufferedProducer.Object.EnqueueEventsAsync(events, options, cancellationSource.Token); Assert.That(mockBufferedProducer.Object.ActivePartitionStateMap.TryGetValue(partitionId, out var partitionPublisher), Is.True, "A publisher should have been registered for the partition."); + Assert.That(enqueuedCount, Is.EqualTo(events.Length), "The return value should indicate that the correct number of events were enqueued."); + Assert.That(mockBufferedProducer.Object.TotalBufferedEventCount, Is.EqualTo(events.Length), "The total event count should indicate that the correct number of events were enqueued."); + Assert.That(mockBufferedProducer.Object.GetBufferedEventCount(partitionId), Is.EqualTo(events.Length), "The partition event count should indicate that the correct number of events were enqueued."); var readEventCount = 0; - while (partitionPublisher.TryReadEvent(out var readEvent)) + while (readEventCount < events.Length) { - ++readEventCount; - Assert.That(events.SingleOrDefault(item => item.EventBody.ToString() == readEvent.EventBody.ToString()), Is.Not.Null, $"The event with body: [{ readEvent.EventBody }] was not in the source."); - Assert.That(readEvent.GetRawAmqpMessage().GetPartitionKey(null), Is.EqualTo(partitionKey), $"The partition key should have been preserved for the event with body: [{ readEvent.EventBody }]."); + if (partitionPublisher.TryReadEvent(out var readEvent)) + { + ++readEventCount; + + Assert.That(events.SingleOrDefault(item => item.EventBody.ToString() == readEvent.EventBody.ToString()), Is.Not.Null, $"The event with body: [{ readEvent.EventBody }] was not in the source."); + Assert.That(readEvent.GetRawAmqpMessage().GetPartitionKey(null), Is.EqualTo(partitionKey), $"The partition key should have been preserved for the event with body: [{ readEvent.EventBody }]."); + } + + cancellationSource.Token.ThrowIfCancellationRequested(); + await Task.Delay(50); } Assert.That(readEventCount, Is.EqualTo(events.Length), "The number of events read should match the source length."); @@ -1941,17 +1962,27 @@ public async Task EnqueueEventsAsyncEnqueuesWithAPartitionAssignment() try { var options = new EnqueueEventOptions { PartitionId = partitionId }; - await mockBufferedProducer.Object.EnqueueEventsAsync(events, options, cancellationSource.Token); + var enqueuedCount = await mockBufferedProducer.Object.EnqueueEventsAsync(events, options, cancellationSource.Token); Assert.That(mockBufferedProducer.Object.ActivePartitionStateMap.TryGetValue(partitionId, out var partitionPublisher), Is.True, "A publisher should have been registered for the partition."); + Assert.That(enqueuedCount, Is.EqualTo(events.Length), "The return value should indicate that the correct number of events were enqueued."); + Assert.That(mockBufferedProducer.Object.TotalBufferedEventCount, Is.EqualTo(events.Length), "The total event count should indicate that the correct number of events were enqueued."); + Assert.That(mockBufferedProducer.Object.GetBufferedEventCount(partitionId), Is.EqualTo(events.Length), "The partition event count should indicate that the correct number of events were enqueued."); var readEventCount = 0; - while (partitionPublisher.TryReadEvent(out var readEvent)) + while (readEventCount < events.Length) { - ++readEventCount; - Assert.That(events.SingleOrDefault(item => item.EventBody.ToString() == readEvent.EventBody.ToString()), Is.Not.Null, $"The event with body: [{ readEvent.EventBody }] was not in the source."); - Assert.That(readEvent.GetRawAmqpMessage().GetPartitionKey(null), Is.Null, $"The partition key should not have been set for the event with body: [{ readEvent.EventBody }]."); + if (partitionPublisher.TryReadEvent(out var readEvent)) + { + ++readEventCount; + + Assert.That(events.SingleOrDefault(item => item.EventBody.ToString() == readEvent.EventBody.ToString()), Is.Not.Null, $"The event with body: [{ readEvent.EventBody }] was not in the source."); + Assert.That(readEvent.GetRawAmqpMessage().GetPartitionKey(null), Is.Null, $"The partition key should not have been set for the event with body: [{ readEvent.EventBody }]."); + } + + cancellationSource.Token.ThrowIfCancellationRequested(); + await Task.Delay(50); } Assert.That(readEventCount, Is.EqualTo(events.Length), "The number of events read should match the source length."); @@ -2555,16 +2586,27 @@ public async Task EnqueueEventAsyncEnqueuesForAutomaticRouting() try { - await mockBufferedProducer.Object.EnqueueEventAsync(expectedEvent, cancellationSource.Token); + var enqueuedCount = await mockBufferedProducer.Object.EnqueueEventAsync(expectedEvent, cancellationSource.Token); Assert.That(mockBufferedProducer.Object.ActivePartitionStateMap.TryGetValue(partitionId, out var partitionPublisher), Is.True, "A publisher should have been registered for the partition."); + Assert.That(enqueuedCount, Is.EqualTo(1), "The return value should indicate that a single event was enqueued."); + Assert.That(mockBufferedProducer.Object.TotalBufferedEventCount, Is.EqualTo(1), "The total event count should indicate that a single event was enqueued."); + Assert.That(mockBufferedProducer.Object.GetBufferedEventCount(partitionId), Is.EqualTo(1), "The partition event count should indicate that a single event was enqueued."); + var readEventCount = 0; - while (partitionPublisher.TryReadEvent(out var readEvent)) + while (readEventCount < 1) { - ++readEventCount; - Assert.That(expectedEvent.EventBody.ToString(), Is.EqualTo(readEvent.EventBody.ToString()), $"The event with body: [{ readEvent.EventBody }] was not enqueued."); + if (partitionPublisher.TryReadEvent(out var readEvent)) + { + ++readEventCount; + + Assert.That(expectedEvent.EventBody.ToString(), Is.EqualTo(readEvent.EventBody.ToString()), $"The event with body: [{ readEvent.EventBody }] was not enqueued."); Assert.That(readEvent.GetRawAmqpMessage().GetPartitionKey(null), Is.Null, $"The partition key should not have been set for the event with body: [{ readEvent.EventBody }]."); + } + + cancellationSource.Token.ThrowIfCancellationRequested(); + await Task.Delay(50); } Assert.That(readEventCount, Is.EqualTo(1), "A single event should have been enqueued."); @@ -2623,17 +2665,27 @@ public async Task EnqueueEventAsyncEnqueuesWithAPartitionKey() try { var options = new EnqueueEventOptions { PartitionKey = partitionKey }; - await mockBufferedProducer.Object.EnqueueEventAsync(expectedEvent, options, cancellationSource.Token); + var enqueuedCount = await mockBufferedProducer.Object.EnqueueEventAsync(expectedEvent, options, cancellationSource.Token); Assert.That(mockBufferedProducer.Object.ActivePartitionStateMap.TryGetValue(partitionId, out var partitionPublisher), Is.True, "A publisher should have been registered for the partition."); + Assert.That(enqueuedCount, Is.EqualTo(1), "The return value should indicate that a single event was enqueued."); + Assert.That(mockBufferedProducer.Object.TotalBufferedEventCount, Is.EqualTo(1), "The total event count should indicate that a single event was enqueued."); + Assert.That(mockBufferedProducer.Object.GetBufferedEventCount(partitionId), Is.EqualTo(1), "The partition event count should indicate that a single event was enqueued."); var readEventCount = 0; - while (partitionPublisher.TryReadEvent(out var readEvent)) + while (readEventCount < 1) { - ++readEventCount; - Assert.That(expectedEvent.EventBody.ToString(), Is.EqualTo(readEvent.EventBody.ToString()), $"The event with body: [{ readEvent.EventBody }] was not enqueued."); - Assert.That(readEvent.GetRawAmqpMessage().GetPartitionKey(null), Is.EqualTo(partitionKey), $"The partition key should have been preserved for the event with body: [{ readEvent.EventBody }]."); + if (partitionPublisher.TryReadEvent(out var readEvent)) + { + ++readEventCount; + + Assert.That(expectedEvent.EventBody.ToString(), Is.EqualTo(readEvent.EventBody.ToString()), $"The event with body: [{ readEvent.EventBody }] was not enqueued."); + Assert.That(readEvent.GetRawAmqpMessage().GetPartitionKey(null), Is.EqualTo(partitionKey), $"The partition key should have been preserved for the event with body: [{ readEvent.EventBody }]."); + } + + cancellationSource.Token.ThrowIfCancellationRequested(); + await Task.Delay(50); } Assert.That(readEventCount, Is.EqualTo(1), "A single event should have been enqueued."); @@ -2683,17 +2735,27 @@ public async Task EnqueueEventAsyncEnqueuesWithAPartitionAssignment() try { var options = new EnqueueEventOptions { PartitionId = partitionId }; - await mockBufferedProducer.Object.EnqueueEventAsync(expectedEvent, options, cancellationSource.Token); + var enqueuedCount = await mockBufferedProducer.Object.EnqueueEventAsync(expectedEvent, options, cancellationSource.Token); Assert.That(mockBufferedProducer.Object.ActivePartitionStateMap.TryGetValue(partitionId, out var partitionPublisher), Is.True, "A publisher should have been registered for the partition."); + Assert.That(enqueuedCount, Is.EqualTo(1), "The return value should indicate that a single event was enqueued."); + Assert.That(mockBufferedProducer.Object.TotalBufferedEventCount, Is.EqualTo(1), "The total event count should indicate that a single event was enqueued."); + Assert.That(mockBufferedProducer.Object.GetBufferedEventCount(partitionId), Is.EqualTo(1), "The partition event count should indicate that a single event was enqueued."); var readEventCount = 0; - while (partitionPublisher.TryReadEvent(out var readEvent)) + while (readEventCount < 1) { - ++readEventCount; - Assert.That(expectedEvent.EventBody.ToString(), Is.EqualTo(readEvent.EventBody.ToString()), $"The event with body: [{ readEvent.EventBody }] was not enqueued."); - Assert.That(readEvent.GetRawAmqpMessage().GetPartitionKey(null), Is.Null, $"The partition key should not have been set for the event with body: [{ readEvent.EventBody }]."); + if (partitionPublisher.TryReadEvent(out var readEvent)) + { + ++readEventCount; + + Assert.That(expectedEvent.EventBody.ToString(), Is.EqualTo(readEvent.EventBody.ToString()), $"The event with body: [{ readEvent.EventBody }] was not enqueued."); + Assert.That(readEvent.GetRawAmqpMessage().GetPartitionKey(null), Is.Null, $"The partition key should not have been set for the event with body: [{ readEvent.EventBody }]."); + } + + cancellationSource.Token.ThrowIfCancellationRequested(); + await Task.Delay(50); } Assert.That(readEventCount, Is.EqualTo(1), "A single event should have been enqueued."); @@ -2763,9 +2825,23 @@ public async Task EnqueueEventAsyncWaitsWhenFull() Assert.That(partitionPublisher.TryReadEvent(out var readBlockerEvent), Is.True, "The blocking event should be available to read immediately."); Assert.That(blockerEvent.EventBody.ToString(), Is.EqualTo(readBlockerEvent.EventBody.ToString()), $"The event with body: [{ readBlockerEvent.EventBody }] was not enqueued."); - Assert.That(partitionPublisher.TryReadEvent(out var readEvent), Is.True, "An event is expected to be available to read."); - Assert.That(expectedEvent.EventBody.ToString(), Is.EqualTo(readEvent.EventBody.ToString()), $"The event with body: [{ readEvent.EventBody }] was not enqueued."); - Assert.That(readEvent.GetRawAmqpMessage().GetPartitionKey(null), Is.Null, $"The partition key should not have been set for the event with body: [{ readEvent.EventBody }]."); + var readEventCount = 0; + + while (readEventCount < 1) + { + if (partitionPublisher.TryReadEvent(out var readEvent)) + { + ++readEventCount; + + Assert.That(expectedEvent.EventBody.ToString(), Is.EqualTo(readEvent.EventBody.ToString()), $"The event with body: [{ readEvent.EventBody }] was not enqueued."); + Assert.That(readEvent.GetRawAmqpMessage().GetPartitionKey(null), Is.Null, $"The partition key should not have been set for the event with body: [{ readEvent.EventBody }]."); + } + + cancellationSource.Token.ThrowIfCancellationRequested(); + await Task.Delay(50); + } + + Assert.That(readEventCount, Is.EqualTo(1), "An event should have been available to read."); await enqueueTask; Assert.That(partitionPublisher.TryReadEvent(out _), Is.False, "Other than the blocker, a single event should have been enqueued.");