Skip to content

Commit

Permalink
Support for Lookup Join Function
Browse files Browse the repository at this point in the history
  • Loading branch information
Lyfee authored and itinycheng committed Apr 17, 2023
1 parent 7c02a59 commit 430e5bf
Show file tree
Hide file tree
Showing 7 changed files with 504 additions and 208 deletions.
418 changes: 212 additions & 206 deletions README.md

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,16 @@
import org.apache.flink.table.catalog.UniqueConstraint;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.lookup.LookupOptions;
import org.apache.flink.table.connector.source.lookup.cache.DefaultLookupCache;
import org.apache.flink.table.connector.source.lookup.cache.LookupCache;
import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.factories.FactoryUtil.TableFactoryHelper;

import javax.annotation.Nullable;

import java.util.HashSet;
import java.util.Properties;
import java.util.Set;
Expand Down Expand Up @@ -83,7 +88,11 @@ public DynamicTableSource createDynamicTableSource(Context context) {
Properties clickHouseProperties =
getClickHouseProperties(context.getCatalogTable().getOptions());
return new ClickHouseDynamicTableSource(
getReadOptions(config), clickHouseProperties, context.getPhysicalRowDataType());
getReadOptions(config),
helper.getOptions().get(LookupOptions.MAX_RETRIES),
getLookupCache(config),
clickHouseProperties,
context.getPhysicalRowDataType());
}

@Override
Expand Down Expand Up @@ -120,6 +129,12 @@ public Set<ConfigOption<?>> optionalOptions() {
optionalOptions.add(SCAN_PARTITION_NUM);
optionalOptions.add(SCAN_PARTITION_LOWER_BOUND);
optionalOptions.add(SCAN_PARTITION_UPPER_BOUND);
optionalOptions.add(LookupOptions.CACHE_TYPE);
optionalOptions.add(LookupOptions.PARTIAL_CACHE_EXPIRE_AFTER_ACCESS);
optionalOptions.add(LookupOptions.PARTIAL_CACHE_EXPIRE_AFTER_WRITE);
optionalOptions.add(LookupOptions.PARTIAL_CACHE_MAX_ROWS);
optionalOptions.add(LookupOptions.PARTIAL_CACHE_CACHE_MISSING_KEY);
optionalOptions.add(LookupOptions.MAX_RETRIES);
return optionalOptions;
}

Expand All @@ -135,12 +150,25 @@ private void validateConfigOptions(ReadableConfig config) {
^ config.getOptional(PASSWORD).isPresent()) {
throw new IllegalArgumentException(
"Either all or none of username and password should be provided");
} else if (!config.get(LookupOptions.CACHE_TYPE).equals(LookupOptions.LookupCacheType.NONE)
&& !config.get(LookupOptions.CACHE_TYPE)
.equals(LookupOptions.LookupCacheType.PARTIAL)) {
throw new IllegalArgumentException(
String.format(
"The value of '%s' option should be 'NONE' or 'PARTIAL'(not support 'FULL' yet), but is %s.",
LookupOptions.CACHE_TYPE.key(), config.get(LookupOptions.CACHE_TYPE)));
} else if (config.getOptional(SCAN_PARTITION_COLUMN).isPresent()
^ config.getOptional(SCAN_PARTITION_NUM).isPresent()
^ config.getOptional(SCAN_PARTITION_LOWER_BOUND).isPresent()
^ config.getOptional(SCAN_PARTITION_UPPER_BOUND).isPresent()) {
throw new IllegalArgumentException(
"Either all or none of partition configs should be provided");
} else if (config.get(LookupOptions.MAX_RETRIES) < 0) {
throw new IllegalArgumentException(
String.format(
"The value of '%s' option shouldn't be negative, but is %s.",
LookupOptions.MAX_RETRIES.key(),
config.get(LookupOptions.MAX_RETRIES)));
}
}

Expand Down Expand Up @@ -178,4 +206,15 @@ private ClickHouseReadOptions getReadOptions(ReadableConfig config) {
.withPartitionUpperBound(config.get(SCAN_PARTITION_UPPER_BOUND))
.build();
}

/**
 * Creates the lookup cache requested by the table options.
 *
 * @param tableOptions table options carrying the {@code LookupOptions} settings
 * @return a cache built by {@code DefaultLookupCache.fromConfig} when the cache type is
 *     {@code PARTIAL}; {@code null} for any other cache type
 */
@Nullable
private LookupCache getLookupCache(ReadableConfig tableOptions) {
    // Only the PARTIAL cache type results in a cache instance.
    if (!tableOptions
            .get(LookupOptions.CACHE_TYPE)
            .equals(LookupOptions.LookupCacheType.PARTIAL)) {
        return null;
    }
    return DefaultLookupCache.fromConfig(tableOptions);
}
}
Original file line number Diff line number Diff line change
@@ -1,18 +1,27 @@
package org.apache.flink.connector.clickhouse;

import org.apache.flink.connector.clickhouse.internal.AbstractClickHouseInputFormat;
import org.apache.flink.connector.clickhouse.internal.ClickHouseRowDataLookupFunction;
import org.apache.flink.connector.clickhouse.internal.options.ClickHouseReadOptions;
import org.apache.flink.connector.clickhouse.util.FilterPushDownHelper;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.Projection;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.InputFormatProvider;
import org.apache.flink.table.connector.source.LookupTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
import org.apache.flink.table.connector.source.abilities.SupportsLimitPushDown;
import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
import org.apache.flink.table.connector.source.lookup.LookupFunctionProvider;
import org.apache.flink.table.connector.source.lookup.PartialCachingLookupProvider;
import org.apache.flink.table.connector.source.lookup.cache.LookupCache;
import org.apache.flink.table.expressions.ResolvedExpression;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Preconditions;

import javax.annotation.Nullable;

import java.util.ArrayList;
import java.util.List;
Expand All @@ -21,6 +30,7 @@
/** ClickHouse table source. */
public class ClickHouseDynamicTableSource
implements ScanTableSource,
LookupTableSource,
SupportsProjectionPushDown,
SupportsLimitPushDown,
SupportsFilterPushDown {
Expand All @@ -29,6 +39,10 @@ public class ClickHouseDynamicTableSource

private final Properties connectionProperties;

private final int lookupMaxRetryTimes;

@Nullable private final LookupCache cache;

private DataType physicalRowDataType;

private String filterClause;
Expand All @@ -37,10 +51,14 @@ public class ClickHouseDynamicTableSource

/**
 * Creates a ClickHouse table source.
 *
 * @param readOptions read options stored for later use by the runtime providers
 * @param lookupMaxRetryTimes retry count handed to the lookup function
 * @param cache lookup cache, or {@code null} when lookups are not cached
 * @param properties connection properties
 * @param physicalRowDataType physical row data type of the table
 */
public ClickHouseDynamicTableSource(
        ClickHouseReadOptions readOptions,
        int lookupMaxRetryTimes,
        @Nullable LookupCache cache,
        Properties properties,
        DataType physicalRowDataType) {
    // Assignments follow the parameter order.
    this.readOptions = readOptions;
    this.lookupMaxRetryTimes = lookupMaxRetryTimes;
    this.cache = cache;
    this.connectionProperties = properties;
    this.physicalRowDataType = physicalRowDataType;
}

Expand All @@ -49,6 +67,32 @@ public ChangelogMode getChangelogMode() {
return ChangelogMode.insertOnly();
}

/**
 * Builds the runtime provider used for lookup joins.
 *
 * <p>Every lookup key must be a single-element index path (a top-level column); the index is
 * resolved to a field name of the physical row type. When a cache was configured the lookup
 * function is wrapped in a partial-caching provider, otherwise it is exposed directly.
 *
 * @param context lookup context supplying the key index paths
 * @return the lookup runtime provider
 * @throws IllegalArgumentException if a key path does not have exactly one element
 */
@Override
public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {
    final int[][] lookupKeys = context.getKeys();
    final List<String> fieldNames = DataType.getFieldNames(physicalRowDataType);

    // ClickHouse only support non-nested look up keys
    final String[] keyNames = new String[lookupKeys.length];
    for (int idx = 0; idx < lookupKeys.length; idx++) {
        final int[] keyPath = lookupKeys[idx];
        Preconditions.checkArgument(
                keyPath.length == 1, "ClickHouse only support non-nested look up keys");
        keyNames[idx] = fieldNames.get(keyPath[0]);
    }

    final String[] allFieldNames = fieldNames.toArray(new String[0]);
    final DataType[] allFieldTypes =
            DataType.getFieldDataTypes(physicalRowDataType).toArray(new DataType[0]);
    final RowType rowType = (RowType) physicalRowDataType.getLogicalType();

    final ClickHouseRowDataLookupFunction lookupFunction =
            new ClickHouseRowDataLookupFunction(
                    readOptions,
                    lookupMaxRetryTimes,
                    allFieldNames,
                    allFieldTypes,
                    keyNames,
                    rowType);

    if (cache == null) {
        return LookupFunctionProvider.of(lookupFunction);
    }
    return PartialCachingLookupProvider.of(lookupFunction, cache);
}

@Override
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
AbstractClickHouseInputFormat.Builder builder =
Expand All @@ -71,7 +115,11 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderCon
public DynamicTableSource copy() {
ClickHouseDynamicTableSource source =
new ClickHouseDynamicTableSource(
readOptions, connectionProperties, physicalRowDataType);
readOptions,
lookupMaxRetryTimes,
cache,
connectionProperties,
physicalRowDataType);
source.filterClause = filterClause;
source.limit = limit;
return source;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
package org.apache.flink.connector.clickhouse.internal;

import org.apache.flink.annotation.Internal;
import org.apache.flink.connector.clickhouse.internal.connection.ClickHouseConnectionProvider;
import org.apache.flink.connector.clickhouse.internal.converter.ClickHouseRowConverter;
import org.apache.flink.connector.clickhouse.internal.converter.ClickHouseStatementWrapper;
import org.apache.flink.connector.clickhouse.internal.options.ClickHouseReadOptions;
import org.apache.flink.connector.clickhouse.util.ClickHouseUtil;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.LookupFunction;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import ru.yandex.clickhouse.ClickHousePreparedStatement;

import java.io.IOException;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;

/**
 * A lookup function for {@link
 * org.apache.flink.connector.clickhouse.ClickHouseDynamicTableSource}.
 *
 * <p>The function prepares one select statement in {@link #open(FunctionContext)} and reuses it
 * for every {@link #lookup(RowData)} call. A failing query is retried up to {@code
 * maxRetryTimes} additional times; between attempts the connection is re-established when the
 * connection provider reports it as invalid.
 */
@Internal
public class ClickHouseRowDataLookupFunction extends LookupFunction {

    private static final Logger LOG =
            LoggerFactory.getLogger(ClickHouseRowDataLookupFunction.class);

    private final String query;
    private final ClickHouseConnectionProvider connectionProvider;
    private final String[] keyNames;
    private final int maxRetryTimes;
    private final ClickHouseRowConverter clickhouseRowConverter;
    private final ClickHouseRowConverter lookupKeyRowConverter;

    private transient ClickHouseStatementWrapper statement;

    /**
     * Creates the lookup function.
     *
     * @param options read options; used to create the connection provider and to obtain the
     *     table name for the query
     * @param maxRetryTimes number of retries after the first failed query attempt
     * @param fieldNames names of all fields of the table
     * @param fieldTypes data types of all fields, positionally aligned with {@code fieldNames}
     * @param keyNames names of the lookup key fields; each must occur in {@code fieldNames}
     * @param rowType row type used to convert result rows
     * @throws NullPointerException if {@code options}, {@code fieldNames}, {@code fieldTypes}
     *     or {@code keyNames} is null
     * @throws IllegalArgumentException if a key name is not contained in {@code fieldNames}
     */
    public ClickHouseRowDataLookupFunction(
            ClickHouseReadOptions options,
            int maxRetryTimes,
            String[] fieldNames,
            DataType[] fieldTypes,
            String[] keyNames,
            RowType rowType) {
        checkNotNull(options, "No ClickHouseOptions supplied.");
        checkNotNull(fieldNames, "No fieldNames supplied.");
        checkNotNull(fieldTypes, "No fieldTypes supplied.");
        checkNotNull(keyNames, "No keyNames supplied.");
        this.connectionProvider = new ClickHouseConnectionProvider(options);
        this.keyNames = keyNames;
        List<String> nameList = Arrays.asList(fieldNames);
        DataType[] keyTypes =
                Arrays.stream(keyNames)
                        .map(
                                keyName -> {
                                    // Single scan: indexOf doubles as the membership test.
                                    int fieldIndex = nameList.indexOf(keyName);
                                    checkArgument(
                                            fieldIndex >= 0,
                                            "keyName %s can't find in fieldNames %s.",
                                            keyName,
                                            nameList);
                                    return fieldTypes[fieldIndex];
                                })
                        .toArray(DataType[]::new);
        this.maxRetryTimes = maxRetryTimes;
        this.query =
                ClickHouseUtil.getSelectFromStatement(options.getTableName(), fieldNames, keyNames);
        this.clickhouseRowConverter = new ClickHouseRowConverter(rowType);
        this.lookupKeyRowConverter =
                new ClickHouseRowConverter(
                        RowType.of(
                                Arrays.stream(keyTypes)
                                        .map(DataType::getLogicalType)
                                        .toArray(LogicalType[]::new)));
    }

    /**
     * Opens the connection and prepares the lookup statement.
     *
     * @throws IllegalArgumentException if the connection or statement cannot be created, or
     *     the driver class is missing
     */
    @Override
    public void open(FunctionContext context) throws Exception {
        try {
            establishConnectionAndStatement();
        } catch (SQLException sqe) {
            throw new IllegalArgumentException("open() failed.", sqe);
        } catch (ClassNotFoundException cnfe) {
            throw new IllegalArgumentException("ClickHouse driver class not found.", cnfe);
        }
    }

    /**
     * This is a lookup method which is called by Flink framework in runtime.
     *
     * @param keyRow lookup keys
     * @return all rows produced by the query for the given keys
     * @throws RuntimeException if the query still fails after the last retry, if the
     *     connection cannot be re-established, or if the thread is interrupted while waiting
     *     between attempts
     */
    @Override
    public Collection<RowData> lookup(RowData keyRow) {
        for (int retry = 0; retry <= maxRetryTimes; retry++) {
            try {
                statement.clearParameters();
                lookupKeyRowConverter.toExternal(keyRow, statement);
                try (ResultSet resultSet = statement.executeQuery()) {
                    ArrayList<RowData> rows = new ArrayList<>();
                    while (resultSet.next()) {
                        rows.add(clickhouseRowConverter.toInternal(resultSet));
                    }
                    rows.trimToSize();
                    return rows;
                }
            } catch (SQLException e) {
                LOG.error("ClickHouse executeQuery error, retry times = {}", retry, e);
                if (retry >= maxRetryTimes) {
                    throw new RuntimeException("Execution of ClickHouse statement failed.", e);
                }
                reconnectIfInvalid();
                backOff(retry);
            }
        }
        // Only reached when maxRetryTimes is negative and the loop body never runs.
        return Collections.emptyList();
    }

    /** Re-creates connection and statement when the provider reports the connection invalid. */
    private void reconnectIfInvalid() {
        try {
            if (!connectionProvider.isConnectionValid()) {
                // Closing a statement on a broken connection may itself fail; that must not
                // prevent the reconnect, so the close is best-effort.
                closeStatementQuietly();
                connectionProvider.closeConnections();
                establishConnectionAndStatement();
            }
        } catch (SQLException | ClassNotFoundException exception) {
            LOG.error(
                    "ClickHouse connection is not valid, and reestablish connection failed",
                    exception);
            throw new RuntimeException("Reestablish ClickHouse connection failed", exception);
        }
    }

    /** Sleeps {@code retry} seconds before the next attempt (no wait after the first failure). */
    private static void backOff(int retry) {
        try {
            Thread.sleep(1000L * retry);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up can observe the interruption.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    private void establishConnectionAndStatement() throws SQLException, ClassNotFoundException {
        Connection dbConn = connectionProvider.getOrCreateConnection();
        statement =
                new ClickHouseStatementWrapper(
                        (ClickHousePreparedStatement) dbConn.prepareStatement(query));
    }

    /** Closes the current statement, if any, logging instead of propagating a failure. */
    private void closeStatementQuietly() {
        if (statement == null) {
            return;
        }
        try {
            statement.close();
        } catch (SQLException e) {
            // The trailing throwable argument keeps the stack trace in the log.
            LOG.info("ClickHouse statement could not be closed: {}", e.getMessage(), e);
        } finally {
            statement = null;
        }
    }

    /** Closes the statement (best-effort) and then the provider's connections. */
    @Override
    public void close() throws IOException {
        closeStatementQuietly();
        connectionProvider.closeConnections();
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ public ClickHouseConnectionProvider(
this.connectionProperties = connectionProperties;
}

/**
 * Checks whether the current connection exists and is still usable.
 *
 * <p>A bare {@code connection != null} test stays true once a connection has been created,
 * even after that connection has stopped working, so callers that reconnect only when this
 * method returns {@code false} would never reconnect. The check therefore also asks the
 * driver to validate the connection.
 *
 * @return {@code true} if a connection has been created and the driver reports it as valid
 * @throws SQLException if the driver rejects the validation request
 */
public boolean isConnectionValid() throws SQLException {
    // Upper bound, in seconds, for the driver's validation round trip.
    final int validationTimeoutSeconds = 10;
    return connection != null && connection.isValid(validationTimeoutSeconds);
}

public synchronized ClickHouseConnection getOrCreateConnection() throws SQLException {
if (connection == null) {
connection = createConnection(options.getUrl(), options.getDatabaseName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import java.math.BigDecimal;
import java.sql.Date;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;

Expand Down Expand Up @@ -82,4 +83,12 @@ public void setArray(int parameterIndex, Object[] array) throws SQLException {
/**
 * Forwards to {@code setObject} of the wrapped statement.
 *
 * @param parameterIndex index of the parameter, passed through unchanged
 * @param x value to bind, passed through unchanged
 * @throws SQLException if the wrapped statement throws
 */
public void setObject(int parameterIndex, Object x) throws SQLException {
    this.statement.setObject(parameterIndex, x);
}

/**
 * Forwards to {@code clearParameters} of the wrapped statement.
 *
 * @throws SQLException if the wrapped statement throws
 */
public void clearParameters() throws SQLException {
    this.statement.clearParameters();
}

/**
 * Forwards to {@code executeQuery} of the wrapped statement.
 *
 * @return the result set returned by the wrapped statement
 * @throws SQLException if the wrapped statement throws
 */
public ResultSet executeQuery() throws SQLException {
    final ResultSet resultSet = this.statement.executeQuery();
    return resultSet;
}
}
Loading

0 comments on commit 430e5bf

Please sign in to comment.