Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Improve] use generate_sink_sql=true for db2 upsert #233

Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,6 @@
public class Db2DataSourceConfigSwitcher extends BaseJdbcDataSourceConfigSwitcher {

private static final String QUERY_KEY = "query";

private static final String GENERATE_SINK_SQL = "generate_sink_sql";

private static final String URL_KEY = "url";

// for catalog
Expand Down Expand Up @@ -109,16 +106,6 @@ public Config mergeDatasourceConfig(

connectorConfig =
connectorConfig.withValue(QUERY_KEY, ConfigValueFactory.fromAnyRef(sql));
} else if (pluginType.equals(PluginType.SINK)) {

List<String> tableFields = selectTableFields.getTableFields();

String sql = generateDb2(tableFields, databaseName, tableName);

connectorConfig =
connectorConfig.withValue(QUERY_KEY, ConfigValueFactory.fromAnyRef(sql));
} else {
throw new UnsupportedOperationException("Unsupported plugin type: " + pluginType);
}

return super.mergeDatasourceConfig(
Expand Down Expand Up @@ -185,10 +172,6 @@ protected String tableFieldsToSql(List<String> tableFields, String database, Str
return generateSql(tableFields, database, null, table);
}

protected String generateDb2(List<String> tableFields, String database, String table) {
return generateSinkSql(tableFields, database, null, table);
}

protected String generateSql(
List<String> tableFields, String database, String schema, String table) {
StringBuilder sb = new StringBuilder();
Expand All @@ -208,35 +191,6 @@ protected String quoteIdentifier(String identifier) {
return "\"" + identifier + "\"";
}

protected String generateSinkSql(
List<String> tableFields, String database, String schema, String table) {
StringBuilder sb = new StringBuilder();
sb.append("INSERT INTO ")
.append(quoteIdentifier(database) + "." + quoteIdentifier(table))
.append(" (");

// Append column names
for (int i = 0; i < tableFields.size(); i++) {
sb.append(quoteIdentifier(tableFields.get(i)));
if (i < tableFields.size() - 1) {
sb.append(", ");
}
}

sb.append(") VALUES (");

// Append placeholders
for (int i = 0; i < tableFields.size(); i++) {
sb.append("?");
if (i < tableFields.size() - 1) {
sb.append(", ");
}
}

sb.append(");");
return sb.toString();
}

@Override
public String getDataSourceName() {
return "JDBC-DB2";
Expand Down
Loading