Skip to content

Commit

Permalink
Updated initial VPN support (#613)
Browse files Browse the repository at this point in the history
* minimal vpn support
vpn ssh example

* black

* mypy

* move `Node` and `Network` out of `yapapi.rest` for the sake of better separation
between the REST API and the high-level API

* move `Node` and `Network` out of `yapapi.rest` for the sake of better separation
between the REST API and the high-level API

* + tests

* black...

* don't import networkfactory in Python < 3.8 ...

* ah, those too require AsyncMock...

* support VPNs in the default Service implementation

* fixes

* fixes

* add SSH example's dockerfile

* fix network creation (don't generate the id within the agent)

* fix tests

* add a missing test for test_cluster

* fixes

* rename `.net` to `.network`

(leaving `rest.net` as this pertains to the Net API)

* + `yapapi.network` docstrings
* prevent race conditions when adding new nodes to the network

* + docstrings for `Node.get_deploy_args`

* fix the type of Network._owner_ip -> IpAddress

* explanation for ThreadPoolExecutor usage

* add explanation for pulling `provider_id` directly

* add an issue for the TODO

* remove the "should" et al from the `network_addresses` docstring

* s/`Network.add_address`/`Network.add_owner_address`/

* s/key in dict.keys()/key in dict/

* make the mid-level API not aware of the high-level one at all

* fix, update the Dockerfile with a less confusing password

* update the ssh websocket URL with a value coming from the actual net API address

* remove unnecessary conversion

* black

* a somewhat cleaner shutdown

Co-authored-by: mfranciszkiewicz <[email protected]>
  • Loading branch information
shadeofblue and mfranciszkiewicz authored Sep 8, 2021
1 parent 39dc73e commit 476fce2
Show file tree
Hide file tree
Showing 17 changed files with 776 additions and 23 deletions.
9 changes: 9 additions & 0 deletions examples/ssh/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
FROM alpine:latest

RUN apk add --no-cache --update bash openssh iproute2 tcpdump net-tools screen
RUN echo "UseDNS no" >> /etc/ssh/sshd_config && \
echo "PermitRootLogin yes" >> /etc/ssh/sshd_config && \
echo "PasswordAuthentication yes" >> /etc/ssh/sshd_config

# set the password to some known value so that we can test the successful SSH connection later
RUN echo -e "pass.123\npass.123" | passwd
158 changes: 158 additions & 0 deletions examples/ssh/ssh.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
#!/usr/bin/env python3
import asyncio
import pathlib
import sys
from urllib.parse import urlparse


from datetime import datetime, timedelta

from yapapi import (
Golem,
NoPaymentAccountError,
__version__ as yapapi_version,
windows_event_loop_fix,
)
from yapapi.log import enable_default_logger, log_summary, log_event_repr # noqa
from yapapi.payload import vm
from yapapi.services import Service

examples_dir = pathlib.Path(__file__).resolve().parent.parent
sys.path.append(str(examples_dir))

from utils import (
build_parser,
TEXT_COLOR_CYAN,
TEXT_COLOR_DEFAULT,
TEXT_COLOR_RED,
TEXT_COLOR_YELLOW,
)


class SshService(Service):
@staticmethod
async def get_payload():
return await vm.repo(
image_hash="ea233c6774b1621207a48e10b46e3e1f944d881911f499f5cbac546a",
min_mem_gib=0.5,
min_storage_gib=2.0,
)

async def run(self):
ip = self.network_node.ip
net = self.network.network_id
app_key = self.cluster._engine._api_config.app_key
net_api_ws = (
urlparse(self.cluster._engine._api_config.net_url)._replace(scheme="ws").geturl()
)

self._ctx.run("/bin/bash", "-c", "syslogd")
self._ctx.run("/bin/bash", "-c", "ssh-keygen -A")
self._ctx.run("/bin/bash", "-c", "/usr/sbin/sshd")
yield self._ctx.commit()

print(
"Connect with:\n"
f"{TEXT_COLOR_CYAN}"
f"ssh -o ProxyCommand='websocat asyncstdio: {net_api_ws}/net/{net}/tcp/{ip}/22 --binary -H=Authorization:\"Bearer {app_key}\"' root@{ip}"
f"{TEXT_COLOR_DEFAULT}"
)

# await indefinitely...
await asyncio.Future()


async def main(subnet_tag, driver=None, network=None):
# By passing `event_consumer=log_summary()` we enable summary logging.
# See the documentation of the `yapapi.log` module on how to set
# the level of detail and format of the logged information.
async with Golem(
budget=1.0,
subnet_tag=subnet_tag,
driver=driver,
network=network,
) as golem:

print(
f"yapapi version: {TEXT_COLOR_YELLOW}{yapapi_version}{TEXT_COLOR_DEFAULT}\n"
f"Using subnet: {TEXT_COLOR_YELLOW}{golem.subnet_tag}{TEXT_COLOR_DEFAULT}, "
f"payment driver: {TEXT_COLOR_YELLOW}{golem.driver}{TEXT_COLOR_DEFAULT}, "
f"and network: {TEXT_COLOR_YELLOW}{golem.network}{TEXT_COLOR_DEFAULT}\n"
)

network = await golem.create_network("192.168.0.1/24")
cluster = await golem.run_service(SshService, network=network, num_instances=2)

def instances():
return [f"{s.provider_name}: {s.state.value}" for s in cluster.instances]

def still_running():
return any([s for s in cluster.instances if s.is_available])

while True:
print(instances())
try:
await asyncio.sleep(5)
except (KeyboardInterrupt, asyncio.CancelledError):
break

cluster.stop()

cnt = 0
while cnt < 3 and still_running():
print(instances())
await asyncio.sleep(5)
cnt += 1


if __name__ == "__main__":
parser = build_parser("Golem VPN SSH example")
now = datetime.now().strftime("%Y-%m-%d_%H.%M.%S")
parser.set_defaults(log_file=f"ssh-yapapi-{now}.log")
args = parser.parse_args()

# This is only required when running on Windows with Python prior to 3.8:
windows_event_loop_fix()

enable_default_logger(
log_file=args.log_file,
debug_activity_api=True,
debug_market_api=True,
debug_payment_api=True,
debug_net_api=True,
)

loop = asyncio.get_event_loop()
task = loop.create_task(
main(subnet_tag=args.subnet_tag, driver=args.driver, network=args.network)
)

try:
loop.run_until_complete(task)
except NoPaymentAccountError as e:
handbook_url = (
"https://handbook.golem.network/requestor-tutorials/"
"flash-tutorial-of-requestor-development"
)
print(
f"{TEXT_COLOR_RED}"
f"No payment account initialized for driver `{e.required_driver}` "
f"and network `{e.required_network}`.\n\n"
f"See {handbook_url} on how to initialize payment accounts for a requestor node."
f"{TEXT_COLOR_DEFAULT}"
)
except KeyboardInterrupt:
print(
f"{TEXT_COLOR_YELLOW}"
"Shutting down gracefully, please wait a short while "
"or press Ctrl+C to exit immediately..."
f"{TEXT_COLOR_DEFAULT}"
)
task.cancel()
try:
loop.run_until_complete(task)
print(
f"{TEXT_COLOR_YELLOW}Shutdown completed, thank you for waiting!{TEXT_COLOR_DEFAULT}"
)
except (asyncio.CancelledError, KeyboardInterrupt):
pass
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ rich = { version = "^10.2", optional = true }
async_exit_stack = "^1.0.1"
jsonrpc-base = "^1.0.3"

ya-aioclient = "^0.6"
ya-aioclient = "^0.6.3"
toml = "^0.10.1"
srvresolver = "^0.3.5"
colorama = "^0.4.4"
Expand Down
29 changes: 29 additions & 0 deletions tests/factories/network.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import asyncio
from concurrent import futures
import factory
import faker
from unittest import mock

from yapapi.network import Network


class NetworkFactory(factory.Factory):
class Meta:
model = Network

ip = factory.Faker("ipv4", network=True)
owner_id = factory.LazyFunction(lambda: "0x" + faker.Faker().binary(length=20).hex())

@classmethod
def _create(cls, model_class, *args, **kwargs):
if "net_api" not in kwargs:
net_api = mock.AsyncMock()
net_api.create_network = mock.AsyncMock(
return_value=faker.Faker().binary(length=16).hex()
)
kwargs["net_api"] = net_api

# we're using `futures.ThreadPoolExecutor` here
# to run an async awaitable in a synchronous manner
pool = futures.ThreadPoolExecutor()
return pool.submit(asyncio.run, model_class.create(*args, **kwargs)).result()
25 changes: 18 additions & 7 deletions tests/services/test_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,47 +18,58 @@ def _get_cluster():
[
(
{"num_instances": 1},
[call({})],
[call({}, None)],
None,
),
(
{"num_instances": 3},
[call({}) for _ in range(3)],
[call({}, None) for _ in range(3)],
None,
),
(
{"instance_params": [{}]},
[call({})],
[call({}, None)],
None,
),
(
{"instance_params": [{"n": 1}, {"n": 2}]},
[call({"n": 1}), call({"n": 2})],
[call({"n": 1}, None), call({"n": 2}, None)],
None,
),
(
# num_instances takes precedence
{"num_instances": 2, "instance_params": [{} for _ in range(3)]},
[call({}), call({})],
[call({}, None), call({}, None)],
None,
),
(
# num_instances takes precedence
{"num_instances": 3, "instance_params": ({"n": i} for i in itertools.count(1))},
[call({"n": 1}), call({"n": 2}), call({"n": 3})],
[call({"n": 1}, None), call({"n": 2}, None), call({"n": 3}, None)],
None,
),
(
# num_instances takes precedence
{"num_instances": 4, "instance_params": [{} for _ in range(3)]},
[call({}) for _ in range(3)],
[call({}, None) for _ in range(3)],
"`instance_params` iterable depleted after 3 spawned instances.",
),
(
{"num_instances": 0},
[],
None,
),
(
{
"num_instances": 3,
"network_addresses": [
"10.0.0.1",
"10.0.0.2",
],
},
[call({}, "10.0.0.1"), call({}, "10.0.0.2"), call({}, None)],
None,
),
],
)
@pytest.mark.asyncio
Expand Down
17 changes: 17 additions & 0 deletions tests/test_ctx.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,23 @@ def test_start(self, args):

assert c.commands() == [{"start": {"args": args}}]

@pytest.mark.parametrize(
"kwargs",
(
{"foo": 42},
{},
),
)
def test_deploy(self, kwargs):
ctx = self._get_work_context()
ctx.deploy(**kwargs)
steps = ctx.commit()

c = CommandContainer()
steps.register(c)

assert c.commands() == [{"deploy": kwargs}]

def test_terminate(self):
ctx = self._get_work_context(None)
ctx.terminate()
Expand Down
Loading

0 comments on commit 476fce2

Please sign in to comment.