diff --git a/src/net/KEFCore/Extensions/KafkaEntityTypeExtensions.cs b/src/net/KEFCore/Extensions/KafkaEntityTypeExtensions.cs index 3bfce2b9..bb892bff 100644 --- a/src/net/KEFCore/Extensions/KafkaEntityTypeExtensions.cs +++ b/src/net/KEFCore/Extensions/KafkaEntityTypeExtensions.cs @@ -117,4 +117,10 @@ public static int NumPartitions(this IEntityType entityType, KafkaOptionsExtensi var numPartitions = options.DefaultNumPartitions; return numPartitions; } + + public static int? ConsumerInstances(this IEntityType entityType, KafkaOptionsExtension options) + { + var consumerInstances = options.DefaultConsumerInstances; + return consumerInstances; + } } diff --git a/src/net/KEFCore/Infrastructure/Internal/IKafkaSingletonOptions.cs b/src/net/KEFCore/Infrastructure/Internal/IKafkaSingletonOptions.cs index 68b9d979..33b56157 100644 --- a/src/net/KEFCore/Infrastructure/Internal/IKafkaSingletonOptions.cs +++ b/src/net/KEFCore/Infrastructure/Internal/IKafkaSingletonOptions.cs @@ -36,10 +36,14 @@ public interface IKafkaSingletonOptions : ISingletonOptions bool ProducerByEntity { get; } + bool UseCompactedReplicator { get; } + bool UsePersistentStorage { get; } int DefaultNumPartitions { get; } + int? DefaultConsumerInstances { get; } + int DefaultReplicationFactor { get; } ProducerConfigBuilder? ProducerConfigBuilder { get; } diff --git a/src/net/KEFCore/Infrastructure/Internal/KafkaOptionsExtension.cs b/src/net/KEFCore/Infrastructure/Internal/KafkaOptionsExtension.cs index e08853fd..ca3ece5a 100644 --- a/src/net/KEFCore/Infrastructure/Internal/KafkaOptionsExtension.cs +++ b/src/net/KEFCore/Infrastructure/Internal/KafkaOptionsExtension.cs @@ -20,7 +20,6 @@ using Java.Lang; using Java.Util; -using MASES.JCOBridge.C2JBridge; using MASES.KNet.Common; using MASES.KNet.Producer; using MASES.KNet.Streams; @@ -28,7 +27,6 @@ using Org.Apache.Kafka.Clients.Producer; using Org.Apache.Kafka.Streams; using System.Globalization; -using System.Text; namespace MASES.EntityFrameworkCore.KNet.Infrastructure.Internal; @@ -39,8 +37,10 @@ public class KafkaOptionsExtension : IDbContextOptionsExtension private string? _applicationId; private string? _bootstrapServers; private bool _producerByEntity = false; + private bool _useCompactedReplicator = false; private bool _usePersistentStorage = false; private int _defaultNumPartitions = 1; + private int? _defaultConsumerInstances = null; private short _defaultReplicationFactor = 1; private ProducerConfigBuilder? _producerConfigBuilder; private StreamsConfigBuilder? _streamsConfigBuilder; @@ -61,8 +61,10 @@ protected KafkaOptionsExtension(KafkaOptionsExtension copyFrom) _applicationId = copyFrom._applicationId; _bootstrapServers = copyFrom._bootstrapServers; _producerByEntity = copyFrom._producerByEntity; + _useCompactedReplicator = copyFrom._useCompactedReplicator; _usePersistentStorage = copyFrom._usePersistentStorage; _defaultNumPartitions = copyFrom._defaultNumPartitions; + _defaultConsumerInstances = copyFrom._defaultConsumerInstances; _defaultReplicationFactor = copyFrom._defaultReplicationFactor; _producerConfigBuilder = ProducerConfigBuilder.CreateFrom(copyFrom._producerConfigBuilder); _streamsConfigBuilder = StreamsConfigBuilder.CreateFrom(copyFrom._streamsConfigBuilder); @@ -85,10 +87,14 @@ protected KafkaOptionsExtension(KafkaOptionsExtension copyFrom) public virtual bool ProducerByEntity => _producerByEntity; + public virtual bool UseCompactedReplicator => _useCompactedReplicator; + public virtual bool UsePersistentStorage => _usePersistentStorage; public virtual int DefaultNumPartitions => _defaultNumPartitions; + public virtual int? DefaultConsumerInstances => _defaultConsumerInstances; + public virtual short DefaultReplicationFactor => _defaultReplicationFactor; public virtual ProducerConfigBuilder ProducerConfigBuilder => _producerConfigBuilder!; @@ -142,6 +148,15 @@ public virtual KafkaOptionsExtension WithProducerByEntity(bool producerByEntity return clone; } + public virtual KafkaOptionsExtension WithCompactedReplicator(bool useCompactedReplicator = false) + { + var clone = Clone(); + + clone._useCompactedReplicator = useCompactedReplicator; + + return clone; + } + public virtual KafkaOptionsExtension WithUsePersistentStorage(bool usePersistentStorage = false) { var clone = Clone(); @@ -160,6 +175,15 @@ public virtual KafkaOptionsExtension WithDefaultNumPartitions(int defaultNumPart return clone; } + public virtual KafkaOptionsExtension WithDefaultConsumerInstances(int? defaultConsumerInstances = null) + { + var clone = Clone(); + + clone._defaultConsumerInstances = defaultConsumerInstances; + + return clone; + } + public virtual KafkaOptionsExtension WithDefaultReplicationFactor(short defaultReplicationFactor = 1) { var clone = Clone(); diff --git a/src/net/KEFCore/Infrastructure/Internal/KafkaSingletonOptions.cs b/src/net/KEFCore/Infrastructure/Internal/KafkaSingletonOptions.cs index 585d9367..ced2c8b6 100644 --- a/src/net/KEFCore/Infrastructure/Internal/KafkaSingletonOptions.cs +++ b/src/net/KEFCore/Infrastructure/Internal/KafkaSingletonOptions.cs @@ -35,8 +35,10 @@ public virtual void Initialize(IDbContextOptions options) ApplicationId = kafkaOptions.ApplicationId; BootstrapServers = kafkaOptions.BootstrapServers; ProducerByEntity = kafkaOptions.ProducerByEntity; + UseCompactedReplicator = kafkaOptions.UseCompactedReplicator; UsePersistentStorage = kafkaOptions.UsePersistentStorage; DefaultNumPartitions = kafkaOptions.DefaultNumPartitions; + DefaultConsumerInstances = kafkaOptions.DefaultConsumerInstances; DefaultReplicationFactor = kafkaOptions.DefaultReplicationFactor; ProducerConfigBuilder = ProducerConfigBuilder.CreateFrom(kafkaOptions.ProducerConfigBuilder); StreamsConfigBuilder = StreamsConfigBuilder.CreateFrom(kafkaOptions.StreamsConfigBuilder); @@ -68,10 +70,14 @@ public virtual void Validate(IDbContextOptions options) public virtual bool ProducerByEntity { get; private set; } + public virtual bool UseCompactedReplicator { get; private set; } + public virtual bool UsePersistentStorage { get; private set; } public virtual int DefaultNumPartitions { get; private set; } + public virtual int? DefaultConsumerInstances { get; private set; } + public virtual int DefaultReplicationFactor { get; private set; } public virtual ProducerConfigBuilder? ProducerConfigBuilder { get; private set; } diff --git a/src/net/KEFCore/Infrastructure/KafkaDbContext.cs b/src/net/KEFCore/Infrastructure/KafkaDbContext.cs index 959ef782..7972863d 100644 --- a/src/net/KEFCore/Infrastructure/KafkaDbContext.cs +++ b/src/net/KEFCore/Infrastructure/KafkaDbContext.cs @@ -19,6 +19,7 @@ * Refer to LICENSE for more information. */ +using MASES.KNet; using MASES.KNet.Common; using MASES.KNet.Producer; using MASES.KNet.Streams; @@ -30,41 +31,66 @@ namespace MASES.EntityFrameworkCore.KNet.Infrastructure; /// public class KafkaDbContext : DbContext { + /// + public KafkaDbContext() + { + + } + /// + public KafkaDbContext(DbContextOptions options) : base(options) + { + + } + /// /// The bootstrap servers of the Apache Kafka cluster /// - public string? BootstrapServers { get; set; } + public virtual string? BootstrapServers { get; set; } /// /// The application id /// - public string ApplicationId { get; set; } = Guid.NewGuid().ToString(); + public virtual string ApplicationId { get; set; } = Guid.NewGuid().ToString(); /// /// Database name /// - public string? DbName { get; set; } + public virtual string? DbName { get; set; } /// /// Database number of partitions /// - public int DefaultNumPartitions { get; set; } = 10; + public virtual int DefaultNumPartitions { get; set; } = 10; /// /// Database replication factor /// - public short DefaultReplicationFactor { get; set; } = 1; + public virtual short DefaultReplicationFactor { get; set; } = 1; + /// + /// Database consumr instances used in conjunction with + /// + public virtual int? DefaultConsumerInstances { get; set; } = null; /// /// Use persistent storage /// - public bool UsePersistentStorage { get; set; } = false; + public virtual bool UsePersistentStorage { get; set; } = false; /// /// Use a producer for each Entity /// public bool UseProducerByEntity { get; set; } = false; - - public ProducerConfigBuilder? ProducerConfigBuilder { get; set; } - - public StreamsConfigBuilder? StreamsConfigBuilder { get; set; } - - public TopicConfigBuilder? TopicConfigBuilder { get; set; } - + /// + /// Use instead of Apache Kafka Streams + /// + public virtual bool UseCompactedReplicator { get; set; } = false; + /// + /// The optional + /// + public virtual ProducerConfigBuilder? ProducerConfigBuilder { get; set; } + /// + /// The optional + /// + public virtual StreamsConfigBuilder? StreamsConfigBuilder { get; set; } + /// + /// The optional + /// + public virtual TopicConfigBuilder? TopicConfigBuilder { get; set; } + /// protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder) { if (BootstrapServers == null) @@ -76,9 +102,10 @@ protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder) optionsBuilder.UseKafkaDatabase(ApplicationId, DbName, BootstrapServers, (o) => { - o.StreamsConfig(StreamsConfigBuilder??o.EmptyStreamsConfigBuilder).WithDefaultNumPartitions(DefaultNumPartitions); + o.StreamsConfig(StreamsConfigBuilder ?? o.EmptyStreamsConfigBuilder).WithDefaultNumPartitions(DefaultNumPartitions); o.WithUsePersistentStorage(UsePersistentStorage); o.WithProducerByEntity(UseProducerByEntity); + o.WithCompactedReplicator(UseCompactedReplicator); o.WithDefaultReplicationFactor(DefaultReplicationFactor); }); } diff --git a/src/net/KEFCore/Infrastructure/KafkaDbContextOptionsBuilder.cs b/src/net/KEFCore/Infrastructure/KafkaDbContextOptionsBuilder.cs index e79c6138..7271b5a3 100644 --- a/src/net/KEFCore/Infrastructure/KafkaDbContextOptionsBuilder.cs +++ b/src/net/KEFCore/Infrastructure/KafkaDbContextOptionsBuilder.cs @@ -114,6 +114,27 @@ public virtual KafkaDbContextOptionsBuilder WithProducerByEntity(bool producerBy return this; } + /// + /// Enables use of + /// + /// + /// See Using DbContextOptions, and + /// The EF Core Kafka database provider for more information and examples. + /// + /// If then will be used instead of Apache Kafka Streams. + /// The same builder instance so that multiple calls can be chained. + public virtual KafkaDbContextOptionsBuilder WithCompactedReplicator(bool useCompactedReplicator = false) + { + var extension = OptionsBuilder.Options.FindExtension() + ?? new KafkaOptionsExtension(); + + extension = extension.WithCompactedReplicator(useCompactedReplicator); + + ((IDbContextOptionsBuilderInfrastructure)OptionsBuilder).AddOrUpdateExtension(extension); + + return this; + } + /// /// Enables use of persistent storage, otherwise a storage will be in-memory /// @@ -156,6 +177,27 @@ public virtual KafkaDbContextOptionsBuilder WithDefaultNumPartitions(int default return this; } + /// + /// Defines the default number of consumer instances to be used in conjunction with + /// + /// + /// See Using DbContextOptions, and + /// The EF Core Kafka database provider for more information and examples. + /// + /// The default number of consumer instances to be used in conjunction with + /// The same builder instance so that multiple calls can be chained. + public virtual KafkaDbContextOptionsBuilder WithDefaultConsumerInstances(int? defaultConsumerInstances = null) + { + var extension = OptionsBuilder.Options.FindExtension() + ?? new KafkaOptionsExtension(); + + extension = extension.WithDefaultConsumerInstances(defaultConsumerInstances); + + ((IDbContextOptionsBuilderInfrastructure)OptionsBuilder).AddOrUpdateExtension(extension); + + return this; + } + /// /// Defines the default replication factor to use when a new topic is created /// diff --git a/src/net/KEFCore/KEFCore.csproj b/src/net/KEFCore/KEFCore.csproj index 52d4f510..ce0f6f01 100644 --- a/src/net/KEFCore/KEFCore.csproj +++ b/src/net/KEFCore/KEFCore.csproj @@ -13,7 +13,7 @@ README.md enable true - False + True False @@ -66,10 +66,10 @@ - + All None - + diff --git a/src/net/KEFCore/Serdes/Internal/IKafkaSerdesEntityType.cs b/src/net/KEFCore/Serdes/Internal/IKafkaSerdesEntityType.cs index 5fc790bf..21b0c78d 100644 --- a/src/net/KEFCore/Serdes/Internal/IKafkaSerdesEntityType.cs +++ b/src/net/KEFCore/Serdes/Internal/IKafkaSerdesEntityType.cs @@ -22,12 +22,20 @@ namespace MASES.EntityFrameworkCore.KNet.Serdes.Internal { public interface IKafkaSerdesEntityType { + string Serialize(params object?[]? args); + string Serialize(Headers headers, params object?[]? args); + string Serialize(TKey key); + string Serialize(Headers headers, TKey key); + object[] Deserialize(string arg); + object[] Deserialize(Headers headers, string arg); + TKey Deserialize(string arg); + TKey Deserialize(Headers headers, string arg); object[] ConvertData(object[]? input); diff --git a/src/net/KEFCore/Serdes/Internal/KafkaSerdesEntityType.cs b/src/net/KEFCore/Serdes/Internal/KafkaSerdesEntityType.cs index ba5806da..4a666b5c 100644 --- a/src/net/KEFCore/Serdes/Internal/KafkaSerdesEntityType.cs +++ b/src/net/KEFCore/Serdes/Internal/KafkaSerdesEntityType.cs @@ -50,16 +50,28 @@ public KafkaSerdesEntityType(IEntityType type) _properties = _type.GetProperties().ToArray(); } + public object[] Deserialize(string arg) + { + var des = GetFullType(arg); + return ConvertData(des!.data); + } + public object[] Deserialize(Headers headers, string arg) { var des = GetFullType(arg); return ConvertData(des!.data); } + public TKey Deserialize(string arg) => System.Text.Json.JsonSerializer.Deserialize(arg)!; + public TKey Deserialize(Headers headers, string arg) => System.Text.Json.JsonSerializer.Deserialize(arg)!; + public string Serialize(params object?[]? args) => System.Text.Json.JsonSerializer.Serialize(new KafkaSerdesEntityTypeData(_type.Name, args!)); + public string Serialize(Headers headers, params object?[]? args) => System.Text.Json.JsonSerializer.Serialize(new KafkaSerdesEntityTypeData(_type.Name, args!)); + public string Serialize(TKey key) => System.Text.Json.JsonSerializer.Serialize(key); + public string Serialize(Headers headers, TKey key) => System.Text.Json.JsonSerializer.Serialize(key); public static KafkaSerdesEntityTypeData? GetFullType(string arg) => System.Text.Json.JsonSerializer.Deserialize(arg); diff --git a/src/net/KEFCore/Storage/Internal/IKafkaCluster.cs b/src/net/KEFCore/Storage/Internal/IKafkaCluster.cs index 5cdaa59a..fe833f8c 100644 --- a/src/net/KEFCore/Storage/Internal/IKafkaCluster.cs +++ b/src/net/KEFCore/Storage/Internal/IKafkaCluster.cs @@ -19,12 +19,14 @@ using MASES.EntityFrameworkCore.KNet.Infrastructure.Internal; using MASES.EntityFrameworkCore.KNet.Serdes.Internal; using MASES.EntityFrameworkCore.KNet.ValueGeneration.Internal; +using MASES.KNet; using MASES.KNet.Producer; +using MASES.KNet.Replicator; using Org.Apache.Kafka.Clients.Producer; namespace MASES.EntityFrameworkCore.KNet.Storage.Internal; -public interface IKafkaCluster +public interface IKafkaCluster :IDisposable { bool EnsureDeleted( IUpdateAdapterFactory updateAdapterFactory, @@ -46,6 +48,8 @@ bool EnsureConnected( IKafkaSerdesEntityType CreateSerdes(IEntityType entityType); + IKNetCompactedReplicator CreateCompactedReplicator(IEntityType entityType); + IProducer CreateProducer(IEntityType entityType); IEnumerable GetData(IEntityType entityType); diff --git a/src/net/KEFCore/Storage/Internal/IKafkaDatabase.cs b/src/net/KEFCore/Storage/Internal/IKafkaDatabase.cs index 6af8feee..e2aee765 100644 --- a/src/net/KEFCore/Storage/Internal/IKafkaDatabase.cs +++ b/src/net/KEFCore/Storage/Internal/IKafkaDatabase.cs @@ -18,7 +18,7 @@ namespace MASES.EntityFrameworkCore.KNet.Storage.Internal; -public interface IKafkaDatabase : IDatabase +public interface IKafkaDatabase : IDatabase, IDisposable { IKafkaCluster Cluster { get; } diff --git a/src/net/KEFCore/Storage/Internal/IKafkaRowBag.cs b/src/net/KEFCore/Storage/Internal/IKafkaRowBag.cs new file mode 100644 index 00000000..33e96f29 --- /dev/null +++ b/src/net/KEFCore/Storage/Internal/IKafkaRowBag.cs @@ -0,0 +1,32 @@ +/* +* Copyright 2022 MASES s.r.l. +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +* +* Refer to LICENSE for more information. +*/ + +#nullable enable + +using MASES.EntityFrameworkCore.KNet.ValueGeneration.Internal; +using Java.Util.Concurrent; +using Org.Apache.Kafka.Clients.Producer; +using MASES.KNet.Producer; +using MASES.EntityFrameworkCore.KNet.Serdes.Internal; + +namespace MASES.EntityFrameworkCore.KNet.Storage.Internal; + +public interface IKafkaRowBag +{ + IUpdateEntry UpdateEntry { get; } +} diff --git a/src/net/KEFCore/Storage/Internal/IKafkaTable.cs b/src/net/KEFCore/Storage/Internal/IKafkaTable.cs index 78e97717..e584c7bc 100644 --- a/src/net/KEFCore/Storage/Internal/IKafkaTable.cs +++ b/src/net/KEFCore/Storage/Internal/IKafkaTable.cs @@ -25,7 +25,7 @@ namespace MASES.EntityFrameworkCore.KNet.Storage.Internal; -public interface IKafkaTable +public interface IKafkaTable : IDisposable { IReadOnlyList SnapshotRows(); @@ -33,13 +33,13 @@ public interface IKafkaTable IEnumerable Rows { get; } - ProducerRecord Create(IUpdateEntry entry); + IKafkaRowBag Create(IUpdateEntry entry); - ProducerRecord Delete(IUpdateEntry entry); + IKafkaRowBag Delete(IUpdateEntry entry); - ProducerRecord Update(IUpdateEntry entry); + IKafkaRowBag Update(IUpdateEntry entry); - IEnumerable> Commit(IEnumerable> records); + IEnumerable> Commit(IEnumerable records); KafkaIntegerValueGenerator GetIntegerValueGenerator( IProperty property, diff --git a/src/net/KEFCore/Storage/Internal/KafkaCluster.cs b/src/net/KEFCore/Storage/Internal/KafkaCluster.cs index 65528daf..6948f173 100644 --- a/src/net/KEFCore/Storage/Internal/KafkaCluster.cs +++ b/src/net/KEFCore/Storage/Internal/KafkaCluster.cs @@ -32,6 +32,10 @@ using Org.Apache.Kafka.Common.Errors; using MASES.KNet.Serialization; using MASES.KNet.Extensions; +using MASES.KNet; +using Org.Apache.Kafka.Common; +using MASES.KNet.Replicator; +using Org.Apache.Kafka.Tools; namespace MASES.EntityFrameworkCore.KNet.Storage.Internal; @@ -65,6 +69,18 @@ public KafkaCluster( _kafkaAdminClient = KafkaAdminClient.Create(props); } + public virtual void Dispose() + { + if (_tables != null) + { + foreach (var item in _tables?.Values) + { + item?.Dispose(); + } + _kafkaAdminClient?.Dispose(); + } + } + public virtual IKafkaSerdesFactory SerdesFactory => _serdesFactory; public virtual KafkaOptionsExtension Options => _options; @@ -89,13 +105,16 @@ public virtual bool EnsureDeleted( { lock (_lock) { + var coll = new ArrayList(); + foreach (var entityType in designModel.GetEntityTypes()) + { + var topic = entityType.TopicName(Options); + + coll.Add(topic); + } + try { - var coll = new ArrayList(); - foreach (var entityType in designModel.GetEntityTypes()) - { - coll.Add(entityType.TopicName(_options)); - } var result = _kafkaAdminClient.DeleteTopics(coll); result.All().Get(); } @@ -164,9 +183,10 @@ public virtual bool CreateTable(IEntityType entityType) { try { - var topic = new NewTopic(entityType.TopicName(_options), entityType.NumPartitions(_options), entityType.ReplicationFactor(_options)); - _options.TopicConfigBuilder.CleanupPolicy = MASES.KNet.Common.TopicConfigBuilder.CleanupPolicyTypes.Compact | MASES.KNet.Common.TopicConfigBuilder.CleanupPolicyTypes.Delete; - var map = _options.TopicConfigBuilder.ToMap(); + var topic = new NewTopic(entityType.TopicName(Options), entityType.NumPartitions(Options), entityType.ReplicationFactor(Options)); + Options.TopicConfigBuilder.CleanupPolicy = MASES.KNet.Common.TopicConfigBuilder.CleanupPolicyTypes.Compact | MASES.KNet.Common.TopicConfigBuilder.CleanupPolicyTypes.Delete; + Options.TopicConfigBuilder.RetentionBytes = 1024 * 1024 * 1024; + var map = Options.TopicConfigBuilder.ToMap(); topic.Configs(map); var coll = Collections.Singleton(topic); var result = _kafkaAdminClient.CreateTopics(coll); @@ -191,6 +211,24 @@ public virtual bool CreateTable(IEntityType entityType) public virtual IKafkaSerdesEntityType CreateSerdes(IEntityType entityType) => _serdesFactory.GetOrCreate(entityType); + public virtual IKNetCompactedReplicator CreateCompactedReplicator(IEntityType entityType) + { + lock (_lock) + { + return new KNetCompactedReplicator() + { + UpdateMode = UpdateModeTypes.OnConsume, + BootstrapServers = Options.BootstrapServers, + StateName = entityType.TopicName(Options), + Partitions = entityType.NumPartitions(Options), + ConsumerInstances = entityType.ConsumerInstances(Options), + ReplicationFactor = entityType.ReplicationFactor(Options), + TopicConfig = Options.TopicConfigBuilder, + ProducerConfig = Options.ProducerConfigBuilder, + }; + } + } + public virtual IProducer CreateProducer(IEntityType entityType) { if (!Options.ProducerByEntity) @@ -240,7 +278,7 @@ public virtual int ExecuteTransaction( IDiagnosticsLogger updateLogger) { var rowsAffected = 0; - System.Collections.Generic.Dictionary>> _dataInTransaction = new(); + System.Collections.Generic.Dictionary> dataInTransaction = new(); lock (_lock) { @@ -254,7 +292,7 @@ public virtual int ExecuteTransaction( var table = EnsureTable(entityType); - ProducerRecord record; + IKafkaRowBag record; if (entry.SharedIdentityEntry != null) { @@ -281,10 +319,10 @@ record = table.Update(entry); continue; } - if (!_dataInTransaction.TryGetValue(table, out System.Collections.Generic.IList>? recordList)) + if (!dataInTransaction.TryGetValue(table, out System.Collections.Generic.IList? recordList)) { - recordList = new System.Collections.Generic.List>(); - _dataInTransaction[table] = recordList; + recordList = new System.Collections.Generic.List(); + dataInTransaction[table] = recordList; } recordList?.Add(record); @@ -294,7 +332,7 @@ record = table.Update(entry); updateLogger.ChangesSaved(entries, rowsAffected); - foreach (var tableData in _dataInTransaction) + foreach (var tableData in dataInTransaction) { tableData.Key.Commit(tableData.Value); } diff --git a/src/net/KEFCore/Storage/Internal/KafkaDatabase.cs b/src/net/KEFCore/Storage/Internal/KafkaDatabase.cs index ea286c9a..b1222247 100644 --- a/src/net/KEFCore/Storage/Internal/KafkaDatabase.cs +++ b/src/net/KEFCore/Storage/Internal/KafkaDatabase.cs @@ -40,6 +40,11 @@ public KafkaDatabase( _updateLogger = updateLogger; } + public void Dispose() + { + _cluster?.Dispose(); + } + public virtual IKafkaCluster Cluster => _cluster; public override int SaveChanges(IList entries) => _cluster.ExecuteTransaction(entries, _updateLogger); diff --git a/src/net/KEFCore/Storage/Internal/KafkaRowBag.cs b/src/net/KEFCore/Storage/Internal/KafkaRowBag.cs new file mode 100644 index 00000000..94e7595c --- /dev/null +++ b/src/net/KEFCore/Storage/Internal/KafkaRowBag.cs @@ -0,0 +1,64 @@ +/* +* Copyright 2022 MASES s.r.l. +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +* +* Refer to LICENSE for more information. +*/ + +#nullable enable + +using MASES.EntityFrameworkCore.KNet.ValueGeneration.Internal; +using Java.Util.Concurrent; +using Org.Apache.Kafka.Clients.Producer; +using MASES.KNet.Producer; +using MASES.EntityFrameworkCore.KNet.Serdes.Internal; +using Org.Apache.Kafka.Common.Header; + +namespace MASES.EntityFrameworkCore.KNet.Storage.Internal; + +public class KafkaRowBag : IKafkaRowBag +{ + public KafkaRowBag(IUpdateEntry entry, TKey key, object?[]? row) + { + UpdateEntry = entry; + Key = key; + ValueBuffer = row; + } + + public IUpdateEntry UpdateEntry { get; private set; } + + public TKey Key { get; private set; } + + public object?[]? ValueBuffer { get; private set; } + + public string GetKey(IKafkaSerdesEntityType _serdes) + { + return _serdes.Serialize(Key); + } + + public string GetValue(IKafkaSerdesEntityType _serdes) + { + return _serdes.Serialize(ValueBuffer); + } + + public ProducerRecord GetRecord(string topicName, IKafkaSerdesEntityType _serdes) + { + Headers headers = Headers.Create(); + string key = _serdes.Serialize(headers, Key); + string? value = UpdateEntry.EntityState == EntityState.Deleted ? null : _serdes.Serialize(headers, ValueBuffer); + var record = new ProducerRecord(topicName, 0, DateTime.Now, key, value!, headers); + + return record; + } +} diff --git a/src/net/KEFCore/Storage/Internal/KafkaStreamsBaseRetriever.cs b/src/net/KEFCore/Storage/Internal/KafkaStreamsBaseRetriever.cs index 462cedb2..48fd3918 100644 --- a/src/net/KEFCore/Storage/Internal/KafkaStreamsBaseRetriever.cs +++ b/src/net/KEFCore/Storage/Internal/KafkaStreamsBaseRetriever.cs @@ -85,7 +85,7 @@ private void StartTopology(StreamsBuilder builder, KStream root) { actualState = newState; Trace.WriteLine("StateListener oldState: " + oldState + " newState: " + newState + " on " + DateTime.Now.ToString("HH:mm:ss.FFFFFFF")); - stateChanged.Set(); + if (stateChanged != null && !stateChanged.SafeWaitHandle.IsClosed) stateChanged.Set(); } }; @@ -151,13 +151,17 @@ System.Collections.IEnumerator System.Collections.IEnumerable.GetEnumerator() public void Dispose() { + streams?.Close(); dataReceived?.Dispose(); resetEvent?.Dispose(); - stateChanged?.Dispose(); exceptionSet?.Dispose(); - streams?.Close(); errorHandler?.Dispose(); stateListener?.Dispose(); + stateChanged?.Dispose(); + + streams = null; + errorHandler = null; + stateListener = null; } class KafkaEnumerator : IEnumerator @@ -169,6 +173,8 @@ class KafkaEnumerator : IEnumerator public KafkaEnumerator(IKafkaCluster kafkaCluster, ReadOnlyKeyValueStore? keyValueStore) { + if (kafkaCluster == null) throw new ArgumentNullException(nameof(kafkaCluster)); + if (keyValueStore == null) throw new ArgumentNullException(nameof(keyValueStore)); _kafkaCluster = kafkaCluster; _keyValueStore = keyValueStore; Trace.WriteLine($"KafkaEnumerator - ApproximateNumEntries {_keyValueStore?.ApproximateNumEntries()}"); diff --git a/src/net/KEFCore/Storage/Internal/KafkaTable.cs b/src/net/KEFCore/Storage/Internal/KafkaTable.cs index d91ef575..5b8b5a12 100644 --- a/src/net/KEFCore/Storage/Internal/KafkaTable.cs +++ b/src/net/KEFCore/Storage/Internal/KafkaTable.cs @@ -28,6 +28,10 @@ using Org.Apache.Kafka.Clients.Producer; using Org.Apache.Kafka.Common.Header; using Org.Apache.Kafka.Connect.Transforms; +using MASES.KNet; +using Microsoft.EntityFrameworkCore.Metadata.Internal; +using Org.Apache.Kafka.Common; +using MASES.KNet.Replicator; namespace MASES.EntityFrameworkCore.KNet.Storage.Internal; @@ -42,6 +46,7 @@ public class KafkaTable : IKafkaTable private Dictionary? _integerGenerators; + private readonly IKNetCompactedReplicator _kafkaCompactedReplicator; private readonly IProducer _kafkaProducer; private readonly string _tableAssociatedTopicName; private readonly IKafkaSerdesEntityType _serdes; @@ -57,8 +62,16 @@ public KafkaTable( _tableAssociatedTopicName = entityType.TopicName(cluster.Options); cluster.CreateTable(entityType); _serdes = cluster.CreateSerdes(entityType); - _kafkaProducer = cluster.CreateProducer(entityType); - _streamData = new KafkaStreamsTableRetriever(cluster, entityType); + if (cluster.Options.UseCompactedReplicator) + { + _kafkaCompactedReplicator = cluster.CreateCompactedReplicator(entityType); + _kafkaCompactedReplicator.Start(); + } + else + { + _kafkaProducer = cluster.CreateProducer(entityType); + _streamData = new KafkaStreamsTableRetriever(cluster, entityType); + } _keyValueFactory = entityType.FindPrimaryKey()!.GetPrincipalKeyValueFactory(); _sensitiveLoggingEnabled = sensitiveLoggingEnabled; @@ -84,6 +97,19 @@ public KafkaTable( } } + public virtual void Dispose() + { + if (Cluster.Options.UseCompactedReplicator) + { + _kafkaCompactedReplicator?.Dispose(); + } + else + { + _kafkaProducer?.Dispose(); + _streamData?.Dispose(); + } + } + public virtual IKafkaCluster Cluster { get; } public virtual IEntityType EntityType { get; } @@ -112,7 +138,14 @@ public virtual KafkaIntegerValueGenerator GetIntegerValueGenerator)generator; } - public virtual IEnumerable ValueBuffers => _streamData; + public virtual IEnumerable ValueBuffers => GetValueBuffer(); + + private IEnumerable GetValueBuffer() + { + if (_streamData != null) return _streamData; + _kafkaCompactedReplicator.SyncWait(); + return _kafkaCompactedReplicator.Values.Select((item) => new ValueBuffer(_serdes.Deserialize(item))); + } public virtual IEnumerable Rows => RowsInTable(); @@ -159,7 +192,7 @@ public virtual IEnumerable Rows private static System.Collections.Generic.List GetKeyComparers(IEnumerable properties) => properties.Select(p => p.GetKeyValueComparer()).ToList(); - public virtual ProducerRecord Create(IUpdateEntry entry) + public virtual IKafkaRowBag Create(IUpdateEntry entry) { var properties = entry.EntityType.GetProperties().ToList(); var row = new object?[properties.Count]; @@ -183,10 +216,10 @@ public virtual ProducerRecord Create(IUpdateEntry entry) BumpValueGenerators(row); - return NewRecord(entry, key, row); + return new KafkaRowBag(entry, key, row); } - public virtual ProducerRecord Delete(IUpdateEntry entry) + public virtual IKafkaRowBag Delete(IUpdateEntry entry) { var key = CreateKey(entry); @@ -207,7 +240,7 @@ public virtual ProducerRecord Delete(IUpdateEntry entry) _rows.Remove(key); - return NewRecord(entry, key, null); + return new KafkaRowBag(entry, key, null); } else { @@ -246,7 +279,7 @@ private static bool IsConcurrencyConflict( return false; } - public virtual ProducerRecord Update(IUpdateEntry entry) + public virtual IKafkaRowBag Update(IUpdateEntry entry) { var key = CreateKey(entry); @@ -289,7 +322,7 @@ public virtual ProducerRecord Update(IUpdateEntry entry) BumpValueGenerators(valueBuffer); - return NewRecord(entry, key, valueBuffer); + return new KafkaRowBag(entry, key, valueBuffer); } else { @@ -297,18 +330,32 @@ public virtual ProducerRecord Update(IUpdateEntry entry) } } - public virtual IEnumerable> Commit(IEnumerable> records) + public virtual IEnumerable> Commit(IEnumerable records) { - System.Collections.Generic.List> futures = new(); - foreach (var record in records) + if (Cluster.Options.UseCompactedReplicator) { - var future = _kafkaProducer.Send(record); - futures.Add(future); + foreach (KafkaRowBag record in records) + { + var key = record.GetKey(_serdes); + var value = record.GetValue(_serdes); + _kafkaCompactedReplicator[key] = value; + } + + return null; } + else + { + System.Collections.Generic.List> futures = new(); + foreach (KafkaRowBag record in records) + { + var future = _kafkaProducer.Send(record.GetRecord(_tableAssociatedTopicName, _serdes)); + futures.Add(future); + } - _kafkaProducer.Flush(); + _kafkaProducer.Flush(); - return futures; + return futures; + } } public virtual void BumpValueGenerators(object?[] row) diff --git a/test/KEFCore.Test/KEFCore.Test.csproj b/test/KEFCore.Test/KEFCore.Test.csproj index eefaacbf..be26f56e 100644 --- a/test/KEFCore.Test/KEFCore.Test.csproj +++ b/test/KEFCore.Test/KEFCore.Test.csproj @@ -11,6 +11,7 @@ - + + diff --git a/test/KEFCore.Test/Program.cs b/test/KEFCore.Test/Program.cs index ba9fc215..8a783931 100644 --- a/test/KEFCore.Test/Program.cs +++ b/test/KEFCore.Test/Program.cs @@ -25,47 +25,76 @@ using MASES.EntityFrameworkCore.KNet.Infrastructure; using MASES.KNet.Streams; using Microsoft.EntityFrameworkCore; +using System; using System.Collections.Generic; using System.Diagnostics; using System.Linq; namespace MASES.EntityFrameworkCore.KNet.Test { - class Program + partial class Program { const string theServer = "localhost:9092"; static string serverToUse = theServer; + static string databaseName = "TestDB"; + static string databaseNameWithModel = "TestDBWithModel"; + static string applicationId = "TestApplication"; static void Main(string[] args) { - KEFCore.CreateGlobalInstance(); - var appArgs = KEFCore.FilteredArgs; - - if (appArgs.Length != 0) + if (!UseInMemoryProvider) { - serverToUse = args[0]; + KEFCore.CreateGlobalInstance(); + var appArgs = KEFCore.FilteredArgs; + + if (appArgs.Length > 0) + { + serverToUse = args[0]; + } + + if (appArgs.Length > 1) + { + deleteApplication = args[1].ToLowerInvariant() == "true"; + } + + if (appArgs.Length > 2) + { + applicationId = args[2]; + } } - var streamConfig = StreamsConfigBuilder.Create(); - streamConfig = streamConfig.WithAcceptableRecoveryLag(100); + DatabaseName = UseModelBuilder ? databaseNameWithModel : databaseName; - using (var context = new BloggingContext() + var globalWatcher = Stopwatch.StartNew(); + StreamsConfigBuilder streamConfig = null; + if (!UseInMemoryProvider) + { + streamConfig = StreamsConfigBuilder.Create(); + streamConfig = streamConfig.WithAcceptableRecoveryLag(100); + } + + var context = new BloggingContext() { BootstrapServers = serverToUse, - ApplicationId = "TestApplication", - DbName = "TestDB", + ApplicationId = applicationId, + DbName = DatabaseName, StreamsConfigBuilder = streamConfig, - }) + }; + + if (deleteApplication) { context.Database.EnsureDeleted(); context.Database.EnsureCreated(); + } - for (int i = 0; i < 1000; i++) + var testWatcher = Stopwatch.StartNew(); + Stopwatch watch = Stopwatch.StartNew(); + for (int i = 0; i < 1000; i++) + { + context.Add(new Blog { - context.Add(new Blog - { - Url = "http://blogs.msdn.com/adonet" + i.ToString(), - Posts = new List() + Url = "http://blogs.msdn.com/adonet" + i.ToString(), + Posts = new List() { new Post() { @@ -73,68 +102,127 @@ static void Main(string[] args) Content = i.ToString() } }, - Rating = i, - }); - } - context.SaveChanges(); + Rating = i, + }); } + watch.Stop(); + Trace.WriteLine($"Elapsed data load {watch.ElapsedMilliseconds} ms"); - using (var context = new BloggingContext() - { - BootstrapServers = serverToUse, - ApplicationId = "TestApplication", - DbName = "TestDB", - StreamsConfigBuilder = streamConfig, - }) - { - - //var pageObject = (from op in context.Blogs - // join pg in context.Posts on op.BlogId equals pg.BlogId - // where pg.BlogId == op.BlogId - // select new { pg, op }).SingleOrDefault(); - - Stopwatch watch = Stopwatch.StartNew(); - var post = context.Posts.Single(b => b.BlogId == 2); - watch.Stop(); - Trace.WriteLine($"Elapsed {watch.ElapsedMilliseconds} ms"); + watch.Restart(); + context.SaveChanges(); + watch.Stop(); + Trace.WriteLine($"Elapsed SaveChanges {watch.ElapsedMilliseconds} ms"); + if (UseModelBuilder) + { watch.Restart(); - post = context.Posts.Single(b => b.BlogId == 1); + var pageObject = (from op in context.Blogs + join pg in context.Posts on op.BlogId equals pg.BlogId + where pg.BlogId == op.BlogId + select new { pg, op }).SingleOrDefault(); watch.Stop(); - Trace.WriteLine($"Elapsed {watch.ElapsedMilliseconds} ms"); + Trace.WriteLine($"Elapsed UseModelBuilder {watch.ElapsedMilliseconds} ms"); + } - watch.Restart(); - var all = context.Posts.All((o) => true); - watch.Stop(); - Trace.WriteLine($"Elapsed {watch.ElapsedMilliseconds} ms"); + watch.Restart(); + var post = context.Posts.Single(b => b.BlogId == 2); + watch.Stop(); + Trace.WriteLine($"Elapsed context.Posts.Single(b => b.BlogId == 2) {watch.ElapsedMilliseconds} ms. Result is {post}"); + + watch.Restart(); + post = context.Posts.Single(b => b.BlogId == 1); + watch.Stop(); + Trace.WriteLine($"Elapsed context.Posts.Single(b => b.BlogId == 1) {watch.ElapsedMilliseconds} ms. Result is {post}"); + + watch.Restart(); + var all = context.Posts.All((o) => true); + watch.Stop(); + Trace.WriteLine($"Elapsed context.Posts.All((o) => true) {watch.ElapsedMilliseconds} ms. Result is {all}"); + + watch.Restart(); + var blog = context.Blogs!.Single(b => b.BlogId == 1); + watch.Stop(); + Trace.WriteLine($"Elapsed context.Blogs!.Single(b => b.BlogId == 1) {watch.ElapsedMilliseconds} ms. Result is {blog}"); + + watch.Restart(); + context.Remove(post); + context.Remove(blog); + watch.Stop(); + Trace.WriteLine($"Elapsed data remove {watch.ElapsedMilliseconds} ms"); + + watch.Restart(); + context.SaveChanges(); + watch.Stop(); + Trace.WriteLine($"Elapsed SaveChanges {watch.ElapsedMilliseconds} ms"); + + watch.Restart(); + for (int i = 1000; i < 1100; i++) + { + context.Add(new Blog + { + Url = "http://blogs.msdn.com/adonet" + i.ToString(), + Posts = new List() + { + new Post() + { + Title = "title", + Content = i.ToString() + } + }, + Rating = i, + }); + } + watch.Stop(); + Trace.WriteLine($"Elapsed data load {watch.ElapsedMilliseconds} ms"); - watch.Restart(); - var blog = context.Blogs!.Single(b => b.BlogId == 1); - watch.Stop(); - Trace.WriteLine($"Elapsed {watch.ElapsedMilliseconds} ms"); + watch.Restart(); + context.SaveChanges(); + watch.Stop(); + Trace.WriteLine($"Elapsed SaveChanges {watch.ElapsedMilliseconds} ms"); - var value = context.Blogs.AsQueryable().ToQueryString(); - } + watch.Restart(); + post = context.Posts.Single(b => b.BlogId == 1009); + watch.Stop(); + Trace.WriteLine($"Elapsed context.Posts.Single(b => b.BlogId == 1009) {watch.ElapsedMilliseconds} ms. Result is {post}"); + + var value = context.Blogs.AsQueryable().ToQueryString(); + + context?.Dispose(); + testWatcher.Stop(); + globalWatcher.Stop(); + Console.WriteLine($"Full test completed in {globalWatcher.Elapsed}, only tests completed in {testWatcher.Elapsed}"); } } public class BloggingContext : KafkaDbContext { + public override bool UseCompactedReplicator { get; set; } = Program.UseCompactedReplicator; + public DbSet Blogs { get; set; } public DbSet Posts { get; set; } - //protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder) - //{ - // optionsBuilder.UseKafkaDatabase(ApplicationId, DbName, BootstrapServers, (o) => - // { - // o.StreamsConfig(o.EmptyStreamsConfigBuilder.WithAcceptableRecoveryLag(100)).WithDefaultNumPartitions(10); - // }); - //} - - //protected override void OnModelCreating(ModelBuilder modelBuilder) - //{ - // modelBuilder.Entity().HasKey(c => new { c.BlogId, c.Rating }); - //} + protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder) + { + if (Program.UseInMemoryProvider) + { + optionsBuilder.UseInMemoryDatabase(Program.DatabaseName); + } + else + { + base.OnConfiguring(optionsBuilder); + } + //optionsBuilder.UseKafkaDatabase(ApplicationId, DbName, BootstrapServers, (o) => + //{ + // o.StreamsConfig(o.EmptyStreamsConfigBuilder.WithAcceptableRecoveryLag(100)).WithDefaultNumPartitions(10); + //}); + } + + protected override void OnModelCreating(ModelBuilder modelBuilder) + { + if (!Program.UseModelBuilder) return; + + modelBuilder.Entity().HasKey(c => new { c.BlogId, c.Rating }); + } } public class Blog @@ -143,6 +231,11 @@ public class Blog public string Url { get; set; } public long Rating { get; set; } public List Posts { get; set; } + + public override string ToString() + { + return $"BlogId: {BlogId} Url: {Url} Rating: {Rating}"; + } } public class Post @@ -153,5 +246,10 @@ public class Post public int BlogId { get; set; } public Blog Blog { get; set; } + + public override string ToString() + { + return $"PostId: {PostId} Title: {Title} Content: {Content} BlogId: {BlogId}"; + } } } diff --git a/test/KEFCore.Test/ProgramConfig.cs b/test/KEFCore.Test/ProgramConfig.cs new file mode 100644 index 00000000..f17acf99 --- /dev/null +++ b/test/KEFCore.Test/ProgramConfig.cs @@ -0,0 +1,35 @@ +/* + * MIT License + * + * Copyright (c) 2022 MASES s.r.l. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +namespace MASES.EntityFrameworkCore.KNet.Test +{ + partial class Program + { + public static bool UseInMemoryProvider = false; + public static bool UseModelBuilder = false; + public static bool UseCompactedReplicator = false; + public static string DatabaseName = null; + static bool deleteApplication = false; + } +}