From 761f8200cf404911360b878b3c777809cb5a2a4c Mon Sep 17 00:00:00 2001 From: azawlocki Date: Wed, 2 Jun 2021 09:12:40 +0200 Subject: [PATCH] Apply suggestions from code review Co-authored-by: Kuba Mazurek Co-authored-by: shadeofblue --- yapapi/executor/__init__.py | 28 +++++++++++++++------------- yapapi/executor/services.py | 13 ++++++++----- 2 files changed, 23 insertions(+), 18 deletions(-) diff --git a/yapapi/executor/__init__.py b/yapapi/executor/__init__.py index 6fb2e69e3..37a6c618c 100644 --- a/yapapi/executor/__init__.py +++ b/yapapi/executor/__init__.py @@ -92,8 +92,8 @@ class NoPaymentAccountError(Exception): def __init__(self, required_driver: str, required_network: str): """Initialize `NoPaymentAccountError`. - :param required_driver: payment driver for which initialization has been required - :param required_network: payment network for which initialization has been required + :param required_driver: payment driver for which initialization was required + :param required_network: payment network for which initialization was required """ self.required_driver: str = required_driver self.required_network: str = required_network @@ -142,7 +142,7 @@ def __init__( stream_output: bool = False, app_key: Optional[str] = None, ): - """Initialize a Golem engine. + """Initialize the Golem engine. :param budget: maximum budget for payments :param strategy: market strategy used to select providers from the market @@ -212,7 +212,7 @@ async def create_demand_builder( builder.add(NodeInfo(subnet_tag=self._subnet)) if self._subnet: builder.ensure(f"({NodeInfoKeys.subnet_tag}={self._subnet})") - await builder.decorate(self.payment_decoration, self.strategy, payload) + await builder.decorate(self.payment_decorator, self.strategy, payload) return builder @property @@ -265,7 +265,7 @@ def report_shutdown(*exc_info): payment_client = await stack.enter_async_context(self._api_config.payment()) self._payment_api = rest.Payment(payment_client) - self.payment_decoration = Golem.PaymentDecoration(await self._create_allocations()) + self.payment_decorator = Golem.PaymentDecorator(await self._create_allocations()) # TODO: make the method starting the process_invoices() task an async context manager # to simplify code in __aexit__() @@ -468,7 +468,7 @@ async def accept_payments_for_agreement( events.PaymentAccepted(agr_id=agreement_id, inv_id=inv.invoice_id, amount=inv.amount) ) - def approve_debit_notes_for_agreement(self, agreement_id): + def accept_debit_notes_for_agreement(self, agreement_id: str) -> None: """Add given agreement to the set of agreements for which debit notes should be accepted.""" self._agreements_accepting_debit_notes.add(agreement_id) @@ -482,7 +482,7 @@ def finalize_job(job: "Job"): job.finished.set() @dataclass - class PaymentDecoration(DemandDecorator): + class PaymentDecorator(DemandDecorator): """A `DemandDecorator` that adds payment-related constraints and properties to a Demand.""" market_decoration: rest.payment.MarketDecoration @@ -595,7 +595,7 @@ async def execute_tasks( :param worker: an async generator that takes a `WorkContext` object and a sequence of tasks, and generates as sequence of work items to be executed on providers in order to compute given tasks - :param data: an iterator of `Task` objects to be computed on providers + :param data: an iterable or an async generator of `Task` objects to be computed on providers :param payload: specification of the payload that needs to be deployed on providers (for example, a VM runtime package) in order to compute the tasks, passed to the created `Executor` instance @@ -626,10 +626,11 @@ async def run_service( """Run a number of instances of a service represented by a given `Service` subclass. :param service_class: a subclass of `Service` that represents the service to be run - :param num_instances: the number of service instances to run + :param num_instances: optional number of service instances to run, defaults to a single + instance :param payload: optional runtime definition for the service; if not provided, the payload specified by the `get_payload()` method of `service_class` is used - :param expiration:optional expiration date for the service + :param expiration: optional expiration datetime for the service :return: a `Cluster` of service instances """ @@ -1003,8 +1004,9 @@ async def submit( """Submit a computation to be executed on providers. :param worker: a callable that takes a WorkContext object and a list o tasks, - adds commands to the context object and yields committed commands - :param data: an iterator of Task objects to be computed on providers + adds commands to the context object and yields committed commands + :param data: an iterable or an async generator iterator of Task objects to be computed + on providers :return: yields computation progress events """ @@ -1094,7 +1096,7 @@ async def start_worker(agreement: rest.market.Agreement, node_info: NodeInfo) -> async with act: self.emit(events.ActivityCreated(act_id=act.id, agr_id=agreement.id)) - self._engine.approve_debit_notes_for_agreement(agreement.id) + self._engine.accept_debit_notes_for_agreement(agreement.id) work_context = WorkContext( f"worker-{wid}", node_info, self._engine.storage_manager, emitter=self.emit ) diff --git a/yapapi/executor/services.py b/yapapi/executor/services.py index 3fb775e70..9757193b9 100644 --- a/yapapi/executor/services.py +++ b/yapapi/executor/services.py @@ -73,9 +73,9 @@ class Service: """ def __init__(self, cluster: "Cluster", ctx: WorkContext): - """Initialize a service for a specific cluster of instances and a specific work context. + """Initialize the service instance for a specific Cluster and a specific WorkContext. - :param cluster: a cluster to which an instance of this service belongs + :param cluster: a cluster to which this service instance of this service belongs :param ctx: a work context object for executing commands on a provider that runs this service instance. """ @@ -87,7 +87,10 @@ def __init__(self, cluster: "Cluster", ctx: WorkContext): @property def id(self): - """Return the id of the work context associated with this service instance.""" + """Return the id of this service instance. + + Guaranteed to be unique within a Cluster. + """ return self._ctx.id @property @@ -213,7 +216,7 @@ def __init__( num_instances: int = 1, expiration: Optional[datetime] = None, ): - """Initialize a Cluster of service instances. + """Initialize this Cluster. :param engine: an engine for running service instance :param service_class: service specification @@ -417,7 +420,7 @@ async def start_worker(agreement: rest.market.Agreement, node_info: NodeInfo) -> async with act: spawned = True self.emit(events.ActivityCreated(act_id=act.id, agr_id=agreement.id)) - self._engine.approve_debit_notes_for_agreement(agreement.id) + self._engine.accept_debit_notes_for_agreement(agreement.id) work_context = WorkContext( act.id, node_info, self._engine.storage_manager, emitter=self.emit )