diff --git a/src/DurableTask.Netherite/OrchestrationService/Client.cs b/src/DurableTask.Netherite/OrchestrationService/Client.cs index e44860c6..6ce62997 100644 --- a/src/DurableTask.Netherite/OrchestrationService/Client.cs +++ b/src/DurableTask.Netherite/OrchestrationService/Client.cs @@ -80,8 +80,6 @@ public async Task StopAsync() // cancel the token, if not already cancelled. this.cts.Cancel(); - await this.ResponseTimeouts.StopAsync(); - // We now enter the final stage of client shutdown, where we forcefully cancel // all requests that have not completed yet. this.allRemainingRequestsAreNowBeingCancelled = true; @@ -98,6 +96,8 @@ public async Task StopAsync() } } + await this.ResponseTimeouts.StopAsync(); + this.cts.Dispose(); this.traceHelper.TraceProgress("Stopped"); diff --git a/src/DurableTask.Netherite/TransportLayer/EventHubs/EventHubsClientSender.cs b/src/DurableTask.Netherite/TransportLayer/EventHubs/EventHubsClientSender.cs index eabac3eb..d08b982d 100644 --- a/src/DurableTask.Netherite/TransportLayer/EventHubs/EventHubsClientSender.cs +++ b/src/DurableTask.Netherite/TransportLayer/EventHubs/EventHubsClientSender.cs @@ -19,12 +19,12 @@ class EventHubsClientSender readonly EventHubsSender[] channels; int roundRobin; - public EventHubsClientSender(TransportAbstraction.IHost host, byte[] taskHubGuid, Guid clientId, PartitionSender[] senders, EventHubsTraceHelper traceHelper) + public EventHubsClientSender(TransportAbstraction.IHost host, byte[] taskHubGuid, Guid clientId, PartitionSender[] senders, CancellationToken shutdownToken, EventHubsTraceHelper traceHelper) { this.channels = new Netherite.EventHubsTransport.EventHubsSender[senders.Length]; for (int i = 0; i < senders.Length; i++) { - this.channels[i] = new EventHubsSender(host, taskHubGuid, senders[i], traceHelper); + this.channels[i] = new EventHubsSender(host, taskHubGuid, senders[i], shutdownToken, traceHelper); } } @@ -41,5 +41,10 @@ public void Submit(ClientEvent toSend) var channel = this.channels.FirstOrDefault(this.Idle) ?? this.NextChannel(); channel.Submit(toSend); } + + public Task WaitForShutdownAsync() + { + return Task.WhenAll(this.channels.Select(sender => sender.WaitForShutdownAsync())); + } } } diff --git a/src/DurableTask.Netherite/TransportLayer/EventHubs/EventHubsConnections.cs b/src/DurableTask.Netherite/TransportLayer/EventHubs/EventHubsConnections.cs index ede8deff..a45d00ea 100644 --- a/src/DurableTask.Netherite/TransportLayer/EventHubs/EventHubsConnections.cs +++ b/src/DurableTask.Netherite/TransportLayer/EventHubs/EventHubsConnections.cs @@ -19,6 +19,7 @@ class EventHubsConnections readonly string[] clientHubs; readonly string partitionHub; readonly string loadMonitorHub; + readonly CancellationToken shutdownToken; EventHubClient partitionClient; List clientClients; @@ -44,12 +45,14 @@ public EventHubsConnections( ConnectionInfo connectionInfo, string partitionHub, string[] clientHubs, - string loadMonitorHub) + string loadMonitorHub, + CancellationToken shutdownToken) { this.connectionInfo = connectionInfo; this.partitionHub = partitionHub; this.clientHubs = clientHubs; this.loadMonitorHub = loadMonitorHub; + this.shutdownToken = shutdownToken; } public string Fingerprint => $"{this.connectionInfo.HostName}{this.partitionHub}/{this.CreationTimestamp:o}"; @@ -62,30 +65,43 @@ await Task.WhenAll( this.EnsureLoadMonitorAsync()); } - public async Task StopAsync() + public Task StopAsync() { - IEnumerable Clients() + return Task.WhenAll( + this.StopClientClients(), + this.StopPartitionClients(), + this.StopLoadMonitorClients() + ); + } + + async Task StopClientClients() + { + await Task.WhenAll(this._clientSenders.Values.Select(sender => sender.WaitForShutdownAsync()).ToList()); + + if (this.clientClients != null) { - if (this.partitionClient != null) - { - yield return this.partitionClient; - } + await Task.WhenAll(this.clientClients.Select(client => client.CloseAsync()).ToList()); + } + } - if (this.clientClients != null) - { - foreach (var client in this.clientClients) - { - yield return client; - } - } + async Task StopPartitionClients() + { + await Task.WhenAll(this._partitionSenders.Values.Select(sender => sender.WaitForShutdownAsync()).ToList()); - if (this.loadMonitorHub != null) - { - yield return this.loadMonitorClient; - } + if (this.partitionClient != null) + { + await this.partitionClient.CloseAsync(); } + } + + async Task StopLoadMonitorClients() + { + await Task.WhenAll(this._loadMonitorSenders.Values.Select(sender => sender.WaitForShutdownAsync()).ToList()); - await Task.WhenAll(Clients().Select(client => client.CloseAsync()).ToList()); + if (this.loadMonitorHub != null) + { + await this.loadMonitorClient.CloseAsync(); + } } const int EventHubCreationRetries = 5; @@ -267,6 +283,7 @@ public EventHubsSender GetPartitionSender(int partitionId, this.Host, taskHubGuid, partitionSender, + this.shutdownToken, this.TraceHelper); this.TraceHelper.LogDebug("Created PartitionSender {sender} from {clientId}", partitionSender.ClientId, client.ClientId); return sender; @@ -290,6 +307,7 @@ public EventHubsClientSender GetClientSender(Guid clientId, byte[] taskHubGuid) taskHubGuid, clientId, partitionSenders, + this.shutdownToken, this.TraceHelper); return sender; }); @@ -304,6 +322,7 @@ public LoadMonitorSender GetLoadMonitorSender(byte[] taskHubGuid) this.Host, taskHubGuid, loadMonitorSender, + this.shutdownToken, this.TraceHelper); this.TraceHelper.LogDebug("Created LoadMonitorSender {sender} from {clientId}", loadMonitorSender.ClientId, this.loadMonitorClient.ClientId); return sender; diff --git a/src/DurableTask.Netherite/TransportLayer/EventHubs/EventHubsSender.cs b/src/DurableTask.Netherite/TransportLayer/EventHubs/EventHubsSender.cs index b1e17661..76299ef2 100644 --- a/src/DurableTask.Netherite/TransportLayer/EventHubs/EventHubsSender.cs +++ b/src/DurableTask.Netherite/TransportLayer/EventHubs/EventHubsSender.cs @@ -27,8 +27,8 @@ class EventHubsSender : BatchWorker where T: Event readonly MemoryStream stream = new MemoryStream(); // reused for all packets readonly Stopwatch stopwatch = new Stopwatch(); - public EventHubsSender(TransportAbstraction.IHost host, byte[] taskHubGuid, PartitionSender sender, EventHubsTraceHelper traceHelper) - : base($"EventHubsSender {sender.EventHubClient.EventHubName}/{sender.PartitionId}", false, 2000, CancellationToken.None, traceHelper) + public EventHubsSender(TransportAbstraction.IHost host, byte[] taskHubGuid, PartitionSender sender, CancellationToken shutdownToken, EventHubsTraceHelper traceHelper) + : base($"EventHubsSender {sender.EventHubClient.EventHubName}/{sender.PartitionId}", false, 2000, shutdownToken, traceHelper) { this.host = host; this.taskHubGuid = taskHubGuid; @@ -59,6 +59,7 @@ protected override async Task Process(IList toSend) async Task SendBatch(int lastPosition) { maybeSent = lastPosition; + this.cancellationToken.ThrowIfCancellationRequested(); this.stopwatch.Restart(); await this.sender.SendAsync(batch).ConfigureAwait(false); this.stopwatch.Stop(); @@ -140,6 +141,12 @@ async Task SendBatch(int lastPosition) this.traceHelper.LogWarning("EventHubsSender {eventHubName}/{eventHubPartitionId} failed to send due to message size, reducing to {maxMessageSize}kB", this.eventHubName, this.eventHubPartition, this.maxMessageSize / 1024); } + catch (OperationCanceledException) when (this.cancellationToken.IsCancellationRequested) + { + // normal during shutdown + this.traceHelper.LogDebug("EventHubsSender {eventHubName}/{eventHubPartitionId} was cancelled", this.eventHubName, this.eventHubPartition); + return; + } catch (Exception e) { this.traceHelper.LogWarning(e, "EventHubsSender {eventHubName}/{eventHubPartitionId} failed to send", this.eventHubName, this.eventHubPartition); diff --git a/src/DurableTask.Netherite/TransportLayer/EventHubs/EventHubsTransport.cs b/src/DurableTask.Netherite/TransportLayer/EventHubs/EventHubsTransport.cs index f3d0037b..d461705d 100644 --- a/src/DurableTask.Netherite/TransportLayer/EventHubs/EventHubsTransport.cs +++ b/src/DurableTask.Netherite/TransportLayer/EventHubs/EventHubsTransport.cs @@ -102,7 +102,7 @@ async Task ITransportLayer.StartAsync() // check that the storage format is supported, and load the relevant FASTER tuning parameters BlobManager.LoadAndCheckStorageFormat(this.parameters.StorageFormat, this.settings, this.host.TraceWarning); - this.connections = new EventHubsConnections(this.settings.EventHubsConnection, EventHubsTransport.PartitionHub, EventHubsTransport.ClientHubs, EventHubsTransport.LoadMonitorHub) + this.connections = new EventHubsConnections(this.settings.EventHubsConnection, EventHubsTransport.PartitionHub, EventHubsTransport.ClientHubs, EventHubsTransport.LoadMonitorHub, this.shutdownSource.Token) { Host = host, TraceHelper = this.traceHelper, diff --git a/src/DurableTask.Netherite/TransportLayer/EventHubs/LoadMonitorSender.cs b/src/DurableTask.Netherite/TransportLayer/EventHubs/LoadMonitorSender.cs index 39a5a234..fe216cb4 100644 --- a/src/DurableTask.Netherite/TransportLayer/EventHubs/LoadMonitorSender.cs +++ b/src/DurableTask.Netherite/TransportLayer/EventHubs/LoadMonitorSender.cs @@ -26,8 +26,8 @@ class LoadMonitorSender : BatchWorker readonly MemoryStream stream = new MemoryStream(); // reused for all packets readonly Stopwatch stopwatch = new Stopwatch(); - public LoadMonitorSender(TransportAbstraction.IHost host, byte[] taskHubGuid, PartitionSender sender, EventHubsTraceHelper traceHelper) - : base($"EventHubsSender {sender.EventHubClient.EventHubName}/{sender.PartitionId}", false, 2000, CancellationToken.None, traceHelper) + public LoadMonitorSender(TransportAbstraction.IHost host, byte[] taskHubGuid, PartitionSender sender, CancellationToken shutdownToken, EventHubsTraceHelper traceHelper) + : base($"EventHubsSender {sender.EventHubClient.EventHubName}/{sender.PartitionId}", false, 2000, shutdownToken, traceHelper) { this.host = host; this.taskHubGuid = taskHubGuid; @@ -89,6 +89,7 @@ protected override async Task Process(IList toSend) int length = (int)(this.stream.Position); var arraySegment = new ArraySegment(this.stream.GetBuffer(), 0, length); var eventData = new EventData(arraySegment); + this.cancellationToken.ThrowIfCancellationRequested(); await this.sender.SendAsync(eventData); this.traceHelper.LogTrace("EventHubsSender {eventHubName}/{eventHubPartitionId} sent packet ({size} bytes) id={eventId}", this.eventHubName, this.eventHubPartition, length, evt.EventIdString); this.stream.Seek(0, SeekOrigin.Begin); @@ -106,6 +107,12 @@ protected override async Task Process(IList toSend) this.traceHelper.LogTrace("EventHubsSender {eventHubName}/{eventHubPartitionId} iteration padded to latencyMs={latencyMs}", this.eventHubName, this.eventHubPartition, this.stopwatch.ElapsedMilliseconds); } } + catch (OperationCanceledException) when (this.cancellationToken.IsCancellationRequested) + { + // normal during shutdown + this.traceHelper.LogDebug("EventHubsSender {eventHubName}/{eventHubPartitionId} was cancelled", this.eventHubName, this.eventHubPartition); + return; + } catch (Exception e) { this.traceHelper.LogWarning(e, "EventHubsSender {eventHubName}/{eventHubPartitionId} failed to send", this.eventHubName, this.eventHubPartition); diff --git a/src/DurableTask.Netherite/Util/BatchWorker.cs b/src/DurableTask.Netherite/Util/BatchWorker.cs index 80b9e91c..3e8ab616 100644 --- a/src/DurableTask.Netherite/Util/BatchWorker.cs +++ b/src/DurableTask.Netherite/Util/BatchWorker.cs @@ -39,6 +39,8 @@ abstract class BatchWorker bool processingBatch; public TimeSpan? ProcessingBatchSince => this.processingBatch ? this.stopwatch.Elapsed : null; + volatile TaskCompletionSource shutdownCompletionSource; + /// /// Constructor including a cancellation token. /// @@ -93,6 +95,22 @@ public virtual Task WaitForCompletionAsync() return tcs.Task; } + public Task WaitForShutdownAsync() + { + if (!this.cancellationToken.IsCancellationRequested) + { + throw new InvalidOperationException("must call this only after canceling the token"); + } + + if (this.shutdownCompletionSource == null) + { + Interlocked.CompareExchange(ref this.shutdownCompletionSource, new TaskCompletionSource(), null); + this.NotifyInternal(); + } + + return this.shutdownCompletionSource.Task; + } + readonly List batch = new List(); readonly List> waiters = new List>(); IList requeued = null; @@ -226,6 +244,11 @@ async Task Work() this.processingBatch = false; previousBatch = this.batch.Count; } + + if (this.cancellationToken.IsCancellationRequested) + { + this.shutdownCompletionSource?.TrySetResult(null); + } } public void Resume()