Skip to content

Commit

Permalink
Fixed outgoing MQTT message queue not maintaining its own order #1714
Browse files Browse the repository at this point in the history
  • Loading branch information
dennissiemensma committed Sep 29, 2022
1 parent 24a097b commit f5845a0
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 8 deletions.
10 changes: 9 additions & 1 deletion docs/reference/changelog.rst
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,17 @@ Current version
ℹ️ :doc:`How to update</how-to/upgrading/upgrade>` *(minor updates only)*


v5.8.0 - 2022-xx-xx
v5.8.0 - 2022-10-01
-------------------

.. attention::

This release fixes a four year old bug that may have disrupted the order of your MQTT messages sent by DSMR-reader.

It only affected installations with either a high throughput of data or a delayed backend process (or both).
You probably only have noticed it when running an installation similar to the one above, along using per-topic data sources.

- ``Fixed`` Outgoing MQTT message queue not maintaining its own order [`#1714 <https://github.com/dsmrreader/dsmr-reader/issues/1714>`_]


v5.7.0 - 2022-09-27
Expand Down
30 changes: 23 additions & 7 deletions dsmr_mqtt/services/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,29 @@ def initialize_client() -> Optional[paho.Client]:
def run(mqtt_client: paho.Client) -> None:
"""Reads any messages from the queue and publishing them to the MQTT broker."""

# Keep batches small, only send the latest X messages. The rest will be trimmed (in case of delay).
message_queue = queue.Message.objects.all().order_by("-pk")[
0 : settings.DSMRREADER_MQTT_MAX_MESSAGES_IN_QUEUE
]
try:
# Keep batches small, only send the latest X messages. So drop any excess first and fetch the remainder after.
lowest_pk_to_preserve = (
queue.Message.objects.all()
.order_by("-pk")[
settings.DSMRREADER_MQTT_MAX_MESSAGES_IN_QUEUE
- 1 # Zero indexed, so -1
]
.pk
)
except IndexError:
# Total count within limits. No cleanup required.
pass
else:
deletion_count, _ = queue.Message.objects.filter(
pk__lt=lowest_pk_to_preserve
).delete()
logger.warning(
"MQTT: Dropped %d message(s) from queue due to limit", deletion_count
)

# Remainder, preserving order.
message_queue = queue.Message.objects.all().order_by("pk")

if not message_queue:
return
Expand Down Expand Up @@ -121,9 +140,6 @@ def run(mqtt_client: paho.Client) -> None:
logger.debug("MQTT: Deleting published message (#%s) from queue", current.pk)
current.delete()

# Delete any overflow in messages.
queue.Message.objects.all().delete()


def signal_reconnect() -> None:
backend_restart_required.send_robust(None)
Expand Down
10 changes: 10 additions & 0 deletions dsmr_mqtt/tests/services/test_broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,16 @@ def test_run_cleanup(self, publish_mock, loop_mock):

self.assertFalse(queue.Message.objects.exists())

# Make sure whatever mechanism we're using, the queue order is always preserved.
previous_payload = 0

for current_call in publish_mock.call_args_list:
current_call_payload = int(current_call[1]["payload"])
self.assertGreater(
current_call_payload, previous_payload
) # Possible issues with queue order
previous_payload = current_call_payload

@mock.patch("paho.mqtt.client.Client.publish")
@mock.patch("paho.mqtt.client.Client.loop")
def test_run_disconnected(self, loop_mock, *mocks):
Expand Down

0 comments on commit f5845a0

Please sign in to comment.