Skip to content

Commit

Permalink
- Give monitors an optional parameter that can be used as a handle wh…
Browse files Browse the repository at this point in the history
…en making extensions

- Update the pub/sub monitor to use this functionality.
  • Loading branch information
nfogh committed Sep 27, 2023
1 parent 05cb1c4 commit ea8baf5
Show file tree
Hide file tree
Showing 22 changed files with 350 additions and 221 deletions.
40 changes: 25 additions & 15 deletions examples/pubsub/FloatMessage.hpp
Original file line number Diff line number Diff line change
@@ -1,21 +1,31 @@
#pragma once
#include "PubSub/IMessage.hpp"
namespace PubSubMonitoring {
struct FloatMessage : public IMessage {
explicit FloatMessage(float f = 0.0f, std::string name = "FloatMessage") : mData{f}, mName(std::move(name)) {}
const void *GetData() const override { return &mData; }
void *GetData() override { return &mData; }
int GetSize() const override { return sizeof(mData); }
const std::string& GetName() const override { return mName; }
struct FloatMessage : public IMessage
{
explicit FloatMessage(std::string name = "FloatMessage") : mName(std::move(name)) {}
FloatMessage(float f = 0.0f, std::string name = "FloatMessage") : mData{ f }, mName(std::move(name)) {}
const void *GetData() const override { return &mData; }
void *GetData() override { return &mData; }
int GetSize() const override { return sizeof(mData); }
const std::string &GetName() const override { return mName; }

float GetF() const { return mData.f; }
void SetF(float f) { mData.f = f; }
float GetF() const { return mData.f; }
void SetF(float f) { mData.f = f; }

private:
struct Data {
float f = 0.0f;
} mData;
bool operator>=(const FloatMessage &other) const { return mData.f >= other.mData.f; }
bool operator<=(const FloatMessage &other) const { return mData.f <= other.mData.f; }
bool operator>(const FloatMessage &other) const { return mData.f > other.mData.f; }
bool operator<(const FloatMessage &other) const { return mData.f < other.mData.f; }
bool operator==(const FloatMessage &other) const { return mData.f == other.mData.f; }
bool operator!=(const FloatMessage &other) const { return mData.f != other.mData.f; }

const std::string mName = "FloatMessage";
};
}
private:
struct Data
{
float f = 0.0f;
} mData;

const std::string mName = "FloatMessage";
};
}// namespace PubSubMonitoring
39 changes: 21 additions & 18 deletions examples/pubsub/PubSub/PubSub.hpp
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#pragma once
#include "IPubSub.hpp"
#include <cstring>
#include <cassert>
#include <cstring>

#include <iostream>

Expand All @@ -10,40 +10,43 @@ struct PubSub : public IPubSub
{
void Publish(const IMessage &message) override
{
for (auto &subscription : mSubscriptions)
{
if (subscription.message->GetName() == message.GetName())
{
mMessages[message.GetName()].resize(static_cast<size_t>(message.GetSize()));
::memcpy(mMessages[message.GetName()].data(), message.GetData(), static_cast<size_t>(message.GetSize()));
for (auto &subscription : mSubscriptions) {
if (subscription.message->GetName() == message.GetName()) {
assert(subscription.message->GetSize() == message.GetSize());
::memcpy(subscription.message->GetData(), message.GetData(), static_cast<size_t>(subscription.message->GetSize()));
::memcpy(
subscription.message->GetData(), message.GetData(), static_cast<size_t>(subscription.message->GetSize()));
subscription.handler();
}
}
};

void Subscribe(IMessage &message, std::function<void()> handler) override
{
mSubscriptions.push_back({&message, handler});
auto messageIt = mMessages.find(message.GetName());
if (messageIt != mMessages.end()) {
::memcpy(message.GetData(), messageIt->second.data(), static_cast<size_t>(message.GetSize()));
}
mSubscriptions.push_back({ &message, handler });
};

void Unsubscribe(const IMessage &message) override
{
auto it = std::find_if(mSubscriptions.begin(), mSubscriptions.end(), [&](const auto &subscription) {
return subscription.message == &message;
});
if (it != mSubscriptions.end())
{
mSubscriptions.erase(it);
}
if (it != mSubscriptions.end()) { mSubscriptions.erase(it); }
};

private:
struct Subscription
{
IMessage* message;
std::function<void()> handler;
};
struct Subscription
{
IMessage *message;
std::function<void()> handler;
};

std::vector<Subscription> mSubscriptions;
std::vector<Subscription> mSubscriptions;
std::unordered_map<std::string, std::vector<unsigned char>> mMessages;
};
}
}// namespace PubSubMonitoring
148 changes: 81 additions & 67 deletions examples/pubsub/PubSubMonitor.hpp
Original file line number Diff line number Diff line change
@@ -1,91 +1,105 @@
#pragma once

#include "PubSub/IPubSub.hpp"
#include "PubSub/IMessage.hpp"
#include "PubSub/IPubSub.hpp"
#include <functional>
#include <monitoring/monitor.hpp>
#include <tuple>

namespace PubSubMonitoring {
namespace intern
namespace intern {
template<typename TupleT, typename Fn> void for_each_tuple(Fn &&fn, TupleT &&tp)
{
template <typename TupleT, typename Fn>
void for_each_tuple(Fn&& fn, TupleT&& tp) {
std::apply
(
[&fn](auto&& ...args)
{
(fn(std::forward<decltype(args)>(args)), ...);
}, std::forward<TupleT>(tp)
);
}
std::apply([&fn](auto &&...args) { (fn(std::forward<decltype(args)>(args)), ...); }, std::forward<TupleT>(tp));
}
}// namespace intern

template<typename MessagesTpl, typename MonitorT>
struct PubSubMonitor
{
PubSubMonitor(IPubSub &pubsub, MessagesTpl messages, MonitorT monitor) : mPubSub(pubsub), mMessages(std::move(messages)), mMonitor(std::move(monitor))
{
intern::for_each_tuple([&](auto &arg) { mPubSub.Subscribe(arg, [&] { Check(); }); }, mMessages);
}

~PubSubMonitor()
{
intern::for_each_tuple([&](auto& arg) { mPubSub.Unsubscribe(arg); }, mMessages);
}

private:
void Check()
{
std::apply([&](auto&&... args) { mMonitor(std::forward<decltype(args)>(args)...); }, mMessages);
}

IPubSub& mPubSub;
MessagesTpl mMessages;
MonitorT mMonitor;
};

template<typename MessagesT, typename MonitorT>
struct MonitorRequirementsHandlers
struct SubscribeAndCheckFunc
{
SubscribeAndCheckFunc(IPubSub &pubsub) : mPubSub(pubsub) {}

void SetCheckFunc(std::function<void(void)> checkFunc) { mCheckFunc = std::move(checkFunc); }

template<typename Message> void operator()(Message &&message)
{
MonitorRequirementsHandlers(MessagesT messages, MonitorT monitor) : mMessages(std::move(messages)), mMonitor(std::move(monitor)) {}
mPubSub.Subscribe(message, [&] { mCheckFunc(); });
}

[[nodiscard]] auto With(IPubSub& pubsub)
{
return PubSubMonitor(pubsub, mMessages, mMonitor);
}
private:
IPubSub &mPubSub;
std::function<void(void)> mCheckFunc;
};

private:
MessagesT mMessages;
MonitorT mMonitor;
};
template<typename MessagesT, typename MonitorT> struct PubSubMonitor
{
PubSubMonitor(IPubSub &pubsub, MessagesT messages, MonitorT monitor)
: mPubSub(pubsub), mMessages(std::move(messages)), mMonitor(std::move(monitor))
{
mMonitor.GetOpt().SetCheckFunc([&] { Check(); });
intern::for_each_tuple([&](auto &arg) { mPubSub.Subscribe(arg, [&] { Check(); }); }, mMessages);
Check();
}

~PubSubMonitor()
{
intern::for_each_tuple([&](auto &arg) { mPubSub.Unsubscribe(arg); }, mMessages);
}

template<typename MessagesT, typename ConditionsT>
struct MonitorRequirements
private:
void Check()
{
MonitorRequirements(MessagesT messages, ConditionsT conditions) : mMessages(std::move(messages)), mConditions(std::move(conditions)) {}
std::apply([&](auto &&...args) { mMonitor(std::forward<decltype(args)>(args)...); }, mMessages);
}

IPubSub &mPubSub;
MessagesT mMessages;
MonitorT mMonitor;
};

template<typename... HandlersT>
[[nodiscard]] auto Handler(HandlersT&&... handlers)
{
return MonitorRequirementsHandlers(mMessages, Monitoring::Monitor(mConditions, std::forward<HandlersT>(handlers)...));
}
template<typename MessagesT, typename RequirementsT> struct MonitorWithRequirements
{
MonitorWithRequirements(MessagesT messages, IPubSub &pubsub, RequirementsT requirements)
: mMessages(std::move(messages)), mPubSub(pubsub), mRequirements(std::move(requirements))
{}

template<typename... HandlersT> [[nodiscard]] auto Handler(HandlersT &&...handlers)
{
auto mergedHandlers = [handlers = std::forward_as_tuple(handlers...)](bool result) {
std::apply([result](auto &&...handlers) { (..., handlers(result)); }, handlers);

Check warning on line 68 in examples/pubsub/PubSubMonitor.hpp

View workflow job for this annotation

GitHub Actions / Analyze (cpp, gcc-11, Ninja Multi-Config, Debug, ON)

declaration of ‘handlers’ shadows a lambda capture [-Wshadow]

Check warning on line 68 in examples/pubsub/PubSubMonitor.hpp

View workflow job for this annotation

GitHub Actions / Analyze (cpp, gcc-11, Ninja Multi-Config, Debug, ON)

declaration of ‘handlers’ shadows a lambda capture [-Wshadow]
};
auto subscribeAndCheckFunc = SubscribeAndCheckFunc(mPubSub);
return PubSubMonitor(mPubSub, mMessages, Monitoring::Monitor(mRequirements, mergedHandlers, subscribeAndCheckFunc));
}

private:
MessagesT mMessages;
ConditionsT mConditions;
};
private:
MessagesT mMessages;
IPubSub &mPubSub;
RequirementsT mRequirements;
};

template<typename MessagesT> struct MonitorWith
{
MonitorWith(MessagesT messages, IPubSub &pubsub) : mMessages(std::move(messages)), mPubSub(pubsub) {}

template<typename... MessageT>
struct Monitor
template<typename RequirementsT> [[nodiscard]] auto Require(RequirementsT &&requirements)
{
Monitor(MessageT... messages) : mMessages(messages...) {}
return MonitorWithRequirements(mMessages, mPubSub, std::forward<RequirementsT>(requirements));
}

private:
MessagesT mMessages;
IPubSub &mPubSub;
};


template<typename... MessageT> struct Monitor
{
Monitor(MessageT... messages) : mMessages(messages...) {}

template<typename ConditionsT>
[[nodiscard]] auto Require(ConditionsT conditions) { return MonitorRequirements(mMessages, conditions); }
[[nodiscard]] auto With(IPubSub &pubsub) { return MonitorWith(mMessages, pubsub); }

private:
std::tuple<MessageT...> mMessages;
};
private:
std::tuple<MessageT...> mMessages;
};

}// namespace PubSubMonitoring
34 changes: 25 additions & 9 deletions examples/pubsub/PubSubMonitorExample.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include "IntMessage.hpp"
#include "PubSub/PubSub.hpp"
#include "PubSubMonitor.hpp"
#include "minmax.hpp"

auto FloatField()
{
Expand All @@ -15,27 +16,42 @@ auto IntField()
return [](const PubSubMonitoring::IntMessage &msg) { return msg.GetI(); };
}

auto PrintResult()
{
return [](bool good) { std::cout << std::boolalpha << "Good? " << good << std::endl; };
}

constexpr auto Print = [](bool good) { std::cout << std::boolalpha << "Good? " << good << std::endl; };

int main()
{
using namespace Monitoring;
using namespace PubSubMonitoring;
PubSub ps;

std::cout << "Test min/max: " << std::endl;
auto monitor =
PubSubMonitoring::Monitor(FloatMessage("MySignal")).With(ps).Require(Min(FloatMessage("MyLimit"))).Handler(Print);

auto monitor = PubSubMonitoring::Monitor(FloatMessage(0.0f))
.Require(Min(FloatField(), 1.0f) && Max(FloatField(), 10.0f))
.Handler([](bool good) { std::cout << std::boolalpha << "Good? " << good << std::endl; })
.With(ps);
std::cout << "Setting new limit to 5.0\n";
ps.Publish(FloatMessage(5.0f, "MyLimit"));

for (auto i = 0; i <= 15; ++i) {
for (auto i = 0; i <= 10; ++i) {
std::cout << "Checking value of " << i << ": ";
ps.Publish(FloatMessage(static_cast<float>(i), "MySignal"));
}

std::cout << "Setting new limit to 8.0\n";
ps.Publish(FloatMessage(8.0f, "MyLimit"));

for (auto i = 0; i <= 10; ++i) {
std::cout << i << ": ";
ps.Publish(FloatMessage(static_cast<float>(i)));
ps.Publish(FloatMessage(static_cast<float>(i), "MySignal"));
}

std::cout << std::endl << "Test diff: " << std::endl;
// std::cout << std::endl << "Test diff: " << std::endl;

auto monitor2 = PubSubMonitoring::Monitor(IntMessage(0, "IntMessage1"), IntMessage(0, "IntMessage2"))
/*auto monitor2 = PubSubMonitoring::Monitor(IntMessage(0, "IntMessage1"), IntMessage(0, "IntMessage2"))
.Require(MaxDifference(IntField(), 10))
.Handler([](bool good) { std::cout << std::boolalpha << "Good? " << good << std::endl; })
.With(ps);
Expand All @@ -44,5 +60,5 @@ int main()
std::cout << i << "," << -i << " = " << abs(2 * i) << ": ";
ps.Publish(IntMessage(i, "IntMessage1"));
ps.Publish(IntMessage(-i, "IntMessage2"));
}
}*/
}
41 changes: 41 additions & 0 deletions examples/pubsub/minmax.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
#pragma once

#include "FloatMessage.hpp"
#include "IntMessage.hpp"
#include <monitoring/conditions/minmax.hpp>

namespace Monitoring {
template<typename T, typename GetterT, typename CompT> auto ParamComparison(GetterT getter, T otherVal, CompT comp)
{
auto lamb = [otherVal = std::move(otherVal), getter = std::move(getter), comp = std::move(comp), subscribed = false](
auto &subscriber, const auto &val) mutable {
if (!subscribed) {
subscriber(otherVal);
subscribed = true;
}
return comp(otherVal, getter(val));
};
return lamb;
}

template<typename Getter> auto Max(Getter getter, PubSubMonitoring::FloatMessage max)
{
return ParamComparison(std::move(getter), std::move(max), std::greater_equal<PubSubMonitoring::FloatMessage>());
}

template<> auto Max(PubSubMonitoring::FloatMessage max)
{
return Max(Intern::Identity<PubSubMonitoring::FloatMessage>(), std::move(max));
}

template<typename Getter> auto Min(Getter getter, PubSubMonitoring::FloatMessage min)
{
return ParamComparison(std::move(getter), std::move(min), std::less_equal<PubSubMonitoring::FloatMessage>());
}

template<> auto Min(PubSubMonitoring::FloatMessage min)
{
return Min(Intern::Identity<PubSubMonitoring::FloatMessage>(), std::move(min));
}

}// namespace Monitoring
Loading

0 comments on commit ea8baf5

Please sign in to comment.