Skip to content

Commit

Permalink
introduce only-decription mode
Browse files Browse the repository at this point in the history
Signed-off-by: hehechen <[email protected]>
  • Loading branch information
hehechen committed Jul 4, 2022
1 parent 45194f2 commit 3a50e1f
Show file tree
Hide file tree
Showing 4 changed files with 163 additions and 7 deletions.
34 changes: 32 additions & 2 deletions components/raftstore/src/engine_store_ffi/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ impl<T> UnwrapExternCFunc<T> for std::option::Option<T> {
pub struct RaftStoreProxy {
status: AtomicU8,
key_manager: Option<Arc<DataKeyManager>>,
read_index_client: Box<dyn read_index_helper::ReadIndex>,
read_index_client: Option<Box<dyn read_index_helper::ReadIndex>>,
kv_engine: std::sync::RwLock<Option<engine_rocks::RocksEngine>>,
}

Expand All @@ -78,7 +78,7 @@ impl RaftStoreProxy {
pub fn new(
status: AtomicU8,
key_manager: Option<Arc<DataKeyManager>>,
read_index_client: Box<dyn read_index_helper::ReadIndex>,
read_index_client: Option<Box<dyn read_index_helper::ReadIndex>>,
kv_engine: std::sync::RwLock<Option<engine_rocks::RocksEngine>>,
) -> Self {
RaftStoreProxy {
Expand Down Expand Up @@ -212,6 +212,14 @@ pub extern "C" fn ffi_batch_read_index(
fn_insert_batch_read_index_resp: Option<unsafe extern "C" fn(RawVoidPtr, BaseBuffView, u64)>,
) {
assert!(!proxy_ptr.is_null());
unsafe {
match proxy_ptr.as_ref().read_index_client {
Option::None => {
return;
}
_ => {}
}
}
debug_assert!(fn_insert_batch_read_index_resp.is_some());
if view.len != 0 {
assert_ne!(view.view, std::ptr::null());
Expand All @@ -229,6 +237,8 @@ pub extern "C" fn ffi_batch_read_index(
let resp = proxy_ptr
.as_ref()
.read_index_client
.as_ref()
.unwrap()
.batch_read_index(req_vec, time::Duration::from_millis(timeout_ms));
assert_ne!(res, std::ptr::null_mut());
for (r, region_id) in &resp {
Expand Down Expand Up @@ -301,12 +311,22 @@ pub extern "C" fn ffi_make_read_index_task(
req_view: BaseBuffView,
) -> RawRustPtr {
assert!(!proxy_ptr.is_null());
unsafe {
match proxy_ptr.as_ref().read_index_client {
Option::None => {
return RawRustPtr::default();
}
_ => {}
}
}
let mut req = kvrpcpb::ReadIndexRequest::default();
req.merge_from_bytes(req_view.to_slice()).unwrap();
let task = unsafe {
proxy_ptr
.as_ref()
.read_index_client
.as_ref()
.unwrap()
.make_read_index_task(req)
};
return match task {
Expand Down Expand Up @@ -352,6 +372,14 @@ pub extern "C" fn ffi_poll_read_index_task(
waker: RawVoidPtr,
) -> u8 {
assert!(!proxy_ptr.is_null());
unsafe {
match proxy_ptr.as_ref().read_index_client {
Option::None => {
return 0;
}
_ => {}
}
}
let task = unsafe {
&mut *(task_ptr as *mut crate::engine_store_ffi::read_index_helper::ReadIndexTask)
};
Expand All @@ -364,6 +392,8 @@ pub extern "C" fn ffi_poll_read_index_task(
proxy_ptr
.as_ref()
.read_index_client
.as_ref()
.unwrap()
.poll_read_index_task(task, waker)
} {
get_engine_store_server_helper().set_read_index_resp(resp_data, &res);
Expand Down
11 changes: 10 additions & 1 deletion components/server/src/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,11 @@ pub unsafe fn run_proxy(
.required(true)
.takes_value(true),
)
.arg(
Arg::with_name("only-decryption")
.long("only-decryption")
.help("Only do decryption in Proxy"),
)
.get_matches_from(args);

if matches.is_present("print-sample-config") {
Expand Down Expand Up @@ -245,7 +250,11 @@ pub unsafe fn run_proxy(
}

config.raft_store.engine_store_server_helper = engine_store_server_helper as *const _ as isize;
crate::server::run_tikv(config, engine_store_server_helper);
if matches.is_present("only-decryption") {
crate::server::run_tikv_only_decryption(config, engine_store_server_helper);
} else {
crate::server::run_tikv(config, engine_store_server_helper);
}
}

fn check_engine_label(matches: &clap::ArgMatches<'_>) {
Expand Down
121 changes: 119 additions & 2 deletions components/server/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,10 +136,10 @@ fn run_impl<CER: ConfiguredRaftEngine, F: KvFormat>(
let mut proxy = RaftStoreProxy::new(
AtomicU8::new(RaftProxyStatus::Idle as u8),
tikv.encryption_key_manager.clone(),
Box::new(ReadIndexClient::new(
Some(Box::new(ReadIndexClient::new(
tikv.router.clone(),
SysQuota::cpu_cores_quota() as usize * 2,
)),
))),
std::sync::RwLock::new(None),
);

Expand Down Expand Up @@ -221,6 +221,91 @@ fn run_impl<CER: ConfiguredRaftEngine, F: KvFormat>(
info!("engine-store server is stopped");
}

#[inline]
fn run_impl_only_for_decryption<CER: ConfiguredRaftEngine, F: KvFormat>(
config: TiKvConfig,
engine_store_server_helper: &EngineStoreServerHelper,
) {
let encryption_key_manager =
data_key_manager_from_config(&config.security.encryption, &config.storage.data_dir)
.map_err(|e| {
panic!(
"Encryption failed to initialize: {}. code: {}",
e,
e.error_code()
)
})
.unwrap()
.map(Arc::new);

let mut proxy = RaftStoreProxy::new(
AtomicU8::new(RaftProxyStatus::Idle as u8),
encryption_key_manager.clone(),
Option::None,
std::sync::RwLock::new(None),
);

let proxy_helper = {
let mut proxy_helper = RaftStoreProxyFFIHelper::new(&proxy);
proxy_helper.fn_server_info = Some(ffi_server_info);
proxy_helper
};

info!("set raft-store proxy helper");

engine_store_server_helper.handle_set_proxy(&proxy_helper);

info!("wait for engine-store server to start");
while engine_store_server_helper.handle_get_engine_store_server_status()
== EngineStoreServerStatus::Idle
{
thread::sleep(Duration::from_millis(200));
}

if engine_store_server_helper.handle_get_engine_store_server_status()
!= EngineStoreServerStatus::Running
{
info!("engine-store server is not running, make proxy exit");
return;
}

info!("engine-store server is started");

proxy.set_status(RaftProxyStatus::Running);

{
debug_assert!(
engine_store_server_helper.handle_get_engine_store_server_status()
== EngineStoreServerStatus::Running
);
loop {
if engine_store_server_helper.handle_get_engine_store_server_status()
!= EngineStoreServerStatus::Running
{
break;
}
thread::sleep(Duration::from_millis(200));
}
}

info!(
"found engine-store server status is {:?}, start to stop all services",
engine_store_server_helper.handle_get_engine_store_server_status()
);

proxy.set_status(RaftProxyStatus::Stopped);

info!("all services in raft-store proxy are stopped");

info!("wait for engine-store server to stop");
while engine_store_server_helper.handle_get_engine_store_server_status()
!= EngineStoreServerStatus::Terminated
{
thread::sleep(Duration::from_millis(200));
}
info!("engine-store server is stopped");
}

/// Run a TiKV server. Returns when the server is shutdown by the user, in which
/// case the server will be properly stopped.
pub unsafe fn run_tikv(config: TiKvConfig, engine_store_server_helper: &EngineStoreServerHelper) {
Expand Down Expand Up @@ -250,6 +335,38 @@ pub unsafe fn run_tikv(config: TiKvConfig, engine_store_server_helper: &EngineSt
})
}

/// Run a TiKV server only for decryption. Returns when the server is shutdown by the user, in which
/// case the server will be properly stopped.
pub unsafe fn run_tikv_only_decryption(
config: TiKvConfig,
engine_store_server_helper: &EngineStoreServerHelper,
) {
// Sets the global logger ASAP.
// It is okay to use the config w/o `validate()`,
// because `initial_logger()` handles various conditions.
initial_logger(&config);

// Print version information.
crate::log_proxy_info();

// Print resource quota.
SysQuota::log_quota();
CPU_CORES_QUOTA_GAUGE.set(SysQuota::cpu_cores_quota());

// Do some prepare works before start.
pre_start();

let _m = Monitor::default();

dispatch_api_version!(config.storage.api_version(), {
if !config.raft_engine.enable {
run_impl_only_for_decryption::<RocksEngine, API>(config, engine_store_server_helper)
} else {
run_impl_only_for_decryption::<RaftLogEngine, API>(config, engine_store_server_helper)
}
})
}

const RESERVED_OPEN_FDS: u64 = 1000;

const DEFAULT_METRICS_FLUSH_INTERVAL: Duration = Duration::from_millis(10_000);
Expand Down
4 changes: 2 additions & 2 deletions components/test_raftstore/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -320,10 +320,10 @@ impl<T: Simulator> Cluster<T> {
let proxy = Box::new(raftstore::engine_store_ffi::RaftStoreProxy::new(
AtomicU8::new(raftstore::engine_store_ffi::RaftProxyStatus::Idle as u8),
key_mgr.clone(),
Box::new(raftstore::engine_store_ffi::ReadIndexClient::new(
Some(Box::new(raftstore::engine_store_ffi::ReadIndexClient::new(
router.clone(),
SysQuota::cpu_cores_quota() as usize * 2,
)),
))),
std::sync::RwLock::new(Some(engines.kv.clone())),
));

Expand Down

0 comments on commit 3a50e1f

Please sign in to comment.