Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix subscription packet callbacks not executing within processPacketsUntil() #215

Merged
merged 6 commits into from
Nov 1, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 31 additions & 22 deletions Adafruit_MQTT.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,32 @@ int8_t Adafruit_MQTT::connect(const char *user, const char *pass) {
return connect();
}

void Adafruit_MQTT::processSubscriptionPacket(Adafruit_MQTT_Subscribe *sub) {
if (sub->callback_uint32t != NULL) {
// execute callback in integer mode
uint32_t data = 0;
data = atoi((char *)sub->lastread);
sub->callback_uint32t(data);
} else if (sub->callback_double != NULL) {
// execute callback in doublefloat mode
double data = 0;
data = atof((char *)sub->lastread);
sub->callback_double(data);
} else if (sub->callback_buffer != NULL) {
// execute callback in buffer mode
sub->callback_buffer((char *)sub->lastread, sub->datalen);
} else if (sub->callback_io != NULL) {
// execute callback in io mode
((sub->io_mqtt)->*(sub->callback_io))((char *)sub->lastread, sub->datalen);
} else {
DEBUG_PRINTLN(
"ERROR: Subscription packet did not have an associated callback");
return;
}
brentru marked this conversation as resolved.
Show resolved Hide resolved
// mark subscription message as "read""
sub->new_message = false;
}

uint16_t Adafruit_MQTT::processPacketsUntil(uint8_t *buffer,
uint8_t waitforpackettype,
uint16_t timeout) {
Expand All @@ -239,7 +265,9 @@ uint16_t Adafruit_MQTT::processPacketsUntil(uint8_t *buffer,
return len;
} else {
if (packetType == MQTT_CTRL_PUBLISH) {
handleSubscriptionPacket(len);
Adafruit_MQTT_Subscribe *sub = handleSubscriptionPacket(len);
if (sub)
processSubscriptionPacket(sub);
} else {
ERROR_PRINTLN(F("Dropped a packet"));
}
Expand Down Expand Up @@ -478,27 +506,8 @@ void Adafruit_MQTT::processPackets(int16_t timeout) {

while (elapsed < (uint32_t)timeout) {
Adafruit_MQTT_Subscribe *sub = readSubscription(timeout - elapsed);
if (sub) {
if (sub->callback_uint32t != NULL) {
// huh lets do the callback in integer mode
uint32_t data = 0;
data = atoi((char *)sub->lastread);
sub->callback_uint32t(data);
} else if (sub->callback_double != NULL) {
// huh lets do the callback in doublefloat mode
double data = 0;
data = atof((char *)sub->lastread);
sub->callback_double(data);
} else if (sub->callback_buffer != NULL) {
// huh lets do the callback in buffer mode
sub->callback_buffer((char *)sub->lastread, sub->datalen);
} else if (sub->callback_io != NULL) {
// huh lets do the callback in io mode
((sub->io_mqtt)->*(sub->callback_io))((char *)sub->lastread,
sub->datalen);
}
}

if (sub)
processSubscriptionPacket(sub);
// keep track over elapsed time
endtime = millis();
if (endtime < starttime) {
Expand Down
8 changes: 5 additions & 3 deletions Adafruit_MQTT.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
#define ADAFRUIT_MQTT_VERSION_PATCH 0

// Uncomment/comment to turn on/off debug output messages.
//#define MQTT_DEBUG
// #define MQTT_DEBUG
// Uncomment/comment to turn on/off error output messages.
#define MQTT_ERROR

Expand Down Expand Up @@ -209,10 +209,12 @@ class Adafruit_MQTT {
// messages!
Adafruit_MQTT_Subscribe *readSubscription(int16_t timeout = 0);

// Handle any data coming in for subscriptions and fires them off to the
// appropriate callback
// Handle any data coming in for subscriptions
Adafruit_MQTT_Subscribe *handleSubscriptionPacket(uint16_t len);

// Execute a subscription packet's associated callback and mark as "read"
void processSubscriptionPacket(Adafruit_MQTT_Subscribe *sub);

void processPackets(int16_t timeout);

// Ping the server to ensure the connection is still alive.
Expand Down
2 changes: 1 addition & 1 deletion library.properties
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
name=Adafruit MQTT Library
version=2.4.3
version=2.5.0
author=Adafruit
maintainer=Adafruit <[email protected]>
sentence=MQTT library that supports the FONA, ESP8266, ESP32, Yun, and generic Arduino Client hardware.
Expand Down