Skip to content

Commit

Permalink
perf(delete): implements multi-threaded delete operation
Browse files Browse the repository at this point in the history
The delete operation has been sequential, till date. For large number of
deployments, it takes quite a lot of time to perform a cleanup. The
problem was performing the deletes in the right order and the
TopologicalSorter does not provide a way to do that while maintaining
concurrency.

This commit implements the required logic to get the right delete order
while processing them concurrently.

Wrike Ticket: https://www.wrike.com/open.htm?id=1467788260
  • Loading branch information
pallabpain committed Sep 4, 2024
1 parent b1a80ee commit a50fec0
Show file tree
Hide file tree
Showing 13 changed files with 188 additions and 95 deletions.
13 changes: 11 additions & 2 deletions riocli/apply/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
from riocli.apply.explain import explain, list_examples
from riocli.apply.parse import Applier
from riocli.apply.template import template
from riocli.apply.util import print_resolved_objects
from riocli.apply.util import process_files_values_secrets
from riocli.constants import Colors
from riocli.utils import print_centered_text
Expand Down Expand Up @@ -116,12 +115,22 @@ def apply(
@click.option('-f', '--force', '--silent', 'silent', is_flag=True,
type=click.BOOL, default=False,
help="Skip confirmation")
@click.option('--workers', '-w',
help="number of parallel workers while running apply "
"command. defaults to 6.", type=int)
@click.option('--retry-count', '-rc', type=int, default=50,
help="Number of retries before a resource creation times out status, defaults to 50")
@click.option('--retry-interval', '-ri', type=int, default=6,
help="Interval between retries defaults to 6")
@click.argument('files', nargs=-1)
def delete(
values: str,
secrets: str,
files: Iterable[str],
retry_count: int = 50,
retry_interval: int = 6,
dryrun: bool = False,
workers: int = 6,
silent: bool = False
) -> None:
"""
Expand All @@ -145,4 +154,4 @@ def delete(
click.confirm("\nDo you want to proceed?", default=True, abort=True)

print_centered_text('Deleting Resources')
applier.delete(dryrun=dryrun)
applier.delete(dryrun=dryrun, workers=workers, retry_count=retry_count, retry_interval=retry_interval)
205 changes: 140 additions & 65 deletions riocli/apply/parse.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright 2022 Rapyuta Robotics
# Copyright 2024 Rapyuta Robotics
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand All @@ -22,17 +22,17 @@
import yaml
from munch import munchify

from riocli.apply.util import get_model, print_resolved_objects, message_with_prompt
from riocli.apply.util import get_model, message_with_prompt, print_resolved_objects
from riocli.constants import Colors, Symbols
from riocli.utils import dump_all_yaml, run_bash, print_centered_text
from riocli.exceptions import ResourceNotFound
from riocli.utils import dump_all_yaml, print_centered_text, run_bash
from riocli.utils.graph import Graphviz
from riocli.utils.spinner import with_spinner

DELETE_POLICY_LABEL = 'rapyuta.io/deletionPolicy'


class Applier(object):
DEFAULT_MAX_WORKERS = 6
DELETE_POLICY_LABEL = 'rapyuta.io/deletionPolicy'

def __init__(self, files: typing.List, values, secrets):
self.environment = None
Expand All @@ -57,8 +57,19 @@ def __init__(self, files: typing.List, values, secrets):

self._process_file_list(files)

def print_resolved_manifests(self):
"""Validates and prints the resolved manifests"""
manifests = []
for _, o in self.objects.items():
kls = get_model(o)
kls.validate(o)
manifests.append(o)

dump_all_yaml(manifests)

@with_spinner(text='Applying...', timer=True)
def apply(self, *args, **kwargs):
"""Apply the resources defined in the manifest files"""
spinner = kwargs.get('spinner')
kwargs['workers'] = int(kwargs.get('workers')
or self.DEFAULT_MAX_WORKERS)
Expand All @@ -76,81 +87,146 @@ def apply(self, *args, **kwargs):
spinner.red.fail(Symbols.ERROR)
raise SystemExit(1) from e

def apply_async(self, *args, **kwargs):
workers = int(kwargs.get('workers') or self.DEFAULT_MAX_WORKERS)
def apply_sync(self, *args, **kwargs):
self.graph.prepare()
while self.graph.is_active():
for obj in self.graph.get_ready():
if (obj in self.resolved_objects and
'manifest' in self.resolved_objects[obj]):
self._apply_manifest(obj, *args, **kwargs)
self.graph.done(obj)

def apply_async(self, *args, **kwargs):
task_queue = queue.Queue()
done_queue = queue.Queue()

def worker():
while True:
o = task_queue.get()
if o in self.resolved_objects and 'manifest' in self.resolved_objects[o]:
try:
self._apply_manifest(o, *args, **kwargs)
except Exception as ex:
done_queue.put(ex)
continue

task_queue.task_done()
done_queue.put(o)

# Start the workers that will accept tasks from the task_queue
# and process them. Upon completion, they will put the object
# in the done_queue.
worker_list = []
for worker_id in range(workers):
worker_list.append(threading.Thread(target=worker, daemon=True))
worker_list[worker_id].start()
self._start_apply_workers(
self._apply_manifest,
task_queue,
done_queue,
*args, **kwargs
)

self.graph.prepare()
while self.graph.is_active():
for obj in self.graph.get_ready():
task_queue.put(obj)

done_obj = done_queue.get()
if not isinstance(done_obj, Exception):
self.graph.done(done_obj)
else:
if isinstance(done_obj, Exception):
raise Exception(done_obj)

self.graph.done(done_obj)

# Block until the task_queue is empty.
task_queue.join()

def apply_sync(self, *args, **kwargs):
self.graph.prepare()
while self.graph.is_active():
for obj in self.graph.get_ready():
if (obj in self.resolved_objects and
'manifest' in self.resolved_objects[obj]):
self._apply_manifest(obj, *args, **kwargs)
self.graph.done(obj)

@with_spinner(text='Deleting...', timer=True)
def delete(self, *args, **kwargs):
"""Delete resources defined in manifests."""
spinner = kwargs.get('spinner')
delete_order = list(self.graph.static_order())
delete_order.reverse()
kwargs['workers'] = int(kwargs.get('workers')
or self.DEFAULT_MAX_WORKERS)

delete_func = self.delete_async
if kwargs['workers'] == 1:
delete_func = self.delete_sync

try:
for obj in delete_order:
if (obj in self.resolved_objects and
'manifest' in self.resolved_objects[obj]):
self._delete_manifest(obj, *args, **kwargs)
delete_func(*args, **kwargs)
spinner.text = 'Delete successful.'
spinner.green.ok(Symbols.SUCCESS)
except Exception as e:
spinner.text = 'Delete failed. Error: {}'.format(e)
spinner.red.fail(Symbols.ERROR)
raise SystemExit(1) from e

def print_resolved_manifests(self):
"""Validates and prints the resolved manifests"""
manifests = []
for _, o in self.objects.items():
kls = get_model(o)
kls.validate(o)
manifests.append(o)
def delete_sync(self, *args, **kwargs) -> None:
delete_order = list(self.graph.static_order())
delete_order.reverse()
for o in delete_order:
if o in self.resolved_objects and 'manifest' in self.resolved_objects[o]:
self._delete_manifest(o, *args, **kwargs)

dump_all_yaml(manifests)
def delete_async(self, *args, **kwargs) -> None:
task_queue = queue.Queue()
done_queue = queue.Queue()

self._start_apply_workers(
self._delete_manifest,
task_queue,
done_queue,
*args, **kwargs
)

for nodes in self._get_async_delete_order():
for node in nodes:
task_queue.put(node)

done_obj = done_queue.get()
if isinstance(done_obj, Exception):
raise Exception(done_obj)

task_queue.join()

def _start_apply_workers(
self,
func: typing.Callable,
tasks: queue.Queue,
done: queue.Queue,
*args, **kwargs,
) -> None:
"""A helper method to start workers for apply/delete operations
The `func` should have the following signature:
func(obj_key: str, *args, **kwargs) -> None
The `tasks` queue is used to pass objects to the workers for processing.
The `done` queue is used to pass the processed objects back to the main.
"""
def _worker():
while True:
o = tasks.get()
if o in self.resolved_objects and 'manifest' in self.resolved_objects[o]:
try:
func(o, *args, **kwargs)
except Exception as ex:
tasks.task_done()
done.put(ex)
continue

tasks.task_done()
done.put(o)

# Start the workers that will accept tasks from the task_queue
# and process them. Upon completion, they will put the object
# in the done_queue. The main thread will wait for the task_queue
# to be empty before exiting. The daemon threads will die with the
# main process.
n = int(kwargs.get('workers') or self.DEFAULT_MAX_WORKERS)
for i in range(n):
threading.Thread(
target=_worker,
daemon=True,
name=f'worker-{i}'
).start()

def _get_async_delete_order(self):
"""Returns the delete order for async delete operation
This method returns the delete order in a way that the
resources that are dependent on other resources are deleted
first while also ensuring that they can be processed concurrently.
"""
stack = []
self.graph.prepare()
while self.graph.is_active():
nodes = self.graph.get_ready()
stack.append(nodes)
self.graph.done(*nodes)

while stack:
yield stack.pop()

def parse_dependencies(self):
for _, data in self.files.items():
Expand Down Expand Up @@ -182,14 +258,14 @@ def _apply_manifest(self, obj_key: str, *args, **kwargs) -> None:
try:
if not dryrun:
ist.apply(*args, **kwargs)

message_with_prompt("{} Applied {}".format(
Symbols.SUCCESS, obj_key), fg=Colors.GREEN, spinner=spinner)
except Exception as ex:
message_with_prompt("{} Failed to apply {}. Error: {}".format(
Symbols.ERROR, obj_key, str(ex)), fg=Colors.RED, spinner=spinner)
raise ex

message_with_prompt("{} Applied {}".format(
Symbols.SUCCESS, obj_key), fg=Colors.GREEN, spinner=spinner)

def _delete_manifest(self, obj_key: str, *args, **kwargs) -> None:
"""Instantiate and delete the object manifest"""
spinner = kwargs.get('spinner')
Expand All @@ -206,21 +282,23 @@ def _delete_manifest(self, obj_key: str, *args, **kwargs) -> None:
# If a resource has a label with DELETE_POLICY_LABEL set
# to 'retain', it should not be deleted.
labels = obj.get('metadata', {}).get('labels', {})
can_delete = labels.get(DELETE_POLICY_LABEL) != 'retain'
can_delete = labels.get(self.DELETE_POLICY_LABEL) != 'retain'

try:
if not dryrun and can_delete:
ist.delete(*args, **kwargs)

message_with_prompt("{} Deleted {}.".format(
Symbols.SUCCESS, obj_key), fg=Colors.GREEN, spinner=spinner)
except ResourceNotFound:
message_with_prompt("{} {} not found".format(
Symbols.WARNING, obj_key), fg=Colors.YELLOW, spinner=spinner)
return
except Exception as ex:
message_with_prompt("{} Failed to delete {}. Error: {}".format(
Symbols.ERROR, obj_key, str(ex)), fg=Colors.RED, spinner=spinner)
raise ex

message_with_prompt("{} Deleted {}.".format(
Symbols.SUCCESS, obj_key), fg=Colors.GREEN, spinner=spinner)

# File Loading Operations

def _process_file_list(self, files):
for f in files:
data = self._load_file_content(f)
Expand Down Expand Up @@ -294,8 +372,6 @@ def _load_file_content(self, file_name, is_value=False, is_secret=False):

return loaded_data

# Graph Operations

def _add_graph_node(self, key):
self.graph.add(key)
self.diagram.node(key)
Expand All @@ -304,7 +380,6 @@ def _add_graph_edge(self, dependent_key, key):
self.graph.add(dependent_key, key)
self.diagram.edge(key, dependent_key)

# Dependency Resolution
def _parse_dependency(self, dependent_key, model):
# TODO(pallab): let resources determine their own dependencies and return them
# kls = get_model(model)
Expand Down
6 changes: 3 additions & 3 deletions riocli/deployment/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@
from munch import unmunchify

from riocli.config import new_v2_client
from riocli.exceptions import ResourceNotFound
from riocli.model import Model
from riocli.v2client.error import HttpAlreadyExistsError
from riocli.v2client.error import HttpNotFoundError
from riocli.v2client.error import HttpAlreadyExistsError, HttpNotFoundError


class Deployment(Model):
Expand All @@ -43,4 +43,4 @@ def delete(self, *args, **kwargs):
try:
client.delete_deployment(self.metadata.name)
except HttpNotFoundError:
pass
raise ResourceNotFound
13 changes: 4 additions & 9 deletions riocli/device/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,9 @@
from rapyuta_io.clients.device import Device as v1Device, DevicePythonVersion

from riocli.config import new_client
from riocli.device.util import (
create_hwil_device,
delete_hwil_device,
execute_onboard_command,
find_device_by_name,
make_device_labels_from_hwil_device,
DeviceNotFound,
)
from riocli.device.util import (DeviceNotFound, create_hwil_device, delete_hwil_device, execute_onboard_command,
find_device_by_name, make_device_labels_from_hwil_device)
from riocli.exceptions import ResourceNotFound
from riocli.model import Model


Expand Down Expand Up @@ -85,7 +80,7 @@ def delete(self, *args, **kwargs) -> None:
try:
device = find_device_by_name(client, self.metadata.name)
except DeviceNotFound:
return
raise ResourceNotFound

if self.spec.get('virtual', {}).get('enabled', False):
delete_hwil_device(device)
Expand Down
Loading

0 comments on commit a50fec0

Please sign in to comment.