Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enhancement KNet Connect SDK #68

Merged
merged 13 commits into from
May 18, 2022
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,11 @@ Have a look at the following resources:
* [Roadmap](src/net/Documentation/articles/roadmap.md)
* [Actual state](src/net/Documentation/articles/actualstate.md)
* [Performance](src/net/Documentation/articles/performance.md)
* [Connect SDK](src/net/Documentation/articles/connectSDK.md)
* [KNet usage](src/net/Documentation/articles/usage.md)
* [KNet APIs extensibility](src/net/Documentation/articles/API_extensibility.md)
* [KNetCLI usage](src/net/Documentation/articles/usageCLI.md)
* [Template Usage Guide](src/net/Documentation/articles/usageTemplates.md)
* [How to build from scratch](src/net/Documentation/articles/howtobuild.md)

---
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,15 +55,15 @@ public static synchronized boolean initializeSinkConnector(Map<String, String> p
AbstractConfig parsedConfig = new AbstractConfig(CONFIG_DEF, props);
String className = parsedConfig.getString(DOTNET_CLASSNAME_CONFIG);
if (className == null)
throw new ConfigException("'classname' in KNetSinkConnector configuration requires a definition");
throw new ConfigException("'knet.dotnet.classname' in KNetSinkConnector configuration requires a definition");
return (boolean) getConnectProxy().Invoke("AllocateSinkConnector", className);
}

public static synchronized boolean initializeSourceConnector(Map<String, String> props) throws JCException, IOException {
AbstractConfig parsedConfig = new AbstractConfig(CONFIG_DEF, props);
String className = parsedConfig.getString(DOTNET_CLASSNAME_CONFIG);
if (className == null)
throw new ConfigException("'classname' in KNetSinkConnector configuration requires a definition");
throw new ConfigException("'knet.dotnet.classname' in KNetSourceConnector configuration requires a definition");
return (boolean) getConnectProxy().Invoke("AllocateSourceConnector", className);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@
package org.mases.knet.connect.sink;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.SinkConnector;
import org.apache.kafka.connect.sink.SinkConnectorContext;
import org.mases.jcobridge.*;
import org.mases.knet.connect.KNetConnectProxy;
import org.slf4j.Logger;
Expand All @@ -48,18 +49,22 @@ public void setDataToExchange(Object dte) {
dataToExchange = dte;
}

public SinkConnectorContext getContext() {
return context();
}

@Override
public void start(Map<String, String> props) {
try {
if (!KNetConnectProxy.initializeSinkConnector(props)) {
log.error("Failed Invoke of \"initializeSinkConnector\"");
throw new ConfigException("Failed Invoke of \"initializeSinkConnector\"");
throw new ConnectException("Failed Invoke of \"initializeSinkConnector\"");
} else {
JCOBridge.RegisterJVMGlobal(registrationName, this);
try {
dataToExchange = props;
JCObject sink = KNetConnectProxy.getSinkConnector();
if (sink == null) throw new ConfigException("getSinkConnector returned null.");
if (sink == null) throw new ConnectException("getSinkConnector returned null.");
sink.Invoke("StartInternal");
} finally {
dataToExchange = null;
Expand All @@ -83,7 +88,7 @@ public List<Map<String, String>> taskConfigs(int maxTasks) {
try {
dataToExchange = config;
JCObject sink = KNetConnectProxy.getSinkConnector();
if (sink == null) throw new ConfigException("getSinkConnector returned null.");
if (sink == null) throw new ConnectException("getSinkConnector returned null.");
sink.Invoke("TaskConfigsInternal", i);
} catch (JCException | IOException jcne) {
log.error("Failed Invoke of \"start\"", jcne);
Expand All @@ -100,7 +105,7 @@ public void stop() {
try {
try {
JCObject sink = KNetConnectProxy.getSinkConnector();
if (sink == null) throw new ConfigException("getSinkConnector returned null.");
if (sink == null) throw new ConnectException("getSinkConnector returned null.");
sink.Invoke("StopInternal");
} finally {
JCOBridge.UnregisterJVMGlobal(registrationName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

package org.mases.knet.connect.sink;

import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.apache.kafka.connect.sink.SinkTaskContext;
Expand Down Expand Up @@ -48,15 +48,19 @@ public void setDataToExchange(Object dte) {
dataToExchange = dte;
}

public KNetSinkTask() throws ConfigException, JCException, IOException {
public KNetSinkTask() throws ConnectException, JCException, IOException {
super();
long taskid = taskId.incrementAndGet();
JCOBridge.RegisterJVMGlobal(String.format("KNetSinkTask_%d", taskid), this);
JCObject sink = KNetConnectProxy.getSinkConnector();
if (sink == null) throw new ConfigException("getSinkConnector returned null.");
if (sink == null) throw new ConnectException("getSinkConnector returned null.");
sinkTask = (JCObject) sink.Invoke("AllocateTask", taskid);
}

public SinkTaskContext getContext() {
return context;
}

@Override
public String version() {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@
package org.mases.knet.connect.source;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.source.SourceConnectorContext;
import org.mases.jcobridge.*;
import org.mases.knet.connect.KNetConnectProxy;
import org.slf4j.Logger;
Expand All @@ -48,18 +49,22 @@ public void setDataToExchange(Object dte) {
dataToExchange = dte;
}

public SourceConnectorContext getContext() {
return context();
}

@Override
public void start(Map<String, String> props) {
try {
if (!KNetConnectProxy.initializeSourceConnector(props)) {
log.error("Failed Invoke of \"initializeSourceConnector\"");
throw new ConfigException("Failed Invoke of \"initializeSourceConnector\"");
throw new ConnectException("Failed Invoke of \"initializeSourceConnector\"");
} else {
JCOBridge.RegisterJVMGlobal(registrationName, this);
try {
dataToExchange = props;
JCObject source = KNetConnectProxy.getSourceConnector();
if (source == null) throw new ConfigException("getSourceConnector returned null.");
if (source == null) throw new ConnectException("getSourceConnector returned null.");
source.Invoke("StartInternal");
} finally {
dataToExchange = null;
Expand All @@ -83,7 +88,7 @@ public List<Map<String, String>> taskConfigs(int maxTasks) {
try {
dataToExchange = config;
JCObject source = KNetConnectProxy.getSourceConnector();
if (source == null) throw new ConfigException("getSourceConnector returned null.");
if (source == null) throw new ConnectException("getSourceConnector returned null.");
source.Invoke("TaskConfigsInternal", i);
} catch (JCException | IOException jcne) {
log.error("Failed Invoke of \"start\"", jcne);
Expand All @@ -100,7 +105,7 @@ public void stop() {
try {
try {
JCObject source = KNetConnectProxy.getSourceConnector();
if (source == null) throw new ConfigException("getSourceConnector returned null.");
if (source == null) throw new ConnectException("getSourceConnector returned null.");
source.Invoke("StopInternal");
} finally {
JCOBridge.UnregisterJVMGlobal(registrationName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@

package org.mases.knet.connect.source;

import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.connect.source.SourceTaskContext;
import org.mases.jcobridge.*;
import org.mases.knet.connect.KNetConnectProxy;
import org.slf4j.Logger;
Expand All @@ -47,15 +48,19 @@ public void setDataToExchange(Object dte) {
dataToExchange = dte;
}

public KNetSourceTask() throws ConfigException, JCException, IOException {
public KNetSourceTask() throws ConnectException, JCException, IOException {
super();
long taskid = taskId.incrementAndGet();
JCOBridge.RegisterJVMGlobal(String.format("KNetSourceTask_%d", taskid), this);
JCObject source = KNetConnectProxy.getSourceConnector();
if (source == null) throw new ConfigException("getSourceConnector returned null.");
if (source == null) throw new ConnectException("getSourceConnector returned null.");
sourceTask = (JCObject) source.Invoke("AllocateTask", taskid);
}

public SourceTaskContext getContext() {
return context;
}

@Override
public String version() {
try {
Expand Down Expand Up @@ -85,8 +90,13 @@ public void start(Map<String, String> map) {
@Override
public List<SourceRecord> poll() throws InterruptedException {
try {
Object result = sourceTask.Invoke("PollInternal");
if (result != null) return (List<SourceRecord>) result;
try {
dataToExchange = null;
sourceTask.Invoke("PollInternal");
if (dataToExchange != null) return (List<SourceRecord>) dataToExchange;
} finally {
dataToExchange = null;
}
} catch (JCNativeException jcne) {
log.error("Failed Invoke of \"poll\"", jcne);
}
Expand Down
2 changes: 1 addition & 1 deletion src/net/Documentation/articles/actualstate.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,6 @@ This release comes with some ready made classes:
* [X] Apache Kafka Admin Client covering all available APIs: since many classes are marked with @InterfaceStability.Evolving annotation some properties or methods can be missed; use **dynamic** code to interact with Admin API types.
* [X] Almost completed Apache Kafka Streams
* [X] Almost completed Apache Kafka Connect
* [ ] .NET Apache Kafka Connect SDK (under development)
* [X] .NET Apache Kafka Connect SDK (a basic version)

If something is not available use [API extensibility](API_extensibility.md) to cover missing features.
47 changes: 47 additions & 0 deletions src/net/Documentation/articles/connectSDK.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
# KNet: Connect SDK

This is only a quick start guide, many other information related to Apache Kafka Connect can be found at the following link https://kafka.apache.org/documentation/#connect

## General

To start a Connect session the user shall use the [KNet CLI](usageCLI.md).

The commands related to Apache Kafka Connect are:
- ConnectDistributed
- ConnectStandalone

To go in detail look at https://kafka.apache.org/documentation/#connect and https://kafka.apache.org/quickstart#quickstart_kafkaconnect.

## Standalone

In this guide we focus on the standalone version.
The guide start from the assumption that an assembly was generated: see [Template Usage Guide](usageTemplates.md).
Put the assembly within a folder (__C:\MyConnect__), then go within it.
As explained in https://kafka.apache.org/documentation/#connect Apache Kafka Connect needs at least one configuration file, in standalone mode it needs two configuration files:
1. The first file is **connect-standalone.properties** (or **connect-distributed.properties** for distributed environments): this file contains configuration information for Apache Kafka Connect;
2. The second optional file defines the connector to use and its options.

In the [config folder](https://github.com/masesgroup/KNet/tree/master/src/config) the user can found many configuration files.
The files named **connect-knet-sink.properties** and **connect-knet-source.properties** contain examples for sink and source connectors.

Copy within __C:\MyConnect__ **connect-standalone.properties** and update it especially on line containing __bootstrap.servers__, then copy **connect-knet-sink.properties** or **connect-knet-source.properties** depending on the connector type.
The main options related to KNet Connect SDK are:
- __connector.class=**KNetSinkConnector**__ where the value is the connector Java class and must be:
- __KNetSinkConnector__ for sink connectors
- __KNetSourceConnector__ for source connectors
- __knet.dotnet.classname=MASES.KNetTemplate.KNetConnect.KNetConnectSink, knetConnectSink__ where the value is the .NET class name in the form of __**FullName**, **AssemblyName**__

When the __C:\MyConnect__ folder contains all the files it is possible to run Apache Kafka Connect:

>
> knet -ClassToRun ConnectStandalone connect-standalone.properties connect-knet-sink.properties
>

## Distributed

As stated in [Apache Kafka Connect Guide](https://kafka.apache.org/documentation/#connect ) the distributed version does not use the connector file definition, instead it shall be managed using the REST interface.
The start-up command within __C:\MyConnect__ folder becomes:

>
> knet -ClassToRun ConnectDistributed connect-distributed.properties
>
18 changes: 11 additions & 7 deletions src/net/Documentation/articles/toc.yml
Original file line number Diff line number Diff line change
@@ -1,18 +1,22 @@
- name: Introduction
href: intro.md
- name: Roadmap
- name: KNet Roadmap
href: roadmap.md
- name: Actual state
- name: KNet Actual state
href: actualstate.md
- name: Performance
- name: KNet Performance
href: performance.md
- name: APIs extendibility
- name: KNet APIs extendibility
href: API_extensibility.md
- name: JVM callbacks
- name: KNet JVM callbacks
href: jvm_callbacks.md
- name: Usage
- name: KNet Connect SDK
href: connectSDK.md
- name: KNet Usage
href: usage.md
- name: Usage CLI
- name: KNet CLI Usage
href: usageCLI.md
- name: KNet Template Usage Guide
href: usageTemplates.md
- name: How to build from scratch
href: howtobuild.md
38 changes: 38 additions & 0 deletions src/net/Documentation/articles/usageTemplates.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
# KNet: Template Usage Guide

For more information related to .NET templates look at https://docs.microsoft.com/en-us/dotnet/core/tools/dotnet-new-sdk-templates.

## Installation

To install the templates executes the following command within a command shell:

>
> dotnet new --install MASES.KNet.Templates
>

The command installs the latest version and on success will list all templates added to the list of available templates.
They are:
1. knetConsumerApp: a project to create a consumer application for Apache Kafka
2. knetPipeStreamApp: a project to create a pipe stream application for Apache Kafka Streams
3. knetProducerApp: a project to create a producer application for Apache Kafka
4. knetConnectSink: a project to create a library which conforms to an Apache Kafka Connect Sink Connector written in .NET
5. knetConnectSource: a project to create a library which conforms to an Apache Kafka Connect Source Connector written in .NET

## Simple usage

The first three templates are ready made project with enough code to use them directly.
To use one of the available templates run the following command:

>
> dotnet new knetConsumerApp
>

the previous command will create a .NET project for an executable. The user can modify the code or just execute it against an Apache Kafka server.

## SDK templates

The last two templates (knetConnectSink, knetConnectSource) are not ready made project: they are skeletons for Apache Kafka Connect Source/Sink Connector written in .NET.
The available code does not do anything: the functions in the code shall be filled to obtain some results.

With the available code the user can verify how an Apache Kafka Connect Source/Sink Connector, written in .NET, works; to do this the projects can be compiled to obtain an assembly.
See [Connect SDK](connectSDK.md) for some information on how use it.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
*/

using Java.Util;
using JavaLang = Java.Lang;

namespace MASES.KNet.Connect.Data
{
Expand All @@ -31,12 +30,12 @@ public ConnectSchema()
{
}

public ConnectSchema(Type type, bool optional, JavaLang.Object defaultValue, string name, int version, string doc, Map<string, string> parameters, List<Field> fields, Schema keySchema, Schema valueSchema)
public ConnectSchema(Type type, bool optional, Java.Lang.Object defaultValue, string name, int version, string doc, Map<string, string> parameters, List<Field> fields, Schema keySchema, Schema valueSchema)
: base(type, optional, defaultValue, name, version, doc, parameters, fields, keySchema, valueSchema)
{
}

public ConnectSchema(Type type, bool optional, JavaLang.Object defaultValue, string name, int version, string doc)
public ConnectSchema(Type type, bool optional, Java.Lang.Object defaultValue, string name, int version, string doc)
:this(type, optional, defaultValue, name, version, doc, null, null, null, null)
{
}
Expand Down
Loading