From 4829e357ab2320715cea7ddf7ee7d7fc0cabb09a Mon Sep 17 00:00:00 2001 From: Serkant Karaca Date: Tue, 13 Oct 2020 18:59:06 -0700 Subject: [PATCH] Fixing regression at processor shutdown. (#15262) --- .../src/PartitionPump.cs | 34 +++---- .../tests/Infrastructure/TestUtility.cs | 12 +++ .../tests/Processor/ProcessorTests.cs | 89 +++++++++++++++++++ 3 files changed, 119 insertions(+), 16 deletions(-) diff --git a/sdk/eventhub/Microsoft.Azure.EventHubs.Processor/src/PartitionPump.cs b/sdk/eventhub/Microsoft.Azure.EventHubs.Processor/src/PartitionPump.cs index 76072d9de8e88..99e5aefe5b098 100644 --- a/sdk/eventhub/Microsoft.Azure.EventHubs.Processor/src/PartitionPump.cs +++ b/sdk/eventhub/Microsoft.Azure.EventHubs.Processor/src/PartitionPump.cs @@ -95,22 +95,6 @@ public async Task CloseAsync(CloseReason reason) ProcessorEventSource.Log.PartitionPumpCloseStart(this.Host.HostName, this.PartitionContext.PartitionId, reason.ToString()); this.PumpStatus = PartitionPumpStatus.Closing; - // Release lease as the first thing since closing receiver can take up to operation timeout. - // This helps other available hosts discover lease available sooner. - if (reason != CloseReason.LeaseLost) - { - // Since this pump is dead, release the lease. - try - { - await this.Host.LeaseManager.ReleaseLeaseAsync(this.PartitionContext.Lease).ConfigureAwait(false); - } - catch (Exception e) - { - // Log and ignore any failure since expired lease will be picked by another host. - this.Host.EventProcessorOptions.NotifyOfException(this.Host.HostName, this.PartitionContext.PartitionId, e, EventProcessorHostActionStrings.ReleasingLease); - } - } - try { this.cancellationTokenSource.Cancel(); @@ -137,6 +121,24 @@ public async Task CloseAsync(CloseReason reason) // Report the failure to the general error handler instead. this.Host.EventProcessorOptions.NotifyOfException(this.Host.HostName, this.PartitionContext.PartitionId, e, "Closing Event Processor"); } + finally + { + // Release the lease regardless of result from pump's close call above. + // Increase the chance of a healthy host grabbing the lease here. + if (reason != CloseReason.LeaseLost) + { + // Since this pump is dead, release the lease. + try + { + await this.Host.LeaseManager.ReleaseLeaseAsync(this.PartitionContext.Lease).ConfigureAwait(false); + } + catch (Exception e) + { + // Log and ignore any failure since expired lease will be picked by another host. + this.Host.EventProcessorOptions.NotifyOfException(this.Host.HostName, this.PartitionContext.PartitionId, e, EventProcessorHostActionStrings.ReleasingLease); + } + } + } this.PumpStatus = PartitionPumpStatus.Closed; ProcessorEventSource.Log.PartitionPumpCloseStop(this.Host.HostName, this.PartitionContext.PartitionId); diff --git a/sdk/eventhub/Microsoft.Azure.EventHubs/tests/Infrastructure/TestUtility.cs b/sdk/eventhub/Microsoft.Azure.EventHubs/tests/Infrastructure/TestUtility.cs index 98223d68b455a..0ba7ee34eb7b4 100644 --- a/sdk/eventhub/Microsoft.Azure.EventHubs/tests/Infrastructure/TestUtility.cs +++ b/sdk/eventhub/Microsoft.Azure.EventHubs/tests/Infrastructure/TestUtility.cs @@ -79,6 +79,18 @@ internal static class TestUtility internal static string GetEntityConnectionString(string entityName) => new EventHubsConnectionStringBuilder(EventHubsConnectionString) { EntityPath = entityName }.ToString(); + internal static async Task SendToEventhubAsync(EventHubClient ehClient, int numberOfMessages = 1) + { + TestUtility.Log($"Starting to send {numberOfMessages} messages."); + + for (int i = 0; i < numberOfMessages; i++) + { + await ehClient.SendAsync(new EventData(Encoding.UTF8.GetBytes("Hello Event Hubs!"))); + } + + TestUtility.Log("Sends done."); + } + internal static Task SendToPartitionAsync(EventHubClient ehClient, string partitionId, string messageBody, int numberOfMessages = 1) { return SendToPartitionAsync(ehClient, partitionId, new EventData(Encoding.UTF8.GetBytes(messageBody)), numberOfMessages); diff --git a/sdk/eventhub/Microsoft.Azure.EventHubs/tests/Processor/ProcessorTests.cs b/sdk/eventhub/Microsoft.Azure.EventHubs/tests/Processor/ProcessorTests.cs index 248d4bb9f92db..af27a099a34d4 100644 --- a/sdk/eventhub/Microsoft.Azure.EventHubs/tests/Processor/ProcessorTests.cs +++ b/sdk/eventhub/Microsoft.Azure.EventHubs/tests/Processor/ProcessorTests.cs @@ -1185,6 +1185,95 @@ public async Task DontCheckpointStartOfStream() Assert.False(runResult.ReceivedEvents.Any(kvp => kvp.Value.Count != 1), "Didn't receive exactly 1 event."); } } + + [Fact] + [LiveTest] + [DisplayTestMethodName] + public async Task CheckpointOnShutdown() + { + int checkpointFailureCount = 0; + int checkpointSuccessCount = 0; + int receivedEventCount = 0; + + await using (var scope = await EventHubScope.CreateAsync(1)) + { + var connectionString = TestUtility.BuildEventHubsConnectionString(scope.EventHubName); + + // Use a randomly generated container name so that initial offset provider will be respected. + var eventProcessorHost = new EventProcessorHost( + string.Empty, + PartitionReceiver.DefaultConsumerGroupName, + connectionString, + TestUtility.StorageConnectionString, + Guid.NewGuid().ToString()); + + var processorOptions = new EventProcessorOptions + { + ReceiveTimeout = TimeSpan.FromSeconds(10), + InitialOffsetProvider = partitionId => EventPosition.FromEnd(), + InvokeProcessorAfterReceiveTimeout = false + }; + + var processorFactory = new TestEventProcessorFactory(); + processorFactory.OnCreateProcessor += (f, createArgs) => + { + var processor = createArgs.Item2; + string partitionId = createArgs.Item1.PartitionId; + string hostName = createArgs.Item1.Owner; + processor.OnOpen += (_, partitionContext) => TestUtility.Log($"{hostName} > Partition {partitionId} TestEventProcessor opened"); + processor.OnClose += (_, closeArgs) => + { + TestUtility.Log($"{hostName} > Partition {partitionId} TestEventProcessor closing: {closeArgs.Item2}"); + if (closeArgs.Item2 != CloseReason.Shutdown) + { + return; + } + + try + { + // Checkpoint at close. + closeArgs.Item1.CheckpointAsync().GetAwaiter().GetResult(); + Interlocked.Increment(ref checkpointSuccessCount); + } + catch (Exception) + { + Interlocked.Increment(ref checkpointFailureCount); + throw; + } + }; + + processor.OnProcessError += (_, errorArgs) => TestUtility.Log($"{hostName} > Partition {partitionId} TestEventProcessor process error {errorArgs.Item2.Message}"); + processor.OnProcessEvents += (_, eventsArgs) => + { + int eventCount = eventsArgs.Item2.events != null ? eventsArgs.Item2.events.Count() : 0; + if (eventCount > 0) + { + TestUtility.Log($"{hostName} > Partition {partitionId} TestEventProcessor processing {eventCount} event(s)"); + Interlocked.Increment(ref receivedEventCount); + } + }; + }; + + await eventProcessorHost.RegisterEventProcessorFactoryAsync(processorFactory, processorOptions); + + // Now send some number of messages. + var ehClient = EventHubClient.CreateFromConnectionString(connectionString); + await TestUtility.SendToEventhubAsync(ehClient, 10); + await Task.Delay(TimeSpan.FromSeconds(30)); + + // Now unregister processor + await eventProcessorHost.UnregisterEventProcessorAsync(); + + // At least one message received. + Assert.True(receivedEventCount > 0, "Didn't receive any events."); + + // At least one successful checkpoint call. + Assert.True(checkpointSuccessCount > 0, $"checkpointSuccessCount == 0, checkpointFailureCount={checkpointFailureCount}"); + + // No failed checkpoint call. + Assert.True(checkpointFailureCount == 0, $"checkpointFailureCount == {checkpointFailureCount}"); + } + } } }