From c1c8c82842d5d01ed883965cdc80114502e4e4a5 Mon Sep 17 00:00:00 2001 From: Huachao Huang Date: Wed, 4 Apr 2018 17:30:08 +0800 Subject: [PATCH] Add ImportKV service (#2881) Implement Open/Write/Close APIs to write key-value pairs to import server. `Open` opens an engine identified by an UUID. `Write` creates a write stream to write key-value batch to the opened engine. Different clients can write to the same engine concurrently. `Close` closes the engine after all write batches are finished. An engine can only be closed when all write streams are closed. An engine can only be closed once, and it can not be opened again once it is closed. After these three steps, the data in the engine is ready to be imported to TiKV, but the `Import` API is not included in this PR. --- Cargo.lock | 6 +- etc/config-template.toml | 2 + src/bin/signal_handler.rs | 10 +- src/bin/tikv-server.rs | 25 +++- src/import/config.rs | 2 + src/import/engine.rs | 172 +++++++++++++++++++++ src/import/errors.rs | 9 +- src/import/kv_importer.rs | 274 ++++++++++++++++++++++++++++++++++ src/import/kv_server.rs | 74 +++++++++ src/import/kv_service.rs | 147 ++++++++++++++++++ src/import/metrics.rs | 14 ++ src/import/mod.rs | 7 + src/import/sst_importer.rs | 2 +- tests/config/mod.rs | 1 + tests/config/test-custom.toml | 1 + tests/import/kv_service.rs | 105 +++++++++++++ tests/import/mod.rs | 1 + 17 files changed, 840 insertions(+), 12 deletions(-) create mode 100644 src/import/engine.rs create mode 100644 src/import/kv_importer.rs create mode 100644 src/import/kv_server.rs create mode 100644 src/import/kv_service.rs create mode 100644 tests/import/kv_service.rs diff --git a/Cargo.lock b/Cargo.lock index 0074b781906..0a43ef6def3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -461,7 +461,7 @@ dependencies = [ [[package]] name = "kvproto" version = "0.0.1" -source = "git+https://github.com/pingcap/kvproto.git#1c5dddd57fe7b3ceac57b5107afbbf4539fb97bc" +source = "git+https://github.com/pingcap/kvproto.git#dc0b3b3e57761a9620639c28687e7ee4d5ee0c2a" dependencies = [ "futures 0.1.17 (registry+https://github.com/rust-lang/crates.io-index)", "grpcio 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -502,7 +502,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] name = "librocksdb_sys" version = "0.1.0" -source = "git+https://github.com/pingcap/rust-rocksdb.git#7e21e222c8dcb9729a61222995dffc3e836194da" +source = "git+https://github.com/pingcap/rust-rocksdb.git#1fc594481da84ceb515e5a7380ccc130d2d2c733" dependencies = [ "bzip2-sys 0.1.6 (git+https://github.com/alexcrichton/bzip2-rs.git)", "cc 1.0.3 (registry+https://github.com/rust-lang/crates.io-index)", @@ -898,7 +898,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] name = "rocksdb" version = "0.3.0" -source = "git+https://github.com/pingcap/rust-rocksdb.git#7e21e222c8dcb9729a61222995dffc3e836194da" +source = "git+https://github.com/pingcap/rust-rocksdb.git#1fc594481da84ceb515e5a7380ccc130d2d2c733" dependencies = [ "crc 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)", "libc 0.2.36 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/etc/config-template.toml b/etc/config-template.toml index b8b30b08e66..70a87d1e4d4 100644 --- a/etc/config-template.toml +++ b/etc/config-template.toml @@ -442,6 +442,8 @@ # key-path = "" [import] +# the directory to store importing kv data. +# import-dir = "/tmp/tikv/import" # number of threads to handle RPC requests. # num-threads = 8 # stream channel window size, stream will be blocked on channel full. diff --git a/src/bin/signal_handler.rs b/src/bin/signal_handler.rs index 52d06c012df..81cb5915499 100644 --- a/src/bin/signal_handler.rs +++ b/src/bin/signal_handler.rs @@ -20,7 +20,7 @@ mod imp { use tikv::raftstore::store::Engines; use tikv::util::{metrics, rocksdb_stats}; - pub fn handle_signal(engines: Engines) { + pub fn handle_signal(engines: Option) { use signal::trap::Trap; use nix::sys::signal::{SIGUSR1, SIGUSR2, SIGHUP, SIGINT, SIGTERM}; let trap = Trap::trap(&[SIGTERM, SIGINT, SIGHUP, SIGUSR1, SIGUSR2]); @@ -33,8 +33,10 @@ mod imp { SIGUSR1 => { // Use SIGUSR1 to log metrics. info!("{}", metrics::dump()); - info!("{:?}", rocksdb_stats::dump(&engines.kv_engine)); - info!("{:?}", rocksdb_stats::dump(&engines.raft_engine)); + if let Some(ref engines) = engines { + info!("{:?}", rocksdb_stats::dump(&engines.kv_engine)); + info!("{:?}", rocksdb_stats::dump(&engines.raft_engine)); + } } SIGUSR2 => profiling::dump_prof(None), // TODO: handle more signal @@ -48,7 +50,7 @@ mod imp { mod imp { use tikv::raftstore::store::Engines; - pub fn handle_signal(_: Engines) {} + pub fn handle_signal(_: Option) {} } pub use self::imp::handle_signal; diff --git a/src/bin/tikv-server.rs b/src/bin/tikv-server.rs index 7ac256869d9..6b46f6a63bc 100644 --- a/src/bin/tikv-server.rs +++ b/src/bin/tikv-server.rs @@ -72,7 +72,7 @@ use tikv::raftstore::coprocessor::CoprocessorHost; use tikv::pd::{PdClient, RpcClient}; use tikv::util::time::Monitor; use tikv::util::rocksdb::metrics_flusher::{MetricsFlusher, DEFAULT_FLUSHER_INTERVAL}; -use tikv::import::{ImportSSTService, SSTImporter}; +use tikv::import::{ImportKVServer, ImportSSTService, SSTImporter}; const RESERVED_OPEN_FDS: u64 = 1000; @@ -283,7 +283,7 @@ fn run_raft_server(pd_client: RpcClient, cfg: &TiKvConfig, security_mgr: Arc Config { Config { + import_dir: "/tmp/tikv/import".to_owned(), num_threads: 8, stream_channel_window: 128, } diff --git a/src/import/engine.rs b/src/import/engine.rs new file mode 100644 index 00000000000..296f3cd273b --- /dev/null +++ b/src/import/engine.rs @@ -0,0 +1,172 @@ +// Copyright 2018 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::cmp; +use std::i32; +use std::fmt; +use std::ops::Deref; +use std::sync::Arc; +use std::path::Path; + +use uuid::Uuid; + +use rocksdb::{BlockBasedOptions, ColumnFamilyOptions, DBOptions, Writable, WriteBatch as RawBatch, + DB}; +use kvproto::importpb::*; + +use config::DbConfig; +use storage::CF_DEFAULT; +use storage::types::Key; +use util::config::MB; +use util::rocksdb::{new_engine_opt, CFOptions}; +use util::rocksdb::properties::SizePropertiesCollectorFactory; + +use super::Result; + +pub struct Engine { + db: Arc, + uuid: Uuid, +} + +impl Engine { + pub fn new>(path: P, uuid: Uuid, opts: DbConfig) -> Result { + let db = { + let (db_opts, cf_opts) = tune_dboptions_for_bulk_load(&opts); + new_engine_opt(path.as_ref().to_str().unwrap(), db_opts, vec![cf_opts])? + }; + Ok(Engine { + db: Arc::new(db), + uuid: uuid, + }) + } + + pub fn uuid(&self) -> Uuid { + self.uuid + } + + pub fn write(&self, mut batch: WriteBatch) -> Result { + // Just a guess. + let wb_cap = cmp::min(batch.get_mutations().len() * 128, MB as usize); + let wb = RawBatch::with_capacity(wb_cap); + let commit_ts = batch.get_commit_ts(); + for m in batch.take_mutations().iter_mut() { + let k = Key::from_raw(m.get_key()).append_ts(commit_ts); + wb.put(k.encoded(), m.get_value()).unwrap(); + } + + let size = wb.data_size(); + self.write_without_wal(wb)?; + + Ok(size) + } +} + +impl Deref for Engine { + type Target = DB; + + fn deref(&self) -> &Self::Target { + &self.db + } +} + +impl fmt::Debug for Engine { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("Engine") + .field("uuid", &self.uuid()) + .field("path", &self.path().to_owned()) + .finish() + } +} + +fn tune_dboptions_for_bulk_load(opts: &DbConfig) -> (DBOptions, CFOptions) { + const DISABLED: i32 = i32::MAX; + + let mut db_opts = DBOptions::new(); + db_opts.create_if_missing(true); + db_opts.enable_statistics(false); + // Vector memtable doesn't support concurrent write. + db_opts.allow_concurrent_memtable_write(false); + db_opts.set_use_direct_io_for_flush_and_compaction(true); + // RocksDB preserves `max_background_jobs/4` for flush. + db_opts.set_max_background_jobs(opts.max_background_jobs); + + let mut block_base_opts = BlockBasedOptions::new(); + // Use a large block size for sequential access. + block_base_opts.set_block_size(MB as usize); + let mut cf_opts = ColumnFamilyOptions::new(); + cf_opts.set_block_based_table_factory(&block_base_opts); + cf_opts.compression_per_level(&opts.defaultcf.compression_per_level); + // Consider using a large write buffer but be careful about OOM. + cf_opts.set_write_buffer_size(opts.defaultcf.write_buffer_size.0); + cf_opts.set_target_file_size_base(opts.defaultcf.write_buffer_size.0); + cf_opts.set_vector_memtable_factory(opts.defaultcf.write_buffer_size.0); + cf_opts.set_max_write_buffer_number(opts.defaultcf.max_write_buffer_number); + // Disable compaction and rate limit. + cf_opts.set_disable_auto_compactions(true); + cf_opts.set_soft_pending_compaction_bytes_limit(0); + cf_opts.set_hard_pending_compaction_bytes_limit(0); + cf_opts.set_level_zero_stop_writes_trigger(DISABLED); + cf_opts.set_level_zero_slowdown_writes_trigger(DISABLED); + // Add size properties to get approximate ranges wihout scan. + let f = Box::new(SizePropertiesCollectorFactory::default()); + cf_opts.add_table_properties_collector_factory("tikv.size-properties-collector", f); + + (db_opts, CFOptions::new(CF_DEFAULT, cf_opts)) +} + +#[cfg(test)] +mod tests { + use super::*; + + use tempdir::TempDir; + + fn new_engine() -> (TempDir, Engine) { + let dir = TempDir::new("test_import_engine").unwrap(); + let uuid = Uuid::new_v4(); + let opts = DbConfig::default(); + let engine = Engine::new(dir.path(), uuid, opts).unwrap(); + (dir, engine) + } + + fn new_write_batch(n: u8, ts: u64) -> WriteBatch { + let mut wb = WriteBatch::new(); + for i in 0..n { + let mut m = Mutation::new(); + m.set_op(Mutation_OP::Put); + m.set_key(vec![i]); + m.set_value(vec![i]); + wb.mut_mutations().push(m); + } + wb.set_commit_ts(ts); + wb + } + + fn new_encoded_key(i: u8, ts: u64) -> Vec { + Key::from_raw(&[i]).append_ts(ts).encoded().to_owned() + } + + #[test] + fn test_write() { + let (_dir, engine) = new_engine(); + + let n = 10; + let commit_ts = 10; + let wb = new_write_batch(n, commit_ts); + engine.write(wb).unwrap(); + + for i in 0..n { + let key = new_encoded_key(i, commit_ts); + assert_eq!(engine.get(&key).unwrap().unwrap(), &[i]); + } + } +} diff --git a/src/import/errors.rs b/src/import/errors.rs index ed2bc7fb97a..41a26efd154 100644 --- a/src/import/errors.rs +++ b/src/import/errors.rs @@ -18,7 +18,7 @@ use std::result; use futures::sync::oneshot::Canceled; use grpc::Error as GrpcError; -use uuid::ParseError; +use uuid::{ParseError, Uuid}; use raftstore::errors::Error as RaftStoreError; use util::codec::Error as CodecError; @@ -82,6 +82,13 @@ quick_error! { TokenNotFound(token: usize) { display("Token {} not found", token) } + EngineInUse(uuid: Uuid) { + display("Engine {} is in use", uuid) + } + EngineNotFound(uuid: Uuid) { + display("Engine {} not found", uuid) + } + InvalidChunk {} } } diff --git a/src/import/kv_importer.rs b/src/import/kv_importer.rs new file mode 100644 index 00000000000..7b1cfa1dbe2 --- /dev/null +++ b/src/import/kv_importer.rs @@ -0,0 +1,274 @@ +// Copyright 2018 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::fs; +use std::fmt; +use std::path::{Path, PathBuf}; +use std::sync::{Arc, Mutex}; + +use uuid::Uuid; +use kvproto::importpb::*; + +use config::DbConfig; +use util::collections::HashMap; + +use super::engine::*; +use super::{Config, Error, Result}; + +pub struct KVImporter { + dir: EngineDir, + engines: Mutex>>, +} + +impl KVImporter { + pub fn new(cfg: Config, opts: DbConfig) -> Result { + let dir = EngineDir::new(&cfg.import_dir, opts)?; + Ok(KVImporter { + dir: dir, + engines: Mutex::new(HashMap::default()), + }) + } + + /// Open the engine. + pub fn open_engine(&self, uuid: Uuid) -> Result<()> { + let mut engines = self.engines.lock().unwrap(); + if engines.contains_key(&uuid) { + return Ok(()); + } + + match self.dir.open(uuid) { + Ok(engine) => { + info!("open {:?}", engine); + engines.insert(uuid, Arc::new(engine)); + Ok(()) + } + Err(e) => { + error!("open {}: {:?}", uuid, e); + Err(e) + } + } + } + + pub fn bind_engine(&self, uuid: Uuid) -> Result> { + let engines = self.engines.lock().unwrap(); + match engines.get(&uuid) { + Some(engine) => Ok(Arc::clone(engine)), + None => Err(Error::EngineNotFound(uuid)), + } + } + + /// Close the engine. + /// Engine can not be closed when it is writing. + pub fn close_engine(&self, uuid: Uuid) -> Result<()> { + let mut engine = { + let mut engines = self.engines.lock().unwrap(); + let engine = match engines.remove(&uuid) { + Some(engine) => engine, + None => return Err(Error::EngineNotFound(uuid)), + }; + match Arc::try_unwrap(engine) { + Ok(engine) => engine, + Err(engine) => { + engines.insert(uuid, engine); + return Err(Error::EngineInUse(uuid)); + } + } + }; + + match engine.close() { + Ok(_) => { + info!("close {:?}", engine); + Ok(()) + } + Err(e) => { + error!("close {:?}: {:?}", engine, e); + Err(e) + } + } + } +} + +pub struct EngineDir { + opts: DbConfig, + root_dir: PathBuf, + temp_dir: PathBuf, +} + +impl EngineDir { + const TEMP_DIR: &'static str = ".temp"; + + fn new>(root: P, opts: DbConfig) -> Result { + let root_dir = root.as_ref().to_owned(); + let temp_dir = root_dir.join(Self::TEMP_DIR); + if !temp_dir.exists() { + fs::create_dir_all(&temp_dir)?; + } + Ok(EngineDir { + opts: opts, + root_dir: root_dir, + temp_dir: temp_dir, + }) + } + + fn join(&self, uuid: Uuid) -> EnginePath { + let file_name = format!("{}", uuid); + let save_path = self.root_dir.join(&file_name); + let temp_path = self.temp_dir.join(&file_name); + EnginePath { + save: save_path, + temp: temp_path, + } + } + + fn open(&self, uuid: Uuid) -> Result { + let path = self.join(uuid); + if path.save.exists() { + return Err(Error::FileExists(path.save)); + } + EngineFile::new(uuid, path, self.opts.clone()) + } +} + +#[derive(Clone)] +pub struct EnginePath { + // The path of the engine that has been closed. + save: PathBuf, + // The path of the engine that is being wrote. + temp: PathBuf, +} + +impl fmt::Debug for EnginePath { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("EnginePath") + .field("save", &self.save) + .field("temp", &self.temp) + .finish() + } +} + +pub struct EngineFile { + uuid: Uuid, + path: EnginePath, + engine: Option, +} + +impl EngineFile { + fn new(uuid: Uuid, path: EnginePath, opts: DbConfig) -> Result { + let engine = Engine::new(&path.temp, uuid, opts)?; + Ok(EngineFile { + uuid: uuid, + path: path, + engine: Some(engine), + }) + } + + pub fn write(&self, batch: WriteBatch) -> Result { + self.engine.as_ref().unwrap().write(batch) + } + + fn close(&mut self) -> Result<()> { + self.engine.take().unwrap().flush(true)?; + if self.path.save.exists() { + return Err(Error::FileExists(self.path.save.clone())); + } + fs::rename(&self.path.temp, &self.path.save)?; + Ok(()) + } + + fn cleanup(&mut self) -> Result<()> { + self.engine.take(); + if self.path.temp.exists() { + fs::remove_dir_all(&self.path.temp)?; + } + Ok(()) + } +} + +impl Drop for EngineFile { + fn drop(&mut self) { + if let Err(e) = self.cleanup() { + warn!("cleanup {:?}: {:?}", self, e); + } + } +} + +impl fmt::Debug for EngineFile { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("EngineFile") + .field("uuid", &self.uuid) + .field("path", &self.path) + .finish() + } +} + +#[cfg(test)] +mod tests { + use super::*; + + use tempdir::TempDir; + + #[test] + fn test_kv_importer() { + let temp_dir = TempDir::new("test_kv_importer").unwrap(); + + let mut cfg = Config::default(); + cfg.import_dir = temp_dir.path().to_str().unwrap().to_owned(); + let importer = KVImporter::new(cfg, DbConfig::default()).unwrap(); + + let uuid = Uuid::new_v4(); + // Can not bind to an unopened engine. + assert!(importer.bind_engine(uuid).is_err()); + importer.open_engine(uuid).unwrap(); + let engine = importer.bind_engine(uuid).unwrap(); + engine.write(WriteBatch::new()).unwrap(); + // Can not close an in use engine. + assert!(importer.close_engine(uuid).is_err()); + drop(engine); + importer.close_engine(uuid).unwrap(); + } + + #[test] + fn test_engine_file() { + let temp_dir = TempDir::new("test_engine_file").unwrap(); + + let uuid = Uuid::new_v4(); + let opts = DbConfig::default(); + let path = EnginePath { + save: temp_dir.path().join("save"), + temp: temp_dir.path().join("temp"), + }; + + // Test close. + { + let mut f = EngineFile::new(uuid, path.clone(), opts.clone()).unwrap(); + // Cannot create the same file again. + assert!(EngineFile::new(uuid, path.clone(), opts.clone()).is_err()); + assert!(path.temp.exists()); + assert!(!path.save.exists()); + f.close().unwrap(); + assert!(!path.temp.exists()); + assert!(path.save.exists()); + fs::remove_dir_all(&path.save).unwrap(); + } + + // Test cleanup. + { + let f = EngineFile::new(uuid, path.clone(), opts.clone()).unwrap(); + assert!(path.temp.exists()); + assert!(!path.save.exists()); + drop(f); + assert!(!path.temp.exists()); + assert!(!path.save.exists()); + } + } +} diff --git a/src/import/kv_server.rs b/src/import/kv_server.rs new file mode 100644 index 00000000000..28e9d0bed7a --- /dev/null +++ b/src/import/kv_server.rs @@ -0,0 +1,74 @@ +// Copyright 2018 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::net::SocketAddr; +use std::str::FromStr; +use std::sync::Arc; + +use grpc::{ChannelBuilder, EnvBuilder, Server as GrpcServer, ServerBuilder}; +use kvproto::importpb_grpc::create_import_kv; + +use config::TiKvConfig; + +use super::{ImportKVService, KVImporter}; + +const MAX_GRPC_MSG_LEN: usize = 32 * 1024 * 1024; + +pub struct ImportKVServer { + grpc_server: GrpcServer, +} + +impl ImportKVServer { + pub fn new(tikv: &TiKvConfig) -> ImportKVServer { + let cfg = &tikv.server; + let addr = SocketAddr::from_str(&cfg.addr).unwrap(); + + let importer = KVImporter::new(tikv.import.clone(), tikv.rocksdb.clone()).unwrap(); + let import_service = ImportKVService::new(tikv.import.clone(), Arc::new(importer)); + + let env = Arc::new( + EnvBuilder::new() + .name_prefix(thd_name!("import-server")) + .cq_count(cfg.grpc_concurrency) + .build(), + ); + + let channel_args = ChannelBuilder::new(Arc::clone(&env)) + .stream_initial_window_size(cfg.grpc_stream_initial_window_size.0 as usize) + .max_concurrent_stream(cfg.grpc_concurrent_stream) + .max_send_message_len(MAX_GRPC_MSG_LEN) + .max_receive_message_len(MAX_GRPC_MSG_LEN) + .build_args(); + + let grpc_server = ServerBuilder::new(Arc::clone(&env)) + .bind(format!("{}", addr.ip()), addr.port()) + .channel_args(channel_args) + .register_service(create_import_kv(import_service)) + .build() + .unwrap(); + + ImportKVServer { grpc_server } + } + + pub fn start(&mut self) { + self.grpc_server.start(); + } + + pub fn shutdown(&mut self) { + self.grpc_server.shutdown(); + } + + pub fn bind_addrs(&self) -> &[(String, u16)] { + self.grpc_server.bind_addrs() + } +} diff --git a/src/import/kv_service.rs b/src/import/kv_service.rs new file mode 100644 index 00000000000..ef2fbb4c3ec --- /dev/null +++ b/src/import/kv_service.rs @@ -0,0 +1,147 @@ +// Copyright 2018 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use uuid::Uuid; +use grpc::{ClientStreamingSink, RequestStream, RpcContext, UnarySink}; +use futures::sync::mpsc; +use futures::{Future, Stream}; +use futures_cpupool::{Builder, CpuPool}; +use kvproto::importpb::*; +use kvproto::importpb_grpc::*; + +use util::time::Instant; + +use super::service::*; +use super::metrics::*; +use super::{Config, Error, KVImporter}; + +#[derive(Clone)] +pub struct ImportKVService { + cfg: Config, + threads: CpuPool, + importer: Arc, +} + +impl ImportKVService { + pub fn new(cfg: Config, importer: Arc) -> ImportKVService { + let threads = Builder::new() + .name_prefix("kv-importer") + .pool_size(cfg.num_threads) + .create(); + ImportKVService { + cfg: cfg, + threads: threads, + importer: importer, + } + } +} + +impl ImportKv for ImportKVService { + fn open(&self, ctx: RpcContext, req: OpenRequest, sink: UnarySink) { + let label = "open"; + let timer = Instant::now_coarse(); + let import = Arc::clone(&self.importer); + + ctx.spawn( + self.threads + .spawn_fn(move || { + let uuid = Uuid::from_bytes(req.get_uuid())?; + import.open_engine(uuid) + }) + .map(|_| OpenResponse::new()) + .then(move |res| send_rpc_response!(res, sink, label, timer)), + ) + } + + fn write( + &self, + ctx: RpcContext, + stream: RequestStream, + sink: ClientStreamingSink, + ) { + let label = "write"; + let timer = Instant::now_coarse(); + let import = Arc::clone(&self.importer); + let bounded_stream = mpsc::spawn(stream, &self.threads, self.cfg.stream_channel_window); + + ctx.spawn( + self.threads.spawn( + bounded_stream + .into_future() + .map_err(|(e, _)| Error::from(e)) + .and_then(move |(chunk, stream)| { + let head = match chunk { + Some(ref chunk) if chunk.has_head() => chunk.get_head(), + _ => return Err(Error::InvalidChunk), + }; + let uuid = Uuid::from_bytes(head.get_uuid())?; + let engine = import.bind_engine(uuid)?; + Ok((engine, stream)) + }) + .and_then(move |(engine, stream)| { + stream.map_err(Error::from).for_each(move |mut chunk| { + let start = Instant::now_coarse(); + if !chunk.has_batch() { + return Err(Error::InvalidChunk); + } + let batch = chunk.take_batch(); + let batch_size = engine.write(batch)?; + IMPORT_WRITE_CHUNK_BYTES.observe(batch_size as f64); + IMPORT_WRITE_CHUNK_DURATION.observe(start.elapsed_secs()); + Ok(()) + }) + }) + .then(move |res| match res { + Ok(_) => Ok(WriteResponse::new()), + Err(Error::EngineNotFound(v)) => { + let mut resp = WriteResponse::new(); + resp.mut_error() + .mut_engine_not_found() + .set_uuid(v.as_bytes().to_vec()); + Ok(resp) + } + Err(e) => Err(e), + }) + .then(move |res| send_rpc_response!(res, sink, label, timer)), + ), + ) + } + + fn close(&self, ctx: RpcContext, req: CloseRequest, sink: UnarySink) { + let label = "close"; + let timer = Instant::now_coarse(); + let import = Arc::clone(&self.importer); + + ctx.spawn( + self.threads + .spawn_fn(move || { + let uuid = Uuid::from_bytes(req.get_uuid())?; + import.close_engine(uuid) + }) + .then(move |res| match res { + Ok(_) => Ok(CloseResponse::new()), + Err(Error::EngineNotFound(v)) => { + let mut resp = CloseResponse::new(); + resp.mut_error() + .mut_engine_not_found() + .set_uuid(v.as_bytes().to_vec()); + Ok(resp) + } + Err(e) => Err(e), + }) + .then(move |res| send_rpc_response!(res, sink, label, timer)), + ) + } +} diff --git a/src/import/metrics.rs b/src/import/metrics.rs index 9ad64225cd6..0812af3d87f 100644 --- a/src/import/metrics.rs +++ b/src/import/metrics.rs @@ -22,6 +22,20 @@ lazy_static! { exponential_buckets(0.001, 2.0, 30).unwrap() ).unwrap(); + pub static ref IMPORT_WRITE_CHUNK_BYTES: Histogram = + register_histogram!( + "tikv_import_write_chunk_bytes", + "Bucketed histogram of import write chunk bytes", + exponential_buckets(1024.0, 2.0, 20).unwrap() + ).unwrap(); + + pub static ref IMPORT_WRITE_CHUNK_DURATION: Histogram = + register_histogram!( + "tikv_import_write_chunk_duration", + "Bucketed histogram of import write chunk duration", + exponential_buckets(0.001, 2.0, 20).unwrap() + ).unwrap(); + pub static ref IMPORT_UPLOAD_CHUNK_BYTES: Histogram = register_histogram!( "tikv_import_upload_chunk_bytes", diff --git a/src/import/mod.rs b/src/import/mod.rs index 1a0fac6ea15..5a20e59aaf2 100644 --- a/src/import/mod.rs +++ b/src/import/mod.rs @@ -13,9 +13,13 @@ mod config; mod errors; +mod engine; mod metrics; #[macro_use] mod service; +mod kv_server; +mod kv_service; +mod kv_importer; mod sst_service; mod sst_importer; @@ -23,5 +27,8 @@ pub mod test_helpers; pub use self::config::Config; pub use self::errors::{Error, Result}; +pub use self::kv_server::ImportKVServer; +pub use self::kv_service::ImportKVService; +pub use self::kv_importer::KVImporter; pub use self::sst_service::ImportSSTService; pub use self::sst_importer::SSTImporter; diff --git a/src/import/sst_importer.rs b/src/import/sst_importer.rs index f61b65a2f85..d78c90475f3 100644 --- a/src/import/sst_importer.rs +++ b/src/import/sst_importer.rs @@ -469,7 +469,7 @@ mod tests { { let mut f = ImportFile::create(meta.clone(), path.clone()).unwrap(); - // Cannot create the same again. + // Cannot create the same file again. assert!(ImportFile::create(meta.clone(), path.clone()).is_err()); f.append(data).unwrap(); // Invalid crc32 and length. diff --git a/tests/config/mod.rs b/tests/config/mod.rs index d8a286ca0a2..a955a516417 100644 --- a/tests/config/mod.rs +++ b/tests/config/mod.rs @@ -400,6 +400,7 @@ fn test_serde_custom_tikv_config() { override_ssl_target: "".to_owned(), }; value.import = ImportConfig { + import_dir: "/abc".to_owned(), num_threads: 123, stream_channel_window: 123, }; diff --git a/tests/config/test-custom.toml b/tests/config/test-custom.toml index 22e94dea718..5311e3543d6 100644 --- a/tests/config/test-custom.toml +++ b/tests/config/test-custom.toml @@ -342,5 +342,6 @@ cert-path = "invalid path" key-path = "invalid path" [import] +import-dir = "/abc" num-threads = 123 stream-channel-window = 123 diff --git a/tests/import/kv_service.rs b/tests/import/kv_service.rs new file mode 100644 index 00000000000..61f5415377a --- /dev/null +++ b/tests/import/kv_service.rs @@ -0,0 +1,105 @@ +// Copyright 2018 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use uuid::Uuid; +use futures::{stream, Future, Stream}; +use tempdir::TempDir; + +use kvproto::importpb::*; +use kvproto::importpb_grpc::*; +use grpc::{ChannelBuilder, Environment, Result, WriteFlags}; + +use tikv::config::TiKvConfig; +use tikv::import::ImportKVServer; + +fn new_kv_server() -> (ImportKVServer, ImportKvClient) { + let temp_dir = TempDir::new("test_import_kv_server").unwrap(); + + let mut cfg = TiKvConfig::default(); + cfg.server.addr = "127.0.0.1:0".to_owned(); + cfg.import.import_dir = temp_dir.path().to_str().unwrap().to_owned(); + let server = ImportKVServer::new(&cfg); + + let ch = { + let env = Arc::new(Environment::new(1)); + let addr = server.bind_addrs().first().unwrap(); + ChannelBuilder::new(env).connect(&format!("{}:{}", addr.0, addr.1)) + }; + let client = ImportKvClient::new(ch); + + (server, client) +} + +#[test] +fn test_kv_service() { + let (mut server, client) = new_kv_server(); + server.start(); + + let uuid = Uuid::new_v4().as_bytes().to_vec(); + let mut head = WriteHead::new(); + head.set_uuid(uuid.clone()); + + let mut m = Mutation::new(); + m.op = Mutation_OP::Put; + m.set_key(vec![1]); + m.set_value(vec![1]); + let mut batch = WriteBatch::new(); + batch.set_commit_ts(123); + batch.mut_mutations().push(m); + + let mut open = OpenRequest::new(); + open.set_uuid(uuid.clone()); + + let mut close = CloseRequest::new(); + close.set_uuid(uuid.clone()); + + // Write an engine before it is opened. + let resp = send_write(&client, &head, &batch).unwrap(); + assert!(resp.get_error().has_engine_not_found()); + + // Close an engine before it it opened. + let resp = client.close(&close).unwrap(); + assert!(resp.get_error().has_engine_not_found()); + + client.open(&open).unwrap(); + let resp = send_write(&client, &head, &batch).unwrap(); + assert!(!resp.has_error()); + let resp = send_write(&client, &head, &batch).unwrap(); + assert!(!resp.has_error()); + let resp = client.close(&close).unwrap(); + assert!(!resp.has_error()); + + server.shutdown(); +} + +fn send_write( + client: &ImportKvClient, + head: &WriteHead, + batch: &WriteBatch, +) -> Result { + let mut r1 = WriteRequest::new(); + r1.set_head(head.clone()); + let mut r2 = WriteRequest::new(); + r2.set_batch(batch.clone()); + let mut r3 = WriteRequest::new(); + r3.set_batch(batch.clone()); + let reqs: Vec<_> = vec![r1, r2, r3] + .into_iter() + .map(|r| (r, WriteFlags::default())) + .collect(); + let (tx, rx) = client.write().unwrap(); + let stream = stream::iter_ok(reqs); + stream.forward(tx).and_then(|_| rx).wait() +} diff --git a/tests/import/mod.rs b/tests/import/mod.rs index 58093ec248f..123245e8681 100644 --- a/tests/import/mod.rs +++ b/tests/import/mod.rs @@ -11,4 +11,5 @@ // See the License for the specific language governing permissions and // limitations under the License. +mod kv_service; mod sst_service;