Skip to content

Commit

Permalink
get individual results instead of the accumulated ones (#682)
Browse files Browse the repository at this point in the history
* get individual results instead of the accumulated ones
* update readme and docstrings...
  • Loading branch information
shadeofblue authored Sep 27, 2021
1 parent 106b02a commit b649a89
Show file tree
Hide file tree
Showing 6 changed files with 42 additions and 33 deletions.
10 changes: 5 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -160,11 +160,11 @@ from yapapi.payload import vm

async def worker(context: WorkContext, tasks: AsyncIterable[Task]):
async for task in tasks:
context.run("/bin/sh", "-c", "date")
script = context.new_script()
future_result = script.run("/bin/sh", "-c", "date")

future_results = yield context.commit()
results = await future_results
task.accept_result(result=results[-1])
yield script
task.accept_result(result=await future_result)


async def main():
Expand All @@ -174,7 +174,7 @@ async def main():

tasks = [Task(data=None)]

async with Golem(budget=1.0, subnet_tag="devnet-beta.2") as golem:
async with Golem(budget=1.0, subnet_tag="devnet-beta") as golem:
async for completed in golem.execute_tasks(worker, tasks, payload=package):
print(completed.result.stdout)

Expand Down
8 changes: 4 additions & 4 deletions examples/hello-world/hello.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@
async def worker(context: WorkContext, tasks: AsyncIterable[Task]):
async for task in tasks:
script = context.new_script()
script.run("/bin/sh", "-c", "date")
future_result = script.run("/bin/sh", "-c", "date")

future_results = yield script
results = await future_results
task.accept_result(result=results[-1])
yield script

task.accept_result(result=await future_result)


async def main():
Expand Down
9 changes: 5 additions & 4 deletions examples/hello-world/hello_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,16 @@ async def run(self):
while True:
await asyncio.sleep(REFRESH_INTERVAL_SEC)
script = self._ctx.new_script()
script.run(
future_result = script.run(
"/bin/sh",
"-c",
f"cat {DATE_OUTPUT_PATH}",
)

future_results = yield script
results = await future_results
print(results[0].stdout.strip() if results[0].stdout else "")
yield script

result = (await future_result).stdout
print(result.strip() if result else "")


async def main():
Expand Down
12 changes: 6 additions & 6 deletions examples/simple-service-poc/simple_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,13 +75,13 @@ async def run(self):
while True:
await asyncio.sleep(10)
script = self._ctx.new_script()
script.run(self.SIMPLE_SERVICE, "--stats") # idx 0
script.run(self.SIMPLE_SERVICE, "--plot", "dist") # idx 1
stats_results = script.run(self.SIMPLE_SERVICE, "--stats")
plot_results = script.run(self.SIMPLE_SERVICE, "--plot", "dist")

future_results = yield script
results = await future_results
stats = results[0].stdout.strip()
plot = results[1].stdout.strip().strip('"')
yield script

stats = (await stats_results).stdout.strip()
plot = (await plot_results).stdout.strip().strip('"')

print(f"{TEXT_COLOR_CYAN}stats: {stats}{TEXT_COLOR_DEFAULT}")

Expand Down
18 changes: 12 additions & 6 deletions yapapi/golem.py
Original file line number Diff line number Diff line change
Expand Up @@ -335,26 +335,32 @@ async def get_payload():
)
async def start(self):
async for script in super().start():
yield script
# every `DATE_POLL_INTERVAL` write output of `date` to `DATE_OUTPUT_PATH`
self._ctx.run(
script = self._ctx.new_script()
script.run(
"/bin/sh",
"-c",
f"while true; do date > {DATE_OUTPUT_PATH}; sleep {REFRESH_INTERVAL_SEC}; done &",
)
yield self._ctx.commit()
yield script
async def run(self):
while True:
await asyncio.sleep(REFRESH_INTERVAL_SEC)
self._ctx.run(
script = self._ctx.new_script()
future_result = script.run(
"/bin/sh",
"-c",
f"cat {DATE_OUTPUT_PATH}",
)
future_results = yield self._ctx.commit()
results = await future_results
print(results[0].stdout.strip())
yield script
result = (await future_result).stdout
print(result.strip() if result else "")
async def main():
Expand Down
18 changes: 10 additions & 8 deletions yapapi/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -268,15 +268,17 @@ async def start(self) -> AsyncGenerator[Script, Awaitable[List[events.CommandEve
**Example**::
async def start(self):
s = self._ctx.new_script()
# deploy the exe-unit
self._ctx.deploy()
s.deploy(**self.get_deploy_args())
# start the exe-unit's container
self._ctx.start()
s.start()
# start some service process within the container
self._ctx.run("/golem/run/service_ctl", "--start")
s.run("/golem/run/service_ctl", "--start")
# send the batch to the provider
yield self._ctx.commit()
yield s
### Default implementation
Expand Down Expand Up @@ -332,10 +334,10 @@ async def run(self) -> AsyncGenerator[Script, Awaitable[List[events.CommandEvent
async def run(self):
while True:
self._ctx.run("/golem/run/report", "--stats") # index 0
future_results = yield self._ctx.commit()
results = await future_results
stats = results[0].stdout.strip() # retrieve from index 0
script = self._ctx.new_script()
stats_results = script.run(self.SIMPLE_SERVICE, "--stats")
yield script
stats = (await stats_results).stdout.strip()
print(f"stats: {stats}")
**Default implementation**
Expand Down

0 comments on commit b649a89

Please sign in to comment.