Skip to content

Commit

Permalink
[bug fix] Avoid invalid lambda deserialization exception by wrappin…
Browse files Browse the repository at this point in the history
…g ClickHouseStatement
  • Loading branch information
itinycheng committed Mar 14, 2023
1 parent d5db4a9 commit 21c7aab
Show file tree
Hide file tree
Showing 5 changed files with 125 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public RowData toInternal(ResultSet resultSet) throws SQLException {
return genericRowData;
}

public void toExternal(RowData rowData, ClickHousePreparedStatement statement)
public void toExternal(RowData rowData, ClickHouseStatementWrapper statement)
throws SQLException {
for (int index = 0; index < rowData.getArity(); index++) {
if (!rowData.isNullAt(index)) {
Expand Down Expand Up @@ -217,17 +217,17 @@ protected ClickHouseRowConverter.SerializationConverter createToExternalConverte
@FunctionalInterface
interface SerializationConverter extends Serializable {
/**
* Convert a internal field to to java object and fill into the {@link
* Convert an internal field to java object and fill into the {@link
* ClickHousePreparedStatement}.
*/
void serialize(RowData rowData, int index, ClickHousePreparedStatement statement)
void serialize(RowData rowData, int index, ClickHouseStatementWrapper statement)
throws SQLException;
}

@FunctionalInterface
interface DeserializationConverter extends Serializable {
/**
* Convert a object of {@link ClickHouseResultSet} to the internal data structure object.
* Convert an object of {@link ClickHouseResultSet} to the internal data structure object.
*/
Object deserialize(Object field) throws SQLException;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package org.apache.flink.connector.clickhouse.internal.converter;

import ru.yandex.clickhouse.ClickHousePreparedStatement;

import java.math.BigDecimal;
import java.sql.Date;
import java.sql.SQLException;
import java.sql.Timestamp;

/** Wrapper class for ClickHousePreparedStatement. */
public class ClickHouseStatementWrapper {
public final ClickHousePreparedStatement statement;

public ClickHouseStatementWrapper(ClickHousePreparedStatement statement) {
this.statement = statement;
}

public void addBatch() throws SQLException {
statement.addBatch();
}

public int[] executeBatch() throws SQLException {
return statement.executeBatch();
}

public void close() throws SQLException {
statement.close();
}

public void setBoolean(int parameterIndex, boolean x) throws SQLException {
statement.setBoolean(parameterIndex, x);
}

public void setByte(int parameterIndex, byte x) throws SQLException {
statement.setByte(parameterIndex, x);
}

public void setShort(int parameterIndex, short x) throws SQLException {
statement.setShort(parameterIndex, x);
}

public void setInt(int parameterIndex, int x) throws SQLException {
statement.setInt(parameterIndex, x);
}

public void setLong(int parameterIndex, long x) throws SQLException {
statement.setLong(parameterIndex, x);
}

public void setFloat(int parameterIndex, float x) throws SQLException {
statement.setFloat(parameterIndex, x);
}

public void setDouble(int parameterIndex, double x) throws SQLException {
statement.setDouble(parameterIndex, x);
}

public void setBigDecimal(int parameterIndex, BigDecimal x) throws SQLException {
statement.setBigDecimal(parameterIndex, x);
}

public void setString(int parameterIndex, String x) throws SQLException {
statement.setString(parameterIndex, x);
}

public void setBytes(int parameterIndex, byte[] x) throws SQLException {
statement.setBytes(parameterIndex, x);
}

public void setDate(int parameterIndex, Date x) throws SQLException {
statement.setDate(parameterIndex, x);
}

public void setTimestamp(int parameterIndex, Timestamp x) throws SQLException {
statement.setTimestamp(parameterIndex, x);
}

public void setArray(int parameterIndex, Object[] array) throws SQLException {
statement.setArray(parameterIndex, array);
}

public void setObject(int parameterIndex, Object x) throws SQLException {
statement.setObject(parameterIndex, x);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import org.apache.flink.connector.clickhouse.internal.ClickHouseShardOutputFormat;
import org.apache.flink.connector.clickhouse.internal.connection.ClickHouseConnectionProvider;
import org.apache.flink.connector.clickhouse.internal.converter.ClickHouseRowConverter;
import org.apache.flink.connector.clickhouse.internal.converter.ClickHouseStatementWrapper;
import org.apache.flink.connector.clickhouse.internal.options.ClickHouseDmlOptions;
import org.apache.flink.table.data.RowData;

Expand All @@ -27,7 +28,7 @@ public class ClickHouseBatchExecutor implements ClickHouseExecutor {

private final int maxRetries;

private transient ClickHousePreparedStatement statement;
private transient ClickHouseStatementWrapper statement;

private transient ClickHouseConnectionProvider connectionProvider;

Expand All @@ -40,7 +41,9 @@ public ClickHouseBatchExecutor(

@Override
public void prepareStatement(ClickHouseConnection connection) throws SQLException {
statement = (ClickHousePreparedStatement) connection.prepareStatement(insertSql);
statement =
new ClickHouseStatementWrapper(
(ClickHousePreparedStatement) connection.prepareStatement(insertSql));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import org.apache.flink.connector.clickhouse.internal.ClickHouseStatementFactory;
import org.apache.flink.connector.clickhouse.internal.connection.ClickHouseConnectionProvider;
import org.apache.flink.connector.clickhouse.internal.converter.ClickHouseRowConverter;
import org.apache.flink.connector.clickhouse.internal.converter.ClickHouseStatementWrapper;
import org.apache.flink.connector.clickhouse.internal.options.ClickHouseDmlOptions;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
Expand All @@ -14,7 +15,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import ru.yandex.clickhouse.ClickHouseConnection;
import ru.yandex.clickhouse.ClickHousePreparedStatement;

import java.io.Serializable;
import java.sql.SQLException;
Expand All @@ -41,7 +41,7 @@ public interface ClickHouseExecutor extends Serializable {

void closeStatement();

default void attemptExecuteBatch(ClickHousePreparedStatement stmt, int maxRetries)
default void attemptExecuteBatch(ClickHouseStatementWrapper stmt, int maxRetries)
throws SQLException {
for (int i = 0; i <= maxRetries; i++) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import org.apache.flink.connector.clickhouse.internal.ClickHouseShardOutputFormat;
import org.apache.flink.connector.clickhouse.internal.connection.ClickHouseConnectionProvider;
import org.apache.flink.connector.clickhouse.internal.converter.ClickHouseRowConverter;
import org.apache.flink.connector.clickhouse.internal.converter.ClickHouseStatementWrapper;
import org.apache.flink.connector.clickhouse.internal.options.ClickHouseDmlOptions;
import org.apache.flink.table.data.RowData;

Expand Down Expand Up @@ -50,11 +51,11 @@ public class ClickHouseUpsertExecutor implements ClickHouseExecutor {

private final boolean ignoreDelete;

private transient ClickHousePreparedStatement insertStmt;
private transient ClickHouseStatementWrapper insertStatement;

private transient ClickHousePreparedStatement updateStmt;
private transient ClickHouseStatementWrapper updateStatement;

private transient ClickHousePreparedStatement deleteStmt;
private transient ClickHouseStatementWrapper deleteStatement;

private transient ClickHouseConnectionProvider connectionProvider;

Expand Down Expand Up @@ -83,9 +84,15 @@ public ClickHouseUpsertExecutor(

@Override
public void prepareStatement(ClickHouseConnection connection) throws SQLException {
this.insertStmt = (ClickHousePreparedStatement) connection.prepareStatement(this.insertSql);
this.updateStmt = (ClickHousePreparedStatement) connection.prepareStatement(this.updateSql);
this.deleteStmt = (ClickHousePreparedStatement) connection.prepareStatement(this.deleteSql);
this.insertStatement =
new ClickHouseStatementWrapper(
(ClickHousePreparedStatement) connection.prepareStatement(this.insertSql));
this.updateStatement =
new ClickHouseStatementWrapper(
(ClickHousePreparedStatement) connection.prepareStatement(this.updateSql));
this.deleteStatement =
new ClickHouseStatementWrapper(
(ClickHousePreparedStatement) connection.prepareStatement(this.deleteSql));
}

@Override
Expand All @@ -102,16 +109,16 @@ public void setRuntimeContext(RuntimeContext context) {}
public void addToBatch(RowData record) throws SQLException {
switch (record.getRowKind()) {
case INSERT:
insertConverter.toExternal(record, insertStmt);
insertStmt.addBatch();
insertConverter.toExternal(record, insertStatement);
insertStatement.addBatch();
break;
case UPDATE_AFTER:
if (INSERT.equals(updateStrategy)) {
insertConverter.toExternal(record, insertStmt);
insertStmt.addBatch();
insertConverter.toExternal(record, insertStatement);
insertStatement.addBatch();
} else if (UPDATE.equals(updateStrategy)) {
updateConverter.toExternal(updateExtractor.apply(record), updateStmt);
updateStmt.addBatch();
updateConverter.toExternal(updateExtractor.apply(record), updateStatement);
updateStatement.addBatch();
} else if (DISCARD.equals(updateStrategy)) {
LOG.debug("Discard a record of type UPDATE_AFTER: {}", record);
} else {
Expand All @@ -120,8 +127,8 @@ public void addToBatch(RowData record) throws SQLException {
break;
case DELETE:
if (!ignoreDelete) {
deleteConverter.toExternal(deleteExtractor.apply(record), deleteStmt);
deleteStmt.addBatch();
deleteConverter.toExternal(deleteExtractor.apply(record), deleteStatement);
deleteStatement.addBatch();
}
break;
case UPDATE_BEFORE:
Expand All @@ -136,21 +143,21 @@ public void addToBatch(RowData record) throws SQLException {

@Override
public void executeBatch() throws SQLException {
for (ClickHousePreparedStatement clickHousePreparedStatement :
Arrays.asList(insertStmt, updateStmt, deleteStmt)) {
if (clickHousePreparedStatement != null) {
attemptExecuteBatch(clickHousePreparedStatement, maxRetries);
for (ClickHouseStatementWrapper clickHouseStatement :
Arrays.asList(insertStatement, updateStatement, deleteStatement)) {
if (clickHouseStatement != null) {
attemptExecuteBatch(clickHouseStatement, maxRetries);
}
}
}

@Override
public void closeStatement() {
for (ClickHousePreparedStatement clickHousePreparedStatement :
Arrays.asList(insertStmt, updateStmt, deleteStmt)) {
if (clickHousePreparedStatement != null) {
for (ClickHouseStatementWrapper clickHouseStatement :
Arrays.asList(insertStatement, updateStatement, deleteStatement)) {
if (clickHouseStatement != null) {
try {
clickHousePreparedStatement.close();
clickHouseStatement.close();
} catch (SQLException exception) {
LOG.warn("ClickHouse upsert statement could not be closed.", exception);
}
Expand Down

0 comments on commit 21c7aab

Please sign in to comment.