From 5125d0d54754f6ccce6de4c206c75c8d0f5b2e80 Mon Sep 17 00:00:00 2001 From: masesdevelopers <94312179+masesdevelopers@users.noreply.github.com> Date: Mon, 9 Oct 2023 21:19:09 +0200 Subject: [PATCH] Added some new fix for Java.Lang.NullPointerException --- src/documentation/articles/usage.md | 4 +- .../KNet/Specific/Consumer/KNetConsumer.cs | 15 ++- .../Extensions/KafkaAdminClientExtensions.cs | 7 +- .../Replicator/KNetCompactedReplicator.cs | 101 +++++++++++------- tests/KNetBenchmark/ProgramKNet.cs | 12 ++- tests/KNetBenchmark/ProgramKafka.cs | 14 ++- tests/KNetClassicTest/Program.cs | 17 ++- tests/KNetTest/Program.cs | 6 +- 8 files changed, 120 insertions(+), 56 deletions(-) diff --git a/src/documentation/articles/usage.md b/src/documentation/articles/usage.md index a6f5cb2e89..0fa02b3ebf 100644 --- a/src/documentation/articles/usage.md +++ b/src/documentation/articles/usage.md @@ -263,7 +263,8 @@ namespace MASES.KNetTemplate.KNetConsumer using (var consumer = new KafkaConsumer(props)) { - consumer.Subscribe(Collections.singleton(topicToUse)); + var topics = Collections.Singleton(topicToUse); + consumer.Subscribe(topics); while (!resetEvent.WaitOne(0)) { var records = consumer.Poll((long)TimeSpan.FromMilliseconds(200).TotalMilliseconds); @@ -272,6 +273,7 @@ namespace MASES.KNetTemplate.KNetConsumer Console.WriteLine($"Offset = {item.Offset}, Key = {item.Key}, Value = {item.Value}"); } } + topics?.Dispose(); // needed to avoid Java.Lang.NullPointerException in some conditions where .NET GC retires topics too early } } diff --git a/src/net/KNet/Specific/Consumer/KNetConsumer.cs b/src/net/KNet/Specific/Consumer/KNetConsumer.cs index dae85e7262..3826857ff4 100644 --- a/src/net/KNet/Specific/Consumer/KNetConsumer.cs +++ b/src/net/KNet/Specific/Consumer/KNetConsumer.cs @@ -267,11 +267,18 @@ public void ConsumeAsync(long timeoutMs) Duration duration = TimeSpan.FromMilliseconds(timeoutMs); if (consumedRecords == null) throw new ArgumentException("Cannot be used since constructor was called with useJVMCallback set to true."); if (!threadRunning) throw new InvalidOperationException("Dispatching thread is not running."); - var results = this.Poll(duration); - consumedRecords.Enqueue(results); - lock (consumedRecords) + try { - System.Threading.Monitor.Pulse(consumedRecords); + var results = this.Poll(duration); + consumedRecords.Enqueue(results); + lock (consumedRecords) + { + System.Threading.Monitor.Pulse(consumedRecords); + } + } + finally + { + duration?.Dispose(); } } /// diff --git a/src/net/KNet/Specific/Extensions/KafkaAdminClientExtensions.cs b/src/net/KNet/Specific/Extensions/KafkaAdminClientExtensions.cs index 3fd859db73..297d2593de 100644 --- a/src/net/KNet/Specific/Extensions/KafkaAdminClientExtensions.cs +++ b/src/net/KNet/Specific/Extensions/KafkaAdminClientExtensions.cs @@ -292,7 +292,12 @@ public static async Task DescribeTopicsAsync(this IAdmin a /// public static async Task DescribeTopicAsync(this IAdmin admin, string topicName) { - return await Execute(admin.DescribeTopics, Collections.Singleton(topicName)); + var topics = Collections.Singleton(topicName); + try + { + return await Execute(admin.DescribeTopics, topics); + } + finally { topics?.Dispose(); } } /// /// Async version of diff --git a/src/net/KNet/Specific/Replicator/KNetCompactedReplicator.cs b/src/net/KNet/Specific/Replicator/KNetCompactedReplicator.cs index b2ffbc4b23..ccecd4b4cd 100644 --- a/src/net/KNet/Specific/Replicator/KNetCompactedReplicator.cs +++ b/src/net/KNet/Specific/Replicator/KNetCompactedReplicator.cs @@ -16,6 +16,7 @@ * Refer to LICENSE for more information. */ +using Java.Time; using Java.Util; using MASES.JCOBridge.C2JBridge; using MASES.KNet.Admin; @@ -175,13 +176,13 @@ public interface IKNetCompactedReplicator : IDictionary ProducerConfigBuilder ProducerConfig { get; } /// - /// Get or set to use in , by default it creates a default one based on + /// Get or set an instance of to use in , by default it creates a default one based on /// - KNetSerDes KeySerDes { get; } + IKNetSerDes KeySerDes { get; } /// - /// Get or set to use in , by default it creates a default one based on + /// Get or set an instance of to use in , by default it creates a default one based on /// - KNetSerDes ValueSerDes { get; } + IKNetSerDes ValueSerDes { get; } /// /// if the instance was started /// @@ -383,17 +384,28 @@ public void CopyTo(KeyValuePair[] array, int arrayIndex) static void OnDemandRetrieve(IKNetConsumer consumer, string topic, TKey key, ILocalDataStorage data) { var topicPartition = new TopicPartition(topic, data.Partition); - consumer.Assign(Collections.SingletonList(topicPartition)); - consumer.Seek(topicPartition, data.Offset); - var results = consumer.Poll(TimeSpan.FromMinutes(1)); - if (results == null) throw new InvalidOperationException("Failed to get records from remote."); - foreach (var result in results) + var topics = Collections.SingletonList(topicPartition); + Duration duration = TimeSpan.FromMinutes(1); + try { - if (!Equals(result.Key, key)) continue; - if (data.Offset != result.Offset) throw new IndexOutOfRangeException($"Requested offset is {data.Offset} while received offset is {result.Offset}"); - data.HasValue = true; - data.Value = result.Value; - break; + consumer.Assign(topics); + consumer.Seek(topicPartition, data.Offset); + var results = consumer.Poll(duration); + if (results == null) throw new InvalidOperationException("Failed to get records from remote."); + foreach (var result in results) + { + if (!Equals(result.Key, key)) continue; + if (data.Offset != result.Offset) throw new IndexOutOfRangeException($"Requested offset is {data.Offset} while received offset is {result.Offset}"); + data.HasValue = true; + data.Value = result.Value; + break; + } + } + finally + { + topicPartition?.Dispose(); + topics?.Dispose(); + duration?.Dispose(); } } } @@ -464,8 +476,8 @@ public override void OnPartitionsLost(Java.Util.Collection _keySerDes = null; - private KNetSerDes _valueSerDes = null; + private IKNetSerDes _keySerDes = null; + private IKNetSerDes _valueSerDes = null; private bool _started = false; @@ -529,10 +541,10 @@ public Func, bool, KeyValuePair - public KNetSerDes KeySerDes { get { return _keySerDes; } set { CheckStarted(); _keySerDes = value; } } + public IKNetSerDes KeySerDes { get { return _keySerDes; } set { CheckStarted(); _keySerDes = value; } } /// - public KNetSerDes ValueSerDes { get { return _valueSerDes; } set { CheckStarted(); _valueSerDes = value; } } + public IKNetSerDes ValueSerDes { get { return _valueSerDes; } set { CheckStarted(); _valueSerDes = value; } } /// public bool IsStarted => _started; @@ -820,35 +832,45 @@ void BuildProducer() void ConsumerPollHandler(object o) { int index = (int)o; - _consumers[index].Subscribe(Collections.Singleton(StateName), _consumerListeners[index]); - while (_consumerPollRun) + var topics = Collections.Singleton(StateName); + try { - try + _consumers[index].Subscribe(topics, _consumerListeners[index]); + while (_consumerPollRun) { - _consumers[index].ConsumeAsync(100); - lock (_consumerAssociatedPartition) + try { - foreach (var partitionIndex in _consumerAssociatedPartition[index]) + _consumers[index].ConsumeAsync(100); + lock (_consumerAssociatedPartition) { - bool execute = false; - if (_assignmentWaiters[partitionIndex].SafeWaitHandle.IsClosed) continue; - else execute = _assignmentWaiters[partitionIndex].WaitOne(0); - if (execute) + foreach (var partitionIndex in _consumerAssociatedPartition[index]) { - try - { - var lag = _consumers[index].CurrentLag(new TopicPartition(StateName, partitionIndex)); - Interlocked.Exchange(ref _lastPartitionLags[partitionIndex], lag.IsPresent() ? lag.AsLong : -1); - } - catch (Java.Lang.IllegalStateException) + bool execute = false; + if (_assignmentWaiters[partitionIndex].SafeWaitHandle.IsClosed) continue; + else execute = _assignmentWaiters[partitionIndex].WaitOne(0); + if (execute) { - Interlocked.Exchange(ref _lastPartitionLags[partitionIndex], -1); + try + { + var lag = _consumers[index].CurrentLag(new TopicPartition(StateName, partitionIndex)); + Interlocked.Exchange(ref _lastPartitionLags[partitionIndex], lag.IsPresent() ? lag.AsLong : -1); + } + catch (Java.Lang.IllegalStateException) + { + Interlocked.Exchange(ref _lastPartitionLags[partitionIndex], -1); + } } } } } + catch { } } - catch { } + } + catch { } + finally + { + _consumers[index].Unsubscribe(); + topics?.Dispose(); } } @@ -882,10 +904,11 @@ public void Start() } catch (TopicExistsException) { + var topics = Collections.Singleton(StateName); // recover partitions of the topic try { - var result = _admin.DescribeTopics(Collections.Singleton(StateName)); + var result = _admin.DescribeTopics(topics); if (result != null) { var map = result.AllTopicNames().Get(); @@ -900,6 +923,10 @@ public void Start() { } + finally + { + topics?.Dispose(); + } } } diff --git a/tests/KNetBenchmark/ProgramKNet.cs b/tests/KNetBenchmark/ProgramKNet.cs index f253cb0ea0..70e64df572 100644 --- a/tests/KNetBenchmark/ProgramKNet.cs +++ b/tests/KNetBenchmark/ProgramKNet.cs @@ -234,10 +234,11 @@ static Stopwatch ConsumeKNet(int testNum, string topicName, int length, int nump }; var consumer = KNetConsumer(); + var topics = Collections.Singleton(topicName); try { int counter = 0; - consumer.Subscribe(Collections.Singleton(topicName), rebalanceListener); + consumer.Subscribe(topics, rebalanceListener); consumer.SetCallback((message) => { if (CheckOnConsume) @@ -268,6 +269,7 @@ static Stopwatch ConsumeKNet(int testNum, string topicName, int length, int nump { if (!SharedObjects) consumer.Dispose(); rebalanceListener?.Dispose(); + topics?.Dispose(); } } catch (Java.Util.Concurrent.ExecutionException ex) @@ -296,10 +298,11 @@ static Stopwatch ConsumeProduceKNet(string topicName, int length, int numpacket, var consumer = KNetConsumer(); var producer = KNetProducer(); + var topics = Collections.Singleton(topicName); try { int counter = 0; - consumer.Subscribe(Collections.Singleton(topicName), rebalanceListener); + consumer.Subscribe(topics, rebalanceListener); while (true) { var records = consumer.Poll(TimeSpan.FromMinutes(1)); @@ -331,6 +334,7 @@ static Stopwatch ConsumeProduceKNet(string topicName, int length, int numpacket, consumer.Dispose(); producer.Dispose(); } + topics?.Dispose(); } } catch (Java.Util.Concurrent.ExecutionException ex) @@ -351,6 +355,7 @@ static Stopwatch ConsumeProduceKNet(string topicName, int length, int numpacket, System.Threading.Thread thread = new System.Threading.Thread(() => { ConsumerRebalanceListener rebalanceListener = null; + var topics = Collections.Singleton(topicName); try { rebalanceListener = new() @@ -365,7 +370,7 @@ static Stopwatch ConsumeProduceKNet(string topicName, int length, int numpacket, startEvent.Set(); } }; - consumer.Subscribe(Collections.Singleton(topicName), rebalanceListener); + consumer.Subscribe(topics, rebalanceListener); Java.Time.Duration duration = TimeSpan.FromSeconds(1); int counter = 0; while (true) @@ -399,6 +404,7 @@ static Stopwatch ConsumeProduceKNet(string topicName, int length, int numpacket, consumer.Dispose(); } startEvent.Set(); + topics?.Dispose(); } }); diff --git a/tests/KNetBenchmark/ProgramKafka.cs b/tests/KNetBenchmark/ProgramKafka.cs index 8f99d31d37..1d7f7a4258 100644 --- a/tests/KNetBenchmark/ProgramKafka.cs +++ b/tests/KNetBenchmark/ProgramKafka.cs @@ -265,11 +265,12 @@ static Stopwatch ConsumeKafka(int testNum, string topicName, int length, int num }; var consumer = KafkaConsumer(); + Java.Time.Duration duration = TimeSpan.FromMinutes(1); + var topics = Collections.Singleton(topicName); try { - Java.Time.Duration duration = TimeSpan.FromMinutes(1); int counter = 0; - consumer.Subscribe(Collections.Singleton(topicName), rebalanceListener); + consumer.Subscribe(topics, rebalanceListener); while (true) { var records = consumer.Poll(duration); @@ -318,6 +319,8 @@ static Stopwatch ConsumeKafka(int testNum, string topicName, int length, int num { if (!SharedObjects) consumer.Dispose(); rebalanceListener?.Dispose(); + duration?.Dispose(); + topics?.Dispose(); } } catch (Java.Util.Concurrent.ExecutionException ex) @@ -337,6 +340,8 @@ static Stopwatch ConsumeKafka(int testNum, string topicName, int length, int num System.Threading.Thread thread = new System.Threading.Thread(() => { + Java.Time.Duration duration = TimeSpan.FromSeconds(1); + var topics = Collections.Singleton(topicName); ConsumerRebalanceListener rebalanceListener = null; try { @@ -353,8 +358,7 @@ static Stopwatch ConsumeKafka(int testNum, string topicName, int length, int num } }; - consumer.Subscribe(Collections.Singleton(topicName), rebalanceListener); - Java.Time.Duration duration = TimeSpan.FromSeconds(1); + consumer.Subscribe(topics, rebalanceListener); int counter = 0; while (true) { @@ -402,6 +406,8 @@ static Stopwatch ConsumeKafka(int testNum, string topicName, int length, int num consumer.Dispose(); } startEvent.Set(); + duration?.Dispose(); + topics?.Dispose(); } }); thread.Start(); diff --git a/tests/KNetClassicTest/Program.cs b/tests/KNetClassicTest/Program.cs index db48ee2fda..a8f206b8ec 100644 --- a/tests/KNetClassicTest/Program.cs +++ b/tests/KNetClassicTest/Program.cs @@ -82,13 +82,15 @@ private static void Console_CancelKeyPress(object sender, ConsoleCancelEventArgs static void CreateTopic() { + NewTopic topic = null; + Set coll = null; try { string topicName = topicToUse; int partitions = 1; short replicationFactor = 1; - var topic = new NewTopic(topicName, partitions, replicationFactor); + topic = new NewTopic(topicName, partitions, replicationFactor); /**** Direct mode ****** var map = Collections.SingletonMap(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT); @@ -99,7 +101,7 @@ static void CreateTopic() .WithMinCleanableDirtyRatio(0.01) .WithSegmentMs(100)); - var coll = Collections.Singleton(topic); + coll = Collections.Singleton(topic); /**** Direct mode ****** Properties props = new Properties(); @@ -132,6 +134,11 @@ static void CreateTopic() { Console.WriteLine(e.Message); } + finally + { + coll?.Dispose(); + topic?.Dispose(); + } } static void ProduceSomething() @@ -290,12 +297,13 @@ static void ConsumeSomething() } }; } + var topics = Collections.Singleton(topicToUse); try { using (consumer = useSerdes ? new KafkaConsumer(props, keyDeserializer, valueDeserializer) : new KafkaConsumer(props)) { - if (useCallback) consumer.Subscribe(Collections.Singleton(topicToUse), rebalanceListener); - else consumer.Subscribe(Collections.Singleton(topicToUse)); + if (useCallback) consumer.Subscribe(topics, rebalanceListener); + else consumer.Subscribe(topics); while (!resetEvent.WaitOne(0)) { @@ -314,6 +322,7 @@ static void ConsumeSomething() keyDeserializer.Dispose(); valueDeserializer.Dispose(); } + topics?.Dispose(); } } catch (Java.Util.Concurrent.ExecutionException ex) diff --git a/tests/KNetTest/Program.cs b/tests/KNetTest/Program.cs index 6d0d319287..0cd37b7c43 100644 --- a/tests/KNetTest/Program.cs +++ b/tests/KNetTest/Program.cs @@ -266,12 +266,13 @@ static void ConsumeSomething() } }; } + var topics = Collections.Singleton(topicToUse); try { using (consumer = new KNetConsumer(props, keyDeserializer, valueDeserializer)) { - if (useCallback) consumer.Subscribe(Collections.Singleton(topicToUse), rebalanceListener); - else consumer.Subscribe(Collections.Singleton(topicToUse)); + if (useCallback) consumer.Subscribe(topics, rebalanceListener); + else consumer.Subscribe(topics); while (!resetEvent.WaitOne(0)) { @@ -287,6 +288,7 @@ static void ConsumeSomething() { keyDeserializer?.Dispose(); valueDeserializer?.Dispose(); + topics?.Dispose(); } } catch (Java.Util.Concurrent.ExecutionException ex)