Skip to content

Commit

Permalink
Merge pull request #5687 from JuliaLang/jn/no_scheduler
Browse files Browse the repository at this point in the history
remove Scheduler
  • Loading branch information
JeffBezanson committed Feb 8, 2014
2 parents 3818db4 + 7923ccf commit 6a080ea
Show file tree
Hide file tree
Showing 18 changed files with 305 additions and 317 deletions.
1 change: 1 addition & 0 deletions base/boot.jl
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@
# last::Task
# storage::Any
# consumers
# started::Bool
# done::Bool
# runnable::Bool
# end
Expand Down
10 changes: 2 additions & 8 deletions base/client.jl
Original file line number Diff line number Diff line change
Expand Up @@ -325,21 +325,15 @@ function init_load_path()
push!(LOAD_PATH,abspath(JULIA_HOME,"..","share","julia","site",vers))
end

function init_sched()
global const Workqueue = Any[]
end
global const Workqueue = Any[]

function init_head_sched()
# start in "head node" mode
global const Scheduler = Task(()->event_loop(true), 1024*1024)
global PGRP
global LPROC
LPROC.id = 1
assert(length(PGRP.workers) == 0)
register_worker(LPROC)
# make scheduler aware of current (root) task
unshift!(Workqueue, roottask)
yield()
end

function init_profiler()
Expand Down Expand Up @@ -376,10 +370,10 @@ function _start()
LinAlg.init()
GMP.gmp_init()
init_profiler()
start_gc_msgs_task()

#atexit(()->flush(STDOUT))
try
init_sched()
any(a->(a=="--worker"), ARGS) || init_head_sched()
init_load_path()
(quiet,repl,startup,color_set,history) = process_options(copy(ARGS))
Expand Down
20 changes: 0 additions & 20 deletions base/inference.jl
Original file line number Diff line number Diff line change
Expand Up @@ -1093,26 +1093,6 @@ f_argnames(ast) =

is_rest_arg(arg::ANY) = (ccall(:jl_is_rest_arg,Int32,(Any,), arg) != 0)

# function typeinf_task(caller)
# result = ()
# while true
# (caller, args) = yieldto(caller, result)
# result = typeinf_ext_(args...)
# end
# end

#Inference_Task = Task(typeinf_task, 2097152)
#yieldto(Inference_Task, current_task())

#function typeinf_ext(linfo, atypes, sparams, cop)
#C = current_task()
#args = (linfo, atypes, sparams, cop)
#if is(C, Inference_Task)
# return typeinf_ext_(args...)
#end
#return yieldto(Inference_Task, C, args)
#end

function typeinf_ext(linfo, atypes::ANY, sparams::ANY, def)
global inference_stack
last = inference_stack
Expand Down
181 changes: 11 additions & 170 deletions base/multi.jl
Original file line number Diff line number Diff line change
Expand Up @@ -394,24 +394,6 @@ type RemoteRef
RemoteRef(w::Worker) = RemoteRef(w.id)
RemoteRef() = RemoteRef(myid())

global WeakRemoteRef
function WeakRemoteRef(w, wh, id)
return new(w, wh, id)
end

function WeakRemoteRef(pid::Integer)
rr = WeakRemoteRef(pid, myid(), REQ_ID)
REQ_ID += 1
if mod(REQ_ID,200) == 0
gc()
end
rr
end

WeakRemoteRef(w::LocalProcess) = WeakRemoteRef(myid())
WeakRemoteRef(w::Worker) = WeakRemoteRef(w.id)
WeakRemoteRef() = WeakRemoteRef(myid())

global next_id
next_id() = (id=(myid(),REQ_ID); REQ_ID+=1; id)
end
Expand Down Expand Up @@ -467,7 +449,14 @@ function del_clients(pairs::(Any,Any)...)
end
end

any_gc_flag = false
any_gc_flag = Condition()
function start_gc_msgs_task()
enq_work(Task(()->
while true
wait(any_gc_flag)
flush_gc_msgs()
end))
end

function send_del_client(rr::RemoteRef)
if rr.where == myid()
Expand All @@ -480,7 +469,7 @@ function send_del_client(rr::RemoteRef)
w = worker_from_id(rr.where)
push!(w.del_msgs, (rr2id(rr), myid()))
w.gcflag = true
global any_gc_flag = true
notify(any_gc_flag)
end
end

Expand Down Expand Up @@ -508,7 +497,7 @@ function send_add_client(rr::RemoteRef, i)
#println("$(myid()) adding $((rr2id(rr), i)) for $(rr.where)")
push!(w.add_msgs, (rr2id(rr), i))
w.gcflag = true
global any_gc_flag = true
notify(any_gc_flag)
end
end

Expand Down Expand Up @@ -735,35 +724,6 @@ end
take_ref(rid) = take(lookup_ref(rid))
take(rr::RemoteRef) = call_on_owner(take_ref, rr)

## work queue ##

function enq_work(t::Task)
ccall(:uv_stop,Void,(Ptr{Void},),eventloop())
unshift!(Workqueue, t)
end

function perform_work()
perform_work(pop!(Workqueue))
end

function perform_work(t::Task)
if !istaskstarted(t)
# starting new task
yieldto(t)
else
# continuing interrupted work item
arg = t.result
t.result = nothing
t.runnable = true
yieldto(t, arg)
end
t = current_task().last
if t.runnable
# still runnable; return to queue
enq_work(t)
end
end

function deliver_result(sock::IO, msg, oid, value)
#print("$(myid()) sending result $oid\n")
if is(msg,:call_fetch)
Expand Down Expand Up @@ -956,11 +916,9 @@ function start_worker(out::IO)

ccall(:jl_install_sigint_handler, Void, ())

global const Scheduler = current_task()

try
check_master_connect(60.0)
event_loop(false)
wait()
catch err
print(STDERR, "unhandled exception on $(myid()): $(err)\nexiting.\n")
end
Expand Down Expand Up @@ -1216,37 +1174,6 @@ end

## higher-level functions: spawn, pmap, pfor, etc. ##

sync_begin() = task_local_storage(:SPAWNS, ({}, get(task_local_storage(), :SPAWNS, ())))

function sync_end()
spawns = get(task_local_storage(), :SPAWNS, ())
if is(spawns,())
error("sync_end() without sync_begin()")
end
refs = spawns[1]
task_local_storage(:SPAWNS, spawns[2])
for r in refs
wait(r)
end
end

macro sync(block)
quote
sync_begin()
v = $(esc(block))
sync_end()
v
end
end

function sync_add(r)
spawns = get(task_local_storage(), :SPAWNS, ())
if !is(spawns,())
push!(spawns[1], r)
end
r
end

let nextidx = 1
global chooseproc
function chooseproc(thunk::Function)
Expand Down Expand Up @@ -1297,23 +1224,6 @@ macro fetchfrom(p, expr)
:(remotecall_fetch($(esc(p)), $(esc(expr))))
end

function spawnlocal(thunk)
t = Task(thunk)
sync_add(t)
enq_work(t)
t
end

macro async(expr)
expr = localize_vars(:(()->($expr)), false)
:(spawnlocal($(esc(expr))))
end

macro spawnlocal(expr)
warn_once("@spawnlocal is deprecated, use @async instead.")
:(@async $(esc(expr)))
end

function at_each(f, args...)
for w in PGRP.workers
sync_add(remotecall(w.id, f, args...))
Expand Down Expand Up @@ -1531,75 +1441,6 @@ end
# 2/(nc/niter)
# end

## event processing, I/O and work scheduling ##

function yield(args...)
ct = current_task()
# preserve Task.last across calls to the scheduler
prev = ct.last
v = yieldto(Scheduler, args...)
ct.last = prev
return v
end

function pause()
@unix_only ccall(:pause, Void, ())
@windows_only ccall(:Sleep,stdcall, Void, (Uint32,), 0xffffffff)
end

function event_loop(isclient)
iserr, lasterr, bt = false, nothing, {}
while true
try
if iserr
display_error(lasterr, bt)
println(STDERR)
iserr, lasterr, bt = false, nothing, {}
else
while true
if isempty(Workqueue)
if any_gc_flag
flush_gc_msgs()
end
c = process_events(true)
if c==0 && eventloop()!=C_NULL && isempty(Workqueue) && !any_gc_flag
# if there are no active handles and no runnable tasks, just
# wait for signals.
pause()
end
else
perform_work()
process_events(false)
end
end
end
catch err
if iserr
ccall(:jl_, Void, (Any,), (
"\n!!!An ERROR occurred while printing the last error!!!\n",
lasterr,
bt
))
end
iserr, lasterr = true, err
bt = catch_backtrace()
if isclient && isa(err,InterruptException)
# root task is waiting for something on client. allow C-C
# to interrupt.
interrupt_waiting_task(roottask,err)
iserr, lasterr = false, nothing
end
end
end
end

# force a task to stop waiting with an exception
function interrupt_waiting_task(t::Task, err)
if !t.runnable
t.exception = err
enq_work(t)
end
end

function check_master_connect(timeout)
# If we do not have at least process 1 connect to us within timeout
Expand Down
1 change: 0 additions & 1 deletion base/precompile.jl
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ precompile(start, (Dict{Any,Any},))
precompile(perform_work, ())
precompile(isempty, (Array{Any,1},))
precompile(getindex, (Dict{Any,Any}, Int32))
precompile(event_loop, (Bool,))
precompile(_start, ())
precompile(process_options, (Array{Any,1},))
precompile(run_repl, ())
Expand Down
43 changes: 13 additions & 30 deletions base/stream.jl
Original file line number Diff line number Diff line change
Expand Up @@ -723,50 +723,33 @@ end
write(s::TTY, p::Ptr, nb::Integer) = @uv_write nb ccall(:jl_write_no_copy, Int32, (Ptr{Void}, Ptr{Void}, Uint, Ptr{Void}, Ptr{Void}), handle(s), p, nb, uvw, uv_jl_writecb::Ptr{Void})

function write(s::AsyncStream, b::Uint8)
if isdefined(Main.Base,:Scheduler) && current_task() != Main.Base.Scheduler
@uv_write 1 ccall(:jl_putc_copy, Int32, (Uint8, Ptr{Void}, Ptr{Void}, Ptr{Void}), b, handle(s), uvw, uv_jl_writecb_task::Ptr{Void})
uv_req_set_data(uvw,current_task())
wait()
else
@uv_write 1 ccall(:jl_putc_copy, Int32, (Uint8, Ptr{Void}, Ptr{Void}, Ptr{Void}), b, handle(s), uvw, uv_jl_writecb::Ptr{Void})
end
@uv_write 1 ccall(:jl_putc_copy, Int32, (Uint8, Ptr{Void}, Ptr{Void}, Ptr{Void}), b, handle(s), uvw, uv_jl_writecb_task::Ptr{Void})
uv_req_set_data(uvw,current_task())
wait()
return 1
end
function write(s::AsyncStream, c::Char)
if isdefined(Main.Base,:Scheduler) && current_task() != Main.Base.Scheduler
@uv_write utf8sizeof(c) ccall(:jl_pututf8_copy, Int32, (Ptr{Void},Uint32, Ptr{Void}, Ptr{Void}), handle(s), c, uvw, uv_jl_writecb_task::Ptr{Void})
uv_req_set_data(uvw,current_task())
wait()
else
@uv_write utf8sizeof(c) ccall(:jl_pututf8_copy, Int32, (Ptr{Void},Uint32, Ptr{Void}, Ptr{Void}), handle(s), c, uvw, uv_jl_writecb::Ptr{Void})
end
@uv_write utf8sizeof(c) ccall(:jl_pututf8_copy, Int32, (Ptr{Void},Uint32, Ptr{Void}, Ptr{Void}), handle(s), c, uvw, uv_jl_writecb_task::Ptr{Void})
uv_req_set_data(uvw,current_task())
wait()
return utf8sizeof(c)
end
function write{T}(s::AsyncStream, a::Array{T})
if isbits(T)
if isdefined(Main.Base,:Scheduler) && current_task() != Main.Base.Scheduler
n = uint(length(a)*sizeof(T))
@uv_write n ccall(:jl_write_no_copy, Int32, (Ptr{Void}, Ptr{Void}, Uint, Ptr{Void}, Ptr{Void}), handle(s), a, n, uvw, uv_jl_writecb_task::Ptr{Void})
uv_req_set_data(uvw,current_task())
wait()
else
write!(s,copy(a))
end
n = uint(length(a)*sizeof(T))
@uv_write n ccall(:jl_write_no_copy, Int32, (Ptr{Void}, Ptr{Void}, Uint, Ptr{Void}, Ptr{Void}), handle(s), a, n, uvw, uv_jl_writecb_task::Ptr{Void})
uv_req_set_data(uvw,current_task())
wait()
return int(length(a)*sizeof(T))
else
check_open(s)
invoke(write,(IO,Array),s,a)
end
end
function write(s::AsyncStream, p::Ptr, nb::Integer)
if isdefined(Main.Base,:Scheduler) && current_task() != Main.Base.Scheduler
@uv_write nb ccall(:jl_write_no_copy, Int32, (Ptr{Void}, Ptr{Void}, Uint, Ptr{Void}, Ptr{Void}), handle(s), p, nb, uvw, uv_jl_writecb_task::Ptr{Void})
uv_req_set_data(uvw,current_task())
wait()
else
check_open(s)
ccall(:jl_write, Uint, (Ptr{Void},Ptr{Void},Uint), handle(s), p, uint(nb))
end
@uv_write nb ccall(:jl_write_no_copy, Int32, (Ptr{Void}, Ptr{Void}, Uint, Ptr{Void}, Ptr{Void}), handle(s), p, nb, uvw, uv_jl_writecb_task::Ptr{Void})
uv_req_set_data(uvw,current_task())
wait()
return int(nb)
end

Expand Down
2 changes: 1 addition & 1 deletion base/sysimg.jl
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ include("process.jl")
include("multimedia.jl")
importall .Multimedia
reinit_stdio()
ccall(:jl_get_uv_hooks, Void, ())
ccall(:jl_get_uv_hooks, Void, (Cint,), 0)
include("grisu.jl")
import .Grisu.print_shortest
include("printf.jl")
Expand Down
Loading

0 comments on commit 6a080ea

Please sign in to comment.