diff --git a/README.md b/README.md
index 3510bed..7b79b2e 100644
--- a/README.md
+++ b/README.md
@@ -1,212 +1,213 @@
-# Flink ClickHouse Connector
-
-[Flink](https://github.com/apache/flink) SQL connector
-for [ClickHouse](https://github.com/yandex/ClickHouse) database, this project Powered
-by [ClickHouse JDBC](https://github.com/ClickHouse/clickhouse-jdbc).
-
-Currently, the project supports `Source/Sink Table` and `Flink Catalog`.
-Please create issues if you encounter bugs and any help for the project is greatly appreciated.
-
-## Connector Options
-
-| Option | Required | Default | Type | Description |
-|:-----------------------------------------|:---------|:---------|:---------|:--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
-| url | required | none | String | The ClickHouse jdbc url in format `clickhouse://<host>:<port>`. |
-| username | optional | none | String | The 'username' and 'password' must both be specified if any of them is specified. |
-| password | optional | none | String | The ClickHouse password. |
-| database-name | optional | default | String | The ClickHouse database name. |
-| table-name | required | none | String | The ClickHouse table name. |
-| use-local | optional | false | Boolean | Directly read/write local tables in case of distributed table engine. |
-| sink.batch-size | optional | 1000 | Integer | The max flush size, over this will flush data. |
-| sink.flush-interval | optional | 1s | Duration | Over this flush interval mills, asynchronous threads will flush data. |
-| sink.max-retries | optional | 3 | Integer | The max retry times when writing records to the database failed. |
-| ~~sink.write-local~~ | optional | false | Boolean | Removed from version 1.15, use `use-local` instead. |
-| sink.update-strategy | optional | update | String | Convert a record of type UPDATE_AFTER to update/insert statement or just discard it, available: update, insert, discard. |
-| sink.partition-strategy | optional | balanced | String | Partition strategy: balanced(round-robin), hash(partition key), shuffle(random). |
-| sink.partition-key | optional | none | String | Partition key used for hash strategy. |
-| sink.sharding.use-table-definition | optional | false | Boolean | Sharding strategy consistent with definition of distributed table, if set to true, the configuration of `sink.partition-strategy` and `sink.partition-key` will be overwritten. |
-| sink.ignore-delete | optional | true | Boolean | Whether to ignore delete statements. |
-| sink.parallelism | optional | none | Integer | Defines a custom parallelism for the sink. |
-| scan.partition.column | optional | none | String | The column name used for partitioning the input. |
-| scan.partition.num | optional | none | Integer | The number of partitions. |
-| scan.partition.lower-bound | optional | none | Long | The smallest value of the first partition. |
-| scan.partition.upper-bound | optional | none | Long | The largest value of the last partition. |
-| catalog.ignore-primary-key | optional | true | Boolean | Whether to ignore primary keys when using ClickHouseCatalog to create table. |
-| properties.* | optional | none | String | This can set and pass `clickhouse-jdbc` configurations. |
-| lookup.cache | optional | NONE | String | The caching strategy for this lookup table, including NONE and PARTIAL(not support FULL yet) |
-| lookup.partial-cache.expire-after-access | optional | none | Duration | Duration to expire an entry in the cache after accessing, over this time, the oldest rows will be expired. |
-| lookup.partial-cache.expire-after-write | optional | none | Duration | Duration to expire an entry in the cache after writing, over this time, the oldest rows will be expired. |
-| lookup.partial-cache.max-rows | optional | none | Long | The max number of rows of lookup cache, over this value, the oldest rows will be expired. |
-| lookup.partial-cache.caching-missing-key | optional | true | Boolean | Flag to cache missing key, true by default |
-| lookup.max-retries | optional | 3 | Integer | The max retry times if lookup database failed. |
-
-**Update/Delete Data Considerations:**
-
-1. Distributed table don't support the update/delete statements, if you want to use the
-   update/delete statements, please be sure to write records to local table or set `use-local` to
-   true.
-2. The data is updated and deleted by the primary key, please be aware of this when using it in the
-   partition table.
-
-**breaking**
-
-Since version 1.16, we have taken shard weight into consideration, this may affect which shard the data is distributed to.
-
-## Data Type Mapping
-
-| Flink Type | ClickHouse Type |
-| :------------------ |:--------------------------------------------------------|
-| CHAR | String |
-| VARCHAR | String / IP / UUID |
-| STRING | String / Enum |
-| BOOLEAN | UInt8 |
-| BYTES | FixedString |
-| DECIMAL | Decimal / Int128 / Int256 / UInt64 / UInt128 / UInt256 |
-| TINYINT | Int8 |
-| SMALLINT | Int16 / UInt8 |
-| INTEGER | Int32 / UInt16 / Interval |
-| BIGINT | Int64 / UInt32 |
-| FLOAT | Float32 |
-| DOUBLE | Float64 |
-| DATE | Date |
-| TIME | DateTime |
-| TIMESTAMP | DateTime |
-| TIMESTAMP_LTZ | DateTime |
-| INTERVAL_YEAR_MONTH | Int32 |
-| INTERVAL_DAY_TIME | Int64 |
-| ARRAY | Array |
-| MAP | Map |
-| ROW | Not supported |
-| MULTISET | Not supported |
-| RAW | Not supported |
-
-## Maven Dependency
-
-The project isn't published to the maven central repository, we need to deploy/install to our own
-repository before use it, step as follows:
-
-```bash
-# clone the project
-git clone https://github.com/itinycheng/flink-connector-clickhouse.git
-
-# enter the project directory
-cd flink-connector-clickhouse/
-
-# display remote branches
-git branch -r
-
-# checkout the branch you need
-git checkout $branch_name
-
-# install or deploy the project to our own repository
-mvn clean install -DskipTests
-mvn clean deploy -DskipTests
-```
-
-```xml
-
-<dependency>
-    <groupId>org.apache.flink</groupId>
-    <artifactId>flink-connector-clickhouse</artifactId>
-    <version>1.16.0-SNAPSHOT</version>
-</dependency>
-```
-
-## How to use
-
-### Create and read/write table
-
-```SQL
-
--- register a clickhouse table `t_user` in flink sql.
-CREATE TABLE t_user (
-    `user_id` BIGINT,
-    `user_type` INTEGER,
-    `language` STRING,
-    `country` STRING,
-    `gender` STRING,
-    `score` DOUBLE,
-    `list` ARRAY<STRING>,
-    `map` Map<STRING, BIGINT>,
-    PRIMARY KEY (`user_id`) NOT ENFORCED
-) WITH (
-    'connector' = 'clickhouse',
-    'url' = 'clickhouse://{ip}:{port}',
-    'database-name' = 'tutorial',
-    'table-name' = 'users',
-    'sink.batch-size' = '500',
-    'sink.flush-interval' = '1000',
-    'sink.max-retries' = '3'
-);
-
--- read data from clickhouse
-SELECT user_id, user_type from t_user;
-
--- write data into the clickhouse table from the table `T`
-INSERT INTO t_user
-SELECT cast(`user_id` as BIGINT), `user_type`, `lang`, `country`, `gender`, `score`, ARRAY['CODER', 'SPORTSMAN'], CAST(MAP['BABA', cast(10 as BIGINT), 'NIO', cast(8 as BIGINT)] AS MAP<STRING, BIGINT>) FROM T;
-
-```
-
-### Create and use ClickHouseCatalog
-
-#### Scala
-
-```scala
-val tEnv = TableEnvironment.create(setting)
-
-val props = new util.HashMap[String, String]()
-props.put(ClickHouseConfig.DATABASE_NAME, "default")
-props.put(ClickHouseConfig.URL, "clickhouse://127.0.0.1:8123")
-props.put(ClickHouseConfig.USERNAME, "username")
-props.put(ClickHouseConfig.PASSWORD, "password")
-props.put(ClickHouseConfig.SINK_FLUSH_INTERVAL, "30s")
-val cHcatalog = new ClickHouseCatalog("clickhouse", props)
-tEnv.registerCatalog("clickhouse", cHcatalog)
-tEnv.useCatalog("clickhouse")
-
-tEnv.executeSql("insert into `clickhouse`.`default`.`t_table` select...");
-```
-
-#### Java
-
-```java
-TableEnvironment tEnv = TableEnvironment.create(setting);
-
-Map<String, String> props = new HashMap<>();
-props.put(ClickHouseConfig.DATABASE_NAME, "default")
-props.put(ClickHouseConfig.URL, "clickhouse://127.0.0.1:8123")
-props.put(ClickHouseConfig.USERNAME, "username")
-props.put(ClickHouseConfig.PASSWORD, "password")
-props.put(ClickHouseConfig.SINK_FLUSH_INTERVAL, "30s");
-Catalog cHcatalog = new ClickHouseCatalog("clickhouse", props);
-tEnv.registerCatalog("clickhouse", cHcatalog);
-tEnv.useCatalog("clickhouse");
-
-tEnv.executeSql("insert into `clickhouse`.`default`.`t_table` select...");
-```
-
-#### SQL
-
-```sql
-> CREATE CATALOG clickhouse WITH (
-    'type' = 'clickhouse',
-    'url' = 'clickhouse://127.0.0.1:8123',
-    'username' = 'username',
-    'password' = 'password',
-    'database-name' = 'default',
-    'use-local' = 'false',
-    ...
-);
-
-> USE CATALOG clickhouse;
-> SELECT user_id, user_type FROM `default`.`t_user` limit 10;
-> INSERT INTO `default`.`t_user` SELECT ...;
-```
-
-## Roadmap
-
-- [x] Implement the Flink SQL Sink function.
-- [x] Support array and Map types.
-- [x] Support ClickHouseCatalog.
-- [x] Implement the Flink SQL Source function.
+# Flink ClickHouse Connector
+
+[Flink](https://github.com/apache/flink) SQL connector
+for the [ClickHouse](https://github.com/yandex/ClickHouse) database, powered
+by [ClickHouse JDBC](https://github.com/ClickHouse/clickhouse-jdbc).
+
+Currently, the project supports `Source/Sink Table` and `Flink Catalog`.
+Please create an issue if you encounter bugs; any help with the project is greatly appreciated.
+
+## Connector Options
+
+| Option | Required | Default | Type | Description |
+|:-----------------------------------------|:---------|:---------|:---------|:--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| url | required | none | String | The ClickHouse jdbc url in format `clickhouse://<host>:<port>`. |
+| username | optional | none | String | The 'username' and 'password' must both be specified if either of them is specified. |
+| password | optional | none | String | The ClickHouse password. |
+| database-name | optional | default | String | The ClickHouse database name. |
+| table-name | required | none | String | The ClickHouse table name. |
+| use-local | optional | false | Boolean | Directly read/write local tables when the target uses a distributed table engine. |
+| sink.batch-size | optional | 1000 | Integer | The max flush size; once this many records are buffered, the data is flushed. |
+| sink.flush-interval | optional | 1s | Duration | The flush interval; asynchronous threads flush buffered data once this interval elapses. |
+| sink.max-retries | optional | 3 | Integer | The max retry times when writing records to the database fails. |
+| ~~sink.write-local~~ | optional | false | Boolean | Removed since version 1.15, use `use-local` instead. |
+| sink.update-strategy | optional | update | String | Convert a record of type UPDATE_AFTER to an update/insert statement or just discard it; available values: update, insert, discard. |
+| sink.partition-strategy | optional | balanced | String | Partition strategy: balanced (round-robin), hash (partition key), shuffle (random). |
+| sink.partition-key | optional | none | String | Partition key used for the hash strategy. |
+| sink.sharding.use-table-definition | optional | false | Boolean | Use the sharding strategy defined for the distributed table; if set to true, the configuration of `sink.partition-strategy` and `sink.partition-key` is overridden. |
+| sink.ignore-delete | optional | true | Boolean | Whether to ignore delete statements. |
+| sink.parallelism | optional | none | Integer | Defines a custom parallelism for the sink. |
+| scan.partition.column | optional | none | String | The column name used for partitioning the input. |
+| scan.partition.num | optional | none | Integer | The number of partitions. |
+| scan.partition.lower-bound | optional | none | Long | The smallest value of the first partition. |
+| scan.partition.upper-bound | optional | none | Long | The largest value of the last partition. |
+| catalog.ignore-primary-key | optional | true | Boolean | Whether to ignore primary keys when using ClickHouseCatalog to create tables. |
+| properties.* | optional | none | String | Sets and passes through `clickhouse-jdbc` configurations. |
+| lookup.cache | optional | NONE | String | The caching strategy for this lookup table, either NONE or PARTIAL (FULL is not supported yet). |
+| lookup.partial-cache.expire-after-access | optional | none | Duration | Duration after which a cache entry expires once it was last accessed. |
+| lookup.partial-cache.expire-after-write | optional | none | Duration | Duration after which a cache entry expires once it was written. |
+| lookup.partial-cache.max-rows | optional | none | Long | The max number of rows in the lookup cache; once exceeded, the oldest rows are expired. |
+| lookup.partial-cache.caching-missing-key | optional | true | Boolean | Whether to cache missing keys; true by default. |
+| lookup.max-retries | optional | 3 | Integer | The max retry times if a lookup query fails. |
+
+**Update/Delete Data Considerations:**
+
+1. Distributed tables don't support update/delete statements; if you want to use them, make sure
+   to write records to the local tables or set `use-local` to true.
+2. Data is updated and deleted by the primary key; please be aware of this when using it with a
+   partitioned table.
+
+**Breaking change**
+
+Since version 1.16, shard weight is taken into consideration; this may affect which shard the data is distributed to.
+
+## Data Type Mapping
+
+| Flink Type | ClickHouse Type |
+|:--------------------|:--------------------------------------------------------|
+| CHAR | String |
+| VARCHAR | String / IP / UUID |
+| STRING | String / Enum |
+| BOOLEAN | UInt8 |
+| BYTES | FixedString |
+| DECIMAL | Decimal / Int128 / Int256 / UInt64 / UInt128 / UInt256 |
+| TINYINT | Int8 |
+| SMALLINT | Int16 / UInt8 |
+| INTEGER | Int32 / UInt16 / Interval |
+| BIGINT | Int64 / UInt32 |
+| FLOAT | Float32 |
+| DOUBLE | Float64 |
+| DATE | Date |
+| TIME | DateTime |
+| TIMESTAMP | DateTime |
+| TIMESTAMP_LTZ | DateTime |
+| INTERVAL_YEAR_MONTH | Int32 |
+| INTERVAL_DAY_TIME | Int64 |
+| ARRAY | Array |
+| MAP | Map |
+| ROW | Not supported |
+| MULTISET | Not supported |
+| RAW | Not supported |
+
+## Maven Dependency
+
+The project isn't published to the Maven central repository; you need to deploy/install it to your own
+repository before using it, as follows:
+
+```bash
+# clone the project
+git clone https://github.com/itinycheng/flink-connector-clickhouse.git
+
+# enter the project directory
+cd flink-connector-clickhouse/
+
+# display remote branches
+git branch -r
+
+# checkout the branch you need
+git checkout $branch_name
+
+# install or deploy the project to your own repository
+mvn clean install -DskipTests
+mvn clean deploy -DskipTests
+```
+
+```xml
+
+<dependency>
+    <groupId>org.apache.flink</groupId>
+    <artifactId>flink-connector-clickhouse</artifactId>
+    <version>1.16.0-SNAPSHOT</version>
+</dependency>
+```
+
+## How to use
+
+### Create and read/write table
+
+```SQL
+
+-- register a clickhouse table `t_user` in flink sql.
+CREATE TABLE t_user (
+    `user_id` BIGINT,
+    `user_type` INTEGER,
+    `language` STRING,
+    `country` STRING,
+    `gender` STRING,
+    `score` DOUBLE,
+    `list` ARRAY<STRING>,
+    `map` Map<STRING, BIGINT>,
+    PRIMARY KEY (`user_id`) NOT ENFORCED
+) WITH (
+    'connector' = 'clickhouse',
+    'url' = 'clickhouse://{ip}:{port}',
+    'database-name' = 'tutorial',
+    'table-name' = 'users',
+    'sink.batch-size' = '500',
+    'sink.flush-interval' = '1000',
+    'sink.max-retries' = '3'
+);
+
+-- read data from clickhouse
+SELECT user_id, user_type FROM t_user;
+
+-- write data into the clickhouse table from the table `T`
+INSERT INTO t_user
+SELECT cast(`user_id` as BIGINT), `user_type`, `lang`, `country`, `gender`, `score`, ARRAY['CODER', 'SPORTSMAN'], CAST(MAP['BABA', cast(10 as BIGINT), 'NIO', cast(8 as BIGINT)] AS MAP<STRING, BIGINT>) FROM T;
+
+```
+
+### Create and use ClickHouseCatalog
+
+#### Scala
+
+```scala
+val tEnv = TableEnvironment.create(setting)
+
+val props = new util.HashMap[String, String]()
+props.put(ClickHouseConfig.DATABASE_NAME, "default")
+props.put(ClickHouseConfig.URL, "clickhouse://127.0.0.1:8123")
+props.put(ClickHouseConfig.USERNAME, "username")
+props.put(ClickHouseConfig.PASSWORD, "password")
+props.put(ClickHouseConfig.SINK_FLUSH_INTERVAL, "30s")
+val cHcatalog = new ClickHouseCatalog("clickhouse", props)
+tEnv.registerCatalog("clickhouse", cHcatalog)
+tEnv.useCatalog("clickhouse")
+
+tEnv.executeSql("insert into `clickhouse`.`default`.`t_table` select...")
+```
+
+#### Java
+
+```java
+TableEnvironment tEnv = TableEnvironment.create(setting);
+
+Map<String, String> props = new HashMap<>();
+props.put(ClickHouseConfig.DATABASE_NAME, "default");
+props.put(ClickHouseConfig.URL, "clickhouse://127.0.0.1:8123");
+props.put(ClickHouseConfig.USERNAME, "username");
+props.put(ClickHouseConfig.PASSWORD, "password");
+props.put(ClickHouseConfig.SINK_FLUSH_INTERVAL, "30s");
+Catalog cHcatalog = new ClickHouseCatalog("clickhouse", props);
+tEnv.registerCatalog("clickhouse", cHcatalog);
+tEnv.useCatalog("clickhouse");
+
+tEnv.executeSql("insert into `clickhouse`.`default`.`t_table` select...");
+```
+
+#### SQL
+
+```sql
+> CREATE CATALOG clickhouse WITH (
+    'type' = 'clickhouse',
+    'url' = 'clickhouse://127.0.0.1:8123',
+    'username' = 'username',
+    'password' = 'password',
+    'database-name' = 'default',
+    'use-local' = 'false',
+    ...
+);
+
+> USE CATALOG clickhouse;
+> SELECT user_id, user_type FROM `default`.`t_user` LIMIT 10;
+> INSERT INTO `default`.`t_user` SELECT ...;
+```
+
+## Roadmap
+
+- [x] Implement the Flink SQL Sink function.
+- [x] Support array and Map types.
+- [x] Support ClickHouseCatalog.
+- [x] Implement the Flink SQL Source function.
+- [x] Implement the Flink SQL Lookup function.
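Since this change rounds out the lookup (dimension-table) support advertised in the new roadmap entry and the `lookup.*` options above, a usage sketch may help reviewers see how the pieces fit together. This is a minimal, hypothetical example: the `orders`/`dim_user` tables, their columns, and the datagen source are assumptions for illustration only; the connector options (`lookup.cache`, `lookup.partial-cache.*`, `lookup.max-retries`) are the ones documented in the README table.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class ClickHouseLookupJoinExample {
    public static void main(String[] args) {
        // Streaming table environment (Flink 1.16-style API).
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

        // Hypothetical fact stream with a processing-time attribute, backed by datagen for the demo.
        tEnv.executeSql(
                "CREATE TABLE orders ("
                        + "  order_id BIGINT,"
                        + "  user_id BIGINT,"
                        + "  proc_time AS PROCTIME()"
                        + ") WITH ("
                        + "  'connector' = 'datagen',"
                        + "  'number-of-rows' = '10'"
                        + ")");

        // ClickHouse dimension table with a partial lookup cache enabled.
        tEnv.executeSql(
                "CREATE TABLE dim_user ("
                        + "  user_id BIGINT,"
                        + "  user_type INT,"
                        + "  PRIMARY KEY (user_id) NOT ENFORCED"
                        + ") WITH ("
                        + "  'connector' = 'clickhouse',"
                        + "  'url' = 'clickhouse://127.0.0.1:8123',"
                        + "  'database-name' = 'default',"
                        + "  'table-name' = 'users',"
                        + "  'lookup.cache' = 'PARTIAL',"
                        + "  'lookup.partial-cache.max-rows' = '10000',"
                        + "  'lookup.partial-cache.expire-after-write' = '10min',"
                        + "  'lookup.max-retries' = '3'"
                        + ")");

        // Enrich each order with user attributes via a lookup join against ClickHouse.
        tEnv.executeSql(
                        "SELECT o.order_id, o.user_id, u.user_type "
                                + "FROM orders AS o "
                                + "JOIN dim_user FOR SYSTEM_TIME AS OF o.proc_time AS u "
                                + "ON o.user_id = u.user_id")
                .print();
    }
}
```

The `FOR SYSTEM_TIME AS OF` clause on a processing-time attribute is what makes Flink treat `dim_user` as a lookup source rather than a regular streaming join input.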
diff --git a/src/main/java/org/apache/flink/connector/clickhouse/internal/ClickHouseRowDataLookupFunction.java b/src/main/java/org/apache/flink/connector/clickhouse/internal/ClickHouseRowDataLookupFunction.java
index 60536b0..8f1ac81 100644
--- a/src/main/java/org/apache/flink/connector/clickhouse/internal/ClickHouseRowDataLookupFunction.java
+++ b/src/main/java/org/apache/flink/connector/clickhouse/internal/ClickHouseRowDataLookupFunction.java
@@ -2,8 +2,8 @@
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.connector.clickhouse.internal.connection.ClickHouseConnectionProvider;
+import org.apache.flink.connector.clickhouse.internal.connection.ClickHouseStatementWrapper;
 import org.apache.flink.connector.clickhouse.internal.converter.ClickHouseRowConverter;
-import org.apache.flink.connector.clickhouse.internal.converter.ClickHouseStatementWrapper;
 import org.apache.flink.connector.clickhouse.internal.options.ClickHouseReadOptions;
 import org.apache.flink.connector.clickhouse.util.ClickHouseUtil;
 import org.apache.flink.table.data.RowData;
diff --git a/src/main/java/org/apache/flink/connector/clickhouse/internal/converter/ClickHouseStatementWrapper.java b/src/main/java/org/apache/flink/connector/clickhouse/internal/connection/ClickHouseStatementWrapper.java
similarity index 97%
rename from src/main/java/org/apache/flink/connector/clickhouse/internal/converter/ClickHouseStatementWrapper.java
rename to src/main/java/org/apache/flink/connector/clickhouse/internal/connection/ClickHouseStatementWrapper.java
index a30214c..0639350
--- a/src/main/java/org/apache/flink/connector/clickhouse/internal/converter/ClickHouseStatementWrapper.java
+++ b/src/main/java/org/apache/flink/connector/clickhouse/internal/connection/ClickHouseStatementWrapper.java
@@ -1,4 +1,4 @@
-package org.apache.flink.connector.clickhouse.internal.converter;
+package org.apache.flink.connector.clickhouse.internal.connection;
 
 import ru.yandex.clickhouse.ClickHousePreparedStatement;
 
diff --git a/src/main/java/org/apache/flink/connector/clickhouse/internal/converter/ClickHouseRowConverter.java b/src/main/java/org/apache/flink/connector/clickhouse/internal/converter/ClickHouseRowConverter.java
index 3b5266f..2376f69 100644
--- a/src/main/java/org/apache/flink/connector/clickhouse/internal/converter/ClickHouseRowConverter.java
+++ b/src/main/java/org/apache/flink/connector/clickhouse/internal/converter/ClickHouseRowConverter.java
@@ -1,5 +1,6 @@
 package org.apache.flink.connector.clickhouse.internal.converter;
 
+import org.apache.flink.connector.clickhouse.internal.connection.ClickHouseStatementWrapper;
 import org.apache.flink.table.data.DecimalData;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
diff --git a/src/main/java/org/apache/flink/connector/clickhouse/internal/executor/ClickHouseBatchExecutor.java b/src/main/java/org/apache/flink/connector/clickhouse/internal/executor/ClickHouseBatchExecutor.java
index 08a8ca9..28f64c2 100644
--- a/src/main/java/org/apache/flink/connector/clickhouse/internal/executor/ClickHouseBatchExecutor.java
+++ b/src/main/java/org/apache/flink/connector/clickhouse/internal/executor/ClickHouseBatchExecutor.java
@@ -3,8 +3,8 @@
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.connector.clickhouse.internal.ClickHouseShardOutputFormat;
 import org.apache.flink.connector.clickhouse.internal.connection.ClickHouseConnectionProvider;
+import org.apache.flink.connector.clickhouse.internal.connection.ClickHouseStatementWrapper;
 import org.apache.flink.connector.clickhouse.internal.converter.ClickHouseRowConverter;
-import org.apache.flink.connector.clickhouse.internal.converter.ClickHouseStatementWrapper;
 import org.apache.flink.connector.clickhouse.internal.options.ClickHouseDmlOptions;
 import org.apache.flink.table.data.RowData;
 
diff --git a/src/main/java/org/apache/flink/connector/clickhouse/internal/executor/ClickHouseExecutor.java b/src/main/java/org/apache/flink/connector/clickhouse/internal/executor/ClickHouseExecutor.java
index dcb9747..a692218 100644
--- a/src/main/java/org/apache/flink/connector/clickhouse/internal/executor/ClickHouseExecutor.java
+++ b/src/main/java/org/apache/flink/connector/clickhouse/internal/executor/ClickHouseExecutor.java
@@ -3,8 +3,8 @@
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.connector.clickhouse.internal.ClickHouseStatementFactory;
 import org.apache.flink.connector.clickhouse.internal.connection.ClickHouseConnectionProvider;
+import org.apache.flink.connector.clickhouse.internal.connection.ClickHouseStatementWrapper;
 import org.apache.flink.connector.clickhouse.internal.converter.ClickHouseRowConverter;
-import org.apache.flink.connector.clickhouse.internal.converter.ClickHouseStatementWrapper;
 import org.apache.flink.connector.clickhouse.internal.options.ClickHouseDmlOptions;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
diff --git a/src/main/java/org/apache/flink/connector/clickhouse/internal/executor/ClickHouseUpsertExecutor.java b/src/main/java/org/apache/flink/connector/clickhouse/internal/executor/ClickHouseUpsertExecutor.java
index f914077..13fd850 100644
--- a/src/main/java/org/apache/flink/connector/clickhouse/internal/executor/ClickHouseUpsertExecutor.java
+++ b/src/main/java/org/apache/flink/connector/clickhouse/internal/executor/ClickHouseUpsertExecutor.java
@@ -4,8 +4,8 @@
 import org.apache.flink.connector.clickhouse.config.ClickHouseConfigOptions.SinkUpdateStrategy;
 import org.apache.flink.connector.clickhouse.internal.ClickHouseShardOutputFormat;
 import org.apache.flink.connector.clickhouse.internal.connection.ClickHouseConnectionProvider;
+import org.apache.flink.connector.clickhouse.internal.connection.ClickHouseStatementWrapper;
 import org.apache.flink.connector.clickhouse.internal.converter.ClickHouseRowConverter;
-import org.apache.flink.connector.clickhouse.internal.converter.ClickHouseStatementWrapper;
 import org.apache.flink.connector.clickhouse.internal.options.ClickHouseDmlOptions;
 import org.apache.flink.table.data.RowData;
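The remaining hunks simply move `ClickHouseStatementWrapper` from the `converter` package to the `connection` package (97% similarity, content otherwise unchanged) and adjust the imports in the lookup function, converter, and executors. For readers who don't have the class in front of them, the sketch below shows the general shape of such a wrapper around `ru.yandex.clickhouse.ClickHousePreparedStatement`; the method list is an illustrative assumption, not the actual class body moved by this diff.

```java
import java.sql.ResultSet;
import java.sql.SQLException;

import ru.yandex.clickhouse.ClickHousePreparedStatement;

/**
 * Illustrative sketch only: a thin wrapper that delegates to the underlying
 * ClickHousePreparedStatement (all delegated methods below are standard
 * java.sql.PreparedStatement operations). The real class is only relocated
 * by this diff; its exact method set may differ.
 */
public class StatementWrapperSketch {

    private final ClickHousePreparedStatement statement;

    public StatementWrapperSketch(ClickHousePreparedStatement statement) {
        this.statement = statement;
    }

    // Bind a converted field value to the given parameter position.
    public void setObject(int parameterIndex, Object value) throws SQLException {
        statement.setObject(parameterIndex, value);
    }

    // Queue the current row for batched execution.
    public void addBatch() throws SQLException {
        statement.addBatch();
    }

    // Flush all queued rows to ClickHouse.
    public int[] executeBatch() throws SQLException {
        return statement.executeBatch();
    }

    // Used on the lookup/read path.
    public ResultSet executeQuery() throws SQLException {
        return statement.executeQuery();
    }

    public void close() throws SQLException {
        statement.close();
    }
}
```

Grouping the wrapper with `ClickHouseConnectionProvider` in the `connection` package keeps the JDBC-facing pieces together, which is presumably the motivation for the move.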