Skip to content

Commit

Permalink
Added IAsyncEnumerable over KNetStreamsRetriever
Browse files Browse the repository at this point in the history
  • Loading branch information
masesdevelopers committed Jan 21, 2024
1 parent 1923474 commit 4c47622
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 5 deletions.
2 changes: 2 additions & 0 deletions src/net/KEFCore/Storage/Internal/EntityTypeProducer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,8 @@ public void Dispose()
_kafkaProducer?.Dispose();
_streamData?.Dispose();
}
_keySerdes?.Dispose();
_valueSerdes?.Dispose();
}
/// <inheritdoc/>
public IEnumerable<ValueBuffer> ValueBuffers
Expand Down
92 changes: 87 additions & 5 deletions src/net/KEFCore/Storage/Internal/KNetStreamsRetriever.cs
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,9 @@ public IEnumerable<ValueBuffer> GetValueBuffers()
}

class KafkaEnumberable : IEnumerable<ValueBuffer>
#if NET8_0_OR_GREATER
, IAsyncEnumerable<ValueBuffer>
#endif
{
private readonly bool _useEnumeratorWithPrefetch;
private readonly IKafkaCluster _kafkaCluster;
Expand All @@ -288,23 +291,39 @@ public IEnumerator<ValueBuffer> GetEnumerator()
#if DEBUG_PERFORMANCE
Infrastructure.KafkaDbContext.ReportString($"Requesting KafkaEnumerator for {_entityType.Name} on {DateTime.Now:HH:mm:ss.FFFFFFF}");
#endif
return new KafkaEnumerator(_kafkaCluster, _entityType, _keyValueStore?.All, _useEnumeratorWithPrefetch);
return new KafkaEnumerator(_kafkaCluster, _entityType, _keyValueStore?.All, _useEnumeratorWithPrefetch, false);
}

System.Collections.IEnumerator System.Collections.IEnumerable.GetEnumerator()
{
return GetEnumerator();
}

#if NET8_0_OR_GREATER
public IAsyncEnumerator<ValueBuffer> GetAsyncEnumerator(CancellationToken cancellationToken = default)
{
if (_resultException != null) throw _resultException;
#if DEBUG_PERFORMANCE
Infrastructure.KafkaDbContext.ReportString($"Requesting async KafkaEnumerator for {_entityType.Name} on {DateTime.Now:HH:mm:ss.FFFFFFF}");
#endif
return new KafkaEnumerator(_kafkaCluster, _entityType, _keyValueStore?.All, _useEnumeratorWithPrefetch, true);
}
#endif
}

class KafkaEnumerator : IEnumerator<ValueBuffer>
#if NET8_0_OR_GREATER
, IAsyncEnumerator<ValueBuffer>
#endif
{
private readonly bool _useEnumeratorWithPrefetch;
private readonly IKafkaCluster _kafkaCluster;
private readonly IEntityType _entityType;
private readonly KNetKeyValueIterator<TKey, TValue>? _keyValueIterator = null;
private readonly IEnumerator<KNetKeyValue<TKey, TValue>>? _enumerator = null;

#if NET8_0_OR_GREATER
private readonly IAsyncEnumerator<KNetKeyValue<TKey, TValue>>? _asyncEnumerator = null;
#endif
#if DEBUG_PERFORMANCE
Stopwatch _moveNextSw = new Stopwatch();
Stopwatch _currentSw = new Stopwatch();
Expand All @@ -313,13 +332,16 @@ class KafkaEnumerator : IEnumerator<ValueBuffer>
Stopwatch _valueBufferSw = new Stopwatch();
#endif

public KafkaEnumerator(IKafkaCluster kafkaCluster, IEntityType entityType, KNetKeyValueIterator<TKey, TValue>? keyValueIterator, bool useEnumerator)
public KafkaEnumerator(IKafkaCluster kafkaCluster, IEntityType entityType, KNetKeyValueIterator<TKey, TValue>? keyValueIterator, bool useEnumerator, bool isAsync)
{
_kafkaCluster = kafkaCluster ?? throw new ArgumentNullException(nameof(kafkaCluster));
_entityType = entityType;
_keyValueIterator = keyValueIterator ?? throw new ArgumentNullException(nameof(keyValueIterator));
_useEnumeratorWithPrefetch = useEnumerator;
if (_useEnumeratorWithPrefetch) _enumerator = _keyValueIterator.ToIEnumerator();
if (_useEnumeratorWithPrefetch && !isAsync) _enumerator = _keyValueIterator.ToIEnumerator();
#if NET8_0_OR_GREATER
if (isAsync) _asyncEnumerator = _keyValueIterator.GetAsyncEnumerator();
#endif
}

ValueBuffer? _current = null;
Expand Down Expand Up @@ -354,10 +376,19 @@ public void Dispose()
_enumerator?.Dispose();
}

#if NET8_0_OR_GREATER
public ValueTask DisposeAsync()
{
#if DEBUG_PERFORMANCE
int _cycles = 0;
Infrastructure.KafkaDbContext.ReportString($"KafkaEnumerator _moveNextSw: {_moveNextSw.Elapsed} _currentSw: {_currentSw.Elapsed} _valueGetSw: {_valueGetSw.Elapsed} _valueGet2Sw: {_valueGet2Sw.Elapsed} _valueBufferSw: {_valueBufferSw.Elapsed}");
#endif
return _asyncEnumerator.DisposeAsync();
}
#endif

#if DEBUG_PERFORMANCE
int _cycles = 0;
#endif
public bool MoveNext()
{
#if DEBUG_PERFORMANCE
Expand Down Expand Up @@ -405,6 +436,57 @@ public bool MoveNext()
#endif
}

#if NET8_0_OR_GREATER
public ValueTask<bool> MoveNextAsync()
{
#if DEBUG_PERFORMANCE
try
{
_moveNextSw.Start();
#endif
ValueTask<bool> hasNext = _asyncEnumerator.MoveNextAsync();
hasNext.AsTask().Wait();
if (hasNext.Result)
{
#if DEBUG_PERFORMANCE
_cycles++;
_valueGetSw.Start();
#endif
KNetKeyValue<TKey, TValue> kv = _asyncEnumerator.Current;
#if DEBUG_PERFORMANCE
_valueGetSw.Stop();
_valueGet2Sw.Start();
#endif
TValue value = kv.Value;
#if DEBUG_PERFORMANCE
_valueGet2Sw.Stop();
_valueBufferSw.Start();
#endif
object[] array = null!;
value?.GetData(_entityType, ref array);
#if DEBUG_PERFORMANCE
_valueBufferSw.Stop();
#endif
_current = new ValueBuffer(array);

return ValueTask.FromResult(true);
}
_current = null;
return ValueTask.FromResult(false);
#if DEBUG_PERFORMANCE
}
finally
{
_moveNextSw.Stop();
if (_cycles == 0)
{
throw new InvalidOperationException($"KafkaEnumerator - No data returned from {_keyValueIterator}");
}
}
#endif
}
#endif

public void Reset()
{
throw new NotSupportedException(CoreStrings.EnumerableResetNotSupported);
Expand Down

0 comments on commit 4c47622

Please sign in to comment.