Full review of KNet Streams SDK classes #422

Merged
merged 28 commits into from
Feb 22, 2024
Commits
8a6dfa4
Added other classes to KNet Streams SDK
masesdevelopers Feb 10, 2024
5564bd2
Temp update
masesdevelopers Feb 12, 2024
7429181
Temporary generation of classes with new JNetReflector: only for test
masesdevelopers Feb 13, 2024
1b5addd
Code alignment to latest generated classes which uses Java.Lang.String
masesdevelopers Feb 13, 2024
88fbf02
Temp to be reopened
masesdevelopers Feb 19, 2024
951ebac
Update IKNetDeserializer, IKNetSerializer, IKNetSerDes to manage JVM …
masesdevelopers Feb 19, 2024
b689ab5
Update tests
masesdevelopers Feb 19, 2024
a8192b6
Merge remote-tracking branch 'upstream/master' into 416-avoid-use-of-…
masesdevelopers Feb 20, 2024
17e73f3
Merge branch '327-improve-kafka-streams-with-some-specific-knet-class…
masesdevelopers Feb 20, 2024
b64dad7
Update to JNet 2.3.0 and bump Google.Protobuf version
masesdevelopers Feb 20, 2024
f1c187c
Documentation fix
masesdevelopers Feb 20, 2024
6b19514
Avoid release trigger when description is edited
masesdevelopers Feb 20, 2024
1224e40
Merge branch '416-avoid-use-of-string-systemstring-and-prefer-javalan…
masesdevelopers Feb 20, 2024
5d5d8de
General review of classes with JVM type added beside .NET type
masesdevelopers Feb 21, 2024
c91dad4
Reviewed how the IGenericSerDesFactory is shared across the classes
masesdevelopers Feb 21, 2024
6966f94
Added new serialization builder
masesdevelopers Feb 21, 2024
714bfe9
Merge remote-tracking branch 'upstream/master' into 327-improve-kafka…
masesdevelopers Feb 21, 2024
8e44b0e
Rename step 1
masesdevelopers Feb 21, 2024
8bc6d04
Rename step 2
masesdevelopers Feb 21, 2024
1b228f3
Rename all classes to align the name to the same of the JVM version
masesdevelopers Feb 21, 2024
8ce1cf2
Update test programs
masesdevelopers Feb 21, 2024
7468e8d
File rename
masesdevelopers Feb 21, 2024
1541b89
Added missing implementation of some classes, update documentation
masesdevelopers Feb 21, 2024
05ceeb6
Tests moved under right location
masesdevelopers Feb 22, 2024
22df17c
Added KNEt Streams SDK basic tests
masesdevelopers Feb 22, 2024
f9c2bd4
Update workflows
masesdevelopers Feb 22, 2024
78816bd
Fix
masesdevelopers Feb 22, 2024
f9a2fae
Update documentation and avoid build if only the documentation has ch…
masesdevelopers Feb 22, 2024
17 changes: 11 additions & 6 deletions .github/workflows/build.yaml
@@ -31,18 +31,23 @@ jobs:
  echo "run_build_windows=true" >> $GITHUB_OUTPUT
  break
  fi
- if [[ $added_modified_file == "src/"* ]]; then
+ if [[ $added_modified_file == "src/container/"* ]]; then
  echo "$added_modified_file file is under the directory 'src/'."
  echo "run_build_windows=true" >> $GITHUB_OUTPUT
  break
  fi
- if [[ $added_modified_file == "src/container/DockerfileKNet.linux"* ]]; then
- echo "$added_modified_file file changed"
+ if [[ $added_modified_file == "src/jvm/"* ]]; then
+ echo "$added_modified_file file is under the directory 'src/'."
  echo "run_build_windows=true" >> $GITHUB_OUTPUT
  break
  fi
+ if [[ $added_modified_file == "src/net/"* ]]; then
+ echo "$added_modified_file file is under the directory 'src/'."
+ echo "run_build_windows=true" >> $GITHUB_OUTPUT
+ break
+ fi
- if [[ $added_modified_file == "src/container/DockerfileKNetConnect.linux"* ]]; then
- echo "$added_modified_file file changed"
+ if [[ $added_modified_file == "tests/"* ]]; then
+ echo "$added_modified_file file is under the directory 'src/'."
  echo "run_build_windows=true" >> $GITHUB_OUTPUT
  break
  fi
@@ -132,7 +137,7 @@ jobs:
  submodules: 'true'

  - name: Pre compile tests
- run: dotnet build --no-incremental --configuration Release /p:Platform="Any CPU" /p:NoWarn="0108%3B1030%3B0618" tests\KNetTest.sln
+ run: dotnet build --no-incremental --configuration Release /p:Platform="Any CPU" /p:NoWarn="0108%3B1030%3B0618" tests\net\KNetTest.sln

  - name: Pre compile
  run: dotnet build --no-incremental --configuration Release /p:Platform="Any CPU" /p:NoWarn="0108%3B1030%3B0618" src\net\KNet\KNet.csproj
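
The first hunk of this file narrows the trigger from any path under `src/` to four specific prefixes. As far as the hunk shows, a change that touches only `src/documentation/` would no longer match any of them. The following is a minimal sketch of the same prefix test, written as a single `case` statement rather than repeated `if` blocks. The function name `needs_build` and the sample paths in the loop are hypothetical and not part of the workflow, and the sketch covers only the four prefixes visible in the hunk, not the earlier checks the workflow performs.

```shell
#!/usr/bin/env bash
# Hypothetical helper (not in the workflow): succeeds when a changed path
# falls under one of the four prefixes that appear in the build.yaml hunk.
needs_build() {
  case "$1" in
    src/container/*|src/jvm/*|src/net/*|tests/*) return 0 ;;
    *) return 1 ;;
  esac
}

# Illustrative paths only: report which ones would request a build.
for changed_file in "src/net/KNet/KNet.csproj" "src/documentation/articles/streamsSDK.md"; do
  if needs_build "$changed_file"; then
    echo "$changed_file: build"
  else
    echo "$changed_file: skip"
  fi
done
```

A `case` pattern such as `src/jvm/*` matches the same paths as the workflow's `[[ $added_modified_file == "src/jvm/"* ]]` test, so the two forms should be interchangeable for this purpose.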
12 changes: 11 additions & 1 deletion .github/workflows/codeql-analysis.yml
@@ -37,7 +37,17 @@ jobs:
  run: |
  mapfile -d ',' -t added_modified_files < <(printf '%s,' '${{ steps.get_changed_files.outputs.added_modified }}')
  for added_modified_file in "${added_modified_files[@]}"; do
- if [[ $added_modified_file == "src/"* ]]; then
+ if [[ $added_modified_file == "src/jvm/"* ]]; then
+ echo "$added_modified_file file is under the directory 'src/'."
+ echo "run_build_windows=true" >> $GITHUB_OUTPUT
+ break
+ fi
+ if [[ $added_modified_file == "src/net/"* ]]; then
+ echo "$added_modified_file file is under the directory 'src/'."
+ echo "run_build_windows=true" >> $GITHUB_OUTPUT
+ break
+ fi
+ if [[ $added_modified_file == "tests/"* ]]; then
  echo "$added_modified_file file is under the directory 'src/'."
  echo "run_build_windows=true" >> $GITHUB_OUTPUT
  break
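
The `mapfile` line in this hunk splits a comma-separated list of changed files into a Bash array. `printf '%s,'` appends a trailing comma so that the last entry is terminated like the others, and `-t` strips the delimiter from each element. Below is a small sketch of that splitting step followed by the three prefix tests from the hunk. The `changed_files` variable and its sample paths stand in for the `get_changed_files` step output and are assumptions for illustration, as is the `run_build` variable used in place of writing to `$GITHUB_OUTPUT`. `mapfile -d` needs Bash 4.4 or later.

```shell
#!/usr/bin/env bash
# Stand-in for the step output (assumption): in the workflow this value
# comes from steps.get_changed_files.outputs.added_modified.
changed_files='README.md,src/documentation/index.md,src/net/KNet/KNet.csproj'

# Split on commas; the trailing comma added by printf terminates the last entry.
mapfile -d ',' -t added_modified_files < <(printf '%s,' "$changed_files")

# Local flag used here instead of appending to $GITHUB_OUTPUT.
run_build=false
for added_modified_file in "${added_modified_files[@]}"; do
  if [[ $added_modified_file == "src/jvm/"* ||
        $added_modified_file == "src/net/"* ||
        $added_modified_file == "tests/"* ]]; then
    run_build=true
    break
  fi
done

echo "entries=${#added_modified_files[@]} run_build=$run_build"
```

With this sample list the first two entries match none of the prefixes, so only the third entry sets the flag.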
2 changes: 1 addition & 1 deletion .github/workflows/generateclasses.yaml
@@ -64,7 +64,7 @@ jobs:
  - name: Clear Java generated files
  run: Remove-Item .\src\jvm\knet\src\main\java\org\mases\knet\generated\* -Recurse -Force -Exclude README.md

- - run: dotnet tool update -g MASES.JNetReflector
+ - run: dotnet tool update --version 2.3.0 -g MASES.JNetReflector

  - name: Build classes
  shell: cmd
19 changes: 17 additions & 2 deletions .github/workflows/pullrequest.yaml
@@ -29,7 +29,22 @@ jobs:
  echo "run_build_windows=true" >> $GITHUB_OUTPUT
  break
  fi
- if [[ $added_modified_file == "src/"* ]]; then
+ if [[ $added_modified_file == "src/container/"* ]]; then
+ echo "$added_modified_file file is under the directory 'src/'."
+ echo "run_build_windows=true" >> $GITHUB_OUTPUT
+ break
+ fi
+ if [[ $added_modified_file == "src/jvm/"* ]]; then
+ echo "$added_modified_file file is under the directory 'src/'."
+ echo "run_build_windows=true" >> $GITHUB_OUTPUT
+ break
+ fi
+ if [[ $added_modified_file == "src/net/"* ]]; then
+ echo "$added_modified_file file is under the directory 'src/'."
+ echo "run_build_windows=true" >> $GITHUB_OUTPUT
+ break
+ fi
+ if [[ $added_modified_file == "tests/"* ]]; then
  echo "$added_modified_file file is under the directory 'src/'."
  echo "run_build_windows=true" >> $GITHUB_OUTPUT
  break
@@ -102,7 +117,7 @@ jobs:
  submodules: 'true'

  - name: Pre compile tests
- run: dotnet build --no-incremental --configuration Release /p:Platform="Any CPU" /p:NoWarn="0108%3B1030%3B0618" tests\KNetTest.sln
+ run: dotnet build --no-incremental --configuration Release /p:Platform="Any CPU" /p:NoWarn="0108%3B1030%3B0618" tests\net\KNetTest.sln

  - name: Pre compile
  run: dotnet build --no-incremental --configuration Release /p:Platform="Any CPU" /p:NoWarn="0108%3B1030%3B0618" src\net\KNet\KNet.csproj
2 changes: 1 addition & 1 deletion .github/workflows/release.yaml
@@ -8,7 +8,7 @@ on:
  release:
  types:
  - published
- - edited
+ # - edited

  jobs:
  build_container_knet:
2 changes: 2 additions & 0 deletions README.md
@@ -83,6 +83,8 @@ This project adheres to the Contributor [Covenant code of conduct](CODE_OF_CONDU
  * V1.5.4+: From version 1.5.4 there are new packages dedicated to [KNet Serializer/Deserializer](src/documentation/articles/usageSerDes.md)
  * V2.0.0+: From version 2.0.0 the code base is fully reflected from the JARs of the Apache Kafka distribution downloaded from Maven; some developed classes still remains beside the specific KNet implementations
  * V2.4.0+: From version 2.4.0 it is available the new KNet Streams SDK
+ * V2.5.0+: From version 2.5.0 there are two breaking changes: uses `Java.Lang.String` instead of `string` (`System.String`) in generated classes and KNet Streams SDK manages the counter-part JVM types
+
  ---

  ## Runtime engine
56 changes: 28 additions & 28 deletions src/documentation/articles/streamsSDK.md
@@ -39,10 +39,10 @@ The available classes are under the following namespaces:
  - **MASES.KNet.Streams.Utils**: adds some useful functions

  All KNet Streams SDK APIs starts with the KNet prefix to avoid confusion during development; some examples are:
- - _org.apache.kafka.streams.KafkaStreams_ is managed from **MASES.KNet.Streams.KNetStreams**
- - _org.apache.kafka.streams.state.KeyValueIterator<K, V>_ is managed from **MASES.KNet.Streams.State.KNetStreamsKNetKeyValueIterator<TKey, TValue>** applying byte[] on both K and V on _org.apache.kafka.streams.state.KeyValueIterator<K, V>_; there are special cases for this, and other classes, to manage different JVM types:
- - **MASES.KNet.Streams.State.KNetTimestampedKeyValueIterator<TKey, TValue>** uses an _org.apache.kafka.streams.state.KeyValueIterator<K, V>_ applying byte[] on K and _org.apache.kafka.streams.state.ValueAndTimestamp<byte[]>_ on V;
- - **MASES.KNet.Streams.State.KNetTimestampedWindowedKeyValueIterator<TKey, TValue>** uses an _org.apache.kafka.streams.state.KeyValueIterator<K, V>_ applying _org.apache.kafka.streams.kstream.Windowed<byte[]>_ on K and _org.apache.kafka.streams.state.ValueAndTimestamp<byte[]>_ on V;
+ - _org.apache.kafka.streams.KafkaStreams_ is managed from **MASES.KNet.Streams.Streams**
+ - _org.apache.kafka.streams.state.KeyValueIterator<K, V>_ is managed from **MASES.KNet.Streams.State.KeyValueIterator<TKey, TValue>** applying byte[] on both K and V on _org.apache.kafka.streams.state.KeyValueIterator<K, V>_; there are special cases for this, and other classes, to manage different JVM types:
+ - **MASES.KNet.Streams.State.TimestampedKeyValueIterator<TKey, TValue>** uses an _org.apache.kafka.streams.state.KeyValueIterator<K, V>_ applying byte[] on K and _org.apache.kafka.streams.state.ValueAndTimestamp<byte[]>_ on V;
+ - **MASES.KNet.Streams.State.TimestampedWindowedKeyValueIterator<TKey, TValue>** uses an _org.apache.kafka.streams.state.KeyValueIterator<K, V>_ applying _org.apache.kafka.streams.kstream.Windowed<byte[]>_ on K and _org.apache.kafka.streams.state.ValueAndTimestamp<byte[]>_ on V;

  **Current available APIs cover a subset of the full APIs available in Apache Kafka Streams and some classes are only placeholder for some implemented APIs.**

@@ -59,22 +59,22 @@ string topicName = "topic-input";
  string storageId = "myStorage";

  StreamsConfigBuilder streamsConfig = StreamsConfigBuilder.Create();
- KNetStreamsBuilder builder = new KNetStreamsBuilder(streamsConfig);
+ StreamsBuilder builder = new StreamsBuilder(streamsConfig);

  Org.Apache.Kafka.Streams.State.KeyValueBytesStoreSupplier storeSupplier = Org.Apache.Kafka.Streams.State.Stores.InMemoryKeyValueStore(storageId);
- KNetMaterialized<string, string> materialized = KNetMaterialized<string, string>.As(storeSupplier);
- KNetGlobalKTable<string, string> globalTable = builder.GlobalTable(topicName, materialized);
- KNetTopology topology = builder.Build();
- KNetStreams streams = new KNetStreams(topology, streamsConfig);
+ Materialized<string, string> materialized = Materialized<string, string>.As(storeSupplier);
+ GlobalKTable<string, string> globalTable = builder.GlobalTable(topicName, materialized);
+ Topology topology = builder.Build();
+ Streams streams = new Streams(topology, streamsConfig);

  streams.Start();

- KNetReadOnlyKeyValueStore<string, string> keyValueStore = streams.Store(storageId, KNetQueryableStoreTypes.KeyValueStore<string, string>());
- KNetKeyValueIterator<string, string> keyValueIterator = keyValueStore.All;
+ ReadOnlyKeyValueStore<string, string> keyValueStore = streams.Store(storageId, QueryableStoreTypes.KeyValueStore<string, string>());
+ KeyValueIterator<string, string> keyValueIterator = keyValueStore.All;

  while (keyValueIterator.HasNext)
  {
- KNetKeyValue<string, string> kv = keyValueIterator.Next;
+ KeyValue<string, string> kv = keyValueIterator.Next;

  }

@@ -112,18 +112,18 @@ StreamsConfigBuilder streamsConfig = StreamsConfigBuilder.Create();
  // streamsConfig.KNetKeySerDes = typeof(JsonSerDes.Key<>); // needed for complex keys
  streamsConfig.KNetValueSerDes = typeof(JsonSerDes.Value<>);

- KNetStreamsBuilder builder = new KNetStreamsBuilder(streamsConfig);
+ StreamsBuilder builder = new StreamsBuilder(streamsConfig);

  Org.Apache.Kafka.Streams.State.KeyValueBytesStoreSupplier storeSupplier = Org.Apache.Kafka.Streams.State.Stores.InMemoryKeyValueStore(storageId);
- KNetMaterialized<int, TestType> materialized = KNetMaterialized<int, TestType>.As(storeSupplier);
- KNetGlobalKTable<int, TestType> globalTable = builder.GlobalTable(topicName, materialized);
- KNetTopology topology = builder.Build();
- KNetStreams streams = new KNetStreams(topology, streamsConfig);
+ Materialized<int, TestType> materialized = Materialized<int, TestType>.As(storeSupplier);
+ GlobalKTable<int, TestType> globalTable = builder.GlobalTable(topicName, materialized);
+ Topology topology = builder.Build();
+ Streams streams = new Streams(topology, streamsConfig);

  streams.Start();

- KNetReadOnlyKeyValueStore<int, TestType> keyValueStore = streams.Store(storageId, KNetQueryableStoreTypes.KeyValueStore<int, TestType>());
- KNetKeyValueIterator<int, TestType> keyValueIterator = keyValueStore.All;
+ ReadOnlyKeyValueStore<int, TestType> keyValueStore = streams.Store(storageId, QueryableStoreTypes.KeyValueStore<int, TestType>());
+ KeyValueIterator<int, TestType> keyValueIterator = keyValueStore.All;

  while (keyValueIterator.HasNext)
  {
@@ -139,13 +139,13 @@ Other ready made serializers can be found on [KNet serializers](usageSerDes.md).

  ## Performance consideration

- In the previous examples data retrieve use a `KNetKeyValueIterator<TKey, TValue>` obtained from a `KNetReadOnlyKeyValueStore<TKey, TValue>`.
- In KNet Streams SDK the serializer is used only when the specifc field is requested, so the following cycle can traverse the full `KNetKeyValueIterator<TKey, TValue>` content searching a specifc key, then the value is returned:
+ In the previous examples data retrieve use a `KeyValueIterator<TKey, TValue>` obtained from a `ReadOnlyKeyValueStore<TKey, TValue>`.
+ In KNet Streams SDK the serializer is used only when the specifc field is requested, so the following cycle can traverse the full `KeyValueIterator<TKey, TValue>` content searching a specifc key, then the value is returned:

  ```C#
  while (keyValueIterator.HasNext)
  {
- KNetKeyValue<int, TestType> kv = keyValueIterator.Next;
+ KeyValue<int, TestType> kv = keyValueIterator.Next;
  if (kv.Key == 100) // key deserialization happens here
  {
  return kv.Value; // value deserialization happens here
@@ -160,7 +160,7 @@ However there are conditions which needs to avoid the deserialization to be made
  ```C#
  while (keyValueIterator.HasNext)
  {
- KNetKeyValue<int, TestType> kv = keyValueIterator.Next;
+ KeyValue<int, TestType> kv = keyValueIterator.Next;
  longFunction(kv.Key, kv.Value); // key and value deserialization happens here before invocation of longFunction
  }

@@ -171,14 +171,14 @@ void longFunction(int key, TestType value)

  ```

- To solve this problem KNet Streams SDK comes with a feature to deserialize in parallel while `longFunction` do its work; `KNetKeyValueIterator<TKey, TValue>` can return a special `IEnumerator<TKeyValue>` which deserialize in parallel:
+ To solve this problem KNet Streams SDK comes with a feature to deserialize in parallel while `longFunction` do its work; `KeyValueIterator<TKey, TValue>` can return a special `IEnumerator<TKeyValue>` which deserialize in parallel:

  ```C#
- IEnumerator<KNetKeyValue<int, TestType>> enumerator = keyValueIterator.ToIEnumerator(); // it was used the default, i.e. with prefetch feature
+ IEnumerator<KeyValue<int, TestType>> enumerator = keyValueIterator.ToIEnumerator(); // it was used the default, i.e. with prefetch feature
  // key and value deserialization happens behind the scene
  while (enumerator.MoveNext())
  {
- KNetKeyValue<int, TestType> kv = keyValueIterator.Current;
+ KeyValue<int, TestType> kv = keyValueIterator.Current;
  longFunction(kv.Key, kv.Value); // key and value are already ready before invocation of longFunction
  }

@@ -195,7 +195,7 @@ void longFunction(int key, TestType value)
  The previous point can be mitigated using the `foreach` statement since iterators implements both `IEnumerable<T>` and `IAsyncEnumerable<T>`:

  ```C#
- foreach (KNetKeyValue<int, TestType> kv in keyValueIterator)
+ foreach (KeyValue<int, TestType> kv in keyValueIterator)
  {
  if (kv.Key == 100) break; // when iteration breaks, keyValueIterator is Disposed and the external thread exit
  longFunction(kv.Key, kv.Value); // key and value are already ready before invocation of longFunction
@@ -211,7 +211,7 @@ void longFunction(int key, TestType value)
  or

  ```C#
- await foreach (KNetKeyValue<int, TestType> kv in keyValueIterator)
+ await foreach (KeyValue<int, TestType> kv in keyValueIterator)
  {
  if (kv.Key == 100) break; // when iteration breaks, keyValueIterator is Disposed and the external thread exit
  longFunction(kv.Key, kv.Value); // key and value are already ready before invocation of longFunction
2 changes: 2 additions & 0 deletions src/documentation/index.md
@@ -82,6 +82,8 @@ This project adheres to the Contributor [Covenant code of conduct](CODE_OF_CONDU
  * V1.5.4+: From version 1.5.4 there are new packages dedicated to [KNet Serializer/Deserializer](articles/usageSerDes.md)
  * V2.0.0+: From version 2.0.0 the code base is fully reflected from the JARs of the Apache Kafka distribution downloaded from Maven; some developed classes still remains beside the specific KNet implementations
  * V2.4.0+: From version 2.4.0 it is available the new KNet Streams SDK
+ * V2.5.0+: From version 2.5.0 there are two breaking changes: uses `Java.Lang.String` instead of `string` (`System.String`) in generated classes and KNet Streams SDK manages the counter-part JVM types
+
  ---

  ## Runtime engine