Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Worker threads not going idle #114

Closed
ryzhyk opened this issue Sep 14, 2018 · 15 comments
Closed

Worker threads not going idle #114

ryzhyk opened this issue Sep 14, 2018 · 15 comments

Comments

@ryzhyk
Copy link
Contributor

ryzhyk commented Sep 14, 2018

I've noticed that worker threads in my program go into some form of an infinite loop when there is no input data. Specifically, I use worker 0 to receive input from an I/O thread. This one behaves well and blocks waiting for data, but all other workers are consuming 100% of CPU even when there is no data to process.

Here is the high-level structure of the program:

            timely::execute(Configuration::Process(nworkers), move |worker: &mut Root<Allocator>| {
                ...
                let mut sessions = worker.dataflow::<u64,_,_>(|outer: &mut Child<Root<Allocator>, u64>| {
                    ...
                });
                /* Only worker 0 receives data */
                if worker_index != 0 {return;};

                loop {
                    /*wait for user input*/
                    ...
                };
            }).map(|g| {g.join(); ()})

The stack trace of the worker that consumes 100% CPU looks like this:

#0  0x000055f9f020f9f0 in core::iter::iterator::Iterator::try_for_each ()
#1  0x000055f9f020fb56 in core::iter::iterator::Iterator::any ()
#2  0x000055f9f020831a in <T as core::slice::SliceContains>::slice_contains ()
#3  0x000055f9f027b616 in <timely::progress::frontier::MutableAntichain<T>>::rebuild_and ()
#4  0x000055f9f033d909 in <timely::progress::frontier::MutableAntichain<T>>::update_iter_and ()
#5  0x000055f9f031cf50 in <differential_dataflow::trace::wrappers::rc::TraceBox<K, V, T, R, Tr>>::adjust_through_frontier ()
#6  0x000055f9f01f6186 in <differential_dataflow::operators::arrange::TraceAgent<K, V, T, R, Tr> as differential_dataflow::trace::TraceReader<K, V, T, R>>::distinguish_since ()
#7  0x000055f9f036fa7c in <differential_dataflow::operators::arrange::Arranged<G, K, V, R, T1> as differential_dataflow::operators::group::GroupArranged<G, K, V, R>>::group_arranged::{{closure}} ()
#8  0x000055f9f0361fbd in <timely::dataflow::stream::Stream<G, D1> as timely::dataflow::operators::generic::operator::Operator<G, D1>>::unary_notify::{{closure}}::{{closure}} ()
#9  0x000055f9f0362528 in <timely::dataflow::stream::Stream<G, D1> as timely::dataflow::operators::generic::operator::Operator<G, D1>>::unary_frontier::{{closure}}::{{closure}} ()
#10 0x000055f9f0387a9f in <timely::dataflow::operators::generic::builder_rc::OperatorBuilder<G>>::build::{{closure}} ()
#11 0x000055f9f0366733 in <timely::dataflow::operators::generic::builder_raw::OperatorCore<T, PEP, PIP> as timely::progress::operate::Operate<T>>::pull_internal_progress ()
#12 0x000055f9f01e12e6 in <timely::progress::nested::subgraph::PerOperatorState<T>>::exchange_progress ()
#13 0x000055f9f01c6bd1 in <timely::progress::nested::subgraph::Subgraph<TOuter, TInner> as timely::progress::operate::Operate<TOuter>>::pull_internal_progress ()
#14 0x000055f9f0453fd9 in timely::dataflow::scopes::root::Wrapper::step::{{closure}} ()
#15 0x000055f9f045363b in <core::option::Option<T>>::map ()
#16 0x000055f9f0453f5b in timely::dataflow::scopes::root::Wrapper::step ()
#17 0x000055f9f03b9428 in <timely::dataflow::scopes::root::Root<A>>::step ()
#18 0x000055f9f03e8468 in timely::execute::execute_logging::{{closure}} ()
#19 0x000055f9f03d7870 in timely_communication::initialize::initialize_from::{{closure}} ()
#20 0x000055f9f03da9c2 in std::sys_common::backtrace::__rust_begin_short_backtrace ()
#21 0x000055f9f03dba72 in std::thread::Builder::spawn::{{closure}}::{{closure}} ()
#22 0x000055f9f03cbff2 in <std::panic::AssertUnwindSafe<F> as core::ops::function::FnOnce<()>>::call_once ()
#23 0x000055f9f02504e9 in std::panicking::try::do_call ()
#24 0x000055f9f04ad85a in __rust_maybe_catch_panic () at libpanic_unwind/lib.rs:102
#25 0x000055f9f025042b in std::panicking::try ()
#26 0x000055f9f03db1f2 in std::panic::catch_unwind ()
#27 0x000055f9f03db96c in std::thread::Builder::spawn::{{closure}} ()
#28 0x000055f9f03e3128 in <F as alloc::boxed::FnBox<A>>::call_box ()
#29 0x000055f9f04a059b in _$LT$alloc..boxed..Box$LT$$LP$dyn$u20$alloc..boxed..FnBox$LT$A$C$$u20$Output$u3d$R$GT$$u20$$u2b$$u20$$u27$a$RP$$GT$$u20$as$u20$core..ops..function..FnOnce$LT$A$GT$$GT$::call_once::h21b1f1e31097750a () at /checkout/src/liballoc/boxed.rs:656
#30 std::sys_common::thread::start_thread () at libstd/sys_common/thread.rs:24
#31 0x000055f9f048f116 in std::sys::unix::thread::Thread::new::thread_start () at libstd/sys/unix/thread.rs:90
#32 0x00007fbb66c746db in start_thread (arg=0x7fbb655fe700) at pthread_create.c:463
#33 0x00007fbb6678588f in clone () at ../sysdeps/unix/sysv/linux/x86_64/clone.S:95

I'd appreciate any hints on how to investigate this.

@frankmcsherry
Copy link
Member

frankmcsherry commented Sep 14, 2018

The worker threads in timely busy-wait, so all of the threads that have returned are spinning at 100% polling their input channels. That's, .. by design at the moment. Thread wake-up was a non-trivial source of latency in the Naiad design.

Depending on what your goals are (lower power util, cooler laptop) you can have the other workers not return; and instead while worker.step() { thread::sleep_ms(1); }. In the possibly near future there may be a more event-based scheduler (much of the information required for this is present), but at the moment most of the workloads are "under load", so there is always work to do.

Is your goal to use less power at the expense of some latency when events arrive? This is a noble goal, but just trying to grok your desiderata.

Edit: for context, in most streaming timely computations workers rarely wait for input, and even when there is not data there is information that the low watermark for timestamps have advanced, and the workers need to scurry a bit to determine and communicate that the output has not changed.

What you are seeing in the stack above is the group method's main logic, which is mostly ignored when there are no input data nor time advancement, and it appears to (possibly bug) unconditionally go and remind the trace where the operator is at (should be harmless, but also pointless).

An arguably better thing to do would be for each trace to put some effort towards progressive merging, in what is otherwise downtime. This would work towards reducing the memory footprint of in-progress merges in between "actual work".

@ryzhyk
Copy link
Contributor Author

ryzhyk commented Sep 14, 2018

Interesting, thanks for the explanation! I cannot afford to busy-wait, because my differential program runs in a bigger system with many other things going on. The differential computation only kicks in when a certain trigger event happens, e.g., system configuration changes. Hogging the CPU is going to affect all other computations. Power and thermal considerations are important as well.

@frankmcsherry
Copy link
Member

frankmcsherry commented Sep 14, 2018

Probably the best thing to do is coordinate a wake-up, then. If you are just running single-process, you should be able to have the input thread signal each of the worker threads, then everyone works until the timestep has advanced, at which point they can go back to sleep. There is a bit of a concurrency mechanism issue there, but it should be tractable (e.g. an Arc<RwLock<Timestamp>> that worker zero occasionally writes to indicating the timestamp up through which everyone should be working, as well as signals for each other worker thread).

Probably the first thing to do is not have the other threads return, but rather spin explicitly so that you can add a condition that they test. E.g.

let mut time = ..; // largest time worker zero has advanced to
while probe.less_than(time) {
    worker.step();
}
// double-check time, consider suspending

Edit: I wrote a Signal object over in ./communication. It's not obviously the best implementation, but it allows threads to rendezvous and then wake each other up.

@ryzhyk
Copy link
Contributor Author

ryzhyk commented Sep 14, 2018

Good idea! I just tried with sleep_ms(1), and it's still using ~25% CPU per worker. I'll try your recipe now.

@frankmcsherry
Copy link
Member

The "next-gen" worker implementations, with all the zero-copy goodness, actually each have one of those Signal types in their definition, they just don't use it yet. Maybe I can figure out a good way to expose it. Ideally, you should have the ability to say something like worker.step_or_park() which would either do work or park itself if there is no work to do.

@ryzhyk
Copy link
Contributor Author

ryzhyk commented Sep 14, 2018

That would be lovely. I really don't think that my use case is unique. There must be many applications of differential where you want to quickly respond to a change in the input and then go idle.

In the meanwhile, the coordinated wake up you proposed should work just fine. I wish they did not remove semaphores from the Rust standard library; would have made the implementation easier :)

@frankmcsherry
Copy link
Member

The plan is to head towards something like step_or_park(), but the real motivation (personally) is that with a more precise understanding of what still needs to run at a worker we can avoid polling useless operators and leap directly to the work that needs to get done.

It's possible that there are applications that go idle, but .. so far most of the reports have been a mix of batch (go as fast as possible then terminate) and real-time streaming where you are always taking in data or the signal of no data. Yours isn't unreasonable, and at least one person asked about an even longer-latency version (run DD step; freeze computation; start up an hour later and make a change).

We'll get there, eventually. Mostly gated on my programming cycles, unfortunately. T.T

@frankmcsherry
Copy link
Member

Btw, timely has (recently) a ./synchronization directory where some synchronization primitives are homed (currently, a barrier and a global sequencer). If you come up with what might be a re-useable pattern, putting it in there might help.

I'm planning (based on this convo) to expose a worker.park() method, which will have the worker wait on its signal. Received messages should cause it to wake (though, process-local messages using the current default allocator will not currently, unfortunately).

@ryzhyk
Copy link
Contributor Author

ryzhyk commented Sep 17, 2018

The solution I ended up with uses a combination of RwLock to share the timestamp between workers and std::sync::Barrier to synchronize all workers on every input batch. worker 0 reads input records, pushes them to input sessions and advances the shared timestamp before executing barrier.wait().

I was not able to get away with, e.g., the Signal abstraction, as it does not let me synchronize the initial round, where worker 0 must ping each other worker after it has started waiting.

Does this make sense and do you seen any value is packaging this solution in some reusable form?

@frankmcsherry
Copy link
Member

I started up an issue (https://github.com/frankmcsherry/timely-dataflow/issues/189) about this in the timely repo. Ideally medium/long term it would probably be best to make sure the channels all wake up workers, and expose a mechanism to park workers (in the issue: park operators, with the worker becoming parkable if all operators are).

However, I'm not 100% clear on the issue you mention about the first round. Can you explain that in more detail?

@ryzhyk
Copy link
Contributor Author

ryzhyk commented Sep 17, 2018

Actually I was wrong, the problem is not in the first round. Here is the high-level structure I am trying to implement.

worker0:

loop {
   receive data from the user;
   update timestamp;
   wakeup workers[1..n];
   while probe.less_than(time) {worker.step()}
}

worker1..N

loop {
    wait for signal from worker0;
    read timestamp;
    while probe.less_than(time) {worker.step()};
}

The system livelocks if one thread enters the while loop while another thread is blocked waiting for a signal. Here is the simplest livelock scenario:

w1: reads old timestamp
w0: update timestamp
w0: send signal
w1: starts waiting (missed the signal)
w0: enters the while loop

w0 will loop forever (since without w1 it cannot make progress), and w1 will wait forever.

I could not think of an alternative scheme using only the Signal abstraction that would not have a similar issue.

@ryzhyk
Copy link
Contributor Author

ryzhyk commented Dec 30, 2018

Does the event_driven feature address the issue discussed in this thread, i.e., put an end to busy-waiting in workers?

@frankmcsherry
Copy link
Member

No, it does not. That's on the road map, but it involves the cooperation of a few parts that don't cooperate yet (the typed inter-thread allocator uses MPSC channels, and they don't wake a thread in the way we want on message send).

@comnik
Copy link
Member

comnik commented Dec 30, 2018

One anecdotal thing is that our server's CPU usage when idle dropped significantly with event_driven. I haven't had time to investigate this yet, but I was surprised.

@frankmcsherry
Copy link
Member

I believe this issue is addressed with step_or_park(). I'm going to close it, but feel free to re-open if you think that is wrong.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

3 participants