diff --git a/manual_scripts/get_triad.py b/manual_scripts/get_triad.py new file mode 100644 index 000000000..370fe47bd --- /dev/null +++ b/manual_scripts/get_triad.py @@ -0,0 +1,58 @@ +# Copyright (c) 2024 The University of Manchester +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from spinn_utilities.config_holder import set_config +from spinnman.spalloc import SpallocClient +from spinnman.config_setup import unittest_setup + + +SPALLOC_URL = "https://spinnaker.cs.man.ac.uk/spalloc" +SPALLOC_USERNAME = "" +SPALLOC_PASSWORD = "" + +SPALLOC_MACHINE = "SpiNNaker1M" + +x = 0 +y = 3 +b = 0 # Must be 0 if requesting a rect +RECT = True +WIDTH = 1 # In triads! +HEIGHT = 1 # In triads! + +unittest_setup() +set_config("Machine", "version",5) +client = SpallocClient(SPALLOC_URL, SPALLOC_USERNAME, SPALLOC_PASSWORD) +if RECT: + job = client.create_job_rect_at_board( + WIDTH, HEIGHT, triad=(x, y, b), machine_name=SPALLOC_MACHINE, + max_dead_boards=1) +else: + job = client.create_job_board( + triad=(x, y, b), machine_name=SPALLOC_MACHINE) +print(job) +print("Waiting until ready...") +with job: + job.wait_until_ready() + print(job.get_connections()) + + txrx = job.create_transceiver() + # This call is for testing and can be changed without notice! + dims = txrx._get_machine_dimensions() + print(f"{dims.height=}, {dims.width=}") + + machine = txrx.get_machine_details() + print(machine) + + input("Press Enter to release...") +client.close()#print(2)#print(2^(1/(2^1))) \ No newline at end of file diff --git a/spinnman/spalloc/spalloc_client.py b/spinnman/spalloc/spalloc_client.py index 8ed750490..d2a58ace4 100644 --- a/spinnman/spalloc/spalloc_client.py +++ b/spinnman/spalloc/spalloc_client.py @@ -14,16 +14,15 @@ """ Implementation of the client for the Spalloc web service. """ - -from contextlib import contextmanager +import time from logging import getLogger -from multiprocessing import Process, Queue + import queue import struct import threading from time import sleep -from typing import (Any, ContextManager, Callable, Dict, FrozenSet, Iterable, - Iterator, List, Mapping, Optional, Tuple, cast) +from typing import (Any, Callable, Dict, FrozenSet, Iterable, List, Mapping, + Optional, Tuple, cast) from urllib.parse import urlparse, urlunparse, ParseResult from packaging.version import Version @@ -69,6 +68,8 @@ _msg = struct.Struct(" str: """ @@ -199,7 +200,7 @@ def _create(self, create: Mapping[str, JsonValue], def create_job( self, num_boards: int = 1, machine_name: Optional[str] = None, - keepalive: int = 45) -> SpallocJob: + keepalive: int = KEEP_ALIVE_PERIOND) -> SpallocJob: return self._create({ "num-boards": int(num_boards), "keepalive-interval": f"PT{int(keepalive)}S" @@ -209,7 +210,7 @@ def create_job( def create_job_rect( self, width: int, height: int, machine_name: Optional[str] = None, - keepalive: int = 45) -> SpallocJob: + keepalive: int = KEEP_ALIVE_PERIOND) -> SpallocJob: return self._create({ "dimensions": { "width": int(width), @@ -224,7 +225,7 @@ def create_job_board( physical: Optional[Tuple[int, int, int]] = None, ip_address: Optional[str] = None, machine_name: Optional[str] = None, - keepalive: int = 45) -> SpallocJob: + keepalive: int = KEEP_ALIVE_PERIOND) -> SpallocJob: board: JsonObject if triad: x, y, z = triad @@ -248,7 +249,8 @@ def create_job_rect_at_board( triad: Optional[Tuple[int, int, int]] = None, physical: Optional[Tuple[int, int, int]] = None, ip_address: Optional[str] = None, - machine_name: Optional[str] = None, keepalive: int = 45, + machine_name: Optional[str] = None, + keepalive: int = KEEP_ALIVE_PERIOND, max_dead_boards: int = 0) -> SpallocJob: board: JsonObject if triad: @@ -285,27 +287,6 @@ class _ProxyServiceError(IOError): """ -def _spalloc_keepalive(url, interval, term_queue, cookies, headers): - """ - Actual keepalive task implementation. Don't use directly. - """ - headers["Content-Type"] = "text/plain; charset=UTF-8" - while True: - requests.put(url, data="alive", cookies=cookies, headers=headers, - allow_redirects=False, timeout=10) - try: - term_queue.get(True, interval) - break - except queue.Empty: - continue - # On ValueError or OSError, just terminate the keepalive process - # They happen when the term_queue is directly closed - except ValueError: - break - except OSError: - break - - class _SpallocMachine(SessionAware, SpallocMachine): """ Represents a Spalloc-controlled machine. @@ -507,7 +488,7 @@ class _SpallocJob(SessionAware, SpallocJob): Don't make this yourself. Use :py:class:`SpallocClient` instead. """ __slots__ = ("__machine_url", "__chip_url", - "_keepalive_url", "__keepalive_handle", "__proxy_handle", + "_keepalive_url", "__proxy_handle", "__proxy_thread", "__proxy_ping") def __init__(self, session: Session, job_handle: str): @@ -519,11 +500,13 @@ def __init__(self, session: Session, job_handle: str): logger.info("established job at {}", job_handle) self.__machine_url = self._url + "machine" self.__chip_url = self._url + "chip" - self._keepalive_url = self._url + "keepalive" - self.__keepalive_handle: Optional[Queue] = None + self._keepalive_url: Optional[str] = self._url + "keepalive" self.__proxy_handle: Optional[WebSocket] = None self.__proxy_thread: Optional[_ProxyReceiver] = None self.__proxy_ping: Optional[_ProxyPing] = None + keep_alive = threading.Thread( + target=self.__start_keepalive, daemon=True) + keep_alive.start() @overrides(SpallocJob.get_session_credentials_for_db) def get_session_credentials_for_db(self) -> Mapping[Tuple[str, str], str]: @@ -651,9 +634,7 @@ def wait_until_ready(self, timeout: Optional[int] = None, @overrides(SpallocJob.destroy) def destroy(self, reason: str = "finished"): - if self.__keepalive_handle: - self.__keepalive_handle.close() - self.__keepalive_handle = None + self._keepalive_url = None if self.__proxy_handle is not None: if self.__proxy_thread: self.__proxy_thread.close() @@ -663,38 +644,32 @@ def destroy(self, reason: str = "finished"): self._delete(self._url, reason=str(reason)) logger.info("deleted job at {}", self._url) - @overrides(SpallocJob.keepalive) - def keepalive(self) -> None: - self._put(self._keepalive_url, "alive") + def __keepalive(self) -> bool: + """ + Signal spalloc that we want the job to stay alive for a while longer. - @overrides(SpallocJob.launch_keepalive_task, extend_doc=True) - def launch_keepalive_task( - self, period: float = 30) -> ContextManager[Process]: + :return: False if the job has not been destroyed + :rtype: bool """ - .. note:: - Tricky! *Cannot* be done with a thread, as the main thread is known - to do significant amounts of CPU-intensive work. + if self._keepalive_url is None: + return False + cookies, headers = self._session_credentials + headers["Content-Type"] = "text/plain; charset=UTF-8" + logger.debug(self._keepalive_url) + requests.put(self._keepalive_url, data="alive", cookies=cookies, + headers=headers, allow_redirects=False, timeout=10) + return True + + def __start_keepalive(self) -> None: + """ + Method for keep alive thread to start the keep alive class + """ - if self.__keepalive_handle is not None: - raise SpallocException("cannot keep job alive from two tasks") - q: Queue = Queue(1) - p = Process(target=_spalloc_keepalive, args=( - self._keepalive_url, 0 + period, q, - *self._session_credentials), daemon=True) - p.start() - self.__keepalive_handle = q - return self.__closer(q, p) - - @contextmanager - def __closer(self, q: Queue, p: Process) -> Iterator[Process]: try: - yield p - finally: - q.put("quit") - # Give it a second, and if it still isn't dead, kill it - p.join(1) - if p.is_alive(): - p.kill() + while self.__keepalive(): + time.sleep(KEEP_ALIVE_PERIOND / 2) + except Exception as ex: # pylint: disable=broad-except + logger.exception(ex) @overrides(SpallocJob.where_is_machine) def where_is_machine(self, x: int, y: int) -> Optional[ @@ -705,16 +680,6 @@ def where_is_machine(self, x: int, y: int) -> Optional[ return cast(Tuple[int, int, int], tuple( r.json()["physical-board-coordinates"])) - @property - def _keepalive_handle(self) -> Optional[Queue]: - return self.__keepalive_handle - - @_keepalive_handle.setter - def _keepalive_handle(self, handle: Queue): - if self.__keepalive_handle is not None: - raise SpallocException("cannot keep job alive from two tasks") - self.__keepalive_handle = handle - @overrides(SpallocJob.create_transceiver) def create_transceiver(self) -> Transceiver: if self.get_state() != SpallocState.READY: diff --git a/spinnman/spalloc/spalloc_job.py b/spinnman/spalloc/spalloc_job.py index 14f5ba429..04ffa8715 100644 --- a/spinnman/spalloc/spalloc_job.py +++ b/spinnman/spalloc/spalloc_job.py @@ -12,7 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -from contextlib import AbstractContextManager from typing import Dict, Mapping, Optional, Tuple from spinn_utilities.abstract_base import AbstractBase, abstractmethod from spinnman.constants import SCP_SCAMP_PORT @@ -182,29 +181,6 @@ def destroy(self, reason: str = "finished"): """ raise NotImplementedError() - @abstractmethod - def keepalive(self) -> None: - """ - Signal the job that we want it to stay alive for a while longer. - """ - raise NotImplementedError() - - @abstractmethod - def launch_keepalive_task( - self, period: int = 30) -> AbstractContextManager: - """ - Starts a periodic task to keep a job alive. - - :param SpallocJob job: - The job to keep alive - :param int period: - How often to send a keepalive message (in seconds) - :return: - Some kind of closeable task handle; closing it terminates the task. - Destroying the job will also terminate the task. - """ - raise NotImplementedError() - @abstractmethod def where_is_machine(self, x: int, y: int) -> Optional[ Tuple[int, int, int]]: