Skip to content

Commit

Permalink
improve: Reduce rowData by primary keys
Browse files Browse the repository at this point in the history
  • Loading branch information
itinycheng committed Jul 26, 2023
1 parent 5c42c8d commit a43f9c3
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ static ClickHouseUpsertExecutor createUpsertExecutor(
String databaseName,
String clusterName,
String[] fieldNames,
String[] keyFields,
String[] keyFieldNames,
String[] partitionFields,
LogicalType[] fieldTypes,
ClickHouseDmlOptions options) {
Expand All @@ -117,26 +117,26 @@ static ClickHouseUpsertExecutor createUpsertExecutor(
databaseName,
clusterName,
fieldNames,
keyFields,
keyFieldNames,
partitionFields);
String deleteSql =
ClickHouseStatementFactory.getDeleteStatement(
tableName, databaseName, clusterName, keyFields);
tableName, databaseName, clusterName, keyFieldNames);

// Re-sort the order of fields to fit the sql statement.
int[] delFields =
Arrays.stream(keyFields)
int[] keyFields =
Arrays.stream(keyFieldNames)
.mapToInt(pk -> ArrayUtils.indexOf(fieldNames, pk))
.toArray();
int[] updatableFields =
IntStream.range(0, fieldNames.length)
.filter(idx -> !ArrayUtils.contains(keyFields, fieldNames[idx]))
.filter(idx -> !ArrayUtils.contains(keyFieldNames, fieldNames[idx]))
.filter(idx -> !ArrayUtils.contains(partitionFields, fieldNames[idx]))
.toArray();
int[] updFields = ArrayUtils.addAll(updatableFields, delFields);
int[] updFields = ArrayUtils.addAll(updatableFields, keyFields);

LogicalType[] delTypes =
Arrays.stream(delFields).mapToObj(f -> fieldTypes[f]).toArray(LogicalType[]::new);
LogicalType[] keyTypes =
Arrays.stream(keyFields).mapToObj(f -> fieldTypes[f]).toArray(LogicalType[]::new);
LogicalType[] updTypes =
Arrays.stream(updFields).mapToObj(f -> fieldTypes[f]).toArray(LogicalType[]::new);

Expand All @@ -146,9 +146,9 @@ static ClickHouseUpsertExecutor createUpsertExecutor(
deleteSql,
new ClickHouseRowConverter(RowType.of(fieldTypes)),
new ClickHouseRowConverter(RowType.of(updTypes)),
new ClickHouseRowConverter(RowType.of(delTypes)),
new ClickHouseRowConverter(RowType.of(keyTypes)),
createExtractor(fieldTypes, updFields),
createExtractor(fieldTypes, delFields),
createExtractor(fieldTypes, keyFields),
options);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

import java.sql.SQLException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

import static org.apache.flink.connector.clickhouse.config.ClickHouseConfigOptions.SinkUpdateStrategy.DISCARD;
Expand Down Expand Up @@ -43,14 +45,16 @@ public class ClickHouseUpsertExecutor implements ClickHouseExecutor {

private final Function<RowData, RowData> updateExtractor;

private final Function<RowData, RowData> deleteExtractor;
private final Function<RowData, RowData> keyExtractor;

private final int maxRetries;

private final SinkUpdateStrategy updateStrategy;

private final boolean ignoreDelete;

private final Map<RowData, RowData> reduceBuffer = new HashMap<>();

private transient ClickHouseStatementWrapper insertStatement;

private transient ClickHouseStatementWrapper updateStatement;
Expand All @@ -67,7 +71,7 @@ public ClickHouseUpsertExecutor(
ClickHouseRowConverter updateConverter,
ClickHouseRowConverter deleteConverter,
Function<RowData, RowData> updateExtractor,
Function<RowData, RowData> deleteExtractor,
Function<RowData, RowData> keyExtractor,
ClickHouseDmlOptions options) {
this.insertSql = insertSql;
this.updateSql = updateSql;
Expand All @@ -76,7 +80,7 @@ public ClickHouseUpsertExecutor(
this.updateConverter = updateConverter;
this.deleteConverter = deleteConverter;
this.updateExtractor = updateExtractor;
this.deleteExtractor = deleteExtractor;
this.keyExtractor = keyExtractor;
this.maxRetries = options.getMaxRetries();
this.updateStrategy = options.getUpdateStrategy();
this.ignoreDelete = options.isIgnoreDelete();
Expand Down Expand Up @@ -106,7 +110,28 @@ public void prepareStatement(ClickHouseConnectionProvider connectionProvider)
public void setRuntimeContext(RuntimeContext context) {}

@Override
public void addToBatch(RowData record) throws SQLException {
public void addToBatch(RowData record) {
RowData key = keyExtractor.apply(record);
reduceBuffer.put(key, record);
}

@Override
public void executeBatch() throws SQLException {
    // Drain the reduced buffer into the JDBC statement batches: one pass,
    // each surviving record routed by its row kind in addValueToBatch.
    for (RowData reduced : reduceBuffer.values()) {
        addValueToBatch(reduced);
    }

    // Execute every prepared statement that exists, retrying each up to
    // maxRetries times.
    for (ClickHouseStatementWrapper stmt :
            Arrays.asList(insertStatement, updateStatement, deleteStatement)) {
        if (stmt != null) {
            attemptExecuteBatch(stmt, maxRetries);
        }
    }

    // Clear only after a fully successful flush; on failure the buffer is
    // retained and re-batched on the next call.
    // NOTE(review): confirm attemptExecuteBatch resets the statement batch
    // on failure, otherwise re-batching could duplicate entries.
    reduceBuffer.clear();
}

private void addValueToBatch(RowData record) throws SQLException {
switch (record.getRowKind()) {
case INSERT:
insertConverter.toExternal(record, insertStatement);
Expand All @@ -127,7 +152,7 @@ public void addToBatch(RowData record) throws SQLException {
break;
case DELETE:
if (!ignoreDelete) {
deleteConverter.toExternal(deleteExtractor.apply(record), deleteStatement);
deleteConverter.toExternal(keyExtractor.apply(record), deleteStatement);
deleteStatement.addBatch();
}
break;
Expand All @@ -141,16 +166,6 @@ public void addToBatch(RowData record) throws SQLException {
}
}

@Override
public void executeBatch() throws SQLException {
for (ClickHouseStatementWrapper clickHouseStatement :
Arrays.asList(insertStatement, updateStatement, deleteStatement)) {
if (clickHouseStatement != null) {
attemptExecuteBatch(clickHouseStatement, maxRetries);
}
}
}

@Override
public void closeStatement() {
for (ClickHouseStatementWrapper clickHouseStatement :
Expand Down

0 comments on commit a43f9c3

Please sign in to comment.