From 886afcc2c4eb59998e99c098a2f61dfd33633636 Mon Sep 17 00:00:00 2001
From: masesdevelopers <94312179+masesdevelopers@users.noreply.github.com>
Date: Fri, 6 Oct 2023 03:04:41 +0200
Subject: [PATCH] Update to KNet 2.1.2 to solve synchronization issue
---
src/net/KEFCore/KEFCore.csproj | 4 +-
.../Storage/Internal/EntityTypeProducer.cs | 2 +-
.../KEFCore/Storage/Internal/KafkaCluster.cs | 4 +-
.../Internal/KafkaStreamsBaseRetriever.cs | 6 +-
test/KEFCore.Test/KEFCore.Test.csproj | 15 ++
.../KNetReplicatorModelBuilderTest.json | 5 +
.../KNetReplicatorNoLoadTest.json | 6 +
.../KafkaStreamsModelBuilderTest.json | 4 +
test/KEFCore.Test/KafkaStreamsNoLoadTest.json | 5 +
.../KafkaStreamsPersistedTest.json | 6 +
test/KEFCore.Test/Program.cs | 128 ++++++++++--------
test/KEFCore.Test/ProgramConfig.cs | 4 +-
12 files changed, 125 insertions(+), 64 deletions(-)
create mode 100644 test/KEFCore.Test/KNetReplicatorModelBuilderTest.json
create mode 100644 test/KEFCore.Test/KNetReplicatorNoLoadTest.json
create mode 100644 test/KEFCore.Test/KafkaStreamsModelBuilderTest.json
create mode 100644 test/KEFCore.Test/KafkaStreamsNoLoadTest.json
create mode 100644 test/KEFCore.Test/KafkaStreamsPersistedTest.json
diff --git a/src/net/KEFCore/KEFCore.csproj b/src/net/KEFCore/KEFCore.csproj
index 936be2e3..86003728 100644
--- a/src/net/KEFCore/KEFCore.csproj
+++ b/src/net/KEFCore/KEFCore.csproj
@@ -66,11 +66,11 @@
-
+
All
None
-
+
diff --git a/src/net/KEFCore/Storage/Internal/EntityTypeProducer.cs b/src/net/KEFCore/Storage/Internal/EntityTypeProducer.cs
index 1cf0f6a8..000e48c6 100644
--- a/src/net/KEFCore/Storage/Internal/EntityTypeProducer.cs
+++ b/src/net/KEFCore/Storage/Internal/EntityTypeProducer.cs
@@ -276,7 +276,7 @@ public EntityTypeProducer(IEntityType entityType, IKafkaCluster cluster)
KeySerDes = _keySerdes,
ValueSerDes = _valueSerdes,
};
- _kafkaCompactedReplicator.StartAndWait();
+ if (!_kafkaCompactedReplicator.StartAndWait()) throw new InvalidOperationException($"Failed to synchronize with {_kafkaCompactedReplicator.StateName}");
}
else
{
diff --git a/src/net/KEFCore/Storage/Internal/KafkaCluster.cs b/src/net/KEFCore/Storage/Internal/KafkaCluster.cs
index 997943cf..f1336838 100644
--- a/src/net/KEFCore/Storage/Internal/KafkaCluster.cs
+++ b/src/net/KEFCore/Storage/Internal/KafkaCluster.cs
@@ -56,12 +56,12 @@ public virtual void Dispose()
{
if (_tables != null)
{
- foreach (var item in _tables?.Values)
+ foreach (var item in _tables.Values)
{
item?.Dispose();
}
- _kafkaAdminClient?.Dispose();
}
+ _kafkaAdminClient?.Dispose();
}
public virtual KafkaOptionsExtension Options => _options;
diff --git a/src/net/KEFCore/Storage/Internal/KafkaStreamsBaseRetriever.cs b/src/net/KEFCore/Storage/Internal/KafkaStreamsBaseRetriever.cs
index e4c90c94..91d0ce0a 100644
--- a/src/net/KEFCore/Storage/Internal/KafkaStreamsBaseRetriever.cs
+++ b/src/net/KEFCore/Storage/Internal/KafkaStreamsBaseRetriever.cs
@@ -95,7 +95,7 @@ private void StartTopology(StreamsBuilder builder, KStream root)
{
_currentState = newState;
#if DEBUG_PERFORMANCE
- Trace.WriteLine($"StateListener oldState: {oldState} newState: {newState} on {DateTime.Now:HH:mm:ss.FFFFFFF}");
+ Trace.WriteLine($"StateListener of {_entityType.Name} oldState: {oldState} newState: {newState} on {DateTime.Now:HH:mm:ss.FFFFFFF}");
#endif
if (_stateChanged != null && !_stateChanged.SafeWaitHandle.IsClosed) _stateChanged.Set();
}
@@ -122,7 +122,7 @@ private void StartTopology(StreamsBuilder builder, KStream root)
if (index == WaitHandle.WaitTimeout)
{
#if DEBUG_PERFORMANCE
- Trace.WriteLine($"State: {_currentState} No handle set within {waitingTime} ms");
+ Trace.WriteLine($"State of {_entityType.Name}: {_currentState} No handle set within {waitingTime} ms");
#endif
continue;
}
@@ -145,7 +145,7 @@ private void StartTopology(StreamsBuilder builder, KStream root)
_resetEvent.WaitOne();
_streams.Start();
#if DEBUG_PERFORMANCE
- Trace.WriteLine($"KafkaStreamsBaseRetriever Started on {DateTime.Now:HH:mm:ss.FFFFFFF}");
+ Trace.WriteLine($"KafkaStreamsBaseRetriever on {_entityType.Name} started on {DateTime.Now:HH:mm:ss.FFFFFFF}");
#endif
_resetEvent.WaitOne(); // wait running state
if (_resultException != null) throw _resultException;
diff --git a/test/KEFCore.Test/KEFCore.Test.csproj b/test/KEFCore.Test/KEFCore.Test.csproj
index fe28413b..4ff7ec5b 100644
--- a/test/KEFCore.Test/KEFCore.Test.csproj
+++ b/test/KEFCore.Test/KEFCore.Test.csproj
@@ -19,6 +19,21 @@
PreserveNewest
+
+ PreserveNewest
+
+
+ PreserveNewest
+
+
+ PreserveNewest
+
+
+ PreserveNewest
+
+
+ PreserveNewest
+
PreserveNewest
diff --git a/test/KEFCore.Test/KNetReplicatorModelBuilderTest.json b/test/KEFCore.Test/KNetReplicatorModelBuilderTest.json
new file mode 100644
index 00000000..8965f7e2
--- /dev/null
+++ b/test/KEFCore.Test/KNetReplicatorModelBuilderTest.json
@@ -0,0 +1,5 @@
+{
+ "UseCompactedReplicator": true,
+ "UseModelBuilder": true,
+ "BootstrapServers": "192.168.1.103:9092"
+}
diff --git a/test/KEFCore.Test/KNetReplicatorNoLoadTest.json b/test/KEFCore.Test/KNetReplicatorNoLoadTest.json
new file mode 100644
index 00000000..c8c48603
--- /dev/null
+++ b/test/KEFCore.Test/KNetReplicatorNoLoadTest.json
@@ -0,0 +1,6 @@
+{
+ "UseCompactedReplicator": true,
+ "DeleteApplicationData": false,
+ "LoadApplicationData": false,
+ "BootstrapServers": "192.168.1.103:9092"
+}
diff --git a/test/KEFCore.Test/KafkaStreamsModelBuilderTest.json b/test/KEFCore.Test/KafkaStreamsModelBuilderTest.json
new file mode 100644
index 00000000..562f8dc7
--- /dev/null
+++ b/test/KEFCore.Test/KafkaStreamsModelBuilderTest.json
@@ -0,0 +1,4 @@
+{
+ "UseModelBuilder": true,
+ "BootstrapServers": "192.168.1.103:9092"
+}
diff --git a/test/KEFCore.Test/KafkaStreamsNoLoadTest.json b/test/KEFCore.Test/KafkaStreamsNoLoadTest.json
new file mode 100644
index 00000000..99c3dced
--- /dev/null
+++ b/test/KEFCore.Test/KafkaStreamsNoLoadTest.json
@@ -0,0 +1,5 @@
+{
+ "DeleteApplicationData": false,
+ "LoadApplicationData": false,
+ "BootstrapServers": "192.168.1.103:9092"
+}
diff --git a/test/KEFCore.Test/KafkaStreamsPersistedTest.json b/test/KEFCore.Test/KafkaStreamsPersistedTest.json
new file mode 100644
index 00000000..8ddb4d9d
--- /dev/null
+++ b/test/KEFCore.Test/KafkaStreamsPersistedTest.json
@@ -0,0 +1,6 @@
+{
+ "UsePersistentStorage": true,
+ "DeleteApplicationData": false,
+ "LoadApplicationData": false,
+ "BootstrapServers": "192.168.1.103:9092"
+}
diff --git a/test/KEFCore.Test/Program.cs b/test/KEFCore.Test/Program.cs
index f9ea5251..f56ec8d3 100644
--- a/test/KEFCore.Test/Program.cs
+++ b/test/KEFCore.Test/Program.cs
@@ -86,20 +86,23 @@ static void Main(string[] args)
StreamsConfigBuilder = streamConfig,
};
- if (config.DeleteApplication)
+ if (config.DeleteApplicationData)
{
context.Database.EnsureDeleted();
context.Database.EnsureCreated();
}
testWatcher.Start();
- Stopwatch watch = Stopwatch.StartNew();
- for (int i = 0; i < config.NumberOfElements; i++)
+ Stopwatch watch = new Stopwatch();
+ if (config.LoadApplicationData)
{
- context.Add(new Blog
+ watch.Start();
+ for (int i = 0; i < config.NumberOfElements; i++)
{
- Url = "http://blogs.msdn.com/adonet" + i.ToString(),
- Posts = new List()
+ context.Add(new Blog
+ {
+ Url = "http://blogs.msdn.com/adonet" + i.ToString(),
+ Posts = new List()
{
new Post()
{
@@ -107,24 +110,25 @@ static void Main(string[] args)
Content = i.ToString()
}
},
- Rating = i,
- });
+ Rating = i,
+ });
+ }
+ watch.Stop();
+ ReportString($"Elapsed data load {watch.ElapsedMilliseconds} ms");
+ watch.Restart();
+ context.SaveChanges();
+ watch.Stop();
+ ReportString($"Elapsed SaveChanges {watch.ElapsedMilliseconds} ms");
}
- watch.Stop();
- ReportString($"Elapsed data load {watch.ElapsedMilliseconds} ms");
-
- watch.Restart();
- context.SaveChanges();
- watch.Stop();
- ReportString($"Elapsed SaveChanges {watch.ElapsedMilliseconds} ms");
if (config.UseModelBuilder)
{
watch.Restart();
- var pageObject = (from op in context.Blogs
- join pg in context.Posts on op.BlogId equals pg.BlogId
- where pg.BlogId == op.BlogId
- select new { pg, op }).SingleOrDefault();
+ var selector = (from op in context.Blogs
+ join pg in context.Posts on op.BlogId equals pg.BlogId
+ where pg.BlogId == op.BlogId
+ select new { pg, op });
+ var pageObject = selector.SingleOrDefault();
watch.Stop();
ReportString($"Elapsed UseModelBuilder {watch.ElapsedMilliseconds} ms");
}
@@ -134,39 +138,56 @@ join pg in context.Posts on op.BlogId equals pg.BlogId
watch.Stop();
ReportString($"Elapsed context.Posts.Single(b => b.BlogId == 2) {watch.ElapsedMilliseconds} ms. Result is {post}");
- watch.Restart();
- post = context.Posts.Single(b => b.BlogId == 1);
- watch.Stop();
- ReportString($"Elapsed context.Posts.Single(b => b.BlogId == 1) {watch.ElapsedMilliseconds} ms. Result is {post}");
+ try
+ {
+ watch.Restart();
+ post = context.Posts.Single(b => b.BlogId == 1);
+ watch.Stop();
+ ReportString($"Elapsed context.Posts.Single(b => b.BlogId == 1) {watch.ElapsedMilliseconds} ms. Result is {post}");
+ }
+ catch
+ {
+ if (config.LoadApplicationData) throw; // throw only if the test is loading data otherwise it was removed in a previous run
+ }
watch.Restart();
var all = context.Posts.All((o) => true);
watch.Stop();
ReportString($"Elapsed context.Posts.All((o) => true) {watch.ElapsedMilliseconds} ms. Result is {all}");
- watch.Restart();
- var blog = context.Blogs!.Single(b => b.BlogId == 1);
- watch.Stop();
- ReportString($"Elapsed context.Blogs!.Single(b => b.BlogId == 1) {watch.ElapsedMilliseconds} ms. Result is {blog}");
+ Blog blog = null;
+ try
+ {
+ watch.Restart();
+ blog = context.Blogs!.Single(b => b.BlogId == 1);
+ watch.Stop();
+ ReportString($"Elapsed context.Blogs!.Single(b => b.BlogId == 1) {watch.ElapsedMilliseconds} ms. Result is {blog}");
+ }
+ catch
+ {
+ if (config.LoadApplicationData) throw; // throw only if the test is loading data otherwise it was removed in a previous run
+ }
- watch.Restart();
- context.Remove(post);
- context.Remove(blog);
- watch.Stop();
- ReportString($"Elapsed data remove {watch.ElapsedMilliseconds} ms");
+ if (config.LoadApplicationData)
+ {
+ watch.Restart();
+ context.Remove(post);
+ context.Remove(blog);
+ watch.Stop();
+ ReportString($"Elapsed data remove {watch.ElapsedMilliseconds} ms");
- watch.Restart();
- context.SaveChanges();
- watch.Stop();
- ReportString($"Elapsed SaveChanges {watch.ElapsedMilliseconds} ms");
+ watch.Restart();
+ context.SaveChanges();
+ watch.Stop();
+ ReportString($"Elapsed SaveChanges {watch.ElapsedMilliseconds} ms");
- watch.Restart();
- for (int i = config.NumberOfElements; i < config.NumberOfElements + config.NumberOfExtraElements; i++)
- {
- context.Add(new Blog
+ watch.Restart();
+ for (int i = config.NumberOfElements; i < config.NumberOfElements + config.NumberOfExtraElements; i++)
{
- Url = "http://blogs.msdn.com/adonet" + i.ToString(),
- Posts = new List()
+ context.Add(new Blog
+ {
+ Url = "http://blogs.msdn.com/adonet" + i.ToString(),
+ Posts = new List()
{
new Post()
{
@@ -174,16 +195,16 @@ join pg in context.Posts on op.BlogId equals pg.BlogId
Content = i.ToString()
}
},
- Rating = i,
- });
+ Rating = i,
+ });
+ }
+ watch.Stop();
+ ReportString($"Elapsed data load {watch.ElapsedMilliseconds} ms");
+ watch.Restart();
+ context.SaveChanges();
+ watch.Stop();
+ ReportString($"Elapsed SaveChanges {watch.ElapsedMilliseconds} ms");
}
- watch.Stop();
- ReportString($"Elapsed data load {watch.ElapsedMilliseconds} ms");
-
- watch.Restart();
- context.SaveChanges();
- watch.Stop();
- ReportString($"Elapsed SaveChanges {watch.ElapsedMilliseconds} ms");
watch.Restart();
post = context.Posts.Single(b => b.BlogId == 1009);
@@ -208,6 +229,7 @@ join pg in context.Posts on op.BlogId equals pg.BlogId
public class BloggingContext : KafkaDbContext
{
+ public override bool UsePersistentStorage { get; set; } = Program.config.UsePersistentStorage;
public override bool UseCompactedReplicator { get; set; } = Program.config.UseCompactedReplicator;
public DbSet Blogs { get; set; }
@@ -223,10 +245,6 @@ protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder)
{
base.OnConfiguring(optionsBuilder);
}
- //optionsBuilder.UseKafkaDatabase(ApplicationId, DbName, BootstrapServers, (o) =>
- //{
- // o.StreamsConfig(o.EmptyStreamsConfigBuilder.WithAcceptableRecoveryLag(100)).WithDefaultNumPartitions(10);
- //});
}
protected override void OnModelCreating(ModelBuilder modelBuilder)
diff --git a/test/KEFCore.Test/ProgramConfig.cs b/test/KEFCore.Test/ProgramConfig.cs
index aa137339..7ec41c73 100644
--- a/test/KEFCore.Test/ProgramConfig.cs
+++ b/test/KEFCore.Test/ProgramConfig.cs
@@ -29,10 +29,12 @@ public class ProgramConfig
public bool UseInMemoryProvider { get; set; } = false;
public bool UseModelBuilder { get; set; } = false;
public bool UseCompactedReplicator { get; set; } = false;
+ public bool UsePersistentStorage { get; set; } = false;
public string DatabaseName { get; set; } = "TestDB";
public string DatabaseNameWithModel { get; set; } = "TestDBWithModel";
public string ApplicationId { get; set; } = "TestApplication";
- public bool DeleteApplication { get; set; } = true;
+ public bool DeleteApplicationData { get; set; } = true;
+ public bool LoadApplicationData { get; set; } = true;
public string BootstrapServers { get; set; } = "localhost:9092";
public int NumberOfElements { get; set; } = 1000;
public int NumberOfExtraElements { get; set; } = 100;