diff --git a/yapapi/agreements_pool.py b/yapapi/agreements_pool.py index e39df1a9d..da67ece01 100644 --- a/yapapi/agreements_pool.py +++ b/yapapi/agreements_pool.py @@ -4,7 +4,7 @@ import logging import random import sys -from typing import Dict, NamedTuple, Optional, Set, Tuple +from typing import Dict, NamedTuple, Optional, Set, Tuple, Callable from yapapi import events from yapapi.props import Activity, NodeInfo @@ -68,8 +68,10 @@ async def add_proposal(self, score: float, proposal: OfferProposal) -> None: datetime.datetime.now(), score, proposal ) - async def use_agreement(self, cbk): - """Gets an agreement and performs cbk() on it""" + async def use_agreement( + self, cbk: Callable[[Agreement, NodeInfo], asyncio.Task] + ) -> Optional[asyncio.Task]: + """Get an agreement and start the `cbk()` task within it.""" async with self._lock: agreement_with_info = await self._get_agreement() if agreement_with_info is None: diff --git a/yapapi/engine.py b/yapapi/engine.py index c32608e0d..40e990bbf 100644 --- a/yapapi/engine.py +++ b/yapapi/engine.py @@ -31,12 +31,13 @@ from yapapi import rest, events from yapapi.agreements_pool import AgreementsPool -from yapapi.ctx import CommandContainer, ExecOptions, Work +from yapapi.ctx import CommandContainer, ExecOptions, Work, WorkContext from yapapi.payload import Payload -from yapapi.props import com, Activity, NodeInfo, NodeInfoKeys +from yapapi import props +from yapapi.props import com, NodeInfo from yapapi.props.builder import DemandBuilder, DemandDecorator -from yapapi.rest.activity import CommandExecutionError -from yapapi.rest.market import OfferProposal, Subscription +from yapapi.rest.activity import CommandExecutionError, Activity +from yapapi.rest.market import Agreement, OfferProposal, Subscription from yapapi.storage import gftp from yapapi.strategy import ( DecreaseScoreForUnconfirmedAgreement, @@ -186,10 +187,10 @@ async def create_demand_builder( ) -> DemandBuilder: """Create a `DemandBuilder` for given `payload` and `expiration_time`.""" builder = DemandBuilder() - builder.add(Activity(expiration=expiration_time, multi_activity=True)) - builder.add(NodeInfo(subnet_tag=self._subnet)) + builder.add(props.Activity(expiration=expiration_time, multi_activity=True)) + builder.add(props.NodeInfo(subnet_tag=self._subnet)) if self._subnet: - builder.ensure(f"({NodeInfoKeys.subnet_tag}={self._subnet})") + builder.ensure(f"({props.NodeInfoKeys.subnet_tag}={self._subnet})") await builder.decorate(self.payment_decorator, self.strategy, payload) return builder @@ -482,12 +483,43 @@ async def decorate_demand(self, demand: DemandBuilder): demand.ensure(constraint) demand.properties.update({p.key: p.value for p in self.market_decoration.properties}) - async def create_activity(self, agreement_id: str): + async def create_activity(self, agreement_id: str) -> Activity: """Create an activity for given `agreement_id`.""" return await self._activity_api.new_activity( agreement_id, stream_events=self._stream_output ) + async def start_worker( + self, job: "Job", worker: Callable[[Agreement, Activity, WorkContext], Awaitable] + ) -> Optional[asyncio.Task]: + loop = asyncio.get_event_loop() + + async def _worker(agreement: Agreement, node_info: NodeInfo): + self.emit(events.WorkerStarted(agr_id=agreement.id)) + + try: + activity = await self.create_activity(agreement.id) + except Exception: + self.emit( + events.ActivityCreateFailed( + agr_id=agreement.id, exc_info=sys.exc_info() # type: ignore + ) + ) + raise + + async with activity: + self.emit(events.ActivityCreated(act_id=activity.id, agr_id=agreement.id)) + + self.accept_debit_notes_for_agreement(agreement.id) + work_context = WorkContext( + activity.id, node_info, self.storage_manager, emitter=self.emit + ) + await worker(agreement, activity, work_context) + + return await job.agreements_pool.use_agreement( + lambda agreement, node: loop.create_task(_worker(agreement, node)) + ) + async def process_batches( self, agreement_id: str, diff --git a/yapapi/executor/__init__.py b/yapapi/executor/__init__.py index 5c8f474ea..0718a9990 100644 --- a/yapapi/executor/__init__.py +++ b/yapapi/executor/__init__.py @@ -26,7 +26,7 @@ from yapapi.ctx import WorkContext from yapapi.events import Event from yapapi.payload import Payload -from yapapi.props import NodeInfo +from yapapi.rest.activity import Activity from yapapi.strategy import MarketStrategy from .task import Task, TaskStatus @@ -307,66 +307,41 @@ async def input_tasks() -> AsyncIterator[Task[D, R]]: work_queue = SmartQueue(input_tasks()) - last_wid = 0 + async def _worker( + agreement: rest.market.Agreement, activity: Activity, work_context: WorkContext + ) -> None: - async def start_worker(agreement: rest.market.Agreement, node_info: NodeInfo) -> None: - - nonlocal last_wid - wid = last_wid - last_wid += 1 - - self.emit(events.WorkerStarted(agr_id=agreement.id)) - - try: - act = await self._engine.create_activity(agreement.id) - except Exception: - self.emit( - events.ActivityCreateFailed( - agr_id=agreement.id, exc_info=sys.exc_info() # type: ignore - ) - ) - self.emit(events.WorkerFinished(agr_id=agreement.id)) - raise - - async with act: - - self.emit(events.ActivityCreated(act_id=act.id, agr_id=agreement.id)) - self._engine.accept_debit_notes_for_agreement(agreement.id) - work_context = WorkContext( - f"worker-{wid}", node_info, self._engine.storage_manager, emitter=self.emit - ) - - with work_queue.new_consumer() as consumer: - try: + with work_queue.new_consumer() as consumer: + try: - async def task_generator() -> AsyncIterator[Task[D, R]]: - async for handle in consumer: - task = Task.for_handle(handle, work_queue, self.emit) - self._engine.emit( - events.TaskStarted( - agr_id=agreement.id, task_id=task.id, task_data=task.data - ) + async def task_generator() -> AsyncIterator[Task[D, R]]: + async for handle in consumer: + task = Task.for_handle(handle, work_queue, self.emit) + self._engine.emit( + events.TaskStarted( + agr_id=agreement.id, task_id=task.id, task_data=task.data ) - yield task - self._engine.emit( - events.TaskFinished(agr_id=agreement.id, task_id=task.id) - ) - - batch_generator = worker(work_context, task_generator()) - try: - await self._engine.process_batches(agreement.id, act, batch_generator) - except StopAsyncIteration: - pass - self.emit(events.WorkerFinished(agr_id=agreement.id)) - except Exception: - self.emit( - events.WorkerFinished( - agr_id=agreement.id, exc_info=sys.exc_info() # type: ignore ) + yield task + self._engine.emit( + events.TaskFinished(agr_id=agreement.id, task_id=task.id) + ) + + batch_generator = worker(work_context, task_generator()) + try: + await self._engine.process_batches(agreement.id, activity, batch_generator) + except StopAsyncIteration: + pass + self.emit(events.WorkerFinished(agr_id=agreement.id)) + except Exception: + self.emit( + events.WorkerFinished( + agr_id=agreement.id, exc_info=sys.exc_info() # type: ignore ) - raise - finally: - await self._engine.accept_payments_for_agreement(agreement.id) + ) + raise + finally: + await self._engine.accept_payments_for_agreement(agreement.id) async def worker_starter() -> None: while True: @@ -375,9 +350,7 @@ async def worker_starter() -> None: if len(workers) < self._max_workers and work_queue.has_unassigned_items(): new_task = None try: - new_task = await job.agreements_pool.use_agreement( - lambda agreement, node: loop.create_task(start_worker(agreement, node)) - ) + new_task = await self._engine.start_worker(job, _worker) if new_task is None: continue workers.add(new_task) diff --git a/yapapi/services.py b/yapapi/services.py index 58fa74990..40fe47e82 100644 --- a/yapapi/services.py +++ b/yapapi/services.py @@ -23,7 +23,7 @@ from yapapi.ctx import WorkContext from yapapi.engine import _Engine, Job from yapapi.payload import Payload -from yapapi.props import NodeInfo +from yapapi.rest.activity import Activity logger = logging.getLogger(__name__) @@ -420,68 +420,44 @@ async def spawn_instance(self) -> None: spawned = False agreement_id: Optional[str] # set in start_worker - async def start_worker(agreement: rest.market.Agreement, node_info: NodeInfo) -> None: - + async def _worker( + agreement: rest.market.Agreement, activity: Activity, work_context: WorkContext + ) -> None: nonlocal agreement_id, spawned - agreement_id = agreement.id - self.emit(events.WorkerStarted(agr_id=agreement.id)) - - try: - act = await self._engine.create_activity(agreement.id) - except Exception: - self.emit( - events.ActivityCreateFailed( - agr_id=agreement.id, exc_info=sys.exc_info() # type: ignore - ) + spawned = True + + task_id = f"{self.id}:{next(self._task_ids)}" + self.emit( + events.TaskStarted( + agr_id=agreement.id, + task_id=task_id, + task_data=f"Service: {self._service_class.__name__}", ) - raise + ) - async with act: - spawned = True - self.emit(events.ActivityCreated(act_id=act.id, agr_id=agreement.id)) + try: + instance_batches = self._run_instance(work_context) + try: + await self._engine.process_batches(agreement.id, activity, instance_batches) + except StopAsyncIteration: + pass - self._engine.accept_debit_notes_for_agreement(agreement.id) - work_context = WorkContext( - act.id, node_info, self._engine.storage_manager, emitter=self.emit - ) - task_id = f"{self.id}:{next(self._task_ids)}" self.emit( - events.TaskStarted( + events.TaskFinished( agr_id=agreement.id, task_id=task_id, - task_data=f"Service: {self._service_class.__name__}", ) ) - - try: - instance_batches = self._run_instance(work_context) - try: - await self._engine.process_batches(agreement.id, act, instance_batches) - except StopAsyncIteration: - pass - - self.emit( - events.TaskFinished( - agr_id=agreement.id, - task_id=task_id, - ) - ) - self.emit(events.WorkerFinished(agr_id=agreement.id)) - finally: - await self._engine.accept_payments_for_agreement(agreement.id) - await self._job.agreements_pool.release_agreement( - agreement.id, allow_reuse=False - ) - - loop = asyncio.get_event_loop() + self.emit(events.WorkerFinished(agr_id=agreement.id)) + finally: + await self._engine.accept_payments_for_agreement(agreement.id) + await self._job.agreements_pool.release_agreement(agreement.id, allow_reuse=False) while not spawned: agreement_id = None await asyncio.sleep(1.0) - task = await self._job.agreements_pool.use_agreement( - lambda agreement, node: loop.create_task(start_worker(agreement, node)) - ) + task = await self._engine.start_worker(self._job, _worker) if not task: continue try: