Skip to content

Commit

Permalink
move nprocs and myid to Distributed
Browse files Browse the repository at this point in the history
  • Loading branch information
vchuravy committed Dec 18, 2017
1 parent 77ef406 commit e1a0a70
Show file tree
Hide file tree
Showing 10 changed files with 52 additions and 34 deletions.
6 changes: 4 additions & 2 deletions base/client.jl
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,8 @@ function process_options(opts::JLOptions)
global PROGRAM_FILE = arg_is_program ? shift!(ARGS) : ""

# Load Distributed module only if any of the Distributed options have been specified.
if (opts.worker == 1) || (opts.nprocs > 0) || (opts.machinefile != C_NULL)
distributed_mode = (opts.worker == 1) || (opts.nprocs > 0) || (opts.machinefile != C_NULL)
if distributed_mode
eval(Main, :(using Distributed))
invokelatest(Main.Distributed.process_opts, opts)
end
Expand All @@ -303,9 +304,10 @@ function process_options(opts::JLOptions)
println()
elseif cmd == 'L'
# load file immediately on all processors
if nprocs() == 1
if !distributed_mode
include(Main, arg)
else
# TODO: Move this logic to Distributed and use a callback
@sync for p in invokelatest(Main.procs)
@async invokelatest(Main.remotecall_wait, include, p, Main, arg)
end
Expand Down
2 changes: 2 additions & 0 deletions base/deprecated.jl
Original file line number Diff line number Diff line change
Expand Up @@ -1287,6 +1287,8 @@ export conv, conv2, deconv, filt, filt!, xcorr
@deprecate_moved interrupt "Distributed" true true
@deprecate_moved launch "Distributed" true true
@deprecate_moved manage "Distributed" true true
@deprecate_moved myid "Distributed" true true
@deprecate_moved nprocs "Distributed" true true
@deprecate_moved nworkers "Distributed" true true
@deprecate_moved pmap "Distributed" true true
@deprecate_moved procs "Distributed" true true
Expand Down
7 changes: 1 addition & 6 deletions base/exports.jl
Original file line number Diff line number Diff line change
Expand Up @@ -1241,9 +1241,4 @@ export
spzeros,
rowvals,
nzrange,
nnz,

# Minimal set of Distributed exports - useful for a program to check if running
# in distributed mode or not.
myid,
nprocs
nnz
10 changes: 0 additions & 10 deletions base/loading.jl
Original file line number Diff line number Diff line change
Expand Up @@ -271,9 +271,6 @@ end
# require always works in Main scope and loads files from node 1
const toplevel_load = Ref(true)

myid() = isdefined(Main, :Distributed) ? invokelatest(Main.Distributed.myid) : 1
nprocs() = isdefined(Main, :Distributed) ? invokelatest(Main.Distributed.nprocs) : 1

"""
require(module::Symbol)
Expand All @@ -297,13 +294,6 @@ function require(mod::Symbol)
if !root_module_exists(mod)
_require(mod)
# After successfully loading, notify downstream consumers
if toplevel_load[] && myid() == 1 && nprocs() > 1
# broadcast top-level import/using from node 1 (only)
@sync for p in invokelatest(Main.procs)
p == 1 && continue
@async invokelatest(Main.remotecall_wait, ()->(require(mod); nothing), p)
end
end
for callback in package_callbacks
invokelatest(callback, mod)
end
Expand Down
14 changes: 6 additions & 8 deletions base/serialize.jl
Original file line number Diff line number Diff line change
Expand Up @@ -368,17 +368,15 @@ end

# TODO: make this bidirectional, so objects can be sent back via the same key
const object_numbers = WeakKeyDict()
const obj_number_salt = Ref(0)
function object_number(@nospecialize(l))
const obj_number_salt = Ref{UInt64}(0)
function object_number(s::AbstractSerializer, @nospecialize(l))
global obj_number_salt, object_numbers
if haskey(object_numbers, l)
return object_numbers[l]
end
# a hash function that always gives the same number to the same
# object on the same machine, and is unique over all machines.
ln = obj_number_salt[]+(UInt64(myid())<<44)
obj_number_salt[] += 1
ln = obj_number_salt[]
object_numbers[l] = ln
obj_number_salt[] += 1
return ln::UInt64
end

Expand All @@ -398,7 +396,7 @@ end
function serialize(s::AbstractSerializer, meth::Method)
serialize_cycle(s, meth) && return
writetag(s.io, METHOD_TAG)
write(s.io, object_number(meth))
write(s.io, object_number(s, meth))
serialize(s, meth.module)
serialize(s, meth.name)
serialize(s, meth.file)
Expand Down Expand Up @@ -476,7 +474,7 @@ end
function serialize(s::AbstractSerializer, t::TypeName)
serialize_cycle(s, t) && return
writetag(s.io, TYPENAME_TAG)
write(s.io, object_number(t))
write(s.io, object_number(s, t))
serialize_typename(s, t)
end

Expand Down
2 changes: 1 addition & 1 deletion base/sysimg.jl
Original file line number Diff line number Diff line change
Expand Up @@ -502,7 +502,7 @@ Base.require(:Printf)
@deprecate_binding Mmap root_module(:Mmap) true ", run `using Mmap` instead"
@deprecate_binding Profile root_module(:Profile) true ", run `using Profile` instead"
@deprecate_binding Dates root_module(:Dates) true ", run `using Dates` instead"
# @deprecate_binding Distributed root_module(:Distributed) true ", run `using Distributed` instead"
@deprecate_binding Distributed root_module(:Distributed) true ", run `using Distributed` instead"
end

empty!(LOAD_PATH)
Expand Down
1 change: 1 addition & 0 deletions doc/src/stdlib/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@ dates.md
unicode.md
iterativeeigensolvers.md
printf.md
distributed.md
19 changes: 16 additions & 3 deletions stdlib/Distributed/src/Distributed.jl
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ export
interrupt,
launch,
manage,
# myid, # accessed via Base
# nprocs, # accessed via Base
myid,
nprocs,
nworkers,
pmap,
procs,
Expand Down Expand Up @@ -79,6 +79,19 @@ include("pmap.jl")
include("managers.jl") # LocalManager and SSHManager
include("precompile.jl")

__init__() = init_parallel()
function _require_callback(mod::Symbol)
if Base.toplevel_load[] && myid() == 1 && nprocs() > 1
# broadcast top-level import/using from node 1 (only)
@sync for p in procs()
p == 1 && continue
@async remotecall_wait(()->(Base.require(mod); nothing), p)
end
end
end

function __init__()
push!(Base.package_callbacks, _require_callback)
init_parallel()
end

end
23 changes: 19 additions & 4 deletions stdlib/Distributed/src/clusterserialize.jl
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
# This file is a part of Julia. License is MIT: https://julialang.org/license

using Base.Serializer: object_number, serialize_cycle, deserialize_cycle, writetag,
using Base.Serializer: serialize_cycle, deserialize_cycle, writetag,
__deserialized_types__, serialize_typename, deserialize_typename,
TYPENAME_TAG, object_numbers, reset_state, serialize_type
TYPENAME_TAG, reset_state, serialize_type

import Base.Serializer: lookup_object_number, remember_object
import Base.Serializer: object_number, lookup_object_number, remember_object

mutable struct ClusterSerializer{I<:IO} <: AbstractSerializer
io::I
Expand All @@ -26,6 +26,21 @@ mutable struct ClusterSerializer{I<:IO} <: AbstractSerializer
end
ClusterSerializer(io::IO) = ClusterSerializer{typeof(io)}(io)

const object_numbers = WeakKeyDict()
const obj_number_salt = Ref(0)
function object_number(s::ClusterSerializer, @nospecialize(l))
global obj_number_salt, object_numbers
if haskey(object_numbers, l)
return object_numbers[l]
end
# a hash function that always gives the same number to the same
# object on the same machine, and is unique over all machines.
ln = obj_number_salt[]+(UInt64(myid())<<44)
obj_number_salt[] += 1
object_numbers[l] = ln
return ln::UInt64
end

const known_object_data = Dict{UInt64,Any}()

function lookup_object_number(s::ClusterSerializer, n::UInt64)
Expand Down Expand Up @@ -61,7 +76,7 @@ function serialize(s::ClusterSerializer, t::TypeName)
serialize_cycle(s, t) && return
writetag(s.io, TYPENAME_TAG)

identifier = object_number(t)
identifier = object_number(s, t)
send_whole = !(identifier in s.tn_obj_sent)
serialize(s, send_whole)
write(s.io, identifier)
Expand Down
2 changes: 2 additions & 0 deletions stdlib/Test/src/Test.jl
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ export detect_ambiguities, detect_unbound_args
export GenericString, GenericSet, GenericDict, GenericArray
export guardsrand

import Distributed: myid

#-----------------------------------------------------------------------

# Backtrace utility functions
Expand Down

0 comments on commit e1a0a70

Please sign in to comment.