Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[NES-70] NES 다중 오브젝트 삭제 중 발생하는 간헐적인 크래시 해소 #56

Merged
merged 4 commits into from
May 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 20 additions & 1 deletion src/common/Thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,12 +88,14 @@ class ThreadLambda: public Thread {
private:
// Fixed name string for this worker.
// NOTE(review): presumably handed to the Thread base when the thread is created — confirm.
const char* op_thread_name = "thread_op";

// Internal completion flag returned by is_done(); entry() raises it after
// process(params) has returned. Declared exactly once (a second identical
// declaration would be a redeclaration error).
bool done = false;
// Optional caller-owned completion flag. entry() writes false through it when
// it starts and true when it finishes, then resets this pointer to NULL.
bool* done_track = NULL;

// Callable type executed by this thread.
using Lambda = std::function<ReturnType (Params ...)>;
Lambda lambda = NULL;

// Value returned by the most recent process(params) call in entry().
ReturnType result;
// Optional caller-owned slot. entry() copies `result` into it and then resets
// this pointer to NULL.
ReturnType* result_track = NULL;

// Arguments for the next run, stored by set_param().
using ParamTuple = std::tuple<Params...>;
ParamTuple params;
Expand All @@ -111,6 +113,8 @@ class ThreadLambda: public Thread {

void set_lambda(Lambda&& _lambda) { lambda = _lambda; }
void set_param(Params... _params) { params = std::make_tuple(_params...); }
void set_done_track(bool* _done_track) { done_track = _done_track; }
void set_result_track(ReturnType* _result_track) { result_track = _result_track; }

bool is_done() { return done; }
bool reset_done() {
Expand All @@ -131,8 +135,23 @@ class ThreadLambda: public Thread {

/**
 * Thread body: runs process(params) once and publishes the outcome.
 *
 * Sequence:
 *   1. If a done-tracker is registered, write false through it.
 *   2. Store the return value of process(params) in `result`.
 *   3. If a result-tracker is registered, copy `result` into it and drop the
 *      registration.
 *   4. If a done-tracker is registered, write true through it and drop the
 *      registration.
 *   5. Raise the internal `done` flag.
 *
 * `done` is raised last on purpose: until then this method is still reading
 * and clearing `result_track` / `done_track`. Raising it earlier would let an
 * observer of `done` re-arm this object (set_result_track / set_done_track)
 * while the previous run could still write to, or NULL out, the freshly
 * registered pointers.
 * NOTE(review): assumes the pool's wait_done() keys off `done` — confirm.
 * NOTE(review): `done` and the tracked flags are plain (non-atomic) variables;
 * cross-thread visibility relies on whatever synchronisation wait_done() and
 * the callers provide — confirm.
 *
 * @return always NULL.
 */
void * entry() override
{
    if (done_track != NULL) {
        *done_track = false;
    }

    result = process(params);

    // Hand the result to the caller-owned slot, then forget the slot so a
    // later untracked run does not write to it.
    if (result_track != NULL) {
        *result_track = result;
        result_track = NULL;
    }

    // Signal completion to the caller-owned flag, then forget it.
    if (done_track != NULL) {
        *done_track = true;
        done_track = NULL;
    }

    // Only now is this object finished with its tracking state.
    done = true;

    return NULL;
}

Expand Down
70 changes: 55 additions & 15 deletions src/common/WorkQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -764,6 +764,25 @@ class ThreadLambdaPool {
// run
std::mutex r_mutex;

/**
 * Picks the next pool thread that reports itself finished.
 *
 * Starting at `next_tid`, the current slot is polled with wait_done(1) up to
 * `wait_done_retries` times. Whether or not the slot became available, the
 * cursor then moves on to the following slot (wrapping at `size`); the scan
 * repeats until some slot answers true.
 * NOTE(review): the argument to wait_done() is presumably a timeout — confirm.
 * The visible callers all hold `r_mutex` while calling this.
 *
 * @return the thread whose wait_done(1) returned true.
 */
TLclass* get_next_thread() {
    for (;;) {
        TLclass* candidate = threads[next_tid];

        bool is_free = false;
        for (int attempt = 0; attempt < wait_done_retries && !is_free; ++attempt) {
            is_free = candidate->wait_done(1);
        }

        // The cursor always advances, so the next request starts at the
        // following slot even when this one is handed out.
        next_tid = (next_tid + 1) % size;

        if (is_free) {
            return candidate;
        }
    }
}

public:
ThreadLambdaPool(Lambda&& _lambda_template, unsigned int _size = 1000) : lambda_template(_lambda_template), size(_size)
{
Expand All @@ -783,29 +802,50 @@ class ThreadLambdaPool {

/**
 * Runs the pool's lambda with the given arguments on the next available
 * thread, without any completion or result tracking.
 *
 * Thread selection is delegated to get_next_thread(), the same helper the
 * tracked entry points use. This replaces an inline copy of the selection
 * loop that assigned the chosen thread to a loop-local variable and then
 * dereferenced a different, never-initialised `allocated_thread`.
 *
 * Holds `r_mutex` for the whole call, so dispatches are serialised.
 *
 * @param _params arguments stored on the chosen thread via set_param().
 */
void run(Params... _params) {
    unique_lock<std::mutex> r_lock(r_mutex);

    TLclass* allocated_thread = get_next_thread();

    allocated_thread->reset_done();
    allocated_thread->set_param(_params ...);

    allocated_thread->start();
}

void tracked_run(bool* pdone, Params... _params) {
unique_lock<std::mutex> r_lock(r_mutex);

TLclass* allocated_thread = get_next_thread();

allocated_thread->reset_done();
allocated_thread->set_done_track(pdone);
allocated_thread->set_param(_params ...);

allocated_thread->start();
}

/**
 * Runs the pool's lambda on the next available thread and delivers its return
 * value to a caller-owned slot.
 *
 * Holds `r_mutex` for the whole call.
 *
 * @param presult slot registered on the chosen thread via set_result_track().
 * @param _params arguments stored on the chosen thread via set_param().
 */
void run_with_result(ReturnType* presult, Params... _params) {
    unique_lock<std::mutex> guard(r_mutex);

    TLclass* worker = get_next_thread();
    worker->reset_done();

    // Arm the worker: arguments and result slot.
    worker->set_param(_params ...);
    worker->set_result_track(presult);

    worker->start();
}

/**
 * Runs the pool's lambda on the next available thread, delivering both a
 * completion flag and the return value to caller-owned storage.
 *
 * The cursor `next_tid` is advanced only inside get_next_thread(), as in
 * tracked_run() and run_with_result(). Advancing it here as well would skip
 * one pool slot on every call.
 *
 * Holds `r_mutex` for the whole call.
 *
 * @param pdone   flag registered on the chosen thread via set_done_track().
 * @param presult slot registered on the chosen thread via set_result_track().
 * @param _params arguments stored on the chosen thread via set_param().
 */
void tracked_run_with_result(bool* pdone, ReturnType* presult, Params... _params) {
    unique_lock<std::mutex> r_lock(r_mutex);

    TLclass* allocated_thread = get_next_thread();

    allocated_thread->reset_done();
    allocated_thread->set_done_track(pdone);
    allocated_thread->set_result_track(presult);
    allocated_thread->set_param(_params ...);

    allocated_thread->start();
}
};

Expand Down
28 changes: 16 additions & 12 deletions src/rgw/rgw_bucket.cc
Original file line number Diff line number Diff line change
Expand Up @@ -363,9 +363,9 @@ static int rgw_remove_bucket(rgw::sal::RGWRadosStore *store, rgw_bucket& bucket,

bool is_truncated = false;

using ThreadObjDelPool = ThreadLambdaPool<int, rgw_obj_key, map<string, int>*, std::mutex*>;
using ThreadObjDelPool = ThreadLambdaPool<int, rgw_obj_key>;
ThreadObjDelPool* tod_pool = new ThreadObjDelPool(
[&store, &info, &bucket](rgw_obj_key key, map<string, int>* result_map, std::mutex* mu_mutex) -> int {
[&store, &info, &bucket](rgw_obj_key key) -> int {
int ret = rgw_remove_object(store, info, bucket, key);
if (ret < 0 && ret != -ENOENT) {
lderr(store->ctx()) << "ERROR: failed to delete " << key.name << " (error code: " << ret << ")" << dendl;
Expand All @@ -376,11 +376,6 @@ static int rgw_remove_bucket(rgw::sal::RGWRadosStore *store, rgw_bucket& bucket,
dout(10) << key.name << " is deleted" << dendl;
}

{
unique_lock<std::mutex> mu_lock(*mu_mutex);
result_map->insert(make_pair(key.name, ret));
}

return ret;
},
cct->_conf->rgw_delete_thread_num
Expand All @@ -400,13 +395,18 @@ static int rgw_remove_bucket(rgw::sal::RGWRadosStore *store, rgw_bucket& bucket,
}

// map_update
std::mutex mu_mutex;
map<string, int> result_map;

for (const auto& obj : objs) {
rgw_obj_key each_obj_key(obj.key);

tod_pool->run(each_obj_key, &result_map, &mu_mutex);
std::pair<std::map<string, int>::iterator,bool> insert_ret;

insert_ret = result_map.insert(make_pair(each_obj_key.name, 9999));

if (insert_ret.second == true) {
tod_pool->run_with_result(&(insert_ret.first->second), each_obj_key);
}
}

int err_code = 0;
Expand All @@ -415,17 +415,21 @@ static int rgw_remove_bucket(rgw::sal::RGWRadosStore *store, rgw_bucket& bucket,
rgw_obj_key each_obj_key(obj.key);

map<string, int>::iterator found;
found = result_map.find(each_obj_key.name);

if (found == result_map.end()) { continue; }

int each_result;
do {
found = result_map.find(each_obj_key.name);
if (found == result_map.end()) {
each_result = found->second;
if (each_result > 0) {
usleep(100);
}
else {
break;
}
} while (true);

int each_result = found->second;
if (each_result != 0) { err_code = each_result; }
}

Expand Down
40 changes: 13 additions & 27 deletions src/rgw/rgw_op.cc
Original file line number Diff line number Diff line change
Expand Up @@ -6551,17 +6551,18 @@ void RGWDeleteMultiObj::pre_exec()
rgw_bucket_object_pre_exec(s);
}

using ThreadObjDelPool = ThreadLambdaPool<int, req_state*, rgw::sal::RGWRadosStore*, RGWRados::Object::Delete*, map<rgw_obj_key, RGWRados::Object::Delete*>*, std::mutex*>;
using ThreadObjDelPool = ThreadLambdaPool<int, RGWDeleteMultiObj*, RGWRados::Object::Delete*>;
void RGWDeleteMultiObj::execute()
{
// map_update
std::mutex mu_mutex;

static ThreadObjDelPool* tod_pool;
if (tod_pool == nullptr) {
int tp_size = s->cct->_conf->rgw_delete_thread_num;

tod_pool = new ThreadObjDelPool(
[](req_state* s, rgw::sal::RGWRadosStore* store, RGWRados::Object::Delete* del_op, map<rgw_obj_key, RGWRados::Object::Delete*>* result_map, std::mutex* mu_mutex) -> int {
[](RGWDeleteMultiObj* _this, RGWRados::Object::Delete* del_op) -> int {
req_state* s = _this->s;
rgw::sal::RGWRadosStore* store = _this->store;

rgw_obj obj = del_op->target->get_obj();

del_op->params.bucket_owner = s->bucket_owner.get_id();
Expand All @@ -6577,11 +6578,6 @@ void RGWDeleteMultiObj::execute()

del_op->result.op_ret = op_ret;

{
unique_lock<std::mutex> mu_lock(*mu_mutex);
result_map->insert(make_pair(obj.key, del_op));
}

const auto obj_state = s->obj_ctx->get_state(obj);

bufferlist etag_bl;
Expand Down Expand Up @@ -6609,8 +6605,7 @@ void RGWDeleteMultiObj::execute()
RGWMultiDelXMLParser parser;
RGWObjectCtx *obj_ctx = static_cast<RGWObjectCtx *>(s->obj_ctx);

std::vector<rgw_obj_key> run_objects;
map<rgw_obj_key, RGWRados::Object::Delete*> result_map;
vector<RGWRados::Object::Delete*> dop_results;

char* buf;

Expand Down Expand Up @@ -6730,30 +6725,21 @@ void RGWDeleteMultiObj::execute()
RGWRados::Object* del_target = new RGWRados::Object(store->getRados(), s->bucket_info, *obj_ctx, obj);
RGWRados::Object::Delete* del_op = new RGWRados::Object::Delete(del_target);

run_objects.push_back(obj.key);
tod_pool->run(s, store, del_op, &result_map, &mu_mutex);
dop_results.push_back(del_op);
tod_pool->tracked_run(&(del_op->delete_done), this, del_op);
}

/* set the return code to zero, errors at this point will be
dumped to the response */
op_ret = 0;

wrap_up:
for (iter = run_objects.begin(); iter != run_objects.end(); ++iter) {
rgw_obj_key each_obj_key = *iter;
auto result_iter = dop_results.begin();
for (; result_iter != dop_results.end(); ++result_iter) {
RGWRados::Object::Delete* each_obj_delete = *result_iter;

map<rgw_obj_key, RGWRados::Object::Delete*>::iterator found;
do {
found = result_map.find(each_obj_key);
if (found == result_map.end()) {
usleep(10);
}
else {
break;
}
} while (true);
while (!(each_obj_delete->delete_done)) { usleep(1); };

RGWRados::Object::Delete* each_obj_delete = found->second;
int each_op_ret = each_obj_delete->result.op_ret;

rgw_obj obj = each_obj_delete->target->get_obj();
Expand Down
1 change: 1 addition & 0 deletions src/rgw/rgw_rados.h
Original file line number Diff line number Diff line change
Expand Up @@ -834,6 +834,7 @@ class RGWRados

struct Delete {
RGWRados::Object *target;
bool delete_done = false;

struct DeleteParams {
rgw_user bucket_owner;
Expand Down