[flink][mysql-cdc] Add synchronizing shards test for MySqlSyncTableAction and do minor code refactor (apache#1694)
yuzelin authored Aug 1, 2023
1 parent 4304363 commit bd31bfb
Showing 9 changed files with 130 additions and 18 deletions.
42 changes: 35 additions & 7 deletions docs/content/how-to/cdc-ingestion.md
@@ -72,7 +72,7 @@ To use this feature through `flink run`, run the following shell command.
If the Paimon table you specify does not exist, this action will automatically create the table. Its schema will be derived from all specified MySQL tables. If the Paimon table already exists, its schema will be compared against the schema of all specified MySQL tables.
- Example
+ Example 1: synchronize tables into one Paimon table
```bash
<FLINK_HOME>/bin/flink run \
@@ -83,21 +83,49 @@ Example
--table test_table \
--partition-keys pt \
--primary-keys pt,uid \
- --computed-columns '_year=year(age)' \
+ --computed-column '_year=year(age)' \
--mysql-conf hostname=127.0.0.1 \
--mysql-conf username=root \
--mysql-conf password=123456 \
- --mysql-conf database-name=source_db \
- --mysql-conf table-name='source_table' \
+ --mysql-conf database-name='source_db' \
+ --mysql-conf table-name='source_table1|source_table2' \
--catalog-conf metastore=hive \
--catalog-conf uri=thrift://hive-metastore:9083 \
--table-conf bucket=4 \
--table-conf changelog-producer=input \
--table-conf sink.parallelism=4
```
- The mysql-conf table-name also supports regular expressions to monitor multiple tables that satisfy
- the regular expressions.
+ As the example shows, mysql-conf's table-name supports regular expressions to monitor multiple tables that
+ satisfy the regular expression. The schemas of all matched tables will be merged into one Paimon table schema.
Example 2: synchronize shards into one Paimon table

You can also set 'database-name' with a regular expression to capture multiple databases. A typical scenario is that a
table 'source_table' is split into databases 'source_db1', 'source_db2', ...; then you can synchronize the data of all
the 'source_table's into one Paimon table.
```bash
<FLINK_HOME>/bin/flink run \
/path/to/paimon-flink-action-{{< version >}}.jar \
mysql-sync-table \
--warehouse hdfs:///path/to/warehouse \
--database test_db \
--table test_table \
--partition-keys pt \
--primary-keys pt,uid \
--computed-column '_year=year(age)' \
--mysql-conf hostname=127.0.0.1 \
--mysql-conf username=root \
--mysql-conf password=123456 \
--mysql-conf database-name='source_db.+' \
--mysql-conf table-name='source_table' \
--catalog-conf metastore=hive \
--catalog-conf uri=thrift://hive-metastore:9083 \
--table-conf bucket=4 \
--table-conf changelog-producer=input \
--table-conf sink.parallelism=4
```
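Under the hood, captured tables are matched by their qualified `database.table` names, so the two patterns above effectively combine into one qualified-name pattern. A minimal, hypothetical sketch with java.util.regex (the real matching lives in the action's table-list builder; the names are illustrative):

```java
import java.util.regex.Pattern;

public class ShardPatternDemo {
    public static void main(String[] args) {
        // database-name='source_db.+' and table-name='source_table', combined into
        // one pattern over qualified names (an illustration, not Paimon's actual code).
        Pattern shards = Pattern.compile("source_db.+\\.source_table");

        System.out.println(shards.matcher("source_db1.source_table").matches()); // true
        System.out.println(shards.matcher("source_db2.source_table").matches()); // true
        System.out.println(shards.matcher("other_db.source_table").matches());   // false
    }
}
```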
### Synchronizing Databases
@@ -301,7 +329,7 @@ Example
--table test_table \
--partition-keys pt \
--primary-keys pt,uid \
- --computed-columns '_year=year(age)' \
+ --computed-column '_year=year(age)' \
--kafka-conf properties.bootstrap.servers=127.0.0.1:9020 \
--kafka-conf topic=order \
--kafka-conf properties.group.id=123456 \
@@ -18,7 +18,6 @@

package org.apache.paimon.flink.action.cdc;

- import org.apache.paimon.flink.action.cdc.mysql.Expression;
import org.apache.paimon.types.DataType;

import javax.annotation.Nullable;
@@ -16,7 +16,7 @@
* limitations under the License.
*/

- package org.apache.paimon.flink.action.cdc.mysql;
+ package org.apache.paimon.flink.action.cdc;

import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
@@ -288,9 +288,9 @@ final class TruncateComputer implements Expression {

private final String fieldReference;

- private DataType fieldType;
+ private final DataType fieldType;

- private int width;
+ private final int width;

TruncateComputer(String fieldReference, DataType fieldType, String literal) {
this.fieldReference = fieldReference;
@@ -320,11 +320,11 @@ public String eval(String input) {
switch (fieldType.getTypeRoot()) {
case TINYINT:
case SMALLINT:
- return String.valueOf(truncateShort(width, Short.valueOf(input)));
+ return String.valueOf(truncateShort(width, Short.parseShort(input)));
case INTEGER:
- return String.valueOf(truncateInt(width, Integer.valueOf(input)));
+ return String.valueOf(truncateInt(width, Integer.parseInt(input)));
case BIGINT:
- return String.valueOf(truncateLong(width, Long.valueOf(input)));
+ return String.valueOf(truncateLong(width, Long.parseLong(input)));
case DECIMAL:
return truncateDecimal(BigInteger.valueOf(width), new BigDecimal(input))
.toString();
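The `truncateShort`/`truncateInt`/`truncateLong` helpers called above sit outside this hunk. As a rough guide, here is a minimal sketch of the integer case, assuming Iceberg-style truncate semantics (round down to the nearest lower multiple of the width, also for negative values):

```java
public class TruncateDemo {
    // Assumed semantics: reduce value to the largest multiple of width that is <= value.
    // The double-modulo keeps the result correct for negative inputs in Java.
    static int truncateInt(int width, int value) {
        return value - (((value % width) + width) % width);
    }

    public static void main(String[] args) {
        System.out.println(truncateInt(10, 27)); // 20
        System.out.println(truncateInt(10, -3)); // -10
    }
}
```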
@@ -19,7 +19,7 @@
package org.apache.paimon.flink.action.cdc.kafka;

import org.apache.paimon.flink.action.cdc.ComputedColumn;
- import org.apache.paimon.flink.action.cdc.mysql.Expression;
+ import org.apache.paimon.flink.action.cdc.Expression;
import org.apache.paimon.flink.sink.cdc.UpdatedDataFieldsProcessFunction;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.TableSchema;
@@ -20,6 +20,7 @@

import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.action.cdc.ComputedColumn;
+ import org.apache.paimon.flink.action.cdc.Expression;
import org.apache.paimon.flink.sink.cdc.UpdatedDataFieldsProcessFunction;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.TableSchema;
@@ -364,7 +364,7 @@ private String buildTableList(
// tables, so we should use regular expression to monitor all valid tables and exclude
// certain invalid tables

- // The table list is build by template:
+ // The table list is built by template:
// (?!(^db\\.tbl$)|(^...$))(databasePattern\\.(including_pattern1|...))

// The excluding pattern ?!(^db\\.tbl$)|(^...$) can exclude tables whose qualified name
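A hypothetical instantiation of that template: the negative lookahead vetoes specific qualified names even when the including pattern would accept them (the database and table names below are made up):

```java
import java.util.regex.Pattern;

public class TableListTemplateDemo {
    public static void main(String[] args) {
        // (?!(^db\.tbl$))(databasePattern\.(includingPattern)), instantiated with
        // hypothetical names: include shard_.+\.t.+ but exclude shard_1.t9.
        Pattern tableList = Pattern.compile("(?!(^shard_1\\.t9$))(shard_.+\\.(t.+))");

        System.out.println(tableList.matcher("shard_1.t1").matches()); // true: included
        System.out.println(tableList.matcher("shard_1.t9").matches()); // false: excluded by the lookahead
    }
}
```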
@@ -16,7 +16,7 @@
* limitations under the License.
*/

- package org.apache.paimon.flink.action.cdc.mysql;
+ package org.apache.paimon.flink.action.cdc;

import org.apache.paimon.types.BigIntType;
import org.apache.paimon.types.BooleanType;
@@ -945,7 +945,7 @@ public void testTinyInt1Convert() throws Exception {
List<String> expected =
Arrays.asList(
"+I[1, 2021-09-15T15:00:10, 21]", "+I[2, 2023-03-23T16:00:20, 42]");
waitForResult(expected, table, rowType, Arrays.asList("pk"));
waitForResult(expected, table, rowType, Collections.singletonList("pk"));
}
}

@@ -1033,6 +1033,64 @@ private void testSchemaEvolutionImplWithTinyIntConvert(Statement statement) thro
waitForResult(expected, table, rowType, primaryKeys);
}

@Test
public void testSyncShards() throws Exception {
Map<String, String> mySqlConfig = getBasicMySqlConfig();
mySqlConfig.put("database-name", "shard_.+");
mySqlConfig.put("table-name", "t.+");

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(2);
env.enableCheckpointing(1000);
env.setRestartStrategy(RestartStrategies.noRestart());

MySqlSyncTableAction action =
new MySqlSyncTableAction(
mySqlConfig,
warehouse,
database,
tableName,
Collections.singletonList("pt"),
Arrays.asList("pk", "pt"),
Collections.singletonList("pt=substring(_date,5)"),
Collections.emptyMap(),
Collections.emptyMap());
action.build(env);
JobClient client = env.executeAsync();
waitJobRunning(client);

try (Connection conn =
DriverManager.getConnection(
MYSQL_CONTAINER.getJdbcUrl(DATABASE_NAME),
MYSQL_CONTAINER.getUsername(),
MYSQL_CONTAINER.getPassword());
Statement statement = conn.createStatement()) {
statement.execute("USE shard_1");
statement.executeUpdate("INSERT INTO t1 VALUES (1, '2023-07-30'), (2, '2023-07-30')");
statement.execute("USE shard_2");
statement.executeUpdate("INSERT INTO t1 VALUES (3, '2023-07-31'), (4, '2023-07-31')");
}

FileStoreTable table = getFileStoreTable();
RowType rowType =
RowType.of(
new DataType[] {
DataTypes.INT().notNull(),
DataTypes.VARCHAR(10),
DataTypes.STRING().notNull()
},
new String[] {"pk", "_date", "pt"});
waitForResult(
Arrays.asList(
"+I[1, 2023-07-30, 07-30]",
"+I[2, 2023-07-30, 07-30]",
"+I[3, 2023-07-31, 07-31]",
"+I[4, 2023-07-31, 07-31]"),
table,
rowType,
Arrays.asList("pk", "pt"));
}

private FileStoreTable getFileStoreTable() throws Exception {
return getFileStoreTable(tableName);
}
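For reference, the computed column `pt=substring(_date,5)` in the new test produces the partition values seen in the expected rows, assuming it behaves like Java's zero-based `String.substring(int)`:

```java
public class SubstringDemo {
    public static void main(String[] args) {
        // Index 5 onward of "2023-07-30" is "07-30", the pt value in "+I[1, 2023-07-30, 07-30]".
        System.out.println("2023-07-30".substring(5)); // 07-30
    }
}
```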
@@ -281,6 +281,10 @@ CREATE TABLE test_tinyint1_convert (
PRIMARY KEY (pk)
);

-- ################################################################################
-- testSchemaEvolutionWithTinyint1Convert
-- ################################################################################

CREATE DATABASE paimon_sync_table_tinyint;
USE paimon_sync_table_tinyint;

@@ -290,3 +294,25 @@ CREATE TABLE schema_evolution_3 (
v1 VARCHAR(10) comment 'v1',
PRIMARY KEY (_id)
);

-- ################################################################################
-- testSyncShards
-- ################################################################################

CREATE DATABASE shard_1;
USE shard_1;

CREATE TABLE t1 (
pk INT,
_date VARCHAR(10),
PRIMARY KEY (pk)
);

CREATE DATABASE shard_2;
USE shard_2;

CREATE TABLE t1 (
pk INT,
_date VARCHAR(10),
PRIMARY KEY (pk)
);
