diff --git a/Cargo.lock b/Cargo.lock
index 0074b781906..0a43ef6def3 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -461,7 +461,7 @@ dependencies = [
 [[package]]
 name = "kvproto"
 version = "0.0.1"
-source = "git+https://github.com/pingcap/kvproto.git#1c5dddd57fe7b3ceac57b5107afbbf4539fb97bc"
+source = "git+https://github.com/pingcap/kvproto.git#dc0b3b3e57761a9620639c28687e7ee4d5ee0c2a"
 dependencies = [
  "futures 0.1.17 (registry+https://github.com/rust-lang/crates.io-index)",
  "grpcio 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -502,7 +502,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 [[package]]
 name = "librocksdb_sys"
 version = "0.1.0"
-source = "git+https://github.com/pingcap/rust-rocksdb.git#7e21e222c8dcb9729a61222995dffc3e836194da"
+source = "git+https://github.com/pingcap/rust-rocksdb.git#1fc594481da84ceb515e5a7380ccc130d2d2c733"
 dependencies = [
  "bzip2-sys 0.1.6 (git+https://github.com/alexcrichton/bzip2-rs.git)",
  "cc 1.0.3 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -898,7 +898,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 [[package]]
 name = "rocksdb"
 version = "0.3.0"
-source = "git+https://github.com/pingcap/rust-rocksdb.git#7e21e222c8dcb9729a61222995dffc3e836194da"
+source = "git+https://github.com/pingcap/rust-rocksdb.git#1fc594481da84ceb515e5a7380ccc130d2d2c733"
 dependencies = [
  "crc 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
  "libc 0.2.36 (registry+https://github.com/rust-lang/crates.io-index)",
diff --git a/etc/config-template.toml b/etc/config-template.toml
index b8b30b08e66..70a87d1e4d4 100644
--- a/etc/config-template.toml
+++ b/etc/config-template.toml
@@ -442,6 +442,8 @@
 # key-path = ""
 
 [import]
+# the directory to store the kv data being imported.
+# import-dir = "/tmp/tikv/import"
 # number of threads to handle RPC requests.
 # num-threads = 8
 # stream channel window size, stream will be blocked on channel full.
diff --git a/src/bin/signal_handler.rs b/src/bin/signal_handler.rs
index 52d06c012df..81cb5915499 100644
--- a/src/bin/signal_handler.rs
+++ b/src/bin/signal_handler.rs
@@ -20,7 +20,7 @@ mod imp {
     use tikv::raftstore::store::Engines;
     use tikv::util::{metrics, rocksdb_stats};
 
-    pub fn handle_signal(engines: Engines) {
+    pub fn handle_signal(engines: Option<Engines>) {
         use signal::trap::Trap;
         use nix::sys::signal::{SIGUSR1, SIGUSR2, SIGHUP, SIGINT, SIGTERM};
         let trap = Trap::trap(&[SIGTERM, SIGINT, SIGHUP, SIGUSR1, SIGUSR2]);
@@ -33,8 +33,10 @@ mod imp {
                 SIGUSR1 => {
                     // Use SIGUSR1 to log metrics.
info!("{}", metrics::dump()); - info!("{:?}", rocksdb_stats::dump(&engines.kv_engine)); - info!("{:?}", rocksdb_stats::dump(&engines.raft_engine)); + if let Some(ref engines) = engines { + info!("{:?}", rocksdb_stats::dump(&engines.kv_engine)); + info!("{:?}", rocksdb_stats::dump(&engines.raft_engine)); + } } SIGUSR2 => profiling::dump_prof(None), // TODO: handle more signal @@ -48,7 +50,7 @@ mod imp { mod imp { use tikv::raftstore::store::Engines; - pub fn handle_signal(_: Engines) {} + pub fn handle_signal(_: Option) {} } pub use self::imp::handle_signal; diff --git a/src/bin/tikv-server.rs b/src/bin/tikv-server.rs index 7ac256869d9..6b46f6a63bc 100644 --- a/src/bin/tikv-server.rs +++ b/src/bin/tikv-server.rs @@ -72,7 +72,7 @@ use tikv::raftstore::coprocessor::CoprocessorHost; use tikv::pd::{PdClient, RpcClient}; use tikv::util::time::Monitor; use tikv::util::rocksdb::metrics_flusher::{MetricsFlusher, DEFAULT_FLUSHER_INTERVAL}; -use tikv::import::{ImportSSTService, SSTImporter}; +use tikv::import::{ImportKVServer, ImportSSTService, SSTImporter}; const RESERVED_OPEN_FDS: u64 = 1000; @@ -283,7 +283,7 @@ fn run_raft_server(pd_client: RpcClient, cfg: &TiKvConfig, security_mgr: Arc Config { Config { + import_dir: "/tmp/tikv/import".to_owned(), num_threads: 8, stream_channel_window: 128, } diff --git a/src/import/engine.rs b/src/import/engine.rs new file mode 100644 index 00000000000..296f3cd273b --- /dev/null +++ b/src/import/engine.rs @@ -0,0 +1,172 @@ +// Copyright 2018 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::cmp; +use std::i32; +use std::fmt; +use std::ops::Deref; +use std::sync::Arc; +use std::path::Path; + +use uuid::Uuid; + +use rocksdb::{BlockBasedOptions, ColumnFamilyOptions, DBOptions, Writable, WriteBatch as RawBatch, + DB}; +use kvproto::importpb::*; + +use config::DbConfig; +use storage::CF_DEFAULT; +use storage::types::Key; +use util::config::MB; +use util::rocksdb::{new_engine_opt, CFOptions}; +use util::rocksdb::properties::SizePropertiesCollectorFactory; + +use super::Result; + +pub struct Engine { + db: Arc, + uuid: Uuid, +} + +impl Engine { + pub fn new>(path: P, uuid: Uuid, opts: DbConfig) -> Result { + let db = { + let (db_opts, cf_opts) = tune_dboptions_for_bulk_load(&opts); + new_engine_opt(path.as_ref().to_str().unwrap(), db_opts, vec![cf_opts])? + }; + Ok(Engine { + db: Arc::new(db), + uuid: uuid, + }) + } + + pub fn uuid(&self) -> Uuid { + self.uuid + } + + pub fn write(&self, mut batch: WriteBatch) -> Result { + // Just a guess. 
+        let wb_cap = cmp::min(batch.get_mutations().len() * 128, MB as usize);
+        let wb = RawBatch::with_capacity(wb_cap);
+        let commit_ts = batch.get_commit_ts();
+        for m in batch.take_mutations().iter_mut() {
+            let k = Key::from_raw(m.get_key()).append_ts(commit_ts);
+            wb.put(k.encoded(), m.get_value()).unwrap();
+        }
+
+        let size = wb.data_size();
+        self.write_without_wal(wb)?;
+
+        Ok(size)
+    }
+}
+
+impl Deref for Engine {
+    type Target = DB;
+
+    fn deref(&self) -> &Self::Target {
+        &self.db
+    }
+}
+
+impl fmt::Debug for Engine {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        f.debug_struct("Engine")
+            .field("uuid", &self.uuid())
+            .field("path", &self.path().to_owned())
+            .finish()
+    }
+}
+
+fn tune_dboptions_for_bulk_load(opts: &DbConfig) -> (DBOptions, CFOptions) {
+    const DISABLED: i32 = i32::MAX;
+
+    let mut db_opts = DBOptions::new();
+    db_opts.create_if_missing(true);
+    db_opts.enable_statistics(false);
+    // Vector memtable doesn't support concurrent write.
+    db_opts.allow_concurrent_memtable_write(false);
+    db_opts.set_use_direct_io_for_flush_and_compaction(true);
+    // RocksDB reserves `max_background_jobs/4` for flush.
+    db_opts.set_max_background_jobs(opts.max_background_jobs);
+
+    let mut block_base_opts = BlockBasedOptions::new();
+    // Use a large block size for sequential access.
+    block_base_opts.set_block_size(MB as usize);
+    let mut cf_opts = ColumnFamilyOptions::new();
+    cf_opts.set_block_based_table_factory(&block_base_opts);
+    cf_opts.compression_per_level(&opts.defaultcf.compression_per_level);
+    // Consider using a large write buffer but be careful about OOM.
+    cf_opts.set_write_buffer_size(opts.defaultcf.write_buffer_size.0);
+    cf_opts.set_target_file_size_base(opts.defaultcf.write_buffer_size.0);
+    cf_opts.set_vector_memtable_factory(opts.defaultcf.write_buffer_size.0);
+    cf_opts.set_max_write_buffer_number(opts.defaultcf.max_write_buffer_number);
+    // Disable compaction and rate limit.
+    cf_opts.set_disable_auto_compactions(true);
+    cf_opts.set_soft_pending_compaction_bytes_limit(0);
+    cf_opts.set_hard_pending_compaction_bytes_limit(0);
+    cf_opts.set_level_zero_stop_writes_trigger(DISABLED);
+    cf_opts.set_level_zero_slowdown_writes_trigger(DISABLED);
+    // Add size properties to get approximate ranges without scan.
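+    // (The collector records per-key size offsets in each SST's user
+    // properties, so range sizes can later be estimated from metadata alone.)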
+    let f = Box::new(SizePropertiesCollectorFactory::default());
+    cf_opts.add_table_properties_collector_factory("tikv.size-properties-collector", f);
+
+    (db_opts, CFOptions::new(CF_DEFAULT, cf_opts))
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    use tempdir::TempDir;
+
+    fn new_engine() -> (TempDir, Engine) {
+        let dir = TempDir::new("test_import_engine").unwrap();
+        let uuid = Uuid::new_v4();
+        let opts = DbConfig::default();
+        let engine = Engine::new(dir.path(), uuid, opts).unwrap();
+        (dir, engine)
+    }
+
+    fn new_write_batch(n: u8, ts: u64) -> WriteBatch {
+        let mut wb = WriteBatch::new();
+        for i in 0..n {
+            let mut m = Mutation::new();
+            m.set_op(Mutation_OP::Put);
+            m.set_key(vec![i]);
+            m.set_value(vec![i]);
+            wb.mut_mutations().push(m);
+        }
+        wb.set_commit_ts(ts);
+        wb
+    }
+
+    fn new_encoded_key(i: u8, ts: u64) -> Vec<u8> {
+        Key::from_raw(&[i]).append_ts(ts).encoded().to_owned()
+    }
+
+    #[test]
+    fn test_write() {
+        let (_dir, engine) = new_engine();
+
+        let n = 10;
+        let commit_ts = 10;
+        let wb = new_write_batch(n, commit_ts);
+        engine.write(wb).unwrap();
+
+        for i in 0..n {
+            let key = new_encoded_key(i, commit_ts);
+            assert_eq!(engine.get(&key).unwrap().unwrap(), &[i]);
+        }
+    }
+}
diff --git a/src/import/errors.rs b/src/import/errors.rs
index ed2bc7fb97a..41a26efd154 100644
--- a/src/import/errors.rs
+++ b/src/import/errors.rs
@@ -18,7 +18,7 @@ use std::result;
 use futures::sync::oneshot::Canceled;
 use grpc::Error as GrpcError;
-use uuid::ParseError;
+use uuid::{ParseError, Uuid};
 use raftstore::errors::Error as RaftStoreError;
 use util::codec::Error as CodecError;
@@ -82,6 +82,13 @@ quick_error! {
         TokenNotFound(token: usize) {
             display("Token {} not found", token)
         }
+        EngineInUse(uuid: Uuid) {
+            display("Engine {} is in use", uuid)
+        }
+        EngineNotFound(uuid: Uuid) {
+            display("Engine {} not found", uuid)
+        }
+        InvalidChunk {}
     }
 }
diff --git a/src/import/kv_importer.rs b/src/import/kv_importer.rs
new file mode 100644
index 00000000000..7b1cfa1dbe2
--- /dev/null
+++ b/src/import/kv_importer.rs
@@ -0,0 +1,274 @@
+// Copyright 2018 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::fs;
+use std::fmt;
+use std::path::{Path, PathBuf};
+use std::sync::{Arc, Mutex};
+
+use uuid::Uuid;
+use kvproto::importpb::*;
+
+use config::DbConfig;
+use util::collections::HashMap;
+
+use super::engine::*;
+use super::{Config, Error, Result};
+
+pub struct KVImporter {
+    dir: EngineDir,
+    engines: Mutex<HashMap<Uuid, Arc<EngineFile>>>,
+}
+
+impl KVImporter {
+    pub fn new(cfg: Config, opts: DbConfig) -> Result<KVImporter> {
+        let dir = EngineDir::new(&cfg.import_dir, opts)?;
+        Ok(KVImporter {
+            dir: dir,
+            engines: Mutex::new(HashMap::default()),
+        })
+    }
+
+    /// Open the engine.
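+    /// Opening an engine that is already open is a no-op, so the call is
+    /// safe to retry.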
+    pub fn open_engine(&self, uuid: Uuid) -> Result<()> {
+        let mut engines = self.engines.lock().unwrap();
+        if engines.contains_key(&uuid) {
+            return Ok(());
+        }
+
+        match self.dir.open(uuid) {
+            Ok(engine) => {
+                info!("open {:?}", engine);
+                engines.insert(uuid, Arc::new(engine));
+                Ok(())
+            }
+            Err(e) => {
+                error!("open {}: {:?}", uuid, e);
+                Err(e)
+            }
+        }
+    }
+
+    pub fn bind_engine(&self, uuid: Uuid) -> Result<Arc<EngineFile>> {
+        let engines = self.engines.lock().unwrap();
+        match engines.get(&uuid) {
+            Some(engine) => Ok(Arc::clone(engine)),
+            None => Err(Error::EngineNotFound(uuid)),
+        }
+    }
+
+    /// Close the engine.
+    /// An engine cannot be closed while it is still being written to.
+    pub fn close_engine(&self, uuid: Uuid) -> Result<()> {
+        let mut engine = {
+            let mut engines = self.engines.lock().unwrap();
+            let engine = match engines.remove(&uuid) {
+                Some(engine) => engine,
+                None => return Err(Error::EngineNotFound(uuid)),
+            };
+            match Arc::try_unwrap(engine) {
+                Ok(engine) => engine,
+                Err(engine) => {
+                    engines.insert(uuid, engine);
+                    return Err(Error::EngineInUse(uuid));
+                }
+            }
+        };
+
+        match engine.close() {
+            Ok(_) => {
+                info!("close {:?}", engine);
+                Ok(())
+            }
+            Err(e) => {
+                error!("close {:?}: {:?}", engine, e);
+                Err(e)
+            }
+        }
+    }
+}
+
+pub struct EngineDir {
+    opts: DbConfig,
+    root_dir: PathBuf,
+    temp_dir: PathBuf,
+}
+
+impl EngineDir {
+    const TEMP_DIR: &'static str = ".temp";
+
+    fn new<P: AsRef<Path>>(root: P, opts: DbConfig) -> Result<EngineDir> {
+        let root_dir = root.as_ref().to_owned();
+        let temp_dir = root_dir.join(Self::TEMP_DIR);
+        if !temp_dir.exists() {
+            fs::create_dir_all(&temp_dir)?;
+        }
+        Ok(EngineDir {
+            opts: opts,
+            root_dir: root_dir,
+            temp_dir: temp_dir,
+        })
+    }
+
+    fn join(&self, uuid: Uuid) -> EnginePath {
+        let file_name = format!("{}", uuid);
+        let save_path = self.root_dir.join(&file_name);
+        let temp_path = self.temp_dir.join(&file_name);
+        EnginePath {
+            save: save_path,
+            temp: temp_path,
+        }
+    }
+
+    fn open(&self, uuid: Uuid) -> Result<EngineFile> {
+        let path = self.join(uuid);
+        if path.save.exists() {
+            return Err(Error::FileExists(path.save));
+        }
+        EngineFile::new(uuid, path, self.opts.clone())
+    }
+}
+
+#[derive(Clone)]
+pub struct EnginePath {
+    // The path of the engine that has been closed.
+    save: PathBuf,
+    // The path of the engine that is still being written.
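+    // On close the temp path is renamed to the save path, so a crash
+    // mid-import leaves only temp data behind for cleanup.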
+    temp: PathBuf,
+}
+
+impl fmt::Debug for EnginePath {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        f.debug_struct("EnginePath")
+            .field("save", &self.save)
+            .field("temp", &self.temp)
+            .finish()
+    }
+}
+
+pub struct EngineFile {
+    uuid: Uuid,
+    path: EnginePath,
+    engine: Option<Engine>,
+}
+
+impl EngineFile {
+    fn new(uuid: Uuid, path: EnginePath, opts: DbConfig) -> Result<EngineFile> {
+        let engine = Engine::new(&path.temp, uuid, opts)?;
+        Ok(EngineFile {
+            uuid: uuid,
+            path: path,
+            engine: Some(engine),
+        })
+    }
+
+    pub fn write(&self, batch: WriteBatch) -> Result<usize> {
+        self.engine.as_ref().unwrap().write(batch)
+    }
+
+    fn close(&mut self) -> Result<()> {
+        self.engine.take().unwrap().flush(true)?;
+        if self.path.save.exists() {
+            return Err(Error::FileExists(self.path.save.clone()));
+        }
+        fs::rename(&self.path.temp, &self.path.save)?;
+        Ok(())
+    }
+
+    fn cleanup(&mut self) -> Result<()> {
+        self.engine.take();
+        if self.path.temp.exists() {
+            fs::remove_dir_all(&self.path.temp)?;
+        }
+        Ok(())
+    }
+}
+
+impl Drop for EngineFile {
+    fn drop(&mut self) {
+        if let Err(e) = self.cleanup() {
+            warn!("cleanup {:?}: {:?}", self, e);
+        }
+    }
+}
+
+impl fmt::Debug for EngineFile {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        f.debug_struct("EngineFile")
+            .field("uuid", &self.uuid)
+            .field("path", &self.path)
+            .finish()
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    use tempdir::TempDir;
+
+    #[test]
+    fn test_kv_importer() {
+        let temp_dir = TempDir::new("test_kv_importer").unwrap();
+
+        let mut cfg = Config::default();
+        cfg.import_dir = temp_dir.path().to_str().unwrap().to_owned();
+        let importer = KVImporter::new(cfg, DbConfig::default()).unwrap();
+
+        let uuid = Uuid::new_v4();
+        // Cannot bind to an unopened engine.
+        assert!(importer.bind_engine(uuid).is_err());
+        importer.open_engine(uuid).unwrap();
+        let engine = importer.bind_engine(uuid).unwrap();
+        engine.write(WriteBatch::new()).unwrap();
+        // Cannot close an engine that is still in use.
+        assert!(importer.close_engine(uuid).is_err());
+        drop(engine);
+        importer.close_engine(uuid).unwrap();
+    }
+
+    #[test]
+    fn test_engine_file() {
+        let temp_dir = TempDir::new("test_engine_file").unwrap();
+
+        let uuid = Uuid::new_v4();
+        let opts = DbConfig::default();
+        let path = EnginePath {
+            save: temp_dir.path().join("save"),
+            temp: temp_dir.path().join("temp"),
+        };
+
+        // Test close.
+        {
+            let mut f = EngineFile::new(uuid, path.clone(), opts.clone()).unwrap();
+            // Cannot create the same file again.
+            assert!(EngineFile::new(uuid, path.clone(), opts.clone()).is_err());
+            assert!(path.temp.exists());
+            assert!(!path.save.exists());
+            f.close().unwrap();
+            assert!(!path.temp.exists());
+            assert!(path.save.exists());
+            fs::remove_dir_all(&path.save).unwrap();
+        }
+
+        // Test cleanup.
+        {
+            let f = EngineFile::new(uuid, path.clone(), opts.clone()).unwrap();
+            assert!(path.temp.exists());
+            assert!(!path.save.exists());
+            drop(f);
+            assert!(!path.temp.exists());
+            assert!(!path.save.exists());
+        }
+    }
+}
diff --git a/src/import/kv_server.rs b/src/import/kv_server.rs
new file mode 100644
index 00000000000..28e9d0bed7a
--- /dev/null
+++ b/src/import/kv_server.rs
@@ -0,0 +1,74 @@
+// Copyright 2018 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::net::SocketAddr;
+use std::str::FromStr;
+use std::sync::Arc;
+
+use grpc::{ChannelBuilder, EnvBuilder, Server as GrpcServer, ServerBuilder};
+use kvproto::importpb_grpc::create_import_kv;
+
+use config::TiKvConfig;
+
+use super::{ImportKVService, KVImporter};
+
+const MAX_GRPC_MSG_LEN: usize = 32 * 1024 * 1024;
+
+pub struct ImportKVServer {
+    grpc_server: GrpcServer,
+}
+
+impl ImportKVServer {
+    pub fn new(tikv: &TiKvConfig) -> ImportKVServer {
+        let cfg = &tikv.server;
+        let addr = SocketAddr::from_str(&cfg.addr).unwrap();
+
+        let importer = KVImporter::new(tikv.import.clone(), tikv.rocksdb.clone()).unwrap();
+        let import_service = ImportKVService::new(tikv.import.clone(), Arc::new(importer));
+
+        let env = Arc::new(
+            EnvBuilder::new()
+                .name_prefix(thd_name!("import-server"))
+                .cq_count(cfg.grpc_concurrency)
+                .build(),
+        );
+
+        let channel_args = ChannelBuilder::new(Arc::clone(&env))
+            .stream_initial_window_size(cfg.grpc_stream_initial_window_size.0 as usize)
+            .max_concurrent_stream(cfg.grpc_concurrent_stream)
+            .max_send_message_len(MAX_GRPC_MSG_LEN)
+            .max_receive_message_len(MAX_GRPC_MSG_LEN)
+            .build_args();
+
+        let grpc_server = ServerBuilder::new(Arc::clone(&env))
+            .bind(format!("{}", addr.ip()), addr.port())
+            .channel_args(channel_args)
+            .register_service(create_import_kv(import_service))
+            .build()
+            .unwrap();
+
+        ImportKVServer { grpc_server }
+    }
+
+    pub fn start(&mut self) {
+        self.grpc_server.start();
+    }
+
+    pub fn shutdown(&mut self) {
+        self.grpc_server.shutdown();
+    }
+
+    pub fn bind_addrs(&self) -> &[(String, u16)] {
+        self.grpc_server.bind_addrs()
+    }
+}
diff --git a/src/import/kv_service.rs b/src/import/kv_service.rs
new file mode 100644
index 00000000000..ef2fbb4c3ec
--- /dev/null
+++ b/src/import/kv_service.rs
@@ -0,0 +1,147 @@
+// Copyright 2018 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
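+
+// ImportKVService serves the ImportKv RPCs (open/write/close). A write is a
+// client stream: the first chunk must carry a head with the engine UUID, and
+// every following chunk carries a batch of mutations.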
+
+use std::sync::Arc;
+
+use uuid::Uuid;
+use grpc::{ClientStreamingSink, RequestStream, RpcContext, UnarySink};
+use futures::sync::mpsc;
+use futures::{Future, Stream};
+use futures_cpupool::{Builder, CpuPool};
+use kvproto::importpb::*;
+use kvproto::importpb_grpc::*;
+
+use util::time::Instant;
+
+use super::service::*;
+use super::metrics::*;
+use super::{Config, Error, KVImporter};
+
+#[derive(Clone)]
+pub struct ImportKVService {
+    cfg: Config,
+    threads: CpuPool,
+    importer: Arc<KVImporter>,
+}
+
+impl ImportKVService {
+    pub fn new(cfg: Config, importer: Arc<KVImporter>) -> ImportKVService {
+        let threads = Builder::new()
+            .name_prefix("kv-importer")
+            .pool_size(cfg.num_threads)
+            .create();
+        ImportKVService {
+            cfg: cfg,
+            threads: threads,
+            importer: importer,
+        }
+    }
+}
+
+impl ImportKv for ImportKVService {
+    fn open(&self, ctx: RpcContext, req: OpenRequest, sink: UnarySink<OpenResponse>) {
+        let label = "open";
+        let timer = Instant::now_coarse();
+        let import = Arc::clone(&self.importer);
+
+        ctx.spawn(
+            self.threads
+                .spawn_fn(move || {
+                    let uuid = Uuid::from_bytes(req.get_uuid())?;
+                    import.open_engine(uuid)
+                })
+                .map(|_| OpenResponse::new())
+                .then(move |res| send_rpc_response!(res, sink, label, timer)),
+        )
+    }
+
+    fn write(
+        &self,
+        ctx: RpcContext,
+        stream: RequestStream<WriteRequest>,
+        sink: ClientStreamingSink<WriteResponse>,
+    ) {
+        let label = "write";
+        let timer = Instant::now_coarse();
+        let import = Arc::clone(&self.importer);
+        let bounded_stream = mpsc::spawn(stream, &self.threads, self.cfg.stream_channel_window);
+
+        ctx.spawn(
+            self.threads.spawn(
+                bounded_stream
+                    .into_future()
+                    .map_err(|(e, _)| Error::from(e))
+                    .and_then(move |(chunk, stream)| {
+                        let head = match chunk {
+                            Some(ref chunk) if chunk.has_head() => chunk.get_head(),
+                            _ => return Err(Error::InvalidChunk),
+                        };
+                        let uuid = Uuid::from_bytes(head.get_uuid())?;
+                        let engine = import.bind_engine(uuid)?;
+                        Ok((engine, stream))
+                    })
+                    .and_then(move |(engine, stream)| {
+                        stream.map_err(Error::from).for_each(move |mut chunk| {
+                            let start = Instant::now_coarse();
+                            if !chunk.has_batch() {
+                                return Err(Error::InvalidChunk);
+                            }
+                            let batch = chunk.take_batch();
+                            let batch_size = engine.write(batch)?;
+                            IMPORT_WRITE_CHUNK_BYTES.observe(batch_size as f64);
+                            IMPORT_WRITE_CHUNK_DURATION.observe(start.elapsed_secs());
+                            Ok(())
+                        })
+                    })
+                    .then(move |res| match res {
+                        Ok(_) => Ok(WriteResponse::new()),
+                        Err(Error::EngineNotFound(v)) => {
+                            let mut resp = WriteResponse::new();
+                            resp.mut_error()
+                                .mut_engine_not_found()
+                                .set_uuid(v.as_bytes().to_vec());
+                            Ok(resp)
+                        }
+                        Err(e) => Err(e),
+                    })
+                    .then(move |res| send_rpc_response!(res, sink, label, timer)),
+            ),
+        )
+    }
+
+    fn close(&self, ctx: RpcContext, req: CloseRequest, sink: UnarySink<CloseResponse>) {
+        let label = "close";
+        let timer = Instant::now_coarse();
+        let import = Arc::clone(&self.importer);
+
+        ctx.spawn(
+            self.threads
+                .spawn_fn(move || {
+                    let uuid = Uuid::from_bytes(req.get_uuid())?;
+                    import.close_engine(uuid)
+                })
+                .then(move |res| match res {
+                    Ok(_) => Ok(CloseResponse::new()),
+                    Err(Error::EngineNotFound(v)) => {
+                        let mut resp = CloseResponse::new();
+                        resp.mut_error()
+                            .mut_engine_not_found()
+                            .set_uuid(v.as_bytes().to_vec());
+                        Ok(resp)
+                    }
+                    Err(e) => Err(e),
+                })
+                .then(move |res| send_rpc_response!(res, sink, label, timer)),
+        )
+    }
+}
diff --git a/src/import/metrics.rs b/src/import/metrics.rs
index 9ad64225cd6..0812af3d87f 100644
--- a/src/import/metrics.rs
+++ b/src/import/metrics.rs
@@ -22,6 +22,20 @@ lazy_static! {
         exponential_buckets(0.001, 2.0, 30).unwrap()
     ).unwrap();
 
+    pub static ref IMPORT_WRITE_CHUNK_BYTES: Histogram =
+        register_histogram!(
+            "tikv_import_write_chunk_bytes",
+            "Bucketed histogram of import write chunk bytes",
+            exponential_buckets(1024.0, 2.0, 20).unwrap()
+        ).unwrap();
+
+    pub static ref IMPORT_WRITE_CHUNK_DURATION: Histogram =
+        register_histogram!(
+            "tikv_import_write_chunk_duration",
+            "Bucketed histogram of import write chunk duration",
+            exponential_buckets(0.001, 2.0, 20).unwrap()
+        ).unwrap();
+
     pub static ref IMPORT_UPLOAD_CHUNK_BYTES: Histogram =
         register_histogram!(
             "tikv_import_upload_chunk_bytes",
diff --git a/src/import/mod.rs b/src/import/mod.rs
index 1a0fac6ea15..5a20e59aaf2 100644
--- a/src/import/mod.rs
+++ b/src/import/mod.rs
@@ -13,9 +13,13 @@
 
 mod config;
 mod errors;
+mod engine;
 mod metrics;
 #[macro_use]
 mod service;
+mod kv_server;
+mod kv_service;
+mod kv_importer;
 mod sst_service;
 mod sst_importer;
 
@@ -23,5 +27,8 @@ pub mod test_helpers;
 
 pub use self::config::Config;
 pub use self::errors::{Error, Result};
+pub use self::kv_server::ImportKVServer;
+pub use self::kv_service::ImportKVService;
+pub use self::kv_importer::KVImporter;
 pub use self::sst_service::ImportSSTService;
 pub use self::sst_importer::SSTImporter;
diff --git a/src/import/sst_importer.rs b/src/import/sst_importer.rs
index f61b65a2f85..d78c90475f3 100644
--- a/src/import/sst_importer.rs
+++ b/src/import/sst_importer.rs
@@ -469,7 +469,7 @@ mod tests {
         {
             let mut f = ImportFile::create(meta.clone(), path.clone()).unwrap();
-            // Cannot create the same again.
+            // Cannot create the same file again.
             assert!(ImportFile::create(meta.clone(), path.clone()).is_err());
             f.append(data).unwrap();
             // Invalid crc32 and length.
diff --git a/tests/config/mod.rs b/tests/config/mod.rs
index d8a286ca0a2..a955a516417 100644
--- a/tests/config/mod.rs
+++ b/tests/config/mod.rs
@@ -400,6 +400,7 @@ fn test_serde_custom_tikv_config() {
         override_ssl_target: "".to_owned(),
     };
     value.import = ImportConfig {
+        import_dir: "/abc".to_owned(),
         num_threads: 123,
         stream_channel_window: 123,
     };
diff --git a/tests/config/test-custom.toml b/tests/config/test-custom.toml
index 22e94dea718..5311e3543d6 100644
--- a/tests/config/test-custom.toml
+++ b/tests/config/test-custom.toml
@@ -342,5 +342,6 @@ cert-path = "invalid path"
 key-path = "invalid path"
 
 [import]
+import-dir = "/abc"
 num-threads = 123
 stream-channel-window = 123
diff --git a/tests/import/kv_service.rs b/tests/import/kv_service.rs
new file mode 100644
index 00000000000..61f5415377a
--- /dev/null
+++ b/tests/import/kv_service.rs
@@ -0,0 +1,105 @@
+// Copyright 2018 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
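+
+// End-to-end test: starts an ImportKVServer on an ephemeral port and drives
+// the open -> write -> close flow through a real gRPC channel.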
+
+use std::sync::Arc;
+
+use uuid::Uuid;
+use futures::{stream, Future, Stream};
+use tempdir::TempDir;
+
+use kvproto::importpb::*;
+use kvproto::importpb_grpc::*;
+use grpc::{ChannelBuilder, Environment, Result, WriteFlags};
+
+use tikv::config::TiKvConfig;
+use tikv::import::ImportKVServer;
+
+fn new_kv_server() -> (ImportKVServer, ImportKvClient) {
+    let temp_dir = TempDir::new("test_import_kv_server").unwrap();
+
+    let mut cfg = TiKvConfig::default();
+    cfg.server.addr = "127.0.0.1:0".to_owned();
+    cfg.import.import_dir = temp_dir.path().to_str().unwrap().to_owned();
+    let server = ImportKVServer::new(&cfg);
+
+    let ch = {
+        let env = Arc::new(Environment::new(1));
+        let addr = server.bind_addrs().first().unwrap();
+        ChannelBuilder::new(env).connect(&format!("{}:{}", addr.0, addr.1))
+    };
+    let client = ImportKvClient::new(ch);
+
+    (server, client)
+}
+
+#[test]
+fn test_kv_service() {
+    let (mut server, client) = new_kv_server();
+    server.start();
+
+    let uuid = Uuid::new_v4().as_bytes().to_vec();
+    let mut head = WriteHead::new();
+    head.set_uuid(uuid.clone());
+
+    let mut m = Mutation::new();
+    m.op = Mutation_OP::Put;
+    m.set_key(vec![1]);
+    m.set_value(vec![1]);
+    let mut batch = WriteBatch::new();
+    batch.set_commit_ts(123);
+    batch.mut_mutations().push(m);
+
+    let mut open = OpenRequest::new();
+    open.set_uuid(uuid.clone());
+
+    let mut close = CloseRequest::new();
+    close.set_uuid(uuid.clone());
+
+    // Write to an engine before it is opened.
+    let resp = send_write(&client, &head, &batch).unwrap();
+    assert!(resp.get_error().has_engine_not_found());
+
+    // Close an engine before it is opened.
+    let resp = client.close(&close).unwrap();
+    assert!(resp.get_error().has_engine_not_found());
+
+    client.open(&open).unwrap();
+    let resp = send_write(&client, &head, &batch).unwrap();
+    assert!(!resp.has_error());
+    let resp = send_write(&client, &head, &batch).unwrap();
+    assert!(!resp.has_error());
+    let resp = client.close(&close).unwrap();
+    assert!(!resp.has_error());
+
+    server.shutdown();
+}
+
+fn send_write(
+    client: &ImportKvClient,
+    head: &WriteHead,
+    batch: &WriteBatch,
+) -> Result<WriteResponse> {
+    let mut r1 = WriteRequest::new();
+    r1.set_head(head.clone());
+    let mut r2 = WriteRequest::new();
+    r2.set_batch(batch.clone());
+    let mut r3 = WriteRequest::new();
+    r3.set_batch(batch.clone());
+    let reqs: Vec<_> = vec![r1, r2, r3]
+        .into_iter()
+        .map(|r| (r, WriteFlags::default()))
+        .collect();
+    let (tx, rx) = client.write().unwrap();
+    let stream = stream::iter_ok(reqs);
+    stream.forward(tx).and_then(|_| rx).wait()
+}
diff --git a/tests/import/mod.rs b/tests/import/mod.rs
index 58093ec248f..123245e8681 100644
--- a/tests/import/mod.rs
+++ b/tests/import/mod.rs
@@ -11,4 +11,5 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+mod kv_service;
 mod sst_service;