Skip to content

Commit

Permalink
Add ImportKV service (tikv#2881)
Browse files Browse the repository at this point in the history
Implement Open/Write/Close APIs to write key-value pairs to import
server.

`Open` opens an engine identified by an UUID.

`Write` creates a write stream to write key-value batch to the opened
engine. Different clients can write to the same engine concurrently.

`Close` closes the engine after all write batches are finished. An
engine can only be closed when all write streams are closed. An engine
can only be closed once, and it can not be opened again once it is
closed.

After these three steps, the data in the engine is ready to be
imported to TiKV, but the `Import` API is not included in this PR.
  • Loading branch information
huachaohuang authored Apr 4, 2018
1 parent b660281 commit c1c8c82
Show file tree
Hide file tree
Showing 17 changed files with 840 additions and 12 deletions.
6 changes: 3 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions etc/config-template.toml
Original file line number Diff line number Diff line change
Expand Up @@ -442,6 +442,8 @@
# key-path = ""

[import]
# the directory to store importing kv data.
# import-dir = "/tmp/tikv/import"
# number of threads to handle RPC requests.
# num-threads = 8
# stream channel window size, stream will be blocked on channel full.
Expand Down
10 changes: 6 additions & 4 deletions src/bin/signal_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ mod imp {
use tikv::raftstore::store::Engines;
use tikv::util::{metrics, rocksdb_stats};

pub fn handle_signal(engines: Engines) {
pub fn handle_signal(engines: Option<Engines>) {
use signal::trap::Trap;
use nix::sys::signal::{SIGUSR1, SIGUSR2, SIGHUP, SIGINT, SIGTERM};
let trap = Trap::trap(&[SIGTERM, SIGINT, SIGHUP, SIGUSR1, SIGUSR2]);
Expand All @@ -33,8 +33,10 @@ mod imp {
SIGUSR1 => {
// Use SIGUSR1 to log metrics.
info!("{}", metrics::dump());
info!("{:?}", rocksdb_stats::dump(&engines.kv_engine));
info!("{:?}", rocksdb_stats::dump(&engines.raft_engine));
if let Some(ref engines) = engines {
info!("{:?}", rocksdb_stats::dump(&engines.kv_engine));
info!("{:?}", rocksdb_stats::dump(&engines.raft_engine));
}
}
SIGUSR2 => profiling::dump_prof(None),
// TODO: handle more signal
Expand All @@ -48,7 +50,7 @@ mod imp {
mod imp {
use tikv::raftstore::store::Engines;

pub fn handle_signal(_: Engines) {}
pub fn handle_signal(_: Option<Engines>) {}
}

pub use self::imp::handle_signal;
25 changes: 22 additions & 3 deletions src/bin/tikv-server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ use tikv::raftstore::coprocessor::CoprocessorHost;
use tikv::pd::{PdClient, RpcClient};
use tikv::util::time::Monitor;
use tikv::util::rocksdb::metrics_flusher::{MetricsFlusher, DEFAULT_FLUSHER_INTERVAL};
use tikv::import::{ImportSSTService, SSTImporter};
use tikv::import::{ImportKVServer, ImportSSTService, SSTImporter};

const RESERVED_OPEN_FDS: u64 = 1000;

Expand Down Expand Up @@ -283,7 +283,7 @@ fn run_raft_server(pd_client: RpcClient, cfg: &TiKvConfig, security_mgr: Arc<Sec
server
.start(server_cfg, security_mgr)
.unwrap_or_else(|e| fatal!("failed to start server: {:?}", e));
signal_handler::handle_signal(engines);
signal_handler::handle_signal(Some(engines));

// Stop.
server
Expand All @@ -299,6 +299,19 @@ fn run_raft_server(pd_client: RpcClient, cfg: &TiKvConfig, security_mgr: Arc<Sec
}
}

fn run_import_server(cfg: &TiKvConfig) {
let mut metric = cfg.metric.clone();
metric.job = "import-server".to_owned();
initial_metric(&metric, None);

let mut server = ImportKVServer::new(cfg);
server.start();
info!("import server started");
signal_handler::handle_signal(None);
server.shutdown();
info!("import server shutdown");
}

fn overwrite_config_with_cmd_args(config: &mut TiKvConfig, matches: &ArgMatches) {
if let Some(level) = matches.value_of("log-level") {
config.log_level = logger::get_level_by_string(level);
Expand Down Expand Up @@ -350,7 +363,7 @@ fn overwrite_config_with_cmd_args(config: &mut TiKvConfig, matches: &ArgMatches)
config.raft_store.capacity = capacity;
}

if matches.is_present("import-mode") {
if matches.is_present("import-mode") || matches.is_present("import-server") {
config.tune_for_import_mode();
}
}
Expand Down Expand Up @@ -527,6 +540,12 @@ fn main() {

configure_grpc_poll_strategy();

// Run as import server.
if matches.is_present("import-server") {
run_import_server(&config);
return;
}

let security_mgr = Arc::new(
SecurityManager::new(&config.security)
.unwrap_or_else(|e| fatal!("failed to create security manager: {:?}", e)),
Expand Down
2 changes: 2 additions & 0 deletions src/import/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@ use std::result::Result;
#[serde(default)]
#[serde(rename_all = "kebab-case")]
pub struct Config {
pub import_dir: String,
pub num_threads: usize,
pub stream_channel_window: usize,
}

impl Default for Config {
fn default() -> Config {
Config {
import_dir: "/tmp/tikv/import".to_owned(),
num_threads: 8,
stream_channel_window: 128,
}
Expand Down
172 changes: 172 additions & 0 deletions src/import/engine.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
// Copyright 2018 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

use std::cmp;
use std::i32;
use std::fmt;
use std::ops::Deref;
use std::sync::Arc;
use std::path::Path;

use uuid::Uuid;

use rocksdb::{BlockBasedOptions, ColumnFamilyOptions, DBOptions, Writable, WriteBatch as RawBatch,
DB};
use kvproto::importpb::*;

use config::DbConfig;
use storage::CF_DEFAULT;
use storage::types::Key;
use util::config::MB;
use util::rocksdb::{new_engine_opt, CFOptions};
use util::rocksdb::properties::SizePropertiesCollectorFactory;

use super::Result;

pub struct Engine {
db: Arc<DB>,
uuid: Uuid,
}

impl Engine {
pub fn new<P: AsRef<Path>>(path: P, uuid: Uuid, opts: DbConfig) -> Result<Engine> {
let db = {
let (db_opts, cf_opts) = tune_dboptions_for_bulk_load(&opts);
new_engine_opt(path.as_ref().to_str().unwrap(), db_opts, vec![cf_opts])?
};
Ok(Engine {
db: Arc::new(db),
uuid: uuid,
})
}

pub fn uuid(&self) -> Uuid {
self.uuid
}

pub fn write(&self, mut batch: WriteBatch) -> Result<usize> {
// Just a guess.
let wb_cap = cmp::min(batch.get_mutations().len() * 128, MB as usize);
let wb = RawBatch::with_capacity(wb_cap);
let commit_ts = batch.get_commit_ts();
for m in batch.take_mutations().iter_mut() {
let k = Key::from_raw(m.get_key()).append_ts(commit_ts);
wb.put(k.encoded(), m.get_value()).unwrap();
}

let size = wb.data_size();
self.write_without_wal(wb)?;

Ok(size)
}
}

impl Deref for Engine {
type Target = DB;

fn deref(&self) -> &Self::Target {
&self.db
}
}

impl fmt::Debug for Engine {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("Engine")
.field("uuid", &self.uuid())
.field("path", &self.path().to_owned())
.finish()
}
}

fn tune_dboptions_for_bulk_load(opts: &DbConfig) -> (DBOptions, CFOptions) {
const DISABLED: i32 = i32::MAX;

let mut db_opts = DBOptions::new();
db_opts.create_if_missing(true);
db_opts.enable_statistics(false);
// Vector memtable doesn't support concurrent write.
db_opts.allow_concurrent_memtable_write(false);
db_opts.set_use_direct_io_for_flush_and_compaction(true);
// RocksDB preserves `max_background_jobs/4` for flush.
db_opts.set_max_background_jobs(opts.max_background_jobs);

let mut block_base_opts = BlockBasedOptions::new();
// Use a large block size for sequential access.
block_base_opts.set_block_size(MB as usize);
let mut cf_opts = ColumnFamilyOptions::new();
cf_opts.set_block_based_table_factory(&block_base_opts);
cf_opts.compression_per_level(&opts.defaultcf.compression_per_level);
// Consider using a large write buffer but be careful about OOM.
cf_opts.set_write_buffer_size(opts.defaultcf.write_buffer_size.0);
cf_opts.set_target_file_size_base(opts.defaultcf.write_buffer_size.0);
cf_opts.set_vector_memtable_factory(opts.defaultcf.write_buffer_size.0);
cf_opts.set_max_write_buffer_number(opts.defaultcf.max_write_buffer_number);
// Disable compaction and rate limit.
cf_opts.set_disable_auto_compactions(true);
cf_opts.set_soft_pending_compaction_bytes_limit(0);
cf_opts.set_hard_pending_compaction_bytes_limit(0);
cf_opts.set_level_zero_stop_writes_trigger(DISABLED);
cf_opts.set_level_zero_slowdown_writes_trigger(DISABLED);
// Add size properties to get approximate ranges wihout scan.
let f = Box::new(SizePropertiesCollectorFactory::default());
cf_opts.add_table_properties_collector_factory("tikv.size-properties-collector", f);

(db_opts, CFOptions::new(CF_DEFAULT, cf_opts))
}

#[cfg(test)]
mod tests {
use super::*;

use tempdir::TempDir;

fn new_engine() -> (TempDir, Engine) {
let dir = TempDir::new("test_import_engine").unwrap();
let uuid = Uuid::new_v4();
let opts = DbConfig::default();
let engine = Engine::new(dir.path(), uuid, opts).unwrap();
(dir, engine)
}

fn new_write_batch(n: u8, ts: u64) -> WriteBatch {
let mut wb = WriteBatch::new();
for i in 0..n {
let mut m = Mutation::new();
m.set_op(Mutation_OP::Put);
m.set_key(vec![i]);
m.set_value(vec![i]);
wb.mut_mutations().push(m);
}
wb.set_commit_ts(ts);
wb
}

fn new_encoded_key(i: u8, ts: u64) -> Vec<u8> {
Key::from_raw(&[i]).append_ts(ts).encoded().to_owned()
}

#[test]
fn test_write() {
let (_dir, engine) = new_engine();

let n = 10;
let commit_ts = 10;
let wb = new_write_batch(n, commit_ts);
engine.write(wb).unwrap();

for i in 0..n {
let key = new_encoded_key(i, commit_ts);
assert_eq!(engine.get(&key).unwrap().unwrap(), &[i]);
}
}
}
9 changes: 8 additions & 1 deletion src/import/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use std::result;

use futures::sync::oneshot::Canceled;
use grpc::Error as GrpcError;
use uuid::ParseError;
use uuid::{ParseError, Uuid};

use raftstore::errors::Error as RaftStoreError;
use util::codec::Error as CodecError;
Expand Down Expand Up @@ -82,6 +82,13 @@ quick_error! {
TokenNotFound(token: usize) {
display("Token {} not found", token)
}
EngineInUse(uuid: Uuid) {
display("Engine {} is in use", uuid)
}
EngineNotFound(uuid: Uuid) {
display("Engine {} not found", uuid)
}
InvalidChunk {}
}
}

Expand Down
Loading

0 comments on commit c1c8c82

Please sign in to comment.