diff --git a/.github/workflows/generateclasses.yaml b/.github/workflows/generateclasses.yaml index f0b5816865..b26ba9dcf9 100644 --- a/.github/workflows/generateclasses.yaml +++ b/.github/workflows/generateclasses.yaml @@ -83,12 +83,9 @@ jobs: with: branch-suffix: short-commit-hash add-paths: src/*.* - commit-message: | - Update generated classes after commit ${{ env.GITHUB_COMMIT_MESSAGE }} - - fix #92 + commit-message: Update generated classes after commit ${{ env.GITHUB_COMMIT_MESSAGE }} title: Changes by GitHub action - body: Automated changes by GitHub action + body: Automated changes by GitHub action, fix #92 reviewers: masesdevelopers assignees: masesdevelopers labels: java, .NET, enhancement diff --git a/src/net/KNet/Developed/Org/Apache/Kafka/Clients/Consumer/ConsumerRecord.cs b/src/net/KNet/Developed/Org/Apache/Kafka/Clients/Consumer/ConsumerRecord.cs index 4ce39fd610..6b4441a4cb 100644 --- a/src/net/KNet/Developed/Org/Apache/Kafka/Clients/Consumer/ConsumerRecord.cs +++ b/src/net/KNet/Developed/Org/Apache/Kafka/Clients/Consumer/ConsumerRecord.cs @@ -18,24 +18,27 @@ using MASES.JCOBridge.C2JBridge.JVMInterop; using Org.Apache.Kafka.Common.Record; +using System; namespace Org.Apache.Kafka.Clients.Consumer { public partial class ConsumerRecord { + DateTime? _dateTime = null; /// /// of /// - public System.DateTime DateTime => System.DateTimeOffset.FromUnixTimeMilliseconds(Timestamp()).DateTime; + public System.DateTime DateTime => _dateTime ??= System.DateTimeOffset.FromUnixTimeMilliseconds(Timestamp()).DateTime; // public TimestampType TimestampType => (TimestampType)System.Enum.Parse(typeof(TimestampType), IExecute("timestampType").Invoke("name")); // (TimestampType)(int)IExecute("timestampType").GetField("id"); } public partial class ConsumerRecord { + DateTime? _dateTime = null; /// /// of /// - public System.DateTime DateTime => System.DateTimeOffset.FromUnixTimeMilliseconds(Timestamp()).DateTime; + public System.DateTime DateTime => _dateTime ??= System.DateTimeOffset.FromUnixTimeMilliseconds(Timestamp()).DateTime; } } diff --git a/src/net/KNet/Specific/Consumer/KNetConsumer.cs b/src/net/KNet/Specific/Consumer/KNetConsumer.cs index 52aaa27f76..39d0611dfb 100644 --- a/src/net/KNet/Specific/Consumer/KNetConsumer.cs +++ b/src/net/KNet/Specific/Consumer/KNetConsumer.cs @@ -18,26 +18,31 @@ using Java.Time; using Java.Util; -using MASES.JCOBridge.C2JBridge; using System; using System.Collections.Concurrent; -using Org.Apache.Kafka.Clients.Consumer; using MASES.KNet.Serialization; -using Org.Apache.Kafka.Common.Header; -using Org.Apache.Kafka.Common.Record; -using System.Collections.Generic; -using System.Threading; -using System.Threading.Tasks; namespace MASES.KNet.Consumer { /// - /// KNet extension of + /// KNet extension of /// /// The key type /// The value type - public interface IKNetConsumer : IConsumer + public interface IKNetConsumer : Org.Apache.Kafka.Clients.Consumer.IConsumer { +#if NET7_0_OR_GREATER + /// + /// if enumeration will use prefetch and the number of records is more than , i.e. the preparation of happens in an external thread + /// + /// It is by default if one of or are not , override the value using + bool IsPrefecth { get; } + /// + /// The minimum threshold to activate pretech, i.e. the preparation of happens in external thread if contains more than elements + /// + /// The default value is 10, however it shall be chosen by the developer and in the decision shall be verified if external thread activation costs more than inline execution + int PrefetchThreshold { get; } +#endif /// /// if the instance is completing async operation /// @@ -50,6 +55,15 @@ public interface IKNetConsumer : IConsumer /// Number of messages in the instance waiting to be processed in async operation /// int WaitingMessages { get; } +#if NET7_0_OR_GREATER + /// + /// Set to to enable enumeration with prefetch over threshold, i.e. preparation of in external thread + /// + /// to enable prefetch. See + /// The minimum threshold to activate pretech, default is 10. See + /// Setting to a value less, or equal, to 0 and to , the prefetch is always actived + void ApplyPrefetch(bool enablePrefetch = true, int prefetchThreshold = 10); +#endif /// /// Sets the to use to receive /// @@ -81,11 +95,11 @@ public interface IKNetConsumer : IConsumer void Consume(long timeoutMs, Action> callback); } /// - /// KNet extension of + /// KNet extension of /// /// The key type /// The value type - public class KNetConsumer : KafkaConsumer, IKNetConsumer + public class KNetConsumer : Org.Apache.Kafka.Clients.Consumer.KafkaConsumer, IKNetConsumer { readonly bool _autoCreateSerDes = false; bool _threadRunning = false; @@ -93,53 +107,32 @@ public class KNetConsumer : KafkaConsumer, IKNetConsumer> _consumedRecords = null; readonly KNetConsumerCallback _consumerCallback = null; - readonly IKNetDeserializer _keyDeserializer; - readonly IKNetDeserializer _valueDeserializer; + readonly IKNetSerDes _keyDeserializer; + readonly IKNetSerDes _valueDeserializer; /// /// /// public override string BridgeClassName => "org.mases.knet.clients.consumer.KNetConsumer"; - /// - /// Initialize a new instance of - /// - /// The properties to use, see and - /// to active callback based mode - public KNetConsumer(Properties props, bool useJVMCallback = false) - : this(props, new KNetSerDes(), new KNetSerDes(), useJVMCallback) - { - _autoCreateSerDes = true; - if (useJVMCallback) - { - _consumerCallback = new KNetConsumerCallback(CallbackMessage, _keyDeserializer, _valueDeserializer); - IExecute("setCallback", _consumerCallback); - } - else - { - _consumedRecords = new(); - _threadRunning = true; - _consumeThread = new(ConsumeHandler); - _consumeThread.Start(); - } - } + internal KNetConsumer(Properties props) : base(props) { } + /// /// Initialize a new instance of /// /// An instance of /// to active callback based mode public KNetConsumer(ConsumerConfigBuilder configBuilder, bool useJVMCallback = false) - : this(configBuilder, configBuilder.BuildKeySerDes(), configBuilder.BuildValueSerDes()) + : this(configBuilder, configBuilder.BuildKeySerDes(), configBuilder.BuildValueSerDes(), useJVMCallback) { - _autoCreateSerDes = true; } /// /// Initialize a new instance of /// - /// The properties to use, see and + /// The properties to use, see /// Key serializer base on /// Value serializer base on /// to active callback based mode - public KNetConsumer(Properties props, IKNetDeserializer keyDeserializer, IKNetDeserializer valueDeserializer, bool useJVMCallback = false) + public KNetConsumer(ConsumerConfigBuilder props, IKNetSerDes keyDeserializer, IKNetSerDes valueDeserializer, bool useJVMCallback = false) : base(CheckProperties(props), keyDeserializer.KafkaDeserializer, valueDeserializer.KafkaDeserializer) { _keyDeserializer = keyDeserializer; @@ -161,17 +154,17 @@ public KNetConsumer(Properties props, IKNetDeserializer keyDeserializer, IKNe static Properties CheckProperties(Properties props) { - if (!props.ContainsKey(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG)) + if (!props.ContainsKey(Org.Apache.Kafka.Clients.Consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG)) { - props.Put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer"); + props.Put(Org.Apache.Kafka.Clients.Consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer"); } - else throw new InvalidOperationException($"KNetConsumer auto manages configuration property {ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG}, remove from configuration."); + else throw new InvalidOperationException($"KNetConsumer auto manages configuration property {Org.Apache.Kafka.Clients.Consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG}, remove from configuration."); - if (!props.ContainsKey(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG)) + if (!props.ContainsKey(Org.Apache.Kafka.Clients.Consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG)) { - props.Put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer"); + props.Put(Org.Apache.Kafka.Clients.Consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer"); } - else throw new InvalidOperationException($"KNetConsumer auto manages configuration property {ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG}, remove from configuration."); + else throw new InvalidOperationException($"KNetConsumer auto manages configuration property {Org.Apache.Kafka.Clients.Consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG}, remove from configuration."); return props; } @@ -186,13 +179,13 @@ static Properties CheckProperties(Properties props) /// public new KNetConsumerRecords Poll(long timeoutMs) { - var records = IExecute>("poll", timeoutMs); + var records = IExecute>("poll", timeoutMs); return new KNetConsumerRecords(records, _keyDeserializer, _valueDeserializer); } /// public new KNetConsumerRecords Poll(Duration timeout) { - var records = IExecute>("poll", timeout); + var records = IExecute>("poll", timeout); return new KNetConsumerRecords(records, _keyDeserializer, _valueDeserializer); } @@ -227,6 +220,14 @@ public override void Dispose() } base.Dispose(); } +#if NET7_0_OR_GREATER + /// + public void ApplyPrefetch(bool enablePrefetch = true, int prefetchThreshold = 10) + { + IsPrefecth = enablePrefetch; + PrefetchThreshold = IsPrefecth ? prefetchThreshold : 10; + } +#endif /// public void SetCallback(Action> cb) { @@ -266,6 +267,12 @@ void ConsumeHandler(object o) } catch { } } +#if NET7_0_OR_GREATER + /// + public bool IsPrefecth { get; private set; } = !(typeof(K).IsValueType && typeof(V).IsValueType); + /// + public int PrefetchThreshold { get; private set; } = 10; +#endif /// public bool IsCompleting => !_consumedRecords.IsEmpty || System.Threading.Interlocked.Read(ref _dequeing) != 0; /// @@ -284,7 +291,11 @@ public bool ConsumeAsync(long timeoutMs) bool isEmpty = results.IsEmpty; if (!isEmpty) { +#if NET7_0_OR_GREATER + _consumedRecords.Enqueue(results.ApplyPrefetch(IsPrefecth, PrefetchThreshold)); +#else _consumedRecords.Enqueue(results); +#endif lock (_consumedRecords) { System.Threading.Monitor.Pulse(_consumedRecords); diff --git a/src/net/KNet/Specific/Consumer/KNetConsumerCallback.cs b/src/net/KNet/Specific/Consumer/KNetConsumerCallback.cs index 19f1b71e6e..b14e7b0162 100644 --- a/src/net/KNet/Specific/Consumer/KNetConsumerCallback.cs +++ b/src/net/KNet/Specific/Consumer/KNetConsumerCallback.cs @@ -53,7 +53,7 @@ public KNetConsumerCallback(Action> recordReady, IKNetD void OnRecordReadyEventHandler(object sender, CLRListenerEventArgs data) { var record = this.BridgeInstance.Invoke>("getRecord"); - recordReadyFunction(new KNetConsumerRecord(record, _keyDeserializer, _valueDeserializer)); + recordReadyFunction(new KNetConsumerRecord(record, _keyDeserializer, _valueDeserializer, false)); } public virtual void RecordReady(KNetConsumerRecord message) { } diff --git a/src/net/KNet/Specific/Consumer/KNetConsumerRecord.cs b/src/net/KNet/Specific/Consumer/KNetConsumerRecord.cs index 015853f196..d9fd7c1b86 100644 --- a/src/net/KNet/Specific/Consumer/KNetConsumerRecord.cs +++ b/src/net/KNet/Specific/Consumer/KNetConsumerRecord.cs @@ -16,15 +16,12 @@ * Refer to LICENSE for more information. */ -using Org.Apache.Kafka.Clients.Consumer; using MASES.KNet.Serialization; -using Org.Apache.Kafka.Common.Header; -using Org.Apache.Kafka.Common.Record; namespace MASES.KNet.Consumer { /// - /// KNet extension of + /// KNet extension of /// /// The key type /// The value type @@ -32,43 +29,58 @@ public class KNetConsumerRecord { readonly IKNetDeserializer _keyDeserializer; readonly IKNetDeserializer _valueDeserializer; - readonly ConsumerRecord _record; + readonly Org.Apache.Kafka.Clients.Consumer.ConsumerRecord _record; /// /// Initialize a new /// - /// The to use for initialization + /// The to use for initialization /// Key serializer base on /// Value serializer base on - internal KNetConsumerRecord(ConsumerRecord record, IKNetDeserializer keyDeserializer, IKNetDeserializer valueDeserializer) + /// True if the initialization comes from the prefetch iterator + internal KNetConsumerRecord(Org.Apache.Kafka.Clients.Consumer.ConsumerRecord record, IKNetDeserializer keyDeserializer, IKNetDeserializer valueDeserializer, bool fromPrefetched) { _record = record; _keyDeserializer = keyDeserializer; _valueDeserializer = valueDeserializer; + if (fromPrefetched) + { + // the following lines will read and prepares Key, Value, Topic, Headers + _ = Key; + _ = Value; + } } - /// - public string Topic => _record.Topic(); - /// + string _topic = null; + /// + public string Topic { get { _topic ??= _record.Topic(); return _topic; } } + /// public int? LeaderEpoch { get { var epoch = _record.LeaderEpoch(); return epoch.IsEmpty() ? null : epoch.Get(); } } - /// - public int Partition => _record.Partition(); - /// - public Headers Headers => _record.Headers(); - /// - public long Offset => _record.Offset(); - /// + int? _partition = null; + /// + public int Partition => _partition ??= _record.Partition(); + Org.Apache.Kafka.Common.Header.Headers _headers = null; + /// + public Org.Apache.Kafka.Common.Header.Headers Headers => _headers ??= _record.Headers(); + long? _offset = null; + /// + public long Offset => _offset ??= _record.Offset(); + /// public System.DateTime DateTime => _record.DateTime; - /// - public long Timestamp => _record.Timestamp(); - /// - public TimestampType TimestampType => _record.TimestampType(); - /// - public int SerializedKeySize => _record.SerializedKeySize(); - /// - public int SerializedValueSize => _record.SerializedValueSize(); + long? _timestamp = null; + /// + public long Timestamp => _timestamp ??= _record.Timestamp(); + Org.Apache.Kafka.Common.Record.TimestampType _timestampType = null; + /// + public Org.Apache.Kafka.Common.Record.TimestampType TimestampType => _timestampType ??= _record.TimestampType(); + int? _serializedKeySize = null; + /// + public int SerializedKeySize => _serializedKeySize ??= _record.SerializedKeySize(); + int? _serializedValueSize = null; + /// + public int SerializedValueSize => _serializedValueSize ??= _record.SerializedValueSize(); bool _localKeyDes = false; K _localKey = default; - /// + /// public K Key { get @@ -84,7 +96,7 @@ public K Key bool _localValueDes = false; V _localValue = default; - /// + /// public V Value { get diff --git a/src/net/KNet/Specific/Consumer/KNetConsumerRecords.cs b/src/net/KNet/Specific/Consumer/KNetConsumerRecords.cs index cb8adff55f..94ba081893 100644 --- a/src/net/KNet/Specific/Consumer/KNetConsumerRecords.cs +++ b/src/net/KNet/Specific/Consumer/KNetConsumerRecords.cs @@ -16,35 +16,47 @@ * Refer to LICENSE for more information. */ -using Org.Apache.Kafka.Clients.Consumer; using MASES.KNet.Serialization; using System.Collections.Generic; using System.Threading; +using System; namespace MASES.KNet.Consumer { /// - /// KNet extension of + /// KNet extension of /// /// The key type /// The value type public class KNetConsumerRecords : IEnumerable>, IAsyncEnumerable> { - readonly IKNetDeserializer _keyDeserializer; - readonly IKNetDeserializer _valueDeserializer; - readonly ConsumerRecords _records; + readonly IKNetSerDes _keyDeserializer; + readonly IKNetSerDes _valueDeserializer; + readonly Org.Apache.Kafka.Clients.Consumer.ConsumerRecords _records; /// /// Initialize a new /// - /// The to use for initialization + /// The to use for initialization /// Key serializer base on /// Value serializer base on - public KNetConsumerRecords(ConsumerRecords records, IKNetDeserializer keyDeserializer, IKNetDeserializer valueDeserializer) + internal KNetConsumerRecords(Org.Apache.Kafka.Clients.Consumer.ConsumerRecords records, IKNetSerDes keyDeserializer, IKNetSerDes valueDeserializer) { _records = records; _keyDeserializer = keyDeserializer; _valueDeserializer = valueDeserializer; } +#if NET7_0_OR_GREATER + /// + /// if enumeration will use prefetch and the number of records is more than , i.e. the preparation of happens in an external thread + /// + /// It is by default if one of or are not , override the value using + public bool IsPrefecth { get; private set; } = !(typeof(K).IsValueType && typeof(V).IsValueType); + /// + /// The minimum threshold to activate pretech, i.e. the preparation of happens in external thread if contains more than elements + /// + /// The default value is 10, however it shall be chosen by the developer and in the decision shall be verified if external thread activation costs more than inline execution + public int PrefetchThreshold { get; private set; } = 10; +#endif /// /// if the is empty /// @@ -53,20 +65,55 @@ public KNetConsumerRecords(ConsumerRecords records, IKNetDeseria /// The number of elements in /// public int Count => _records.Count(); +#if NET7_0_OR_GREATER + /// + /// Set to to enable enumeration with prefetch over threshold, i.e. preparation of in external thread + /// + /// to enable prefetch. See + /// The minimum threshold to activate pretech, default is 10. See + /// This instance with and set + /// Setting to a value less, or equal, to 0 and to , the prefetch is always actived + public KNetConsumerRecords ApplyPrefetch(bool enablePrefetch = true, int prefetchThreshold = 10) + { + IsPrefecth = enablePrefetch; + PrefetchThreshold = IsPrefecth ? prefetchThreshold : -1; + return this; + } + bool UsePrefetch() + { + return IsPrefecth && + (PrefetchThreshold <= 0 || _records.Count() > PrefetchThreshold); + } +#endif IEnumerator> IEnumerable>.GetEnumerator() { - return new KNetConsumerRecordsEnumerator(_records, _keyDeserializer, _valueDeserializer); +#if NET7_0_OR_GREATER + if (UsePrefetch()) + return new KNetConsumerRecordsPrefetchableEnumerator(_records.Iterator(), _keyDeserializer, _valueDeserializer, false); + else +#endif + return new KNetConsumerRecordsEnumerator(_records, _keyDeserializer, _valueDeserializer); } System.Collections.IEnumerator System.Collections.IEnumerable.GetEnumerator() { - return new KNetConsumerRecordsEnumerator(_records, _keyDeserializer, _valueDeserializer); +#if NET7_0_OR_GREATER + if (UsePrefetch()) + return new KNetConsumerRecordsPrefetchableEnumerator(_records.Iterator(), _keyDeserializer, _valueDeserializer, false); + else +#endif + return new KNetConsumerRecordsEnumerator(_records, _keyDeserializer, _valueDeserializer); } IAsyncEnumerator> IAsyncEnumerable>.GetAsyncEnumerator(CancellationToken cancellationToken) { - return new KNetConsumerRecordsEnumerator(_records, _keyDeserializer, _valueDeserializer, cancellationToken); +#if NET7_0_OR_GREATER + if (UsePrefetch()) + return new KNetConsumerRecordsPrefetchableEnumerator(_records.Iterator(), _keyDeserializer, _valueDeserializer, true, cancellationToken); + else +#endif + return new KNetConsumerRecordsEnumerator(_records, _keyDeserializer, _valueDeserializer, cancellationToken); } } } diff --git a/src/net/KNet/Specific/Consumer/KNetConsumerRecordsEnumerator.cs b/src/net/KNet/Specific/Consumer/KNetConsumerRecordsEnumerator.cs index 13e9937c1d..eced3a887b 100644 --- a/src/net/KNet/Specific/Consumer/KNetConsumerRecordsEnumerator.cs +++ b/src/net/KNet/Specific/Consumer/KNetConsumerRecordsEnumerator.cs @@ -16,7 +16,6 @@ * Refer to LICENSE for more information. */ -using Org.Apache.Kafka.Clients.Consumer; using MASES.KNet.Serialization; using System.Collections.Generic; using System.Threading; @@ -29,11 +28,11 @@ class KNetConsumerRecordsEnumerator : IEnumerator readonly IKNetDeserializer _keyDeserializer; readonly IKNetDeserializer _valueDeserializer; readonly CancellationToken _cancellationToken; - readonly ConsumerRecords _records; - IEnumerator> _recordEnumerator; - IAsyncEnumerator> _recordAsyncEnumerator; + readonly Org.Apache.Kafka.Clients.Consumer.ConsumerRecords _records; + IEnumerator> _recordEnumerator; + IAsyncEnumerator> _recordAsyncEnumerator; - public KNetConsumerRecordsEnumerator(ConsumerRecords records, IKNetDeserializer keyDeserializer, IKNetDeserializer valueDeserializer) + public KNetConsumerRecordsEnumerator(Org.Apache.Kafka.Clients.Consumer.ConsumerRecords records, IKNetDeserializer keyDeserializer, IKNetDeserializer valueDeserializer) { _records = records; _recordEnumerator = _records.GetEnumerator(); @@ -41,7 +40,7 @@ public KNetConsumerRecordsEnumerator(ConsumerRecords records, IK _valueDeserializer = valueDeserializer; } - public KNetConsumerRecordsEnumerator(ConsumerRecords records, IKNetDeserializer keyDeserializer, IKNetDeserializer valueDeserializer, CancellationToken cancellationToken) + public KNetConsumerRecordsEnumerator(Org.Apache.Kafka.Clients.Consumer.ConsumerRecords records, IKNetDeserializer keyDeserializer, IKNetDeserializer valueDeserializer, CancellationToken cancellationToken) { _records = records; _recordAsyncEnumerator = _records.GetAsyncEnumerator(cancellationToken); @@ -50,9 +49,9 @@ public KNetConsumerRecordsEnumerator(ConsumerRecords records, IK _cancellationToken = cancellationToken; } - KNetConsumerRecord IAsyncEnumerator>.Current => new KNetConsumerRecord(_recordAsyncEnumerator.Current, _keyDeserializer, _valueDeserializer); + KNetConsumerRecord IAsyncEnumerator>.Current => new KNetConsumerRecord(_recordAsyncEnumerator.Current, _keyDeserializer, _valueDeserializer, false); - KNetConsumerRecord IEnumerator>.Current => new KNetConsumerRecord(_recordEnumerator.Current, _keyDeserializer, _valueDeserializer); + KNetConsumerRecord IEnumerator>.Current => new KNetConsumerRecord(_recordEnumerator.Current, _keyDeserializer, _valueDeserializer, false); object System.Collections.IEnumerator.Current => (_recordEnumerator as System.Collections.IEnumerator)?.Current; diff --git a/src/net/KNet/Specific/Consumer/KNetConsumerRecordsPrefetchableEnumerator.cs b/src/net/KNet/Specific/Consumer/KNetConsumerRecordsPrefetchableEnumerator.cs new file mode 100644 index 0000000000..43830cddb8 --- /dev/null +++ b/src/net/KNet/Specific/Consumer/KNetConsumerRecordsPrefetchableEnumerator.cs @@ -0,0 +1,67 @@ +/* +* Copyright 2024 MASES s.r.l. +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +* +* Refer to LICENSE for more information. +*/ + +using MASES.KNet.Serialization; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; +using MASES.JCOBridge.C2JBridge.JVMInterop; +using MASES.JCOBridge.C2JBridge; +using System; + +namespace MASES.KNet.Consumer +{ +#if NET7_0_OR_GREATER + sealed class KNetConsumerRecordsPrefetchableEnumerator(Java.Util.Iterator> records, + IKNetSerDes keySerDes, + IKNetSerDes valueSerDes, + bool isAsync, CancellationToken token = default) + : JVMBridgeBasePrefetchableEnumerator>(records.BridgeInstance, new PrefetchableEnumeratorSettings()), + IAsyncEnumerator> + { + Java.Util.Iterator> _records = records; // used to do not lost reference + + protected override object ConvertObject(object input) + { + if (input is IJavaObject obj) + { + return new KNetConsumerRecord(JVMBridgeBase.Wraps>(obj), keySerDes, valueSerDes, true); + } + throw new InvalidCastException($"input is not a valid IJavaObject"); + } + + protected override bool DoWorkCycle() + { + return isAsync ? !token.IsCancellationRequested : base.DoWorkCycle(); + } + + public KNetConsumerRecord Current => (this as IEnumerator>).Current; + + public ValueTask MoveNextAsync() + { + return new ValueTask(MoveNext()); + } + + public ValueTask DisposeAsync() + { + Dispose(); + return new ValueTask(); + } + } +#endif +} diff --git a/src/net/KNet/Specific/GenericConfigBuilder.cs b/src/net/KNet/Specific/GenericConfigBuilder.cs index 53e9777af1..839ee38e7c 100644 --- a/src/net/KNet/Specific/GenericConfigBuilder.cs +++ b/src/net/KNet/Specific/GenericConfigBuilder.cs @@ -22,9 +22,7 @@ using MASES.KNet.Serialization; using System.Linq; using System.Collections.Concurrent; -using static Javax.Lang.Model.Util.Elements; using MASES.JCOBridge.C2JBridge; -using MASES.JCOBridge.C2JBridge.JVMInterop; namespace MASES.KNet { @@ -32,7 +30,7 @@ namespace MASES.KNet /// Generic base configuration class /// /// - public class GenericConfigBuilder : System.ComponentModel.INotifyPropertyChanged, IGenericSerDesFactory + public class GenericConfigBuilder : System.ComponentModel.INotifyPropertyChanged, IGenericSerDesFactory, IDisposable where T : GenericConfigBuilder, new() { /// @@ -86,11 +84,6 @@ public TData GetProperty(string propertyName) { if (_options.TryGetValue(propertyName, out var result)) { - //if (typeof(IJVMBridgeBase).IsAssignableFrom(typeof(TData))) - //{ - // return JVMBridgeBase.Wraps(result as IJavaObject); - //} - return result.Convert(); } return default; @@ -256,5 +249,29 @@ public IKNetSerDes BuildValueSerDes() return serDes as IKNetSerDes; } } + /// + public void Clear() + { + foreach (IDisposable item in _keySerDes.Values.Cast()) + { + item?.Dispose(); + } + + _keySerDes.Clear(); + + foreach (IDisposable item in _valueSerDes.Values.Cast()) + { + item?.Dispose(); + } + + _valueSerDes.Clear(); + } + + /// + public void Dispose() + { + GC.SuppressFinalize(this); + Clear(); + } } } diff --git a/src/net/KNet/Specific/PrefetchableEnumeratorSettings.cs b/src/net/KNet/Specific/PrefetchableEnumeratorSettings.cs new file mode 100644 index 0000000000..b619f79eac --- /dev/null +++ b/src/net/KNet/Specific/PrefetchableEnumeratorSettings.cs @@ -0,0 +1,34 @@ +/* +* Copyright 2024 MASES s.r.l. +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +* +* Refer to LICENSE for more information. +*/ + +using MASES.JCOBridge.C2JBridge; + +namespace MASES.KNet +{ + class PrefetchableEnumeratorSettings : IEnumerableExtension + { + public PrefetchableEnumeratorSettings() + { + UsePrefetch = true; + UseThread = true; + } + public bool UsePrefetch { get; set; } + public bool UseThread { get; set; } + public IConverterBridge ConverterBridge { get; set; } + } +} diff --git a/src/net/KNet/Specific/Producer/KNetProducer.cs b/src/net/KNet/Specific/Producer/KNetProducer.cs index b702fc85d5..7b8203e5c0 100644 --- a/src/net/KNet/Specific/Producer/KNetProducer.cs +++ b/src/net/KNet/Specific/Producer/KNetProducer.cs @@ -166,17 +166,11 @@ public class KNetProducer : KafkaProducer, IKNetProducer "org.mases.knet.clients.producer.KNetProducer"; readonly bool _autoCreateSerDes = false; - readonly IKNetSerializer _keySerializer; - readonly IKNetSerializer _valueSerializer; - /// - /// Initialize a new instance of - /// - /// The properties to use, see and - public KNetProducer(Properties props) - : this(props, new KNetSerDes(), new KNetSerDes()) - { - _autoCreateSerDes = true; - } + readonly IKNetSerDes _keySerializer; + readonly IKNetSerDes _valueSerializer; + + internal KNetProducer(Properties props) : base(props) { } + /// /// Initialize a new instance of /// @@ -184,15 +178,14 @@ public KNetProducer(Properties props) public KNetProducer(ProducerConfigBuilder configBuilder) : this(configBuilder, configBuilder.BuildKeySerDes(), configBuilder.BuildValueSerDes()) { - _autoCreateSerDes = true; } /// /// Initialize a new instance of /// - /// The properties to use, see and + /// The properties to use, see /// Key serializer base on /// Value serializer base on - public KNetProducer(Properties props, IKNetSerializer keySerializer, IKNetSerializer valueSerializer) + public KNetProducer(ProducerConfigBuilder props, IKNetSerDes keySerializer, IKNetSerDes valueSerializer) : base(CheckProperties(props), keySerializer.KafkaSerializer, valueSerializer.KafkaSerializer) { _keySerializer = keySerializer; diff --git a/src/net/KNet/Specific/Replicator/KNetCompactedReplicator.cs b/src/net/KNet/Specific/Replicator/KNetCompactedReplicator.cs index c47ddb7691..1a715d11a0 100644 --- a/src/net/KNet/Specific/Replicator/KNetCompactedReplicator.cs +++ b/src/net/KNet/Specific/Replicator/KNetCompactedReplicator.cs @@ -16,8 +16,6 @@ * Refer to LICENSE for more information. */ -using Java.Time; -using Java.Util; using MASES.JCOBridge.C2JBridge; using MASES.KNet.Admin; using MASES.KNet.Common; @@ -25,11 +23,6 @@ using MASES.KNet.Extensions; using MASES.KNet.Producer; using MASES.KNet.Serialization; -using Org.Apache.Kafka.Clients.Admin; -using Org.Apache.Kafka.Clients.Consumer; -using Org.Apache.Kafka.Clients.Producer; -using Org.Apache.Kafka.Common; -using Org.Apache.Kafka.Common.Errors; using System; using System.Collections; using System.Collections.Concurrent; @@ -205,6 +198,18 @@ public interface IKNetCompactedReplicator : IDictionary to use in , by default it creates a default one based on /// IKNetSerDes ValueSerDes { get; } +#if NET7_0_OR_GREATER + /// + /// if enumeration will use prefetch and the number of records is more than , i.e. the preparation of happens in an external thread + /// + /// It is by default if one of or are not , override the value using + bool IsPrefecth { get; } + /// + /// The minimum threshold to activate pretech, i.e. the preparation of happens in external thread if contains more than elements + /// + /// The default value is 10, however it shall be chosen by the developer and in the decision shall be verified if external thread activation costs more than inline execution + int PrefetchThreshold { get; } +#endif /// /// if the instance was started /// @@ -226,6 +231,15 @@ public interface IKNetCompactedReplicator : IDictionary + /// Set to to enable enumeration with prefetch over threshold, i.e. preparation of in external thread + /// + /// to enable prefetch. See + /// The minimum threshold to activate pretech, default is 10. See + /// Setting to a value less, or equal, to 0 and to , the prefetch is always actived + void ApplyPrefetch(bool enablePrefetch = true, int prefetchThreshold = 10); +#endif /// /// Start this : create the topic if not available, allocates Producer and Consumer, sets serializer/deserializer /// @@ -429,9 +443,9 @@ public void CopyTo(KeyValuePair[] array, int arrayIndex) [System.Runtime.CompilerServices.MethodImpl(System.Runtime.CompilerServices.MethodImplOptions.AggressiveInlining)] static void OnDemandRetrieve(IKNetConsumer consumer, string topic, TKey key, ILocalDataStorage data) { - var topicPartition = new TopicPartition(topic, data.Partition); - var topics = Collections.SingletonList(topicPartition); - Duration duration = TimeSpan.FromMinutes(1); + var topicPartition = new Org.Apache.Kafka.Common.TopicPartition(topic, data.Partition); + var topics = Java.Util.Collections.SingletonList(topicPartition); + Java.Time.Duration duration = TimeSpan.FromMinutes(1); try { consumer.Assign(topics); @@ -460,7 +474,7 @@ static void OnDemandRetrieve(IKNetConsumer consumer, string topic, #region KNetCompactedConsumerRebalanceListener - class KNetCompactedConsumerRebalanceListener : ConsumerRebalanceListener + class KNetCompactedConsumerRebalanceListener : Org.Apache.Kafka.Clients.Consumer.ConsumerRebalanceListener { int _consumerIndex; public KNetCompactedConsumerRebalanceListener(int consumerIndex) @@ -660,7 +674,13 @@ public Type KNetValueSerDes /// public IKNetSerDes ValueSerDes { get { return _valueSerDes; } set { CheckStarted(); _valueSerDes = value; } } +#if NET7_0_OR_GREATER + /// + public bool IsPrefecth { get; private set; } = !(typeof(TKey).IsValueType && typeof(TValue).IsValueType); + /// + public int PrefetchThreshold { get; private set; } = 10; +#endif /// public bool IsStarted => _started; @@ -772,7 +792,7 @@ private void OnMessage(KNetConsumerRecord record) } } - private void OnTopicPartitionsAssigned(KNetCompactedConsumerRebalanceListener listener, Collection topicPartitions) + private void OnTopicPartitionsAssigned(KNetCompactedConsumerRebalanceListener listener, Java.Util.Collection topicPartitions) { foreach (var topicPartition in topicPartitions) { @@ -789,7 +809,7 @@ private void OnTopicPartitionsAssigned(KNetCompactedConsumerRebalanceListener li } } - private void OnTopicPartitionsRevoked(KNetCompactedConsumerRebalanceListener listener, Collection topicPartitions) + private void OnTopicPartitionsRevoked(KNetCompactedConsumerRebalanceListener listener, Java.Util.Collection topicPartitions) { foreach (var topicPartition in topicPartitions) { @@ -806,7 +826,7 @@ private void OnTopicPartitionsRevoked(KNetCompactedConsumerRebalanceListener lis } } - private void OnTopicPartitionsLost(KNetCompactedConsumerRebalanceListener listener, Collection topicPartitions) + private void OnTopicPartitionsLost(KNetCompactedConsumerRebalanceListener listener, Java.Util.Collection topicPartitions) { foreach (var topicPartition in topicPartitions) { @@ -834,12 +854,12 @@ private void AddOrUpdate(TKey key, TValue value) if (UpdateModeOnDelivery) { - RecordMetadata metadata = null; + Org.Apache.Kafka.Clients.Producer.RecordMetadata metadata = null; JVMBridgeException exception = null; DateTime pTimestamp = DateTime.MaxValue; using (AutoResetEvent deliverySemaphore = new AutoResetEvent(false)) { - using (Callback cb = new Callback() + using (Org.Apache.Kafka.Clients.Producer.Callback cb = new Org.Apache.Kafka.Clients.Producer.Callback() { OnOnCompletion = (record, error) => { @@ -904,7 +924,7 @@ private void AddOrUpdate(TKey key, TValue value) } else if (UpdateModeOnConsume || UpdateModeOnConsumeSync) { - _producer.Produce(StateName, key, value, (Callback)null); + _producer.Produce(StateName, key, value, (Org.Apache.Kafka.Clients.Producer.Callback)null); if (UpdateModeOnConsumeSync) { _OnConsumeSyncWaiter = new Tuple(key, new ManualResetEvent(false)); @@ -969,6 +989,9 @@ void BuildConsumers() { _consumerAssociatedPartition.Add(i, new System.Collections.Generic.List()); _consumers[i] = (KNetKeySerDes != null || KNetValueSerDes != null) ? new KNetConsumer(ConsumerConfig) : new KNetConsumer(ConsumerConfig, KeySerDes, ValueSerDes); +#if NET7_0_OR_GREATER + _consumers[i].ApplyPrefetch(IsPrefecth, PrefetchThreshold); +#endif _consumers[i].SetCallback(OnMessage); _consumerListeners[i] = new KNetCompactedConsumerRebalanceListener(i) { @@ -1018,7 +1041,7 @@ void ConsumerPollHandler(object o) { bool firstExecution = false; int index = (int)o; - var topics = Collections.Singleton(StateName); + var topics = Java.Util.Collections.Singleton(StateName); try { _consumers[index].Subscribe(topics, _consumerListeners[index]); @@ -1040,7 +1063,7 @@ void ConsumerPollHandler(object o) { try { - var lag = _consumers[index].CurrentLag(new TopicPartition(StateName, partitionIndex)); + var lag = _consumers[index].CurrentLag(new Org.Apache.Kafka.Common.TopicPartition(StateName, partitionIndex)); Interlocked.Exchange(ref _lastPartitionLags[partitionIndex], lag.IsPresent() ? lag.AsLong : NotPresentLagState); } catch (Java.Lang.IllegalStateException) @@ -1051,7 +1074,7 @@ void ConsumerPollHandler(object o) } } } - catch (WakeupException) { return; } + catch (Org.Apache.Kafka.Common.Errors.WakeupException) { return; } catch { } } } @@ -1074,9 +1097,17 @@ bool CheckConsumerSyncState(int index) return _consumers[index].IsEmpty && !_consumers[index].IsCompleting && lagInSync; } - #endregion +#endregion #region Public methods +#if NET7_0_OR_GREATER + /// + public void ApplyPrefetch(bool enablePrefetch = true, int prefetchThreshold = 10) + { + IsPrefecth = enablePrefetch; + PrefetchThreshold = IsPrefecth ? prefetchThreshold : 10; + } +#endif /// public void Start() { @@ -1088,12 +1119,12 @@ public void Start() if (ConsumerInstances > Partitions) throw new InvalidOperationException("ConsumerInstances cannot be high than Partitions"); - using Properties props = AdminClientConfigBuilder.Create().WithBootstrapServers(BootstrapServers).ToProperties(); - using var admin = KafkaAdminClient.Create(props); + using Java.Util.Properties props = AdminClientConfigBuilder.Create().WithBootstrapServers(BootstrapServers).ToProperties(); + using var admin = Org.Apache.Kafka.Clients.Admin.KafkaAdminClient.Create(props); if (AccessRights.HasFlag(AccessRightsType.Write)) { - var topic = new NewTopic(StateName, Partitions, ReplicationFactor); + var topic = new Org.Apache.Kafka.Clients.Admin.NewTopic(StateName, Partitions, ReplicationFactor); _topicConfig ??= TopicConfigBuilder.Create().WithDeleteRetentionMs(100) .WithMinCleanableDirtyRatio(0.01) .WithSegmentMs(100) @@ -1105,9 +1136,9 @@ public void Start() { admin.CreateTopic(topic); } - catch (TopicExistsException) + catch (Org.Apache.Kafka.Common.Errors.TopicExistsException) { - var topics = Collections.Singleton(StateName); + var topics = Java.Util.Collections.Singleton(StateName); // recover partitions of the topic try { @@ -1554,5 +1585,5 @@ public override string ToString() #endregion } - #endregion +#endregion } diff --git a/src/net/KNet/Specific/Serialization/IGenericSerDesFactory.cs b/src/net/KNet/Specific/Serialization/IGenericSerDesFactory.cs index 5c2fc87ed7..7c9cdf432c 100644 --- a/src/net/KNet/Specific/Serialization/IGenericSerDesFactory.cs +++ b/src/net/KNet/Specific/Serialization/IGenericSerDesFactory.cs @@ -47,5 +47,9 @@ public interface IGenericSerDesFactory /// An instance of /// If is public IKNetSerDes BuildValueSerDes(); + /// + /// Clear the current factory + /// + public void Clear(); } } diff --git a/src/net/KNet/Specific/Streams/KNetKeyValue.cs b/src/net/KNet/Specific/Streams/KNetKeyValue.cs index ccc5a39901..c2f87e2254 100644 --- a/src/net/KNet/Specific/Streams/KNetKeyValue.cs +++ b/src/net/KNet/Specific/Streams/KNetKeyValue.cs @@ -42,13 +42,13 @@ internal KNetKeyValue(IGenericSerDesFactory factory, Org.Apache.Kafka.Streams.KeyValue value, IKNetSerDes keySerDes, IKNetSerDes valueSerDes, - bool fromAsync) + bool fromPrefetched) { _factory = factory; _valueInner1 = value; _keySerDes = keySerDes; _valueSerDes = valueSerDes; - if (fromAsync) + if (fromPrefetched) { _keySerDes ??= _factory.BuildKeySerDes(); _key = _keySerDes.Deserialize(null, _valueInner1.key); @@ -63,13 +63,13 @@ internal KNetKeyValue(IGenericSerDesFactory factory, Org.Apache.Kafka.Streams.KeyValue value, IKNetSerDes keySerDes, IKNetSerDes valueSerDes, - bool fromAsync) + bool fromPrefetched) { _factory = factory; _valueInner2 = value; _keySerDes = keySerDes; _valueSerDes = valueSerDes; - if (fromAsync) + if (fromPrefetched) { _keySerDes ??= _factory.BuildKeySerDes(); _key = (TKey)(object)_valueInner2.key.LongValue(); diff --git a/src/net/KNet/Specific/Streams/KNetTimestampedKeyValue.cs b/src/net/KNet/Specific/Streams/KNetTimestampedKeyValue.cs index 609741ddde..6e29fbc2a8 100644 --- a/src/net/KNet/Specific/Streams/KNetTimestampedKeyValue.cs +++ b/src/net/KNet/Specific/Streams/KNetTimestampedKeyValue.cs @@ -40,12 +40,12 @@ public sealed class KNetTimestampedKeyValue : IGenericSerDesFactor internal KNetTimestampedKeyValue(IGenericSerDesFactory factory, Org.Apache.Kafka.Streams.KeyValue> value, IKNetSerDes keySerDes, - bool fromAsync) + bool fromPrefetched) { _factory = factory; _valueInner1 = value; _keySerDes = keySerDes; - if (fromAsync) + if (fromPrefetched) { _keySerDes ??= _factory.BuildKeySerDes(); _key = _keySerDes.Deserialize(null, _valueInner1.key); @@ -56,12 +56,12 @@ internal KNetTimestampedKeyValue(IGenericSerDesFactory factory, internal KNetTimestampedKeyValue(IGenericSerDesFactory factory, Org.Apache.Kafka.Streams.KeyValue> value, IKNetSerDes keySerDes, - bool fromAsync) + bool fromPrefetched) { _factory = factory; _valueInner2 = value; _keySerDes = keySerDes; - if (fromAsync) + if (fromPrefetched) { _keySerDes ??= _factory.BuildKeySerDes(); _key = (TKey)(object)_valueInner2.key.LongValue(); diff --git a/src/net/KNet/Specific/Streams/KNetWindowedKeyValue.cs b/src/net/KNet/Specific/Streams/KNetWindowedKeyValue.cs index 59106ae4f3..abe408330c 100644 --- a/src/net/KNet/Specific/Streams/KNetWindowedKeyValue.cs +++ b/src/net/KNet/Specific/Streams/KNetWindowedKeyValue.cs @@ -39,12 +39,12 @@ public sealed class KNetWindowedKeyValue : IGenericSerDesFactoryAp internal KNetWindowedKeyValue(IGenericSerDesFactory factory, Org.Apache.Kafka.Streams.KeyValue, byte[]> value, IKNetSerDes valueSerDes, - bool fromAsync) + bool fromPrefetched) { _factory = factory; _valueInner = value; _valueSerDes = valueSerDes; - if (fromAsync) + if (fromPrefetched) { _valueSerDes ??= _factory.BuildValueSerDes(); _value = _valueSerDes.Deserialize(null, _valueInner.value); diff --git a/src/net/KNet/Specific/Streams/Processor/KNetTimestampExtractor.cs b/src/net/KNet/Specific/Streams/Processor/KNetTimestampExtractor.cs index 9b008bfb82..8c119982f2 100644 --- a/src/net/KNet/Specific/Streams/Processor/KNetTimestampExtractor.cs +++ b/src/net/KNet/Specific/Streams/Processor/KNetTimestampExtractor.cs @@ -46,7 +46,7 @@ public sealed override long Extract(Org.Apache.Kafka.Clients.Consumer.ConsumerRe _valueSerializer ??= _factory.BuildValueSerDes(); var record = arg0.Cast>(); // KNet consider the data within Apache Kafka Streams defined always as byte[] var methodToExecute = (OnExtract != null) ? OnExtract : Extract; - return methodToExecute(new KNetConsumerRecord(record, _keySerializer, _valueSerializer), arg1); + return methodToExecute(new KNetConsumerRecord(record, _keySerializer, _valueSerializer, false), arg1); } /// /// KNet implementation of diff --git a/src/net/KNet/Specific/Streams/State/CommonIterator.cs b/src/net/KNet/Specific/Streams/State/CommonIterator.cs index 61f044c149..418653fb3f 100644 --- a/src/net/KNet/Specific/Streams/State/CommonIterator.cs +++ b/src/net/KNet/Specific/Streams/State/CommonIterator.cs @@ -25,18 +25,6 @@ namespace MASES.KNet.Streams.State { - class PrefetchableEnumeratorSettings : IEnumerableExtension - { - public PrefetchableEnumeratorSettings() - { - UsePrefetch = true; - UseThread = true; - } - public bool UsePrefetch { get; set; } - public bool UseThread { get; set; } - public IConverterBridge ConverterBridge { get; set; } - } - /// /// A common class for all iterators /// diff --git a/tests/KNetBenchmark/ProgramKNet.cs b/tests/KNetBenchmark/ProgramKNet.cs index 04c1ce9594..2625c3b28e 100644 --- a/tests/KNetBenchmark/ProgramKNet.cs +++ b/tests/KNetBenchmark/ProgramKNet.cs @@ -42,20 +42,19 @@ static IKNetProducer KNetProducer() { if (knetProducer == null || !SharedObjects) { - Properties props = ProducerConfigBuilder.Create() - .WithBootstrapServers(Server) - .WithAcks(Acks ? ProducerConfigBuilder.AcksTypes.One : ProducerConfigBuilder.AcksTypes.None) - .WithRetries(MessageSendMaxRetries) - .WithLingerMs(LingerMs) - .WithBatchSize(BatchSize) - .WithMaxInFlightRequestPerConnection(MaxInFlight) - .WithEnableIdempotence(false) - .WithSendBuffer(SocketSendBufferBytes) - .WithReceiveBuffer(SocketReceiveBufferBytes) - .WithBufferMemory(128 * 1024 * 1024) - .WithKeySerializerClass("org.apache.kafka.common.serialization.LongSerializer") - .WithValueSerializerClass("org.apache.kafka.common.serialization.ByteArraySerializer") - .ToProperties(); + ProducerConfigBuilder props = ProducerConfigBuilder.Create() + .WithBootstrapServers(Server) + .WithAcks(Acks ? ProducerConfigBuilder.AcksTypes.One : ProducerConfigBuilder.AcksTypes.None) + .WithRetries(MessageSendMaxRetries) + .WithLingerMs(LingerMs) + .WithBatchSize(BatchSize) + .WithMaxInFlightRequestPerConnection(MaxInFlight) + .WithEnableIdempotence(false) + .WithSendBuffer(SocketSendBufferBytes) + .WithReceiveBuffer(SocketReceiveBufferBytes) + .WithBufferMemory(128 * 1024 * 1024) + .WithKeySerializerClass("org.apache.kafka.common.serialization.LongSerializer") + .WithValueSerializerClass("org.apache.kafka.common.serialization.ByteArraySerializer"); knetKeySerializer = new KNetSerDes() { @@ -178,18 +177,17 @@ static IKNetConsumer KNetConsumer() { if (knetConsumer == null || !SharedObjects) { - Properties props = ConsumerConfigBuilder.Create() - .WithBootstrapServers(Server) - .WithGroupId(Guid.NewGuid().ToString()) - .WithEnableAutoCommit(!AlwaysCommit) - .WithAutoCommitIntervalMs(1000) - .WithSendBuffer(SocketSendBufferBytes) - .WithReceiveBuffer(SocketReceiveBufferBytes) - .WithFetchMinBytes(FetchMinBytes) - .WithKeyDeserializerClass("org.apache.kafka.common.serialization.LongDeserializer") - .WithValueDeserializerClass("org.apache.kafka.common.serialization.ByteArrayDeserializer") - .WithAutoOffsetReset(ConsumerConfigBuilder.AutoOffsetResetTypes.EARLIEST) - .ToProperties(); + ConsumerConfigBuilder props = ConsumerConfigBuilder.Create() + .WithBootstrapServers(Server) + .WithGroupId(Guid.NewGuid().ToString()) + .WithEnableAutoCommit(!AlwaysCommit) + .WithAutoCommitIntervalMs(1000) + .WithSendBuffer(SocketSendBufferBytes) + .WithReceiveBuffer(SocketReceiveBufferBytes) + .WithFetchMinBytes(FetchMinBytes) + .WithKeyDeserializerClass("org.apache.kafka.common.serialization.LongDeserializer") + .WithValueDeserializerClass("org.apache.kafka.common.serialization.ByteArrayDeserializer") + .WithAutoOffsetReset(ConsumerConfigBuilder.AutoOffsetResetTypes.EARLIEST); if (UseSerdes) { knetKeyDeserializer = new KNetSerDes() diff --git a/tests/KNetTest/Program.cs b/tests/KNetTest/Program.cs index 593be9d33b..d1f1412a67 100644 --- a/tests/KNetTest/Program.cs +++ b/tests/KNetTest/Program.cs @@ -30,6 +30,7 @@ using MASES.KNet.Producer; using MASES.KNet.Consumer; using MASES.KNet.Common; +using System.Diagnostics; namespace MASES.KNetTest { @@ -97,7 +98,8 @@ static void Main(string[] args) Console.CancelKeyPress += Console_CancelKeyPress; Console.WriteLine("Press Ctrl-C to exit"); - resetEvent.WaitOne(); + resetEvent.WaitOne(TimeSpan.FromSeconds(10)); + resetEvent.Set(); Thread.Sleep(2000); // wait the threads exit } @@ -172,12 +174,11 @@ static void ProduceSomething() props.Put(ProducerConfig.LINGER_MS_CONFIG, 1); ******/ - Properties props = ProducerConfigBuilder.Create() - .WithBootstrapServers(serverToUse) - .WithAcks(ProducerConfigBuilder.AcksTypes.All) - .WithRetries(0) - .WithLingerMs(1) - .ToProperties(); + ProducerConfigBuilder props = ProducerConfigBuilder.Create() + .WithBootstrapServers(serverToUse) + .WithAcks(ProducerConfigBuilder.AcksTypes.All) + .WithRetries(0) + .WithLingerMs(1); KNetSerDes keySerializer = new KNetSerDes(); JsonSerDes.Value valueSerializer = new JsonSerDes.Value(); @@ -240,12 +241,11 @@ static void ConsumeSomething() props.Put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000"); *******/ - Properties props = ConsumerConfigBuilder.Create() - .WithBootstrapServers(serverToUse) - .WithGroupId("test") - .WithEnableAutoCommit(true) - .WithAutoCommitIntervalMs(1000) - .ToProperties(); + ConsumerConfigBuilder props = ConsumerConfigBuilder.Create() + .WithBootstrapServers(serverToUse) + .WithGroupId("test") + .WithEnableAutoCommit(true) + .WithAutoCommitIntervalMs(1000); KNetSerDes keyDeserializer = new KNetSerDes(); KNetSerDes valueDeserializer = new JsonSerDes.Value(); @@ -266,6 +266,10 @@ static void ConsumeSomething() } }; } + const bool withPrefetch = true; + long elements = 0; + Stopwatch watcherTotal = new Stopwatch(); + Stopwatch watcher = new Stopwatch(); var topics = Collections.Singleton(topicToUse); try { @@ -277,9 +281,19 @@ static void ConsumeSomething() while (!resetEvent.WaitOne(0)) { var records = consumer.Poll((long)TimeSpan.FromMilliseconds(200).TotalMilliseconds); + watcherTotal.Start(); +#if NET7_0_OR_GREATER + foreach (var item in records.ApplyPrefetch(withPrefetch, prefetchThreshold: 0)) +#else foreach (var item in records) +#endif { - Console.WriteLine($"Consuming from Offset = {item.Offset}, Key = {item.Key}, Value = {item.Value}"); + elements++; + watcher.Start(); + var str = $"Consuming from Offset = {item.Offset}, Key = {item.Key}, Value = {item.Value}"; + watcher.Stop(); + watcherTotal.Stop(); + Console.WriteLine(str); } } } @@ -289,6 +303,7 @@ static void ConsumeSomething() keyDeserializer?.Dispose(); valueDeserializer?.Dispose(); topics?.Dispose(); + if (elements != 0) Console.WriteLine($"Total mean time is {TimeSpan.FromTicks(watcherTotal.ElapsedTicks / elements)}, write mean time is {TimeSpan.FromTicks(watcher.ElapsedTicks / elements)}"); } } catch (Java.Util.Concurrent.ExecutionException ex)