diff --git a/.gitignore b/.gitignore
index ed7a51646..b8df89eb8 100644
--- a/.gitignore
+++ b/.gitignore
@@ -29,3 +29,4 @@ SpiNNMan.egg-info
 .coverage
 .cache/
 .pytest_cache/
+/scripts/
diff --git a/spinnman/connections/scp_request_pipeline.py b/spinnman/connections/scp_request_pipeline.py
index 70f18f5bc..afee084ff 100644
--- a/spinnman/connections/scp_request_pipeline.py
+++ b/spinnman/connections/scp_request_pipeline.py
@@ -22,7 +22,7 @@
 MAX_SEQUENCE = 65536
 RETRY_CODES = frozenset([
     SCPResult.RC_TIMEOUT, SCPResult.RC_P2P_TIMEOUT, SCPResult.RC_LEN,
-    SCPResult.RC_P2P_NOREPLY])
+    SCPResult.RC_P2P_NOREPLY, SCPResult.RC_P2P_BUSY])
 
 # Keep a global track of the sequence numbers used
 _next_sequence = 0
@@ -294,6 +294,7 @@ def _handle_receive_timeout(self):
     def _resend(self, seq, request_sent, reason):
         if self._retries[seq] <= 0:
             # Report timeouts as timeout exception
+            self._retry_reason[seq].append(reason)
             if all(reason == "timeout" for reason in self._retry_reason[seq]):
                 raise SpinnmanTimeoutException(
                     request_sent,
diff --git a/spinnman/constants.py b/spinnman/constants.py
index 1ccd0185c..9363c7555 100644
--- a/spinnman/constants.py
+++ b/spinnman/constants.py
@@ -213,6 +213,9 @@ class READ_TYPES(Enum):
 
 #: This is the default timeout when using SCP
 SCP_TIMEOUT = 1.0
 
+#: This is the default timeout when using SCP count (can take a bit longer)
+SCP_TIMEOUT_COUNT = 5.0
+
 #: This is the default number of retries when using SCP
 N_RETRIES = 10
diff --git a/spinnman/messages/scp/enums/scp_command.py b/spinnman/messages/scp/enums/scp_command.py
index d1cedd107..5c1131788 100644
--- a/spinnman/messages/scp/enums/scp_command.py
+++ b/spinnman/messages/scp/enums/scp_command.py
@@ -25,6 +25,7 @@ class SCPCommand(Enum):
     CMD_WRITE = (3, "Write SDRAM")
     CMD_APLX = 4
     CMD_FILL = 5
+    CMD_COUNT = (15, "Count the number of cores in a given state")
     CMD_REMAP = 16
     CMD_LINK_READ = (17, "Read neighbouring chip's memory.")
     CMD_LINK_WRITE = (18, "Write neighbouring chip's memory.")
diff --git a/spinnman/messages/scp/impl/count_state.py b/spinnman/messages/scp/impl/count_state.py
index 59dcf3749..305c5118f 100644
--- a/spinnman/messages/scp/impl/count_state.py
+++ b/spinnman/messages/scp/impl/count_state.py
@@ -19,19 +19,6 @@
 from spinnman.messages.sdp import SDPFlag, SDPHeader
 from .count_state_response import CountStateResponse
 
-_ALL_CORE_MASK = 0xFFFF
-_COUNT_OPERATION = 1
-_COUNT_MODE = 2
-_COUNT_SIGNAL_TYPE = 1
-_APP_MASK = 0xFF
-
-
-def _get_data(app_id, state):
-    data = (_APP_MASK << 8) | app_id
-    data += (_COUNT_OPERATION << 22) | (_COUNT_MODE << 20)
-    data += state.value << 16
-    return data
-
 
 class CountState(AbstractSCPRequest):
     """
@@ -39,7 +26,7 @@ class CountState(AbstractSCPRequest):
     """
     __slots__ = []
 
-    def __init__(self, app_id, state):
+    def __init__(self, x, y, app_id, state):
         """
         :param int app_id: The ID of the application, between 0 and 255
         :param CPUState state: The state to count
@@ -48,12 +35,11 @@ def __init__(self, app_id, state):
             SDPHeader(
                 flags=SDPFlag.REPLY_EXPECTED, destination_port=0,
                 destination_cpu=0,
-                destination_chip_x=self.DEFAULT_DEST_X_COORD,
-                destination_chip_y=self.DEFAULT_DEST_Y_COORD),
-            SCPRequestHeader(command=SCPCommand.CMD_SIG),
-            argument_1=_COUNT_SIGNAL_TYPE,
-            argument_2=_get_data(app_id, state),
-            argument_3=_ALL_CORE_MASK)
+                destination_chip_x=x,
+                destination_chip_y=y),
+            SCPRequestHeader(command=SCPCommand.CMD_COUNT),
+            argument_1=app_id,
+            argument_2=state.value)
 
     @overrides(AbstractSCPRequest.get_scp_response)
     def get_scp_response(self):
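As a rough usage sketch (the chip coordinates and application ID below are invented for illustration), the reworked request is built once per target chip, with the application ID and raw CPU state value passed straight through as SCP arguments instead of being packed into a CMD_SIG word by the removed _get_data() helper:

    # Illustrative only: chip (0, 0) and app_id 17 are placeholder values.
    from spinnman.messages.scp.impl import CountState
    from spinnman.model.enums import CPUState

    # One request per chip; argument_1 carries the app ID and argument_2 the
    # CPUState value, matching the __init__ above.
    request = CountState(0, 0, app_id=17, state=CPUState.SYNC0)
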
diff --git a/spinnman/messages/scp/impl/count_state_response.py b/spinnman/messages/scp/impl/count_state_response.py
index b55912b04..033df4526 100644
--- a/spinnman/messages/scp/impl/count_state_response.py
+++ b/spinnman/messages/scp/impl/count_state_response.py
@@ -37,7 +37,7 @@ def read_data_bytestring(self, data, offset):
         result = self.scp_response_header.result
         if result != SCPResult.RC_OK:
             raise SpinnmanUnexpectedResponseCodeException(
-                "CountState", "CMD_SIGNAL", result.name)
+                "CountState", "CMD_COUNT", result.name)
         self._count = _ONE_WORD.unpack_from(data, offset)[0]
 
     @property
diff --git a/spinnman/messages/spinnaker_boot/boot_data/scamp.boot b/spinnman/messages/spinnaker_boot/boot_data/scamp.boot
index 7d9cc841e..2cd695394 100644
Binary files a/spinnman/messages/spinnaker_boot/boot_data/scamp.boot and b/spinnman/messages/spinnaker_boot/boot_data/scamp.boot differ
diff --git a/spinnman/messages/spinnaker_boot/system_variable_boot_values.py b/spinnman/messages/spinnaker_boot/system_variable_boot_values.py
index e2fc3451a..20fc50804 100644
--- a/spinnman/messages/spinnaker_boot/system_variable_boot_values.py
+++ b/spinnman/messages/spinnaker_boot/system_variable_boot_values.py
@@ -141,7 +141,7 @@ class SystemVariableDefinition(Enum):
         _DataType.BYTE, offset=0x29,
         doc="Nearest-Neighbour retry parameter")
     link_peek_timeout_microseconds = _Definition(
-        _DataType.BYTE, offset=0x2a, default=100,
+        _DataType.BYTE, offset=0x2a, default=200,
         doc="The link peek/poke timeout in microseconds")
     led_half_period_10_ms = _Definition(
         _DataType.BYTE, offset=0x2b, default=1,
diff --git a/spinnman/processes/__init__.py b/spinnman/processes/__init__.py
index 3e11e3b14..edaf8c2b0 100644
--- a/spinnman/processes/__init__.py
+++ b/spinnman/processes/__init__.py
@@ -24,6 +24,7 @@
 from .get_exclude_cpu_info_process import GetExcludeCPUInfoProcess
 from .get_include_cpu_info_process import GetIncludeCPUInfoProcess
 from .get_machine_process import GetMachineProcess
+from .get_n_cores_in_state_process import GetNCoresInStateProcess
 from .get_routes_process import GetMultiCastRoutesProcess
 from .get_tags_process import GetTagsProcess
 from .get_version_process import GetVersionProcess
@@ -49,7 +50,8 @@
     "GetCPUInfoProcess", "GetExcludeCPUInfoProcess",
     "GetIncludeCPUInfoProcess", "GetHeapProcess",
-    "GetMachineProcess", "GetMultiCastRoutesProcess", "GetTagsProcess",
+    "GetMachineProcess", "GetMultiCastRoutesProcess",
+    "GetNCoresInStateProcess", "GetTagsProcess",
     "GetVersionProcess", "LoadFixedRouteRoutingEntryProcess",
     "LoadMultiCastRoutesProcess", "MallocSDRAMProcess",
     "ReadFixedRouteRoutingEntryProcess", "ReadIOBufProcess",
diff --git a/spinnman/processes/application_copy_run_process.py b/spinnman/processes/application_copy_run_process.py
index b9ee64ef0..69d8800c5 100644
--- a/spinnman/processes/application_copy_run_process.py
+++ b/spinnman/processes/application_copy_run_process.py
@@ -12,41 +12,69 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from collections import defaultdict
 from spinnman.data import SpiNNManDataView
 from spinnman.messages.scp.impl import AppCopyRun
 from .abstract_multi_connection_process import AbstractMultiConnectionProcess
 
+APP_COPY_RUN_TIMEOUT = 6.0
+
 
 def _on_same_board(chip_1, chip_2):
     return (chip_1.nearest_ethernet_x == chip_2.nearest_ethernet_x and
             chip_1.nearest_ethernet_y == chip_2.nearest_ethernet_y)
 
 
-def _get_next_chips(chips_done):
+def _get_next_chips(chips_done, parent_chips, machine):
     """
     Get the chips that are adjacent to the last set of chips, which haven't
     yet been loaded.  Also returned are the links for each chip, which gives
     the link which should be read from to get the data.
 
-    :param set((int,int)) chips_done:
-        The coordinates of chips that have already been done
-    :return: A dict of chip coordinates to link to use, Chip
-    :rtype: dict((int,int), (int, Chip))
+    :param dict((int,int),list(int,int)) chips_done:
+        The coordinates of chips already done, keyed by Ethernet chip
+    :param dict((int, int),list(~spinn_machine.Chip)) parent_chips:
+        A dictionary of chip coordinates to chips that use that chip as a
+        parent
+    :return: A list of next chips to use
+    :rtype: list(~spinn_machine.Chip)
     """
-    next_chips = dict()
-    for x, y in chips_done:
-        chip = SpiNNManDataView.get_chip_at(x, y)
-        for link in chip.router.links:
-            chip_coords = (link.destination_x, link.destination_y)
-            if chip_coords not in chips_done and chip_coords not in next_chips:
-                next_chip = SpiNNManDataView.get_chip_at(*chip_coords)
-                opp_link = (link.source_link_id + 3) % 6
-                next_chips[chip_coords] = (opp_link, next_chip)
-                # Only let one thing copy from this chip
-                break
+    next_chips = list()
+    for eth_chip in chips_done:
+        off_board_copy_done = False
+        for c_x, c_y in chips_done[eth_chip]:
+            chip_xy = machine.get_chip_at(c_x, c_y)
+            for chip in parent_chips[c_x, c_y]:
+                on_same_board = _on_same_board(chip, chip_xy)
+                eth = (chip.nearest_ethernet_x, chip.nearest_ethernet_y)
+                if (eth not in chips_done or
+                        (chip.x, chip.y) not in chips_done[eth]):
+                    if on_same_board or not off_board_copy_done:
+                        next_chips.append(chip)
+                        if not on_same_board:
+                            off_board_copy_done = True
+                        # Only do one copy from each chip at a time
+                        break
     return next_chips
 
 
+def _compute_parent_chips(machine):
+    """
+    Compute a dictionary of chip coordinates to the list of chips that use
+    that chip as a parent in the tree.
+
+    :param ~spinn_machine.Machine machine: The machine to compute the map for
+    :rtype: dict((int, int), list(~spinn_machine.Chip))
+    """
+    chip_links = defaultdict(list)
+    for chip in machine.chips:
+        if chip.parent_link is not None:
+            link = chip.router.get_link(chip.parent_link)
+            chip_links[link.destination_x, link.destination_y].append(chip)
+    return chip_links
+
+
 class ApplicationCopyRunProcess(AbstractMultiConnectionProcess):
     """
     Process to start a binary on a subset of cores on a subset of chips
@@ -63,6 +91,10 @@ class ApplicationCopyRunProcess(AbstractMultiConnectionProcess):
     """
     __slots__ = []
 
+    def __init__(self, next_connection_selector, timeout=APP_COPY_RUN_TIMEOUT):
+        AbstractMultiConnectionProcess.__init__(
+            self, next_connection_selector, timeout=timeout)
+
     def run(self, size, app_id, core_subsets, chksum, wait):
         """
         Run the process.
@@ -74,18 +106,22 @@ def run(self, size, app_id, core_subsets, chksum, wait):
         :param bool wait:
             Whether to put the binary in "wait" mode or run it straight away
         """
-        boot_chip = SpiNNManDataView.get_machine().boot_chip
-        chips_done = set([(boot_chip.x, boot_chip.y)])
-        next_chips = _get_next_chips(chips_done)
+        machine = SpiNNManDataView.get_machine()
+        boot_chip = machine.boot_chip
+        chips_done = defaultdict(list)
+        chips_done[boot_chip.x, boot_chip.y].append((boot_chip.x, boot_chip.y))
+        parent_chips = _compute_parent_chips(machine)
+        next_chips = _get_next_chips(chips_done, parent_chips, machine)
 
         while next_chips:
            # Do all the chips at the current level
-            for link, chip in next_chips.values():
+            for chip in next_chips:
                subset = core_subsets.get_core_subset_for_chip(chip.x, chip.y)
                self._send_request(AppCopyRun(
-                    chip.x, chip.y, link, size, app_id, subset.processor_ids,
-                    chksum, wait))
-                chips_done.add((chip.x, chip.y))
+                    chip.x, chip.y, chip.parent_link, size, app_id,
+                    subset.processor_ids, chksum, wait))
+                eth = (chip.nearest_ethernet_x, chip.nearest_ethernet_y)
+                chips_done[eth].append((chip.x, chip.y))
            self._finish()
            self.check_for_error()
-            next_chips = _get_next_chips(chips_done)
+            next_chips = _get_next_chips(chips_done, parent_chips, machine)
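The loading order this produces can be pictured with a small stand-alone model (not SpiNNMan code; the coordinates and parent map are invented): each chip is reached through its parent_link, and every pass loads all chips whose parents have already been done. The real _get_next_chips() additionally allows only one copy per pass to cross onto a board that has not been reached yet.

    # Toy model of the wave order; parents[(x, y)] is the chip that (x, y)
    # copies its data from, which is what parent_link encodes on a real Chip.
    from collections import defaultdict

    parents = {(0, 1): (0, 0), (1, 0): (0, 0), (1, 1): (0, 1), (2, 1): (1, 1)}
    children = defaultdict(list)
    for child, parent in parents.items():
        children[parent].append(child)

    done = {(0, 0)}  # the boot chip is loaded directly over Ethernet
    while True:
        wave = [c for p in done for c in children[p] if c not in done]
        if not wave:
            break
        print("copy in parallel:", wave)  # one AppCopyRun per chip in the wave
        done.update(wave)
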
diff --git a/spinnman/processes/get_n_cores_in_state_process.py b/spinnman/processes/get_n_cores_in_state_process.py
new file mode 100644
index 000000000..5778c5698
--- /dev/null
+++ b/spinnman/processes/get_n_cores_in_state_process.py
@@ -0,0 +1,51 @@
+# Copyright (c) 2015 The University of Manchester
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from spinnman.messages.scp.impl import CountState
+from .abstract_multi_connection_process import AbstractMultiConnectionProcess
+
+# Timeout for getting core state count; higher due to more waiting needed
+GET_CORE_COUNT_TIMEOUT = 2.0
+
+
+class GetNCoresInStateProcess(AbstractMultiConnectionProcess):
+    __slots__ = [
+        "_n_cores"]
+
+    def __init__(self, connection_selector):
+        """
+        :param connection_selector:
+        :type connection_selector:
+            AbstractMultiConnectionProcessConnectionSelector
+        """
+        super().__init__(connection_selector, timeout=GET_CORE_COUNT_TIMEOUT)
+        self._n_cores = 0
+
+    def __handle_response(self, response):
+        self._n_cores += response.count
+
+    def get_n_cores_in_state(self, xys, app_id, state):
+        """
+        :param list(int,int) xys:
+        :param int app_id:
+        :param CPUState state:
+        :rtype: int
+        """
+        for c_x, c_y in xys:
+            self._send_request(
+                CountState(c_x, c_y, app_id, state), self.__handle_response)
+        self._finish()
+        self.check_for_error()
+
+        return self._n_cores
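A hedged usage sketch (the connection selector is assumed to already exist, for example the transceiver's MostDirectConnectionSelector): the process sends one CountState per requested chip and sums the counts returned in the responses.

    # Sketch only: `selector` is a placeholder for an existing connection
    # selector; the chip coordinates and app_id are invented.
    from spinnman.processes import GetNCoresInStateProcess
    from spinnman.model.enums import CPUState

    process = GetNCoresInStateProcess(selector)
    n_sync = process.get_n_cores_in_state(
        [(0, 0), (8, 4)], app_id=17, state=CPUState.SYNC0)
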
diff --git a/spinnman/spalloc/abstract_spalloc_client.py b/spinnman/spalloc/abstract_spalloc_client.py
index 700175669..60ac8d7ce 100644
--- a/spinnman/spalloc/abstract_spalloc_client.py
+++ b/spinnman/spalloc/abstract_spalloc_client.py
@@ -118,3 +118,36 @@ def create_job_board(
         :return: A handle for monitoring and interacting with the job.
         :rtype: SpallocJob
         """
+
+    @abstractmethod
+    def create_job_rect_at_board(
+            self, width: int, height: int, triad: Tuple[int, int, int] = None,
+            physical: Tuple[int, int, int] = None, ip_address: str = None,
+            machine_name: str = None, keepalive: int = 45,
+            max_dead_boards: int = 0) -> SpallocJob:
+        """
+        Create a job with a rectangle of boards starting at a specific board.
+        At least one of ``triad``, ``physical`` and ``ip_address`` must not be
+        ``None``.
+
+        :param int width:
+            The width of rectangle to request
+        :param int height:
+            The height of rectangle to request
+        :param tuple(int,int,int) triad:
+            The logical coordinate of the board to request
+        :param tuple(int,int,int) physical:
+            The physical coordinate of the board to request
+        :param str ip_address:
+            The IP address of the board to request
+        :param str machine_name:
+            Which machine to run on? If omitted, the service's machine tagged
+            with ``default`` will be used.
+        :param int keepalive:
+            After how many seconds of no activity should a job become eligible
+            for automatic pruning?
+        :param int max_dead_boards:
+            How many dead boards can be included.
+        :return: A handle for monitoring and interacting with the job.
+        :rtype: SpallocJob
+        """
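A brief sketch of how the new call might be used (assuming a client object that implements this abstract API, such as SpallocClient): request a 2x2 rectangle of boards anchored at a known board IP, wait for the allocation to come up, and then talk to it.

    # Sketch only: `client` is assumed to be an AbstractSpallocClient
    # implementation; the IP address below is a placeholder.
    job = client.create_job_rect_at_board(2, 2, ip_address="10.11.12.13")
    job.wait_until_ready()
    txrx = job.create_transceiver()
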
diff --git a/spinnman/spalloc/spalloc_client.py b/spinnman/spalloc/spalloc_client.py
index 326302612..e905f41af 100644
--- a/spinnman/spalloc/spalloc_client.py
+++ b/spinnman/spalloc/spalloc_client.py
@@ -19,6 +19,7 @@
 from multiprocessing import Process, Queue
 from time import sleep
 from packaging.version import Version
+from urllib.parse import urlparse, urlunparse, ParseResult
 import queue
 import requests
 import struct
@@ -60,6 +61,17 @@
 _msg_to = struct.Struct("
-    def get_state(self) -> SpallocState:
+    def get_state(self, wait_for_change=False) -> SpallocState:
         """
         Get the current state of the machine.
 
+        :param bool wait_for_change: Whether to wait for a change in state
+
         :rtype: SpallocState
         """
@@ -129,12 +131,16 @@ def create_transceiver(self) -> Transceiver:
         """
 
     @abstractmethod
-    def wait_for_state_change(self, old_state: SpallocState) -> SpallocState:
+    def wait_for_state_change(self, old_state: SpallocState,
+                              timeout: int = None) -> SpallocState:
         """
         Wait until the allocation is not in the given old state.
 
         :param SpallocState old_state:
             The state that we are looking to change out of.
+        :param timeout:
+            The time to wait, or None to wait forever
+        :type timeout: int or None
         :return: The state that the allocation is now in.
 
         .. note::
@@ -143,10 +149,15 @@ def wait_for_state_change(self, old_state: SpallocState) -> SpallocState:
         """
 
     @abstractmethod
-    def wait_until_ready(self):
+    def wait_until_ready(self, timeout: int = None, n_retries: int = None):
         """
         Wait until the allocation is in the ``READY`` state.
 
+        :param timeout: The timeout or None to wait forever
+        :type timeout: int or None
+        :param n_retries:
+            The number of times to retry, or None to retry forever
+        :type n_retries: int or None
         :raises Exception: If the allocation is destroyed
         """
diff --git a/spinnman/transceiver/base_transceiver.py b/spinnman/transceiver/base_transceiver.py
index 2fa7ae3b6..9646439da 100644
--- a/spinnman/transceiver/base_transceiver.py
+++ b/spinnman/transceiver/base_transceiver.py
@@ -55,7 +55,7 @@
 from spinnman.messages.scp.impl import (
     BMPGetVersion, SetPower, ReadFPGARegister, WriteFPGARegister,
     IPTagSetTTO, ReverseIPTagSet,
-    CountState, WriteMemory, SendSignal, AppStop,
+    WriteMemory, SendSignal, AppStop,
     IPTagSet, IPTagClear, RouterClear, DoSync)
 from spinnman.connections.udp_packet_connections import (
     BMPConnection, BootConnection, SCAMPConnection)
@@ -68,15 +68,16 @@
     ReadFixedRouteRoutingEntryProcess, LoadMultiCastRoutesProcess,
     GetTagsProcess, GetMultiCastRoutesProcess, SendSingleCommandProcess,
     ReadRouterDiagnosticsProcess,
-    MostDirectConnectionSelector, ApplicationCopyRunProcess)
-from spinnman.utilities.utility_functions import get_vcpu_address
+    MostDirectConnectionSelector, ApplicationCopyRunProcess,
+    GetNCoresInStateProcess)
 from spinnman.transceiver.transceiver import Transceiver
 from spinnman.transceiver.extendable_transceiver import ExtendableTransceiver
+from spinnman.utilities.utility_functions import get_vcpu_address
 
 logger = FormatAdapter(logging.getLogger(__name__))
 
 _SCAMP_NAME = "SC&MP"
-_SCAMP_VERSION = (3, 0, 1)
+_SCAMP_VERSION = (4, 0, 0)
 
 _BMP_NAME = "BC&MP"
 _BMP_MAJOR_VERSIONS = [1, 2]
@@ -746,25 +747,8 @@ def add_cpu_information_from_core(self, cpu_infos, x, y, p, states):
         new_infos = self.get_cpu_infos(core_subsets)
         cpu_infos.add_infos(new_infos, states)
 
+    @overrides(Transceiver.get_region_base_address)
     def get_region_base_address(self, x, y, p):
-        """
-        Gets the base address of the Region Table
-
-        :param int x: The x-coordinate of the chip containing the processor
-        :param int y: The y-coordinate of the chip containing the processor
-        :param int p: The ID of the processor to get the address
-        :return: The adddress of the Region table for the selected core
-        :rtype: int
-        :raise SpinnmanIOException:
-            If there is an error communicating with the board
-        :raise SpinnmanInvalidPacketException:
-            If a packet is received that is not in the valid format
-        :raise SpinnmanInvalidParameterException:
-            * If x, y, p is not a valid processor
-            * If a packet is received that has invalid parameters
-        :raise SpinnmanUnexpectedResponseCodeException:
-            If a response indicates an error during the exchange
-        """
         return self.read_user(x, y, p, 0)
 
     @overrides(Transceiver.get_iobuf)
@@ -781,10 +765,15 @@ def get_iobuf(self, core_subsets=None):
         return process.read_iobuf(self._iobuf_size, core_subsets)
 
     @overrides(Transceiver.get_core_state_count)
-    def get_core_state_count(self, app_id, state):
-        process = SendSingleCommandProcess(self._scamp_connection_selector)
-        response = process.execute(CountState(app_id, state))
-        return response.count  # pylint: disable=no-member
+    def get_core_state_count(self, app_id, state, xys=None):
+        process = GetNCoresInStateProcess(self._scamp_connection_selector)
+        chip_xys = xys
+        if xys is None:
+            machine = SpiNNManDataView.get_machine()
+            chip_xys = [(ch.x, ch.y)
+                        for ch in machine.ethernet_connected_chips]
+
+        return process.get_n_cores_in_state(chip_xys, app_id, state)
 
     @contextmanager
     def _flood_execute_lock(self):
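In use (sketch only; `txrx` stands for an existing booted transceiver and the application ID is invented), the count can now be taken across the whole machine or restricted to particular chips; when xys is omitted it falls back to every Ethernet-connected chip, as in the implementation above.

    from spinnman.model.enums import CPUState

    n_sync = txrx.get_core_state_count(17, CPUState.SYNC0)
    # Restrict the query to specific chips:
    n_sync_root = txrx.get_core_state_count(17, CPUState.SYNC0, xys=[(0, 0)])
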
diff --git a/spinnman/transceiver/mockable_transceiver.py b/spinnman/transceiver/mockable_transceiver.py
index d53e46c61..80cbe259a 100644
--- a/spinnman/transceiver/mockable_transceiver.py
+++ b/spinnman/transceiver/mockable_transceiver.py
@@ -77,7 +77,7 @@ def get_iobuf(self, core_subsets=None):
         raise NotImplementedError("Needs to be mocked")
 
     @overrides(Transceiver.get_core_state_count)
-    def get_core_state_count(self, app_id, state):
+    def get_core_state_count(self, app_id, state, xys=None):
         raise NotImplementedError("Needs to be mocked")
 
     @overrides(Transceiver.execute_flood)
diff --git a/spinnman/transceiver/transceiver.py b/spinnman/transceiver/transceiver.py
index 02ab8508c..e9a5b65da 100644
--- a/spinnman/transceiver/transceiver.py
+++ b/spinnman/transceiver/transceiver.py
@@ -271,13 +271,14 @@ def get_iobuf(self, core_subsets=None):
     # Used by IOBufExtractor
 
     @abstractmethod
-    def get_core_state_count(self, app_id, state):
+    def get_core_state_count(self, app_id, state, xys=None):
         """
         Get a count of the number of cores which have a given state.
 
         :param int app_id:
             The ID of the application from which to get the count.
         :param CPUState state: The state count to get
+        :param list(int,int) xys: The chips to query, or None for all
         :return: A count of the cores with the given status
         :rtype: int
         :raise SpinnmanIOException: