Skip to content

Commit

Permalink
Revise partition startup (#332)
Browse files Browse the repository at this point in the history
* add "waiting" transition and timeout to partition startup

* address PR feedback
  • Loading branch information
sebastianburckhardt authored Nov 3, 2023
1 parent f24f6de commit fc777fe
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,11 @@ public class NetheriteOrchestrationServiceSettings
/// </summary>
public int PackPartitionTaskMessages { get; set; } = 100;

/// <summary>
/// Time limit for partition startup, in minutes.
/// </summary>
public int PartitionStartupTimeoutMinutes { get; set; } = 15;

/// <summary>
/// Allows attaching additional checkers and debuggers during testing.
/// </summary>
Expand Down
42 changes: 26 additions & 16 deletions src/DurableTask.Netherite/OrchestrationService/Partition.cs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,10 @@ partial class Partition : TransportAbstraction.IPartition
// A little helper property that allows us to conventiently check the condition for low-level event tracing
public EventTraceHelper EventDetailTracer => this.EventTraceHelper.IsTracingAtMostDetailedLevel ? this.EventTraceHelper : null;

static readonly SemaphoreSlim MaxConcurrentStarts = new SemaphoreSlim(5);
// We use this semaphore to limit how many partitions can be starting up at the same time on the same host
// because starting up a partition may temporarily consume a lot of CPU, I/O, and memory
const int ConcurrentStartsLimit = 5;
static readonly SemaphoreSlim MaxConcurrentStarts = new SemaphoreSlim(ConcurrentStartsLimit);

public Partition(
NetheriteOrchestrationService host,
Expand Down Expand Up @@ -93,9 +96,6 @@ public Partition(
EventTraceContext.Clear();

this.ErrorHandler = errorHandler;

this.TraceHelper.TracePartitionProgress("Starting", ref this.LastTransition, this.CurrentTimeMs, "");

errorHandler.Token.Register(() =>
{
this.TraceHelper.TracePartitionProgress("Terminated", ref this.LastTransition, this.CurrentTimeMs, "");
Expand All @@ -108,26 +108,36 @@ public Partition(
}

}, useSynchronizationContext: false);


// before we start the partition, we have to acquire the MaxConcurrentStarts semaphore
// (to prevent a host from being overwhelmed by the simultaneous start of too many partitions)
this.TraceHelper.TracePartitionProgress("Waiting", ref this.LastTransition, this.CurrentTimeMs, $"max={ConcurrentStartsLimit} available={MaxConcurrentStarts.CurrentCount}");
await MaxConcurrentStarts.WaitAsync();

// create or restore partition state from last snapshot
try
{
// create the state
this.State = ((TransportAbstraction.IHost) this.host).StorageLayer.CreatePartitionState(parameters);
this.TraceHelper.TracePartitionProgress("Starting", ref this.LastTransition, this.CurrentTimeMs, "");

// initialize timer for this partition
this.PendingTimers = new BatchTimer<PartitionEvent>(this.ErrorHandler.Token, this.TimersFired);
(long, int) inputQueuePosition;

// goes to storage to create or restore the partition state
var inputQueuePosition = await this.State.CreateOrRestoreAsync(this, this.ErrorHandler, inputQueueFingerprint).ConfigureAwait(false);
await using (new PartitionTimeout(errorHandler, "partition startup", TimeSpan.FromMinutes(this.Settings.PartitionStartupTimeoutMinutes)))
{
// create or restore partition state from last snapshot
// create the state
this.State = ((TransportAbstraction.IHost)this.host).StorageLayer.CreatePartitionState(parameters);

// start processing the timers
this.PendingTimers.Start($"Timer{this.PartitionId:D2}");
// initialize timer for this partition
this.PendingTimers = new BatchTimer<PartitionEvent>(this.ErrorHandler.Token, this.TimersFired);

// start processing the worker queues
this.State.StartProcessing();
// goes to storage to create or restore the partition state
inputQueuePosition = await this.State.CreateOrRestoreAsync(this, this.ErrorHandler, inputQueueFingerprint).ConfigureAwait(false);

// start processing the timers
this.PendingTimers.Start($"Timer{this.PartitionId:D2}");

// start processing the worker queues
this.State.StartProcessing();
}

this.TraceHelper.TracePartitionProgress("Started", ref this.LastTransition, this.CurrentTimeMs, $"nextInputQueuePosition={inputQueuePosition.Item1}.{inputQueuePosition.Item2}");
return inputQueuePosition;
Expand Down
59 changes: 59 additions & 0 deletions src/DurableTask.Netherite/Util/PartitionTimeout.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

namespace DurableTask.Netherite
{
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

/// <summary>
/// A utility class for terminating the partition if some task takes too long.
/// Implemented as a disposable, with <see cref="IAsyncDisposable.DisposeAsync"> used to cancel the timeout.
/// </summary>
class PartitionTimeout : IAsyncDisposable
{
readonly CancellationTokenSource tokenSource;
readonly Task timeoutTask;

public PartitionTimeout(IPartitionErrorHandler errorHandler, string task, TimeSpan timeout)
{
this.tokenSource = new CancellationTokenSource();
this.timeoutTask = Task.Run(async () =>
{
try
{
await Task.Delay(timeout, this.tokenSource.Token).ConfigureAwait(false);
}
catch (OperationCanceledException)
{
// we did not time out
return;
}

errorHandler.HandleError(
$"{nameof(PartitionTimeout)}",
$"{task} timed out after {timeout}",
e: null,
terminatePartition: true,
reportAsWarning: false);
});
}

public async ValueTask DisposeAsync()
{
// cancel the timeout task (if it has not already completed)
this.tokenSource.Cancel();

// wait for the timeouttask to complete here, so we can be sure that the
// decision about the timeout firing or not firing has been made
// before we leave this method
await this.timeoutTask.ConfigureAwait(false);

// we can dispose the token source now since the timeoutTask is completed
this.tokenSource.Dispose();
}
}
}

0 comments on commit fc777fe

Please sign in to comment.