From 7b783876960d39de1b87d55135c4207325c4ce69 Mon Sep 17 00:00:00 2001 From: Samuel Just Date: Mon, 14 Oct 2024 19:42:59 -0700 Subject: [PATCH 01/21] crimson: remove watchers upon object deletion Fixes: https://tracker.ceph.com/issues/68538 Signed-off-by: Samuel Just --- src/crimson/osd/ops_executer.cc | 36 ++++++++++++++++++++++++--------- 1 file changed, 26 insertions(+), 10 deletions(-) diff --git a/src/crimson/osd/ops_executer.cc b/src/crimson/osd/ops_executer.cc index df4f73d4077d1..4464466eff0d7 100644 --- a/src/crimson/osd/ops_executer.cc +++ b/src/crimson/osd/ops_executer.cc @@ -678,16 +678,32 @@ OpsExecuter::do_execute_op(OSDOp& osd_op) whiteout = true; } return do_write_op([this, whiteout](auto& backend, auto& os, auto& txn) { - int num_bytes = 0; - // Calculate num_bytes to be removed - if (obc->obs.oi.soid.is_snap()) { - ceph_assert(obc->ssc->snapset.clone_overlap.count(obc->obs.oi.soid.snap)); - num_bytes = obc->ssc->snapset.get_clone_bytes(obc->obs.oi.soid.snap); - } else { - num_bytes = obc->obs.oi.size; - } - return backend.remove(os, txn, *osd_op_params, - delta_stats, whiteout, num_bytes); + struct emptyctx_t {}; + return with_effect_on_obc( + emptyctx_t{}, + [&](auto &ctx) { + int num_bytes = 0; + // Calculate num_bytes to be removed + if (obc->obs.oi.soid.is_snap()) { + ceph_assert(obc->ssc->snapset.clone_overlap.count( + obc->obs.oi.soid.snap)); + num_bytes = obc->ssc->snapset.get_clone_bytes( + obc->obs.oi.soid.snap); + } else { + num_bytes = obc->obs.oi.size; + } + return backend.remove(os, txn, *osd_op_params, + delta_stats, whiteout, num_bytes); + }, + [](auto &&ctx, ObjectContextRef obc, Ref) { + return seastar::do_for_each( + obc->watchers, + [](auto &p) { return p.second->remove(); } + ).then([obc] { + obc->watchers.clear(); + return seastar::now(); + }); + }); }); } case CEPH_OSD_OP_CALL: From 1f99108d197f1c579838107d4b57be806b6807e1 Mon Sep 17 00:00:00 2001 From: Samuel Just Date: Mon, 14 Oct 2024 18:46:37 -0700 Subject: [PATCH 02/21] 
crimson: add missing field to SUBLOGDPPI and LOGDPPI SUBLOGDPPI and LOGDPPI need an extra {} for the interrupt_cond. Signed-off-by: Samuel Just --- src/crimson/common/log.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/crimson/common/log.h b/src/crimson/common/log.h index 4f564ac044d05..c38b225c94b4f 100644 --- a/src/crimson/common/log.h +++ b/src/crimson/common/log.h @@ -90,7 +90,7 @@ static inline seastar::log_level to_log_level(int level) { #define SUBLOGDPP(subname_, level_, MSG, dpp, ...) \ LOGGER(subname_).log(level_, "{} {}: " MSG, dpp, FNAME , ##__VA_ARGS__) #define SUBLOGDPPI(subname_, level_, MSG, dpp, ...) \ - LOGGER(subname_).log(level_, "{} {}: " MSG, \ + LOGGER(subname_).log(level_, "{} {} {}: " MSG, \ interruptor::get_interrupt_cond(), dpp, FNAME , ##__VA_ARGS__) #define SUBTRACEDPP(subname_, ...) SUBLOGDPP(subname_, seastar::log_level::trace, __VA_ARGS__) #define SUBTRACEDPPI(subname_, ...) SUBLOGDPPI(subname_, seastar::log_level::trace, __VA_ARGS__) @@ -106,7 +106,7 @@ static inline seastar::log_level to_log_level(int level) { #define LOGDPP(level_, MSG, dpp, ...) \ LOCAL_LOGGER.log(level_, "{} {}: " MSG, dpp, FNAME , ##__VA_ARGS__) #define LOGDPPI(level_, MSG, dpp, ...) \ - LOCAL_LOGGER.log(level_, "{} {}: " MSG, \ + LOCAL_LOGGER.log(level_, "{} {} {}: " MSG, \ interruptor::get_interrupt_cond(), dpp, FNAME , ##__VA_ARGS__) #define TRACEDPP(...) LOGDPP(seastar::log_level::trace, __VA_ARGS__) #define TRACEDPPI(...) 
LOGDPPI(seastar::log_level::trace, __VA_ARGS__) From 4bea366e5de5b110086c8174eaf39798448ff77f Mon Sep 17 00:00:00 2001 From: Samuel Just Date: Tue, 27 Aug 2024 19:08:10 +0000 Subject: [PATCH 03/21] crimson: fix typo OpsExecutor->OpsExecuter Signed-off-by: Samuel Just --- src/crimson/osd/ops_executer.cc | 8 ++++---- src/crimson/osd/ops_executer.h | 4 ++-- src/crimson/osd/pg_backend.cc | 2 +- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/src/crimson/osd/ops_executer.cc b/src/crimson/osd/ops_executer.cc index 4464466eff0d7..0a07fa7ee293e 100644 --- a/src/crimson/osd/ops_executer.cc +++ b/src/crimson/osd/ops_executer.cc @@ -504,7 +504,7 @@ OpsExecuter::list_snaps_iertr::future<> OpsExecuter::do_list_snaps( auto p = ss.clone_snaps.find(clone); if (p == ss.clone_snaps.end()) { logger().error( - "OpsExecutor::do_list_snaps: {} has inconsistent " + "OpsExecuter::do_list_snaps: {} has inconsistent " "clone_snaps, missing clone {}", os.oi.soid, clone); @@ -518,7 +518,7 @@ OpsExecuter::list_snaps_iertr::future<> OpsExecuter::do_list_snaps( auto p = ss.clone_overlap.find(clone); if (p == ss.clone_overlap.end()) { logger().error( - "OpsExecutor::do_list_snaps: {} has inconsistent " + "OpsExecuter::do_list_snaps: {} has inconsistent " "clone_overlap, missing clone {}", os.oi.soid, clone); @@ -532,7 +532,7 @@ OpsExecuter::list_snaps_iertr::future<> OpsExecuter::do_list_snaps( auto p = ss.clone_size.find(clone); if (p == ss.clone_size.end()) { logger().error( - "OpsExecutor::do_list_snaps: {} has inconsistent " + "OpsExecuter::do_list_snaps: {} has inconsistent " "clone_size, missing clone {}", os.oi.soid, clone); @@ -551,7 +551,7 @@ OpsExecuter::list_snaps_iertr::future<> OpsExecuter::do_list_snaps( } resp.seq = ss.seq; logger().error( - "OpsExecutor::do_list_snaps: {}, resp.clones.size(): {}", + "OpsExecuter::do_list_snaps: {}, resp.clones.size(): {}", os.oi.soid, resp.clones.size()); resp.encode(osd_op.outdata); diff --git a/src/crimson/osd/ops_executer.h 
b/src/crimson/osd/ops_executer.h index 0dea7d0515e93..e25a035616edd 100644 --- a/src/crimson/osd/ops_executer.h +++ b/src/crimson/osd/ops_executer.h @@ -213,10 +213,10 @@ class OpsExecuter : public seastar::enable_lw_shared_from_this { * execute_clone * * If snapc contains a snap which occurred logically after the last write - * seen by this object (see OpsExecutor::should_clone()), we first need + * seen by this object (see OpsExecuter::should_clone()), we first need * make a clone of the object at its current state. execute_clone primes * txn with that clone operation and returns an - * OpsExecutor::CloningContext which will allow us to fill in the corresponding + * OpsExecuter::CloningContext which will allow us to fill in the corresponding * metadata and log_entries once the operations have been processed. * * Note that this strategy differs from classic, which instead performs this diff --git a/src/crimson/osd/pg_backend.cc b/src/crimson/osd/pg_backend.cc index fa8201b61c28d..24a381b4cf7e2 100644 --- a/src/crimson/osd/pg_backend.cc +++ b/src/crimson/osd/pg_backend.cc @@ -1289,7 +1289,7 @@ void PGBackend::clone( const ObjectState& d_os, ceph::os::Transaction& txn) { - // See OpsExecutor::execute_clone documentation + // See OpsExecuter::execute_clone documentation txn.clone(coll->get_cid(), ghobject_t{os.oi.soid}, ghobject_t{d_os.oi.soid}); { ceph::bufferlist bv; From a7812e095c13debcd844883db5888bdf5a185170 Mon Sep 17 00:00:00 2001 From: Samuel Just Date: Tue, 10 Sep 2024 23:52:32 +0000 Subject: [PATCH 04/21] crimson/.../internal_client_request: remove unnecessary system_shutdown guard Signed-off-by: Samuel Just --- .../osd_operations/internal_client_request.cc | 150 +++++++++--------- 1 file changed, 74 insertions(+), 76 deletions(-) diff --git a/src/crimson/osd/osd_operations/internal_client_request.cc b/src/crimson/osd/osd_operations/internal_client_request.cc index a19bb0826f004..b1224f6e25942 100644 --- 
a/src/crimson/osd/osd_operations/internal_client_request.cc +++ b/src/crimson/osd/osd_operations/internal_client_request.cc @@ -53,88 +53,86 @@ CommonPGPipeline& InternalClientRequest::client_pp() seastar::future<> InternalClientRequest::start() { track_event(); - return crimson::common::handle_system_shutdown([this] { - LOG_PREFIX(InternalClientRequest::start); - DEBUGI("{}: in repeat", *this); - - return interruptor::with_interruption([this]() mutable { - return enter_stage( - client_pp().wait_for_active - ).then_interruptible([this] { - return with_blocking_event([this] (auto&& trigger) { - return pg->wait_for_active_blocker.wait(std::move(trigger)); - }); - }).then_interruptible([this] { - return enter_stage( - client_pp().recover_missing); - }).then_interruptible([this] { - return do_recover_missing(pg, get_target_oid(), osd_reqid_t()); - }).then_interruptible([this](bool unfound) { - if (unfound) { - throw std::system_error( - std::make_error_code(std::errc::operation_canceled), - fmt::format("{} is unfound, drop it!", get_target_oid())); - } - return enter_stage( - client_pp().get_obc); - }).then_interruptible([this] () -> PG::load_obc_iertr::future<> { - LOG_PREFIX(InternalClientRequest::start); - DEBUGI("{}: getting obc lock", *this); - return seastar::do_with(create_osd_ops(), - [this](auto& osd_ops) mutable { - LOG_PREFIX(InternalClientRequest::start); - DEBUGI("InternalClientRequest: got {} OSDOps to execute", - std::size(osd_ops)); + LOG_PREFIX(InternalClientRequest::start); + DEBUGI("{}: in repeat", *this); + + return interruptor::with_interruption([this]() mutable { + return enter_stage( + client_pp().wait_for_active + ).then_interruptible([this] { + return with_blocking_event([this] (auto&& trigger) { + return pg->wait_for_active_blocker.wait(std::move(trigger)); + }); + }).then_interruptible([this] { + return enter_stage( + client_pp().recover_missing); + }).then_interruptible([this] { + return do_recover_missing(pg, get_target_oid(), 
osd_reqid_t()); + }).then_interruptible([this](bool unfound) { + if (unfound) { + throw std::system_error( + std::make_error_code(std::errc::operation_canceled), + fmt::format("{} is unfound, drop it!", get_target_oid())); + } + return enter_stage( + client_pp().get_obc); + }).then_interruptible([this] () -> PG::load_obc_iertr::future<> { + LOG_PREFIX(InternalClientRequest::start); + DEBUGI("{}: getting obc lock", *this); + return seastar::do_with( + create_osd_ops(), + [this](auto& osd_ops) mutable { + LOG_PREFIX(InternalClientRequest::start); + DEBUGI("InternalClientRequest: got {} OSDOps to execute", + std::size(osd_ops)); [[maybe_unused]] const int ret = op_info.set_from_op( std::as_const(osd_ops), pg->get_pgid().pgid, *pg->get_osdmap()); assert(ret == 0); // call with_locked_obc() in order, but wait concurrently for loading. enter_stage_sync(client_pp().lock_obc); - return pg->with_locked_obc(get_target_oid(), op_info, - [&osd_ops, this](auto, auto obc) { - return enter_stage(client_pp().process - ).then_interruptible( - [obc=std::move(obc), &osd_ops, this] { - return pg->do_osd_ops( - std::move(obc), - osd_ops, - std::as_const(op_info), - get_do_osd_ops_params() - ).safe_then_unpack_interruptible( - [](auto submitted, auto all_completed) { - return all_completed.handle_error_interruptible( - crimson::ct_error::eagain::handle([] { - return seastar::now(); - })); - }, crimson::ct_error::eagain::handle([] { - return interruptor::now(); - }) - ); - }); - }); + return pg->with_locked_obc( + get_target_oid(), op_info, + [&osd_ops, this](auto, auto obc) { + return enter_stage(client_pp().process + ).then_interruptible( + [obc=std::move(obc), &osd_ops, this] { + return pg->do_osd_ops( + std::move(obc), + osd_ops, + std::as_const(op_info), + get_do_osd_ops_params() + ).safe_then_unpack_interruptible( + [](auto submitted, auto all_completed) { + return all_completed.handle_error_interruptible( + crimson::ct_error::eagain::handle([] { + return seastar::now(); + })); + 
}, crimson::ct_error::eagain::handle([] { + return interruptor::now(); + }) + ); + }); + }); }); - }).si_then([this] { - logger().debug("{}: complete", *this); - return handle.complete(); - }).handle_error_interruptible( - PG::load_obc_ertr::all_same_way([] { - return seastar::now(); - }) - ); - }, [](std::exception_ptr eptr) { - return seastar::now(); - }, pg, start_epoch - - ).then([this] { - track_event(); - }).handle_exception_type([](std::system_error &error) { - logger().debug("error {}, message: {}", error.code(), error.what()); - return seastar::now(); - }).finally([this] { - logger().debug("{}: exit", *this); - handle.exit(); - }); + }).si_then([this] { + logger().debug("{}: complete", *this); + return handle.complete(); + }).handle_error_interruptible( + PG::load_obc_ertr::all_same_way([] { + return seastar::now(); + }) + ); + }, [](std::exception_ptr eptr) { + return seastar::now(); + }, pg, start_epoch).then([this] { + track_event(); + }).handle_exception_type([](std::system_error &error) { + logger().debug("error {}, message: {}", error.code(), error.what()); + return seastar::now(); + }).finally([this] { + logger().debug("{}: exit", *this); + handle.exit(); }); } From a091414c67ba9f1407c3756dd75ca2aa3b1074ac Mon Sep 17 00:00:00 2001 From: Samuel Just Date: Wed, 11 Sep 2024 01:31:57 +0000 Subject: [PATCH 05/21] crimson/.../internal_client_request: factor out with_interruption Signed-off-by: Samuel Just --- .../osd_operations/internal_client_request.cc | 138 +++++++++--------- .../osd_operations/internal_client_request.h | 2 + 2 files changed, 74 insertions(+), 66 deletions(-) diff --git a/src/crimson/osd/osd_operations/internal_client_request.cc b/src/crimson/osd/osd_operations/internal_client_request.cc index b1224f6e25942..d4213928a3e46 100644 --- a/src/crimson/osd/osd_operations/internal_client_request.cc +++ b/src/crimson/osd/osd_operations/internal_client_request.cc @@ -50,6 +50,77 @@ CommonPGPipeline& InternalClientRequest::client_pp() return 
pg->request_pg_pipeline; } +InternalClientRequest::interruptible_future<> +InternalClientRequest::with_interruption() +{ + return enter_stage( + client_pp().wait_for_active + ).then_interruptible([this] { + return with_blocking_event([this] (auto&& trigger) { + return pg->wait_for_active_blocker.wait(std::move(trigger)); + }); + }).then_interruptible([this] { + return enter_stage( + client_pp().recover_missing); + }).then_interruptible([this] { + return do_recover_missing(pg, get_target_oid(), osd_reqid_t()); + }).then_interruptible([this](bool unfound) { + if (unfound) { + throw std::system_error( + std::make_error_code(std::errc::operation_canceled), + fmt::format("{} is unfound, drop it!", get_target_oid())); + } + return enter_stage( + client_pp().get_obc); + }).then_interruptible([this] () -> PG::load_obc_iertr::future<> { + LOG_PREFIX(InternalClientRequest::with_interruption); + DEBUGI("{}: getting obc lock", *this); + return seastar::do_with( + create_osd_ops(), + [this](auto& osd_ops) mutable { + LOG_PREFIX(InternalClientRequest::with_interruption); + DEBUGI("InternalClientRequest: got {} OSDOps to execute", + std::size(osd_ops)); + [[maybe_unused]] const int ret = op_info.set_from_op( + std::as_const(osd_ops), pg->get_pgid().pgid, *pg->get_osdmap()); + assert(ret == 0); + // call with_locked_obc() in order, but wait concurrently for loading. 
+ enter_stage_sync(client_pp().lock_obc); + return pg->with_locked_obc( + get_target_oid(), op_info, + [&osd_ops, this](auto, auto obc) { + return enter_stage(client_pp().process + ).then_interruptible( + [obc=std::move(obc), &osd_ops, this] { + return pg->do_osd_ops( + std::move(obc), + osd_ops, + std::as_const(op_info), + get_do_osd_ops_params() + ).safe_then_unpack_interruptible( + [](auto submitted, auto all_completed) { + return all_completed.handle_error_interruptible( + crimson::ct_error::eagain::handle([] { + return seastar::now(); + })); + }, crimson::ct_error::eagain::handle([] { + return interruptor::now(); + }) + ); + }); + }); + }); + }).si_then([this] { + logger().debug("{}: complete", *this); + return handle.complete(); + }).handle_error_interruptible( + PG::load_obc_ertr::all_same_way([] { + return seastar::now(); + }) + ); +} + seastar::future<> InternalClientRequest::start() { track_event(); @@ -57,72 +128,7 @@ seastar::future<> InternalClientRequest::start() DEBUGI("{}: in repeat", *this); return interruptor::with_interruption([this]() mutable { - return enter_stage( - client_pp().wait_for_active - ).then_interruptible([this] { - return with_blocking_event([this] (auto&& trigger) { - return pg->wait_for_active_blocker.wait(std::move(trigger)); - }); - }).then_interruptible([this] { - return enter_stage( - client_pp().recover_missing); - }).then_interruptible([this] { - return do_recover_missing(pg, get_target_oid(), osd_reqid_t()); - }).then_interruptible([this](bool unfound) { - if (unfound) { - throw std::system_error( - std::make_error_code(std::errc::operation_canceled), - fmt::format("{} is unfound, drop it!", get_target_oid())); - } - return enter_stage( - client_pp().get_obc); - }).then_interruptible([this] () -> PG::load_obc_iertr::future<> { - LOG_PREFIX(InternalClientRequest::start); - DEBUGI("{}: getting obc lock", *this); - return seastar::do_with( - create_osd_ops(), - [this](auto& osd_ops) mutable { - 
LOG_PREFIX(InternalClientRequest::start); - DEBUGI("InternalClientRequest: got {} OSDOps to execute", - std::size(osd_ops)); - [[maybe_unused]] const int ret = op_info.set_from_op( - std::as_const(osd_ops), pg->get_pgid().pgid, *pg->get_osdmap()); - assert(ret == 0); - // call with_locked_obc() in order, but wait concurrently for loading. - enter_stage_sync(client_pp().lock_obc); - return pg->with_locked_obc( - get_target_oid(), op_info, - [&osd_ops, this](auto, auto obc) { - return enter_stage(client_pp().process - ).then_interruptible( - [obc=std::move(obc), &osd_ops, this] { - return pg->do_osd_ops( - std::move(obc), - osd_ops, - std::as_const(op_info), - get_do_osd_ops_params() - ).safe_then_unpack_interruptible( - [](auto submitted, auto all_completed) { - return all_completed.handle_error_interruptible( - crimson::ct_error::eagain::handle([] { - return seastar::now(); - })); - }, crimson::ct_error::eagain::handle([] { - return interruptor::now(); - }) - ); - }); - }); - }); - }).si_then([this] { - logger().debug("{}: complete", *this); - return handle.complete(); - }).handle_error_interruptible( - PG::load_obc_ertr::all_same_way([] { - return seastar::now(); - }) - ); + return with_interruption(); }, [](std::exception_ptr eptr) { return seastar::now(); }, pg, start_epoch).then([this] { diff --git a/src/crimson/osd/osd_operations/internal_client_request.h b/src/crimson/osd/osd_operations/internal_client_request.h index f198e58464338..2f3585013344d 100644 --- a/src/crimson/osd/osd_operations/internal_client_request.h +++ b/src/crimson/osd/osd_operations/internal_client_request.h @@ -41,6 +41,8 @@ class InternalClientRequest : public PhasedOperationT, CommonPGPipeline& client_pp(); + InternalClientRequest::interruptible_future<> with_interruption(); + seastar::future<> do_process(); Ref pg; From 238f3e573d48a082f49713cfa310110190ee521d Mon Sep 17 00:00:00 2001 From: Samuel Just Date: Wed, 11 Sep 2024 21:16:51 +0000 Subject: [PATCH 06/21] 
crimson/.../internal_client_request: convert with_interruption to coroutine Signed-off-by: Samuel Just --- .../osd_operations/internal_client_request.cc | 123 +++++++++--------- 1 file changed, 61 insertions(+), 62 deletions(-) diff --git a/src/crimson/osd/osd_operations/internal_client_request.cc b/src/crimson/osd/osd_operations/internal_client_request.cc index d4213928a3e46..d0aa0822f8030 100644 --- a/src/crimson/osd/osd_operations/internal_client_request.cc +++ b/src/crimson/osd/osd_operations/internal_client_request.cc @@ -53,72 +53,71 @@ CommonPGPipeline& InternalClientRequest::client_pp() InternalClientRequest::interruptible_future<> InternalClientRequest::with_interruption() { - return enter_stage( + LOG_PREFIX(InternalClientRequest::with_interruption); + co_await enter_stage( client_pp().wait_for_active - ).then_interruptible([this] { - return with_blocking_event([this] (auto&& trigger) { - return pg->wait_for_active_blocker.wait(std::move(trigger)); - }); - }).then_interruptible([this] { - return enter_stage( - client_pp().recover_missing); - }).then_interruptible([this] { - return do_recover_missing(pg, get_target_oid(), osd_reqid_t()); - }).then_interruptible([this](bool unfound) { - if (unfound) { - throw std::system_error( - std::make_error_code(std::errc::operation_canceled), - fmt::format("{} is unfound, drop it!", get_target_oid())); - } - return enter_stage( - client_pp().get_obc); - }).then_interruptible([this] () -> PG::load_obc_iertr::future<> { - LOG_PREFIX(InternalClientRequest::with_interruption); - DEBUGI("{}: getting obc lock", *this); - return seastar::do_with( - create_osd_ops(), - [this](auto& osd_ops) mutable { - LOG_PREFIX(InternalClientRequest::with_interruption); - DEBUGI("InternalClientRequest: got {} OSDOps to execute", - std::size(osd_ops)); - [[maybe_unused]] const int ret = op_info.set_from_op( - std::as_const(osd_ops), pg->get_pgid().pgid, *pg->get_osdmap()); - assert(ret == 0); - // call with_locked_obc() in order, but wait 
concurrently for loading. - enter_stage_sync(client_pp().lock_obc); - return pg->with_locked_obc( - get_target_oid(), op_info, - [&osd_ops, this](auto, auto obc) { - return enter_stage(client_pp().process - ).then_interruptible( - [obc=std::move(obc), &osd_ops, this] { - return pg->do_osd_ops( - std::move(obc), - osd_ops, - std::as_const(op_info), - get_do_osd_ops_params() - ).safe_then_unpack_interruptible( - [](auto submitted, auto all_completed) { - return all_completed.handle_error_interruptible( - crimson::ct_error::eagain::handle([] { - return seastar::now(); - })); - }, crimson::ct_error::eagain::handle([] { - return interruptor::now(); - }) - ); - }); - }); + ); + + co_await with_blocking_event([this] (auto&& trigger) { + return pg->wait_for_active_blocker.wait(std::move(trigger)); + }); + + co_await enter_stage(client_pp().recover_missing); + + bool unfound = co_await do_recover_missing( + pg, get_target_oid(), osd_reqid_t()); + + if (unfound) { + throw std::system_error( + std::make_error_code(std::errc::operation_canceled), + fmt::format("{} is unfound, drop it!", get_target_oid())); + } + co_await enter_stage( + client_pp().get_obc); + + DEBUGI("{}: getting obc lock", *this); + + auto osd_ops = create_osd_ops(); + + DEBUGI("InternalClientRequest: got {} OSDOps to execute", + std::size(osd_ops)); + [[maybe_unused]] const int ret = op_info.set_from_op( + std::as_const(osd_ops), pg->get_pgid().pgid, *pg->get_osdmap()); + assert(ret == 0); + // call with_locked_obc() in order, but wait concurrently for loading. 
+ enter_stage_sync(client_pp().lock_obc); + + auto fut = pg->with_locked_obc( + get_target_oid(), op_info, + [&osd_ops, this](auto, auto obc) { + return enter_stage(client_pp().process + ).then_interruptible( + [obc=std::move(obc), &osd_ops, this] { + return pg->do_osd_ops( + std::move(obc), + osd_ops, + std::as_const(op_info), + get_do_osd_ops_params() + ).safe_then_unpack_interruptible( + [](auto submitted, auto all_completed) { + return all_completed.handle_error_interruptible( + crimson::ct_error::eagain::handle([] { + return seastar::now(); + })); + }, crimson::ct_error::eagain::handle([] { + return interruptor::now(); + }) + ); }); - }).si_then([this] { - logger().debug("{}: complete", *this); - return handle.complete(); }).handle_error_interruptible( - PG::load_obc_ertr::all_same_way([] { - return seastar::now(); - }) + crimson::ct_error::assert_all("unexpected error") ); + co_await std::move(fut); + + logger().debug("{}: complete", *this); + co_await interruptor::make_interruptible(handle.complete()); + co_return; } seastar::future<> InternalClientRequest::start() From 96c771383ae0458de68517f1e1f1757e27367d0d Mon Sep 17 00:00:00 2001 From: Samuel Just Date: Fri, 13 Sep 2024 23:55:43 +0000 Subject: [PATCH 07/21] crimson: eliminate get_obc stage f90af12d introduced check_already_complete_get_obc to replace get_obc, but left get_obc and didn't update the other users. 
Signed-off-by: Samuel Just --- src/crimson/osd/osd_operation_external_tracking.h | 11 ----------- src/crimson/osd/osd_operations/client_request.h | 1 - src/crimson/osd/osd_operations/common/pg_pipeline.h | 3 --- .../osd/osd_operations/internal_client_request.cc | 2 +- .../osd/osd_operations/internal_client_request.h | 2 +- src/crimson/osd/osd_operations/snaptrim_event.cc | 2 +- src/crimson/osd/osd_operations/snaptrim_event.h | 2 +- 7 files changed, 4 insertions(+), 19 deletions(-) diff --git a/src/crimson/osd/osd_operation_external_tracking.h b/src/crimson/osd/osd_operation_external_tracking.h index 530732ba71028..d2786a95e4d3c 100644 --- a/src/crimson/osd/osd_operation_external_tracking.h +++ b/src/crimson/osd/osd_operation_external_tracking.h @@ -36,7 +36,6 @@ struct LttngBackend ClientRequest::PGPipeline::RecoverMissing:: BlockingEvent::ExitBarrierEvent::Backend, ClientRequest::PGPipeline::CheckAlreadyCompleteGetObc::BlockingEvent::Backend, - ClientRequest::PGPipeline::GetOBC::BlockingEvent::Backend, ClientRequest::PGPipeline::LockOBC::BlockingEvent::Backend, ClientRequest::PGPipeline::LockOBC::BlockingEvent::ExitBarrierEvent::Backend, ClientRequest::PGPipeline::Process::BlockingEvent::Backend, @@ -117,10 +116,6 @@ struct LttngBackend const ClientRequest::PGPipeline::CheckAlreadyCompleteGetObc& blocker) override { } - void handle(ClientRequest::PGPipeline::GetOBC::BlockingEvent& ev, - const Operation& op, - const ClientRequest::PGPipeline::GetOBC& blocker) override { - } void handle(ClientRequest::PGPipeline::LockOBC::BlockingEvent& ev, const Operation& op, @@ -171,7 +166,6 @@ struct HistoricBackend ClientRequest::PGPipeline::RecoverMissing:: BlockingEvent::ExitBarrierEvent::Backend, ClientRequest::PGPipeline::CheckAlreadyCompleteGetObc::BlockingEvent::Backend, - ClientRequest::PGPipeline::GetOBC::BlockingEvent::Backend, ClientRequest::PGPipeline::LockOBC::BlockingEvent::Backend, ClientRequest::PGPipeline::LockOBC::BlockingEvent::ExitBarrierEvent::Backend, 
ClientRequest::PGPipeline::Process::BlockingEvent::Backend, @@ -252,11 +246,6 @@ struct HistoricBackend const ClientRequest::PGPipeline::CheckAlreadyCompleteGetObc& blocker) override { } - void handle(ClientRequest::PGPipeline::GetOBC::BlockingEvent& ev, - const Operation& op, - const ClientRequest::PGPipeline::GetOBC& blocker) override { - } - void handle(ClientRequest::PGPipeline::LockOBC::BlockingEvent& ev, const Operation& op, const ClientRequest::PGPipeline::LockOBC& blocker) override { diff --git a/src/crimson/osd/osd_operations/client_request.h b/src/crimson/osd/osd_operations/client_request.h index ea7aade22ac75..f14e76504fcd6 100644 --- a/src/crimson/osd/osd_operations/client_request.h +++ b/src/crimson/osd/osd_operations/client_request.h @@ -104,7 +104,6 @@ class ClientRequest final : public PhasedOperationT, PGPipeline::RecoverMissing::BlockingEvent, scrub::PGScrubber::BlockingEvent, PGPipeline::CheckAlreadyCompleteGetObc::BlockingEvent, - PGPipeline::GetOBC::BlockingEvent, PGPipeline::LockOBC::BlockingEvent, PGPipeline::Process::BlockingEvent, PGPipeline::WaitRepop::BlockingEvent, diff --git a/src/crimson/osd/osd_operations/common/pg_pipeline.h b/src/crimson/osd/osd_operations/common/pg_pipeline.h index 2b2d03ae4b3ed..0146cb247945f 100644 --- a/src/crimson/osd/osd_operations/common/pg_pipeline.h +++ b/src/crimson/osd/osd_operations/common/pg_pipeline.h @@ -23,9 +23,6 @@ class CommonPGPipeline { struct CheckAlreadyCompleteGetObc : OrderedExclusivePhaseT { static constexpr auto type_name = "CommonPGPipeline::check_already_complete_get_obc"; } check_already_complete_get_obc; - struct GetOBC : OrderedExclusivePhaseT { - static constexpr auto type_name = "CommonPGPipeline::get_obc"; - } get_obc; struct LockOBC : OrderedConcurrentPhaseT { static constexpr auto type_name = "CommonPGPipeline::lock_obc"; } lock_obc; diff --git a/src/crimson/osd/osd_operations/internal_client_request.cc b/src/crimson/osd/osd_operations/internal_client_request.cc index 
d0aa0822f8030..2bfa4296b2829 100644 --- a/src/crimson/osd/osd_operations/internal_client_request.cc +++ b/src/crimson/osd/osd_operations/internal_client_request.cc @@ -74,7 +74,7 @@ InternalClientRequest::with_interruption() fmt::format("{} is unfound, drop it!", get_target_oid())); } co_await enter_stage( - client_pp().get_obc); + client_pp().check_already_complete_get_obc); DEBUGI("{}: getting obc lock", *this); diff --git a/src/crimson/osd/osd_operations/internal_client_request.h b/src/crimson/osd/osd_operations/internal_client_request.h index 2f3585013344d..6e31ee993b9cb 100644 --- a/src/crimson/osd/osd_operations/internal_client_request.h +++ b/src/crimson/osd/osd_operations/internal_client_request.h @@ -58,7 +58,7 @@ class InternalClientRequest : public PhasedOperationT, CommonPGPipeline::WaitForActive::BlockingEvent, PGActivationBlocker::BlockingEvent, CommonPGPipeline::RecoverMissing::BlockingEvent, - CommonPGPipeline::GetOBC::BlockingEvent, + CommonPGPipeline::CheckAlreadyCompleteGetObc::BlockingEvent, CommonPGPipeline::LockOBC::BlockingEvent, CommonPGPipeline::Process::BlockingEvent, CompletionEvent diff --git a/src/crimson/osd/osd_operations/snaptrim_event.cc b/src/crimson/osd/osd_operations/snaptrim_event.cc index 7512b3d108dfc..9ed0b73cfb458 100644 --- a/src/crimson/osd/osd_operations/snaptrim_event.cc +++ b/src/crimson/osd/osd_operations/snaptrim_event.cc @@ -396,7 +396,7 @@ SnapTrimObjSubEvent::start() }); co_await enter_stage( - client_pp().get_obc); + client_pp().check_already_complete_get_obc); logger().debug("{}: getting obc for {}", *this, coid); // end of commonality diff --git a/src/crimson/osd/osd_operations/snaptrim_event.h b/src/crimson/osd/osd_operations/snaptrim_event.h index 06d8f43c2f3c9..cdd82cdbf3086 100644 --- a/src/crimson/osd/osd_operations/snaptrim_event.h +++ b/src/crimson/osd/osd_operations/snaptrim_event.h @@ -170,7 +170,7 @@ class SnapTrimObjSubEvent : public PhasedOperationT { std::tuple< StartEvent, - 
CommonPGPipeline::GetOBC::BlockingEvent, + CommonPGPipeline::CheckAlreadyCompleteGetObc::BlockingEvent, CommonPGPipeline::Process::BlockingEvent, CommonPGPipeline::WaitRepop::BlockingEvent, CompletionEvent From 7da7c3d736cebed2233ed836f53219b8dfe85047 Mon Sep 17 00:00:00 2001 From: Samuel Just Date: Mon, 16 Sep 2024 22:16:37 +0000 Subject: [PATCH 08/21] crimson/osd: move pipelines to osd_operation.h Each of the two existing pipelines are shared across multiple ops. Rather than defining them in a specific op or in osd_operations/common/pg_pipeline.h, just declare them in osd_operation.h. Signed-off-by: Samuel Just --- src/crimson/osd/osd_operation.h | 31 ++++++++++++++++ .../osd/osd_operations/client_request.h | 1 - .../osd/osd_operations/common/pg_pipeline.h | 37 ------------------- .../osd_operations/internal_client_request.h | 1 - .../osd/osd_operations/peering_event.h | 9 ----- .../osd/osd_operations/snaptrim_event.h | 1 - 6 files changed, 31 insertions(+), 49 deletions(-) delete mode 100644 src/crimson/osd/osd_operations/common/pg_pipeline.h diff --git a/src/crimson/osd/osd_operation.h b/src/crimson/osd/osd_operation.h index fb0432edb8f9a..fd8b049c0bf08 100644 --- a/src/crimson/osd/osd_operation.h +++ b/src/crimson/osd/osd_operation.h @@ -40,6 +40,37 @@ struct PerShardPipeline { } create_or_wait_pg; }; +struct PGPeeringPipeline { + struct AwaitMap : OrderedExclusivePhaseT { + static constexpr auto type_name = "PeeringEvent::PGPipeline::await_map"; + } await_map; + struct Process : OrderedExclusivePhaseT { + static constexpr auto type_name = "PeeringEvent::PGPipeline::process"; + } process; +}; + +struct CommonPGPipeline { + struct WaitForActive : OrderedExclusivePhaseT { + static constexpr auto type_name = "CommonPGPipeline:::wait_for_active"; + } wait_for_active; + struct RecoverMissing : OrderedConcurrentPhaseT { + static constexpr auto type_name = "CommonPGPipeline::recover_missing"; + } recover_missing; + struct CheckAlreadyCompleteGetObc : 
OrderedExclusivePhaseT { + static constexpr auto type_name = "CommonPGPipeline::check_already_complete_get_obc"; + } check_already_complete_get_obc; + struct LockOBC : OrderedConcurrentPhaseT { + static constexpr auto type_name = "CommonPGPipeline::lock_obc"; + } lock_obc; + struct Process : OrderedExclusivePhaseT { + static constexpr auto type_name = "CommonPGPipeline::process"; + } process; + struct WaitRepop : OrderedConcurrentPhaseT { + static constexpr auto type_name = "ClientRequest::PGPipeline::wait_repop"; + } wait_repop; +}; + + enum class OperationTypeCode { client_request = 0, peering_event, diff --git a/src/crimson/osd/osd_operations/client_request.h b/src/crimson/osd/osd_operations/client_request.h index f14e76504fcd6..331cedaadfff2 100644 --- a/src/crimson/osd/osd_operations/client_request.h +++ b/src/crimson/osd/osd_operations/client_request.h @@ -14,7 +14,6 @@ #include "crimson/osd/osdmap_gate.h" #include "crimson/osd/osd_operation.h" #include "crimson/osd/osd_operations/client_request_common.h" -#include "crimson/osd/osd_operations/common/pg_pipeline.h" #include "crimson/osd/pg_activation_blocker.h" #include "crimson/osd/pg_map.h" #include "crimson/osd/scrub/pg_scrubber.h" diff --git a/src/crimson/osd/osd_operations/common/pg_pipeline.h b/src/crimson/osd/osd_operations/common/pg_pipeline.h deleted file mode 100644 index 0146cb247945f..0000000000000 --- a/src/crimson/osd/osd_operations/common/pg_pipeline.h +++ /dev/null @@ -1,37 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab - -#pragma once - -#include "osd/osd_op_util.h" -#include "crimson/osd/osd_operation.h" - -namespace crimson::osd { - -class CommonPGPipeline { -protected: - friend class InternalClientRequest; - friend class SnapTrimEvent; - friend class SnapTrimObjSubEvent; - - struct WaitForActive : OrderedExclusivePhaseT { - static constexpr auto type_name = "CommonPGPipeline:::wait_for_active"; - } wait_for_active; - struct 
RecoverMissing : OrderedConcurrentPhaseT { - static constexpr auto type_name = "CommonPGPipeline::recover_missing"; - } recover_missing; - struct CheckAlreadyCompleteGetObc : OrderedExclusivePhaseT { - static constexpr auto type_name = "CommonPGPipeline::check_already_complete_get_obc"; - } check_already_complete_get_obc; - struct LockOBC : OrderedConcurrentPhaseT { - static constexpr auto type_name = "CommonPGPipeline::lock_obc"; - } lock_obc; - struct Process : OrderedExclusivePhaseT { - static constexpr auto type_name = "CommonPGPipeline::process"; - } process; - struct WaitRepop : OrderedConcurrentPhaseT { - static constexpr auto type_name = "ClientRequest::PGPipeline::wait_repop"; - } wait_repop; -}; - -} // namespace crimson::osd diff --git a/src/crimson/osd/osd_operations/internal_client_request.h b/src/crimson/osd/osd_operations/internal_client_request.h index 6e31ee993b9cb..782fb809042a6 100644 --- a/src/crimson/osd/osd_operations/internal_client_request.h +++ b/src/crimson/osd/osd_operations/internal_client_request.h @@ -6,7 +6,6 @@ #include "crimson/common/type_helpers.h" #include "crimson/osd/osd_operation.h" #include "crimson/osd/osd_operations/client_request_common.h" -#include "crimson/osd/osd_operations/common/pg_pipeline.h" #include "crimson/osd/pg.h" #include "crimson/osd/pg_activation_blocker.h" diff --git a/src/crimson/osd/osd_operations/peering_event.h b/src/crimson/osd/osd_operations/peering_event.h index 1e6bd957289ff..85de5c711d67c 100644 --- a/src/crimson/osd/osd_operations/peering_event.h +++ b/src/crimson/osd/osd_operations/peering_event.h @@ -23,15 +23,6 @@ class ShardServices; class PG; class BackfillRecovery; - struct PGPeeringPipeline { - struct AwaitMap : OrderedExclusivePhaseT { - static constexpr auto type_name = "PeeringEvent::PGPipeline::await_map"; - } await_map; - struct Process : OrderedExclusivePhaseT { - static constexpr auto type_name = "PeeringEvent::PGPipeline::process"; - } process; - }; - template class PeeringEvent : 
public PhasedOperationT { T* that() { diff --git a/src/crimson/osd/osd_operations/snaptrim_event.h b/src/crimson/osd/osd_operations/snaptrim_event.h index cdd82cdbf3086..1164b3169d293 100644 --- a/src/crimson/osd/osd_operations/snaptrim_event.h +++ b/src/crimson/osd/osd_operations/snaptrim_event.h @@ -9,7 +9,6 @@ #include "crimson/osd/osdmap_gate.h" #include "crimson/osd/osd_operation.h" #include "crimson/common/subop_blocker.h" -#include "crimson/osd/osd_operations/common/pg_pipeline.h" #include "crimson/osd/pg.h" #include "crimson/osd/pg_activation_blocker.h" #include "osd/osd_types.h" From 0a83d956e546d7d04c55de34a788234533ed5293 Mon Sep 17 00:00:00 2001 From: Samuel Just Date: Thu, 19 Sep 2024 00:59:21 +0000 Subject: [PATCH 09/21] crimson: remove the eagain error from PG::do_osd_ops The idea here is that PG::do_osd_ops propagates an eagain after starting a repair upon encountering an eio to indicate that the op should restart from the top of ClientRequest::process_op. However, InternalClientRequest's handler for this error simply ignores it. ClientRequest's handling, while superficially reasonable, doesn't actually work. Re-calling process_op would mean reentering previous stages. This is problematic for at least a few reasons: 1. Reentering a prior stage with the same handler doesn't actually work since the corresponding event entries will already be populated. 2. There might be other ops on the same object waiting on the process stage. They'd need to be sent back as well in order to preserve ordering. Because this mechanism doesn't really seem to be fully baked, let's remove it for now and try to reintroduce it later after do_osd_ops[_execute] are a bit simpler. 
Signed-off-by: Samuel Just --- .../osd/osd_operations/client_request.cc | 23 +++++++++++++------ .../osd/osd_operations/client_request.h | 7 +----- .../osd_operations/internal_client_request.cc | 2 ++ 3 files changed, 19 insertions(+), 13 deletions(-) diff --git a/src/crimson/osd/osd_operations/client_request.cc b/src/crimson/osd/osd_operations/client_request.cc index 8e9a7c4d7490c..6eed04df6a5ac 100644 --- a/src/crimson/osd/osd_operations/client_request.cc +++ b/src/crimson/osd/osd_operations/client_request.cc @@ -403,11 +403,6 @@ ClientRequest::process_op( *pg, *this, this_instance_id); return do_process( ihref, pg, obc, this_instance_id - ).handle_error_interruptible( - crimson::ct_error::eagain::handle( - [this, pg, this_instance_id, &ihref]() mutable { - return process_op(ihref, pg, this_instance_id); - }) ); } ); @@ -437,7 +432,7 @@ ClientRequest::process_op( co_await std::move(process); } -ClientRequest::do_process_iertr::future<> +ClientRequest::interruptible_future<> ClientRequest::do_process( instance_handle_t &ihref, Ref pg, crimson::osd::ObjectContextRef obc, @@ -509,12 +504,26 @@ ClientRequest::do_process( auto [submitted, all_completed] = co_await pg->do_osd_ops( m, r_conn, obc, op_info, snapc + ).handle_error_interruptible( + crimson::ct_error::eagain::handle([] { + ceph_assert(0 == "not handled"); + return std::make_tuple( + interruptor::now(), + PG::do_osd_ops_iertr::make_ready_future>()); + }) ); co_await std::move(submitted); co_await ihref.enter_stage(client_pp(*pg).wait_repop, *this); - auto reply = co_await std::move(all_completed); + auto reply = co_await std::move( + all_completed + ).handle_error_interruptible( + crimson::ct_error::eagain::handle([] { + ceph_assert(0 == "not handled"); + return MURef(); + }) + ); co_await ihref.enter_stage(client_pp(*pg).send_reply, *this); DEBUGDPP("{}.{}: sending response", diff --git a/src/crimson/osd/osd_operations/client_request.h b/src/crimson/osd/osd_operations/client_request.h index 
331cedaadfff2..6ee57e9874cd1 100644 --- a/src/crimson/osd/osd_operations/client_request.h +++ b/src/crimson/osd/osd_operations/client_request.h @@ -274,12 +274,7 @@ class ClientRequest final : public PhasedOperationT, interruptible_future<> with_sequencer(FuncT&& func); interruptible_future<> reply_op_error(const Ref& pg, int err); - - using do_process_iertr = - ::crimson::interruptible::interruptible_errorator< - ::crimson::osd::IOInterruptCondition, - ::crimson::errorator>; - do_process_iertr::future<> do_process( + interruptible_future<> do_process( instance_handle_t &ihref, Ref pg, crimson::osd::ObjectContextRef obc, diff --git a/src/crimson/osd/osd_operations/internal_client_request.cc b/src/crimson/osd/osd_operations/internal_client_request.cc index 2bfa4296b2829..dabff1a33bdb6 100644 --- a/src/crimson/osd/osd_operations/internal_client_request.cc +++ b/src/crimson/osd/osd_operations/internal_client_request.cc @@ -103,9 +103,11 @@ InternalClientRequest::with_interruption() [](auto submitted, auto all_completed) { return all_completed.handle_error_interruptible( crimson::ct_error::eagain::handle([] { + ceph_assert(0 == "not handled"); return seastar::now(); })); }, crimson::ct_error::eagain::handle([] { + ceph_assert(0 == "not handled"); return interruptor::now(); }) ); From a43452f47ee6f2eb7e2496ee242848acba8f8472 Mon Sep 17 00:00:00 2001 From: Samuel Just Date: Thu, 19 Sep 2024 23:58:48 +0000 Subject: [PATCH 10/21] crimson: OpsExecuter::flush_clone_metadata no longer needs to return a future Snapmapper updates happen during log commit now. 
Signed-off-by: Samuel Just --- src/crimson/osd/ops_executer.cc | 10 ++-------- src/crimson/osd/ops_executer.h | 4 ++-- 2 files changed, 4 insertions(+), 10 deletions(-) diff --git a/src/crimson/osd/ops_executer.cc b/src/crimson/osd/ops_executer.cc index 0a07fa7ee293e..9bf60140374c8 100644 --- a/src/crimson/osd/ops_executer.cc +++ b/src/crimson/osd/ops_executer.cc @@ -973,7 +973,7 @@ void OpsExecuter::CloningContext::apply_to( processed_obc.ssc->snapset = std::move(new_snapset); } -OpsExecuter::interruptible_future> +std::vector OpsExecuter::flush_clone_metadata( std::vector&& log_entries, SnapMapper& snap_mapper, @@ -981,7 +981,6 @@ OpsExecuter::flush_clone_metadata( ceph::os::Transaction& txn) { assert(!txn.empty()); - auto maybe_snap_mapped = interruptor::now(); update_clone_overlap(); if (cloning_ctx) { std::move(*cloning_ctx).apply_to(log_entries, *obc); @@ -993,12 +992,7 @@ OpsExecuter::flush_clone_metadata( } logger().debug("{} done, initial snapset={}, new snapset={}", __func__, obc->obs.oi.soid, obc->ssc->snapset); - return std::move( - maybe_snap_mapped - ).then_interruptible([log_entries=std::move(log_entries)]() mutable { - return interruptor::make_ready_future>( - std::move(log_entries)); - }); + return std::move(log_entries); } ObjectContextRef OpsExecuter::prepare_clone( diff --git a/src/crimson/osd/ops_executer.h b/src/crimson/osd/ops_executer.h index e25a035616edd..0b61f80b9983b 100644 --- a/src/crimson/osd/ops_executer.h +++ b/src/crimson/osd/ops_executer.h @@ -267,7 +267,7 @@ class OpsExecuter : public seastar::enable_lw_shared_from_this { */ void update_clone_overlap(); - interruptible_future> flush_clone_metadata( + std::vector flush_clone_metadata( std::vector&& log_entries, SnapMapper& snap_mapper, OSDriver& osdriver, @@ -510,7 +510,7 @@ OpsExecuter::flush_changes_n_do_ops_effects( } if (want_mutate) { - auto log_entries = co_await flush_clone_metadata( + auto log_entries = flush_clone_metadata( prepare_transaction(ops), snap_mapper, osdriver, 
From 24b7b4f4b5d53927d5cc6689fd0ca1ec2276a5f3 Mon Sep 17 00:00:00 2001 From: Samuel Just Date: Fri, 20 Sep 2024 02:23:47 +0000 Subject: [PATCH 11/21] crimson: futures from flush_changes_n_do_ops_effects must not fail The return signature previously suggested that the second future returned could be an error. This seemed necessary due to how effects are handled: template OpsExecuter::rep_op_fut_t OpsExecuter::flush_changes_n_do_ops_effects( const std::vector& ops, SnapMapper& snap_mapper, OSDriver& osdriver, MutFunc mut_func) && { ... all_completed = std::move(all_completed).then_interruptible([this, pg=this->pg] { // let's do the cleaning of `op_effects` in destructor return interruptor::do_for_each(op_effects, [pg=std::move(pg)](auto& op_effect) { return op_effect->execute(pg); }); However, all of the actual execute implementations (created via OpsExecuter::with_effect_on_obc) return a bare seastar::future and cannot fail. In a larger sense, it's actually critical that neither future returned from flush_changes_n_do_ops_effects may fail -- they represent applying the transaction locally and remotely. If either portion fails, there would need to be an interval change to recover. Signed-off-by: Samuel Just --- src/crimson/osd/ops_executer.h | 11 ++++--- src/crimson/osd/pg.cc | 53 +++++++++++++++++++++++++--------- 2 files changed, 45 insertions(+), 19 deletions(-) diff --git a/src/crimson/osd/ops_executer.h b/src/crimson/osd/ops_executer.h index 0b61f80b9983b..185ead24e7550 100644 --- a/src/crimson/osd/ops_executer.h +++ b/src/crimson/osd/ops_executer.h @@ -179,7 +179,7 @@ class OpsExecuter : public seastar::enable_lw_shared_from_this { // should be used. struct effect_t { // an effect can affect PG, i.e. 
create a watch timeout - virtual osd_op_errorator::future<> execute(Ref pg) = 0; + virtual seastar::future<> execute(Ref pg) = 0; virtual ~effect_t() = default; }; @@ -400,7 +400,7 @@ class OpsExecuter : public seastar::enable_lw_shared_from_this { execute_op(OSDOp& osd_op); using rep_op_fut_tuple = - std::tuple, osd_op_ierrorator::future<>>; + std::tuple, interruptible_future<>>; using rep_op_fut_t = interruptible_future; template @@ -475,7 +475,7 @@ auto OpsExecuter::with_effect_on_obc( effect_func(std::move(effect_func)), obc(std::move(obc)) { } - osd_op_errorator::future<> execute(Ref pg) final { + seastar::future<> execute(Ref pg) final { return std::move(effect_func)(std::move(ctx), std::move(obc), std::move(pg)); @@ -502,8 +502,7 @@ OpsExecuter::flush_changes_n_do_ops_effects( assert(obc); auto submitted = interruptor::now(); - auto all_completed = - interruptor::make_interruptible(osd_op_errorator::now()); + auto all_completed = interruptor::now(); if (cloning_ctx) { ceph_assert(want_mutate); @@ -536,7 +535,7 @@ OpsExecuter::flush_changes_n_do_ops_effects( // need extra ref pg due to apply_stats() which can be executed after // informing snap mapper all_completed = - std::move(all_completed).safe_then_interruptible([this, pg=this->pg] { + std::move(all_completed).then_interruptible([this, pg=this->pg] { // let's do the cleaning of `op_effects` in destructor return interruptor::do_for_each(op_effects, [pg=std::move(pg)](auto& op_effect) { diff --git a/src/crimson/osd/pg.cc b/src/crimson/osd/pg.cc index 97d48c1fa454c..8ab4e4e899b8e 100644 --- a/src/crimson/osd/pg.cc +++ b/src/crimson/osd/pg.cc @@ -999,6 +999,28 @@ PG::do_osd_ops_execute( ceph_osd_op_name(osd_op.op.op)); return ox->execute_op(osd_op); }).safe_then_interruptible([this, ox, &ops] { + /* flush_changes_n_do_ops_effects now returns + * + * interruptible_future< + * tuple, interruptible_future<>>> + * + * Previously, this lambda relied on the second element of that tuple to + * include 
OpsExecutor::osd_op_errorator in order to propogate the + * following three errors to the next callback. This is actually quite + * awkward as the second future is the completion future, which really + * cannot fail (for it to do so would require an interval change to + * correct). + * + * Rather than reworking this now, I'll leave it as is and refactor it + * later. + */ + using complete_iertr = crimson::interruptible::interruptible_errorator< + ::crimson::osd::IOInterruptCondition, + OpsExecuter::osd_op_errorator>; + using ret_t = std::tuple< + interruptible_future<>, + complete_iertr::future<>>; + logger().debug( "do_osd_ops_execute: object {} all operations successful", ox->get_target()); @@ -1014,22 +1036,22 @@ PG::do_osd_ops_execute( // they tried, they failed. logger().info(" full, replying to FULL_TRY op"); if (get_pgpool().info.has_flag(pg_pool_t::FLAG_FULL_QUOTA)) - return interruptor::make_ready_future( - seastar::now(), - OpsExecuter::osd_op_ierrorator::future<>( - crimson::ct_error::edquot::make())); + return interruptor::make_ready_future( + interruptor::now(), + complete_iertr::future<>( + crimson::ct_error::edquot::make())); else - return interruptor::make_ready_future( - seastar::now(), - OpsExecuter::osd_op_ierrorator::future<>( - crimson::ct_error::enospc::make())); + return interruptor::make_ready_future( + interruptor::now(), + complete_iertr::future<>( + crimson::ct_error::enospc::make())); } else { // drop request logger().info(" full, dropping request (bad client)"); - return interruptor::make_ready_future( - seastar::now(), - OpsExecuter::osd_op_ierrorator::future<>( - crimson::ct_error::eagain::make())); + return interruptor::make_ready_future( + interruptor::now(), + complete_iertr::future<>( + crimson::ct_error::eagain::make())); } } return std::move(*ox).flush_changes_n_do_ops_effects( @@ -1049,7 +1071,12 @@ PG::do_osd_ops_execute( std::move(txn), std::move(osd_op_p), std::move(log_entries)); - }); + }).then_interruptible([](auto 
&&futs) { + auto &&[submitted, completed] = std::move(futs); + return interruptor::make_ready_future( + std::move(submitted), + std::move(completed)); + }); }).safe_then_unpack_interruptible( [success_func=std::move(success_func), rollbacker, this, failure_func_ptr, obc] (auto submitted_fut, auto _all_completed_fut) mutable { From 5e28a3bd3b58353ff29cf1cd1b9627575158c290 Mon Sep 17 00:00:00 2001 From: Samuel Just Date: Fri, 20 Sep 2024 12:56:17 -0700 Subject: [PATCH 12/21] crimson: introduce rollback_obc_if_modified without an error argument Signed-off-by: Samuel Just --- src/crimson/osd/ops_executer.h | 28 ++++++++++++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/src/crimson/osd/ops_executer.h b/src/crimson/osd/ops_executer.h index 185ead24e7550..6986f49ea08a7 100644 --- a/src/crimson/osd/ops_executer.h +++ b/src/crimson/osd/ops_executer.h @@ -551,6 +551,7 @@ OpsExecuter::flush_changes_n_do_ops_effects( template struct OpsExecuter::RollbackHelper { + void rollback_obc_if_modified(); void rollback_obc_if_modified(const std::error_code& e); seastar::lw_shared_ptr ox; Func func; @@ -562,6 +563,33 @@ OpsExecuter::create_rollbacker(Func&& func) { return {shared_from_this(), std::forward(func)}; } +template +void OpsExecuter::RollbackHelper::rollback_obc_if_modified() +{ + // Oops, an operation had failed. do_osd_ops() altogether with + // OpsExecuter already dropped the ObjectStore::Transaction if + // there was any. However, this is not enough to completely + // rollback as we gave OpsExecuter the very single copy of `obc` + // we maintain and we did it for both reading and writing. + // Now all modifications must be reverted. + // + // The conditional's purpose is to efficiently handle hot errors + // which may appear as a result of e.g. CEPH_OSD_OP_CMPXATTR or + // CEPH_OSD_OP_OMAP_CMP. These are read-like ops and clients + // typically append them before any write. 
If OpsExecuter hasn't + // seen any modifying operation, `obc` is supposed to be kept + // unchanged. + assert(ox); + const auto need_rollback = ox->has_seen_write(); + crimson::get_logger(ceph_subsys_osd).debug( + "{}: object {} got error, need_rollback={}", + __func__, + ox->obc->get_oid(), + need_rollback); + if (need_rollback) { + func(ox->obc); + } +} template void OpsExecuter::RollbackHelper::rollback_obc_if_modified( From 7a826eb86c423e895345557632091a934f7c7d7e Mon Sep 17 00:00:00 2001 From: Samuel Just Date: Thu, 19 Sep 2024 19:39:08 -0700 Subject: [PATCH 13/21] crimson: PG::submit_error_log returns eversion_t rather than optional It seems like the motivation here was to allow do_osd_ops_execute to communicate that it didn't submit an error log by making maybe_submit_error_log a std::optional. However, submit_error_log itself always returns a version. Fix submit_error_log and compensate in do_osd_ops_execute. Signed-off-by: Samuel Just --- src/crimson/osd/pg.cc | 10 +++++++--- src/crimson/osd/pg.h | 2 +- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/src/crimson/osd/pg.cc b/src/crimson/osd/pg.cc index 8ab4e4e899b8e..26d1fa883bbce 100644 --- a/src/crimson/osd/pg.cc +++ b/src/crimson/osd/pg.cc @@ -1122,7 +1122,11 @@ PG::do_osd_ops_execute( if constexpr (!std::is_same_v) { if(op_info.may_write()) { maybe_submit_error_log = - submit_error_log(m, op_info, obc, e, rep_tid); + submit_error_log( + m, op_info, obc, e, rep_tid + ).then_interruptible([](auto &&e) { + return std::make_optional(std::move(e)); + }); } } return maybe_submit_error_log.then_interruptible( @@ -1175,7 +1179,7 @@ PG::interruptible_future<> PG::complete_error_log(const ceph_tid_t& rep_tid, return result; } -PG::interruptible_future> PG::submit_error_log( +PG::interruptible_future PG::submit_error_log( Ref m, const OpInfo &op_info, ObjectContextRef obc, @@ -1241,7 +1245,7 @@ PG::interruptible_future> PG::submit_error_log( get_collection_ref(), std::move(t) ).then([this] { 
peering_state.update_trim_to(); - return seastar::make_ready_future>(projected_last_update); + return seastar::make_ready_future(projected_last_update); }); }); }); diff --git a/src/crimson/osd/pg.h b/src/crimson/osd/pg.h index d8bbc56abcc46..5bd5c3aeff849 100644 --- a/src/crimson/osd/pg.h +++ b/src/crimson/osd/pg.h @@ -621,7 +621,7 @@ class PG : public boost::intrusive_ref_counter< void dump_primary(Formatter*); interruptible_future<> complete_error_log(const ceph_tid_t& rep_tid, const eversion_t& version); - interruptible_future> submit_error_log( + interruptible_future submit_error_log( Ref m, const OpInfo &op_info, ObjectContextRef obc, From 00057b45f03ae9864a83451b498b4e0239496785 Mon Sep 17 00:00:00 2001 From: Samuel Just Date: Thu, 26 Sep 2024 14:10:06 -0700 Subject: [PATCH 14/21] crimson: introduce PG::run_executer,submit_executer These are intended to replace do_osd_ops*. The implementation is simpler and does not involve passing success and failure callbacks. It also moves responsibility for dealing with the MOSDOpReply and client related error handling over to ClientRequest. do_osd_op* will be removed once users are switched over. 
Signed-off-by: Samuel Just --- src/crimson/osd/pg.cc | 79 +++++++++++++++++++++++++++++++++++++++++++ src/crimson/osd/pg.h | 27 +++++++++++++++ 2 files changed, 106 insertions(+) diff --git a/src/crimson/osd/pg.cc b/src/crimson/osd/pg.cc index 26d1fa883bbce..bb5c1e9000baf 100644 --- a/src/crimson/osd/pg.cc +++ b/src/crimson/osd/pg.cc @@ -13,6 +13,9 @@ #include #include #include + +#include + #include "include/utime_fmt.h" #include "common/hobject.h" @@ -1251,6 +1254,82 @@ PG::interruptible_future PG::submit_error_log( }); } +PG::run_executer_fut PG::run_executer( + seastar::lw_shared_ptr ox, + ObjectContextRef obc, + const OpInfo &op_info, + std::vector& ops) +{ + LOG_PREFIX(PG::run_executer); + auto rollbacker = ox->create_rollbacker( + [stored_obc=duplicate_obc(obc)](auto &obc) mutable { + obc->update_from(*stored_obc); + }); + auto rollback_on_error = seastar::defer([&rollbacker] { + rollbacker.rollback_obc_if_modified(); + }); + + for (auto &op: ops) { + DEBUGDPP("object {} handle op {}", *this, ox->get_target(), op); + co_await ox->execute_op(op); + } + DEBUGDPP("object {} all operations successful", *this, ox->get_target()); + + // check for full + if ((ox->delta_stats.num_bytes > 0 || + ox->delta_stats.num_objects > 0) && + get_pgpool().info.has_flag(pg_pool_t::FLAG_FULL)) { + const auto& m = ox->get_message(); + if (m.get_reqid().name.is_mds() || // FIXME: ignore MDS for now + m.has_flag(CEPH_OSD_FLAG_FULL_FORCE)) { + INFODPP("full, but proceeding due to FULL_FORCE, or MDS", *this); + } else if (m.has_flag(CEPH_OSD_FLAG_FULL_TRY)) { + // they tried, they failed. 
+ INFODPP("full, replying to FULL_TRY op", *this); + if (get_pgpool().info.has_flag(pg_pool_t::FLAG_FULL_QUOTA)) { + co_await run_executer_fut( + crimson::ct_error::edquot::make()); + } else { + co_await run_executer_fut( + crimson::ct_error::enospc::make()); + } + } else { + // drop request + INFODPP("full, dropping request (bad client)", *this); + co_await run_executer_fut( + crimson::ct_error::eagain::make()); + } + } + rollback_on_error.cancel(); +} + +PG::submit_executer_fut PG::submit_executer( + seastar::lw_shared_ptr ox, + const std::vector& ops) +{ + LOG_PREFIX(PG::submit_executer); + // transaction must commit at this point + return std::move( + *ox + ).flush_changes_n_do_ops_effects( + ops, + snap_mapper, + osdriver, + [FNAME, this](auto&& txn, + auto&& obc, + auto&& osd_op_p, + auto&& log_entries) { + DEBUGDPP("object {} submitting txn", *this, obc->get_oid()); + mutate_object(obc, txn, osd_op_p); + return submit_transaction( + std::move(obc), + std::move(txn), + std::move(osd_op_p), + std::move(log_entries)); + }); +} + + PG::do_osd_ops_iertr::future>> PG::do_osd_ops( Ref m, diff --git a/src/crimson/osd/pg.h b/src/crimson/osd/pg.h index 5bd5c3aeff849..c91f93171dbc1 100644 --- a/src/crimson/osd/pg.h +++ b/src/crimson/osd/pg.h @@ -645,6 +645,33 @@ class PG : public boost::intrusive_ref_counter< } } background_process_lock; + using run_executer_ertr = crimson::compound_errorator_t< + OpsExecuter::osd_op_errorator, + crimson::errorator< + crimson::ct_error::edquot, + crimson::ct_error::eagain, + crimson::ct_error::enospc + > + >; + using run_executer_iertr = crimson::interruptible::interruptible_errorator< + ::crimson::osd::IOInterruptCondition, + run_executer_ertr>; + using run_executer_fut = run_executer_iertr::future<>; + run_executer_fut run_executer( + seastar::lw_shared_ptr ox, + ObjectContextRef obc, + const OpInfo &op_info, + std::vector& ops); + + using submit_executer_ret = std::tuple< + interruptible_future<>, + interruptible_future<>>; + using 
submit_executer_fut = interruptible_future< + submit_executer_ret>; + submit_executer_fut submit_executer( + seastar::lw_shared_ptr ox, + const std::vector& ops); + using do_osd_ops_ertr = crimson::errorator< crimson::ct_error::eagain>; using do_osd_ops_iertr = From 304e20e9bcf6f29b0f0f22089665d78099265fec Mon Sep 17 00:00:00 2001 From: Samuel Just Date: Thu, 26 Sep 2024 15:15:48 -0700 Subject: [PATCH 15/21] crimson: switch ClientRequest::do_request to use *_executer rather than do_osd_ops Signed-off-by: Samuel Just --- .../osd/osd_operations/client_request.cc | 141 +++++++++++++++--- 1 file changed, 117 insertions(+), 24 deletions(-) diff --git a/src/crimson/osd/osd_operations/client_request.cc b/src/crimson/osd/osd_operations/client_request.cc index 6eed04df6a5ac..c226222fa0c75 100644 --- a/src/crimson/osd/osd_operations/client_request.cc +++ b/src/crimson/osd/osd_operations/client_request.cc @@ -502,36 +502,129 @@ ClientRequest::do_process( co_return; } - auto [submitted, all_completed] = co_await pg->do_osd_ops( - m, r_conn, obc, op_info, snapc - ).handle_error_interruptible( - crimson::ct_error::eagain::handle([] { - ceph_assert(0 == "not handled"); - return std::make_tuple( - interruptor::now(), - PG::do_osd_ops_iertr::make_ready_future>()); + auto ox = seastar::make_lw_shared( + pg, obc, op_info, *m, r_conn, snapc); + auto ret = co_await pg->run_executer( + ox, obc, op_info, m->ops + ).si_then([]() -> std::optional { + return std::nullopt; + }).handle_error_interruptible(crimson::ct_error::all_same_way( + [](auto e) -> std::optional { + return e; }) ); - co_await std::move(submitted); - co_await ihref.enter_stage(client_pp(*pg).wait_repop, *this); + auto should_log_error = [](std::error_code e) -> bool { + switch (e.value()) { + case EDQUOT: + case ENOSPC: + case EAGAIN: + return false; + default: + return true; + } + }; - auto reply = co_await std::move( - all_completed - ).handle_error_interruptible( - crimson::ct_error::eagain::handle([] { - ceph_assert(0 
== "not handled"); - return MURef(); - }) - ); + if (ret && !should_log_error(*ret)) { + co_await reply_op_error(pg, -ret->value()); + co_return; + } + + { + auto all_completed = interruptor::now(); + if (ret) { + assert(should_log_error(*ret)); + if (op_info.may_write()) { + auto rep_tid = pg->shard_services.get_tid(); + auto version = co_await pg->submit_error_log( + m, op_info, obc, *ret, rep_tid); + + all_completed = pg->complete_error_log( + rep_tid, version); + } + // simply return the error below, leaving all_completed alone + } else { + auto submitted = interruptor::now(); + std::tie(submitted, all_completed) = co_await pg->submit_executer( + std::move(ox), m->ops); + co_await std::move(submitted); + } + co_await ihref.enter_stage(client_pp(*pg).wait_repop, *this); + + co_await std::move(all_completed); + } co_await ihref.enter_stage(client_pp(*pg).send_reply, *this); - DEBUGDPP("{}.{}: sending response", - *pg, *this, this_instance_id); - // TODO: gate the crosscore sending - co_await interruptor::make_interruptible( - get_foreign_connection().send_with_throttling(std::move(reply)) - ); + + if (ret) { + int err = -ret->value(); + DEBUGDPP("{}: replying with error {}", *pg, *this, err); + + auto reply = crimson::make_message( + m.get(), err, pg->get_osdmap_epoch(), 0, false); + + if (!m->ops.empty() && m->ops.back().op.flags & CEPH_OSD_OP_FLAG_FAILOK) { + reply->set_result(0); + } + + // For all ops except for CMPEXT, the correct error value is encoded + // in e. For CMPEXT, osdop.rval has the actual error value. 
+ if (err == -ct_error::cmp_fail_error_value) { + assert(!m->ops.empty()); + for (auto &osdop : m->ops) { + if (osdop.rval < 0) { + reply->set_result(osdop.rval); + break; + } + } + } + + reply->set_enoent_reply_versions( + pg->peering_state.get_info().last_update, + pg->peering_state.get_info().last_user_version); + reply->add_flags(CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK); + + // TODO: gate the crosscore sending + co_await interruptor::make_interruptible( + get_foreign_connection().send_with_throttling(std::move(reply))); + } else { + int result = m->ops.empty() ? 0 : m->ops.back().rval.code; + if (op_info.may_read() && result >= 0) { + for (auto &osdop : m->ops) { + if (osdop.rval < 0 && !(osdop.op.flags & CEPH_OSD_OP_FLAG_FAILOK)) { + result = osdop.rval.code; + break; + } + } + } else if (result > 0 && op_info.may_write() && !op_info.allows_returnvec()) { + result = 0; + } else if (result < 0 && + (m->ops.empty() ? + 0 : m->ops.back().op.flags & CEPH_OSD_OP_FLAG_FAILOK)) { + result = 0; + } + auto reply = crimson::make_message( + m.get(), + result, + pg->get_osdmap_epoch(), + 0, + false); + reply->add_flags(CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK); + if (obc->obs.exists) { + reply->set_reply_versions(pg->peering_state.get_info().last_update, + obc->obs.oi.user_version); + } else { + reply->set_reply_versions(pg->peering_state.get_info().last_update, + pg->peering_state.get_info().last_user_version); + } + + DEBUGDPP("{}.{}: sending response {}", + *pg, *this, this_instance_id, *m); + // TODO: gate the crosscore sending + co_await interruptor::make_interruptible( + get_foreign_connection().send_with_throttling(std::move(reply)) + ); + } } bool ClientRequest::is_misdirected(const PG& pg) const From fc41fcb9d2a7c5b589ea68ad0644ac92d22fe761 Mon Sep 17 00:00:00 2001 From: Samuel Just Date: Thu, 26 Sep 2024 22:30:59 +0000 Subject: [PATCH 16/21] crimson: factor out InternalClientRequest::do_process Signed-off-by: Samuel Just --- 
.../osd_operations/internal_client_request.cc | 44 +++++++++++-------- .../osd_operations/internal_client_request.h | 3 ++ 2 files changed, 29 insertions(+), 18 deletions(-) diff --git a/src/crimson/osd/osd_operations/internal_client_request.cc b/src/crimson/osd/osd_operations/internal_client_request.cc index dabff1a33bdb6..d0ee392ecb638 100644 --- a/src/crimson/osd/osd_operations/internal_client_request.cc +++ b/src/crimson/osd/osd_operations/internal_client_request.cc @@ -50,6 +50,30 @@ CommonPGPipeline& InternalClientRequest::client_pp() return pg->request_pg_pipeline; } +InternalClientRequest::interruptible_future<> +InternalClientRequest::do_process( + crimson::osd::ObjectContextRef obc, + std::vector &osd_ops) +{ + return pg->do_osd_ops( + std::move(obc), + osd_ops, + std::as_const(op_info), + get_do_osd_ops_params() + ).safe_then_unpack_interruptible( + [](auto submitted, auto all_completed) { + return all_completed.handle_error_interruptible( + crimson::ct_error::eagain::handle([] { + ceph_assert(0 == "not handled"); + return seastar::now(); + })); + }, crimson::ct_error::eagain::handle([] { + ceph_assert(0 == "not handled"); + return interruptor::now(); + }) + ); +} + InternalClientRequest::interruptible_future<> InternalClientRequest::with_interruption() { @@ -93,24 +117,8 @@ InternalClientRequest::with_interruption() [&osd_ops, this](auto, auto obc) { return enter_stage(client_pp().process ).then_interruptible( - [obc=std::move(obc), &osd_ops, this] { - return pg->do_osd_ops( - std::move(obc), - osd_ops, - std::as_const(op_info), - get_do_osd_ops_params() - ).safe_then_unpack_interruptible( - [](auto submitted, auto all_completed) { - return all_completed.handle_error_interruptible( - crimson::ct_error::eagain::handle([] { - ceph_assert(0 == "not handled"); - return seastar::now(); - })); - }, crimson::ct_error::eagain::handle([] { - ceph_assert(0 == "not handled"); - return interruptor::now(); - }) - ); + [obc=std::move(obc), &osd_ops, this]() mutable { 
+ return do_process(std::move(obc), osd_ops); }); }).handle_error_interruptible( crimson::ct_error::assert_all("unexpected error") diff --git a/src/crimson/osd/osd_operations/internal_client_request.h b/src/crimson/osd/osd_operations/internal_client_request.h index 782fb809042a6..6023db0a8dbe2 100644 --- a/src/crimson/osd/osd_operations/internal_client_request.h +++ b/src/crimson/osd/osd_operations/internal_client_request.h @@ -41,6 +41,9 @@ class InternalClientRequest : public PhasedOperationT, CommonPGPipeline& client_pp(); InternalClientRequest::interruptible_future<> with_interruption(); + InternalClientRequest::interruptible_future<> do_process( + crimson::osd::ObjectContextRef obc, + std::vector &osd_ops); seastar::future<> do_process(); From c091f3b2ab6a89762e6fcf5ccaa49b65c9ab6fca Mon Sep 17 00:00:00 2001 From: Samuel Just Date: Thu, 26 Sep 2024 22:43:35 +0000 Subject: [PATCH 17/21] crimson: convert InternalClientRequest::do_request to use *_executer rather than do_osd_ops* Signed-off-by: Samuel Just --- .../osd_operations/internal_client_request.cc | 35 ++++++++++--------- 1 file changed, 19 insertions(+), 16 deletions(-) diff --git a/src/crimson/osd/osd_operations/internal_client_request.cc b/src/crimson/osd/osd_operations/internal_client_request.cc index d0ee392ecb638..6ad447cf32ee4 100644 --- a/src/crimson/osd/osd_operations/internal_client_request.cc +++ b/src/crimson/osd/osd_operations/internal_client_request.cc @@ -55,23 +55,26 @@ InternalClientRequest::do_process( crimson::osd::ObjectContextRef obc, std::vector &osd_ops) { - return pg->do_osd_ops( - std::move(obc), - osd_ops, - std::as_const(op_info), - get_do_osd_ops_params() - ).safe_then_unpack_interruptible( - [](auto submitted, auto all_completed) { - return all_completed.handle_error_interruptible( - crimson::ct_error::eagain::handle([] { - ceph_assert(0 == "not handled"); - return seastar::now(); - })); - }, crimson::ct_error::eagain::handle([] { - ceph_assert(0 == "not handled"); - return 
interruptor::now(); - }) + LOG_PREFIX(InternalClientRequest::do_process); + auto params = get_do_osd_ops_params(); + auto ox = seastar::make_lw_shared( + pg, obc, op_info, params, params.get_connection(), SnapContext{}); + co_await pg->run_executer( + ox, obc, op_info, osd_ops + ).handle_error_interruptible( + crimson::ct_error::all_same_way( + [this, FNAME](auto e) { + ERRORDPPI("{}: got unexpected error {}", *pg, *this, e); + ceph_assert(0 == "should not return an error"); + return interruptor::now(); + }) ); + + auto [submitted, completed] = co_await pg->submit_executer( + std::move(ox), osd_ops); + + co_await std::move(submitted); + co_await std::move(completed); } InternalClientRequest::interruptible_future<> From a0efff116cd038b08c0ce31a5c32c4b9df574088 Mon Sep 17 00:00:00 2001 From: Samuel Just Date: Thu, 10 Oct 2024 16:22:28 +0000 Subject: [PATCH 18/21] crimson: clarify ops_executer.h comment Signed-off-by: Samuel Just --- src/crimson/osd/ops_executer.h | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/src/crimson/osd/ops_executer.h b/src/crimson/osd/ops_executer.h index 6986f49ea08a7..3a7aaef7cd036 100644 --- a/src/crimson/osd/ops_executer.h +++ b/src/crimson/osd/ops_executer.h @@ -170,13 +170,9 @@ class OpsExecuter : public seastar::enable_lw_shared_from_this { object_stat_sum_t delta_stats; private: - // an operation can be divided into two stages: main and effect-exposing - // one. The former is performed immediately on call to `do_osd_op()` while - // the later on `submit_changes()` – after successfully processing main - // stages of all involved operations. When any stage fails, none of all - // scheduled effect-exposing stages will be executed. - // when operation requires this division, some variant of `with_effect()` - // should be used. + // with_effect can be used to schedule operations to be performed + // at commit time. effects will be discarded if the operation does + // not commit. 
struct effect_t { // an effect can affect PG, i.e. create a watch timeout virtual seastar::future<> execute(Ref pg) = 0; From 8f3ac965c310d80270e53644c56f3bca30511240 Mon Sep 17 00:00:00 2001 From: Samuel Just Date: Thu, 26 Sep 2024 22:49:59 +0000 Subject: [PATCH 19/21] crimson: remove now unused PG::do_osd_ops* and log_reply Signed-off-by: Samuel Just --- src/crimson/osd/pg.cc | 308 ------------------------------------------ src/crimson/osd/pg.h | 35 +---- 2 files changed, 1 insertion(+), 342 deletions(-) diff --git a/src/crimson/osd/pg.cc b/src/crimson/osd/pg.cc index bb5c1e9000baf..9cdd19d01332f 100644 --- a/src/crimson/osd/pg.cc +++ b/src/crimson/osd/pg.cc @@ -978,181 +978,6 @@ ObjectContextRef duplicate_obc(const ObjectContextRef &obc) { return object_context; } -template -PG::do_osd_ops_iertr::future> -PG::do_osd_ops_execute( - seastar::lw_shared_ptr ox, - ObjectContextRef obc, - const OpInfo &op_info, - Ref m, - std::vector& ops, - SuccessFunc&& success_func, - FailureFunc&& failure_func) -{ - assert(ox); - auto rollbacker = ox->create_rollbacker( - [object_context=duplicate_obc(obc)] (auto& obc) mutable { - obc->update_from(*object_context); - }); - auto failure_func_ptr = seastar::make_lw_shared(std::move(failure_func)); - return interruptor::do_for_each(ops, [ox](OSDOp& osd_op) { - logger().debug( - "do_osd_ops_execute: object {} - handling op {}", - ox->get_target(), - ceph_osd_op_name(osd_op.op.op)); - return ox->execute_op(osd_op); - }).safe_then_interruptible([this, ox, &ops] { - /* flush_changes_n_do_ops_effects now returns - * - * interruptible_future< - * tuple, interruptible_future<>>> - * - * Previously, this lambda relied on the second element of that tuple to - * include OpsExecutor::osd_op_errorator in order to propogate the - * following three errors to the next callback. 
This is actually quite - * awkward as the second future is the completion future, which really - * cannot fail (for it to do so would require an interval change to - * correct). - * - * Rather than reworking this now, I'll leave it as is and refactor it - * later. - */ - using complete_iertr = crimson::interruptible::interruptible_errorator< - ::crimson::osd::IOInterruptCondition, - OpsExecuter::osd_op_errorator>; - using ret_t = std::tuple< - interruptible_future<>, - complete_iertr::future<>>; - - logger().debug( - "do_osd_ops_execute: object {} all operations successful", - ox->get_target()); - // check for full - if ((ox->delta_stats.num_bytes > 0 || - ox->delta_stats.num_objects > 0) && - get_pgpool().info.has_flag(pg_pool_t::FLAG_FULL)) { - const auto& m = ox->get_message(); - if (m.get_reqid().name.is_mds() || // FIXME: ignore MDS for now - m.has_flag(CEPH_OSD_FLAG_FULL_FORCE)) { - logger().info(" full, but proceeding due to FULL_FORCE or MDS"); - } else if (m.has_flag(CEPH_OSD_FLAG_FULL_TRY)) { - // they tried, they failed. 
- logger().info(" full, replying to FULL_TRY op"); - if (get_pgpool().info.has_flag(pg_pool_t::FLAG_FULL_QUOTA)) - return interruptor::make_ready_future( - interruptor::now(), - complete_iertr::future<>( - crimson::ct_error::edquot::make())); - else - return interruptor::make_ready_future( - interruptor::now(), - complete_iertr::future<>( - crimson::ct_error::enospc::make())); - } else { - // drop request - logger().info(" full, dropping request (bad client)"); - return interruptor::make_ready_future( - interruptor::now(), - complete_iertr::future<>( - crimson::ct_error::eagain::make())); - } - } - return std::move(*ox).flush_changes_n_do_ops_effects( - ops, - snap_mapper, - osdriver, - [this] (auto&& txn, - auto&& obc, - auto&& osd_op_p, - auto&& log_entries) { - logger().debug( - "do_osd_ops_execute: object {} submitting txn", - obc->get_oid()); - mutate_object(obc, txn, osd_op_p); - return submit_transaction( - std::move(obc), - std::move(txn), - std::move(osd_op_p), - std::move(log_entries)); - }).then_interruptible([](auto &&futs) { - auto &&[submitted, completed] = std::move(futs); - return interruptor::make_ready_future( - std::move(submitted), - std::move(completed)); - }); - }).safe_then_unpack_interruptible( - [success_func=std::move(success_func), rollbacker, this, failure_func_ptr, obc] - (auto submitted_fut, auto _all_completed_fut) mutable { - - auto all_completed_fut = _all_completed_fut.safe_then_interruptible_tuple( - std::move(success_func), - crimson::ct_error::object_corrupted::handle( - [rollbacker, this, obc] (const std::error_code& e) mutable { - // this is a path for EIO. it's special because we want to fix the obejct - // and try again. that is, the layer above `PG::do_osd_ops` is supposed to - // restart the execution. 
- rollbacker.rollback_obc_if_modified(e); - return repair_object(obc->obs.oi.soid, - obc->obs.oi.version - ).then_interruptible([] { - return do_osd_ops_iertr::future{crimson::ct_error::eagain::make()}; - }); - }), OpsExecuter::osd_op_errorator::all_same_way( - [rollbacker, failure_func_ptr] - (const std::error_code& e) mutable { - // handle non-fatal errors only - ceph_assert(e.value() == EDQUOT || - e.value() == ENOSPC || - e.value() == EAGAIN); - rollbacker.rollback_obc_if_modified(e); - return (*failure_func_ptr)(e); - })); - - return PG::do_osd_ops_iertr::make_ready_future>( - std::move(submitted_fut), - std::move(all_completed_fut) - ); - }, OpsExecuter::osd_op_errorator::all_same_way( - [this, op_info, m, obc, - rollbacker, failure_func_ptr] - (const std::error_code& e) mutable { - ceph_tid_t rep_tid = shard_services.get_tid(); - rollbacker.rollback_obc_if_modified(e); - // record error log - auto maybe_submit_error_log = - interruptor::make_ready_future>(std::nullopt); - // call submit_error_log only for non-internal clients - if constexpr (!std::is_same_v) { - if(op_info.may_write()) { - maybe_submit_error_log = - submit_error_log( - m, op_info, obc, e, rep_tid - ).then_interruptible([](auto &&e) { - return std::make_optional(std::move(e)); - }); - } - } - return maybe_submit_error_log.then_interruptible( - [this, failure_func_ptr, e, rep_tid] (auto version) { - auto all_completed = - [this, failure_func_ptr, e, rep_tid, version] { - if (version.has_value()) { - return complete_error_log(rep_tid, version.value() - ).then_interruptible([failure_func_ptr, e] { - return (*failure_func_ptr)(e); - }); - } else { - return (*failure_func_ptr)(e); - } - }; - return PG::do_osd_ops_iertr::make_ready_future>( - std::move(seastar::now()), - std::move(all_completed()) - ); - }); - })); -} - PG::interruptible_future<> PG::complete_error_log(const ceph_tid_t& rep_tid, const eversion_t& version) { @@ -1329,139 +1154,6 @@ PG::submit_executer_fut PG::submit_executer( }); } 
- -PG::do_osd_ops_iertr::future>> -PG::do_osd_ops( - Ref m, - crimson::net::ConnectionXcoreRef conn, - ObjectContextRef obc, - const OpInfo &op_info, - const SnapContext& snapc) -{ - if (__builtin_expect(stopping, false)) { - throw crimson::common::system_shutdown_exception(); - } - return do_osd_ops_execute>( - seastar::make_lw_shared( - Ref{this}, obc, op_info, *m, conn, snapc), - obc, - op_info, - m, - m->ops, - // success_func - [this, m, obc, may_write = op_info.may_write(), - may_read = op_info.may_read(), rvec = op_info.allows_returnvec()] { - // TODO: should stop at the first op which returns a negative retval, - // cmpext uses it for returning the index of first unmatched byte - int result = m->ops.empty() ? 0 : m->ops.back().rval.code; - if (may_read && result >= 0) { - for (auto &osdop : m->ops) { - if (osdop.rval < 0 && !(osdop.op.flags & CEPH_OSD_OP_FLAG_FAILOK)) { - result = osdop.rval.code; - break; - } - } - } else if (result > 0 && may_write && !rvec) { - result = 0; - } else if (result < 0 && (m->ops.empty() ? 
- 0 : m->ops.back().op.flags & CEPH_OSD_OP_FLAG_FAILOK)) { - result = 0; - } - auto reply = crimson::make_message(m.get(), - result, - get_osdmap_epoch(), - 0, - false); - reply->add_flags(CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK); - logger().debug( - "do_osd_ops: {} - object {} sending reply", - *m, - m->get_hobj()); - if (obc->obs.exists) { - reply->set_reply_versions(peering_state.get_info().last_update, - obc->obs.oi.user_version); - } else { - reply->set_reply_versions(peering_state.get_info().last_update, - peering_state.get_info().last_user_version); - } - return do_osd_ops_iertr::make_ready_future>( - std::move(reply)); - }, - // failure_func - [m, this] - (const std::error_code& e) { - logger().error("do_osd_ops_execute::failure_func {} got error: {}", - *m, e); - return log_reply(m, e); - }); -} - -PG::do_osd_ops_iertr::future> -PG::log_reply( - Ref m, - const std::error_code& e) -{ - auto reply = crimson::make_message( - m.get(), -e.value(), get_osdmap_epoch(), 0, false); - if (m->ops.empty() ? 0 : - m->ops.back().op.flags & CEPH_OSD_OP_FLAG_FAILOK) { - reply->set_result(0); - } - // For all ops except for CMPEXT, the correct error value is encoded - // in e.value(). For CMPEXT, osdop.rval has the actual error value. - if (e.value() == ct_error::cmp_fail_error_value) { - assert(!m->ops.empty()); - for (auto &osdop : m->ops) { - if (osdop.rval < 0) { - reply->set_result(osdop.rval); - break; - } - } - } - reply->set_enoent_reply_versions( - peering_state.get_info().last_update, - peering_state.get_info().last_user_version); - reply->add_flags(CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK); - return do_osd_ops_iertr::make_ready_future>( - std::move(reply)); -} - -PG::do_osd_ops_iertr::future> -PG::do_osd_ops( - ObjectContextRef obc, - std::vector& ops, - const OpInfo &op_info, - const do_osd_ops_params_t &&msg_params) -{ - // This overload is generally used for internal client requests, - // use an empty SnapContext. 
- return seastar::do_with( - std::move(msg_params), - [=, this, &ops, &op_info](auto &msg_params) { - return do_osd_ops_execute( - seastar::make_lw_shared( - Ref{this}, - obc, - op_info, - msg_params, - msg_params.get_connection(), - SnapContext{} - ), - obc, - op_info, - Ref(), - ops, - // success_func - [] { - return do_osd_ops_iertr::now(); - }, - // failure_func - [] (const std::error_code& e) { - return do_osd_ops_iertr::now(); - }); - }); -} - PG::interruptible_future> PG::do_pg_ops(Ref m) { if (__builtin_expect(stopping, false)) { diff --git a/src/crimson/osd/pg.h b/src/crimson/osd/pg.h index c91f93171dbc1..3a8ddad922a50 100644 --- a/src/crimson/osd/pg.h +++ b/src/crimson/osd/pg.h @@ -672,41 +672,8 @@ class PG : public boost::intrusive_ref_counter< seastar::lw_shared_ptr ox, const std::vector& ops); - using do_osd_ops_ertr = crimson::errorator< - crimson::ct_error::eagain>; - using do_osd_ops_iertr = - ::crimson::interruptible::interruptible_errorator< - ::crimson::osd::IOInterruptCondition, - ::crimson::errorator>; - template - using pg_rep_op_fut_t = - std::tuple, - do_osd_ops_iertr::future>; - do_osd_ops_iertr::future>> do_osd_ops( - Ref m, - crimson::net::ConnectionXcoreRef conn, - ObjectContextRef obc, - const OpInfo &op_info, - const SnapContext& snapc); - struct do_osd_ops_params_t; - do_osd_ops_iertr::future> log_reply( - Ref m, - const std::error_code& e); - do_osd_ops_iertr::future> do_osd_ops( - ObjectContextRef obc, - std::vector& ops, - const OpInfo &op_info, - const do_osd_ops_params_t &¶ms); - template - do_osd_ops_iertr::future> do_osd_ops_execute( - seastar::lw_shared_ptr ox, - ObjectContextRef obc, - const OpInfo &op_info, - Ref m, - std::vector& ops, - SuccessFunc&& success_func, - FailureFunc&& failure_func); + interruptible_future> do_pg_ops(Ref m); interruptible_future< std::tuple, interruptible_future<>>> From 7ac64b0b245798b1d4a85b1da86497d2baf2bceb Mon Sep 17 00:00:00 2001 From: Samuel Just Date: Tue, 1 Oct 2024 13:05:03 -0700 
Subject: [PATCH 20/21] crimson: OpsExecuter no longer needs to be a lw shared ptr ClientRequest and InternalClientRequest can declare them as auto variables. Signed-off-by: Samuel Just --- src/crimson/osd/ops_executer.h | 6 ++--- .../osd/osd_operations/client_request.cc | 3 +-- .../osd_operations/internal_client_request.cc | 2 +- src/crimson/osd/pg.cc | 23 +++++++++---------- src/crimson/osd/pg.h | 4 ++-- 5 files changed, 18 insertions(+), 20 deletions(-) diff --git a/src/crimson/osd/ops_executer.h b/src/crimson/osd/ops_executer.h index 3a7aaef7cd036..068f510d1ef82 100644 --- a/src/crimson/osd/ops_executer.h +++ b/src/crimson/osd/ops_executer.h @@ -40,7 +40,7 @@ namespace crimson::osd { class PG; // OpsExecuter -- a class for executing ops targeting a certain object. -class OpsExecuter : public seastar::enable_lw_shared_from_this { +class OpsExecuter { friend class SnapTrimObjSubEvent; using call_errorator = crimson::errorator< @@ -549,14 +549,14 @@ template struct OpsExecuter::RollbackHelper { void rollback_obc_if_modified(); void rollback_obc_if_modified(const std::error_code& e); - seastar::lw_shared_ptr ox; + OpsExecuter *ox; Func func; }; template inline OpsExecuter::RollbackHelper OpsExecuter::create_rollbacker(Func&& func) { - return {shared_from_this(), std::forward(func)}; + return {this, std::forward(func)}; } template diff --git a/src/crimson/osd/osd_operations/client_request.cc b/src/crimson/osd/osd_operations/client_request.cc index c226222fa0c75..a89fb2c84bc56 100644 --- a/src/crimson/osd/osd_operations/client_request.cc +++ b/src/crimson/osd/osd_operations/client_request.cc @@ -502,8 +502,7 @@ ClientRequest::do_process( co_return; } - auto ox = seastar::make_lw_shared( - pg, obc, op_info, *m, r_conn, snapc); + OpsExecuter ox(pg, obc, op_info, *m, r_conn, snapc); auto ret = co_await pg->run_executer( ox, obc, op_info, m->ops ).si_then([]() -> std::optional { diff --git a/src/crimson/osd/osd_operations/internal_client_request.cc 
b/src/crimson/osd/osd_operations/internal_client_request.cc index 6ad447cf32ee4..9e5867caf8067 100644 --- a/src/crimson/osd/osd_operations/internal_client_request.cc +++ b/src/crimson/osd/osd_operations/internal_client_request.cc @@ -57,7 +57,7 @@ InternalClientRequest::do_process( { LOG_PREFIX(InternalClientRequest::do_process); auto params = get_do_osd_ops_params(); - auto ox = seastar::make_lw_shared( + OpsExecuter ox( pg, obc, op_info, params, params.get_connection(), SnapContext{}); co_await pg->run_executer( ox, obc, op_info, osd_ops diff --git a/src/crimson/osd/pg.cc b/src/crimson/osd/pg.cc index 9cdd19d01332f..744a1dbc02b97 100644 --- a/src/crimson/osd/pg.cc +++ b/src/crimson/osd/pg.cc @@ -1080,13 +1080,13 @@ PG::interruptible_future PG::submit_error_log( } PG::run_executer_fut PG::run_executer( - seastar::lw_shared_ptr ox, + OpsExecuter &ox, ObjectContextRef obc, const OpInfo &op_info, std::vector& ops) { LOG_PREFIX(PG::run_executer); - auto rollbacker = ox->create_rollbacker( + auto rollbacker = ox.create_rollbacker( [stored_obc=duplicate_obc(obc)](auto &obc) mutable { obc->update_from(*stored_obc); }); @@ -1095,16 +1095,16 @@ PG::run_executer_fut PG::run_executer( }); for (auto &op: ops) { - DEBUGDPP("object {} handle op {}", *this, ox->get_target(), op); - co_await ox->execute_op(op); + DEBUGDPP("object {} handle op {}", *this, ox.get_target(), op); + co_await ox.execute_op(op); } - DEBUGDPP("object {} all operations successful", *this, ox->get_target()); + DEBUGDPP("object {} all operations successful", *this, ox.get_target()); // check for full - if ((ox->delta_stats.num_bytes > 0 || - ox->delta_stats.num_objects > 0) && + if ((ox.delta_stats.num_bytes > 0 || + ox.delta_stats.num_objects > 0) && get_pgpool().info.has_flag(pg_pool_t::FLAG_FULL)) { - const auto& m = ox->get_message(); + const auto& m = ox.get_message(); if (m.get_reqid().name.is_mds() || // FIXME: ignore MDS for now m.has_flag(CEPH_OSD_FLAG_FULL_FORCE)) { INFODPP("full, but proceeding 
due to FULL_FORCE, or MDS", *this); @@ -1129,13 +1129,12 @@ PG::run_executer_fut PG::run_executer( } PG::submit_executer_fut PG::submit_executer( - seastar::lw_shared_ptr ox, - const std::vector& ops) -{ + OpsExecuter &&ox, + const std::vector& ops) { LOG_PREFIX(PG::submit_executer); // transaction must commit at this point return std::move( - *ox + ox ).flush_changes_n_do_ops_effects( ops, snap_mapper, diff --git a/src/crimson/osd/pg.h b/src/crimson/osd/pg.h index 3a8ddad922a50..604f49005ff04 100644 --- a/src/crimson/osd/pg.h +++ b/src/crimson/osd/pg.h @@ -658,7 +658,7 @@ class PG : public boost::intrusive_ref_counter< run_executer_ertr>; using run_executer_fut = run_executer_iertr::future<>; run_executer_fut run_executer( - seastar::lw_shared_ptr ox, + OpsExecuter &ox, ObjectContextRef obc, const OpInfo &op_info, std::vector& ops); @@ -669,7 +669,7 @@ class PG : public boost::intrusive_ref_counter< using submit_executer_fut = interruptible_future< submit_executer_ret>; submit_executer_fut submit_executer( - seastar::lw_shared_ptr ox, + OpsExecuter &&ox, const std::vector& ops); struct do_osd_ops_params_t; From 2b562b64a64777b1428e9ad3187b50619cbf1a4d Mon Sep 17 00:00:00 2001 From: Samuel Just Date: Tue, 1 Oct 2024 13:11:31 -0700 Subject: [PATCH 21/21] crimson: remove unused OpsExecuter::rollback_obc_if_modified overload Signed-off-by: Samuel Just --- src/crimson/osd/ops_executer.h | 31 ------------------------------- 1 file changed, 31 deletions(-) diff --git a/src/crimson/osd/ops_executer.h b/src/crimson/osd/ops_executer.h index 068f510d1ef82..e770e825b32d0 100644 --- a/src/crimson/osd/ops_executer.h +++ b/src/crimson/osd/ops_executer.h @@ -548,7 +548,6 @@ OpsExecuter::flush_changes_n_do_ops_effects( template struct OpsExecuter::RollbackHelper { void rollback_obc_if_modified(); - void rollback_obc_if_modified(const std::error_code& e); OpsExecuter *ox; Func func; }; @@ -587,36 +586,6 @@ void OpsExecuter::RollbackHelper::rollback_obc_if_modified() } } -template 
-void OpsExecuter::RollbackHelper::rollback_obc_if_modified( - const std::error_code& e) -{ - // Oops, an operation had failed. do_osd_ops() altogether with - // OpsExecuter already dropped the ObjectStore::Transaction if - // there was any. However, this is not enough to completely - // rollback as we gave OpsExecuter the very single copy of `obc` - // we maintain and we did it for both reading and writing. - // Now all modifications must be reverted. - // - // The conditional's purpose is to efficiently handle hot errors - // which may appear as a result of e.g. CEPH_OSD_OP_CMPXATTR or - // CEPH_OSD_OP_OMAP_CMP. These are read-like ops and clients - // typically append them before any write. If OpsExecuter hasn't - // seen any modifying operation, `obc` is supposed to be kept - // unchanged. - assert(ox); - const auto need_rollback = ox->has_seen_write(); - crimson::get_logger(ceph_subsys_osd).debug( - "{}: object {} got error {}, need_rollback={}", - __func__, - ox->obc->get_oid(), - e, - need_rollback); - if (need_rollback) { - func(ox->obc); - } -} - // PgOpsExecuter -- a class for executing ops targeting a certain PG. class PgOpsExecuter { template