From d9550b7780576ddc423c441f35fb2855f7558106 Mon Sep 17 00:00:00 2001 From: Varshitha Bachu Date: Thu, 31 Aug 2023 12:03:02 -0700 Subject: [PATCH 1/5] Update GH automation (#303) --- .github/policies/resourceManagement.yml | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/.github/policies/resourceManagement.yml b/.github/policies/resourceManagement.yml index d23530fc..30ea8de2 100644 --- a/.github/policies/resourceManagement.yml +++ b/.github/policies/resourceManagement.yml @@ -57,6 +57,16 @@ configuration: reply: This issue has been marked as duplicate and has not had any activity for **1 day**. It will be closed for housekeeping purposes. - closeIssue eventResponderTasks: + - if: + - payloadType: Issues + - and: + - isOpen + - not: + and: + - isLabeled + then: + - addLabel: + label: "Needs: Triage :mag:" - if: - payloadType: Issue_Comment - isAction: From 3a2d193e57d528d1664005781e2cce993b2794a2 Mon Sep 17 00:00:00 2001 From: Varshitha Bachu Date: Tue, 10 Oct 2023 10:38:53 -0700 Subject: [PATCH 2/5] initial commit (#290) --- .../DurableTask.Netherite.AzureFunctions.csproj | 4 ++-- .../NetheriteProvider.cs | 7 ++++++- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/src/DurableTask.Netherite.AzureFunctions/DurableTask.Netherite.AzureFunctions.csproj b/src/DurableTask.Netherite.AzureFunctions/DurableTask.Netherite.AzureFunctions.csproj index 444b9278..b7b9eb6a 100644 --- a/src/DurableTask.Netherite.AzureFunctions/DurableTask.Netherite.AzureFunctions.csproj +++ b/src/DurableTask.Netherite.AzureFunctions/DurableTask.Netherite.AzureFunctions.csproj @@ -26,9 +26,9 @@ 1 4 - 0 + 1 $(MajorVersion).$(MinorVersion).$(PatchVersion) - + private $(MajorVersion).0.0.0 .$(GITHUB_RUN_NUMBER) $(VersionPrefix)$(BuildSuffix) diff --git a/src/DurableTask.Netherite.AzureFunctions/NetheriteProvider.cs b/src/DurableTask.Netherite.AzureFunctions/NetheriteProvider.cs index 6f8f491e..1d3f64c6 100644 --- a/src/DurableTask.Netherite.AzureFunctions/NetheriteProvider.cs +++ b/src/DurableTask.Netherite.AzureFunctions/NetheriteProvider.cs @@ -145,7 +145,12 @@ class ScaleMonitor : IScaleMonitor public ScaleMonitor(ScalingMonitor scalingMonitor) { this.scalingMonitor = scalingMonitor; - this.descriptor = new ScaleMonitorDescriptor($"DurableTaskTrigger-Netherite-{this.scalingMonitor.TaskHubName}".ToLower()); + + // appending random GUID to end of descriptor to keep scale monitor keys unique + string guid = Guid.NewGuid().ToString("N"); + + var descriptorId = $"DurableTaskTrigger-Netherite-{this.scalingMonitor.TaskHubName}-{guid}".ToLower(); + this.descriptor = new ScaleMonitorDescriptor(descriptorId); } public ScaleMonitorDescriptor Descriptor => this.descriptor; From eacb42fb0ba5dcb47cda2a938874d35f8f863020 Mon Sep 17 00:00:00 2001 From: Sebastian Burckhardt Date: Wed, 11 Oct 2023 10:58:20 -0700 Subject: [PATCH 3/5] Revert "initial commit (#290)" (#314) This reverts commit 3a2d193e57d528d1664005781e2cce993b2794a2. --- .../DurableTask.Netherite.AzureFunctions.csproj | 4 ++-- .../NetheriteProvider.cs | 7 +------ 2 files changed, 3 insertions(+), 8 deletions(-) diff --git a/src/DurableTask.Netherite.AzureFunctions/DurableTask.Netherite.AzureFunctions.csproj b/src/DurableTask.Netherite.AzureFunctions/DurableTask.Netherite.AzureFunctions.csproj index b7b9eb6a..444b9278 100644 --- a/src/DurableTask.Netherite.AzureFunctions/DurableTask.Netherite.AzureFunctions.csproj +++ b/src/DurableTask.Netherite.AzureFunctions/DurableTask.Netherite.AzureFunctions.csproj @@ -26,9 +26,9 @@ 1 4 - 1 + 0 $(MajorVersion).$(MinorVersion).$(PatchVersion) - private + $(MajorVersion).0.0.0 .$(GITHUB_RUN_NUMBER) $(VersionPrefix)$(BuildSuffix) diff --git a/src/DurableTask.Netherite.AzureFunctions/NetheriteProvider.cs b/src/DurableTask.Netherite.AzureFunctions/NetheriteProvider.cs index 1d3f64c6..6f8f491e 100644 --- a/src/DurableTask.Netherite.AzureFunctions/NetheriteProvider.cs +++ b/src/DurableTask.Netherite.AzureFunctions/NetheriteProvider.cs @@ -145,12 +145,7 @@ class ScaleMonitor : IScaleMonitor public ScaleMonitor(ScalingMonitor scalingMonitor) { this.scalingMonitor = scalingMonitor; - - // appending random GUID to end of descriptor to keep scale monitor keys unique - string guid = Guid.NewGuid().ToString("N"); - - var descriptorId = $"DurableTaskTrigger-Netherite-{this.scalingMonitor.TaskHubName}-{guid}".ToLower(); - this.descriptor = new ScaleMonitorDescriptor(descriptorId); + this.descriptor = new ScaleMonitorDescriptor($"DurableTaskTrigger-Netherite-{this.scalingMonitor.TaskHubName}".ToLower()); } public ScaleMonitorDescriptor Descriptor => this.descriptor; From 6effbf572c0a5b4290c159805e720b42051367b3 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Wed, 18 Oct 2023 14:46:37 -0700 Subject: [PATCH 4/5] Bump Azure.Identity from 1.7.0 to 1.10.2 in /samples/TokenCredentialDTFx (#323) Bumps [Azure.Identity](https://github.com/Azure/azure-sdk-for-net) from 1.7.0 to 1.10.2. - [Release notes](https://github.com/Azure/azure-sdk-for-net/releases) - [Commits](https://github.com/Azure/azure-sdk-for-net/compare/Azure.Identity_1.7.0...Azure.Identity_1.10.2) --- updated-dependencies: - dependency-name: Azure.Identity dependency-type: direct:production ... Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- samples/TokenCredentialDTFx/TokenCredentialDTFx.csproj | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/samples/TokenCredentialDTFx/TokenCredentialDTFx.csproj b/samples/TokenCredentialDTFx/TokenCredentialDTFx.csproj index a0b22c1b..75a07470 100644 --- a/samples/TokenCredentialDTFx/TokenCredentialDTFx.csproj +++ b/samples/TokenCredentialDTFx/TokenCredentialDTFx.csproj @@ -1,4 +1,4 @@ - + Exe @@ -8,7 +8,7 @@ - + From ba15eed262d72741bb52b45fb8a71137192c24b6 Mon Sep 17 00:00:00 2001 From: David Justo Date: Thu, 19 Oct 2023 13:55:54 -0700 Subject: [PATCH 5/5] Terminate partition when FASTER refuses to checkpoint for over a minute (#301) --- .../PartitionErrorHandler.cs | 4 +- .../StorageLayer/Faster/FasterKV.cs | 37 ++- .../StorageLayer/Faster/FasterTraceHelper.cs | 9 + .../StorageLayer/Faster/StoreWorker.cs | 223 +++++++++++++----- 4 files changed, 205 insertions(+), 68 deletions(-) diff --git a/src/DurableTask.Netherite/OrchestrationService/PartitionErrorHandler.cs b/src/DurableTask.Netherite/OrchestrationService/PartitionErrorHandler.cs index 7d5a74de..85380ff3 100644 --- a/src/DurableTask.Netherite/OrchestrationService/PartitionErrorHandler.cs +++ b/src/DurableTask.Netherite/OrchestrationService/PartitionErrorHandler.cs @@ -61,7 +61,7 @@ public PartitionErrorHandler(int partitionId, ILogger logger, LogLevel logLevelL this.host = host; } - public void HandleError(string context, string message, Exception exception, bool terminatePartition, bool isWarning) + public void HandleError(string context, string message, Exception? exception, bool terminatePartition, bool isWarning) { bool isFatal = exception != null && Utils.IsFatal(exception); @@ -94,7 +94,7 @@ public void TerminateNormally() } } - void TraceError(bool isWarning, string context, string message, Exception exception, bool terminatePartition) + void TraceError(bool isWarning, string context, string message, Exception? exception, bool terminatePartition) { var logLevel = isWarning ? LogLevel.Warning : LogLevel.Error; if (this.logLevelLimit <= logLevel) diff --git a/src/DurableTask.Netherite/StorageLayer/Faster/FasterKV.cs b/src/DurableTask.Netherite/StorageLayer/Faster/FasterKV.cs index 65e607da..e9abcc14 100644 --- a/src/DurableTask.Netherite/StorageLayer/Faster/FasterKV.cs +++ b/src/DurableTask.Netherite/StorageLayer/Faster/FasterKV.cs @@ -484,7 +484,11 @@ long MinimalLogSize public override async Task RunCompactionAsync(long target) { string id = DateTime.UtcNow.ToString("O"); // for tracing purposes + + this.blobManager.TraceHelper.FasterProgress($"Compaction {id} is requesting to enter semaphore with {maxCompactionThreads.CurrentCount} threads available"); await maxCompactionThreads.WaitAsync(); + this.blobManager.TraceHelper.FasterProgress($"Compaction {id} entered semaphore"); + try { long beginAddressBeforeCompaction = this.Log.BeginAddress; @@ -499,20 +503,47 @@ public override async Task RunCompactionAsync(long target) target - this.Log.BeginAddress, this.GetElapsedCompactionMilliseconds()); + var tokenSource = new CancellationTokenSource(); + var timeoutTask = Task.Delay(TimeSpan.FromMinutes(10), tokenSource.Token); var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); var thread = TrackedThreads.MakeTrackedThread(RunCompaction, $"Compaction.{id}"); thread.Start(); + + var winner = await Task.WhenAny(tcs.Task, timeoutTask); + + if (winner == timeoutTask) + { + // compaction timed out. Terminate partition + var exceptionMessage = $"Compaction {id} time out"; + this.partition.ErrorHandler.HandleError(nameof(RunCompactionAsync), exceptionMessage, e: null, terminatePartition: true, reportAsWarning: true); + + // we need resolve the task to ensure the 'finally' block is executed which frees up another thread to start compating + tcs.TrySetException(new OperationCanceledException(exceptionMessage)); + } + else + { + // cancel the timeout task since compaction completed + tokenSource.Cancel(); + } + + await timeoutTask.ContinueWith(_ => tokenSource.Dispose()); + + // return result of compaction task return await tcs.Task; void RunCompaction() { try { + this.blobManager.TraceHelper.FasterProgress($"Compaction {id} started"); var session = this.CreateASession($"compaction-{id}", true); + + this.blobManager.TraceHelper.FasterProgress($"Compaction {id} obtained a FASTER session"); using (this.TrackTemporarySession(session)) { + this.blobManager.TraceHelper.FasterProgress($"Compaction {id} is invoking FASTER's compaction routine"); long compactedUntil = session.Compact(target, CompactionType.Scan); this.TraceHelper.FasterCompactionProgress( @@ -525,17 +556,17 @@ void RunCompaction() this.Log.BeginAddress - beginAddressBeforeCompaction, this.GetElapsedCompactionMilliseconds()); - tcs.SetResult(compactedUntil); + tcs.TrySetResult(compactedUntil); } } catch (Exception exception) when (this.terminationToken.IsCancellationRequested && !Utils.IsFatal(exception)) { - tcs.SetException(new OperationCanceledException("Partition was terminated.", exception, this.terminationToken)); + tcs.TrySetException(new OperationCanceledException("Partition was terminated.", exception, this.terminationToken)); } catch (Exception e) { - tcs.SetException(e); + tcs.TrySetException(e); } } } diff --git a/src/DurableTask.Netherite/StorageLayer/Faster/FasterTraceHelper.cs b/src/DurableTask.Netherite/StorageLayer/Faster/FasterTraceHelper.cs index b83f34d8..63d0bb86 100644 --- a/src/DurableTask.Netherite/StorageLayer/Faster/FasterTraceHelper.cs +++ b/src/DurableTask.Netherite/StorageLayer/Faster/FasterTraceHelper.cs @@ -128,6 +128,15 @@ public void FasterProgress(string details) } } + public void FasterProgress(Func constructString) + { + if (this.logLevelLimit <= LogLevel.Debug) + { + var details = constructString(); + this.FasterProgress(details); + } + } + public void FasterStorageProgress(string details) { if (this.logLevelLimit <= LogLevel.Trace) diff --git a/src/DurableTask.Netherite/StorageLayer/Faster/StoreWorker.cs b/src/DurableTask.Netherite/StorageLayer/Faster/StoreWorker.cs index d608d4ab..116c6245 100644 --- a/src/DurableTask.Netherite/StorageLayer/Faster/StoreWorker.cs +++ b/src/DurableTask.Netherite/StorageLayer/Faster/StoreWorker.cs @@ -21,6 +21,7 @@ class StoreWorker : BatchWorker readonly EffectTracker effectTracker; bool isShuttingDown; + DateTime? timeOfFirstRefusedCheckpoint; public string InputQueueFingerprint { get; private set; } public (long,int) InputQueuePosition { get; private set; } @@ -30,15 +31,15 @@ class StoreWorker : BatchWorker // periodic index and store checkpointing CheckpointTrigger pendingCheckpointTrigger; - Task pendingIndexCheckpoint; - Task<(long, (long,int))> pendingStoreCheckpoint; + Task? pendingIndexCheckpoint; + Task<(long, (long,int))>? pendingStoreCheckpoint; (long,int) lastCheckpointedInputQueuePosition; long lastCheckpointedCommitLogPosition; long numberEventsSinceLastCheckpoint; DateTime timeOfNextIdleCheckpoint; // periodic compaction - Task pendingCompaction; + Task? pendingCompaction; // periodic load publishing PartitionLoadInfo loadInfo; @@ -281,6 +282,50 @@ internal enum CheckpointTrigger Idle } + void LogCheckpointStats() + { + long inputQueuePositionLag = this.GetInputQueuePositionLag(); + + // since this is a pure function, we declare it as local static for improved performance + static string ReportNullableTaskStatus(Task? t) + { + if (t == null) + { + return "is null"; + } + else + { + return t.IsCompleted ? "is completed" : "is not completed"; + } + }; + + // since the statistics log is only emitted if the trace level is at least "Debug", + // we defer the construction of the string to relieve GC pressure + string ConstructLogString() + { + + var log = $"Checkpoint statistics: " + + $"LastCheckpointedCommitLogPosition={this.lastCheckpointedCommitLogPosition}, " + + $"MaxNumberBytesBetweenCheckpoints={this.partition.Settings.MaxNumberBytesBetweenCheckpoints}, " + + $"CommitLogPosition={this.CommitLogPosition}, " + + $"NumberEventsSinceLastCheckpoint={this.numberEventsSinceLastCheckpoint}, " + + $"MaxNumberEventsBetweenCheckpoints={this.partition.Settings.MaxNumberEventsBetweenCheckpoints}, " + + $"InputQueuePositionLag={inputQueuePositionLag}," + + $"TimeOfNextIdleCheckpoint={this.timeOfNextIdleCheckpoint}, " + + $"TimeOfFirstRefusedCheckpoint={this.timeOfFirstRefusedCheckpoint}, " + + $"PendingCompaction status={ReportNullableTaskStatus(this.pendingCompaction)}, " + + $"PendingIndexCheckpoint status={ReportNullableTaskStatus(this.pendingIndexCheckpoint)}, " + + $"PendingStoreCheckpoint status={ReportNullableTaskStatus(this.pendingStoreCheckpoint)}"; + return log; + } + this.traceHelper.FasterProgress(ConstructLogString); + } + + long GetInputQueuePositionLag() + { + return this.InputQueuePosition.Item1 - Math.Max(this.lastCheckpointedInputQueuePosition.Item1, this.LogWorker.LastCommittedInputQueuePosition); + } + bool CheckpointDue(out CheckpointTrigger trigger, out long? compactUntil) { // in a test setting, let the test decide when to checkpoint or compact @@ -292,8 +337,7 @@ bool CheckpointDue(out CheckpointTrigger trigger, out long? compactUntil) trigger = CheckpointTrigger.None; compactUntil = null; - long inputQueuePositionLag = - this.InputQueuePosition.Item1 - Math.Max(this.lastCheckpointedInputQueuePosition.Item1, this.LogWorker.LastCommittedInputQueuePosition); + long inputQueuePositionLag = this.GetInputQueuePositionLag(); if (this.lastCheckpointedCommitLogPosition + this.partition.Settings.MaxNumberBytesBetweenCheckpoints <= this.CommitLogPosition) { @@ -349,7 +393,114 @@ void ScheduleNextIdleCheckpointTime() var actual = (((earliest - offset - 1) / period) + 1) * period + offset; this.timeOfNextIdleCheckpoint = new DateTime(actual, DateTimeKind.Utc); } - + + void StartCheckpointOrFailOnTimeout(Func checkpointRoutine, string messageOnError) + { + var checkpointStarted = checkpointRoutine.Invoke(); + if (checkpointStarted) + { + this.timeOfFirstRefusedCheckpoint = null; + } + else + { + // track start of FASTER refusal to start checkpoint + var currentTime = DateTime.UtcNow; + this.timeOfFirstRefusedCheckpoint ??= currentTime; + + // if the refusal to checkpoint started over a minute ago, terminate partition + TimeSpan duration = currentTime - this.timeOfFirstRefusedCheckpoint.Value; + if (duration > TimeSpan.FromMinutes(1)) + { + messageOnError += $". FASTER first refused to checkpoint at '{this.timeOfFirstRefusedCheckpoint}'. Duration of refusal = {duration}. Terminating partition."; + this.partition.ErrorHandler.HandleError(nameof(StartCheckpointOrFailOnTimeout), messageOnError, e: null, terminatePartition: true, reportAsWarning: false); + } + } + } + + async ValueTask RunCheckpointingStateMachine() + { + // handle progression of checkpointing state machine: none -> pendingCompaction -> pendingIndexCheckpoint -> pendingStoreCheckpoint -> none) + if (this.pendingStoreCheckpoint != null) + { + if (this.pendingStoreCheckpoint.IsCompleted == true) + { + this.traceHelper.FasterProgress("Checkpointing state machine: pendingStorecheckpoint has completed."); + (this.lastCheckpointedCommitLogPosition, this.lastCheckpointedInputQueuePosition) + = await this.pendingStoreCheckpoint; // observe exceptions here + + // force collection of memory used during checkpointing + GC.Collect(); + + this.traceHelper.FasterProgress("Checkpointing state machine: resetting to initial state"); + // we have reached the end of the state machine transitions + this.pendingStoreCheckpoint = null; + this.pendingCheckpointTrigger = CheckpointTrigger.None; + this.ScheduleNextIdleCheckpointTime(); + this.partition.Settings.TestHooks?.CheckpointInjector?.SequenceComplete((this.store as FasterKV).Log); + } + } + else if (this.pendingIndexCheckpoint != null) + { + if (this.pendingIndexCheckpoint.IsCompleted == true) + { + this.traceHelper.FasterProgress("Checkpointing state machine: pendingIndexCheckpoint has completed"); + await this.pendingIndexCheckpoint; // observe exceptions here + + // the store checkpoint is next + this.StartCheckpointOrFailOnTimeout( + checkpointRoutine: () => + { + var token = this.store.StartStoreCheckpoint(this.CommitLogPosition, this.InputQueuePosition, this.InputQueueFingerprint, null); + if (token.HasValue) + { + this.traceHelper.FasterProgress("Checkpointing state machine: store checkpoint started"); + this.pendingIndexCheckpoint = null; + this.pendingStoreCheckpoint = this.WaitForCheckpointAsync(false, token.Value, true); + this.numberEventsSinceLastCheckpoint = 0; + } + var checkpointStarted = token.HasValue; + return checkpointStarted; + + }, + messageOnError: "Could not start store checkpoint before timeout"); + } + } + else if (this.pendingCompaction != null) + { + if (this.pendingCompaction.IsCompleted == true) + { + this.traceHelper.FasterProgress("Checkpointing state machine: pendingCompaction has completed"); + await this.pendingCompaction; // observe exceptions here + + // force collection of memory used during compaction + GC.Collect(); + + // the index checkpoint is next + this.StartCheckpointOrFailOnTimeout( + checkpointRoutine: () => + { + var token = this.store.StartIndexCheckpoint(); + if (token.HasValue) + { + this.traceHelper.FasterProgress("Checkpointing state machine: index checkpoint started"); + this.pendingCompaction = null; + this.pendingIndexCheckpoint = this.WaitForCheckpointAsync(true, token.Value, false); + } + var checkpointStarted = token.HasValue; + return checkpointStarted; + }, + messageOnError: "Could not start index checkpoint before timeout"); + } + } + else if (this.CheckpointDue(out var trigger, out long? compactUntil)) + { + this.traceHelper.FasterProgress($"Checkpointing state machine: checkpoint is due. Trigger='{trigger}'. compactUntil='{compactUntil}'"); + this.pendingCheckpointTrigger = trigger; + + this.pendingCompaction = this.RunCompactionAsync(compactUntil); + } + } + protected override async Task Process(IList batch) { try @@ -420,68 +571,15 @@ protected override async Task Process(IList batch) this.store.AdjustCacheSize(); // handle progression of checkpointing state machine: none -> pendingCompaction -> pendingIndexCheckpoint -> pendingStoreCheckpoint -> none) - if (this.pendingStoreCheckpoint != null) - { - if (this.pendingStoreCheckpoint.IsCompleted == true) - { - (this.lastCheckpointedCommitLogPosition, this.lastCheckpointedInputQueuePosition) - = await this.pendingStoreCheckpoint; // observe exceptions here - - // force collection of memory used during checkpointing - GC.Collect(); - - // we have reached the end of the state machine transitions - this.pendingStoreCheckpoint = null; - this.pendingCheckpointTrigger = CheckpointTrigger.None; - this.ScheduleNextIdleCheckpointTime(); - this.partition.Settings.TestHooks?.CheckpointInjector?.SequenceComplete((this.store as FasterKV).Log); - } - } - else if (this.pendingIndexCheckpoint != null) - { - if (this.pendingIndexCheckpoint.IsCompleted == true) - { - await this.pendingIndexCheckpoint; // observe exceptions here - - // the store checkpoint is next - var token = this.store.StartStoreCheckpoint(this.CommitLogPosition, this.InputQueuePosition, this.InputQueueFingerprint, null); - if (token.HasValue) - { - this.pendingIndexCheckpoint = null; - this.pendingStoreCheckpoint = this.WaitForCheckpointAsync(false, token.Value, true); - this.numberEventsSinceLastCheckpoint = 0; - } - } - } - else if (this.pendingCompaction != null) - { - if (this.pendingCompaction.IsCompleted == true) - { - await this.pendingCompaction; // observe exceptions here - - // force collection of memory used during compaction - GC.Collect(); - - // the index checkpoint is next - var token = this.store.StartIndexCheckpoint(); - if (token.HasValue) - { - this.pendingCompaction = null; - this.pendingIndexCheckpoint = this.WaitForCheckpointAsync(true, token.Value, false); - } - } - } - else if (this.CheckpointDue(out var trigger, out long? compactUntil)) - { - this.pendingCheckpointTrigger = trigger; - this.pendingCompaction = this.RunCompactionAsync(compactUntil); - } + await this.RunCheckpointingStateMachine(); // periodically publish the partition load information and the send/receive positions + // also report checkpointing stats if (this.lastPublished + PublishInterval < DateTime.UtcNow) { this.lastPublished = DateTime.UtcNow; await this.PublishLoadAndPositions(); + this.LogCheckpointStats(); } if (this.partition.NumberPartitions() > 1 && this.partition.Settings.ActivityScheduler == ActivitySchedulerOptions.Locavore) @@ -558,7 +656,6 @@ protected override async Task Process(IList batch) if (target.HasValue) { target = await this.store.RunCompactionAsync(target.Value); - this.partition.Settings.TestHooks?.CheckpointInjector?.CompactionComplete(this.partition.ErrorHandler); }