Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

let's try from scratch #470

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions goth/runner/cli/yagna_payment_cmd.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,30 @@ def from_dict(source: dict) -> "PaymentStatus":
)


@dataclass(frozen=True)
class Network:
"""Contains information about `Network`."""

default_token: str
tokens: Dict[str, str]


@dataclass(frozen=True)
class Driver:
"""Contains driver details fields."""

default_network: str
networks: Dict[str, Network]

@staticmethod
def from_dict(source: dict):
"""Parse a dict into an instance of `Driver` class."""
return Driver(
default_network=source["default_network"],
networks={key: Network(**val) for key, val in source["networks"].items()},
)


class YagnaPaymentMixin:
"""A mixin class that adds support for `<yagna-cmd> payment` commands."""

Expand Down Expand Up @@ -97,3 +121,15 @@ def payment_status(
args = make_args("payment", "status", driver.name, data_dir=data_dir)
output = self.run_json_command(Dict, *args)
return PaymentStatus.from_dict(output)

def payment_drivers(
self: CommandRunner,
) -> Dict[str, Driver]:
"""Run `<cmd> payment drivers` without any extra args.

Parse the command's output as a `Dict[str, Driver]` and return it.
"""

args = make_args("payment", "drivers")
output = self.run_json_command(Dict, *args)
return {key: Driver.from_dict(val) for key, val in output.items()}
55 changes: 29 additions & 26 deletions goth/runner/proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from goth.api_monitor.router_addon import RouterAddon
from goth.api_monitor.monitor_addon import MonitorAddon


# This function in `mitmproxy` will try to register signal handlers
# which will fail since the proxy does not run in the main thread.
# So we monkey-patch it to no-op.
Expand All @@ -30,7 +31,7 @@ class Proxy:
monitor: EventMonitor[APIEvent]
_proxy_thread: threading.Thread
_logger: logging.Logger
_loop: Optional[asyncio.AbstractEventLoop]
_mitmproxy_runner: Optional[dump.DumpMaster]
_node_names: Mapping[str, str]
_server_ready: threading.Event
"""Mapping of IP addresses to node names"""
Expand All @@ -47,18 +48,13 @@ def __init__(
self._node_names = node_names
self._ports = ports
self._logger = logging.getLogger(__name__)
self._loop = None
self._proxy_thread = threading.Thread(
target=self._run_mitmproxy, name="ProxyThread", daemon=True
)
self._server_ready = threading.Event()
self._mitmproxy_runner = None

def _stop_callback():
"""Stop `loop` so `proxy_thread` can terminate."""
if self._loop and self._loop.is_running():
self._loop.stop()

self.monitor = EventMonitor("rest", self._logger, on_stop=_stop_callback)
self.monitor = EventMonitor("rest", self._logger)
if assertions_module:
self.monitor.load_assertions(assertions_module)

Expand All @@ -69,23 +65,15 @@ def start(self):
self._server_ready.wait()

async def stop(self):
"""Start the proxy monitor and thread."""
if not self._loop:
raise RuntimeError("Event loop is not set")
await self.monitor.stop()
"""Stop the proxy thread and the monitor."""
if self._mitmproxy_runner:
self._mitmproxy_runner.shutdown()
self._proxy_thread.join()
self._logger.info("The mitmproxy thread has finished")
await self.monitor.stop()

def _run_mitmproxy(self):
"""Ran by `self.proxy_thread`."""

self._loop = asyncio.new_event_loop()
# Monkey patch the loop to set its `add_signal_handler` method to no-op.
# The original method would raise error since the loop will run in a non-main
# thread and hence cannot have signal handlers installed.
self._loop.add_signal_handler = lambda *args_: None
asyncio.set_event_loop(self._loop)

self._logger.info("Starting embedded mitmproxy...")
"""Run by `self.proxy_thread`."""

# This class is nested since it needs to refer to the `monitor` attribute
# of the enclosing instance of `Proxy`.
Expand All @@ -96,12 +84,27 @@ def __init__(inner_self, opts: options.Options) -> None:
inner_self.addons.add(MonitorAddon(self.monitor))

def start(inner_self):
self._server_ready.set()
self._logger.info("Embedded mitmproxy started")
super().start()
self._mitmproxy_runner = inner_self
self._logger.info("Embedded mitmproxy started")
self._server_ready.set()

try:
loop = asyncio.new_event_loop()
# Monkey patch the loop to set its `add_signal_handler` method to no-op.
# The original method would raise error since the loop will run in
# a non-main thread and hence cannot have signal handlers installed.
loop.add_signal_handler = lambda *args_: None
asyncio.set_event_loop(loop)

self._logger.info("Starting embedded mitmproxy...")

args = f"-q --mode reverse:http://127.0.0.1 --listen-port {MITM_PROXY_PORT}"
_main.run(MITMProxyRunner, cmdline.mitmdump, args.split())

except Exception:
self._logger.exception("Exception in mitmproxy thread")

args = f"-q --mode reverse:http://127.0.0.1 --listen-port {MITM_PROXY_PORT}"
_main.run(MITMProxyRunner, cmdline.mitmdump, args.split())
self._logger.info("Embedded mitmproxy exited")


Expand Down
67 changes: 67 additions & 0 deletions test/yagna/module/payments/test_payment_driver_list_cmd.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
"""Tests payment driver list CLI command."""

import logging
from pathlib import Path
from typing import List

import pytest

from goth.address import (
PROXY_HOST,
YAGNA_REST_URL,
)
from goth.node import node_environment
from goth.runner import Runner
from goth.runner.container.payment import PaymentIdPool
from goth.runner.container.yagna import YagnaContainerConfig
from goth.runner.probe import RequestorProbe

logger = logging.getLogger(__name__)


def _topology(payment_id_pool: PaymentIdPool) -> List[YagnaContainerConfig]:
# Nodes are configured to communicate via proxy

requestor_env = node_environment(
rest_api_url_base=YAGNA_REST_URL.substitute(host=PROXY_HOST),
)

return [
YagnaContainerConfig(
name="requestor",
probe_type=RequestorProbe,
environment=requestor_env,
payment_id=payment_id_pool.get_id(),
),
]


@pytest.mark.asyncio
async def test_payment_driver_list(
assets_path: Path,
demand_constraints: str,
payment_id_pool: PaymentIdPool,
runner: Runner,
task_package_template: str,
):
"""Test just the requestor's CLI command, no need to setup provider."""

topology = _topology(payment_id_pool)

async with runner(topology):
requestor = runner.get_probes(probe_type=RequestorProbe)[0]

res = requestor.cli.payment_drivers()
assert res and res.items()
driver = next(iter(res.values()), None)

assert driver.default_network, "Default network should be set"

network = driver.networks.get(driver.default_network, None)
assert network, "Network should belong to the Driver"
assert network.default_token, "Default taken should be set"

token = network.tokens.get(network.default_token, None)
assert token, "Token should belong to the Network"

logger.info("Test succeeded!")