Skip to content

Commit

Permalink
tikv-ctl: improve scan and fix a bug for mvcc (tikv#2964)
Browse files Browse the repository at this point in the history
  • Loading branch information
hicqu authored Apr 25, 2018
1 parent f01d5ee commit 9b1718e
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 38 deletions.
105 changes: 69 additions & 36 deletions src/bin/tikv-ctl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,27 +203,38 @@ trait DebugExecutor {
}
}

/// Dump mvcc infos for a given key range. The given `from` and `to` must
/// be raw key with `z` prefix. Both `to` and `limit` are empty value means
/// what we want is point query instead of range scan.
fn dump_mvccs_infos(
&self,
from: Vec<u8>,
to: Option<Vec<u8>>,
limit: Option<u64>,
to: Vec<u8>,
mut limit: u64,
cfs: Vec<&str>,
start_ts: Option<u64>,
commit_ts: Option<u64>,
) {
let to = to.unwrap_or_default();
let limit = limit.unwrap_or_default();
if to.is_empty() && limit == 0 {
eprintln!(r#"please pass "to" or "limit""#);
if !from.starts_with(b"z") || (!to.is_empty() && !to.starts_with(b"z")) {
eprintln!("from and to should start with \"z\"");
process::exit(-1);
}
if !to.is_empty() && to < from {
eprintln!("The region's from pos must greater than the to pos.");
eprintln!("\"to\" must be greater than \"from\"");
process::exit(-1);
}
let scan_future = self.get_mvcc_infos(from, to, limit)
.for_each(move |(key, mvcc)| {

let point_query = to.is_empty() && limit == 0;
if point_query {
limit = 1;
}

let scan_future = self.get_mvcc_infos(from.clone(), to, limit).for_each(
move |(key, mvcc)| {
if point_query && key != from {
println!("no mvcc infos for {}", escape(&from));
}

println!("key: {}", escape(&key));
if cfs.contains(&CF_LOCK) && mvcc.has_lock() {
let lock_info = mvcc.get_lock();
Expand All @@ -249,7 +260,8 @@ trait DebugExecutor {
}
println!();
future::ok::<(), String>(())
});
},
);
if let Err(e) = scan_future.wait() {
eprintln!("{}", e);
process::exit(-1);
Expand Down Expand Up @@ -728,6 +740,8 @@ impl DebugExecutor for Debugger {

#[allow(cyclomatic_complexity)]
fn main() {
let raw_key_hint: &'static str = "raw key (generally starts with \"z\") in escaped form";

let mut app = App::new("TiKV Ctl")
.author("PingCAP")
.about("Distributed transactional key value database powered by Rust and Raft")
Expand Down Expand Up @@ -824,7 +838,7 @@ fn main() {
.conflicts_with_all(&["region", "index"])
.short("k")
.takes_value(true)
.help("set the raw key, in escaped form"),
.help(raw_key_hint)
),
)
.subcommand(
Expand Down Expand Up @@ -870,17 +884,18 @@ fn main() {
.about("print the range db range")
.arg(
Arg::with_name("from")
.required(true)
.short("f")
.long("from")
.takes_value(true)
.help("set the scan from raw key, in escaped format"),
.help(raw_key_hint)
)
.arg(
Arg::with_name("to")
.short("t")
.long("to")
.takes_value(true)
.help("set the scan end raw key, in escaped format"),
.help(raw_key_hint)
)
.arg(
Arg::with_name("limit")
Expand All @@ -901,8 +916,8 @@ fn main() {
.help("set the scan commit_ts as filter"),
)
.arg(
Arg::with_name("cf")
.long("cf")
Arg::with_name("show-cf")
.long("show-cf")
.takes_value(true)
.multiple(true)
.use_delimiter(true)
Expand All @@ -927,21 +942,22 @@ fn main() {
.required(true)
.short("k")
.takes_value(true)
.help("set the query raw key, in escaped form"),
.help(raw_key_hint)
),
)
.subcommand(
SubCommand::with_name("mvcc")
.about("print the mvcc value")
.arg(
Arg::with_name("key")
.required(true)
.short("k")
.takes_value(true)
.help("set the query raw key, in escaped form"),
.help(raw_key_hint)
)
.arg(
Arg::with_name("cf")
.short("c")
Arg::with_name("show-cf")
.long("show-cf")
.takes_value(true)
.multiple(true)
.use_delimiter(true)
Expand All @@ -968,18 +984,23 @@ fn main() {
.about("diff two region keys")
.arg(
Arg::with_name("region")
.required(true)
.short("r")
.takes_value(true)
.help("specify region id"),
)
.arg(
Arg::with_name("to_db")
.required_unless("to_host")
.conflicts_with("to_host")
.long("to-db")
.takes_value(true)
.help("to which db path"),
)
.arg(
Arg::with_name("to_host")
.required_unless("to_db")
.conflicts_with("to_db")
.long("to-host")
.takes_value(true)
.conflicts_with("to_db")
Expand Down Expand Up @@ -1008,14 +1029,14 @@ fn main() {
.short("f")
.long("from")
.takes_value(true)
.help("set the start raw key, in escaped form"),
.help(raw_key_hint)
)
.arg(
Arg::with_name("to")
.short("t")
.long("to")
.takes_value(true)
.help("set the end raw key, in escaped form"),
.help(raw_key_hint)
)
.arg(
Arg::with_name("region")
Expand Down Expand Up @@ -1128,12 +1149,14 @@ fn main() {
.about("modify tikv config, eg. ./tikv-ctl -h ip:port -m kvdb -n default.disable_auto_compactions -v true")
.arg(
Arg::with_name("module")
.required(true)
.short("m")
.takes_value(true)
.help("module of the tikv, eg. kvdb or raftdb"),
)
.arg(
Arg::with_name("config_name")
.required(true)
.short("n")
.takes_value(true)
.help("config name of the module, for kvdb or raftdb, you can choose \
Expand All @@ -1143,17 +1166,22 @@ fn main() {
)
.arg(
Arg::with_name("config_value")
.required(true)
.short("v")
.takes_value(true)
.help("config value of the module, eg. 8 for max_background_jobs or true for disable_auto_compactions"),
),
)
.subcommand(
SubCommand::with_name("dump-snap-meta").about("dump snapshot meta file")
.arg(Arg::with_name("file")
.short("f").long("file")
.takes_value(true)
.help("meta file path"))
.arg(
Arg::with_name("file")
.required(true)
.short("f")
.long("file")
.takes_value(true)
.help("meta file path"),
),
);
let matches = app.clone().get_matches();

Expand Down Expand Up @@ -1218,18 +1246,26 @@ fn main() {
}
} else if let Some(matches) = matches.subcommand_matches("scan") {
let from = unescape(matches.value_of("from").unwrap());
let to = matches.value_of("to").map(|to| unescape(to));
let limit = matches.value_of("limit").map(|s| s.parse().unwrap());
let cfs = Vec::from_iter(matches.values_of("cf").unwrap());
let to = matches
.value_of("to")
.map_or_else(|| vec![], |to| unescape(to));
let limit = matches
.value_of("limit")
.map_or(0, |s| s.parse().expect("parse u64"));
if to.is_empty() && limit == 0 {
eprintln!(r#"please pass "to" or "limit""#);
process::exit(-1);
}
let cfs = Vec::from_iter(matches.values_of("show-cf").unwrap());
let start_ts = matches.value_of("start_ts").map(|s| s.parse().unwrap());
let commit_ts = matches.value_of("commit_ts").map(|s| s.parse().unwrap());
debug_executor.dump_mvccs_infos(from, to, limit, cfs, start_ts, commit_ts);
} else if let Some(matches) = matches.subcommand_matches("mvcc") {
let from = unescape(matches.value_of("key").unwrap());
let cfs = Vec::from_iter(matches.values_of("cf").unwrap());
let cfs = Vec::from_iter(matches.values_of("show-cf").unwrap());
let start_ts = matches.value_of("start_ts").map(|s| s.parse().unwrap());
let commit_ts = matches.value_of("commit_ts").map(|s| s.parse().unwrap());
debug_executor.dump_mvccs_infos(from, None, Some(1), cfs, start_ts, commit_ts);
debug_executor.dump_mvccs_infos(from, vec![], 0, cfs, start_ts, commit_ts);
} else if let Some(matches) = matches.subcommand_matches("diff") {
let region = matches.value_of("region").unwrap().parse().unwrap();
let to_db = matches.value_of("to_db");
Expand Down Expand Up @@ -1318,13 +1354,10 @@ fn get_module_type(module: &str) -> MODULE {

fn from_hex(key: &str) -> Vec<u8> {
const HEX_PREFIX: &str = "0x";
let mut s = String::from(key);
if s.starts_with(HEX_PREFIX) {
let len = s.len();
let new_len = len.saturating_sub(HEX_PREFIX.len());
s.truncate(new_len);
if key.starts_with(HEX_PREFIX) {
return key[2..].from_hex().unwrap();
}
s.as_str().from_hex().unwrap()
key.from_hex().unwrap()
}

fn convert_gbmb(mut bytes: u64) -> String {
Expand Down
10 changes: 10 additions & 0 deletions src/server/debug.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,13 @@ impl Debugger {
}
}

/// Scan MVCC Infos for given range `[start, end)`.
pub fn scan_mvcc(&self, start: &[u8], end: &[u8], limit: u64) -> Result<MvccInfoIterator> {
if !start.starts_with(b"z") || (!end.is_empty() && !end.starts_with(b"z")) {
return Err(Error::InvalidArgument(
"start and end should start with \"z\"".to_owned(),
));
}
if end.is_empty() && limit == 0 {
return Err(Error::InvalidArgument("no limit and to_key".to_owned()));
}
Expand Down Expand Up @@ -1204,6 +1210,10 @@ mod tests {
count += 1;
}
assert_eq!(count, 7);

// Test scan with bad start, end or limit.
assert!(debugger.scan_mvcc(b"z", b"", 0).is_err());
assert!(debugger.scan_mvcc(b"z", b"x", 3).is_err());
}

#[test]
Expand Down
3 changes: 1 addition & 2 deletions src/server/service/debug.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,8 +205,7 @@ impl<T: RaftStoreRouter + 'static + Send> debugpb_grpc::Debug for Service<T> {
let future = future::result(debugger.scan_mvcc(&from, &to, limit))
.map_err(|e| error_to_grpc_error("scan_mvcc", e))
.and_then(|iter| {
#[allow(deprecated)]
stream::iter(iter)
stream::iter_result(iter)
.map_err(|e| error_to_grpc_error("scan_mvcc", e))
.map(|(key, mvcc_info)| {
let mut resp = ScanMvccResponse::new();
Expand Down

0 comments on commit 9b1718e

Please sign in to comment.