From 2332194bf2dbd478d7c9441a0ade9f7dc376dd1b Mon Sep 17 00:00:00 2001 From: Stephen Fegan Date: Thu, 26 Sep 2024 12:04:29 +0200 Subject: [PATCH] Fix to way fragments are handled (don't glob twice) --- .../iact_data/parallel_event_dispatcher.hpp | 3 +- proto/iact_data/zfits_data_source.proto | 3 + src/iact_data/parallel_event_dispatcher.cpp | 44 +++++----- src/iact_data/zfits_acada_data_source.cpp | 80 ++++++++++--------- 4 files changed, 72 insertions(+), 58 deletions(-) diff --git a/include/iact_data/parallel_event_dispatcher.hpp b/include/iact_data/parallel_event_dispatcher.hpp index 19b625f6..d4e0b5e8 100644 --- a/include/iact_data/parallel_event_dispatcher.hpp +++ b/include/iact_data/parallel_event_dispatcher.hpp @@ -141,7 +141,8 @@ class ParallelEventDispatcher: protected telescope_run_configuration::TelescopeRunConfiguration* run_config, int nthread); void write_final_log_message( - unsigned log_frequency, const std::chrono::system_clock::time_point& start_time, + calin::ix::iact_data::telescope_run_configuration::TelescopeRunConfiguration* run_config, + const std::chrono::system_clock::time_point& start_time, std::atomic& ndispatched); std::vector adopted_visitors_; diff --git a/proto/iact_data/zfits_data_source.proto b/proto/iact_data/zfits_data_source.proto index f83eeca8..c5cf2773 100644 --- a/proto/iact_data/zfits_data_source.proto +++ b/proto/iact_data/zfits_data_source.proto @@ -53,6 +53,9 @@ message ZFITSDataSourceConfig { uint64 max_seq_index = 7 [ (CFO).desc = "Maximum event sequence number. This effectively sets the " "number of events that can be read. Zero means there is no limit." ]; + repeated string forced_file_fragments_list = 8 [ + (CFO).desc = "Forced file fragments list. If set the file fragments are not found through globbing." ]; + bool dont_read_run_header = 11 [ (CFO).desc = "Don't read the run header or data stream." ]; bool ignore_run_header_errors = 12 [ diff --git a/src/iact_data/parallel_event_dispatcher.cpp b/src/iact_data/parallel_event_dispatcher.cpp index 54f66fc0..f44c02ab 100644 --- a/src/iact_data/parallel_event_dispatcher.cpp +++ b/src/iact_data/parallel_event_dispatcher.cpp @@ -171,6 +171,7 @@ void ParallelEventDispatcher::process_run(calin::io::data_source::DataSource< do_parallel_dispatcher_loops(run_config, &pump, nthread, log_frequency, start_time, ndispatched); } + write_final_log_message(run_config, start_time, ndispatched); LOG(INFO) << "Finishing up ..."; start_time = std::chrono::system_clock::now(); dispatch_leave_run(); @@ -205,6 +206,7 @@ process_run(std::vectorget_run_configuration(); auto fragments = cta_file->all_fragment_names(); @@ -239,13 +240,15 @@ process_cta_zfits_run(const std::string& filename, throw; } } else { - zfits_config.set_file_fragment_stride( - nthread*std::max(1U, zfits_config.file_fragment_stride())); - std::vector src_list(nthread); try { for(unsigned ithread=0; ithread& ndispatched) { using namespace std::chrono; - if(log_frequency) - { - auto dt = system_clock::now() - start_time; - if(ndispatched) { - LOG(INFO) << "Dispatched " - << to_string_with_commas(uint64_t(ndispatched)) << " events in " - << to_string_with_commas(double(duration_cast(dt).count())*0.001,3) << " sec, " - << to_string_with_commas(duration_cast(dt).count()/ndispatched) - << " us/event (finished)"; - } else { - LOG(INFO) << "Dispatched " - << to_string_with_commas(uint64_t(ndispatched)) << " events in " - << to_string_with_commas(double(duration_cast(dt).count())*0.001,3) << " sec (finished)"; - } + auto dt = system_clock::now() - start_time; + if(run_config->num_events() == 0) { + LOG(INFO) << "Dispatched " + << to_string_with_commas(uint64_t(ndispatched)) << " events in " + << to_string_with_commas(double(duration_cast(dt).count())*0.001,3) << " sec, " + << to_string_with_commas(duration_cast(dt).count()/ndispatched) + << " us/event (finished)"; + } else if(ndispatched != run_config->num_events()) { + LOG(WARNING) << "Dispatched " + << to_string_with_commas(uint64_t(ndispatched)) << " events in " + << to_string_with_commas(double(duration_cast(dt).count())*0.001,3) << " sec, " + << to_string_with_commas(duration_cast(dt).count()/ndispatched) + << " us/event (finished)\n" + << "Number of dispatched event does not match number of expected events: " << run_config->num_events(); } } diff --git a/src/iact_data/zfits_acada_data_source.cpp b/src/iact_data/zfits_acada_data_source.cpp index c4966420..4baf0f00 100644 --- a/src/iact_data/zfits_acada_data_source.cpp +++ b/src/iact_data/zfits_acada_data_source.cpp @@ -608,48 +608,54 @@ ZFITSACADACameraEventDataSourceOpener(std::string filename, const config_type& c ACADACameraEventRandomAccessDataSourceWithRunHeader >(), config_(config) { - const unsigned istride = std::max(1U,config.file_fragment_stride()); - filename = expand_filename(filename); - if(is_file(filename)) - filenames_.emplace_back(filename); - else - throw(std::runtime_error("File not found: " + filename)); - - if(not config_.exact_filename_only()) - { - const std::string extension = config_.extension(); - auto ifind = filename.rfind(extension); - if(ifind == filename.size()-extension.size()) + if(config.forced_file_fragments_list_size() > 0) { + for(auto& ffn : config.forced_file_fragments_list()) { + filenames_.emplace_back(ffn); + } + } else { + const unsigned istride = std::max(1U,config.file_fragment_stride()); + filename = expand_filename(filename); + if(is_file(filename)) + filenames_.emplace_back(filename); + else + throw(std::runtime_error("File not found: " + filename)); + + if(not config_.exact_filename_only()) { - filename = filename.substr(0, ifind); - - unsigned istart = 0; - if(not is_file(filename+".1"+extension)) + const std::string extension = config_.extension(); + auto ifind = filename.rfind(extension); + if(ifind == filename.size()-extension.size()) { - ifind = filename.rfind('.'); - if(ifind != std::string::npos and - std::all_of(filename.begin() + ifind + 1, filename.end(), ::isdigit)) + filename = filename.substr(0, ifind); + + unsigned istart = 0; + if(not is_file(filename+".1"+extension)) { - istart = std::stoi(filename.substr(ifind + 1)); - filename = filename.substr(0, ifind); + ifind = filename.rfind('.'); + if(ifind != std::string::npos and + std::all_of(filename.begin() + ifind + 1, filename.end(), ::isdigit)) + { + istart = std::stoi(filename.substr(ifind + 1)); + filename = filename.substr(0, ifind); + } } - } - bool fragment_found = true; - for(unsigned i=istart+istride; fragment_found and (config_.max_file_fragments()==0 or - filenames_.size()