From ea741f52477b82bdc14e611ac45c9bc72c82f727 Mon Sep 17 00:00:00 2001 From: sebastianburckhardt Date: Wed, 12 Apr 2023 10:10:17 -0700 Subject: [PATCH 1/2] make shutdown for clients and workers parallel --- .../OrchestrationService/Client.cs | 3 +- .../EventHubs/EventHubsProcessor.cs | 9 +-- .../EventHubs/EventHubsTransport.cs | 62 ++++++++++++------- 3 files changed, 44 insertions(+), 30 deletions(-) diff --git a/src/DurableTask.Netherite/OrchestrationService/Client.cs b/src/DurableTask.Netherite/OrchestrationService/Client.cs index a1f67874..e44860c6 100644 --- a/src/DurableTask.Netherite/OrchestrationService/Client.cs +++ b/src/DurableTask.Netherite/OrchestrationService/Client.cs @@ -83,8 +83,7 @@ public async Task StopAsync() await this.ResponseTimeouts.StopAsync(); // We now enter the final stage of client shutdown, where we forcefully cancel - // all requests that have not completed yet. We do this as late as possible in the shutdown - // process, so that requests still have a chance to successfully complete as long as possible. + // all requests that have not completed yet. this.allRemainingRequestsAreNowBeingCancelled = true; while (true) { diff --git a/src/DurableTask.Netherite/TransportLayer/EventHubs/EventHubsProcessor.cs b/src/DurableTask.Netherite/TransportLayer/EventHubs/EventHubsProcessor.cs index 92b401c0..4104334d 100644 --- a/src/DurableTask.Netherite/TransportLayer/EventHubs/EventHubsProcessor.cs +++ b/src/DurableTask.Netherite/TransportLayer/EventHubs/EventHubsProcessor.cs @@ -266,14 +266,15 @@ async Task ShutdownAsync() using (await this.deliveryLock.LockAsync()) { - this.traceHelper.LogDebug("EventHubsProcessor {eventHubName}/{eventHubPartition} starting shutdown task", this.eventHubName, this.eventHubPartition); - if (this.shutdownTask == null) { + this.traceHelper.LogDebug("EventHubsProcessor {eventHubName}/{eventHubPartition} starting shutdown task", this.eventHubName, this.eventHubPartition); this.shutdownTask = Task.Run(() => ShutdownAsync()); } - - this.traceHelper.LogDebug("EventHubsProcessor {eventHubName}/{eventHubPartition} started shutdown task", this.eventHubName, this.eventHubPartition); + else + { + this.traceHelper.LogDebug("EventHubsProcessor {eventHubName}/{eventHubPartition} shutdown task already started", this.eventHubName, this.eventHubPartition); + } } await this.shutdownTask; diff --git a/src/DurableTask.Netherite/TransportLayer/EventHubs/EventHubsTransport.cs b/src/DurableTask.Netherite/TransportLayer/EventHubs/EventHubsTransport.cs index 5e790cde..41b2bfc2 100644 --- a/src/DurableTask.Netherite/TransportLayer/EventHubs/EventHubsTransport.cs +++ b/src/DurableTask.Netherite/TransportLayer/EventHubs/EventHubsTransport.cs @@ -159,7 +159,7 @@ async Task StartPartitionHost() { if (this.settings.PartitionManagement != PartitionManagementOptions.Scripted) { - this.traceHelper.LogInformation($"Registering Partition Host with EventHubs"); + this.traceHelper.LogInformation($"EventHubsTransport is registering PartitionHost"); string formattedCreationDate = this.connections.CreationTimestamp.ToString("o").Replace("/", "-"); @@ -181,11 +181,11 @@ await this.eventProcessorHost.RegisterEventProcessorFactoryAsync( new PartitionEventProcessorFactory(this), processorOptions); - this.traceHelper.LogInformation($"Partition Host started"); + this.traceHelper.LogInformation($"EventHubsTransport started PartitionHost"); } else { - this.traceHelper.LogInformation($"Starting scripted partition host"); + this.traceHelper.LogInformation($"EventHubsTransport is starting scripted partition host"); this.scriptedEventProcessorHost = new ScriptedEventProcessorHost( EventHubsTransport.PartitionHub, EventHubsTransport.PartitionConsumerGroup, @@ -207,7 +207,7 @@ await this.eventProcessorHost.RegisterEventProcessorFactoryAsync( async Task StartLoadMonitorHost() { - this.traceHelper.LogInformation("Registering LoadMonitor Host with EventHubs"); + this.traceHelper.LogInformation("EventHubsTransport is registering LoadMonitorHost"); this.loadMonitorHost = await this.settings.EventHubsConnection.GetEventProcessorHostAsync( Guid.NewGuid().ToString(), @@ -237,7 +237,7 @@ internal async Task ExitProcess(bool deletePartitionsFirst) await this.connections.DeletePartitions(); } - this.traceHelper.LogError("Killing process in 10 seconds"); + this.traceHelper.LogError("EventHubsTransport is killing process in 10 seconds"); await Task.Delay(TimeSpan.FromSeconds(10)); System.Environment.Exit(222); } @@ -289,39 +289,53 @@ public IEventProcessor CreateEventProcessor(PartitionContext context) async Task ITransportLayer.StopAsync() { - this.traceHelper.LogInformation("Shutting down EventHubsBackend"); - this.shutdownSource.Cancel(); // initiates shutdown of client and of all partitions + this.traceHelper.LogInformation("EventHubsTransport is shutting down"); + this.shutdownSource.Cancel(); // immediately initiates shutdown of client and of all partitions - if (this.hasWorkers) - { - this.traceHelper.LogDebug("Stopping partition and loadmonitor hosts"); - await Task.WhenAll( - this.StopPartitionHost(), - this.loadMonitorHost.UnregisterEventProcessorAsync()); - } + await Task.WhenAll( + this.hasWorkers ? this.StopWorkersAsync() : Task.CompletedTask, + this.StopClientsAndConnectionsAsync()); - this.traceHelper.LogDebug("Stopping client process loop"); - await this.clientProcessTask; + this.traceHelper.LogInformation("EventHubsTransport is shut down"); + } - this.traceHelper.LogDebug("Stopping client"); - await this.client.StopAsync(); + async Task StopWorkersAsync() + { + this.traceHelper.LogDebug("EventHubsTransport is stopping partition and loadmonitor hosts"); + await Task.WhenAll( + this.StopPartitionHostAsync(), + this.StopLoadMonitorHostAsync()); + } - this.traceHelper.LogDebug("Closing connections"); + async Task StopClientsAndConnectionsAsync() + { + this.traceHelper.LogDebug("EventHubsTransport is stopping client process loop"); + await this.clientProcessTask; + + this.traceHelper.LogDebug("EventHubsTransport is closing connections"); await this.connections.StopAsync(); - this.traceHelper.LogInformation("EventHubsBackend shutdown completed"); + this.traceHelper.LogDebug("EventHubsTransport is stopping client"); + await this.client.StopAsync(); } - Task StopPartitionHost() + async Task StopPartitionHostAsync() { if (this.settings.PartitionManagement != PartitionManagementOptions.Scripted) { - return this.eventProcessorHost.UnregisterEventProcessorAsync(); + await this.eventProcessorHost.UnregisterEventProcessorAsync(); } else { - return this.scriptedEventProcessorHost.StopAsync(); - } + await this.scriptedEventProcessorHost.StopAsync(); + } + this.traceHelper.LogDebug("EventHubsTransport stopped partition host"); + } + + async Task StopLoadMonitorHostAsync() + { + await this.loadMonitorHost.UnregisterEventProcessorAsync(); + this.traceHelper.LogDebug("EventHubsTransport stopped loadmonitor host"); } IEventProcessor IEventProcessorFactory.CreateEventProcessor(PartitionContext partitionContext) From 7ebbd91b8232f45dab56e4463bc07d632bbe2a26 Mon Sep 17 00:00:00 2001 From: sebastianburckhardt Date: Mon, 17 Apr 2023 10:28:05 -0700 Subject: [PATCH 2/2] address PR feedback --- .../TransportLayer/EventHubs/EventHubsTransport.cs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/DurableTask.Netherite/TransportLayer/EventHubs/EventHubsTransport.cs b/src/DurableTask.Netherite/TransportLayer/EventHubs/EventHubsTransport.cs index 41b2bfc2..f3d0037b 100644 --- a/src/DurableTask.Netherite/TransportLayer/EventHubs/EventHubsTransport.cs +++ b/src/DurableTask.Netherite/TransportLayer/EventHubs/EventHubsTransport.cs @@ -317,6 +317,8 @@ async Task StopClientsAndConnectionsAsync() this.traceHelper.LogDebug("EventHubsTransport is stopping client"); await this.client.StopAsync(); + + this.traceHelper.LogDebug("EventHubsTransport stopped clients"); } async Task StopPartitionHostAsync()