Skip to content

Commit

Permalink
Merge branch 'blue/executor-split' of github.com:golemfactory/yapapi …
Browse files Browse the repository at this point in the history
…into blue/executor-split
  • Loading branch information
azawlocki committed May 18, 2021
2 parents 8c13ca0 + 9d8a9a3 commit 19d0cc0
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 25 deletions.
48 changes: 24 additions & 24 deletions yapapi/executor/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -469,30 +469,6 @@ async def decorate_demand(self, demand: DemandBuilder):
demand.ensure(constraint)
demand.properties.update({p.key: p.value for p in self.market_decoration.properties})

async def execute_tasks(
self,
worker: Callable[
[WorkContext, AsyncIterator[Task[D, R]]],
AsyncGenerator[Work, Awaitable[List[events.CommandEvent]]],
],
data: Union[AsyncIterator[Task[D, R]], Iterable[Task[D, R]]],
payload: Payload,
max_workers: Optional[int] = None,
timeout: Optional[timedelta] = None,
budget: Optional[Union[float, Decimal]] = None,
) -> AsyncIterator[Task[D, R]]:

kwargs: Dict[str, Any] = {"payload": payload}
if max_workers:
kwargs["max_workers"] = max_workers
if timeout:
kwargs["timeout"] = timeout
kwargs["budget"] = budget if budget is not None else self._budget_amount

async with Executor(_engine=self, **kwargs) as executor:
async for t in executor.submit(worker, data):
yield t

async def create_activity(self, agreement_id: str):
return await self._activity_api.new_activity(
agreement_id, stream_events=self._stream_output
Expand Down Expand Up @@ -573,6 +549,30 @@ async def get_batch_results() -> List[events.CommandEvent]:
future_results = loop.create_task(get_batch_results())
item = await command_generator.asend(future_results)

async def execute_tasks(
self,
worker: Callable[
[WorkContext, AsyncIterator[Task[D, R]]],
AsyncGenerator[Work, Awaitable[List[events.CommandEvent]]],
],
data: Union[AsyncIterator[Task[D, R]], Iterable[Task[D, R]]],
payload: Payload,
max_workers: Optional[int] = None,
timeout: Optional[timedelta] = None,
budget: Optional[Union[float, Decimal]] = None,
) -> AsyncIterator[Task[D, R]]:

kwargs: Dict[str, Any] = {"payload": payload}
if max_workers:
kwargs["max_workers"] = max_workers
if timeout:
kwargs["timeout"] = timeout
kwargs["budget"] = budget if budget is not None else self._budget_amount

async with Executor(_engine=self, **kwargs) as executor:
async for t in executor.submit(worker, data):
yield t


class Job:
"""Functionality related to a single job."""
Expand Down
2 changes: 1 addition & 1 deletion yapapi/props/builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ class DemandBuilder:
```python
>>> import yapapi
>>> from yapapi import properties as yp
>>> from yapapi import props as yp
>>> from yapapi.props.builder import DemandBuilder
>>> from datetime import datetime, timezone
>>> builder = DemandBuilder()
Expand Down

0 comments on commit 19d0cc0

Please sign in to comment.