Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

move nprocs and myid to Distributed #25139

Merged
merged 2 commits into from
Dec 22, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions base/client.jl
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,8 @@ function process_options(opts::JLOptions)
global PROGRAM_FILE = arg_is_program ? popfirst!(ARGS) : ""

# Load Distributed module only if any of the Distributed options have been specified.
if (opts.worker == 1) || (opts.nprocs > 0) || (opts.machinefile != C_NULL)
distributed_mode = (opts.worker == 1) || (opts.nprocs > 0) || (opts.machinefile != C_NULL)
if distributed_mode
eval(Main, :(using Distributed))
invokelatest(Main.Distributed.process_opts, opts)
end
Expand All @@ -303,9 +304,10 @@ function process_options(opts::JLOptions)
println()
elseif cmd == 'L'
# load file immediately on all processors
if nprocs() == 1
if !distributed_mode
include(Main, arg)
else
# TODO: Move this logic to Distributed and use a callback
@sync for p in invokelatest(Main.procs)
@async invokelatest(Main.remotecall_wait, include, p, Main, arg)
end
Expand Down
2 changes: 2 additions & 0 deletions base/deprecated.jl
Original file line number Diff line number Diff line change
Expand Up @@ -1286,6 +1286,8 @@ export conv, conv2, deconv, filt, filt!, xcorr
@deprecate_moved interrupt "Distributed" true true
@deprecate_moved launch "Distributed" true true
@deprecate_moved manage "Distributed" true true
@deprecate_moved myid "Distributed" true true
@deprecate_moved nprocs "Distributed" true true
@deprecate_moved nworkers "Distributed" true true
@deprecate_moved pmap "Distributed" true true
@deprecate_moved procs "Distributed" true true
Expand Down
7 changes: 1 addition & 6 deletions base/exports.jl
Original file line number Diff line number Diff line change
Expand Up @@ -1241,9 +1241,4 @@ export
spzeros,
rowvals,
nzrange,
nnz,

# Minimal set of Distributed exports - useful for a program to check if running
# in distributed mode or not.
myid,
nprocs
nnz
10 changes: 0 additions & 10 deletions base/loading.jl
Original file line number Diff line number Diff line change
Expand Up @@ -291,9 +291,6 @@ end
# require always works in Main scope and loads files from node 1
const toplevel_load = Ref(true)

myid() = isdefined(Main, :Distributed) ? invokelatest(Main.Distributed.myid) : 1
nprocs() = isdefined(Main, :Distributed) ? invokelatest(Main.Distributed.nprocs) : 1

"""
require(module::Symbol)

Expand All @@ -317,13 +314,6 @@ function require(mod::Symbol)
if !root_module_exists(mod)
_require(mod)
# After successfully loading, notify downstream consumers
if toplevel_load[] && myid() == 1 && nprocs() > 1
# broadcast top-level import/using from node 1 (only)
@sync for p in invokelatest(Main.procs)
p == 1 && continue
@async invokelatest(Main.remotecall_wait, ()->(require(mod); nothing), p)
end
end
for callback in package_callbacks
invokelatest(callback, mod)
end
Expand Down
14 changes: 6 additions & 8 deletions base/serialize.jl
Original file line number Diff line number Diff line change
Expand Up @@ -368,17 +368,15 @@ end

# TODO: make this bidirectional, so objects can be sent back via the same key
const object_numbers = WeakKeyDict()
const obj_number_salt = Ref(0)
function object_number(@nospecialize(l))
const obj_number_salt = Ref{UInt64}(0)
function object_number(s::AbstractSerializer, @nospecialize(l))
global obj_number_salt, object_numbers
if haskey(object_numbers, l)
return object_numbers[l]
end
# a hash function that always gives the same number to the same
# object on the same machine, and is unique over all machines.
ln = obj_number_salt[]+(UInt64(myid())<<44)
obj_number_salt[] += 1
ln = obj_number_salt[]
object_numbers[l] = ln
obj_number_salt[] += 1
return ln::UInt64
end

Expand All @@ -398,7 +396,7 @@ end
function serialize(s::AbstractSerializer, meth::Method)
serialize_cycle(s, meth) && return
writetag(s.io, METHOD_TAG)
write(s.io, object_number(meth))
write(s.io, object_number(s, meth))
serialize(s, meth.module)
serialize(s, meth.name)
serialize(s, meth.file)
Expand Down Expand Up @@ -476,7 +474,7 @@ end
function serialize(s::AbstractSerializer, t::TypeName)
serialize_cycle(s, t) && return
writetag(s.io, TYPENAME_TAG)
write(s.io, object_number(t))
write(s.io, object_number(s, t))
serialize_typename(s, t)
end

Expand Down
2 changes: 1 addition & 1 deletion base/sysimg.jl
Original file line number Diff line number Diff line change
Expand Up @@ -509,7 +509,7 @@ Base.require(:Printf)
@deprecate_binding Mmap root_module(:Mmap) true ", run `using Mmap` instead"
@deprecate_binding Profile root_module(:Profile) true ", run `using Profile` instead"
@deprecate_binding Dates root_module(:Dates) true ", run `using Dates` instead"
# @deprecate_binding Distributed root_module(:Distributed) true ", run `using Distributed` instead"
@deprecate_binding Distributed root_module(:Distributed) true ", run `using Distributed` instead"
end

empty!(LOAD_PATH)
Expand Down
1 change: 1 addition & 0 deletions doc/src/stdlib/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@ dates.md
unicode.md
iterativeeigensolvers.md
printf.md
distributed.md
19 changes: 16 additions & 3 deletions stdlib/Distributed/src/Distributed.jl
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ export
interrupt,
launch,
manage,
# myid, # accessed via Base
# nprocs, # accessed via Base
myid,
nprocs,
nworkers,
pmap,
procs,
Expand Down Expand Up @@ -79,6 +79,19 @@ include("pmap.jl")
include("managers.jl") # LocalManager and SSHManager
include("precompile.jl")

__init__() = init_parallel()
function _require_callback(mod::Symbol)
if Base.toplevel_load[] && myid() == 1 && nprocs() > 1
# broadcast top-level import/using from node 1 (only)
@sync for p in procs()
p == 1 && continue
@async remotecall_wait(()->(Base.require(mod); nothing), p)
end
end
end

function __init__()
push!(Base.package_callbacks, _require_callback)
init_parallel()
end

end
23 changes: 19 additions & 4 deletions stdlib/Distributed/src/clusterserialize.jl
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
# This file is a part of Julia. License is MIT: https://julialang.org/license

using Base.Serializer: object_number, serialize_cycle, deserialize_cycle, writetag,
using Base.Serializer: serialize_cycle, deserialize_cycle, writetag,
__deserialized_types__, serialize_typename, deserialize_typename,
TYPENAME_TAG, object_numbers, reset_state, serialize_type
TYPENAME_TAG, reset_state, serialize_type

import Base.Serializer: lookup_object_number, remember_object
import Base.Serializer: object_number, lookup_object_number, remember_object

mutable struct ClusterSerializer{I<:IO} <: AbstractSerializer
io::I
Expand All @@ -26,6 +26,21 @@ mutable struct ClusterSerializer{I<:IO} <: AbstractSerializer
end
ClusterSerializer(io::IO) = ClusterSerializer{typeof(io)}(io)

const object_numbers = WeakKeyDict()
const obj_number_salt = Ref(0)
function object_number(s::ClusterSerializer, @nospecialize(l))
global obj_number_salt, object_numbers
if haskey(object_numbers, l)
return object_numbers[l]
end
# a hash function that always gives the same number to the same
# object on the same machine, and is unique over all machines.
ln = obj_number_salt[]+(UInt64(myid())<<44)
obj_number_salt[] += 1
object_numbers[l] = ln
return ln::UInt64
end

const known_object_data = Dict{UInt64,Any}()

function lookup_object_number(s::ClusterSerializer, n::UInt64)
Expand Down Expand Up @@ -61,7 +76,7 @@ function serialize(s::ClusterSerializer, t::TypeName)
serialize_cycle(s, t) && return
writetag(s.io, TYPENAME_TAG)

identifier = object_number(t)
identifier = object_number(s, t)
send_whole = !(identifier in s.tn_obj_sent)
serialize(s, send_whole)
write(s.io, identifier)
Expand Down
4 changes: 3 additions & 1 deletion stdlib/Test/src/Test.jl
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ export @testset
export @inferred
export detect_ambiguities, detect_unbound_args
export GenericString, GenericSet, GenericDict, GenericArray
export guardsrand
export guardsrand, TestSetException

import Distributed: myid

#-----------------------------------------------------------------------

Expand Down
1 change: 1 addition & 0 deletions test/compile.jl
Original file line number Diff line number Diff line change
Expand Up @@ -547,6 +547,7 @@ let
"""
__precompile__(true)
module $ModuleA
import Distributed: myid
export f
f() = myid()
end
Expand Down