From 73c24a3d7cdc7c3337dc29380a462a8e7f5b1d8f Mon Sep 17 00:00:00 2001 From: "Alexander A. Klimov" Date: Wed, 11 Dec 2024 16:14:36 +0100 Subject: [PATCH 1/3] Atomic: allow changing the default of std::memory_order_seq_cst --- lib/base/atomic.hpp | 116 +++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 115 insertions(+), 1 deletion(-) diff --git a/lib/base/atomic.hpp b/lib/base/atomic.hpp index 855850336f..c64fc4d618 100644 --- a/lib/base/atomic.hpp +++ b/lib/base/atomic.hpp @@ -19,9 +19,11 @@ namespace icinga * initialization by std::atomic_init, see LWG issue 2334." * -- https://en.cppreference.com/w/cpp/atomic/atomic/atomic * + * Also, the second template parameter allows to specify the default memory order once for all operations. + * * @ingroup base */ -template +template class Atomic : public std::atomic { public: /** @@ -32,6 +34,118 @@ class Atomic : public std::atomic { inline Atomic(T desired) : std::atomic(desired) { } + + // The following methods have an argument with a default of std::memory_order_seq_cst hardcoded in the base class. + // Hence, we need to override them here to allow for a different default memory order. + + void store(T desired, std::memory_order mo = m) noexcept + { + std::atomic::store(desired, mo); + } + + T load(std::memory_order mo = m) const noexcept + { + return std::atomic::load(mo); + } + + T exchange(T desired, std::memory_order mo = m) noexcept + { + return std::atomic::exchange(desired, mo); + } + + bool compare_exchange_weak(T& expected, T desired, std::memory_order mo = m) noexcept + { + return std::atomic::compare_exchange_weak(expected, desired, mo); + } + + bool compare_exchange_strong(T& expected, T desired, std::memory_order mo = m) noexcept + { + return std::atomic::compare_exchange_strong(expected, desired, mo); + } + + T fetch_add(T delta, std::memory_order mo = m) noexcept + { + return std::atomic::fetch_add(delta, mo); + } + + T fetch_sub(T delta, std::memory_order mo = m) noexcept + { + return std::atomic::fetch_sub(delta, mo); + } + + T fetch_and(T mask, std::memory_order mo = m) noexcept + { + return std::atomic::fetch_and(mask, mo); + } + + T fetch_or(T mask, std::memory_order mo = m) noexcept + { + return std::atomic::fetch_or(mask, mo); + } + + T fetch_xor(T mask, std::memory_order mo = m) noexcept + { + return std::atomic::fetch_xor(mask, mo); + } + + // The following operators call non-virtual methods we have overridden above. + // Hence, we need to override them here as well to allow for a different default memory order. + + T operator=(T desired) noexcept + { + store(desired); + return desired; + } + + operator T() const noexcept + { + return load(); + } + + T operator+=(T delta) noexcept + { + return fetch_add(delta) + delta; + } + + T operator-=(T delta) noexcept + { + return fetch_sub(delta) - delta; + } + + T operator++() noexcept + { + return *this += 1; + } + + T operator++(int) noexcept + { + return fetch_add(1); + } + + T operator--() noexcept + { + return *this -= 1; + } + + T operator--(int) noexcept + { + return fetch_sub(1); + } + + T operator&=(T mask) noexcept + { + return fetch_and(mask) & mask; + } + + T operator|=(T mask) noexcept + { + return fetch_or(mask) | mask; + } + + T operator^=(T mask) noexcept + { + return fetch_xor(mask) ^ mask; + } }; /** From 8228f8cc820caa999996b0e887a895c9bdda3057 Mon Sep 17 00:00:00 2001 From: "Alexander A. Klimov" Date: Wed, 11 Dec 2024 16:57:50 +0100 Subject: [PATCH 2/3] Count still queued messages per JsonRpcConnection --- lib/remote/jsonrpcconnection.cpp | 3 +++ lib/remote/jsonrpcconnection.hpp | 6 ++++++ 2 files changed, 9 insertions(+) diff --git a/lib/remote/jsonrpcconnection.cpp b/lib/remote/jsonrpcconnection.cpp index c70b82d94e..998b846ae8 100644 --- a/lib/remote/jsonrpcconnection.cpp +++ b/lib/remote/jsonrpcconnection.cpp @@ -154,6 +154,7 @@ void JsonRpcConnection::WriteOutgoingMessages(boost::asio::yield_context yc) } size_t bytesSent = JsonRpc::SendRawMessage(m_Stream, message, yc); + m_PendingOutgoingMessages--; if (m_Endpoint) { m_Endpoint->AddMessageSent(bytesSent); @@ -230,6 +231,7 @@ void JsonRpcConnection::SendRawMessage(const String& message) m_OutgoingMessagesQueue.emplace_back(message); m_OutgoingMessagesQueued.Set(); + m_PendingOutgoingMessages++; }); } @@ -241,6 +243,7 @@ void JsonRpcConnection::SendMessageInternal(const Dictionary::Ptr& message) m_OutgoingMessagesQueue.emplace_back(JsonEncode(message)); m_OutgoingMessagesQueued.Set(); + m_PendingOutgoingMessages++; } void JsonRpcConnection::Disconnect() diff --git a/lib/remote/jsonrpcconnection.hpp b/lib/remote/jsonrpcconnection.hpp index ef83dce1b4..5836e120be 100644 --- a/lib/remote/jsonrpcconnection.hpp +++ b/lib/remote/jsonrpcconnection.hpp @@ -54,6 +54,11 @@ class JsonRpcConnection final : public Object Shared::Ptr GetStream() const; ConnectionRole GetRole() const; + auto GetPendingOutgoingMessages() const noexcept + { + return m_PendingOutgoingMessages.load(); + } + void Disconnect(); void SendMessage(const Dictionary::Ptr& request); @@ -76,6 +81,7 @@ class JsonRpcConnection final : public Object boost::asio::io_context::strand m_IoStrand; std::vector m_OutgoingMessagesQueue; AsioConditionVariable m_OutgoingMessagesQueued; + Atomic m_PendingOutgoingMessages {0}; AsioConditionVariable m_WriterDone; Atomic m_ShuttingDown; boost::asio::deadline_timer m_CheckLivenessTimer, m_HeartbeatTimer; From f722f8bfbb62933c82cbc28333b9c012b5be8395 Mon Sep 17 00:00:00 2001 From: "Alexander A. Klimov" Date: Wed, 11 Dec 2024 17:21:08 +0100 Subject: [PATCH 3/3] Introduce Endpoint#pending_outgoing_messages for the API --- lib/methods/clusterzonechecktask.cpp | 3 +++ lib/methods/icingachecktask.cpp | 3 +++ lib/remote/endpoint.cpp | 12 ++++++++++++ lib/remote/endpoint.hpp | 1 + lib/remote/endpoint.ti | 4 ++++ 5 files changed, 23 insertions(+) diff --git a/lib/methods/clusterzonechecktask.cpp b/lib/methods/clusterzonechecktask.cpp index fd52534c30..404f50dafe 100644 --- a/lib/methods/clusterzonechecktask.cpp +++ b/lib/methods/clusterzonechecktask.cpp @@ -133,6 +133,7 @@ void ClusterZoneCheckTask::ScriptFunc(const Checkable::Ptr& checkable, const Che double lastMessageSent = 0; double lastMessageReceived = 0; + uint_fast64_t pendingOutgoingMessages = 0; double messagesSentPerSecond = 0; double messagesReceivedPerSecond = 0; double bytesSentPerSecond = 0; @@ -156,6 +157,7 @@ void ClusterZoneCheckTask::ScriptFunc(const Checkable::Ptr& checkable, const Che if (endpoint->GetLastMessageReceived() > lastMessageReceived) lastMessageReceived = endpoint->GetLastMessageReceived(); + pendingOutgoingMessages += endpoint->GetPendingOutgoingMessages(); messagesSentPerSecond += endpoint->GetMessagesSentPerSecond(); messagesReceivedPerSecond += endpoint->GetMessagesReceivedPerSecond(); bytesSentPerSecond += endpoint->GetBytesSentPerSecond(); @@ -207,6 +209,7 @@ void ClusterZoneCheckTask::ScriptFunc(const Checkable::Ptr& checkable, const Che new PerfdataValue("slave_lag", zoneLag, false, "s", lagWarning, lagCritical), new PerfdataValue("last_messages_sent", lastMessageSent), new PerfdataValue("last_messages_received", lastMessageReceived), + new PerfdataValue("sum_pending_outgoing_messages", pendingOutgoingMessages), new PerfdataValue("sum_messages_sent_per_second", messagesSentPerSecond), new PerfdataValue("sum_messages_received_per_second", messagesReceivedPerSecond), new PerfdataValue("sum_bytes_sent_per_second", bytesSentPerSecond), diff --git a/lib/methods/icingachecktask.cpp b/lib/methods/icingachecktask.cpp index d3eae1f33f..5e66fdd310 100644 --- a/lib/methods/icingachecktask.cpp +++ b/lib/methods/icingachecktask.cpp @@ -123,6 +123,7 @@ void IcingaCheckTask::ScriptFunc(const Checkable::Ptr& checkable, const CheckRes double lastMessageSent = 0; double lastMessageReceived = 0; + uint_fast64_t pendingOutgoingMessages = 0; double messagesSentPerSecond = 0; double messagesReceivedPerSecond = 0; double bytesSentPerSecond = 0; @@ -136,6 +137,7 @@ void IcingaCheckTask::ScriptFunc(const Checkable::Ptr& checkable, const CheckRes if (endpoint->GetLastMessageReceived() > lastMessageReceived) lastMessageReceived = endpoint->GetLastMessageReceived(); + pendingOutgoingMessages += endpoint->GetPendingOutgoingMessages(); messagesSentPerSecond += endpoint->GetMessagesSentPerSecond(); messagesReceivedPerSecond += endpoint->GetMessagesReceivedPerSecond(); bytesSentPerSecond += endpoint->GetBytesSentPerSecond(); @@ -144,6 +146,7 @@ void IcingaCheckTask::ScriptFunc(const Checkable::Ptr& checkable, const CheckRes perfdata->Add(new PerfdataValue("last_messages_sent", lastMessageSent)); perfdata->Add(new PerfdataValue("last_messages_received", lastMessageReceived)); + perfdata->Add(new PerfdataValue("sum_pending_outgoing_messages", pendingOutgoingMessages)); perfdata->Add(new PerfdataValue("sum_messages_sent_per_second", messagesSentPerSecond)); perfdata->Add(new PerfdataValue("sum_messages_received_per_second", messagesReceivedPerSecond)); perfdata->Add(new PerfdataValue("sum_bytes_sent_per_second", bytesSentPerSecond)); diff --git a/lib/remote/endpoint.cpp b/lib/remote/endpoint.cpp index e534fc1784..fe6e099577 100644 --- a/lib/remote/endpoint.cpp +++ b/lib/remote/endpoint.cpp @@ -101,6 +101,18 @@ Endpoint::Ptr Endpoint::GetLocalEndpoint() return listener->GetLocalEndpoint(); } +uint_fast64_t Endpoint::GetPendingOutgoingMessages() const +{ + uint_fast64_t pending = 0; + std::unique_lock lock (m_ClientsLock); + + for (auto& client : m_Clients) { + pending += client->GetPendingOutgoingMessages(); + } + + return pending; +} + void Endpoint::AddMessageSent(int bytes) { double time = Utility::GetTime(); diff --git a/lib/remote/endpoint.hpp b/lib/remote/endpoint.hpp index d641c2c6b8..942ea42898 100644 --- a/lib/remote/endpoint.hpp +++ b/lib/remote/endpoint.hpp @@ -39,6 +39,7 @@ class Endpoint final : public ObjectImpl static Endpoint::Ptr GetLocalEndpoint(); void SetCachedZone(const intrusive_ptr& zone); + uint_fast64_t GetPendingOutgoingMessages() const override; void AddMessageSent(int bytes); void AddMessageReceived(int bytes); diff --git a/lib/remote/endpoint.ti b/lib/remote/endpoint.ti index 78551ecf0d..1a7c19dee0 100644 --- a/lib/remote/endpoint.ti +++ b/lib/remote/endpoint.ti @@ -39,6 +39,10 @@ class Endpoint : ConfigObject Timestamp last_message_sent; Timestamp last_message_received; + [no_user_modify, no_storage] uint_fast64_t pending_outgoing_messages { + get; + }; + [no_user_modify, no_storage] double messages_sent_per_second { get; };