From f51633c1b67a0ff8689a942cc37adf28bc0b6e27 Mon Sep 17 00:00:00 2001 From: Sebastian Burckhardt Date: Thu, 26 May 2022 09:45:51 -0700 Subject: [PATCH] Enforce persistence barrier for self-messages (#164) --- .../OrchestrationMessageBatch.cs | 14 +++--- .../PartitionState/SessionsState.cs | 44 ++++++++++++------- 2 files changed, 34 insertions(+), 24 deletions(-) diff --git a/src/DurableTask.Netherite/OrchestrationService/OrchestrationMessageBatch.cs b/src/DurableTask.Netherite/OrchestrationService/OrchestrationMessageBatch.cs index 845433c1..550e8897 100644 --- a/src/DurableTask.Netherite/OrchestrationService/OrchestrationMessageBatch.cs +++ b/src/DurableTask.Netherite/OrchestrationService/OrchestrationMessageBatch.cs @@ -24,12 +24,12 @@ class OrchestrationMessageBatch : InternalReadEvent, TransportAbstraction.IDurab public string WorkItemId; public double? WaitingSince; // measures time waiting to execute - readonly PartitionUpdateEvent waitForDequeueCountPersistence; + readonly PartitionUpdateEvent waitForPersistence; // if nonnull, must wait for this event OrchestrationWorkItem workItem; public override EventId EventId => EventId.MakePartitionInternalEventId(this.WorkItemId); - public OrchestrationMessageBatch(string instanceId, SessionsState.Session session, Partition partition, PartitionUpdateEvent filingEvent) + public OrchestrationMessageBatch(string instanceId, SessionsState.Session session, Partition partition, PartitionUpdateEvent filingEvent, bool waitForPersistence) { this.InstanceId = instanceId; this.SessionId = session.SessionId; @@ -46,12 +46,12 @@ public OrchestrationMessageBatch(string instanceId, SessionsState.Session sessio session.CurrentBatch = this; - if (partition.Settings.PersistDequeueCountBeforeStartingWorkItem || filingEvent is RecoveryCompleted) + if (waitForPersistence || partition.Settings.PersistDequeueCountBeforeStartingWorkItem) { - this.waitForDequeueCountPersistence = filingEvent; + this.waitForPersistence = filingEvent; } - partition.EventDetailTracer?.TraceEventProcessingDetail($"OrchestrationMessageBatch is prefetching instance={this.InstanceId} batch={this.WorkItemId}"); + partition.EventDetailTracer?.TraceEventProcessingDetail($"OrchestrationMessageBatch is prefetching instance={this.InstanceId} batch={this.WorkItemId} waitForPersistence={this.waitForPersistence}"); // continue when we have the history state loaded, which gives us the latest state and/or cursor partition.SubmitEvent(this); @@ -144,10 +144,10 @@ public override void OnReadComplete(TrackedObject s, Partition partition) } else { - if (this.waitForDequeueCountPersistence != null) + if (this.waitForPersistence != null) { // process the work item once the filing event is persisted - DurabilityListeners.Register(this.waitForDequeueCountPersistence, this); + DurabilityListeners.Register(this.waitForPersistence, this); } else { diff --git a/src/DurableTask.Netherite/PartitionState/SessionsState.cs b/src/DurableTask.Netherite/PartitionState/SessionsState.cs index 7b7a9b40..f37f0966 100644 --- a/src/DurableTask.Netherite/PartitionState/SessionsState.cs +++ b/src/DurableTask.Netherite/PartitionState/SessionsState.cs @@ -60,7 +60,8 @@ public override void Process(RecoveryCompleted evt, EffectTracker effects) if (!effects.IsReplaying) // during replay, we don't start work items until end of recovery { - new OrchestrationMessageBatch(kvp.Key, kvp.Value, this.Partition, evt); + // submit a message batch for processing + new OrchestrationMessageBatch(kvp.Key, kvp.Value, this.Partition, evt, waitForPersistence: true); } } @@ -144,12 +145,13 @@ void AddMessageToSession(TaskMessage message, string originWorkItemId, bool isRe if (!isReplaying) // during replay, we don't start work items until end of recovery { - new OrchestrationMessageBatch(instanceId, session, this.Partition, filingEvent); + // submit a message batch for processing + new OrchestrationMessageBatch(instanceId, session, this.Partition, filingEvent, waitForPersistence: false); } } } - void AddMessagesToSession(string instanceId, string originWorkItemId, IEnumerable messages, bool isReplaying, PartitionUpdateEvent filingEvent) + Session AddMessagesToSession(string instanceId, string originWorkItemId, IEnumerable messages, bool isReplaying, PartitionUpdateEvent filingEvent) { this.Partition.Assert(!string.IsNullOrEmpty(originWorkItemId), "null originWorkItem"); int? forceNewExecution = FindLastExecutionStartedEvent(messages); @@ -159,7 +161,7 @@ void AddMessagesToSession(string instanceId, string originWorkItemId, IEnumerabl // A session for this instance already exists, so a work item is in progress already. // We don't need to schedule a work item because we'll notice the new messages // when the previous work item completes. - foreach(var message in messages) + foreach (var message in messages) { if (!isReplaying) { @@ -185,12 +187,12 @@ void AddMessagesToSession(string instanceId, string originWorkItemId, IEnumerabl } messages = messages.Skip(forceNewExecution.Value); } - + // Create a new session this.Sessions[instanceId] = session = new Session() { SessionId = this.SequenceNumber++, - Batch = new List<(TaskMessage,string)>(), + Batch = new List<(TaskMessage, string)>(), BatchStartPosition = 0, DequeueCount = 1, ForceNewExecution = forceNewExecution.HasValue, @@ -208,9 +210,11 @@ void AddMessagesToSession(string instanceId, string originWorkItemId, IEnumerabl if (!isReplaying) // we don't start work items until end of recovery { - new OrchestrationMessageBatch(instanceId, session, this.Partition, filingEvent); + new OrchestrationMessageBatch(instanceId, session, this.Partition, filingEvent, waitForPersistence: false); } } + + return session; } static int? FindLastExecutionStartedEvent(IEnumerable messages) @@ -331,6 +335,9 @@ public override void Process(BatchProcessed evt, EffectTracker effects) return; }; + // detect loopback messages, to guarantee that they act as a persistence barrier + bool containsLoopbackMessages = false; + if (!evt.NotExecutable) { @@ -354,7 +361,12 @@ public override void Process(BatchProcessed evt, EffectTracker effects) { foreach (var group in evt.LocalMessages.GroupBy(tm => tm.OrchestrationInstance.InstanceId)) { - this.AddMessagesToSession(group.Key, evt.WorkItemId, group, effects.IsReplaying, evt); + var targetSession = this.AddMessagesToSession(group.Key, evt.WorkItemId, group, effects.IsReplaying, evt); + + if (targetSession == session) + { + containsLoopbackMessages = true; + } } } @@ -370,22 +382,20 @@ public override void Process(BatchProcessed evt, EffectTracker effects) session.BatchStartPosition += evt.BatchLength; session.DequeueCount = 1; - this.StartNewBatchIfNeeded(session, effects, evt.InstanceId, effects.IsReplaying, evt); - } - - void StartNewBatchIfNeeded(Session session, EffectTracker effects, string instanceId, bool isReplaying, PartitionUpdateEvent filingEvent) - { + // start a new batch if needed if (session.Batch.Count == 0) { // no more pending messages for this instance, so we delete the session. - this.Sessions.Remove(instanceId); + this.Sessions.Remove(evt.InstanceId); } else { - if (!isReplaying) // we don't start work items until end of recovery + // there are more messages to process + + if (!effects.IsReplaying) // we don't start work items until end of recovery { - // there are more messages. Start another work item. - new OrchestrationMessageBatch(instanceId, session, this.Partition, filingEvent); + // submit a message batch for processing + new OrchestrationMessageBatch(evt.InstanceId, session, this.Partition, evt, waitForPersistence: containsLoopbackMessages); } } }