diff --git a/src/net/KNet/Specific/KNetCompactedReplicator.cs b/src/net/KNet/Specific/KNetCompactedReplicator.cs
deleted file mode 100644
index e4d2e75fe8..0000000000
--- a/src/net/KNet/Specific/KNetCompactedReplicator.cs
+++ /dev/null
@@ -1,692 +0,0 @@
-/*
-* Copyright 2023 MASES s.r.l.
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*
-* Refer to LICENSE for more information.
-*/
-
-using Java.Util;
-using MASES.JCOBridge.C2JBridge;
-using MASES.KNet.Admin;
-using MASES.KNet.Common;
-using MASES.KNet.Consumer;
-using MASES.KNet.Extensions;
-using MASES.KNet.Producer;
-using MASES.KNet.Serialization;
-using Org.Apache.Kafka.Clients.Admin;
-using Org.Apache.Kafka.Clients.Consumer;
-using Org.Apache.Kafka.Clients.Producer;
-using Org.Apache.Kafka.Common;
-using Org.Apache.Kafka.Common.Config;
-using Org.Apache.Kafka.Common.Errors;
-using System;
-using System.Collections;
-using System.Collections.Concurrent;
-using System.Collections.Generic;
-using System.Threading;
-
-namespace MASES.KNet
-{
- ///
- /// Provides a reliable dictionary, persisted in a COMPACTED Kafka topic and shared among applications
- ///
- /// The type of keys in the dictionary
- /// The type of values in the dictionary. Must be a nullable type
- public class KNetCompactedReplicator :
- IDictionary,
- IDisposable
- where TValue : class
- {
- #region AccessRightsType
- ///
- /// access rights to data
- ///
- [Flags]
- public enum AccessRightsType
- {
- ///
- /// Data are readable, i.e. aligned with the others and accessible from this
- ///
- Read = 1,
- ///
- /// Data are writable, i.e. updates can be produced, but this is not accessible and not aligned with the others
- ///
- Write = 2,
- ///
- /// Data are readable and writable, i.e. updates can be produced, and data are aligned with the others and accessible from this
- ///
- ReadWrite = Read | Write,
- }
-
- #endregion
-
- #region UpdateModeTypes
-
- ///
- /// update modes
- ///
- public enum UpdateModeTypes
- {
- ///
- /// The is updated as soon as an update is delivered to Kafka by the current application
- ///
- OnDelivery = 1,
- ///
- /// The is updated only after an update is consumed from Kafka, even if the add or update is made locally by the current instance
- ///
- OnConsume = 2,
- ///
- /// The is updated only after an update is consumed from Kafka, even if the add or update is made locally by the current instance. Plus the update waits the consume of the data before unlock
- ///
- OnConsumeSync = 3
- }
-
- #endregion
-
- #region Private members
-
- private bool _consumerPollRun = false;
- private Thread _consumerPollThread = null;
- private IAdmin _admin = null;
- private ConcurrentDictionary _dictionary = new ConcurrentDictionary();
- private ConsumerRebalanceListener _consumerListener = null;
- private KNetConsumer _consumer = null;
- private KNetProducer _producer = null;
- private string _bootstrapServers = null;
- private string _stateName = string.Empty;
- private int _partitions = 1;
- private short _replicationFactor = 1;
- private TopicConfigBuilder _topicConfig = null;
- private ConsumerConfigBuilder _consumerConfig = null;
- private ProducerConfigBuilder _producerConfig = null;
- private AccessRightsType _accessrights = AccessRightsType.ReadWrite;
- private UpdateModeTypes _updateMode = UpdateModeTypes.OnDelivery;
- private Tuple _OnConsumeSyncWaiter = null;
- private readonly ManualResetEvent _assignmentWaiter = new ManualResetEvent(false);
-
- private KNetSerDes _keySerDes = null;
- private KNetSerDes _valueSerDes = null;
-
- private bool _started = false;
-
- #endregion
-
- #region Events
-
- ///
- /// Called when a [, ] is updated by consuming data from the others
- ///
- public Action, KeyValuePair> OnRemoteUpdate;
-
- ///
- /// Called when a is removed by consuming data from the others
- ///
- public Action, TKey> OnRemoteRemove;
-
- ///
- /// Called when a is removed from this
- ///
- public event Action, KeyValuePair> OnLocalUpdate;
-
- ///
- /// Called when a is removed from this
- ///
- public event Action, TKey> OnLocalRemove;
-
- #endregion
-
- #region Public Properties
- ///
- /// Get or set
- ///
- public AccessRightsType AccessRights { get { return _accessrights; } set { CheckStarted(); _accessrights = value; } }
- ///
- /// Get or set
- ///
- public UpdateModeTypes UpdateMode { get { return _updateMode; } set { CheckStarted(); _updateMode = value; } }
- ///
- /// Get or set bootstrap servers
- ///
- public string BootstrapServers { get { return _bootstrapServers; } set { CheckStarted(); _bootstrapServers = value; } }
- ///
- /// Get or set topic name
- ///
- public string StateName { get { return _stateName; } set { CheckStarted(); _stateName = value; } }
- ///
- /// Get or set partitions to use when topic is created for the first time
- ///
- public int Partitions { get { return _partitions; } set { CheckStarted(); _partitions = value; } }
- ///
- /// Get or set replication factor to use when topic is created for the first time
- ///
- public short ReplicationFactor { get { return _replicationFactor; } set { CheckStarted(); _replicationFactor = value; } }
- ///
- /// Get or set to use when topic is created for the first time
- ///
- public TopicConfigBuilder TopicConfig { get { return _topicConfig; } set { CheckStarted(); _topicConfig = value; } }
- ///
- /// Get or set to use in
- ///
- public ConsumerConfigBuilder ConsumerConfig { get { return _consumerConfig; } set { CheckStarted(); _consumerConfig = value; } }
- ///
- /// Get or set to use in
- ///
- public ProducerConfigBuilder ProducerConfig { get { return _producerConfig; } set { CheckStarted(); _producerConfig = value; } }
- ///
- /// Get or set to use in , by default it creates a default one based on
- ///
- public KNetSerDes KeySerDes { get { return _keySerDes; } set { CheckStarted(); _keySerDes = value; } }
- ///
- /// Get or set to use in , by default it creates a default one based on
- ///
- public KNetSerDes ValueSerDes { get { return _valueSerDes; } set { CheckStarted(); _valueSerDes = value; } }
-
- #endregion
-
- #region Private methods
-
- void CheckStarted()
- {
- if (_started) throw new InvalidOperationException("Cannot be changed after Start");
- }
-
- private void OnMessage(KNetConsumerRecord record)
- {
- if (record.Value == null)
- {
- _dictionary.TryRemove(record.Key, out _);
- OnRemoteRemove?.Invoke(this, record.Key);
- }
- else
- {
- _dictionary[record.Key] = record.Value;
- OnRemoteUpdate?.Invoke(this, new KeyValuePair(record.Key, record.Value));
- }
-
- if (_OnConsumeSyncWaiter != null)
- {
- if (_OnConsumeSyncWaiter.Item1.Equals(record.Key))
- {
- _OnConsumeSyncWaiter.Item2.Set();
- }
- }
- }
-
- private void OnTopicPartitionsAssigned(Collection topicPartitions)
- {
- _assignmentWaiter?.Set();
- }
-
- private void OnTopicPartitionsRevoked(Collection topicPartitionOffsets)
- {
- _assignmentWaiter?.Set();
- }
-
- private void RemoveRecord(TKey key)
- {
- ValidateAccessRights(AccessRightsType.Write);
-
- if (key == null)
- throw new ArgumentNullException(nameof(key));
-
- if (UpdateMode == UpdateModeTypes.OnDelivery)
- {
- JVMBridgeException exception = null;
-
- DateTime pTimestamp = DateTime.MaxValue;
- using (AutoResetEvent deliverySemaphore = new AutoResetEvent(false))
- {
- using (Callback cb = new Callback()
- {
- OnOnCompletion = (record, error) =>
- {
- try
- {
- if (deliverySemaphore.SafeWaitHandle.IsClosed)
- return;
-
- exception = error;
- deliverySemaphore.Set();
- }
- catch { }
- }
- })
- {
- _producer.Produce(new KNetProducerRecord(_stateName, key, null), cb);
- deliverySemaphore.WaitOne();
- if (exception != null) throw exception;
- }
- }
- }
- else if (UpdateMode == UpdateModeTypes.OnConsume || UpdateMode == UpdateModeTypes.OnConsumeSync)
- {
- _producer.Produce(StateName, key, null, (Callback)null);
- }
- _dictionary.TryRemove(key, out _);
- }
-
- private void AddOrUpdate(TKey key, TValue value)
- {
- ValidateAccessRights(AccessRightsType.Write);
-
- if (key == null)
- throw new ArgumentNullException(nameof(key));
-
- if (UpdateMode == UpdateModeTypes.OnDelivery)
- {
- JVMBridgeException exception = null;
- DateTime pTimestamp = DateTime.MaxValue;
- using (AutoResetEvent deliverySemaphore = new AutoResetEvent(false))
- {
- using (Callback cb = new Callback()
- {
- OnOnCompletion = (record, error) =>
- {
- try
- {
- if (deliverySemaphore.SafeWaitHandle.IsClosed)
- return;
-
- exception = error;
- deliverySemaphore.Set();
- }
- catch { }
- }
- })
- {
- _producer.Produce(new KNetProducerRecord(_stateName, key, value), cb);
- deliverySemaphore.WaitOne();
- if (exception != null) throw exception;
- }
- }
-
- if (value == null)
- {
- _dictionary.TryRemove(key, out _);
- OnLocalRemove?.Invoke(this, key);
- }
- else
- {
- _dictionary[key] = value;
- OnLocalUpdate?.Invoke(this, new KeyValuePair(key, value));
- }
- }
- else if (UpdateMode == UpdateModeTypes.OnConsume || UpdateMode == UpdateModeTypes.OnConsumeSync)
- {
- _producer.Produce(StateName, key, value, (Callback)null);
- if (UpdateMode == UpdateModeTypes.OnConsumeSync)
- {
- _OnConsumeSyncWaiter = new Tuple(key, new ManualResetEvent(false));
- _OnConsumeSyncWaiter.Item2.WaitOne();
- _OnConsumeSyncWaiter.Item2.Dispose();
- }
- }
- }
-
- private void ValidateAccessRights(AccessRightsType rights)
- {
- if (!_accessrights.HasFlag(rights))
- throw new InvalidOperationException($"{rights} access flag not set");
- }
-
- private ConcurrentDictionary ValidateAndGetLocalDictionary()
- {
- ValidateAccessRights(AccessRightsType.Read);
- return _dictionary;
- }
-
- #endregion
-
- #region Public methods
- ///
- /// Start this : create the topic if not available, allocates Producer and Consumer, sets serializer/deserializer
- ///
- /// Some errors occurred
- public void Start()
- {
- if (string.IsNullOrWhiteSpace(BootstrapServers)) throw new InvalidOperationException("BootstrapServers must be set before start.");
- if (string.IsNullOrWhiteSpace(StateName)) throw new InvalidOperationException("StateName must be set before start.");
-
- Properties props = AdminClientConfigBuilder.Create().WithBootstrapServers(BootstrapServers).ToProperties();
- _admin = KafkaAdminClient.Create(props);
-
- if (AccessRights.HasFlag(AccessRightsType.Write))
- {
- var topic = new NewTopic(StateName, Partitions, ReplicationFactor);
- _topicConfig ??= TopicConfigBuilder.Create().WithDeleteRetentionMs(100)
- .WithMinCleanableDirtyRatio(0.01)
- .WithSegmentMs(100);
-
- TopicConfig.CleanupPolicy = TopicConfigBuilder.CleanupPolicyTypes.Compact | TopicConfigBuilder.CleanupPolicyTypes.Delete;
- topic = topic.Configs(TopicConfig);
- try
- {
- _admin.CreateTopic(topic);
- }
- catch (TopicExistsException)
- {
- }
- }
-
- if (AccessRights.HasFlag(AccessRightsType.Read))
- {
- _consumerConfig ??= ConsumerConfigBuilder.Create().WithEnableAutoCommit(true)
- .WithAutoOffsetReset(ConsumerConfigBuilder.AutoOffsetResetTypes.EARLIEST)
- .WithAllowAutoCreateTopics(false);
-
- ConsumerConfig.BootstrapServers = BootstrapServers;
- ConsumerConfig.GroupId = Guid.NewGuid().ToString();
- if (ConsumerConfig.CanApplyBasicDeserializer() && KeySerDes == null)
- {
- KeySerDes = new KNetSerDes();
- }
-
- if (KeySerDes == null) throw new InvalidOperationException($"{typeof(TKey)} needs an external deserializer, set KeySerDes.");
-
- if (ConsumerConfig.CanApplyBasicDeserializer() && ValueSerDes == null)
- {
- ValueSerDes = new KNetSerDes();
- }
-
- if (ValueSerDes == null) throw new InvalidOperationException($"{typeof(TValue)} needs an external deserializer, set ValueSerDes.");
-
- _consumer = new KNetConsumer(ConsumerConfig, KeySerDes, ValueSerDes);
- _consumer.SetCallback(OnMessage);
- _consumerListener = new ConsumerRebalanceListener()
- {
- OnOnPartitionsRevoked = OnTopicPartitionsRevoked,
- OnOnPartitionsAssigned = OnTopicPartitionsAssigned
- };
- }
-
- if (AccessRights.HasFlag(AccessRightsType.Write))
- {
- _producerConfig ??= ProducerConfigBuilder.Create().WithAcks(ProducerConfigBuilder.AcksTypes.All)
- .WithRetries(0)
- .WithLingerMs(1);
-
- ProducerConfig.BootstrapServers = BootstrapServers;
- if (ProducerConfig.CanApplyBasicSerializer() && KeySerDes == null)
- {
- KeySerDes = new KNetSerDes();
- }
-
- if (KeySerDes == null) throw new InvalidOperationException($"{typeof(TKey)} needs an external serializer, set KeySerDes.");
-
- if (ProducerConfig.CanApplyBasicSerializer() && ValueSerDes == null)
- {
- ValueSerDes = new KNetSerDes();
- }
-
- if (ValueSerDes == null) throw new InvalidOperationException($"{typeof(TValue)} needs an external serializer, set ValueSerDes.");
-
- _producer = new KNetProducer(ProducerConfig, KeySerDes, ValueSerDes);
- }
-
- if (_consumer != null)
- {
- _consumerPollRun = true;
- _consumerPollThread = new Thread(ConsumerPollHandler);
- _consumerPollThread.Start();
- }
-
- _started = true;
- }
-
- void ConsumerPollHandler(object o)
- {
- _consumer.Subscribe(Collections.Singleton(StateName), _consumerListener);
- while (_consumerPollRun)
- {
- try
- {
- _consumer.ConsumeAsync(100);
- }
- catch { }
- }
- }
-
- ///
- /// Waits for the very first parition assignment of the topic which stores dictionary data
- ///
- /// The number of milliseconds to wait, or to wait indefinitely
- /// if the current instance receives a signal within the given ; otherwise,
- /// The provided do not include the flag
- public bool WaitForStateAssignment(int timeout = Timeout.Infinite)
- {
- ValidateAccessRights(AccessRightsType.Read);
-
- return _assignmentWaiter.WaitOne(timeout);
- }
-
- ///
- /// Waits until all outstanding produce requests and delivery report callbacks are completed
- ///
- public void Flush()
- {
- _producer?.Flush();
- }
-
- #endregion
-
- #region IDictionary
-
- ///
- /// Gets or sets the element with the specified keyy. value removes the specified key
- ///
- /// The key of the element to get or set
- /// The element with the specified key
- /// The call is get, and the provided do not include the flag
- /// The call is set, and the provided do not include the flag
- /// is null
- /// The call is get and is not found
- public TValue this[TKey key]
- {
- get { return ValidateAndGetLocalDictionary()[key]; }
- set { AddOrUpdate(key, value); }
- }
-
- ///
- /// Gets an containing the keys of this
- ///
- /// containing the keys of this
- /// The provided do not include the flag
- public System.Collections.Generic.ICollection Keys
- {
- get { return ValidateAndGetLocalDictionary().Keys; }
- }
-
- ///
- /// Gets an containing the values of this
- ///
- /// containing the values of this
- /// The provided do not include the flag
- public System.Collections.Generic.ICollection Values
- {
- get { return ValidateAndGetLocalDictionary().Values; }
- }
-
- ///
- /// Gets the number of elements contained in this
- ///
- /// The number of elements contained in this
- /// The provided do not include the flag
- public int Count
- {
- get { return ValidateAndGetLocalDictionary().Count; }
- }
-
- ///
- /// if do not include the flag
- ///
- public bool IsReadOnly
- {
- get { return !_accessrights.HasFlag(AccessRightsType.Write); }
- }
-
- ///
- /// Adds or updates the in this and others in the way defined by the provided
- ///
- /// The object to use as the key of the element to add
- /// The object to use as the value of the element to add. null means remove
- /// is null
- /// The provided do not include the flag
- public void Add(TKey key, TValue value)
- {
- AddOrUpdate(key, value);
- }
-
- ///
- /// Adds or updates the in this and others in the way defined by the provided
- ///
- /// The item to add or updates. Value == null means remove key
- /// .Key is null
- /// The provided do not include the flag
- public void Add(KeyValuePair item)
- {
- AddOrUpdate(item.Key, item.Value);
- }
-
- ///
- /// Clears this , resetting all paritions' sync
- ///
- /// The provided do not include the flag
- public void Clear()
- {
- ValidateAndGetLocalDictionary().Clear();
- }
-
- ///
- /// Determines whether this contains the specified item
- ///
- /// The item to locate in this
- /// if this contains an element ; otherwise,
- /// .Key is
- /// The provided do not include the flag
- public bool Contains(KeyValuePair item)
- {
- return (ValidateAndGetLocalDictionary() as IDictionary).Contains(item);
- }
-
- ///
- /// Determines whether this contains an element with the specified key
- ///
- /// The key to locate in this
- /// if this contains an element with ; otherwise,
- /// is
- /// The provided do not include the flag
- public bool ContainsKey(TKey key)
- {
- return ValidateAndGetLocalDictionary().ContainsKey(key);
- }
-
- ///
- /// Copies the elements of this to an , starting at a particular index
- ///
- /// The one-dimensional that is the destination of the elements copied
- /// from this . The must have zero-based indexing
- /// The zero-based index in array at which copying beginsù
- /// is null
- /// is less than zero
- /// is multidimensional.
- /// -or- The number of elements in the source is greater than the available space from to the end of the destination .
- /// -or- The type of the source cannot be cast automatically to the type of the destination
- /// The provided do not include the flag
- public void CopyTo(KeyValuePair[] array, int arrayIndex)
- {
- (ValidateAndGetLocalDictionary() as ICollection).CopyTo(array, arrayIndex);
- }
-
- ///
- /// Returns an enumerator that iterates through this
- ///
- /// An enumerator for this
- /// The provided do not include the flag
- public IEnumerator> GetEnumerator()
- {
- return ValidateAndGetLocalDictionary().GetEnumerator();
- }
-
- ///
- /// Removes the from this and others in the way defined by the provided
- ///
- /// The key of the element to remove
- /// if the removal request is delivered to the others
- /// is
- /// The provided do not include the flag
- public bool Remove(TKey key)
- {
- AddOrUpdate(key, null);
-
- return true;
- }
-
- ///
- /// Removes the from this and others in the way defined by the provided
- ///
- /// Item to be removed
- /// if the removal request is delivered to the others
- /// .Key is
- /// The provided do not include the flag
- public bool Remove(KeyValuePair item)
- {
- AddOrUpdate(item.Key, null);
-
- return true;
- }
-
- ///
- /// Attempts to get the value associated with the specified from this
- ///
- /// The key of the value to get
- /// When this method returns, contains the object from this
- /// that has the specified , or the default value of the type if the operation failed
- /// if the was found in this ; otherwise,
- /// is
- /// The provided do not include the flag
- public bool TryGetValue(
- TKey key,
- out TValue value)
- {
- return ValidateAndGetLocalDictionary().TryGetValue(key, out value);
- }
-
- /// The provided do not include the flag
- IEnumerator IEnumerable.GetEnumerator()
- {
- return (ValidateAndGetLocalDictionary() as IEnumerable).GetEnumerator();
- }
-
- #endregion
-
- #region IDisposable
-
- ///
- /// Release all managed and unmanaged resources
- ///
- public void Dispose()
- {
- _consumerPollRun = false;
-
- _consumer?.Dispose();
-
- _producer?.Flush();
- _producer?.Dispose();
-
- _assignmentWaiter?.Close();
- }
-
- #endregion
- }
-}
diff --git a/src/net/KNet/Specific/Replicator/KNetCompactedReplicator.cs b/src/net/KNet/Specific/Replicator/KNetCompactedReplicator.cs
new file mode 100644
index 0000000000..8b187d89e2
--- /dev/null
+++ b/src/net/KNet/Specific/Replicator/KNetCompactedReplicator.cs
@@ -0,0 +1,1224 @@
+/*
+* Copyright 2023 MASES s.r.l.
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*
+* Refer to LICENSE for more information.
+*/
+
+using Java.Time;
+using Java.Util;
+using MASES.JCOBridge.C2JBridge;
+using MASES.KNet.Admin;
+using MASES.KNet.Common;
+using MASES.KNet.Consumer;
+using MASES.KNet.Extensions;
+using MASES.KNet.Producer;
+using MASES.KNet.Serialization;
+using Org.Apache.Kafka.Clients.Admin;
+using Org.Apache.Kafka.Clients.Consumer;
+using Org.Apache.Kafka.Clients.Producer;
+using Org.Apache.Kafka.Common;
+using Org.Apache.Kafka.Common.Config;
+using Org.Apache.Kafka.Common.Errors;
+using System;
+using System.Collections;
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using System.Diagnostics;
+using System.Linq;
+using System.Threading;
+using static Javax.Swing.Text.Html.HTML;
+
+namespace MASES.KNet.Replicator
+{
+ #region AccessRightsType
+ ///
+ /// access rights to data
+ ///
+ [Flags]
+ public enum AccessRightsType
+ {
+ ///
+ /// Data are readable, i.e. aligned with the others and accessible from this
+ ///
+ Read = 1,
+ ///
+ /// Data are writable, i.e. updates can be produced, but this is not accessible and not aligned with the others
+ ///
+ Write = 2,
+ ///
+ /// Data are readable and writable, i.e. updates can be produced, and data are aligned with the others and accessible from this
+ ///
+ ReadWrite = Read | Write,
+ }
+
+ #endregion
+
+ #region UpdateModeTypes
+
+ ///
+ /// update modes
+ ///
+ [Flags()]
+ public enum UpdateModeTypes
+ {
+ ///
+ /// The is updated as soon as an update is delivered to Kafka by the current application
+ ///
+ OnDelivery = 1,
+ ///
+ /// The is updated only after an update is consumed from Kafka, even if the add or update is made locally by the current instance
+ ///
+ OnConsume = 2,
+ ///
+ /// The is updated only after an update is consumed from Kafka, even if the add or update is made locally by the current instance. Plus the update waits the consume of the data before unlock
+ ///
+ OnConsumeSync = 3,
+ ///
+ /// The value is stored in only upon a request, otherwise only the key is stored
+ ///
+ Delayed = 0x1000
+ }
+
+ #endregion
+
+ #region IKNetCompactedReplicator
+ ///
+ /// Public interface for
+ ///
+ /// The type of keys in the dictionary
+ /// The type of values in the dictionary. Must be a nullable type
+ public interface IKNetCompactedReplicator : IDictionary, IDisposable
+ where TValue : class
+ {
+ #region Events
+
+ ///
+ /// Called when a [, ] is updated by consuming data from the others
+ ///
+ event Action, KeyValuePair> OnRemoteUpdate;
+
+ ///
+ /// Called when a [, ] is removed by consuming data from the others
+ ///
+ event Action, KeyValuePair> OnRemoteRemove;
+
+ ///
+ /// Called when a [, ] is updated on this
+ ///
+ event Action, KeyValuePair> OnLocalUpdate;
+
+ ///
+ /// Called when a [, ] is removed from this
+ ///
+ event Action, KeyValuePair> OnLocalRemove;
+
+ ///
+ /// If contains the it is called to request if the [, ] shall be stored in the
+ ///
+ event Func, KeyValuePair, bool> OnDelayedStore;
+
+ #endregion
+
+ #region Public Properties
+ ///
+ /// Get or set
+ ///
+ AccessRightsType AccessRights { get; }
+ ///
+ /// Get or set
+ ///
+ UpdateModeTypes UpdateMode { get; }
+ ///
+ /// Get or set bootstrap servers
+ ///
+ string BootstrapServers { get; }
+ ///
+ /// Get or set topic name
+ ///
+ string StateName { get; }
+ ///
+ /// Get or set the group id, if not set a value is generated
+ ///
+ string GroupId { get; }
+ ///
+ /// Get or set partitions to use when topic is created for the first time, otherwise reports the partiions of the topic
+ ///
+ int Partitions { get; }
+ ///
+ /// Get or set replication factor to use when topic is created for the first time, otherwise reports the replication factor of the topic
+ ///
+ short ReplicationFactor { get; }
+ ///
+ /// Get or set to use when topic is created for the first time
+ ///
+ TopicConfigBuilder TopicConfig { get; }
+ ///
+ /// Get or set to use in
+ ///
+ ConsumerConfigBuilder ConsumerConfig { get; }
+ ///
+ /// Get or set to use in
+ ///
+ ProducerConfigBuilder ProducerConfig { get; }
+ ///
+ /// Get or set to use in , by default it creates a default one based on
+ ///
+ KNetSerDes KeySerDes { get; }
+ ///
+ /// Get or set to use in , by default it creates a default one based on
+ ///
+ KNetSerDes ValueSerDes { get; }
+ ///
+ /// if the instance was started
+ ///
+ bool IsStarted { get; }
+ ///
+ /// if the instance was started
+ ///
+ bool IsAssigned { get; }
+
+ #endregion
+
+ #region Public methods
+ ///
+ /// Start this : create the topic if not available, allocates Producer and Consumer, sets serializer/deserializer
+ ///
+ /// Some errors occurred
+ void Start();
+ ///
+ /// Start this : create the topic if not available, allocates Producer and Consumers, sets serializer/deserializer
+ /// Then waits its synchronization with topic which stores dictionary data
+ ///
+ /// Some errors occurred or the provided do not include the flag
+ void StartAndWait(int timeout = Timeout.Infinite);
+ ///
+ /// Waits for all paritions assignment of the topic which stores dictionary data
+ ///
+ /// The number of milliseconds to wait, or to wait indefinitely
+ /// if the current instance receives a signal within the given ; otherwise,
+ /// The provided do not include the flag
+ bool WaitForStateAssignment(int timeout = Timeout.Infinite);
+ ///
+ /// Waits that is synchronized to the topic which stores dictionary data
+ ///
+ /// The number of milliseconds to wait, or to wait indefinitely
+ /// if the current instance synchronize within the given ; otherwise,
+ /// The provided do not include the flag
+ void SyncWait(int timeout = Timeout.Infinite);
+ ///
+ /// Waits until all outstanding produce requests and delivery report callbacks are completed
+ ///
+ void Flush();
+
+ #endregion
+ }
+
+ #endregion
+
+ #region IKNetCompactedReplicator
+ ///
+ /// Provides a reliable dictionary, persisted in a COMPACTED Kafka topic and shared among applications
+ ///
+ /// The type of keys in the dictionary
+ /// The type of values in the dictionary. Must be a nullable type
+ public class KNetCompactedReplicator : IKNetCompactedReplicator
+ where TValue : class
+ {
+ #region Local storage data
+
+ interface ILocalDataStorage
+ {
+ object Lock { get; }
+ Int32 Partition { get; set; }
+ bool HasOffset { get; set; }
+ Int64 Offset { get; set; }
+ bool HasValue { get; set; }
+ TValue Value { get; set; }
+ }
+
+ struct LocalDataStorage : ILocalDataStorage
+ {
+ object _lock = new object();
+ public LocalDataStorage()
+ {
+ Partition = -1;
+ HasOffset = HasValue = false;
+ Offset = -1;
+ Value = null;
+ }
+ public object Lock => _lock;
+ public int Partition { get; set; }
+ public bool HasOffset { get; set; }
+ public long Offset { get; set; }
+ public bool HasValue { get; set; }
+ public TValue Value { get; set; }
+ }
+
+ #endregion
+
+ #region Local Enumerator
+
+ class LocalDataStorageEnumerator : IEnumerator>
+ {
+ private IEnumerator> _enumerator;
+ private readonly ConcurrentDictionary _dictionary;
+ private readonly IKNetConsumer _consumer = null;
+ private readonly string _topic;
+ public LocalDataStorageEnumerator(ConcurrentDictionary dictionary, IKNetConsumer consumer, string topic)
+ {
+ _dictionary = dictionary;
+ _consumer = consumer;
+ _topic = topic;
+ _enumerator = _dictionary.GetEnumerator();
+ }
+
+ KeyValuePair? _current = null;
+ public KeyValuePair Current
+ {
+ get
+ {
+ lock (_enumerator)
+ {
+ if (_current == null)
+ {
+ var localCurrent = _enumerator.Current;
+ ILocalDataStorage data = localCurrent.Value;
+ lock (data.Lock)
+ {
+ if (!data.HasValue)
+ {
+ OnDemandRetrieve(_consumer, _topic, localCurrent.Key, data);
+ }
+ _current = new KeyValuePair(localCurrent.Key, localCurrent.Value.Value);
+ }
+ }
+ return _current.Value;
+ }
+ }
+ }
+
+ object IEnumerator.Current => Current;
+
+ public void Dispose()
+ {
+ _enumerator.Dispose();
+ }
+
+ public bool MoveNext()
+ {
+ lock (_enumerator)
+ {
+ _current = null;
+ return _enumerator.MoveNext();
+ }
+ }
+
+ public void Reset()
+ {
+ _enumerator.Reset();
+ }
+
+ public System.Collections.Generic.ICollection Values()
+ {
+ System.Collections.Generic.List values = new System.Collections.Generic.List();
+ while (_enumerator.MoveNext())
+ {
+ values.Add(_enumerator.Current.Value.Value);
+ }
+ return values;
+ }
+
+ public bool TryGetValue(TKey key, out TValue value)
+ {
+ value = default;
+ if (_dictionary.TryGetValue(key, out var data))
+ {
+ if (!data.HasValue)
+ {
+ OnDemandRetrieve(_consumer, _topic, key, data);
+ }
+ value = data.Value;
+ return true;
+ }
+ return false;
+ }
+
+ public bool Contains(KeyValuePair item)
+ {
+ if (_dictionary.TryGetValue(item.Key, out var data))
+ {
+ if (!data.HasValue)
+ {
+ OnDemandRetrieve(_consumer, _topic, item.Key, data);
+ }
+ if (data.HasValue && data.Value == item.Value) return true;
+ }
+ return false;
+ }
+
+ public void CopyTo(KeyValuePair[] array, int arrayIndex)
+ {
+ var values = new System.Collections.Generic.List>();
+ while (_enumerator.MoveNext())
+ {
+ values.Add(new KeyValuePair(_enumerator.Current.Key, _enumerator.Current.Value.Value));
+ }
+
+ Array.Copy(values.ToArray(), 0, array, arrayIndex, values.Count);
+ }
+ }
+
+ #endregion
+
+ #region Private members
+
+ private bool _consumerPollRun = false;
+ private Thread[] _consumerPollThreads = null;
+ private IAdmin _admin = null;
+ private ConcurrentDictionary _dictionary = new ConcurrentDictionary();
+ private ConsumerRebalanceListener _consumerListener = null;
+ private KNetConsumer[] _consumers = null;
+ private KNetConsumer _onTheFlyConsumer = null;
+ private KNetProducer _producer = null;
+ private string _bootstrapServers = null;
+ private string _stateName = string.Empty;
+ private string _groupId = Guid.NewGuid().ToString();
+ private int _partitions = 1;
+ private short _replicationFactor = 1;
+ private TopicConfigBuilder _topicConfig = null;
+ private ConsumerConfigBuilder _consumerConfig = null;
+ private ProducerConfigBuilder _producerConfig = null;
+ private AccessRightsType _accessrights = AccessRightsType.ReadWrite;
+ private UpdateModeTypes _updateMode = UpdateModeTypes.OnDelivery;
+ private Tuple _OnConsumeSyncWaiter = null;
+ private ManualResetEvent[] _assignmentWaiters;
+ private long[] _lastPartitionLags = null;
+
+ private KNetSerDes _keySerDes = null;
+ private KNetSerDes _valueSerDes = null;
+
+ private bool _started = false;
+
+ #endregion
+
+ #region Events
+
+ ///
+ /// Called when a [, ] is updated by consuming data from the others
+ ///
+ public event Action, KeyValuePair> OnRemoteUpdate;
+
+ ///
+ /// Called when a [, ] is removed by consuming data from the others
+ ///
+ public event Action, KeyValuePair> OnRemoteRemove;
+
+ ///
+ /// Called when a [, ] is updated on this
+ ///
+ public event Action, KeyValuePair> OnLocalUpdate;
+
+ ///
+ /// Called when a [, ] is removed from this
+ ///
+ public event Action, KeyValuePair> OnLocalRemove;
+
+ ///
+ /// If contains the it is called to request if the [, ] shall be stored in the
+ ///
+ public event Func, KeyValuePair, bool> OnDelayedStore;
+
+ #endregion
+
+ #region Public Properties
+ ///
+ /// Get or set
+ ///
+ public AccessRightsType AccessRights { get { return _accessrights; } set { CheckStarted(); _accessrights = value; } }
+ ///
+ /// Get or set
+ ///
+ public UpdateModeTypes UpdateMode { get { return _updateMode; } set { CheckStarted(); _updateMode = value; } }
+ ///
+ /// Get or set bootstrap servers
+ ///
+ public string BootstrapServers { get { return _bootstrapServers; } set { CheckStarted(); _bootstrapServers = value; } }
+ ///
+ /// Get or set topic name
+ ///
+ public string StateName { get { return _stateName; } set { CheckStarted(); _stateName = value; } }
+ ///
+ /// Get or set the group id, if not set a value is generated
+ ///
+ public string GroupId { get { return _groupId; } set { CheckStarted(); _groupId = value; } }
+ ///
+ /// Get or set partitions to use when topic is created for the first time, otherwise reports the partiions of the topic
+ ///
+ public int Partitions { get { return _partitions; } set { CheckStarted(); _partitions = value; } }
+ ///
+ /// Get or set replication factor to use when topic is created for the first time, otherwise reports the replication factor of the topic
+ ///
+ public short ReplicationFactor { get { return _replicationFactor; } set { CheckStarted(); _replicationFactor = value; } }
+ ///
+ /// Get or set to use when topic is created for the first time
+ ///
+ public TopicConfigBuilder TopicConfig { get { return _topicConfig; } set { CheckStarted(); _topicConfig = value; } }
+ ///
+ /// Get or set to use in
+ ///
+ public ConsumerConfigBuilder ConsumerConfig { get { return _consumerConfig; } set { CheckStarted(); _consumerConfig = value; } }
+ ///
+ /// Get or set to use in
+ ///
+ public ProducerConfigBuilder ProducerConfig { get { return _producerConfig; } set { CheckStarted(); _producerConfig = value; } }
+ ///
+ /// Get or set to use in , by default it creates a default one based on
+ ///
+ public KNetSerDes KeySerDes { get { return _keySerDes; } set { CheckStarted(); _keySerDes = value; } }
+ ///
+ /// Get or set to use in , by default it creates a default one based on
+ ///
+ public KNetSerDes ValueSerDes { get { return _valueSerDes; } set { CheckStarted(); _valueSerDes = value; } }
+ ///
+ /// if the instance was started
+ ///
+ public bool IsStarted => _started;
+ ///
+ /// if the instance was started
+ ///
+ public bool IsAssigned => _assignmentWaiters.All((o) => o.WaitOne(0));
+
+ #endregion
+
+ #region Private methods
+ [System.Runtime.CompilerServices.MethodImpl(System.Runtime.CompilerServices.MethodImplOptions.AggressiveInlining)]
+ static void OnDemandRetrieve(IKNetConsumer consumer, string topic, TKey key, ILocalDataStorage data)
+ {
+ if (!data.HasValue)
+ {
+ var topicPartition = new TopicPartition(topic, data.Partition);
+ consumer.Assign(Collections.SingletonList(topicPartition));
+ consumer.Seek(topicPartition, data.Offset);
+ var results = consumer.Poll(TimeSpan.FromMinutes(1));
+ if (results == null) throw new InvalidOperationException("Failed to get records from remote.");
+ foreach (var result in results)
+ {
+ if (!Equals(result.Key, key)) continue;
+ if (data.Offset != result.Offset) throw new IndexOutOfRangeException($"Requested offset is {data.Offset} while received offset is {result.Offset}");
+ data.HasValue = true;
+ data.Value = result.Value;
+ break;
+ }
+ }
+ }
+ [System.Runtime.CompilerServices.MethodImpl(System.Runtime.CompilerServices.MethodImplOptions.AggressiveInlining)]
+ void CheckStarted()
+ {
+ if (_started) throw new InvalidOperationException("Cannot be changed after Start");
+ }
+
+ bool UpdateModeOnDelivery => (UpdateMode & UpdateModeTypes.OnDelivery) == UpdateModeTypes.OnDelivery;
+
+ bool UpdateModeOnConsume => (UpdateMode & UpdateModeTypes.OnConsume) == UpdateModeTypes.OnConsume;
+
+ bool UpdateModeOnConsumeSync => (UpdateMode & UpdateModeTypes.OnConsumeSync) == UpdateModeTypes.OnConsumeSync;
+
+ bool UpdateModeDelayed => UpdateMode.HasFlag(UpdateModeTypes.Delayed);
+
+ private void OnMessage(KNetConsumerRecord record)
+ {
+ if (record.Value == null)
+ {
+ _dictionary.TryRemove(record.Key, out var data);
+ OnRemoteRemove?.Invoke(this, new KeyValuePair(record.Key, data.Value));
+ }
+ else
+ {
+ ILocalDataStorage data;
+ if (!_dictionary.TryGetValue(record.Key, out data))
+ {
+ data = new LocalDataStorage();
+ _dictionary[record.Key] = data;
+ }
+ lock (data.Lock)
+ {
+ data.Partition = record.Partition;
+ data.HasOffset = true;
+ data.Offset = record.Offset;
+ bool storeValue = true;
+ if (UpdateModeDelayed)
+ {
+ storeValue = (OnDelayedStore != null) ? OnDelayedStore.Invoke(this, new KeyValuePair(record.Key, record.Value)) : false;
+ }
+ if (storeValue)
+ {
+ data.HasValue = true;
+ data.Value = record.Value;
+ }
+ }
+ OnRemoteUpdate?.Invoke(this, new KeyValuePair(record.Key, record.Value));
+ }
+
+ if (_OnConsumeSyncWaiter != null)
+ {
+ if (_OnConsumeSyncWaiter.Item1.Equals(record.Key))
+ {
+ _OnConsumeSyncWaiter.Item2.Set();
+ }
+ }
+ }
+
+ private void OnTopicPartitionsAssigned(Collection topicPartitions)
+ {
+ foreach (var topicPartition in topicPartitions)
+ {
+ _assignmentWaiters[topicPartition.Partition()].Set();
+ }
+ }
+
+ private void OnTopicPartitionsRevoked(Collection topicPartitions)
+ {
+ foreach (var topicPartition in topicPartitions)
+ {
+ _assignmentWaiters[topicPartition.Partition()].Reset();
+ }
+ }
+
+ private void OnTopicPartitionsLost(Collection topicPartitions)
+ {
+ foreach (var topicPartition in topicPartitions)
+ {
+ _assignmentWaiters[topicPartition.Partition()].Reset();
+ }
+ }
+ [System.Runtime.CompilerServices.MethodImpl(System.Runtime.CompilerServices.MethodImplOptions.AggressiveInlining)]
+ private void RemoveRecord(TKey key)
+ {
+ ValidateAccessRights(AccessRightsType.Write);
+
+ if (key == null)
+ throw new ArgumentNullException(nameof(key));
+
+ if (UpdateModeOnDelivery)
+ {
+ JVMBridgeException exception = null;
+ DateTime pTimestamp = DateTime.MaxValue;
+ using (AutoResetEvent deliverySemaphore = new AutoResetEvent(false))
+ {
+ using (Callback cb = new Callback()
+ {
+ OnOnCompletion = (record, error) =>
+ {
+ try
+ {
+ if (deliverySemaphore.SafeWaitHandle.IsClosed)
+ return;
+ exception = error;
+ deliverySemaphore.Set();
+ }
+ catch { }
+ }
+ })
+ {
+ _producer.Produce(new KNetProducerRecord(_stateName, key, null), cb);
+ deliverySemaphore.WaitOne();
+ if (exception != null) throw exception;
+ }
+ }
+ }
+ else if (UpdateModeOnConsume || UpdateModeOnConsumeSync)
+ {
+ _producer.Produce(StateName, key, null, (Callback)null);
+ }
+ _dictionary.TryRemove(key, out _);
+ }
+ [System.Runtime.CompilerServices.MethodImpl(System.Runtime.CompilerServices.MethodImplOptions.AggressiveInlining)]
+ private void AddOrUpdate(TKey key, TValue value)
+ {
+ ValidateAccessRights(AccessRightsType.Write);
+
+ if (key == null)
+ throw new ArgumentNullException(nameof(key));
+
+ if (UpdateModeOnDelivery)
+ {
+ RecordMetadata metadata = null;
+ JVMBridgeException exception = null;
+ DateTime pTimestamp = DateTime.MaxValue;
+ using (AutoResetEvent deliverySemaphore = new AutoResetEvent(false))
+ {
+ using (Callback cb = new Callback()
+ {
+ OnOnCompletion = (record, error) =>
+ {
+ try
+ {
+ if (deliverySemaphore.SafeWaitHandle.IsClosed)
+ return;
+ metadata = record;
+ exception = error;
+ deliverySemaphore.Set();
+ }
+ catch { }
+ }
+ })
+ {
+ _producer.Produce(new KNetProducerRecord(_stateName, key, value), cb);
+ deliverySemaphore.WaitOne();
+ if (exception != null) throw exception;
+ }
+ }
+
+ if (value == null)
+ {
+ _dictionary.TryRemove(key, out var data);
+ OnLocalRemove?.Invoke(this, new KeyValuePair(key, data.Value));
+ }
+ else
+ {
+ ILocalDataStorage data;
+ if (!_dictionary.TryGetValue(key, out data))
+ {
+ data = new LocalDataStorage();
+ _dictionary[key] = data;
+ }
+ lock (data.Lock)
+ {
+ data.Partition = metadata.Partition();
+ data.HasOffset = metadata.HasOffset();
+ data.Offset = metadata.Offset();
+ bool storeValue = true;
+ if (UpdateModeDelayed)
+ {
+ storeValue = (OnDelayedStore != null) ? OnDelayedStore.Invoke(this, new KeyValuePair(key, value)) : false;
+ }
+ if (storeValue)
+ {
+ data.HasValue = true;
+ data.Value = value;
+ }
+ }
+ OnLocalUpdate?.Invoke(this, new KeyValuePair(key, value));
+ }
+ }
+ else if (UpdateModeOnConsume || UpdateModeOnConsumeSync)
+ {
+ _producer.Produce(StateName, key, value, (Callback)null);
+ if (UpdateModeOnConsumeSync)
+ {
+ _OnConsumeSyncWaiter = new Tuple(key, new ManualResetEvent(false));
+ _OnConsumeSyncWaiter.Item2.WaitOne();
+ _OnConsumeSyncWaiter.Item2.Dispose();
+ }
+ }
+ }
+ [System.Runtime.CompilerServices.MethodImpl(System.Runtime.CompilerServices.MethodImplOptions.AggressiveInlining)]
+ private void ValidateAccessRights(AccessRightsType rights)
+ {
+ if (!_accessrights.HasFlag(rights))
+ throw new InvalidOperationException($"{rights} access flag not set");
+ }
+
+ [System.Runtime.CompilerServices.MethodImpl(System.Runtime.CompilerServices.MethodImplOptions.AggressiveInlining)]
+ void BuildConsumers()
+ {
+ _consumerConfig ??= ConsumerConfigBuilder.Create().WithEnableAutoCommit(true)
+ .WithAutoOffsetReset(ConsumerConfigBuilder.AutoOffsetResetTypes.EARLIEST)
+ .WithAllowAutoCreateTopics(false);
+
+ ConsumerConfig.BootstrapServers = BootstrapServers;
+ ConsumerConfig.GroupId = GroupId;
+ if (ConsumerConfig.CanApplyBasicDeserializer() && KeySerDes == null)
+ {
+ KeySerDes = new KNetSerDes();
+ }
+
+ if (KeySerDes == null) throw new InvalidOperationException($"{typeof(TKey)} needs an external deserializer, set KeySerDes.");
+
+ if (ConsumerConfig.CanApplyBasicDeserializer() && ValueSerDes == null)
+ {
+ ValueSerDes = new KNetSerDes();
+ }
+
+ if (ValueSerDes == null) throw new InvalidOperationException($"{typeof(TValue)} needs an external deserializer, set ValueSerDes.");
+
+ _assignmentWaiters = new ManualResetEvent[_partitions];
+ _lastPartitionLags = new long[_partitions];
+ _consumers = new KNetConsumer[_partitions];
+
+ for (int i = 0; i < _partitions; i++)
+ {
+ _assignmentWaiters[i] = new ManualResetEvent(false);
+ _lastPartitionLags[i] = -1;
+ _consumers[i] = new KNetConsumer(ConsumerConfig, KeySerDes, ValueSerDes);
+ _consumers[i].SetCallback(OnMessage);
+ }
+ _consumerListener = new ConsumerRebalanceListener()
+ {
+ OnOnPartitionsRevoked = OnTopicPartitionsRevoked,
+ OnOnPartitionsAssigned = OnTopicPartitionsAssigned,
+ OnOnPartitionsLost = OnTopicPartitionsLost
+ };
+ }
+
+ [System.Runtime.CompilerServices.MethodImpl(System.Runtime.CompilerServices.MethodImplOptions.AggressiveInlining)]
+ void BuildOnTheFlyConsumer()
+ {
+ if (_onTheFlyConsumer == null)
+ {
+ ConsumerConfigBuilder consumerConfigBuilder = ConsumerConfigBuilder.CreateFrom(_consumerConfig);
+ consumerConfigBuilder.WithEnableAutoCommit(false).WithGroupId(Guid.NewGuid().ToString());
+
+ _onTheFlyConsumer = new KNetConsumer(consumerConfigBuilder, KeySerDes, ValueSerDes);
+ }
+ }
+ [System.Runtime.CompilerServices.MethodImpl(System.Runtime.CompilerServices.MethodImplOptions.AggressiveInlining)]
+ void BuildProducer()
+ {
+ _producerConfig ??= ProducerConfigBuilder.Create().WithAcks(ProducerConfigBuilder.AcksTypes.All)
+ .WithRetries(0)
+ .WithLingerMs(1);
+
+ ProducerConfig.BootstrapServers = BootstrapServers;
+ if (ProducerConfig.CanApplyBasicSerializer() && KeySerDes == null)
+ {
+ KeySerDes = new KNetSerDes();
+ }
+
+ if (KeySerDes == null) throw new InvalidOperationException($"{typeof(TKey)} needs an external serializer, set KeySerDes.");
+
+ if (ProducerConfig.CanApplyBasicSerializer() && ValueSerDes == null)
+ {
+ ValueSerDes = new KNetSerDes();
+ }
+
+ if (ValueSerDes == null) throw new InvalidOperationException($"{typeof(TValue)} needs an external serializer, set ValueSerDes.");
+
+ _producer = new KNetProducer(ProducerConfig, KeySerDes, ValueSerDes);
+ }
+
+ void ConsumerPollHandler(object o)
+ {
+ int index = (int)o;
+ _consumers[index].Subscribe(Collections.Singleton(StateName), _consumerListener);
+ while (_consumerPollRun)
+ {
+ try
+ {
+ _consumers[index].ConsumeAsync(100);
+ if (_assignmentWaiters[index].WaitOne(0))
+ {
+ try
+ {
+ var lag = _consumers[index].CurrentLag(new TopicPartition(StateName, index));
+ Interlocked.Exchange(ref _lastPartitionLags[index], lag.IsPresent() ? lag.AsLong : -1);
+ }
+ catch (Java.Lang.IllegalStateException)
+ {
+ Interlocked.Exchange(ref _lastPartitionLags[index], -1);
+ }
+ }
+ }
+ catch { }
+ }
+ }
+
+ #endregion
+
+ #region Public methods
+ ///
+ /// Start this : create the topic if not available, allocates Producer and Consumer, sets serializer/deserializer
+ ///
+ /// Some errors occurred
+ public void Start()
+ {
+ if (string.IsNullOrWhiteSpace(BootstrapServers)) throw new InvalidOperationException("BootstrapServers must be set before start.");
+ if (string.IsNullOrWhiteSpace(StateName)) throw new InvalidOperationException("StateName must be set before start.");
+
+ Properties props = AdminClientConfigBuilder.Create().WithBootstrapServers(BootstrapServers).ToProperties();
+ _admin = KafkaAdminClient.Create(props);
+
+ if (AccessRights.HasFlag(AccessRightsType.Write))
+ {
+ var topic = new NewTopic(StateName, Partitions, ReplicationFactor);
+ _topicConfig ??= TopicConfigBuilder.Create().WithDeleteRetentionMs(100)
+ .WithMinCleanableDirtyRatio(0.01)
+ .WithSegmentMs(100)
+ .WithRetentionBytes(1073741824);
+
+ TopicConfig.CleanupPolicy = TopicConfigBuilder.CleanupPolicyTypes.Compact | TopicConfigBuilder.CleanupPolicyTypes.Delete;
+ topic = topic.Configs(TopicConfig);
+ try
+ {
+ _admin.CreateTopic(topic);
+ }
+ catch (TopicExistsException)
+ {
+ // recover partitions of the topic
+ try
+ {
+ var result = _admin.DescribeTopics(Collections.Singleton(StateName));
+ if (result != null)
+ {
+ var map = result.AllTopicNames().Get();
+ if (map != null)
+ {
+ var topicDesc = map.Get(StateName);
+ _partitions = topicDesc.Partitions().Size();
+ }
+ }
+ }
+ catch
+ {
+
+ }
+ }
+ }
+
+ if (AccessRights.HasFlag(AccessRightsType.Read))
+ {
+ BuildConsumers();
+ }
+
+ if (AccessRights.HasFlag(AccessRightsType.Write))
+ {
+ BuildProducer();
+ }
+
+ if (_consumers != null)
+ {
+ _consumerPollRun = true;
+ _consumerPollThreads = new Thread[_partitions];
+ for (int i = 0; i < _partitions; i++)
+ {
+ _consumerPollThreads[i] = new Thread(ConsumerPollHandler);
+ _consumerPollThreads[i].Start(i);
+ }
+ }
+
+ _started = true;
+ }
+
+
+ ///
+ /// Start this : create the topic if not available, allocates Producer and Consumers, sets serializer/deserializer
+ /// Then waits its synchronization with topic which stores dictionary data
+ ///
+ /// Some errors occurred or the provided do not include the flag
+ public void StartAndWait(int timeout = Timeout.Infinite)
+ {
+ ValidateAccessRights(AccessRightsType.Read);
+ Start();
+ WaitForStateAssignment(timeout);
+ SyncWait(timeout);
+ }
+
+ ///
+ /// Waits for all paritions assignment of the topic which stores dictionary data
+ ///
+ /// The number of milliseconds to wait, or to wait indefinitely
+ /// if the current instance receives a signal within the given ; otherwise,
+ /// The provided do not include the flag
+ public bool WaitForStateAssignment(int timeout = Timeout.Infinite)
+ {
+ ValidateAccessRights(AccessRightsType.Read);
+ return WaitHandle.WaitAll(_assignmentWaiters, timeout);
+ }
+ ///
+ /// Waits that is synchronized to the topic which stores dictionary data
+ ///
+ /// The number of milliseconds to wait, or to wait indefinitely
+ /// if the current instance synchronize within the given ; otherwise,
+ /// The provided do not include the flag
+ public void SyncWait(int timeout = Timeout.Infinite)
+ {
+ ValidateAccessRights(AccessRightsType.Read);
+ Stopwatch watcher = Stopwatch.StartNew();
+ bool sync = false;
+ while (!sync && watcher.ElapsedMilliseconds < (uint)timeout)
+ {
+ for (int i = 0; i < _partitions; i++)
+ {
+ sync = _consumers[i].IsEmpty && (Interlocked.Read(ref _lastPartitionLags[i]) == 0 || Interlocked.Read(ref _lastPartitionLags[i]) == -1) ;
+ }
+ }
+ }
+
+ ///
+ /// Waits until all outstanding produce requests and delivery report callbacks are completed
+ ///
+ public void Flush()
+ {
+ ValidateAccessRights(AccessRightsType.Write);
+ _producer?.Flush();
+ }
+
+ #endregion
+
+ #region IDictionary
+
+ ///
+ /// Gets or sets the element with the specified keyy. value removes the specified key
+ ///
+ /// The key of the element to get or set
+ /// The element with the specified key
+ /// The call is get, and the provided do not include the flag
+ /// The call is set, and the provided do not include the flag
+ /// is null
+ /// The call is get and is not found
+ public TValue this[TKey key]
+ {
+ get
+ {
+ ValidateAccessRights(AccessRightsType.Read);
+ BuildOnTheFlyConsumer();
+ if (!new LocalDataStorageEnumerator(_dictionary, _onTheFlyConsumer, StateName).TryGetValue(key, out var data))
+ {
+ throw new IndexOutOfRangeException($"Key {key} not available locally");
+ }
+ return data;
+ }
+ set { AddOrUpdate(key, value); }
+ }
+
+ ///
+ /// Gets an containing the keys of this
+ ///
+ /// containing the keys of this
+ /// The provided do not include the flag
+ public System.Collections.Generic.ICollection Keys
+ {
+ get
+ {
+ ValidateAccessRights(AccessRightsType.Read);
+ return _dictionary.Keys;
+ }
+ }
+
+ ///
+ /// Gets an containing the values of this
+ ///
+ /// containing the values of this
+ /// The provided do not include the flag
+ public System.Collections.Generic.ICollection Values
+ {
+ get
+ {
+ ValidateAccessRights(AccessRightsType.Read);
+ BuildOnTheFlyConsumer();
+ return new LocalDataStorageEnumerator(_dictionary, _onTheFlyConsumer, StateName).Values();
+ }
+ }
+
+ ///
+ /// Gets the number of elements contained in this
+ ///
+ /// The number of elements contained in this
+ /// The provided do not include the flag
+ public int Count
+ {
+ get
+ {
+ ValidateAccessRights(AccessRightsType.Read);
+ return _dictionary.Count;
+ }
+ }
+
+ ///
+ /// if do not include the flag
+ ///
+ public bool IsReadOnly
+ {
+ get { return !_accessrights.HasFlag(AccessRightsType.Write); }
+ }
+
+ ///
+ /// Adds or updates the in this and others in the way defined by the provided
+ ///
+ /// The object to use as the key of the element to add
+ /// The object to use as the value of the element to add. null means remove
+ /// is null
+ /// The provided do not include the flag
+ public void Add(TKey key, TValue value)
+ {
+ AddOrUpdate(key, value);
+ }
+
+ ///
+ /// Adds or updates the in this and others in the way defined by the provided
+ ///
+ /// The item to add or updates. Value == null means remove key
+ /// .Key is null
+ /// The provided do not include the flag
+ public void Add(KeyValuePair item)
+ {
+ AddOrUpdate(item.Key, item.Value);
+ }
+
+ ///
+ /// Clears this , resetting all partitions' sync
+ ///
+ /// The provided do not include the flag
+ public void Clear()
+ {
+ ValidateAccessRights(AccessRightsType.Write);
+ _dictionary.Clear();
+ }
+
+ ///
+ /// Determines whether this contains the specified item
+ ///
+ /// The item to locate in this
+ /// if this contains an element ; otherwise,
+ /// .Key is
+ /// The provided do not include the flag
+ public bool Contains(KeyValuePair item)
+ {
+ ValidateAccessRights(AccessRightsType.Read);
+ BuildOnTheFlyConsumer();
+ return new LocalDataStorageEnumerator(_dictionary, _onTheFlyConsumer, StateName).Contains(item);
+ }
+
+ ///
+ /// Determines whether this contains an element with the specified key
+ ///
+ /// The key to locate in this
+ /// if this contains an element with ; otherwise,
+ /// is
+ /// The provided do not include the flag
+ public bool ContainsKey(TKey key)
+ {
+ ValidateAccessRights(AccessRightsType.Read);
+ return _dictionary.ContainsKey(key);
+ }
+
+ ///
+ /// Copies the elements of this to an , starting at a particular index
+ ///
+ /// The one-dimensional that is the destination of the elements copied
+ /// from this . The must have zero-based indexing
+ /// The zero-based index in array at which copying beginsù
+ /// is null
+ /// is less than zero
+ /// is multidimensional.
+ /// -or- The number of elements in the source is greater than the available space from to the end of the destination .
+ /// -or- The type of the source cannot be cast automatically to the type of the destination
+ /// The provided do not include the flag
+ public void CopyTo(KeyValuePair[] array, int arrayIndex)
+ {
+ ValidateAccessRights(AccessRightsType.Read);
+ BuildOnTheFlyConsumer();
+ new LocalDataStorageEnumerator(_dictionary, _onTheFlyConsumer, StateName).CopyTo(array, arrayIndex);
+ }
+
+ ///
+ /// Returns an enumerator that iterates through this
+ ///
+ /// An enumerator for this
+ /// The provided do not include the flag
+ public IEnumerator> GetEnumerator()
+ {
+ ValidateAccessRights(AccessRightsType.Read);
+ BuildOnTheFlyConsumer();
+ return new LocalDataStorageEnumerator(_dictionary, _onTheFlyConsumer, StateName);
+ }
+
+ ///
+ /// Removes the from this and others in the way defined by the provided
+ ///
+ /// The key of the element to remove
+ /// if the removal request is delivered to the others
+ /// is
+ /// The provided do not include the flag
+ public bool Remove(TKey key)
+ {
+ AddOrUpdate(key, null);
+
+ return true;
+ }
+
+ ///
+ /// Removes the from this and others in the way defined by the provided
+ ///
+ /// Item to be removed
+ /// if the removal request is delivered to the others
+ /// .Key is
+ /// The provided do not include the flag
+ public bool Remove(KeyValuePair item)
+ {
+ AddOrUpdate(item.Key, null);
+
+ return true;
+ }
+
+ ///
+ /// Attempts to get the value associated with the specified from this
+ ///
+ /// The key of the value to get
+ /// When this method returns, contains the object from this
+ /// that has the specified , or the default value of the type if the operation failed
+ /// if the was found in this ; otherwise,
+ /// is
+ /// The provided do not include the flag
+ public bool TryGetValue(TKey key, out TValue value)
+ {
+ ValidateAccessRights(AccessRightsType.Read);
+ BuildOnTheFlyConsumer();
+ return new LocalDataStorageEnumerator(_dictionary, _onTheFlyConsumer, StateName).TryGetValue(key, out value);
+ }
+
+ /// The provided do not include the flag
+ IEnumerator IEnumerable.GetEnumerator()
+ {
+ ValidateAccessRights(AccessRightsType.Read);
+ BuildOnTheFlyConsumer();
+ return new LocalDataStorageEnumerator(_dictionary, _onTheFlyConsumer, StateName);
+ }
+
+ #endregion
+
+ #region IDisposable
+
+ ///
+ /// Release all managed and unmanaged resources
+ ///
+ public void Dispose()
+ {
+ _consumerPollRun = false;
+
+ if (_consumers != null)
+ {
+ foreach (var item in _consumers)
+ {
+ item?.Dispose();
+ }
+ }
+
+ _producer?.Flush();
+ _producer?.Dispose();
+
+ if (_assignmentWaiters != null)
+ {
+ foreach (var item in _assignmentWaiters)
+ {
+ item?.Close();
+ }
+ }
+ }
+
+ #endregion
+ }
+ #endregion
+}
diff --git a/tests/KNetCompactedReplicatorTest/Program.cs b/tests/KNetCompactedReplicatorTest/Program.cs
index 61f65b0210..86c38524ff 100644
--- a/tests/KNetCompactedReplicatorTest/Program.cs
+++ b/tests/KNetCompactedReplicatorTest/Program.cs
@@ -17,19 +17,18 @@
*/
using Java.Util;
-using MASES.KNet;
using MASES.KNet.Admin;
using MASES.KNet.Common;
using MASES.KNet.Consumer;
using MASES.KNet.Extensions;
using MASES.KNet.Producer;
+using MASES.KNet.Replicator;
using MASES.KNet.Serialization;
using MASES.KNet.Serialization.Json;
using MASES.KNet.TestCommon;
using Org.Apache.Kafka.Clients.Admin;
using Org.Apache.Kafka.Clients.Consumer;
using Org.Apache.Kafka.Clients.Producer;
-using Org.Apache.Kafka.Common.Config;
using System;
using System.Threading;
@@ -46,8 +45,6 @@ class Program
static string topicToUse = theTopic;
static readonly ManualResetEvent resetEvent = new(false);
- static KNetCompactedReplicator _replicator = null;
-
public class TestType
{
public TestType(int i)
@@ -75,18 +72,9 @@ static void Main(string[] args)
serverToUse = args[0];
}
- _replicator = new KNetCompactedReplicator()
- {
- UpdateMode = KNetCompactedReplicator.UpdateModeTypes.OnConsumeSync,
- BootstrapServers = serverToUse,
- StateName = "TestState",
- ValueSerDes = new JsonSerDes(),
- };
-
- _replicator.Start();
- _replicator.WaitForStateAssignment();
+ Test("TestOnConsume", 100, UpdateModeTypes.OnConsume | UpdateModeTypes.Delayed);
- _replicator["Test"] = new TestType(100);
+ Test("TestOnDelivery", 100, UpdateModeTypes.OnDelivery | UpdateModeTypes.Delayed);
Console.CancelKeyPress += Console_CancelKeyPress;
Console.WriteLine("Press Ctrl-C to exit");
@@ -99,196 +87,30 @@ private static void Console_CancelKeyPress(object sender, ConsoleCancelEventArgs
if (e.Cancel) resetEvent.Set();
}
- static void CreateTopic()
+ private static void Test(string topicName, int length, UpdateModeTypes type)
{
- try
- {
- string topicName = topicToUse;
- int partitions = 1;
- short replicationFactor = 1;
-
- var topic = new NewTopic(topicName, partitions, replicationFactor);
-
- /**** Direct mode ******
- var map = Collections.SingletonMap(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT);
- topic.Configs(map);
- *********/
- topic = topic.Configs(TopicConfigBuilder.Create().WithCleanupPolicy(TopicConfigBuilder.CleanupPolicyTypes.Compact | TopicConfigBuilder.CleanupPolicyTypes.Delete)
- .WithDeleteRetentionMs(100)
- .WithMinCleanableDirtyRatio(0.01)
- .WithSegmentMs(100));
-
- var coll = Collections.Singleton(topic);
-
- /**** Direct mode ******
- Properties props = new Properties();
- props.Put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, serverToUse);
- *******/
-
- Properties props = AdminClientConfigBuilder.Create().WithBootstrapServers(serverToUse).ToProperties();
-
- using (IAdmin admin = KafkaAdminClient.Create(props))
- {
- /******* standard
- // Create a compacted topic
- CreateTopicsResult result = admin.CreateTopics(coll);
-
- // Call values() to get the result for a specific topic
- var future = result.Values.Get(topicName);
-
- // Call get() to block until the topic creation is complete or has failed
- // if creation failed the ExecutionException wraps the underlying cause.
- future.Get();
- ********/
- admin.CreateTopic(topicName, partitions, replicationFactor);
- }
- }
- catch (Java.Util.Concurrent.ExecutionException ex)
- {
- Console.WriteLine(ex.InnerException.Message);
- }
- catch (Exception e)
+ using (var replicator = new KNetCompactedReplicator()
{
- Console.WriteLine(e.Message);
- }
- }
-
- static void ProduceSomething()
- {
- try
+ Partitions = 5,
+ UpdateMode = type,
+ BootstrapServers = serverToUse,
+ StateName = topicName,
+ ValueSerDes = new JsonSerDes(),
+ })
{
- /**** Direct mode ******
- Properties props = new Properties();
- props.Put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, serverToUse);
- props.Put(ProducerConfig.ACKS_CONFIG, "all");
- props.Put(ProducerConfig.RETRIES_CONFIG, 0);
- props.Put(ProducerConfig.LINGER_MS_CONFIG, 1);
- ******/
-
- Properties props = ProducerConfigBuilder.Create()
- .WithBootstrapServers(serverToUse)
- .WithAcks(ProducerConfigBuilder.AcksTypes.All)
- .WithRetries(0)
- .WithLingerMs(1)
- .ToProperties();
+ replicator.StartAndWait();
- KNetSerDes keySerializer = new KNetSerDes();
- JsonSerDes valueSerializer = new JsonSerDes();
- try
- {
- using (var producer = new KNetProducer(props, keySerializer, valueSerializer))
- {
- int i = 0;
- Callback callback = null;
- if (useCallback)
- {
- callback = new Callback()
- {
- OnOnCompletion = (o1, o2) =>
- {
- if (o2 != null) Console.WriteLine(o2.ToString());
- else Console.WriteLine($"Produced on topic {o1.Topic()} at offset {o1.Offset()}");
- }
- };
- }
- try
- {
- while (!resetEvent.WaitOne(0))
- {
- var record = new KNetProducerRecord(topicToUse, i.ToString(), new TestType(i));
- var result = useCallback ? producer.Send(record, callback) : producer.Send(record);
- Console.WriteLine($"Producing: {record} with result: {result.Get()}");
- producer.Flush();
- i++;
- }
- }
- finally { if (useCallback) callback.Dispose(); }
- }
- }
- finally
+ for (int i = 0; i < length; i++)
{
- keySerializer?.Dispose();
- valueSerializer?.Dispose();
+ replicator[i] = new TestType(i);
}
- }
- catch (Java.Util.Concurrent.ExecutionException ex)
- {
- Console.WriteLine("Producer ended with error: {0}", ex.InnerException.Message);
- }
- catch (Exception ex)
- {
- Console.WriteLine("Producer ended with error: {0}", ex.Message);
- }
- }
- static void ConsumeSomething()
- {
- try
- {
- /**** Direct mode ******
- Properties props = new Properties();
- props.Put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, serverToUse);
- props.Put(ConsumerConfig.GROUP_ID_CONFIG, "test");
- props.Put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
- props.Put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
- *******/
+ replicator.SyncWait();
- Properties props = ConsumerConfigBuilder.Create()
- .WithBootstrapServers(serverToUse)
- .WithGroupId("test")
- .WithEnableAutoCommit(true)
- .WithAutoCommitIntervalMs(1000)
- .ToProperties();
-
- KNetSerDes keyDeserializer = new KNetSerDes();
- KNetSerDes valueDeserializer = new JsonSerDes();
- ConsumerRebalanceListener rebalanceListener = null;
- KNetConsumer consumer = null;
-
- if (useCallback)
- {
- rebalanceListener = new ConsumerRebalanceListener()
- {
- OnOnPartitionsRevoked = (o) =>
- {
- Console.WriteLine("Revoked: {0}", o.ToString());
- },
- OnOnPartitionsAssigned = (o) =>
- {
- Console.WriteLine("Assigned: {0}", o.ToString());
- }
- };
- }
- try
+ foreach (var item in replicator)
{
- using (consumer = new KNetConsumer(props, keyDeserializer, valueDeserializer))
- {
- if (useCallback) consumer.Subscribe(Collections.Singleton(topicToUse), rebalanceListener);
- else consumer.Subscribe(Collections.Singleton(topicToUse));
-
- while (!resetEvent.WaitOne(0))
- {
- var records = consumer.Poll((long)TimeSpan.FromMilliseconds(200).TotalMilliseconds);
- foreach (var item in records)
- {
- Console.WriteLine($"Consuming from Offset = {item.Offset}, Key = {item.Key}, Value = {item.Value}");
- }
- }
- }
+ Console.WriteLine($"Key: {item.Key} - Value: {item.Value}");
}
- finally
- {
- keyDeserializer?.Dispose();
- valueDeserializer?.Dispose();
- }
- }
- catch (Java.Util.Concurrent.ExecutionException ex)
- {
- Console.WriteLine("Consumer ended with error: {0}", ex.InnerException.Message);
- }
- catch (Exception ex)
- {
- Console.WriteLine("Consumer ended with error: {0}", ex.Message);
}
}
}