diff --git a/goth/runner/cli/yagna_payment_cmd.py b/goth/runner/cli/yagna_payment_cmd.py index 6d8f96038..422bd6811 100644 --- a/goth/runner/cli/yagna_payment_cmd.py +++ b/goth/runner/cli/yagna_payment_cmd.py @@ -45,6 +45,30 @@ def from_dict(source: dict) -> "PaymentStatus": ) +@dataclass(frozen=True) +class Network: + """Contains information about `Network`.""" + + default_token: str + tokens: Dict[str, str] + + +@dataclass(frozen=True) +class Driver: + """Contains driver details fields.""" + + default_network: str + networks: Dict[str, Network] + + @staticmethod + def from_dict(source: dict): + """Parse a dict into an instance of `Driver` class.""" + return Driver( + default_network=source["default_network"], + networks={key: Network(**val) for key, val in source["networks"].items()}, + ) + + class YagnaPaymentMixin: """A mixin class that adds support for ` payment` commands.""" @@ -97,3 +121,15 @@ def payment_status( args = make_args("payment", "status", driver.name, data_dir=data_dir) output = self.run_json_command(Dict, *args) return PaymentStatus.from_dict(output) + + def payment_drivers( + self: CommandRunner, + ) -> Dict[str, Driver]: + """Run ` payment drivers` without any extra args. + + Parse the command's output as a `Dict[str, Driver]` and return it. + """ + + args = make_args("payment", "drivers") + output = self.run_json_command(Dict, *args) + return {key: Driver.from_dict(val) for key, val in output.items()} diff --git a/goth/runner/proxy.py b/goth/runner/proxy.py index a38605bfc..e029a55c8 100644 --- a/goth/runner/proxy.py +++ b/goth/runner/proxy.py @@ -15,6 +15,7 @@ from goth.api_monitor.router_addon import RouterAddon from goth.api_monitor.monitor_addon import MonitorAddon + # This function in `mitmproxy` will try to register signal handlers # which will fail since the proxy does not run in the main thread. # So we monkey-patch it to no-op. @@ -30,7 +31,7 @@ class Proxy: monitor: EventMonitor[APIEvent] _proxy_thread: threading.Thread _logger: logging.Logger - _loop: Optional[asyncio.AbstractEventLoop] + _mitmproxy_runner: Optional[dump.DumpMaster] _node_names: Mapping[str, str] _server_ready: threading.Event """Mapping of IP addresses to node names""" @@ -47,18 +48,13 @@ def __init__( self._node_names = node_names self._ports = ports self._logger = logging.getLogger(__name__) - self._loop = None self._proxy_thread = threading.Thread( target=self._run_mitmproxy, name="ProxyThread", daemon=True ) self._server_ready = threading.Event() + self._mitmproxy_runner = None - def _stop_callback(): - """Stop `loop` so `proxy_thread` can terminate.""" - if self._loop and self._loop.is_running(): - self._loop.stop() - - self.monitor = EventMonitor("rest", self._logger, on_stop=_stop_callback) + self.monitor = EventMonitor("rest", self._logger) if assertions_module: self.monitor.load_assertions(assertions_module) @@ -69,23 +65,15 @@ def start(self): self._server_ready.wait() async def stop(self): - """Start the proxy monitor and thread.""" - if not self._loop: - raise RuntimeError("Event loop is not set") - await self.monitor.stop() + """Stop the proxy thread and the monitor.""" + if self._mitmproxy_runner: + self._mitmproxy_runner.shutdown() self._proxy_thread.join() + self._logger.info("The mitmproxy thread has finished") + await self.monitor.stop() def _run_mitmproxy(self): - """Ran by `self.proxy_thread`.""" - - self._loop = asyncio.new_event_loop() - # Monkey patch the loop to set its `add_signal_handler` method to no-op. - # The original method would raise error since the loop will run in a non-main - # thread and hence cannot have signal handlers installed. - self._loop.add_signal_handler = lambda *args_: None - asyncio.set_event_loop(self._loop) - - self._logger.info("Starting embedded mitmproxy...") + """Run by `self.proxy_thread`.""" # This class is nested since it needs to refer to the `monitor` attribute # of the enclosing instance of `Proxy`. @@ -96,12 +84,27 @@ def __init__(inner_self, opts: options.Options) -> None: inner_self.addons.add(MonitorAddon(self.monitor)) def start(inner_self): - self._server_ready.set() - self._logger.info("Embedded mitmproxy started") super().start() + self._mitmproxy_runner = inner_self + self._logger.info("Embedded mitmproxy started") + self._server_ready.set() + + try: + loop = asyncio.new_event_loop() + # Monkey patch the loop to set its `add_signal_handler` method to no-op. + # The original method would raise error since the loop will run in + # a non-main thread and hence cannot have signal handlers installed. + loop.add_signal_handler = lambda *args_: None + asyncio.set_event_loop(loop) + + self._logger.info("Starting embedded mitmproxy...") + + args = f"-q --mode reverse:http://127.0.0.1 --listen-port {MITM_PROXY_PORT}" + _main.run(MITMProxyRunner, cmdline.mitmdump, args.split()) + + except Exception: + self._logger.exception("Exception in mitmproxy thread") - args = f"-q --mode reverse:http://127.0.0.1 --listen-port {MITM_PROXY_PORT}" - _main.run(MITMProxyRunner, cmdline.mitmdump, args.split()) self._logger.info("Embedded mitmproxy exited") diff --git a/test/yagna/module/payments/test_payment_driver_list_cmd.py b/test/yagna/module/payments/test_payment_driver_list_cmd.py new file mode 100644 index 000000000..d33321395 --- /dev/null +++ b/test/yagna/module/payments/test_payment_driver_list_cmd.py @@ -0,0 +1,67 @@ +"""Tests payment driver list CLI command.""" + +import logging +from pathlib import Path +from typing import List + +import pytest + +from goth.address import ( + PROXY_HOST, + YAGNA_REST_URL, +) +from goth.node import node_environment +from goth.runner import Runner +from goth.runner.container.payment import PaymentIdPool +from goth.runner.container.yagna import YagnaContainerConfig +from goth.runner.probe import RequestorProbe + +logger = logging.getLogger(__name__) + + +def _topology(payment_id_pool: PaymentIdPool) -> List[YagnaContainerConfig]: + # Nodes are configured to communicate via proxy + + requestor_env = node_environment( + rest_api_url_base=YAGNA_REST_URL.substitute(host=PROXY_HOST), + ) + + return [ + YagnaContainerConfig( + name="requestor", + probe_type=RequestorProbe, + environment=requestor_env, + payment_id=payment_id_pool.get_id(), + ), + ] + + +@pytest.mark.asyncio +async def test_payment_driver_list( + assets_path: Path, + demand_constraints: str, + payment_id_pool: PaymentIdPool, + runner: Runner, + task_package_template: str, +): + """Test just the requestor's CLI command, no need to setup provider.""" + + topology = _topology(payment_id_pool) + + async with runner(topology): + requestor = runner.get_probes(probe_type=RequestorProbe)[0] + + res = requestor.cli.payment_drivers() + assert res and res.items() + driver = next(iter(res.values()), None) + + assert driver.default_network, "Default network should be set" + + network = driver.networks.get(driver.default_network, None) + assert network, "Network should belong to the Driver" + assert network.default_token, "Default taken should be set" + + token = network.tokens.get(network.default_token, None) + assert token, "Token should belong to the Network" + + logger.info("Test succeeded!")