Skip to content

Commit

Permalink
Merge pull request #195 from rgorosito/fix-loss-subscriptions
Browse files Browse the repository at this point in the history
Fix: Subscriptions can't be read if arrive when waiting for response of ping or publish
  • Loading branch information
brentru authored Jun 22, 2021
2 parents e524b5f + ab78b29 commit 3a319ab
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 7 deletions.
43 changes: 36 additions & 7 deletions Adafruit_MQTT.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -351,13 +351,13 @@ bool Adafruit_MQTT::publish(const char *topic, uint8_t *data, uint16_t bLen,

// If QOS level is high enough verify the response packet.
if (qos > 0) {
len = readFullPacket(buffer, MAXBUFFERSIZE, PUBLISH_TIMEOUT_MS);
len = processPacketsUntil(buffer, MQTT_CTRL_PUBACK, PUBLISH_TIMEOUT_MS);

DEBUG_PRINT(F("Publish QOS1+ reply:\t"));
DEBUG_PRINTBUFFER(buffer, len);
if (len != 4)
return false;
if ((buffer[0] >> 4) != MQTT_CTRL_PUBACK)
return false;

uint16_t packnum = buffer[2];
packnum <<= 8;
packnum |= buffer[3];
Expand Down Expand Up @@ -508,10 +508,32 @@ void Adafruit_MQTT::processPackets(int16_t timeout) {
}
}
Adafruit_MQTT_Subscribe *Adafruit_MQTT::readSubscription(int16_t timeout) {
// Check if data is available to read.
uint16_t len =
readFullPacket(buffer, MAXBUFFERSIZE, timeout); // return one full packet
return handleSubscriptionPacket(len);

// Sync or Async subscriber with message
Adafruit_MQTT_Subscribe *s = 0;

// Check if are unread messages
for (uint8_t i = 0; i < MAXSUBSCRIPTIONS; i++) {
if (subscriptions[i] && subscriptions[i]->new_message) {
s = subscriptions[i];
break;
}
}

// not unread message
if (!s) {
// Check if data is available to read.
uint16_t len = readFullPacket(buffer, MAXBUFFERSIZE,
timeout); // return one full packet
s = handleSubscriptionPacket(len);
}

// it there is a message, mark it as not pending
if (s) {
s->new_message = false;
}

return s;
}

Adafruit_MQTT_Subscribe *Adafruit_MQTT::handleSubscriptionPacket(uint16_t len) {
Expand Down Expand Up @@ -551,6 +573,12 @@ Adafruit_MQTT_Subscribe *Adafruit_MQTT::handleSubscriptionPacket(uint16_t len) {
topiclen) == 0) {
DEBUG_PRINT(F("Found sub #"));
DEBUG_PRINTLN(i);
if (subscriptions[i]->new_message) {
DEBUG_PRINTLN(F("Lost previous message"));
} else {
subscriptions[i]->new_message = true;
}

break;
}
}
Expand Down Expand Up @@ -910,6 +938,7 @@ Adafruit_MQTT_Subscribe::Adafruit_MQTT_Subscribe(Adafruit_MQTT *mqttserver,
callback_double = 0;
callback_io = 0;
io_mqtt = 0;
new_message = false;
}

void Adafruit_MQTT_Subscribe::setCallback(SubscribeCallbackUInt32Type cb) {
Expand Down
2 changes: 2 additions & 0 deletions Adafruit_MQTT.h
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,8 @@ class Adafruit_MQTT_Subscribe {

AdafruitIO_MQTT *io_mqtt;

bool new_message;

private:
Adafruit_MQTT *mqtt;
};
Expand Down

0 comments on commit 3a319ab

Please sign in to comment.