Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Iterator classes becomes IEnumerable and IAsyncEnumerable aware #366

Merged
merged 3 commits into from
Jan 21, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Iterator classes becomes IEnumerable and IAsyncEnumerable aware
masesdevelopers committed Jan 21, 2024
commit e5b1310fdf98dd3df7983372662ecb84299125a7
86 changes: 86 additions & 0 deletions src/net/KNet/Specific/Streams/State/CommonIterator.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Copyright 2024 MASES s.r.l.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Refer to LICENSE for more information.
*/

using MASES.JCOBridge.C2JBridge;
using MASES.KNet.Serialization;
using System;
using System.Collections;
using System.Collections.Generic;
using System.Threading;

namespace MASES.KNet.Streams.State
{
class PrefetchableEnumeratorSettings : IEnumerableExtension
{
public PrefetchableEnumeratorSettings()
{
UsePrefetch = true;
UseThread = true;
}
public bool UsePrefetch { get; set; }
public bool UseThread { get; set; }
public IConverterBridge ConverterBridge { get; set; }
}

/// <summary>
/// A common class for all iterators
/// </summary>
/// <typeparam name="TIteratorType">The return <see cref="Type"/> of <see cref="IEnumerable{T}"/> and <see cref="IAsyncEnumerable{T}"/></typeparam>
public abstract class CommonIterator<TIteratorType> : IGenericSerDesFactoryApplier, IEnumerable<TIteratorType>, IAsyncEnumerable<TIteratorType>
{
/// <summary>
/// Initialize a new instance of <see cref="CommonIterator{TIteratorType}"/>
/// </summary>
/// <param name="factory">The <see cref="IGenericSerDesFactory"/> associated to this instance</param>
public CommonIterator(IGenericSerDesFactory factory)
{
_factory = factory;
}
/// <summary>
/// The <see cref="IGenericSerDesFactory"/> associated to this instance
/// </summary>
protected IGenericSerDesFactory _factory;
IGenericSerDesFactory IGenericSerDesFactoryApplier.Factory { get => _factory; set { _factory = value; } }
/// <summary>
/// Used to get or set the type of enumerator to retrieve, default is with prefetch if the platform accept it
/// </summary>
public bool UsePrefetch { get; set; } = true;
/// <inheritdoc/>
public IEnumerator<TIteratorType> GetEnumerator()
{
return GetEnumerator(false) as IEnumerator<TIteratorType>;
}

IEnumerator IEnumerable.GetEnumerator()
{
return GetEnumerator();
}
/// <inheritdoc/>
public IAsyncEnumerator<TIteratorType> GetAsyncEnumerator(CancellationToken cancellationToken = default)
{
return GetEnumerator(true, cancellationToken) as IAsyncEnumerator<TIteratorType>;
}
/// <summary>
/// Internally gets the <see cref="IEnumerable{T}"/> or <see cref="IAsyncEnumerable{T}"/>
/// </summary>
/// <param name="isAsync">If requesting an <see cref="IAsyncEnumerator{T}"/></param>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> to be used in <see cref="IAsyncEnumerator{T}"/></param>
/// <returns>An <see cref="IEnumerable{T}"/> or <see cref="IAsyncEnumerable{T}"/></returns>
protected abstract object GetEnumerator(bool isAsync, CancellationToken cancellationToken = default);
}
}
95 changes: 62 additions & 33 deletions src/net/KNet/Specific/Streams/State/KNetKeyValueIterator.cs
Original file line number Diff line number Diff line change
@@ -21,6 +21,8 @@
using MASES.KNet.Serialization;
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

namespace MASES.KNet.Streams.State
{
@@ -29,28 +31,19 @@ namespace MASES.KNet.Streams.State
/// </summary>
/// <typeparam name="TKey">The key type</typeparam>
/// <typeparam name="TValue">The value type</typeparam>
public class KNetKeyValueIterator<TKey, TValue> : IGenericSerDesFactoryApplier
public sealed class KNetKeyValueIterator<TKey, TValue> : CommonIterator<KNetKeyValue<TKey, TValue>>
{
#if NET7_0_OR_GREATER
class PrefetchableLocalEnumerator(bool isVersion2,
IGenericSerDesFactory factory,
IJavaObject obj,
IKNetSerDes<TKey> keySerDes,
IKNetSerDes<TValue> valueSerDes)
: JVMBridgeBasePrefetchableEnumerator<KNetKeyValue<TKey, TValue>>(obj, new PrefetchableEnumeratorSettings()), IGenericSerDesFactoryApplier
IKNetSerDes<TValue> valueSerDes,
bool isAsync, CancellationToken token = default)
: JVMBridgeBasePrefetchableEnumerator<KNetKeyValue<TKey, TValue>>(obj, new PrefetchableEnumeratorSettings()),
IGenericSerDesFactoryApplier,
IAsyncEnumerator<KNetKeyValue<TKey, TValue>>
{
class PrefetchableEnumeratorSettings : IEnumerableExtension
{
public PrefetchableEnumeratorSettings()
{
UsePrefetch = true;
UseThread = true;
}
public bool UsePrefetch { get; set; }
public bool UseThread { get; set; }
public IConverterBridge ConverterBridge { get; set; }
}

readonly bool _isVersion2 = isVersion2;
IGenericSerDesFactory _factory = factory;
IGenericSerDesFactory IGenericSerDesFactoryApplier.Factory { get => _factory; set { _factory = value; } }
@@ -68,12 +61,30 @@ protected override object ConvertObject(object input)
}
throw new InvalidCastException($"input is not a valid IJavaObject");
}

protected override bool DoWorkCycle()
{
return isAsync ? !token.IsCancellationRequested : base.DoWorkCycle();
}

public KNetKeyValue<TKey, TValue> Current => (this as IEnumerator<KNetKeyValue<TKey, TValue>>).Current;

public ValueTask<bool> MoveNextAsync()
{
return new ValueTask<bool>(MoveNext());
}

public ValueTask DisposeAsync()
{
Dispose();
return new ValueTask();
}
}
#endif
class StandardLocalEnumerator : JVMBridgeBaseEnumerator<KNetKeyValue<TKey, TValue>>, IGenericSerDesFactoryApplier
class StandardLocalEnumerator : JVMBridgeBaseEnumerator<KNetKeyValue<TKey, TValue>>, IGenericSerDesFactoryApplier, IAsyncEnumerator<KNetKeyValue<TKey, TValue>>
{
IKNetSerDes<TKey> _keySerDes = null;
IKNetSerDes<TValue> _valueSerDes = null;
readonly IKNetSerDes<TKey> _keySerDes = null;
readonly IKNetSerDes<TValue> _valueSerDes = null;
readonly bool _isVersion2;
IGenericSerDesFactory _factory;
IGenericSerDesFactory IGenericSerDesFactoryApplier.Factory { get => _factory; set { _factory = value; } }
@@ -108,26 +119,53 @@ protected override object ConvertObject(object input)
}
throw new InvalidCastException($"input is not a valid IJavaObject");
}

public KNetKeyValue<TKey, TValue> Current => (this as IEnumerator<KNetKeyValue<TKey, TValue>>).Current;

public ValueTask<bool> MoveNextAsync()
{
return new ValueTask<bool>(MoveNext());
}

public ValueTask DisposeAsync()
{
Dispose();
return new ValueTask();
}
}

readonly Org.Apache.Kafka.Streams.State.KeyValueIterator<byte[], byte[]> _iterator;
readonly Org.Apache.Kafka.Streams.State.KeyValueIterator<Java.Lang.Long, byte[]> _iterator2;
IKNetSerDes<TKey> _keySerDes = null;
IKNetSerDes<TValue> _valueSerDes = null;
IGenericSerDesFactory _factory;
IGenericSerDesFactory IGenericSerDesFactoryApplier.Factory { get => _factory; set { _factory = value; } }

internal KNetKeyValueIterator(IGenericSerDesFactory factory, Org.Apache.Kafka.Streams.State.KeyValueIterator<byte[], byte[]> iterator)
: base(factory)
{
_factory = factory;
_iterator = iterator;
}

internal KNetKeyValueIterator(IGenericSerDesFactory factory, Org.Apache.Kafka.Streams.State.KeyValueIterator<Java.Lang.Long, byte[]> iterator)
: base(factory)
{
_factory = factory;
_iterator2 = iterator;
}
/// <inheritdoc/>
protected override object GetEnumerator(bool isAsync, CancellationToken cancellationToken = default)
{
_keySerDes ??= _factory.BuildKeySerDes<TKey>();
_valueSerDes ??= _factory.BuildValueSerDes<TValue>();
#if NET7_0_OR_GREATER
if (UsePrefetch)
{
return _iterator != null ? new PrefetchableLocalEnumerator(false, _factory, _iterator.BridgeInstance, _keySerDes, _valueSerDes, isAsync, cancellationToken)
: new PrefetchableLocalEnumerator(true, _factory, _iterator2.BridgeInstance, _keySerDes, _valueSerDes, isAsync, cancellationToken);
}
#endif
return _iterator != null ? new StandardLocalEnumerator(false, _factory, _iterator.BridgeInstance, _keySerDes, _valueSerDes)
: new StandardLocalEnumerator(true, _factory, _iterator2.BridgeInstance, _keySerDes, _valueSerDes);
}

/// <summary>
/// KNet implementation of <see href="https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/util/Iterator.html#hasNext()"/>
/// </summary>
@@ -160,17 +198,8 @@ public void Remove()
/// <remarks><paramref name="usePrefetch"/> is not considered with .NET 6 and .NET Framework</remarks>
public IEnumerator<KNetKeyValue<TKey, TValue>> ToIEnumerator(bool usePrefetch = true)
{
_keySerDes ??= _factory.BuildKeySerDes<TKey>();
_valueSerDes ??= _factory.BuildValueSerDes<TValue>();
#if NET7_0_OR_GREATER
if (usePrefetch)
{
return _iterator != null ? new PrefetchableLocalEnumerator(false, _factory, _iterator.BridgeInstance, _keySerDes, _valueSerDes)
: new PrefetchableLocalEnumerator(true, _factory, _iterator2.BridgeInstance, _keySerDes, _valueSerDes);
}
#endif
return _iterator != null ? new StandardLocalEnumerator(false, _factory, _iterator.BridgeInstance, _keySerDes, _valueSerDes)
: new StandardLocalEnumerator(true, _factory, _iterator2.BridgeInstance, _keySerDes, _valueSerDes);
UsePrefetch = usePrefetch;
return GetEnumerator(false) as IEnumerator<KNetKeyValue<TKey, TValue>>;
}
/// <summary>
/// KNet implementation of <see href="https://www.javadoc.io/doc/org.apache.kafka/kafka-streams/3.6.1/org/apache/kafka/streams/state/KeyValueIterator.html#peekNextKey--"/>
Original file line number Diff line number Diff line change
@@ -21,6 +21,8 @@
using MASES.KNet.Serialization;
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

namespace MASES.KNet.Streams.State
{
@@ -29,26 +31,18 @@ namespace MASES.KNet.Streams.State
/// </summary>
/// <typeparam name="TKey">The key type</typeparam>
/// <typeparam name="TValue">The value type</typeparam>
public class KNetTimestampedKeyValueIterator<TKey, TValue> : IGenericSerDesFactoryApplier
public class KNetTimestampedKeyValueIterator<TKey, TValue> : CommonIterator<KNetTimestampedKeyValue<TKey, TValue>>
{
#if NET7_0_OR_GREATER
class PrefetchableLocalEnumerator(bool isVersion2,
IGenericSerDesFactory factory,
IJavaObject obj,
IKNetSerDes<TKey> keySerDes)
: JVMBridgeBasePrefetchableEnumerator<KNetTimestampedKeyValue<TKey, TValue>>(obj, new PrefetchableEnumeratorSettings()), IGenericSerDesFactoryApplier
IKNetSerDes<TKey> keySerDes,
bool isAsync, CancellationToken token = default)
: JVMBridgeBasePrefetchableEnumerator<KNetTimestampedKeyValue<TKey, TValue>>(obj, new PrefetchableEnumeratorSettings()),
IGenericSerDesFactoryApplier,
IAsyncEnumerator<KNetTimestampedKeyValue<TKey, TValue>>
{
class PrefetchableEnumeratorSettings : IEnumerableExtension
{
public PrefetchableEnumeratorSettings()
{
UsePrefetch = true;
UseThread = true;
}
public bool UsePrefetch { get; set; }
public bool UseThread { get; set; }
public IConverterBridge ConverterBridge { get; set; }
}
readonly bool _isVersion2 = isVersion2;
IGenericSerDesFactory _factory = factory;
IGenericSerDesFactory IGenericSerDesFactoryApplier.Factory { get => _factory; set { _factory = value; } }
@@ -66,9 +60,26 @@ protected override object ConvertObject(object input)
}
throw new InvalidCastException($"input is not a valid IJavaObject");
}
protected override bool DoWorkCycle()
{
return isAsync ? !token.IsCancellationRequested : base.DoWorkCycle();
}

public KNetTimestampedKeyValue<TKey, TValue> Current => (this as IEnumerator<KNetTimestampedKeyValue<TKey, TValue>>).Current;

public ValueTask<bool> MoveNextAsync()
{
return new ValueTask<bool>(MoveNext());
}

public ValueTask DisposeAsync()
{
Dispose();
return new ValueTask();
}
}
#endif
class StandardLocalEnumerator : JVMBridgeBaseEnumerator<KNetTimestampedKeyValue<TKey, TValue>>, IGenericSerDesFactoryApplier
class StandardLocalEnumerator : JVMBridgeBaseEnumerator<KNetTimestampedKeyValue<TKey, TValue>>, IGenericSerDesFactoryApplier, IAsyncEnumerator<KNetTimestampedKeyValue<TKey, TValue>>
{
IKNetSerDes<TKey> _keySerDes = null;
readonly bool _isVersion2;
@@ -99,25 +110,51 @@ protected override object ConvertObject(object input)
}
throw new InvalidCastException($"input is not a valid IJavaObject");
}

public KNetTimestampedKeyValue<TKey, TValue> Current => (this as IEnumerator<KNetTimestampedKeyValue<TKey, TValue>>).Current;

public ValueTask<bool> MoveNextAsync()
{
return new ValueTask<bool>(MoveNext());
}

public ValueTask DisposeAsync()
{
Dispose();
return new ValueTask();
}
}

readonly Org.Apache.Kafka.Streams.State.KeyValueIterator<byte[], Org.Apache.Kafka.Streams.State.ValueAndTimestamp<byte[]>> _iterator = null;
readonly Org.Apache.Kafka.Streams.State.KeyValueIterator<Java.Lang.Long, Org.Apache.Kafka.Streams.State.ValueAndTimestamp<byte[]>> _iterator2 = null;
IKNetSerDes<TKey> _keySerDes;
IGenericSerDesFactory _factory;
IGenericSerDesFactory IGenericSerDesFactoryApplier.Factory { get => _factory; set { _factory = value; } }

internal KNetTimestampedKeyValueIterator(IGenericSerDesFactory factory, Org.Apache.Kafka.Streams.State.KeyValueIterator<byte[], Org.Apache.Kafka.Streams.State.ValueAndTimestamp<byte[]>> iterator)
: base(factory)
{
_factory = factory;
_iterator = iterator;
}

internal KNetTimestampedKeyValueIterator(IGenericSerDesFactory factory, Org.Apache.Kafka.Streams.State.KeyValueIterator<Java.Lang.Long, Org.Apache.Kafka.Streams.State.ValueAndTimestamp<byte[]>> iterator)
: base(factory)
{
_factory = factory;
_iterator2 = iterator;
}

/// <inheritdoc/>
protected override object GetEnumerator(bool isAsync, CancellationToken cancellationToken = default)
{
_keySerDes ??= _factory.BuildKeySerDes<TKey>();
#if NET7_0_OR_GREATER
if (UsePrefetch)
{
return _iterator != null ? new PrefetchableLocalEnumerator(false, _factory, _iterator.BridgeInstance, _keySerDes, isAsync, cancellationToken)
: new PrefetchableLocalEnumerator(true, _factory, _iterator2.BridgeInstance, _keySerDes, isAsync, cancellationToken);
}
#endif
return _iterator != null ? new StandardLocalEnumerator(false, _factory, _iterator.BridgeInstance, _keySerDes)
: new StandardLocalEnumerator(true, _factory, _iterator2.BridgeInstance, _keySerDes);
}
/// <summary>
/// KNet implementation of <see href="https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/util/Iterator.html#hasNext()"/>
/// </summary>
@@ -148,16 +185,8 @@ public void Remove()
/// <remarks><paramref name="usePrefetch"/> is not considered with .NET 6 and .NET Framework</remarks>
public IEnumerator<KNetTimestampedKeyValue<TKey, TValue>> ToIEnumerator(bool usePrefetch = true)
{
_keySerDes ??= _factory.BuildKeySerDes<TKey>();
#if NET7_0_OR_GREATER
if (usePrefetch)
{
return _iterator != null ? new PrefetchableLocalEnumerator(false, _factory, _iterator.BridgeInstance, _keySerDes)
: new PrefetchableLocalEnumerator(true, _factory, _iterator2.BridgeInstance, _keySerDes);
}
#endif
return _iterator != null ? new StandardLocalEnumerator(false, _factory, _iterator.BridgeInstance, _keySerDes)
: new StandardLocalEnumerator(true, _factory, _iterator2.BridgeInstance, _keySerDes);
UsePrefetch = usePrefetch;
return GetEnumerator(false) as IEnumerator<KNetTimestampedKeyValue<TKey, TValue>>;
}
/// <summary>
/// KNet implementation of <see href="https://www.javadoc.io/doc/org.apache.kafka/kafka-streams/3.6.1/org/apache/kafka/streams/state/KeyValueIterator.html#peekNextKey--"/>
Original file line number Diff line number Diff line change
@@ -22,6 +22,8 @@
using MASES.KNet.Streams.Kstream;
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

namespace MASES.KNet.Streams.State
{
@@ -30,25 +32,16 @@ namespace MASES.KNet.Streams.State
/// </summary>
/// <typeparam name="TKey">The key type</typeparam>
/// <typeparam name="TValue">The value type</typeparam>
public class KNetTimestampedWindowedKeyValueIterator<TKey, TValue> : IGenericSerDesFactoryApplier
public class KNetTimestampedWindowedKeyValueIterator<TKey, TValue> : CommonIterator<KNetTimestampedWindowedKeyValue<TKey, TValue>>
{
#if NET7_0_OR_GREATER
class PrefetchableLocalEnumerator(IGenericSerDesFactory factory,
IJavaObject obj)
: JVMBridgeBasePrefetchableEnumerator<KNetTimestampedWindowedKeyValue<TKey, TValue>>(obj, new PrefetchableEnumeratorSettings()), IGenericSerDesFactoryApplier
IJavaObject obj,
bool isAsync, CancellationToken token = default)
: JVMBridgeBasePrefetchableEnumerator<KNetTimestampedWindowedKeyValue<TKey, TValue>>(obj, new PrefetchableEnumeratorSettings()),
IGenericSerDesFactoryApplier,
IAsyncEnumerator<KNetTimestampedWindowedKeyValue<TKey, TValue>>
{
class PrefetchableEnumeratorSettings : IEnumerableExtension
{
public PrefetchableEnumeratorSettings()
{
UsePrefetch = true;
UseThread = true;
}
public bool UsePrefetch { get; set; }
public bool UseThread { get; set; }
public IConverterBridge ConverterBridge { get; set; }
}

IGenericSerDesFactory _factory = factory;
IGenericSerDesFactory IGenericSerDesFactoryApplier.Factory { get => _factory; set { _factory = value; } }

@@ -60,10 +53,28 @@ protected override object ConvertObject(object input)
}
throw new InvalidCastException($"input is not a valid IJavaObject");
}

protected override bool DoWorkCycle()
{
return isAsync ? !token.IsCancellationRequested : base.DoWorkCycle();
}

public KNetTimestampedWindowedKeyValue<TKey, TValue> Current => (this as IEnumerator<KNetTimestampedWindowedKeyValue<TKey, TValue>>).Current;

public ValueTask<bool> MoveNextAsync()
{
return new ValueTask<bool>(MoveNext());
}

public ValueTask DisposeAsync()
{
Dispose();
return new ValueTask();
}
}
#endif

class StandardLocalEnumerator : JVMBridgeBaseEnumerator<KNetTimestampedWindowedKeyValue<TKey, TValue>>, IGenericSerDesFactoryApplier
class StandardLocalEnumerator : JVMBridgeBaseEnumerator<KNetTimestampedWindowedKeyValue<TKey, TValue>>, IGenericSerDesFactoryApplier, IAsyncEnumerator<KNetTimestampedWindowedKeyValue<TKey, TValue>>
{
IGenericSerDesFactory _factory;
IGenericSerDesFactory IGenericSerDesFactoryApplier.Factory { get => _factory; set { _factory = value; } }
@@ -81,17 +92,40 @@ protected override object ConvertObject(object input)
}
throw new InvalidCastException($"input is not a valid IJavaObject");
}

public KNetTimestampedWindowedKeyValue<TKey, TValue> Current => (this as IEnumerator<KNetTimestampedWindowedKeyValue<TKey, TValue>>).Current;

public ValueTask<bool> MoveNextAsync()
{
return new ValueTask<bool>(MoveNext());
}

public ValueTask DisposeAsync()
{
Dispose();
return new ValueTask();
}
}

readonly Org.Apache.Kafka.Streams.State.KeyValueIterator<Org.Apache.Kafka.Streams.Kstream.Windowed<byte[]>, Org.Apache.Kafka.Streams.State.ValueAndTimestamp<byte[]>> _iterator;
IGenericSerDesFactory _factory;
IGenericSerDesFactory IGenericSerDesFactoryApplier.Factory { get => _factory; set { _factory = value; } }

internal KNetTimestampedWindowedKeyValueIterator(IGenericSerDesFactory factory, Org.Apache.Kafka.Streams.State.KeyValueIterator<Org.Apache.Kafka.Streams.Kstream.Windowed<byte[]>, Org.Apache.Kafka.Streams.State.ValueAndTimestamp<byte[]>> iterator)
:base(factory)
{
_factory = factory;
_iterator = iterator;
}

/// <inheritdoc/>
protected override object GetEnumerator(bool isAsync, CancellationToken cancellationToken = default)
{
#if NET7_0_OR_GREATER
if (UsePrefetch)
{
return new PrefetchableLocalEnumerator(_factory, _iterator.BridgeInstance, isAsync, cancellationToken);
}
#endif
return new StandardLocalEnumerator(_factory, _iterator.BridgeInstance);
}
/// <summary>
/// KNet implementation of <see href="https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/util/Iterator.html#hasNext()"/>
/// </summary>
@@ -118,13 +152,8 @@ public void Remove()
/// <remarks><paramref name="usePrefetch"/> is not considered with .NET 6 and .NET Framework</remarks>
public IEnumerator<KNetTimestampedWindowedKeyValue<TKey, TValue>> ToIEnumerator(bool usePrefetch = true)
{
#if NET7_0_OR_GREATER
if (usePrefetch)
{
return new PrefetchableLocalEnumerator(_factory, _iterator.BridgeInstance);
}
#endif
return new StandardLocalEnumerator(_factory, _iterator.BridgeInstance);
UsePrefetch = usePrefetch;
return GetEnumerator(false) as IEnumerator<KNetTimestampedWindowedKeyValue<TKey, TValue>>;
}
/// <summary>
/// KNet implementation of <see href="https://www.javadoc.io/doc/org.apache.kafka/kafka-streams/3.6.1/org/apache/kafka/streams/state/KeyValueIterator.html#peekNextKey--"/>
83 changes: 56 additions & 27 deletions src/net/KNet/Specific/Streams/State/KNetWindowedKeyValueIterator.cs
Original file line number Diff line number Diff line change
@@ -22,6 +22,8 @@
using MASES.KNet.Streams.Kstream;
using System.Collections.Generic;
using System;
using System.Threading;
using System.Threading.Tasks;

namespace MASES.KNet.Streams.State
{
@@ -30,26 +32,17 @@ namespace MASES.KNet.Streams.State
/// </summary>
/// <typeparam name="TKey">The key type</typeparam>
/// <typeparam name="TValue">The value type</typeparam>
public class KNetWindowedKeyValueIterator<TKey, TValue> : IGenericSerDesFactoryApplier
public class KNetWindowedKeyValueIterator<TKey, TValue> : CommonIterator<KNetWindowedKeyValue<TKey, TValue>>
{
#if NET7_0_OR_GREATER
class PrefetchableLocalEnumerator(IGenericSerDesFactory factory,
IJavaObject obj,
IKNetSerDes<TValue> valueSerDes)
: JVMBridgeBasePrefetchableEnumerator<KNetWindowedKeyValue<TKey, TValue>>(obj, new PrefetchableEnumeratorSettings()), IGenericSerDesFactoryApplier
IKNetSerDes<TValue> valueSerDes,
bool isAsync, CancellationToken token = default)
: JVMBridgeBasePrefetchableEnumerator<KNetWindowedKeyValue<TKey, TValue>>(obj, new PrefetchableEnumeratorSettings()),
IGenericSerDesFactoryApplier,
IAsyncEnumerator<KNetWindowedKeyValue<TKey, TValue>>
{
class PrefetchableEnumeratorSettings : IEnumerableExtension
{
public PrefetchableEnumeratorSettings()
{
UsePrefetch = true;
UseThread = true;
}
public bool UsePrefetch { get; set; }
public bool UseThread { get; set; }
public IConverterBridge ConverterBridge { get; set; }
}

IGenericSerDesFactory _factory = factory;
IGenericSerDesFactory IGenericSerDesFactoryApplier.Factory { get => _factory; set { _factory = value; } }

@@ -63,9 +56,27 @@ protected override object ConvertObject(object input)
}
throw new InvalidCastException($"input is not a valid IJavaObject");
}

protected override bool DoWorkCycle()
{
return isAsync ? !token.IsCancellationRequested : base.DoWorkCycle();
}

public KNetWindowedKeyValue<TKey, TValue> Current => (this as IEnumerator<KNetWindowedKeyValue<TKey, TValue>>).Current;

public ValueTask<bool> MoveNextAsync()
{
return new ValueTask<bool>(MoveNext());
}

public ValueTask DisposeAsync()
{
Dispose();
return new ValueTask();
}
}
#endif
class StandardLocalEnumerator : JVMBridgeBaseEnumerator<KNetWindowedKeyValue<TKey, TValue>>, IGenericSerDesFactoryApplier
class StandardLocalEnumerator : JVMBridgeBaseEnumerator<KNetWindowedKeyValue<TKey, TValue>>, IGenericSerDesFactoryApplier, IAsyncEnumerator<KNetWindowedKeyValue<TKey, TValue>>
{
IKNetSerDes<TValue> _valueSerDes = null;
IGenericSerDesFactory _factory;
@@ -90,18 +101,42 @@ protected override object ConvertObject(object input)
}
throw new InvalidCastException($"input is not a valid IJavaObject");
}

public KNetWindowedKeyValue<TKey, TValue> Current => (this as IEnumerator<KNetWindowedKeyValue<TKey, TValue>>).Current;

public ValueTask<bool> MoveNextAsync()
{
return new ValueTask<bool>(MoveNext());
}

public ValueTask DisposeAsync()
{
Dispose();
return new ValueTask();
}
}

readonly Org.Apache.Kafka.Streams.State.KeyValueIterator<Org.Apache.Kafka.Streams.Kstream.Windowed<byte[]>, byte[]> _iterator;
IKNetSerDes<TValue> _valueSerDes = null;
IGenericSerDesFactory _factory;
IGenericSerDesFactory IGenericSerDesFactoryApplier.Factory { get => _factory; set { _factory = value; } }

internal KNetWindowedKeyValueIterator(IGenericSerDesFactory factory, Org.Apache.Kafka.Streams.State.KeyValueIterator<Org.Apache.Kafka.Streams.Kstream.Windowed<byte[]>, byte[]> iterator)
:base(factory)
{
_factory = factory;
_iterator = iterator;
}

/// <inheritdoc/>
protected override object GetEnumerator(bool isAsync, CancellationToken cancellationToken = default)
{
_valueSerDes ??= _factory.BuildValueSerDes<TValue>();
#if NET7_0_OR_GREATER
if (UsePrefetch)
{
return new PrefetchableLocalEnumerator(_factory, _iterator.BridgeInstance, _valueSerDes, isAsync, cancellationToken);
}
#endif
return new StandardLocalEnumerator(_factory, _iterator.BridgeInstance, _valueSerDes);
}
/// <summary>
/// KNet implementation of <see href="https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/util/Iterator.html#hasNext()"/>
/// </summary>
@@ -132,14 +167,8 @@ public void Remove()
/// <remarks><paramref name="usePrefetch"/> is not considered with .NET 6 and .NET Framework</remarks>
public IEnumerator<KNetWindowedKeyValue<TKey, TValue>> ToIEnumerator(bool usePrefetch = true)
{
_valueSerDes ??= _factory.BuildValueSerDes<TValue>();
#if NET7_0_OR_GREATER
if (usePrefetch)
{
return new PrefetchableLocalEnumerator(_factory, _iterator.BridgeInstance, _valueSerDes);
}
#endif
return new StandardLocalEnumerator(_factory, _iterator.BridgeInstance, _valueSerDes);
UsePrefetch = usePrefetch;
return GetEnumerator(false) as IEnumerator<KNetWindowedKeyValue<TKey, TValue>>;
}
/// <summary>
/// KNet implementation of <see href="https://www.javadoc.io/doc/org.apache.kafka/kafka-streams/3.6.1/org/apache/kafka/streams/state/KeyValueIterator.html#peekNextKey--"/>