
Python: future.is_ready and future.wait_for(0) always return False #2729

Closed

jameshcorbett opened this issue Feb 11, 2020 · 8 comments

@jameshcorbett (Member)
It seems that when I submit a job and then repeatedly try to check (without waiting) whether the job is done, it never completes. If I do wait, however, it does work.

Here's a little testing script I made. When I executed it, it ran indefinitely. The same happened if I replaced `while not check_completed_future(job_completion_future):` with `while not job_completion_future.is_ready():`.

import time

import flux, flux.job


def check_completed_future(future):
    try:
        future.wait_for(0)
    except EnvironmentError:
        return False
    else:
        return True


broker = flux.Flux()
jobspec = flux.job.JobspecV1.from_command(["sleep", "1"], num_tasks=1, cores_per_task=1)
job_id = flux.job.submit(broker, jobspec, flags=flux.constants.FLUX_JOB_WAITABLE)
job_completion_future = flux.job.wait_async(broker, job_id)
while not check_completed_future(job_completion_future):
    print("sleeping!")
    time.sleep(1)
print("done!")

Tracing the issue, I found that calls to `future.is_ready()` fall back to `future_is_ready(flux_future_t *f)` in future.c. I'm guessing the `result_valid` attribute of the `flux_future_t` struct is set if I wait for a positive amount of time, but is never set if I don't wait. (I assumed it would be set by some other thread, but I guess not.)

@grondo (Contributor) commented Feb 11, 2020

For the returned `job_completion_future` to be fulfilled, you have to enter Flux's reactor, whether by calling `reactor_run()` directly or by calling a synchronous get, which enters a temporary reactor under the covers. `flux_future_is_ready()` is guaranteed not to block; it only tells you whether `flux_future_get()` would block.

In this case, if you change `future.wait_for(0)` to `future.get()` or use a non-zero wait time, it should work. However, that might not be the right approach for your use case.

If you describe your use case, perhaps we can help find a good solution.
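[Editor's note: a rough analogy in plain Python, not Flux code. An `asyncio.Future` behaves similarly: it is only fulfilled when the event loop actually runs, just as a Flux future is only fulfilled when the Flux reactor runs, so polling `done()` without entering the loop never sees the result.]

```python
import asyncio

async def demo():
    loop = asyncio.get_running_loop()
    fut = loop.create_future()
    # Schedule fulfillment "later"; it cannot happen until the loop runs.
    loop.call_soon(fut.set_result, "done")
    # Pure polling without yielding to the loop never sees the result...
    assert not fut.done()
    # ...but awaiting (i.e., entering the loop) lets the callback fire.
    result = await fut
    assert fut.done()
    return result

print(asyncio.run(demo()))  # prints "done"
```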

@jameshcorbett (Member, Author)

Ah, OK. I thought it might be a bug. My use case is as follows:

I want to execute M total job steps as quickly as possible, subject to the constraint that I can only have N of them out at any one time. Usually what I do is loop over my N existing job steps, and check whether they've finished. Something like the following:

for step in existing_steps:
    if step.done() and remaining_jobs > 0:
        replace(step)
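[Editor's note: an illustrative sketch of this sliding-window pattern using Python's `concurrent.futures` as a stand-in for Flux job futures; `run_step` and the counts are hypothetical, not part of the Flux API.]

```python
from concurrent.futures import ThreadPoolExecutor, FIRST_COMPLETED, wait
import time

def run_step(i):
    time.sleep(0.01)  # pretend work
    return i

total, window = 20, 5
results = []
with ThreadPoolExecutor(max_workers=window) as pool:
    # Seed the window with the first `window` steps.
    pending = {pool.submit(run_step, i) for i in range(window)}
    next_i = window
    while pending:
        # Block until at least one step finishes, then backfill the window.
        done, pending = wait(pending, return_when=FIRST_COMPLETED)
        for fut in done:
            results.append(fut.result())
            if next_i < total:
                pending.add(pool.submit(run_step, next_i))
                next_i += 1

print(sorted(results))  # all 20 steps completed, at most 5 in flight
```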

@SteVwonder (Member)

Do you want to block waiting for any job to complete? If so, @cmoussa1 and @garlick put together a workflow example that matches your use case pretty well. It uses the wait interface to block until any of the jobs you submitted previously completes. At that point, you can peek at the job status to see whether it completed successfully or failed, and replace that job with another job submission before returning to the wait: https://github.com/flux-framework/flux-workflow-examples/blob/master/example10/submitter_sliding_window.py

If you want to do the checks asynchronously, I don't think the wait interface example that I just referenced is what you want.

@grondo (Contributor) commented Feb 11, 2020

Yeah, that example is straight from our testsuite:

https://github.com/flux-framework/flux-core/blob/master/t/job-manager/submit-sliding-window.py

There are some other nice examples of the job.wait() interface in that directory.

BTW, if you change the `future.wait_for(0)` above to `future.wait_for(0.01)`, this gives the reactor a chance to run on each iteration, and your script will "work" for some definition thereof.

Also, as the example shows, you can wait for any waitable job if you leave off the jobid argument to job.wait(). So if you want to check if any job has finished, you only need to check a single future (creating a new one each time the last was fulfilled).

Example derived from your use case (submit 20 jobs with 5 running at a time)

Note the time.sleep(1) is replaced with future.wait_for(1) which sleeps in the flux reactor up to 1s waiting for the next job to complete.

import flux, flux.job

remaining = 20
running = 0

def check_completed_future(future):
    try:
        future.wait_for(1)
    except EnvironmentError:
        return False
    else:
        return True


def submit_job(h, jobspec):
    global remaining
    global running
    flux.job.submit(h, jobspec, waitable=True)
    remaining = remaining - 1
    running = running + 1

h = flux.Flux()
jobspec = flux.job.JobspecV1.from_command(["sleep", "1"],
                                          num_tasks=1,
                                          cores_per_task=1)
# submit 5 jobs
for i in range(5):
    submit_job(h, jobspec)

future = flux.job.wait_async(h)
while running:
    if check_completed_future(future):
        # future.get guaranteed not to block now:
        jobid, success, msg = flux.job.wait_get_status(future)
        print("job {} finished: {}".format(jobid, msg.decode('utf-8')))
        running = running - 1

        if remaining > 0:
            submit_job(h, jobspec)

        # get a new future
        future = flux.job.wait_async(h)

print("done!")

@grondo (Contributor) commented Feb 11, 2020

Unfortunately, when I run the above test it works, but I get the errors described in #2671:

$ flux python test.py
job 168057372672 finished: task(s) exited with exit code 0
job 185186910208 finished: task(s) exited with exit code 0
job 186126434304 finished: task(s) exited with exit code 0
job 185841221632 finished: task(s) exited with exit code 0
job 17398191095808 finished: task(s) exited with exit code 0
job 17398509862912 finished: task(s) exited with exit code 0
job 17398744743936 finished: task(s) exited with exit code 0
job 17399013179392 finished: task(s) exited with exit code 0
job 17399331946496 finished: task(s) exited with exit code 0
job 17399583604736 finished: task(s) exited with exit code 0
job 17399919149056 finished: task(s) exited with exit code 0
job 17400154030080 finished: task(s) exited with exit code 0
job 17400388911104 finished: task(s) exited with exit code 0
job 17416310489088 finished: task(s) exited with exit code 0
job 17416830582784 finished: task(s) exited with exit code 0
done!
Exception ignored in: <bound method Wrapper.__del__ of <flux.core.handle.Flux object at 0x7fbeee534b70>>
Traceback (most recent call last):
  File "/home/grondo/git/f.git/src/bindings/python/flux/wrapper.py", line 363, in __del__
  File "/home/grondo/git/f.git/src/bindings/python/flux/wrapper.py", line 339, in _clear
  File "/home/grondo/git/f.git/src/bindings/python/flux/wrapper.py", line 166, in __call__
ReferenceError: weakly-referenced object no longer exists
Exception ignored in: <bound method Wrapper.__del__ of <flux.future.Future.InnerWrapper object at 0x7fbeea5e8198>>
Traceback (most recent call last):
  File "/home/grondo/git/f.git/src/bindings/python/flux/wrapper.py", line 363, in __del__
  File "/home/grondo/git/f.git/src/bindings/python/flux/wrapper.py", line 339, in _clear
  File "/home/grondo/git/f.git/src/bindings/python/flux/wrapper.py", line 166, in __call__
ReferenceError: weakly-referenced object no longer exists


@jameshcorbett (Member, Author)

Thanks @grondo and @SteVwonder. I did want to wait asynchronously for each job to complete, but only because that fits with the existing infrastructure I've built around CLIs, where the only way I have to tell whether a job has completed is to poll the `mini run` process, and there isn't any built-in wait_any.

The sliding-window model would absolutely fit my use case, but it sounds like I'll have to implement it as a separate submission/monitoring mechanism, or else change my CLI implementation to write my own wait_any function (which shouldn't be too hard). I wrote the CLI implementation assuming that Flux futures were like Python's `concurrent.futures.Future`, which are fulfilled in the background because a thread runs the work somewhere, but it sounds like they're more like C++ futures with the deferred launch policy (`std::launch::deferred`).
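[Editor's note: a minimal sketch of the contrast being drawn. Python's `concurrent.futures.Future` is fulfilled by a background pool thread, so `done()` eventually flips to True even if the submitter only polls; a C++ `std::launch::deferred` future, by contrast, runs the work only when `get()`/`wait()` is called, much like a Flux future that is only fulfilled once the reactor runs.]

```python
import time
from concurrent.futures import ThreadPoolExecutor

with ThreadPoolExecutor(max_workers=1) as pool:
    fut = pool.submit(lambda: "finished")
    # Pure polling, no explicit get/wait: the pool thread fulfills the
    # future in the background, so this loop terminates.
    while not fut.done():
        time.sleep(0.01)
    print(fut.result())  # prints "finished"
```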

@garlick (Member) commented Feb 11, 2020

> I've built around CLIs--where the only way I have to tell whether a job has completed is to poll the mini run process, and there isn't any built-in wait_any

I think you've moved on to the Python API, which is faster and more flexible than the Flux CLI, but I wanted to point out that `flux job wait` with no jobid argument is a CLI "wait any". E.g., this works:

#!/bin/bash
  
count=10

echo submitting
for i in $(seq 1 $count); do
    flux mini submit --flags=waitable /bin/true
done

echo waiting
for i in $(seq 1 $count); do
    flux job wait
done

@jameshcorbett (Member, Author)

Actually, it sounds like we're going to continue using the CLI when our code runs in Python 2 (#2405), so that's pretty helpful, thanks!
