Skip to content

Commit

Permalink
Code update based on #65 (comment)
Browse files Browse the repository at this point in the history
  • Loading branch information
masesdevelopers committed Oct 4, 2023
1 parent 1608959 commit fe57106
Show file tree
Hide file tree
Showing 5 changed files with 186 additions and 142 deletions.
111 changes: 53 additions & 58 deletions src/net/KEFCore/Storage/Internal/EntityTypeProducer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,11 @@
* Refer to LICENSE for more information.
*/

// #define DEBUG_PERFORMANCE

#nullable enable

using Java.Util.Concurrent;
using MASES.EntityFrameworkCore.KNet.Serdes.Internal;
using MASES.KNet.Producer;
using MASES.KNet.Replicator;
using MASES.KNet.Serialization;
Expand All @@ -28,8 +29,6 @@
using MASES.KNet.Serialization.Json;
using Org.Apache.Kafka.Clients.Producer;
using System.Text.Json;
using Javax.Xml.Crypto;
using Microsoft.EntityFrameworkCore.Storage.ValueConversion;

namespace MASES.EntityFrameworkCore.KNet.Storage.Internal;

Expand All @@ -50,7 +49,7 @@ public static IEntityTypeProducer Create<TKey>(IEntityType entityType, IKafkaClu
//}
//else
//{
return _producers.GetOrAdd(entityType, _ => CreateProducerLocal<TKey>(entityType, cluster));
return _producers.GetOrAdd(entityType, _ => CreateProducerLocal<TKey>(entityType, cluster));
//}
}

Expand Down Expand Up @@ -114,8 +113,13 @@ public void OnDeserialized()
public object Value { get; set; }
}

public interface IEntityTypeData
{
void GetData(IEntityType tName, ref object[] array);
}

[JsonSerializable(typeof(KNetEntityTypeData<>))]
public class KNetEntityTypeData<TKey>
public class KNetEntityTypeData<TKey> : IEntityTypeData
{
public KNetEntityTypeData() { }

Expand All @@ -130,59 +134,41 @@ public KNetEntityTypeData(IEntityType tName, IProperty[] properties, object[] rD
}

public string TypeName { get; set; }
// [JsonConverter(typeof(ListStringObjectTupleConverter))]

public Dictionary<int, ObjectType> Data { get; set; }

public object[] GetData(IEntityType tName)
public void GetData(IEntityType tName, ref object[] array)
{
if (Data == null) return null;

var array = Data.Select((o) => o.Value.Value).ToArray();

return array;

var _properties = tName.GetProperties().ToArray();
List<object> data = new List<object>();

for (int i = 0; i < Data!.Count; i++)
#if DEBUG_PERFORMANCE
Stopwatch fullSw = new Stopwatch();
Stopwatch newSw = new Stopwatch();
Stopwatch iterationSw = new Stopwatch();
try
{
if (Data[i].Value is JsonElement elem)
{
switch (elem.ValueKind)
{
case JsonValueKind.Undefined:
break;
case JsonValueKind.Object:
break;
case JsonValueKind.Array:
break;
case JsonValueKind.String:
data.Add(elem.GetString());
break;
case JsonValueKind.Number:
var tmp = elem.GetInt64();
data.Add(Convert.ChangeType(tmp, _properties[i].ClrType));
break;
case JsonValueKind.True:
data.Add(true);
break;
case JsonValueKind.False:
data.Add(false);
break;
case JsonValueKind.Null:
data.Add(null);
break;
default:
break;
}

}
else
{
data.Add(Convert.ChangeType(Data[i], _properties[i].ClrType));
}
fullSw.Start();
#endif
if (Data == null) { return; }
#if DEBUG_PERFORMANCE
newSw.Start();
#endif
array = new object[Data.Count];
#if DEBUG_PERFORMANCE
newSw.Stop();
iterationSw.Start();
#endif
for (int i = 0; i < Data.Count; i++)
{
array[i] = Data[i].Value;
}
#if DEBUG_PERFORMANCE
iterationSw.Stop();
fullSw.Stop();
}
return data.ToArray();
finally
{
Trace.WriteLine($"Time to GetData with length {Data.Count}: {fullSw.Elapsed} - new array took: {newSw.Elapsed} - Iteration took: {iterationSw.Elapsed}");
}
#endif
}
}

Expand Down Expand Up @@ -274,11 +260,20 @@ public void Dispose()
}
}

public IEnumerable<ValueBuffer> GetValueBuffer()
public IEnumerable<ValueBuffer> ValueBuffers
{
if (_streamData != null) return _streamData;
_kafkaCompactedReplicator?.SyncWait();
if (_kafkaCompactedReplicator == null) throw new InvalidOperationException("Missing _kafkaCompactedReplicator");
return _kafkaCompactedReplicator.Values.Select((item) => new ValueBuffer(item.GetData(_entityType)));
get
{
if (_streamData != null) return _streamData;
_kafkaCompactedReplicator?.SyncWait();
if (_kafkaCompactedReplicator == null) throw new InvalidOperationException("Missing _kafkaCompactedReplicator");
return _kafkaCompactedReplicator.Values.Select((item) =>
{
object[] array = null;
item.GetData(_entityType, ref array);
return new ValueBuffer(array);
}
);
}
}
}
2 changes: 1 addition & 1 deletion src/net/KEFCore/Storage/Internal/IEntityTypeProducer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,5 +27,5 @@ public interface IEntityTypeProducer : IDisposable
{
IEnumerable<Future<RecordMetadata>> Commit(IEnumerable<IKafkaRowBag> records);

IEnumerable<ValueBuffer> GetValueBuffer();
IEnumerable<ValueBuffer> ValueBuffers { get; }
}
34 changes: 16 additions & 18 deletions src/net/KEFCore/Storage/Internal/KafkaCluster.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,14 @@
* Refer to LICENSE for more information.
*/

// #define DEBUG_PERFORMANCE

#nullable enable

using MASES.EntityFrameworkCore.KNet.ValueGeneration.Internal;
using MASES.EntityFrameworkCore.KNet.Diagnostics.Internal;
using MASES.EntityFrameworkCore.KNet.Infrastructure.Internal;
using Java.Util;
using MASES.EntityFrameworkCore.KNet.Serdes.Internal;
using Java.Util.Concurrent;
using Org.Apache.Kafka.Clients.Admin;
using Org.Apache.Kafka.Common.Errors;
Expand All @@ -34,22 +35,17 @@ public class KafkaCluster : IKafkaCluster
{
private readonly KafkaOptionsExtension _options;
private readonly IKafkaTableFactory _tableFactory;
private readonly IKafkaSerdesFactory _serdesFactory;
private readonly bool _useNameMatching;
private readonly IAdmin _kafkaAdminClient;

private readonly object _lock = new();

private System.Collections.Generic.Dictionary<object, IKafkaTable>? _tables;

public KafkaCluster(
KafkaOptionsExtension options,
IKafkaTableFactory tableFactory,
IKafkaSerdesFactory serdesFactory)
public KafkaCluster(KafkaOptionsExtension options, IKafkaTableFactory tableFactory)
{
_options = options;
_tableFactory = tableFactory;
_serdesFactory = serdesFactory;
_useNameMatching = options.UseNameMatching;
Properties props = new();
props.Put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, _options.BootstrapServers);
Expand All @@ -68,8 +64,6 @@ public virtual void Dispose()
}
}

public virtual IKafkaSerdesFactory SerdesFactory => _serdesFactory;

public virtual KafkaOptionsExtension Options => _options;

public virtual KafkaIntegerValueGenerator<TProperty> GetIntegerValueGenerator<TProperty>(
Expand Down Expand Up @@ -208,27 +202,31 @@ public virtual string CreateTable(IEntityType entityType)

private static System.Collections.Generic.Dictionary<object, IKafkaTable> CreateTables() => new();

public virtual IEnumerable<ValueBuffer> GetData(IEntityType entityType)
public virtual IEnumerable<ValueBuffer> GetValueBuffers(IEntityType entityType)
{
Stopwatch watcher = new();
lock (_lock)
{
#if DEBUG_PERFORMANCE
Stopwatch watcher = new();
try
{
watcher.Start();
EnsureTable(entityType);
var key = _useNameMatching ? (object)entityType.Name : entityType;
if (_tables != null && _tables.TryGetValue(key, out var table))
{
return table.ValueBuffers;
}
throw new InvalidOperationException("No table available");
#endif
EnsureTable(entityType);
var key = _useNameMatching ? (object)entityType.Name : entityType;
if (_tables != null && _tables.TryGetValue(key, out var table))
{
return table.ValueBuffers;
}
throw new InvalidOperationException("No table available");
#if DEBUG_PERFORMANCE
}
finally
{
watcher.Stop();
Trace.WriteLine("GetData - Execution time was " + watcher.ElapsedMilliseconds + " ms");
}
#endif
}
}

Expand Down
Loading

0 comments on commit fe57106

Please sign in to comment.