Concurrency violation on interplay between Distributed and Base.Threads #73

Open
jonas-schulze opened this issue Sep 23, 2020 · 17 comments · Fixed by JuliaLang/julia#37905 or JuliaLang/julia#38134 · May be fixed by #101
Labels: bug (Something isn't working)

@jonas-schulze
Contributor

I’m working on a distributed pipeline algorithm that uses several stages per worker process. IIRC tasks cannot hop between threads once they’ve been scheduled. Since I want my stages to potentially run in parallel, I tried to create non-sticky tasks by chaining each D.@spawnat with a T.@spawn. However, this setup keeps failing/crashing and I don’t understand why.

I boiled it down to a minimal example:

using Distributed, Base.Threads

const D = Distributed
const T = Threads

pids = addprocs(10)
wids = repeat(pids, inner=2)      # two players (pipeline stages) per worker

conns = map(RemoteChannel, wids)  # one channel per player, owned by its worker
fst = first(conns)
lst = RemoteChannel()
push!(conns, lst)                 # player i takes from conns[i], puts into conns[i+1]

@everywhere begin
    # "Stille Post" is the telephone game: each player forwards the message
    # from its input channel to its output channel.
    function stillepost(i, prev, next)
        message = take!(prev)
        put!(next, message)
        @info "Player $i done"
    end
end

players = []
for i in 1:length(wids)
    w = wids[i]
    c1 = conns[i]
    c2 = conns[i+1]
    # run the player as a non-sticky task on worker w and wait for it there
    p = D.@spawnat w fetch(T.@spawn stillepost(i, c1, c2))
    push!(players, p)
end

game = @async begin
    m1 = "gibberish"
    put!(fst, m1)
    m2 = take!(lst)
    @info "'$m1' turned into '$m2'; well done!"
end

wait.(players)
wait(game)

Player 2 fails with a concurrency violation:

julia> include("stillepost.jl")
[ Info: Player 1 done
ERROR: LoadError: On worker 2:
TaskFailedException:
concurrency violation detected
error at ./error.jl:33
concurrency_violation at ./condition.jl:8
assert_havelock at ./condition.jl:25 [inlined]
assert_havelock at ./condition.jl:48 [inlined]
assert_havelock at ./condition.jl:72 [inlined]
wait at ./condition.jl:102
wait_for_conn at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.5/Distributed/src/cluster.jl:193
check_worker_state at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.5/Distributed/src/cluster.jl:168
send_msg_ at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.5/Distributed/src/messages.jl:176
send_msg at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.5/Distributed/src/messages.jl:134 [inlined]
#remotecall_fetch#143 at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.5/Distributed/src/remotecall.jl:389
remotecall_fetch at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.5/Distributed/src/remotecall.jl:386
#remotecall_fetch#146 at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.5/Distributed/src/remotecall.jl:421
remotecall_fetch at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.5/Distributed/src/remotecall.jl:421
call_on_owner at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.5/Distributed/src/remotecall.jl:494
put! at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.5/Distributed/src/remotecall.jl:595 [inlined]
stillepost at /Users/jonas/.../stillepost.jl:18
#3 at ./threadingconstructs.jl:169
wait at ./task.jl:267 [inlined]

Am I holding it wrong?

Julia Version 1.5.1
Commit 697e782ab8 (2020-08-25 20:08 UTC)
Platform Info:
  OS: macOS (x86_64-apple-darwin19.5.0)
  CPU: Intel(R) Core(TM) i5-8259U CPU @ 2.30GHz
  WORD_SIZE: 64
  LIBM: libopenlibm
  LLVM: libLLVM-9.0.1 (ORCJIT, skylake)
Environment:
  JULIA_NUM_THREADS = 4
  JULIA_PROJECT = @.

See also this post on Discourse; foobar_lv2 suggested this might be a bug in Distributed.

@jonas-schulze
Contributor Author

The error persists on version 1.5.2 and here is the output for

Julia Version 1.6.0-DEV.1029
Commit f26a8c352f (2020-09-23 23:37 UTC)

which is the current version of julia-nightly on Homebrew:

[ Info: Player 1 done
ERROR: LoadError: On worker 2:
TaskFailedException
Stacktrace:
 [1] wait
   @ ./task.jl:303 [inlined]
 [2] fetch
   @ ./task.jl:318 [inlined]
 [3] #2
   @ /Users/sabae/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.6/Distributed/src/macros.jl:87
 [4] #103
   @ /Users/sabae/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.6/Distributed/src/process_messages.jl:290
 [5] run_work_thunk
   @ /Users/sabae/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.6/Distributed/src/process_messages.jl:79
 [6] run_work_thunk
   @ /Users/sabae/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.6/Distributed/src/process_messages.jl:88
 [7] #96
   @ ./task.jl:392

    nested task error:
Stacktrace:
  [1] remotecall_fetch(::Function, ::Distributed.Worker, ::Distributed.RRID, ::Vararg{Any, N} where N; kwargs::Base.Iterators.Pairs{Union{}, Union{}, Tuple{}, NamedTuple{(), Tuple{}}})
    @ Distributed /Users/sabae/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.6/Distributed/src/remotecall.jl:394
  [2] remotecall_fetch(::Function, ::Distributed.Worker, ::Distributed.RRID, ::Vararg{Any, N} where N)
    @ Distributed /Users/sabae/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.6/Distributed/src/remotecall.jl:386
  [3] remotecall_fetch(::Function, ::Int64, ::Distributed.RRID, ::Vararg{Any, N} where N; kwargs::Base.Iterators.Pairs{Union{}, Union{}, Tuple{}, NamedTuple{(), Tuple{}}})
    @ Distributed /Users/sabae/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.6/Distributed/src/remotecall.jl:421
  [4] remotecall_fetch
    @ /Users/sabae/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.6/Distributed/src/remotecall.jl:421 [inlined]
  [5] call_on_owner
    @ /Users/sabae/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.6/Distributed/src/remotecall.jl:494 [inlined]
  [6] wait(r::Future)
    @ Distributed /Users/sabae/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.6/Distributed/src/remotecall.jl:515
  [7] _broadcast_getindex_evalf
    @ ./broadcast.jl:648 [inlined]
  [8] _broadcast_getindex
    @ ./broadcast.jl:621 [inlined]
  [9] getindex
    @ ./broadcast.jl:575 [inlined]
 [10] copyto_nonleaf!(dest::Vector{Future}, bc::Base.Broadcast.Broadcasted{Base.Broadcast.DefaultArrayStyle{1}, Tuple{Base.OneTo{Int64}}, typeof(wait), Tuple{Base.Broadcast.Extruded{Vector{Any}, Tuple{Bool}, Tuple{Int64}}}}, iter::Base.OneTo{Int64}, state::Int64, count::Int64)
    @ Base.Broadcast ./broadcast.jl:1026
 [11] copy
    @ ./broadcast.jl:880 [inlined]
 [12] materialize(bc::Base.Broadcast.Broadcasted{Base.Broadcast.DefaultArrayStyle{1}, Nothing, typeof(wait), Tuple{Vector{Any}}})
    @ Base.Broadcast ./broadcast.jl:837
 [13] top-level scope
    @ ~/.../stillepost.jl:44
in expression starting at /Users/jonas/.../stillepost.jl:44

@JeffBezanson
Member

Distributed does not yet support messages from threads other than 1. Should not be too hard to fix.

@jonas-schulze
Contributor Author

Indeed, if I funnel all the communication through thread 1, it works!

    # ...
    p = D.@spawnat w @sync begin
        # funnel: hand the message over to the @spawnat task (on thread 1),
        # which performs the put! into the RemoteChannel c2
        funnel = Channel()
        T.@spawn stillepost(i, c1, funnel)
        stillepost(i+0.5, funnel, c2)
    end
    # ...

... but is this something we can hope to see fixed before the next LTS is released?

Why is thread 1 so special? I am not at all familiar with the code in either of the two libraries, but if you could give me some pointers, I can give it a go. 🙂

@JeffBezanson
Member

Thread 1 isn't special, this is just something that hasn't been updated for threads yet. The Condition objects used to synchronize workers need to be replaced with Threads.Condition plus locking, or possibly Threads.Event instead.
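
For illustration, here is a minimal sketch of the pattern Jeff describes (not Distributed's actual code, just generic Base API usage): a Threads.Condition must be locked around both wait and notify. The assert_havelock/concurrency_violation frames in the stack trace above are the single-threaded Condition's version of this check failing.

using Base.Threads

function condition_demo()
    cond = Threads.Condition()
    ready = false

    waiter = Threads.@spawn begin
        lock(cond)
        try
            while !ready          # guard against a missed or spurious wakeup
                wait(cond)        # releases the lock while blocked
            end
        finally
            unlock(cond)
        end
    end

    lock(cond)
    try
        ready = true
        notify(cond)              # must also hold the lock when notifying
    finally
        unlock(cond)
    end
    wait(waiter)
end

condition_demo()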

@jonas-schulze
Contributor Author

TL;DR: There is a different issue at play. The example above is fixed by using addprocs(..., lazy=false).
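
For reference, the only change to the original example is the addprocs call (a sketch; everything else stays the same):

pids = addprocs(10, lazy=false)  # establish the worker connections eagerly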

I tried to write some tests that send and receive from all combinations of threads, processes, and locations where the RemoteChannel is stored, but they pass (even if I increase contention by using just a single channel). 🤔
To isolate the different situations, I then stopped reusing the worker processes (which, as a side effect, reduces contention on the channel even further), and for some reason that made the tests fail (yay, but not yay).

using Test
using Distributed, Base.Threads
using Base.Iterators: product

exeflags = ("--startup-file=no",
            "--check-bounds=yes",
            "--depwarn=error",
            "--threads=2")

# Schedule f as a task pinned to thread tid of worker wid and return the Task
# (wrapped in a Future).
function call_on(f, wid, tid)
  remotecall(wid) do
    t = Task(f)
    ccall(:jl_set_task_tid, Cvoid, (Any, Cint), t, tid-1)  # pin to thread tid (0-based)
    schedule(t)
    @assert threadid(t) == tid
    t
  end
end

# Run function on process holding the data to only serialize the result of f.
# This becomes useful for things that cannot be serialized (e.g. running tasks)
# or that would be unnecessarily big if serialized.
fetch_from_owner(f, rr) = remotecall_fetch(f ∘ fetch, rr.where, rr)

isdone(rr) = fetch_from_owner(istaskdone, rr)
isfailed(rr) = fetch_from_owner(istaskfailed, rr)

@testset "RemoteChannel is threadsafe" begin
  ws = ts = product(1:2, 1:2)
  timeout = 10.0
  @testset "from worker $w1 to $w2 via 1" for (w1, w2) in ws
    @testset "from thread $w1.$t1 to $w2.$t2" for (t1, t2) in ts
      procs_added = addprocs(2; exeflags)
      @everywhere procs_added using Base.Threads
      p1 = procs_added[w1]
      p2 = procs_added[w2]
      chan_id = first(procs_added)
      chan = RemoteChannel(chan_id)
      send = call_on(p1, t1) do
        put!(chan, nothing)
      end
      recv = call_on(p2, t2) do
        take!(chan)
      end
      timedwait(() -> isdone(send) && isdone(recv), timeout)
      @test isdone(send)
      @test isdone(recv)
      @test !isfailed(send)
      @test !isfailed(recv)
      rmprocs(procs_added)
    end
  end
end

The above tests fail consistently in these cases -- all errors being concurrency violations:

Test Summary:               | Pass  Fail  Error  Total
RemoteChannel is threadsafe |   48     4      4     56
  from worker 1 to 1 via 1  |   16                  16
  from worker 2 to 1 via 1  |    8            2     10
    from thread 2.1 to 1.1  |    4                   4
    from thread 2.2 to 1.1  |                 1      1
    from thread 2.1 to 1.2  |    4                   4
    from thread 2.2 to 1.2  |                 1      1
  from worker 1 to 2 via 1  |   14     2      1     17
    from thread 1.1 to 2.1  |    4                   4
    from thread 1.2 to 2.1  |    4                   4
    from thread 1.1 to 2.2  |    3     1      1      5
    from thread 1.2 to 2.2  |    3     1             4
  from worker 2 to 2 via 1  |   10     2      1     13
    from thread 2.1 to 2.1  |    4                   4
    from thread 2.2 to 2.1  |                 1      1
    from thread 2.1 to 2.2  |    4                   4
    from thread 2.2 to 2.2  |    2     2             4

Comparing the (nearly) complete outputs of two test runs, I was lucky to notice that some workers were not available:

$ julia-1.5 threads.jl > out1.txt
$ julia-1.5 threads.jl > out2.txt
$ diff out1.txt out2.txt
124,125c124,126
<   ProcessExitedException(29)
<   worker_from_id(::Distributed.ProcessGroup, ::Int64) at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.5/Distributed/src/cluster.jl:1074
---
>   no process with id 29 exists
>   error(::String) at ./error.jl:33
>   worker_from_id(::Distributed.ProcessGroup, ::Int64) at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.5/Distributed/src/cluster.jl:1079

So I tried adding the workers using addprocs(..., lazy=false) and, out of the blue, all the tests are passing. Even the original example works. @JeffBezanson do you have a clue about what could be going on here?

@jonas-schulze
Contributor Author

Latest attempt is JuliaLang/julia#38405

@orenbenkiki

orenbenkiki commented Jul 19, 2021

I hit what is probably the same issue - using channels and futures to communicate between threads in a single process. The gist https://gist.github.com/orenbenkiki/ac71f348d4915b394805656b142b33fe contains a small code sample and error traces, in case this isn't exactly the same issue (specifically, there are no worker processes involved here at all - the problem is purely a lack of thread safety).
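
Not the gist's code, just a minimal sketch of the shape described - a Distributed RemoteChannel shared between threads of a single process, with no workers added (whether it actually trips the violation depends on timing and the Julia version):

using Distributed, Base.Threads

rc = RemoteChannel(() -> Channel{Int}(1))  # backed by a Channel on this process

producer = Threads.@spawn put!(rc, 42)
consumer = Threads.@spawn take!(rc)

wait(producer)
@show fetch(consumer)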

I found it surprising that channels/futures are not thread safe - I think it warrants an explicit warning in the documentation until this is fixed.

@KristofferC
Member

Try again now that JuliaLang/julia#38405 is merged?

@orenbenkiki

orenbenkiki commented Jul 19, 2021

Is that in the latest Julia 1.7? It isn't listed under https://github.com/JuliaLang/julia/blob/v1.7.0-beta2/NEWS.md#multi-threading-changes

Edit: It seems this was merged 30m ago? Wow, that's some timing. I suppose this means I'd have to download Julia from GitHub and build it from source - I have never tried doing that before... Is there somewhere one can download a compiled bleeding-edge build (for Linux)?

Edit 2: Ah, nightly builds. I'll give it a day or two so the merged version gets there and then give it a try. Thanks!

@KristofferC
Member

It just got merged so it isn't in 1.7 (yet).

@jpsamaroo
Member

With JuliaLang/julia#38405, 3 threads, and 3 workers, the second test (#73) passes for me, while the first test (#73) hangs with no CPU activity.

@orenbenkiki

orenbenkiki commented Jul 19, 2021

Note that I see non-deterministic results. If you run it multiple times, sometimes it passes, most often it deadlocks, and sometimes it crashes, even with the same number of threads/processes. Running it for longer (a larger iteration count) increases the chance of a deadlock (unsurprisingly). This was all on Julia 1.6, mind you - I haven't tried the latest version yet, since the potential fix was merged less than two hours ago - I'm waiting for the nightly build to pick it up.

@vchuravy
Member

JuliaLang/julia#38405 also only fixes a limited set of interactions; more work is needed to ensure that Distributed.jl is fully thread-safe.

@vchuravy
Member

JuliaLang/julia#41722 reverted the first fix again.

@vchuravy
Member

vchuravy commented Oct 8, 2021

Moving this off the 1.7 milestone. Making Distributed.jl thread-safe will take more work; I am hopeful that we can make progress on this for 1.8.

@vtjnash
Member

vtjnash commented Mar 2, 2022

@vchuravy JuliaLang/julia#42239 seems stalled, so moving this off the v1.8 milestone

@jonas-schulze
Contributor Author

I just watched the State of Julia 2022 talk, which claimed that Distributed is now thread-safe. However, the snippet above (#73) remains broken on 1.8.5 as well as on 1.9.1, both with the following summary.

Test Summary:               | Pass  Fail  Error  Total     Time
RemoteChannel is threadsafe |   48     4      3     55  1m15.1s
  from worker 1 to 1 via 1  |   16                  16    16.4s
  from worker 2 to 1 via 1  |    8            2     10    20.1s
    from thread 2.1 to 1.1  |    4                   4     4.7s
    from thread 2.2 to 1.1  |                 1      1     5.3s
    from thread 2.1 to 1.2  |    4                   4     5.0s
    from thread 2.2 to 1.2  |                 1      1     5.1s
  from worker 1 to 2 via 1  |   14     2            16    18.8s
    from thread 1.1 to 2.1  |    4                   4     4.7s
    from thread 1.2 to 2.1  |    4                   4     4.5s
    from thread 1.1 to 2.2  |    3     1             4     4.7s
    from thread 1.2 to 2.2  |    3     1             4     4.9s
  from worker 2 to 2 via 1  |   10     2      1     13    19.8s
    from thread 2.1 to 2.1  |    4                   4     5.1s
    from thread 2.2 to 2.1  |                 1      1     5.0s
    from thread 2.1 to 2.2  |    4                   4     5.0s
    from thread 2.2 to 2.2  |    2     2             4     4.7s
ERROR: LoadError: Some tests did not pass: 48 passed, 4 failed, 3 errored, 0 broken.

Note that the "from thread 1.1 to 2.2" case now fails without an error.

CC @JeffBezanson

jonas-schulze referenced this issue in mpimd-csc/ParaReal.jl Aug 10, 2023
A pipeline object holds references to the channels and tasks involved
in the execution. It further holds the configuration of each stage of
the pipeline.

Addresses #18
Opens #1
See https://github.com/JuliaLang/julia/issues/37706
@vtjnash vtjnash transferred this issue from JuliaLang/julia Feb 11, 2024
@JamesWrigley JamesWrigley linked a pull request May 23, 2024 that will close this issue