Skip to content

Commit

Permalink
ServiceRunner and instance handles (#750)
Browse files Browse the repository at this point in the history
Main interface change: when running services in a Cluster,
spawned `Service` instances are immediately available.

Main internal changes:
* `yapapi.services` split into separate files
* `yapapi.services.Cluster` split into a `ServiceRunner` and a `Cluster`

Additional interface changes: 
* `Service.reset()` - method called when a service is restarted; this was
  added because handlers of restarted instances are now called more
  than once
* One can use the `ServiceRunner` directly, e.g. via 
  `cluster.service_runner.add_instance()`, although this is not documented.

resolves #560
  • Loading branch information
johny-b authored Dec 16, 2021
1 parent bd2fbb1 commit 7df1d0b
Show file tree
Hide file tree
Showing 13 changed files with 1,151 additions and 990 deletions.
2 changes: 1 addition & 1 deletion docs/sphinx/api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ Service
-------

.. autoclass:: yapapi.services.Service
:members: id, provider_name, state, is_available, start, run, shutdown, send_message, send_message_nowait, receive_message, receive_message_nowait, get_payload, network, network_node
:members: id, provider_name, state, is_available, start, run, shutdown, reset, send_message, send_message_nowait, receive_message, receive_message_nowait, get_payload, network, network_node

Cluster
-------
Expand Down
4 changes: 4 additions & 0 deletions examples/custom-usage-counter/custom_usage_counter.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ async def shutdown(self):
yield s
print(f"service {self.id} stopped on '{self.provider_name}'")

async def reset(self):
# We don't have to do anything when the service is restarted
pass


async def main(running_time_sec, subnet_tag, driver=None, network=None):

Expand Down
22 changes: 11 additions & 11 deletions examples/http-proxy/http_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ async def handle_request(self, query_string: str):
by passing it to the instance through the VPN
"""
instance_ws = self.network_node.get_websocket_uri(80)
app_key = self.cluster._engine._api_config.app_key
app_key = self.cluster.service_runner._job.engine._api_config.app_key

print(
f"{TEXT_COLOR_GREEN}sending a remote request '{query_string}' to {self}{TEXT_COLOR_DEFAULT}"
Expand Down Expand Up @@ -150,6 +150,10 @@ async def handle_request(self, query_string: str):
await ws_session.close()
return response_text, status

async def reset(self):
# We don't have to do anything when the service is restarted
pass


# ######## Main application code which spawns the Golem service and the local HTTP server

Expand All @@ -173,19 +177,15 @@ async def main(

network = await golem.create_network("192.168.0.1/24")
cluster = await golem.run_service(HttpService, network=network, num_instances=num_instances)

def instances():
return [f"{s.provider_name}: {s.state.value}" for s in cluster.instances]
instances = cluster.instances

def still_starting():
return len(cluster.instances) < num_instances or any(
s.state == ServiceState.starting for s in cluster.instances
)
return any(i.state in (ServiceState.pending, ServiceState.starting) for i in instances)

# wait until all remote http instances are started

while still_starting() and datetime.now() < commissioning_time + STARTING_TIMEOUT:
print(f"instances: {instances()}")
print(f"instances: {instances}")
await asyncio.sleep(5)

if still_starting():
Expand All @@ -204,7 +204,7 @@ def still_starting():
# wait until Ctrl-C

while True:
print(instances())
print(instances)
try:
await asyncio.sleep(10)
except (KeyboardInterrupt, asyncio.CancelledError):
Expand All @@ -218,8 +218,8 @@ def still_starting():
cluster.stop()

cnt = 0
while cnt < 3 and any(s.is_available for s in cluster.instances):
print(instances())
while cnt < 3 and any(s.is_available for s in instances):
print(instances)
await asyncio.sleep(5)
cnt += 1

Expand Down
25 changes: 10 additions & 15 deletions examples/simple-service-poc/simple_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,6 @@ class SimpleService(Service):
SIMPLE_SERVICE = "/golem/run/simple_service.py"
SIMPLE_SERVICE_CTL = "/golem/run/simulate_observations_ctl.py"

def __repr__(self):
return f"<{self.__class__.__name__}: {self.name}>"

def __init__(self, *args, instance_name: str, show_usage: bool = False, **kwargs):
super().__init__(*args, **kwargs)
self.name = instance_name
Expand Down Expand Up @@ -111,6 +108,10 @@ async def shutdown(self):
cost = await self._ctx.get_cost()
print(f"{TEXT_COLOR_MAGENTA} --- {self.name} COST: {cost} {TEXT_COLOR_DEFAULT}")

async def reset(self):
# We don't have to do anything when the service is restarted
pass


async def main(
subnet_tag,
Expand Down Expand Up @@ -146,21 +147,15 @@ async def main(
],
expiration=datetime.now(timezone.utc) + timedelta(minutes=120),
)

# helper functions to display / filter instances

def instances():
return [f"{s.name}: {s.state.value} on {s.provider_name}" for s in cluster.instances]
instances = cluster.instances

def still_starting():
return len(cluster.instances) < num_instances or any(
s.state == ServiceState.starting for s in cluster.instances
)
return any(i.state in (ServiceState.pending, ServiceState.starting) for i in instances)

# wait until instances are started

while still_starting() and datetime.now() < commissioning_time + STARTING_TIMEOUT:
print(f"instances: {instances()}")
print(f"instances: {instances}")
await asyncio.sleep(5)

if still_starting():
Expand All @@ -174,7 +169,7 @@ def still_starting():
start_time = datetime.now()

while datetime.now() < start_time + timedelta(seconds=running_time):
print(f"instances: {instances()}")
print(f"instances: {instances}")
await asyncio.sleep(5)

print(f"{TEXT_COLOR_YELLOW}Stopping instances...{TEXT_COLOR_DEFAULT}")
Expand All @@ -184,10 +179,10 @@ def still_starting():

cnt = 0
while cnt < 10 and any(s.is_available for s in cluster.instances):
print(f"instances: {instances()}")
print(f"instances: {instances}")
await asyncio.sleep(5)

print(f"instances: {instances()}")
print(f"instances: {instances}")


if __name__ == "__main__":
Expand Down
16 changes: 9 additions & 7 deletions examples/ssh/ssh.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ async def start(self):
yield script

connection_uri = self.network_node.get_websocket_uri(22)
app_key = self.cluster._engine._api_config.app_key
app_key = self.cluster.service_runner._job.engine._api_config.app_key

print(
"Connect with:\n"
Expand All @@ -71,6 +71,10 @@ async def start(self):

print(f"{TEXT_COLOR_RED}password: {password}{TEXT_COLOR_DEFAULT}")

async def reset(self):
# We don't have to do anything when the service is restarted
pass


async def main(subnet_tag, payment_driver=None, payment_network=None):
# By passing `event_consumer=log_summary()` we enable summary logging.
Expand All @@ -87,12 +91,10 @@ async def main(subnet_tag, payment_driver=None, payment_network=None):
network = await golem.create_network("192.168.0.1/24")
async with network:
cluster = await golem.run_service(SshService, network=network, num_instances=2)

def instances():
return [f"{s.provider_name}: {s.state.value}" for s in cluster.instances]
instances = cluster.instances

while True:
print(instances())
print(instances)
try:
await asyncio.sleep(5)
except (KeyboardInterrupt, asyncio.CancelledError):
Expand All @@ -101,8 +103,8 @@ def instances():
cluster.stop()

cnt = 0
while cnt < 3 and any(s.is_available for s in cluster.instances):
print(instances())
while cnt < 3 and any(s.is_available for s in instances):
print(instances)
await asyncio.sleep(5)
cnt += 1

Expand Down
63 changes: 37 additions & 26 deletions tests/services/test_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,62 +2,62 @@
import itertools
import sys
import pytest
from unittest.mock import Mock, patch, call
from yapapi.services import Cluster, Service, ServiceError, ServiceInstance
from yapapi.services import Service, ServiceRunner
from unittest.mock import Mock, patch
from unittest import mock
from yapapi import Golem


class _TestService(Service):
pass
def __init__(self, **kwargs):
super().__init__()
self.init_kwargs = kwargs


class _BrokenService(Service):
async def start(self):
await asyncio.Future()


def _get_cluster() -> Cluster:
return Cluster(engine=Mock(), service_class=_TestService, payload=Mock())


@pytest.mark.parametrize(
"kwargs, calls, error",
"kwargs, args, error",
[
(
{"num_instances": 1},
[call({}, None)],
[({}, None)],
None,
),
(
{"num_instances": 3},
[call({}, None) for _ in range(3)],
[({}, None) for _ in range(3)],
None,
),
(
{"instance_params": [{}]},
[call({}, None)],
[({}, None)],
None,
),
(
{"instance_params": [{"n": 1}, {"n": 2}]},
[call({"n": 1}, None), call({"n": 2}, None)],
[({"n": 1}, None), ({"n": 2}, None)],
None,
),
(
# num_instances takes precedence
{"num_instances": 2, "instance_params": [{} for _ in range(3)]},
[call({}, None), call({}, None)],
[({}, None), ({}, None)],
None,
),
(
# num_instances takes precedence
{"num_instances": 3, "instance_params": ({"n": i} for i in itertools.count(1))},
[call({"n": 1}, None), call({"n": 2}, None), call({"n": 3}, None)],
[({"n": 1}, None), ({"n": 2}, None), ({"n": 3}, None)],
None,
),
(
# num_instances takes precedence
{"num_instances": 4, "instance_params": [{} for _ in range(3)]},
[call({}, None) for _ in range(3)],
[({}, None) for _ in range(3)],
"`instance_params` iterable depleted after 3 spawned instances.",
),
(
Expand All @@ -73,40 +73,51 @@ def _get_cluster() -> Cluster:
"10.0.0.2",
],
},
[call({}, "10.0.0.1"), call({}, "10.0.0.2"), call({}, None)],
[({}, "10.0.0.1"), ({}, "10.0.0.2"), ({}, None)],
None,
),
],
)
@pytest.mark.asyncio
@pytest.mark.skipif(sys.version_info < (3, 8), reason="AsyncMock requires python 3.8+")
async def test_spawn_instances(kwargs, calls, error):
with patch("yapapi.services.Cluster.spawn_instance") as spawn_instance:
cluster = _get_cluster()
async def test_spawn_instances(kwargs, args, error, monkeypatch):
def _get_new_engine(self):
return mock.AsyncMock()

monkeypatch.setattr(Golem, "_get_new_engine", _get_new_engine)

with patch("yapapi.services.ServiceRunner.spawn_instance") as spawn_instance:
golem = Golem(budget=1)
try:
cluster.spawn_instances(**kwargs)
except ServiceError as e:
await golem.run_service(
service_class=_TestService, payload=Mock(), network=Mock(), **kwargs
)
except ValueError as e:
if error is not None:
assert str(e) == error
else:
assert False, e
else:
assert error is None, f"Expected ServiceError: {error}"

assert spawn_instance.mock_calls == calls
assert len(spawn_instance.mock_calls) == len(args)
for call_args, args in zip(spawn_instance.call_args_list, args):
service, network, network_address, restart_condition = call_args[0]
assert service.init_kwargs == args[0]
assert network_address == args[1]


@pytest.mark.parametrize(
"service, error",
(
(_TestService(Mock(), Mock()), None),
(_BrokenService(Mock(), Mock()), "must be an asynchronous generator"),
(_TestService(), None),
(_BrokenService(), "must be an asynchronous generator"),
),
)
def test_get_handler(service, error):
service_instance = ServiceInstance(service=service)
service.service_instance.service_state.lifecycle() # pending -> starting
try:
handler = Cluster._get_handler(service_instance)
handler = ServiceRunner._get_handler(service.service_instance)
assert handler
except TypeError as e:
if error is not None:
Expand Down
Loading

0 comments on commit 7df1d0b

Please sign in to comment.