diff --git a/README.md b/README.md index b93d2697..bbe0ed0d 100644 --- a/README.md +++ b/README.md @@ -1,19 +1,42 @@ -# KEFCore: the EntityFrameworkCore provider for Apache Kafka +# KEFCore: Entity Framework Core provider for Apache Kafka -[![CI_BUILD](https://github.com/masesgroup/KEFCore/actions/workflows/build.yaml/badge.svg)](https://github.com/masesgroup/KEFCore/actions/workflows/build.yaml) [![CI_RELEASE](https://github.com/masesgroup/KEFCore/actions/workflows/release.yaml/badge.svg)](https://github.com/masesgroup/KEFCore/actions/workflows/release.yaml) +KEFCore is the Entity Framework Core provider for Apache Kafka. +Based on [KNet](https://github.com/masesgroup/KNet) it allows to use Apache Kafka as a distributed database. + +### Libraries and Tools [![latest version](https://img.shields.io/nuget/v/MASES.EntityFrameworkCore.KNet)](https://www.nuget.org/packages/MASES.EntityFrameworkCore.KNet) [![downloads](https://img.shields.io/nuget/dt/MASES.EntityFrameworkCore.KNet)](https://www.nuget.org/packages/MASES.EntityFrameworkCore.KNet) -KEFCore is the EntityFrameworkCore provider for Apache Kafka. -Based on [KNet](https://github.com/masesgroup/KNet) it allows to use Apache Kafka as a distributed database. +### Pipelines -This project adheres to the Contributor [Covenant code of conduct](CODE_OF_CONDUCT.md). By participating, you are expected to uphold this code. Please report unacceptable behavior to coc_reporting@masesgroup.com. +[![CI_BUILD](https://github.com/masesgroup/KEFCore/actions/workflows/build.yaml/badge.svg)](https://github.com/masesgroup/KEFCore/actions/workflows/build.yaml) +[![CI_RELEASE](https://github.com/masesgroup/KEFCore/actions/workflows/release.yaml/badge.svg)](https://github.com/masesgroup/KEFCore/actions/workflows/release.yaml) + +--- ## Scope of the project This project aims to create a provider to access the information stored within an Apache Kafka cluster using the paradigm behind Entity Framework. The project is based on available information within the official [EntityFrameworkCore repository](https://github.com/dotnet/efcore), many classes was copied from there as reported in the official documentation within the Microsoft website at https://docs.microsoft.com/en-us/ef/core/providers/writing-a-provider. +### Community and Contribution + +Do you like the project? +- Request your free [community subscription](https://www.jcobridge.com/pricing-25/). + +Do you want to help us? +- put a :star: on this project +- open [issues](https://github.com/masesgroup/KEFCore/issues) to request features or report bugs :bug: +- improves the project with Pull Requests + +This project adheres to the Contributor [Covenant code of conduct](CODE_OF_CONDUCT.md). By participating, you are expected to uphold this code. Please report unacceptable behavior to coc_reporting@masesgroup.com. + +## Summary + +* [Roadmap](src/documentation/articles/roadmap.md) +* [Actual state](src/documentation/articles/actualstate.md) +* [KEFCore usage](src/documentation/articles/usage.md) + ## Runtime engine KEFCore uses [KNet](https://github.com/masesgroup/KNet), and indeed [JCOBridge](https://www.jcobridge.com) with its [features](https://www.jcobridge.com/features/), to obtain many benefits: @@ -36,13 +59,6 @@ Have a look at the following JCOBridge resources: - [Commercial Edition](https://www.jcobridge.com/pricing-25/) - Latest release: [![JCOBridge nuget](https://img.shields.io/nuget/v/MASES.JCOBridge)](https://www.nuget.org/packages/MASES.JCOBridge) ---- -## Summary - -* [Roadmap](src/documentation/articles/roadmap.md) -* [Actual state](src/documentation/articles/actualstate.md) -* [KEFCore usage](src/documentation/articles/usage.md) - --- KAFKA is a registered trademark of The Apache Software Foundation. KEFCore has no affiliation with and is not endorsed by The Apache Software Foundation. diff --git a/src/documentation/articles/intro.md b/src/documentation/articles/intro.md index 941f20f2..bcb98275 100644 --- a/src/documentation/articles/intro.md +++ b/src/documentation/articles/intro.md @@ -1,6 +1,6 @@ # Welcome to KEFCore -KEFCore is the EntityFrameworkCore provider for Apache Kafka. +KEFCore is the Entity Framework Core provider for Apache Kafka. Based on [KNet](https://github.com/masesgroup/KNet) it allows to use Apache Kafka as a distributed database. This project adheres to the Contributor [Covenant code of conduct](CODE_OF_CONDUCT.md). By participating, you are expected to uphold this code. Please report unacceptable behavior to coc_reporting@masesgroup.com. diff --git a/src/documentation/articles/roadmap.md b/src/documentation/articles/roadmap.md index 20028ec0..c67c227e 100644 --- a/src/documentation/articles/roadmap.md +++ b/src/documentation/articles/roadmap.md @@ -4,3 +4,4 @@ The roadmap can be synthetized in the following points: * Create a first working provider based on InMemory provider * Extends the first provider with new features able to create Apache Kafka Streams topology to retrieve information +* Use KNetCompactedReplicator beside Apache Kafka Streams diff --git a/src/documentation/articles/usage.md b/src/documentation/articles/usage.md index a352a11e..2009dfc5 100644 --- a/src/documentation/articles/usage.md +++ b/src/documentation/articles/usage.md @@ -4,7 +4,7 @@ ### Installation -EF Core for Apache Kafka is available on [NuGet](https://www.nuget.org/packages/MASES.EntityFrameworkCore.KNet): +Entity Framework Core provider for Apache Kafka is available on [NuGet](https://www.nuget.org/packages/MASES.EntityFrameworkCore.KNet): ```sh dotnet add package MASES.EntityFrameworkCore.KNet @@ -12,7 +12,7 @@ dotnet add package MASES.EntityFrameworkCore.KNet ### Basic usage -The following code demonstrates basic usage of EF Core for Apache Kafka. +The following code demonstrates basic usage of Entity Framework Core provider for Apache Kafka. For a full tutorial configuring the `KafkaDbContext`, defining the model, and creating the database, see [KafkaDbContext](kafkadbcontext.md) in the docs. ```cs diff --git a/src/documentation/index.md b/src/documentation/index.md index c2de054f..08394643 100644 --- a/src/documentation/index.md +++ b/src/documentation/index.md @@ -1,8 +1,8 @@ -# KEFCore: the EntityFrameworkCore provider for Apache Kafka +# KEFCore: Entity Framework Core provider for Apache Kafka [![CI_BUILD](https://github.com/masesgroup/KEFCore/actions/workflows/build.yaml/badge.svg)](https://github.com/masesgroup/KEFCore/actions/workflows/build.yaml) [![CI_RELEASE](https://github.com/masesgroup/KEFCore/actions/workflows/release.yaml/badge.svg)](https://github.com/masesgroup/KEFCore/actions/workflows/release.yaml) -KEFCore is the EntityFrameworkCore provider for Apache Kafka. +KEFCore is the Entity Framework Core provider for Apache Kafka. Based on [KNet](https://github.com/masesgroup/KNet) it allows to use Apache Kafka as a distributed database. This project adheres to the Contributor [Covenant code of conduct](CODE_OF_CONDUCT.md). By participating, you are expected to uphold this code. Please report unacceptable behavior to coc_reporting@masesgroup.com. diff --git a/src/net/Common/Common.props b/src/net/Common/Common.props index 927d080a..f061bd4a 100644 --- a/src/net/Common/Common.props +++ b/src/net/Common/Common.props @@ -4,7 +4,7 @@ MASES s.r.l. MASES s.r.l. MASES s.r.l. - 0.7.2.0 + 0.8.0.0 net6.0 latest true diff --git a/src/net/KEFCore/Infrastructure/Internal/IKafkaSingletonOptions.cs b/src/net/KEFCore/Infrastructure/Internal/IKafkaSingletonOptions.cs index 33b56157..d10ac5b8 100644 --- a/src/net/KEFCore/Infrastructure/Internal/IKafkaSingletonOptions.cs +++ b/src/net/KEFCore/Infrastructure/Internal/IKafkaSingletonOptions.cs @@ -34,7 +34,7 @@ public interface IKafkaSingletonOptions : ISingletonOptions string? BootstrapServers { get; } - bool ProducerByEntity { get; } + //bool ProducerByEntity { get; } bool UseCompactedReplicator { get; } diff --git a/src/net/KEFCore/Infrastructure/Internal/KafkaOptionsExtension.cs b/src/net/KEFCore/Infrastructure/Internal/KafkaOptionsExtension.cs index ca3ece5a..5d0d9e31 100644 --- a/src/net/KEFCore/Infrastructure/Internal/KafkaOptionsExtension.cs +++ b/src/net/KEFCore/Infrastructure/Internal/KafkaOptionsExtension.cs @@ -36,7 +36,7 @@ public class KafkaOptionsExtension : IDbContextOptionsExtension private string? _databaseName; private string? _applicationId; private string? _bootstrapServers; - private bool _producerByEntity = false; + //private bool _producerByEntity = false; private bool _useCompactedReplicator = false; private bool _usePersistentStorage = false; private int _defaultNumPartitions = 1; @@ -60,7 +60,7 @@ protected KafkaOptionsExtension(KafkaOptionsExtension copyFrom) _databaseName = copyFrom._databaseName; _applicationId = copyFrom._applicationId; _bootstrapServers = copyFrom._bootstrapServers; - _producerByEntity = copyFrom._producerByEntity; + //_producerByEntity = copyFrom._producerByEntity; _useCompactedReplicator = copyFrom._useCompactedReplicator; _usePersistentStorage = copyFrom._usePersistentStorage; _defaultNumPartitions = copyFrom._defaultNumPartitions; @@ -85,7 +85,7 @@ protected KafkaOptionsExtension(KafkaOptionsExtension copyFrom) public virtual string BootstrapServers => _bootstrapServers!; - public virtual bool ProducerByEntity => _producerByEntity; + //public virtual bool ProducerByEntity => _producerByEntity; public virtual bool UseCompactedReplicator => _useCompactedReplicator; @@ -139,14 +139,14 @@ public virtual KafkaOptionsExtension WithBootstrapServers(string bootstrapServer return clone; } - public virtual KafkaOptionsExtension WithProducerByEntity(bool producerByEntity = false) - { - var clone = Clone(); + //public virtual KafkaOptionsExtension WithProducerByEntity(bool producerByEntity = false) + //{ + // var clone = Clone(); - clone._producerByEntity = producerByEntity; + // clone._producerByEntity = producerByEntity; - return clone; - } + // return clone; + //} public virtual KafkaOptionsExtension WithCompactedReplicator(bool useCompactedReplicator = false) { @@ -242,7 +242,7 @@ public virtual Properties StreamsOptions(string applicationId) { props.Remove(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG); } - props.Put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Class.ForName("org.apache.kafka.common.serialization.Serdes$StringSerde", true, SystemClassLoader)); + props.Put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Class.ForName("org.apache.kafka.common.serialization.Serdes$ByteArraySerde", true, SystemClassLoader)); if (props.ContainsKey(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG)) { props.Remove(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG); @@ -277,16 +277,16 @@ public virtual Properties ProducerOptions() { props.Put(ProducerConfig.LINGER_MS_CONFIG, 1); } - if (props.ContainsKey(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)) - { - props.Remove(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG); - } - props.Put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, Class.ForName("org.apache.kafka.common.serialization.StringSerializer", true, SystemClassLoader)); - if (props.ContainsKey(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)) - { - props.Remove(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG); - } - props.Put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, Class.ForName("org.apache.kafka.common.serialization.StringSerializer", true, SystemClassLoader)); + //if (props.ContainsKey(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)) + //{ + // props.Remove(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG); + //} + //props.Put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, Class.ForName("org.apache.kafka.common.serialization.StringSerializer", true, SystemClassLoader)); + //if (props.ContainsKey(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)) + //{ + // props.Remove(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG); + //} + //props.Put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, Class.ForName("org.apache.kafka.common.serialization.StringSerializer", true, SystemClassLoader)); return props; } diff --git a/src/net/KEFCore/Infrastructure/Internal/KafkaSingletonOptions.cs b/src/net/KEFCore/Infrastructure/Internal/KafkaSingletonOptions.cs index ced2c8b6..ffae2fd6 100644 --- a/src/net/KEFCore/Infrastructure/Internal/KafkaSingletonOptions.cs +++ b/src/net/KEFCore/Infrastructure/Internal/KafkaSingletonOptions.cs @@ -34,7 +34,7 @@ public virtual void Initialize(IDbContextOptions options) DatabaseName = kafkaOptions.DatabaseName; ApplicationId = kafkaOptions.ApplicationId; BootstrapServers = kafkaOptions.BootstrapServers; - ProducerByEntity = kafkaOptions.ProducerByEntity; + //ProducerByEntity = kafkaOptions.ProducerByEntity; UseCompactedReplicator = kafkaOptions.UseCompactedReplicator; UsePersistentStorage = kafkaOptions.UsePersistentStorage; DefaultNumPartitions = kafkaOptions.DefaultNumPartitions; diff --git a/src/net/KEFCore/Infrastructure/KafkaDbContext.cs b/src/net/KEFCore/Infrastructure/KafkaDbContext.cs index 7972863d..cdfd20d9 100644 --- a/src/net/KEFCore/Infrastructure/KafkaDbContext.cs +++ b/src/net/KEFCore/Infrastructure/KafkaDbContext.cs @@ -70,10 +70,10 @@ public KafkaDbContext(DbContextOptions options) : base(options) /// Use persistent storage /// public virtual bool UsePersistentStorage { get; set; } = false; - /// - /// Use a producer for each Entity - /// - public bool UseProducerByEntity { get; set; } = false; + ///// + ///// Use a producer for each Entity + ///// + //public bool UseProducerByEntity { get; set; } = false; /// /// Use instead of Apache Kafka Streams /// @@ -104,7 +104,7 @@ protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder) { o.StreamsConfig(StreamsConfigBuilder ?? o.EmptyStreamsConfigBuilder).WithDefaultNumPartitions(DefaultNumPartitions); o.WithUsePersistentStorage(UsePersistentStorage); - o.WithProducerByEntity(UseProducerByEntity); + //o.WithProducerByEntity(UseProducerByEntity); o.WithCompactedReplicator(UseCompactedReplicator); o.WithDefaultReplicationFactor(DefaultReplicationFactor); }); diff --git a/src/net/KEFCore/Infrastructure/KafkaDbContextOptionsBuilder.cs b/src/net/KEFCore/Infrastructure/KafkaDbContextOptionsBuilder.cs index 7271b5a3..0b43f008 100644 --- a/src/net/KEFCore/Infrastructure/KafkaDbContextOptionsBuilder.cs +++ b/src/net/KEFCore/Infrastructure/KafkaDbContextOptionsBuilder.cs @@ -93,26 +93,26 @@ public virtual KafkaDbContextOptionsBuilder WithUseNameMatching(bool useNameMatc return this; } - /// - /// Enables creation of producer for each - /// - /// - /// See Using DbContextOptions, and - /// The EF Core Kafka database provider for more information and examples. - /// - /// If , then each entity will have its own . - /// The same builder instance so that multiple calls can be chained. - public virtual KafkaDbContextOptionsBuilder WithProducerByEntity(bool producerByEntity = false) - { - var extension = OptionsBuilder.Options.FindExtension() - ?? new KafkaOptionsExtension(); - - extension = extension.WithProducerByEntity(producerByEntity); - - ((IDbContextOptionsBuilderInfrastructure)OptionsBuilder).AddOrUpdateExtension(extension); - - return this; - } + ///// + ///// Enables creation of producer for each + ///// + ///// + ///// See Using DbContextOptions, and + ///// The EF Core Kafka database provider for more information and examples. + ///// + ///// If , then each entity will have its own . + ///// The same builder instance so that multiple calls can be chained. + //public virtual KafkaDbContextOptionsBuilder WithProducerByEntity(bool producerByEntity = false) + //{ + // var extension = OptionsBuilder.Options.FindExtension() + // ?? new KafkaOptionsExtension(); + + // extension = extension.WithProducerByEntity(producerByEntity); + + // ((IDbContextOptionsBuilderInfrastructure)OptionsBuilder).AddOrUpdateExtension(extension); + + // return this; + //} /// /// Enables use of diff --git a/src/net/KEFCore/KEFCore.csproj b/src/net/KEFCore/KEFCore.csproj index ce0f6f01..ff1d0029 100644 --- a/src/net/KEFCore/KEFCore.csproj +++ b/src/net/KEFCore/KEFCore.csproj @@ -70,6 +70,7 @@ All None + diff --git a/src/net/KEFCore/Storage/Internal/EntityTypeProducer.cs b/src/net/KEFCore/Storage/Internal/EntityTypeProducer.cs new file mode 100644 index 00000000..2b8a84bc --- /dev/null +++ b/src/net/KEFCore/Storage/Internal/EntityTypeProducer.cs @@ -0,0 +1,284 @@ +/* +* Copyright 2022 MASES s.r.l. +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +* +* Refer to LICENSE for more information. +*/ + +#nullable enable + +using Java.Util.Concurrent; +using MASES.EntityFrameworkCore.KNet.Serdes.Internal; +using MASES.KNet.Producer; +using MASES.KNet.Replicator; +using MASES.KNet.Serialization; +using System.Collections.Concurrent; +using System.Text.Json.Serialization; +using MASES.KNet.Serialization.Json; +using Org.Apache.Kafka.Clients.Producer; +using System.Text.Json; +using Javax.Xml.Crypto; +using Microsoft.EntityFrameworkCore.Storage.ValueConversion; + +namespace MASES.EntityFrameworkCore.KNet.Storage.Internal; + +public class EntityTypeProducers +{ + static IEntityTypeProducer? _globalProducer = null; + static readonly ConcurrentDictionary _producers = new ConcurrentDictionary(); + + public static IEntityTypeProducer Create(IEntityType entityType, IKafkaCluster cluster) where TKey : notnull + { + //if (!cluster.Options.ProducerByEntity) + //{ + // lock (_producers) + // { + // if (_globalProducer == null) _globalProducer = CreateProducerLocal(entityType, cluster); + // return _globalProducer; + // } + //} + //else + //{ + return _producers.GetOrAdd(entityType, _ => CreateProducerLocal(entityType, cluster)); + //} + } + + static IEntityTypeProducer CreateProducerLocal(IEntityType entityType, IKafkaCluster cluster) where TKey : notnull => new EntityTypeProducer(entityType, cluster); +} + +[JsonSerializable(typeof(ObjectType))] +public class ObjectType : IJsonOnDeserialized +{ + public ObjectType() + { + + } + + public ObjectType(IProperty typeName, object value) + { + TypeName = typeName.ClrType?.FullName; + Value = value; + } + + public void OnDeserialized() + { + if (Value is JsonElement elem) + { + switch (elem.ValueKind) + { + case JsonValueKind.Undefined: + break; + case JsonValueKind.Object: + break; + case JsonValueKind.Array: + break; + case JsonValueKind.String: + Value = elem.GetString()!; + break; + case JsonValueKind.Number: + var tmp = elem.GetInt64(); + Value = Convert.ChangeType(tmp, Type.GetType(TypeName!)!); + break; + case JsonValueKind.True: + Value = true; + break; + case JsonValueKind.False: + Value = false; + break; + case JsonValueKind.Null: + Value = null; + break; + default: + break; + } + } + else + { + Value = Convert.ChangeType(Value, Type.GetType(TypeName!)!); + } + } + + public string? TypeName { get; set; } + + public object Value { get; set; } +} + +[JsonSerializable(typeof(KNetEntityTypeData<>))] +public class KNetEntityTypeData +{ + public KNetEntityTypeData() { } + + public KNetEntityTypeData(IEntityType tName, IProperty[] properties, object[] rData) + { + TypeName = tName.Name; + Data = new Dictionary(); + for (int i = 0; i < properties.Length; i++) + { + Data.Add(properties[i].GetIndex(), new ObjectType(properties[i], rData[i])); + } + } + + public string TypeName { get; set; } + // [JsonConverter(typeof(ListStringObjectTupleConverter))] + public Dictionary Data { get; set; } + + public object[] GetData(IEntityType tName) + { + if (Data == null) return null; + + var array = Data.Select((o) => o.Value.Value).ToArray(); + + return array; + + var _properties = tName.GetProperties().ToArray(); + List data = new List(); + + for (int i = 0; i < Data!.Count; i++) + { + if (Data[i].Value is JsonElement elem) + { + switch (elem.ValueKind) + { + case JsonValueKind.Undefined: + break; + case JsonValueKind.Object: + break; + case JsonValueKind.Array: + break; + case JsonValueKind.String: + data.Add(elem.GetString()); + break; + case JsonValueKind.Number: + var tmp = elem.GetInt64(); + data.Add(Convert.ChangeType(tmp, _properties[i].ClrType)); + break; + case JsonValueKind.True: + data.Add(true); + break; + case JsonValueKind.False: + data.Add(false); + break; + case JsonValueKind.Null: + data.Add(null); + break; + default: + break; + } + + } + else + { + data.Add(Convert.ChangeType(Data[i], _properties[i].ClrType)); + } + } + return data.ToArray(); + } +} + +public class EntityTypeProducer : IEntityTypeProducer where TKey : notnull +{ + private readonly bool _useCompactedReplicator; + private readonly IKafkaCluster _cluster; + private readonly IEntityType _entityType; + private readonly IKNetCompactedReplicator>? _kafkaCompactedReplicator; + private readonly IKNetProducer>? _kafkaProducer; + private readonly IKafkaStreamsBaseRetriever _streamData; + private readonly KNetSerDes _keySerdes; + private readonly KNetSerDes> _valueSerdes; + + public EntityTypeProducer(IEntityType entityType, IKafkaCluster cluster) + { + _entityType = entityType; + _cluster = cluster; + _useCompactedReplicator = _cluster.Options.UseCompactedReplicator; + + if (KNetSerialization.IsInternalManaged()) + { + _keySerdes = new KNetSerDes(); + } + else _keySerdes = new JsonSerDes(); + + _valueSerdes = new JsonSerDes>(); + + if (_useCompactedReplicator) + { + _kafkaCompactedReplicator = new KNetCompactedReplicator>() + { + UpdateMode = UpdateModeTypes.OnConsume, + BootstrapServers = _cluster.Options.BootstrapServers, + StateName = _entityType.TopicName(_cluster.Options), + Partitions = _entityType.NumPartitions(_cluster.Options), + ConsumerInstances = _entityType.ConsumerInstances(_cluster.Options), + ReplicationFactor = _entityType.ReplicationFactor(_cluster.Options), + TopicConfig = _cluster.Options.TopicConfigBuilder, + ProducerConfig = _cluster.Options.ProducerConfigBuilder, + KeySerDes = _keySerdes, + ValueSerDes = _valueSerdes, + }; + } + else + { + _kafkaProducer = new KNetProducer>(_cluster.Options.ProducerOptions(), _keySerdes, _valueSerdes); + _streamData = new KafkaStreamsTableRetriever(cluster, entityType, _keySerdes, _valueSerdes); + } + } + + public IEnumerable> Commit(IEnumerable records) + { + if (_useCompactedReplicator) + { + foreach (KafkaRowBag record in records) + { + var value = record.Value; + if (_kafkaCompactedReplicator != null) _kafkaCompactedReplicator[record.Key] = value!; + } + + return null; + } + else + { + List> futures = new(); + foreach (KafkaRowBag record in records) + { + var future = _kafkaProducer?.Send(new KNetProducerRecord>(record.AssociatedTopicName, 0, record.Key, record.Value!)); + futures.Add(future); + } + + _kafkaProducer?.Flush(); + + return futures; + } + } + + public void Dispose() + { + if (_useCompactedReplicator) + { + _kafkaCompactedReplicator?.Dispose(); + } + else + { + _kafkaProducer?.Dispose(); + _streamData?.Dispose(); + } + } + + public IEnumerable GetValueBuffer() + { + if (_streamData != null) return _streamData; + _kafkaCompactedReplicator?.SyncWait(); + if (_kafkaCompactedReplicator == null) throw new InvalidOperationException("Missing _kafkaCompactedReplicator"); + return _kafkaCompactedReplicator.Values.Select((item) => new ValueBuffer(item.GetData(_entityType))); + } +} diff --git a/src/net/KEFCore/Storage/Internal/IEntityTypeProducer.cs b/src/net/KEFCore/Storage/Internal/IEntityTypeProducer.cs new file mode 100644 index 00000000..1a8eb72c --- /dev/null +++ b/src/net/KEFCore/Storage/Internal/IEntityTypeProducer.cs @@ -0,0 +1,31 @@ +/* +* Copyright 2022 MASES s.r.l. +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +* +* Refer to LICENSE for more information. +*/ + +#nullable enable + +using Java.Util.Concurrent; +using Org.Apache.Kafka.Clients.Producer; + +namespace MASES.EntityFrameworkCore.KNet.Storage.Internal; + +public interface IEntityTypeProducer : IDisposable +{ + IEnumerable> Commit(IEnumerable records); + + IEnumerable GetValueBuffer(); +} diff --git a/src/net/KEFCore/Storage/Internal/IKafkaCluster.cs b/src/net/KEFCore/Storage/Internal/IKafkaCluster.cs index fe833f8c..985695e8 100644 --- a/src/net/KEFCore/Storage/Internal/IKafkaCluster.cs +++ b/src/net/KEFCore/Storage/Internal/IKafkaCluster.cs @@ -19,10 +19,6 @@ using MASES.EntityFrameworkCore.KNet.Infrastructure.Internal; using MASES.EntityFrameworkCore.KNet.Serdes.Internal; using MASES.EntityFrameworkCore.KNet.ValueGeneration.Internal; -using MASES.KNet; -using MASES.KNet.Producer; -using MASES.KNet.Replicator; -using Org.Apache.Kafka.Clients.Producer; namespace MASES.EntityFrameworkCore.KNet.Storage.Internal; @@ -42,16 +38,10 @@ bool EnsureConnected( IModel designModel, IDiagnosticsLogger updateLogger); - bool CreateTable(IEntityType entityType); + string CreateTable(IEntityType entityType); IKafkaSerdesFactory SerdesFactory { get; } - IKafkaSerdesEntityType CreateSerdes(IEntityType entityType); - - IKNetCompactedReplicator CreateCompactedReplicator(IEntityType entityType); - - IProducer CreateProducer(IEntityType entityType); - IEnumerable GetData(IEntityType entityType); KafkaIntegerValueGenerator GetIntegerValueGenerator(IProperty property); diff --git a/src/net/KEFCore/Storage/Internal/IKafkaRowBag.cs b/src/net/KEFCore/Storage/Internal/IKafkaRowBag.cs index 33e96f29..ab72f9a0 100644 --- a/src/net/KEFCore/Storage/Internal/IKafkaRowBag.cs +++ b/src/net/KEFCore/Storage/Internal/IKafkaRowBag.cs @@ -18,10 +18,6 @@ #nullable enable -using MASES.EntityFrameworkCore.KNet.ValueGeneration.Internal; -using Java.Util.Concurrent; -using Org.Apache.Kafka.Clients.Producer; -using MASES.KNet.Producer; using MASES.EntityFrameworkCore.KNet.Serdes.Internal; namespace MASES.EntityFrameworkCore.KNet.Storage.Internal; @@ -29,4 +25,6 @@ namespace MASES.EntityFrameworkCore.KNet.Storage.Internal; public interface IKafkaRowBag { IUpdateEntry UpdateEntry { get; } + + string AssociatedTopicName { get; } } diff --git a/src/net/KEFCore/Storage/Internal/IKafkaTable.cs b/src/net/KEFCore/Storage/Internal/IKafkaTable.cs index e584c7bc..4242794b 100644 --- a/src/net/KEFCore/Storage/Internal/IKafkaTable.cs +++ b/src/net/KEFCore/Storage/Internal/IKafkaTable.cs @@ -21,7 +21,6 @@ using MASES.EntityFrameworkCore.KNet.ValueGeneration.Internal; using Java.Util.Concurrent; using Org.Apache.Kafka.Clients.Producer; -using MASES.KNet.Producer; namespace MASES.EntityFrameworkCore.KNet.Storage.Internal; diff --git a/src/net/KEFCore/Storage/Internal/KafkaCluster.cs b/src/net/KEFCore/Storage/Internal/KafkaCluster.cs index 97486426..c6d6f97e 100644 --- a/src/net/KEFCore/Storage/Internal/KafkaCluster.cs +++ b/src/net/KEFCore/Storage/Internal/KafkaCluster.cs @@ -23,18 +23,9 @@ using MASES.EntityFrameworkCore.KNet.Infrastructure.Internal; using Java.Util; using MASES.EntityFrameworkCore.KNet.Serdes.Internal; -using System.Collections.Concurrent; using Java.Util.Concurrent; -using MASES.KNet.Producer; using Org.Apache.Kafka.Clients.Admin; -using Org.Apache.Kafka.Common.Config; -using Org.Apache.Kafka.Clients.Producer; using Org.Apache.Kafka.Common.Errors; -using MASES.KNet.Serialization; -using MASES.KNet.Extensions; -using MASES.KNet; -using Org.Apache.Kafka.Common; -using MASES.KNet.Replicator; using Org.Apache.Kafka.Tools; namespace MASES.EntityFrameworkCore.KNet.Storage.Internal; @@ -51,9 +42,6 @@ public class KafkaCluster : IKafkaCluster private System.Collections.Generic.Dictionary? _tables; - private IProducer? _globalProducer = null; - private readonly ConcurrentDictionary> _producers; - public KafkaCluster( KafkaOptionsExtension options, IKafkaTableFactory tableFactory, @@ -63,7 +51,6 @@ public KafkaCluster( _tableFactory = tableFactory; _serdesFactory = serdesFactory; _useNameMatching = options.UseNameMatching; - _producers = new(); Properties props = new(); props.Put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, _options.BootstrapServers); _kafkaAdminClient = KafkaAdminClient.Create(props); @@ -187,13 +174,14 @@ public virtual bool EnsureConnected( return true; } - public virtual bool CreateTable(IEntityType entityType) + public virtual string CreateTable(IEntityType entityType) { + var topicName = entityType.TopicName(Options); try { try { - var topic = new NewTopic(entityType.TopicName(Options), entityType.NumPartitions(Options), entityType.ReplicationFactor(Options)); + var topic = new NewTopic(topicName, entityType.NumPartitions(Options), entityType.ReplicationFactor(Options)); Options.TopicConfigBuilder.CleanupPolicy = MASES.KNet.Common.TopicConfigBuilder.CleanupPolicyTypes.Compact | MASES.KNet.Common.TopicConfigBuilder.CleanupPolicyTypes.Delete; Options.TopicConfigBuilder.RetentionBytes = 1024 * 1024 * 1024; var map = Options.TopicConfigBuilder.ToMap(); @@ -214,49 +202,10 @@ public virtual bool CreateTable(IEntityType entityType) Thread.Sleep(1000); // wait a while to complete topic deletion return CreateTable(entityType); } - return false; - } - return true; - } - - public virtual IKafkaSerdesEntityType CreateSerdes(IEntityType entityType) => _serdesFactory.GetOrCreate(entityType); - - public virtual IKNetCompactedReplicator CreateCompactedReplicator(IEntityType entityType) - { - lock (_lock) - { - return new KNetCompactedReplicator() - { - UpdateMode = UpdateModeTypes.OnConsume, - BootstrapServers = Options.BootstrapServers, - StateName = entityType.TopicName(Options), - Partitions = entityType.NumPartitions(Options), - ConsumerInstances = entityType.ConsumerInstances(Options), - ReplicationFactor = entityType.ReplicationFactor(Options), - TopicConfig = Options.TopicConfigBuilder, - ProducerConfig = Options.ProducerConfigBuilder, - }; } + return topicName; } - public virtual IProducer CreateProducer(IEntityType entityType) - { - if (!Options.ProducerByEntity) - { - lock (_lock) - { - if (_globalProducer == null) _globalProducer = CreateProducer(); - return _globalProducer; - } - } - else - { - return _producers.GetOrAdd(entityType, _ => CreateProducer()); - } - } - - private IProducer CreateProducer() => new KafkaProducer(Options.ProducerOptions()); - private static System.Collections.Generic.Dictionary CreateTables() => new(); public virtual IEnumerable GetData(IEntityType entityType) diff --git a/src/net/KEFCore/Storage/Internal/KafkaRowBag.cs b/src/net/KEFCore/Storage/Internal/KafkaRowBag.cs index 94e7595c..3782790a 100644 --- a/src/net/KEFCore/Storage/Internal/KafkaRowBag.cs +++ b/src/net/KEFCore/Storage/Internal/KafkaRowBag.cs @@ -18,47 +18,28 @@ #nullable enable -using MASES.EntityFrameworkCore.KNet.ValueGeneration.Internal; -using Java.Util.Concurrent; -using Org.Apache.Kafka.Clients.Producer; -using MASES.KNet.Producer; -using MASES.EntityFrameworkCore.KNet.Serdes.Internal; -using Org.Apache.Kafka.Common.Header; - namespace MASES.EntityFrameworkCore.KNet.Storage.Internal; public class KafkaRowBag : IKafkaRowBag { - public KafkaRowBag(IUpdateEntry entry, TKey key, object?[]? row) + public KafkaRowBag(IUpdateEntry entry, string topicName, TKey key, IProperty[] properties, object?[]? row) { UpdateEntry = entry; + AssociatedTopicName = topicName; Key = key; + Properties = properties; ValueBuffer = row; } public IUpdateEntry UpdateEntry { get; private set; } - public TKey Key { get; private set; } - - public object?[]? ValueBuffer { get; private set; } + public string AssociatedTopicName { get; private set; } - public string GetKey(IKafkaSerdesEntityType _serdes) - { - return _serdes.Serialize(Key); - } + public TKey Key { get; private set; } - public string GetValue(IKafkaSerdesEntityType _serdes) - { - return _serdes.Serialize(ValueBuffer); - } + public IProperty[] Properties { get; private set; } - public ProducerRecord GetRecord(string topicName, IKafkaSerdesEntityType _serdes) - { - Headers headers = Headers.Create(); - string key = _serdes.Serialize(headers, Key); - string? value = UpdateEntry.EntityState == EntityState.Deleted ? null : _serdes.Serialize(headers, ValueBuffer); - var record = new ProducerRecord(topicName, 0, DateTime.Now, key, value!, headers); + public KNetEntityTypeData? Value => UpdateEntry.EntityState == EntityState.Deleted ? null : new KNetEntityTypeData(UpdateEntry.EntityType, Properties, ValueBuffer!); - return record; - } + public object?[]? ValueBuffer { get; private set; } } diff --git a/src/net/KEFCore/Storage/Internal/KafkaStreamsBaseRetriever.cs b/src/net/KEFCore/Storage/Internal/KafkaStreamsBaseRetriever.cs index 48fd3918..51504bdb 100644 --- a/src/net/KEFCore/Storage/Internal/KafkaStreamsBaseRetriever.cs +++ b/src/net/KEFCore/Storage/Internal/KafkaStreamsBaseRetriever.cs @@ -18,7 +18,7 @@ #nullable enable -using MASES.JCOBridge.C2JBridge; +using MASES.KNet.Serialization; using Org.Apache.Kafka.Common.Utils; using Org.Apache.Kafka.Streams; using Org.Apache.Kafka.Streams.Errors; @@ -29,10 +29,16 @@ namespace MASES.EntityFrameworkCore.KNet.Storage.Internal { - public class KafkaStreamsBaseRetriever : IEnumerable, IDisposable + public interface IKafkaStreamsBaseRetriever : IEnumerable, IDisposable + { + } + + public class KafkaStreamsBaseRetriever : IKafkaStreamsBaseRetriever { private readonly IKafkaCluster _kafkaCluster; private readonly IEntityType _entityType; + private readonly IKNetSerDes _keySerdes; + private readonly IKNetSerDes> _valueSerdes; private readonly StreamsBuilder _builder; private readonly KStream _root; @@ -50,10 +56,12 @@ public class KafkaStreamsBaseRetriever : IEnumerable, IDispos private State actualState = State.NOT_RUNNING; private ReadOnlyKeyValueStore? keyValueStore; - public KafkaStreamsBaseRetriever(IKafkaCluster kafkaCluster, IEntityType entityType, string storageId, StreamsBuilder builder, KStream root) + public KafkaStreamsBaseRetriever(IKafkaCluster kafkaCluster, IEntityType entityType, IKNetSerDes keySerdes, IKNetSerDes> valueSerdes, string storageId, StreamsBuilder builder, KStream root) { _kafkaCluster = kafkaCluster; _entityType = entityType; + _keySerdes = keySerdes; + _valueSerdes = valueSerdes; _builder = builder; _root = root; _storageId = _kafkaCluster.Options.UsePersistentStorage ? storageId : Process.GetCurrentProcess().ProcessName + "-" + storageId; @@ -141,7 +149,7 @@ public IEnumerator GetEnumerator() { if (resultException != null) throw resultException; Trace.WriteLine("Requested KafkaEnumerator on " + DateTime.Now.ToString("HH:mm:ss.FFFFFFF")); - return new KafkaEnumerator(_kafkaCluster, keyValueStore); + return new KafkaEnumerator(_kafkaCluster, _entityType, _keySerdes, _valueSerdes, keyValueStore); } System.Collections.IEnumerator System.Collections.IEnumerable.GetEnumerator() @@ -167,17 +175,25 @@ public void Dispose() class KafkaEnumerator : IEnumerator { private readonly IKafkaCluster _kafkaCluster; + private readonly IEntityType _entityType; + private readonly IKNetSerDes _keySerdes; + private readonly IKNetSerDes> _valueSerdes; private readonly ReadOnlyKeyValueStore? _keyValueStore; private KeyValueIterator? keyValueIterator = null; private IEnumerator>? keyValueEnumerator = null; - public KafkaEnumerator(IKafkaCluster kafkaCluster, ReadOnlyKeyValueStore? keyValueStore) + public KafkaEnumerator(IKafkaCluster kafkaCluster, IEntityType entityType, IKNetSerDes keySerdes, IKNetSerDes> valueSerdes, ReadOnlyKeyValueStore? keyValueStore) { if (kafkaCluster == null) throw new ArgumentNullException(nameof(kafkaCluster)); + if (keySerdes == null) throw new ArgumentNullException(nameof(keySerdes)); + if (valueSerdes == null) throw new ArgumentNullException(nameof(valueSerdes)); if (keyValueStore == null) throw new ArgumentNullException(nameof(keyValueStore)); _kafkaCluster = kafkaCluster; + _entityType = entityType; + _keySerdes = keySerdes; + _valueSerdes = valueSerdes; _keyValueStore = keyValueStore; - Trace.WriteLine($"KafkaEnumerator - ApproximateNumEntries {_keyValueStore?.ApproximateNumEntries()}"); + Trace.WriteLine($"KafkaEnumerator for {_entityType.Name} - ApproximateNumEntries {_keyValueStore?.ApproximateNumEntries()}"); keyValueIterator = _keyValueStore?.All(); keyValueEnumerator = keyValueIterator?.ToIEnumerator(); } @@ -190,8 +206,13 @@ public ValueBuffer Current { var kv = keyValueEnumerator.Current; object? v = kv.value; - var data = _kafkaCluster.SerdesFactory.Deserialize(v as byte[]); - return new ValueBuffer(data); + KNetEntityTypeData entityTypeData = _valueSerdes.DeserializeWithHeaders(null, null, v as byte[]); + var data = new ValueBuffer(entityTypeData.GetData(_entityType)); + if (data.IsEmpty) + { + throw new InvalidOperationException("Data is Empty"); + } + return data; } throw new InvalidOperationException("InvalidEnumerator"); } diff --git a/src/net/KEFCore/Storage/Internal/KafkaStreamsTableRetriever.cs b/src/net/KEFCore/Storage/Internal/KafkaStreamsTableRetriever.cs index 3776a884..62d2ee3a 100644 --- a/src/net/KEFCore/Storage/Internal/KafkaStreamsTableRetriever.cs +++ b/src/net/KEFCore/Storage/Internal/KafkaStreamsTableRetriever.cs @@ -18,19 +18,20 @@ #nullable enable +using MASES.KNet.Serialization; using Org.Apache.Kafka.Streams; namespace MASES.EntityFrameworkCore.KNet.Storage.Internal { public sealed class KafkaStreamsTableRetriever : KafkaStreamsBaseRetriever { - public KafkaStreamsTableRetriever(IKafkaCluster kafkaCluster, IEntityType entityType) - : this(kafkaCluster, entityType, new StreamsBuilder()) + public KafkaStreamsTableRetriever(IKafkaCluster kafkaCluster, IEntityType entityType, IKNetSerDes keySerdes, IKNetSerDes> valueSerdes) + : this(kafkaCluster, entityType, keySerdes, valueSerdes, new StreamsBuilder()) { } - public KafkaStreamsTableRetriever(IKafkaCluster kafkaCluster, IEntityType entityType, StreamsBuilder builder) - : base(kafkaCluster, entityType, entityType.StorageIdForTable(kafkaCluster.Options), builder, builder.Stream(entityType.TopicName(kafkaCluster.Options))) + public KafkaStreamsTableRetriever(IKafkaCluster kafkaCluster, IEntityType entityType, IKNetSerDes keySerdes, IKNetSerDes> valueSerdes, StreamsBuilder builder) + : base(kafkaCluster, entityType, keySerdes, valueSerdes, entityType.StorageIdForTable(kafkaCluster.Options), builder, builder.Stream(entityType.TopicName(kafkaCluster.Options))) { } } diff --git a/src/net/KEFCore/Storage/Internal/KafkaTable.cs b/src/net/KEFCore/Storage/Internal/KafkaTable.cs index 5b8b5a12..993da531 100644 --- a/src/net/KEFCore/Storage/Internal/KafkaTable.cs +++ b/src/net/KEFCore/Storage/Internal/KafkaTable.cs @@ -24,14 +24,7 @@ using MASES.EntityFrameworkCore.KNet.ValueGeneration.Internal; using Java.Util.Concurrent; using MASES.EntityFrameworkCore.KNet.Serdes.Internal; -using MASES.KNet.Producer; using Org.Apache.Kafka.Clients.Producer; -using Org.Apache.Kafka.Common.Header; -using Org.Apache.Kafka.Connect.Transforms; -using MASES.KNet; -using Microsoft.EntityFrameworkCore.Metadata.Internal; -using Org.Apache.Kafka.Common; -using MASES.KNet.Replicator; namespace MASES.EntityFrameworkCore.KNet.Storage.Internal; @@ -45,12 +38,10 @@ public class KafkaTable : IKafkaTable private readonly IList<(int, ValueComparer)>? _valueComparers; private Dictionary? _integerGenerators; - - private readonly IKNetCompactedReplicator _kafkaCompactedReplicator; - private readonly IProducer _kafkaProducer; + readonly IEntityTypeProducer _producer; private readonly string _tableAssociatedTopicName; private readonly IKafkaSerdesEntityType _serdes; - private readonly KafkaStreamsTableRetriever _streamData; + public KafkaTable( IKafkaCluster cluster, @@ -59,20 +50,9 @@ public KafkaTable( { Cluster = cluster; EntityType = entityType; - _tableAssociatedTopicName = entityType.TopicName(cluster.Options); - cluster.CreateTable(entityType); - _serdes = cluster.CreateSerdes(entityType); - if (cluster.Options.UseCompactedReplicator) - { - _kafkaCompactedReplicator = cluster.CreateCompactedReplicator(entityType); - _kafkaCompactedReplicator.Start(); - } - else - { - _kafkaProducer = cluster.CreateProducer(entityType); - _streamData = new KafkaStreamsTableRetriever(cluster, entityType); - } - + _tableAssociatedTopicName = Cluster.CreateTable(entityType); + _serdes = Cluster.SerdesFactory.GetOrCreate(entityType); + _producer = EntityTypeProducers.Create(entityType, Cluster); _keyValueFactory = entityType.FindPrimaryKey()!.GetPrincipalKeyValueFactory(); _sensitiveLoggingEnabled = sensitiveLoggingEnabled; _rows = new Dictionary(_keyValueFactory.EqualityComparer); @@ -99,15 +79,7 @@ public KafkaTable( public virtual void Dispose() { - if (Cluster.Options.UseCompactedReplicator) - { - _kafkaCompactedReplicator?.Dispose(); - } - else - { - _kafkaProducer?.Dispose(); - _streamData?.Dispose(); - } + _producer?.Dispose(); } public virtual IKafkaCluster Cluster { get; } @@ -138,17 +110,9 @@ public virtual KafkaIntegerValueGenerator GetIntegerValueGenerator)generator; } - public virtual IEnumerable ValueBuffers => GetValueBuffer(); + public virtual IEnumerable ValueBuffers => _producer.GetValueBuffer(); - private IEnumerable GetValueBuffer() - { - if (_streamData != null) return _streamData; - _kafkaCompactedReplicator.SyncWait(); - return _kafkaCompactedReplicator.Values.Select((item) => new ValueBuffer(_serdes.Deserialize(item))); - } - - public virtual IEnumerable Rows - => RowsInTable(); + public virtual IEnumerable Rows => RowsInTable(); public virtual IReadOnlyList SnapshotRows() { @@ -184,22 +148,18 @@ public virtual IEnumerable Rows return rows; } - private IEnumerable RowsInTable() - { - return _rows.Values; - } + private IEnumerable RowsInTable() => _rows.Values; - private static System.Collections.Generic.List GetKeyComparers(IEnumerable properties) - => properties.Select(p => p.GetKeyValueComparer()).ToList(); + private static List GetKeyComparers(IEnumerable properties) => properties.Select(p => p.GetKeyValueComparer()).ToList(); public virtual IKafkaRowBag Create(IUpdateEntry entry) { - var properties = entry.EntityType.GetProperties().ToList(); - var row = new object?[properties.Count]; - var nullabilityErrors = new System.Collections.Generic.List(); + var properties = entry.EntityType.GetProperties().ToArray(); + var row = new object?[properties.Length]; + var nullabilityErrors = new List(); var key = CreateKey(entry); - for (var index = 0; index < properties.Count; index++) + for (var index = 0; index < properties.Length; index++) { var propertyValue = SnapshotValue(properties[index], properties[index].GetKeyValueComparer(), entry); @@ -216,7 +176,7 @@ public virtual IKafkaRowBag Create(IUpdateEntry entry) BumpValueGenerators(row); - return new KafkaRowBag(entry, key, row); + return new KafkaRowBag(entry, _tableAssociatedTopicName, key, properties, row); } public virtual IKafkaRowBag Delete(IUpdateEntry entry) @@ -225,10 +185,10 @@ public virtual IKafkaRowBag Delete(IUpdateEntry entry) if (_rows.TryGetValue(key, out var row)) { - var properties = entry.EntityType.GetProperties().ToList(); + var properties = entry.EntityType.GetProperties().ToArray(); var concurrencyConflicts = new Dictionary(); - for (var index = 0; index < properties.Count; index++) + for (var index = 0; index < properties.Length; index++) { IsConcurrencyConflict(entry, properties[index], row[index], concurrencyConflicts); } @@ -240,7 +200,7 @@ public virtual IKafkaRowBag Delete(IUpdateEntry entry) _rows.Remove(key); - return new KafkaRowBag(entry, key, null); + return new KafkaRowBag(entry, _tableAssociatedTopicName, key, properties, null); } else { @@ -285,11 +245,11 @@ public virtual IKafkaRowBag Update(IUpdateEntry entry) if (_rows.TryGetValue(key, out var row)) { - var properties = entry.EntityType.GetProperties().ToList(); + var properties = entry.EntityType.GetProperties().ToArray(); var comparers = GetKeyComparers(properties); - var valueBuffer = new object?[properties.Count]; + var valueBuffer = new object?[properties.Length]; var concurrencyConflicts = new Dictionary(); - var nullabilityErrors = new System.Collections.Generic.List(); + var nullabilityErrors = new List(); for (var index = 0; index < valueBuffer.Length; index++) { @@ -322,7 +282,7 @@ public virtual IKafkaRowBag Update(IUpdateEntry entry) BumpValueGenerators(valueBuffer); - return new KafkaRowBag(entry, key, valueBuffer); + return new KafkaRowBag(entry, _tableAssociatedTopicName, key, properties, valueBuffer); } else { @@ -330,33 +290,7 @@ public virtual IKafkaRowBag Update(IUpdateEntry entry) } } - public virtual IEnumerable> Commit(IEnumerable records) - { - if (Cluster.Options.UseCompactedReplicator) - { - foreach (KafkaRowBag record in records) - { - var key = record.GetKey(_serdes); - var value = record.GetValue(_serdes); - _kafkaCompactedReplicator[key] = value; - } - - return null; - } - else - { - System.Collections.Generic.List> futures = new(); - foreach (KafkaRowBag record in records) - { - var future = _kafkaProducer.Send(record.GetRecord(_tableAssociatedTopicName, _serdes)); - futures.Add(future); - } - - _kafkaProducer.Flush(); - - return futures; - } - } + public virtual IEnumerable> Commit(IEnumerable records) => _producer.Commit(records); public virtual void BumpValueGenerators(object?[] row) { @@ -369,16 +303,7 @@ public virtual void BumpValueGenerators(object?[] row) } } - private ProducerRecord NewRecord(IUpdateEntry entry, TKey key, object?[]? row) - { - Headers headers = Headers.Create(); - var record = new ProducerRecord(_tableAssociatedTopicName, 0, new System.DateTimeOffset(DateTime.Now).ToUnixTimeMilliseconds(), _serdes.Serialize(headers, key), _serdes.Serialize(headers, row), headers); - - return record; - } - - private TKey CreateKey(IUpdateEntry entry) - => _keyValueFactory.CreateFromCurrentValues(entry); + private TKey CreateKey(IUpdateEntry entry) => _keyValueFactory.CreateFromCurrentValues(entry); private static object? SnapshotValue(IProperty property, ValueComparer? comparer, IUpdateEntry entry) { diff --git a/src/net/KEFCore/Storage/Internal/KafkaTableFactory.cs b/src/net/KEFCore/Storage/Internal/KafkaTableFactory.cs index 3a25c639..10cd8d74 100644 --- a/src/net/KEFCore/Storage/Internal/KafkaTableFactory.cs +++ b/src/net/KEFCore/Storage/Internal/KafkaTableFactory.cs @@ -17,7 +17,6 @@ */ using System.Collections.Concurrent; -using JetBrains.Annotations; using MASES.EntityFrameworkCore.KNet.Infrastructure.Internal; namespace MASES.EntityFrameworkCore.KNet.Storage.Internal; diff --git a/test/KEFCore.Test/Program.cs b/test/KEFCore.Test/Program.cs index 8a783931..639ef169 100644 --- a/test/KEFCore.Test/Program.cs +++ b/test/KEFCore.Test/Program.cs @@ -25,6 +25,7 @@ using MASES.EntityFrameworkCore.KNet.Infrastructure; using MASES.KNet.Streams; using Microsoft.EntityFrameworkCore; +using Microsoft.EntityFrameworkCore.Metadata.Internal; using System; using System.Collections.Generic; using System.Diagnostics; @@ -89,7 +90,7 @@ static void Main(string[] args) var testWatcher = Stopwatch.StartNew(); Stopwatch watch = Stopwatch.StartNew(); - for (int i = 0; i < 1000; i++) + for (int i = 1; i <= 1000; i++) { context.Add(new Blog { @@ -229,7 +230,7 @@ public class Blog { public int BlogId { get; set; } public string Url { get; set; } - public long Rating { get; set; } + public int Rating { get; set; } public List Posts { get; set; } public override string ToString() diff --git a/test/KEFCore.Test/ProgramConfig.cs b/test/KEFCore.Test/ProgramConfig.cs index f17acf99..c8953176 100644 --- a/test/KEFCore.Test/ProgramConfig.cs +++ b/test/KEFCore.Test/ProgramConfig.cs @@ -30,6 +30,6 @@ partial class Program public static bool UseModelBuilder = false; public static bool UseCompactedReplicator = false; public static string DatabaseName = null; - static bool deleteApplication = false; + static bool deleteApplication = true; } }