From 2c9db99a219a23f3ba4ccd9868d750db29c7f2fa Mon Sep 17 00:00:00 2001 From: masesdevelopers <94312179+masesdevelopers@users.noreply.github.com> Date: Fri, 1 Sep 2023 12:12:30 +0200 Subject: [PATCH 1/4] Temp commit --- .../KNet/Specific/KNetCompactedReplicator.cs | 826 ++++++++++++++---- 1 file changed, 673 insertions(+), 153 deletions(-) diff --git a/src/net/KNet/Specific/KNetCompactedReplicator.cs b/src/net/KNet/Specific/KNetCompactedReplicator.cs index e4d2e75fe8..b6b0c4391e 100644 --- a/src/net/KNet/Specific/KNetCompactedReplicator.cs +++ b/src/net/KNet/Specific/KNetCompactedReplicator.cs @@ -16,7 +16,9 @@ * Refer to LICENSE for more information. */ +using Java.Time; using Java.Util; +using Javax.Xml.Crypto; using MASES.JCOBridge.C2JBridge; using MASES.KNet.Admin; using MASES.KNet.Common; @@ -34,62 +36,344 @@ using System.Collections; using System.Collections.Concurrent; using System.Collections.Generic; +using System.Diagnostics; +using System.Linq; using System.Threading; namespace MASES.KNet { + #region AccessRightsType /// - /// Provides a reliable dictionary, persisted in a COMPACTED Kafka topic and shared among applications + /// access rights to data + /// + [Flags] + public enum AccessRightsType + { + /// + /// Data are readable, i.e. aligned with the others and accessible from this + /// + Read = 1, + /// + /// Data are writable, i.e. updates can be produced, but this is not accessible and not aligned with the others + /// + Write = 2, + /// + /// Data are readable and writable, i.e. updates can be produced, and data are aligned with the others and accessible from this + /// + ReadWrite = Read | Write, + } + + #endregion + + #region UpdateModeTypes + + /// + /// update modes + /// + [Flags()] + public enum UpdateModeTypes + { + /// + /// The is updated as soon as an update is delivered to Kafka by the current application + /// + OnDelivery = 1, + /// + /// The is updated only after an update is consumed from Kafka, even if the add or update is made locally by the current instance + /// + OnConsume = 2, + /// + /// The is updated only after an update is consumed from Kafka, even if the add or update is made locally by the current instance. Plus the update waits the consume of the data before unlock + /// + OnConsumeSync = 3, + /// + /// The value is stored in only upon a request, otherwise only the key is stored + /// + Delayed = 0x1000 + } + + #endregion + + #region IKNetCompactedReplicator + /// + /// Public interface for /// /// The type of keys in the dictionary /// The type of values in the dictionary. Must be a nullable type - public class KNetCompactedReplicator : - IDictionary, - IDisposable + public interface IKNetCompactedReplicator : IDictionary, IDisposable where TValue : class { - #region AccessRightsType + #region Events + /// - /// access rights to data + /// Called when a [, ] is updated by consuming data from the others /// - [Flags] - public enum AccessRightsType - { - /// - /// Data are readable, i.e. aligned with the others and accessible from this - /// - Read = 1, - /// - /// Data are writable, i.e. updates can be produced, but this is not accessible and not aligned with the others - /// - Write = 2, - /// - /// Data are readable and writable, i.e. updates can be produced, and data are aligned with the others and accessible from this - /// - ReadWrite = Read | Write, - } + event Action, KeyValuePair> OnRemoteUpdate; + + /// + /// Called when a [, ] is removed by consuming data from the others + /// + event Action, KeyValuePair> OnRemoteRemove; + + /// + /// Called when a [, ] is updated on this + /// + event Action, KeyValuePair> OnLocalUpdate; + + /// + /// Called when a [, ] is removed from this + /// + event Action, KeyValuePair> OnLocalRemove; + + /// + /// If contains the it is called to request if the [, ] shall be stored in the + /// + event Func, KeyValuePair, bool> OnDelayedStore; #endregion - #region UpdateModeTypes + #region Public Properties + /// + /// Get or set + /// + AccessRightsType AccessRights { get; } + /// + /// Get or set + /// + UpdateModeTypes UpdateMode { get; } + /// + /// Get or set bootstrap servers + /// + string BootstrapServers { get; } + /// + /// Get or set topic name + /// + string StateName { get; } + /// + /// Get or set the group id, if not set a value is generated + /// + string GroupId { get; } + /// + /// Get or set partitions to use when topic is created for the first time, otherwise reports the partiions of the topic + /// + int Partitions { get; } + /// + /// Get or set replication factor to use when topic is created for the first time, otherwise reports the replication factor of the topic + /// + short ReplicationFactor { get; } + /// + /// Get or set to use when topic is created for the first time + /// + TopicConfigBuilder TopicConfig { get; } + /// + /// Get or set to use in + /// + ConsumerConfigBuilder ConsumerConfig { get; } + /// + /// Get or set to use in + /// + ProducerConfigBuilder ProducerConfig { get; } + /// + /// Get or set to use in , by default it creates a default one based on + /// + KNetSerDes KeySerDes { get; } + /// + /// Get or set to use in , by default it creates a default one based on + /// + KNetSerDes ValueSerDes { get; } + /// + /// if the instance was started + /// + bool IsStarted { get; } + /// + /// if the instance was started + /// + bool IsAssigned { get; } + + #endregion + #region Public methods /// - /// update modes + /// Start this : create the topic if not available, allocates Producer and Consumer, sets serializer/deserializer /// - public enum UpdateModeTypes + /// Some errors occurred + void Start(); + /// + /// Start this : create the topic if not available, allocates Producer and Consumers, sets serializer/deserializer + /// Then waits its synchronization with topic which stores dictionary data + /// + /// Some errors occurred or the provided do not include the flag + void StartAndWait(int timeout = Timeout.Infinite); + /// + /// Waits for all paritions assignment of the topic which stores dictionary data + /// + /// The number of milliseconds to wait, or to wait indefinitely + /// if the current instance receives a signal within the given ; otherwise, + /// The provided do not include the flag + bool WaitForStateAssignment(int timeout = Timeout.Infinite); + /// + /// Waits that is synchronized to the topic which stores dictionary data + /// + /// The number of milliseconds to wait, or to wait indefinitely + /// if the current instance synchronize within the given ; otherwise, + /// The provided do not include the flag + void SyncWait(int timeout = Timeout.Infinite); + /// + /// Waits until all outstanding produce requests and delivery report callbacks are completed + /// + void Flush(); + + #endregion + } + + #endregion + + #region IKNetCompactedReplicator + /// + /// Provides a reliable dictionary, persisted in a COMPACTED Kafka topic and shared among applications + /// + /// The type of keys in the dictionary + /// The type of values in the dictionary. Must be a nullable type + public class KNetCompactedReplicator : IKNetCompactedReplicator + where TValue : class + { + #region Local storage data + + interface ILocalDataStorage { - /// - /// The is updated as soon as an update is delivered to Kafka by the current application - /// - OnDelivery = 1, - /// - /// The is updated only after an update is consumed from Kafka, even if the add or update is made locally by the current instance - /// - OnConsume = 2, - /// - /// The is updated only after an update is consumed from Kafka, even if the add or update is made locally by the current instance. Plus the update waits the consume of the data before unlock - /// - OnConsumeSync = 3 + object Lock { get; } + Int32 Partition { get; set; } + bool HasOffset { get; set; } + Int64 Offset { get; set; } + bool HasValue { get; set; } + TValue Value { get; set; } + } + + struct LocalDataStorage : ILocalDataStorage + { + object _lock = new object(); + public LocalDataStorage() + { + } + public object Lock => _lock; + public int Partition { get; set; } + public bool HasOffset { get; set; } + public long Offset { get; set; } + public bool HasValue { get; set; } + public TValue Value { get; set; } + } + + #endregion + + #region Local Enumerator + + class LocalDataStorageEnumerator : IEnumerator> + { + private IEnumerator> _enumerator; + private readonly ConcurrentDictionary _dictionary; + private readonly IKNetConsumer _consumer = null; + private readonly string _topic; + public LocalDataStorageEnumerator(ConcurrentDictionary dictionary, IKNetConsumer consumer, string topic) + { + _dictionary = dictionary; + _consumer = consumer; + _topic = topic; + _enumerator = _dictionary.GetEnumerator(); + } + + KeyValuePair? _current = null; + public KeyValuePair Current + { + get + { + lock (_enumerator) + { + if (_current == null) + { + var localCurrent = _enumerator.Current; + ILocalDataStorage data = localCurrent.Value; + lock (data.Lock) + { + if (!data.HasValue) + { + OnDemandRetrieve(_consumer, _topic, data); + } + _current = new KeyValuePair(localCurrent.Key, localCurrent.Value.Value); + } + } + return _current.Value; + } + } + } + + object IEnumerator.Current => Current; + + public void Dispose() + { + _enumerator.Dispose(); + } + + public bool MoveNext() + { + lock (_enumerator) + { + _current = null; + return _enumerator.MoveNext(); + } + } + + public void Reset() + { + _enumerator.Reset(); + } + + public System.Collections.Generic.ICollection Values() + { + System.Collections.Generic.List values = new System.Collections.Generic.List(); + while (_enumerator.MoveNext()) + { + values.Add(_enumerator.Current.Value.Value); + } + return values; + } + + public bool TryGetValue(TKey key, out TValue value) + { + value = default; + if (_dictionary.TryGetValue(key, out var data)) + { + if (!data.HasValue) + { + OnDemandRetrieve(_consumer, _topic, data); + } + value = data.Value; + return true; + } + return false; + } + + public bool Contains(KeyValuePair item) + { + if (_dictionary.TryGetValue(item.Key, out var data)) + { + if (!data.HasValue) + { + OnDemandRetrieve(_consumer, _topic, data); + } + if (data.HasValue && data.Value == item.Value) return true; + } + return false; + } + + public void CopyTo(KeyValuePair[] array, int arrayIndex) + { + var values = new System.Collections.Generic.List>(); + while (_enumerator.MoveNext()) + { + values.Add(new KeyValuePair(_enumerator.Current.Key, _enumerator.Current.Value.Value)); + } + + Array.Copy(values.ToArray(), 0, array, arrayIndex, values.Count); + } } #endregion @@ -97,14 +381,16 @@ public enum UpdateModeTypes #region Private members private bool _consumerPollRun = false; - private Thread _consumerPollThread = null; + private Thread[] _consumerPollThreads = null; private IAdmin _admin = null; - private ConcurrentDictionary _dictionary = new ConcurrentDictionary(); + private ConcurrentDictionary _dictionary = new ConcurrentDictionary(); private ConsumerRebalanceListener _consumerListener = null; - private KNetConsumer _consumer = null; + private KNetConsumer[] _consumers = null; + private KNetConsumer _onTheFlyConsumer = null; private KNetProducer _producer = null; private string _bootstrapServers = null; private string _stateName = string.Empty; + private string _groupId = Guid.NewGuid().ToString(); private int _partitions = 1; private short _replicationFactor = 1; private TopicConfigBuilder _topicConfig = null; @@ -113,7 +399,8 @@ public enum UpdateModeTypes private AccessRightsType _accessrights = AccessRightsType.ReadWrite; private UpdateModeTypes _updateMode = UpdateModeTypes.OnDelivery; private Tuple _OnConsumeSyncWaiter = null; - private readonly ManualResetEvent _assignmentWaiter = new ManualResetEvent(false); + private ManualResetEvent[] _assignmentWaiters; + private long[] _lastPartitionLags = null; private KNetSerDes _keySerDes = null; private KNetSerDes _valueSerDes = null; @@ -127,22 +414,27 @@ public enum UpdateModeTypes /// /// Called when a [, ] is updated by consuming data from the others /// - public Action, KeyValuePair> OnRemoteUpdate; + public event Action, KeyValuePair> OnRemoteUpdate; /// - /// Called when a is removed by consuming data from the others + /// Called when a [, ] is removed by consuming data from the others /// - public Action, TKey> OnRemoteRemove; + public event Action, KeyValuePair> OnRemoteRemove; /// - /// Called when a is removed from this + /// Called when a [, ] is updated on this /// public event Action, KeyValuePair> OnLocalUpdate; /// - /// Called when a is removed from this + /// Called when a [, ] is removed from this + /// + public event Action, KeyValuePair> OnLocalRemove; + + /// + /// If contains the it is called to request if the [, ] shall be stored in the /// - public event Action, TKey> OnLocalRemove; + public event Func, KeyValuePair, bool> OnDelayedStore; #endregion @@ -164,11 +456,15 @@ public enum UpdateModeTypes /// public string StateName { get { return _stateName; } set { CheckStarted(); _stateName = value; } } /// - /// Get or set partitions to use when topic is created for the first time + /// Get or set the group id, if not set a value is generated + /// + public string GroupId { get { return _groupId; } set { CheckStarted(); _groupId = value; } } + /// + /// Get or set partitions to use when topic is created for the first time, otherwise reports the partiions of the topic /// public int Partitions { get { return _partitions; } set { CheckStarted(); _partitions = value; } } /// - /// Get or set replication factor to use when topic is created for the first time + /// Get or set replication factor to use when topic is created for the first time, otherwise reports the replication factor of the topic /// public short ReplicationFactor { get { return _replicationFactor; } set { CheckStarted(); _replicationFactor = value; } } /// @@ -191,26 +487,82 @@ public enum UpdateModeTypes /// Get or set to use in , by default it creates a default one based on /// public KNetSerDes ValueSerDes { get { return _valueSerDes; } set { CheckStarted(); _valueSerDes = value; } } + /// + /// if the instance was started + /// + public bool IsStarted => _started; + /// + /// if the instance was started + /// + public bool IsAssigned => _assignmentWaiters.All((o) => o.WaitOne(0)); #endregion #region Private methods - + [System.Runtime.CompilerServices.MethodImpl(System.Runtime.CompilerServices.MethodImplOptions.AggressiveInlining)] + static void OnDemandRetrieve(IKNetConsumer consumer, string topic, ILocalDataStorage data) + { + if (!data.HasValue) + { + var topicPartition = new TopicPartition(topic, data.Partition); + consumer.Assign(Collections.SingletonList(topicPartition)); + consumer.Seek(topicPartition, data.Offset); + var results = consumer.Poll(TimeSpan.FromMinutes(1)); + if (results == null) throw new InvalidOperationException("Failed to get records from remote."); + foreach (var result in results) + { + if (data.Offset != result.Offset) throw new IndexOutOfRangeException($"Requested offset is {data.Offset} while received offset is {result.Offset}"); + data.HasValue = true; + data.Value = result.Value; + break; + } + } + } + [System.Runtime.CompilerServices.MethodImpl(System.Runtime.CompilerServices.MethodImplOptions.AggressiveInlining)] void CheckStarted() { if (_started) throw new InvalidOperationException("Cannot be changed after Start"); } + bool UpdateModeOnDelivery => (UpdateMode & UpdateModeTypes.OnDelivery) == UpdateModeTypes.OnDelivery; + + bool UpdateModeOnConsume => (UpdateMode & UpdateModeTypes.OnConsume) == UpdateModeTypes.OnConsume; + + bool UpdateModeOnConsumeSync => (UpdateMode & UpdateModeTypes.OnConsumeSync) == UpdateModeTypes.OnConsumeSync; + + bool UpdateModeDelayed => UpdateMode.HasFlag(UpdateModeTypes.Delayed); + private void OnMessage(KNetConsumerRecord record) { if (record.Value == null) { - _dictionary.TryRemove(record.Key, out _); - OnRemoteRemove?.Invoke(this, record.Key); + _dictionary.TryRemove(record.Key, out var data); + OnRemoteRemove?.Invoke(this, new KeyValuePair(record.Key, data.Value)); } else { - _dictionary[record.Key] = record.Value; + ILocalDataStorage data; + if (!_dictionary.TryGetValue(record.Key, out data)) + { + data = new LocalDataStorage(); + _dictionary[record.Key] = data; + } + lock (data.Lock) + { + data.Partition = record.Partition; + data.HasOffset = true; + data.Offset = record.Offset; + bool storeValue = true; + if (UpdateModeDelayed) + { + storeValue = (OnDelayedStore != null) ? OnDelayedStore.Invoke(this, new KeyValuePair(record.Key, record.Value)) : false; + } + if (storeValue) + { + data.HasValue = true; + data.Value = record.Value; + } + } OnRemoteUpdate?.Invoke(this, new KeyValuePair(record.Key, record.Value)); } @@ -225,14 +577,28 @@ private void OnMessage(KNetConsumerRecord record) private void OnTopicPartitionsAssigned(Collection topicPartitions) { - _assignmentWaiter?.Set(); + foreach (var topicPartition in topicPartitions) + { + _assignmentWaiters[topicPartition.Partition()].Set(); + } } - private void OnTopicPartitionsRevoked(Collection topicPartitionOffsets) + private void OnTopicPartitionsRevoked(Collection topicPartitions) { - _assignmentWaiter?.Set(); + foreach (var topicPartition in topicPartitions) + { + _assignmentWaiters[topicPartition.Partition()].Reset(); + } } + private void OnTopicPartitionsLost(Collection topicPartitions) + { + foreach (var topicPartition in topicPartitions) + { + _assignmentWaiters[topicPartition.Partition()].Reset(); + } + } + [System.Runtime.CompilerServices.MethodImpl(System.Runtime.CompilerServices.MethodImplOptions.AggressiveInlining)] private void RemoveRecord(TKey key) { ValidateAccessRights(AccessRightsType.Write); @@ -240,10 +606,9 @@ private void RemoveRecord(TKey key) if (key == null) throw new ArgumentNullException(nameof(key)); - if (UpdateMode == UpdateModeTypes.OnDelivery) + if (UpdateModeOnDelivery) { JVMBridgeException exception = null; - DateTime pTimestamp = DateTime.MaxValue; using (AutoResetEvent deliverySemaphore = new AutoResetEvent(false)) { @@ -255,7 +620,6 @@ private void RemoveRecord(TKey key) { if (deliverySemaphore.SafeWaitHandle.IsClosed) return; - exception = error; deliverySemaphore.Set(); } @@ -269,13 +633,13 @@ private void RemoveRecord(TKey key) } } } - else if (UpdateMode == UpdateModeTypes.OnConsume || UpdateMode == UpdateModeTypes.OnConsumeSync) + else if (UpdateModeOnConsume || UpdateModeOnConsumeSync) { _producer.Produce(StateName, key, null, (Callback)null); } _dictionary.TryRemove(key, out _); } - + [System.Runtime.CompilerServices.MethodImpl(System.Runtime.CompilerServices.MethodImplOptions.AggressiveInlining)] private void AddOrUpdate(TKey key, TValue value) { ValidateAccessRights(AccessRightsType.Write); @@ -283,8 +647,9 @@ private void AddOrUpdate(TKey key, TValue value) if (key == null) throw new ArgumentNullException(nameof(key)); - if (UpdateMode == UpdateModeTypes.OnDelivery) + if (UpdateModeOnDelivery) { + RecordMetadata metadata = null; JVMBridgeException exception = null; DateTime pTimestamp = DateTime.MaxValue; using (AutoResetEvent deliverySemaphore = new AutoResetEvent(false)) @@ -297,7 +662,7 @@ private void AddOrUpdate(TKey key, TValue value) { if (deliverySemaphore.SafeWaitHandle.IsClosed) return; - + metadata = record; exception = error; deliverySemaphore.Set(); } @@ -313,19 +678,40 @@ private void AddOrUpdate(TKey key, TValue value) if (value == null) { - _dictionary.TryRemove(key, out _); - OnLocalRemove?.Invoke(this, key); + _dictionary.TryRemove(key, out var data); + OnLocalRemove?.Invoke(this, new KeyValuePair(key, data.Value)); } else { - _dictionary[key] = value; + ILocalDataStorage data; + if (!_dictionary.TryGetValue(key, out data)) + { + data = new LocalDataStorage(); + _dictionary[key] = data; + } + lock (data.Lock) + { + data.Partition = metadata.Partition(); + data.HasOffset = metadata.HasOffset(); + data.Offset = metadata.Offset(); + bool storeValue = true; + if (UpdateModeDelayed) + { + storeValue = (OnDelayedStore != null) ? OnDelayedStore.Invoke(this, new KeyValuePair(key, value)) : false; + } + if (storeValue) + { + data.HasValue = true; + data.Value = value; + } + } OnLocalUpdate?.Invoke(this, new KeyValuePair(key, value)); } } - else if (UpdateMode == UpdateModeTypes.OnConsume || UpdateMode == UpdateModeTypes.OnConsumeSync) + else if (UpdateModeOnConsume || UpdateModeOnConsumeSync) { _producer.Produce(StateName, key, value, (Callback)null); - if (UpdateMode == UpdateModeTypes.OnConsumeSync) + if (UpdateModeOnConsumeSync) { _OnConsumeSyncWaiter = new Tuple(key, new ManualResetEvent(false)); _OnConsumeSyncWaiter.Item2.WaitOne(); @@ -333,17 +719,108 @@ private void AddOrUpdate(TKey key, TValue value) } } } - + [System.Runtime.CompilerServices.MethodImpl(System.Runtime.CompilerServices.MethodImplOptions.AggressiveInlining)] private void ValidateAccessRights(AccessRightsType rights) { if (!_accessrights.HasFlag(rights)) throw new InvalidOperationException($"{rights} access flag not set"); } - private ConcurrentDictionary ValidateAndGetLocalDictionary() + [System.Runtime.CompilerServices.MethodImpl(System.Runtime.CompilerServices.MethodImplOptions.AggressiveInlining)] + void BuildConsumers() { - ValidateAccessRights(AccessRightsType.Read); - return _dictionary; + _consumerConfig ??= ConsumerConfigBuilder.Create().WithEnableAutoCommit(true) + .WithAutoOffsetReset(ConsumerConfigBuilder.AutoOffsetResetTypes.EARLIEST) + .WithAllowAutoCreateTopics(false); + + ConsumerConfig.BootstrapServers = BootstrapServers; + ConsumerConfig.GroupId = GroupId; + if (ConsumerConfig.CanApplyBasicDeserializer() && KeySerDes == null) + { + KeySerDes = new KNetSerDes(); + } + + if (KeySerDes == null) throw new InvalidOperationException($"{typeof(TKey)} needs an external deserializer, set KeySerDes."); + + if (ConsumerConfig.CanApplyBasicDeserializer() && ValueSerDes == null) + { + ValueSerDes = new KNetSerDes(); + } + + if (ValueSerDes == null) throw new InvalidOperationException($"{typeof(TValue)} needs an external deserializer, set ValueSerDes."); + + _assignmentWaiters = new ManualResetEvent[_partitions]; + _lastPartitionLags = new long[_partitions]; + _consumers = new KNetConsumer[_partitions]; + + for (int i = 0; i < _partitions; i++) + { + _assignmentWaiters[i] = new ManualResetEvent(false); + _lastPartitionLags[i] = -1; + _consumers[i] = new KNetConsumer(ConsumerConfig, KeySerDes, ValueSerDes); + _consumers[i].SetCallback(OnMessage); + } + _consumerListener = new ConsumerRebalanceListener() + { + OnOnPartitionsRevoked = OnTopicPartitionsRevoked, + OnOnPartitionsAssigned = OnTopicPartitionsAssigned, + OnOnPartitionsLost = OnTopicPartitionsLost + }; + } + + [System.Runtime.CompilerServices.MethodImpl(System.Runtime.CompilerServices.MethodImplOptions.AggressiveInlining)] + void BuildOnTheFlyConsumer() + { + if (_onTheFlyConsumer == null) + { + ConsumerConfigBuilder consumerConfigBuilder = ConsumerConfigBuilder.CreateFrom(_consumerConfig); + consumerConfigBuilder.WithEnableAutoCommit(false).WithGroupId(Guid.NewGuid().ToString()); + + _onTheFlyConsumer = new KNetConsumer(consumerConfigBuilder, KeySerDes, ValueSerDes); + } + } + [System.Runtime.CompilerServices.MethodImpl(System.Runtime.CompilerServices.MethodImplOptions.AggressiveInlining)] + void BuildProducer() + { + _producerConfig ??= ProducerConfigBuilder.Create().WithAcks(ProducerConfigBuilder.AcksTypes.All) + .WithRetries(0) + .WithLingerMs(1); + + ProducerConfig.BootstrapServers = BootstrapServers; + if (ProducerConfig.CanApplyBasicSerializer() && KeySerDes == null) + { + KeySerDes = new KNetSerDes(); + } + + if (KeySerDes == null) throw new InvalidOperationException($"{typeof(TKey)} needs an external serializer, set KeySerDes."); + + if (ProducerConfig.CanApplyBasicSerializer() && ValueSerDes == null) + { + ValueSerDes = new KNetSerDes(); + } + + if (ValueSerDes == null) throw new InvalidOperationException($"{typeof(TValue)} needs an external serializer, set ValueSerDes."); + + _producer = new KNetProducer(ProducerConfig, KeySerDes, ValueSerDes); + } + + void ConsumerPollHandler(object o) + { + int index = (int)o; + _consumers[index].Subscribe(Collections.Singleton(StateName), _consumerListener); + while (_consumerPollRun) + { + try + { + _consumers[index].ConsumeAsync(100); + if (_assignmentWaiters[index].WaitOne(0)) + { + var lag = _consumers[index].CurrentLag(new TopicPartition(StateName, index)); + Interlocked.Exchange(ref _lastPartitionLags[index], lag.IsPresent() ? lag.AsLong : -1); + } + } + catch { } + } } #endregion @@ -366,7 +843,8 @@ public void Start() var topic = new NewTopic(StateName, Partitions, ReplicationFactor); _topicConfig ??= TopicConfigBuilder.Create().WithDeleteRetentionMs(100) .WithMinCleanableDirtyRatio(0.01) - .WithSegmentMs(100); + .WithSegmentMs(100) + .WithRetentionBytes(1073741824); TopicConfig.CleanupPolicy = TopicConfigBuilder.CleanupPolicyTypes.Compact | TopicConfigBuilder.CleanupPolicyTypes.Delete; topic = topic.Configs(TopicConfig); @@ -376,89 +854,67 @@ public void Start() } catch (TopicExistsException) { + // recover partitions of the topic + try + { + var result = _admin.DescribeTopics(Collections.Singleton(StateName)); + if (result != null) + { + var map = result.AllTopicNames().Get(); + if (map != null) + { + var topicDesc = map.Get(StateName); + _partitions = topicDesc.Partitions().Size(); + } + } + } + catch + { + + } } } if (AccessRights.HasFlag(AccessRightsType.Read)) { - _consumerConfig ??= ConsumerConfigBuilder.Create().WithEnableAutoCommit(true) - .WithAutoOffsetReset(ConsumerConfigBuilder.AutoOffsetResetTypes.EARLIEST) - .WithAllowAutoCreateTopics(false); - - ConsumerConfig.BootstrapServers = BootstrapServers; - ConsumerConfig.GroupId = Guid.NewGuid().ToString(); - if (ConsumerConfig.CanApplyBasicDeserializer() && KeySerDes == null) - { - KeySerDes = new KNetSerDes(); - } - - if (KeySerDes == null) throw new InvalidOperationException($"{typeof(TKey)} needs an external deserializer, set KeySerDes."); - - if (ConsumerConfig.CanApplyBasicDeserializer() && ValueSerDes == null) - { - ValueSerDes = new KNetSerDes(); - } - - if (ValueSerDes == null) throw new InvalidOperationException($"{typeof(TValue)} needs an external deserializer, set ValueSerDes."); - - _consumer = new KNetConsumer(ConsumerConfig, KeySerDes, ValueSerDes); - _consumer.SetCallback(OnMessage); - _consumerListener = new ConsumerRebalanceListener() - { - OnOnPartitionsRevoked = OnTopicPartitionsRevoked, - OnOnPartitionsAssigned = OnTopicPartitionsAssigned - }; + BuildConsumers(); } if (AccessRights.HasFlag(AccessRightsType.Write)) { - _producerConfig ??= ProducerConfigBuilder.Create().WithAcks(ProducerConfigBuilder.AcksTypes.All) - .WithRetries(0) - .WithLingerMs(1); - - ProducerConfig.BootstrapServers = BootstrapServers; - if (ProducerConfig.CanApplyBasicSerializer() && KeySerDes == null) - { - KeySerDes = new KNetSerDes(); - } - - if (KeySerDes == null) throw new InvalidOperationException($"{typeof(TKey)} needs an external serializer, set KeySerDes."); - - if (ProducerConfig.CanApplyBasicSerializer() && ValueSerDes == null) - { - ValueSerDes = new KNetSerDes(); - } - - if (ValueSerDes == null) throw new InvalidOperationException($"{typeof(TValue)} needs an external serializer, set ValueSerDes."); - - _producer = new KNetProducer(ProducerConfig, KeySerDes, ValueSerDes); + BuildProducer(); } - if (_consumer != null) + if (_consumers != null) { _consumerPollRun = true; - _consumerPollThread = new Thread(ConsumerPollHandler); - _consumerPollThread.Start(); + _consumerPollThreads = new Thread[_partitions]; + for (int i = 0; i < _partitions; i++) + { + _consumerPollThreads[i] = new Thread(ConsumerPollHandler); + _consumerPollThreads[i].Start(i); + } } _started = true; } - void ConsumerPollHandler(object o) + + /// + /// Start this : create the topic if not available, allocates Producer and Consumers, sets serializer/deserializer + /// Then waits its synchronization with topic which stores dictionary data + /// + /// Some errors occurred or the provided do not include the flag + public void StartAndWait(int timeout = Timeout.Infinite) { - _consumer.Subscribe(Collections.Singleton(StateName), _consumerListener); - while (_consumerPollRun) - { - try - { - _consumer.ConsumeAsync(100); - } - catch { } - } + ValidateAccessRights(AccessRightsType.Read); + Start(); + WaitForStateAssignment(timeout); + SyncWait(timeout); } /// - /// Waits for the very first parition assignment of the topic which stores dictionary data + /// Waits for all paritions assignment of the topic which stores dictionary data /// /// The number of milliseconds to wait, or to wait indefinitely /// if the current instance receives a signal within the given ; otherwise, @@ -466,8 +922,26 @@ void ConsumerPollHandler(object o) public bool WaitForStateAssignment(int timeout = Timeout.Infinite) { ValidateAccessRights(AccessRightsType.Read); - - return _assignmentWaiter.WaitOne(timeout); + return WaitHandle.WaitAll(_assignmentWaiters, timeout); + } + /// + /// Waits that is synchronized to the topic which stores dictionary data + /// + /// The number of milliseconds to wait, or to wait indefinitely + /// if the current instance synchronize within the given ; otherwise, + /// The provided do not include the flag + public void SyncWait(int timeout = Timeout.Infinite) + { + ValidateAccessRights(AccessRightsType.Read); + Stopwatch watcher = Stopwatch.StartNew(); + bool sync = false; + while (!sync && watcher.ElapsedMilliseconds < (uint)timeout) + { + for (int i = 0; i < _partitions; i++) + { + sync = Interlocked.Read(ref _lastPartitionLags[i]) == 0 && _consumers[i].IsEmpty; + } + } } /// @@ -475,6 +949,7 @@ public bool WaitForStateAssignment(int timeout = Timeout.Infinite) /// public void Flush() { + ValidateAccessRights(AccessRightsType.Write); _producer?.Flush(); } @@ -493,7 +968,16 @@ public void Flush() /// The call is get and is not found public TValue this[TKey key] { - get { return ValidateAndGetLocalDictionary()[key]; } + get + { + ValidateAccessRights(AccessRightsType.Read); + BuildOnTheFlyConsumer(); + if (!new LocalDataStorageEnumerator(_dictionary, _onTheFlyConsumer, StateName).TryGetValue(key, out var data)) + { + throw new IndexOutOfRangeException($"Key {key} not available locally"); + } + return data; + } set { AddOrUpdate(key, value); } } @@ -504,7 +988,11 @@ public TValue this[TKey key] /// The provided do not include the flag public System.Collections.Generic.ICollection Keys { - get { return ValidateAndGetLocalDictionary().Keys; } + get + { + ValidateAccessRights(AccessRightsType.Read); + return _dictionary.Keys; + } } /// @@ -514,7 +1002,12 @@ public System.Collections.Generic.ICollection Keys /// The provided do not include the flag public System.Collections.Generic.ICollection Values { - get { return ValidateAndGetLocalDictionary().Values; } + get + { + ValidateAccessRights(AccessRightsType.Read); + BuildOnTheFlyConsumer(); + return new LocalDataStorageEnumerator(_dictionary, _onTheFlyConsumer, StateName).Values(); + } } /// @@ -524,7 +1017,11 @@ public System.Collections.Generic.ICollection Values /// The provided do not include the flag public int Count { - get { return ValidateAndGetLocalDictionary().Count; } + get + { + ValidateAccessRights(AccessRightsType.Read); + return _dictionary.Count; + } } /// @@ -559,12 +1056,13 @@ public void Add(KeyValuePair item) } /// - /// Clears this , resetting all paritions' sync + /// Clears this , resetting all partitions' sync /// /// The provided do not include the flag public void Clear() { - ValidateAndGetLocalDictionary().Clear(); + ValidateAccessRights(AccessRightsType.Write); + _dictionary.Clear(); } /// @@ -576,7 +1074,9 @@ public void Clear() /// The provided do not include the flag public bool Contains(KeyValuePair item) { - return (ValidateAndGetLocalDictionary() as IDictionary).Contains(item); + ValidateAccessRights(AccessRightsType.Read); + BuildOnTheFlyConsumer(); + return new LocalDataStorageEnumerator(_dictionary, _onTheFlyConsumer, StateName).Contains(item); } /// @@ -588,7 +1088,8 @@ public bool Contains(KeyValuePair item) /// The provided do not include the flag public bool ContainsKey(TKey key) { - return ValidateAndGetLocalDictionary().ContainsKey(key); + ValidateAccessRights(AccessRightsType.Read); + return _dictionary.ContainsKey(key); } /// @@ -605,7 +1106,9 @@ public bool ContainsKey(TKey key) /// The provided do not include the flag public void CopyTo(KeyValuePair[] array, int arrayIndex) { - (ValidateAndGetLocalDictionary() as ICollection).CopyTo(array, arrayIndex); + ValidateAccessRights(AccessRightsType.Read); + BuildOnTheFlyConsumer(); + new LocalDataStorageEnumerator(_dictionary, _onTheFlyConsumer, StateName).CopyTo(array, arrayIndex); } /// @@ -615,7 +1118,9 @@ public void CopyTo(KeyValuePair[] array, int arrayIndex) /// The provided do not include the flag public IEnumerator> GetEnumerator() { - return ValidateAndGetLocalDictionary().GetEnumerator(); + ValidateAccessRights(AccessRightsType.Read); + BuildOnTheFlyConsumer(); + return new LocalDataStorageEnumerator(_dictionary, _onTheFlyConsumer, StateName); } /// @@ -655,17 +1160,19 @@ public bool Remove(KeyValuePair item) /// if the was found in this ; otherwise, /// is /// The provided do not include the flag - public bool TryGetValue( - TKey key, - out TValue value) + public bool TryGetValue(TKey key, out TValue value) { - return ValidateAndGetLocalDictionary().TryGetValue(key, out value); + ValidateAccessRights(AccessRightsType.Read); + BuildOnTheFlyConsumer(); + return new LocalDataStorageEnumerator(_dictionary, _onTheFlyConsumer, StateName).TryGetValue(key, out value); } /// The provided do not include the flag IEnumerator IEnumerable.GetEnumerator() { - return (ValidateAndGetLocalDictionary() as IEnumerable).GetEnumerator(); + ValidateAccessRights(AccessRightsType.Read); + BuildOnTheFlyConsumer(); + return new LocalDataStorageEnumerator(_dictionary, _onTheFlyConsumer, StateName); } #endregion @@ -679,14 +1186,27 @@ public void Dispose() { _consumerPollRun = false; - _consumer?.Dispose(); + if (_consumers != null) + { + foreach (var item in _consumers) + { + item?.Dispose(); + } + } _producer?.Flush(); _producer?.Dispose(); - _assignmentWaiter?.Close(); + if (_assignmentWaiters != null) + { + foreach (var item in _assignmentWaiters) + { + item?.Close(); + } + } } #endregion } + #endregion } From 1168b08b9d14958e3de5cb70f62fc6fbf21c1b78 Mon Sep 17 00:00:00 2001 From: masesdevelopers <94312179+masesdevelopers@users.noreply.github.com> Date: Fri, 1 Sep 2023 13:18:26 +0200 Subject: [PATCH 2/4] complete --- .../KNet/Specific/{ => Replicator}/KNetCompactedReplicator.cs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) rename src/net/KNet/Specific/{ => Replicator}/KNetCompactedReplicator.cs (99%) diff --git a/src/net/KNet/Specific/KNetCompactedReplicator.cs b/src/net/KNet/Specific/Replicator/KNetCompactedReplicator.cs similarity index 99% rename from src/net/KNet/Specific/KNetCompactedReplicator.cs rename to src/net/KNet/Specific/Replicator/KNetCompactedReplicator.cs index b6b0c4391e..27c3040ef7 100644 --- a/src/net/KNet/Specific/KNetCompactedReplicator.cs +++ b/src/net/KNet/Specific/Replicator/KNetCompactedReplicator.cs @@ -18,7 +18,6 @@ using Java.Time; using Java.Util; -using Javax.Xml.Crypto; using MASES.JCOBridge.C2JBridge; using MASES.KNet.Admin; using MASES.KNet.Common; @@ -40,7 +39,7 @@ using System.Linq; using System.Threading; -namespace MASES.KNet +namespace MASES.KNet.Replicator { #region AccessRightsType /// From d0bd4c541587a4922aa10674c34a3770de2bbec3 Mon Sep 17 00:00:00 2001 From: masesdevelopers <94312179+masesdevelopers@users.noreply.github.com> Date: Sat, 2 Sep 2023 03:20:43 +0200 Subject: [PATCH 3/4] #218: update KNetCompactedReplicator plus added more tests --- .../Replicator/KNetCompactedReplicator.cs | 23 +- tests/KNetCompactedReplicatorTest/Program.cs | 212 ++---------------- 2 files changed, 33 insertions(+), 202 deletions(-) diff --git a/src/net/KNet/Specific/Replicator/KNetCompactedReplicator.cs b/src/net/KNet/Specific/Replicator/KNetCompactedReplicator.cs index 27c3040ef7..7133b0b8b1 100644 --- a/src/net/KNet/Specific/Replicator/KNetCompactedReplicator.cs +++ b/src/net/KNet/Specific/Replicator/KNetCompactedReplicator.cs @@ -38,6 +38,7 @@ using System.Diagnostics; using System.Linq; using System.Threading; +using static Javax.Swing.Text.Html.HTML; namespace MASES.KNet.Replicator { @@ -294,7 +295,7 @@ public KeyValuePair Current { if (!data.HasValue) { - OnDemandRetrieve(_consumer, _topic, data); + OnDemandRetrieve(_consumer, _topic, localCurrent.Key, data); } _current = new KeyValuePair(localCurrent.Key, localCurrent.Value.Value); } @@ -342,7 +343,7 @@ public bool TryGetValue(TKey key, out TValue value) { if (!data.HasValue) { - OnDemandRetrieve(_consumer, _topic, data); + OnDemandRetrieve(_consumer, _topic, key, data); } value = data.Value; return true; @@ -356,7 +357,7 @@ public bool Contains(KeyValuePair item) { if (!data.HasValue) { - OnDemandRetrieve(_consumer, _topic, data); + OnDemandRetrieve(_consumer, _topic, item.Key, data); } if (data.HasValue && data.Value == item.Value) return true; } @@ -499,7 +500,7 @@ public void CopyTo(KeyValuePair[] array, int arrayIndex) #region Private methods [System.Runtime.CompilerServices.MethodImpl(System.Runtime.CompilerServices.MethodImplOptions.AggressiveInlining)] - static void OnDemandRetrieve(IKNetConsumer consumer, string topic, ILocalDataStorage data) + static void OnDemandRetrieve(IKNetConsumer consumer, string topic, TKey key, ILocalDataStorage data) { if (!data.HasValue) { @@ -510,6 +511,7 @@ static void OnDemandRetrieve(IKNetConsumer consumer, string topic, if (results == null) throw new InvalidOperationException("Failed to get records from remote."); foreach (var result in results) { + if (!Equals(result.Key, key)) continue; if (data.Offset != result.Offset) throw new IndexOutOfRangeException($"Requested offset is {data.Offset} while received offset is {result.Offset}"); data.HasValue = true; data.Value = result.Value; @@ -814,8 +816,15 @@ void ConsumerPollHandler(object o) _consumers[index].ConsumeAsync(100); if (_assignmentWaiters[index].WaitOne(0)) { - var lag = _consumers[index].CurrentLag(new TopicPartition(StateName, index)); - Interlocked.Exchange(ref _lastPartitionLags[index], lag.IsPresent() ? lag.AsLong : -1); + try + { + var lag = _consumers[index].CurrentLag(new TopicPartition(StateName, index)); + Interlocked.Exchange(ref _lastPartitionLags[index], lag.IsPresent() ? lag.AsLong : -1); + } + catch (Java.Lang.IllegalStateException) + { + Interlocked.Exchange(ref _lastPartitionLags[index], -1); + } } } catch { } @@ -938,7 +947,7 @@ public void SyncWait(int timeout = Timeout.Infinite) { for (int i = 0; i < _partitions; i++) { - sync = Interlocked.Read(ref _lastPartitionLags[i]) == 0 && _consumers[i].IsEmpty; + sync = _consumers[i].IsEmpty && (Interlocked.Read(ref _lastPartitionLags[i]) == 0 || Interlocked.Read(ref _lastPartitionLags[i]) == -1) ; } } } diff --git a/tests/KNetCompactedReplicatorTest/Program.cs b/tests/KNetCompactedReplicatorTest/Program.cs index 61f65b0210..86c38524ff 100644 --- a/tests/KNetCompactedReplicatorTest/Program.cs +++ b/tests/KNetCompactedReplicatorTest/Program.cs @@ -17,19 +17,18 @@ */ using Java.Util; -using MASES.KNet; using MASES.KNet.Admin; using MASES.KNet.Common; using MASES.KNet.Consumer; using MASES.KNet.Extensions; using MASES.KNet.Producer; +using MASES.KNet.Replicator; using MASES.KNet.Serialization; using MASES.KNet.Serialization.Json; using MASES.KNet.TestCommon; using Org.Apache.Kafka.Clients.Admin; using Org.Apache.Kafka.Clients.Consumer; using Org.Apache.Kafka.Clients.Producer; -using Org.Apache.Kafka.Common.Config; using System; using System.Threading; @@ -46,8 +45,6 @@ class Program static string topicToUse = theTopic; static readonly ManualResetEvent resetEvent = new(false); - static KNetCompactedReplicator _replicator = null; - public class TestType { public TestType(int i) @@ -75,18 +72,9 @@ static void Main(string[] args) serverToUse = args[0]; } - _replicator = new KNetCompactedReplicator() - { - UpdateMode = KNetCompactedReplicator.UpdateModeTypes.OnConsumeSync, - BootstrapServers = serverToUse, - StateName = "TestState", - ValueSerDes = new JsonSerDes(), - }; - - _replicator.Start(); - _replicator.WaitForStateAssignment(); + Test("TestOnConsume", 100, UpdateModeTypes.OnConsume | UpdateModeTypes.Delayed); - _replicator["Test"] = new TestType(100); + Test("TestOnDelivery", 100, UpdateModeTypes.OnDelivery | UpdateModeTypes.Delayed); Console.CancelKeyPress += Console_CancelKeyPress; Console.WriteLine("Press Ctrl-C to exit"); @@ -99,196 +87,30 @@ private static void Console_CancelKeyPress(object sender, ConsoleCancelEventArgs if (e.Cancel) resetEvent.Set(); } - static void CreateTopic() + private static void Test(string topicName, int length, UpdateModeTypes type) { - try - { - string topicName = topicToUse; - int partitions = 1; - short replicationFactor = 1; - - var topic = new NewTopic(topicName, partitions, replicationFactor); - - /**** Direct mode ****** - var map = Collections.SingletonMap(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT); - topic.Configs(map); - *********/ - topic = topic.Configs(TopicConfigBuilder.Create().WithCleanupPolicy(TopicConfigBuilder.CleanupPolicyTypes.Compact | TopicConfigBuilder.CleanupPolicyTypes.Delete) - .WithDeleteRetentionMs(100) - .WithMinCleanableDirtyRatio(0.01) - .WithSegmentMs(100)); - - var coll = Collections.Singleton(topic); - - /**** Direct mode ****** - Properties props = new Properties(); - props.Put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, serverToUse); - *******/ - - Properties props = AdminClientConfigBuilder.Create().WithBootstrapServers(serverToUse).ToProperties(); - - using (IAdmin admin = KafkaAdminClient.Create(props)) - { - /******* standard - // Create a compacted topic - CreateTopicsResult result = admin.CreateTopics(coll); - - // Call values() to get the result for a specific topic - var future = result.Values.Get(topicName); - - // Call get() to block until the topic creation is complete or has failed - // if creation failed the ExecutionException wraps the underlying cause. - future.Get(); - ********/ - admin.CreateTopic(topicName, partitions, replicationFactor); - } - } - catch (Java.Util.Concurrent.ExecutionException ex) - { - Console.WriteLine(ex.InnerException.Message); - } - catch (Exception e) + using (var replicator = new KNetCompactedReplicator() { - Console.WriteLine(e.Message); - } - } - - static void ProduceSomething() - { - try + Partitions = 5, + UpdateMode = type, + BootstrapServers = serverToUse, + StateName = topicName, + ValueSerDes = new JsonSerDes(), + }) { - /**** Direct mode ****** - Properties props = new Properties(); - props.Put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, serverToUse); - props.Put(ProducerConfig.ACKS_CONFIG, "all"); - props.Put(ProducerConfig.RETRIES_CONFIG, 0); - props.Put(ProducerConfig.LINGER_MS_CONFIG, 1); - ******/ - - Properties props = ProducerConfigBuilder.Create() - .WithBootstrapServers(serverToUse) - .WithAcks(ProducerConfigBuilder.AcksTypes.All) - .WithRetries(0) - .WithLingerMs(1) - .ToProperties(); + replicator.StartAndWait(); - KNetSerDes keySerializer = new KNetSerDes(); - JsonSerDes valueSerializer = new JsonSerDes(); - try - { - using (var producer = new KNetProducer(props, keySerializer, valueSerializer)) - { - int i = 0; - Callback callback = null; - if (useCallback) - { - callback = new Callback() - { - OnOnCompletion = (o1, o2) => - { - if (o2 != null) Console.WriteLine(o2.ToString()); - else Console.WriteLine($"Produced on topic {o1.Topic()} at offset {o1.Offset()}"); - } - }; - } - try - { - while (!resetEvent.WaitOne(0)) - { - var record = new KNetProducerRecord(topicToUse, i.ToString(), new TestType(i)); - var result = useCallback ? producer.Send(record, callback) : producer.Send(record); - Console.WriteLine($"Producing: {record} with result: {result.Get()}"); - producer.Flush(); - i++; - } - } - finally { if (useCallback) callback.Dispose(); } - } - } - finally + for (int i = 0; i < length; i++) { - keySerializer?.Dispose(); - valueSerializer?.Dispose(); + replicator[i] = new TestType(i); } - } - catch (Java.Util.Concurrent.ExecutionException ex) - { - Console.WriteLine("Producer ended with error: {0}", ex.InnerException.Message); - } - catch (Exception ex) - { - Console.WriteLine("Producer ended with error: {0}", ex.Message); - } - } - static void ConsumeSomething() - { - try - { - /**** Direct mode ****** - Properties props = new Properties(); - props.Put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, serverToUse); - props.Put(ConsumerConfig.GROUP_ID_CONFIG, "test"); - props.Put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); - props.Put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000"); - *******/ + replicator.SyncWait(); - Properties props = ConsumerConfigBuilder.Create() - .WithBootstrapServers(serverToUse) - .WithGroupId("test") - .WithEnableAutoCommit(true) - .WithAutoCommitIntervalMs(1000) - .ToProperties(); - - KNetSerDes keyDeserializer = new KNetSerDes(); - KNetSerDes valueDeserializer = new JsonSerDes(); - ConsumerRebalanceListener rebalanceListener = null; - KNetConsumer consumer = null; - - if (useCallback) - { - rebalanceListener = new ConsumerRebalanceListener() - { - OnOnPartitionsRevoked = (o) => - { - Console.WriteLine("Revoked: {0}", o.ToString()); - }, - OnOnPartitionsAssigned = (o) => - { - Console.WriteLine("Assigned: {0}", o.ToString()); - } - }; - } - try + foreach (var item in replicator) { - using (consumer = new KNetConsumer(props, keyDeserializer, valueDeserializer)) - { - if (useCallback) consumer.Subscribe(Collections.Singleton(topicToUse), rebalanceListener); - else consumer.Subscribe(Collections.Singleton(topicToUse)); - - while (!resetEvent.WaitOne(0)) - { - var records = consumer.Poll((long)TimeSpan.FromMilliseconds(200).TotalMilliseconds); - foreach (var item in records) - { - Console.WriteLine($"Consuming from Offset = {item.Offset}, Key = {item.Key}, Value = {item.Value}"); - } - } - } + Console.WriteLine($"Key: {item.Key} - Value: {item.Value}"); } - finally - { - keyDeserializer?.Dispose(); - valueDeserializer?.Dispose(); - } - } - catch (Java.Util.Concurrent.ExecutionException ex) - { - Console.WriteLine("Consumer ended with error: {0}", ex.InnerException.Message); - } - catch (Exception ex) - { - Console.WriteLine("Consumer ended with error: {0}", ex.Message); } } } From dc16f84fa651d8771eb541d30290deba17ba2d4f Mon Sep 17 00:00:00 2001 From: masesdevelopers <94312179+masesdevelopers@users.noreply.github.com> Date: Sat, 2 Sep 2023 03:38:58 +0200 Subject: [PATCH 4/4] Docker fix --- src/net/KNet/Specific/Replicator/KNetCompactedReplicator.cs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/net/KNet/Specific/Replicator/KNetCompactedReplicator.cs b/src/net/KNet/Specific/Replicator/KNetCompactedReplicator.cs index 7133b0b8b1..8b187d89e2 100644 --- a/src/net/KNet/Specific/Replicator/KNetCompactedReplicator.cs +++ b/src/net/KNet/Specific/Replicator/KNetCompactedReplicator.cs @@ -253,6 +253,10 @@ struct LocalDataStorage : ILocalDataStorage object _lock = new object(); public LocalDataStorage() { + Partition = -1; + HasOffset = HasValue = false; + Offset = -1; + Value = null; } public object Lock => _lock; public int Partition { get; set; }