Skip to content

Commit

Permalink
Optimize host shutdown (#249)
Browse files Browse the repository at this point in the history
* make shutdown for clients and workers parallel

* address PR feedback
  • Loading branch information
sebastianburckhardt authored Apr 20, 2023
1 parent 021d5c6 commit 6090def
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 30 deletions.
3 changes: 1 addition & 2 deletions src/DurableTask.Netherite/OrchestrationService/Client.cs
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,7 @@ public async Task StopAsync()
await this.ResponseTimeouts.StopAsync();

// We now enter the final stage of client shutdown, where we forcefully cancel
// all requests that have not completed yet. We do this as late as possible in the shutdown
// process, so that requests still have a chance to successfully complete as long as possible.
// all requests that have not completed yet.
this.allRemainingRequestsAreNowBeingCancelled = true;
while (true)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -266,14 +266,15 @@ async Task ShutdownAsync()

using (await this.deliveryLock.LockAsync())
{
this.traceHelper.LogDebug("EventHubsProcessor {eventHubName}/{eventHubPartition} starting shutdown task", this.eventHubName, this.eventHubPartition);

if (this.shutdownTask == null)
{
this.traceHelper.LogDebug("EventHubsProcessor {eventHubName}/{eventHubPartition} starting shutdown task", this.eventHubName, this.eventHubPartition);
this.shutdownTask = Task.Run(() => ShutdownAsync());
}

this.traceHelper.LogDebug("EventHubsProcessor {eventHubName}/{eventHubPartition} started shutdown task", this.eventHubName, this.eventHubPartition);
else
{
this.traceHelper.LogDebug("EventHubsProcessor {eventHubName}/{eventHubPartition} shutdown task already started", this.eventHubName, this.eventHubPartition);
}
}

await this.shutdownTask;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ async Task StartPartitionHost()
{
if (this.settings.PartitionManagement != PartitionManagementOptions.Scripted)
{
this.traceHelper.LogInformation($"Registering Partition Host with EventHubs");
this.traceHelper.LogInformation($"EventHubsTransport is registering PartitionHost");

string formattedCreationDate = this.connections.CreationTimestamp.ToString("o").Replace("/", "-");

Expand All @@ -181,11 +181,11 @@ await this.eventProcessorHost.RegisterEventProcessorFactoryAsync(
new PartitionEventProcessorFactory(this),
processorOptions);

this.traceHelper.LogInformation($"Partition Host started");
this.traceHelper.LogInformation($"EventHubsTransport started PartitionHost");
}
else
{
this.traceHelper.LogInformation($"Starting scripted partition host");
this.traceHelper.LogInformation($"EventHubsTransport is starting scripted partition host");
this.scriptedEventProcessorHost = new ScriptedEventProcessorHost(
EventHubsTransport.PartitionHub,
EventHubsTransport.PartitionConsumerGroup,
Expand All @@ -207,7 +207,7 @@ await this.eventProcessorHost.RegisterEventProcessorFactoryAsync(

async Task StartLoadMonitorHost()
{
this.traceHelper.LogInformation("Registering LoadMonitor Host with EventHubs");
this.traceHelper.LogInformation("EventHubsTransport is registering LoadMonitorHost");

this.loadMonitorHost = await this.settings.EventHubsConnection.GetEventProcessorHostAsync(
Guid.NewGuid().ToString(),
Expand Down Expand Up @@ -237,7 +237,7 @@ internal async Task ExitProcess(bool deletePartitionsFirst)
await this.connections.DeletePartitions();
}

this.traceHelper.LogError("Killing process in 10 seconds");
this.traceHelper.LogError("EventHubsTransport is killing process in 10 seconds");
await Task.Delay(TimeSpan.FromSeconds(10));
System.Environment.Exit(222);
}
Expand Down Expand Up @@ -289,39 +289,55 @@ public IEventProcessor CreateEventProcessor(PartitionContext context)

async Task ITransportLayer.StopAsync()
{
this.traceHelper.LogInformation("Shutting down EventHubsBackend");
this.shutdownSource.Cancel(); // initiates shutdown of client and of all partitions
this.traceHelper.LogInformation("EventHubsTransport is shutting down");
this.shutdownSource.Cancel(); // immediately initiates shutdown of client and of all partitions

if (this.hasWorkers)
{
this.traceHelper.LogDebug("Stopping partition and loadmonitor hosts");
await Task.WhenAll(
this.StopPartitionHost(),
this.loadMonitorHost.UnregisterEventProcessorAsync());
}
await Task.WhenAll(
this.hasWorkers ? this.StopWorkersAsync() : Task.CompletedTask,
this.StopClientsAndConnectionsAsync());

this.traceHelper.LogDebug("Stopping client process loop");
await this.clientProcessTask;
this.traceHelper.LogInformation("EventHubsTransport is shut down");
}

this.traceHelper.LogDebug("Stopping client");
await this.client.StopAsync();
async Task StopWorkersAsync()
{
this.traceHelper.LogDebug("EventHubsTransport is stopping partition and loadmonitor hosts");
await Task.WhenAll(
this.StopPartitionHostAsync(),
this.StopLoadMonitorHostAsync());
}

async Task StopClientsAndConnectionsAsync()
{
this.traceHelper.LogDebug("EventHubsTransport is stopping client process loop");
await this.clientProcessTask;

this.traceHelper.LogDebug("Closing connections");
this.traceHelper.LogDebug("EventHubsTransport is closing connections");
await this.connections.StopAsync();

this.traceHelper.LogInformation("EventHubsBackend shutdown completed");
this.traceHelper.LogDebug("EventHubsTransport is stopping client");
await this.client.StopAsync();

this.traceHelper.LogDebug("EventHubsTransport stopped clients");
}

Task StopPartitionHost()
async Task StopPartitionHostAsync()
{
if (this.settings.PartitionManagement != PartitionManagementOptions.Scripted)
{
return this.eventProcessorHost.UnregisterEventProcessorAsync();
await this.eventProcessorHost.UnregisterEventProcessorAsync();
}
else
{
return this.scriptedEventProcessorHost.StopAsync();
}
await this.scriptedEventProcessorHost.StopAsync();
}
this.traceHelper.LogDebug("EventHubsTransport stopped partition host");
}

async Task StopLoadMonitorHostAsync()
{
await this.loadMonitorHost.UnregisterEventProcessorAsync();
this.traceHelper.LogDebug("EventHubsTransport stopped loadmonitor host");
}

IEventProcessor IEventProcessorFactory.CreateEventProcessor(PartitionContext partitionContext)
Expand Down

0 comments on commit 6090def

Please sign in to comment.