From 0b9e993e3e963e2223b2cf78c1f5a873a1e547f9 Mon Sep 17 00:00:00 2001 From: masesdevelopers <94312179+masesdevelopers@users.noreply.github.com> Date: Tue, 17 May 2022 00:34:10 +0200 Subject: [PATCH 01/13] #65: added some documentation --- .../Connect/KNetConnectProxy.cs | 29 +++++- .../BridgedClasses/Connect/KNetConnector.cs | 92 +++++++++++++------ .../Connect/KNetSinkConnector.cs | 13 ++- .../BridgedClasses/Connect/KNetSinkTask.cs | 15 ++- .../Connect/KNetSourceConnector.cs | 13 ++- .../BridgedClasses/Connect/KNetSourceTask.cs | 15 ++- .../BridgedClasses/Connect/KNetTask.cs | 54 ++++++++--- 7 files changed, 178 insertions(+), 53 deletions(-) diff --git a/src/net/KNet/ClientSide/BridgedClasses/Connect/KNetConnectProxy.cs b/src/net/KNet/ClientSide/BridgedClasses/Connect/KNetConnectProxy.cs index 27213c54b9..9bf07a0dcf 100644 --- a/src/net/KNet/ClientSide/BridgedClasses/Connect/KNetConnectProxy.cs +++ b/src/net/KNet/ClientSide/BridgedClasses/Connect/KNetConnectProxy.cs @@ -20,6 +20,9 @@ namespace MASES.KNet.Connect { + /// + /// Internal class used to proxy and pairs data exchange with Java side + /// public class KNetConnectProxy { static readonly object globalInstanceLock = new(); @@ -27,7 +30,9 @@ public class KNetConnectProxy static KNetConnector SinkConnector = null; static KNetConnector SourceConnector = null; - + /// + /// Register the proxy + /// public static void Register() { lock (globalInstanceLock) @@ -39,7 +44,11 @@ public static void Register() } } } - + /// + /// Allocates a sink connector + /// + /// The class name read from Java within the configuration parameters + /// if successfully public bool AllocateSinkConnector(string connectorClassName) { lock (globalInstanceLock) @@ -57,7 +66,11 @@ public bool AllocateSinkConnector(string connectorClassName) } } } - + /// + /// Allocates a source connector + /// + /// The class name read from Java within the configuration parameters + /// if successfully public bool AllocateSourceConnector(string connectorClassName) { lock (globalInstanceLock) @@ -75,7 +88,10 @@ public bool AllocateSourceConnector(string connectorClassName) } } } - + /// + /// Returns the registration name of the sink connector + /// + /// The content of public string SinkConnectorName() { lock (globalInstanceLock) @@ -84,7 +100,10 @@ public string SinkConnectorName() return null; } } - + /// + /// Returns the registration name of the sourcce connector + /// + /// The content of public string SourceConnectorName() { lock (globalInstanceLock) diff --git a/src/net/KNet/ClientSide/BridgedClasses/Connect/KNetConnector.cs b/src/net/KNet/ClientSide/BridgedClasses/Connect/KNetConnector.cs index b764f8aed5..f83ebbcc14 100644 --- a/src/net/KNet/ClientSide/BridgedClasses/Connect/KNetConnector.cs +++ b/src/net/KNet/ClientSide/BridgedClasses/Connect/KNetConnector.cs @@ -26,36 +26,65 @@ namespace MASES.KNet.Connect { + /// + /// Specific implementation of to support KNet Connect SDK + /// public interface IKNetConnector : IConnector { + /// + /// Allocates a task object based on + /// + /// The unique id generated from JAva side + /// The local .NET object object AllocateTask(long taskId); - + /// + /// The unique name of the connector + /// string ConnectorName { get; } - + /// + /// The of task to be allocated, it shall inherits from + /// Type TaskClassType { get; } + /// + /// Invoked during allocation of tasks from Apache Kafka Connect + /// + /// The actual index + /// The to be filled in with properties for the task: the same will be received from + void TaskConfigs(int index, Map config); } - + /// + /// The generic class which is the base of both source or sink connectors + /// public abstract class KNetConnector : IKNetConnector { - protected ConnectorContext ctx; + /// + /// The set of allocated with their associated identifiers + /// protected ConcurrentDictionary taskDictionary = new(); IJavaObject reflectedConnector = null; - + /// + /// Initializer + /// public KNetConnector() { - KNetCore.GlobalInstance.RegisterCLRGlobal(ReflectedConnectorName, this); + KNetCore.GlobalInstance.RegisterCLRGlobal(ReflectedConnectorClassName, this); } - + /// + /// An helper function to read the data from Java side + /// + /// The expected return + /// The + /// protected T DataToExchange() { if (reflectedConnector == null) { - reflectedConnector = KNetCore.GlobalInstance.GetJVMGlobal(ReflectedConnectorName); + reflectedConnector = KNetCore.GlobalInstance.GetJVMGlobal(ReflectedConnectorClassName); } - return (reflectedConnector != null) ? reflectedConnector.Invoke("getDataToExchange") : throw new InvalidOperationException($"{ReflectedConnectorName} was not registered in global JVM"); + return (reflectedConnector != null) ? reflectedConnector.Invoke("getDataToExchange") : throw new InvalidOperationException($"{ReflectedConnectorClassName} was not registered in global JVM"); } - + /// public object AllocateTask(long taskId) { return taskDictionary.GetOrAdd(taskId, (id) => @@ -65,53 +94,60 @@ public object AllocateTask(long taskId) return knetTask; }); } - - public abstract string ReflectedConnectorName { get; } - + /// + /// The unique name used to map objects between JVM and .NET + /// + public abstract string ReflectedConnectorClassName { get; } + /// public abstract string ConnectorName { get; } - + /// public abstract Type TaskClassType { get; } public void Initialize(ConnectorContext ctx) => throw new NotImplementedException("Invoked in Java before any initialization."); public void Initialize(ConnectorContext ctx, List> taskConfigs) => throw new NotImplementedException("Invoked in Java before any initialization."); - + /// + /// Public method used from Java to trigger + /// public void StartInternal() { Map props = DataToExchange>(); Start(props); } - + /// + /// Implement the method to execute the start action + /// + /// The set of properties returned from Apache Kafka Connect framework: the contains the same info from configuration file. public abstract void Start(Map props); - public void Reconfigure(Map props) { } + public void Reconfigure(Map props) => throw new NotImplementedException("Invoked in Java before any initialization."); public Class TaskClass() => throw new NotImplementedException("Invoked in Java before any initialization."); - + /// + /// Public method used from Java to trigger + /// public void TaskConfigsInternal(int index) { Map props = DataToExchange>(); TaskConfigs(index, props); } - + /// public abstract void TaskConfigs(int index, Map config); public List> TaskConfigs(int maxTasks) => throw new NotImplementedException("Invoked using the other signature."); - + /// + /// Public method used from Java to trigger + /// public void StopInternal() { Stop(); } - + /// + /// Implement the method to execute the stop action + /// public abstract void Stop(); - public object ValidateInternal(object connectorConfigsObj) - { - Map connectorConfigs = DataToExchange>(); - return Validate(connectorConfigs); - } - - public abstract Config Validate(Map connectorConfigs); + public Config Validate(Map connectorConfigs) => throw new NotImplementedException("Invoked in Java before any initialization."); public ConfigDef Config() => throw new NotImplementedException("Invoked in Java before any initialization."); diff --git a/src/net/KNet/ClientSide/BridgedClasses/Connect/KNetSinkConnector.cs b/src/net/KNet/ClientSide/BridgedClasses/Connect/KNetSinkConnector.cs index 90c0eaf072..4170b533d7 100644 --- a/src/net/KNet/ClientSide/BridgedClasses/Connect/KNetSinkConnector.cs +++ b/src/net/KNet/ClientSide/BridgedClasses/Connect/KNetSinkConnector.cs @@ -20,10 +20,19 @@ namespace MASES.KNet.Connect { + /// + /// An implementation of for sink connectors + /// + /// The task class inherited from public abstract class KNetSinkConnector : KNetConnector where TTask : KNetSinkTask { - public sealed override string ReflectedConnectorName => "KNetSinkConnector"; - + /// + /// Set the of the connector to a fixed value + /// + public sealed override string ReflectedConnectorClassName => "KNetSinkConnector"; + /// + /// Set the of the connector to the value defined from + /// public sealed override Type TaskClassType => typeof(TTask); } } diff --git a/src/net/KNet/ClientSide/BridgedClasses/Connect/KNetSinkTask.cs b/src/net/KNet/ClientSide/BridgedClasses/Connect/KNetSinkTask.cs index c9b82f0024..aa84fb7a80 100644 --- a/src/net/KNet/ClientSide/BridgedClasses/Connect/KNetSinkTask.cs +++ b/src/net/KNet/ClientSide/BridgedClasses/Connect/KNetSinkTask.cs @@ -21,16 +21,27 @@ namespace MASES.KNet.Connect { + /// + /// An implementation of for sink task + /// public abstract class KNetSinkTask : KNetTask { + /// + /// Set the of the connector to a fixed value + /// public override string ReflectedTaskClassName => "KNetSinkTask"; - + /// + /// Public method used from Java to trigger + /// public void PutInternal() { Collection collection = DataToExchange>(); Put(collection); } - + /// + /// Implement the method to execute the Put action + /// + /// The set of from Apache Kafka Connect framework public abstract void Put(Collection collection); } } diff --git a/src/net/KNet/ClientSide/BridgedClasses/Connect/KNetSourceConnector.cs b/src/net/KNet/ClientSide/BridgedClasses/Connect/KNetSourceConnector.cs index 028358b882..ecfb4d6dd0 100644 --- a/src/net/KNet/ClientSide/BridgedClasses/Connect/KNetSourceConnector.cs +++ b/src/net/KNet/ClientSide/BridgedClasses/Connect/KNetSourceConnector.cs @@ -20,10 +20,19 @@ namespace MASES.KNet.Connect { + /// + /// An implementation of for source connectors + /// + /// The task class inherited from public abstract class KNetSourceConnector : KNetConnector where TTask : KNetSourceTask { - public sealed override string ReflectedConnectorName => "KNetSourceConnector"; - + /// + /// Set the of the connector to a fixed value + /// + public sealed override string ReflectedConnectorClassName => "KNetSourceConnector"; + /// + /// Set the of the connector to the value defined from + /// public sealed override Type TaskClassType => typeof(TTask); } } diff --git a/src/net/KNet/ClientSide/BridgedClasses/Connect/KNetSourceTask.cs b/src/net/KNet/ClientSide/BridgedClasses/Connect/KNetSourceTask.cs index 1149505862..0f62934237 100644 --- a/src/net/KNet/ClientSide/BridgedClasses/Connect/KNetSourceTask.cs +++ b/src/net/KNet/ClientSide/BridgedClasses/Connect/KNetSourceTask.cs @@ -21,15 +21,26 @@ namespace MASES.KNet.Connect { + /// + /// An implementation of for source task + /// public abstract class KNetSourceTask : KNetTask { + /// + /// Set the of the connector to a fixed value + /// public override string ReflectedTaskClassName => "KNetSourceTask"; - + /// + /// Public method used from Java to trigger + /// public List PollInternal() { return Poll(); } - + /// + /// Implement the method to execute the Poll action + /// + /// The list of to return to Apache Kafka Connect framework public abstract List Poll(); } } diff --git a/src/net/KNet/ClientSide/BridgedClasses/Connect/KNetTask.cs b/src/net/KNet/ClientSide/BridgedClasses/Connect/KNetTask.cs index a387a287c1..eed1a3cdb8 100644 --- a/src/net/KNet/ClientSide/BridgedClasses/Connect/KNetTask.cs +++ b/src/net/KNet/ClientSide/BridgedClasses/Connect/KNetTask.cs @@ -23,13 +23,23 @@ namespace MASES.KNet.Connect { + /// + /// Specific implementation of to support KNet Connect SDK + /// public interface IKNetTask : ITask { + /// + /// The associated + /// IKNetConnector Connector { get; } - + /// + /// The id received during initialization + /// long TaskId { get; } } - + /// + /// The generic class which is the base of both source or sink task + /// public abstract class KNetTask : IKNetTask { IKNetConnector connector; @@ -42,38 +52,58 @@ internal void Initialize(IKNetConnector connector, long taskId) this.taskId = taskId; reflectedTask = KNetCore.GlobalInstance.GetJVMGlobal($"{ReflectedTaskClassName}_{taskId}"); } - + /// + /// An helper function to read the data from Java side + /// + /// The expected return + /// The + /// protected T DataToExchange() { return (reflectedTask != null) ? reflectedTask.Invoke("getDataToExchange") : throw new InvalidOperationException($"{ReflectedTaskClassName} was not registered in global JVM"); } - + /// public IKNetConnector Connector => connector; - + /// public long TaskId => taskId; - + /// + /// The unique name used to map objects between JVM and .NET + /// public abstract string ReflectedTaskClassName { get; } - + /// + /// Public method used from Java to trigger + /// public void StartInternal() { Map props = DataToExchange>(); Start(props); } - + /// + /// Implement the method to execute the start action + /// + /// The set of properties returned from Apache Kafka Connect framework: the contains the info from . public abstract void Start(Map props); - + /// + /// Public method used from Java to trigger + /// public void StopInternal() { Stop(); } - + /// + /// Implement the method to execute the stop action + /// public abstract void Stop(); - + /// + /// Public method used from Java to trigger + /// public object VersionInternal() { return Version(); } - + /// + /// Implement the method to execute the version action + /// public abstract string Version(); } } From 8ed53217156311bb825872dccdf4b81a013219b3 Mon Sep 17 00:00:00 2001 From: masesdevelopers <94312179+masesdevelopers@users.noreply.github.com> Date: Tue, 17 May 2022 00:43:48 +0200 Subject: [PATCH 02/13] Minor alignment --- .../templates/knetConnectSink/KNetConnectSink.cs | 6 ------ .../templates/knetConnectSource/KNetConnectSource.cs | 6 ------ .../knetConsumerApp/.template.config/template.json | 2 +- .../knetPipeStreamApp/.template.config/template.json | 2 +- .../knetProducerApp/.template.config/template.json | 2 +- tests/KNetConnectTest/KnetConnectSink.cs | 12 ++++-------- tests/KNetConnectTest/KnetConnectSource.cs | 11 +++-------- 7 files changed, 10 insertions(+), 31 deletions(-) diff --git a/src/net/templates/templates/knetConnectSink/KNetConnectSink.cs b/src/net/templates/templates/knetConnectSink/KNetConnectSink.cs index de5b364c5b..6bd346d1ef 100644 --- a/src/net/templates/templates/knetConnectSink/KNetConnectSink.cs +++ b/src/net/templates/templates/knetConnectSink/KNetConnectSink.cs @@ -1,5 +1,4 @@ using Java.Util; -using MASES.KNet.Common.Config; using MASES.KNet.Connect; using MASES.KNet.Connect.Sink; @@ -23,11 +22,6 @@ public override void TaskConfigs(int index, Map config) { } - - public override Config Validate(Map connectorConfigs) - { - return null; - } } public class KnetConnectSinkTask : KNetSinkTask diff --git a/src/net/templates/templates/knetConnectSource/KNetConnectSource.cs b/src/net/templates/templates/knetConnectSource/KNetConnectSource.cs index 450f94c4a6..d80c634b66 100644 --- a/src/net/templates/templates/knetConnectSource/KNetConnectSource.cs +++ b/src/net/templates/templates/knetConnectSource/KNetConnectSource.cs @@ -1,5 +1,4 @@ using Java.Util; -using MASES.KNet.Common.Config; using MASES.KNet.Connect; using MASES.KNet.Connect.Source; @@ -23,11 +22,6 @@ public override void TaskConfigs(int index, Map config) { } - - public override Config Validate(Map connectorConfigs) - { - return null; - } } public class KnetConnectSourceTask : KNetSourceTask diff --git a/src/net/templates/templates/knetConsumerApp/.template.config/template.json b/src/net/templates/templates/knetConsumerApp/.template.config/template.json index c958f01f35..b11533fe5b 100644 --- a/src/net/templates/templates/knetConsumerApp/.template.config/template.json +++ b/src/net/templates/templates/knetConsumerApp/.template.config/template.json @@ -1,7 +1,7 @@ { "$schema": "http://json.schemastore.org/template", "author": "MASES s.r.l.", - "classifications": [ "Common", "Console", "C#8" ], + "classifications": [ "Common", "Console" ], "identity": "MASES.KNetTemplate.KNetConsumer", "name": "Console templates: KNet Consumer project", "shortName": "knetConsumerApp", diff --git a/src/net/templates/templates/knetPipeStreamApp/.template.config/template.json b/src/net/templates/templates/knetPipeStreamApp/.template.config/template.json index 2fb158666d..27c0467d61 100644 --- a/src/net/templates/templates/knetPipeStreamApp/.template.config/template.json +++ b/src/net/templates/templates/knetPipeStreamApp/.template.config/template.json @@ -1,7 +1,7 @@ { "$schema": "http://json.schemastore.org/template", "author": "MASES s.r.l.", - "classifications": [ "Common", "Console", "C#8" ], + "classifications": [ "Common", "Console" ], "identity": "MASES.KNetTemplate.KNetPipeStream", "name": "Console templates: KNet Pipe Stream project", "shortName": "knetPipeStreamApp", diff --git a/src/net/templates/templates/knetProducerApp/.template.config/template.json b/src/net/templates/templates/knetProducerApp/.template.config/template.json index 3342f5e785..d0ecf3aaae 100644 --- a/src/net/templates/templates/knetProducerApp/.template.config/template.json +++ b/src/net/templates/templates/knetProducerApp/.template.config/template.json @@ -1,7 +1,7 @@ { "$schema": "http://json.schemastore.org/template", "author": "MASES s.r.l.", - "classifications": [ "Common", "Console", "C#8" ], + "classifications": [ "Common", "Console" ], "identity": "MASES.KNetTemplate.KNetProducer", "name": "Console templates: KNet Producer project", "shortName": "knetProducerApp", diff --git a/tests/KNetConnectTest/KnetConnectSink.cs b/tests/KNetConnectTest/KnetConnectSink.cs index 014c46c69b..0e62bc6b02 100644 --- a/tests/KNetConnectTest/KnetConnectSink.cs +++ b/tests/KNetConnectTest/KnetConnectSink.cs @@ -23,9 +23,9 @@ namespace MASES.KNetConnectTest { - public class KnetSinkTestConnector : KNetSinkConnector + public class KNetSinkTestConnector : KNetSinkConnector { - public override string ConnectorName => "MASES.KNetConnectTest.KnetSinkTestConnector"; + public override string ConnectorName => "MASES.KNetConnectTest.KNetSinkTestConnector"; public override void Start(Map props) { @@ -42,13 +42,9 @@ public override void TaskConfigs(int index, Map config) } - public override Config Validate(Map connectorConfigs) - { - return null; - } } - public class KnetSinkTestTask : KNetSinkTask + public class KNetSinkTestTask : KNetSinkTask { public override void Put(Collection collection) { @@ -67,7 +63,7 @@ public override void Stop() public override string Version() { - return "KnetSinkTestTask"; + return "KNetSinkTestTask"; } } } diff --git a/tests/KNetConnectTest/KnetConnectSource.cs b/tests/KNetConnectTest/KnetConnectSource.cs index 4b98c3699d..46bdaca7b8 100644 --- a/tests/KNetConnectTest/KnetConnectSource.cs +++ b/tests/KNetConnectTest/KnetConnectSource.cs @@ -23,9 +23,9 @@ namespace MASES.KNetConnectTest { - public class KnetSourceTestConnector : KNetSourceConnector + public class KNetSourceTestConnector : KNetSourceConnector { - public override string ConnectorName => "MASES.KNetConnectTest.KnetSourceTestConnector"; + public override string ConnectorName => "MASES.KNetConnectTest.KNetSourceTestConnector"; public override void Start(Map props) { @@ -41,14 +41,9 @@ public override void TaskConfigs(int index, Map config) { } - - public override Config Validate(Map connectorConfigs) - { - return null; - } } - public class KnetSourceTestTask : KNetSourceTask + public class KNetSourceTestTask : KNetSourceTask { public override List Poll() { From 047a37cad460460579ea875606d8fc746524d317 Mon Sep 17 00:00:00 2001 From: masesdevelopers <94312179+masesdevelopers@users.noreply.github.com> Date: Tue, 17 May 2022 01:25:55 +0200 Subject: [PATCH 03/13] #65: Added various contexts usable within SDK --- .../mases/knet/connect/sink/KNetSinkConnector.java | 5 +++++ .../org/mases/knet/connect/sink/KNetSinkTask.java | 4 ++++ .../knet/connect/source/KNetSourceConnector.java | 5 +++++ .../mases/knet/connect/source/KNetSourceTask.java | 5 +++++ .../BridgedClasses/Connect/KNetConnector.cs | 14 ++++++++++++++ .../BridgedClasses/Connect/KNetSinkConnector.cs | 6 ++++++ .../BridgedClasses/Connect/KNetSinkTask.cs | 4 ++++ .../BridgedClasses/Connect/KNetSourceConnector.cs | 5 +++++ .../BridgedClasses/Connect/KNetSourceTask.cs | 4 ++++ .../ClientSide/BridgedClasses/Connect/KNetTask.cs | 10 ++++++++++ 10 files changed, 62 insertions(+) diff --git a/src/java/knet/src/main/java/org/mases/knet/connect/sink/KNetSinkConnector.java b/src/java/knet/src/main/java/org/mases/knet/connect/sink/KNetSinkConnector.java index 90666a44ee..10e3b81700 100644 --- a/src/java/knet/src/main/java/org/mases/knet/connect/sink/KNetSinkConnector.java +++ b/src/java/knet/src/main/java/org/mases/knet/connect/sink/KNetSinkConnector.java @@ -22,6 +22,7 @@ import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.connect.connector.Task; import org.apache.kafka.connect.sink.SinkConnector; +import org.apache.kafka.connect.sink.SinkConnectorContext; import org.mases.jcobridge.*; import org.mases.knet.connect.KNetConnectProxy; import org.slf4j.Logger; @@ -48,6 +49,10 @@ public void setDataToExchange(Object dte) { dataToExchange = dte; } + public SinkConnectorContext getContext() { + return context(); + } + @Override public void start(Map props) { try { diff --git a/src/java/knet/src/main/java/org/mases/knet/connect/sink/KNetSinkTask.java b/src/java/knet/src/main/java/org/mases/knet/connect/sink/KNetSinkTask.java index 7757237b8f..992defd65b 100644 --- a/src/java/knet/src/main/java/org/mases/knet/connect/sink/KNetSinkTask.java +++ b/src/java/knet/src/main/java/org/mases/knet/connect/sink/KNetSinkTask.java @@ -57,6 +57,10 @@ public KNetSinkTask() throws ConfigException, JCException, IOException { sinkTask = (JCObject) sink.Invoke("AllocateTask", taskid); } + public SinkTaskContext getContext() { + return context; + } + @Override public String version() { try { diff --git a/src/java/knet/src/main/java/org/mases/knet/connect/source/KNetSourceConnector.java b/src/java/knet/src/main/java/org/mases/knet/connect/source/KNetSourceConnector.java index 669c033c6f..00cd3b00e8 100644 --- a/src/java/knet/src/main/java/org/mases/knet/connect/source/KNetSourceConnector.java +++ b/src/java/knet/src/main/java/org/mases/knet/connect/source/KNetSourceConnector.java @@ -22,6 +22,7 @@ import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.connect.connector.Task; import org.apache.kafka.connect.source.SourceConnector; +import org.apache.kafka.connect.source.SourceConnectorContext; import org.mases.jcobridge.*; import org.mases.knet.connect.KNetConnectProxy; import org.slf4j.Logger; @@ -48,6 +49,10 @@ public void setDataToExchange(Object dte) { dataToExchange = dte; } + public SourceConnectorContext getContext() { + return context(); + } + @Override public void start(Map props) { try { diff --git a/src/java/knet/src/main/java/org/mases/knet/connect/source/KNetSourceTask.java b/src/java/knet/src/main/java/org/mases/knet/connect/source/KNetSourceTask.java index b57bfd60f4..fb6aa56a54 100644 --- a/src/java/knet/src/main/java/org/mases/knet/connect/source/KNetSourceTask.java +++ b/src/java/knet/src/main/java/org/mases/knet/connect/source/KNetSourceTask.java @@ -21,6 +21,7 @@ import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.connect.source.SourceRecord; import org.apache.kafka.connect.source.SourceTask; +import org.apache.kafka.connect.source.SourceTaskContext; import org.mases.jcobridge.*; import org.mases.knet.connect.KNetConnectProxy; import org.slf4j.Logger; @@ -56,6 +57,10 @@ public KNetSourceTask() throws ConfigException, JCException, IOException { sourceTask = (JCObject) source.Invoke("AllocateTask", taskid); } + public SourceTaskContext getContext() { + return context; + } + @Override public String version() { try { diff --git a/src/net/KNet/ClientSide/BridgedClasses/Connect/KNetConnector.cs b/src/net/KNet/ClientSide/BridgedClasses/Connect/KNetConnector.cs index f83ebbcc14..acfb359cd2 100644 --- a/src/net/KNet/ClientSide/BridgedClasses/Connect/KNetConnector.cs +++ b/src/net/KNet/ClientSide/BridgedClasses/Connect/KNetConnector.cs @@ -84,6 +84,20 @@ protected T DataToExchange() } return (reflectedConnector != null) ? reflectedConnector.Invoke("getDataToExchange") : throw new InvalidOperationException($"{ReflectedConnectorClassName} was not registered in global JVM"); } + /// + /// An helper function to read the data from Java side + /// + /// The expected return + /// The + /// + protected T Context() + { + if (reflectedConnector == null) + { + reflectedConnector = KNetCore.GlobalInstance.GetJVMGlobal(ReflectedConnectorClassName); + } + return (reflectedConnector != null) ? reflectedConnector.Invoke("getContext") : throw new InvalidOperationException($"{ReflectedConnectorClassName} was not registered in global JVM"); + } /// public object AllocateTask(long taskId) { diff --git a/src/net/KNet/ClientSide/BridgedClasses/Connect/KNetSinkConnector.cs b/src/net/KNet/ClientSide/BridgedClasses/Connect/KNetSinkConnector.cs index 4170b533d7..1f87e501e4 100644 --- a/src/net/KNet/ClientSide/BridgedClasses/Connect/KNetSinkConnector.cs +++ b/src/net/KNet/ClientSide/BridgedClasses/Connect/KNetSinkConnector.cs @@ -16,6 +16,8 @@ * Refer to LICENSE for more information. */ +using MASES.KNet.Connect.Sink; +using MASES.KNet.Connect.Source; using System; namespace MASES.KNet.Connect @@ -26,6 +28,10 @@ namespace MASES.KNet.Connect /// The task class inherited from public abstract class KNetSinkConnector : KNetConnector where TTask : KNetSinkTask { + /// + /// The + /// + public SinkConnectorContext Context => Context(); /// /// Set the of the connector to a fixed value /// diff --git a/src/net/KNet/ClientSide/BridgedClasses/Connect/KNetSinkTask.cs b/src/net/KNet/ClientSide/BridgedClasses/Connect/KNetSinkTask.cs index aa84fb7a80..4c2b9f75d0 100644 --- a/src/net/KNet/ClientSide/BridgedClasses/Connect/KNetSinkTask.cs +++ b/src/net/KNet/ClientSide/BridgedClasses/Connect/KNetSinkTask.cs @@ -26,6 +26,10 @@ namespace MASES.KNet.Connect /// public abstract class KNetSinkTask : KNetTask { + /// + /// The + /// + public SinkTaskContext Context => Context(); /// /// Set the of the connector to a fixed value /// diff --git a/src/net/KNet/ClientSide/BridgedClasses/Connect/KNetSourceConnector.cs b/src/net/KNet/ClientSide/BridgedClasses/Connect/KNetSourceConnector.cs index ecfb4d6dd0..bc013b6c60 100644 --- a/src/net/KNet/ClientSide/BridgedClasses/Connect/KNetSourceConnector.cs +++ b/src/net/KNet/ClientSide/BridgedClasses/Connect/KNetSourceConnector.cs @@ -16,6 +16,7 @@ * Refer to LICENSE for more information. */ +using MASES.KNet.Connect.Source; using System; namespace MASES.KNet.Connect @@ -26,6 +27,10 @@ namespace MASES.KNet.Connect /// The task class inherited from public abstract class KNetSourceConnector : KNetConnector where TTask : KNetSourceTask { + /// + /// The + /// + public SourceConnectorContext Context => Context(); /// /// Set the of the connector to a fixed value /// diff --git a/src/net/KNet/ClientSide/BridgedClasses/Connect/KNetSourceTask.cs b/src/net/KNet/ClientSide/BridgedClasses/Connect/KNetSourceTask.cs index 0f62934237..c0d80aa7aa 100644 --- a/src/net/KNet/ClientSide/BridgedClasses/Connect/KNetSourceTask.cs +++ b/src/net/KNet/ClientSide/BridgedClasses/Connect/KNetSourceTask.cs @@ -26,6 +26,10 @@ namespace MASES.KNet.Connect /// public abstract class KNetSourceTask : KNetTask { + /// + /// The + /// + public SourceTaskContext Context => Context(); /// /// Set the of the connector to a fixed value /// diff --git a/src/net/KNet/ClientSide/BridgedClasses/Connect/KNetTask.cs b/src/net/KNet/ClientSide/BridgedClasses/Connect/KNetTask.cs index eed1a3cdb8..eee69bea19 100644 --- a/src/net/KNet/ClientSide/BridgedClasses/Connect/KNetTask.cs +++ b/src/net/KNet/ClientSide/BridgedClasses/Connect/KNetTask.cs @@ -62,6 +62,16 @@ protected T DataToExchange() { return (reflectedTask != null) ? reflectedTask.Invoke("getDataToExchange") : throw new InvalidOperationException($"{ReflectedTaskClassName} was not registered in global JVM"); } + /// + /// An helper function to read the data from Java side + /// + /// The expected return + /// The + /// + protected T Context() + { + return (reflectedTask != null) ? reflectedTask.Invoke("getContext") : throw new InvalidOperationException($"{ReflectedTaskClassName} was not registered in global JVM"); + } /// public IKNetConnector Connector => connector; /// From 91f2436f360ff4f58ac4c9e74a34a82dbde9482a Mon Sep 17 00:00:00 2001 From: masesdevelopers <94312179+masesdevelopers@users.noreply.github.com> Date: Tue, 17 May 2022 02:22:34 +0200 Subject: [PATCH 04/13] #65: update data exchange --- .../knet/connect/source/KNetSourceTask.java | 9 +++++++-- .../BridgedClasses/Connect/KNetConnector.cs | 18 ++++++++++++++++++ .../BridgedClasses/Connect/KNetSourceTask.cs | 5 +++-- .../BridgedClasses/Connect/KNetTask.cs | 18 ++++++++++++++++++ 4 files changed, 46 insertions(+), 4 deletions(-) diff --git a/src/java/knet/src/main/java/org/mases/knet/connect/source/KNetSourceTask.java b/src/java/knet/src/main/java/org/mases/knet/connect/source/KNetSourceTask.java index fb6aa56a54..3055c45f85 100644 --- a/src/java/knet/src/main/java/org/mases/knet/connect/source/KNetSourceTask.java +++ b/src/java/knet/src/main/java/org/mases/knet/connect/source/KNetSourceTask.java @@ -90,8 +90,13 @@ public void start(Map map) { @Override public List poll() throws InterruptedException { try { - Object result = sourceTask.Invoke("PollInternal"); - if (result != null) return (List) result; + try { + dataToExchange = null; + sourceTask.Invoke("PollInternal"); + if (dataToExchange != null) return (List) dataToExchange; + } finally { + dataToExchange = null; + } } catch (JCNativeException jcne) { log.error("Failed Invoke of \"poll\"", jcne); } diff --git a/src/net/KNet/ClientSide/BridgedClasses/Connect/KNetConnector.cs b/src/net/KNet/ClientSide/BridgedClasses/Connect/KNetConnector.cs index acfb359cd2..34e643076b 100644 --- a/src/net/KNet/ClientSide/BridgedClasses/Connect/KNetConnector.cs +++ b/src/net/KNet/ClientSide/BridgedClasses/Connect/KNetConnector.cs @@ -90,6 +90,24 @@ protected T DataToExchange() /// The expected return /// The /// + protected void DataToExchange(object data) + { + if (reflectedConnector != null) + { + JCOBridge.C2JBridge.IJVMBridgeBase jvmBBD = data as JCOBridge.C2JBridge.IJVMBridgeBase; + reflectedConnector.Invoke("setDataToExchange", jvmBBD != null ? jvmBBD.Instance : data); + } + else + { + throw new InvalidOperationException($"{ReflectedConnectorClassName} was not registered in global JVM"); + } + } + /// + /// An helper function to read the data from Java side + /// + /// The expected return + /// The + /// protected T Context() { if (reflectedConnector == null) diff --git a/src/net/KNet/ClientSide/BridgedClasses/Connect/KNetSourceTask.cs b/src/net/KNet/ClientSide/BridgedClasses/Connect/KNetSourceTask.cs index c0d80aa7aa..ecbbb6d305 100644 --- a/src/net/KNet/ClientSide/BridgedClasses/Connect/KNetSourceTask.cs +++ b/src/net/KNet/ClientSide/BridgedClasses/Connect/KNetSourceTask.cs @@ -37,9 +37,10 @@ public abstract class KNetSourceTask : KNetTask /// /// Public method used from Java to trigger /// - public List PollInternal() + public void PollInternal() { - return Poll(); + var result = Poll(); + DataToExchange(result); } /// /// Implement the method to execute the Poll action diff --git a/src/net/KNet/ClientSide/BridgedClasses/Connect/KNetTask.cs b/src/net/KNet/ClientSide/BridgedClasses/Connect/KNetTask.cs index eee69bea19..6693e270ac 100644 --- a/src/net/KNet/ClientSide/BridgedClasses/Connect/KNetTask.cs +++ b/src/net/KNet/ClientSide/BridgedClasses/Connect/KNetTask.cs @@ -68,6 +68,24 @@ protected T DataToExchange() /// The expected return /// The /// + protected void DataToExchange(object data) + { + if (reflectedTask != null) + { + JCOBridge.C2JBridge.IJVMBridgeBase jvmBBD = data as JCOBridge.C2JBridge.IJVMBridgeBase; + reflectedTask.Invoke("setDataToExchange", jvmBBD != null ? jvmBBD.Instance : data); + } + else + { + throw new InvalidOperationException($"{ReflectedTaskClassName} was not registered in global JVM"); + } + } + /// + /// An helper function to read the data from Java side + /// + /// The expected return + /// The + /// protected T Context() { return (reflectedTask != null) ? reflectedTask.Invoke("getContext") : throw new InvalidOperationException($"{ReflectedTaskClassName} was not registered in global JVM"); From 13660bd2090cbbdc98cfbab9f3992f92a90f73f4 Mon Sep 17 00:00:00 2001 From: masesdevelopers <94312179+masesdevelopers@users.noreply.github.com> Date: Tue, 17 May 2022 02:23:12 +0200 Subject: [PATCH 05/13] Minor fix and updates --- .../mases/knet/connect/KNetConnectProxy.java | 4 +-- .../BridgedClasses/Connect/Data/Schema.cs | 25 +++++++++---------- tests/KNetConnectTest/KnetConnectSink.cs | 1 - tests/KNetConnectTest/KnetConnectSource.cs | 4 ++- 4 files changed, 17 insertions(+), 17 deletions(-) diff --git a/src/java/knet/src/main/java/org/mases/knet/connect/KNetConnectProxy.java b/src/java/knet/src/main/java/org/mases/knet/connect/KNetConnectProxy.java index 775a8cd20e..e233c1c1df 100644 --- a/src/java/knet/src/main/java/org/mases/knet/connect/KNetConnectProxy.java +++ b/src/java/knet/src/main/java/org/mases/knet/connect/KNetConnectProxy.java @@ -55,7 +55,7 @@ public static synchronized boolean initializeSinkConnector(Map p AbstractConfig parsedConfig = new AbstractConfig(CONFIG_DEF, props); String className = parsedConfig.getString(DOTNET_CLASSNAME_CONFIG); if (className == null) - throw new ConfigException("'classname' in KNetSinkConnector configuration requires a definition"); + throw new ConfigException("'knet.dotnet.classname' in KNetSinkConnector configuration requires a definition"); return (boolean) getConnectProxy().Invoke("AllocateSinkConnector", className); } @@ -63,7 +63,7 @@ public static synchronized boolean initializeSourceConnector(Map AbstractConfig parsedConfig = new AbstractConfig(CONFIG_DEF, props); String className = parsedConfig.getString(DOTNET_CLASSNAME_CONFIG); if (className == null) - throw new ConfigException("'classname' in KNetSinkConnector configuration requires a definition"); + throw new ConfigException("'knet.dotnet.classname' in KNetSinkConnector configuration requires a definition"); return (boolean) getConnectProxy().Invoke("AllocateSourceConnector", className); } diff --git a/src/net/KNet/ClientSide/BridgedClasses/Connect/Data/Schema.cs b/src/net/KNet/ClientSide/BridgedClasses/Connect/Data/Schema.cs index a60b12d4a9..f7a27eb780 100644 --- a/src/net/KNet/ClientSide/BridgedClasses/Connect/Data/Schema.cs +++ b/src/net/KNet/ClientSide/BridgedClasses/Connect/Data/Schema.cs @@ -18,7 +18,6 @@ using MASES.JCOBridge.C2JBridge; using Java.Util; -using JavaLang = Java.Lang; namespace MASES.KNet.Connect.Data { @@ -88,19 +87,19 @@ public enum Type public interface ISchema : IJVMBridgeBase { - Type Type(); + Type Type { get; } bool IsOptional { get; } - JavaLang.Object DefaultValue(); + Java.Lang.Object DefaultValue { get; } - string Name(); + string Name { get; } - int Version(); + int Version { get; } - string Doc(); + string Doc { get; } - Map Parameters(); + Map Parameters { get; } Schema KeySchema { get; } @@ -128,19 +127,19 @@ public Schema(params object[] args) { } - public Type Type() { return IExecute("type"); } + public Type Type => IExecute("type"); public bool IsOptional => IExecute("isOptional"); - public JavaLang.Object DefaultValue() { return IExecute("defaultValue"); } + public Java.Lang.Object DefaultValue => IExecute("defaultValue"); - public string Name() { return IExecute("name"); } + public string Name => IExecute("name"); - public int Version() { return IExecute("version"); } + public int Version => IExecute("version"); - public string Doc() { return IExecute("doc"); } + public string Doc => IExecute("doc"); - public Map Parameters() { return IExecute>("parameters"); } + public Map Parameters => IExecute>("parameters"); public Schema KeySchema => IExecute("keySchema"); diff --git a/tests/KNetConnectTest/KnetConnectSink.cs b/tests/KNetConnectTest/KnetConnectSink.cs index 0e62bc6b02..489b9f1731 100644 --- a/tests/KNetConnectTest/KnetConnectSink.cs +++ b/tests/KNetConnectTest/KnetConnectSink.cs @@ -41,7 +41,6 @@ public override void TaskConfigs(int index, Map config) { } - } public class KNetSinkTestTask : KNetSinkTask diff --git a/tests/KNetConnectTest/KnetConnectSource.cs b/tests/KNetConnectTest/KnetConnectSource.cs index 46bdaca7b8..f88dac270e 100644 --- a/tests/KNetConnectTest/KnetConnectSource.cs +++ b/tests/KNetConnectTest/KnetConnectSource.cs @@ -47,7 +47,9 @@ public class KNetSourceTestTask : KNetSourceTask { public override List Poll() { - return null; + ArrayList records = new(); + + return records; } public override void Start(Map props) From 2b0b99ade8a0cf7d49177ed6acc5642e7b057521 Mon Sep 17 00:00:00 2001 From: masesdevelopers <94312179+masesdevelopers@users.noreply.github.com> Date: Tue, 17 May 2022 02:44:45 +0200 Subject: [PATCH 06/13] Class fix --- .../BridgedClasses/Connect/Data/Schema.cs | 46 ++++++++++++++----- .../Connect/Data/SchemaAndValue.cs | 5 +- .../Connect/Data/SchemaBuilder.cs | 2 + 3 files changed, 38 insertions(+), 15 deletions(-) diff --git a/src/net/KNet/ClientSide/BridgedClasses/Connect/Data/Schema.cs b/src/net/KNet/ClientSide/BridgedClasses/Connect/Data/Schema.cs index f7a27eb780..8c32a26a95 100644 --- a/src/net/KNet/ClientSide/BridgedClasses/Connect/Data/Schema.cs +++ b/src/net/KNet/ClientSide/BridgedClasses/Connect/Data/Schema.cs @@ -87,19 +87,19 @@ public enum Type public interface ISchema : IJVMBridgeBase { - Type Type { get; } + Type Type(); bool IsOptional { get; } - Java.Lang.Object DefaultValue { get; } + Java.Lang.Object DefaultValue(); - string Name { get; } + string Name(); - int Version { get; } + int Version(); - string Doc { get; } + string Doc(); - Map Parameters { get; } + Map Parameters(); Schema KeySchema { get; } @@ -114,6 +114,8 @@ public interface ISchema : IJVMBridgeBase public class Schema : JVMBridgeBase, ISchema { + public override bool IsInterface => true; + public override string ClassName => "org.apache.kafka.connect.data.Schema"; [System.Obsolete("This is not public in Apache Kafka API")] @@ -127,19 +129,39 @@ public Schema(params object[] args) { } - public Type Type => IExecute("type"); + public static Schema INT8_SCHEMA => SExecute("INT8_SCHEMA"); + public static Schema INT16_SCHEMA => SExecute("INT16_SCHEMA"); + public static Schema INT32_SCHEMA => SExecute("INT32_SCHEMA"); + public static Schema INT64_SCHEMA => SExecute("INT64_SCHEMA"); + public static Schema FLOAT32_SCHEMA => SExecute("FLOAT32_SCHEMA"); + public static Schema FLOAT64_SCHEMA => SExecute("FLOAT64_SCHEMA"); + public static Schema BOOLEAN_SCHEMA => SExecute("BOOLEAN_SCHEMA"); + public static Schema STRING_SCHEMA => SExecute("STRING_SCHEMA"); + public static Schema BYTES_SCHEMA => SExecute("BYTES_SCHEMA"); + + public static Schema OPTIONAL_INT8_SCHEMA => SExecute("OPTIONAL_INT8_SCHEMA"); + public static Schema OPTIONAL_INT16_SCHEMA => SExecute("OPTIONAL_INT16_SCHEMA"); + public static Schema OPTIONAL_INT32_SCHEMA => SExecute("OPTIONAL_INT32_SCHEMA"); + public static Schema OPTIONAL_INT64_SCHEMA => SExecute("OPTIONAL_INT64_SCHEMA"); + public static Schema OPTIONAL_FLOAT32_SCHEMA => SExecute("OPTIONAL_FLOAT32_SCHEMA"); + public static Schema OPTIONAL_FLOAT64_SCHEMA => SExecute("OPTIONAL_FLOAT64_SCHEMA"); + public static Schema OPTIONAL_BOOLEAN_SCHEMA => SExecute("OPTIONAL_BOOLEAN_SCHEMA"); + public static Schema OPTIONAL_STRING_SCHEMA => SExecute("OPTIONAL_STRING_SCHEMA"); + public static Schema OPTIONAL_BYTES_SCHEMA => SExecute("OPTIONAL_BYTES_SCHEMA"); + + public Type Type() => IExecute("type"); public bool IsOptional => IExecute("isOptional"); - public Java.Lang.Object DefaultValue => IExecute("defaultValue"); + public Java.Lang.Object DefaultValue() => IExecute("defaultValue"); - public string Name => IExecute("name"); + public string Name() => IExecute("name"); - public int Version => IExecute("version"); + public int Version() => IExecute("version"); - public string Doc => IExecute("doc"); + public string Doc() => IExecute("doc"); - public Map Parameters => IExecute>("parameters"); + public Map Parameters() => IExecute>("parameters"); public Schema KeySchema => IExecute("keySchema"); diff --git a/src/net/KNet/ClientSide/BridgedClasses/Connect/Data/SchemaAndValue.cs b/src/net/KNet/ClientSide/BridgedClasses/Connect/Data/SchemaAndValue.cs index fa9f41cc40..8159beae6e 100644 --- a/src/net/KNet/ClientSide/BridgedClasses/Connect/Data/SchemaAndValue.cs +++ b/src/net/KNet/ClientSide/BridgedClasses/Connect/Data/SchemaAndValue.cs @@ -17,7 +17,6 @@ */ using MASES.JCOBridge.C2JBridge; -using JavaLang = Java.Lang; namespace MASES.KNet.Connect.Data { @@ -31,13 +30,13 @@ public SchemaAndValue() { } - public SchemaAndValue(Schema schema, JavaLang.Object value) + public SchemaAndValue(Schema schema, Java.Lang.Object value) :base(schema, value) { } public Schema Schema => IExecute("schema"); - public JavaLang.Object Value => IExecute("value"); + public Java.Lang.Object Value => IExecute("value"); } } diff --git a/src/net/KNet/ClientSide/BridgedClasses/Connect/Data/SchemaBuilder.cs b/src/net/KNet/ClientSide/BridgedClasses/Connect/Data/SchemaBuilder.cs index a644168e19..fd2bc3be77 100644 --- a/src/net/KNet/ClientSide/BridgedClasses/Connect/Data/SchemaBuilder.cs +++ b/src/net/KNet/ClientSide/BridgedClasses/Connect/Data/SchemaBuilder.cs @@ -22,6 +22,8 @@ namespace MASES.KNet.Connect.Data { public class SchemaBuilder : Schema { + public override bool IsInterface => false; + public override string ClassName => "org.apache.kafka.connect.data.SchemaBuilder"; [System.Obsolete("This is not public in Apache Kafka API")] From db6dcbcaf75c2a8bc0fec8367b0da69678b0f96d Mon Sep 17 00:00:00 2001 From: masesdevelopers <94312179+masesdevelopers@users.noreply.github.com> Date: Tue, 17 May 2022 02:48:46 +0200 Subject: [PATCH 07/13] #65: added support functions --- .../KNet/ClientSide/BridgedClasses/Connect/KNetSourceTask.cs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/net/KNet/ClientSide/BridgedClasses/Connect/KNetSourceTask.cs b/src/net/KNet/ClientSide/BridgedClasses/Connect/KNetSourceTask.cs index ecbbb6d305..2e1ab6d5b7 100644 --- a/src/net/KNet/ClientSide/BridgedClasses/Connect/KNetSourceTask.cs +++ b/src/net/KNet/ClientSide/BridgedClasses/Connect/KNetSourceTask.cs @@ -26,6 +26,10 @@ namespace MASES.KNet.Connect /// public abstract class KNetSourceTask : KNetTask { + protected Map OffsetForKey(string key, V value) => Collections.SingletonMap(key, value); + + protected Map OffsetAt(string key, V value) => Context.OffsetStorageReader.Offset(Collections.SingletonMap(key, value)); + /// /// The /// From c9d448a79a84aa226f657bad449e06f8281abdda Mon Sep 17 00:00:00 2001 From: masesdevelopers <94312179+masesdevelopers@users.noreply.github.com> Date: Wed, 18 May 2022 00:39:09 +0200 Subject: [PATCH 08/13] Update exception used in methods --- .../java/org/mases/knet/connect/KNetConnectProxy.java | 2 +- .../org/mases/knet/connect/sink/KNetSinkConnector.java | 10 +++++----- .../java/org/mases/knet/connect/sink/KNetSinkTask.java | 6 +++--- .../mases/knet/connect/source/KNetSourceConnector.java | 10 +++++----- .../org/mases/knet/connect/source/KNetSourceTask.java | 6 +++--- 5 files changed, 17 insertions(+), 17 deletions(-) diff --git a/src/java/knet/src/main/java/org/mases/knet/connect/KNetConnectProxy.java b/src/java/knet/src/main/java/org/mases/knet/connect/KNetConnectProxy.java index e233c1c1df..8a02a1d77a 100644 --- a/src/java/knet/src/main/java/org/mases/knet/connect/KNetConnectProxy.java +++ b/src/java/knet/src/main/java/org/mases/knet/connect/KNetConnectProxy.java @@ -63,7 +63,7 @@ public static synchronized boolean initializeSourceConnector(Map AbstractConfig parsedConfig = new AbstractConfig(CONFIG_DEF, props); String className = parsedConfig.getString(DOTNET_CLASSNAME_CONFIG); if (className == null) - throw new ConfigException("'knet.dotnet.classname' in KNetSinkConnector configuration requires a definition"); + throw new ConfigException("'knet.dotnet.classname' in KNetSourceConnector configuration requires a definition"); return (boolean) getConnectProxy().Invoke("AllocateSourceConnector", className); } diff --git a/src/java/knet/src/main/java/org/mases/knet/connect/sink/KNetSinkConnector.java b/src/java/knet/src/main/java/org/mases/knet/connect/sink/KNetSinkConnector.java index 10e3b81700..a495da7780 100644 --- a/src/java/knet/src/main/java/org/mases/knet/connect/sink/KNetSinkConnector.java +++ b/src/java/knet/src/main/java/org/mases/knet/connect/sink/KNetSinkConnector.java @@ -19,8 +19,8 @@ package org.mases.knet.connect.sink; import org.apache.kafka.common.config.ConfigDef; -import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.connect.connector.Task; +import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.sink.SinkConnector; import org.apache.kafka.connect.sink.SinkConnectorContext; import org.mases.jcobridge.*; @@ -58,13 +58,13 @@ public void start(Map props) { try { if (!KNetConnectProxy.initializeSinkConnector(props)) { log.error("Failed Invoke of \"initializeSinkConnector\""); - throw new ConfigException("Failed Invoke of \"initializeSinkConnector\""); + throw new ConnectException("Failed Invoke of \"initializeSinkConnector\""); } else { JCOBridge.RegisterJVMGlobal(registrationName, this); try { dataToExchange = props; JCObject sink = KNetConnectProxy.getSinkConnector(); - if (sink == null) throw new ConfigException("getSinkConnector returned null."); + if (sink == null) throw new ConnectException("getSinkConnector returned null."); sink.Invoke("StartInternal"); } finally { dataToExchange = null; @@ -88,7 +88,7 @@ public List> taskConfigs(int maxTasks) { try { dataToExchange = config; JCObject sink = KNetConnectProxy.getSinkConnector(); - if (sink == null) throw new ConfigException("getSinkConnector returned null."); + if (sink == null) throw new ConnectException("getSinkConnector returned null."); sink.Invoke("TaskConfigsInternal", i); } catch (JCException | IOException jcne) { log.error("Failed Invoke of \"start\"", jcne); @@ -105,7 +105,7 @@ public void stop() { try { try { JCObject sink = KNetConnectProxy.getSinkConnector(); - if (sink == null) throw new ConfigException("getSinkConnector returned null."); + if (sink == null) throw new ConnectException("getSinkConnector returned null."); sink.Invoke("StopInternal"); } finally { JCOBridge.UnregisterJVMGlobal(registrationName); diff --git a/src/java/knet/src/main/java/org/mases/knet/connect/sink/KNetSinkTask.java b/src/java/knet/src/main/java/org/mases/knet/connect/sink/KNetSinkTask.java index 992defd65b..005c5ef4af 100644 --- a/src/java/knet/src/main/java/org/mases/knet/connect/sink/KNetSinkTask.java +++ b/src/java/knet/src/main/java/org/mases/knet/connect/sink/KNetSinkTask.java @@ -18,7 +18,7 @@ package org.mases.knet.connect.sink; -import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.sink.SinkRecord; import org.apache.kafka.connect.sink.SinkTask; import org.apache.kafka.connect.sink.SinkTaskContext; @@ -48,12 +48,12 @@ public void setDataToExchange(Object dte) { dataToExchange = dte; } - public KNetSinkTask() throws ConfigException, JCException, IOException { + public KNetSinkTask() throws ConnectException, JCException, IOException { super(); long taskid = taskId.incrementAndGet(); JCOBridge.RegisterJVMGlobal(String.format("KNetSinkTask_%d", taskid), this); JCObject sink = KNetConnectProxy.getSinkConnector(); - if (sink == null) throw new ConfigException("getSinkConnector returned null."); + if (sink == null) throw new ConnectException("getSinkConnector returned null."); sinkTask = (JCObject) sink.Invoke("AllocateTask", taskid); } diff --git a/src/java/knet/src/main/java/org/mases/knet/connect/source/KNetSourceConnector.java b/src/java/knet/src/main/java/org/mases/knet/connect/source/KNetSourceConnector.java index 00cd3b00e8..28a141e29a 100644 --- a/src/java/knet/src/main/java/org/mases/knet/connect/source/KNetSourceConnector.java +++ b/src/java/knet/src/main/java/org/mases/knet/connect/source/KNetSourceConnector.java @@ -19,8 +19,8 @@ package org.mases.knet.connect.source; import org.apache.kafka.common.config.ConfigDef; -import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.connect.connector.Task; +import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.source.SourceConnector; import org.apache.kafka.connect.source.SourceConnectorContext; import org.mases.jcobridge.*; @@ -58,13 +58,13 @@ public void start(Map props) { try { if (!KNetConnectProxy.initializeSourceConnector(props)) { log.error("Failed Invoke of \"initializeSourceConnector\""); - throw new ConfigException("Failed Invoke of \"initializeSourceConnector\""); + throw new ConnectException("Failed Invoke of \"initializeSourceConnector\""); } else { JCOBridge.RegisterJVMGlobal(registrationName, this); try { dataToExchange = props; JCObject source = KNetConnectProxy.getSourceConnector(); - if (source == null) throw new ConfigException("getSourceConnector returned null."); + if (source == null) throw new ConnectException("getSourceConnector returned null."); source.Invoke("StartInternal"); } finally { dataToExchange = null; @@ -88,7 +88,7 @@ public List> taskConfigs(int maxTasks) { try { dataToExchange = config; JCObject source = KNetConnectProxy.getSourceConnector(); - if (source == null) throw new ConfigException("getSourceConnector returned null."); + if (source == null) throw new ConnectException("getSourceConnector returned null."); source.Invoke("TaskConfigsInternal", i); } catch (JCException | IOException jcne) { log.error("Failed Invoke of \"start\"", jcne); @@ -105,7 +105,7 @@ public void stop() { try { try { JCObject source = KNetConnectProxy.getSourceConnector(); - if (source == null) throw new ConfigException("getSourceConnector returned null."); + if (source == null) throw new ConnectException("getSourceConnector returned null."); source.Invoke("StopInternal"); } finally { JCOBridge.UnregisterJVMGlobal(registrationName); diff --git a/src/java/knet/src/main/java/org/mases/knet/connect/source/KNetSourceTask.java b/src/java/knet/src/main/java/org/mases/knet/connect/source/KNetSourceTask.java index 3055c45f85..8d9e26df12 100644 --- a/src/java/knet/src/main/java/org/mases/knet/connect/source/KNetSourceTask.java +++ b/src/java/knet/src/main/java/org/mases/knet/connect/source/KNetSourceTask.java @@ -18,7 +18,7 @@ package org.mases.knet.connect.source; -import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.source.SourceRecord; import org.apache.kafka.connect.source.SourceTask; import org.apache.kafka.connect.source.SourceTaskContext; @@ -48,12 +48,12 @@ public void setDataToExchange(Object dte) { dataToExchange = dte; } - public KNetSourceTask() throws ConfigException, JCException, IOException { + public KNetSourceTask() throws ConnectException, JCException, IOException { super(); long taskid = taskId.incrementAndGet(); JCOBridge.RegisterJVMGlobal(String.format("KNetSourceTask_%d", taskid), this); JCObject source = KNetConnectProxy.getSourceConnector(); - if (source == null) throw new ConfigException("getSourceConnector returned null."); + if (source == null) throw new ConnectException("getSourceConnector returned null."); sourceTask = (JCObject) source.Invoke("AllocateTask", taskid); } From 6ef6345633afcda40a977a13f2b6c324294fe247 Mon Sep 17 00:00:00 2001 From: masesdevelopers <94312179+masesdevelopers@users.noreply.github.com> Date: Wed, 18 May 2022 01:13:08 +0200 Subject: [PATCH 09/13] #65: General review of .NET side Connect SDK classes --- .../BridgedClasses/Connect/KNetConnector.cs | 12 ++++++++++++ .../BridgedClasses/Connect/KNetSinkConnector.cs | 14 ++++++++------ .../BridgedClasses/Connect/KNetSinkTask.cs | 6 ++++-- .../BridgedClasses/Connect/KNetSourceConnector.cs | 9 ++++++--- .../BridgedClasses/Connect/KNetSourceTask.cs | 6 ++++-- .../ClientSide/BridgedClasses/Connect/KNetTask.cs | 12 ++++++++++++ 6 files changed, 46 insertions(+), 13 deletions(-) diff --git a/src/net/KNet/ClientSide/BridgedClasses/Connect/KNetConnector.cs b/src/net/KNet/ClientSide/BridgedClasses/Connect/KNetConnector.cs index 34e643076b..0079de2926 100644 --- a/src/net/KNet/ClientSide/BridgedClasses/Connect/KNetConnector.cs +++ b/src/net/KNet/ClientSide/BridgedClasses/Connect/KNetConnector.cs @@ -185,4 +185,16 @@ public void StopInternal() public string Version() => throw new NotImplementedException("Invoked in Java before any initialization."); } + /// + /// The base connector class which is the base of both source or sink connectors and receives information about implementing class with + /// + /// The class which extends + public abstract class KNetConnector : KNetConnector + where TConnector : KNetConnector + { + /// + /// Set the of the connector to the value defined from + /// + public override string ConnectorName => typeof(TConnector).FullName; + } } diff --git a/src/net/KNet/ClientSide/BridgedClasses/Connect/KNetSinkConnector.cs b/src/net/KNet/ClientSide/BridgedClasses/Connect/KNetSinkConnector.cs index 1f87e501e4..a63f5458a7 100644 --- a/src/net/KNet/ClientSide/BridgedClasses/Connect/KNetSinkConnector.cs +++ b/src/net/KNet/ClientSide/BridgedClasses/Connect/KNetSinkConnector.cs @@ -17,27 +17,29 @@ */ using MASES.KNet.Connect.Sink; -using MASES.KNet.Connect.Source; using System; namespace MASES.KNet.Connect { /// - /// An implementation of for sink connectors + /// An implementation of for sink connectors /// - /// The task class inherited from - public abstract class KNetSinkConnector : KNetConnector where TTask : KNetSinkTask + /// The connector class inherited from + /// The task class inherited from + public abstract class KNetSinkConnector : KNetConnector + where TSinkConnector : KNetSinkConnector + where TTask : KNetSinkTask { /// /// The /// public SinkConnectorContext Context => Context(); /// - /// Set the of the connector to a fixed value + /// Set the of the connector to a fixed value /// public sealed override string ReflectedConnectorClassName => "KNetSinkConnector"; /// - /// Set the of the connector to the value defined from + /// Set the of the connector to the value defined from /// public sealed override Type TaskClassType => typeof(TTask); } diff --git a/src/net/KNet/ClientSide/BridgedClasses/Connect/KNetSinkTask.cs b/src/net/KNet/ClientSide/BridgedClasses/Connect/KNetSinkTask.cs index 4c2b9f75d0..28239307bb 100644 --- a/src/net/KNet/ClientSide/BridgedClasses/Connect/KNetSinkTask.cs +++ b/src/net/KNet/ClientSide/BridgedClasses/Connect/KNetSinkTask.cs @@ -22,9 +22,11 @@ namespace MASES.KNet.Connect { /// - /// An implementation of for sink task + /// An implementation of for sink task /// - public abstract class KNetSinkTask : KNetTask + /// The class which extends + public abstract class KNetSinkTask : KNetTask + where TTask : KNetSinkTask { /// /// The diff --git a/src/net/KNet/ClientSide/BridgedClasses/Connect/KNetSourceConnector.cs b/src/net/KNet/ClientSide/BridgedClasses/Connect/KNetSourceConnector.cs index bc013b6c60..8fa16bebdd 100644 --- a/src/net/KNet/ClientSide/BridgedClasses/Connect/KNetSourceConnector.cs +++ b/src/net/KNet/ClientSide/BridgedClasses/Connect/KNetSourceConnector.cs @@ -22,10 +22,13 @@ namespace MASES.KNet.Connect { /// - /// An implementation of for source connectors + /// An implementation of for source connectors /// - /// The task class inherited from - public abstract class KNetSourceConnector : KNetConnector where TTask : KNetSourceTask + /// The connector class inherited from + /// The task class inherited from + public abstract class KNetSourceConnector : KNetConnector + where TSourceConnector : KNetSourceConnector + where TTask : KNetSourceTask { /// /// The diff --git a/src/net/KNet/ClientSide/BridgedClasses/Connect/KNetSourceTask.cs b/src/net/KNet/ClientSide/BridgedClasses/Connect/KNetSourceTask.cs index 2e1ab6d5b7..13ef328b90 100644 --- a/src/net/KNet/ClientSide/BridgedClasses/Connect/KNetSourceTask.cs +++ b/src/net/KNet/ClientSide/BridgedClasses/Connect/KNetSourceTask.cs @@ -22,9 +22,11 @@ namespace MASES.KNet.Connect { /// - /// An implementation of for source task + /// An implementation of for source task /// - public abstract class KNetSourceTask : KNetTask + /// The class which extends + public abstract class KNetSourceTask : KNetTask + where TTask : KNetSourceTask { protected Map OffsetForKey(string key, V value) => Collections.SingletonMap(key, value); diff --git a/src/net/KNet/ClientSide/BridgedClasses/Connect/KNetTask.cs b/src/net/KNet/ClientSide/BridgedClasses/Connect/KNetTask.cs index 6693e270ac..a5dcdc9818 100644 --- a/src/net/KNet/ClientSide/BridgedClasses/Connect/KNetTask.cs +++ b/src/net/KNet/ClientSide/BridgedClasses/Connect/KNetTask.cs @@ -134,4 +134,16 @@ public object VersionInternal() /// public abstract string Version(); } + /// + /// The base task class which is the base of both source or sink task and receives information about implementing class with + /// + /// The class which extends + public abstract class KNetTask : KNetTask + where TTask : KNetTask + { + /// + /// Set the of the task to the value defined from + /// + public override string Version() => typeof(TTask).Assembly.GetName().Version.ToString(); + } } From d1183651aa6c0ef67f28154385e1a3597779c0b4 Mon Sep 17 00:00:00 2001 From: masesdevelopers <94312179+masesdevelopers@users.noreply.github.com> Date: Wed, 18 May 2022 01:29:55 +0200 Subject: [PATCH 10/13] Update test/template classes following review of SDK classes --- .../templates/knetConnectSink/KNetConnectSink.cs | 11 ++--------- .../templates/knetConnectSource/KNetConnectSource.cs | 11 ++--------- tests/KNetConnectTest/KnetConnectSink.cs | 12 ++---------- tests/KNetConnectTest/KnetConnectSource.cs | 12 ++---------- 4 files changed, 8 insertions(+), 38 deletions(-) diff --git a/src/net/templates/templates/knetConnectSink/KNetConnectSink.cs b/src/net/templates/templates/knetConnectSink/KNetConnectSink.cs index 6bd346d1ef..477e8daf67 100644 --- a/src/net/templates/templates/knetConnectSink/KNetConnectSink.cs +++ b/src/net/templates/templates/knetConnectSink/KNetConnectSink.cs @@ -4,10 +4,8 @@ namespace MASES.KNetTemplate.KNetConnect { - public class KNetConnectSink : KNetSinkConnector + public class KNetConnectSink : KNetSinkConnector { - public override string ConnectorName => "MASES.KNetTemplate.KNetConnect.KNetConnectSink"; - public override void Start(Map props) { @@ -24,7 +22,7 @@ public override void TaskConfigs(int index, Map config) } } - public class KnetConnectSinkTask : KNetSinkTask + public class KnetConnectSinkTask : KNetSinkTask { public override void Put(Collection collection) { @@ -40,10 +38,5 @@ public override void Stop() { } - - public override string Version() - { - return "KnetConnectSinkTask"; - } } } diff --git a/src/net/templates/templates/knetConnectSource/KNetConnectSource.cs b/src/net/templates/templates/knetConnectSource/KNetConnectSource.cs index d80c634b66..88a9c2707a 100644 --- a/src/net/templates/templates/knetConnectSource/KNetConnectSource.cs +++ b/src/net/templates/templates/knetConnectSource/KNetConnectSource.cs @@ -4,10 +4,8 @@ namespace MASES.KNetTemplate.KNetConnect { - public class KNetConnectSource : KNetSourceConnector + public class KNetConnectSource : KNetSourceConnector { - public override string ConnectorName => "MASES.KNetTemplate.KNetConnect.KNetConnectSource"; - public override void Start(Map props) { @@ -24,7 +22,7 @@ public override void TaskConfigs(int index, Map config) } } - public class KnetConnectSourceTask : KNetSourceTask + public class KnetConnectSourceTask : KNetSourceTask { public override List Poll() { @@ -40,10 +38,5 @@ public override void Stop() { } - - public override string Version() - { - return "KnetConnectSourceTask"; - } } } diff --git a/tests/KNetConnectTest/KnetConnectSink.cs b/tests/KNetConnectTest/KnetConnectSink.cs index 489b9f1731..eedbd43f44 100644 --- a/tests/KNetConnectTest/KnetConnectSink.cs +++ b/tests/KNetConnectTest/KnetConnectSink.cs @@ -17,16 +17,13 @@ */ using Java.Util; -using MASES.KNet.Common.Config; using MASES.KNet.Connect; using MASES.KNet.Connect.Sink; namespace MASES.KNetConnectTest { - public class KNetSinkTestConnector : KNetSinkConnector + public class KNetSinkTestConnector : KNetSinkConnector { - public override string ConnectorName => "MASES.KNetConnectTest.KNetSinkTestConnector"; - public override void Start(Map props) { @@ -43,7 +40,7 @@ public override void TaskConfigs(int index, Map config) } } - public class KNetSinkTestTask : KNetSinkTask + public class KNetSinkTestTask : KNetSinkTask { public override void Put(Collection collection) { @@ -59,10 +56,5 @@ public override void Stop() { } - - public override string Version() - { - return "KNetSinkTestTask"; - } } } diff --git a/tests/KNetConnectTest/KnetConnectSource.cs b/tests/KNetConnectTest/KnetConnectSource.cs index f88dac270e..eefa06d031 100644 --- a/tests/KNetConnectTest/KnetConnectSource.cs +++ b/tests/KNetConnectTest/KnetConnectSource.cs @@ -17,16 +17,13 @@ */ using Java.Util; -using MASES.KNet.Common.Config; using MASES.KNet.Connect; using MASES.KNet.Connect.Source; namespace MASES.KNetConnectTest { - public class KNetSourceTestConnector : KNetSourceConnector + public class KNetSourceTestConnector : KNetSourceConnector { - public override string ConnectorName => "MASES.KNetConnectTest.KNetSourceTestConnector"; - public override void Start(Map props) { @@ -43,7 +40,7 @@ public override void TaskConfigs(int index, Map config) } } - public class KNetSourceTestTask : KNetSourceTask + public class KNetSourceTestTask : KNetSourceTask { public override List Poll() { @@ -61,10 +58,5 @@ public override void Stop() { } - - public override string Version() - { - return "KnetSourceTestTask"; - } } } From 02555fbda4723c534c0fb43ce9b9de1c5be70856 Mon Sep 17 00:00:00 2001 From: masesdevelopers <94312179+masesdevelopers@users.noreply.github.com> Date: Wed, 18 May 2022 03:45:00 +0200 Subject: [PATCH 11/13] Documentation update --- README.md | 2 + src/net/Documentation/articles/actualstate.md | 2 +- src/net/Documentation/articles/connectSDK.md | 47 +++++++++++++++++++ src/net/Documentation/articles/toc.yml | 18 ++++--- .../Documentation/articles/usageTemplates.md | 38 +++++++++++++++ src/net/templates/templatepack.csproj | 4 +- 6 files changed, 101 insertions(+), 10 deletions(-) create mode 100644 src/net/Documentation/articles/connectSDK.md create mode 100644 src/net/Documentation/articles/usageTemplates.md diff --git a/README.md b/README.md index d6aceb4b3f..a5da3d182a 100644 --- a/README.md +++ b/README.md @@ -50,9 +50,11 @@ Have a look at the following resources: * [Roadmap](src/net/Documentation/articles/roadmap.md) * [Actual state](src/net/Documentation/articles/actualstate.md) * [Performance](src/net/Documentation/articles/performance.md) +* [Connect SDK](src/net/Documentation/articles/connectSDK.md) * [KNet usage](src/net/Documentation/articles/usage.md) * [KNet APIs extensibility](src/net/Documentation/articles/API_extensibility.md) * [KNetCLI usage](src/net/Documentation/articles/usageCLI.md) +* [Template Usage Guide](src/net/Documentation/articles/usageTemplates.md) * [How to build from scratch](src/net/Documentation/articles/howtobuild.md) --- diff --git a/src/net/Documentation/articles/actualstate.md b/src/net/Documentation/articles/actualstate.md index 8643eb53ca..2c10ece0e0 100644 --- a/src/net/Documentation/articles/actualstate.md +++ b/src/net/Documentation/articles/actualstate.md @@ -7,6 +7,6 @@ This release comes with some ready made classes: * [X] Apache Kafka Admin Client covering all available APIs: since many classes are marked with @InterfaceStability.Evolving annotation some properties or methods can be missed; use **dynamic** code to interact with Admin API types. * [X] Almost completed Apache Kafka Streams * [X] Almost completed Apache Kafka Connect -* [ ] .NET Apache Kafka Connect SDK (under development) +* [X] .NET Apache Kafka Connect SDK (a basic version) If something is not available use [API extensibility](API_extensibility.md) to cover missing features. diff --git a/src/net/Documentation/articles/connectSDK.md b/src/net/Documentation/articles/connectSDK.md new file mode 100644 index 0000000000..6bdb2a11ea --- /dev/null +++ b/src/net/Documentation/articles/connectSDK.md @@ -0,0 +1,47 @@ +# KNet: Connect SDK + +This is only a quick start guide, many other information related to Apache Kafka Connect can be found at the following link https://kafka.apache.org/documentation/#connect + +## General + +To start a Connect session the user shall use the [KNet CLI](usageCLI.md). + +The commands related to Apache Kafka Connect are: +- ConnectDistributed +- ConnectStandalone + +To go in detail look at https://kafka.apache.org/documentation/#connect and https://kafka.apache.org/quickstart#quickstart_kafkaconnect. + +## Standalone + +In this guide we focus on the standalone version. +The guide start from the assumption that an assembly was generated: see [Template Usage Guide](usageTemplates.md). +Put the assembly within a folder (__C:\MyConnect__), then go within it. +As explained in https://kafka.apache.org/documentation/#connect Apache Kafka Connect needs at least one configuration file, in standalone mode it needs two configuration files: +1. The first file is **connect-standalone.properties** (or **connect-distributed.properties** for distributed environments): this file contains configuration information for Apache Kafka Connect; +2. The second optional file defines the connector to use and its options. + +In the [config folder](https://github.com/masesgroup/KNet/tree/master/src/config) the user can found many configuration files. +The files named **connect-knet-sink.properties** and **connect-knet-source.properties** contain examples for sink and source connectors. + +Copy within __C:\MyConnect__ **connect-standalone.properties** and update it especially on line containing __bootstrap.servers__, then copy **connect-knet-sink.properties** or **connect-knet-source.properties** depending on the connector type. +The main options related to KNet Connect SDK are: +- __connector.class=**KNetSinkConnector**__ where the value is the connector Java class and must be: + - __KNetSinkConnector__ for sink connectors + - __KNetSourceConnector__ for source connectors +- __knet.dotnet.classname=MASES.KNetTemplate.KNetConnect.KNetConnectSink, knetConnectSink__ where the value is the .NET class name in the form of __**FullName**, **AssemblyName**__ + +When the __C:\MyConnect__ folder contains all the files it is possible to run Apache Kafka Connect: + +> +> knet -ClassToRun ConnectStandalone connect-standalone.properties connect-knet-sink.properties +> + +## Distributed + +As stated in [Apache Kafka Connect Guide](https://kafka.apache.org/documentation/#connect ) the distributed version does not use the connector file definition, instead it shall be managed using the REST interface. +The start-up command within __C:\MyConnect__ folder becomes: + +> +> knet -ClassToRun ConnectDistributed connect-distributed.properties +> diff --git a/src/net/Documentation/articles/toc.yml b/src/net/Documentation/articles/toc.yml index 892d11886d..22864a6872 100644 --- a/src/net/Documentation/articles/toc.yml +++ b/src/net/Documentation/articles/toc.yml @@ -1,18 +1,22 @@ - name: Introduction href: intro.md -- name: Roadmap +- name: KNet Roadmap href: roadmap.md -- name: Actual state +- name: KNet Actual state href: actualstate.md -- name: Performance +- name: KNet Performance href: performance.md -- name: APIs extendibility +- name: KNet APIs extendibility href: API_extensibility.md -- name: JVM callbacks +- name: KNet JVM callbacks href: jvm_callbacks.md -- name: Usage +- name: KNet Connect SDK + href: connectSDK.md +- name: KNet Usage href: usage.md -- name: Usage CLI +- name: KNet CLI Usage href: usageCLI.md +- name: KNet Template Usage Guide + href: usageTemplates.md - name: How to build from scratch href: howtobuild.md \ No newline at end of file diff --git a/src/net/Documentation/articles/usageTemplates.md b/src/net/Documentation/articles/usageTemplates.md new file mode 100644 index 0000000000..033cb01b92 --- /dev/null +++ b/src/net/Documentation/articles/usageTemplates.md @@ -0,0 +1,38 @@ +# KNet: Template Usage Guide + +For more information related to .NET templates look at https://docs.microsoft.com/en-us/dotnet/core/tools/dotnet-new-sdk-templates. + +## Installation + +To install the templates executes the following command within a command shell: + +> +> dotnet new --install MASES.KNet.Templates +> + +The command installs the latest version and on success will list all templates added to the list of available templates. +They are: +1. knetConsumerApp: a project to create a consumer application for Apache Kafka +2. knetPipeStreamApp: a project to create a pipe stream application for Apache Kafka Streams +3. knetProducerApp: a project to create a producer application for Apache Kafka +4. knetConnectSink: a project to create a library which conforms to an Apache Kafka Connect Sink Connector written in .NET +5. knetConnectSource: a project to create a library which conforms to an Apache Kafka Connect Source Connector written in .NET + +## Simple usage + +The first three templates are ready made project with enough code to use them directly. +To use one of the available templates run the following command: + +> +> dotnet new knetConsumerApp +> + +the previous command will create a .NET project for an executable. The user can modify the code or just execute it against an Apache Kafka server. + +## SDK templates + +The last two templates (knetConnectSink, knetConnectSource) are not ready made project: they are skeletons for Apache Kafka Connect Source/Sink Connector written in .NET. +The available code does not do anything: the functions in the code shall be filled to obtain some results. + +With the available code the user can verify how an Apache Kafka Connect Source/Sink Connector, written in .NET, works; to do this the projects can be compiled to obtain an assembly. +See [Connect SDK](connectSDK.md) for some information on how use it. diff --git a/src/net/templates/templatepack.csproj b/src/net/templates/templatepack.csproj index 871d6b0b22..172ad65c7f 100644 --- a/src/net/templates/templatepack.csproj +++ b/src/net/templates/templatepack.csproj @@ -28,7 +28,7 @@ true false content - README.md + usageTemplates.md @@ -45,7 +45,7 @@ - + From d5607abe5af39e3aac022eb4bd103b0cdfde3e2d Mon Sep 17 00:00:00 2001 From: masesdevelopers <94312179+masesdevelopers@users.noreply.github.com> Date: Wed, 18 May 2022 03:54:44 +0200 Subject: [PATCH 12/13] Fix class name --- .../templates/templates/knetConnectSink/KNetConnectSink.cs | 4 ++-- .../templates/knetConnectSource/KNetConnectSource.cs | 4 ++-- .../{KnetConnectSink.cs => KNetConnectSink.cs} | 0 .../{KnetConnectSource.cs => KNetConnectSource.cs} | 0 4 files changed, 4 insertions(+), 4 deletions(-) rename tests/KNetConnectTest/{KnetConnectSink.cs => KNetConnectSink.cs} (100%) rename tests/KNetConnectTest/{KnetConnectSource.cs => KNetConnectSource.cs} (100%) diff --git a/src/net/templates/templates/knetConnectSink/KNetConnectSink.cs b/src/net/templates/templates/knetConnectSink/KNetConnectSink.cs index 477e8daf67..5f0fb5d306 100644 --- a/src/net/templates/templates/knetConnectSink/KNetConnectSink.cs +++ b/src/net/templates/templates/knetConnectSink/KNetConnectSink.cs @@ -4,7 +4,7 @@ namespace MASES.KNetTemplate.KNetConnect { - public class KNetConnectSink : KNetSinkConnector + public class KNetConnectSink : KNetSinkConnector { public override void Start(Map props) { @@ -22,7 +22,7 @@ public override void TaskConfigs(int index, Map config) } } - public class KnetConnectSinkTask : KNetSinkTask + public class KNetConnectSinkTask : KNetSinkTask { public override void Put(Collection collection) { diff --git a/src/net/templates/templates/knetConnectSource/KNetConnectSource.cs b/src/net/templates/templates/knetConnectSource/KNetConnectSource.cs index 88a9c2707a..a990a3173d 100644 --- a/src/net/templates/templates/knetConnectSource/KNetConnectSource.cs +++ b/src/net/templates/templates/knetConnectSource/KNetConnectSource.cs @@ -4,7 +4,7 @@ namespace MASES.KNetTemplate.KNetConnect { - public class KNetConnectSource : KNetSourceConnector + public class KNetConnectSource : KNetSourceConnector { public override void Start(Map props) { @@ -22,7 +22,7 @@ public override void TaskConfigs(int index, Map config) } } - public class KnetConnectSourceTask : KNetSourceTask + public class KNetConnectSourceTask : KNetSourceTask { public override List Poll() { diff --git a/tests/KNetConnectTest/KnetConnectSink.cs b/tests/KNetConnectTest/KNetConnectSink.cs similarity index 100% rename from tests/KNetConnectTest/KnetConnectSink.cs rename to tests/KNetConnectTest/KNetConnectSink.cs diff --git a/tests/KNetConnectTest/KnetConnectSource.cs b/tests/KNetConnectTest/KNetConnectSource.cs similarity index 100% rename from tests/KNetConnectTest/KnetConnectSource.cs rename to tests/KNetConnectTest/KNetConnectSource.cs From 3d343aec398bac12a5c73ca3dec48f9c3228bf29 Mon Sep 17 00:00:00 2001 From: masesdevelopers <94312179+masesdevelopers@users.noreply.github.com> Date: Wed, 18 May 2022 12:34:43 +0200 Subject: [PATCH 13/13] Fixed namespace naming --- .../Connect/Data/ConnectSchema.cs | 5 ++- .../Connect/Data/SchemaProjector.cs | 5 ++- .../BridgedClasses/Connect/Data/Struct.cs | 17 +++++----- .../BridgedClasses/Connect/Data/Values.cs | 33 +++++++++---------- 4 files changed, 28 insertions(+), 32 deletions(-) diff --git a/src/net/KNet/ClientSide/BridgedClasses/Connect/Data/ConnectSchema.cs b/src/net/KNet/ClientSide/BridgedClasses/Connect/Data/ConnectSchema.cs index b9ec3caa81..804af1f617 100644 --- a/src/net/KNet/ClientSide/BridgedClasses/Connect/Data/ConnectSchema.cs +++ b/src/net/KNet/ClientSide/BridgedClasses/Connect/Data/ConnectSchema.cs @@ -17,7 +17,6 @@ */ using Java.Util; -using JavaLang = Java.Lang; namespace MASES.KNet.Connect.Data { @@ -31,12 +30,12 @@ public ConnectSchema() { } - public ConnectSchema(Type type, bool optional, JavaLang.Object defaultValue, string name, int version, string doc, Map parameters, List fields, Schema keySchema, Schema valueSchema) + public ConnectSchema(Type type, bool optional, Java.Lang.Object defaultValue, string name, int version, string doc, Map parameters, List fields, Schema keySchema, Schema valueSchema) : base(type, optional, defaultValue, name, version, doc, parameters, fields, keySchema, valueSchema) { } - public ConnectSchema(Type type, bool optional, JavaLang.Object defaultValue, string name, int version, string doc) + public ConnectSchema(Type type, bool optional, Java.Lang.Object defaultValue, string name, int version, string doc) :this(type, optional, defaultValue, name, version, doc, null, null, null, null) { } diff --git a/src/net/KNet/ClientSide/BridgedClasses/Connect/Data/SchemaProjector.cs b/src/net/KNet/ClientSide/BridgedClasses/Connect/Data/SchemaProjector.cs index e38fa86dc4..29fcc71e34 100644 --- a/src/net/KNet/ClientSide/BridgedClasses/Connect/Data/SchemaProjector.cs +++ b/src/net/KNet/ClientSide/BridgedClasses/Connect/Data/SchemaProjector.cs @@ -17,7 +17,6 @@ */ using MASES.JCOBridge.C2JBridge; -using JavaLang = Java.Lang; namespace MASES.KNet.Connect.Data { @@ -25,9 +24,9 @@ public class SchemaProjector : JVMBridgeBase { public override string ClassName => "org.apache.kafka.connect.data.SchemaProjector"; - public static JavaLang.Object Project(Schema source, JavaLang.Object record, Schema target) + public static Java.Lang.Object Project(Schema source, Java.Lang.Object record, Schema target) { - return SExecute("project", source, record, target); + return SExecute("project", source, record, target); } } } diff --git a/src/net/KNet/ClientSide/BridgedClasses/Connect/Data/Struct.cs b/src/net/KNet/ClientSide/BridgedClasses/Connect/Data/Struct.cs index 39001c16ea..cf745a7eda 100644 --- a/src/net/KNet/ClientSide/BridgedClasses/Connect/Data/Struct.cs +++ b/src/net/KNet/ClientSide/BridgedClasses/Connect/Data/Struct.cs @@ -18,7 +18,6 @@ using MASES.JCOBridge.C2JBridge; using Java.Util; -using JavaLang = Java.Lang; namespace MASES.KNet.Connect.Data { @@ -37,19 +36,19 @@ public Struct(Schema schema) public Schema Schema => IExecute("schema"); - public JavaLang.Object Get(string fieldName) + public Java.Lang.Object Get(string fieldName) { - return IExecute("get", fieldName); + return IExecute("get", fieldName); } - public JavaLang.Object Get(Field field) + public Java.Lang.Object Get(Field field) { - return IExecute("get", field); + return IExecute("get", field); } - public JavaLang.Object GetWithoutDefault(string fieldName) + public Java.Lang.Object GetWithoutDefault(string fieldName) { - return IExecute("getWithoutDefault", fieldName); + return IExecute("getWithoutDefault", fieldName); } public byte GetInt8(string fieldName) @@ -112,12 +111,12 @@ public Struct GetStruct(string fieldName) return IExecute("getStruct", fieldName); } - public Struct Put(string fieldName, JavaLang.Object value) + public Struct Put(string fieldName, Java.Lang.Object value) { return IExecute("put", fieldName, value); } - public Struct Put(Field field, JavaLang.Object value) + public Struct Put(Field field, Java.Lang.Object value) { return IExecute("put", field, value); } diff --git a/src/net/KNet/ClientSide/BridgedClasses/Connect/Data/Values.cs b/src/net/KNet/ClientSide/BridgedClasses/Connect/Data/Values.cs index 7865e8101b..11db47f8fd 100644 --- a/src/net/KNet/ClientSide/BridgedClasses/Connect/Data/Values.cs +++ b/src/net/KNet/ClientSide/BridgedClasses/Connect/Data/Values.cs @@ -19,7 +19,6 @@ using MASES.JCOBridge.C2JBridge; using Java.Math; using Java.Util; -using JavaLang = Java.Lang; namespace MASES.KNet.Connect.Data { @@ -27,82 +26,82 @@ public class Values : JVMBridgeBase { public override string ClassName => "org.apache.kafka.connect.data.Values"; - public static bool ConvertToBoolean(Schema schema, JavaLang.Object value) + public static bool ConvertToBoolean(Schema schema, Java.Lang.Object value) { return SExecute("convertToBoolean", schema, value); } - public static byte ConvertToByte(Schema schema, JavaLang.Object value) + public static byte ConvertToByte(Schema schema, Java.Lang.Object value) { return SExecute("convertToByte", schema, value); } - public static short ConvertToShort(Schema schema, JavaLang.Object value) + public static short ConvertToShort(Schema schema, Java.Lang.Object value) { return SExecute("convertToShort", schema, value); } - public static int ConvertToInteger(Schema schema, JavaLang.Object value) + public static int ConvertToInteger(Schema schema, Java.Lang.Object value) { return SExecute("convertToInteger", schema, value); } - public static long ConvertToLong(Schema schema, JavaLang.Object value) + public static long ConvertToLong(Schema schema, Java.Lang.Object value) { return SExecute("convertToLong", schema, value); } - public static float ConvertToFloat(Schema schema, JavaLang.Object value) + public static float ConvertToFloat(Schema schema, Java.Lang.Object value) { return SExecute("convertToFloat", schema, value); } - public static double ConvertToDouble(Schema schema, JavaLang.Object value) + public static double ConvertToDouble(Schema schema, Java.Lang.Object value) { return SExecute("convertToDouble", schema, value); } - public static string ConvertToString(Schema schema, JavaLang.Object value) + public static string ConvertToString(Schema schema, Java.Lang.Object value) { return SExecute("convertToString", schema, value); } - public static List ConvertToList(Schema schema, JavaLang.Object value) + public static List ConvertToList(Schema schema, Java.Lang.Object value) { return SExecute("convertToList", schema, value); } - public static Map ConvertToMap(Schema schema, JavaLang.Object value) + public static Map ConvertToMap(Schema schema, Java.Lang.Object value) { return SExecute("convertToMap", schema, value); } - public static Struct ConvertToStruct(Schema schema, JavaLang.Object value) + public static Struct ConvertToStruct(Schema schema, Java.Lang.Object value) { return SExecute("convertToStruct", schema, value); } - public static Java.Util.Date ConvertToTime(Schema schema, JavaLang.Object value) + public static Java.Util.Date ConvertToTime(Schema schema, Java.Lang.Object value) { return SExecute("convertToTime", schema, value); } - public static Java.Util.Date ConvertToDate(Schema schema, JavaLang.Object value) + public static Java.Util.Date ConvertToDate(Schema schema, Java.Lang.Object value) { return SExecute("convertToDate", schema, value); } - public static Java.Util.Date ConvertToTimestamp(Schema schema, JavaLang.Object value) + public static Java.Util.Date ConvertToTimestamp(Schema schema, Java.Lang.Object value) { return SExecute("convertToTimestamp", schema, value); } - public static BigDecimal ConvertToDecimal(Schema schema, JavaLang.Object value, int scale) + public static BigDecimal ConvertToDecimal(Schema schema, Java.Lang.Object value, int scale) { return SExecute("convertToDecimal", schema, value, scale); } - public static Schema InferSchema(JavaLang.Object value) + public static Schema InferSchema(Java.Lang.Object value) { return SExecute("inferSchema", value); }