Skip to content

Commit

Permalink
confluentinc#1360 Prevent ungraceful JdbcSourceTask stoppage with a m…
Browse files Browse the repository at this point in the history
…aximum duration time for the poll operation
  • Loading branch information
jakubmalek committed Aug 6, 2024
1 parent 46c9b37 commit d992476
Show file tree
Hide file tree
Showing 5 changed files with 552 additions and 77 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,6 @@
import java.util.Map;
import java.util.TimeZone;
import java.util.concurrent.atomic.AtomicReference;

import com.microsoft.sqlserver.jdbc.SQLServerConnection;
import io.confluent.connect.jdbc.dialect.DatabaseDialect;
import io.confluent.connect.jdbc.dialect.DatabaseDialects;
import io.confluent.connect.jdbc.util.DatabaseDialectRecommender;
import io.confluent.connect.jdbc.util.DateTimeUtils;
import io.confluent.connect.jdbc.util.EnumRecommender;
import io.confluent.connect.jdbc.util.QuoteMethod;
import io.confluent.connect.jdbc.util.TimeZoneValidator;

import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.regex.Pattern;
Expand All @@ -44,6 +34,7 @@
import org.apache.kafka.common.config.Config;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Range;
import org.apache.kafka.common.config.ConfigDef.Recommender;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.config.ConfigDef.Validator;
Expand All @@ -56,10 +47,20 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.microsoft.sqlserver.jdbc.ISQLServerConnection;

import io.confluent.connect.jdbc.dialect.DatabaseDialect;
import io.confluent.connect.jdbc.dialect.DatabaseDialects;
import io.confluent.connect.jdbc.util.DatabaseDialectRecommender;
import io.confluent.connect.jdbc.util.DateTimeUtils;
import io.confluent.connect.jdbc.util.EnumRecommender;
import io.confluent.connect.jdbc.util.QuoteMethod;
import io.confluent.connect.jdbc.util.TimeZoneValidator;

public class JdbcSourceConnectorConfig extends AbstractConfig {

private static final Logger LOG = LoggerFactory.getLogger(JdbcSourceConnectorConfig.class);
private static Pattern INVALID_CHARS = Pattern.compile("[^a-zA-Z0-9._-]");
private static final Pattern INVALID_CHARS = Pattern.compile("[^a-zA-Z0-9._-]");

public static final String CONNECTION_PREFIX = "connection.";

Expand Down Expand Up @@ -101,6 +102,19 @@ public class JdbcSourceConnectorConfig extends AbstractConfig {
public static final int POLL_INTERVAL_MS_DEFAULT = 5000;
private static final String POLL_INTERVAL_MS_DISPLAY = "Poll Interval (ms)";

public static final String POLL_MAX_WAIT_TIME_MS_CONFIG = "poll.max.wait.time.ms";
public static final String POLL_MAX_WAIT_TIME_MS_DOC = "The maximum time in ms to wait by "
+ "the worker task for the poll operation. This includes additional poll.interval.ms "
+ "wait time applied in between subsequent poll calls. If the set maximum time is exceeded, "
+ "the task will signal no-data to the worker. The polling operation however will not be "
+ "interrupted until the task is stopped. Each time the worker is poll the records from the "
+ "source task it will either wait for the result from the previously started polling "
+ "operation or a new polling operation will be started. "
+ "When the poll.max.wait.time.ms is set to zero, then the worker will wait indefinitely "
+ "until the polling operation is finished.";
public static final int POLL_MAX_WAIT_TIME_MS_DEFAULT = 1_000;
private static final String POLL_MAX_DURATION_MS_DISPLAY = "Poll Max Wait Time (ms)";

public static final String BATCH_MAX_ROWS_CONFIG = "batch.max.rows";
private static final String BATCH_MAX_ROWS_DOC =
"Maximum number of rows to include in a single batch when polling for new data. This "
Expand Down Expand Up @@ -314,7 +328,7 @@ public class JdbcSourceConnectorConfig extends AbstractConfig {

public static final String QUERY_SUFFIX_CONFIG = "query.suffix";
public static final String QUERY_SUFFIX_DEFAULT = "";
public static final String QUERY_SUFFIX_DOC =
public static final String QUERY_SUFFIX_DOC =
"Suffix to append at the end of the generated query.";
public static final String QUERY_SUFFIX_DISPLAY = "Query suffix";

Expand Down Expand Up @@ -401,18 +415,15 @@ public Config validateMultiConfigs(Config config) {
} else {
dialect = DatabaseDialects.findBestFor(this.getString(CONNECTION_URL_CONFIG), this);
}
if (!dialect.name().equals(
DatabaseDialects.create(
SqlServerDatabaseDialectName, this
).name()
)
) {
configValues
.get(JdbcSourceConnectorConfig.TRANSACTION_ISOLATION_MODE_CONFIG)
.addErrorMessage("Isolation mode of `"
+ TransactionIsolationMode.SQL_SERVER_SNAPSHOT.name()
+ "` can only be configured with a Sql Server Dialect"
);
try (DatabaseDialect sqlServerDialect = DatabaseDialects.create(
SqlServerDatabaseDialectName, this)) {
if (!dialect.name().equals(sqlServerDialect.name())) {
configValues
.get(JdbcSourceConnectorConfig.TRANSACTION_ISOLATION_MODE_CONFIG)
.addErrorMessage("Isolation mode of `"
+ TransactionIsolationMode.SQL_SERVER_SNAPSHOT.name()
+ "` can only be configured with a Sql Server Dialect");
}
}
}

Expand Down Expand Up @@ -694,6 +705,17 @@ private static final void addConnectorOptions(ConfigDef config) {
++orderInGroup,
Width.SHORT,
POLL_INTERVAL_MS_DISPLAY
).define(
POLL_MAX_WAIT_TIME_MS_CONFIG,
Type.INT,
POLL_MAX_WAIT_TIME_MS_DEFAULT,
Range.atLeast(0),
Importance.MEDIUM,
POLL_MAX_WAIT_TIME_MS_DOC,
CONNECTOR_GROUP,
++orderInGroup,
Width.SHORT,
POLL_MAX_DURATION_MS_DISPLAY
).define(
BATCH_MAX_ROWS_CONFIG,
Type.INT,
Expand Down Expand Up @@ -792,7 +814,7 @@ public JdbcSourceConnectorConfig(Map<String, ?> props) {
}

public String topicPrefix() {
return getString(JdbcSourceTaskConfig.TOPIC_PREFIX_CONFIG).trim();
return getString(TOPIC_PREFIX_CONFIG).trim();
}

/**
Expand Down Expand Up @@ -914,7 +936,7 @@ public static NumericMapping get(JdbcSourceConnectorConfig config) {
if (newMappingConfig != null) {
return get(config.getString(JdbcSourceConnectorConfig.NUMERIC_MAPPING_CONFIG));
}
if (config.getBoolean(JdbcSourceTaskConfig.NUMERIC_PRECISION_MAPPING_CONFIG)) {
if (config.getBoolean(NUMERIC_PRECISION_MAPPING_CONFIG)) {
return NumericMapping.PRECISION_ONLY;
}
return NumericMapping.NONE;
Expand Down Expand Up @@ -993,7 +1015,7 @@ public static int get(TransactionIsolationMode mode) {
case SERIALIZABLE:
return Connection.TRANSACTION_SERIALIZABLE;
case SQL_SERVER_SNAPSHOT:
return SQLServerConnection.TRANSACTION_SNAPSHOT;
return ISQLServerConnection.TRANSACTION_SNAPSHOT;
default:
return -1;
}
Expand All @@ -1010,7 +1032,7 @@ public NumericMapping numericMapping() {
}

public TimeZone timeZone() {
String dbTimeZone = getString(JdbcSourceTaskConfig.DB_TIMEZONE_CONFIG);
String dbTimeZone = getString(DB_TIMEZONE_CONFIG);
return TimeZone.getTimeZone(ZoneId.of(dbTimeZone));
}

Expand Down
Loading

0 comments on commit d992476

Please sign in to comment.