Skip to content

Commit

Permalink
cleanup and code reorg: move scheduler code to task.jl
Browse files Browse the repository at this point in the history
now multi.jl deals only with distributed parallelism

remove no-longer-used WeakRemoteRef
  • Loading branch information
JeffBezanson committed Feb 8, 2014
1 parent fc2b480 commit 7923ccf
Show file tree
Hide file tree
Showing 3 changed files with 189 additions and 212 deletions.
20 changes: 0 additions & 20 deletions base/inference.jl
Original file line number Diff line number Diff line change
Expand Up @@ -1093,26 +1093,6 @@ f_argnames(ast) =

is_rest_arg(arg::ANY) = (ccall(:jl_is_rest_arg,Int32,(Any,), arg) != 0)

# function typeinf_task(caller)
# result = ()
# while true
# (caller, args) = yieldto(caller, result)
# result = typeinf_ext_(args...)
# end
# end

#Inference_Task = Task(typeinf_task, 2097152)
#yieldto(Inference_Task, current_task())

#function typeinf_ext(linfo, atypes, sparams, cop)
#C = current_task()
#args = (linfo, atypes, sparams, cop)
#if is(C, Inference_Task)
# return typeinf_ext_(args...)
#end
#return yieldto(Inference_Task, C, args)
#end

function typeinf_ext(linfo, atypes::ANY, sparams::ANY, def)
global inference_stack
last = inference_stack
Expand Down
154 changes: 0 additions & 154 deletions base/multi.jl
Original file line number Diff line number Diff line change
Expand Up @@ -394,24 +394,6 @@ type RemoteRef
RemoteRef(w::Worker) = RemoteRef(w.id)
RemoteRef() = RemoteRef(myid())

global WeakRemoteRef
function WeakRemoteRef(w, wh, id)
return new(w, wh, id)
end

function WeakRemoteRef(pid::Integer)
rr = WeakRemoteRef(pid, myid(), REQ_ID)
REQ_ID += 1
if mod(REQ_ID,200) == 0
gc()
end
rr
end

WeakRemoteRef(w::LocalProcess) = WeakRemoteRef(myid())
WeakRemoteRef(w::Worker) = WeakRemoteRef(w.id)
WeakRemoteRef() = WeakRemoteRef(myid())

global next_id
next_id() = (id=(myid(),REQ_ID); REQ_ID+=1; id)
end
Expand Down Expand Up @@ -742,36 +724,6 @@ end
take_ref(rid) = take(lookup_ref(rid))
take(rr::RemoteRef) = call_on_owner(take_ref, rr)

## work queue ##

function enq_work(t::Task)
ccall(:uv_stop,Void,(Ptr{Void},),eventloop())
unshift!(Workqueue, t)
end

function perform_work()
perform_work(pop!(Workqueue))
end

function perform_work(t::Task)
ct = current_task()
if ct.runnable && t !== ct
# still runnable; ensure we will return here
enq_work(ct)
end
if !istaskstarted(t)
# starting new task
result = yieldto(t)
else
# continuing interrupted work item
arg = t.result
t.result = nothing
t.runnable = true
result = yieldto(t, arg)
end
return result
end

function deliver_result(sock::IO, msg, oid, value)
#print("$(myid()) sending result $oid\n")
if is(msg,:call_fetch)
Expand Down Expand Up @@ -1222,37 +1174,6 @@ end

## higher-level functions: spawn, pmap, pfor, etc. ##

sync_begin() = task_local_storage(:SPAWNS, ({}, get(task_local_storage(), :SPAWNS, ())))

function sync_end()
spawns = get(task_local_storage(), :SPAWNS, ())
if is(spawns,())
error("sync_end() without sync_begin()")
end
refs = spawns[1]
task_local_storage(:SPAWNS, spawns[2])
for r in refs
wait(r)
end
end

macro sync(block)
quote
sync_begin()
v = $(esc(block))
sync_end()
v
end
end

function sync_add(r)
spawns = get(task_local_storage(), :SPAWNS, ())
if !is(spawns,())
push!(spawns[1], r)
end
r
end

let nextidx = 1
global chooseproc
function chooseproc(thunk::Function)
Expand Down Expand Up @@ -1303,23 +1224,6 @@ macro fetchfrom(p, expr)
:(remotecall_fetch($(esc(p)), $(esc(expr))))
end

function spawnlocal(thunk)
t = Task(thunk)
sync_add(t)
enq_work(t)
t
end

macro async(expr)
expr = localize_vars(:(()->($expr)), false)
:(spawnlocal($(esc(expr))))
end

macro spawnlocal(expr)
warn_once("@spawnlocal is deprecated, use @async instead.")
:(@async $(esc(expr)))
end

function at_each(f, args...)
for w in PGRP.workers
sync_add(remotecall(w.id, f, args...))
Expand Down Expand Up @@ -1537,64 +1441,6 @@ end
# 2/(nc/niter)
# end

## event processing, I/O and work scheduling ##
in_scheduler = false
yield() = yield_until()
function yield_until(return_test = (t::Task)->t.runnable)
ct = current_task()
# preserve Task.last across calls to the scheduler
prev = ct.last
global in_scheduler
if in_scheduler
# we don't want to execute yield recursively, because
# the return condition would be ill-defined
warn("yielding from inside scheduler callback")
end
try
if isempty(Workqueue) && return_test(ct)
process_events(false)
if isempty(Workqueue) && return_test(ct)
return nothing
end
end
in_scheduler = true
while true
if isempty(Workqueue)
c = process_events(true)
if c==0 && eventloop()!=C_NULL && isempty(Workqueue)
# if there are no active handles and no runnable tasks, just
# wait for signals.
pause()
end
else
in_scheduler = false
result = perform_work()
in_scheduler = true
process_events(false)
if return_test(ct)
return result
end
end
end
finally
in_scheduler = false
ct.last = prev
end
@assert false
end

function pause()
@unix_only ccall(:pause, Void, ())
@windows_only ccall(:Sleep,stdcall, Void, (Uint32,), 0xffffffff)
end

# force a task to stop waiting with an exception
function interrupt_waiting_task(t::Task, err)
if !t.runnable
t.exception = err
enq_work(t)
end
end

function check_master_connect(timeout)
# If we do not have at least process 1 connect to us within timeout
Expand Down
Loading

1 comment on commit 7923ccf

@vtjnash
Copy link
Member

@vtjnash vtjnash commented on 7923ccf Feb 9, 2014

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is great. The separation is so much more logical now.

Please sign in to comment.