diff --git a/src/DurableTask.Netherite/PartitionState/PrefetchState.cs b/src/DurableTask.Netherite/PartitionState/PrefetchState.cs index ec6038f6..b8bae815 100644 --- a/src/DurableTask.Netherite/PartitionState/PrefetchState.cs +++ b/src/DurableTask.Netherite/PartitionState/PrefetchState.cs @@ -74,13 +74,23 @@ public override void Process(WaitRequestReceived waitRequestEvent, EffectTracker void ProcessClientRequestEventWithPrefetch(ClientRequestEventWithPrefetch clientRequestEvent, EffectTracker effects) { if (clientRequestEvent.Phase == ClientRequestEventWithPrefetch.ProcessingPhase.Read) - { - this.Partition.Assert(!this.PendingPrefetches.ContainsKey(clientRequestEvent.EventIdString), "PendingPrefetches.ContainsKey(clientRequestEvent.EventIdString)"); - - // Issue a read request that fetches the instance state. - // We have to buffer this request in the pending list so we can recover it. + { + // It's possible for EventHubs to duplicate client-to-partition events. Therefore, we perform a best-effort + // de-duplication of EH messages. For more details, see: https://github.com/microsoft/durabletask-netherite/pull/379 + if (!this.PendingPrefetches.ContainsKey(clientRequestEvent.EventIdString)) + { + // Issue a read request that fetches the instance state. + // We buffer this request in the pending list so we can recover it, and can filter duplicates + // (as long as the duplicates appear soon after the original) - this.PendingPrefetches.Add(clientRequestEvent.EventIdString, clientRequestEvent); + this.PendingPrefetches.Add(clientRequestEvent.EventIdString, clientRequestEvent); + } + else + { + // this is a duplicate. Ignore it. + effects.EventTraceHelper?.TraceEventProcessingWarning($"Dropped duplicate client request {clientRequestEvent} id={clientRequestEvent.EventIdString}"); + return; + } } else {