Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix other possible Java.Lang.NullPointerException and startup synchronization #260

Merged
merged 7 commits into from
Oct 10, 2023
98 changes: 52 additions & 46 deletions src/net/KNet/Specific/Consumer/KNetConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public interface IKNetConsumer<K, V> : IConsumer<byte[], byte[]>
/// </summary>
bool IsCompleting { get; }
/// <summary>
/// <see langword="true"/> if the <see cref="IKNetConsumer{K, V}"/> instance has an empty setnof items in async operation
/// <see langword="true"/> if the <see cref="IKNetConsumer{K, V}"/> instance has an empty set of items in async operation
/// </summary>
bool IsEmpty { get; }
/// <summary>
Expand Down Expand Up @@ -71,7 +71,8 @@ public interface IKNetConsumer<K, V> : IConsumer<byte[], byte[]>
/// KNet async extension for <see cref="Org.Apache.Kafka.Clients.Consumer.Consumer.Poll(Duration)"/>
/// </summary>
/// <param name="timeoutMs">Timeout in milliseconds</param>
void ConsumeAsync(long timeoutMs);
/// <returns><see langword="true"/> if something was enqued for Async operations</returns>
bool ConsumeAsync(long timeoutMs);
/// <summary>
/// KNet sync extension for <see cref="Org.Apache.Kafka.Clients.Consumer.Consumer.Poll(Duration)"/>
/// </summary>
Expand All @@ -86,12 +87,12 @@ public interface IKNetConsumer<K, V> : IConsumer<byte[], byte[]>
/// <typeparam name="V">The value type</typeparam>
public class KNetConsumer<K, V> : KafkaConsumer<byte[], byte[]>, IKNetConsumer<K, V>
{
readonly bool autoCreateSerDes = false;
bool threadRunning = false;
long dequeing = 0;
readonly System.Threading.Thread consumeThread = null;
readonly ConcurrentQueue<KNetConsumerRecords<K, V>> consumedRecords = null;
readonly KNetConsumerCallback<K, V> consumerCallback = null;
readonly bool _autoCreateSerDes = false;
bool _threadRunning = false;
long _dequeing = 0;
readonly System.Threading.Thread _consumeThread = null;
readonly ConcurrentQueue<KNetConsumerRecords<K, V>> _consumedRecords = null;
readonly KNetConsumerCallback<K, V> _consumerCallback = null;
readonly IKNetDeserializer<K> _keyDeserializer;
readonly IKNetDeserializer<V> _valueDeserializer;
/// <summary>
Expand All @@ -106,19 +107,19 @@ public class KNetConsumer<K, V> : KafkaConsumer<byte[], byte[]>, IKNetConsumer<K
public KNetConsumer(Properties props, bool useJVMCallback = false)
: this(props, new KNetSerDes<K>(), new KNetSerDes<V>(), useJVMCallback)
{
autoCreateSerDes = true;
_autoCreateSerDes = true;

if (useJVMCallback)
{
consumerCallback = new KNetConsumerCallback<K, V>(CallbackMessage, _keyDeserializer, _valueDeserializer);
IExecute("setCallback", consumerCallback);
_consumerCallback = new KNetConsumerCallback<K, V>(CallbackMessage, _keyDeserializer, _valueDeserializer);
IExecute("setCallback", _consumerCallback);
}
else
{
consumedRecords = new();
threadRunning = true;
consumeThread = new(ConsumeHandler);
consumeThread.Start();
_consumedRecords = new();
_threadRunning = true;
_consumeThread = new(ConsumeHandler);
_consumeThread.Start();
}
}
/// <summary>
Expand All @@ -136,15 +137,15 @@ public KNetConsumer(Properties props, IKNetDeserializer<K> keyDeserializer, IKNe

if (useJVMCallback)
{
consumerCallback = new KNetConsumerCallback<K, V>(CallbackMessage, _keyDeserializer, _valueDeserializer);
IExecute("setCallback", consumerCallback);
_consumerCallback = new KNetConsumerCallback<K, V>(CallbackMessage, _keyDeserializer, _valueDeserializer);
IExecute("setCallback", _consumerCallback);
}
else
{
consumedRecords = new();
threadRunning = true;
consumeThread = new(ConsumeHandler);
consumeThread.Start();
_consumedRecords = new();
_threadRunning = true;
_consumeThread = new(ConsumeHandler);
_consumeThread.Start();
}
}

Expand Down Expand Up @@ -194,22 +195,22 @@ void CallbackMessage(KNetConsumerRecord<K, V> message)
/// <inheritdoc cref="IDisposable.Dispose"/>
public override void Dispose()
{
if (consumerCallback != null)
if (_consumerCallback != null)
{
IExecute("setCallback", null);
consumerCallback?.Dispose();
_consumerCallback?.Dispose();
}
threadRunning = false;
if (consumedRecords != null)
_threadRunning = false;
if (_consumedRecords != null)
{
lock (consumedRecords)
lock (_consumedRecords)
{
System.Threading.Monitor.Pulse(consumedRecords);
System.Threading.Monitor.Pulse(_consumedRecords);
}
if (IsCompleting) { consumeThread?.Join(); };
if (IsCompleting) { _consumeThread?.Join(); };
actionCallback = null;
}
if (autoCreateSerDes)
if (_autoCreateSerDes)
{
_keyDeserializer?.Dispose();
_valueDeserializer?.Dispose();
Expand All @@ -226,11 +227,11 @@ void ConsumeHandler(object o)
{
try
{
while (threadRunning)
while (_threadRunning)
{
if (consumedRecords.TryDequeue(out KNetConsumerRecords<K, V> records))
if (_consumedRecords.TryDequeue(out KNetConsumerRecords<K, V> records))
{
System.Threading.Interlocked.Increment(ref dequeing);
System.Threading.Interlocked.Increment(ref _dequeing);
try
{
foreach (var item in records)
Expand All @@ -241,40 +242,45 @@ void ConsumeHandler(object o)
catch { }
finally
{
System.Threading.Interlocked.Decrement(ref dequeing);
System.Threading.Interlocked.Decrement(ref _dequeing);
}
}
else if (threadRunning)
else if (_threadRunning)
{
lock (consumedRecords)
lock (_consumedRecords)
{
System.Threading.Monitor.Wait(consumedRecords);
System.Threading.Monitor.Wait(_consumedRecords);
}
}
}
}
catch { }
}
/// <inheritdoc cref="IKNetConsumer{K, V}.IsCompleting"/>
public bool IsCompleting => !consumedRecords.IsEmpty || System.Threading.Interlocked.Read(ref dequeing) != 0;
public bool IsCompleting => !_consumedRecords.IsEmpty || System.Threading.Interlocked.Read(ref _dequeing) != 0;
/// <inheritdoc cref="IKNetConsumer{K, V}.IsEmpty"/>
public bool IsEmpty => consumedRecords.IsEmpty;
public bool IsEmpty => _consumedRecords.IsEmpty;
/// <inheritdoc cref="IKNetConsumer{K, V}.WaitingMessages"/>
public int WaitingMessages => consumedRecords.Count;
public int WaitingMessages => _consumedRecords.Count;
/// <inheritdoc cref="IKNetConsumer{K, V}.ConsumeAsync(long)"/>
public void ConsumeAsync(long timeoutMs)
public bool ConsumeAsync(long timeoutMs)
{
Duration duration = TimeSpan.FromMilliseconds(timeoutMs);
if (consumedRecords == null) throw new ArgumentException("Cannot be used since constructor was called with useJVMCallback set to true.");
if (!threadRunning) throw new InvalidOperationException("Dispatching thread is not running.");
if (_consumedRecords == null) throw new ArgumentException("Cannot be used since constructor was called with useJVMCallback set to true.");
if (!_threadRunning) throw new InvalidOperationException("Dispatching thread is not running.");
try
{
var results = this.Poll(duration);
consumedRecords.Enqueue(results);
lock (consumedRecords)
bool isEmpty = results.IsEmpty;
if (!isEmpty)
{
System.Threading.Monitor.Pulse(consumedRecords);
_consumedRecords.Enqueue(results);
lock (_consumedRecords)
{
System.Threading.Monitor.Pulse(_consumedRecords);
}
}
return !isEmpty;
}
finally
{
Expand All @@ -285,7 +291,7 @@ public void ConsumeAsync(long timeoutMs)
public void Consume(long timeoutMs, Action<KNetConsumerRecord<K, V>> callback)
{
Duration duration = TimeSpan.FromMilliseconds(timeoutMs);
if (consumerCallback == null) throw new ArgumentException("Cannot be used since constructor was called with useJVMCallback set to false.");
if (_consumerCallback == null) throw new ArgumentException("Cannot be used since constructor was called with useJVMCallback set to false.");
try
{
actionCallback = callback;
Expand Down
2 changes: 2 additions & 0 deletions src/net/KNet/Specific/Consumer/KNetConsumerRecord.cs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ public KNetConsumerRecord(ConsumerRecord<byte[], byte[]> record, IKNetDeserializ
}
/// <inheritdoc cref="ConsumerRecord{K, V}.Topic"/>
public string Topic => _record.Topic();
/// <inheritdoc cref="ConsumerRecord{K, V}.LeaderEpoch"/>
public int? LeaderEpoch { get { var epoch = _record.LeaderEpoch(); return epoch.IsEmpty() ? null : epoch.Get(); } }
/// <inheritdoc cref="ConsumerRecord{K, V}.Partition"/>
public int Partition => _record.Partition();
/// <inheritdoc cref="ConsumerRecord{K, V}.Headers"/>
Expand Down
8 changes: 8 additions & 0 deletions src/net/KNet/Specific/Consumer/KNetConsumerRecords.cs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,14 @@ public KNetConsumerRecords(ConsumerRecords<byte[], byte[]> records, IKNetDeseria
_keyDeserializer = keyDeserializer;
_valueDeserializer = valueDeserializer;
}
/// <summary>
/// <see langword="true"/> if the <see cref="KNetConsumerRecords{K, V}"/> is empty
/// </summary>
public bool IsEmpty => _records.IsEmpty();
/// <summary>
/// The number of elements in <see cref="KNetConsumerRecords{K, V}"/>
/// </summary>
public int Count => _records.Count();

IEnumerator<KNetConsumerRecord<K, V>> IEnumerable<KNetConsumerRecord<K, V>>.GetEnumerator()
{
Expand Down
Loading