From fe5710627e19f25f3872a4591ab3617682c8f1be Mon Sep 17 00:00:00 2001 From: masesdevelopers <94312179+masesdevelopers@users.noreply.github.com> Date: Wed, 4 Oct 2023 16:22:58 +0200 Subject: [PATCH 1/2] Code update based on https://github.com/masesgroup/KEFCore/issues/65#issuecomment-1746962933 --- .../Storage/Internal/EntityTypeProducer.cs | 111 ++++++----- .../Storage/Internal/IEntityTypeProducer.cs | 2 +- .../KEFCore/Storage/Internal/KafkaCluster.cs | 34 ++-- .../Internal/KafkaStreamsBaseRetriever.cs | 177 +++++++++++------- .../Internal/KafkaStreamsTableRetriever.cs | 4 +- 5 files changed, 186 insertions(+), 142 deletions(-) diff --git a/src/net/KEFCore/Storage/Internal/EntityTypeProducer.cs b/src/net/KEFCore/Storage/Internal/EntityTypeProducer.cs index 2b8a84bc..9f9fac84 100644 --- a/src/net/KEFCore/Storage/Internal/EntityTypeProducer.cs +++ b/src/net/KEFCore/Storage/Internal/EntityTypeProducer.cs @@ -16,10 +16,11 @@ * Refer to LICENSE for more information. */ +// #define DEBUG_PERFORMANCE + #nullable enable using Java.Util.Concurrent; -using MASES.EntityFrameworkCore.KNet.Serdes.Internal; using MASES.KNet.Producer; using MASES.KNet.Replicator; using MASES.KNet.Serialization; @@ -28,8 +29,6 @@ using MASES.KNet.Serialization.Json; using Org.Apache.Kafka.Clients.Producer; using System.Text.Json; -using Javax.Xml.Crypto; -using Microsoft.EntityFrameworkCore.Storage.ValueConversion; namespace MASES.EntityFrameworkCore.KNet.Storage.Internal; @@ -50,7 +49,7 @@ public static IEntityTypeProducer Create(IEntityType entityType, IKafkaClu //} //else //{ - return _producers.GetOrAdd(entityType, _ => CreateProducerLocal(entityType, cluster)); + return _producers.GetOrAdd(entityType, _ => CreateProducerLocal(entityType, cluster)); //} } @@ -114,8 +113,13 @@ public void OnDeserialized() public object Value { get; set; } } +public interface IEntityTypeData +{ + void GetData(IEntityType tName, ref object[] array); +} + [JsonSerializable(typeof(KNetEntityTypeData<>))] -public class KNetEntityTypeData +public class KNetEntityTypeData : IEntityTypeData { public KNetEntityTypeData() { } @@ -130,59 +134,41 @@ public KNetEntityTypeData(IEntityType tName, IProperty[] properties, object[] rD } public string TypeName { get; set; } - // [JsonConverter(typeof(ListStringObjectTupleConverter))] + public Dictionary Data { get; set; } - public object[] GetData(IEntityType tName) + public void GetData(IEntityType tName, ref object[] array) { - if (Data == null) return null; - - var array = Data.Select((o) => o.Value.Value).ToArray(); - - return array; - - var _properties = tName.GetProperties().ToArray(); - List data = new List(); - - for (int i = 0; i < Data!.Count; i++) +#if DEBUG_PERFORMANCE + Stopwatch fullSw = new Stopwatch(); + Stopwatch newSw = new Stopwatch(); + Stopwatch iterationSw = new Stopwatch(); + try { - if (Data[i].Value is JsonElement elem) - { - switch (elem.ValueKind) - { - case JsonValueKind.Undefined: - break; - case JsonValueKind.Object: - break; - case JsonValueKind.Array: - break; - case JsonValueKind.String: - data.Add(elem.GetString()); - break; - case JsonValueKind.Number: - var tmp = elem.GetInt64(); - data.Add(Convert.ChangeType(tmp, _properties[i].ClrType)); - break; - case JsonValueKind.True: - data.Add(true); - break; - case JsonValueKind.False: - data.Add(false); - break; - case JsonValueKind.Null: - data.Add(null); - break; - default: - break; - } - - } - else - { - data.Add(Convert.ChangeType(Data[i], _properties[i].ClrType)); - } + fullSw.Start(); +#endif + if (Data == null) { return; } +#if DEBUG_PERFORMANCE + newSw.Start(); +#endif + array = new object[Data.Count]; +#if DEBUG_PERFORMANCE + newSw.Stop(); + iterationSw.Start(); +#endif + for (int i = 0; i < Data.Count; i++) + { + array[i] = Data[i].Value; + } +#if DEBUG_PERFORMANCE + iterationSw.Stop(); + fullSw.Stop(); } - return data.ToArray(); + finally + { + Trace.WriteLine($"Time to GetData with length {Data.Count}: {fullSw.Elapsed} - new array took: {newSw.Elapsed} - Iteration took: {iterationSw.Elapsed}"); + } +#endif } } @@ -274,11 +260,20 @@ public void Dispose() } } - public IEnumerable GetValueBuffer() + public IEnumerable ValueBuffers { - if (_streamData != null) return _streamData; - _kafkaCompactedReplicator?.SyncWait(); - if (_kafkaCompactedReplicator == null) throw new InvalidOperationException("Missing _kafkaCompactedReplicator"); - return _kafkaCompactedReplicator.Values.Select((item) => new ValueBuffer(item.GetData(_entityType))); + get + { + if (_streamData != null) return _streamData; + _kafkaCompactedReplicator?.SyncWait(); + if (_kafkaCompactedReplicator == null) throw new InvalidOperationException("Missing _kafkaCompactedReplicator"); + return _kafkaCompactedReplicator.Values.Select((item) => + { + object[] array = null; + item.GetData(_entityType, ref array); + return new ValueBuffer(array); + } + ); + } } } diff --git a/src/net/KEFCore/Storage/Internal/IEntityTypeProducer.cs b/src/net/KEFCore/Storage/Internal/IEntityTypeProducer.cs index 1a8eb72c..c2aca09e 100644 --- a/src/net/KEFCore/Storage/Internal/IEntityTypeProducer.cs +++ b/src/net/KEFCore/Storage/Internal/IEntityTypeProducer.cs @@ -27,5 +27,5 @@ public interface IEntityTypeProducer : IDisposable { IEnumerable> Commit(IEnumerable records); - IEnumerable GetValueBuffer(); + IEnumerable ValueBuffers { get; } } diff --git a/src/net/KEFCore/Storage/Internal/KafkaCluster.cs b/src/net/KEFCore/Storage/Internal/KafkaCluster.cs index c6d6f97e..997943cf 100644 --- a/src/net/KEFCore/Storage/Internal/KafkaCluster.cs +++ b/src/net/KEFCore/Storage/Internal/KafkaCluster.cs @@ -16,13 +16,14 @@ * Refer to LICENSE for more information. */ +// #define DEBUG_PERFORMANCE + #nullable enable using MASES.EntityFrameworkCore.KNet.ValueGeneration.Internal; using MASES.EntityFrameworkCore.KNet.Diagnostics.Internal; using MASES.EntityFrameworkCore.KNet.Infrastructure.Internal; using Java.Util; -using MASES.EntityFrameworkCore.KNet.Serdes.Internal; using Java.Util.Concurrent; using Org.Apache.Kafka.Clients.Admin; using Org.Apache.Kafka.Common.Errors; @@ -34,7 +35,6 @@ public class KafkaCluster : IKafkaCluster { private readonly KafkaOptionsExtension _options; private readonly IKafkaTableFactory _tableFactory; - private readonly IKafkaSerdesFactory _serdesFactory; private readonly bool _useNameMatching; private readonly IAdmin _kafkaAdminClient; @@ -42,14 +42,10 @@ public class KafkaCluster : IKafkaCluster private System.Collections.Generic.Dictionary? _tables; - public KafkaCluster( - KafkaOptionsExtension options, - IKafkaTableFactory tableFactory, - IKafkaSerdesFactory serdesFactory) + public KafkaCluster(KafkaOptionsExtension options, IKafkaTableFactory tableFactory) { _options = options; _tableFactory = tableFactory; - _serdesFactory = serdesFactory; _useNameMatching = options.UseNameMatching; Properties props = new(); props.Put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, _options.BootstrapServers); @@ -68,8 +64,6 @@ public virtual void Dispose() } } - public virtual IKafkaSerdesFactory SerdesFactory => _serdesFactory; - public virtual KafkaOptionsExtension Options => _options; public virtual KafkaIntegerValueGenerator GetIntegerValueGenerator( @@ -208,27 +202,31 @@ public virtual string CreateTable(IEntityType entityType) private static System.Collections.Generic.Dictionary CreateTables() => new(); - public virtual IEnumerable GetData(IEntityType entityType) + public virtual IEnumerable GetValueBuffers(IEntityType entityType) { - Stopwatch watcher = new(); lock (_lock) { +#if DEBUG_PERFORMANCE + Stopwatch watcher = new(); try { watcher.Start(); - EnsureTable(entityType); - var key = _useNameMatching ? (object)entityType.Name : entityType; - if (_tables != null && _tables.TryGetValue(key, out var table)) - { - return table.ValueBuffers; - } - throw new InvalidOperationException("No table available"); +#endif + EnsureTable(entityType); + var key = _useNameMatching ? (object)entityType.Name : entityType; + if (_tables != null && _tables.TryGetValue(key, out var table)) + { + return table.ValueBuffers; } + throw new InvalidOperationException("No table available"); +#if DEBUG_PERFORMANCE + } finally { watcher.Stop(); Trace.WriteLine("GetData - Execution time was " + watcher.ElapsedMilliseconds + " ms"); } +#endif } } diff --git a/src/net/KEFCore/Storage/Internal/KafkaStreamsBaseRetriever.cs b/src/net/KEFCore/Storage/Internal/KafkaStreamsBaseRetriever.cs index 51504bdb..e4c90c94 100644 --- a/src/net/KEFCore/Storage/Internal/KafkaStreamsBaseRetriever.cs +++ b/src/net/KEFCore/Storage/Internal/KafkaStreamsBaseRetriever.cs @@ -16,6 +16,8 @@ * Refer to LICENSE for more information. */ +// #define DEBUG_PERFORMANCE + #nullable enable using MASES.KNet.Serialization; @@ -25,7 +27,6 @@ using Org.Apache.Kafka.Streams.Kstream; using Org.Apache.Kafka.Streams.State; using static Org.Apache.Kafka.Streams.Errors.StreamsUncaughtExceptionHandler; -using static Org.Apache.Kafka.Streams.KafkaStreams; namespace MASES.EntityFrameworkCore.KNet.Storage.Internal { @@ -33,30 +34,31 @@ public interface IKafkaStreamsBaseRetriever : IEnumerable, IDisposa { } - public class KafkaStreamsBaseRetriever : IKafkaStreamsBaseRetriever + public class KafkaStreamsBaseRetriever : IKafkaStreamsBaseRetriever + where TValue : IEntityTypeData { private readonly IKafkaCluster _kafkaCluster; private readonly IEntityType _entityType; - private readonly IKNetSerDes _keySerdes; - private readonly IKNetSerDes> _valueSerdes; + private readonly IKNetSerDes _keySerdes; + private readonly IKNetSerDes _valueSerdes; private readonly StreamsBuilder _builder; private readonly KStream _root; - private readonly AutoResetEvent dataReceived = new(false); - private readonly AutoResetEvent resetEvent = new(false); - private readonly AutoResetEvent stateChanged = new(false); - private readonly AutoResetEvent exceptionSet = new(false); + private readonly AutoResetEvent _dataReceived = new(false); + private readonly AutoResetEvent _resetEvent = new(false); + private readonly AutoResetEvent _stateChanged = new(false); + private readonly AutoResetEvent _exceptionSet = new(false); - private KafkaStreams? streams = null; - private StreamsUncaughtExceptionHandler? errorHandler; - private StateListener? stateListener; + private KafkaStreams? _streams = null; + private StreamsUncaughtExceptionHandler? _errorHandler; + private KafkaStreams.StateListener? _stateListener; private readonly string _storageId; - private Exception? resultException = null; - private State actualState = State.NOT_RUNNING; + private Exception? _resultException = null; + private KafkaStreams.State _currentState = KafkaStreams.State.NOT_RUNNING; private ReadOnlyKeyValueStore? keyValueStore; - public KafkaStreamsBaseRetriever(IKafkaCluster kafkaCluster, IEntityType entityType, IKNetSerDes keySerdes, IKNetSerDes> valueSerdes, string storageId, StreamsBuilder builder, KStream root) + public KafkaStreamsBaseRetriever(IKafkaCluster kafkaCluster, IEntityType entityType, IKNetSerDes keySerdes, IKNetSerDes valueSerdes, string storageId, StreamsBuilder builder, KStream root) { _kafkaCluster = kafkaCluster; _entityType = entityType; @@ -75,30 +77,32 @@ private void StartTopology(StreamsBuilder builder, KStream root) var materialized = Materialized>.As(storeSupplier); root.ToTable(materialized); - streams = new(builder.Build(), _kafkaCluster.Options.StreamsOptions(_entityType)); + _streams = new(builder.Build(), _kafkaCluster.Options.StreamsOptions(_entityType)); - errorHandler = new() + _errorHandler = new() { OnHandle = (exception) => { - resultException = exception; - exceptionSet.Set(); + _resultException = exception; + _exceptionSet.Set(); return StreamThreadExceptionResponse.SHUTDOWN_APPLICATION; } }; - stateListener = new() + _stateListener = new() { OnOnChange = (newState, oldState) => { - actualState = newState; - Trace.WriteLine("StateListener oldState: " + oldState + " newState: " + newState + " on " + DateTime.Now.ToString("HH:mm:ss.FFFFFFF")); - if (stateChanged != null && !stateChanged.SafeWaitHandle.IsClosed) stateChanged.Set(); + _currentState = newState; +#if DEBUG_PERFORMANCE + Trace.WriteLine($"StateListener oldState: {oldState} newState: {newState} on {DateTime.Now:HH:mm:ss.FFFFFFF}"); +#endif + if (_stateChanged != null && !_stateChanged.SafeWaitHandle.IsClosed) _stateChanged.Set(); } }; - streams.SetUncaughtExceptionHandler(errorHandler); - streams.SetStateListener(stateListener); + _streams.SetUncaughtExceptionHandler(_errorHandler); + _streams.SetStateListener(_stateListener); ThreadPool.QueueUserWorkItem((o) => { @@ -106,18 +110,20 @@ private void StartTopology(StreamsBuilder builder, KStream root) Stopwatch watcher = new(); try { - resetEvent.Set(); - var index = WaitHandle.WaitAny(new WaitHandle[] { stateChanged, exceptionSet }); + _resetEvent.Set(); + var index = WaitHandle.WaitAny(new WaitHandle[] { _stateChanged, _exceptionSet }); if (index == 1) return; while (true) { - index = WaitHandle.WaitAny(new WaitHandle[] { stateChanged, dataReceived, exceptionSet }, waitingTime); + index = WaitHandle.WaitAny(new WaitHandle[] { _stateChanged, _dataReceived, _exceptionSet }, waitingTime); if (index == 2) return; - if (actualState.Equals(State.CREATED) || actualState.Equals(State.REBALANCING)) + if (_currentState.Equals(KafkaStreams.State.CREATED) || _currentState.Equals(KafkaStreams.State.REBALANCING)) { if (index == WaitHandle.WaitTimeout) { - Trace.WriteLine("State: " + actualState + " No handle set within " + waitingTime + " ms"); +#if DEBUG_PERFORMANCE + Trace.WriteLine($"State: {_currentState} No handle set within {waitingTime} ms"); +#endif continue; } } @@ -129,26 +135,30 @@ private void StartTopology(StreamsBuilder builder, KStream root) } catch (Exception e) { - resultException = e; + _resultException = e; } finally { - resetEvent.Set(); + _resetEvent.Set(); } }); - resetEvent.WaitOne(); - streams.Start(); - Trace.WriteLine("Started on " + DateTime.Now.ToString("HH:mm:ss.FFFFFFF")); - resetEvent.WaitOne(); // wait running state - if (resultException != null) throw resultException; + _resetEvent.WaitOne(); + _streams.Start(); +#if DEBUG_PERFORMANCE + Trace.WriteLine($"KafkaStreamsBaseRetriever Started on {DateTime.Now:HH:mm:ss.FFFFFFF}"); +#endif + _resetEvent.WaitOne(); // wait running state + if (_resultException != null) throw _resultException; - keyValueStore ??= streams?.Store(StoreQueryParameters>.FromNameAndType(_storageId, QueryableStoreTypes.KeyValueStore())); + keyValueStore ??= _streams?.Store(StoreQueryParameters>.FromNameAndType(_storageId, QueryableStoreTypes.KeyValueStore())); } public IEnumerator GetEnumerator() { - if (resultException != null) throw resultException; - Trace.WriteLine("Requested KafkaEnumerator on " + DateTime.Now.ToString("HH:mm:ss.FFFFFFF")); + if (_resultException != null) throw _resultException; +#if DEBUG_PERFORMANCE + Trace.WriteLine($"Requested KafkaEnumerator for {_entityType.Name} on {DateTime.Now:HH:mm:ss.FFFFFFF}"); +#endif return new KafkaEnumerator(_kafkaCluster, _entityType, _keySerdes, _valueSerdes, keyValueStore); } @@ -159,41 +169,47 @@ System.Collections.IEnumerator System.Collections.IEnumerable.GetEnumerator() public void Dispose() { - streams?.Close(); - dataReceived?.Dispose(); - resetEvent?.Dispose(); - exceptionSet?.Dispose(); - errorHandler?.Dispose(); - stateListener?.Dispose(); - stateChanged?.Dispose(); - - streams = null; - errorHandler = null; - stateListener = null; + _streams?.Close(); + _dataReceived?.Dispose(); + _resetEvent?.Dispose(); + _exceptionSet?.Dispose(); + _errorHandler?.Dispose(); + _stateListener?.Dispose(); + _stateChanged?.Dispose(); + + _streams = null; + _errorHandler = null; + _stateListener = null; } class KafkaEnumerator : IEnumerator { private readonly IKafkaCluster _kafkaCluster; private readonly IEntityType _entityType; - private readonly IKNetSerDes _keySerdes; - private readonly IKNetSerDes> _valueSerdes; + private readonly IKNetSerDes _keySerdes; + private readonly IKNetSerDes _valueSerdes; private readonly ReadOnlyKeyValueStore? _keyValueStore; private KeyValueIterator? keyValueIterator = null; private IEnumerator>? keyValueEnumerator = null; - public KafkaEnumerator(IKafkaCluster kafkaCluster, IEntityType entityType, IKNetSerDes keySerdes, IKNetSerDes> valueSerdes, ReadOnlyKeyValueStore? keyValueStore) +#if DEBUG_PERFORMANCE + Stopwatch _moveNextSw = new Stopwatch(); + Stopwatch _currentSw = new Stopwatch(); + Stopwatch _valueSerdesSw = new Stopwatch(); + Stopwatch _valueBufferSw = new Stopwatch(); +#endif + + public KafkaEnumerator(IKafkaCluster kafkaCluster, IEntityType entityType, IKNetSerDes keySerdes, IKNetSerDes valueSerdes, ReadOnlyKeyValueStore? keyValueStore) { - if (kafkaCluster == null) throw new ArgumentNullException(nameof(kafkaCluster)); - if (keySerdes == null) throw new ArgumentNullException(nameof(keySerdes)); - if (valueSerdes == null) throw new ArgumentNullException(nameof(valueSerdes)); if (keyValueStore == null) throw new ArgumentNullException(nameof(keyValueStore)); - _kafkaCluster = kafkaCluster; + _kafkaCluster = kafkaCluster ?? throw new ArgumentNullException(nameof(kafkaCluster)); _entityType = entityType; - _keySerdes = keySerdes; - _valueSerdes = valueSerdes; + _keySerdes = keySerdes ?? throw new ArgumentNullException(nameof(keySerdes)); + _valueSerdes = valueSerdes ?? throw new ArgumentNullException(nameof(valueSerdes)); _keyValueStore = keyValueStore; +#if DEBUG_PERFORMANCE Trace.WriteLine($"KafkaEnumerator for {_entityType.Name} - ApproximateNumEntries {_keyValueStore?.ApproximateNumEntries()}"); +#endif keyValueIterator = _keyValueStore?.All(); keyValueEnumerator = keyValueIterator?.ToIEnumerator(); } @@ -202,12 +218,29 @@ public ValueBuffer Current { get { +#if DEBUG_PERFORMANCE + try + { + _currentSw.Start(); +#endif if (keyValueEnumerator != null) { var kv = keyValueEnumerator.Current; object? v = kv.value; - KNetEntityTypeData entityTypeData = _valueSerdes.DeserializeWithHeaders(null, null, v as byte[]); - var data = new ValueBuffer(entityTypeData.GetData(_entityType)); +#if DEBUG_PERFORMANCE + _valueSerdesSw.Start(); +#endif + TValue entityTypeData = _valueSerdes.DeserializeWithHeaders(null, null, v as byte[]); +#if DEBUG_PERFORMANCE + _valueSerdesSw.Stop(); + _valueBufferSw.Start(); +#endif + object[] array = null; + entityTypeData.GetData(_entityType, ref array); +#if DEBUG_PERFORMANCE + _valueBufferSw.Stop(); +#endif + var data = new ValueBuffer(array); if (data.IsEmpty) { throw new InvalidOperationException("Data is Empty"); @@ -215,6 +248,13 @@ public ValueBuffer Current return data; } throw new InvalidOperationException("InvalidEnumerator"); +#if DEBUG_PERFORMANCE + } + finally + { + _currentSw.Stop(); + } +#endif } } @@ -222,13 +262,24 @@ public ValueBuffer Current public void Dispose() { +#if DEBUG_PERFORMANCE + Trace.WriteLine($"KafkaEnumerator _moveNextSw: {_moveNextSw.Elapsed} _currentSw: {_currentSw.Elapsed} _valueSerdesSw: {_valueSerdesSw.Elapsed} _valueBufferSw: {_valueBufferSw.Elapsed}"); +#endif keyValueIterator?.Dispose(); } public bool MoveNext() { - var res = (keyValueEnumerator != null) ? keyValueEnumerator.MoveNext() : false; - return res; +#if DEBUG_PERFORMANCE + try + { + _moveNextSw.Start(); +#endif + return (keyValueEnumerator != null) && keyValueEnumerator.MoveNext(); +#if DEBUG_PERFORMANCE + } + finally { _moveNextSw.Stop(); } +#endif } public void Reset() diff --git a/src/net/KEFCore/Storage/Internal/KafkaStreamsTableRetriever.cs b/src/net/KEFCore/Storage/Internal/KafkaStreamsTableRetriever.cs index 62d2ee3a..e41473b2 100644 --- a/src/net/KEFCore/Storage/Internal/KafkaStreamsTableRetriever.cs +++ b/src/net/KEFCore/Storage/Internal/KafkaStreamsTableRetriever.cs @@ -23,7 +23,7 @@ namespace MASES.EntityFrameworkCore.KNet.Storage.Internal { - public sealed class KafkaStreamsTableRetriever : KafkaStreamsBaseRetriever + public sealed class KafkaStreamsTableRetriever : KafkaStreamsBaseRetriever, byte[], byte[]> { public KafkaStreamsTableRetriever(IKafkaCluster kafkaCluster, IEntityType entityType, IKNetSerDes keySerdes, IKNetSerDes> valueSerdes) : this(kafkaCluster, entityType, keySerdes, valueSerdes, new StreamsBuilder()) @@ -31,7 +31,7 @@ public KafkaStreamsTableRetriever(IKafkaCluster kafkaCluster, IEntityType entity } public KafkaStreamsTableRetriever(IKafkaCluster kafkaCluster, IEntityType entityType, IKNetSerDes keySerdes, IKNetSerDes> valueSerdes, StreamsBuilder builder) - : base(kafkaCluster, entityType, keySerdes, valueSerdes, entityType.StorageIdForTable(kafkaCluster.Options), builder, builder.Stream(entityType.TopicName(kafkaCluster.Options))) + : base(kafkaCluster, entityType, keySerdes, valueSerdes, entityType.StorageIdForTable(kafkaCluster.Options), builder, builder.Stream(entityType.TopicName(kafkaCluster.Options))) { } } From 73f17195131bdc59a9b4f6a0821025385f11829b Mon Sep 17 00:00:00 2001 From: masesdevelopers <94312179+masesdevelopers@users.noreply.github.com> Date: Wed, 4 Oct 2023 16:23:35 +0200 Subject: [PATCH 2/2] Removed IKafkaSerdesFactory and other code clean-up --- .../KafkaServiceCollectionExtensions.cs | 4 +- .../Query/Internal/KafkaQueryContext.cs | 6 +- .../Serdes/Internal/IKafkaSerdesEntityType.cs | 43 ------ .../Serdes/Internal/IKafkaSerdesFactory.cs | 31 ----- .../Serdes/Internal/KafkaSerdesEntityType.cs | 125 ------------------ .../Serdes/Internal/KafkaSerdesFactory.cs | 54 -------- .../KEFCore/Storage/Internal/IKafkaCluster.cs | 25 +--- .../KEFCore/Storage/Internal/IKafkaRowBag.cs | 2 - .../KEFCore/Storage/Internal/IKafkaTable.cs | 10 +- .../Storage/Internal/KafkaClusterCache.cs | 6 +- .../KEFCore/Storage/Internal/KafkaTable.cs | 10 +- 11 files changed, 15 insertions(+), 301 deletions(-) delete mode 100644 src/net/KEFCore/Serdes/Internal/IKafkaSerdesEntityType.cs delete mode 100644 src/net/KEFCore/Serdes/Internal/IKafkaSerdesFactory.cs delete mode 100644 src/net/KEFCore/Serdes/Internal/KafkaSerdesEntityType.cs delete mode 100644 src/net/KEFCore/Serdes/Internal/KafkaSerdesFactory.cs diff --git a/src/net/KEFCore/Extensions/KafkaServiceCollectionExtensions.cs b/src/net/KEFCore/Extensions/KafkaServiceCollectionExtensions.cs index c8cb5a04..1de4991f 100644 --- a/src/net/KEFCore/Extensions/KafkaServiceCollectionExtensions.cs +++ b/src/net/KEFCore/Extensions/KafkaServiceCollectionExtensions.cs @@ -23,7 +23,6 @@ using MASES.EntityFrameworkCore.KNet.Infrastructure.Internal; using MASES.EntityFrameworkCore.KNet.Metadata.Conventions; using MASES.EntityFrameworkCore.KNet.Query.Internal; -using MASES.EntityFrameworkCore.KNet.Serdes.Internal; using MASES.EntityFrameworkCore.KNet.Storage.Internal; using MASES.EntityFrameworkCore.KNet.ValueGeneration.Internal; using System.ComponentModel; @@ -72,8 +71,7 @@ public static IServiceCollection AddEntityFrameworkKafkaDatabase(this IServiceCo .TryAddSingleton() .TryAddSingleton() .TryAddSingleton() - .TryAddScoped() - .TryAddScoped()); + .TryAddScoped()); builder.TryAddCoreServices(); diff --git a/src/net/KEFCore/Query/Internal/KafkaQueryContext.cs b/src/net/KEFCore/Query/Internal/KafkaQueryContext.cs index 204fb846..876ce12d 100644 --- a/src/net/KEFCore/Query/Internal/KafkaQueryContext.cs +++ b/src/net/KEFCore/Query/Internal/KafkaQueryContext.cs @@ -32,7 +32,7 @@ public virtual IEnumerable GetValueBuffers(IEntityType entityType) { if (!_valueBuffersCache.TryGetValue(entityType, out var valueBuffers)) { - valueBuffers = Cluster.GetData(entityType); + valueBuffers = Cluster.GetValueBuffers(entityType); _valueBuffersCache[entityType] = valueBuffers; } @@ -40,9 +40,7 @@ public virtual IEnumerable GetValueBuffers(IEntityType entityType) return valueBuffers; } - public KafkaQueryContext( - QueryContextDependencies dependencies, - IKafkaCluster cluster) + public KafkaQueryContext(QueryContextDependencies dependencies, IKafkaCluster cluster) : base(dependencies) { Cluster = cluster; diff --git a/src/net/KEFCore/Serdes/Internal/IKafkaSerdesEntityType.cs b/src/net/KEFCore/Serdes/Internal/IKafkaSerdesEntityType.cs deleted file mode 100644 index 21b0c78d..00000000 --- a/src/net/KEFCore/Serdes/Internal/IKafkaSerdesEntityType.cs +++ /dev/null @@ -1,43 +0,0 @@ -/* -* Copyright 2022 MASES s.r.l. -* -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -* -* Refer to LICENSE for more information. -*/ - -using Org.Apache.Kafka.Common.Header; - -namespace MASES.EntityFrameworkCore.KNet.Serdes.Internal -{ - public interface IKafkaSerdesEntityType - { - string Serialize(params object?[]? args); - - string Serialize(Headers headers, params object?[]? args); - - string Serialize(TKey key); - - string Serialize(Headers headers, TKey key); - - object[] Deserialize(string arg); - - object[] Deserialize(Headers headers, string arg); - - TKey Deserialize(string arg); - - TKey Deserialize(Headers headers, string arg); - - object[] ConvertData(object[]? input); - } -} diff --git a/src/net/KEFCore/Serdes/Internal/IKafkaSerdesFactory.cs b/src/net/KEFCore/Serdes/Internal/IKafkaSerdesFactory.cs deleted file mode 100644 index 80b45b2f..00000000 --- a/src/net/KEFCore/Serdes/Internal/IKafkaSerdesFactory.cs +++ /dev/null @@ -1,31 +0,0 @@ -/* -* Copyright 2022 MASES s.r.l. -* -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -* -* Refer to LICENSE for more information. -*/ - -namespace MASES.EntityFrameworkCore.KNet.Serdes.Internal -{ - public interface IKafkaSerdesFactory - { - IKafkaSerdesEntityType GetOrCreate(IEntityType type); - - IKafkaSerdesEntityType Get(string typeName); - - object[] Deserialize(byte[] data); - - object[] Deserialize(string data); - } -} diff --git a/src/net/KEFCore/Serdes/Internal/KafkaSerdesEntityType.cs b/src/net/KEFCore/Serdes/Internal/KafkaSerdesEntityType.cs deleted file mode 100644 index 4a666b5c..00000000 --- a/src/net/KEFCore/Serdes/Internal/KafkaSerdesEntityType.cs +++ /dev/null @@ -1,125 +0,0 @@ -/* -* Copyright 2022 MASES s.r.l. -* -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -* -* Refer to LICENSE for more information. -*/ - -using Org.Apache.Kafka.Common.Header; -using System.Text.Json; -using System.Text.Json.Nodes; -using System.Text.Json.Serialization; - -namespace MASES.EntityFrameworkCore.KNet.Serdes.Internal -{ - [JsonSerializable(typeof(KafkaSerdesEntityTypeData))] - public class KafkaSerdesEntityTypeData - { - public KafkaSerdesEntityTypeData() { } - - public KafkaSerdesEntityTypeData(string tName, object[] rData) - { - typeName = tName; - data = rData; - } - [JsonInclude()] - public string typeName; - [JsonInclude()] - public object[] data; - } - - public class KafkaSerdesEntityType : IKafkaSerdesEntityType - { - private readonly IEntityType _type; - private readonly IProperty[] _properties; - - public KafkaSerdesEntityType(IEntityType type) - { - _type = type; - _properties = _type.GetProperties().ToArray(); - } - - public object[] Deserialize(string arg) - { - var des = GetFullType(arg); - return ConvertData(des!.data); - } - - public object[] Deserialize(Headers headers, string arg) - { - var des = GetFullType(arg); - return ConvertData(des!.data); - } - - public TKey Deserialize(string arg) => System.Text.Json.JsonSerializer.Deserialize(arg)!; - - public TKey Deserialize(Headers headers, string arg) => System.Text.Json.JsonSerializer.Deserialize(arg)!; - - public string Serialize(params object?[]? args) => System.Text.Json.JsonSerializer.Serialize(new KafkaSerdesEntityTypeData(_type.Name, args!)); - - public string Serialize(Headers headers, params object?[]? args) => System.Text.Json.JsonSerializer.Serialize(new KafkaSerdesEntityTypeData(_type.Name, args!)); - - public string Serialize(TKey key) => System.Text.Json.JsonSerializer.Serialize(key); - - public string Serialize(Headers headers, TKey key) => System.Text.Json.JsonSerializer.Serialize(key); - - public static KafkaSerdesEntityTypeData? GetFullType(string arg) => System.Text.Json.JsonSerializer.Deserialize(arg); - - public object[] ConvertData(object[]? input) - { - if (input == null) return null; - List data = new List(); - - for (int i = 0; i < input!.Length; i++) - { - if (input[i] is JsonElement elem) - { - switch (elem.ValueKind) - { - case JsonValueKind.Undefined: - break; - case JsonValueKind.Object: - break; - case JsonValueKind.Array: - break; - case JsonValueKind.String: - data.Add(elem.GetString()); - break; - case JsonValueKind.Number: - var tmp = elem.GetInt64(); - data.Add(Convert.ChangeType(tmp, _properties[i].ClrType)); - break; - case JsonValueKind.True: - data.Add(true); - break; - case JsonValueKind.False: - data.Add(false); - break; - case JsonValueKind.Null: - data.Add(null); - break; - default: - break; - } - - } - else - { - data.Add(Convert.ChangeType(input[i], _properties[i].ClrType)); - } - } - return data.ToArray(); - } - } -} diff --git a/src/net/KEFCore/Serdes/Internal/KafkaSerdesFactory.cs b/src/net/KEFCore/Serdes/Internal/KafkaSerdesFactory.cs deleted file mode 100644 index 6ad829d9..00000000 --- a/src/net/KEFCore/Serdes/Internal/KafkaSerdesFactory.cs +++ /dev/null @@ -1,54 +0,0 @@ -/* -* Copyright 2022 MASES s.r.l. -* -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -* -* Refer to LICENSE for more information. -*/ - -using System.Collections.Concurrent; -using System.Text; - -namespace MASES.EntityFrameworkCore.KNet.Serdes.Internal; - -public class KafkaSerdesFactory : IKafkaSerdesFactory -{ - private readonly ConcurrentDictionary _serdes; - - public KafkaSerdesFactory( - ILoggingOptions loggingOptions) - { - _serdes = new ConcurrentDictionary(); - LoggingOptions = loggingOptions; - } - - public ILoggingOptions LoggingOptions { get; } - - public virtual IKafkaSerdesEntityType GetOrCreate(IEntityType type) - => _serdes.GetOrAdd(type.Name, _ => new KafkaSerdesEntityType(type)); - - public virtual IKafkaSerdesEntityType Get(string typeName) - => _serdes[typeName]; - - public virtual object[] Deserialize(byte[] data) - { - var str = Encoding.UTF8.GetString(data); - return Deserialize(str); - } - - public virtual object[] Deserialize(string data) - { - var fulltype = KafkaSerdesEntityType.GetFullType(data); - return Get(fulltype!.typeName!).ConvertData(fulltype.data); - } -} diff --git a/src/net/KEFCore/Storage/Internal/IKafkaCluster.cs b/src/net/KEFCore/Storage/Internal/IKafkaCluster.cs index 985695e8..8760a06c 100644 --- a/src/net/KEFCore/Storage/Internal/IKafkaCluster.cs +++ b/src/net/KEFCore/Storage/Internal/IKafkaCluster.cs @@ -17,38 +17,25 @@ */ using MASES.EntityFrameworkCore.KNet.Infrastructure.Internal; -using MASES.EntityFrameworkCore.KNet.Serdes.Internal; using MASES.EntityFrameworkCore.KNet.ValueGeneration.Internal; namespace MASES.EntityFrameworkCore.KNet.Storage.Internal; -public interface IKafkaCluster :IDisposable +public interface IKafkaCluster : IDisposable { - bool EnsureDeleted( - IUpdateAdapterFactory updateAdapterFactory, - IModel designModel, - IDiagnosticsLogger updateLogger); + bool EnsureDeleted(IUpdateAdapterFactory updateAdapterFactory, IModel designModel, IDiagnosticsLogger updateLogger); - bool EnsureCreated( - IUpdateAdapterFactory updateAdapterFactory, - IModel designModel, - IDiagnosticsLogger updateLogger); + bool EnsureCreated(IUpdateAdapterFactory updateAdapterFactory, IModel designModel, IDiagnosticsLogger updateLogger); - bool EnsureConnected( - IModel designModel, - IDiagnosticsLogger updateLogger); + bool EnsureConnected(IModel designModel, IDiagnosticsLogger updateLogger); string CreateTable(IEntityType entityType); - IKafkaSerdesFactory SerdesFactory { get; } - - IEnumerable GetData(IEntityType entityType); + IEnumerable GetValueBuffers(IEntityType entityType); KafkaIntegerValueGenerator GetIntegerValueGenerator(IProperty property); - int ExecuteTransaction( - IList entries, - IDiagnosticsLogger updateLogger); + int ExecuteTransaction(IList entries, IDiagnosticsLogger updateLogger); KafkaOptionsExtension Options { get; } } diff --git a/src/net/KEFCore/Storage/Internal/IKafkaRowBag.cs b/src/net/KEFCore/Storage/Internal/IKafkaRowBag.cs index ab72f9a0..9ea24303 100644 --- a/src/net/KEFCore/Storage/Internal/IKafkaRowBag.cs +++ b/src/net/KEFCore/Storage/Internal/IKafkaRowBag.cs @@ -18,8 +18,6 @@ #nullable enable -using MASES.EntityFrameworkCore.KNet.Serdes.Internal; - namespace MASES.EntityFrameworkCore.KNet.Storage.Internal; public interface IKafkaRowBag diff --git a/src/net/KEFCore/Storage/Internal/IKafkaTable.cs b/src/net/KEFCore/Storage/Internal/IKafkaTable.cs index 4242794b..e3db233c 100644 --- a/src/net/KEFCore/Storage/Internal/IKafkaTable.cs +++ b/src/net/KEFCore/Storage/Internal/IKafkaTable.cs @@ -24,12 +24,10 @@ namespace MASES.EntityFrameworkCore.KNet.Storage.Internal; -public interface IKafkaTable : IDisposable +public interface IKafkaTable : IEntityTypeProducer { IReadOnlyList SnapshotRows(); - IEnumerable ValueBuffers { get; } - IEnumerable Rows { get; } IKafkaRowBag Create(IUpdateEntry entry); @@ -38,11 +36,7 @@ public interface IKafkaTable : IDisposable IKafkaRowBag Update(IUpdateEntry entry); - IEnumerable> Commit(IEnumerable records); - - KafkaIntegerValueGenerator GetIntegerValueGenerator( - IProperty property, - IReadOnlyList tables); + KafkaIntegerValueGenerator GetIntegerValueGenerator(IProperty property, IReadOnlyList tables); void BumpValueGenerators(object?[] row); diff --git a/src/net/KEFCore/Storage/Internal/KafkaClusterCache.cs b/src/net/KEFCore/Storage/Internal/KafkaClusterCache.cs index 89581ee3..eaf81524 100644 --- a/src/net/KEFCore/Storage/Internal/KafkaClusterCache.cs +++ b/src/net/KEFCore/Storage/Internal/KafkaClusterCache.cs @@ -18,26 +18,22 @@ using System.Collections.Concurrent; using MASES.EntityFrameworkCore.KNet.Infrastructure.Internal; -using MASES.EntityFrameworkCore.KNet.Serdes.Internal; namespace MASES.EntityFrameworkCore.KNet.Storage.Internal; public class KafkaClusterCache : IKafkaClusterCache { private readonly IKafkaTableFactory _tableFactory; - private readonly IKafkaSerdesFactory _serdesFactory; private readonly ConcurrentDictionary _namedClusters; public KafkaClusterCache( IKafkaTableFactory tableFactory, - IKafkaSerdesFactory serdesFactory, IKafkaSingletonOptions? options) { _tableFactory = tableFactory; - _serdesFactory = serdesFactory; _namedClusters = new ConcurrentDictionary(); } public virtual IKafkaCluster GetCluster(KafkaOptionsExtension options) - => _namedClusters.GetOrAdd(options.ClusterId, _ => new KafkaCluster(options, _tableFactory, _serdesFactory)); + => _namedClusters.GetOrAdd(options.ClusterId, _ => new KafkaCluster(options, _tableFactory)); } diff --git a/src/net/KEFCore/Storage/Internal/KafkaTable.cs b/src/net/KEFCore/Storage/Internal/KafkaTable.cs index 993da531..6791aed1 100644 --- a/src/net/KEFCore/Storage/Internal/KafkaTable.cs +++ b/src/net/KEFCore/Storage/Internal/KafkaTable.cs @@ -23,7 +23,6 @@ using MASES.EntityFrameworkCore.KNet.Internal; using MASES.EntityFrameworkCore.KNet.ValueGeneration.Internal; using Java.Util.Concurrent; -using MASES.EntityFrameworkCore.KNet.Serdes.Internal; using Org.Apache.Kafka.Clients.Producer; namespace MASES.EntityFrameworkCore.KNet.Storage.Internal; @@ -40,8 +39,6 @@ public class KafkaTable : IKafkaTable private Dictionary? _integerGenerators; readonly IEntityTypeProducer _producer; private readonly string _tableAssociatedTopicName; - private readonly IKafkaSerdesEntityType _serdes; - public KafkaTable( IKafkaCluster cluster, @@ -51,7 +48,6 @@ public KafkaTable( Cluster = cluster; EntityType = entityType; _tableAssociatedTopicName = Cluster.CreateTable(entityType); - _serdes = Cluster.SerdesFactory.GetOrCreate(entityType); _producer = EntityTypeProducers.Create(entityType, Cluster); _keyValueFactory = entityType.FindPrimaryKey()!.GetPrincipalKeyValueFactory(); _sensitiveLoggingEnabled = sensitiveLoggingEnabled; @@ -110,7 +106,9 @@ public virtual KafkaIntegerValueGenerator GetIntegerValueGenerator)generator; } - public virtual IEnumerable ValueBuffers => _producer.GetValueBuffer(); + public virtual IEnumerable> Commit(IEnumerable records) => _producer.Commit(records); + + public virtual IEnumerable ValueBuffers => _producer.ValueBuffers; public virtual IEnumerable Rows => RowsInTable(); @@ -290,8 +288,6 @@ public virtual IKafkaRowBag Update(IUpdateEntry entry) } } - public virtual IEnumerable> Commit(IEnumerable records) => _producer.Commit(records); - public virtual void BumpValueGenerators(object?[] row) { if (_integerGenerators != null)