diff --git a/src/main/java/org/apache/flink/connector/clickhouse/internal/converter/ClickHouseRowConverter.java b/src/main/java/org/apache/flink/connector/clickhouse/internal/converter/ClickHouseRowConverter.java index f4f987f..b008467 100644 --- a/src/main/java/org/apache/flink/connector/clickhouse/internal/converter/ClickHouseRowConverter.java +++ b/src/main/java/org/apache/flink/connector/clickhouse/internal/converter/ClickHouseRowConverter.java @@ -67,7 +67,7 @@ public RowData toInternal(ResultSet resultSet) throws SQLException { return genericRowData; } - public void toExternal(RowData rowData, ClickHousePreparedStatement statement) + public void toExternal(RowData rowData, ClickHouseStatementWrapper statement) throws SQLException { for (int index = 0; index < rowData.getArity(); index++) { if (!rowData.isNullAt(index)) { @@ -217,17 +217,17 @@ protected ClickHouseRowConverter.SerializationConverter createToExternalConverte @FunctionalInterface interface SerializationConverter extends Serializable { /** - * Convert a internal field to to java object and fill into the {@link + * Convert an internal field to java object and fill into the {@link * ClickHousePreparedStatement}. */ - void serialize(RowData rowData, int index, ClickHousePreparedStatement statement) + void serialize(RowData rowData, int index, ClickHouseStatementWrapper statement) throws SQLException; } @FunctionalInterface interface DeserializationConverter extends Serializable { /** - * Convert a object of {@link ClickHouseResultSet} to the internal data structure object. + * Convert an object of {@link ClickHouseResultSet} to the internal data structure object. */ Object deserialize(Object field) throws SQLException; } diff --git a/src/main/java/org/apache/flink/connector/clickhouse/internal/converter/ClickHouseStatementWrapper.java b/src/main/java/org/apache/flink/connector/clickhouse/internal/converter/ClickHouseStatementWrapper.java new file mode 100644 index 0000000..7934ddd --- /dev/null +++ b/src/main/java/org/apache/flink/connector/clickhouse/internal/converter/ClickHouseStatementWrapper.java @@ -0,0 +1,85 @@ +package org.apache.flink.connector.clickhouse.internal.converter; + +import ru.yandex.clickhouse.ClickHousePreparedStatement; + +import java.math.BigDecimal; +import java.sql.Date; +import java.sql.SQLException; +import java.sql.Timestamp; + +/** Wrapper class for ClickHousePreparedStatement. */ +public class ClickHouseStatementWrapper { + public final ClickHousePreparedStatement statement; + + public ClickHouseStatementWrapper(ClickHousePreparedStatement statement) { + this.statement = statement; + } + + public void addBatch() throws SQLException { + statement.addBatch(); + } + + public int[] executeBatch() throws SQLException { + return statement.executeBatch(); + } + + public void close() throws SQLException { + statement.close(); + } + + public void setBoolean(int parameterIndex, boolean x) throws SQLException { + statement.setBoolean(parameterIndex, x); + } + + public void setByte(int parameterIndex, byte x) throws SQLException { + statement.setByte(parameterIndex, x); + } + + public void setShort(int parameterIndex, short x) throws SQLException { + statement.setShort(parameterIndex, x); + } + + public void setInt(int parameterIndex, int x) throws SQLException { + statement.setInt(parameterIndex, x); + } + + public void setLong(int parameterIndex, long x) throws SQLException { + statement.setLong(parameterIndex, x); + } + + public void setFloat(int parameterIndex, float x) throws SQLException { + statement.setFloat(parameterIndex, x); + } + + public void setDouble(int parameterIndex, double x) throws SQLException { + statement.setDouble(parameterIndex, x); + } + + public void setBigDecimal(int parameterIndex, BigDecimal x) throws SQLException { + statement.setBigDecimal(parameterIndex, x); + } + + public void setString(int parameterIndex, String x) throws SQLException { + statement.setString(parameterIndex, x); + } + + public void setBytes(int parameterIndex, byte[] x) throws SQLException { + statement.setBytes(parameterIndex, x); + } + + public void setDate(int parameterIndex, Date x) throws SQLException { + statement.setDate(parameterIndex, x); + } + + public void setTimestamp(int parameterIndex, Timestamp x) throws SQLException { + statement.setTimestamp(parameterIndex, x); + } + + public void setArray(int parameterIndex, Object[] array) throws SQLException { + statement.setArray(parameterIndex, array); + } + + public void setObject(int parameterIndex, Object x) throws SQLException { + statement.setObject(parameterIndex, x); + } +} diff --git a/src/main/java/org/apache/flink/connector/clickhouse/internal/executor/ClickHouseBatchExecutor.java b/src/main/java/org/apache/flink/connector/clickhouse/internal/executor/ClickHouseBatchExecutor.java index e043551..08a8ca9 100644 --- a/src/main/java/org/apache/flink/connector/clickhouse/internal/executor/ClickHouseBatchExecutor.java +++ b/src/main/java/org/apache/flink/connector/clickhouse/internal/executor/ClickHouseBatchExecutor.java @@ -4,6 +4,7 @@ import org.apache.flink.connector.clickhouse.internal.ClickHouseShardOutputFormat; import org.apache.flink.connector.clickhouse.internal.connection.ClickHouseConnectionProvider; import org.apache.flink.connector.clickhouse.internal.converter.ClickHouseRowConverter; +import org.apache.flink.connector.clickhouse.internal.converter.ClickHouseStatementWrapper; import org.apache.flink.connector.clickhouse.internal.options.ClickHouseDmlOptions; import org.apache.flink.table.data.RowData; @@ -27,7 +28,7 @@ public class ClickHouseBatchExecutor implements ClickHouseExecutor { private final int maxRetries; - private transient ClickHousePreparedStatement statement; + private transient ClickHouseStatementWrapper statement; private transient ClickHouseConnectionProvider connectionProvider; @@ -40,7 +41,9 @@ public ClickHouseBatchExecutor( @Override public void prepareStatement(ClickHouseConnection connection) throws SQLException { - statement = (ClickHousePreparedStatement) connection.prepareStatement(insertSql); + statement = + new ClickHouseStatementWrapper( + (ClickHousePreparedStatement) connection.prepareStatement(insertSql)); } @Override diff --git a/src/main/java/org/apache/flink/connector/clickhouse/internal/executor/ClickHouseExecutor.java b/src/main/java/org/apache/flink/connector/clickhouse/internal/executor/ClickHouseExecutor.java index a4c5d62..dcb9747 100644 --- a/src/main/java/org/apache/flink/connector/clickhouse/internal/executor/ClickHouseExecutor.java +++ b/src/main/java/org/apache/flink/connector/clickhouse/internal/executor/ClickHouseExecutor.java @@ -4,6 +4,7 @@ import org.apache.flink.connector.clickhouse.internal.ClickHouseStatementFactory; import org.apache.flink.connector.clickhouse.internal.connection.ClickHouseConnectionProvider; import org.apache.flink.connector.clickhouse.internal.converter.ClickHouseRowConverter; +import org.apache.flink.connector.clickhouse.internal.converter.ClickHouseStatementWrapper; import org.apache.flink.connector.clickhouse.internal.options.ClickHouseDmlOptions; import org.apache.flink.table.data.GenericRowData; import org.apache.flink.table.data.RowData; @@ -14,7 +15,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import ru.yandex.clickhouse.ClickHouseConnection; -import ru.yandex.clickhouse.ClickHousePreparedStatement; import java.io.Serializable; import java.sql.SQLException; @@ -41,7 +41,7 @@ public interface ClickHouseExecutor extends Serializable { void closeStatement(); - default void attemptExecuteBatch(ClickHousePreparedStatement stmt, int maxRetries) + default void attemptExecuteBatch(ClickHouseStatementWrapper stmt, int maxRetries) throws SQLException { for (int i = 0; i <= maxRetries; i++) { try { diff --git a/src/main/java/org/apache/flink/connector/clickhouse/internal/executor/ClickHouseUpsertExecutor.java b/src/main/java/org/apache/flink/connector/clickhouse/internal/executor/ClickHouseUpsertExecutor.java index 2645e4d..f914077 100644 --- a/src/main/java/org/apache/flink/connector/clickhouse/internal/executor/ClickHouseUpsertExecutor.java +++ b/src/main/java/org/apache/flink/connector/clickhouse/internal/executor/ClickHouseUpsertExecutor.java @@ -5,6 +5,7 @@ import org.apache.flink.connector.clickhouse.internal.ClickHouseShardOutputFormat; import org.apache.flink.connector.clickhouse.internal.connection.ClickHouseConnectionProvider; import org.apache.flink.connector.clickhouse.internal.converter.ClickHouseRowConverter; +import org.apache.flink.connector.clickhouse.internal.converter.ClickHouseStatementWrapper; import org.apache.flink.connector.clickhouse.internal.options.ClickHouseDmlOptions; import org.apache.flink.table.data.RowData; @@ -50,11 +51,11 @@ public class ClickHouseUpsertExecutor implements ClickHouseExecutor { private final boolean ignoreDelete; - private transient ClickHousePreparedStatement insertStmt; + private transient ClickHouseStatementWrapper insertStatement; - private transient ClickHousePreparedStatement updateStmt; + private transient ClickHouseStatementWrapper updateStatement; - private transient ClickHousePreparedStatement deleteStmt; + private transient ClickHouseStatementWrapper deleteStatement; private transient ClickHouseConnectionProvider connectionProvider; @@ -83,9 +84,15 @@ public ClickHouseUpsertExecutor( @Override public void prepareStatement(ClickHouseConnection connection) throws SQLException { - this.insertStmt = (ClickHousePreparedStatement) connection.prepareStatement(this.insertSql); - this.updateStmt = (ClickHousePreparedStatement) connection.prepareStatement(this.updateSql); - this.deleteStmt = (ClickHousePreparedStatement) connection.prepareStatement(this.deleteSql); + this.insertStatement = + new ClickHouseStatementWrapper( + (ClickHousePreparedStatement) connection.prepareStatement(this.insertSql)); + this.updateStatement = + new ClickHouseStatementWrapper( + (ClickHousePreparedStatement) connection.prepareStatement(this.updateSql)); + this.deleteStatement = + new ClickHouseStatementWrapper( + (ClickHousePreparedStatement) connection.prepareStatement(this.deleteSql)); } @Override @@ -102,16 +109,16 @@ public void setRuntimeContext(RuntimeContext context) {} public void addToBatch(RowData record) throws SQLException { switch (record.getRowKind()) { case INSERT: - insertConverter.toExternal(record, insertStmt); - insertStmt.addBatch(); + insertConverter.toExternal(record, insertStatement); + insertStatement.addBatch(); break; case UPDATE_AFTER: if (INSERT.equals(updateStrategy)) { - insertConverter.toExternal(record, insertStmt); - insertStmt.addBatch(); + insertConverter.toExternal(record, insertStatement); + insertStatement.addBatch(); } else if (UPDATE.equals(updateStrategy)) { - updateConverter.toExternal(updateExtractor.apply(record), updateStmt); - updateStmt.addBatch(); + updateConverter.toExternal(updateExtractor.apply(record), updateStatement); + updateStatement.addBatch(); } else if (DISCARD.equals(updateStrategy)) { LOG.debug("Discard a record of type UPDATE_AFTER: {}", record); } else { @@ -120,8 +127,8 @@ public void addToBatch(RowData record) throws SQLException { break; case DELETE: if (!ignoreDelete) { - deleteConverter.toExternal(deleteExtractor.apply(record), deleteStmt); - deleteStmt.addBatch(); + deleteConverter.toExternal(deleteExtractor.apply(record), deleteStatement); + deleteStatement.addBatch(); } break; case UPDATE_BEFORE: @@ -136,21 +143,21 @@ public void addToBatch(RowData record) throws SQLException { @Override public void executeBatch() throws SQLException { - for (ClickHousePreparedStatement clickHousePreparedStatement : - Arrays.asList(insertStmt, updateStmt, deleteStmt)) { - if (clickHousePreparedStatement != null) { - attemptExecuteBatch(clickHousePreparedStatement, maxRetries); + for (ClickHouseStatementWrapper clickHouseStatement : + Arrays.asList(insertStatement, updateStatement, deleteStatement)) { + if (clickHouseStatement != null) { + attemptExecuteBatch(clickHouseStatement, maxRetries); } } } @Override public void closeStatement() { - for (ClickHousePreparedStatement clickHousePreparedStatement : - Arrays.asList(insertStmt, updateStmt, deleteStmt)) { - if (clickHousePreparedStatement != null) { + for (ClickHouseStatementWrapper clickHouseStatement : + Arrays.asList(insertStatement, updateStatement, deleteStatement)) { + if (clickHouseStatement != null) { try { - clickHousePreparedStatement.close(); + clickHouseStatement.close(); } catch (SQLException exception) { LOG.warn("ClickHouse upsert statement could not be closed.", exception); }