diff --git a/src/net/KEFCore/Storage/Internal/EntityTypeProducer.cs b/src/net/KEFCore/Storage/Internal/EntityTypeProducer.cs index 2f1da8a7..7bef6792 100644 --- a/src/net/KEFCore/Storage/Internal/EntityTypeProducer.cs +++ b/src/net/KEFCore/Storage/Internal/EntityTypeProducer.cs @@ -290,27 +290,25 @@ public IEnumerable ValueBuffers { get { + if (_kafkaCompactedReplicator != null) return new KNetCompactedReplicatorEnumerable(_entityType, _kafkaCompactedReplicator); if (_streamData != null) return _streamData; - if (_kafkaCompactedReplicator == null) throw new InvalidOperationException("Missing _kafkaCompactedReplicator"); - return new KNetCompactedReplicatorEnumerable(_entityType, _kafkaCompactedReplicator); + throw new InvalidOperationException("Missing _kafkaCompactedReplicator or _streamData"); } } private void KafkaCompactedReplicator_OnRemoteUpdate(IKNetCompactedReplicator arg1, KeyValuePair arg2) { - try + Task.Factory.StartNew(() => { _onChangeEvent?.Invoke(new EntityTypeChanged(_entityType, EntityTypeChanged.ChangeKindType.Upserted, arg2.Key)); - } - catch { } + }); } private void KafkaCompactedReplicator_OnRemoteRemove(IKNetCompactedReplicator arg1, KeyValuePair arg2) { - try + Task.Factory.StartNew(() => { _onChangeEvent?.Invoke(new EntityTypeChanged(_entityType, EntityTypeChanged.ChangeKindType.Removed, arg2.Key)); - } - catch { } + }); } }