diff --git a/src/net/KEFCore/Storage/Internal/EntityTypeDataStorage.cs b/src/net/KEFCore/Storage/Internal/EntityTypeDataStorage.cs
new file mode 100644
index 00000000..58064fb8
--- /dev/null
+++ b/src/net/KEFCore/Storage/Internal/EntityTypeDataStorage.cs
@@ -0,0 +1,174 @@
+/*
+* Copyright 2023 MASES s.r.l.
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*
+* Refer to LICENSE for more information.
+*/
+
+#nullable enable
+
+using System.Text.Json;
+using System.Text.Json.Serialization;
+
+namespace MASES.EntityFrameworkCore.KNet.Storage.Internal;
+
+///
+/// This is an internal API that supports the Entity Framework Core infrastructure and not subject to
+/// the same compatibility standards as public APIs. It may be changed or removed without notice in
+/// any release. You should only use it directly in your code with extreme caution and knowing that
+/// doing so can result in application failures when updating to a new Entity Framework Core release.
+///
+[JsonSerializable(typeof(ObjectType))]
+public class ObjectType : IJsonOnDeserialized
+{
+ public ObjectType()
+ {
+
+ }
+
+ public ObjectType(IProperty typeName, object value)
+ {
+ TypeName = typeName.ClrType?.FullName;
+ PropertyName = typeName.Name;
+ Value = value;
+ }
+
+ public void OnDeserialized()
+ {
+ if (Value is JsonElement elem)
+ {
+ switch (elem.ValueKind)
+ {
+ case JsonValueKind.String:
+ Value = elem.GetString()!;
+ if (TypeName != typeof(string).FullName)
+ {
+ try
+ {
+ Value = Convert.ChangeType(Value, Type.GetType(TypeName!)!);
+ }
+ catch (InvalidCastException)
+ {
+ // failed conversion, try with other methods for known types
+ if (TypeName == typeof(Guid).FullName)
+ {
+ Value = elem.GetGuid();
+ }
+ else if (TypeName == typeof(DateTime).FullName)
+ {
+ Value = elem.GetDateTime();
+ }
+ else if (TypeName == typeof(DateTimeOffset).FullName)
+ {
+ Value = elem.GetDateTimeOffset();
+ }
+ else
+ {
+ Value = elem.GetString()!;
+ }
+ }
+ }
+ break;
+ case JsonValueKind.Number:
+ var tmp = elem.GetInt64();
+ Value = Convert.ChangeType(tmp, Type.GetType(TypeName!)!);
+ break;
+ case JsonValueKind.True:
+ Value = true;
+ break;
+ case JsonValueKind.False:
+ Value = false;
+ break;
+ case JsonValueKind.Null:
+ Value = null;
+ break;
+ case JsonValueKind.Object:
+ case JsonValueKind.Array:
+ case JsonValueKind.Undefined:
+ default:
+ throw new InvalidOperationException($"Failed to deserialize {PropertyName}, ValueKind is {elem.ValueKind}");
+ }
+ }
+ else
+ {
+ Value = Convert.ChangeType(Value, Type.GetType(TypeName!)!);
+ }
+ }
+
+ public string? TypeName { get; set; }
+
+ public string? PropertyName { get; set; }
+
+ public object Value { get; set; }
+}
+
+///
+/// This is an internal API that supports the Entity Framework Core infrastructure and not subject to
+/// the same compatibility standards as public APIs. It may be changed or removed without notice in
+/// any release. You should only use it directly in your code with extreme caution and knowing that
+/// doing so can result in application failures when updating to a new Entity Framework Core release.
+///
+[JsonSerializable(typeof(EntityTypeDataStorage<>))]
+public class EntityTypeDataStorage : IEntityTypeData
+{
+ public EntityTypeDataStorage() { }
+
+ public EntityTypeDataStorage(IEntityType tName, IProperty[] properties, object[] rData)
+ {
+ TypeName = tName.Name;
+ Data = new Dictionary();
+ for (int i = 0; i < properties.Length; i++)
+ {
+ Data.Add(properties[i].GetIndex(), new ObjectType(properties[i], rData[i]));
+ }
+ }
+
+ public string TypeName { get; set; }
+
+ public Dictionary Data { get; set; }
+
+ public void GetData(IEntityType tName, ref object[] array)
+ {
+#if DEBUG_PERFORMANCE
+ Stopwatch fullSw = new Stopwatch();
+ Stopwatch newSw = new Stopwatch();
+ Stopwatch iterationSw = new Stopwatch();
+ try
+ {
+ fullSw.Start();
+#endif
+ if (Data == null) { return; }
+#if DEBUG_PERFORMANCE
+ newSw.Start();
+#endif
+ array = new object[Data.Count];
+#if DEBUG_PERFORMANCE
+ newSw.Stop();
+ iterationSw.Start();
+#endif
+ for (int i = 0; i < Data.Count; i++)
+ {
+ array[i] = Data[i].Value;
+ }
+#if DEBUG_PERFORMANCE
+ iterationSw.Stop();
+ fullSw.Stop();
+ }
+ finally
+ {
+ Trace.WriteLine($"Time to GetData with length {Data.Count}: {fullSw.Elapsed} - new array took: {newSw.Elapsed} - Iteration took: {iterationSw.Elapsed}");
+ }
+#endif
+ }
+}
diff --git a/src/net/KEFCore/Storage/Internal/EntityTypeProducer.cs b/src/net/KEFCore/Storage/Internal/EntityTypeProducer.cs
index 000e48c6..a88ba9e3 100644
--- a/src/net/KEFCore/Storage/Internal/EntityTypeProducer.cs
+++ b/src/net/KEFCore/Storage/Internal/EntityTypeProducer.cs
@@ -24,175 +24,35 @@
using MASES.KNet.Producer;
using MASES.KNet.Replicator;
using MASES.KNet.Serialization;
-using System.Collections.Concurrent;
-using System.Text.Json.Serialization;
using MASES.KNet.Serialization.Json;
using Org.Apache.Kafka.Clients.Producer;
-using System.Text.Json;
using System.Collections;
namespace MASES.EntityFrameworkCore.KNet.Storage.Internal;
-public class EntityTypeProducers
-{
- static IEntityTypeProducer? _globalProducer = null;
- static readonly ConcurrentDictionary _producers = new ConcurrentDictionary();
-
- public static IEntityTypeProducer Create(IEntityType entityType, IKafkaCluster cluster) where TKey : notnull
- {
- //if (!cluster.Options.ProducerByEntity)
- //{
- // lock (_producers)
- // {
- // if (_globalProducer == null) _globalProducer = CreateProducerLocal(entityType, cluster);
- // return _globalProducer;
- // }
- //}
- //else
- //{
- return _producers.GetOrAdd(entityType, _ => CreateProducerLocal(entityType, cluster));
- //}
- }
-
- static IEntityTypeProducer CreateProducerLocal(IEntityType entityType, IKafkaCluster cluster) where TKey : notnull => new EntityTypeProducer(entityType, cluster);
-}
-
-[JsonSerializable(typeof(ObjectType))]
-public class ObjectType : IJsonOnDeserialized
-{
- public ObjectType()
- {
-
- }
-
- public ObjectType(IProperty typeName, object value)
- {
- TypeName = typeName.ClrType?.FullName;
- Value = value;
- }
-
- public void OnDeserialized()
- {
- if (Value is JsonElement elem)
- {
- switch (elem.ValueKind)
- {
- case JsonValueKind.Undefined:
- break;
- case JsonValueKind.Object:
- break;
- case JsonValueKind.Array:
- break;
- case JsonValueKind.String:
- Value = elem.GetString()!;
- break;
- case JsonValueKind.Number:
- var tmp = elem.GetInt64();
- Value = Convert.ChangeType(tmp, Type.GetType(TypeName!)!);
- break;
- case JsonValueKind.True:
- Value = true;
- break;
- case JsonValueKind.False:
- Value = false;
- break;
- case JsonValueKind.Null:
- Value = null;
- break;
- default:
- break;
- }
- }
- else
- {
- Value = Convert.ChangeType(Value, Type.GetType(TypeName!)!);
- }
- }
-
- public string? TypeName { get; set; }
-
- public object Value { get; set; }
-}
-
-public interface IEntityTypeData
-{
- void GetData(IEntityType tName, ref object[] array);
-}
-
-[JsonSerializable(typeof(KNetEntityTypeData<>))]
-public class KNetEntityTypeData : IEntityTypeData
-{
- public KNetEntityTypeData() { }
-
- public KNetEntityTypeData(IEntityType tName, IProperty[] properties, object[] rData)
- {
- TypeName = tName.Name;
- Data = new Dictionary();
- for (int i = 0; i < properties.Length; i++)
- {
- Data.Add(properties[i].GetIndex(), new ObjectType(properties[i], rData[i]));
- }
- }
-
- public string TypeName { get; set; }
-
- public Dictionary Data { get; set; }
-
- public void GetData(IEntityType tName, ref object[] array)
- {
-#if DEBUG_PERFORMANCE
- Stopwatch fullSw = new Stopwatch();
- Stopwatch newSw = new Stopwatch();
- Stopwatch iterationSw = new Stopwatch();
- try
- {
- fullSw.Start();
-#endif
- if (Data == null) { return; }
-#if DEBUG_PERFORMANCE
- newSw.Start();
-#endif
- array = new object[Data.Count];
-#if DEBUG_PERFORMANCE
- newSw.Stop();
- iterationSw.Start();
-#endif
- for (int i = 0; i < Data.Count; i++)
- {
- array[i] = Data[i].Value;
- }
-#if DEBUG_PERFORMANCE
- iterationSw.Stop();
- fullSw.Stop();
- }
- finally
- {
- Trace.WriteLine($"Time to GetData with length {Data.Count}: {fullSw.Elapsed} - new array took: {newSw.Elapsed} - Iteration took: {iterationSw.Elapsed}");
- }
-#endif
- }
-}
-
public class EntityTypeProducer : IEntityTypeProducer where TKey : notnull
{
private readonly bool _useCompactedReplicator;
private readonly IKafkaCluster _cluster;
private readonly IEntityType _entityType;
- private readonly IKNetCompactedReplicator>? _kafkaCompactedReplicator;
- private readonly IKNetProducer>? _kafkaProducer;
+ private readonly IKNetCompactedReplicator>? _kafkaCompactedReplicator;
+ private readonly IKNetProducer>? _kafkaProducer;
private readonly IKafkaStreamsBaseRetriever _streamData;
private readonly KNetSerDes _keySerdes;
- private readonly KNetSerDes> _valueSerdes;
+ private readonly KNetSerDes> _valueSerdes;
+ #region KNetCompactedReplicatorEnumerable
class KNetCompactedReplicatorEnumerable : IEnumerable
{
readonly IEntityType _entityType;
- readonly IKNetCompactedReplicator>? _kafkaCompactedReplicator;
+ readonly IKNetCompactedReplicator>? _kafkaCompactedReplicator;
+
+ #region KNetCompactedReplicatorEnumerator
class KNetCompactedReplicatorEnumerator : IEnumerator
{
readonly IEntityType _entityType;
- readonly IEnumerator>> _enumerator;
- public KNetCompactedReplicatorEnumerator(IEntityType entityType, IKNetCompactedReplicator>? kafkaCompactedReplicator)
+ readonly IEnumerator>> _enumerator;
+ public KNetCompactedReplicatorEnumerator(IEntityType entityType, IKNetCompactedReplicator>? kafkaCompactedReplicator)
{
_entityType = entityType;
kafkaCompactedReplicator?.SyncWait();
@@ -228,8 +88,9 @@ public void Reset()
_enumerator?.Reset();
}
}
+ #endregion
- public KNetCompactedReplicatorEnumerable(IEntityType entityType, IKNetCompactedReplicator>? kafkaCompactedReplicator)
+ public KNetCompactedReplicatorEnumerable(IEntityType entityType, IKNetCompactedReplicator>? kafkaCompactedReplicator)
{
_entityType = entityType;
_kafkaCompactedReplicator = kafkaCompactedReplicator;
@@ -245,7 +106,7 @@ IEnumerator IEnumerable.GetEnumerator()
return GetEnumerator();
}
}
-
+ #endregion
public EntityTypeProducer(IEntityType entityType, IKafkaCluster cluster)
{
@@ -259,11 +120,11 @@ public EntityTypeProducer(IEntityType entityType, IKafkaCluster cluster)
}
else _keySerdes = new JsonSerDes();
- _valueSerdes = new JsonSerDes>();
+ _valueSerdes = new JsonSerDes>();
if (_useCompactedReplicator)
{
- _kafkaCompactedReplicator = new KNetCompactedReplicator>()
+ _kafkaCompactedReplicator = new KNetCompactedReplicator>()
{
UpdateMode = UpdateModeTypes.OnConsume,
BootstrapServers = _cluster.Options.BootstrapServers,
@@ -280,7 +141,7 @@ public EntityTypeProducer(IEntityType entityType, IKafkaCluster cluster)
}
else
{
- _kafkaProducer = new KNetProducer>(_cluster.Options.ProducerOptions(), _keySerdes, _valueSerdes);
+ _kafkaProducer = new KNetProducer>(_cluster.Options.ProducerOptions(), _keySerdes, _valueSerdes);
_streamData = new KafkaStreamsTableRetriever(cluster, entityType, _keySerdes, _valueSerdes);
}
}
@@ -302,7 +163,7 @@ public IEnumerable> Commit(IEnumerable reco
List> futures = new();
foreach (KafkaRowBag record in records)
{
- var future = _kafkaProducer?.Send(new KNetProducerRecord>(record.AssociatedTopicName, 0, record.Key, record.Value!));
+ var future = _kafkaProducer?.Send(new KNetProducerRecord>(record.AssociatedTopicName, 0, record.Key, record.Value!));
futures.Add(future);
}
diff --git a/src/net/KEFCore/Storage/Internal/EntityTypeProducers.cs b/src/net/KEFCore/Storage/Internal/EntityTypeProducers.cs
new file mode 100644
index 00000000..503422c1
--- /dev/null
+++ b/src/net/KEFCore/Storage/Internal/EntityTypeProducers.cs
@@ -0,0 +1,52 @@
+/*
+* Copyright 2023 MASES s.r.l.
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*
+* Refer to LICENSE for more information.
+*/
+
+#nullable enable
+
+using System.Collections.Concurrent;
+
+namespace MASES.EntityFrameworkCore.KNet.Storage.Internal;
+///
+/// This is an internal API that supports the Entity Framework Core infrastructure and not subject to
+/// the same compatibility standards as public APIs. It may be changed or removed without notice in
+/// any release. You should only use it directly in your code with extreme caution and knowing that
+/// doing so can result in application failures when updating to a new Entity Framework Core release.
+///
+public class EntityTypeProducers
+{
+ static IEntityTypeProducer? _globalProducer = null;
+ static readonly ConcurrentDictionary _producers = new ConcurrentDictionary();
+
+ public static IEntityTypeProducer Create(IEntityType entityType, IKafkaCluster cluster) where TKey : notnull
+ {
+ //if (!cluster.Options.ProducerByEntity)
+ //{
+ // lock (_producers)
+ // {
+ // if (_globalProducer == null) _globalProducer = CreateProducerLocal(entityType, cluster);
+ // return _globalProducer;
+ // }
+ //}
+ //else
+ //{
+ return _producers.GetOrAdd(entityType, _ => CreateProducerLocal(entityType, cluster));
+ //}
+ }
+
+ static IEntityTypeProducer CreateProducerLocal(IEntityType entityType, IKafkaCluster cluster) where TKey : notnull => new EntityTypeProducer(entityType, cluster);
+}
diff --git a/src/net/KEFCore/Storage/Internal/IEntityTypeData.cs b/src/net/KEFCore/Storage/Internal/IEntityTypeData.cs
new file mode 100644
index 00000000..e6214fff
--- /dev/null
+++ b/src/net/KEFCore/Storage/Internal/IEntityTypeData.cs
@@ -0,0 +1,31 @@
+/*
+* Copyright 2023 MASES s.r.l.
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*
+* Refer to LICENSE for more information.
+*/
+
+#nullable enable
+
+namespace MASES.EntityFrameworkCore.KNet.Storage.Internal;
+///
+/// This is an internal API that supports the Entity Framework Core infrastructure and not subject to
+/// the same compatibility standards as public APIs. It may be changed or removed without notice in
+/// any release. You should only use it directly in your code with extreme caution and knowing that
+/// doing so can result in application failures when updating to a new Entity Framework Core release.
+///
+public interface IEntityTypeData
+{
+ void GetData(IEntityType tName, ref object[] array);
+}
diff --git a/src/net/KEFCore/Storage/Internal/KafkaRowBag.cs b/src/net/KEFCore/Storage/Internal/KafkaRowBag.cs
index 3782790a..f82838b9 100644
--- a/src/net/KEFCore/Storage/Internal/KafkaRowBag.cs
+++ b/src/net/KEFCore/Storage/Internal/KafkaRowBag.cs
@@ -39,7 +39,7 @@ public KafkaRowBag(IUpdateEntry entry, string topicName, TKey key, IProperty[] p
public IProperty[] Properties { get; private set; }
- public KNetEntityTypeData? Value => UpdateEntry.EntityState == EntityState.Deleted ? null : new KNetEntityTypeData(UpdateEntry.EntityType, Properties, ValueBuffer!);
+ public EntityTypeDataStorage? Value => UpdateEntry.EntityState == EntityState.Deleted ? null : new EntityTypeDataStorage(UpdateEntry.EntityType, Properties, ValueBuffer!);
public object?[]? ValueBuffer { get; private set; }
}
diff --git a/src/net/KEFCore/Storage/Internal/KafkaStreamsTableRetriever.cs b/src/net/KEFCore/Storage/Internal/KafkaStreamsTableRetriever.cs
index e41473b2..523f6dba 100644
--- a/src/net/KEFCore/Storage/Internal/KafkaStreamsTableRetriever.cs
+++ b/src/net/KEFCore/Storage/Internal/KafkaStreamsTableRetriever.cs
@@ -1,5 +1,5 @@
/*
-* Copyright 2022 MASES s.r.l.
+* Copyright 2023 MASES s.r.l.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -21,18 +21,24 @@
using MASES.KNet.Serialization;
using Org.Apache.Kafka.Streams;
-namespace MASES.EntityFrameworkCore.KNet.Storage.Internal
+namespace MASES.EntityFrameworkCore.KNet.Storage.Internal;
+
+///
+/// This is an internal API that supports the Entity Framework Core infrastructure and not subject to
+/// the same compatibility standards as public APIs. It may be changed or removed without notice in
+/// any release. You should only use it directly in your code with extreme caution and knowing that
+/// doing so can result in application failures when updating to a new Entity Framework Core release.
+///
+public sealed class KafkaStreamsTableRetriever : KafkaStreamsBaseRetriever, byte[], byte[]>
{
- public sealed class KafkaStreamsTableRetriever : KafkaStreamsBaseRetriever, byte[], byte[]>
+ public KafkaStreamsTableRetriever(IKafkaCluster kafkaCluster, IEntityType entityType, IKNetSerDes keySerdes, IKNetSerDes> valueSerdes)
+ : this(kafkaCluster, entityType, keySerdes, valueSerdes, new StreamsBuilder())
{
- public KafkaStreamsTableRetriever(IKafkaCluster kafkaCluster, IEntityType entityType, IKNetSerDes keySerdes, IKNetSerDes> valueSerdes)
- : this(kafkaCluster, entityType, keySerdes, valueSerdes, new StreamsBuilder())
- {
- }
+ }
- public KafkaStreamsTableRetriever(IKafkaCluster kafkaCluster, IEntityType entityType, IKNetSerDes keySerdes, IKNetSerDes> valueSerdes, StreamsBuilder builder)
- : base(kafkaCluster, entityType, keySerdes, valueSerdes, entityType.StorageIdForTable(kafkaCluster.Options), builder, builder.Stream(entityType.TopicName(kafkaCluster.Options)))
- {
- }
+ public KafkaStreamsTableRetriever(IKafkaCluster kafkaCluster, IEntityType entityType, IKNetSerDes keySerdes, IKNetSerDes> valueSerdes, StreamsBuilder builder)
+ : base(kafkaCluster, entityType, keySerdes, valueSerdes, entityType.StorageIdForTable(kafkaCluster.Options), builder, builder.Stream(entityType.TopicName(kafkaCluster.Options)))
+ {
}
}
+
diff --git a/test/KEFCore.Test/ProgramConfig.cs b/test/Common/ProgramConfig.cs
similarity index 100%
rename from test/KEFCore.Test/ProgramConfig.cs
rename to test/Common/ProgramConfig.cs
diff --git a/test/KEFCore.Complex.Test/KEFCore.Complex.Test.csproj b/test/KEFCore.Complex.Test/KEFCore.Complex.Test.csproj
new file mode 100644
index 00000000..7b518684
--- /dev/null
+++ b/test/KEFCore.Complex.Test/KEFCore.Complex.Test.csproj
@@ -0,0 +1,26 @@
+
+
+
+ Exe
+ MASES.EntityFrameworkCore.KNet.Complex.Test
+ MASES.EntityFrameworkCore.KNet.Complex.Test
+ EntityFrameworkCore KNet Complex Test
+ EntityFrameworkCore KNet Complex.Test
+ MASES.EntityFrameworkCore.KNet.Complex.Test
+ ..\..\bin\
+
+
+
+
+
+
+
+
+
+
+
+
+ PreserveNewest
+
+
+
diff --git a/test/KEFCore.Complex.Test/KNetReplicatorComplexTest.json b/test/KEFCore.Complex.Test/KNetReplicatorComplexTest.json
new file mode 100644
index 00000000..eb441e51
--- /dev/null
+++ b/test/KEFCore.Complex.Test/KNetReplicatorComplexTest.json
@@ -0,0 +1,7 @@
+{
+ "DatabaseName": "TestDBComplex",
+ "UseCompactedReplicator": true,
+ "BootstrapServers": "192.168.1.103:9092",
+ "NumberOfElements": 10,
+ "NumberOfExtraElements": 1
+}
diff --git a/test/KEFCore.Complex.Test/Program.cs b/test/KEFCore.Complex.Test/Program.cs
new file mode 100644
index 00000000..b1ad1fca
--- /dev/null
+++ b/test/KEFCore.Complex.Test/Program.cs
@@ -0,0 +1,293 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2022 MASES s.r.l.
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+using MASES.EntityFrameworkCore.KNet.Infrastructure;
+using MASES.KNet.Streams;
+using Microsoft.EntityFrameworkCore;
+using System;
+using System.Collections.Generic;
+using System.Diagnostics;
+using System.IO;
+using System.Linq;
+using System.Text.Json;
+
+namespace MASES.EntityFrameworkCore.KNet.Test
+{
+ partial class Program
+ {
+ internal static ProgramConfig config = new();
+
+ static void ReportString(string message)
+ {
+ if (Debugger.IsAttached)
+ {
+ Trace.WriteLine(message);
+ }
+ else
+ {
+ Console.WriteLine(message);
+ }
+ }
+
+ static void Main(string[] args)
+ {
+ BloggingContext context = null;
+ var testWatcher = new Stopwatch();
+ var globalWatcher = new Stopwatch();
+
+ if (args.Length > 0)
+ {
+ config = JsonSerializer.Deserialize(File.ReadAllText(args[0]));
+ }
+
+ if (!config.UseInMemoryProvider)
+ {
+ KEFCore.CreateGlobalInstance();
+ }
+
+ var databaseName = config.UseModelBuilder ? config.DatabaseNameWithModel : config.DatabaseName;
+
+ try
+ {
+ globalWatcher.Start();
+ StreamsConfigBuilder streamConfig = null;
+ if (!config.UseInMemoryProvider)
+ {
+ streamConfig = StreamsConfigBuilder.Create();
+ streamConfig = streamConfig.WithAcceptableRecoveryLag(100);
+ }
+
+ context = new BloggingContext()
+ {
+ BootstrapServers = config.BootstrapServers,
+ ApplicationId = config.ApplicationId,
+ DbName = databaseName,
+ StreamsConfigBuilder = streamConfig,
+ };
+
+ if (config.DeleteApplicationData)
+ {
+ context.Database.EnsureDeleted();
+ context.Database.EnsureCreated();
+ }
+
+ testWatcher.Start();
+ Stopwatch watch = new Stopwatch();
+ if (config.LoadApplicationData)
+ {
+ watch.Start();
+ for (int i = 0; i < config.NumberOfElements; i++)
+ {
+ context.Add(new Blog
+ {
+ Url = "http://blogs.msdn.com/adonet" + i.ToString(),
+ Posts = new List()
+ {
+ new Post()
+ {
+ Title = "title",
+ Content = i.ToString(),
+ CreationTime = DateTime.Now,
+ Identifier = Guid.NewGuid()
+ }
+ },
+ Rating = i,
+ });
+ }
+ watch.Stop();
+ ReportString($"Elapsed data load {watch.ElapsedMilliseconds} ms");
+ watch.Restart();
+ context.SaveChanges();
+ watch.Stop();
+ ReportString($"Elapsed SaveChanges {watch.ElapsedMilliseconds} ms");
+ }
+
+ if (config.UseModelBuilder)
+ {
+ watch.Restart();
+ var selector = (from op in context.Blogs
+ join pg in context.Posts on op.BlogId equals pg.BlogId
+ where pg.BlogId == op.BlogId
+ select new { pg, op });
+ var pageObject = selector.SingleOrDefault();
+ watch.Stop();
+ ReportString($"Elapsed UseModelBuilder {watch.ElapsedMilliseconds} ms");
+ }
+
+ watch.Restart();
+ var post = context.Posts.Single(b => b.BlogId == 2);
+ watch.Stop();
+ ReportString($"Elapsed context.Posts.Single(b => b.BlogId == 2) {watch.ElapsedMilliseconds} ms. Result is {post}");
+
+ try
+ {
+ watch.Restart();
+ post = context.Posts.Single(b => b.BlogId == 1);
+ watch.Stop();
+ ReportString($"Elapsed context.Posts.Single(b => b.BlogId == 1) {watch.ElapsedMilliseconds} ms. Result is {post}");
+ }
+ catch
+ {
+ if (config.LoadApplicationData) throw; // throw only if the test is loading data otherwise it was removed in a previous run
+ }
+
+ watch.Restart();
+ var all = context.Posts.All((o) => true);
+ watch.Stop();
+ ReportString($"Elapsed context.Posts.All((o) => true) {watch.ElapsedMilliseconds} ms. Result is {all}");
+
+ Blog blog = null;
+ try
+ {
+ watch.Restart();
+ blog = context.Blogs!.Single(b => b.BlogId == 1);
+ watch.Stop();
+ ReportString($"Elapsed context.Blogs!.Single(b => b.BlogId == 1) {watch.ElapsedMilliseconds} ms. Result is {blog}");
+ }
+ catch
+ {
+ if (config.LoadApplicationData) throw; // throw only if the test is loading data otherwise it was removed in a previous run
+ }
+
+ if (config.LoadApplicationData)
+ {
+ watch.Restart();
+ context.Remove(post);
+ context.Remove(blog);
+ watch.Stop();
+ ReportString($"Elapsed data remove {watch.ElapsedMilliseconds} ms");
+
+ watch.Restart();
+ context.SaveChanges();
+ watch.Stop();
+ ReportString($"Elapsed SaveChanges {watch.ElapsedMilliseconds} ms");
+
+ watch.Restart();
+ for (int i = config.NumberOfElements; i < config.NumberOfElements + config.NumberOfExtraElements; i++)
+ {
+ context.Add(new Blog
+ {
+ Url = "http://blogs.msdn.com/adonet" + i.ToString(),
+ BooleanValue = i % 2 == 0,
+ Posts = new List()
+ {
+ new Post()
+ {
+ Title = "title",
+ Content = i.ToString(),
+ CreationTime = DateTime.Now,
+ Identifier = Guid.NewGuid()
+ }
+ },
+ Rating = i,
+ });
+ }
+ watch.Stop();
+ ReportString($"Elapsed data load {watch.ElapsedMilliseconds} ms");
+ watch.Restart();
+ context.SaveChanges();
+ watch.Stop();
+ ReportString($"Elapsed SaveChanges {watch.ElapsedMilliseconds} ms");
+ }
+
+ watch.Restart();
+ post = context.Posts.Single(b => b.BlogId == config.NumberOfElements + (config.NumberOfExtraElements != 0 ? 1 : 0));
+ watch.Stop();
+ ReportString($"Elapsed context.Posts.Single(b => b.BlogId == config.NumberOfElements + (config.NumberOfExtraElements != 0 ? 1 : 0)) {watch.ElapsedMilliseconds} ms. Result is {post}");
+
+ var value = context.Blogs.AsQueryable().ToQueryString();
+ }
+ catch (Exception ex)
+ {
+ Console.WriteLine(ex.ToString());
+ }
+ finally
+ {
+ context?.Dispose();
+ testWatcher.Stop();
+ globalWatcher.Stop();
+ Console.WriteLine($"Full test completed in {globalWatcher.Elapsed}, only tests completed in {testWatcher.Elapsed}");
+ }
+ }
+ }
+
+ public class BloggingContext : KafkaDbContext
+ {
+ public override bool UsePersistentStorage { get; set; } = Program.config.UsePersistentStorage;
+ public override bool UseCompactedReplicator { get; set; } = Program.config.UseCompactedReplicator;
+
+ public DbSet Blogs { get; set; }
+ public DbSet Posts { get; set; }
+
+ protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder)
+ {
+ if (Program.config.UseInMemoryProvider)
+ {
+ optionsBuilder.UseInMemoryDatabase(Program.config.DatabaseName);
+ }
+ else
+ {
+ base.OnConfiguring(optionsBuilder);
+ }
+ }
+
+ protected override void OnModelCreating(ModelBuilder modelBuilder)
+ {
+ if (!Program.config.UseModelBuilder) return;
+
+ modelBuilder.Entity().HasKey(c => new { c.BlogId, c.Rating });
+ }
+ }
+
+ public class Blog
+ {
+ public int BlogId { get; set; }
+ public string Url { get; set; }
+ public int Rating { get; set; }
+ public bool BooleanValue { get; set; }
+ public List Posts { get; set; }
+
+ public override string ToString()
+ {
+ return $"BlogId: {BlogId} Url: {Url} Rating: {Rating}";
+ }
+ }
+
+ public class Post
+ {
+ public int PostId { get; set; }
+ public string Title { get; set; }
+ public string Content { get; set; }
+ public DateTime CreationTime { get; set; }
+ public Guid Identifier { get; set; }
+
+ public int BlogId { get; set; }
+ public Blog Blog { get; set; }
+
+ public override string ToString()
+ {
+ return $"PostId: {PostId} Title: {Title} Content: {Content} BlogId: {BlogId}";
+ }
+ }
+}
diff --git a/test/KEFCore.Test.sln b/test/KEFCore.Test.sln
index a6a95ddf..e9e3ceea 100644
--- a/test/KEFCore.Test.sln
+++ b/test/KEFCore.Test.sln
@@ -11,6 +11,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Common", "Common", "{B35B16
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Tests", "Tests", "{4A0AD520-9BC4-4F92-893B-6F92BBC35BFA}"
EndProject
+Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "KEFCore.Complex.Test", "KEFCore.Complex.Test\KEFCore.Complex.Test.csproj", "{CC3396D4-7365-41C1-B82E-200FE87A4F33}"
+EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@@ -25,6 +27,10 @@ Global
{BB85D638-A032-41F5-9118-3264F6F6D14C}.Debug|Any CPU.Build.0 = Debug|Any CPU
{BB85D638-A032-41F5-9118-3264F6F6D14C}.Release|Any CPU.ActiveCfg = Release|Any CPU
{BB85D638-A032-41F5-9118-3264F6F6D14C}.Release|Any CPU.Build.0 = Release|Any CPU
+ {CC3396D4-7365-41C1-B82E-200FE87A4F33}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {CC3396D4-7365-41C1-B82E-200FE87A4F33}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {CC3396D4-7365-41C1-B82E-200FE87A4F33}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {CC3396D4-7365-41C1-B82E-200FE87A4F33}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
@@ -32,6 +38,7 @@ Global
GlobalSection(NestedProjects) = preSolution
{6999B7F3-6887-41CE-B1E9-2CE6BB881FDA} = {4A0AD520-9BC4-4F92-893B-6F92BBC35BFA}
{BB85D638-A032-41F5-9118-3264F6F6D14C} = {B35B16BB-890F-4385-AB20-7AA4DD6E9C01}
+ {CC3396D4-7365-41C1-B82E-200FE87A4F33} = {4A0AD520-9BC4-4F92-893B-6F92BBC35BFA}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {36C294ED-9ECE-42AA-8273-31E008749AF3}
diff --git a/test/KEFCore.Test/KEFCore.Test.csproj b/test/KEFCore.Test/KEFCore.Test.csproj
index 4ff7ec5b..43509805 100644
--- a/test/KEFCore.Test/KEFCore.Test.csproj
+++ b/test/KEFCore.Test/KEFCore.Test.csproj
@@ -9,6 +9,9 @@
MASES.EntityFrameworkCore.KNet.Test
..\..\bin\
+
+
+