From e9d2fb5e7e4a98133a3e55d3058d725bf45e30fa Mon Sep 17 00:00:00 2001 From: Miha Lunar Date: Sun, 8 Oct 2017 01:59:47 +0200 Subject: [PATCH] Separated out MQTT boilerplate, updated paho, added retry logic --- CMakeLists.txt | 4 +- src/msgqueue.cpp | 200 +++++++++++++++++++++++++++++++ src/msgqueue.h | 58 +++++++++ src/screenmqtt.cpp | 262 +++++++++++++---------------------------- src/vendor/paho.mqtt.c | 2 +- 5 files changed, 347 insertions(+), 179 deletions(-) create mode 100644 src/msgqueue.cpp create mode 100644 src/msgqueue.h diff --git a/CMakeLists.txt b/CMakeLists.txt index 8a42afb..89250b6 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -7,12 +7,14 @@ add_definitions( ) set(includes + src/ src/vendor/paho.mqtt.c/src/ src/vendor/yaml-cpp/include/ ) file(GLOB sources - src/*.cpp + src/screenmqtt.cpp + src/msgqueue.cpp ) add_subdirectory(src/vendor/paho.mqtt.c) diff --git a/src/msgqueue.cpp b/src/msgqueue.cpp new file mode 100644 index 0000000..06e7902 --- /dev/null +++ b/src/msgqueue.cpp @@ -0,0 +1,200 @@ +#include +#include +#include +#include +#include +#include + +#include "msgqueue.h" + +using namespace std::chrono; + +static const MessageQueueOptions DEFAULT_OPTIONS; + +static int logError(int code, std::string message) { + std::cerr << message << " (" << code << ")" << std::endl; + return code; +} + +int MessageQueue::connect() +{ + // Cleanup + if (client) { + MQTTClient_destroy(&client); + client = nullptr; + } + + if (!backoff_cur) backoff_cur = config.backoff_min; + + // Setup config + MQTTClient_connectOptions opts = MQTTClient_connectOptions_initializer; + if (!config.username.empty()) opts.username = config.username.c_str(); + if (!config.password.empty()) opts.password = config.password.c_str(); + opts.keepAliveInterval = config.keep_alive_interval; + opts.cleansession = true; + opts.connectTimeout = config.connectTimeout; + opts.retryInterval = -1; + + int error; + + // Create client + error = MQTTClient_create( + &client, + config.uri.c_str(), + config.client_id.c_str(), + MQTTCLIENT_PERSISTENCE_NONE, + nullptr + ); + if (error) { + return logError(error, "Unable to create MQTT client"); + } + + // Set callbacks + error = MQTTClient_setCallbacks( + client, + this, + mqtt_disconnected, + mqtt_received, + mqtt_delivered + ); + if (error) { + return logError(error, "Unable to set MQTT callbacks"); + } + + std::cout << "Connecting" << std::endl; + + // Connect to server + error = MQTTClient_connect(client, &opts); + if (error) { + logError(error, "Unable to connect to server"); + if (config.reconnect) { + return reconnect(); + } else { + return error; + } + } + + backoff_cur = config.backoff_min; + std::cout << std::endl << "Connected" << std::endl << std::endl; + + return 0; +} + +int MessageQueue::reconnect() +{ + std::cout << "Reconnecting in " << backoff_cur << " ms" << std::endl; + std::this_thread::sleep_for(milliseconds(backoff_cur)); + backoff_cur += std::rand() % backoff_cur; + backoff_cur = std::min(backoff_cur, config.backoff_max); + return connect(); +} + +int MessageQueue::disconnect() +{ + int error; + if (client) { + error = MQTTClient_disconnect(client, config.disconnectTimeout*1000); + if (error) { + return logError(error, "Unable to disconnect from server"); + } + } + MQTTClient_destroy(&client); + return 0; +} + +int MessageQueue::subscribe(std::string topic, const MessageQueueOptions *options) +{ + if (!options) options = &DEFAULT_OPTIONS; + + int error; + error = MQTTClient_subscribe(client, topic.c_str(), options->qos); + if (error) { + return logError(error, "Unable to subscribe"); + } + + std::cout << "Subscribed to " << topic << std::endl; + return 0; +} + +int MessageQueue::publish(std::string topic, std::string payload, const MessageQueueOptions *options, MessageQueueToken *token) +{ + if (!options) options = &DEFAULT_OPTIONS; + + int error; + error = MQTTClient_publish( + client, + topic.c_str(), + payload.size(), + &payload[0], + options->qos, + options->retained, + token + ); + if (error) { + return logError(error, "Unable to publish"); + } + return 0; +} + + +void MessageQueue::handleReceived(std::string topic, std::string payload) +{ + std::cout << "> " << topic << ": " << payload << std::endl; + if (onReceived) onReceived(topic, payload); +} + +void MessageQueue::handleDelivered(MessageQueueToken token) +{ + std::cout << "Delivered: " << token << std::endl; + if (onDelivered) onDelivered(token); +} + +void MessageQueue::handleDisconnected(std::string cause) +{ + std::string postfix = cause.empty() ? "" : ": " + cause; + std::cout << "Connection lost" << postfix << std::endl; + if (onDisconnected) onDisconnected(cause); + reconnect(); +} + + +int mqtt_received(void *context, char *topicName, int topicLen, MQTTClient_message *message) { + auto queue = static_cast(context); + + std::string topic, payload; + + if (topicLen) { + topic.resize(topicLen); + memcpy(&topic[0], topicName, topicLen); + } else { + topic = topicName; + } + + if (message->payloadlen) { + payload.resize(message->payloadlen); + memcpy(&payload[0], message->payload, message->payloadlen); + } else { + payload = ""; + } + + queue->handleReceived(topic, payload); + + MQTTClient_freeMessage(&message); + MQTTClient_free(topicName); + return 1; +} + +void mqtt_delivered(void *context, MQTTClient_deliveryToken dt) +{ + auto queue = static_cast(context); + queue->handleDelivered(dt); +} + +void mqtt_disconnected(void *context, char *cause) +{ + auto queue = static_cast(context); + std::string message = ""; + if (cause) message = cause; + queue->handleDisconnected(message); +} + diff --git a/src/msgqueue.h b/src/msgqueue.h new file mode 100644 index 0000000..2942111 --- /dev/null +++ b/src/msgqueue.h @@ -0,0 +1,58 @@ +#pragma once + +#include + +#include "MQTTClient.h" + +typedef int MessageQueueToken; + +struct MessageQueueConfig { + std::string uri; + std::string client_id; + std::string username; + std::string password; + + bool reconnect = true; + + int keep_alive_interval = 15; + + int backoff_min = 50; + int backoff_max = 2000; + + int connectTimeout = 1; + int disconnectTimeout = 5; +}; + +struct MessageQueueOptions { + int qos = 0; + bool retained = false; +}; + +class MessageQueue { +protected: + MQTTClient client = nullptr; + + int backoff_cur = 0; + + void handleReceived(std::string topic, std::string payload); + void handleDelivered(MessageQueueToken token); + void handleDisconnected(std::string cause); + friend int mqtt_received(void *context, char *topicName, int topicLen, MQTTClient_message *message); + friend void mqtt_delivered(void *context, MQTTClient_deliveryToken dt); + friend void mqtt_disconnected(void *context, char *cause); + + int reconnect(); + +public: + MessageQueueConfig config; + + std::function onReceived; + std::function onDelivered; + std::function onDisconnected; + + int connect(); + int disconnect(); + + int subscribe(std::string topic, const MessageQueueOptions *options = nullptr); + int publish(std::string topic, std::string payload, const MessageQueueOptions *options = nullptr, MessageQueueToken *token = nullptr); +}; diff --git a/src/screenmqtt.cpp b/src/screenmqtt.cpp index 7d5be16..a24c408 100644 --- a/src/screenmqtt.cpp +++ b/src/screenmqtt.cpp @@ -23,26 +23,17 @@ #include "LowLevelMonitorConfigurationAPI.h" #include "HighLevelMonitorConfigurationAPI.h" -#include "MQTTClient.h" #include "yaml-cpp/yaml.h" -const std::string name = "screenmqtt 0.3.1"; +#include "msgqueue.h" +const std::string name = "screenmqtt 0.4.0"; +MessageQueueOptions global_options; -#define CHECK(call) \ - if (!call) { \ - std::cerr << "Call to " #call " failed" << std::endl; \ - return true; \ - } \ - -#define MCHECK(call) { \ - int rc = call; \ - if (rc != MQTTCLIENT_SUCCESS) { \ - std::cerr << "Call to " #call " failed (" << rc << ")" << std::endl; \ - } \ -} +MessageQueue queue; +std::string global_topic_prefix; std::mutex mutex; std::condition_variable cv; bool done = false; @@ -51,120 +42,6 @@ std::string topic_all_power_command; HWND window; -int mqtt_received(void *context, char *topicName, int topicLen, MQTTClient_message *message); -void mqtt_delivered(void *context, MQTTClient_deliveryToken dt); -void mqtt_disconnected(void *context, char *cause); -void receive_monitors(const std::string &topic, const std::string &payload); - -class MessageQueue { - - MQTTClient client = nullptr; - int qos = 0; - int timeout = 10000; - -public: - - std::string topic_prefix; - - ~MessageQueue() { - disconnect(); - } - - void connect( - const std::string uri, - const std::string client_id, - const std::string topic_prefix, - MQTTClient_connectOptions opts = MQTTClient_connectOptions_initializer - ) { - opts.cleansession = true; - - this->topic_prefix = topic_prefix; - - MCHECK(MQTTClient_create(&client, uri.c_str(), client_id.c_str(), MQTTCLIENT_PERSISTENCE_NONE, nullptr)); - MCHECK(MQTTClient_setCallbacks(client, this, mqtt_disconnected, mqtt_received, mqtt_delivered)); - MCHECK(MQTTClient_connect(client, &opts)); - - printf("Topic prefix: %s\nClient: %s\nQoS: %d\n\nPress q or Ctrl-C to quit\n\n", - topic_prefix.c_str(), client_id.c_str(), qos); - } - - void disconnect() { - if (client) MCHECK(MQTTClient_disconnect(this->client, timeout)); - MQTTClient_destroy(&client); - } - - void subscribe(std::string topic) { - std::cout << "Subscribed to " << topic << std::endl; - MCHECK(MQTTClient_subscribe(client, topic.c_str(), qos)); - } - - void publish(std::string topic, std::string payload) { - MCHECK(MQTTClient_publish( - client, - topic.c_str(), - payload.size(), - &payload[0], - qos, - true, - nullptr - )); - } - - int received(char *topicName, int topicLen, MQTTClient_message *message) - { - std::string topic; - - if (topicLen > 0) { - topic.resize(topicLen); - memcpy(&topic[0], topicName, topicLen); - } - else { - topic = topicName; - } - - std::string payload; - payload.resize(message->payloadlen); - memcpy(&payload[0], message->payload, message->payloadlen); - printf("> %s: %s\n", topic.c_str(), payload.c_str()); - - receive_monitors(topic, payload); - - MQTTClient_freeMessage(&message); - MQTTClient_free(topicName); - return 1; - } - - void delivered(void *context, MQTTClient_deliveryToken dt) - { - fprintf(stderr, "Delivered: %d\n", dt); - } - - void disconnected(void *context, char *cause) - { - fprintf(stderr, "\nConnection lost: %s\n", cause); - } - -} queue; - -int mqtt_received(void *context, char *topicName, int topicLen, MQTTClient_message *message) { - auto queue = static_cast(context); - return queue->received(topicName, topicLen, message); -} - -void mqtt_delivered(void *context, MQTTClient_deliveryToken dt) -{ - auto queue = static_cast(context); - fprintf(stderr, "Delivered: %d\n", dt); -} - -void mqtt_disconnected(void *context, char *cause) -{ - auto queue = static_cast(context); - fprintf(stderr, "\nConnection lost: %s\n", cause); -} - - - void receive_monitors_all(const std::string &payload) { if (payload == "ON") { // -1 doesn't seem to work, so send mouse move event instead @@ -182,7 +59,7 @@ void receive_monitors_all(const std::string &payload) { } void publish_monitors_all(const std::string &payload) { - queue.publish(topic_all_power_state, payload); + queue.publish(topic_all_power_state, payload, &global_options); } @@ -476,7 +353,20 @@ struct Screen { } void printError(const char *msg) { - std::wcerr << index << ": " << desc << ": " << msg << std::endl; + DWORD lastError = GetLastError(); + wchar_t lastErrorMsg[1024]; + bool lastErrorValid = FormatMessageW( + FORMAT_MESSAGE_FROM_SYSTEM, + NULL, + lastError, + MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT), + lastErrorMsg, + sizeof(lastErrorMsg), + NULL + ) > 0; + std::wcerr << index << ": " << desc << ": " << msg; + if (lastErrorValid) std::wcerr << ": " << lastErrorMsg; + std::wcerr << " (" << GetLastError() << ")" << std::endl; } void updateInfo() { @@ -489,7 +379,7 @@ struct Screen { &cap_flags, &cap_color_temp )) { - printError("error updating flags"); + printError("Error updating flags"); return; } @@ -506,7 +396,7 @@ struct Screen { printf(" length\n"); DWORD cap_len; if (!GetCapabilitiesStringLength(monitor, &cap_len)) { - printError("error getting length"); + printError("Error getting length"); return; } std::string cap; @@ -518,7 +408,7 @@ struct Screen { (LPSTR)&cap[0], cap_len )) { - printError("request failed"); + printError("Request failed"); return; } @@ -567,7 +457,7 @@ struct Screen { { std::string state = state_override.empty() ? param.get_state(monitor) : state_override; printf("< %s: %s\n", param.topic_state.c_str(), state.c_str()); - queue->publish(param.topic_state, state); + queue->publish(param.topic_state, state, &global_options); } void publishAll() @@ -599,7 +489,7 @@ struct Screen { if (queue == nullptr) return; this->queue = queue; - this->topic_prefix = queue->topic_prefix + model + "/"; + this->topic_prefix = global_topic_prefix + model + "/"; for (auto ¶m : supported_params) { if (param.get_state) { @@ -662,7 +552,11 @@ BOOL CALLBACK EnumDisplayCallback( monitors.resize(physical_num); - CHECK(GetPhysicalMonitorsFromHMONITOR(hMonitor, physical_num, &monitors[0])); + bool success = GetPhysicalMonitorsFromHMONITOR(hMonitor, physical_num, &monitors[0]) == TRUE; + if (!success) { + std::cerr << "Unable to get physical monitors" << std::endl; + return true; + } for (DWORD i = 0; i < physical_num; i++) { PHYSICAL_MONITOR *mon = &monitors[i]; @@ -793,58 +687,72 @@ std::string get_default_topic_prefix() { return topic_prefix; } +template +bool initOptionSilent(const YAML::Node &yaml, std::string prop, std::string name, T* config) { + auto value = yaml[prop]; + if (value) { + *config = value.as(); + return true; + } + return false; +} + +template +bool initOption(const YAML::Node &yaml, std::string prop, std::string name, T* config) { + bool exists = initOptionSilent(yaml, prop, name, config); + if (exists) std::cout << name << ": " << *config << std::boolalpha << std::endl; + return exists; +} +template +bool initOptionHidden(const YAML::Node &yaml, std::string prop, std::string name, T* config) { + bool exists = initOptionSilent(yaml, prop, name, config); + if (exists) std::cout << name << ": *****" << std::endl; + return exists; +} + +void queueDisconnected(std::string cause) { + if (!queue.config.reconnect) exit(3); +} + +void queueReceived(std::string topic, std::string payload) { + receive_monitors(topic, payload); +} int main(int argc, char *argv[]) { std::cout << name << "\n\n"; - std::string uri; - std::string user; - std::string pass; - std::string topic_prefix = get_default_topic_prefix(); + global_options.retained = true; - try { - YAML::Node config = YAML::LoadFile("config.yaml"); + queue.config.client_id = name; + queue.onReceived = std::bind(&queueReceived, std::placeholders::_1, std::placeholders::_2); + queue.onDisconnected = std::bind(&queueDisconnected, std::placeholders::_1); - MQTTClient_connectOptions opts = MQTTClient_connectOptions_initializer; + global_topic_prefix = get_default_topic_prefix(); - auto broker = config["broker"]; - auto username = config["username"]; - auto password = config["password"]; - auto keepalive = config["keepalive"]; - auto prefix = config["prefix"]; + try { + YAML::Node yaml = YAML::LoadFile("config.yaml"); + auto broker = yaml["broker"]; if (!broker) throw "'broker' not found"; - uri = broker.as(); - std::cout << "Broker: " << uri << std::endl; - - if (username) { - user = username.as(); - opts.username = user.c_str(); - std::cout << "Username: " << opts.username << std::endl; - } - if (password) { - pass = password.as(); - opts.password = pass.c_str(); - std::cout << "Password: *****" << std::endl; - } - if (keepalive) { - opts.keepAliveInterval = keepalive.as(); - std::cout << "Keep alive: " << opts.keepAliveInterval << std::endl; + queue.config.uri = broker.as(); + std::cout << "Broker: " << queue.config.uri << std::endl; + + initOption(yaml, "username", "Username", &queue.config.username); + initOptionHidden(yaml, "password", "Password", &queue.config.password); + initOption(yaml, "keepalive", "Keep alive", &queue.config.keep_alive_interval); + initOption(yaml, "prefix", "Topic prefix", &global_topic_prefix); + initOption(yaml, "reconnect", "Reconnect", &queue.config.reconnect); + initOption(yaml, "connect timeout", "Connect timeout", &queue.config.connectTimeout); + initOption(yaml, "disconnect timeout", "Disconnect timeout", &queue.config.disconnectTimeout); + initOption(yaml, "backoff min", "Backoff minimum", &queue.config.backoff_min); + initOption(yaml, "backoff max", "Backoff maximum", &queue.config.backoff_max); + + int error = queue.connect(); + if (error) { + return 3; } - if (prefix) { - topic_prefix = prefix.as(); - } - - std::cout << std::endl; - - queue.connect( - uri, - name, - topic_prefix, - opts - ); } catch (YAML::BadFile badFile) { std::cerr << "Error: config.yaml not found (" << badFile.msg << ")" << std::endl; @@ -854,7 +762,7 @@ int main(int argc, char *argv[]) { return 2; } - auto topic_prefix_all = queue.topic_prefix + "all/"; + std::string topic_prefix_all = global_topic_prefix + "all/"; topic_all_power_state = topic_prefix_all + "power/state"; topic_all_power_command = topic_prefix_all + "power/command"; std::cout << "Publishing to " << topic_all_power_state << std::endl; diff --git a/src/vendor/paho.mqtt.c b/src/vendor/paho.mqtt.c index 5571f9b..94cc8f4 160000 --- a/src/vendor/paho.mqtt.c +++ b/src/vendor/paho.mqtt.c @@ -1 +1 @@ -Subproject commit 5571f9b1cdfba5e628be44e4e84962ede6497e84 +Subproject commit 94cc8f431c7746d23dbd015c643e1f9c5cbbbef3