Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce Endpoint#pending_outgoing_messages #10271

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 115 additions & 1 deletion lib/base/atomic.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@ namespace icinga
* initialization by std::atomic_init, see LWG issue 2334."
* -- https://en.cppreference.com/w/cpp/atomic/atomic/atomic
*
* Also, the second template parameter allows to specify the default memory order once for all operations.
*
* @ingroup base
*/
template<class T>
template<class T, std::memory_order m = std::memory_order_seq_cst>
class Atomic : public std::atomic<T> {
public:
/**
Expand All @@ -32,6 +34,118 @@ class Atomic : public std::atomic<T> {
inline Atomic(T desired) : std::atomic<T>(desired)
{
}

// The following methods have an argument with a default of std::memory_order_seq_cst hardcoded in the base class.
// Hence, we need to override them here to allow for a different default memory order.

void store(T desired, std::memory_order mo = m) noexcept
{
std::atomic<T>::store(desired, mo);
}

T load(std::memory_order mo = m) const noexcept
{
return std::atomic<T>::load(mo);
}

T exchange(T desired, std::memory_order mo = m) noexcept
{
return std::atomic<T>::exchange(desired, mo);
}

bool compare_exchange_weak(T& expected, T desired, std::memory_order mo = m) noexcept
{
return std::atomic<T>::compare_exchange_weak(expected, desired, mo);
}

bool compare_exchange_strong(T& expected, T desired, std::memory_order mo = m) noexcept
{
return std::atomic<T>::compare_exchange_strong(expected, desired, mo);
}

T fetch_add(T delta, std::memory_order mo = m) noexcept
{
return std::atomic<T>::fetch_add(delta, mo);
}

T fetch_sub(T delta, std::memory_order mo = m) noexcept
{
return std::atomic<T>::fetch_sub(delta, mo);
}

T fetch_and(T mask, std::memory_order mo = m) noexcept
{
return std::atomic<T>::fetch_and(mask, mo);
}

T fetch_or(T mask, std::memory_order mo = m) noexcept
{
return std::atomic<T>::fetch_or(mask, mo);
}

T fetch_xor(T mask, std::memory_order mo = m) noexcept
{
return std::atomic<T>::fetch_xor(mask, mo);
}

// The following operators call non-virtual methods we have overridden above.
// Hence, we need to override them here as well to allow for a different default memory order.

T operator=(T desired) noexcept
{
store(desired);
return desired;
}

operator T() const noexcept
{
return load();
}

T operator+=(T delta) noexcept
{
return fetch_add(delta) + delta;
}

T operator-=(T delta) noexcept
{
return fetch_sub(delta) - delta;
}

T operator++() noexcept
{
return *this += 1;
}

T operator++(int) noexcept
{
return fetch_add(1);
}

T operator--() noexcept
{
return *this -= 1;
}

T operator--(int) noexcept
{
return fetch_sub(1);
}

T operator&=(T mask) noexcept
{
return fetch_and(mask) & mask;
}

T operator|=(T mask) noexcept
{
return fetch_or(mask) | mask;
}

T operator^=(T mask) noexcept
{
return fetch_xor(mask) ^ mask;
}
};

/**
Expand Down
3 changes: 3 additions & 0 deletions lib/methods/clusterzonechecktask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ void ClusterZoneCheckTask::ScriptFunc(const Checkable::Ptr& checkable, const Che

double lastMessageSent = 0;
double lastMessageReceived = 0;
uint_fast64_t pendingOutgoingMessages = 0;
double messagesSentPerSecond = 0;
double messagesReceivedPerSecond = 0;
double bytesSentPerSecond = 0;
Expand All @@ -156,6 +157,7 @@ void ClusterZoneCheckTask::ScriptFunc(const Checkable::Ptr& checkable, const Che
if (endpoint->GetLastMessageReceived() > lastMessageReceived)
lastMessageReceived = endpoint->GetLastMessageReceived();

pendingOutgoingMessages += endpoint->GetPendingOutgoingMessages();
messagesSentPerSecond += endpoint->GetMessagesSentPerSecond();
messagesReceivedPerSecond += endpoint->GetMessagesReceivedPerSecond();
bytesSentPerSecond += endpoint->GetBytesSentPerSecond();
Expand Down Expand Up @@ -207,6 +209,7 @@ void ClusterZoneCheckTask::ScriptFunc(const Checkable::Ptr& checkable, const Che
new PerfdataValue("slave_lag", zoneLag, false, "s", lagWarning, lagCritical),
new PerfdataValue("last_messages_sent", lastMessageSent),
new PerfdataValue("last_messages_received", lastMessageReceived),
new PerfdataValue("sum_pending_outgoing_messages", pendingOutgoingMessages),
new PerfdataValue("sum_messages_sent_per_second", messagesSentPerSecond),
new PerfdataValue("sum_messages_received_per_second", messagesReceivedPerSecond),
new PerfdataValue("sum_bytes_sent_per_second", bytesSentPerSecond),
Expand Down
3 changes: 3 additions & 0 deletions lib/methods/icingachecktask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ void IcingaCheckTask::ScriptFunc(const Checkable::Ptr& checkable, const CheckRes

double lastMessageSent = 0;
double lastMessageReceived = 0;
uint_fast64_t pendingOutgoingMessages = 0;
double messagesSentPerSecond = 0;
double messagesReceivedPerSecond = 0;
double bytesSentPerSecond = 0;
Expand All @@ -136,6 +137,7 @@ void IcingaCheckTask::ScriptFunc(const Checkable::Ptr& checkable, const CheckRes
if (endpoint->GetLastMessageReceived() > lastMessageReceived)
lastMessageReceived = endpoint->GetLastMessageReceived();

pendingOutgoingMessages += endpoint->GetPendingOutgoingMessages();
messagesSentPerSecond += endpoint->GetMessagesSentPerSecond();
messagesReceivedPerSecond += endpoint->GetMessagesReceivedPerSecond();
bytesSentPerSecond += endpoint->GetBytesSentPerSecond();
Expand All @@ -144,6 +146,7 @@ void IcingaCheckTask::ScriptFunc(const Checkable::Ptr& checkable, const CheckRes

perfdata->Add(new PerfdataValue("last_messages_sent", lastMessageSent));
perfdata->Add(new PerfdataValue("last_messages_received", lastMessageReceived));
perfdata->Add(new PerfdataValue("sum_pending_outgoing_messages", pendingOutgoingMessages));
perfdata->Add(new PerfdataValue("sum_messages_sent_per_second", messagesSentPerSecond));
perfdata->Add(new PerfdataValue("sum_messages_received_per_second", messagesReceivedPerSecond));
perfdata->Add(new PerfdataValue("sum_bytes_sent_per_second", bytesSentPerSecond));
Expand Down
12 changes: 12 additions & 0 deletions lib/remote/endpoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,18 @@ Endpoint::Ptr Endpoint::GetLocalEndpoint()
return listener->GetLocalEndpoint();
}

uint_fast64_t Endpoint::GetPendingOutgoingMessages() const
{
uint_fast64_t pending = 0;
std::unique_lock lock (m_ClientsLock);

for (auto& client : m_Clients) {
pending += client->GetPendingOutgoingMessages();
}

return pending;
}

void Endpoint::AddMessageSent(int bytes)
{
double time = Utility::GetTime();
Expand Down
1 change: 1 addition & 0 deletions lib/remote/endpoint.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ class Endpoint final : public ObjectImpl<Endpoint>
static Endpoint::Ptr GetLocalEndpoint();

void SetCachedZone(const intrusive_ptr<Zone>& zone);
uint_fast64_t GetPendingOutgoingMessages() const override;

void AddMessageSent(int bytes);
void AddMessageReceived(int bytes);
Expand Down
4 changes: 4 additions & 0 deletions lib/remote/endpoint.ti
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ class Endpoint : ConfigObject
Timestamp last_message_sent;
Timestamp last_message_received;

[no_user_modify, no_storage] uint_fast64_t pending_outgoing_messages {
get;
};

[no_user_modify, no_storage] double messages_sent_per_second {
get;
};
Expand Down
3 changes: 3 additions & 0 deletions lib/remote/jsonrpcconnection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ void JsonRpcConnection::WriteOutgoingMessages(boost::asio::yield_context yc)
}

size_t bytesSent = JsonRpc::SendRawMessage(m_Stream, message, yc);
m_PendingOutgoingMessages--;

if (m_Endpoint) {
m_Endpoint->AddMessageSent(bytesSent);
Expand Down Expand Up @@ -230,6 +231,7 @@ void JsonRpcConnection::SendRawMessage(const String& message)

m_OutgoingMessagesQueue.emplace_back(message);
m_OutgoingMessagesQueued.Set();
m_PendingOutgoingMessages++;
});
}

Expand All @@ -241,6 +243,7 @@ void JsonRpcConnection::SendMessageInternal(const Dictionary::Ptr& message)

m_OutgoingMessagesQueue.emplace_back(JsonEncode(message));
m_OutgoingMessagesQueued.Set();
m_PendingOutgoingMessages++;
}

void JsonRpcConnection::Disconnect()
Expand Down
6 changes: 6 additions & 0 deletions lib/remote/jsonrpcconnection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,11 @@ class JsonRpcConnection final : public Object
Shared<AsioTlsStream>::Ptr GetStream() const;
ConnectionRole GetRole() const;

auto GetPendingOutgoingMessages() const noexcept
{
return m_PendingOutgoingMessages.load();
}

void Disconnect();

void SendMessage(const Dictionary::Ptr& request);
Expand All @@ -76,6 +81,7 @@ class JsonRpcConnection final : public Object
boost::asio::io_context::strand m_IoStrand;
std::vector<String> m_OutgoingMessagesQueue;
AsioConditionVariable m_OutgoingMessagesQueued;
Atomic<decltype(m_OutgoingMessagesQueue)::size_type, std::memory_order_relaxed> m_PendingOutgoingMessages {0};
AsioConditionVariable m_WriterDone;
Atomic<bool> m_ShuttingDown;
boost::asio::deadline_timer m_CheckLivenessTimer, m_HeartbeatTimer;
Expand Down
Loading