Skip to content

Commit

Permalink
rgw/rgw_op.cc: apply ThreadLambda to multi object deletion process
Browse files Browse the repository at this point in the history
  • Loading branch information
bgy217 committed Feb 20, 2024
1 parent 0567663 commit 075f424
Showing 1 changed file with 76 additions and 40 deletions.
116 changes: 76 additions & 40 deletions src/rgw/rgw_op.cc
Original file line number Diff line number Diff line change
Expand Up @@ -6556,28 +6556,32 @@ void RGWDeleteMultiObj::execute()
vector<rgw_obj_key>::iterator iter;
RGWMultiDelXMLParser parser;
RGWObjectCtx *obj_ctx = static_cast<RGWObjectCtx *>(s->obj_ctx);

using ThreadObjDelete = ThreadLambda<int, RGWDeleteMultiObj*, RGWRados::Object::Delete*>;
queue<ThreadObjDelete *> q_tods;

char* buf;

buf = data.c_str();
if (!buf) {
op_ret = -EINVAL;
goto error;
goto wrap_up;
}

if (!parser.init()) {
op_ret = -EINVAL;
goto error;
goto wrap_up;
}

if (!parser.parse(buf, data.length(), 1)) {
op_ret = -EINVAL;
goto error;
goto wrap_up;
}

multi_delete = static_cast<RGWMultiDelDelete *>(parser.find_first("Delete"));
if (!multi_delete) {
op_ret = -EINVAL;
goto error;
goto wrap_up;
} else {
#define DELETE_MULTI_OBJ_MAX_NUM 1000
int max_num = s->cct->_conf->rgw_delete_multi_obj_max_num;
Expand All @@ -6587,7 +6591,7 @@ void RGWDeleteMultiObj::execute()
int multi_delete_object_num = multi_delete->objects.size();
if (multi_delete_object_num > max_num) {
op_ret = -ERR_MALFORMED_XML;
goto error;
goto wrap_up;
}
}

Expand All @@ -6605,13 +6609,13 @@ void RGWDeleteMultiObj::execute()
if (has_versioned && !s->mfa_verified) {
ldpp_dout(this, 5) << "NOTICE: multi-object delete request with a versioned object, mfa auth not provided" << dendl;
op_ret = -ERR_MFA_REQUIRED;
goto error;
goto wrap_up;
}
}

begin_response();
if (multi_delete->objects.empty()) {
goto done;
goto wrap_up;
}

for (iter = multi_delete->objects.begin();
Expand Down Expand Up @@ -6640,9 +6644,9 @@ void RGWDeleteMultiObj::execute()
ARN(obj));
}
if ((e == Effect::Deny) ||
(usr_policy_res == Effect::Pass && e == Effect::Pass && !acl_allowed)) {
send_partial_response(*iter, false, "", -EACCES);
continue;
(usr_policy_res == Effect::Pass && e == Effect::Pass && !acl_allowed)) {
send_partial_response(*iter, false, "", -EACCES);
continue;
}
}

Expand Down Expand Up @@ -6673,49 +6677,81 @@ void RGWDeleteMultiObj::execute()

obj_ctx->set_atomic(obj);

RGWRados::Object del_target(store->getRados(), s->bucket_info, *obj_ctx, obj);
RGWRados::Object::Delete del_op(&del_target);
RGWRados::Object* del_target = new RGWRados::Object(store->getRados(), s->bucket_info, *obj_ctx, obj);
RGWRados::Object::Delete* del_op = new RGWRados::Object::Delete(del_target);

del_op.params.bucket_owner = s->bucket_owner.get_id();
del_op.params.versioning_status = s->bucket_info.versioning_status();
del_op.params.obj_owner = s->owner;
ldpp_dout(this, 20) << "NOTICE: make ThreadObjDelete" << dendl;
ThreadObjDelete* each_tod = new ThreadObjDelete(
[](RGWDeleteMultiObj* self, RGWRados::Object::Delete* del_op) -> int
{
rgw::sal::RGWRadosStore* store = self->store;
req_state* s = self->s;

op_ret = del_op.delete_obj(s->yield);
if (op_ret == -ENOENT) {
op_ret = 0;
}
rgw_obj obj = del_op->target->get_obj();

send_partial_response(*iter, del_op.result.delete_marker,
del_op.result.version_id, op_ret);
del_op->params.bucket_owner = s->bucket_owner.get_id();
del_op->params.versioning_status = s->bucket_info.versioning_status();
del_op->params.obj_owner = s->owner;

const auto obj_state = obj_ctx->get_state(obj);
bufferlist etag_bl;
const auto etag = obj_state->get_attr(RGW_ATTR_ETAG, etag_bl) ? etag_bl.to_str() : "";
int op_ret = del_op->delete_obj(null_yield);
if (op_ret == -ENOENT) {
op_ret = 0;
}

const auto ret = rgw::notify::publish(s, obj.key, obj_state->size, obj_state->mtime, etag,
del_op.result.delete_marker && s->object.instance.empty() ? rgw::notify::ObjectRemovedDeleteMarkerCreated : rgw::notify::ObjectRemovedDelete,
store);
if (ret < 0) {
ldpp_dout(this, 5) << "WARNING: publishing notification failed, with error: " << ret << dendl;
// TODO: we should have conf to make send a blocking coroutine and reply with error in case sending failed
// this should be global conf (probably returnign a different handler)
// so we don't need to read the configured values before we perform it
}
const auto obj_state = s->obj_ctx->get_state(obj);

bufferlist etag_bl;
const auto etag = obj_state->get_attr(RGW_ATTR_ETAG, etag_bl) ? etag_bl.to_str() : "";

const auto ret = rgw::notify::publish(s, obj.key, obj_state->size, obj_state->mtime, etag,
del_op->result.delete_marker && s->object.instance.empty() ? rgw::notify::ObjectRemovedDeleteMarkerCreated : rgw::notify::ObjectRemovedDelete,
store);

if (ret < 0) {
ldpp_dout(s, 5) << "WARNING: publishing notification failed, with error: " << ret << dendl;
// TODO: we should have conf to make send a blocking coroutine and reply with error in case sending failed
// this should be global conf (probably returnign a different handler)
// so we don't need to read the configured values before we perform it
}

ldpp_dout(s, 10) << "object deletion done!: " << obj << dendl;

return op_ret;
},
this, del_op
);

each_tod->start();
q_tods.push(each_tod);
}

/* set the return code to zero, errors at this point will be
dumped to the response */
op_ret = 0;

done:
// will likely segfault if begin_response() has not been called
end_response();
return;
wrap_up:
while (!q_tods.empty()) {
ThreadObjDelete* each_tod = q_tods.front();

error:
send_status();
return;
each_tod->wait_done();

RGWRados::Object::Delete* each_obj_delete = std::get<1>(each_tod->get_param());
int each_op_ret = each_tod->get_result();

rgw_obj obj = each_obj_delete->target->get_obj();

send_partial_response(obj.key, each_obj_delete->result.delete_marker, each_obj_delete->result.version_id, each_op_ret);

delete each_obj_delete->target;
delete each_obj_delete;

delete each_tod;
q_tods.pop();
}

(op_ret == 0) ? end_response() : send_status();

return;
}

bool RGWBulkDelete::Deleter::verify_permission(RGWBucketInfo& binfo,
Expand Down

0 comments on commit 075f424

Please sign in to comment.