diff --git a/src/net/KNet/Specific/Consumer/KNetConsumer.cs b/src/net/KNet/Specific/Consumer/KNetConsumer.cs index 3826857ff4..30c5ad8248 100644 --- a/src/net/KNet/Specific/Consumer/KNetConsumer.cs +++ b/src/net/KNet/Specific/Consumer/KNetConsumer.cs @@ -43,7 +43,7 @@ public interface IKNetConsumer : IConsumer /// bool IsCompleting { get; } /// - /// if the instance has an empty setnof items in async operation + /// if the instance has an empty set of items in async operation /// bool IsEmpty { get; } /// @@ -71,7 +71,8 @@ public interface IKNetConsumer : IConsumer /// KNet async extension for /// /// Timeout in milliseconds - void ConsumeAsync(long timeoutMs); + /// if something was enqued for Async operations + bool ConsumeAsync(long timeoutMs); /// /// KNet sync extension for /// @@ -86,12 +87,12 @@ public interface IKNetConsumer : IConsumer /// The value type public class KNetConsumer : KafkaConsumer, IKNetConsumer { - readonly bool autoCreateSerDes = false; - bool threadRunning = false; - long dequeing = 0; - readonly System.Threading.Thread consumeThread = null; - readonly ConcurrentQueue> consumedRecords = null; - readonly KNetConsumerCallback consumerCallback = null; + readonly bool _autoCreateSerDes = false; + bool _threadRunning = false; + long _dequeing = 0; + readonly System.Threading.Thread _consumeThread = null; + readonly ConcurrentQueue> _consumedRecords = null; + readonly KNetConsumerCallback _consumerCallback = null; readonly IKNetDeserializer _keyDeserializer; readonly IKNetDeserializer _valueDeserializer; /// @@ -106,19 +107,19 @@ public class KNetConsumer : KafkaConsumer, IKNetConsumer(), new KNetSerDes(), useJVMCallback) { - autoCreateSerDes = true; + _autoCreateSerDes = true; if (useJVMCallback) { - consumerCallback = new KNetConsumerCallback(CallbackMessage, _keyDeserializer, _valueDeserializer); - IExecute("setCallback", consumerCallback); + _consumerCallback = new KNetConsumerCallback(CallbackMessage, _keyDeserializer, _valueDeserializer); + IExecute("setCallback", _consumerCallback); } else { - consumedRecords = new(); - threadRunning = true; - consumeThread = new(ConsumeHandler); - consumeThread.Start(); + _consumedRecords = new(); + _threadRunning = true; + _consumeThread = new(ConsumeHandler); + _consumeThread.Start(); } } /// @@ -136,15 +137,15 @@ public KNetConsumer(Properties props, IKNetDeserializer keyDeserializer, IKNe if (useJVMCallback) { - consumerCallback = new KNetConsumerCallback(CallbackMessage, _keyDeserializer, _valueDeserializer); - IExecute("setCallback", consumerCallback); + _consumerCallback = new KNetConsumerCallback(CallbackMessage, _keyDeserializer, _valueDeserializer); + IExecute("setCallback", _consumerCallback); } else { - consumedRecords = new(); - threadRunning = true; - consumeThread = new(ConsumeHandler); - consumeThread.Start(); + _consumedRecords = new(); + _threadRunning = true; + _consumeThread = new(ConsumeHandler); + _consumeThread.Start(); } } @@ -194,22 +195,22 @@ void CallbackMessage(KNetConsumerRecord message) /// public override void Dispose() { - if (consumerCallback != null) + if (_consumerCallback != null) { IExecute("setCallback", null); - consumerCallback?.Dispose(); + _consumerCallback?.Dispose(); } - threadRunning = false; - if (consumedRecords != null) + _threadRunning = false; + if (_consumedRecords != null) { - lock (consumedRecords) + lock (_consumedRecords) { - System.Threading.Monitor.Pulse(consumedRecords); + System.Threading.Monitor.Pulse(_consumedRecords); } - if (IsCompleting) { consumeThread?.Join(); }; + if (IsCompleting) { _consumeThread?.Join(); }; actionCallback = null; } - if (autoCreateSerDes) + if (_autoCreateSerDes) { _keyDeserializer?.Dispose(); _valueDeserializer?.Dispose(); @@ -226,11 +227,11 @@ void ConsumeHandler(object o) { try { - while (threadRunning) + while (_threadRunning) { - if (consumedRecords.TryDequeue(out KNetConsumerRecords records)) + if (_consumedRecords.TryDequeue(out KNetConsumerRecords records)) { - System.Threading.Interlocked.Increment(ref dequeing); + System.Threading.Interlocked.Increment(ref _dequeing); try { foreach (var item in records) @@ -241,14 +242,14 @@ void ConsumeHandler(object o) catch { } finally { - System.Threading.Interlocked.Decrement(ref dequeing); + System.Threading.Interlocked.Decrement(ref _dequeing); } } - else if (threadRunning) + else if (_threadRunning) { - lock (consumedRecords) + lock (_consumedRecords) { - System.Threading.Monitor.Wait(consumedRecords); + System.Threading.Monitor.Wait(_consumedRecords); } } } @@ -256,25 +257,30 @@ void ConsumeHandler(object o) catch { } } /// - public bool IsCompleting => !consumedRecords.IsEmpty || System.Threading.Interlocked.Read(ref dequeing) != 0; + public bool IsCompleting => !_consumedRecords.IsEmpty || System.Threading.Interlocked.Read(ref _dequeing) != 0; /// - public bool IsEmpty => consumedRecords.IsEmpty; + public bool IsEmpty => _consumedRecords.IsEmpty; /// - public int WaitingMessages => consumedRecords.Count; + public int WaitingMessages => _consumedRecords.Count; /// - public void ConsumeAsync(long timeoutMs) + public bool ConsumeAsync(long timeoutMs) { Duration duration = TimeSpan.FromMilliseconds(timeoutMs); - if (consumedRecords == null) throw new ArgumentException("Cannot be used since constructor was called with useJVMCallback set to true."); - if (!threadRunning) throw new InvalidOperationException("Dispatching thread is not running."); + if (_consumedRecords == null) throw new ArgumentException("Cannot be used since constructor was called with useJVMCallback set to true."); + if (!_threadRunning) throw new InvalidOperationException("Dispatching thread is not running."); try { var results = this.Poll(duration); - consumedRecords.Enqueue(results); - lock (consumedRecords) + bool isEmpty = results.IsEmpty; + if (!isEmpty) { - System.Threading.Monitor.Pulse(consumedRecords); + _consumedRecords.Enqueue(results); + lock (_consumedRecords) + { + System.Threading.Monitor.Pulse(_consumedRecords); + } } + return !isEmpty; } finally { @@ -285,7 +291,7 @@ public void ConsumeAsync(long timeoutMs) public void Consume(long timeoutMs, Action> callback) { Duration duration = TimeSpan.FromMilliseconds(timeoutMs); - if (consumerCallback == null) throw new ArgumentException("Cannot be used since constructor was called with useJVMCallback set to false."); + if (_consumerCallback == null) throw new ArgumentException("Cannot be used since constructor was called with useJVMCallback set to false."); try { actionCallback = callback; diff --git a/src/net/KNet/Specific/Consumer/KNetConsumerRecord.cs b/src/net/KNet/Specific/Consumer/KNetConsumerRecord.cs index 4ccff2baff..41ab037058 100644 --- a/src/net/KNet/Specific/Consumer/KNetConsumerRecord.cs +++ b/src/net/KNet/Specific/Consumer/KNetConsumerRecord.cs @@ -47,6 +47,8 @@ public KNetConsumerRecord(ConsumerRecord record, IKNetDeserializ } /// public string Topic => _record.Topic(); + /// + public int? LeaderEpoch { get { var epoch = _record.LeaderEpoch(); return epoch.IsEmpty() ? null : epoch.Get(); } } /// public int Partition => _record.Partition(); /// diff --git a/src/net/KNet/Specific/Consumer/KNetConsumerRecords.cs b/src/net/KNet/Specific/Consumer/KNetConsumerRecords.cs index 0ece4ee60c..d823a63897 100644 --- a/src/net/KNet/Specific/Consumer/KNetConsumerRecords.cs +++ b/src/net/KNet/Specific/Consumer/KNetConsumerRecords.cs @@ -45,6 +45,14 @@ public KNetConsumerRecords(ConsumerRecords records, IKNetDeseria _keyDeserializer = keyDeserializer; _valueDeserializer = valueDeserializer; } + /// + /// if the is empty + /// + public bool IsEmpty => _records.IsEmpty(); + /// + /// The number of elements in + /// + public int Count => _records.Count(); IEnumerator> IEnumerable>.GetEnumerator() { diff --git a/src/net/KNet/Specific/Replicator/KNetCompactedReplicator.cs b/src/net/KNet/Specific/Replicator/KNetCompactedReplicator.cs index 829a2045a7..f554453269 100644 --- a/src/net/KNet/Specific/Replicator/KNetCompactedReplicator.cs +++ b/src/net/KNet/Specific/Replicator/KNetCompactedReplicator.cs @@ -164,6 +164,10 @@ public interface IKNetCompactedReplicator : IDictionary short ReplicationFactor { get; } /// + /// Get or set the poll timeout to be used for + /// + long ConsumePollTimeout { get; } + /// /// Get or set to use when topic is created for the first time /// TopicConfigBuilder TopicConfig { get; } @@ -241,6 +245,10 @@ public interface IKNetCompactedReplicator : IDictionary : IKNetCompactedReplicator where TValue : class { + const long InitialLagState = -2; + const long NotPresentLagState = -1; + const long InvalidLagState = -3; + #region Local storage data interface ILocalDataStorage @@ -453,7 +461,7 @@ public override void OnPartitionsLost(Java.Util.Collection _dictionary = new ConcurrentDictionary(); private KNetCompactedConsumerRebalanceListener[] _consumerListeners = null; private KNetConsumer[] _consumers = null; @@ -465,6 +473,7 @@ public override void OnPartitionsLost(Java.Util.Collection _OnConsumeSyncWaiter = null; private System.Collections.Generic.Dictionary> _consumerAssociatedPartition = new(); private ManualResetEvent[] _assignmentWaiters; + private bool[] _assignmentWaitersStatus; private long[] _lastPartitionLags = null; private IKNetSerDes _keySerDes = null; @@ -531,6 +541,9 @@ public Func, bool, KeyValuePair public short ReplicationFactor { get { return _replicationFactor; } set { CheckStarted(); _replicationFactor = value; } } + /// + public long ConsumePollTimeout { get { return _consumePollTimeout; } set { CheckStarted(); _consumePollTimeout = value; } } + /// public TopicConfigBuilder TopicConfig { get { return _topicConfig; } set { CheckStarted(); _topicConfig = value; } } @@ -629,6 +642,7 @@ private void OnTopicPartitionsAssigned(KNetCompactedConsumerRebalanceListener li } if (!_assignmentWaiters[partition].SafeWaitHandle.IsClosed) { + lock (_assignmentWaitersStatus) { _assignmentWaitersStatus[partition] = true; } _assignmentWaiters[partition].Set(); } } @@ -645,6 +659,7 @@ private void OnTopicPartitionsRevoked(KNetCompactedConsumerRebalanceListener lis } if (!_assignmentWaiters[partition].SafeWaitHandle.IsClosed) { + lock (_assignmentWaitersStatus) { _assignmentWaitersStatus[partition] = false; } _assignmentWaiters[partition].Reset(); } } @@ -661,6 +676,7 @@ private void OnTopicPartitionsLost(KNetCompactedConsumerRebalanceListener listen } if (!_assignmentWaiters[partition].SafeWaitHandle.IsClosed) { + lock (_assignmentWaitersStatus) { _assignmentWaitersStatus[partition] = false; } _assignmentWaiters[partition].Reset(); } } @@ -769,7 +785,10 @@ void BuildConsumers() .WithAllowAutoCreateTopics(false); ConsumerConfig.BootstrapServers = BootstrapServers; - ConsumerConfig.GroupId = GroupId; + if (!ConsumerConfig.ExistProperty(Org.Apache.Kafka.Clients.CommonClientConfigs.GROUP_ID_CONFIG)) + { + ConsumerConfig.GroupId = GroupId; + } if (ConsumerConfig.CanApplyBasicDeserializer() && KeySerDes == null) { KeySerDes = new KNetSerDes(); @@ -785,14 +804,16 @@ void BuildConsumers() if (ValueSerDes == null) throw new InvalidOperationException($"{typeof(TValue)} needs an external deserializer, set ValueSerDes."); _assignmentWaiters = new ManualResetEvent[Partitions]; + _assignmentWaitersStatus = new bool[Partitions]; _lastPartitionLags = new long[Partitions]; _consumers = new KNetConsumer[ConsumersToAllocate()]; _consumerListeners = new KNetCompactedConsumerRebalanceListener[ConsumersToAllocate()]; for (int i = 0; i < Partitions; i++) { - _lastPartitionLags[i] = -2; + _lastPartitionLags[i] = InitialLagState; _assignmentWaiters[i] = new ManualResetEvent(false); + _assignmentWaitersStatus[i] = false; } for (int i = 0; i < ConsumersToAllocate(); i++) @@ -847,38 +868,42 @@ void BuildProducer() void ConsumerPollHandler(object o) { + bool firstExecution = false; int index = (int)o; var topics = Collections.Singleton(StateName); try { _consumers[index].Subscribe(topics, _consumerListeners[index]); + _consumerPollThreadWaiter[index].Set(); while (_consumerPollRun) { try { - _consumers[index].ConsumeAsync(100); + _consumers[index].ConsumeAsync(ConsumePollTimeout); + if (!firstExecution) { _consumers[index].GroupMetadata(); firstExecution = true; } lock (_consumerAssociatedPartition) { foreach (var partitionIndex in _consumerAssociatedPartition[index]) { bool execute = false; if (_assignmentWaiters[partitionIndex].SafeWaitHandle.IsClosed) continue; - else execute = _assignmentWaiters[partitionIndex].WaitOne(0); + else execute = _assignmentWaitersStatus[partitionIndex]; if (execute) { try { var lag = _consumers[index].CurrentLag(new TopicPartition(StateName, partitionIndex)); - Interlocked.Exchange(ref _lastPartitionLags[partitionIndex], lag.IsPresent() ? lag.AsLong : -1); + Interlocked.Exchange(ref _lastPartitionLags[partitionIndex], lag.IsPresent() ? lag.AsLong : NotPresentLagState); } catch (Java.Lang.IllegalStateException) { - Interlocked.Exchange(ref _lastPartitionLags[partitionIndex], -1); + Interlocked.Exchange(ref _lastPartitionLags[partitionIndex], InvalidLagState); } } } } } + catch (WakeupException) { return; } catch { } } } @@ -901,8 +926,8 @@ public void Start() if (ConsumerInstances > Partitions) throw new InvalidOperationException("ConsumerInstances cannot be high than Partitions"); - Properties props = AdminClientConfigBuilder.Create().WithBootstrapServers(BootstrapServers).ToProperties(); - _admin = KafkaAdminClient.Create(props); + using Properties props = AdminClientConfigBuilder.Create().WithBootstrapServers(BootstrapServers).ToProperties(); + using var admin = KafkaAdminClient.Create(props); if (AccessRights.HasFlag(AccessRightsType.Write)) { @@ -916,7 +941,7 @@ public void Start() topic = topic.Configs(TopicConfig); try { - _admin.CreateTopic(topic); + admin.CreateTopic(topic); } catch (TopicExistsException) { @@ -924,7 +949,7 @@ public void Start() // recover partitions of the topic try { - var result = _admin.DescribeTopics(topics); + var result = admin.DescribeTopics(topics); if (result != null) { var map = result.AllTopicNames().Get(); @@ -935,15 +960,10 @@ public void Start() } } } - catch - { - - } - finally - { - topics?.Dispose(); - } + catch { } + finally { topics?.Dispose(); } } + finally { topic?.Dispose(); } } if (AccessRights.HasFlag(AccessRightsType.Read)) @@ -960,11 +980,22 @@ public void Start() { _consumerPollRun = true; _consumerPollThreads = new Thread[ConsumersToAllocate()]; + _consumerPollThreadWaiter = new ManualResetEvent[ConsumersToAllocate()]; for (int i = 0; i < ConsumersToAllocate(); i++) { + _consumerPollThreadWaiter[i] = new ManualResetEvent(false); _consumerPollThreads[i] = new Thread(ConsumerPollHandler); _consumerPollThreads[i].Start(i); } + if (WaitHandle.WaitAll(_consumerPollThreadWaiter)) + { + for (int i = 0; i < _consumerPollThreadWaiter.Length; i++) + { + _consumerPollThreadWaiter[i].Dispose(); + _consumerPollThreadWaiter[i] = null; + } + _consumerPollThreadWaiter = null; + } } _started = true; @@ -984,7 +1015,17 @@ public bool WaitForStateAssignment(int timeout = Timeout.Infinite) { ValidateAccessRights(AccessRightsType.Read); ValidateStarted(); - return WaitHandle.WaitAll(_assignmentWaiters, timeout); + bool status = false; + do + { + if (WaitHandle.WaitAll(_assignmentWaiters, timeout)) + { + status = _assignmentWaitersStatus.All((o) => o == true); + } + else break; + } + while (!status); + return status; } /// @@ -1003,9 +1044,9 @@ public bool SyncWait(int timeout = Timeout.Infinite) foreach (var partitionIndex in _consumerAssociatedPartition[i]) { var partitionLag = Interlocked.Read(ref _lastPartitionLags[partitionIndex]); - lagInSync &= partitionLag == 0 || partitionLag == -1; + lagInSync &= partitionLag == 0; // || partitionLag == NotPresentLagState; } - syncs[i] = _consumers[i].IsEmpty && lagInSync; + syncs[i] = _consumers[i].IsEmpty && !_consumers[i].IsCompleting && lagInSync; } sync = syncs.All(x => x == true); } @@ -1297,6 +1338,21 @@ public void Dispose() } #endregion + + #region Object methods + /// + public override string ToString() + { + System.Text.StringBuilder stringBuilder = new(); + for (int i = 0; i < _lastPartitionLags.Length; i++) + { + stringBuilder.AppendFormat("P{0}: {1} ", i, Interlocked.Read(ref _lastPartitionLags[i])); + } + + return $"StateName: {StateName} - Current elements: {Count} - Consumers: {ConsumersToAllocate()} - Partitions: {Partitions} - Lags: {stringBuilder.ToString()}"; + } + + #endregion } #endregion }