Commit 68fb3e7

Implement detection and potential mitigation of recovery failure cycles (#435)

* implement detection of repeated recovery failures, and add triggers to boost tracing and disable prefetch during replay

* add comments as per PR feedback

* fix the disabling of prefetch, make more readable, improve tracing

* disable prefetch on every other attempt only, to make sure we are not permanently breaking anything
sebastianburckhardt authored Oct 31, 2024
1 parent c9a0b73 commit 68fb3e7
Showing 8 changed files with 132 additions and 37 deletions.
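
Taken together, the changes implement an escalation policy keyed to a per-checkpoint recovery-attempt counter: after a few consecutive failed recoveries the partition boosts its tracing, and after several more it starts disabling prefetch on alternating attempts. A minimal C# sketch of that policy, using the thresholds from the hunks below (the helper itself is illustrative, not code from this commit):

    // Illustrative summary of the mitigation policy; the thresholds (3, 10, 6)
    // match the constants introduced in the diff below.
    static (bool boostTracing, bool disablePrefetch) MitigationFor(int recoveryAttempts)
    {
        const int StartBoostingAfter = 3;          // boost tracing after three failed attempts
        const int BoostFor = 10;                   // stop boosting once it is unlikely to reveal anything new
        const int StartDisablingPrefetchAfter = 6; // then disable prefetch on every other attempt

        bool boostTracing = recoveryAttempts > StartBoostingAfter
                         && recoveryAttempts <= StartBoostingAfter + BoostFor;
        bool disablePrefetch = recoveryAttempts > StartDisablingPrefetchAfter
                            && (recoveryAttempts - StartDisablingPrefetchAfter) % 2 == 1;
        return (boostTracing, disablePrefetch);
    }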
@@ -206,6 +206,11 @@ public class NetheriteOrchestrationServiceSettings
/// </summary>
public int PartitionStartupTimeoutMinutes { get; set; } = 15;

/// <summary>
/// If true, disables prefetching during replay.
/// </summary>
public bool DisablePrefetchDuringReplay { get; set; } = false;

/// <summary>
/// Allows attaching additional checkers and debuggers during testing.
/// </summary>
@@ -42,6 +42,8 @@ partial class BlobManager : ICheckpointManager, ILogCommitManager
BlobUtilsV12.BlockBlobClients eventLogCommitBlob;
BlobLeaseClient leaseClient;

BlobUtilsV12.BlockBlobClients checkpointCompletedBlob;

BlobUtilsV12.BlobDirectory pageBlobPartitionDirectory;
BlobUtilsV12.BlobDirectory blockBlobPartitionDirectory;

@@ -419,6 +421,7 @@ public async Task StartAsync()

this.eventLogCommitBlob = this.blockBlobPartitionDirectory.GetBlockBlobClient(CommitBlobName);
this.leaseClient = this.eventLogCommitBlob.WithRetries.GetBlobLeaseClient();
this.checkpointCompletedBlob = this.blockBlobPartitionDirectory.GetBlockBlobClient(this.GetCheckpointCompletedBlobName());

AzureStorageDevice createDevice(string name) =>
new AzureStorageDevice(name, this.blockBlobPartitionDirectory.GetSubDirectory(name), this.pageBlobPartitionDirectory.GetSubDirectory(name), this, true);
@@ -1057,10 +1060,11 @@ IEnumerable<Guid> ICheckpointManager.GetLogCheckpointTokens()

internal async Task<bool> FindCheckpointsAsync(bool logIsEmpty)
{
BlobUtilsV12.BlockBlobClients checkpointCompletedBlob = default;
string jsonString = null;
DateTimeOffset lastModified = default;

try
{
string jsonString = null;

if (this.UseLocalFiles)
{
@@ -1076,24 +1080,24 @@ internal async Task<bool> FindCheckpointsAsync(bool logIsEmpty)
else
{
var partDir = this.blockBlobPartitionDirectory;
checkpointCompletedBlob = partDir.GetBlockBlobClient(this.GetCheckpointCompletedBlobName());

await this.PerformWithRetriesAsync(
semaphore: null,
requireLease: true,
"BlockBlobClient.DownloadContentAsync",
"FindCheckpointsAsync",
"",
checkpointCompletedBlob.Name,
this.checkpointCompletedBlob.Name,
1000,
true,
failIfReadonly: false,
async (numAttempts) =>
{
try
{
Azure.Response<BlobDownloadResult> downloadResult = await checkpointCompletedBlob.WithRetries.DownloadContentAsync();
Azure.Response<BlobDownloadResult> downloadResult = await this.checkpointCompletedBlob.WithRetries.DownloadContentAsync();
jsonString = downloadResult.Value.Content.ToString();
lastModified = downloadResult.Value.Details.LastModified;
this.CheckpointInfoETag = downloadResult.Value.Details.ETag;
return 1;
}
@@ -1105,22 +1109,65 @@ await this.PerformWithRetriesAsync(
});
}

if (jsonString == null)
{
return false;
}
else
{
// read the fields from the json to update the checkpoint info
JsonConvert.PopulateObject(jsonString, this.CheckpointInfo);
return true;
}
}
catch (Exception e)
{
this.HandleStorageError(nameof(FindCheckpointsAsync), "could not find any checkpoint", checkpointCompletedBlob.Name, e, true, this.PartitionErrorHandler.IsTerminated);
this.HandleStorageError(nameof(FindCheckpointsAsync), "could not find any checkpoint", this.checkpointCompletedBlob.Name, e, true, this.PartitionErrorHandler.IsTerminated);
throw;
}

if (jsonString == null)
{
return false;
}

try
{
// read the fields from the json to update the checkpoint info
JsonConvert.PopulateObject(jsonString, this.CheckpointInfo);
}
catch (JsonException e)
{
this.HandleStorageError(nameof(FindCheckpointsAsync), "could not parse json file describing last checkpoint", this.checkpointCompletedBlob.Name, e, true, false);
throw;
}

if (this.CheckpointInfo.RecoveryAttempts > 0 || DateTimeOffset.UtcNow - lastModified > TimeSpan.FromMinutes(5))
{
this.CheckpointInfo.RecoveryAttempts++;

this.TraceHelper.FasterProgress($"Incremented recovery attempt counter to {this.CheckpointInfo.RecoveryAttempts} in {this.checkpointCompletedBlob.Name}.");

await this.WriteCheckpointMetadataAsync();

// we start to boost the tracing after three failed attempts. This boosting applies to the recovery part only.
int StartBoostingAfter = 3;

// After some number of boosted attempts, we stop boosting since it seems unlikely that we will find new information.
int BoostFor = 10;

if (this.CheckpointInfo.RecoveryAttempts > StartBoostingAfter
&& this.CheckpointInfo.RecoveryAttempts <= StartBoostingAfter + BoostFor)
{
this.TraceHelper.BoostTracing = true;
}
}

return true;
}

public async Task ClearRecoveryAttempts()
{
if (this.CheckpointInfo.RecoveryAttempts > 0)
{
this.CheckpointInfo.RecoveryAttempts = 0;

this.TraceHelper.BoostTracing = false;

await this.WriteCheckpointMetadataAsync();

this.TraceHelper.FasterProgress($"Cleared recovery attempt counter in {this.checkpointCompletedBlob.Name}.");
}
}

void ICheckpointManager.CommitIndexCheckpoint(Guid indexToken, byte[] commitMetadata)
@@ -1436,7 +1483,7 @@ await this.PerformWithRetriesAsync(
}
}

internal async Task FinalizeCheckpointCompletedAsync()
internal async Task WriteCheckpointMetadataAsync()
{
var jsonText = JsonConvert.SerializeObject(this.CheckpointInfo, Formatting.Indented);
if (this.UseLocalFiles)
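
The guard in FindCheckpointsAsync counts a recovery as part of a suspected failure cycle when a previous attempt already bumped the counter, or when the checkpoint metadata blob has gone stale (unmodified for more than the five-minute window above). Because the incremented counter is written back via WriteCheckpointMetadataAsync before recovery proceeds, a crash mid-recovery still leaves evidence for the next attempt, and ClearRecoveryAttempts resets the counter only once recovery completes. A condensed restatement, with an illustrative helper name:

    using System;

    // Condensed restatement of the detection rule; the condition and the
    // 5-minute window are taken from the diff above.
    static bool CountsAsSuspectedFailureCycle(int recoveryAttempts, DateTimeOffset lastModified)
        => recoveryAttempts > 0
        || DateTimeOffset.UtcNow - lastModified > TimeSpan.FromMinutes(5);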
@@ -10,25 +10,46 @@ namespace DurableTask.Netherite.Faster
[JsonObject]
class CheckpointInfo
{
/// <summary>
/// The FasterKV token for the last index checkpoint taken before this checkpoint.
/// </summary>
[JsonProperty]
public Guid IndexToken { get; set; }

/// <summary>
/// The FasterKV token for this checkpoint.
/// </summary>
[JsonProperty]
public Guid LogToken { get; set; }

/// <summary>
/// The FasterLog position for this checkpoint.
/// </summary>
[JsonProperty]
public long CommitLogPosition { get; set; }

/// <summary>
/// The input queue (event hubs) position for this checkpoint.
/// </summary>
[JsonProperty]
public long InputQueuePosition { get; set; }

/// <summary>
/// If the input queue position is a batch, the position within the batch.
/// </summary>
[JsonProperty]
public int InputQueueBatchPosition { get; set; }

/// <summary>
/// The input queue fingerprint for this checkpoint.
/// </summary>
[JsonProperty]
public string InputQueueFingerprint { get; set; }

[JsonProperty]
public long NumberInstances { get; set; }
/// <summary>
/// The number of recovery attempts that have been made for this checkpoint.
/// </summary>
//[JsonProperty]
public int RecoveryAttempts { get; set; }
}
}
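
Note that RecoveryAttempts still round-trips through the checkpoint metadata even with its [JsonProperty] attribute commented out: the class is marked [JsonObject] without MemberSerialization.OptIn, and Json.NET's default opt-out behavior serializes all public properties. A small sketch, assuming the CheckpointInfo class above is in scope:

    using Newtonsoft.Json;

    // RecoveryAttempts serializes without an explicit [JsonProperty] because
    // [JsonObject] defaults to opt-out member serialization.
    var info = new CheckpointInfo { RecoveryAttempts = 4 };
    string json = JsonConvert.SerializeObject(info, Formatting.Indented);

    var restored = new CheckpointInfo();
    JsonConvert.PopulateObject(json, restored);
    // restored.RecoveryAttempts is now 4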
4 changes: 2 additions & 2 deletions src/DurableTask.Netherite/StorageLayer/Faster/FasterKV.cs
@@ -380,7 +380,7 @@ void RunTask() {

public override async Task FinalizeCheckpointCompletedAsync(Guid guid)
{
await this.blobManager.FinalizeCheckpointCompletedAsync();
await this.blobManager.WriteCheckpointMetadataAsync();

if (this.cacheDebugger == null)
{
@@ -739,7 +739,7 @@ public override async Task RunPrefetchSession(IAsyncEnumerable<TrackedObjectKey>
long lastReport = 0;
void ReportProgress(int elapsedMillisecondsThreshold)
{
if (stopwatch.ElapsedMilliseconds - lastReport >= elapsedMillisecondsThreshold)
if (stopwatch.ElapsedMilliseconds - lastReport >= elapsedMillisecondsThreshold || this.TraceHelper.BoostTracing)
{
this.blobManager.TraceHelper.FasterProgress(
$"FasterKV PrefetchSession {sessionId} elapsed={stopwatch.Elapsed.TotalSeconds:F2}s issued={numberIssued} pending={maxConcurrency - prefetchSemaphore.CurrentCount} hits={numberHits} misses={numberMisses}");
@@ -25,7 +25,9 @@ public FasterTraceHelper(ILogger logger, LogLevel logLevelLimit, ILogger perform
this.partitionId = (int) partitionId;
}

public bool IsTracingAtMostDetailedLevel => this.logLevelLimit == LogLevel.Trace;
public bool IsTracingAtMostDetailedLevel => this.logLevelLimit == LogLevel.Trace || this.BoostTracing;

public bool BoostTracing { get; set; }

// ----- faster storage layer events

@@ -139,7 +141,7 @@ public void FasterProgress(Func<string> constructString)

public void FasterStorageProgress(string details)
{
if (this.logLevelLimit <= LogLevel.Trace)
if (this.logLevelLimit <= LogLevel.Trace || this.BoostTracing)
{
this.logger.LogTrace("Part{partition:D2} {details}", this.partitionId, details);
EtwSource.Log.FasterStorageProgress(this.account, this.taskHub, this.partitionId, details, TraceUtils.AppName, TraceUtils.ExtensionVersion);
28 changes: 21 additions & 7 deletions src/DurableTask.Netherite/StorageLayer/Faster/LogWorker.cs
@@ -207,20 +207,24 @@ protected override async Task Process(IList<PartitionUpdateEvent> batch)
}
}

public async Task ReplayCommitLog(long from, StoreWorker worker)
public async Task ReplayCommitLog(long from, StoreWorker worker, bool enablePrefetch)
{
// this procedure is called by StoreWorker during recovery. It replays all the events
// that were committed to the log but are not reflected in the loaded store checkpoint.
try
{
// we create a pipeline where the fetch task obtains a stream of events and then duplicates the
// stream, so it can get replayed and prefetched in parallel.
var prefetchChannel = Channel.CreateBounded<TrackedObjectKey>(1000);
var prefetchChannel = enablePrefetch ? Channel.CreateBounded<TrackedObjectKey>(1000) : null;
var replayChannel = Channel.CreateBounded<PartitionUpdateEvent>(1000);

var fetchTask = this.FetchEvents(from, replayChannel.Writer, prefetchChannel.Writer);
var fetchTask = this.FetchEvents(from, replayChannel.Writer, prefetchChannel?.Writer);
var replayTask = Task.Run(() => this.ReplayEvents(replayChannel.Reader, worker));
var prefetchTask = Task.Run(() => worker.RunPrefetchSession(prefetchChannel.Reader.ReadAllAsync(this.cancellationToken)));

if (enablePrefetch)
{
var prefetchTask = Task.Run(() => worker.RunPrefetchSession(prefetchChannel.Reader.ReadAllAsync(this.cancellationToken)));
}

await fetchTask;
await replayTask;
@@ -241,23 +245,33 @@ async Task FetchEvents(long from, ChannelWriter<PartitionUpdateEvent> replayChan

await replayChannelWriter.WriteAsync(partitionEvent);

if (partitionEvent is IRequiresPrefetch evt)
if (prefetchChannelWriter != null && partitionEvent is IRequiresPrefetch evt)
{
foreach (var key in evt.KeysToPrefetch)
{
if (this.traceHelper.BoostTracing)
{
this.traceHelper.FasterProgress($"Replay Prefetches {key}");
}

await prefetchChannelWriter.WriteAsync(key);
}
}
}

replayChannelWriter.Complete();
prefetchChannelWriter.Complete();
prefetchChannelWriter?.Complete();
}

async Task ReplayEvents(ChannelReader<PartitionUpdateEvent> reader, StoreWorker worker)
{
await foreach (var partitionEvent in reader.ReadAllAsync(this.cancellationToken))
{
if (this.traceHelper.BoostTracing)
{
this.traceHelper.FasterProgress($"Replaying PartitionEvent {partitionEvent.NextCommitLogPosition}");
}

await worker.ReplayUpdate(partitionEvent);
}
}
@@ -285,7 +299,7 @@ async IAsyncEnumerable<PartitionUpdateEvent> EventsToReplay(long from)
await iter.WaitAsync(this.cancellationToken);
}

if (this.traceLogDetails)
if (this.traceHelper.IsTracingAtMostDetailedLevel)
{
this.TraceLogDetail("Read", iter.NextAddress, new ReadOnlySpan<byte>(result, 0, entryLength));
}
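
ReplayCommitLog duplicates the fetched event stream into two bounded channels so replay and prefetch can proceed in parallel; when prefetch is disabled, the prefetch channel is never created and the prefetch task (fire-and-forget even when enabled, since only fetch and replay are awaited) is never started. A self-contained sketch of that pipeline shape, with placeholder int events standing in for PartitionUpdateEvent and TrackedObjectKey:

    using System.Linq;
    using System.Threading.Channels;
    using System.Threading.Tasks;

    bool enablePrefetch = true;

    var replayChannel = Channel.CreateBounded<int>(1000);
    var prefetchChannel = enablePrefetch ? Channel.CreateBounded<int>(1000) : null;

    var fetchTask = Task.Run(async () =>
    {
        foreach (var evt in Enumerable.Range(0, 10_000))
        {
            await replayChannel.Writer.WriteAsync(evt);       // replay sees every event
            if (prefetchChannel != null)
                await prefetchChannel.Writer.WriteAsync(evt); // prefetch only when enabled
        }
        replayChannel.Writer.Complete();
        prefetchChannel?.Writer.Complete();
    });

    var replayTask = Task.Run(async () =>
    {
        await foreach (var evt in replayChannel.Reader.ReadAllAsync()) { /* apply update */ }
    });

    if (enablePrefetch)
    {
        // fire-and-forget, mirroring ReplayCommitLog above
        _ = Task.Run(async () =>
        {
            await foreach (var key in prefetchChannel.Reader.ReadAllAsync()) { /* warm the cache */ }
        });
    }

    await fetchTask;
    await replayTask;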
14 changes: 10 additions & 4 deletions src/DurableTask.Netherite/StorageLayer/Faster/PartitionStorage.cs
@@ -20,9 +20,6 @@ class PartitionStorage : IPartitionState
readonly ILogger logger;
readonly ILogger performanceLogger;
readonly MemoryTracker memoryTracker;
//readonly CloudStorageAccount storageAccount;
//readonly string localFileDirectory;
//readonly CloudStorageAccount pageBlobStorageAccount;

Partition partition;
BlobManager blobManager;
@@ -193,7 +190,14 @@ async Task TerminationWrapper(Task what)
if (this.log.TailAddress > (long)this.storeWorker.CommitLogPosition)
{
// replay log as the store checkpoint lags behind the log
await this.TerminationWrapper(this.storeWorker.ReplayCommitLog(this.logWorker));

// after six unsuccessful attempts, we start disabling prefetch on every other attempt, to see if this can remedy the problem
int startDisablingPrefetchAfter = 6;

bool disablePrefetch = this.settings.DisablePrefetchDuringReplay
|| (this.blobManager.CheckpointInfo.RecoveryAttempts > startDisablingPrefetchAfter && (this.blobManager.CheckpointInfo.RecoveryAttempts - startDisablingPrefetchAfter) % 2 == 1);

await this.TerminationWrapper(this.storeWorker.ReplayCommitLog(this.logWorker, prefetch: !disablePrefetch));
}
}
catch (OperationCanceledException) when (this.partition.ErrorHandler.IsTerminated)
Expand All @@ -215,6 +219,8 @@ async Task TerminationWrapper(Task what)
}

this.TraceHelper.FasterProgress("Recovery complete");

await this.blobManager.ClearRecoveryAttempts();
}
this.blobManager.FaultInjector?.Started(this.blobManager);
return this.storeWorker.InputQueuePosition;
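
The every-other-attempt rule keeps prefetch enabled through the first six attempts and then disables it on attempts 7, 9, 11, and so on, so both configurations keep getting exercised rather than prefetch being turned off for good. A quick check of the arithmetic:

    using System;

    // Enumerates which recovery attempts disable prefetch under the rule above
    // (assuming DisablePrefetchDuringReplay is false).
    const int startDisablingPrefetchAfter = 6;
    for (int attempt = 1; attempt <= 12; attempt++)
    {
        bool disable = attempt > startDisablingPrefetchAfter
                    && (attempt - startDisablingPrefetchAfter) % 2 == 1;
        Console.WriteLine($"attempt {attempt,2}: prefetch {(disable ? "disabled" : "enabled")}");
    }
    // prints "disabled" for attempts 7, 9, 11 and "enabled" otherwise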
6 changes: 3 additions & 3 deletions src/DurableTask.Netherite/StorageLayer/Faster/StoreWorker.cs
@@ -681,16 +681,16 @@ protected override async Task Process(IList<PartitionEvent> batch)
return target;
}

public async Task ReplayCommitLog(LogWorker logWorker)
public async Task ReplayCommitLog(LogWorker logWorker, bool prefetch)
{
var startPosition = this.CommitLogPosition;
this.traceHelper.FasterProgress($"Replaying log from {startPosition}");
this.traceHelper.FasterProgress($"Replaying log from {startPosition} prefetch={prefetch} boostTracing={this.traceHelper.BoostTracing}");

var stopwatch = new System.Diagnostics.Stopwatch();
stopwatch.Start();

this.effectTracker.IsReplaying = true;
await logWorker.ReplayCommitLog(startPosition, this);
await logWorker.ReplayCommitLog(startPosition, this, prefetch);
stopwatch.Stop();
this.effectTracker.IsReplaying = false;

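
For deployments where prefetch during replay is suspect regardless of the attempt counter, the new setting turns it off unconditionally. A sketch of the opt-out; only the DisablePrefetchDuringReplay property comes from this commit, and the surrounding configuration is assumed:

    // Illustrative: force prefetch off for every replay, bypassing the
    // every-other-attempt heuristic (property from this commit; other
    // settings omitted).
    var settings = new NetheriteOrchestrationServiceSettings
    {
        DisablePrefetchDuringReplay = true,
    };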
