Skip to content

Commit

Permalink
mqtt
Browse files Browse the repository at this point in the history
  • Loading branch information
mvladic committed Dec 17, 2019
1 parent b377e71 commit e37fba4
Show file tree
Hide file tree
Showing 2 changed files with 121 additions and 124 deletions.
243 changes: 120 additions & 123 deletions src/eez/mqtt.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ static const char *CLIENT_ID = "BB3_STM32";
static const char *CLIENT_ID = "BB3_Simulator";
#endif

static const uint32_t RECONNECT_AFTER_ERROR_MS = 1000;

static const size_t MAX_PUB_TOPIC_LENGTH = 50;
static const char *PUB_TOPIC_OE = "%s/ch/%d/oe";
static const char *PUB_TOPIC_U_SET = "%s/ch/%d/uset";
Expand All @@ -74,6 +76,7 @@ static const char *SUB_TOPIC = "%s/ch/+/+"; // for example: ch/1/set/oe, ch/1/se
static const size_t MAX_PAYLOAD_LENGTH = 100;

ConnectionState g_connectionState = CONNECTION_STATE_IDLE;
uint32_t g_connectionStateChangedTickCount;

static struct {
int oe;
Expand All @@ -94,6 +97,8 @@ static struct {
static uint8_t g_lastChannelIndex = 0;
static uint8_t g_lastValueIndex = 0;

void setState(ConnectionState connectionState);

void onIncomingPublish(const char *topic, const char *payload) {
const char *p = topic + 3;

Expand Down Expand Up @@ -186,20 +191,26 @@ static char g_topic[MAX_TOPIC_LEN + 1];
static const size_t MAX_PAYLOAD_LEN = 128;
static char g_payload[MAX_PAYLOAD_LEN + 1];
static size_t g_payloadLen;
static mqtt_connection_status_t m_mqttConnectionStatus;

static void dnsFoundCallback(const char* hostname, const ip_addr_t *ipaddr, void *arg) {
if (ipaddr != NULL) {
g_ipaddr = *ipaddr;
g_connectionState = CONNECTION_STATE_DNS_FOUND;
setState(CONNECTION_STATE_DNS_FOUND);
} else {
g_connectionState = CONNECTION_STATE_ERROR;
setState(CONNECTION_STATE_ERROR);
DebugTrace("mqtt dns error: server not found\n");
}
}

static void connectCallback(mqtt_client_t *client, void *arg, mqtt_connection_status_t status) {
m_mqttConnectionStatus = status;
if (g_connectionState == CONNECTION_STATE_CONNECTING) {
if (status == MQTT_CONNECT_ACCEPTED) {
setState(CONNECTION_STATE_CONNECTED);
} else {
setState(CONNECTION_STATE_ERROR);
DebugTrace("mqtt connect error: %d\n", (int)status);
}
}
}

static void requestCallback(void *arg, err_t err) {
Expand Down Expand Up @@ -246,6 +257,9 @@ bool publish(char *topic, char *payload, bool retain) {
if (result != ERR_OK) {
if (result != ERR_MEM) {
DebugTrace("mqtt publish error: %d\n", (int)result);
if (result == ERR_CONN) {
reconnect();
}
}
return false;
}
Expand Down Expand Up @@ -290,28 +304,31 @@ const char *getSubTopic() {
return g_subTopic;
}

void setConnected(uint32_t tickCount) {
g_connectionState = CONNECTION_STATE_CONNECTED;

void setState(ConnectionState connectionState) {
if (connectionState == CONNECTION_STATE_CONNECTED) {
#if defined(EEZ_PLATFORM_STM32)
mqtt_set_inpub_callback(g_client, incomingPublishCallback, incomingDataCallback, nullptr);
mqtt_subscribe(g_client, getSubTopic(), 0, requestCallback, nullptr);
mqtt_set_inpub_callback(g_client, incomingPublishCallback, incomingDataCallback, nullptr);
mqtt_subscribe(g_client, getSubTopic(), 0, requestCallback, nullptr);
#endif

#if defined(EEZ_PLATFORM_SIMULATOR)
mqtt_subscribe(&g_client, getSubTopic(), 0);
mqtt_subscribe(&g_client, getSubTopic(), 0);
#endif

for(int i = 0; i < CH_NUM; i++) {
g_channelStates[i].oe = -1;
g_channelStates[i].uSet = NAN;
g_channelStates[i].iSet = NAN;
g_channelStates[i].uMon = NAN;
g_channelStates[i].iMon = NAN;
for(int i = 0; i < CH_NUM; i++) {
g_channelStates[i].oe = -1;
g_channelStates[i].uSet = NAN;
g_channelStates[i].iSet = NAN;
g_channelStates[i].uMon = NAN;
g_channelStates[i].iMon = NAN;
}

g_lastChannelIndex = 0;
g_lastValueIndex = 0;
}

g_lastChannelIndex = 0;
g_lastValueIndex = 0;
g_connectionState = connectionState;
g_connectionStateChangedTickCount = millis();
}

void tick(uint32_t tickCount) {
Expand All @@ -321,7 +338,7 @@ void tick(uint32_t tickCount) {

else if (g_connectionState == CONNECTION_STATE_IDLE) {
if (persist_conf::devConf.mqttEnabled) {
g_connectionState = CONNECTION_STATE_CONNECT;
setState(CONNECTION_STATE_CONNECT);
}
}

Expand All @@ -331,12 +348,12 @@ void tick(uint32_t tickCount) {
ip_addr_t ipaddr;
err_t err = dns_gethostbyname(persist_conf::devConf.mqttHost, &ipaddr, dnsFoundCallback, NULL);
if (err == ERR_OK) {
g_connectionState = CONNECTION_STATE_DNS_FOUND;
setState(CONNECTION_STATE_DNS_FOUND);
g_ipaddr = ipaddr;
} else if (err == ERR_INPROGRESS) {
g_connectionState = CONNECTION_STATE_DNS_IN_PROGRESS;
setState(CONNECTION_STATE_DNS_IN_PROGRESS);
} else {
g_connectionState = CONNECTION_STATE_ERROR;
setState(CONNECTION_STATE_ERROR);
DebugTrace("mqtt dns error: %d\n", (int)err);
}
#endif
Expand All @@ -357,13 +374,13 @@ void tick(uint32_t tickCount) {

/* check that we don't have any errors */
if (g_client.error == MQTT_OK) {
setConnected(tickCount);
setState(CONNECTION_STATE_CONNECTED);
} else {
g_connectionState = CONNECTION_STATE_ERROR;
setState(CONNECTION_STATE_ERROR);
DebugTrace("mqtt error: %s\n", mqtt_error_str(g_client.error));
}
} else {
g_connectionState = CONNECTION_STATE_ERROR;
setState(CONNECTION_STATE_ERROR);
DebugTrace("mqtt error: failed to open socket\n");
}
#endif
Expand All @@ -380,15 +397,19 @@ void tick(uint32_t tickCount) {
mqtt_disconnect(&g_client);
#endif

g_connectionState = CONNECTION_STATE_IDLE;
setState(CONNECTION_STATE_IDLE);

if (g_connectionState == CONNECTION_STATE_RECONNECT) {
g_connectionState = CONNECTION_STATE_CONNECT;
setState(CONNECTION_STATE_CONNECT);
}
}

else if (g_connectionState == CONNECTION_STATE_ERROR) {
// pass
if (persist_conf::devConf.mqttEnabled) {
if (millis() - g_connectionStateChangedTickCount > RECONNECT_AFTER_ERROR_MS) {
setState(CONNECTION_STATE_CONNECT);
}
}
}

#if defined(EEZ_PLATFORM_STM32)
Expand All @@ -404,125 +425,101 @@ void tick(uint32_t tickCount) {
clientInfo.keep_alive = 60; // seconds
clientInfo.will_topic = nullptr; // not used

m_mqttConnectionStatus = MQTT_CONNECT_ACCEPTED;

err_t result = mqtt_client_connect(g_client, &g_ipaddr, persist_conf::devConf.mqttPort, connectCallback, nullptr, &clientInfo);
if (result == ERR_OK) {
g_connectionState = CONNECTION_STATE_CONNECTING;
setState(CONNECTION_STATE_CONNECTING);
} else {
g_connectionState = CONNECTION_STATE_ERROR;
setState(CONNECTION_STATE_ERROR);
DebugTrace("mqtt connect error: %d\n", (int)result);
}
} else {
g_connectionState = CONNECTION_STATE_ERROR;
setState(CONNECTION_STATE_ERROR);
DebugTrace("mqtt error: failed to create a client\n");
}
return;
}
#endif

else if (g_connectionState == CONNECTION_STATE_CONNECTING) {
#if defined(EEZ_PLATFORM_STM32)
if (mqtt_client_is_connected(g_client)) {
setConnected(tickCount);
} else {
if (m_mqttConnectionStatus != MQTT_CONNECT_ACCEPTED) {
g_connectionState = CONNECTION_STATE_ERROR;
DebugTrace("mqtt connect error: %d\n", (int)m_mqttConnectionStatus);
}
}
#endif
}

else if (g_connectionState == CONNECTION_STATE_CONNECTED) {
#if defined(EEZ_PLATFORM_STM32)
if (mqtt_client_is_connected(g_client)) {
#endif
uint8_t lastChannelIndexAtStart = g_lastChannelIndex;
uint8_t lastValueIndexAtStart = g_lastValueIndex;

do {
uint8_t channelIndex = g_lastChannelIndex;
Channel &channel = Channel::get(channelIndex);

uint32_t period = (uint32_t)roundf(persist_conf::devConf.mqttPeriod * 1000000);

if (g_lastValueIndex == 0) {
int oe = channel.isOutputEnabled() ? 1 : 0;
if (oe != g_channelStates[channelIndex].oe) {
if (!publish(channelIndex, PUB_TOPIC_OE, oe)) {
break;
}
g_channelStates[channelIndex].oe = oe;
}
} else if (g_lastValueIndex == 1) {
float uSet = channel_dispatcher::getUSet(channel);
if ((isNaN(g_channelStates[channelIndex].uSet) || uSet != g_channelStates[channelIndex].uSet) && (tickCount - g_channelStates[channelIndex].g_uSetTick) >= period) {
if (!publish(channelIndex, PUB_TOPIC_U_SET, uSet)) {
break;
}
g_channelStates[channelIndex].uSet = uSet;
g_channelStates[channelIndex].g_uSetTick = tickCount;
}
} else if (g_lastValueIndex == 2) {
float iSet = channel_dispatcher::getISet(channel);
if ((isNaN(g_channelStates[channelIndex].iSet) || iSet != g_channelStates[channelIndex].iSet) && (tickCount - g_channelStates[channelIndex].g_iSetTick) >= period) {
if (!publish(channelIndex, PUB_TOPIC_I_SET, iSet)) {
break;
}
g_channelStates[channelIndex].iSet = iSet;
g_channelStates[channelIndex].g_iSetTick = tickCount;
}
} else if (g_lastValueIndex == 3) {
float uMon = channel_dispatcher::getUMonLast(channel);
if ((isNaN(g_channelStates[channelIndex].uMon) || uMon != g_channelStates[channelIndex].uMon) && (tickCount - g_channelStates[channelIndex].g_uMonTick) >= period) {
if (!publish(channelIndex, PUB_TOPIC_U_MON, uMon)) {
break;
}
g_channelStates[channelIndex].uMon = uMon;
g_channelStates[channelIndex].g_uMonTick = tickCount;
}
} else {
float iMon = channel_dispatcher::getIMonLast(channel);
if ((isNaN(g_channelStates[channelIndex].iMon) || iMon != g_channelStates[channelIndex].iMon) && (tickCount - g_channelStates[channelIndex].g_iMonTick) >= period) {
if (!publish(channelIndex, PUB_TOPIC_I_MON, iMon)) {
break;
}
g_channelStates[channelIndex].iMon = iMon;
g_channelStates[channelIndex].g_iMonTick = tickCount;
}
}

if (++g_lastValueIndex == 5) {
g_lastValueIndex = 0;
if (++g_lastChannelIndex == CH_NUM) {
g_lastChannelIndex = 0;
}
}
} while (g_lastChannelIndex != lastChannelIndexAtStart || g_lastValueIndex != lastValueIndexAtStart);
uint8_t lastChannelIndexAtStart = g_lastChannelIndex;
uint8_t lastValueIndexAtStart = g_lastValueIndex;

do {
uint8_t channelIndex = g_lastChannelIndex;
Channel &channel = Channel::get(channelIndex);

uint32_t period = (uint32_t)roundf(persist_conf::devConf.mqttPeriod * 1000000);

if (g_lastValueIndex == 0) {
int oe = channel.isOutputEnabled() ? 1 : 0;
if (oe != g_channelStates[channelIndex].oe) {
if (!publish(channelIndex, PUB_TOPIC_OE, oe)) {
break;
}
g_channelStates[channelIndex].oe = oe;
}
} else if (g_lastValueIndex == 1) {
float uSet = channel_dispatcher::getUSet(channel);
if ((isNaN(g_channelStates[channelIndex].uSet) || uSet != g_channelStates[channelIndex].uSet) && (tickCount - g_channelStates[channelIndex].g_uSetTick) >= period) {
if (!publish(channelIndex, PUB_TOPIC_U_SET, uSet)) {
break;
}
g_channelStates[channelIndex].uSet = uSet;
g_channelStates[channelIndex].g_uSetTick = tickCount;
}
} else if (g_lastValueIndex == 2) {
float iSet = channel_dispatcher::getISet(channel);
if ((isNaN(g_channelStates[channelIndex].iSet) || iSet != g_channelStates[channelIndex].iSet) && (tickCount - g_channelStates[channelIndex].g_iSetTick) >= period) {
if (!publish(channelIndex, PUB_TOPIC_I_SET, iSet)) {
break;
}
g_channelStates[channelIndex].iSet = iSet;
g_channelStates[channelIndex].g_iSetTick = tickCount;
}
} else if (g_lastValueIndex == 3) {
float uMon = channel_dispatcher::getUMonLast(channel);
if ((isNaN(g_channelStates[channelIndex].uMon) || uMon != g_channelStates[channelIndex].uMon) && (tickCount - g_channelStates[channelIndex].g_uMonTick) >= period) {
if (!publish(channelIndex, PUB_TOPIC_U_MON, uMon)) {
break;
}
g_channelStates[channelIndex].uMon = uMon;
g_channelStates[channelIndex].g_uMonTick = tickCount;
}
} else {
float iMon = channel_dispatcher::getIMonLast(channel);
if ((isNaN(g_channelStates[channelIndex].iMon) || iMon != g_channelStates[channelIndex].iMon) && (tickCount - g_channelStates[channelIndex].g_iMonTick) >= period) {
if (!publish(channelIndex, PUB_TOPIC_I_MON, iMon)) {
break;
}
g_channelStates[channelIndex].iMon = iMon;
g_channelStates[channelIndex].g_iMonTick = tickCount;
}
}

if (++g_lastValueIndex == 5) {
g_lastValueIndex = 0;
if (++g_lastChannelIndex == CH_NUM) {
g_lastChannelIndex = 0;
}
}
} while (g_lastChannelIndex != lastChannelIndexAtStart || g_lastValueIndex != lastValueIndexAtStart);

#if defined(EEZ_PLATFORM_SIMULATOR)
mqtt_sync(&g_client);
#endif

#if defined(EEZ_PLATFORM_STM32)
} else {
g_connectionState = CONNECTION_STATE_RECONNECT;
}
mqtt_sync(&g_client);
#endif
}
}

void reconnect() {
if (persist_conf::devConf.mqttEnabled) {
if (g_connectionState == CONNECTION_STATE_IDLE || g_connectionState != CONNECTION_STATE_ERROR) {
g_connectionState = CONNECTION_STATE_CONNECT;
if (g_connectionState == CONNECTION_STATE_IDLE || g_connectionState == CONNECTION_STATE_ERROR) {
setState(CONNECTION_STATE_CONNECT);
} else {
g_connectionState = CONNECTION_STATE_RECONNECT;
setState(CONNECTION_STATE_RECONNECT);
}
} else {
if (g_connectionState == CONNECTION_STATE_IDLE || g_connectionState != CONNECTION_STATE_ERROR) {
g_connectionState = CONNECTION_STATE_DISCONNECT;
if (g_connectionState != CONNECTION_STATE_IDLE && g_connectionState != CONNECTION_STATE_ERROR) {
setState(CONNECTION_STATE_DISCONNECT);
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/eez/mqtt.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,4 +45,4 @@ void tick(uint32_t tickCount);
void reconnect();

} // mqtt
} // eez
} // eez

0 comments on commit e37fba4

Please sign in to comment.