Skip to content

Commit

Permalink
Implement replay checker (#99)
Browse files Browse the repository at this point in the history
* fix aliasing bug

* implement replay checker

* undo unnecessary change

* add missing teeth: must fail unit tests on test hook errors

* fix bug, remove unnecessary field

* fix replay discrepancy on fields of outgoing messages

* fix race condition when updating creation timestamp during prefetch

* fix last commit

* fix false errors caused by json property order
  • Loading branch information
sebastianburckhardt authored Jan 4, 2022
1 parent f542c93 commit 885f7b9
Show file tree
Hide file tree
Showing 35 changed files with 680 additions and 208 deletions.
100 changes: 55 additions & 45 deletions src/DurableTask.Netherite/Abstractions/PartitionState/EffectTracker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,30 +14,17 @@ namespace DurableTask.Netherite
/// information about the context, and to enumerate the objects on which the effect
/// is being processed.
/// </summary>
class EffectTracker : List<TrackedObjectKey>
abstract class EffectTracker : List<TrackedObjectKey>
{
readonly Func<TrackedObjectKey, EffectTracker, ValueTask> applyToStore;
readonly Func<IEnumerable<TrackedObjectKey>, ValueTask> removeFromStore;
readonly Func<(long, long)> getPositions;
readonly System.Diagnostics.Stopwatch stopWatch;
readonly HashSet<TrackedObjectKey> deletedKeys;

public EffectTracker(Partition partition, Func<TrackedObjectKey, EffectTracker, ValueTask> applyToStore, Func<IEnumerable<TrackedObjectKey>, ValueTask> removeFromStore, Func<(long, long)> getPositions)
PartitionUpdateEvent currentUpdate;

public EffectTracker()
{
this.Partition = partition;
this.applyToStore = applyToStore;
this.removeFromStore = removeFromStore;
this.getPositions = getPositions;
this.stopWatch = new System.Diagnostics.Stopwatch();
this.deletedKeys = new HashSet<TrackedObjectKey>();
this.stopWatch.Start();
}

/// <summary>
/// The current partition.
/// </summary>
public Partition Partition { get; }

/// <summary>
/// The event id of the current effect.
/// </summary>
Expand All @@ -55,7 +42,30 @@ void SetCurrentUpdateEvent(PartitionUpdateEvent updateEvent)
this.currentUpdate = updateEvent;
}

PartitionUpdateEvent currentUpdate;

#region abstract methods

/// <summary>
/// Processes the current effect on the tracked object stored under <paramref name="key"/>.
/// Called by the recursive processing loop of <c>ProcessUpdate</c> for each key to process,
/// always passing this tracker as <paramref name="tracker"/>.
/// </summary>
/// <param name="key">The key of the tracked object to process.</param>
/// <param name="tracker">The tracker carrying the context of the current effect.</param>
public abstract ValueTask ApplyToStore(TrackedObjectKey key, EffectTracker tracker);

/// <summary>
/// Removes the given keys from the store. <c>ProcessUpdate</c> calls this with the set of
/// deleted keys accumulated during processing, and only when that set is non-empty.
/// </summary>
/// <param name="keys">The keys to remove.</param>
public abstract ValueTask RemoveFromStore(IEnumerable<TrackedObjectKey> keys);

/// <summary>
/// Returns the current positions as a pair. Callers in this class deconstruct the result
/// as (commit log position, input queue position).
/// </summary>
public abstract (long, long) GetPositions();

/// <summary>
/// The partition handed to read events (<c>Fire</c>) and query events
/// (<c>OnQueryCompleteAsync</c>) when their results are processed.
/// </summary>
public abstract Partition Partition { get; }

/// <summary>
/// Trace helper used to record processed events and fetched instance status/history.
/// Call sites in this class use the null-conditional operator, so a null value is tolerated.
/// </summary>
public abstract EventTraceHelper EventTraceHelper { get; }

/// <summary>
/// Trace helper used for detailed tracing (processing started, per-key processing detail).
/// Always accessed with the null-conditional operator, so a null value is tolerated.
/// </summary>
public abstract EventTraceHelper EventDetailTracer { get; }

/// <summary>
/// Reports an exception that was caught (and swallowed) while processing an event.
/// All call sites visible in this class pass <c>false</c> for both
/// <paramref name="terminatePartition"/> and <paramref name="reportAsWarning"/>.
/// </summary>
/// <param name="where">Name of the operation in which the exception was caught.</param>
/// <param name="message">Description of what was being processed.</param>
/// <param name="e">The caught exception.</param>
/// <param name="terminatePartition">Forwarded to the implementation's error handler.</param>
/// <param name="reportAsWarning">Forwarded to the implementation's error handler.</param>
protected abstract void HandleError(string where, string message, Exception e, bool terminatePartition, bool reportAsWarning);

/// <summary>
/// Checks an invariant; what happens when <paramref name="condition"/> is false is up to the implementation.
/// </summary>
/// <param name="condition">The condition that is expected to hold.</param>
public abstract void Assert(bool condition);

/// <summary>
/// Numeric partition identifier. Not read by the code visible in this class;
/// presumably the id of <see cref="Partition"/> (TODO confirm with implementers).
/// </summary>
public abstract uint PartitionId { get; }

/// <summary>
/// Current time in milliseconds, sampled at the start and end of event processing
/// to produce the timestamps passed to the trace helpers.
/// </summary>
public abstract double CurrentTimeMs { get; }

#endregion

/// <summary>
/// Applies the event to the given tracked object, using visitor pattern to
Expand All @@ -73,7 +83,7 @@ public void ProcessEffectOn(TrackedObject trackedObject)
{
// for robustness, we swallow exceptions inside event processing.
// It does not mean they are not serious. We still report them as errors.
this.Partition.ErrorHandler.HandleError(nameof(ProcessUpdate), $"Encountered exception on {trackedObject} when applying update event {this.currentUpdate}, eventId={this.currentUpdate?.EventId}", exception, false, false);
this.HandleError(nameof(ProcessUpdate), $"Encountered exception on {trackedObject} when applying update event {this.currentUpdate}, eventId={this.currentUpdate?.EventId}", exception, false, false);
}
}

Expand All @@ -84,16 +94,16 @@ public void AddDeletion(TrackedObjectKey key)

public async Task ProcessUpdate(PartitionUpdateEvent updateEvent)
{
(long commitLogPosition, long inputQueuePosition) = this.getPositions();
double startedTimestamp = this.Partition.CurrentTimeMs;
(long commitLogPosition, long inputQueuePosition) = this.GetPositions();
double startedTimestamp = this.CurrentTimeMs;

using (EventTraceContext.MakeContext(commitLogPosition, updateEvent.EventIdString))
{
try
{
this.Partition.EventDetailTracer?.TraceEventProcessingStarted(commitLogPosition, updateEvent, EventTraceHelper.EventCategory.UpdateEvent, this.IsReplaying);
this.EventDetailTracer?.TraceEventProcessingStarted(commitLogPosition, updateEvent, EventTraceHelper.EventCategory.UpdateEvent, this.IsReplaying);

this.Partition.Assert(updateEvent != null);
this.Assert(updateEvent != null);

this.SetCurrentUpdateEvent(updateEvent);

Expand All @@ -111,10 +121,10 @@ async ValueTask ProcessRecursively()
var startPos = this.Count - 1;
var key = this[startPos];

this.Partition.EventDetailTracer?.TraceEventProcessingDetail($"Process on [{key}]");
this.EventDetailTracer?.TraceEventProcessingDetail($"Process on [{key}]");

// start with processing the event on this object
await this.applyToStore(key, this).ConfigureAwait(false);
await this.ApplyToStore(key, this).ConfigureAwait(false);

// recursively process all additional objects to process
while (this.Count - 1 > startPos)
Expand All @@ -128,7 +138,7 @@ async ValueTask ProcessRecursively()

if (this.deletedKeys.Count > 0)
{
await this.removeFromStore(this.deletedKeys);
await this.RemoveFromStore(this.deletedKeys);
this.deletedKeys.Clear();
}

Expand All @@ -142,21 +152,21 @@ async ValueTask ProcessRecursively()
{
// for robustness, we swallow exceptions inside event processing.
// It does not mean they are not serious. We still report them as errors.
this.Partition.ErrorHandler.HandleError(nameof(ProcessUpdate), $"Encountered exception while processing update event {updateEvent}", exception, false, false);
this.HandleError(nameof(ProcessUpdate), $"Encountered exception while processing update event {updateEvent}", exception, false, false);
}
finally
{
double finishedTimestamp = this.Partition.CurrentTimeMs;
this.Partition.EventTraceHelper.TraceEventProcessed(commitLogPosition, updateEvent, EventTraceHelper.EventCategory.UpdateEvent, startedTimestamp, finishedTimestamp, this.IsReplaying);
double finishedTimestamp = this.CurrentTimeMs;
this.EventTraceHelper?.TraceEventProcessed(commitLogPosition, updateEvent, EventTraceHelper.EventCategory.UpdateEvent, startedTimestamp, finishedTimestamp, this.IsReplaying);
}
}
}

public void ProcessReadResult(PartitionReadEvent readEvent, TrackedObjectKey key, TrackedObject target)
{
(long commitLogPosition, long inputQueuePosition) = this.getPositions();
this.Partition.Assert(!this.IsReplaying); // read events are never part of the replay
double startedTimestamp = this.Partition.CurrentTimeMs;
(long commitLogPosition, long inputQueuePosition) = this.GetPositions();
this.Assert(!this.IsReplaying); // read events are never part of the replay
double startedTimestamp = this.CurrentTimeMs;

using (EventTraceContext.MakeContext(commitLogPosition, readEvent.EventIdString))
{
Expand All @@ -171,15 +181,15 @@ public void ProcessReadResult(PartitionReadEvent readEvent, TrackedObjectKey key
InstanceState instanceState = (InstanceState)target;
string instanceExecutionId = instanceState?.OrchestrationState?.OrchestrationInstance.ExecutionId;
string status = instanceState?.OrchestrationState?.OrchestrationStatus.ToString() ?? "null";
this.Partition.EventTraceHelper.TraceFetchedInstanceStatus(readEvent, key.InstanceId, instanceExecutionId, status, startedTimestamp - readEvent.IssuedTimestamp);
this.EventTraceHelper?.TraceFetchedInstanceStatus(readEvent, key.InstanceId, instanceExecutionId, status, startedTimestamp - readEvent.IssuedTimestamp);
break;

case TrackedObjectKey.TrackedObjectType.History:
HistoryState historyState = (HistoryState)target;
string historyExecutionId = historyState?.ExecutionId;
int eventCount = historyState?.History?.Count ?? 0;
int episode = historyState?.Episode ?? 0;
this.Partition.EventTraceHelper.TraceFetchedInstanceHistory(readEvent, key.InstanceId, historyExecutionId, eventCount, episode, startedTimestamp - readEvent.IssuedTimestamp);
this.EventTraceHelper?.TraceFetchedInstanceHistory(readEvent, key.InstanceId, historyExecutionId, eventCount, episode, startedTimestamp - readEvent.IssuedTimestamp);
break;

default:
Expand All @@ -188,7 +198,7 @@ public void ProcessReadResult(PartitionReadEvent readEvent, TrackedObjectKey key

if (isReady)
{
this.Partition.EventDetailTracer?.TraceEventProcessingStarted(commitLogPosition, readEvent, EventTraceHelper.EventCategory.ReadEvent, false);
this.EventDetailTracer?.TraceEventProcessingStarted(commitLogPosition, readEvent, EventTraceHelper.EventCategory.ReadEvent, false);
readEvent.Fire(this.Partition);
}
}
Expand All @@ -200,27 +210,27 @@ public void ProcessReadResult(PartitionReadEvent readEvent, TrackedObjectKey key
{
// for robustness, we swallow exceptions inside event processing.
// It does not mean they are not serious. We still report them as errors.
this.Partition.ErrorHandler.HandleError(nameof(ProcessReadResult), $"Encountered exception while processing read event {readEvent}", exception, false, false);
this.HandleError(nameof(ProcessReadResult), $"Encountered exception while processing read event {readEvent}", exception, false, false);
}
finally
{
double finishedTimestamp = this.Partition.CurrentTimeMs;
this.Partition.EventTraceHelper.TraceEventProcessed(commitLogPosition, readEvent, EventTraceHelper.EventCategory.ReadEvent, startedTimestamp, finishedTimestamp, false);
double finishedTimestamp = this.CurrentTimeMs;
this.EventTraceHelper?.TraceEventProcessed(commitLogPosition, readEvent, EventTraceHelper.EventCategory.ReadEvent, startedTimestamp, finishedTimestamp, false);
}
}
}

public async Task ProcessQueryResultAsync(PartitionQueryEvent queryEvent, IAsyncEnumerable<OrchestrationState> instances)
{
(long commitLogPosition, long inputQueuePosition) = this.getPositions();
this.Partition.Assert(!this.IsReplaying); // query events are never part of the replay
double startedTimestamp = this.Partition.CurrentTimeMs;
(long commitLogPosition, long inputQueuePosition) = this.GetPositions();
this.Assert(!this.IsReplaying); // query events are never part of the replay
double startedTimestamp = this.CurrentTimeMs;

using (EventTraceContext.MakeContext(commitLogPosition, queryEvent.EventIdString))
{
try
{
this.Partition.EventDetailTracer?.TraceEventProcessingStarted(commitLogPosition, queryEvent, EventTraceHelper.EventCategory.QueryEvent, false);
this.EventDetailTracer?.TraceEventProcessingStarted(commitLogPosition, queryEvent, EventTraceHelper.EventCategory.QueryEvent, false);
await queryEvent.OnQueryCompleteAsync(instances, this.Partition);
}
catch (OperationCanceledException)
Expand All @@ -231,12 +241,12 @@ public async Task ProcessQueryResultAsync(PartitionQueryEvent queryEvent, IAsync
{
// for robustness, we swallow exceptions inside event processing.
// It does not mean they are not serious. We still report them as errors.
this.Partition.ErrorHandler.HandleError(nameof(ProcessQueryResultAsync), $"Encountered exception while processing query event {queryEvent}", exception, false, false);
this.HandleError(nameof(ProcessQueryResultAsync), $"Encountered exception while processing query event {queryEvent}", exception, false, false);
}
finally
{
double finishedTimestamp = this.Partition.CurrentTimeMs;
this.Partition.EventTraceHelper.TraceEventProcessed(commitLogPosition, queryEvent, EventTraceHelper.EventCategory.QueryEvent, startedTimestamp, finishedTimestamp, false);
double finishedTimestamp = this.CurrentTimeMs;
this.EventTraceHelper?.TraceEventProcessed(commitLogPosition, queryEvent, EventTraceHelper.EventCategory.QueryEvent, startedTimestamp, finishedTimestamp, false);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

namespace DurableTask.Netherite
{
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using DurableTask.Core;
using DurableTask.Core.Common;

/// <summary>
/// An <see cref="EffectTracker"/> that is bound to a single <see cref="Partition"/>.
/// Tracing, error handling, assertions, the partition id and the clock are all forwarded
/// to that partition; the store-related members of the base class remain abstract.
/// </summary>
abstract class PartitionEffectTracker : EffectTracker
{
    readonly Partition partition;

    /// <summary>
    /// Creates a tracker bound to the given partition.
    /// </summary>
    /// <param name="partition">The partition to forward to; must not be null.</param>
    /// <exception cref="ArgumentNullException"><paramref name="partition"/> is null.</exception>
    public PartitionEffectTracker(Partition partition)
    {
        this.partition = partition ?? throw new ArgumentNullException(nameof(partition));
    }

    /// <summary>The partition this tracker was constructed with.</summary>
    public override Partition Partition
    {
        get { return this.partition; }
    }

    /// <summary>The partition's event trace helper.</summary>
    public override EventTraceHelper EventTraceHelper
    {
        get { return this.Partition.EventTraceHelper; }
    }

    /// <summary>The partition's detail tracer.</summary>
    public override EventTraceHelper EventDetailTracer
    {
        get { return this.Partition.EventDetailTracer; }
    }

    /// <summary>Forwards the error, with all arguments unchanged, to the partition's error handler.</summary>
    protected override void HandleError(string where, string message, Exception e, bool terminatePartition, bool reportAsWarning)
    {
        this.Partition.ErrorHandler.HandleError(where, message, e, terminatePartition, reportAsWarning);
    }

    /// <summary>Forwards the assertion to the partition.</summary>
    public override void Assert(bool condition)
    {
        this.Partition.Assert(condition);
    }

    /// <summary>The id of the partition.</summary>
    public override uint PartitionId
    {
        get { return this.Partition.PartitionId; }
    }

    /// <summary>The partition's current time, in milliseconds.</summary>
    public override double CurrentTimeMs
    {
        get { return this.Partition.CurrentTimeMs; }
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public virtual void OnFirstInitialization()
/// Is automatically called on all singleton objects after recovery. Typically used to
/// restart pending activities, timers, tasks and the like.
/// </summary>
public virtual void OnRecoveryCompleted()
public virtual void OnRecoveryCompleted(EffectTracker effects)
{
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,6 @@ class CreationRequestReceived : ClientRequestEventWithPrefetch
[DataMember]
public OrchestrationStatus[] DedupeStatuses { get; set; }

[DataMember]
public DateTime Timestamp { get; set; }

[DataMember]
public TaskMessage TaskMessage { get; set; }

Expand All @@ -42,18 +39,51 @@ class CreationRequestReceived : ClientRequestEventWithPrefetch
[IgnoreDataMember]
public CreationResponseReceived ResponseToSend { get; set; } // used to communicate response to ClientState

[IgnoreDataMember]
public DateTime CreationTimestamp { get; set; }

public override bool OnReadComplete(TrackedObject target, Partition partition)
{
// Use this moment of time as the creation timestamp, replacing the original timestamp taken on the client.
// Use this moment of time as the creation timestamp, which is going to replace the original timestamp taken on the client.
// This is preferable because it avoids clock synchronization issues (which can result in negative orchestration durations)
// and means the timestamp is consistently ordered with respect to timestamps of other events on this partition.
DateTime creationTimestamp = DateTime.UtcNow;

this.ExecutionStartedEvent.Timestamp = creationTimestamp;
this.CreationTimestamp = DateTime.UtcNow;

return true;
}

/// <summary>
/// Clones this event and gives the clone its own <see cref="TaskMessage"/> wrapping a newly
/// constructed <see cref="ExecutionStartedEvent"/> whose timestamp is <see cref="CreationTimestamp"/>.
/// All other fields of the start event and of the task message are carried over by assignment
/// from the originals, which are not modified here.
/// </summary>
/// <returns>The cloned event.</returns>
public override PartitionEvent Clone()
{
    var clone = (CreationRequestReceived)base.Clone();

    var sourceStart = this.ExecutionStartedEvent;
    var sourceMessage = this.TaskMessage;

    // build a separate start event so the creation timestamp can be set on the copy
    var restampedStart = new ExecutionStartedEvent(sourceStart.EventId, sourceStart.Input)
    {
        ParentInstance = sourceStart.ParentInstance,
        Name = sourceStart.Name,
        Version = sourceStart.Version,
        Tags = sourceStart.Tags,
        ScheduledStartTime = sourceStart.ScheduledStartTime,
        Correlation = sourceStart.Correlation,
        ExtensionData = sourceStart.ExtensionData,
        OrchestrationInstance = sourceStart.OrchestrationInstance,
        Timestamp = this.CreationTimestamp,
    };

    // wrap it in a new task message that otherwise mirrors the original one
    clone.TaskMessage = new TaskMessage()
    {
        Event = restampedStart,
        SequenceNumber = sourceMessage.SequenceNumber,
        OrchestrationInstance = sourceMessage.OrchestrationInstance,
        ExtensionData = sourceMessage.ExtensionData,
    };

    return clone;
}

public override void ApplyTo(TrackedObject trackedObject, EffectTracker effects)
{
trackedObject.Process(this, effects);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,11 @@ protected override void ExtraTraceInformation(StringBuilder s)
s.Append(this.InstanceId);
}

public static bool SatisfiesWaitCondition(OrchestrationState value)
=> (value != null &&
value.OrchestrationStatus != OrchestrationStatus.Running &&
value.OrchestrationStatus != OrchestrationStatus.Pending &&
value.OrchestrationStatus != OrchestrationStatus.ContinuedAsNew);
/// <summary>
/// Determines whether a wait on an instance is satisfied by the given status.
/// </summary>
/// <param name="value">The orchestration status, or null if there is none.</param>
/// <returns>
/// <c>false</c> if <paramref name="value"/> is null, Running, Pending or ContinuedAsNew;
/// <c>true</c> for any other status.
/// </returns>
public static bool SatisfiesWaitCondition(OrchestrationStatus? value)
{
    switch (value)
    {
        case null:
        case OrchestrationStatus.Running:
        case OrchestrationStatus.Pending:
        case OrchestrationStatus.ContinuedAsNew:
            return false;

        default:
            return true;
    }
}

public WaitResponseReceived CreateResponse(OrchestrationState value)
=> new WaitResponseReceived()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,6 @@ class BatchProcessed : PartitionUpdateEvent, IRequiresPrefetch
[DataMember]
public OrchestrationStatus OrchestrationStatus { get; set; }

[IgnoreDataMember]
public OrchestrationState State { get; set; }

[DataMember]
public List<TaskMessage> ActivityMessages { get; set; }

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ abstract class PartitionEvent : Event
public virtual void OnSubmit(Partition partition) { }

// make a copy of an event so we run it through the pipeline a second time
public PartitionEvent Clone()
public virtual PartitionEvent Clone()
{
var evt = (PartitionEvent)this.MemberwiseClone();

Expand Down
Loading

0 comments on commit 885f7b9

Please sign in to comment.