Skip to content

Commit

Permalink
fix deduplication of fragmented partition and client events (#106)
Browse files Browse the repository at this point in the history
  • Loading branch information
sebastianburckhardt authored Jan 12, 2022
1 parent 4e2b24f commit 9ec62a7
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 3 deletions.
8 changes: 7 additions & 1 deletion src/DurableTask.Netherite/OrchestrationService/Client.cs
Original file line number Diff line number Diff line change
Expand Up @@ -116,10 +116,16 @@ public void Process(ClientEvent clientEvent)

if (!fragment.IsLast)
{
if (!this.Fragments.TryGetValue(originalEventString, out var stream))
MemoryStream stream;

if (fragment.Fragment == 0)
{
this.Fragments[originalEventString] = stream = new MemoryStream();
}
else
{
stream = this.Fragments[originalEventString];
}
stream.Write(fragment.Bytes, 0, fragment.Bytes.Length);
}
else
Expand Down
9 changes: 8 additions & 1 deletion src/DurableTask.Netherite/PartitionState/ReassemblyState.cs
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,17 @@ public override void Process(PartitionEventFragment evt, EffectTracker effects)
}
else
{
if (!this.Fragments.TryGetValue(originalEventString, out var list))
List<PartitionEventFragment> list;

if (evt.Fragment == 0)
{
this.Fragments[originalEventString] = list = new List<PartitionEventFragment>();
}
else
{
list = this.Fragments[originalEventString];
}

list.Add(evt);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ async Task SendBatch(int lastPosition)
if (tooBig)
{
// the message is too big. Break it into fragments, and send each individually.
this.traceHelper.LogTrace("EventHubsSender {eventHubName}/{eventHubPartitionId} fragmenting large event ({size} bytes) id={eventId}", this.eventHubName, this.eventHubPartition, length, evt.EventIdString);
this.traceHelper.LogDebug("EventHubsSender {eventHubName}/{eventHubPartitionId} fragmenting large event ({size} bytes) id={eventId}", this.eventHubName, this.eventHubPartition, length, evt.EventIdString);
var fragments = FragmentationAndReassembly.Fragment(arraySegment, evt, maxFragmentSize);
maybeSent = i;
for (int k = 0; k < fragments.Count; k++)
Expand Down

0 comments on commit 9ec62a7

Please sign in to comment.