diff --git a/Cargo.lock b/Cargo.lock index 29704372018b..7b737905655a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3307,12 +3307,17 @@ dependencies = [ "databend-common-ast", "geos", "geozero 0.14.0", + "gimli 0.31.1", "http 1.1.0", + "libc", + "object", + "once_cell", "opendal", "parquet", "paste", "prost", "reqwest", + "rustc-demangle", "serde", "serde_json", "sqlx", @@ -6476,6 +6481,12 @@ version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" +[[package]] +name = "foldhash" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f81ec6369c545a7d40e4589b5597581fa1c441fe1cce96dd1de43159910a36a2" + [[package]] name = "foreign-types" version = "0.3.2" @@ -7010,6 +7021,17 @@ version = "0.29.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "40ecd4077b5ae9fd2e9e169b102c6c330d0605168eb0e8bf79952b256dbefffd" +[[package]] +name = "gimli" +version = "0.31.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "07e28edb80900c19c28f1072f2e8aeca7fa06b23cd4169cefe1af5aa3260783f" +dependencies = [ + "fallible-iterator", + "indexmap 2.6.0", + "stable_deref_trait", +] + [[package]] name = "gix" version = "0.63.0" @@ -7992,6 +8014,9 @@ name = "hashbrown" version = "0.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1e087f84d4f86bf4b218b927129862374b72199ae7d8657835f1e89000eea4fb" +dependencies = [ + "foldhash", +] [[package]] name = "hashlink" @@ -10410,14 +10435,16 @@ checksum = "830b246a0e5f20af87141b25c173cd1b609bd7779a4617d6ec582abaf90870f3" [[package]] name = "object" -version = "0.36.3" +version = "0.36.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "27b64972346851a39438c60b341ebc01bba47464ae329e55cf343eb93964efd9" +checksum = "aedf0a2d09c573ed1d8d85b30c119153926a2b36dce0ab28322c09a117a4683e" dependencies = [ 
"crc32fast", - "hashbrown 0.14.5", + "flate2", + "hashbrown 0.15.0", "indexmap 2.6.0", "memchr", + "ruzstd", ] [[package]] @@ -12947,6 +12974,15 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "ruzstd" +version = "0.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "99c3938e133aac070997ddc684d4b393777d293ba170f2988c8fd5ea2ad4ce21" +dependencies = [ + "twox-hash", +] + [[package]] name = "ryu" version = "1.0.18" diff --git a/Cargo.toml b/Cargo.toml index 61c6b9e3b9c3..7d68a2405e59 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -299,6 +299,7 @@ geo-types = "0.7.13" geohash = "0.13.0" geos = { version = "9.0.0", features = ["static", "geo", "geo-types"] } geozero = { version = "0.14.0", features = ["default", "with-wkb", "with-geos", "with-geojson"] } +gimli = "0.31.0" globiter = "0.1" goldenfile = "1.4" h3o = "0.4.0" @@ -354,6 +355,7 @@ num-bigint = "0.4.6" num-derive = "0.3.3" num-traits = "0.2.19" num_cpus = "1.13.1" +object = "0.36.5" object_store_opendal = "0.48.1" once_cell = "1.15.0" openai_api_rust = "0.1" @@ -514,6 +516,7 @@ nom-rule = "0.4" pratt = "0.4.0" pretty = "0.11.3" rspack-codespan-reporting = "0.11" +rustc-demangle = "0.1" strsim = "0.10" strum_macros = "0.24" vergen = { version = "8.3.1", default-features = false, features = ["build", "cargo", "git", "gix", "rustc"] } @@ -597,6 +600,7 @@ gimli = { opt-level = 3 } miniz_oxide = { opt-level = 3 } object = { opt-level = 3 } rustc-demangle = { opt-level = 3 } +databend-common-exception = { opt-level = 3 } [profile.test] opt-level = 0 diff --git a/src/common/arrow/src/arrow/ffi/schema.rs b/src/common/arrow/src/arrow/ffi/schema.rs index b97923c2e414..7c47a49f205c 100644 --- a/src/common/arrow/src/arrow/ffi/schema.rs +++ b/src/common/arrow/src/arrow/ffi/schema.rs @@ -528,6 +528,7 @@ unsafe fn read_bytes(ptr: *const u8, len: usize) -> &'static str { } unsafe fn metadata_from_bytes(data: *const ::std::os::raw::c_char) -> (Metadata, Extension) { + 
#[allow(clippy::unnecessary_cast)] let mut data = data as *const u8; // u8 = i8 if data.is_null() { return (Metadata::default(), None); diff --git a/src/common/base/src/mem_allocator/mmap.rs b/src/common/base/src/mem_allocator/mmap.rs index 759c3679fb8b..8224c832ea84 100644 --- a/src/common/base/src/mem_allocator/mmap.rs +++ b/src/common/base/src/mem_allocator/mmap.rs @@ -286,6 +286,7 @@ pub mod linux { } // fallback to (5.13.0) let fallback_version = 5u32 << 16 | 13u32 << 8; + #[allow(clippy::unnecessary_cast)] let slice = unsafe { &*(&uname.release[..length] as *const _ as *const [u8]) }; let result = match std::str::from_utf8(slice) { Ok(ver) => match semver::Version::parse(ver) { diff --git a/src/common/exception/Cargo.toml b/src/common/exception/Cargo.toml index 48ff7f20f330..74eb2c38e5ae 100644 --- a/src/common/exception/Cargo.toml +++ b/src/common/exception/Cargo.toml @@ -17,16 +17,21 @@ databend-common-ast = { workspace = true } anyhow = { workspace = true } arrow-flight = { workspace = true } arrow-schema = { workspace = true } -backtrace = { workspace = true } +backtrace = { workspace = true, features = ["std", "serialize-serde"] } bincode = { workspace = true } geos = { workspace = true } geozero = { workspace = true } +gimli = { workspace = true } http = { workspace = true } +libc = { workspace = true } +object = { workspace = true } +once_cell = { workspace = true } opendal = { workspace = true } parquet = { workspace = true } paste = { workspace = true } prost = { workspace = true } reqwest = { workspace = true } +rustc-demangle = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } sqlx = { workspace = true } diff --git a/src/common/exception/src/elf/dwarf.rs b/src/common/exception/src/elf/dwarf.rs new file mode 100644 index 000000000000..1dafec886c9c --- /dev/null +++ b/src/common/exception/src/elf/dwarf.rs @@ -0,0 +1,205 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the 
"License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use gimli::DebugAbbrev; +use gimli::DebugAddr; +use gimli::DebugAranges; +use gimli::DebugInfo; +use gimli::DebugInfoOffset; +use gimli::DebugLine; +use gimli::DebugLineStr; +use gimli::DebugRanges; +use gimli::DebugRngLists; +use gimli::DebugStr; +use gimli::DebugStrOffsets; +use gimli::EndianSlice; +use gimli::NativeEndian; +use gimli::RangeLists; +use gimli::Reader; +use gimli::UnitHeader; +use gimli::UnitType; +use object::CompressionFormat; +use object::Object; +use object::ObjectSection; + +use crate::elf::dwarf_unit::Unit; +use crate::elf::dwarf_unit::UnitAttrs; +use crate::elf::ElfFile; + +#[derive(Debug)] +pub struct CallLocation { + pub symbol: Option, + pub file: Option, + pub line: Option, + pub column: Option, + pub is_inlined: bool, +} + +pub struct Dwarf { + #[allow(unused)] + elf: Arc, + debug_str: DebugStr>, + debug_info: DebugInfo>, + debug_line: DebugLine>, + debug_line_str: DebugLineStr>, + debug_str_offsets: DebugStrOffsets>, + debug_aranges: DebugAranges>, + debug_abbrev: DebugAbbrev>, + debug_addr: DebugAddr>, + debug_range_list: RangeLists>, +} + +static EMPTY_BYTES: &[u8] = &[]; + +impl Dwarf { + pub fn create(elf: Arc) -> Option { + fn get_debug_section(elf: &ElfFile, name: &str) -> EndianSlice<'static, NativeEndian> { + let Some(section) = elf.section_by_name(name) else { + return EndianSlice::new(EMPTY_BYTES, NativeEndian); + }; + + // Unsupported compress debug info + let Ok(compressed) = 
section.compressed_file_range() else { + return EndianSlice::new(EMPTY_BYTES, NativeEndian); + }; + + #[allow(clippy::missing_transmute_annotations)] + unsafe { + match compressed.format != CompressionFormat::None { + true => EndianSlice::new(EMPTY_BYTES, NativeEndian), + false => match section.data() { + Err(_) => EndianSlice::new(EMPTY_BYTES, NativeEndian), + Ok(data) => EndianSlice::new(std::mem::transmute(data), NativeEndian), + }, + } + } + } + + for name in [".debug_info", ".debug_abbrev", ".debug_line"] { + if get_debug_section(&elf, name).is_empty() { + return None; + } + } + + Some(Dwarf { + debug_str: DebugStr::from(get_debug_section(&elf, ".debug_str")), + debug_info: DebugInfo::from(get_debug_section(&elf, ".debug_info")), + debug_line: DebugLine::from(get_debug_section(&elf, ".debug_line")), + debug_line_str: DebugLineStr::from(get_debug_section(&elf, ".debug_line_str")), + debug_str_offsets: DebugStrOffsets::from(get_debug_section(&elf, ".debug_str_offsets")), + debug_aranges: DebugAranges::from(get_debug_section(&elf, ".debug_aranges")), + debug_abbrev: DebugAbbrev::from(get_debug_section(&elf, ".debug_abbrev")), + debug_range_list: RangeLists::new( + DebugRanges::from(get_debug_section(&elf, ".debug_ranges")), + DebugRngLists::from(get_debug_section(&elf, ".debug_rnglists")), + ), + debug_addr: DebugAddr::from(get_debug_section(&elf, ".debug_addr")), + elf, + }) + } + + fn find_debug_info_offset(&self, probe: u64) -> Option> { + let mut heads = self.debug_aranges.headers(); + while let Some(head) = heads.next().ok()? { + let mut entries = head.entries(); + while let Some(entry) = entries.next().ok()? 
{ + if probe >= entry.address() && probe <= entry.address() + entry.length() { + return Some(head.debug_info_offset()); + } + } + } + + None + } + + fn get_unit( + &self, + head: UnitHeader>, + ) -> gimli::Result>>> { + let abbrev_offset = head.debug_abbrev_offset(); + let Ok(abbreviations) = self.debug_abbrev.abbreviations(abbrev_offset) else { + return Ok(None); + }; + + let mut cursor = head.entries(&abbreviations); + let (_idx, root) = cursor.next_dfs()?.unwrap(); + + let mut attrs = root.attrs(); + let mut unit_attrs = UnitAttrs::create(); + + while let Some(attr) = attrs.next()? { + unit_attrs.set_attr(&self.debug_str, attr); + } + + Ok(Some(Unit { + head, + abbreviations, + attrs: unit_attrs, + debug_str: self.debug_str, + debug_info: self.debug_info, + debug_abbrev: self.debug_abbrev, + debug_line: self.debug_line, + debug_line_str: self.debug_line_str, + debug_str_offsets: self.debug_str_offsets, + debug_addr: self.debug_addr, + range_list: self.debug_range_list, + })) + } + + fn fast_find_frames(&self, probe: u64) -> gimli::Result>> { + if let Some(debug_info_offset) = self.find_debug_info_offset(probe) { + let head = self.debug_info.header_from_offset(debug_info_offset)?; + + let type_ = head.type_(); + if matches!(type_, UnitType::Compilation | UnitType::Skeleton(_)) { + if let Some(unit) = self.get_unit(head)? { + return Ok(Some(unit.find_frames(probe)?)); + } + } + } + + Ok(None) + } + + fn slow_find_frames(&self, probe: u64) -> gimli::Result> { + let mut units = self.debug_info.units(); + while let Some(head) = units.next()? { + if matches!(head.type_(), UnitType::Compilation | UnitType::Skeleton(_)) { + if let Some(unit) = self.get_unit(head)? { + if unit.match_pc(probe) { + return unit.find_frames(probe); + } + } + } + } + + Ok(vec![]) + } + + pub fn find_frames(&self, probe: u64) -> gimli::Result> { + match self.fast_find_frames(probe)? 
{ + Some(location) => Ok(location), + None => self.slow_find_frames(probe), + } + } +} + +// #[cfg(target_os = "linux")] +#[derive(Copy, Clone, Debug)] +pub enum HighPc { + Addr(u64), + Offset(u64), +} diff --git a/src/common/exception/src/elf/dwarf_inline_functions.rs b/src/common/exception/src/elf/dwarf_inline_functions.rs new file mode 100644 index 000000000000..2a95b0172365 --- /dev/null +++ b/src/common/exception/src/elf/dwarf_inline_functions.rs @@ -0,0 +1,374 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use gimli::Attribute; +use gimli::AttributeValue; +use gimli::EntriesRaw; +use gimli::RangeListsOffset; +use gimli::Reader; +use gimli::Result; +use gimli::UnitOffset; + +use crate::elf::dwarf::CallLocation; +use crate::elf::dwarf::HighPc; +use crate::elf::dwarf_unit::Unit; +use crate::elf::dwarf_unit::UnitAttrs; + +pub struct SubroutineAttrs { + high_pc: Option, + low_pc: Option, + ranges_offset: Option>, + + name: Option, + line: Option, + file: Option, + column: Option, +} + +impl SubroutineAttrs { + pub fn create() -> SubroutineAttrs { + SubroutineAttrs { + line: None, + file: None, + name: None, + column: None, + low_pc: None, + high_pc: None, + ranges_offset: None, + } + } + + pub fn set_attr(&mut self, attr: Attribute, unit: &Unit) { + match attr.name() { + gimli::DW_AT_low_pc => match attr.value() { + AttributeValue::DebugAddrIndex(idx) => self.low_pc = Some(unit.get_address(idx)), + AttributeValue::Addr(value) => self.low_pc = Some(value), + _ => {} + }, + gimli::DW_AT_high_pc => match attr.value() { + AttributeValue::Addr(val) => self.high_pc = Some(HighPc::Addr(val)), + AttributeValue::Udata(val) => self.high_pc = Some(HighPc::Offset(val)), + AttributeValue::DebugAddrIndex(idx) => { + self.high_pc = Some(HighPc::Addr(unit.get_address(idx))) + } + _ => {} + }, + gimli::DW_AT_ranges => { + if let AttributeValue::RangeListsRef(v) = attr.value() { + self.ranges_offset = Some(RangeListsOffset(v.0)); + } + } + gimli::DW_AT_linkage_name | gimli::DW_AT_MIPS_linkage_name => { + if let Some(val) = unit.attr_str(attr.value()) { + self.name = Some(val); + } + } + gimli::DW_AT_name => { + if self.name.is_none() { + self.name = unit.attr_str(attr.value()); + } + } + gimli::DW_AT_abstract_origin | gimli::DW_AT_specification => { + if self.name.is_none() { + if let Ok(Some(v)) = unit.name_attr(attr.value(), 16) { + self.name = Some(v); + } + } + } + gimli::DW_AT_call_file => { + if let AttributeValue::FileIndex(idx) = attr.value() { + if let Ok(filename) = 
unit.find_file(idx) { + self.file = filename; + } + } + } + gimli::DW_AT_call_line => { + self.line = attr.udata_value().map(|x| x as u32); + } + gimli::DW_AT_call_column => { + self.column = attr.udata_value().map(|x| x as u32); + } + _ => {} + } + } + + pub fn match_pc(&self, probe: u64) -> bool { + match (self.low_pc, self.high_pc) { + (Some(low), Some(high)) => { + probe >= low + && match high { + HighPc::Addr(high) => probe < high, + HighPc::Offset(size) => probe < low + size, + } + } + _ => false, + } + } +} + +impl Unit { + pub(crate) fn attr_str(&self, value: AttributeValue) -> Option { + match value { + AttributeValue::String(string) => Some(string), + AttributeValue::DebugStrRef(offset) => self.debug_str.get_str(offset).ok(), + AttributeValue::DebugLineStrRef(offset) => self.debug_line_str.get_str(offset).ok(), + AttributeValue::DebugStrOffsetsIndex(index) => { + let offset = self + .debug_str_offsets + .get_str_offset(self.head.format(), self.attrs.str_offsets_base, index) + .ok()?; + self.debug_str.get_str(offset).ok() + } + _ => None, + } + } + + fn name_entry(&self, offset: UnitOffset, recursion: usize) -> Result> { + let mut entries = self.head.entries_raw(&self.abbreviations, Some(offset))?; + let abbrev = if let Some(abbrev) = entries.read_abbreviation()? 
{ + abbrev + } else { + return Err(gimli::Error::NoEntryAtGivenOffset); + }; + + let mut name = None; + let mut next = None; + for spec in abbrev.attributes() { + let attr = entries.read_attribute(*spec)?; + match attr.name() { + gimli::DW_AT_linkage_name | gimli::DW_AT_MIPS_linkage_name => { + if let Some(val) = self.attr_str(attr.value()) { + return Ok(Some(val)); + } + } + gimli::DW_AT_name => name = self.attr_str(attr.value()), + gimli::DW_AT_abstract_origin | gimli::DW_AT_specification => { + next = Some(attr.value()) + } + _ => {} + }; + } + + if name.is_some() { + return Ok(name); + } + + if let Some(next) = next { + return self.name_attr(next, recursion - 1); + } + + Ok(None) + } + + pub(crate) fn name_attr(&self, v: AttributeValue, recursion: usize) -> Result> { + if recursion == 0 { + return Ok(None); + } + + match v { + AttributeValue::UnitRef(offset) => self.name_entry(offset, recursion), + AttributeValue::DebugInfoRef(dr) => { + let mut head = None; + let mut units = self.debug_info.units(); + + while let Some(unit_head) = units + .next() + .map_err(|_| gimli::Error::NoEntryAtGivenOffset)? + { + if unit_head.offset().as_debug_info_offset().unwrap() > dr { + break; + } + + head = Some(unit_head); + } + + if let Some(head) = head { + let unit_offset = dr + .to_unit_offset(&head) + .ok_or(gimli::Error::NoEntryAtGivenOffset)?; + + let abbrev_offset = head.debug_abbrev_offset(); + let Ok(abbreviations) = self.debug_abbrev.abbreviations(abbrev_offset) else { + return Ok(None); + }; + + let mut cursor = head.entries(&abbreviations); + let (_idx, root) = cursor.next_dfs()?.unwrap(); + + let mut attrs = root.attrs(); + let mut unit_attrs = UnitAttrs::create(); + + while let Some(attr) = attrs.next()? 
{ + unit_attrs.set_attr(&self.debug_str, attr); + } + + let unit = Unit { + head, + abbreviations, + attrs: unit_attrs, + debug_str: self.debug_str.clone(), + debug_info: self.debug_info.clone(), + debug_abbrev: self.debug_abbrev.clone(), + debug_line: self.debug_line.clone(), + debug_line_str: self.debug_line_str.clone(), + debug_str_offsets: self.debug_str_offsets.clone(), + debug_addr: self.debug_addr.clone(), + range_list: self.range_list.clone(), + }; + + return unit.name_entry(unit_offset, recursion); + } + + Ok(None) + } + _ => Ok(None), + } + } + + fn inlined_functions( + &self, + mut entries: EntriesRaw, + probe: u64, + depth: isize, + inlined_functions: &mut Vec, + ) -> Result<()> { + loop { + let next_depth = entries.next_depth(); + + if next_depth <= depth { + return Ok(()); + } + + if let Some(abbrev) = entries.read_abbreviation()? { + match abbrev.tag() { + gimli::DW_TAG_subprogram => { + entries.skip_attributes(abbrev.attributes())?; + while entries.next_depth() > next_depth { + if let Some(abbrev) = entries.read_abbreviation()? 
{ + entries.skip_attributes(abbrev.attributes())?; + } + } + } + gimli::DW_TAG_inlined_subroutine => { + let mut attrs = SubroutineAttrs::create(); + for spec in abbrev.attributes() { + let attr = entries.read_attribute(*spec)?; + attrs.set_attr(attr, self); + } + + let match_range = match attrs.ranges_offset { + None => false, + Some(range_offset) => self.match_range(probe, range_offset), + }; + + if !match_range && !attrs.match_pc(probe) { + continue; + } + + let name = match attrs.name { + None => None, + Some(name) => match name.to_string_lossy() { + Err(_) => None, + Ok(name) => { + Some(format!("{:#}", rustc_demangle::demangle(name.as_ref()))) + } + }, + }; + + inlined_functions.push(CallLocation { + symbol: name, + file: attrs.file, + line: attrs.line, + column: attrs.column, + is_inlined: true, + }); + + self.inlined_functions(entries, probe, next_depth, inlined_functions)?; + + return Ok(()); + } + _ => { + entries.skip_attributes(abbrev.attributes())?; + } + } + } + } + } + + pub fn find_function( + &self, + offset: UnitOffset, + probe: u64, + functions: &mut Vec, + ) -> Result<()> { + let mut entries = self.head.entries_raw(&self.abbreviations, Some(offset))?; + let depth = entries.next_depth(); + let abbrev = entries.read_abbreviation()?.unwrap(); + debug_assert_eq!(abbrev.tag(), gimli::DW_TAG_subprogram); + + let mut name = None; + for spec in abbrev.attributes() { + let attr = entries.read_attribute(*spec)?; + match attr.name() { + gimli::DW_AT_linkage_name | gimli::DW_AT_MIPS_linkage_name => { + if let Some(val) = self.attr_str(attr.value()) { + name = Some(val); + } + } + gimli::DW_AT_name => { + if name.is_none() { + name = self.attr_str(attr.value()); + } + } + gimli::DW_AT_abstract_origin | gimli::DW_AT_specification => { + if name.is_none() { + name = self.name_attr(attr.value(), 16)?; + } + } + _ => {} + }; + } + + self.inlined_functions(entries, probe, depth, functions)?; + + let symbol = match name { + None => None, + Some(name) => match 
name.to_string_lossy() { + Err(_) => None, + Ok(name) => Some(format!("{:#}", rustc_demangle::demangle(name.as_ref()))), + }, + }; + + let (mut file, mut line, mut column) = self.find_location(probe)?; + + functions.reverse(); + + #[allow(clippy::needless_range_loop)] + for index in 0..functions.len() { + std::mem::swap(&mut functions[index].file, &mut file); + std::mem::swap(&mut functions[index].line, &mut line); + std::mem::swap(&mut functions[index].column, &mut column); + } + + functions.push(CallLocation { + symbol, + file, + line, + column, + is_inlined: false, + }); + + Ok(()) + } +} diff --git a/src/common/exception/src/elf/dwarf_subprogram.rs b/src/common/exception/src/elf/dwarf_subprogram.rs new file mode 100644 index 000000000000..e83bc8b9d7b5 --- /dev/null +++ b/src/common/exception/src/elf/dwarf_subprogram.rs @@ -0,0 +1,103 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use gimli::Attribute; +use gimli::AttributeValue; +use gimli::RangeListsOffset; +use gimli::Reader; +use gimli::Result; +use gimli::UnitOffset; + +use crate::elf::dwarf::HighPc; +use crate::elf::dwarf_unit::Unit; + +pub struct SubprogramAttrs { + high_pc: Option, + low_pc: Option, + ranges_offset: Option>, +} + +impl SubprogramAttrs { + pub fn create() -> SubprogramAttrs { + SubprogramAttrs { + high_pc: None, + low_pc: None, + ranges_offset: None, + } + } + + pub fn set_attr(&mut self, attr: Attribute) { + match (attr.name(), attr.value()) { + (gimli::DW_AT_high_pc, AttributeValue::Addr(addr)) => { + self.high_pc = Some(HighPc::Addr(addr)); + } + (gimli::DW_AT_high_pc, AttributeValue::Udata(offset)) => { + self.high_pc = Some(HighPc::Offset(offset)); + } + (gimli::DW_AT_low_pc, AttributeValue::Addr(v)) => { + self.low_pc = Some(v); + } + (gimli::DW_AT_ranges, AttributeValue::RangeListsRef(v)) => { + self.ranges_offset = Some(RangeListsOffset(v.0)); + } + _ => {} + } + } + + pub fn match_pc(&self, probe: u64) -> bool { + match (self.low_pc, self.high_pc) { + (Some(low), Some(high)) => { + probe >= low + && match high { + HighPc::Addr(high) => probe < high, + HighPc::Offset(size) => probe < low + size, + } + } + _ => false, + } + } +} + +impl Unit { + pub fn find_subprogram(&self, probe: u64) -> Result>> { + let mut entries = self.head.entries_raw(&self.abbreviations, None)?; + + while !entries.is_empty() { + let dw_die_offset = entries.next_offset(); + if let Some(abbrev) = entries.read_abbreviation()? 
{ + if abbrev.tag() == gimli::DW_TAG_subprogram { + let mut attrs = SubprogramAttrs::create(); + + for spec in abbrev.attributes() { + let attr = entries.read_attribute(*spec)?; + attrs.set_attr(attr); + } + + let range_match = match attrs.ranges_offset { + None => false, + Some(range_offset) => self.match_range(probe, range_offset), + }; + + if range_match || attrs.match_pc(probe) { + return Ok(Some(dw_die_offset)); + } + } else { + entries.skip_attributes(abbrev.attributes())?; + } + } + } + + Ok(None) + } +} diff --git a/src/common/exception/src/elf/dwarf_unit.rs b/src/common/exception/src/elf/dwarf_unit.rs new file mode 100644 index 000000000000..ac4fae8d05cd --- /dev/null +++ b/src/common/exception/src/elf/dwarf_unit.rs @@ -0,0 +1,302 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use std::num::NonZeroU64; +use std::path::PathBuf; + +use gimli::Abbreviations; +use gimli::Attribute; +use gimli::AttributeValue; +use gimli::DebugAbbrev; +use gimli::DebugAddr; +use gimli::DebugAddrBase; +use gimli::DebugAddrIndex; +use gimli::DebugInfo; +use gimli::DebugLine; +use gimli::DebugLineOffset; +use gimli::DebugLineStr; +use gimli::DebugLocListsBase; +use gimli::DebugRngListsBase; +use gimli::DebugStr; +use gimli::DebugStrOffsets; +use gimli::DebugStrOffsetsBase; +use gimli::RangeLists; +use gimli::RangeListsOffset; +use gimli::Reader; +use gimli::ReaderOffset; +use gimli::UnitHeader; + +use crate::elf::dwarf::CallLocation; +use crate::elf::dwarf::HighPc; + +// Unit first die attrs +pub struct UnitAttrs { + pub high_pc: Option, + pub low_pc: Option, + pub base_addr: Option, + pub name: Option, + pub comp_dir: Option, + + pub ranges_offset: Option>, + + pub addr_base: DebugAddrBase, + loclists_base: DebugLocListsBase, + rnglists_base: DebugRngListsBase, + pub str_offsets_base: DebugStrOffsetsBase, + debug_line_offset: Option>, +} + +impl UnitAttrs { + pub fn create() -> UnitAttrs { + UnitAttrs { + high_pc: None, + low_pc: None, + base_addr: None, + name: None, + comp_dir: None, + ranges_offset: None, + + debug_line_offset: None, + addr_base: DebugAddrBase(R::Offset::from_u8(0)), + loclists_base: DebugLocListsBase(R::Offset::from_u8(0)), + rnglists_base: DebugRngListsBase(R::Offset::from_u8(0)), + str_offsets_base: DebugStrOffsetsBase(R::Offset::from_u8(0)), + } + } + + pub fn set_attr(&mut self, debug_str: &DebugStr, attr: Attribute) { + match (attr.name(), attr.value()) { + (gimli::DW_AT_high_pc, v) => { + self.high_pc = match v { + AttributeValue::Addr(v) => Some(HighPc::Addr(v)), + AttributeValue::Udata(v) => Some(HighPc::Offset(v)), + _ => unreachable!(), + }; + } + (gimli::DW_AT_low_pc, AttributeValue::Addr(v)) => { + self.low_pc = Some(v); + self.base_addr = Some(v); + } + (gimli::DW_AT_name, v) => { + self.name = v.string_value(debug_str); + } 
+ (gimli::DW_AT_entry_pc, AttributeValue::Addr(v)) => { + self.base_addr = Some(v); + } + (gimli::DW_AT_comp_dir, v) => { + self.comp_dir = v.string_value(debug_str); + } + (gimli::DW_AT_ranges, AttributeValue::RangeListsRef(v)) => { + self.ranges_offset = Some(RangeListsOffset(v.0)); + } + (gimli::DW_AT_stmt_list, AttributeValue::DebugLineRef(v)) => { + self.debug_line_offset = Some(v); + } + (gimli::DW_AT_addr_base, AttributeValue::DebugAddrBase(base)) => { + self.addr_base = base; + } + (gimli::DW_AT_GNU_addr_base, AttributeValue::DebugAddrBase(base)) => { + self.addr_base = base; + } + (gimli::DW_AT_loclists_base, AttributeValue::DebugLocListsBase(base)) => { + self.loclists_base = base; + } + (gimli::DW_AT_rnglists_base, AttributeValue::DebugRngListsBase(base)) => { + self.rnglists_base = base; + } + (gimli::DW_AT_GNU_ranges_base, AttributeValue::DebugRngListsBase(base)) => { + self.rnglists_base = base; + } + (gimli::DW_AT_str_offsets_base, AttributeValue::DebugStrOffsetsBase(base)) => { + self.str_offsets_base = base; + } + _ => {} + } + } +} + +pub struct Unit { + pub(crate) head: UnitHeader, + + pub(crate) debug_str: DebugStr, + pub(crate) debug_info: DebugInfo, + pub(crate) debug_abbrev: DebugAbbrev, + pub(crate) debug_line: DebugLine, + pub(crate) debug_line_str: DebugLineStr, + pub(crate) debug_addr: DebugAddr, + pub(crate) range_list: RangeLists, + pub(crate) debug_str_offsets: DebugStrOffsets, + + pub(crate) abbreviations: Abbreviations, + pub(crate) attrs: UnitAttrs, +} + +impl Unit { + pub fn match_range(&self, probe: u64, offset: RangeListsOffset) -> bool { + if let Ok(mut ranges) = self.range_list.ranges( + offset, + self.head.encoding(), + self.attrs.base_addr.unwrap_or(0), + &self.debug_addr, + self.attrs.addr_base, + ) { + while let Ok(Some(range)) = ranges.next() { + if probe >= range.begin && probe < range.end { + return true; + } + } + } + + false + } + + pub fn find_file(&self, file_idx: u64) -> gimli::Result> { + if let Some(offset) = 
&self.attrs.debug_line_offset { + let program = self.debug_line.program( + *offset, + self.head.address_size(), + self.attrs.comp_dir.clone(), + self.attrs.name.clone(), + )?; + + let header = program.header(); + + if let Some(file) = header.file(file_idx) { + let mut path_buf = PathBuf::new(); + + if let Some(dir) = &self.attrs.comp_dir { + path_buf.push(dir.to_string_lossy().unwrap().into_owned()); + } + + if file.directory_index() != 0 { + if let Some(v) = file.directory(header) { + if let Some(v) = v.string_value(&self.debug_str) { + path_buf.push(v.to_string_lossy().unwrap().into_owned()); + } + } + } + + if let Some(v) = file.path_name().string_value(&self.debug_str) { + path_buf.push(v.to_string_lossy().unwrap().into_owned()); + } + + return Ok(Some(format!("{}", path_buf.display()))); + } + } + + Ok(None) + } + + pub fn find_location( + &self, + probe: u64, + ) -> gimli::Result<(Option, Option, Option)> { + if let Some(offset) = &self.attrs.debug_line_offset { + let program = self.debug_line.program( + *offset, + self.head.address_size(), + self.attrs.comp_dir.clone(), + self.attrs.name.clone(), + )?; + + let mut is_candidate: bool = false; + let mut file_idx = 0; + let mut line = 0; + let mut column = 0; + + let mut rows = program.rows(); + while let Some((_, row)) = rows.next_row()? 
{ + if !is_candidate && !row.end_sequence() && row.address() <= probe { + is_candidate = true; + } + + if is_candidate { + if row.address() > probe { + let mut path_buf = PathBuf::new(); + + if let Some(dir) = &self.attrs.comp_dir { + path_buf.push(dir.to_string_lossy().unwrap().into_owned()); + } + + let header = rows.header(); + if let Some(file) = header.file(file_idx) { + if file.directory_index() != 0 { + if let Some(v) = file.directory(header) { + if let Some(v) = v.string_value(&self.debug_str) { + path_buf.push(v.to_string_lossy().unwrap().into_owned()); + } + } + } + + if let Some(v) = file.path_name().string_value(&self.debug_str) { + path_buf.push(v.to_string_lossy().unwrap().into_owned()); + } + } + + return Ok(( + Some(format!("{}", path_buf.display())), + Some(line), + Some(column), + )); + } + + file_idx = row.file_index(); + line = row.line().map(NonZeroU64::get).unwrap_or(0) as u32; + column = match row.column() { + gimli::ColumnType::LeftEdge => 0, + gimli::ColumnType::Column(x) => x.get() as u32, + }; + } + + if row.end_sequence() { + is_candidate = false; + } + } + } + + Ok((None, None, None)) + } + + pub fn match_pc(&self, probe: u64) -> bool { + if let Some(range_offset) = self.attrs.ranges_offset { + return self.match_range(probe, range_offset); + } + + match (self.attrs.low_pc, self.attrs.high_pc) { + (Some(low), Some(high)) => { + probe >= low + && match high { + HighPc::Addr(high) => probe < high, + HighPc::Offset(size) => probe < low + size, + } + } + _ => false, + } + } + + pub fn find_frames(&self, probe: u64) -> gimli::Result> { + let mut functions_frames = vec![]; + if let Some(offset) = self.find_subprogram(probe)? 
{ + let _ = self.find_function(offset, probe, &mut functions_frames); + } + + Ok(functions_frames) + } + + pub fn get_address(&self, idx: DebugAddrIndex) -> u64 { + self.debug_addr + .get_address(self.head.encoding().address_size, self.attrs.addr_base, idx) + .unwrap() + } +} diff --git a/src/common/exception/src/elf/library_loader.rs b/src/common/exception/src/elf/library_loader.rs new file mode 100644 index 000000000000..e356cb71791a --- /dev/null +++ b/src/common/exception/src/elf/library_loader.rs @@ -0,0 +1,227 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use std::ffi::CStr; +use std::ffi::OsStr; +use std::ffi::OsString; +use std::fmt::Write; +use std::os::fd::AsRawFd; +use std::os::unix::ffi::OsStrExt; +use std::path::PathBuf; +use std::ptr; +use std::sync::Arc; + +use libc::c_int; +use libc::c_void; +use libc::dl_iterate_phdr; +use libc::dl_phdr_info; +use libc::size_t; +use object::Object; +use object::ObjectSymbol; +use object::ObjectSymbolTable; + +use crate::elf::library_manager::Library; +use crate::elf::library_symbol::Symbol; +use crate::elf::ElfFile; + +#[derive(Debug)] +pub struct LibraryLoader { + symbols: Vec, + libraries: Vec, +} + +impl LibraryLoader { + pub fn load() -> LibraryLoader { + unsafe { + let mut loader = LibraryLoader { + symbols: vec![], + libraries: vec![], + }; + + dl_iterate_phdr(Some(callback), std::ptr::addr_of_mut!(loader).cast()); + + loader + } + } + + pub unsafe fn load_symbols(&mut self, library: &mut Library) { + let library_data = library.data(); + + let Ok(elf) = ElfFile::parse(library_data) else { + return; + }; + + let symbol_table = match elf.symbol_table() { + Some(symbol_table) => symbol_table, + None => match elf.dynamic_symbol_table() { + Some(dynamic_symbol_table) => dynamic_symbol_table, + None => { + return; + } + }, + }; + + for symbol in symbol_table.symbols() { + let Ok(sym_name) = symbol.name() else { + continue; + }; + + if sym_name.is_empty() || symbol.size() == 0 { + continue; + } + + self.symbols.push(Symbol { + #[allow(clippy::missing_transmute_annotations)] + name: unsafe { std::mem::transmute(sym_name) }, + address_begin: library.address_begin as u64 + symbol.address(), + address_end: library.address_begin as u64 + symbol.address() + symbol.size(), + }); + } + + library.elf = Some(Arc::new(elf)); + } + + unsafe fn mmap_library(&mut self, library_path: PathBuf) -> std::io::Result { + let name = format!("{}", library_path.display()); + let file = std::fs::File::open(library_path)?; + let file_len = file.metadata()?.len(); + let ptr = libc::mmap( + 
ptr::null_mut(), + file_len as libc::size_t, + libc::PROT_READ, + libc::MAP_PRIVATE, + file.as_raw_fd(), + 0, + ); + + match ptr == libc::MAP_FAILED { + true => Err(std::io::Error::other("Cannot mmap")), + false => Ok(Library::create(name, ptr as *const u8, file_len as usize)), + } + } + + unsafe fn load_library(&mut self, info: &dl_phdr_info) -> std::io::Result { + let library_name = match info.dlpi_name.is_null() || *info.dlpi_name == 0 { + true => match self.libraries.is_empty() { + true => OsString::from("/proc/self/exe"), + false => OsString::new(), + }, + false => { + let bytes = CStr::from_ptr(info.dlpi_name).to_bytes(); + OsStr::from_bytes(bytes).to_owned() + } + }; + + if library_name.is_empty() { + return Err(std::io::Error::other("empty library name")); + } + + let binary_path = std::fs::canonicalize(library_name)?.to_path_buf(); + let mut binary_library = self.mmap_library(binary_path.clone())?; + + let Some(binary_build_id) = binary_library.build_id() else { + let binary_data = binary_library.data(); + binary_library.address_begin = info.dlpi_addr as usize; + binary_library.address_end = info.dlpi_addr as usize + binary_data.len(); + return Ok(binary_library); + }; + + // Symbol binary in the same dir ./databend-query.debug + let mut binary_debug_path = binary_path.clone(); + binary_debug_path.set_extension("debug"); + + if std::fs::exists(&binary_debug_path)? 
{ + let mut library = self.mmap_library(binary_debug_path)?; + if matches!(library.build_id(), Some(v) if v == binary_build_id) { + let library_data = library.data(); + library.address_begin = info.dlpi_addr as usize; + library.address_end = info.dlpi_addr as usize + library_data.len(); + return Ok(library); + } + } + + // Symbol binary in the system lib/debug/relative_path dir + // /usr/lib/debug/home/ubuntu/databend/databend-query.debug + let Ok(relative_path) = binary_path.strip_prefix("/") else { + return Err(std::io::Error::other("Cannot strip_prefix for path")); + }; + + let lib_debug = std::path::Path::new("/usr/lib/debug"); + let mut system_named_debug_path = lib_debug.join(relative_path); + system_named_debug_path.set_extension("debug"); + + if std::fs::exists(&system_named_debug_path)? { + let mut library = self.mmap_library(system_named_debug_path)?; + if matches!(library.build_id(), Some(v) if v == binary_build_id) { + let library_data = library.data(); + library.address_begin = info.dlpi_addr as usize; + library.address_end = info.dlpi_addr as usize + library_data.len(); + return Ok(library); + } + } + + if binary_build_id.len() >= 2 { + fn encode_hex(bytes: &[u8]) -> String { + let mut encoded_hex = String::with_capacity(bytes.len() * 2); + for &b in bytes { + write!(&mut encoded_hex, "{:02x}", b).unwrap(); + } + encoded_hex + } + + let mut system_id_debug_path = lib_debug + .join(encode_hex(&binary_build_id[..1])) + .join(encode_hex(&binary_build_id[1..])); + + system_id_debug_path.set_extension("debug"); + + if std::fs::exists(&system_id_debug_path)? 
{ + let mut library = self.mmap_library(system_id_debug_path)?; + + if matches!(library.build_id(), Some(v) if v == binary_build_id) { + let library_data = library.data(); + library.address_begin = info.dlpi_addr as usize; + library.address_end = info.dlpi_addr as usize + library_data.len(); + return Ok(library); + } + } + } + + let binary_library_data = binary_library.data(); + binary_library.address_begin = info.dlpi_addr as usize; + binary_library.address_end = info.dlpi_addr as usize + binary_library_data.len(); + Ok(binary_library) + } + + pub unsafe fn load_libraries(&mut self, info: &dl_phdr_info) { + if let Ok(mut library) = self.load_library(info) { + self.load_symbols(&mut library); + self.libraries.push(library); + } + } + + pub fn finalize(mut self) -> (Vec, Vec) { + self.symbols.sort_by(Symbol::sort_begin_address); + self.libraries.sort_by(Library::sort_begin_address); + self.symbols.dedup_by(Symbol::same_address); + + (self.libraries, self.symbols) + } +} + +unsafe extern "C" fn callback(info: *mut dl_phdr_info, _size: size_t, v: *mut c_void) -> c_int { + let loader = &mut *v.cast::(); + loader.load_libraries(&*info); + 0 +} diff --git a/src/common/exception/src/elf/library_manager.rs b/src/common/exception/src/elf/library_manager.rs new file mode 100644 index 000000000000..daa4bffec7d0 --- /dev/null +++ b/src/common/exception/src/elf/library_manager.rs @@ -0,0 +1,197 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +use std::cmp::Ordering; +use std::collections::HashMap; +use std::fmt::Debug; +use std::fmt::Formatter; +use std::sync::Arc; + +use object::Object; +use once_cell::sync::OnceCell; + +use crate::elf::dwarf::Dwarf; +use crate::elf::library_loader::LibraryLoader; +use crate::elf::library_symbol::Symbol; +use crate::elf::ElfFile; +use crate::exception_backtrace::ResolvedStackFrame; +use crate::exception_backtrace::StackFrame; + +pub struct Library { + pub name: String, + pub address_begin: usize, + pub address_end: usize, + pub elf: Option>, + library_data: &'static [u8], +} + +impl Library { + pub fn create(name: String, data: *const u8, size: usize) -> Library { + Library { + name, + address_begin: 0, + address_end: 0, + elf: None, + // Leak memory + library_data: unsafe { std::slice::from_raw_parts(data, size) }, + } + } + pub fn sort_begin_address(&self, other: &Self) -> Ordering { + self.address_begin.cmp(&other.address_begin) + } + + pub fn data(&self) -> &'static [u8] { + self.library_data + } + + pub unsafe fn build_id(&self) -> Option<&'static [u8]> { + let elf_file = ElfFile::parse(self.data()).ok()?; + match elf_file.build_id() { + Ok(None) | Err(_) => None, + Ok(Some(build)) => Some(build), + } + } +} + +static INSTANCE: OnceCell> = OnceCell::new(); + +impl Debug for Library { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Library") + .field("name", &self.name) + .field("address_begin", &self.address_begin) + .field("address_end", &self.address_end) + .finish() + } +} + +// #[derive(Debug)] +pub struct LibraryManager { + symbols: Vec, + libraries: Vec, +} + +impl Debug for LibraryManager { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("LibraryManager") + .field("libraries", &self.libraries) + .field("symbols", &self.symbols.len()) + .finish() + } +} + +impl LibraryManager { + fn 
find_library(&self, addr: usize) -> Option<&Library> { + self.libraries + .iter() + .find(|library| library.address_begin <= addr && addr <= library.address_end) + } + + pub fn resolve_frames Result<(), E>>( + &self, + frames: &[StackFrame], + only_address: bool, + mut f: F, + ) -> Result<(), E> { + let mut dwarf_cache = HashMap::with_capacity(self.libraries.len()); + + for frame in frames { + let StackFrame::Ip(addr) = frame; + + let mut resolved_frame = ResolvedStackFrame { + virtual_address: *addr, + physical_address: *addr, + symbol: String::from(""), + inlined: false, + file: None, + line: None, + column: None, + }; + + if let Some(library) = self.find_library(*addr) { + resolved_frame.physical_address = *addr - library.address_begin; + } + let Some(library) = self.find_library(*addr) else { + f(ResolvedStackFrame { + virtual_address: *addr, + physical_address: *addr, + symbol: String::from(""), + inlined: false, + file: None, + line: None, + column: None, + })?; + + continue; + }; + + let physical_address = *addr - library.address_begin; + + if !only_address { + let dwarf = match library.elf.as_ref() { + None => &None, + Some(elf) => match dwarf_cache.get(&library.name) { + Some(v) => v, + None => { + dwarf_cache.insert(library.name.clone(), Dwarf::create(elf.clone())); + dwarf_cache.get(&library.name).unwrap() + } + }, + }; + + if let Some(dwarf) = dwarf { + let adjusted_addr = (physical_address - 1) as u64; + + if let Ok(locations) = dwarf.find_frames(adjusted_addr) { + for location in locations { + f(ResolvedStackFrame { + virtual_address: 0, + physical_address, + symbol: location.symbol.unwrap_or("".to_string()), + inlined: location.is_inlined, + file: location.file, + line: location.line, + column: location.column, + })?; + } + + continue; + } + } + } + + f(ResolvedStackFrame { + physical_address, + virtual_address: *addr, + inlined: false, + symbol: String::from(""), + file: None, + line: None, + column: None, + })?; + } + + Ok(()) + } + + pub fn 
create() -> Arc { + let loader = LibraryLoader::load(); + let (libraries, symbols) = loader.finalize(); + Arc::new(LibraryManager { symbols, libraries }) + } + + pub fn instance() -> Arc { + INSTANCE.get_or_init(LibraryManager::create).clone() + } +} diff --git a/src/common/exception/src/elf/library_symbol.rs b/src/common/exception/src/elf/library_symbol.rs new file mode 100644 index 000000000000..84804fbbea2a --- /dev/null +++ b/src/common/exception/src/elf/library_symbol.rs @@ -0,0 +1,46 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use std::cmp::Ordering; +use std::fmt::Debug; +use std::fmt::Formatter; + +pub struct Symbol { + pub name: &'static [u8], + pub address_end: u64, + pub address_begin: u64, +} + +impl Symbol { + pub fn sort_begin_address(&self, other: &Self) -> Ordering { + self.address_begin.cmp(&other.address_begin) + } + + pub fn same_address(&mut self, other: &mut Self) -> bool { + self.address_begin == other.address_begin && self.address_end == other.address_end + } +} + +impl Debug for Symbol { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Symbol") + .field( + "name", + &rustc_demangle::demangle(std::str::from_utf8(self.name).unwrap()), + ) + .field("address_begin", &self.address_begin) + .field("address_end", &self.address_end) + .finish() + } +} diff --git a/src/common/exception/src/elf/mod.rs b/src/common/exception/src/elf/mod.rs new file mode 100644 index 000000000000..c1696452ef16 --- /dev/null +++ b/src/common/exception/src/elf/mod.rs @@ -0,0 +1,29 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +mod dwarf; +mod dwarf_inline_functions; +mod dwarf_subprogram; +mod dwarf_unit; +mod library_loader; +mod library_manager; +mod library_symbol; + +#[cfg(target_pointer_width = "32")] +type ElfFile = object::read::elf::ElfFile32<'static, object::NativeEndian, &'static [u8]>; + +#[cfg(target_pointer_width = "64")] +type ElfFile = object::read::elf::ElfFile64<'static, object::NativeEndian, &'static [u8]>; + +pub use library_manager::LibraryManager; diff --git a/src/common/exception/src/exception.rs b/src/common/exception/src/exception.rs index 5f488f51058b..668aeba8c6b5 100644 --- a/src/common/exception/src/exception.rs +++ b/src/common/exception/src/exception.rs @@ -18,75 +18,14 @@ use std::fmt::Debug; use std::fmt::Display; use std::fmt::Formatter; use std::marker::PhantomData; -use std::sync::Arc; -use backtrace::Backtrace; use databend_common_ast::span::pretty_print_error; use databend_common_ast::Span; use thiserror::Error; use crate::exception_backtrace::capture; use crate::ErrorFrame; - -#[derive(Clone)] -pub enum ErrorCodeBacktrace { - Serialized(Arc), - Symbols(Arc), - Address(Arc), -} - -impl Display for ErrorCodeBacktrace { - fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { - match self { - ErrorCodeBacktrace::Serialized(backtrace) => write!(f, "{}", backtrace), - ErrorCodeBacktrace::Symbols(backtrace) => write!(f, "{:?}", backtrace), - ErrorCodeBacktrace::Address(backtrace) => { - let frames_address = backtrace - .frames() - .iter() - .map(|f| (f.ip() as usize, f.symbol_address() as usize)) - .collect::>(); - write!(f, "{:?}", frames_address) - } - } - } -} - -impl From<&str> for ErrorCodeBacktrace { - fn from(s: &str) -> Self { - Self::Serialized(Arc::new(s.to_string())) - } -} - -impl From for ErrorCodeBacktrace { - fn from(s: String) -> Self { - Self::Serialized(Arc::new(s)) - } -} - -impl From> for ErrorCodeBacktrace { - fn from(s: Arc) -> Self { - Self::Serialized(s) - } -} - -impl From for ErrorCodeBacktrace { - fn from(bt: Backtrace) -> 
Self { - Self::Symbols(Arc::new(bt)) - } -} - -impl From<&Backtrace> for ErrorCodeBacktrace { - fn from(bt: &Backtrace) -> Self { - Self::Serialized(Arc::new(format!("{:?}", bt))) - } -} - -impl From> for ErrorCodeBacktrace { - fn from(bt: Arc) -> Self { - Self::Symbols(bt) - } -} +use crate::StackTrace; #[derive(Error)] pub struct ErrorCode { @@ -98,8 +37,8 @@ pub struct ErrorCode { // cause is only used to contain an `anyhow::Error`. // TODO: remove `cause` when we completely get rid of `anyhow::Error`. pub(crate) cause: Option>, - pub(crate) backtrace: Option, pub(crate) stacks: Vec, + pub(crate) backtrace: StackTrace, pub(crate) _phantom: PhantomData, } @@ -199,24 +138,12 @@ impl ErrorCode { self } - /// Set backtrace info for this error. - /// - /// Useful when trying to keep original backtrace - pub fn set_backtrace(mut self, bt: Option>) -> Self { - if let Some(b) = bt { - self.backtrace = Some(b.into()); - } - self - } - - pub fn backtrace(&self) -> Option { + pub fn backtrace(&self) -> StackTrace { self.backtrace.clone() } pub fn backtrace_str(&self) -> String { - self.backtrace - .as_ref() - .map_or("".to_string(), |x| x.to_string()) + format!("{:?}", &self.backtrace) } pub fn stacks(&self) -> &[ErrorFrame] { @@ -241,26 +168,12 @@ impl Debug for ErrorCode { self.message(), )?; - match self.backtrace.as_ref() { - None => write!( + match self.backtrace.frames.is_empty() { + true => write!( f, "\n\n " ), - Some(backtrace) => { - // TODO: Custom stack frame format for print - match backtrace { - ErrorCodeBacktrace::Symbols(backtrace) => write!(f, "\n\n{:?}", backtrace), - ErrorCodeBacktrace::Serialized(backtrace) => write!(f, "\n\n{}", backtrace), - ErrorCodeBacktrace::Address(backtrace) => { - let frames_address = backtrace - .frames() - .iter() - .map(|f| (f.ip() as usize, f.symbol_address() as usize)) - .collect::>(); - write!(f, "\n\n{:?}", frames_address) - } - } - } + false => write!(f, "\n\n{:?}", &self.backtrace), } } } @@ -318,8 +231,8 @@ impl 
ErrorCode { detail: String::new(), span: None, cause: None, - backtrace: None, stacks: vec![], + backtrace: StackTrace::no_capture(), _phantom: PhantomData::, } } @@ -330,7 +243,7 @@ impl ErrorCode { display_text: String, detail: String, cause: Option>, - backtrace: Option, + backtrace: StackTrace, ) -> Self { ErrorCode { code, diff --git a/src/common/exception/src/exception_backtrace.rs b/src/common/exception/src/exception_backtrace.rs index ef50db7f0c35..7dfdeaa5de3c 100644 --- a/src/common/exception/src/exception_backtrace.rs +++ b/src/common/exception/src/exception_backtrace.rs @@ -12,12 +12,20 @@ // See the License for the specific language governing permissions and // limitations under the License. -// use std::backtrace::Backtrace; +use std::collections::hash_map::Entry; +use std::collections::HashMap; +use std::fmt::Debug; +use std::fmt::Formatter; +use std::fmt::Write; +use std::hash::Hash; +use std::hash::Hasher; use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering; use std::sync::Arc; - -use crate::exception::ErrorCodeBacktrace; +use std::sync::LazyLock; +use std::sync::Mutex; +use std::sync::PoisonError; +use std::sync::RwLock; // 0: not specified 1: disable 2: enable pub static USER_SET_ENABLE_BACKTRACE: AtomicUsize = AtomicUsize::new(0); @@ -26,6 +34,8 @@ pub fn set_backtrace(switch: bool) { if switch { USER_SET_ENABLE_BACKTRACE.store(2, Ordering::Relaxed); } else { + let mut write_guard = STACK_CACHE.write().unwrap_or_else(PoisonError::into_inner); + write_guard.clear(); USER_SET_ENABLE_BACKTRACE.store(1, Ordering::Relaxed); } } @@ -49,42 +59,220 @@ fn enable_rust_backtrace() -> bool { enabled } -enum BacktraceStyle { - Symbols, - Address, +pub fn capture() -> StackTrace { + match enable_rust_backtrace() { + true => StackTrace::capture(), + false => StackTrace::no_capture(), + } +} + +#[cfg(target_os = "linux")] +pub struct ResolvedStackFrame { + pub virtual_address: usize, + pub physical_address: usize, + pub symbol: String, + pub 
inlined: bool, + pub file: Option, + pub line: Option, + pub column: Option, +} + +#[derive(serde::Serialize, serde::Deserialize, Debug, Clone)] +pub enum StackFrame { + #[cfg(target_os = "linux")] + Ip(usize), + #[cfg(not(target_os = "linux"))] + Backtrace(backtrace::BacktraceFrame), } -fn backtrace_style() -> BacktraceStyle { - static ENABLED: AtomicUsize = AtomicUsize::new(0); - match ENABLED.load(Ordering::Relaxed) { - 1 => return BacktraceStyle::Address, - 2 => return BacktraceStyle::Symbols, - _ => {} +impl Eq for StackFrame {} + +impl PartialEq for StackFrame { + fn eq(&self, other: &Self) -> bool { + #[cfg(target_os = "linux")] + { + let StackFrame::Ip(addr) = &self; + let StackFrame::Ip(other_addr) = &other; + addr == other_addr + } + + #[cfg(not(target_os = "linux"))] + { + let StackFrame::Backtrace(addr) = &self; + let StackFrame::Backtrace(other_addr) = &other; + addr.ip() == other_addr.ip() + } } +} - let backtrace_style = match std::env::var("BACKTRACE_STYLE") { - Ok(style) if style.eq_ignore_ascii_case("ADDRESS") => 1, - _ => 2, - }; +impl Hash for StackFrame { + fn hash(&self, state: &mut H) { + #[cfg(target_os = "linux")] + { + let StackFrame::Ip(addr) = &self; + addr.hash(state); + } - ENABLED.store(backtrace_style, Ordering::Relaxed); - match backtrace_style { - 1 => BacktraceStyle::Address, - _ => BacktraceStyle::Symbols, + #[cfg(not(target_os = "linux"))] + { + let StackFrame::Backtrace(addr) = &self; + addr.ip().hash(state) + } } } -pub fn capture() -> Option { - match enable_rust_backtrace() { - false => None, - true => match backtrace_style() { - BacktraceStyle::Symbols => Some(ErrorCodeBacktrace::Symbols(Arc::new( - backtrace::Backtrace::new(), - ))), - // TODO: get offset address(https://github.com/rust-lang/backtrace-rs/issues/434) - BacktraceStyle::Address => Some(ErrorCodeBacktrace::Address(Arc::new( - backtrace::Backtrace::new_unresolved(), - ))), - }, +// Rewrite the backtrace on linux ELF using gimli-rs. 
+// +// Differences from backtrace-rs[https://github.com/rust-lang/backtrace-rs]: +// - Almost lock-free (backtrace-rs requires large-grained locks or frequent lock operations) +// - Symbol resolution is lazy, only resolved when outputting +// - Cache the all stack frames for the stack, not just a single stack frame +// - Output the physical addresses of the stack instead of virtual addresses, even in the absence of symbols (this will help us use backtraces to get cause in the case of split symbol tables) +// - Output inline functions and marked it +// +// What's different from gimli-addr2line[https://github.com/gimli-rs/addr2line](why not use gimli-addr2line): +// - Use aranges to optimize the lookup of DWARF units (if present) +// - gimli-addr2line caches and sorts the symbol tables to speed up symbol lookup, which would introduce locks and caching (but in reality, symbol lookup is a low-frequency operation in databend, and rapid reconstruction based on mmap is sufficient). +#[derive(Clone, serde::Serialize, serde::Deserialize)] +pub struct StackTrace { + pub(crate) frames: Vec, +} + +impl Eq for StackTrace {} + +impl PartialEq for StackTrace { + fn eq(&self, other: &Self) -> bool { + self.frames == other.frames + } +} + +impl StackTrace { + pub fn capture() -> StackTrace { + let mut frames = Vec::with_capacity(50); + Self::capture_frames(&mut frames); + StackTrace { frames } + } + + pub fn no_capture() -> StackTrace { + StackTrace { frames: vec![] } + } + + #[cfg(not(target_os = "linux"))] + fn capture_frames(frames: &mut Vec) { + unsafe { + backtrace::trace_unsynchronized(|frame| { + frames.push(StackFrame::Backtrace(backtrace::BacktraceFrame::from( + frame.clone(), + ))); + frames.len() != frames.capacity() + }); + } + } + + #[cfg(target_os = "linux")] + fn capture_frames(frames: &mut Vec) { + // Safety: + unsafe { + backtrace::trace_unsynchronized(|frame| { + frames.push(StackFrame::Ip(frame.ip() as usize)); + frames.len() != frames.capacity() + }); + } + } + 
+ #[cfg(not(target_os = "linux"))] + fn fmt_frames(&self, display_text: &mut String, address: bool) -> std::fmt::Result { + let mut frames = std::vec::Vec::with_capacity(self.frames.len()); + for frame in &self.frames { + let StackFrame::Backtrace(frame) = frame; + frames.push(frame.clone()); + } + + let mut backtrace = backtrace::Backtrace::from(frames); + + if !address { + backtrace.resolve(); + } + + writeln!(display_text, "{:?}", backtrace) + } + + #[cfg(target_os = "linux")] + fn fmt_frames(&self, f: &mut String, address: bool) -> std::fmt::Result { + let mut idx = 0; + crate::elf::LibraryManager::instance().resolve_frames(&self.frames, address, |frame| { + write!(f, "{:4}: {}", idx, frame.symbol)?; + + if frame.inlined { + write!(f, "[inlined]")?; + } else if frame.physical_address != frame.virtual_address { + write!(f, "@{:x}", frame.physical_address)?; + } + + #[allow(clippy::writeln_empty_string)] + writeln!(f, "")?; + + if let Some(file) = frame.file { + write!(f, " at {}", file)?; + + if let Some(line) = frame.line { + write!(f, ":{}", line)?; + + if let Some(column) = frame.column { + write!(f, ":{}", column)?; + } + } + + #[allow(clippy::writeln_empty_string)] + writeln!(f, "")?; + } + + idx += 1; + Ok(()) + }) + } +} + +#[allow(clippy::type_complexity)] +static STACK_CACHE: LazyLock, Arc>>>>>> = + LazyLock::new(|| RwLock::new(HashMap::new())); + +impl Debug for StackTrace { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + if self.frames.is_empty() { + return writeln!(f, ""); + } + + let mut display_text = { + let read_guard = STACK_CACHE.read().unwrap_or_else(PoisonError::into_inner); + read_guard.get(&self.frames).cloned() + }; + + if display_text.is_none() { + let mut guard = STACK_CACHE.write().unwrap_or_else(PoisonError::into_inner); + + display_text = Some(match guard.entry(self.frames.clone()) { + Entry::Occupied(v) => v.get().clone(), + Entry::Vacant(v) => v.insert(Arc::new(Mutex::new(None))).clone(), + }); + } + + let 
display_text_lock = display_text.as_ref().unwrap(); + let mut display_guard = display_text_lock + .lock() + .unwrap_or_else(PoisonError::into_inner); + + if display_guard.is_none() { + let mut display_text = String::new(); + + self.fmt_frames(&mut display_text, !enable_rust_backtrace())?; + *display_guard = Some(Arc::new(display_text)); + } + + let display_text = display_guard.as_ref().unwrap().clone(); + drop(display_guard); + + writeln!(f, "{}", display_text)?; + Ok(()) } } diff --git a/src/common/exception/src/exception_into.rs b/src/common/exception/src/exception_into.rs index cb18b0e451cc..a8f5a9e686e1 100644 --- a/src/common/exception/src/exception_into.rs +++ b/src/common/exception/src/exception_into.rs @@ -16,15 +16,14 @@ use std::error::Error; use std::fmt::Debug; use std::fmt::Display; use std::fmt::Formatter; -use std::sync::Arc; use databend_common_ast::Span; use geozero::error::GeozeroError; -use crate::exception::ErrorCodeBacktrace; use crate::exception_backtrace::capture; use crate::ErrorCode; use crate::ErrorFrame; +use crate::StackTrace; #[derive(thiserror::Error)] enum OtherErrors { @@ -279,7 +278,7 @@ pub struct SerializedError { pub name: String, pub message: String, pub span: Span, - pub backtrace: String, + pub backtrace: StackTrace, pub stacks: Vec, } @@ -296,7 +295,7 @@ impl From<&ErrorCode> for SerializedError { name: e.name(), message: e.message(), span: e.span(), - backtrace: e.backtrace_str(), + backtrace: e.backtrace.clone(), stacks: e.stacks().iter().map(|f| f.into()).collect(), } } @@ -304,19 +303,13 @@ impl From<&ErrorCode> for SerializedError { impl From<&SerializedError> for ErrorCode { fn from(se: &SerializedError) -> Self { - let backtrace = match se.backtrace.len() { - 0 => None, - _ => Some(ErrorCodeBacktrace::Serialized(Arc::new( - se.backtrace.clone(), - ))), - }; ErrorCode::create( se.code, se.name.clone(), se.message.clone(), String::new(), None, - backtrace, + se.backtrace.clone(), ) .set_span(se.span) 
.set_stacks(se.stacks.iter().map(|f| f.into()).collect()) @@ -383,28 +376,15 @@ impl From for ErrorCode { } match serde_json::from_slice::(details) { Err(error) => ErrorCode::from(error), - Ok(serialized_error) => match serialized_error.backtrace.len() { - 0 => ErrorCode::create( - serialized_error.code, - serialized_error.name, - serialized_error.message, - String::new(), - None, - None, - ) - .set_span(serialized_error.span), - _ => ErrorCode::create( - serialized_error.code, - serialized_error.name, - serialized_error.message, - String::new(), - None, - Some(ErrorCodeBacktrace::Serialized(Arc::new( - serialized_error.backtrace, - ))), - ) - .set_span(serialized_error.span), - }, + Ok(serialized_error) => ErrorCode::create( + serialized_error.code, + serialized_error.name, + serialized_error.message, + String::new(), + None, + serialized_error.backtrace, + ) + .set_span(serialized_error.span), } } _ => ErrorCode::Unimplemented(status.to_string()), @@ -414,18 +394,16 @@ impl From for ErrorCode { impl From for tonic::Status { fn from(err: ErrorCode) -> Self { - let error_json = serde_json::to_vec::(&SerializedError { + let serialized_error = SerializedError { code: err.code(), name: err.name(), message: err.message(), span: err.span(), - backtrace: { - let mut str = err.backtrace_str(); - str.truncate(2 * 1024); - str - }, stacks: err.stacks().iter().map(|f| f.into()).collect(), - }); + backtrace: err.backtrace, + }; + + let error_json = serde_json::to_vec::(&serialized_error); match error_json { Ok(serialized_error_json) => { @@ -433,7 +411,7 @@ impl From for tonic::Status { // To distinguish from that, we use Code::Unknown here tonic::Status::with_details( tonic::Code::Unknown, - err.message(), + serialized_error.message.clone(), serialized_error_json.into(), ) } diff --git a/src/common/exception/src/lib.rs b/src/common/exception/src/lib.rs index 548d1d02ae30..62332d2d4103 100644 --- a/src/common/exception/src/lib.rs +++ b/src/common/exception/src/lib.rs @@ -14,7 
+14,11 @@ #![allow(clippy::uninlined_format_args)] +extern crate core; + mod context; +#[cfg(target_os = "linux")] +mod elf; pub mod exception; mod exception_backtrace; mod exception_code; @@ -28,5 +32,6 @@ pub use exception::ErrorCode; pub use exception::Result; pub use exception::ToErrorCode; pub use exception_backtrace::set_backtrace; +pub use exception_backtrace::StackTrace; pub use exception_backtrace::USER_SET_ENABLE_BACKTRACE; pub use exception_into::SerializedError; diff --git a/src/common/exception/tests/it/exception_flight.rs b/src/common/exception/tests/it/exception_flight.rs index cf360a397b83..59db53a0404e 100644 --- a/src/common/exception/tests/it/exception_flight.rs +++ b/src/common/exception/tests/it/exception_flight.rs @@ -12,13 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::sync::Arc; - use arrow_flight::FlightData; -use backtrace::Backtrace; -use databend_common_exception::exception::ErrorCodeBacktrace; use databend_common_exception::ErrorCode; use databend_common_exception::Result; +use databend_common_exception::StackTrace; #[test] fn test_serialize() -> Result<()> { @@ -28,7 +25,7 @@ fn test_serialize() -> Result<()> { String::from("test_message"), String::new(), None, - Some(ErrorCodeBacktrace::Symbols(Arc::new(Backtrace::new()))), + StackTrace::capture(), ) .set_span(Some((0..1).into())); let backtrace_str = error_code.backtrace_str(); diff --git a/src/query/pipeline/core/src/processors/profile.rs b/src/query/pipeline/core/src/processors/profile.rs index 5a61e82ceb07..f3f776601fd6 100644 --- a/src/query/pipeline/core/src/processors/profile.rs +++ b/src/query/pipeline/core/src/processors/profile.rs @@ -26,6 +26,7 @@ use databend_common_base::runtime::profile::Profile; use databend_common_base::runtime::profile::ProfileLabel; use databend_common_base::runtime::profile::ProfileStatisticsName; use databend_common_exception::ErrorCode; +use 
databend_common_exception::StackTrace; pub struct PlanScopeGuard { idx: usize, @@ -54,7 +55,7 @@ impl Drop for PlanScopeGuard { pub struct ErrorInfoDesc { message: String, detail: String, - backtrace: String, + backtrace: StackTrace, } impl ErrorInfoDesc { @@ -62,7 +63,7 @@ impl ErrorInfoDesc { ErrorInfoDesc { message: error.message(), detail: error.detail(), - backtrace: error.backtrace_str(), + backtrace: error.backtrace(), } } } diff --git a/tests/sqllogictests/suites/base/09_fuse_engine/09_0026_merge_into.test b/tests/sqllogictests/suites/base/09_fuse_engine/09_0026_merge_into.test index b56efaf7794d..00eb09c87786 100644 --- a/tests/sqllogictests/suites/base/09_fuse_engine/09_0026_merge_into.test +++ b/tests/sqllogictests/suites/base/09_fuse_engine/09_0026_merge_into.test @@ -1,7 +1,1438 @@ statement ok set enable_distributed_merge_into = 1; -include ./09_0036_merge_into_without_distributed_enable.test +statement ok +set enable_experimental_merge_into = 1; + +statement ok +drop database if exists db; + +statement ok +create database db; + +statement ok +use db; + +statement ok +drop table if exists t1; + +statement ok +drop table if exists t2; + +statement ok +create table t1(a int,b string, c string) cluster by(a,b); + +statement ok +create table t2(a int,b string, c string) cluster by(a,b); + +statement ok +insert into t1 values(1,'b1','c1'),(2,'b2','c2'); + +statement ok +insert into t1 values(2,'b3','c3'),(3,'b4','c4'); + +query TTT +select * from t1 order by a,b,c; +---- +1 b1 c1 +2 b2 c2 +2 b3 c3 +3 b4 c4 + +statement ok +insert into t2 values(1,'b_5','c_5'),(3,'b_6','c_6'); + +statement ok +insert into t2 values(2,'b_7','c_7'); + +query TTT +select * from t2 order by a,b,c; +---- +1 b_5 c_5 +2 b_7 c_7 +3 b_6 c_6 + +## test source alias +statement error 1005 +merge into t1 using (select * from t2 ) on t1.a = t2.a when matched then update set t1.c = t2.c,t1.c = t2.c; + +# section I: basic test for match and unmatch + +statement error 1006 +merge into t1 
using (select * from t2 ) as t2 on t1.a = t2.a when matched then update set t1.c = t2.c,t1.c = t2.c; + +query T +merge into t1 using (select * from t2 ) as t2 on t1.a = t2.a when matched then update set t1.c = t2.c; +---- +4 + +query TTT +select * from t1 order by a,b,c; +---- +1 b1 c_5 +2 b2 c_7 +2 b3 c_7 +3 b4 c_6 + +statement ok +insert into t2 values(4,'b_8','c_8'); + +query TTT +select * from t2 order by a,b,c; +---- +1 b_5 c_5 +2 b_7 c_7 +3 b_6 c_6 +4 b_8 c_8 + +query TT +merge into t1 using (select * from t2 ) as t2 on t1.a = t2.a when matched then update set t1.c = t2.c when not matched then insert (a,b,c) values(t2.a,t2.b,t2.c); +---- +1 4 + +query TTT +select * from t1 order by a,b,c; +---- +1 b1 c_5 +2 b2 c_7 +2 b3 c_7 +3 b4 c_6 +4 b_8 c_8 + +statement ok +insert into t2 values(1,'b_9','c_9'); + +statement error 4001 +merge into t1 using (select * from t2 ) as t2 on t1.a = t2.a when matched then update set t1.c = t2.c when not matched then insert (a,b,c) values(t2.a,t2.b,t2.c); + +query TTT +select * from t1 order by a,b,c; +---- +1 b1 c_5 +2 b2 c_7 +2 b3 c_7 +3 b4 c_6 +4 b_8 c_8 + +statement ok +delete from t2 where a = 1; + +query TTT +select * from t2 order by a,b,c; +---- +2 b_7 c_7 +3 b_6 c_6 +4 b_8 c_8 + +statement ok +insert into t2 values(5,'b_9','c_9'); + +query TTT +select * from t2 order by a,b,c; +---- +2 b_7 c_7 +3 b_6 c_6 +4 b_8 c_8 +5 b_9 c_9 + +query TT +merge into t1 using (select * from t2 ) as t2 on t1.a = t2.a when matched then delete; +---- +4 + +query ITT +select * from t1 order by a,b,c; +---- +1 b1 c_5 + +# section 2 multi clauses +statement ok +insert into t1 values(2,'b_1','c_1'),(3,'b_2','c_2'); + +query TTT +select * from t1 order by a,b,c; +---- +1 b1 c_5 +2 b_1 c_1 +3 b_2 c_2 + +statement error 1005 +merge into t1 using (select * from t2 ) as t2 on t1.a = t2.a when matched then delete when matched then update set t1.c = t2.c when not matched and t2.c = 'c_8' then insert (a,b,c) values(t2.a,t2.b,t2.c); + +query TTT +merge 
into t1 using (select * from t2 ) as t2 on t1.a = t2.a when matched and t1.b = 'b_1' then delete when matched then update set t1.c = t2.c when not matched and t2.c = 'c_8' then insert (a,b,c) values(t2.a,t2.b,t2.c); +---- +1 1 1 + +query TTT +select * from t1 order by a,b,c; +---- +1 b1 c_5 +3 b_2 c_6 +4 b_8 c_8 + +query TT +merge into t1 using (select * from t2 ) as t2 on t1.a = t2.a when matched then delete when not matched and t2.c = 'c_9' then insert (a,b,c) values(t2.a,t2.b,t2.c); +---- +1 2 + +query TTT +select * from t1 order by a,b,c; +---- +1 b1 c_5 +5 b_9 c_9 + +query T +merge into t1 using (select * from t2 ) as t2 on t1.a = t2.a when not matched and t2.c = 'c_8' then insert (a,b) values(t2.a,t2.b) when not matched and t2.c = 'c_7' then insert (a,c) values(t2.a,t2.c); +---- +2 + +query TTT +select * from t1 order by a,b,c; +---- +1 b1 c_5 +2 NULL c_7 +4 b_8 NULL +5 b_9 c_9 + +statement ok +insert into t2 values(5,'b_10','c_10'); + +query TTT +select * from t2 order by a,b,c; +---- +2 b_7 c_7 +3 b_6 c_6 +4 b_8 c_8 +5 b_10 c_10 +5 b_9 c_9 + +statement error 4001 +merge into t1 using (select * from t2 ) as t2 on t1.a = t2.a when matched and t2.c = 'c_9' then update set t1.b = 'b_11' when matched and t2.c = 'c_10' then delete; + +## idempotent delete test +query T +merge into t1 using (select * from t2 ) as t2 on t1.a = t2.a when matched then delete; +---- +3 + +query TTT +select * from t1 order by a,b,c; +---- +1 b1 c_5 + +## test star for merge into +statement ok +truncate table t1; + +statement ok +truncate table t2; + +query I +select count(*) from t1; +---- +0 + +query I +select count(*) from t2; +---- +0 + +statement ok +insert into t1 values(1,'b1','c1'),(2,'b2','c2'); + +query TTT +select * from t1 order by a,b,c; +---- +1 b1 c1 +2 b2 c2 + +statement ok +insert into t2 values(1,'b3','c3'),(3,'b4','c4'); + +query TTT +select * from t2 order by a,b,c; +---- +1 b3 c3 +3 b4 c4 + +## test insert columns mismatch +statement error 1065 +merge into t1 using 
(select * from t2 ) as t2 on t1.a = t2.a when not matched then insert values(t2.a,t2.c); + +query TT +merge into t1 using (select * from t2 ) as t2 on t1.a = t2.a when matched then update * when not matched then insert *; +---- +1 1 + + +query TTT +select * from t1 order by a,b,c; +---- +1 b3 c3 +2 b2 c2 +3 b4 c4 + +## test multi same name for star +statement error 1065 +merge into t1 using (select a,b,c,a from t2 ) as t2 on t1.a = t2.a when matched then update *; + +statement error 1065 +merge into t1 using (select a,b,c,a,b from t2 ) as t2 on t1.a = t2.a when not matched then insert *; + +## stage file test +statement ok +drop table if exists test_stage; + +statement ok +drop table if exists target_table; + +statement ok +create table target_table(a int,b string,c string) cluster by(a,b); + +statement ok +insert into target_table values(1,'a_1','b_1'),(2,'a_2','b_2'); + +query TTT +select * from target_table order by a,b,c; +---- +1 a_1 b_1 +2 a_2 b_2 + +statement ok +create table test_stage(a int,b string,c string) cluster by(a,b); + +statement ok +insert into test_stage values(1,'a1','b1'),(2,'a2','b2'),(3,'a3','b3'); + +query TTT +select * from test_stage order by a,b,c; +---- +1 a1 b1 +2 a2 b2 +3 a3 b3 + +statement ok +drop stage if exists s6_merge_into; + +statement ok +drop stage if exists s7_merge_into; + +statement ok +create stage s6_merge_into FILE_FORMAT = (TYPE = CSV); + +statement ok +remove @s6_merge_into; + +statement ok +copy into @s6_merge_into from (select a,b,c from test_stage order by a,b,c); + +query TTT +select $1,$2,$3 from @s6_merge_into order by $1,$2,$3; +---- +1 a1 b1 +2 a2 b2 +3 a3 b3 + +## test CSV +query TT +merge into target_table using (select $1,$2,$3 from @s6_merge_into) as cdc on cast(cdc.$1 as int) = target_table.a when matched then delete when not matched then insert values(cdc.$1,cdc.$2,cdc.$3); +---- +1 2 + +query TTT +select * from target_table order by a,b,c; +---- +3 a3 b3 + +## test parquet +statement ok +truncate table 
target_table; + +query I +select count(*) from target_table; +---- +0 + +statement ok +create stage s7_merge_into FILE_FORMAT = (TYPE = PARQUET); + +statement ok +remove @s7_merge_into; + +statement ok +copy into @s7_merge_into from (select a,b,c from test_stage order by a,b,c); + +query TTT +select $1,$2,$3 from @s7_merge_into order by $1,$2,$3; +---- +1 a1 b1 +2 a2 b2 +3 a3 b3 + +statement ok +insert into target_table values(1,'a_1','b_1'),(2,'a_2','b_2'); + +query TTT +select * from target_table order by a,b,c; +---- +1 a_1 b_1 +2 a_2 b_2 + +query TT +merge into target_table using (select $1,$2,$3 from @s7_merge_into) as cdc on cdc.$1 = target_table.a when matched then delete when not matched then insert values(cdc.$1,cdc.$2,cdc.$3); +---- +1 2 + +query TTT +select * from target_table order by a,b,c; +---- +3 a3 b3 + +## NULL test, for join, if join_expr result is +## NULL, it will be treated as not matched. +statement ok +truncate table t1; + +statement ok +truncate table t2; + +query I +select count(*) from t1; +---- +0 + +query I +select count(*) from t2; +---- +0 + +statement ok +insert into t1 values(NULL,'b_1','c_1'); + +query TTT +select * from t1 order by a,b,c; +---- +NULL b_1 c_1 + +statement ok +insert into t2 values(1,'b_4','c_4'),(2,'b_2','c_2'),(NULL,'b_3','c_3'); + +query TTT +select * from t2 order by a,b,c; +---- +1 b_4 c_4 +2 b_2 c_2 +NULL b_3 c_3 + +query TT +merge into t1 using (select * from t2) as t2 on t1.a = t2.a when matched then delete when not matched then insert *; +---- +3 0 + +query TTT +select * from t1 order by a,b,c; +---- +1 b_4 c_4 +2 b_2 c_2 +NULL b_1 c_1 +NULL b_3 c_3 + +query T +merge into t1 using (select * from t2) as t2 on t1.a = t2.a when matched then delete; +---- +2 + +query TTT +select * from t1 order by a,b,c; +---- +NULL b_1 c_1 +NULL b_3 c_3 + +statement ok +truncate table t1; + +statement ok +truncate table t2; + +query I +select count(*) from t1; +---- +0 + +query I +select count(*) from t2; +---- +0 + +## test 
target table alias +statement ok +insert into t2 values(1,'a1','b1'); + +query TT +merge into t1 as t3 using (select * from t2 ) as t2 on t3.a = t2.a when not matched then insert (a,b,c) values(t2.a,t2.b,t2.c); +---- +1 + +query TTT +select * from t1 order by a,b,c; +---- +1 a1 b1 + +statement ok +drop table if exists employees; + +statement ok +drop table if exists salaries; + +statement ok +CREATE TABLE employees (employee_id INT, employee_name VARCHAR(255),department VARCHAR(255)) cluster by(employee_id,employee_name); + +statement ok +drop table if exists salaries; + +statement ok +CREATE TABLE salaries (employee_id INT,salary DECIMAL(10, 2)) cluster by(employee_id,salary); + +statement ok +INSERT INTO employees VALUES(1, 'Alice', 'HR'),(2, 'Bob', 'IT'),(3, 'Charlie', 'Finance'),(4, 'David', 'HR'); + +statement ok +INSERT INTO salaries VALUES(1, 50000.00),(2, 60000.00); + +query TT +MERGE INTO salaries USING (SELECT * FROM employees) as employees ON salaries.employee_id = employees.employee_id WHEN MATCHED AND employees.department = 'HR' THEN UPDATE SET salaries.salary = salaries.salary + 1000.00 WHEN MATCHED THEN UPDATE SET salaries.salary = salaries.salary + 500.00 WHEN NOT MATCHED THEN INSERT (employee_id, salary) VALUES (employees.employee_id, 55000.00); +---- +2 2 + +query TTT +select * from salaries order by employee_id; +---- +1 51000.00 +2 60500.00 +3 55000.00 +4 55000.00 + +statement ok +drop table if exists t1_target; + +## null cast bug fix +statement ok +drop table if exists t1_target; + +statement ok +drop table if exists t2_source; + +statement ok +create table t1_target(a int not null) cluster by(a); + +statement ok +drop table if exists t2_source; + +statement ok +create table t2_source(a int not null) cluster by(a); + +statement ok +insert into t1_target values(1); + +statement ok +insert into t2_source values(1),(2); + +query TT +merge into t1_target using (select * from t2_source) as t2_source on t1_target.a = t2_source.a when matched then 
update * when not matched then insert *; +---- +1 1 + +query T +select * from t1_target order by a; +---- +1 +2 + +statement ok +drop table if exists cluster_target; + +## cluster table test +statement ok +drop table if exists cluster_target; + +statement ok +drop table if exists cluster_source; + +statement ok +create table cluster_target(a int,b string,c int) cluster by(a,b); + +statement ok +drop table if exists cluster_source; + +statement ok +create table cluster_source(a int,b string,c int); + +statement ok +insert into cluster_source values(12,'b',1),(1,'a',2),(2,'b',3),(2,'a',4),(3,'a',3); + +## test update indetify error +statement error 1006 +merge into cluster_target as t1 using (select * from cluster_source) as t2 on t1.a = t2.a when matched then update set cluster_target.a = t2.a; + +statement error 1006 +merge into cluster_target as t1 using (select * from cluster_source) as t2 on t1.a = t2.a when matched then update set t2.a = t2.a; + +query TT +merge into cluster_target as t1 using (select * from cluster_source) as t2 on t1.a = t2.a when not matched then insert *; +---- +5 + +# By default setting, all rows merged from `cluster_source` will be resident in a single block of `cluster_target`, +# as table `cluster_target` is clustered by `(a,b)`, the rows inside the one block are assumed to be sorted +# by `(a, b)`, consequently, the result of the following query should be ordered by `(a,b)` without an explicit +# `order by` clause. 
+query TTT +select * from cluster_target; +---- +1 a 2 +2 a 4 +2 b 3 +3 a 3 +12 b 1 + +## add more tests +statement ok +drop table if exists target_test; + +statement ok +drop table if exists source_test; + +statement ok +create table target_test(a int,b string) cluster by(a,b); + +statement ok +insert into target_test values(1,'a'),(2,'b'),(3,'c'); + +statement ok +create table source_test(a int,b string,delete_flag bool) cluster by(a,b); + +statement ok +insert into source_test values(1,'d',true),(2,'e',true),(3,'f',false),(4,'e',true),(5,'f',false); + +############################################################################### +# To avoid flakiness, using different stage names for http and mysql handlers # +# testing of these 2 handlers may be run concurrently, and conflict with each # +# other, leading to flaky tests. # +############################################################################### + +onlyif mysql +statement ok +drop stage if exists source_parquet2; + +onlyif mysql +statement ok +create stage source_parquet2 file_format = (type = parquet); + +onlyif mysql +statement ok +remove @source_parquet2; + +onlyif mysql +statement ok +copy into @source_parquet2 from (select * from source_test); + +onlyif mysql +query TTT +merge into `target_test` as tt using (select `a`,`b`,`delete_flag` from @source_parquet2 (pattern => '.*[.]parquet')) as ss on (ss.`a` = tt.`a`) +when matched and ss.`delete_flag` = true then delete when matched then update * when not matched and ss.`delete_flag` = false then insert *; +---- +1 1 2 + + +onlyif http +statement ok +drop stage if exists source_parquet_http2; + +onlyif http +statement ok +create stage source_parquet_http2 file_format = (type = parquet); + +onlyif http +statement ok +remove @source_parquet_http2; + +onlyif http +statement ok +copy into @source_parquet_http2 from (select * from source_test); + +onlyif http +query TTT +merge into `target_test` as tt using (select `a`,`b`,`delete_flag` from 
@source_parquet_http2 (pattern => '.*[.]parquet')) as ss on (ss.`a` = tt.`a`) +when matched and ss.`delete_flag` = true then delete when matched then update * when not matched and ss.`delete_flag` = false then insert *; +---- +1 1 2 + +query TT +select * from target_test order by a; +---- +3 f +5 f + +## test not match cast and predicate index +statement ok +drop table if exists test_order; + +statement ok +drop table if exists random_source; + +statement ok +create table test_order(id bigint, id1 bigint, id2 bigint, id3 bigint, id4 bigint, id5 bigint, id6 bigint, id7 bigint, s1 varchar, s2 varchar, s3 varchar, s4 varchar, s5 varchar, s6 varchar, s7 varchar, s8 varchar, s9 varchar, s10 varchar, s11 varchar, s12 varchar, s13 varchar, d1 DECIMAL(20, 8), d2 DECIMAL(20, 8), d3 DECIMAL(20, 8), d4 DECIMAL(20, 8), d5 DECIMAL(20, 8), d6 DECIMAL(30, 8), d7 DECIMAL(30, 8), d8 DECIMAL(30, 8), d9 DECIMAL(30, 8), d10 DECIMAL(30, 8),insert_time datetime, insert_time1 datetime, insert_time2 datetime, insert_time3 datetime,i int) CLUSTER BY(to_yyyymmdd(insert_time), id) bloom_index_columns='insert_time,id'; + +statement ok +create table random_source(id bigint not null, id1 bigint, id2 bigint, id3 bigint, id4 bigint, id5 bigint, id6 bigint, id7 bigint,s1 varchar, s2 varchar, s3 varchar, s4 varchar, s5 varchar, s6 varchar, s7 varchar, s8 varchar, s9 varchar, s10 varchar, s11 varchar, s12 varchar, s13 varchar,d1 DECIMAL(20, 8), d2 DECIMAL(20, 8), d3 DECIMAL(20, 8), d4 DECIMAL(20, 8), d5 DECIMAL(20, 8), d6 DECIMAL(30, 8), d7 DECIMAL(30, 8), d8 DECIMAL(30, 8), d9 DECIMAL(30, 8), d10 DECIMAL(30, 8),insert_time datetime not null, insert_time1 datetime, insert_time2 datetime, insert_time3 datetime,i int) Engine = Random; + +statement ok +merge into test_order as t using (select id,34 as id1,238 as id2, id3, id4, id5, id6, id7,s1, s2, s3, s4, s5, s6, s7, s8, s9, s10, s11, s12, s13,d1, d2, d3, d4, d5, d6, d7, d8, d9, d10,insert_time,insert_time1,insert_time2,insert_time3,i from 
random_source limit 1) as s on t.id = s.id and t.insert_time = s.insert_time when matched then update * when not matched then insert *; + +## test update list #13297 +statement ok +drop table if exists t11 + +statement ok +drop table if exists t12 + +statement ok +create table t11(a int,b string, c string) cluster by(a,b); + +statement ok +create table t12(a int,b string, c string) cluster by(a,b); + +statement ok +insert into t11 values(1,'b1','c1'),(2,'b2','c2'); + +statement ok +insert into t12 values(1,'b_5','c_5'),(3,'b_6','c_6'); + +statement error 1065 +merge into t11 using (select a, c from t12) as t12 on t11.a = t12.a when matched and max(t11.a) > 0 then update set c = t12.c; + +statement error 1065 +merge into t11 using (select a, c from t12) as t12 on t11.a = t12.a when matched then update set c = count(*); + +## test issue #13287 +statement ok +drop table if exists tt1 + +statement ok +create table tt1 (a int, b int) cluster by(a,b); + +statement error 1065 +merge into tt1 using(select 10, 20) as tt2 on tt1.a = 1 when not matched and tt1.b = 2 then insert values (10, 20); + +query TT +merge into tt1 using(select 10 as a, 20 as b) as tt2 on tt1.a = 1 when not matched and tt2.b = 2 then insert values (10, 20); +---- +0 + +query T +select count(*) from tt1; +---- +0 + +## test issue #13367 +statement ok +drop table if exists tt2 + +statement ok +create table tt2(a bool, b variant, c map(string, string)) cluster by(a); + +statement ok +insert into tt2 values (true, '10', {'k1':'v1'}), (false, '20', {'k2':'v2'}) + +query T +merge into tt2 using(select true as x) as t on (x and tt2.a) when matched and tt2.a then update set tt2.b = parse_json('30'); +---- +1 + +query TTT +select a, b, c from tt2 order by b; +---- +0 20 {'k2':'v2'} +1 30 {'k1':'v1'} + +## add test: source is table +statement ok +drop table if exists t1; + +statement ok +drop table if exists t2; + +statement ok +create table t1(a int) cluster by(a); + +statement ok +create table t2(a int) 
cluster by(a); + +statement ok +insert into t1 values(1); + +statement ok +insert into t2 values(1),(2); + +query TT +merge into t1 using t2 on t1.a = t2.a when matched then delete when not matched then insert *; +---- +1 1 + +query T +select * from t1; +---- +2 + +statement ok +drop table if exists t1; + +statement ok +drop table if exists t2; + +statement ok +create table t1(b int) cluster by(b); + +statement ok +create table t2(a int) cluster by(a); + +statement ok +insert into t1 values(1); + +statement ok +insert into t2 values(1),(2); + +statement error 1065 +merge into t1 using t2 on t1.a = t2.a when matched then delete when not matched then insert *; + +## add more multi matched statement test +statement ok +drop table if exists t1; + +statement ok +drop table if exists t2; + +statement ok +create table t1(a int,b string,c bool) cluster by(a,b); + +statement ok +create table t2(a int,b string,c bool) cluster by(a,b); + +statement ok +insert into t1 values(1,'a1',true),(2,'a2',false),(3,'a3',true); + +statement ok +insert into t2 values(1,'b1',true),(2,'b2',false),(3,'b3',true); + +query TTT +select * from t1; +---- +1 a1 1 +2 a2 0 +3 a3 1 + +query TTT +select * from t2; +---- +1 b1 1 +2 b2 0 +3 b3 1 + +query TT +merge into t1 using t2 on t1.a = t2.a when matched and t1.a = 1 then delete when matched and t1.a = 2 then update * when matched and t1.a = 3 then delete; +---- +1 2 + +query TTT +select * from t1; +---- +2 b2 0 + +query T +merge into t1 using t2 on t1.a = t2.a when matched then delete; +---- +1 + +query T +select count(*) from t1; +---- +0 + +statement ok +insert into t1 values(1,'a1',true),(2,'a2',false),(3,'a3',true); + +query TT +merge into t1 using t2 on t1.a = t2.a when matched and t1.a = 2 then update * when matched and t1.a = 1 then delete when matched and t1.a = 3 then update *; +---- +2 1 + +query TTT +select * from t1; +---- +2 b2 0 +3 b3 1 + +## issue 13454 +statement ok +drop table if exists tt1; + +statement ok +create table tt1(a 
bool, b int) cluster by(a,b); + +statement ok +insert into tt1 values (true, 1), (false, 2); + +query T +merge into tt1 using (select 1 as x) as tt2 on (2 > 1) when matched and a then delete; +---- +1 + +query TT +select * from tt1; +---- +0 2 + +## issue #13298 +statement ok +drop table if exists t11; + +statement ok +drop table if exists t12; + +statement ok +create table t12 (a int, b int) cluster by(a,b); + +statement ok +create table t11 (a int, b int) cluster by(a,b); + +statement ok +insert into t11 values (1, 10),(2, 20),(3, 30),(4, 40); + +statement ok +insert into t12 values (1, 10),(2, 20),(3, 30),(4, 40); + +query T +MERGE INTO t11 USING(SELECT NULL AS c0 FROM t12) AS t12 ON (t11.a OR TRUE) WHEN MATCHED AND TRUE THEN DELETE; +---- +4 + +query T +select count(*) from t11; +---- +0 + +## test issue #13732 +statement ok +CREATE TABLE orders CLUSTER BY (to_yyyymmddhh(created_at), user_id) AS SELECT + number % 5000 AS order_id, + number % 10000 AS user_id, + CASE WHEN (rand() * 10)::int % 2 = 0 THEN 'buy' + ELSE 'sell' + END AS order_type, + CASE WHEN (rand() * 10)::int % 3 = 0 THEN 'BTC' + WHEN (rand() * 10)::int % 3 = 1 THEN 'ETH' + ELSE 'XRP' + END AS asset_type, + (rand() * 100)::decimal(18, 8) AS quantity, + (rand() * 1000)::decimal(18, 8) AS price, + CASE WHEN (rand() * 10)::int % 3 = 0 THEN 'completed' + WHEN (rand() * 10)::int % 3 = 1 THEN 'pending' + ELSE 'cancelled' + END AS status, + date_add('day', floor(rand() * 10 % 365)::int, '2021-01-01') AS created_at, + date_add('day', floor(rand() * 10 % 365)::int, '2021-01-01') AS updated_at +FROM numbers(5000); + +### for now, we disable target_table_optimization for native. Native will +### spilt one block into multi pages. We should fix this one in the future. 
+statement ok +MERGE INTO orders USING +( + SELECT + number % 5000 AS order_id, + number % 100000 AS user_id, + CASE WHEN (rand() * 10)::int % 2 = 0 THEN 'buy' + ELSE 'sell' + END AS order_type, + CASE WHEN (rand() * 10)::int % 3 = 0 THEN 'BTC' + WHEN (rand() * 10)::int % 3 = 1 THEN 'ETH' + ELSE 'XRP' + END AS asset_type, + (rand() * 100)::decimal(18, 8) AS quantity, + (rand() * 1000)::decimal(18, 8) AS price, + CASE WHEN (rand() * 10)::int % 3 = 0 THEN 'completed' + WHEN (rand() * 10)::int % 3 = 1 THEN 'pending' + ELSE 'cancelled' + END AS status, + date_add('day', floor(rand() * 10 % 365)::int, '2021-01-01') AS created_at, + date_add('day', floor(rand() * 10 % 365)::int, '2021-01-01') AS updated_at + FROM numbers(5000) +) AS source +ON orders.order_id = source.order_id +WHEN MATCHED THEN + UPDATE SET + orders.user_id = source.user_id, + orders.order_type = source.order_type, + orders.asset_type = source.asset_type, + orders.quantity = source.quantity, + orders.price = source.price, + orders.status = source.status, + orders.created_at = source.created_at, + orders.updated_at = source.updated_at +WHEN NOT MATCHED THEN + INSERT (order_id, user_id, order_type, asset_type, quantity, price, status, created_at, updated_at) + VALUES (source.order_id, source.user_id, source.order_type, source.asset_type, source.quantity, source.price, source.status, source.created_at, source.updated_at); + +## test issue #13733 +statement ok +CREATE TABLE transactions CLUSTER BY (to_yyyymmddhh(transaction_time), user_id) AS SELECT + number % 1000000 AS transaction_id, + number % 100000 AS user_id, + CASE WHEN (rand() * 10)::int % 3 = 0 THEN 'deposit' + WHEN (rand() * 10)::int % 3 = 1 THEN 'withdrawal' + ELSE 'trade' +END AS transaction_type, + CASE WHEN (rand() * 10)::int % 3 = 0 THEN 'BTC' + WHEN (rand() * 10)::int % 3 = 1 THEN 'ETH' + ELSE 'XRP' +END AS asset_type, + (rand() * 100)::decimal(18, 8) AS quantity, + date_add('day', floor(rand() * 10 % 365)::int, '2021-01-01') AS 
transaction_time +FROM numbers(1000000); + +statement ok +MERGE INTO orders AS tt USING +( + SELECT + CASE + WHEN number % 2 = 0 THEN (number / 2) % 250000 + ELSE (SELECT MAX(order_id) FROM orders) + number + 1 + END AS order_id, + number % 100000 AS user_id, + CASE WHEN (rand() * 10)::int % 2 = 0 THEN 'buy' + ELSE 'sell' + END AS order_type, + CASE WHEN (rand() * 10)::int % 3 = 0 THEN 'BTC' + WHEN (rand() * 10)::int % 3 = 1 THEN 'ETH' + ELSE 'XRP' + END AS asset_type, + (rand() * 100)::decimal(18, 8) AS quantity, + (rand() * 1000)::decimal(18, 8) AS price, + CASE WHEN (rand() * 10)::int % 3 = 0 THEN 'completed' + WHEN (rand() * 10)::int % 3 = 1 THEN 'pending' + ELSE 'cancelled' + END AS status, + date_add('day', floor(rand() * 10 % 365)::int, '2021-01-01') AS created_at, + date_add('day', floor(rand() * 10 % 365)::int, '2021-01-01') AS updated_at, + CASE WHEN number % 2 = 0 THEN false ELSE true END AS is_delete + FROM numbers(5000) +) AS ss +ON (tt.user_id = ss.user_id AND tt.asset_type = ss.asset_type) +WHEN MATCHED AND ss.is_delete = true THEN + DELETE +WHEN MATCHED AND ss.is_delete = false THEN + UPDATE * WHEN NOT MATCHED THEN + INSERT *; + +## unsupport complex exprs for now. 
+## #13798 we need to support non-correlated-subquery for unmatched values exprs +statement error 1065 +MERGE INTO orders USING ( + SELECT t.user_id, t.asset_type, 'buy' AS synthetic_order_type, SUM(t.quantity) AS total_quantity, today() AS synthetic_date + FROM transactions t + WHERE t.transaction_type = 'deposit' + GROUP BY t.user_id, t.asset_type + HAVING SUM(t.quantity) > 100 +) AS synthetic_orders ON orders.user_id = synthetic_orders.user_id AND orders.asset_type = synthetic_orders.asset_type +WHEN NOT MATCHED THEN + INSERT (order_id, user_id, order_type, asset_type, quantity, price, status, created_at, updated_at) + VALUES ((SELECT MAX(order_id) FROM orders) + 1, synthetic_orders.user_id, synthetic_orders.synthetic_order_type, synthetic_orders.asset_type, synthetic_orders.total_quantity, 0, 'pending', synthetic_orders.synthetic_date, synthetic_orders.synthetic_date); + +## issue #13810: rewrite rule test +statement ok +DROP TABLE IF EXISTS orders; + +statement ok +CREATE TABLE orders ( + order_id INT NOT NULL, + user_id INT NOT NULL, + order_type VARCHAR NOT NULL, + asset_type VARCHAR NOT NULL, + quantity DECIMAL(18,8) NOT NULL, + price DECIMAL(18,8) NOT NULL, + status VARCHAR NOT NULL, + created_at DATE NOT NULL, + updated_at DATE NOT NULL +) row_per_block=5113; + +statement ok +insert into orders values(200007,7,'buy','BTC',4.81412194,48.14121943,'completed',to_date('2021-01-01'),to_date('2021-01-01')), +(200015,15,'buy','BTC',3.78463552,37.84635523,'completed',to_date('2021-01-01'),to_date('2021-01-01')), +(200019,19,'buy','BTC',1.61186913,16.11869132,'completed',to_date('2021-01-01'),to_date('2021-01-01')), +(200031,31,'buy','BTC',3.99013730,39.90137297,'completed',to_date('2021-01-01'),to_date('2021-01-01')), +(200047,47,'buy','BTC',0.98841829,9.88418289,'completed',to_date('2021-01-01'),to_date('2021-01-01')), +(200077,77,'buy','BTC',2.07360391,20.73603908,'completed',to_date('2021-01-01'),to_date('2021-01-01')), 
+(200087,87,'sell','ETH',9.64567442,96.45674419,'pending',to_date('2021-01-01'),to_date('2021-01-01')), +(200095,95,'buy','BTC',2.26686563,22.66865634,'completed',to_date('2021-01-01'),to_date('2021-01-01')), +(200098,98,'buy','BTC',1.37252960,13.72529599,'completed',to_date('2021-01-01'),to_date('2021-01-01')), +(200102,102,'buy','BTC',1.53596481,15.35964815,'completed',to_date('2021-01-01'),to_date('2021-01-01')); + +query T +MERGE INTO orders USING ( + SELECT o.order_id, o.user_id, o.order_type, o.asset_type, o.quantity + a.avg_quantity AS new_quantity, o.price, o.status, o.created_at, o.updated_at + FROM orders o + INNER JOIN ( + SELECT user_id, asset_type, sum(quantity) AS avg_quantity + FROM orders + GROUP BY user_id, asset_type + ) a ON o.user_id = a.user_id AND o.asset_type = a.asset_type +) AS joined_data ON orders.order_id = joined_data.order_id + WHEN MATCHED THEN + UPDATE SET orders.quantity = joined_data.new_quantity; +---- +10 + +query TTTT +SELECT SUM(quantity) AS total_quantity, + AVG(quantity) AS average_quantity, + MIN(quantity) AS min_quantity, + MAX(quantity) AS max_quantity +FROM orders; +---- +64.16764110 6.416764110000 1.97683658 19.29134884 + +statement ok +create table tb_01 (id int,c1 varchar,c2 datetime(0),c3 json) cluster by(c1,c2); + +statement ok +create table tmp_01 like tb_01; + +statement ok +insert into tmp_01 values(1,'abc',to_date('2023-11-29'),parse_json('{"a":1}')); + +query TT +merge into tb_01 as T using ( select * from tmp_01) as S on t.id = s.id when matched then update * when not matched then insert *; +---- +1 0 + +query TTT +select id,c1,to_date(c2),c3 from tb_01; +---- +1 abc 2023-11-29 {"a":1} + +## test #issue13932 +statement ok +create table null_target(a int not null,b text) cluster by(a,b); + +statement ok +create table null_source(a int not null,b text) cluster by(a,b); + +statement ok +insert into null_target values(1,'a1'); + +statement ok +insert into null_target values(2,'a2'); + +statement ok +insert into 
null_source values(1,'a3'); + +statement ok +insert into null_source values(3,'a4'); + +statement error 1006 +merge into null_target using null_source on null_target.a = null_source.a when matched then update * +when not matched then insert (b) values(null_source.b); + +statement ok +delete from null_source where a = 3; + +query TT +merge into null_target using null_source on null_target.a = null_source.a when matched then update * +when not matched then insert (b) values(null_source.b); +---- +0 1 + +query TT +select * from null_target order by a,b; +---- +1 a3 +2 a2 + +## issue#13972 +statement ok +create table tt1_(a bool not null, b int not null, c int not null); + +statement ok +insert into tt1_ values(true, 10, 11),(false, 20, 21); + +query TT +MERGE INTO tt1_ USING + (SELECT + 657 AS cc0, + 658 AS cc1 + ) AS tRIA7K(cc0, cc1) ON ( + cc0 < cc1) + WHEN MATCHED AND FALSE THEN UPDATE SET + a = FALSE, + b = 332366211 + WHEN MATCHED AND a THEN DELETE + WHEN NOT MATCHED AND TRUE THEN INSERT (b, c) VALUES(10, 20); +---- +0 0 1 + +query TTT +select * from tt1_; +---- +0 20 21 + +## issue#14474 +statement ok +create table target_tt1 (a bool not null default true, b int not null default 1); + +query T +merge into target_tt1 using(select false, 10) as tt2(a, b) on (target_tt1.b = 1) when not matched then insert (b) values (20); +---- +1 + +query TT +select * from target_tt1; +---- +1 20 + +## test multi insert clauses with specified default values +statement ok +create table target_default_values(a int default 12,b string default 'yes'); + +statement ok +create table source_default_values(a int default 12,b string default 'yes'); + +statement ok +insert into target_default_values values(1,'a'); + +statement ok +insert into target_default_values values(2,'b'); + +statement ok +insert into source_default_values values(1,'c'); + +statement ok +insert into source_default_values values(3,'d'); + +statement ok +insert into source_default_values values(2,'e'); + +statement ok 
+insert into source_default_values values(4,'f'); + +query TTT +merge into target_default_values as t1 using source_default_values as t2 on t1.a = t2.a when matched and t1.b = 'a' +then update set t1.b = t2.b when matched then delete when not matched and t2.b = 'd' then insert (a) values(t2.a) when not matched +then insert(b) values(t2.b); +---- +2 1 1 + +query TT +select * from target_default_values order by a,b; +---- +1 c +3 yes +12 f + + +## test update column only optimization +statement ok +drop table if exists column_only_optimization_target; + +statement ok +drop table if exists column_only_optimization_source; + +statement ok +create table column_only_optimization_target(a int,b string); + +statement ok +create table column_only_optimization_source(a int,b string); + +statement ok +insert into column_only_optimization_target values(1,'a1'),(2,'a2'); + +statement ok +insert into column_only_optimization_target values(3,'a3'),(4,'a4'); + +statement ok +insert into column_only_optimization_target values(5,'a5'),(6,'a6'); + +statement ok +insert into column_only_optimization_target values(7,'a7'),(8,'a8'); + +query TT +select * from column_only_optimization_target order by a,b; +---- +1 a1 +2 a2 +3 a3 +4 a4 +5 a5 +6 a6 +7 a7 +8 a8 + +statement ok +insert into column_only_optimization_source values(1,'b1'),(2,'b2'); + +statement ok +insert into column_only_optimization_source values(3,'b3'),(4,'b4'); + +query TT +select * from column_only_optimization_source order by a,b; +---- +1 b1 +2 b2 +3 b3 +4 b4 + +query TT +merge into column_only_optimization_target as t1 using column_only_optimization_source as t2 on +t1.a = t2.a when matched then update set t1.b = t2.b when not matched then insert *; +---- +0 4 + +query TT +select * from column_only_optimization_target order by a,b; +---- +1 b1 +2 b2 +3 b3 +4 b4 +5 a5 +6 a6 +7 a7 +8 a8 + +## add more tests cases for distributed modes. 
+statement ok +CREATE TABLE IF NOT EXISTS lineitem_target_origin_200_blocks1 ( + l_orderkey BIGINT not null, + l_partkey BIGINT not null, + l_suppkey BIGINT not null, + l_linenumber BIGINT not null, + l_quantity DECIMAL(15, 2) not null, + l_extendedprice DECIMAL(15, 2) not null, + l_discount DECIMAL(15, 2) not null, + l_tax DECIMAL(15, 2) not null, + l_returnflag STRING not null, + l_linestatus STRING not null, + l_shipdate DATE not null, + l_commitdate DATE not null, + l_receiptdate DATE not null, + l_shipinstruct STRING not null, + l_shipmode STRING not null, + l_comment STRING not null +) CLUSTER BY(l_shipdate, l_orderkey); + +statement ok +CREATE TABLE IF NOT EXISTS lineitem_target_origin_400_blocks1 ( + l_orderkey BIGINT not null, + l_partkey BIGINT not null, + l_suppkey BIGINT not null, + l_linenumber BIGINT not null, + l_quantity DECIMAL(15, 2) not null, + l_extendedprice DECIMAL(15, 2) not null, + l_discount DECIMAL(15, 2) not null, + l_tax DECIMAL(15, 2) not null, + l_returnflag STRING not null, + l_linestatus STRING not null, + l_shipdate DATE not null, + l_commitdate DATE not null, + l_receiptdate DATE not null, + l_shipinstruct STRING not null, + l_shipmode STRING not null, + l_comment STRING not null +) CLUSTER BY(l_shipdate, l_orderkey); + +statement ok +CREATE TABLE IF NOT EXISTS lineitem_random( + l_orderkey BIGINT not null, + l_partkey BIGINT not null, + l_suppkey BIGINT not null, + l_linenumber BIGINT not null, + l_quantity DECIMAL(15, 2) not null, + l_extendedprice DECIMAL(15, 2) not null, + l_discount DECIMAL(15, 2) not null, + l_tax DECIMAL(15, 2) not null, + l_returnflag STRING not null, + l_linestatus STRING not null, + l_shipdate DATE not null, + l_commitdate DATE not null, + l_receiptdate DATE not null, + l_shipinstruct STRING not null, + l_shipmode STRING not null, + l_comment STRING not null +) engine = random; + +## add 4w rows +statement ok +insert into lineitem_target_origin_400_blocks1 select * from lineitem_random limit 5000; + 
+statement ok
+insert into lineitem_target_origin_400_blocks1 select * from lineitem_random limit 5000;
+
+statement ok
+insert into lineitem_target_origin_400_blocks1 select * from lineitem_random limit 5000;
+
+statement ok
+insert into lineitem_target_origin_400_blocks1 select * from lineitem_random limit 5000;
+
+statement ok
+insert into lineitem_target_origin_400_blocks1 select * from lineitem_random limit 5000;
+
+statement ok
+insert into lineitem_target_origin_400_blocks1 select * from lineitem_random limit 5000;
+
+statement ok
+insert into lineitem_target_origin_400_blocks1 select * from lineitem_random limit 5000;
+
+statement ok
+insert into lineitem_target_origin_400_blocks1 select * from lineitem_random limit 5000;
+
+statement ok
+insert into lineitem_target_origin_400_blocks1 select * from lineitem_random limit 5000;
+
+query T
+select count(*) from lineitem_target_origin_400_blocks1;
+----
+40000
+
+statement ok
+insert into lineitem_target_origin_200_blocks1 select * from lineitem_target_origin_400_blocks1;
+
+query T
+select count(*) from lineitem_target_origin_200_blocks1;
+----
+40000
+
+statement ok
+insert into lineitem_target_origin_400_blocks1 select * from lineitem_random limit 5000;
+
+query T
+select count(*) from lineitem_target_origin_400_blocks1;
+----
+45000
+
+## this may be a flaky test, but most of the time it's normal. 
+query TT +MERGE INTO lineitem_target_origin_400_blocks1 as t1 using lineitem_target_origin_200_blocks1 as t2 on +t1.l_orderkey = t2.l_orderkey and +t1.l_partkey = t2.l_partkey +and t1.l_suppkey = t2.l_suppkey and +t1.l_linenumber = t2.l_linenumber and +t1.l_quantity = t2.l_quantity and +t1.l_extendedprice = t2.l_extendedprice and +t1.l_discount = t2.l_discount +when matched then update * +when not matched then insert *; +---- +0 40000 + +statement ok +set enable_experimental_merge_into = 0; statement ok set enable_distributed_merge_into = 0; diff --git a/tests/suites/1_stateful/02_query/02_0000_kill_query.py b/tests/suites/1_stateful/02_query/02_0000_kill_query.py index f3dc069f6223..590a8821f1cc 100755 --- a/tests/suites/1_stateful/02_query/02_0000_kill_query.py +++ b/tests/suites/1_stateful/02_query/02_0000_kill_query.py @@ -37,7 +37,7 @@ res = mycursor.fetchone() kill_query = "kill query " + str(res[0]) + ";" mycursor.execute(kill_query) - time.sleep(0.5) + time.sleep(1) mycursor.execute( "SELECT * FROM system.processes WHERE extra_info LIKE '%SELECT max(number)%' AND extra_info NOT LIKE '%system.processes%';" )