Skip to content

Commit

Permalink
use tokio threadpool and thread local metrics for readpool (tikv#4486)
Browse files Browse the repository at this point in the history
* *:use tokio-threadpool and thread local metrics in Storage

Signed-off-by: Breezewish <[email protected]>
  • Loading branch information
fredbjer authored and siddontang committed Apr 14, 2019
1 parent e54f485 commit 5eac6bd
Show file tree
Hide file tree
Showing 27 changed files with 1,323 additions and 1,170 deletions.
94 changes: 66 additions & 28 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ smallvec = { version = "0.6", features = ["union"] }
flate2 = { version = "1.0", features = ["zlib"], default-features = false }
more-asserts = "0.1"
hyper = "0.12"
tokio-threadpool = "0.1"
tokio-threadpool = "0.1.13"
vlog = "0.1.4"
twoway = "0.2.0"
cop_datatype = { path = "components/cop_datatype" }
Expand Down
22 changes: 5 additions & 17 deletions components/test_coprocessor/src/fixture.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,11 @@ use super::*;
use kvproto::kvrpcpb::Context;

use tikv::coprocessor::codec::Datum;
use tikv::coprocessor::{Endpoint, ReadPoolContext};
use tikv::server::readpool::{self, ReadPool};
use tikv::coprocessor::Endpoint;
use tikv::server::readpool;
use tikv::server::Config;
use tikv::storage::kv::RocksEngine;
use tikv::storage::{Engine, TestEngineBuilder};
use tikv_util::worker::FutureWorker;

#[derive(Clone)]
pub struct ProductTable(Table);
Expand Down Expand Up @@ -54,15 +53,7 @@ pub fn init_data_with_engine_and_commit<E: Engine>(
vals: &[(i64, Option<&str>, i64)],
commit: bool,
) -> (Store<E>, Endpoint<E>) {
init_data_with_details(
ctx,
engine,
tbl,
vals,
commit,
&Config::default(),
&readpool::Config::default_for_test(),
)
init_data_with_details(ctx, engine, tbl, vals, commit, &Config::default())
}

pub fn init_data_with_details<E: Engine>(
Expand All @@ -72,7 +63,6 @@ pub fn init_data_with_details<E: Engine>(
vals: &[(i64, Option<&str>, i64)],
commit: bool,
cfg: &Config,
read_pool_cfg: &readpool::Config,
) -> (Store<E>, Endpoint<E>) {
let mut store = Store::from_engine(engine);

Expand All @@ -88,10 +78,8 @@ pub fn init_data_with_details<E: Engine>(
if commit {
store.commit_with_ctx(ctx);
}
let pd_worker = FutureWorker::new("test-pd-worker");
let pool = ReadPool::new("readpool", read_pool_cfg, || {
ReadPoolContext::new(pd_worker.scheduler())
});

let pool = readpool::Builder::build_for_test();
let cop = Endpoint::new(cfg, store.get_engine(), pool);
(store, cop)
}
Expand Down
17 changes: 6 additions & 11 deletions components/test_raftstore/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use tikv::raftstore::store::fsm::{RaftBatchSystem, RaftRouter};
use tikv::raftstore::store::{Callback, SnapManager};
use tikv::raftstore::Result;
use tikv::server::load_statistics::ThreadLoad;
use tikv::server::readpool::ReadPool;
use tikv::server::readpool;
use tikv::server::resolve::{self, Task as ResolveTask};
use tikv::server::transport::RaftStoreRouter;
use tikv::server::transport::ServerRaftStoreRouter;
Expand All @@ -28,7 +28,8 @@ use tikv::server::{
create_raft_storage, Config, Error, Node, PdStoreAddrResolver, RaftClient, Server,
ServerTransport,
};
use tikv::storage::{self, RaftKv};

use tikv::storage::RaftKv;
use tikv_util::collections::{HashMap, HashSet};
use tikv_util::security::SecurityManager;
use tikv_util::worker::{FutureWorker, Worker};
Expand Down Expand Up @@ -126,11 +127,8 @@ impl Simulator for ServerCluster {
let sim_router = SimulateTransport::new(raft_router);

// Create storage.
let pd_worker = FutureWorker::new("test-future-worker");
let storage_read_pool =
ReadPool::new("store-read", &cfg.readpool.storage.build_config(), || {
storage::ReadPoolContext::new(pd_worker.scheduler())
});
let pd_worker = FutureWorker::new("test-pd-worker");
let storage_read_pool = readpool::Builder::build_for_test();
let store = create_raft_storage(
sim_router.clone(),
&cfg.storage,
Expand All @@ -155,12 +153,9 @@ impl Simulator for ServerCluster {
// Create pd client, snapshot manager, server.
let (worker, resolver) = resolve::new_resolver(Arc::clone(&self.pd_client)).unwrap();
let snap_mgr = SnapManager::new(tmp_str, Some(router.clone()));
let pd_worker = FutureWorker::new("test-pd-worker");
let server_cfg = Arc::new(cfg.server.clone());
let security_mgr = Arc::new(SecurityManager::new(&cfg.security).unwrap());
let cop_read_pool = ReadPool::new("cop", &cfg.readpool.coprocessor.build_config(), || {
coprocessor::ReadPoolContext::new(pd_worker.scheduler())
});
let cop_read_pool = readpool::Builder::build_for_test();
let cop = coprocessor::Endpoint::new(&server_cfg, store.get_engine(), cop_read_pool);
let mut server = None;
for _ in 0..100 {
Expand Down
1 change: 1 addition & 0 deletions components/tikv_util/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ tikv_alloc = { path = "../tikv_alloc" }
tokio-core = "0.1"
tokio-timer = "0.2"
tokio-executor = "0.1"
tokio-threadpool = "0.1.13"
serde = "1.0"
serde_json = "1.0"
serde_derive = "1.0"
Expand Down
79 changes: 79 additions & 0 deletions components/tikv_util/src/future_pool/builder.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.

use std::sync::Arc;

use tokio_threadpool::Builder as TokioBuilder;

use super::metrics::*;

pub struct Builder {
inner_builder: TokioBuilder,
name_prefix: Option<String>,
on_tick: Option<Box<dyn Fn() + Send + Sync>>,
}

impl Builder {
pub fn new() -> Self {
Self {
inner_builder: TokioBuilder::new(),
name_prefix: None,
on_tick: None,
}
}

pub fn pool_size(&mut self, val: usize) -> &mut Self {
self.inner_builder.pool_size(val);
self
}

pub fn stack_size(&mut self, val: usize) -> &mut Self {
self.inner_builder.stack_size(val);
self
}

pub fn name_prefix(&mut self, val: impl Into<String>) -> &mut Self {
let name = val.into();
self.name_prefix = Some(name.clone());
self.inner_builder.name_prefix(name);
self
}

pub fn on_tick<F>(&mut self, f: F) -> &mut Self
where
F: Fn() + Send + Sync + 'static,
{
self.on_tick = Some(Box::new(f));
self
}

pub fn before_stop<F>(&mut self, f: F) -> &mut Self
where
F: Fn() + Send + Sync + 'static,
{
self.inner_builder.before_stop(f);
self
}

pub fn after_start<F>(&mut self, f: F) -> &mut Self
where
F: Fn() + Send + Sync + 'static,
{
self.inner_builder.after_start(f);
self
}

pub fn build(&mut self) -> super::FuturePool {
let name = if let Some(name) = &self.name_prefix {
name.as_str()
} else {
"future_pool"
};
let env = Arc::new(super::Env {
on_tick: self.on_tick.take(),
metrics_running_task_count: FUTUREPOOL_RUNNING_TASK_VEC.with_label_values(&[name]),
metrics_handled_task_count: FUTUREPOOL_HANDLED_TASK_VEC.with_label_values(&[name]),
});
let pool = Arc::new(self.inner_builder.build());
super::FuturePool { pool, env }
}
}
18 changes: 18 additions & 0 deletions components/tikv_util/src/future_pool/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.

use prometheus::*;

lazy_static! {
pub static ref FUTUREPOOL_RUNNING_TASK_VEC: IntGaugeVec = register_int_gauge_vec!(
"tikv_futurepool_pending_task_total",
"Current future_pool pending + running tasks.",
&["name"]
)
.unwrap();
pub static ref FUTUREPOOL_HANDLED_TASK_VEC: IntCounterVec = register_int_counter_vec!(
"tikv_futurepool_handled_task_total",
"Total number of future_pool handled tasks.",
&["name"]
)
.unwrap();
}
Loading

0 comments on commit 5eac6bd

Please sign in to comment.