Skip to content

Commit

Permalink
Fix race condition in OutboxState (#250)
Browse files Browse the repository at this point in the history
* fix race condition in OutboxState

* Apply suggestions from code review

Co-authored-by: David Justo <[email protected]>

---------

Co-authored-by: David Justo <[email protected]>
  • Loading branch information
sebastianburckhardt and davidmrdavid authored Apr 20, 2023
1 parent 6090def commit 67255a9
Showing 1 changed file with 19 additions and 9 deletions.
28 changes: 19 additions & 9 deletions src/DurableTask.Netherite/PartitionState/OutboxState.cs
Original file line number Diff line number Diff line change
Expand Up @@ -99,20 +99,30 @@ public void ConfirmDurable(Event evt)
void Send(Batch batch)
{
batch.ReadyToSendTimestamp = this.Partition.CurrentTimeMs;
var outMessages = batch.OutgoingMessages.Count < 2 ? batch.OutgoingMessages : batch.OutgoingMessages.ToList();// prevent concurrent mod
batch.TotalAcksExpected = batch.OutgoingResponses.Count + outMessages.Count;
this.Partition.EventDetailTracer?.TraceEventProcessingDetail($"Outbox is sending {batch.OutgoingResponses.Count} responses, {outMessages.Count} messages for event id={batch.SendingEventId}");
batch.TotalAcksExpected = batch.OutgoingResponses.Count + batch.OutgoingMessages.Count;
this.Partition.EventDetailTracer?.TraceEventProcessingDetail($"Outbox is sending {batch.OutgoingResponses.Count} responses, {batch.OutgoingMessages.Count} messages for event id={batch.SendingEventId}");

// now that we know the sending event is persisted, we can send the messages
foreach (var outresponse in batch.OutgoingResponses)
// we need to make a copy of the collections to safely iterate, because they are modified after the
// send is confirmed, which can happen before we finish the iteration.

if (batch.OutgoingResponses.Count > 0)
{
DurabilityListeners.Register(outresponse, batch);
this.Partition.Send(outresponse);
var outgoingResponses = batch.OutgoingResponses.ToArray(); //copy for safe iteration
foreach (var outresponse in outgoingResponses )
{
DurabilityListeners.Register(outresponse, batch);
this.Partition.Send(outresponse);
}
}
foreach (var outmessage in outMessages)
if (batch.OutgoingMessages.Count > 0)
{
DurabilityListeners.Register(outmessage, batch);
this.Partition.Send(outmessage);
var outgoingMessages = batch.OutgoingMessages.ToArray(); // copy for safe iteration
foreach (var outmessage in outgoingMessages)
{
DurabilityListeners.Register(outmessage, batch);
this.Partition.Send(outmessage);
}
}
}

Expand Down

0 comments on commit 67255a9

Please sign in to comment.