Skip to content

Commit

Permalink
Apply suggestions from code review
Browse files Browse the repository at this point in the history
Co-authored-by: Kuba Mazurek <[email protected]>
Co-authored-by: shadeofblue <[email protected]>
  • Loading branch information
3 people committed Jun 2, 2021
1 parent ebcdbf4 commit 761f820
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 18 deletions.
28 changes: 15 additions & 13 deletions yapapi/executor/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,8 @@ class NoPaymentAccountError(Exception):
def __init__(self, required_driver: str, required_network: str):
"""Initialize `NoPaymentAccountError`.
:param required_driver: payment driver for which initialization has been required
:param required_network: payment network for which initialization has been required
:param required_driver: payment driver for which initialization was required
:param required_network: payment network for which initialization was required
"""
self.required_driver: str = required_driver
self.required_network: str = required_network
Expand Down Expand Up @@ -142,7 +142,7 @@ def __init__(
stream_output: bool = False,
app_key: Optional[str] = None,
):
"""Initialize a Golem engine.
"""Initialize the Golem engine.
:param budget: maximum budget for payments
:param strategy: market strategy used to select providers from the market
Expand Down Expand Up @@ -212,7 +212,7 @@ async def create_demand_builder(
builder.add(NodeInfo(subnet_tag=self._subnet))
if self._subnet:
builder.ensure(f"({NodeInfoKeys.subnet_tag}={self._subnet})")
await builder.decorate(self.payment_decoration, self.strategy, payload)
await builder.decorate(self.payment_decorator, self.strategy, payload)
return builder

@property
Expand Down Expand Up @@ -265,7 +265,7 @@ def report_shutdown(*exc_info):
payment_client = await stack.enter_async_context(self._api_config.payment())
self._payment_api = rest.Payment(payment_client)

self.payment_decoration = Golem.PaymentDecoration(await self._create_allocations())
self.payment_decorator = Golem.PaymentDecorator(await self._create_allocations())

# TODO: make the method starting the process_invoices() task an async context manager
# to simplify code in __aexit__()
Expand Down Expand Up @@ -468,7 +468,7 @@ async def accept_payments_for_agreement(
events.PaymentAccepted(agr_id=agreement_id, inv_id=inv.invoice_id, amount=inv.amount)
)

def approve_debit_notes_for_agreement(self, agreement_id):
def accept_debit_notes_for_agreement(self, agreement_id: str) -> None:
"""Add given agreement to the set of agreements for which debit notes should be accepted."""
self._agreements_accepting_debit_notes.add(agreement_id)

Expand All @@ -482,7 +482,7 @@ def finalize_job(job: "Job"):
job.finished.set()

@dataclass
class PaymentDecoration(DemandDecorator):
class PaymentDecorator(DemandDecorator):
"""A `DemandDecorator` that adds payment-related constraints and properties to a Demand."""

market_decoration: rest.payment.MarketDecoration
Expand Down Expand Up @@ -595,7 +595,7 @@ async def execute_tasks(
:param worker: an async generator that takes a `WorkContext` object and a sequence
of tasks, and generates as sequence of work items to be executed on providers in order
to compute given tasks
:param data: an iterator of `Task` objects to be computed on providers
:param data: an iterable or an async generator of `Task` objects to be computed on providers
:param payload: specification of the payload that needs to be deployed on providers
(for example, a VM runtime package) in order to compute the tasks, passed to
the created `Executor` instance
Expand Down Expand Up @@ -626,10 +626,11 @@ async def run_service(
"""Run a number of instances of a service represented by a given `Service` subclass.
:param service_class: a subclass of `Service` that represents the service to be run
:param num_instances: the number of service instances to run
:param num_instances: optional number of service instances to run, defaults to a single
instance
:param payload: optional runtime definition for the service; if not provided, the
payload specified by the `get_payload()` method of `service_class` is used
:param expiration:optional expiration date for the service
:param expiration: optional expiration datetime for the service
:return: a `Cluster` of service instances
"""

Expand Down Expand Up @@ -1003,8 +1004,9 @@ async def submit(
"""Submit a computation to be executed on providers.
:param worker: a callable that takes a WorkContext object and a list o tasks,
adds commands to the context object and yields committed commands
:param data: an iterator of Task objects to be computed on providers
adds commands to the context object and yields committed commands
:param data: an iterable or an async generator iterator of Task objects to be computed
on providers
:return: yields computation progress events
"""

Expand Down Expand Up @@ -1094,7 +1096,7 @@ async def start_worker(agreement: rest.market.Agreement, node_info: NodeInfo) ->
async with act:

self.emit(events.ActivityCreated(act_id=act.id, agr_id=agreement.id))
self._engine.approve_debit_notes_for_agreement(agreement.id)
self._engine.accept_debit_notes_for_agreement(agreement.id)
work_context = WorkContext(
f"worker-{wid}", node_info, self._engine.storage_manager, emitter=self.emit
)
Expand Down
13 changes: 8 additions & 5 deletions yapapi/executor/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,9 @@ class Service:
"""

def __init__(self, cluster: "Cluster", ctx: WorkContext):
"""Initialize a service for a specific cluster of instances and a specific work context.
"""Initialize the service instance for a specific Cluster and a specific WorkContext.
:param cluster: a cluster to which an instance of this service belongs
:param cluster: a cluster to which this service instance of this service belongs
:param ctx: a work context object for executing commands on a provider that runs this
service instance.
"""
Expand All @@ -87,7 +87,10 @@ def __init__(self, cluster: "Cluster", ctx: WorkContext):

@property
def id(self):
"""Return the id of the work context associated with this service instance."""
"""Return the id of this service instance.
Guaranteed to be unique within a Cluster.
"""
return self._ctx.id

@property
Expand Down Expand Up @@ -213,7 +216,7 @@ def __init__(
num_instances: int = 1,
expiration: Optional[datetime] = None,
):
"""Initialize a Cluster of service instances.
"""Initialize this Cluster.
:param engine: an engine for running service instance
:param service_class: service specification
Expand Down Expand Up @@ -417,7 +420,7 @@ async def start_worker(agreement: rest.market.Agreement, node_info: NodeInfo) ->
async with act:
spawned = True
self.emit(events.ActivityCreated(act_id=act.id, agr_id=agreement.id))
self._engine.approve_debit_notes_for_agreement(agreement.id)
self._engine.accept_debit_notes_for_agreement(agreement.id)
work_context = WorkContext(
act.id, node_info, self._engine.storage_manager, emitter=self.emit
)
Expand Down

0 comments on commit 761f820

Please sign in to comment.