Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement clean shutdown of Event Hubs senders #257

Merged
merged 2 commits into from
Jun 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/DurableTask.Netherite/OrchestrationService/Client.cs
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,6 @@ public async Task StopAsync()
// cancel the token, if not already cancelled.
this.cts.Cancel();

await this.ResponseTimeouts.StopAsync();

// We now enter the final stage of client shutdown, where we forcefully cancel
// all requests that have not completed yet.
this.allRemainingRequestsAreNowBeingCancelled = true;
Expand All @@ -98,6 +96,8 @@ public async Task StopAsync()
}
}

await this.ResponseTimeouts.StopAsync();

this.cts.Dispose();

this.traceHelper.TraceProgress("Stopped");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@ class EventHubsClientSender
readonly EventHubsSender<ClientEvent>[] channels;
int roundRobin;

public EventHubsClientSender(TransportAbstraction.IHost host, byte[] taskHubGuid, Guid clientId, PartitionSender[] senders, EventHubsTraceHelper traceHelper)
public EventHubsClientSender(TransportAbstraction.IHost host, byte[] taskHubGuid, Guid clientId, PartitionSender[] senders, CancellationToken shutdownToken, EventHubsTraceHelper traceHelper)
{
this.channels = new Netherite.EventHubsTransport.EventHubsSender<ClientEvent>[senders.Length];
for (int i = 0; i < senders.Length; i++)
{
this.channels[i] = new EventHubsSender<ClientEvent>(host, taskHubGuid, senders[i], traceHelper);
this.channels[i] = new EventHubsSender<ClientEvent>(host, taskHubGuid, senders[i], shutdownToken, traceHelper);
}
}

Expand All @@ -41,5 +41,10 @@ public void Submit(ClientEvent toSend)
var channel = this.channels.FirstOrDefault(this.Idle) ?? this.NextChannel();
channel.Submit(toSend);
}

public Task WaitForShutdownAsync()
{
return Task.WhenAll(this.channels.Select(sender => sender.WaitForShutdownAsync()));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ class EventHubsConnections
readonly string[] clientHubs;
readonly string partitionHub;
readonly string loadMonitorHub;
readonly CancellationToken shutdownToken;

EventHubClient partitionClient;
List<EventHubClient> clientClients;
Expand All @@ -44,12 +45,14 @@ public EventHubsConnections(
ConnectionInfo connectionInfo,
string partitionHub,
string[] clientHubs,
string loadMonitorHub)
string loadMonitorHub,
CancellationToken shutdownToken)
{
this.connectionInfo = connectionInfo;
this.partitionHub = partitionHub;
this.clientHubs = clientHubs;
this.loadMonitorHub = loadMonitorHub;
this.shutdownToken = shutdownToken;
}

public string Fingerprint => $"{this.connectionInfo.HostName}{this.partitionHub}/{this.CreationTimestamp:o}";
Expand All @@ -62,30 +65,43 @@ await Task.WhenAll(
this.EnsureLoadMonitorAsync());
}

public async Task StopAsync()
public Task StopAsync()
{
IEnumerable<EventHubClient> Clients()
return Task.WhenAll(
this.StopClientClients(),
this.StopPartitionClients(),
this.StopLoadMonitorClients()
);
}

async Task StopClientClients()
{
await Task.WhenAll(this._clientSenders.Values.Select(sender => sender.WaitForShutdownAsync()).ToList());

if (this.clientClients != null)
{
if (this.partitionClient != null)
{
yield return this.partitionClient;
}
await Task.WhenAll(this.clientClients.Select(client => client.CloseAsync()).ToList());
}
}

if (this.clientClients != null)
{
foreach (var client in this.clientClients)
{
yield return client;
}
}
async Task StopPartitionClients()
{
await Task.WhenAll(this._partitionSenders.Values.Select(sender => sender.WaitForShutdownAsync()).ToList());

if (this.loadMonitorHub != null)
{
yield return this.loadMonitorClient;
}
if (this.partitionClient != null)
{
await this.partitionClient.CloseAsync();
}
}

async Task StopLoadMonitorClients()
{
await Task.WhenAll(this._loadMonitorSenders.Values.Select(sender => sender.WaitForShutdownAsync()).ToList());

await Task.WhenAll(Clients().Select(client => client.CloseAsync()).ToList());
if (this.loadMonitorHub != null)
{
await this.loadMonitorClient.CloseAsync();
}
}

const int EventHubCreationRetries = 5;
Expand Down Expand Up @@ -267,6 +283,7 @@ public EventHubsSender<PartitionUpdateEvent> GetPartitionSender(int partitionId,
this.Host,
taskHubGuid,
partitionSender,
this.shutdownToken,
this.TraceHelper);
this.TraceHelper.LogDebug("Created PartitionSender {sender} from {clientId}", partitionSender.ClientId, client.ClientId);
return sender;
Expand All @@ -290,6 +307,7 @@ public EventHubsClientSender GetClientSender(Guid clientId, byte[] taskHubGuid)
taskHubGuid,
clientId,
partitionSenders,
this.shutdownToken,
this.TraceHelper);
return sender;
});
Expand All @@ -304,6 +322,7 @@ public LoadMonitorSender GetLoadMonitorSender(byte[] taskHubGuid)
this.Host,
taskHubGuid,
loadMonitorSender,
this.shutdownToken,
this.TraceHelper);
this.TraceHelper.LogDebug("Created LoadMonitorSender {sender} from {clientId}", loadMonitorSender.ClientId, this.loadMonitorClient.ClientId);
return sender;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ class EventHubsSender<T> : BatchWorker<Event> where T: Event
readonly MemoryStream stream = new MemoryStream(); // reused for all packets
readonly Stopwatch stopwatch = new Stopwatch();

public EventHubsSender(TransportAbstraction.IHost host, byte[] taskHubGuid, PartitionSender sender, EventHubsTraceHelper traceHelper)
: base($"EventHubsSender {sender.EventHubClient.EventHubName}/{sender.PartitionId}", false, 2000, CancellationToken.None, traceHelper)
public EventHubsSender(TransportAbstraction.IHost host, byte[] taskHubGuid, PartitionSender sender, CancellationToken shutdownToken, EventHubsTraceHelper traceHelper)
: base($"EventHubsSender {sender.EventHubClient.EventHubName}/{sender.PartitionId}", false, 2000, shutdownToken, traceHelper)
{
this.host = host;
this.taskHubGuid = taskHubGuid;
Expand Down Expand Up @@ -59,6 +59,7 @@ protected override async Task Process(IList<Event> toSend)
async Task SendBatch(int lastPosition)
{
maybeSent = lastPosition;
this.cancellationToken.ThrowIfCancellationRequested();
this.stopwatch.Restart();
await this.sender.SendAsync(batch).ConfigureAwait(false);
this.stopwatch.Stop();
Expand Down Expand Up @@ -140,6 +141,12 @@ async Task SendBatch(int lastPosition)
this.traceHelper.LogWarning("EventHubsSender {eventHubName}/{eventHubPartitionId} failed to send due to message size, reducing to {maxMessageSize}kB",
this.eventHubName, this.eventHubPartition, this.maxMessageSize / 1024);
}
catch (OperationCanceledException) when (this.cancellationToken.IsCancellationRequested)
{
// normal during shutdown
this.traceHelper.LogDebug("EventHubsSender {eventHubName}/{eventHubPartitionId} was cancelled", this.eventHubName, this.eventHubPartition);
return;
}
catch (Exception e)
{
this.traceHelper.LogWarning(e, "EventHubsSender {eventHubName}/{eventHubPartitionId} failed to send", this.eventHubName, this.eventHubPartition);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ async Task<TaskhubParameters> ITransportLayer.StartAsync()
// check that the storage format is supported, and load the relevant FASTER tuning parameters
BlobManager.LoadAndCheckStorageFormat(this.parameters.StorageFormat, this.settings, this.host.TraceWarning);

this.connections = new EventHubsConnections(this.settings.EventHubsConnection, EventHubsTransport.PartitionHub, EventHubsTransport.ClientHubs, EventHubsTransport.LoadMonitorHub)
this.connections = new EventHubsConnections(this.settings.EventHubsConnection, EventHubsTransport.PartitionHub, EventHubsTransport.ClientHubs, EventHubsTransport.LoadMonitorHub, this.shutdownSource.Token)
{
Host = host,
TraceHelper = this.traceHelper,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ class LoadMonitorSender : BatchWorker<LoadMonitorEvent>
readonly MemoryStream stream = new MemoryStream(); // reused for all packets
readonly Stopwatch stopwatch = new Stopwatch();

public LoadMonitorSender(TransportAbstraction.IHost host, byte[] taskHubGuid, PartitionSender sender, EventHubsTraceHelper traceHelper)
: base($"EventHubsSender {sender.EventHubClient.EventHubName}/{sender.PartitionId}", false, 2000, CancellationToken.None, traceHelper)
public LoadMonitorSender(TransportAbstraction.IHost host, byte[] taskHubGuid, PartitionSender sender, CancellationToken shutdownToken, EventHubsTraceHelper traceHelper)
: base($"EventHubsSender {sender.EventHubClient.EventHubName}/{sender.PartitionId}", false, 2000, shutdownToken, traceHelper)
{
this.host = host;
this.taskHubGuid = taskHubGuid;
Expand Down Expand Up @@ -89,6 +89,7 @@ protected override async Task Process(IList<LoadMonitorEvent> toSend)
int length = (int)(this.stream.Position);
var arraySegment = new ArraySegment<byte>(this.stream.GetBuffer(), 0, length);
var eventData = new EventData(arraySegment);
this.cancellationToken.ThrowIfCancellationRequested();
await this.sender.SendAsync(eventData);
this.traceHelper.LogTrace("EventHubsSender {eventHubName}/{eventHubPartitionId} sent packet ({size} bytes) id={eventId}", this.eventHubName, this.eventHubPartition, length, evt.EventIdString);
this.stream.Seek(0, SeekOrigin.Begin);
Expand All @@ -106,6 +107,12 @@ protected override async Task Process(IList<LoadMonitorEvent> toSend)
this.traceHelper.LogTrace("EventHubsSender {eventHubName}/{eventHubPartitionId} iteration padded to latencyMs={latencyMs}", this.eventHubName, this.eventHubPartition, this.stopwatch.ElapsedMilliseconds);
}
}
catch (OperationCanceledException) when (this.cancellationToken.IsCancellationRequested)
{
// normal during shutdown
this.traceHelper.LogDebug("EventHubsSender {eventHubName}/{eventHubPartitionId} was cancelled", this.eventHubName, this.eventHubPartition);
return;
}
catch (Exception e)
{
this.traceHelper.LogWarning(e, "EventHubsSender {eventHubName}/{eventHubPartitionId} failed to send", this.eventHubName, this.eventHubPartition);
Expand Down
23 changes: 23 additions & 0 deletions src/DurableTask.Netherite/Util/BatchWorker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ abstract class BatchWorker<T>
bool processingBatch;
public TimeSpan? ProcessingBatchSince => this.processingBatch ? this.stopwatch.Elapsed : null;

volatile TaskCompletionSource<object> shutdownCompletionSource;

/// <summary>
/// Constructor including a cancellation token.
/// </summary>
Expand Down Expand Up @@ -93,6 +95,22 @@ public virtual Task WaitForCompletionAsync()
return tcs.Task;
}

public Task WaitForShutdownAsync()
{
if (!this.cancellationToken.IsCancellationRequested)
{
throw new InvalidOperationException("must call this only after canceling the token");
}

if (this.shutdownCompletionSource == null)
{
Interlocked.CompareExchange(ref this.shutdownCompletionSource, new TaskCompletionSource<object>(), null);
this.NotifyInternal();
}

return this.shutdownCompletionSource.Task;
}

readonly List<T> batch = new List<T>();
readonly List<TaskCompletionSource<bool>> waiters = new List<TaskCompletionSource<bool>>();
IList<T> requeued = null;
Expand Down Expand Up @@ -226,6 +244,11 @@ async Task Work()
this.processingBatch = false;
previousBatch = this.batch.Count;
}

if (this.cancellationToken.IsCancellationRequested)
{
this.shutdownCompletionSource?.TrySetResult(null);
}
}

public void Resume()
Expand Down