Skip to content

Commit

Permalink
#65: Added fallback value in configuration for exactlyOnceSupport and…
Browse files Browse the repository at this point in the history
… canDefineTransactionBoundaries (#90)

* #65: Added fallback value in configuration for exactlyOnceSupport and canDefineTransactionBoundaries

* #65: scoped new properties
  • Loading branch information
masesdevelopers authored Oct 4, 2022
1 parent 4872fc0 commit 01bfffd
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 2 deletions.
4 changes: 4 additions & 0 deletions src/config/connect-knet-source.properties
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,8 @@ name=local-knet-source
connector.class=KNetSourceConnector
tasks.max=1
knet.dotnet.classname=MASES.KNetTemplate.KNetConnect.KNetConnectSource, knetConnectSource
# enable following line if Connect state machine invokes exactlyOnceSupport method before infrastructure is ready to receive request in .NET side
#knet.dotnet.source.exactlyOnceSupport=true
# enable following line if Connect state machine invokes canDefineTransactionBoundaries method before infrastructure is ready to receive request in .NET side
#knet.dotnet.source.canDefineTransactionBoundaries=true
topic=topic-perf
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.mases.knet.connect.source;

import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.errors.ConnectException;
Expand All @@ -38,6 +39,13 @@ public class KNetSourceConnector extends SourceConnector {

private static final String registrationName = "KNetSourceConnector";

public static final String DOTNET_EXACTLYONCESUPPORT_CONFIG = "knet.dotnet.source.exactlyOnceSupport";
public static final String DOTNET_CANDEFINETRANSACTIONBOUNDARIES_CONFIG = "knet.dotnet.source.canDefineTransactionBoundaries";

public static final ConfigDef CONFIG_DEF = new ConfigDef(KNetConnectProxy.CONFIG_DEF)
.define(DOTNET_EXACTLYONCESUPPORT_CONFIG, ConfigDef.Type.BOOLEAN, false, ConfigDef.Importance.LOW, "Fallback value if infrastructure is not ready to receive request in .NET side to get exactlyOnceSupport")
.define(DOTNET_CANDEFINETRANSACTIONBOUNDARIES_CONFIG, ConfigDef.Type.BOOLEAN, false, ConfigDef.Importance.LOW, "Fallback value if infrastructure is not ready to receive request in .NET side to get canDefineTransactionBoundaries");

Object dataToExchange = null;

public Object getDataToExchange() {
Expand Down Expand Up @@ -116,7 +124,7 @@ public void stop() {

@Override
public ConfigDef config() {
return KNetConnectProxy.CONFIG_DEF;
return CONFIG_DEF;
}

@Override
Expand All @@ -131,6 +139,7 @@ public String version() {
}
return "NOT AVAILABLE";
}

@Override
public ExactlyOnceSupport exactlyOnceSupport(Map<String, String> connectorConfig) {
try {
Expand All @@ -145,7 +154,11 @@ public ExactlyOnceSupport exactlyOnceSupport(Map<String, String> connectorConfig
} catch (JCException | IOException jcne) {
log.error("Failed Invoke of \"exactlyOnceSupport\"", jcne);
}
return null;
log.info("Fallback Invoke of \"exactlyOnceSupport\" to configuration");
AbstractConfig parsedConfig = new AbstractConfig(CONFIG_DEF, connectorConfig);
Boolean exactlyOnceSupport = parsedConfig.getBoolean(DOTNET_EXACTLYONCESUPPORT_CONFIG);
if (exactlyOnceSupport.booleanValue()) return ExactlyOnceSupport.SUPPORTED;
return ExactlyOnceSupport.UNSUPPORTED;
}

@Override
Expand All @@ -162,6 +175,10 @@ public ConnectorTransactionBoundaries canDefineTransactionBoundaries(Map<String,
} catch (JCException | IOException jcne) {
log.error("Failed Invoke of \"canDefineTransactionBoundaries\"", jcne);
}
log.info("Fallback Invoke of \"canDefineTransactionBoundaries\" to configuration");
AbstractConfig parsedConfig = new AbstractConfig(CONFIG_DEF, connectorConfig);
Boolean canDefineTransactionBoundaries = parsedConfig.getBoolean(DOTNET_CANDEFINETRANSACTIONBOUNDARIES_CONFIG);
if (canDefineTransactionBoundaries.booleanValue()) return ConnectorTransactionBoundaries.SUPPORTED;
return ConnectorTransactionBoundaries.UNSUPPORTED;
}
}
8 changes: 8 additions & 0 deletions src/net/Documentation/articles/connectSDK.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,14 @@ When the __C:\MyConnect__ folder contains all the files it is possible to run Ap
> knet -ClassToRun ConnectStandalone connect-standalone.properties connect-knet-sink.properties
>
### Exactly Once and Transaction properties for Source Connector

From version 3.3.1 of Apache Kafka Connect it is possible to manage Exactly Once and Transaction in the source connector.

Two new fallback options are available in case the infrastructure is not ready to receive request in .NET side to obtain values related to Exactly Once and Transaction:
- `knet.dotnet.source.exactlyOnceSupport`: set to true if .NET Source Connector supports Exactly Once
- `knet.dotnet.source.canDefineTransactionBoundaries`: set to true if .NET Source Connector can define Transaction Boundaries

## Distributed

As stated in [Apache Kafka Connect Guide](https://kafka.apache.org/documentation/#connect ) the distributed version does not use the connector file definition, instead it shall be managed using the REST interface.
Expand Down

0 comments on commit 01bfffd

Please sign in to comment.