feat: use order key as mv's dist key #20176

Merged: 6 commits, Jan 17, 2025. Changes shown from 3 commits.
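All of the plan diffs below stem from one planner change: a materialized view is now distributed by the hash of its order key, so a downstream batch scan reports `UpstreamHashShard(order_key)` instead of `SomeShard`, and the planner can drop a `BatchExchange`/`StreamExchange` whenever the required distribution already matches. A minimal sketch of that "current distribution satisfies the requirement" idea (the names and semantics here are illustrative only, not RisingWave internals):

```python
# Illustrative sketch, not RisingWave code: rows hash-partitioned by a key
# already satisfy any downstream operator that requires hash distribution
# on that same key, so no re-shuffle (exchange) is needed.
def hash_shard(rows, key, num_shards):
    shards = [[] for _ in range(num_shards)]
    for row in rows:
        shards[hash(row[key]) % num_shards].append(row)
    return shards

rows = [{"v1": i % 4, "v2": i} for i in range(16)]
shards = hash_shard(rows, "v1", num_shards=4)

# Every group key lands in exactly one shard, so a per-shard aggregation
# with group_key v1 (e.g. BatchSortAgg) is already globally correct.
for v1 in {r["v1"] for r in rows}:
    holders = [i for i, s in enumerate(shards) if any(r["v1"] == v1 for r in s)]
    assert len(holders) == 1
```

This is why the `agg.yaml` hunks just below lose an exchange: the scan's `UpstreamHashShard(mv.v1)` already provides the distribution the aggregation asks for.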
6 changes: 2 additions & 4 deletions src/frontend/planner_test/tests/testdata/output/agg.yaml
@@ -312,8 +312,7 @@
batch_plan: |-
BatchExchange { order: [], dist: Single }
└─BatchSortAgg { group_key: [mv.v1], aggs: [max(mv.v2)] }
└─BatchExchange { order: [mv.v1 DESC], dist: HashShard(mv.v1) }
└─BatchScan { table: mv, columns: [mv.v1, mv.v2], distribution: SomeShard }
└─BatchScan { table: mv, columns: [mv.v1, mv.v2], distribution: UpstreamHashShard(mv.v1) }
- sql: |
create table t(v1 int, v2 int);
select v1, max(v2) from t group by v1 order by v1 desc;
@@ -367,8 +366,7 @@
batch_plan: |-
BatchExchange { order: [], dist: Single }
└─BatchHashAgg { group_key: [mv.v1], aggs: [max(mv.v2)] }
└─BatchExchange { order: [], dist: HashShard(mv.v1) }
└─BatchScan { table: mv, columns: [mv.v1, mv.v2], distribution: SomeShard }
└─BatchScan { table: mv, columns: [mv.v1, mv.v2], distribution: UpstreamHashShard(mv.v1) }
with_config_map:
RW_BATCH_ENABLE_SORT_AGG: 'false'
- name: Not use BatchSortAgg, when output requires order
@@ -25,9 +25,10 @@
select v1 from t1 order by v1 limit 3 offset 3;
stream_plan: |-
StreamMaterialize { columns: [v1, t1._row_id(hidden)], stream_key: [t1._row_id], pk_columns: [v1, t1._row_id], pk_conflict: NoCheck }
└─StreamTopN [append_only] { order: [t1.v1 ASC], limit: 3, offset: 3 }
└─StreamExchange { dist: Single }
└─StreamTableScan { table: t1, columns: [t1.v1, t1._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t1._row_id], pk: [_row_id], dist: UpstreamHashShard(t1._row_id) }
└─StreamExchange { dist: HashShard(t1.v1) }
└─StreamTopN [append_only] { order: [t1.v1 ASC], limit: 3, offset: 3 }
└─StreamExchange { dist: Single }
└─StreamTableScan { table: t1, columns: [t1.v1, t1._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t1._row_id], pk: [_row_id], dist: UpstreamHashShard(t1._row_id) }
Comment on lines +28 to +31

Contributor Author:
I'm not sure whether we should add special behavior for the ORDER BY clause with LIMIT... It's probably fine, since not many people write a TopN streaming job and expect much from its performance.

Contributor:
+1

- sql: |
create table t1 (v1 int, v2 int) append only;
select max(v1) as max_v1 from t1;
@@ -11,9 +11,10 @@
sql: |
select max(v) as a1 from S;
batch_plan: |-
BatchExchange { order: [], dist: Single }
└─BatchSimpleAgg { aggs: [max(s.v)] }
└─BatchScan { table: s, columns: [s.v], distribution: Single }
BatchSimpleAgg { aggs: [max(max(s.v))] }
└─BatchExchange { order: [], dist: Single }
└─BatchSimpleAgg { aggs: [max(s.v)] }
└─BatchScan { table: s, columns: [s.v], distribution: SomeShard }
batch_local_plan: |-
BatchSimpleAgg { aggs: [max(s.v)] }
└─BatchExchange { order: [], dist: Single }
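With the scan now reporting `SomeShard` rather than `Single`, the planner switches to a two-phase aggregation, which is what `max(max(s.v))` in the plan above denotes: a partial max per shard, then a global max over the partials on a single node. The rewrite is valid because max is decomposable over any partitioning of the input. A sketch:

```python
# Two-phase max, as in the plan's max(max(s.v)): each shard computes a
# partial max (per-shard BatchSimpleAgg), then a single node combines the
# partials (BatchSimpleAgg after a Single exchange). Correct because
# max(S) == max(max(S1), ..., max(Sn)) for any partition S1..Sn of S.
shards = [[3, 1, 4], [1, 5], [9, 2, 6]]
partials = [max(s) for s in shards]   # one partial per shard
global_max = max(partials)            # combined on a single node
assert global_max == max(x for s in shards for x in s)  # == 9
```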
@@ -160,7 +161,7 @@
└─BatchProject { exprs: [max(s.v)] }
└─BatchHashAgg { group_key: [s.k], aggs: [max(s.v)] }
└─BatchExchange { order: [], dist: HashShard(s.k) }
└─BatchScan { table: s, columns: [s.k, s.v], distribution: Single }
└─BatchScan { table: s, columns: [s.k, s.v], distribution: SomeShard }
batch_local_plan: |-
BatchProject { exprs: [max(s.v)] }
└─BatchHashAgg { group_key: [s.k], aggs: [max(s.v)] }
563 changes: 295 additions & 268 deletions src/frontend/planner_test/tests/testdata/output/ch_benchmark.yaml

Large diffs are not rendered by default.

@@ -14,12 +14,13 @@
└─BatchScan { table: t1, columns: [t1.id, t1.i], limit: 2, distribution: UpstreamHashShard(t1.id) }
stream_plan: |-
StreamMaterialize { columns: [id, i], stream_key: [id], pk_columns: [id], pk_conflict: NoCheck }
└─StreamProject { exprs: [t1.id, t1.i] }
└─StreamTopN { order: [t1.id ASC], limit: 2, offset: 0 }
└─StreamExchange { dist: Single }
└─StreamGroupTopN { order: [t1.id ASC], limit: 2, offset: 0, group_key: [_vnode] }
└─StreamProject { exprs: [t1.id, t1.i, Vnode(t1.id) as _vnode] }
└─StreamTableScan { table: t1, columns: [t1.id, t1.i], stream_scan_type: ArrangementBackfill, stream_key: [t1.id], pk: [id], dist: UpstreamHashShard(t1.id) }
└─StreamExchange { dist: HashShard(t1.id) }
└─StreamProject { exprs: [t1.id, t1.i] }
└─StreamTopN { order: [t1.id ASC], limit: 2, offset: 0 }
└─StreamExchange { dist: Single }
└─StreamGroupTopN { order: [t1.id ASC], limit: 2, offset: 0, group_key: [_vnode] }
└─StreamProject { exprs: [t1.id, t1.i, Vnode(t1.id) as _vnode] }
└─StreamTableScan { table: t1, columns: [t1.id, t1.i], stream_scan_type: ArrangementBackfill, stream_key: [t1.id], pk: [id], dist: UpstreamHashShard(t1.id) }
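The plan above computes TopN in two levels: a `StreamGroupTopN` keeps a per-vnode top-k (grouped by `Vnode(...)`), and a single-parallelism `StreamTopN` merges the candidates. This is safe because the global top-k rows are always contained in the union of the per-partition top-k sets. A sketch of the idea, with plain lists standing in for vnode groups:

```python
# Two-level TopN as in StreamGroupTopN -> StreamTopN: each partition keeps
# a local top-k, then a single node merges the candidate sets. Correct
# because the global top-k is a subset of the union of per-partition top-ks.
import heapq

partitions = [[7, 3, 9], [1, 8], [5, 2, 6]]
k = 2
local = [heapq.nsmallest(k, p) for p in partitions]  # per-vnode GroupTopN
candidates = [x for top in local for x in top]
global_topk = heapq.nsmallest(k, candidates)         # single-node TopN
assert global_topk == sorted(x for p in partitions for x in p)[:k]  # [1, 2]
```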
- name: test functional dependency for order key pruning (order by - suffix fd)
sql: |
create table t1 (id int primary key, i int);
@@ -35,12 +36,13 @@
└─BatchScan { table: t1, columns: [t1.id, t1.i], distribution: UpstreamHashShard(t1.id) }
stream_plan: |-
StreamMaterialize { columns: [id, i], stream_key: [id], pk_columns: [i, id], pk_conflict: NoCheck }
└─StreamProject { exprs: [t1.id, t1.i] }
└─StreamTopN { order: [t1.i ASC, t1.id ASC], limit: 2, offset: 0 }
└─StreamExchange { dist: Single }
└─StreamGroupTopN { order: [t1.i ASC, t1.id ASC], limit: 2, offset: 0, group_key: [_vnode] }
└─StreamProject { exprs: [t1.id, t1.i, Vnode(t1.id) as _vnode] }
└─StreamTableScan { table: t1, columns: [t1.id, t1.i], stream_scan_type: ArrangementBackfill, stream_key: [t1.id], pk: [id], dist: UpstreamHashShard(t1.id) }
└─StreamExchange { dist: HashShard(t1.i, t1.id) }
└─StreamProject { exprs: [t1.id, t1.i] }
└─StreamTopN { order: [t1.i ASC, t1.id ASC], limit: 2, offset: 0 }
└─StreamExchange { dist: Single }
└─StreamGroupTopN { order: [t1.i ASC, t1.id ASC], limit: 2, offset: 0, group_key: [_vnode] }
└─StreamProject { exprs: [t1.id, t1.i, Vnode(t1.id) as _vnode] }
└─StreamTableScan { table: t1, columns: [t1.id, t1.i], stream_scan_type: ArrangementBackfill, stream_key: [t1.id], pk: [id], dist: UpstreamHashShard(t1.id) }
- name: test functional dependency for order key pruning on singleton
sql: |
create table t1 (id int primary key, i int);
@@ -54,8 +56,9 @@
└─BatchSort { order: [v.cnt ASC] }
└─BatchScan { table: v, columns: [v.cnt], distribution: Single }
stream_plan: |-
StreamMaterialize { columns: [cnt], stream_key: [], pk_columns: [], pk_conflict: NoCheck }
└─StreamTableScan { table: v, columns: [v.cnt], stream_scan_type: ArrangementBackfill, stream_key: [], pk: [], dist: Single }
StreamMaterialize { columns: [cnt], stream_key: [], pk_columns: [cnt], pk_conflict: NoCheck }
└─StreamExchange { dist: HashShard(v.cnt) }
└─StreamTableScan { table: v, columns: [v.cnt], stream_scan_type: ArrangementBackfill, stream_key: [], pk: [], dist: Single }
- name: test functional dependency for order key pruning (index)
sql: |
create table t1 (v1 int, v2 int);
@@ -616,12 +616,13 @@
select * from t1 order by a limit 1;
stream_plan: |-
StreamMaterialize { columns: [a], stream_key: [], pk_columns: [a], pk_conflict: NoCheck }
└─StreamProject { exprs: [t1.a] }
└─StreamTopN { order: [t1.a ASC], limit: 1, offset: 0 }
└─StreamExchange { dist: Single }
└─StreamGroupTopN { order: [t1.a ASC], limit: 1, offset: 0, group_key: [_vnode] }
└─StreamProject { exprs: [t1.a, Vnode(t1.a) as _vnode] }
└─StreamTableScan { table: t1, columns: [t1.a], stream_scan_type: ArrangementBackfill, stream_key: [t1.a], pk: [a], dist: UpstreamHashShard(t1.a) }
└─StreamExchange { dist: HashShard(t1.a) }
└─StreamProject { exprs: [t1.a] }
└─StreamTopN { order: [t1.a ASC], limit: 1, offset: 0 }
└─StreamExchange { dist: Single }
└─StreamGroupTopN { order: [t1.a ASC], limit: 1, offset: 0, group_key: [_vnode] }
└─StreamProject { exprs: [t1.a, Vnode(t1.a) as _vnode] }
└─StreamTableScan { table: t1, columns: [t1.a], stream_scan_type: ArrangementBackfill, stream_key: [t1.a], pk: [a], dist: UpstreamHashShard(t1.a) }
- sql: |
create table t1 (a varchar, b int, c int, d int);
create index idx on t1(a);
2 changes: 1 addition & 1 deletion src/frontend/planner_test/tests/testdata/output/join.yaml
@@ -530,7 +530,7 @@
└─BatchScan { table: b, columns: [b.x], distribution: SomeShard }
stream_plan: |-
StreamMaterialize { columns: [y, z, $expr2(hidden), a._row_id(hidden), b._row_id(hidden), a.x(hidden), b.x(hidden)], stream_key: [a._row_id, b._row_id, a.x, b.x], pk_columns: [$expr2, a._row_id, b._row_id, a.x, b.x], pk_conflict: NoCheck }
└─StreamExchange { dist: HashShard(a._row_id, b._row_id, a.x, b.x) }
└─StreamExchange { dist: HashShard($expr2) }
└─StreamProject { exprs: [(2:Int32 * $expr1) as $expr3, $expr2, $expr2, a._row_id, b._row_id, a.x, b.x] }
└─StreamProject { exprs: [a.x, b.x, $expr1, ($expr1 + $expr1) as $expr2, a._row_id, b._row_id] }
└─StreamProject { exprs: [a.x, b.x, Coalesce(a.x, b.x) as $expr1, a._row_id, b._row_id] }
10 changes: 6 additions & 4 deletions src/frontend/planner_test/tests/testdata/output/limit.yaml
@@ -99,8 +99,9 @@
└─BatchValues { rows: [[1:Int32]] }
stream_plan: |-
StreamMaterialize { columns: [c, _row_id(hidden)], stream_key: [], pk_columns: [c], pk_conflict: NoCheck }
└─StreamTopN [append_only] { order: [1:Int32 ASC], limit: 1, offset: 0 }
└─StreamValues { rows: [[1:Int32, 0:Int64]] }
└─StreamExchange { dist: HashShard(1:Int32) }
└─StreamTopN [append_only] { order: [1:Int32 ASC], limit: 1, offset: 0 }
└─StreamValues { rows: [[1:Int32, 0:Int64]] }
- sql: |
select 1 c union all select 1 c limit 10
batch_plan: |-
@@ -117,8 +118,9 @@
└─BatchValues { rows: [[1:Int32], [1:Int32]] }
stream_plan: |-
StreamMaterialize { columns: [c, _row_id(hidden)], stream_key: [_row_id], pk_columns: [c, _row_id], pk_conflict: NoCheck }
└─StreamTopN [append_only] { order: [1:Int32 ASC], limit: 10, offset: 0 }
└─StreamValues { rows: [[1:Int32, 0:Int64], [1:Int32, 1:Int64]] }
└─StreamExchange { dist: HashShard(1:Int32) }
└─StreamTopN [append_only] { order: [1:Int32 ASC], limit: 10, offset: 0 }
└─StreamValues { rows: [[1:Int32, 0:Int64], [1:Int32, 1:Int64]] }
- sql: |
create table t (a int);
select count(*) from t limit 1;
42 changes: 23 additions & 19 deletions src/frontend/planner_test/tests/testdata/output/nexmark.yaml
@@ -2271,42 +2271,46 @@
└─BatchScan { table: bid, columns: [bid.auction], distribution: SomeShard }
stream_plan: |-
StreamMaterialize { columns: [auction_id, auction_item_name, bid_count], stream_key: [auction_id], pk_columns: [bid_count, auction_id], pk_conflict: NoCheck }
└─StreamProject { exprs: [auction.id, internal_last_seen_value(auction.item_name), count(bid.auction)] }
└─StreamTopN { order: [count(bid.auction) DESC], limit: 1000, offset: 0 }
└─StreamExchange { dist: Single }
└─StreamGroupTopN { order: [count(bid.auction) DESC], limit: 1000, offset: 0, group_key: [$expr1] }
└─StreamProject { exprs: [auction.id, internal_last_seen_value(auction.item_name), count(bid.auction), Vnode(auction.id) as $expr1] }
└─StreamHashAgg { group_key: [auction.id], aggs: [internal_last_seen_value(auction.item_name), count(bid.auction), count] }
└─StreamHashJoin { type: Inner, predicate: auction.id = bid.auction, output: all }
├─StreamExchange { dist: HashShard(auction.id) }
│ └─StreamTableScan { table: auction, columns: [auction.id, auction.item_name], stream_scan_type: ArrangementBackfill, stream_key: [auction.id], pk: [id], dist: UpstreamHashShard(auction.id) }
└─StreamExchange { dist: HashShard(bid.auction) }
└─StreamTableScan { table: bid, columns: [bid.auction, bid._row_id], stream_scan_type: ArrangementBackfill, stream_key: [bid._row_id], pk: [_row_id], dist: UpstreamHashShard(bid._row_id) }
└─StreamExchange { dist: HashShard(count(bid.auction)) }
└─StreamProject { exprs: [auction.id, internal_last_seen_value(auction.item_name), count(bid.auction)] }
└─StreamTopN { order: [count(bid.auction) DESC], limit: 1000, offset: 0 }
└─StreamExchange { dist: Single }
└─StreamGroupTopN { order: [count(bid.auction) DESC], limit: 1000, offset: 0, group_key: [$expr1] }
└─StreamProject { exprs: [auction.id, internal_last_seen_value(auction.item_name), count(bid.auction), Vnode(auction.id) as $expr1] }
└─StreamHashAgg { group_key: [auction.id], aggs: [internal_last_seen_value(auction.item_name), count(bid.auction), count] }
└─StreamHashJoin { type: Inner, predicate: auction.id = bid.auction, output: all }
├─StreamExchange { dist: HashShard(auction.id) }
│ └─StreamTableScan { table: auction, columns: [auction.id, auction.item_name], stream_scan_type: ArrangementBackfill, stream_key: [auction.id], pk: [id], dist: UpstreamHashShard(auction.id) }
└─StreamExchange { dist: HashShard(bid.auction) }
└─StreamTableScan { table: bid, columns: [bid.auction, bid._row_id], stream_scan_type: ArrangementBackfill, stream_key: [bid._row_id], pk: [_row_id], dist: UpstreamHashShard(bid._row_id) }
stream_dist_plan: |+
Fragment 0
StreamMaterialize { columns: [auction_id, auction_item_name, bid_count], stream_key: [auction_id], pk_columns: [bid_count, auction_id], pk_conflict: NoCheck }
├── tables: [ Materialize: 4294967294 ]
└── StreamProject { exprs: [auction.id, internal_last_seen_value(auction.item_name), count(bid.auction)] }
└── StreamTopN { order: [count(bid.auction) DESC], limit: 1000, offset: 0 } { tables: [ TopN: 0 ] }
└── StreamExchange Single from 1
└── StreamExchange Hash([2]) from 1

Fragment 1
StreamProject { exprs: [auction.id, internal_last_seen_value(auction.item_name), count(bid.auction)] }
└── StreamTopN { order: [count(bid.auction) DESC], limit: 1000, offset: 0 } { tables: [ TopN: 0 ] }
└── StreamExchange Single from 2

Fragment 2
StreamGroupTopN { order: [count(bid.auction) DESC], limit: 1000, offset: 0, group_key: [$expr1] } { tables: [ GroupTopN: 1 ] }
└── StreamProject { exprs: [auction.id, internal_last_seen_value(auction.item_name), count(bid.auction), Vnode(auction.id) as $expr1] }
└── StreamHashAgg { group_key: [auction.id], aggs: [internal_last_seen_value(auction.item_name), count(bid.auction), count] }
├── tables: [ HashAggState: 2 ]
└── StreamHashJoin { type: Inner, predicate: auction.id = bid.auction, output: all }
├── tables: [ HashJoinLeft: 3, HashJoinDegreeLeft: 4, HashJoinRight: 5, HashJoinDegreeRight: 6 ]
├── StreamExchange Hash([0]) from 2
└── StreamExchange Hash([0]) from 3
├── StreamExchange Hash([0]) from 3
└── StreamExchange Hash([0]) from 4

Fragment 2
Fragment 3
StreamTableScan { table: auction, columns: [auction.id, auction.item_name], stream_scan_type: ArrangementBackfill, stream_key: [auction.id], pk: [id], dist: UpstreamHashShard(auction.id) }
├── tables: [ StreamScan: 7 ]
├── Upstream
└── BatchPlanNode

Fragment 3
Fragment 4
StreamTableScan { table: bid, columns: [bid.auction, bid._row_id], stream_scan_type: ArrangementBackfill, stream_key: [bid._row_id], pk: [_row_id], dist: UpstreamHashShard(bid._row_id) }
├── tables: [ StreamScan: 8 ]
├── Upstream
@@ -2362,7 +2366,7 @@
├── columns: [ auction_id, auction_item_name, bid_count, _rw_timestamp ]
├── primary key: [ $2 DESC, $0 ASC ]
├── value indices: [ 0, 1, 2 ]
├── distribution key: []
├── distribution key: [ 2 ]
└── read pk prefix len hint: 2

- id: nexmark_q106