From 430e5bf3cf50317e171e21456fdb71e7a6c57f74 Mon Sep 17 00:00:00 2001 From: Lyfee <1305633643@qq.com> Date: Mon, 10 Apr 2023 08:28:21 +0000 Subject: [PATCH] Support for Lookup Join Function --- README.md | 418 +++++++++--------- .../ClickHouseDynamicTableFactory.java | 41 +- .../ClickHouseDynamicTableSource.java | 50 ++- .../ClickHouseRowDataLookupFunction.java | 170 +++++++ .../ClickHouseConnectionProvider.java | 4 + .../converter/ClickHouseStatementWrapper.java | 9 + .../clickhouse/util/ClickHouseUtil.java | 20 + 7 files changed, 504 insertions(+), 208 deletions(-) create mode 100644 src/main/java/org/apache/flink/connector/clickhouse/internal/ClickHouseRowDataLookupFunction.java diff --git a/README.md b/README.md index d04b430..3510bed 100644 --- a/README.md +++ b/README.md @@ -1,206 +1,212 @@ -# Flink ClickHouse Connector - -[Flink](https://github.com/apache/flink) SQL connector -for [ClickHouse](https://github.com/yandex/ClickHouse) database, this project Powered -by [ClickHouse JDBC](https://github.com/ClickHouse/clickhouse-jdbc). - -Currently, the project supports `Source/Sink Table` and `Flink Catalog`. -Please create issues if you encounter bugs and any help for the project is greatly appreciated. - -## Connector Options - -| Option | Required | Default | Type | Description | -|:-----------------------------------|:---------|:---------|:---------|:--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| url | required | none | String | The ClickHouse jdbc url in format `clickhouse://:`. | -| username | optional | none | String | The 'username' and 'password' must both be specified if any of them is specified. | -| password | optional | none | String | The ClickHouse password. | -| database-name | optional | default | String | The ClickHouse database name. | -| table-name | required | none | String | The ClickHouse table name. | -| use-local | optional | false | Boolean | Directly read/write local tables in case of distributed table engine. | -| sink.batch-size | optional | 1000 | Integer | The max flush size, over this will flush data. | -| sink.flush-interval | optional | 1s | Duration | Over this flush interval mills, asynchronous threads will flush data. | -| sink.max-retries | optional | 3 | Integer | The max retry times when writing records to the database failed. | -| ~~sink.write-local~~ | optional | false | Boolean | Removed from version 1.15, use `use-local` instead. | -| sink.update-strategy | optional | update | String | Convert a record of type UPDATE_AFTER to update/insert statement or just discard it, available: update, insert, discard. | -| sink.partition-strategy | optional | balanced | String | Partition strategy: balanced(round-robin), hash(partition key), shuffle(random). | -| sink.partition-key | optional | none | String | Partition key used for hash strategy. | -| sink.sharding.use-table-definition | optional | false | Boolean | Sharding strategy consistent with definition of distributed table, if set to true, the configuration of `sink.partition-strategy` and `sink.partition-key` will be overwritten. | -| sink.ignore-delete | optional | true | Boolean | Whether to ignore delete statements. | -| sink.parallelism | optional | none | Integer | Defines a custom parallelism for the sink. | -| scan.partition.column | optional | none | String | The column name used for partitioning the input. 
| -| scan.partition.num | optional | none | Integer | The number of partitions. | -| scan.partition.lower-bound | optional | none | Long | The smallest value of the first partition. | -| scan.partition.upper-bound | optional | none | Long | The largest value of the last partition. | -| catalog.ignore-primary-key | optional | true | Boolean | Whether to ignore primary keys when using ClickHouseCatalog to create table. | -| properties.* | optional | none | String | This can set and pass `clickhouse-jdbc` configurations. | - -**Update/Delete Data Considerations:** - -1. Distributed table don't support the update/delete statements, if you want to use the - update/delete statements, please be sure to write records to local table or set `use-local` to - true. -2. The data is updated and deleted by the primary key, please be aware of this when using it in the - partition table. - -**breaking** - -Since version 1.16, we have taken shard weight into consideration, this may affect which shard the data is distributed to. - -## Data Type Mapping - -| Flink Type | ClickHouse Type | -| :------------------ |:--------------------------------------------------------| -| CHAR | String | -| VARCHAR | String / IP / UUID | -| STRING | String / Enum | -| BOOLEAN | UInt8 | -| BYTES | FixedString | -| DECIMAL | Decimal / Int128 / Int256 / UInt64 / UInt128 / UInt256 | -| TINYINT | Int8 | -| SMALLINT | Int16 / UInt8 | -| INTEGER | Int32 / UInt16 / Interval | -| BIGINT | Int64 / UInt32 | -| FLOAT | Float32 | -| DOUBLE | Float64 | -| DATE | Date | -| TIME | DateTime | -| TIMESTAMP | DateTime | -| TIMESTAMP_LTZ | DateTime | -| INTERVAL_YEAR_MONTH | Int32 | -| INTERVAL_DAY_TIME | Int64 | -| ARRAY | Array | -| MAP | Map | -| ROW | Not supported | -| MULTISET | Not supported | -| RAW | Not supported | - -## Maven Dependency - -The project isn't published to the maven central repository, we need to deploy/install to our own -repository before use it, step as follows: - -```bash -# clone the project -git clone https://github.com/itinycheng/flink-connector-clickhouse.git - -# enter the project directory -cd flink-connector-clickhouse/ - -# display remote branches -git branch -r - -# checkout the branch you need -git checkout $branch_name - -# install or deploy the project to our own repository -mvn clean install -DskipTests -mvn clean deploy -DskipTests -``` - -```xml - - - org.apache.flink - flink-connector-clickhouse - 1.16.0-SNAPSHOT - -``` - -## How to use - -### Create and read/write table - -```SQL - --- register a clickhouse table `t_user` in flink sql. 
-CREATE TABLE t_user ( - `user_id` BIGINT, - `user_type` INTEGER, - `language` STRING, - `country` STRING, - `gender` STRING, - `score` DOUBLE, - `list` ARRAY, - `map` Map, - PRIMARY KEY (`user_id`) NOT ENFORCED -) WITH ( - 'connector' = 'clickhouse', - 'url' = 'clickhouse://{ip}:{port}', - 'database-name' = 'tutorial', - 'table-name' = 'users', - 'sink.batch-size' = '500', - 'sink.flush-interval' = '1000', - 'sink.max-retries' = '3' -); - --- read data from clickhouse -SELECT user_id, user_type from t_user; - --- write data into the clickhouse table from the table `T` -INSERT INTO t_user -SELECT cast(`user_id` as BIGINT), `user_type`, `lang`, `country`, `gender`, `score`, ARRAY['CODER', 'SPORTSMAN'], CAST(MAP['BABA', cast(10 as BIGINT), 'NIO', cast(8 as BIGINT)] AS MAP) FROM T; - -``` - -### Create and use ClickHouseCatalog - -#### Scala - -```scala -val tEnv = TableEnvironment.create(setting) - -val props = new util.HashMap[String, String]() -props.put(ClickHouseConfig.DATABASE_NAME, "default") -props.put(ClickHouseConfig.URL, "clickhouse://127.0.0.1:8123") -props.put(ClickHouseConfig.USERNAME, "username") -props.put(ClickHouseConfig.PASSWORD, "password") -props.put(ClickHouseConfig.SINK_FLUSH_INTERVAL, "30s") -val cHcatalog = new ClickHouseCatalog("clickhouse", props) -tEnv.registerCatalog("clickhouse", cHcatalog) -tEnv.useCatalog("clickhouse") - -tEnv.executeSql("insert into `clickhouse`.`default`.`t_table` select..."); -``` - -#### Java - -```java -TableEnvironment tEnv = TableEnvironment.create(setting); - -Map props = new HashMap<>(); -props.put(ClickHouseConfig.DATABASE_NAME, "default") -props.put(ClickHouseConfig.URL, "clickhouse://127.0.0.1:8123") -props.put(ClickHouseConfig.USERNAME, "username") -props.put(ClickHouseConfig.PASSWORD, "password") -props.put(ClickHouseConfig.SINK_FLUSH_INTERVAL, "30s"); -Catalog cHcatalog = new ClickHouseCatalog("clickhouse", props); -tEnv.registerCatalog("clickhouse", cHcatalog); -tEnv.useCatalog("clickhouse"); - -tEnv.executeSql("insert into `clickhouse`.`default`.`t_table` select..."); -``` - -#### SQL - -```sql -> CREATE CATALOG clickhouse WITH ( - 'type' = 'clickhouse', - 'url' = 'clickhouse://127.0.0.1:8123', - 'username' = 'username', - 'password' = 'password', - 'database-name' = 'default', - 'use-local' = 'false', - ... -); - -> USE CATALOG clickhouse; -> SELECT user_id, user_type FROM `default`.`t_user` limit 10; -> INSERT INTO `default`.`t_user` SELECT ...; -``` - -## Roadmap - -- [x] Implement the Flink SQL Sink function. -- [x] Support array and Map types. -- [x] Support ClickHouseCatalog. -- [x] Implement the Flink SQL Source function. +# Flink ClickHouse Connector + +[Flink](https://github.com/apache/flink) SQL connector +for [ClickHouse](https://github.com/yandex/ClickHouse) database, this project Powered +by [ClickHouse JDBC](https://github.com/ClickHouse/clickhouse-jdbc). + +Currently, the project supports `Source/Sink Table` and `Flink Catalog`. +Please create issues if you encounter bugs and any help for the project is greatly appreciated. + +## Connector Options + +| Option | Required | Default | Type | Description | +|:-----------------------------------------|:---------|:---------|:---------|:--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| url | required | none | String | The ClickHouse jdbc url in format `clickhouse://:`. 
+| username                                  | optional | none     | String   | The ClickHouse username; 'username' and 'password' must both be specified if either one is specified. |
+| password                                  | optional | none     | String   | The ClickHouse password. |
+| database-name                             | optional | default  | String   | The ClickHouse database name. |
+| table-name                                | required | none     | String   | The ClickHouse table name. |
+| use-local                                 | optional | false    | Boolean  | Directly read/write local tables when the target uses a distributed table engine. |
+| sink.batch-size                           | optional | 1000     | Integer  | The max flush size; buffered records are flushed once this many have been collected. |
+| sink.flush-interval                       | optional | 1s       | Duration | The flush interval; asynchronous threads flush buffered data once it elapses. |
+| sink.max-retries                          | optional | 3        | Integer  | The max retry times when writing records to the database fails. |
+| ~~sink.write-local~~                      | optional | false    | Boolean  | Removed since version 1.15; use `use-local` instead. |
+| sink.update-strategy                      | optional | update   | String   | Convert a record of type UPDATE_AFTER into an update/insert statement or discard it; available: update, insert, discard. |
+| sink.partition-strategy                   | optional | balanced | String   | Partition strategy: balanced (round-robin), hash (partition key), shuffle (random). |
+| sink.partition-key                        | optional | none     | String   | Partition key used for the hash strategy. |
+| sink.sharding.use-table-definition        | optional | false    | Boolean  | Shard consistently with the definition of the distributed table; if set to true, the configuration of `sink.partition-strategy` and `sink.partition-key` is overridden. |
+| sink.ignore-delete                        | optional | true     | Boolean  | Whether to ignore delete statements. |
+| sink.parallelism                          | optional | none     | Integer  | Defines a custom parallelism for the sink. |
+| scan.partition.column                     | optional | none     | String   | The column name used for partitioning the input. |
+| scan.partition.num                        | optional | none     | Integer  | The number of partitions. |
+| scan.partition.lower-bound                | optional | none     | Long     | The smallest value of the first partition. |
+| scan.partition.upper-bound                | optional | none     | Long     | The largest value of the last partition. |
+| catalog.ignore-primary-key                | optional | true     | Boolean  | Whether to ignore primary keys when using ClickHouseCatalog to create tables. |
+| properties.*                              | optional | none     | String   | Pass-through of `clickhouse-jdbc` configurations. |
+| lookup.cache                              | optional | NONE     | String   | The caching strategy for this lookup table; available: NONE, PARTIAL (FULL is not supported yet). See the lookup join sketch below. |
+| lookup.partial-cache.expire-after-access  | optional | none     | Duration | Duration after which a cached entry expires when it has not been accessed. |
+| lookup.partial-cache.expire-after-write   | optional | none     | Duration | Duration after which a cached entry expires once it has been written. |
+| lookup.partial-cache.max-rows             | optional | none     | Long     | The max number of rows in the lookup cache; when exceeded, the oldest rows are evicted. |
+| lookup.partial-cache.cache-missing-key    | optional | true     | Boolean  | Whether to cache lookup keys that match no rows; true by default. |
+| lookup.max-retries                        | optional | 3        | Integer  | The max retry times when a lookup query fails. |
+
+**Update/Delete Data Considerations:**
+
+1. Distributed tables don't support update/delete statements. If you want to use them, be sure to
+   write records to the local tables or set `use-local` to true.
+2. Data is updated and deleted by the primary key; be aware of this when using it with a
+   partitioned table.
+
+**Breaking change**
+
+Since version 1.16, we have taken shard weight into consideration; this may affect which shard the
+data is distributed to.
+
+## Data Type Mapping
+
+| Flink Type          | ClickHouse Type                                         |
+|:--------------------|:--------------------------------------------------------|
+| CHAR                | String                                                  |
+| VARCHAR             | String / IP / UUID                                      |
+| STRING              | String / Enum                                           |
+| BOOLEAN             | UInt8                                                   |
+| BYTES               | FixedString                                             |
+| DECIMAL             | Decimal / Int128 / Int256 / UInt64 / UInt128 / UInt256  |
+| TINYINT             | Int8                                                    |
+| SMALLINT            | Int16 / UInt8                                           |
+| INTEGER             | Int32 / UInt16 / Interval                               |
+| BIGINT              | Int64 / UInt32                                          |
+| FLOAT               | Float32                                                 |
+| DOUBLE              | Float64                                                 |
+| DATE                | Date                                                    |
+| TIME                | DateTime                                                |
+| TIMESTAMP           | DateTime                                                |
+| TIMESTAMP_LTZ       | DateTime                                                |
+| INTERVAL_YEAR_MONTH | Int32                                                   |
+| INTERVAL_DAY_TIME   | Int64                                                   |
+| ARRAY               | Array                                                   |
+| MAP                 | Map                                                     |
+| ROW                 | Not supported                                           |
+| MULTISET            | Not supported                                           |
+| RAW                 | Not supported                                           |
+
+## Maven Dependency
+
+The project isn't published to the Maven central repository; we need to deploy/install it to our
+own repository before using it, as follows:
+
+```bash
+# clone the project
+git clone https://github.com/itinycheng/flink-connector-clickhouse.git
+
+# enter the project directory
+cd flink-connector-clickhouse/
+
+# display remote branches
+git branch -r
+
+# checkout the branch you need
+git checkout $branch_name
+
+# install or deploy the project to our own repository
+mvn clean install -DskipTests
+mvn clean deploy -DskipTests
+```
+
+```xml
+<dependency>
+    <groupId>org.apache.flink</groupId>
+    <artifactId>flink-connector-clickhouse</artifactId>
+    <version>1.16.0-SNAPSHOT</version>
+</dependency>
+```
+
+## How to use
+
+### Create and read/write table
+
+```SQL
+
+-- register a clickhouse table `t_user` in flink sql.
+CREATE TABLE t_user (
+    `user_id` BIGINT,
+    `user_type` INTEGER,
+    `language` STRING,
+    `country` STRING,
+    `gender` STRING,
+    `score` DOUBLE,
+    `list` ARRAY<STRING>,
+    `map` MAP<STRING, BIGINT>,
+    PRIMARY KEY (`user_id`) NOT ENFORCED
+) WITH (
+    'connector' = 'clickhouse',
+    'url' = 'clickhouse://{ip}:{port}',
+    'database-name' = 'tutorial',
+    'table-name' = 'users',
+    'sink.batch-size' = '500',
+    'sink.flush-interval' = '1000',
+    'sink.max-retries' = '3'
+);
+
+-- read data from clickhouse
+SELECT user_id, user_type FROM t_user;
+
+-- write data into the clickhouse table from the table `T`
+INSERT INTO t_user
+SELECT cast(`user_id` as BIGINT), `user_type`, `lang`, `country`, `gender`, `score`, ARRAY['CODER', 'SPORTSMAN'], CAST(MAP['BABA', cast(10 as BIGINT), 'NIO', cast(8 as BIGINT)] AS MAP<STRING, BIGINT>) FROM T;
+
+```
+
+### Create and use ClickHouseCatalog
+
+#### Scala
+
+```scala
+val tEnv = TableEnvironment.create(setting)
+
+val props = new util.HashMap[String, String]()
+props.put(ClickHouseConfig.DATABASE_NAME, "default")
+props.put(ClickHouseConfig.URL, "clickhouse://127.0.0.1:8123")
+props.put(ClickHouseConfig.USERNAME, "username")
+props.put(ClickHouseConfig.PASSWORD, "password")
+props.put(ClickHouseConfig.SINK_FLUSH_INTERVAL, "30s")
+val cHcatalog = new ClickHouseCatalog("clickhouse", props)
+tEnv.registerCatalog("clickhouse", cHcatalog)
+tEnv.useCatalog("clickhouse")
+
+tEnv.executeSql("insert into `clickhouse`.`default`.`t_table` select...")
+```
+
+#### Java
+
+```java
+TableEnvironment tEnv = TableEnvironment.create(setting);
+
+Map<String, String> props = new HashMap<>();
+props.put(ClickHouseConfig.DATABASE_NAME, "default");
+props.put(ClickHouseConfig.URL, "clickhouse://127.0.0.1:8123");
+props.put(ClickHouseConfig.USERNAME, "username");
+props.put(ClickHouseConfig.PASSWORD, "password");
+props.put(ClickHouseConfig.SINK_FLUSH_INTERVAL, "30s");
+Catalog cHcatalog = new ClickHouseCatalog("clickhouse", props);
+tEnv.registerCatalog("clickhouse", cHcatalog);
+tEnv.useCatalog("clickhouse");
+
+tEnv.executeSql("insert into `clickhouse`.`default`.`t_table` select...");
+```
+
+#### SQL
+
+```sql
+> CREATE CATALOG clickhouse WITH (
+    'type' = 'clickhouse',
+    'url' = 'clickhouse://127.0.0.1:8123',
+    'username' = 'username',
+    'password' = 'password',
+    'database-name' = 'default',
+    'use-local' = 'false',
+    ...
+);
+
+> USE CATALOG clickhouse;
+> SELECT user_id, user_type FROM `default`.`t_user` limit 10;
+> INSERT INTO `default`.`t_user` SELECT ...;
+```
+
+## Roadmap
+
+- [x] Implement the Flink SQL Sink function.
+- [x] Support array and Map types.
+- [x] Support ClickHouseCatalog.
+- [x] Implement the Flink SQL Source function.
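+
+## Lookup Join (sketch)
+
+A minimal sketch of joining a stream against a ClickHouse table through the new lookup options.
+The `orders` source table, its processing-time attribute `proc_time`, and the `order_id` column
+are assumed to be defined elsewhere and are only illustrative.
+
+```SQL
+-- dimension table backed by ClickHouse with a PARTIAL lookup cache
+CREATE TABLE t_user_dim (
+    `user_id` BIGINT,
+    `country` STRING,
+    PRIMARY KEY (`user_id`) NOT ENFORCED
+) WITH (
+    'connector' = 'clickhouse',
+    'url' = 'clickhouse://{ip}:{port}',
+    'database-name' = 'tutorial',
+    'table-name' = 'users',
+    'lookup.cache' = 'PARTIAL',
+    'lookup.partial-cache.max-rows' = '10000',
+    'lookup.partial-cache.expire-after-write' = '10min',
+    'lookup.max-retries' = '3'
+);
+
+-- enrich the stream with user attributes; each probe runs a point query against ClickHouse
+SELECT o.order_id, o.user_id, u.country
+FROM orders AS o
+JOIN t_user_dim FOR SYSTEM_TIME AS OF o.proc_time AS u
+ON o.user_id = u.user_id;
+```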
diff --git a/src/main/java/org/apache/flink/connector/clickhouse/ClickHouseDynamicTableFactory.java b/src/main/java/org/apache/flink/connector/clickhouse/ClickHouseDynamicTableFactory.java index ecb0d62..9551a85 100644 --- a/src/main/java/org/apache/flink/connector/clickhouse/ClickHouseDynamicTableFactory.java +++ b/src/main/java/org/apache/flink/connector/clickhouse/ClickHouseDynamicTableFactory.java @@ -9,11 +9,16 @@ import org.apache.flink.table.catalog.UniqueConstraint; import org.apache.flink.table.connector.sink.DynamicTableSink; import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.connector.source.lookup.LookupOptions; +import org.apache.flink.table.connector.source.lookup.cache.DefaultLookupCache; +import org.apache.flink.table.connector.source.lookup.cache.LookupCache; import org.apache.flink.table.factories.DynamicTableSinkFactory; import org.apache.flink.table.factories.DynamicTableSourceFactory; import org.apache.flink.table.factories.FactoryUtil; import org.apache.flink.table.factories.FactoryUtil.TableFactoryHelper; +import javax.annotation.Nullable; + import java.util.HashSet; import java.util.Properties; import java.util.Set; @@ -83,7 +88,11 @@ public DynamicTableSource createDynamicTableSource(Context context) { Properties clickHouseProperties = getClickHouseProperties(context.getCatalogTable().getOptions()); return new ClickHouseDynamicTableSource( - getReadOptions(config), clickHouseProperties, context.getPhysicalRowDataType()); + getReadOptions(config), + helper.getOptions().get(LookupOptions.MAX_RETRIES), + getLookupCache(config), + clickHouseProperties, + context.getPhysicalRowDataType()); } @Override @@ -120,6 +129,12 @@ public Set> optionalOptions() { optionalOptions.add(SCAN_PARTITION_NUM); optionalOptions.add(SCAN_PARTITION_LOWER_BOUND); optionalOptions.add(SCAN_PARTITION_UPPER_BOUND); + optionalOptions.add(LookupOptions.CACHE_TYPE); + optionalOptions.add(LookupOptions.PARTIAL_CACHE_EXPIRE_AFTER_ACCESS); + optionalOptions.add(LookupOptions.PARTIAL_CACHE_EXPIRE_AFTER_WRITE); + optionalOptions.add(LookupOptions.PARTIAL_CACHE_MAX_ROWS); + optionalOptions.add(LookupOptions.PARTIAL_CACHE_CACHE_MISSING_KEY); + optionalOptions.add(LookupOptions.MAX_RETRIES); return optionalOptions; } @@ -135,12 +150,25 @@ private void validateConfigOptions(ReadableConfig config) { ^ config.getOptional(PASSWORD).isPresent()) { throw new IllegalArgumentException( "Either all or none of username and password should be provided"); + } else if (!config.get(LookupOptions.CACHE_TYPE).equals(LookupOptions.LookupCacheType.NONE) + && !config.get(LookupOptions.CACHE_TYPE) + .equals(LookupOptions.LookupCacheType.PARTIAL)) { + throw new IllegalArgumentException( + String.format( + "The value of '%s' option should be 'NONE' or 'PARTIAL'(not support 'FULL' yet), but is %s.", + LookupOptions.CACHE_TYPE.key(), config.get(LookupOptions.CACHE_TYPE))); } else if (config.getOptional(SCAN_PARTITION_COLUMN).isPresent() ^ config.getOptional(SCAN_PARTITION_NUM).isPresent() ^ config.getOptional(SCAN_PARTITION_LOWER_BOUND).isPresent() ^ config.getOptional(SCAN_PARTITION_UPPER_BOUND).isPresent()) { throw new IllegalArgumentException( "Either all or none of partition configs should be provided"); + } else if (config.get(LookupOptions.MAX_RETRIES) < 0) { + throw new IllegalArgumentException( + String.format( + "The value of '%s' option shouldn't be negative, but is %s.", + LookupOptions.MAX_RETRIES.key(), + config.get(LookupOptions.MAX_RETRIES))); } } @@ -178,4 +206,15 @@ 
private ClickHouseReadOptions getReadOptions(ReadableConfig config) { .withPartitionUpperBound(config.get(SCAN_PARTITION_UPPER_BOUND)) .build(); } + + @Nullable + private LookupCache getLookupCache(ReadableConfig tableOptions) { + LookupCache cache = null; + if (tableOptions + .get(LookupOptions.CACHE_TYPE) + .equals(LookupOptions.LookupCacheType.PARTIAL)) { + cache = DefaultLookupCache.fromConfig(tableOptions); + } + return cache; + } } diff --git a/src/main/java/org/apache/flink/connector/clickhouse/ClickHouseDynamicTableSource.java b/src/main/java/org/apache/flink/connector/clickhouse/ClickHouseDynamicTableSource.java index e354ba7..fd31f24 100644 --- a/src/main/java/org/apache/flink/connector/clickhouse/ClickHouseDynamicTableSource.java +++ b/src/main/java/org/apache/flink/connector/clickhouse/ClickHouseDynamicTableSource.java @@ -1,18 +1,27 @@ package org.apache.flink.connector.clickhouse; import org.apache.flink.connector.clickhouse.internal.AbstractClickHouseInputFormat; +import org.apache.flink.connector.clickhouse.internal.ClickHouseRowDataLookupFunction; import org.apache.flink.connector.clickhouse.internal.options.ClickHouseReadOptions; import org.apache.flink.connector.clickhouse.util.FilterPushDownHelper; import org.apache.flink.table.connector.ChangelogMode; import org.apache.flink.table.connector.Projection; import org.apache.flink.table.connector.source.DynamicTableSource; import org.apache.flink.table.connector.source.InputFormatProvider; +import org.apache.flink.table.connector.source.LookupTableSource; import org.apache.flink.table.connector.source.ScanTableSource; import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown; import org.apache.flink.table.connector.source.abilities.SupportsLimitPushDown; import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown; +import org.apache.flink.table.connector.source.lookup.LookupFunctionProvider; +import org.apache.flink.table.connector.source.lookup.PartialCachingLookupProvider; +import org.apache.flink.table.connector.source.lookup.cache.LookupCache; import org.apache.flink.table.expressions.ResolvedExpression; import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nullable; import java.util.ArrayList; import java.util.List; @@ -21,6 +30,7 @@ /** ClickHouse table source. 
*/ public class ClickHouseDynamicTableSource implements ScanTableSource, + LookupTableSource, SupportsProjectionPushDown, SupportsLimitPushDown, SupportsFilterPushDown { @@ -29,6 +39,10 @@ public class ClickHouseDynamicTableSource private final Properties connectionProperties; + private final int lookupMaxRetryTimes; + + @Nullable private final LookupCache cache; + private DataType physicalRowDataType; private String filterClause; @@ -37,10 +51,14 @@ public class ClickHouseDynamicTableSource public ClickHouseDynamicTableSource( ClickHouseReadOptions readOptions, + int lookupMaxRetryTimes, + @Nullable LookupCache cache, Properties properties, DataType physicalRowDataType) { this.readOptions = readOptions; this.connectionProperties = properties; + this.lookupMaxRetryTimes = lookupMaxRetryTimes; + this.cache = cache; this.physicalRowDataType = physicalRowDataType; } @@ -49,6 +67,32 @@ public ChangelogMode getChangelogMode() { return ChangelogMode.insertOnly(); } + @Override + public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) { + // ClickHouse only support non-nested look up keys + String[] keyNames = new String[context.getKeys().length]; + for (int i = 0; i < keyNames.length; i++) { + int[] innerKeyArr = context.getKeys()[i]; + Preconditions.checkArgument( + innerKeyArr.length == 1, "ClickHouse only support non-nested look up keys"); + keyNames[i] = DataType.getFieldNames(physicalRowDataType).get(innerKeyArr[0]); + } + final RowType rowType = (RowType) physicalRowDataType.getLogicalType(); + ClickHouseRowDataLookupFunction lookupFunction = + new ClickHouseRowDataLookupFunction( + readOptions, + lookupMaxRetryTimes, + DataType.getFieldNames(physicalRowDataType).toArray(new String[0]), + DataType.getFieldDataTypes(physicalRowDataType).toArray(new DataType[0]), + keyNames, + rowType); + if (cache != null) { + return PartialCachingLookupProvider.of(lookupFunction, cache); + } else { + return LookupFunctionProvider.of(lookupFunction); + } + } + @Override public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) { AbstractClickHouseInputFormat.Builder builder = @@ -71,7 +115,11 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderCon public DynamicTableSource copy() { ClickHouseDynamicTableSource source = new ClickHouseDynamicTableSource( - readOptions, connectionProperties, physicalRowDataType); + readOptions, + lookupMaxRetryTimes, + cache, + connectionProperties, + physicalRowDataType); source.filterClause = filterClause; source.limit = limit; return source; diff --git a/src/main/java/org/apache/flink/connector/clickhouse/internal/ClickHouseRowDataLookupFunction.java b/src/main/java/org/apache/flink/connector/clickhouse/internal/ClickHouseRowDataLookupFunction.java new file mode 100644 index 0000000..60536b0 --- /dev/null +++ b/src/main/java/org/apache/flink/connector/clickhouse/internal/ClickHouseRowDataLookupFunction.java @@ -0,0 +1,170 @@ +package org.apache.flink.connector.clickhouse.internal; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.connector.clickhouse.internal.connection.ClickHouseConnectionProvider; +import org.apache.flink.connector.clickhouse.internal.converter.ClickHouseRowConverter; +import org.apache.flink.connector.clickhouse.internal.converter.ClickHouseStatementWrapper; +import org.apache.flink.connector.clickhouse.internal.options.ClickHouseReadOptions; +import org.apache.flink.connector.clickhouse.util.ClickHouseUtil; +import org.apache.flink.table.data.RowData; 
+import org.apache.flink.table.functions.FunctionContext; +import org.apache.flink.table.functions.LookupFunction; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.RowType; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import ru.yandex.clickhouse.ClickHousePreparedStatement; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** A lookup function for {@link ClickHouseDynamicTableSource}. */ +@Internal +public class ClickHouseRowDataLookupFunction extends LookupFunction { + + private static final Logger LOG = + LoggerFactory.getLogger(ClickHouseRowDataLookupFunction.class); + + private final String query; + private final ClickHouseConnectionProvider connectionProvider; + private final String[] keyNames; + private final int maxRetryTimes; + private final ClickHouseRowConverter clickhouseRowConverter; + private final ClickHouseRowConverter lookupKeyRowConverter; + + private transient ClickHouseStatementWrapper statement; + + public ClickHouseRowDataLookupFunction( + ClickHouseReadOptions options, + int maxRetryTimes, + String[] fieldNames, + DataType[] fieldTypes, + String[] keyNames, + RowType rowType) { + checkNotNull(options, "No ClickHouseOptions supplied."); + checkNotNull(fieldNames, "No fieldNames supplied."); + checkNotNull(fieldTypes, "No fieldTypes supplied."); + checkNotNull(keyNames, "No keyNames supplied."); + this.connectionProvider = new ClickHouseConnectionProvider(options); + this.keyNames = keyNames; + List nameList = Arrays.asList(fieldNames); + DataType[] keyTypes = + Arrays.stream(keyNames) + .map( + s -> { + checkArgument( + nameList.contains(s), + "keyName %s can't find in fieldNames %s.", + s, + nameList); + return fieldTypes[nameList.indexOf(s)]; + }) + .toArray(DataType[]::new); + this.maxRetryTimes = maxRetryTimes; + this.query = + ClickHouseUtil.getSelectFromStatement(options.getTableName(), fieldNames, keyNames); + this.clickhouseRowConverter = new ClickHouseRowConverter(rowType); + this.lookupKeyRowConverter = + new ClickHouseRowConverter( + RowType.of( + Arrays.stream(keyTypes) + .map(DataType::getLogicalType) + .toArray(LogicalType[]::new))); + } + + @Override + public void open(FunctionContext context) throws Exception { + try { + establishConnectionAndStatement(); + } catch (SQLException sqe) { + throw new IllegalArgumentException("open() failed.", sqe); + } catch (ClassNotFoundException cnfe) { + throw new IllegalArgumentException("ClickHouse driver class not found.", cnfe); + } + } + + /** + * This is a lookup method which is called by Flink framework in runtime. 
+ * + * @param keyRow lookup keys + */ + @Override + public Collection lookup(RowData keyRow) { + for (int retry = 0; retry <= maxRetryTimes; retry++) { + try { + statement.clearParameters(); + lookupKeyRowConverter.toExternal(keyRow, statement); + try (ResultSet resultSet = statement.executeQuery()) { + ArrayList rows = new ArrayList<>(); + while (resultSet.next()) { + RowData row = clickhouseRowConverter.toInternal(resultSet); + rows.add(row); + } + rows.trimToSize(); + return rows; + } + } catch (SQLException e) { + LOG.error( + String.format("ClickHouse executeBatch error, retry times = %d", retry), e); + if (retry >= maxRetryTimes) { + throw new RuntimeException("Execution of ClickHouse statement failed.", e); + } + + try { + if (!connectionProvider.isConnectionValid()) { + statement.close(); + connectionProvider.closeConnections(); + establishConnectionAndStatement(); + } + } catch (SQLException | ClassNotFoundException exception) { + LOG.error( + "ClickHouse connection is not valid, and reestablish connection failed", + exception); + throw new RuntimeException( + "Reestablish ClickHouse connection failed", exception); + } + + try { + Thread.sleep(1000L * retry); + } catch (InterruptedException e1) { + throw new RuntimeException(e1); + } + } + } + return Collections.emptyList(); + } + + private void establishConnectionAndStatement() throws SQLException, ClassNotFoundException { + Connection dbConn = connectionProvider.getOrCreateConnection(); + statement = + new ClickHouseStatementWrapper( + (ClickHousePreparedStatement) dbConn.prepareStatement(query)); + } + + @Override + public void close() throws IOException { + if (statement != null) { + try { + statement.close(); + } catch (SQLException e) { + LOG.info("ClickHouse statement could not be closed: " + e.getMessage()); + } finally { + statement = null; + } + } + + connectionProvider.closeConnections(); + } +} diff --git a/src/main/java/org/apache/flink/connector/clickhouse/internal/connection/ClickHouseConnectionProvider.java b/src/main/java/org/apache/flink/connector/clickhouse/internal/connection/ClickHouseConnectionProvider.java index a71b6bb..8e998e1 100644 --- a/src/main/java/org/apache/flink/connector/clickhouse/internal/connection/ClickHouseConnectionProvider.java +++ b/src/main/java/org/apache/flink/connector/clickhouse/internal/connection/ClickHouseConnectionProvider.java @@ -52,6 +52,10 @@ public ClickHouseConnectionProvider( this.connectionProperties = connectionProperties; } + public boolean isConnectionValid() throws SQLException { + return connection != null; + } + public synchronized ClickHouseConnection getOrCreateConnection() throws SQLException { if (connection == null) { connection = createConnection(options.getUrl(), options.getDatabaseName()); diff --git a/src/main/java/org/apache/flink/connector/clickhouse/internal/converter/ClickHouseStatementWrapper.java b/src/main/java/org/apache/flink/connector/clickhouse/internal/converter/ClickHouseStatementWrapper.java index 7934ddd..a30214c 100644 --- a/src/main/java/org/apache/flink/connector/clickhouse/internal/converter/ClickHouseStatementWrapper.java +++ b/src/main/java/org/apache/flink/connector/clickhouse/internal/converter/ClickHouseStatementWrapper.java @@ -4,6 +4,7 @@ import java.math.BigDecimal; import java.sql.Date; +import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Timestamp; @@ -82,4 +83,12 @@ public void setArray(int parameterIndex, Object[] array) throws SQLException { public void setObject(int parameterIndex, Object x) throws 
SQLException { statement.setObject(parameterIndex, x); } + + public void clearParameters() throws SQLException { + statement.clearParameters(); + } + + public ResultSet executeQuery() throws SQLException { + return statement.executeQuery(); + } } diff --git a/src/main/java/org/apache/flink/connector/clickhouse/util/ClickHouseUtil.java b/src/main/java/org/apache/flink/connector/clickhouse/util/ClickHouseUtil.java index 95a11d1..0e26b16 100644 --- a/src/main/java/org/apache/flink/connector/clickhouse/util/ClickHouseUtil.java +++ b/src/main/java/org/apache/flink/connector/clickhouse/util/ClickHouseUtil.java @@ -13,10 +13,13 @@ import java.time.LocalDateTime; import java.time.LocalTime; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.stream.Collectors; +import static java.lang.String.format; import static java.util.Collections.emptyList; import static java.util.Collections.singletonList; import static org.apache.flink.connector.clickhouse.config.ClickHouseConfig.PROPERTIES_PREFIX; @@ -95,4 +98,21 @@ private static Expression parseFunctionExpr(String shardingExpr) { Expression expression = parseFunctionExpr(subExprLiteral); return FunctionExpr.of(functionName, singletonList(expression)); } + + public static String getSelectFromStatement( + String tableName, String[] selectFields, String[] conditionFields) { + String selectExpressions = + Arrays.stream(selectFields) + .map(ClickHouseUtil::quoteIdentifier) + .collect(Collectors.joining(", ")); + String fieldExpressions = + Arrays.stream(conditionFields) + .map(f -> format("%s = ?", quoteIdentifier(f))) + .collect(Collectors.joining(" AND ")); + return "SELECT " + + selectExpressions + + " FROM " + + quoteIdentifier(tableName) + + (conditionFields.length > 0 ? " WHERE " + fieldExpressions : ""); + } }
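
For illustration, the helper above concatenates the quoted select fields and one `field = ?` predicate per
condition field into the parameterized point query used by the lookup function. Assuming `quoteIdentifier`
wraps names in backticks, a call with select fields `user_id`, `country`, `score`, condition field `user_id`,
and table `t_user` (hypothetical names) would yield a statement along these lines:

```sql
-- one placeholder per lookup key; bound from the join key row at lookup time
SELECT `user_id`, `country`, `score` FROM `t_user` WHERE `user_id` = ?
```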