Skip to content

Commit

Permalink
Tracker Sync
Browse files Browse the repository at this point in the history
Tracker Sync across gateways to avoid intermittent AWAY statuses
  • Loading branch information
DigiH committed Jun 25, 2024
1 parent 5756332 commit c2e20c6
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 22 deletions.
7 changes: 7 additions & 0 deletions TheengsGateway/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
"""

import sys
import uuid
from pathlib import Path

from .ble_gateway import run
Expand Down Expand Up @@ -48,6 +49,12 @@ def main() -> None:
if configuration["discovery_topic"].endswith("/sensor"):
configuration["discovery_topic"] = configuration["discovery_topic"][:-7]

# Get the MAC address of the gateway.
mac_address = uuid.UUID(int=uuid.getnode()).hex[-12:]
configuration["gateway_id"] = ":".join(
[mac_address[i : i + 2] for i in range(0, 12, 2)]
).upper()

if not configuration["host"]:
sys.exit("MQTT host is not specified")

Expand Down
65 changes: 44 additions & 21 deletions TheengsGateway/ble_gateway.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ def on_connect(
retain=True,
)
self.subscribe(self.configuration["subscribe_topic"])
self.subscribe("theengs/internal/#")
else:
logger.error(
"Failed to connect to MQTT broker %s:%d reason code: %s",
Expand Down Expand Up @@ -226,28 +227,50 @@ def subscribe(self, sub_topic: str) -> None:
"""Subscribe to MQTT topic <sub_topic>."""

def on_message(client, userdata, msg) -> None: # noqa: ANN001,ARG001
logger.info(
"Received `%s` from `%s` topic",
msg.payload.decode(),
msg.topic,
)
try:
msg_json = json.loads(msg.payload.decode())
except (json.JSONDecodeError, UnicodeDecodeError) as exception:
logger.warning(
"Invalid JSON message %s: %s", msg.payload.decode(), exception
)
return
# Theengs internal
if "theengs/internal/" in msg.topic:
# Evaluate trackersync messages
if msg.topic == "theengs/internal/trackersync":
msg_json = json.loads(msg.payload)
logger.debug("trackersync message: %s", msg_json)

if (
msg_json["gatewayid"] != self.configuration["gateway_id"]
and msg_json["tracker"] in self.discovered_trackers
and self.discovered_trackers[msg_json["tracker"]].time != 0
):
self.discovered_trackers[msg_json["tracker"]].time = 0
logger.info(
"Tracker %s disassociated by gateway %s",
msg_json["tracker"],
msg_json["gatewayid"],
)

logger.debug("[DIS] Discovered Trackers: %s", self.discovered_trackers)

try:
msg_json["id"] = self.rpa2id(msg_json["id"])
except KeyError:
logger.warning(
"JSON message %s doesn't contain id", msg.payload.decode()
else:
logger.info(
"Received `%s` from `%s` topic",
msg.payload.decode(),
msg.topic,
)
return
try:
msg_json = json.loads(msg.payload.decode())
except (json.JSONDecodeError, UnicodeDecodeError) as exception:
logger.warning(
"Invalid JSON message %s: %s", msg.payload.decode(), exception
)
return

try:
msg_json["id"] = self.rpa2id(msg_json["id"])
except KeyError:
logger.warning(
"JSON message %s doesn't contain id", msg.payload.decode()
)
return

self.decode_advertisement(msg_json)
self.decode_advertisement(msg_json)

self.client.subscribe(sub_topic)
self.client.on_message = on_message
Expand Down Expand Up @@ -392,7 +415,7 @@ def check_tracker_timeout(self) -> None:
)
time_model.time = 0
self.discovered_trackers[address] = time_model
logger.debug("Discovered Trackers: %s", self.discovered_trackers)
logger.debug("[TO] Discovered Trackers: %s", self.discovered_trackers)

async def ble_scan_loop(self) -> None:
"""Scan for BLE devices."""
Expand Down Expand Up @@ -614,7 +637,7 @@ def publish_json(
round(time()),
str(data_json["model_id"]),
)
logger.debug("Discovered Trackers: %s", self.discovered_trackers)
logger.debug("[GP] Discovered Trackers: %s", self.discovered_trackers)

# Remove "track" if PUBLISH_ADVDATA is 0
if not self.configuration["publish_advdata"] and "track" in data_json:
Expand Down
11 changes: 10 additions & 1 deletion TheengsGateway/discovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -272,8 +272,17 @@ def copy_pub_device(self, device: dict) -> dict:
self.discovered_trackers[device["id"]] = TnM(
round(time()), device["model_id"]
)
logger.debug("Discovered Trackers: %s", self.discovered_trackers)

# Publish trackersync message
message = json.dumps(
{"gatewayid": self.configuration["gateway_id"], "tracker": device["id"]}
)
self.publish(
message,
"theengs/internal/trackersync",
)

logger.debug("Discovered Trackers: %s", self.discovered_trackers)
pub_device_copy = device.copy()
# Remove "track" if PUBLISH_ADVDATA is 0
if not self.configuration["publish_advdata"] and "track" in pub_device_copy:
Expand Down

0 comments on commit c2e20c6

Please sign in to comment.