diff --git a/dapp_runner/__main__.py b/dapp_runner/__main__.py
index 0e8cf7d..bbcdda2 100644
--- a/dapp_runner/__main__.py
+++ b/dapp_runner/__main__.py
@@ -127,6 +127,11 @@ def _get_run_dir(run_id: str) -> Path:
     is_flag=True,
     help="Don't validate and report issues with the manifest, its certificate or signature.",
 )
+@click.option(
+    "--resume",
+    is_flag=True,
+    help="Treat the app descriptor as a suspended application's GAOM state to resume from.",
+)
 def start(
     descriptors: Tuple[Path],
     config: Path,
diff --git a/dapp_runner/api/__main__.py b/dapp_runner/api/__main__.py
index 40755ac..98b23d3 100644
--- a/dapp_runner/api/__main__.py
+++ b/dapp_runner/api/__main__.py
@@ -1,6 +1,7 @@
 """Dapp Runner API."""
+import yaml
 from fastapi import FastAPI
-from fastapi.responses import JSONResponse, RedirectResponse
+from fastapi.responses import JSONResponse, PlainTextResponse, RedirectResponse
 
 from dapp_runner.runner import Runner
 
@@ -18,3 +19,19 @@ async def get_gaom():
     """Retrieve the application's GAOM tree."""
     dapp_dict = Runner.get_instance().dapp.dict()
     return JSONResponse(content=dapp_dict)
+
+
+@app.post("/suspend")
+async def suspend():
+    """
+    Suspend the app.
+
+    Stop the dapp-runner without killing the services.
+    Send back the YAML-encoded GAOM tree state from just before suspension.
+    """
+    runner = Runner.get_instance()
+    dapp_dict = runner.dapp.dict()
+
+    runner.request_suspend()
+
+    return PlainTextResponse(content=yaml.dump(dapp_dict))
diff --git a/dapp_runner/descriptor/dapp.py b/dapp_runner/descriptor/dapp.py
index 74e8fe2..e5448cc 100644
--- a/dapp_runner/descriptor/dapp.py
+++ b/dapp_runner/descriptor/dapp.py
@@ -224,6 +224,9 @@ class NetworkDescriptor(GaomBase):
     owner_ip: Optional[str] = None
     mask: Optional[str] = None
     gateway: Optional[str] = None
+    network_id: Optional[str] = Field(runtime=True)
+    owner_id: Optional[str] = Field(runtime=True)
+    state: Optional[str] = Field(runtime=True, default=None)
 
     class Config:  # noqa: D106
         extra = "forbid"
@@ -231,11 +234,16 @@ class Config:  # noqa: D106
     @classmethod
     def from_network(cls, network: Network):
         """Get a NetworkDescriptor based on yapapi's `Network` object."""
+        network_serialized = network.serialize()
+
         return cls(
             ip=network.network_address,
             owner_ip=network.owner_ip,
             mask=network.netmask,
             gateway=network.gateway,
+            network_id=network_serialized["_network_id"],
+            owner_id=network_serialized["owner_id"],
+            state=network_serialized["state"],
         )
 
 
diff --git a/dapp_runner/runner/__init__.py b/dapp_runner/runner/__init__.py
index c55ba86..faa884c 100644
--- a/dapp_runner/runner/__init__.py
+++ b/dapp_runner/runner/__init__.py
@@ -48,8 +48,10 @@ async def _run_app(
     commands_f: Optional[TextIO],
     silent=False,
     skip_manifest_validation=False,
+    resume=False,
 ):
     """Run the dapp using the Runner."""
+
     config = Config(**config_dict)
     _update_api_config(config, api_config_dict)
 
@@ -60,9 +62,9 @@ async def _run_app(
     r = Runner(config=config, dapp=dapp)
     _print_env_info(r.golem)
     if dapp.meta.name:
-        print(f"Starting app: {green(dapp.meta.name)}\n")
+        print(f"{'Starting' if not resume else 'Resuming'} app: {green(dapp.meta.name)}\n")
 
-    await r.start()
+    await r.start(resume=resume)
     streamer = RunnerStreamer()
     streamer.register_stream(
         r.state_queue, state_f, lambda msg: json.dumps(msg, default=json_encoder)
@@ -88,7 +90,8 @@ async def _run_app(
     max_running_time = timedelta(seconds=config.limits.max_running_time)
 
     logger.info(
-        "Starting app: %s, startup timeout: %s, maximum running time: %s",
+        f"{'Starting' if not resume else 'Resuming'} app: %s, "
+        "startup timeout: %s, maximum running time: %s",
         dapp.meta.name,
         startup_timeout,
         max_running_time,
@@ -112,6 +115,7 @@ async def _run_app(
 
     while (
         r.dapp_started
+        and not r.suspend_requested
         and not r.api_shutdown
         and not _running_time_elapsed(time_started, max_running_time)
     ):
@@ -120,9 +124,16 @@ async def _run_app(
     if _running_time_elapsed(time_started, max_running_time):
         logger.info("Maximum running time: %s elapsed.", max_running_time)
 
-    logger.info("Stopping the application...")
-    await r.stop()
+    if not r.suspend_requested:
+        logger.info("Stopping the application...")
+        await r.stop()
+    else:
+        logger.info("Suspending the application...")
+        await r.suspend()
+
+    logger.info("Stopping streamer...")
     await streamer.stop()
+    logger.info("Streamer stopped...")
 
 
 def start_runner(
@@ -140,6 +151,7 @@ def start_runner(
     stderr: Optional[Path] = None,
     silent=False,
     skip_manifest_validation=False,
+    resume=False,
 ):
     """Launch the runner in an asyncio loop and wait for its shutdown."""
 
@@ -165,14 +177,15 @@ def start_runner(
     loop = asyncio.get_event_loop()
     task = loop.create_task(
         _run_app(
-            config_dict,
-            api_config_dict,
-            dapp_dict,
-            data_f,
-            state_f,
-            commands_f,
-            silent,
-            skip_manifest_validation,
+            config_dict=config_dict,
+            api_config_dict=api_config_dict,
+            dapp_dict=dapp_dict,
+            data_f=data_f,
+            state_f=state_f,
+            commands_f=commands_f,
+            silent=silent,
+            skip_manifest_validation=skip_manifest_validation,
+            resume=resume,
         )
     )
 
diff --git a/dapp_runner/runner/runner.py b/dapp_runner/runner/runner.py
index 31760d5..4932a52 100644
--- a/dapp_runner/runner/runner.py
+++ b/dapp_runner/runner/runner.py
@@ -6,6 +6,7 @@
 
 import uvicorn
 
+from ya_net.exceptions import ApiException
 from yapapi import Golem
 from yapapi.config import ApiConfig
 from yapapi.contrib.service.http_proxy import LocalHttpProxy
@@ -13,7 +14,7 @@
 from yapapi.events import CommandExecuted
 from yapapi.network import Network
 from yapapi.payload import Payload
-from yapapi.services import Cluster, Service, ServiceState
+from yapapi.services import Cluster, Service, ServiceSerialization, ServiceState
 
 from dapp_runner._util import FreePortProvider, cancel_and_await_tasks, utcnow, utcnow_iso_str
 from dapp_runner.descriptor import Config, DappDescriptor
@@ -27,6 +28,7 @@
     ServiceDescriptor,
 )
 
+from .error import RunnerError
 from .payload import get_payload
 from .service import DappService, get_service
 
@@ -60,6 +62,7 @@ class Runner:
     _networks: Dict[str, Network]
     _tasks: List[asyncio.Task]
     _startup_finished: bool
+    suspend_requested: bool
 
     # TODO: Introduce ApplicationState instead of reusing ServiceState
     _desired_app_state: ServiceState
@@ -92,6 +95,7 @@ def __init__(self, config: Config, dapp: DappDescriptor):
         self.state_queue = asyncio.Queue()
         self.command_queue = asyncio.Queue()
         self._startup_finished = False
+        self.suspend_requested = False
 
         self._desired_app_state = ServiceState.pending
         self._report_status_change()
@@ -122,9 +126,26 @@ async def _start_api(self):
         self.api_server = uvicorn.Server(config)
         self._tasks.append(asyncio.create_task(self._serve_api()))
 
-    async def _create_networks(self):
+    async def _create_networks(self, resume=False):
         for name, desc in self.dapp.networks.items():
-            network = await self.golem.create_network(**desc.dict())
+            if resume and desc.network_id:
+                desc_dict = desc.dict()
+                if desc_dict.get("mask"):
+                    desc_dict["ip"] = f"{desc_dict.get('ip')}/{desc_dict.get('mask')}"
+                desc_dict["_network_id"] = desc_dict.pop("network_id")
+                desc_dict["nodes"] = {}
+                try:
+                    network = await self.golem.resume_network(desc_dict)  # type: ignore [arg-type]
+                except ApiException as e:
+                    raise RunnerError(
+                        f"Could not resume network {desc_dict['_network_id']}. "
+                        "Probably it has already been destroyed.",
+                    ) from e
+            else:
+                network = await self.golem.create_network(
+                    **{k: getattr(desc, k) for k in {"ip", "owner_ip", "mask", "gateway"}}
+                )
+
             self._networks[name] = network
             self.dapp.networks[name] = NetworkDescriptor.from_network(network)
 
@@ -167,7 +188,9 @@ async def _start_local_tcp_proxy(self, name: str, service: Service, port_mapping
         port_mapping.local_port = port
         port_mapping.address = proxy_address
 
-    async def _start_service(self, service_name: str, service_descriptor: ServiceDescriptor):
+    async def _start_service(
+        self, service_name: str, service_descriptor: ServiceDescriptor, resume=False
+    ):
         # if this service depends on another, wait until the dependency is up
         if service_descriptor.depends_on:
             for depends_name in service_descriptor.depends_on:
@@ -183,7 +206,27 @@ async def _start_service(self, service_name: str, service_descriptor: ServiceDes
         cluster_class, run_params = await get_service(
             service_name, service_descriptor, self._payloads, self._networks
         )
-        cluster = await self.start_cluster(service_name, cluster_class, run_params)
+        if not resume:
+            cluster = await self.start_cluster(service_name, cluster_class, run_params)
+        else:
+            run_params["instances"] = [
+                ServiceSerialization(
+                    params=params,
+                    activity_id=service_descriptor.activity.id
+                    if service_descriptor.activity
+                    else None,
+                    agreement_id=service_descriptor.agreement.id
+                    if service_descriptor.agreement
+                    else None,
+                    state=service_descriptor.state or ServiceState.pending.value,
+                    network_node=service_descriptor.network_node.dict()
+                    if service_descriptor.network_node
+                    else None,
+                )
+                for params in run_params.pop("instance_params")
+            ]
+            run_params.pop("network_addresses", None)
+            cluster = await self.resume_cluster(service_name, cluster_class, run_params)
 
         # start the tasks for the local proxies so that
         # it doesn't delay the initialization process
@@ -212,13 +255,13 @@ async def _start_service(self, service_name: str, service_descriptor: ServiceDes
                     ]
                 )
 
-    async def _start_services(self):
+    async def _start_services(self, resume=False):
         for service_name, service_descriptor in self.dapp.nodes_prioritized():
-            await self._start_service(service_name, service_descriptor)
+            await self._start_service(service_name, service_descriptor, resume=resume)
 
         self._startup_finished = True
 
-    async def start(self):
+    async def start(self, resume=False):
         """Start the Golem engine and the dapp."""
 
         if self.config.api.enabled:
@@ -232,13 +275,13 @@ async def start(self):
 
         await self.golem.start()
 
-        await self._create_networks()
+        await self._create_networks(resume=resume)
         await self._load_payloads()
 
         # we start services in a separate task,
         # so that the service state can be tracked
         # while the dapp is starting
-        self._tasks.append(asyncio.create_task(self._start_services()))
+        self._tasks.append(asyncio.create_task(self._start_services(resume=resume)))
 
         # launch the incoming command processor
         self._tasks.append(asyncio.create_task(self._listen_incoming_command_queue()))
@@ -249,6 +292,12 @@ async def start_cluster(self, cluster_name, cluster_class, run_params):
         self.clusters[cluster_name] = cluster
         return cluster
 
+    async def resume_cluster(self, cluster_name, cluster_class, run_params):
+        """Resume control over an existing service cluster."""
+        cluster = await self.golem.resume_service(cluster_class, **run_params)
+        self.clusters[cluster_name] = cluster
+        return cluster
+
     @property
     def dapp_state(self) -> Dict[str, Dict[int, ServiceState]]:
         """Return the state of the dapp.
@@ -384,6 +433,8 @@ def _get_app_state_from_nodes(
                 return ServiceState.terminated
             else:
                 return ServiceState.stopping
+        elif self._desired_app_state == ServiceState.suspended:
+            return ServiceState.suspended
 
         # In other cases return pending
         return ServiceState.pending
@@ -449,6 +500,12 @@ def _is_cluster_state(self, cluster_id: str, state: ServiceState) -> bool:
         """Return True if the state of all instances in the cluster is `state`."""
         return all(s.state == state for s in self.clusters[cluster_id].instances)
 
+    async def _stop_proxies(self):
+        """Stop the HTTP and TCP proxies."""
+        proxy_tasks = [p.stop() for p in self._http_proxies.values()]
+        proxy_tasks.extend([p.stop() for p in self._tcp_proxies.values()])
+        await asyncio.gather(*proxy_tasks)
+
     async def stop(self):
         """Stop the dapp and the Golem engine."""
         service_tasks: List[asyncio.Task] = []
@@ -456,9 +513,7 @@ async def stop(self):
         # explicitly mark that we want dapp in terminated state
         self._desired_app_state = ServiceState.terminated
 
-        proxy_tasks = [p.stop() for p in self._http_proxies.values()]
-        proxy_tasks.extend([p.stop() for p in self._tcp_proxies.values()])
-        await asyncio.gather(*proxy_tasks)
+        await self._stop_proxies()
 
         for cluster in self.clusters.values():
             cluster.stop()
@@ -474,3 +529,28 @@ async def stop(self):
         await asyncio.gather(*service_tasks)
 
         await cancel_and_await_tasks(*self._tasks)
+
+    def request_suspend(self):
+        """Signal the runner to suspend its operation."""
+        self.suspend_requested = True
+
+    async def suspend(self):
+        """Suspend the application and stop the Golem engine, without killing the activities."""
+        service_tasks: List[asyncio.Task] = []
+
+        # explicitly mark that we want dapp in suspended state
+        self._desired_app_state = ServiceState.suspended
+
+        await self._stop_proxies()
+
+        for cluster in self.clusters.values():
+            cluster.suspend()
+
+            for s in cluster.instances:
+                service_tasks.extend(s._tasks)
+
+        await self.golem.stop(wait_for_payments=False)
+
+        await asyncio.gather(*service_tasks)
+
+        await cancel_and_await_tasks(*self._tasks)
diff --git a/dapp_runner/runner/service.py b/dapp_runner/runner/service.py
index d3cbe44..e27fb38 100644
--- a/dapp_runner/runner/service.py
+++ b/dapp_runner/runner/service.py
@@ -95,7 +95,7 @@ async def shutdown(self):
             yield script
 
     async def _wait_for_termination(self):
-        while self._previous_state != ServiceState.terminated:
+        while self._previous_state not in {ServiceState.terminated, ServiceState.suspended}:
             await asyncio.sleep(1.0)
             self._report_state_change()
 