diff --git a/README.md b/README.md index d6aceb4b3f..a5da3d182a 100644 --- a/README.md +++ b/README.md @@ -50,9 +50,11 @@ Have a look at the following resources: * [Roadmap](src/net/Documentation/articles/roadmap.md) * [Actual state](src/net/Documentation/articles/actualstate.md) * [Performance](src/net/Documentation/articles/performance.md) +* [Connect SDK](src/net/Documentation/articles/connectSDK.md) * [KNet usage](src/net/Documentation/articles/usage.md) * [KNet APIs extensibility](src/net/Documentation/articles/API_extensibility.md) * [KNetCLI usage](src/net/Documentation/articles/usageCLI.md) +* [Template Usage Guide](src/net/Documentation/articles/usageTemplates.md) * [How to build from scratch](src/net/Documentation/articles/howtobuild.md) --- diff --git a/src/java/knet/src/main/java/org/mases/knet/connect/KNetConnectProxy.java b/src/java/knet/src/main/java/org/mases/knet/connect/KNetConnectProxy.java index 775a8cd20e..8a02a1d77a 100644 --- a/src/java/knet/src/main/java/org/mases/knet/connect/KNetConnectProxy.java +++ b/src/java/knet/src/main/java/org/mases/knet/connect/KNetConnectProxy.java @@ -55,7 +55,7 @@ public static synchronized boolean initializeSinkConnector(Map p AbstractConfig parsedConfig = new AbstractConfig(CONFIG_DEF, props); String className = parsedConfig.getString(DOTNET_CLASSNAME_CONFIG); if (className == null) - throw new ConfigException("'classname' in KNetSinkConnector configuration requires a definition"); + throw new ConfigException("'knet.dotnet.classname' in KNetSinkConnector configuration requires a definition"); return (boolean) getConnectProxy().Invoke("AllocateSinkConnector", className); } @@ -63,7 +63,7 @@ public static synchronized boolean initializeSourceConnector(Map AbstractConfig parsedConfig = new AbstractConfig(CONFIG_DEF, props); String className = parsedConfig.getString(DOTNET_CLASSNAME_CONFIG); if (className == null) - throw new ConfigException("'classname' in KNetSinkConnector configuration requires a definition"); + throw new ConfigException("'knet.dotnet.classname' in KNetSourceConnector configuration requires a definition"); return (boolean) getConnectProxy().Invoke("AllocateSourceConnector", className); } diff --git a/src/java/knet/src/main/java/org/mases/knet/connect/sink/KNetSinkConnector.java b/src/java/knet/src/main/java/org/mases/knet/connect/sink/KNetSinkConnector.java index 90666a44ee..a495da7780 100644 --- a/src/java/knet/src/main/java/org/mases/knet/connect/sink/KNetSinkConnector.java +++ b/src/java/knet/src/main/java/org/mases/knet/connect/sink/KNetSinkConnector.java @@ -19,9 +19,10 @@ package org.mases.knet.connect.sink; import org.apache.kafka.common.config.ConfigDef; -import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.connect.connector.Task; +import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.sink.SinkConnector; +import org.apache.kafka.connect.sink.SinkConnectorContext; import org.mases.jcobridge.*; import org.mases.knet.connect.KNetConnectProxy; import org.slf4j.Logger; @@ -48,18 +49,22 @@ public void setDataToExchange(Object dte) { dataToExchange = dte; } + public SinkConnectorContext getContext() { + return context(); + } + @Override public void start(Map props) { try { if (!KNetConnectProxy.initializeSinkConnector(props)) { log.error("Failed Invoke of \"initializeSinkConnector\""); - throw new ConfigException("Failed Invoke of \"initializeSinkConnector\""); + throw new ConnectException("Failed Invoke of \"initializeSinkConnector\""); } else { JCOBridge.RegisterJVMGlobal(registrationName, this); try { dataToExchange = props; JCObject sink = KNetConnectProxy.getSinkConnector(); - if (sink == null) throw new ConfigException("getSinkConnector returned null."); + if (sink == null) throw new ConnectException("getSinkConnector returned null."); sink.Invoke("StartInternal"); } finally { dataToExchange = null; @@ -83,7 +88,7 @@ public List> taskConfigs(int maxTasks) { try { dataToExchange = config; JCObject sink = KNetConnectProxy.getSinkConnector(); - if (sink == null) throw new ConfigException("getSinkConnector returned null."); + if (sink == null) throw new ConnectException("getSinkConnector returned null."); sink.Invoke("TaskConfigsInternal", i); } catch (JCException | IOException jcne) { log.error("Failed Invoke of \"start\"", jcne); @@ -100,7 +105,7 @@ public void stop() { try { try { JCObject sink = KNetConnectProxy.getSinkConnector(); - if (sink == null) throw new ConfigException("getSinkConnector returned null."); + if (sink == null) throw new ConnectException("getSinkConnector returned null."); sink.Invoke("StopInternal"); } finally { JCOBridge.UnregisterJVMGlobal(registrationName); diff --git a/src/java/knet/src/main/java/org/mases/knet/connect/sink/KNetSinkTask.java b/src/java/knet/src/main/java/org/mases/knet/connect/sink/KNetSinkTask.java index 7757237b8f..005c5ef4af 100644 --- a/src/java/knet/src/main/java/org/mases/knet/connect/sink/KNetSinkTask.java +++ b/src/java/knet/src/main/java/org/mases/knet/connect/sink/KNetSinkTask.java @@ -18,7 +18,7 @@ package org.mases.knet.connect.sink; -import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.sink.SinkRecord; import org.apache.kafka.connect.sink.SinkTask; import org.apache.kafka.connect.sink.SinkTaskContext; @@ -48,15 +48,19 @@ public void setDataToExchange(Object dte) { dataToExchange = dte; } - public KNetSinkTask() throws ConfigException, JCException, IOException { + public KNetSinkTask() throws ConnectException, JCException, IOException { super(); long taskid = taskId.incrementAndGet(); JCOBridge.RegisterJVMGlobal(String.format("KNetSinkTask_%d", taskid), this); JCObject sink = KNetConnectProxy.getSinkConnector(); - if (sink == null) throw new ConfigException("getSinkConnector returned null."); + if (sink == null) throw new ConnectException("getSinkConnector returned null."); sinkTask = (JCObject) sink.Invoke("AllocateTask", taskid); } + public SinkTaskContext getContext() { + return context; + } + @Override public String version() { try { diff --git a/src/java/knet/src/main/java/org/mases/knet/connect/source/KNetSourceConnector.java b/src/java/knet/src/main/java/org/mases/knet/connect/source/KNetSourceConnector.java index 669c033c6f..28a141e29a 100644 --- a/src/java/knet/src/main/java/org/mases/knet/connect/source/KNetSourceConnector.java +++ b/src/java/knet/src/main/java/org/mases/knet/connect/source/KNetSourceConnector.java @@ -19,9 +19,10 @@ package org.mases.knet.connect.source; import org.apache.kafka.common.config.ConfigDef; -import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.connect.connector.Task; +import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.source.SourceConnector; +import org.apache.kafka.connect.source.SourceConnectorContext; import org.mases.jcobridge.*; import org.mases.knet.connect.KNetConnectProxy; import org.slf4j.Logger; @@ -48,18 +49,22 @@ public void setDataToExchange(Object dte) { dataToExchange = dte; } + public SourceConnectorContext getContext() { + return context(); + } + @Override public void start(Map props) { try { if (!KNetConnectProxy.initializeSourceConnector(props)) { log.error("Failed Invoke of \"initializeSourceConnector\""); - throw new ConfigException("Failed Invoke of \"initializeSourceConnector\""); + throw new ConnectException("Failed Invoke of \"initializeSourceConnector\""); } else { JCOBridge.RegisterJVMGlobal(registrationName, this); try { dataToExchange = props; JCObject source = KNetConnectProxy.getSourceConnector(); - if (source == null) throw new ConfigException("getSourceConnector returned null."); + if (source == null) throw new ConnectException("getSourceConnector returned null."); source.Invoke("StartInternal"); } finally { dataToExchange = null; @@ -83,7 +88,7 @@ public List> taskConfigs(int maxTasks) { try { dataToExchange = config; JCObject source = KNetConnectProxy.getSourceConnector(); - if (source == null) throw new ConfigException("getSourceConnector returned null."); + if (source == null) throw new ConnectException("getSourceConnector returned null."); source.Invoke("TaskConfigsInternal", i); } catch (JCException | IOException jcne) { log.error("Failed Invoke of \"start\"", jcne); @@ -100,7 +105,7 @@ public void stop() { try { try { JCObject source = KNetConnectProxy.getSourceConnector(); - if (source == null) throw new ConfigException("getSourceConnector returned null."); + if (source == null) throw new ConnectException("getSourceConnector returned null."); source.Invoke("StopInternal"); } finally { JCOBridge.UnregisterJVMGlobal(registrationName); diff --git a/src/java/knet/src/main/java/org/mases/knet/connect/source/KNetSourceTask.java b/src/java/knet/src/main/java/org/mases/knet/connect/source/KNetSourceTask.java index b57bfd60f4..8d9e26df12 100644 --- a/src/java/knet/src/main/java/org/mases/knet/connect/source/KNetSourceTask.java +++ b/src/java/knet/src/main/java/org/mases/knet/connect/source/KNetSourceTask.java @@ -18,9 +18,10 @@ package org.mases.knet.connect.source; -import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.source.SourceRecord; import org.apache.kafka.connect.source.SourceTask; +import org.apache.kafka.connect.source.SourceTaskContext; import org.mases.jcobridge.*; import org.mases.knet.connect.KNetConnectProxy; import org.slf4j.Logger; @@ -47,15 +48,19 @@ public void setDataToExchange(Object dte) { dataToExchange = dte; } - public KNetSourceTask() throws ConfigException, JCException, IOException { + public KNetSourceTask() throws ConnectException, JCException, IOException { super(); long taskid = taskId.incrementAndGet(); JCOBridge.RegisterJVMGlobal(String.format("KNetSourceTask_%d", taskid), this); JCObject source = KNetConnectProxy.getSourceConnector(); - if (source == null) throw new ConfigException("getSourceConnector returned null."); + if (source == null) throw new ConnectException("getSourceConnector returned null."); sourceTask = (JCObject) source.Invoke("AllocateTask", taskid); } + public SourceTaskContext getContext() { + return context; + } + @Override public String version() { try { @@ -85,8 +90,13 @@ public void start(Map map) { @Override public List poll() throws InterruptedException { try { - Object result = sourceTask.Invoke("PollInternal"); - if (result != null) return (List) result; + try { + dataToExchange = null; + sourceTask.Invoke("PollInternal"); + if (dataToExchange != null) return (List) dataToExchange; + } finally { + dataToExchange = null; + } } catch (JCNativeException jcne) { log.error("Failed Invoke of \"poll\"", jcne); } diff --git a/src/net/Documentation/articles/actualstate.md b/src/net/Documentation/articles/actualstate.md index 8643eb53ca..2c10ece0e0 100644 --- a/src/net/Documentation/articles/actualstate.md +++ b/src/net/Documentation/articles/actualstate.md @@ -7,6 +7,6 @@ This release comes with some ready made classes: * [X] Apache Kafka Admin Client covering all available APIs: since many classes are marked with @InterfaceStability.Evolving annotation some properties or methods can be missed; use **dynamic** code to interact with Admin API types. * [X] Almost completed Apache Kafka Streams * [X] Almost completed Apache Kafka Connect -* [ ] .NET Apache Kafka Connect SDK (under development) +* [X] .NET Apache Kafka Connect SDK (a basic version) If something is not available use [API extensibility](API_extensibility.md) to cover missing features. diff --git a/src/net/Documentation/articles/connectSDK.md b/src/net/Documentation/articles/connectSDK.md new file mode 100644 index 0000000000..6bdb2a11ea --- /dev/null +++ b/src/net/Documentation/articles/connectSDK.md @@ -0,0 +1,47 @@ +# KNet: Connect SDK + +This is only a quick start guide, many other information related to Apache Kafka Connect can be found at the following link https://kafka.apache.org/documentation/#connect + +## General + +To start a Connect session the user shall use the [KNet CLI](usageCLI.md). + +The commands related to Apache Kafka Connect are: +- ConnectDistributed +- ConnectStandalone + +To go in detail look at https://kafka.apache.org/documentation/#connect and https://kafka.apache.org/quickstart#quickstart_kafkaconnect. + +## Standalone + +In this guide we focus on the standalone version. +The guide start from the assumption that an assembly was generated: see [Template Usage Guide](usageTemplates.md). +Put the assembly within a folder (__C:\MyConnect__), then go within it. +As explained in https://kafka.apache.org/documentation/#connect Apache Kafka Connect needs at least one configuration file, in standalone mode it needs two configuration files: +1. The first file is **connect-standalone.properties** (or **connect-distributed.properties** for distributed environments): this file contains configuration information for Apache Kafka Connect; +2. The second optional file defines the connector to use and its options. + +In the [config folder](https://github.com/masesgroup/KNet/tree/master/src/config) the user can found many configuration files. +The files named **connect-knet-sink.properties** and **connect-knet-source.properties** contain examples for sink and source connectors. + +Copy within __C:\MyConnect__ **connect-standalone.properties** and update it especially on line containing __bootstrap.servers__, then copy **connect-knet-sink.properties** or **connect-knet-source.properties** depending on the connector type. +The main options related to KNet Connect SDK are: +- __connector.class=**KNetSinkConnector**__ where the value is the connector Java class and must be: + - __KNetSinkConnector__ for sink connectors + - __KNetSourceConnector__ for source connectors +- __knet.dotnet.classname=MASES.KNetTemplate.KNetConnect.KNetConnectSink, knetConnectSink__ where the value is the .NET class name in the form of __**FullName**, **AssemblyName**__ + +When the __C:\MyConnect__ folder contains all the files it is possible to run Apache Kafka Connect: + +> +> knet -ClassToRun ConnectStandalone connect-standalone.properties connect-knet-sink.properties +> + +## Distributed + +As stated in [Apache Kafka Connect Guide](https://kafka.apache.org/documentation/#connect ) the distributed version does not use the connector file definition, instead it shall be managed using the REST interface. +The start-up command within __C:\MyConnect__ folder becomes: + +> +> knet -ClassToRun ConnectDistributed connect-distributed.properties +> diff --git a/src/net/Documentation/articles/toc.yml b/src/net/Documentation/articles/toc.yml index 892d11886d..22864a6872 100644 --- a/src/net/Documentation/articles/toc.yml +++ b/src/net/Documentation/articles/toc.yml @@ -1,18 +1,22 @@ - name: Introduction href: intro.md -- name: Roadmap +- name: KNet Roadmap href: roadmap.md -- name: Actual state +- name: KNet Actual state href: actualstate.md -- name: Performance +- name: KNet Performance href: performance.md -- name: APIs extendibility +- name: KNet APIs extendibility href: API_extensibility.md -- name: JVM callbacks +- name: KNet JVM callbacks href: jvm_callbacks.md -- name: Usage +- name: KNet Connect SDK + href: connectSDK.md +- name: KNet Usage href: usage.md -- name: Usage CLI +- name: KNet CLI Usage href: usageCLI.md +- name: KNet Template Usage Guide + href: usageTemplates.md - name: How to build from scratch href: howtobuild.md \ No newline at end of file diff --git a/src/net/Documentation/articles/usageTemplates.md b/src/net/Documentation/articles/usageTemplates.md new file mode 100644 index 0000000000..033cb01b92 --- /dev/null +++ b/src/net/Documentation/articles/usageTemplates.md @@ -0,0 +1,38 @@ +# KNet: Template Usage Guide + +For more information related to .NET templates look at https://docs.microsoft.com/en-us/dotnet/core/tools/dotnet-new-sdk-templates. + +## Installation + +To install the templates executes the following command within a command shell: + +> +> dotnet new --install MASES.KNet.Templates +> + +The command installs the latest version and on success will list all templates added to the list of available templates. +They are: +1. knetConsumerApp: a project to create a consumer application for Apache Kafka +2. knetPipeStreamApp: a project to create a pipe stream application for Apache Kafka Streams +3. knetProducerApp: a project to create a producer application for Apache Kafka +4. knetConnectSink: a project to create a library which conforms to an Apache Kafka Connect Sink Connector written in .NET +5. knetConnectSource: a project to create a library which conforms to an Apache Kafka Connect Source Connector written in .NET + +## Simple usage + +The first three templates are ready made project with enough code to use them directly. +To use one of the available templates run the following command: + +> +> dotnet new knetConsumerApp +> + +the previous command will create a .NET project for an executable. The user can modify the code or just execute it against an Apache Kafka server. + +## SDK templates + +The last two templates (knetConnectSink, knetConnectSource) are not ready made project: they are skeletons for Apache Kafka Connect Source/Sink Connector written in .NET. +The available code does not do anything: the functions in the code shall be filled to obtain some results. + +With the available code the user can verify how an Apache Kafka Connect Source/Sink Connector, written in .NET, works; to do this the projects can be compiled to obtain an assembly. +See [Connect SDK](connectSDK.md) for some information on how use it. diff --git a/src/net/KNet/ClientSide/BridgedClasses/Connect/Data/ConnectSchema.cs b/src/net/KNet/ClientSide/BridgedClasses/Connect/Data/ConnectSchema.cs index b9ec3caa81..804af1f617 100644 --- a/src/net/KNet/ClientSide/BridgedClasses/Connect/Data/ConnectSchema.cs +++ b/src/net/KNet/ClientSide/BridgedClasses/Connect/Data/ConnectSchema.cs @@ -17,7 +17,6 @@ */ using Java.Util; -using JavaLang = Java.Lang; namespace MASES.KNet.Connect.Data { @@ -31,12 +30,12 @@ public ConnectSchema() { } - public ConnectSchema(Type type, bool optional, JavaLang.Object defaultValue, string name, int version, string doc, Map parameters, List fields, Schema keySchema, Schema valueSchema) + public ConnectSchema(Type type, bool optional, Java.Lang.Object defaultValue, string name, int version, string doc, Map parameters, List fields, Schema keySchema, Schema valueSchema) : base(type, optional, defaultValue, name, version, doc, parameters, fields, keySchema, valueSchema) { } - public ConnectSchema(Type type, bool optional, JavaLang.Object defaultValue, string name, int version, string doc) + public ConnectSchema(Type type, bool optional, Java.Lang.Object defaultValue, string name, int version, string doc) :this(type, optional, defaultValue, name, version, doc, null, null, null, null) { } diff --git a/src/net/KNet/ClientSide/BridgedClasses/Connect/Data/Schema.cs b/src/net/KNet/ClientSide/BridgedClasses/Connect/Data/Schema.cs index a60b12d4a9..8c32a26a95 100644 --- a/src/net/KNet/ClientSide/BridgedClasses/Connect/Data/Schema.cs +++ b/src/net/KNet/ClientSide/BridgedClasses/Connect/Data/Schema.cs @@ -18,7 +18,6 @@ using MASES.JCOBridge.C2JBridge; using Java.Util; -using JavaLang = Java.Lang; namespace MASES.KNet.Connect.Data { @@ -92,7 +91,7 @@ public interface ISchema : IJVMBridgeBase bool IsOptional { get; } - JavaLang.Object DefaultValue(); + Java.Lang.Object DefaultValue(); string Name(); @@ -115,6 +114,8 @@ public interface ISchema : IJVMBridgeBase public class Schema : JVMBridgeBase, ISchema { + public override bool IsInterface => true; + public override string ClassName => "org.apache.kafka.connect.data.Schema"; [System.Obsolete("This is not public in Apache Kafka API")] @@ -128,19 +129,39 @@ public Schema(params object[] args) { } - public Type Type() { return IExecute("type"); } + public static Schema INT8_SCHEMA => SExecute("INT8_SCHEMA"); + public static Schema INT16_SCHEMA => SExecute("INT16_SCHEMA"); + public static Schema INT32_SCHEMA => SExecute("INT32_SCHEMA"); + public static Schema INT64_SCHEMA => SExecute("INT64_SCHEMA"); + public static Schema FLOAT32_SCHEMA => SExecute("FLOAT32_SCHEMA"); + public static Schema FLOAT64_SCHEMA => SExecute("FLOAT64_SCHEMA"); + public static Schema BOOLEAN_SCHEMA => SExecute("BOOLEAN_SCHEMA"); + public static Schema STRING_SCHEMA => SExecute("STRING_SCHEMA"); + public static Schema BYTES_SCHEMA => SExecute("BYTES_SCHEMA"); + + public static Schema OPTIONAL_INT8_SCHEMA => SExecute("OPTIONAL_INT8_SCHEMA"); + public static Schema OPTIONAL_INT16_SCHEMA => SExecute("OPTIONAL_INT16_SCHEMA"); + public static Schema OPTIONAL_INT32_SCHEMA => SExecute("OPTIONAL_INT32_SCHEMA"); + public static Schema OPTIONAL_INT64_SCHEMA => SExecute("OPTIONAL_INT64_SCHEMA"); + public static Schema OPTIONAL_FLOAT32_SCHEMA => SExecute("OPTIONAL_FLOAT32_SCHEMA"); + public static Schema OPTIONAL_FLOAT64_SCHEMA => SExecute("OPTIONAL_FLOAT64_SCHEMA"); + public static Schema OPTIONAL_BOOLEAN_SCHEMA => SExecute("OPTIONAL_BOOLEAN_SCHEMA"); + public static Schema OPTIONAL_STRING_SCHEMA => SExecute("OPTIONAL_STRING_SCHEMA"); + public static Schema OPTIONAL_BYTES_SCHEMA => SExecute("OPTIONAL_BYTES_SCHEMA"); + + public Type Type() => IExecute("type"); public bool IsOptional => IExecute("isOptional"); - public JavaLang.Object DefaultValue() { return IExecute("defaultValue"); } + public Java.Lang.Object DefaultValue() => IExecute("defaultValue"); - public string Name() { return IExecute("name"); } + public string Name() => IExecute("name"); - public int Version() { return IExecute("version"); } + public int Version() => IExecute("version"); - public string Doc() { return IExecute("doc"); } + public string Doc() => IExecute("doc"); - public Map Parameters() { return IExecute>("parameters"); } + public Map Parameters() => IExecute>("parameters"); public Schema KeySchema => IExecute("keySchema"); diff --git a/src/net/KNet/ClientSide/BridgedClasses/Connect/Data/SchemaAndValue.cs b/src/net/KNet/ClientSide/BridgedClasses/Connect/Data/SchemaAndValue.cs index fa9f41cc40..8159beae6e 100644 --- a/src/net/KNet/ClientSide/BridgedClasses/Connect/Data/SchemaAndValue.cs +++ b/src/net/KNet/ClientSide/BridgedClasses/Connect/Data/SchemaAndValue.cs @@ -17,7 +17,6 @@ */ using MASES.JCOBridge.C2JBridge; -using JavaLang = Java.Lang; namespace MASES.KNet.Connect.Data { @@ -31,13 +30,13 @@ public SchemaAndValue() { } - public SchemaAndValue(Schema schema, JavaLang.Object value) + public SchemaAndValue(Schema schema, Java.Lang.Object value) :base(schema, value) { } public Schema Schema => IExecute("schema"); - public JavaLang.Object Value => IExecute("value"); + public Java.Lang.Object Value => IExecute("value"); } } diff --git a/src/net/KNet/ClientSide/BridgedClasses/Connect/Data/SchemaBuilder.cs b/src/net/KNet/ClientSide/BridgedClasses/Connect/Data/SchemaBuilder.cs index a644168e19..fd2bc3be77 100644 --- a/src/net/KNet/ClientSide/BridgedClasses/Connect/Data/SchemaBuilder.cs +++ b/src/net/KNet/ClientSide/BridgedClasses/Connect/Data/SchemaBuilder.cs @@ -22,6 +22,8 @@ namespace MASES.KNet.Connect.Data { public class SchemaBuilder : Schema { + public override bool IsInterface => false; + public override string ClassName => "org.apache.kafka.connect.data.SchemaBuilder"; [System.Obsolete("This is not public in Apache Kafka API")] diff --git a/src/net/KNet/ClientSide/BridgedClasses/Connect/Data/SchemaProjector.cs b/src/net/KNet/ClientSide/BridgedClasses/Connect/Data/SchemaProjector.cs index e38fa86dc4..29fcc71e34 100644 --- a/src/net/KNet/ClientSide/BridgedClasses/Connect/Data/SchemaProjector.cs +++ b/src/net/KNet/ClientSide/BridgedClasses/Connect/Data/SchemaProjector.cs @@ -17,7 +17,6 @@ */ using MASES.JCOBridge.C2JBridge; -using JavaLang = Java.Lang; namespace MASES.KNet.Connect.Data { @@ -25,9 +24,9 @@ public class SchemaProjector : JVMBridgeBase { public override string ClassName => "org.apache.kafka.connect.data.SchemaProjector"; - public static JavaLang.Object Project(Schema source, JavaLang.Object record, Schema target) + public static Java.Lang.Object Project(Schema source, Java.Lang.Object record, Schema target) { - return SExecute("project", source, record, target); + return SExecute("project", source, record, target); } } } diff --git a/src/net/KNet/ClientSide/BridgedClasses/Connect/Data/Struct.cs b/src/net/KNet/ClientSide/BridgedClasses/Connect/Data/Struct.cs index 39001c16ea..cf745a7eda 100644 --- a/src/net/KNet/ClientSide/BridgedClasses/Connect/Data/Struct.cs +++ b/src/net/KNet/ClientSide/BridgedClasses/Connect/Data/Struct.cs @@ -18,7 +18,6 @@ using MASES.JCOBridge.C2JBridge; using Java.Util; -using JavaLang = Java.Lang; namespace MASES.KNet.Connect.Data { @@ -37,19 +36,19 @@ public Struct(Schema schema) public Schema Schema => IExecute("schema"); - public JavaLang.Object Get(string fieldName) + public Java.Lang.Object Get(string fieldName) { - return IExecute("get", fieldName); + return IExecute("get", fieldName); } - public JavaLang.Object Get(Field field) + public Java.Lang.Object Get(Field field) { - return IExecute("get", field); + return IExecute("get", field); } - public JavaLang.Object GetWithoutDefault(string fieldName) + public Java.Lang.Object GetWithoutDefault(string fieldName) { - return IExecute("getWithoutDefault", fieldName); + return IExecute("getWithoutDefault", fieldName); } public byte GetInt8(string fieldName) @@ -112,12 +111,12 @@ public Struct GetStruct(string fieldName) return IExecute("getStruct", fieldName); } - public Struct Put(string fieldName, JavaLang.Object value) + public Struct Put(string fieldName, Java.Lang.Object value) { return IExecute("put", fieldName, value); } - public Struct Put(Field field, JavaLang.Object value) + public Struct Put(Field field, Java.Lang.Object value) { return IExecute("put", field, value); } diff --git a/src/net/KNet/ClientSide/BridgedClasses/Connect/Data/Values.cs b/src/net/KNet/ClientSide/BridgedClasses/Connect/Data/Values.cs index 7865e8101b..11db47f8fd 100644 --- a/src/net/KNet/ClientSide/BridgedClasses/Connect/Data/Values.cs +++ b/src/net/KNet/ClientSide/BridgedClasses/Connect/Data/Values.cs @@ -19,7 +19,6 @@ using MASES.JCOBridge.C2JBridge; using Java.Math; using Java.Util; -using JavaLang = Java.Lang; namespace MASES.KNet.Connect.Data { @@ -27,82 +26,82 @@ public class Values : JVMBridgeBase { public override string ClassName => "org.apache.kafka.connect.data.Values"; - public static bool ConvertToBoolean(Schema schema, JavaLang.Object value) + public static bool ConvertToBoolean(Schema schema, Java.Lang.Object value) { return SExecute("convertToBoolean", schema, value); } - public static byte ConvertToByte(Schema schema, JavaLang.Object value) + public static byte ConvertToByte(Schema schema, Java.Lang.Object value) { return SExecute("convertToByte", schema, value); } - public static short ConvertToShort(Schema schema, JavaLang.Object value) + public static short ConvertToShort(Schema schema, Java.Lang.Object value) { return SExecute("convertToShort", schema, value); } - public static int ConvertToInteger(Schema schema, JavaLang.Object value) + public static int ConvertToInteger(Schema schema, Java.Lang.Object value) { return SExecute("convertToInteger", schema, value); } - public static long ConvertToLong(Schema schema, JavaLang.Object value) + public static long ConvertToLong(Schema schema, Java.Lang.Object value) { return SExecute("convertToLong", schema, value); } - public static float ConvertToFloat(Schema schema, JavaLang.Object value) + public static float ConvertToFloat(Schema schema, Java.Lang.Object value) { return SExecute("convertToFloat", schema, value); } - public static double ConvertToDouble(Schema schema, JavaLang.Object value) + public static double ConvertToDouble(Schema schema, Java.Lang.Object value) { return SExecute("convertToDouble", schema, value); } - public static string ConvertToString(Schema schema, JavaLang.Object value) + public static string ConvertToString(Schema schema, Java.Lang.Object value) { return SExecute("convertToString", schema, value); } - public static List ConvertToList(Schema schema, JavaLang.Object value) + public static List ConvertToList(Schema schema, Java.Lang.Object value) { return SExecute("convertToList", schema, value); } - public static Map ConvertToMap(Schema schema, JavaLang.Object value) + public static Map ConvertToMap(Schema schema, Java.Lang.Object value) { return SExecute("convertToMap", schema, value); } - public static Struct ConvertToStruct(Schema schema, JavaLang.Object value) + public static Struct ConvertToStruct(Schema schema, Java.Lang.Object value) { return SExecute("convertToStruct", schema, value); } - public static Java.Util.Date ConvertToTime(Schema schema, JavaLang.Object value) + public static Java.Util.Date ConvertToTime(Schema schema, Java.Lang.Object value) { return SExecute("convertToTime", schema, value); } - public static Java.Util.Date ConvertToDate(Schema schema, JavaLang.Object value) + public static Java.Util.Date ConvertToDate(Schema schema, Java.Lang.Object value) { return SExecute("convertToDate", schema, value); } - public static Java.Util.Date ConvertToTimestamp(Schema schema, JavaLang.Object value) + public static Java.Util.Date ConvertToTimestamp(Schema schema, Java.Lang.Object value) { return SExecute("convertToTimestamp", schema, value); } - public static BigDecimal ConvertToDecimal(Schema schema, JavaLang.Object value, int scale) + public static BigDecimal ConvertToDecimal(Schema schema, Java.Lang.Object value, int scale) { return SExecute("convertToDecimal", schema, value, scale); } - public static Schema InferSchema(JavaLang.Object value) + public static Schema InferSchema(Java.Lang.Object value) { return SExecute("inferSchema", value); } diff --git a/src/net/KNet/ClientSide/BridgedClasses/Connect/KNetConnectProxy.cs b/src/net/KNet/ClientSide/BridgedClasses/Connect/KNetConnectProxy.cs index 27213c54b9..9bf07a0dcf 100644 --- a/src/net/KNet/ClientSide/BridgedClasses/Connect/KNetConnectProxy.cs +++ b/src/net/KNet/ClientSide/BridgedClasses/Connect/KNetConnectProxy.cs @@ -20,6 +20,9 @@ namespace MASES.KNet.Connect { + /// + /// Internal class used to proxy and pairs data exchange with Java side + /// public class KNetConnectProxy { static readonly object globalInstanceLock = new(); @@ -27,7 +30,9 @@ public class KNetConnectProxy static KNetConnector SinkConnector = null; static KNetConnector SourceConnector = null; - + /// + /// Register the proxy + /// public static void Register() { lock (globalInstanceLock) @@ -39,7 +44,11 @@ public static void Register() } } } - + /// + /// Allocates a sink connector + /// + /// The class name read from Java within the configuration parameters + /// if successfully public bool AllocateSinkConnector(string connectorClassName) { lock (globalInstanceLock) @@ -57,7 +66,11 @@ public bool AllocateSinkConnector(string connectorClassName) } } } - + /// + /// Allocates a source connector + /// + /// The class name read from Java within the configuration parameters + /// if successfully public bool AllocateSourceConnector(string connectorClassName) { lock (globalInstanceLock) @@ -75,7 +88,10 @@ public bool AllocateSourceConnector(string connectorClassName) } } } - + /// + /// Returns the registration name of the sink connector + /// + /// The content of public string SinkConnectorName() { lock (globalInstanceLock) @@ -84,7 +100,10 @@ public string SinkConnectorName() return null; } } - + /// + /// Returns the registration name of the sourcce connector + /// + /// The content of public string SourceConnectorName() { lock (globalInstanceLock) diff --git a/src/net/KNet/ClientSide/BridgedClasses/Connect/KNetConnector.cs b/src/net/KNet/ClientSide/BridgedClasses/Connect/KNetConnector.cs index b764f8aed5..0079de2926 100644 --- a/src/net/KNet/ClientSide/BridgedClasses/Connect/KNetConnector.cs +++ b/src/net/KNet/ClientSide/BridgedClasses/Connect/KNetConnector.cs @@ -26,36 +26,97 @@ namespace MASES.KNet.Connect { + /// + /// Specific implementation of to support KNet Connect SDK + /// public interface IKNetConnector : IConnector { + /// + /// Allocates a task object based on + /// + /// The unique id generated from JAva side + /// The local .NET object object AllocateTask(long taskId); - + /// + /// The unique name of the connector + /// string ConnectorName { get; } - + /// + /// The of task to be allocated, it shall inherits from + /// Type TaskClassType { get; } + /// + /// Invoked during allocation of tasks from Apache Kafka Connect + /// + /// The actual index + /// The to be filled in with properties for the task: the same will be received from + void TaskConfigs(int index, Map config); } - + /// + /// The generic class which is the base of both source or sink connectors + /// public abstract class KNetConnector : IKNetConnector { - protected ConnectorContext ctx; + /// + /// The set of allocated with their associated identifiers + /// protected ConcurrentDictionary taskDictionary = new(); IJavaObject reflectedConnector = null; - + /// + /// Initializer + /// public KNetConnector() { - KNetCore.GlobalInstance.RegisterCLRGlobal(ReflectedConnectorName, this); + KNetCore.GlobalInstance.RegisterCLRGlobal(ReflectedConnectorClassName, this); } - + /// + /// An helper function to read the data from Java side + /// + /// The expected return + /// The + /// protected T DataToExchange() { if (reflectedConnector == null) { - reflectedConnector = KNetCore.GlobalInstance.GetJVMGlobal(ReflectedConnectorName); + reflectedConnector = KNetCore.GlobalInstance.GetJVMGlobal(ReflectedConnectorClassName); } - return (reflectedConnector != null) ? reflectedConnector.Invoke("getDataToExchange") : throw new InvalidOperationException($"{ReflectedConnectorName} was not registered in global JVM"); + return (reflectedConnector != null) ? reflectedConnector.Invoke("getDataToExchange") : throw new InvalidOperationException($"{ReflectedConnectorClassName} was not registered in global JVM"); } - + /// + /// An helper function to read the data from Java side + /// + /// The expected return + /// The + /// + protected void DataToExchange(object data) + { + if (reflectedConnector != null) + { + JCOBridge.C2JBridge.IJVMBridgeBase jvmBBD = data as JCOBridge.C2JBridge.IJVMBridgeBase; + reflectedConnector.Invoke("setDataToExchange", jvmBBD != null ? jvmBBD.Instance : data); + } + else + { + throw new InvalidOperationException($"{ReflectedConnectorClassName} was not registered in global JVM"); + } + } + /// + /// An helper function to read the data from Java side + /// + /// The expected return + /// The + /// + protected T Context() + { + if (reflectedConnector == null) + { + reflectedConnector = KNetCore.GlobalInstance.GetJVMGlobal(ReflectedConnectorClassName); + } + return (reflectedConnector != null) ? reflectedConnector.Invoke("getContext") : throw new InvalidOperationException($"{ReflectedConnectorClassName} was not registered in global JVM"); + } + /// public object AllocateTask(long taskId) { return taskDictionary.GetOrAdd(taskId, (id) => @@ -65,56 +126,75 @@ public object AllocateTask(long taskId) return knetTask; }); } - - public abstract string ReflectedConnectorName { get; } - + /// + /// The unique name used to map objects between JVM and .NET + /// + public abstract string ReflectedConnectorClassName { get; } + /// public abstract string ConnectorName { get; } - + /// public abstract Type TaskClassType { get; } public void Initialize(ConnectorContext ctx) => throw new NotImplementedException("Invoked in Java before any initialization."); public void Initialize(ConnectorContext ctx, List> taskConfigs) => throw new NotImplementedException("Invoked in Java before any initialization."); - + /// + /// Public method used from Java to trigger + /// public void StartInternal() { Map props = DataToExchange>(); Start(props); } - + /// + /// Implement the method to execute the start action + /// + /// The set of properties returned from Apache Kafka Connect framework: the contains the same info from configuration file. public abstract void Start(Map props); - public void Reconfigure(Map props) { } + public void Reconfigure(Map props) => throw new NotImplementedException("Invoked in Java before any initialization."); public Class TaskClass() => throw new NotImplementedException("Invoked in Java before any initialization."); - + /// + /// Public method used from Java to trigger + /// public void TaskConfigsInternal(int index) { Map props = DataToExchange>(); TaskConfigs(index, props); } - + /// public abstract void TaskConfigs(int index, Map config); public List> TaskConfigs(int maxTasks) => throw new NotImplementedException("Invoked using the other signature."); - + /// + /// Public method used from Java to trigger + /// public void StopInternal() { Stop(); } - + /// + /// Implement the method to execute the stop action + /// public abstract void Stop(); - public object ValidateInternal(object connectorConfigsObj) - { - Map connectorConfigs = DataToExchange>(); - return Validate(connectorConfigs); - } - - public abstract Config Validate(Map connectorConfigs); + public Config Validate(Map connectorConfigs) => throw new NotImplementedException("Invoked in Java before any initialization."); public ConfigDef Config() => throw new NotImplementedException("Invoked in Java before any initialization."); public string Version() => throw new NotImplementedException("Invoked in Java before any initialization."); } + /// + /// The base connector class which is the base of both source or sink connectors and receives information about implementing class with + /// + /// The class which extends + public abstract class KNetConnector : KNetConnector + where TConnector : KNetConnector + { + /// + /// Set the of the connector to the value defined from + /// + public override string ConnectorName => typeof(TConnector).FullName; + } } diff --git a/src/net/KNet/ClientSide/BridgedClasses/Connect/KNetSinkConnector.cs b/src/net/KNet/ClientSide/BridgedClasses/Connect/KNetSinkConnector.cs index 90c0eaf072..a63f5458a7 100644 --- a/src/net/KNet/ClientSide/BridgedClasses/Connect/KNetSinkConnector.cs +++ b/src/net/KNet/ClientSide/BridgedClasses/Connect/KNetSinkConnector.cs @@ -16,14 +16,31 @@ * Refer to LICENSE for more information. */ +using MASES.KNet.Connect.Sink; using System; namespace MASES.KNet.Connect { - public abstract class KNetSinkConnector : KNetConnector where TTask : KNetSinkTask + /// + /// An implementation of for sink connectors + /// + /// The connector class inherited from + /// The task class inherited from + public abstract class KNetSinkConnector : KNetConnector + where TSinkConnector : KNetSinkConnector + where TTask : KNetSinkTask { - public sealed override string ReflectedConnectorName => "KNetSinkConnector"; - + /// + /// The + /// + public SinkConnectorContext Context => Context(); + /// + /// Set the of the connector to a fixed value + /// + public sealed override string ReflectedConnectorClassName => "KNetSinkConnector"; + /// + /// Set the of the connector to the value defined from + /// public sealed override Type TaskClassType => typeof(TTask); } } diff --git a/src/net/KNet/ClientSide/BridgedClasses/Connect/KNetSinkTask.cs b/src/net/KNet/ClientSide/BridgedClasses/Connect/KNetSinkTask.cs index c9b82f0024..28239307bb 100644 --- a/src/net/KNet/ClientSide/BridgedClasses/Connect/KNetSinkTask.cs +++ b/src/net/KNet/ClientSide/BridgedClasses/Connect/KNetSinkTask.cs @@ -21,16 +21,33 @@ namespace MASES.KNet.Connect { - public abstract class KNetSinkTask : KNetTask + /// + /// An implementation of for sink task + /// + /// The class which extends + public abstract class KNetSinkTask : KNetTask + where TTask : KNetSinkTask { + /// + /// The + /// + public SinkTaskContext Context => Context(); + /// + /// Set the of the connector to a fixed value + /// public override string ReflectedTaskClassName => "KNetSinkTask"; - + /// + /// Public method used from Java to trigger + /// public void PutInternal() { Collection collection = DataToExchange>(); Put(collection); } - + /// + /// Implement the method to execute the Put action + /// + /// The set of from Apache Kafka Connect framework public abstract void Put(Collection collection); } } diff --git a/src/net/KNet/ClientSide/BridgedClasses/Connect/KNetSourceConnector.cs b/src/net/KNet/ClientSide/BridgedClasses/Connect/KNetSourceConnector.cs index 028358b882..8fa16bebdd 100644 --- a/src/net/KNet/ClientSide/BridgedClasses/Connect/KNetSourceConnector.cs +++ b/src/net/KNet/ClientSide/BridgedClasses/Connect/KNetSourceConnector.cs @@ -16,14 +16,31 @@ * Refer to LICENSE for more information. */ +using MASES.KNet.Connect.Source; using System; namespace MASES.KNet.Connect { - public abstract class KNetSourceConnector : KNetConnector where TTask : KNetSourceTask + /// + /// An implementation of for source connectors + /// + /// The connector class inherited from + /// The task class inherited from + public abstract class KNetSourceConnector : KNetConnector + where TSourceConnector : KNetSourceConnector + where TTask : KNetSourceTask { - public sealed override string ReflectedConnectorName => "KNetSourceConnector"; - + /// + /// The + /// + public SourceConnectorContext Context => Context(); + /// + /// Set the of the connector to a fixed value + /// + public sealed override string ReflectedConnectorClassName => "KNetSourceConnector"; + /// + /// Set the of the connector to the value defined from + /// public sealed override Type TaskClassType => typeof(TTask); } } diff --git a/src/net/KNet/ClientSide/BridgedClasses/Connect/KNetSourceTask.cs b/src/net/KNet/ClientSide/BridgedClasses/Connect/KNetSourceTask.cs index 1149505862..13ef328b90 100644 --- a/src/net/KNet/ClientSide/BridgedClasses/Connect/KNetSourceTask.cs +++ b/src/net/KNet/ClientSide/BridgedClasses/Connect/KNetSourceTask.cs @@ -21,15 +21,37 @@ namespace MASES.KNet.Connect { - public abstract class KNetSourceTask : KNetTask + /// + /// An implementation of for source task + /// + /// The class which extends + public abstract class KNetSourceTask : KNetTask + where TTask : KNetSourceTask { - public override string ReflectedTaskClassName => "KNetSourceTask"; + protected Map OffsetForKey(string key, V value) => Collections.SingletonMap(key, value); + + protected Map OffsetAt(string key, V value) => Context.OffsetStorageReader.Offset(Collections.SingletonMap(key, value)); - public List PollInternal() + /// + /// The + /// + public SourceTaskContext Context => Context(); + /// + /// Set the of the connector to a fixed value + /// + public override string ReflectedTaskClassName => "KNetSourceTask"; + /// + /// Public method used from Java to trigger + /// + public void PollInternal() { - return Poll(); + var result = Poll(); + DataToExchange(result); } - + /// + /// Implement the method to execute the Poll action + /// + /// The list of to return to Apache Kafka Connect framework public abstract List Poll(); } } diff --git a/src/net/KNet/ClientSide/BridgedClasses/Connect/KNetTask.cs b/src/net/KNet/ClientSide/BridgedClasses/Connect/KNetTask.cs index a387a287c1..a5dcdc9818 100644 --- a/src/net/KNet/ClientSide/BridgedClasses/Connect/KNetTask.cs +++ b/src/net/KNet/ClientSide/BridgedClasses/Connect/KNetTask.cs @@ -23,13 +23,23 @@ namespace MASES.KNet.Connect { + /// + /// Specific implementation of to support KNet Connect SDK + /// public interface IKNetTask : ITask { + /// + /// The associated + /// IKNetConnector Connector { get; } - + /// + /// The id received during initialization + /// long TaskId { get; } } - + /// + /// The generic class which is the base of both source or sink task + /// public abstract class KNetTask : IKNetTask { IKNetConnector connector; @@ -42,38 +52,98 @@ internal void Initialize(IKNetConnector connector, long taskId) this.taskId = taskId; reflectedTask = KNetCore.GlobalInstance.GetJVMGlobal($"{ReflectedTaskClassName}_{taskId}"); } - + /// + /// An helper function to read the data from Java side + /// + /// The expected return + /// The + /// protected T DataToExchange() { return (reflectedTask != null) ? reflectedTask.Invoke("getDataToExchange") : throw new InvalidOperationException($"{ReflectedTaskClassName} was not registered in global JVM"); } - + /// + /// An helper function to read the data from Java side + /// + /// The expected return + /// The + /// + protected void DataToExchange(object data) + { + if (reflectedTask != null) + { + JCOBridge.C2JBridge.IJVMBridgeBase jvmBBD = data as JCOBridge.C2JBridge.IJVMBridgeBase; + reflectedTask.Invoke("setDataToExchange", jvmBBD != null ? jvmBBD.Instance : data); + } + else + { + throw new InvalidOperationException($"{ReflectedTaskClassName} was not registered in global JVM"); + } + } + /// + /// An helper function to read the data from Java side + /// + /// The expected return + /// The + /// + protected T Context() + { + return (reflectedTask != null) ? reflectedTask.Invoke("getContext") : throw new InvalidOperationException($"{ReflectedTaskClassName} was not registered in global JVM"); + } + /// public IKNetConnector Connector => connector; - + /// public long TaskId => taskId; - + /// + /// The unique name used to map objects between JVM and .NET + /// public abstract string ReflectedTaskClassName { get; } - + /// + /// Public method used from Java to trigger + /// public void StartInternal() { Map props = DataToExchange>(); Start(props); } - + /// + /// Implement the method to execute the start action + /// + /// The set of properties returned from Apache Kafka Connect framework: the contains the info from . public abstract void Start(Map props); - + /// + /// Public method used from Java to trigger + /// public void StopInternal() { Stop(); } - + /// + /// Implement the method to execute the stop action + /// public abstract void Stop(); - + /// + /// Public method used from Java to trigger + /// public object VersionInternal() { return Version(); } - + /// + /// Implement the method to execute the version action + /// public abstract string Version(); } + /// + /// The base task class which is the base of both source or sink task and receives information about implementing class with + /// + /// The class which extends + public abstract class KNetTask : KNetTask + where TTask : KNetTask + { + /// + /// Set the of the task to the value defined from + /// + public override string Version() => typeof(TTask).Assembly.GetName().Version.ToString(); + } } diff --git a/src/net/templates/templatepack.csproj b/src/net/templates/templatepack.csproj index 871d6b0b22..172ad65c7f 100644 --- a/src/net/templates/templatepack.csproj +++ b/src/net/templates/templatepack.csproj @@ -28,7 +28,7 @@ true false content - README.md + usageTemplates.md @@ -45,7 +45,7 @@ - + diff --git a/src/net/templates/templates/knetConnectSink/KNetConnectSink.cs b/src/net/templates/templates/knetConnectSink/KNetConnectSink.cs index de5b364c5b..5f0fb5d306 100644 --- a/src/net/templates/templates/knetConnectSink/KNetConnectSink.cs +++ b/src/net/templates/templates/knetConnectSink/KNetConnectSink.cs @@ -1,14 +1,11 @@ using Java.Util; -using MASES.KNet.Common.Config; using MASES.KNet.Connect; using MASES.KNet.Connect.Sink; namespace MASES.KNetTemplate.KNetConnect { - public class KNetConnectSink : KNetSinkConnector + public class KNetConnectSink : KNetSinkConnector { - public override string ConnectorName => "MASES.KNetTemplate.KNetConnect.KNetConnectSink"; - public override void Start(Map props) { @@ -23,14 +20,9 @@ public override void TaskConfigs(int index, Map config) { } - - public override Config Validate(Map connectorConfigs) - { - return null; - } } - public class KnetConnectSinkTask : KNetSinkTask + public class KNetConnectSinkTask : KNetSinkTask { public override void Put(Collection collection) { @@ -46,10 +38,5 @@ public override void Stop() { } - - public override string Version() - { - return "KnetConnectSinkTask"; - } } } diff --git a/src/net/templates/templates/knetConnectSource/KNetConnectSource.cs b/src/net/templates/templates/knetConnectSource/KNetConnectSource.cs index 450f94c4a6..a990a3173d 100644 --- a/src/net/templates/templates/knetConnectSource/KNetConnectSource.cs +++ b/src/net/templates/templates/knetConnectSource/KNetConnectSource.cs @@ -1,14 +1,11 @@ using Java.Util; -using MASES.KNet.Common.Config; using MASES.KNet.Connect; using MASES.KNet.Connect.Source; namespace MASES.KNetTemplate.KNetConnect { - public class KNetConnectSource : KNetSourceConnector + public class KNetConnectSource : KNetSourceConnector { - public override string ConnectorName => "MASES.KNetTemplate.KNetConnect.KNetConnectSource"; - public override void Start(Map props) { @@ -23,14 +20,9 @@ public override void TaskConfigs(int index, Map config) { } - - public override Config Validate(Map connectorConfigs) - { - return null; - } } - public class KnetConnectSourceTask : KNetSourceTask + public class KNetConnectSourceTask : KNetSourceTask { public override List Poll() { @@ -46,10 +38,5 @@ public override void Stop() { } - - public override string Version() - { - return "KnetConnectSourceTask"; - } } } diff --git a/src/net/templates/templates/knetConsumerApp/.template.config/template.json b/src/net/templates/templates/knetConsumerApp/.template.config/template.json index c958f01f35..b11533fe5b 100644 --- a/src/net/templates/templates/knetConsumerApp/.template.config/template.json +++ b/src/net/templates/templates/knetConsumerApp/.template.config/template.json @@ -1,7 +1,7 @@ { "$schema": "http://json.schemastore.org/template", "author": "MASES s.r.l.", - "classifications": [ "Common", "Console", "C#8" ], + "classifications": [ "Common", "Console" ], "identity": "MASES.KNetTemplate.KNetConsumer", "name": "Console templates: KNet Consumer project", "shortName": "knetConsumerApp", diff --git a/src/net/templates/templates/knetPipeStreamApp/.template.config/template.json b/src/net/templates/templates/knetPipeStreamApp/.template.config/template.json index 2fb158666d..27c0467d61 100644 --- a/src/net/templates/templates/knetPipeStreamApp/.template.config/template.json +++ b/src/net/templates/templates/knetPipeStreamApp/.template.config/template.json @@ -1,7 +1,7 @@ { "$schema": "http://json.schemastore.org/template", "author": "MASES s.r.l.", - "classifications": [ "Common", "Console", "C#8" ], + "classifications": [ "Common", "Console" ], "identity": "MASES.KNetTemplate.KNetPipeStream", "name": "Console templates: KNet Pipe Stream project", "shortName": "knetPipeStreamApp", diff --git a/src/net/templates/templates/knetProducerApp/.template.config/template.json b/src/net/templates/templates/knetProducerApp/.template.config/template.json index 3342f5e785..d0ecf3aaae 100644 --- a/src/net/templates/templates/knetProducerApp/.template.config/template.json +++ b/src/net/templates/templates/knetProducerApp/.template.config/template.json @@ -1,7 +1,7 @@ { "$schema": "http://json.schemastore.org/template", "author": "MASES s.r.l.", - "classifications": [ "Common", "Console", "C#8" ], + "classifications": [ "Common", "Console" ], "identity": "MASES.KNetTemplate.KNetProducer", "name": "Console templates: KNet Producer project", "shortName": "knetProducerApp", diff --git a/tests/KNetConnectTest/KnetConnectSink.cs b/tests/KNetConnectTest/KNetConnectSink.cs similarity index 72% rename from tests/KNetConnectTest/KnetConnectSink.cs rename to tests/KNetConnectTest/KNetConnectSink.cs index 014c46c69b..eedbd43f44 100644 --- a/tests/KNetConnectTest/KnetConnectSink.cs +++ b/tests/KNetConnectTest/KNetConnectSink.cs @@ -17,16 +17,13 @@ */ using Java.Util; -using MASES.KNet.Common.Config; using MASES.KNet.Connect; using MASES.KNet.Connect.Sink; namespace MASES.KNetConnectTest { - public class KnetSinkTestConnector : KNetSinkConnector + public class KNetSinkTestConnector : KNetSinkConnector { - public override string ConnectorName => "MASES.KNetConnectTest.KnetSinkTestConnector"; - public override void Start(Map props) { @@ -41,14 +38,9 @@ public override void TaskConfigs(int index, Map config) { } - - public override Config Validate(Map connectorConfigs) - { - return null; - } } - public class KnetSinkTestTask : KNetSinkTask + public class KNetSinkTestTask : KNetSinkTask { public override void Put(Collection collection) { @@ -64,10 +56,5 @@ public override void Stop() { } - - public override string Version() - { - return "KnetSinkTestTask"; - } } } diff --git a/tests/KNetConnectTest/KnetConnectSource.cs b/tests/KNetConnectTest/KNetConnectSource.cs similarity index 70% rename from tests/KNetConnectTest/KnetConnectSource.cs rename to tests/KNetConnectTest/KNetConnectSource.cs index 4b98c3699d..eefa06d031 100644 --- a/tests/KNetConnectTest/KnetConnectSource.cs +++ b/tests/KNetConnectTest/KNetConnectSource.cs @@ -17,16 +17,13 @@ */ using Java.Util; -using MASES.KNet.Common.Config; using MASES.KNet.Connect; using MASES.KNet.Connect.Source; namespace MASES.KNetConnectTest { - public class KnetSourceTestConnector : KNetSourceConnector + public class KNetSourceTestConnector : KNetSourceConnector { - public override string ConnectorName => "MASES.KNetConnectTest.KnetSourceTestConnector"; - public override void Start(Map props) { @@ -41,18 +38,15 @@ public override void TaskConfigs(int index, Map config) { } - - public override Config Validate(Map connectorConfigs) - { - return null; - } } - public class KnetSourceTestTask : KNetSourceTask + public class KNetSourceTestTask : KNetSourceTask { public override List Poll() { - return null; + ArrayList records = new(); + + return records; } public override void Start(Map props) @@ -64,10 +58,5 @@ public override void Stop() { } - - public override string Version() - { - return "KnetSourceTestTask"; - } } }