Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added performance test project and lots of improvements #82

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/net/Common/Common.props
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<Owners>MASES s.r.l.</Owners>
<Authors>MASES s.r.l.</Authors>
<Company>MASES s.r.l.</Company>
<Version>0.8.1.0</Version>
<Version>0.8.2.0</Version>
<TargetFrameworks>net6.0</TargetFrameworks>
<LangVersion>latest</LangVersion>
<GenerateAssemblyInfo>true</GenerateAssemblyInfo>
Expand Down
45 changes: 44 additions & 1 deletion src/net/KEFCore/Infrastructure/KafkaDbContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
* Refer to LICENSE for more information.
*/

using MASES.KNet;
using MASES.KNet.Common;
using MASES.KNet.Producer;
using MASES.KNet.Streams;
Expand All @@ -31,6 +30,50 @@ namespace MASES.EntityFrameworkCore.KNet.Infrastructure;
/// </summary>
public class KafkaDbContext : DbContext
{
#if DEBUG_PERFORMANCE
const bool perf = true;
/// <summary>
/// Enable tracing of <see cref="MASES.EntityFrameworkCore.KNet.Storage.Internal.EntityTypeDataStorage{TKey}"/>
/// </summary>
public static bool TraceEntityTypeDataStorageGetData = false;

public static void ReportString(string message)
{
if (!_enableKEFCoreTracing) return;

if (Debugger.IsAttached)
{
Trace.WriteLine($"{DateTime.Now:HH::mm::ss:ffff} - {message}");
}
else
{
Console.WriteLine($"{DateTime.Now:HH::mm::ss:ffff} - {message}");
}
}
#else
const bool perf = false;
#endif
/// <summary>
/// Reports if the library was compiled to reports performance information
/// </summary>
public const bool IsPerformanceVersion = perf;
static bool _enableKEFCoreTracing = false;
/// <summary>
/// Set to <see langword="true"/> to enable tracing of KEFCore
/// </summary>
/// <remarks>Can be set only if the project is compiled with DEBUG_PERFORMANCE preprocessor directive, otherwise an <see cref="InvalidOperationException"/> is raised</remarks>
public static bool EnableKEFCoreTracing
{
get { return _enableKEFCoreTracing; }
set
{
_enableKEFCoreTracing = value;
#if DEBUG_PERFORMANCE
if (_enableKEFCoreTracing) throw new InvalidOperationException("Compile KEFCore using DEBUG_PERFORMANCE preprocessor directive");
#endif
}
}

/// <inheritdoc cref="DbContext.DbContext()"/>
public KafkaDbContext()
{
Expand Down
6 changes: 3 additions & 3 deletions src/net/KEFCore/KEFCore.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,11 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="MASES.KNet" Version="2.1.2">
<PackageReference Include="MASES.KNet" Version="2.1.3">
<IncludeAssets>All</IncludeAssets>
<PrivateAssets>None</PrivateAssets>
</PackageReference>
<PackageReference Include="MASES.KNet.Serialization.Json" Version="2.1.2" />
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="7.0.11" PrivateAssets="none" />
<PackageReference Include="MASES.KNet.Serialization.Json" Version="2.1.3" />
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="7.0.12" PrivateAssets="none" />
</ItemGroup>
</Project>
16 changes: 3 additions & 13 deletions src/net/KEFCore/Query/Internal/KafkaQueryContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,26 +25,16 @@ namespace MASES.EntityFrameworkCore.KNet.Query.Internal;

public class KafkaQueryContext : QueryContext
{
private readonly IDictionary<IEntityType, IEnumerable<ValueBuffer>> _valueBuffersCache
= new Dictionary<IEntityType, IEnumerable<ValueBuffer>>();
private readonly IKafkaCluster _cluster;

public virtual IEnumerable<ValueBuffer> GetValueBuffers(IEntityType entityType)
{
if (!_valueBuffersCache.TryGetValue(entityType, out var valueBuffers))
{
valueBuffers = Cluster.GetValueBuffers(entityType);

_valueBuffersCache[entityType] = valueBuffers;
}

return valueBuffers;
return _cluster.GetValueBuffers(entityType);
}

public KafkaQueryContext(QueryContextDependencies dependencies, IKafkaCluster cluster)
: base(dependencies)
{
Cluster = cluster;
_cluster = cluster;
}

public virtual IKafkaCluster Cluster { get; }
}
4 changes: 1 addition & 3 deletions src/net/KEFCore/Query/Internal/KafkaQueryContextFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ namespace MASES.EntityFrameworkCore.KNet.Query.Internal;
public class KafkaQueryContextFactory : IQueryContextFactory
{
private readonly IKafkaCluster _cluster;
readonly KafkaQueryContext context;

public KafkaQueryContextFactory(
QueryContextDependencies dependencies,
Expand All @@ -35,13 +34,12 @@ public KafkaQueryContextFactory(
{
_cluster = clusterCache.GetCluster(contextOptions);
Dependencies = dependencies;
context = new KafkaQueryContext(Dependencies, _cluster);
}

/// <summary>
/// Dependencies for this service.
/// </summary>
protected virtual QueryContextDependencies Dependencies { get; }

public virtual QueryContext Create() => context; // new KafkaQueryContext(Dependencies, _cluster);
public virtual QueryContext Create() => new KafkaQueryContext(Dependencies, _cluster);
}
7 changes: 6 additions & 1 deletion src/net/KEFCore/Storage/Internal/EntityTypeDataStorage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
* Refer to LICENSE for more information.
*/

// #define DEBUG_PERFORMANCE

#nullable enable

using System.Text.Json;
Expand Down Expand Up @@ -167,7 +169,10 @@ public void GetData(IEntityType tName, ref object[] array)
}
finally
{
Trace.WriteLine($"Time to GetData with length {Data.Count}: {fullSw.Elapsed} - new array took: {newSw.Elapsed} - Iteration took: {iterationSw.Elapsed}");
if (Infrastructure.KafkaDbContext.TraceEntityTypeDataStorageGetData)
{
Infrastructure.KafkaDbContext.ReportString($"Time to GetData with length {Data.Count}: {fullSw.Elapsed} - new array took: {newSw.Elapsed} - Iteration took: {iterationSw.Elapsed}");
}
}
#endif
}
Expand Down
94 changes: 84 additions & 10 deletions src/net/KEFCore/Storage/Internal/EntityTypeProducer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,37 +50,99 @@
#region KNetCompactedReplicatorEnumerator
class KNetCompactedReplicatorEnumerator : IEnumerator<ValueBuffer>
{
#if DEBUG_PERFORMANCE
Stopwatch _moveNextSw = new Stopwatch();
Stopwatch _currentSw = new Stopwatch();
Stopwatch _valueBufferSw = new Stopwatch();
#endif
readonly IEntityType _entityType;
IKNetCompactedReplicator<TKey, EntityTypeDataStorage<TKey>>? _kafkaCompactedReplicator;
readonly IEnumerator<KeyValuePair<TKey, EntityTypeDataStorage<TKey>>> _enumerator;
public KNetCompactedReplicatorEnumerator(IEntityType entityType, IKNetCompactedReplicator<TKey, EntityTypeDataStorage<TKey>>? kafkaCompactedReplicator)

Check warning on line 61 in src/net/KEFCore/Storage/Internal/EntityTypeProducer.cs

View workflow job for this annotation

GitHub Actions / Analyze (csharp)

Non-nullable field '_enumerator' must contain a non-null value when exiting constructor. Consider declaring the field as nullable.

Check warning on line 61 in src/net/KEFCore/Storage/Internal/EntityTypeProducer.cs

View workflow job for this annotation

GitHub Actions / Analyze (csharp)

Non-nullable field '_enumerator' must contain a non-null value when exiting constructor. Consider declaring the field as nullable.
{
_entityType = entityType;
kafkaCompactedReplicator?.SyncWait();
_enumerator = kafkaCompactedReplicator?.GetEnumerator();
_kafkaCompactedReplicator = kafkaCompactedReplicator;
#if DEBUG_PERFORMANCE
Stopwatch sw = Stopwatch.StartNew();
#endif
if (!_kafkaCompactedReplicator!.SyncWait()) throw new InvalidOperationException($"Failed to synchronize with {_kafkaCompactedReplicator.StateName}");
#if DEBUG_PERFORMANCE
sw.Stop();
Infrastructure.KafkaDbContext.ReportString($"KNetCompactedReplicatorEnumerator SyncWait for {_entityType.Name} tooks {sw.Elapsed}");
#endif
_enumerator = _kafkaCompactedReplicator?.GetEnumerator();

Check warning on line 73 in src/net/KEFCore/Storage/Internal/EntityTypeProducer.cs

View workflow job for this annotation

GitHub Actions / Analyze (csharp)

Possible null reference assignment.

Check warning on line 73 in src/net/KEFCore/Storage/Internal/EntityTypeProducer.cs

View workflow job for this annotation

GitHub Actions / Analyze (csharp)

Possible null reference assignment.
}

ValueBuffer? _current = null;

public ValueBuffer Current => _current.HasValue ? _current.Value : default;
public ValueBuffer Current
{
get
{
#if DEBUG_PERFORMANCE
try
{
_currentSw.Start();
#endif
return _current.HasValue ? _current.Value : default;
#if DEBUG_PERFORMANCE
}
finally
{
_currentSw.Stop();
}
#endif
}
}

object IEnumerator.Current => Current;

public void Dispose()
{
#if DEBUG_PERFORMANCE
Infrastructure.KafkaDbContext.ReportString($"KNetCompactedReplicatorEnumerator _moveNextSw: {_moveNextSw.Elapsed} _currentSw: {_currentSw.Elapsed} _valueBufferSw: {_valueBufferSw.Elapsed}");
#endif
_enumerator?.Dispose();
}

#if DEBUG_PERFORMANCE
int _cycles = 0;
#endif

public bool MoveNext()
{
if (_enumerator.MoveNext())
#if DEBUG_PERFORMANCE
try
{
_moveNextSw.Start();
#endif
if (_enumerator.MoveNext())
{
#if DEBUG_PERFORMANCE
_cycles++;
_valueBufferSw.Start();
#endif
object[] array = null;

Check warning on line 125 in src/net/KEFCore/Storage/Internal/EntityTypeProducer.cs

View workflow job for this annotation

GitHub Actions / Analyze (csharp)

Converting null literal or possible null value to non-nullable type.

Check warning on line 125 in src/net/KEFCore/Storage/Internal/EntityTypeProducer.cs

View workflow job for this annotation

GitHub Actions / Analyze (csharp)

Converting null literal or possible null value to non-nullable type.
_enumerator.Current.Value.GetData(_entityType, ref array);
#if DEBUG_PERFORMANCE
_valueBufferSw.Stop();
#endif
_current = new ValueBuffer(array);
return true;
}
_current = null;
return false;
#if DEBUG_PERFORMANCE
}
finally
{
object[] array = null;
_enumerator.Current.Value.GetData(_entityType, ref array);
_current = new ValueBuffer(array);
return true;
_moveNextSw.Stop();
if (_cycles == 0)
{
throw new InvalidOperationException($"KNetCompactedReplicatorEnumerator - No data returned from {_kafkaCompactedReplicator}");
}
}
_current = null;
return false;
#endif
}

public void Reset()
Expand Down Expand Up @@ -108,8 +170,11 @@
}
#endregion

public EntityTypeProducer(IEntityType entityType, IKafkaCluster cluster)

Check warning on line 173 in src/net/KEFCore/Storage/Internal/EntityTypeProducer.cs

View workflow job for this annotation

GitHub Actions / Analyze (csharp)

Non-nullable field '_streamData' must contain a non-null value when exiting constructor. Consider declaring the field as nullable.

Check warning on line 173 in src/net/KEFCore/Storage/Internal/EntityTypeProducer.cs

View workflow job for this annotation

GitHub Actions / Analyze (csharp)

Non-nullable field '_streamData' must contain a non-null value when exiting constructor. Consider declaring the field as nullable.
{
#if DEBUG_PERFORMANCE
Infrastructure.KafkaDbContext.ReportString($"Creating new EntityTypeProducer for {entityType.Name}");
#endif
_entityType = entityType;
_cluster = cluster;
_useCompactedReplicator = _cluster.Options.UseCompactedReplicator;
Expand Down Expand Up @@ -137,7 +202,14 @@
KeySerDes = _keySerdes,
ValueSerDes = _valueSerdes,
};
#if DEBUG_PERFORMANCE
Stopwatch sw = Stopwatch.StartNew();
#endif
if (!_kafkaCompactedReplicator.StartAndWait()) throw new InvalidOperationException($"Failed to synchronize with {_kafkaCompactedReplicator.StateName}");
#if DEBUG_PERFORMANCE
sw.Stop();
Infrastructure.KafkaDbContext.ReportString($"EntityTypeProducer - KNetCompactedReplicator::StartAndWait for {entityType.Name} in {sw.Elapsed}");
#endif
}
else
{
Expand All @@ -146,6 +218,8 @@
}
}

public virtual IEntityType EntityType => _entityType;

public IEnumerable<Future<RecordMetadata>> Commit(IEnumerable<IKafkaRowBag> records)
{
if (_useCompactedReplicator)
Expand All @@ -156,7 +230,7 @@
if (_kafkaCompactedReplicator != null) _kafkaCompactedReplicator[record.Key] = value!;
}

return null;

Check warning on line 233 in src/net/KEFCore/Storage/Internal/EntityTypeProducer.cs

View workflow job for this annotation

GitHub Actions / Analyze (csharp)

Possible null reference return.

Check warning on line 233 in src/net/KEFCore/Storage/Internal/EntityTypeProducer.cs

View workflow job for this annotation

GitHub Actions / Analyze (csharp)

Possible null reference return.
}
else
{
Expand All @@ -164,7 +238,7 @@
foreach (KafkaRowBag<TKey> record in records)
{
var future = _kafkaProducer?.Send(new KNetProducerRecord<TKey, EntityTypeDataStorage<TKey>>(record.AssociatedTopicName, 0, record.Key, record.Value!));
futures.Add(future);

Check warning on line 241 in src/net/KEFCore/Storage/Internal/EntityTypeProducer.cs

View workflow job for this annotation

GitHub Actions / Analyze (csharp)

Possible null reference argument for parameter 'item' in 'void List<Future<RecordMetadata>>.Add(Future<RecordMetadata> item)'.

Check warning on line 241 in src/net/KEFCore/Storage/Internal/EntityTypeProducer.cs

View workflow job for this annotation

GitHub Actions / Analyze (csharp)

Possible null reference argument for parameter 'item' in 'void List<Future<RecordMetadata>>.Add(Future<RecordMetadata> item)'.
}

_kafkaProducer?.Flush();
Expand Down
20 changes: 9 additions & 11 deletions src/net/KEFCore/Storage/Internal/EntityTypeProducers.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,18 +34,16 @@ public class EntityTypeProducers

public static IEntityTypeProducer Create<TKey>(IEntityType entityType, IKafkaCluster cluster) where TKey : notnull
{
//if (!cluster.Options.ProducerByEntity)
//{
// lock (_producers)
// {
// if (_globalProducer == null) _globalProducer = CreateProducerLocal<TKey>(entityType, cluster);
// return _globalProducer;
// }
//}
//else
//{
return _producers.GetOrAdd(entityType, _ => CreateProducerLocal<TKey>(entityType, cluster));
//}
}

public static void Dispose(IEntityTypeProducer producer)
{
if (!_producers.TryRemove(new KeyValuePair<IEntityType, IEntityTypeProducer>(producer.EntityType, producer)))
{
throw new InvalidOperationException($"Failed to remove IEntityTypeProducer for {producer.EntityType.Name}");
}
producer.Dispose();
}

static IEntityTypeProducer CreateProducerLocal<TKey>(IEntityType entityType, IKafkaCluster cluster) where TKey : notnull => new EntityTypeProducer<TKey>(entityType, cluster);
Expand Down
2 changes: 2 additions & 0 deletions src/net/KEFCore/Storage/Internal/IEntityTypeProducer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ namespace MASES.EntityFrameworkCore.KNet.Storage.Internal;

public interface IEntityTypeProducer : IDisposable
{
IEntityType EntityType { get; }

IEnumerable<Future<RecordMetadata>> Commit(IEnumerable<IKafkaRowBag> records);

IEnumerable<ValueBuffer> ValueBuffers { get; }
Expand Down
2 changes: 2 additions & 0 deletions src/net/KEFCore/Storage/Internal/IKafkaCluster.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,5 +37,7 @@ public interface IKafkaCluster : IDisposable

int ExecuteTransaction(IList<IUpdateEntry> entries, IDiagnosticsLogger<DbLoggerCategory.Update> updateLogger);

string ClusterId { get; }

KafkaOptionsExtension Options { get; }
}
2 changes: 2 additions & 0 deletions src/net/KEFCore/Storage/Internal/IKafkaClusterCache.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,6 @@ namespace MASES.EntityFrameworkCore.KNet.Storage.Internal;
public interface IKafkaClusterCache
{
IKafkaCluster GetCluster(KafkaOptionsExtension options);

void Dispose(IKafkaCluster cluster);
}
2 changes: 2 additions & 0 deletions src/net/KEFCore/Storage/Internal/IKafkaTableFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,6 @@ namespace MASES.EntityFrameworkCore.KNet.Storage.Internal;
public interface IKafkaTableFactory
{
IKafkaTable Create(IKafkaCluster cluster, IEntityType entityType);

void Dispose(IKafkaTable table);
}
Loading
Loading