Skip to content

Commit

Permalink
Merge pull request #118 from golemfactory/blue/suspend-resume
Browse files Browse the repository at this point in the history
Suspend and Resume
  • Loading branch information
shadeofblue authored Jul 7, 2023
2 parents ce0cb03 + f542f58 commit b1a15ac
Show file tree
Hide file tree
Showing 6 changed files with 151 additions and 28 deletions.
5 changes: 5 additions & 0 deletions dapp_runner/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,11 @@ def _get_run_dir(run_id: str) -> Path:
is_flag=True,
help="Don't validate and report issues with the manifest, its certificate or signature.",
)
@click.option(
"--resume",
is_flag=True,
help="Treat the app descriptor a suspended application's GAOM state to resume from.",
)
def start(
descriptors: Tuple[Path],
config: Path,
Expand Down
19 changes: 18 additions & 1 deletion dapp_runner/api/__main__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""Dapp Runner API."""
import yaml
from fastapi import FastAPI
from fastapi.responses import JSONResponse, RedirectResponse
from fastapi.responses import JSONResponse, PlainTextResponse, RedirectResponse

from dapp_runner.runner import Runner

Expand All @@ -18,3 +19,19 @@ async def get_gaom():
"""Retrieve the application's GAOM tree."""
dapp_dict = Runner.get_instance().dapp.dict()
return JSONResponse(content=dapp_dict)


@app.post("/suspend")
async def suspend():
    """Suspend the app and return its last GAOM state as YAML.

    The GAOM tree is captured first and only then is the runner asked to
    suspend, so the response describes the app as it was just before the
    suspension. The dapp-runner is stopped without killing the services.
    """
    active_runner = Runner.get_instance()

    # take the snapshot before flagging the suspension
    gaom_snapshot = active_runner.dapp.dict()
    active_runner.request_suspend()

    yaml_payload = yaml.dump(gaom_snapshot)
    return PlainTextResponse(content=yaml_payload)
8 changes: 8 additions & 0 deletions dapp_runner/descriptor/dapp.py
Original file line number Diff line number Diff line change
Expand Up @@ -224,18 +224,26 @@ class NetworkDescriptor(GaomBase):
owner_ip: Optional[str] = None
mask: Optional[str] = None
gateway: Optional[str] = None
network_id: Optional[str] = Field(runtime=True)
owner_id: Optional[str] = Field(runtime=True)
state: Optional[str] = Field(runtime=True, default=None)

class Config: # noqa: D106
extra = "forbid"

@classmethod
def from_network(cls, network: Network):
    """Build a NetworkDescriptor out of a yapapi `Network` object.

    The addressing fields are read from the network's attributes, while the
    network id, owner id and state are taken from its serialized form.
    """
    serialized = network.serialize()

    descriptor_fields = {
        "ip": network.network_address,
        "owner_ip": network.owner_ip,
        "mask": network.netmask,
        "gateway": network.gateway,
        # runtime fields, available only through the serialized network
        "network_id": serialized["_network_id"],
        "owner_id": serialized["owner_id"],
        "state": serialized["state"],
    }
    return cls(**descriptor_fields)


Expand Down
39 changes: 26 additions & 13 deletions dapp_runner/runner/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,10 @@ async def _run_app(
commands_f: Optional[TextIO],
silent=False,
skip_manifest_validation=False,
resume=False,
):
"""Run the dapp using the Runner."""

config = Config(**config_dict)
_update_api_config(config, api_config_dict)

Expand All @@ -60,9 +62,9 @@ async def _run_app(
r = Runner(config=config, dapp=dapp)
_print_env_info(r.golem)
if dapp.meta.name:
print(f"Starting app: {green(dapp.meta.name)}\n")
print(f"{'Starting' if not resume else 'Resuming'} app: {green(dapp.meta.name)}\n")

await r.start()
await r.start(resume=resume)
streamer = RunnerStreamer()
streamer.register_stream(
r.state_queue, state_f, lambda msg: json.dumps(msg, default=json_encoder)
Expand All @@ -88,7 +90,8 @@ async def _run_app(
max_running_time = timedelta(seconds=config.limits.max_running_time)

logger.info(
"Starting app: %s, startup timeout: %s, maximum running time: %s",
f"{'Starting' if not resume else 'Resuming'} app: %s, "
"startup timeout: %s, maximum running time: %s",
dapp.meta.name,
startup_timeout,
max_running_time,
Expand All @@ -112,6 +115,7 @@ async def _run_app(

while (
r.dapp_started
and not r.suspend_requested
and not r.api_shutdown
and not _running_time_elapsed(time_started, max_running_time)
):
Expand All @@ -120,9 +124,16 @@ async def _run_app(
if _running_time_elapsed(time_started, max_running_time):
logger.info("Maximum running time: %s elapsed.", max_running_time)

logger.info("Stopping the application...")
await r.stop()
if not r.suspend_requested:
logger.info("Stopping the application...")
await r.stop()
else:
logger.info("Suspending the application...")
await r.suspend()

logger.info("Stopping streamer...")
await streamer.stop()
logger.info("Streamer stopped...")


def start_runner(
Expand All @@ -140,6 +151,7 @@ def start_runner(
stderr: Optional[Path] = None,
silent=False,
skip_manifest_validation=False,
resume=False,
):
"""Launch the runner in an asyncio loop and wait for its shutdown."""

Expand All @@ -165,14 +177,15 @@ def start_runner(
loop = asyncio.get_event_loop()
task = loop.create_task(
_run_app(
config_dict,
api_config_dict,
dapp_dict,
data_f,
state_f,
commands_f,
silent,
skip_manifest_validation,
config_dict=config_dict,
api_config_dict=api_config_dict,
dapp_dict=dapp_dict,
data_f=data_f,
state_f=state_f,
commands_f=commands_f,
silent=silent,
skip_manifest_validation=skip_manifest_validation,
resume=resume,
)
)

Expand Down
106 changes: 93 additions & 13 deletions dapp_runner/runner/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,15 @@

import uvicorn

from ya_net.exceptions import ApiException
from yapapi import Golem
from yapapi.config import ApiConfig
from yapapi.contrib.service.http_proxy import LocalHttpProxy
from yapapi.contrib.service.socket_proxy import SocketProxy
from yapapi.events import CommandExecuted
from yapapi.network import Network
from yapapi.payload import Payload
from yapapi.services import Cluster, Service, ServiceState
from yapapi.services import Cluster, Service, ServiceSerialization, ServiceState

from dapp_runner._util import FreePortProvider, cancel_and_await_tasks, utcnow, utcnow_iso_str
from dapp_runner.descriptor import Config, DappDescriptor
Expand All @@ -27,6 +28,7 @@
ServiceDescriptor,
)

from .error import RunnerError
from .payload import get_payload
from .service import DappService, get_service

Expand Down Expand Up @@ -60,6 +62,7 @@ class Runner:
_networks: Dict[str, Network]
_tasks: List[asyncio.Task]
_startup_finished: bool
suspend_requested: bool

# TODO: Introduce ApplicationState instead of reusing ServiceState
_desired_app_state: ServiceState
Expand Down Expand Up @@ -92,6 +95,7 @@ def __init__(self, config: Config, dapp: DappDescriptor):
self.state_queue = asyncio.Queue()
self.command_queue = asyncio.Queue()
self._startup_finished = False
self.suspend_requested = False
self._desired_app_state = ServiceState.pending

self._report_status_change()
Expand Down Expand Up @@ -122,9 +126,26 @@ async def _start_api(self):
self.api_server = uvicorn.Server(config)
self._tasks.append(asyncio.create_task(self._serve_api()))

async def _create_networks(self):
async def _create_networks(self, resume=False):
for name, desc in self.dapp.networks.items():
network = await self.golem.create_network(**desc.dict())
if resume and desc.network_id:
desc_dict = desc.dict()
if desc_dict.get("mask"):
desc_dict["ip"] = f"{desc_dict.get('ip')}/{desc_dict.get('mask')}"
desc_dict["_network_id"] = desc_dict.pop("network_id")
desc_dict["nodes"] = {}
try:
network = await self.golem.resume_network(desc_dict) # type: ignore [arg-type]
except ApiException:
raise RunnerError(
f"Could not resume network {desc_dict['_network_id']}. "
"Probably it has already been destroyed.",
)
else:
network = await self.golem.create_network(
**{k: getattr(desc, k) for k in {"ip", "owner_ip", "mask", "gateway"}}
)

self._networks[name] = network

self.dapp.networks[name] = NetworkDescriptor.from_network(network)
Expand Down Expand Up @@ -167,7 +188,9 @@ async def _start_local_tcp_proxy(self, name: str, service: Service, port_mapping
port_mapping.local_port = port
port_mapping.address = proxy_address

async def _start_service(self, service_name: str, service_descriptor: ServiceDescriptor):
async def _start_service(
self, service_name: str, service_descriptor: ServiceDescriptor, resume=False
):
# if this service depends on another, wait until the dependency is up
if service_descriptor.depends_on:
for depends_name in service_descriptor.depends_on:
Expand All @@ -183,7 +206,27 @@ async def _start_service(self, service_name: str, service_descriptor: ServiceDes
cluster_class, run_params = await get_service(
service_name, service_descriptor, self._payloads, self._networks
)
cluster = await self.start_cluster(service_name, cluster_class, run_params)
if not resume:
cluster = await self.start_cluster(service_name, cluster_class, run_params)
else:
run_params["instances"] = [
ServiceSerialization(
params=params,
activity_id=service_descriptor.activity.id
if service_descriptor.activity
else None,
agreement_id=service_descriptor.agreement.id
if service_descriptor.agreement
else None,
state=service_descriptor.state or ServiceState.pending.value,
network_node=service_descriptor.network_node.dict()
if service_descriptor.network_node
else None,
)
for params in run_params.pop("instance_params")
]
run_params.pop("network_addresses", None)
cluster = await self.resume_cluster(service_name, cluster_class, run_params)

# start the tasks for the local proxies so that
# it doesn't delay the initialization process
Expand Down Expand Up @@ -212,13 +255,13 @@ async def _start_service(self, service_name: str, service_descriptor: ServiceDes
]
)

async def _start_services(self):
async def _start_services(self, resume=False):
for service_name, service_descriptor in self.dapp.nodes_prioritized():
await self._start_service(service_name, service_descriptor)
await self._start_service(service_name, service_descriptor, resume=resume)

self._startup_finished = True

async def start(self):
async def start(self, resume=False):
"""Start the Golem engine and the dapp."""

if self.config.api.enabled:
Expand All @@ -232,13 +275,13 @@ async def start(self):

await self.golem.start()

await self._create_networks()
await self._create_networks(resume=resume)
await self._load_payloads()

# we start services in a separate task,
# so that the service state can be tracked
# while the dapp is starting
self._tasks.append(asyncio.create_task(self._start_services()))
self._tasks.append(asyncio.create_task(self._start_services(resume=resume)))

# launch the incoming command processor
self._tasks.append(asyncio.create_task(self._listen_incoming_command_queue()))
Expand All @@ -249,6 +292,12 @@ async def start_cluster(self, cluster_name, cluster_class, run_params):
self.clusters[cluster_name] = cluster
return cluster

async def resume_cluster(self, cluster_name, cluster_class, run_params):
    """Take over an already existing service cluster and register it.

    `run_params` are passed as keyword arguments to `Golem.resume_service`.
    The resulting cluster is stored in `self.clusters` under `cluster_name`
    and returned to the caller.
    """
    resumed = await self.golem.resume_service(cluster_class, **run_params)
    self.clusters[cluster_name] = resumed
    return resumed

@property
def dapp_state(self) -> Dict[str, Dict[int, ServiceState]]:
"""Return the state of the dapp.
Expand Down Expand Up @@ -384,6 +433,8 @@ def _get_app_state_from_nodes(
return ServiceState.terminated
else:
return ServiceState.stopping
elif self._desired_app_state == ServiceState.suspended:
return ServiceState.suspended

# In other cases return pending
return ServiceState.pending
Expand Down Expand Up @@ -449,16 +500,20 @@ def _is_cluster_state(self, cluster_id: str, state: ServiceState) -> bool:
"""Return True if the state of all instances in the cluster is `state`."""
return all(s.state == state for s in self.clusters[cluster_id].instances)

async def _stop_proxies(self):
"""Stop the HTTP and TCP proxies."""
proxy_tasks = [p.stop() for p in self._http_proxies.values()]
proxy_tasks.extend([p.stop() for p in self._tcp_proxies.values()])
await asyncio.gather(*proxy_tasks)

async def stop(self):
"""Stop the dapp and the Golem engine."""
service_tasks: List[asyncio.Task] = []

# explicitly mark that we want dapp in terminated state
self._desired_app_state = ServiceState.terminated

proxy_tasks = [p.stop() for p in self._http_proxies.values()]
proxy_tasks.extend([p.stop() for p in self._tcp_proxies.values()])
await asyncio.gather(*proxy_tasks)
await self._stop_proxies()

for cluster in self.clusters.values():
cluster.stop()
Expand All @@ -474,3 +529,28 @@ async def stop(self):
await asyncio.gather(*service_tasks)

await cancel_and_await_tasks(*self._tasks)

def request_suspend(self):
    """Flag the runner for suspension.

    Only raises the `suspend_requested` flag; nothing is stopped here.
    """
    self.suspend_requested = True

async def suspend(self):
    """Suspend the application and stop the Golem engine, without killing the activities.

    The steps are performed in this order: the desired app state is set to
    `suspended`, the local proxies are stopped, `suspend()` is called on each
    cluster, the Golem engine is stopped without waiting for payments, the
    tasks of the service instances are awaited and, finally, the runner's own
    background tasks are cancelled.
    """
    service_tasks: List[asyncio.Task] = []

    # explicitly mark that we want dapp in suspended state
    self._desired_app_state = ServiceState.suspended

    await self._stop_proxies()

    for cluster in self.clusters.values():
        cluster.suspend()

        # collect the instances' tasks so that they can be awaited below
        for s in cluster.instances:
            service_tasks.extend(s._tasks)

    # payments are not awaited here
    await self.golem.stop(wait_for_payments=False)

    await asyncio.gather(*service_tasks)

    # cancel the runner's own tasks as the last step
    await cancel_and_await_tasks(*self._tasks)
2 changes: 1 addition & 1 deletion dapp_runner/runner/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ async def shutdown(self):
yield script

async def _wait_for_termination(self):
while self._previous_state != ServiceState.terminated:
while self._previous_state not in {ServiceState.terminated, ServiceState.suspended}:
await asyncio.sleep(1.0)
self._report_state_change()

Expand Down

0 comments on commit b1a15ac

Please sign in to comment.