Improve documentation for ClusterManagers, tighten function types (JuliaLang#17688)

* Improve documentation for ClusterManagers, tighten function types

Cleaned up formatting and wording, documented `Base.process_messages` a
little, made `incoming` a `Bool` (since `message_handler_loop` requires
this anyway).

* Moved docs out of HelpDB, got rid of references to AsyncStream.

AsyncStream hasn't existed since JuliaLang#12839, nearly a year ago.
Updated the parallel docs to reflect this fact and to correspond
more closely with the current state of `multi.jl` and `managers.jl`.
kshyatt authored and mfasi committed Sep 5, 2016
1 parent dcb6bfa commit 7c4435c
Showing 5 changed files with 284 additions and 308 deletions.
168 changes: 0 additions & 168 deletions base/docs/helpdb/Base.jl
@@ -646,16 +646,6 @@ A string giving the literal bit representation of a number.
"""
bits

"""
launch(manager::FooManager, params::Dict, launched::Vector{WorkerConfig}, launch_ntfy::Condition)
Implemented by cluster managers. For every Julia worker launched by this function, it should
append a `WorkerConfig` entry to `launched` and notify `launch_ntfy`. The function MUST exit
once all workers, requested by `manager` have been launched. `params` is a dictionary of all
keyword arguments `addprocs` was called with.
"""
launch

"""
invdigamma(x)
@@ -1569,19 +1559,6 @@ Connect to the named pipe / UNIX domain socket at `path`.
"""
connect(path)

"""
connect(manager::FooManager, pid::Int, config::WorkerConfig) -> (instrm::AsyncStream, outstrm::AsyncStream)
Implemented by cluster managers using custom transports. It should establish a logical
connection to worker with id `pid`, specified by `config` and return a pair of `AsyncStream`
objects. Messages from `pid` to current process will be read off `instrm`, while messages to
be sent to `pid` will be written to `outstrm`. The custom transport implementation must
ensure that messages are delivered and received completely and in order.
`Base.connect(manager::ClusterManager.....)` sets up TCP/IP socket connections in-between
workers.
"""
connect(manager, pid::Int, config::WorkerConfig)

"""
mean(v[, region])
@@ -2918,109 +2895,6 @@ See [`RoundingMode`](:obj:`RoundingMode`) for available rounding modes.
"""
Float64

"""
```
addprocs(n::Integer; exeflags=``) -> List of process identifiers
```
Launches workers using the in-built `LocalManager` which only launches workers on the
local host. This can be used to take advantage of multiple cores. `addprocs(4)` will add 4
processes on the local machine.
"""
addprocs(n::Integer)

"""
addprocs() -> List of process identifiers
Equivalent to `addprocs(Sys.CPU_CORES)`
Note that workers do not run a `.juliarc.jl` startup script, nor do they synchronize their
global state (such as global variables, new method definitions, and loaded modules) with any
of the other running processes.
"""
addprocs()

"""
```
addprocs(machines; keyword_args...) -> List of process identifiers
```
Add processes on remote machines via SSH. Requires `julia` to be installed in the same
location on each node, or to be available via a shared file system.
`machines` is a vector of machine specifications. Worker are started for each specification.
A machine specification is either a string `machine_spec` or a tuple - `(machine_spec, count)`.
`machine_spec` is a string of the form `[user@]host[:port] [bind_addr[:port]]`. `user` defaults
to current user, `port` to the standard ssh port. If `[bind_addr[:port]]` is specified, other
workers will connect to this worker at the specified `bind_addr` and `port`.
`count` is the number of workers to be launched on the specified host. If specified as `:auto`
it will launch as many workers as the number of cores on the specific host.
Keyword arguments:
* `tunnel`: if `true` then SSH tunneling will be used to connect to the worker from the
master process. Default is `false`.
* `sshflags`: specifies additional ssh options, e.g.
```
sshflags=`-i /home/foo/bar.pem`
```
* `max_parallel`: specifies the maximum number of workers connected to in parallel at a host.
Defaults to 10.
* `dir`: specifies the working directory on the workers. Defaults to the host's current
directory (as found by `pwd()`)
* `exename`: name of the `julia` executable. Defaults to `"\$JULIA_HOME/julia"` or
`"\$JULIA_HOME/julia-debug"` as the case may be.
* `exeflags`: additional flags passed to the worker processes.
* `topology`: Specifies how the workers connect to each other. Sending a message
between unconnected workers results in an error.
+ `topology=:all_to_all` : All processes are connected to each other.
This is the default.
+ `topology=:master_slave` : Only the driver process, i.e. pid 1 connects to the
workers. The workers do not connect to each other.
+ `topology=:custom` : The `launch` method of the cluster manager specifes the
connection topology via fields `ident` and `connect_idents` in
`WorkerConfig`. A worker with a cluster manager identity `ident`
will connect to all workers specified in `connect_idents`.
Environment variables :
If the master process fails to establish a connection with a newly launched worker within
60.0 seconds, the worker treats it a fatal situation and terminates. This timeout can be
controlled via environment variable `JULIA_WORKER_TIMEOUT`. The value of
`JULIA_WORKER_TIMEOUT` on the master process, specifies the number of seconds a newly
launched worker waits for connection establishment.
"""
addprocs(machines)

"""
addprocs(manager::ClusterManager; kwargs...) -> List of process identifiers
Launches worker processes via the specified cluster manager.
For example Beowulf clusters are supported via a custom cluster manager implemented in
package `ClusterManagers`.
The number of seconds a newly launched worker waits for connection establishment from the
master can be specified via variable `JULIA_WORKER_TIMEOUT` in the worker process's
environment. Relevant only when using TCP/IP as transport.
"""
addprocs(manager::ClusterManager)

"""
mkpath(path, [mode])
@@ -3946,15 +3820,6 @@ Send a signal to a process. The default is to terminate the process.
"""
kill(p::Process, signum=SIGTERM)

"""
kill(manager::FooManager, pid::Int, config::WorkerConfig)
Implemented by cluster managers. It is called on the master process, by `rmprocs`. It should
cause the remote worker specified by `pid` to exit. `Base.kill(manager::ClusterManager.....)`
executes a remote `exit()` on `pid`
"""
kill(manager, pid::Int, config::WorkerConfig)

"""
sylvester(A, B, C)
@@ -4441,19 +4306,6 @@ Byte-swap an integer.
"""
bswap

"""
manage(manager::FooManager, pid::Int, config::WorkerConfig. op::Symbol)
Implemented by cluster managers. It is called on the master process, during a worker's
lifetime, with appropriate `op` values:
- with `:register`/`:deregister` when a worker is added / removed from the Julia worker pool.
- with `:interrupt` when `interrupt(workers)` is called. The [`ClusterManager`](:class:`ClusterManager`)
should signal the appropriate worker with an interrupt signal.
- with `:finalize` for cleanup purposes.
"""
manage

"""
resize!(collection, n) -> collection
@@ -4713,15 +4565,6 @@ Quit the program indicating that the processes completed successfully. This func
"""
quit

"""
init_worker(manager::FooManager)
Called by cluster managers implementing custom transports. It initializes a newly launched
process as a worker. Command line argument `--worker` has the effect of initializing a
process as a worker using TCP/IP sockets for transport.
"""
init_worker

"""
escape_string(io, str::AbstractString, esc::AbstractString)
@@ -8077,17 +7920,6 @@ true
"""
applicable

"""
Base.process_messages(instrm::AsyncStream, outstrm::AsyncStream)
Called by cluster managers using custom transports. It should be called when the custom
transport implementation receives the first message from a remote worker. The custom
transport must manage a logical connection to the remote worker and provide two
`AsyncStream` objects, one for incoming messages and the other for messages addressed to the
remote worker.
"""
Base.process_messages

"""
RandomDevice()
118 changes: 118 additions & 0 deletions base/managers.jl
@@ -48,6 +48,65 @@ end
# to be mutually reachable without a tunnel, as is often the case in a cluster.
# Default value of kw arg max_parallel is the default value of MaxStartups in sshd_config
# A machine is either a <hostname> or a tuple of (<hostname>, count)
"""
addprocs(machines; tunnel=false, sshflags=\`\`, max_parallel=10, kwargs...) -> List of process identifiers
Add processes on remote machines via SSH. Requires `julia` to be installed in the same
location on each node, or to be available via a shared file system.
`machines` is a vector of machine specifications. Workers are started for each specification.
A machine specification is either a string `machine_spec` or a tuple - `(machine_spec, count)`.
`machine_spec` is a string of the form `[user@]host[:port] [bind_addr[:port]]`. `user` defaults
to current user, `port` to the standard ssh port. If `[bind_addr[:port]]` is specified, other
workers will connect to this worker at the specified `bind_addr` and `port`.
`count` is the number of workers to be launched on the specified host. If specified as `:auto`
it will launch as many workers as the number of cores on the specific host.
Keyword arguments:
* `tunnel`: if `true` then SSH tunneling will be used to connect to the worker from the
master process. Default is `false`.
* `sshflags`: specifies additional ssh options, e.g.
```sshflags=\`-i /home/foo/bar.pem\` ```
* `max_parallel`: specifies the maximum number of workers connected to in parallel at a host.
Defaults to 10.
* `dir`: specifies the working directory on the workers. Defaults to the host's current
directory (as found by `pwd()`)
* `exename`: name of the `julia` executable. Defaults to `"\$JULIA_HOME/julia"` or
`"\$JULIA_HOME/julia-debug"` as the case may be.
* `exeflags`: additional flags passed to the worker processes.
* `topology`: Specifies how the workers connect to each other. Sending a message
between unconnected workers results in an error.
+ `topology=:all_to_all` : All processes are connected to each other.
This is the default.
+ `topology=:master_slave` : Only the driver process, i.e. pid 1 connects to the
workers. The workers do not connect to each other.
+ `topology=:custom` : The `launch` method of the cluster manager specifies the
connection topology via fields `ident` and `connect_idents` in
`WorkerConfig`. A worker with a cluster manager identity `ident`
will connect to all workers specified in `connect_idents`.
Environment variables :
If the master process fails to establish a connection with a newly launched worker within
60.0 seconds, the worker treats it as a fatal situation and terminates.
This timeout can be controlled via environment variable `JULIA_WORKER_TIMEOUT`.
The value of `JULIA_WORKER_TIMEOUT` on the master process specifies the number of seconds a
newly launched worker waits for connection establishment.
"""
function addprocs(machines::AbstractVector; tunnel=false, sshflags=``, max_parallel=10, kwargs...)
check_addprocs_args(kwargs)
addprocs(SSHManager(machines); tunnel=tunnel, sshflags=sshflags, max_parallel=max_parallel, kwargs...)
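
The `machine_spec` format described in the docstring above, `[user@]host[:port] [bind_addr[:port]]`, can be illustrated with a small parser. This is only a sketch under stated assumptions: `parse_machine_spec` and `split_host_port` are hypothetical helpers written for this example (they are not what `SSHManager` uses internally), and the code targets Julia 1.x syntax rather than the version this commit was written against.

```julia
# Hypothetical helper, not part of Base: splits `host[:port]` into a host
# string and an optional integer port (`nothing` when no port is given).
function split_host_port(s::AbstractString)
    pieces = split(s, ':'; limit=2)
    host = String(pieces[1])
    port = length(pieces) == 2 ? parse(Int, pieces[2]) : nothing
    return host, port
end

# Hypothetical helper, not part of Base: splits a machine specification of the
# form `[user@]host[:port] [bind_addr[:port]]` into its parts.
function parse_machine_spec(spec::AbstractString)
    parts = split(spec)  # whitespace separates the two halves
    1 <= length(parts) <= 2 || throw(ArgumentError("invalid machine spec: $spec"))

    # First half: [user@]host[:port]
    userhost = parts[1]
    user = nothing
    if occursin('@', userhost)
        u, userhost = split(userhost, '@'; limit=2)
        user = String(u)
    end
    host, port = split_host_port(userhost)

    # Optional second half: bind_addr[:port]
    bind_addr, bind_port =
        length(parts) == 2 ? split_host_port(parts[2]) : (nothing, nothing)

    return (user=user, host=host, port=port,
            bind_addr=bind_addr, bind_port=bind_port)
end

full = parse_machine_spec("alice@node1:2222 10.0.0.5:9000")
bare = parse_machine_spec("node2")
```

Fields that the specification omits come back as `nothing`, which mirrors the docstring's statement that `user` and `port` fall back to defaults (the current user and the standard ssh port) chosen elsewhere.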
@@ -223,7 +282,25 @@ immutable LocalManager <: ClusterManager
restrict::Bool # Restrict binding to 127.0.0.1 only
end

"""
addprocs(; kwargs...) -> List of process identifiers
Equivalent to `addprocs(Sys.CPU_CORES; kwargs...)`.
Note that workers do not run a `.juliarc.jl` startup script, nor do they synchronize their
global state (such as global variables, new method definitions, and loaded modules) with any
of the other running processes.
"""
addprocs(; kwargs...) = addprocs(Sys.CPU_CORES; kwargs...)

"""
addprocs(np::Integer; restrict=true, kwargs...) -> List of process identifiers
Launches workers using the in-built `LocalManager` which only launches workers on the
local host. This can be used to take advantage of multiple cores. `addprocs(4)` will add 4
processes on the local machine. If `restrict` is `true`, binding is restricted to
`127.0.0.1`.
"""
function addprocs(np::Integer; restrict=true, kwargs...)
check_addprocs_args(kwargs)
addprocs(LocalManager(np, restrict); kwargs...)
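
As a usage illustration for the local form documented above, the following sketch starts two workers on the local host, asks each for its process id, and removes them again. It assumes Julia 1.x, where `addprocs`, `rmprocs`, `remotecall_fetch` and `myid` are provided by the `Distributed` standard library (in the version this commit targets they are available from `Base` directly). `with_local_workers` is a hypothetical helper introduced only for this example.

```julia
using Distributed  # assumption: Julia 1.x; not needed in the Julia version of this commit

# Hypothetical helper: start `n` local workers, run `f` on their ids,
# and always remove the workers afterwards.
function with_local_workers(f, n::Integer)
    pids = addprocs(n)  # uses the built-in LocalManager
    try
        return f(pids)
    finally
        rmprocs(pids)   # clean up the worker processes
    end
end

# Ask each worker for its own process id.
ids = with_local_workers(2) do pids
    [remotecall_fetch(myid, p) for p in pids]
end
```

Because workers do not share global state with the master, anything beyond built-in functions such as `myid` would have to be defined on the workers explicitly before being called remotely.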
@@ -256,6 +333,28 @@ function manage(manager::LocalManager, id::Integer, config::WorkerConfig, op::Sy
end
end

"""
launch(manager::ClusterManager, params::Dict, launched::Array, launch_ntfy::Condition)
Implemented by cluster managers. For every Julia worker launched by this function, it should
append a `WorkerConfig` entry to `launched` and notify `launch_ntfy`. The function MUST exit
once all workers requested by `manager` have been launched. `params` is a dictionary of all
keyword arguments `addprocs` was called with.
"""
launch

"""
manage(manager::ClusterManager, id::Integer, config::WorkerConfig, op::Symbol)
Implemented by cluster managers. It is called on the master process, during a worker's
lifetime, with appropriate `op` values:
- with `:register`/`:deregister` when a worker is added / removed from the Julia worker pool.
- with `:interrupt` when `interrupt(workers)` is called. The [`ClusterManager`](:class:`ClusterManager`)
should signal the appropriate worker with an interrupt signal.
- with `:finalize` for cleanup purposes.
"""
manage
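
The `launch`/`manage` contract documented above can be sketched with a manager that launches nothing and merely records the `op` values it receives. This is an illustration under assumptions, not a working cluster manager: `RecordingManager` is a hypothetical type, the code assumes Julia 1.x (where `ClusterManager`, `WorkerConfig`, `launch` and `manage` live in the `Distributed` standard library), and `manage` is called by hand here, in an arbitrary order, rather than by `addprocs`/`rmprocs`.

```julia
using Distributed  # assumption: Julia 1.x; these names are in Base for the version of this commit

# Hypothetical manager that only records lifecycle events.
struct RecordingManager <: ClusterManager
    events::Vector{Tuple{Int,Symbol}}
end
RecordingManager() = RecordingManager(Tuple{Int,Symbol}[])

# Stub `launch`: a real implementation would start workers, push a
# `WorkerConfig` per worker onto `launched`, and notify `launch_ntfy`.
function Distributed.launch(manager::RecordingManager, params::Dict,
                            launched::Array, launch_ntfy::Condition)
    return nothing  # no workers requested, so return immediately
end

# Record every `op` and branch on the values named in the docstring.
function Distributed.manage(manager::RecordingManager, id::Integer,
                            config::WorkerConfig, op::Symbol)
    push!(manager.events, (Int(id), op))
    if op === :interrupt
        # a real manager would signal the worker process here
    elseif op === :finalize
        # a real manager would release per-worker resources here
    end
    return nothing
end

# Drive `manage` by hand with the documented `op` values.
mgr = RecordingManager()
for op in (:register, :interrupt, :deregister, :finalize)
    Distributed.manage(mgr, 2, WorkerConfig(), op)
end
```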

# DefaultClusterManager for the default TCP transport - used by both SSHManager and LocalManager

@@ -264,6 +363,17 @@ end

const tunnel_hosts_map = Dict{AbstractString, Semaphore}()

"""
connect(manager::ClusterManager, pid::Int, config::WorkerConfig) -> (instrm::IO, outstrm::IO)
Implemented by cluster managers using custom transports. It should establish a logical
connection to worker with id `pid`, specified by `config` and return a pair of `IO`
objects. Messages from `pid` to current process will be read off `instrm`, while messages to
be sent to `pid` will be written to `outstrm`. The custom transport implementation must
ensure that messages are delivered and received completely and in order.
`Base.connect(manager::ClusterManager.....)` sets up TCP/IP socket connections in-between
workers.
"""
function connect(manager::ClusterManager, pid::Int, config::WorkerConfig)
if !isnull(config.connect_at)
# this is a worker-to-worker setup call.
@@ -389,6 +499,14 @@ function connect_to_worker(host::AbstractString, bind_addr::AbstractString, port
(s, bind_addr)
end


"""
kill(manager::ClusterManager, pid::Int, config::WorkerConfig)
Implemented by cluster managers. It is called on the master process, by `rmprocs`. It should
cause the remote worker specified by `pid` to exit. `Base.kill(manager::ClusterManager.....)`
executes a remote `exit()` on `pid`.
"""
function kill(manager::ClusterManager, pid::Int, config::WorkerConfig)
remote_do(exit, pid) # For TCP based transports this will result in a close of the socket
# at our end, which will result in a cleanup of the worker.