Skip to content

Commit

Permalink
Merge pull request #333 from C0urante/exactly-once-support
Browse files Browse the repository at this point in the history
feat: implement KIP-618 source connector API related to exactly-once support
  • Loading branch information
eliax1996 authored Jun 4, 2024
2 parents 0af9256 + 1b4220f commit 1a05381
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 1 deletion.
2 changes: 1 addition & 1 deletion build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ publishing {
}
}

val kafkaVersion = "3.0.2"
val kafkaVersion = "3.3.2"
val slf4jVersion = "2.0.13"

val avroVersion = "1.8.1"
Expand Down
12 changes: 12 additions & 0 deletions src/main/java/io/aiven/connect/jdbc/JdbcSourceConnector.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.ExactlyOnceSupport;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.util.ConnectorUtils;

Expand Down Expand Up @@ -215,4 +216,15 @@ public void stop() throws ConnectException {
public ConfigDef config() {
return JdbcSourceConnectorConfig.CONFIG_DEF;
}

@Override
public ExactlyOnceSupport exactlyOnceSupport(final Map<String, String> props) {
final String rawMode = props.get(JdbcSourceConnectorConfig.MODE_CONFIG);
// We don't support exactly-once in bulk mode (there's no offsets tracking), and we
// don't currently support exactly-once in timestamp mode (there may be multiple rows
// with the same timestamp, which can lead to either data loss or duplicates on restart)
final boolean supported = JdbcSourceConnectorConfig.MODE_INCREMENTING.equals(rawMode)
|| JdbcSourceConnectorConfig.MODE_TIMESTAMP_INCREMENTING.equals(rawMode);
return supported ? ExactlyOnceSupport.SUPPORTED : ExactlyOnceSupport.UNSUPPORTED;
}
}
26 changes: 26 additions & 0 deletions src/test/java/io/aiven/connect/jdbc/JdbcSourceConnectorTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.Map;

import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.ExactlyOnceSupport;

import io.aiven.connect.jdbc.config.JdbcConfig;
import io.aiven.connect.jdbc.source.EmbeddedDerby;
Expand All @@ -43,6 +44,7 @@

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.mockConstruction;
Expand Down Expand Up @@ -212,6 +214,30 @@ public void testConflictingQueryTableSettings() {
assertThatThrownBy(() -> connector.start(connProps)).isInstanceOf(ConnectException.class);
}

@Test
public void testExactlyOnceSupport() {
connProps.remove(JdbcSourceConnectorConfig.MODE_CONFIG);
assertEquals(ExactlyOnceSupport.UNSUPPORTED, connector.exactlyOnceSupport(connProps));

connProps.put(JdbcSourceConnectorConfig.MODE_CONFIG, null);
assertEquals(ExactlyOnceSupport.UNSUPPORTED, connector.exactlyOnceSupport(connProps));

connProps.put(JdbcSourceConnectorConfig.MODE_CONFIG, "unsupported mode");
assertEquals(ExactlyOnceSupport.UNSUPPORTED, connector.exactlyOnceSupport(connProps));

connProps.put(JdbcSourceConnectorConfig.MODE_CONFIG, JdbcSourceConnectorConfig.MODE_BULK);
assertEquals(ExactlyOnceSupport.UNSUPPORTED, connector.exactlyOnceSupport(connProps));

connProps.put(JdbcSourceConnectorConfig.MODE_CONFIG, JdbcSourceConnectorConfig.MODE_TIMESTAMP);
assertEquals(ExactlyOnceSupport.UNSUPPORTED, connector.exactlyOnceSupport(connProps));

connProps.put(JdbcSourceConnectorConfig.MODE_CONFIG, JdbcSourceConnectorConfig.MODE_INCREMENTING);
assertEquals(ExactlyOnceSupport.SUPPORTED, connector.exactlyOnceSupport(connProps));

connProps.put(JdbcSourceConnectorConfig.MODE_CONFIG, JdbcSourceConnectorConfig.MODE_TIMESTAMP_INCREMENTING);
assertEquals(ExactlyOnceSupport.SUPPORTED, connector.exactlyOnceSupport(connProps));
}

private void assertTaskConfigsHaveParentConfigs(final List<Map<String, String>> configs) {
for (final Map<String, String> config : configs) {
assertThat(config).containsEntry(JdbcConfig.CONNECTION_URL_CONFIG, this.db.getUrl());
Expand Down

0 comments on commit 1a05381

Please sign in to comment.