Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(planner): support union/union all #7160

Merged
merged 15 commits into from
Aug 19, 2022
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions src/common/pipeline/sinks/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,5 @@ common-exception = { path = "../../exception" }
common-pipeline-core = { path = "../core" }

async-trait = { git = "https://github.com/datafuse-extras/async-trait", rev = "f0b0fd5" }

async-channel = "1.7.1"
1 change: 1 addition & 0 deletions src/common/pipeline/sinks/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,6 @@

#![feature(generic_associated_types)]
#![deny(unused_crate_dependencies)]
#![feature(type_alias_impl_trait)]

pub mod processors;
2 changes: 2 additions & 0 deletions src/common/pipeline/sinks/src/processors/sinks/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ mod empty_sink;
mod subquery_receive_sink;
mod sync_sink;
mod sync_sink_sender;
mod union_receive_sink;

pub use async_sink::AsyncSink;
pub use async_sink::AsyncSinker;
Expand All @@ -27,3 +28,4 @@ pub use subquery_receive_sink::SubqueryReceiveSink;
pub use sync_sink::Sink;
pub use sync_sink::Sinker;
pub use sync_sink_sender::SyncSenderSink;
pub use union_receive_sink::UnionReceiveSink;
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// Copyright 2022 Datafuse Labs.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use async_channel::Sender;
use async_trait::async_trait;
use async_trait::unboxed_simple;
use common_datablocks::DataBlock;
use common_exception::ErrorCode;
use common_exception::Result;
use common_pipeline_core::processors::port::InputPort;
use common_pipeline_core::processors::processor::ProcessorPtr;

use crate::processors::sinks::AsyncSink;
use crate::processors::sinks::AsyncSinker;

pub struct UnionReceiveSink {
sender: Option<Sender<DataBlock>>,
}

impl UnionReceiveSink {
pub fn create(sender: Option<Sender<DataBlock>>, input: Arc<InputPort>) -> ProcessorPtr {
AsyncSinker::create(input, UnionReceiveSink { sender })
}
}

#[async_trait]
impl AsyncSink for UnionReceiveSink {
const NAME: &'static str = "UnionReceiveSink";

async fn on_finish(&mut self) -> Result<()> {
drop(self.sender.take());
Ok(())
}

#[unboxed_simple]
async fn consume(&mut self, data_block: DataBlock) -> Result<()> {
if let Some(sender) = self.sender.as_ref() {
if sender.send(data_block).await.is_err() {
return Err(ErrorCode::UnexpectedError("UnionReceiveSink sender failed"));
};
}
Ok(())
}
}
2 changes: 2 additions & 0 deletions src/query/service/src/pipelines/processors/transforms/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ mod transform_rename;
mod transform_window_func;

pub mod group_by;
mod transform_merge_block;

pub use aggregator::AggregatorParams;
pub use aggregator::AggregatorTransformParams;
Expand Down Expand Up @@ -75,6 +76,7 @@ pub use transform_limit::TransformLimit;
pub use transform_limit_by::TransformLimitBy;
pub use transform_mark_join::MarkJoinCompactor;
pub use transform_mark_join::TransformMarkJoin;
pub use transform_merge_block::TransformMergeBlock;
pub use transform_project::TransformProject;
pub use transform_rename::TransformRename;
pub use transform_sort_merge::SortMergeCompactor;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
// Copyright 2022 Datafuse Labs.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::any::Any;
use std::sync::Arc;

use async_channel::Receiver;
use common_datablocks::DataBlock;
use common_datavalues::DataSchemaRef;
use common_exception::Result;
use common_pipeline_core::processors::port::InputPort;
use common_pipeline_core::processors::port::OutputPort;
use common_pipeline_core::processors::processor::Event;
use common_pipeline_core::processors::processor::ProcessorPtr;
use common_pipeline_core::processors::Processor;

pub struct TransformMergeBlock {
finished: bool,
input: Arc<InputPort>,
output: Arc<OutputPort>,

input_data: Option<DataBlock>,
output_data: Option<DataBlock>,
schema: DataSchemaRef,

receiver: Receiver<DataBlock>,
receiver_result: Option<DataBlock>,
}

impl TransformMergeBlock {
pub fn try_create(
input: Arc<InputPort>,
output: Arc<OutputPort>,
schema: DataSchemaRef,
receiver: Receiver<DataBlock>,
) -> Result<ProcessorPtr> {
Ok(ProcessorPtr::create(Box::new(TransformMergeBlock {
finished: false,
input,
output,
input_data: None,
output_data: None,
schema,
receiver,
receiver_result: None,
})))
}
}

#[async_trait::async_trait]
impl Processor for TransformMergeBlock {
fn name(&self) -> &'static str {
"TransformMergeBlock"
}

fn as_any(&mut self) -> &mut dyn Any {
self
}

fn event(&mut self) -> Result<Event> {
if self.output.is_finished() {
self.input.finish();
return Ok(Event::Finished);
}

if !self.output.can_push() {
self.input.set_not_need_data();
return Ok(Event::NeedConsume);
}

if let Some(output_data) = self.output_data.take() {
self.output.push_data(Ok(output_data));
return Ok(Event::NeedConsume);
}

if self.input_data.is_some() || self.receiver_result.is_some() {
return Ok(Event::Sync);
}

xudong963 marked this conversation as resolved.
Show resolved Hide resolved
if let Ok(result) = self.receiver.try_recv() {
self.receiver_result = Some(result);
return Ok(Event::Sync);
}

if self.input.is_finished() {
if !self.finished {
return Ok(Event::Async);
}
self.output.finish();
xudong963 marked this conversation as resolved.
Show resolved Hide resolved
return Ok(Event::Finished);
}

if self.input.has_data() {
self.input_data = Some(self.input.pull_data().unwrap()?);
return Ok(Event::Sync);
}

self.input.set_need_data();
Ok(Event::NeedData)
}

fn process(&mut self) -> Result<()> {
if let Some(input_data) = self.input_data.take() {
if let Some(receiver_result) = self.receiver_result.take() {
let data_block =
DataBlock::create(self.schema.clone(), receiver_result.columns().to_vec());
self.output_data = Some(DataBlock::concat_blocks(&[input_data, data_block])?);
} else {
self.output_data = Some(input_data);
}
} else if let Some(receiver_result) = self.receiver_result.take() {
let data_block =
DataBlock::create(self.schema.clone(), receiver_result.columns().to_vec());
self.output_data = Some(data_block);
}

Ok(())
}

async fn async_process(&mut self) -> Result<()> {
xudong963 marked this conversation as resolved.
Show resolved Hide resolved
if !self.finished {
if let Ok(result) = self.receiver.recv().await {
self.receiver_result = Some(result);
return Ok(());
}
self.finished = true;
}
Ok(())
}
}
18 changes: 18 additions & 0 deletions src/query/service/src/sql/executor/physical_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,19 @@ impl ExchangeSink {
}
}

#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
pub struct Union {
xudong963 marked this conversation as resolved.
Show resolved Hide resolved
pub left: Box<PhysicalPlan>,
pub right: Box<PhysicalPlan>,
pub schema: DataSchemaRef,
}

impl Union {
pub fn output_schema(&self) -> Result<DataSchemaRef> {
Ok(self.schema.clone())
}
}

#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
pub enum PhysicalPlan {
TableScan(TableScan),
Expand All @@ -293,6 +306,7 @@ pub enum PhysicalPlan {
Limit(Limit),
HashJoin(HashJoin),
Exchange(Exchange),
Union(Union),

/// Synthesized by fragmenter
ExchangeSource(ExchangeSource),
Expand Down Expand Up @@ -322,6 +336,7 @@ impl PhysicalPlan {
PhysicalPlan::Exchange(plan) => plan.output_schema(),
PhysicalPlan::ExchangeSource(plan) => plan.output_schema(),
PhysicalPlan::ExchangeSink(plan) => plan.output_schema(),
PhysicalPlan::Union(plan) => plan.output_schema(),
}
}

Expand All @@ -341,6 +356,9 @@ impl PhysicalPlan {
PhysicalPlan::Exchange(plan) => Box::new(std::iter::once(plan.input.as_ref())),
PhysicalPlan::ExchangeSource(_) => Box::new(std::iter::empty()),
PhysicalPlan::ExchangeSink(plan) => Box::new(std::iter::once(plan.input.as_ref())),
PhysicalPlan::Union(plan) => Box::new(
std::iter::once(plan.left.as_ref()).chain(std::iter::once(plan.right.as_ref())),
),
}
}
}
11 changes: 10 additions & 1 deletion src/query/service/src/sql/executor/physical_plan_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ use crate::sql::executor::ExpressionBuilderWithoutRenaming;
use crate::sql::executor::PhysicalPlan;
use crate::sql::executor::PhysicalScalar;
use crate::sql::executor::SortDesc;
use crate::sql::executor::Union;
use crate::sql::optimizer::SExpr;
use crate::sql::plans::AggregateMode;
use crate::sql::plans::Exchange;
Expand Down Expand Up @@ -323,7 +324,15 @@ impl PhysicalPlanBuilder {
keys,
}))
}

RelOperator::Union(_) => {
let left = self.build(s_expr.child(0)?).await?;
let schema = left.output_schema()?;
Ok(PhysicalPlan::Union(Union {
left: Box::new(left),
right: Box::new(self.build(s_expr.child(1)?).await?),
schema,
}))
}
_ => Err(ErrorCode::LogicalError(format!(
"Unsupported physical plan: {:?}",
s_expr.plan()
Expand Down
8 changes: 8 additions & 0 deletions src/query/service/src/sql/executor/physical_plan_display.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use crate::sql::executor::PhysicalScalar;
use crate::sql::executor::Project;
use crate::sql::executor::Sort;
use crate::sql::executor::TableScan;
use crate::sql::executor::Union;
use crate::sql::plans::JoinType;

impl PhysicalPlan {
Expand Down Expand Up @@ -62,6 +63,7 @@ impl<'a> Display for PhysicalPlanIndentFormatDisplay<'a> {
PhysicalPlan::Exchange(exchange) => write!(f, "{}", exchange)?,
PhysicalPlan::ExchangeSource(source) => write!(f, "{}", source)?,
PhysicalPlan::ExchangeSink(sink) => write!(f, "{}", sink)?,
PhysicalPlan::Union(union) => write!(f, "{}", union)?,
}

for node in self.node.children() {
Expand Down Expand Up @@ -305,3 +307,9 @@ impl Display for ExchangeSink {
)
}
}

impl Display for Union {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "Union")
}
}
16 changes: 16 additions & 0 deletions src/query/service/src/sql/executor/physical_plan_visitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use super::PhysicalPlan;
use super::Project;
use super::Sort;
use super::TableScan;
use crate::sql::executor::Union;

pub trait PhysicalPlanReplacer {
fn replace(&mut self, plan: &PhysicalPlan) -> Result<PhysicalPlan> {
Expand All @@ -43,6 +44,7 @@ pub trait PhysicalPlanReplacer {
PhysicalPlan::Exchange(plan) => self.replace_exchange(plan),
PhysicalPlan::ExchangeSource(plan) => self.replace_exchange_source(plan),
PhysicalPlan::ExchangeSink(plan) => self.replace_exchange_sink(plan),
PhysicalPlan::Union(plan) => self.replace_union(plan),
}
}

Expand Down Expand Up @@ -160,6 +162,16 @@ pub trait PhysicalPlanReplacer {
query_id: plan.query_id.clone(),
}))
}

fn replace_union(&mut self, plan: &Union) -> Result<PhysicalPlan> {
let left = self.replace(&plan.left)?;
let right = self.replace(&plan.right)?;
Ok(PhysicalPlan::Union(Union {
left: Box::new(left),
right: Box::new(right),
schema: plan.schema.clone(),
}))
}
}

impl PhysicalPlan {
Expand Down Expand Up @@ -205,6 +217,10 @@ impl PhysicalPlan {
PhysicalPlan::ExchangeSink(plan) => {
Self::traverse(&plan.input, pre_visit, visit, post_visit);
}
PhysicalPlan::Union(plan) => {
Self::traverse(&plan.left, pre_visit, visit, post_visit);
Self::traverse(&plan.right, pre_visit, visit, post_visit);
}
}
post_visit(plan);
}
Expand Down
Loading