Skip to content

Commit

Permalink
Merge pull request #326 from SpiNNakerManchester/p2p_ping
Browse files Browse the repository at this point in the history
P2p ping
  • Loading branch information
Christian-B authored Oct 11, 2023
2 parents 3138531 + 60179b2 commit 27295bf
Show file tree
Hide file tree
Showing 15 changed files with 246 additions and 70 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,4 @@ SpiNNMan.egg-info
.coverage
.cache/
.pytest_cache/
/scripts/
3 changes: 2 additions & 1 deletion spinnman/connections/scp_request_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
MAX_SEQUENCE = 65536
RETRY_CODES = frozenset([
SCPResult.RC_TIMEOUT, SCPResult.RC_P2P_TIMEOUT, SCPResult.RC_LEN,
SCPResult.RC_P2P_NOREPLY])
SCPResult.RC_P2P_NOREPLY, SCPResult.RC_P2P_BUSY])

# Keep a global track of the sequence numbers used
_next_sequence = 0
Expand Down Expand Up @@ -294,6 +294,7 @@ def _handle_receive_timeout(self):
def _resend(self, seq, request_sent, reason):
if self._retries[seq] <= 0:
# Report timeouts as timeout exception
self._retry_reason[seq].append(reason)
if all(reason == "timeout" for reason in self._retry_reason[seq]):
raise SpinnmanTimeoutException(
request_sent,
Expand Down
3 changes: 3 additions & 0 deletions spinnman/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,9 @@ class READ_TYPES(Enum):
#: This is the default timeout when using SCP
SCP_TIMEOUT = 1.0

#: This is the default timeout when using SCP count (can take a bit longer)
SCP_TIMEOUT_COUNT = 5.0

#: This is the default number of retries when using SCP
N_RETRIES = 10

Expand Down
1 change: 1 addition & 0 deletions spinnman/messages/scp/enums/scp_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ class SCPCommand(Enum):
CMD_WRITE = (3, "Write SDRAM")
CMD_APLX = 4
CMD_FILL = 5
CMD_COUNT = (15, "Count the number of cores in a given state")
CMD_REMAP = 16
CMD_LINK_READ = (17, "Read neighbouring chip's memory.")
CMD_LINK_WRITE = (18, "Write neighbouring chip's memory.")
Expand Down
26 changes: 6 additions & 20 deletions spinnman/messages/scp/impl/count_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,27 +19,14 @@
from spinnman.messages.sdp import SDPFlag, SDPHeader
from .count_state_response import CountStateResponse

_ALL_CORE_MASK = 0xFFFF
_COUNT_OPERATION = 1
_COUNT_MODE = 2
_COUNT_SIGNAL_TYPE = 1
_APP_MASK = 0xFF


def _get_data(app_id, state):
data = (_APP_MASK << 8) | app_id
data += (_COUNT_OPERATION << 22) | (_COUNT_MODE << 20)
data += state.value << 16
return data


class CountState(AbstractSCPRequest):
"""
An SCP Request to get a count of the cores in a particular state.
"""
__slots__ = []

def __init__(self, app_id, state):
def __init__(self, x, y, app_id, state):
"""
:param int app_id: The ID of the application, between 0 and 255
:param CPUState state: The state to count
Expand All @@ -48,12 +35,11 @@ def __init__(self, app_id, state):
SDPHeader(
flags=SDPFlag.REPLY_EXPECTED, destination_port=0,
destination_cpu=0,
destination_chip_x=self.DEFAULT_DEST_X_COORD,
destination_chip_y=self.DEFAULT_DEST_Y_COORD),
SCPRequestHeader(command=SCPCommand.CMD_SIG),
argument_1=_COUNT_SIGNAL_TYPE,
argument_2=_get_data(app_id, state),
argument_3=_ALL_CORE_MASK)
destination_chip_x=x,
destination_chip_y=y),
SCPRequestHeader(command=SCPCommand.CMD_COUNT),
argument_1=app_id,
argument_2=state.value)

@overrides(AbstractSCPRequest.get_scp_response)
def get_scp_response(self):
Expand Down
2 changes: 1 addition & 1 deletion spinnman/messages/scp/impl/count_state_response.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ def read_data_bytestring(self, data, offset):
result = self.scp_response_header.result
if result != SCPResult.RC_OK:
raise SpinnmanUnexpectedResponseCodeException(
"CountState", "CMD_SIGNAL", result.name)
"CountState", "CMD_COUNT", result.name)
self._count = _ONE_WORD.unpack_from(data, offset)[0]

@property
Expand Down
Binary file modified spinnman/messages/spinnaker_boot/boot_data/scamp.boot
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ class SystemVariableDefinition(Enum):
_DataType.BYTE, offset=0x29,
doc="Nearest-Neighbour retry parameter")
link_peek_timeout_microseconds = _Definition(
_DataType.BYTE, offset=0x2a, default=100,
_DataType.BYTE, offset=0x2a, default=200,
doc="The link peek/poke timeout in microseconds")
led_half_period_10_ms = _Definition(
_DataType.BYTE, offset=0x2b, default=1,
Expand Down
4 changes: 3 additions & 1 deletion spinnman/processes/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from .get_exclude_cpu_info_process import GetExcludeCPUInfoProcess
from .get_include_cpu_info_process import GetIncludeCPUInfoProcess
from .get_machine_process import GetMachineProcess
from .get_n_cores_in_state_process import GetNCoresInStateProcess
from .get_routes_process import GetMultiCastRoutesProcess
from .get_tags_process import GetTagsProcess
from .get_version_process import GetVersionProcess
Expand All @@ -49,7 +50,8 @@
"GetCPUInfoProcess",
"GetExcludeCPUInfoProcess", "GetIncludeCPUInfoProcess",
"GetHeapProcess",
"GetMachineProcess", "GetMultiCastRoutesProcess", "GetTagsProcess",
"GetMachineProcess", "GetMultiCastRoutesProcess",
"GetNCoresInStateProcess", "GetTagsProcess",
"GetVersionProcess", "LoadFixedRouteRoutingEntryProcess",
"LoadMultiCastRoutesProcess", "MallocSDRAMProcess",
"ReadFixedRouteRoutingEntryProcess", "ReadIOBufProcess",
Expand Down
84 changes: 60 additions & 24 deletions spinnman/processes/application_copy_run_process.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,41 +12,69 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from collections import defaultdict
from spinnman.data import SpiNNManDataView
from spinnman.messages.scp.impl import AppCopyRun
from .abstract_multi_connection_process import AbstractMultiConnectionProcess

APP_COPY_RUN_TIMEOUT = 6.0


def _on_same_board(chip_1, chip_2):
return (chip_1.nearest_ethernet_x == chip_2.nearest_ethernet_x and
chip_1.nearest_ethernet_y == chip_2.nearest_ethernet_y)


def _get_next_chips(chips_done):
def _get_next_chips(chips_done, parent_chips, machine):
"""
Get the chips that are adjacent to the last set of chips, which
haven't yet been loaded. Also returned are the links for each chip,
which gives the link which should be read from to get the data.
:param set((int,int)) chips_done:
The coordinates of chips that have already been done
:return: A dict of chip coordinates to link to use, Chip
:rtype: dict((int,int), (int, Chip))
:param dict((int,int),list(int,int)) chips_done:
The coordinates of chips that have already been done by Ethernet
:param dict((int, int),~spinn_machine.Chip) parent_chips:
A dictionary of chip coordinates to chips that use that chip as a
parent
:return: A list of next chips to use
:rtype: list(chip)
"""
next_chips = dict()
for x, y in chips_done:
chip = SpiNNManDataView.get_chip_at(x, y)
for link in chip.router.links:
chip_coords = (link.destination_x, link.destination_y)
if chip_coords not in chips_done and chip_coords not in next_chips:
next_chip = SpiNNManDataView.get_chip_at(*chip_coords)
opp_link = (link.source_link_id + 3) % 6
next_chips[chip_coords] = (opp_link, next_chip)
# Only let one thing copy from this chip
break
next_chips = list()
for eth_chip in chips_done:
off_board_copy_done = False
for c_x, c_y in chips_done[eth_chip]:
chip_xy = machine.get_chip_at(c_x, c_y)
for chip in parent_chips[c_x, c_y]:
on_same_board = _on_same_board(chip, chip_xy)
eth = (chip.nearest_ethernet_x, chip.nearest_ethernet_y)
if (eth not in chips_done or
(chip.x, chip.y) not in chips_done[eth]):
if on_same_board or not off_board_copy_done:
next_chips.append(chip)
if not on_same_board:
off_board_copy_done = True
# Only do one copy from each chip at a time
break

return next_chips


def _compute_parent_chips(machine):
"""
Compute a dictionary of chip coordinates to list of chips who use that chip
as a parent in the tree.
:param ~spinn_machine.Machine machine: The machine to compute the map for
:rtype: dict((int, int), ~spinn_machine.Chip)
"""
chip_links = defaultdict(list)
for chip in machine.chips:
if chip.parent_link is not None:
link = chip.router.get_link(chip.parent_link)
chip_links[link.destination_x, link.destination_y].append(chip)
return chip_links


class ApplicationCopyRunProcess(AbstractMultiConnectionProcess):
"""
Process to start a binary on a subset of cores on a subset of chips
Expand All @@ -63,6 +91,10 @@ class ApplicationCopyRunProcess(AbstractMultiConnectionProcess):
"""
__slots__ = []

def __init__(self, next_connection_selector, timeout=APP_COPY_RUN_TIMEOUT):
AbstractMultiConnectionProcess.__init__(
self, next_connection_selector, timeout=timeout)

def run(self, size, app_id, core_subsets, chksum, wait):
"""
Run the process.
Expand All @@ -74,18 +106,22 @@ def run(self, size, app_id, core_subsets, chksum, wait):
:param bool wait:
Whether to put the binary in "wait" mode or run it straight away
"""
boot_chip = SpiNNManDataView.get_machine().boot_chip
chips_done = set([(boot_chip.x, boot_chip.y)])
next_chips = _get_next_chips(chips_done)
machine = SpiNNManDataView.get_machine()
boot_chip = machine.boot_chip
chips_done = defaultdict(list)
chips_done[boot_chip.x, boot_chip.y].append((boot_chip.x, boot_chip.y))
parent_chips = _compute_parent_chips(machine)
next_chips = _get_next_chips(chips_done, parent_chips, machine)

while next_chips:
# Do all the chips at the current level
for link, chip in next_chips.values():
for chip in next_chips:
subset = core_subsets.get_core_subset_for_chip(chip.x, chip.y)
self._send_request(AppCopyRun(
chip.x, chip.y, link, size, app_id, subset.processor_ids,
chksum, wait))
chips_done.add((chip.x, chip.y))
chip.x, chip.y, chip.parent_link, size, app_id,
subset.processor_ids, chksum, wait))
eth = (chip.nearest_ethernet_x, chip.nearest_ethernet_y)
chips_done[eth].append((chip.x, chip.y))
self._finish()
self.check_for_error()
next_chips = _get_next_chips(chips_done)
next_chips = _get_next_chips(chips_done, parent_chips, machine)
51 changes: 51 additions & 0 deletions spinnman/processes/get_n_cores_in_state_process.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
# Copyright (c) 2015 The University of Manchester
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from spinnman.messages.scp.impl import CountState
from .abstract_multi_connection_process import AbstractMultiConnectionProcess

# Timeout for getting core state count; higher due to more waiting needed
GET_CORE_COUNT_TIMEOUT = 2.0


class GetNCoresInStateProcess(AbstractMultiConnectionProcess):
__slots__ = [
"_n_cores"]

def __init__(self, connection_selector):
"""
:param connection_selector:
:type connection_selector:
AbstractMultiConnectionProcessConnectionSelector
"""
super().__init__(connection_selector, timeout=GET_CORE_COUNT_TIMEOUT)
self._n_cores = 0

def __handle_response(self, response):
self._n_cores += response.count

def get_n_cores_in_state(self, xys, app_id, state):
"""
:param list(int,int) xys:
:param int app_id:
:param int state:
:rtype: int
"""
for c_x, c_y in xys:
self._send_request(
CountState(c_x, c_y, app_id, state), self.__handle_response)
self._finish()
self.check_for_error()

return self._n_cores
33 changes: 33 additions & 0 deletions spinnman/spalloc/abstract_spalloc_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,3 +118,36 @@ def create_job_board(
:return: A handle for monitoring and interacting with the job.
:rtype: SpallocJob
"""

@abstractmethod
def create_job_rect_at_board(
self, width: int, height: int, triad: Tuple[int, int, int] = None,
physical: Tuple[int, int, int] = None, ip_address: str = None,
machine_name: str = None, keepalive: int = 45,
max_dead_boards: int = 0) -> SpallocJob:
"""
Create a job with a rectangle of boards starting at a specific board.
At least one of ``triad``, ``physical`` and ``ip_address`` must be not
``None``.
:param int width:
The width of rectangle to request
:param int height:
The height of rectangle to request
:param tuple(int,int,int) triad:
The logical coordinate of the board to request
:param tuple(int,int,int) physical:
The physical coordinate of the board to request
:param str ip_address:
The IP address of the board to request
:param str machine_name:
Which machine to run on? If omitted, the service's machine tagged
with ``default`` will be used.
:param int keepalive:
After how many seconds of no activity should a job become eligible
for automatic pruning?
:param int max_dead_boards:
How many dead boards can be included.
:return: A handle for monitoring and interacting with the job.
:rtype: SpallocJob
"""
Loading

0 comments on commit 27295bf

Please sign in to comment.