Skip to content

Commit

Permalink
Defer reading of headers where possible
Browse files Browse the repository at this point in the history
  • Loading branch information
sfegan committed Sep 30, 2024
1 parent 4a0d61a commit e2f1a6b
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 38 deletions.
2 changes: 2 additions & 0 deletions include/iact_data/zfits_acada_data_source.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,9 @@ class ZFITSACADACameraEventDataSource:
using BaseDataSource::open_file;

private:
void load_run_header();
config_type config_;
bool run_header_loaded_ = false;
header_type* run_header_ = nullptr;
data_stream_type* data_stream_ = nullptr;
};
Expand Down
12 changes: 5 additions & 7 deletions src/iact_data/parallel_event_dispatcher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -258,13 +258,8 @@ process_cta_zfits_run(const std::string& filename,
CTAZFITSDataSource cta_file(filename, config.decoder(), config.zfits());
TelescopeRunConfiguration* run_config = cta_file.get_run_configuration();

auto fragments = cta_file.all_fragment_names();
if(fragments.empty()) {
// This should never happen (I guess) as we should already have had an exception
throw std::runtime_error("process_cta_zfits_run: file not found: " + filename);
}

unsigned nthread = std::min(std::max(config.nthread(), 1U), unsigned(fragments.size()));
unsigned nfragments = cta_file.num_fragments();
unsigned nthread = std::min(std::max(config.nthread(), 1U), nfragments);

if(nthread == 1) {
try {
Expand Down Expand Up @@ -400,6 +395,9 @@ void ParallelEventDispatcher::do_parallel_dispatcher_loops(
} catch(const std::exception& x) {
util::log::LOG(util::log::FATAL) << x.what();
++exceptions_raised;
delete bsrc;
--threads_active;
return;
}
delete bsrc;
bsrc = src_factory->new_data_source();
Expand Down
104 changes: 73 additions & 31 deletions src/iact_data/zfits_acada_data_source.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ namespace {
template<> std::string default_message_table_name<ACADA_EventMessage_R1v1>() { return "Events"; }
template<> std::string default_message_table_name<ACADA_DataStreamMessage_R1v1>() { return "DataStream"; }

template<typename Message> bool is_message_type() { return true; }
template<> bool is_message_type<void>() { return true; }

template<typename Message> inline Message* read_one_message_from_zfits_table(
const std::string& filename, const std::string& tablename, bool suppress_file_record = false)
{
Expand Down Expand Up @@ -344,31 +347,6 @@ ZFITSSingleFileACADACameraEventDataSource(const std::string& filename, const con
if(!is_readable(filename_))
throw std::runtime_error(std::string("File not readable: ")+filename_);

if(!config_.dont_read_run_header())
{
try {
run_header_ = read_one_message_from_zfits_table<header_type>(
filename_, config_.run_header_table_name(), /* suppress_file_record = */true);
} catch(...) {
if(!config_.ignore_run_header_errors())
LOG(WARNING)
<< "ZFITSSingleFileACADACameraEventDataSource<" << MessageSet::name()
<< ">: Could not read run header from "
<< filename_ << " -> " << config_.run_header_table_name();
}

try {
data_stream_ = read_one_message_from_zfits_table<data_stream_type>(
filename_, config_.data_stream_table_name(), /* suppress_file_record = */true);
} catch(...) {
if(!config_.ignore_run_header_errors())
LOG(WARNING)
<< "ZFITSSingleFileACADACameraEventDataSource<" << MessageSet::name()
<< ">: Could not read data stream from "
<< filename_ << " -> " << config_.data_stream_table_name();
}
}

zfits_ = new ZFITSSingleFileSingleMessageDataSource<event_type>(filename_, config_.events_table_name());
}

Expand Down Expand Up @@ -433,17 +411,44 @@ template<typename MessageSet>
typename MessageSet::header_type* ZFITSSingleFileACADACameraEventDataSource<MessageSet>::
get_run_header()
{
if(run_header_ == nullptr and not config_.dont_read_run_header())
{
try {
run_header_ = read_one_message_from_zfits_table<header_type>(
filename_, config_.run_header_table_name(), /* suppress_file_record = */ true);
} catch(...) {
if(!config_.ignore_run_header_errors())
LOG(WARNING)
<< "ZFITSSingleFileACADACameraEventDataSource<" << MessageSet::name()
<< ">: Could not read run header from "
<< filename_ << " -> " << config_.run_header_table_name();
}
}

return copy_message(run_header_);
}

template<typename MessageSet>
typename MessageSet::data_stream_type* ZFITSSingleFileACADACameraEventDataSource<MessageSet>::
get_data_stream()
{
if(is_message_type<data_stream_type>() and data_stream_ == nullptr and not config_.dont_read_run_header())
{
try {
data_stream_ = read_one_message_from_zfits_table<data_stream_type>(
filename_, config_.data_stream_table_name(), /* suppress_file_record = */true);
} catch(...) {
if(!config_.ignore_run_header_errors())
LOG(WARNING)
<< "ZFITSSingleFileACADACameraEventDataSource<" << MessageSet::name()
<< ">: Could not read data stream from "
<< filename_ << " -> " << config_.data_stream_table_name();
}
}

return copy_message(data_stream_);
}


template<typename MessageSet>
typename ZFITSSingleFileACADACameraEventDataSource<MessageSet>::config_type
ZFITSSingleFileACADACameraEventDataSource<MessageSet>::default_config()
Expand Down Expand Up @@ -481,7 +486,9 @@ ZFITSACADACameraEventDataSource(const std::string& filename, const config_type&
run_header_ = source_->get_run_header();
data_stream_ = source_->get_data_stream();
}
if((run_header_ == nullptr or data_stream_ == nullptr) and opener_->num_sources() > 1) {
if((run_header_ == nullptr
or (is_message_type<data_stream_type>() and data_stream_ == nullptr))
and opener_->num_sources() > 1) {
// In this case the first file fragment is missing the RunHeader or DataStream, search
// for it in later fragments then reopen the first fragment

Expand All @@ -508,20 +515,55 @@ ZFITSACADACameraEventDataSource<MessageSet>::
delete_message(data_stream_);
}

template<typename MessageSet>
void ZFITSACADACameraEventDataSource<MessageSet>::load_run_header()
{
if(!run_header_loaded_) {
auto saved_isource = isource_;
if(isource_!=0 or source_==nullptr) {
isource_=0;
open_file();
}
if(source_) {
run_header_ = source_->get_run_header();
data_stream_ = source_->get_data_stream();
}
while((run_header_ == nullptr
or (is_message_type<data_stream_type>() and data_stream_ == nullptr))
and isource_<opener_->num_sources()-1)
{
++isource_;
open_file();
if(run_header_ == nullptr)
run_header_ = source_ ? source_->get_run_header() : nullptr;
if(data_stream_ == nullptr)
data_stream_ = source_ ? source_->get_data_stream() : nullptr;
}
if(isource_ != saved_isource) {
isource_ = saved_isource;
open_file();
}
run_header_loaded_ = true;
}
}

template<typename MessageSet>
typename MessageSet::header_type* ZFITSACADACameraEventDataSource<MessageSet>::
get_run_header()
{
if(!run_header_)return nullptr;
auto* run_header = new header_type();
run_header->CopyFrom(*run_header_);
return run_header;
if(run_header_ == nullptr) {
load_run_header();
}
return copy_message(run_header_);
}

template<typename MessageSet>
typename MessageSet::data_stream_type* ZFITSACADACameraEventDataSource<MessageSet>::
get_data_stream()
{
if(is_message_type<data_stream_type>() and data_stream_ == nullptr) {
load_run_header();
}
return copy_message(data_stream_);
}

Expand Down

0 comments on commit e2f1a6b

Please sign in to comment.