From bd971727fe68fa43c50bf7011f9ad9d7bc38b42f Mon Sep 17 00:00:00 2001 From: Ewout Prangsma Date: Sat, 27 Jan 2018 21:46:39 +0100 Subject: [PATCH 1/6] Fix decoding of fixed header --- lib/mqtt_message.dart | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/lib/mqtt_message.dart b/lib/mqtt_message.dart index 2a30ed1..416a43a 100644 --- a/lib/mqtt_message.dart +++ b/lib/mqtt_message.dart @@ -41,10 +41,10 @@ abstract class MqttMessage { * - a payload (message specific) */ MqttMessage.decode(List data, [bool debugMessage = false]) { - decodeFixedHeader(data); - num vhLen = decodeVariableHeader(data.sublist(2)); - if (data.length > 2 + vhLen) { - decodePayload(data.sublist(2 + vhLen)); + num fhLen = decodeFixedHeader(data); + num vhLen = decodeVariableHeader(data.sublist(fhLen)); + if (data.length > fhLen + vhLen) { + decodePayload(data.sublist(fhLen + vhLen)); } if (debugMessage) { @@ -137,9 +137,11 @@ abstract class MqttMessage { * bit 2 - 1 : Qos Level * bit 0 : RETAIN * - * Byte 2 : Remaining length + * Byte 2 : Remaining length + * + * Returns length of fixed header. */ - decodeFixedHeader(data) { + num decodeFixedHeader(data) { type = data[0] >> 4; DUP = data[0] & 0x1000; QoS = (data[0]>>1) & QOS_ALL; @@ -158,6 +160,8 @@ abstract class MqttMessage { } while ( (digit & 0x80) != 0); len = remLength + 2; + + return pos; } /** From 358a6c3f6d6ddc11018bf98d3c38396cecc086b1 Mon Sep 17 00:00:00 2001 From: Ewout Prangsma Date: Sat, 27 Jan 2018 22:05:20 +0100 Subject: [PATCH 2/6] Fix message length --- lib/mqtt_message.dart | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/mqtt_message.dart b/lib/mqtt_message.dart index 416a43a..21eac5d 100644 --- a/lib/mqtt_message.dart +++ b/lib/mqtt_message.dart @@ -159,7 +159,7 @@ abstract class MqttMessage { multiplier *= 128; } while ( (digit & 0x80) != 0); - len = remLength + 2; + len = remLength + pos; return pos; } From 7871bfb279bb3a454eca708784c3661be27bbc3c Mon Sep 17 00:00:00 2001 From: Ewout Prangsma Date: Sat, 27 Jan 2018 22:26:30 +0100 Subject: [PATCH 3/6] Fix fixedHeaderLength in all payload. Add longer tests --- lib/mqtt_message.dart | 5 +++-- lib/mqtt_message_assured.dart | 2 +- lib/mqtt_message_connack.dart | 2 +- lib/mqtt_message_publish.dart | 4 ++-- lib/mqtt_message_suback.dart | 2 +- lib/mqtt_message_unsuback.dart | 2 +- test/mqtt_test.dart | 6 ++++++ 7 files changed, 15 insertions(+), 8 deletions(-) diff --git a/lib/mqtt_message.dart b/lib/mqtt_message.dart index 21eac5d..3dbce97 100644 --- a/lib/mqtt_message.dart +++ b/lib/mqtt_message.dart @@ -42,7 +42,7 @@ abstract class MqttMessage { */ MqttMessage.decode(List data, [bool debugMessage = false]) { num fhLen = decodeFixedHeader(data); - num vhLen = decodeVariableHeader(data.sublist(fhLen)); + num vhLen = decodeVariableHeader(data.sublist(fhLen), fhLen); if (data.length > fhLen + vhLen) { decodePayload(data.sublist(fhLen + vhLen)); } @@ -82,6 +82,7 @@ abstract class MqttMessage { encodeFixedHeader(); encodeVariableHeader(); encodePayload(); + len = _buf.length; } /** @@ -170,7 +171,7 @@ abstract class MqttMessage { * * Return the length of the variable header */ - num decodeVariableHeader(List data) { return 0; } + num decodeVariableHeader(List data, int fhLen) { return 0; } /** * decodePayload diff --git a/lib/mqtt_message_assured.dart b/lib/mqtt_message_assured.dart index b04c752..9f5585b 100644 --- a/lib/mqtt_message_assured.dart +++ b/lib/mqtt_message_assured.dart @@ -34,7 +34,7 @@ abstract class MqttMessageAssured extends MqttMessage { * byte 1 - Message ID MSB * byte 2 - Message ID LSB */ - num decodeVariableHeader(List data) { + num decodeVariableHeader(List data, int fhLen) { assert(data.length == 2); _msgID_MSB = data[0]; diff --git a/lib/mqtt_message_connack.dart b/lib/mqtt_message_connack.dart index 4110e59..ad2e209 100644 --- a/lib/mqtt_message_connack.dart +++ b/lib/mqtt_message_connack.dart @@ -20,7 +20,7 @@ class MqttMessageConnack extends MqttMessage { * byte 1 - reserved value. Not used * byte 2 - return code */ - num decodeVariableHeader(List data) { + num decodeVariableHeader(List data, int fhLen) { assert(data.length == 2); returnCode = data[1]; diff --git a/lib/mqtt_message_publish.dart b/lib/mqtt_message_publish.dart index bd1be32..876e4ea 100644 --- a/lib/mqtt_message_publish.dart +++ b/lib/mqtt_message_publish.dart @@ -83,7 +83,7 @@ class MqttMessagePublish extends MqttMessage { * * Return the length of the variable header */ - num decodeVariableHeader(List data) { + num decodeVariableHeader(List data, int fhLen) { int pos = 0; num topicLength = 256 * data[pos++] + data[pos++]; @@ -95,7 +95,7 @@ class MqttMessagePublish extends MqttMessage { _msgID_LSB = data[pos++]; } - _payloadPos = 2 + pos; // position for the 1st payload character = 2 (fixed header length) + pos (variable header length) + _payloadPos = fhLen + pos; // position for the 1st payload character = 2 (fixed header length) + pos (variable header length) return pos; } diff --git a/lib/mqtt_message_suback.dart b/lib/mqtt_message_suback.dart index 7864ea4..66c15c0 100644 --- a/lib/mqtt_message_suback.dart +++ b/lib/mqtt_message_suback.dart @@ -21,7 +21,7 @@ class MqttMessageSuback extends MqttMessageAssured { * byte 1 - Message ID MSB * byte 2 - Message ID LSB */ - num decodeVariableHeader(List data) { + num decodeVariableHeader(List data, int fhLen) { // assert(data.length == 3); messageID = 256 * data[0] + data[1]; diff --git a/lib/mqtt_message_unsuback.dart b/lib/mqtt_message_unsuback.dart index 3a7fa7d..0d3894c 100644 --- a/lib/mqtt_message_unsuback.dart +++ b/lib/mqtt_message_unsuback.dart @@ -20,7 +20,7 @@ class MqttMessageUnsuback extends MqttMessageAssured { * byte 1 - Message ID MSB * byte 2 - Message ID LSB */ - num decodeVariableHeader(List data) { + num decodeVariableHeader(List data, int fhLen) { assert(data.length == 2); messageID = 256 * data[0] + data[1]; diff --git a/test/mqtt_test.dart b/test/mqtt_test.dart index a8f6b2c..25d6396 100644 --- a/test/mqtt_test.dart +++ b/test/mqtt_test.dart @@ -63,5 +63,11 @@ testPublish(String testName, num QoS, int retain) { MqttMessagePublish m2 = new MqttMessagePublish.decode(m1.buf); expect(m2, new MqttMessagePublishMatcher(m1)); + + MqttMessagePublish ml1 = new MqttMessagePublish.setOptions("topicTEST", "payloadTEST very long very long very long very long very long very long very long very long very long very long very long very long very long very long very long very long very long", 1, QoS, retain); + ml1.encode(); + MqttMessagePublish ml2 = new MqttMessagePublish.decode(ml1.buf); + + expect(ml2, new MqttMessagePublishMatcher(ml1)); }); } From f45a288451c01f3229170bc2790aa938b7249ca1 Mon Sep 17 00:00:00 2001 From: Ewout Prangsma Date: Sun, 28 Jan 2018 13:23:01 +0100 Subject: [PATCH 4/6] Support PUBLISH messages that take multiple network packets --- lib/mqtt_client.dart | 20 +++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) diff --git a/lib/mqtt_client.dart b/lib/mqtt_client.dart index 39c0c72..e3706f6 100644 --- a/lib/mqtt_client.dart +++ b/lib/mqtt_client.dart @@ -13,6 +13,7 @@ class MqttClient { Map _messagesToCompleteMap; final String _userName; final String _password; + var _remData; var _liveTimer; @@ -192,11 +193,20 @@ class MqttClient { */ void _processData(data) { - var remData = data; + if (_remData != null) { + // append data to remaining data + _remData.addAll(data); + } else { + // No remaining data + _remData = data; + } + var lenBefore, lenAfter; do { - remData = _processMqttMessage(remData); - } while (remData != null); + lenBefore = _remData.length; + _remData = _processMqttMessage(_remData); + lenAfter = (_remData != null) ? _remData.length : 0; + } while (lenBefore != lenAfter && lenAfter >= 2); } /** @@ -315,6 +325,10 @@ class MqttClient { */ int _handlePublish(data) { MqttMessagePublish m = new MqttMessagePublish.decode(data, debugMessage); + if (m.len > data.length) { + // Not enough data yet + return 0; + } // QOS_1 and QOS_2 messages need to be acked if (m.QoS > 0) { From 7783c066361e490e3560f9051118c566c36d092c Mon Sep 17 00:00:00 2001 From: Ewout Prangsma Date: Sun, 28 Jan 2018 13:46:27 +0100 Subject: [PATCH 5/6] Prevent adding to fixed list --- lib/mqtt_client.dart | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/mqtt_client.dart b/lib/mqtt_client.dart index e3706f6..02d7c8a 100644 --- a/lib/mqtt_client.dart +++ b/lib/mqtt_client.dart @@ -198,7 +198,7 @@ class MqttClient { _remData.addAll(data); } else { // No remaining data - _remData = data; + _remData = data.toList(growable:true); } var lenBefore, lenAfter; From 9d2c50b8be4d3997941c52badb670186330cd303 Mon Sep 17 00:00:00 2001 From: Ewout Prangsma Date: Sun, 28 Jan 2018 14:26:00 +0100 Subject: [PATCH 6/6] Prevent printing debug message when not requested --- lib/mqtt_client.dart | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/lib/mqtt_client.dart b/lib/mqtt_client.dart index 02d7c8a..673ec4a 100644 --- a/lib/mqtt_client.dart +++ b/lib/mqtt_client.dart @@ -338,7 +338,9 @@ class MqttClient { _resetTimer(); } - print("[mqttClient] [" + m._topic + "][" + m._payload + "]"); + if (debugMessage) { + print("[mqttClient] [" + m._topic + "][" + m._payload + "]"); + } // notify the client of the new topic / payload if (_onSubscribeDataMap != null && _onSubscribeDataMap[m._topic] != null) _onSubscribeDataMap[m._topic](m._topic, m._payload);