Skip to content

Commit

Permalink
Added ConsumerInstances on KNetCompactedReplicator to manage the …
Browse files Browse the repository at this point in the history
…number of active consumers (#238)

* Clean Program.cs files

* #237: added ConsumerInstances to manage the number of consumers
  • Loading branch information
masesdevelopers authored Sep 26, 2023
1 parent 7020dbf commit 2f38fc3
Show file tree
Hide file tree
Showing 4 changed files with 131 additions and 55 deletions.
154 changes: 118 additions & 36 deletions src/net/KNet/Specific/Replicator/KNetCompactedReplicator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,10 @@ public interface IKNetCompactedReplicator<TKey, TValue> : IDictionary<TKey, TVal
/// </summary>
int Partitions { get; }
/// <summary>
/// Get or set the number of <see cref="KNetConsumer{K, V}"/> instances to be used, null to allocate <see cref="KNetConsumer{K, V}"/> based on <see cref="Partitions"/>
/// </summary>
int? ConsumerInstances { get; }
/// <summary>
/// Get or set replication factor to use when topic is created for the first time, otherwise reports the replication factor of the topic
/// </summary>
short ReplicationFactor { get; }
Expand Down Expand Up @@ -228,7 +232,7 @@ public interface IKNetCompactedReplicator<TKey, TValue> : IDictionary<TKey, TVal

#endregion

#region IKNetCompactedReplicator<TKey, TValue>
#region KNetCompactedReplicator<TKey, TValue>
/// <summary>
/// Provides a reliable dictionary, persisted in a COMPACTED Kafka topic and shared among applications
/// </summary>
Expand Down Expand Up @@ -397,20 +401,58 @@ static void OnDemandRetrieve(IKNetConsumer<TKey, TValue> consumer, string topic,

#endregion

#region KNetCompactedConsumerRebalanceListener

class KNetCompactedConsumerRebalanceListener : ConsumerRebalanceListener
{
int _consumerIndex;
public KNetCompactedConsumerRebalanceListener(int consumerIndex)
: base()
{
_consumerIndex = consumerIndex;
}

public int ConsumerIndex => _consumerIndex;

public new System.Action<KNetCompactedConsumerRebalanceListener, Java.Util.Collection<Org.Apache.Kafka.Common.TopicPartition>> OnOnPartitionsAssigned { get; set; }

public override void OnPartitionsAssigned(Java.Util.Collection<Org.Apache.Kafka.Common.TopicPartition> arg0)
{
OnOnPartitionsAssigned?.Invoke(this, arg0);
}

public new System.Action<KNetCompactedConsumerRebalanceListener, Java.Util.Collection<Org.Apache.Kafka.Common.TopicPartition>> OnOnPartitionsRevoked { get; set; }

public override void OnPartitionsRevoked(Java.Util.Collection<Org.Apache.Kafka.Common.TopicPartition> arg0)
{
OnOnPartitionsRevoked?.Invoke(this, arg0);
}

public new System.Action<KNetCompactedConsumerRebalanceListener, Java.Util.Collection<Org.Apache.Kafka.Common.TopicPartition>> OnOnPartitionsLost { get; set; }

public override void OnPartitionsLost(Java.Util.Collection<Org.Apache.Kafka.Common.TopicPartition> arg0)
{
OnOnPartitionsLost?.Invoke(this, arg0);
}
}

#endregion

#region Private members

private bool _consumerPollRun = false;
private Thread[] _consumerPollThreads = null;
private IAdmin _admin = null;
private ConcurrentDictionary<TKey, ILocalDataStorage> _dictionary = new ConcurrentDictionary<TKey, ILocalDataStorage>();
private ConsumerRebalanceListener _consumerListener = null;
private KNetCompactedConsumerRebalanceListener[] _consumerListeners = null;
private KNetConsumer<TKey, TValue>[] _consumers = null;
private KNetConsumer<TKey, TValue> _onTheFlyConsumer = null;
private KNetProducer<TKey, TValue> _producer = null;
private string _bootstrapServers = null;
private string _stateName = string.Empty;
private string _groupId = Guid.NewGuid().ToString();
private int _partitions = 1;
private int? _consumerInstances = null;
private short _replicationFactor = 1;
private TopicConfigBuilder _topicConfig = null;
private ConsumerConfigBuilder _consumerConfig = null;
Expand All @@ -419,6 +461,7 @@ static void OnDemandRetrieve(IKNetConsumer<TKey, TValue> consumer, string topic,
private AccessRightsType _accessrights = AccessRightsType.ReadWrite;
private UpdateModeTypes _updateMode = UpdateModeTypes.OnDelivery;
private Tuple<TKey, ManualResetEvent> _OnConsumeSyncWaiter = null;
private System.Collections.Generic.Dictionary<int, System.Collections.Generic.IList<int>> _consumerAssociatedPartition = new();
private ManualResetEvent[] _assignmentWaiters;
private long[] _lastPartitionLags = null;

Expand Down Expand Up @@ -471,6 +514,9 @@ public Func<IKNetCompactedReplicator<TKey, TValue>, bool, KeyValuePair<TKey, TVa
/// <inheritdoc cref="IKNetCompactedReplicator{TKey, TValue}.Partitions"/>
public int Partitions { get { return _partitions; } set { CheckStarted(); _partitions = value; } }

/// <inheritdoc cref="IKNetCompactedReplicator{TKey, TValue}.ConsumerInstances"/>
public int? ConsumerInstances { get { return _consumerInstances.HasValue ? _consumerInstances.Value : _partitions; } set { CheckStarted(); _consumerInstances = value; } }

/// <inheritdoc cref="IKNetCompactedReplicator{TKey, TValue}.ReplicationFactor"/>
public short ReplicationFactor { get { return _replicationFactor; } set { CheckStarted(); _replicationFactor = value; } }

Expand Down Expand Up @@ -504,6 +550,11 @@ void CheckStarted()
{
if (_started) throw new InvalidOperationException("Cannot be changed after Start");
}
[System.Runtime.CompilerServices.MethodImpl(System.Runtime.CompilerServices.MethodImplOptions.AggressiveInlining)]
int ConsumersToAllocate()
{
return ConsumerInstances.HasValue ? ConsumerInstances.Value : Partitions;
}

bool UpdateModeOnDelivery => (UpdateMode & UpdateModeTypes.OnDelivery) == UpdateModeTypes.OnDelivery;

Expand Down Expand Up @@ -556,26 +607,41 @@ private void OnMessage(KNetConsumerRecord<TKey, TValue> record)
}
}

private void OnTopicPartitionsAssigned(Collection<TopicPartition> topicPartitions)
private void OnTopicPartitionsAssigned(KNetCompactedConsumerRebalanceListener listener, Collection<TopicPartition> topicPartitions)
{
foreach (var topicPartition in topicPartitions)
{
_assignmentWaiters[topicPartition.Partition()].Set();
var partition = topicPartition.Partition();
lock (_consumerAssociatedPartition)
{
_consumerAssociatedPartition[listener.ConsumerIndex].Add(partition);
}
_assignmentWaiters[partition].Set();
}
}

private void OnTopicPartitionsRevoked(Collection<TopicPartition> topicPartitions)
private void OnTopicPartitionsRevoked(KNetCompactedConsumerRebalanceListener listener, Collection<TopicPartition> topicPartitions)
{
foreach (var topicPartition in topicPartitions)
{
var partition = topicPartition.Partition();
lock (_consumerAssociatedPartition)
{
_consumerAssociatedPartition[listener.ConsumerIndex].Remove(partition);
}
_assignmentWaiters[topicPartition.Partition()].Reset();
}
}

private void OnTopicPartitionsLost(Collection<TopicPartition> topicPartitions)
private void OnTopicPartitionsLost(KNetCompactedConsumerRebalanceListener listener, Collection<TopicPartition> topicPartitions)
{
foreach (var topicPartition in topicPartitions)
{
var partition = topicPartition.Partition();
lock (_consumerAssociatedPartition)
{
_consumerAssociatedPartition[listener.ConsumerIndex].Remove(partition);
}
_assignmentWaiters[topicPartition.Partition()].Reset();
}
}
Expand Down Expand Up @@ -690,23 +756,29 @@ void BuildConsumers()

if (ValueSerDes == null) throw new InvalidOperationException($"{typeof(TValue)} needs an external deserializer, set ValueSerDes.");

_assignmentWaiters = new ManualResetEvent[_partitions];
_lastPartitionLags = new long[_partitions];
_consumers = new KNetConsumer<TKey, TValue>[_partitions];
_assignmentWaiters = new ManualResetEvent[Partitions];
_lastPartitionLags = new long[Partitions];
_consumers = new KNetConsumer<TKey, TValue>[ConsumersToAllocate()];
_consumerListeners = new KNetCompactedConsumerRebalanceListener[ConsumersToAllocate()];

for (int i = 0; i < _partitions; i++)
for (int i = 0; i < Partitions; i++)
{
_assignmentWaiters[i] = new ManualResetEvent(false);
_lastPartitionLags[i] = -1;
_assignmentWaiters[i] = new ManualResetEvent(false);
}

for (int i = 0; i < ConsumersToAllocate(); i++)
{
_consumerAssociatedPartition.Add(i, new System.Collections.Generic.List<int>());
_consumers[i] = new KNetConsumer<TKey, TValue>(ConsumerConfig, KeySerDes, ValueSerDes);
_consumers[i].SetCallback(OnMessage);
_consumerListeners[i] = new KNetCompactedConsumerRebalanceListener(i)
{
OnOnPartitionsRevoked = OnTopicPartitionsRevoked,
OnOnPartitionsAssigned = OnTopicPartitionsAssigned,
OnOnPartitionsLost = OnTopicPartitionsLost
};
}
_consumerListener = new ConsumerRebalanceListener()
{
OnOnPartitionsRevoked = OnTopicPartitionsRevoked,
OnOnPartitionsAssigned = OnTopicPartitionsAssigned,
OnOnPartitionsLost = OnTopicPartitionsLost
};
}

[System.Runtime.CompilerServices.MethodImpl(System.Runtime.CompilerServices.MethodImplOptions.AggressiveInlining)]
Expand Down Expand Up @@ -748,28 +820,31 @@ void BuildProducer()
void ConsumerPollHandler(object o)
{
int index = (int)o;
_consumers[index].Subscribe(Collections.Singleton(StateName), _consumerListener);
_consumers[index].Subscribe(Collections.Singleton(StateName), _consumerListeners[index]);
while (_consumerPollRun)
{
try
{
_consumers[index].ConsumeAsync(100);
bool execute = false;
lock (_assignmentWaiters[index])
lock (_consumerAssociatedPartition)
{
if (_assignmentWaiters[index].SafeWaitHandle.IsClosed) continue;
else execute = _assignmentWaiters[index].WaitOne(0);
}
if (execute)
{
try
{
var lag = _consumers[index].CurrentLag(new TopicPartition(StateName, index));
Interlocked.Exchange(ref _lastPartitionLags[index], lag.IsPresent() ? lag.AsLong : -1);
}
catch (Java.Lang.IllegalStateException)
foreach (var partitionIndex in _consumerAssociatedPartition[index])
{
Interlocked.Exchange(ref _lastPartitionLags[index], -1);
bool execute = false;
if (_assignmentWaiters[partitionIndex].SafeWaitHandle.IsClosed) continue;
else execute = _assignmentWaiters[partitionIndex].WaitOne(0);
if (execute)
{
try
{
var lag = _consumers[index].CurrentLag(new TopicPartition(StateName, partitionIndex));
Interlocked.Exchange(ref _lastPartitionLags[partitionIndex], lag.IsPresent() ? lag.AsLong : -1);
}
catch (Java.Lang.IllegalStateException)
{
Interlocked.Exchange(ref _lastPartitionLags[partitionIndex], -1);
}
}
}
}
}
Expand All @@ -786,6 +861,8 @@ public void Start()
if (string.IsNullOrWhiteSpace(BootstrapServers)) throw new InvalidOperationException("BootstrapServers must be set before start.");
if (string.IsNullOrWhiteSpace(StateName)) throw new InvalidOperationException("StateName must be set before start.");

if (ConsumerInstances > Partitions) throw new InvalidOperationException("ConsumerInstances cannot be high than Partitions");

Properties props = AdminClientConfigBuilder.Create().WithBootstrapServers(BootstrapServers).ToProperties();
_admin = KafkaAdminClient.Create(props);

Expand Down Expand Up @@ -839,8 +916,8 @@ public void Start()
if (_consumers != null)
{
_consumerPollRun = true;
_consumerPollThreads = new Thread[_partitions];
for (int i = 0; i < _partitions; i++)
_consumerPollThreads = new Thread[ConsumersToAllocate()];
for (int i = 0; i < ConsumersToAllocate(); i++)
{
_consumerPollThreads[i] = new Thread(ConsumerPollHandler);
_consumerPollThreads[i].Start(i);
Expand Down Expand Up @@ -874,9 +951,14 @@ public void SyncWait(int timeout = Timeout.Infinite)
bool sync = false;
while (!sync && watcher.ElapsedMilliseconds < (uint)timeout)
{
for (int i = 0; i < _partitions; i++)
for (int i = 0; i < ConsumersToAllocate(); i++)
{
sync = _consumers[i].IsEmpty && (Interlocked.Read(ref _lastPartitionLags[i]) == 0 || Interlocked.Read(ref _lastPartitionLags[i]) == -1);
bool lagInSync = true;
foreach (var partitionIndex in _consumerAssociatedPartition[i])
{
lagInSync &= Interlocked.Read(ref _lastPartitionLags[partitionIndex]) == 0 || Interlocked.Read(ref _lastPartitionLags[partitionIndex]) == -1;
}
sync = _consumers[i].IsEmpty && lagInSync;
}
}
}
Expand Down
1 change: 0 additions & 1 deletion tests/KNetClassicTest/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
using Org.Apache.Kafka.Clients.Admin;
using Org.Apache.Kafka.Clients.Consumer;
using Org.Apache.Kafka.Clients.Producer;
using Org.Apache.Kafka.Common.Config;
using Org.Apache.Kafka.Common.Serialization;
using System;
using System.Text;
Expand Down
30 changes: 13 additions & 17 deletions tests/KNetCompactedReplicatorTest/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,9 @@
* Refer to LICENSE for more information.
*/

using Java.Util;
using MASES.KNet.Admin;
using MASES.KNet.Common;
using MASES.KNet.Consumer;
using MASES.KNet.Extensions;
using MASES.KNet.Producer;
using MASES.KNet.Replicator;
using MASES.KNet.Serialization;
using MASES.KNet.Serialization.Json;
using MASES.KNet.TestCommon;
using Org.Apache.Kafka.Clients.Admin;
using Org.Apache.Kafka.Clients.Consumer;
using Org.Apache.Kafka.Clients.Producer;
using System;
using System.Diagnostics;
using System.Threading;
Expand Down Expand Up @@ -73,15 +63,19 @@ static void Main(string[] args)
serverToUse = args[0];
}
var sw = Stopwatch.StartNew();
TestValues("TestValues", 100, UpdateModeTypes.OnDelivery);
TestValues("TestValues", 100, UpdateModeTypes.OnDelivery, 5);
sw.Stop();
Console.WriteLine($"End TestValues in {sw.Elapsed}");
sw = Stopwatch.StartNew();
Test("TestOnDelivery", 100, UpdateModeTypes.OnDelivery | UpdateModeTypes.Delayed);
Test("TestOnDelivery", 100, UpdateModeTypes.OnDelivery | UpdateModeTypes.Delayed, 5);
sw.Stop();
Console.WriteLine($"End TestOnDelivery in {sw.Elapsed}");
sw = Stopwatch.StartNew();
Test("TestOnConsume", 100, UpdateModeTypes.OnConsume | UpdateModeTypes.Delayed);
Test("TestOnConsume", 100, UpdateModeTypes.OnConsume | UpdateModeTypes.Delayed, 5);
sw.Stop();
Console.WriteLine($"End TestOnConsume in {sw.Elapsed}");
sw = Stopwatch.StartNew();
Test("TestOnConsumeLessConsumers", 100, UpdateModeTypes.OnConsume | UpdateModeTypes.Delayed, 5, 2);
sw.Stop();
Console.WriteLine($"End TestOnConsume in {sw.Elapsed}");
Console.CancelKeyPress += Console_CancelKeyPress;
Expand All @@ -95,11 +89,12 @@ private static void Console_CancelKeyPress(object sender, ConsoleCancelEventArgs
if (e.Cancel) resetEvent.Set();
}

private static void TestValues(string topicName, int length, UpdateModeTypes type)
private static void TestValues(string topicName, int length, UpdateModeTypes type, int partitions, int? consumers = null)
{
using (var replicator = new KNetCompactedReplicator<int, TestType>()
{
Partitions = 5,
Partitions = partitions,
ConsumerInstances = consumers,
UpdateMode = type,
BootstrapServers = serverToUse,
StateName = topicName,
Expand All @@ -122,11 +117,12 @@ private static void TestValues(string topicName, int length, UpdateModeTypes typ
}
}

private static void Test(string topicName, int length, UpdateModeTypes type)
private static void Test(string topicName, int length, UpdateModeTypes type, int partitions, int? consumers = null)
{
using (var replicator = new KNetCompactedReplicator<int, TestType>()
{
Partitions = 5,
Partitions = partitions,
ConsumerInstances = consumers,
UpdateMode = type,
BootstrapServers = serverToUse,
StateName = topicName,
Expand Down
1 change: 0 additions & 1 deletion tests/KNetTestAdmin/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
*/

using Java.Util;
using MASES.KNet;
using Org.Apache.Kafka.Clients.Admin;
using Org.Apache.Kafka.Common.Config;
using MASES.KNet.TestCommon;
Expand Down

0 comments on commit 2f38fc3

Please sign in to comment.