Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix io notification #179

Merged
merged 2 commits on Jul 18, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ public override bool CompletePending()
return this.pendingLoads.Count == 0;
}

public override ValueTask ReadyToCompletePendingAsync()
public override ValueTask ReadyToCompletePendingAsync(CancellationToken token)
{
if (this.pendingLoads.Count == 0)
{
Expand Down
4 changes: 2 additions & 2 deletions src/DurableTask.Netherite/StorageProviders/Faster/FasterKV.cs
Original file line number Diff line number Diff line change
Expand Up @@ -234,9 +234,9 @@ public override bool CompletePending()
}
}

public override ValueTask ReadyToCompletePendingAsync()
public override ValueTask ReadyToCompletePendingAsync(CancellationToken token)
{
return this.mainSession.ReadyToCompletePendingAsync(this.terminationToken);
return this.mainSession.ReadyToCompletePendingAsync(token);
}

public override bool TakeFullCheckpoint(long commitLogPosition, long inputQueuePosition, string inputQueueFingerprint, out Guid checkpointGuid)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,12 +110,12 @@ public void FasterStorageError(string context, Exception exception)
}
}

public void FasterCacheSizeMeasured(int numPages, long numRecords, long sizeInBytes, long discrepancy, double elapsedMs)
public void FasterCacheSizeMeasured(int numPages, long numRecords, long sizeInBytes, long gcMemory, long processMemory, long discrepancy, double elapsedMs)
{
if (this.logLevelLimit <= LogLevel.Information)
{
this.logger.LogInformation("Part{partition:D2} Measured CacheSize numPages={numPages} numRecords={numRecords} sizeInBytes={sizeInBytes} discrepancy={discrepancy} elapsedMs={elapsedMs:F2}", this.partitionId, numPages, numRecords, sizeInBytes, discrepancy, elapsedMs);
EtwSource.Log.FasterCacheSizeMeasured(this.account, this.taskHub, this.partitionId, numPages, numRecords, sizeInBytes, discrepancy, elapsedMs, TraceUtils.AppName, TraceUtils.ExtensionVersion);
this.logger.LogInformation("Part{partition:D2} Measured CacheSize numPages={numPages} numRecords={numRecords} sizeInBytes={sizeInBytes} gcMemory={gcMemory} processMemory={processMemory} discrepancy={discrepancy} elapsedMs={elapsedMs:F2}", this.partitionId, numPages, numRecords, sizeInBytes, gcMemory, processMemory, discrepancy, elapsedMs);
EtwSource.Log.FasterCacheSizeMeasured(this.account, this.taskHub, this.partitionId, numPages, numRecords, sizeInBytes, gcMemory, processMemory, discrepancy, elapsedMs, TraceUtils.AppName, TraceUtils.ExtensionVersion);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,8 @@ public void MeasureCacheSize(bool isFirstCall)
numPages,
numRecords,
sizeInBytes: size,
gcMemory: GC.GetTotalMemory(false),
processMemory: System.Diagnostics.Process.GetCurrentProcess().PrivateMemorySize64,
discrepancy: isFirstCall ? 0 : size - this.trackedObjectSize,
stopwatch.Elapsed.TotalMilliseconds);

Expand Down
21 changes: 19 additions & 2 deletions src/DurableTask.Netherite/StorageProviders/Faster/StoreWorker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ class StoreWorker : BatchWorker<PartitionEvent>
public static TimeSpan PublishInterval = TimeSpan.FromSeconds(8);
public static TimeSpan PokePeriod = TimeSpan.FromSeconds(3); // allows storeworker to checkpoint and publish load even while idle

CancellationTokenSource ioCompletionNotificationCancellation;


public StoreWorker(TrackedObjectStore store, Partition partition, FasterTraceHelper traceHelper, BlobManager blobManager, CancellationToken cancellationToken)
: base($"{nameof(StoreWorker)}{partition.PartitionId:D2}", true, 500, cancellationToken, partition.TraceHelper)
Expand Down Expand Up @@ -354,6 +356,14 @@ protected override async Task Process(IList<PartitionEvent> batch)
{
bool markPartitionAsActive = false;

// no need to wait any longer for a notification, since we are running now
if (this.ioCompletionNotificationCancellation != null)
{
this.ioCompletionNotificationCancellation.Cancel();
this.ioCompletionNotificationCancellation.Dispose();
this.ioCompletionNotificationCancellation = null;
}

foreach (var partitionEvent in batch)
{
if (this.isShuttingDown || this.cancellationToken.IsCancellationRequested)
Expand Down Expand Up @@ -417,6 +427,9 @@ protected override async Task Process(IList<PartitionEvent> batch)
(this.lastCheckpointedCommitLogPosition, this.lastCheckpointedInputQueuePosition)
= await this.pendingStoreCheckpoint; // observe exceptions here

// force collection of memory used during checkpointing
GC.Collect();

// we have reached the end of the state machine transitions
this.pendingStoreCheckpoint = null;
this.pendingCheckpointTrigger = CheckpointTrigger.None;
Expand Down Expand Up @@ -446,6 +459,9 @@ protected override async Task Process(IList<PartitionEvent> batch)
{
await this.pendingCompaction; // observe exceptions here

// force collection of memory used during compaction
GC.Collect();

// the index checkpoint is next
var token = this.store.StartIndexCheckpoint();
if (token.HasValue)
Expand Down Expand Up @@ -485,7 +501,8 @@ protected override async Task Process(IList<PartitionEvent> batch)

if (!allRequestsCompleted)
{
var _ = this.store.ReadyToCompletePendingAsync().AsTask().ContinueWith(x => this.Notify());
this.ioCompletionNotificationCancellation = CancellationTokenSource.CreateLinkedTokenSource(this.cancellationToken);
var _ = this.store.ReadyToCompletePendingAsync(this.ioCompletionNotificationCancellation.Token).AsTask().ContinueWith(x => this.Notify());
}

// during testing, this is a good time to check invariants in the store
Expand All @@ -500,7 +517,7 @@ protected override async Task Process(IList<PartitionEvent> batch)
this.partition.ErrorHandler.HandleError("StoreWorker.Process", "Encountered exception while working on store", exception, true, false);
}
}

public async Task<(long,long)> WaitForCheckpointAsync(bool isIndexCheckpoint, Guid checkpointToken, bool removeObsoleteCheckpoints)
{
var stopwatch = new System.Diagnostics.Stopwatch();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ abstract class TrackedObjectStore

public abstract bool CompletePending();

public abstract ValueTask ReadyToCompletePendingAsync();
public abstract ValueTask ReadyToCompletePendingAsync(CancellationToken token);

public abstract void AdjustCacheSize();

Expand Down
6 changes: 3 additions & 3 deletions src/DurableTask.Netherite/Tracing/EtwSource.cs
Original file line number Diff line number Diff line change
Expand Up @@ -315,11 +315,11 @@ public void FasterStorageError(string Account, string TaskHub, int PartitionId,
this.WriteEvent(256, Account, TaskHub, PartitionId, Context, Details, AppName, ExtensionVersion);
}

[Event(257, Level = EventLevel.Informational, Version = 1)]
public void FasterCacheSizeMeasured(string Account, string TaskHub, int PartitionId, int NumPages, long NumRecords, long SizeInBytes, long Discrepancy, double ElapsedMs, string AppName, string ExtensionVersion)
[Event(257, Level = EventLevel.Informational, Version = 3)]
public void FasterCacheSizeMeasured(string Account, string TaskHub, int PartitionId, int NumPages, long NumRecords, long SizeInBytes, long GcMemory, long ProcessMemory, long Discrepancy, double ElapsedMs, string AppName, string ExtensionVersion)
{
SetCurrentThreadActivityId(serviceInstanceId);
this.WriteEvent(257, Account, TaskHub, PartitionId, NumPages, NumRecords, SizeInBytes, Discrepancy, ElapsedMs, AppName, ExtensionVersion);
this.WriteEvent(257, Account, TaskHub, PartitionId, NumPages, NumRecords, SizeInBytes, GcMemory, ProcessMemory, Discrepancy, ElapsedMs, AppName, ExtensionVersion);
}

[Event(259, Level = EventLevel.Verbose, Version = 1)]
Expand Down