Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Suspend and Resume #118

Merged
merged 5 commits into from
Jul 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions dapp_runner/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,11 @@ def _get_run_dir(run_id: str) -> Path:
is_flag=True,
help="Don't validate and report issues with the manifest, its certificate or signature.",
)
@click.option(
"--resume",
is_flag=True,
help="Treat the app descriptor a suspended application's GAOM state to resume from.",
)
def start(
descriptors: Tuple[Path],
config: Path,
Expand Down
19 changes: 18 additions & 1 deletion dapp_runner/api/__main__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""Dapp Runner API."""
import yaml
from fastapi import FastAPI
from fastapi.responses import JSONResponse, RedirectResponse
from fastapi.responses import JSONResponse, PlainTextResponse, RedirectResponse

from dapp_runner.runner import Runner

Expand All @@ -18,3 +19,19 @@ async def get_gaom():
"""Retrieve the application's GAOM tree."""
dapp_dict = Runner.get_instance().dapp.dict()
return JSONResponse(content=dapp_dict)


@app.post("/suspend")
async def suspend():
    """Suspend the running application.

    Signals the dapp-runner to stop without killing the services and
    responds with the YAML-encoded GAOM tree state captured just before
    the suspension was requested.
    """
    active_runner = Runner.get_instance()

    # take the snapshot first, so that the response reflects the pre-suspension state
    gaom_snapshot = active_runner.dapp.dict()
    active_runner.request_suspend()

    yaml_body = yaml.dump(gaom_snapshot)
    return PlainTextResponse(content=yaml_body)
8 changes: 8 additions & 0 deletions dapp_runner/descriptor/dapp.py
Original file line number Diff line number Diff line change
Expand Up @@ -224,18 +224,26 @@ class NetworkDescriptor(GaomBase):
owner_ip: Optional[str] = None
mask: Optional[str] = None
gateway: Optional[str] = None
network_id: Optional[str] = Field(runtime=True)
owner_id: Optional[str] = Field(runtime=True)
state: Optional[str] = Field(runtime=True, default=None)

class Config: # noqa: D106
extra = "forbid"

@classmethod
def from_network(cls, network: Network):
    """Build a NetworkDescriptor from yapapi's `Network` object.

    The address fields are read from the network's attributes, while the
    runtime fields (`network_id`, `owner_id`, `state`) are taken from its
    serialized form.
    """
    serialized = network.serialize()
    descriptor_fields = {
        "ip": network.network_address,
        "owner_ip": network.owner_ip,
        "mask": network.netmask,
        "gateway": network.gateway,
        "network_id": serialized["_network_id"],
        "owner_id": serialized["owner_id"],
        "state": serialized["state"],
    }
    return cls(**descriptor_fields)


Expand Down
39 changes: 26 additions & 13 deletions dapp_runner/runner/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,10 @@ async def _run_app(
commands_f: Optional[TextIO],
silent=False,
skip_manifest_validation=False,
resume=False,
):
"""Run the dapp using the Runner."""

config = Config(**config_dict)
_update_api_config(config, api_config_dict)

Expand All @@ -60,9 +62,9 @@ async def _run_app(
r = Runner(config=config, dapp=dapp)
_print_env_info(r.golem)
if dapp.meta.name:
print(f"Starting app: {green(dapp.meta.name)}\n")
print(f"{'Starting' if not resume else 'Resuming'} app: {green(dapp.meta.name)}\n")

await r.start()
await r.start(resume=resume)
streamer = RunnerStreamer()
streamer.register_stream(
r.state_queue, state_f, lambda msg: json.dumps(msg, default=json_encoder)
Expand All @@ -88,7 +90,8 @@ async def _run_app(
max_running_time = timedelta(seconds=config.limits.max_running_time)

logger.info(
"Starting app: %s, startup timeout: %s, maximum running time: %s",
f"{'Starting' if not resume else 'Resuming'} app: %s, "
"startup timeout: %s, maximum running time: %s",
dapp.meta.name,
startup_timeout,
max_running_time,
Expand All @@ -112,6 +115,7 @@ async def _run_app(

while (
r.dapp_started
and not r.suspend_requested
and not r.api_shutdown
and not _running_time_elapsed(time_started, max_running_time)
):
Expand All @@ -120,9 +124,16 @@ async def _run_app(
if _running_time_elapsed(time_started, max_running_time):
logger.info("Maximum running time: %s elapsed.", max_running_time)

logger.info("Stopping the application...")
await r.stop()
if not r.suspend_requested:
logger.info("Stopping the application...")
await r.stop()
else:
logger.info("Suspending the application...")
await r.suspend()

logger.info("Stopping streamer...")
await streamer.stop()
logger.info("Streamer stopped...")


def start_runner(
Expand All @@ -140,6 +151,7 @@ def start_runner(
stderr: Optional[Path] = None,
silent=False,
skip_manifest_validation=False,
resume=False,
):
"""Launch the runner in an asyncio loop and wait for its shutdown."""

Expand All @@ -165,14 +177,15 @@ def start_runner(
loop = asyncio.get_event_loop()
task = loop.create_task(
_run_app(
config_dict,
api_config_dict,
dapp_dict,
data_f,
state_f,
commands_f,
silent,
skip_manifest_validation,
config_dict=config_dict,
api_config_dict=api_config_dict,
dapp_dict=dapp_dict,
data_f=data_f,
state_f=state_f,
commands_f=commands_f,
silent=silent,
skip_manifest_validation=skip_manifest_validation,
resume=resume,
)
)

Expand Down
106 changes: 93 additions & 13 deletions dapp_runner/runner/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,15 @@

import uvicorn

from ya_net.exceptions import ApiException
from yapapi import Golem
from yapapi.config import ApiConfig
from yapapi.contrib.service.http_proxy import LocalHttpProxy
from yapapi.contrib.service.socket_proxy import SocketProxy
from yapapi.events import CommandExecuted
from yapapi.network import Network
from yapapi.payload import Payload
from yapapi.services import Cluster, Service, ServiceState
from yapapi.services import Cluster, Service, ServiceSerialization, ServiceState

from dapp_runner._util import FreePortProvider, cancel_and_await_tasks, utcnow, utcnow_iso_str
from dapp_runner.descriptor import Config, DappDescriptor
Expand All @@ -27,6 +28,7 @@
ServiceDescriptor,
)

from .error import RunnerError
from .payload import get_payload
from .service import DappService, get_service

Expand Down Expand Up @@ -60,6 +62,7 @@ class Runner:
_networks: Dict[str, Network]
_tasks: List[asyncio.Task]
_startup_finished: bool
suspend_requested: bool

# TODO: Introduce ApplicationState instead of reusing ServiceState
_desired_app_state: ServiceState
Expand Down Expand Up @@ -92,6 +95,7 @@ def __init__(self, config: Config, dapp: DappDescriptor):
self.state_queue = asyncio.Queue()
self.command_queue = asyncio.Queue()
self._startup_finished = False
self.suspend_requested = False
self._desired_app_state = ServiceState.pending

self._report_status_change()
Expand Down Expand Up @@ -122,9 +126,26 @@ async def _start_api(self):
self.api_server = uvicorn.Server(config)
self._tasks.append(asyncio.create_task(self._serve_api()))

async def _create_networks(self):
async def _create_networks(self, resume=False):
for name, desc in self.dapp.networks.items():
network = await self.golem.create_network(**desc.dict())
if resume and desc.network_id:
desc_dict = desc.dict()
if desc_dict.get("mask"):
desc_dict["ip"] = f"{desc_dict.get('ip')}/{desc_dict.get('mask')}"
desc_dict["_network_id"] = desc_dict.pop("network_id")
desc_dict["nodes"] = {}
try:
network = await self.golem.resume_network(desc_dict) # type: ignore [arg-type]
except ApiException:
raise RunnerError(
f"Could not resume network {desc_dict['_network_id']}. "
"Probably it has already been destroyed.",
)
else:
network = await self.golem.create_network(
**{k: getattr(desc, k) for k in {"ip", "owner_ip", "mask", "gateway"}}
)

self._networks[name] = network

self.dapp.networks[name] = NetworkDescriptor.from_network(network)
Expand Down Expand Up @@ -167,7 +188,9 @@ async def _start_local_tcp_proxy(self, name: str, service: Service, port_mapping
port_mapping.local_port = port
port_mapping.address = proxy_address

async def _start_service(self, service_name: str, service_descriptor: ServiceDescriptor):
async def _start_service(
self, service_name: str, service_descriptor: ServiceDescriptor, resume=False
):
# if this service depends on another, wait until the dependency is up
if service_descriptor.depends_on:
for depends_name in service_descriptor.depends_on:
Expand All @@ -183,7 +206,27 @@ async def _start_service(self, service_name: str, service_descriptor: ServiceDes
cluster_class, run_params = await get_service(
service_name, service_descriptor, self._payloads, self._networks
)
cluster = await self.start_cluster(service_name, cluster_class, run_params)
if not resume:
cluster = await self.start_cluster(service_name, cluster_class, run_params)
else:
run_params["instances"] = [
ServiceSerialization(
params=params,
activity_id=service_descriptor.activity.id
if service_descriptor.activity
else None,
agreement_id=service_descriptor.agreement.id
if service_descriptor.agreement
else None,
state=service_descriptor.state or ServiceState.pending.value,
network_node=service_descriptor.network_node.dict()
if service_descriptor.network_node
else None,
)
for params in run_params.pop("instance_params")
]
run_params.pop("network_addresses", None)
cluster = await self.resume_cluster(service_name, cluster_class, run_params)

# start the tasks for the local proxies so that
# it doesn't delay the initialization process
Expand Down Expand Up @@ -212,13 +255,13 @@ async def _start_service(self, service_name: str, service_descriptor: ServiceDes
]
)

async def _start_services(self):
async def _start_services(self, resume=False):
for service_name, service_descriptor in self.dapp.nodes_prioritized():
await self._start_service(service_name, service_descriptor)
await self._start_service(service_name, service_descriptor, resume=resume)

self._startup_finished = True

async def start(self):
async def start(self, resume=False):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would prefer start, _create_networks and _start_service with resume=True to be separate methods, e.g. resume, _resume_networks and _resume_service.

This would limit the number of times the resume arg is passed around and how often resume is checked in if statements.

I also wonder whether resume really is an option of the existing start command or whether it should be a separate command.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as you can see there's still considerable overlap in the logic that happens nevertheless when executing those steps, so in my opinion "resume" still seems like a mode of starting the runner rather than a completely separate operation...

otherwise, we'll have to pull the logic to a larger number of internal methods, which will make the code more complicated...

so while I don't outright disagree with you, I feel that specifying resume as a flag of the binary and a parameter of the various functions that start the components is reasonable, and I don't see strong arguments to separate them completely...

"""Start the Golem engine and the dapp."""

if self.config.api.enabled:
Expand All @@ -232,13 +275,13 @@ async def start(self):

await self.golem.start()

await self._create_networks()
await self._create_networks(resume=resume)
await self._load_payloads()

# we start services in a separate task,
# so that the service state can be tracked
# while the dapp is starting
self._tasks.append(asyncio.create_task(self._start_services()))
self._tasks.append(asyncio.create_task(self._start_services(resume=resume)))

# launch the incoming command processor
self._tasks.append(asyncio.create_task(self._listen_incoming_command_queue()))
Expand All @@ -249,6 +292,12 @@ async def start_cluster(self, cluster_name, cluster_class, run_params):
self.clusters[cluster_name] = cluster
return cluster

async def resume_cluster(self, cluster_name, cluster_class, run_params):
    """Take back control over an already existing service cluster.

    The cluster returned by Golem's `resume_service` is registered in
    `self.clusters` under `cluster_name` and handed back to the caller.
    """
    resumed = await self.golem.resume_service(cluster_class, **run_params)
    self.clusters[cluster_name] = resumed
    return resumed

@property
def dapp_state(self) -> Dict[str, Dict[int, ServiceState]]:
"""Return the state of the dapp.
Expand Down Expand Up @@ -384,6 +433,8 @@ def _get_app_state_from_nodes(
return ServiceState.terminated
else:
return ServiceState.stopping
elif self._desired_app_state == ServiceState.suspended:
return ServiceState.suspended

# In other cases return pending
return ServiceState.pending
Expand Down Expand Up @@ -449,16 +500,20 @@ def _is_cluster_state(self, cluster_id: str, state: ServiceState) -> bool:
"""Return True if the state of all instances in the cluster is `state`."""
return all(s.state == state for s in self.clusters[cluster_id].instances)

async def _stop_proxies(self):
    """Stop all local proxies: the HTTP ones first, then the TCP ones."""
    proxies = [*self._http_proxies.values(), *self._tcp_proxies.values()]
    await asyncio.gather(*[proxy.stop() for proxy in proxies])

async def stop(self):
"""Stop the dapp and the Golem engine."""
service_tasks: List[asyncio.Task] = []

# explicitly mark that we want dapp in terminated state
self._desired_app_state = ServiceState.terminated

proxy_tasks = [p.stop() for p in self._http_proxies.values()]
proxy_tasks.extend([p.stop() for p in self._tcp_proxies.values()])
await asyncio.gather(*proxy_tasks)
await self._stop_proxies()

for cluster in self.clusters.values():
cluster.stop()
Expand All @@ -474,3 +529,28 @@ async def stop(self):
await asyncio.gather(*service_tasks)

await cancel_and_await_tasks(*self._tasks)

def request_suspend(self):
    """Flag the runner for suspension.

    This only sets the `suspend_requested` flag; it does not perform the
    suspension itself.
    """
    self.suspend_requested = True

async def suspend(self):
    """Suspend the application and stop the Golem engine, without killing the activities.

    Stops the local proxies, suspends every cluster, stops the Golem engine
    without waiting for payments, then awaits the tasks of the service
    instances and passes the runner's own tasks to `cancel_and_await_tasks`.
    """
    service_tasks: List[asyncio.Task] = []

    # explicitly mark that we want the dapp in the suspended state
    self._desired_app_state = ServiceState.suspended

    await self._stop_proxies()

    for cluster in self.clusters.values():
        cluster.suspend()

        # collect the instances' tasks so that they can be awaited after the engine is stopped
        for s in cluster.instances:
            service_tasks.extend(s._tasks)

    await self.golem.stop(wait_for_payments=False)

    await asyncio.gather(*service_tasks)

    await cancel_and_await_tasks(*self._tasks)
2 changes: 1 addition & 1 deletion dapp_runner/runner/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ async def shutdown(self):
yield script

async def _wait_for_termination(self):
while self._previous_state != ServiceState.terminated:
while self._previous_state not in {ServiceState.terminated, ServiceState.suspended}:
await asyncio.sleep(1.0)
self._report_state_change()

Expand Down