Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

make the common part of start_worker indeed, common #496

Merged
merged 5 commits into from
Jun 28, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions yapapi/agreements_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import logging
import random
import sys
from typing import Dict, NamedTuple, Optional, Set, Tuple
from typing import Dict, NamedTuple, Optional, Set, Tuple, Callable

from yapapi import events
from yapapi.props import Activity, NodeInfo
Expand Down Expand Up @@ -68,8 +68,10 @@ async def add_proposal(self, score: float, proposal: OfferProposal) -> None:
datetime.datetime.now(), score, proposal
)

async def use_agreement(self, cbk):
"""Gets an agreement and performs cbk() on it"""
async def use_agreement(
self, cbk: Callable[[Agreement, NodeInfo], asyncio.Task]
) -> Optional[asyncio.Task]:
"""Get an agreement and start the `cbk()` task within it."""
async with self._lock:
agreement_with_info = await self._get_agreement()
if agreement_with_info is None:
Expand Down
48 changes: 40 additions & 8 deletions yapapi/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,13 @@

from yapapi import rest, events
from yapapi.agreements_pool import AgreementsPool
from yapapi.ctx import CommandContainer, ExecOptions, Work
from yapapi.ctx import CommandContainer, ExecOptions, Work, WorkContext
from yapapi.payload import Payload
from yapapi.props import com, Activity, NodeInfo, NodeInfoKeys
from yapapi import props
from yapapi.props import com, NodeInfo
from yapapi.props.builder import DemandBuilder, DemandDecorator
from yapapi.rest.activity import CommandExecutionError
from yapapi.rest.market import OfferProposal, Subscription
from yapapi.rest.activity import CommandExecutionError, Activity
from yapapi.rest.market import Agreement, OfferProposal, Subscription
from yapapi.storage import gftp
from yapapi.strategy import (
DecreaseScoreForUnconfirmedAgreement,
Expand Down Expand Up @@ -186,10 +187,10 @@ async def create_demand_builder(
) -> DemandBuilder:
"""Create a `DemandBuilder` for given `payload` and `expiration_time`."""
builder = DemandBuilder()
builder.add(Activity(expiration=expiration_time, multi_activity=True))
builder.add(NodeInfo(subnet_tag=self._subnet))
builder.add(props.Activity(expiration=expiration_time, multi_activity=True))
builder.add(props.NodeInfo(subnet_tag=self._subnet))
if self._subnet:
builder.ensure(f"({NodeInfoKeys.subnet_tag}={self._subnet})")
builder.ensure(f"({props.NodeInfoKeys.subnet_tag}={self._subnet})")
await builder.decorate(self.payment_decorator, self.strategy, payload)
return builder

Expand Down Expand Up @@ -482,12 +483,43 @@ async def decorate_demand(self, demand: DemandBuilder):
demand.ensure(constraint)
demand.properties.update({p.key: p.value for p in self.market_decoration.properties})

async def create_activity(self, agreement_id: str):
async def create_activity(self, agreement_id: str) -> Activity:
"""Create an activity for given `agreement_id`."""
return await self._activity_api.new_activity(
agreement_id, stream_events=self._stream_output
)

async def start_worker(
self, job: "Job", worker: Callable[[Agreement, Activity, WorkContext], Awaitable]
) -> Optional[asyncio.Task]:
loop = asyncio.get_event_loop()

async def _worker(agreement: Agreement, node_info: NodeInfo):
self.emit(events.WorkerStarted(agr_id=agreement.id))

try:
activity = await self.create_activity(agreement.id)
except Exception:
self.emit(
events.ActivityCreateFailed(
agr_id=agreement.id, exc_info=sys.exc_info() # type: ignore
)
)
raise

async with activity:
self.emit(events.ActivityCreated(act_id=activity.id, agr_id=agreement.id))

self.accept_debit_notes_for_agreement(agreement.id)
work_context = WorkContext(
activity.id, node_info, self.storage_manager, emitter=self.emit
)
await worker(agreement, activity, work_context)

return await job.agreements_pool.use_agreement(
lambda agreement, node: loop.create_task(_worker(agreement, node))
)

async def process_batches(
self,
agreement_id: str,
Expand Down
91 changes: 32 additions & 59 deletions yapapi/executor/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
from yapapi.ctx import WorkContext
from yapapi.events import Event
from yapapi.payload import Payload
from yapapi.props import NodeInfo
from yapapi.rest.activity import Activity
from yapapi.strategy import MarketStrategy

from .task import Task, TaskStatus
Expand Down Expand Up @@ -307,66 +307,41 @@ async def input_tasks() -> AsyncIterator[Task[D, R]]:

work_queue = SmartQueue(input_tasks())

last_wid = 0
async def _worker(
agreement: rest.market.Agreement, activity: Activity, work_context: WorkContext
) -> None:

async def start_worker(agreement: rest.market.Agreement, node_info: NodeInfo) -> None:

nonlocal last_wid
wid = last_wid
last_wid += 1

self.emit(events.WorkerStarted(agr_id=agreement.id))

try:
act = await self._engine.create_activity(agreement.id)
except Exception:
self.emit(
events.ActivityCreateFailed(
agr_id=agreement.id, exc_info=sys.exc_info() # type: ignore
)
)
self.emit(events.WorkerFinished(agr_id=agreement.id))
raise

async with act:

self.emit(events.ActivityCreated(act_id=act.id, agr_id=agreement.id))
self._engine.accept_debit_notes_for_agreement(agreement.id)
work_context = WorkContext(
f"worker-{wid}", node_info, self._engine.storage_manager, emitter=self.emit
)

with work_queue.new_consumer() as consumer:
try:
with work_queue.new_consumer() as consumer:
try:

async def task_generator() -> AsyncIterator[Task[D, R]]:
async for handle in consumer:
task = Task.for_handle(handle, work_queue, self.emit)
self._engine.emit(
events.TaskStarted(
agr_id=agreement.id, task_id=task.id, task_data=task.data
)
async def task_generator() -> AsyncIterator[Task[D, R]]:
async for handle in consumer:
task = Task.for_handle(handle, work_queue, self.emit)
self._engine.emit(
events.TaskStarted(
agr_id=agreement.id, task_id=task.id, task_data=task.data
)
yield task
self._engine.emit(
events.TaskFinished(agr_id=agreement.id, task_id=task.id)
)

batch_generator = worker(work_context, task_generator())
try:
await self._engine.process_batches(agreement.id, act, batch_generator)
except StopAsyncIteration:
pass
self.emit(events.WorkerFinished(agr_id=agreement.id))
except Exception:
self.emit(
events.WorkerFinished(
agr_id=agreement.id, exc_info=sys.exc_info() # type: ignore
)
yield task
self._engine.emit(
events.TaskFinished(agr_id=agreement.id, task_id=task.id)
)

batch_generator = worker(work_context, task_generator())
try:
await self._engine.process_batches(agreement.id, activity, batch_generator)
except StopAsyncIteration:
pass
self.emit(events.WorkerFinished(agr_id=agreement.id))
except Exception:
self.emit(
events.WorkerFinished(
agr_id=agreement.id, exc_info=sys.exc_info() # type: ignore
)
raise
finally:
await self._engine.accept_payments_for_agreement(agreement.id)
)
raise
finally:
await self._engine.accept_payments_for_agreement(agreement.id)

async def worker_starter() -> None:
while True:
Expand All @@ -375,9 +350,7 @@ async def worker_starter() -> None:
if len(workers) < self._max_workers and work_queue.has_unassigned_items():
new_task = None
try:
new_task = await job.agreements_pool.use_agreement(
lambda agreement, node: loop.create_task(start_worker(agreement, node))
)
new_task = await self._engine.start_worker(job, _worker)
if new_task is None:
continue
workers.add(new_task)
Expand Down
74 changes: 25 additions & 49 deletions yapapi/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
from yapapi.ctx import WorkContext
from yapapi.engine import _Engine, Job
from yapapi.payload import Payload
from yapapi.props import NodeInfo
from yapapi.rest.activity import Activity


logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -420,68 +420,44 @@ async def spawn_instance(self) -> None:
spawned = False
agreement_id: Optional[str] # set in start_worker

async def start_worker(agreement: rest.market.Agreement, node_info: NodeInfo) -> None:

async def _worker(
agreement: rest.market.Agreement, activity: Activity, work_context: WorkContext
) -> None:
nonlocal agreement_id, spawned

agreement_id = agreement.id
self.emit(events.WorkerStarted(agr_id=agreement.id))

try:
act = await self._engine.create_activity(agreement.id)
except Exception:
self.emit(
events.ActivityCreateFailed(
agr_id=agreement.id, exc_info=sys.exc_info() # type: ignore
)
spawned = True

task_id = f"{self.id}:{next(self._task_ids)}"
self.emit(
events.TaskStarted(
agr_id=agreement.id,
task_id=task_id,
task_data=f"Service: {self._service_class.__name__}",
)
raise
)

async with act:
spawned = True
self.emit(events.ActivityCreated(act_id=act.id, agr_id=agreement.id))
try:
instance_batches = self._run_instance(work_context)
try:
await self._engine.process_batches(agreement.id, activity, instance_batches)
except StopAsyncIteration:
pass

self._engine.accept_debit_notes_for_agreement(agreement.id)
work_context = WorkContext(
act.id, node_info, self._engine.storage_manager, emitter=self.emit
)
task_id = f"{self.id}:{next(self._task_ids)}"
self.emit(
events.TaskStarted(
events.TaskFinished(
agr_id=agreement.id,
task_id=task_id,
task_data=f"Service: {self._service_class.__name__}",
)
)

try:
instance_batches = self._run_instance(work_context)
try:
await self._engine.process_batches(agreement.id, act, instance_batches)
except StopAsyncIteration:
pass

self.emit(
events.TaskFinished(
agr_id=agreement.id,
task_id=task_id,
)
)
self.emit(events.WorkerFinished(agr_id=agreement.id))
finally:
await self._engine.accept_payments_for_agreement(agreement.id)
await self._job.agreements_pool.release_agreement(
agreement.id, allow_reuse=False
)

loop = asyncio.get_event_loop()
self.emit(events.WorkerFinished(agr_id=agreement.id))
finally:
await self._engine.accept_payments_for_agreement(agreement.id)
await self._job.agreements_pool.release_agreement(agreement.id, allow_reuse=False)

while not spawned:
agreement_id = None
await asyncio.sleep(1.0)
task = await self._job.agreements_pool.use_agreement(
lambda agreement, node: loop.create_task(start_worker(agreement, node))
)
task = await self._engine.start_worker(self._job, _worker)
if not task:
continue
try:
Expand Down