Skip to content

Commit

Permalink
refactor(sink): prune out hidden columns within sink executor (#10276)
Browse files Browse the repository at this point in the history
  • Loading branch information
xx01cyx authored Jun 15, 2023
1 parent d818a00 commit ea7f95b
Show file tree
Hide file tree
Showing 7 changed files with 130 additions and 96 deletions.
4 changes: 3 additions & 1 deletion proto/stream_plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -145,15 +145,17 @@ message SourceNode {
}

// Descriptor of a sink as carried in the stream plan.
//
// NOTE(review): this text comes from a rendered diff, so removed and added
// lines appear side by side. Field number 4 is both reserved and declared as
// `columns` below; presumably the `columns = 4` declaration is the removed
// side and `column_catalogs = 10` is its replacement. Confirm against the
// actual .proto file.
message SinkDesc {
// Field number 4 and the field name "columns" are reserved so that they
// cannot be reused by a later field.
reserved 4;
reserved "columns";
uint32 id = 1;
string name = 2;
string definition = 3;
// Column descriptors only (plan_common.ColumnDesc); conflicts with the
// reservation of field 4 above, see the review note on the message.
repeated plan_common.ColumnDesc columns = 4;
repeated common.ColumnOrder plan_pk = 5;
repeated uint32 downstream_pk = 6;
repeated uint32 distribution_key = 7;
map<string, string> properties = 8;
catalog.SinkType sink_type = 9;
// Full column catalogs (plan_common.ColumnCatalog) rather than bare column
// descriptors. Presumably this carries the per-column hidden flag so the sink
// executor can prune hidden columns itself -- TODO confirm.
repeated plan_common.ColumnCatalog column_catalogs = 10;
}

message SinkNode {
Expand Down
5 changes: 2 additions & 3 deletions src/connector/src/sink/catalog/desc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ use risingwave_common::catalog::{
ColumnCatalog, ConnectionId, DatabaseId, SchemaId, TableId, UserId,
};
use risingwave_common::util::sort_util::ColumnOrder;
use risingwave_pb::plan_common::PbColumnDesc;
use risingwave_pb::stream_plan::PbSinkDesc;

use super::{SinkCatalog, SinkId, SinkType};
Expand Down Expand Up @@ -89,10 +88,10 @@ impl SinkDesc {
id: self.id.sink_id,
name: self.name.clone(),
definition: self.definition.clone(),
columns: self
column_catalogs: self
.columns
.iter()
.map(|column| Into::<PbColumnDesc>::into(&column.column_desc))
.map(|column| column.to_protobuf())
.collect_vec(),
plan_pk: self.plan_pk.iter().map(|k| k.to_protobuf()).collect_vec(),
downstream_pk: self.downstream_pk.iter().map(|idx| *idx as _).collect_vec(),
Expand Down
54 changes: 24 additions & 30 deletions src/frontend/planner_test/tests/testdata/output/nexmark.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,9 @@
BatchExchange { order: [], dist: Single }
└─BatchScan { table: bid, columns: [bid.auction, bid.bidder, bid.price, bid.date_time], distribution: SomeShard }
sink_plan: |
StreamSink { type: append-only, columns: [auction, bidder, price, date_time] }
└─StreamProject { exprs: [bid.auction, bid.bidder, bid.price, bid.date_time] }
└─StreamExchange { dist: Single }
└─StreamTableScan { table: bid, columns: [bid.auction, bid.bidder, bid.price, bid.date_time, bid._row_id], pk: [bid._row_id], dist: UpstreamHashShard(bid._row_id) }
StreamSink { type: append-only, columns: [auction, bidder, price, date_time, bid._row_id(hidden)] }
└─StreamExchange { dist: Single }
└─StreamTableScan { table: bid, columns: [bid.auction, bid.bidder, bid.price, bid.date_time, bid._row_id], pk: [bid._row_id], dist: UpstreamHashShard(bid._row_id) }
stream_plan: |
StreamMaterialize { columns: [auction, bidder, price, date_time, bid._row_id(hidden)], stream_key: [bid._row_id], pk_columns: [bid._row_id], pk_conflict: "NoCheck" }
└─StreamTableScan { table: bid, columns: [bid.auction, bid.bidder, bid.price, bid.date_time, bid._row_id], pk: [bid._row_id], dist: UpstreamHashShard(bid._row_id) }
Expand Down Expand Up @@ -89,11 +88,10 @@
└─BatchProject { exprs: [bid.auction, bid.bidder, (0.908:Decimal * bid.price::Decimal) as $expr1, bid.date_time] }
└─BatchScan { table: bid, columns: [bid.auction, bid.bidder, bid.price, bid.date_time], distribution: SomeShard }
sink_plan: |
StreamSink { type: append-only, columns: [auction, bidder, price, date_time] }
└─StreamProject { exprs: [bid.auction, bid.bidder, $expr1, bid.date_time] }
└─StreamExchange { dist: Single }
└─StreamProject { exprs: [bid.auction, bid.bidder, (0.908:Decimal * bid.price::Decimal) as $expr1, bid.date_time, bid._row_id] }
└─StreamTableScan { table: bid, columns: [bid.auction, bid.bidder, bid.price, bid.date_time, bid._row_id], pk: [bid._row_id], dist: UpstreamHashShard(bid._row_id) }
StreamSink { type: append-only, columns: [auction, bidder, price, date_time, bid._row_id(hidden)] }
└─StreamExchange { dist: Single }
└─StreamProject { exprs: [bid.auction, bid.bidder, (0.908:Decimal * bid.price::Decimal) as $expr1, bid.date_time, bid._row_id] }
└─StreamTableScan { table: bid, columns: [bid.auction, bid.bidder, bid.price, bid.date_time, bid._row_id], pk: [bid._row_id], dist: UpstreamHashShard(bid._row_id) }
stream_plan: |
StreamMaterialize { columns: [auction, bidder, price, date_time, bid._row_id(hidden)], stream_key: [bid._row_id], pk_columns: [bid._row_id], pk_conflict: "NoCheck" }
└─StreamProject { exprs: [bid.auction, bid.bidder, (0.908:Decimal * bid.price::Decimal) as $expr1, bid.date_time, bid._row_id] }
Expand Down Expand Up @@ -132,11 +130,10 @@
└─BatchFilter { predicate: (((((bid.auction = 1007:Int32) OR (bid.auction = 1020:Int32)) OR (bid.auction = 2001:Int32)) OR (bid.auction = 2019:Int32)) OR (bid.auction = 2087:Int32)) }
└─BatchScan { table: bid, columns: [bid.auction, bid.price], distribution: SomeShard }
sink_plan: |
StreamSink { type: append-only, columns: [auction, price] }
└─StreamProject { exprs: [bid.auction, bid.price] }
└─StreamExchange { dist: Single }
└─StreamFilter { predicate: (((((bid.auction = 1007:Int32) OR (bid.auction = 1020:Int32)) OR (bid.auction = 2001:Int32)) OR (bid.auction = 2019:Int32)) OR (bid.auction = 2087:Int32)) }
└─StreamTableScan { table: bid, columns: [bid.auction, bid.price, bid._row_id], pk: [bid._row_id], dist: UpstreamHashShard(bid._row_id) }
StreamSink { type: append-only, columns: [auction, price, bid._row_id(hidden)] }
└─StreamExchange { dist: Single }
└─StreamFilter { predicate: (((((bid.auction = 1007:Int32) OR (bid.auction = 1020:Int32)) OR (bid.auction = 2001:Int32)) OR (bid.auction = 2019:Int32)) OR (bid.auction = 2087:Int32)) }
└─StreamTableScan { table: bid, columns: [bid.auction, bid.price, bid._row_id], pk: [bid._row_id], dist: UpstreamHashShard(bid._row_id) }
stream_plan: |
StreamMaterialize { columns: [auction, price, bid._row_id(hidden)], stream_key: [bid._row_id], pk_columns: [bid._row_id], pk_conflict: "NoCheck" }
└─StreamFilter { predicate: (((((bid.auction = 1007:Int32) OR (bid.auction = 1020:Int32)) OR (bid.auction = 2001:Int32)) OR (bid.auction = 2019:Int32)) OR (bid.auction = 2087:Int32)) }
Expand Down Expand Up @@ -820,11 +817,10 @@
└─BatchProject { exprs: [bid.auction, bid.bidder, bid.price, bid.date_time, ToChar(bid.date_time, 'YYYY-MM-DD':Varchar) as $expr1, ToChar(bid.date_time, 'HH:MI':Varchar) as $expr2] }
└─BatchScan { table: bid, columns: [bid.auction, bid.bidder, bid.price, bid.date_time], distribution: SomeShard }
sink_plan: |
StreamSink { type: append-only, columns: [auction, bidder, price, date_time, date, time] }
└─StreamProject { exprs: [bid.auction, bid.bidder, bid.price, bid.date_time, $expr1, $expr2] }
└─StreamExchange { dist: Single }
└─StreamProject { exprs: [bid.auction, bid.bidder, bid.price, bid.date_time, ToChar(bid.date_time, 'YYYY-MM-DD':Varchar) as $expr1, ToChar(bid.date_time, 'HH:MI':Varchar) as $expr2, bid._row_id] }
└─StreamTableScan { table: bid, columns: [bid.auction, bid.bidder, bid.price, bid.date_time, bid._row_id], pk: [bid._row_id], dist: UpstreamHashShard(bid._row_id) }
StreamSink { type: append-only, columns: [auction, bidder, price, date_time, date, time, bid._row_id(hidden)] }
└─StreamExchange { dist: Single }
└─StreamProject { exprs: [bid.auction, bid.bidder, bid.price, bid.date_time, ToChar(bid.date_time, 'YYYY-MM-DD':Varchar) as $expr1, ToChar(bid.date_time, 'HH:MI':Varchar) as $expr2, bid._row_id] }
└─StreamTableScan { table: bid, columns: [bid.auction, bid.bidder, bid.price, bid.date_time, bid._row_id], pk: [bid._row_id], dist: UpstreamHashShard(bid._row_id) }
stream_plan: |
StreamMaterialize { columns: [auction, bidder, price, date_time, date, time, bid._row_id(hidden)], stream_key: [bid._row_id], pk_columns: [bid._row_id], pk_conflict: "NoCheck" }
└─StreamProject { exprs: [bid.auction, bid.bidder, bid.price, bid.date_time, ToChar(bid.date_time, 'YYYY-MM-DD':Varchar) as $expr1, ToChar(bid.date_time, 'HH:MI':Varchar) as $expr2, bid._row_id] }
Expand Down Expand Up @@ -927,12 +923,11 @@
└─BatchFilter { predicate: ((0.908:Decimal * bid.price::Decimal) > 1000000:Decimal) AND ((0.908:Decimal * bid.price::Decimal) < 50000000:Decimal) }
└─BatchScan { table: bid, columns: [bid.auction, bid.bidder, bid.price, bid.date_time, bid.extra], distribution: SomeShard }
sink_plan: |
StreamSink { type: append-only, columns: [auction, bidder, price, bidtimetype, date_time, extra] }
└─StreamProject { exprs: [bid.auction, bid.bidder, $expr1, $expr2, bid.date_time, bid.extra] }
└─StreamExchange { dist: Single }
└─StreamProject { exprs: [bid.auction, bid.bidder, (0.908:Decimal * bid.price::Decimal) as $expr1, Case(((Extract('HOUR':Varchar, bid.date_time) >= 8:Decimal) AND (Extract('HOUR':Varchar, bid.date_time) <= 18:Decimal)), 'dayTime':Varchar, ((Extract('HOUR':Varchar, bid.date_time) <= 6:Decimal) OR (Extract('HOUR':Varchar, bid.date_time) >= 20:Decimal)), 'nightTime':Varchar, 'otherTime':Varchar) as $expr2, bid.date_time, bid.extra, bid._row_id] }
└─StreamFilter { predicate: ((0.908:Decimal * bid.price::Decimal) > 1000000:Decimal) AND ((0.908:Decimal * bid.price::Decimal) < 50000000:Decimal) }
└─StreamTableScan { table: bid, columns: [bid.auction, bid.bidder, bid.price, bid.date_time, bid.extra, bid._row_id], pk: [bid._row_id], dist: UpstreamHashShard(bid._row_id) }
StreamSink { type: append-only, columns: [auction, bidder, price, bidtimetype, date_time, extra, bid._row_id(hidden)] }
└─StreamExchange { dist: Single }
└─StreamProject { exprs: [bid.auction, bid.bidder, (0.908:Decimal * bid.price::Decimal) as $expr1, Case(((Extract('HOUR':Varchar, bid.date_time) >= 8:Decimal) AND (Extract('HOUR':Varchar, bid.date_time) <= 18:Decimal)), 'dayTime':Varchar, ((Extract('HOUR':Varchar, bid.date_time) <= 6:Decimal) OR (Extract('HOUR':Varchar, bid.date_time) >= 20:Decimal)), 'nightTime':Varchar, 'otherTime':Varchar) as $expr2, bid.date_time, bid.extra, bid._row_id] }
└─StreamFilter { predicate: ((0.908:Decimal * bid.price::Decimal) > 1000000:Decimal) AND ((0.908:Decimal * bid.price::Decimal) < 50000000:Decimal) }
└─StreamTableScan { table: bid, columns: [bid.auction, bid.bidder, bid.price, bid.date_time, bid.extra, bid._row_id], pk: [bid._row_id], dist: UpstreamHashShard(bid._row_id) }
stream_plan: |
StreamMaterialize { columns: [auction, bidder, price, bidtimetype, date_time, extra, bid._row_id(hidden)], stream_key: [bid._row_id], pk_columns: [bid._row_id], pk_conflict: "NoCheck" }
└─StreamProject { exprs: [bid.auction, bid.bidder, (0.908:Decimal * bid.price::Decimal) as $expr1, Case(((Extract('HOUR':Varchar, bid.date_time) >= 8:Decimal) AND (Extract('HOUR':Varchar, bid.date_time) <= 18:Decimal)), 'dayTime':Varchar, ((Extract('HOUR':Varchar, bid.date_time) <= 6:Decimal) OR (Extract('HOUR':Varchar, bid.date_time) >= 20:Decimal)), 'nightTime':Varchar, 'otherTime':Varchar) as $expr2, bid.date_time, bid.extra, bid._row_id] }
Expand Down Expand Up @@ -1385,11 +1380,10 @@
└─BatchProject { exprs: [bid.auction, bid.bidder, bid.price, bid.channel, SplitPart(bid.url, '/':Varchar, 4:Int32) as $expr1, SplitPart(bid.url, '/':Varchar, 5:Int32) as $expr2, SplitPart(bid.url, '/':Varchar, 6:Int32) as $expr3] }
└─BatchScan { table: bid, columns: [bid.auction, bid.bidder, bid.price, bid.channel, bid.url], distribution: SomeShard }
sink_plan: |
StreamSink { type: append-only, columns: [auction, bidder, price, channel, dir1, dir2, dir3] }
└─StreamProject { exprs: [bid.auction, bid.bidder, bid.price, bid.channel, $expr1, $expr2, $expr3] }
└─StreamExchange { dist: Single }
└─StreamProject { exprs: [bid.auction, bid.bidder, bid.price, bid.channel, SplitPart(bid.url, '/':Varchar, 4:Int32) as $expr1, SplitPart(bid.url, '/':Varchar, 5:Int32) as $expr2, SplitPart(bid.url, '/':Varchar, 6:Int32) as $expr3, bid._row_id] }
└─StreamTableScan { table: bid, columns: [bid.auction, bid.bidder, bid.price, bid.channel, bid.url, bid._row_id], pk: [bid._row_id], dist: UpstreamHashShard(bid._row_id) }
StreamSink { type: append-only, columns: [auction, bidder, price, channel, dir1, dir2, dir3, bid._row_id(hidden)] }
└─StreamExchange { dist: Single }
└─StreamProject { exprs: [bid.auction, bid.bidder, bid.price, bid.channel, SplitPart(bid.url, '/':Varchar, 4:Int32) as $expr1, SplitPart(bid.url, '/':Varchar, 5:Int32) as $expr2, SplitPart(bid.url, '/':Varchar, 6:Int32) as $expr3, bid._row_id] }
└─StreamTableScan { table: bid, columns: [bid.auction, bid.bidder, bid.price, bid.channel, bid.url, bid._row_id], pk: [bid._row_id], dist: UpstreamHashShard(bid._row_id) }
stream_plan: |
StreamMaterialize { columns: [auction, bidder, price, channel, dir1, dir2, dir3, bid._row_id(hidden)], stream_key: [bid._row_id], pk_columns: [bid._row_id], pk_conflict: "NoCheck" }
└─StreamProject { exprs: [bid.auction, bid.bidder, bid.price, bid.channel, SplitPart(bid.url, '/':Varchar, 4:Int32) as $expr1, SplitPart(bid.url, '/':Varchar, 5:Int32) as $expr2, SplitPart(bid.url, '/':Varchar, 6:Int32) as $expr3, bid._row_id] }
Expand Down
20 changes: 1 addition & 19 deletions src/frontend/src/optimizer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ use self::plan_visitor::InputRefValidator;
use self::property::RequiredDist;
use self::rule::*;
use crate::catalog::table_catalog::{TableType, TableVersion};
use crate::expr::InputRef;
use crate::optimizer::plan_node::stream::StreamPlanRef;
use crate::optimizer::plan_node::{
BatchExchange, PlanNodeType, PlanTreeNode, RewriteExprsRecursive,
Expand Down Expand Up @@ -523,24 +522,7 @@ impl PlanRoot {
definition: String,
properties: WithOptions,
) -> Result<StreamSink> {
let mut stream_plan = self.gen_optimized_stream_plan(false)?;

// Add a project node if there is hidden column(s).
let input_fields = stream_plan.schema().fields();
if input_fields.len() != self.out_fields.count_ones(..) {
let exprs = input_fields
.iter()
.enumerate()
.filter_map(|(idx, field)| {
if self.out_fields.contains(idx) {
Some(InputRef::new(idx, field.data_type.clone()).into())
} else {
None
}
})
.collect_vec();
stream_plan = StreamProject::new(generic::Project::new(exprs, stream_plan)).into();
}
let stream_plan = self.gen_optimized_stream_plan(false)?;

StreamSink::create(
stream_plan,
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/optimizer/plan_node/stream_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ impl fmt::Display for StreamSink {
.sink_desc
.columns
.iter()
.map(|col| col.column_desc.name.clone())
.map(|col| col.name_with_hidden())
.collect_vec()
.join(", ");
builder
Expand Down
Loading

0 comments on commit ea7f95b

Please sign in to comment.