From a2de20a282a48eb7030fe3754b2227e9677a29db Mon Sep 17 00:00:00 2001 From: Pallab Pain Date: Thu, 12 Dec 2024 12:47:18 +0530 Subject: [PATCH] feat(device): execute command on multiple devices --- riocli/device/execute.py | 70 ++++++++++++++++++++++++++++++++++------ 1 file changed, 60 insertions(+), 10 deletions(-) diff --git a/riocli/device/execute.py b/riocli/device/execute.py index 97ad1a19..be40c080 100644 --- a/riocli/device/execute.py +++ b/riocli/device/execute.py @@ -12,13 +12,15 @@ # See the License for the specific language governing permissions and # limitations under the License. import typing - +import functools import click from click_help_colors import HelpColorsCommand - +from concurrent.futures import ThreadPoolExecutor +from riocli.config import new_client from riocli.constants import Colors -from riocli.device.util import name_to_guid +from riocli.device.util import fetch_devices from riocli.utils.execute import run_on_device +from queue import Queue @click.command( @@ -37,17 +39,23 @@ default=False, help="Run the command asynchronously.", ) -@click.argument("device-name", type=str) +@click.option( + "--workers", + "-w", + help="Number of parallel workers for executing command. Defaults to 10.", + type=int, + default=10, +) +@click.argument("device-name-or-regex", type=str) @click.argument("command", nargs=-1) -@name_to_guid def execute_command( - device_name: str, - device_guid: str, + device_name_or_regex: str, user: str, timeout: int, shell: str, run_async: bool, command: typing.List[str], + workers: int = 10, ) -> None: """Execute commands on a device. @@ -65,17 +73,59 @@ def execute_command( $ rio device execute DEVICE_NAME "ls -l" """ + + client = new_client() + try: - response = run_on_device( - device_guid=device_guid, + devices = fetch_devices( + client, device_name_or_regex, include_all=False, online_devices=True + ) + except Exception as e: + click.secho(str(e), fg=Colors.RED) + raise SystemExit(1) from e + + if not devices: + click.secho("No device(s) found", fg=Colors.RED) + raise SystemExit(1) + + device_guids = [d.uuid for d in devices] + + try: + result = Queue() + func = functools.partial( + _run_on_device, user=user, shell=shell, command=command, background=run_async, timeout=timeout, + result=result, ) + with ThreadPoolExecutor(max_workers=workers) as executor: + executor.map(func, device_guids) - click.secho(response) + for device_guid, success, response in result.queue: + click.echo( + ">>> {}: {} -> {}".format( + device_guid, "Success" if success else "Failed", response + ) + ) except Exception as e: click.secho(str(e), fg=Colors.RED) raise SystemExit(1) from e + + +def _run_on_device(device_guid, user, shell, command, background, timeout, result): + """Wrapper on top of run_on_device to capture the output in a queue""" + try: + response = run_on_device( + device_guid=device_guid, + command=command, + user=user, + shell=shell, + background=background, + timeout=timeout, + ) + result.put((device_guid, True, response)) + except Exception as e: + result.put((device_guid, False, str(e)))