Skip to content

Commit

Permalink
Add large-payload API, make max header size a define, not magic number.
Browse files Browse the repository at this point in the history
  • Loading branch information
Adrian McEwen committed Aug 15, 2018
1 parent 54be6e8 commit 3b3a8da
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 16 deletions.
3 changes: 3 additions & 0 deletions keywords.txt
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ connect KEYWORD2
disconnect KEYWORD2
publish KEYWORD2
publish_P KEYWORD2
beginPublish KEYWORD2
endPublish KEYWORD2
write KEYWORD2
subscribe KEYWORD2
unsubscribe KEYWORD2
loop KEYWORD2
Expand Down
69 changes: 53 additions & 16 deletions src/PubSubClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ boolean PubSubClient::connect(const char *id, const char *user, const char *pass
if (result == 1) {
nextMsgId = 1;
// Leave room in the buffer for header and variable length field
uint16_t length = 5;
uint16_t length = MQTT_MAX_HEADER_SIZE;
unsigned int j;

#if MQTT_VERSION == MQTT_VERSION_3_1
Expand Down Expand Up @@ -171,7 +171,7 @@ boolean PubSubClient::connect(const char *id, const char *user, const char *pass
}
}

write(MQTTCONNECT,buffer,length-5);
write(MQTTCONNECT,buffer,length-MQTT_MAX_HEADER_SIZE);

lastInActivity = lastOutActivity = millis();

Expand Down Expand Up @@ -365,12 +365,12 @@ boolean PubSubClient::publish(const char* topic, const uint8_t* payload, unsigne

boolean PubSubClient::publish(const char* topic, const uint8_t* payload, unsigned int plength, boolean retained) {
if (connected()) {
if (MQTT_MAX_PACKET_SIZE < 5 + 2+strlen(topic) + plength) {
if (MQTT_MAX_PACKET_SIZE < MQTT_MAX_HEADER_SIZE + 2+strlen(topic) + plength) {
// Too long
return false;
}
// Leave room in the buffer for header and variable length field
uint16_t length = 5;
uint16_t length = MQTT_MAX_HEADER_SIZE;
length = writeString(topic,buffer,length);
uint16_t i;
for (i=0;i<plength;i++) {
Expand All @@ -380,7 +380,7 @@ boolean PubSubClient::publish(const char* topic, const uint8_t* payload, unsigne
if (retained) {
header |= 1;
}
return write(header,buffer,length-5);
return write(header,buffer,length-MQTT_MAX_HEADER_SIZE);
}
return false;
}
Expand Down Expand Up @@ -430,12 +430,43 @@ boolean PubSubClient::publish_P(const char* topic, const uint8_t* payload, unsig
return rc == tlen + 4 + plength;
}

boolean PubSubClient::write(uint8_t header, uint8_t* buf, uint16_t length) {
boolean PubSubClient::beginPublish(const char* topic, unsigned int plength, boolean retained) {
if (connected()) {
// Send the header and variable length field
uint16_t length = MQTT_MAX_HEADER_SIZE;
length = writeString(topic,buffer,length);
uint16_t i;
uint8_t header = MQTTPUBLISH;
if (retained) {
header |= 1;
}
size_t hlen = buildHeader(header, buffer, plength+length-MQTT_MAX_HEADER_SIZE);
uint16_t rc = _client->write(buffer+(MQTT_MAX_HEADER_SIZE-hlen),length-(MQTT_MAX_HEADER_SIZE-hlen));
lastOutActivity = millis();
return (rc == (length-(MQTT_MAX_HEADER_SIZE-hlen)));
}
return false;
}

int PubSubClient::endPublish() {
return 1;
}

size_t PubSubClient::write(uint8_t data) {
lastOutActivity = millis();
return _client->write(data);
}

size_t PubSubClient::write(const uint8_t *buffer, size_t size) {
lastOutActivity = millis();
return _client->write(buffer,size);
}

size_t PubSubClient::buildHeader(uint8_t header, uint8_t* buf, uint16_t length) {
uint8_t lenBuf[4];
uint8_t llen = 0;
uint8_t digit;
uint8_t pos = 0;
uint16_t rc;
uint16_t len = length;
do {
digit = len % 128;
Expand All @@ -449,12 +480,18 @@ boolean PubSubClient::write(uint8_t header, uint8_t* buf, uint16_t length) {

buf[4-llen] = header;
for (int i=0;i<llen;i++) {
buf[5-llen+i] = lenBuf[i];
buf[MQTT_MAX_HEADER_SIZE-llen+i] = lenBuf[i];
}
return llen+1; // Full header size is variable length bit plus the 1-byte fixed header
}

boolean PubSubClient::write(uint8_t header, uint8_t* buf, uint16_t length) {
uint16_t rc;
uint8_t hlen = buildHeader(header, buf, length);

#ifdef MQTT_MAX_TRANSFER_SIZE
uint8_t* writeBuf = buf+(4-llen);
uint16_t bytesRemaining = length+1+llen; //Match the length type
uint8_t* writeBuf = buf+(MQTT_MAX_HEADER_SIZE-hlen);
uint16_t bytesRemaining = length+hlen; //Match the length type
uint8_t bytesToWrite;
boolean result = true;
while((bytesRemaining > 0) && result) {
Expand All @@ -466,9 +503,9 @@ boolean PubSubClient::write(uint8_t header, uint8_t* buf, uint16_t length) {
}
return result;
#else
rc = _client->write(buf+(4-llen),length+1+llen);
rc = _client->write(buf+(MQTT_MAX_HEADER_SIZE-hlen),length+hlen);
lastOutActivity = millis();
return (rc == 1+llen+length);
return (rc == hlen+length);
#endif
}

Expand All @@ -486,7 +523,7 @@ boolean PubSubClient::subscribe(const char* topic, uint8_t qos) {
}
if (connected()) {
// Leave room in the buffer for header and variable length field
uint16_t length = 5;
uint16_t length = MQTT_MAX_HEADER_SIZE;
nextMsgId++;
if (nextMsgId == 0) {
nextMsgId = 1;
Expand All @@ -495,7 +532,7 @@ boolean PubSubClient::subscribe(const char* topic, uint8_t qos) {
buffer[length++] = (nextMsgId & 0xFF);
length = writeString((char*)topic, buffer,length);
buffer[length++] = qos;
return write(MQTTSUBSCRIBE|MQTTQOS1,buffer,length-5);
return write(MQTTSUBSCRIBE|MQTTQOS1,buffer,length-MQTT_MAX_HEADER_SIZE);
}
return false;
}
Expand All @@ -506,15 +543,15 @@ boolean PubSubClient::unsubscribe(const char* topic) {
return false;
}
if (connected()) {
uint16_t length = 5;
uint16_t length = MQTT_MAX_HEADER_SIZE;
nextMsgId++;
if (nextMsgId == 0) {
nextMsgId = 1;
}
buffer[length++] = (nextMsgId >> 8);
buffer[length++] = (nextMsgId & 0xFF);
length = writeString(topic, buffer,length);
return write(MQTTUNSUBSCRIBE|MQTTQOS1,buffer,length-5);
return write(MQTTUNSUBSCRIBE|MQTTQOS1,buffer,length-MQTT_MAX_HEADER_SIZE);
}
return false;
}
Expand Down
25 changes: 25 additions & 0 deletions src/PubSubClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@
#define MQTTQOS1 (1 << 1)
#define MQTTQOS2 (2 << 1)

// Maximum size of fixed header and variable length size header
#define MQTT_MAX_HEADER_SIZE 5

#ifdef ESP8266
#include <functional>
#define MQTT_CALLBACK_SIGNATURE std::function<void(char*, uint8_t*, unsigned int)> callback
Expand All @@ -94,6 +97,11 @@ class PubSubClient {
boolean readByte(uint8_t * result, uint16_t * index);
boolean write(uint8_t header, uint8_t* buf, uint16_t length);
uint16_t writeString(const char* string, uint8_t* buf, uint16_t pos);
// Build up the header ready to send
// Returns the size of the header
// Note: the header is built at the end of the first MQTT_MAX_HEADER_SIZE bytes, so will start
// (MQTT_MAX_HEADER_SIZE - <returned size>) bytes into the buffer
size_t buildHeader(uint8_t header, uint8_t* buf, uint16_t length);
IPAddress ip;
const char* domain;
uint16_t port;
Expand Down Expand Up @@ -132,6 +140,23 @@ class PubSubClient {
boolean publish(const char* topic, const uint8_t * payload, unsigned int plength);
boolean publish(const char* topic, const uint8_t * payload, unsigned int plength, boolean retained);
boolean publish_P(const char* topic, const uint8_t * payload, unsigned int plength, boolean retained);
// Start to publish a message.
// This API:
// beginPublish(...)
// one or more calls to write(...)
// endPublish()
// Allows for arbitrarily large payloads to be sent without them having to be copied into
// a new buffer and held in memory at one time
// Returns 1 if the message was started successfully, 0 if there was an error
boolean beginPublish(const char* topic, unsigned int plength, boolean retained);
// Finish off this publish message (started with beginPublish)
// Returns 1 if the packet was sent successfully, 0 if there was an error
int endPublish();
// Write a single byte of payload (only to be used with beginPublish/endPublish)
size_t write(uint8_t);
// Write size bytes from buffer into the payload (only to be used with beginPublish/endPublish)
// Returns the number of bytes written
size_t write(const uint8_t *buffer, size_t size);
boolean subscribe(const char* topic);
boolean subscribe(const char* topic, uint8_t qos);
boolean unsubscribe(const char* topic);
Expand Down

0 comments on commit 3b3a8da

Please sign in to comment.