From 9b83c8a84e48cf79d0e56345c3a812e544a5e774 Mon Sep 17 00:00:00 2001 From: shadeofblue Date: Fri, 24 Sep 2021 15:04:51 +0200 Subject: [PATCH] Blue/various changes (#674) * simplify the ex `erigon` example Update exescript commands in all examples to use the new interface Add a common "environment info" message for all examples Add a common boilerplate to all examples Add ability to pass parameters to `Script()` through `WorkContext.new_script()` Update default subnet to `devnet-beta` --- README.md | 2 +- examples/blender/blender.py | 90 ++++----------- .../custom_usage_counter.py | 5 +- examples/custom_runtime/custom_runtime.py | 72 ++++++++++++ examples/erigon/erigon.py | 106 ------------------ examples/hello-world/hello.py | 7 +- examples/hello-world/hello_service.py | 17 ++- examples/http-proxy/http_proxy.py | 35 +++--- examples/simple-service-poc/simple_service.py | 49 ++++---- examples/ssh/ssh.py | 16 +-- examples/utils.py | 18 ++- examples/yacat/yacat.py | 63 ++--------- tests/factories/props/__init__.py | 2 +- yapapi/ctx.py | 6 +- yapapi/engine.py | 2 +- yapapi/golem.py | 4 +- yapapi/script/command.py | 2 +- 17 files changed, 198 insertions(+), 298 deletions(-) create mode 100644 examples/custom_runtime/custom_runtime.py delete mode 100644 examples/erigon/erigon.py diff --git a/README.md b/README.md index 2a28321da..f11f76cc4 100644 --- a/README.md +++ b/README.md @@ -198,7 +198,7 @@ It's possible to set various elements of `yagna` configuration through environme - `YAGNA_PAYMENT_NETWORK`, Ethereum network name for `yagna` to use, e.g. `rinkeby` - `YAGNA_PAYMENT_DRIVER`, payment driver name for `yagna` to use, e.g. `zksync` - `YAGNA_PAYMENT_URL`, URL to `yagna` payment API, e.g. `http://localhost:7500/payment-api/v1` -- `YAGNA_SUBNET`, name of the `yagna` sub network to be used, e.g. `devnet-beta.2` +- `YAGNA_SUBNET`, name of the `yagna` sub network to be used, e.g. `devnet-beta` - `YAPAPI_USE_GFTP_CLOSE`, if set to a _truthy_ value (e.g. "1", "Y", "True", "on") then `yapapi` will ask `gftp` to close files when there's no need to publish them any longer. This may greatly reduce the number of files kept open while `yapapi` is running but requires `yagna` diff --git a/examples/blender/blender.py b/examples/blender/blender.py index d38dd044b..cf95f5bbc 100755 --- a/examples/blender/blender.py +++ b/examples/blender/blender.py @@ -1,18 +1,13 @@ #!/usr/bin/env python3 -import asyncio from datetime import datetime, timedelta import pathlib import sys from yapapi import ( Golem, - NoPaymentAccountError, Task, - __version__ as yapapi_version, WorkContext, - windows_event_loop_fix, ) -from yapapi.log import enable_default_logger from yapapi.payload import vm from yapapi.rest.activity import BatchTimeoutError @@ -27,6 +22,8 @@ TEXT_COLOR_YELLOW, TEXT_COLOR_MAGENTA, format_usage, + run_golem_example, + print_env_info, ) @@ -40,12 +37,19 @@ async def main(subnet_tag, payment_driver=None, payment_network=None, show_usage async def worker(ctx: WorkContext, tasks): script_dir = pathlib.Path(__file__).resolve().parent scene_path = str(script_dir / "cubes.blend") - ctx.send_file(scene_path, "/golem/resource/scene.blend") + + # Set timeout for the first script executed on the provider. Usually, 30 seconds + # should be more than enough for computing a single frame of the provided scene, + # however a provider may require more time for the first task if it needs to download + # the VM image first. Once downloaded, the VM image will be cached and other tasks that use + # that image will be computed faster. + script = ctx.new_script(timeout=timedelta(minutes=10)) + script.upload_file(scene_path, "/golem/resource/scene.blend") + async for task in tasks: frame = task.data crops = [{"outfilebasename": "out", "borders_x": [0.0, 1.0], "borders_y": [0.0, 1.0]}] - ctx.send_json( - "/golem/work/params.json", + script.upload_json( { "scene_file": "/golem/resource/scene.blend", "resolution": (400, 300), @@ -58,17 +62,14 @@ async def worker(ctx: WorkContext, tasks): "WORK_DIR": "/golem/work", "OUTPUT_DIR": "/golem/output", }, + "/golem/work/params.json", ) - ctx.run("/golem/entrypoints/run-blender.sh") + + script.run("/golem/entrypoints/run-blender.sh") output_file = f"output_{frame}.png" - ctx.download_file(f"/golem/output/out{frame:04d}.png", output_file) + script.download_file(f"/golem/output/out{frame:04d}.png", output_file) try: - # Set timeout for executing the script on the provider. Usually, 30 seconds - # should be more than enough for computing a single frame, however a provider - # may require more time for the first task if it needs to download a VM image - # first. Once downloaded, the VM image will be cached and other tasks that use - # that image will be computed faster. - yield ctx.commit(timeout=timedelta(minutes=10)) + yield script # TODO: Check if job results are valid # and reject by: task.reject_task(reason = 'invalid file') task.accept_result(result=output_file) @@ -80,6 +81,9 @@ async def worker(ctx: WorkContext, tasks): ) raise + # reinitialize the script which we send to the engine to compute subsequent frames + script = ctx.new_script(timeout=timedelta(minutes=1)) + if show_usage: raw_state = await ctx.get_raw_state() usage = format_usage(await ctx.get_usage()) @@ -118,13 +122,7 @@ async def worker(ctx: WorkContext, tasks): payment_driver=payment_driver, payment_network=payment_network, ) as golem: - - print( - f"yapapi version: {TEXT_COLOR_YELLOW}{yapapi_version}{TEXT_COLOR_DEFAULT}\n" - f"Using subnet: {TEXT_COLOR_YELLOW}{subnet_tag}{TEXT_COLOR_DEFAULT}, " - f"payment driver: {TEXT_COLOR_YELLOW}{golem.payment_driver}{TEXT_COLOR_DEFAULT}, " - f"and network: {TEXT_COLOR_YELLOW}{golem.payment_network}{TEXT_COLOR_DEFAULT}\n" - ) + print_env_info(golem) num_tasks = 0 start_time = datetime.now() @@ -158,52 +156,12 @@ async def worker(ctx: WorkContext, tasks): parser.set_defaults(log_file=f"blender-yapapi-{now}.log") args = parser.parse_args() - # This is only required when running on Windows with Python prior to 3.8: - windows_event_loop_fix() - - enable_default_logger( - log_file=args.log_file, - debug_activity_api=True, - debug_market_api=True, - debug_payment_api=True, - ) - - loop = asyncio.get_event_loop() - task = loop.create_task( + run_golem_example( main( subnet_tag=args.subnet_tag, payment_driver=args.payment_driver, payment_network=args.payment_network, show_usage=args.show_usage, - ) + ), + log_file=args.log_file, ) - - try: - loop.run_until_complete(task) - except NoPaymentAccountError as e: - handbook_url = ( - "https://handbook.golem.network/requestor-tutorials/" - "flash-tutorial-of-requestor-development" - ) - print( - f"{TEXT_COLOR_RED}" - f"No payment account initialized for driver `{e.required_driver}` " - f"and network `{e.required_network}`.\n\n" - f"See {handbook_url} on how to initialize payment accounts for a requestor node." - f"{TEXT_COLOR_DEFAULT}" - ) - except KeyboardInterrupt: - print( - f"{TEXT_COLOR_YELLOW}" - "Shutting down gracefully, please wait a short while " - "or press Ctrl+C to exit immediately..." - f"{TEXT_COLOR_DEFAULT}" - ) - task.cancel() - try: - loop.run_until_complete(task) - print( - f"{TEXT_COLOR_YELLOW}Shutdown completed, thank you for waiting!{TEXT_COLOR_DEFAULT}" - ) - except (asyncio.CancelledError, KeyboardInterrupt): - pass diff --git a/examples/custom-usage-counter/custom_usage_counter.py b/examples/custom-usage-counter/custom_usage_counter.py index 34b5010aa..8bc9e85e3 100755 --- a/examples/custom-usage-counter/custom_usage_counter.py +++ b/examples/custom-usage-counter/custom_usage_counter.py @@ -44,8 +44,9 @@ async def run(self): start_time = datetime.now() print(f"service {self.id} running on '{self.provider_name}'...") while datetime.now() < start_time + timedelta(seconds=self._running_time_sec): - self._ctx.run("sleep", "1000") - yield self._ctx.commit() + script = self._ctx.new_script() + script.run("sleep", "1000") + yield script usage: ActivityUsage = await self._ctx.get_usage() cost = await self._ctx.get_cost() print(f"total cost so far: {cost}; activity usage: {usage.current_usage}") diff --git a/examples/custom_runtime/custom_runtime.py b/examples/custom_runtime/custom_runtime.py new file mode 100644 index 000000000..ce918feb0 --- /dev/null +++ b/examples/custom_runtime/custom_runtime.py @@ -0,0 +1,72 @@ +import asyncio + +from dataclasses import dataclass + +from yapapi.props.base import prop, constraint +from yapapi.props import inf + +from yapapi.payload import Payload +from yapapi import Golem +from yapapi.services import Service + + +RUNTIME_NAME = "my-runtime" +SOME_CUSTOM_PROPERTY = "golem.srv.app.eth.network" + +# The `CustomPayload` in this example is an arbitrary definition of some demand +# that the requestor might wish to be fulfilled by the providers on the Golem network +# +# This payload must correspond with a runtime running on providers, either a runtime +# written by some other party, or developed alongside its requestor counterpart using +# the Runtime SDK (https://github.com/golemfactory/ya-runtime-sdk/) +# +# It is up to the author of said runtime to define any additional properties that would +# describe the requestor's demand and `custom_property` is just an example. + + +@dataclass +class CustomPayload(Payload): + custom_property: str = prop(SOME_CUSTOM_PROPERTY) + + runtime: str = constraint(inf.INF_RUNTIME_NAME, default=RUNTIME_NAME) + min_mem_gib: float = constraint(inf.INF_MEM, operator=">=", default=16) + min_storage_gib: float = constraint(inf.INF_STORAGE, operator=">=", default=1024) + + +class CustomRuntimeService(Service): + @staticmethod + async def get_payload(): + return CustomPayload(custom_property="whatever") + + +async def main(subnet_tag, driver=None, network=None): + + async with Golem( + budget=10.0, + subnet_tag=subnet_tag, + payment_driver=driver, + payment_network=network, + ) as golem: + cluster = await golem.run_service( + CustomRuntimeService, + ) + + def instances(): + return [f"{s.provider_name}: {s.state.value}" for s in cluster.instances] + + cnt = 0 + while cnt < 10: + print(f"instances: {instances()}") + await asyncio.sleep(3) + + cluster.stop() + + cnt = 0 + while cnt < 10 and any(s.is_available for s in cluster.instances): + print(f"instances: {instances()}") + await asyncio.sleep(1) + + print(f"instances: {instances()}") + + +asyncio.run(main(None)) diff --git a/examples/erigon/erigon.py b/examples/erigon/erigon.py deleted file mode 100644 index 70b5b93be..000000000 --- a/examples/erigon/erigon.py +++ /dev/null @@ -1,106 +0,0 @@ -import asyncio - -from dataclasses import dataclass - -from yapapi.props.base import prop, constraint -from yapapi.props import inf - -from yapapi.payload import Payload -from yapapi.executor import Golem -from yapapi.executor.services import Service -from yapapi.log import enable_default_logger - - -TURBOGETH_RUNTIME_NAME = "turbogeth-managed" -PROP_ERIGON_ETHEREUM_NETWORK = "golem.srv.app.eth.network" - - -@dataclass -class ErigonPayload(Payload): - payment_network: str = prop(PROP_ERIGON_ETHEREUM_NETWORK) - - runtime: str = constraint(inf.INF_RUNTIME_NAME, default=TURBOGETH_RUNTIME_NAME) - min_mem_gib: float = constraint(inf.INF_MEM, operator=">=", default=16) - min_storage_gib: float = constraint(inf.INF_STORAGE, operator=">=", default=1024) - - -class ErigonService(Service): - credentials = None - - def post_init(self): - self.credentials = {} - - def __repr__(self): - srv_repr = super().__repr__() - return f"{srv_repr}, credentials: {self.credentials}" - - @staticmethod - async def get_payload(): - return ErigonPayload(payment_network="rinkeby") - - async def start(self): - deploy_idx = self.ctx.deploy() - self.ctx.start() - future_results = yield self.ctx.commit() - results = await future_results - self.credentials = "RECEIVED" or results[deploy_idx] # (NORMALLY, WOULD BE PARSED) - - async def run(self): - - while True: - print(f"service {self.ctx.id} running on {self.ctx.provider_name} ... ") - signal = self._listen_nowait() - if signal and signal.message == "go": - self.ctx.run("go!") - yield self.ctx.commit() - else: - await asyncio.sleep(1) - yield - - async def shutdown(self): - self.ctx.download_file("some/service/state", "temp/path") - yield self.ctx.commit() - - -async def main(subnet_tag, payment_driver=None, payment_network=None): - - async with Golem( - budget=10.0, - subnet_tag=subnet_tag, - payment_driver=payment_driver, - payment_network=payment_network, - ) as golem: - cluster = await golem.run_service( - ErigonService, - num_instances=1, - ) - - def instances(): - return [{s.ctx.id, s.state.value} for s in cluster.instances] - - def still_running(): - return any([s for s in cluster.instances if s.is_available]) - - cnt = 0 - while cnt < 10: - print(f"instances: {instances()}") - await asyncio.sleep(3) - cnt += 1 - if cnt == 3: - if len(cluster.instances) > 1: - cluster.instances[0].send_message_nowait("go") - - for s in cluster.instances: - cluster.stop_instance(s) - - print(f"instances: {instances()}") - - cnt = 0 - while cnt < 10 and still_running(): - print(f"instances: {instances()}") - await asyncio.sleep(1) - - print(f"instances: {instances()}") - - -asyncio.run(main(None)) diff --git a/examples/hello-world/hello.py b/examples/hello-world/hello.py index 5dabd352e..8955f3a40 100755 --- a/examples/hello-world/hello.py +++ b/examples/hello-world/hello.py @@ -9,9 +9,10 @@ async def worker(context: WorkContext, tasks: AsyncIterable[Task]): async for task in tasks: - context.run("/bin/sh", "-c", "date") + script = context.new_script() + script.run("/bin/sh", "-c", "date") - future_results = yield context.commit() + future_results = yield script results = await future_results task.accept_result(result=results[-1]) @@ -23,7 +24,7 @@ async def main(): tasks = [Task(data=None)] - async with Golem(budget=1.0, subnet_tag="devnet-beta.2") as golem: + async with Golem(budget=1.0, subnet_tag="devnet-beta") as golem: async for completed in golem.execute_tasks(worker, tasks, payload=package): print(completed.result.stdout) diff --git a/examples/hello-world/hello_service.py b/examples/hello-world/hello_service.py index 286820a27..63e3c26c4 100755 --- a/examples/hello-world/hello_service.py +++ b/examples/hello-world/hello_service.py @@ -19,30 +19,35 @@ async def get_payload(): ) async def start(self): + async for script in super().start(): + yield script + # every `DATE_POLL_INTERVAL` write output of `date` to `DATE_OUTPUT_PATH` - self._ctx.run( + script = self._ctx.new_script() + script.run( "/bin/sh", "-c", f"while true; do date > {DATE_OUTPUT_PATH}; sleep {REFRESH_INTERVAL_SEC}; done &", ) - yield self._ctx.commit() + yield script async def run(self): while True: await asyncio.sleep(REFRESH_INTERVAL_SEC) - self._ctx.run( + script = self._ctx.new_script() + script.run( "/bin/sh", "-c", f"cat {DATE_OUTPUT_PATH}", ) - future_results = yield self._ctx.commit() + future_results = yield script results = await future_results - print(results[0].stdout.strip()) + print(results[0].stdout.strip() if results[0].stdout else "") async def main(): - async with Golem(budget=1.0, subnet_tag="devnet-beta.2") as golem: + async with Golem(budget=1.0, subnet_tag="devnet-beta") as golem: cluster = await golem.run_service(DateService, num_instances=1) start_time = datetime.now() diff --git a/examples/http-proxy/http_proxy.py b/examples/http-proxy/http_proxy.py index 5d508d0a2..8802b0e16 100644 --- a/examples/http-proxy/http_proxy.py +++ b/examples/http-proxy/http_proxy.py @@ -31,6 +31,7 @@ TEXT_COLOR_GREEN, TEXT_COLOR_YELLOW, run_golem_example, + print_env_info, ) STARTING_TIMEOUT = timedelta(minutes=4) @@ -80,20 +81,23 @@ async def get_payload(): ) async def start(self): - async for s in super().start(): - yield s - - s = self._ctx.new_script() - s.run("/docker-entrypoint.sh") - s.run("/bin/chmod", "a+x", "/") + # perform the initialization of the Service + # (which includes sending the network details within the `deploy` command) + async for script in super().start(): + yield script + + # start the remote HTTP server and give it some content to serve in the `index.html` + script = self._ctx.new_script() + script.run("/docker-entrypoint.sh") + script.run("/bin/chmod", "a+x", "/") msg = f"Hello from inside Golem!\n... running on {self.provider_name}" - s.run( + script.run( "/bin/sh", "-c", f"echo {shlex.quote(msg)} > /usr/share/nginx/html/index.html", ) - s.run("/usr/sbin/nginx"), - yield s + script.run("/usr/sbin/nginx"), + yield script # we don't need to implement `run` since, after the service is started, # all communication is performed through the VPN @@ -141,13 +145,7 @@ async def main( payment_driver=payment_driver, payment_network=payment_network, ) as golem: - - print( - f"yapapi version: {TEXT_COLOR_YELLOW}{yapapi_version}{TEXT_COLOR_DEFAULT}\n" - f"Using subnet: {TEXT_COLOR_YELLOW}{subnet_tag}{TEXT_COLOR_DEFAULT}, " - f"payment driver: {TEXT_COLOR_YELLOW}{golem.payment_driver}{TEXT_COLOR_DEFAULT}, " - f"and network: {TEXT_COLOR_YELLOW}{golem.payment_network}{TEXT_COLOR_DEFAULT}\n" - ) + print_env_info(golem) commissioning_time = datetime.now() @@ -157,9 +155,6 @@ async def main( def instances(): return [f"{s.provider_name}: {s.state.value}" for s in cluster.instances] - def still_running(): - return any([s for s in cluster.instances if s.is_available]) - def still_starting(): return len(cluster.instances) < num_instances or any( s.state == ServiceState.starting for s in cluster.instances @@ -201,7 +196,7 @@ def still_starting(): cluster.stop() cnt = 0 - while cnt < 3 and still_running(): + while cnt < 3 and any(s.is_available for s in cluster.instances): print(instances()) await asyncio.sleep(5) cnt += 1 diff --git a/examples/simple-service-poc/simple_service.py b/examples/simple-service-poc/simple_service.py index 5ea69016c..4f8014193 100755 --- a/examples/simple-service-poc/simple_service.py +++ b/examples/simple-service-poc/simple_service.py @@ -32,6 +32,7 @@ TEXT_COLOR_YELLOW, TEXT_COLOR_MAGENTA, format_usage, + print_env_info, ) STARTING_TIMEOUT = timedelta(minutes=4) @@ -58,20 +59,26 @@ async def get_payload(): ) async def start(self): - # handler responsible for starting the service + """handler responsible for starting the service.""" + + # perform the initialization of the Service async for script in super().start(): yield script - self._ctx.run(self.SIMPLE_SERVICE_CTL, "--start") - yield self._ctx.commit() + + # start the service + script = self._ctx.new_script() + script.run(self.SIMPLE_SERVICE_CTL, "--start") + yield script async def run(self): # handler responsible for providing the required interactions while the service is running while True: await asyncio.sleep(10) - self._ctx.run(self.SIMPLE_SERVICE, "--stats") # idx 0 - self._ctx.run(self.SIMPLE_SERVICE, "--plot", "dist") # idx 1 + script = self._ctx.new_script() + script.run(self.SIMPLE_SERVICE, "--stats") # idx 0 + script.run(self.SIMPLE_SERVICE, "--plot", "dist") # idx 1 - future_results = yield self._ctx.commit() + future_results = yield script results = await future_results stats = results[0].stdout.strip() plot = results[1].stdout.strip().strip('"') @@ -82,12 +89,9 @@ async def run(self): print( f"{TEXT_COLOR_CYAN}downloading plot: {plot} to {plot_filename}{TEXT_COLOR_DEFAULT}" ) - self._ctx.download_file( - plot, str(pathlib.Path(__file__).resolve().parent / plot_filename) - ) - - steps = self._ctx.commit() - yield steps + script = self._ctx.new_script() + script.download_file(plot, str(pathlib.Path(__file__).resolve().parent / plot_filename)) + yield script if self._show_usage: print( @@ -108,8 +112,10 @@ async def run(self): async def shutdown(self): # handler reponsible for executing operations on shutdown - self._ctx.run(self.SIMPLE_SERVICE_CTL, "--stop") - yield self._ctx.commit() + script = self._ctx.new_script() + script.run(self.SIMPLE_SERVICE_CTL, "--stop") + yield script + if self._show_usage: print( f"{TEXT_COLOR_MAGENTA}" @@ -132,13 +138,7 @@ async def main( payment_driver=payment_driver, payment_network=payment_network, ) as golem: - - print( - f"yapapi version: {TEXT_COLOR_YELLOW}{yapapi_version}{TEXT_COLOR_DEFAULT}\n" - f"Using subnet: {TEXT_COLOR_YELLOW}{subnet_tag}{TEXT_COLOR_DEFAULT}, " - f"payment driver: {TEXT_COLOR_YELLOW}{golem.payment_driver}{TEXT_COLOR_DEFAULT}, " - f"and network: {TEXT_COLOR_YELLOW}{golem.payment_network}{TEXT_COLOR_DEFAULT}\n" - ) + print_env_info(golem) commissioning_time = datetime.now() @@ -164,12 +164,9 @@ async def main( def instances(): return [f"{s.name}: {s.state.value} on {s.provider_name}" for s in cluster.instances] - def still_running(): - return any([s for s in cluster.instances if s.is_available]) - def still_starting(): return len(cluster.instances) < num_instances or any( - [s for s in cluster.instances if s.state == ServiceState.starting] + s.state == ServiceState.starting for s in cluster.instances ) # wait until instances are started @@ -198,7 +195,7 @@ def still_starting(): # wait for instances to stop cnt = 0 - while cnt < 10 and still_running(): + while cnt < 10 and any(s.is_available for s in cluster.instances): print(f"instances: {instances()}") await asyncio.sleep(5) diff --git a/examples/ssh/ssh.py b/examples/ssh/ssh.py index 12b840149..ad823b212 100755 --- a/examples/ssh/ssh.py +++ b/examples/ssh/ssh.py @@ -35,6 +35,8 @@ async def get_payload(): image_hash="ea233c6774b1621207a48e10b46e3e1f944d881911f499f5cbac546a", min_mem_gib=0.5, min_storage_gib=2.0, + # we're adding an additional constraint to only select those nodes that + # are offering VPN-capable VM runtimes so that we can connect them to the VPN capabilities=[vm.VM_CAPS_VPN], ) @@ -42,10 +44,11 @@ async def run(self): connection_uri = self.network_node.get_websocket_uri(22) app_key = self.cluster._engine._api_config.app_key - self._ctx.run("/bin/bash", "-c", "syslogd") - self._ctx.run("/bin/bash", "-c", "ssh-keygen -A") - self._ctx.run("/bin/bash", "-c", "/usr/sbin/sshd") - yield self._ctx.commit() + script = self._ctx.new_script() + script.run("/bin/bash", "-c", "syslogd") + script.run("/bin/bash", "-c", "ssh-keygen -A") + script.run("/bin/bash", "-c", "/usr/sbin/sshd") + yield script print( "Connect with:\n" @@ -82,9 +85,6 @@ async def main(subnet_tag, payment_driver=None, payment_network=None): def instances(): return [f"{s.provider_name}: {s.state.value}" for s in cluster.instances] - def still_running(): - return any([s for s in cluster.instances if s.is_available]) - while True: print(instances()) try: @@ -95,7 +95,7 @@ def still_running(): cluster.stop() cnt = 0 - while cnt < 3 and still_running(): + while cnt < 3 and any(s.is_available for s in cluster.instances): print(instances()) await asyncio.sleep(5) cnt += 1 diff --git a/examples/utils.py b/examples/utils.py index e85424e61..e3f4bfa6b 100644 --- a/examples/utils.py +++ b/examples/utils.py @@ -7,7 +7,12 @@ import colorama # type: ignore -from yapapi import windows_event_loop_fix, NoPaymentAccountError +from yapapi import ( + Golem, + windows_event_loop_fix, + NoPaymentAccountError, + __version__ as yapapi_version, +) from yapapi.log import enable_default_logger @@ -35,7 +40,7 @@ def build_parser(description: str) -> argparse.ArgumentParser: parser.add_argument( "--payment-network", "--network", help="Payment network name, for example `rinkeby`" ) - parser.add_argument("--subnet-tag", help="Subnet name, for example `devnet-beta.2`") + parser.add_argument("--subnet-tag", help="Subnet name, for example `devnet-beta`") parser.add_argument( "--log-file", default=str(default_log_path), @@ -51,6 +56,15 @@ def format_usage(usage): } +def print_env_info(golem: Golem): + print( + f"yapapi version: {TEXT_COLOR_YELLOW}{yapapi_version}{TEXT_COLOR_DEFAULT}\n" + f"Using subnet: {TEXT_COLOR_YELLOW}{golem.subnet_tag}{TEXT_COLOR_DEFAULT}, " + f"payment driver: {TEXT_COLOR_YELLOW}{golem.payment_driver}{TEXT_COLOR_DEFAULT}, " + f"and network: {TEXT_COLOR_YELLOW}{golem.payment_network}{TEXT_COLOR_DEFAULT}\n" + ) + + def run_golem_example(example_main, log_file=None): # This is only required when running on Windows with Python prior to 3.8: windows_event_loop_fix() diff --git a/examples/yacat/yacat.py b/examples/yacat/yacat.py index 764eae9c0..dfb24d3f6 100755 --- a/examples/yacat/yacat.py +++ b/examples/yacat/yacat.py @@ -8,9 +8,8 @@ from tempfile import gettempdir from typing import AsyncIterable, List, Optional -from yapapi import Golem, NoPaymentAccountError, Task, WorkContext, windows_event_loop_fix +from yapapi import Golem, Task, WorkContext from yapapi.events import CommandExecuted -from yapapi.log import enable_default_logger from yapapi.payload import vm from yapapi.rest.activity import CommandExecutionError @@ -24,6 +23,8 @@ TEXT_COLOR_GREEN, TEXT_COLOR_RED, TEXT_COLOR_YELLOW, + print_env_info, + run_golem_example, ) HASHCAT_ATTACK_MODE = 3 # stands for mask attack, hashcat -a option @@ -77,10 +78,11 @@ async def compute_keyspace(context: WorkContext, tasks: AsyncIterable[Task]): """ async for task in tasks: cmd = f"hashcat --keyspace " f"-a {HASHCAT_ATTACK_MODE} -m {args.hash_type} {args.mask}" - context.run("/bin/bash", "-c", cmd) + s = context.new_script(timeout=KEYSPACE_TIMEOUT) + s.run("/bin/bash", "-c", cmd) try: - future_result = yield context.commit(timeout=KEYSPACE_TIMEOUT) + future_result = yield s # each item is the result of a single command on the provider (including setup commands) result: List[CommandExecuted] = await future_result @@ -106,12 +108,13 @@ async def perform_mask_attack(ctx: WorkContext, tasks: AsyncIterable[Task]): output_name = f"yacat_{skip}.potfile" worker_output_path = f"/golem/output/{output_name}" - ctx.run(f"/bin/sh", "-c", _make_attack_command(skip, limit, worker_output_path)) + script = ctx.new_script(timeout=MASK_ATTACK_TIMEOUT) + script.run(f"/bin/sh", "-c", _make_attack_command(skip, limit, worker_output_path)) try: output_file = Path(gettempdir()) / output_name - ctx.download_file(worker_output_path, str(output_file)) + script.download_file(worker_output_path, str(output_file)) - yield ctx.commit(timeout=MASK_ATTACK_TIMEOUT) + yield script with output_file.open() as f: result = f.readline() @@ -154,12 +157,7 @@ async def main(args): payment_driver=args.payment_driver, payment_network=args.payment_network, ) as golem: - - print( - f"Using subnet: {TEXT_COLOR_YELLOW}{args.subnet_tag}{TEXT_COLOR_DEFAULT}, " - f"payment driver: {TEXT_COLOR_YELLOW}{golem.payment_driver}{TEXT_COLOR_DEFAULT}, " - f"and network: {TEXT_COLOR_YELLOW}{golem.payment_network}{TEXT_COLOR_DEFAULT}\n" - ) + print_env_info(golem) start_time = datetime.now() @@ -213,41 +211,4 @@ async def main(args): if __name__ == "__main__": args = arg_parser.parse_args() - - # This is only required when running on Windows with Python prior to 3.8: - windows_event_loop_fix() - - enable_default_logger(log_file=args.log_file) - - loop = asyncio.get_event_loop() - task = loop.create_task(main(args)) - - try: - loop.run_until_complete(task) - except NoPaymentAccountError as e: - handbook_url = ( - "https://handbook.golem.network/requestor-tutorials/" - "flash-tutorial-of-requestor-development" - ) - print( - f"{TEXT_COLOR_RED}" - f"No payment account initialized for driver `{e.required_driver}` " - f"and network `{e.required_network}`.\n\n" - f"See {handbook_url} on how to initialize payment accounts for a requestor node." - f"{TEXT_COLOR_DEFAULT}" - ) - except KeyboardInterrupt: - print( - f"{TEXT_COLOR_YELLOW}" - "Shutting down gracefully, please wait a short while " - "or press Ctrl+C to exit immediately..." - f"{TEXT_COLOR_DEFAULT}" - ) - task.cancel() - try: - loop.run_until_complete(task) - print( - f"{TEXT_COLOR_YELLOW}Shutdown completed, thank you for waiting!{TEXT_COLOR_DEFAULT}" - ) - except KeyboardInterrupt: - pass + run_golem_example(main(args), log_file=args.log_file) diff --git a/tests/factories/props/__init__.py b/tests/factories/props/__init__.py index 0b7c3362b..f0e329997 100644 --- a/tests/factories/props/__init__.py +++ b/tests/factories/props/__init__.py @@ -8,4 +8,4 @@ class Meta: model = NodeInfo name = factory.Faker("pystr") - subnet_tag = "devnet-beta.1" + subnet_tag = "devnet-beta" diff --git a/yapapi/ctx.py b/yapapi/ctx.py index d998be71e..a270fad66 100644 --- a/yapapi/ctx.py +++ b/yapapi/ctx.py @@ -129,13 +129,15 @@ def _payment_model(self) -> ComLinear: return self.__payment_model - def new_script(self): + def new_script( + self, timeout: Optional[timedelta] = None, wait_for_results: bool = True + ) -> Script: """Create an instance of :class:`~yapapi.script.Script` attached to this :class:`WorkContext` instance. This is equivalent to calling `Script(work_context)`. This method is intended to provide a direct link between the two object instances. """ - return Script(self) + return Script(self, timeout=timeout, wait_for_results=wait_for_results) @deprecated(version="0.7.0", reason="please use a Script object via WorkContext.new_script") def deploy(self, **kwargs) -> Awaitable[CommandExecuted]: diff --git a/yapapi/engine.py b/yapapi/engine.py index ec95b1245..416e35658 100644 --- a/yapapi/engine.py +++ b/yapapi/engine.py @@ -57,7 +57,7 @@ DEFAULT_DRIVER: str = os.getenv("YAGNA_PAYMENT_DRIVER", "zksync").lower() DEFAULT_NETWORK: str = os.getenv("YAGNA_PAYMENT_NETWORK", "rinkeby").lower() -DEFAULT_SUBNET: Optional[str] = os.getenv("YAGNA_SUBNET", "devnet-beta.2") +DEFAULT_SUBNET: Optional[str] = os.getenv("YAGNA_SUBNET", "devnet-beta") logger = logging.getLogger("yapapi.executor") diff --git a/yapapi/golem.py b/yapapi/golem.py index 401baa8d8..e303bd823 100644 --- a/yapapi/golem.py +++ b/yapapi/golem.py @@ -265,7 +265,7 @@ async def worker(context: WorkContext, tasks: AsyncIterable[Task]): image_hash="d646d7b93083d817846c2ae5c62c72ca0507782385a2e29291a3d376", ) - async with Golem(budget=1.0, subnet_tag="devnet-beta.2") as golem: + async with Golem(budget=1.0, subnet_tag="devnet-beta") as golem: async for completed in golem.execute_tasks(worker, [Task(data=None)], payload=package): print(completed.result.stdout) @@ -358,7 +358,7 @@ async def run(self): async def main(): - async with Golem(budget=1.0, subnet_tag="devnet-beta.2") as golem: + async with Golem(budget=1.0, subnet_tag="devnet-beta") as golem: cluster = await golem.run_service(DateService, num_instances=1) start_time = datetime.now() diff --git a/yapapi/script/command.py b/yapapi/script/command.py index 2c1d342b2..b249e1e38 100644 --- a/yapapi/script/command.py +++ b/yapapi/script/command.py @@ -113,7 +113,7 @@ def __init__(self, data: bytes, dst_path: str): self._data: Optional[bytes] = data async def _do_upload(self, storage: StorageProvider) -> Source: - assert self._data is not None, "buffer unintialized" + assert self._data is not None, f"{self}: buffer unintialized" src = await storage.upload_bytes(self._data) self._data = None return src