diff --git a/src/v/kafka/server/server.cc b/src/v/kafka/server/server.cc
index b81a85382f0e2..951b0ea012576 100644
--- a/src/v/kafka/server/server.cc
+++ b/src/v/kafka/server/server.cc
@@ -101,7 +101,7 @@ server::server(
   ss::sharded& tx_gateway_frontend,
   ss::sharded& tx_registry_frontend,
   std::optional qdc_config,
-  ssx::thread_worker& tw,
+  ssx::singleton_thread_worker& tw,
   const std::unique_ptr& sr) noexcept
   : net::server(cfg, klog)
   , _smp_group(smp)
diff --git a/src/v/kafka/server/server.h b/src/v/kafka/server/server.h
index afa0d3fbfe5e4..c2fcddd495c86 100644
--- a/src/v/kafka/server/server.h
+++ b/src/v/kafka/server/server.h
@@ -65,7 +65,7 @@ class server final
       ss::sharded&,
       ss::sharded&,
       std::optional,
-      ssx::thread_worker&,
+      ssx::singleton_thread_worker&,
       const std::unique_ptr&) noexcept;
 
     ~server() noexcept override = default;
@@ -161,7 +161,7 @@ class server final
 
     latency_probe& latency_probe() { return *_probe; }
 
-    ssx::thread_worker& thread_worker() { return _thread_worker; }
+    ssx::singleton_thread_worker& thread_worker() { return _thread_worker; }
 
     const std::unique_ptr& schema_registry() {
         return _schema_registry;
@@ -222,7 +222,7 @@ class server final
     ssx::metrics::metric_groups _metrics
       = ssx::metrics::metric_groups::make_internal();
     std::unique_ptr _probe;
-    ssx::thread_worker& _thread_worker;
+    ssx::singleton_thread_worker& _thread_worker;
    std::unique_ptr _replica_selector;
     const std::unique_ptr& _schema_registry;
 };
diff --git a/src/v/redpanda/application.cc b/src/v/redpanda/application.cc
index 05192fe3fae7d..acc041fe49447 100644
--- a/src/v/redpanda/application.cc
+++ b/src/v/redpanda/application.cc
@@ -1059,7 +1059,9 @@ void application::wire_up_redpanda_services(
           resources::available_memory::local().register_metrics();
       }).get();
 
-    construct_single_service(thread_worker);
+    construct_single_service(
+      thread_worker,
+      ssx::thread_worker_config{.pin_to_shard_core = false, .name = "thread"});
 
     // cluster
     syschecks::systemd_message("Initializing connection cache").get();
diff --git a/src/v/redpanda/application.h b/src/v/redpanda/application.h
index 3aa0ae8ac49a4..0a54199c4d814 100644
--- a/src/v/redpanda/application.h
+++ b/src/v/redpanda/application.h
@@ -143,7 +143,7 @@ class application {
 
     std::unique_ptr controller;
 
-    std::unique_ptr<ssx::thread_worker> thread_worker;
+    std::unique_ptr<ssx::singleton_thread_worker> thread_worker;
 
     ss::sharded _kafka_server;
 
diff --git a/src/v/security/gssapi_authenticator.cc b/src/v/security/gssapi_authenticator.cc
index 873ce420efbe9..09d62e69f9156 100644
--- a/src/v/security/gssapi_authenticator.cc
+++ b/src/v/security/gssapi_authenticator.cc
@@ -169,7 +169,7 @@ class gssapi_authenticator::impl {
 };
 
 gssapi_authenticator::gssapi_authenticator(
-  ssx::thread_worker& thread_worker,
+  ssx::singleton_thread_worker& thread_worker,
   std::vector rules,
   ss::sstring principal,
   ss::sstring keytab)
diff --git a/src/v/security/gssapi_authenticator.h b/src/v/security/gssapi_authenticator.h
index 36a62a6943c15..3908009dedbb0 100644
--- a/src/v/security/gssapi_authenticator.h
+++ b/src/v/security/gssapi_authenticator.h
@@ -21,7 +21,7 @@ class gssapi_authenticator final : public sasl_mechanism {
     static constexpr const char* name = "GSSAPI";
 
     gssapi_authenticator(
-      ssx::thread_worker& thread_worker,
+      ssx::singleton_thread_worker& thread_worker,
      std::vector rules,
      ss::sstring principal,
      ss::sstring keytab);
@@ -40,7 +40,7 @@ class gssapi_authenticator final : public sasl_mechanism {
     friend std::ostream&
     operator<<(std::ostream& os, gssapi_authenticator::state const s);
 
-    ssx::thread_worker& _worker;
+    ssx::singleton_thread_worker& _worker;
     security::acl_principal _principal;
     state _state{state::init};
     class impl;
diff --git a/src/v/ssx/fwd.h b/src/v/ssx/fwd.h
index 04717c769f53d..2decf6c83de28 100644
--- a/src/v/ssx/fwd.h
+++ b/src/v/ssx/fwd.h
@@ -13,6 +13,6 @@
 
 namespace ssx {
 
-class thread_worker;
+class singleton_thread_worker;
 
 } // namespace ssx
diff --git a/src/v/ssx/tests/thread_worker.cc b/src/v/ssx/tests/thread_worker.cc
index 95d1d7736c3ca..69d08a5075273 100644
--- a/src/v/ssx/tests/thread_worker.cc
+++ b/src/v/ssx/tests/thread_worker.cc
@@ -34,7 +34,7 @@
 template
 auto thread_worker_test() {
     BOOST_REQUIRE_GT(ss::smp::count, 1);
-    auto w = ssx::thread_worker{};
+    auto w = ssx::singleton_thread_worker{{}};
     w.start().get();
 
     std::vector>> all_results(ss::smp::count);
diff --git a/src/v/ssx/tests/thread_worker_bench.cc b/src/v/ssx/tests/thread_worker_bench.cc
index e3167eda103a3..409ce97652da6 100644
--- a/src/v/ssx/tests/thread_worker_bench.cc
+++ b/src/v/ssx/tests/thread_worker_bench.cc
@@ -13,7 +13,7 @@
 #include 
 
 ss::future<> run_test(size_t data_size) {
-    auto w = ssx::thread_worker{};
+    auto w = ssx::singleton_thread_worker{{}};
     co_await w.start();
 
     std::vector> vec;
diff --git a/src/v/ssx/thread_worker.h b/src/v/ssx/thread_worker.h
index 357cb10dfaf5a..838d003404172 100644
--- a/src/v/ssx/thread_worker.h
+++ b/src/v/ssx/thread_worker.h
@@ -21,19 +21,39 @@
 #include 
 #include 
 #include 
+#include 
+#include 
 
 #include 
+#include 
 
 #include 
 #include 
 #include 
+#include 
+#include 
 
 #include 
 #include 
 
 namespace ssx {
 
+/**
+ * configuration for a single thread_worker
+ */
+struct thread_worker_config {
+    // If the thread should be pinned to the same core as the shard it was
+    // created on.
+    bool pin_to_shard_core = false;
+    // NOTE: pthread names must be less than 12 bytes
+    //
+    // Why 12 bytes? pthread only supports names of length 16 and we want to
+    // suffix thread names with the shard they're associated with via `_xxx`.
+    ss::sstring name = "thread";
+};
+
 namespace impl {
 
 class task_base {
@@ -44,13 +64,36 @@ class task_base {
     task_base& operator=(task_base&&) = delete;
     task_base& operator=(task_base const&) = delete;
 
-    virtual void process(ss::alien::instance&, ss::shard_id) = 0;
+    virtual ss::stop_iteration process(ss::alien::instance&, ss::shard_id) = 0;
     virtual void
     set_exception(ss::alien::instance&, ss::shard_id, std::exception_ptr) = 0;
 
     virtual ~task_base() = default;
 };
 
+class stop_task final : public task_base {
+public:
+    ss::future<> get_future() noexcept { return _promise.get_future(); }
+
+    ss::stop_iteration
+    process(ss::alien::instance& alien, ss::shard_id shard) final {
+        ss::alien::run_on(
+          alien, shard, [this]() noexcept { _promise.set_value(); });
+        return ss::stop_iteration::yes;
+    }
+
+    void set_exception(
+      ss::alien::instance& alien,
+      ss::shard_id shard,
+      std::exception_ptr p) final {
+        ss::alien::run_on(
+          alien, shard, [this, p]() noexcept { _promise.set_exception(p); });
+    }
+
+private:
+    ss::promise<> _promise;
+};
+
 template<typename Func>
 class worker_task final : public task_base {
     using value_type = std::invoke_result_t<Func>;
@@ -63,7 +106,8 @@
         return _promise.get_future();
     }
 
-    void process(ss::alien::instance& alien, ss::shard_id shard) final {
+    ss::stop_iteration
+    process(ss::alien::instance& alien, ss::shard_id shard) final {
         try {
             if constexpr (std::is_void_v<value_type>) {
                 _func();
@@ -79,6 +123,7 @@
         } catch (...) {
             set_exception(alien, shard, std::current_exception());
         };
+        return ss::stop_iteration::no;
     }
 
     void set_exception(
@@ -95,13 +140,11 @@
 };
 
 class thread_worker {
-    static constexpr eventfd_t RUNNABLE_INIT = 0;
-    static constexpr eventfd_t RUNNABLE_READY = 1;
-    static constexpr eventfd_t RUNNABLE_STOP = 2;
-
 public:
-    thread_worker()
-      : _alien{ss::engine().alien()}
+    explicit thread_worker(thread_worker_config* config)
+      : _alien(&ss::engine().alien())
+      , _config(config)
+      , _cpu_id(::sched_getcpu())
       , _shard_id(ss::this_shard_id()) {}
 
     void start() {
@@ -110,73 +153,111 @@
     ss::future<> stop() {
         co_await _gate.close();
-        _ready.signal(RUNNABLE_STOP);
+        auto task = impl::stop_task();
+        vassert(
+          _queue.push(&task),
+          "expected to be able to push a task onto the queue");
+        _ready.signal(1);
+        co_await task.get_future();
         if (_worker.joinable()) {
             _worker.join();
         }
     }
 
     template<typename Func>
-    auto submit(Func func) {
+    auto submit(Func func) ->
+      typename ss::futurize<std::invoke_result_t<Func>>::type {
         auto gh = _gate.hold();
-        return _mutex.get_units().then(
-          [this, func{std::move(func)}, gh{std::move(gh)}](auto units) mutable {
-              vassert(_task == nullptr, "Only one task supported at a time");
-              auto task = std::make_unique<worker_task<Func>>(
-                std::move(func));
-              auto f = task->get_future().finally(
-                [this, gh{std::move(gh)}, units{std::move(units)}] {
-                    _task.reset();
-                });
-              _task = std::move(task);
-              _ready.signal(RUNNABLE_READY);
-              return f;
-          });
+        auto units = co_await ss::get_units(_semaphore, 1);
+        auto task = impl::worker_task(std::move(func));
+        vassert(
+          _queue.push(&task),
+          "expected to be able to push a task onto the queue");
+        _ready.signal(1);
+        co_return co_await task.get_future();
     }
 
 private:
     void configure_thread() {
-        ::pthread_setname_np(::pthread_self(), "ssx::thread_worker");
+        auto name = ss::format("{}_{}", _config->name.c_str(), _shard_id);
+        ss::throw_pthread_error(
+          ::pthread_setname_np(::pthread_self(), name.c_str()));
+        if (_config->pin_to_shard_core) {
+            cpu_set_t cs;
+            CPU_ZERO(&cs);
+            CPU_SET(_cpu_id, &cs);
+            ss::throw_pthread_error(
+              pthread_setaffinity_np(pthread_self(), sizeof(cs), &cs));
+        }
         // Ignore all signals - let seastar handle them
         sigset_t mask;
         ::sigfillset(&mask);
         ss::throw_pthread_error(::pthread_sigmask(SIG_BLOCK, &mask, nullptr));
     }
 
-    size_t run() {
+    void run() {
         configure_thread();
+        std::array<task_base*, max_length> items{};
         while (true) {
            eventfd_t result = 0;
            auto r = ::eventfd_read(_ready.get_read_fd(), &result);
-            if (r < 0 || result != RUNNABLE_READY) {
-                if (_task) {
-                    _task->set_exception(
-                      _alien,
-                      _shard_id,
-                      std::make_exception_ptr(ss::abort_requested_exception{}));
+            while (!_queue.empty()) {
+                size_t num_tasks = _queue.pop(items.data(), items.size());
+                if (process_tasks({items.data(), num_tasks})) {
+                    // stop was requested, we can be done
+                    vassert(
+                      _queue.empty(),
+                      "expected a stop task to be the last task in the task "
+                      "queue, but there were {} available to process.",
+                      _queue.read_available());
+                    return;
                 }
             }
            if (r < 0) {
-                return r;
-            } else if (result == RUNNABLE_STOP) {
-                return 0;
+                return;
+            }
+        }
+    }
+
+    ss::stop_iteration process_tasks(std::span<task_base*> tasks) {
+        for (size_t i = 0; i < tasks.size(); ++i) {
+            if (tasks[i]->process(*_alien, _shard_id)) {
+                fail_tasks(tasks.subspan(i + 1));
+                return ss::stop_iteration::yes;
             }
-            _task->process(_alien, _shard_id);
         }
+        return ss::stop_iteration::no;
     }
 
-    ss::alien::instance& _alien;
+    void fail_tasks(std::span<task_base*> tasks) {
+        for (auto* task : tasks) {
+            task->set_exception(
+              *_alien,
+              _shard_id,
+              std::make_exception_ptr(ss::abort_requested_exception{}));
+        }
+    }
+
+    ss::alien::instance* _alien;
+    thread_worker_config* _config;
+    unsigned _cpu_id;
     ss::shard_id _shard_id;
     std::thread _worker;
     ss::gate _gate;
-    mutex _mutex;
-    seastar::writeable_eventfd _ready{RUNNABLE_INIT};
-    std::unique_ptr<task_base> _task{nullptr};
+    ss::writeable_eventfd _ready;
+
+    constexpr static size_t max_length = 128;
+    ss::semaphore _semaphore = ss::semaphore(max_length);
+    boost::lockfree::
+      spsc_queue<task_base*, boost::lockfree::capacity<max_length>>
+        _queue;
 };
 
 } // namespace impl
 
 /**
- * thread_worker runs tasks in a std::thread.
+ * singleton_thread_worker runs tasks in a std::thread. It is expected that a
+ * single shard manages the lifetime of this worker and that all other shards
+ * submit their requests through that shard.
  *
  * By running in a std::thread, it's possible to make blocking calls such as
  * file I/O and posix thread primitives without blocking a reactor.
@@ -186,10 +267,11 @@
  * class is most suited to run for the lifetime of an application, rather than
  * short-lived.
  */
-class thread_worker {
+class singleton_thread_worker {
 public:
     static constexpr ss::shard_id shard_id = 0;
-    thread_worker() = default;
+    explicit singleton_thread_worker(thread_worker_config config)
+      : _config(std::move(config)) {}
 
     /**
      * start the background thread.
@@ -237,7 +319,52 @@
 
 private:
     ss::sharded _gate;
-    impl::thread_worker _impl;
+    thread_worker_config _config;
+    impl::thread_worker _impl = impl::thread_worker(&_config);
+};
+
+/**
+ * sharded_thread_worker runs tasks in std::threads, creating one worker thread
+ * for each reactor core so that every reactor core has a twin std::thread.
+ *
+ * By running in a std::thread, it's possible to make blocking calls such as
+ * file I/O and posix thread primitives without blocking a reactor.
+ *
+ * The thread worker will drain all operations before joining the thread in
+ * stop(), but it should be noted that joining a thread may block. As such, this
+ * class is most suited to run for the lifetime of an application, rather than
+ * short-lived.
+ */
+class sharded_thread_worker {
+public:
+    explicit sharded_thread_worker(thread_worker_config config)
+      : _config(std::move(config)) {}
+
+    /**
+     * start the background threads.
+     */
+    ss::future<> start() { co_await _impl.start(&_config); }
+
+    /**
+     * stop and join the background threads.
+     *
+     * Although the work has completed, it should be noted that joining a
+     * thread may block the reactor.
+     */
+    ss::future<> stop() { co_await _impl.stop(); }
+
+    /**
+     * submit a task to the background thread of the calling shard.
+     */
+    template<typename Func>
+    auto submit(Func func) ->
+      typename ss::futurize<std::invoke_result_t<Func>>::type {
+        return _impl.local().submit(std::move(func));
+    }
+
+private:
+    thread_worker_config _config;
+    ss::sharded<impl::thread_worker> _impl;
 };
 
 } // namespace ssx
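
Usage sketch (reviewer note, not part of the patch): how calling code might drive the two worker types introduced here. This is a minimal illustration under a few assumptions: the function names, the `example` namespace, and the use of `::gethostname`/`::getpid` as stand-ins for blocking work are all hypothetical; it also assumes the public `submit()` forwarding that the pre-rename thread_worker already exposed is still present on singleton_thread_worker, and that the usual `ss` alias for the seastar namespace is in scope via this header.

// Illustrative sketch only, not part of the patch.
#include "ssx/thread_worker.h"

#include <seastar/core/coroutine.hh>
#include <seastar/core/sstring.hh>

#include <unistd.h> // ::gethostname / ::getpid used as example blocking calls

namespace example { // hypothetical namespace for this sketch

// Run a blocking libc call off-reactor via the singleton worker.
ss::future<ss::sstring> blocking_hostname(ssx::singleton_thread_worker& worker) {
    co_return co_await worker.submit([] {
        char buf[256]{};
        ::gethostname(buf, sizeof(buf)); // safe to block: runs on the std::thread
        return ss::sstring(buf);
    });
}

// Lifecycle of the per-core variant: one std::thread twin per reactor core.
ss::future<> run_sharded_example() {
    ssx::sharded_thread_worker workers(
      ssx::thread_worker_config{.pin_to_shard_core = true, .name = "blocking"});
    co_await workers.start();
    // submit() dispatches to the std::thread twin of the calling shard.
    auto pid = co_await workers.submit([] { return ::getpid(); });
    (void)pid;
    co_await workers.stop();
}

} // namespace example

Note that the configured name stays well under the 12-byte limit the config comment calls out, leaving room for the per-shard suffix.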
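For reviewers unfamiliar with the queueing change, the sketch below isolates the handoff pattern impl::thread_worker now uses: producers push raw task pointers into a bounded lock-free queue and signal an eventfd, and the worker thread wakes, drains the queue, and exits when it sees a stop task. It is a deliberate simplification (single producer, plain std::thread, no Seastar, no semaphore backpressure), not the patch's implementation.

// Standalone simplification of the eventfd + spsc_queue handoff.
#include <boost/lockfree/policies.hpp>
#include <boost/lockfree/spsc_queue.hpp>

#include <sys/eventfd.h>
#include <unistd.h>

#include <functional>
#include <thread>

int main() {
    boost::lockfree::
      spsc_queue<std::function<bool()>*, boost::lockfree::capacity<128>>
        queue;
    int efd = ::eventfd(0, 0);

    std::thread worker([&] {
        while (true) {
            eventfd_t value = 0;
            ::eventfd_read(efd, &value); // block until a producer signals
            std::function<bool()>* task = nullptr;
            while (queue.pop(task)) {
                if ((*task)()) { // a task returning true means "stop"
                    return;
                }
            }
        }
    });

    std::function<bool()> work = [] { return false; }; // ordinary task
    std::function<bool()> stop = [] { return true; };  // stop task
    queue.push(&work);
    queue.push(&stop);
    ::eventfd_write(efd, 1); // one wakeup covers all queued tasks

    worker.join();
    ::close(efd);
    return 0;
}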