Skip to content

Commit

Permalink
refactor(binder): refine resolve source schema (#10195)
Browse files Browse the repository at this point in the history
  • Loading branch information
st1page authored Jun 19, 2023
1 parent eeb2a01 commit 3cea9d2
Show file tree
Hide file tree
Showing 8 changed files with 305 additions and 389 deletions.
49 changes: 8 additions & 41 deletions e2e_test/source/basic/nosim_kafka.slt
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,6 @@ WITH (
ROW FORMAT UPSERT_AVRO
row schema location confluent schema registry 'http://message_queue:8081'

# but if we let the key as a column, there's no such requirement
statement ok
CREATE TABLE upsert_student_key_not_subset_of_value_avro_json (
key_struct struct<"NOTEXIST" INT> PRIMARY KEY
)
WITH (
connector = 'kafka',
properties.bootstrap.server = 'message_queue:29092',
topic = 'upsert_student_key_not_subset_of_value_avro_json')
ROW FORMAT UPSERT_AVRO
row schema location confluent schema registry 'http://message_queue:8081'

statement ok
CREATE TABLE upsert_student_avro_json ()
Expand All @@ -46,7 +35,7 @@ row schema location confluent schema registry 'http://message_queue:8081'

statement ok
CREATE TABLE upsert_avro_json (
id TEXT PRIMARY KEY
PRIMARY KEY("ID")
)
WITH (
connector = 'kafka',
Expand All @@ -58,7 +47,7 @@ row schema location confluent schema registry 'http://message_queue:8081'
# Just ignore the kafka key, it works
statement ok
CREATE TABLE upsert_avro_json2 (
"ID" TEXT PRIMARY KEY
PRIMARY KEY("ID")
)
WITH (
connector = 'kafka',
Expand All @@ -68,15 +57,15 @@ ROW FORMAT UPSERT_AVRO
row schema location confluent schema registry 'http://message_queue:8081'

statement ok
CREATE TABLE debezium_non_compact (order_id INT PRIMARY KEY) with (
CREATE TABLE debezium_non_compact (PRIMARY KEY(order_id)) with (
connector = 'kafka',
kafka.topic = 'debezium_non_compact_avro_json',
kafka.brokers = 'message_queue:29092',
kafka.scan.startup.mode = 'earliest'
) ROW FORMAT DEBEZIUM_AVRO ROW SCHEMA LOCATION CONFLUENT SCHEMA REGISTRY 'http://message_queue:8081';

statement ok
CREATE TABLE debezium_compact (order_id INT PRIMARY KEY) with (
CREATE TABLE debezium_compact (PRIMARY KEY(order_id)) with (
connector = 'kafka',
kafka.topic = 'debezium_compact_avro_json',
kafka.brokers = 'message_queue:29092',
Expand All @@ -98,28 +87,10 @@ FROM
ORDER BY
"ID";
----
id1 update id1 -1 6768 6970 value9 7172 info9 2021-05-18T07:59:58.714Z
id2 delete id2 2 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z
id3 delete id3 3 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z
id5 delete id5 5 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z



query II
SELECT
*
FROM
upsert_student_key_not_subset_of_value_avro_json
ORDER BY
"ID";
----
(1) 1 Ethan Martinez 18 6.1 180
(2) 2 Emily Jackson 19 5.4 110
(3) 3 Noah Thompson 21 6.3 195
(4) 4 Emma Brown 20 5.3 130
(5) 5 Michael Williams 22 6.2 190
(6) 6 Leah Davis 18 5.7 140
(9) 9 Jacob Anderson 20 5.8 155
update id1 -1 6768 6970 value9 7172 info9 2021-05-18T07:59:58.714Z
delete id2 2 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z
delete id3 3 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z
delete id5 5 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z


query II
Expand Down Expand Up @@ -166,10 +137,6 @@ statement ok
DROP TABLE upsert_avro_json;


statement ok
DROP TABLE upsert_student_key_not_subset_of_value_avro_json;


statement ok
DROP TABLE upsert_avro_json2;

Expand Down
2 changes: 1 addition & 1 deletion integration_tests/debezium-mysql/create_source.sql
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
CREATE TABLE orders (order_id INT PRIMARY KEY) with (
CREATE TABLE orders (PRIMARY KEY(order_id)) with (
connector = 'kafka',
kafka.topic = 'mysql.mydb.orders',
kafka.brokers = 'message_queue:29092',
Expand Down
2 changes: 1 addition & 1 deletion integration_tests/debezium-postgres/create_source.sql
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
CREATE TABLE orders (order_id INT PRIMARY KEY) with (
CREATE TABLE orders (PRIMARY KEY(order_id)) with (
connector = 'kafka',
kafka.topic = 'postgres.public.orders',
kafka.brokers = 'message_queue:29092',
Expand Down
2 changes: 1 addition & 1 deletion integration_tests/upsert-avro/create_source.sql
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
CREATE TABLE items
(
id text PRIMARY KEY
PRIMARY KEY(id)
)
WITH (
connector='kafka',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@
select * from s
logical_plan: |
LogicalProject { exprs: [id, value] }
└─LogicalSource { source: s, columns: [_rw_kafka_timestamp, _row_id, id, value], time_range: [(Unbounded, Unbounded)] }
└─LogicalSource { source: s, columns: [id, value, _rw_kafka_timestamp, _row_id], time_range: [(Unbounded, Unbounded)] }
batch_plan: |
BatchExchange { order: [], dist: Single }
└─BatchProject { exprs: [id, value] }
└─BatchSource { source: "s", columns: ["_rw_kafka_timestamp", "_row_id", "id", "value"], filter: (None, None) }
└─BatchSource { source: "s", columns: ["id", "value", "_rw_kafka_timestamp", "_row_id"], filter: (None, None) }
create_source:
row_format: protobuf
name: s
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,7 @@
logical_plan: |
LogicalProject { exprs: [s.v1, s.v2, s.v3] }
└─LogicalFilter { predicate: (s.v3 = Row(1:Int32, 2:Int32, Row(1:Int32, 2:Int32, 3:Int32))) }
└─LogicalScan { table: s, columns: [s._row_id, s.v1, s.v2, s.v3] }
└─LogicalScan { table: s, columns: [s.v1, s.v2, s.v3, s._row_id] }
create_table_with_connector:
row_format: protobuf
name: s
Expand Down
Loading

0 comments on commit 3cea9d2

Please sign in to comment.