diff --git a/src/DurableTask.Netherite/Abstractions/PartitionState/EffectTracker.cs b/src/DurableTask.Netherite/Abstractions/PartitionState/EffectTracker.cs index 826253b6..f72daee6 100644 --- a/src/DurableTask.Netherite/Abstractions/PartitionState/EffectTracker.cs +++ b/src/DurableTask.Netherite/Abstractions/PartitionState/EffectTracker.cs @@ -14,30 +14,17 @@ namespace DurableTask.Netherite /// information about the context, and to enumerate the objects on which the effect /// is being processed. /// - class EffectTracker : List + abstract class EffectTracker : List { - readonly Func applyToStore; - readonly Func, ValueTask> removeFromStore; - readonly Func<(long, long)> getPositions; - readonly System.Diagnostics.Stopwatch stopWatch; readonly HashSet deletedKeys; - public EffectTracker(Partition partition, Func applyToStore, Func, ValueTask> removeFromStore, Func<(long, long)> getPositions) + PartitionUpdateEvent currentUpdate; + + public EffectTracker() { - this.Partition = partition; - this.applyToStore = applyToStore; - this.removeFromStore = removeFromStore; - this.getPositions = getPositions; - this.stopWatch = new System.Diagnostics.Stopwatch(); this.deletedKeys = new HashSet(); - this.stopWatch.Start(); } - /// - /// The current partition. - /// - public Partition Partition { get; } - /// /// The event id of the current effect. /// @@ -55,7 +42,30 @@ void SetCurrentUpdateEvent(PartitionUpdateEvent updateEvent) this.currentUpdate = updateEvent; } - PartitionUpdateEvent currentUpdate; + + #region abstract methods + + public abstract ValueTask ApplyToStore(TrackedObjectKey key, EffectTracker tracker); + + public abstract ValueTask RemoveFromStore(IEnumerable keys); + + public abstract (long, long) GetPositions(); + + public abstract Partition Partition { get; } + + public abstract EventTraceHelper EventTraceHelper { get; } + + public abstract EventTraceHelper EventDetailTracer { get; } + + protected abstract void HandleError(string where, string message, Exception e, bool terminatePartition, bool reportAsWarning); + + public abstract void Assert(bool condition); + + public abstract uint PartitionId { get; } + + public abstract double CurrentTimeMs { get; } + + #endregion /// /// Applies the event to the given tracked object, using visitor pattern to @@ -73,7 +83,7 @@ public void ProcessEffectOn(TrackedObject trackedObject) { // for robustness, we swallow exceptions inside event processing. // It does not mean they are not serious. We still report them as errors. - this.Partition.ErrorHandler.HandleError(nameof(ProcessUpdate), $"Encountered exception on {trackedObject} when applying update event {this.currentUpdate}, eventId={this.currentUpdate?.EventId}", exception, false, false); + this.HandleError(nameof(ProcessUpdate), $"Encountered exception on {trackedObject} when applying update event {this.currentUpdate}, eventId={this.currentUpdate?.EventId}", exception, false, false); } } @@ -84,16 +94,16 @@ public void AddDeletion(TrackedObjectKey key) public async Task ProcessUpdate(PartitionUpdateEvent updateEvent) { - (long commitLogPosition, long inputQueuePosition) = this.getPositions(); - double startedTimestamp = this.Partition.CurrentTimeMs; + (long commitLogPosition, long inputQueuePosition) = this.GetPositions(); + double startedTimestamp = this.CurrentTimeMs; using (EventTraceContext.MakeContext(commitLogPosition, updateEvent.EventIdString)) { try { - this.Partition.EventDetailTracer?.TraceEventProcessingStarted(commitLogPosition, updateEvent, EventTraceHelper.EventCategory.UpdateEvent, this.IsReplaying); + this.EventDetailTracer?.TraceEventProcessingStarted(commitLogPosition, updateEvent, EventTraceHelper.EventCategory.UpdateEvent, this.IsReplaying); - this.Partition.Assert(updateEvent != null); + this.Assert(updateEvent != null); this.SetCurrentUpdateEvent(updateEvent); @@ -111,10 +121,10 @@ async ValueTask ProcessRecursively() var startPos = this.Count - 1; var key = this[startPos]; - this.Partition.EventDetailTracer?.TraceEventProcessingDetail($"Process on [{key}]"); + this.EventDetailTracer?.TraceEventProcessingDetail($"Process on [{key}]"); // start with processing the event on this object - await this.applyToStore(key, this).ConfigureAwait(false); + await this.ApplyToStore(key, this).ConfigureAwait(false); // recursively process all additional objects to process while (this.Count - 1 > startPos) @@ -128,7 +138,7 @@ async ValueTask ProcessRecursively() if (this.deletedKeys.Count > 0) { - await this.removeFromStore(this.deletedKeys); + await this.RemoveFromStore(this.deletedKeys); this.deletedKeys.Clear(); } @@ -142,21 +152,21 @@ async ValueTask ProcessRecursively() { // for robustness, we swallow exceptions inside event processing. // It does not mean they are not serious. We still report them as errors. - this.Partition.ErrorHandler.HandleError(nameof(ProcessUpdate), $"Encountered exception while processing update event {updateEvent}", exception, false, false); + this.HandleError(nameof(ProcessUpdate), $"Encountered exception while processing update event {updateEvent}", exception, false, false); } finally { - double finishedTimestamp = this.Partition.CurrentTimeMs; - this.Partition.EventTraceHelper.TraceEventProcessed(commitLogPosition, updateEvent, EventTraceHelper.EventCategory.UpdateEvent, startedTimestamp, finishedTimestamp, this.IsReplaying); + double finishedTimestamp = this.CurrentTimeMs; + this.EventTraceHelper?.TraceEventProcessed(commitLogPosition, updateEvent, EventTraceHelper.EventCategory.UpdateEvent, startedTimestamp, finishedTimestamp, this.IsReplaying); } } } public void ProcessReadResult(PartitionReadEvent readEvent, TrackedObjectKey key, TrackedObject target) { - (long commitLogPosition, long inputQueuePosition) = this.getPositions(); - this.Partition.Assert(!this.IsReplaying); // read events are never part of the replay - double startedTimestamp = this.Partition.CurrentTimeMs; + (long commitLogPosition, long inputQueuePosition) = this.GetPositions(); + this.Assert(!this.IsReplaying); // read events are never part of the replay + double startedTimestamp = this.CurrentTimeMs; using (EventTraceContext.MakeContext(commitLogPosition, readEvent.EventIdString)) { @@ -171,7 +181,7 @@ public void ProcessReadResult(PartitionReadEvent readEvent, TrackedObjectKey key InstanceState instanceState = (InstanceState)target; string instanceExecutionId = instanceState?.OrchestrationState?.OrchestrationInstance.ExecutionId; string status = instanceState?.OrchestrationState?.OrchestrationStatus.ToString() ?? "null"; - this.Partition.EventTraceHelper.TraceFetchedInstanceStatus(readEvent, key.InstanceId, instanceExecutionId, status, startedTimestamp - readEvent.IssuedTimestamp); + this.EventTraceHelper?.TraceFetchedInstanceStatus(readEvent, key.InstanceId, instanceExecutionId, status, startedTimestamp - readEvent.IssuedTimestamp); break; case TrackedObjectKey.TrackedObjectType.History: @@ -179,7 +189,7 @@ public void ProcessReadResult(PartitionReadEvent readEvent, TrackedObjectKey key string historyExecutionId = historyState?.ExecutionId; int eventCount = historyState?.History?.Count ?? 0; int episode = historyState?.Episode ?? 0; - this.Partition.EventTraceHelper.TraceFetchedInstanceHistory(readEvent, key.InstanceId, historyExecutionId, eventCount, episode, startedTimestamp - readEvent.IssuedTimestamp); + this.EventTraceHelper?.TraceFetchedInstanceHistory(readEvent, key.InstanceId, historyExecutionId, eventCount, episode, startedTimestamp - readEvent.IssuedTimestamp); break; default: @@ -188,7 +198,7 @@ public void ProcessReadResult(PartitionReadEvent readEvent, TrackedObjectKey key if (isReady) { - this.Partition.EventDetailTracer?.TraceEventProcessingStarted(commitLogPosition, readEvent, EventTraceHelper.EventCategory.ReadEvent, false); + this.EventDetailTracer?.TraceEventProcessingStarted(commitLogPosition, readEvent, EventTraceHelper.EventCategory.ReadEvent, false); readEvent.Fire(this.Partition); } } @@ -200,27 +210,27 @@ public void ProcessReadResult(PartitionReadEvent readEvent, TrackedObjectKey key { // for robustness, we swallow exceptions inside event processing. // It does not mean they are not serious. We still report them as errors. - this.Partition.ErrorHandler.HandleError(nameof(ProcessReadResult), $"Encountered exception while processing read event {readEvent}", exception, false, false); + this.HandleError(nameof(ProcessReadResult), $"Encountered exception while processing read event {readEvent}", exception, false, false); } finally { - double finishedTimestamp = this.Partition.CurrentTimeMs; - this.Partition.EventTraceHelper.TraceEventProcessed(commitLogPosition, readEvent, EventTraceHelper.EventCategory.ReadEvent, startedTimestamp, finishedTimestamp, false); + double finishedTimestamp = this.CurrentTimeMs; + this.EventTraceHelper?.TraceEventProcessed(commitLogPosition, readEvent, EventTraceHelper.EventCategory.ReadEvent, startedTimestamp, finishedTimestamp, false); } } } public async Task ProcessQueryResultAsync(PartitionQueryEvent queryEvent, IAsyncEnumerable instances) { - (long commitLogPosition, long inputQueuePosition) = this.getPositions(); - this.Partition.Assert(!this.IsReplaying); // query events are never part of the replay - double startedTimestamp = this.Partition.CurrentTimeMs; + (long commitLogPosition, long inputQueuePosition) = this.GetPositions(); + this.Assert(!this.IsReplaying); // query events are never part of the replay + double startedTimestamp = this.CurrentTimeMs; using (EventTraceContext.MakeContext(commitLogPosition, queryEvent.EventIdString)) { try { - this.Partition.EventDetailTracer?.TraceEventProcessingStarted(commitLogPosition, queryEvent, EventTraceHelper.EventCategory.QueryEvent, false); + this.EventDetailTracer?.TraceEventProcessingStarted(commitLogPosition, queryEvent, EventTraceHelper.EventCategory.QueryEvent, false); await queryEvent.OnQueryCompleteAsync(instances, this.Partition); } catch (OperationCanceledException) @@ -231,12 +241,12 @@ public async Task ProcessQueryResultAsync(PartitionQueryEvent queryEvent, IAsync { // for robustness, we swallow exceptions inside event processing. // It does not mean they are not serious. We still report them as errors. - this.Partition.ErrorHandler.HandleError(nameof(ProcessQueryResultAsync), $"Encountered exception while processing query event {queryEvent}", exception, false, false); + this.HandleError(nameof(ProcessQueryResultAsync), $"Encountered exception while processing query event {queryEvent}", exception, false, false); } finally { - double finishedTimestamp = this.Partition.CurrentTimeMs; - this.Partition.EventTraceHelper.TraceEventProcessed(commitLogPosition, queryEvent, EventTraceHelper.EventCategory.QueryEvent, startedTimestamp, finishedTimestamp, false); + double finishedTimestamp = this.CurrentTimeMs; + this.EventTraceHelper?.TraceEventProcessed(commitLogPosition, queryEvent, EventTraceHelper.EventCategory.QueryEvent, startedTimestamp, finishedTimestamp, false); } } } diff --git a/src/DurableTask.Netherite/Abstractions/PartitionState/PartitionEffectTracker.cs b/src/DurableTask.Netherite/Abstractions/PartitionState/PartitionEffectTracker.cs new file mode 100644 index 00000000..9dba075d --- /dev/null +++ b/src/DurableTask.Netherite/Abstractions/PartitionState/PartitionEffectTracker.cs @@ -0,0 +1,50 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +namespace DurableTask.Netherite +{ + using System; + using System.Collections.Generic; + using System.Threading.Tasks; + using DurableTask.Core; + using DurableTask.Core.Common; + + /// + /// Is used while applying an effect to a partition state, to carry + /// information about the context, and to enumerate the objects on which the effect + /// is being processed. + /// + abstract class PartitionEffectTracker : EffectTracker + { + readonly Partition partition; + + public PartitionEffectTracker(Partition partition) + { + this.partition = partition; + if (partition == null) + { + throw new ArgumentNullException(nameof(partition)); + } + } + + public override Partition Partition => this.partition; + + public override EventTraceHelper EventTraceHelper + => this.Partition.EventTraceHelper; + + public override EventTraceHelper EventDetailTracer + => this.Partition.EventDetailTracer; + + protected override void HandleError(string where, string message, Exception e, bool terminatePartition, bool reportAsWarning) + => this.Partition.ErrorHandler.HandleError(where, message, e, terminatePartition, reportAsWarning); + + public override void Assert(bool condition) + => this.Partition.Assert(condition); + + public override uint PartitionId + => this.Partition.PartitionId; + + public override double CurrentTimeMs + => this.Partition.CurrentTimeMs; + } +} diff --git a/src/DurableTask.Netherite/Abstractions/PartitionState/TrackedObject.cs b/src/DurableTask.Netherite/Abstractions/PartitionState/TrackedObject.cs index 2f9c3b6b..f816df30 100644 --- a/src/DurableTask.Netherite/Abstractions/PartitionState/TrackedObject.cs +++ b/src/DurableTask.Netherite/Abstractions/PartitionState/TrackedObject.cs @@ -68,7 +68,7 @@ public virtual void OnFirstInitialization() /// Is automatically called on all singleton objects after recovery. Typically used to /// restart pending activities, timers, tasks and the like. /// - public virtual void OnRecoveryCompleted() + public virtual void OnRecoveryCompleted(EffectTracker effects) { } diff --git a/src/DurableTask.Netherite/Events/PartitionEvents/External/FromClients/CreationRequestReceived.cs b/src/DurableTask.Netherite/Events/PartitionEvents/External/FromClients/CreationRequestReceived.cs index 3278ed9d..c0b9ec2c 100644 --- a/src/DurableTask.Netherite/Events/PartitionEvents/External/FromClients/CreationRequestReceived.cs +++ b/src/DurableTask.Netherite/Events/PartitionEvents/External/FromClients/CreationRequestReceived.cs @@ -15,9 +15,6 @@ class CreationRequestReceived : ClientRequestEventWithPrefetch [DataMember] public OrchestrationStatus[] DedupeStatuses { get; set; } - [DataMember] - public DateTime Timestamp { get; set; } - [DataMember] public TaskMessage TaskMessage { get; set; } @@ -42,18 +39,51 @@ class CreationRequestReceived : ClientRequestEventWithPrefetch [IgnoreDataMember] public CreationResponseReceived ResponseToSend { get; set; } // used to communicate response to ClientState + [IgnoreDataMember] + public DateTime CreationTimestamp { get; set; } + public override bool OnReadComplete(TrackedObject target, Partition partition) { - // Use this moment of time as the creation timestamp, replacing the original timestamp taken on the client. + // Use this moment of time as the creation timestamp, which is going to replace the original timestamp taken on the client. // This is preferrable because it avoids clock synchronization issues (which can result in negative orchestration durations) // and means the timestamp is consistently ordered with respect to timestamps of other events on this partition. - DateTime creationTimestamp = DateTime.UtcNow; - - this.ExecutionStartedEvent.Timestamp = creationTimestamp; + this.CreationTimestamp = DateTime.UtcNow; return true; } + // make a copy of an event so we run it through the pipeline a second time + public override PartitionEvent Clone() + { + var evt = (CreationRequestReceived)base.Clone(); + + // make a copy of the execution started event in order to modify the creation timestamp + + var ee = this.ExecutionStartedEvent; + var tm = this.TaskMessage; + + evt.TaskMessage = new TaskMessage() + { + Event = new ExecutionStartedEvent(ee.EventId, ee.Input) + { + ParentInstance = ee.ParentInstance, + Name = ee.Name, + Version = ee.Version, + Tags = ee.Tags, + ScheduledStartTime = ee.ScheduledStartTime, + Correlation = ee.Correlation, + ExtensionData = ee.ExtensionData, + OrchestrationInstance = ee.OrchestrationInstance, + Timestamp = this.CreationTimestamp, + }, + SequenceNumber = tm.SequenceNumber, + OrchestrationInstance = tm.OrchestrationInstance, + ExtensionData = tm.ExtensionData, + }; + + return evt; + } + public override void ApplyTo(TrackedObject trackedObject, EffectTracker effects) { trackedObject.Process(this, effects); diff --git a/src/DurableTask.Netherite/Events/PartitionEvents/External/FromClients/WaitRequestReceived.cs b/src/DurableTask.Netherite/Events/PartitionEvents/External/FromClients/WaitRequestReceived.cs index e62b538e..267a6aa3 100644 --- a/src/DurableTask.Netherite/Events/PartitionEvents/External/FromClients/WaitRequestReceived.cs +++ b/src/DurableTask.Netherite/Events/PartitionEvents/External/FromClients/WaitRequestReceived.cs @@ -40,11 +40,11 @@ protected override void ExtraTraceInformation(StringBuilder s) s.Append(this.InstanceId); } - public static bool SatisfiesWaitCondition(OrchestrationState value) - => (value != null && - value.OrchestrationStatus != OrchestrationStatus.Running && - value.OrchestrationStatus != OrchestrationStatus.Pending && - value.OrchestrationStatus != OrchestrationStatus.ContinuedAsNew); + public static bool SatisfiesWaitCondition(OrchestrationStatus? value) + => (value.HasValue && + value.Value != OrchestrationStatus.Running && + value.Value != OrchestrationStatus.Pending && + value.Value != OrchestrationStatus.ContinuedAsNew); public WaitResponseReceived CreateResponse(OrchestrationState value) => new WaitResponseReceived() diff --git a/src/DurableTask.Netherite/Events/PartitionEvents/Internal/BatchProcessed.cs b/src/DurableTask.Netherite/Events/PartitionEvents/Internal/BatchProcessed.cs index 7e1ec0ed..67f26d65 100644 --- a/src/DurableTask.Netherite/Events/PartitionEvents/Internal/BatchProcessed.cs +++ b/src/DurableTask.Netherite/Events/PartitionEvents/Internal/BatchProcessed.cs @@ -43,9 +43,6 @@ class BatchProcessed : PartitionUpdateEvent, IRequiresPrefetch [DataMember] public OrchestrationStatus OrchestrationStatus { get; set; } - [IgnoreDataMember] - public OrchestrationState State { get; set; } - [DataMember] public List ActivityMessages { get; set; } diff --git a/src/DurableTask.Netherite/Events/PartitionEvents/PartitionEvent.cs b/src/DurableTask.Netherite/Events/PartitionEvents/PartitionEvent.cs index cf8e7450..9abc7fbe 100644 --- a/src/DurableTask.Netherite/Events/PartitionEvents/PartitionEvent.cs +++ b/src/DurableTask.Netherite/Events/PartitionEvents/PartitionEvent.cs @@ -40,7 +40,7 @@ abstract class PartitionEvent : Event public virtual void OnSubmit(Partition partition) { } // make a copy of an event so we run it through the pipeline a second time - public PartitionEvent Clone() + public virtual PartitionEvent Clone() { var evt = (PartitionEvent)this.MemberwiseClone(); diff --git a/src/DurableTask.Netherite/OrchestrationService/Client.cs b/src/DurableTask.Netherite/OrchestrationService/Client.cs index 6dac3e7e..a10cb7b1 100644 --- a/src/DurableTask.Netherite/OrchestrationService/Client.cs +++ b/src/DurableTask.Netherite/OrchestrationService/Client.cs @@ -324,7 +324,6 @@ public async Task CreateTaskOrchestrationAsync(uint partitionId, TaskMessage cre RequestId = Interlocked.Increment(ref this.SequenceNumber), TaskMessage = creationMessage, DedupeStatuses = dedupeStatuses, - Timestamp = DateTime.UtcNow, TimeoutUtc = this.GetTimeoutBucket(DefaultTimeout), }; diff --git a/src/DurableTask.Netherite/OrchestrationService/NetheriteOrchestrationService.cs b/src/DurableTask.Netherite/OrchestrationService/NetheriteOrchestrationService.cs index afe7a266..3af537e1 100644 --- a/src/DurableTask.Netherite/OrchestrationService/NetheriteOrchestrationService.cs +++ b/src/DurableTask.Netherite/OrchestrationService/NetheriteOrchestrationService.cs @@ -728,7 +728,6 @@ Task IOrchestrationService.CompleteTaskOrchestrationWorkItemAsync( PersistFirst = partition.Settings.PersistStepsFirst ? BatchProcessed.PersistFirstStatus.Required : BatchProcessed.PersistFirstStatus.NotRequired, OrchestrationStatus = state.OrchestrationStatus, ExecutionId = state.OrchestrationInstance.ExecutionId, - State = state, ActivityMessages = (List)activityMessages, LocalMessages = localMessages, RemoteMessages = remoteMessages, @@ -751,7 +750,7 @@ Task IOrchestrationService.CompleteTaskOrchestrationWorkItemAsync( latencyMs, sequenceNumber); - partition.SubmitEvent(batchProcessedEvent); + partition.SubmitEvent(batchProcessedEvent); if (this.workItemTraceHelper.TraceTaskMessages) { diff --git a/src/DurableTask.Netherite/OrchestrationService/NetheriteOrchestrationServiceSettings.cs b/src/DurableTask.Netherite/OrchestrationService/NetheriteOrchestrationServiceSettings.cs index c708e9b0..cc1bea22 100644 --- a/src/DurableTask.Netherite/OrchestrationService/NetheriteOrchestrationServiceSettings.cs +++ b/src/DurableTask.Netherite/OrchestrationService/NetheriteOrchestrationServiceSettings.cs @@ -181,10 +181,10 @@ public class NetheriteOrchestrationServiceSettings internal bool UseSeparatePageBlobStorage => !string.IsNullOrEmpty(this.ResolvedPageBlobStorageConnectionString); /// - /// Whether to attach a debugger for cache transitions. Used only for testing and debugging. + /// Allows attaching additional checkers and debuggers during testing. /// [JsonIgnore] - public Faster.CacheDebugger CacheDebugger { get; set; } = null; + public TestHooks TestHooks { get; set; } = null; /// /// A lower limit on the severity level of trace events emitted by the transport layer. diff --git a/src/DurableTask.Netherite/OrchestrationService/Partition.cs b/src/DurableTask.Netherite/OrchestrationService/Partition.cs index 5fa2d40d..93099cca 100644 --- a/src/DurableTask.Netherite/OrchestrationService/Partition.cs +++ b/src/DurableTask.Netherite/OrchestrationService/Partition.cs @@ -83,7 +83,7 @@ public Partition( this.WorkItemTraceHelper = workItemTraceHelper; this.stopwatch.Start(); this.LastTransition = this.CurrentTimeMs; - this.CacheDebugger = this.Settings.CacheDebugger; + this.CacheDebugger = this.Settings.TestHooks?.CacheDebugger; } public async Task CreateOrRestoreAsync(IPartitionErrorHandler errorHandler, long firstInputQueuePosition) diff --git a/src/DurableTask.Netherite/OrchestrationService/TestHooks.cs b/src/DurableTask.Netherite/OrchestrationService/TestHooks.cs new file mode 100644 index 00000000..0687851a --- /dev/null +++ b/src/DurableTask.Netherite/OrchestrationService/TestHooks.cs @@ -0,0 +1,28 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +namespace DurableTask.Netherite +{ + using System; + + /// + /// Hooks for attaching additional checkers and debuggers during testing. + /// + public class TestHooks + { + internal Faster.CacheDebugger CacheDebugger { get; set; } = null; + + internal Faster.ReplayChecker ReplayChecker { get; set; } = null; + + internal event Action OnError; + + internal void Error(string source, string message) + { + if (System.Diagnostics.Debugger.IsAttached) + { + System.Diagnostics.Debugger.Break(); + } + this.OnError($"TestHook-{source} !!! {message}"); + } + } +} diff --git a/src/DurableTask.Netherite/PartitionState/ActivitiesState.cs b/src/DurableTask.Netherite/PartitionState/ActivitiesState.cs index 269114f9..9a774115 100644 --- a/src/DurableTask.Netherite/PartitionState/ActivitiesState.cs +++ b/src/DurableTask.Netherite/PartitionState/ActivitiesState.cs @@ -59,7 +59,7 @@ public override void OnFirstInitialization() const double SMOOTHING_FACTOR = 0.1; - public override void OnRecoveryCompleted() + public override void OnRecoveryCompleted(EffectTracker effects) { // reschedule work items foreach (var pending in this.Pending) @@ -251,7 +251,7 @@ public override void Process(ActivityCompleted evt, EffectTracker effects) this.AverageActivityCompletionTime = !this.AverageActivityCompletionTime.HasValue ? evt.LatencyMs : SMOOTHING_FACTOR * evt.LatencyMs + (1 - SMOOTHING_FACTOR) * this.AverageActivityCompletionTime.Value; - if (evt.OriginPartitionId == effects.Partition.PartitionId) + if (evt.OriginPartitionId == effects.PartitionId) { // the response can be delivered to a session on this partition effects.Add(TrackedObjectKey.Sessions); @@ -305,7 +305,7 @@ public override void Process(TransferCommandReceived evt, EffectTracker effects) this.TransferCommandsReceived[evt.PartitionId] = evt.Timestamp; } - this.Partition.EventTraceHelper.TraceEventProcessingDetail($"Processed OffloadCommand, " + + effects.EventTraceHelper?.TraceEventProcessingDetail($"Processed OffloadCommand, " + $"OffloadDestination={evt.TransferDestination}, NumActivitiesSent={evt.TransferredActivities.Count}"); effects.Add(TrackedObjectKey.Outbox); } @@ -362,7 +362,7 @@ public override void Process(OffloadDecision offloadDecisionEvent, EffectTracker if (!effects.IsReplaying) { - this.Partition.EventTraceHelper.TracePartitionOffloadDecision(this.EstimatedWorkItemQueueSize, this.Pending.Count, this.LocalBacklog.Count, -1, offloadDecisionEvent); + effects.EventTraceHelper?.TracePartitionOffloadDecision(this.EstimatedWorkItemQueueSize, this.Pending.Count, this.LocalBacklog.Count, -1, offloadDecisionEvent); if (this.LocalBacklog.Count > 0) { diff --git a/src/DurableTask.Netherite/PartitionState/HistoryState.cs b/src/DurableTask.Netherite/PartitionState/HistoryState.cs index 03e5185f..d8aea410 100644 --- a/src/DurableTask.Netherite/PartitionState/HistoryState.cs +++ b/src/DurableTask.Netherite/PartitionState/HistoryState.cs @@ -73,7 +73,7 @@ public override void Process(BatchProcessed evt, EffectTracker effects) if (!effects.IsReplaying) { - this.Partition.EventTraceHelper.TraceInstanceUpdate( + effects.EventTraceHelper?.TraceInstanceUpdate( evt.EventIdString, evt.InstanceId, evt.ExecutionId, @@ -88,7 +88,7 @@ public override void Process(BatchProcessed evt, EffectTracker effects) if (this.CachedOrchestrationWorkItem != null && this.CachedOrchestrationWorkItem.OrchestrationRuntimeState?.OrchestrationInstance?.ExecutionId != evt.ExecutionId) { - effects.Partition.EventTraceHelper.TraceEventProcessingWarning($"Dropping bad workitem cache instance={this.InstanceId} expected_executionid={evt.ExecutionId} actual_executionid={this.CachedOrchestrationWorkItem.OrchestrationRuntimeState?.OrchestrationInstance?.ExecutionId}"); + effects.EventTraceHelper?.TraceEventProcessingWarning($"Dropping bad workitem cache instance={this.InstanceId} expected_executionid={evt.ExecutionId} actual_executionid={this.CachedOrchestrationWorkItem.OrchestrationRuntimeState?.OrchestrationInstance?.ExecutionId}"); this.CachedOrchestrationWorkItem = null; } } diff --git a/src/DurableTask.Netherite/PartitionState/InstanceState.cs b/src/DurableTask.Netherite/PartitionState/InstanceState.cs index 07a8b014..0c5679b2 100644 --- a/src/DurableTask.Netherite/PartitionState/InstanceState.cs +++ b/src/DurableTask.Netherite/PartitionState/InstanceState.cs @@ -102,47 +102,11 @@ public override void Process(BatchProcessed evt, EffectTracker effects) } this.OrchestrationState.LastUpdatedTime = evt.Timestamp; - - if (evt.State != null) - { - this.Partition.Assert( - ( - evt.State.Version, - evt.State.Status, - evt.State.Output, - evt.State.Name, - evt.State.Input, - evt.State.OrchestrationInstance.InstanceId, - evt.State.OrchestrationInstance.ExecutionId, - evt.State.CompletedTime, - evt.State.OrchestrationStatus, - evt.State.LastUpdatedTime, - evt.State.CreatedTime, - evt.State.ScheduledStartTime, - evt.State.OrchestrationInstance.ExecutionId - ) - == - ( - this.OrchestrationState.Version, - this.OrchestrationState.Status, - this.OrchestrationState.Output, - this.OrchestrationState.Name, - this.OrchestrationState.Input, - this.OrchestrationState.OrchestrationInstance.InstanceId, - this.OrchestrationState.OrchestrationInstance.ExecutionId, - this.OrchestrationState.CompletedTime, - this.OrchestrationState.OrchestrationStatus, - this.OrchestrationState.LastUpdatedTime, - this.OrchestrationState.CreatedTime, - this.OrchestrationState.ScheduledStartTime, - evt.ExecutionId - )); - } // if the orchestration is complete, notify clients that are waiting for it if (this.Waiters != null) { - if (WaitRequestReceived.SatisfiesWaitCondition(this.OrchestrationState)) + if (WaitRequestReceived.SatisfiesWaitCondition(this.OrchestrationState?.OrchestrationStatus)) { // we do not need effects.Add(TrackedObjectKey.Outbox) because it has already been added by SessionsState evt.ResponsesToSend = this.Waiters.Select(request => request.CreateResponse(this.OrchestrationState)).ToList(); @@ -190,7 +154,7 @@ static OrchestrationState UpdateOrchestrationState(OrchestrationState orchestrat public override void Process(WaitRequestReceived evt, EffectTracker effects) { - if (WaitRequestReceived.SatisfiesWaitCondition(this.OrchestrationState)) + if (WaitRequestReceived.SatisfiesWaitCondition(this.OrchestrationState?.OrchestrationStatus)) { effects.Add(TrackedObjectKey.Outbox); evt.ResponseToSend = evt.CreateResponse(this.OrchestrationState); diff --git a/src/DurableTask.Netherite/PartitionState/OutboxState.cs b/src/DurableTask.Netherite/PartitionState/OutboxState.cs index 2d459654..1d5fd7b9 100644 --- a/src/DurableTask.Netherite/PartitionState/OutboxState.cs +++ b/src/DurableTask.Netherite/PartitionState/OutboxState.cs @@ -25,7 +25,7 @@ class OutboxState : TrackedObject, TransportAbstraction.IDurabilityListener public override TrackedObjectKey Key => new TrackedObjectKey(TrackedObjectKey.TrackedObjectType.Outbox); - public override void OnRecoveryCompleted() + public override void OnRecoveryCompleted(EffectTracker effects) { // resend all pending foreach (var kvp in this.Outbox) @@ -35,7 +35,7 @@ public override void OnRecoveryCompleted() kvp.Value.Partition = this.Partition; // resend (anything we have recovered is of course persisted) - this.Partition.EventDetailTracer?.TraceEventProcessingDetail($"Resent {kvp.Key:D10} ({kvp.Value} messages)"); + effects.EventDetailTracer?.TraceEventProcessingDetail($"Resent {kvp.Key:D10} ({kvp.Value} messages)"); this.Send(kvp.Value); } } @@ -59,6 +59,12 @@ void SendBatchOnceEventIsPersisted(PartitionUpdateEvent evt, EffectTracker effec batch.Position = commitPosition; batch.Partition = this.Partition; + foreach (var partitionMessageEvent in batch.OutgoingMessages) + { + partitionMessageEvent.OriginPartition = this.Partition.PartitionId; + partitionMessageEvent.OriginPosition = commitPosition; + } + if (!effects.IsReplaying) { if (evt is BatchProcessed batchProcessedEvt @@ -90,8 +96,6 @@ void Send(Batch batch) foreach (var outmessage in batch.OutgoingMessages) { DurabilityListeners.Register(outmessage, batch); - outmessage.OriginPartition = this.Partition.PartitionId; - outmessage.OriginPosition = batch.Position; this.Partition.Send(outmessage); } foreach (var outresponse in batch.OutgoingResponses) @@ -153,9 +157,9 @@ public void ConfirmDurable(Event evt) } } - public override void Process(SendConfirmed evt, EffectTracker _) + public override void Process(SendConfirmed evt, EffectTracker effects) { - this.Partition.EventDetailTracer?.TraceEventProcessingDetail($"Store has sent all outbound messages by event {evt} id={evt.EventIdString}"); + effects.EventDetailTracer?.TraceEventProcessingDetail($"Store has sent all outbound messages by event {evt} id={evt.EventIdString}"); // we no longer need to keep these events around this.Outbox.Remove(evt.Position); diff --git a/src/DurableTask.Netherite/PartitionState/PrefetchState.cs b/src/DurableTask.Netherite/PartitionState/PrefetchState.cs index 22441d8b..a0708f87 100644 --- a/src/DurableTask.Netherite/PartitionState/PrefetchState.cs +++ b/src/DurableTask.Netherite/PartitionState/PrefetchState.cs @@ -22,7 +22,7 @@ class PrefetchState : TrackedObject [IgnoreDataMember] public override TrackedObjectKey Key => new TrackedObjectKey(TrackedObjectKey.TrackedObjectType.Prefetch); - public override void OnRecoveryCompleted() + public override void OnRecoveryCompleted(EffectTracker effects) { // reissue prefetch tasks for what did not complete prior to crash/recovery foreach (var kvp in this.PendingPrefetches) diff --git a/src/DurableTask.Netherite/PartitionState/QueriesState.cs b/src/DurableTask.Netherite/PartitionState/QueriesState.cs index f595cb62..6c10d70a 100644 --- a/src/DurableTask.Netherite/PartitionState/QueriesState.cs +++ b/src/DurableTask.Netherite/PartitionState/QueriesState.cs @@ -22,7 +22,7 @@ class QueriesState : TrackedObject [IgnoreDataMember] public override TrackedObjectKey Key => new TrackedObjectKey(TrackedObjectKey.TrackedObjectType.Queries); - public override void OnRecoveryCompleted() + public override void OnRecoveryCompleted(EffectTracker effects) { // reissue queries that did not complete prior to crash/recovery foreach (var kvp in this.PendingQueries) diff --git a/src/DurableTask.Netherite/PartitionState/ReassemblyState.cs b/src/DurableTask.Netherite/PartitionState/ReassemblyState.cs index c609f9f3..89ac8d81 100644 --- a/src/DurableTask.Netherite/PartitionState/ReassemblyState.cs +++ b/src/DurableTask.Netherite/PartitionState/ReassemblyState.cs @@ -29,7 +29,7 @@ public override void Process(PartitionEventFragment evt, EffectTracker effects) { evt.ReassembledEvent = FragmentationAndReassembly.Reassemble(this.Fragments[originalEventString], evt); - this.Partition.EventDetailTracer?.TraceEventProcessingDetail($"Reassembled {evt.ReassembledEvent}"); + effects.EventDetailTracer?.TraceEventProcessingDetail($"Reassembled {evt.ReassembledEvent}"); this.Fragments.Remove(originalEventString); diff --git a/src/DurableTask.Netherite/PartitionState/SessionsState.cs b/src/DurableTask.Netherite/PartitionState/SessionsState.cs index e0cb4758..5d1690a3 100644 --- a/src/DurableTask.Netherite/PartitionState/SessionsState.cs +++ b/src/DurableTask.Netherite/PartitionState/SessionsState.cs @@ -49,7 +49,7 @@ internal class Session public static string GetWorkItemId(uint partition, long session, long position) => $"{partition:D2}S{session}P{position}"; - public override void OnRecoveryCompleted() + public override void OnRecoveryCompleted(EffectTracker effects) { // start work items for all sessions foreach (var kvp in this.Sessions) @@ -327,7 +327,7 @@ public override void Process(BatchProcessed evt, EffectTracker effects) effects.Add(TrackedObjectKey.Timers); } - if (evt.RemoteMessages?.Count > 0 || WaitRequestReceived.SatisfiesWaitCondition(evt.State)) + if (evt.RemoteMessages?.Count > 0 || WaitRequestReceived.SatisfiesWaitCondition(evt.OrchestrationStatus)) { effects.Add(TrackedObjectKey.Outbox); } @@ -346,16 +346,16 @@ public override void Process(BatchProcessed evt, EffectTracker effects) } // remove processed messages from this batch - effects.Partition.Assert(session != null); - effects.Partition.Assert(session.SessionId == evt.SessionId); - effects.Partition.Assert(session.BatchStartPosition == evt.BatchStartPosition); + effects.Assert(session != null); + effects.Assert(session.SessionId == evt.SessionId); + effects.Assert(session.BatchStartPosition == evt.BatchStartPosition); session.Batch.RemoveRange(0, evt.BatchLength); session.BatchStartPosition += evt.BatchLength; this.StartNewBatchIfNeeded(session, effects, evt.InstanceId, effects.IsReplaying); } - void StartNewBatchIfNeeded(Session session, EffectTracker effects, string instanceId, bool inRecovery) + void StartNewBatchIfNeeded(Session session, EffectTracker effects, string instanceId, bool isReplaying) { if (session.Batch.Count == 0) { @@ -364,7 +364,7 @@ void StartNewBatchIfNeeded(Session session, EffectTracker effects, string instan } else { - if (!inRecovery) // we don't start work items until end of recovery + if (!isReplaying) // we don't start work items until end of recovery { // there are more messages. Start another work item. new OrchestrationMessageBatch(instanceId, session, this.Partition); diff --git a/src/DurableTask.Netherite/PartitionState/TimersState.cs b/src/DurableTask.Netherite/PartitionState/TimersState.cs index c1fb3a97..4f3dcddb 100644 --- a/src/DurableTask.Netherite/PartitionState/TimersState.cs +++ b/src/DurableTask.Netherite/PartitionState/TimersState.cs @@ -30,12 +30,12 @@ public override string ToString() return $"Timers ({this.PendingTimers.Count} pending) next={this.SequenceNumber:D6}"; } - public override void OnRecoveryCompleted() + public override void OnRecoveryCompleted(EffectTracker effects) { // restore the pending timers foreach (var kvp in this.PendingTimers) { - this.Schedule(kvp.Key, kvp.Value.due, kvp.Value.message, kvp.Value.workItemId); + this.Schedule(kvp.Key, kvp.Value.due, kvp.Value.message, kvp.Value.workItemId, effects); } } @@ -53,7 +53,7 @@ public override void UpdateLoadInfo(PartitionLoadInfo info) } } - void Schedule(long timerId, DateTime due, TaskMessage message, string originWorkItemId) + void Schedule(long timerId, DateTime due, TaskMessage message, string originWorkItemId, EffectTracker effects) { TimerFired expirationEvent = new TimerFired() { @@ -64,20 +64,20 @@ void Schedule(long timerId, DateTime due, TaskMessage message, string originWork Due = due, }; - this.Partition.EventDetailTracer?.TraceEventProcessingDetail($"Scheduled {message} due at {expirationEvent.Due:o}, id={expirationEvent.EventIdString}"); + effects.EventDetailTracer?.TraceEventProcessingDetail($"Scheduled {message} due at {expirationEvent.Due:o}, id={expirationEvent.EventIdString}"); this.Partition.PendingTimers.Schedule(expirationEvent.Due, expirationEvent); } - void AddTimer(TaskMessage taskMessage, string originWorkItemId, bool isReplaying) + void AddTimer(TaskMessage taskMessage, string originWorkItemId, EffectTracker effects) { var timerId = this.SequenceNumber++; var due = GetDueTime(taskMessage); this.PendingTimers.Add(timerId, (due, taskMessage, originWorkItemId)); - if (!isReplaying) + if (!effects.IsReplaying) { this.Partition.WorkItemTraceHelper.TraceTaskMessageReceived(this.Partition.PartitionId, taskMessage, originWorkItemId, $"Timer@{due}"); - this.Schedule(timerId, due, taskMessage, originWorkItemId); + this.Schedule(timerId, due, taskMessage, originWorkItemId, effects); } } @@ -112,7 +112,7 @@ public override void Process(BatchProcessed evt, EffectTracker effects) // starts new timers as specified by the batch foreach (var taskMessage in evt.TimerMessages) { - this.AddTimer(taskMessage, evt.EventIdString, effects.IsReplaying); + this.AddTimer(taskMessage, evt.EventIdString, effects); } } @@ -121,13 +121,13 @@ public override void Process(TaskMessagesReceived evt, EffectTracker effects) // starts new timers as specified by the batch foreach (var taskMessage in evt.DelayedTaskMessages) { - this.AddTimer(taskMessage, evt.WorkItemId, effects.IsReplaying); + this.AddTimer(taskMessage, evt.WorkItemId, effects); } } public override void Process(CreationRequestReceived creationRequestReceived, EffectTracker effects) { - this.AddTimer(creationRequestReceived.TaskMessage, creationRequestReceived.EventIdString, effects.IsReplaying); + this.AddTimer(creationRequestReceived.TaskMessage, creationRequestReceived.EventIdString, effects); } } } diff --git a/src/DurableTask.Netherite/StorageProviders/Faster/CacheDebugger.cs b/src/DurableTask.Netherite/StorageProviders/Faster/CacheDebugger.cs index 78aec376..851801bc 100644 --- a/src/DurableTask.Netherite/StorageProviders/Faster/CacheDebugger.cs +++ b/src/DurableTask.Netherite/StorageProviders/Faster/CacheDebugger.cs @@ -14,10 +14,16 @@ namespace DurableTask.Netherite.Faster /// /// Records cache and storage management traces for each object. This class is only used for testing and debugging, as it creates lots of overhead. /// - public class CacheDebugger + class CacheDebugger { + readonly TestHooks testHooks; readonly ConcurrentDictionary Objects = new ConcurrentDictionary(); + public CacheDebugger(TestHooks testHooks) + { + this.testHooks = testHooks; + } + public enum CacheEvent { // reads and RMWs on the main session @@ -67,9 +73,6 @@ public override string ToString() public string PrintCacheEvents() => string.Join(",", this.CacheEvents.Select(e => e.ToString())); } - - public event Action OnError; - public Task CreateTimer(TimeSpan timeSpan) { @@ -81,7 +84,7 @@ public async ValueTask CheckTiming(Task waitingFor, Task timer, string message) var first = await Task.WhenAny(waitingFor, timer); if (first == timer) { - this.OnError($"timeout: {message}"); + this.testHooks.Error(this.GetType().Name, $"timeout: {message}"); } } @@ -138,25 +141,15 @@ internal void Record(TrackedObjectKey key, CacheEvent evt, int? version, string internal void Fail(string message) { - if (System.Diagnostics.Debugger.IsAttached) - { - System.Diagnostics.Debugger.Break(); - } - - this.OnError(message); + this.testHooks.Error(this.GetType().Name, message); } internal void Fail(string message, TrackedObjectKey key) { this.Record(key, CacheEvent.Fail, null, null); - if (System.Diagnostics.Debugger.IsAttached) - { - System.Diagnostics.Debugger.Break(); - } - var objectInfo = this.Objects[key]; - this.OnError($"{message} cacheEvents={objectInfo.PrintCacheEvents()}"); + this.testHooks.Error(this.GetType().Name, $"{message} cacheEvents={objectInfo.PrintCacheEvents()}"); } internal void ValidateObjectVersion(FasterKV.Value val, TrackedObjectKey key) diff --git a/src/DurableTask.Netherite/StorageProviders/Faster/FasterAlt.cs b/src/DurableTask.Netherite/StorageProviders/Faster/FasterAlt.cs index 83369692..fce5a478 100644 --- a/src/DurableTask.Netherite/StorageProviders/Faster/FasterAlt.cs +++ b/src/DurableTask.Netherite/StorageProviders/Faster/FasterAlt.cs @@ -373,6 +373,13 @@ public override ValueTask RemoveKeys(IEnumerable keys) throw new NotImplementedException(); } + public override void EmitCurrentState(Action emitItem) + { + // TODO + throw new NotImplementedException(); + } + + #region storage access operation CloudBlockBlob GetBlob(TrackedObjectKey key) diff --git a/src/DurableTask.Netherite/StorageProviders/Faster/FasterKV.cs b/src/DurableTask.Netherite/StorageProviders/Faster/FasterKV.cs index 5627392c..065f84f4 100644 --- a/src/DurableTask.Netherite/StorageProviders/Faster/FasterKV.cs +++ b/src/DurableTask.Netherite/StorageProviders/Faster/FasterKV.cs @@ -8,6 +8,7 @@ namespace DurableTask.Netherite.Faster using System.Diagnostics; using System.IO; using System.Linq; + using System.Text; using System.Threading; using System.Threading.Channels; using System.Threading.Tasks; @@ -15,6 +16,7 @@ namespace DurableTask.Netherite.Faster using DurableTask.Core.Common; using DurableTask.Core.Tracing; using FASTER.core; + using Newtonsoft.Json; class FasterKV : TrackedObjectStore { @@ -624,24 +626,34 @@ void ReportProgress() } } - //private async Task DumpCurrentState(EffectTracker effectTracker) // TODO unused - //{ - // try - // { - // var stringBuilder = new StringBuilder(); - // await foreach (var trackedObject in EnumerateAllTrackedObjects(effectTracker).OrderBy(obj => obj.Key, new TrackedObjectKey.Comparer())) - // { - // stringBuilder.Append(trackedObject.ToString()); - // stringBuilder.AppendLine(); - // } - // return stringBuilder.ToString(); - // } - // catch (Exception exception) - // when (this.terminationToken.IsCancellationRequested && !Utils.IsFatal(exception)) - // { - // throw new OperationCanceledException("Partition was terminated.", exception, this.terminationToken); - // } - //} + public override void EmitCurrentState(Action emitItem) + { + try + { + var stringBuilder = new StringBuilder(); + + // iterate singletons + foreach(var key in TrackedObjectKey.GetSingletons()) + { + var singleton = this.singletons[(int)key.ObjectType]; + emitItem(key, singleton); + } + + // iterate histories + using (var iter1 = this.mainSession.Iterate()) + { + while (iter1.GetNext(out RecordInfo recordInfo, out var key, out var value) && !recordInfo.Tombstone) + { + emitItem(key, value); + } + } + } + catch (Exception exception) + when (this.terminationToken.IsCancellationRequested && !Utils.IsFatal(exception)) + { + throw new OperationCanceledException("Partition was terminated.", exception, this.terminationToken); + } + } class EvictionObserver : IObserver> { diff --git a/src/DurableTask.Netherite/StorageProviders/Faster/ReplayChecker.cs b/src/DurableTask.Netherite/StorageProviders/Faster/ReplayChecker.cs new file mode 100644 index 00000000..b1cd524e --- /dev/null +++ b/src/DurableTask.Netherite/StorageProviders/Faster/ReplayChecker.cs @@ -0,0 +1,287 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +namespace DurableTask.Netherite.Faster +{ + using System; + using System.Collections.Concurrent; + using System.Collections.Generic; + using System.Linq; + using System.Text; + using System.Threading.Tasks; + using FASTER.core; + using Newtonsoft.Json; + using Newtonsoft.Json.Linq; + using Newtonsoft.Json.Serialization; + + /// + /// Validates the replay, by maintaining an ongoing checkpoint and confirming the commutative diagram + /// serialize(new-state) = serialize(deserialize(old-state) + event) + /// This class is only used for testing and debugging, as it creates lots of overhead. + /// + class ReplayChecker + { + readonly ConcurrentDictionary partitionInfo; + readonly TestHooks testHooks; + + public ReplayChecker(TestHooks testHooks) + { + this.testHooks = testHooks; + this.partitionInfo = new ConcurrentDictionary(); + } + + class Info + { + public Partition Partition; + public Dictionary Store; + public long CommitLogPosition; + public long InputQueuePosition; + public EffectTracker EffectTracker; + } + + readonly JsonSerializerSettings settings = new JsonSerializerSettings() + { + TypeNameHandling = TypeNameHandling.Auto, + }; + + string Serialize(TrackedObject trackedObject) + { + if (trackedObject == null) + { + return "null"; + } + else + { + JObject jObject = JObject.FromObject(trackedObject, JsonSerializer.Create(this.settings)); + + // for the checking to work correctly, we must edit the serialized state as follows: + // - order all json properties, otherwise nondeterminism causes false errors + // - modify isPlayed to true (on HistoryState) or false (on SessionsState) to avoid errors due to racing mutations + + bool? fixIsPlayed; + if (trackedObject is SessionsState) + { + fixIsPlayed = false; + } + else if (trackedObject is HistoryState) + { + fixIsPlayed = true; + } + else + { + fixIsPlayed = null; + } + + RecursivelyEdit(jObject); + + return jObject.ToString(Formatting.Indented); + + void RecursivelyEdit(JObject jObj) + { + var children = jObj.Properties().OrderBy(p => p.Name).ToList(); + foreach (var prop in children) + { + prop.Remove(); + } + foreach (var prop in children) + { + jObj.Add(prop); + + if (prop.Value is JObject o1) + { + RecursivelyEdit(o1); + } + else if (prop.Value is JArray) + { + var numProperties = prop.Value.Count(); + for (int i = 0; i < numProperties; i++) + { + if (prop.Value[i] is JObject o2) + { + RecursivelyEdit(o2); + } + } + } + else if (fixIsPlayed.HasValue && prop.Name == "IsPlayed") + { + prop.Value = (JToken)fixIsPlayed.Value; + } + } + } + } + } + + TrackedObject DeserializeTrackedObject(string content, TrackedObjectKey key) + { + if (content == "null") + { + return null; + } + else + { + var trackedObject = TrackedObjectKey.Factory(key); + JsonConvert.PopulateObject(content, trackedObject, this.settings); + return trackedObject; + } + } + + string Serialize(PartitionUpdateEvent partitionUpdateEvent) + => JsonConvert.SerializeObject(partitionUpdateEvent, typeof(PartitionUpdateEvent), Formatting.Indented, this.settings); + + PartitionUpdateEvent DeserializePartitionUpdateEvent(string content) + => (PartitionUpdateEvent) JsonConvert.DeserializeObject(content, this.settings); + + public void PartitionStarting(Partition partition, TrackedObjectStore store, long CommitLogPosition, long InputQueuePosition) + { + var info = new Info() + { + Partition = partition, + Store = new Dictionary(), + CommitLogPosition = CommitLogPosition, + InputQueuePosition = InputQueuePosition, + }; + + this.partitionInfo[partition] = info; + + store.EmitCurrentState((TrackedObjectKey key, TrackedObject value) => + { + info.Store.Add(key, this.Serialize(value)); + }); + + info.EffectTracker = new ReplayCheckEffectTracker(this, info); + } + + public async Task CheckUpdate(Partition partition, PartitionUpdateEvent partitionUpdateEvent, TrackedObjectStore store) + { + var info = this.partitionInfo[partition]; + + System.Diagnostics.Trace.WriteLine($"REPLAYCHECK STARTED {partitionUpdateEvent}"); + + var eventForReplay = this.DeserializePartitionUpdateEvent(this.Serialize(partitionUpdateEvent)); + eventForReplay.NextCommitLogPosition = partitionUpdateEvent.NextCommitLogPosition; + await info.EffectTracker.ProcessUpdate(eventForReplay); + + // check that the two match, generate error message and fix difference otherwise + + HashSet NotVisited = new HashSet(info.Store.Keys); + + store.EmitCurrentState((TrackedObjectKey key, TrackedObject value) => + { + NotVisited.Remove(key); + string expected = this.Serialize(value); + + if (!info.Store.TryGetValue(key, out var replayed)) + { + this.testHooks.Error(this.GetType().Name, $"key={key}\nexpected={expected}\nreplayed=absent"); + info.Store[key] = expected; + } + if (expected != replayed) + { + var expectedlines = TraceUtils.GetLines(expected).ToArray(); + var replayedlines = TraceUtils.GetLines(replayed).ToArray(); + string expectedline = ""; + string replayedline = ""; + int i = 0; + for (; i < Math.Max(expectedlines.Length, replayedlines.Length); i++) + { + expectedline = i < expectedlines.Length ? expectedlines[i] : "absent"; + replayedline = i < replayedlines.Length ? replayedlines[i] : "absent"; + if (expectedline != replayedline) + { + break; + } + } + this.testHooks.Error(this.GetType().Name, $"key={key} line={i}\nexpectedline={expectedline}\nreplayedline={replayedline}\nexpected={expected}\nreplayed={replayed} "); + info.Store[key] = expected; + } + }); + + foreach(var key in NotVisited) + { + string val = info.Store[key]; + this.testHooks.Error(this.GetType().Name, $"key={key}\nexpected=absent\nreplayed={val}"); + info.Store.Remove(key); + } + + System.Diagnostics.Trace.WriteLine("REPLAYCHECK DONE"); + } + + public void PartitionStopped(Partition partition) + { + this.partitionInfo.TryRemove(partition, out _); + } + + class ReplayCheckEffectTracker : EffectTracker + { + readonly ReplayChecker replayChecker; + readonly ReplayChecker.Info info; + + public ReplayCheckEffectTracker(ReplayChecker replayChecker, ReplayChecker.Info info) + { + this.replayChecker = replayChecker; + this.info = info; + this.IsReplaying = true; + } + + public override Partition Partition => this.info.Partition; + + public override EventTraceHelper EventTraceHelper => null; + + public override EventTraceHelper EventDetailTracer => null; + + public override uint PartitionId => this.info.Partition.PartitionId; + + public override double CurrentTimeMs => 0; + + public override ValueTask ApplyToStore(TrackedObjectKey key, EffectTracker tracker) + { + // retrieve the previously stored state, if present + TrackedObject trackedObject = null; + if (this.info.Store.TryGetValue(key, out string content)) + { + trackedObject = this.replayChecker.DeserializeTrackedObject(content, key); + } + + // initialize the tracked object before applying the effect + trackedObject ??= TrackedObjectKey.Factory(key); + trackedObject.Partition = this.Partition; + + // apply the effect using our special tracker that suppresses side effects + tracker.ProcessEffectOn(trackedObject); + + // store the result back, to reuse on the next update + content = this.replayChecker.Serialize(trackedObject); + this.info.Store[key] = content; + + return default; + } + + public override ValueTask RemoveFromStore(IEnumerable keys) + { + foreach (var key in keys) + { + this.info.Store.Remove(key); + } + return default; + } + + public override (long, long) GetPositions() + { + return (this.info.CommitLogPosition, this.info.InputQueuePosition); + } + + public override void Assert(bool condition) + { + if (!condition) + { + this.replayChecker.testHooks.Error(this.replayChecker.GetType().Name, "assertion failed"); + } + } + + protected override void HandleError(string where, string message, Exception e, bool terminatePartition, bool reportAsWarning) + { + this.replayChecker.testHooks.Error(this.replayChecker.GetType().Name, $"{where}: {message} {e}"); + } + } + } +} diff --git a/src/DurableTask.Netherite/StorageProviders/Faster/StoreWorker.cs b/src/DurableTask.Netherite/StorageProviders/Faster/StoreWorker.cs index 139f2f6e..8f0a0ad5 100644 --- a/src/DurableTask.Netherite/StorageProviders/Faster/StoreWorker.cs +++ b/src/DurableTask.Netherite/StorageProviders/Faster/StoreWorker.cs @@ -16,6 +16,7 @@ class StoreWorker : BatchWorker readonly FasterTraceHelper traceHelper; readonly BlobManager blobManager; readonly Random random; + readonly ReplayChecker replayChecker; readonly EffectTracker effectTracker; @@ -43,7 +44,7 @@ class StoreWorker : BatchWorker public static TimeSpan PokePeriod = TimeSpan.FromSeconds(3); // allows storeworker to checkpoint and publish load even while idle - public StoreWorker(TrackedObjectStore store, Partition partition, FasterTraceHelper traceHelper, BlobManager blobManager, CancellationToken cancellationToken) + public StoreWorker(TrackedObjectStore store, Partition partition, FasterTraceHelper traceHelper, BlobManager blobManager, CancellationToken cancellationToken) : base($"{nameof(StoreWorker)}{partition.PartitionId:D2}", true, 500, cancellationToken, partition.TraceHelper) { partition.ErrorHandler.Token.ThrowIfCancellationRequested(); @@ -53,18 +54,14 @@ public StoreWorker(TrackedObjectStore store, Partition partition, FasterTraceHel this.traceHelper = traceHelper; this.blobManager = blobManager; this.random = new Random(); + this.replayChecker = this.partition.Settings.TestHooks?.ReplayChecker; this.loadInfo = PartitionLoadInfo.FirstFrame(this.partition.Settings.WorkerId); this.lastPublished = DateTime.MinValue; this.lastPublishedLatencyTrend = ""; // construct an effect tracker that we use to apply effects to the store - this.effectTracker = new EffectTracker( - this.partition, - (key, tracker) => store.ProcessEffectOnTrackedObject(key, tracker), - (keys) => store.RemoveKeys(keys), - () => (this.CommitLogPosition, this.InputQueuePosition) - ); + this.effectTracker = new TrackedObjectStoreEffectTracker(this.partition, this, store); } public async Task Initialize(long initialCommitLogPosition, long initialInputQueuePosition) @@ -85,6 +82,8 @@ public async Task Initialize(long initialCommitLogPosition, long initialInputQue this.lastCheckpointedCommitLogPosition = this.CommitLogPosition; this.lastCheckpointedInputQueuePosition = this.InputQueuePosition; this.numberEventsSinceLastCheckpoint = initialCommitLogPosition; + + this.replayChecker?.PartitionStarting(this.partition, this.store, this.CommitLogPosition, this.InputQueuePosition); } public void StartProcessing() @@ -149,6 +148,8 @@ public async Task CancelAndShutdown() await this.pendingStoreCheckpoint.ConfigureAwait(false); } + this.replayChecker?.PartitionStopped(this.partition); + this.traceHelper.FasterProgress("Stopped StoreWorker"); } @@ -443,12 +444,14 @@ public async Task RestartThingsAtEndOfRecovery() { this.traceHelper.FasterProgress("Restarting tasks"); + this.replayChecker?.PartitionStarting(this.partition, this.store, this.CommitLogPosition, this.InputQueuePosition); + using (EventTraceContext.MakeContext(this.CommitLogPosition, string.Empty)) { foreach (var key in TrackedObjectKey.GetSingletons()) { var target = (TrackedObject)await this.store.ReadAsync(key, this.effectTracker).ConfigureAwait(false); - target.OnRecoveryCompleted(); + target.OnRecoveryCompleted(this.effectTracker); } } } @@ -486,6 +489,11 @@ async ValueTask ProcessUpdate(PartitionUpdateEvent partitionUpdateEvent) await this.effectTracker.ProcessUpdate(partitionUpdateEvent).ConfigureAwait(false); + if (this.replayChecker != null) + { + await this.replayChecker.CheckUpdate(this.partition, partitionUpdateEvent, this.store); + } + // update the commit log position if (partitionUpdateEvent.NextCommitLogPosition > 0) { diff --git a/src/DurableTask.Netherite/StorageProviders/Faster/TrackedObjectStore.cs b/src/DurableTask.Netherite/StorageProviders/Faster/TrackedObjectStore.cs index 77a686ac..6e893789 100644 --- a/src/DurableTask.Netherite/StorageProviders/Faster/TrackedObjectStore.cs +++ b/src/DurableTask.Netherite/StorageProviders/Faster/TrackedObjectStore.cs @@ -50,6 +50,8 @@ abstract class TrackedObjectStore public abstract ValueTask RemoveKeys(IEnumerable keys); + public abstract void EmitCurrentState(Action emitItem); + public StoreStatistics StoreStats { get; } = new StoreStatistics(); public class StoreStatistics diff --git a/src/DurableTask.Netherite/StorageProviders/Faster/TrackedObjectStoreEffectTracker.cs b/src/DurableTask.Netherite/StorageProviders/Faster/TrackedObjectStoreEffectTracker.cs new file mode 100644 index 00000000..dd517317 --- /dev/null +++ b/src/DurableTask.Netherite/StorageProviders/Faster/TrackedObjectStoreEffectTracker.cs @@ -0,0 +1,41 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +namespace DurableTask.Netherite.Faster +{ + using System; + using System.Collections.Generic; + using System.Threading; + using System.Threading.Tasks; + + + class TrackedObjectStoreEffectTracker : PartitionEffectTracker + { + readonly TrackedObjectStore store; + readonly StoreWorker storeWorker; + + public TrackedObjectStoreEffectTracker(Partition partition, StoreWorker storeWorker, TrackedObjectStore store) + : base(partition) + { + this.store = store; + this.storeWorker = storeWorker; + } + + public override ValueTask ApplyToStore(TrackedObjectKey key, EffectTracker tracker) + { + this.store.ProcessEffectOnTrackedObject(key, tracker); + return default; + } + + public override (long, long) GetPositions() + { + return (this.storeWorker.CommitLogPosition, this.storeWorker.InputQueuePosition); + } + + public override ValueTask RemoveFromStore(IEnumerable keys) + { + this.store.RemoveKeys(keys); + return default; + } + } +} \ No newline at end of file diff --git a/src/DurableTask.Netherite/StorageProviders/Memory/MemoryStorage.cs b/src/DurableTask.Netherite/StorageProviders/Memory/MemoryStorage.cs index 1e32f293..f2182b0e 100644 --- a/src/DurableTask.Netherite/StorageProviders/Memory/MemoryStorage.cs +++ b/src/DurableTask.Netherite/StorageProviders/Memory/MemoryStorage.cs @@ -16,6 +16,7 @@ class MemoryStorage : BatchWorker, IPartitionState { readonly ILogger logger; Partition partition; + EffectTracker effects; long nextSubmitPosition = 0; long commitPosition = 0; long inputQueuePosition = 0; @@ -65,6 +66,7 @@ public void SubmitEvents(IList entries) public Task CreateOrRestoreAsync(Partition partition, IPartitionErrorHandler termination, long initialInputQueuePosition) { this.partition = partition; + this.effects = new MemoryStorageEffectTracker(partition, this); foreach (var trackedObject in this.trackedObjects.Values) { @@ -114,14 +116,7 @@ IList QueryOrchestrationStates(InstanceQuery query) protected override async Task Process(IList batch) { try - { - var effects = new EffectTracker( - this.partition, - this.ApplyToStore, - this.RemoveFromStore, - () => (this.commitPosition, this.inputQueuePosition) - ); - + { if (batch.Count != 0) { foreach (var partitionEvent in batch) @@ -135,7 +130,7 @@ protected override async Task Process(IList batch) { case PartitionUpdateEvent updateEvent: updateEvent.NextCommitLogPosition = this.commitPosition + 1; - await effects.ProcessUpdate(updateEvent).ConfigureAwait(false); + await this.effects.ProcessUpdate(updateEvent).ConfigureAwait(false); DurabilityListeners.ConfirmDurable(updateEvent); if (updateEvent.NextCommitLogPosition > 0) { @@ -148,15 +143,15 @@ protected override async Task Process(IList batch) if (readEvent.Prefetch.HasValue) { var prefetchTarget = this.GetOrAdd(readEvent.Prefetch.Value); - effects.ProcessReadResult(readEvent, readEvent.Prefetch.Value, prefetchTarget); + this.effects.ProcessReadResult(readEvent, readEvent.Prefetch.Value, prefetchTarget); } var readTarget = this.GetOrAdd(readEvent.ReadTarget); - effects.ProcessReadResult(readEvent, readEvent.ReadTarget, readTarget); + this.effects.ProcessReadResult(readEvent, readEvent.ReadTarget, readTarget); break; case PartitionQueryEvent queryEvent: var instances = this.QueryOrchestrationStates(queryEvent.InstanceQuery); - var backgroundTask = Task.Run(() => effects.ProcessQueryResultAsync(queryEvent, instances.ToAsyncEnumerable())); + var backgroundTask = Task.Run(() => this.effects.ProcessQueryResultAsync(queryEvent, instances.ToAsyncEnumerable())); break; default: @@ -182,24 +177,40 @@ protected override async Task Process(IList batch) } } - public ValueTask ApplyToStore(TrackedObjectKey key, EffectTracker tracker) + public Task Prefetch(IEnumerable keys) { - tracker.ProcessEffectOn(this.GetOrAdd(key)); - return default; + return Task.CompletedTask; } - public ValueTask RemoveFromStore(IEnumerable keys) + class MemoryStorageEffectTracker : PartitionEffectTracker { - foreach (var key in keys) + readonly MemoryStorage memoryStorage; + + public MemoryStorageEffectTracker(Partition partition, MemoryStorage memoryStorage) + : base(partition) + { + this.memoryStorage = memoryStorage; + } + + public override ValueTask ApplyToStore(TrackedObjectKey key, EffectTracker tracker) { - this.trackedObjects.TryRemove(key, out _); + tracker.ProcessEffectOn(this.memoryStorage.GetOrAdd(key)); + return default; } - return default; - } - public Task Prefetch(IEnumerable keys) - { - return Task.CompletedTask; + public override (long, long) GetPositions() + { + return (this.memoryStorage.commitPosition, this.memoryStorage.inputQueuePosition); + } + + public override ValueTask RemoveFromStore(IEnumerable keys) + { + foreach (var key in keys) + { + this.memoryStorage.trackedObjects.TryRemove(key, out _); + } + return default; + } } } } \ No newline at end of file diff --git a/src/DurableTask.Netherite/Util/Utils.cs b/src/DurableTask.Netherite/Util/Utils.cs index 481c686f..117250e0 100644 --- a/src/DurableTask.Netherite/Util/Utils.cs +++ b/src/DurableTask.Netherite/Util/Utils.cs @@ -6,6 +6,7 @@ namespace DurableTask.Netherite using System; using System.Collections.Generic; using System.Diagnostics; + using System.IO; using System.Linq; using DurableTask.Core; using DurableTask.Core.History; @@ -65,5 +66,21 @@ public static bool TryGetTaskScheduledId(HistoryEvent historyEvent, out int task return false; } } + + public static IEnumerable GetLines(this string str, bool removeEmptyLines = false) + { + using (var sr = new StringReader(str)) + { + string line; + while ((line = sr.ReadLine()) != null) + { + if (removeEmptyLines && String.IsNullOrWhiteSpace(line)) + { + continue; + } + yield return line; + } + } + } } } diff --git a/test/DurableTask.Netherite.Tests/FasterPartitionTests.cs b/test/DurableTask.Netherite.Tests/FasterPartitionTests.cs index f8a897d5..11fac382 100644 --- a/test/DurableTask.Netherite.Tests/FasterPartitionTests.cs +++ b/test/DurableTask.Netherite.Tests/FasterPartitionTests.cs @@ -121,16 +121,16 @@ public async Task LimitedMemory() }; // create a cache monitor - var cacheDebugger = new Faster.CacheDebugger(); + var cacheDebugger = new Faster.CacheDebugger(settings.TestHooks); var cts = new CancellationTokenSource(); string reportedProblem = null; - cacheDebugger.OnError += (message) => + settings.TestHooks.OnError += (message) => { - this.output?.Invoke($"CACHEDEBUGGER: {message}"); + this.output?.Invoke($"TESTHOOKS: {message}"); reportedProblem = reportedProblem ?? message; cts.Cancel(); }; - settings.CacheDebugger = cacheDebugger; + settings.TestHooks.CacheDebugger = cacheDebugger; // we use the standard hello orchestration from the samples, which calls 5 activities in sequence var orchestrationType = typeof(ScenarioTests.Orchestrations.Hello5); diff --git a/test/DurableTask.Netherite.Tests/QueryTests.cs b/test/DurableTask.Netherite.Tests/QueryTests.cs index e8fab700..0f7d9fb8 100644 --- a/test/DurableTask.Netherite.Tests/QueryTests.cs +++ b/test/DurableTask.Netherite.Tests/QueryTests.cs @@ -50,6 +50,8 @@ public void Dispose() throw new TimeoutException("timed out while purging instances after running test"); } + Assert.Null(this.fixture.TestHooksError); + this.outputHelper = null; } diff --git a/test/DurableTask.Netherite.Tests/ScenarioTests.cs b/test/DurableTask.Netherite.Tests/ScenarioTests.cs index ef7fdbe9..480481a9 100644 --- a/test/DurableTask.Netherite.Tests/ScenarioTests.cs +++ b/test/DurableTask.Netherite.Tests/ScenarioTests.cs @@ -47,6 +47,7 @@ public void Dispose() throw new TimeoutException("timed out while purging instances after running test"); } + Assert.Null(this.fixture.TestHooksError); this.outputHelper = null; } diff --git a/test/DurableTask.Netherite.Tests/SingleHostFixture.cs b/test/DurableTask.Netherite.Tests/SingleHostFixture.cs index 5bc43c11..e27a9f7b 100644 --- a/test/DurableTask.Netherite.Tests/SingleHostFixture.cs +++ b/test/DurableTask.Netherite.Tests/SingleHostFixture.cs @@ -20,6 +20,8 @@ public class SingleHostFixture : IDisposable internal TestOrchestrationHost Host { get; private set; } internal ILoggerFactory LoggerFactory { get; private set; } + internal string TestHooksError { get; private set; } + public SingleHostFixture() { this.LoggerFactory = new LoggerFactory(); @@ -28,6 +30,12 @@ public SingleHostFixture() TestConstants.ValidateEnvironment(); var settings = TestConstants.GetNetheriteOrchestrationServiceSettings(); settings.PartitionManagement = PartitionManagementOptions.EventProcessorHost; + settings.TestHooks.ReplayChecker = new Faster.ReplayChecker(settings.TestHooks); + settings.TestHooks.OnError += (message) => + { + System.Diagnostics.Trace.WriteLine($"TESTHOOKS: {message}"); + this.TestHooksError ??= message; + }; this.Host = new TestOrchestrationHost(settings, this.LoggerFactory); this.Host.StartAsync().Wait(); this.traceListener = new TestTraceListener(); @@ -45,6 +53,7 @@ public void SetOutput(Action output) { this.loggerProvider.Output = output; this.traceListener.Output = output; + this.TestHooksError = null; } internal class TestTraceListener : TraceListener diff --git a/test/DurableTask.Netherite.Tests/TestConstants.cs b/test/DurableTask.Netherite.Tests/TestConstants.cs index 6afbbd63..02479fc6 100644 --- a/test/DurableTask.Netherite.Tests/TestConstants.cs +++ b/test/DurableTask.Netherite.Tests/TestConstants.cs @@ -51,6 +51,7 @@ public static NetheriteOrchestrationServiceSettings GetNetheriteOrchestrationSer //settings.UseLocalDirectoryForPartitionStorage = $"{Environment.GetEnvironmentVariable("temp")}\\FasterTestStorage"; settings.Validate((name) => Environment.GetEnvironmentVariable(name)); + settings.TestHooks = new TestHooks(); return settings; }