move singleton objects out of FASTER store, and instead checkpoint them directly in a blob.
sebastianburckhardt committed Apr 20, 2021
1 parent 3c5c586 commit 2e118bc
Showing 4 changed files with 171 additions and 18 deletions.
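For orientation, the pattern introduced by this commit: instead of storing singleton tracked objects inside the FASTER store, each checkpoint serializes them as a single DataContract payload and writes it to a per-checkpoint singletons.dat blob (or local file), and recovery reads that payload back before recovering FASTER itself. The sketch below is a minimal, self-contained illustration of that idea using local files only; the SingletonState type and all member names are hypothetical stand-ins, not the actual Netherite types, and only the "cpr-checkpoints/{token}/singletons.dat" layout mirrors the diff.

// Minimal sketch (assumptions noted above): serialize singletons as one
// DataContract payload per checkpoint token, and read it back on recovery.
using System;
using System.IO;
using System.Runtime.Serialization;

[DataContract]
class SingletonState
{
    [DataMember] public string Name;
    [DataMember] public long Position;
}

static class SingletonCheckpointSketch
{
    static readonly DataContractSerializer serializer =
        new DataContractSerializer(typeof(SingletonState[]));

    // Persist the singletons alongside the checkpoint identified by 'token'.
    public static void Persist(SingletonState[] singletons, Guid token, string root)
    {
        var dir = Path.Combine(root, $"cpr-checkpoints/{token}");
        Directory.CreateDirectory(dir);
        using var stream = File.Create(Path.Combine(dir, "singletons.dat"));
        serializer.WriteObject(stream, singletons);
    }

    // Recover the singletons for 'token' before recovering the main store.
    public static SingletonState[] Recover(Guid token, string root)
    {
        var path = Path.Combine(root, $"cpr-checkpoints/{token}", "singletons.dat");
        using var stream = File.OpenRead(path);
        return (SingletonState[])serializer.ReadObject(stream);
    }

    static void Main()
    {
        var root = Path.Combine(Path.GetTempPath(), "netherite-sketch");
        var token = Guid.NewGuid();
        Persist(new[] { new SingletonState { Name = "History", Position = 42 } }, token, root);
        var recovered = Recover(token, root);
        Console.WriteLine($"{recovered[0].Name} @ {recovered[0].Position}");
    }
}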
@@ -52,7 +52,9 @@ public enum TrackedObjectType
{ TrackedObjectType.Instance, typeof(InstanceState) },
};

public static bool IsSingletonType(TrackedObjectType t) => (int) t < (int) TrackedObjectType.History;
public static bool IsSingletonType(TrackedObjectType t) => (int)t < NumberSingletonTypes;

public const int NumberSingletonTypes = (int)TrackedObjectType.History;

public bool IsSingleton => IsSingletonType(this.ObjectType);

@@ -104,15 +104,18 @@ partial class BlobManager : ICheckpointManager, ILogCommitManager
20, // 1MB
};

const int StorageFormatVersion = 1;
static readonly int[] StorageFormatVersion = new int[] {
1, //initial version
2, //separate singletons
};

public static string GetStorageFormat(NetheriteOrchestrationServiceSettings settings)
{
return JsonConvert.SerializeObject(new
{
UseAlternateObjectStore = settings.UseAlternateObjectStore,
UsePSFQueries = settings.UsePSFQueries,
FormatVersion = StorageFormatVersion,
FormatVersion = StorageFormatVersion.Last(),
},
Formatting.None);
}
@@ -131,7 +134,7 @@ public static void CheckStorageFormat(string format, NetheriteOrchestrationServi
{
throw new InvalidOperationException("The Netherite configuration setting 'UsePSFQueries' is incompatible with the existing taskhub.");
}
if ((int)json["FormatVersion"] != StorageFormatVersion)
if ((int)json["FormatVersion"] != StorageFormatVersion.Last())
{
throw new InvalidOperationException("The current storage format version is incompatible with the existing taskhub.");
}
@@ -656,6 +659,8 @@ await this.eventLogCommitBlob.ReleaseLeaseAsync(

(string, string) GetObjectLogSnapshotBlobName(Guid token) => ($"cpr-checkpoints/{token}", "snapshot.obj.dat");

string GetSingletonsSnapshotBlobName(Guid token) => $"cpr-checkpoints/{token}/singletons.dat";

#endregion

#region ILogCommitManager
@@ -1013,6 +1018,66 @@ internal IDevice GetSnapshotObjectLogDevice(Guid token, int psfGroupOrdinal)

#endregion

internal async Task PersistSingletonsAsync(byte[] singletons, Guid guid)
{
if (this.UseLocalFilesForTestingAndDebugging)
{
var path = Path.Combine(this.LocalCheckpointDirectoryPath, this.GetSingletonsSnapshotBlobName(guid));
await File.WriteAllBytesAsync(path, singletons);
}
else
{
var singletonsBlob = this.blockBlobPartitionDirectory.GetBlockBlobReference(this.GetSingletonsSnapshotBlobName(guid));
await this.PerformWithRetriesAsync(
BlobManager.AsynchronousStorageWriteMaxConcurrency,
false,
"CloudBlockBlob.UploadFromByteArrayAsync",
"WriteSingletons",
"",
singletonsBlob.Name,
1000 + singletons.Length / 5000,
false,
async (numAttempts) =>
{
await singletonsBlob.UploadFromByteArrayAsync(singletons, 0, singletons.Length);
return singletons.Length;
});
}
}

internal async Task<Stream> RecoverSingletonsAsync()
{
if (this.UseLocalFilesForTestingAndDebugging)
{
var path = Path.Combine(this.LocalCheckpointDirectoryPath, this.GetSingletonsSnapshotBlobName(this.CheckpointInfo.LogToken));
var stream = File.OpenRead(path);
return stream;
}
else
{
var singletonsBlob = this.blockBlobPartitionDirectory.GetBlockBlobReference(this.GetSingletonsSnapshotBlobName(this.CheckpointInfo.LogToken));
var stream = new MemoryStream();
await this.PerformWithRetriesAsync(
BlobManager.AsynchronousStorageReadMaxConcurrency,
true,
"CloudBlockBlob.DownloadToStreamAsync",
"ReadSingletons",
"",
singletonsBlob.Name,
20000,
true,
async (numAttempts) =>
{
stream.Seek(0, SeekOrigin.Begin);
await singletonsBlob.DownloadToStreamAsync(stream);
return stream.Position;
});

stream.Seek(0, SeekOrigin.Begin);
return stream;
}
}

internal async Task FinalizeCheckpointCompletedAsync()
{
// write the final file that has all the checkpoint info
81 changes: 67 additions & 14 deletions src/DurableTask.Netherite/StorageProviders/Faster/FasterKV.cs
@@ -6,12 +6,14 @@ namespace DurableTask.Netherite.Faster
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using DurableTask.Core;
using DurableTask.Core.Common;
using DurableTask.Core.Tracing;
using FASTER.core;

class FasterKV : TrackedObjectStore
@@ -22,6 +24,9 @@ class FasterKV : TrackedObjectStore
readonly BlobManager blobManager;
readonly CancellationToken terminationToken;

TrackedObject[] singletons;
Task persistSingletonsTask;

ClientSession<Key, Value, EffectTracker, TrackedObject, object, IFunctions<Key, Value, EffectTracker, TrackedObject, object>> mainSession;

#if FASTER_SUPPORTS_PSF
@@ -46,9 +51,11 @@ public FasterKV(Partition partition, BlobManager blobManager)
new SerializerSettings<Key, Value>
{
keySerializer = () => new Key.Serializer(),
valueSerializer = () => new Value.Serializer(this.StoreStats),
valueSerializer = () => new Value.Serializer(this.StoreStats, partition.TraceHelper),
});

this.singletons = new TrackedObject[TrackedObjectKey.NumberSingletonTypes];

#if FASTER_SUPPORTS_PSF
if (partition.Settings.UsePSFQueries)
{
@@ -103,9 +110,23 @@ public override void InitMainSession()
try
{
await this.blobManager.FindCheckpointsAsync();

// recover singletons
this.blobManager.TraceHelper.FasterProgress($"Recovering Singletons");
using (var stream = await this.blobManager.RecoverSingletonsAsync())
{
this.singletons = Serializer.DeserializeSingletons(stream);
}
foreach (var singleton in this.singletons)
{
singleton.Partition = this.partition;
}

// recover Faster
this.blobManager.TraceHelper.FasterProgress($"Recovering FasterKV");
await this.fht.RecoverAsync(numPagesToPreload: 0);
this.mainSession = this.CreateASession();

return (this.blobManager.CheckpointInfo.CommitLogPosition, this.blobManager.CheckpointInfo.InputQueuePosition);
}
catch (Exception exception)
@@ -139,7 +160,16 @@ public override bool TakeFullCheckpoint(long commitLogPosition, long inputQueueP
{
this.blobManager.CheckpointInfo.CommitLogPosition = commitLogPosition;
this.blobManager.CheckpointInfo.InputQueuePosition = inputQueuePosition;
return this.fht.TakeFullCheckpoint(out checkpointGuid);
if (this.fht.TakeFullCheckpoint(out checkpointGuid))
{
byte[] serializedSingletons = Serializer.SerializeSingletons(this.singletons);
this.persistSingletonsTask = this.blobManager.PersistSingletonsAsync(serializedSingletons, checkpointGuid);
return true;
}
else
{
return false;
}
}
catch (Exception exception)
when (this.terminationToken.IsCancellationRequested && !Utils.IsFatal(exception))
@@ -155,6 +185,8 @@ public override async ValueTask CompleteCheckpointAsync()
// workaround for hanging in CompleteCheckpointAsync: use custom thread.
await RunOnDedicatedThreadAsync(() => this.fht.CompleteCheckpointAsync(this.terminationToken).AsTask());
//await this.fht.CompleteCheckpointAsync(this.terminationToken);

await this.persistSingletonsTask;
}
catch (Exception exception)
when (this.terminationToken.IsCancellationRequested && !Utils.IsFatal(exception))
@@ -176,13 +208,13 @@ public override Task FinalizeCheckpointCompletedAsync(Guid guid)
return this.blobManager.FinalizeCheckpointCompletedAsync();
}


public override Guid? StartIndexCheckpoint()
{
try
{
if (this.fht.TakeIndexCheckpoint(out var token))
{
this.persistSingletonsTask = Task.CompletedTask;
return token;
}
else
@@ -209,6 +241,9 @@ public override Task FinalizeCheckpointCompletedAsync(Guid guid)
// according to Badrish this ensures proper fencing w.r.t. session
this.mainSession.Refresh();

byte[] serializedSingletons = Serializer.SerializeSingletons(this.singletons);
this.persistSingletonsTask = this.blobManager.PersistSingletonsAsync(serializedSingletons, token);

return token;
}
else
@@ -415,6 +450,7 @@ public override void ReadAsync(PartitionReadEvent readEvent, EffectTracker effec

void TryRead(Key key)
{
this.partition.Assert(!key.Val.IsSingleton);
TrackedObject target = null;
var status = this.mainSession.Read(ref key, ref effectTracker, ref target, readEvent, 0);
switch (status)
@@ -449,9 +485,16 @@ public override async ValueTask<TrackedObject> ReadAsync(Key key, EffectTracker
{
try
{
var result = await this.mainSession.ReadAsync(key, effectTracker, context:null, token: this.terminationToken).ConfigureAwait(false);
var (status, output) = result.Complete();
return output;
if (key.Val.IsSingleton)
{
return this.singletons[(int)key.Val.ObjectType];
}
else
{
var result = await this.mainSession.ReadAsync(key, effectTracker, context: null, token: this.terminationToken).ConfigureAwait(false);
var (status, output) = result.Complete();
return output;
}
}
catch (Exception exception)
when (this.terminationToken.IsCancellationRequested && !Utils.IsFatal(exception))
@@ -463,11 +506,12 @@ public override async ValueTask<TrackedObject> ReadAsync(Key key, EffectTracker
// read a tracked object on a query session
async ValueTask<TrackedObject> ReadAsync(
ClientSession<Key, Value, EffectTracker, TrackedObject, object, Functions> session,
Key key,
Key key,
EffectTracker effectTracker)
{
try
{
this.partition.Assert(!key.Val.IsSingleton);
var result = await session.ReadAsync(key, effectTracker, context: null, token: this.terminationToken).ConfigureAwait(false);
var (status, output) = result.Complete();
return output;
@@ -484,12 +528,11 @@ async ValueTask<TrackedObject> ReadAsync(
public override ValueTask<TrackedObject> CreateAsync(Key key)
{
try
{
{
this.partition.Assert(key.Val.IsSingleton);
TrackedObject newObject = TrackedObjectKey.Factory(key);
newObject.Partition = this.partition;
Value newValue = newObject;
// Note: there is no UpsertAsync().
this.mainSession.Upsert(ref key, ref newValue);
this.singletons[(int)key.Val.ObjectType] = newObject;
return new ValueTask<TrackedObject>(newObject);
}
catch (Exception exception)
@@ -503,7 +546,14 @@ public async override ValueTask ProcessEffectOnTrackedObject(Key k, EffectTracke
{
try
{
(await this.mainSession.RMWAsync(ref k, ref tracker, token: this.terminationToken)).Complete();
if (k.Val.IsSingleton)
{
tracker.ProcessEffectOn(this.singletons[(int)k.Val.ObjectType]);
}
else
{
(await this.mainSession.RMWAsync(ref k, ref tracker, token: this.terminationToken)).Complete();
}
}
catch (Exception exception)
when (this.terminationToken.IsCancellationRequested && !Utils.IsFatal(exception))
@@ -706,17 +756,20 @@ public struct Value
public class Serializer : BinaryObjectSerializer<Value>
{
readonly StoreStatistics storeStats;
readonly PartitionTraceHelper traceHelper;

public Serializer(StoreStatistics storeStats)
public Serializer(StoreStatistics storeStats, PartitionTraceHelper traceHelper)
{
this.storeStats = storeStats;
this.traceHelper = traceHelper;
}

public override void Deserialize(out Value obj)
{
int count = this.reader.ReadInt32();
byte[] bytes = this.reader.ReadBytes(count);
var trackedObject = DurableTask.Netherite.Serializer.DeserializeTrackedObject(bytes);
//this.traceHelper.TraceProgress($"Deserialized TrackedObject {trackedObject.Key} size={bytes.Length}");
//if (trackedObject.Key.IsSingleton)
//{
// this.storeStats.A++;
33 changes: 33 additions & 0 deletions src/DurableTask.Netherite/Util/Serializer.cs
@@ -7,6 +7,7 @@ namespace DurableTask.Netherite
using System.IO;
using System.Runtime.Serialization;
using System.Text;
using DurableTask.Netherite.Faster;

static class Serializer
{
@@ -16,6 +17,12 @@ static readonly DataContractSerializer eventSerializer
static readonly DataContractSerializer trackedObjectSerializer
= new DataContractSerializer(typeof(TrackedObject));

static readonly DataContractSerializer singletonsSerializer
= new DataContractSerializer(typeof(TrackedObject[]));

static readonly DataContractSerializer checkpointInfoSerializer
= new DataContractSerializer(typeof(CheckpointInfo));

static readonly UnicodeEncoding uniEncoding = new UnicodeEncoding();

public static byte[] SerializeEvent(Event e, byte? header = null)
@@ -68,5 +75,31 @@ public static TrackedObject DeserializeTrackedObject(byte[] bytes)
result.SerializationCache = bytes;
return result;
}

public static byte[] SerializeSingletons(TrackedObject[] singletons)
{
var stream = new MemoryStream();
singletonsSerializer.WriteObject(stream, singletons);
return stream.ToArray();
}

public static TrackedObject[] DeserializeSingletons(Stream stream)
{
var result = (TrackedObject[])singletonsSerializer.ReadObject(stream);
return result;
}

public static MemoryStream SerializeCheckpointInfo(CheckpointInfo checkpointInfo)
{
var stream = new MemoryStream();
checkpointInfoSerializer.WriteObject(stream, checkpointInfo);
return stream;
}

public static CheckpointInfo DeserializeCheckpointInfo(Stream stream)
{
var result = (CheckpointInfo)checkpointInfoSerializer.ReadObject(stream);
return result;
}
}
}
