Skip to content

Commit

Permalink
Optimize the code of lookup function
Browse files Browse the repository at this point in the history
  • Loading branch information
itinycheng committed May 24, 2023
1 parent e2994e1 commit 1076c7c
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 41 deletions.
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package org.apache.flink.connector.clickhouse.internal;

import org.apache.flink.annotation.Internal;
import org.apache.flink.connector.clickhouse.ClickHouseDynamicTableSource;
import org.apache.flink.connector.clickhouse.internal.connection.ClickHouseConnectionProvider;
import org.apache.flink.connector.clickhouse.internal.connection.ClickHouseStatementWrapper;
import org.apache.flink.connector.clickhouse.internal.converter.ClickHouseRowConverter;
import org.apache.flink.connector.clickhouse.internal.options.ClickHouseReadOptions;
import org.apache.flink.connector.clickhouse.util.ClickHouseUtil;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.LookupFunction;
Expand All @@ -17,7 +17,6 @@
import org.slf4j.LoggerFactory;
import ru.yandex.clickhouse.ClickHousePreparedStatement;

import java.io.IOException;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
Expand All @@ -39,7 +38,6 @@ public class ClickHouseRowDataLookupFunction extends LookupFunction {

private final String query;
private final ClickHouseConnectionProvider connectionProvider;
private final String[] keyNames;
private final int maxRetryTimes;
private final ClickHouseRowConverter clickhouseRowConverter;
private final ClickHouseRowConverter lookupKeyRowConverter;
Expand All @@ -58,7 +56,6 @@ public ClickHouseRowDataLookupFunction(
checkNotNull(fieldTypes, "No fieldTypes supplied.");
checkNotNull(keyNames, "No keyNames supplied.");
this.connectionProvider = new ClickHouseConnectionProvider(options);
this.keyNames = keyNames;
List<String> nameList = Arrays.asList(fieldNames);
DataType[] keyTypes =
Arrays.stream(keyNames)
Expand All @@ -74,7 +71,8 @@ public ClickHouseRowDataLookupFunction(
.toArray(DataType[]::new);
this.maxRetryTimes = maxRetryTimes;
this.query =
ClickHouseUtil.getSelectFromStatement(options.getTableName(), fieldNames, keyNames);
ClickHouseStatementFactory.getSelectWhereStatement(
options.getTableName(), options.getDatabaseName(), fieldNames, keyNames);
this.clickhouseRowConverter = new ClickHouseRowConverter(rowType);
this.lookupKeyRowConverter =
new ClickHouseRowConverter(
Expand All @@ -85,13 +83,11 @@ public ClickHouseRowDataLookupFunction(
}

@Override
public void open(FunctionContext context) throws Exception {
public void open(FunctionContext context) {
try {
establishConnectionAndStatement();
} catch (SQLException sqe) {
throw new IllegalArgumentException("open() failed.", sqe);
} catch (ClassNotFoundException cnfe) {
throw new IllegalArgumentException("ClickHouse driver class not found.", cnfe);
}
}

Expand Down Expand Up @@ -128,7 +124,7 @@ public Collection<RowData> lookup(RowData keyRow) {
connectionProvider.closeConnections();
establishConnectionAndStatement();
}
} catch (SQLException | ClassNotFoundException exception) {
} catch (SQLException exception) {
LOG.error(
"ClickHouse connection is not valid, and reestablish connection failed",
exception);
Expand All @@ -146,15 +142,15 @@ public Collection<RowData> lookup(RowData keyRow) {
return Collections.emptyList();
}

private void establishConnectionAndStatement() throws SQLException, ClassNotFoundException {
private void establishConnectionAndStatement() throws SQLException {
Connection dbConn = connectionProvider.getOrCreateConnection();
statement =
new ClickHouseStatementWrapper(
(ClickHousePreparedStatement) dbConn.prepareStatement(query));
}

@Override
public void close() throws IOException {
public void close() {
if (statement != null) {
try {
statement.close();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
package org.apache.flink.connector.clickhouse.internal;

import org.apache.flink.connector.clickhouse.util.ClickHouseUtil;

import org.apache.commons.lang3.ArrayUtils;

import java.util.Arrays;
import java.util.stream.Collectors;

import static java.lang.String.format;
import static java.util.stream.Collectors.joining;
import static org.apache.flink.connector.clickhouse.util.ClickHouseUtil.quoteIdentifier;

/** Create an insert/update/delete ClickHouse statement. */
public class ClickHouseStatementFactory {
Expand All @@ -15,19 +19,32 @@ public class ClickHouseStatementFactory {
private ClickHouseStatementFactory() {}

public static String getSelectStatement(
String tableName, String databaseName, String[] fieldNames) {
String columns =
Arrays.stream(fieldNames)
.map(ClickHouseStatementFactory::quoteIdentifier)
String tableName, String databaseName, String[] selectFields) {
String selectClause =
Arrays.stream(selectFields)
.map(ClickHouseUtil::quoteIdentifier)
.collect(joining(", "));
return String.join(
EMPTY, "SELECT ", columns, " FROM ", fromTableClause(tableName, databaseName));
EMPTY, "SELECT ", selectClause, " FROM ", fromTableClause(tableName, databaseName));
}

public static String getSelectWhereStatement(
String tableName,
String databaseName,
String[] selectFields,
String[] conditionFields) {
String selectStatement = getSelectStatement(tableName, databaseName, selectFields);
String whereClause =
Arrays.stream(conditionFields)
.map(f -> format("%s = ?", quoteIdentifier(f)))
.collect(Collectors.joining(" AND "));
return selectStatement + (conditionFields.length > 0 ? " WHERE " + whereClause : "");
}

public static String getInsertIntoStatement(String tableName, String[] fieldNames) {
String columns =
Arrays.stream(fieldNames)
.map(ClickHouseStatementFactory::quoteIdentifier)
.map(ClickHouseUtil::quoteIdentifier)
.collect(joining(", "));
String placeholders = Arrays.stream(fieldNames).map((f) -> "?").collect(joining(", "));
return String.join(
Expand Down Expand Up @@ -101,8 +118,4 @@ private static String fromTableClause(String tableName, String databaseName) {

return format("%s.%s", quoteIdentifier(databaseName), quoteIdentifier(tableName));
}

private static String quoteIdentifier(String identifier) {
return String.join(EMPTY, "`", identifier, "`");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,10 @@
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;

import static java.lang.String.format;
import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;
import static org.apache.flink.connector.clickhouse.config.ClickHouseConfig.PROPERTIES_PREFIX;
Expand Down Expand Up @@ -98,21 +95,4 @@ private static Expression parseFunctionExpr(String shardingExpr) {
Expression expression = parseFunctionExpr(subExprLiteral);
return FunctionExpr.of(functionName, singletonList(expression));
}

public static String getSelectFromStatement(
String tableName, String[] selectFields, String[] conditionFields) {
String selectExpressions =
Arrays.stream(selectFields)
.map(ClickHouseUtil::quoteIdentifier)
.collect(Collectors.joining(", "));
String fieldExpressions =
Arrays.stream(conditionFields)
.map(f -> format("%s = ?", quoteIdentifier(f)))
.collect(Collectors.joining(" AND "));
return "SELECT "
+ selectExpressions
+ " FROM "
+ quoteIdentifier(tableName)
+ (conditionFields.length > 0 ? " WHERE " + fieldExpressions : "");
}
}

0 comments on commit 1076c7c

Please sign in to comment.