diff --git a/db_utils.py b/db_utils.py index 5dfede7..c68cc3f 100644 --- a/db_utils.py +++ b/db_utils.py @@ -1,11 +1,15 @@ import json import sqlite3 +from log_utils import get_logger + +logger = get_logger(name="db_utils") # Initialize SQLite database def initialize_database(): with sqlite3.connect("meshtastic.sqlite") as conn: cursor = conn.cursor() + # Updated table schema: matrix_event_id is now PRIMARY KEY, meshtastic_id is not necessarily unique cursor.execute( "CREATE TABLE IF NOT EXISTS longnames (meshtastic_id TEXT PRIMARY KEY, longname TEXT)" ) @@ -15,8 +19,23 @@ def initialize_database(): cursor.execute( "CREATE TABLE IF NOT EXISTS plugin_data (plugin_name TEXT, meshtastic_id TEXT, data TEXT, PRIMARY KEY (plugin_name, meshtastic_id))" ) - conn.commit() + # Changed the schema for message_map: matrix_event_id is now primary key + # Added a new column 'meshtastic_meshnet' to store the meshnet origin of the message. + # If table already exists, we try adding the column if it doesn't exist. + cursor.execute( + "CREATE TABLE IF NOT EXISTS message_map (meshtastic_id INTEGER, matrix_event_id TEXT PRIMARY KEY, matrix_room_id TEXT, meshtastic_text TEXT, meshtastic_meshnet TEXT)" + ) + # Attempt to add meshtastic_meshnet column if it's missing (for upgrades) + # This is a no-op if the column already exists. + # If user runs fresh, it will already be there from CREATE TABLE IF NOT EXISTS. + try: + cursor.execute("ALTER TABLE message_map ADD COLUMN meshtastic_meshnet TEXT") + except sqlite3.OperationalError: + # Column already exists, or table just created with it + pass + + conn.commit() def store_plugin_data(plugin_name, meshtastic_id, data): with sqlite3.connect("meshtastic.sqlite") as conn: @@ -27,7 +46,6 @@ def store_plugin_data(plugin_name, meshtastic_id, data): ) conn.commit() - def delete_plugin_data(plugin_name, meshtastic_id): with sqlite3.connect("meshtastic.sqlite") as conn: cursor = conn.cursor() @@ -37,7 +55,6 @@ def delete_plugin_data(plugin_name, meshtastic_id): ) conn.commit() - # Get the data for a given plugin and Meshtastic ID def get_plugin_data_for_node(plugin_name, meshtastic_id): with sqlite3.connect("meshtastic.sqlite") as conn: @@ -52,7 +69,6 @@ def get_plugin_data_for_node(plugin_name, meshtastic_id): result = cursor.fetchone() return json.loads(result[0] if result else "[]") - # Get the data for a given plugin def get_plugin_data(plugin_name): with sqlite3.connect("meshtastic.sqlite") as conn: @@ -63,7 +79,6 @@ def get_plugin_data(plugin_name): ) return cursor.fetchall() - # Get the longname for a given Meshtastic ID def get_longname(meshtastic_id): with sqlite3.connect("meshtastic.sqlite") as conn: @@ -74,7 +89,6 @@ def get_longname(meshtastic_id): result = cursor.fetchone() return result[0] if result else None - def save_longname(meshtastic_id, longname): with sqlite3.connect("meshtastic.sqlite") as conn: cursor = conn.cursor() @@ -84,7 +98,6 @@ def save_longname(meshtastic_id, longname): ) conn.commit() - def update_longnames(nodes): if nodes: for node in nodes.values(): @@ -94,7 +107,6 @@ def update_longnames(nodes): longname = user.get("longName", "N/A") save_longname(meshtastic_id, longname) - def get_shortname(meshtastic_id): with sqlite3.connect("meshtastic.sqlite") as conn: cursor = conn.cursor() @@ -104,7 +116,6 @@ def get_shortname(meshtastic_id): result = cursor.fetchone() return result[0] if result else None - def save_shortname(meshtastic_id, shortname): with sqlite3.connect("meshtastic.sqlite") as conn: cursor = conn.cursor() @@ -114,7 +125,6 @@ def save_shortname(meshtastic_id, shortname): ) conn.commit() - def update_shortnames(nodes): if nodes: for node in nodes.values(): @@ -123,3 +133,57 @@ def update_shortnames(nodes): meshtastic_id = user["id"] shortname = user.get("shortName", "N/A") save_shortname(meshtastic_id, shortname) + +def store_message_map(meshtastic_id, matrix_event_id, matrix_room_id, meshtastic_text, meshtastic_meshnet=None): + """ + Stores a message map in the database. + + :param meshtastic_id: The Meshtastic message ID (integer or None) + :param matrix_event_id: The Matrix event ID (string, primary key) + :param matrix_room_id: The Matrix room ID (string) + :param meshtastic_text: The text of the Meshtastic message + :param meshtastic_meshnet: The name of the meshnet this message originated from. + This helps us identify remote vs local mesh origins. + """ + with sqlite3.connect("meshtastic.sqlite") as conn: + cursor = conn.cursor() + logger.debug( + f"Storing message map: meshtastic_id={meshtastic_id}, matrix_event_id={matrix_event_id}, matrix_room_id={matrix_room_id}, meshtastic_text={meshtastic_text}, meshtastic_meshnet={meshtastic_meshnet}" + ) + cursor.execute( + "INSERT OR REPLACE INTO message_map (meshtastic_id, matrix_event_id, matrix_room_id, meshtastic_text, meshtastic_meshnet) VALUES (?, ?, ?, ?, ?)", + (meshtastic_id, matrix_event_id, matrix_room_id, meshtastic_text, meshtastic_meshnet), + ) + conn.commit() + +def get_message_map_by_meshtastic_id(meshtastic_id): + with sqlite3.connect("meshtastic.sqlite") as conn: + cursor = conn.cursor() + cursor.execute( + "SELECT matrix_event_id, matrix_room_id, meshtastic_text, meshtastic_meshnet FROM message_map WHERE meshtastic_id=?", + (meshtastic_id,), + ) + result = cursor.fetchone() + logger.debug( + f"Retrieved message map by meshtastic_id={meshtastic_id}: {result}" + ) + if result: + # result = (matrix_event_id, matrix_room_id, meshtastic_text, meshtastic_meshnet) + return result[0], result[1], result[2], result[3] + return None + +def get_message_map_by_matrix_event_id(matrix_event_id): + with sqlite3.connect("meshtastic.sqlite") as conn: + cursor = conn.cursor() + cursor.execute( + "SELECT meshtastic_id, matrix_room_id, meshtastic_text, meshtastic_meshnet FROM message_map WHERE matrix_event_id=?", + (matrix_event_id,), + ) + result = cursor.fetchone() + logger.debug( + f"Retrieved message map by matrix_event_id={matrix_event_id}: {result}" + ) + if result: + # result = (meshtastic_id, matrix_room_id, meshtastic_text, meshtastic_meshnet) + return result[0], result[1], result[2], result[3] + return None diff --git a/log_utils.py b/log_utils.py index a66ba46..92e2269 100644 --- a/log_utils.py +++ b/log_utils.py @@ -40,7 +40,7 @@ def get_logger(name): "backup_count", 1 ) # Default to 1 backup file_handler = RotatingFileHandler( - log_file, maxBytes=max_bytes, backupCount=backup_count + log_file, maxBytes=max_bytes, backupCount=backup_count, encoding='utf-8' ) file_handler.setFormatter( diff --git a/main.py b/main.py index 627c45d..dcf1076 100644 --- a/main.py +++ b/main.py @@ -9,7 +9,7 @@ import sys from typing import List -from nio import RoomMessageNotice, RoomMessageText +from nio import RoomMessageNotice, RoomMessageText, ReactionEvent, RoomMessageEmote # Import meshtastic_utils as a module to set event_loop import meshtastic_utils @@ -62,8 +62,10 @@ async def main(): # Register the message callback for Matrix matrix_logger.info("Listening for inbound Matrix messages...") matrix_client.add_event_callback( - on_room_message, (RoomMessageText, RoomMessageNotice) + on_room_message, (RoomMessageText, RoomMessageNotice, RoomMessageEmote) ) + # Add ReactionEvent callback so we can handle matrix reactions + matrix_client.add_event_callback(on_room_message, ReactionEvent) # Set up shutdown event shutdown_event = asyncio.Event() diff --git a/matrix_utils.py b/matrix_utils.py index b8d852b..cf060ab 100644 --- a/matrix_utils.py +++ b/matrix_utils.py @@ -13,9 +13,12 @@ MatrixRoom, RoomMessageNotice, RoomMessageText, + RoomMessageEmote, UploadResponse, WhoamiError, + ReactionEvent, ) + from PIL import Image from config import relay_config @@ -23,6 +26,7 @@ # Do not import plugin_loader here to avoid circular imports from meshtastic_utils import connect_meshtastic +from db_utils import get_message_map_by_matrix_event_id # Extract Matrix configuration matrix_homeserver = relay_config["matrix"]["homeserver"] @@ -39,14 +43,14 @@ matrix_client = None - def bot_command(command, payload): + # Checks if the given command is directed at the bot return f"{bot_user_name}: !{command}" in payload - async def connect_matrix(): """ Establish a connection to the Matrix homeserver. + Sets global matrix_client and detects the bot's display name. """ global matrix_client global bot_user_name @@ -90,11 +94,11 @@ async def connect_matrix(): return matrix_client - async def join_matrix_room(matrix_client, room_id_or_alias: str) -> None: """Join a Matrix room by its ID or alias.""" try: if room_id_or_alias.startswith("#"): + # If it's a room alias, resolve it to a room ID response = await matrix_client.room_resolve_alias(room_id_or_alias) if not response.room_id: logger.error( @@ -107,10 +111,10 @@ async def join_matrix_room(matrix_client, room_id_or_alias: str) -> None: if room_config["id"] == room_id_or_alias: room_config["id"] = room_id break - else: room_id = room_id_or_alias + # Attempt to join the room if not already joined if room_id not in matrix_client.rooms: response = await matrix_client.join(room_id) if response and hasattr(response, "room_id"): @@ -124,38 +128,67 @@ async def join_matrix_room(matrix_client, room_id_or_alias: str) -> None: except Exception as e: logger.error(f"Error joining room '{room_id_or_alias}': {e}") - -# Send message to the Matrix room -async def matrix_relay(room_id, message, longname, shortname, meshnet_name, portnum): +async def matrix_relay(room_id, message, longname, shortname, meshnet_name, portnum, meshtastic_id=None, meshtastic_replyId=None, meshtastic_text=None, emote=False, emoji=False): + """ + Relay a message from Meshtastic to Matrix, optionally storing message maps. + + :param room_id: The Matrix room ID to send to. + :param message: The text message to send. + :param longname: The sender's longname on the meshnet. + :param shortname: The sender's shortname on the meshnet. + :param meshnet_name: The meshnet name passed in, but we will always override with our own relay's meshnet_name. + :param portnum: The portnum or message type source from Meshtastic. + :param meshtastic_id: The Meshtastic ID of the message, if any. + :param meshtastic_replyId: The Meshtastic replyId if this is a reaction. + :param meshtastic_text: The original Meshtastic text message if available. + :param emote: If True, send as an emote instead of a normal message. + :param emoji: If True, indicates this was originally a reaction. + """ matrix_client = await connect_matrix() try: + # Always use our own local meshnet_name for outgoing events + local_meshnet_name = relay_config["meshtastic"]["meshnet_name"] content = { - "msgtype": "m.text", + "msgtype": "m.text" if not emote else "m.emote", "body": message, "meshtastic_longname": longname, "meshtastic_shortname": shortname, - "meshtastic_meshnet": meshnet_name, + "meshtastic_meshnet": local_meshnet_name, "meshtastic_portnum": portnum, } - await asyncio.wait_for( + if meshtastic_id is not None: + content["meshtastic_id"] = meshtastic_id + if meshtastic_replyId is not None: + content["meshtastic_replyId"] = meshtastic_replyId + if meshtastic_text is not None: + content["meshtastic_text"] = meshtastic_text + if emoji: + content["meshtastic_emoji"] = 1 + + response = await asyncio.wait_for( matrix_client.room_send( room_id=room_id, message_type="m.room.message", content=content, ), - timeout=0.5, + timeout=5.0, ) logger.info(f"Sent inbound radio message to matrix room: {room_id}") + # For inbound meshtastic->matrix messages, store mapping here if meshtastic_id is present and not a reaction + if meshtastic_id is not None and not emote: + from db_utils import store_message_map + # Always store our own local meshnet_name to identify origin + store_message_map(meshtastic_id, response.event_id, room_id, meshtastic_text if meshtastic_text else message, meshtastic_meshnet=local_meshnet_name) + except asyncio.TimeoutError: logger.error("Timed out while waiting for Matrix response") except Exception as e: logger.error(f"Error sending radio message to matrix room {room_id}: {e}") - def truncate_message( text, max_bytes=227 -): # 227 is the maximum that we can run without an error so far. 228 throws an error. +): """ Truncate the given text to fit within the specified byte size. @@ -166,59 +199,182 @@ def truncate_message( truncated_text = text.encode("utf-8")[:max_bytes].decode("utf-8", "ignore") return truncated_text - # Callback for new messages in Matrix room async def on_room_message( - room: MatrixRoom, event: Union[RoomMessageText, RoomMessageNotice] + room: MatrixRoom, event: Union[RoomMessageText, RoomMessageNotice, ReactionEvent, RoomMessageEmote] ) -> None: + """ + Handle new messages and reactions in Matrix. For reactions, we ensure that when relaying back + to Meshtastic, we always apply our local meshnet_name to outgoing events. + + We must be careful not to relay reactions to reactions (reaction-chains), + especially remote reactions that got relayed into the room as m.emote events, + as we do not store them in the database. If we can't find the original message in the DB, + it likely means it's a reaction to a reaction, and we stop there. + """ + from db_utils import store_message_map full_display_name = "Unknown user" message_timestamp = event.server_timestamp - # We do not relay the past + # We do not relay messages that occurred before the bot started if message_timestamp < bot_start_time: return + # Find the room_config that matches this room, if any room_config = None for config in matrix_rooms: if config["id"] == room.room_id: room_config = config break - # Only relay supported rooms + # Only proceed if the room is supported if not room_config: return - text = event.body.strip() + relates_to = event.source["content"].get("m.relates_to") + is_reaction = False + reaction_emoji = None + original_matrix_event_id = None + + # Check if this is a Matrix ReactionEvent (usually m.reaction) + if isinstance(event, ReactionEvent): + # This is a reaction event + is_reaction = True + logger.debug(f"Processing Matrix reaction event: {event.source}") + if relates_to and "event_id" in relates_to and "key" in relates_to: + # Extract the reaction emoji and the original event it relates to + reaction_emoji = relates_to["key"] + original_matrix_event_id = relates_to["event_id"] + logger.debug(f"Original matrix event ID: {original_matrix_event_id}, Reaction emoji: {reaction_emoji}") + + # Check if this is a Matrix RoomMessageEmote (m.emote) + if isinstance(event, RoomMessageEmote): + logger.debug(f"Processing Matrix reaction event: {event.source}") + # For RoomMessageEmote, treat as remote reaction if meshtastic_replyId exists + is_reaction = True + # We need to manually extract the reaction emoji from the body + reaction_body = event.source["content"].get("body", "") + reaction_match = re.search(r"reacted (.+?) to", reaction_body) + reaction_emoji = reaction_match.group(1).strip() if reaction_match else "?" + + text = event.body.strip() if (not is_reaction and hasattr(event, "body")) else "" longname = event.source["content"].get("meshtastic_longname") shortname = event.source["content"].get("meshtastic_shortname", None) meshnet_name = event.source["content"].get("meshtastic_meshnet") + meshtastic_replyId = event.source["content"].get("meshtastic_replyId") suppress = event.source["content"].get("mmrelay_suppress") - local_meshnet_name = relay_config["meshtastic"]["meshnet_name"] - # Do not process + # Retrieve the relay_reactions option from config + relay_reactions = relay_config["meshtastic"].get("relay_reactions", True) + + # If a message has suppress flag, do not process if suppress: return + # If this is a reaction and relay_reactions is False, do nothing + if is_reaction and not relay_reactions: + logger.debug("Reaction event encountered but relay_reactions is disabled. Doing nothing.") + return + + local_meshnet_name = relay_config["meshtastic"]["meshnet_name"] + + # If this is a reaction and relay_reactions is True, attempt to relay it + if is_reaction and relay_reactions: + # Check if we need to relay a reaction from a remote meshnet to our local meshnet. + # If meshnet_name != local_meshnet_name and meshtastic_replyId is present and this is an emote, + # it's a remote reaction that needs to be forwarded as a text message describing the reaction. + if meshnet_name and meshnet_name != local_meshnet_name and meshtastic_replyId and isinstance(event, RoomMessageEmote): + logger.info(f"Relaying reaction from remote meshnet: {meshnet_name}") + + short_meshnet_name = meshnet_name[:4] + + # Format the reaction message for relaying to the local meshnet. + # The necessary information is in the m.emote event + if not shortname: + shortname = longname[:3] if longname else "???" + + # Use meshtastic_text from content if available, this is the original message text from the remote mesh + meshtastic_text_db = event.source["content"].get("meshtastic_text", "") + meshtastic_text_db = meshtastic_text_db.replace('\n', ' ').replace('\r', ' ') + abbreviated_text = meshtastic_text_db[:40] + "..." if len(meshtastic_text_db) > 40 else meshtastic_text_db + + reaction_message = f"{shortname}/{short_meshnet_name} reacted {reaction_emoji} to \"{abbreviated_text}\"" + + # Relay the remote reaction to the local meshnet. + meshtastic_interface = connect_meshtastic() + from meshtastic_utils import logger as meshtastic_logger + meshtastic_channel = room_config["meshtastic_channel"] + + if relay_config["meshtastic"]["broadcast_enabled"]: + meshtastic_logger.info( + f"Relaying reaction from remote meshnet {meshnet_name} to radio broadcast" + ) + logger.debug(f"Sending reaction to Meshtastic with meshnet={local_meshnet_name}: {reaction_message}") + meshtastic_interface.sendText( + text=reaction_message, channelIndex=meshtastic_channel + ) + # We've relayed the remote reaction to our local mesh, so we're done. + return + + # If original_matrix_event_id is set, this is a reaction to some other matrix event + if original_matrix_event_id: + orig = get_message_map_by_matrix_event_id(original_matrix_event_id) + if not orig: + # If we don't find the original message in the DB, we suspect it's a reaction to a reaction or + # something we never recorded. In either case, we do not forward. + logger.debug("Original message for reaction not found in DB. Possibly a reaction-to-reaction scenario. Not forwarding.") + return + + # orig = (meshtastic_id, matrix_room_id, meshtastic_text, meshtastic_meshnet) + meshtastic_id, matrix_room_id, meshtastic_text_db, meshtastic_meshnet_db = orig + display_name_response = await matrix_client.get_displayname(event.sender) + full_display_name = display_name_response.displayname or event.sender + + # If not from a remote meshnet, proceed as normal to relay back to the originating meshnet + short_display_name = full_display_name[:5] + prefix = f"{short_display_name}[M]: " + abbreviated_text = meshtastic_text_db[:40] + "..." if len(meshtastic_text_db) > 40 else meshtastic_text_db + + # Always use our local meshnet_name for outgoing events + reaction_message = f"{prefix}reacted {reaction_emoji} to \"{abbreviated_text}\"" + meshtastic_interface = connect_meshtastic() + from meshtastic_utils import logger as meshtastic_logger + meshtastic_channel = room_config["meshtastic_channel"] + + if relay_config["meshtastic"]["broadcast_enabled"]: + meshtastic_logger.info( + f"Relaying reaction from {full_display_name} to radio broadcast" + ) + logger.debug(f"Sending reaction to Meshtastic with meshnet={local_meshnet_name}: {reaction_message}") + meshtastic_interface.sendText( + text=reaction_message, channelIndex=meshtastic_channel + ) + return + + # For Matrix->Mesh messages from a remote meshnet, rewrite the message format if longname and meshnet_name: + # Always include the meshnet_name in the full display name. full_display_name = f"{longname}/{meshnet_name}" + if meshnet_name != local_meshnet_name: - logger.info(f"Processing message from remote meshnet: {text}") + # A message from a remote meshnet relayed into Matrix, now going back out + logger.info(f"Processing message from remote meshnet: {meshnet_name}") short_meshnet_name = meshnet_name[:4] - # If shortname is None, truncate the longname to 3 characters + # If shortname is not available, derive it from the longname if shortname is None: - shortname = longname[:3] - prefix = f"{shortname}/{short_meshnet_name}: " + shortname = longname[:3] if longname else "???" + # Remove the original prefix "[longname/meshnet]: " to avoid double-tagging text = re.sub( rf"^\[{full_display_name}\]: ", "", text - ) # Remove the original prefix from the text + ) text = truncate_message(text) - full_message = f"{prefix}{text}" + full_message = f"{shortname}/{short_meshnet_name}: {text}" else: - # This is a message from a local user, it should be ignored no log is needed + # If this message is from our local meshnet (loopback), we ignore it return - else: + # Normal Matrix message from a Matrix user display_name_response = await matrix_client.get_displayname(event.sender) full_display_name = display_name_response.displayname or event.sender short_display_name = full_display_name[:5] @@ -228,9 +384,8 @@ async def on_room_message( text = truncate_message(text) # Plugin functionality - from plugin_loader import load_plugins # Import here to avoid circular imports - - plugins = load_plugins() # Load plugins within the function + from plugin_loader import load_plugins + plugins = load_plugins() found_matching_plugin = False for plugin in plugins: @@ -251,49 +406,58 @@ async def on_room_message( if is_command: break + # If this is a command, we do not send it to the mesh if is_command: logger.debug("Message is a command, not sending to mesh") return + # Connect to Meshtastic meshtastic_interface = connect_meshtastic() from meshtastic_utils import logger as meshtastic_logger meshtastic_channel = room_config["meshtastic_channel"] + # If message is from Matrix and broadcast_enabled is True, relay to Meshtastic if not found_matching_plugin and event.sender != bot_user_id: if relay_config["meshtastic"]["broadcast_enabled"]: + portnum = event.source["content"].get("meshtastic_portnum") if ( - event.source["content"].get("meshtastic_portnum") - == "DETECTION_SENSOR_APP" + portnum == "DETECTION_SENSOR_APP" ): + # If detection_sensor is enabled, forward this data as detection sensor data if relay_config["meshtastic"].get("detection_sensor", False): - meshtastic_interface.sendData( + sent_packet = meshtastic_interface.sendData( data=full_message.encode("utf-8"), channelIndex=meshtastic_channel, portNum=meshtastic.protobuf.portnums_pb2.PortNum.DETECTION_SENSOR_APP, ) + # If we got a packet with an id and it's not a reaction, store mapping + if sent_packet and hasattr(sent_packet, 'id'): + store_message_map(sent_packet.id, event.event_id, room.room_id, text, meshtastic_meshnet=local_meshnet_name) else: meshtastic_logger.debug( - f"Detection sensor packet received from {full_display_name}, " - + "but detection sensor processing is disabled." + f"Detection sensor packet received from {full_display_name}, but detection sensor processing is disabled." ) else: meshtastic_logger.info( f"Relaying message from {full_display_name} to radio broadcast" ) - meshtastic_interface.sendText( + sent_packet = meshtastic_interface.sendText( text=full_message, channelIndex=meshtastic_channel ) - + if sent_packet and hasattr(sent_packet, 'id'): + store_message_map(sent_packet.id, event.event_id, room.room_id, text, meshtastic_meshnet=local_meshnet_name) else: logger.debug( f"Broadcast not supported: Message from {full_display_name} dropped." ) - async def upload_image( client: AsyncClient, image: Image.Image, filename: str ) -> UploadResponse: + """ + Uploads an image to Matrix and returns the UploadResponse containing the content URI. + """ buffer = io.BytesIO() image.save(buffer, format="PNG") image_data = buffer.getvalue() @@ -307,10 +471,12 @@ async def upload_image( return response - async def send_room_image( client: AsyncClient, room_id: str, upload_response: UploadResponse ): + """ + Sends an already uploaded image to the specified room. + """ await client.room_send( room_id=room_id, message_type="m.room.message", diff --git a/meshtastic_utils.py b/meshtastic_utils.py index bfa9280..ed8520b 100644 --- a/meshtastic_utils.py +++ b/meshtastic_utils.py @@ -12,7 +12,7 @@ from pubsub import pub from config import relay_config -from db_utils import get_longname, get_shortname, save_longname, save_shortname +from db_utils import get_longname, get_shortname, save_longname, save_shortname, get_message_map_by_meshtastic_id from log_utils import get_logger # Do not import plugin_loader here to avoid circular imports @@ -20,43 +20,33 @@ # Extract matrix rooms configuration matrix_rooms: List[dict] = relay_config["matrix_rooms"] -# Initialize logger +# Initialize logger for Meshtastic logger = get_logger(name="Meshtastic") -# Global variables +# Global variables for the Meshtastic connection and event loop management meshtastic_client = None event_loop = None # Will be set from main.py -meshtastic_lock = threading.Lock() # To prevent race conditions +meshtastic_lock = threading.Lock() # To prevent race conditions on meshtastic_client access reconnecting = False shutting_down = False reconnect_task = None # To keep track of the reconnect task - def serial_port_exists(port_name): """ Check if the specified serial port exists. - - Args: - port_name (str): The name of the serial port (e.g., 'COM15' or '/dev/ttyACM0'). - - Returns: - bool: True if the port exists, False otherwise. + This prevents attempting connections on non-existent ports. """ ports = [port.device for port in serial.tools.list_ports.comports()] return port_name in ports - def connect_meshtastic(force_connect=False): """ Establish a connection to the Meshtastic device. - - Args: - force_connect (bool): If True, forces a new connection even if one exists. - - Returns: - The Meshtastic client interface or None if connection fails. + Attempts a connection based on connection_type (serial/ble/network). + Retries until successful or shutting_down is set. + If already connected and not force_connect, returns the existing client. """ global meshtastic_client, shutting_down if shutting_down: @@ -67,7 +57,7 @@ def connect_meshtastic(force_connect=False): if meshtastic_client and not force_connect: return meshtastic_client - # Ensure previous connection is closed + # Close previous connection if exists if meshtastic_client: try: meshtastic_client.close() @@ -75,9 +65,9 @@ def connect_meshtastic(force_connect=False): logger.warning(f"Error closing previous connection: {e}") meshtastic_client = None - # Initialize Meshtastic interface based on connection type + # Determine connection type and attempt connection connection_type = relay_config["meshtastic"]["connection_type"] - retry_limit = 0 # 0 for infinite retries + retry_limit = 0 # 0 means infinite retries attempts = 1 successful = False @@ -88,10 +78,11 @@ def connect_meshtastic(force_connect=False): ): try: if connection_type == "serial": + # Serial connection serial_port = relay_config["meshtastic"]["serial_port"] logger.info(f"Connecting to serial port {serial_port} ...") - # Check if serial port exists + # Check if serial port exists before connecting if not serial_port_exists(serial_port): logger.warning( f"Serial port {serial_port} does not exist. Waiting..." @@ -103,7 +94,9 @@ def connect_meshtastic(force_connect=False): meshtastic_client = meshtastic.serial_interface.SerialInterface( serial_port ) + elif connection_type == "ble": + # BLE connection ble_address = relay_config["meshtastic"].get("ble_address") if ble_address: logger.info(f"Connecting to BLE address {ble_address} ...") @@ -116,7 +109,9 @@ def connect_meshtastic(force_connect=False): else: logger.error("No BLE address provided.") return None + else: + # Network (TCP) connection target_host = relay_config["meshtastic"]["host"] logger.info(f"Connecting to host {target_host} ...") meshtastic_client = meshtastic.tcp_interface.TCPInterface( @@ -129,7 +124,7 @@ def connect_meshtastic(force_connect=False): f"Connected to {nodeInfo['user']['shortName']} / {nodeInfo['user']['hwModel']}" ) - # Subscribe to message events + # Subscribe to message and connection lost events pub.subscribe(on_meshtastic_message, "meshtastic.receive") pub.subscribe( on_lost_meshtastic_connection, "meshtastic.connection.lost" @@ -146,7 +141,7 @@ def connect_meshtastic(force_connect=False): break attempts += 1 if retry_limit == 0 or attempts <= retry_limit: - wait_time = min(attempts * 2, 30) # Cap wait time to 30 seconds + wait_time = min(attempts * 2, 30) # Exponential backoff capped at 30s logger.warning( f"Attempt #{attempts - 1} failed. Retrying in {wait_time} secs: {e}" ) @@ -157,10 +152,10 @@ def connect_meshtastic(force_connect=False): return meshtastic_client - def on_lost_meshtastic_connection(interface=None): """ - Callback function invoked when the Meshtastic connection is lost. + Callback invoked when the Meshtastic connection is lost. + Initiates a reconnect sequence unless shutting_down is True. """ global meshtastic_client, reconnecting, shutting_down, event_loop, reconnect_task with meshtastic_lock: @@ -191,10 +186,10 @@ def on_lost_meshtastic_connection(interface=None): if event_loop: reconnect_task = asyncio.run_coroutine_threadsafe(reconnect(), event_loop) - async def reconnect(): """ - Asynchronously attempts to reconnect to the Meshtastic device with exponential backoff. + Asynchronously attempts to reconnect with exponential backoff. + Stops if shutting_down is set. """ global meshtastic_client, reconnecting, shutting_down backoff_time = 10 @@ -224,17 +219,22 @@ async def reconnect(): finally: reconnecting = False - def on_meshtastic_message(packet, interface): - # Filter out TEXT_MESSAGE_APP packets with emoji or replyId + """ + Handle incoming Meshtastic messages. For reaction messages, we now ensure that when relaying to Matrix, + we use the original message's meshtastic_meshnet from the DB so that remote-originated messages can properly + have their reactions relayed across multiple meshnets. + """ + # Apply reaction filtering based on config + relay_reactions = relay_config["meshtastic"].get("relay_reactions", True) + + # If relay_reactions is False, filter out reaction/tapback packets if packet.get('decoded', {}).get('portnum') == 'TEXT_MESSAGE_APP': decoded = packet.get('decoded', {}) - if 'emoji' in decoded or 'replyId' in decoded: - logger.debug('Filtered out reaction/tapback packet.') + if not relay_reactions and ('emoji' in decoded or 'replyId' in decoded): + logger.debug('Filtered out reaction/tapback packet due to relay_reactions=false.') return - """ - Handle incoming Meshtastic messages and relay them to Matrix. - """ + from matrix_utils import matrix_relay global event_loop @@ -254,23 +254,67 @@ def on_meshtastic_message(packet, interface): decoded = packet.get("decoded", {}) text = decoded.get("text") + replyId = decoded.get("replyId") + emoji_flag = 'emoji' in decoded and decoded['emoji'] == 1 - # Determine if the message is a direct message - myId = interface.myInfo.my_node_num # Get relay's own node number + # Determine if this is a direct message to the relay node from meshtastic.mesh_interface import BROADCAST_NUM + myId = interface.myInfo.my_node_num if toId == myId: is_direct_message = True elif toId == BROADCAST_NUM: is_direct_message = False else: - # Message to someone else; we may ignore it + # Message to someone else; ignoring for broadcasting logic is_direct_message = False + meshnet_name = relay_config["meshtastic"]["meshnet_name"] + + # Reaction handling (Meshtastic -> Matrix) + # If replyId and emoji_flag are present and relay_reactions is True, we relay as text reactions in Matrix + if replyId and emoji_flag and relay_reactions: + longname = get_longname(sender) or str(sender) + shortname = get_shortname(sender) or str(sender) + orig = get_message_map_by_meshtastic_id(replyId) + if orig: + # orig = (matrix_event_id, matrix_room_id, meshtastic_text, meshtastic_meshnet) + matrix_event_id, matrix_room_id, meshtastic_text, meshtastic_meshnet = orig + abbreviated_text = meshtastic_text[:40] + "..." if len(meshtastic_text) > 40 else meshtastic_text + + # Ensure that meshnet_name is always included, using the our own meshnet for accuracy. + full_display_name = f"{longname}/{meshnet_name}" + + reaction_symbol = text.strip() if (text and text.strip()) else '⚠️' + reaction_message = f"\n [{full_display_name}] reacted {reaction_symbol} to \"{abbreviated_text}\"" + + # Relay the reaction as emote to Matrix, preserving the original meshnet name + asyncio.run_coroutine_threadsafe( + matrix_relay( + matrix_room_id, + reaction_message, + longname, + shortname, + meshnet_name, + decoded.get("portnum"), + meshtastic_id=packet.get("id"), + meshtastic_replyId=replyId, + meshtastic_text=meshtastic_text, + emote=True, + emoji=True + ), + loop=loop, + ) + else: + logger.debug("Original message for reaction not found in DB.") + return + + # Normal text messages or detection sensor messages if text: - # Determine the channel + # Determine the channel for this message channel = packet.get("channel") if channel is None: + # If channel not specified, deduce from portnum if ( decoded.get("portnum") == "TEXT_MESSAGE_APP" or decoded.get("portnum") == 1 @@ -284,7 +328,7 @@ def on_meshtastic_message(packet, interface): ) return - # Check if the channel is mapped to a Matrix room + # Check if channel is mapped to a Matrix room channel_mapped = False for room in matrix_rooms: if room["meshtastic_channel"] == channel: @@ -294,20 +338,19 @@ def on_meshtastic_message(packet, interface): if not channel_mapped: logger.debug(f"Skipping message from unmapped channel {channel}") return - if decoded.get("portnum") == "DETECTION_SENSOR_APP" and not relay_config[ - "meshtastic" - ].get("detection_sensor", False): + + # If detection_sensor is disabled and this is a detection sensor packet, skip it + if decoded.get("portnum") == "DETECTION_SENSOR_APP" and not relay_config["meshtastic"].get("detection_sensor", False): logger.debug( "Detection sensor packet received, but detection sensor processing is disabled." ) return - # Attempt to get longname from database + # Attempt to get longname/shortname from database or nodes longname = get_longname(sender) shortname = get_shortname(sender) if not longname or not shortname: - # Try to get node info from interface.nodes node = interface.nodes.get(sender) if node: user = node.get("user") @@ -321,23 +364,19 @@ def on_meshtastic_message(packet, interface): if shortname: save_shortname(sender, shortname) else: - # Node info not available yet logger.debug(f"Node info for sender {sender} not available yet.") - # If still not available, use sender ID as longname and shortname + # If still not available, fallback to sender ID if not longname: longname = str(sender) if not shortname: shortname = str(sender) - meshnet_name = relay_config["meshtastic"]["meshnet_name"] - formatted_message = f"[{longname}/{meshnet_name}]: {text}" - # Plugin functionality - from plugin_loader import load_plugins # Import here to avoid circular imports - - plugins = load_plugins() # Load plugins within the function + # Plugin functionality - Check if any plugin handles this message before relaying + from plugin_loader import load_plugins + plugins = load_plugins() found_matching_plugin = False for plugin in plugins: @@ -352,8 +391,7 @@ def on_meshtastic_message(packet, interface): if found_matching_plugin: logger.debug(f"Processed by plugin {plugin.plugin_name}") - # **Added DM Check Here** - # If the message is a DM or handled by a plugin, do not relay it to Matrix + # If message is a DM or handled by plugin, do not relay further if is_direct_message: logger.debug( f"Received a direct message from {longname}. Not relaying to Matrix." @@ -363,14 +401,15 @@ def on_meshtastic_message(packet, interface): logger.debug("Message was handled by a plugin. Not relaying to Matrix.") return + # Relay the message to all Matrix rooms mapped to this channel logger.info( f"Processing inbound radio message from {sender} on channel {channel}" ) logger.info(f"Relaying Meshtastic message from {longname} to Matrix") - - # Relay message to Matrix rooms for room in matrix_rooms: if room["meshtastic_channel"] == channel: + # For inbound meshtastic->matrix messages, store meshnet_name in message_map + # so that future reactions can properly identify origin. asyncio.run_coroutine_threadsafe( matrix_relay( room["id"], @@ -379,14 +418,15 @@ def on_meshtastic_message(packet, interface): shortname, meshnet_name, decoded.get("portnum"), + meshtastic_id=packet.get("id"), + meshtastic_text=text ), loop=loop, ) else: - # Handle non-text messages via plugins + # Non-text messages via plugins portnum = decoded.get("portnum") - from plugin_loader import load_plugins # Import here to avoid circular imports - + from plugin_loader import load_plugins plugins = load_plugins() found_matching_plugin = False for plugin in plugins: @@ -406,27 +446,26 @@ def on_meshtastic_message(packet, interface): f"Processed {portnum} with plugin {plugin.plugin_name}" ) - async def check_connection(): """ - Periodically checks the Meshtastic connection and attempts to reconnect if lost. + Periodically checks the Meshtastic connection by sending a ping. + If an error occurs, it attempts to reconnect. """ global meshtastic_client, shutting_down connection_type = relay_config["meshtastic"]["connection_type"] while not shutting_down: if meshtastic_client: try: - # Send a ping to check the connection meshtastic_client.sendPing() except Exception as e: logger.error(f"{connection_type.capitalize()} connection lost: {e}") on_lost_meshtastic_connection(meshtastic_client) await asyncio.sleep(5) # Check connection every 5 seconds - if __name__ == "__main__": + # If running this standalone (normally the main.py does the loop), just try connecting and run forever. meshtastic_client = connect_meshtastic() loop = asyncio.get_event_loop() - event_loop = loop # Set the event loop + event_loop = loop # Set the event loop for use in callbacks loop.create_task(check_connection()) - loop.run_forever() \ No newline at end of file + loop.run_forever() diff --git a/sample_config.yaml b/sample_config.yaml index ab577eb..d715350 100644 --- a/sample_config.yaml +++ b/sample_config.yaml @@ -18,6 +18,7 @@ meshtastic: broadcast_enabled: true # Must be set to true to enable Matrix to Meshtastic messages detection_sensor: true # Must be set to true to forward messages of Meshtastic's detection sensor module plugin_response_delay: 3 # Default response delay in seconds for plugins that respond on the mesh; + relay_reactions: true # Defaults to true, set to false to filter out reaction packets as before logging: level: info @@ -39,7 +40,7 @@ plugins: active: true # Does not need to specify channels, as it's a Matrix-only plugin -#community-plugins: # Note: Community plugins are a new feature. Please report any issues. +#community-plugins: # sample_plugin: # active: true # repository: https://github.com/username/sample_plugin.git