Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Updates test execution to avoid failures #258

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
d67ab3f
Use `SingleOrDefault` instead of `Single` to manage conditions where …
masesdevelopers Jun 28, 2024
22affd6
Split windows jobs since Kafka seems to crash if heavily loaded
masesdevelopers Jun 28, 2024
5f483c4
In caso of timeout, especially in Windows, exit without report error …
masesdevelopers Jun 29, 2024
54bfccb
Update removal
masesdevelopers Jun 29, 2024
3a92e45
Update
masesdevelopers Jun 29, 2024
612eae4
Configuration in workflow folder and remove prefetch
masesdevelopers Jun 29, 2024
b738d57
Upload dumps
masesdevelopers Jun 29, 2024
884162f
Added KafkaStreams test
masesdevelopers Jun 30, 2024
5332985
Update preparation of KafkaStreamsBaseRetriever
masesdevelopers Jun 30, 2024
3158458
Update
masesdevelopers Jul 1, 2024
18e55e7
Remove JDK 21 till https://github.com/masesgroup/KNet/issues/510 is s…
masesdevelopers Jul 2, 2024
c37d43c
Update
masesdevelopers Jul 2, 2024
5c8628e
Merge remote-tracking branch 'upstream/master' into 243-add-test-exec…
masesdevelopers Jul 2, 2024
34487e3
Update management of exceptions
masesdevelopers Jul 2, 2024
5d1f90c
Added Raw and Buffered tests for KafkaStreams
masesdevelopers Jul 2, 2024
580b5d6
Use latest JRE for tests
masesdevelopers Jul 2, 2024
6f20469
Avoid JRE since most variants does not have JRE version
masesdevelopers Jul 2, 2024
3988ab5
Code harmonization
masesdevelopers Jul 2, 2024
1189f6a
Adds tests with prefetch
masesdevelopers Jul 2, 2024
d923613
Fix #22 aligning key of headers to the JVM type
masesdevelopers Jul 2, 2024
7264562
Update tests moving out execution from Main to verify if some tests f…
masesdevelopers Jul 2, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
206 changes: 172 additions & 34 deletions .github/workflows/build.yaml

Large diffs are not rendered by default.

6 changes: 6 additions & 0 deletions .github/workflows/configuration/Benchmark.KNetReplicator.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"DatabaseName": "TestDBBenchmark",
"BootstrapServers": "localhost:9092",
"NumberOfExecutions": 10,
"UseEnumeratorWithPrefetch": false
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"DatabaseName": "TestDBBenchmark",
"UseCompactedReplicator": false,
"BootstrapServers": "localhost:9092",
"NumberOfExecutions": 10
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{
"DatabaseName": "TestDBBenchmark",
"UseCompactedReplicator": false,
"BootstrapServers": "localhost:9092",
"NumberOfExecutions": 10,
"UseEnumeratorWithPrefetch": false
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{
"DatabaseName": "TestDBBenchmark",
"UseCompactedReplicator": false,
"UseByteBufferDataTransfer": false,
"BootstrapServers": "localhost:9092",
"NumberOfExecutions": 10
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{
"DatabaseName": "TestDBBenchmark",
"UseCompactedReplicator": false,
"UseByteBufferDataTransfer": false,
"BootstrapServers": "localhost:9092",
"NumberOfExecutions": 10,
"UseEnumeratorWithPrefetch": false
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{
"DatabaseName": "TestDBBenchmark",
"UseCompactedReplicator": false,
"UseKNetStreams": false,
"BootstrapServers": "localhost:9092",
"NumberOfExecutions": 10
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{
"DatabaseName": "TestDBBenchmark",
"UseCompactedReplicator": false,
"UseKNetStreams": false,
"UseByteBufferDataTransfer": false,
"BootstrapServers": "localhost:9092",
"NumberOfExecutions": 10
}
32 changes: 16 additions & 16 deletions src/net/KEFCore.SerDes.Avro/AvroKEFCoreSerDes.cs
Original file line number Diff line number Diff line change
Expand Up @@ -127,8 +127,8 @@ public override byte[] Serialize(string topic, TData data)
/// <inheritdoc cref="SerDes{TData, TJVM}.SerializeWithHeaders(string, Headers, TData)"/>
public override byte[] SerializeWithHeaders(string topic, Headers headers, TData data)
{
headers?.Add(KNetSerialization.KeyTypeIdentifier, keyTypeName);
headers?.Add(KNetSerialization.KeySerializerIdentifier, keySerDesName);
headers?.Add(KNetSerialization.KeyTypeIdentifierJVM, keyTypeName);
headers?.Add(KNetSerialization.KeySerializerIdentifierJVM, keySerDesName);

if (_defaultSerDes != null) return _defaultSerDes.SerializeWithHeaders(topic, headers, data);

Expand Down Expand Up @@ -199,8 +199,8 @@ public override ByteBuffer Serialize(string topic, TData data)
/// <inheritdoc cref="SerDes{TData, TJVM}.SerializeWithHeaders(string, Headers, TData)"/>
public override ByteBuffer SerializeWithHeaders(string topic, Headers headers, TData data)
{
headers?.Add(KNetSerialization.KeyTypeIdentifier, keyTypeName);
headers?.Add(KNetSerialization.KeySerializerIdentifier, keySerDesName);
headers?.Add(KNetSerialization.KeyTypeIdentifierJVM, keyTypeName);
headers?.Add(KNetSerialization.KeySerializerIdentifierJVM, keySerDesName);

if (_defaultSerDes != null) return _defaultSerDes.SerializeWithHeaders(topic, headers, data);

Expand Down Expand Up @@ -310,8 +310,8 @@ public override byte[] Serialize(string topic, TData data)
/// <inheritdoc cref="SerDes{TData, TJVM}.SerializeWithHeaders(string, Headers, TData)"/>
public override byte[] SerializeWithHeaders(string topic, Headers headers, TData data)
{
headers?.Add(KNetSerialization.KeyTypeIdentifier, keyTypeName);
headers?.Add(KNetSerialization.KeySerializerIdentifier, keySerDesName);
headers?.Add(KNetSerialization.KeyTypeIdentifierJVM, keyTypeName);
headers?.Add(KNetSerialization.KeySerializerIdentifierJVM, keySerDesName);

if (_defaultSerDes != null) return _defaultSerDes.SerializeWithHeaders(topic, headers, data);

Expand Down Expand Up @@ -377,8 +377,8 @@ public override ByteBuffer Serialize(string topic, TData data)
/// <inheritdoc cref="SerDes{TData, TJVM}.SerializeWithHeaders(string, Headers, TData)"/>
public override ByteBuffer SerializeWithHeaders(string topic, Headers headers, TData data)
{
headers?.Add(KNetSerialization.KeyTypeIdentifier, keyTypeName);
headers?.Add(KNetSerialization.KeySerializerIdentifier, keySerDesName);
headers?.Add(KNetSerialization.KeyTypeIdentifierJVM, keyTypeName);
headers?.Add(KNetSerialization.KeySerializerIdentifierJVM, keySerDesName);

if (_defaultSerDes != null) return _defaultSerDes.SerializeWithHeaders(topic, headers, data);

Expand Down Expand Up @@ -494,8 +494,8 @@ public override byte[] Serialize(string topic, TData data)
/// <inheritdoc cref="SerDes{TData, TJVM}.SerializeWithHeaders(string, Headers, TData)"/>
public override byte[] SerializeWithHeaders(string topic, Headers headers, TData data)
{
headers?.Add(KNetSerialization.ValueSerializerIdentifier, valueContainerSerDesName);
headers?.Add(KNetSerialization.ValueTypeIdentifier, valueContainerName);
headers?.Add(KNetSerialization.ValueSerializerIdentifierJVM, valueContainerSerDesName);
headers?.Add(KNetSerialization.ValueTypeIdentifierJVM, valueContainerName);

using MemoryStream memStream = new();
BinaryEncoder encoder = new(memStream);
Expand Down Expand Up @@ -562,8 +562,8 @@ public override ByteBuffer Serialize(string topic, TData data)
/// <inheritdoc cref="SerDes{TData, TJVM}.SerializeWithHeaders(string, Headers, TData)"/>
public override ByteBuffer SerializeWithHeaders(string topic, Headers headers, TData data)
{
headers?.Add(KNetSerialization.ValueSerializerIdentifier, valueContainerSerDesName);
headers?.Add(KNetSerialization.ValueTypeIdentifier, valueContainerName);
headers?.Add(KNetSerialization.ValueSerializerIdentifierJVM, valueContainerSerDesName);
headers?.Add(KNetSerialization.ValueTypeIdentifierJVM, valueContainerName);

MemoryStream memStream = new();
BinaryEncoder encoder = new(memStream);
Expand Down Expand Up @@ -668,8 +668,8 @@ public override byte[] Serialize(string topic, TData data)
/// <inheritdoc cref="SerDes{TData, TJVM}.SerializeWithHeaders(string, Headers, TData)"/>
public override byte[] SerializeWithHeaders(string topic, Headers headers, TData data)
{
headers?.Add(KNetSerialization.ValueSerializerIdentifier, valueContainerSerDesName);
headers?.Add(KNetSerialization.ValueTypeIdentifier, valueContainerName);
headers?.Add(KNetSerialization.ValueSerializerIdentifierJVM, valueContainerSerDesName);
headers?.Add(KNetSerialization.ValueTypeIdentifierJVM, valueContainerName);

using MemoryStream memStream = new();
JsonEncoder encoder = new(AvroValueContainer._SCHEMA, memStream);
Expand Down Expand Up @@ -737,8 +737,8 @@ public override ByteBuffer Serialize(string topic, TData data)
/// <inheritdoc cref="SerDes{TData, TJVM}.SerializeWithHeaders(string, Headers, TData)"/>
public override ByteBuffer SerializeWithHeaders(string topic, Headers headers, TData data)
{
headers?.Add(KNetSerialization.ValueSerializerIdentifier, valueContainerSerDesName);
headers?.Add(KNetSerialization.ValueTypeIdentifier, valueContainerName);
headers?.Add(KNetSerialization.ValueSerializerIdentifierJVM, valueContainerSerDesName);
headers?.Add(KNetSerialization.ValueTypeIdentifierJVM, valueContainerName);

MemoryStream memStream = new();
JsonEncoder encoder = new(AvroValueContainer._SCHEMA, memStream);
Expand Down
16 changes: 8 additions & 8 deletions src/net/KEFCore.SerDes.Protobuf/ProtobufKEFCoreSerDes.cs
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,8 @@ public override byte[] Serialize(string topic, TData data)
/// <inheritdoc cref="SerDes{TData, TJVM}.SerializeWithHeaders(string, Headers, TData)"/>
public override byte[] SerializeWithHeaders(string topic, Headers headers, TData data)
{
headers?.Add(KNetSerialization.KeyTypeIdentifier, keyTypeName);
headers?.Add(KNetSerialization.KeySerializerIdentifier, keySerDesName);
headers?.Add(KNetSerialization.KeyTypeIdentifierJVM, keyTypeName);
headers?.Add(KNetSerialization.KeySerializerIdentifierJVM, keySerDesName);

if (_defaultSerDes != null) return _defaultSerDes.SerializeWithHeaders(topic, headers, data);
KeyContainer keyContainer = null!;
Expand Down Expand Up @@ -185,8 +185,8 @@ public override ByteBuffer Serialize(string topic, TData data)
/// <inheritdoc cref="SerDes{TData, TJVM}.SerializeWithHeaders(string, Headers, TData)"/>
public override ByteBuffer SerializeWithHeaders(string topic, Headers headers, TData data)
{
headers?.Add(KNetSerialization.KeyTypeIdentifier, keyTypeName);
headers?.Add(KNetSerialization.KeySerializerIdentifier, keySerDesName);
headers?.Add(KNetSerialization.KeyTypeIdentifierJVM, keyTypeName);
headers?.Add(KNetSerialization.KeySerializerIdentifierJVM, keySerDesName);

if (_defaultSerDes != null) return _defaultSerDes.SerializeWithHeaders(topic, headers, data);
KeyContainer keyContainer = null!;
Expand Down Expand Up @@ -298,8 +298,8 @@ public override byte[] Serialize(string topic, TData data)
/// <inheritdoc cref="SerDes{TData, TJVM}.SerializeWithHeaders(string, Headers, TData)"/>
public override byte[] SerializeWithHeaders(string topic, Headers headers, TData data)
{
headers?.Add(KNetSerialization.ValueSerializerIdentifier, valueContainerSerDesName);
headers?.Add(KNetSerialization.ValueTypeIdentifier, valueContainerName);
headers?.Add(KNetSerialization.ValueSerializerIdentifierJVM, valueContainerSerDesName);
headers?.Add(KNetSerialization.ValueTypeIdentifierJVM, valueContainerName);

using (MemoryStream stream = new())
{
Expand Down Expand Up @@ -361,8 +361,8 @@ public override ByteBuffer Serialize(string topic, TData data)
/// <inheritdoc cref="SerDes{TData, TJVM}.SerializeWithHeaders(string, Headers, TData)"/>
public override ByteBuffer SerializeWithHeaders(string topic, Headers headers, TData data)
{
headers?.Add(KNetSerialization.ValueSerializerIdentifier, valueContainerSerDesName);
headers?.Add(KNetSerialization.ValueTypeIdentifier, valueContainerName);
headers?.Add(KNetSerialization.ValueSerializerIdentifierJVM, valueContainerSerDesName);
headers?.Add(KNetSerialization.ValueTypeIdentifierJVM, valueContainerName);

MemoryStream stream = new();
data.WriteTo(stream);
Expand Down
16 changes: 8 additions & 8 deletions src/net/KEFCore.SerDes/DefaultKEFCoreSerDes.cs
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,8 @@ public override byte[] Serialize(string topic, TData data)
/// <inheritdoc cref="SerDes{TData, TJVM}.SerializeWithHeaders(string, Headers, TData)"/>
public override byte[] SerializeWithHeaders(string topic, Headers headers, TData data)
{
headers?.Add(KNetSerialization.KeyTypeIdentifier, keyTypeName);
headers?.Add(KNetSerialization.KeySerializerIdentifier, keySerDesName);
headers?.Add(KNetSerialization.KeyTypeIdentifierJVM, keyTypeName);
headers?.Add(KNetSerialization.KeySerializerIdentifierJVM, keySerDesName);

if (_defaultSerDes != null) return _defaultSerDes.SerializeWithHeaders(topic, headers, data);
var jsonStr = System.Text.Json.JsonSerializer.Serialize<TData>(data);
Expand Down Expand Up @@ -191,8 +191,8 @@ public override ByteBuffer Serialize(string topic, TData data)
/// <inheritdoc cref="SerDes{TData, TJVM}.SerializeWithHeaders(string, Headers, TData)"/>
public override ByteBuffer SerializeWithHeaders(string topic, Headers headers, TData data)
{
headers?.Add(KNetSerialization.KeyTypeIdentifier, keyTypeName);
headers?.Add(KNetSerialization.KeySerializerIdentifier, keySerDesName);
headers?.Add(KNetSerialization.KeyTypeIdentifierJVM, keyTypeName);
headers?.Add(KNetSerialization.KeySerializerIdentifierJVM, keySerDesName);

if (_defaultSerDes != null) return _defaultSerDes.SerializeWithHeaders(topic, headers, data);

Expand Down Expand Up @@ -301,8 +301,8 @@ public override byte[] Serialize(string topic, TData data)
/// <inheritdoc cref="SerDes{TData, TJVM}.SerializeWithHeaders(string, Headers, TData)"/>
public override byte[] SerializeWithHeaders(string topic, Headers headers, TData data)
{
headers?.Add(KNetSerialization.ValueSerializerIdentifier, valueContainerSerDesName);
headers?.Add(KNetSerialization.ValueTypeIdentifier, valueContainerName);
headers?.Add(KNetSerialization.ValueSerializerIdentifierJVM, valueContainerSerDesName);
headers?.Add(KNetSerialization.ValueTypeIdentifierJVM, valueContainerName);

var jsonStr = System.Text.Json.JsonSerializer.Serialize<TData>(data, _options);
return Encoding.UTF8.GetBytes(jsonStr);
Expand Down Expand Up @@ -365,8 +365,8 @@ public override ByteBuffer Serialize(string topic, TData data)
/// <inheritdoc cref="SerDes{TData, TJVM}.SerializeWithHeaders(string, Headers, TData)"/>
public override ByteBuffer SerializeWithHeaders(string topic, Headers headers, TData data)
{
headers?.Add(KNetSerialization.ValueSerializerIdentifier, valueContainerSerDesName);
headers?.Add(KNetSerialization.ValueTypeIdentifier, valueContainerName);
headers?.Add(KNetSerialization.ValueSerializerIdentifierJVM, valueContainerSerDesName);
headers?.Add(KNetSerialization.ValueTypeIdentifierJVM, valueContainerName);

var ms = new MemoryStream();
System.Text.Json.JsonSerializer.Serialize<TData>(ms, data, _options);
Expand Down
8 changes: 4 additions & 4 deletions src/net/KEFCore.SerDes/KEFCoreSerDes.Helpers.cs
Original file line number Diff line number Diff line change
Expand Up @@ -151,22 +151,22 @@ public static object FromRecord(Org.Apache.Kafka.Clients.Consumer.ConsumerRecord
foreach (var header in headers.ToArray())
{
var key = header.Key();
if (key == KNetSerialization.KeyTypeIdentifier)
if (key == KNetSerialization.KeyTypeIdentifierJVM)
{
var strType = Encoding.UTF8.GetString(header.Value());
keyType = Type.GetType(strType, true)!;
}
if (key == KNetSerialization.KeySerializerIdentifier)
if (key == KNetSerialization.KeySerializerIdentifierJVM)
{
var strType = Encoding.UTF8.GetString(header.Value());
keySerializerSelectorType = Type.GetType(strType, true)!;
}
if (key == KNetSerialization.ValueTypeIdentifier)
if (key == KNetSerialization.ValueTypeIdentifierJVM)
{
var strType = Encoding.UTF8.GetString(header.Value());
valueType = Type.GetType(strType, true)!;
}
if (key == KNetSerialization.ValueSerializerIdentifier)
if (key == KNetSerialization.ValueSerializerIdentifierJVM)
{
var strType = Encoding.UTF8.GetString(header.Value());
valueSerializerSelectorType = Type.GetType(strType, true)!;
Expand Down
6 changes: 3 additions & 3 deletions src/net/KEFCore/Storage/Internal/KNetStreamsRetriever.cs
Original file line number Diff line number Diff line change
Expand Up @@ -444,20 +444,20 @@ public ValueTask<bool> MoveNextAsync()
{
_moveNextSw.Start();
#endif
ValueTask<bool> hasNext = _asyncEnumerator.MoveNextAsync();
ValueTask<bool> hasNext = _asyncEnumerator == null ? new ValueTask<bool>(false) : _asyncEnumerator.MoveNextAsync();
hasNext.AsTask().Wait();
if (hasNext.Result)
{
#if DEBUG_PERFORMANCE
_cycles++;
_valueGetSw.Start();
#endif
KeyValue<TKey, TValue, TJVMKey, TJVMValue> kv = _asyncEnumerator.Current;
KeyValue<TKey, TValue, TJVMKey, TJVMValue>? kv = _asyncEnumerator?.Current;
#if DEBUG_PERFORMANCE
_valueGetSw.Stop();
_valueGet2Sw.Start();
#endif
TValue value = kv.Value;
TValue value = kv != null ? kv.Value : default;
#if DEBUG_PERFORMANCE
_valueGet2Sw.Stop();
_valueBufferSw.Start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public KafkaStreamsBaseRetriever(IKafkaCluster kafkaCluster, IEntityType entityT
_builder ??= builder;
_topicName = _entityType.TopicName(kafkaCluster.Options);
_usePersistentStorage = _kafkaCluster.Options.UsePersistentStorage;
_properties ??= _kafkaCluster.Options.StreamsOptions(_kafkaCluster.Options.ApplicationId);
_properties ??= _kafkaCluster.Options.StreamsOptions(_entityType);

string storageId = _entityType.StorageIdForTable(_kafkaCluster.Options);
_storageId = _usePersistentStorage ? storageId : Process.GetCurrentProcess().ProcessName + "-" + storageId;
Expand Down
Loading
Loading