From 4cbe82ce8049d4a4a8a5c19f804bfc97294515bf Mon Sep 17 00:00:00 2001
From: MianChen <283559115@qq.com>
Date: Fri, 1 Nov 2024 07:14:19 -0700
Subject: [PATCH] feat: add a cli tool to read wal meta information (#1584)
## Rationale
#1567
## Detailed Changes
Add a CLI tool that decodes WAL segment files and prints their metadata.
## Test Plan
Manual test
---
Cargo.lock | 44 +++++-
Cargo.toml | 1 +
src/tools/Cargo.toml | 2 +
src/tools/src/bin/wal-reader.rs | 163 ++++++++++++++++++++++
src/wal/src/local_storage_impl/mod.rs | 2 +-
src/wal/src/local_storage_impl/segment.rs | 39 +++++-
6 files changed, 245 insertions(+), 6 deletions(-)
create mode 100644 src/tools/src/bin/wal-reader.rs
diff --git a/Cargo.lock b/Cargo.lock
index 1830a3fb05..ce8fd3f313 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1115,7 +1115,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "115e54d64eb62cdebad391c19efc9dce4981c690c85a33a12199d99bb9546fee"
dependencies = [
"borsh-derive",
- "hashbrown 0.12.3",
+ "hashbrown 0.13.2",
]
[[package]]
@@ -4896,6 +4896,17 @@ dependencies = [
"slog-global",
]
+[[package]]
+name = "papergrid"
+version = "0.12.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "c7419ad52a7de9b60d33e11085a0fe3df1fbd5926aa3f93d3dd53afbc9e86725"
+dependencies = [
+ "bytecount",
+ "fnv",
+ "unicode-width",
+]
+
[[package]]
name = "parking"
version = "2.0.0"
@@ -7400,6 +7411,29 @@ dependencies = [
"time_ext",
]
+[[package]]
+name = "tabled"
+version = "0.16.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "77c9303ee60b9bedf722012ea29ae3711ba13a67c9b9ae28993838b63057cb1b"
+dependencies = [
+ "papergrid",
+ "tabled_derive",
+]
+
+[[package]]
+name = "tabled_derive"
+version = "0.8.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "bf0fb8bfdc709786c154e24a66777493fb63ae97e3036d914c8666774c477069"
+dependencies = [
+ "heck",
+ "proc-macro-error",
+ "proc-macro2",
+ "quote",
+ "syn 1.0.109",
+]
+
[[package]]
name = "tagptr"
version = "0.2.0"
@@ -7880,8 +7914,10 @@ dependencies = [
"parquet_ext",
"runtime",
"table_engine",
+ "tabled",
"time_ext",
"tokio",
+ "wal",
]
[[package]]
@@ -8087,7 +8123,7 @@ version = "1.6.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "97fee6b57c6a41524a810daee9286c02d7752c4253064d0b05472833a438f675"
dependencies = [
- "cfg-if 0.1.10",
+ "cfg-if 1.0.0",
"rand 0.8.5",
"static_assertions",
]
@@ -8136,9 +8172,9 @@ checksum = "1dd624098567895118886609431a7c3b8f516e41d30e0643f03d94592a147e36"
[[package]]
name = "unicode-width"
-version = "0.1.10"
+version = "0.1.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "c0edd1e5b14653f783770bce4a4dabb4a5108a5370a5f5d8cfe8710c361f6c8b"
+checksum = "e51733f11c9c4f72aa0c160008246859e340b00807569a0da0e7a1079b27ba85"
[[package]]
name = "untrusted"
diff --git a/Cargo.toml b/Cargo.toml
index b6ca6273dd..1008359eb2 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -183,6 +183,7 @@ sqlparser = { version = "0.39.0", features = ["serde"] }
system_catalog = { path = "src/system_catalog" }
table_engine = { path = "src/table_engine" }
table_kv = { path = "src/components/table_kv" }
+tabled = "0.16.0"
tempfile = "3.1.0"
test_util = { path = "src/components/test_util" }
time_ext = { path = "src/components/time_ext" }
diff --git a/src/tools/Cargo.toml b/src/tools/Cargo.toml
index 1a3231cb8f..76a36afbca 100644
--- a/src/tools/Cargo.toml
+++ b/src/tools/Cargo.toml
@@ -43,5 +43,7 @@ parquet = { workspace = true }
parquet_ext = { workspace = true }
runtime = { workspace = true }
table_engine = { workspace = true }
+tabled = { workspace = true }
time_ext = { workspace = true }
tokio = { workspace = true }
+wal = { workspace = true, features = ["wal-local-storage"] }
diff --git a/src/tools/src/bin/wal-reader.rs b/src/tools/src/bin/wal-reader.rs
new file mode 100644
index 0000000000..51b9cd4700
--- /dev/null
+++ b/src/tools/src/bin/wal-reader.rs
@@ -0,0 +1,163 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! A CLI tool to query the segment metadata of WAL regions
+
+use std::{
+ fs::{self},
+ sync::Arc,
+};
+
+use clap::Parser;
+use runtime::Builder;
+use tabled::{Table, Tabled};
+use wal::local_storage_impl::segment::{Region, RegionManager, SegmentView};
+
+// Command line arguments, parsed by clap. The `///` comments on the fields
+// are used by clap as the `--help` text, so they are user-facing strings.
+#[derive(Parser, Debug)]
+#[clap(author, version, about = "A command line tool to read and display WAL (Write-Ahead Log) segment metadata", long_about = None)]
+struct Args {
+    /// Data directory path
+    #[clap(short, long, default_value = "/tmp/horaedb/wal")]
+    data_dir: String,
+
+    /// Region id
+    #[clap(short = 'r', long, default_value = None)]
+    region_id: Option<u64>,
+
+    /// Segment id
+    #[clap(short = 's', long, default_value = None)]
+    segment_id: Option<u64>,
+
+    /// Table id
+    #[clap(long, default_value = None)]
+    table_id: Option<u64>,
+}
+
+/// One row of the per-segment summary table printed by `region_meta_dump`.
+/// Every field is copied from the matching `SegmentView` field by
+/// `SegmentInfo::load` (`segment_id` comes from `SegmentView::id`).
+#[derive(Tabled)]
+struct SegmentInfo {
+ segment_id: u64,
+ min_seq: u64,
+ max_seq: u64,
+ version: u8,
+ current_size: usize,
+ segment_size: usize,
+ number_of_records: usize,
+}
+
+/// One row of the per-table listing printed for a segment: a table id and
+/// the `(min_seq, max_seq)` pair stored for it in `SegmentView::tables`.
+#[derive(Tabled)]
+struct TableInfo {
+ table_id: u64,
+ min_seq: u64,
+ max_seq: u64,
+}
+
+impl SegmentInfo {
+    /// Builds the printable summary row for one segment by copying the
+    /// scalar fields out of `stm`. The per-table ranges in `stm.tables`
+    /// are not part of this row.
+    fn load(stm: &SegmentView) -> Self {
+        let SegmentView {
+            id,
+            min_seq,
+            max_seq,
+            version,
+            current_size,
+            segment_size,
+            number_of_records,
+            ..
+        } = stm;
+
+        Self {
+            segment_id: *id,
+            min_seq: *min_seq,
+            max_seq: *max_seq,
+            version: *version,
+            current_size: *current_size,
+            segment_size: *segment_size,
+            number_of_records: *number_of_records,
+        }
+    }
+}
+
+/// 64 MiB. Passed to `RegionManager::new` in `main` as its third argument.
+const SEGMENT_SIZE: usize = 64 * 1024 * 1024;
+
+impl TableInfo {
+    /// Collects one row per table recorded in `stm.tables`.
+    ///
+    /// When `table_id` is `Some`, only the entry for that table is kept;
+    /// when it is `None`, every table is listed. Rows are sorted by table
+    /// id because `stm.tables` is a `HashMap`, whose iteration order would
+    /// otherwise make the printed output differ between runs.
+    fn load(stm: &SegmentView, table_id: &Option<u64>) -> Vec<TableInfo> {
+        let mut rows: Vec<TableInfo> = stm
+            .tables
+            .iter()
+            .filter(|(t_id, _)| table_id.map_or(true, |wanted| wanted == **t_id))
+            .map(|(t_id, (min_seq, max_seq))| TableInfo {
+                table_id: *t_id,
+                min_seq: *min_seq,
+                max_seq: *max_seq,
+            })
+            .collect();
+        rows.sort_unstable_by_key(|row| row.table_id);
+        rows
+    }
+}
+
+/// Prints the metadata of the segments of `region` to stdout.
+///
+/// For every selected segment this prints a dashed separator line, a
+/// one-row segment summary table and a table of per-table sequence ranges.
+///
+/// * `segment_id` - when `Some`, only the segment with this id is printed.
+/// * `table_id` - when `Some`, only this table is listed for each segment.
+fn region_meta_dump(region: Arc<Region>, segment_id: &Option<u64>, table_id: &Option<u64>) {
+    let mut segments = region.meta();
+    // Sort so the output order does not depend on the order in which the
+    // region hands out its segments.
+    segments.sort_unstable_by_key(|stm| stm.id);
+
+    let selected = segments
+        .iter()
+        .filter(|stm| segment_id.map_or(true, |wanted| wanted == stm.id));
+    for stm in selected {
+        println!("{}", "-".repeat(94));
+        println!("{}", Table::new([SegmentInfo::load(stm)]));
+        println!("{}", Table::new(TableInfo::load(stm, table_id)));
+    }
+}
+
+/// Prints `err_msg` to stderr wrapped in the ANSI escape codes for red text
+/// and terminates the process with exit code 1. Never returns, so it can be
+/// used directly as the value of a `match` arm.
+fn pretty_error_then_exit(err_msg: &str) -> ! {
+    eprintln!("\x1b[31m{}\x1b[0m", err_msg);
+    std::process::exit(1)
+}
+
+/// Returns the ids of all regions found directly under `data_dir`, sorted in
+/// ascending order.
+///
+/// An entry counts as a region when it is not a regular file and its name
+/// parses as a `u64`; every other entry is ignored. Exits the process with an
+/// error message if the directory cannot be read.
+fn list_region_ids(data_dir: &std::path::Path) -> Vec<u64> {
+    let entries = match fs::read_dir(data_dir) {
+        Ok(entries) => entries,
+        Err(e) => pretty_error_then_exit(&format!(
+            "Error: Cannot read data directory '{}': {}",
+            data_dir.display(),
+            e
+        )),
+    };
+
+    let mut region_ids = Vec::new();
+    for entry in entries {
+        let path = match entry {
+            Ok(entry) => entry.path(),
+            Err(e) => pretty_error_then_exit(&format!(
+                "Error: Cannot read an entry of data directory '{}': {}",
+                data_dir.display(),
+                e
+            )),
+        };
+        if path.is_file() {
+            continue;
+        }
+
+        // Parse region id from directory name
+        let region_id = path
+            .file_name()
+            .and_then(|n| n.to_str())
+            .and_then(|n| n.parse::<u64>().ok());
+        if let Some(region_id) = region_id {
+            region_ids.push(region_id);
+        }
+    }
+    region_ids.sort_unstable();
+    region_ids
+}
+
+/// Opens region `region_id` through `region_manager` and dumps its segment
+/// metadata. Exits with an error message instead of panicking when the region
+/// cannot be opened.
+fn dump_region(
+    region_manager: &RegionManager,
+    region_id: u64,
+    segment_id: &Option<u64>,
+    table_id: &Option<u64>,
+) {
+    match region_manager.get_region(region_id) {
+        Ok(region) => region_meta_dump(region, segment_id, table_id),
+        Err(e) => pretty_error_then_exit(&format!(
+            "Error: Cannot open region {}: {}",
+            region_id, e
+        )),
+    }
+}
+
+/// Entry point: parses the arguments, opens the WAL data directory and prints
+/// the segment metadata of the requested region, or of every region found in
+/// the directory when no region id is given.
+fn main() {
+    let args = Args::parse();
+    println!("Data directory: {}", args.data_dir);
+
+    let data_dir = std::path::Path::new(&args.data_dir);
+    if !data_dir.is_dir() {
+        pretty_error_then_exit(&format!(
+            "Error: Data directory '{}' does not exist",
+            &args.data_dir
+        ));
+    }
+
+    let runtime = Arc::new(
+        Builder::default()
+            .build()
+            .expect("failed to build the runtime for the region manager"),
+    );
+    let region_manager =
+        match RegionManager::new(args.data_dir.clone(), 32, SEGMENT_SIZE, runtime) {
+            Ok(v) => v,
+            Err(e) => pretty_error_then_exit(&format!("Error: {}", e)),
+        };
+
+    let known_region_ids = list_region_ids(data_dir);
+    let selected_region_ids = match args.region_id {
+        Some(region_id) if known_region_ids.contains(&region_id) => vec![region_id],
+        // `get_region` creates a region that does not exist yet; a read-only
+        // inspection tool must not do that, so reject unknown ids up front.
+        Some(region_id) => pretty_error_then_exit(&format!(
+            "Error: Region {} does not exist in data directory '{}'",
+            region_id, &args.data_dir
+        )),
+        None => known_region_ids,
+    };
+
+    for region_id in selected_region_ids {
+        dump_region(&region_manager, region_id, &args.segment_id, &args.table_id);
+    }
+
+    if let Err(e) = region_manager.close_all() {
+        pretty_error_then_exit(&format!("Error: Failed to close regions: {}", e));
+    }
+}
diff --git a/src/wal/src/local_storage_impl/mod.rs b/src/wal/src/local_storage_impl/mod.rs
index 7bb5bf6374..d516bddb67 100644
--- a/src/wal/src/local_storage_impl/mod.rs
+++ b/src/wal/src/local_storage_impl/mod.rs
@@ -17,5 +17,5 @@
pub mod config;
mod record_encoding;
-mod segment;
+pub mod segment;
pub mod wal_manager;
diff --git a/src/wal/src/local_storage_impl/segment.rs b/src/wal/src/local_storage_impl/segment.rs
index 8fa0915b12..c91f2cf431 100644
--- a/src/wal/src/local_storage_impl/segment.rs
+++ b/src/wal/src/local_storage_impl/segment.rs
@@ -615,6 +615,32 @@ impl SegmentManager {
}
}
+/// Owned copy of a segment's metadata, built from a `Segment` by
+/// `SegmentView::new` so it can be inspected without holding the segment.
+#[derive(Debug, Clone)]
+pub struct SegmentView {
+    /// Id of the segment.
+    pub id: u64,
+    /// Copy of the segment's `min_seq`.
+    pub min_seq: SequenceNumber,
+    /// Copy of the segment's `max_seq`.
+    pub max_seq: SequenceNumber,
+    /// Copy of the segment's `version`.
+    pub version: u8,
+    /// Copy of the segment's `current_size`.
+    pub current_size: usize,
+    /// Copy of the segment's `segment_size`.
+    pub segment_size: usize,
+    /// Length of the segment's `record_position` list when the view was taken.
+    pub number_of_records: usize,
+    /// Clone of the segment's `table_ranges`: table id mapped to its
+    /// `(min, max)` sequence numbers.
+    pub tables: HashMap<u64, (SequenceNumber, SequenceNumber)>,
+}
+
+impl SegmentView {
+    /// Captures the metadata of `seg` into an owned view: the scalar fields
+    /// are copied, the record count is taken from `record_position` and the
+    /// per-table ranges are cloned.
+    fn new(seg: &Segment) -> Self {
+        let number_of_records = seg.record_position.len();
+        let tables = seg.table_ranges.clone();
+
+        Self {
+            id: seg.id,
+            min_seq: seg.min_seq,
+            max_seq: seg.max_seq,
+            version: seg.version,
+            current_size: seg.current_size,
+            segment_size: seg.segment_size,
+            number_of_records,
+            tables,
+        }
+    }
+}
+
#[derive(Debug)]
pub struct Region {
/// Identifier for regions.
@@ -785,6 +811,17 @@ impl Region {
Ok(next_sequence_num - 1)
}
+    /// Returns a [`SegmentView`] for every segment held by this region's
+    /// segment manager.
+    ///
+    /// The `all_segments` lock is held for the whole call and each segment
+    /// is locked in turn while its metadata is copied. The order of the
+    /// result follows the iteration order of the segment map.
+    ///
+    /// # Panics
+    ///
+    /// Panics if locking `all_segments` or any segment fails.
+    pub fn meta(&self) -> Vec<SegmentView> {
+        let all_segments = self.segment_manager.all_segments.lock().unwrap();
+        all_segments
+            .values()
+            .map(|segment| SegmentView::new(&segment.lock().unwrap()))
+            .collect()
+    }
+
pub fn read(&self, ctx: &ReadContext, req: &ReadRequest) -> Result {
// Check read range's validity.
let start = if let Some(start) = req.start.as_start_sequence_number() {
@@ -922,7 +959,7 @@ impl RegionManager {
/// Retrieve a region by its `region_id`. If the region does not exist,
/// create a new one.
- fn get_region(&self, region_id: RegionId) -> Result<Arc<Region>> {
+ pub fn get_region(&self, region_id: RegionId) -> Result<Arc<Region>> {
let mut regions = self.regions.lock().unwrap();
if let Some(region) = regions.get(&region_id) {
return Ok(region.clone());