Skip to content

Commit

Permalink
[MySQL] Optimize how to construct config table.include.list (apache#2274
Browse files Browse the repository at this point in the history
)
  • Loading branch information
ruanhang1993 authored and zhongqishang committed Dec 7, 2023
1 parent 4ddc0af commit 874b1f9
Show file tree
Hide file tree
Showing 5 changed files with 101 additions and 6 deletions.
5 changes: 3 additions & 2 deletions docs/content/connectors/mysql-cdc(ZH).md
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ Flink SQL> SELECT * FROM orders;
<td>required</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>要监视的 MySQL 数据库的表名。表名还支持正则表达式,以监视多个表与正则表达式匹配。</td>
<td>需要监视的 MySQL 数据库的表名。表名支持正则表达式,以监视满足正则表达式的多个表。注意:MySQL CDC 连接器在正则匹配表名时,会把用户填写的 database-name, table-name 通过字符串 `\\.` 连接成一个全路径的正则表达式,然后使用该正则表达式和 MySQL 数据库中表的全限定名进行正则匹配。</td>
</tr>
<tr>
<td>port</td>
Expand Down Expand Up @@ -433,7 +433,8 @@ CREATE TABLE products (
</tr>
</tbody>
</table>
进行库表匹配时,使用的模式是database-name.table-name,所以该例子使用(^(test).*|^(tpc).*|txc|.*[p$]|t{2}).(t[5-8]|tt),匹配txc.tt、test2.test5。

进行库表匹配时,会使用正则表达式 `database-name\\.table-name` 来与MySQL表的全限定名做匹配,所以该例子使用 `(^(test).*|^(tpc).*|txc|.*[p$]|t{2})\\.(t[5-8]|tt)`,可以匹配到表 txc.tt、test2.test5。

支持的特性
--------
Expand Down
8 changes: 6 additions & 2 deletions docs/content/connectors/mysql-cdc.md
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,9 @@ Connector Options
<td>required</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Table name of the MySQL database to monitor. The table-name also supports regular expressions to monitor multiple tables matches the regular expression.</td>
<td>
Table name of the MySQL database to monitor. The table-name also supports regular expressions to monitor multiple tables that match the regular expression. Note: When matching table names, the MySQL CDC connector concatenates the user-provided database-name and table-name with the string `\\.` to form a full-path regular expression, and then uses that regular expression to match the fully-qualified names of the tables in the MySQL database.
</td>
</tr>
<tr>
<td>port</td>
Expand Down Expand Up @@ -436,7 +438,9 @@ CREATE TABLE products (
</tr>
</tbody>
</table>
It will use database.table as a pattern to match tables, as above examples using (^(test).*|^(tpc).*|txc|.*[p$]|t{2}).(t[5-8]|tt) matches txc.tt、test2.test5.

It uses `database-name\\.table-name` as the pattern to match tables. In the example above, the pattern `(^(test).*|^(tpc).*|txc|.*[p$]|t{2})\\.(t[5-8]|tt)` matches txc.tt and test2.test5.


Features
--------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,10 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
.hostname(hostname)
.port(port)
.databaseList(database)
.tableList(database + "." + tableName)
// The MySQL Debezium connector uses regular expressions to match the
// fully-qualified identifiers of tables, so the separator between the
// database and table patterns must be the escaped "\\." instead of ".".
.tableList(database + "\\." + tableName)
.username(username)
.password(password)
.serverTimeZone(serverTimeZone.toString())
Expand Down Expand Up @@ -205,7 +208,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
.hostname(hostname)
.port(port)
.databaseList(database)
.tableList(database + "." + tableName)
.tableList(database + "\\." + tableName)
.username(username)
.password(password)
.serverTimeZone(serverTimeZone.toString())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1306,6 +1306,79 @@ public void testReadingWithDotTableName() throws Exception {
customer3_0Database.dropDatabase();
}

/**
 * Reads the {@code customers} table through a regex {@code database-name} and checks that only
 * the rows of that table are returned.
 *
 * <p>The database pattern ends with {@code .*}; the combined pattern is expected to select
 * {@code customers} and leave out {@code prefix_customers}.
 */
@Test
public void testReadingWithRegexPattern() throws Exception {
    env.setRestartStrategy(RestartStrategies.noRestart());
    customerDatabase.createAndInitialize();

    // DDL template; the placeholders are filled in below, in declaration order.
    final String ddlTemplate =
            "CREATE TABLE customers ("
                    + " `id` INTEGER NOT NULL,"
                    + " name STRING,"
                    + " address STRING,"
                    + " phone_number STRING,"
                    + " primary key (`id`) not enforced"
                    + ") WITH ("
                    + " 'connector' = 'mysql-cdc',"
                    + " 'hostname' = '%s',"
                    + " 'port' = '%s',"
                    + " 'username' = '%s',"
                    + " 'password' = '%s',"
                    + " 'database-name' = '%s',"
                    + " 'table-name' = '%s',"
                    + " 'scan.incremental.snapshot.enabled' = '%s',"
                    + " 'server-time-zone' = 'UTC',"
                    + " 'server-id' = '%s',"
                    + " 'scan.incremental.snapshot.chunk.size' = '%s'"
                    + ")";

    final String sourceDDL =
            String.format(
                    ddlTemplate,
                    MYSQL_CONTAINER.getHost(),
                    MYSQL_CONTAINER.getDatabasePort(),
                    customerDatabase.getUsername(),
                    customerDatabase.getPassword(),
                    // database-name and table-name are combined into one regex, e.g.
                    // 'customer_c2dsd.*\\.customers'. It must select only the
                    // customer_c2dsd.customers table; customer_c2dsd.prefix_customers
                    // must stay excluded.
                    customerDatabase.getDatabaseName() + ".*",
                    "customers",
                    incrementalSnapshot,
                    getServerId(),
                    getSplitSize());
    tEnv.executeSql(sourceDDL);

    // The query is submitted asynchronously; rows are pulled through the iterator.
    final TableResult queryResult = tEnv.executeSql("SELECT * FROM customers");
    final CloseableIterator<Row> rows = queryResult.collect();
    waitForSnapshotStarted(rows);

    final String[] expectedRows = {
        "+I[101, user_1, Shanghai, 123567891234]",
        "+I[102, user_2, Shanghai, 123567891234]",
        "+I[103, user_3, Shanghai, 123567891234]",
        "+I[109, user_4, Shanghai, 123567891234]",
        "+I[110, user_5, Shanghai, 123567891234]",
        "+I[111, user_6, Shanghai, 123567891234]",
        "+I[118, user_7, Shanghai, 123567891234]",
        "+I[121, user_8, Shanghai, 123567891234]",
        "+I[123, user_9, Shanghai, 123567891234]",
        "+I[1009, user_10, Shanghai, 123567891234]",
        "+I[1010, user_11, Shanghai, 123567891234]",
        "+I[1011, user_12, Shanghai, 123567891234]",
        "+I[1012, user_13, Shanghai, 123567891234]",
        "+I[1013, user_14, Shanghai, 123567891234]",
        "+I[1014, user_15, Shanghai, 123567891234]",
        "+I[1015, user_16, Shanghai, 123567891234]",
        "+I[1016, user_17, Shanghai, 123567891234]",
        "+I[1017, user_18, Shanghai, 123567891234]",
        "+I[1018, user_19, Shanghai, 123567891234]",
        "+I[1019, user_20, Shanghai, 123567891234]",
        "+I[2000, user_21, Shanghai, 123567891234]"
    };
    assertEqualsInAnyOrder(Arrays.asList(expectedRows), fetchRows(rows, expectedRows.length));

    // Cancel the job once the expected rows have been verified.
    queryResult.getJobClient().get().cancel().get();
}

@Test
public void testDdlWithDefaultStringValue() throws Exception {
if (!incrementalSnapshot) {
Expand Down
14 changes: 14 additions & 0 deletions flink-connector-mysql-cdc/src/test/resources/ddl/customer.sql
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,20 @@ VALUES (101,"user_1","Shanghai","123567891234"),
(1019,"user_20","Shanghai","123567891234"),
(2000,"user_21","Shanghai","123567891234");

-- Create a table that will not be read.
-- Its name only ends with 'customers', so a source configured with
-- table-name 'customers' must not pick it up.
CREATE TABLE prefix_customers (
id INTEGER NOT NULL PRIMARY KEY,
name VARCHAR(255) NOT NULL DEFAULT 'flink',
address VARCHAR(1024),
phone_number VARCHAR(512)
);

INSERT INTO prefix_customers
VALUES (101,"user_1","Shanghai","123567891234"),
(102,"user_2","Shanghai","123567891234"),
(103,"user_3","Shanghai","123567891234"),
(109,"user_4","Shanghai","123567891234");

-- table has same name prefix with 'customers.*'
CREATE TABLE customers_1 (
id INTEGER NOT NULL PRIMARY KEY,
Expand Down

0 comments on commit 874b1f9

Please sign in to comment.