diff --git a/src/net/KEFCore/Storage/Internal/EntityTypeProducer.cs b/src/net/KEFCore/Storage/Internal/EntityTypeProducer.cs index b96fc614..eb1cafdc 100644 --- a/src/net/KEFCore/Storage/Internal/EntityTypeProducer.cs +++ b/src/net/KEFCore/Storage/Internal/EntityTypeProducer.cs @@ -283,6 +283,8 @@ public void Dispose() _kafkaProducer?.Dispose(); _streamData?.Dispose(); } + _keySerdes?.Dispose(); + _valueSerdes?.Dispose(); } /// public IEnumerable ValueBuffers diff --git a/src/net/KEFCore/Storage/Internal/KNetStreamsRetriever.cs b/src/net/KEFCore/Storage/Internal/KNetStreamsRetriever.cs index b85ef41e..fcfeab6a 100644 --- a/src/net/KEFCore/Storage/Internal/KNetStreamsRetriever.cs +++ b/src/net/KEFCore/Storage/Internal/KNetStreamsRetriever.cs @@ -264,6 +264,9 @@ public IEnumerable GetValueBuffers() } class KafkaEnumberable : IEnumerable +#if NET8_0_OR_GREATER + , IAsyncEnumerable +#endif { private readonly bool _useEnumeratorWithPrefetch; private readonly IKafkaCluster _kafkaCluster; @@ -288,23 +291,39 @@ public IEnumerator GetEnumerator() #if DEBUG_PERFORMANCE Infrastructure.KafkaDbContext.ReportString($"Requesting KafkaEnumerator for {_entityType.Name} on {DateTime.Now:HH:mm:ss.FFFFFFF}"); #endif - return new KafkaEnumerator(_kafkaCluster, _entityType, _keyValueStore?.All, _useEnumeratorWithPrefetch); + return new KafkaEnumerator(_kafkaCluster, _entityType, _keyValueStore?.All, _useEnumeratorWithPrefetch, false); } System.Collections.IEnumerator System.Collections.IEnumerable.GetEnumerator() { return GetEnumerator(); } + +#if NET8_0_OR_GREATER + public IAsyncEnumerator GetAsyncEnumerator(CancellationToken cancellationToken = default) + { + if (_resultException != null) throw _resultException; +#if DEBUG_PERFORMANCE + Infrastructure.KafkaDbContext.ReportString($"Requesting async KafkaEnumerator for {_entityType.Name} on {DateTime.Now:HH:mm:ss.FFFFFFF}"); +#endif + return new KafkaEnumerator(_kafkaCluster, _entityType, _keyValueStore?.All, _useEnumeratorWithPrefetch, true); + } +#endif } class KafkaEnumerator : IEnumerator +#if NET8_0_OR_GREATER + , IAsyncEnumerator +#endif { private readonly bool _useEnumeratorWithPrefetch; private readonly IKafkaCluster _kafkaCluster; private readonly IEntityType _entityType; private readonly KNetKeyValueIterator? _keyValueIterator = null; private readonly IEnumerator>? _enumerator = null; - +#if NET8_0_OR_GREATER + private readonly IAsyncEnumerator>? _asyncEnumerator = null; +#endif #if DEBUG_PERFORMANCE Stopwatch _moveNextSw = new Stopwatch(); Stopwatch _currentSw = new Stopwatch(); @@ -313,13 +332,16 @@ class KafkaEnumerator : IEnumerator Stopwatch _valueBufferSw = new Stopwatch(); #endif - public KafkaEnumerator(IKafkaCluster kafkaCluster, IEntityType entityType, KNetKeyValueIterator? keyValueIterator, bool useEnumerator) + public KafkaEnumerator(IKafkaCluster kafkaCluster, IEntityType entityType, KNetKeyValueIterator? keyValueIterator, bool useEnumerator, bool isAsync) { _kafkaCluster = kafkaCluster ?? throw new ArgumentNullException(nameof(kafkaCluster)); _entityType = entityType; _keyValueIterator = keyValueIterator ?? throw new ArgumentNullException(nameof(keyValueIterator)); _useEnumeratorWithPrefetch = useEnumerator; - if (_useEnumeratorWithPrefetch) _enumerator = _keyValueIterator.ToIEnumerator(); + if (_useEnumeratorWithPrefetch && !isAsync) _enumerator = _keyValueIterator.ToIEnumerator(); +#if NET8_0_OR_GREATER + if (isAsync) _asyncEnumerator = _keyValueIterator.GetAsyncEnumerator(); +#endif } ValueBuffer? _current = null; @@ -354,10 +376,19 @@ public void Dispose() _enumerator?.Dispose(); } +#if NET8_0_OR_GREATER + public ValueTask DisposeAsync() + { #if DEBUG_PERFORMANCE - int _cycles = 0; + Infrastructure.KafkaDbContext.ReportString($"KafkaEnumerator _moveNextSw: {_moveNextSw.Elapsed} _currentSw: {_currentSw.Elapsed} _valueGetSw: {_valueGetSw.Elapsed} _valueGet2Sw: {_valueGet2Sw.Elapsed} _valueBufferSw: {_valueBufferSw.Elapsed}"); +#endif + return _asyncEnumerator.DisposeAsync(); + } #endif +#if DEBUG_PERFORMANCE + int _cycles = 0; +#endif public bool MoveNext() { #if DEBUG_PERFORMANCE @@ -405,6 +436,57 @@ public bool MoveNext() #endif } +#if NET8_0_OR_GREATER + public ValueTask MoveNextAsync() + { +#if DEBUG_PERFORMANCE + try + { + _moveNextSw.Start(); +#endif + ValueTask hasNext = _asyncEnumerator.MoveNextAsync(); + hasNext.AsTask().Wait(); + if (hasNext.Result) + { +#if DEBUG_PERFORMANCE + _cycles++; + _valueGetSw.Start(); +#endif + KNetKeyValue kv = _asyncEnumerator.Current; +#if DEBUG_PERFORMANCE + _valueGetSw.Stop(); + _valueGet2Sw.Start(); +#endif + TValue value = kv.Value; +#if DEBUG_PERFORMANCE + _valueGet2Sw.Stop(); + _valueBufferSw.Start(); +#endif + object[] array = null!; + value?.GetData(_entityType, ref array); +#if DEBUG_PERFORMANCE + _valueBufferSw.Stop(); +#endif + _current = new ValueBuffer(array); + + return ValueTask.FromResult(true); + } + _current = null; + return ValueTask.FromResult(false); +#if DEBUG_PERFORMANCE + } + finally + { + _moveNextSw.Stop(); + if (_cycles == 0) + { + throw new InvalidOperationException($"KafkaEnumerator - No data returned from {_keyValueIterator}"); + } + } +#endif + } +#endif + public void Reset() { throw new NotSupportedException(CoreStrings.EnumerableResetNotSupported);