diff --git a/yapapi/executor/__init__.py b/yapapi/executor/__init__.py index 0cc302e09..d99e3c3c0 100644 --- a/yapapi/executor/__init__.py +++ b/yapapi/executor/__init__.py @@ -1,6 +1,4 @@ -""" -An implementation of the new Golem's task executor. -""" +"""An implementation of the new Golem's task executor.""" import asyncio from asyncio import CancelledError import contextlib @@ -49,7 +47,7 @@ from ..rest.activity import CommandExecutionError from ..rest.market import OfferProposal, Subscription from ..storage import gftp -from ._smartq import Consumer, Handle, SmartQueue +from ._smartq import SmartQueue if TYPE_CHECKING: from .services import Cluster, Service @@ -92,6 +90,11 @@ class NoPaymentAccountError(Exception): """Network required for the account.""" def __init__(self, required_driver: str, required_network: str): + """Initialize `NoPaymentAccountError`. + + :param required_driver: payment driver for which initialization was required + :param required_network: payment network for which initialization was required + """ self.required_driver: str = required_driver self.required_network: str = required_network @@ -106,8 +109,9 @@ def __str__(self) -> str: """The type of items yielded by a generator created by the `worker` function supplied by user.""" -def unpack_work_item(item: WorkItem) -> Tuple[Work, ExecOptions]: +def _unpack_work_item(item: WorkItem) -> Tuple[Work, ExecOptions]: """Extract `Work` object and options from a work item. + If the item does not specify options, default ones are provided. """ if isinstance(item, tuple): @@ -124,6 +128,8 @@ def unpack_work_item(item: WorkItem) -> Tuple[Work, ExecOptions]: class Golem(AsyncContextManager): + """Base execution engine containing functions common to all modes of operation.""" + def __init__( self, *, @@ -136,8 +142,7 @@ def __init__( stream_output: bool = False, app_key: Optional[str] = None, ): - """ - Base execution engine containing functions common to all modes of operation + """Initialize the Golem engine. :param budget: maximum budget for payments :param strategy: market strategy used to select providers from the market @@ -207,30 +212,37 @@ async def create_demand_builder( builder.add(NodeInfo(subnet_tag=self._subnet)) if self._subnet: builder.ensure(f"({NodeInfoKeys.subnet_tag}={self._subnet})") - await builder.decorate(self.payment_decoration, self.strategy, payload) + await builder.decorate(self.payment_decorator, self.strategy, payload) return builder @property def driver(self) -> str: + """Return the name of the payment driver used by this engine.""" return self._driver @property def network(self) -> str: + """Return the name of the payment network used by this engine.""" return self._network @property def storage_manager(self): + """Return the storage manager used by this engine.""" return self._storage_manager @property def strategy(self) -> MarketStrategy: + """Return the instance of `MarketStrategy` used by this engine.""" return self._strategy - def emit(self, *args, **kwargs) -> None: + def emit(self, event: events.Event) -> None: + """Emit an event to be consumed by this engine's event consumer.""" if self._wrapped_consumer: - self._wrapped_consumer.async_call(*args, **kwargs) + self._wrapped_consumer.async_call(event) async def __aenter__(self) -> "Golem": + """Initialize resources and start background services used by this engine.""" + try: stack = self._stack @@ -253,14 +265,14 @@ def report_shutdown(*exc_info): payment_client = await stack.enter_async_context(self._api_config.payment()) self._payment_api = rest.Payment(payment_client) - self.payment_decoration = Golem.PaymentDecoration(await self._create_allocations()) + self.payment_decorator = Golem.PaymentDecorator(await self._create_allocations()) # TODO: make the method starting the process_invoices() task an async context manager # to simplify code in __aexit__() loop = asyncio.get_event_loop() - self._process_invoices_job = loop.create_task(self.process_invoices()) + self._process_invoices_job = loop.create_task(self._process_invoices()) self._services.add(self._process_invoices_job) - self._services.add(loop.create_task(self.process_debit_notes())) + self._services.add(loop.create_task(self._process_debit_notes())) self._storage_manager = await stack.enter_async_context(gftp.provider()) @@ -329,8 +341,8 @@ async def _create_allocations(self) -> rest.payment.MarketDecoration: network = account.network.lower() if (driver, network) != (self._driver, self._network): logger.debug( - f"Not using payment platform `%s`, platform's driver/network " - f"`%s`/`%s` is different than requested driver/network `%s`/`%s`", + "Not using payment platform `%s`, platform's driver/network " + "`%s`/`%s` is different than requested driver/network `%s`/`%s`", account.platform, driver, network, @@ -372,7 +384,9 @@ def _get_allocation( except: raise ValueError(f"No allocation for {item.payment_platform} {item.payer_addr}.") - async def process_invoices(self) -> None: + async def _process_invoices(self) -> None: + """Process incoming invoices.""" + async for invoice in self._payment_api.incoming_invoices(): if invoice.agreement_id in self._agreements_to_pay: self.emit( @@ -409,8 +423,9 @@ async def process_invoices(self) -> None: if self._payment_closing and not self._agreements_to_pay: break - # TODO Consider processing invoices and debit notes together - async def process_debit_notes(self) -> None: + async def _process_debit_notes(self) -> None: + """Process incoming debit notes.""" + async for debit_note in self._payment_api.incoming_debit_notes(): if debit_note.agreement_id in self._agreements_accepting_debit_notes: self.emit( @@ -436,9 +451,11 @@ async def process_debit_notes(self) -> None: if self._payment_closing and not self._agreements_to_pay: break - async def accept_payment_for_agreement( + async def accept_payments_for_agreement( self, agreement_id: str, *, partial: bool = False ) -> None: + """Add given agreement to the set of agreements for which invoices should be accepted.""" + self.emit(events.PaymentPrepared(agr_id=agreement_id)) inv = self._invoices.get(agreement_id) if inv is None: @@ -452,26 +469,33 @@ async def accept_payment_for_agreement( events.PaymentAccepted(agr_id=agreement_id, inv_id=inv.invoice_id, amount=inv.amount) ) - def approve_agreement_payments(self, agreement_id): + def accept_debit_notes_for_agreement(self, agreement_id: str) -> None: + """Add given agreement to the set of agreements for which debit notes should be accepted.""" self._agreements_accepting_debit_notes.add(agreement_id) def add_job(self, job: "Job"): + """Register a job with this engine.""" self._jobs.add(job) @staticmethod def finalize_job(job: "Job"): + """Mark a job as finished.""" job.finished.set() @dataclass - class PaymentDecoration(DemandDecorator): + class PaymentDecorator(DemandDecorator): + """A `DemandDecorator` that adds payment-related constraints and properties to a Demand.""" + market_decoration: rest.payment.MarketDecoration async def decorate_demand(self, demand: DemandBuilder): + """Add properties and constraints to a Demand.""" for constraint in self.market_decoration.constraints: demand.ensure(constraint) demand.properties.update({p.key: p.value for p in self.market_decoration.properties}) async def create_activity(self, agreement_id: str): + """Create an activity for given `agreement_id`.""" return await self._activity_api.new_activity( agreement_id, stream_events=self._stream_output ) @@ -480,7 +504,7 @@ async def process_batches( self, agreement_id: str, activity: rest.activity.Activity, - command_generator: AsyncGenerator[Work, Awaitable[List[events.CommandEvent]]], + command_generator: AsyncGenerator[WorkItem, Awaitable[List[events.CommandEvent]]], ) -> None: """Send command batches produced by `command_generator` to `activity`.""" @@ -488,7 +512,7 @@ async def process_batches( while True: - batch, exec_options = unpack_work_item(item) + batch, exec_options = _unpack_work_item(item) # TODO: `task_id` should really be `batch_id`, but then we should also rename # `task_id` field of several events (e.g. `ScriptSent`) @@ -528,7 +552,7 @@ async def get_batch_results() -> List[events.CommandEvent]: self.emit(events.GettingResults(agr_id=agreement_id, script_id=script_id)) await batch.post() self.emit(events.ScriptFinished(agr_id=agreement_id, script_id=script_id)) - await self.accept_payment_for_agreement(agreement_id, partial=True) + await self.accept_payments_for_agreement(agreement_id, partial=True) return results loop = asyncio.get_event_loop() @@ -539,7 +563,7 @@ async def get_batch_results() -> List[events.CommandEvent]: future_results = loop.create_future() results = await get_batch_results() future_results.set_result(results) - except Exception as e: + except Exception: # Raise the exception in `command_generator` (the `worker` coroutine). # If the client code is able to handle it then we'll proceed with # subsequent batches. Otherwise the worker finishes with error. @@ -556,7 +580,7 @@ async def execute_tasks( self, worker: Callable[ [WorkContext, AsyncIterator[Task[D, R]]], - AsyncGenerator[Work, Awaitable[List[events.CommandEvent]]], + AsyncGenerator[WorkItem, Awaitable[List[events.CommandEvent]]], ], data: Union[AsyncIterator[Task[D, R]], Iterable[Task[D, R]]], payload: Payload, @@ -564,6 +588,23 @@ async def execute_tasks( timeout: Optional[timedelta] = None, budget: Optional[Union[float, Decimal]] = None, ) -> AsyncIterator[Task[D, R]]: + """Submit a sequence of tasks to be executed on providers. + + Internally, this method creates an instance of `yapapi.executor.Executor` + and calls its `submit()` method with given worker function and sequence of tasks. + + :param worker: an async generator that takes a `WorkContext` object and a sequence + of tasks, and generates as sequence of work items to be executed on providers in order + to compute given tasks + :param data: an iterable or an async generator of `Task` objects to be computed on providers + :param payload: specification of the payload that needs to be deployed on providers + (for example, a VM runtime package) in order to compute the tasks, passed to + the created `Executor` instance + :param max_workers: maximum number of concurrent workers, passed to the `Executor` instance + :param timeout: timeout for computing all tasks, passed to the `Executor` instance + :param budget: budget for computing all tasks, passed to the `Executor` instance + :return: an iterator that yields completed `Task` objects + """ kwargs: Dict[str, Any] = {"payload": payload} if max_workers: @@ -583,13 +624,25 @@ async def run_service( payload: Optional[Payload] = None, expiration: Optional[datetime] = None, ) -> "Cluster": + """Run a number of instances of a service represented by a given `Service` subclass. + + :param service_class: a subclass of `Service` that represents the service to be run + :param num_instances: optional number of service instances to run, defaults to a single + instance + :param payload: optional runtime definition for the service; if not provided, the + payload specified by the `get_payload()` method of `service_class` is used + :param expiration: optional expiration datetime for the service + :return: a `Cluster` of service instances + """ + from .services import Cluster # avoid circular dependency payload = payload or await service_class.get_payload() if not payload: raise ValueError( - f"No payload returned from {service_class.__name__}.get_payload() nor given in the `payload` argument." + f"No payload returned from {service_class.__name__}.get_payload()" + " nor given in the `payload` argument." ) cluster = Cluster( @@ -605,7 +658,10 @@ async def run_service( class Job: - """Functionality related to a single job.""" + """Functionality related to a single job. + + Responsible for posting a Demand to market and collecting Offer proposals for the Demand. + """ def __init__( self, @@ -613,6 +669,15 @@ def __init__( expiration_time: datetime, payload: Payload, ): + """Initialize a `Job` instance. + + param engine: a `Golem` engine which will run this job + param expiration_time: expiration time for the job; all agreements created for this job + must expire before this date + param payload: definition of a service runtime or a runtime package that needs to + be deployed on providers for executing this job + """ + self.id = str(uuid.uuid4()) self.engine = engine self.offers_collected: int = 0 @@ -712,7 +777,7 @@ async def _find_offers_for_subscription( self.offers_collected += 1 async def handler(proposal_): - """A coroutine that wraps `_handle_proposal()` method with error handling.""" + """Wrap `_handle_proposal()` method with error handling.""" try: event = await self._handle_proposal(proposal_, demand_builder) assert isinstance(event, events.ProposalEvent) @@ -737,6 +802,7 @@ async def handler(proposal_): async def find_offers(self) -> None: """Create demand subscription and process offers. + When the subscription expires, create a new one. And so on... """ @@ -773,10 +839,10 @@ def _get_common_payment_platforms(self, proposal: rest.market.OfferProposal) -> class Executor(AsyncContextManager): - """ - Task executor. + """Task executor. - Used to run batch tasks using the specified application package within providers' execution units. + Used to run batch tasks using the specified application package within providers' + execution units. """ @overload @@ -788,8 +854,7 @@ def __init__( timeout: timedelta = DEFAULT_EXECUTOR_TIMEOUT, _engine: Golem, ): - # A variant with explicit `_engine` - ... + """Initialize the `Executor` to use a specific Golem `_engine`.""" @overload def __init__( @@ -806,8 +871,7 @@ def __init__( max_workers: int = 5, timeout: timedelta = DEFAULT_EXECUTOR_TIMEOUT, ): - # Standalone usage, with `payload` parameter - ... + """Initialize the `Executor` for standalone usage, with `payload` parameter.""" @overload def __init__( @@ -824,8 +888,7 @@ def __init__( timeout: timedelta = DEFAULT_EXECUTOR_TIMEOUT, package: Optional[Payload] = None, ): - # Standalone usage, with `package` parameter - ... + """Initialize the `Executor` for standalone usage, with `package` parameter.""" def __init__( self, @@ -843,7 +906,7 @@ def __init__( payload: Optional[Payload] = None, _engine: Optional[Golem] = None, ): - """Create a new executor. + """Initialize an `Executor`. :param budget: [DEPRECATED use `Golem` instead] maximum budget for payments :param strategy: [DEPRECATED use `Golem` instead] market strategy used to @@ -861,11 +924,10 @@ def __init__( by default it is a function that logs all events :param stream_output: [DEPRECATED use `Golem` instead] stream computation output from providers - - :param max_workers: maximum number of workers performing the computation + :param max_workers: maximum number of concurrent workers performing the computation + :param payload: specification of payload (for example a VM package) that needs to be + deployed on providers in order to compute tasks with this Executor :param timeout: timeout for the whole computation - :param package: a package common for all tasks; vm.repo() function may be used - to return package from a repository """ logger.debug("Creating Executor instance; parameters: %s", locals()) self.__standalone = False @@ -895,7 +957,8 @@ def __init__( if payload: raise ValueError("Cannot use `payload` and `package` at the same time") logger.warning( - f"`package` argument to `{self.__class__}` is deprecated, please use `payload` instead" + f"`package` argument to `{self.__class__}` is deprecated," + " please use `payload` instead" ) payload = package if not payload: @@ -908,37 +971,43 @@ def __init__( @property def driver(self) -> str: + """Return the payment driver used for this `Executor`'s engine.""" return self._engine.driver @property def network(self) -> str: + """Return the payment network used for this `Executor`'s engine.""" return self._engine.network async def __aenter__(self) -> "Executor": + """Start computation using this `Executor`.""" if self.__standalone: await self._stack.enter_async_context(self._engine) self._expires = datetime.now(timezone.utc) + self._timeout return self async def __aexit__(self, exc_type, exc_val, exc_tb): + """Release resources used by this `Executor`.""" await self._stack.aclose() def emit(self, event: events.Event) -> None: + """Emit a computation event using this `Executor`'s engine.""" self._engine.emit(event) async def submit( self, worker: Callable[ [WorkContext, AsyncIterator[Task[D, R]]], - AsyncGenerator[Work, Awaitable[List[events.CommandEvent]]], + AsyncGenerator[WorkItem, Awaitable[List[events.CommandEvent]]], ], data: Union[AsyncIterator[Task[D, R]], Iterable[Task[D, R]]], ) -> AsyncIterator[Task[D, R]]: """Submit a computation to be executed on providers. :param worker: a callable that takes a WorkContext object and a list o tasks, - adds commands to the context object and yields committed commands - :param data: an iterator of Task objects to be computed on providers + adds commands to the context object and yields committed commands + :param data: an iterable or an async generator iterator of Task objects to be computed + on providers :return: yields computation progress events """ @@ -975,7 +1044,7 @@ async def _submit( self, worker: Callable[ [WorkContext, AsyncIterator[Task[D, R]]], - AsyncGenerator[Work, Awaitable[List[events.CommandEvent]]], + AsyncGenerator[WorkItem, Awaitable[List[events.CommandEvent]]], ], data: Union[AsyncIterator[Task[D, R]], Iterable[Task[D, R]]], services: Set[asyncio.Task], @@ -1028,7 +1097,7 @@ async def start_worker(agreement: rest.market.Agreement, node_info: NodeInfo) -> async with act: self.emit(events.ActivityCreated(act_id=act.id, agr_id=agreement.id)) - self._engine.approve_agreement_payments(agreement.id) + self._engine.accept_debit_notes_for_agreement(agreement.id) work_context = WorkContext( f"worker-{wid}", node_info, self._engine.storage_manager, emitter=self.emit ) @@ -1063,7 +1132,7 @@ async def task_generator() -> AsyncIterator[Task[D, R]]: ) raise finally: - await self._engine.accept_payment_for_agreement(agreement.id) + await self._engine.accept_payments_for_agreement(agreement.id) async def worker_starter() -> None: while True: diff --git a/yapapi/executor/_smartq.py b/yapapi/executor/_smartq.py index 52e671c07..d1d20f0a0 100644 --- a/yapapi/executor/_smartq.py +++ b/yapapi/executor/_smartq.py @@ -148,9 +148,7 @@ async def mark_done(self, handle: Handle[Item]) -> None: self._new_items.notify_all() if _logger.isEnabledFor(logging.DEBUG): stats = self.stats() - _logger.debug( - "status: " + ", ".join(f"{key}: {val}" for key, val in self.stats().items()) - ) + _logger.debug("status: " + ", ".join(f"{key}: {val}" for key, val in stats.items())) async def reschedule(self, handle: Handle[Item]) -> None: """Free the item for reassignment to another consumer.""" diff --git a/yapapi/executor/ctx.py b/yapapi/executor/ctx.py index e25eb9ab8..9248806ba 100644 --- a/yapapi/executor/ctx.py +++ b/yapapi/executor/ctx.py @@ -102,7 +102,7 @@ def __init__(self, storage: StorageProvider, data: bytes, dst_path: str): self._data: Optional[bytes] = data async def do_upload(self, storage: StorageProvider) -> Source: - assert self._data is not None, f"buffer unintialized" + assert self._data is not None, "buffer unintialized" src = await storage.upload_bytes(self._data) self._data = None return src diff --git a/yapapi/executor/events.py b/yapapi/executor/events.py index e95e6b2b7..9a2417904 100644 --- a/yapapi/executor/events.py +++ b/yapapi/executor/events.py @@ -2,7 +2,6 @@ import dataclasses from datetime import datetime, timedelta from dataclasses import dataclass -import json import logging from types import TracebackType from typing import Any, Optional, Type, Tuple, List diff --git a/yapapi/executor/services.py b/yapapi/executor/services.py index be4350454..0bf5387fc 100644 --- a/yapapi/executor/services.py +++ b/yapapi/executor/services.py @@ -1,3 +1,4 @@ +"""Implementation of high-level services API.""" import asyncio import itertools from dataclasses import dataclass, field @@ -28,16 +29,17 @@ logger = logging.getLogger(__name__) -# current default for yagna providers as of yagna 0.6.x -DEFAULT_SERVICE_EXPIRATION: Final[timedelta] = timedelta(minutes=175) +# current defaults for yagna providers as of yagna 0.6.x, see +# https://github.com/golemfactory/yagna/blob/c37dbd1a2bc918a511eed12f2399eb9fd5bbf2a2/agent/provider/src/market/negotiator/factory.rs#L20 +MIN_AGREEMENT_EXPIRATION: Final[timedelta] = timedelta(minutes=5) +MAX_AGREEMENT_EXPIRATION: Final[timedelta] = timedelta(minutes=180) +DEFAULT_SERVICE_EXPIRATION: Final[timedelta] = MAX_AGREEMENT_EXPIRATION - timedelta(minutes=5) cluster_ids = itertools.count(1) class ServiceState(statemachine.StateMachine): - """ - State machine describing the state and lifecycle of a Service instance. - """ + """State machine describing the state and lifecycle of a Service instance.""" # states starting = statemachine.State("starting", initial=True) @@ -60,52 +62,68 @@ class ServiceState(statemachine.StateMachine): @dataclass class ServiceSignal: - """ - Simple container to carry information between the client code and the Service instance. - """ + """Simple container to carry information between the client code and the Service instance.""" message: Any response_to: Optional["ServiceSignal"] = None class Service: - """ - Base Service class to be extended by application developers to define their own, - specialized Service specifications. + """Base class for service specifications. + + To be extended by application developers to define their own, specialized + Service specifications. """ def __init__(self, cluster: "Cluster", ctx: WorkContext): + """Initialize the service instance for a specific Cluster and a specific WorkContext. + + :param cluster: a cluster to which this service instance of this service belongs + :param ctx: a work context object for executing commands on a provider that runs this + service instance. + """ self._cluster: "Cluster" = cluster self._ctx: WorkContext = ctx self.__inqueue: asyncio.Queue[ServiceSignal] = asyncio.Queue() self.__outqueue: asyncio.Queue[ServiceSignal] = asyncio.Queue() - self.post_init() - - def post_init(self): - pass @property def id(self): + """Return the id of this service instance. + + Guaranteed to be unique within a Cluster. + """ return self._ctx.id @property def provider_name(self): + """Return the name of the provider that runs this service instance.""" return self._ctx.provider_name def __repr__(self): return f"<{self.__class__.__name__}: {self.id}>" async def send_message(self, message: Any = None): + """Send a control message to this instance.""" await self.__inqueue.put(ServiceSignal(message=message)) def send_message_nowait(self, message: Optional[Any] = None): + """Send a control message to this instance without blocking. + + May raise `asyncio.QueueFull` if the channel for sending control messages is full. + """ self.__inqueue.put_nowait(ServiceSignal(message=message)) async def receive_message(self) -> ServiceSignal: + """Wait for a control message sent to this instance.""" return await self.__outqueue.get() def receive_message_nowait(self) -> Optional[ServiceSignal]: + """Retrieve a control message sent to this instance. + + Return `None` if no message is available. + """ try: return self.__outqueue.get_nowait() except asyncio.QueueEmpty: @@ -136,32 +154,38 @@ async def get_payload() -> Optional[Payload]: pass async def start(self): + """Implement the `starting` state of the service.""" + self._ctx.deploy() self._ctx.start() yield self._ctx.commit() async def run(self): + """Implement the `running` state of the service.""" + while True: await asyncio.sleep(10) yield async def shutdown(self): + """Implement the `stopping` state of the service.""" + self._ctx.terminate() yield self._ctx.commit() @property def is_available(self): + """Return `True` iff this instance is available (that is, starting, running or stopping).""" return self._cluster.get_state(self) in ServiceState.AVAILABLE @property def state(self): + """Return the current state of this instance.""" return self._cluster.get_state(self) class ControlSignal(enum.Enum): - """ - Control signal, used to request an instance's state change from the controlling Cluster. - """ + """Control signal, used to request an instance's state change from the controlling Cluster.""" stop = "stop" @@ -180,13 +204,12 @@ class ServiceInstance: @property def state(self) -> ServiceState: + """Return the current state of this instance.""" return self.service_state.current_state class Cluster(AsyncContextManager): - """ - Golem's sub-engine used to spawn and control instances of a single Service. - """ + """Golem's sub-engine used to spawn and control instances of a single Service.""" def __init__( self, @@ -196,6 +219,16 @@ def __init__( num_instances: int = 1, expiration: Optional[datetime] = None, ): + """Initialize this Cluster. + + :param engine: an engine for running service instance + :param service_class: service specification + :param payload: definition of service runtime for this Cluster + :param num_instances: number of instances to spawn in this Cluster + :param expiration: a date before which all agreements related to running services + in this Cluster should be terminated + """ + self.id = str(next(cluster_ids)) self._engine = engine @@ -213,9 +246,14 @@ def __init__( """Set of asyncio tasks that run spawn_service()""" def __repr__(self): - return f"Cluster {self.id}: {self._num_instances}x[Service: {self._service_class.__name__}, Payload: {self._payload}]" + return ( + f"Cluster {self.id}: {self._num_instances}x[Service: {self._service_class.__name__}, " + f"Payload: {self._payload}]" + ) async def __aenter__(self): + """Post a Demand and start collecting provider Offers for running service instances.""" + self.__services: Set[asyncio.Task] = set() """Asyncio tasks running within this cluster""" @@ -236,6 +274,8 @@ async def agreements_pool_cycler(): self.__services.add(loop.create_task(agreements_pool_cycler())) async def __aexit__(self, exc_type, exc_val, exc_tb): + """Release resources used by this Cluster.""" + logger.debug("%s is shutting down...", self) # Give the instance tasks some time to terminate gracefully. @@ -270,10 +310,12 @@ async def __aexit__(self, exc_type, exc_val, exc_tb): self._engine.finalize_job(self._job) def emit(self, event: events.Event) -> None: + """Emit an event using this Cluster's engine.""" self._engine.emit(event) @property def instances(self) -> List[Service]: + """Return the list of service instances in this Cluster.""" return [i.service for i in self.__instances] def __get_service_instance(self, service: Service) -> ServiceInstance: @@ -283,6 +325,7 @@ def __get_service_instance(self, service: Service) -> ServiceInstance: assert False, f"No instance found for {service}" def get_state(self, service: Service) -> ServiceState: + """Return the state of the specific instance in this Cluster.""" instance = self.__get_service_instance(service) return instance.state @@ -369,7 +412,9 @@ async def _run_instance(self, ctx: WorkContext): logger.info("%s decomissioned", instance.service) - async def spawn_instance(self): + async def spawn_instance(self) -> None: + """Spawn a new service instance within this Cluster.""" + logger.debug("spawning instance within %s", self) spawned = False @@ -390,7 +435,7 @@ async def start_worker(agreement: rest.market.Agreement, node_info: NodeInfo) -> async with act: spawned = True self.emit(events.ActivityCreated(act_id=act.id, agr_id=agreement.id)) - self._engine.approve_agreement_payments(agreement.id) + self._engine.accept_debit_notes_for_agreement(agreement.id) work_context = WorkContext( act.id, node_info, self._engine.storage_manager, emitter=self.emit ) @@ -425,7 +470,7 @@ async def start_worker(agreement: rest.market.Agreement, node_info: NodeInfo) -> ) raise finally: - await self._engine.accept_payment_for_agreement(agreement.id) + await self._engine.accept_payments_for_agreement(agreement.id) await self._job.agreements_pool.release_agreement( agreement.id, allow_reuse=False ) @@ -441,15 +486,16 @@ async def start_worker(agreement: rest.market.Agreement, node_info: NodeInfo) -> await task def stop_instance(self, service: Service): + """Stop the specific service instance belonging to this Cluster.""" + instance = self.__get_service_instance(service) instance.control_queue.put_nowait(ControlSignal.stop) def spawn_instances(self, num_instances: Optional[int] = None) -> None: - """ - Spawn new instances within this Cluster. + """Spawn new instances within this Cluster. :param num_instances: number of instances to commission. - if not given, spawns the number that the Cluster has been initialized with. + if not given, spawns the number that the Cluster has been initialized with. """ if num_instances: self._num_instances += num_instances diff --git a/yapapi/log.py b/yapapi/log.py index a12e41679..18307353f 100644 --- a/yapapi/log.py +++ b/yapapi/log.py @@ -53,8 +53,9 @@ import time from typing import Any, Callable, Dict, Iterator, List, Optional, Set -import yapapi.executor.events as events from yapapi import __version__ as yapapi_version +import yapapi.executor.events as events +from yapapi.executor.services import MAX_AGREEMENT_EXPIRATION, MIN_AGREEMENT_EXPIRATION from yapapi.rest.activity import CommandExecutionError event_logger = logging.getLogger("yapapi.events") @@ -350,13 +351,18 @@ def _handle(self, event: events.Event): # This means another computation run in the current Executor instance. self._print_total_cost(partial=True) timeout = event.expires - datetime.now(timezone.utc) - if not timedelta(minutes=5, seconds=5) <= timeout <= timedelta(minutes=30): + # Compute the timeout as it will be seen by providers, assuming they will see + # the Demand 5 seconds from now + provider_timeout = timeout - timedelta(seconds=5) + if not MIN_AGREEMENT_EXPIRATION <= provider_timeout <= MAX_AGREEMENT_EXPIRATION: min, sec = divmod(round(timeout.total_seconds()), 60) + seconds_str = f" {sec} sec " if sec else " " + max_minutes = round(MAX_AGREEMENT_EXPIRATION.seconds / 60) self.logger.warning( - f"Expiration time for your tasks is set to {min} min {sec} sec from now." - " Providers will probably not respond to tasks which expire sooner than 5 min" - " or later than 30 min, counting from the moment they get your demand." - " Use the `timeout` parameter to `Executor()` to adjust the timeout." + f"Expiration time for your tasks is set to {min} min{seconds_str}from now." + " Providers may not be willing to take up tasks which expire sooner than 5 min" + f" or later than {max_minutes} min, counting from the moment they get your" + " demand." ) elif isinstance(event, events.ProposalReceived):