Skip to content

Commit

Permalink
black
Browse files Browse the repository at this point in the history
  • Loading branch information
shadeofblue committed May 18, 2021
1 parent 71181ce commit 142d3a3
Showing 1 changed file with 43 additions and 57 deletions.
100 changes: 43 additions & 57 deletions yapapi/executor/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ def __init__(
network: Optional[str] = None,
event_consumer: Optional[Callable[[Event], None]] = None,
stream_output: bool = False,
app_key: Optional[str] = None
app_key: Optional[str] = None,
):
"""
Base execution engine containing functions common to all modes of operation
Expand Down Expand Up @@ -198,9 +198,7 @@ async def create_demand_builder(
builder.add(NodeInfo(subnet_tag=self._subnet))
if self._subnet:
builder.ensure(f"({NodeInfoKeys.subnet_tag}={self._subnet})")
await builder.decorate(
self.payment_decoration, self.strategy, payload
)
await builder.decorate(self.payment_decoration, self.strategy, payload)
return builder

def _init_api(self, app_key: Optional[str] = None):
Expand Down Expand Up @@ -280,9 +278,7 @@ async def __aexit__(self, exc_type, exc_val, exc_tb):
self._services, timeout=10, return_when=asyncio.ALL_COMPLETED
)
if pending:
logger.debug(
"%s still running: %s", pluralize(len(pending), "service"), pending
)
logger.debug("%s still running: %s", pluralize(len(pending), "service"), pending)
except Exception:
# TODO: add message
logger.debug("TODO", exc_info=True)
Expand Down Expand Up @@ -422,7 +418,9 @@ async def process_debit_notes(self) -> None:
if self._payment_closing and not self._agreements_to_pay:
break

async def accept_payment_for_agreement(self, agreement_id: str, *, partial: bool = False) -> None:
async def accept_payment_for_agreement(
self, agreement_id: str, *, partial: bool = False
) -> None:
self.emit(events.PaymentPrepared(agr_id=agreement_id))
inv = self._invoices.get(agreement_id)
if inv is None:
Expand All @@ -433,9 +431,7 @@ async def accept_payment_for_agreement(self, agreement_id: str, *, partial: bool
allocation = self._get_allocation(inv)
await inv.accept(amount=inv.amount, allocation=allocation)
self.emit(
events.PaymentAccepted(
agr_id=agreement_id, inv_id=inv.invoice_id, amount=inv.amount
)
events.PaymentAccepted(agr_id=agreement_id, inv_id=inv.invoice_id, amount=inv.amount)
)

def approve_agreement_payments(self, agreement_id):
Expand All @@ -458,35 +454,32 @@ async def decorate_demand(self, demand: DemandBuilder):
demand.properties.update({p.key: p.value for p in self.market_decoration.properties})

async def execute_tasks(
self,
worker: Callable[
[WorkContext, AsyncIterator[Task[D, R]]],
AsyncGenerator[Work, Awaitable[List[events.CommandEvent]]],
],
data: Union[AsyncIterator[Task[D, R]], Iterable[Task[D, R]]],
payload: Payload,
max_workers: Optional[int] = None,
timeout: Optional[timedelta] = None,
budget: Optional[Union[float, Decimal]] = None,
self,
worker: Callable[
[WorkContext, AsyncIterator[Task[D, R]]],
AsyncGenerator[Work, Awaitable[List[events.CommandEvent]]],
],
data: Union[AsyncIterator[Task[D, R]], Iterable[Task[D, R]]],
payload: Payload,
max_workers: Optional[int] = None,
timeout: Optional[timedelta] = None,
budget: Optional[Union[float, Decimal]] = None,
) -> AsyncIterator[Task[D, R]]:

kwargs: Dict[str, Any] = {
'payload': payload
}
kwargs: Dict[str, Any] = {"payload": payload}
if max_workers:
kwargs['max_workers'] = max_workers
kwargs["max_workers"] = max_workers
if timeout:
kwargs['timeout'] = timeout
kwargs['budget'] = budget if budget is not None else self._budget_amount
kwargs["timeout"] = timeout
kwargs["budget"] = budget if budget is not None else self._budget_amount

async with Executor(_engine=self, **kwargs) as executor:
async for t in executor.submit(worker, data):
yield t

async def create_activity(self, agreement_id: str):
return await self._activity_api.new_activity(
agreement_id,
stream_events=self._stream_output
agreement_id, stream_events=self._stream_output
)

async def process_batches(
Expand Down Expand Up @@ -569,10 +562,10 @@ class Job:
"""Functionality related to a single job."""

def __init__(
self,
engine: Golem,
expiration_time: datetime,
payload: Payload,
self,
engine: Golem,
expiration_time: datetime,
payload: Payload,
):
self.engine = engine
self.offers_collected: int = 0
Expand All @@ -584,9 +577,9 @@ def __init__(
self.finished = asyncio.Event()

async def _handle_proposal(
self,
proposal: OfferProposal,
demand_builder: DemandBuilder,
self,
proposal: OfferProposal,
demand_builder: DemandBuilder,
) -> events.Event:
"""Handle a single `OfferProposal`.
Expand Down Expand Up @@ -642,7 +635,9 @@ async def reject_proposal(reason: str) -> events.ProposalRejected:
return events.ProposalConfirmed(prop_id=proposal.id)

async def _find_offers_for_subscription(
self, subscription: Subscription, demand_builder: DemandBuilder,
self,
subscription: Subscription,
demand_builder: DemandBuilder,
) -> None:
"""Create a market subscription and repeatedly collect offer proposals for it.
Expand All @@ -664,7 +659,9 @@ async def _find_offers_for_subscription(

async for proposal in proposals:

self.engine.emit(events.ProposalReceived(prop_id=proposal.id, provider_id=proposal.issuer))
self.engine.emit(
events.ProposalReceived(prop_id=proposal.id, provider_id=proposal.issuer)
)
self.offers_collected += 1

async def handler(proposal_):
Expand Down Expand Up @@ -696,9 +693,7 @@ async def find_offers(self) -> None:
When the subscription expires, create a new one. And so on...
"""

builder = await self.engine.create_demand_builder(
self.expiration_time, self.payload
)
builder = await self.engine.create_demand_builder(self.expiration_time, self.payload)

while True:
try:
Expand Down Expand Up @@ -745,7 +740,7 @@ def __init__(
payload: Optional[Payload] = None,
max_workers: int = 5,
timeout: timedelta = DEFAULT_EXECUTOR_TIMEOUT,
_engine: Golem
_engine: Golem,
):
# A variant with explicit `_engine`
...
Expand Down Expand Up @@ -786,7 +781,6 @@ def __init__(
# Standalone usage, with `package` parameter
...


def __init__(
self,
*,
Expand Down Expand Up @@ -835,7 +829,7 @@ def __init__(
else:
warnings.warn(
"stand-alone usage is deprecated, please `Golem.execute_task` class instead ",
DeprecationWarning
DeprecationWarning,
)
self._engine = Golem(
budget=budget,
Expand All @@ -844,7 +838,7 @@ def __init__(
driver=driver,
network=network,
event_consumer=event_consumer,
stream_output=stream_output
stream_output=stream_output,
)
self.__standalone = True

Expand Down Expand Up @@ -900,11 +894,7 @@ async def submit(
:return: yields computation progress events
"""

job = Job(
self._engine,
expiration_time=self._expires,
payload=self._payload
)
job = Job(self._engine, expiration_time=self._expires, payload=self._payload)
self._engine.add_job(job)

services: Set[asyncio.Task] = set()
Expand Down Expand Up @@ -997,14 +987,13 @@ async def start_worker(agreement: rest.market.Agreement, node_info: NodeInfo) ->

with work_queue.new_consumer() as consumer:
try:

async def task_generator() -> AsyncIterator[Task[D, R]]:
async for handle in consumer:
task = Task.for_handle(handle, work_queue, self.emit)
self._engine.emit(
events.TaskStarted(
agr_id=agreement.id,
task_id=task.id,
task_data=task.data
agr_id=agreement.id, task_id=task.id, task_data=task.data
)
)
yield task
Expand All @@ -1029,10 +1018,7 @@ async def worker_starter() -> None:
while True:
await asyncio.sleep(2)
await job.agreements_pool.cycle()
if (
len(workers) < self._max_workers
and await work_queue.has_unassigned_items()
):
if len(workers) < self._max_workers and await work_queue.has_unassigned_items():
new_task = None
try:
new_task = await job.agreements_pool.use_agreement(
Expand Down

0 comments on commit 142d3a3

Please sign in to comment.