Skip to content

Commit

Permalink
Implement clean shutdown of Event Hubs senders (#257)
Browse files Browse the repository at this point in the history
* shutdown eventhubs senders

* reorder the stopping of timeouts and clientrequests (to avoid ObjectDisposed exceptions)
  • Loading branch information
sebastianburckhardt authored Jun 16, 2023
1 parent 0654d60 commit d47dc35
Show file tree
Hide file tree
Showing 7 changed files with 89 additions and 28 deletions.
4 changes: 2 additions & 2 deletions src/DurableTask.Netherite/OrchestrationService/Client.cs
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,6 @@ public async Task StopAsync()
// cancel the token, if not already cancelled.
this.cts.Cancel();

await this.ResponseTimeouts.StopAsync();

// We now enter the final stage of client shutdown, where we forcefully cancel
// all requests that have not completed yet.
this.allRemainingRequestsAreNowBeingCancelled = true;
Expand All @@ -98,6 +96,8 @@ public async Task StopAsync()
}
}

await this.ResponseTimeouts.StopAsync();

this.cts.Dispose();

this.traceHelper.TraceProgress("Stopped");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@ class EventHubsClientSender
readonly EventHubsSender<ClientEvent>[] channels;
int roundRobin;

public EventHubsClientSender(TransportAbstraction.IHost host, byte[] taskHubGuid, Guid clientId, PartitionSender[] senders, EventHubsTraceHelper traceHelper)
public EventHubsClientSender(TransportAbstraction.IHost host, byte[] taskHubGuid, Guid clientId, PartitionSender[] senders, CancellationToken shutdownToken, EventHubsTraceHelper traceHelper)
{
this.channels = new Netherite.EventHubsTransport.EventHubsSender<ClientEvent>[senders.Length];
for (int i = 0; i < senders.Length; i++)
{
this.channels[i] = new EventHubsSender<ClientEvent>(host, taskHubGuid, senders[i], traceHelper);
this.channels[i] = new EventHubsSender<ClientEvent>(host, taskHubGuid, senders[i], shutdownToken, traceHelper);
}
}

Expand All @@ -41,5 +41,10 @@ public void Submit(ClientEvent toSend)
var channel = this.channels.FirstOrDefault(this.Idle) ?? this.NextChannel();
channel.Submit(toSend);
}

public Task WaitForShutdownAsync()
{
return Task.WhenAll(this.channels.Select(sender => sender.WaitForShutdownAsync()));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ class EventHubsConnections
readonly string[] clientHubs;
readonly string partitionHub;
readonly string loadMonitorHub;
readonly CancellationToken shutdownToken;

EventHubClient partitionClient;
List<EventHubClient> clientClients;
Expand All @@ -44,12 +45,14 @@ public EventHubsConnections(
ConnectionInfo connectionInfo,
string partitionHub,
string[] clientHubs,
string loadMonitorHub)
string loadMonitorHub,
CancellationToken shutdownToken)
{
this.connectionInfo = connectionInfo;
this.partitionHub = partitionHub;
this.clientHubs = clientHubs;
this.loadMonitorHub = loadMonitorHub;
this.shutdownToken = shutdownToken;
}

public string Fingerprint => $"{this.connectionInfo.HostName}{this.partitionHub}/{this.CreationTimestamp:o}";
Expand All @@ -62,30 +65,43 @@ await Task.WhenAll(
this.EnsureLoadMonitorAsync());
}

public async Task StopAsync()
public Task StopAsync()
{
IEnumerable<EventHubClient> Clients()
return Task.WhenAll(
this.StopClientClients(),
this.StopPartitionClients(),
this.StopLoadMonitorClients()
);
}

async Task StopClientClients()
{
await Task.WhenAll(this._clientSenders.Values.Select(sender => sender.WaitForShutdownAsync()).ToList());

if (this.clientClients != null)
{
if (this.partitionClient != null)
{
yield return this.partitionClient;
}
await Task.WhenAll(this.clientClients.Select(client => client.CloseAsync()).ToList());
}
}

if (this.clientClients != null)
{
foreach (var client in this.clientClients)
{
yield return client;
}
}
async Task StopPartitionClients()
{
await Task.WhenAll(this._partitionSenders.Values.Select(sender => sender.WaitForShutdownAsync()).ToList());

if (this.loadMonitorHub != null)
{
yield return this.loadMonitorClient;
}
if (this.partitionClient != null)
{
await this.partitionClient.CloseAsync();
}
}

async Task StopLoadMonitorClients()
{
await Task.WhenAll(this._loadMonitorSenders.Values.Select(sender => sender.WaitForShutdownAsync()).ToList());

await Task.WhenAll(Clients().Select(client => client.CloseAsync()).ToList());
if (this.loadMonitorHub != null)
{
await this.loadMonitorClient.CloseAsync();
}
}

const int EventHubCreationRetries = 5;
Expand Down Expand Up @@ -267,6 +283,7 @@ public EventHubsSender<PartitionUpdateEvent> GetPartitionSender(int partitionId,
this.Host,
taskHubGuid,
partitionSender,
this.shutdownToken,
this.TraceHelper);
this.TraceHelper.LogDebug("Created PartitionSender {sender} from {clientId}", partitionSender.ClientId, client.ClientId);
return sender;
Expand All @@ -290,6 +307,7 @@ public EventHubsClientSender GetClientSender(Guid clientId, byte[] taskHubGuid)
taskHubGuid,
clientId,
partitionSenders,
this.shutdownToken,
this.TraceHelper);
return sender;
});
Expand All @@ -304,6 +322,7 @@ public LoadMonitorSender GetLoadMonitorSender(byte[] taskHubGuid)
this.Host,
taskHubGuid,
loadMonitorSender,
this.shutdownToken,
this.TraceHelper);
this.TraceHelper.LogDebug("Created LoadMonitorSender {sender} from {clientId}", loadMonitorSender.ClientId, this.loadMonitorClient.ClientId);
return sender;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ class EventHubsSender<T> : BatchWorker<Event> where T: Event
readonly MemoryStream stream = new MemoryStream(); // reused for all packets
readonly Stopwatch stopwatch = new Stopwatch();

public EventHubsSender(TransportAbstraction.IHost host, byte[] taskHubGuid, PartitionSender sender, EventHubsTraceHelper traceHelper)
: base($"EventHubsSender {sender.EventHubClient.EventHubName}/{sender.PartitionId}", false, 2000, CancellationToken.None, traceHelper)
public EventHubsSender(TransportAbstraction.IHost host, byte[] taskHubGuid, PartitionSender sender, CancellationToken shutdownToken, EventHubsTraceHelper traceHelper)
: base($"EventHubsSender {sender.EventHubClient.EventHubName}/{sender.PartitionId}", false, 2000, shutdownToken, traceHelper)
{
this.host = host;
this.taskHubGuid = taskHubGuid;
Expand Down Expand Up @@ -59,6 +59,7 @@ protected override async Task Process(IList<Event> toSend)
async Task SendBatch(int lastPosition)
{
maybeSent = lastPosition;
this.cancellationToken.ThrowIfCancellationRequested();
this.stopwatch.Restart();
await this.sender.SendAsync(batch).ConfigureAwait(false);
this.stopwatch.Stop();
Expand Down Expand Up @@ -140,6 +141,12 @@ async Task SendBatch(int lastPosition)
this.traceHelper.LogWarning("EventHubsSender {eventHubName}/{eventHubPartitionId} failed to send due to message size, reducing to {maxMessageSize}kB",
this.eventHubName, this.eventHubPartition, this.maxMessageSize / 1024);
}
catch (OperationCanceledException) when (this.cancellationToken.IsCancellationRequested)
{
// normal during shutdown
this.traceHelper.LogDebug("EventHubsSender {eventHubName}/{eventHubPartitionId} was cancelled", this.eventHubName, this.eventHubPartition);
return;
}
catch (Exception e)
{
this.traceHelper.LogWarning(e, "EventHubsSender {eventHubName}/{eventHubPartitionId} failed to send", this.eventHubName, this.eventHubPartition);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ async Task<TaskhubParameters> ITransportLayer.StartAsync()
// check that the storage format is supported, and load the relevant FASTER tuning parameters
BlobManager.LoadAndCheckStorageFormat(this.parameters.StorageFormat, this.settings, this.host.TraceWarning);

this.connections = new EventHubsConnections(this.settings.EventHubsConnection, EventHubsTransport.PartitionHub, EventHubsTransport.ClientHubs, EventHubsTransport.LoadMonitorHub)
this.connections = new EventHubsConnections(this.settings.EventHubsConnection, EventHubsTransport.PartitionHub, EventHubsTransport.ClientHubs, EventHubsTransport.LoadMonitorHub, this.shutdownSource.Token)
{
Host = host,
TraceHelper = this.traceHelper,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ class LoadMonitorSender : BatchWorker<LoadMonitorEvent>
readonly MemoryStream stream = new MemoryStream(); // reused for all packets
readonly Stopwatch stopwatch = new Stopwatch();

public LoadMonitorSender(TransportAbstraction.IHost host, byte[] taskHubGuid, PartitionSender sender, EventHubsTraceHelper traceHelper)
: base($"EventHubsSender {sender.EventHubClient.EventHubName}/{sender.PartitionId}", false, 2000, CancellationToken.None, traceHelper)
public LoadMonitorSender(TransportAbstraction.IHost host, byte[] taskHubGuid, PartitionSender sender, CancellationToken shutdownToken, EventHubsTraceHelper traceHelper)
: base($"EventHubsSender {sender.EventHubClient.EventHubName}/{sender.PartitionId}", false, 2000, shutdownToken, traceHelper)
{
this.host = host;
this.taskHubGuid = taskHubGuid;
Expand Down Expand Up @@ -89,6 +89,7 @@ protected override async Task Process(IList<LoadMonitorEvent> toSend)
int length = (int)(this.stream.Position);
var arraySegment = new ArraySegment<byte>(this.stream.GetBuffer(), 0, length);
var eventData = new EventData(arraySegment);
this.cancellationToken.ThrowIfCancellationRequested();
await this.sender.SendAsync(eventData);
this.traceHelper.LogTrace("EventHubsSender {eventHubName}/{eventHubPartitionId} sent packet ({size} bytes) id={eventId}", this.eventHubName, this.eventHubPartition, length, evt.EventIdString);
this.stream.Seek(0, SeekOrigin.Begin);
Expand All @@ -106,6 +107,12 @@ protected override async Task Process(IList<LoadMonitorEvent> toSend)
this.traceHelper.LogTrace("EventHubsSender {eventHubName}/{eventHubPartitionId} iteration padded to latencyMs={latencyMs}", this.eventHubName, this.eventHubPartition, this.stopwatch.ElapsedMilliseconds);
}
}
catch (OperationCanceledException) when (this.cancellationToken.IsCancellationRequested)
{
// normal during shutdown
this.traceHelper.LogDebug("EventHubsSender {eventHubName}/{eventHubPartitionId} was cancelled", this.eventHubName, this.eventHubPartition);
return;
}
catch (Exception e)
{
this.traceHelper.LogWarning(e, "EventHubsSender {eventHubName}/{eventHubPartitionId} failed to send", this.eventHubName, this.eventHubPartition);
Expand Down
23 changes: 23 additions & 0 deletions src/DurableTask.Netherite/Util/BatchWorker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ abstract class BatchWorker<T>
bool processingBatch;
public TimeSpan? ProcessingBatchSince => this.processingBatch ? this.stopwatch.Elapsed : null;

volatile TaskCompletionSource<object> shutdownCompletionSource;

/// <summary>
/// Constructor including a cancellation token.
/// </summary>
Expand Down Expand Up @@ -93,6 +95,22 @@ public virtual Task WaitForCompletionAsync()
return tcs.Task;
}

public Task WaitForShutdownAsync()
{
if (!this.cancellationToken.IsCancellationRequested)
{
throw new InvalidOperationException("must call this only after canceling the token");
}

if (this.shutdownCompletionSource == null)
{
Interlocked.CompareExchange(ref this.shutdownCompletionSource, new TaskCompletionSource<object>(), null);
this.NotifyInternal();
}

return this.shutdownCompletionSource.Task;
}

readonly List<T> batch = new List<T>();
readonly List<TaskCompletionSource<bool>> waiters = new List<TaskCompletionSource<bool>>();
IList<T> requeued = null;
Expand Down Expand Up @@ -226,6 +244,11 @@ async Task Work()
this.processingBatch = false;
previousBatch = this.batch.Count;
}

if (this.cancellationToken.IsCancellationRequested)
{
this.shutdownCompletionSource?.TrySetResult(null);
}
}

public void Resume()
Expand Down

0 comments on commit d47dc35

Please sign in to comment.