diff --git a/src/config/connect-knet-sink.properties b/src/config/connect-knet-sink.properties
new file mode 100644
index 0000000000..22ff9b967f
--- /dev/null
+++ b/src/config/connect-knet-sink.properties
@@ -0,0 +1,20 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+name=local-knet-sink
+connector.class=KNetSinkConnector
+tasks.max=1
+knet.dotnet.classname=MASES.KNetTemplate.KNetConnect.KNetConnectSink, knetConnectSink
+topics=topic-perf
\ No newline at end of file
diff --git a/src/config/connect-knet-source.properties b/src/config/connect-knet-source.properties
new file mode 100644
index 0000000000..47cc28b0fe
--- /dev/null
+++ b/src/config/connect-knet-source.properties
@@ -0,0 +1,20 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+name=local-knet-source
+connector.class=KNetSourceConnector
+tasks.max=1
+knet.dotnet.classname=MASES.KNetTemplate.KNetConnect.KNetConnectSource, knetConnectSource
+topic=topic-perf
\ No newline at end of file
diff --git a/src/java/knet/pom.xml b/src/java/knet/pom.xml
index 581aa7dd7a..b73df91a82 100644
--- a/src/java/knet/pom.xml
+++ b/src/java/knet/pom.xml
@@ -9,7 +9,7 @@
mases.knet
Interface bridging implementation for Apache Kafka
https://github.com/masesgroup/KNet
- 1.2.4.0
+ 1.3.0.0
@@ -40,7 +40,7 @@
8
8
${basedir}/classpathfile.classpath
- 3.1.0
+ 3.1.1
@@ -228,6 +228,11 @@
4.13.1
test
+
+ org.slf4j
+ slf4j-api
+ 1.7.36
+
org.apache.kafka
kafka-clients
diff --git a/src/java/knet/src/main/java/org/mases/knet/connect/KNetConnectProxy.java b/src/java/knet/src/main/java/org/mases/knet/connect/KNetConnectProxy.java
new file mode 100644
index 0000000000..775a8cd20e
--- /dev/null
+++ b/src/java/knet/src/main/java/org/mases/knet/connect/KNetConnectProxy.java
@@ -0,0 +1,89 @@
+/*
+ * Copyright 2022 MASES s.r.l.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * Refer to LICENSE for more information.
+ */
+
+package org.mases.knet.connect;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
+import org.mases.jcobridge.*;
+import org.mases.knet.connect.source.KNetSourceTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Map;
+
+public class KNetConnectProxy {
+ private static final Logger log = LoggerFactory.getLogger(KNetSourceTask.class);
+
+ public static final String DOTNET_CLASSNAME_CONFIG = "knet.dotnet.classname";
+
+ public static final ConfigDef CONFIG_DEF = new ConfigDef()
+ .define(DOTNET_CLASSNAME_CONFIG, ConfigDef.Type.STRING, null, ConfigDef.Importance.HIGH, ".NET class name in the form usable from .NET like \"classname, assembly name\".");
+
+ static JCObject knetConnectProxy = null;
+ static String sinkConnectorName = null;
+ static JCObject sinkConnector = null;
+ static String sourceConnectorName = null;
+ static JCObject sourceConnector = null;
+
+ static synchronized JCObject getConnectProxy() throws JCException, IOException {
+ if (knetConnectProxy == null) {
+ JCOBridge.Initialize();
+ knetConnectProxy = (JCObject) JCOBridge.GetCLRGlobal("KNetConnectProxy");
+ }
+ return knetConnectProxy;
+ }
+
+ public static synchronized boolean initializeSinkConnector(Map props) throws JCException, IOException {
+ AbstractConfig parsedConfig = new AbstractConfig(CONFIG_DEF, props);
+ String className = parsedConfig.getString(DOTNET_CLASSNAME_CONFIG);
+ if (className == null)
+ throw new ConfigException("'classname' in KNetSinkConnector configuration requires a definition");
+ return (boolean) getConnectProxy().Invoke("AllocateSinkConnector", className);
+ }
+
+ public static synchronized boolean initializeSourceConnector(Map props) throws JCException, IOException {
+ AbstractConfig parsedConfig = new AbstractConfig(CONFIG_DEF, props);
+ String className = parsedConfig.getString(DOTNET_CLASSNAME_CONFIG);
+ if (className == null)
+ throw new ConfigException("'classname' in KNetSinkConnector configuration requires a definition");
+ return (boolean) getConnectProxy().Invoke("AllocateSourceConnector", className);
+ }
+
+ public static synchronized JCObject getSinkConnector() throws JCException, IOException {
+ if (sinkConnector == null) {
+ sinkConnectorName = (String) getConnectProxy().Invoke("SinkConnectorName");
+ if (sinkConnectorName != null) {
+ sinkConnector = (JCObject) JCOBridge.GetCLRGlobal(sinkConnectorName);
+ }
+ }
+ return sinkConnector;
+ }
+
+ public static synchronized JCObject getSourceConnector() throws JCException, IOException {
+ if (sourceConnector == null) {
+ sourceConnectorName = (String) getConnectProxy().Invoke("SourceConnectorName");
+ if (sourceConnectorName != null) {
+ sourceConnector = (JCObject) JCOBridge.GetCLRGlobal(sourceConnectorName);
+ }
+ }
+ return sourceConnector;
+ }
+}
diff --git a/src/java/knet/src/main/java/org/mases/knet/connect/sink/KNetSinkConnector.java b/src/java/knet/src/main/java/org/mases/knet/connect/sink/KNetSinkConnector.java
new file mode 100644
index 0000000000..90666a44ee
--- /dev/null
+++ b/src/java/knet/src/main/java/org/mases/knet/connect/sink/KNetSinkConnector.java
@@ -0,0 +1,130 @@
+/*
+ * Copyright 2022 MASES s.r.l.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * Refer to LICENSE for more information.
+ */
+
+package org.mases.knet.connect.sink;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.connect.connector.Task;
+import org.apache.kafka.connect.sink.SinkConnector;
+import org.mases.jcobridge.*;
+import org.mases.knet.connect.KNetConnectProxy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class KNetSinkConnector extends SinkConnector {
+ private static final Logger log = LoggerFactory.getLogger(KNetSinkConnector.class);
+
+ private static final String registrationName = "KNetSinkConnector";
+
+ Object dataToExchange = null;
+
+ public Object getDataToExchange() {
+ return dataToExchange;
+ }
+
+ public void setDataToExchange(Object dte) {
+ dataToExchange = dte;
+ }
+
+ @Override
+ public void start(Map props) {
+ try {
+ if (!KNetConnectProxy.initializeSinkConnector(props)) {
+ log.error("Failed Invoke of \"initializeSinkConnector\"");
+ throw new ConfigException("Failed Invoke of \"initializeSinkConnector\"");
+ } else {
+ JCOBridge.RegisterJVMGlobal(registrationName, this);
+ try {
+ dataToExchange = props;
+ JCObject sink = KNetConnectProxy.getSinkConnector();
+ if (sink == null) throw new ConfigException("getSinkConnector returned null.");
+ sink.Invoke("StartInternal");
+ } finally {
+ dataToExchange = null;
+ }
+ }
+ } catch (JCException | IOException jcne) {
+ log.error("Failed Invoke of \"start\"", jcne);
+ }
+ }
+
+ @Override
+ public Class extends Task> taskClass() {
+ return KNetSinkTask.class;
+ }
+
+ @Override
+ public List