diff --git a/src/DurableTask.Netherite/Abstractions/PartitionState/TrackedObjectKey.cs b/src/DurableTask.Netherite/Abstractions/PartitionState/TrackedObjectKey.cs index c7af8cc8..62fd6560 100644 --- a/src/DurableTask.Netherite/Abstractions/PartitionState/TrackedObjectKey.cs +++ b/src/DurableTask.Netherite/Abstractions/PartitionState/TrackedObjectKey.cs @@ -52,7 +52,9 @@ public enum TrackedObjectType { TrackedObjectType.Instance, typeof(InstanceState) }, }; - public static bool IsSingletonType(TrackedObjectType t) => (int) t < (int) TrackedObjectType.History; + public static bool IsSingletonType(TrackedObjectType t) => (int)t < NumberSingletonTypes; + + public const int NumberSingletonTypes = (int)TrackedObjectType.History; public bool IsSingleton => IsSingletonType(this.ObjectType); diff --git a/src/DurableTask.Netherite/StorageProviders/Faster/AzureBlobs/BlobManager.cs b/src/DurableTask.Netherite/StorageProviders/Faster/AzureBlobs/BlobManager.cs index 52c5a841..5fd4cad6 100644 --- a/src/DurableTask.Netherite/StorageProviders/Faster/AzureBlobs/BlobManager.cs +++ b/src/DurableTask.Netherite/StorageProviders/Faster/AzureBlobs/BlobManager.cs @@ -104,7 +104,10 @@ partial class BlobManager : ICheckpointManager, ILogCommitManager 20, // 1MB }; - const int StorageFormatVersion = 1; + static readonly int[] StorageFormatVersion = new int[] { + 1, //initial version + 2, //separate singletons + }; public static string GetStorageFormat(NetheriteOrchestrationServiceSettings settings) { @@ -112,7 +115,7 @@ public static string GetStorageFormat(NetheriteOrchestrationServiceSettings sett { UseAlternateObjectStore = settings.UseAlternateObjectStore, UsePSFQueries = settings.UsePSFQueries, - FormatVersion = StorageFormatVersion, + FormatVersion = StorageFormatVersion.Last(), }, Formatting.None); } @@ -131,7 +134,7 @@ public static void CheckStorageFormat(string format, NetheriteOrchestrationServi { throw new InvalidOperationException("The Netherite configuration setting 'UsePSFQueries' is incompatible with the existing taskhub."); } - if ((int)json["FormatVersion"] != StorageFormatVersion) + if ((int)json["FormatVersion"] != StorageFormatVersion.Last()) { throw new InvalidOperationException("The current storage format version is incompatible with the existing taskhub."); } @@ -656,6 +659,8 @@ await this.eventLogCommitBlob.ReleaseLeaseAsync( (string, string) GetObjectLogSnapshotBlobName(Guid token) => ($"cpr-checkpoints/{token}", "snapshot.obj.dat"); + string GetSingletonsSnapshotBlobName(Guid token) => $"cpr-checkpoints/{token}/singletons.dat"; + #endregion #region ILogCommitManager @@ -1013,6 +1018,66 @@ internal IDevice GetSnapshotObjectLogDevice(Guid token, int psfGroupOrdinal) #endregion + internal async Task PersistSingletonsAsync(byte[] singletons, Guid guid) + { + if (this.UseLocalFilesForTestingAndDebugging) + { + var path = Path.Combine(this.LocalCheckpointDirectoryPath, this.GetSingletonsSnapshotBlobName(guid)); + await File.WriteAllBytesAsync(path, singletons); + } + else + { + var singletonsBlob = this.blockBlobPartitionDirectory.GetBlockBlobReference(this.GetSingletonsSnapshotBlobName(guid)); + await this.PerformWithRetriesAsync( + BlobManager.AsynchronousStorageWriteMaxConcurrency, + false, + "CloudBlockBlob.UploadFromByteArrayAsync", + "WriteSingletons", + "", + singletonsBlob.Name, + 1000 + singletons.Length / 5000, + false, + async (numAttempts) => + { + await singletonsBlob.UploadFromByteArrayAsync(singletons, 0, singletons.Length); + return singletons.Length; + }); + } + } + + internal async Task RecoverSingletonsAsync() + { + if (this.UseLocalFilesForTestingAndDebugging) + { + var path = Path.Combine(this.LocalCheckpointDirectoryPath, this.GetSingletonsSnapshotBlobName(this.CheckpointInfo.LogToken)); + var stream = File.OpenRead(path); + return stream; + } + else + { + var singletonsBlob = this.blockBlobPartitionDirectory.GetBlockBlobReference(this.GetSingletonsSnapshotBlobName(this.CheckpointInfo.LogToken)); + var stream = new MemoryStream(); + await this.PerformWithRetriesAsync( + BlobManager.AsynchronousStorageReadMaxConcurrency, + true, + "CloudBlockBlob.DownloadToStreamAsync", + "ReadSingletons", + "", + singletonsBlob.Name, + 20000, + true, + async (numAttempts) => + { + stream.Seek(0, SeekOrigin.Begin); + await singletonsBlob.DownloadToStreamAsync(stream); + return stream.Position; + }); + + stream.Seek(0, SeekOrigin.Begin); + return stream; + } + } + internal async Task FinalizeCheckpointCompletedAsync() { // write the final file that has all the checkpoint info diff --git a/src/DurableTask.Netherite/StorageProviders/Faster/FasterKV.cs b/src/DurableTask.Netherite/StorageProviders/Faster/FasterKV.cs index ea94ab78..a2365a57 100644 --- a/src/DurableTask.Netherite/StorageProviders/Faster/FasterKV.cs +++ b/src/DurableTask.Netherite/StorageProviders/Faster/FasterKV.cs @@ -6,12 +6,14 @@ namespace DurableTask.Netherite.Faster using System; using System.Collections.Generic; using System.Diagnostics; + using System.IO; using System.Linq; using System.Threading; using System.Threading.Channels; using System.Threading.Tasks; using DurableTask.Core; using DurableTask.Core.Common; + using DurableTask.Core.Tracing; using FASTER.core; class FasterKV : TrackedObjectStore @@ -22,6 +24,9 @@ class FasterKV : TrackedObjectStore readonly BlobManager blobManager; readonly CancellationToken terminationToken; + TrackedObject[] singletons; + Task persistSingletonsTask; + ClientSession> mainSession; #if FASTER_SUPPORTS_PSF @@ -46,9 +51,11 @@ public FasterKV(Partition partition, BlobManager blobManager) new SerializerSettings { keySerializer = () => new Key.Serializer(), - valueSerializer = () => new Value.Serializer(this.StoreStats), + valueSerializer = () => new Value.Serializer(this.StoreStats, partition.TraceHelper), }); + this.singletons = new TrackedObject[TrackedObjectKey.NumberSingletonTypes]; + #if FASTER_SUPPORTS_PSF if (partition.Settings.UsePSFQueries) { @@ -103,9 +110,23 @@ public override void InitMainSession() try { await this.blobManager.FindCheckpointsAsync(); + + // recover singletons + this.blobManager.TraceHelper.FasterProgress($"Recovering Singletons"); + using (var stream = await this.blobManager.RecoverSingletonsAsync()) + { + this.singletons = Serializer.DeserializeSingletons(stream); + } + foreach (var singleton in this.singletons) + { + singleton.Partition = this.partition; + } + + // recover Faster this.blobManager.TraceHelper.FasterProgress($"Recovering FasterKV"); - await this.fht.RecoverAsync(numPagesToPreload: 0); + await this.fht.RecoverAsync(numPagesToPreload: 0); this.mainSession = this.CreateASession(); + return (this.blobManager.CheckpointInfo.CommitLogPosition, this.blobManager.CheckpointInfo.InputQueuePosition); } catch (Exception exception) @@ -139,7 +160,16 @@ public override bool TakeFullCheckpoint(long commitLogPosition, long inputQueueP { this.blobManager.CheckpointInfo.CommitLogPosition = commitLogPosition; this.blobManager.CheckpointInfo.InputQueuePosition = inputQueuePosition; - return this.fht.TakeFullCheckpoint(out checkpointGuid); + if (this.fht.TakeFullCheckpoint(out checkpointGuid)) + { + byte[] serializedSingletons = Serializer.SerializeSingletons(this.singletons); + this.persistSingletonsTask = this.blobManager.PersistSingletonsAsync(serializedSingletons, checkpointGuid); + return true; + } + else + { + return false; + } } catch (Exception exception) when (this.terminationToken.IsCancellationRequested && !Utils.IsFatal(exception)) @@ -155,6 +185,8 @@ public override async ValueTask CompleteCheckpointAsync() // workaround for hanging in CompleteCheckpointAsync: use custom thread. await RunOnDedicatedThreadAsync(() => this.fht.CompleteCheckpointAsync(this.terminationToken).AsTask()); //await this.fht.CompleteCheckpointAsync(this.terminationToken); + + await this.persistSingletonsTask; } catch (Exception exception) when (this.terminationToken.IsCancellationRequested && !Utils.IsFatal(exception)) @@ -176,13 +208,13 @@ public override Task FinalizeCheckpointCompletedAsync(Guid guid) return this.blobManager.FinalizeCheckpointCompletedAsync(); } - public override Guid? StartIndexCheckpoint() { try { if (this.fht.TakeIndexCheckpoint(out var token)) { + this.persistSingletonsTask = Task.CompletedTask; return token; } else @@ -209,6 +241,9 @@ public override Task FinalizeCheckpointCompletedAsync(Guid guid) // according to Badrish this ensures proper fencing w.r.t. session this.mainSession.Refresh(); + byte[] serializedSingletons = Serializer.SerializeSingletons(this.singletons); + this.persistSingletonsTask = this.blobManager.PersistSingletonsAsync(serializedSingletons, token); + return token; } else @@ -415,6 +450,7 @@ public override void ReadAsync(PartitionReadEvent readEvent, EffectTracker effec void TryRead(Key key) { + this.partition.Assert(!key.Val.IsSingleton); TrackedObject target = null; var status = this.mainSession.Read(ref key, ref effectTracker, ref target, readEvent, 0); switch (status) @@ -449,9 +485,16 @@ public override async ValueTask ReadAsync(Key key, EffectTracker { try { - var result = await this.mainSession.ReadAsync(key, effectTracker, context:null, token: this.terminationToken).ConfigureAwait(false); - var (status, output) = result.Complete(); - return output; + if (key.Val.IsSingleton) + { + return this.singletons[(int)key.Val.ObjectType]; + } + else + { + var result = await this.mainSession.ReadAsync(key, effectTracker, context: null, token: this.terminationToken).ConfigureAwait(false); + var (status, output) = result.Complete(); + return output; + } } catch (Exception exception) when (this.terminationToken.IsCancellationRequested && !Utils.IsFatal(exception)) @@ -463,11 +506,12 @@ public override async ValueTask ReadAsync(Key key, EffectTracker // read a tracked object on a query session async ValueTask ReadAsync( ClientSession session, - Key key, + Key key, EffectTracker effectTracker) { try { + this.partition.Assert(!key.Val.IsSingleton); var result = await session.ReadAsync(key, effectTracker, context: null, token: this.terminationToken).ConfigureAwait(false); var (status, output) = result.Complete(); return output; @@ -484,12 +528,11 @@ async ValueTask ReadAsync( public override ValueTask CreateAsync(Key key) { try - { + { + this.partition.Assert(key.Val.IsSingleton); TrackedObject newObject = TrackedObjectKey.Factory(key); newObject.Partition = this.partition; - Value newValue = newObject; - // Note: there is no UpsertAsync(). - this.mainSession.Upsert(ref key, ref newValue); + this.singletons[(int)key.Val.ObjectType] = newObject; return new ValueTask(newObject); } catch (Exception exception) @@ -503,7 +546,14 @@ public async override ValueTask ProcessEffectOnTrackedObject(Key k, EffectTracke { try { - (await this.mainSession.RMWAsync(ref k, ref tracker, token: this.terminationToken)).Complete(); + if (k.Val.IsSingleton) + { + tracker.ProcessEffectOn(this.singletons[(int)k.Val.ObjectType]); + } + else + { + (await this.mainSession.RMWAsync(ref k, ref tracker, token: this.terminationToken)).Complete(); + } } catch (Exception exception) when (this.terminationToken.IsCancellationRequested && !Utils.IsFatal(exception)) @@ -706,10 +756,12 @@ public struct Value public class Serializer : BinaryObjectSerializer { readonly StoreStatistics storeStats; + readonly PartitionTraceHelper traceHelper; - public Serializer(StoreStatistics storeStats) + public Serializer(StoreStatistics storeStats, PartitionTraceHelper traceHelper) { this.storeStats = storeStats; + this.traceHelper = traceHelper; } public override void Deserialize(out Value obj) @@ -717,6 +769,7 @@ public override void Deserialize(out Value obj) int count = this.reader.ReadInt32(); byte[] bytes = this.reader.ReadBytes(count); var trackedObject = DurableTask.Netherite.Serializer.DeserializeTrackedObject(bytes); + //this.traceHelper.TraceProgress($"Deserialized TrackedObject {trackedObject.Key} size={bytes.Length}"); //if (trackedObject.Key.IsSingleton) //{ // this.storeStats.A++; diff --git a/src/DurableTask.Netherite/Util/Serializer.cs b/src/DurableTask.Netherite/Util/Serializer.cs index 1de75b2c..588cdd79 100644 --- a/src/DurableTask.Netherite/Util/Serializer.cs +++ b/src/DurableTask.Netherite/Util/Serializer.cs @@ -7,6 +7,7 @@ namespace DurableTask.Netherite using System.IO; using System.Runtime.Serialization; using System.Text; + using DurableTask.Netherite.Faster; static class Serializer { @@ -16,6 +17,12 @@ static readonly DataContractSerializer eventSerializer static readonly DataContractSerializer trackedObjectSerializer = new DataContractSerializer(typeof(TrackedObject)); + static readonly DataContractSerializer singletonsSerializer + = new DataContractSerializer(typeof(TrackedObject[])); + + static readonly DataContractSerializer checkpointInfoSerializer + = new DataContractSerializer(typeof(CheckpointInfo)); + static readonly UnicodeEncoding uniEncoding = new UnicodeEncoding(); public static byte[] SerializeEvent(Event e, byte? header = null) @@ -68,5 +75,31 @@ public static TrackedObject DeserializeTrackedObject(byte[] bytes) result.SerializationCache = bytes; return result; } + + public static byte[] SerializeSingletons(TrackedObject[] singletons) + { + var stream = new MemoryStream(); + singletonsSerializer.WriteObject(stream, singletons); + return stream.ToArray(); + } + + public static TrackedObject[] DeserializeSingletons(Stream stream) + { + var result = (TrackedObject[])singletonsSerializer.ReadObject(stream); + return result; + } + + public static MemoryStream SerializeCheckpointInfo(CheckpointInfo checkpointInfo) + { + var stream = new MemoryStream(); + checkpointInfoSerializer.WriteObject(stream, checkpointInfo); + return stream; + } + + public static CheckpointInfo DeserializeCheckpointInfo(Stream stream) + { + var result = (CheckpointInfo)checkpointInfoSerializer.ReadObject(stream); + return result; + } } }