refactor(binder): refine resolve source schema #10195

Merged · 21 commits · Jun 19, 2023
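This PR changes how the primary key is declared for tables whose columns are resolved from an external schema (e.g. a Confluent schema registry): instead of re-declaring the key column with an explicit type, the DDL now names it in a bare PRIMARY KEY constraint and lets the resolved schema supply the column definition. A minimal before/after sketch, with connector settings copied from the tests below and the topic name assumed to mirror the table name:

-- Before: the key column had to be spelled out with a type.
CREATE TABLE upsert_avro_json ("ID" TEXT PRIMARY KEY)
WITH (
    connector = 'kafka',
    properties.bootstrap.server = 'message_queue:29092',
    topic = 'upsert_avro_json')
ROW FORMAT UPSERT_AVRO
row schema location confluent schema registry 'http://message_queue:8081';

-- After: only the constraint is written; column "ID" and its type come
-- from the registry-resolved Avro schema.
CREATE TABLE upsert_avro_json (PRIMARY KEY("ID"))
WITH (
    connector = 'kafka',
    properties.bootstrap.server = 'message_queue:29092',
    topic = 'upsert_avro_json')
ROW FORMAT UPSERT_AVRO
row schema location confluent schema registry 'http://message_queue:8081';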
49 changes: 8 additions & 41 deletions e2e_test/source/basic/nosim_kafka.slt
@@ -22,17 +22,6 @@ WITH (
ROW FORMAT UPSERT_AVRO
row schema location confluent schema registry 'http://message_queue:8081'

-# but if we let the key be a column, there's no such requirement
-statement ok
-CREATE TABLE upsert_student_key_not_subset_of_value_avro_json (
-key_struct struct<"NOTEXIST" INT> PRIMARY KEY
-)
-WITH (
-connector = 'kafka',
-properties.bootstrap.server = 'message_queue:29092',
-topic = 'upsert_student_key_not_subset_of_value_avro_json')
-ROW FORMAT UPSERT_AVRO
-row schema location confluent schema registry 'http://message_queue:8081'

statement ok
CREATE TABLE upsert_student_avro_json ()
@@ -46,7 +35,7 @@ row schema location confluent schema registry 'http://message_queue:8081'

statement ok
CREATE TABLE upsert_avro_json (
-id TEXT PRIMARY KEY
+PRIMARY KEY("ID")
)
WITH (
connector = 'kafka',
@@ -58,7 +47,7 @@ row schema location confluent schema registry 'http://message_queue:8081'
# Just ignore the kafka key; it works
statement ok
CREATE TABLE upsert_avro_json2 (
"ID" TEXT PRIMARY KEY
PRIMARY KEY("ID")
)
WITH (
connector = 'kafka',
@@ -68,15 +57,15 @@ ROW FORMAT UPSERT_AVRO
row schema location confluent schema registry 'http://message_queue:8081'

statement ok
-CREATE TABLE debezium_non_compact (order_id INT PRIMARY KEY) with (
+CREATE TABLE debezium_non_compact (PRIMARY KEY(order_id)) with (
connector = 'kafka',
kafka.topic = 'debezium_non_compact_avro_json',
kafka.brokers = 'message_queue:29092',
kafka.scan.startup.mode = 'earliest'
) ROW FORMAT DEBEZIUM_AVRO ROW SCHEMA LOCATION CONFLUENT SCHEMA REGISTRY 'http://message_queue:8081';

statement ok
-CREATE TABLE debezium_compact (order_id INT PRIMARY KEY) with (
+CREATE TABLE debezium_compact (PRIMARY KEY(order_id)) with (
connector = 'kafka',
kafka.topic = 'debezium_compact_avro_json',
kafka.brokers = 'message_queue:29092',
@@ -98,28 +87,10 @@ FROM
ORDER BY
"ID";
----
-id1 update id1 -1 6768 6970 value9 7172 info9 2021-05-18T07:59:58.714Z
-id2 delete id2 2 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z
-id3 delete id3 3 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z
-id5 delete id5 5 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z



-query II
-SELECT
-*
-FROM
-upsert_student_key_not_subset_of_value_avro_json
-ORDER BY
-"ID";
-----
-(1) 1 Ethan Martinez 18 6.1 180
-(2) 2 Emily Jackson 19 5.4 110
-(3) 3 Noah Thompson 21 6.3 195
-(4) 4 Emma Brown 20 5.3 130
-(5) 5 Michael Williams 22 6.2 190
-(6) 6 Leah Davis 18 5.7 140
-(9) 9 Jacob Anderson 20 5.8 155
+update id1 -1 6768 6970 value9 7172 info9 2021-05-18T07:59:58.714Z
+delete id2 2 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z
+delete id3 3 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z
+delete id5 5 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z
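The new expected rows above drop the leading column: with PRIMARY KEY("ID"), the key is no longer a separately declared user column duplicating the Avro "ID" field, so it disappears from SELECT *. A hedged sanity check, a sketch only — the expected values are read off the rows above, and the single-column result shape is an assumption:

query I
SELECT "ID" FROM upsert_avro_json ORDER BY "ID";
----
id1
id2
id3
id5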


query II
@@ -166,10 +137,6 @@ statement ok
DROP TABLE upsert_avro_json;


-statement ok
-DROP TABLE upsert_student_key_not_subset_of_value_avro_json;


statement ok
DROP TABLE upsert_avro_json2;

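The two Debezium integration tests below make the same switch to a constraint-only key. Their hunks are truncated by the diff view, so for reference here is the full statement shape, with everything after kafka.brokers assumed to match the debezium_non_compact test above rather than taken from the truncated files:

-- Hedged sketch: the scan mode and ROW FORMAT lines are assumptions
-- carried over from nosim_kafka.slt, not visible in the hunks below.
CREATE TABLE orders (PRIMARY KEY(order_id)) with (
    connector = 'kafka',
    kafka.topic = 'mysql.mydb.orders',
    kafka.brokers = 'message_queue:29092',
    kafka.scan.startup.mode = 'earliest'
) ROW FORMAT DEBEZIUM_AVRO ROW SCHEMA LOCATION CONFLUENT SCHEMA REGISTRY 'http://message_queue:8081';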
2 changes: 1 addition & 1 deletion integration_tests/debezium-mysql/create_source.sql
@@ -1,4 +1,4 @@
-CREATE TABLE orders (order_id INT PRIMARY KEY) with (
+CREATE TABLE orders (PRIMARY KEY(order_id)) with (
connector = 'kafka',
kafka.topic = 'mysql.mydb.orders',
kafka.brokers = 'message_queue:29092',
2 changes: 1 addition & 1 deletion integration_tests/debezium-postgres/create_source.sql
@@ -1,4 +1,4 @@
-CREATE TABLE orders (order_id INT PRIMARY KEY) with (
+CREATE TABLE orders (PRIMARY KEY(order_id)) with (
connector = 'kafka',
kafka.topic = 'postgres.public.orders',
kafka.brokers = 'message_queue:29092',
2 changes: 1 addition & 1 deletion integration_tests/upsert-avro/create_source.sql
@@ -1,6 +1,6 @@
CREATE TABLE items
(
-id text PRIMARY KEY
+PRIMARY KEY(id)
)
WITH (
connector='kafka',
@@ -3,11 +3,11 @@
select * from s
logical_plan: |
LogicalProject { exprs: [id, value] }
-└─LogicalSource { source: s, columns: [_rw_kafka_timestamp, _row_id, id, value], time_range: [(Unbounded, Unbounded)] }
+└─LogicalSource { source: s, columns: [id, value, _rw_kafka_timestamp, _row_id], time_range: [(Unbounded, Unbounded)] }
batch_plan: |
BatchExchange { order: [], dist: Single }
└─BatchProject { exprs: [id, value] }
-└─BatchSource { source: "s", columns: ["_rw_kafka_timestamp", "_row_id", "id", "value"], filter: (None, None) }
+└─BatchSource { source: "s", columns: ["id", "value", "_rw_kafka_timestamp", "_row_id"], filter: (None, None) }
create_source:
row_format: protobuf
name: s
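Beyond the key syntax, the binder now appends connector-injected hidden columns (_rw_kafka_timestamp, _row_id) after the user-declared columns instead of prepending them, which is what the plan changes above show. A minimal sketch of the observable behavior, assuming the elided definition of s declares columns id and value:

-- With this PR, s resolves to [id, value, _rw_kafka_timestamp, _row_id].
SELECT * FROM s;                    -- still projects only (id, value)
SELECT _rw_kafka_timestamp FROM s;  -- hidden columns can still be named explicitly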
@@ -383,7 +383,7 @@
logical_plan: |
LogicalProject { exprs: [s.v1, s.v2, s.v3] }
└─LogicalFilter { predicate: (s.v3 = Row(1:Int32, 2:Int32, Row(1:Int32, 2:Int32, 3:Int32))) }
-└─LogicalScan { table: s, columns: [s._row_id, s.v1, s.v2, s.v3] }
+└─LogicalScan { table: s, columns: [s.v1, s.v2, s.v3, s._row_id] }
create_table_with_connector:
row_format: protobuf
name: s
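The same reordering applies to tables created with a connector: s._row_id now trails the user columns in the scan. A hedged reconstruction of the query under test, with the predicate read off the LogicalFilter above and the definition of s assumed:

-- v3 is presumably a nested struct; the row literal mirrors
-- Row(1:Int32, 2:Int32, Row(1:Int32, 2:Int32, 3:Int32)) from the plan.
SELECT v1, v2, v3 FROM s WHERE v3 = ROW(1, 2, ROW(1, 2, 3));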