Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Receiver ping #1034

Merged
merged 4 commits into from
Mar 24, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 90 additions & 18 deletions spinn_front_end_common/utilities/connections/live_event_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,22 +13,26 @@
# limitations under the License.

import logging
from threading import Thread
import struct
from threading import Thread, Condition
from time import sleep
from spinn_utilities.log import FormatAdapter
from spinnman.messages.eieio.data_messages import (
EIEIODataMessage, KeyPayloadDataElement)
from spinnman.messages.eieio import EIEIOType, AbstractEIEIOMessage
from spinnman.connections import ConnectionListener
from spinnman.connections.udp_packet_connections import EIEIOConnection
from spinnman.connections.udp_packet_connections import (
EIEIOConnection, UDPConnection)
from spinnman.messages.sdp.sdp_flag import SDPFlag
from spinnman.constants import SCP_SCAMP_PORT
from spinnman.utilities.utility_functions import send_port_trigger_message
from spinnman.messages.sdp.sdp_message import SDPMessage
from spinnman.messages.sdp.sdp_header import SDPHeader
from spinnman.utilities.utility_functions import reprogram_tag_to_listener
from spinnman.messages.eieio import (
read_eieio_command_message, read_eieio_data_message)
from spinn_front_end_common.utilities.constants import NOTIFY_PORT
from spinn_front_end_common.utilities.database import DatabaseConnection
from spinn_front_end_common.utilities.exceptions import ConfigurationException
from spinn_front_end_common.utilities.utility_calls import retarget_tag

logger = FormatAdapter(logging.getLogger(__name__))

Expand All @@ -41,6 +45,24 @@
# The maximum number of 32-bit keys with payloads that will fit in a packet
_MAX_FULL_KEYS_PAYLOADS_PER_PACKET = 31

# Decoding of a single short value
_ONE_SHORT = struct.Struct("<H")

# The size of a RAW SCP OK response (includes 2 bytes of padding)
_SCP_OK_SIZE = 18

# The byte of the RAW SCP packet that contains the flags
_SCP_FLAGS_BYTE = 2

# The byte of the RAW SCP packet that contains the destination cpu
_SCP_DEST_CPU_BYTE = 4

# The expected flags from a RAW SCP packet in response
_SCP_RESPONSE_FLAGS = 7

# The expected destination cpu from a RAW SCP packet in repsonse
_SCP_RESPONSE_DEST = 0xFF


class LiveEventConnection(DatabaseConnection):
""" A connection for receiving and sending live events from and to\
Expand All @@ -65,7 +87,12 @@ class LiveEventConnection(DatabaseConnection):
"__sender_connection",
"__start_resume_callbacks",
"__simulator",
"__spalloc_job"]
"__spalloc_job",
"__receiver_details",
"__is_running",
"__expect_scp_response",
"__expect_scp_response_lock",
"__scp_response_received"]

def __init__(self, live_packet_gather_label, receive_labels=None,
send_labels=None, local_host=None, local_port=NOTIFY_PORT):
Expand Down Expand Up @@ -105,6 +132,7 @@ def __init__(self, live_packet_gather_label, receive_labels=None,
self.__start_resume_callbacks = dict()
self.__pause_stop_callbacks = dict()
self.__init_callbacks = dict()
self.__receiver_details = list()
if receive_labels is not None:
for label in receive_labels:
self.__live_event_callbacks.append(list())
Expand All @@ -119,6 +147,10 @@ def __init__(self, live_packet_gather_label, receive_labels=None,
self.__receiver_listener = None
self.__receiver_connection = None
self.__error_keys = set()
self.__is_running = False
self.__expect_scp_response = False
self.__expect_scp_response_lock = Condition()
self.__scp_response_received = None

def add_send_label(self, label):
if self.__send_labels is None:
Expand Down Expand Up @@ -272,8 +304,7 @@ def __init_receivers(self, db, vertex_sizes):
if job:
self.__receiver_connection = job.open_listener_connection()
else:
self.__receiver_connection = EIEIOConnection()
indirect = hasattr(self.__receiver_connection, "update_tag")
self.__receiver_connection = UDPConnection()
receivers = set()
for label_id, label in enumerate(self.__receive_labels):
_, port, board_address, tag, x, y = self.__get_live_output_details(
Expand All @@ -282,15 +313,7 @@ def __init_receivers(self, db, vertex_sizes):
# Update the tag if not already done
if (board_address, port, tag) not in receivers:
receivers.add((board_address, port, tag))
if indirect:
self.__receiver_connection.update_tag(x, y, tag)
# No port trigger necessary; proxied already
else:
retarget_tag(
self.__receiver_connection, x, y, tag,
ip_address=board_address)
send_port_trigger_message(
self.__receiver_connection, board_address)
self.__receiver_details.append((x, y, tag, board_address))

logger.info(
"Listening for traffic from {} on board {} on {}:{}",
Expand Down Expand Up @@ -358,20 +381,68 @@ def __do_start_resume(self):
for label, callbacks in self.__start_resume_callbacks.items():
for callback in callbacks:
self.__launch_thread("start_resume", label, callback)
self.__is_running = True
thread = Thread(target=self.__send_tag_messages)
thread.start()

def __do_stop_pause(self):
self.__is_running = False
for label, callbacks in self.__pause_stop_callbacks.items():
for callback in callbacks:
self.__launch_thread("pause_stop", label, callback)

def __do_receive_packet(self, packet):
# pylint: disable=broad-except
def __send_tag_messages(self):
if self.__receiver_connection is None:
return
indirect = hasattr(self.__receiver_connection, "update_tag")
while self.__is_running:
for (x, y, tag, board_address) in self.__receiver_details:
with self.__expect_scp_response_lock:
self.__scp_response_received = None
self.__expect_scp_response = True
if indirect:
self.__receiver_connection.update_tag(
x, y, tag, do_receive=False)
# No port trigger necessary; proxied already
else:
reprogram_tag_to_listener(
self.__receiver_connection, x, y, board_address,
tag, read_response=False)
while (self.__scp_response_received is None and
self.__is_running):
self.__expect_scp_response_lock.wait(timeout=1.0)
sleep(10.0)

def __handle_scp_packet(self, data):
with self.__expect_scp_response_lock:
# SCP unexpected
if not self.__expect_scp_response:
return False

if (len(data) == _SCP_OK_SIZE and
data[_SCP_FLAGS_BYTE] == _SCP_RESPONSE_FLAGS and
data[_SCP_DEST_CPU_BYTE] == _SCP_RESPONSE_DEST):
self.__scp_response_received = data
self.__expect_scp_response = False
return True
return False

def __do_receive_packet(self, data):
if self.__handle_scp_packet(data):
return

logger.debug("Received packet")
try:
header = _ONE_SHORT.unpack_from(data)[0]
if header & 0xC000 == 0x4000:
return read_eieio_command_message(data, 0)
packet = read_eieio_data_message(data, 0)
if packet.eieio_header.is_time:
self.__handle_time_packet(packet)
else:
self.__handle_no_time_packet(packet)

# pylint: disable=broad-except
except Exception:
logger.warning("problem handling received packet", exc_info=True)

Expand Down Expand Up @@ -550,5 +621,6 @@ def _send(self, message: AbstractEIEIOMessage, x, y, p, ip_address):
(ip_address, SCP_SCAMP_PORT))

def close(self):
self.__is_running = False
self.__handle_possible_rerun_state()
super().close()