Skip to content

Commit

Permalink
Make change event async (#132)
Browse files Browse the repository at this point in the history
  • Loading branch information
masesdevelopers authored Oct 22, 2023
1 parent 4418b8b commit 04ab116
Showing 1 changed file with 6 additions and 8 deletions.
14 changes: 6 additions & 8 deletions src/net/KEFCore/Storage/Internal/EntityTypeProducer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -290,27 +290,25 @@ public IEnumerable<ValueBuffer> ValueBuffers
{
get
{
if (_kafkaCompactedReplicator != null) return new KNetCompactedReplicatorEnumerable(_entityType, _kafkaCompactedReplicator);
if (_streamData != null) return _streamData;
if (_kafkaCompactedReplicator == null) throw new InvalidOperationException("Missing _kafkaCompactedReplicator");
return new KNetCompactedReplicatorEnumerable(_entityType, _kafkaCompactedReplicator);
throw new InvalidOperationException("Missing _kafkaCompactedReplicator or _streamData");
}
}

private void KafkaCompactedReplicator_OnRemoteUpdate(IKNetCompactedReplicator<TKey, TValueContainer> arg1, KeyValuePair<TKey, TValueContainer> arg2)
{
try
Task.Factory.StartNew(() =>
{
_onChangeEvent?.Invoke(new EntityTypeChanged(_entityType, EntityTypeChanged.ChangeKindType.Upserted, arg2.Key));
}
catch { }
});
}

private void KafkaCompactedReplicator_OnRemoteRemove(IKNetCompactedReplicator<TKey, TValueContainer> arg1, KeyValuePair<TKey, TValueContainer> arg2)
{
try
Task.Factory.StartNew(() =>
{
_onChangeEvent?.Invoke(new EntityTypeChanged(_entityType, EntityTypeChanged.ChangeKindType.Removed, arg2.Key));
}
catch { }
});
}
}

0 comments on commit 04ab116

Please sign in to comment.