Skip to content

Commit

Permalink
[Event Hubs] Flaky Test Fix (#25346)
Browse files Browse the repository at this point in the history
The focus of these changes is to stabilize some tests with non-deterministic
behavior that have recently shown to be flaky when run in CI pipelines.
  • Loading branch information
jsquire authored Nov 17, 2021
1 parent 7268b8a commit c19c810
Show file tree
Hide file tree
Showing 2 changed files with 111 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -151,8 +151,9 @@ public async Task BackgroundProcessingToleratesPartitionIdQueryFailure()
var mockProcessor = new Mock<EventProcessor<EventProcessorPartition>>(65, "consumerGroup", "namespace", "eventHub", Mock.Of<TokenCredential>(), default(EventProcessorOptions)) { CallBase = true };

mockConnection
.Setup(conn => conn.GetPartitionIdsAsync(It.IsAny<EventHubsRetryPolicy>(), It.IsAny<CancellationToken>()))
.Throws(expectedException);
.SetupSequence(conn => conn.GetPartitionIdsAsync(It.IsAny<EventHubsRetryPolicy>(), It.IsAny<CancellationToken>()))
.Throws(expectedException)
.ReturnsAsync(new[] { "0", "1" });

mockProcessor
.Setup(processor => processor.CreateConnection())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1813,16 +1813,27 @@ public async Task EnqueueEventsAsyncEnqueuesForAutomaticRouting()

try
{
await mockBufferedProducer.Object.EnqueueEventsAsync(events, cancellationSource.Token);
var enqueuedCount = await mockBufferedProducer.Object.EnqueueEventsAsync(events, cancellationSource.Token);
Assert.That(mockBufferedProducer.Object.ActivePartitionStateMap.TryGetValue(partitionId, out var partitionPublisher), Is.True, "A publisher should have been registered for the partition.");

Assert.That(enqueuedCount, Is.EqualTo(events.Length), "The return value should indicate that the correct number of events were enqueued.");
Assert.That(mockBufferedProducer.Object.TotalBufferedEventCount, Is.EqualTo(events.Length), "The total event count should indicate that the correct number of events were enqueued.");
Assert.That(mockBufferedProducer.Object.GetBufferedEventCount(partitionId), Is.EqualTo(events.Length), "The partition event count should indicate that the correct number of events were enqueued.");

var readEventCount = 0;

while (partitionPublisher.TryReadEvent(out var readEvent))
while (readEventCount < events.Length)
{
++readEventCount;
Assert.That(events.SingleOrDefault(item => item.EventBody.ToString() == readEvent.EventBody.ToString()), Is.Not.Null, $"The event with body: [{ readEvent.EventBody }] was not in the source.");
Assert.That(readEvent.GetRawAmqpMessage().GetPartitionKey(null), Is.Null, $"The partition key should not have been set for the event with body: [{ readEvent.EventBody }].");
if (partitionPublisher.TryReadEvent(out var readEvent))
{
++readEventCount;

Assert.That(events.SingleOrDefault(item => item.EventBody.ToString() == readEvent.EventBody.ToString()), Is.Not.Null, $"The event with body: [{ readEvent.EventBody }] was not in the source.");
Assert.That(readEvent.GetRawAmqpMessage().GetPartitionKey(null), Is.Null, $"The partition key should not have been set for the event with body: [{ readEvent.EventBody }].");
}

cancellationSource.Token.ThrowIfCancellationRequested();
await Task.Delay(50);
}

Assert.That(readEventCount, Is.EqualTo(events.Length), "The number of events read should match the source length.");
Expand Down Expand Up @@ -1881,17 +1892,27 @@ public async Task EnqueueEventsAsyncEnqueuesWithAPartitionKey()
try
{
var options = new EnqueueEventOptions { PartitionKey = partitionKey };
await mockBufferedProducer.Object.EnqueueEventsAsync(events, options, cancellationSource.Token);
var enqueuedCount = await mockBufferedProducer.Object.EnqueueEventsAsync(events, options, cancellationSource.Token);

Assert.That(mockBufferedProducer.Object.ActivePartitionStateMap.TryGetValue(partitionId, out var partitionPublisher), Is.True, "A publisher should have been registered for the partition.");
Assert.That(enqueuedCount, Is.EqualTo(events.Length), "The return value should indicate that the correct number of events were enqueued.");
Assert.That(mockBufferedProducer.Object.TotalBufferedEventCount, Is.EqualTo(events.Length), "The total event count should indicate that the correct number of events were enqueued.");
Assert.That(mockBufferedProducer.Object.GetBufferedEventCount(partitionId), Is.EqualTo(events.Length), "The partition event count should indicate that the correct number of events were enqueued.");

var readEventCount = 0;

while (partitionPublisher.TryReadEvent(out var readEvent))
while (readEventCount < events.Length)
{
++readEventCount;
Assert.That(events.SingleOrDefault(item => item.EventBody.ToString() == readEvent.EventBody.ToString()), Is.Not.Null, $"The event with body: [{ readEvent.EventBody }] was not in the source.");
Assert.That(readEvent.GetRawAmqpMessage().GetPartitionKey(null), Is.EqualTo(partitionKey), $"The partition key should have been preserved for the event with body: [{ readEvent.EventBody }].");
if (partitionPublisher.TryReadEvent(out var readEvent))
{
++readEventCount;

Assert.That(events.SingleOrDefault(item => item.EventBody.ToString() == readEvent.EventBody.ToString()), Is.Not.Null, $"The event with body: [{ readEvent.EventBody }] was not in the source.");
Assert.That(readEvent.GetRawAmqpMessage().GetPartitionKey(null), Is.EqualTo(partitionKey), $"The partition key should have been preserved for the event with body: [{ readEvent.EventBody }].");
}

cancellationSource.Token.ThrowIfCancellationRequested();
await Task.Delay(50);
}

Assert.That(readEventCount, Is.EqualTo(events.Length), "The number of events read should match the source length.");
Expand Down Expand Up @@ -1941,17 +1962,27 @@ public async Task EnqueueEventsAsyncEnqueuesWithAPartitionAssignment()
try
{
var options = new EnqueueEventOptions { PartitionId = partitionId };
await mockBufferedProducer.Object.EnqueueEventsAsync(events, options, cancellationSource.Token);
var enqueuedCount = await mockBufferedProducer.Object.EnqueueEventsAsync(events, options, cancellationSource.Token);

Assert.That(mockBufferedProducer.Object.ActivePartitionStateMap.TryGetValue(partitionId, out var partitionPublisher), Is.True, "A publisher should have been registered for the partition.");
Assert.That(enqueuedCount, Is.EqualTo(events.Length), "The return value should indicate that the correct number of events were enqueued.");
Assert.That(mockBufferedProducer.Object.TotalBufferedEventCount, Is.EqualTo(events.Length), "The total event count should indicate that the correct number of events were enqueued.");
Assert.That(mockBufferedProducer.Object.GetBufferedEventCount(partitionId), Is.EqualTo(events.Length), "The partition event count should indicate that the correct number of events were enqueued.");

var readEventCount = 0;

while (partitionPublisher.TryReadEvent(out var readEvent))
while (readEventCount < events.Length)
{
++readEventCount;
Assert.That(events.SingleOrDefault(item => item.EventBody.ToString() == readEvent.EventBody.ToString()), Is.Not.Null, $"The event with body: [{ readEvent.EventBody }] was not in the source.");
Assert.That(readEvent.GetRawAmqpMessage().GetPartitionKey(null), Is.Null, $"The partition key should not have been set for the event with body: [{ readEvent.EventBody }].");
if (partitionPublisher.TryReadEvent(out var readEvent))
{
++readEventCount;

Assert.That(events.SingleOrDefault(item => item.EventBody.ToString() == readEvent.EventBody.ToString()), Is.Not.Null, $"The event with body: [{ readEvent.EventBody }] was not in the source.");
Assert.That(readEvent.GetRawAmqpMessage().GetPartitionKey(null), Is.Null, $"The partition key should not have been set for the event with body: [{ readEvent.EventBody }].");
}

cancellationSource.Token.ThrowIfCancellationRequested();
await Task.Delay(50);
}

Assert.That(readEventCount, Is.EqualTo(events.Length), "The number of events read should match the source length.");
Expand Down Expand Up @@ -2555,16 +2586,27 @@ public async Task EnqueueEventAsyncEnqueuesForAutomaticRouting()

try
{
await mockBufferedProducer.Object.EnqueueEventAsync(expectedEvent, cancellationSource.Token);
var enqueuedCount = await mockBufferedProducer.Object.EnqueueEventAsync(expectedEvent, cancellationSource.Token);
Assert.That(mockBufferedProducer.Object.ActivePartitionStateMap.TryGetValue(partitionId, out var partitionPublisher), Is.True, "A publisher should have been registered for the partition.");

Assert.That(enqueuedCount, Is.EqualTo(1), "The return value should indicate that a single event was enqueued.");
Assert.That(mockBufferedProducer.Object.TotalBufferedEventCount, Is.EqualTo(1), "The total event count should indicate that a single event was enqueued.");
Assert.That(mockBufferedProducer.Object.GetBufferedEventCount(partitionId), Is.EqualTo(1), "The partition event count should indicate that a single event was enqueued.");

var readEventCount = 0;

while (partitionPublisher.TryReadEvent(out var readEvent))
while (readEventCount < 1)
{
++readEventCount;
Assert.That(expectedEvent.EventBody.ToString(), Is.EqualTo(readEvent.EventBody.ToString()), $"The event with body: [{ readEvent.EventBody }] was not enqueued.");
if (partitionPublisher.TryReadEvent(out var readEvent))
{
++readEventCount;

Assert.That(expectedEvent.EventBody.ToString(), Is.EqualTo(readEvent.EventBody.ToString()), $"The event with body: [{ readEvent.EventBody }] was not enqueued.");
Assert.That(readEvent.GetRawAmqpMessage().GetPartitionKey(null), Is.Null, $"The partition key should not have been set for the event with body: [{ readEvent.EventBody }].");
}

cancellationSource.Token.ThrowIfCancellationRequested();
await Task.Delay(50);
}

Assert.That(readEventCount, Is.EqualTo(1), "A single event should have been enqueued.");
Expand Down Expand Up @@ -2623,17 +2665,27 @@ public async Task EnqueueEventAsyncEnqueuesWithAPartitionKey()
try
{
var options = new EnqueueEventOptions { PartitionKey = partitionKey };
await mockBufferedProducer.Object.EnqueueEventAsync(expectedEvent, options, cancellationSource.Token);
var enqueuedCount = await mockBufferedProducer.Object.EnqueueEventAsync(expectedEvent, options, cancellationSource.Token);

Assert.That(mockBufferedProducer.Object.ActivePartitionStateMap.TryGetValue(partitionId, out var partitionPublisher), Is.True, "A publisher should have been registered for the partition.");
Assert.That(enqueuedCount, Is.EqualTo(1), "The return value should indicate that a single event was enqueued.");
Assert.That(mockBufferedProducer.Object.TotalBufferedEventCount, Is.EqualTo(1), "The total event count should indicate that a single event was enqueued.");
Assert.That(mockBufferedProducer.Object.GetBufferedEventCount(partitionId), Is.EqualTo(1), "The partition event count should indicate that a single event was enqueued.");

var readEventCount = 0;

while (partitionPublisher.TryReadEvent(out var readEvent))
while (readEventCount < 1)
{
++readEventCount;
Assert.That(expectedEvent.EventBody.ToString(), Is.EqualTo(readEvent.EventBody.ToString()), $"The event with body: [{ readEvent.EventBody }] was not enqueued.");
Assert.That(readEvent.GetRawAmqpMessage().GetPartitionKey(null), Is.EqualTo(partitionKey), $"The partition key should have been preserved for the event with body: [{ readEvent.EventBody }].");
if (partitionPublisher.TryReadEvent(out var readEvent))
{
++readEventCount;

Assert.That(expectedEvent.EventBody.ToString(), Is.EqualTo(readEvent.EventBody.ToString()), $"The event with body: [{ readEvent.EventBody }] was not enqueued.");
Assert.That(readEvent.GetRawAmqpMessage().GetPartitionKey(null), Is.EqualTo(partitionKey), $"The partition key should have been preserved for the event with body: [{ readEvent.EventBody }].");
}

cancellationSource.Token.ThrowIfCancellationRequested();
await Task.Delay(50);
}

Assert.That(readEventCount, Is.EqualTo(1), "A single event should have been enqueued.");
Expand Down Expand Up @@ -2683,17 +2735,27 @@ public async Task EnqueueEventAsyncEnqueuesWithAPartitionAssignment()
try
{
var options = new EnqueueEventOptions { PartitionId = partitionId };
await mockBufferedProducer.Object.EnqueueEventAsync(expectedEvent, options, cancellationSource.Token);
var enqueuedCount = await mockBufferedProducer.Object.EnqueueEventAsync(expectedEvent, options, cancellationSource.Token);

Assert.That(mockBufferedProducer.Object.ActivePartitionStateMap.TryGetValue(partitionId, out var partitionPublisher), Is.True, "A publisher should have been registered for the partition.");
Assert.That(enqueuedCount, Is.EqualTo(1), "The return value should indicate that a single event was enqueued.");
Assert.That(mockBufferedProducer.Object.TotalBufferedEventCount, Is.EqualTo(1), "The total event count should indicate that a single event was enqueued.");
Assert.That(mockBufferedProducer.Object.GetBufferedEventCount(partitionId), Is.EqualTo(1), "The partition event count should indicate that a single event was enqueued.");

var readEventCount = 0;

while (partitionPublisher.TryReadEvent(out var readEvent))
while (readEventCount < 1)
{
++readEventCount;
Assert.That(expectedEvent.EventBody.ToString(), Is.EqualTo(readEvent.EventBody.ToString()), $"The event with body: [{ readEvent.EventBody }] was not enqueued.");
Assert.That(readEvent.GetRawAmqpMessage().GetPartitionKey(null), Is.Null, $"The partition key should not have been set for the event with body: [{ readEvent.EventBody }].");
if (partitionPublisher.TryReadEvent(out var readEvent))
{
++readEventCount;

Assert.That(expectedEvent.EventBody.ToString(), Is.EqualTo(readEvent.EventBody.ToString()), $"The event with body: [{ readEvent.EventBody }] was not enqueued.");
Assert.That(readEvent.GetRawAmqpMessage().GetPartitionKey(null), Is.Null, $"The partition key should not have been set for the event with body: [{ readEvent.EventBody }].");
}

cancellationSource.Token.ThrowIfCancellationRequested();
await Task.Delay(50);
}

Assert.That(readEventCount, Is.EqualTo(1), "A single event should have been enqueued.");
Expand Down Expand Up @@ -2763,9 +2825,23 @@ public async Task EnqueueEventAsyncWaitsWhenFull()
Assert.That(partitionPublisher.TryReadEvent(out var readBlockerEvent), Is.True, "The blocking event should be available to read immediately.");
Assert.That(blockerEvent.EventBody.ToString(), Is.EqualTo(readBlockerEvent.EventBody.ToString()), $"The event with body: [{ readBlockerEvent.EventBody }] was not enqueued.");

Assert.That(partitionPublisher.TryReadEvent(out var readEvent), Is.True, "An event is expected to be available to read.");
Assert.That(expectedEvent.EventBody.ToString(), Is.EqualTo(readEvent.EventBody.ToString()), $"The event with body: [{ readEvent.EventBody }] was not enqueued.");
Assert.That(readEvent.GetRawAmqpMessage().GetPartitionKey(null), Is.Null, $"The partition key should not have been set for the event with body: [{ readEvent.EventBody }].");
var readEventCount = 0;

while (readEventCount < 1)
{
if (partitionPublisher.TryReadEvent(out var readEvent))
{
++readEventCount;

Assert.That(expectedEvent.EventBody.ToString(), Is.EqualTo(readEvent.EventBody.ToString()), $"The event with body: [{ readEvent.EventBody }] was not enqueued.");
Assert.That(readEvent.GetRawAmqpMessage().GetPartitionKey(null), Is.Null, $"The partition key should not have been set for the event with body: [{ readEvent.EventBody }].");
}

cancellationSource.Token.ThrowIfCancellationRequested();
await Task.Delay(50);
}

Assert.That(readEventCount, Is.EqualTo(1), "An event should have been available to read.");

await enqueueTask;
Assert.That(partitionPublisher.TryReadEvent(out _), Is.False, "Other than the blocker, a single event should have been enqueued.");
Expand Down

0 comments on commit c19c810

Please sign in to comment.