Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add new logs to recovery path #337

Merged
merged 5 commits into from
Feb 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ public override void RemoveSegmentAsync(int segment, AsyncCallback callback, IAs
this.underLease,
"BlobBaseClient.DeleteAsync",
"DeleteDeviceSegment",
"",
$"id={id}",
entry.PageBlob.Default.Name,
5000,
true,
Expand Down Expand Up @@ -448,13 +448,14 @@ async Task WritePortionToBlobAsync(UnmanagedMemoryStream stream, BlobEntry blobE
{
using (stream)
{
var position = destinationAddress + offset;
long originalStreamPosition = stream.Position;
await this.BlobManager.PerformWithRetriesAsync(
BlobManager.AsynchronousStorageWriteMaxConcurrency,
true,
"PageBlobClient.UploadPagesAsync",
"WriteToDevice",
$"id={id} length={length} destinationAddress={destinationAddress + offset}",
$"id={id} position={position} length={length}",
blobEntry.PageBlob.Default.Name,
1000 + (int)length / 1000,
true,
Expand Down Expand Up @@ -498,19 +499,23 @@ unsafe Task ReadFromBlobUnsafeAsync(BlobUtilsV12.PageBlobClients blob, long sour

async Task ReadFromBlobAsync(UnmanagedMemoryStream stream, BlobUtilsV12.PageBlobClients blob, long sourceAddress, uint readLength, long id)
{
long readRangeStart = sourceAddress;
long readRangeEnd = readRangeStart + readLength;
string operationReadRange = $"[{readRangeStart}, {readRangeEnd}]";
using (stream)
{
long offset = 0;
while (readLength > 0)
{
var position = sourceAddress + offset;
var length = Math.Min(readLength, MAX_DOWNLOAD_SIZE);

await this.BlobManager.PerformWithRetriesAsync(
BlobManager.AsynchronousStorageReadMaxConcurrency,
true,
"PageBlobClient.DownloadStreamingAsync",
"ReadFromDevice",
$"id={id} readLength={length} sourceAddress={sourceAddress + offset}",
$"id={id} position={position} length={length} operationReadRange={operationReadRange}",
blob.Default.Name,
1000 + (int)length / 1000,
true,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1115,23 +1115,23 @@ void ICheckpointManager.CommitIndexCheckpoint(Guid indexToken, byte[] commitMeta
var metaFileBlob = partDir.GetBlockBlobClient(this.GetIndexCheckpointMetaBlobName(indexToken));

this.PerformWithRetries(
false,
"BlockBlobClient.OpenWrite",
"WriteIndexCheckpointMetadata",
$"token={indexToken} size={commitMetadata.Length}",
metaFileBlob.Name,
1000,
true,
(numAttempts) =>
{
var client = metaFileBlob.WithRetries;
using var blobStream = client.OpenWrite(overwrite: true);
using var writer = new BinaryWriter(blobStream);
writer.Write(commitMetadata.Length);
writer.Write(commitMetadata);
writer.Flush();
return (commitMetadata.Length, true);
});
false,
"BlockBlobClient.OpenWrite",
"WriteIndexCheckpointMetadata",
$"token={indexToken} size={commitMetadata.Length}",
metaFileBlob.Name,
1000,
true,
(numAttempts) =>
{
var client = metaFileBlob.WithRetries;
using var blobStream = client.OpenWrite(overwrite: true);
using var writer = new BinaryWriter(blobStream);
writer.Write(commitMetadata.Length);
writer.Write(commitMetadata);
writer.Flush();
return (commitMetadata.Length, true);
});

this.CheckpointInfo.IndexToken = indexToken;
this.StorageTracer?.FasterStorageProgress($"StorageOpReturned ICheckpointManager.CommitIndexCheckpoint, target={metaFileBlob.Name}");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public async Task PerformWithRetriesAsync(
bool requireLease,
string name,
string intent,
string data,
string details,
string target,
int expectedLatencyBound,
bool isCritical,
Expand Down Expand Up @@ -59,7 +59,7 @@ public async Task PerformWithRetriesAsync(

this.PartitionErrorHandler.Token.ThrowIfCancellationRequested();

this.StorageTracer?.FasterStorageProgress($"storage operation {name} ({intent}) started attempt {numAttempts}; target={target} {data}");
this.StorageTracer?.FasterStorageProgress($"storage operation {name} ({intent}) started attempt {numAttempts}; target={target} {details}");

stopwatch.Restart();

Expand All @@ -68,14 +68,14 @@ public async Task PerformWithRetriesAsync(
long size = await operationAsync(numAttempts).ConfigureAwait(false);

stopwatch.Stop();
this.StorageTracer?.FasterStorageProgress($"storage operation {name} ({intent}) succeeded on attempt {numAttempts}; target={target} latencyMs={stopwatch.Elapsed.TotalMilliseconds:F1} {data}");
this.StorageTracer?.FasterStorageProgress($"storage operation {name} ({intent}) succeeded on attempt {numAttempts}; target={target} latencyMs={stopwatch.Elapsed.TotalMilliseconds:F1} {details}");

if (stopwatch.ElapsedMilliseconds > expectedLatencyBound)
{
this.TraceHelper.FasterPerfWarning($"storage operation {name} ({intent}) took {stopwatch.Elapsed.TotalSeconds:F1}s on attempt {numAttempts}, which is excessive; {data}");
this.TraceHelper.FasterPerfWarning($"storage operation {name} ({intent}) took {stopwatch.Elapsed.TotalSeconds:F1}s on attempt {numAttempts}, which is excessive; {details}");
}

this.TraceHelper.FasterAzureStorageAccessCompleted(intent, size, name, target, stopwatch.Elapsed.TotalMilliseconds, numAttempts);
this.TraceHelper.FasterAzureStorageAccessCompleted(intent, size, name, details, target, stopwatch.Elapsed.TotalMilliseconds, numAttempts);

return;
}
Expand All @@ -91,7 +91,7 @@ public async Task PerformWithRetriesAsync(

if (BlobUtils.IsTimeout(e))
{
this.TraceHelper.FasterPerfWarning($"storage operation {name} ({intent}) timed out on attempt {numAttempts} after {stopwatch.Elapsed.TotalSeconds:F1}s, retrying now; target={target} {data}");
this.TraceHelper.FasterPerfWarning($"storage operation {name} ({intent}) timed out on attempt {numAttempts} after {stopwatch.Elapsed.TotalSeconds:F1}s, retrying now; target={target} {details}");
}
else
{
Expand All @@ -103,7 +103,7 @@ public async Task PerformWithRetriesAsync(
}
catch (Azure.RequestFailedException ex) when (BlobUtilsV12.PreconditionFailed(ex) && readETagAsync != null)
{
this.StorageTracer?.FasterStorageProgress($"storage operation {name} ({intent}) failed precondition on attempt {numAttempts}; target={target} latencyMs={stopwatch.Elapsed.TotalMilliseconds:F1} {data}");
this.StorageTracer?.FasterStorageProgress($"storage operation {name} ({intent}) failed precondition on attempt {numAttempts}; target={target} latencyMs={stopwatch.Elapsed.TotalMilliseconds:F1} {details}");
mustReadETagFirst = true;
continue;
}
Expand Down Expand Up @@ -134,7 +134,7 @@ public void PerformWithRetries(
bool requireLease,
string name,
string intent,
string data,
string details,
string target,
int expectedLatencyBound,
bool isCritical,
Expand All @@ -156,7 +156,7 @@ public void PerformWithRetries(

this.PartitionErrorHandler.Token.ThrowIfCancellationRequested();

this.StorageTracer?.FasterStorageProgress($"storage operation {name} ({intent}) started attempt {numAttempts}; target={target} {data}");
this.StorageTracer?.FasterStorageProgress($"storage operation {name} ({intent}) started attempt {numAttempts}; target={target} {details}");
stopwatch.Restart();

this.FaultInjector?.StorageAccess(this, name, intent, target);
Expand All @@ -169,13 +169,13 @@ public void PerformWithRetries(
}

stopwatch.Stop();
this.StorageTracer?.FasterStorageProgress($"storage operation {name} ({intent}) succeeded on attempt {numAttempts}; target={target} latencyMs={stopwatch.Elapsed.TotalMilliseconds:F1} size={size} {data} ");
this.StorageTracer?.FasterStorageProgress($"storage operation {name} ({intent}) succeeded on attempt {numAttempts}; target={target} latencyMs={stopwatch.Elapsed.TotalMilliseconds:F1} size={size} {details} ");

this.TraceHelper.FasterAzureStorageAccessCompleted(intent, size, name, target, stopwatch.Elapsed.TotalMilliseconds, numAttempts);
this.TraceHelper.FasterAzureStorageAccessCompleted(intent, size, name, details, target, stopwatch.Elapsed.TotalMilliseconds, numAttempts);

if (stopwatch.ElapsedMilliseconds > expectedLatencyBound)
{
this.TraceHelper.FasterPerfWarning($"storage operation {name} ({intent}) took {stopwatch.Elapsed.TotalSeconds:F1}s on attempt {numAttempts}, which is excessive; {data}");
this.TraceHelper.FasterPerfWarning($"storage operation {name} ({intent}) took {stopwatch.Elapsed.TotalSeconds:F1}s on attempt {numAttempts}, which is excessive; {details}");
}

return;
Expand All @@ -191,7 +191,7 @@ public void PerformWithRetries(
stopwatch.Stop();
if (BlobUtils.IsTimeout(e))
{
this.TraceHelper.FasterPerfWarning($"storage operation {name} ({intent}) timed out on attempt {numAttempts} after {stopwatch.Elapsed.TotalSeconds:F1}s, retrying now; target={target} {data}");
this.TraceHelper.FasterPerfWarning($"storage operation {name} ({intent}) timed out on attempt {numAttempts} after {stopwatch.Elapsed.TotalSeconds:F1}s, retrying now; target={target} {details}");
}
else
{
Expand Down
3 changes: 2 additions & 1 deletion src/DurableTask.Netherite/StorageLayer/Faster/FasterKV.cs
Original file line number Diff line number Diff line change
Expand Up @@ -255,8 +255,9 @@ public override Task<bool> FindCheckpointAsync(bool logIsEmpty)
}

// recover Faster
this.blobManager.TraceHelper.FasterProgress($"Recovering FasterKV");
this.blobManager.TraceHelper.FasterProgress($"Recovering FasterKV - Entering fht.RecoverAsync");
await this.fht.RecoverAsync(this.partition.Settings.FasterTuningParameters?.NumPagesToPreload ?? 1, true, -1, this.terminationToken);
this.blobManager.TraceHelper.FasterProgress($"Recovering FasterKV - Returned from fht.RecoverAsync");
this.mainSession = this.CreateASession($"main-{this.blobManager.IncarnationTimestamp:o}", false);
for (int i = 0; i < this.querySessions.Length; i++)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,13 +146,13 @@ public void FasterStorageProgress(string details)
}
}

public void FasterAzureStorageAccessCompleted(string intent, long size, string operation, string target, double latency, int attempt)
public void FasterAzureStorageAccessCompleted(string intent, long size, string operation, string details, string target, double latency, int attempt)
{
if (this.logLevelLimit <= LogLevel.Debug)
{
this.logger.LogDebug("Part{partition:D2} storage access completed intent={intent} size={size} operation={operation} target={target} latency={latency} attempt={attempt}",
this.partitionId, intent, size, operation, target, latency, attempt);
EtwSource.Log.FasterAzureStorageAccessCompleted(this.account, this.taskHub, this.partitionId, intent, size, operation, target, latency, attempt, TraceUtils.AppName, TraceUtils.ExtensionVersion);
this.logger.LogDebug("Part{partition:D2} storage access completed intent={intent} size={size} operation={operation} {details} target={target} latency={latency} attempt={attempt}",
this.partitionId, intent, size, operation, details, target, latency, attempt);
EtwSource.Log.FasterAzureStorageAccessCompleted(this.account, this.taskHub, this.partitionId, intent, size, operation, details, target, latency, attempt, TraceUtils.AppName, TraceUtils.ExtensionVersion);
}
}

Expand Down
6 changes: 3 additions & 3 deletions src/DurableTask.Netherite/Tracing/EtwSource.cs
Original file line number Diff line number Diff line change
Expand Up @@ -378,11 +378,11 @@ public void FasterStorageProgress(string Account, string TaskHub, int PartitionI
this.WriteEvent(265, Account, TaskHub, PartitionId, Details, AppName, ExtensionVersion);
}

[Event(266, Level = EventLevel.Verbose, Version = 1)]
public void FasterAzureStorageAccessCompleted(string Account, string TaskHub, int PartitionId, string Intent, long Size, string Operation, string Target, double Latency, int Attempt, string AppName, string ExtensionVersion)
[Event(266, Level = EventLevel.Verbose, Version = 3)]
public void FasterAzureStorageAccessCompleted(string Account, string TaskHub, int PartitionId, string Intent,long Size, string Operation, string Details, string Target, double Latency, int Attempt, string AppName, string ExtensionVersion)
{
SetCurrentThreadActivityId(serviceInstanceId);
this.WriteEvent(266, Account, TaskHub, PartitionId, Intent, Size, Operation, Target, Latency, Attempt, AppName, ExtensionVersion);
this.WriteEvent(266, Account, TaskHub, PartitionId, Intent, Size, Operation, Details, Target, Latency, Attempt, AppName, ExtensionVersion);
}

[Event(267, Level = EventLevel.Warning, Version = 1)]
Expand Down