Skip to content
This repository has been archived by the owner on Oct 4, 2021. It is now read-only.

Commit

Permalink
fix for mqtt resubscribe when mqtt drops-out. renamed topic to "syste…
Browse files Browse the repository at this point in the history
…m_cmd" - #445
  • Loading branch information
proddy committed Aug 4, 2020
1 parent c3d81d8 commit e8c3b07
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 12 deletions.
15 changes: 7 additions & 8 deletions src/mqtt.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ void Mqtt::add_command(const uint8_t device_type, const __FlashStringHelper * cm
// unless its a system MQTT command
std::string cmd_topic(40, '\0');
if (device_type == EMSdevice::DeviceType::SERVICEKEY) {
cmd_topic = "system"; // hard-coded system
cmd_topic = MQTT_SYSTEM_CMD; // hard-coded system
} else {
snprintf_P(&cmd_topic[0], 40, PSTR("%s_cmd"), EMSdevice::device_type_topic_name(device_type).c_str());
}
Expand Down Expand Up @@ -127,7 +127,7 @@ void Mqtt::resubscribe() {
}

for (const auto & mqtt_subfunction : mqtt_subfunctions_) {
queue_message(Operation::SUBSCRIBE, mqtt_subfunction.topic_, "", false, true); // no payload, no topic prefixing
queue_subscribe_message(mqtt_subfunction.topic_);
}
}

Expand Down Expand Up @@ -235,9 +235,9 @@ void Mqtt::incoming(char * topic, char * payload) {
bool Mqtt::call_command(const uint8_t device_type, const char * cmd, const char * value, const int8_t id) {
#ifdef EMSESP_DEBUG
if (id == -1) {
LOG_DEBUG(F("calling command %s, value %s, id is default"), cmd, value);
LOG_DEBUG(F("[DEBUG] Calling command %s, value %s, id is default"), cmd, value);
} else {
LOG_DEBUG(F("calling command %s, value %s, id is %d"), cmd, value, id);
LOG_DEBUG(F("[DEBUG] Calling command %s, value %s, id is %d"), cmd, value, id);
}
#endif

Expand Down Expand Up @@ -430,7 +430,7 @@ void Mqtt::on_connect() {
resubscribe(); // in case this is a reconnect, re-subscribe again to all MQTT topics

// add the system MQTT subscriptions, only if its a fresh start with no previous subscriptions
// these commands respond to the topic "system" and take a payload like {cmd:"", data:"", id:""}
// these commands respond to the topic "system_cmd" and take a payload like {cmd:"", data:"", id:""}
if (mqtt_subfunctions_.empty()) {
add_command(EMSdevice::DeviceType::SERVICEKEY, F("gpio"), System::mqtt_command_gpio);
add_command(EMSdevice::DeviceType::SERVICEKEY, F("send"), System::mqtt_command_send);
Expand All @@ -442,15 +442,14 @@ void Mqtt::on_connect() {
// add sub or pub task to the queue.
// a fully-qualified topic is created by prefixing the hostname, unless it's HA
// returns a pointer to the message created
std::shared_ptr<const MqttMessage>
Mqtt::queue_message(const uint8_t operation, const std::string & topic, const std::string & payload, const bool retain, bool no_prefix) {
std::shared_ptr<const MqttMessage> Mqtt::queue_message(const uint8_t operation, const std::string & topic, const std::string & payload, const bool retain) {
if (topic.empty()) {
return nullptr;
}

// take the topic and prefix the hostname, unless its for HA
std::shared_ptr<MqttMessage> message;
if ((strncmp(topic.c_str(), "homeassistant/", 13) == 0) || no_prefix) {
if ((strncmp(topic.c_str(), "homeassistant/", 13) == 0)) {
// leave topic as it is
message = std::make_shared<MqttMessage>(operation, topic, payload, retain);
} else {
Expand Down
3 changes: 1 addition & 2 deletions src/mqtt.h
Original file line number Diff line number Diff line change
Expand Up @@ -147,8 +147,7 @@ class Mqtt {
static constexpr uint32_t MQTT_PUBLISH_WAIT = 200; // delay between sending publishes, to account for large payloads
static constexpr uint8_t MQTT_PUBLISH_MAX_RETRY = 3; // max retries for giving up on publishing

static std::shared_ptr<const MqttMessage>
queue_message(const uint8_t operation, const std::string & topic, const std::string & payload, const bool retain, bool no_prefix = false);
static std::shared_ptr<const MqttMessage> queue_message(const uint8_t operation, const std::string & topic, const std::string & payload, const bool retain);
static std::shared_ptr<const MqttMessage> queue_publish_message(const std::string & topic, const std::string & payload, const bool retain);
static std::shared_ptr<const MqttMessage> queue_subscribe_message(const std::string & topic);

Expand Down
2 changes: 2 additions & 0 deletions src/system.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@

using uuid::console::Shell;

#define MQTT_SYSTEM_CMD "system_cmd"

namespace emsesp {

class System {
Expand Down
7 changes: 5 additions & 2 deletions src/test/test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ namespace emsesp {
// used with the 'test' command, under su/admin
void Test::run_test(uuid::console::Shell & shell, const std::string & command) {
if (command == "default") {
run_test(shell, "gpio"); // add the default test case here
run_test(shell, "mqtt"); // add the default test case here
}

if (command.empty()) {
Expand Down Expand Up @@ -523,7 +523,7 @@ void Test::run_test(uuid::console::Shell & shell, const std::string & command) {
// test publish and adding to queue
EMSESP::txservice_.flush_tx_queue();
EMSESP::EMSESP::mqtt_.publish("boiler_cmd", "test me");
Mqtt::show_mqtt(shell);
Mqtt::show_mqtt(shell); // show queue

strcpy(topic, "ems-esp/boiler_cmd");
strcpy(payload, "12345");
Expand Down Expand Up @@ -601,6 +601,9 @@ void Test::run_test(uuid::console::Shell & shell, const std::string & command) {
shell.invoke_command("call wwmode");
shell.invoke_command("call mode auto 2");

Mqtt::resubscribe();
Mqtt::show_mqtt(shell); // show queue

shell.loop_all();
}

Expand Down

0 comments on commit e8c3b07

Please sign in to comment.