
add support for LSF #74

Merged: 2 commits merged into JuliaParallel:master on Jan 7, 2019

Conversation

@bjarthur (Collaborator) commented Aug 1, 2017

closes #72

@raminammour commented

Ben,
Here are the changes I made. On one of my systems bsub allocates a full node, so I needed to launch additional workers separately. The -x flag may be necessary on some systems (and may not be allowed on others). I could not figure out a way around xargs; every other syntax I tried led to a syntax error.

export LSFManager, addprocs_lsf, master_worker_dict

immutable LSFManager <: ClusterManager
    np::Integer
    flags::Cmd
    workers_per_proc::Integer
end

# Type annotations are needed on the two-argument constructors: without
# them both would share one untyped signature and the second definition
# would silently overwrite the first.
LSFManager(np, flags::Cmd) = LSFManager(np, flags, 1)
LSFManager(np, wpc::Integer) = LSFManager(np, ``, wpc)
LSFManager(np) = LSFManager(np, ``, 1)

# Maps each bsub head worker's pid to the pids of the extra workers launched on it.
global master_worker_dict = Dict(1 => [1])

function launch(manager::LSFManager, params::Dict, launched::Array, c::Condition)
    try
        dir = params[:dir]
        exename = params[:exename]
        exeflags = params[:exeflags]
        np = manager.np

        jobname = "julia-$(getpid())"

        # Pipe `hostname -i` into the worker command via xargs so that
        # --bind-to receives the compute node's own IP address.
        # bsub -x is important, otherwise the job can land on the same host
        # as another job!
        cmd = `cd $dir ";" hostname -i "|" xargs $exename $exeflags $(worker_arg) --bind-to`
        bsub_cmd = `bsub -I -x $(manager.flags) -cwd $dir -J $jobname "$cmd"`

        info(bsub_cmd)

        stream_proc = [open(bsub_cmd) for i in 1:np]

        for i in 1:np
            config = WorkerConfig()
            config.io, io_proc = stream_proc[i]
            config.userdata = Dict{Symbol, Any}(:task => i, :process => io_proc)
            config.count = manager.workers_per_proc
            config.exename = exename
            config.exeflags = `$worker_arg`
            push!(launched, config)
            notify(c)
        end
    catch e
        println("Error launching workers")
        println(e)
    end
end

manage(manager::LSFManager, id::Int64, config::WorkerConfig, op::Symbol) = nothing

function kill(manager::LSFManager, id::Int64, config::WorkerConfig)
    warn("When the bsub head process is terminated, all workers on that process will be terminated too", once=true)

    # Remove any additional workers hosted by this bsub job before exiting
    # the head worker itself.
    if haskey(master_worker_dict, id)
        rmprocs(master_worker_dict[id])
    end
    remote_do(exit, id)
    nothing
end

addprocs_lsf(np::Integer; workers_per_proc::Int=1, flags::Cmd=``) = addprocs(LSFManager(np, flags, workers_per_proc))

function Base.Distributed.launch_n_additional_processes(manager, frompid, fromconfig, cnt, launched_q)
    @sync begin
        exename = get(fromconfig.exename)
        exeflags = get(fromconfig.exeflags, ``)
        cmd = `$exename $exeflags --bind-to $(fromconfig.host.value)`
        master_worker_dict[frompid] = []
        new_addresses = remotecall_fetch(Base.Distributed.launch_additional, frompid, cnt, cmd)
        for address in new_addresses
            (bind_addr, port) = address

            wconfig = WorkerConfig()
            for x in [:host, :tunnel, :sshflags, :exeflags, :exename, :enable_threaded_blas]
                setfield!(wconfig, x, getfield(fromconfig, x))
            end
            wconfig.bind_addr = bind_addr
            wconfig.port = port

            let wconfig = wconfig
                @async begin
                    pid = Base.Distributed.create_worker(manager, wconfig)
                    remote_do(Base.Distributed.redirect_output_from_additional_worker, frompid, pid, port)
                    push!(master_worker_dict[frompid], pid)
                    push!(launched_q, pid)
                end
            end
        end
    end
end
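
For context, invoking the API above would look something like this (a hedged sketch; the flags value is system-dependent, and -x is only an assumption that your LSF site allows exclusive nodes):

# Submit 4 interactive bsub jobs, each hosting 8 Julia workers; -x asks
# for an exclusive node (allowed on some LSF systems, rejected on others).
addprocs_lsf(4; workers_per_proc=8, flags=`-x`)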

I realize that all of this may not be necessary or appropriate for every system, but I am sharing it in case it helps someone.

Cheers!

@bjarthur (Collaborator, Author) commented Sep 6, 2017

Thanks for this.

It'd be really nice, though, if we could make ClusterManagers.jl general enough to handle all systems. Towards this end, I've incorporated some of your changes, but will probably need to ask @amitmurthy for help with the rest.

A couple of questions:

Did you try passing the -x flag in as an LSFManager flag? Like addprocs_lsf(3; flags=`-x`)?

Also, by saying you couldn't figure out a way around xargs, do you mean that you tried passing --bind-to in as an exeflag and it didn't work? Like addprocs_lsf(3; exeflags=`--bind-to $(hostname -i)`)?

@raminammour commented Sep 6, 2017

addprocs_lsf(3; flags=`-x`) works.

If I debug with addprocs(ClusterManagers.LSFManager(3,``), exeflags=`--bind-to \$(hostname -i)`), I can see that bsub fails with ERROR: unknown option `-)`. The syntax that Julia is passing is not correct.

For reference, the command is:
bsub -I -J julia-24036 /data/gpfs/Users/j0280401/julia-next/julia-903644385b/bin/julia --worker=JQcg7b88DFZIS4QM --bind-to '$(hostname' '-i)'

If I manually remove the quotes and run the bsub:
bsub -I -J julia-10762 cd /tmp/;/data/gpfs/Users/j0280401/julia-next/julia-903644385b/bin/julia --worker=WE8ZtbvDKYgAIgdt --bind-to $(hostname -i)
it works (it prints the right IP address, and the worker waits 60 seconds before exiting).

@bjarthur (Collaborator, Author) commented Sep 6, 2017

You need to quote the space between hostname and -i. But that still might not work: --bind-to needs the address of the worker, right? So it somehow needs to be quoted so that it doesn't get evaluated on the master process.

LSF has an environment variable called LSB_HOSTS, which for a single-task job like these contains just one host name, that of the worker. The problem is that --bind-to needs the IP address.
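
For illustration, resolving LSB_HOSTS to an address on the Julia side could look like the sketch below (hypothetical, not part of this PR; it assumes a single-task job and that the submission host can resolve the worker's host name):

# Hypothetical sketch: turn the single host name in LSB_HOSTS into an IP
# address that could then be handed to the worker via --bind-to.
host = ENV["LSB_HOSTS"]      # one host name for a single-task job
addr = getaddrinfo(host)     # DNS lookup returning an IPAddr
exeflags = `--bind-to $addr`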

@raminammour commented

I had thought about mapping host names to IP addresses, saving them in a dictionary, and then passing that to the worker config. In my mind, the cleanest solution remains giving the user some control over the output of getipaddr().
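
A minimal sketch of that dictionary idea, assuming LSB_HOSTS lists the allocated hosts and that each resolves via DNS (hypothetical names throughout):

# Hypothetical sketch: resolve each allocated LSF host once and keep the
# mapping so a WorkerConfig can later be given the right bind address.
host_to_ip = Dict{String,IPAddr}()
for host in unique(split(ENV["LSB_HOSTS"]))
    host_to_ip[host] = getaddrinfo(host)
end
# later, e.g.: config.bind_addr = string(host_to_ip[host])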

@raminammour commented Sep 6, 2017

I got it to work (with your original PR, and a somewhat cleaner solution) with the following:

 ss="";
for i=1:length(worker_arg.exec);ss=string(ss,worker_arg[i]," ");end
cmd = "cd $dir;$exename $ss $exeflags"
bsub_cmd = `bsub -I $(manager.flags) -J $jobname "$cmd"`

And then passing exeflags as a string:

addprocs(ClusterManagers.LSFManager(10,``),exeflags="--bind-to \$(hostname  --ip-address)",dir="/tmp/")

@raminammour left a review comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When count > 1, rmprocs will also bkill the job. The other processes that were launched under the same bsub will be killed on the LSF side too, but Julia will think they are still connected.

Hence my circuitous global dict and the changes to the kill function.

@bjarthur (Collaborator, Author) commented

To get this working we need JuliaLang/julia#29770.

@bjarthur (Collaborator, Author) commented

I've pushed some changes which make this work on Julia 1.0.2. Thanks @vchuravy! There is one warning, though, that we need to take care of:

julia> addprocs_lsf(1)
<<Waiting for dispatch ...>>
<<Starting on h11u01>>
	From failed worker startup:	Job <50096054> is submitted to default queue <interactive>.
1-element Array{Int64,1}:
 2

@bjarthur (Collaborator, Author) commented

Everything works for me, despite the "failed worker startup" message. If I'm reading the code correctly, that text will be printed even for successful workers. Am I missing something?

@raminammour can you please test?

@raminammour commented

I do not have access to an LSF cluster at the moment. However, I had to support someone who did, two weeks ago. I cherry-picked the PR mentioned above, built Julia, and the LSF manager worked. I did get the weird warning too, but it worked, and we were in a hurry :)

@bjarthur (Collaborator, Author) commented

@amitmurthy this is ready to merge. Can you please review? Thanks.

@amitmurthy merged commit 0b54922 into JuliaParallel:master on Jan 7, 2019
@amitmurthy (Contributor) commented

Thanks @bjarthur

@bjarthur deleted the bja/lsf branch on January 7, 2019