From 97d78a0d811da10ae1057900999fd56e5547ad51 Mon Sep 17 00:00:00 2001 From: MASES Public Developers Team <94312179+masesdevelopers@users.noreply.github.com> Date: Sun, 21 Jan 2024 19:49:19 +0100 Subject: [PATCH] Iterator classes becomes IEnumerable and IAsyncEnumerable aware (#366) * Iterator classes becomes IEnumerable and IAsyncEnumerable aware * Update documentation --- src/documentation/articles/streamsSDK.md | 31 ++++++ .../Specific/Streams/State/CommonIterator.cs | 86 +++++++++++++++++ .../Streams/State/KNetKeyValueIterator.cs | 95 ++++++++++++------- .../State/KNetTimestampedKeyValueIterator.cs | 87 +++++++++++------ ...KNetTimestampedWindowedKeyValueIterator.cs | 81 +++++++++++----- .../State/KNetWindowedKeyValueIterator.cs | 83 ++++++++++------ 6 files changed, 348 insertions(+), 115 deletions(-) create mode 100644 src/net/KNet/Specific/Streams/State/CommonIterator.cs diff --git a/src/documentation/articles/streamsSDK.md b/src/documentation/articles/streamsSDK.md index ce128ebcfd..2cb8fa5923 100644 --- a/src/documentation/articles/streamsSDK.md +++ b/src/documentation/articles/streamsSDK.md @@ -192,3 +192,34 @@ void longFunction(int key, TestType value) > [!WARNING] > This feature uses an external thread and cannot be stopped; upon executed `ToIEnumerator` function, the thread starts and continues till the end of the available data. +The previous point can be mitigated using the `foreach` statement since iterators implements both `IEnumerable` and `IAsyncEnumerable`: + +```C# +foreach (KNetKeyValue kv in keyValueIterator) +{ + if (kv.Key == 100) break; // when iteration breaks, keyValueIterator is Disposed and the external thread exit + longFunction(kv.Key, kv.Value); // key and value are already ready before invocation of longFunction +} + +void longFunction(int key, TestType value) +{ + // long work here +} + +``` + +or + +```C# +await foreach (KNetKeyValue kv in keyValueIterator) +{ + if (kv.Key == 100) break; // when iteration breaks, keyValueIterator is Disposed and the external thread exit + longFunction(kv.Key, kv.Value); // key and value are already ready before invocation of longFunction +} + +void longFunction(int key, TestType value) +{ + // long work here +} + +``` diff --git a/src/net/KNet/Specific/Streams/State/CommonIterator.cs b/src/net/KNet/Specific/Streams/State/CommonIterator.cs new file mode 100644 index 0000000000..61f044c149 --- /dev/null +++ b/src/net/KNet/Specific/Streams/State/CommonIterator.cs @@ -0,0 +1,86 @@ +/* +* Copyright 2024 MASES s.r.l. +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +* +* Refer to LICENSE for more information. +*/ + +using MASES.JCOBridge.C2JBridge; +using MASES.KNet.Serialization; +using System; +using System.Collections; +using System.Collections.Generic; +using System.Threading; + +namespace MASES.KNet.Streams.State +{ + class PrefetchableEnumeratorSettings : IEnumerableExtension + { + public PrefetchableEnumeratorSettings() + { + UsePrefetch = true; + UseThread = true; + } + public bool UsePrefetch { get; set; } + public bool UseThread { get; set; } + public IConverterBridge ConverterBridge { get; set; } + } + + /// + /// A common class for all iterators + /// + /// The return of and + public abstract class CommonIterator : IGenericSerDesFactoryApplier, IEnumerable, IAsyncEnumerable + { + /// + /// Initialize a new instance of + /// + /// The associated to this instance + public CommonIterator(IGenericSerDesFactory factory) + { + _factory = factory; + } + /// + /// The associated to this instance + /// + protected IGenericSerDesFactory _factory; + IGenericSerDesFactory IGenericSerDesFactoryApplier.Factory { get => _factory; set { _factory = value; } } + /// + /// Used to get or set the type of enumerator to retrieve, default is with prefetch if the platform accept it + /// + public bool UsePrefetch { get; set; } = true; + /// + public IEnumerator GetEnumerator() + { + return GetEnumerator(false) as IEnumerator; + } + + IEnumerator IEnumerable.GetEnumerator() + { + return GetEnumerator(); + } + /// + public IAsyncEnumerator GetAsyncEnumerator(CancellationToken cancellationToken = default) + { + return GetEnumerator(true, cancellationToken) as IAsyncEnumerator; + } + /// + /// Internally gets the or + /// + /// If requesting an + /// The to be used in + /// An or + protected abstract object GetEnumerator(bool isAsync, CancellationToken cancellationToken = default); + } +} diff --git a/src/net/KNet/Specific/Streams/State/KNetKeyValueIterator.cs b/src/net/KNet/Specific/Streams/State/KNetKeyValueIterator.cs index d3423d8805..9830a90438 100644 --- a/src/net/KNet/Specific/Streams/State/KNetKeyValueIterator.cs +++ b/src/net/KNet/Specific/Streams/State/KNetKeyValueIterator.cs @@ -21,6 +21,8 @@ using MASES.KNet.Serialization; using System; using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; namespace MASES.KNet.Streams.State { @@ -29,28 +31,19 @@ namespace MASES.KNet.Streams.State /// /// The key type /// The value type - public class KNetKeyValueIterator : IGenericSerDesFactoryApplier + public sealed class KNetKeyValueIterator : CommonIterator> { #if NET7_0_OR_GREATER class PrefetchableLocalEnumerator(bool isVersion2, IGenericSerDesFactory factory, IJavaObject obj, IKNetSerDes keySerDes, - IKNetSerDes valueSerDes) - : JVMBridgeBasePrefetchableEnumerator>(obj, new PrefetchableEnumeratorSettings()), IGenericSerDesFactoryApplier + IKNetSerDes valueSerDes, + bool isAsync, CancellationToken token = default) + : JVMBridgeBasePrefetchableEnumerator>(obj, new PrefetchableEnumeratorSettings()), + IGenericSerDesFactoryApplier, + IAsyncEnumerator> { - class PrefetchableEnumeratorSettings : IEnumerableExtension - { - public PrefetchableEnumeratorSettings() - { - UsePrefetch = true; - UseThread = true; - } - public bool UsePrefetch { get; set; } - public bool UseThread { get; set; } - public IConverterBridge ConverterBridge { get; set; } - } - readonly bool _isVersion2 = isVersion2; IGenericSerDesFactory _factory = factory; IGenericSerDesFactory IGenericSerDesFactoryApplier.Factory { get => _factory; set { _factory = value; } } @@ -68,12 +61,30 @@ protected override object ConvertObject(object input) } throw new InvalidCastException($"input is not a valid IJavaObject"); } + + protected override bool DoWorkCycle() + { + return isAsync ? !token.IsCancellationRequested : base.DoWorkCycle(); + } + + public KNetKeyValue Current => (this as IEnumerator>).Current; + + public ValueTask MoveNextAsync() + { + return new ValueTask(MoveNext()); + } + + public ValueTask DisposeAsync() + { + Dispose(); + return new ValueTask(); + } } #endif - class StandardLocalEnumerator : JVMBridgeBaseEnumerator>, IGenericSerDesFactoryApplier + class StandardLocalEnumerator : JVMBridgeBaseEnumerator>, IGenericSerDesFactoryApplier, IAsyncEnumerator> { - IKNetSerDes _keySerDes = null; - IKNetSerDes _valueSerDes = null; + readonly IKNetSerDes _keySerDes = null; + readonly IKNetSerDes _valueSerDes = null; readonly bool _isVersion2; IGenericSerDesFactory _factory; IGenericSerDesFactory IGenericSerDesFactoryApplier.Factory { get => _factory; set { _factory = value; } } @@ -108,26 +119,53 @@ protected override object ConvertObject(object input) } throw new InvalidCastException($"input is not a valid IJavaObject"); } + + public KNetKeyValue Current => (this as IEnumerator>).Current; + + public ValueTask MoveNextAsync() + { + return new ValueTask(MoveNext()); + } + + public ValueTask DisposeAsync() + { + Dispose(); + return new ValueTask(); + } } readonly Org.Apache.Kafka.Streams.State.KeyValueIterator _iterator; readonly Org.Apache.Kafka.Streams.State.KeyValueIterator _iterator2; IKNetSerDes _keySerDes = null; IKNetSerDes _valueSerDes = null; - IGenericSerDesFactory _factory; - IGenericSerDesFactory IGenericSerDesFactoryApplier.Factory { get => _factory; set { _factory = value; } } internal KNetKeyValueIterator(IGenericSerDesFactory factory, Org.Apache.Kafka.Streams.State.KeyValueIterator iterator) + : base(factory) { - _factory = factory; _iterator = iterator; } internal KNetKeyValueIterator(IGenericSerDesFactory factory, Org.Apache.Kafka.Streams.State.KeyValueIterator iterator) + : base(factory) { - _factory = factory; _iterator2 = iterator; } + /// + protected override object GetEnumerator(bool isAsync, CancellationToken cancellationToken = default) + { + _keySerDes ??= _factory.BuildKeySerDes(); + _valueSerDes ??= _factory.BuildValueSerDes(); +#if NET7_0_OR_GREATER + if (UsePrefetch) + { + return _iterator != null ? new PrefetchableLocalEnumerator(false, _factory, _iterator.BridgeInstance, _keySerDes, _valueSerDes, isAsync, cancellationToken) + : new PrefetchableLocalEnumerator(true, _factory, _iterator2.BridgeInstance, _keySerDes, _valueSerDes, isAsync, cancellationToken); + } +#endif + return _iterator != null ? new StandardLocalEnumerator(false, _factory, _iterator.BridgeInstance, _keySerDes, _valueSerDes) + : new StandardLocalEnumerator(true, _factory, _iterator2.BridgeInstance, _keySerDes, _valueSerDes); + } + /// /// KNet implementation of /// @@ -160,17 +198,8 @@ public void Remove() /// is not considered with .NET 6 and .NET Framework public IEnumerator> ToIEnumerator(bool usePrefetch = true) { - _keySerDes ??= _factory.BuildKeySerDes(); - _valueSerDes ??= _factory.BuildValueSerDes(); -#if NET7_0_OR_GREATER - if (usePrefetch) - { - return _iterator != null ? new PrefetchableLocalEnumerator(false, _factory, _iterator.BridgeInstance, _keySerDes, _valueSerDes) - : new PrefetchableLocalEnumerator(true, _factory, _iterator2.BridgeInstance, _keySerDes, _valueSerDes); - } -#endif - return _iterator != null ? new StandardLocalEnumerator(false, _factory, _iterator.BridgeInstance, _keySerDes, _valueSerDes) - : new StandardLocalEnumerator(true, _factory, _iterator2.BridgeInstance, _keySerDes, _valueSerDes); + UsePrefetch = usePrefetch; + return GetEnumerator(false) as IEnumerator>; } /// /// KNet implementation of diff --git a/src/net/KNet/Specific/Streams/State/KNetTimestampedKeyValueIterator.cs b/src/net/KNet/Specific/Streams/State/KNetTimestampedKeyValueIterator.cs index 68356c1381..dff782d7e5 100644 --- a/src/net/KNet/Specific/Streams/State/KNetTimestampedKeyValueIterator.cs +++ b/src/net/KNet/Specific/Streams/State/KNetTimestampedKeyValueIterator.cs @@ -21,6 +21,8 @@ using MASES.KNet.Serialization; using System; using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; namespace MASES.KNet.Streams.State { @@ -29,26 +31,18 @@ namespace MASES.KNet.Streams.State /// /// The key type /// The value type - public class KNetTimestampedKeyValueIterator : IGenericSerDesFactoryApplier + public class KNetTimestampedKeyValueIterator : CommonIterator> { #if NET7_0_OR_GREATER class PrefetchableLocalEnumerator(bool isVersion2, IGenericSerDesFactory factory, IJavaObject obj, - IKNetSerDes keySerDes) - : JVMBridgeBasePrefetchableEnumerator>(obj, new PrefetchableEnumeratorSettings()), IGenericSerDesFactoryApplier + IKNetSerDes keySerDes, + bool isAsync, CancellationToken token = default) + : JVMBridgeBasePrefetchableEnumerator>(obj, new PrefetchableEnumeratorSettings()), + IGenericSerDesFactoryApplier, + IAsyncEnumerator> { - class PrefetchableEnumeratorSettings : IEnumerableExtension - { - public PrefetchableEnumeratorSettings() - { - UsePrefetch = true; - UseThread = true; - } - public bool UsePrefetch { get; set; } - public bool UseThread { get; set; } - public IConverterBridge ConverterBridge { get; set; } - } readonly bool _isVersion2 = isVersion2; IGenericSerDesFactory _factory = factory; IGenericSerDesFactory IGenericSerDesFactoryApplier.Factory { get => _factory; set { _factory = value; } } @@ -66,9 +60,26 @@ protected override object ConvertObject(object input) } throw new InvalidCastException($"input is not a valid IJavaObject"); } + protected override bool DoWorkCycle() + { + return isAsync ? !token.IsCancellationRequested : base.DoWorkCycle(); + } + + public KNetTimestampedKeyValue Current => (this as IEnumerator>).Current; + + public ValueTask MoveNextAsync() + { + return new ValueTask(MoveNext()); + } + + public ValueTask DisposeAsync() + { + Dispose(); + return new ValueTask(); + } } #endif - class StandardLocalEnumerator : JVMBridgeBaseEnumerator>, IGenericSerDesFactoryApplier + class StandardLocalEnumerator : JVMBridgeBaseEnumerator>, IGenericSerDesFactoryApplier, IAsyncEnumerator> { IKNetSerDes _keySerDes = null; readonly bool _isVersion2; @@ -99,25 +110,51 @@ protected override object ConvertObject(object input) } throw new InvalidCastException($"input is not a valid IJavaObject"); } + + public KNetTimestampedKeyValue Current => (this as IEnumerator>).Current; + + public ValueTask MoveNextAsync() + { + return new ValueTask(MoveNext()); + } + + public ValueTask DisposeAsync() + { + Dispose(); + return new ValueTask(); + } } readonly Org.Apache.Kafka.Streams.State.KeyValueIterator> _iterator = null; readonly Org.Apache.Kafka.Streams.State.KeyValueIterator> _iterator2 = null; IKNetSerDes _keySerDes; - IGenericSerDesFactory _factory; - IGenericSerDesFactory IGenericSerDesFactoryApplier.Factory { get => _factory; set { _factory = value; } } internal KNetTimestampedKeyValueIterator(IGenericSerDesFactory factory, Org.Apache.Kafka.Streams.State.KeyValueIterator> iterator) + : base(factory) { - _factory = factory; _iterator = iterator; } internal KNetTimestampedKeyValueIterator(IGenericSerDesFactory factory, Org.Apache.Kafka.Streams.State.KeyValueIterator> iterator) + : base(factory) { - _factory = factory; _iterator2 = iterator; } + + /// + protected override object GetEnumerator(bool isAsync, CancellationToken cancellationToken = default) + { + _keySerDes ??= _factory.BuildKeySerDes(); +#if NET7_0_OR_GREATER + if (UsePrefetch) + { + return _iterator != null ? new PrefetchableLocalEnumerator(false, _factory, _iterator.BridgeInstance, _keySerDes, isAsync, cancellationToken) + : new PrefetchableLocalEnumerator(true, _factory, _iterator2.BridgeInstance, _keySerDes, isAsync, cancellationToken); + } +#endif + return _iterator != null ? new StandardLocalEnumerator(false, _factory, _iterator.BridgeInstance, _keySerDes) + : new StandardLocalEnumerator(true, _factory, _iterator2.BridgeInstance, _keySerDes); + } /// /// KNet implementation of /// @@ -148,16 +185,8 @@ public void Remove() /// is not considered with .NET 6 and .NET Framework public IEnumerator> ToIEnumerator(bool usePrefetch = true) { - _keySerDes ??= _factory.BuildKeySerDes(); -#if NET7_0_OR_GREATER - if (usePrefetch) - { - return _iterator != null ? new PrefetchableLocalEnumerator(false, _factory, _iterator.BridgeInstance, _keySerDes) - : new PrefetchableLocalEnumerator(true, _factory, _iterator2.BridgeInstance, _keySerDes); - } -#endif - return _iterator != null ? new StandardLocalEnumerator(false, _factory, _iterator.BridgeInstance, _keySerDes) - : new StandardLocalEnumerator(true, _factory, _iterator2.BridgeInstance, _keySerDes); + UsePrefetch = usePrefetch; + return GetEnumerator(false) as IEnumerator>; } /// /// KNet implementation of diff --git a/src/net/KNet/Specific/Streams/State/KNetTimestampedWindowedKeyValueIterator.cs b/src/net/KNet/Specific/Streams/State/KNetTimestampedWindowedKeyValueIterator.cs index 10968acbcc..f3f93c9c03 100644 --- a/src/net/KNet/Specific/Streams/State/KNetTimestampedWindowedKeyValueIterator.cs +++ b/src/net/KNet/Specific/Streams/State/KNetTimestampedWindowedKeyValueIterator.cs @@ -22,6 +22,8 @@ using MASES.KNet.Streams.Kstream; using System; using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; namespace MASES.KNet.Streams.State { @@ -30,25 +32,16 @@ namespace MASES.KNet.Streams.State /// /// The key type /// The value type - public class KNetTimestampedWindowedKeyValueIterator : IGenericSerDesFactoryApplier + public class KNetTimestampedWindowedKeyValueIterator : CommonIterator> { #if NET7_0_OR_GREATER class PrefetchableLocalEnumerator(IGenericSerDesFactory factory, - IJavaObject obj) - : JVMBridgeBasePrefetchableEnumerator>(obj, new PrefetchableEnumeratorSettings()), IGenericSerDesFactoryApplier + IJavaObject obj, + bool isAsync, CancellationToken token = default) + : JVMBridgeBasePrefetchableEnumerator>(obj, new PrefetchableEnumeratorSettings()), + IGenericSerDesFactoryApplier, + IAsyncEnumerator> { - class PrefetchableEnumeratorSettings : IEnumerableExtension - { - public PrefetchableEnumeratorSettings() - { - UsePrefetch = true; - UseThread = true; - } - public bool UsePrefetch { get; set; } - public bool UseThread { get; set; } - public IConverterBridge ConverterBridge { get; set; } - } - IGenericSerDesFactory _factory = factory; IGenericSerDesFactory IGenericSerDesFactoryApplier.Factory { get => _factory; set { _factory = value; } } @@ -60,10 +53,28 @@ protected override object ConvertObject(object input) } throw new InvalidCastException($"input is not a valid IJavaObject"); } + + protected override bool DoWorkCycle() + { + return isAsync ? !token.IsCancellationRequested : base.DoWorkCycle(); + } + + public KNetTimestampedWindowedKeyValue Current => (this as IEnumerator>).Current; + + public ValueTask MoveNextAsync() + { + return new ValueTask(MoveNext()); + } + + public ValueTask DisposeAsync() + { + Dispose(); + return new ValueTask(); + } } #endif - class StandardLocalEnumerator : JVMBridgeBaseEnumerator>, IGenericSerDesFactoryApplier + class StandardLocalEnumerator : JVMBridgeBaseEnumerator>, IGenericSerDesFactoryApplier, IAsyncEnumerator> { IGenericSerDesFactory _factory; IGenericSerDesFactory IGenericSerDesFactoryApplier.Factory { get => _factory; set { _factory = value; } } @@ -81,17 +92,40 @@ protected override object ConvertObject(object input) } throw new InvalidCastException($"input is not a valid IJavaObject"); } + + public KNetTimestampedWindowedKeyValue Current => (this as IEnumerator>).Current; + + public ValueTask MoveNextAsync() + { + return new ValueTask(MoveNext()); + } + + public ValueTask DisposeAsync() + { + Dispose(); + return new ValueTask(); + } } readonly Org.Apache.Kafka.Streams.State.KeyValueIterator, Org.Apache.Kafka.Streams.State.ValueAndTimestamp> _iterator; - IGenericSerDesFactory _factory; - IGenericSerDesFactory IGenericSerDesFactoryApplier.Factory { get => _factory; set { _factory = value; } } internal KNetTimestampedWindowedKeyValueIterator(IGenericSerDesFactory factory, Org.Apache.Kafka.Streams.State.KeyValueIterator, Org.Apache.Kafka.Streams.State.ValueAndTimestamp> iterator) + :base(factory) { - _factory = factory; _iterator = iterator; } + + /// + protected override object GetEnumerator(bool isAsync, CancellationToken cancellationToken = default) + { +#if NET7_0_OR_GREATER + if (UsePrefetch) + { + return new PrefetchableLocalEnumerator(_factory, _iterator.BridgeInstance, isAsync, cancellationToken); + } +#endif + return new StandardLocalEnumerator(_factory, _iterator.BridgeInstance); + } /// /// KNet implementation of /// @@ -118,13 +152,8 @@ public void Remove() /// is not considered with .NET 6 and .NET Framework public IEnumerator> ToIEnumerator(bool usePrefetch = true) { -#if NET7_0_OR_GREATER - if (usePrefetch) - { - return new PrefetchableLocalEnumerator(_factory, _iterator.BridgeInstance); - } -#endif - return new StandardLocalEnumerator(_factory, _iterator.BridgeInstance); + UsePrefetch = usePrefetch; + return GetEnumerator(false) as IEnumerator>; } /// /// KNet implementation of diff --git a/src/net/KNet/Specific/Streams/State/KNetWindowedKeyValueIterator.cs b/src/net/KNet/Specific/Streams/State/KNetWindowedKeyValueIterator.cs index aa033aaa31..c36163f909 100644 --- a/src/net/KNet/Specific/Streams/State/KNetWindowedKeyValueIterator.cs +++ b/src/net/KNet/Specific/Streams/State/KNetWindowedKeyValueIterator.cs @@ -22,6 +22,8 @@ using MASES.KNet.Streams.Kstream; using System.Collections.Generic; using System; +using System.Threading; +using System.Threading.Tasks; namespace MASES.KNet.Streams.State { @@ -30,26 +32,17 @@ namespace MASES.KNet.Streams.State /// /// The key type /// The value type - public class KNetWindowedKeyValueIterator : IGenericSerDesFactoryApplier + public class KNetWindowedKeyValueIterator : CommonIterator> { #if NET7_0_OR_GREATER class PrefetchableLocalEnumerator(IGenericSerDesFactory factory, IJavaObject obj, - IKNetSerDes valueSerDes) - : JVMBridgeBasePrefetchableEnumerator>(obj, new PrefetchableEnumeratorSettings()), IGenericSerDesFactoryApplier + IKNetSerDes valueSerDes, + bool isAsync, CancellationToken token = default) + : JVMBridgeBasePrefetchableEnumerator>(obj, new PrefetchableEnumeratorSettings()), + IGenericSerDesFactoryApplier, + IAsyncEnumerator> { - class PrefetchableEnumeratorSettings : IEnumerableExtension - { - public PrefetchableEnumeratorSettings() - { - UsePrefetch = true; - UseThread = true; - } - public bool UsePrefetch { get; set; } - public bool UseThread { get; set; } - public IConverterBridge ConverterBridge { get; set; } - } - IGenericSerDesFactory _factory = factory; IGenericSerDesFactory IGenericSerDesFactoryApplier.Factory { get => _factory; set { _factory = value; } } @@ -63,9 +56,27 @@ protected override object ConvertObject(object input) } throw new InvalidCastException($"input is not a valid IJavaObject"); } + + protected override bool DoWorkCycle() + { + return isAsync ? !token.IsCancellationRequested : base.DoWorkCycle(); + } + + public KNetWindowedKeyValue Current => (this as IEnumerator>).Current; + + public ValueTask MoveNextAsync() + { + return new ValueTask(MoveNext()); + } + + public ValueTask DisposeAsync() + { + Dispose(); + return new ValueTask(); + } } #endif - class StandardLocalEnumerator : JVMBridgeBaseEnumerator>, IGenericSerDesFactoryApplier + class StandardLocalEnumerator : JVMBridgeBaseEnumerator>, IGenericSerDesFactoryApplier, IAsyncEnumerator> { IKNetSerDes _valueSerDes = null; IGenericSerDesFactory _factory; @@ -90,18 +101,42 @@ protected override object ConvertObject(object input) } throw new InvalidCastException($"input is not a valid IJavaObject"); } + + public KNetWindowedKeyValue Current => (this as IEnumerator>).Current; + + public ValueTask MoveNextAsync() + { + return new ValueTask(MoveNext()); + } + + public ValueTask DisposeAsync() + { + Dispose(); + return new ValueTask(); + } } readonly Org.Apache.Kafka.Streams.State.KeyValueIterator, byte[]> _iterator; IKNetSerDes _valueSerDes = null; - IGenericSerDesFactory _factory; - IGenericSerDesFactory IGenericSerDesFactoryApplier.Factory { get => _factory; set { _factory = value; } } internal KNetWindowedKeyValueIterator(IGenericSerDesFactory factory, Org.Apache.Kafka.Streams.State.KeyValueIterator, byte[]> iterator) + :base(factory) { - _factory = factory; _iterator = iterator; } + + /// + protected override object GetEnumerator(bool isAsync, CancellationToken cancellationToken = default) + { + _valueSerDes ??= _factory.BuildValueSerDes(); +#if NET7_0_OR_GREATER + if (UsePrefetch) + { + return new PrefetchableLocalEnumerator(_factory, _iterator.BridgeInstance, _valueSerDes, isAsync, cancellationToken); + } +#endif + return new StandardLocalEnumerator(_factory, _iterator.BridgeInstance, _valueSerDes); + } /// /// KNet implementation of /// @@ -132,14 +167,8 @@ public void Remove() /// is not considered with .NET 6 and .NET Framework public IEnumerator> ToIEnumerator(bool usePrefetch = true) { - _valueSerDes ??= _factory.BuildValueSerDes(); -#if NET7_0_OR_GREATER - if (usePrefetch) - { - return new PrefetchableLocalEnumerator(_factory, _iterator.BridgeInstance, _valueSerDes); - } -#endif - return new StandardLocalEnumerator(_factory, _iterator.BridgeInstance, _valueSerDes); + UsePrefetch = usePrefetch; + return GetEnumerator(false) as IEnumerator>; } /// /// KNet implementation of