From 1913c20e394d947ad9816124ab630ef00904c441 Mon Sep 17 00:00:00 2001 From: Pallab Pain Date: Wed, 12 Jun 2024 00:32:23 +0530 Subject: [PATCH] refactor(hwil): refactors the hwil command implementation --- docs/source/hwil.rst | 10 + docs/source/index.rst | 1 + riocli/config/config.py | 6 +- riocli/hwil/__init__.py | 14 +- riocli/hwil/create.py | 114 ++---- riocli/hwil/delete.py | 84 ++-- riocli/hwil/inspect.py | 48 ++- riocli/hwil/list.py | 18 +- riocli/hwil/login.py | 65 ++-- riocli/hwil/util.py | 22 +- riocli/hwilclient/client.py | 754 ++++-------------------------------- 11 files changed, 282 insertions(+), 854 deletions(-) create mode 100644 docs/source/hwil.rst diff --git a/docs/source/hwil.rst b/docs/source/hwil.rst new file mode 100644 index 00000000..bc25bf7a --- /dev/null +++ b/docs/source/hwil.rst @@ -0,0 +1,10 @@ +Hardware-in-Loop +================ + +.. toctree:: + :maxdepth: 3 + :caption: Contents: + +.. click:: riocli.hwil:hwildevice + :prog: rio hwil + :nested: full diff --git a/docs/source/index.rst b/docs/source/index.rst index c8698a5c..4bde6f42 100644 --- a/docs/source/index.rst +++ b/docs/source/index.rst @@ -65,6 +65,7 @@ Rapyuta CLI has commands for all rapyuta.io resources. You can read more about t Deployment Device Disk + Hardware-in-Loop ManagedService Network Organization diff --git a/riocli/config/config.py b/riocli/config/config.py index e8ddf70c..5a26031b 100644 --- a/riocli/config/config.py +++ b/riocli/config/config.py @@ -25,7 +25,7 @@ from riocli.exceptions import LoggedOut, NoOrganizationSelected, NoProjectSelected from riocli.v2client import Client as v2Client -from riocli.hwilclient import Client as hwilClient +from riocli.hwilclient import Client as HwilClient class Configuration(object): @@ -110,7 +110,7 @@ def new_v2_client(self: Configuration, with_project: bool = True) -> v2Client: return v2Client(self, auth_token=token, project=project) - def new_hwil_client(self: Configuration) -> hwilClient: + def new_hwil_client(self: Configuration) -> HwilClient: if 'hwil_auth_token' not in self.data: raise LoggedOut @@ -119,7 +119,7 @@ def new_hwil_client(self: Configuration) -> hwilClient: token = self.data.get('hwil_auth_token', None) - return hwilClient(auth_token=token) + return HwilClient(auth_token=token) def get_auth_header(self: Configuration) -> dict: if not ('auth_token' in self.data and 'project_id' in self.data): diff --git a/riocli/hwil/__init__.py b/riocli/hwil/__init__.py index 9a54344e..c1b34c38 100644 --- a/riocli/hwil/__init__.py +++ b/riocli/hwil/__init__.py @@ -1,4 +1,4 @@ -# Copyright 2021 Rapyuta Robotics +# Copyright 2024 Rapyuta Robotics # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -14,23 +14,23 @@ import click from click_help_colors import HelpColorsGroup +from riocli.constants import Colors from riocli.hwil.create import create_device -from riocli.hwil.list import list_devices from riocli.hwil.delete import delete_device from riocli.hwil.inspect import inspect_device +from riocli.hwil.list import list_devices from riocli.hwil.login import login @click.group( + name="hwil", invoke_without_command=False, cls=HelpColorsGroup, - help_headers_color='yellow', - help_options_color='green', + help_headers_color=Colors.YELLOW, + help_options_color=Colors.GREEN, ) def hwildevice(): - """ - HWIL Devices on Rapyuta.io - """ + """Manage Hardware-in-the-Loop (HWIL) devices""" pass diff --git a/riocli/hwil/create.py b/riocli/hwil/create.py index bdbb0a96..91edf7fa 100644 --- a/riocli/hwil/create.py +++ b/riocli/hwil/create.py @@ -1,4 +1,4 @@ -# Copyright 2021 Rapyuta Robotics +# Copyright 2024 Rapyuta Robotics # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -11,94 +11,64 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -import time +import typing + import click -from click_spinner import spinner from click_help_colors import HelpColorsCommand -from riocli.constants import Colors -from rapyuta_io.utils import ConflictError -from rapyuta_io.clients.device import DevicePythonVersion, Device, DeviceStatus -from riocli.device.util import find_device_guid -from riocli.hwil.util import name_to_id +from yaspin.api import Yaspin -from riocli.config import new_client, new_hwil_client -from riocli.hwilclient.client import Client -from rapyuta_io import Client as v1Client +from riocli.config import new_hwil_client +from riocli.constants import Colors, Symbols +from riocli.utils.spinner import with_spinner -@click.command('create', - cls=HelpColorsCommand, - help_headers_color=Colors.YELLOW, - help_options_color=Colors.GREEN,) -@click.option('--arch', 'arch', help='device family type', +@click.command( + 'create', + cls=HelpColorsCommand, + help_headers_color=Colors.YELLOW, + help_options_color=Colors.GREEN, +) +@click.option('--arch', 'arch', help='Device architecture', type=click.Choice(['amd64', 'arm64']), default='amd64') -@click.option('--os', 'os', help='type of os', +@click.option('--os', 'os', help='Type of the OS', type=click.Choice(['debian', 'ubuntu']), default='ubuntu') -@click.option('--codename', 'codename', help='code name of os', +@click.option('--codename', 'codename', help='Code name of the OS', type=click.Choice(['bionic', 'focal', 'jammy', 'bullseye']), default='focal') -@click.option('--onboard', 'onboard', is_flag=True, type=bool, default=False) @click.argument('device-name', type=str) +@with_spinner(text='Creating device...') +@click.pass_context def create_device( + ctx: click.Context, device_name: str, arch: str, os: str, codename: str, - onboard: bool, + spinner: Yaspin = None, ) -> None: - """ - Create a new virtual device on the cloud - """ - client = new_client() - hwil_client = new_hwil_client() - try: - with spinner(): - try: - hwil_client.create_device(device_name, arch, os, codename) - click.secho('HWIL Device created successfully!', fg='green') - except ConflictError: - click.secho('HWIL Device {} already exists in cluster!'.format(device_name), fg='green') - - if onboard: - try: - device = Device(name=device_name, description='onboarded using hwil', ros_distro='melodic', - runtime_docker=True, runtime_preinstalled=False, - python_version=DevicePythonVersion.PYTHON3) - device = client.create_device(device) - click.secho('Device created successfully in rapyuta.io!', fg='green') - onboard_command = device.onboard_script().full_command() - _onboard_hwil_device(hwil_client=hwil_client, client=client, device_name=device_name, - onboard_command=onboard_command, - device_uuid=device.uuid) - except ConflictError: - click.secho('Device {} already exists in rapyuta.io!'.format(device_name), fg='green') - device = client.get_device(device_id=find_device_guid(client, device_name)) - if device.is_online() or device.status == DeviceStatus.INITIALIZING: - click.secho('Device {} already {} in rapyuta.io!'.format(device.status, - device_name), fg='green') - raise SystemExit(0) - _onboard_hwil_device(hwil_client=hwil_client, client=client, device_name=device_name, - onboard_command=device.onboard_script().full_command(), - device_uuid=device.uuid) + """Create a new hardware-in-the-loop device.""" + info = click.style(f'{Symbols.INFO} Device configuration = {os}:{codename}:{arch}', + fg=Colors.CYAN, bold=True) + spinner.write(info) + client = new_hwil_client() + labels = prepare_device_labels_from_context(ctx) + try: + client.create_device(device_name, arch, os, codename, labels) + spinner.text = click.style(f'Device {device_name} created successfully.', fg=Colors.GREEN) + spinner.green.ok(Symbols.SUCCESS) except Exception as e: - click.secho(str(e), fg='red') + spinner.text = click.style(f'Failed to create device: {str(e)}', fg=Colors.RED) + spinner.red.fail(Symbols.ERROR) raise SystemExit(1) -@name_to_id -def _onboard_hwil_device(hwil_client: Client, client: v1Client, device_name: str, onboard_command: str, - device_id: int, device_uuid: str): - try: - hwil_client.poll_till_device_ready(device_id, sleep_interval=5, retry_limit=3) - hwil_client.execute_cmd(device_id, onboard_command) - for _ in range(10): - device = client.get_device(device_uuid) - if device.is_online(): - click.secho('Device {} came online in rapyuta.io!'.format(device_name), fg='green') - return - click.secho('Device {} state {} in rapyuta.io!'.format(device_name, device.status), fg='green') - time.sleep(20) - click.secho('Device {} state {} in rapyuta.io!'.format(device_name, device.status), fg='red') - except Exception as e: - click.secho(str(e), fg='red') - raise SystemExit(1) +def prepare_device_labels_from_context(ctx: click.Context) -> typing.Dict: + user_email = ctx.obj.data.get('email_id', '') + if user_email: + user_email = user_email.split('@')[0] + + return { + "user": user_email, + "organization": ctx.obj.data.get('organization_id', ''), + "project": ctx.obj.data.get('project_id', ''), + } diff --git a/riocli/hwil/delete.py b/riocli/hwil/delete.py index 569c27d6..c5781aeb 100644 --- a/riocli/hwil/delete.py +++ b/riocli/hwil/delete.py @@ -1,4 +1,4 @@ -# Copyright 2021 Rapyuta Robotics +# Copyright 2024 Rapyuta Robotics # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -11,45 +11,69 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import typing + import click -from click_spinner import spinner from click_help_colors import HelpColorsCommand -from riocli.config import new_client, new_hwil_client -from riocli.hwil.util import find_device_id, DeviceNotFound -from riocli.device.util import find_device_guid +from yaspin.api import Yaspin + +from riocli.config import new_hwil_client +from riocli.constants import Colors, Symbols +from riocli.utils.spinner import with_spinner @click.command( 'delete', cls=HelpColorsCommand, + help_headers_color=Colors.YELLOW, + help_options_color=Colors.GREEN, ) -# @click.argument('device-name', type=str, default="") -@click.argument('device-names', type=str, nargs=-1) -@click.option('--offboard', 'offboard', is_flag=True, type=bool, default=False) +@click.argument('devices', type=str, nargs=-1) +@click.option('--force', '-f', '--silent', 'force', is_flag=True, + default=False, help='Skip confirmation') +@with_spinner(text='Deleting device(s)...') def delete_device( - device_names: tuple, - offboard: bool, + devices: typing.List, + force: bool, + spinner: Yaspin = None, ) -> None: - """ - delete a virtual device on the cloud - """ - client = new_client() - hwil_client = new_hwil_client() + """Delete one or more devices""" + + if not devices: + spinner.text = click.style('No device names provided', fg=Colors.RED) + spinner.red.fail(Symbols.ERROR) + raise SystemExit(1) + + client = new_hwil_client() + fetched = [] + + try: + fetched = client.list_devices() + except Exception as e: + spinner.text = click.style(f'Error fetching device(s): {str(e)}', fg=Colors.RED) + spinner.red.fail(Symbols.ERROR) + + device_name_map = {name: None for name in devices} + + final = {d['id']: d['name'] for d in fetched + if d['name'] in device_name_map} + + if not final: + spinner.text = click.style(f'No devices found with name(s): {", ".join(devices)}', fg=Colors.RED) + spinner.red.fail(Symbols.ERROR) + raise SystemExit(1) + + with spinner.hidden(): + if not force: + click.confirm(f'Do you want to delete {", ".join(final.values())}', abort=True) + try: - with spinner(): - for device_name in device_names: - try: - hwil_client.delete_device(find_device_id(hwil_client, device_name)) - click.secho('HWIL Device {device_name} deleted successfully!', fg='green') - except DeviceNotFound: - click.secho('HWIL Device {device_name} already deleted!', fg='green') - - if offboard: - try: - client.delete_device(device_id=find_device_guid(client, device_name)) - click.secho('Rapyuta.io Device {device_name} deleted successfully in rapyuta.io!', fg='green') - except DeviceNotFound: - click.secho('Rapyuta.io Device {device_name} already deleted!', fg='green') + for device_id, device_name in final.items(): + spinner.text = f'Deleting device {device_name}...' + client.delete_device(device_id) + spinner.text = click.style(f'Device(s) deleted successfully!', fg=Colors.GREEN) + spinner.green.ok(Symbols.SUCCESS) except Exception as e: - click.secho(str(e), fg='red') + spinner.text = click.style(f'Error deleting device(s): {str(e)}', fg=Colors.RED) + spinner.red.fail(Symbols.ERROR) raise SystemExit(1) diff --git a/riocli/hwil/inspect.py b/riocli/hwil/inspect.py index 08e8e0b1..a38e536e 100644 --- a/riocli/hwil/inspect.py +++ b/riocli/hwil/inspect.py @@ -1,11 +1,25 @@ -import json +# Copyright 2024 Rapyuta Robotics +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. import click -import yaml from click_help_colors import HelpColorsCommand +from munch import unmunchify + +from riocli.config import new_hwil_client from riocli.constants import Colors from riocli.hwil.util import name_to_id -from riocli.config import new_hwil_client +from riocli.utils import inspect_with_format @click.command( @@ -14,30 +28,22 @@ help_headers_color=Colors.YELLOW, help_options_color=Colors.GREEN, ) -@click.option('--format', '-f', 'format_type', +@click.option('--format', '-f', 'format_type', default='yaml', type=click.Choice(['json', 'yaml'], case_sensitive=False)) -@click.option('--filter', 'filter', multiple=True, - type=click.Choice(['static_ip', 'ip_address', 'status'], case_sensitive=True), - default=['static_ip', 'ip_address']) @click.argument('device-name', type=str) @name_to_id -def inspect_device(format_type: str, - filter: [], - device_name: str, - device_id: str) -> None: +def inspect_device( + format_type: str, + device_name: str, + device_id: str +) -> None: """ - Inspect the device resource + Inspect the hardware-in-the-loop device. """ + client = new_hwil_client() + try: - client = new_hwil_client() device = client.get_device(device_id) - click.secho('{}'.format(",".join([getattr(device, f) for f in filter]))) - if format_type: - if format_type == 'json': - click.echo_via_pager(json.dumps(device, indent=4)) - elif format_type == 'yaml': - click.echo_via_pager(yaml.dump(device, allow_unicode=True)) - else: - raise Exception('Invalid format') + inspect_with_format(unmunchify(device), format_type) except Exception as e: click.secho(str(e), fg=Colors.RED) diff --git a/riocli/hwil/list.py b/riocli/hwil/list.py index b0dd2865..429c046f 100644 --- a/riocli/hwil/list.py +++ b/riocli/hwil/list.py @@ -1,4 +1,4 @@ -# Copyright 2021 Rapyuta Robotics +# Copyright 2024 Rapyuta Robotics # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -11,11 +11,12 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - import typing + import click -from riocli.config import new_hwil_client from click_help_colors import HelpColorsCommand + +from riocli.config import new_hwil_client from riocli.constants import Colors from riocli.utils import tabulate_data @@ -27,12 +28,9 @@ help_options_color=Colors.GREEN, ) def list_devices() -> None: - """ - lists virtual devices on the cloud - """ + """Lists hardware-in-loop devices.""" try: - client = new_hwil_client() - devices = client.list_devices() + devices = new_hwil_client().list_devices() devices = sorted(devices, key=lambda d: d.name.lower()) _display_device_list(devices, show_header=True) except Exception as e: @@ -43,8 +41,8 @@ def list_devices() -> None: def _display_device_list(devices: typing.List[dict], show_header: bool = True) -> None: headers = [] if show_header: - headers = ('Device ID', 'Name', 'Status', 'Static IP', 'Dynamic IP') + headers = ('ID', 'Name', 'Status', 'Static IP', 'Dynamic IP', 'Flavor') - data = [[d.id, d.name, d.status, d.static_ip, d.ip_address] for d in devices] + data = [[d.id, d.name, d.status, d.static_ip, d.ip_address, d.flavor] for d in devices] tabulate_data(data, headers) diff --git a/riocli/hwil/login.py b/riocli/hwil/login.py index bc7507d9..e53a1970 100644 --- a/riocli/hwil/login.py +++ b/riocli/hwil/login.py @@ -13,13 +13,15 @@ # limitations under the License. import os +from base64 import b64encode + import click -from riocli.hwilclient import Client as HwilClient from click_help_colors import HelpColorsCommand +from rapyuta_io.utils import UnauthorizedError + from riocli.constants import Colors, Symbols +from riocli.hwilclient import Client as HwilClient from riocli.utils.context import get_root_context -from base64 import b64encode -from rapyuta_io.utils import UnauthorizedError from riocli.utils.spinner import with_spinner HWIL_LOGIN_SUCCESS = click.style('{} Successfully logged into HWIL!'.format(Symbols.SUCCESS), fg=Colors.GREEN) @@ -31,42 +33,46 @@ help_headers_color=Colors.YELLOW, help_options_color=Colors.GREEN, ) -@click.option('--hwil-user', required=True, help='Username for HWIL login') -@click.option('--hwil-password', required=True, help='Password for HWIL login') +@click.option('--username', help='Username for HWIL API') +@click.option('--password', help='Password for HWIL API') +@click.option('--interactive/--no-interactive', '--interactive/--silent', + is_flag=True, type=bool, default=True, + help='Make login interactive') @click.pass_context def login( ctx: click.Context, - hwil_user: str, - hwil_password: str, + username: str, + password: str, + interactive: bool = True, ) -> None: """Log in to HWIL.""" - ctx = get_root_context(ctx) - try: - if hwil_user and not hwil_password: - click.secho('hwil password not specified') - if hwil_password and not hwil_user: - click.secho('hwil user not specified') + if interactive: + username = username or click.prompt('Username') + password = password or click.prompt('Password', hide_input=True) - if hwil_user and hwil_password: - if not validate_and_set_hwil_token(ctx, hwil_user, hwil_password): - raise SystemExit(1) - except Exception as e: - click.echo(f"Login failed: {e}") + if not username: + click.secho(f'{Symbols.ERROR} Username not specified', fg=Colors.RED) + raise SystemExit(1) - ctx.obj.save() + if not password: + click.secho(f'{Symbols.ERROR} Password not specified', fg=Colors.RED) + raise SystemExit(1) - click.echo(HWIL_LOGIN_SUCCESS) + try: + validate_and_set_hwil_token(ctx, username, password) + except Exception as e: + raise SystemExit(1) from e -@with_spinner(text='Validating hwil credentials...') +@with_spinner(text='Validating credentials...') def validate_and_set_hwil_token( ctx: click.Context, username: str, password: str, spinner=None -) -> bool: +) -> None: """Validates an auth token.""" if 'environment' in ctx.obj.data: os.environ['RIO_CONFIG'] = ctx.obj.filepath @@ -77,13 +83,14 @@ def validate_and_set_hwil_token( try: client.list_devices() ctx.obj.data['hwil_auth_token'] = token - spinner.ok(Symbols.INFO) - return True - except UnauthorizedError: - spinner.text = click.style("incorrect credentials for hwil", fg=Colors.RED) + ctx.obj.save() + spinner.text = click.style('Successfully logged in.', fg=Colors.GREEN) + spinner.green.ok(Symbols.SUCCESS) + except UnauthorizedError as e: + spinner.red.text = click.style("Incorrect credentials.", fg=Colors.RED) spinner.red.fail(Symbols.ERROR) - return False + raise e except Exception as e: - spinner.text = click.style(str(e), fg=Colors.RED) + spinner.text = click.style(f'Failed to login: {str(e)}', fg=Colors.RED) spinner.red.fail(Symbols.ERROR) - return False + raise e diff --git a/riocli/hwil/util.py b/riocli/hwil/util.py index 1edbdb68..0b00c6ea 100644 --- a/riocli/hwil/util.py +++ b/riocli/hwil/util.py @@ -1,10 +1,26 @@ +# Copyright 2024 Rapyuta Robotics +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + import functools import typing + import click -from riocli.constants import Colors + from riocli.config import new_hwil_client -from riocli.hwilclient import Client +from riocli.constants import Colors from riocli.device.util import DeviceNotFound +from riocli.hwilclient import Client def name_to_id(f: typing.Callable) -> typing.Callable: @@ -53,4 +69,4 @@ def find_device_id(client: Client, name: str) -> str: if device.name == name: return device.id - raise DeviceNotFound(message="Hwil Device not found") + raise DeviceNotFound(message="HWIL device not found") diff --git a/riocli/hwilclient/client.py b/riocli/hwilclient/client.py index c8e657a2..a1ed33d7 100644 --- a/riocli/hwilclient/client.py +++ b/riocli/hwilclient/client.py @@ -15,16 +15,12 @@ import http import json -import os import time -from hashlib import md5 -from typing import List, Optional, Dict, Any -import magic import requests -from munch import munchify, Munch +from munch import Munch, munchify +from rapyuta_io.utils import ConflictError, RetriesExhausted, UnauthorizedError from rapyuta_io.utils.rest_client import HttpMethod, RestClient -from rapyuta_io.utils import UnauthorizedError, ConflictError, RetriesExhausted def handle_server_errors(response: requests.Response): @@ -32,10 +28,10 @@ def handle_server_errors(response: requests.Response): # 409 Conflict if status_code == http.HTTPStatus.CONFLICT: - raise ConflictError + raise ConflictError('already exists') # 401 Unauthorized if status_code == http.HTTPStatus.UNAUTHORIZED: - raise UnauthorizedError + raise UnauthorizedError('unauthorized access') # 500 Internal Server Error if status_code == http.HTTPStatus.INTERNAL_SERVER_ERROR: raise Exception('internal server error') @@ -58,84 +54,123 @@ def handle_server_errors(response: requests.Response): class Client(object): """ - hwil API Client + HWILv3 API Client """ HWIL_URL = "https://hwilv3.rapyuta.io" - ARCH_OS_DICT = {"amd64": - {"ubuntu": { - "bionic": "ubuntu-bionic-ros-melodic-py3", - "focal": "ubuntu-focal-ros-noetic-py3", - "jammy": "ubuntu-jammy-plain-py3", - }}, - "arm64": - {"ubuntu": {"focal": "ubuntu-focal-ros-noetic-py3"}, - "debian": {"bullseye": "debian-bullseye-docker"}} + ARCH_OS_DICT = { + "amd64": { + "ubuntu": { + "bionic": "ubuntu-bionic-ros-melodic-py3", + "focal": "ubuntu-focal-ros-noetic-py3", + "jammy": "ubuntu-jammy-plain-py3", + } + }, + "arm64": { + "ubuntu": { + "focal": "ubuntu-focal-ros-noetic-py3" + }, + "debian": { + "bullseye": "debian-bullseye-docker" + } + } } def __init__(self, auth_token: str): self._token = auth_token self._host = self.HWIL_URL - def create_device(self, name: str, arch: str, os: str, codename: str): + def create_device( + self: Client, + name: str, + arch: str, + os: str, + codename: str, + labels: dict = None, + ) -> Munch: + """Create a HWIL device.""" url = f"{self._host}/device/" headers = self._get_auth_header() + + labels = labels or {} + labels.update({"agent": "rapyuta-io-cli"}) + + payload = { + "kind": "VIRTUAL", + "name": name, + "architecture": arch, + "labels": labels, + "flavor": self.ARCH_OS_DICT.get(arch).get(os).get(codename) + } + response = RestClient(url).method(HttpMethod.POST).headers( - headers).execute(payload={"kind": "VIRTUAL", "name": name, "architecture": arch, - "flavor": self.ARCH_OS_DICT.get(arch).get(os).get(codename)}) + headers).execute(payload=payload) handle_server_errors(response) data = json.loads(response.text) if not response.ok: err_msg = data.get('error') - raise Exception("hwildevices: {}".format(err_msg)) + raise Exception("hwil: {}".format(err_msg)) return munchify(data) - def delete_device(self, id: int): - url = f"{self._host}/device/{id}" + def delete_device(self: Client, device_id: int) -> None: + """Delete a HWIL device.""" + url = f"{self._host}/device/{device_id}" headers = self._get_auth_header() response = RestClient(url).method(HttpMethod.DELETE).headers(headers).execute() handle_server_errors(response) - def get_device(self, id: int): - url = f"{self._host}/device/{id}" + def get_device(self: Client, device_id: int) -> Munch: + """Fetch a HWIL device.""" + url = f"{self._host}/device/{device_id}" headers = self._get_auth_header() response = RestClient(url).method(HttpMethod.GET).headers(headers).execute() handle_server_errors(response) data = json.loads(response.text) if not response.ok: err_msg = data.get('error') - raise Exception("hwil_devices: {}".format(err_msg)) + raise Exception("hwil: {}".format(err_msg)) return munchify(data) - def execute_cmd(self, id: int, command: str): + def execute_cmd(self: Client, device_id: int, command: str) -> Munch: + """Execute a command on the HWIL device.""" url = f"{self._host}/command/" headers = self._get_auth_header() + + payload = { + "kind": "VIRTUAL", + "device_id": device_id, + "command": command, + } + response = RestClient(url).method(HttpMethod.POST).headers( - headers).execute(payload={"kind": "VIRTUAL", "device_id": id, - "command": command}) + headers).execute(payload=payload) handle_server_errors(response) data = json.loads(response.text) if not response.ok: err_msg = data.get('error') - raise Exception("hwildevice: command {}".format(err_msg)) + raise Exception("hwil: {}".format(err_msg)) return munchify(data) - def poll_till_device_ready(self, id: int, sleep_interval: int, retry_limit: int): + def poll_till_device_ready(self: Client, device_id: int, sleep_interval: int, retry_limit: int) -> None: + """Poll until HWIL device is ready""" + url = f"{self._host}/device/{device_id}" + headers = self._get_auth_header() + for _ in range(retry_limit): - url = f"{self._host}/device/{id}" - headers = self._get_auth_header() response = RestClient(url).method(HttpMethod.GET).headers(headers).execute() + handle_server_errors(response) + data = json.loads(response.text) if not response.ok: err_msg = data.get('error') - raise Exception("hwil_devices: {}".format(err_msg)) + raise Exception("hwil: {}".format(err_msg)) device = munchify(data) if device.status != 'IDLE': @@ -144,10 +179,11 @@ def poll_till_device_ready(self, id: int, sleep_interval: int, retry_limit: int) return - msg = 'Retries exhausted: Tried {} times with {}s interval.'.format(retry_limit, sleep_interval) + msg = f'Retries exhausted: Tried {retry_limit} times with {sleep_interval}s interval.' raise RetriesExhausted(msg) - def list_devices(self): + def list_devices(self: Client): + """Fetch all HWIL devices""" url = f"{self._host}/device/" headers = self._get_auth_header() response = RestClient(url).method(HttpMethod.GET).headers(headers).execute() @@ -156,649 +192,9 @@ def list_devices(self): data = json.loads(response.text) if not response.ok: err_msg = data.get('error') - raise Exception("hwil_devices: {}".format(err_msg)) + raise Exception("hwil: {}".format(err_msg)) return munchify(data) def _get_auth_header(self: Client) -> dict: - headers = dict(Authorization=f'Basic {self._token}') - return headers - - # Project APIs - - def list_projects( - self, - organization_guid: str = None, - query: dict = None - ) -> Munch: - """ - List all projects in an organization - """ - - url = "{}/v2/projects/".format(self._host) - headers = self._get_auth_header(with_project=False) - - params = {} - - if organization_guid: - params.update({ - "organizations": organization_guid, - }) - - params.update(query or {}) - - client = RestClient(url).method(HttpMethod.GET).headers(headers) - return self._walk_pages(client, params=params) - - def get_project(self, project_guid: str) -> Munch: - """ - Get a project by its GUID - """ - url = "{}/v2/projects/{}/".format(self._host, project_guid) - headers = self._get_auth_header() - response = RestClient(url).method( - HttpMethod.GET).headers(headers).execute() - - handle_server_errors(response) - - data = json.loads(response.text) - if not response.ok: - err_msg = data.get('error') - raise Exception("projects: {}".format(err_msg)) - - return munchify(data) - - def create_project(self, spec: dict) -> Munch: - """ - Create a new project - """ - url = "{}/v2/projects/".format(self._host) - headers = self._get_auth_header(with_project=False) - response = RestClient(url).method(HttpMethod.POST).headers( - headers).execute(payload=spec) - - handle_server_errors(response) - - data = json.loads(response.text) - if not response.ok: - err_msg = data.get('error') - raise Exception("projects: {}".format(err_msg)) - - return munchify(data) - - def update_project(self, project_guid: str, spec: dict) -> Munch: - """ - Update an existing project - """ - url = "{}/v2/projects/{}/".format(self._host, project_guid) - headers = self._get_auth_header(with_project=False) - response = RestClient(url).method(HttpMethod.PUT).headers( - headers).execute(payload=spec) - - handle_server_errors(response) - - data = json.loads(response.text) - if not response.ok: - err_msg = data.get('error') - raise Exception("projects: {}".format(err_msg)) - - return munchify(data) - - def update_project_owner(self, project_guid: str, new_owner_guid: str) -> Munch: - """ - Update an existing project's owner (creator) - """ - url = "{}/v2/projects/{}/owner/".format(self._host, project_guid) - headers = self._get_auth_header(with_project=False) - response = RestClient(url).method(HttpMethod.PUT).headers( - headers).execute(payload={'metadata': {'creatorGUID': new_owner_guid}}) - - handle_server_errors(response) - - data = json.loads(response.text) - if not response.ok: - err_msg = data.get('error') - raise Exception("projects: {}".format(err_msg)) - - return munchify(data) - - def delete_project(self, project_guid: str) -> Munch: - """ - Delete a project by its GUID - """ - url = "{}/v2/projects/{}/".format(self._host, project_guid) - headers = self._get_auth_header(with_project=False) - response = RestClient(url).method( - HttpMethod.DELETE).headers(headers).execute() - - handle_server_errors(response) - - data = json.loads(response.text) - if not response.ok: - err_msg = data.get('error') - raise Exception("projects: {}".format(err_msg)) - - return munchify(data) - - # ManagedService APIs - def list_providers(self) -> List: - """ - List all managedservice provider - """ - url = "{}/v2/managedservices/providers/".format(self._host) - headers = self._get_auth_header(with_project=False) - response = RestClient(url).method( - HttpMethod.GET).headers(headers).execute() - - handle_server_errors(response) - - data = json.loads(response.text) - if not response.ok: - err_msg = data.get('error') - raise Exception("managedservice: {}".format(err_msg)) - - return munchify(data.get('items', [])) - - def list_instances(self) -> List: - """ - List all managedservice instances in a project - """ - url = "{}/v2/managedservices/".format(self._host) - headers = self._get_auth_header() - - client = RestClient(url).method(HttpMethod.GET).headers(headers) - return self._walk_pages(client) - - def get_instance(self, instance_name: str) -> Munch: - """ - Get a managedservice instance by instance_name - """ - url = "{}/v2/managedservices/{}/".format(self._host, instance_name) - headers = self._get_auth_header() - response = RestClient(url).method( - HttpMethod.GET).headers(headers).execute() - - handle_server_errors(response) - - data = json.loads(response.text) - if not response.ok: - err_msg = data.get('error') - raise Exception("managedservice: {}".format(err_msg)) - - return munchify(data) - - def create_instance(self, instance: Dict) -> Munch: - url = "{}/v2/managedservices/".format(self._host) - headers = self._get_auth_header() - - response = RestClient(url).method(HttpMethod.POST).headers( - headers).execute(payload=instance) - - handle_server_errors(response) - - data = json.loads(response.text) - if not response.ok: - err_msg = data.get('error') - raise Exception("managedservice: {}".format(err_msg)) - - return munchify(data) - - def delete_instance(self, instance_name) -> Munch: - url = "{}/v2/managedservices/{}/".format(self._host, instance_name) - headers = self._get_auth_header() - response = RestClient(url).method( - HttpMethod.DELETE).headers(headers).execute() - - handle_server_errors(response) - - data = json.loads(response.text) - if not response.ok: - err_msg = data.get('error') - raise Exception("managedservice: {}".format(err_msg)) - - return munchify(data) - - def list_instance_bindings(self, instance_name: str, labels: str = '') -> List: - """ - List all managedservice instances in a project - """ - url = "{}/v2/managedservices/{}/bindings/".format(self._host, instance_name) - headers = self._get_auth_header() - - client = RestClient(url).method(HttpMethod.GET).headers(headers) - return self._walk_pages(client, params={'labelSelector': labels}) - - def create_instance_binding(self, instance_name, binding: dict) -> Munch: - """ - Create a new managed service instance binding - """ - url = "{}/v2/managedservices/{}/bindings/".format( - self._host, instance_name) - headers = self._get_auth_header() - response = RestClient(url).method( - HttpMethod.POST).headers(headers).execute(payload=binding) - - handle_server_errors(response) - - data = json.loads(response.text) - if not response.ok: - err_msg = data.get('error') - raise Exception("managedservice: {}".format(err_msg)) - - return munchify(data) - - def get_instance_binding(self, instance_name: str, binding_name: str) -> Munch: - """ - Get a managed service instance binding - """ - url = "{}/v2/managedservices/{}/bindings/{}/".format( - self._host, instance_name, binding_name) - headers = self._get_auth_header() - response = RestClient(url).method( - HttpMethod.GET).headers(headers).execute() - - handle_server_errors(response) - - data = json.loads(response.text) - if not response.ok: - err_msg = data.get('error') - raise Exception("managedservice: {}".format(err_msg)) - - return munchify(data) - - def delete_instance_binding(self, instance_name: str, binding_name: str) -> Munch: - """ - Delete a managed service instance binding - """ - url = "{}/v2/managedservices/{}/bindings/{}/".format( - self._host, instance_name, binding_name) - headers = self._get_auth_header() - response = RestClient(url).method( - HttpMethod.DELETE).headers(headers).execute() - - handle_server_errors(response) - - data = json.loads(response.text) - if not response.ok: - err_msg = data.get('error') - raise Exception("managedservice: {}".format(err_msg)) - - return munchify(data) - - def list_static_routes( - self, - query: dict = None - ) -> Munch: - """ - List all static routes in a project - """ - url = "{}/v2/staticroutes/".format(self._host) - headers = self._get_auth_header() - - params = {} - params.update(query or {}) - - client = RestClient(url).method(HttpMethod.GET).headers(headers) - return self._walk_pages(client, params=params) - - def get_static_route(self, name: str) -> Munch: - """ - Get a static route by its name - """ - url = "{}/v2/staticroutes/{}/".format(self._host, name) - headers = self._get_auth_header() - response = RestClient(url).method( - HttpMethod.GET).headers(headers).execute() - - handle_server_errors(response) - - data = json.loads(response.text) - if not response.ok: - err_msg = data.get('error') - raise Exception("static routes: {}".format(err_msg)) - - return munchify(data) - - def create_static_route(self, metadata: dict) -> Munch: - """ - Create a new static route - """ - url = "{}/v2/staticroutes/".format(self._host) - headers = self._get_auth_header() - response = RestClient(url).method(HttpMethod.POST).headers( - headers).execute(payload=metadata) - - handle_server_errors(response) - - data = json.loads(response.text) - if not response.ok: - err_msg = data.get('error') - raise Exception("static routes: {}".format(err_msg)) - - return munchify(data) - - def update_static_route(self, name: str, sr: dict) -> Munch: - """ - Update the new static route - """ - url = "{}/v2/staticroutes/{}/".format(self._host, name) - headers = self._get_auth_header() - response = RestClient(url).method(HttpMethod.PUT).headers( - headers).execute(payload=sr) - - handle_server_errors(response) - - data = json.loads(response.text) - if not response.ok: - err_msg = data.get('error') - raise Exception("static routes: {}".format(err_msg)) - - return munchify(data) - - def delete_static_route(self, name: str) -> Munch: - """ - Delete a static route by its name - """ - url = "{}/v2/staticroutes/{}/".format(self._host, name) - headers = self._get_auth_header() - response = RestClient(url).method( - HttpMethod.DELETE).headers(headers).execute() - - handle_server_errors(response) - - data = json.loads(response.text) - if not response.ok: - err_msg = data.get('error') - raise Exception("static routes: {}".format(err_msg)) - - return munchify(data) - - def create_secret(self, payload: dict) -> Munch: - """ - Create a new secret - """ - url = "{}/v2/secrets/".format(self._host) - headers = self._get_auth_header() - response = RestClient(url).method(HttpMethod.POST).headers( - headers).execute(payload=payload) - handle_server_errors(response) - - data = json.loads(response.text) - if not response.ok: - err_msg = data.get('error') - raise Exception("secret: {}".format(err_msg)) - - return munchify(data) - - def delete_secret(self, secret_name: str) -> Munch: - """ - Delete a secret - """ - url = "{}/v2/secrets/{}/".format(self._host, secret_name) - headers = self._get_auth_header() - response = RestClient(url).method(HttpMethod.DELETE).headers( - headers).execute() - handle_server_errors(response) - - data = json.loads(response.text) - if not response.ok: - err_msg = data.get('error') - raise Exception("secret: {}".format(err_msg)) - - return munchify(data) - - def list_secrets( - self, - query: dict = None - ) -> Munch: - """ - List all secrets in a project - """ - url = "{}/v2/secrets/".format(self._host) - headers = self._get_auth_header() - - params = {} - params.update(query or {}) - - client = RestClient(url).method(HttpMethod.GET).headers(headers) - return self._walk_pages(client, params=params) - - def get_secret( - self, - secret_name: str - ) -> Munch: - """ - Get secret by name - """ - url = "{}/v2/secrets/{}/".format(self._host, secret_name) - headers = self._get_auth_header() - - response = RestClient(url).method(HttpMethod.GET).headers(headers).execute() - data = json.loads(response.text) - if not response.ok: - err_msg = data.get('error') - raise Exception("secrets: {}".format(err_msg)) - - return munchify(data) - - def update_secret(self, secret_name: str, spec: dict) -> Munch: - """ - Update a secret - """ - url = "{}/v2/secrets/{}/".format(self._host, secret_name) - headers = self._get_auth_header() - response = RestClient(url).method(HttpMethod.PUT).headers( - headers).execute(payload=spec) - handle_server_errors(response) - - data = json.loads(response.text) - if not response.ok: - err_msg = data.get('error') - raise Exception("secret: {}".format(err_msg)) - - return munchify(data) - - # ConfigTrees APIs - def list_config_trees(self) -> Munch: - url = "{}/v2/configtrees/".format(self._host) - headers = self._get_auth_header(with_org=True) - client = RestClient(url).method(HttpMethod.GET).headers(headers) - return self._walk_pages(client) - - def create_config_tree(self, tree_spec: dict) -> Munch: - url = "{}/v2/configtrees/".format(self._host) - headers = self._get_auth_header(with_org=True) - response = RestClient(url).method(HttpMethod.POST).headers( - headers).execute(payload=tree_spec) - handle_server_errors(response) - - data = json.loads(response.text) - if not response.ok: - err_msg = data.get('error') - raise Exception("configtree: {}".format(err_msg)) - - return munchify(data) - - def delete_config_tree(self, tree_name: str) -> Munch: - url = "{}/v2/configtrees/{}/".format(self._host, tree_name) - headers = self._get_auth_header(with_org=True) - response = RestClient(url).method(HttpMethod.DELETE).headers( - headers).execute() - handle_server_errors(response) - - data = json.loads(response.text) - if not response.ok: - err_msg = data.get('error') - raise Exception("configtree: {}".format(err_msg)) - - return munchify(data) - - def get_config_tree(self, tree_name: str, rev_id: Optional[str] = None, - include_data: bool = False, filter_content_types: Optional[List[str]] = None, - filter_prefixes: Optional[List[str]] = None) -> Munch: - url = "{}/v2/configtrees/{}/".format(self._host, tree_name) - query = { - 'includeData': include_data, - 'contentTypes': filter_content_types, - 'keyPrefixes': filter_prefixes, - 'revision': rev_id, - } - headers = self._get_auth_header(with_org=True) - response = RestClient(url).method(HttpMethod.GET).headers( - headers).query_param(query).execute() - handle_server_errors(response) - - data = json.loads(response.text) - if not response.ok: - err_msg = data.get('error') - raise Exception("configtree: {}".format(err_msg)) - - return munchify(data) - - def set_revision_config_tree(self, tree_name: str, spec: dict) -> Munch: - url = "{}/v2/configtrees/{}/".format(self._host, tree_name) - headers = self._get_auth_header(with_org=True) - response = RestClient(url).method(HttpMethod.PUT).headers( - headers).execute(payload=spec) - handle_server_errors(response) - - data = json.loads(response.text) - if not response.ok: - err_msg = data.get('error') - raise Exception("configtree: {}".format(err_msg)) - - return munchify(data) - - def list_config_tree_revisions(self, tree_name: str) -> Munch: - url = "{}/v2/configtrees/{}/revisions/".format(self._host, tree_name) - headers = self._get_auth_header(with_org=True) - client = RestClient(url).method(HttpMethod.GET).headers(headers) - return self._walk_pages(client) - - def initialize_config_tree_revision(self, tree_name: str) -> Munch: - url = "{}/v2/configtrees/{}/revisions/".format(self._host, tree_name) - headers = self._get_auth_header(with_org=True) - response = RestClient(url).method(HttpMethod.POST).headers( - headers).execute() - handle_server_errors(response) - - data = json.loads(response.text) - if not response.ok: - err_msg = data.get('error') - raise Exception("configtree: {}".format(err_msg)) - - return munchify(data) - - def commit_config_tree_revision(self, tree_name: str, rev_id: str, payload: dict) -> Munch: - url = "{}/v2/configtrees/{}/revisions/{}/".format(self._host, tree_name, rev_id) - headers = self._get_auth_header(with_org=True) - response = RestClient(url).method(HttpMethod.PATCH).headers( - headers).execute(payload=payload) - handle_server_errors(response) - - data = json.loads(response.text) - if not response.ok: - err_msg = data.get('error') - raise Exception("configtree: {}".format(err_msg)) - - return munchify(data) - - def store_keys_in_revision(self, tree_name: str, rev_id: str, payload: Any) -> Munch: - url = "{}/v2/configtrees/{}/revisions/{}/".format(self._host, tree_name, rev_id) - headers = self._get_auth_header(with_org=True) - response = RestClient(url).method(HttpMethod.PUT).headers( - headers).execute(payload=payload) - handle_server_errors(response) - - data = json.loads(response.text) - if not response.ok: - err_msg = data.get('error') - raise Exception("configtree: {}".format(err_msg)) - - return munchify(data) - - def store_key_in_revision(self, tree_name: str, rev_id: str, key: str, value: str, perms: int = 644) -> Munch: - url = "{}/v2/configtrees/{}/revisions/{}/{}".format(self._host, tree_name, rev_id, key) - headers = self._get_auth_header(with_org=True) - headers['Content-Type'] = 'kv' - headers['X-Checksum'] = md5(str(value).encode('utf-8')).hexdigest() - headers['X-Permissions'] = str(perms) - - response = RestClient(url).method(HttpMethod.PUT).headers( - headers).execute(value) - handle_server_errors(response) - - data = json.loads(response.text) - if not response.ok: - err_msg = data.get('error') - raise Exception("configtree: {}".format(err_msg)) - - return munchify(data) - - def store_file_in_revision(self, tree_name: str, rev_id: str, key: str, file_path: str) -> Munch: - stat = os.stat(file_path) - perms = oct(stat.st_mode & 0o777)[-3:] - - content_type = magic.from_file(file_path, mime=True) - - url = "{}/v2/configtrees/{}/revisions/{}/{}".format(self._host, tree_name, rev_id, key) - headers = self._get_auth_header(with_org=True) - headers['Content-Type'] = content_type - headers['X-Permissions'] = perms - - with open(file_path, 'rb') as f: - file_hash = md5() - chunk = f.read(8192) - while chunk: - file_hash.update(chunk) - chunk = f.read(8192) - - headers['X-Checksum'] = file_hash.hexdigest() - f.seek(0) - - response = RestClient(url).method(HttpMethod.PUT).headers(headers).execute(payload=f, raw=True) - handle_server_errors(response) - - data = json.loads(response.text) - if not response.ok: - err_msg = data.get('error') - raise Exception("configtree: {}".format(err_msg)) - - return munchify(data) - - def delete_key_in_revision(self, tree_name: str, rev_id: str, key: str) -> Munch: - url = "{}/v2/configtrees/{}/revisions/{}/{}".format(self._host, tree_name, rev_id, key) - headers = self._get_auth_header(with_org=True) - response = RestClient(url).method(HttpMethod.DELETE).headers(headers).execute() - handle_server_errors(response) - - data = json.loads(response.text) - if not response.ok: - err_msg = data.get('error') - raise Exception("configtree: {}".format(err_msg)) - - return munchify(data) - - def _walk_pages(self, c: RestClient, params: dict = {}, limit: Optional[int] = None) -> Munch: - offset, result = 0, [] - - if limit is not None: - params["limit"] = limit - - while True: - params["continue"] = offset - - response = c.query_param(params).execute() - data = json.loads(response.text) - if not response.ok: - err_msg = data.get('error') - raise Exception("listing: {}".format(err_msg)) - - items = data.get('items', []) - if not items: - break - - offset = data['metadata']['continue'] - result.extend(items) - - return munchify(result) + return dict(Authorization=f"Basic {self._token}")