Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support wildcard mqtt subscribe #215

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open

Conversation

james-ctc
Copy link

feat: support wildcard mqtt subscribe
Subscribe supports wildcard topics
There is a second callback option that provides the full topic of the received message.

e.g. topic "everest/evse_manager/#" will receive "everest/evse_manager/main/var"
with the new subscribe callback "everest/evse_manager/main/var" will be the topic parameter.

Subscribe supports wildcard topics
There is a second callback option that provides the full topic of the received
message.

Signed-off-by: James Chapman <[email protected]>
Copy link
Contributor

@a-w50 a-w50 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like it could be useful 👍

include/utils/error/error_database_map.hpp Outdated Show resolved Hide resolved
public:
struct MessageDetails {
std::string topic;
std::shared_ptr<json> data;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unique_ptr should probably be enough here

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The existing code uses shared_ptr. I can change it but I don't know the original reason for using shared_ptr.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I know - then lets hope it will be refactored in a following commit.

Copy link
Author

@james-ctc james-ctc Nov 5, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done some testing and it needs to be a shared_ptr.
in MQTTAbstractionImpl::on_mqtt_message() the same JSON message could be added to multiple message handlers. There is a vector of handlers and the same JSON data is added to all that match the required topic.
With unique_ptr that JSON data would need to be copied.

@@ -68,7 +74,7 @@ class MessageHandler {
~MessageHandler();

/// \brief Adds a \p message to the message queue which will be delivered to the registered handlers
void add(std::shared_ptr<json> message);
void add(MessageDetails message);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Passing by value here is alright, but then, for efficiency, the caller should also move it in

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The recommendation comes from clang-tidy suggestion (for constructors)

MyClass(std::string s) : m_string{std::move(s)} {}

Where the caller doesn't use move.
An alternative is

MyClass(std::string &&s) : m_string{std::move(s)} {}

where move would be required

In this particular case add is called:

handler.add({topic, data});

handler.add(std::move({topic, data})); // this gives a compile error

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that suggestion is correct. The caller doesn't need to move it in, but has the flexibility to do so (and in our case we should do so, because we probably won't need the topic and data anymore afterwards).
If you call the method with aggregate initialization, the {topic, data} is already a temporary and doesn't need to be moved. std::move({topic, data}) probably gives a compiler error, because it can't deduce the type, that should be aggregated?

using StringHandler = std::function<void(std::string)>;
using StringPairHandler = std::function<void(const std::string& topic, const std::string& data)>;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure, but function signatures usually are not named (using topic and data here) - although it improves readability. For this, type aliases might be an option (using Topic = std::string).

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it isn't that different to a function declaration where naming the parameters is usually done.

lib/everest.cpp Outdated
@@ -361,7 +361,8 @@ json Everest::call_cmd(const Requirement& req, const std::string& cmd_name, json
std::promise<json> res_promise;
std::future<json> res_future = res_promise.get_future();

Handler res_handler = [this, &res_promise, call_id, connection, cmd_name, return_type](json data) {
Handler res_handler = [this, &res_promise, call_id, connection, cmd_name, return_type](const std::string&,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

auto res_handler = .. could probably also work

lib/everest.cpp Outdated
@@ -640,15 +642,15 @@ void Everest::subscribe_global_all_errors(const error::ErrorCallback& callback,
return;
}

Handler raise_handler = [this, callback](json const& data) {
Handler raise_handler = [this, callback](const std::string&, json const& data) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure why this is, but it looks like some design flaw that these clear/raise handlers get the topic passed, which they don't want/care about.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Handler was updated so the topic was added. For existing subscribe calls the topic isn't needed.
It is only needed here because the Handler is being used directly rather than via the mqtt.subscribe...() methods.

lib/everest.cpp Outdated
Comment on lines 749 to 774
UnsubscribeToken Everest::provide_external_mqtt_handler(const std::string& topic, const StringPairHandler& handler) {
BOOST_LOG_FUNCTION();

// check if external mqtt is enabled
if (!this->module_manifest.contains("enable_external_mqtt") &&
this->module_manifest["enable_external_mqtt"] == false) {
EVLOG_AND_THROW(EverestApiError(fmt::format("Module {} tries to provide an external MQTT handler, but didn't "
"set 'enable_external_mqtt' to 'true' in its manifest",
this->config.printable_identifier(this->module_id))));
}

std::string external_topic = fmt::format("{}{}", this->mqtt_external_prefix, topic);

// must be json and not std::string
Handler external_handler = [handler](const std::string& topic, const json& data) {
EVLOG_verbose << fmt::format("Incoming external mqtt data for topic '{}'...", topic);
std::string data_s = (data.is_string()) ? std::string(data) : data.dump();
handler(topic, data_s);
};

std::shared_ptr<TypedHandler> token =
std::make_shared<TypedHandler>(HandlerType::ExternalMQTT, std::make_shared<Handler>(external_handler));
this->mqtt_abstraction.register_handler(external_topic, token, QOS::QOS0);
return [this, topic, token]() { this->mqtt_abstraction.unregister_handler(topic, token); };
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is almost 100% copy and paste of the above function. This could be generalized - probably by using a template.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've simplified what I can

@@ -68,7 +68,7 @@ MessageHandler::MessageHandler() : running(true) {
this->message_queue.pop();
lock.unlock();

auto data = *message.get();
auto data = *message.data;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is expensive, as the whole json object will get copied. Either do const auto& or use data->at(..) in the following.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

reference now used

{
std::lock_guard<std::mutex> lock(this->handler_ctrl_mutex);
this->message_queue.push(message);
this->message_queue.emplace(message);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use std::move(message) here, otherwise the topic string will get copied.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated to use push and move.

Comment on lines 299 to 307
bool topic_matches = false;
if (is_everest_topic) {
// everest topics never contain wildcards, so a direct comparison is enough
if (topic == handler_topic) {
topic_matches = true;
}
} else {
topic_matches = MQTTAbstractionImpl::check_topic_matches(topic, handler_topic);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is more expensive. Are the wildcards necessary within the everest prefix namespace?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't really see a usecase where you want to subscribe (or publish) to any topics within the everest prefix namespace (with or without wildcards) from an external mqtt client. We typically discourage this as well, especially since the internal communication has changed in the past and will in the future

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One of the intended use cases is to support wildcard on everest topics.
(e.g. to listen to session events from any EVSE manager).
The existing design doesn't make it easy to set an optional flag to cover this case other than perhaps using a global variable which I don't like as a solution.

Copy link
Author

@james-ctc james-ctc Nov 5, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

currently there is a check for the topic matching mqtt_everest_prefix

        if (topic.find(mqtt_everest_prefix) == 0) {

i.e. if the topic starts everest which would also include everest_api then wildcards are disabled.

There is existing use of the external mqtt API to publish to everest_api in the Setup module.
Currently everest_api (everest_external ...) are prevented from supporting wildcard matches.

If the aim is to not support wildcard under everest/ then the check can be improved however that would prevent being able to subscribe to all session events from all EVSE managers.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should probably discuss this offline

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

currently there is a check for the topic matching mqtt_everest_prefix

        if (topic.find(mqtt_everest_prefix) == 0) {

i.e. if the topic starts everest which would also include everest_api then wildcards are disabled.

There is existing use of the external mqtt API to publish to everest_api in the Setup module. Currently everest_api (everest_external ...) are prevented from supporting wildcard matches.

This should not be a problem at the moment because the check is for everest/ (the forward slash at the end is always added to the prefix here:

// always make sure the everest mqtt prefix ends with '/'
), so everest_api etc. would be able to support wildcard matching without this change

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah - not spotted the addition of / it looked like it was set from a YAML config file.
Anyway as for this PR - I'll put that check back - but it would be useful to have the ability to subscribe for all session events. It would be all too easy for a missing connection in the YAML config file and events would not be seen.

Since 128 managers can be configured that is 128 separate subscribes that could instead be handled by a single wildcard.

  evse_manager:
    interface: evse_manager
    min_connections: 1
    max_connections: 128

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could think about adding some form of wildcard / "get all requirements of a certain interface" functionality directly to the framework (that you could enable in the manifest/config kind of like the global errors). But I would try to avoid poking around in the mqtt communication between modules

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've raised the following improvement suggestion to capture this: #217

public:
struct MessageDetails {
std::string topic;
std::shared_ptr<json> data;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I know - then lets hope it will be refactored in a following commit.

@@ -68,7 +74,7 @@ class MessageHandler {
~MessageHandler();

/// \brief Adds a \p message to the message queue which will be delivered to the registered handlers
void add(std::shared_ptr<json> message);
void add(MessageDetails message);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that suggestion is correct. The caller doesn't need to move it in, but has the flexibility to do so (and in our case we should do so, because we probably won't need the topic and data anymore afterwards).
If you call the method with aggregate initialization, the {topic, data} is already a temporary and doesn't need to be moved. std::move({topic, data}) probably gives a compiler error, because it can't deduce the type, that should be aggregated?

@@ -1,5 +1,6 @@
// SPDX-License-Identifier: Apache-2.0
// Copyright 2020 - 2023 Pionix GmbH and Contributors to EVerest
#include "utils/message_queue.hpp"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please try to keep the includes categorized.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unintended consequence of using an editor that tries to be helpful and adds include files for you (whether you want them or not).

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will remove along with any other unneeded includes

Comment on lines 299 to 307
bool topic_matches = false;
if (is_everest_topic) {
// everest topics never contain wildcards, so a direct comparison is enough
if (topic == handler_topic) {
topic_matches = true;
}
} else {
topic_matches = MQTTAbstractionImpl::check_topic_matches(topic, handler_topic);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should probably discuss this offline

Also added const where appropriate
New fucntion to check whether external MQTT is configured

Signed-off-by: James Chapman <[email protected]>
@james-ctc
Copy link
Author

I think I've addressed all the comments. Ready for re-review.

EVLOG_verbose << fmt::format("Incoming external mqtt data for topic '{}'...", topic);
std::string data_s = (data.is_string()) ? std::string(data) : data.dump();
const std::string data_s = (data.is_string()) ? std::string(data) : data.dump();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is already much more readable 👍 . Is there any reason why this function is allowed to have json data other than a string and the upper not?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can't comment on the original lambda - I'm not sure why there is a restriction to only support strings.
While testing the wildcard subscribe I found issues when the data portion was a JSON object and not just a string.
I can't see any reason to prevent the use of JSON with external mqtt, hence I provided support for it.

@@ -306,7 +306,7 @@ void MQTTAbstractionImpl::on_mqtt_message(std::shared_ptr<Message> message) {

if (topic_matches) {
found = true;
handler.add({topic, data});
handler.add(std::unique_ptr<ParsedMessage>(new ParsedMessage{topic, std::move(data)}));
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@a-w50
This line can be called multiple times. It is within:
for (auto& [handler_topic, handler] : this->message_handlers).

The first time it is called data is moved to the handler.
The second and subsequent calls data is empty.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants