Enhancement KNet Connect SDK (#68)
* #65: added some documentation

* Minor alignment

* #65: Added various contexts usable within SDK

* #65: update data exchange

* Class fix

* #65: added support functions

* Update exception used in methods

* #65: General review of .NET side Connect SDK classes

* Update test/template classes following review of SDK classes

* Documentation update

* Fix class name

* Fixed namespace naming
masesdevelopers authored May 18, 2022
1 parent 9d4262b commit 486e384
Showing 32 changed files with 521 additions and 196 deletions.
2 changes: 2 additions & 0 deletions README.md
@@ -50,9 +50,11 @@ Have a look at the following resources:
* [Roadmap](src/net/Documentation/articles/roadmap.md)
* [Actual state](src/net/Documentation/articles/actualstate.md)
* [Performance](src/net/Documentation/articles/performance.md)
* [Connect SDK](src/net/Documentation/articles/connectSDK.md)
* [KNet usage](src/net/Documentation/articles/usage.md)
* [KNet APIs extensibility](src/net/Documentation/articles/API_extensibility.md)
* [KNetCLI usage](src/net/Documentation/articles/usageCLI.md)
* [Template Usage Guide](src/net/Documentation/articles/usageTemplates.md)
* [How to build from scratch](src/net/Documentation/articles/howtobuild.md)

---
@@ -55,15 +55,15 @@ public static synchronized boolean initializeSinkConnector(Map<String, String> props) throws JCException, IOException {
AbstractConfig parsedConfig = new AbstractConfig(CONFIG_DEF, props);
String className = parsedConfig.getString(DOTNET_CLASSNAME_CONFIG);
if (className == null)
throw new ConfigException("'classname' in KNetSinkConnector configuration requires a definition");
throw new ConfigException("'knet.dotnet.classname' in KNetSinkConnector configuration requires a definition");
return (boolean) getConnectProxy().Invoke("AllocateSinkConnector", className);
}

public static synchronized boolean initializeSourceConnector(Map<String, String> props) throws JCException, IOException {
AbstractConfig parsedConfig = new AbstractConfig(CONFIG_DEF, props);
String className = parsedConfig.getString(DOTNET_CLASSNAME_CONFIG);
if (className == null)
throw new ConfigException("'classname' in KNetSinkConnector configuration requires a definition");
throw new ConfigException("'knet.dotnet.classname' in KNetSourceConnector configuration requires a definition");
return (boolean) getConnectProxy().Invoke("AllocateSourceConnector", className);
}

@@ -19,9 +19,10 @@
package org.mases.knet.connect.sink;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.SinkConnector;
import org.apache.kafka.connect.sink.SinkConnectorContext;
import org.mases.jcobridge.*;
import org.mases.knet.connect.KNetConnectProxy;
import org.slf4j.Logger;
@@ -48,18 +49,22 @@ public void setDataToExchange(Object dte) {
dataToExchange = dte;
}

public SinkConnectorContext getContext() {
return context();
}

@Override
public void start(Map<String, String> props) {
try {
if (!KNetConnectProxy.initializeSinkConnector(props)) {
log.error("Failed Invoke of \"initializeSinkConnector\"");
throw new ConfigException("Failed Invoke of \"initializeSinkConnector\"");
throw new ConnectException("Failed Invoke of \"initializeSinkConnector\"");
} else {
JCOBridge.RegisterJVMGlobal(registrationName, this);
try {
dataToExchange = props;
JCObject sink = KNetConnectProxy.getSinkConnector();
if (sink == null) throw new ConfigException("getSinkConnector returned null.");
if (sink == null) throw new ConnectException("getSinkConnector returned null.");
sink.Invoke("StartInternal");
} finally {
dataToExchange = null;
@@ -83,7 +88,7 @@ public List<Map<String, String>> taskConfigs(int maxTasks) {
try {
dataToExchange = config;
JCObject sink = KNetConnectProxy.getSinkConnector();
if (sink == null) throw new ConfigException("getSinkConnector returned null.");
if (sink == null) throw new ConnectException("getSinkConnector returned null.");
sink.Invoke("TaskConfigsInternal", i);
} catch (JCException | IOException jcne) {
log.error("Failed Invoke of \"start\"", jcne);
@@ -100,7 +105,7 @@ public void stop() {
try {
try {
JCObject sink = KNetConnectProxy.getSinkConnector();
if (sink == null) throw new ConfigException("getSinkConnector returned null.");
if (sink == null) throw new ConnectException("getSinkConnector returned null.");
sink.Invoke("StopInternal");
} finally {
JCOBridge.UnregisterJVMGlobal(registrationName);
@@ -18,7 +18,7 @@

package org.mases.knet.connect.sink;

import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.apache.kafka.connect.sink.SinkTaskContext;
@@ -48,15 +48,19 @@ public void setDataToExchange(Object dte) {
dataToExchange = dte;
}

public KNetSinkTask() throws ConfigException, JCException, IOException {
public KNetSinkTask() throws ConnectException, JCException, IOException {
super();
long taskid = taskId.incrementAndGet();
JCOBridge.RegisterJVMGlobal(String.format("KNetSinkTask_%d", taskid), this);
JCObject sink = KNetConnectProxy.getSinkConnector();
if (sink == null) throw new ConfigException("getSinkConnector returned null.");
if (sink == null) throw new ConnectException("getSinkConnector returned null.");
sinkTask = (JCObject) sink.Invoke("AllocateTask", taskid);
}

public SinkTaskContext getContext() {
return context;
}

@Override
public String version() {
try {
@@ -19,9 +19,10 @@
package org.mases.knet.connect.source;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.source.SourceConnectorContext;
import org.mases.jcobridge.*;
import org.mases.knet.connect.KNetConnectProxy;
import org.slf4j.Logger;
@@ -48,18 +49,22 @@ public void setDataToExchange(Object dte) {
dataToExchange = dte;
}

public SourceConnectorContext getContext() {
return context();
}

@Override
public void start(Map<String, String> props) {
try {
if (!KNetConnectProxy.initializeSourceConnector(props)) {
log.error("Failed Invoke of \"initializeSourceConnector\"");
throw new ConfigException("Failed Invoke of \"initializeSourceConnector\"");
throw new ConnectException("Failed Invoke of \"initializeSourceConnector\"");
} else {
JCOBridge.RegisterJVMGlobal(registrationName, this);
try {
dataToExchange = props;
JCObject source = KNetConnectProxy.getSourceConnector();
if (source == null) throw new ConfigException("getSourceConnector returned null.");
if (source == null) throw new ConnectException("getSourceConnector returned null.");
source.Invoke("StartInternal");
} finally {
dataToExchange = null;
@@ -83,7 +88,7 @@ public List<Map<String, String>> taskConfigs(int maxTasks) {
try {
dataToExchange = config;
JCObject source = KNetConnectProxy.getSourceConnector();
if (source == null) throw new ConfigException("getSourceConnector returned null.");
if (source == null) throw new ConnectException("getSourceConnector returned null.");
source.Invoke("TaskConfigsInternal", i);
} catch (JCException | IOException jcne) {
log.error("Failed Invoke of \"start\"", jcne);
@@ -100,7 +105,7 @@ public void stop() {
try {
try {
JCObject source = KNetConnectProxy.getSourceConnector();
if (source == null) throw new ConfigException("getSourceConnector returned null.");
if (source == null) throw new ConnectException("getSourceConnector returned null.");
source.Invoke("StopInternal");
} finally {
JCOBridge.UnregisterJVMGlobal(registrationName);
@@ -18,9 +18,10 @@

package org.mases.knet.connect.source;

import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.connect.source.SourceTaskContext;
import org.mases.jcobridge.*;
import org.mases.knet.connect.KNetConnectProxy;
import org.slf4j.Logger;
@@ -47,15 +48,19 @@ public void setDataToExchange(Object dte) {
dataToExchange = dte;
}

public KNetSourceTask() throws ConfigException, JCException, IOException {
public KNetSourceTask() throws ConnectException, JCException, IOException {
super();
long taskid = taskId.incrementAndGet();
JCOBridge.RegisterJVMGlobal(String.format("KNetSourceTask_%d", taskid), this);
JCObject source = KNetConnectProxy.getSourceConnector();
if (source == null) throw new ConfigException("getSourceConnector returned null.");
if (source == null) throw new ConnectException("getSourceConnector returned null.");
sourceTask = (JCObject) source.Invoke("AllocateTask", taskid);
}

public SourceTaskContext getContext() {
return context;
}

@Override
public String version() {
try {
@@ -85,8 +90,13 @@ public void start(Map<String, String> map) {
@Override
public List<SourceRecord> poll() throws InterruptedException {
try {
Object result = sourceTask.Invoke("PollInternal");
if (result != null) return (List<SourceRecord>) result;
try {
dataToExchange = null;
sourceTask.Invoke("PollInternal");
if (dataToExchange != null) return (List<SourceRecord>) dataToExchange;
} finally {
dataToExchange = null;
}
} catch (JCNativeException jcne) {
log.error("Failed Invoke of \"poll\"", jcne);
}
2 changes: 1 addition & 1 deletion src/net/Documentation/articles/actualstate.md
@@ -7,6 +7,6 @@ This release comes with some ready made classes:
* [X] Apache Kafka Admin Client covering all available APIs: since many classes are marked with @InterfaceStability.Evolving annotation some properties or methods can be missed; use **dynamic** code to interact with Admin API types.
* [X] Almost completed Apache Kafka Streams
* [X] Almost completed Apache Kafka Connect
* [ ] .NET Apache Kafka Connect SDK (under development)
* [X] .NET Apache Kafka Connect SDK (a basic version)

If something is not available use [API extensibility](API_extensibility.md) to cover missing features.
47 changes: 47 additions & 0 deletions src/net/Documentation/articles/connectSDK.md
@@ -0,0 +1,47 @@
# KNet: Connect SDK

This is only a quick-start guide; much more information about Apache Kafka Connect can be found at https://kafka.apache.org/documentation/#connect.

## General

To start a Connect session, the user shall use the [KNet CLI](usageCLI.md).

The commands related to Apache Kafka Connect are:
- ConnectDistributed
- ConnectStandalone

For further detail, see https://kafka.apache.org/documentation/#connect and https://kafka.apache.org/quickstart#quickstart_kafkaconnect.

## Standalone

In this guide we focus on the standalone version.
The guide starts from the assumption that an assembly has already been generated: see the [Template Usage Guide](usageTemplates.md).
Put the assembly in a folder (__C:\MyConnect__), then move into it.
As explained in https://kafka.apache.org/documentation/#connect, Apache Kafka Connect needs at least one configuration file; in standalone mode it uses two:
1. The first file is **connect-standalone.properties** (or **connect-distributed.properties** for distributed environments): this file contains the configuration of Apache Kafka Connect itself;
2. The second, optional, file defines the connector to use and its options.

In the [config folder](https://github.com/masesgroup/KNet/tree/master/src/config) the user can find many configuration files.
The files named **connect-knet-sink.properties** and **connect-knet-source.properties** contain examples for sink and source connectors.

Copy **connect-standalone.properties** into __C:\MyConnect__ and update it, especially the line containing __bootstrap.servers__; then copy **connect-knet-sink.properties** or **connect-knet-source.properties**, depending on the connector type.
The main options related to the KNet Connect SDK are listed below, followed by a configuration sketch:
- __connector.class=**KNetSinkConnector**__ where the value is the connector Java class and must be:
  - __KNetSinkConnector__ for sink connectors
  - __KNetSourceConnector__ for source connectors
- __knet.dotnet.classname=MASES.KNetTemplate.KNetConnect.KNetConnectSink, knetConnectSink__ where the value is the .NET class name in the form __**FullName**, **AssemblyName**__
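
The snippet below is only a hedged sketch of a sink connector properties file: the values of __connector.class__ and __knet.dotnet.classname__ come from this guide, while __name__, __tasks.max__ and __topics__ are placeholder entries to adapt to the actual environment; the **connect-knet-sink.properties** file shipped in the config folder remains the reference example.

```properties
# Hypothetical sketch of a sink connector configuration (adapt name, tasks.max and topics to your setup)
name=knet-sink-connector
connector.class=KNetSinkConnector
tasks.max=1
# .NET class implementing the sink, in the form FullName, AssemblyName
knet.dotnet.classname=MASES.KNetTemplate.KNetConnect.KNetConnectSink, knetConnectSink
# Apache Kafka topics consumed by the sink connector
topics=knet-test-topic
```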

When the __C:\MyConnect__ folder contains all the files, it is possible to run Apache Kafka Connect:

>
> knet -ClassToRun ConnectStandalone connect-standalone.properties connect-knet-sink.properties
>
## Distributed

As stated in the [Apache Kafka Connect Guide](https://kafka.apache.org/documentation/#connect), the distributed version does not use the connector definition file; instead, connectors shall be managed through the REST interface, as sketched after the start-up command below.
The start-up command within the __C:\MyConnect__ folder becomes:

>
> knet -ClassToRun ConnectDistributed connect-distributed.properties
>
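
Once the distributed worker is running, connectors are registered through the Apache Kafka Connect REST API. The request below is only a hedged sketch: it assumes the worker listens on the default REST port 8083 and reuses the hypothetical names from the standalone sketch above.

```sh
# Hypothetical request: register the KNet sink connector on a worker listening on localhost:8083
curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
        "name": "knet-sink-connector",
        "config": {
          "connector.class": "KNetSinkConnector",
          "tasks.max": "1",
          "knet.dotnet.classname": "MASES.KNetTemplate.KNetConnect.KNetConnectSink, knetConnectSink",
          "topics": "knet-test-topic"
        }
      }'
```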
18 changes: 11 additions & 7 deletions src/net/Documentation/articles/toc.yml
@@ -1,18 +1,22 @@
- name: Introduction
href: intro.md
- name: Roadmap
- name: KNet Roadmap
href: roadmap.md
- name: Actual state
- name: KNet Actual state
href: actualstate.md
- name: Performance
- name: KNet Performance
href: performance.md
- name: APIs extendibility
- name: KNet APIs extendibility
href: API_extensibility.md
- name: JVM callbacks
- name: KNet JVM callbacks
href: jvm_callbacks.md
- name: Usage
- name: KNet Connect SDK
href: connectSDK.md
- name: KNet Usage
href: usage.md
- name: Usage CLI
- name: KNet CLI Usage
href: usageCLI.md
- name: KNet Template Usage Guide
href: usageTemplates.md
- name: How to build from scratch
href: howtobuild.md
38 changes: 38 additions & 0 deletions src/net/Documentation/articles/usageTemplates.md
@@ -0,0 +1,38 @@
# KNet: Template Usage Guide

For more information about .NET templates, see https://docs.microsoft.com/en-us/dotnet/core/tools/dotnet-new-sdk-templates.

## Installation

To install the templates, execute the following command in a command shell:

>
> dotnet new --install MASES.KNet.Templates
>
The command installs the latest version and, on success, lists all the templates added to the list of available templates.
They are:
1. knetConsumerApp: a project to create a consumer application for Apache Kafka
2. knetPipeStreamApp: a project to create a pipe stream application for Apache Kafka Streams
3. knetProducerApp: a project to create a producer application for Apache Kafka
4. knetConnectSink: a project to create a library which conforms to an Apache Kafka Connect Sink Connector written in .NET
5. knetConnectSource: a project to create a library which conforms to an Apache Kafka Connect Source Connector written in .NET

## Simple usage

The first three templates are ready-made projects with enough code to be used directly.
To use one of the available templates, run the following command:

>
> dotnet new knetConsumerApp
>
The previous command creates a .NET project for an executable. The user can modify the code or just execute it against an Apache Kafka server.

## SDK templates

The last two templates (knetConnectSink, knetConnectSource) are not ready-made projects: they are skeletons for Apache Kafka Connect Source/Sink Connectors written in .NET.
The available code does not do anything by itself: the functions in the code shall be filled in to obtain results.

With the available code the user can verify how an Apache Kafka Connect Source/Sink Connector written in .NET works; to do this, the projects can be compiled to obtain an assembly, as sketched below.
See [Connect SDK](connectSDK.md) for information on how to use it.
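
A minimal sketch of that workflow, assuming the templates are already installed; the project name __MyKNetSink__ and the output paths are placeholders, and the produced assembly is the one to copy into the folder used in the [Connect SDK](connectSDK.md) guide.

```sh
# Hypothetical workflow: create a sink connector skeleton, implement it, then build the assembly
dotnet new knetConnectSink -o MyKNetSink
cd MyKNetSink
# ... fill in the skeleton functions with the connector logic ...
dotnet build -c Release
# copy the produced assembly from the build output (e.g. bin/Release) into the Connect folder, e.g. C:\MyConnect
```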
@@ -17,7 +17,6 @@
*/

using Java.Util;
using JavaLang = Java.Lang;

namespace MASES.KNet.Connect.Data
{
@@ -31,12 +30,12 @@ public ConnectSchema()
{
}

public ConnectSchema(Type type, bool optional, JavaLang.Object defaultValue, string name, int version, string doc, Map<string, string> parameters, List<Field> fields, Schema keySchema, Schema valueSchema)
public ConnectSchema(Type type, bool optional, Java.Lang.Object defaultValue, string name, int version, string doc, Map<string, string> parameters, List<Field> fields, Schema keySchema, Schema valueSchema)
: base(type, optional, defaultValue, name, version, doc, parameters, fields, keySchema, valueSchema)
{
}

public ConnectSchema(Type type, bool optional, JavaLang.Object defaultValue, string name, int version, string doc)
public ConnectSchema(Type type, bool optional, Java.Lang.Object defaultValue, string name, int version, string doc)
:this(type, optional, defaultValue, name, version, doc, null, null, null, null)
{
}