Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improved kafka streams with many specific KNet classes #343

Merged
merged 22 commits into from
Jan 16, 2024
Merged
Changes from 1 commit
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
7fa8566
Temp commit
masesdevelopers Dec 11, 2023
a244688
Merge branch '327-improve-kafka-streams-with-some-specific-knet-class…
masesdevelopers Dec 11, 2023
9c76eb4
Removed unused Java classes, added specific implementation of Predicate
masesdevelopers Dec 11, 2023
238ecd5
Added Stores and StoresDefault methods to suppliers
masesdevelopers Dec 13, 2023
e4db253
Temporary commit
masesdevelopers Dec 18, 2023
f866315
Merge branch '327-improve-kafka-streams-with-some-specific-knet-class…
masesdevelopers Dec 18, 2023
a02165e
Merge branch '327-improve-kafka-streams-with-some-specific-knet-class…
masesdevelopers Dec 19, 2023
cadb313
Merge branch '327-improve-kafka-streams-with-some-specific-knet-class…
masesdevelopers Jan 5, 2024
6d0ea6e
Temp commit
masesdevelopers Jan 7, 2024
b428634
Name review
masesdevelopers Jan 8, 2024
fd67dec
Other class updates
masesdevelopers Jan 11, 2024
c6c839a
Update class
masesdevelopers Jan 12, 2024
764104c
Added many classes to manage state store, reviewed namespace and seri…
masesdevelopers Jan 14, 2024
0e1fc78
Added new classes and removed occurrence in using of Apache Kafka ref…
masesdevelopers Jan 14, 2024
a864ca7
Managed KNetKStream and added many classes
masesdevelopers Jan 15, 2024
dd34c46
Many other classes implemented
masesdevelopers Jan 15, 2024
49f8881
Some updates
masesdevelopers Jan 15, 2024
a5d1d54
Many fixes on classes
masesdevelopers Jan 16, 2024
8d12e6c
Bump version
masesdevelopers Jan 16, 2024
20fb141
Compilation update
masesdevelopers Jan 16, 2024
8b1e417
Other fixes
masesdevelopers Jan 16, 2024
aafab86
namespace cleanup
masesdevelopers Jan 16, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Many other classes implemented
masesdevelopers committed Jan 15, 2024
commit dd34c463b2110411d17a4415c7a2e4dfadeae869
2 changes: 1 addition & 1 deletion src/net/KNet/Specific/Streams/KNetTimestampedKeyValue.cs
Original file line number Diff line number Diff line change
@@ -22,7 +22,7 @@
namespace MASES.KNet.Streams
{
/// <summary>
/// KNet implementation of <see cref="KeyValue{K, V}"/>
/// KNet implementation of <see cref="Org.Apache.Kafka.Streams.State.ValueAndTimestamp{V}"/>
/// </summary>
/// <typeparam name="TKey">The key type</typeparam>
/// <typeparam name="TValue">The value type</typeparam>
64 changes: 64 additions & 0 deletions src/net/KNet/Specific/Streams/Kstream/KNetAggregator.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Copyright 2023 MASES s.r.l.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Refer to LICENSE for more information.
*/

using MASES.KNet.Serialization;

namespace MASES.KNet.Streams.Kstream
{
/// <summary>
/// KNet implementation of <see cref="Org.Apache.Kafka.Streams.Kstream.Aggregator{K, V, VA}"/>
/// </summary>
/// <typeparam name="K"></typeparam>
/// <typeparam name="V"></typeparam>
/// <typeparam name="VA">The key type</typeparam>
public class KNetAggregator<K, V, VA> : Org.Apache.Kafka.Streams.Kstream.Aggregator<byte[], byte[], byte[]>, IGenericSerDesFactoryApplier
{
IGenericSerDesFactory _factory;
IGenericSerDesFactory IGenericSerDesFactoryApplier.Factory { get => _factory; set { _factory = value; } }

/// <summary>
/// Handler for <see href="https://www.javadoc.io/doc/org.apache.kafka/kafka-streams/3.6.1/org/apache/kafka/streams/kstream/Aggregator.html#apply-java.lang.Object-java.lang.Object-java.lang.Object-"/>
/// </summary>
/// <remarks>If <see cref="OnApply"/> has a value it takes precedence over corresponding class method</remarks>
public new System.Func<K, V, VA, VA> OnApply { get; set; } = null;
/// <inheritdoc/>
public sealed override byte[] Apply(byte[] arg0, byte[] arg1, byte[] arg2)
{
IKNetSerDes<K> kSerializer = _factory.BuildKeySerDes<K>();
IKNetSerDes<V> vSerializer = _factory.BuildValueSerDes<V>();
IKNetSerDes<VA> vaSerializer = _factory.BuildValueSerDes<VA>();

var methodToExecute = (OnApply != null) ? OnApply : Apply;
var res = methodToExecute(kSerializer.Deserialize(null, arg0), vSerializer.Deserialize(null, arg1), vaSerializer.Deserialize(null, arg2));

return vaSerializer.Serialize(null, res);
}

/// <summary>
/// <see href="https://www.javadoc.io/doc/org.apache.kafka/kafka-streams/3.6.1/org/apache/kafka/streams/kstream/Aggregator.html#apply-java.lang.Object-java.lang.Object-java.lang.Object-"/>
/// </summary>
/// <param name="arg0"><typeparamref name="K"/></param>
/// <param name="arg1"><typeparamref name="V"/></param>
/// <param name="arg2"><typeparamref name="VA"/></param>
/// <returns><typeparamref name="VA"/></returns>
public virtual VA Apply(K arg0, V arg1, VA arg2)
{
return default;
}
}
}
46 changes: 46 additions & 0 deletions src/net/KNet/Specific/Streams/Kstream/KNetBranched.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright 2023 MASES s.r.l.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Refer to LICENSE for more information.
*/

using MASES.KNet.Serialization;

namespace MASES.KNet.Streams.Kstream
{
/// <summary>
/// KNet extension of <see cref="Org.Apache.Kafka.Streams.Kstream.Branched{K, V}"/>
/// </summary>
/// <typeparam name="K"></typeparam>
/// <typeparam name="V"></typeparam>
public class KNetBranched<K, V> : IGenericSerDesFactoryApplier
{
readonly Org.Apache.Kafka.Streams.Kstream.Branched<byte[], byte[]> _inner;
IGenericSerDesFactory _factory;
IGenericSerDesFactory IGenericSerDesFactoryApplier.Factory { get => _factory; set { _factory = value; } }

KNetBranched(Org.Apache.Kafka.Streams.Kstream.Branched<byte[], byte[]> inner)
{
_inner = inner;
}

/// <summary>
/// Converter from <see cref="KNetBranched{K, V}"/> to <see cref="Org.Apache.Kafka.Streams.Kstream.Branched{K, V}"/>
/// </summary>
public static implicit operator Org.Apache.Kafka.Streams.Kstream.Branched<byte[], byte[]>(KNetBranched<K, V> t) => t._inner;

#warning till now it is only an empty class shall be completed with the method of inner class
}
}
56 changes: 34 additions & 22 deletions src/net/KNet/Specific/Streams/Kstream/KNetConsumed.cs
Original file line number Diff line number Diff line change
@@ -17,6 +17,7 @@
*/

using MASES.KNet.Serialization;
using MASES.KNet.Streams.Processor;

namespace MASES.KNet.Streams.Kstream
{
@@ -27,26 +28,36 @@ namespace MASES.KNet.Streams.Kstream
/// <typeparam name="V"></typeparam>
public class KNetConsumed<K, V> : IGenericSerDesFactoryApplier
{
readonly Org.Apache.Kafka.Streams.Kstream.Consumed<byte[], byte[]> _consumed;
KNetTimestampExtractor<K, V> _timestampExtractor = null;
readonly Org.Apache.Kafka.Streams.Kstream.Consumed<byte[], byte[]> _inner;
IGenericSerDesFactory _factory;
IGenericSerDesFactory IGenericSerDesFactoryApplier.Factory { get => _factory; set { _factory = value; } }
IGenericSerDesFactory IGenericSerDesFactoryApplier.Factory
{
get => _factory;
set
{
_factory = value;
if (_timestampExtractor is IGenericSerDesFactoryApplier applier) applier.Factory = value;
}
}

KNetConsumed(Org.Apache.Kafka.Streams.Kstream.Consumed<byte[], byte[]> consumed)
KNetConsumed(Org.Apache.Kafka.Streams.Kstream.Consumed<byte[], byte[]> inner, KNetTimestampExtractor<K, V> timestampExtractor = null)
{
_consumed = consumed;
_inner = inner;
_timestampExtractor = timestampExtractor;
}

/// <summary>
/// Converter from <see cref="KNetConsumed{K, V}"/> to <see cref="Org.Apache.Kafka.Streams.Kstream.Consumed{K, V}"/>
/// </summary>
public static implicit operator Org.Apache.Kafka.Streams.Kstream.Consumed<byte[], byte[]>(KNetConsumed<K, V> t) => t._consumed;
public static implicit operator Org.Apache.Kafka.Streams.Kstream.Consumed<byte[], byte[]>(KNetConsumed<K, V> t) => t._inner;

#region Static methods
/// <summary>
/// <see href="https://www.javadoc.io/doc/org.apache.kafka/kafka-streams/3.6.1/org/apache/kafka/streams/kstream/Consumed.html#as-java.lang.String-"/>
/// </summary>
/// <param name="arg0"><see cref="string"/></param>
/// <returns><see cref="Org.Apache.Kafka.Streams.Kstream.Consumed"/></returns>
/// <returns><see cref="KNetConsumed{K, V}"/></returns>
public static KNetConsumed<K, V> As(string arg0)
{
var cons = Org.Apache.Kafka.Streams.Kstream.Consumed<byte[], byte[]>.As(arg0);
@@ -55,22 +66,22 @@ public static KNetConsumed<K, V> As(string arg0)
/// <summary>
/// <see href="https://www.javadoc.io/doc/org.apache.kafka/kafka-streams/3.6.1/org/apache/kafka/streams/kstream/Consumed.html#with-org.apache.kafka.common.serialization.Serde-org.apache.kafka.common.serialization.Serde-org.apache.kafka.streams.processor.TimestampExtractor-org.apache.kafka.streams.Topology.AutoOffsetReset-"/>
/// </summary>
/// <param name="arg0"><see cref="Org.Apache.Kafka.Common.Serialization.Serde"/></param>
/// <param name="arg1"><see cref="Org.Apache.Kafka.Common.Serialization.Serde"/></param>
/// <param name="arg2"><see cref="Org.Apache.Kafka.Streams.Processor.TimestampExtractor"/></param>
/// <param name="arg0"><see cref="IKNetSerDes{K}"/></param>
/// <param name="arg1"><see cref="IKNetSerDes{V}"/></param>
/// <param name="arg2"><see cref="KNetTimestampExtractor{K, V}"/></param>
/// <param name="arg3"><see cref="Org.Apache.Kafka.Streams.Topology.AutoOffsetReset"/></param>
/// <returns><see cref="Org.Apache.Kafka.Streams.Kstream.Consumed"/></returns>
/// <returns><see cref="KNetConsumed{K, V}"/></returns>
public static KNetConsumed<K, V> With(IKNetSerDes<K> arg0, IKNetSerDes<V> arg1, KNetTimestampExtractor<K, V> arg2, Org.Apache.Kafka.Streams.Topology.AutoOffsetReset arg3)
{
var cons = Org.Apache.Kafka.Streams.Kstream.Consumed<byte[], byte[]>.With(arg0.KafkaSerde, arg1.KafkaSerde, arg2, arg3);
return new KNetConsumed<K, V>(cons);
return new KNetConsumed<K, V>(cons, arg2);
}
/// <summary>
/// <see href="https://www.javadoc.io/doc/org.apache.kafka/kafka-streams/3.6.1/org/apache/kafka/streams/kstream/Consumed.html#with-org.apache.kafka.common.serialization.Serde-org.apache.kafka.common.serialization.Serde-"/>
/// </summary>
/// <param name="arg0"><see cref="Org.Apache.Kafka.Common.Serialization.Serde"/></param>
/// <param name="arg1"><see cref="Org.Apache.Kafka.Common.Serialization.Serde"/></param>
/// <returns><see cref="Org.Apache.Kafka.Streams.Kstream.Consumed"/></returns>
/// <param name="arg0"><see cref="IKNetSerDes{K}"/></param>
/// <param name="arg1"><see cref="IKNetSerDes{V}"/></param>
/// <returns><see cref="KNetConsumed{K, V}"/></returns>
public static KNetConsumed<K, V> With(IKNetSerDes<K> arg0, IKNetSerDes<V> arg1)
{
var cons = Org.Apache.Kafka.Streams.Kstream.Consumed<byte[], byte[]>.With(arg0.KafkaSerde, arg1.KafkaSerde);
@@ -79,18 +90,18 @@ public static KNetConsumed<K, V> With(IKNetSerDes<K> arg0, IKNetSerDes<V> arg1)
/// <summary>
/// <see href="https://www.javadoc.io/doc/org.apache.kafka/kafka-streams/3.6.1/org/apache/kafka/streams/kstream/Consumed.html#with-org.apache.kafka.streams.processor.TimestampExtractor-"/>
/// </summary>
/// <param name="arg0"><see cref="Org.Apache.Kafka.Streams.Processor.TimestampExtractor"/></param>
/// <returns><see cref="Org.Apache.Kafka.Streams.Kstream.Consumed"/></returns>
/// <param name="arg0"><see cref="KNetTimestampExtractor{K, V}"/></param>
/// <returns><see cref="KNetConsumed{K, V}"/></returns>
public static KNetConsumed<K, V> With(KNetTimestampExtractor<K, V> arg0)
{
var cons = Org.Apache.Kafka.Streams.Kstream.Consumed<byte[], byte[]>.With(arg0);
return new KNetConsumed<K, V>(cons);
return new KNetConsumed<K, V>(cons, arg0);
}
/// <summary>
/// <see href="https://www.javadoc.io/doc/org.apache.kafka/kafka-streams/3.6.1/org/apache/kafka/streams/kstream/Consumed.html#with-org.apache.kafka.streams.Topology.AutoOffsetReset-"/>
/// </summary>
/// <param name="arg0"><see cref="Org.Apache.Kafka.Streams.Topology.AutoOffsetReset"/></param>
/// <returns><see cref="Org.Apache.Kafka.Streams.Kstream.Consumed"/></returns>
/// <returns><see cref="KNetConsumed{K, V}"/></returns>
public static KNetConsumed<K, V> With(Org.Apache.Kafka.Streams.Topology.AutoOffsetReset arg0)
{
var cons = Org.Apache.Kafka.Streams.Kstream.Consumed<byte[], byte[]>.With(arg0);
@@ -107,7 +118,7 @@ public static KNetConsumed<K, V> With(Org.Apache.Kafka.Streams.Topology.AutoOffs
/// <returns><see cref="KNetConsumed{K, V}"/></returns>
public KNetConsumed<K, V> WithKeySerde(IKNetSerDes<K> arg0)
{
_consumed?.WithKeySerde(arg0.KafkaSerde);
_inner?.WithKeySerde(arg0.KafkaSerde);
return this;
}
/// <summary>
@@ -117,7 +128,7 @@ public KNetConsumed<K, V> WithKeySerde(IKNetSerDes<K> arg0)
/// <returns><see cref="KNetConsumed{K, V}"/></returns>
public KNetConsumed<K, V> WithOffsetResetPolicy(Org.Apache.Kafka.Streams.Topology.AutoOffsetReset arg0)
{
_consumed?.WithOffsetResetPolicy(arg0);
_inner?.WithOffsetResetPolicy(arg0);
return this;
}
/// <summary>
@@ -128,7 +139,8 @@ public KNetConsumed<K, V> WithOffsetResetPolicy(Org.Apache.Kafka.Streams.Topolog
public KNetConsumed<K, V> WithTimestampExtractor(KNetTimestampExtractor<K, V> arg0)
{
if (arg0 is IGenericSerDesFactoryApplier applier) applier.Factory = _factory;
_consumed?.WithTimestampExtractor(arg0);
_timestampExtractor = arg0;
_inner?.WithTimestampExtractor(arg0);
return this;
}
/// <summary>
@@ -138,7 +150,7 @@ public KNetConsumed<K, V> WithTimestampExtractor(KNetTimestampExtractor<K, V> ar
/// <returns><see cref="KNetConsumed{K, V}"/></returns>
public KNetConsumed<K, V> WithValueSerde(IKNetSerDes<V> arg0)
{
_consumed?.WithValueSerde(arg0.KafkaSerde);
_inner?.WithValueSerde(arg0.KafkaSerde);
return this;
}

11 changes: 8 additions & 3 deletions src/net/KNet/Specific/Streams/Kstream/KNetGlobalKTable.cs
Original file line number Diff line number Diff line change
@@ -27,21 +27,26 @@ namespace MASES.KNet.Streams.Kstream
/// <typeparam name="V"></typeparam>
public class KNetGlobalKTable<K, V> : IGenericSerDesFactoryApplier
{
Org.Apache.Kafka.Streams.Kstream.GlobalKTable<byte[], byte[]> _table;
Org.Apache.Kafka.Streams.Kstream.GlobalKTable<byte[], byte[]> _inner;

IGenericSerDesFactory _factory;
IGenericSerDesFactory IGenericSerDesFactoryApplier.Factory { get => _factory; set { _factory = value; } }

internal KNetGlobalKTable(IGenericSerDesFactory factory, Org.Apache.Kafka.Streams.Kstream.GlobalKTable<byte[], byte[]> table)
{
_factory = factory;
_table = table;
_inner = table;
}

/// <summary>
/// Converter from <see cref="KNetGlobalKTable{K, V}"/> to <see cref="Org.Apache.Kafka.Streams.Kstream.GlobalKTable{K, V}"/>
/// </summary>
public static implicit operator Org.Apache.Kafka.Streams.Kstream.GlobalKTable<byte[], byte[]>(KNetGlobalKTable<K, V> t) => t._table;
public static implicit operator Org.Apache.Kafka.Streams.Kstream.GlobalKTable<byte[], byte[]>(KNetGlobalKTable<K, V> t) => t._inner;

/// <summary>
/// <see href="https://www.javadoc.io/doc/org.apache.kafka/kafka-streams/3.6.1/org/apache/kafka/streams/kstream/GlobalKTable.html#queryableStoreName--"/>
/// </summary>
/// <returns><see cref="string"/></returns>
public string QueryableStoreName => _inner.QueryableStoreName();
}
}
10 changes: 5 additions & 5 deletions src/net/KNet/Specific/Streams/Kstream/KNetGrouped.cs
Original file line number Diff line number Diff line change
@@ -28,20 +28,20 @@ namespace MASES.KNet.Streams.Kstream
/// <typeparam name="V"></typeparam>
public class KNetGrouped<K, V> : IGenericSerDesFactoryApplier
{
readonly Org.Apache.Kafka.Streams.Kstream.Grouped<byte[], byte[]> _produced;
readonly Org.Apache.Kafka.Streams.Kstream.Grouped<byte[], byte[]> _inner;
IGenericSerDesFactory _factory;
IGenericSerDesFactory IGenericSerDesFactoryApplier.Factory { get => _factory; set { _factory = value; } }

KNetGrouped(Org.Apache.Kafka.Streams.Kstream.Grouped<byte[], byte[]> produced)
KNetGrouped(Org.Apache.Kafka.Streams.Kstream.Grouped<byte[], byte[]> inner)
{
_produced = produced;
_inner = inner;
}

/// <summary>
/// Converter from <see cref="KNetGrouped{K, V}"/> to <see cref="Org.Apache.Kafka.Streams.Kstream.Grouped{K, V}"/>
/// </summary>
public static implicit operator Org.Apache.Kafka.Streams.Kstream.Grouped<byte[], byte[]>(KNetGrouped<K, V> t) => t._produced;
public static implicit operator Org.Apache.Kafka.Streams.Kstream.Grouped<byte[], byte[]>(KNetGrouped<K, V> t) => t._inner;

#warning shall be completed
#warning till now it is only an empty class shall be completed with the method of inner class
}
}
56 changes: 56 additions & 0 deletions src/net/KNet/Specific/Streams/Kstream/KNetInitializer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright 2023 MASES s.r.l.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Refer to LICENSE for more information.
*/

using MASES.KNet.Serialization;

namespace MASES.KNet.Streams.Kstream
{
/// <summary>
/// KNet implementation of <see cref="Org.Apache.Kafka.Streams.Kstream.Initializer{VA}"/>
/// </summary>
/// <typeparam name="VA">The key type</typeparam>
public class KNetInitializer<VA> : Org.Apache.Kafka.Streams.Kstream.Initializer<byte[]>, IGenericSerDesFactoryApplier
{
IGenericSerDesFactory _factory;
IGenericSerDesFactory IGenericSerDesFactoryApplier.Factory { get => _factory; set { _factory = value; } }

/// <summary>
/// Handler for <see href="https://www.javadoc.io/doc/org.apache.kafka/kafka-streams/3.6.1/org/apache/kafka/streams/kstream/Initializer.html#apply--"/>
/// </summary>
/// <remarks>If <see cref="OnApply2"/> has a value it takes precedence over corresponding class method</remarks>
public System.Func<VA> OnApply2 { get; set; } = null;
/// <inheritdoc/>
public sealed override byte[] Apply()
{
IKNetSerDes<VA> valueSerializer = _factory.BuildValueSerDes<VA>();

var methodToExecute = (OnApply2 != null) ? OnApply2 : Apply2;
var res = methodToExecute();
return valueSerializer.Serialize(null, res);
}

/// <summary>
/// <see href="https://www.javadoc.io/doc/org.apache.kafka/kafka-streams/3.6.1/org/apache/kafka/streams/kstream/Initializer.html#apply--"/>
/// </summary>
/// <returns><typeparamref name="VA"/></returns>
public virtual VA Apply2()
{
return default;
}
}
}
Loading