Skip to content

Commit

Permalink
introduce only-decription mode
Browse files Browse the repository at this point in the history
Signed-off-by: hehechen <[email protected]>
  • Loading branch information
hehechen committed Jul 1, 2022
1 parent ca2f51f commit a38a76c
Show file tree
Hide file tree
Showing 4 changed files with 159 additions and 7 deletions.
34 changes: 32 additions & 2 deletions components/raftstore/src/engine_store_ffi/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ impl<T> UnwrapExternCFunc<T> for std::option::Option<T> {
pub struct RaftStoreProxy {
status: AtomicU8,
key_manager: Option<Arc<DataKeyManager>>,
read_index_client: Box<dyn read_index_helper::ReadIndex>,
read_index_client: Option<Box<dyn read_index_helper::ReadIndex>>,
kv_engine: std::sync::RwLock<Option<engine_rocks::RocksEngine>>,
}

Expand All @@ -72,7 +72,7 @@ impl RaftStoreProxy {
pub fn new(
status: AtomicU8,
key_manager: Option<Arc<DataKeyManager>>,
read_index_client: Box<dyn read_index_helper::ReadIndex>,
read_index_client: Option<Box<dyn read_index_helper::ReadIndex>>,
kv_engine: std::sync::RwLock<Option<engine_rocks::RocksEngine>>,
) -> Self {
RaftStoreProxy {
Expand Down Expand Up @@ -206,6 +206,14 @@ pub extern "C" fn ffi_batch_read_index(
fn_insert_batch_read_index_resp: Option<unsafe extern "C" fn(RawVoidPtr, BaseBuffView, u64)>,
) {
assert!(!proxy_ptr.is_null());
unsafe {
match proxy_ptr.as_ref().read_index_client {
Option::None => {
return;
}
_ => {}
}
}
debug_assert!(fn_insert_batch_read_index_resp.is_some());
if view.len != 0 {
assert_ne!(view.view, std::ptr::null());
Expand All @@ -223,6 +231,8 @@ pub extern "C" fn ffi_batch_read_index(
let resp = proxy_ptr
.as_ref()
.read_index_client
.as_ref()
.unwrap()
.batch_read_index(req_vec, time::Duration::from_millis(timeout_ms));
assert_ne!(res, std::ptr::null_mut());
for (r, region_id) in &resp {
Expand Down Expand Up @@ -295,12 +305,22 @@ pub extern "C" fn ffi_make_read_index_task(
req_view: BaseBuffView,
) -> RawRustPtr {
assert!(!proxy_ptr.is_null());
unsafe {
match proxy_ptr.as_ref().read_index_client {
Option::None => {
return RawRustPtr::default();
}
_ => {}
}
}
let mut req = kvrpcpb::ReadIndexRequest::default();
req.merge_from_bytes(req_view.to_slice()).unwrap();
let task = unsafe {
proxy_ptr
.as_ref()
.read_index_client
.as_ref()
.unwrap()
.make_read_index_task(req)
};
return match task {
Expand Down Expand Up @@ -346,6 +366,14 @@ pub extern "C" fn ffi_poll_read_index_task(
waker: RawVoidPtr,
) -> u8 {
assert!(!proxy_ptr.is_null());
unsafe {
match proxy_ptr.as_ref().read_index_client {
Option::None => {
return 0;
}
_ => {}
}
}
let task = unsafe {
&mut *(task_ptr as *mut crate::engine_store_ffi::read_index_helper::ReadIndexTask)
};
Expand All @@ -358,6 +386,8 @@ pub extern "C" fn ffi_poll_read_index_task(
proxy_ptr
.as_ref()
.read_index_client
.as_ref()
.unwrap()
.poll_read_index_task(task, waker)
} {
get_engine_store_server_helper().set_read_index_resp(resp_data, &res);
Expand Down
11 changes: 10 additions & 1 deletion components/server/src/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,11 @@ pub unsafe fn run_proxy(
.required(true)
.takes_value(true),
)
.arg(
Arg::with_name("only-decryption")
.long("only-decryption")
.help("Only do decryption in Proxy"),
)
.get_matches_from(args);

if matches.is_present("print-sample-config") {
Expand Down Expand Up @@ -241,7 +246,11 @@ pub unsafe fn run_proxy(
}

config.raft_store.engine_store_server_helper = engine_store_server_helper as *const _ as isize;
crate::server::run_tikv(config, engine_store_server_helper);
if matches.is_present("only-decryption") {
crate::server::run_tikv_only_decryption(config, engine_store_server_helper);
} else {
crate::server::run_tikv(config, engine_store_server_helper);
}
}

fn check_engine_label(matches: &clap::ArgMatches<'_>) {
Expand Down
117 changes: 115 additions & 2 deletions components/server/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,10 +146,10 @@ pub unsafe fn run_tikv(config: TiKvConfig, engine_store_server_helper: &EngineSt
let mut proxy = RaftStoreProxy::new(
AtomicU8::new(RaftProxyStatus::Idle as u8),
tikv.encryption_key_manager.clone(),
Box::new(ReadIndexClient::new(
Some(Box::new(ReadIndexClient::new(
tikv.router.clone(),
SysQuota::cpu_cores_quota() as usize * 2,
)),
))),
std::sync::RwLock::new(None),
);

Expand Down Expand Up @@ -239,6 +239,119 @@ pub unsafe fn run_tikv(config: TiKvConfig, engine_store_server_helper: &EngineSt
}
}

/// Run a TiKV server only for decryption. Returns when the server is shutdown by the user, in which
/// case the server will be properly stopped.
pub unsafe fn run_tikv_only_decryption(
config: TiKvConfig,
engine_store_server_helper: &EngineStoreServerHelper,
) {
// Sets the global logger ASAP.
// It is okay to use the config w/o `validate()`,
// because `initial_logger()` handles various conditions.
initial_logger(&config);

// Print version information.
crate::log_proxy_info();

// Print resource quota.
SysQuota::log_quota();
CPU_CORES_QUOTA_GAUGE.set(SysQuota::cpu_cores_quota());

// Do some prepare works before start.
pre_start();

let _m = Monitor::default();

macro_rules! run_impl {
($ER: ty) => {{
let encryption_key_manager =
data_key_manager_from_config(&config.security.encryption, &config.storage.data_dir)
.map_err(|e| {
panic!(
"Encryption failed to initialize: {}. code: {}",
e,
e.error_code()
)
})
.unwrap()
.map(Arc::new);

let mut proxy = RaftStoreProxy::new(
AtomicU8::new(RaftProxyStatus::Idle as u8),
encryption_key_manager.clone(),
Option::None,
std::sync::RwLock::new(None),
);

let proxy_helper = {
let mut proxy_helper = RaftStoreProxyFFIHelper::new(&proxy);
proxy_helper.fn_server_info = Some(ffi_server_info);
proxy_helper
};

info!("set raft-store proxy helper");

engine_store_server_helper.handle_set_proxy(&proxy_helper);

info!("wait for engine-store server to start");
while engine_store_server_helper.handle_get_engine_store_server_status()
== EngineStoreServerStatus::Idle
{
thread::sleep(Duration::from_millis(200));
}

if engine_store_server_helper.handle_get_engine_store_server_status()
!= EngineStoreServerStatus::Running
{
info!("engine-store server is not running, make proxy exit");
return;
}

info!("engine-store server is started");

proxy.set_status(RaftProxyStatus::Running);

{
debug_assert!(
engine_store_server_helper.handle_get_engine_store_server_status()
== EngineStoreServerStatus::Running
);
loop {
if engine_store_server_helper.handle_get_engine_store_server_status()
!= EngineStoreServerStatus::Running
{
break;
}
thread::sleep(Duration::from_millis(200));
}
}

info!(
"found engine-store server status is {:?}, start to stop all services",
engine_store_server_helper.handle_get_engine_store_server_status()
);

proxy.set_status(RaftProxyStatus::Stopped);

info!("all services in raft-store proxy are stopped");

info!("wait for engine-store server to stop");
while engine_store_server_helper.handle_get_engine_store_server_status()
!= EngineStoreServerStatus::Terminated
{
thread::sleep(Duration::from_millis(200));
}
info!("engine-store server is stopped");
}};
}

if !config.raft_engine.enable {
run_impl!(RocksEngine)
} else {
run_impl!(RaftLogEngine)
}
}

const RESERVED_OPEN_FDS: u64 = 1000;

const DEFAULT_METRICS_FLUSH_INTERVAL: Duration = Duration::from_millis(10_000);
Expand Down
4 changes: 2 additions & 2 deletions components/test_raftstore/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -297,10 +297,10 @@ impl<T: Simulator> Cluster<T> {
let proxy = Box::new(raftstore::engine_store_ffi::RaftStoreProxy::new(
AtomicU8::new(raftstore::engine_store_ffi::RaftProxyStatus::Idle as u8),
key_mgr.clone(),
Box::new(raftstore::engine_store_ffi::ReadIndexClient::new(
Some(Box::new(raftstore::engine_store_ffi::ReadIndexClient::new(
router.clone(),
SysQuota::cpu_cores_quota() as usize * 2,
)),
))),
std::sync::RwLock::new(Some(engines.kv.clone())),
));

Expand Down

0 comments on commit a38a76c

Please sign in to comment.