Skip to content

Commit

Permalink
Fix the way fragments are handled (don't glob twice)
Browse files Browse the repository at this point in the history
  • Loading branch information
sfegan committed Sep 26, 2024
1 parent 22eb4ad commit 2332194
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 58 deletions.
3 changes: 2 additions & 1 deletion include/iact_data/parallel_event_dispatcher.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,8 @@ class ParallelEventDispatcher: protected
telescope_run_configuration::TelescopeRunConfiguration* run_config, int nthread);

void write_final_log_message(
unsigned log_frequency, const std::chrono::system_clock::time_point& start_time,
calin::ix::iact_data::telescope_run_configuration::TelescopeRunConfiguration* run_config,
const std::chrono::system_clock::time_point& start_time,
std::atomic<uint_fast64_t>& ndispatched);

std::vector<calin::iact_data::event_visitor::ParallelEventVisitor*> adopted_visitors_;
Expand Down
3 changes: 3 additions & 0 deletions proto/iact_data/zfits_data_source.proto
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ message ZFITSDataSourceConfig {
uint64 max_seq_index = 7 [
(CFO).desc = "Maximum event sequence number. This effectively sets the "
"number of events that can be read. Zero means there is no limit." ];
repeated string forced_file_fragments_list = 8 [
(CFO).desc = "Forced file fragments list. If set the file fragments are not found through globbing." ];

bool dont_read_run_header = 11 [
(CFO).desc = "Don't read the run header or data stream." ];
bool ignore_run_header_errors = 12 [
Expand Down
44 changes: 24 additions & 20 deletions src/iact_data/parallel_event_dispatcher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ void ParallelEventDispatcher::process_run(calin::io::data_source::DataSource<
do_parallel_dispatcher_loops(run_config, &pump, nthread, log_frequency,
start_time, ndispatched);
}
write_final_log_message(run_config, start_time, ndispatched);
LOG(INFO) << "Finishing up ...";
start_time = std::chrono::system_clock::now();
dispatch_leave_run();
Expand Down Expand Up @@ -205,6 +206,7 @@ process_run(std::vector<calin::io::data_source::DataSource<
do_parallel_dispatcher_loops(run_config, &src_factory, src_list.size(),
log_frequency, start_time, ndispatched);
}
write_final_log_message(run_config, start_time, ndispatched);
LOG(INFO) << "Finishing up ...";
start_time = std::chrono::system_clock::now();
dispatch_leave_run();
Expand All @@ -218,8 +220,7 @@ void ParallelEventDispatcher::
process_cta_zfits_run(const std::string& filename,
const calin::ix::iact_data::event_dispatcher::EventDispatcherConfig& config)
{
auto zfits_config = config.zfits();
auto* cta_file = new CTAZFITSDataSource(filename, config.decoder(), zfits_config);
auto* cta_file = new CTAZFITSDataSource(filename, config.decoder(), config.zfits());
TelescopeRunConfiguration* run_config = cta_file->get_run_configuration();

auto fragments = cta_file->all_fragment_names();
Expand All @@ -239,13 +240,15 @@ process_cta_zfits_run(const std::string& filename,
throw;
}
} else {
zfits_config.set_file_fragment_stride(
nthread*std::max(1U, zfits_config.file_fragment_stride()));

std::vector<calin::iact_data::telescope_data_source::
TelescopeDataSource*> src_list(nthread);
try {
for(unsigned ithread=0; ithread<nthread; ithread++) {
CTAZFITSDataSource::config_type zfits_config = config.zfits();
zfits_config.clear_forced_file_fragments_list();
for(unsigned iff=ithread;iff<fragments.size();iff+=nthread) {
zfits_config.add_forced_file_fragments_list(fragments[iff]);
}
src_list[ithread] =
new CTAZFITSDataSource(fragments[ithread], cta_file, zfits_config);
}
Expand Down Expand Up @@ -475,24 +478,25 @@ void ParallelEventDispatcher::write_initial_log_message(
}

void ParallelEventDispatcher::write_final_log_message(
unsigned log_frequency, const std::chrono::system_clock::time_point& start_time,
calin::ix::iact_data::telescope_run_configuration::TelescopeRunConfiguration* run_config,
const std::chrono::system_clock::time_point& start_time,
std::atomic<uint_fast64_t>& ndispatched)
{
using namespace std::chrono;
if(log_frequency)
{
auto dt = system_clock::now() - start_time;
if(ndispatched) {
LOG(INFO) << "Dispatched "
<< to_string_with_commas(uint64_t(ndispatched)) << " events in "
<< to_string_with_commas(double(duration_cast<milliseconds>(dt).count())*0.001,3) << " sec, "
<< to_string_with_commas(duration_cast<microseconds>(dt).count()/ndispatched)
<< " us/event (finished)";
} else {
LOG(INFO) << "Dispatched "
<< to_string_with_commas(uint64_t(ndispatched)) << " events in "
<< to_string_with_commas(double(duration_cast<milliseconds>(dt).count())*0.001,3) << " sec (finished)";
}
auto dt = system_clock::now() - start_time;
if(run_config->num_events() == 0) {
LOG(INFO) << "Dispatched "
<< to_string_with_commas(uint64_t(ndispatched)) << " events in "
<< to_string_with_commas(double(duration_cast<milliseconds>(dt).count())*0.001,3) << " sec, "
<< to_string_with_commas(duration_cast<microseconds>(dt).count()/ndispatched)
<< " us/event (finished)";
} else if(ndispatched != run_config->num_events()) {
LOG(WARNING) << "Dispatched "
<< to_string_with_commas(uint64_t(ndispatched)) << " events in "
<< to_string_with_commas(double(duration_cast<milliseconds>(dt).count())*0.001,3) << " sec, "
<< to_string_with_commas(duration_cast<microseconds>(dt).count()/ndispatched)
<< " us/event (finished)\n"
<< "Number of dispatched event does not match number of expected events: " << run_config->num_events();
}
}

Expand Down
80 changes: 43 additions & 37 deletions src/iact_data/zfits_acada_data_source.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -608,48 +608,54 @@ ZFITSACADACameraEventDataSourceOpener(std::string filename, const config_type& c
ACADACameraEventRandomAccessDataSourceWithRunHeader<MessageSet> >(),
config_(config)
{
const unsigned istride = std::max(1U,config.file_fragment_stride());
filename = expand_filename(filename);
if(is_file(filename))
filenames_.emplace_back(filename);
else
throw(std::runtime_error("File not found: " + filename));

if(not config_.exact_filename_only())
{
const std::string extension = config_.extension();
auto ifind = filename.rfind(extension);
if(ifind == filename.size()-extension.size())
if(config.forced_file_fragments_list_size() > 0) {
for(auto& ffn : config.forced_file_fragments_list()) {
filenames_.emplace_back(ffn);
}
} else {
const unsigned istride = std::max(1U,config.file_fragment_stride());
filename = expand_filename(filename);
if(is_file(filename))
filenames_.emplace_back(filename);
else
throw(std::runtime_error("File not found: " + filename));

if(not config_.exact_filename_only())
{
filename = filename.substr(0, ifind);

unsigned istart = 0;
if(not is_file(filename+".1"+extension))
const std::string extension = config_.extension();
auto ifind = filename.rfind(extension);
if(ifind == filename.size()-extension.size())
{
ifind = filename.rfind('.');
if(ifind != std::string::npos and
std::all_of(filename.begin() + ifind + 1, filename.end(), ::isdigit))
filename = filename.substr(0, ifind);

unsigned istart = 0;
if(not is_file(filename+".1"+extension))
{
istart = std::stoi(filename.substr(ifind + 1));
filename = filename.substr(0, ifind);
ifind = filename.rfind('.');
if(ifind != std::string::npos and
std::all_of(filename.begin() + ifind + 1, filename.end(), ::isdigit))
{
istart = std::stoi(filename.substr(ifind + 1));
filename = filename.substr(0, ifind);
}
}
}

bool fragment_found = true;
for(unsigned i=istart+istride; fragment_found and (config_.max_file_fragments()==0 or
filenames_.size()<config_.max_file_fragments()) ; i+=istride)
{
fragment_found = false;
std::string fragment_i { std::to_string(i) };
do {
std::string filename_i { filename+"."+fragment_i+extension };
if(is_file(filename_i)) {
filenames_.emplace_back(filename_i);
fragment_found = true;
} else {
fragment_i = std::string("0") + fragment_i;
}
}while(not fragment_found and fragment_i.size() <= 6);
bool fragment_found = true;
for(unsigned i=istart+istride; fragment_found and (config_.max_file_fragments()==0 or
filenames_.size()<config_.max_file_fragments()) ; i+=istride)
{
fragment_found = false;
std::string fragment_i { std::to_string(i) };
do {
std::string filename_i { filename+"."+fragment_i+extension };
if(is_file(filename_i)) {
filenames_.emplace_back(filename_i);
fragment_found = true;
} else {
fragment_i = std::string("0") + fragment_i;
}
}while(not fragment_found and fragment_i.size() <= 6);
}
}
}
}
Expand Down

0 comments on commit 2332194

Please sign in to comment.