diff --git a/src/net/KEFCore/Extensions/KafkaServiceCollectionExtensions.cs b/src/net/KEFCore/Extensions/KafkaServiceCollectionExtensions.cs index c8cb5a04..1de4991f 100644 --- a/src/net/KEFCore/Extensions/KafkaServiceCollectionExtensions.cs +++ b/src/net/KEFCore/Extensions/KafkaServiceCollectionExtensions.cs @@ -23,7 +23,6 @@ using MASES.EntityFrameworkCore.KNet.Infrastructure.Internal; using MASES.EntityFrameworkCore.KNet.Metadata.Conventions; using MASES.EntityFrameworkCore.KNet.Query.Internal; -using MASES.EntityFrameworkCore.KNet.Serdes.Internal; using MASES.EntityFrameworkCore.KNet.Storage.Internal; using MASES.EntityFrameworkCore.KNet.ValueGeneration.Internal; using System.ComponentModel; @@ -72,8 +71,7 @@ public static IServiceCollection AddEntityFrameworkKafkaDatabase(this IServiceCo .TryAddSingleton() .TryAddSingleton() .TryAddSingleton() - .TryAddScoped() - .TryAddScoped()); + .TryAddScoped()); builder.TryAddCoreServices(); diff --git a/src/net/KEFCore/Query/Internal/KafkaQueryContext.cs b/src/net/KEFCore/Query/Internal/KafkaQueryContext.cs index 204fb846..876ce12d 100644 --- a/src/net/KEFCore/Query/Internal/KafkaQueryContext.cs +++ b/src/net/KEFCore/Query/Internal/KafkaQueryContext.cs @@ -32,7 +32,7 @@ public virtual IEnumerable GetValueBuffers(IEntityType entityType) { if (!_valueBuffersCache.TryGetValue(entityType, out var valueBuffers)) { - valueBuffers = Cluster.GetData(entityType); + valueBuffers = Cluster.GetValueBuffers(entityType); _valueBuffersCache[entityType] = valueBuffers; } @@ -40,9 +40,7 @@ public virtual IEnumerable GetValueBuffers(IEntityType entityType) return valueBuffers; } - public KafkaQueryContext( - QueryContextDependencies dependencies, - IKafkaCluster cluster) + public KafkaQueryContext(QueryContextDependencies dependencies, IKafkaCluster cluster) : base(dependencies) { Cluster = cluster; diff --git a/src/net/KEFCore/Serdes/Internal/IKafkaSerdesEntityType.cs b/src/net/KEFCore/Serdes/Internal/IKafkaSerdesEntityType.cs deleted file mode 100644 index 21b0c78d..00000000 --- a/src/net/KEFCore/Serdes/Internal/IKafkaSerdesEntityType.cs +++ /dev/null @@ -1,43 +0,0 @@ -/* -* Copyright 2022 MASES s.r.l. -* -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -* -* Refer to LICENSE for more information. -*/ - -using Org.Apache.Kafka.Common.Header; - -namespace MASES.EntityFrameworkCore.KNet.Serdes.Internal -{ - public interface IKafkaSerdesEntityType - { - string Serialize(params object?[]? args); - - string Serialize(Headers headers, params object?[]? args); - - string Serialize(TKey key); - - string Serialize(Headers headers, TKey key); - - object[] Deserialize(string arg); - - object[] Deserialize(Headers headers, string arg); - - TKey Deserialize(string arg); - - TKey Deserialize(Headers headers, string arg); - - object[] ConvertData(object[]? input); - } -} diff --git a/src/net/KEFCore/Serdes/Internal/IKafkaSerdesFactory.cs b/src/net/KEFCore/Serdes/Internal/IKafkaSerdesFactory.cs deleted file mode 100644 index 80b45b2f..00000000 --- a/src/net/KEFCore/Serdes/Internal/IKafkaSerdesFactory.cs +++ /dev/null @@ -1,31 +0,0 @@ -/* -* Copyright 2022 MASES s.r.l. -* -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -* -* Refer to LICENSE for more information. -*/ - -namespace MASES.EntityFrameworkCore.KNet.Serdes.Internal -{ - public interface IKafkaSerdesFactory - { - IKafkaSerdesEntityType GetOrCreate(IEntityType type); - - IKafkaSerdesEntityType Get(string typeName); - - object[] Deserialize(byte[] data); - - object[] Deserialize(string data); - } -} diff --git a/src/net/KEFCore/Serdes/Internal/KafkaSerdesEntityType.cs b/src/net/KEFCore/Serdes/Internal/KafkaSerdesEntityType.cs deleted file mode 100644 index 4a666b5c..00000000 --- a/src/net/KEFCore/Serdes/Internal/KafkaSerdesEntityType.cs +++ /dev/null @@ -1,125 +0,0 @@ -/* -* Copyright 2022 MASES s.r.l. -* -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -* -* Refer to LICENSE for more information. -*/ - -using Org.Apache.Kafka.Common.Header; -using System.Text.Json; -using System.Text.Json.Nodes; -using System.Text.Json.Serialization; - -namespace MASES.EntityFrameworkCore.KNet.Serdes.Internal -{ - [JsonSerializable(typeof(KafkaSerdesEntityTypeData))] - public class KafkaSerdesEntityTypeData - { - public KafkaSerdesEntityTypeData() { } - - public KafkaSerdesEntityTypeData(string tName, object[] rData) - { - typeName = tName; - data = rData; - } - [JsonInclude()] - public string typeName; - [JsonInclude()] - public object[] data; - } - - public class KafkaSerdesEntityType : IKafkaSerdesEntityType - { - private readonly IEntityType _type; - private readonly IProperty[] _properties; - - public KafkaSerdesEntityType(IEntityType type) - { - _type = type; - _properties = _type.GetProperties().ToArray(); - } - - public object[] Deserialize(string arg) - { - var des = GetFullType(arg); - return ConvertData(des!.data); - } - - public object[] Deserialize(Headers headers, string arg) - { - var des = GetFullType(arg); - return ConvertData(des!.data); - } - - public TKey Deserialize(string arg) => System.Text.Json.JsonSerializer.Deserialize(arg)!; - - public TKey Deserialize(Headers headers, string arg) => System.Text.Json.JsonSerializer.Deserialize(arg)!; - - public string Serialize(params object?[]? args) => System.Text.Json.JsonSerializer.Serialize(new KafkaSerdesEntityTypeData(_type.Name, args!)); - - public string Serialize(Headers headers, params object?[]? args) => System.Text.Json.JsonSerializer.Serialize(new KafkaSerdesEntityTypeData(_type.Name, args!)); - - public string Serialize(TKey key) => System.Text.Json.JsonSerializer.Serialize(key); - - public string Serialize(Headers headers, TKey key) => System.Text.Json.JsonSerializer.Serialize(key); - - public static KafkaSerdesEntityTypeData? GetFullType(string arg) => System.Text.Json.JsonSerializer.Deserialize(arg); - - public object[] ConvertData(object[]? input) - { - if (input == null) return null; - List data = new List(); - - for (int i = 0; i < input!.Length; i++) - { - if (input[i] is JsonElement elem) - { - switch (elem.ValueKind) - { - case JsonValueKind.Undefined: - break; - case JsonValueKind.Object: - break; - case JsonValueKind.Array: - break; - case JsonValueKind.String: - data.Add(elem.GetString()); - break; - case JsonValueKind.Number: - var tmp = elem.GetInt64(); - data.Add(Convert.ChangeType(tmp, _properties[i].ClrType)); - break; - case JsonValueKind.True: - data.Add(true); - break; - case JsonValueKind.False: - data.Add(false); - break; - case JsonValueKind.Null: - data.Add(null); - break; - default: - break; - } - - } - else - { - data.Add(Convert.ChangeType(input[i], _properties[i].ClrType)); - } - } - return data.ToArray(); - } - } -} diff --git a/src/net/KEFCore/Serdes/Internal/KafkaSerdesFactory.cs b/src/net/KEFCore/Serdes/Internal/KafkaSerdesFactory.cs deleted file mode 100644 index 6ad829d9..00000000 --- a/src/net/KEFCore/Serdes/Internal/KafkaSerdesFactory.cs +++ /dev/null @@ -1,54 +0,0 @@ -/* -* Copyright 2022 MASES s.r.l. -* -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -* -* Refer to LICENSE for more information. -*/ - -using System.Collections.Concurrent; -using System.Text; - -namespace MASES.EntityFrameworkCore.KNet.Serdes.Internal; - -public class KafkaSerdesFactory : IKafkaSerdesFactory -{ - private readonly ConcurrentDictionary _serdes; - - public KafkaSerdesFactory( - ILoggingOptions loggingOptions) - { - _serdes = new ConcurrentDictionary(); - LoggingOptions = loggingOptions; - } - - public ILoggingOptions LoggingOptions { get; } - - public virtual IKafkaSerdesEntityType GetOrCreate(IEntityType type) - => _serdes.GetOrAdd(type.Name, _ => new KafkaSerdesEntityType(type)); - - public virtual IKafkaSerdesEntityType Get(string typeName) - => _serdes[typeName]; - - public virtual object[] Deserialize(byte[] data) - { - var str = Encoding.UTF8.GetString(data); - return Deserialize(str); - } - - public virtual object[] Deserialize(string data) - { - var fulltype = KafkaSerdesEntityType.GetFullType(data); - return Get(fulltype!.typeName!).ConvertData(fulltype.data); - } -} diff --git a/src/net/KEFCore/Storage/Internal/IKafkaCluster.cs b/src/net/KEFCore/Storage/Internal/IKafkaCluster.cs index 985695e8..8760a06c 100644 --- a/src/net/KEFCore/Storage/Internal/IKafkaCluster.cs +++ b/src/net/KEFCore/Storage/Internal/IKafkaCluster.cs @@ -17,38 +17,25 @@ */ using MASES.EntityFrameworkCore.KNet.Infrastructure.Internal; -using MASES.EntityFrameworkCore.KNet.Serdes.Internal; using MASES.EntityFrameworkCore.KNet.ValueGeneration.Internal; namespace MASES.EntityFrameworkCore.KNet.Storage.Internal; -public interface IKafkaCluster :IDisposable +public interface IKafkaCluster : IDisposable { - bool EnsureDeleted( - IUpdateAdapterFactory updateAdapterFactory, - IModel designModel, - IDiagnosticsLogger updateLogger); + bool EnsureDeleted(IUpdateAdapterFactory updateAdapterFactory, IModel designModel, IDiagnosticsLogger updateLogger); - bool EnsureCreated( - IUpdateAdapterFactory updateAdapterFactory, - IModel designModel, - IDiagnosticsLogger updateLogger); + bool EnsureCreated(IUpdateAdapterFactory updateAdapterFactory, IModel designModel, IDiagnosticsLogger updateLogger); - bool EnsureConnected( - IModel designModel, - IDiagnosticsLogger updateLogger); + bool EnsureConnected(IModel designModel, IDiagnosticsLogger updateLogger); string CreateTable(IEntityType entityType); - IKafkaSerdesFactory SerdesFactory { get; } - - IEnumerable GetData(IEntityType entityType); + IEnumerable GetValueBuffers(IEntityType entityType); KafkaIntegerValueGenerator GetIntegerValueGenerator(IProperty property); - int ExecuteTransaction( - IList entries, - IDiagnosticsLogger updateLogger); + int ExecuteTransaction(IList entries, IDiagnosticsLogger updateLogger); KafkaOptionsExtension Options { get; } } diff --git a/src/net/KEFCore/Storage/Internal/IKafkaRowBag.cs b/src/net/KEFCore/Storage/Internal/IKafkaRowBag.cs index ab72f9a0..9ea24303 100644 --- a/src/net/KEFCore/Storage/Internal/IKafkaRowBag.cs +++ b/src/net/KEFCore/Storage/Internal/IKafkaRowBag.cs @@ -18,8 +18,6 @@ #nullable enable -using MASES.EntityFrameworkCore.KNet.Serdes.Internal; - namespace MASES.EntityFrameworkCore.KNet.Storage.Internal; public interface IKafkaRowBag diff --git a/src/net/KEFCore/Storage/Internal/IKafkaTable.cs b/src/net/KEFCore/Storage/Internal/IKafkaTable.cs index 4242794b..e3db233c 100644 --- a/src/net/KEFCore/Storage/Internal/IKafkaTable.cs +++ b/src/net/KEFCore/Storage/Internal/IKafkaTable.cs @@ -24,12 +24,10 @@ namespace MASES.EntityFrameworkCore.KNet.Storage.Internal; -public interface IKafkaTable : IDisposable +public interface IKafkaTable : IEntityTypeProducer { IReadOnlyList SnapshotRows(); - IEnumerable ValueBuffers { get; } - IEnumerable Rows { get; } IKafkaRowBag Create(IUpdateEntry entry); @@ -38,11 +36,7 @@ public interface IKafkaTable : IDisposable IKafkaRowBag Update(IUpdateEntry entry); - IEnumerable> Commit(IEnumerable records); - - KafkaIntegerValueGenerator GetIntegerValueGenerator( - IProperty property, - IReadOnlyList tables); + KafkaIntegerValueGenerator GetIntegerValueGenerator(IProperty property, IReadOnlyList tables); void BumpValueGenerators(object?[] row); diff --git a/src/net/KEFCore/Storage/Internal/KafkaClusterCache.cs b/src/net/KEFCore/Storage/Internal/KafkaClusterCache.cs index 89581ee3..eaf81524 100644 --- a/src/net/KEFCore/Storage/Internal/KafkaClusterCache.cs +++ b/src/net/KEFCore/Storage/Internal/KafkaClusterCache.cs @@ -18,26 +18,22 @@ using System.Collections.Concurrent; using MASES.EntityFrameworkCore.KNet.Infrastructure.Internal; -using MASES.EntityFrameworkCore.KNet.Serdes.Internal; namespace MASES.EntityFrameworkCore.KNet.Storage.Internal; public class KafkaClusterCache : IKafkaClusterCache { private readonly IKafkaTableFactory _tableFactory; - private readonly IKafkaSerdesFactory _serdesFactory; private readonly ConcurrentDictionary _namedClusters; public KafkaClusterCache( IKafkaTableFactory tableFactory, - IKafkaSerdesFactory serdesFactory, IKafkaSingletonOptions? options) { _tableFactory = tableFactory; - _serdesFactory = serdesFactory; _namedClusters = new ConcurrentDictionary(); } public virtual IKafkaCluster GetCluster(KafkaOptionsExtension options) - => _namedClusters.GetOrAdd(options.ClusterId, _ => new KafkaCluster(options, _tableFactory, _serdesFactory)); + => _namedClusters.GetOrAdd(options.ClusterId, _ => new KafkaCluster(options, _tableFactory)); } diff --git a/src/net/KEFCore/Storage/Internal/KafkaTable.cs b/src/net/KEFCore/Storage/Internal/KafkaTable.cs index 993da531..6791aed1 100644 --- a/src/net/KEFCore/Storage/Internal/KafkaTable.cs +++ b/src/net/KEFCore/Storage/Internal/KafkaTable.cs @@ -23,7 +23,6 @@ using MASES.EntityFrameworkCore.KNet.Internal; using MASES.EntityFrameworkCore.KNet.ValueGeneration.Internal; using Java.Util.Concurrent; -using MASES.EntityFrameworkCore.KNet.Serdes.Internal; using Org.Apache.Kafka.Clients.Producer; namespace MASES.EntityFrameworkCore.KNet.Storage.Internal; @@ -40,8 +39,6 @@ public class KafkaTable : IKafkaTable private Dictionary? _integerGenerators; readonly IEntityTypeProducer _producer; private readonly string _tableAssociatedTopicName; - private readonly IKafkaSerdesEntityType _serdes; - public KafkaTable( IKafkaCluster cluster, @@ -51,7 +48,6 @@ public KafkaTable( Cluster = cluster; EntityType = entityType; _tableAssociatedTopicName = Cluster.CreateTable(entityType); - _serdes = Cluster.SerdesFactory.GetOrCreate(entityType); _producer = EntityTypeProducers.Create(entityType, Cluster); _keyValueFactory = entityType.FindPrimaryKey()!.GetPrincipalKeyValueFactory(); _sensitiveLoggingEnabled = sensitiveLoggingEnabled; @@ -110,7 +106,9 @@ public virtual KafkaIntegerValueGenerator GetIntegerValueGenerator)generator; } - public virtual IEnumerable ValueBuffers => _producer.GetValueBuffer(); + public virtual IEnumerable> Commit(IEnumerable records) => _producer.Commit(records); + + public virtual IEnumerable ValueBuffers => _producer.ValueBuffers; public virtual IEnumerable Rows => RowsInTable(); @@ -290,8 +288,6 @@ public virtual IKafkaRowBag Update(IUpdateEntry entry) } } - public virtual IEnumerable> Commit(IEnumerable records) => _producer.Commit(records); - public virtual void BumpValueGenerators(object?[] row) { if (_integerGenerators != null)