Skip to content

Commit

Permalink
fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
shadeofblue committed Jun 20, 2023
1 parent 6b22114 commit 9366cfc
Show file tree
Hide file tree
Showing 12 changed files with 101 additions and 72 deletions.
22 changes: 10 additions & 12 deletions examples/webapp/webapp_suspend_resume.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@
from yapapi.props import com
from yapapi.services import Service, ServiceState
from yapapi.strategy import (
PROP_DEBIT_NOTE_INTERVAL_SEC,
DecreaseScoreForUnconfirmedAgreement,
LeastExpensiveLinearPayuMS,
PROP_DEBIT_NOTE_INTERVAL_SEC,
PropValueRange,
)

Expand Down Expand Up @@ -79,7 +79,10 @@ async def start(self):
yield script

def _serialize_init_params(self):
return {"db_address": self._db_address, "db_port": self._db_port, }
return {
"db_address": self._db_address,
"db_port": self._db_port,
}


class DbService(Service):
Expand Down Expand Up @@ -108,7 +111,6 @@ class MyMarketStrategy(LeastExpensiveLinearPayuMS):


async def main(subnet_tag, payment_driver, payment_network, port):

base_strategy = MyMarketStrategy(
max_fixed_price=Decimal("1.0"),
max_price_for={com.Counter.CPU: Decimal("0.2"), com.Counter.TIME: Decimal("0.1")},
Expand Down Expand Up @@ -186,7 +188,7 @@ def raise_exception_if_still_starting(cluster):
await proxy.stop()
print(f"{TEXT_COLOR_CYAN}HTTP server stopped{TEXT_COLOR_DEFAULT}")

print("=================================================================== SERIALIZING AND DROPPING CURRENT STATE")
print("=============================================== SERIALIZING AND DROPPING CURRENT STATE")

network_serialized = network.serialize()
db_serialized = db_cluster.serialize_instances()
Expand All @@ -198,22 +200,21 @@ def raise_exception_if_still_starting(cluster):
print(f"{TEXT_COLOR_CYAN}waiting {secs} seconds...{TEXT_COLOR_DEFAULT}")
await asyncio.sleep(secs)

print("=================================================================== STOPPING GOLEM ENGINE")
print("=============================================== STOPPING GOLEM ENGINE")

await golem.stop(wait_for_payments=False)

print(f"{TEXT_COLOR_CYAN}waiting {secs} seconds...{TEXT_COLOR_DEFAULT}")
await asyncio.sleep(secs)


print("=================================================================== SERIALIZED STATE: ")
print("=============================================== SERIALIZED STATE: ")

print(json.dumps([network_serialized, db_serialized, web_serialized], indent=4))

print(f"{TEXT_COLOR_CYAN}waiting {secs} seconds...{TEXT_COLOR_DEFAULT}")
await asyncio.sleep(secs)

print("=================================================================== RESTARTING THE ENGINE AND THE SERVICES")
print("=============================================== RESTARTING THE ENGINE AND THE SERVICES")

golem = Golem(
budget=1.0,
Expand All @@ -226,7 +227,6 @@ def raise_exception_if_still_starting(cluster):

print_env_info(golem)


network = Network.deserialize(golem._engine._net_api, network_serialized)

db_cluster = await golem.resume_service(DbService, instances=db_serialized, network=network)
Expand Down Expand Up @@ -262,9 +262,7 @@ def raise_exception_if_still_starting(cluster):
db_cluster.stop()

cnt = 0
while cnt < 3 and any(
s.is_available for s in web_cluster.instances + db_cluster.instances
):
while cnt < 3 and any(s.is_available for s in web_cluster.instances + db_cluster.instances):
print(web_cluster.instances + db_cluster.instances)
await asyncio.sleep(5)
cnt += 1
Expand Down
2 changes: 1 addition & 1 deletion yapapi/agreements_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

from yapapi import events
from yapapi.props import Activity, NodeInfo
from yapapi.rest.market import Agreement, AgreementDetails, ApiException, OfferProposal, Market
from yapapi.rest.market import Agreement, AgreementDetails, ApiException, Market, OfferProposal

logger = logging.getLogger(__name__)

Expand Down
37 changes: 25 additions & 12 deletions yapapi/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -395,9 +395,8 @@ async def _process_invoices(self) -> None:
invoice_manager.add_invoice(invoice)
await self._agreement_payment_attempt(invoice.agreement_id)
if self._payment_closing and not (
self._await_payments and invoice_manager.has_payable_unpaid_agreements
self._await_payments and invoice_manager.has_payable_unpaid_agreements
):

break

async def accept_payments_for_agreement(self, job_id: str, agreement_id: str) -> None:
Expand Down Expand Up @@ -638,9 +637,7 @@ async def create_activity(self, agreement_id: str) -> Activity:

async def fetch_activity(self, activity_id: str) -> Activity:
"""Create an activity for given `agreement_id`."""
return await self._activity_api.use_activity(
activity_id, stream_events=self._stream_output
)
return await self._activity_api.use_activity(activity_id, stream_events=self._stream_output)

async def start_worker(
self,
Expand All @@ -650,6 +647,22 @@ async def start_worker(
existing_agreement_id: Optional[str] = None,
existing_activity_id: Optional[str] = None,
) -> Optional[asyncio.Task]:
"""Start a single worker (activity) within a Job.
:param job: :class:`Job` within which the worker is launched.
:param run_worker: an async function which receives the work context and performs
any neccessary operations on it. If the function returns `True`, the activity
respective :class:`WorkContext` won't be terminated after the worker finishes.
:param on_agreement_ready: an optional callable to be called when the agreement is
created or initialized
:param existing_agreement_id: optional identifier of an existing agreement.
if given, the engine will attempt to use this agreement to launch the activity,
instead of signing a new one.
:param existing_activity_id: optional identifier of an existing activity.
if given, the engine won't launch a new activity and will try to reuse an existing one
instead.
"""

loop = asyncio.get_event_loop()

async def worker_task(agreement: Agreement):
Expand Down Expand Up @@ -680,7 +693,7 @@ async def worker_task(agreement: Agreement):
"Activity init failed with error: %s. agreement: %s, existing activity id: %s",
e,
agreement.id,
existing_activity_id
existing_activity_id,
)
job.emit(events.ActivityCreateFailed, agreement=agreement, exc_info=sys.exc_info())
raise
Expand Down Expand Up @@ -729,10 +742,7 @@ async def process_batches(
) -> None:
"""Send command batches produced by `batch_generator` to `activity`."""

try:
script: Script = await batch_generator.__anext__()
except Exception as e:
raise
script: Script = await batch_generator.__anext__()

while True:
batch_deadline = (
Expand All @@ -744,6 +754,7 @@ async def process_batches(
batch: List[BatchCommand] = script._evaluate()
remote = await activity.send(batch, deadline=batch_deadline)
except Exception as e:
logger.error("Error executing script %s: %s(%s).", script, type(e), str(e))
script = await batch_generator.athrow(*sys.exc_info())
continue

Expand Down Expand Up @@ -786,7 +797,7 @@ async def get_batch_results() -> List[events.CommandEvent]:
future_results = loop.create_task(get_batch_results())
script = await batch_generator.asend(future_results)

def recycle_offer(self, offer: OfferProposal) -> None:
def recycle_offer(self, offer: OfferProposal) -> None:
"""Mark given offer as a fresh one, regardless of its previous processing.
This offer was already processed, but something happened and we should treat it as a
Expand Down Expand Up @@ -868,7 +879,9 @@ def __init__(
self.expiration_time: datetime = expiration_time
self.payload: Payload = payload

self.agreements_pool = AgreementsPool(self.emit, self.engine.recycle_offer, market_api=self.engine._market_api)
self.agreements_pool = AgreementsPool(
self.emit, self.engine.recycle_offer, market_api=self.engine._market_api
)
self.finished = asyncio.Event()

self._demand_builder: Optional[DemandBuilder] = None
Expand Down
4 changes: 3 additions & 1 deletion yapapi/executor/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ async def input_tasks() -> AsyncIterator[Task[D, R]]:

work_queue = SmartQueue(input_tasks())

async def run_worker(work_context: WorkContext) -> None:
async def run_worker(work_context: WorkContext) -> bool:
"""Run an instance of `worker` for the particular work context."""
agreement = work_context._agreement
activity = work_context._activity
Expand Down Expand Up @@ -237,6 +237,8 @@ async def task_generator() -> AsyncGenerator[Task[D, R], None]:
if consumer.finished:
raise StopAsyncIteration()

return False

async def worker_starter() -> None:
while True:
await asyncio.sleep(2)
Expand Down
6 changes: 4 additions & 2 deletions yapapi/golem.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
from yapapi.payload import Payload
from yapapi.props import com
from yapapi.script import Script
from yapapi.services import Cluster, ServiceType, ServiceSerialization
from yapapi.services import Cluster, ServiceSerialization, ServiceType
from yapapi.strategy import DecreaseScoreForUnconfirmedAgreement, LeastExpensiveLinearPayuMS

if TYPE_CHECKING:
Expand Down Expand Up @@ -299,7 +299,9 @@ async def __aenter__(self) -> "Golem":
async def __aexit__(self, *exc_info) -> Optional[bool]:
return await self._stop_with_exc_info(*exc_info)

async def _stop_with_exc_info(self, *exc_info, wait_for_payments: bool = True) -> Optional[bool]:
async def _stop_with_exc_info(
self, *exc_info, wait_for_payments: bool = True
) -> Optional[bool]:
async with self._engine_state_lock:
res = await self._engine.stop(*exc_info, wait_for_payments=wait_for_payments)
await self._event_dispatcher.stop()
Expand Down
17 changes: 9 additions & 8 deletions yapapi/network.py
Original file line number Diff line number Diff line change
Expand Up @@ -338,18 +338,18 @@ def deserialize(
) -> "Network":
network = cls(
net_api,
ip=obj_dict.get("ip"),
owner_id=obj_dict.get("owner_id"),
owner_ip=obj_dict.get("owner_ip"),
ip=obj_dict["ip"],
owner_id=obj_dict["owner_id"],
owner_ip=obj_dict["owner_ip"],
gateway=obj_dict.get("gateway"),
)
network._network_id = obj_dict.get("_network_id")
network._state_machine.current_state_value = obj_dict.get("state")
if obj_dict.get("nodes"):
for _id, ip in obj_dict.get("nodes").items():
network._nodes[_id] = Node(network=network, node_id=_id, ip=ip)
network._network_id = obj_dict["_network_id"]
network._state_machine.current_state_value = obj_dict["state"]
for _id, ip in obj_dict["nodes"].items():
network._nodes[_id] = Node(network=network, node_id=_id, ip=ip)
return network


class NetworkSerialization(TypedDict):
_network_id: str
ip: str
Expand All @@ -359,5 +359,6 @@ class NetworkSerialization(TypedDict):
state: str
nodes: Dict[str, str]


class NetworkError(Exception):
"""Exception raised by :class:`Network` when an operation is not possible."""
4 changes: 1 addition & 3 deletions yapapi/rest/market.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,7 @@ class Agreement(object):
"""Mid-level interface to the REST's Agreement model."""

def __init__(
self, api: RequestorApi,
agreement_id: str,
subscription: Optional["Subscription"] = None
self, api: RequestorApi, agreement_id: str, subscription: Optional["Subscription"] = None
):
self._api = api
self._subscription = subscription
Expand Down
2 changes: 1 addition & 1 deletion yapapi/services/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from .cluster import Cluster
from .service import Service, ServiceInstance, ServiceType, ServiceSerialization
from .service import Service, ServiceInstance, ServiceSerialization, ServiceType
from .service_runner import ServiceRunner
from .service_state import ServiceState

Expand Down
15 changes: 6 additions & 9 deletions yapapi/services/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from yapapi.network import Network
from yapapi.payload import Payload

from .service import ServiceType, ServiceSerialization
from .service import ServiceSerialization, ServiceType
from .service_runner import ServiceRunner

DEFAULT_SERVICE_EXPIRATION: Final[timedelta] = timedelta(minutes=180)
Expand Down Expand Up @@ -166,23 +166,19 @@ def spawn_instances(
self.service_runner.add_instance(service, self.network, network_address)
service._set_cluster(self)

def resume_instances(self,
serialized_instances: List[ServiceSerialization],
):
def resume_instances(self, serialized_instances: List[ServiceSerialization]):
for service_obj in serialized_instances:
service = self.service_class(**service_obj.get('params', dict()))
service = self.service_class(**service_obj.get("params", dict()))
self.service_runner.add_existing_instance(
service,
service_obj.get("state"),
service_obj["state"],
service_obj.get("agreement_id"),
service_obj.get("activity_id"),
self.network,
service_obj.get("network_node", dict()),
)
service._set_cluster(self)



def _resolve_instance_params(
self,
num_instances: Optional[int],
Expand All @@ -205,7 +201,8 @@ def _resolve_instance_params(
f"`instance_params` iterable depleted after {i} spawned instances."
)

def _default_expiration(self):
@staticmethod
def _default_expiration():
return datetime.now(timezone.utc) + DEFAULT_SERVICE_EXPIRATION

def serialize_instances(self) -> List[ServiceSerialization]:
Expand Down
22 changes: 15 additions & 7 deletions yapapi/services/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ class Service:
_network_node: Optional[Node] = None

def __init__(self, _id: Optional[str] = None):
self.__id = _id or str(uuid.uuid4())
self.__id = _id or str(uuid.uuid4())

self.__inqueue: asyncio.Queue[ServiceSignal] = asyncio.Queue()
self.__outqueue: asyncio.Queue[ServiceSignal] = asyncio.Queue()
Expand Down Expand Up @@ -433,18 +433,26 @@ def _serialize_init_params(self) -> Dict[str, Any]:
def serialize(self) -> "ServiceSerialization":
return {
"params": self._serialize_init_params(),
"activity_id": self._ctx._activity.id,
"agreement_id": self._ctx._agreement.id,
"activity_id": self._ctx._activity.id if self._ctx else None,
"agreement_id": self._ctx._agreement.id if self._ctx else None,
"state": self.state.value,
"network_node": {"network_id": self._network_node.network.network_id, "node_id": self._network_node.node_id, "ip": self._network_node.ip, }
"network_node": {
"network_id": self._network_node.network.network_id,
"node_id": self._network_node.node_id,
"ip": self._network_node.ip,
}
if self._network_node
else None,
}


class ServiceSerialization(TypedDict):
params: Dict[str, Any]
activity_id: str
agreement_id: str
activity_id: Optional[str]
agreement_id: Optional[str]
state: str
network_node: Dict[str, str]
network_node: Optional[Dict[str, str]]


ServiceType = TypeVar("ServiceType", bound=Service)

Expand Down
Loading

0 comments on commit 9366cfc

Please sign in to comment.