From bd31bfbd1c940a6f3c0c80404f1ce5c4e9273f72 Mon Sep 17 00:00:00 2001 From: yuzelin <33053040+yuzelin@users.noreply.github.com> Date: Tue, 1 Aug 2023 18:23:26 +0800 Subject: [PATCH] [flink][mysql-cdc] Add synchronizing shards test for MySqlSyncTableAction and do minor code refactor (#1694) --- docs/content/how-to/cdc-ingestion.md | 42 ++++++++++--- .../flink/action/cdc/ComputedColumn.java | 1 - .../action/cdc/{mysql => }/Expression.java | 12 ++-- .../action/cdc/kafka/KafkaActionUtils.java | 2 +- .../action/cdc/mysql/MySqlActionUtils.java | 1 + .../cdc/mysql/MySqlSyncDatabaseAction.java | 2 +- .../cdc/{mysql => }/TruncateComputerTest.java | 2 +- .../cdc/mysql/MySqlSyncTableActionITCase.java | 60 ++++++++++++++++++- .../test/resources/mysql/sync_table_setup.sql | 26 ++++++++ 9 files changed, 130 insertions(+), 18 deletions(-) rename paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/{mysql => }/Expression.java (98%) rename paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/{mysql => }/TruncateComputerTest.java (99%) diff --git a/docs/content/how-to/cdc-ingestion.md b/docs/content/how-to/cdc-ingestion.md index b16e0e7b6d78..a071156ff1ca 100644 --- a/docs/content/how-to/cdc-ingestion.md +++ b/docs/content/how-to/cdc-ingestion.md @@ -72,7 +72,7 @@ To use this feature through `flink run`, run the following shell command. If the Paimon table you specify does not exist, this action will automatically create the table. Its schema will be derived from all specified MySQL tables. If the Paimon table already exists, its schema will be compared against the schema of all specified MySQL tables. -Example +Example 1: synchronize tables into one Paimon table ```bash /bin/flink run \ @@ -83,12 +83,12 @@ Example --table test_table \ --partition-keys pt \ --primary-keys pt,uid \ - --computed-columns '_year=year(age)' \ + --computed-column '_year=year(age)' \ --mysql-conf hostname=127.0.0.1 \ --mysql-conf username=root \ --mysql-conf password=123456 \ - --mysql-conf database-name=source_db \ - --mysql-conf table-name='source_table' \ + --mysql-conf database-name='source_db' \ + --mysql-conf table-name='source_table1|source_table2' \ --catalog-conf metastore=hive \ --catalog-conf uri=thrift://hive-metastore:9083 \ --table-conf bucket=4 \ @@ -96,8 +96,36 @@ Example --table-conf sink.parallelism=4 ``` -The mysql-conf table-name also supports regular expressions to monitor multiple tables that satisfy -the regular expressions. +As example shows, the mysql-conf's table-name supports regular expressions to monitor multiple tables that satisfy +the regular expressions. The schemas of all the tables will be merged into one Paimon table schema. + +Example 2: synchronize shards into one Paimon table + +You can also set 'database-name' with a regular expression to capture multiple databases. A typical scenario is that a +table 'source_table' is split into database 'source_db1', 'source_db2' ..., then you can synchronize data of all the +'source_table's into one Paimon table. + +```bash +/bin/flink run \ + /path/to/paimon-flink-action-{{< version >}}.jar \ + mysql-sync-table \ + --warehouse hdfs:///path/to/warehouse \ + --database test_db \ + --table test_table \ + --partition-keys pt \ + --primary-keys pt,uid \ + --computed-column '_year=year(age)' \ + --mysql-conf hostname=127.0.0.1 \ + --mysql-conf username=root \ + --mysql-conf password=123456 \ + --mysql-conf database-name='source_db.+' \ + --mysql-conf table-name='source_table' \ + --catalog-conf metastore=hive \ + --catalog-conf uri=thrift://hive-metastore:9083 \ + --table-conf bucket=4 \ + --table-conf changelog-producer=input \ + --table-conf sink.parallelism=4 +``` ### Synchronizing Databases @@ -301,7 +329,7 @@ Example --table test_table \ --partition-keys pt \ --primary-keys pt,uid \ - --computed-columns '_year=year(age)' \ + --computed-column '_year=year(age)' \ --kafka-conf properties.bootstrap.servers=127.0.0.1:9020 \ --kafka-conf topic=order \ --kafka-conf properties.group.id=123456 \ diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/ComputedColumn.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/ComputedColumn.java index df8142120e2a..3b262fafdae8 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/ComputedColumn.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/ComputedColumn.java @@ -18,7 +18,6 @@ package org.apache.paimon.flink.action.cdc; -import org.apache.paimon.flink.action.cdc.mysql.Expression; import org.apache.paimon.types.DataType; import javax.annotation.Nullable; diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/Expression.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/Expression.java similarity index 98% rename from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/Expression.java rename to paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/Expression.java index 6566958c3643..e38270524c4d 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/Expression.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/Expression.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.paimon.flink.action.cdc.mysql; +package org.apache.paimon.flink.action.cdc; import org.apache.paimon.types.DataType; import org.apache.paimon.types.DataTypes; @@ -288,9 +288,9 @@ final class TruncateComputer implements Expression { private final String fieldReference; - private DataType fieldType; + private final DataType fieldType; - private int width; + private final int width; TruncateComputer(String fieldReference, DataType fieldType, String literal) { this.fieldReference = fieldReference; @@ -320,11 +320,11 @@ public String eval(String input) { switch (fieldType.getTypeRoot()) { case TINYINT: case SMALLINT: - return String.valueOf(truncateShort(width, Short.valueOf(input))); + return String.valueOf(truncateShort(width, Short.parseShort(input))); case INTEGER: - return String.valueOf(truncateInt(width, Integer.valueOf(input))); + return String.valueOf(truncateInt(width, Integer.parseInt(input))); case BIGINT: - return String.valueOf(truncateLong(width, Long.valueOf(input))); + return String.valueOf(truncateLong(width, Long.parseLong(input))); case DECIMAL: return truncateDecimal(BigInteger.valueOf(width), new BigDecimal(input)) .toString(); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java index 148e3ced24a0..93330eb4ed12 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java @@ -19,7 +19,7 @@ package org.apache.paimon.flink.action.cdc.kafka; import org.apache.paimon.flink.action.cdc.ComputedColumn; -import org.apache.paimon.flink.action.cdc.mysql.Expression; +import org.apache.paimon.flink.action.cdc.Expression; import org.apache.paimon.flink.sink.cdc.UpdatedDataFieldsProcessFunction; import org.apache.paimon.schema.Schema; import org.apache.paimon.schema.TableSchema; diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionUtils.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionUtils.java index b2a89f03d357..fdd99975de55 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionUtils.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionUtils.java @@ -20,6 +20,7 @@ import org.apache.paimon.catalog.Identifier; import org.apache.paimon.flink.action.cdc.ComputedColumn; +import org.apache.paimon.flink.action.cdc.Expression; import org.apache.paimon.flink.sink.cdc.UpdatedDataFieldsProcessFunction; import org.apache.paimon.schema.Schema; import org.apache.paimon.schema.TableSchema; diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java index 048d9deb2230..cf948e09168e 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java @@ -364,7 +364,7 @@ private String buildTableList( // tables, so we should use regular expression to monitor all valid tables and exclude // certain invalid tables - // The table list is build by template: + // The table list is built by template: // (?!(^db\\.tbl$)|(^...$))(databasePattern\\.(including_pattern1|...)) // The excluding pattern ?!(^db\\.tbl$)|(^...$) can exclude tables whose qualified name diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/TruncateComputerTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/TruncateComputerTest.java similarity index 99% rename from paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/TruncateComputerTest.java rename to paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/TruncateComputerTest.java index c36fb86874a0..5f84c07eefc0 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/TruncateComputerTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/TruncateComputerTest.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.paimon.flink.action.cdc.mysql; +package org.apache.paimon.flink.action.cdc; import org.apache.paimon.types.BigIntType; import org.apache.paimon.types.BooleanType; diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java index 2ca19d522d20..9d77ca31966f 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java @@ -945,7 +945,7 @@ public void testTinyInt1Convert() throws Exception { List expected = Arrays.asList( "+I[1, 2021-09-15T15:00:10, 21]", "+I[2, 2023-03-23T16:00:20, 42]"); - waitForResult(expected, table, rowType, Arrays.asList("pk")); + waitForResult(expected, table, rowType, Collections.singletonList("pk")); } } @@ -1033,6 +1033,64 @@ private void testSchemaEvolutionImplWithTinyIntConvert(Statement statement) thro waitForResult(expected, table, rowType, primaryKeys); } + @Test + public void testSyncShards() throws Exception { + Map mySqlConfig = getBasicMySqlConfig(); + mySqlConfig.put("database-name", "shard_.+"); + mySqlConfig.put("table-name", "t.+"); + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(2); + env.enableCheckpointing(1000); + env.setRestartStrategy(RestartStrategies.noRestart()); + + MySqlSyncTableAction action = + new MySqlSyncTableAction( + mySqlConfig, + warehouse, + database, + tableName, + Collections.singletonList("pt"), + Arrays.asList("pk", "pt"), + Collections.singletonList("pt=substring(_date,5)"), + Collections.emptyMap(), + Collections.emptyMap()); + action.build(env); + JobClient client = env.executeAsync(); + waitJobRunning(client); + + try (Connection conn = + DriverManager.getConnection( + MYSQL_CONTAINER.getJdbcUrl(DATABASE_NAME), + MYSQL_CONTAINER.getUsername(), + MYSQL_CONTAINER.getPassword()); + Statement statement = conn.createStatement()) { + statement.execute("USE shard_1"); + statement.executeUpdate("INSERT INTO t1 VALUES (1, '2023-07-30'), (2, '2023-07-30')"); + statement.execute("USE shard_2"); + statement.executeUpdate("INSERT INTO t1 VALUES (3, '2023-07-31'), (4, '2023-07-31')"); + } + + FileStoreTable table = getFileStoreTable(); + RowType rowType = + RowType.of( + new DataType[] { + DataTypes.INT().notNull(), + DataTypes.VARCHAR(10), + DataTypes.STRING().notNull() + }, + new String[] {"pk", "_date", "pt"}); + waitForResult( + Arrays.asList( + "+I[1, 2023-07-30, 07-30]", + "+I[2, 2023-07-30, 07-30]", + "+I[3, 2023-07-31, 07-31]", + "+I[4, 2023-07-31, 07-31]"), + table, + rowType, + Arrays.asList("pk", "pt")); + } + private FileStoreTable getFileStoreTable() throws Exception { return getFileStoreTable(tableName); } diff --git a/paimon-flink/paimon-flink-common/src/test/resources/mysql/sync_table_setup.sql b/paimon-flink/paimon-flink-common/src/test/resources/mysql/sync_table_setup.sql index eb512695f7a1..608e324ededd 100644 --- a/paimon-flink/paimon-flink-common/src/test/resources/mysql/sync_table_setup.sql +++ b/paimon-flink/paimon-flink-common/src/test/resources/mysql/sync_table_setup.sql @@ -281,6 +281,10 @@ CREATE TABLE test_tinyint1_convert ( PRIMARY KEY (pk) ); +-- ################################################################################ +-- testSchemaEvolutionWithTinyint1Convert +-- ################################################################################ + CREATE DATABASE paimon_sync_table_tinyint; USE paimon_sync_table_tinyint; @@ -290,3 +294,25 @@ CREATE TABLE schema_evolution_3 ( v1 VARCHAR(10) comment 'v1', PRIMARY KEY (_id) ); + +-- ################################################################################ +-- testSyncShard +-- ################################################################################ + +CREATE DATABASE shard_1; +USE shard_1; + +CREATE TABLE t1 ( + pk INT, + _date VARCHAR(10), + PRIMARY KEY (pk) +); + +CREATE DATABASE shard_2; +USE shard_2; + +CREATE TABLE t1 ( + pk INT, + _date VARCHAR(10), + PRIMARY KEY (pk) +);