Skip to content

Commit

Permalink
Tests: improve coverage of NodeScanner (christiansandberg#517)
Browse files Browse the repository at this point in the history
  • Loading branch information
erlend-aasland authored Aug 6, 2024
1 parent ecf216a commit d1c28e5
Showing 1 changed file with 66 additions and 6 deletions.
72 changes: 66 additions & 6 deletions test/test_network.py
Original file line number Diff line number Diff line change
Expand Up @@ -286,13 +286,73 @@ def periodicity():


class TestScanner(unittest.TestCase):
TIMEOUT = 0.1

def test_passive_scanning(self):
scanner = canopen.network.NodeScanner()
scanner.on_message_received(0x586)
scanner.on_message_received(0x587)
scanner.on_message_received(0x586)
self.assertListEqual(scanner.nodes, [6, 7])
def setUp(self):
self.scanner = canopen.network.NodeScanner()

def test_scanner_on_message_received(self):
# Emergency frames should be recognized.
self.scanner.on_message_received(0x081)
# Heartbeats should be recognized.
self.scanner.on_message_received(0x703)
# Tx PDOs should be recognized, but not Rx PDOs.
self.scanner.on_message_received(0x185)
self.scanner.on_message_received(0x206)
self.scanner.on_message_received(0x287)
self.scanner.on_message_received(0x308)
self.scanner.on_message_received(0x389)
self.scanner.on_message_received(0x40a)
self.scanner.on_message_received(0x48b)
self.scanner.on_message_received(0x50c)
# SDO responses from .search() should be recognized,
# but not SDO requests.
self.scanner.on_message_received(0x58d)
self.scanner.on_message_received(0x50e)
self.assertListEqual(self.scanner.nodes, [1, 3, 5, 7, 9, 11, 13])

def test_scanner_reset(self):
self.scanner.nodes = [1, 2, 3] # Mock scan.
self.scanner.reset()
self.assertListEqual(self.scanner.nodes, [])

def test_scanner_search_no_network(self):
with self.assertRaisesRegex(RuntimeError, "Network is required"):
self.scanner.search()

def test_scanner_search(self):
bus = can.Bus(interface="virtual", receive_own_messages=True)
net = canopen.Network(bus)
net.connect()
self.addCleanup(net.disconnect)

self.scanner.network = net
self.scanner.search()

payload = bytes([64, 0, 16, 0, 0, 0, 0, 0])
acc = [bus.recv(self.TIMEOUT) for _ in range(127)]
for node_id, msg in enumerate(acc, start=1):
with self.subTest(node_id=node_id):
self.assertIsNotNone(msg)
self.assertEqual(msg.arbitration_id, 0x600 + node_id)
self.assertEqual(msg.data, payload)
# Check that no spurious packets were sent.
self.assertIsNone(bus.recv(self.TIMEOUT))

def test_scanner_search_limit(self):
bus = can.Bus(interface="virtual", receive_own_messages=True)
net = canopen.Network(bus)
net.connect()
self.addCleanup(net.disconnect)

self.scanner.network = net
self.scanner.search(limit=1)

msg = bus.recv(self.TIMEOUT)
self.assertIsNotNone(msg)
self.assertEqual(msg.arbitration_id, 0x601)
# Check that no spurious packets were sent.
self.assertIsNone(bus.recv(self.TIMEOUT))


if __name__ == "__main__":
Expand Down

0 comments on commit d1c28e5

Please sign in to comment.