diff --git a/src/main/java/org/apache/flink/connector/clickhouse/internal/ClickHouseRowDataLookupFunction.java b/src/main/java/org/apache/flink/connector/clickhouse/internal/ClickHouseRowDataLookupFunction.java index 8f1ac81..f1da09f 100644 --- a/src/main/java/org/apache/flink/connector/clickhouse/internal/ClickHouseRowDataLookupFunction.java +++ b/src/main/java/org/apache/flink/connector/clickhouse/internal/ClickHouseRowDataLookupFunction.java @@ -1,11 +1,11 @@ package org.apache.flink.connector.clickhouse.internal; import org.apache.flink.annotation.Internal; +import org.apache.flink.connector.clickhouse.ClickHouseDynamicTableSource; import org.apache.flink.connector.clickhouse.internal.connection.ClickHouseConnectionProvider; import org.apache.flink.connector.clickhouse.internal.connection.ClickHouseStatementWrapper; import org.apache.flink.connector.clickhouse.internal.converter.ClickHouseRowConverter; import org.apache.flink.connector.clickhouse.internal.options.ClickHouseReadOptions; -import org.apache.flink.connector.clickhouse.util.ClickHouseUtil; import org.apache.flink.table.data.RowData; import org.apache.flink.table.functions.FunctionContext; import org.apache.flink.table.functions.LookupFunction; @@ -17,7 +17,6 @@ import org.slf4j.LoggerFactory; import ru.yandex.clickhouse.ClickHousePreparedStatement; -import java.io.IOException; import java.sql.Connection; import java.sql.ResultSet; import java.sql.SQLException; @@ -39,7 +38,6 @@ public class ClickHouseRowDataLookupFunction extends LookupFunction { private final String query; private final ClickHouseConnectionProvider connectionProvider; - private final String[] keyNames; private final int maxRetryTimes; private final ClickHouseRowConverter clickhouseRowConverter; private final ClickHouseRowConverter lookupKeyRowConverter; @@ -58,7 +56,6 @@ public ClickHouseRowDataLookupFunction( checkNotNull(fieldTypes, "No fieldTypes supplied."); checkNotNull(keyNames, "No keyNames supplied."); this.connectionProvider = new ClickHouseConnectionProvider(options); - this.keyNames = keyNames; List nameList = Arrays.asList(fieldNames); DataType[] keyTypes = Arrays.stream(keyNames) @@ -74,7 +71,8 @@ public ClickHouseRowDataLookupFunction( .toArray(DataType[]::new); this.maxRetryTimes = maxRetryTimes; this.query = - ClickHouseUtil.getSelectFromStatement(options.getTableName(), fieldNames, keyNames); + ClickHouseStatementFactory.getSelectWhereStatement( + options.getTableName(), options.getDatabaseName(), fieldNames, keyNames); this.clickhouseRowConverter = new ClickHouseRowConverter(rowType); this.lookupKeyRowConverter = new ClickHouseRowConverter( @@ -85,13 +83,11 @@ public ClickHouseRowDataLookupFunction( } @Override - public void open(FunctionContext context) throws Exception { + public void open(FunctionContext context) { try { establishConnectionAndStatement(); } catch (SQLException sqe) { throw new IllegalArgumentException("open() failed.", sqe); - } catch (ClassNotFoundException cnfe) { - throw new IllegalArgumentException("ClickHouse driver class not found.", cnfe); } } @@ -128,7 +124,7 @@ public Collection lookup(RowData keyRow) { connectionProvider.closeConnections(); establishConnectionAndStatement(); } - } catch (SQLException | ClassNotFoundException exception) { + } catch (SQLException exception) { LOG.error( "ClickHouse connection is not valid, and reestablish connection failed", exception); @@ -146,7 +142,7 @@ public Collection lookup(RowData keyRow) { return Collections.emptyList(); } - private void establishConnectionAndStatement() throws SQLException, ClassNotFoundException { + private void establishConnectionAndStatement() throws SQLException { Connection dbConn = connectionProvider.getOrCreateConnection(); statement = new ClickHouseStatementWrapper( @@ -154,7 +150,7 @@ private void establishConnectionAndStatement() throws SQLException, ClassNotFoun } @Override - public void close() throws IOException { + public void close() { if (statement != null) { try { statement.close(); diff --git a/src/main/java/org/apache/flink/connector/clickhouse/internal/ClickHouseStatementFactory.java b/src/main/java/org/apache/flink/connector/clickhouse/internal/ClickHouseStatementFactory.java index 5a0d354..e23a9b2 100644 --- a/src/main/java/org/apache/flink/connector/clickhouse/internal/ClickHouseStatementFactory.java +++ b/src/main/java/org/apache/flink/connector/clickhouse/internal/ClickHouseStatementFactory.java @@ -1,11 +1,15 @@ package org.apache.flink.connector.clickhouse.internal; +import org.apache.flink.connector.clickhouse.util.ClickHouseUtil; + import org.apache.commons.lang3.ArrayUtils; import java.util.Arrays; +import java.util.stream.Collectors; import static java.lang.String.format; import static java.util.stream.Collectors.joining; +import static org.apache.flink.connector.clickhouse.util.ClickHouseUtil.quoteIdentifier; /** Create an insert/update/delete ClickHouse statement. */ public class ClickHouseStatementFactory { @@ -15,19 +19,32 @@ public class ClickHouseStatementFactory { private ClickHouseStatementFactory() {} public static String getSelectStatement( - String tableName, String databaseName, String[] fieldNames) { - String columns = - Arrays.stream(fieldNames) - .map(ClickHouseStatementFactory::quoteIdentifier) + String tableName, String databaseName, String[] selectFields) { + String selectClause = + Arrays.stream(selectFields) + .map(ClickHouseUtil::quoteIdentifier) .collect(joining(", ")); return String.join( - EMPTY, "SELECT ", columns, " FROM ", fromTableClause(tableName, databaseName)); + EMPTY, "SELECT ", selectClause, " FROM ", fromTableClause(tableName, databaseName)); + } + + public static String getSelectWhereStatement( + String tableName, + String databaseName, + String[] selectFields, + String[] conditionFields) { + String selectStatement = getSelectStatement(tableName, databaseName, selectFields); + String whereClause = + Arrays.stream(conditionFields) + .map(f -> format("%s = ?", quoteIdentifier(f))) + .collect(Collectors.joining(" AND ")); + return selectStatement + (conditionFields.length > 0 ? " WHERE " + whereClause : ""); } public static String getInsertIntoStatement(String tableName, String[] fieldNames) { String columns = Arrays.stream(fieldNames) - .map(ClickHouseStatementFactory::quoteIdentifier) + .map(ClickHouseUtil::quoteIdentifier) .collect(joining(", ")); String placeholders = Arrays.stream(fieldNames).map((f) -> "?").collect(joining(", ")); return String.join( @@ -101,8 +118,4 @@ private static String fromTableClause(String tableName, String databaseName) { return format("%s.%s", quoteIdentifier(databaseName), quoteIdentifier(tableName)); } - - private static String quoteIdentifier(String identifier) { - return String.join(EMPTY, "`", identifier, "`"); - } } diff --git a/src/main/java/org/apache/flink/connector/clickhouse/util/ClickHouseUtil.java b/src/main/java/org/apache/flink/connector/clickhouse/util/ClickHouseUtil.java index 0e26b16..95a11d1 100644 --- a/src/main/java/org/apache/flink/connector/clickhouse/util/ClickHouseUtil.java +++ b/src/main/java/org/apache/flink/connector/clickhouse/util/ClickHouseUtil.java @@ -13,13 +13,10 @@ import java.time.LocalDateTime; import java.time.LocalTime; import java.util.ArrayList; -import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.Properties; -import java.util.stream.Collectors; -import static java.lang.String.format; import static java.util.Collections.emptyList; import static java.util.Collections.singletonList; import static org.apache.flink.connector.clickhouse.config.ClickHouseConfig.PROPERTIES_PREFIX; @@ -98,21 +95,4 @@ private static Expression parseFunctionExpr(String shardingExpr) { Expression expression = parseFunctionExpr(subExprLiteral); return FunctionExpr.of(functionName, singletonList(expression)); } - - public static String getSelectFromStatement( - String tableName, String[] selectFields, String[] conditionFields) { - String selectExpressions = - Arrays.stream(selectFields) - .map(ClickHouseUtil::quoteIdentifier) - .collect(Collectors.joining(", ")); - String fieldExpressions = - Arrays.stream(conditionFields) - .map(f -> format("%s = ?", quoteIdentifier(f))) - .collect(Collectors.joining(" AND ")); - return "SELECT " - + selectExpressions - + " FROM " - + quoteIdentifier(tableName) - + (conditionFields.length > 0 ? " WHERE " + fieldExpressions : ""); - } }