Skip to content

Commit

Permalink
Terminate partition when FASTER refuses to checkpoint for over a minu…
Browse files Browse the repository at this point in the history
…te (#301)
  • Loading branch information
davidmrdavid authored Oct 19, 2023
1 parent 6effbf5 commit ba15eed
Show file tree
Hide file tree
Showing 4 changed files with 205 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public PartitionErrorHandler(int partitionId, ILogger logger, LogLevel logLevelL
this.host = host;
}

public void HandleError(string context, string message, Exception exception, bool terminatePartition, bool isWarning)
public void HandleError(string context, string message, Exception? exception, bool terminatePartition, bool isWarning)
{
bool isFatal = exception != null && Utils.IsFatal(exception);

Expand Down Expand Up @@ -94,7 +94,7 @@ public void TerminateNormally()
}
}

void TraceError(bool isWarning, string context, string message, Exception exception, bool terminatePartition)
void TraceError(bool isWarning, string context, string message, Exception? exception, bool terminatePartition)
{
var logLevel = isWarning ? LogLevel.Warning : LogLevel.Error;
if (this.logLevelLimit <= logLevel)
Expand Down
37 changes: 34 additions & 3 deletions src/DurableTask.Netherite/StorageLayer/Faster/FasterKV.cs
Original file line number Diff line number Diff line change
Expand Up @@ -484,7 +484,11 @@ long MinimalLogSize
public override async Task<long> RunCompactionAsync(long target)
{
string id = DateTime.UtcNow.ToString("O"); // for tracing purposes

this.blobManager.TraceHelper.FasterProgress($"Compaction {id} is requesting to enter semaphore with {maxCompactionThreads.CurrentCount} threads available");
await maxCompactionThreads.WaitAsync();
this.blobManager.TraceHelper.FasterProgress($"Compaction {id} entered semaphore");

try
{
long beginAddressBeforeCompaction = this.Log.BeginAddress;
Expand All @@ -499,20 +503,47 @@ public override async Task<long> RunCompactionAsync(long target)
target - this.Log.BeginAddress,
this.GetElapsedCompactionMilliseconds());

var tokenSource = new CancellationTokenSource();
var timeoutTask = Task.Delay(TimeSpan.FromMinutes(10), tokenSource.Token);
var tcs = new TaskCompletionSource<long>(TaskCreationOptions.RunContinuationsAsynchronously);
var thread = TrackedThreads.MakeTrackedThread(RunCompaction, $"Compaction.{id}");
thread.Start();

var winner = await Task.WhenAny(tcs.Task, timeoutTask);

if (winner == timeoutTask)
{
// compaction timed out. Terminate partition
var exceptionMessage = $"Compaction {id} time out";
this.partition.ErrorHandler.HandleError(nameof(RunCompactionAsync), exceptionMessage, e: null, terminatePartition: true, reportAsWarning: true);

// we need resolve the task to ensure the 'finally' block is executed which frees up another thread to start compating
tcs.TrySetException(new OperationCanceledException(exceptionMessage));
}
else
{
// cancel the timeout task since compaction completed
tokenSource.Cancel();
}

await timeoutTask.ContinueWith(_ => tokenSource.Dispose());

// return result of compaction task
return await tcs.Task;

void RunCompaction()
{
try
{

this.blobManager.TraceHelper.FasterProgress($"Compaction {id} started");

var session = this.CreateASession($"compaction-{id}", true);

this.blobManager.TraceHelper.FasterProgress($"Compaction {id} obtained a FASTER session");
using (this.TrackTemporarySession(session))
{
this.blobManager.TraceHelper.FasterProgress($"Compaction {id} is invoking FASTER's compaction routine");
long compactedUntil = session.Compact(target, CompactionType.Scan);

this.TraceHelper.FasterCompactionProgress(
Expand All @@ -525,17 +556,17 @@ void RunCompaction()
this.Log.BeginAddress - beginAddressBeforeCompaction,
this.GetElapsedCompactionMilliseconds());

tcs.SetResult(compactedUntil);
tcs.TrySetResult(compactedUntil);
}
}
catch (Exception exception)
when (this.terminationToken.IsCancellationRequested && !Utils.IsFatal(exception))
{
tcs.SetException(new OperationCanceledException("Partition was terminated.", exception, this.terminationToken));
tcs.TrySetException(new OperationCanceledException("Partition was terminated.", exception, this.terminationToken));
}
catch (Exception e)
{
tcs.SetException(e);
tcs.TrySetException(e);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,15 @@ public void FasterProgress(string details)
}
}

public void FasterProgress(Func<string> constructString)
{
if (this.logLevelLimit <= LogLevel.Debug)
{
var details = constructString();
this.FasterProgress(details);
}
}

public void FasterStorageProgress(string details)
{
if (this.logLevelLimit <= LogLevel.Trace)
Expand Down
223 changes: 160 additions & 63 deletions src/DurableTask.Netherite/StorageLayer/Faster/StoreWorker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ class StoreWorker : BatchWorker<PartitionEvent>
readonly EffectTracker effectTracker;

bool isShuttingDown;
DateTime? timeOfFirstRefusedCheckpoint;

public string InputQueueFingerprint { get; private set; }
public (long,int) InputQueuePosition { get; private set; }
Expand All @@ -30,15 +31,15 @@ class StoreWorker : BatchWorker<PartitionEvent>

// periodic index and store checkpointing
CheckpointTrigger pendingCheckpointTrigger;
Task pendingIndexCheckpoint;
Task<(long, (long,int))> pendingStoreCheckpoint;
Task? pendingIndexCheckpoint;
Task<(long, (long,int))>? pendingStoreCheckpoint;
(long,int) lastCheckpointedInputQueuePosition;
long lastCheckpointedCommitLogPosition;
long numberEventsSinceLastCheckpoint;
DateTime timeOfNextIdleCheckpoint;

// periodic compaction
Task<long?> pendingCompaction;
Task<long?>? pendingCompaction;

// periodic load publishing
PartitionLoadInfo loadInfo;
Expand Down Expand Up @@ -281,6 +282,50 @@ internal enum CheckpointTrigger
Idle
}

void LogCheckpointStats()
{
long inputQueuePositionLag = this.GetInputQueuePositionLag();

// since this is a pure function, we declare it as local static for improved performance
static string ReportNullableTaskStatus(Task? t)
{
if (t == null)
{
return "is null";
}
else
{
return t.IsCompleted ? "is completed" : "is not completed";
}
};

// since the statistics log is only emitted if the trace level is at least "Debug",
// we defer the construction of the string to relieve GC pressure
string ConstructLogString()
{

var log = $"Checkpoint statistics: " +
$"LastCheckpointedCommitLogPosition={this.lastCheckpointedCommitLogPosition}, " +
$"MaxNumberBytesBetweenCheckpoints={this.partition.Settings.MaxNumberBytesBetweenCheckpoints}, " +
$"CommitLogPosition={this.CommitLogPosition}, " +
$"NumberEventsSinceLastCheckpoint={this.numberEventsSinceLastCheckpoint}, " +
$"MaxNumberEventsBetweenCheckpoints={this.partition.Settings.MaxNumberEventsBetweenCheckpoints}, " +
$"InputQueuePositionLag={inputQueuePositionLag}," +
$"TimeOfNextIdleCheckpoint={this.timeOfNextIdleCheckpoint}, " +
$"TimeOfFirstRefusedCheckpoint={this.timeOfFirstRefusedCheckpoint}, " +
$"PendingCompaction status={ReportNullableTaskStatus(this.pendingCompaction)}, " +
$"PendingIndexCheckpoint status={ReportNullableTaskStatus(this.pendingIndexCheckpoint)}, " +
$"PendingStoreCheckpoint status={ReportNullableTaskStatus(this.pendingStoreCheckpoint)}";
return log;
}
this.traceHelper.FasterProgress(ConstructLogString);
}

long GetInputQueuePositionLag()
{
return this.InputQueuePosition.Item1 - Math.Max(this.lastCheckpointedInputQueuePosition.Item1, this.LogWorker.LastCommittedInputQueuePosition);
}

bool CheckpointDue(out CheckpointTrigger trigger, out long? compactUntil)
{
// in a test setting, let the test decide when to checkpoint or compact
Expand All @@ -292,8 +337,7 @@ bool CheckpointDue(out CheckpointTrigger trigger, out long? compactUntil)
trigger = CheckpointTrigger.None;
compactUntil = null;

long inputQueuePositionLag =
this.InputQueuePosition.Item1 - Math.Max(this.lastCheckpointedInputQueuePosition.Item1, this.LogWorker.LastCommittedInputQueuePosition);
long inputQueuePositionLag = this.GetInputQueuePositionLag();

if (this.lastCheckpointedCommitLogPosition + this.partition.Settings.MaxNumberBytesBetweenCheckpoints <= this.CommitLogPosition)
{
Expand Down Expand Up @@ -349,7 +393,114 @@ void ScheduleNextIdleCheckpointTime()
var actual = (((earliest - offset - 1) / period) + 1) * period + offset;
this.timeOfNextIdleCheckpoint = new DateTime(actual, DateTimeKind.Utc);
}


void StartCheckpointOrFailOnTimeout(Func<bool> checkpointRoutine, string messageOnError)
{
var checkpointStarted = checkpointRoutine.Invoke();
if (checkpointStarted)
{
this.timeOfFirstRefusedCheckpoint = null;
}
else
{
// track start of FASTER refusal to start checkpoint
var currentTime = DateTime.UtcNow;
this.timeOfFirstRefusedCheckpoint ??= currentTime;

// if the refusal to checkpoint started over a minute ago, terminate partition
TimeSpan duration = currentTime - this.timeOfFirstRefusedCheckpoint.Value;
if (duration > TimeSpan.FromMinutes(1))
{
messageOnError += $". FASTER first refused to checkpoint at '{this.timeOfFirstRefusedCheckpoint}'. Duration of refusal = {duration}. Terminating partition.";
this.partition.ErrorHandler.HandleError(nameof(StartCheckpointOrFailOnTimeout), messageOnError, e: null, terminatePartition: true, reportAsWarning: false);
}
}
}

async ValueTask RunCheckpointingStateMachine()
{
// handle progression of checkpointing state machine: none -> pendingCompaction -> pendingIndexCheckpoint -> pendingStoreCheckpoint -> none)
if (this.pendingStoreCheckpoint != null)
{
if (this.pendingStoreCheckpoint.IsCompleted == true)
{
this.traceHelper.FasterProgress("Checkpointing state machine: pendingStorecheckpoint has completed.");
(this.lastCheckpointedCommitLogPosition, this.lastCheckpointedInputQueuePosition)
= await this.pendingStoreCheckpoint; // observe exceptions here

// force collection of memory used during checkpointing
GC.Collect();

this.traceHelper.FasterProgress("Checkpointing state machine: resetting to initial state");
// we have reached the end of the state machine transitions
this.pendingStoreCheckpoint = null;
this.pendingCheckpointTrigger = CheckpointTrigger.None;
this.ScheduleNextIdleCheckpointTime();
this.partition.Settings.TestHooks?.CheckpointInjector?.SequenceComplete((this.store as FasterKV).Log);
}
}
else if (this.pendingIndexCheckpoint != null)
{
if (this.pendingIndexCheckpoint.IsCompleted == true)
{
this.traceHelper.FasterProgress("Checkpointing state machine: pendingIndexCheckpoint has completed");
await this.pendingIndexCheckpoint; // observe exceptions here

// the store checkpoint is next
this.StartCheckpointOrFailOnTimeout(
checkpointRoutine: () =>
{
var token = this.store.StartStoreCheckpoint(this.CommitLogPosition, this.InputQueuePosition, this.InputQueueFingerprint, null);
if (token.HasValue)
{
this.traceHelper.FasterProgress("Checkpointing state machine: store checkpoint started");
this.pendingIndexCheckpoint = null;
this.pendingStoreCheckpoint = this.WaitForCheckpointAsync(false, token.Value, true);
this.numberEventsSinceLastCheckpoint = 0;
}
var checkpointStarted = token.HasValue;
return checkpointStarted;

},
messageOnError: "Could not start store checkpoint before timeout");
}
}
else if (this.pendingCompaction != null)
{
if (this.pendingCompaction.IsCompleted == true)
{
this.traceHelper.FasterProgress("Checkpointing state machine: pendingCompaction has completed");
await this.pendingCompaction; // observe exceptions here

// force collection of memory used during compaction
GC.Collect();

// the index checkpoint is next
this.StartCheckpointOrFailOnTimeout(
checkpointRoutine: () =>
{
var token = this.store.StartIndexCheckpoint();
if (token.HasValue)
{
this.traceHelper.FasterProgress("Checkpointing state machine: index checkpoint started");
this.pendingCompaction = null;
this.pendingIndexCheckpoint = this.WaitForCheckpointAsync(true, token.Value, false);
}
var checkpointStarted = token.HasValue;
return checkpointStarted;
},
messageOnError: "Could not start index checkpoint before timeout");
}
}
else if (this.CheckpointDue(out var trigger, out long? compactUntil))
{
this.traceHelper.FasterProgress($"Checkpointing state machine: checkpoint is due. Trigger='{trigger}'. compactUntil='{compactUntil}'");
this.pendingCheckpointTrigger = trigger;

this.pendingCompaction = this.RunCompactionAsync(compactUntil);
}
}

protected override async Task Process(IList<PartitionEvent> batch)
{
try
Expand Down Expand Up @@ -420,68 +571,15 @@ protected override async Task Process(IList<PartitionEvent> batch)
this.store.AdjustCacheSize();

// handle progression of checkpointing state machine: none -> pendingCompaction -> pendingIndexCheckpoint -> pendingStoreCheckpoint -> none)
if (this.pendingStoreCheckpoint != null)
{
if (this.pendingStoreCheckpoint.IsCompleted == true)
{
(this.lastCheckpointedCommitLogPosition, this.lastCheckpointedInputQueuePosition)
= await this.pendingStoreCheckpoint; // observe exceptions here

// force collection of memory used during checkpointing
GC.Collect();

// we have reached the end of the state machine transitions
this.pendingStoreCheckpoint = null;
this.pendingCheckpointTrigger = CheckpointTrigger.None;
this.ScheduleNextIdleCheckpointTime();
this.partition.Settings.TestHooks?.CheckpointInjector?.SequenceComplete((this.store as FasterKV).Log);
}
}
else if (this.pendingIndexCheckpoint != null)
{
if (this.pendingIndexCheckpoint.IsCompleted == true)
{
await this.pendingIndexCheckpoint; // observe exceptions here

// the store checkpoint is next
var token = this.store.StartStoreCheckpoint(this.CommitLogPosition, this.InputQueuePosition, this.InputQueueFingerprint, null);
if (token.HasValue)
{
this.pendingIndexCheckpoint = null;
this.pendingStoreCheckpoint = this.WaitForCheckpointAsync(false, token.Value, true);
this.numberEventsSinceLastCheckpoint = 0;
}
}
}
else if (this.pendingCompaction != null)
{
if (this.pendingCompaction.IsCompleted == true)
{
await this.pendingCompaction; // observe exceptions here

// force collection of memory used during compaction
GC.Collect();

// the index checkpoint is next
var token = this.store.StartIndexCheckpoint();
if (token.HasValue)
{
this.pendingCompaction = null;
this.pendingIndexCheckpoint = this.WaitForCheckpointAsync(true, token.Value, false);
}
}
}
else if (this.CheckpointDue(out var trigger, out long? compactUntil))
{
this.pendingCheckpointTrigger = trigger;
this.pendingCompaction = this.RunCompactionAsync(compactUntil);
}
await this.RunCheckpointingStateMachine();

// periodically publish the partition load information and the send/receive positions
// also report checkpointing stats
if (this.lastPublished + PublishInterval < DateTime.UtcNow)
{
this.lastPublished = DateTime.UtcNow;
await this.PublishLoadAndPositions();
this.LogCheckpointStats();
}

if (this.partition.NumberPartitions() > 1 && this.partition.Settings.ActivityScheduler == ActivitySchedulerOptions.Locavore)
Expand Down Expand Up @@ -558,7 +656,6 @@ protected override async Task Process(IList<PartitionEvent> batch)
if (target.HasValue)
{
target = await this.store.RunCompactionAsync(target.Value);

this.partition.Settings.TestHooks?.CheckpointInjector?.CompactionComplete(this.partition.ErrorHandler);
}

Expand Down

0 comments on commit ba15eed

Please sign in to comment.