From ac790563b17cdc50223ce88de23a4379b1ab6c34 Mon Sep 17 00:00:00 2001 From: Yubin Ruan Date: Fri, 22 Mar 2024 14:42:23 +0800 Subject: [PATCH] Make async_logger::flush() synchronous and wait for the flush to complete --- include/spdlog/async_logger-inl.h | 12 ++++--- include/spdlog/details/thread_pool-inl.h | 11 ++++-- include/spdlog/details/thread_pool.h | 19 +++++++++-- tests/test_async.cpp | 43 ++++++++++++++++++++++++ 4 files changed, 74 insertions(+), 11 deletions(-) diff --git a/include/spdlog/async_logger-inl.h b/include/spdlog/async_logger-inl.h index 1e7947985..499800d88 100644 --- a/include/spdlog/async_logger-inl.h +++ b/include/spdlog/async_logger-inl.h @@ -43,13 +43,15 @@ SPDLOG_LOGGER_CATCH(msg.source) } // send flush request to the thread pool -SPDLOG_INLINE void spdlog::async_logger::flush_(){ - SPDLOG_TRY{if (auto pool_ptr = thread_pool_.lock()){ - pool_ptr->post_flush(shared_from_this(), overflow_policy_); -} -else { +SPDLOG_INLINE void spdlog::async_logger::flush_(){SPDLOG_TRY{auto pool_ptr = thread_pool_.lock(); +if (!pool_ptr) { throw_spdlog_ex("async flush: thread pool doesn't exist anymore"); } + +std::future future = pool_ptr->post_flush(shared_from_this(), overflow_policy_); +// Wait for the flush operation to complete. +// This might throw exception if the flush message get dropped because of overflow. +future.get(); } SPDLOG_LOGGER_CATCH(source_loc()) } diff --git a/include/spdlog/details/thread_pool-inl.h b/include/spdlog/details/thread_pool-inl.h index 17e01c092..ccc1dc971 100644 --- a/include/spdlog/details/thread_pool-inl.h +++ b/include/spdlog/details/thread_pool-inl.h @@ -62,9 +62,13 @@ void SPDLOG_INLINE thread_pool::post_log(async_logger_ptr &&worker_ptr, post_async_msg_(std::move(async_m), overflow_policy); } -void SPDLOG_INLINE thread_pool::post_flush(async_logger_ptr &&worker_ptr, - async_overflow_policy overflow_policy) { - post_async_msg_(async_msg(std::move(worker_ptr), async_msg_type::flush), overflow_policy); +std::future SPDLOG_INLINE thread_pool::post_flush(async_logger_ptr &&worker_ptr, + async_overflow_policy overflow_policy) { + std::promise promise; + std::future future = promise.get_future(); + post_async_msg_(async_msg(std::move(worker_ptr), async_msg_type::flush, std::move(promise)), + overflow_policy); + return future; } size_t SPDLOG_INLINE thread_pool::overrun_counter() { return q_.overrun_counter(); } @@ -108,6 +112,7 @@ bool SPDLOG_INLINE thread_pool::process_next_msg_() { } case async_msg_type::flush: { incoming_async_msg.worker_ptr->backend_flush_(); + incoming_async_msg.flush_promise.set_value(); return true; } diff --git a/include/spdlog/details/thread_pool.h b/include/spdlog/details/thread_pool.h index f22b07821..286352e31 100644 --- a/include/spdlog/details/thread_pool.h +++ b/include/spdlog/details/thread_pool.h @@ -9,6 +9,7 @@ #include #include +#include #include #include #include @@ -27,6 +28,7 @@ enum class async_msg_type { log, flush, terminate }; struct async_msg : log_msg_buffer { async_msg_type msg_type{async_msg_type::log}; async_logger_ptr worker_ptr; + std::promise flush_promise; async_msg() = default; ~async_msg() = default; @@ -56,12 +58,22 @@ struct async_msg : log_msg_buffer { async_msg(async_logger_ptr &&worker, async_msg_type the_type, const details::log_msg &m) : log_msg_buffer{m}, msg_type{the_type}, - worker_ptr{std::move(worker)} {} + worker_ptr{std::move(worker)}, + flush_promise{} {} async_msg(async_logger_ptr &&worker, async_msg_type the_type) : log_msg_buffer{}, msg_type{the_type}, - worker_ptr{std::move(worker)} {} + worker_ptr{std::move(worker)}, + flush_promise{} {} + + async_msg(async_logger_ptr &&worker, + async_msg_type the_type, + std::promise &&promise) + : log_msg_buffer{}, + msg_type{the_type}, + worker_ptr{std::move(worker)}, + flush_promise{std::move(promise)} {} explicit async_msg(async_msg_type the_type) : async_msg{nullptr, the_type} {} @@ -88,7 +100,8 @@ class SPDLOG_API thread_pool { void post_log(async_logger_ptr &&worker_ptr, const details::log_msg &msg, async_overflow_policy overflow_policy); - void post_flush(async_logger_ptr &&worker_ptr, async_overflow_policy overflow_policy); + std::future post_flush(async_logger_ptr &&worker_ptr, + async_overflow_policy overflow_policy); size_t overrun_counter(); void reset_overrun_counter(); size_t discard_counter(); diff --git a/tests/test_async.cpp b/tests/test_async.cpp index 76fdd7c6b..0782c274e 100644 --- a/tests/test_async.cpp +++ b/tests/test_async.cpp @@ -93,6 +93,49 @@ TEST_CASE("flush", "[async]") { REQUIRE(test_sink->flush_counter() == 1); } +TEST_CASE("multithread flush", "[async]") { + auto test_sink = std::make_shared(); + size_t queue_size = 2; + size_t messages = 10; + size_t n_threads = 10; + size_t flush_count = 1024; + std::mutex mtx; + std::vector errmsgs; + { + auto tp = std::make_shared(queue_size, 1); + auto logger = std::make_shared( + "as", test_sink, tp, spdlog::async_overflow_policy::discard_new); + + logger->set_error_handler([&](const std::string &) { + std::unique_lock lock(mtx); + errmsgs.push_back("Broken promise"); + }); + + for (size_t i = 0; i < messages; i++) { + logger->info("Hello message #{}", i); + } + + std::vector threads; + for (size_t i = 0; i < n_threads; i++) { + threads.emplace_back([logger, flush_count] { + for (size_t j = 0; j < flush_count; j++) { + // flush does not throw exception even if failed. + // Instead, the error handler is invoked. + logger->flush(); + } + }); + } + + for (auto &t : threads) { + t.join(); + } + } + REQUIRE(test_sink->flush_counter() >= 1); + REQUIRE(test_sink->flush_counter() + errmsgs.size() == n_threads * flush_count); + REQUIRE(errmsgs.size() >= 1); + REQUIRE(errmsgs[0] == "Broken promise"); +} + TEST_CASE("async periodic flush", "[async]") { auto logger = spdlog::create_async("as"); auto test_sink = std::static_pointer_cast(logger->sinks()[0]);