Skip to content

Commit

Permalink
Merge pull request #11 from ewoutp/master
Browse files Browse the repository at this point in the history
Fix of "fixed header" decoding when remainingLength takes more than 1 byte
  • Loading branch information
jnguillerme authored Feb 1, 2018
2 parents 27da0ea + 9d2c50b commit b115db5
Show file tree
Hide file tree
Showing 8 changed files with 45 additions and 18 deletions.
24 changes: 20 additions & 4 deletions lib/mqtt_client.dart
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ class MqttClient<E extends VirtualMqttConnection> {
Map<int, Completer> _messagesToCompleteMap;
final String _userName;
final String _password;
var _remData;

var _liveTimer;

Expand Down Expand Up @@ -192,11 +193,20 @@ class MqttClient<E extends VirtualMqttConnection> {
*/

void _processData(data) {
var remData = data;
if (_remData != null) {
// append data to remaining data
_remData.addAll(data);
} else {
// No remaining data
_remData = data.toList(growable:true);
}

var lenBefore, lenAfter;
do {
remData = _processMqttMessage(remData);
} while (remData != null);
lenBefore = _remData.length;
_remData = _processMqttMessage(_remData);
lenAfter = (_remData != null) ? _remData.length : 0;
} while (lenBefore != lenAfter && lenAfter >= 2);
}

/**
Expand Down Expand Up @@ -315,6 +325,10 @@ class MqttClient<E extends VirtualMqttConnection> {
*/
int _handlePublish(data) {
MqttMessagePublish m = new MqttMessagePublish.decode(data, debugMessage);
if (m.len > data.length) {
// Not enough data yet
return 0;
}

// QOS_1 and QOS_2 messages need to be acked
if (m.QoS > 0) {
Expand All @@ -324,7 +338,9 @@ class MqttClient<E extends VirtualMqttConnection> {
_resetTimer();
}

print("[mqttClient] [" + m._topic + "][" + m._payload + "]");
if (debugMessage) {
print("[mqttClient] [" + m._topic + "][" + m._payload + "]");
}
// notify the client of the new topic / payload
if (_onSubscribeDataMap != null && _onSubscribeDataMap[m._topic] != null)
_onSubscribeDataMap[m._topic](m._topic, m._payload);
Expand Down
21 changes: 13 additions & 8 deletions lib/mqtt_message.dart
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,10 @@ abstract class MqttMessage {
* - a payload (message specific)
*/
MqttMessage.decode(List<int> data, [bool debugMessage = false]) {
decodeFixedHeader(data);
num vhLen = decodeVariableHeader(data.sublist(2));
if (data.length > 2 + vhLen) {
decodePayload(data.sublist(2 + vhLen));
num fhLen = decodeFixedHeader(data);
num vhLen = decodeVariableHeader(data.sublist(fhLen), fhLen);
if (data.length > fhLen + vhLen) {
decodePayload(data.sublist(fhLen + vhLen));
}

if (debugMessage) {
Expand Down Expand Up @@ -82,6 +82,7 @@ abstract class MqttMessage {
encodeFixedHeader();
encodeVariableHeader();
encodePayload();
len = _buf.length;
}

/**
Expand Down Expand Up @@ -137,9 +138,11 @@ abstract class MqttMessage {
* bit 2 - 1 : Qos Level
* bit 0 : RETAIN
*
* Byte 2 : Remaining length
* Byte 2 : Remaining length
*
* Returns length of fixed header.
*/
decodeFixedHeader(data) {
num decodeFixedHeader(data) {
type = data[0] >> 4;
DUP = data[0] & 0x1000;
QoS = (data[0]>>1) & QOS_ALL;
Expand All @@ -157,7 +160,9 @@ abstract class MqttMessage {
multiplier *= 128;
} while ( (digit & 0x80) != 0);

len = remLength + 2;
len = remLength + pos;

return pos;
}

/**
Expand All @@ -166,7 +171,7 @@ abstract class MqttMessage {
*
* Return the length of the variable header
*/
num decodeVariableHeader(List<int> data) { return 0; }
num decodeVariableHeader(List<int> data, int fhLen) { return 0; }

/**
* decodePayload
Expand Down
2 changes: 1 addition & 1 deletion lib/mqtt_message_assured.dart
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ abstract class MqttMessageAssured extends MqttMessage {
* byte 1 - Message ID MSB
* byte 2 - Message ID LSB
*/
num decodeVariableHeader(List<int> data) {
num decodeVariableHeader(List<int> data, int fhLen) {
assert(data.length == 2);

_msgID_MSB = data[0];
Expand Down
2 changes: 1 addition & 1 deletion lib/mqtt_message_connack.dart
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ class MqttMessageConnack extends MqttMessage {
* byte 1 - reserved value. Not used
* byte 2 - return code
*/
num decodeVariableHeader(List<int> data) {
num decodeVariableHeader(List<int> data, int fhLen) {
assert(data.length == 2);

returnCode = data[1];
Expand Down
4 changes: 2 additions & 2 deletions lib/mqtt_message_publish.dart
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ class MqttMessagePublish extends MqttMessage {
*
* Return the length of the variable header
*/
num decodeVariableHeader(List<int> data) {
num decodeVariableHeader(List<int> data, int fhLen) {
int pos = 0;
num topicLength = 256 * data[pos++] + data[pos++];

Expand All @@ -95,7 +95,7 @@ class MqttMessagePublish extends MqttMessage {
_msgID_LSB = data[pos++];
}

_payloadPos = 2 + pos; // position for the 1st payload character = 2 (fixed header length) + pos (variable header length)
_payloadPos = fhLen + pos; // position for the 1st payload character = 2 (fixed header length) + pos (variable header length)

return pos;
}
Expand Down
2 changes: 1 addition & 1 deletion lib/mqtt_message_suback.dart
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ class MqttMessageSuback extends MqttMessageAssured {
* byte 1 - Message ID MSB
* byte 2 - Message ID LSB
*/
num decodeVariableHeader(List<int> data) {
num decodeVariableHeader(List<int> data, int fhLen) {
// assert(data.length == 3);

messageID = 256 * data[0] + data[1];
Expand Down
2 changes: 1 addition & 1 deletion lib/mqtt_message_unsuback.dart
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ class MqttMessageUnsuback extends MqttMessageAssured {
* byte 1 - Message ID MSB
* byte 2 - Message ID LSB
*/
num decodeVariableHeader(List<int> data) {
num decodeVariableHeader(List<int> data, int fhLen) {
assert(data.length == 2);

messageID = 256 * data[0] + data[1];
Expand Down
6 changes: 6 additions & 0 deletions test/mqtt_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -63,5 +63,11 @@ testPublish(String testName, num QoS, int retain) {
MqttMessagePublish m2 = new MqttMessagePublish.decode(m1.buf);

expect(m2, new MqttMessagePublishMatcher(m1));

MqttMessagePublish ml1 = new MqttMessagePublish.setOptions("topicTEST", "payloadTEST very long very long very long very long very long very long very long very long very long very long very long very long very long very long very long very long very long", 1, QoS, retain);
ml1.encode();
MqttMessagePublish ml2 = new MqttMessagePublish.decode(ml1.buf);

expect(ml2, new MqttMessagePublishMatcher(ml1));
});
}

0 comments on commit b115db5

Please sign in to comment.