Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enhancement KNet Connect SDK #68

Merged
merged 13 commits into from
May 18, 2022
Prev Previous commit
Next Next commit
#65: Added various contexts usable within SDK
  • Loading branch information
masesdevelopers committed May 16, 2022
commit 047a37cad460460579ea875606d8fc746524d317
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.sink.SinkConnector;
import org.apache.kafka.connect.sink.SinkConnectorContext;
import org.mases.jcobridge.*;
import org.mases.knet.connect.KNetConnectProxy;
import org.slf4j.Logger;
Expand All @@ -48,6 +49,10 @@ public void setDataToExchange(Object dte) {
dataToExchange = dte;
}

public SinkConnectorContext getContext() {
return context();
}

@Override
public void start(Map<String, String> props) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ public KNetSinkTask() throws ConfigException, JCException, IOException {
sinkTask = (JCObject) sink.Invoke("AllocateTask", taskid);
}

public SinkTaskContext getContext() {
return context;
}

@Override
public String version() {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.source.SourceConnectorContext;
import org.mases.jcobridge.*;
import org.mases.knet.connect.KNetConnectProxy;
import org.slf4j.Logger;
Expand All @@ -48,6 +49,10 @@ public void setDataToExchange(Object dte) {
dataToExchange = dte;
}

public SourceConnectorContext getContext() {
return context();
}

@Override
public void start(Map<String, String> props) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.connect.source.SourceTaskContext;
import org.mases.jcobridge.*;
import org.mases.knet.connect.KNetConnectProxy;
import org.slf4j.Logger;
Expand Down Expand Up @@ -56,6 +57,10 @@ public KNetSourceTask() throws ConfigException, JCException, IOException {
sourceTask = (JCObject) source.Invoke("AllocateTask", taskid);
}

public SourceTaskContext getContext() {
return context;
}

@Override
public String version() {
try {
Expand Down
14 changes: 14 additions & 0 deletions src/net/KNet/ClientSide/BridgedClasses/Connect/KNetConnector.cs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,20 @@ protected T DataToExchange<T>()
}
return (reflectedConnector != null) ? reflectedConnector.Invoke<T>("getDataToExchange") : throw new InvalidOperationException($"{ReflectedConnectorClassName} was not registered in global JVM");
}
/// <summary>
/// An helper function to read the data from Java side
/// </summary>
/// <typeparam name="T">The expected return <see cref="Type"/></typeparam>
/// <returns>The <typeparamref name="T"/></returns>
/// <exception cref="InvalidOperationException"> </exception>
protected T Context<T>()
{
if (reflectedConnector == null)
{
reflectedConnector = KNetCore.GlobalInstance.GetJVMGlobal(ReflectedConnectorClassName);
}
return (reflectedConnector != null) ? reflectedConnector.Invoke<T>("getContext") : throw new InvalidOperationException($"{ReflectedConnectorClassName} was not registered in global JVM");
}
/// <inheritdoc cref="IKNetConnector.AllocateTask(long)"/>
public object AllocateTask(long taskId)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
* Refer to LICENSE for more information.
*/

using MASES.KNet.Connect.Sink;
using MASES.KNet.Connect.Source;
using System;

namespace MASES.KNet.Connect
Expand All @@ -26,6 +28,10 @@ namespace MASES.KNet.Connect
/// <typeparam name="TTask">The task class inherited from <see cref="KNetSinkTask"/></typeparam>
public abstract class KNetSinkConnector<TTask> : KNetConnector where TTask : KNetSinkTask
{
/// <summary>
/// The <see cref="SinkConnectorContext"/>
/// </summary>
public SinkConnectorContext Context => Context<SinkConnectorContext>();
/// <summary>
/// Set the <see cref="ReflectedConnectorClassName"/> of the connector to a fixed value
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ namespace MASES.KNet.Connect
/// </summary>
public abstract class KNetSinkTask : KNetTask
{
/// <summary>
/// The <see cref="SinkTaskContext"/>
/// </summary>
public SinkTaskContext Context => Context<SinkTaskContext>();
/// <summary>
/// Set the <see cref="ReflectedTaskClassName"/> of the connector to a fixed value
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
* Refer to LICENSE for more information.
*/

using MASES.KNet.Connect.Source;
using System;

namespace MASES.KNet.Connect
Expand All @@ -26,6 +27,10 @@ namespace MASES.KNet.Connect
/// <typeparam name="TTask">The task class inherited from <see cref="KNetSourceTask"/></typeparam>
public abstract class KNetSourceConnector<TTask> : KNetConnector where TTask : KNetSourceTask
{
/// <summary>
/// The <see cref="SourceConnectorContext"/>
/// </summary>
public SourceConnectorContext Context => Context<SourceConnectorContext>();
/// <summary>
/// Set the <see cref="ReflectedConnectorClassName"/> of the connector to a fixed value
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ namespace MASES.KNet.Connect
/// </summary>
public abstract class KNetSourceTask : KNetTask
{
/// <summary>
/// The <see cref="SourceTaskContext"/>
/// </summary>
public SourceTaskContext Context => Context<SourceTaskContext>();
/// <summary>
/// Set the <see cref="ReflectedTaskClassName"/> of the connector to a fixed value
/// </summary>
Expand Down
10 changes: 10 additions & 0 deletions src/net/KNet/ClientSide/BridgedClasses/Connect/KNetTask.cs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,16 @@ protected T DataToExchange<T>()
{
return (reflectedTask != null) ? reflectedTask.Invoke<T>("getDataToExchange") : throw new InvalidOperationException($"{ReflectedTaskClassName} was not registered in global JVM");
}
/// <summary>
/// An helper function to read the data from Java side
/// </summary>
/// <typeparam name="T">The expected return <see cref="Type"/></typeparam>
/// <returns>The <typeparamref name="T"/></returns>
/// <exception cref="InvalidOperationException"> </exception>
protected T Context<T>()
{
return (reflectedTask != null) ? reflectedTask.Invoke<T>("getContext") : throw new InvalidOperationException($"{ReflectedTaskClassName} was not registered in global JVM");
}
/// <inheritdoc cref="IKNetTask.Connector"/>
public IKNetConnector Connector => connector;
/// <inheritdoc cref="IKNetTask.TaskId"/>
Expand Down