diff --git a/yapapi/executor/__init__.py b/yapapi/executor/__init__.py index c7b6bdbbf..bea31bb0c 100644 --- a/yapapi/executor/__init__.py +++ b/yapapi/executor/__init__.py @@ -128,7 +128,7 @@ def __init__( network: Optional[str] = None, event_consumer: Optional[Callable[[Event], None]] = None, stream_output: bool = False, - app_key: Optional[str] = None + app_key: Optional[str] = None, ): """ Base execution engine containing functions common to all modes of operation @@ -198,9 +198,7 @@ async def create_demand_builder( builder.add(NodeInfo(subnet_tag=self._subnet)) if self._subnet: builder.ensure(f"({NodeInfoKeys.subnet_tag}={self._subnet})") - await builder.decorate( - self.payment_decoration, self.strategy, payload - ) + await builder.decorate(self.payment_decoration, self.strategy, payload) return builder def _init_api(self, app_key: Optional[str] = None): @@ -280,9 +278,7 @@ async def __aexit__(self, exc_type, exc_val, exc_tb): self._services, timeout=10, return_when=asyncio.ALL_COMPLETED ) if pending: - logger.debug( - "%s still running: %s", pluralize(len(pending), "service"), pending - ) + logger.debug("%s still running: %s", pluralize(len(pending), "service"), pending) except Exception: # TODO: add message logger.debug("TODO", exc_info=True) @@ -422,7 +418,9 @@ async def process_debit_notes(self) -> None: if self._payment_closing and not self._agreements_to_pay: break - async def accept_payment_for_agreement(self, agreement_id: str, *, partial: bool = False) -> None: + async def accept_payment_for_agreement( + self, agreement_id: str, *, partial: bool = False + ) -> None: self.emit(events.PaymentPrepared(agr_id=agreement_id)) inv = self._invoices.get(agreement_id) if inv is None: @@ -433,9 +431,7 @@ async def accept_payment_for_agreement(self, agreement_id: str, *, partial: bool allocation = self._get_allocation(inv) await inv.accept(amount=inv.amount, allocation=allocation) self.emit( - events.PaymentAccepted( - agr_id=agreement_id, inv_id=inv.invoice_id, amount=inv.amount - ) + events.PaymentAccepted(agr_id=agreement_id, inv_id=inv.invoice_id, amount=inv.amount) ) def approve_agreement_payments(self, agreement_id): @@ -458,26 +454,24 @@ async def decorate_demand(self, demand: DemandBuilder): demand.properties.update({p.key: p.value for p in self.market_decoration.properties}) async def execute_tasks( - self, - worker: Callable[ - [WorkContext, AsyncIterator[Task[D, R]]], - AsyncGenerator[Work, Awaitable[List[events.CommandEvent]]], - ], - data: Union[AsyncIterator[Task[D, R]], Iterable[Task[D, R]]], - payload: Payload, - max_workers: Optional[int] = None, - timeout: Optional[timedelta] = None, - budget: Optional[Union[float, Decimal]] = None, + self, + worker: Callable[ + [WorkContext, AsyncIterator[Task[D, R]]], + AsyncGenerator[Work, Awaitable[List[events.CommandEvent]]], + ], + data: Union[AsyncIterator[Task[D, R]], Iterable[Task[D, R]]], + payload: Payload, + max_workers: Optional[int] = None, + timeout: Optional[timedelta] = None, + budget: Optional[Union[float, Decimal]] = None, ) -> AsyncIterator[Task[D, R]]: - kwargs: Dict[str, Any] = { - 'payload': payload - } + kwargs: Dict[str, Any] = {"payload": payload} if max_workers: - kwargs['max_workers'] = max_workers + kwargs["max_workers"] = max_workers if timeout: - kwargs['timeout'] = timeout - kwargs['budget'] = budget if budget is not None else self._budget_amount + kwargs["timeout"] = timeout + kwargs["budget"] = budget if budget is not None else self._budget_amount async with Executor(_engine=self, **kwargs) as executor: async for t in executor.submit(worker, data): @@ -485,8 +479,7 @@ async def execute_tasks( async def create_activity(self, agreement_id: str): return await self._activity_api.new_activity( - agreement_id, - stream_events=self._stream_output + agreement_id, stream_events=self._stream_output ) async def process_batches( @@ -569,10 +562,10 @@ class Job: """Functionality related to a single job.""" def __init__( - self, - engine: Golem, - expiration_time: datetime, - payload: Payload, + self, + engine: Golem, + expiration_time: datetime, + payload: Payload, ): self.engine = engine self.offers_collected: int = 0 @@ -584,9 +577,9 @@ def __init__( self.finished = asyncio.Event() async def _handle_proposal( - self, - proposal: OfferProposal, - demand_builder: DemandBuilder, + self, + proposal: OfferProposal, + demand_builder: DemandBuilder, ) -> events.Event: """Handle a single `OfferProposal`. @@ -642,7 +635,9 @@ async def reject_proposal(reason: str) -> events.ProposalRejected: return events.ProposalConfirmed(prop_id=proposal.id) async def _find_offers_for_subscription( - self, subscription: Subscription, demand_builder: DemandBuilder, + self, + subscription: Subscription, + demand_builder: DemandBuilder, ) -> None: """Create a market subscription and repeatedly collect offer proposals for it. @@ -664,7 +659,9 @@ async def _find_offers_for_subscription( async for proposal in proposals: - self.engine.emit(events.ProposalReceived(prop_id=proposal.id, provider_id=proposal.issuer)) + self.engine.emit( + events.ProposalReceived(prop_id=proposal.id, provider_id=proposal.issuer) + ) self.offers_collected += 1 async def handler(proposal_): @@ -696,9 +693,7 @@ async def find_offers(self) -> None: When the subscription expires, create a new one. And so on... """ - builder = await self.engine.create_demand_builder( - self.expiration_time, self.payload - ) + builder = await self.engine.create_demand_builder(self.expiration_time, self.payload) while True: try: @@ -745,7 +740,7 @@ def __init__( payload: Optional[Payload] = None, max_workers: int = 5, timeout: timedelta = DEFAULT_EXECUTOR_TIMEOUT, - _engine: Golem + _engine: Golem, ): # A variant with explicit `_engine` ... @@ -786,7 +781,6 @@ def __init__( # Standalone usage, with `package` parameter ... - def __init__( self, *, @@ -835,7 +829,7 @@ def __init__( else: warnings.warn( "stand-alone usage is deprecated, please `Golem.execute_task` class instead ", - DeprecationWarning + DeprecationWarning, ) self._engine = Golem( budget=budget, @@ -844,7 +838,7 @@ def __init__( driver=driver, network=network, event_consumer=event_consumer, - stream_output=stream_output + stream_output=stream_output, ) self.__standalone = True @@ -900,11 +894,7 @@ async def submit( :return: yields computation progress events """ - job = Job( - self._engine, - expiration_time=self._expires, - payload=self._payload - ) + job = Job(self._engine, expiration_time=self._expires, payload=self._payload) self._engine.add_job(job) services: Set[asyncio.Task] = set() @@ -997,14 +987,13 @@ async def start_worker(agreement: rest.market.Agreement, node_info: NodeInfo) -> with work_queue.new_consumer() as consumer: try: + async def task_generator() -> AsyncIterator[Task[D, R]]: async for handle in consumer: task = Task.for_handle(handle, work_queue, self.emit) self._engine.emit( events.TaskStarted( - agr_id=agreement.id, - task_id=task.id, - task_data=task.data + agr_id=agreement.id, task_id=task.id, task_data=task.data ) ) yield task @@ -1029,10 +1018,7 @@ async def worker_starter() -> None: while True: await asyncio.sleep(2) await job.agreements_pool.cycle() - if ( - len(workers) < self._max_workers - and await work_queue.has_unassigned_items() - ): + if len(workers) < self._max_workers and await work_queue.has_unassigned_items(): new_task = None try: new_task = await job.agreements_pool.use_agreement(