Skip to content

Commit

Permalink
Add message about merging
Browse files Browse the repository at this point in the history
  • Loading branch information
sfegan committed Dec 13, 2024
1 parent 3a56972 commit 5f8b02b
Showing 1 changed file with 13 additions and 3 deletions.
16 changes: 13 additions & 3 deletions src/iact_data/parallel_event_dispatcher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,14 +133,14 @@ void ParallelEventDispatcher::process_src(calin::io::data_source::DataSource<
std::chrono::milliseconds>(dt).count())*0.001,3) << " sec";
start_time = std::chrono::system_clock::now();
do_dispatcher_loop(src, log_frequency, run_config->num_events(), start_time, ndispatched);
write_final_log_message(run_config, start_time, ndispatched);
}
else
{
io::data_source::UnidirectionalBufferedDataSourcePump<TelescopeEvent> pump(src);
do_parallel_dispatcher_loops(run_config, &pump, nthread, log_frequency,
start_time, ndispatched);
}
write_final_log_message(run_config, start_time, ndispatched);
LOG(INFO) << "Finishing up ...";
start_time = std::chrono::system_clock::now();
dispatch_leave_run();
Expand Down Expand Up @@ -214,12 +214,14 @@ process_src_list(std::vector<calin::io::data_source::DataSource<
std::chrono::milliseconds>(dt).count())*0.001,3) << " sec";
start_time = std::chrono::system_clock::now();
do_dispatcher_loop(src_list[0], log_frequency, run_config->num_events(), start_time, ndispatched);
write_final_log_message(run_config, start_time, ndispatched);
}
else
{
io::data_source::VectorDataSourceFactory<TelescopeEvent> src_factory(src_list);
do_parallel_dispatcher_loops(run_config, &src_factory, src_list.size(),
log_frequency, start_time, ndispatched);
}
write_final_log_message(run_config, start_time, ndispatched);
LOG(INFO) << "Finishing up ...";
start_time = std::chrono::system_clock::now();
dispatch_leave_run();
Expand All @@ -246,7 +248,6 @@ process_src_factory(calin::io::data_source::DataSourceFactory<
do_parallel_dispatcher_loops(run_config, src_factory, nthread,
log_frequency, start_time, ndispatched);

write_final_log_message(run_config, start_time, ndispatched);
LOG(INFO) << "Finishing up ...";
start_time = std::chrono::system_clock::now();
dispatch_leave_run();
Expand Down Expand Up @@ -430,12 +431,21 @@ void ParallelEventDispatcher::do_parallel_dispatcher_loops(
throw std::runtime_error("Exception(s) thrown in threaded dispatcher loop");
}

write_final_log_message(run_config, start_time, ndispatched);
LOG(INFO) << "Merging results ...";
start_time = std::chrono::system_clock::now();

for(auto* d : sub_dispatchers)
{
d->dispatch_leave_run();
d->dispatch_merge_results();
delete d;
}

dt = std::chrono::system_clock::now() - start_time;
LOG(INFO) << "Merging results ... completed in "
<< to_string_with_commas(double(std::chrono::duration_cast<
std::chrono::milliseconds>(dt).count())*0.001,3) << " sec";
}

void ParallelEventDispatcher::do_dispatcher_loop(
Expand Down

0 comments on commit 5f8b02b

Please sign in to comment.