Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(cdc): init default value for cdc table columns #19354

Merged
merged 14 commits into from
Nov 26, 2024
46 changes: 46 additions & 0 deletions e2e_test/source_legacy/cdc/cdc.share_stream.slt
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,33 @@ create table orders_test (
PRIMARY KEY (order_id)
) from mysql_mytest table 'mytest.orders';

# Declaring a DEFAULT in the RisingWave DDL of a CDC table is expected to fail.
statement error
CREATE TABLE test_my_default_value (
    id int,
    name varchar,
    city varchar DEFAULT 'Beijing',
    PRIMARY KEY (id)
) FROM mysql_mytest TABLE 'mytest.test_my_default_value';

# Without a DEFAULT clause the same table definition is accepted.
statement ok
CREATE TABLE test_my_default_value (
    id int,
    name varchar,
    city varchar,
    PRIMARY KEY (id)
) FROM mysql_mytest TABLE 'mytest.test_my_default_value';

# Flush after every DML statement so the following query sees the inserted row.
statement ok
SET RW_IMPLICIT_FLUSH=true;

# `city` is deliberately omitted from the insert.
statement ok
INSERT INTO test_my_default_value (id, name) VALUES (2, 'jack');

# Expect `city` to be 'Shanghai', the default declared on the upstream MySQL table.
# Column types: id is an integer (I); name and city are text (T).
query ITT
SELECT id, name, city FROM test_my_default_value ORDER BY id;
----
2 jack Shanghai


statement ok
create table kt1 (*) from mysql_source table 'kdb.kt1';
Expand Down Expand Up @@ -575,3 +602,22 @@ query II
select * from upper_orders_shared order by id;
----
1 happy

# CDC table backed by the upstream PostgreSQL table `public.test_default_value`.
statement ok
CREATE TABLE test_pg_default_value (
    id int,
    name varchar,
    city varchar,
    PRIMARY KEY (id)
) FROM pg_source TABLE 'public.test_default_value';

# Flush after every DML statement so the following query sees the inserted row.
statement ok
SET RW_IMPLICIT_FLUSH=true;

# `city` is deliberately omitted from the insert.
statement ok
INSERT INTO test_pg_default_value (id, name) VALUES (1, 'noris');

# Expect `city` to be 'Shanghai', the value this test asserts for the omitted column.
# Column types: id is an integer (I); name and city are text (T).
query ITT
SELECT id, name, city FROM test_pg_default_value ORDER BY id;
----
1 noris Shanghai
7 changes: 7 additions & 0 deletions e2e_test/source_legacy/cdc/mysql_create.sql
Original file line number Diff line number Diff line change
Expand Up @@ -49,3 +49,10 @@ CREATE TABLE IF NOT EXISTS mysql_all_types(

INSERT INTO mysql_all_types VALUES ( False, 0, null, null, -8388608, -2147483647, 9223372036854775806, -10.0, -9999.999999, -10000.0, 'c', 'd', '', '', '1001-01-01', '-838:59:59.000000', '2000-01-01 00:00:00.000000', null);
INSERT INTO mysql_all_types VALUES ( True, 1, -128, -32767, -8388608, -2147483647, -9223372036854775807, -10.0, -9999.999999, -10000.0, 'a', 'b', '', '', '1001-01-01', '00:00:00', '1998-01-01 00:00:00.000000', '1970-01-01 00:00:01');

-- Upstream table for the CDC default-value test: `city` declares a column
-- default of 'Shanghai'.
-- IF NOT EXISTS keeps this setup script re-runnable.
CREATE TABLE IF NOT EXISTS test_my_default_value (
    id int,
    name varchar(64),
    city varchar(200) DEFAULT 'Shanghai',
    PRIMARY KEY (id)
);
8 changes: 8 additions & 0 deletions e2e_test/source_legacy/cdc/postgres_cdc.sql
Original file line number Diff line number Diff line change
Expand Up @@ -149,3 +149,11 @@ INSERT INTO partitioned_timestamp_table (c_int, c_boolean, c_timestamp) VALUES

-- Here we create this publication without `WITH ( publish_via_partition_root = true )` only for tests. Normally, it should be added.
create publication rw_publication_pubviaroot_false for TABLE partitioned_timestamp_table;


CREATE TABLE test_default_value (
"id" int,
"name" varchar(64),
"city" varchar(200) default 'Shanghai',
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason we only support inferring the default value from PostgreSQL and not from MySQL?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added a test for MySQL too.

PRIMARY KEY ("id")
);
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ CREATE TABLE shared_orders (
) from mssql_source table 'orders';

# invalid table name
statement error Sql Server table 'dbo'.'wrong_orders' doesn't exist
statement error Sql Server table 'dbo'.'wrong_orders' not found
CREATE TABLE shared_orders (
order_id INT,
order_date BIGINT,
Expand All @@ -183,7 +183,7 @@ CREATE TABLE shared_orders (
) from mssql_source table 'mydb.dbo.wrong_orders';

# invalid schema name
statement error Sql Server table 'wrong_dbo'.'orders' doesn't exist
statement error Sql Server table 'wrong_dbo'.'orders' not found
CREATE TABLE shared_orders (
order_id INT,
order_date BIGINT,
Expand Down Expand Up @@ -295,7 +295,7 @@ CREATE TABLE shared_sqlserver_all_data_types (
PRIMARY KEY (id)
) from mssql_source table 'mydb.dbo.sqlserver_all_data_types';

statement error Sql Server table 'UpperSchema'.'UpperTable' doesn't exist in 'mydb'
statement error Sql Server table 'UpperSchema'.'UpperTable' not found in 'mydb'
CREATE TABLE upper_table (
"ID" INT,
"Name" VARCHAR,
Expand All @@ -309,7 +309,7 @@ CREATE TABLE upper_table (
PRIMARY KEY ("ID")
) from upper_mssql_source table 'UpperDB.UpperSchema.UpperTable';

statement error Sql Server table 'upperSchema'.'upperTable' doesn't exist in 'UpperDB'
statement error Sql Server table 'upperSchema'.'upperTable' not found in 'UpperDB'
CREATE TABLE upper_table (
"ID" INT,
"Name" VARCHAR,
Expand Down
10 changes: 10 additions & 0 deletions src/connector/src/source/cdc/external/sql_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,16 @@ impl SqlServerExternalTable {
}
}

// The table does not exist
if column_descs.is_empty() {
bail!(
"Sql Server table '{}'.'{}' not found in '{}'",
config.schema,
config.table,
config.database
);
}

Ok(Self {
column_descs,
pk_names,
Expand Down
36 changes: 0 additions & 36 deletions src/frontend/planner_test/tests/testdata/input/create_source.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -43,39 +43,3 @@
) FORMAT PLAIN ENCODE CSV (delimiter = E'\t', without_header = true);
expected_outputs:
- explain_output
- id: create_source_with_cdc_backfill
sql: |
create source mysql_mydb with (
connector = 'mysql-cdc',
hostname = '127.0.0.1',
port = '8306',
username = 'root',
password = '123456',
database.name = 'mydb',
server.id = 5888
);
explain (logical) create table t1_rw (
v1 int,
v2 int,
primary key(v1)
) from mysql_mydb table 'mydb.t1';
expected_outputs:
- explain_output
- id: create_source_with_cdc_backfill
sql: |
create source mysql_mydb with (
connector = 'mysql-cdc',
hostname = '127.0.0.1',
port = '8306',
username = 'root',
password = '123456',
database.name = 'mydb',
server.id = 5888
);
explain create table t1_rw (
v1 int,
v2 int,
primary key(v1)
) from mysql_mydb table 'mydb.t1';
expected_outputs:
- explain_output
42 changes: 0 additions & 42 deletions src/frontend/planner_test/tests/testdata/output/create_source.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -62,45 +62,3 @@
└─StreamExchange { dist: HashShard(_row_id) }
└─StreamDml { columns: [v1, v2, _row_id] }
└─StreamSource
- id: create_source_with_cdc_backfill
sql: |
create source mysql_mydb with (
connector = 'mysql-cdc',
hostname = '127.0.0.1',
port = '8306',
username = 'root',
password = '123456',
database.name = 'mydb',
server.id = 5888
);
explain (logical) create table t1_rw (
v1 int,
v2 int,
primary key(v1)
) from mysql_mydb table 'mydb.t1';
explain_output: |
LogicalCdcScan { table: mydb.t1, columns: [v1, v2] }
- id: create_source_with_cdc_backfill
sql: |
create source mysql_mydb with (
connector = 'mysql-cdc',
hostname = '127.0.0.1',
port = '8306',
username = 'root',
password = '123456',
database.name = 'mydb',
server.id = 5888
);
explain create table t1_rw (
v1 int,
v2 int,
primary key(v1)
) from mysql_mydb table 'mydb.t1';
explain_output: |
StreamMaterialize { columns: [v1, v2], stream_key: [v1], pk_columns: [v1], pk_conflict: Overwrite }
└─StreamUnion { all: true }
├─StreamExchange { dist: HashShard(mydb.t1.v1) }
│ └─StreamCdcTableScan { table: mydb.t1, columns: [v1, v2] }
└─StreamExchange { dist: HashShard(v1) }
└─StreamDml { columns: [v1, v2] }
└─StreamSource
Loading
Loading