Skip to content

Commit

Permalink
feat: support Merge-Into V1 (databendlabs#12350)
Browse files Browse the repository at this point in the history
* try to add merge grammer

* finish parser stage

* finish match_clause and unmatch_clause

* finish display for merge_into

* finish grammer parser, start bind stage

* remove useless codes

* add MergeIntoPlan

* fix distributed http error

* support insert values for match

* remove useless codes

* revert match_insert

* refactor merge_into_stmt, build table_reference

* cover match pattern

* cover match pattern

* stash

* try to add merge_into_source_scan

* add merge_source_scan

* bind join

* stash

* refactor merge_source

* bind clauses

* add new plan node

* add interpreter and refactor bind

* try to add processor

* add physical plan

* fix columns_set

* add update/insert expression

* remove unused codes and start to build processor and pipeline

* add split operator

* build source pipeline

* finish pipeline build, continue to work on not-matched and matched processor

* forbidden different schema for now

* refactor expr and finish event schedule

* add util split_by_expr

* finish not match insert

* fix

* refactor merge into pipeline

* add mutation logentries

* add matched mutation

* add setting

* set not support computed expr

* fix bug

* fix col_index bug

* fix pipeline bug and add basic tests

* fix test

* fix typos

* fix typos

* fix clippy

* add more tests

* add tests

* fix bugs

* use enable_experimental_merge_into adviced by BohuTang instead

* add info

* fix typo

* fix ut

* fix native failure

* remove streamingV2Source, need to support streaming in next pr

* rename vars adviced by b41sh

* Update src/common/exception/src/exception_code.rs

Co-authored-by: Andy Lok <[email protected]>

* remove useless comments

* fix

* unify codes, use bitmap to filter

* fix check

* unify codes

* fix

* check duplicate

* check duplicate

---------

Co-authored-by: Andy Lok <[email protected]>
Co-authored-by: dantengsky <[email protected]>
  • Loading branch information
3 people committed Nov 27, 2023
1 parent da4694d commit 21b7f93
Show file tree
Hide file tree
Showing 54 changed files with 2,931 additions and 43 deletions.
1 change: 1 addition & 0 deletions src/common/exception/src/exception_code.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ build_exceptions! {
BackgroundJobAlreadyExists(1501),
UnknownBackgroundJob(1502),

InvalidRowIdIndex(1503),
// Index related errors.
UnsupportedIndex(1601),
RefreshIndexError(1602),
Expand Down
37 changes: 37 additions & 0 deletions src/common/storage/src/common_metrics/merge_into.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use metrics::increment_gauge;

macro_rules! key {
($key: literal) => {
concat!("query_", $key)
};
}

pub fn metrics_inc_merge_into_replace_blocks_counter(c: u32) {
increment_gauge!(key!("merge_into_replace_blocks_counter"), c as f64);
}

pub fn metrics_inc_merge_into_append_blocks_counter(c: u32) {
increment_gauge!(key!("merge_into_append_blocks_counter"), c as f64);
}

pub fn metrics_inc_merge_into_matched_rows(c: u32) {
increment_gauge!(key!("merge_into_matched_rows"), c as f64);
}

pub fn metrics_inc_merge_into_unmatched_rows(c: u32) {
increment_gauge!(key!("merge_into_unmatched_rows"), c as f64);
}
1 change: 1 addition & 0 deletions src/common/storage/src/common_metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

pub mod copy;
pub mod merge_into;
mod storage_metrics;

pub use storage_metrics::StorageMetrics;
Expand Down
262 changes: 262 additions & 0 deletions src/query/ast/src/ast/statements/merge_into.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,262 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::BTreeMap;
use std::fmt::Display;
use std::fmt::Formatter;

use common_exception::ErrorCode;
use common_exception::Result;

use super::Hint;
use crate::ast::write_comma_separated_list;
use crate::ast::write_period_separated_list;
use crate::ast::Expr;
use crate::ast::Identifier;
use crate::ast::Query;
use crate::ast::TableAlias;
use crate::ast::TableReference;

#[derive(Debug, Clone, PartialEq)]
pub struct MergeUpdateExpr {
pub catalog: Option<Identifier>,
pub table: Option<Identifier>,
pub name: Identifier,
pub expr: Expr,
}

impl Display for MergeUpdateExpr {
fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
if self.catalog.is_some() {
write!(f, "{}.", self.catalog.clone().unwrap())?;
}

if self.table.is_some() {
write!(f, "{}.", self.table.clone().unwrap())?;
}

write!(f, "{} = {}", self.name, self.expr)
}
}

#[derive(Debug, Clone, PartialEq)]
pub enum MatchOperation {
Update { update_list: Vec<MergeUpdateExpr> },
Delete,
}

#[derive(Debug, Clone, PartialEq)]
pub struct MatchedClause {
pub selection: Option<Expr>,
pub operation: MatchOperation,
}

#[derive(Debug, Clone, PartialEq)]
pub struct InsertOperation {
pub columns: Option<Vec<Identifier>>,
pub values: Vec<Expr>,
}

#[derive(Debug, Clone, PartialEq)]
pub struct UnmatchedClause {
pub selection: Option<Expr>,
pub insert_operation: InsertOperation,
}

#[derive(Debug, Clone, PartialEq)]
pub enum MergeOption {
Match(MatchedClause),
Unmatch(UnmatchedClause),
}

#[derive(Debug, Clone, PartialEq)]
pub struct MergeIntoStmt {
pub hints: Option<Hint>,
pub catalog: Option<Identifier>,
pub database: Option<Identifier>,
pub table_ident: Identifier,
pub source: MergeSource,
// alias_target is belong to target
pub alias_target: Option<TableAlias>,
pub join_expr: Expr,
pub merge_options: Vec<MergeOption>,
}

impl Display for MergeIntoStmt {
fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
write!(f, "MERGE INTO ")?;
write_period_separated_list(
f,
self.catalog
.iter()
.chain(&self.database)
.chain(Some(&self.table_ident)),
)?;

write!(f, " USING {} ON {}", self.source, self.join_expr)?;

for clause in &self.merge_options {
match clause {
MergeOption::Match(match_clause) => {
write!(f, " WHEN MATCHED ")?;
if let Some(e) = &match_clause.selection {
write!(f, " AND {} ", e)?;
}
write!(f, " THEN ")?;

match &match_clause.operation {
MatchOperation::Update { update_list } => {
write!(f, " UPDATE SET ")?;
write_comma_separated_list(f, update_list)?;
}
MatchOperation::Delete => {
write!(f, " DELETE ")?;
}
}
}
MergeOption::Unmatch(unmatch_clause) => {
write!(f, " WHEN NOT MATCHED ")?;
if let Some(e) = &unmatch_clause.selection {
write!(f, " AND {} ", e)?;
}
write!(f, " THEN INSERT ")?;
if let Some(columns) = &unmatch_clause.insert_operation.columns {
if !columns.is_empty() {
write!(f, " (")?;
write_comma_separated_list(f, columns)?;
write!(f, ")")?;
}
}
write!(f, "VALUES")?;
for (i, value) in unmatch_clause.insert_operation.values.iter().enumerate() {
if i > 0 {
write!(f, ", ")?;
}
write!(f, "(")?;
write_comma_separated_list(f, vec![value])?;
write!(f, ")")?;
}
}
}
}
Ok(())
}
}

impl MergeIntoStmt {
pub fn split_clauses(&self) -> (Vec<MatchedClause>, Vec<UnmatchedClause>) {
let mut match_clauses = Vec::with_capacity(self.merge_options.len());
let mut unmatch_clauses = Vec::with_capacity(self.merge_options.len());
for merge_operation in &self.merge_options {
match merge_operation {
MergeOption::Match(match_clause) => match_clauses.push(match_clause.clone()),
MergeOption::Unmatch(unmatch_clause) => {
unmatch_clauses.push(unmatch_clause.clone())
}
}
}
(match_clauses, unmatch_clauses)
}

pub fn check_multi_match_clauses_semantic(clauses: &Vec<MatchedClause>) -> Result<()> {
// check match_clauses
if clauses.len() > 1 {
for (idx, clause) in clauses.iter().enumerate() {
if clause.selection.is_none() && idx < clauses.len() - 1 {
return Err(ErrorCode::SemanticError(
"when there are multi matched clauses, we must have a condition for every one except the last one".to_string(),
));
}
}
}
Ok(())
}

pub fn check_multi_unmatch_clauses_semantic(clauses: &Vec<UnmatchedClause>) -> Result<()> {
// check unmatch_clauses
if clauses.len() > 1 {
for (idx, clause) in clauses.iter().enumerate() {
if clause.selection.is_none() && idx < clauses.len() - 1 {
return Err(ErrorCode::SemanticError(
"when there are multi unmatched clauses, we must have a condition for every one except the last one".to_string(),
));
}
}
}
Ok(())
}
}

#[derive(Debug, Clone, PartialEq)]
pub enum MergeSource {
StreamingV2 {
settings: BTreeMap<String, String>,
on_error_mode: Option<String>,
start: usize,
},

Select {
query: Box<Query>,
},
}

#[derive(Debug, Clone, PartialEq)]
pub struct StreamingSource {
settings: BTreeMap<String, String>,
on_error_mode: Option<String>,
start: usize,
}

impl MergeSource {
pub fn transform_table_reference(&self) -> TableReference {
match self {
Self::StreamingV2 {
settings: _,
on_error_mode: _,
start: _,
} => unimplemented!(),

Self::Select { query } => TableReference::Subquery {
span: None,
subquery: query.clone(),
alias: None,
},
}
}
}

impl Display for MergeSource {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
MergeSource::StreamingV2 {
settings,
on_error_mode,
start: _,
} => {
write!(f, " FILE_FORMAT = (")?;
for (k, v) in settings.iter() {
write!(f, " {} = '{}'", k, v)?;
}
write!(f, " )")?;
write!(
f,
" ON_ERROR = '{}'",
on_error_mode.as_ref().unwrap_or(&"Abort".to_string())
)
}

MergeSource::Select { query } => write!(f, "{query}"),
}
}
}
2 changes: 2 additions & 0 deletions src/query/ast/src/ast/statements/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ mod hint;
mod index;
mod insert;
mod kill;
mod merge_into;
mod network_policy;
mod presign;
mod replace;
Expand All @@ -48,6 +49,7 @@ pub use hint::*;
pub use index::*;
pub use insert::*;
pub use kill::*;
pub use merge_into::*;
pub use network_policy::*;
pub use presign::*;
pub use replace::*;
Expand Down
4 changes: 3 additions & 1 deletion src/query/ast/src/ast/statements/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use common_meta_app::principal::FileFormatOptionsAst;
use common_meta_app::principal::PrincipalIdentity;
use common_meta_app::principal::UserIdentity;

use super::merge_into::MergeIntoStmt;
use super::*;
use crate::ast::write_comma_separated_list;
use crate::ast::Expr;
Expand Down Expand Up @@ -76,7 +77,7 @@ pub enum Statement {

Insert(InsertStmt),
Replace(ReplaceStmt),

MergeInto(MergeIntoStmt),
Delete {
hints: Option<Hint>,
table_reference: TableReference,
Expand Down Expand Up @@ -298,6 +299,7 @@ impl Display for Statement {
Statement::Query(query) => write!(f, "{query}")?,
Statement::Insert(insert) => write!(f, "{insert}")?,
Statement::Replace(replace) => write!(f, "{replace}")?,
Statement::MergeInto(merge_into) => write!(f, "{merge_into}")?,
Statement::Delete {
table_reference,
selection,
Expand Down
Loading

0 comments on commit 21b7f93

Please sign in to comment.