Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update to FASTER 1.9.6 #80

Merged
merged 2 commits into from
Oct 14, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/DurableTask.Netherite/DurableTask.Netherite.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="3.1.5" />
<PackageReference Include="Microsoft.Azure.EventHubs.Processor" Version="4.3.2" />
<PackageReference Include="Microsoft.Azure.Storage.Blob" Version="11.2.3" />
<PackageReference Include="Microsoft.FASTER.Core" Version="1.8.0" />
<PackageReference Include="Microsoft.FASTER.Core" Version="1.9.6" />
<PackageReference Include="Microsoft.Azure.DurableTask.Core" Version="2.5.6" />
<PackageReference Include="Newtonsoft.Json" Version="12.0.3" />
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.0.*" PrivateAssets="All" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ partial class BlobManager : ICheckpointManager, ILogCommitManager
SegmentSizeBits =
useSeparatePageBlobStorage ? 35 // 32 GB
: 32, // 4 GB
CopyReadsToTail = true,
CopyReadsToTail = CopyReadsToTail.FromReadOnly,
MemorySizeBits =
(numPartitions <= 1) ? 25 : // 32MB
(numPartitions <= 2) ? 24 : // 16MB
Expand Down Expand Up @@ -160,12 +160,22 @@ public static void CheckStorageFormat(string format, NetheriteOrchestrationServi

public void Dispose()
{
//TODO figure out what is supposed to go here
// we do not need to dispose any resources for the commit manager, because any such resources are deleted together with the taskhub
}

public void Purge(Guid token)
{
throw new NotImplementedException("Purges are handled directly on recovery, not via FASTER");
}

public void PurgeAll()
{
//TODO figure out what is supposed to go here
throw new NotImplementedException("Purges are handled directly on recovery, not via FASTER");
}

public void OnRecovery(Guid indexToken, Guid logToken)
{
// we handle cleanup of old checkpoints somewhere else
}

public CheckpointSettings StoreCheckpointSettings => new CheckpointSettings
Expand Down Expand Up @@ -808,11 +818,14 @@ void ICheckpointManager.CommitIndexCheckpoint(Guid indexToken, byte[] commitMeta
void ICheckpointManager.CommitLogCheckpoint(Guid logToken, byte[] commitMetadata)
=> this.CommitLogCheckpoint(logToken, commitMetadata, InvalidPsfGroupOrdinal);

void ICheckpointManager.CommitLogIncrementalCheckpoint(Guid logToken, int version, byte[] commitMetadata, DeltaLog deltaLog)
=> this.CommitLogIncrementalCheckpoint(logToken, version, commitMetadata, deltaLog, InvalidPsfGroupOrdinal);

byte[] ICheckpointManager.GetIndexCheckpointMetadata(Guid indexToken)
=> this.GetIndexCheckpointMetadata(indexToken, InvalidPsfGroupOrdinal);

byte[] ICheckpointManager.GetLogCheckpointMetadata(Guid logToken)
=> this.GetLogCheckpointMetadata(logToken, InvalidPsfGroupOrdinal);
byte[] ICheckpointManager.GetLogCheckpointMetadata(Guid logToken, DeltaLog deltaLog, bool scanDelta, long recoverTo)
=> this.GetLogCheckpointMetadata(logToken, InvalidPsfGroupOrdinal, deltaLog, scanDelta, recoverTo);

IDevice ICheckpointManager.GetIndexDevice(Guid indexToken)
=> this.GetIndexDevice(indexToken, InvalidPsfGroupOrdinal);
Expand All @@ -823,6 +836,9 @@ IDevice ICheckpointManager.GetSnapshotLogDevice(Guid token)
IDevice ICheckpointManager.GetSnapshotObjectLogDevice(Guid token)
=> this.GetSnapshotObjectLogDevice(token, InvalidPsfGroupOrdinal);

IDevice ICheckpointManager.GetDeltaLogDevice(Guid token)
=> this.GetDeltaLogDevice(token, InvalidPsfGroupOrdinal);

IEnumerable<Guid> ICheckpointManager.GetIndexCheckpointTokens()
{
var indexToken = this.CheckpointInfo.IndexToken;
Expand Down Expand Up @@ -949,6 +965,11 @@ internal void CommitLogCheckpoint(Guid logToken, byte[] commitMetadata, int psfG
this.StorageTracer?.FasterStorageProgress($"ICheckpointManager.CommitLogCheckpoint Returned from {tag}, target={metaFileBlob.Name}");
}

internal void CommitLogIncrementalCheckpoint(Guid logToken, int version, byte[] commitMetadata, DeltaLog deltaLog, int indexOrdinal)
{
throw new NotImplementedException("incremental checkpointing is not implemented");
}

internal byte[] GetIndexCheckpointMetadata(Guid indexToken, int psfGroupOrdinal)
{
var (isPsf, tag) = this.IsPsfOrPrimary(psfGroupOrdinal);
Expand Down Expand Up @@ -977,7 +998,7 @@ internal byte[] GetIndexCheckpointMetadata(Guid indexToken, int psfGroupOrdinal)
this.StorageTracer?.FasterStorageProgress($"ICheckpointManager.GetIndexCommitMetadata Returned {result?.Length ?? null} bytes from {tag}, target={metaFileBlob.Name}");
return result;
}
internal byte[] GetLogCheckpointMetadata(Guid logToken, int psfGroupOrdinal)
internal byte[] GetLogCheckpointMetadata(Guid logToken, int psfGroupOrdinal, DeltaLog deltaLog, bool scanDelta, long recoverTo)
{
var (isPsf, tag) = this.IsPsfOrPrimary(psfGroupOrdinal);
this.StorageTracer?.FasterStorageProgress($"ICheckpointManager.GetIndexCommitMetadata Called on {tag}, logToken={logToken}");
Expand Down Expand Up @@ -1050,6 +1071,8 @@ internal IDevice GetSnapshotObjectLogDevice(Guid token, int psfGroupOrdinal)
return device;
}

internal IDevice GetDeltaLogDevice(Guid token, int indexOrdinal) => default; // TODO

#endregion

internal async Task FinalizeCheckpointCompletedAsync()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,17 @@ void ICheckpointManager.CommitLogCheckpoint(Guid logToken, byte[] commitMetadata
this.localCheckpointManager.CommitLogCheckpoint(logToken, commitMetadata);
this.checkpointInfo.LogToken = logToken;
}

void ICheckpointManager.CommitLogIncrementalCheckpoint(Guid logToken, int version, byte[] commitMetadata, DeltaLog deltaLog)
{
throw new NotImplementedException("incremental checkpointing is not implemented");
}

byte[] ICheckpointManager.GetIndexCheckpointMetadata(Guid indexToken)
=> this.localCheckpointManager.GetIndexCheckpointMetadata(indexToken);

byte[] ICheckpointManager.GetLogCheckpointMetadata(Guid logToken)
=> this.localCheckpointManager.GetLogCheckpointMetadata(logToken);
byte[] ICheckpointManager.GetLogCheckpointMetadata(Guid logToken, DeltaLog deltaLog, bool scanDelta, long recoverTo)
=> this.localCheckpointManager.GetLogCheckpointMetadata(logToken, deltaLog, scanDelta, recoverTo);

IDevice ICheckpointManager.GetIndexDevice(Guid indexToken)
=> this.localCheckpointManager.GetIndexDevice(indexToken);
Expand All @@ -54,6 +60,9 @@ IDevice ICheckpointManager.GetSnapshotLogDevice(Guid token)
IDevice ICheckpointManager.GetSnapshotObjectLogDevice(Guid token)
=> this.localCheckpointManager.GetSnapshotObjectLogDevice(token);

IDevice ICheckpointManager.GetDeltaLogDevice(Guid token)
=> this.localCheckpointManager.GetDeltaLogDevice(token);

internal string GetLatestCheckpointJson() => File.ReadAllText(this.checkpointCompletedFilename);

IEnumerable<Guid> ICheckpointManager.GetIndexCheckpointTokens()
Expand All @@ -68,9 +77,15 @@ IEnumerable<Guid> ICheckpointManager.GetLogCheckpointTokens()
yield return logToken;
}

void ICheckpointManager.Purge(Guid guid)
=> this.localCheckpointManager.Purge(guid);

void ICheckpointManager.PurgeAll()
=> this.localCheckpointManager.PurgeAll();

public void OnRecovery(Guid indexToken, Guid logToken)
=> this.localCheckpointManager.OnRecovery(indexToken, logToken);

void IDisposable.Dispose()
=> this.localCheckpointManager.Dispose();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ void ICheckpointManager.CommitIndexCheckpoint(Guid indexToken, byte[] commitMeta
void ICheckpointManager.CommitLogCheckpoint(Guid logToken, byte[] commitMetadata)
=> this.blobManager.CommitLogCheckpoint(logToken, commitMetadata, this.groupOrdinal);

void ICheckpointManager.CommitLogIncrementalCheckpoint(Guid logToken, int version, byte[] commitMetadata, DeltaLog deltaLog)
=> this.blobManager.CommitLogIncrementalCheckpoint(logToken, version, commitMetadata, deltaLog, this.groupOrdinal);

IDevice ICheckpointManager.GetIndexDevice(Guid indexToken)
=> this.blobManager.GetIndexDevice(indexToken, this.groupOrdinal);

Expand All @@ -39,11 +42,14 @@ IDevice ICheckpointManager.GetSnapshotLogDevice(Guid token)
IDevice ICheckpointManager.GetSnapshotObjectLogDevice(Guid token)
=> this.blobManager.GetSnapshotObjectLogDevice(token, this.groupOrdinal);

IDevice ICheckpointManager.GetDeltaLogDevice(Guid token)
=> this.blobManager.GetDeltaLogDevice(token, this.groupOrdinal);

byte[] ICheckpointManager.GetIndexCheckpointMetadata(Guid indexToken)
=> this.blobManager.GetIndexCheckpointMetadata(indexToken, this.groupOrdinal);

byte[] ICheckpointManager.GetLogCheckpointMetadata(Guid logToken)
=> this.blobManager.GetLogCheckpointMetadata(logToken, this.groupOrdinal);
byte[] ICheckpointManager.GetLogCheckpointMetadata(Guid logToken, DeltaLog deltaLog, bool scanDelta, long recoverTo)
=> this.blobManager.GetLogCheckpointMetadata(logToken, this.groupOrdinal, deltaLog, scanDelta, recoverTo);

IEnumerable<Guid> ICheckpointManager.GetIndexCheckpointTokens()
{
Expand All @@ -57,8 +63,13 @@ IEnumerable<Guid> ICheckpointManager.GetLogCheckpointTokens()
yield return logToken;
}

public void Purge(Guid guid) { /* TODO */ }

public void PurgeAll() { /* TODO */ }

public void OnRecovery(Guid indexToken, Guid logToken)
=> this.blobManager.OnRecovery(indexToken, logToken);

public void Dispose() { }
}
}
25 changes: 19 additions & 6 deletions src/DurableTask.Netherite/StorageProviders/Faster/FasterKV.cs
Original file line number Diff line number Diff line change
Expand Up @@ -457,9 +457,10 @@ IAsyncEnumerable<OrchestrationState> ScanOrchestrationStates(
void RunScan()
{
using var _ = EventTraceContext.MakeContext(0, queryId);
using var session = this.CreateASession();

// get the unique set of keys appearing in the log and emit them
using var iter1 = this.fht.Iterate();
using var iter1 = session.Iterate();

Stopwatch stopwatch = new Stopwatch();
stopwatch.Start();
Expand Down Expand Up @@ -683,7 +684,7 @@ public Functions(Partition partition, StoreStatistics stats)
this.stats = stats;
}

public void InitialUpdater(ref Key key, ref EffectTracker tracker, ref Value value)
public void InitialUpdater(ref Key key, ref EffectTracker tracker, ref Value value, ref TrackedObject output)
{
var trackedObject = TrackedObjectKey.Factory(key.Val);
this.stats.Create++;
Expand All @@ -693,7 +694,7 @@ public void InitialUpdater(ref Key key, ref EffectTracker tracker, ref Value val
this.stats.Modify++;
}

public bool InPlaceUpdater(ref Key key, ref EffectTracker tracker, ref Value value)
public bool InPlaceUpdater(ref Key key, ref EffectTracker tracker, ref Value value, ref TrackedObject output)
{
this.partition.Assert(value.Val is TrackedObject);
TrackedObject trackedObject = value;
Expand All @@ -704,9 +705,9 @@ public bool InPlaceUpdater(ref Key key, ref EffectTracker tracker, ref Value val
return true;
}

public bool NeedCopyUpdate(ref Key key, ref EffectTracker tracker, ref Value value) => true;
public bool NeedCopyUpdate(ref Key key, ref EffectTracker tracker, ref Value value, ref TrackedObject output) => true;

public void CopyUpdater(ref Key key, ref EffectTracker tracker, ref Value oldValue, ref Value newValue)
public void CopyUpdater(ref Key key, ref EffectTracker tracker, ref Value oldValue, ref Value newValue, ref TrackedObject output)
{
this.stats.Copy++;

Expand Down Expand Up @@ -805,9 +806,21 @@ public void ReadCompletionCallback(ref Key key, ref EffectTracker tracker, ref T
}

public void CheckpointCompletionCallback(string sessionId, CommitPoint commitPoint) { }
public void RMWCompletionCallback(ref Key key, ref EffectTracker input, object ctx, Status status) { }
public void RMWCompletionCallback(ref Key key, ref EffectTracker input, ref TrackedObject output, object ctx, Status status) { }
public void UpsertCompletionCallback(ref Key key, ref Value value, object ctx) { }
public void DeleteCompletionCallback(ref Key key, object ctx) { }

// We do not need to lock records, because writes and non-query reads are single-session, and query reads can only race on instance states which are immutable
public bool SupportsLocking => false;

public void Lock(ref RecordInfo recordInfo, ref Key key, ref Value value, LockType lockType, ref long lockContext)
{
}

public bool Unlock(ref RecordInfo recordInfo, ref Key key, ref Value value, LockType lockType, long lockContext)
{
return true;
}
}
}
}