Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove engine store server helper in raftstore #148

1 change: 0 additions & 1 deletion components/proxy_server/src/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,6 @@ pub unsafe fn run_proxy(
}

// Used in pre-handle snapshot.
config.raft_store.engine_store_server_helper = engine_store_server_helper as *const _ as isize;
if matches.is_present("only-decryption") {
crate::run::run_tikv_only_decryption(config, proxy_config, engine_store_server_helper);
} else {
Expand Down
15 changes: 11 additions & 4 deletions components/proxy_server/src/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,8 @@ pub fn run_impl<CER: ConfiguredRaftEngine, F: KvFormat>(
proxy_config: ProxyConfig,
engine_store_server_helper: &EngineStoreServerHelper,
) {
let mut tikv = TiKvServer::<CER>::init(config, proxy_config);
let engine_store_server_helper_ptr = engine_store_server_helper as *const _ as isize;
let mut tikv = TiKvServer::<CER>::init(config, proxy_config, engine_store_server_helper_ptr);

// Must be called after `TiKvServer::init`.
let memory_limit = tikv.config.memory_usage_limit.unwrap().0;
Expand Down Expand Up @@ -458,6 +459,7 @@ const DEFAULT_STORAGE_STATS_INTERVAL: Duration = Duration::from_secs(1);
struct TiKvServer<ER: RaftEngine> {
config: TiKvConfig,
proxy_config: ProxyConfig,
engine_store_server_helper_ptr: isize,
cfg_controller: Option<ConfigController>,
security_mgr: Arc<SecurityManager>,
pd_client: Arc<RpcClient>,
Expand Down Expand Up @@ -501,7 +503,11 @@ type LocalServer<EK, ER> =
type LocalRaftKv<EK, ER> = RaftKv<EK, ServerRaftStoreRouter<EK, ER>>;

impl<ER: RaftEngine> TiKvServer<ER> {
fn init(mut config: TiKvConfig, proxy_config: ProxyConfig) -> TiKvServer<ER> {
fn init(
mut config: TiKvConfig,
proxy_config: ProxyConfig,
engine_store_server_helper_ptr: isize,
) -> TiKvServer<ER> {
tikv_util::thread_group::set_properties(Some(GroupProperties::default()));
// It is okay use pd config and security config before `init_config`,
// because these configs must be provided by command line, and only
Expand Down Expand Up @@ -556,6 +562,7 @@ impl<ER: RaftEngine> TiKvServer<ER> {
TiKvServer {
config,
proxy_config,
engine_store_server_helper_ptr,
cfg_controller: Some(cfg_controller),
security_mgr,
pd_client,
Expand Down Expand Up @@ -1116,7 +1123,7 @@ impl<ER: RaftEngine> TiKvServer<ER> {

{
raftstore::engine_store_ffi::gen_engine_store_server_helper(
self.config.raft_store.engine_store_server_helper,
self.engine_store_server_helper_ptr,
)
.set_store(node.store());
info!("set store {} to engine-store", node.id());
Expand Down Expand Up @@ -1508,7 +1515,7 @@ impl<ER: RaftEngine> TiKvServer<ER> {
if status_enabled {
let mut status_server = match StatusServer::new(
raftstore::engine_store_ffi::gen_engine_store_server_helper(
self.config.raft_store.engine_store_server_helper,
self.engine_store_server_helper_ptr,
),
self.config.server.status_thread_pool_size,
self.cfg_controller.take().unwrap(),
Expand Down
16 changes: 13 additions & 3 deletions components/raftstore/src/engine_store_ffi/observer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -724,6 +724,11 @@ impl ApplySnapshotObserver for TiFlashObserver {
Some(t) => {
let neer_retry = match t.recv.recv() {
Ok(snap_ptr) => {
info!("get prehandled snapshot success";
"peer_id" => ?snap_key,
"region" => ?ob_ctx.region(),
"pending" => self.engine.pending_applies_count.load(Ordering::SeqCst),
);
self.engine_store_server_helper
.apply_pre_handled_snapshot(snap_ptr.0);
false
Expand All @@ -739,6 +744,11 @@ impl ApplySnapshotObserver for TiFlashObserver {
self.engine
.pending_applies_count
.fetch_sub(1, Ordering::SeqCst);
info!("apply snapshot finished";
"peer_id" => ?snap_key,
"region" => ?ob_ctx.region(),
"pending" => self.engine.pending_applies_count.load(Ordering::SeqCst),
);
neer_retry
}
None => {
Expand All @@ -764,9 +774,9 @@ impl ApplySnapshotObserver for TiFlashObserver {
.apply_pre_handled_snapshot(ptr.0);
}
info!("apply snapshot finished";
"peer_id" => ?peer_id,
"region" => ?ob_ctx.region(),
"pending" => self.engine.pending_applies_count.load(Ordering::SeqCst),
"peer_id" => ?peer_id,
"region" => ?ob_ctx.region(),
"pending" => self.engine.pending_applies_count.load(Ordering::SeqCst),
);
Ok(())
}
Expand Down
4 changes: 0 additions & 4 deletions components/raftstore/src/store/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,6 @@ with_prefix!(prefix_store "store-");
#[serde(default)]
#[serde(rename_all = "kebab-case")]
pub struct Config {
#[online_config(skip)]
pub engine_store_server_helper: isize,

// minimizes disruption when a partitioned node rejoins the cluster by using a two phase election.
#[online_config(skip)]
pub prevote: bool,
Expand Down Expand Up @@ -314,7 +311,6 @@ pub struct Config {
impl Default for Config {
fn default() -> Config {
Config {
engine_store_server_helper: 0,
prevote: true,
raftdb_path: String::new(),
capacity: ReadableSize(0),
Expand Down
50 changes: 0 additions & 50 deletions components/raftstore/src/store/fsm/apply.rs
Original file line number Diff line number Diff line change
Expand Up @@ -366,8 +366,6 @@ struct ApplyContext<EK>
where
EK: KvEngine,
{
pub engine_store_server_helper: &'static crate::engine_store_ffi::EngineStoreServerHelper,

tag: String,
timer: Option<Instant>,
host: CoprocessorHost<EK>,
Expand Down Expand Up @@ -449,10 +447,6 @@ where
) -> ApplyContext<EK> {
let kv_wb = engine.write_batch_with_cap(DEFAULT_APPLY_WB_SIZE);
ApplyContext {
engine_store_server_helper: crate::engine_store_ffi::gen_engine_store_server_helper(
cfg.engine_store_server_helper,
),

tag,
timer: None,
host,
Expand Down Expand Up @@ -1779,50 +1773,6 @@ where
Ok(())
}

fn handle_ingest_sst_for_engine_store(
&mut self,
ctx: &ApplyContext<EK>,
ssts: &Vec<SstMetaInfo>,
) -> EngineStoreApplyRes {
let mut ssts_wrap = vec![];
let mut sst_views = vec![];

for sst in ssts {
let sst = &sst.meta;
if sst.get_cf_name() == CF_LOCK {
panic!("should not ingest sst of lock cf");
}

if let Err(e) = check_sst_for_ingestion(sst, &self.region) {
error!(
"ingest fail";
"region_id" => self.region_id(),
"peer_id" => self.id(),
"sst" => ?sst,
"region" => ?&self.region,
"err" => ?e
);
// This file is not valid, we can delete it here.
let _ = ctx.importer.delete(sst);
continue;
}

ssts_wrap.push((
ctx.importer.get_path(sst),
crate::engine_store_ffi::name_to_cf(sst.get_cf_name()),
));
}

for (path, cf) in &ssts_wrap {
sst_views.push((path.to_str().unwrap().as_bytes(), *cf));
}

ctx.engine_store_server_helper.handle_ingest_sst(
sst_views,
RaftCmdHeader::new(self.region.get_id(), ctx.exec_log_index, ctx.exec_log_term),
)
}

fn handle_ingest_sst(
&mut self,
ctx: &mut ApplyContext<EK>,
Expand Down
50 changes: 0 additions & 50 deletions components/raftstore/src/store/snap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -458,56 +458,6 @@ pub struct PreHandledSnapshot {

unsafe impl Send for PreHandledSnapshot {}

impl Snapshot {
pub fn pre_handle_snapshot(
&self,
engine_store_server_helper: &'static crate::engine_store_ffi::EngineStoreServerHelper,
region: &kvproto::metapb::Region,
peer_id: u64,
index: u64,
term: u64,
) -> PreHandledSnapshot {
let mut sst_views = vec![];
let mut full_paths = vec![];
for cf_file in &self.cf_files {
// Skip empty cf file.
if cf_file.size.len() == 0 {
continue;
}

if cf_file.size[0] == 0 {
continue;
}

if plain_file_used(cf_file.cf) {
assert!(cf_file.cf == CF_LOCK);
}

// We have only one cf file.
let full_path = format!(
"{}/{}",
cf_file.path.to_str().unwrap(),
cf_file.file_names[0]
);
{
full_paths.push((full_path, engine_store_ffi::name_to_cf(cf_file.cf)));
}
}
for (s, cf) in full_paths.iter() {
sst_views.push((s.as_bytes(), *cf));
}

let res = engine_store_server_helper
.pre_handle_snapshot(&region, peer_id, sst_views, index, term);

PreHandledSnapshot {
index,
term,
inner: res,
}
}
}

#[derive(PartialEq, Eq, Clone, Copy)]
enum CheckPolicy {
ErrAllowed,
Expand Down
11 changes: 4 additions & 7 deletions components/raftstore/src/store/worker/region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -420,14 +420,11 @@ where
coprocessor_host: self.coprocessor_host.clone(),
};
s.apply(options)?;
match self
.coprocessor_host
.post_apply_snapshot(&region, peer_id, &snap_key, Some(&s))
if let Err(e) =
self.coprocessor_host
.post_apply_snapshot(&region, peer_id, &snap_key, Some(&s))
{
Ok(_) => (),
Err(e) => {
return Err(box_err!("post apply snapshot error {:?}", e));
}
return Err(box_err!("post apply snapshot error {:?}", e));
};

let mut wb = self.engine.write_batch();
Expand Down
12 changes: 1 addition & 11 deletions components/server/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -757,14 +757,6 @@ impl<ER: RaftEngine> TiKvServer<ER> {
node.try_bootstrap_store(engines.engines.clone())
.unwrap_or_else(|e| fatal!("failed to bootstrap node id: {}", e));

{
raftstore::engine_store_ffi::gen_engine_store_server_helper(
self.config.raft_store.engine_store_server_helper,
)
.set_store(node.store());
info!("set store {} to engine-store", node.id());
}

self.snap_mgr = Some(snap_mgr.clone());
// Create server
let server = Server::new(
Expand Down Expand Up @@ -1139,9 +1131,7 @@ impl<ER: RaftEngine> TiKvServer<ER> {
let status_enabled = !self.config.server.status_addr.is_empty();
if status_enabled {
let mut status_server = match StatusServer::new(
raftstore::engine_store_ffi::gen_engine_store_server_helper(
self.config.raft_store.engine_store_server_helper,
),
raftstore::engine_store_ffi::gen_engine_store_server_helper(0),
self.config.server.status_thread_pool_size,
self.cfg_controller.take().unwrap(),
Arc::new(self.config.security.clone()),
Expand Down
9 changes: 2 additions & 7 deletions components/test_raftstore/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -346,10 +346,7 @@ impl<T: Simulator> Cluster<T> {
std::pin::Pin::new(&*engine_store_server_wrap),
));

// TODO(tiflash) Used by RegionRunner, to remove.
let mut node_cfg = self.cfg.clone();
let helper_sz = &*engine_store_server_helper as *const _ as isize;
node_cfg.raft_store.engine_store_server_helper = helper_sz;
let node_cfg = self.cfg.clone();
let ffi_helper_set = FFIHelperSet {
proxy,
proxy_helper,
Expand Down Expand Up @@ -463,9 +460,7 @@ impl<T: Simulator> Cluster<T> {
debug!("calling run node"; "node_id" => node_id);

let mut node_cfg = if self.ffi_helper_set.contains_key(&node_id) {
let mut node_cfg = self.cfg.clone();
node_cfg.raft_store.engine_store_server_helper =
&*self.ffi_helper_set[&node_id].engine_store_server_helper as *const _ as isize;
let node_cfg = self.cfg.clone();
node_cfg
} else {
let (ffi_helper_set, node_cfg) = self.make_ffi_helper_set(
Expand Down
2 changes: 1 addition & 1 deletion components/test_raftstore/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,7 @@ impl ServerCluster {
let check_leader_runner = CheckLeaderRunner::new(store_meta.clone());
let check_leader_scheduler = bg_worker.start("check-leader", check_leader_runner);

let mut lock_mgr = LockManager::new(&cfg.pessimistic_txn);
let lock_mgr = LockManager::new(&cfg.pessimistic_txn);
let quota_limiter = Arc::new(QuotaLimiter::new(
cfg.quota.foreground_cpu_time,
cfg.quota.foreground_write_bandwidth,
Expand Down
12 changes: 0 additions & 12 deletions new-mock-engine-store/src/mock_cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,9 +199,6 @@ impl<T: Simulator<TiFlashEngine>> Cluster<T> {
.as_mut()
.unwrap()
.engine_store_server_helper = engine_store_server_helper_ptr;
// TODO(tiflash) when we pre handle snap with observer, this is useless.
let mut node_cfg = node_cfg;
node_cfg.raft_store.engine_store_server_helper = engine_store_server_helper_ptr;
let ffi_helper_set = FFIHelperSet {
proxy,
proxy_helper,
Expand Down Expand Up @@ -364,11 +361,6 @@ impl<T: Simulator<TiFlashEngine>> Cluster<T> {
let node_id = {
let mut sim = self.sim.wl();
let mut cfg = self.cfg.clone();
{
// TODO(tiflash) remove this when we use observer to pre handle snap.
cfg.raft_store.engine_store_server_helper =
engines.kv.engine_store_server_helper;
}
// Like TiKVServer::init
sim.run_node(
0,
Expand Down Expand Up @@ -756,10 +748,6 @@ impl<T: Simulator<TiFlashEngine>> Cluster<T> {
if let Some(labels) = self.labels.get(&node_id) {
cfg.server.labels = labels.to_owned();
}
{
// TODO(tiflash) remove this when we use observer to pre handle snap.
cfg.raft_store.engine_store_server_helper = engines.kv.engine_store_server_helper;
}
let store_meta = match self.store_metas.entry(node_id) {
MapEntry::Occupied(o) => {
let mut meta = o.get().lock().unwrap();
Expand Down
2 changes: 0 additions & 2 deletions new-mock-engine-store/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,8 +236,6 @@ impl Simulator<TiFlashEngine> for NodeCluster {
) -> ServerResult<u64> {
assert!(node_id == 0 || !self.nodes.contains_key(&node_id));
assert_ne!(engines.kv.engine_store_server_helper, 0);
// TODO(tiflash) can be remove when we pre handle snap outside.
assert_ne!(cfg.raft_store.engine_store_server_helper, 0);

let pd_worker = LazyWorker::new("test-pd-worker");

Expand Down