Skip to content

Commit

Permalink
i#6938 sched migrate: Print blocked times in queues (#7019)
Browse files Browse the repository at this point in the history
Improves diagnostics by augmenting the all-runqueue printing:

+ It now constructs its many-line string in memory and then prints it
all at once, to make it more atomic.

+ It includes the remaining blocked times for blocked inputs.

+ It is moved from pop_from_ready_queue() where the popped input is in
flux to pick_next_input() where the current running input is valid.

+ It is printed more frequently.

Also prints the size of the unscheduled queue when moving it.

Issue: #6938
  • Loading branch information
derekbruening committed Oct 4, 2024
1 parent 4c3fcb8 commit 5927c0a
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 15 deletions.
51 changes: 37 additions & 14 deletions clients/drcachesim/scheduler/scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2828,13 +2828,6 @@ scheduler_tmpl_t<RecordType, ReaderType>::pop_from_ready_queue(
}
status = pop_from_ready_queue_hold_locks(from_output, for_output, new_input);
}
VDO(this, 1, {
static int global_heartbeat;
// We are ok with races as the cadence is approximate.
if (++global_heartbeat % 100000 == 0) {
print_queue_stats();
}
});
return status;
}

Expand Down Expand Up @@ -3175,6 +3168,14 @@ typename scheduler_tmpl_t<RecordType, ReaderType>::stream_status_t
scheduler_tmpl_t<RecordType, ReaderType>::pick_next_input(output_ordinal_t output,
uint64_t blocked_time)
{
VDO(this, 1, {
static int global_heartbeat;
// We are ok with races as the cadence is approximate.
if (++global_heartbeat % 10000 == 0) {
print_queue_stats();
}
});

sched_type_t::stream_status_t res = sched_type_t::STATUS_OK;
const input_ordinal_t prev_index = outputs_[output].cur_input;
input_ordinal_t index = INVALID_INPUT_ORDINAL;
Expand Down Expand Up @@ -4310,15 +4311,34 @@ scheduler_tmpl_t<RecordType, ReaderType>::print_queue_stats()
unsched_size = unscheduled_priority_.queue.size();
}
int live = live_input_count_.load(std::memory_order_acquire);
VPRINT(this, 1, "inputs: %zd scheduleable, %zd unscheduled, %zd eof\n",
live - unsched_size, unsched_size, inputs_.size() - live);
// Make our multi-line output more atomic.
std::ostringstream ostr;
ostr << "Queue snapshot: inputs: " << live - unsched_size << " schedulable, "
<< unsched_size << " unscheduled, " << inputs_.size() - live << " eof\n";
for (unsigned int i = 0; i < outputs_.size(); ++i) {
auto lock = acquire_scoped_output_lock_if_necessary(i);
VPRINT(this, 1, " out #%d: running #%d; %zd in queue; %d blocked\n", i,
// XXX: Reading this is racy; we're ok with that.
outputs_[i].cur_input, outputs_[i].ready_queue.queue.size(),
outputs_[i].ready_queue.num_blocked);
uint64_t cur_time = get_output_time(i);
ostr << " out #" << i << " @" << cur_time << ": running #"
<< outputs_[i].cur_input << "; " << outputs_[i].ready_queue.queue.size()
<< " in queue; " << outputs_[i].ready_queue.num_blocked << " blocked\n";
std::set<input_info_t *> readd;
input_info_t *res = nullptr;
while (!outputs_[i].ready_queue.queue.empty()) {
res = outputs_[i].ready_queue.queue.top();
readd.insert(res);
outputs_[i].ready_queue.queue.pop();
std::lock_guard<mutex_dbg_owned> input_lock(*res->lock);
if (res->blocked_time > 0) {
ostr << " " << res->index << " still blocked for "
<< res->blocked_time - (cur_time - res->blocked_start_time) << "\n";
}
}
// Re-add the ones we skipped, but without changing their counters so we preserve
// the prior FIFO order.
for (input_info_t *add : readd)
outputs_[i].ready_queue.queue.push(add);
}
VPRINT(this, 0, "%s\n", ostr.str().c_str());
}

template <typename RecordType, typename ReaderType>
Expand Down Expand Up @@ -4358,7 +4378,10 @@ scheduler_tmpl_t<RecordType, ReaderType>::rebalance_queues(
}
if (live_input_count_.load(std::memory_order_acquire) ==
static_cast<int>(unsched_size)) {
VPRINT(this, 1, "rebalancing moving entire unscheduled queue to ready_queues\n");
VPRINT(
this, 1,
"rebalancing moving entire unscheduled queue (%zu entries) to ready_queues\n",
unsched_size);
{
std::lock_guard<mutex_dbg_owned> unsched_lock(*unscheduled_priority_.lock);
while (!unscheduled_priority_.queue.empty()) {
Expand Down
1 change: 1 addition & 0 deletions clients/drcachesim/scheduler/scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -1596,6 +1596,7 @@ template <typename RecordType, typename ReaderType> class scheduler_tmpl_t {
rebalance_queues(output_ordinal_t triggering_output,
std::vector<input_ordinal_t> inputs_to_add);

// Up to the caller to check verbosity before calling.
void
print_queue_stats();

Expand Down
2 changes: 1 addition & 1 deletion clients/drcachesim/tests/scheduler_unit_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5055,7 +5055,7 @@ test_unscheduled_initially()
std::unique_ptr<mock_reader_t>(new mock_reader_t()), TID_A);
readers.emplace_back(std::unique_ptr<mock_reader_t>(new mock_reader_t(refs_B)),
std::unique_ptr<mock_reader_t>(new mock_reader_t()), TID_B);
// We have a medium idle period before A becomes scheduleable.
// We have a medium idle period before A becomes schedulable.
static const char *const CORE0_SCHED_STRING =
"...B....._____.....A.__________________________________B....B.";

Expand Down

0 comments on commit 5927c0a

Please sign in to comment.