Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extra fix for Java.Lang.NullPointerException #257

Merged
merged 2 commits into from
Oct 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion src/documentation/articles/usage.md
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,8 @@ namespace MASES.KNetTemplate.KNetConsumer

using (var consumer = new KafkaConsumer<string, string>(props))
{
consumer.Subscribe(Collections.singleton(topicToUse));
var topics = Collections.Singleton(topicToUse);
consumer.Subscribe(topics);
while (!resetEvent.WaitOne(0))
{
var records = consumer.Poll((long)TimeSpan.FromMilliseconds(200).TotalMilliseconds);
Expand All @@ -293,6 +294,7 @@ namespace MASES.KNetTemplate.KNetConsumer
Console.WriteLine($"Offset = {item.Offset}, Key = {item.Key}, Value = {item.Value}");
}
}
topics?.Dispose(); // needed to avoid Java.Lang.NullPointerException in some conditions where .NET GC retires topics too early
}
}

Expand Down
15 changes: 11 additions & 4 deletions src/net/KNet/Specific/Consumer/KNetConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -267,11 +267,18 @@ public void ConsumeAsync(long timeoutMs)
Duration duration = TimeSpan.FromMilliseconds(timeoutMs);
if (consumedRecords == null) throw new ArgumentException("Cannot be used since constructor was called with useJVMCallback set to true.");
if (!threadRunning) throw new InvalidOperationException("Dispatching thread is not running.");
var results = this.Poll(duration);
consumedRecords.Enqueue(results);
lock (consumedRecords)
try
{
System.Threading.Monitor.Pulse(consumedRecords);
var results = this.Poll(duration);
consumedRecords.Enqueue(results);
lock (consumedRecords)
{
System.Threading.Monitor.Pulse(consumedRecords);
}
}
finally
{
duration?.Dispose();
}
}
/// <inheritdoc cref="IKNetConsumer{K, V}.Consume(long, Action{KNetConsumerRecord{K, V}})"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,12 @@ public static async Task<DescribeTopicsResult> DescribeTopicsAsync(this IAdmin a
/// </summary>
public static async Task<DescribeTopicsResult> DescribeTopicAsync(this IAdmin admin, string topicName)
{
return await Execute(admin.DescribeTopics, Collections.Singleton(topicName));
var topics = Collections.Singleton(topicName);
try
{
return await Execute(admin.DescribeTopics, topics);
}
finally { topics?.Dispose(); }
}
/// <summary>
/// Async version of <see cref="IAdmin.DescribeTopics(Collection{string}, DescribeTopicsOptions)"/>
Expand Down
101 changes: 64 additions & 37 deletions src/net/KNet/Specific/Replicator/KNetCompactedReplicator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
* Refer to LICENSE for more information.
*/

using Java.Time;
using Java.Util;
using MASES.JCOBridge.C2JBridge;
using MASES.KNet.Admin;
Expand Down Expand Up @@ -175,13 +176,13 @@ public interface IKNetCompactedReplicator<TKey, TValue> : IDictionary<TKey, TVal
/// </summary>
ProducerConfigBuilder ProducerConfig { get; }
/// <summary>
/// Get or set <see cref="KNetSerDes{TKey}"/> to use in <see cref="KNetCompactedReplicator{TKey, TValue}"/>, by default it creates a default one based on <typeparamref name="TKey"/>
/// Get or set an instance of <see cref="IKNetSerDes{TKey}"/> to use in <see cref="KNetCompactedReplicator{TKey, TValue}"/>, by default it creates a default one based on <typeparamref name="TKey"/>
/// </summary>
KNetSerDes<TKey> KeySerDes { get; }
IKNetSerDes<TKey> KeySerDes { get; }
/// <summary>
/// Get or set <see cref="KNetSerDes{TValue}"/> to use in <see cref="KNetCompactedReplicator{TKey, TValue}"/>, by default it creates a default one based on <typeparamref name="TValue"/>
/// Get or set an instance of <see cref="IKNetSerDes{TValue}"/> to use in <see cref="KNetCompactedReplicator{TKey, TValue}"/>, by default it creates a default one based on <typeparamref name="TValue"/>
/// </summary>
KNetSerDes<TValue> ValueSerDes { get; }
IKNetSerDes<TValue> ValueSerDes { get; }
/// <summary>
/// <see langword="true"/> if the instance was started
/// </summary>
Expand Down Expand Up @@ -383,17 +384,28 @@ public void CopyTo(KeyValuePair<TKey, TValue>[] array, int arrayIndex)
static void OnDemandRetrieve(IKNetConsumer<TKey, TValue> consumer, string topic, TKey key, ILocalDataStorage data)
{
var topicPartition = new TopicPartition(topic, data.Partition);
consumer.Assign(Collections.SingletonList(topicPartition));
consumer.Seek(topicPartition, data.Offset);
var results = consumer.Poll(TimeSpan.FromMinutes(1));
if (results == null) throw new InvalidOperationException("Failed to get records from remote.");
foreach (var result in results)
var topics = Collections.SingletonList(topicPartition);
Duration duration = TimeSpan.FromMinutes(1);
try
{
if (!Equals(result.Key, key)) continue;
if (data.Offset != result.Offset) throw new IndexOutOfRangeException($"Requested offset is {data.Offset} while received offset is {result.Offset}");
data.HasValue = true;
data.Value = result.Value;
break;
consumer.Assign(topics);
consumer.Seek(topicPartition, data.Offset);
var results = consumer.Poll(duration);
if (results == null) throw new InvalidOperationException("Failed to get records from remote.");
foreach (var result in results)
{
if (!Equals(result.Key, key)) continue;
if (data.Offset != result.Offset) throw new IndexOutOfRangeException($"Requested offset is {data.Offset} while received offset is {result.Offset}");
data.HasValue = true;
data.Value = result.Value;
break;
}
}
finally
{
topicPartition?.Dispose();
topics?.Dispose();
duration?.Dispose();
}
}
}
Expand Down Expand Up @@ -464,8 +476,8 @@ public override void OnPartitionsLost(Java.Util.Collection<Org.Apache.Kafka.Comm
private ManualResetEvent[] _assignmentWaiters;
private long[] _lastPartitionLags = null;

private KNetSerDes<TKey> _keySerDes = null;
private KNetSerDes<TValue> _valueSerDes = null;
private IKNetSerDes<TKey> _keySerDes = null;
private IKNetSerDes<TValue> _valueSerDes = null;

private bool _started = false;

Expand Down Expand Up @@ -529,10 +541,10 @@ public Func<IKNetCompactedReplicator<TKey, TValue>, bool, KeyValuePair<TKey, TVa
public ProducerConfigBuilder ProducerConfig { get { return _producerConfig; } set { CheckStarted(); _producerConfig = value; } }

/// <inheritdoc cref="IKNetCompactedReplicator{TKey, TValue}.KeySerDes"/>
public KNetSerDes<TKey> KeySerDes { get { return _keySerDes; } set { CheckStarted(); _keySerDes = value; } }
public IKNetSerDes<TKey> KeySerDes { get { return _keySerDes; } set { CheckStarted(); _keySerDes = value; } }

/// <inheritdoc cref="IKNetCompactedReplicator{TKey, TValue}.ValueSerDes"/>
public KNetSerDes<TValue> ValueSerDes { get { return _valueSerDes; } set { CheckStarted(); _valueSerDes = value; } }
public IKNetSerDes<TValue> ValueSerDes { get { return _valueSerDes; } set { CheckStarted(); _valueSerDes = value; } }

/// <inheritdoc cref="IKNetCompactedReplicator{TKey, TValue}.IsStarted"/>
public bool IsStarted => _started;
Expand Down Expand Up @@ -820,35 +832,45 @@ void BuildProducer()
void ConsumerPollHandler(object o)
{
int index = (int)o;
_consumers[index].Subscribe(Collections.Singleton(StateName), _consumerListeners[index]);
while (_consumerPollRun)
var topics = Collections.Singleton(StateName);
try
{
try
_consumers[index].Subscribe(topics, _consumerListeners[index]);
while (_consumerPollRun)
{
_consumers[index].ConsumeAsync(100);
lock (_consumerAssociatedPartition)
try
{
foreach (var partitionIndex in _consumerAssociatedPartition[index])
_consumers[index].ConsumeAsync(100);
lock (_consumerAssociatedPartition)
{
bool execute = false;
if (_assignmentWaiters[partitionIndex].SafeWaitHandle.IsClosed) continue;
else execute = _assignmentWaiters[partitionIndex].WaitOne(0);
if (execute)
foreach (var partitionIndex in _consumerAssociatedPartition[index])
{
try
{
var lag = _consumers[index].CurrentLag(new TopicPartition(StateName, partitionIndex));
Interlocked.Exchange(ref _lastPartitionLags[partitionIndex], lag.IsPresent() ? lag.AsLong : -1);
}
catch (Java.Lang.IllegalStateException)
bool execute = false;
if (_assignmentWaiters[partitionIndex].SafeWaitHandle.IsClosed) continue;
else execute = _assignmentWaiters[partitionIndex].WaitOne(0);
if (execute)
{
Interlocked.Exchange(ref _lastPartitionLags[partitionIndex], -1);
try
{
var lag = _consumers[index].CurrentLag(new TopicPartition(StateName, partitionIndex));
Interlocked.Exchange(ref _lastPartitionLags[partitionIndex], lag.IsPresent() ? lag.AsLong : -1);
}
catch (Java.Lang.IllegalStateException)
{
Interlocked.Exchange(ref _lastPartitionLags[partitionIndex], -1);
}
}
}
}
}
catch { }
}
catch { }
}
catch { }
finally
{
_consumers[index].Unsubscribe();
topics?.Dispose();
}
}

Expand Down Expand Up @@ -882,10 +904,11 @@ public void Start()
}
catch (TopicExistsException)
{
var topics = Collections.Singleton(StateName);
// recover partitions of the topic
try
{
var result = _admin.DescribeTopics(Collections.Singleton(StateName));
var result = _admin.DescribeTopics(topics);
if (result != null)
{
var map = result.AllTopicNames().Get();
Expand All @@ -900,6 +923,10 @@ public void Start()
{

}
finally
{
topics?.Dispose();
}
}
}

Expand Down
12 changes: 9 additions & 3 deletions tests/KNetBenchmark/ProgramKNet.cs
Original file line number Diff line number Diff line change
Expand Up @@ -234,10 +234,11 @@ static Stopwatch ConsumeKNet(int testNum, string topicName, int length, int nump
};

var consumer = KNetConsumer();
var topics = Collections.Singleton(topicName);
try
{
int counter = 0;
consumer.Subscribe(Collections.Singleton(topicName), rebalanceListener);
consumer.Subscribe(topics, rebalanceListener);
consumer.SetCallback((message) =>
{
if (CheckOnConsume)
Expand Down Expand Up @@ -268,6 +269,7 @@ static Stopwatch ConsumeKNet(int testNum, string topicName, int length, int nump
{
if (!SharedObjects) consumer.Dispose();
rebalanceListener?.Dispose();
topics?.Dispose();
}
}
catch (Java.Util.Concurrent.ExecutionException ex)
Expand Down Expand Up @@ -296,10 +298,11 @@ static Stopwatch ConsumeProduceKNet(string topicName, int length, int numpacket,

var consumer = KNetConsumer();
var producer = KNetProducer();
var topics = Collections.Singleton(topicName);
try
{
int counter = 0;
consumer.Subscribe(Collections.Singleton(topicName), rebalanceListener);
consumer.Subscribe(topics, rebalanceListener);
while (true)
{
var records = consumer.Poll(TimeSpan.FromMinutes(1));
Expand Down Expand Up @@ -331,6 +334,7 @@ static Stopwatch ConsumeProduceKNet(string topicName, int length, int numpacket,
consumer.Dispose();
producer.Dispose();
}
topics?.Dispose();
}
}
catch (Java.Util.Concurrent.ExecutionException ex)
Expand All @@ -351,6 +355,7 @@ static Stopwatch ConsumeProduceKNet(string topicName, int length, int numpacket,
System.Threading.Thread thread = new System.Threading.Thread(() =>
{
ConsumerRebalanceListener rebalanceListener = null;
var topics = Collections.Singleton(topicName);
try
{
rebalanceListener = new()
Expand All @@ -365,7 +370,7 @@ static Stopwatch ConsumeProduceKNet(string topicName, int length, int numpacket,
startEvent.Set();
}
};
consumer.Subscribe(Collections.Singleton(topicName), rebalanceListener);
consumer.Subscribe(topics, rebalanceListener);
Java.Time.Duration duration = TimeSpan.FromSeconds(1);
int counter = 0;
while (true)
Expand Down Expand Up @@ -399,6 +404,7 @@ static Stopwatch ConsumeProduceKNet(string topicName, int length, int numpacket,
consumer.Dispose();
}
startEvent.Set();
topics?.Dispose();
}
});

Expand Down
14 changes: 10 additions & 4 deletions tests/KNetBenchmark/ProgramKafka.cs
Original file line number Diff line number Diff line change
Expand Up @@ -265,11 +265,12 @@ static Stopwatch ConsumeKafka(int testNum, string topicName, int length, int num
};

var consumer = KafkaConsumer();
Java.Time.Duration duration = TimeSpan.FromMinutes(1);
var topics = Collections.Singleton(topicName);
try
{
Java.Time.Duration duration = TimeSpan.FromMinutes(1);
int counter = 0;
consumer.Subscribe(Collections.Singleton(topicName), rebalanceListener);
consumer.Subscribe(topics, rebalanceListener);
while (true)
{
var records = consumer.Poll(duration);
Expand Down Expand Up @@ -318,6 +319,8 @@ static Stopwatch ConsumeKafka(int testNum, string topicName, int length, int num
{
if (!SharedObjects) consumer.Dispose();
rebalanceListener?.Dispose();
duration?.Dispose();
topics?.Dispose();
}
}
catch (Java.Util.Concurrent.ExecutionException ex)
Expand All @@ -337,6 +340,8 @@ static Stopwatch ConsumeKafka(int testNum, string topicName, int length, int num

System.Threading.Thread thread = new System.Threading.Thread(() =>
{
Java.Time.Duration duration = TimeSpan.FromSeconds(1);
var topics = Collections.Singleton(topicName);
ConsumerRebalanceListener rebalanceListener = null;
try
{
Expand All @@ -353,8 +358,7 @@ static Stopwatch ConsumeKafka(int testNum, string topicName, int length, int num
}
};

consumer.Subscribe(Collections.Singleton(topicName), rebalanceListener);
Java.Time.Duration duration = TimeSpan.FromSeconds(1);
consumer.Subscribe(topics, rebalanceListener);
int counter = 0;
while (true)
{
Expand Down Expand Up @@ -402,6 +406,8 @@ static Stopwatch ConsumeKafka(int testNum, string topicName, int length, int num
consumer.Dispose();
}
startEvent.Set();
duration?.Dispose();
topics?.Dispose();
}
});
thread.Start();
Expand Down
Loading