Skip to content

Commit

Permalink
[NES-20] NES 오브젝트 스토어 오브젝트 삭제 성능 개선 (#50)
Browse files Browse the repository at this point in the history
* common/Thread: add ThreadLambda

* rgw/rgw_bucket.cc: apply ThreadLambda to bucket purging process

* rgw/rgw_op.cc: apply ThreadLambda to multi object deletion process

* rgw/rgw_bucket.cc: move object deletion massage printing process to ThreadLambda when object purge

* rgw/rgw_op.cc: describe why send_partial_response() is outside ThreadLambda
  • Loading branch information
bgy217 authored Feb 21, 2024
1 parent 9daf9f4 commit 78f0bc8
Show file tree
Hide file tree
Showing 3 changed files with 180 additions and 45 deletions.
63 changes: 63 additions & 0 deletions src/common/Thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

#include <pthread.h>
#include <sys/types.h>
#include <unistd.h>

#include "include/compat.h"

Expand Down Expand Up @@ -80,4 +81,66 @@ std::thread make_named_thread(std::string_view n,
std::forward<Args>(args)...);
}, std::forward<Fun>(fun), std::forward<Args>(args)...);
}

// The "void" ReturnType is not implemented... Use "void*" instaed.
template<typename ReturnType, typename ... Params>
class ThreadLambda: public Thread {
private:
const char* op_thread_name = "thread_op";

bool done = false;

using Lambda = std::function<ReturnType (Params ...)>;
Lambda lambda = NULL;

ReturnType result;

using ParamTuple = std::tuple<Params...>;
ParamTuple params;

public:

ThreadLambda() {}
ThreadLambda(Lambda&& _lambda, Params... _params) : lambda(_lambda), params(make_tuple(_params...)) {}
~ThreadLambda() { stop(); }

void set_lambda(Lambda&& _lambda) { lambda = _lambda; }
void set_param(Params... _params) { params = std::make_tuple(_params...); }

bool is_done() { return done; }
ParamTuple get_param() { return params; }
ReturnType get_result() { return result; }

void * entry() override
{
result = process(params);
done = true;
return NULL;
}

template <typename Tuple>
ReturnType process(Tuple const& tuple)
{
return process(tuple, std::make_index_sequence<std::tuple_size<Tuple>::value>());
}

template <typename Tuple, std::size_t... I>
ReturnType process(Tuple const& tuple, std::index_sequence<I...>)
{
return lambda(std::get<I>(tuple)...);
}

void start() { create(op_thread_name); }
void stop() { if (is_started()) { join(); } }

void wait_done()
{
if (!is_started()) { return; }

if (lambda == NULL) { return; }

while (!done) { usleep(100); }
}
};

#endif
45 changes: 40 additions & 5 deletions src/rgw/rgw_bucket.cc
Original file line number Diff line number Diff line change
Expand Up @@ -374,13 +374,48 @@ static int rgw_remove_bucket(rgw::sal::RGWRadosStore *store, rgw_bucket& bucket,
return -ENOTEMPTY;
}

using ThreadObjDelete = ThreadLambda<int, rgw_obj_key>;
queue<ThreadObjDelete *> q_tods;

for (const auto& obj : objs) {
rgw_obj_key key(obj.key);
ret = rgw_remove_object(store, info, bucket, key);
if (ret < 0 && ret != -ENOENT) {
return ret;
}
rgw_obj_key each_obj_key(obj.key);

ThreadObjDelete* each_tod = new ThreadObjDelete(
[&store, &info, &bucket](rgw_obj_key key) -> int {
int ret = rgw_remove_object(store, info, bucket, key);
if (ret < 0 && ret != -ENOENT) {
lderr(store->ctx()) << "ERROR: failed to delete " << key.name << " (error code: " << ret << ")" << dendl;
}
else {
ret = 0;

dout(10) << key.name << " is deleted" << dendl;
}

return ret;
},
each_obj_key
);

each_tod->start();
q_tods.push(each_tod);
}

int err_code = 0;
while (!q_tods.empty()) {
ThreadObjDelete* each_tod = q_tods.front();

each_tod->wait_done();

int each_result = each_tod->get_result();
if (each_result == 0) { err_code = each_result; }

delete each_tod;
q_tods.pop();
}

if (err_code != 0) { return err_code; }

} while(is_truncated);

string prefix, delimiter;
Expand Down
117 changes: 77 additions & 40 deletions src/rgw/rgw_op.cc
Original file line number Diff line number Diff line change
Expand Up @@ -6556,28 +6556,32 @@ void RGWDeleteMultiObj::execute()
vector<rgw_obj_key>::iterator iter;
RGWMultiDelXMLParser parser;
RGWObjectCtx *obj_ctx = static_cast<RGWObjectCtx *>(s->obj_ctx);

using ThreadObjDelete = ThreadLambda<int, RGWDeleteMultiObj*, RGWRados::Object::Delete*>;
queue<ThreadObjDelete *> q_tods;

char* buf;

buf = data.c_str();
if (!buf) {
op_ret = -EINVAL;
goto error;
goto wrap_up;
}

if (!parser.init()) {
op_ret = -EINVAL;
goto error;
goto wrap_up;
}

if (!parser.parse(buf, data.length(), 1)) {
op_ret = -EINVAL;
goto error;
goto wrap_up;
}

multi_delete = static_cast<RGWMultiDelDelete *>(parser.find_first("Delete"));
if (!multi_delete) {
op_ret = -EINVAL;
goto error;
goto wrap_up;
} else {
#define DELETE_MULTI_OBJ_MAX_NUM 1000
int max_num = s->cct->_conf->rgw_delete_multi_obj_max_num;
Expand All @@ -6587,7 +6591,7 @@ void RGWDeleteMultiObj::execute()
int multi_delete_object_num = multi_delete->objects.size();
if (multi_delete_object_num > max_num) {
op_ret = -ERR_MALFORMED_XML;
goto error;
goto wrap_up;
}
}

Expand All @@ -6605,13 +6609,13 @@ void RGWDeleteMultiObj::execute()
if (has_versioned && !s->mfa_verified) {
ldpp_dout(this, 5) << "NOTICE: multi-object delete request with a versioned object, mfa auth not provided" << dendl;
op_ret = -ERR_MFA_REQUIRED;
goto error;
goto wrap_up;
}
}

begin_response();
if (multi_delete->objects.empty()) {
goto done;
goto wrap_up;
}

for (iter = multi_delete->objects.begin();
Expand Down Expand Up @@ -6640,9 +6644,9 @@ void RGWDeleteMultiObj::execute()
ARN(obj));
}
if ((e == Effect::Deny) ||
(usr_policy_res == Effect::Pass && e == Effect::Pass && !acl_allowed)) {
send_partial_response(*iter, false, "", -EACCES);
continue;
(usr_policy_res == Effect::Pass && e == Effect::Pass && !acl_allowed)) {
send_partial_response(*iter, false, "", -EACCES);
continue;
}
}

Expand Down Expand Up @@ -6673,49 +6677,82 @@ void RGWDeleteMultiObj::execute()

obj_ctx->set_atomic(obj);

RGWRados::Object del_target(store->getRados(), s->bucket_info, *obj_ctx, obj);
RGWRados::Object::Delete del_op(&del_target);
RGWRados::Object* del_target = new RGWRados::Object(store->getRados(), s->bucket_info, *obj_ctx, obj);
RGWRados::Object::Delete* del_op = new RGWRados::Object::Delete(del_target);

del_op.params.bucket_owner = s->bucket_owner.get_id();
del_op.params.versioning_status = s->bucket_info.versioning_status();
del_op.params.obj_owner = s->owner;
ldpp_dout(this, 20) << "NOTICE: make ThreadObjDelete" << dendl;
ThreadObjDelete* each_tod = new ThreadObjDelete(
[](RGWDeleteMultiObj* self, RGWRados::Object::Delete* del_op) -> int
{
rgw::sal::RGWRadosStore* store = self->store;
req_state* s = self->s;

op_ret = del_op.delete_obj(s->yield);
if (op_ret == -ENOENT) {
op_ret = 0;
}
rgw_obj obj = del_op->target->get_obj();

send_partial_response(*iter, del_op.result.delete_marker,
del_op.result.version_id, op_ret);
del_op->params.bucket_owner = s->bucket_owner.get_id();
del_op->params.versioning_status = s->bucket_info.versioning_status();
del_op->params.obj_owner = s->owner;

const auto obj_state = obj_ctx->get_state(obj);
bufferlist etag_bl;
const auto etag = obj_state->get_attr(RGW_ATTR_ETAG, etag_bl) ? etag_bl.to_str() : "";
int op_ret = del_op->delete_obj(null_yield);
if (op_ret == -ENOENT) {
op_ret = 0;
}

const auto ret = rgw::notify::publish(s, obj.key, obj_state->size, obj_state->mtime, etag,
del_op.result.delete_marker && s->object.instance.empty() ? rgw::notify::ObjectRemovedDeleteMarkerCreated : rgw::notify::ObjectRemovedDelete,
store);
if (ret < 0) {
ldpp_dout(this, 5) << "WARNING: publishing notification failed, with error: " << ret << dendl;
// TODO: we should have conf to make send a blocking coroutine and reply with error in case sending failed
// this should be global conf (probably returnign a different handler)
// so we don't need to read the configured values before we perform it
}
const auto obj_state = s->obj_ctx->get_state(obj);

bufferlist etag_bl;
const auto etag = obj_state->get_attr(RGW_ATTR_ETAG, etag_bl) ? etag_bl.to_str() : "";

const auto ret = rgw::notify::publish(s, obj.key, obj_state->size, obj_state->mtime, etag,
del_op->result.delete_marker && s->object.instance.empty() ? rgw::notify::ObjectRemovedDeleteMarkerCreated : rgw::notify::ObjectRemovedDelete,
store);

if (ret < 0) {
ldpp_dout(s, 5) << "WARNING: publishing notification failed, with error: " << ret << dendl;
// TODO: we should have conf to make send a blocking coroutine and reply with error in case sending failed
// this should be global conf (probably returnign a different handler)
// so we don't need to read the configured values before we perform it
}

ldpp_dout(s, 10) << "object deletion done!: " << obj << dendl;

return op_ret;
},
this, del_op
);

each_tod->start();
q_tods.push(each_tod);
}

/* set the return code to zero, errors at this point will be
dumped to the response */
op_ret = 0;

done:
// will likely segfault if begin_response() has not been called
end_response();
return;
wrap_up:
while (!q_tods.empty()) {
ThreadObjDelete* each_tod = q_tods.front();

error:
send_status();
return;
each_tod->wait_done();

RGWRados::Object::Delete* each_obj_delete = std::get<1>(each_tod->get_param());
int each_op_ret = each_tod->get_result();

rgw_obj obj = each_obj_delete->target->get_obj();

// The send_partial_response() is placed outside threads because it can't be performed simultaneously.
send_partial_response(obj.key, each_obj_delete->result.delete_marker, each_obj_delete->result.version_id, each_op_ret);

delete each_obj_delete->target;
delete each_obj_delete;

delete each_tod;
q_tods.pop();
}

(op_ret == 0) ? end_response() : send_status();

return;
}

bool RGWBulkDelete::Deleter::verify_permission(RGWBucketInfo& binfo,
Expand Down

0 comments on commit 78f0bc8

Please sign in to comment.