Skip to content

Commit

Permalink
Merge pull request #13056 from rockwotj/optimize-thread-worker
Browse files Browse the repository at this point in the history
Optimize thread_worker
  • Loading branch information
rockwotj authored Aug 31, 2023
2 parents ea21f08 + 1edf4c8 commit 603c9bc
Show file tree
Hide file tree
Showing 10 changed files with 209 additions and 62 deletions.
2 changes: 1 addition & 1 deletion src/v/kafka/server/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ server::server(
ss::sharded<cluster::tx_gateway_frontend>& tx_gateway_frontend,
ss::sharded<cluster::tx_registry_frontend>& tx_registry_frontend,
std::optional<qdc_monitor::config> qdc_config,
ssx::thread_worker& tw,
ssx::singleton_thread_worker& tw,
const std::unique_ptr<pandaproxy::schema_registry::api>& sr) noexcept
: net::server(cfg, klog)
, _smp_group(smp)
Expand Down
6 changes: 3 additions & 3 deletions src/v/kafka/server/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ class server final
ss::sharded<cluster::tx_gateway_frontend>&,
ss::sharded<cluster::tx_registry_frontend>&,
std::optional<qdc_monitor::config>,
ssx::thread_worker&,
ssx::singleton_thread_worker&,
const std::unique_ptr<pandaproxy::schema_registry::api>&) noexcept;

~server() noexcept override = default;
Expand Down Expand Up @@ -161,7 +161,7 @@ class server final

latency_probe& latency_probe() { return *_probe; }

ssx::thread_worker& thread_worker() { return _thread_worker; }
ssx::singleton_thread_worker& thread_worker() { return _thread_worker; }

const std::unique_ptr<pandaproxy::schema_registry::api>& schema_registry() {
return _schema_registry;
Expand Down Expand Up @@ -222,7 +222,7 @@ class server final
ssx::metrics::metric_groups _metrics
= ssx::metrics::metric_groups::make_internal();
std::unique_ptr<class latency_probe> _probe;
ssx::thread_worker& _thread_worker;
ssx::singleton_thread_worker& _thread_worker;
std::unique_ptr<replica_selector> _replica_selector;
const std::unique_ptr<pandaproxy::schema_registry::api>& _schema_registry;
};
Expand Down
2 changes: 1 addition & 1 deletion src/v/redpanda/application.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2136,7 +2136,7 @@ void application::start_runtime_services(
});
});

thread_worker->start().get();
thread_worker->start({.name = "worker"}).get();

// single instance
node_status_backend.invoke_on_all(&cluster::node_status_backend::start)
Expand Down
2 changes: 1 addition & 1 deletion src/v/redpanda/application.h
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ class application {

std::unique_ptr<cluster::controller> controller;

std::unique_ptr<ssx::thread_worker> thread_worker;
std::unique_ptr<ssx::singleton_thread_worker> thread_worker;

ss::sharded<kafka::server> _kafka_server;

Expand Down
2 changes: 1 addition & 1 deletion src/v/security/gssapi_authenticator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ class gssapi_authenticator::impl {
};

gssapi_authenticator::gssapi_authenticator(
ssx::thread_worker& thread_worker,
ssx::singleton_thread_worker& thread_worker,
std::vector<gssapi_rule> rules,
ss::sstring principal,
ss::sstring keytab)
Expand Down
4 changes: 2 additions & 2 deletions src/v/security/gssapi_authenticator.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ class gssapi_authenticator final : public sasl_mechanism {
static constexpr const char* name = "GSSAPI";

gssapi_authenticator(
ssx::thread_worker& thread_worker,
ssx::singleton_thread_worker& thread_worker,
std::vector<gssapi_rule> rules,
ss::sstring principal,
ss::sstring keytab);
Expand All @@ -40,7 +40,7 @@ class gssapi_authenticator final : public sasl_mechanism {
friend std::ostream&
operator<<(std::ostream& os, gssapi_authenticator::state const s);

ssx::thread_worker& _worker;
ssx::singleton_thread_worker& _worker;
security::acl_principal _principal;
state _state{state::init};
class impl;
Expand Down
2 changes: 1 addition & 1 deletion src/v/ssx/fwd.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,6 @@

namespace ssx {

class thread_worker;
class singleton_thread_worker;

} // namespace ssx
11 changes: 9 additions & 2 deletions src/v/ssx/tests/thread_worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ template<size_t tries, size_t stop_at>
auto thread_worker_test() {
BOOST_REQUIRE_GT(ss::smp::count, 1);

auto w = ssx::thread_worker{};
w.start().get();
auto w = ssx::singleton_thread_worker{};
w.start({}).get();

std::vector<std::vector<ss::future<move_only>>> all_results(ss::smp::count);

Expand Down Expand Up @@ -73,6 +73,13 @@ auto thread_worker_test() {
}
}

SEASTAR_THREAD_TEST_CASE(thread_worker_can_be_stopped_before_its_started) {
// During application startup, we register a defer for stopping before the
// worker is started.
auto w = ssx::singleton_thread_worker{};
w.stop().get();
}

SEASTAR_THREAD_TEST_CASE(thread_worker_single_cancel_after) {
// cancel thread worker at the after all of the gets
thread_worker_test<1, 1>();
Expand Down
4 changes: 2 additions & 2 deletions src/v/ssx/tests/thread_worker_bench.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@
#include <seastar/testing/perf_tests.hh>

ss::future<> run_test(size_t data_size) {
auto w = ssx::thread_worker{};
co_await w.start();
auto w = ssx::singleton_thread_worker{};
co_await w.start({});

std::vector<ss::future<size_t>> vec;
vec.reserve(data_size);
Expand Down
Loading

0 comments on commit 603c9bc

Please sign in to comment.