#!/usr/bin/env python3
"""
Integration test for the Bridge Daemon.
Requires a running MQTT broker on localhost:1883 (e.g., Mosquitto from backend docker-compose).
"""

import importlib
import json
import os
import sys
import time
import unittest
from contextlib import contextmanager
from datetime import datetime, timezone
from unittest.mock import MagicMock, patch

import paho.mqtt.client as mqtt

# Ensure src is importable
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "src"))

# These defaults must be in place before the project modules are imported below.
os.environ.setdefault("MQTT_HOST", "localhost")
os.environ.setdefault("MQTT_PORT", "1883")
os.environ.setdefault("MESHTASTIC_DEVICE", "/dev/fake")
os.environ.setdefault("MESHTASTIC_HOST", "")
os.environ.setdefault("GATEWAY_NODE_ID", "!testgateway")

from main import BridgeDaemon  # noqa: E402
import config  # noqa: E402


class TestBridgeDaemon(unittest.TestCase):
    """Exercise BridgeDaemon routing between a mocked mesh client and a live MQTT broker."""

    @classmethod
    def setUpClass(cls):
        """Connect one shared subscriber that records everything the daemon publishes."""
        cls.inbound_msgs = []
        cls.ingest_msgs = []

        def on_message(client, userdata, msg):
            payload = json.loads(msg.payload.decode())
            if msg.topic == "kosmo/mesh/inbound":
                cls.inbound_msgs.append(payload)
            elif msg.topic == "kosmo/ingest/enviro":
                cls.ingest_msgs.append(payload)

        cls.mqtt_client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
        cls.mqtt_client.on_message = on_message
        cls.mqtt_client.connect("localhost", 1883, 60)
        cls.mqtt_client.loop_start()
        cls.mqtt_client.subscribe("kosmo/mesh/inbound")
        cls.mqtt_client.subscribe("kosmo/ingest/enviro")

    @classmethod
    def tearDownClass(cls):
        """Stop the shared subscriber's network loop and disconnect it."""
        cls.mqtt_client.loop_stop()
        cls.mqtt_client.disconnect()

    def setUp(self):
        """Drop messages captured by earlier tests so each test only sees its own."""
        # Clear in place (do not rebind): the subscriber appends to these
        # class-level lists, and the tests read the very same objects.
        self.inbound_msgs.clear()
        self.ingest_msgs.clear()

    @contextmanager
    def _running_daemon(self, node_num, **mesh_overrides):
        """Yield ``(daemon, mesh)`` for a BridgeDaemon wired to a mocked mesh client.

        ``main.MeshtasticClient`` is patched for the lifetime of the context, the
        daemon's MQTT side is started on entry and is always stopped on exit,
        even when the body raises (e.g. on a failed assertion).

        Args:
            node_num: Value exposed as ``iface.myInfo.my_node_num`` on the mock.
            **mesh_overrides: Extra attributes to set on the mocked mesh client
                before the daemon is constructed (e.g. ``send_text=MagicMock()``).
        """
        mock_iface = MagicMock()
        mock_iface.myInfo = MagicMock()
        mock_iface.myInfo.my_node_num = node_num

        with patch("main.MeshtasticClient") as mock_mesh_cls:
            mesh = mock_mesh_cls.return_value
            mesh.iface = mock_iface
            mesh.start = MagicMock()
            mesh.stop = MagicMock()
            for attr, value in mesh_overrides.items():
                setattr(mesh, attr, value)

            daemon = BridgeDaemon()
            daemon.mesh = mesh
            daemon.mqtt.start()
            try:
                yield daemon, mesh
            finally:
                daemon.mqtt.stop()

    def test_outbound_mesh_injection(self):
        """MQTT -> Mesh: outbound message should trigger sendText on the mock interface."""
        send_text = MagicMock(return_value=True)

        with self._running_daemon(0xDEADBEEF, send_text=send_text):
            time.sleep(1)

            # Publish outbound message
            outbound = {"message_id": "msg-123", "text": "Hello mesh"}
            self.mqtt_client.publish("kosmo/mesh/outbound/!a1b2c3d4", json.dumps(outbound))
            time.sleep(1.5)

            send_text.assert_called()
            args, kwargs = send_text.call_args
            self.assertIn("[Web] Hello mesh", args[0])
            self.assertEqual(kwargs.get("destination_id"), "!a1b2c3d4")

    def test_inbound_mesh_to_mqtt(self):
        """Mesh -> MQTT: text packet from mesh should appear on kosmo/mesh/inbound."""
        # Simulate mesh text packet
        packet = {
            "fromId": "!deadbeef",
            "decoded": {
                "portnum": "TEXT_MESSAGE_APP",
                "payload": b"Hello from the woods",
            },
            "hopLimit": 5,
            "hopStart": 7,
            "rxRssi": -88,
            "rxSnr": 9.5,
        }

        with self._running_daemon(0xDEADBEEF) as (daemon, _mesh):
            time.sleep(0.5)
            daemon._on_mesh_packet(packet)
            time.sleep(1)

        # Give MQTT a moment to flush
        time.sleep(0.5)

        self.assertTrue(
            any(
                m.get("source_node_id") == "!deadbeef"
                and m.get("text") == "Hello from the woods"
                for m in self.inbound_msgs
            ),
            f"Expected inbound message not found. Got: {self.inbound_msgs}",
        )

    def test_enviro_json_text_fallback(self):
        """Mesh -> MQTT: text packet containing enviro JSON should be routed to kosmo/ingest/enviro."""
        enviro_payload = {
            "type": "enviro_reading",
            "node_id": "!enviro01",
            "payload": {
                "temperature_c": 21.5,
                "humidity_percent": 55.0,
            },
        }
        packet = {
            "fromId": "!enviro01",
            "decoded": {
                "portnum": "TEXT_MESSAGE_APP",
                "payload": json.dumps(enviro_payload).encode(),
            },
            "hopLimit": 3,
            "hopStart": 5,
        }

        with self._running_daemon(0xCAFEBABE) as (daemon, _mesh):
            time.sleep(0.5)
            daemon._on_mesh_packet(packet)
            time.sleep(1)

        # Give MQTT a moment to flush
        time.sleep(0.5)

        self.assertTrue(
            any(
                m.get("node_id") == "!enviro01"
                and m.get("payload", {}).get("temperature_c") == 21.5
                for m in self.ingest_msgs
            ),
            f"Expected ingest message not found. Got: {self.ingest_msgs}",
        )


class TestMeshtasticClientConnection(unittest.TestCase):
    """Check which Meshtastic interface class MeshtasticClient._connect() instantiates."""

    def test_serial_interface_used_by_default(self):
        """With no TCP host configured, the serial interface is opened on the device path."""
        with patch("meshtastic_client.SerialInterface") as MockSerial, \
                patch("meshtastic_client.TCPInterface") as MockTCP:
            from meshtastic_client import MeshtasticClient

            client = MeshtasticClient(lambda p: None)
            try:
                client._connect()
                MockSerial.assert_called_once_with(devPath="/dev/fake")
                MockTCP.assert_not_called()
            finally:
                # Always release the client, even if an assertion above failed.
                client.stop()

    def test_tcp_interface_used_when_host_set(self):
        """With MESHTASTIC_HOST set, the TCP interface is opened on that host and port."""
        # Cleanups run after the test method returns, i.e. once patch.dict has
        # restored the environment, so this reload puts config back to the
        # module-level defaults instead of leaking the TCP settings.
        self.addCleanup(importlib.reload, config)

        tcp_env = {"MESHTASTIC_HOST": "192.168.1.45", "MESHTASTIC_TCP_PORT": "4403"}
        with patch.dict(os.environ, tcp_env, clear=False):
            # re-import config to pick up env change
            importlib.reload(config)

            with patch("meshtastic_client.SerialInterface") as MockSerial, \
                    patch("meshtastic_client.TCPInterface") as MockTCP:
                from meshtastic_client import MeshtasticClient

                client = MeshtasticClient(lambda p: None)
                try:
                    client._connect()
                    MockTCP.assert_called_once_with(hostname="192.168.1.45", portNumber=4403)
                    MockSerial.assert_not_called()
                finally:
                    # Always release the client, even if an assertion above failed.
                    client.stop()


if __name__ == "__main__":
    unittest.main()