From 99d38d58a1d00730ba550d4a45248edac7b04fca Mon Sep 17 00:00:00 2001 From: bgy217 Date: Mon, 13 May 2024 16:56:12 +0900 Subject: [PATCH 1/4] src/common/Thread: add done_track and result_track to ThreadLambda class --- src/common/Thread.h | 21 ++++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/src/common/Thread.h b/src/common/Thread.h index 11fef69a9fab0..b27049745f987 100644 --- a/src/common/Thread.h +++ b/src/common/Thread.h @@ -88,12 +88,14 @@ class ThreadLambda: public Thread { private: const char* op_thread_name = "thread_op"; - bool done = false; + bool done = false; + bool* done_track = NULL; using Lambda = std::function; Lambda lambda = NULL; ReturnType result; + ReturnType* result_track = NULL; using ParamTuple = std::tuple; ParamTuple params; @@ -111,6 +113,8 @@ class ThreadLambda: public Thread { void set_lambda(Lambda&& _lambda) { lambda = _lambda; } void set_param(Params... _params) { params = std::make_tuple(_params...); } + void set_done_track(bool* _done_track) { done_track = _done_track; } + void set_result_track(ReturnType* _result_track) { result_track = _result_track; } bool is_done() { return done; } bool reset_done() { @@ -131,8 +135,23 @@ class ThreadLambda: public Thread { void * entry() override { + if (done_track != NULL) { + *done_track = false; + } + result = process(params); done = true; + + if (result_track != NULL) { + *result_track = result; + result_track = NULL; + } + + if (done_track != NULL) { + *done_track = true; + done_track = NULL; + } + return NULL; } From 937cbaca08498527c8fe9679bf8c07ebb02396cb Mon Sep 17 00:00:00 2001 From: bgy217 Date: Mon, 13 May 2024 16:57:31 +0900 Subject: [PATCH 2/4] src/common/WorkQueue: add tracked_run, run_with_result, tracked_run_with_result to ThreadLambdaPool class --- src/common/WorkQueue.h | 70 +++++++++++++++++++++++++++++++++--------- 1 file changed, 55 insertions(+), 15 deletions(-) diff --git a/src/common/WorkQueue.h b/src/common/WorkQueue.h index 2501d8f5f0b36..d517a30c0f74c 100644 --- a/src/common/WorkQueue.h +++ b/src/common/WorkQueue.h @@ -764,6 +764,25 @@ class ThreadLambdaPool { // run std::mutex r_mutex; + TLclass* get_next_thread() { + TLclass* return_thread; + + do { + int tries = 0; + for ( ; tries < wait_done_retries; tries++) { + return_thread = threads[next_tid]; + if (return_thread->wait_done(1)) { break; } + } + + next_tid = (next_tid + 1) % size; + + if (tries < wait_done_retries) { break; } + + } while (true); + + return return_thread; + } + public: ThreadLambdaPool(Lambda&& _lambda_template, unsigned int _size = 1000) : lambda_template(_lambda_template), size(_size) { @@ -783,29 +802,50 @@ class ThreadLambdaPool { void run(Params... _params) { unique_lock r_lock(r_mutex); - TLclass* allocated_thread; - do { - int tries = 0; - for ( ; tries < wait_done_retries; tries++) { - allocated_thread = threads[next_tid]; - if (allocated_thread->wait_done(1)) { break; } - } + TLclass* allocated_thread = get_next_thread(); - if (tries == wait_done_retries) { - next_tid = (next_tid + 1) % size; - } - else { - break; - } - } while (true); + allocated_thread->reset_done(); + allocated_thread->set_param(_params ...); + + allocated_thread->start(); + } + + void tracked_run(bool* pdone, Params... _params) { + unique_lock r_lock(r_mutex); + + TLclass* allocated_thread = get_next_thread(); + + allocated_thread->reset_done(); + allocated_thread->set_done_track(pdone); + allocated_thread->set_param(_params ...); + + allocated_thread->start(); + } + + void run_with_result(ReturnType* presult, Params... _params) { + unique_lock r_lock(r_mutex); + + TLclass* allocated_thread = get_next_thread(); allocated_thread->reset_done(); + allocated_thread->set_result_track(presult); allocated_thread->set_param(_params ...); allocated_thread->start(); + } + + void tracked_run_with_result(bool* pdone, ReturnType* presult, Params... _params) { + unique_lock r_lock(r_mutex); - next_tid = (next_tid + 1) % size; + TLclass* allocated_thread = get_next_thread(); + + allocated_thread->reset_done(); + allocated_thread->set_done_track(pdone); + allocated_thread->set_result_track(presult); + allocated_thread->set_param(_params ...); + + allocated_thread->start(); } }; From 44d1868f156adb674b2e46f946aae1b3aa9194f1 Mon Sep 17 00:00:00 2001 From: bgy217 Date: Mon, 13 May 2024 16:58:52 +0900 Subject: [PATCH 3/4] rgw/rgw_bucket: make rgw_remove_bucket() to use run_with_result method --- src/rgw/rgw_bucket.cc | 28 ++++++++++++++++------------ 1 file changed, 16 insertions(+), 12 deletions(-) diff --git a/src/rgw/rgw_bucket.cc b/src/rgw/rgw_bucket.cc index 9d4915d18284b..00ff773888b7c 100644 --- a/src/rgw/rgw_bucket.cc +++ b/src/rgw/rgw_bucket.cc @@ -363,9 +363,9 @@ static int rgw_remove_bucket(rgw::sal::RGWRadosStore *store, rgw_bucket& bucket, bool is_truncated = false; - using ThreadObjDelPool = ThreadLambdaPool*, std::mutex*>; + using ThreadObjDelPool = ThreadLambdaPool; ThreadObjDelPool* tod_pool = new ThreadObjDelPool( - [&store, &info, &bucket](rgw_obj_key key, map* result_map, std::mutex* mu_mutex) -> int { + [&store, &info, &bucket](rgw_obj_key key) -> int { int ret = rgw_remove_object(store, info, bucket, key); if (ret < 0 && ret != -ENOENT) { lderr(store->ctx()) << "ERROR: failed to delete " << key.name << " (error code: " << ret << ")" << dendl; @@ -376,11 +376,6 @@ static int rgw_remove_bucket(rgw::sal::RGWRadosStore *store, rgw_bucket& bucket, dout(10) << key.name << " is deleted" << dendl; } - { - unique_lock mu_lock(*mu_mutex); - result_map->insert(make_pair(key.name, ret)); - } - return ret; }, cct->_conf->rgw_delete_thread_num @@ -400,13 +395,18 @@ static int rgw_remove_bucket(rgw::sal::RGWRadosStore *store, rgw_bucket& bucket, } // map_update - std::mutex mu_mutex; map result_map; for (const auto& obj : objs) { rgw_obj_key each_obj_key(obj.key); - tod_pool->run(each_obj_key, &result_map, &mu_mutex); + std::pair::iterator,bool> insert_ret; + + insert_ret = result_map.insert(make_pair(each_obj_key.name, 9999)); + + if (insert_ret.second == true) { + tod_pool->run_with_result(&(insert_ret.first->second), each_obj_key); + } } int err_code = 0; @@ -415,9 +415,14 @@ static int rgw_remove_bucket(rgw::sal::RGWRadosStore *store, rgw_bucket& bucket, rgw_obj_key each_obj_key(obj.key); map::iterator found; + found = result_map.find(each_obj_key.name); + + if (found == result_map.end()) { continue; } + + int each_result; do { - found = result_map.find(each_obj_key.name); - if (found == result_map.end()) { + each_result = found->second; + if (each_result > 0) { usleep(100); } else { @@ -425,7 +430,6 @@ static int rgw_remove_bucket(rgw::sal::RGWRadosStore *store, rgw_bucket& bucket, } } while (true); - int each_result = found->second; if (each_result != 0) { err_code = each_result; } } From ed6284f5474334d81ba71d74ace54d10713f9624 Mon Sep 17 00:00:00 2001 From: bgy217 Date: Mon, 13 May 2024 17:03:30 +0900 Subject: [PATCH 4/4] rgw/{rgw_op, rgw_rados}: make multi_delete_objs rgw request to use tracked_run --- src/rgw/rgw_op.cc | 40 +++++++++++++--------------------------- src/rgw/rgw_rados.h | 1 + 2 files changed, 14 insertions(+), 27 deletions(-) diff --git a/src/rgw/rgw_op.cc b/src/rgw/rgw_op.cc index 8e6702c155432..52742fa1a34ef 100644 --- a/src/rgw/rgw_op.cc +++ b/src/rgw/rgw_op.cc @@ -6551,17 +6551,18 @@ void RGWDeleteMultiObj::pre_exec() rgw_bucket_object_pre_exec(s); } -using ThreadObjDelPool = ThreadLambdaPool*, std::mutex*>; +using ThreadObjDelPool = ThreadLambdaPool; void RGWDeleteMultiObj::execute() { - // map_update - std::mutex mu_mutex; - static ThreadObjDelPool* tod_pool; if (tod_pool == nullptr) { int tp_size = s->cct->_conf->rgw_delete_thread_num; + tod_pool = new ThreadObjDelPool( - [](req_state* s, rgw::sal::RGWRadosStore* store, RGWRados::Object::Delete* del_op, map* result_map, std::mutex* mu_mutex) -> int { + [](RGWDeleteMultiObj* _this, RGWRados::Object::Delete* del_op) -> int { + req_state* s = _this->s; + rgw::sal::RGWRadosStore* store = _this->store; + rgw_obj obj = del_op->target->get_obj(); del_op->params.bucket_owner = s->bucket_owner.get_id(); @@ -6577,11 +6578,6 @@ void RGWDeleteMultiObj::execute() del_op->result.op_ret = op_ret; - { - unique_lock mu_lock(*mu_mutex); - result_map->insert(make_pair(obj.key, del_op)); - } - const auto obj_state = s->obj_ctx->get_state(obj); bufferlist etag_bl; @@ -6609,8 +6605,7 @@ void RGWDeleteMultiObj::execute() RGWMultiDelXMLParser parser; RGWObjectCtx *obj_ctx = static_cast(s->obj_ctx); - std::vector run_objects; - map result_map; + vector dop_results; char* buf; @@ -6730,8 +6725,8 @@ void RGWDeleteMultiObj::execute() RGWRados::Object* del_target = new RGWRados::Object(store->getRados(), s->bucket_info, *obj_ctx, obj); RGWRados::Object::Delete* del_op = new RGWRados::Object::Delete(del_target); - run_objects.push_back(obj.key); - tod_pool->run(s, store, del_op, &result_map, &mu_mutex); + dop_results.push_back(del_op); + tod_pool->tracked_run(&(del_op->delete_done), this, del_op); } /* set the return code to zero, errors at this point will be @@ -6739,21 +6734,12 @@ void RGWDeleteMultiObj::execute() op_ret = 0; wrap_up: - for (iter = run_objects.begin(); iter != run_objects.end(); ++iter) { - rgw_obj_key each_obj_key = *iter; + auto result_iter = dop_results.begin(); + for (; result_iter != dop_results.end(); ++result_iter) { + RGWRados::Object::Delete* each_obj_delete = *result_iter; - map::iterator found; - do { - found = result_map.find(each_obj_key); - if (found == result_map.end()) { - usleep(10); - } - else { - break; - } - } while (true); + while (!(each_obj_delete->delete_done)) { usleep(1); }; - RGWRados::Object::Delete* each_obj_delete = found->second; int each_op_ret = each_obj_delete->result.op_ret; rgw_obj obj = each_obj_delete->target->get_obj(); diff --git a/src/rgw/rgw_rados.h b/src/rgw/rgw_rados.h index c50cd6c64609c..bb584f4ed2ca3 100644 --- a/src/rgw/rgw_rados.h +++ b/src/rgw/rgw_rados.h @@ -834,6 +834,7 @@ class RGWRados struct Delete { RGWRados::Object *target; + bool delete_done = false; struct DeleteParams { rgw_user bucket_owner;