From 5f8b02b06b7e70faae18c5c70924dd3adff37643 Mon Sep 17 00:00:00 2001 From: Stephen Fegan Date: Fri, 13 Dec 2024 14:29:55 +0100 Subject: [PATCH] Add message about merging --- src/iact_data/parallel_event_dispatcher.cpp | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/src/iact_data/parallel_event_dispatcher.cpp b/src/iact_data/parallel_event_dispatcher.cpp index 8c3dafe1..eded7d87 100644 --- a/src/iact_data/parallel_event_dispatcher.cpp +++ b/src/iact_data/parallel_event_dispatcher.cpp @@ -133,6 +133,7 @@ void ParallelEventDispatcher::process_src(calin::io::data_source::DataSource< std::chrono::milliseconds>(dt).count())*0.001,3) << " sec"; start_time = std::chrono::system_clock::now(); do_dispatcher_loop(src, log_frequency, run_config->num_events(), start_time, ndispatched); + write_final_log_message(run_config, start_time, ndispatched); } else { @@ -140,7 +141,6 @@ void ParallelEventDispatcher::process_src(calin::io::data_source::DataSource< do_parallel_dispatcher_loops(run_config, &pump, nthread, log_frequency, start_time, ndispatched); } - write_final_log_message(run_config, start_time, ndispatched); LOG(INFO) << "Finishing up ..."; start_time = std::chrono::system_clock::now(); dispatch_leave_run(); @@ -214,12 +214,14 @@ process_src_list(std::vector(dt).count())*0.001,3) << " sec"; start_time = std::chrono::system_clock::now(); do_dispatcher_loop(src_list[0], log_frequency, run_config->num_events(), start_time, ndispatched); + write_final_log_message(run_config, start_time, ndispatched); } else { io::data_source::VectorDataSourceFactory src_factory(src_list); + do_parallel_dispatcher_loops(run_config, &src_factory, src_list.size(), + log_frequency, start_time, ndispatched); } - write_final_log_message(run_config, start_time, ndispatched); LOG(INFO) << "Finishing up ..."; start_time = std::chrono::system_clock::now(); dispatch_leave_run(); @@ -246,7 +248,6 @@ process_src_factory(calin::io::data_source::DataSourceFactory< do_parallel_dispatcher_loops(run_config, src_factory, nthread, log_frequency, start_time, ndispatched); - write_final_log_message(run_config, start_time, ndispatched); LOG(INFO) << "Finishing up ..."; start_time = std::chrono::system_clock::now(); dispatch_leave_run(); @@ -430,12 +431,21 @@ void ParallelEventDispatcher::do_parallel_dispatcher_loops( throw std::runtime_error("Exception(s) thrown in threaded dispatcher loop"); } + write_final_log_message(run_config, start_time, ndispatched); + LOG(INFO) << "Merging results ..."; + start_time = std::chrono::system_clock::now(); + for(auto* d : sub_dispatchers) { d->dispatch_leave_run(); d->dispatch_merge_results(); delete d; } + + dt = std::chrono::system_clock::now() - start_time; + LOG(INFO) << "Merging results ... completed in " + << to_string_with_commas(double(std::chrono::duration_cast< + std::chrono::milliseconds>(dt).count())*0.001,3) << " sec"; } void ParallelEventDispatcher::do_dispatcher_loop(