Skip to content

Commit

Permalink
Update to KNet 2.1.2 to solve synchronization issue
Browse files Browse the repository at this point in the history
  • Loading branch information
masesdevelopers committed Oct 6, 2023
1 parent f0208d1 commit 886afcc
Show file tree
Hide file tree
Showing 12 changed files with 125 additions and 64 deletions.
4 changes: 2 additions & 2 deletions src/net/KEFCore/KEFCore.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,11 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="MASES.KNet" Version="2.1.1">
<PackageReference Include="MASES.KNet" Version="2.1.2">
<IncludeAssets>All</IncludeAssets>
<PrivateAssets>None</PrivateAssets>
</PackageReference>
<PackageReference Include="MASES.KNet.Serialization.Json" Version="2.1.1" />
<PackageReference Include="MASES.KNet.Serialization.Json" Version="2.1.2" />
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="7.0.11" PrivateAssets="none" />
</ItemGroup>
</Project>
2 changes: 1 addition & 1 deletion src/net/KEFCore/Storage/Internal/EntityTypeProducer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ public EntityTypeProducer(IEntityType entityType, IKafkaCluster cluster)
KeySerDes = _keySerdes,
ValueSerDes = _valueSerdes,
};
_kafkaCompactedReplicator.StartAndWait();
if (!_kafkaCompactedReplicator.StartAndWait()) throw new InvalidOperationException($"Failed to synchronize with {_kafkaCompactedReplicator.StateName}");
}
else
{
Expand Down
4 changes: 2 additions & 2 deletions src/net/KEFCore/Storage/Internal/KafkaCluster.cs
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,12 @@ public virtual void Dispose()
{
if (_tables != null)
{
foreach (var item in _tables?.Values)
foreach (var item in _tables.Values)
{
item?.Dispose();
}
_kafkaAdminClient?.Dispose();
}
_kafkaAdminClient?.Dispose();
}

public virtual KafkaOptionsExtension Options => _options;
Expand Down
6 changes: 3 additions & 3 deletions src/net/KEFCore/Storage/Internal/KafkaStreamsBaseRetriever.cs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ private void StartTopology(StreamsBuilder builder, KStream<K, V> root)
{
_currentState = newState;
#if DEBUG_PERFORMANCE
Trace.WriteLine($"StateListener oldState: {oldState} newState: {newState} on {DateTime.Now:HH:mm:ss.FFFFFFF}");
Trace.WriteLine($"StateListener of {_entityType.Name} oldState: {oldState} newState: {newState} on {DateTime.Now:HH:mm:ss.FFFFFFF}");
#endif
if (_stateChanged != null && !_stateChanged.SafeWaitHandle.IsClosed) _stateChanged.Set();
}
Expand All @@ -122,7 +122,7 @@ private void StartTopology(StreamsBuilder builder, KStream<K, V> root)
if (index == WaitHandle.WaitTimeout)
{
#if DEBUG_PERFORMANCE
Trace.WriteLine($"State: {_currentState} No handle set within {waitingTime} ms");
Trace.WriteLine($"State of {_entityType.Name}: {_currentState} No handle set within {waitingTime} ms");
#endif
continue;
}
Expand All @@ -145,7 +145,7 @@ private void StartTopology(StreamsBuilder builder, KStream<K, V> root)
_resetEvent.WaitOne();
_streams.Start();
#if DEBUG_PERFORMANCE
Trace.WriteLine($"KafkaStreamsBaseRetriever Started on {DateTime.Now:HH:mm:ss.FFFFFFF}");
Trace.WriteLine($"KafkaStreamsBaseRetriever on {_entityType.Name} started on {DateTime.Now:HH:mm:ss.FFFFFFF}");
#endif
_resetEvent.WaitOne(); // wait running state
if (_resultException != null) throw _resultException;
Expand Down
15 changes: 15 additions & 0 deletions test/KEFCore.Test/KEFCore.Test.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,21 @@
<None Update="InMemoryTest.json">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</None>
<None Update="KafkaStreamsNoLoadTest.json">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</None>
<None Update="KafkaStreamsPersistedTest.json">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</None>
<None Update="KafkaStreamsModelBuilderTest.json">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</None>
<None Update="KNetReplicatorNoLoadTest.json">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</None>
<None Update="KNetReplicatorModelBuilderTest.json">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</None>
<None Update="KNetReplicatorTest.json">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</None>
Expand Down
5 changes: 5 additions & 0 deletions test/KEFCore.Test/KNetReplicatorModelBuilderTest.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
{
"UseCompactedReplicator": true,
"UseModelBuilder": true,
"BootstrapServers": "192.168.1.103:9092"
}
6 changes: 6 additions & 0 deletions test/KEFCore.Test/KNetReplicatorNoLoadTest.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"UseCompactedReplicator": true,
"DeleteApplicationData": false,
"LoadApplicationData": false,
"BootstrapServers": "192.168.1.103:9092"
}
4 changes: 4 additions & 0 deletions test/KEFCore.Test/KafkaStreamsModelBuilderTest.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
{
"UseModelBuilder": true,
"BootstrapServers": "192.168.1.103:9092"
}
5 changes: 5 additions & 0 deletions test/KEFCore.Test/KafkaStreamsNoLoadTest.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
{
"DeleteApplicationData": false,
"LoadApplicationData": false,
"BootstrapServers": "192.168.1.103:9092"
}
6 changes: 6 additions & 0 deletions test/KEFCore.Test/KafkaStreamsPersistedTest.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"UsePersistentStorage": true,
"DeleteApplicationData": false,
"LoadApplicationData": false,
"BootstrapServers": "192.168.1.103:9092"
}
128 changes: 73 additions & 55 deletions test/KEFCore.Test/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -86,45 +86,49 @@ static void Main(string[] args)
StreamsConfigBuilder = streamConfig,
};

if (config.DeleteApplication)
if (config.DeleteApplicationData)
{
context.Database.EnsureDeleted();
context.Database.EnsureCreated();
}

testWatcher.Start();
Stopwatch watch = Stopwatch.StartNew();
for (int i = 0; i < config.NumberOfElements; i++)
Stopwatch watch = new Stopwatch();
if (config.LoadApplicationData)
{
context.Add(new Blog
watch.Start();
for (int i = 0; i < config.NumberOfElements; i++)
{
Url = "http://blogs.msdn.com/adonet" + i.ToString(),
Posts = new List<Post>()
context.Add(new Blog
{
Url = "http://blogs.msdn.com/adonet" + i.ToString(),
Posts = new List<Post>()
{
new Post()
{
Title = "title",
Content = i.ToString()
}
},
Rating = i,
});
Rating = i,
});
}
watch.Stop();
ReportString($"Elapsed data load {watch.ElapsedMilliseconds} ms");
watch.Restart();
context.SaveChanges();
watch.Stop();
ReportString($"Elapsed SaveChanges {watch.ElapsedMilliseconds} ms");
}
watch.Stop();
ReportString($"Elapsed data load {watch.ElapsedMilliseconds} ms");

watch.Restart();
context.SaveChanges();
watch.Stop();
ReportString($"Elapsed SaveChanges {watch.ElapsedMilliseconds} ms");

if (config.UseModelBuilder)
{
watch.Restart();
var pageObject = (from op in context.Blogs
join pg in context.Posts on op.BlogId equals pg.BlogId
where pg.BlogId == op.BlogId
select new { pg, op }).SingleOrDefault();
var selector = (from op in context.Blogs
join pg in context.Posts on op.BlogId equals pg.BlogId
where pg.BlogId == op.BlogId
select new { pg, op });
var pageObject = selector.SingleOrDefault();
watch.Stop();
ReportString($"Elapsed UseModelBuilder {watch.ElapsedMilliseconds} ms");
}
Expand All @@ -134,56 +138,73 @@ join pg in context.Posts on op.BlogId equals pg.BlogId
watch.Stop();
ReportString($"Elapsed context.Posts.Single(b => b.BlogId == 2) {watch.ElapsedMilliseconds} ms. Result is {post}");

watch.Restart();
post = context.Posts.Single(b => b.BlogId == 1);
watch.Stop();
ReportString($"Elapsed context.Posts.Single(b => b.BlogId == 1) {watch.ElapsedMilliseconds} ms. Result is {post}");
try
{
watch.Restart();
post = context.Posts.Single(b => b.BlogId == 1);
watch.Stop();
ReportString($"Elapsed context.Posts.Single(b => b.BlogId == 1) {watch.ElapsedMilliseconds} ms. Result is {post}");
}
catch
{
if (config.LoadApplicationData) throw; // throw only if the test is loading data otherwise it was removed in a previous run
}

watch.Restart();
var all = context.Posts.All((o) => true);
watch.Stop();
ReportString($"Elapsed context.Posts.All((o) => true) {watch.ElapsedMilliseconds} ms. Result is {all}");

watch.Restart();
var blog = context.Blogs!.Single(b => b.BlogId == 1);
watch.Stop();
ReportString($"Elapsed context.Blogs!.Single(b => b.BlogId == 1) {watch.ElapsedMilliseconds} ms. Result is {blog}");
Blog blog = null;
try
{
watch.Restart();
blog = context.Blogs!.Single(b => b.BlogId == 1);
watch.Stop();
ReportString($"Elapsed context.Blogs!.Single(b => b.BlogId == 1) {watch.ElapsedMilliseconds} ms. Result is {blog}");
}
catch
{
if (config.LoadApplicationData) throw; // throw only if the test is loading data otherwise it was removed in a previous run
}

watch.Restart();
context.Remove(post);
context.Remove(blog);
watch.Stop();
ReportString($"Elapsed data remove {watch.ElapsedMilliseconds} ms");
if (config.LoadApplicationData)
{
watch.Restart();
context.Remove(post);
context.Remove(blog);
watch.Stop();
ReportString($"Elapsed data remove {watch.ElapsedMilliseconds} ms");

watch.Restart();
context.SaveChanges();
watch.Stop();
ReportString($"Elapsed SaveChanges {watch.ElapsedMilliseconds} ms");
watch.Restart();
context.SaveChanges();
watch.Stop();
ReportString($"Elapsed SaveChanges {watch.ElapsedMilliseconds} ms");

watch.Restart();
for (int i = config.NumberOfElements; i < config.NumberOfElements + config.NumberOfExtraElements; i++)
{
context.Add(new Blog
watch.Restart();
for (int i = config.NumberOfElements; i < config.NumberOfElements + config.NumberOfExtraElements; i++)
{
Url = "http://blogs.msdn.com/adonet" + i.ToString(),
Posts = new List<Post>()
context.Add(new Blog
{
Url = "http://blogs.msdn.com/adonet" + i.ToString(),
Posts = new List<Post>()
{
new Post()
{
Title = "title",
Content = i.ToString()
}
},
Rating = i,
});
Rating = i,
});
}
watch.Stop();
ReportString($"Elapsed data load {watch.ElapsedMilliseconds} ms");
watch.Restart();
context.SaveChanges();
watch.Stop();
ReportString($"Elapsed SaveChanges {watch.ElapsedMilliseconds} ms");
}
watch.Stop();
ReportString($"Elapsed data load {watch.ElapsedMilliseconds} ms");

watch.Restart();
context.SaveChanges();
watch.Stop();
ReportString($"Elapsed SaveChanges {watch.ElapsedMilliseconds} ms");

watch.Restart();
post = context.Posts.Single(b => b.BlogId == 1009);
Expand All @@ -208,6 +229,7 @@ join pg in context.Posts on op.BlogId equals pg.BlogId

public class BloggingContext : KafkaDbContext
{
public override bool UsePersistentStorage { get; set; } = Program.config.UsePersistentStorage;
public override bool UseCompactedReplicator { get; set; } = Program.config.UseCompactedReplicator;

public DbSet<Blog> Blogs { get; set; }
Expand All @@ -223,10 +245,6 @@ protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder)
{
base.OnConfiguring(optionsBuilder);
}
//optionsBuilder.UseKafkaDatabase(ApplicationId, DbName, BootstrapServers, (o) =>
//{
// o.StreamsConfig(o.EmptyStreamsConfigBuilder.WithAcceptableRecoveryLag(100)).WithDefaultNumPartitions(10);
//});
}

protected override void OnModelCreating(ModelBuilder modelBuilder)
Expand Down
4 changes: 3 additions & 1 deletion test/KEFCore.Test/ProgramConfig.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,12 @@ public class ProgramConfig
public bool UseInMemoryProvider { get; set; } = false;
public bool UseModelBuilder { get; set; } = false;
public bool UseCompactedReplicator { get; set; } = false;
public bool UsePersistentStorage { get; set; } = false;
public string DatabaseName { get; set; } = "TestDB";
public string DatabaseNameWithModel { get; set; } = "TestDBWithModel";
public string ApplicationId { get; set; } = "TestApplication";
public bool DeleteApplication { get; set; } = true;
public bool DeleteApplicationData { get; set; } = true;
public bool LoadApplicationData { get; set; } = true;
public string BootstrapServers { get; set; } = "localhost:9092";
public int NumberOfElements { get; set; } = 1000;
public int NumberOfExtraElements { get; set; } = 100;
Expand Down

0 comments on commit 886afcc

Please sign in to comment.