Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement replay checker #99

Merged
merged 9 commits into from
Jan 4, 2022
100 changes: 55 additions & 45 deletions src/DurableTask.Netherite/Abstractions/PartitionState/EffectTracker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,30 +14,17 @@ namespace DurableTask.Netherite
/// information about the context, and to enumerate the objects on which the effect
/// is being processed.
/// </summary>
class EffectTracker : List<TrackedObjectKey>
abstract class EffectTracker : List<TrackedObjectKey>
{
readonly Func<TrackedObjectKey, EffectTracker, ValueTask> applyToStore;
readonly Func<IEnumerable<TrackedObjectKey>, ValueTask> removeFromStore;
readonly Func<(long, long)> getPositions;
readonly System.Diagnostics.Stopwatch stopWatch;
readonly HashSet<TrackedObjectKey> deletedKeys;

public EffectTracker(Partition partition, Func<TrackedObjectKey, EffectTracker, ValueTask> applyToStore, Func<IEnumerable<TrackedObjectKey>, ValueTask> removeFromStore, Func<(long, long)> getPositions)
PartitionUpdateEvent currentUpdate;

public EffectTracker()
{
this.Partition = partition;
this.applyToStore = applyToStore;
this.removeFromStore = removeFromStore;
this.getPositions = getPositions;
this.stopWatch = new System.Diagnostics.Stopwatch();
this.deletedKeys = new HashSet<TrackedObjectKey>();
this.stopWatch.Start();
}

/// <summary>
/// The current partition.
/// </summary>
public Partition Partition { get; }

/// <summary>
/// The event id of the current effect.
/// </summary>
Expand All @@ -55,7 +42,30 @@ void SetCurrentUpdateEvent(PartitionUpdateEvent updateEvent)
this.currentUpdate = updateEvent;
}

PartitionUpdateEvent currentUpdate;

#region abstract methods

public abstract ValueTask ApplyToStore(TrackedObjectKey key, EffectTracker tracker);

public abstract ValueTask RemoveFromStore(IEnumerable<TrackedObjectKey> keys);

public abstract (long, long) GetPositions();

public abstract Partition Partition { get; }

public abstract EventTraceHelper EventTraceHelper { get; }

public abstract EventTraceHelper EventDetailTracer { get; }

protected abstract void HandleError(string where, string message, Exception e, bool terminatePartition, bool reportAsWarning);

public abstract void Assert(bool condition);

public abstract uint PartitionId { get; }

public abstract double CurrentTimeMs { get; }

#endregion

/// <summary>
/// Applies the event to the given tracked object, using visitor pattern to
Expand All @@ -73,7 +83,7 @@ public void ProcessEffectOn(TrackedObject trackedObject)
{
// for robustness, we swallow exceptions inside event processing.
// It does not mean they are not serious. We still report them as errors.
this.Partition.ErrorHandler.HandleError(nameof(ProcessUpdate), $"Encountered exception on {trackedObject} when applying update event {this.currentUpdate}, eventId={this.currentUpdate?.EventId}", exception, false, false);
this.HandleError(nameof(ProcessUpdate), $"Encountered exception on {trackedObject} when applying update event {this.currentUpdate}, eventId={this.currentUpdate?.EventId}", exception, false, false);
}
}

Expand All @@ -84,16 +94,16 @@ public void AddDeletion(TrackedObjectKey key)

public async Task ProcessUpdate(PartitionUpdateEvent updateEvent)
{
(long commitLogPosition, long inputQueuePosition) = this.getPositions();
double startedTimestamp = this.Partition.CurrentTimeMs;
(long commitLogPosition, long inputQueuePosition) = this.GetPositions();
double startedTimestamp = this.CurrentTimeMs;

using (EventTraceContext.MakeContext(commitLogPosition, updateEvent.EventIdString))
{
try
{
this.Partition.EventDetailTracer?.TraceEventProcessingStarted(commitLogPosition, updateEvent, EventTraceHelper.EventCategory.UpdateEvent, this.IsReplaying);
this.EventDetailTracer?.TraceEventProcessingStarted(commitLogPosition, updateEvent, EventTraceHelper.EventCategory.UpdateEvent, this.IsReplaying);

this.Partition.Assert(updateEvent != null);
this.Assert(updateEvent != null);

this.SetCurrentUpdateEvent(updateEvent);

Expand All @@ -111,10 +121,10 @@ async ValueTask ProcessRecursively()
var startPos = this.Count - 1;
var key = this[startPos];

this.Partition.EventDetailTracer?.TraceEventProcessingDetail($"Process on [{key}]");
this.EventDetailTracer?.TraceEventProcessingDetail($"Process on [{key}]");

// start with processing the event on this object
await this.applyToStore(key, this).ConfigureAwait(false);
await this.ApplyToStore(key, this).ConfigureAwait(false);

// recursively process all additional objects to process
while (this.Count - 1 > startPos)
Expand All @@ -128,7 +138,7 @@ async ValueTask ProcessRecursively()

if (this.deletedKeys.Count > 0)
{
await this.removeFromStore(this.deletedKeys);
await this.RemoveFromStore(this.deletedKeys);
this.deletedKeys.Clear();
}

Expand All @@ -142,21 +152,21 @@ async ValueTask ProcessRecursively()
{
// for robustness, we swallow exceptions inside event processing.
// It does not mean they are not serious. We still report them as errors.
this.Partition.ErrorHandler.HandleError(nameof(ProcessUpdate), $"Encountered exception while processing update event {updateEvent}", exception, false, false);
this.HandleError(nameof(ProcessUpdate), $"Encountered exception while processing update event {updateEvent}", exception, false, false);
}
finally
{
double finishedTimestamp = this.Partition.CurrentTimeMs;
this.Partition.EventTraceHelper.TraceEventProcessed(commitLogPosition, updateEvent, EventTraceHelper.EventCategory.UpdateEvent, startedTimestamp, finishedTimestamp, this.IsReplaying);
double finishedTimestamp = this.CurrentTimeMs;
this.EventTraceHelper?.TraceEventProcessed(commitLogPosition, updateEvent, EventTraceHelper.EventCategory.UpdateEvent, startedTimestamp, finishedTimestamp, this.IsReplaying);
}
}
}

public void ProcessReadResult(PartitionReadEvent readEvent, TrackedObjectKey key, TrackedObject target)
{
(long commitLogPosition, long inputQueuePosition) = this.getPositions();
this.Partition.Assert(!this.IsReplaying); // read events are never part of the replay
double startedTimestamp = this.Partition.CurrentTimeMs;
(long commitLogPosition, long inputQueuePosition) = this.GetPositions();
this.Assert(!this.IsReplaying); // read events are never part of the replay
double startedTimestamp = this.CurrentTimeMs;

using (EventTraceContext.MakeContext(commitLogPosition, readEvent.EventIdString))
{
Expand All @@ -171,15 +181,15 @@ public void ProcessReadResult(PartitionReadEvent readEvent, TrackedObjectKey key
InstanceState instanceState = (InstanceState)target;
string instanceExecutionId = instanceState?.OrchestrationState?.OrchestrationInstance.ExecutionId;
string status = instanceState?.OrchestrationState?.OrchestrationStatus.ToString() ?? "null";
this.Partition.EventTraceHelper.TraceFetchedInstanceStatus(readEvent, key.InstanceId, instanceExecutionId, status, startedTimestamp - readEvent.IssuedTimestamp);
this.EventTraceHelper?.TraceFetchedInstanceStatus(readEvent, key.InstanceId, instanceExecutionId, status, startedTimestamp - readEvent.IssuedTimestamp);
break;

case TrackedObjectKey.TrackedObjectType.History:
HistoryState historyState = (HistoryState)target;
string historyExecutionId = historyState?.ExecutionId;
int eventCount = historyState?.History?.Count ?? 0;
int episode = historyState?.Episode ?? 0;
this.Partition.EventTraceHelper.TraceFetchedInstanceHistory(readEvent, key.InstanceId, historyExecutionId, eventCount, episode, startedTimestamp - readEvent.IssuedTimestamp);
this.EventTraceHelper?.TraceFetchedInstanceHistory(readEvent, key.InstanceId, historyExecutionId, eventCount, episode, startedTimestamp - readEvent.IssuedTimestamp);
break;

default:
Expand All @@ -188,7 +198,7 @@ public void ProcessReadResult(PartitionReadEvent readEvent, TrackedObjectKey key

if (isReady)
{
this.Partition.EventDetailTracer?.TraceEventProcessingStarted(commitLogPosition, readEvent, EventTraceHelper.EventCategory.ReadEvent, false);
this.EventDetailTracer?.TraceEventProcessingStarted(commitLogPosition, readEvent, EventTraceHelper.EventCategory.ReadEvent, false);
readEvent.Fire(this.Partition);
}
}
Expand All @@ -200,27 +210,27 @@ public void ProcessReadResult(PartitionReadEvent readEvent, TrackedObjectKey key
{
// for robustness, we swallow exceptions inside event processing.
// It does not mean they are not serious. We still report them as errors.
this.Partition.ErrorHandler.HandleError(nameof(ProcessReadResult), $"Encountered exception while processing read event {readEvent}", exception, false, false);
this.HandleError(nameof(ProcessReadResult), $"Encountered exception while processing read event {readEvent}", exception, false, false);
}
finally
{
double finishedTimestamp = this.Partition.CurrentTimeMs;
this.Partition.EventTraceHelper.TraceEventProcessed(commitLogPosition, readEvent, EventTraceHelper.EventCategory.ReadEvent, startedTimestamp, finishedTimestamp, false);
double finishedTimestamp = this.CurrentTimeMs;
this.EventTraceHelper?.TraceEventProcessed(commitLogPosition, readEvent, EventTraceHelper.EventCategory.ReadEvent, startedTimestamp, finishedTimestamp, false);
}
}
}

public async Task ProcessQueryResultAsync(PartitionQueryEvent queryEvent, IAsyncEnumerable<OrchestrationState> instances)
{
(long commitLogPosition, long inputQueuePosition) = this.getPositions();
this.Partition.Assert(!this.IsReplaying); // query events are never part of the replay
double startedTimestamp = this.Partition.CurrentTimeMs;
(long commitLogPosition, long inputQueuePosition) = this.GetPositions();
this.Assert(!this.IsReplaying); // query events are never part of the replay
double startedTimestamp = this.CurrentTimeMs;

using (EventTraceContext.MakeContext(commitLogPosition, queryEvent.EventIdString))
{
try
{
this.Partition.EventDetailTracer?.TraceEventProcessingStarted(commitLogPosition, queryEvent, EventTraceHelper.EventCategory.QueryEvent, false);
this.EventDetailTracer?.TraceEventProcessingStarted(commitLogPosition, queryEvent, EventTraceHelper.EventCategory.QueryEvent, false);
await queryEvent.OnQueryCompleteAsync(instances, this.Partition);
}
catch (OperationCanceledException)
Expand All @@ -231,12 +241,12 @@ public async Task ProcessQueryResultAsync(PartitionQueryEvent queryEvent, IAsync
{
// for robustness, we swallow exceptions inside event processing.
// It does not mean they are not serious. We still report them as errors.
this.Partition.ErrorHandler.HandleError(nameof(ProcessQueryResultAsync), $"Encountered exception while processing query event {queryEvent}", exception, false, false);
this.HandleError(nameof(ProcessQueryResultAsync), $"Encountered exception while processing query event {queryEvent}", exception, false, false);
}
finally
{
double finishedTimestamp = this.Partition.CurrentTimeMs;
this.Partition.EventTraceHelper.TraceEventProcessed(commitLogPosition, queryEvent, EventTraceHelper.EventCategory.QueryEvent, startedTimestamp, finishedTimestamp, false);
double finishedTimestamp = this.CurrentTimeMs;
this.EventTraceHelper?.TraceEventProcessed(commitLogPosition, queryEvent, EventTraceHelper.EventCategory.QueryEvent, startedTimestamp, finishedTimestamp, false);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

namespace DurableTask.Netherite
{
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using DurableTask.Core;
using DurableTask.Core.Common;

/// <summary>
/// Is used while applying an effect to a partition state, to carry
/// information about the context, and to enumerate the objects on which the effect
/// is being processed.
/// </summary>
abstract class PartitionEffectTracker : EffectTracker
{
readonly Partition partition;

public PartitionEffectTracker(Partition partition)
{
this.partition = partition;
if (partition == null)
{
throw new ArgumentNullException(nameof(partition));
}
}

public override Partition Partition => this.partition;

public override EventTraceHelper EventTraceHelper
=> this.Partition.EventTraceHelper;

public override EventTraceHelper EventDetailTracer
=> this.Partition.EventDetailTracer;

protected override void HandleError(string where, string message, Exception e, bool terminatePartition, bool reportAsWarning)
=> this.Partition.ErrorHandler.HandleError(where, message, e, terminatePartition, reportAsWarning);

public override void Assert(bool condition)
=> this.Partition.Assert(condition);

public override uint PartitionId
=> this.Partition.PartitionId;

public override double CurrentTimeMs
=> this.Partition.CurrentTimeMs;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public virtual void OnFirstInitialization()
/// Is automatically called on all singleton objects after recovery. Typically used to
/// restart pending activities, timers, tasks and the like.
/// </summary>
public virtual void OnRecoveryCompleted()
public virtual void OnRecoveryCompleted(EffectTracker effects)
{
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,6 @@ class CreationRequestReceived : ClientRequestEventWithPrefetch
[DataMember]
public OrchestrationStatus[] DedupeStatuses { get; set; }

[DataMember]
public DateTime Timestamp { get; set; }

[DataMember]
public TaskMessage TaskMessage { get; set; }

Expand All @@ -42,18 +39,51 @@ class CreationRequestReceived : ClientRequestEventWithPrefetch
[IgnoreDataMember]
public CreationResponseReceived ResponseToSend { get; set; } // used to communicate response to ClientState

[IgnoreDataMember]
public DateTime CreationTimestamp { get; set; }

public override bool OnReadComplete(TrackedObject target, Partition partition)
{
// Use this moment of time as the creation timestamp, replacing the original timestamp taken on the client.
// Use this moment of time as the creation timestamp, which is going to replace the original timestamp taken on the client.
// This is preferrable because it avoids clock synchronization issues (which can result in negative orchestration durations)
// and means the timestamp is consistently ordered with respect to timestamps of other events on this partition.
DateTime creationTimestamp = DateTime.UtcNow;

this.ExecutionStartedEvent.Timestamp = creationTimestamp;
this.CreationTimestamp = DateTime.UtcNow;

return true;
}

// make a copy of an event so we run it through the pipeline a second time
public override PartitionEvent Clone()
{
var evt = (CreationRequestReceived)base.Clone();

// make a copy of the execution started event in order to modify the creation timestamp

var ee = this.ExecutionStartedEvent;
var tm = this.TaskMessage;

evt.TaskMessage = new TaskMessage()
{
Event = new ExecutionStartedEvent(ee.EventId, ee.Input)
{
ParentInstance = ee.ParentInstance,
Name = ee.Name,
Version = ee.Version,
Tags = ee.Tags,
ScheduledStartTime = ee.ScheduledStartTime,
Correlation = ee.Correlation,
ExtensionData = ee.ExtensionData,
OrchestrationInstance = ee.OrchestrationInstance,
Timestamp = this.CreationTimestamp,
},
SequenceNumber = tm.SequenceNumber,
OrchestrationInstance = tm.OrchestrationInstance,
ExtensionData = tm.ExtensionData,
};

return evt;
}

public override void ApplyTo(TrackedObject trackedObject, EffectTracker effects)
{
trackedObject.Process(this, effects);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,11 @@ protected override void ExtraTraceInformation(StringBuilder s)
s.Append(this.InstanceId);
}

public static bool SatisfiesWaitCondition(OrchestrationState value)
=> (value != null &&
value.OrchestrationStatus != OrchestrationStatus.Running &&
value.OrchestrationStatus != OrchestrationStatus.Pending &&
value.OrchestrationStatus != OrchestrationStatus.ContinuedAsNew);
public static bool SatisfiesWaitCondition(OrchestrationStatus? value)
=> (value.HasValue &&
value.Value != OrchestrationStatus.Running &&
value.Value != OrchestrationStatus.Pending &&
value.Value != OrchestrationStatus.ContinuedAsNew);

public WaitResponseReceived CreateResponse(OrchestrationState value)
=> new WaitResponseReceived()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,6 @@ class BatchProcessed : PartitionUpdateEvent, IRequiresPrefetch
[DataMember]
public OrchestrationStatus OrchestrationStatus { get; set; }

[IgnoreDataMember]
public OrchestrationState State { get; set; }

[DataMember]
public List<TaskMessage> ActivityMessages { get; set; }

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ abstract class PartitionEvent : Event
public virtual void OnSubmit(Partition partition) { }

// make a copy of an event so we run it through the pipeline a second time
public PartitionEvent Clone()
public virtual PartitionEvent Clone()
{
var evt = (PartitionEvent)this.MemberwiseClone();

Expand Down
Loading