From 5236e825df78ffdf629972e2434a0ceadd257732 Mon Sep 17 00:00:00 2001 From: Sebastian Burckhardt Date: Wed, 22 Jun 2022 17:31:23 -0700 Subject: [PATCH 1/2] fix completePendingAsync mechanism and add memory stats to cache size measure event --- .../StorageProviders/Faster/FasterAlt.cs | 2 +- .../StorageProviders/Faster/FasterKV.cs | 4 ++-- .../StorageProviders/Faster/FasterTraceHelper.cs | 6 +++--- .../StorageProviders/Faster/MemoryTracker.cs | 2 ++ .../StorageProviders/Faster/StoreWorker.cs | 15 +++++++++++++-- .../StorageProviders/Faster/TrackedObjectStore.cs | 2 +- src/DurableTask.Netherite/Tracing/EtwSource.cs | 6 +++--- 7 files changed, 25 insertions(+), 12 deletions(-) diff --git a/src/DurableTask.Netherite/StorageProviders/Faster/FasterAlt.cs b/src/DurableTask.Netherite/StorageProviders/Faster/FasterAlt.cs index 50756b90..4326adae 100644 --- a/src/DurableTask.Netherite/StorageProviders/Faster/FasterAlt.cs +++ b/src/DurableTask.Netherite/StorageProviders/Faster/FasterAlt.cs @@ -140,7 +140,7 @@ public override bool CompletePending() return this.pendingLoads.Count == 0; } - public override ValueTask ReadyToCompletePendingAsync() + public override ValueTask ReadyToCompletePendingAsync(CancellationToken token) { if (this.pendingLoads.Count == 0) { diff --git a/src/DurableTask.Netherite/StorageProviders/Faster/FasterKV.cs b/src/DurableTask.Netherite/StorageProviders/Faster/FasterKV.cs index 4291c856..38c70f74 100644 --- a/src/DurableTask.Netherite/StorageProviders/Faster/FasterKV.cs +++ b/src/DurableTask.Netherite/StorageProviders/Faster/FasterKV.cs @@ -234,9 +234,9 @@ public override bool CompletePending() } } - public override ValueTask ReadyToCompletePendingAsync() + public override ValueTask ReadyToCompletePendingAsync(CancellationToken token) { - return this.mainSession.ReadyToCompletePendingAsync(this.terminationToken); + return this.mainSession.ReadyToCompletePendingAsync(token); } public override bool TakeFullCheckpoint(long commitLogPosition, long inputQueuePosition, string inputQueueFingerprint, out Guid checkpointGuid) diff --git a/src/DurableTask.Netherite/StorageProviders/Faster/FasterTraceHelper.cs b/src/DurableTask.Netherite/StorageProviders/Faster/FasterTraceHelper.cs index cda9f9ae..75c64f96 100644 --- a/src/DurableTask.Netherite/StorageProviders/Faster/FasterTraceHelper.cs +++ b/src/DurableTask.Netherite/StorageProviders/Faster/FasterTraceHelper.cs @@ -110,12 +110,12 @@ public void FasterStorageError(string context, Exception exception) } } - public void FasterCacheSizeMeasured(int numPages, long numRecords, long sizeInBytes, long discrepancy, double elapsedMs) + public void FasterCacheSizeMeasured(int numPages, long numRecords, long sizeInBytes, long gcMemory, long processMemory, long discrepancy, double elapsedMs) { if (this.logLevelLimit <= LogLevel.Information) { - this.logger.LogInformation("Part{partition:D2} Measured CacheSize numPages={numPages} numRecords={numRecords} sizeInBytes={sizeInBytes} discrepancy={discrepancy} elapsedMs={elapsedMs:F2}", this.partitionId, numPages, numRecords, sizeInBytes, discrepancy, elapsedMs); - EtwSource.Log.FasterCacheSizeMeasured(this.account, this.taskHub, this.partitionId, numPages, numRecords, sizeInBytes, discrepancy, elapsedMs, TraceUtils.AppName, TraceUtils.ExtensionVersion); + this.logger.LogInformation("Part{partition:D2} Measured CacheSize numPages={numPages} numRecords={numRecords} sizeInBytes={sizeInBytes} gcMemory={gcMemory} processMemory={processMemory} discrepancy={discrepancy} elapsedMs={elapsedMs:F2}", this.partitionId, numPages, numRecords, sizeInBytes, gcMemory, processMemory, discrepancy, elapsedMs); + EtwSource.Log.FasterCacheSizeMeasured(this.account, this.taskHub, this.partitionId, numPages, numRecords, sizeInBytes, gcMemory, processMemory, discrepancy, elapsedMs, TraceUtils.AppName, TraceUtils.ExtensionVersion); } } diff --git a/src/DurableTask.Netherite/StorageProviders/Faster/MemoryTracker.cs b/src/DurableTask.Netherite/StorageProviders/Faster/MemoryTracker.cs index 05ce5602..b685f0f6 100644 --- a/src/DurableTask.Netherite/StorageProviders/Faster/MemoryTracker.cs +++ b/src/DurableTask.Netherite/StorageProviders/Faster/MemoryTracker.cs @@ -114,6 +114,8 @@ public void MeasureCacheSize(bool isFirstCall) numPages, numRecords, sizeInBytes: size, + gcMemory: GC.GetTotalMemory(false), + processMemory: System.Diagnostics.Process.GetCurrentProcess().PrivateMemorySize64, discrepancy: isFirstCall ? 0 : size - this.trackedObjectSize, stopwatch.Elapsed.TotalMilliseconds); diff --git a/src/DurableTask.Netherite/StorageProviders/Faster/StoreWorker.cs b/src/DurableTask.Netherite/StorageProviders/Faster/StoreWorker.cs index e902e403..8e49afef 100644 --- a/src/DurableTask.Netherite/StorageProviders/Faster/StoreWorker.cs +++ b/src/DurableTask.Netherite/StorageProviders/Faster/StoreWorker.cs @@ -48,6 +48,8 @@ class StoreWorker : BatchWorker public static TimeSpan PublishInterval = TimeSpan.FromSeconds(8); public static TimeSpan PokePeriod = TimeSpan.FromSeconds(3); // allows storeworker to checkpoint and publish load even while idle + CancellationTokenSource ioCompletionNotificationCancellation; + public StoreWorker(TrackedObjectStore store, Partition partition, FasterTraceHelper traceHelper, BlobManager blobManager, CancellationToken cancellationToken) : base($"{nameof(StoreWorker)}{partition.PartitionId:D2}", true, 500, cancellationToken, partition.TraceHelper) @@ -354,6 +356,14 @@ protected override async Task Process(IList batch) { bool markPartitionAsActive = false; + // no need to wait any longer for a notification, since we are running now + if (this.ioCompletionNotificationCancellation != null) + { + this.ioCompletionNotificationCancellation.Cancel(); + this.ioCompletionNotificationCancellation.Dispose(); + this.ioCompletionNotificationCancellation = null; + } + foreach (var partitionEvent in batch) { if (this.isShuttingDown || this.cancellationToken.IsCancellationRequested) @@ -485,7 +495,8 @@ protected override async Task Process(IList batch) if (!allRequestsCompleted) { - var _ = this.store.ReadyToCompletePendingAsync().AsTask().ContinueWith(x => this.Notify()); + this.ioCompletionNotificationCancellation = CancellationTokenSource.CreateLinkedTokenSource(this.cancellationToken); + var _ = this.store.ReadyToCompletePendingAsync(this.ioCompletionNotificationCancellation.Token).AsTask().ContinueWith(x => this.Notify()); } // during testing, this is a good time to check invariants in the store @@ -500,7 +511,7 @@ protected override async Task Process(IList batch) this.partition.ErrorHandler.HandleError("StoreWorker.Process", "Encountered exception while working on store", exception, true, false); } } - + public async Task<(long,long)> WaitForCheckpointAsync(bool isIndexCheckpoint, Guid checkpointToken, bool removeObsoleteCheckpoints) { var stopwatch = new System.Diagnostics.Stopwatch(); diff --git a/src/DurableTask.Netherite/StorageProviders/Faster/TrackedObjectStore.cs b/src/DurableTask.Netherite/StorageProviders/Faster/TrackedObjectStore.cs index ce544cf3..7beb9e1a 100644 --- a/src/DurableTask.Netherite/StorageProviders/Faster/TrackedObjectStore.cs +++ b/src/DurableTask.Netherite/StorageProviders/Faster/TrackedObjectStore.cs @@ -19,7 +19,7 @@ abstract class TrackedObjectStore public abstract bool CompletePending(); - public abstract ValueTask ReadyToCompletePendingAsync(); + public abstract ValueTask ReadyToCompletePendingAsync(CancellationToken token); public abstract void AdjustCacheSize(); diff --git a/src/DurableTask.Netherite/Tracing/EtwSource.cs b/src/DurableTask.Netherite/Tracing/EtwSource.cs index 13805c7c..47ae6db0 100644 --- a/src/DurableTask.Netherite/Tracing/EtwSource.cs +++ b/src/DurableTask.Netherite/Tracing/EtwSource.cs @@ -315,11 +315,11 @@ public void FasterStorageError(string Account, string TaskHub, int PartitionId, this.WriteEvent(256, Account, TaskHub, PartitionId, Context, Details, AppName, ExtensionVersion); } - [Event(257, Level = EventLevel.Informational, Version = 1)] - public void FasterCacheSizeMeasured(string Account, string TaskHub, int PartitionId, int NumPages, long NumRecords, long SizeInBytes, long Discrepancy, double ElapsedMs, string AppName, string ExtensionVersion) + [Event(257, Level = EventLevel.Informational, Version = 3)] + public void FasterCacheSizeMeasured(string Account, string TaskHub, int PartitionId, int NumPages, long NumRecords, long SizeInBytes, long GcMemory, long ProcessMemory, long Discrepancy, double ElapsedMs, string AppName, string ExtensionVersion) { SetCurrentThreadActivityId(serviceInstanceId); - this.WriteEvent(257, Account, TaskHub, PartitionId, NumPages, NumRecords, SizeInBytes, Discrepancy, ElapsedMs, AppName, ExtensionVersion); + this.WriteEvent(257, Account, TaskHub, PartitionId, NumPages, NumRecords, SizeInBytes, GcMemory, ProcessMemory, Discrepancy, ElapsedMs, AppName, ExtensionVersion); } [Event(259, Level = EventLevel.Verbose, Version = 1)] From f53e5f8fc7ccc567d4d51f81b6147f3e995432a1 Mon Sep 17 00:00:00 2001 From: Sebastian Burckhardt Date: Tue, 12 Jul 2022 09:51:47 -0700 Subject: [PATCH 2/2] force garbage collection after compaction and checkpoints --- .../StorageProviders/Faster/StoreWorker.cs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/DurableTask.Netherite/StorageProviders/Faster/StoreWorker.cs b/src/DurableTask.Netherite/StorageProviders/Faster/StoreWorker.cs index 8e49afef..3a00c5ed 100644 --- a/src/DurableTask.Netherite/StorageProviders/Faster/StoreWorker.cs +++ b/src/DurableTask.Netherite/StorageProviders/Faster/StoreWorker.cs @@ -427,6 +427,9 @@ protected override async Task Process(IList batch) (this.lastCheckpointedCommitLogPosition, this.lastCheckpointedInputQueuePosition) = await this.pendingStoreCheckpoint; // observe exceptions here + // force collection of memory used during checkpointing + GC.Collect(); + // we have reached the end of the state machine transitions this.pendingStoreCheckpoint = null; this.pendingCheckpointTrigger = CheckpointTrigger.None; @@ -456,6 +459,9 @@ protected override async Task Process(IList batch) { await this.pendingCompaction; // observe exceptions here + // force collection of memory used during compaction + GC.Collect(); + // the index checkpoint is next var token = this.store.StartIndexCheckpoint(); if (token.HasValue)