From eefbedeec7cb3f7a71061e551abefec1de61cf11 Mon Sep 17 00:00:00 2001 From: sebastianburckhardt Date: Wed, 12 Apr 2023 15:52:50 -0700 Subject: [PATCH 1/2] fix race condition in OutboxState --- .../PartitionState/OutboxState.cs | 26 ++++++++++++------- 1 file changed, 17 insertions(+), 9 deletions(-) diff --git a/src/DurableTask.Netherite/PartitionState/OutboxState.cs b/src/DurableTask.Netherite/PartitionState/OutboxState.cs index fd8095a8..7661a4eb 100644 --- a/src/DurableTask.Netherite/PartitionState/OutboxState.cs +++ b/src/DurableTask.Netherite/PartitionState/OutboxState.cs @@ -99,20 +99,28 @@ public void ConfirmDurable(Event evt) void Send(Batch batch) { batch.ReadyToSendTimestamp = this.Partition.CurrentTimeMs; - var outMessages = batch.OutgoingMessages.Count < 2 ? batch.OutgoingMessages : batch.OutgoingMessages.ToList();// prevent concurrent mod - batch.TotalAcksExpected = batch.OutgoingResponses.Count + outMessages.Count; - this.Partition.EventDetailTracer?.TraceEventProcessingDetail($"Outbox is sending {batch.OutgoingResponses.Count} responses, {outMessages.Count} messages for event id={batch.SendingEventId}"); + batch.TotalAcksExpected = batch.OutgoingResponses.Count + batch.OutgoingMessages.Count; + this.Partition.EventDetailTracer?.TraceEventProcessingDetail($"Outbox is sending {batch.OutgoingResponses.Count} responses, {batch.OutgoingMessages.Count} messages for event id={batch.SendingEventId}"); // now that we know the sending event is persisted, we can send the messages - foreach (var outresponse in batch.OutgoingResponses) + // we need to make a copy of the collections to safely iterate, because they are modified after the + // send is confirmed, which can happen before we finish the iteration. + + if (batch.OutgoingResponses.Count > 0) { - DurabilityListeners.Register(outresponse, batch); - this.Partition.Send(outresponse); + foreach (var outresponse in batch.OutgoingResponses.ToArray()) + { + DurabilityListeners.Register(outresponse, batch); + this.Partition.Send(outresponse); + } } - foreach (var outmessage in outMessages) + if (batch.OutgoingMessages.Count > 0) { - DurabilityListeners.Register(outmessage, batch); - this.Partition.Send(outmessage); + foreach (var outmessage in batch.OutgoingMessages.ToArray()) + { + DurabilityListeners.Register(outmessage, batch); + this.Partition.Send(outmessage); + } } } From a21f646940ec6bb31f0b2ea519d6c643d63d2d97 Mon Sep 17 00:00:00 2001 From: Sebastian Burckhardt Date: Mon, 17 Apr 2023 10:10:34 -0700 Subject: [PATCH 2/2] Apply suggestions from code review Co-authored-by: David Justo --- src/DurableTask.Netherite/PartitionState/OutboxState.cs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/DurableTask.Netherite/PartitionState/OutboxState.cs b/src/DurableTask.Netherite/PartitionState/OutboxState.cs index 7661a4eb..b60cb879 100644 --- a/src/DurableTask.Netherite/PartitionState/OutboxState.cs +++ b/src/DurableTask.Netherite/PartitionState/OutboxState.cs @@ -108,7 +108,8 @@ void Send(Batch batch) if (batch.OutgoingResponses.Count > 0) { - foreach (var outresponse in batch.OutgoingResponses.ToArray()) + var outgoingResponses = batch.OutgoingResponses.ToArray(); //copy for safe iteration + foreach (var outresponse in outgoingResponses ) { DurabilityListeners.Register(outresponse, batch); this.Partition.Send(outresponse); @@ -116,7 +117,8 @@ void Send(Batch batch) } if (batch.OutgoingMessages.Count > 0) { - foreach (var outmessage in batch.OutgoingMessages.ToArray()) + var outgoingMessages = batch.OutgoingMessages.ToArray(); // copy for safe iteration + foreach (var outmessage in outgoingMessages) { DurabilityListeners.Register(outmessage, batch); this.Partition.Send(outmessage);