Skip to content

Commit

Permalink
Deduplicate CreationRequestReceived events (#381)
Browse files Browse the repository at this point in the history
* always deduplicate CreationRequestReceived events

* Update src/DurableTask.Netherite/PartitionState/InstanceState.cs

Co-authored-by: David Justo <[email protected]>

---------

Co-authored-by: David Justo <[email protected]>
  • Loading branch information
sebastianburckhardt and davidmrdavid authored Apr 22, 2024
1 parent 7e52cbe commit 40a1f8c
Showing 1 changed file with 16 additions and 3 deletions.
19 changes: 16 additions & 3 deletions src/DurableTask.Netherite/PartitionState/InstanceState.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ class InstanceState : TrackedObject
[DataMember]
public List<WaitRequestReceived> Waiters { get; set; }

[DataMember]
public string CreationRequestEventId { get; set; }

[IgnoreDataMember]
public override TrackedObjectKey Key => new TrackedObjectKey(TrackedObjectKey.TrackedObjectType.Instance, this.InstanceId);

Expand All @@ -39,12 +42,19 @@ public override string ToString()

public override void Process(CreationRequestReceived creationRequestReceived, EffectTracker effects)
{
if (creationRequestReceived.EventIdString == this.CreationRequestEventId)
{
// we have already processed this event - it must be a duplicate delivery. Ignore it.
return;
};

bool exists = this.OrchestrationState != null;
bool filterDuplicate = exists

bool previousExecutionWithDedupeStatus = exists
&& creationRequestReceived.DedupeStatuses != null
&& creationRequestReceived.DedupeStatuses.Contains(this.OrchestrationState.OrchestrationStatus);

if (!filterDuplicate)
if (!previousExecutionWithDedupeStatus)
{
var ee = creationRequestReceived.ExecutionStartedEvent;

Expand All @@ -65,6 +75,7 @@ public override void Process(CreationRequestReceived creationRequestReceived, Ef
ScheduledStartTime = ee.ScheduledStartTime
};
this.OrchestrationStateSize = DurableTask.Netherite.SizeUtils.GetEstimatedSize(this.OrchestrationState);
this.CreationRequestEventId = creationRequestReceived.EventIdString;

// queue the message in the session, or start a timer if delayed
if (!ee.ScheduledStartTime.HasValue)
Expand All @@ -87,7 +98,7 @@ public override void Process(CreationRequestReceived creationRequestReceived, Ef
{
ClientId = creationRequestReceived.ClientId,
RequestId = creationRequestReceived.RequestId,
Succeeded = !filterDuplicate,
Succeeded = !previousExecutionWithDedupeStatus,
ExistingInstanceOrchestrationStatus = this.OrchestrationState?.OrchestrationStatus,
};
}
Expand Down Expand Up @@ -211,6 +222,8 @@ void DeleteState(EffectTracker effects)
effects.AddDeletion(TrackedObjectKey.History(this.InstanceId));

this.OrchestrationState = null;
this.OrchestrationStateSize = 0;
this.CreationRequestEventId = null;
this.Waiters = null;
}

Expand Down

0 comments on commit 40a1f8c

Please sign in to comment.