diff --git a/src/DurableTask.Netherite/OrchestrationService/NetheriteOrchestrationServiceSettings.cs b/src/DurableTask.Netherite/OrchestrationService/NetheriteOrchestrationServiceSettings.cs
index 8f13bb5f..fffcdcbc 100644
--- a/src/DurableTask.Netherite/OrchestrationService/NetheriteOrchestrationServiceSettings.cs
+++ b/src/DurableTask.Netherite/OrchestrationService/NetheriteOrchestrationServiceSettings.cs
@@ -168,6 +168,11 @@ public class NetheriteOrchestrationServiceSettings
         /// </summary>
         public int PackPartitionTaskMessages { get; set; } = 100;

+        /// <summary>
+        /// Time limit for partition startup, in minutes.
+        /// </summary>
+        public int PartitionStartupTimeoutMinutes { get; set; } = 15;
+
         /// <summary>
         /// Allows attaching additional checkers and debuggers during testing.
         /// </summary>
diff --git a/src/DurableTask.Netherite/OrchestrationService/Partition.cs b/src/DurableTask.Netherite/OrchestrationService/Partition.cs
index 9b6e3406..7f641cce 100644
--- a/src/DurableTask.Netherite/OrchestrationService/Partition.cs
+++ b/src/DurableTask.Netherite/OrchestrationService/Partition.cs
@@ -55,7 +55,10 @@ partial class Partition : TransportAbstraction.IPartition
         // A little helper property that allows us to conveniently check the condition for low-level event tracing
         public EventTraceHelper EventDetailTracer => this.EventTraceHelper.IsTracingAtMostDetailedLevel ? this.EventTraceHelper : null;

-        static readonly SemaphoreSlim MaxConcurrentStarts = new SemaphoreSlim(5);
+        // We use this semaphore to limit how many partitions can be starting up at the same time on the same host,
+        // because starting up a partition may temporarily consume a lot of CPU, I/O, and memory.
+        const int ConcurrentStartsLimit = 5;
+        static readonly SemaphoreSlim MaxConcurrentStarts = new SemaphoreSlim(ConcurrentStartsLimit);

         public Partition(
             NetheriteOrchestrationService host,
@@ -93,9 +96,6 @@ public Partition(
             EventTraceContext.Clear();

             this.ErrorHandler = errorHandler;
-
-            this.TraceHelper.TracePartitionProgress("Starting", ref this.LastTransition, this.CurrentTimeMs, "");
-
             errorHandler.Token.Register(() =>
             {
                 this.TraceHelper.TracePartitionProgress("Terminated", ref this.LastTransition, this.CurrentTimeMs, "");
@@ -108,26 +108,36 @@ public Partition(

                 }
             }, useSynchronizationContext: false);
-
+
+            // before we start the partition, we have to acquire the MaxConcurrentStarts semaphore
+            // (to prevent a host from being overwhelmed by the simultaneous start of too many partitions)
+            this.TraceHelper.TracePartitionProgress("Waiting", ref this.LastTransition, this.CurrentTimeMs, $"max={ConcurrentStartsLimit} available={MaxConcurrentStarts.CurrentCount}");
             await MaxConcurrentStarts.WaitAsync();

-            // create or restore partition state from last snapshot
             try
             {
-                // create the state
-                this.State = ((TransportAbstraction.IHost) this.host).StorageLayer.CreatePartitionState(parameters);
+                this.TraceHelper.TracePartitionProgress("Starting", ref this.LastTransition, this.CurrentTimeMs, "");

-                // initialize timer for this partition
-                this.PendingTimers = new BatchTimer<PartitionEvent>(this.ErrorHandler.Token, this.TimersFired);
+                (long, int) inputQueuePosition;

-                // goes to storage to create or restore the partition state
-                var inputQueuePosition = await this.State.CreateOrRestoreAsync(this, this.ErrorHandler, inputQueueFingerprint).ConfigureAwait(false);
+                await using (new PartitionTimeout(errorHandler, "partition startup", TimeSpan.FromMinutes(this.Settings.PartitionStartupTimeoutMinutes)))
+                {
+                    // create or restore partition state from last snapshot
+                    // create the state
+                    this.State = ((TransportAbstraction.IHost)this.host).StorageLayer.CreatePartitionState(parameters);

-                // start processing the timers
-                this.PendingTimers.Start($"Timer{this.PartitionId:D2}");
+                    // initialize timer for this partition
+                    this.PendingTimers = new BatchTimer<PartitionEvent>(this.ErrorHandler.Token, this.TimersFired);

-                // start processing the worker queues
-                this.State.StartProcessing();
+                    // goes to storage to create or restore the partition state
+                    inputQueuePosition = await this.State.CreateOrRestoreAsync(this, this.ErrorHandler, inputQueueFingerprint).ConfigureAwait(false);
+
+                    // start processing the timers
+                    this.PendingTimers.Start($"Timer{this.PartitionId:D2}");
+
+                    // start processing the worker queues
+                    this.State.StartProcessing();
+                }

                 this.TraceHelper.TracePartitionProgress("Started", ref this.LastTransition, this.CurrentTimeMs, $"nextInputQueuePosition={inputQueuePosition.Item1}.{inputQueuePosition.Item2}");
                 return inputQueuePosition;
diff --git a/src/DurableTask.Netherite/Util/PartitionTimeout.cs b/src/DurableTask.Netherite/Util/PartitionTimeout.cs
new file mode 100644
index 00000000..e4c32b6b
--- /dev/null
+++ b/src/DurableTask.Netherite/Util/PartitionTimeout.cs
@@ -0,0 +1,59 @@
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT License.
+
+namespace DurableTask.Netherite
+{
+    using System;
+    using System.Collections.Generic;
+    using System.Text;
+    using System.Threading;
+    using System.Threading.Tasks;
+
+    /// <summary>
+    /// A utility class for terminating the partition if some task takes too long.
+    /// Implemented as a disposable, with <see cref="DisposeAsync"/> used to cancel the timeout.
+    /// </summary>
+    class PartitionTimeout : IAsyncDisposable
+    {
+        readonly CancellationTokenSource tokenSource;
+        readonly Task timeoutTask;
+
+        public PartitionTimeout(IPartitionErrorHandler errorHandler, string task, TimeSpan timeout)
+        {
+            this.tokenSource = new CancellationTokenSource();
+            this.timeoutTask = Task.Run(async () =>
+            {
+                try
+                {
+                    await Task.Delay(timeout, this.tokenSource.Token).ConfigureAwait(false);
+                }
+                catch (OperationCanceledException)
+                {
+                    // we did not time out
+                    return;
+                }
+
+                errorHandler.HandleError(
+                    $"{nameof(PartitionTimeout)}",
+                    $"{task} timed out after {timeout}",
+                    e: null,
+                    terminatePartition: true,
+                    reportAsWarning: false);
+            });
+        }
+
+        public async ValueTask DisposeAsync()
+        {
+            // cancel the timeout task (if it has not already completed)
+            this.tokenSource.Cancel();
+
+            // wait for the timeout task to complete here, so we can be sure that the
+            // decision about the timeout firing or not firing has been made
+            // before we leave this method
+            await this.timeoutTask.ConfigureAwait(false);
+
+            // we can dispose the token source now since the timeoutTask has completed
+            this.tokenSource.Dispose();
+        }
+    }
+}
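
Reviewer note (illustration only, not part of the patch): the `PartitionTimeout` pattern above — a background delay that fires unless `DisposeAsync` cancels it first — can be hard to read in diff form, so the standalone sketch below re-creates it with BCL types only. `TimeoutGuard`, `onTimeout`, and the `Main` driver are made-up names for this sketch, not APIs from this repository.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

class TimeoutGuard : IAsyncDisposable
{
    readonly CancellationTokenSource tokenSource = new CancellationTokenSource();
    readonly Task timeoutTask;

    public TimeoutGuard(TimeSpan timeout, Action onTimeout)
    {
        // background task: invokes onTimeout unless DisposeAsync cancels the delay first
        this.timeoutTask = Task.Run(async () =>
        {
            try
            {
                await Task.Delay(timeout, this.tokenSource.Token).ConfigureAwait(false);
            }
            catch (OperationCanceledException)
            {
                return; // the guarded work finished in time
            }
            onTimeout();
        });
    }

    public async ValueTask DisposeAsync()
    {
        // cancel the pending delay, then await the background task so the
        // fired-or-not decision is settled before the await-using block exits
        this.tokenSource.Cancel();
        await this.timeoutTask.ConfigureAwait(false);
        this.tokenSource.Dispose();
    }
}

class Program
{
    static async Task Main()
    {
        // the guarded work (500 ms) exceeds the 100 ms budget, so "timed out" is printed
        await using (new TimeoutGuard(TimeSpan.FromMilliseconds(100), () => Console.WriteLine("timed out")))
        {
            await Task.Delay(500);
        }
    }
}
```

Two details carry over to `PartitionTimeout` itself: a cancelled `Task.Delay` throws `TaskCanceledException` (a subclass of `OperationCanceledException`), which is why the catch block cleanly separates "work finished in time" from "delay elapsed"; and `DisposeAsync` awaits the background task before returning, so a timeout can never fire after the guarded scope has exited.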