From 7fa85666accbd42b2dff6e426831463f8cca07ec Mon Sep 17 00:00:00 2001
From: masesdevelopers <94312179+masesdevelopers@users.noreply.github.com>
Date: Mon, 11 Dec 2023 03:23:03 +0100
Subject: [PATCH 01/18] Temp commit
---
.../kafka/streams/processor/Punctuator.java | 35 ++
.../Specific/Streams/KNetClientSupplier.cs | 120 +++++++
src/net/KNet/Specific/Streams/KNetStreams.cs | 146 ++++++++
src/net/KNet/Specific/Streams/KNetTopology.cs | 337 ++++++++++++++++++
.../Processor/KNetStreamPartitioner.cs | 70 ++++
5 files changed, 708 insertions(+)
create mode 100644 src/jvm/knet/src/main/java/org/mases/knet/generated/org/apache/kafka/streams/processor/Punctuator.java
create mode 100644 src/net/KNet/Specific/Streams/KNetClientSupplier.cs
create mode 100644 src/net/KNet/Specific/Streams/KNetStreams.cs
create mode 100644 src/net/KNet/Specific/Streams/KNetTopology.cs
create mode 100644 src/net/KNet/Specific/Streams/Processor/KNetStreamPartitioner.cs
diff --git a/src/jvm/knet/src/main/java/org/mases/knet/generated/org/apache/kafka/streams/processor/Punctuator.java b/src/jvm/knet/src/main/java/org/mases/knet/generated/org/apache/kafka/streams/processor/Punctuator.java
new file mode 100644
index 0000000000..e06bc56ef3
--- /dev/null
+++ b/src/jvm/knet/src/main/java/org/mases/knet/generated/org/apache/kafka/streams/processor/Punctuator.java
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2023 MASES s.r.l.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * Refer to LICENSE for more information.
+ */
+
+/*
+* This file is generated by MASES.JNetReflector (ver. 2.1.0.0)
+*/
+
+package org.mases.knet.generated.org.apache.kafka.streams.processor;
+
+public final class Punctuator extends org.mases.jcobridge.JCListener implements org.apache.kafka.streams.processor.Punctuator {
+ public Punctuator(String key) throws org.mases.jcobridge.JCNativeException {
+ super(key);
+ }
+
+ //@Override
+ public void punctuate(long arg0) {
+ raiseEvent("punctuate", arg0);
+ }
+
+}
\ No newline at end of file
diff --git a/src/net/KNet/Specific/Streams/KNetClientSupplier.cs b/src/net/KNet/Specific/Streams/KNetClientSupplier.cs
new file mode 100644
index 0000000000..ee27a160f8
--- /dev/null
+++ b/src/net/KNet/Specific/Streams/KNetClientSupplier.cs
@@ -0,0 +1,120 @@
+/*
+* Copyright 2023 MASES s.r.l.
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*
+* Refer to LICENSE for more information.
+*/
+
+using Java.Util;
+using MASES.KNet.Consumer;
+using MASES.KNet.Producer;
+using MASES.KNet.Serialization;
+using Org.Apache.Kafka.Clients.Admin;
+using Org.Apache.Kafka.Clients.Consumer;
+using Org.Apache.Kafka.Clients.Producer;
+using Org.Apache.Kafka.Streams;
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace MASES.KNet.Specific.Streams
+{
+ ///
+ /// KNet implementation of
+ ///
+ public class KNetClientSupplier : KafkaClientSupplier
+ {
+ private readonly System.Collections.Generic.List _admins = new System.Collections.Generic.List();
+ private readonly System.Collections.Generic.List> _consumers = new System.Collections.Generic.List>();
+ private readonly System.Collections.Generic.List> _producers = new System.Collections.Generic.List>();
+ ///
+ /// Default initializer
+ ///
+ public KNetClientSupplier()
+ {
+ }
+ ///
+ public override Org.Apache.Kafka.Clients.Admin.Admin GetAdmin(Map arg0)
+ {
+ var admin = AdminClient.Create(arg0);
+ _admins.Add(admin);
+ return admin;
+ }
+ ///
+ public override Consumer GetConsumer(Map arg0)
+ {
+ Properties properties = new();
+ properties.PutAll(arg0);
+
+ var consumer = new KNetConsumer(properties);
+ _consumers.Add(consumer);
+ return consumer;
+ }
+ ///
+ public override Consumer GetGlobalConsumer(Map arg0)
+ {
+ Properties properties = new();
+ properties.PutAll(arg0);
+
+ var consumer = new KNetConsumer(properties);
+ _consumers.Add(consumer);
+ return consumer;
+ }
+ ///
+ public override Producer GetProducer(Map arg0)
+ {
+ Properties properties = new();
+ properties.PutAll(arg0);
+
+ var producer = new KNetProducer(properties);
+ _producers.Add(producer);
+ return producer;
+ }
+ ///
+ public override Consumer GetRestoreConsumer(Map arg0)
+ {
+ Properties properties = new();
+ properties.PutAll(arg0);
+
+ var consumer = new KNetConsumer(properties);
+ _consumers.Add(consumer);
+ return consumer;
+ }
+ ///
+ public override void Dispose()
+ {
+ foreach (var item in _admins)
+ {
+ item?.Dispose();
+ }
+ _admins.Clear();
+
+ foreach (var item in _consumers)
+ {
+ item?.Dispose();
+ }
+ _consumers.Clear();
+
+ foreach (var item in _producers)
+ {
+ item?.Dispose();
+ }
+ _producers.Clear();
+
+ base.Dispose();
+ }
+ }
+}
diff --git a/src/net/KNet/Specific/Streams/KNetStreams.cs b/src/net/KNet/Specific/Streams/KNetStreams.cs
new file mode 100644
index 0000000000..7eaa19bf75
--- /dev/null
+++ b/src/net/KNet/Specific/Streams/KNetStreams.cs
@@ -0,0 +1,146 @@
+/*
+* Copyright 2023 MASES s.r.l.
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*
+* Refer to LICENSE for more information.
+*/
+
+using Java.Util;
+using MASES.KNet.Consumer;
+using MASES.KNet.Producer;
+using MASES.KNet.Serialization;
+using MASES.KNet.Specific.Streams.Processor;
+using Org.Apache.Kafka.Clients.Admin;
+using Org.Apache.Kafka.Clients.Consumer;
+using Org.Apache.Kafka.Clients.Producer;
+using Org.Apache.Kafka.Streams;
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace MASES.KNet.Specific.Streams
+{
+ ///
+ /// KNet extension of
+ ///
+ public class KNetStreams : KafkaStreams
+ {
+ #region Constructors
+ ///
+ public KNetStreams() : base() { }
+ ///
+ public KNetStreams(params object[] args) : base(args) { }
+ ///
+ /// KNet override of
+ ///
+ ///
+ ///
+ ///
+ public KNetStreams(KNetTopology arg0, Java.Util.Properties arg1, Org.Apache.Kafka.Common.Utils.Time arg2)
+ : base(arg0, arg1, arg2)
+ {
+ }
+ ///
+ /// KNet override of
+ ///
+ ///
+ ///
+ ///
+ ///
+ public KNetStreams(KNetTopology arg0, Java.Util.Properties arg1, KNetClientSupplier arg2, Org.Apache.Kafka.Common.Utils.Time arg3)
+ : base(arg0, arg1, arg2, arg3)
+ {
+ }
+ ///
+ /// KNet override of
+ ///
+ ///
+ ///
+ ///
+ public KNetStreams(KNetTopology arg0, Java.Util.Properties arg1, KNetClientSupplier arg2)
+ : base(arg0, arg1, arg2)
+ {
+ }
+ ///
+ /// KNet override of
+ ///
+ ///
+ ///
+ public KNetStreams(KNetTopology arg0, Java.Util.Properties arg1)
+ : base(arg0, arg1)
+ {
+ }
+ ///
+ /// KNet override of
+ ///
+ ///
+ ///
+ ///
+ public KNetStreams(KNetTopology arg0, Org.Apache.Kafka.Streams.StreamsConfig arg1, Org.Apache.Kafka.Common.Utils.Time arg2)
+ : base(arg0, arg1, arg2)
+ {
+ }
+ ///
+ /// KNet override of
+ ///
+ ///
+ ///
+ ///
+ public KNetStreams(KNetTopology arg0, Org.Apache.Kafka.Streams.StreamsConfig arg1, KNetClientSupplier arg2)
+ : base(arg0, arg1, arg2)
+ {
+ }
+ ///
+ /// KNet override of
+ ///
+ ///
+ ///
+ public KNetStreams(KNetTopology arg0, Org.Apache.Kafka.Streams.StreamsConfig arg1)
+ : base(arg0, arg1)
+ {
+ }
+
+ #endregion
+
+ #region Instance methods
+ ///
+ /// KNet override of
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ public Org.Apache.Kafka.Streams.KeyQueryMetadata QueryMetadataForKey(string arg0, TKey arg1, IKNetSerializer arg2)
+ {
+ return QueryMetadataForKey(arg0, arg2.Serialize(null, arg1), arg2.KafkaSerializer);
+ }
+ ///
+ /// KNet override of
+ ///
+ ///
+ ///
+ ///
+ /// The key type
+ ///
+ public Org.Apache.Kafka.Streams.KeyQueryMetadata QueryMetadataForKey(string arg0, TKey arg1, KNetStreamPartitioner arg2)
+ {
+ return IExecute("queryMetadataForKey", arg0, arg2.KeySerializer.Serialize(null, arg1), arg2);
+ }
+
+ #endregion
+ }
+}
diff --git a/src/net/KNet/Specific/Streams/KNetTopology.cs b/src/net/KNet/Specific/Streams/KNetTopology.cs
new file mode 100644
index 0000000000..5eb259d04d
--- /dev/null
+++ b/src/net/KNet/Specific/Streams/KNetTopology.cs
@@ -0,0 +1,337 @@
+/*
+* Copyright 2023 MASES s.r.l.
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*
+* Refer to LICENSE for more information.
+*/
+
+using Org.Apache.Kafka.Streams;
+
+namespace MASES.KNet.Specific.Streams
+{
+ ///
+ /// KNet implementation of
+ ///
+ public class KNetTopology : Topology
+ {
+ #region Constructors
+ ///
+ public KNetTopology() : base() { }
+ ///
+ public KNetTopology(params object[] args) : base(args) { }
+
+ ///
+ /// KNet override of
+ ///
+ ///
+ public KNetTopology(Org.Apache.Kafka.Streams.TopologyConfig arg0)
+ : base(arg0)
+ {
+ }
+ #endregion
+
+ #region Instance methods
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ public Org.Apache.Kafka.Streams.Topology AddSink(string arg0, string arg1, Org.Apache.Kafka.Common.Serialization.Serializer arg2, Org.Apache.Kafka.Common.Serialization.Serializer arg3, params string[] arg4)
+ {
+ if (arg4.Length == 0) return IExecute("addSink", arg0, arg1, arg2, arg3); else return IExecute("addSink", arg0, arg1, arg2, arg3, arg4);
+ }
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ public Org.Apache.Kafka.Streams.Topology AddSink(string arg0, string arg1, Org.Apache.Kafka.Common.Serialization.Serializer arg2, Org.Apache.Kafka.Common.Serialization.Serializer arg3, Org.Apache.Kafka.Streams.Processor.StreamPartitioner arg4, params string[] arg5) where Arg4objectSuperK : K where Arg4objectSuperV : V
+ {
+ if (arg5.Length == 0) return IExecute("addSink", arg0, arg1, arg2, arg3, arg4); else return IExecute("addSink", arg0, arg1, arg2, arg3, arg4, arg5);
+ }
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ public Org.Apache.Kafka.Streams.Topology AddSink(string arg0, string arg1, Org.Apache.Kafka.Streams.Processor.StreamPartitioner arg2, params string[] arg3) where Arg2objectSuperK : K where Arg2objectSuperV : V
+ {
+ if (arg3.Length == 0) return IExecute("addSink", arg0, arg1, arg2); else return IExecute("addSink", arg0, arg1, arg2, arg3);
+ }
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ public Org.Apache.Kafka.Streams.Topology AddSink(string arg0, Org.Apache.Kafka.Streams.Processor.TopicNameExtractor arg1, params string[] arg2)
+ {
+ if (arg2.Length == 0) return IExecute("addSink", arg0, arg1); else return IExecute("addSink", arg0, arg1, arg2);
+ }
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ public Org.Apache.Kafka.Streams.Topology AddSink(string arg0, Org.Apache.Kafka.Streams.Processor.TopicNameExtractor arg1, Org.Apache.Kafka.Common.Serialization.Serializer arg2, Org.Apache.Kafka.Common.Serialization.Serializer arg3, params string[] arg4)
+ {
+ if (arg4.Length == 0) return IExecute("addSink", arg0, arg1, arg2, arg3); else return IExecute("addSink", arg0, arg1, arg2, arg3, arg4);
+ }
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ public Org.Apache.Kafka.Streams.Topology AddSink(string arg0, Org.Apache.Kafka.Streams.Processor.TopicNameExtractor arg1, Org.Apache.Kafka.Common.Serialization.Serializer arg2, Org.Apache.Kafka.Common.Serialization.Serializer arg3, Org.Apache.Kafka.Streams.Processor.StreamPartitioner arg4, params string[] arg5) where Arg4objectSuperK : K where Arg4objectSuperV : V
+ {
+ if (arg5.Length == 0) return IExecute("addSink", arg0, arg1, arg2, arg3, arg4); else return IExecute("addSink", arg0, arg1, arg2, arg3, arg4, arg5);
+ }
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ public Org.Apache.Kafka.Streams.Topology AddSink(string arg0, Org.Apache.Kafka.Streams.Processor.TopicNameExtractor arg1, Org.Apache.Kafka.Streams.Processor.StreamPartitioner arg2, params string[] arg3) where Arg2objectSuperK : K where Arg2objectSuperV : V
+ {
+ if (arg3.Length == 0) return IExecute("addSink", arg0, arg1, arg2); else return IExecute("addSink", arg0, arg1, arg2, arg3);
+ }
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ public Org.Apache.Kafka.Streams.Topology AddSink(string arg0, string arg1, params string[] arg2)
+ {
+ if (arg2.Length == 0) return IExecute("addSink", arg0, arg1); else return IExecute("addSink", arg0, arg1, arg2);
+ }
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ public Org.Apache.Kafka.Streams.Topology AddSource(string arg0, params string[] arg1)
+ {
+ if (arg1.Length == 0) return IExecute("addSource", arg0); else return IExecute("addSource", arg0, arg1);
+ }
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ public Org.Apache.Kafka.Streams.Topology AddSource(string arg0, Java.Util.Regex.Pattern arg1)
+ {
+ return IExecute("addSource", arg0, arg1);
+ }
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ public Org.Apache.Kafka.Streams.Topology AddSource(string arg0, Org.Apache.Kafka.Common.Serialization.Deserializer