From b5dd63ff8f3a9e0116daa67dff52b67fb3efedd5 Mon Sep 17 00:00:00 2001 From: Sebastian Burckhardt Date: Mon, 11 Oct 2021 11:34:12 -0700 Subject: [PATCH 1/2] update to FASTER 1.9.3 --- .../DurableTask.Netherite.csproj | 2 +- .../Faster/AzureBlobs/BlobManager.cs | 23 +++++++++++++++---- .../AzureBlobs/LocalFileCheckpointManager.cs | 16 +++++++++++-- .../AzureBlobs/PsfBlobCheckpointManager.cs | 13 +++++++++-- .../StorageProviders/Faster/FasterKV.cs | 12 ++++++++++ 5 files changed, 57 insertions(+), 9 deletions(-) diff --git a/src/DurableTask.Netherite/DurableTask.Netherite.csproj b/src/DurableTask.Netherite/DurableTask.Netherite.csproj index 18f7c05e..1f119340 100644 --- a/src/DurableTask.Netherite/DurableTask.Netherite.csproj +++ b/src/DurableTask.Netherite/DurableTask.Netherite.csproj @@ -48,7 +48,7 @@ - + diff --git a/src/DurableTask.Netherite/StorageProviders/Faster/AzureBlobs/BlobManager.cs b/src/DurableTask.Netherite/StorageProviders/Faster/AzureBlobs/BlobManager.cs index dd693827..6f3e960a 100644 --- a/src/DurableTask.Netherite/StorageProviders/Faster/AzureBlobs/BlobManager.cs +++ b/src/DurableTask.Netherite/StorageProviders/Faster/AzureBlobs/BlobManager.cs @@ -92,7 +92,7 @@ partial class BlobManager : ICheckpointManager, ILogCommitManager SegmentSizeBits = useSeparatePageBlobStorage ? 35 // 32 GB : 32, // 4 GB - CopyReadsToTail = true, + CopyReadsToTail = CopyReadsToTail.FromReadOnly, MemorySizeBits = (numPartitions <= 1) ? 25 : // 32MB (numPartitions <= 2) ? 24 : // 16MB @@ -168,6 +168,8 @@ public void PurgeAll() //TODO figure out what is supposed to go here } + public void OnRecovery(Guid indexToken, Guid logToken) { /* TODO */ } + public CheckpointSettings StoreCheckpointSettings => new CheckpointSettings { CheckpointManager = this.UseLocalFiles ? (ICheckpointManager)this.LocalCheckpointManager : (ICheckpointManager)this, @@ -808,11 +810,14 @@ void ICheckpointManager.CommitIndexCheckpoint(Guid indexToken, byte[] commitMeta void ICheckpointManager.CommitLogCheckpoint(Guid logToken, byte[] commitMetadata) => this.CommitLogCheckpoint(logToken, commitMetadata, InvalidPsfGroupOrdinal); + void ICheckpointManager.CommitLogIncrementalCheckpoint(Guid logToken, int version, byte[] commitMetadata, DeltaLog deltaLog) + => this.CommitLogIncrementalCheckpoint(logToken, version, commitMetadata, deltaLog, InvalidPsfGroupOrdinal); + byte[] ICheckpointManager.GetIndexCheckpointMetadata(Guid indexToken) => this.GetIndexCheckpointMetadata(indexToken, InvalidPsfGroupOrdinal); - byte[] ICheckpointManager.GetLogCheckpointMetadata(Guid logToken) - => this.GetLogCheckpointMetadata(logToken, InvalidPsfGroupOrdinal); + byte[] ICheckpointManager.GetLogCheckpointMetadata(Guid logToken, DeltaLog deltaLog) + => this.GetLogCheckpointMetadata(logToken, InvalidPsfGroupOrdinal, deltaLog); IDevice ICheckpointManager.GetIndexDevice(Guid indexToken) => this.GetIndexDevice(indexToken, InvalidPsfGroupOrdinal); @@ -823,6 +828,9 @@ IDevice ICheckpointManager.GetSnapshotLogDevice(Guid token) IDevice ICheckpointManager.GetSnapshotObjectLogDevice(Guid token) => this.GetSnapshotObjectLogDevice(token, InvalidPsfGroupOrdinal); + IDevice ICheckpointManager.GetDeltaLogDevice(Guid token) + => this.GetDeltaLogDevice(token, InvalidPsfGroupOrdinal); + IEnumerable ICheckpointManager.GetIndexCheckpointTokens() { var indexToken = this.CheckpointInfo.IndexToken; @@ -949,6 +957,11 @@ internal void CommitLogCheckpoint(Guid logToken, byte[] commitMetadata, int psfG this.StorageTracer?.FasterStorageProgress($"ICheckpointManager.CommitLogCheckpoint Returned from {tag}, target={metaFileBlob.Name}"); } + internal void CommitLogIncrementalCheckpoint(Guid logToken, int version, byte[] commitMetadata, DeltaLog deltaLog, int indexOrdinal) + { + throw new NotImplementedException("incremental checkpointing is not implemented"); + } + internal byte[] GetIndexCheckpointMetadata(Guid indexToken, int psfGroupOrdinal) { var (isPsf, tag) = this.IsPsfOrPrimary(psfGroupOrdinal); @@ -977,7 +990,7 @@ internal byte[] GetIndexCheckpointMetadata(Guid indexToken, int psfGroupOrdinal) this.StorageTracer?.FasterStorageProgress($"ICheckpointManager.GetIndexCommitMetadata Returned {result?.Length ?? null} bytes from {tag}, target={metaFileBlob.Name}"); return result; } - internal byte[] GetLogCheckpointMetadata(Guid logToken, int psfGroupOrdinal) + internal byte[] GetLogCheckpointMetadata(Guid logToken, int psfGroupOrdinal, DeltaLog deltaLog) // TODO DeltaLog { var (isPsf, tag) = this.IsPsfOrPrimary(psfGroupOrdinal); this.StorageTracer?.FasterStorageProgress($"ICheckpointManager.GetIndexCommitMetadata Called on {tag}, logToken={logToken}"); @@ -1050,6 +1063,8 @@ internal IDevice GetSnapshotObjectLogDevice(Guid token, int psfGroupOrdinal) return device; } + internal IDevice GetDeltaLogDevice(Guid token, int indexOrdinal) => default; // TODO + #endregion internal async Task FinalizeCheckpointCompletedAsync() diff --git a/src/DurableTask.Netherite/StorageProviders/Faster/AzureBlobs/LocalFileCheckpointManager.cs b/src/DurableTask.Netherite/StorageProviders/Faster/AzureBlobs/LocalFileCheckpointManager.cs index 52f0a77c..c79366aa 100644 --- a/src/DurableTask.Netherite/StorageProviders/Faster/AzureBlobs/LocalFileCheckpointManager.cs +++ b/src/DurableTask.Netherite/StorageProviders/Faster/AzureBlobs/LocalFileCheckpointManager.cs @@ -39,11 +39,17 @@ void ICheckpointManager.CommitLogCheckpoint(Guid logToken, byte[] commitMetadata this.localCheckpointManager.CommitLogCheckpoint(logToken, commitMetadata); this.checkpointInfo.LogToken = logToken; } + + void ICheckpointManager.CommitLogIncrementalCheckpoint(Guid logToken, int version, byte[] commitMetadata, DeltaLog deltaLog) + { + throw new NotImplementedException("incremental checkpointing is not implemented"); + } + byte[] ICheckpointManager.GetIndexCheckpointMetadata(Guid indexToken) => this.localCheckpointManager.GetIndexCheckpointMetadata(indexToken); - byte[] ICheckpointManager.GetLogCheckpointMetadata(Guid logToken) - => this.localCheckpointManager.GetLogCheckpointMetadata(logToken); + byte[] ICheckpointManager.GetLogCheckpointMetadata(Guid logToken, DeltaLog deltaLog) + => this.localCheckpointManager.GetLogCheckpointMetadata(logToken, deltaLog); IDevice ICheckpointManager.GetIndexDevice(Guid indexToken) => this.localCheckpointManager.GetIndexDevice(indexToken); @@ -54,6 +60,9 @@ IDevice ICheckpointManager.GetSnapshotLogDevice(Guid token) IDevice ICheckpointManager.GetSnapshotObjectLogDevice(Guid token) => this.localCheckpointManager.GetSnapshotObjectLogDevice(token); + IDevice ICheckpointManager.GetDeltaLogDevice(Guid token) + => this.localCheckpointManager.GetDeltaLogDevice(token); + internal string GetLatestCheckpointJson() => File.ReadAllText(this.checkpointCompletedFilename); IEnumerable ICheckpointManager.GetIndexCheckpointTokens() @@ -71,6 +80,9 @@ IEnumerable ICheckpointManager.GetLogCheckpointTokens() void ICheckpointManager.PurgeAll() => this.localCheckpointManager.PurgeAll(); + public void OnRecovery(Guid indexToken, Guid logToken) + => this.localCheckpointManager.OnRecovery(indexToken, logToken); + void IDisposable.Dispose() => this.localCheckpointManager.Dispose(); } diff --git a/src/DurableTask.Netherite/StorageProviders/Faster/AzureBlobs/PsfBlobCheckpointManager.cs b/src/DurableTask.Netherite/StorageProviders/Faster/AzureBlobs/PsfBlobCheckpointManager.cs index 0435b45f..e8b51859 100644 --- a/src/DurableTask.Netherite/StorageProviders/Faster/AzureBlobs/PsfBlobCheckpointManager.cs +++ b/src/DurableTask.Netherite/StorageProviders/Faster/AzureBlobs/PsfBlobCheckpointManager.cs @@ -30,6 +30,9 @@ void ICheckpointManager.CommitIndexCheckpoint(Guid indexToken, byte[] commitMeta void ICheckpointManager.CommitLogCheckpoint(Guid logToken, byte[] commitMetadata) => this.blobManager.CommitLogCheckpoint(logToken, commitMetadata, this.groupOrdinal); + void ICheckpointManager.CommitLogIncrementalCheckpoint(Guid logToken, int version, byte[] commitMetadata, DeltaLog deltaLog) + => this.blobManager.CommitLogIncrementalCheckpoint(logToken, version, commitMetadata, deltaLog, this.groupOrdinal); + IDevice ICheckpointManager.GetIndexDevice(Guid indexToken) => this.blobManager.GetIndexDevice(indexToken, this.groupOrdinal); @@ -39,11 +42,14 @@ IDevice ICheckpointManager.GetSnapshotLogDevice(Guid token) IDevice ICheckpointManager.GetSnapshotObjectLogDevice(Guid token) => this.blobManager.GetSnapshotObjectLogDevice(token, this.groupOrdinal); + IDevice ICheckpointManager.GetDeltaLogDevice(Guid token) + => this.blobManager.GetDeltaLogDevice(token, this.groupOrdinal); + byte[] ICheckpointManager.GetIndexCheckpointMetadata(Guid indexToken) => this.blobManager.GetIndexCheckpointMetadata(indexToken, this.groupOrdinal); - byte[] ICheckpointManager.GetLogCheckpointMetadata(Guid logToken) - => this.blobManager.GetLogCheckpointMetadata(logToken, this.groupOrdinal); + byte[] ICheckpointManager.GetLogCheckpointMetadata(Guid logToken, DeltaLog deltaLog) + => this.blobManager.GetLogCheckpointMetadata(logToken, this.groupOrdinal, deltaLog); IEnumerable ICheckpointManager.GetIndexCheckpointTokens() { @@ -59,6 +65,9 @@ IEnumerable ICheckpointManager.GetLogCheckpointTokens() public void PurgeAll() { /* TODO */ } + public void OnRecovery(Guid indexToken, Guid logToken) + => this.blobManager.OnRecovery(indexToken, logToken); + public void Dispose() { } } } diff --git a/src/DurableTask.Netherite/StorageProviders/Faster/FasterKV.cs b/src/DurableTask.Netherite/StorageProviders/Faster/FasterKV.cs index 7acc3af3..3648c432 100644 --- a/src/DurableTask.Netherite/StorageProviders/Faster/FasterKV.cs +++ b/src/DurableTask.Netherite/StorageProviders/Faster/FasterKV.cs @@ -808,6 +808,18 @@ public void CheckpointCompletionCallback(string sessionId, CommitPoint commitPoi public void RMWCompletionCallback(ref Key key, ref EffectTracker input, object ctx, Status status) { } public void UpsertCompletionCallback(ref Key key, ref Value value, object ctx) { } public void DeleteCompletionCallback(ref Key key, object ctx) { } + + // We do not need to lock records, because writes and non-query reads are single-session, and query reads can only race on instance states which are immutable + public bool SupportsLocking => false; + + public void Lock(ref RecordInfo recordInfo, ref Key key, ref Value value, LockType lockType, ref long lockContext) + { + } + + public bool Unlock(ref RecordInfo recordInfo, ref Key key, ref Value value, LockType lockType, long lockContext) + { + return true; + } } } } From 8c103256dde9d759bc889c7537a1838035f56c54 Mon Sep 17 00:00:00 2001 From: Sebastian Burckhardt Date: Mon, 11 Oct 2021 14:00:23 -0700 Subject: [PATCH 2/2] update to 1.9.6 --- .../DurableTask.Netherite.csproj | 2 +- .../Faster/AzureBlobs/BlobManager.cs | 20 +++++++++++++------ .../AzureBlobs/LocalFileCheckpointManager.cs | 7 +++++-- .../AzureBlobs/PsfBlobCheckpointManager.cs | 6 ++++-- .../StorageProviders/Faster/FasterKV.cs | 13 ++++++------ 5 files changed, 31 insertions(+), 17 deletions(-) diff --git a/src/DurableTask.Netherite/DurableTask.Netherite.csproj b/src/DurableTask.Netherite/DurableTask.Netherite.csproj index 1f119340..f7a3faef 100644 --- a/src/DurableTask.Netherite/DurableTask.Netherite.csproj +++ b/src/DurableTask.Netherite/DurableTask.Netherite.csproj @@ -48,7 +48,7 @@ - + diff --git a/src/DurableTask.Netherite/StorageProviders/Faster/AzureBlobs/BlobManager.cs b/src/DurableTask.Netherite/StorageProviders/Faster/AzureBlobs/BlobManager.cs index 6f3e960a..7d67d17e 100644 --- a/src/DurableTask.Netherite/StorageProviders/Faster/AzureBlobs/BlobManager.cs +++ b/src/DurableTask.Netherite/StorageProviders/Faster/AzureBlobs/BlobManager.cs @@ -160,15 +160,23 @@ public static void CheckStorageFormat(string format, NetheriteOrchestrationServi public void Dispose() { - //TODO figure out what is supposed to go here + // we do not need to dispose any resources for the commit manager, because any such resources are deleted together with the taskhub + } + + public void Purge(Guid token) + { + throw new NotImplementedException("Purges are handled directly on recovery, not via FASTER"); } public void PurgeAll() { - //TODO figure out what is supposed to go here + throw new NotImplementedException("Purges are handled directly on recovery, not via FASTER"); } - public void OnRecovery(Guid indexToken, Guid logToken) { /* TODO */ } + public void OnRecovery(Guid indexToken, Guid logToken) + { + // we handle cleanup of old checkpoints somewhere else + } public CheckpointSettings StoreCheckpointSettings => new CheckpointSettings { @@ -816,8 +824,8 @@ void ICheckpointManager.CommitLogIncrementalCheckpoint(Guid logToken, int versio byte[] ICheckpointManager.GetIndexCheckpointMetadata(Guid indexToken) => this.GetIndexCheckpointMetadata(indexToken, InvalidPsfGroupOrdinal); - byte[] ICheckpointManager.GetLogCheckpointMetadata(Guid logToken, DeltaLog deltaLog) - => this.GetLogCheckpointMetadata(logToken, InvalidPsfGroupOrdinal, deltaLog); + byte[] ICheckpointManager.GetLogCheckpointMetadata(Guid logToken, DeltaLog deltaLog, bool scanDelta, long recoverTo) + => this.GetLogCheckpointMetadata(logToken, InvalidPsfGroupOrdinal, deltaLog, scanDelta, recoverTo); IDevice ICheckpointManager.GetIndexDevice(Guid indexToken) => this.GetIndexDevice(indexToken, InvalidPsfGroupOrdinal); @@ -990,7 +998,7 @@ internal byte[] GetIndexCheckpointMetadata(Guid indexToken, int psfGroupOrdinal) this.StorageTracer?.FasterStorageProgress($"ICheckpointManager.GetIndexCommitMetadata Returned {result?.Length ?? null} bytes from {tag}, target={metaFileBlob.Name}"); return result; } - internal byte[] GetLogCheckpointMetadata(Guid logToken, int psfGroupOrdinal, DeltaLog deltaLog) // TODO DeltaLog + internal byte[] GetLogCheckpointMetadata(Guid logToken, int psfGroupOrdinal, DeltaLog deltaLog, bool scanDelta, long recoverTo) { var (isPsf, tag) = this.IsPsfOrPrimary(psfGroupOrdinal); this.StorageTracer?.FasterStorageProgress($"ICheckpointManager.GetIndexCommitMetadata Called on {tag}, logToken={logToken}"); diff --git a/src/DurableTask.Netherite/StorageProviders/Faster/AzureBlobs/LocalFileCheckpointManager.cs b/src/DurableTask.Netherite/StorageProviders/Faster/AzureBlobs/LocalFileCheckpointManager.cs index c79366aa..f0845066 100644 --- a/src/DurableTask.Netherite/StorageProviders/Faster/AzureBlobs/LocalFileCheckpointManager.cs +++ b/src/DurableTask.Netherite/StorageProviders/Faster/AzureBlobs/LocalFileCheckpointManager.cs @@ -48,8 +48,8 @@ void ICheckpointManager.CommitLogIncrementalCheckpoint(Guid logToken, int versio byte[] ICheckpointManager.GetIndexCheckpointMetadata(Guid indexToken) => this.localCheckpointManager.GetIndexCheckpointMetadata(indexToken); - byte[] ICheckpointManager.GetLogCheckpointMetadata(Guid logToken, DeltaLog deltaLog) - => this.localCheckpointManager.GetLogCheckpointMetadata(logToken, deltaLog); + byte[] ICheckpointManager.GetLogCheckpointMetadata(Guid logToken, DeltaLog deltaLog, bool scanDelta, long recoverTo) + => this.localCheckpointManager.GetLogCheckpointMetadata(logToken, deltaLog, scanDelta, recoverTo); IDevice ICheckpointManager.GetIndexDevice(Guid indexToken) => this.localCheckpointManager.GetIndexDevice(indexToken); @@ -77,6 +77,9 @@ IEnumerable ICheckpointManager.GetLogCheckpointTokens() yield return logToken; } + void ICheckpointManager.Purge(Guid guid) + => this.localCheckpointManager.Purge(guid); + void ICheckpointManager.PurgeAll() => this.localCheckpointManager.PurgeAll(); diff --git a/src/DurableTask.Netherite/StorageProviders/Faster/AzureBlobs/PsfBlobCheckpointManager.cs b/src/DurableTask.Netherite/StorageProviders/Faster/AzureBlobs/PsfBlobCheckpointManager.cs index e8b51859..ed66bb8f 100644 --- a/src/DurableTask.Netherite/StorageProviders/Faster/AzureBlobs/PsfBlobCheckpointManager.cs +++ b/src/DurableTask.Netherite/StorageProviders/Faster/AzureBlobs/PsfBlobCheckpointManager.cs @@ -48,8 +48,8 @@ IDevice ICheckpointManager.GetDeltaLogDevice(Guid token) byte[] ICheckpointManager.GetIndexCheckpointMetadata(Guid indexToken) => this.blobManager.GetIndexCheckpointMetadata(indexToken, this.groupOrdinal); - byte[] ICheckpointManager.GetLogCheckpointMetadata(Guid logToken, DeltaLog deltaLog) - => this.blobManager.GetLogCheckpointMetadata(logToken, this.groupOrdinal, deltaLog); + byte[] ICheckpointManager.GetLogCheckpointMetadata(Guid logToken, DeltaLog deltaLog, bool scanDelta, long recoverTo) + => this.blobManager.GetLogCheckpointMetadata(logToken, this.groupOrdinal, deltaLog, scanDelta, recoverTo); IEnumerable ICheckpointManager.GetIndexCheckpointTokens() { @@ -63,6 +63,8 @@ IEnumerable ICheckpointManager.GetLogCheckpointTokens() yield return logToken; } + public void Purge(Guid guid) { /* TODO */ } + public void PurgeAll() { /* TODO */ } public void OnRecovery(Guid indexToken, Guid logToken) diff --git a/src/DurableTask.Netherite/StorageProviders/Faster/FasterKV.cs b/src/DurableTask.Netherite/StorageProviders/Faster/FasterKV.cs index 3648c432..1bcf25d5 100644 --- a/src/DurableTask.Netherite/StorageProviders/Faster/FasterKV.cs +++ b/src/DurableTask.Netherite/StorageProviders/Faster/FasterKV.cs @@ -457,9 +457,10 @@ IAsyncEnumerable ScanOrchestrationStates( void RunScan() { using var _ = EventTraceContext.MakeContext(0, queryId); + using var session = this.CreateASession(); // get the unique set of keys appearing in the log and emit them - using var iter1 = this.fht.Iterate(); + using var iter1 = session.Iterate(); Stopwatch stopwatch = new Stopwatch(); stopwatch.Start(); @@ -683,7 +684,7 @@ public Functions(Partition partition, StoreStatistics stats) this.stats = stats; } - public void InitialUpdater(ref Key key, ref EffectTracker tracker, ref Value value) + public void InitialUpdater(ref Key key, ref EffectTracker tracker, ref Value value, ref TrackedObject output) { var trackedObject = TrackedObjectKey.Factory(key.Val); this.stats.Create++; @@ -693,7 +694,7 @@ public void InitialUpdater(ref Key key, ref EffectTracker tracker, ref Value val this.stats.Modify++; } - public bool InPlaceUpdater(ref Key key, ref EffectTracker tracker, ref Value value) + public bool InPlaceUpdater(ref Key key, ref EffectTracker tracker, ref Value value, ref TrackedObject output) { this.partition.Assert(value.Val is TrackedObject); TrackedObject trackedObject = value; @@ -704,9 +705,9 @@ public bool InPlaceUpdater(ref Key key, ref EffectTracker tracker, ref Value val return true; } - public bool NeedCopyUpdate(ref Key key, ref EffectTracker tracker, ref Value value) => true; + public bool NeedCopyUpdate(ref Key key, ref EffectTracker tracker, ref Value value, ref TrackedObject output) => true; - public void CopyUpdater(ref Key key, ref EffectTracker tracker, ref Value oldValue, ref Value newValue) + public void CopyUpdater(ref Key key, ref EffectTracker tracker, ref Value oldValue, ref Value newValue, ref TrackedObject output) { this.stats.Copy++; @@ -805,7 +806,7 @@ public void ReadCompletionCallback(ref Key key, ref EffectTracker tracker, ref T } public void CheckpointCompletionCallback(string sessionId, CommitPoint commitPoint) { } - public void RMWCompletionCallback(ref Key key, ref EffectTracker input, object ctx, Status status) { } + public void RMWCompletionCallback(ref Key key, ref EffectTracker input, ref TrackedObject output, object ctx, Status status) { } public void UpsertCompletionCallback(ref Key key, ref Value value, object ctx) { } public void DeleteCompletionCallback(ref Key key, object ctx) { }