Skip to content

Commit

Permalink
Enforce persistence barrier for self-messages (#164)
Browse files Browse the repository at this point in the history
  • Loading branch information
sebastianburckhardt authored May 26, 2022
1 parent 413a353 commit f51633c
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@ class OrchestrationMessageBatch : InternalReadEvent, TransportAbstraction.IDurab
public string WorkItemId;
public double? WaitingSince; // measures time waiting to execute

readonly PartitionUpdateEvent waitForDequeueCountPersistence;
readonly PartitionUpdateEvent waitForPersistence; // if nonnull, must wait for this event
OrchestrationWorkItem workItem;

public override EventId EventId => EventId.MakePartitionInternalEventId(this.WorkItemId);

public OrchestrationMessageBatch(string instanceId, SessionsState.Session session, Partition partition, PartitionUpdateEvent filingEvent)
public OrchestrationMessageBatch(string instanceId, SessionsState.Session session, Partition partition, PartitionUpdateEvent filingEvent, bool waitForPersistence)
{
this.InstanceId = instanceId;
this.SessionId = session.SessionId;
Expand All @@ -46,12 +46,12 @@ public OrchestrationMessageBatch(string instanceId, SessionsState.Session sessio

session.CurrentBatch = this;

if (partition.Settings.PersistDequeueCountBeforeStartingWorkItem || filingEvent is RecoveryCompleted)
if (waitForPersistence || partition.Settings.PersistDequeueCountBeforeStartingWorkItem)
{
this.waitForDequeueCountPersistence = filingEvent;
this.waitForPersistence = filingEvent;
}

partition.EventDetailTracer?.TraceEventProcessingDetail($"OrchestrationMessageBatch is prefetching instance={this.InstanceId} batch={this.WorkItemId}");
partition.EventDetailTracer?.TraceEventProcessingDetail($"OrchestrationMessageBatch is prefetching instance={this.InstanceId} batch={this.WorkItemId} waitForPersistence={this.waitForPersistence}");

// continue when we have the history state loaded, which gives us the latest state and/or cursor
partition.SubmitEvent(this);
Expand Down Expand Up @@ -144,10 +144,10 @@ public override void OnReadComplete(TrackedObject s, Partition partition)
}
else
{
if (this.waitForDequeueCountPersistence != null)
if (this.waitForPersistence != null)
{
// process the work item once the filing event is persisted
DurabilityListeners.Register(this.waitForDequeueCountPersistence, this);
DurabilityListeners.Register(this.waitForPersistence, this);
}
else
{
Expand Down
44 changes: 27 additions & 17 deletions src/DurableTask.Netherite/PartitionState/SessionsState.cs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ public override void Process(RecoveryCompleted evt, EffectTracker effects)

if (!effects.IsReplaying) // during replay, we don't start work items until end of recovery
{
new OrchestrationMessageBatch(kvp.Key, kvp.Value, this.Partition, evt);
// submit a message batch for processing
new OrchestrationMessageBatch(kvp.Key, kvp.Value, this.Partition, evt, waitForPersistence: true);
}
}

Expand Down Expand Up @@ -144,12 +145,13 @@ void AddMessageToSession(TaskMessage message, string originWorkItemId, bool isRe

if (!isReplaying) // during replay, we don't start work items until end of recovery
{
new OrchestrationMessageBatch(instanceId, session, this.Partition, filingEvent);
// submit a message batch for processing
new OrchestrationMessageBatch(instanceId, session, this.Partition, filingEvent, waitForPersistence: false);
}
}
}

void AddMessagesToSession(string instanceId, string originWorkItemId, IEnumerable<TaskMessage> messages, bool isReplaying, PartitionUpdateEvent filingEvent)
Session AddMessagesToSession(string instanceId, string originWorkItemId, IEnumerable<TaskMessage> messages, bool isReplaying, PartitionUpdateEvent filingEvent)
{
this.Partition.Assert(!string.IsNullOrEmpty(originWorkItemId), "null originWorkItem");
int? forceNewExecution = FindLastExecutionStartedEvent(messages);
Expand All @@ -159,7 +161,7 @@ void AddMessagesToSession(string instanceId, string originWorkItemId, IEnumerabl
// A session for this instance already exists, so a work item is in progress already.
// We don't need to schedule a work item because we'll notice the new messages
// when the previous work item completes.
foreach(var message in messages)
foreach (var message in messages)
{
if (!isReplaying)
{
Expand All @@ -185,12 +187,12 @@ void AddMessagesToSession(string instanceId, string originWorkItemId, IEnumerabl
}
messages = messages.Skip(forceNewExecution.Value);
}

// Create a new session
this.Sessions[instanceId] = session = new Session()
{
SessionId = this.SequenceNumber++,
Batch = new List<(TaskMessage,string)>(),
Batch = new List<(TaskMessage, string)>(),
BatchStartPosition = 0,
DequeueCount = 1,
ForceNewExecution = forceNewExecution.HasValue,
Expand All @@ -208,9 +210,11 @@ void AddMessagesToSession(string instanceId, string originWorkItemId, IEnumerabl

if (!isReplaying) // we don't start work items until end of recovery
{
new OrchestrationMessageBatch(instanceId, session, this.Partition, filingEvent);
new OrchestrationMessageBatch(instanceId, session, this.Partition, filingEvent, waitForPersistence: false);
}
}

return session;
}

static int? FindLastExecutionStartedEvent(IEnumerable<TaskMessage> messages)
Expand Down Expand Up @@ -331,6 +335,9 @@ public override void Process(BatchProcessed evt, EffectTracker effects)
return;
};

// detect loopback messages, to guarantee that they act as a persistence barrier
bool containsLoopbackMessages = false;

if (!evt.NotExecutable)
{

Expand All @@ -354,7 +361,12 @@ public override void Process(BatchProcessed evt, EffectTracker effects)
{
foreach (var group in evt.LocalMessages.GroupBy(tm => tm.OrchestrationInstance.InstanceId))
{
this.AddMessagesToSession(group.Key, evt.WorkItemId, group, effects.IsReplaying, evt);
var targetSession = this.AddMessagesToSession(group.Key, evt.WorkItemId, group, effects.IsReplaying, evt);

if (targetSession == session)
{
containsLoopbackMessages = true;
}
}
}

Expand All @@ -370,22 +382,20 @@ public override void Process(BatchProcessed evt, EffectTracker effects)
session.BatchStartPosition += evt.BatchLength;
session.DequeueCount = 1;

this.StartNewBatchIfNeeded(session, effects, evt.InstanceId, effects.IsReplaying, evt);
}

void StartNewBatchIfNeeded(Session session, EffectTracker effects, string instanceId, bool isReplaying, PartitionUpdateEvent filingEvent)
{
// start a new batch if needed
if (session.Batch.Count == 0)
{
// no more pending messages for this instance, so we delete the session.
this.Sessions.Remove(instanceId);
this.Sessions.Remove(evt.InstanceId);
}
else
{
if (!isReplaying) // we don't start work items until end of recovery
// there are more messages to process

if (!effects.IsReplaying) // we don't start work items until end of recovery
{
// there are more messages. Start another work item.
new OrchestrationMessageBatch(instanceId, session, this.Partition, filingEvent);
// submit a message batch for processing
new OrchestrationMessageBatch(evt.InstanceId, session, this.Partition, evt, waitForPersistence: containsLoopbackMessages);
}
}
}
Expand Down

0 comments on commit f51633c

Please sign in to comment.