Skip to content

Commit

Permalink
Add config for setting page fetch size
Browse files Browse the repository at this point in the history
Adds a field in connector config for setting default page fetch size
on CDC library side. This is useful in cases where connector works with
really heavy rows and loading too many of them at once exhausts available
memory very quickly. By setting smaller fetch size the connector will load
less rows at once.

Requires scylla-cdc-java with the configurable fetch size patch.
  • Loading branch information
Bouncheck authored and avelanarius committed Jun 17, 2024
1 parent 11cbdd4 commit 20f2724
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 2 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@
<!-- Docs claim java 8 supported, but support is deprecated -->
<kafka.version>3.3.1</kafka.version>
<scylla.driver.version>3.11.5.3</scylla.driver.version>
<scylla.cdc.java.version>1.3.2</scylla.cdc.java.version>
<scylla.cdc.java.version>1.3.3</scylla.cdc.java.version>
<flogger.version>0.5.1</flogger.version>
<!-- added for transitive dependencies -->
<log4j.version>2.17.1</log4j.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,17 @@ public class ScyllaConnectorConfig extends CommonConnectorConfig {
.withImportance(ConfigDef.Importance.MEDIUM)
.withDescription("The consistency level of CDC table read queries. This consistency level is used only for read queries " +
"to the CDC log table.");
public static final Field QUERY_OPTIONS_FETCH_SIZE = Field.create("scylla.query.options.fetch.size")
.withDisplayName("Queries fetch size")
.withType(ConfigDef.Type.INT)
.withDefault(0)
.withWidth(ConfigDef.Width.SHORT)
.withImportance(ConfigDef.Importance.LOW)
.withValidation(Field::isNonNegativeInteger)
.withDescription("The default page fetch size for all driver select queries. Value 0 means use driver " +
"defaults (usually 5000). Passed to " +
"driver's QueryOptions before session construction. Set this to an explicit value if " +
"experiencing too high memory usage.");

public static final Field LOCAL_DC_NAME = Field.create("scylla.local.dc")
.withDisplayName("Local DC Name")
Expand Down Expand Up @@ -192,7 +203,7 @@ public class ScyllaConnectorConfig extends CommonConnectorConfig {
private static final ConfigDefinition CONFIG_DEFINITION =
CommonConnectorConfig.CONFIG_DEFINITION.edit()
.name("Scylla")
.type(CLUSTER_IP_ADDRESSES, USER, PASSWORD, LOGICAL_NAME, CONSISTENCY_LEVEL, LOCAL_DC_NAME, SSL_ENABLED, SSL_PROVIDER, SSL_TRUSTSTORE_PATH, SSL_TRUSTSTORE_PASSWORD, SSL_KEYSTORE_PATH, SSL_KEYSTORE_PASSWORD,SSL_CIPHER_SUITES, SSL_OPENSLL_KEYCERTCHAIN, SSL_OPENSLL_PRIVATEKEY)
.type(CLUSTER_IP_ADDRESSES, USER, PASSWORD, LOGICAL_NAME, CONSISTENCY_LEVEL, QUERY_OPTIONS_FETCH_SIZE, LOCAL_DC_NAME, SSL_ENABLED, SSL_PROVIDER, SSL_TRUSTSTORE_PATH, SSL_TRUSTSTORE_PASSWORD, SSL_KEYSTORE_PATH, SSL_KEYSTORE_PASSWORD,SSL_CIPHER_SUITES, SSL_OPENSLL_KEYCERTCHAIN, SSL_OPENSLL_PRIVATEKEY)
.connector(QUERY_TIME_WINDOW_SIZE, CONFIDENCE_WINDOW_SIZE, PREIMAGES_ENABLED)
.events(TABLE_NAMES)
.excluding(Heartbeat.HEARTBEAT_INTERVAL).events(CUSTOM_HEARTBEAT_INTERVAL)
Expand Down Expand Up @@ -302,6 +313,10 @@ public boolean getPreimagesEnabled() {
return config.getBoolean(ScyllaConnectorConfig.PREIMAGES_ENABLED);
}

public int getQueryOptionsFetchSize() {
return config.getInteger(ScyllaConnectorConfig.QUERY_OPTIONS_FETCH_SIZE);
}

@Override
public String getContextName() {
return "Scylla";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ public Driver3Session build() {
sslBuilder.withPrivateKeyPath(configuration.getPrivateKeyPath());
builder.withSslConfig(sslBuilder.build());
}
builder.withQueryOptionsFetchSize(configuration.getQueryOptionsFetchSize());

return new Driver3Session(builder.build());
}
}

0 comments on commit 20f2724

Please sign in to comment.