Fix race condition in OutboxState #250

Merged: 2 commits, Apr 20, 2023
28 changes: 19 additions & 9 deletions src/DurableTask.Netherite/PartitionState/OutboxState.cs
@@ -99,20 +99,30 @@ public void ConfirmDurable(Event evt)
         void Send(Batch batch)
         {
             batch.ReadyToSendTimestamp = this.Partition.CurrentTimeMs;
-            var outMessages = batch.OutgoingMessages.Count < 2 ? batch.OutgoingMessages : batch.OutgoingMessages.ToList();// prevent concurrent mod
-            batch.TotalAcksExpected = batch.OutgoingResponses.Count + outMessages.Count;
Comment on lines -102 to -103 (Member):

Hmm, what was this concurrent mod about? I'm not sure how this count check relates to concurrent modifications.

Member Author:

This logic was based on the assumption that concurrent modification is only a problem if the list is larger than one element. That assumption was wrong. So we now always make a copy of the list.

-            this.Partition.EventDetailTracer?.TraceEventProcessingDetail($"Outbox is sending {batch.OutgoingResponses.Count} responses, {outMessages.Count} messages for event id={batch.SendingEventId}");
+            batch.TotalAcksExpected = batch.OutgoingResponses.Count + batch.OutgoingMessages.Count;
+            this.Partition.EventDetailTracer?.TraceEventProcessingDetail($"Outbox is sending {batch.OutgoingResponses.Count} responses, {batch.OutgoingMessages.Count} messages for event id={batch.SendingEventId}");
 
             // now that we know the sending event is persisted, we can send the messages
-            foreach (var outresponse in batch.OutgoingResponses)
+            // we need to make a copy of the collections to safely iterate, because they are modified after the
+            // send is confirmed, which can happen before we finish the iteration.
+
+            if (batch.OutgoingResponses.Count > 0)
             {
-                DurabilityListeners.Register(outresponse, batch);
-                this.Partition.Send(outresponse);
+                var outgoingResponses = batch.OutgoingResponses.ToArray(); //copy for safe iteration
+                foreach (var outresponse in outgoingResponses )
+                {
+                    DurabilityListeners.Register(outresponse, batch);
+                    this.Partition.Send(outresponse);
+                }
             }
-            foreach (var outmessage in outMessages)
+            if (batch.OutgoingMessages.Count > 0)
             {
-                DurabilityListeners.Register(outmessage, batch);
-                this.Partition.Send(outmessage);
+                var outgoingMessages = batch.OutgoingMessages.ToArray(); // copy for safe iteration
+                foreach (var outmessage in outgoingMessages)
+                {
+                    DurabilityListeners.Register(outmessage, batch);
+                    this.Partition.Send(outmessage);
+                }
             }
         }
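
The review thread says the old `Count < 2` shortcut was unsafe: a collection can change while it is being iterated even when it holds a single element. The following is a minimal sketch of that failure mode and of the copy-before-iterate pattern. It is not Netherite code. `SnapshotIterationSketch`, `SendDirect`, and `SendFromSnapshot` are hypothetical names, the collection is assumed to be a `List<string>`, and the "confirmation" is simulated by a callback that mutates the list on the same thread rather than by a real concurrent confirmation.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for illustration only; not the Netherite Batch or OutboxState types.
static class SnapshotIterationSketch
{
    // Iterates the live list. If 'send' (or anything it triggers) modifies
    // 'outgoing', the enumerator detects the change on its next MoveNext call
    // and throws InvalidOperationException, even for a one-element list.
    public static int SendDirect(List<string> outgoing, Action<string> send)
    {
        int sent = 0;
        foreach (var message in outgoing)
        {
            send(message);
            sent++;
        }
        return sent;
    }

    // Copies the list before the first send, then iterates the copy.
    // Later changes to 'outgoing' cannot affect the loop.
    public static int SendFromSnapshot(List<string> outgoing, Action<string> send)
    {
        if (outgoing.Count == 0)
        {
            return 0; // nothing to send, so no copy is needed
        }

        string[] snapshot = outgoing.ToArray(); // copy for safe iteration
        int sent = 0;
        foreach (var message in snapshot)
        {
            send(message);
            sent++;
        }
        return sent;
    }
}
```

In this sketch, calling `SendDirect(list, m => list.Remove(m))` on a list with one element still throws, because the loop asks the enumerator for a next element after the list has changed. `SendFromSnapshot` with the same callback completes and leaves the list empty. The copy is taken before any send is issued, which matters if confirmations can only arrive after a send.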
