Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Blue/minimal http proxy #656

Merged
merged 17 commits into from
Sep 22, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions examples/http-proxy/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
FROM nginx:stable-alpine
237 changes: 237 additions & 0 deletions examples/http-proxy/http_proxy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,237 @@
#!/usr/bin/env python3
"""
a simple http proxy example
"""
import asyncio

import aiohttp
from aiohttp import web
from datetime import datetime, timedelta
import functools
import pathlib
import shlex
import sys


from yapapi import (
__version__ as yapapi_version,
)
from yapapi import Golem
from yapapi.services import Service, Cluster, ServiceState

from yapapi.payload import vm

examples_dir = pathlib.Path(__file__).resolve().parent.parent
sys.path.append(str(examples_dir))

from utils import (
build_parser,
TEXT_COLOR_CYAN,
TEXT_COLOR_DEFAULT,
TEXT_COLOR_GREEN,
TEXT_COLOR_YELLOW,
run_golem_example,
)

STARTING_TIMEOUT = timedelta(minutes=4)


# ######## local http server

request_count = 0


async def request_handler(cluster: Cluster, request: web.Request):
global request_count

print(f"{TEXT_COLOR_GREEN}local HTTP request: {dict(request.query)}{TEXT_COLOR_DEFAULT}")

instance: HttpService = cluster.instances[request_count % len(cluster.instances)]
request_count += 1
response = await instance.handle_request(request.path_qs)
return web.Response(text=response)


async def run_local_server(cluster: Cluster, port: int):
shadeofblue marked this conversation as resolved.
Show resolved Hide resolved
"""
run a local HTTP server, listening on `port`
and passing all requests through the `request_handler` function above
"""
handler = functools.partial(request_handler, cluster)
runner = web.ServerRunner(web.Server(handler))
await runner.setup()
site = web.TCPSite(runner, port=port)
await site.start()

return site


# ######## Golem Service


class HttpService(Service):
@staticmethod
async def get_payload():
return await vm.repo(
image_hash="16ad039c00f60a48c76d0644c96ccba63b13296d140477c736512127",
# we're adding an additional constraint to only select those nodes that
# are offering VPN-capable VM runtimes so that we can connect them to the VPN
capabilities=[vm.VM_CAPS_VPN],
shadeofblue marked this conversation as resolved.
Show resolved Hide resolved
)

async def start(self):
async for s in super().start():
yield s

s = self._ctx.new_script()
s.run("/docker-entrypoint.sh")
s.run("/bin/chmod", "a+x", "/")
msg = f"Hello from inside Golem!\n... running on {self.provider_name}"
s.run(
"/bin/sh",
"-c",
f"echo {shlex.quote(msg)} > /usr/share/nginx/html/index.html",
)
s.run("/usr/sbin/nginx"),
yield s

# we don't need to implement `run` since, after the service is started,
# all communication is performed through the VPN

async def handle_request(self, query_string: str):
"""
handle the request coming from the local HTTP server
by passing it to the instance through the VPN
"""
instance_ws = self.network_node.get_websocket_uri(80)
app_key = self.cluster._engine._api_config.app_key

print(f"{TEXT_COLOR_GREEN}sending a remote request to {self}{TEXT_COLOR_DEFAULT}")
ws_session = aiohttp.ClientSession()
async with ws_session.ws_connect(
instance_ws, headers={"Authorization": f"Bearer {app_key}"}
) as ws:
await ws.send_str(f"GET {query_string} HTTP/1.0\n\n")
headers = await ws.__anext__()
print(f"{TEXT_COLOR_GREEN}remote headers: {headers.data} {TEXT_COLOR_DEFAULT}")
content = await ws.__anext__()
data: bytes = content.data
print(f"{TEXT_COLOR_GREEN}remote content: {data} {TEXT_COLOR_DEFAULT}")

response_text = data.decode("utf-8")
print(f"{TEXT_COLOR_GREEN}local response: {response_text}{TEXT_COLOR_DEFAULT}")

await ws_session.close()
return response_text


# ######## Main application code which spawns the Golem service and the local HTTP server


async def main(
subnet_tag,
driver=None,
network=None,
num_instances=1,
port=8080,
):
async with Golem(
budget=1.0,
subnet_tag=subnet_tag,
driver=driver,
network=network,
) as golem:

print(
f"yapapi version: {TEXT_COLOR_YELLOW}{yapapi_version}{TEXT_COLOR_DEFAULT}\n"
f"Using subnet: {TEXT_COLOR_YELLOW}{subnet_tag}{TEXT_COLOR_DEFAULT}, "
f"payment driver: {TEXT_COLOR_YELLOW}{golem.driver}{TEXT_COLOR_DEFAULT}, "
f"and network: {TEXT_COLOR_YELLOW}{golem.network}{TEXT_COLOR_DEFAULT}\n"
)

commissioning_time = datetime.now()

network = await golem.create_network("192.168.0.1/24")
cluster = await golem.run_service(HttpService, network=network, num_instances=num_instances)

def instances():
return [f"{s.provider_name}: {s.state.value}" for s in cluster.instances]

def still_running():
return any([s for s in cluster.instances if s.is_available])

def still_starting():
return len(cluster.instances) < num_instances or any(
s.state == ServiceState.starting for s in cluster.instances
)

# wait until all remote http instances are started

while still_starting() and datetime.now() < commissioning_time + STARTING_TIMEOUT:
johny-b marked this conversation as resolved.
Show resolved Hide resolved
print(f"instances: {instances()}")
await asyncio.sleep(5)

if still_starting():
raise Exception(
f"Failed to start instances after {STARTING_TIMEOUT.total_seconds()} seconds"
)

# service instances started, start the local HTTP server

site = await run_local_server(cluster, port)

print(
f"{TEXT_COLOR_CYAN}Local HTTP server listening on:\nhttp://localhost:{port}{TEXT_COLOR_DEFAULT}"
)

# wait until Ctrl-C

while True:
print(instances())
try:
await asyncio.sleep(10)
except (KeyboardInterrupt, asyncio.CancelledError):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we capture CancelledError here?
I'm not saying it's wrong, I just don't understand, and this is an example - so things should be understandable : )

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe it's because the KeyboardInterrupt can happen in some other part of async code and thus, the KeyboardInterrupt won't happen in the asyncio.sleep(10) but in some other place and that routine will have been cancelled

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@shadeofblue
OK, that makes sense.

We have a comment "wait until Ctrl+C". I think it would be nice to add the same explanation to the comment.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm.... that's also present in all the other examples, I'm unsure if we want to pollute this one with this particular explanation

break

# perform the shutdown of the local http server and the service cluster

await site.stop()
print(f"{TEXT_COLOR_CYAN}HTTP server stopped{TEXT_COLOR_DEFAULT}")

cluster.stop()

cnt = 0
while cnt < 3 and still_running():
print(instances())
await asyncio.sleep(5)
cnt += 1


if __name__ == "__main__":
parser = build_parser("An extremely simple http proxy")
parser.add_argument(
"--num-instances",
type=int,
default=2,
help="The number of instances of the http service to spawn",
)
parser.add_argument(
"--port",
type=int,
default=8080,
help="The local port to listen on",
)
now = datetime.now().strftime("%Y-%m-%d_%H.%M.%S")
parser.set_defaults(log_file=f"http-proxy-{now}.log")
args = parser.parse_args()

run_golem_example(
main(
subnet_tag=args.subnet_tag,
driver=args.driver,
network=args.network,
num_instances=args.num_instances,
port=args.port,
),
log_file=args.log_file,
)
60 changes: 7 additions & 53 deletions examples/ssh/ssh.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,14 @@
import asyncio
import pathlib
import sys
from urllib.parse import urlparse
from uuid import uuid4


from datetime import datetime, timedelta
from datetime import datetime

from yapapi import (
Golem,
NoPaymentAccountError,
__version__ as yapapi_version,
windows_event_loop_fix,
)
from yapapi.log import enable_default_logger, log_summary, log_event_repr # noqa
from yapapi.payload import vm
Expand All @@ -26,6 +24,7 @@
TEXT_COLOR_DEFAULT,
TEXT_COLOR_RED,
TEXT_COLOR_YELLOW,
run_golem_example,
)


Expand All @@ -40,12 +39,8 @@ async def get_payload():
)

async def run(self):
ip = self.network_node.ip
net = self.network.network_id
connection_uri = self.network_node.get_websocket_uri(22)
app_key = self.cluster._engine._api_config.app_key
net_api_ws = (
urlparse(self.cluster._engine._api_config.net_url)._replace(scheme="ws").geturl()
)

self._ctx.run("/bin/bash", "-c", "syslogd")
self._ctx.run("/bin/bash", "-c", "ssh-keygen -A")
Expand All @@ -55,7 +50,7 @@ async def run(self):
print(
"Connect with:\n"
f"{TEXT_COLOR_CYAN}"
f"ssh -o ProxyCommand='websocat asyncstdio: {net_api_ws}/net/{net}/tcp/{ip}/22 --binary -H=Authorization:\"Bearer {app_key}\"' root@{ip}"
f"ssh -o ProxyCommand='websocat asyncstdio: {connection_uri} --binary -H=Authorization:\"Bearer {app_key}\"' root@{uuid4().hex}"
f"{TEXT_COLOR_DEFAULT}"
)

Expand Down Expand Up @@ -112,48 +107,7 @@ def still_running():
parser.set_defaults(log_file=f"ssh-yapapi-{now}.log")
args = parser.parse_args()

# This is only required when running on Windows with Python prior to 3.8:
windows_event_loop_fix()

enable_default_logger(
run_golem_example(
main(subnet_tag=args.subnet_tag, driver=args.driver, network=args.network),
log_file=args.log_file,
debug_activity_api=True,
debug_market_api=True,
debug_payment_api=True,
debug_net_api=True,
)

loop = asyncio.get_event_loop()
task = loop.create_task(
main(subnet_tag=args.subnet_tag, driver=args.driver, network=args.network)
)

try:
loop.run_until_complete(task)
except NoPaymentAccountError as e:
handbook_url = (
"https://handbook.golem.network/requestor-tutorials/"
"flash-tutorial-of-requestor-development"
)
print(
f"{TEXT_COLOR_RED}"
f"No payment account initialized for driver `{e.required_driver}` "
f"and network `{e.required_network}`.\n\n"
f"See {handbook_url} on how to initialize payment accounts for a requestor node."
f"{TEXT_COLOR_DEFAULT}"
)
except KeyboardInterrupt:
print(
f"{TEXT_COLOR_YELLOW}"
"Shutting down gracefully, please wait a short while "
"or press Ctrl+C to exit immediately..."
f"{TEXT_COLOR_DEFAULT}"
)
task.cancel()
try:
loop.run_until_complete(task)
print(
f"{TEXT_COLOR_YELLOW}Shutdown completed, thank you for waiting!{TEXT_COLOR_DEFAULT}"
)
except (asyncio.CancelledError, KeyboardInterrupt):
pass
Loading