Skip to content

Commit

Permalink
Support multiple different progress modes
Browse files Browse the repository at this point in the history
In order to reliably and timely make progress within UCX the user is
responsible for occasionally call `progess` on the UCX worker.
Originally I used a Julia `Timer` object to gurantee progress especially
in the context of asymmetric communication, e.g. active messages.
The `Timer` object would trigger ever millisecond resulting in a much
higher latency, following that I implemented the polling interface using
the WAKEUP feature, but that turns of support for shared memory openucx/ucx#5322
and turned out to have relativly high overhead in a pure latency test
on the order of ~20microseconds. I experimented with two other modes
(1) the busy waiting mode, but that is using unfair scheduling and might
livelock and libuv `Idle`. Idle callbacks are a bit odd, but seem to work well
everytime Julia ticks the event loop libuv will call the progress function.
The performanc of busy waiting seems to degrade with multiple threads,
while the idler performs well, but I have not yet performed a whole system
comparision.
  • Loading branch information
vchuravy committed Mar 3, 2021
1 parent d06ae15 commit f6d1c47
Show file tree
Hide file tree
Showing 4 changed files with 247 additions and 81 deletions.
220 changes: 146 additions & 74 deletions src/UCX.jl
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ using Sockets: InetAddr, IPv4, listenany
using Random
import FunctionWrappers: FunctionWrapper

const PROGRESS_MODE = Ref(:idling)

include("api.jl")

function __init__()
Expand All @@ -19,6 +21,17 @@ function __init__()
ccall((:ucs_debug_disable_signals, API.libucs), Cvoid, ())

@assert version() >= VersionNumber(API.UCP_API_MAJOR, API.UCP_API_MINOR)
mode = get(ENV, "JLUCX_PROGRESS_MODE", "idling")
if mode == "busy"
PROGRESS_MODE[] = :busy
elseif mode == "idling"
PROGRESS_MODE[] = :idling
elseif mode == "polling"
PROGRESS_MODE[] = :polling
else
error("JLUCX_PROGRESS_MODE set to unkown progress mode: $mode")
end
@debug "UCX progress mode" mode
end

function memzero!(ref::Ref)
Expand Down Expand Up @@ -155,17 +168,23 @@ mutable struct UCXContext
handle::API.ucp_context_h
config::Dict{Symbol, String}

function UCXContext(; kwargs...)
function UCXContext(wakeup = PROGRESS_MODE[] == :polling; kwargs...)
field_mask = API.UCP_PARAM_FIELD_FEATURES

# We always request UCP_FEATURE_WAKEUP even when in blocking mode
# Note: ucx-py always request UCP_FEATURE_WAKEUP even when in blocking mode
# See <https://github.com/rapidsai/ucx-py/pull/377>
# There is also AMO32 & AMO64 (atomic), RMA, and AM
# But FEATURE_WAKEUP disables shared memory, see https://github.com/openucx/ucx/issues/5322
# and https://github.com/JuliaParallel/UCX.jl/issues/36

# There is also AMO32 & AMO64 (atomic), RMA,
features = API.UCP_FEATURE_TAG |
API.UCP_FEATURE_WAKEUP |
API.UCP_FEATURE_STREAM |
API.UCP_FEATURE_AM

if wakeup
features |= API.UCP_FEATURE_WAKEUP
end

params = Ref{API.ucp_params}()
memzero!(params)
set!(params, :field_mask, field_mask)
Expand Down Expand Up @@ -207,16 +226,23 @@ function info(ucx::UCXContext)
str
end

# ucp_context_query
function query(ctx::UCXContext)
r_attr = Ref{API.ucp_context_attr_t}()
API.ucp_context_query(ctx, r_attr)
r_attr[]
end

mutable struct UCXWorker
handle::API.ucp_worker_h
fd::RawFD
context::UCXContext
inflight::IdDict{Any, Nothing} # IdSet -- Can't use UCXRequest since that is defined after
am_handlers::Dict{UInt16, Any}
in_amhandler::Vector{Bool}
open::Bool
mode::Symbol

function UCXWorker(context::UCXContext)
function UCXWorker(context::UCXContext; progress_mode=PROGRESS_MODE[])
field_mask = API.UCP_WORKER_PARAM_FIELD_THREAD_MODE
thread_mode = API.UCS_THREAD_MODE_MULTI

Expand All @@ -225,18 +251,24 @@ mutable struct UCXWorker
set!(params, :field_mask, field_mask)
set!(params, :thread_mode, thread_mode)

@debug "Creating UCXWorker" thread_mode progress_mode

r_handle = Ref{API.ucp_worker_h}()
@check API.ucp_worker_create(context, params, r_handle)
handle = r_handle[]

# Instead of Timer
r_fd = Ref{API.Cint}()
@check API.ucp_worker_get_efd(handle, r_fd)
fd = Libc.RawFD(r_fd[])
# TODO: Verify that UCXContext has been created with WAKEUP
if progress_mode === :polling
r_fd = Ref{API.Cint}()
@check API.ucp_worker_get_efd(handle, r_fd)
fd = Libc.RawFD(r_fd[])
else
fd = RawFD(-1)
end

worker = new(handle, fd, context, IdDict{Any,Nothing}(), Dict{UInt16, Any}())
worker = new(handle, fd, context, IdDict{Any,Nothing}(), Dict{UInt16, Any}(), fill(false, Base.Threads.nthreads()), true, progress_mode)
finalizer(worker) do worker
worker.fd = RawFD(-1)
worker.open = false
@assert isempty(worker.inflight)
API.ucp_worker_destroy(worker)
end
Expand All @@ -245,6 +277,9 @@ mutable struct UCXWorker
end
Base.unsafe_convert(::Type{API.ucp_worker_h}, worker::UCXWorker) = worker.handle

ispolling(worker::UCXWorker) = worker.fd != RawFD(-1)
progress_mode(worker::UCXWorker) = worker.mode

"""
progress(worker::UCXWorker)
Expand All @@ -254,60 +289,103 @@ and call callbacks.
Returns `true` if progress was made, `false` if no work was waiting.
"""
function progress(worker::UCXWorker)
API.ucp_worker_progress(worker) != 0
tid = Base.Threads.threadid()
if worker.in_amhandler[tid]
@debug """
UCXWorker is processing a Active Message on this thread
Calling `progress` is not permitted and leads to recursion.
""" tid exception=(UCXException(API.UCS_ERR_NO_PROGRESS), catch_backtrace())
yield()
return false
else
return API.ucp_worker_progress(worker) != 0
end
end

function fence(worker::UCXWorker)
@check API.ucp_worker_fence(worker)
end

function lock_am(worker::UCXWorker)
tid = Base.Threads.threadid()
if worker.in_amhandler[tid]
error("UCXWorker already in AMHandler on this thread! Concurrency violation.")
end
worker.in_amhandler[tid] = true
end

function unlock_am(worker::UCXWorker)
tid = Base.Threads.threadid()
if !worker.in_amhandler[tid]
error("UCXWorker is not in AMHandler on this thread! Concurrency violation.")
end
worker.in_amhandler[tid] = false
end

include("idle.jl")

import FileWatching: poll_fd
function Base.wait(worker::UCXWorker)
# Approach 0: Timer
# Turns out to be very slow
# Approach 2: Busy loop
# Unfair scheduling :/
# while progress(worker) && isopen(worker)
# yield()
# end
# Approach 3
# About 2x slower on small messages than busy
while isopen(worker)
if progress(worker)
# progress was made
yield()
continue
end
if ispolling(worker)
@assert progress_mode(worker) === :polling
# Use fd_poll to suspend worker when no progress is being made
while isopen(worker)
if progress(worker)
# progress was made
yield()
continue
end

# Wait for poll
status = API.ucp_worker_arm(worker)
if status == API.UCS_OK
if !isopen(worker)
error("UCXWorker already closed")
# Wait for poll
status = API.ucp_worker_arm(worker)
if status == API.UCS_OK
if !isopen(worker)
error("UCXWorker already closed")
end
# wait for events
poll_fd(worker.fd; writable=true, readable=true)
progress(worker)
break
elseif status == API.UCS_ERR_BUSY
# could not arm, need to progress more
continue
else
@check status
end
# wait for events
poll_fd(worker.fd; writable=true, readable=true)
end
elseif progress_mode(worker) === :idling
idler = UvWorkerIdle(worker)
wait(idler)
close(idler)
elseif progress_mode(worker) === :busy
progress(worker)
while isopen(worker)
# Temporary solution before we have gc transition support in codegen.
# XXX: `yield()` is supposed to be a safepoint, but without this we easily
# deadlock in a multithreaded test.
ccall(:jl_gc_safepoint, Cvoid, ())
yield()
progress(worker)
break
elseif status == API.UCS_ERR_BUSY
# could not arm, need to progress more
continue
else
@check status
end
else
throw(UCXException(API.UCS_ERR_UNREACHABLE))
end
end

function Base.notify(worker::UCXWorker)
@check API.ucp_worker_signal(worker)
# If we don't use polling, we can't signal the worker
if ispolling(worker)
@check API.ucp_worker_signal(worker)
end
end

function Base.isopen(worker::UCXWorker)
worker.fd != RawFD(-1)
worker.open
end

function Base.close(worker::UCXWorker)
worker.fd = RawFD(-1)
@debug "Close worker"
worker.open = false
notify(worker)
end

Expand All @@ -334,16 +412,20 @@ end
function am_recv_callback(arg::Ptr{Cvoid}, header::Ptr{Cvoid}, header_length::Csize_t, data::Ptr{Cvoid}, length::Csize_t, param::Ptr{API.ucp_am_recv_param_t})::API.ucs_status_t
handler = Base.unsafe_pointer_to_objref(arg)::AMHandler
try
lock_am(handler.worker)
return handler.func(handler.worker, header, header_length, data, length, param)::API.ucs_status_t
catch err
showerror(stderr, err, catch_backtrace())
return API.UCS_OK
finally
unlock_am(handler.worker)
end
end

AMHandler(f, w::UCXWorker, id) = AMHandler(w, f, id)

function AMHandler(worker::UCXWorker, func, id)
@debug "Installing AMHandler on worker" func id
handler = AMHandler(func, worker)
worker.am_handlers[id] = handler # root handler in worker

Expand Down Expand Up @@ -620,39 +702,29 @@ end

Base.notify(req::UCXRequest) = notify(req.event)
function Base.wait(req::UCXRequest)
## 1: Full suspend
# Since we are using polling, we can't suspend fully
# wait(req.event)
# @check req.status

## 2: Timer Based
# start a timer that regularly makes progress
# Turns out this makes `benchmarks/ucx` 5x slower
# progress(req.worker)
# timer = Timer(0, interval=0.001) do t # 0.001 smallest interval
# progress(req.worker)
# end
# @assert ccall(:uv_timer_get_repeat, UInt64, (Ptr{Cvoid},), timer) > 0
# wait(req.event)
# close(timer)
# @check req.status

## 3: Busy loop
# - It can be that only due to us calling progress we will trigger
# the Event.
progress(req.worker)
ev = req.event
while true
lock(ev.notify)
if ev.set
break
if progress_mode(req.worker) === :busy
# The worker will make independent progress
# and if we don't suspend here we will get a livelock.
wait(req.event)
@check req.status
else
# Request busy loop
# It can be that only due to us calling progress we will trigger
# the Event, and the Worker will suspend due to the use of polling.
progress(req.worker)
ev = req.event
while true
lock(ev.notify)
if ev.set
break
end
unlock(ev.notify)
yield()
progress(req.worker)
end
unlock(ev.notify)
yield()
progress(req.worker)
@check req.status
end
unlock(ev.notify)
@check req.status
end

##
Expand Down
Loading

0 comments on commit f6d1c47

Please sign in to comment.