From a7c59cc663e438f51baeb3b1a7611b7a23f02baa Mon Sep 17 00:00:00 2001
From: masesdevelopers <94312179+masesdevelopers@users.noreply.github.com>
Date: Wed, 27 Sep 2023 16:48:33 +0200
Subject: [PATCH 1/5] Fix documentation
---
README.md | 40 +++++++++++++++++++--------
src/documentation/articles/intro.md | 2 +-
src/documentation/articles/roadmap.md | 1 +
src/documentation/articles/usage.md | 4 +--
src/documentation/index.md | 4 +--
5 files changed, 34 insertions(+), 17 deletions(-)
diff --git a/README.md b/README.md
index b93d2697..bbe0ed0d 100644
--- a/README.md
+++ b/README.md
@@ -1,19 +1,42 @@
-# KEFCore: the EntityFrameworkCore provider for Apache Kafka
+# KEFCore: Entity Framework Core provider for Apache Kafka
-[![CI_BUILD](https://github.com/masesgroup/KEFCore/actions/workflows/build.yaml/badge.svg)](https://github.com/masesgroup/KEFCore/actions/workflows/build.yaml) [![CI_RELEASE](https://github.com/masesgroup/KEFCore/actions/workflows/release.yaml/badge.svg)](https://github.com/masesgroup/KEFCore/actions/workflows/release.yaml)
+KEFCore is the Entity Framework Core provider for Apache Kafka.
+Based on [KNet](https://github.com/masesgroup/KNet) it allows to use Apache Kafka as a distributed database.
+
+### Libraries and Tools
[![latest version](https://img.shields.io/nuget/v/MASES.EntityFrameworkCore.KNet)](https://www.nuget.org/packages/MASES.EntityFrameworkCore.KNet) [![downloads](https://img.shields.io/nuget/dt/MASES.EntityFrameworkCore.KNet)](https://www.nuget.org/packages/MASES.EntityFrameworkCore.KNet)
-KEFCore is the EntityFrameworkCore provider for Apache Kafka.
-Based on [KNet](https://github.com/masesgroup/KNet) it allows to use Apache Kafka as a distributed database.
+### Pipelines
-This project adheres to the Contributor [Covenant code of conduct](CODE_OF_CONDUCT.md). By participating, you are expected to uphold this code. Please report unacceptable behavior to coc_reporting@masesgroup.com.
+[![CI_BUILD](https://github.com/masesgroup/KEFCore/actions/workflows/build.yaml/badge.svg)](https://github.com/masesgroup/KEFCore/actions/workflows/build.yaml)
+[![CI_RELEASE](https://github.com/masesgroup/KEFCore/actions/workflows/release.yaml/badge.svg)](https://github.com/masesgroup/KEFCore/actions/workflows/release.yaml)
+
+---
## Scope of the project
This project aims to create a provider to access the information stored within an Apache Kafka cluster using the paradigm behind Entity Framework.
The project is based on available information within the official [EntityFrameworkCore repository](https://github.com/dotnet/efcore), many classes was copied from there as reported in the official documentation within the Microsoft website at https://docs.microsoft.com/en-us/ef/core/providers/writing-a-provider.
+### Community and Contribution
+
+Do you like the project?
+- Request your free [community subscription](https://www.jcobridge.com/pricing-25/).
+
+Do you want to help us?
+- put a :star: on this project
+- open [issues](https://github.com/masesgroup/KEFCore/issues) to request features or report bugs :bug:
+- improves the project with Pull Requests
+
+This project adheres to the Contributor [Covenant code of conduct](CODE_OF_CONDUCT.md). By participating, you are expected to uphold this code. Please report unacceptable behavior to coc_reporting@masesgroup.com.
+
+## Summary
+
+* [Roadmap](src/documentation/articles/roadmap.md)
+* [Actual state](src/documentation/articles/actualstate.md)
+* [KEFCore usage](src/documentation/articles/usage.md)
+
## Runtime engine
KEFCore uses [KNet](https://github.com/masesgroup/KNet), and indeed [JCOBridge](https://www.jcobridge.com) with its [features](https://www.jcobridge.com/features/), to obtain many benefits:
@@ -36,13 +59,6 @@ Have a look at the following JCOBridge resources:
- [Commercial Edition](https://www.jcobridge.com/pricing-25/)
- Latest release: [![JCOBridge nuget](https://img.shields.io/nuget/v/MASES.JCOBridge)](https://www.nuget.org/packages/MASES.JCOBridge)
----
-## Summary
-
-* [Roadmap](src/documentation/articles/roadmap.md)
-* [Actual state](src/documentation/articles/actualstate.md)
-* [KEFCore usage](src/documentation/articles/usage.md)
-
---
KAFKA is a registered trademark of The Apache Software Foundation. KEFCore has no affiliation with and is not endorsed by The Apache Software Foundation.
diff --git a/src/documentation/articles/intro.md b/src/documentation/articles/intro.md
index 941f20f2..bcb98275 100644
--- a/src/documentation/articles/intro.md
+++ b/src/documentation/articles/intro.md
@@ -1,6 +1,6 @@
# Welcome to KEFCore
-KEFCore is the EntityFrameworkCore provider for Apache Kafka.
+KEFCore is the Entity Framework Core provider for Apache Kafka.
Based on [KNet](https://github.com/masesgroup/KNet) it allows to use Apache Kafka as a distributed database.
This project adheres to the Contributor [Covenant code of conduct](CODE_OF_CONDUCT.md). By participating, you are expected to uphold this code. Please report unacceptable behavior to coc_reporting@masesgroup.com.
diff --git a/src/documentation/articles/roadmap.md b/src/documentation/articles/roadmap.md
index 20028ec0..c67c227e 100644
--- a/src/documentation/articles/roadmap.md
+++ b/src/documentation/articles/roadmap.md
@@ -4,3 +4,4 @@ The roadmap can be synthetized in the following points:
* Create a first working provider based on InMemory provider
* Extends the first provider with new features able to create Apache Kafka Streams topology to retrieve information
+* Use KNetCompactedReplicator beside Apache Kafka Streams
diff --git a/src/documentation/articles/usage.md b/src/documentation/articles/usage.md
index a352a11e..2009dfc5 100644
--- a/src/documentation/articles/usage.md
+++ b/src/documentation/articles/usage.md
@@ -4,7 +4,7 @@
### Installation
-EF Core for Apache Kafka is available on [NuGet](https://www.nuget.org/packages/MASES.EntityFrameworkCore.KNet):
+Entity Framework Core provider for Apache Kafka is available on [NuGet](https://www.nuget.org/packages/MASES.EntityFrameworkCore.KNet):
```sh
dotnet add package MASES.EntityFrameworkCore.KNet
@@ -12,7 +12,7 @@ dotnet add package MASES.EntityFrameworkCore.KNet
### Basic usage
-The following code demonstrates basic usage of EF Core for Apache Kafka.
+The following code demonstrates basic usage of Entity Framework Core provider for Apache Kafka.
For a full tutorial configuring the `KafkaDbContext`, defining the model, and creating the database, see [KafkaDbContext](kafkadbcontext.md) in the docs.
```cs
diff --git a/src/documentation/index.md b/src/documentation/index.md
index c2de054f..08394643 100644
--- a/src/documentation/index.md
+++ b/src/documentation/index.md
@@ -1,8 +1,8 @@
-# KEFCore: the EntityFrameworkCore provider for Apache Kafka
+# KEFCore: Entity Framework Core provider for Apache Kafka
[![CI_BUILD](https://github.com/masesgroup/KEFCore/actions/workflows/build.yaml/badge.svg)](https://github.com/masesgroup/KEFCore/actions/workflows/build.yaml) [![CI_RELEASE](https://github.com/masesgroup/KEFCore/actions/workflows/release.yaml/badge.svg)](https://github.com/masesgroup/KEFCore/actions/workflows/release.yaml)
-KEFCore is the EntityFrameworkCore provider for Apache Kafka.
+KEFCore is the Entity Framework Core provider for Apache Kafka.
Based on [KNet](https://github.com/masesgroup/KNet) it allows to use Apache Kafka as a distributed database.
This project adheres to the Contributor [Covenant code of conduct](CODE_OF_CONDUCT.md). By participating, you are expected to uphold this code. Please report unacceptable behavior to coc_reporting@masesgroup.com.
From 74ad884ba5cde0572b1848d3d5c60af3a9811f24 Mon Sep 17 00:00:00 2001
From: masesdevelopers <94312179+masesdevelopers@users.noreply.github.com>
Date: Sat, 30 Sep 2023 15:19:42 +0200
Subject: [PATCH 2/5] Temporary modification
---
.../Internal/KafkaOptionsExtension.cs | 22 +-
src/net/KEFCore/KEFCore.csproj | 1 +
.../Storage/Internal/EntityTypeProducer.cs | 368 ++++++++++++++++++
.../Storage/Internal/IEntityTypeProducer.cs | 31 ++
.../KEFCore/Storage/Internal/IKafkaCluster.cs | 11 +-
.../KEFCore/Storage/Internal/IKafkaRowBag.cs | 6 +-
.../KEFCore/Storage/Internal/KafkaCluster.cs | 96 ++---
.../KEFCore/Storage/Internal/KafkaRowBag.cs | 35 +-
.../Internal/KafkaStreamsBaseRetriever.cs | 35 +-
.../Internal/KafkaStreamsTableRetriever.cs | 9 +-
.../KEFCore/Storage/Internal/KafkaTable.cs | 142 +++----
test/KEFCore.Test/Program.cs | 5 +-
test/KEFCore.Test/ProgramConfig.cs | 2 +-
13 files changed, 563 insertions(+), 200 deletions(-)
create mode 100644 src/net/KEFCore/Storage/Internal/EntityTypeProducer.cs
create mode 100644 src/net/KEFCore/Storage/Internal/IEntityTypeProducer.cs
diff --git a/src/net/KEFCore/Infrastructure/Internal/KafkaOptionsExtension.cs b/src/net/KEFCore/Infrastructure/Internal/KafkaOptionsExtension.cs
index ca3ece5a..0580f82f 100644
--- a/src/net/KEFCore/Infrastructure/Internal/KafkaOptionsExtension.cs
+++ b/src/net/KEFCore/Infrastructure/Internal/KafkaOptionsExtension.cs
@@ -242,7 +242,7 @@ public virtual Properties StreamsOptions(string applicationId)
{
props.Remove(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG);
}
- props.Put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Class.ForName("org.apache.kafka.common.serialization.Serdes$StringSerde", true, SystemClassLoader));
+ props.Put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Class.ForName("org.apache.kafka.common.serialization.Serdes$ByteArraySerde", true, SystemClassLoader));
if (props.ContainsKey(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG))
{
props.Remove(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG);
@@ -277,16 +277,16 @@ public virtual Properties ProducerOptions()
{
props.Put(ProducerConfig.LINGER_MS_CONFIG, 1);
}
- if (props.ContainsKey(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG))
- {
- props.Remove(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
- }
- props.Put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, Class.ForName("org.apache.kafka.common.serialization.StringSerializer", true, SystemClassLoader));
- if (props.ContainsKey(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG))
- {
- props.Remove(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
- }
- props.Put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, Class.ForName("org.apache.kafka.common.serialization.StringSerializer", true, SystemClassLoader));
+ //if (props.ContainsKey(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG))
+ //{
+ // props.Remove(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
+ //}
+ //props.Put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, Class.ForName("org.apache.kafka.common.serialization.StringSerializer", true, SystemClassLoader));
+ //if (props.ContainsKey(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG))
+ //{
+ // props.Remove(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
+ //}
+ //props.Put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, Class.ForName("org.apache.kafka.common.serialization.StringSerializer", true, SystemClassLoader));
return props;
}
diff --git a/src/net/KEFCore/KEFCore.csproj b/src/net/KEFCore/KEFCore.csproj
index ce0f6f01..ff1d0029 100644
--- a/src/net/KEFCore/KEFCore.csproj
+++ b/src/net/KEFCore/KEFCore.csproj
@@ -70,6 +70,7 @@
All
None
+
diff --git a/src/net/KEFCore/Storage/Internal/EntityTypeProducer.cs b/src/net/KEFCore/Storage/Internal/EntityTypeProducer.cs
new file mode 100644
index 00000000..4a8a71e3
--- /dev/null
+++ b/src/net/KEFCore/Storage/Internal/EntityTypeProducer.cs
@@ -0,0 +1,368 @@
+/*
+* Copyright 2022 MASES s.r.l.
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*
+* Refer to LICENSE for more information.
+*/
+
+#nullable enable
+
+using Java.Util.Concurrent;
+using MASES.EntityFrameworkCore.KNet.Serdes.Internal;
+using MASES.KNet.Producer;
+using MASES.KNet.Replicator;
+using MASES.KNet.Serialization;
+using System.Collections.Concurrent;
+using System.Text.Json.Serialization;
+using MASES.KNet.Serialization.Json;
+using Org.Apache.Kafka.Clients.Producer;
+using System.Text.Json;
+using Javax.Xml.Crypto;
+using Microsoft.EntityFrameworkCore.Storage.ValueConversion;
+
+namespace MASES.EntityFrameworkCore.KNet.Storage.Internal;
+
+public class EntityTypeProducers
+{
+ static IEntityTypeProducer? _globalProducer = null;
+ static readonly ConcurrentDictionary _producers = new ConcurrentDictionary();
+
+ public static IEntityTypeProducer Create(IEntityType entityType, IKafkaCluster cluster) where TKey : notnull
+ {
+ if (!cluster.Options.ProducerByEntity)
+ {
+ lock (_producers)
+ {
+ if (_globalProducer == null) _globalProducer = CreateProducerLocal(entityType, cluster);
+ return _globalProducer;
+ }
+ }
+ else
+ {
+ return _producers.GetOrAdd(entityType, _ => CreateProducerLocal(entityType, cluster));
+ }
+ }
+
+ static IEntityTypeProducer CreateProducerLocal(IEntityType entityType, IKafkaCluster cluster) where TKey : notnull => new EntityTypeProducer(entityType, cluster);
+}
+
+//public class ListStringObjectTupleConverter : JsonConverterFactory
+//{
+// public override bool CanConvert(Type typeToConvert)
+// {
+// if (typeToConvert != typeof(List<(Type, object)>))
+// {
+// return false;
+// }
+
+// return true;
+// }
+
+// public override JsonConverter CreateConverter(Type type, JsonSerializerOptions options)
+// {
+// return new ListStringObjectTupleConverterInner();
+// }
+
+// private class ListStringObjectTupleConverterInner : JsonConverter>
+// {
+// public override List<(Type, object)> Read(
+// ref Utf8JsonReader reader,
+// Type typeToConvert,
+// JsonSerializerOptions options)
+// {
+// if (reader.TokenType != JsonTokenType.StartObject)
+// {
+// throw new JsonException();
+// }
+
+// var dictionary = new List<(Type, object)>();
+
+// while (reader.Read())
+// {
+// if (reader.TokenType == JsonTokenType.EndObject)
+// {
+// return dictionary;
+// }
+
+// // Get the key.
+// if (reader.TokenType != JsonTokenType.PropertyName)
+// {
+// throw new JsonException();
+// }
+
+// string? propertyName = reader.GetString();
+// var type = Type.GetType(propertyName!);
+// if (type == null)
+// {
+// throw new JsonException($"Unable to convert \"{propertyName}\" to known CLR Type.");
+// }
+
+// var valueConverter = (JsonConverter)options.GetConverter(type);
+
+// // Get the value.
+// reader.Read();
+// object value = valueConverter.Read(ref reader, type, options)!;
+
+// // Add to dictionary.
+// dictionary.Add((type, value));
+// }
+
+// throw new JsonException();
+// }
+
+// public override void Write(
+// Utf8JsonWriter writer,
+// List<(Type, object)> dictionary,
+// JsonSerializerOptions options)
+// {
+// writer.WriteStartObject();
+// foreach ((Type key, object value) in dictionary)
+// {
+// var propertyName = key.FullName.ToString();
+// writer.WritePropertyName(options.PropertyNamingPolicy?.ConvertName(propertyName) ?? propertyName);
+// JsonConverter valueConverter = options.GetConverter(key);
+// valueConverter.
+// valueConverter.Write(writer, value, options);
+// }
+
+// writer.WriteEndObject();
+// }
+// }
+//}
+
+[JsonSerializable(typeof(ObjectType))]
+public class ObjectType : IJsonOnDeserialized
+{
+ public ObjectType()
+ {
+
+ }
+
+ public ObjectType(IProperty typeName, object value)
+ {
+ TypeName = typeName.ClrType?.FullName;
+ Value = value;
+ }
+
+ public void OnDeserialized()
+ {
+ if (Value is JsonElement elem)
+ {
+ switch (elem.ValueKind)
+ {
+ case JsonValueKind.Undefined:
+ break;
+ case JsonValueKind.Object:
+ break;
+ case JsonValueKind.Array:
+ break;
+ case JsonValueKind.String:
+ Value = elem.GetString()!;
+ break;
+ case JsonValueKind.Number:
+ var tmp = elem.GetInt64();
+ Value = Convert.ChangeType(tmp, Type.GetType(TypeName!)!);
+ break;
+ case JsonValueKind.True:
+ Value = true;
+ break;
+ case JsonValueKind.False:
+ Value = false;
+ break;
+ case JsonValueKind.Null:
+ Value = null;
+ break;
+ default:
+ break;
+ }
+ }
+ else
+ {
+ Value = Convert.ChangeType(Value, Type.GetType(TypeName!)!);
+ }
+ }
+
+ public string? TypeName { get; set; }
+
+ public object Value { get; set; }
+}
+
+[JsonSerializable(typeof(KNetEntityTypeData<>))]
+public class KNetEntityTypeData
+{
+ public KNetEntityTypeData() { }
+
+ public KNetEntityTypeData(IEntityType tName, IProperty[] properties, object[] rData)
+ {
+ TypeName = tName.Name;
+ Data = new Dictionary();
+ for (int i = 0; i < properties.Length; i++)
+ {
+ Data.Add(properties[i].GetIndex(), new ObjectType(properties[i], rData[i]));
+ }
+ }
+
+ public string TypeName { get; set; }
+ // [JsonConverter(typeof(ListStringObjectTupleConverter))]
+ public Dictionary Data { get; set; }
+
+ public object[] GetData(IEntityType tName)
+ {
+ if (Data == null) return null;
+
+ var array = Data.Select((o) => o.Value.Value).ToArray();
+
+ return array;
+
+ var _properties = tName.GetProperties().ToArray();
+ List data = new List();
+
+ for (int i = 0; i < Data!.Count; i++)
+ {
+ if (Data[i].Value is JsonElement elem)
+ {
+ switch (elem.ValueKind)
+ {
+ case JsonValueKind.Undefined:
+ break;
+ case JsonValueKind.Object:
+ break;
+ case JsonValueKind.Array:
+ break;
+ case JsonValueKind.String:
+ data.Add(elem.GetString());
+ break;
+ case JsonValueKind.Number:
+ var tmp = elem.GetInt64();
+ data.Add(Convert.ChangeType(tmp, _properties[i].ClrType));
+ break;
+ case JsonValueKind.True:
+ data.Add(true);
+ break;
+ case JsonValueKind.False:
+ data.Add(false);
+ break;
+ case JsonValueKind.Null:
+ data.Add(null);
+ break;
+ default:
+ break;
+ }
+
+ }
+ else
+ {
+ data.Add(Convert.ChangeType(Data[i], _properties[i].ClrType));
+ }
+ }
+ return data.ToArray();
+ }
+}
+
+public class EntityTypeProducer : IEntityTypeProducer where TKey : notnull
+{
+ private readonly bool _useCompactedReplicator;
+ private readonly IKafkaCluster _cluster;
+ private readonly IEntityType _entityType;
+ private readonly IKNetCompactedReplicator>? _kafkaCompactedReplicator;
+ private readonly IKNetProducer>? _kafkaProducer;
+ private readonly IKafkaStreamsBaseRetriever _streamData;
+ private readonly KNetSerDes _keySerdes;
+ private readonly KNetSerDes> _valueSerdes;
+
+ public EntityTypeProducer(IEntityType entityType, IKafkaCluster cluster)
+ {
+ _entityType = entityType;
+ _cluster = cluster;
+ _useCompactedReplicator = _cluster.Options.UseCompactedReplicator;
+
+ if (KNetSerialization.IsInternalManaged())
+ {
+ _keySerdes = new KNetSerDes();
+ }
+ else _keySerdes = new JsonSerDes();
+
+ _valueSerdes = new JsonSerDes>();
+
+ if (_useCompactedReplicator)
+ {
+ _kafkaCompactedReplicator = new KNetCompactedReplicator>()
+ {
+ UpdateMode = UpdateModeTypes.OnConsume,
+ BootstrapServers = _cluster.Options.BootstrapServers,
+ StateName = _entityType.TopicName(_cluster.Options),
+ Partitions = _entityType.NumPartitions(_cluster.Options),
+ ConsumerInstances = _entityType.ConsumerInstances(_cluster.Options),
+ ReplicationFactor = _entityType.ReplicationFactor(_cluster.Options),
+ TopicConfig = _cluster.Options.TopicConfigBuilder,
+ ProducerConfig = _cluster.Options.ProducerConfigBuilder,
+ KeySerDes = _keySerdes,
+ ValueSerDes = _valueSerdes,
+ };
+ }
+ else
+ {
+ _kafkaProducer = new KNetProducer>(_cluster.Options.ProducerOptions(), _keySerdes, _valueSerdes);
+ _streamData = new KafkaStreamsTableRetriever(cluster, entityType, _keySerdes, _valueSerdes);
+ }
+ }
+
+ public IEnumerable> Commit(IEnumerable records)
+ {
+ if (_useCompactedReplicator)
+ {
+ foreach (KafkaRowBag record in records)
+ {
+ var value = record.Value;
+ if (_kafkaCompactedReplicator != null) _kafkaCompactedReplicator[record.Key] = value!;
+ }
+
+ return null;
+ }
+ else
+ {
+ System.Collections.Generic.List> futures = new();
+ foreach (KafkaRowBag record in records)
+ {
+ var future = _kafkaProducer?.Send(new KNetProducerRecord>(record.AssociatedTopicName, 0, record.Key, record.Value!));
+ futures.Add(future);
+ }
+
+ _kafkaProducer?.Flush();
+
+ return futures;
+ }
+ }
+
+ public void Dispose()
+ {
+ if (_useCompactedReplicator)
+ {
+ _kafkaCompactedReplicator?.Dispose();
+ }
+ else
+ {
+ _kafkaProducer?.Dispose();
+ _streamData?.Dispose();
+ }
+ }
+
+ public IEnumerable GetValueBuffer()
+ {
+ if (_streamData != null) return _streamData;
+ _kafkaCompactedReplicator?.SyncWait();
+ if (_kafkaCompactedReplicator == null) throw new InvalidOperationException("Missing _kafkaCompactedReplicator");
+ return _kafkaCompactedReplicator.Values.Select((item) => new ValueBuffer(item.GetData(_entityType)));
+ }
+}
diff --git a/src/net/KEFCore/Storage/Internal/IEntityTypeProducer.cs b/src/net/KEFCore/Storage/Internal/IEntityTypeProducer.cs
new file mode 100644
index 00000000..1a8eb72c
--- /dev/null
+++ b/src/net/KEFCore/Storage/Internal/IEntityTypeProducer.cs
@@ -0,0 +1,31 @@
+/*
+* Copyright 2022 MASES s.r.l.
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*
+* Refer to LICENSE for more information.
+*/
+
+#nullable enable
+
+using Java.Util.Concurrent;
+using Org.Apache.Kafka.Clients.Producer;
+
+namespace MASES.EntityFrameworkCore.KNet.Storage.Internal;
+
+public interface IEntityTypeProducer : IDisposable
+{
+ IEnumerable> Commit(IEnumerable records);
+
+ IEnumerable GetValueBuffer();
+}
diff --git a/src/net/KEFCore/Storage/Internal/IKafkaCluster.cs b/src/net/KEFCore/Storage/Internal/IKafkaCluster.cs
index fe833f8c..1fe0b885 100644
--- a/src/net/KEFCore/Storage/Internal/IKafkaCluster.cs
+++ b/src/net/KEFCore/Storage/Internal/IKafkaCluster.cs
@@ -19,10 +19,7 @@
using MASES.EntityFrameworkCore.KNet.Infrastructure.Internal;
using MASES.EntityFrameworkCore.KNet.Serdes.Internal;
using MASES.EntityFrameworkCore.KNet.ValueGeneration.Internal;
-using MASES.KNet;
-using MASES.KNet.Producer;
using MASES.KNet.Replicator;
-using Org.Apache.Kafka.Clients.Producer;
namespace MASES.EntityFrameworkCore.KNet.Storage.Internal;
@@ -42,16 +39,10 @@ bool EnsureConnected(
IModel designModel,
IDiagnosticsLogger updateLogger);
- bool CreateTable(IEntityType entityType);
+ string CreateTable(IEntityType entityType);
IKafkaSerdesFactory SerdesFactory { get; }
- IKafkaSerdesEntityType CreateSerdes(IEntityType entityType);
-
- IKNetCompactedReplicator CreateCompactedReplicator(IEntityType entityType);
-
- IProducer CreateProducer(IEntityType entityType);
-
IEnumerable GetData(IEntityType entityType);
KafkaIntegerValueGenerator GetIntegerValueGenerator(IProperty property);
diff --git a/src/net/KEFCore/Storage/Internal/IKafkaRowBag.cs b/src/net/KEFCore/Storage/Internal/IKafkaRowBag.cs
index 33e96f29..ab72f9a0 100644
--- a/src/net/KEFCore/Storage/Internal/IKafkaRowBag.cs
+++ b/src/net/KEFCore/Storage/Internal/IKafkaRowBag.cs
@@ -18,10 +18,6 @@
#nullable enable
-using MASES.EntityFrameworkCore.KNet.ValueGeneration.Internal;
-using Java.Util.Concurrent;
-using Org.Apache.Kafka.Clients.Producer;
-using MASES.KNet.Producer;
using MASES.EntityFrameworkCore.KNet.Serdes.Internal;
namespace MASES.EntityFrameworkCore.KNet.Storage.Internal;
@@ -29,4 +25,6 @@ namespace MASES.EntityFrameworkCore.KNet.Storage.Internal;
public interface IKafkaRowBag
{
IUpdateEntry UpdateEntry { get; }
+
+ string AssociatedTopicName { get; }
}
diff --git a/src/net/KEFCore/Storage/Internal/KafkaCluster.cs b/src/net/KEFCore/Storage/Internal/KafkaCluster.cs
index 97486426..4f1f8e62 100644
--- a/src/net/KEFCore/Storage/Internal/KafkaCluster.cs
+++ b/src/net/KEFCore/Storage/Internal/KafkaCluster.cs
@@ -36,6 +36,7 @@
using Org.Apache.Kafka.Common;
using MASES.KNet.Replicator;
using Org.Apache.Kafka.Tools;
+using MASES.EntityFrameworkCore.KNet.Query.Internal;
namespace MASES.EntityFrameworkCore.KNet.Storage.Internal;
@@ -51,9 +52,6 @@ public class KafkaCluster : IKafkaCluster
private System.Collections.Generic.Dictionary? _tables;
- private IProducer? _globalProducer = null;
- private readonly ConcurrentDictionary> _producers;
-
public KafkaCluster(
KafkaOptionsExtension options,
IKafkaTableFactory tableFactory,
@@ -63,7 +61,6 @@ public KafkaCluster(
_tableFactory = tableFactory;
_serdesFactory = serdesFactory;
_useNameMatching = options.UseNameMatching;
- _producers = new();
Properties props = new();
props.Put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, _options.BootstrapServers);
_kafkaAdminClient = KafkaAdminClient.Create(props);
@@ -187,13 +184,14 @@ public virtual bool EnsureConnected(
return true;
}
- public virtual bool CreateTable(IEntityType entityType)
+ public virtual string CreateTable(IEntityType entityType)
{
+ var topicName = entityType.TopicName(Options);
try
{
try
{
- var topic = new NewTopic(entityType.TopicName(Options), entityType.NumPartitions(Options), entityType.ReplicationFactor(Options));
+ var topic = new NewTopic(topicName, entityType.NumPartitions(Options), entityType.ReplicationFactor(Options));
Options.TopicConfigBuilder.CleanupPolicy = MASES.KNet.Common.TopicConfigBuilder.CleanupPolicyTypes.Compact | MASES.KNet.Common.TopicConfigBuilder.CleanupPolicyTypes.Delete;
Options.TopicConfigBuilder.RetentionBytes = 1024 * 1024 * 1024;
var map = Options.TopicConfigBuilder.ToMap();
@@ -214,48 +212,56 @@ public virtual bool CreateTable(IEntityType entityType)
Thread.Sleep(1000); // wait a while to complete topic deletion
return CreateTable(entityType);
}
- return false;
- }
- return true;
- }
-
- public virtual IKafkaSerdesEntityType CreateSerdes(IEntityType entityType) => _serdesFactory.GetOrCreate(entityType);
-
- public virtual IKNetCompactedReplicator CreateCompactedReplicator(IEntityType entityType)
- {
- lock (_lock)
- {
- return new KNetCompactedReplicator()
- {
- UpdateMode = UpdateModeTypes.OnConsume,
- BootstrapServers = Options.BootstrapServers,
- StateName = entityType.TopicName(Options),
- Partitions = entityType.NumPartitions(Options),
- ConsumerInstances = entityType.ConsumerInstances(Options),
- ReplicationFactor = entityType.ReplicationFactor(Options),
- TopicConfig = Options.TopicConfigBuilder,
- ProducerConfig = Options.ProducerConfigBuilder,
- };
- }
- }
-
- public virtual IProducer CreateProducer(IEntityType entityType)
- {
- if (!Options.ProducerByEntity)
- {
- lock (_lock)
- {
- if (_globalProducer == null) _globalProducer = CreateProducer();
- return _globalProducer;
- }
- }
- else
- {
- return _producers.GetOrAdd(entityType, _ => CreateProducer());
}
+ return topicName;
}
- private IProducer CreateProducer() => new KafkaProducer(Options.ProducerOptions());
+ //public virtual IKafkaSerdesEntityType CreateSerdes(IEntityType entityType) =>
+
+ //public virtual IKNetCompactedReplicator CreateCompactedReplicator(IEntityType entityType)
+ //{
+ // lock (_lock)
+ // {
+ // return new KNetCompactedReplicator()
+ // {
+ // UpdateMode = UpdateModeTypes.OnConsume,
+ // BootstrapServers = Options.BootstrapServers,
+ // StateName = entityType.TopicName(Options),
+ // Partitions = entityType.NumPartitions(Options),
+ // ConsumerInstances = entityType.ConsumerInstances(Options),
+ // ReplicationFactor = entityType.ReplicationFactor(Options),
+ // TopicConfig = Options.TopicConfigBuilder,
+ // ProducerConfig = Options.ProducerConfigBuilder,
+ // };
+ // }
+ //}
+
+ //public virtual IEntityTypeProducer CreateProducer(IEntityType entityType) => EntityTypeProducer.Create(entityType, Options);
+ //{
+ // return
+
+
+ // //if (!Options.ProducerByEntity)
+ // //{
+ // // lock (_lock)
+ // // {
+ // // if (_globalProducer == null) _globalProducer = CreateProducerLocal(entityType);
+ // // return _globalProducer;
+ // // }
+ // //}
+ // //else
+ // //{
+ // // return _producers.GetOrAdd(entityType, _ => CreateProducerLocal(entityType));
+ // //}
+ //}
+
+ // private IEntityTypeProducer CreateProducerLocal(IEntityType entityType) => EntityTypeProducer.Create(entityType, Options);
+ //{
+ // var type = typeof(KafkaProducer<,>).MakeGenericType(typeof(string), typeof(string));
+ // var ctor = type.GetTypeInfo().DeclaredConstructors.Single(c => c.GetParameters().Length == 1 && c.GetParameters()[0].ParameterType == typeof(Properties));
+ // return ctor.Invoke(new object[] { Options.ProducerOptions() });
+ // new KafkaProducer(Options.ProducerOptions());
+ //}
private static System.Collections.Generic.Dictionary CreateTables() => new();
diff --git a/src/net/KEFCore/Storage/Internal/KafkaRowBag.cs b/src/net/KEFCore/Storage/Internal/KafkaRowBag.cs
index 94e7595c..3782790a 100644
--- a/src/net/KEFCore/Storage/Internal/KafkaRowBag.cs
+++ b/src/net/KEFCore/Storage/Internal/KafkaRowBag.cs
@@ -18,47 +18,28 @@
#nullable enable
-using MASES.EntityFrameworkCore.KNet.ValueGeneration.Internal;
-using Java.Util.Concurrent;
-using Org.Apache.Kafka.Clients.Producer;
-using MASES.KNet.Producer;
-using MASES.EntityFrameworkCore.KNet.Serdes.Internal;
-using Org.Apache.Kafka.Common.Header;
-
namespace MASES.EntityFrameworkCore.KNet.Storage.Internal;
public class KafkaRowBag : IKafkaRowBag
{
- public KafkaRowBag(IUpdateEntry entry, TKey key, object?[]? row)
+ public KafkaRowBag(IUpdateEntry entry, string topicName, TKey key, IProperty[] properties, object?[]? row)
{
UpdateEntry = entry;
+ AssociatedTopicName = topicName;
Key = key;
+ Properties = properties;
ValueBuffer = row;
}
public IUpdateEntry UpdateEntry { get; private set; }
- public TKey Key { get; private set; }
-
- public object?[]? ValueBuffer { get; private set; }
+ public string AssociatedTopicName { get; private set; }
- public string GetKey(IKafkaSerdesEntityType _serdes)
- {
- return _serdes.Serialize(Key);
- }
+ public TKey Key { get; private set; }
- public string GetValue(IKafkaSerdesEntityType _serdes)
- {
- return _serdes.Serialize(ValueBuffer);
- }
+ public IProperty[] Properties { get; private set; }
- public ProducerRecord GetRecord(string topicName, IKafkaSerdesEntityType _serdes)
- {
- Headers headers = Headers.Create();
- string key = _serdes.Serialize(headers, Key);
- string? value = UpdateEntry.EntityState == EntityState.Deleted ? null : _serdes.Serialize(headers, ValueBuffer);
- var record = new ProducerRecord(topicName, 0, DateTime.Now, key, value!, headers);
+ public KNetEntityTypeData? Value => UpdateEntry.EntityState == EntityState.Deleted ? null : new KNetEntityTypeData(UpdateEntry.EntityType, Properties, ValueBuffer!);
- return record;
- }
+ public object?[]? ValueBuffer { get; private set; }
}
diff --git a/src/net/KEFCore/Storage/Internal/KafkaStreamsBaseRetriever.cs b/src/net/KEFCore/Storage/Internal/KafkaStreamsBaseRetriever.cs
index 48fd3918..41fea0d3 100644
--- a/src/net/KEFCore/Storage/Internal/KafkaStreamsBaseRetriever.cs
+++ b/src/net/KEFCore/Storage/Internal/KafkaStreamsBaseRetriever.cs
@@ -19,20 +19,28 @@
#nullable enable
using MASES.JCOBridge.C2JBridge;
+using MASES.KNet.Serialization;
using Org.Apache.Kafka.Common.Utils;
using Org.Apache.Kafka.Streams;
using Org.Apache.Kafka.Streams.Errors;
using Org.Apache.Kafka.Streams.Kstream;
using Org.Apache.Kafka.Streams.State;
+using System.Collections;
using static Org.Apache.Kafka.Streams.Errors.StreamsUncaughtExceptionHandler;
using static Org.Apache.Kafka.Streams.KafkaStreams;
namespace MASES.EntityFrameworkCore.KNet.Storage.Internal
{
- public class KafkaStreamsBaseRetriever : IEnumerable, IDisposable
+ public interface IKafkaStreamsBaseRetriever : IEnumerable, IDisposable
+ {
+ }
+
+ public class KafkaStreamsBaseRetriever : IKafkaStreamsBaseRetriever
{
private readonly IKafkaCluster _kafkaCluster;
private readonly IEntityType _entityType;
+ private readonly IKNetSerDes _keySerdes;
+ private readonly IKNetSerDes> _valueSerdes;
private readonly StreamsBuilder _builder;
private readonly KStream _root;
@@ -50,10 +58,12 @@ public class KafkaStreamsBaseRetriever : IEnumerable, IDispos
private State actualState = State.NOT_RUNNING;
private ReadOnlyKeyValueStore? keyValueStore;
- public KafkaStreamsBaseRetriever(IKafkaCluster kafkaCluster, IEntityType entityType, string storageId, StreamsBuilder builder, KStream root)
+ public KafkaStreamsBaseRetriever(IKafkaCluster kafkaCluster, IEntityType entityType, IKNetSerDes keySerdes, IKNetSerDes> valueSerdes, string storageId, StreamsBuilder builder, KStream root)
{
_kafkaCluster = kafkaCluster;
_entityType = entityType;
+ _keySerdes = keySerdes;
+ _valueSerdes = valueSerdes;
_builder = builder;
_root = root;
_storageId = _kafkaCluster.Options.UsePersistentStorage ? storageId : Process.GetCurrentProcess().ProcessName + "-" + storageId;
@@ -141,7 +151,7 @@ public IEnumerator GetEnumerator()
{
if (resultException != null) throw resultException;
Trace.WriteLine("Requested KafkaEnumerator on " + DateTime.Now.ToString("HH:mm:ss.FFFFFFF"));
- return new KafkaEnumerator(_kafkaCluster, keyValueStore);
+ return new KafkaEnumerator(_kafkaCluster, _entityType, _keySerdes, _valueSerdes, keyValueStore);
}
System.Collections.IEnumerator System.Collections.IEnumerable.GetEnumerator()
@@ -167,15 +177,23 @@ public void Dispose()
class KafkaEnumerator : IEnumerator
{
private readonly IKafkaCluster _kafkaCluster;
+ private readonly IEntityType _entityType;
+ private readonly IKNetSerDes _keySerdes;
+ private readonly IKNetSerDes> _valueSerdes;
private readonly ReadOnlyKeyValueStore? _keyValueStore;
private KeyValueIterator? keyValueIterator = null;
private IEnumerator>? keyValueEnumerator = null;
- public KafkaEnumerator(IKafkaCluster kafkaCluster, ReadOnlyKeyValueStore? keyValueStore)
+ public KafkaEnumerator(IKafkaCluster kafkaCluster, IEntityType entityType, IKNetSerDes keySerdes, IKNetSerDes> valueSerdes, ReadOnlyKeyValueStore? keyValueStore)
{
if (kafkaCluster == null) throw new ArgumentNullException(nameof(kafkaCluster));
+ if (keySerdes == null) throw new ArgumentNullException(nameof(keySerdes));
+ if (valueSerdes == null) throw new ArgumentNullException(nameof(valueSerdes));
if (keyValueStore == null) throw new ArgumentNullException(nameof(keyValueStore));
_kafkaCluster = kafkaCluster;
+ _entityType = entityType;
+ _keySerdes = keySerdes;
+ _valueSerdes = valueSerdes;
_keyValueStore = keyValueStore;
Trace.WriteLine($"KafkaEnumerator - ApproximateNumEntries {_keyValueStore?.ApproximateNumEntries()}");
keyValueIterator = _keyValueStore?.All();
@@ -190,8 +208,13 @@ public ValueBuffer Current
{
var kv = keyValueEnumerator.Current;
object? v = kv.value;
- var data = _kafkaCluster.SerdesFactory.Deserialize(v as byte[]);
- return new ValueBuffer(data);
+ KNetEntityTypeData entityTypeData = _valueSerdes.DeserializeWithHeaders(null, null, v as byte[]);
+ var data = new ValueBuffer(entityTypeData.GetData(_entityType));
+ if (data.IsEmpty)
+ {
+ throw new InvalidOperationException("Data is Empty");
+ }
+ return data;
}
throw new InvalidOperationException("InvalidEnumerator");
}
diff --git a/src/net/KEFCore/Storage/Internal/KafkaStreamsTableRetriever.cs b/src/net/KEFCore/Storage/Internal/KafkaStreamsTableRetriever.cs
index 3776a884..62d2ee3a 100644
--- a/src/net/KEFCore/Storage/Internal/KafkaStreamsTableRetriever.cs
+++ b/src/net/KEFCore/Storage/Internal/KafkaStreamsTableRetriever.cs
@@ -18,19 +18,20 @@
#nullable enable
+using MASES.KNet.Serialization;
using Org.Apache.Kafka.Streams;
namespace MASES.EntityFrameworkCore.KNet.Storage.Internal
{
public sealed class KafkaStreamsTableRetriever : KafkaStreamsBaseRetriever
{
- public KafkaStreamsTableRetriever(IKafkaCluster kafkaCluster, IEntityType entityType)
- : this(kafkaCluster, entityType, new StreamsBuilder())
+ public KafkaStreamsTableRetriever(IKafkaCluster kafkaCluster, IEntityType entityType, IKNetSerDes keySerdes, IKNetSerDes> valueSerdes)
+ : this(kafkaCluster, entityType, keySerdes, valueSerdes, new StreamsBuilder())
{
}
- public KafkaStreamsTableRetriever(IKafkaCluster kafkaCluster, IEntityType entityType, StreamsBuilder builder)
- : base(kafkaCluster, entityType, entityType.StorageIdForTable(kafkaCluster.Options), builder, builder.Stream(entityType.TopicName(kafkaCluster.Options)))
+ public KafkaStreamsTableRetriever(IKafkaCluster kafkaCluster, IEntityType entityType, IKNetSerDes keySerdes, IKNetSerDes> valueSerdes, StreamsBuilder builder)
+ : base(kafkaCluster, entityType, keySerdes, valueSerdes, entityType.StorageIdForTable(kafkaCluster.Options), builder, builder.Stream(entityType.TopicName(kafkaCluster.Options)))
{
}
}
diff --git a/src/net/KEFCore/Storage/Internal/KafkaTable.cs b/src/net/KEFCore/Storage/Internal/KafkaTable.cs
index 5b8b5a12..856a807e 100644
--- a/src/net/KEFCore/Storage/Internal/KafkaTable.cs
+++ b/src/net/KEFCore/Storage/Internal/KafkaTable.cs
@@ -24,14 +24,8 @@
using MASES.EntityFrameworkCore.KNet.ValueGeneration.Internal;
using Java.Util.Concurrent;
using MASES.EntityFrameworkCore.KNet.Serdes.Internal;
-using MASES.KNet.Producer;
using Org.Apache.Kafka.Clients.Producer;
using Org.Apache.Kafka.Common.Header;
-using Org.Apache.Kafka.Connect.Transforms;
-using MASES.KNet;
-using Microsoft.EntityFrameworkCore.Metadata.Internal;
-using Org.Apache.Kafka.Common;
-using MASES.KNet.Replicator;
namespace MASES.EntityFrameworkCore.KNet.Storage.Internal;
@@ -45,12 +39,10 @@ public class KafkaTable : IKafkaTable
private readonly IList<(int, ValueComparer)>? _valueComparers;
private Dictionary? _integerGenerators;
-
- private readonly IKNetCompactedReplicator _kafkaCompactedReplicator;
- private readonly IProducer _kafkaProducer;
+ readonly IEntityTypeProducer _producer;
private readonly string _tableAssociatedTopicName;
private readonly IKafkaSerdesEntityType _serdes;
- private readonly KafkaStreamsTableRetriever _streamData;
+
public KafkaTable(
IKafkaCluster cluster,
@@ -59,20 +51,9 @@ public KafkaTable(
{
Cluster = cluster;
EntityType = entityType;
- _tableAssociatedTopicName = entityType.TopicName(cluster.Options);
- cluster.CreateTable(entityType);
- _serdes = cluster.CreateSerdes(entityType);
- if (cluster.Options.UseCompactedReplicator)
- {
- _kafkaCompactedReplicator = cluster.CreateCompactedReplicator(entityType);
- _kafkaCompactedReplicator.Start();
- }
- else
- {
- _kafkaProducer = cluster.CreateProducer(entityType);
- _streamData = new KafkaStreamsTableRetriever(cluster, entityType);
- }
-
+ _tableAssociatedTopicName = Cluster.CreateTable(entityType);
+ _serdes = Cluster.SerdesFactory.GetOrCreate(entityType);
+ _producer = EntityTypeProducers.Create(entityType, Cluster);
_keyValueFactory = entityType.FindPrimaryKey()!.GetPrincipalKeyValueFactory();
_sensitiveLoggingEnabled = sensitiveLoggingEnabled;
_rows = new Dictionary(_keyValueFactory.EqualityComparer);
@@ -99,15 +80,7 @@ public KafkaTable(
public virtual void Dispose()
{
- if (Cluster.Options.UseCompactedReplicator)
- {
- _kafkaCompactedReplicator?.Dispose();
- }
- else
- {
- _kafkaProducer?.Dispose();
- _streamData?.Dispose();
- }
+ _producer?.Dispose();
}
public virtual IKafkaCluster Cluster { get; }
@@ -138,17 +111,9 @@ public virtual KafkaIntegerValueGenerator GetIntegerValueGenerator)generator;
}
- public virtual IEnumerable ValueBuffers => GetValueBuffer();
-
- private IEnumerable GetValueBuffer()
- {
- if (_streamData != null) return _streamData;
- _kafkaCompactedReplicator.SyncWait();
- return _kafkaCompactedReplicator.Values.Select((item) => new ValueBuffer(_serdes.Deserialize(item)));
- }
+ public virtual IEnumerable ValueBuffers => _producer.GetValueBuffer();
- public virtual IEnumerable Rows
- => RowsInTable();
+ public virtual IEnumerable Rows => RowsInTable();
public virtual IReadOnlyList SnapshotRows()
{
@@ -184,22 +149,18 @@ public virtual IEnumerable Rows
return rows;
}
- private IEnumerable RowsInTable()
- {
- return _rows.Values;
- }
+ private IEnumerable RowsInTable() => _rows.Values;
- private static System.Collections.Generic.List GetKeyComparers(IEnumerable properties)
- => properties.Select(p => p.GetKeyValueComparer()).ToList();
+ private static List GetKeyComparers(IEnumerable properties) => properties.Select(p => p.GetKeyValueComparer()).ToList();
public virtual IKafkaRowBag Create(IUpdateEntry entry)
{
- var properties = entry.EntityType.GetProperties().ToList();
- var row = new object?[properties.Count];
- var nullabilityErrors = new System.Collections.Generic.List();
+ var properties = entry.EntityType.GetProperties().ToArray();
+ var row = new object?[properties.Length];
+ var nullabilityErrors = new List();
var key = CreateKey(entry);
- for (var index = 0; index < properties.Count; index++)
+ for (var index = 0; index < properties.Length; index++)
{
var propertyValue = SnapshotValue(properties[index], properties[index].GetKeyValueComparer(), entry);
@@ -216,7 +177,7 @@ public virtual IKafkaRowBag Create(IUpdateEntry entry)
BumpValueGenerators(row);
- return new KafkaRowBag(entry, key, row);
+ return new KafkaRowBag(entry, _tableAssociatedTopicName, key, properties, row);
}
public virtual IKafkaRowBag Delete(IUpdateEntry entry)
@@ -225,10 +186,10 @@ public virtual IKafkaRowBag Delete(IUpdateEntry entry)
if (_rows.TryGetValue(key, out var row))
{
- var properties = entry.EntityType.GetProperties().ToList();
+ var properties = entry.EntityType.GetProperties().ToArray();
var concurrencyConflicts = new Dictionary();
- for (var index = 0; index < properties.Count; index++)
+ for (var index = 0; index < properties.Length; index++)
{
IsConcurrencyConflict(entry, properties[index], row[index], concurrencyConflicts);
}
@@ -240,7 +201,7 @@ public virtual IKafkaRowBag Delete(IUpdateEntry entry)
_rows.Remove(key);
- return new KafkaRowBag(entry, key, null);
+ return new KafkaRowBag(entry, _tableAssociatedTopicName, key, properties, null);
}
else
{
@@ -285,11 +246,11 @@ public virtual IKafkaRowBag Update(IUpdateEntry entry)
if (_rows.TryGetValue(key, out var row))
{
- var properties = entry.EntityType.GetProperties().ToList();
+ var properties = entry.EntityType.GetProperties().ToArray();
var comparers = GetKeyComparers(properties);
- var valueBuffer = new object?[properties.Count];
+ var valueBuffer = new object?[properties.Length];
var concurrencyConflicts = new Dictionary();
- var nullabilityErrors = new System.Collections.Generic.List();
+ var nullabilityErrors = new List();
for (var index = 0; index < valueBuffer.Length; index++)
{
@@ -322,7 +283,7 @@ public virtual IKafkaRowBag Update(IUpdateEntry entry)
BumpValueGenerators(valueBuffer);
- return new KafkaRowBag(entry, key, valueBuffer);
+ return new KafkaRowBag(entry, _tableAssociatedTopicName, key, properties, valueBuffer);
}
else
{
@@ -330,33 +291,35 @@ public virtual IKafkaRowBag Update(IUpdateEntry entry)
}
}
- public virtual IEnumerable> Commit(IEnumerable records)
- {
- if (Cluster.Options.UseCompactedReplicator)
- {
- foreach (KafkaRowBag record in records)
- {
- var key = record.GetKey(_serdes);
- var value = record.GetValue(_serdes);
- _kafkaCompactedReplicator[key] = value;
- }
-
- return null;
- }
- else
- {
- System.Collections.Generic.List> futures = new();
- foreach (KafkaRowBag record in records)
- {
- var future = _kafkaProducer.Send(record.GetRecord(_tableAssociatedTopicName, _serdes));
- futures.Add(future);
- }
-
- _kafkaProducer.Flush();
-
- return futures;
- }
- }
+ public virtual IEnumerable> Commit(IEnumerable records) => _producer.Commit(records);
+ //{
+ // return
+
+ // //if (Cluster.Options.UseCompactedReplicator)
+ // //{
+ // // foreach (KafkaRowBag record in records)
+ // // {
+ // // var key = record.GetKey(_serdes);
+ // // var value = record.GetValue(_serdes);
+ // // _kafkaCompactedReplicator[key] = value;
+ // // }
+
+ // // return null;
+ // //}
+ // //else
+ // //{
+ // // System.Collections.Generic.List> futures = new();
+ // // foreach (KafkaRowBag record in records)
+ // // {
+ // // var future = _kafkaProducer.Send(record.GetRecord(_tableAssociatedTopicName, _serdes));
+ // // futures.Add(future);
+ // // }
+
+ // // _kafkaProducer.Flush();
+
+ // // return futures;
+ // //}
+ //}
public virtual void BumpValueGenerators(object?[] row)
{
@@ -377,8 +340,7 @@ private ProducerRecord NewRecord(IUpdateEntry entry, TKey key, o
return record;
}
- private TKey CreateKey(IUpdateEntry entry)
- => _keyValueFactory.CreateFromCurrentValues(entry);
+ private TKey CreateKey(IUpdateEntry entry) => _keyValueFactory.CreateFromCurrentValues(entry);
private static object? SnapshotValue(IProperty property, ValueComparer? comparer, IUpdateEntry entry)
{
diff --git a/test/KEFCore.Test/Program.cs b/test/KEFCore.Test/Program.cs
index 8a783931..639ef169 100644
--- a/test/KEFCore.Test/Program.cs
+++ b/test/KEFCore.Test/Program.cs
@@ -25,6 +25,7 @@
using MASES.EntityFrameworkCore.KNet.Infrastructure;
using MASES.KNet.Streams;
using Microsoft.EntityFrameworkCore;
+using Microsoft.EntityFrameworkCore.Metadata.Internal;
using System;
using System.Collections.Generic;
using System.Diagnostics;
@@ -89,7 +90,7 @@ static void Main(string[] args)
var testWatcher = Stopwatch.StartNew();
Stopwatch watch = Stopwatch.StartNew();
- for (int i = 0; i < 1000; i++)
+ for (int i = 1; i <= 1000; i++)
{
context.Add(new Blog
{
@@ -229,7 +230,7 @@ public class Blog
{
public int BlogId { get; set; }
public string Url { get; set; }
- public long Rating { get; set; }
+ public int Rating { get; set; }
public List Posts { get; set; }
public override string ToString()
diff --git a/test/KEFCore.Test/ProgramConfig.cs b/test/KEFCore.Test/ProgramConfig.cs
index f17acf99..c8953176 100644
--- a/test/KEFCore.Test/ProgramConfig.cs
+++ b/test/KEFCore.Test/ProgramConfig.cs
@@ -30,6 +30,6 @@ partial class Program
public static bool UseModelBuilder = false;
public static bool UseCompactedReplicator = false;
public static string DatabaseName = null;
- static bool deleteApplication = false;
+ static bool deleteApplication = true;
}
}
From 4048b06a735faf19a5d56cf4ef9df0b4970f84c4 Mon Sep 17 00:00:00 2001
From: masesdevelopers <94312179+masesdevelopers@users.noreply.github.com>
Date: Sat, 30 Sep 2023 23:14:57 +0200
Subject: [PATCH 3/5] #59: removed ProducerByEntity because the new
EntityTypeProducer must be used with each Entity
---
.../Internal/IKafkaSingletonOptions.cs | 2 +-
.../Internal/KafkaOptionsExtension.cs | 18 ++++-----
.../Internal/KafkaSingletonOptions.cs | 2 +-
.../KEFCore/Infrastructure/KafkaDbContext.cs | 10 ++---
.../KafkaDbContextOptionsBuilder.cs | 40 +++++++++----------
.../Storage/Internal/EntityTypeProducer.cs | 22 +++++-----
.../Internal/KafkaStreamsBaseRetriever.cs | 2 +-
7 files changed, 48 insertions(+), 48 deletions(-)
diff --git a/src/net/KEFCore/Infrastructure/Internal/IKafkaSingletonOptions.cs b/src/net/KEFCore/Infrastructure/Internal/IKafkaSingletonOptions.cs
index 33b56157..d10ac5b8 100644
--- a/src/net/KEFCore/Infrastructure/Internal/IKafkaSingletonOptions.cs
+++ b/src/net/KEFCore/Infrastructure/Internal/IKafkaSingletonOptions.cs
@@ -34,7 +34,7 @@ public interface IKafkaSingletonOptions : ISingletonOptions
string? BootstrapServers { get; }
- bool ProducerByEntity { get; }
+ //bool ProducerByEntity { get; }
bool UseCompactedReplicator { get; }
diff --git a/src/net/KEFCore/Infrastructure/Internal/KafkaOptionsExtension.cs b/src/net/KEFCore/Infrastructure/Internal/KafkaOptionsExtension.cs
index 0580f82f..5d0d9e31 100644
--- a/src/net/KEFCore/Infrastructure/Internal/KafkaOptionsExtension.cs
+++ b/src/net/KEFCore/Infrastructure/Internal/KafkaOptionsExtension.cs
@@ -36,7 +36,7 @@ public class KafkaOptionsExtension : IDbContextOptionsExtension
private string? _databaseName;
private string? _applicationId;
private string? _bootstrapServers;
- private bool _producerByEntity = false;
+ //private bool _producerByEntity = false;
private bool _useCompactedReplicator = false;
private bool _usePersistentStorage = false;
private int _defaultNumPartitions = 1;
@@ -60,7 +60,7 @@ protected KafkaOptionsExtension(KafkaOptionsExtension copyFrom)
_databaseName = copyFrom._databaseName;
_applicationId = copyFrom._applicationId;
_bootstrapServers = copyFrom._bootstrapServers;
- _producerByEntity = copyFrom._producerByEntity;
+ //_producerByEntity = copyFrom._producerByEntity;
_useCompactedReplicator = copyFrom._useCompactedReplicator;
_usePersistentStorage = copyFrom._usePersistentStorage;
_defaultNumPartitions = copyFrom._defaultNumPartitions;
@@ -85,7 +85,7 @@ protected KafkaOptionsExtension(KafkaOptionsExtension copyFrom)
public virtual string BootstrapServers => _bootstrapServers!;
- public virtual bool ProducerByEntity => _producerByEntity;
+ //public virtual bool ProducerByEntity => _producerByEntity;
public virtual bool UseCompactedReplicator => _useCompactedReplicator;
@@ -139,14 +139,14 @@ public virtual KafkaOptionsExtension WithBootstrapServers(string bootstrapServer
return clone;
}
- public virtual KafkaOptionsExtension WithProducerByEntity(bool producerByEntity = false)
- {
- var clone = Clone();
+ //public virtual KafkaOptionsExtension WithProducerByEntity(bool producerByEntity = false)
+ //{
+ // var clone = Clone();
- clone._producerByEntity = producerByEntity;
+ // clone._producerByEntity = producerByEntity;
- return clone;
- }
+ // return clone;
+ //}
public virtual KafkaOptionsExtension WithCompactedReplicator(bool useCompactedReplicator = false)
{
diff --git a/src/net/KEFCore/Infrastructure/Internal/KafkaSingletonOptions.cs b/src/net/KEFCore/Infrastructure/Internal/KafkaSingletonOptions.cs
index ced2c8b6..ffae2fd6 100644
--- a/src/net/KEFCore/Infrastructure/Internal/KafkaSingletonOptions.cs
+++ b/src/net/KEFCore/Infrastructure/Internal/KafkaSingletonOptions.cs
@@ -34,7 +34,7 @@ public virtual void Initialize(IDbContextOptions options)
DatabaseName = kafkaOptions.DatabaseName;
ApplicationId = kafkaOptions.ApplicationId;
BootstrapServers = kafkaOptions.BootstrapServers;
- ProducerByEntity = kafkaOptions.ProducerByEntity;
+ //ProducerByEntity = kafkaOptions.ProducerByEntity;
UseCompactedReplicator = kafkaOptions.UseCompactedReplicator;
UsePersistentStorage = kafkaOptions.UsePersistentStorage;
DefaultNumPartitions = kafkaOptions.DefaultNumPartitions;
diff --git a/src/net/KEFCore/Infrastructure/KafkaDbContext.cs b/src/net/KEFCore/Infrastructure/KafkaDbContext.cs
index 7972863d..cdfd20d9 100644
--- a/src/net/KEFCore/Infrastructure/KafkaDbContext.cs
+++ b/src/net/KEFCore/Infrastructure/KafkaDbContext.cs
@@ -70,10 +70,10 @@ public KafkaDbContext(DbContextOptions options) : base(options)
/// Use persistent storage
///
public virtual bool UsePersistentStorage { get; set; } = false;
- ///
- /// Use a producer for each Entity
- ///
- public bool UseProducerByEntity { get; set; } = false;
+ /////
+ ///// Use a producer for each Entity
+ /////
+ //public bool UseProducerByEntity { get; set; } = false;
///
/// Use instead of Apache Kafka Streams
///
@@ -104,7 +104,7 @@ protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder)
{
o.StreamsConfig(StreamsConfigBuilder ?? o.EmptyStreamsConfigBuilder).WithDefaultNumPartitions(DefaultNumPartitions);
o.WithUsePersistentStorage(UsePersistentStorage);
- o.WithProducerByEntity(UseProducerByEntity);
+ //o.WithProducerByEntity(UseProducerByEntity);
o.WithCompactedReplicator(UseCompactedReplicator);
o.WithDefaultReplicationFactor(DefaultReplicationFactor);
});
diff --git a/src/net/KEFCore/Infrastructure/KafkaDbContextOptionsBuilder.cs b/src/net/KEFCore/Infrastructure/KafkaDbContextOptionsBuilder.cs
index 7271b5a3..0b43f008 100644
--- a/src/net/KEFCore/Infrastructure/KafkaDbContextOptionsBuilder.cs
+++ b/src/net/KEFCore/Infrastructure/KafkaDbContextOptionsBuilder.cs
@@ -93,26 +93,26 @@ public virtual KafkaDbContextOptionsBuilder WithUseNameMatching(bool useNameMatc
return this;
}
- ///
- /// Enables creation of producer for each
- ///
- ///
- /// See Using DbContextOptions , and
- /// The EF Core Kafka database provider for more information and examples.
- ///
- /// If , then each entity will have its own .
- /// The same builder instance so that multiple calls can be chained.
- public virtual KafkaDbContextOptionsBuilder WithProducerByEntity(bool producerByEntity = false)
- {
- var extension = OptionsBuilder.Options.FindExtension()
- ?? new KafkaOptionsExtension();
-
- extension = extension.WithProducerByEntity(producerByEntity);
-
- ((IDbContextOptionsBuilderInfrastructure)OptionsBuilder).AddOrUpdateExtension(extension);
-
- return this;
- }
+ /////
+ ///// Enables creation of producer for each
+ /////
+ /////
+ ///// See Using DbContextOptions , and
+ ///// The EF Core Kafka database provider for more information and examples.
+ /////
+ ///// If , then each entity will have its own .
+ ///// The same builder instance so that multiple calls can be chained.
+ //public virtual KafkaDbContextOptionsBuilder WithProducerByEntity(bool producerByEntity = false)
+ //{
+ // var extension = OptionsBuilder.Options.FindExtension()
+ // ?? new KafkaOptionsExtension();
+
+ // extension = extension.WithProducerByEntity(producerByEntity);
+
+ // ((IDbContextOptionsBuilderInfrastructure)OptionsBuilder).AddOrUpdateExtension(extension);
+
+ // return this;
+ //}
///
/// Enables use of
diff --git a/src/net/KEFCore/Storage/Internal/EntityTypeProducer.cs b/src/net/KEFCore/Storage/Internal/EntityTypeProducer.cs
index 4a8a71e3..eb54f420 100644
--- a/src/net/KEFCore/Storage/Internal/EntityTypeProducer.cs
+++ b/src/net/KEFCore/Storage/Internal/EntityTypeProducer.cs
@@ -40,18 +40,18 @@ public class EntityTypeProducers
public static IEntityTypeProducer Create(IEntityType entityType, IKafkaCluster cluster) where TKey : notnull
{
- if (!cluster.Options.ProducerByEntity)
- {
- lock (_producers)
- {
- if (_globalProducer == null) _globalProducer = CreateProducerLocal(entityType, cluster);
- return _globalProducer;
- }
- }
- else
- {
+ //if (!cluster.Options.ProducerByEntity)
+ //{
+ // lock (_producers)
+ // {
+ // if (_globalProducer == null) _globalProducer = CreateProducerLocal(entityType, cluster);
+ // return _globalProducer;
+ // }
+ //}
+ //else
+ //{
return _producers.GetOrAdd(entityType, _ => CreateProducerLocal(entityType, cluster));
- }
+ //}
}
static IEntityTypeProducer CreateProducerLocal(IEntityType entityType, IKafkaCluster cluster) where TKey : notnull => new EntityTypeProducer(entityType, cluster);
diff --git a/src/net/KEFCore/Storage/Internal/KafkaStreamsBaseRetriever.cs b/src/net/KEFCore/Storage/Internal/KafkaStreamsBaseRetriever.cs
index 41fea0d3..749d609c 100644
--- a/src/net/KEFCore/Storage/Internal/KafkaStreamsBaseRetriever.cs
+++ b/src/net/KEFCore/Storage/Internal/KafkaStreamsBaseRetriever.cs
@@ -195,7 +195,7 @@ public KafkaEnumerator(IKafkaCluster kafkaCluster, IEntityType entityType, IKNet
_keySerdes = keySerdes;
_valueSerdes = valueSerdes;
_keyValueStore = keyValueStore;
- Trace.WriteLine($"KafkaEnumerator - ApproximateNumEntries {_keyValueStore?.ApproximateNumEntries()}");
+ Trace.WriteLine($"KafkaEnumerator for {_entityType.Name} - ApproximateNumEntries {_keyValueStore?.ApproximateNumEntries()}");
keyValueIterator = _keyValueStore?.All();
keyValueEnumerator = keyValueIterator?.ToIEnumerator();
}
From 244c021aeca5ae4023dc2b63a64b65e743567149 Mon Sep 17 00:00:00 2001
From: masesdevelopers <94312179+masesdevelopers@users.noreply.github.com>
Date: Sun, 1 Oct 2023 14:39:26 +0200
Subject: [PATCH 4/5] Code cleanup
---
.../Storage/Internal/EntityTypeProducer.cs | 88 +------------------
.../KEFCore/Storage/Internal/IKafkaCluster.cs | 1 -
.../KEFCore/Storage/Internal/IKafkaTable.cs | 1 -
.../KEFCore/Storage/Internal/KafkaCluster.cs | 57 ------------
.../Internal/KafkaStreamsBaseRetriever.cs | 2 -
.../KEFCore/Storage/Internal/KafkaTable.cs | 37 --------
.../Storage/Internal/KafkaTableFactory.cs | 1 -
7 files changed, 2 insertions(+), 185 deletions(-)
diff --git a/src/net/KEFCore/Storage/Internal/EntityTypeProducer.cs b/src/net/KEFCore/Storage/Internal/EntityTypeProducer.cs
index eb54f420..2b8a84bc 100644
--- a/src/net/KEFCore/Storage/Internal/EntityTypeProducer.cs
+++ b/src/net/KEFCore/Storage/Internal/EntityTypeProducer.cs
@@ -57,90 +57,6 @@ public static IEntityTypeProducer Create(IEntityType entityType, IKafkaClu
static IEntityTypeProducer CreateProducerLocal(IEntityType entityType, IKafkaCluster cluster) where TKey : notnull => new EntityTypeProducer(entityType, cluster);
}
-//public class ListStringObjectTupleConverter : JsonConverterFactory
-//{
-// public override bool CanConvert(Type typeToConvert)
-// {
-// if (typeToConvert != typeof(List<(Type, object)>))
-// {
-// return false;
-// }
-
-// return true;
-// }
-
-// public override JsonConverter CreateConverter(Type type, JsonSerializerOptions options)
-// {
-// return new ListStringObjectTupleConverterInner();
-// }
-
-// private class ListStringObjectTupleConverterInner : JsonConverter>
-// {
-// public override List<(Type, object)> Read(
-// ref Utf8JsonReader reader,
-// Type typeToConvert,
-// JsonSerializerOptions options)
-// {
-// if (reader.TokenType != JsonTokenType.StartObject)
-// {
-// throw new JsonException();
-// }
-
-// var dictionary = new List<(Type, object)>();
-
-// while (reader.Read())
-// {
-// if (reader.TokenType == JsonTokenType.EndObject)
-// {
-// return dictionary;
-// }
-
-// // Get the key.
-// if (reader.TokenType != JsonTokenType.PropertyName)
-// {
-// throw new JsonException();
-// }
-
-// string? propertyName = reader.GetString();
-// var type = Type.GetType(propertyName!);
-// if (type == null)
-// {
-// throw new JsonException($"Unable to convert \"{propertyName}\" to known CLR Type.");
-// }
-
-// var valueConverter = (JsonConverter)options.GetConverter(type);
-
-// // Get the value.
-// reader.Read();
-// object value = valueConverter.Read(ref reader, type, options)!;
-
-// // Add to dictionary.
-// dictionary.Add((type, value));
-// }
-
-// throw new JsonException();
-// }
-
-// public override void Write(
-// Utf8JsonWriter writer,
-// List<(Type, object)> dictionary,
-// JsonSerializerOptions options)
-// {
-// writer.WriteStartObject();
-// foreach ((Type key, object value) in dictionary)
-// {
-// var propertyName = key.FullName.ToString();
-// writer.WritePropertyName(options.PropertyNamingPolicy?.ConvertName(propertyName) ?? propertyName);
-// JsonConverter valueConverter = options.GetConverter(key);
-// valueConverter.
-// valueConverter.Write(writer, value, options);
-// }
-
-// writer.WriteEndObject();
-// }
-// }
-//}
-
[JsonSerializable(typeof(ObjectType))]
public class ObjectType : IJsonOnDeserialized
{
@@ -318,7 +234,7 @@ public EntityTypeProducer(IEntityType entityType, IKafkaCluster cluster)
}
}
- public IEnumerable> Commit(IEnumerable records)
+ public IEnumerable> Commit(IEnumerable records)
{
if (_useCompactedReplicator)
{
@@ -332,7 +248,7 @@ public EntityTypeProducer(IEntityType entityType, IKafkaCluster cluster)
}
else
{
- System.Collections.Generic.List> futures = new();
+ List> futures = new();
foreach (KafkaRowBag record in records)
{
var future = _kafkaProducer?.Send(new KNetProducerRecord>(record.AssociatedTopicName, 0, record.Key, record.Value!));
diff --git a/src/net/KEFCore/Storage/Internal/IKafkaCluster.cs b/src/net/KEFCore/Storage/Internal/IKafkaCluster.cs
index 1fe0b885..985695e8 100644
--- a/src/net/KEFCore/Storage/Internal/IKafkaCluster.cs
+++ b/src/net/KEFCore/Storage/Internal/IKafkaCluster.cs
@@ -19,7 +19,6 @@
using MASES.EntityFrameworkCore.KNet.Infrastructure.Internal;
using MASES.EntityFrameworkCore.KNet.Serdes.Internal;
using MASES.EntityFrameworkCore.KNet.ValueGeneration.Internal;
-using MASES.KNet.Replicator;
namespace MASES.EntityFrameworkCore.KNet.Storage.Internal;
diff --git a/src/net/KEFCore/Storage/Internal/IKafkaTable.cs b/src/net/KEFCore/Storage/Internal/IKafkaTable.cs
index e584c7bc..4242794b 100644
--- a/src/net/KEFCore/Storage/Internal/IKafkaTable.cs
+++ b/src/net/KEFCore/Storage/Internal/IKafkaTable.cs
@@ -21,7 +21,6 @@
using MASES.EntityFrameworkCore.KNet.ValueGeneration.Internal;
using Java.Util.Concurrent;
using Org.Apache.Kafka.Clients.Producer;
-using MASES.KNet.Producer;
namespace MASES.EntityFrameworkCore.KNet.Storage.Internal;
diff --git a/src/net/KEFCore/Storage/Internal/KafkaCluster.cs b/src/net/KEFCore/Storage/Internal/KafkaCluster.cs
index 4f1f8e62..c6d6f97e 100644
--- a/src/net/KEFCore/Storage/Internal/KafkaCluster.cs
+++ b/src/net/KEFCore/Storage/Internal/KafkaCluster.cs
@@ -23,20 +23,10 @@
using MASES.EntityFrameworkCore.KNet.Infrastructure.Internal;
using Java.Util;
using MASES.EntityFrameworkCore.KNet.Serdes.Internal;
-using System.Collections.Concurrent;
using Java.Util.Concurrent;
-using MASES.KNet.Producer;
using Org.Apache.Kafka.Clients.Admin;
-using Org.Apache.Kafka.Common.Config;
-using Org.Apache.Kafka.Clients.Producer;
using Org.Apache.Kafka.Common.Errors;
-using MASES.KNet.Serialization;
-using MASES.KNet.Extensions;
-using MASES.KNet;
-using Org.Apache.Kafka.Common;
-using MASES.KNet.Replicator;
using Org.Apache.Kafka.Tools;
-using MASES.EntityFrameworkCore.KNet.Query.Internal;
namespace MASES.EntityFrameworkCore.KNet.Storage.Internal;
@@ -216,53 +206,6 @@ public virtual string CreateTable(IEntityType entityType)
return topicName;
}
- //public virtual IKafkaSerdesEntityType CreateSerdes(IEntityType entityType) =>
-
- //public virtual IKNetCompactedReplicator CreateCompactedReplicator(IEntityType entityType)
- //{
- // lock (_lock)
- // {
- // return new KNetCompactedReplicator()
- // {
- // UpdateMode = UpdateModeTypes.OnConsume,
- // BootstrapServers = Options.BootstrapServers,
- // StateName = entityType.TopicName(Options),
- // Partitions = entityType.NumPartitions(Options),
- // ConsumerInstances = entityType.ConsumerInstances(Options),
- // ReplicationFactor = entityType.ReplicationFactor(Options),
- // TopicConfig = Options.TopicConfigBuilder,
- // ProducerConfig = Options.ProducerConfigBuilder,
- // };
- // }
- //}
-
- //public virtual IEntityTypeProducer CreateProducer(IEntityType entityType) => EntityTypeProducer.Create(entityType, Options);
- //{
- // return
-
-
- // //if (!Options.ProducerByEntity)
- // //{
- // // lock (_lock)
- // // {
- // // if (_globalProducer == null) _globalProducer = CreateProducerLocal(entityType);
- // // return _globalProducer;
- // // }
- // //}
- // //else
- // //{
- // // return _producers.GetOrAdd(entityType, _ => CreateProducerLocal(entityType));
- // //}
- //}
-
- // private IEntityTypeProducer CreateProducerLocal(IEntityType entityType) => EntityTypeProducer.Create(entityType, Options);
- //{
- // var type = typeof(KafkaProducer<,>).MakeGenericType(typeof(string), typeof(string));
- // var ctor = type.GetTypeInfo().DeclaredConstructors.Single(c => c.GetParameters().Length == 1 && c.GetParameters()[0].ParameterType == typeof(Properties));
- // return ctor.Invoke(new object[] { Options.ProducerOptions() });
- // new KafkaProducer(Options.ProducerOptions());
- //}
-
private static System.Collections.Generic.Dictionary CreateTables() => new();
public virtual IEnumerable GetData(IEntityType entityType)
diff --git a/src/net/KEFCore/Storage/Internal/KafkaStreamsBaseRetriever.cs b/src/net/KEFCore/Storage/Internal/KafkaStreamsBaseRetriever.cs
index 749d609c..51504bdb 100644
--- a/src/net/KEFCore/Storage/Internal/KafkaStreamsBaseRetriever.cs
+++ b/src/net/KEFCore/Storage/Internal/KafkaStreamsBaseRetriever.cs
@@ -18,14 +18,12 @@
#nullable enable
-using MASES.JCOBridge.C2JBridge;
using MASES.KNet.Serialization;
using Org.Apache.Kafka.Common.Utils;
using Org.Apache.Kafka.Streams;
using Org.Apache.Kafka.Streams.Errors;
using Org.Apache.Kafka.Streams.Kstream;
using Org.Apache.Kafka.Streams.State;
-using System.Collections;
using static Org.Apache.Kafka.Streams.Errors.StreamsUncaughtExceptionHandler;
using static Org.Apache.Kafka.Streams.KafkaStreams;
diff --git a/src/net/KEFCore/Storage/Internal/KafkaTable.cs b/src/net/KEFCore/Storage/Internal/KafkaTable.cs
index 856a807e..993da531 100644
--- a/src/net/KEFCore/Storage/Internal/KafkaTable.cs
+++ b/src/net/KEFCore/Storage/Internal/KafkaTable.cs
@@ -25,7 +25,6 @@
using Java.Util.Concurrent;
using MASES.EntityFrameworkCore.KNet.Serdes.Internal;
using Org.Apache.Kafka.Clients.Producer;
-using Org.Apache.Kafka.Common.Header;
namespace MASES.EntityFrameworkCore.KNet.Storage.Internal;
@@ -292,34 +291,6 @@ public virtual IKafkaRowBag Update(IUpdateEntry entry)
}
public virtual IEnumerable> Commit(IEnumerable records) => _producer.Commit(records);
- //{
- // return
-
- // //if (Cluster.Options.UseCompactedReplicator)
- // //{
- // // foreach (KafkaRowBag record in records)
- // // {
- // // var key = record.GetKey(_serdes);
- // // var value = record.GetValue(_serdes);
- // // _kafkaCompactedReplicator[key] = value;
- // // }
-
- // // return null;
- // //}
- // //else
- // //{
- // // System.Collections.Generic.List> futures = new();
- // // foreach (KafkaRowBag record in records)
- // // {
- // // var future = _kafkaProducer.Send(record.GetRecord(_tableAssociatedTopicName, _serdes));
- // // futures.Add(future);
- // // }
-
- // // _kafkaProducer.Flush();
-
- // // return futures;
- // //}
- //}
public virtual void BumpValueGenerators(object?[] row)
{
@@ -332,14 +303,6 @@ public virtual void BumpValueGenerators(object?[] row)
}
}
- private ProducerRecord NewRecord(IUpdateEntry entry, TKey key, object?[]? row)
- {
- Headers headers = Headers.Create();
- var record = new ProducerRecord(_tableAssociatedTopicName, 0, new System.DateTimeOffset(DateTime.Now).ToUnixTimeMilliseconds(), _serdes.Serialize(headers, key), _serdes.Serialize(headers, row), headers);
-
- return record;
- }
-
private TKey CreateKey(IUpdateEntry entry) => _keyValueFactory.CreateFromCurrentValues(entry);
private static object? SnapshotValue(IProperty property, ValueComparer? comparer, IUpdateEntry entry)
diff --git a/src/net/KEFCore/Storage/Internal/KafkaTableFactory.cs b/src/net/KEFCore/Storage/Internal/KafkaTableFactory.cs
index 3a25c639..10cd8d74 100644
--- a/src/net/KEFCore/Storage/Internal/KafkaTableFactory.cs
+++ b/src/net/KEFCore/Storage/Internal/KafkaTableFactory.cs
@@ -17,7 +17,6 @@
*/
using System.Collections.Concurrent;
-using JetBrains.Annotations;
using MASES.EntityFrameworkCore.KNet.Infrastructure.Internal;
namespace MASES.EntityFrameworkCore.KNet.Storage.Internal;
From 3b681c92c75c3d30b68a70b6e93023f3564fc454 Mon Sep 17 00:00:00 2001
From: masesdevelopers <94312179+masesdevelopers@users.noreply.github.com>
Date: Sun, 1 Oct 2023 14:46:13 +0200
Subject: [PATCH 5/5] Version upgrade
---
src/net/Common/Common.props | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/src/net/Common/Common.props b/src/net/Common/Common.props
index 927d080a..f061bd4a 100644
--- a/src/net/Common/Common.props
+++ b/src/net/Common/Common.props
@@ -4,7 +4,7 @@
MASES s.r.l.
MASES s.r.l.
MASES s.r.l.
- 0.7.2.0
+ 0.8.0.0
net6.0
latest
true