txn-file,remote-cop,kvengine: Improve efficiency for remote coprocessor (tikv#1676)

Signed-off-by: Ping Yu <[email protected]>
pingyu authored Jul 30, 2024
1 parent 21634e4 commit 869b465
Showing 11 changed files with 806 additions and 96 deletions.
14 changes: 14 additions & 0 deletions cmd/cse-ctl/src/check_table.rs
@@ -17,6 +17,7 @@ use cloud_encryption::MasterKey;
use futures::executor::block_on;
use kvengine::{
dfs::{DFSConfig, Dfs, S3Fs},
txn_chunk_manager::{with_pool_size, TxnChunkManager},
Engine, Shard, ShardMeta, SnapAccess, UserMeta,
};
use kvproto::keyspacepb::{KeyspaceMeta, KeyspaceState};
@@ -44,6 +45,8 @@ use txn_types::KvPair;

use crate::check_table::Handle::{Common, Int};

const TXN_CHUNK_WORKER_POOL_SIZE: usize = 2;

#[derive(Args)]
pub struct CheckTableArgs {
/// The path of the config file.
@@ -121,6 +124,12 @@ pub(crate) fn execute_check_table(args: CheckTableArgs) {
let master_key = s3fs
.get_runtime()
.block_on(config.security.new_master_key());
let txn_chunk_manager = TxnChunkManager::new(
None,
s3fs.clone(),
None,
with_pool_size(TXN_CHUNK_WORKER_POOL_SIZE),
);
let cluster_backup = get_cluster_backup_meta(&s3fs, config.backup_name.clone());
let keyspace_ids = if config.all {
let mut all_keyspace_ids = vec![];
@@ -178,6 +187,7 @@ pub(crate) fn execute_check_table(args: CheckTableArgs) {
shards,
s3fs.clone(),
master_key.clone(),
txn_chunk_manager.clone(),
));
let keyspace_prefix = api_version::ApiV2::get_txn_keyspace_prefix(keyspace_id);
let dbs = block_on(schema::load_schema(backup_reader.clone(), &keyspace_prefix)).unwrap();
@@ -441,6 +451,7 @@ pub(crate) struct BackupReader {
s3fs: Arc<S3Fs>,
snap_cache: Arc<Mutex<Option<SnapAccess>>>,
master_key: MasterKey,
txn_chunk_manager: TxnChunkManager,
}

impl BackupReader {
@@ -450,6 +461,7 @@ impl BackupReader {
metas: Vec<ShardMeta>,
s3fs: Arc<S3Fs>,
master_key: MasterKey,
txn_chunk_manager: TxnChunkManager,
) -> Self {
Self {
ts,
@@ -458,6 +470,7 @@ impl BackupReader {
s3fs,
snap_cache: Arc::new(Mutex::new(None)),
master_key,
txn_chunk_manager,
}
}

@@ -528,6 +541,7 @@ impl BackupReader {
false,
&self.master_key,
None,
self.txn_chunk_manager.clone(),
));
let mut guard = self.snap_cache.lock().unwrap();
*guard = Some(snap.clone());
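Note: the pattern this file introduces can be summarized as follows. This is a minimal sketch based only on the call sites shown above; the helper name `build_cli_txn_chunk_manager` and its comments are illustrative and not part of the commit. Because cse-ctl is a short-lived CLI tool with no long-lived runtime of its own, the chunk manager gets a small dedicated worker pool via `with_pool_size`.

use std::sync::Arc;

use kvengine::{
    dfs::S3Fs,
    txn_chunk_manager::{with_pool_size, TxnChunkManager},
};

const TXN_CHUNK_WORKER_POOL_SIZE: usize = 2;

// Illustrative helper (not in the commit): build the chunk manager for the CLI
// path. The two `None` arguments mirror the check_table.rs call site above; in
// particular, the CLI path passes no block cache.
fn build_cli_txn_chunk_manager(s3fs: Arc<S3Fs>) -> TxnChunkManager {
    TxnChunkManager::new(
        None,
        s3fs,
        None,
        with_pool_size(TXN_CHUNK_WORKER_POOL_SIZE),
    )
}

The resulting manager is cloned into `BackupReader::new` and forwarded to the `SnapAccess` constructed in the last hunk above.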
16 changes: 14 additions & 2 deletions components/cloud_worker/src/lib.rs
@@ -27,6 +27,7 @@ use ::native_br::{backup::BackupConfig, restore::RestoreConfig};
use kvengine::{
dfs::{DFSConfig, Dfs, S3Fs},
table::ChecksumType,
txn_chunk_manager::{with_pool_handle, TxnChunkManager},
BLOCK_CACHE_KEY_SIZE,
};
use kvproto::metapb::Store;
@@ -110,7 +111,7 @@ fn start_server(
config_file_path: Option<PathBuf>,
thread_pool: Arc<Runtime>,
pd: Arc<dyn PdClient>,
) -> Box<dyn Future<Output = hyper::Result<()>> + Send + Unpin> {
) -> ServerFuture {
let dfs_config = config.dfs.clone();
let s3fs = Arc::new(kvengine::dfs::S3Fs::new(
dfs_config.prefix,
@@ -212,6 +213,16 @@ fn start_server(

let cop_limiter = CopLimiter::new(config.cop_limiter.clone());

// Create `TxnChunkManager` using `thread_pool`. Otherwise, since `TxnChunkManager`
// is held in an async context, we would hit a panic from dropping a tokio
// runtime inside an async context.
let txn_chunk_manager = TxnChunkManager::new(
None,
s3fs.clone(),
block_cache.clone(),
with_pool_handle(thread_pool.handle().clone()),
);

let ctx = Arc::new(server::Context {
compression_lvl,
checksum_type,
Expand All @@ -225,6 +236,7 @@ fn start_server(
quota_limiter: Arc::new(QuotaLimiter::default()),
block_cache,
cop_limiter,
txn_chunk_manager,
});
let acceptor = security_mgr.acceptor(incoming).unwrap();
let server = start_serve!(ctx.clone(), acceptor);
@@ -282,7 +294,7 @@ fn start_server(
}
});

Box::new(ServerFuture::new(server, cop_server_opt))
ServerFuture::new(server, cop_server_opt)
}

pub struct CloudWorker {
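Note: the server path differs from the CLI path above. As the new comment explains, the manager ends up held in an async context, so it must not own a tokio runtime of its own; instead it borrows the handle of the existing `thread_pool` runtime via `with_pool_handle`. A minimal sketch of that variant, again based only on the call site shown above (the helper name is illustrative, and the `bytes::Bytes` import is an assumption about the block cache value type):

use std::sync::Arc;

use bytes::Bytes;
use kvengine::{
    dfs::S3Fs,
    table::sstable::BlockCacheKey,
    txn_chunk_manager::{with_pool_handle, TxnChunkManager},
};
use tokio::runtime::Runtime;

// Illustrative helper (not in the commit): reuse the server's existing runtime
// instead of letting the chunk manager spawn, and later drop, one of its own.
fn build_server_txn_chunk_manager(
    s3fs: Arc<S3Fs>,
    block_cache: Option<moka::sync::SegmentedCache<BlockCacheKey, Bytes>>,
    thread_pool: &Runtime,
) -> TxnChunkManager {
    TxnChunkManager::new(
        None,
        s3fs,
        block_cache,
        with_pool_handle(thread_pool.handle().clone()),
    )
}

The manager is then stored in the shared `server::Context` and cloned per request, as the remote_cop.rs and server.rs hunks below show.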
1 change: 1 addition & 0 deletions components/cloud_worker/src/remote_cop.rs
@@ -130,6 +130,7 @@ impl Tikv for CopService {
&resp.take_snapshot(),
&m_ctx.master_key,
m_ctx.block_cache.clone(),
m_ctx.txn_chunk_manager.clone(),
)
.await
.map_err(|e| tikv::coprocessor::Error::Other(format!("{:?}", e)))?;
3 changes: 3 additions & 0 deletions components/cloud_worker/src/server.rs
@@ -23,6 +23,7 @@ use kvengine::{
dfs,
dfs::{CacheFs, S3Fs},
table::{sstable::BlockCacheKey, ChecksumType},
txn_chunk_manager::TxnChunkManager,
SnapAccess,
};
use pd_client::PdClient;
@@ -70,6 +71,7 @@ pub(crate) struct Context {
pub quota_limiter: Arc<QuotaLimiter>,
pub block_cache: Option<moka::sync::SegmentedCache<BlockCacheKey, Bytes>>,
pub cop_limiter: CopLimiter,
pub txn_chunk_manager: TxnChunkManager,
}

#[macro_export]
@@ -197,6 +199,7 @@ async fn handle_remote_coprocessor(
snap_data,
&ctx.master_key,
ctx.block_cache.clone(),
ctx.txn_chunk_manager.clone(),
)
.await;
if let Err(err) = snap_access_res.as_ref() {
(Diffs for the remaining 7 of the 11 changed files are not shown.)
