Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed KNetProducer plus added new API on KNetProducer, fixed Dispose of KNetConsumer, reviewed KNetCompactedReplicator synchronization #220

Merged
merged 1 commit into from
Aug 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 8 additions & 10 deletions src/net/KNet/Specific/Consumer/KNetConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,6 @@ public class KNetConsumer<K, V> : KafkaConsumer<byte[], byte[]>, IKNetConsumer<K
bool threadRunning = false;
long dequeing = 0;
readonly System.Threading.Thread consumeThread = null;
readonly System.Threading.ManualResetEvent threadExited = null;
readonly ConcurrentQueue<KNetConsumerRecords<K, V>> consumedRecords = null;
readonly KNetConsumerCallback<K, V> consumerCallback = null;
readonly IKNetDeserializer<K> _keyDeserializer;
Expand Down Expand Up @@ -328,7 +327,6 @@ public KNetConsumer(Properties props, bool useJVMCallback = false)
{
consumedRecords = new();
threadRunning = true;
threadExited = new(false);
consumeThread = new(ConsumeHandler);
consumeThread.Start();
}
Expand All @@ -355,7 +353,6 @@ public KNetConsumer(Properties props, IKNetDeserializer<K> keyDeserializer, IKNe
{
consumedRecords = new();
threadRunning = true;
threadExited = new(false);
consumeThread = new(ConsumeHandler);
consumeThread.Start();
}
Expand All @@ -382,12 +379,9 @@ static Properties CheckProperties(Properties props)
/// </summary>
~KNetConsumer()
{
if (autoCreateSerDes)
{
_keyDeserializer?.Dispose();
_valueDeserializer?.Dispose();
}
this.Dispose();
}

/// <inheritdoc cref="IKNetConsumer{K, V}.Poll(long)"/>
public new KNetConsumerRecords<K, V> Poll(long timeoutMs)
{
Expand Down Expand Up @@ -420,9 +414,14 @@ public override void Dispose()
{
System.Threading.Monitor.Pulse(consumedRecords);
}
while (IsCompleting) { threadExited.WaitOne(100); };
if (IsCompleting) { consumeThread?.Join(); };
actionCallback = null;
}
if (autoCreateSerDes)
{
_keyDeserializer?.Dispose();
_valueDeserializer?.Dispose();
}
}
/// <inheritdoc cref="IKNetConsumer{K, V}.SetCallback(Action{KNetConsumerRecord{K, V}})"/>
public void SetCallback(Action<KNetConsumerRecord<K, V>> cb)
Expand Down Expand Up @@ -462,7 +461,6 @@ void ConsumeHandler(object o)
}
}
catch { }
finally { threadExited.Set(); threadRunning = false; }
}
/// <inheritdoc cref="IKNetConsumer{K, V}.IsCompleting"/>
public bool IsCompleting => !consumedRecords.IsEmpty || System.Threading.Interlocked.Read(ref dequeing) != 0;
Expand Down
78 changes: 47 additions & 31 deletions src/net/KNet/Specific/KNetCompactedReplicator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -240,29 +240,39 @@ private void RemoveRecord(TKey key)
if (key == null)
throw new ArgumentNullException(nameof(key));

JVMBridgeException exception = null;

DateTime pTimestamp = DateTime.MaxValue;
using (AutoResetEvent deliverySemaphore = new AutoResetEvent(false))
if (UpdateMode == UpdateModeTypes.OnDelivery)
{
_producer.Produce(new KNetProducerRecord<TKey, TValue>(_stateName, key, null), (record, error) =>
JVMBridgeException exception = null;

DateTime pTimestamp = DateTime.MaxValue;
using (AutoResetEvent deliverySemaphore = new AutoResetEvent(false))
{
try
using (Callback cb = new Callback()
{
if (deliverySemaphore.SafeWaitHandle.IsClosed)
return;

exception = error;

deliverySemaphore.Set();
OnOnCompletion = (record, error) =>
{
try
{
if (deliverySemaphore.SafeWaitHandle.IsClosed)
return;

exception = error;
deliverySemaphore.Set();
}
catch { }
}
})
{
_producer.Produce(new KNetProducerRecord<TKey, TValue>(_stateName, key, null), cb);
deliverySemaphore.WaitOne();
if (exception != null) throw exception;
}
catch { }
});

deliverySemaphore.WaitOne();
}
}
else if (UpdateMode == UpdateModeTypes.OnConsume || UpdateMode == UpdateModeTypes.OnConsumeSync)
{
_producer.Produce(StateName, key, null, (Callback)null);
}

if (exception != null) throw exception;
_dictionary.TryRemove(key, out _);
}

Expand All @@ -279,24 +289,28 @@ private void AddOrUpdate(TKey key, TValue value)
DateTime pTimestamp = DateTime.MaxValue;
using (AutoResetEvent deliverySemaphore = new AutoResetEvent(false))
{
_producer.Produce(StateName, key, value, (record, error) =>
using (Callback cb = new Callback()
{
try
OnOnCompletion = (record, error) =>
{
if (deliverySemaphore.SafeWaitHandle.IsClosed)
return;

exception = error;
deliverySemaphore.Set();
try
{
if (deliverySemaphore.SafeWaitHandle.IsClosed)
return;

exception = error;
deliverySemaphore.Set();
}
catch { }
}
catch { }
});

deliverySemaphore.WaitOne();
})
{
_producer.Produce(new KNetProducerRecord<TKey, TValue>(_stateName, key, value), cb);
deliverySemaphore.WaitOne();
if (exception != null) throw exception;
}
}

if (exception != null) throw exception;

if (value == null)
{
_dictionary.TryRemove(key, out _);
Expand Down Expand Up @@ -663,6 +677,8 @@ IEnumerator IEnumerable.GetEnumerator()
/// </summary>
public void Dispose()
{
_consumerPollRun = false;

_consumer?.Dispose();

_producer?.Flush();
Expand Down
96 changes: 68 additions & 28 deletions src/net/KNet/Specific/Producer/KNetProducer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -211,23 +211,43 @@ public interface IKNetProducer<K, V> : IProducer<byte[], byte[]>
/// <summary>
/// KNet version of <see cref="Producer{K, V}.Send(ProducerRecord{K, V}, Callback)"/>
/// </summary>
void Produce(string topic, K key, V value, Callback cb = null);
Future<RecordMetadata> Produce(string topic, K key, V value, Callback cb = null);
/// <summary>
/// KNet version of <see cref="Producer{K, V}.Send(ProducerRecord{K, V}, Callback)"/>
/// </summary>
void Produce(string topic, int partition, K key, V value, Callback cb = null);
void ProduceAndWait(string topic, K key, V value, Callback cb = null);
/// <summary>
/// KNet version of <see cref="Producer{K, V}.Send(ProducerRecord{K, V}, Callback)"/>
/// </summary>
void Produce(string topic, int partition, long timestamp, K key, V value, Callback cb = null);
Future<RecordMetadata> Produce(string topic, int partition, K key, V value, Callback cb = null);
/// <summary>
/// KNet version of <see cref="Producer{K, V}.Send(ProducerRecord{K, V}, Callback)"/>
/// </summary>
void Produce(string topic, int partition, DateTime timestamp, K key, V value, Callback cb = null);
void ProduceAndWait(string topic, int partition, K key, V value, Callback cb = null);
/// <summary>
/// KNet version of <see cref="Producer{K, V}.Send(ProducerRecord{K, V}, Callback)"/>
/// </summary>
void Produce(KNetProducerRecord<K, V> record, Callback cb = null);
Future<RecordMetadata> Produce(string topic, int partition, long timestamp, K key, V value, Callback cb = null);
/// <summary>
/// KNet version of <see cref="Producer{K, V}.Send(ProducerRecord{K, V}, Callback)"/>
/// </summary>
void ProduceAndWait(string topic, int partition, long timestamp, K key, V value, Callback cb = null);
/// <summary>
/// KNet version of <see cref="Producer{K, V}.Send(ProducerRecord{K, V}, Callback)"/>
/// </summary>
Future<RecordMetadata> Produce(string topic, int partition, DateTime timestamp, K key, V value, Callback cb = null);
/// <summary>
/// KNet version of <see cref="Producer{K, V}.Send(ProducerRecord{K, V}, Callback)"/>
/// </summary>
void ProduceAndWait(string topic, int partition, DateTime timestamp, K key, V value, Callback cb = null);
/// <summary>
/// KNet version of <see cref="Producer{K, V}.Send(ProducerRecord{K, V}, Callback)"/>
/// </summary>
Future<RecordMetadata> Produce(KNetProducerRecord<K, V> record, Callback cb = null);
/// <summary>
/// KNet version of <see cref="Producer{K, V}.Send(ProducerRecord{K, V}, Callback)"/>
/// </summary>
void ProduceAndWait(KNetProducerRecord<K, V> record, Callback cb = null);
/// <summary>
/// KNet async version of <see cref="Producer{K, V}.Send(ProducerRecord{K, V}, Callback)"/>
/// </summary>
Expand Down Expand Up @@ -425,49 +445,69 @@ public void Produce(KNetProducerRecord<K, V> record, Action<RecordMetadata, JVMB
}
}
/// <inheritdoc cref="IKNetProducer{K, V}.Produce(string, K, V, Callback)"/>
public void Produce(string topic, K key, V value, Callback cb = null)
public Future<RecordMetadata> Produce(string topic, K key, V value, Callback cb = null)
{
Produce(new KNetProducerRecord<K, V>(topic, key, value), cb);
return Produce(new KNetProducerRecord<K, V>(topic, key, value), cb);
}
/// <inheritdoc cref="IKNetProducer{K, V}.ProduceAndWait(string, K, V, Callback)"/>
public void ProduceAndWait(string topic, K key, V value, Callback cb = null)
{
ProduceAndWait(new KNetProducerRecord<K, V>(topic, key, value), cb);
}
/// <inheritdoc cref="IKNetProducer{K, V}.Produce(string, int, K, V, Callback)"/>
public void Produce(string topic, int partition, K key, V value, Callback cb = null)
public Future<RecordMetadata> Produce(string topic, int partition, K key, V value, Callback cb = null)
{
Produce(new KNetProducerRecord<K, V>(topic, partition, key, value), cb);
return Produce(new KNetProducerRecord<K, V>(topic, partition, key, value), cb);
}
/// <inheritdoc cref="IKNetProducer{K, V}.Produce(string, int, long, K, V, Action{RecordMetadata, JVMBridgeException})"/>
public void Produce(string topic, int partition, long timestamp, K key, V value, Callback cb = null)
/// <inheritdoc cref="IKNetProducer{K, V}.ProduceAndWait(string, int, K, V, Callback)"/>
public void ProduceAndWait(string topic, int partition, K key, V value, Callback cb = null)
{
ProduceAndWait(new KNetProducerRecord<K, V>(topic, partition, key, value), cb);
}
/// <inheritdoc cref="IKNetProducer{K, V}.Produce(string, int, long, K, V, Callback)"/>
public Future<RecordMetadata> Produce(string topic, int partition, long timestamp, K key, V value, Callback cb = null)
{
return Produce(new KNetProducerRecord<K, V>(topic, partition, timestamp, key, value), cb);
}
/// <inheritdoc cref="IKNetProducer{K, V}.ProduceAndWait(string, int, long, K, V, Callback)"/>
public void ProduceAndWait(string topic, int partition, long timestamp, K key, V value, Callback cb = null)
{
Produce(new KNetProducerRecord<K, V>(topic, partition, timestamp, key, value), cb);
ProduceAndWait(new KNetProducerRecord<K, V>(topic, partition, timestamp, key, value), cb);
}
/// <inheritdoc cref="IKNetProducer{K, V}.Produce(string, int, DateTime, K, V, Callback)"/>
public void Produce(string topic, int partition, DateTime timestamp, K key, V value, Callback cb = null)
public Future<RecordMetadata> Produce(string topic, int partition, DateTime timestamp, K key, V value, Callback cb = null)
{
return Produce(new KNetProducerRecord<K, V>(topic, partition, timestamp, key, value), cb);
}
/// <inheritdoc cref="IKNetProducer{K, V}.ProduceAndWait(string, int, DateTime, K, V, Callback)"/>
public void ProduceAndWait(string topic, int partition, DateTime timestamp, K key, V value, Callback cb = null)
{
Produce(new KNetProducerRecord<K, V>(topic, partition, timestamp, key, value), cb);
ProduceAndWait(new KNetProducerRecord<K, V>(topic, partition, timestamp, key, value), cb);
}
/// <inheritdoc cref="IKNetProducer{K, V}.Produce(KNetProducerRecord{K, V}, Callback)"/>
public void Produce(KNetProducerRecord<K, V> record, Callback cb = null)
public Future<RecordMetadata> Produce(KNetProducerRecord<K, V> record, Callback cb = null)
{
if (cb != null)
{
return this.Send(record, cb);
}
else
{
return this.Send(record);
}
}
/// <inheritdoc cref="IKNetProducer{K, V}.ProduceAndWait(KNetProducerRecord{K, V}, Callback)"/>
public void ProduceAndWait(KNetProducerRecord<K, V> record, Callback cb = null)
{
try
{
Future<RecordMetadata> result;
if (cb != null)
{
result = this.Send(record, cb);
}
else
{
result = this.Send(record);
}
Future<RecordMetadata> result = this.Produce(record, cb);
result.Get();
}
catch (ExecutionException e)
{
throw e.InnerException;
}
finally
{
cb?.Dispose();
}
}
/// <inheritdoc cref="IKNetProducer{K, V}.Produce(string, K, V, Action{RecordMetadata, JVMBridgeException})"/>
public async Task ProduceAsync(string topic, K key, V value, Action<RecordMetadata, JVMBridgeException> action = null)
Expand Down