Skip to content

Commit

Permalink
feat(cdc): init default value for cdc table columns (#19354)
Browse files Browse the repository at this point in the history
Co-authored-by: Kexiang Wang <[email protected]>
  • Loading branch information
StrikeW and KeXiangWang authored Nov 26, 2024
1 parent fc639fe commit 759ea19
Show file tree
Hide file tree
Showing 8 changed files with 215 additions and 182 deletions.
46 changes: 46 additions & 0 deletions e2e_test/source_legacy/cdc/cdc.share_stream.slt
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,33 @@ create table orders_test (
PRIMARY KEY (order_id)
) from mysql_mytest table 'mytest.orders';

statement error
CREATE TABLE test_my_default_value (
id int,
name varchar,
city varchar DEFAULT 'Beijing',
PRIMARY KEY (id)
) FROM mysql_mytest TABLE 'mytest.test_my_default_value';

statement ok
CREATE TABLE test_my_default_value (
id int,
name varchar,
city varchar,
PRIMARY KEY (id)
) FROM mysql_mytest TABLE 'mytest.test_my_default_value';

statement ok
SET RW_IMPLICIT_FLUSH=true;

statement ok
INSERT INTO test_my_default_value VALUES (2, 'jack');

query II
select * from test_my_default_value;
----
2 jack Shanghai


statement ok
create table kt1 (*) from mysql_source table 'kdb.kt1';
Expand Down Expand Up @@ -575,3 +602,22 @@ query II
select * from upper_orders_shared order by id;
----
1 happy

statement ok
CREATE TABLE test_pg_default_value (
id int,
name varchar,
city varchar,
PRIMARY KEY (id)
) FROM pg_source TABLE 'public.test_default_value';

statement ok
SET RW_IMPLICIT_FLUSH=true;

statement ok
INSERT INTO test_pg_default_value VALUES (1, 'noris');

query II
select * from test_pg_default_value;
----
1 noris Shanghai
7 changes: 7 additions & 0 deletions e2e_test/source_legacy/cdc/mysql_create.sql
Original file line number Diff line number Diff line change
Expand Up @@ -49,3 +49,10 @@ CREATE TABLE IF NOT EXISTS mysql_all_types(

INSERT INTO mysql_all_types VALUES ( False, 0, null, null, -8388608, -2147483647, 9223372036854775806, -10.0, -9999.999999, -10000.0, 'c', 'd', '', '', '1001-01-01', '-838:59:59.000000', '2000-01-01 00:00:00.000000', null);
INSERT INTO mysql_all_types VALUES ( True, 1, -128, -32767, -8388608, -2147483647, -9223372036854775807, -10.0, -9999.999999, -10000.0, 'a', 'b', '', '', '1001-01-01', '00:00:00', '1998-01-01 00:00:00.000000', '1970-01-01 00:00:01');

CREATE TABLE test_my_default_value (
id int,
name varchar(64),
city varchar(200) default 'Shanghai',
PRIMARY KEY (id)
);
8 changes: 8 additions & 0 deletions e2e_test/source_legacy/cdc/postgres_cdc.sql
Original file line number Diff line number Diff line change
Expand Up @@ -149,3 +149,11 @@ INSERT INTO partitioned_timestamp_table (c_int, c_boolean, c_timestamp) VALUES

-- Here we create this publication without `WITH ( publish_via_partition_root = true )` only for tests. Normally, it should be added.
create publication rw_publication_pubviaroot_false for TABLE partitioned_timestamp_table;


CREATE TABLE test_default_value (
"id" int,
"name" varchar(64),
"city" varchar(200) default 'Shanghai',
PRIMARY KEY ("id")
);
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ CREATE TABLE shared_orders (
) from mssql_source table 'orders';

# invalid table name
statement error Sql Server table 'dbo'.'wrong_orders' doesn't exist
statement error Sql Server table 'dbo'.'wrong_orders' not found
CREATE TABLE shared_orders (
order_id INT,
order_date BIGINT,
Expand All @@ -183,7 +183,7 @@ CREATE TABLE shared_orders (
) from mssql_source table 'mydb.dbo.wrong_orders';

# invalid schema name
statement error Sql Server table 'wrong_dbo'.'orders' doesn't exist
statement error Sql Server table 'wrong_dbo'.'orders' not found
CREATE TABLE shared_orders (
order_id INT,
order_date BIGINT,
Expand Down Expand Up @@ -295,7 +295,7 @@ CREATE TABLE shared_sqlserver_all_data_types (
PRIMARY KEY (id)
) from mssql_source table 'mydb.dbo.sqlserver_all_data_types';

statement error Sql Server table 'UpperSchema'.'UpperTable' doesn't exist in 'mydb'
statement error Sql Server table 'UpperSchema'.'UpperTable' not found in 'mydb'
CREATE TABLE upper_table (
"ID" INT,
"Name" VARCHAR,
Expand All @@ -309,7 +309,7 @@ CREATE TABLE upper_table (
PRIMARY KEY ("ID")
) from upper_mssql_source table 'UpperDB.UpperSchema.UpperTable';

statement error Sql Server table 'upperSchema'.'upperTable' doesn't exist in 'UpperDB'
statement error Sql Server table 'upperSchema'.'upperTable' not found in 'UpperDB'
CREATE TABLE upper_table (
"ID" INT,
"Name" VARCHAR,
Expand Down
10 changes: 10 additions & 0 deletions src/connector/src/source/cdc/external/sql_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,16 @@ impl SqlServerExternalTable {
}
}

// The table does not exist
if column_descs.is_empty() {
bail!(
"Sql Server table '{}'.'{}' not found in '{}'",
config.schema,
config.table,
config.database
);
}

Ok(Self {
column_descs,
pk_names,
Expand Down
36 changes: 0 additions & 36 deletions src/frontend/planner_test/tests/testdata/input/create_source.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -43,39 +43,3 @@
) FORMAT PLAIN ENCODE CSV (delimiter = E'\t', without_header = true);
expected_outputs:
- explain_output
- id: create_source_with_cdc_backfill
sql: |
create source mysql_mydb with (
connector = 'mysql-cdc',
hostname = '127.0.0.1',
port = '8306',
username = 'root',
password = '123456',
database.name = 'mydb',
server.id = 5888
);
explain (logical) create table t1_rw (
v1 int,
v2 int,
primary key(v1)
) from mysql_mydb table 'mydb.t1';
expected_outputs:
- explain_output
- id: create_source_with_cdc_backfill
sql: |
create source mysql_mydb with (
connector = 'mysql-cdc',
hostname = '127.0.0.1',
port = '8306',
username = 'root',
password = '123456',
database.name = 'mydb',
server.id = 5888
);
explain create table t1_rw (
v1 int,
v2 int,
primary key(v1)
) from mysql_mydb table 'mydb.t1';
expected_outputs:
- explain_output
42 changes: 0 additions & 42 deletions src/frontend/planner_test/tests/testdata/output/create_source.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -62,45 +62,3 @@
└─StreamExchange { dist: HashShard(_row_id) }
└─StreamDml { columns: [v1, v2, _row_id] }
└─StreamSource
- id: create_source_with_cdc_backfill
sql: |
create source mysql_mydb with (
connector = 'mysql-cdc',
hostname = '127.0.0.1',
port = '8306',
username = 'root',
password = '123456',
database.name = 'mydb',
server.id = 5888
);
explain (logical) create table t1_rw (
v1 int,
v2 int,
primary key(v1)
) from mysql_mydb table 'mydb.t1';
explain_output: |
LogicalCdcScan { table: mydb.t1, columns: [v1, v2] }
- id: create_source_with_cdc_backfill
sql: |
create source mysql_mydb with (
connector = 'mysql-cdc',
hostname = '127.0.0.1',
port = '8306',
username = 'root',
password = '123456',
database.name = 'mydb',
server.id = 5888
);
explain create table t1_rw (
v1 int,
v2 int,
primary key(v1)
) from mysql_mydb table 'mydb.t1';
explain_output: |
StreamMaterialize { columns: [v1, v2], stream_key: [v1], pk_columns: [v1], pk_conflict: Overwrite }
└─StreamUnion { all: true }
├─StreamExchange { dist: HashShard(mydb.t1.v1) }
│ └─StreamCdcTableScan { table: mydb.t1, columns: [v1, v2] }
└─StreamExchange { dist: HashShard(v1) }
└─StreamDml { columns: [v1, v2] }
└─StreamSource
Loading

0 comments on commit 759ea19

Please sign in to comment.