diff --git a/riocli/apply/__init__.py b/riocli/apply/__init__.py index 63956be2..86401a92 100644 --- a/riocli/apply/__init__.py +++ b/riocli/apply/__init__.py @@ -20,7 +20,6 @@ from riocli.apply.explain import explain, list_examples from riocli.apply.parse import Applier from riocli.apply.template import template -from riocli.apply.util import print_resolved_objects from riocli.apply.util import process_files_values_secrets from riocli.constants import Colors from riocli.utils import print_centered_text @@ -116,12 +115,22 @@ def apply( @click.option('-f', '--force', '--silent', 'silent', is_flag=True, type=click.BOOL, default=False, help="Skip confirmation") +@click.option('--workers', '-w', + help="number of parallel workers while running apply " + "command. defaults to 6.", type=int) +@click.option('--retry-count', '-rc', type=int, default=50, + help="Number of retries before a resource creation times out status, defaults to 50") +@click.option('--retry-interval', '-ri', type=int, default=6, + help="Interval between retries defaults to 6") @click.argument('files', nargs=-1) def delete( values: str, secrets: str, files: Iterable[str], + retry_count: int = 50, + retry_interval: int = 6, dryrun: bool = False, + workers: int = 6, silent: bool = False ) -> None: """ @@ -145,4 +154,4 @@ def delete( click.confirm("\nDo you want to proceed?", default=True, abort=True) print_centered_text('Deleting Resources') - applier.delete(dryrun=dryrun) + applier.delete(dryrun=dryrun, workers=workers, retry_count=retry_count, retry_interval=retry_interval) diff --git a/riocli/apply/parse.py b/riocli/apply/parse.py index f861c59f..00a684f3 100644 --- a/riocli/apply/parse.py +++ b/riocli/apply/parse.py @@ -1,4 +1,4 @@ -# Copyright 2022 Rapyuta Robotics +# Copyright 2024 Rapyuta Robotics # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -22,17 +22,17 @@ import yaml from munch import munchify -from riocli.apply.util import get_model, print_resolved_objects, message_with_prompt +from riocli.apply.util import get_model, message_with_prompt, print_resolved_objects from riocli.constants import Colors, Symbols -from riocli.utils import dump_all_yaml, run_bash, print_centered_text +from riocli.exceptions import ResourceNotFound +from riocli.utils import dump_all_yaml, print_centered_text, run_bash from riocli.utils.graph import Graphviz from riocli.utils.spinner import with_spinner -DELETE_POLICY_LABEL = 'rapyuta.io/deletionPolicy' - class Applier(object): DEFAULT_MAX_WORKERS = 6 + DELETE_POLICY_LABEL = 'rapyuta.io/deletionPolicy' def __init__(self, files: typing.List, values, secrets): self.environment = None @@ -57,8 +57,19 @@ def __init__(self, files: typing.List, values, secrets): self._process_file_list(files) + def print_resolved_manifests(self): + """Validates and prints the resolved manifests""" + manifests = [] + for _, o in self.objects.items(): + kls = get_model(o) + kls.validate(o) + manifests.append(o) + + dump_all_yaml(manifests) + @with_spinner(text='Applying...', timer=True) def apply(self, *args, **kwargs): + """Apply the resources defined in the manifest files""" spinner = kwargs.get('spinner') kwargs['workers'] = int(kwargs.get('workers') or self.DEFAULT_MAX_WORKERS) @@ -76,32 +87,25 @@ def apply(self, *args, **kwargs): spinner.red.fail(Symbols.ERROR) raise SystemExit(1) from e - def apply_async(self, *args, **kwargs): - workers = int(kwargs.get('workers') or self.DEFAULT_MAX_WORKERS) + def apply_sync(self, *args, **kwargs): + self.graph.prepare() + while self.graph.is_active(): + for obj in self.graph.get_ready(): + if (obj in self.resolved_objects and + 'manifest' in self.resolved_objects[obj]): + self._apply_manifest(obj, *args, **kwargs) + self.graph.done(obj) + def apply_async(self, *args, **kwargs): task_queue = queue.Queue() done_queue = queue.Queue() - def worker(): - while True: - o = task_queue.get() - if o in self.resolved_objects and 'manifest' in self.resolved_objects[o]: - try: - self._apply_manifest(o, *args, **kwargs) - except Exception as ex: - done_queue.put(ex) - continue - - task_queue.task_done() - done_queue.put(o) - - # Start the workers that will accept tasks from the task_queue - # and process them. Upon completion, they will put the object - # in the done_queue. - worker_list = [] - for worker_id in range(workers): - worker_list.append(threading.Thread(target=worker, daemon=True)) - worker_list[worker_id].start() + self._start_apply_workers( + self._apply_manifest, + task_queue, + done_queue, + *args, **kwargs + ) self.graph.prepare() while self.graph.is_active(): @@ -109,48 +113,120 @@ def worker(): task_queue.put(obj) done_obj = done_queue.get() - if not isinstance(done_obj, Exception): - self.graph.done(done_obj) - else: + if isinstance(done_obj, Exception): raise Exception(done_obj) + self.graph.done(done_obj) + # Block until the task_queue is empty. task_queue.join() - def apply_sync(self, *args, **kwargs): - self.graph.prepare() - while self.graph.is_active(): - for obj in self.graph.get_ready(): - if (obj in self.resolved_objects and - 'manifest' in self.resolved_objects[obj]): - self._apply_manifest(obj, *args, **kwargs) - self.graph.done(obj) - @with_spinner(text='Deleting...', timer=True) def delete(self, *args, **kwargs): + """Delete resources defined in manifests.""" spinner = kwargs.get('spinner') - delete_order = list(self.graph.static_order()) - delete_order.reverse() + kwargs['workers'] = int(kwargs.get('workers') + or self.DEFAULT_MAX_WORKERS) + + delete_func = self.delete_async + if kwargs['workers'] == 1: + delete_func = self.delete_sync + try: - for obj in delete_order: - if (obj in self.resolved_objects and - 'manifest' in self.resolved_objects[obj]): - self._delete_manifest(obj, *args, **kwargs) + delete_func(*args, **kwargs) spinner.text = 'Delete successful.' spinner.green.ok(Symbols.SUCCESS) except Exception as e: spinner.text = 'Delete failed. Error: {}'.format(e) spinner.red.fail(Symbols.ERROR) + raise SystemExit(1) from e - def print_resolved_manifests(self): - """Validates and prints the resolved manifests""" - manifests = [] - for _, o in self.objects.items(): - kls = get_model(o) - kls.validate(o) - manifests.append(o) + def delete_sync(self, *args, **kwargs) -> None: + delete_order = list(self.graph.static_order()) + delete_order.reverse() + for o in delete_order: + if o in self.resolved_objects and 'manifest' in self.resolved_objects[o]: + self._delete_manifest(o, *args, **kwargs) - dump_all_yaml(manifests) + def delete_async(self, *args, **kwargs) -> None: + task_queue = queue.Queue() + done_queue = queue.Queue() + + self._start_apply_workers( + self._delete_manifest, + task_queue, + done_queue, + *args, **kwargs + ) + + for nodes in self._get_async_delete_order(): + for node in nodes: + task_queue.put(node) + + done_obj = done_queue.get() + if isinstance(done_obj, Exception): + raise Exception(done_obj) + + task_queue.join() + + def _start_apply_workers( + self, + func: typing.Callable, + tasks: queue.Queue, + done: queue.Queue, + *args, **kwargs, + ) -> None: + """A helper method to start workers for apply/delete operations + + The `func` should have the following signature: + func(obj_key: str, *args, **kwargs) -> None + + The `tasks` queue is used to pass objects to the workers for processing. + The `done` queue is used to pass the processed objects back to the main. + """ + def _worker(): + while True: + o = tasks.get() + if o in self.resolved_objects and 'manifest' in self.resolved_objects[o]: + try: + func(o, *args, **kwargs) + except Exception as ex: + tasks.task_done() + done.put(ex) + continue + + tasks.task_done() + done.put(o) + + # Start the workers that will accept tasks from the task_queue + # and process them. Upon completion, they will put the object + # in the done_queue. The main thread will wait for the task_queue + # to be empty before exiting. The daemon threads will die with the + # main process. + n = int(kwargs.get('workers') or self.DEFAULT_MAX_WORKERS) + for i in range(n): + threading.Thread( + target=_worker, + daemon=True, + name=f'worker-{i}' + ).start() + + def _get_async_delete_order(self): + """Returns the delete order for async delete operation + + This method returns the delete order in a way that the + resources that are dependent on other resources are deleted + first while also ensuring that they can be processed concurrently. + """ + stack = [] + self.graph.prepare() + while self.graph.is_active(): + nodes = self.graph.get_ready() + stack.append(nodes) + self.graph.done(*nodes) + + while stack: + yield stack.pop() def parse_dependencies(self): for _, data in self.files.items(): @@ -182,14 +258,14 @@ def _apply_manifest(self, obj_key: str, *args, **kwargs) -> None: try: if not dryrun: ist.apply(*args, **kwargs) + + message_with_prompt("{} Applied {}".format( + Symbols.SUCCESS, obj_key), fg=Colors.GREEN, spinner=spinner) except Exception as ex: message_with_prompt("{} Failed to apply {}. Error: {}".format( Symbols.ERROR, obj_key, str(ex)), fg=Colors.RED, spinner=spinner) raise ex - message_with_prompt("{} Applied {}".format( - Symbols.SUCCESS, obj_key), fg=Colors.GREEN, spinner=spinner) - def _delete_manifest(self, obj_key: str, *args, **kwargs) -> None: """Instantiate and delete the object manifest""" spinner = kwargs.get('spinner') @@ -206,21 +282,23 @@ def _delete_manifest(self, obj_key: str, *args, **kwargs) -> None: # If a resource has a label with DELETE_POLICY_LABEL set # to 'retain', it should not be deleted. labels = obj.get('metadata', {}).get('labels', {}) - can_delete = labels.get(DELETE_POLICY_LABEL) != 'retain' + can_delete = labels.get(self.DELETE_POLICY_LABEL) != 'retain' try: if not dryrun and can_delete: ist.delete(*args, **kwargs) + + message_with_prompt("{} Deleted {}.".format( + Symbols.SUCCESS, obj_key), fg=Colors.GREEN, spinner=spinner) + except ResourceNotFound: + message_with_prompt("{} {} not found".format( + Symbols.WARNING, obj_key), fg=Colors.YELLOW, spinner=spinner) + return except Exception as ex: message_with_prompt("{} Failed to delete {}. Error: {}".format( Symbols.ERROR, obj_key, str(ex)), fg=Colors.RED, spinner=spinner) raise ex - message_with_prompt("{} Deleted {}.".format( - Symbols.SUCCESS, obj_key), fg=Colors.GREEN, spinner=spinner) - - # File Loading Operations - def _process_file_list(self, files): for f in files: data = self._load_file_content(f) @@ -294,8 +372,6 @@ def _load_file_content(self, file_name, is_value=False, is_secret=False): return loaded_data - # Graph Operations - def _add_graph_node(self, key): self.graph.add(key) self.diagram.node(key) @@ -304,7 +380,6 @@ def _add_graph_edge(self, dependent_key, key): self.graph.add(dependent_key, key) self.diagram.edge(key, dependent_key) - # Dependency Resolution def _parse_dependency(self, dependent_key, model): # TODO(pallab): let resources determine their own dependencies and return them # kls = get_model(model) diff --git a/riocli/deployment/model.py b/riocli/deployment/model.py index 34fe9eed..b4af5254 100644 --- a/riocli/deployment/model.py +++ b/riocli/deployment/model.py @@ -16,9 +16,9 @@ from munch import unmunchify from riocli.config import new_v2_client +from riocli.exceptions import ResourceNotFound from riocli.model import Model -from riocli.v2client.error import HttpAlreadyExistsError -from riocli.v2client.error import HttpNotFoundError +from riocli.v2client.error import HttpAlreadyExistsError, HttpNotFoundError class Deployment(Model): @@ -43,4 +43,4 @@ def delete(self, *args, **kwargs): try: client.delete_deployment(self.metadata.name) except HttpNotFoundError: - pass + raise ResourceNotFound diff --git a/riocli/device/model.py b/riocli/device/model.py index 1d3a53ba..5b950fad 100644 --- a/riocli/device/model.py +++ b/riocli/device/model.py @@ -15,14 +15,9 @@ from rapyuta_io.clients.device import Device as v1Device, DevicePythonVersion from riocli.config import new_client -from riocli.device.util import ( - create_hwil_device, - delete_hwil_device, - execute_onboard_command, - find_device_by_name, - make_device_labels_from_hwil_device, - DeviceNotFound, -) +from riocli.device.util import (DeviceNotFound, create_hwil_device, delete_hwil_device, execute_onboard_command, + find_device_by_name, make_device_labels_from_hwil_device) +from riocli.exceptions import ResourceNotFound from riocli.model import Model @@ -85,7 +80,7 @@ def delete(self, *args, **kwargs) -> None: try: device = find_device_by_name(client, self.metadata.name) except DeviceNotFound: - return + raise ResourceNotFound if self.spec.get('virtual', {}).get('enabled', False): delete_hwil_device(device) diff --git a/riocli/disk/model.py b/riocli/disk/model.py index 8bbe15fe..197e03c2 100644 --- a/riocli/disk/model.py +++ b/riocli/disk/model.py @@ -15,8 +15,9 @@ from munch import unmunchify from riocli.config import new_v2_client +from riocli.exceptions import ResourceNotFound from riocli.model import Model -from riocli.v2client.error import HttpNotFoundError, HttpAlreadyExistsError +from riocli.v2client.error import HttpAlreadyExistsError, HttpNotFoundError class Disk(Model): @@ -51,4 +52,4 @@ def delete(self, *args, **kwargs) -> None: try: client.delete_disk(self.metadata.name) except HttpNotFoundError: - pass + raise ResourceNotFound diff --git a/riocli/exceptions/__init__.py b/riocli/exceptions/__init__.py index 677eb6b7..fb53854d 100644 --- a/riocli/exceptions/__init__.py +++ b/riocli/exceptions/__init__.py @@ -48,3 +48,9 @@ class DeviceNotFound(Exception): def __init__(self, message='device not found'): self.message = message super().__init__(self.message) + + +class ResourceNotFound(Exception): + def __init__(self, message='resource not found'): + self.message = message + super().__init__(self.message) diff --git a/riocli/managedservice/model.py b/riocli/managedservice/model.py index 72863589..b8b49164 100644 --- a/riocli/managedservice/model.py +++ b/riocli/managedservice/model.py @@ -15,6 +15,7 @@ from munch import unmunchify from riocli.config import new_v2_client +from riocli.exceptions import ResourceNotFound from riocli.model import Model from riocli.v2client.error import HttpAlreadyExistsError, HttpNotFoundError @@ -38,4 +39,4 @@ def delete(self, *args, **kwargs) -> None: try: client.delete_instance(self.metadata.name) except HttpNotFoundError: - pass + raise ResourceNotFound diff --git a/riocli/network/model.py b/riocli/network/model.py index 877b3ce2..4255d63f 100644 --- a/riocli/network/model.py +++ b/riocli/network/model.py @@ -15,8 +15,9 @@ from munch import unmunchify from riocli.config import new_v2_client +from riocli.exceptions import ResourceNotFound from riocli.model import Model -from riocli.v2client.error import HttpNotFoundError, HttpAlreadyExistsError +from riocli.v2client.error import HttpAlreadyExistsError, HttpNotFoundError class Network(Model): @@ -47,4 +48,4 @@ def delete(self, *args, **kwargs) -> None: try: client.delete_network(self.metadata.name) except HttpNotFoundError: - pass \ No newline at end of file + raise ResourceNotFound diff --git a/riocli/package/model.py b/riocli/package/model.py index a0cbe52f..35bb7384 100644 --- a/riocli/package/model.py +++ b/riocli/package/model.py @@ -16,6 +16,7 @@ from munch import unmunchify from riocli.config import new_v2_client +from riocli.exceptions import ResourceNotFound from riocli.model import Model from riocli.package.enum import RestartPolicy from riocli.v2client.error import HttpAlreadyExistsError, HttpNotFoundError @@ -51,7 +52,7 @@ def delete(self, *args, **kwargs) -> None: query={"version": self.metadata.version} ) except HttpNotFoundError: - pass + raise ResourceNotFound def _sanitize_package(self) -> typing.Dict: # Unset createdAt and updatedAt to avoid timestamp parsing issue. diff --git a/riocli/project/model.py b/riocli/project/model.py index c1883bc1..62830cf0 100644 --- a/riocli/project/model.py +++ b/riocli/project/model.py @@ -15,10 +15,11 @@ from munch import unmunchify from waiting import wait -from riocli.config import new_v2_client, Configuration +from riocli.config import Configuration, new_v2_client +from riocli.exceptions import ResourceNotFound from riocli.model import Model -from riocli.project.util import find_project_guid, ProjectNotFound -from riocli.v2client.error import HttpNotFoundError, HttpAlreadyExistsError +from riocli.project.util import ProjectNotFound, find_project_guid +from riocli.v2client.error import HttpAlreadyExistsError, HttpNotFoundError PROJECT_READY_TIMEOUT = 150 @@ -55,7 +56,7 @@ def delete(self, *args, **kwargs) -> None: guid = find_project_guid(client, self.metadata.name, Configuration().data['organization_id']) client.delete_project(guid) except (HttpNotFoundError, ProjectNotFound): - pass + raise ResourceNotFound def is_ready(self) -> bool: client = new_v2_client() diff --git a/riocli/secret/model.py b/riocli/secret/model.py index 30baf2ba..46dfe7e3 100644 --- a/riocli/secret/model.py +++ b/riocli/secret/model.py @@ -15,6 +15,7 @@ from munch import unmunchify from riocli.config import new_v2_client +from riocli.exceptions import ResourceNotFound from riocli.model import Model from riocli.v2client.error import HttpAlreadyExistsError, HttpNotFoundError @@ -40,4 +41,4 @@ def delete(self, *args, **kwargs) -> None: try: client.delete_secret(self.metadata.name) except HttpNotFoundError: - pass + raise ResourceNotFound diff --git a/riocli/static_route/model.py b/riocli/static_route/model.py index 79c23112..13f3a1ad 100644 --- a/riocli/static_route/model.py +++ b/riocli/static_route/model.py @@ -13,7 +13,8 @@ # limitations under the License. from munch import unmunchify -from riocli.config import new_v2_client, Configuration +from riocli.config import Configuration, new_v2_client +from riocli.exceptions import ResourceNotFound from riocli.model import Model from riocli.v2client.error import HttpAlreadyExistsError, HttpNotFoundError @@ -41,4 +42,4 @@ def delete(self, *args, **kwargs) -> None: try: client.delete_static_route(f'{self.metadata.name}-{short_id}') except HttpNotFoundError: - pass + raise ResourceNotFound diff --git a/riocli/usergroup/model.py b/riocli/usergroup/model.py index 53a4fb81..ae1035dc 100644 --- a/riocli/usergroup/model.py +++ b/riocli/usergroup/model.py @@ -15,10 +15,11 @@ from munch import unmunchify -from riocli.config import new_client, new_v2_client, Configuration +from riocli.config import Configuration, new_client, new_v2_client +from riocli.exceptions import ResourceNotFound from riocli.model import Model from riocli.organization.utils import get_organization_details -from riocli.usergroup.util import find_usergroup_guid, UserGroupNotFound +from riocli.usergroup.util import UserGroupNotFound, find_usergroup_guid USER_GUID = 'guid' USER_EMAIL = 'emailID' @@ -76,7 +77,7 @@ def delete(self, *args, **kwargs) -> None: guid = find_usergroup_guid(v1client, organization_id, self.metadata.name) v1client.delete_usergroup(organization_id, guid) except UserGroupNotFound: - pass + raise ResourceNotFound except Exception as e: raise e