diff --git a/Cargo.lock b/Cargo.lock
index 1830a3fb05..ce8fd3f313 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1115,7 +1115,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "115e54d64eb62cdebad391c19efc9dce4981c690c85a33a12199d99bb9546fee"
 dependencies = [
  "borsh-derive",
- "hashbrown 0.12.3",
+ "hashbrown 0.13.2",
 ]
 
 [[package]]
@@ -4896,6 +4896,17 @@ dependencies = [
  "slog-global",
 ]
 
+[[package]]
+name = "papergrid"
+version = "0.12.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "c7419ad52a7de9b60d33e11085a0fe3df1fbd5926aa3f93d3dd53afbc9e86725"
+dependencies = [
+ "bytecount",
+ "fnv",
+ "unicode-width",
+]
+
 [[package]]
 name = "parking"
 version = "2.0.0"
@@ -7400,6 +7411,29 @@ dependencies = [
  "time_ext",
 ]
 
+[[package]]
+name = "tabled"
+version = "0.16.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "77c9303ee60b9bedf722012ea29ae3711ba13a67c9b9ae28993838b63057cb1b"
+dependencies = [
+ "papergrid",
+ "tabled_derive",
+]
+
+[[package]]
+name = "tabled_derive"
+version = "0.8.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "bf0fb8bfdc709786c154e24a66777493fb63ae97e3036d914c8666774c477069"
+dependencies = [
+ "heck",
+ "proc-macro-error",
+ "proc-macro2",
+ "quote",
+ "syn 1.0.109",
+]
+
 [[package]]
 name = "tagptr"
 version = "0.2.0"
@@ -7880,8 +7914,10 @@ dependencies = [
  "parquet_ext",
  "runtime",
  "table_engine",
+ "tabled",
  "time_ext",
  "tokio",
+ "wal",
 ]
 
 [[package]]
@@ -8087,7 +8123,7 @@ version = "1.6.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "97fee6b57c6a41524a810daee9286c02d7752c4253064d0b05472833a438f675"
 dependencies = [
- "cfg-if 0.1.10",
+ "cfg-if 1.0.0",
  "rand 0.8.5",
  "static_assertions",
 ]
@@ -8136,9 +8172,9 @@ checksum = "1dd624098567895118886609431a7c3b8f516e41d30e0643f03d94592a147e36"
 
 [[package]]
 name = "unicode-width"
-version = "0.1.10"
+version = "0.1.11"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "c0edd1e5b14653f783770bce4a4dabb4a5108a5370a5f5d8cfe8710c361f6c8b"
+checksum = "e51733f11c9c4f72aa0c160008246859e340b00807569a0da0e7a1079b27ba85"
 
 [[package]]
 name = "untrusted"
diff --git a/Cargo.toml b/Cargo.toml
index b6ca6273dd..1008359eb2 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -183,6 +183,7 @@ sqlparser = { version = "0.39.0", features = ["serde"] }
 system_catalog = { path = "src/system_catalog" }
 table_engine = { path = "src/table_engine" }
 table_kv = { path = "src/components/table_kv" }
+tabled = "0.16.0"
 tempfile = "3.1.0"
 test_util = { path = "src/components/test_util" }
 time_ext = { path = "src/components/time_ext" }
diff --git a/src/tools/Cargo.toml b/src/tools/Cargo.toml
index 1a3231cb8f..76a36afbca 100644
--- a/src/tools/Cargo.toml
+++ b/src/tools/Cargo.toml
@@ -43,5 +43,7 @@ parquet = { workspace = true }
 parquet_ext = { workspace = true }
 runtime = { workspace = true }
 table_engine = { workspace = true }
+tabled = { workspace = true }
 time_ext = { workspace = true }
 tokio = { workspace = true }
+wal = { workspace = true, features = ["wal-local-storage"] }
diff --git a/src/tools/src/bin/wal-reader.rs b/src/tools/src/bin/wal-reader.rs
new file mode 100644
index 0000000000..51b9cd4700
--- /dev/null
+++ b/src/tools/src/bin/wal-reader.rs
@@ -0,0 +1,211 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! A cli to query region segment meta
+
+use std::{fs, path::Path, sync::Arc};
+
+use clap::Parser;
+use runtime::Builder;
+use tabled::{Table, Tabled};
+use wal::local_storage_impl::segment::{Region, RegionManager, SegmentView};
+
+/// Segment size passed to `RegionManager::new` (64 MiB).
+const SEGMENT_SIZE: usize = 64 * 1024 * 1024;
+
+/// Width, in characters, of the dashed rule printed before each segment.
+const SEPARATOR_WIDTH: usize = 94;
+
+#[derive(Parser, Debug)]
+#[clap(author, version, about = "A command line tool to read and display WAL (Write-Ahead Log) segment metadata", long_about = None)]
+struct Args {
+    /// Data directory path
+    #[clap(short, long, default_value = "/tmp/horaedb/wal")]
+    data_dir: String,
+
+    /// Region id
+    #[clap(short = 'r', long, default_value = None)]
+    region_id: Option<u64>,
+
+    /// Segment id
+    #[clap(short = 's', long, default_value = None)]
+    segment_id: Option<u64>,
+
+    /// Table id
+    #[clap(long, default_value = None)]
+    table_id: Option<u64>,
+}
+
+/// One row of the per-segment summary table.
+#[derive(Tabled)]
+struct SegmentInfo {
+    segment_id: u64,
+    min_seq: u64,
+    max_seq: u64,
+    version: u8,
+    current_size: usize,
+    segment_size: usize,
+    number_of_records: usize,
+}
+
+/// One row of a segment's per-table sequence range listing.
+#[derive(Tabled)]
+struct TableInfo {
+    table_id: u64,
+    min_seq: u64,
+    max_seq: u64,
+}
+
+impl SegmentInfo {
+    /// Copies the scalar fields of `stm` into a printable row.
+    fn load(stm: &SegmentView) -> Self {
+        Self {
+            segment_id: stm.id,
+            min_seq: stm.min_seq,
+            max_seq: stm.max_seq,
+            version: stm.version,
+            current_size: stm.current_size,
+            segment_size: stm.segment_size,
+            number_of_records: stm.number_of_records,
+        }
+    }
+}
+
+impl TableInfo {
+    /// Builds one row per entry of `stm.tables`, ordered by table id.
+    ///
+    /// When `table_id` is `Some`, only the entry with that id is kept.
+    fn load(stm: &SegmentView, table_id: &Option<u64>) -> Vec<TableInfo> {
+        let mut rows: Vec<TableInfo> = stm
+            .tables
+            .iter()
+            .filter(|(t_id, _)| table_id.map_or(true, |wanted| wanted == **t_id))
+            .map(|(t_id, (min_seq, max_seq))| TableInfo {
+                table_id: *t_id,
+                min_seq: *min_seq,
+                max_seq: *max_seq,
+            })
+            .collect();
+        // `tables` is a `HashMap`, so sort to keep the output stable across runs.
+        rows.sort_unstable_by_key(|row| row.table_id);
+        rows
+    }
+}
+
+/// Prints the metadata of the segments in `region`.
+///
+/// `segment_id` limits the output to one segment and `table_id` limits the
+/// per-table rows to one table; `None` disables the respective filter.
+fn region_meta_dump(region: Arc<Region>, segment_id: &Option<u64>, table_id: &Option<u64>) {
+    let mut segments = region.meta();
+    // Print segments in ascending id order, whatever order `meta` yields them in.
+    segments.sort_unstable_by_key(|stm| stm.id);
+
+    let selected = segments
+        .iter()
+        .filter(|stm| segment_id.map_or(true, |wanted| wanted == stm.id));
+    for stm in selected {
+        println!("{}", "-".repeat(SEPARATOR_WIDTH));
+        println!("{}", Table::new([SegmentInfo::load(stm)]));
+        println!("{}", Table::new(TableInfo::load(stm, table_id)));
+    }
+}
+
+/// Prints `err_msg` to stderr in red and exits the process with status 1.
+fn pretty_error_then_exit(err_msg: &str) -> ! {
+    eprintln!("\x1b[31m{}\x1b[0m", err_msg);
+    std::process::exit(1)
+}
+
+/// Dumps the metadata of region `region_id`, exiting with an error message if
+/// the region cannot be opened.
+///
+/// NOTE(review): `RegionManager::get_region` is documented to create the
+/// region when it does not exist, so an unknown id may create it — confirm
+/// this is acceptable for an inspection tool.
+fn dump_region(region_manager: &RegionManager, region_id: u64, args: &Args) {
+    match region_manager.get_region(region_id) {
+        Ok(region) => region_meta_dump(region, &args.segment_id, &args.table_id),
+        Err(e) => pretty_error_then_exit(&format!(
+            "Error: failed to open region {}: {:?}",
+            region_id, e
+        )),
+    }
+}
+
+/// Entry point: dumps one region (`--region-id`) or every numerically named
+/// sub-directory of the data directory.
+fn main() {
+    let args = Args::parse();
+    println!("Data directory: {}", args.data_dir);
+
+    if !Path::new(&args.data_dir).is_dir() {
+        pretty_error_then_exit(&format!(
+            "Error: Data directory '{}' does not exist",
+            args.data_dir
+        ));
+    }
+
+    let runtime = match Builder::default().build() {
+        Ok(rt) => Arc::new(rt),
+        Err(e) => pretty_error_then_exit(&format!("Error: failed to build runtime: {:?}", e)),
+    };
+    // NOTE(review): the meaning of the literal `32` is not visible from this file.
+    let opened = RegionManager::new(args.data_dir.clone(), 32, SEGMENT_SIZE, runtime);
+    let region_manager = match opened {
+        Ok(v) => v,
+        Err(e) => pretty_error_then_exit(&format!("Error: {}", e)),
+    };
+
+    if let Some(region_id) = args.region_id {
+        dump_region(&region_manager, region_id, &args);
+    } else {
+        let entries = match fs::read_dir(&args.data_dir) {
+            Ok(entries) => entries,
+            Err(e) => pretty_error_then_exit(&format!(
+                "Error: failed to read data directory '{}': {}",
+                args.data_dir, e
+            )),
+        };
+        for entry in entries {
+            let path = match entry {
+                Ok(entry) => entry.path(),
+                Err(e) => pretty_error_then_exit(&format!(
+                    "Error: failed to read an entry of '{}': {}",
+                    args.data_dir, e
+                )),
+            };
+            if path.is_file() {
+                continue;
+            }
+
+            // Region directories are named after their numeric region id;
+            // anything that does not parse as one is skipped.
+            let region_id = path
+                .file_name()
+                .and_then(|name| name.to_str())
+                .and_then(|name| name.parse::<u64>().ok());
+            if let Some(region_id) = region_id {
+                dump_region(&region_manager, region_id, &args);
+            }
+        }
+    }
+
+    if let Err(e) = region_manager.close_all() {
+        pretty_error_then_exit(&format!("Error: failed to close regions: {:?}", e));
+    }
+}
diff --git a/src/wal/src/local_storage_impl/mod.rs b/src/wal/src/local_storage_impl/mod.rs
index 7bb5bf6374..d516bddb67 100644
--- a/src/wal/src/local_storage_impl/mod.rs
+++ b/src/wal/src/local_storage_impl/mod.rs
@@ -17,5 +17,5 @@
 
 pub mod config;
 mod record_encoding;
-mod segment;
+pub mod segment;
 pub mod wal_manager;
diff --git a/src/wal/src/local_storage_impl/segment.rs b/src/wal/src/local_storage_impl/segment.rs
index 8fa0915b12..c91f2cf431 100644
--- a/src/wal/src/local_storage_impl/segment.rs
+++ b/src/wal/src/local_storage_impl/segment.rs
@@ -615,6 +615,40 @@ impl SegmentManager {
     }
 }
 
+/// Snapshot of a `Segment`'s metadata.
+///
+/// All values are copied out of the segment when the view is built, so a view
+/// does not reflect later changes to the segment.
+#[derive(Debug, Clone)]
+pub struct SegmentView {
+    pub id: u64,
+    pub min_seq: SequenceNumber,
+    pub max_seq: SequenceNumber,
+    pub version: u8,
+    pub current_size: usize,
+    pub segment_size: usize,
+    /// Number of entries in the segment's `record_position`.
+    pub number_of_records: usize,
+    /// Copy of the segment's `table_ranges`: table id -> `(min_seq, max_seq)`.
+    pub tables: HashMap<u64, (SequenceNumber, SequenceNumber)>,
+}
+
+impl SegmentView {
+    /// Captures the current metadata of `seg`.
+    fn new(seg: &Segment) -> Self {
+        Self {
+            id: seg.id,
+            min_seq: seg.min_seq,
+            max_seq: seg.max_seq,
+            version: seg.version,
+            current_size: seg.current_size,
+            segment_size: seg.segment_size,
+            number_of_records: seg.record_position.len(),
+            tables: seg.table_ranges.clone(),
+        }
+    }
+}
+
 #[derive(Debug)]
 pub struct Region {
     /// Identifier for regions.
@@ -785,6 +819,20 @@ impl Region {
         Ok(next_sequence_num - 1)
     }
 
+    /// Returns a [`SegmentView`] for every segment currently held by this
+    /// region's segment manager.
+    ///
+    /// # Panics
+    ///
+    /// Panics if locking the segment map or any individual segment fails.
+    pub fn meta(&self) -> Vec<SegmentView> {
+        let all_segments = self.segment_manager.all_segments.lock().unwrap();
+        all_segments
+            .values()
+            .map(|seg| SegmentView::new(&seg.lock().unwrap()))
+            .collect()
+    }
+
     pub fn read(&self, ctx: &ReadContext, req: &ReadRequest) -> Result {
         // Check read range's validity.
         let start = if let Some(start) = req.start.as_start_sequence_number() {
@@ -922,7 +970,7 @@ impl RegionManager {
 
     /// Retrieve a region by its `region_id`. If the region does not exist,
     /// create a new one.
-    fn get_region(&self, region_id: RegionId) -> Result<Arc<Region>> {
+    pub fn get_region(&self, region_id: RegionId) -> Result<Arc<Region>> {
         let mut regions = self.regions.lock().unwrap();
         if let Some(region) = regions.get(&region_id) {
             return Ok(region.clone());