Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update to KNet 2.1.2 to solve synchronization issue #75

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/net/KEFCore/KEFCore.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,11 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="MASES.KNet" Version="2.1.1">
<PackageReference Include="MASES.KNet" Version="2.1.2">
<IncludeAssets>All</IncludeAssets>
<PrivateAssets>None</PrivateAssets>
</PackageReference>
<PackageReference Include="MASES.KNet.Serialization.Json" Version="2.1.1" />
<PackageReference Include="MASES.KNet.Serialization.Json" Version="2.1.2" />
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="7.0.11" PrivateAssets="none" />
</ItemGroup>
</Project>
2 changes: 1 addition & 1 deletion src/net/KEFCore/Storage/Internal/EntityTypeProducer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -192,11 +192,11 @@
{
readonly IEntityType _entityType;
readonly IEnumerator<KeyValuePair<TKey, KNetEntityTypeData<TKey>>> _enumerator;
public KNetCompactedReplicatorEnumerator(IEntityType entityType, IKNetCompactedReplicator<TKey, KNetEntityTypeData<TKey>>? kafkaCompactedReplicator)

Check warning on line 195 in src/net/KEFCore/Storage/Internal/EntityTypeProducer.cs

View workflow job for this annotation

GitHub Actions / Analyze (csharp)

Non-nullable field '_enumerator' must contain a non-null value when exiting constructor. Consider declaring the field as nullable.
{
_entityType = entityType;
kafkaCompactedReplicator?.SyncWait();
_enumerator = kafkaCompactedReplicator?.GetEnumerator();

Check warning on line 199 in src/net/KEFCore/Storage/Internal/EntityTypeProducer.cs

View workflow job for this annotation

GitHub Actions / Analyze (csharp)

Possible null reference assignment.
}

ValueBuffer? _current = null;
Expand All @@ -214,8 +214,8 @@
{
if (_enumerator.MoveNext())
{
object[] array = null;

Check warning on line 217 in src/net/KEFCore/Storage/Internal/EntityTypeProducer.cs

View workflow job for this annotation

GitHub Actions / Analyze (csharp)

Converting null literal or possible null value to non-nullable type.
_enumerator.Current.Value.GetData(_entityType, ref array);

Check warning on line 218 in src/net/KEFCore/Storage/Internal/EntityTypeProducer.cs

View workflow job for this annotation

GitHub Actions / Analyze (csharp)

Possible null reference assignment.
_current = new ValueBuffer(array);
return true;
}
Expand Down Expand Up @@ -247,7 +247,7 @@
}


public EntityTypeProducer(IEntityType entityType, IKafkaCluster cluster)

Check warning on line 250 in src/net/KEFCore/Storage/Internal/EntityTypeProducer.cs

View workflow job for this annotation

GitHub Actions / Analyze (csharp)

Non-nullable field '_streamData' must contain a non-null value when exiting constructor. Consider declaring the field as nullable.
{
_entityType = entityType;
_cluster = cluster;
Expand Down Expand Up @@ -276,7 +276,7 @@
KeySerDes = _keySerdes,
ValueSerDes = _valueSerdes,
};
_kafkaCompactedReplicator.StartAndWait();
if (!_kafkaCompactedReplicator.StartAndWait()) throw new InvalidOperationException($"Failed to synchronize with {_kafkaCompactedReplicator.StateName}");
}
else
{
Expand All @@ -295,7 +295,7 @@
if (_kafkaCompactedReplicator != null) _kafkaCompactedReplicator[record.Key] = value!;
}

return null;

Check warning on line 298 in src/net/KEFCore/Storage/Internal/EntityTypeProducer.cs

View workflow job for this annotation

GitHub Actions / Analyze (csharp)

Possible null reference return.
}
else
{
Expand All @@ -303,7 +303,7 @@
foreach (KafkaRowBag<TKey> record in records)
{
var future = _kafkaProducer?.Send(new KNetProducerRecord<TKey, KNetEntityTypeData<TKey>>(record.AssociatedTopicName, 0, record.Key, record.Value!));
futures.Add(future);

Check warning on line 306 in src/net/KEFCore/Storage/Internal/EntityTypeProducer.cs

View workflow job for this annotation

GitHub Actions / Analyze (csharp)

Possible null reference argument for parameter 'item' in 'void List<Future<RecordMetadata>>.Add(Future<RecordMetadata> item)'.
}

_kafkaProducer?.Flush();
Expand Down
4 changes: 2 additions & 2 deletions src/net/KEFCore/Storage/Internal/KafkaCluster.cs
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,12 @@ public virtual void Dispose()
{
if (_tables != null)
{
foreach (var item in _tables?.Values)
foreach (var item in _tables.Values)
{
item?.Dispose();
}
_kafkaAdminClient?.Dispose();
}
_kafkaAdminClient?.Dispose();
}

public virtual KafkaOptionsExtension Options => _options;
Expand Down
6 changes: 3 additions & 3 deletions src/net/KEFCore/Storage/Internal/KafkaStreamsBaseRetriever.cs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@
{
_currentState = newState;
#if DEBUG_PERFORMANCE
Trace.WriteLine($"StateListener oldState: {oldState} newState: {newState} on {DateTime.Now:HH:mm:ss.FFFFFFF}");
Trace.WriteLine($"StateListener of {_entityType.Name} oldState: {oldState} newState: {newState} on {DateTime.Now:HH:mm:ss.FFFFFFF}");
#endif
if (_stateChanged != null && !_stateChanged.SafeWaitHandle.IsClosed) _stateChanged.Set();
}
Expand All @@ -122,7 +122,7 @@
if (index == WaitHandle.WaitTimeout)
{
#if DEBUG_PERFORMANCE
Trace.WriteLine($"State: {_currentState} No handle set within {waitingTime} ms");
Trace.WriteLine($"State of {_entityType.Name}: {_currentState} No handle set within {waitingTime} ms");
#endif
continue;
}
Expand All @@ -145,7 +145,7 @@
_resetEvent.WaitOne();
_streams.Start();
#if DEBUG_PERFORMANCE
Trace.WriteLine($"KafkaStreamsBaseRetriever Started on {DateTime.Now:HH:mm:ss.FFFFFFF}");
Trace.WriteLine($"KafkaStreamsBaseRetriever on {_entityType.Name} started on {DateTime.Now:HH:mm:ss.FFFFFFF}");
#endif
_resetEvent.WaitOne(); // wait running state
if (_resultException != null) throw _resultException;
Expand Down Expand Up @@ -235,8 +235,8 @@
_valueSerdesSw.Stop();
_valueBufferSw.Start();
#endif
object[] array = null;

Check warning on line 238 in src/net/KEFCore/Storage/Internal/KafkaStreamsBaseRetriever.cs

View workflow job for this annotation

GitHub Actions / Analyze (csharp)

Converting null literal or possible null value to non-nullable type.
entityTypeData.GetData(_entityType, ref array);

Check warning on line 239 in src/net/KEFCore/Storage/Internal/KafkaStreamsBaseRetriever.cs

View workflow job for this annotation

GitHub Actions / Analyze (csharp)

Possible null reference assignment.
#if DEBUG_PERFORMANCE
_valueBufferSw.Stop();
#endif
Expand Down
15 changes: 15 additions & 0 deletions test/KEFCore.Test/KEFCore.Test.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,21 @@
<None Update="InMemoryTest.json">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</None>
<None Update="KafkaStreamsNoLoadTest.json">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</None>
<None Update="KafkaStreamsPersistedTest.json">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</None>
<None Update="KafkaStreamsModelBuilderTest.json">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</None>
<None Update="KNetReplicatorNoLoadTest.json">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</None>
<None Update="KNetReplicatorModelBuilderTest.json">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</None>
<None Update="KNetReplicatorTest.json">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</None>
Expand Down
5 changes: 5 additions & 0 deletions test/KEFCore.Test/KNetReplicatorModelBuilderTest.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
{
"UseCompactedReplicator": true,
"UseModelBuilder": true,
"BootstrapServers": "192.168.1.103:9092"
}
6 changes: 6 additions & 0 deletions test/KEFCore.Test/KNetReplicatorNoLoadTest.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"UseCompactedReplicator": true,
"DeleteApplicationData": false,
"LoadApplicationData": false,
"BootstrapServers": "192.168.1.103:9092"
}
4 changes: 4 additions & 0 deletions test/KEFCore.Test/KafkaStreamsModelBuilderTest.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
{
"UseModelBuilder": true,
"BootstrapServers": "192.168.1.103:9092"
}
5 changes: 5 additions & 0 deletions test/KEFCore.Test/KafkaStreamsNoLoadTest.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
{
"DeleteApplicationData": false,
"LoadApplicationData": false,
"BootstrapServers": "192.168.1.103:9092"
}
6 changes: 6 additions & 0 deletions test/KEFCore.Test/KafkaStreamsPersistedTest.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"UsePersistentStorage": true,
"DeleteApplicationData": false,
"LoadApplicationData": false,
"BootstrapServers": "192.168.1.103:9092"
}
128 changes: 73 additions & 55 deletions test/KEFCore.Test/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -86,45 +86,49 @@ static void Main(string[] args)
StreamsConfigBuilder = streamConfig,
};

if (config.DeleteApplication)
if (config.DeleteApplicationData)
{
context.Database.EnsureDeleted();
context.Database.EnsureCreated();
}

testWatcher.Start();
Stopwatch watch = Stopwatch.StartNew();
for (int i = 0; i < config.NumberOfElements; i++)
Stopwatch watch = new Stopwatch();
if (config.LoadApplicationData)
{
context.Add(new Blog
watch.Start();
for (int i = 0; i < config.NumberOfElements; i++)
{
Url = "http://blogs.msdn.com/adonet" + i.ToString(),
Posts = new List<Post>()
context.Add(new Blog
{
Url = "http://blogs.msdn.com/adonet" + i.ToString(),
Posts = new List<Post>()
{
new Post()
{
Title = "title",
Content = i.ToString()
}
},
Rating = i,
});
Rating = i,
});
}
watch.Stop();
ReportString($"Elapsed data load {watch.ElapsedMilliseconds} ms");
watch.Restart();
context.SaveChanges();
watch.Stop();
ReportString($"Elapsed SaveChanges {watch.ElapsedMilliseconds} ms");
}
watch.Stop();
ReportString($"Elapsed data load {watch.ElapsedMilliseconds} ms");

watch.Restart();
context.SaveChanges();
watch.Stop();
ReportString($"Elapsed SaveChanges {watch.ElapsedMilliseconds} ms");

if (config.UseModelBuilder)
{
watch.Restart();
var pageObject = (from op in context.Blogs
join pg in context.Posts on op.BlogId equals pg.BlogId
where pg.BlogId == op.BlogId
select new { pg, op }).SingleOrDefault();
var selector = (from op in context.Blogs
join pg in context.Posts on op.BlogId equals pg.BlogId
where pg.BlogId == op.BlogId
select new { pg, op });
var pageObject = selector.SingleOrDefault();
watch.Stop();
ReportString($"Elapsed UseModelBuilder {watch.ElapsedMilliseconds} ms");
}
Expand All @@ -134,56 +138,73 @@ join pg in context.Posts on op.BlogId equals pg.BlogId
watch.Stop();
ReportString($"Elapsed context.Posts.Single(b => b.BlogId == 2) {watch.ElapsedMilliseconds} ms. Result is {post}");

watch.Restart();
post = context.Posts.Single(b => b.BlogId == 1);
watch.Stop();
ReportString($"Elapsed context.Posts.Single(b => b.BlogId == 1) {watch.ElapsedMilliseconds} ms. Result is {post}");
try
{
watch.Restart();
post = context.Posts.Single(b => b.BlogId == 1);
watch.Stop();
ReportString($"Elapsed context.Posts.Single(b => b.BlogId == 1) {watch.ElapsedMilliseconds} ms. Result is {post}");
}
catch
{
if (config.LoadApplicationData) throw; // throw only if the test is loading data otherwise it was removed in a previous run
}

watch.Restart();
var all = context.Posts.All((o) => true);
watch.Stop();
ReportString($"Elapsed context.Posts.All((o) => true) {watch.ElapsedMilliseconds} ms. Result is {all}");

watch.Restart();
var blog = context.Blogs!.Single(b => b.BlogId == 1);
watch.Stop();
ReportString($"Elapsed context.Blogs!.Single(b => b.BlogId == 1) {watch.ElapsedMilliseconds} ms. Result is {blog}");
Blog blog = null;
try
{
watch.Restart();
blog = context.Blogs!.Single(b => b.BlogId == 1);
watch.Stop();
ReportString($"Elapsed context.Blogs!.Single(b => b.BlogId == 1) {watch.ElapsedMilliseconds} ms. Result is {blog}");
}
catch
{
if (config.LoadApplicationData) throw; // throw only if the test is loading data otherwise it was removed in a previous run
}

watch.Restart();
context.Remove(post);
context.Remove(blog);
watch.Stop();
ReportString($"Elapsed data remove {watch.ElapsedMilliseconds} ms");
if (config.LoadApplicationData)
{
watch.Restart();
context.Remove(post);
context.Remove(blog);
watch.Stop();
ReportString($"Elapsed data remove {watch.ElapsedMilliseconds} ms");

watch.Restart();
context.SaveChanges();
watch.Stop();
ReportString($"Elapsed SaveChanges {watch.ElapsedMilliseconds} ms");
watch.Restart();
context.SaveChanges();
watch.Stop();
ReportString($"Elapsed SaveChanges {watch.ElapsedMilliseconds} ms");

watch.Restart();
for (int i = config.NumberOfElements; i < config.NumberOfElements + config.NumberOfExtraElements; i++)
{
context.Add(new Blog
watch.Restart();
for (int i = config.NumberOfElements; i < config.NumberOfElements + config.NumberOfExtraElements; i++)
{
Url = "http://blogs.msdn.com/adonet" + i.ToString(),
Posts = new List<Post>()
context.Add(new Blog
{
Url = "http://blogs.msdn.com/adonet" + i.ToString(),
Posts = new List<Post>()
{
new Post()
{
Title = "title",
Content = i.ToString()
}
},
Rating = i,
});
Rating = i,
});
}
watch.Stop();
ReportString($"Elapsed data load {watch.ElapsedMilliseconds} ms");
watch.Restart();
context.SaveChanges();
watch.Stop();
ReportString($"Elapsed SaveChanges {watch.ElapsedMilliseconds} ms");
}
watch.Stop();
ReportString($"Elapsed data load {watch.ElapsedMilliseconds} ms");

watch.Restart();
context.SaveChanges();
watch.Stop();
ReportString($"Elapsed SaveChanges {watch.ElapsedMilliseconds} ms");

watch.Restart();
post = context.Posts.Single(b => b.BlogId == 1009);
Expand All @@ -208,6 +229,7 @@ join pg in context.Posts on op.BlogId equals pg.BlogId

public class BloggingContext : KafkaDbContext
{
public override bool UsePersistentStorage { get; set; } = Program.config.UsePersistentStorage;
public override bool UseCompactedReplicator { get; set; } = Program.config.UseCompactedReplicator;

public DbSet<Blog> Blogs { get; set; }
Expand All @@ -223,10 +245,6 @@ protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder)
{
base.OnConfiguring(optionsBuilder);
}
//optionsBuilder.UseKafkaDatabase(ApplicationId, DbName, BootstrapServers, (o) =>
//{
// o.StreamsConfig(o.EmptyStreamsConfigBuilder.WithAcceptableRecoveryLag(100)).WithDefaultNumPartitions(10);
//});
}

protected override void OnModelCreating(ModelBuilder modelBuilder)
Expand Down
4 changes: 3 additions & 1 deletion test/KEFCore.Test/ProgramConfig.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,12 @@ public class ProgramConfig
public bool UseInMemoryProvider { get; set; } = false;
public bool UseModelBuilder { get; set; } = false;
public bool UseCompactedReplicator { get; set; } = false;
public bool UsePersistentStorage { get; set; } = false;
public string DatabaseName { get; set; } = "TestDB";
public string DatabaseNameWithModel { get; set; } = "TestDBWithModel";
public string ApplicationId { get; set; } = "TestApplication";
public bool DeleteApplication { get; set; } = true;
public bool DeleteApplicationData { get; set; } = true;
public bool LoadApplicationData { get; set; } = true;
public string BootstrapServers { get; set; } = "localhost:9092";
public int NumberOfElements { get; set; } = 1000;
public int NumberOfExtraElements { get; set; } = 100;
Expand Down
Loading