diff --git a/src/crimson/common/log.h b/src/crimson/common/log.h index 4f564ac044d05..c38b225c94b4f 100644 --- a/src/crimson/common/log.h +++ b/src/crimson/common/log.h @@ -90,7 +90,7 @@ static inline seastar::log_level to_log_level(int level) { #define SUBLOGDPP(subname_, level_, MSG, dpp, ...) \ LOGGER(subname_).log(level_, "{} {}: " MSG, dpp, FNAME , ##__VA_ARGS__) #define SUBLOGDPPI(subname_, level_, MSG, dpp, ...) \ - LOGGER(subname_).log(level_, "{} {}: " MSG, \ + LOGGER(subname_).log(level_, "{} {} {}: " MSG, \ interruptor::get_interrupt_cond(), dpp, FNAME , ##__VA_ARGS__) #define SUBTRACEDPP(subname_, ...) SUBLOGDPP(subname_, seastar::log_level::trace, __VA_ARGS__) #define SUBTRACEDPPI(subname_, ...) SUBLOGDPPI(subname_, seastar::log_level::trace, __VA_ARGS__) @@ -106,7 +106,7 @@ static inline seastar::log_level to_log_level(int level) { #define LOGDPP(level_, MSG, dpp, ...) \ LOCAL_LOGGER.log(level_, "{} {}: " MSG, dpp, FNAME , ##__VA_ARGS__) #define LOGDPPI(level_, MSG, dpp, ...) \ - LOCAL_LOGGER.log(level_, "{} {}: " MSG, \ + LOCAL_LOGGER.log(level_, "{} {} {}: " MSG, \ interruptor::get_interrupt_cond(), dpp, FNAME , ##__VA_ARGS__) #define TRACEDPP(...) LOGDPP(seastar::log_level::trace, __VA_ARGS__) #define TRACEDPPI(...) LOGDPPI(seastar::log_level::trace, __VA_ARGS__) diff --git a/src/crimson/osd/ops_executer.cc b/src/crimson/osd/ops_executer.cc index df4f73d4077d1..9bf60140374c8 100644 --- a/src/crimson/osd/ops_executer.cc +++ b/src/crimson/osd/ops_executer.cc @@ -504,7 +504,7 @@ OpsExecuter::list_snaps_iertr::future<> OpsExecuter::do_list_snaps( auto p = ss.clone_snaps.find(clone); if (p == ss.clone_snaps.end()) { logger().error( - "OpsExecutor::do_list_snaps: {} has inconsistent " + "OpsExecuter::do_list_snaps: {} has inconsistent " "clone_snaps, missing clone {}", os.oi.soid, clone); @@ -518,7 +518,7 @@ OpsExecuter::list_snaps_iertr::future<> OpsExecuter::do_list_snaps( auto p = ss.clone_overlap.find(clone); if (p == ss.clone_overlap.end()) { logger().error( - "OpsExecutor::do_list_snaps: {} has inconsistent " + "OpsExecuter::do_list_snaps: {} has inconsistent " "clone_overlap, missing clone {}", os.oi.soid, clone); @@ -532,7 +532,7 @@ OpsExecuter::list_snaps_iertr::future<> OpsExecuter::do_list_snaps( auto p = ss.clone_size.find(clone); if (p == ss.clone_size.end()) { logger().error( - "OpsExecutor::do_list_snaps: {} has inconsistent " + "OpsExecuter::do_list_snaps: {} has inconsistent " "clone_size, missing clone {}", os.oi.soid, clone); @@ -551,7 +551,7 @@ OpsExecuter::list_snaps_iertr::future<> OpsExecuter::do_list_snaps( } resp.seq = ss.seq; logger().error( - "OpsExecutor::do_list_snaps: {}, resp.clones.size(): {}", + "OpsExecuter::do_list_snaps: {}, resp.clones.size(): {}", os.oi.soid, resp.clones.size()); resp.encode(osd_op.outdata); @@ -678,16 +678,32 @@ OpsExecuter::do_execute_op(OSDOp& osd_op) whiteout = true; } return do_write_op([this, whiteout](auto& backend, auto& os, auto& txn) { - int num_bytes = 0; - // Calculate num_bytes to be removed - if (obc->obs.oi.soid.is_snap()) { - ceph_assert(obc->ssc->snapset.clone_overlap.count(obc->obs.oi.soid.snap)); - num_bytes = obc->ssc->snapset.get_clone_bytes(obc->obs.oi.soid.snap); - } else { - num_bytes = obc->obs.oi.size; - } - return backend.remove(os, txn, *osd_op_params, - delta_stats, whiteout, num_bytes); + struct emptyctx_t {}; + return with_effect_on_obc( + emptyctx_t{}, + [&](auto &ctx) { + int num_bytes = 0; + // Calculate num_bytes to be removed + if (obc->obs.oi.soid.is_snap()) { + ceph_assert(obc->ssc->snapset.clone_overlap.count( + obc->obs.oi.soid.snap)); + num_bytes = obc->ssc->snapset.get_clone_bytes( + obc->obs.oi.soid.snap); + } else { + num_bytes = obc->obs.oi.size; + } + return backend.remove(os, txn, *osd_op_params, + delta_stats, whiteout, num_bytes); + }, + [](auto &&ctx, ObjectContextRef obc, Ref) { + return seastar::do_for_each( + obc->watchers, + [](auto &p) { return p.second->remove(); } + ).then([obc] { + obc->watchers.clear(); + return seastar::now(); + }); + }); }); } case CEPH_OSD_OP_CALL: @@ -957,7 +973,7 @@ void OpsExecuter::CloningContext::apply_to( processed_obc.ssc->snapset = std::move(new_snapset); } -OpsExecuter::interruptible_future> +std::vector OpsExecuter::flush_clone_metadata( std::vector&& log_entries, SnapMapper& snap_mapper, @@ -965,7 +981,6 @@ OpsExecuter::flush_clone_metadata( ceph::os::Transaction& txn) { assert(!txn.empty()); - auto maybe_snap_mapped = interruptor::now(); update_clone_overlap(); if (cloning_ctx) { std::move(*cloning_ctx).apply_to(log_entries, *obc); @@ -977,12 +992,7 @@ OpsExecuter::flush_clone_metadata( } logger().debug("{} done, initial snapset={}, new snapset={}", __func__, obc->obs.oi.soid, obc->ssc->snapset); - return std::move( - maybe_snap_mapped - ).then_interruptible([log_entries=std::move(log_entries)]() mutable { - return interruptor::make_ready_future>( - std::move(log_entries)); - }); + return std::move(log_entries); } ObjectContextRef OpsExecuter::prepare_clone( diff --git a/src/crimson/osd/ops_executer.h b/src/crimson/osd/ops_executer.h index 0dea7d0515e93..e770e825b32d0 100644 --- a/src/crimson/osd/ops_executer.h +++ b/src/crimson/osd/ops_executer.h @@ -40,7 +40,7 @@ namespace crimson::osd { class PG; // OpsExecuter -- a class for executing ops targeting a certain object. -class OpsExecuter : public seastar::enable_lw_shared_from_this { +class OpsExecuter { friend class SnapTrimObjSubEvent; using call_errorator = crimson::errorator< @@ -170,16 +170,12 @@ class OpsExecuter : public seastar::enable_lw_shared_from_this { object_stat_sum_t delta_stats; private: - // an operation can be divided into two stages: main and effect-exposing - // one. The former is performed immediately on call to `do_osd_op()` while - // the later on `submit_changes()` – after successfully processing main - // stages of all involved operations. When any stage fails, none of all - // scheduled effect-exposing stages will be executed. - // when operation requires this division, some variant of `with_effect()` - // should be used. + // with_effect can be used to schedule operations to be performed + // at commit time. effects will be discarded if the operation does + // not commit. struct effect_t { // an effect can affect PG, i.e. create a watch timeout - virtual osd_op_errorator::future<> execute(Ref pg) = 0; + virtual seastar::future<> execute(Ref pg) = 0; virtual ~effect_t() = default; }; @@ -213,10 +209,10 @@ class OpsExecuter : public seastar::enable_lw_shared_from_this { * execute_clone * * If snapc contains a snap which occurred logically after the last write - * seen by this object (see OpsExecutor::should_clone()), we first need + * seen by this object (see OpsExecuter::should_clone()), we first need * make a clone of the object at its current state. execute_clone primes * txn with that clone operation and returns an - * OpsExecutor::CloningContext which will allow us to fill in the corresponding + * OpsExecuter::CloningContext which will allow us to fill in the corresponding * metadata and log_entries once the operations have been processed. * * Note that this strategy differs from classic, which instead performs this @@ -267,7 +263,7 @@ class OpsExecuter : public seastar::enable_lw_shared_from_this { */ void update_clone_overlap(); - interruptible_future> flush_clone_metadata( + std::vector flush_clone_metadata( std::vector&& log_entries, SnapMapper& snap_mapper, OSDriver& osdriver, @@ -400,7 +396,7 @@ class OpsExecuter : public seastar::enable_lw_shared_from_this { execute_op(OSDOp& osd_op); using rep_op_fut_tuple = - std::tuple, osd_op_ierrorator::future<>>; + std::tuple, interruptible_future<>>; using rep_op_fut_t = interruptible_future; template @@ -475,7 +471,7 @@ auto OpsExecuter::with_effect_on_obc( effect_func(std::move(effect_func)), obc(std::move(obc)) { } - osd_op_errorator::future<> execute(Ref pg) final { + seastar::future<> execute(Ref pg) final { return std::move(effect_func)(std::move(ctx), std::move(obc), std::move(pg)); @@ -502,15 +498,14 @@ OpsExecuter::flush_changes_n_do_ops_effects( assert(obc); auto submitted = interruptor::now(); - auto all_completed = - interruptor::make_interruptible(osd_op_errorator::now()); + auto all_completed = interruptor::now(); if (cloning_ctx) { ceph_assert(want_mutate); } if (want_mutate) { - auto log_entries = co_await flush_clone_metadata( + auto log_entries = flush_clone_metadata( prepare_transaction(ops), snap_mapper, osdriver, @@ -536,7 +531,7 @@ OpsExecuter::flush_changes_n_do_ops_effects( // need extra ref pg due to apply_stats() which can be executed after // informing snap mapper all_completed = - std::move(all_completed).safe_then_interruptible([this, pg=this->pg] { + std::move(all_completed).then_interruptible([this, pg=this->pg] { // let's do the cleaning of `op_effects` in destructor return interruptor::do_for_each(op_effects, [pg=std::move(pg)](auto& op_effect) { @@ -552,21 +547,19 @@ OpsExecuter::flush_changes_n_do_ops_effects( template struct OpsExecuter::RollbackHelper { - void rollback_obc_if_modified(const std::error_code& e); - seastar::lw_shared_ptr ox; + void rollback_obc_if_modified(); + OpsExecuter *ox; Func func; }; template inline OpsExecuter::RollbackHelper OpsExecuter::create_rollbacker(Func&& func) { - return {shared_from_this(), std::forward(func)}; + return {this, std::forward(func)}; } - template -void OpsExecuter::RollbackHelper::rollback_obc_if_modified( - const std::error_code& e) +void OpsExecuter::RollbackHelper::rollback_obc_if_modified() { // Oops, an operation had failed. do_osd_ops() altogether with // OpsExecuter already dropped the ObjectStore::Transaction if @@ -584,10 +577,9 @@ void OpsExecuter::RollbackHelper::rollback_obc_if_modified( assert(ox); const auto need_rollback = ox->has_seen_write(); crimson::get_logger(ceph_subsys_osd).debug( - "{}: object {} got error {}, need_rollback={}", + "{}: object {} got error, need_rollback={}", __func__, ox->obc->get_oid(), - e, need_rollback); if (need_rollback) { func(ox->obc); diff --git a/src/crimson/osd/osd_operation.h b/src/crimson/osd/osd_operation.h index fb0432edb8f9a..fd8b049c0bf08 100644 --- a/src/crimson/osd/osd_operation.h +++ b/src/crimson/osd/osd_operation.h @@ -40,6 +40,37 @@ struct PerShardPipeline { } create_or_wait_pg; }; +struct PGPeeringPipeline { + struct AwaitMap : OrderedExclusivePhaseT { + static constexpr auto type_name = "PeeringEvent::PGPipeline::await_map"; + } await_map; + struct Process : OrderedExclusivePhaseT { + static constexpr auto type_name = "PeeringEvent::PGPipeline::process"; + } process; +}; + +struct CommonPGPipeline { + struct WaitForActive : OrderedExclusivePhaseT { + static constexpr auto type_name = "CommonPGPipeline:::wait_for_active"; + } wait_for_active; + struct RecoverMissing : OrderedConcurrentPhaseT { + static constexpr auto type_name = "CommonPGPipeline::recover_missing"; + } recover_missing; + struct CheckAlreadyCompleteGetObc : OrderedExclusivePhaseT { + static constexpr auto type_name = "CommonPGPipeline::check_already_complete_get_obc"; + } check_already_complete_get_obc; + struct LockOBC : OrderedConcurrentPhaseT { + static constexpr auto type_name = "CommonPGPipeline::lock_obc"; + } lock_obc; + struct Process : OrderedExclusivePhaseT { + static constexpr auto type_name = "CommonPGPipeline::process"; + } process; + struct WaitRepop : OrderedConcurrentPhaseT { + static constexpr auto type_name = "ClientRequest::PGPipeline::wait_repop"; + } wait_repop; +}; + + enum class OperationTypeCode { client_request = 0, peering_event, diff --git a/src/crimson/osd/osd_operation_external_tracking.h b/src/crimson/osd/osd_operation_external_tracking.h index 530732ba71028..d2786a95e4d3c 100644 --- a/src/crimson/osd/osd_operation_external_tracking.h +++ b/src/crimson/osd/osd_operation_external_tracking.h @@ -36,7 +36,6 @@ struct LttngBackend ClientRequest::PGPipeline::RecoverMissing:: BlockingEvent::ExitBarrierEvent::Backend, ClientRequest::PGPipeline::CheckAlreadyCompleteGetObc::BlockingEvent::Backend, - ClientRequest::PGPipeline::GetOBC::BlockingEvent::Backend, ClientRequest::PGPipeline::LockOBC::BlockingEvent::Backend, ClientRequest::PGPipeline::LockOBC::BlockingEvent::ExitBarrierEvent::Backend, ClientRequest::PGPipeline::Process::BlockingEvent::Backend, @@ -117,10 +116,6 @@ struct LttngBackend const ClientRequest::PGPipeline::CheckAlreadyCompleteGetObc& blocker) override { } - void handle(ClientRequest::PGPipeline::GetOBC::BlockingEvent& ev, - const Operation& op, - const ClientRequest::PGPipeline::GetOBC& blocker) override { - } void handle(ClientRequest::PGPipeline::LockOBC::BlockingEvent& ev, const Operation& op, @@ -171,7 +166,6 @@ struct HistoricBackend ClientRequest::PGPipeline::RecoverMissing:: BlockingEvent::ExitBarrierEvent::Backend, ClientRequest::PGPipeline::CheckAlreadyCompleteGetObc::BlockingEvent::Backend, - ClientRequest::PGPipeline::GetOBC::BlockingEvent::Backend, ClientRequest::PGPipeline::LockOBC::BlockingEvent::Backend, ClientRequest::PGPipeline::LockOBC::BlockingEvent::ExitBarrierEvent::Backend, ClientRequest::PGPipeline::Process::BlockingEvent::Backend, @@ -252,11 +246,6 @@ struct HistoricBackend const ClientRequest::PGPipeline::CheckAlreadyCompleteGetObc& blocker) override { } - void handle(ClientRequest::PGPipeline::GetOBC::BlockingEvent& ev, - const Operation& op, - const ClientRequest::PGPipeline::GetOBC& blocker) override { - } - void handle(ClientRequest::PGPipeline::LockOBC::BlockingEvent& ev, const Operation& op, const ClientRequest::PGPipeline::LockOBC& blocker) override { diff --git a/src/crimson/osd/osd_operations/client_request.cc b/src/crimson/osd/osd_operations/client_request.cc index 8e9a7c4d7490c..a89fb2c84bc56 100644 --- a/src/crimson/osd/osd_operations/client_request.cc +++ b/src/crimson/osd/osd_operations/client_request.cc @@ -403,11 +403,6 @@ ClientRequest::process_op( *pg, *this, this_instance_id); return do_process( ihref, pg, obc, this_instance_id - ).handle_error_interruptible( - crimson::ct_error::eagain::handle( - [this, pg, this_instance_id, &ihref]() mutable { - return process_op(ihref, pg, this_instance_id); - }) ); } ); @@ -437,7 +432,7 @@ ClientRequest::process_op( co_await std::move(process); } -ClientRequest::do_process_iertr::future<> +ClientRequest::interruptible_future<> ClientRequest::do_process( instance_handle_t &ihref, Ref pg, crimson::osd::ObjectContextRef obc, @@ -507,22 +502,128 @@ ClientRequest::do_process( co_return; } - auto [submitted, all_completed] = co_await pg->do_osd_ops( - m, r_conn, obc, op_info, snapc + OpsExecuter ox(pg, obc, op_info, *m, r_conn, snapc); + auto ret = co_await pg->run_executer( + ox, obc, op_info, m->ops + ).si_then([]() -> std::optional { + return std::nullopt; + }).handle_error_interruptible(crimson::ct_error::all_same_way( + [](auto e) -> std::optional { + return e; + }) ); - co_await std::move(submitted); - co_await ihref.enter_stage(client_pp(*pg).wait_repop, *this); + auto should_log_error = [](std::error_code e) -> bool { + switch (e.value()) { + case EDQUOT: + case ENOSPC: + case EAGAIN: + return false; + default: + return true; + } + }; - auto reply = co_await std::move(all_completed); + if (ret && !should_log_error(*ret)) { + co_await reply_op_error(pg, -ret->value()); + co_return; + } + + { + auto all_completed = interruptor::now(); + if (ret) { + assert(should_log_error(*ret)); + if (op_info.may_write()) { + auto rep_tid = pg->shard_services.get_tid(); + auto version = co_await pg->submit_error_log( + m, op_info, obc, *ret, rep_tid); + + all_completed = pg->complete_error_log( + rep_tid, version); + } + // simply return the error below, leaving all_completed alone + } else { + auto submitted = interruptor::now(); + std::tie(submitted, all_completed) = co_await pg->submit_executer( + std::move(ox), m->ops); + co_await std::move(submitted); + } + co_await ihref.enter_stage(client_pp(*pg).wait_repop, *this); + + co_await std::move(all_completed); + } co_await ihref.enter_stage(client_pp(*pg).send_reply, *this); - DEBUGDPP("{}.{}: sending response", - *pg, *this, this_instance_id); - // TODO: gate the crosscore sending - co_await interruptor::make_interruptible( - get_foreign_connection().send_with_throttling(std::move(reply)) - ); + + if (ret) { + int err = -ret->value(); + DEBUGDPP("{}: replying with error {}", *pg, *this, err); + + auto reply = crimson::make_message( + m.get(), err, pg->get_osdmap_epoch(), 0, false); + + if (!m->ops.empty() && m->ops.back().op.flags & CEPH_OSD_OP_FLAG_FAILOK) { + reply->set_result(0); + } + + // For all ops except for CMPEXT, the correct error value is encoded + // in e. For CMPEXT, osdop.rval has the actual error value. + if (err == -ct_error::cmp_fail_error_value) { + assert(!m->ops.empty()); + for (auto &osdop : m->ops) { + if (osdop.rval < 0) { + reply->set_result(osdop.rval); + break; + } + } + } + + reply->set_enoent_reply_versions( + pg->peering_state.get_info().last_update, + pg->peering_state.get_info().last_user_version); + reply->add_flags(CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK); + + // TODO: gate the crosscore sending + co_await interruptor::make_interruptible( + get_foreign_connection().send_with_throttling(std::move(reply))); + } else { + int result = m->ops.empty() ? 0 : m->ops.back().rval.code; + if (op_info.may_read() && result >= 0) { + for (auto &osdop : m->ops) { + if (osdop.rval < 0 && !(osdop.op.flags & CEPH_OSD_OP_FLAG_FAILOK)) { + result = osdop.rval.code; + break; + } + } + } else if (result > 0 && op_info.may_write() && !op_info.allows_returnvec()) { + result = 0; + } else if (result < 0 && + (m->ops.empty() ? + 0 : m->ops.back().op.flags & CEPH_OSD_OP_FLAG_FAILOK)) { + result = 0; + } + auto reply = crimson::make_message( + m.get(), + result, + pg->get_osdmap_epoch(), + 0, + false); + reply->add_flags(CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK); + if (obc->obs.exists) { + reply->set_reply_versions(pg->peering_state.get_info().last_update, + obc->obs.oi.user_version); + } else { + reply->set_reply_versions(pg->peering_state.get_info().last_update, + pg->peering_state.get_info().last_user_version); + } + + DEBUGDPP("{}.{}: sending response {}", + *pg, *this, this_instance_id, *m); + // TODO: gate the crosscore sending + co_await interruptor::make_interruptible( + get_foreign_connection().send_with_throttling(std::move(reply)) + ); + } } bool ClientRequest::is_misdirected(const PG& pg) const diff --git a/src/crimson/osd/osd_operations/client_request.h b/src/crimson/osd/osd_operations/client_request.h index ea7aade22ac75..6ee57e9874cd1 100644 --- a/src/crimson/osd/osd_operations/client_request.h +++ b/src/crimson/osd/osd_operations/client_request.h @@ -14,7 +14,6 @@ #include "crimson/osd/osdmap_gate.h" #include "crimson/osd/osd_operation.h" #include "crimson/osd/osd_operations/client_request_common.h" -#include "crimson/osd/osd_operations/common/pg_pipeline.h" #include "crimson/osd/pg_activation_blocker.h" #include "crimson/osd/pg_map.h" #include "crimson/osd/scrub/pg_scrubber.h" @@ -104,7 +103,6 @@ class ClientRequest final : public PhasedOperationT, PGPipeline::RecoverMissing::BlockingEvent, scrub::PGScrubber::BlockingEvent, PGPipeline::CheckAlreadyCompleteGetObc::BlockingEvent, - PGPipeline::GetOBC::BlockingEvent, PGPipeline::LockOBC::BlockingEvent, PGPipeline::Process::BlockingEvent, PGPipeline::WaitRepop::BlockingEvent, @@ -276,12 +274,7 @@ class ClientRequest final : public PhasedOperationT, interruptible_future<> with_sequencer(FuncT&& func); interruptible_future<> reply_op_error(const Ref& pg, int err); - - using do_process_iertr = - ::crimson::interruptible::interruptible_errorator< - ::crimson::osd::IOInterruptCondition, - ::crimson::errorator>; - do_process_iertr::future<> do_process( + interruptible_future<> do_process( instance_handle_t &ihref, Ref pg, crimson::osd::ObjectContextRef obc, diff --git a/src/crimson/osd/osd_operations/common/pg_pipeline.h b/src/crimson/osd/osd_operations/common/pg_pipeline.h deleted file mode 100644 index 2b2d03ae4b3ed..0000000000000 --- a/src/crimson/osd/osd_operations/common/pg_pipeline.h +++ /dev/null @@ -1,40 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab - -#pragma once - -#include "osd/osd_op_util.h" -#include "crimson/osd/osd_operation.h" - -namespace crimson::osd { - -class CommonPGPipeline { -protected: - friend class InternalClientRequest; - friend class SnapTrimEvent; - friend class SnapTrimObjSubEvent; - - struct WaitForActive : OrderedExclusivePhaseT { - static constexpr auto type_name = "CommonPGPipeline:::wait_for_active"; - } wait_for_active; - struct RecoverMissing : OrderedConcurrentPhaseT { - static constexpr auto type_name = "CommonPGPipeline::recover_missing"; - } recover_missing; - struct CheckAlreadyCompleteGetObc : OrderedExclusivePhaseT { - static constexpr auto type_name = "CommonPGPipeline::check_already_complete_get_obc"; - } check_already_complete_get_obc; - struct GetOBC : OrderedExclusivePhaseT { - static constexpr auto type_name = "CommonPGPipeline::get_obc"; - } get_obc; - struct LockOBC : OrderedConcurrentPhaseT { - static constexpr auto type_name = "CommonPGPipeline::lock_obc"; - } lock_obc; - struct Process : OrderedExclusivePhaseT { - static constexpr auto type_name = "CommonPGPipeline::process"; - } process; - struct WaitRepop : OrderedConcurrentPhaseT { - static constexpr auto type_name = "ClientRequest::PGPipeline::wait_repop"; - } wait_repop; -}; - -} // namespace crimson::osd diff --git a/src/crimson/osd/osd_operations/internal_client_request.cc b/src/crimson/osd/osd_operations/internal_client_request.cc index a19bb0826f004..9e5867caf8067 100644 --- a/src/crimson/osd/osd_operations/internal_client_request.cc +++ b/src/crimson/osd/osd_operations/internal_client_request.cc @@ -50,91 +50,107 @@ CommonPGPipeline& InternalClientRequest::client_pp() return pg->request_pg_pipeline; } +InternalClientRequest::interruptible_future<> +InternalClientRequest::do_process( + crimson::osd::ObjectContextRef obc, + std::vector &osd_ops) +{ + LOG_PREFIX(InternalClientRequest::do_process); + auto params = get_do_osd_ops_params(); + OpsExecuter ox( + pg, obc, op_info, params, params.get_connection(), SnapContext{}); + co_await pg->run_executer( + ox, obc, op_info, osd_ops + ).handle_error_interruptible( + crimson::ct_error::all_same_way( + [this, FNAME](auto e) { + ERRORDPPI("{}: got unexpected error {}", *pg, *this, e); + ceph_assert(0 == "should not return an error"); + return interruptor::now(); + }) + ); + + auto [submitted, completed] = co_await pg->submit_executer( + std::move(ox), osd_ops); + + co_await std::move(submitted); + co_await std::move(completed); +} + +InternalClientRequest::interruptible_future<> +InternalClientRequest::with_interruption() +{ + LOG_PREFIX(InternalClientRequest::with_interruption); + co_await enter_stage( + client_pp().wait_for_active + ); + + co_await with_blocking_event([this] (auto&& trigger) { + return pg->wait_for_active_blocker.wait(std::move(trigger)); + }); + + co_await enter_stage(client_pp().recover_missing); + + bool unfound = co_await do_recover_missing( + pg, get_target_oid(), osd_reqid_t()); + + if (unfound) { + throw std::system_error( + std::make_error_code(std::errc::operation_canceled), + fmt::format("{} is unfound, drop it!", get_target_oid())); + } + co_await enter_stage( + client_pp().check_already_complete_get_obc); + + DEBUGI("{}: getting obc lock", *this); + + auto osd_ops = create_osd_ops(); + + DEBUGI("InternalClientRequest: got {} OSDOps to execute", + std::size(osd_ops)); + [[maybe_unused]] const int ret = op_info.set_from_op( + std::as_const(osd_ops), pg->get_pgid().pgid, *pg->get_osdmap()); + assert(ret == 0); + // call with_locked_obc() in order, but wait concurrently for loading. + enter_stage_sync(client_pp().lock_obc); + + auto fut = pg->with_locked_obc( + get_target_oid(), op_info, + [&osd_ops, this](auto, auto obc) { + return enter_stage(client_pp().process + ).then_interruptible( + [obc=std::move(obc), &osd_ops, this]() mutable { + return do_process(std::move(obc), osd_ops); + }); + }).handle_error_interruptible( + crimson::ct_error::assert_all("unexpected error") + ); + co_await std::move(fut); + + logger().debug("{}: complete", *this); + co_await interruptor::make_interruptible(handle.complete()); + co_return; +} + seastar::future<> InternalClientRequest::start() { track_event(); - return crimson::common::handle_system_shutdown([this] { - LOG_PREFIX(InternalClientRequest::start); - DEBUGI("{}: in repeat", *this); - - return interruptor::with_interruption([this]() mutable { - return enter_stage( - client_pp().wait_for_active - ).then_interruptible([this] { - return with_blocking_event([this] (auto&& trigger) { - return pg->wait_for_active_blocker.wait(std::move(trigger)); - }); - }).then_interruptible([this] { - return enter_stage( - client_pp().recover_missing); - }).then_interruptible([this] { - return do_recover_missing(pg, get_target_oid(), osd_reqid_t()); - }).then_interruptible([this](bool unfound) { - if (unfound) { - throw std::system_error( - std::make_error_code(std::errc::operation_canceled), - fmt::format("{} is unfound, drop it!", get_target_oid())); - } - return enter_stage( - client_pp().get_obc); - }).then_interruptible([this] () -> PG::load_obc_iertr::future<> { - LOG_PREFIX(InternalClientRequest::start); - DEBUGI("{}: getting obc lock", *this); - return seastar::do_with(create_osd_ops(), - [this](auto& osd_ops) mutable { - LOG_PREFIX(InternalClientRequest::start); - DEBUGI("InternalClientRequest: got {} OSDOps to execute", - std::size(osd_ops)); - [[maybe_unused]] const int ret = op_info.set_from_op( - std::as_const(osd_ops), pg->get_pgid().pgid, *pg->get_osdmap()); - assert(ret == 0); - // call with_locked_obc() in order, but wait concurrently for loading. - enter_stage_sync(client_pp().lock_obc); - return pg->with_locked_obc(get_target_oid(), op_info, - [&osd_ops, this](auto, auto obc) { - return enter_stage(client_pp().process - ).then_interruptible( - [obc=std::move(obc), &osd_ops, this] { - return pg->do_osd_ops( - std::move(obc), - osd_ops, - std::as_const(op_info), - get_do_osd_ops_params() - ).safe_then_unpack_interruptible( - [](auto submitted, auto all_completed) { - return all_completed.handle_error_interruptible( - crimson::ct_error::eagain::handle([] { - return seastar::now(); - })); - }, crimson::ct_error::eagain::handle([] { - return interruptor::now(); - }) - ); - }); - }); - }); - }).si_then([this] { - logger().debug("{}: complete", *this); - return handle.complete(); - }).handle_error_interruptible( - PG::load_obc_ertr::all_same_way([] { - return seastar::now(); - }) - ); - }, [](std::exception_ptr eptr) { - return seastar::now(); - }, pg, start_epoch - - ).then([this] { - track_event(); - }).handle_exception_type([](std::system_error &error) { - logger().debug("error {}, message: {}", error.code(), error.what()); - return seastar::now(); - }).finally([this] { - logger().debug("{}: exit", *this); - handle.exit(); - }); + LOG_PREFIX(InternalClientRequest::start); + DEBUGI("{}: in repeat", *this); + + return interruptor::with_interruption([this]() mutable { + return with_interruption(); + }, [](std::exception_ptr eptr) { + return seastar::now(); + }, pg, start_epoch).then([this] { + track_event(); + }).handle_exception_type([](std::system_error &error) { + logger().debug("error {}, message: {}", error.code(), error.what()); + return seastar::now(); + }).finally([this] { + logger().debug("{}: exit", *this); + handle.exit(); }); } diff --git a/src/crimson/osd/osd_operations/internal_client_request.h b/src/crimson/osd/osd_operations/internal_client_request.h index f198e58464338..6023db0a8dbe2 100644 --- a/src/crimson/osd/osd_operations/internal_client_request.h +++ b/src/crimson/osd/osd_operations/internal_client_request.h @@ -6,7 +6,6 @@ #include "crimson/common/type_helpers.h" #include "crimson/osd/osd_operation.h" #include "crimson/osd/osd_operations/client_request_common.h" -#include "crimson/osd/osd_operations/common/pg_pipeline.h" #include "crimson/osd/pg.h" #include "crimson/osd/pg_activation_blocker.h" @@ -41,6 +40,11 @@ class InternalClientRequest : public PhasedOperationT, CommonPGPipeline& client_pp(); + InternalClientRequest::interruptible_future<> with_interruption(); + InternalClientRequest::interruptible_future<> do_process( + crimson::osd::ObjectContextRef obc, + std::vector &osd_ops); + seastar::future<> do_process(); Ref pg; @@ -56,7 +60,7 @@ class InternalClientRequest : public PhasedOperationT, CommonPGPipeline::WaitForActive::BlockingEvent, PGActivationBlocker::BlockingEvent, CommonPGPipeline::RecoverMissing::BlockingEvent, - CommonPGPipeline::GetOBC::BlockingEvent, + CommonPGPipeline::CheckAlreadyCompleteGetObc::BlockingEvent, CommonPGPipeline::LockOBC::BlockingEvent, CommonPGPipeline::Process::BlockingEvent, CompletionEvent diff --git a/src/crimson/osd/osd_operations/peering_event.h b/src/crimson/osd/osd_operations/peering_event.h index 1e6bd957289ff..85de5c711d67c 100644 --- a/src/crimson/osd/osd_operations/peering_event.h +++ b/src/crimson/osd/osd_operations/peering_event.h @@ -23,15 +23,6 @@ class ShardServices; class PG; class BackfillRecovery; - struct PGPeeringPipeline { - struct AwaitMap : OrderedExclusivePhaseT { - static constexpr auto type_name = "PeeringEvent::PGPipeline::await_map"; - } await_map; - struct Process : OrderedExclusivePhaseT { - static constexpr auto type_name = "PeeringEvent::PGPipeline::process"; - } process; - }; - template class PeeringEvent : public PhasedOperationT { T* that() { diff --git a/src/crimson/osd/osd_operations/snaptrim_event.cc b/src/crimson/osd/osd_operations/snaptrim_event.cc index 7512b3d108dfc..9ed0b73cfb458 100644 --- a/src/crimson/osd/osd_operations/snaptrim_event.cc +++ b/src/crimson/osd/osd_operations/snaptrim_event.cc @@ -396,7 +396,7 @@ SnapTrimObjSubEvent::start() }); co_await enter_stage( - client_pp().get_obc); + client_pp().check_already_complete_get_obc); logger().debug("{}: getting obc for {}", *this, coid); // end of commonality diff --git a/src/crimson/osd/osd_operations/snaptrim_event.h b/src/crimson/osd/osd_operations/snaptrim_event.h index 06d8f43c2f3c9..1164b3169d293 100644 --- a/src/crimson/osd/osd_operations/snaptrim_event.h +++ b/src/crimson/osd/osd_operations/snaptrim_event.h @@ -9,7 +9,6 @@ #include "crimson/osd/osdmap_gate.h" #include "crimson/osd/osd_operation.h" #include "crimson/common/subop_blocker.h" -#include "crimson/osd/osd_operations/common/pg_pipeline.h" #include "crimson/osd/pg.h" #include "crimson/osd/pg_activation_blocker.h" #include "osd/osd_types.h" @@ -170,7 +169,7 @@ class SnapTrimObjSubEvent : public PhasedOperationT { std::tuple< StartEvent, - CommonPGPipeline::GetOBC::BlockingEvent, + CommonPGPipeline::CheckAlreadyCompleteGetObc::BlockingEvent, CommonPGPipeline::Process::BlockingEvent, CommonPGPipeline::WaitRepop::BlockingEvent, CompletionEvent diff --git a/src/crimson/osd/pg.cc b/src/crimson/osd/pg.cc index 97d48c1fa454c..744a1dbc02b97 100644 --- a/src/crimson/osd/pg.cc +++ b/src/crimson/osd/pg.cc @@ -13,6 +13,9 @@ #include #include #include + +#include + #include "include/utime_fmt.h" #include "common/hobject.h" @@ -975,150 +978,6 @@ ObjectContextRef duplicate_obc(const ObjectContextRef &obc) { return object_context; } -template -PG::do_osd_ops_iertr::future> -PG::do_osd_ops_execute( - seastar::lw_shared_ptr ox, - ObjectContextRef obc, - const OpInfo &op_info, - Ref m, - std::vector& ops, - SuccessFunc&& success_func, - FailureFunc&& failure_func) -{ - assert(ox); - auto rollbacker = ox->create_rollbacker( - [object_context=duplicate_obc(obc)] (auto& obc) mutable { - obc->update_from(*object_context); - }); - auto failure_func_ptr = seastar::make_lw_shared(std::move(failure_func)); - return interruptor::do_for_each(ops, [ox](OSDOp& osd_op) { - logger().debug( - "do_osd_ops_execute: object {} - handling op {}", - ox->get_target(), - ceph_osd_op_name(osd_op.op.op)); - return ox->execute_op(osd_op); - }).safe_then_interruptible([this, ox, &ops] { - logger().debug( - "do_osd_ops_execute: object {} all operations successful", - ox->get_target()); - // check for full - if ((ox->delta_stats.num_bytes > 0 || - ox->delta_stats.num_objects > 0) && - get_pgpool().info.has_flag(pg_pool_t::FLAG_FULL)) { - const auto& m = ox->get_message(); - if (m.get_reqid().name.is_mds() || // FIXME: ignore MDS for now - m.has_flag(CEPH_OSD_FLAG_FULL_FORCE)) { - logger().info(" full, but proceeding due to FULL_FORCE or MDS"); - } else if (m.has_flag(CEPH_OSD_FLAG_FULL_TRY)) { - // they tried, they failed. - logger().info(" full, replying to FULL_TRY op"); - if (get_pgpool().info.has_flag(pg_pool_t::FLAG_FULL_QUOTA)) - return interruptor::make_ready_future( - seastar::now(), - OpsExecuter::osd_op_ierrorator::future<>( - crimson::ct_error::edquot::make())); - else - return interruptor::make_ready_future( - seastar::now(), - OpsExecuter::osd_op_ierrorator::future<>( - crimson::ct_error::enospc::make())); - } else { - // drop request - logger().info(" full, dropping request (bad client)"); - return interruptor::make_ready_future( - seastar::now(), - OpsExecuter::osd_op_ierrorator::future<>( - crimson::ct_error::eagain::make())); - } - } - return std::move(*ox).flush_changes_n_do_ops_effects( - ops, - snap_mapper, - osdriver, - [this] (auto&& txn, - auto&& obc, - auto&& osd_op_p, - auto&& log_entries) { - logger().debug( - "do_osd_ops_execute: object {} submitting txn", - obc->get_oid()); - mutate_object(obc, txn, osd_op_p); - return submit_transaction( - std::move(obc), - std::move(txn), - std::move(osd_op_p), - std::move(log_entries)); - }); - }).safe_then_unpack_interruptible( - [success_func=std::move(success_func), rollbacker, this, failure_func_ptr, obc] - (auto submitted_fut, auto _all_completed_fut) mutable { - - auto all_completed_fut = _all_completed_fut.safe_then_interruptible_tuple( - std::move(success_func), - crimson::ct_error::object_corrupted::handle( - [rollbacker, this, obc] (const std::error_code& e) mutable { - // this is a path for EIO. it's special because we want to fix the obejct - // and try again. that is, the layer above `PG::do_osd_ops` is supposed to - // restart the execution. - rollbacker.rollback_obc_if_modified(e); - return repair_object(obc->obs.oi.soid, - obc->obs.oi.version - ).then_interruptible([] { - return do_osd_ops_iertr::future{crimson::ct_error::eagain::make()}; - }); - }), OpsExecuter::osd_op_errorator::all_same_way( - [rollbacker, failure_func_ptr] - (const std::error_code& e) mutable { - // handle non-fatal errors only - ceph_assert(e.value() == EDQUOT || - e.value() == ENOSPC || - e.value() == EAGAIN); - rollbacker.rollback_obc_if_modified(e); - return (*failure_func_ptr)(e); - })); - - return PG::do_osd_ops_iertr::make_ready_future>( - std::move(submitted_fut), - std::move(all_completed_fut) - ); - }, OpsExecuter::osd_op_errorator::all_same_way( - [this, op_info, m, obc, - rollbacker, failure_func_ptr] - (const std::error_code& e) mutable { - ceph_tid_t rep_tid = shard_services.get_tid(); - rollbacker.rollback_obc_if_modified(e); - // record error log - auto maybe_submit_error_log = - interruptor::make_ready_future>(std::nullopt); - // call submit_error_log only for non-internal clients - if constexpr (!std::is_same_v) { - if(op_info.may_write()) { - maybe_submit_error_log = - submit_error_log(m, op_info, obc, e, rep_tid); - } - } - return maybe_submit_error_log.then_interruptible( - [this, failure_func_ptr, e, rep_tid] (auto version) { - auto all_completed = - [this, failure_func_ptr, e, rep_tid, version] { - if (version.has_value()) { - return complete_error_log(rep_tid, version.value() - ).then_interruptible([failure_func_ptr, e] { - return (*failure_func_ptr)(e); - }); - } else { - return (*failure_func_ptr)(e); - } - }; - return PG::do_osd_ops_iertr::make_ready_future>( - std::move(seastar::now()), - std::move(all_completed()) - ); - }); - })); -} - PG::interruptible_future<> PG::complete_error_log(const ceph_tid_t& rep_tid, const eversion_t& version) { @@ -1148,7 +1007,7 @@ PG::interruptible_future<> PG::complete_error_log(const ceph_tid_t& rep_tid, return result; } -PG::interruptible_future> PG::submit_error_log( +PG::interruptible_future PG::submit_error_log( Ref m, const OpInfo &op_info, ObjectContextRef obc, @@ -1214,142 +1073,84 @@ PG::interruptible_future> PG::submit_error_log( get_collection_ref(), std::move(t) ).then([this] { peering_state.update_trim_to(); - return seastar::make_ready_future>(projected_last_update); + return seastar::make_ready_future(projected_last_update); }); }); }); } -PG::do_osd_ops_iertr::future>> -PG::do_osd_ops( - Ref m, - crimson::net::ConnectionXcoreRef conn, +PG::run_executer_fut PG::run_executer( + OpsExecuter &ox, ObjectContextRef obc, const OpInfo &op_info, - const SnapContext& snapc) + std::vector& ops) { - if (__builtin_expect(stopping, false)) { - throw crimson::common::system_shutdown_exception(); - } - return do_osd_ops_execute>( - seastar::make_lw_shared( - Ref{this}, obc, op_info, *m, conn, snapc), - obc, - op_info, - m, - m->ops, - // success_func - [this, m, obc, may_write = op_info.may_write(), - may_read = op_info.may_read(), rvec = op_info.allows_returnvec()] { - // TODO: should stop at the first op which returns a negative retval, - // cmpext uses it for returning the index of first unmatched byte - int result = m->ops.empty() ? 0 : m->ops.back().rval.code; - if (may_read && result >= 0) { - for (auto &osdop : m->ops) { - if (osdop.rval < 0 && !(osdop.op.flags & CEPH_OSD_OP_FLAG_FAILOK)) { - result = osdop.rval.code; - break; - } - } - } else if (result > 0 && may_write && !rvec) { - result = 0; - } else if (result < 0 && (m->ops.empty() ? - 0 : m->ops.back().op.flags & CEPH_OSD_OP_FLAG_FAILOK)) { - result = 0; - } - auto reply = crimson::make_message(m.get(), - result, - get_osdmap_epoch(), - 0, - false); - reply->add_flags(CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK); - logger().debug( - "do_osd_ops: {} - object {} sending reply", - *m, - m->get_hobj()); - if (obc->obs.exists) { - reply->set_reply_versions(peering_state.get_info().last_update, - obc->obs.oi.user_version); - } else { - reply->set_reply_versions(peering_state.get_info().last_update, - peering_state.get_info().last_user_version); - } - return do_osd_ops_iertr::make_ready_future>( - std::move(reply)); - }, - // failure_func - [m, this] - (const std::error_code& e) { - logger().error("do_osd_ops_execute::failure_func {} got error: {}", - *m, e); - return log_reply(m, e); + LOG_PREFIX(PG::run_executer); + auto rollbacker = ox.create_rollbacker( + [stored_obc=duplicate_obc(obc)](auto &obc) mutable { + obc->update_from(*stored_obc); + }); + auto rollback_on_error = seastar::defer([&rollbacker] { + rollbacker.rollback_obc_if_modified(); }); -} -PG::do_osd_ops_iertr::future> -PG::log_reply( - Ref m, - const std::error_code& e) -{ - auto reply = crimson::make_message( - m.get(), -e.value(), get_osdmap_epoch(), 0, false); - if (m->ops.empty() ? 0 : - m->ops.back().op.flags & CEPH_OSD_OP_FLAG_FAILOK) { - reply->set_result(0); - } - // For all ops except for CMPEXT, the correct error value is encoded - // in e.value(). For CMPEXT, osdop.rval has the actual error value. - if (e.value() == ct_error::cmp_fail_error_value) { - assert(!m->ops.empty()); - for (auto &osdop : m->ops) { - if (osdop.rval < 0) { - reply->set_result(osdop.rval); - break; + for (auto &op: ops) { + DEBUGDPP("object {} handle op {}", *this, ox.get_target(), op); + co_await ox.execute_op(op); + } + DEBUGDPP("object {} all operations successful", *this, ox.get_target()); + + // check for full + if ((ox.delta_stats.num_bytes > 0 || + ox.delta_stats.num_objects > 0) && + get_pgpool().info.has_flag(pg_pool_t::FLAG_FULL)) { + const auto& m = ox.get_message(); + if (m.get_reqid().name.is_mds() || // FIXME: ignore MDS for now + m.has_flag(CEPH_OSD_FLAG_FULL_FORCE)) { + INFODPP("full, but proceeding due to FULL_FORCE, or MDS", *this); + } else if (m.has_flag(CEPH_OSD_FLAG_FULL_TRY)) { + // they tried, they failed. + INFODPP("full, replying to FULL_TRY op", *this); + if (get_pgpool().info.has_flag(pg_pool_t::FLAG_FULL_QUOTA)) { + co_await run_executer_fut( + crimson::ct_error::edquot::make()); + } else { + co_await run_executer_fut( + crimson::ct_error::enospc::make()); } + } else { + // drop request + INFODPP("full, dropping request (bad client)", *this); + co_await run_executer_fut( + crimson::ct_error::eagain::make()); } } - reply->set_enoent_reply_versions( - peering_state.get_info().last_update, - peering_state.get_info().last_user_version); - reply->add_flags(CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK); - return do_osd_ops_iertr::make_ready_future>( - std::move(reply)); -} - -PG::do_osd_ops_iertr::future> -PG::do_osd_ops( - ObjectContextRef obc, - std::vector& ops, - const OpInfo &op_info, - const do_osd_ops_params_t &&msg_params) -{ - // This overload is generally used for internal client requests, - // use an empty SnapContext. - return seastar::do_with( - std::move(msg_params), - [=, this, &ops, &op_info](auto &msg_params) { - return do_osd_ops_execute( - seastar::make_lw_shared( - Ref{this}, - obc, - op_info, - msg_params, - msg_params.get_connection(), - SnapContext{} - ), - obc, - op_info, - Ref(), - ops, - // success_func - [] { - return do_osd_ops_iertr::now(); - }, - // failure_func - [] (const std::error_code& e) { - return do_osd_ops_iertr::now(); - }); - }); + rollback_on_error.cancel(); +} + +PG::submit_executer_fut PG::submit_executer( + OpsExecuter &&ox, + const std::vector& ops) { + LOG_PREFIX(PG::submit_executer); + // transaction must commit at this point + return std::move( + ox + ).flush_changes_n_do_ops_effects( + ops, + snap_mapper, + osdriver, + [FNAME, this](auto&& txn, + auto&& obc, + auto&& osd_op_p, + auto&& log_entries) { + DEBUGDPP("object {} submitting txn", *this, obc->get_oid()); + mutate_object(obc, txn, osd_op_p); + return submit_transaction( + std::move(obc), + std::move(txn), + std::move(osd_op_p), + std::move(log_entries)); + }); } PG::interruptible_future> PG::do_pg_ops(Ref m) diff --git a/src/crimson/osd/pg.h b/src/crimson/osd/pg.h index d8bbc56abcc46..604f49005ff04 100644 --- a/src/crimson/osd/pg.h +++ b/src/crimson/osd/pg.h @@ -621,7 +621,7 @@ class PG : public boost::intrusive_ref_counter< void dump_primary(Formatter*); interruptible_future<> complete_error_log(const ceph_tid_t& rep_tid, const eversion_t& version); - interruptible_future> submit_error_log( + interruptible_future submit_error_log( Ref m, const OpInfo &op_info, ObjectContextRef obc, @@ -645,41 +645,35 @@ class PG : public boost::intrusive_ref_counter< } } background_process_lock; - using do_osd_ops_ertr = crimson::errorator< - crimson::ct_error::eagain>; - using do_osd_ops_iertr = - ::crimson::interruptible::interruptible_errorator< - ::crimson::osd::IOInterruptCondition, - ::crimson::errorator>; - template - using pg_rep_op_fut_t = - std::tuple, - do_osd_ops_iertr::future>; - do_osd_ops_iertr::future>> do_osd_ops( - Ref m, - crimson::net::ConnectionXcoreRef conn, + using run_executer_ertr = crimson::compound_errorator_t< + OpsExecuter::osd_op_errorator, + crimson::errorator< + crimson::ct_error::edquot, + crimson::ct_error::eagain, + crimson::ct_error::enospc + > + >; + using run_executer_iertr = crimson::interruptible::interruptible_errorator< + ::crimson::osd::IOInterruptCondition, + run_executer_ertr>; + using run_executer_fut = run_executer_iertr::future<>; + run_executer_fut run_executer( + OpsExecuter &ox, ObjectContextRef obc, const OpInfo &op_info, - const SnapContext& snapc); + std::vector& ops); + + using submit_executer_ret = std::tuple< + interruptible_future<>, + interruptible_future<>>; + using submit_executer_fut = interruptible_future< + submit_executer_ret>; + submit_executer_fut submit_executer( + OpsExecuter &&ox, + const std::vector& ops); struct do_osd_ops_params_t; - do_osd_ops_iertr::future> log_reply( - Ref m, - const std::error_code& e); - do_osd_ops_iertr::future> do_osd_ops( - ObjectContextRef obc, - std::vector& ops, - const OpInfo &op_info, - const do_osd_ops_params_t &¶ms); - template - do_osd_ops_iertr::future> do_osd_ops_execute( - seastar::lw_shared_ptr ox, - ObjectContextRef obc, - const OpInfo &op_info, - Ref m, - std::vector& ops, - SuccessFunc&& success_func, - FailureFunc&& failure_func); + interruptible_future> do_pg_ops(Ref m); interruptible_future< std::tuple, interruptible_future<>>> diff --git a/src/crimson/osd/pg_backend.cc b/src/crimson/osd/pg_backend.cc index fa8201b61c28d..24a381b4cf7e2 100644 --- a/src/crimson/osd/pg_backend.cc +++ b/src/crimson/osd/pg_backend.cc @@ -1289,7 +1289,7 @@ void PGBackend::clone( const ObjectState& d_os, ceph::os::Transaction& txn) { - // See OpsExecutor::execute_clone documentation + // See OpsExecuter::execute_clone documentation txn.clone(coll->get_cid(), ghobject_t{os.oi.soid}, ghobject_t{d_os.oi.soid}); { ceph::bufferlist bv;