Skip to content

Commit

Permalink
Adding workflow to read in integrated clusters from root files
Browse files Browse the repository at this point in the history
- writing TFIDInfo to TTree
- moving writer to workflowIO
  • Loading branch information
matthias-kleiner committed Jan 24, 2023
1 parent 858a822 commit 773b62b
Show file tree
Hide file tree
Showing 10 changed files with 246 additions and 16 deletions.
6 changes: 5 additions & 1 deletion Detectors/TOF/workflow/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ o2_add_library(TOFWorkflowUtils
src/CompressedAnalysisTask.cxx
src/EntropyEncoderSpec.cxx
src/EntropyDecoderSpec.cxx
src/TOFIntegrateClusterWriterSpec.cxx
src/TOFIntegrateClusterSpec.cxx
src/TOFMergeIntegrateClusterSpec.cxx
PUBLIC_LINK_LIBRARIES O2::Framework O2::DPLUtils O2::TOFBase O2::DataFormatsTOF O2::TOFReconstruction O2::TOFWorkflowIO O2::Steer O2::TOFCalibration)
Expand Down Expand Up @@ -61,6 +60,11 @@ o2_add_executable(integrate-cluster-workflow
COMPONENT_NAME tof
PUBLIC_LINK_LIBRARIES O2::TOFWorkflowUtils)

o2_add_executable(integrate-cluster-reader-workflow
SOURCES src/cluster-integrator-reader.cxx
COMPONENT_NAME tof
PUBLIC_LINK_LIBRARIES O2::TOFWorkflowUtils)

o2_add_executable(merge-integrate-cluster-workflow
SOURCES src/cluster-merge-integrator.cxx
COMPONENT_NAME tof
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ namespace o2
namespace tof
{

o2::framework::DataProcessorSpec getTOFIntegrateClusterSpec();
o2::framework::DataProcessorSpec getTOFIntegrateClusterSpec(const bool disableWriter);

} // end namespace tof
} // end namespace o2
Expand Down
25 changes: 18 additions & 7 deletions Detectors/TOF/workflow/src/TOFIntegrateClusterSpec.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include "Framework/ConfigParamRegistry.h"
#include "DetectorsBase/GRPGeomHelper.h"
#include "TOFBase/Geo.h"
#include "DetectorsBase/TFIDInfoHelper.h"

using namespace o2::framework;

Expand All @@ -32,7 +33,7 @@ class TOFIntegrateClusters : public Task
{
public:
/// \constructor
TOFIntegrateClusters(std::shared_ptr<o2::base::GRPGeomRequest> req) : mCCDBRequest(req){};
TOFIntegrateClusters(std::shared_ptr<o2::base::GRPGeomRequest> req, const bool disableWriter) : mCCDBRequest(req), mDisableWriter(disableWriter){};

void init(framework::InitContext& ic) final
{
Expand Down Expand Up @@ -72,7 +73,7 @@ class TOFIntegrateClusters : public Task
std::transform(iTOFCNCl.begin(), iTOFCNCl.end(), iTOFCNCl.begin(), [sliceWidthMSinv](float& val) { return val * sliceWidthMSinv; });
std::transform(iTOFCqTot.begin(), iTOFCqTot.end(), iTOFCqTot.begin(), [sliceWidthMSinv](float& val) { return val * sliceWidthMSinv; });

sendOutput(pc.outputs(), std::move(iTOFCNCl), std::move(iTOFCqTot));
sendOutput(pc, std::move(iTOFCNCl), std::move(iTOFCqTot));
}

void endOfStream(EndOfStreamContext& eos) final
Expand All @@ -84,16 +85,23 @@ class TOFIntegrateClusters : public Task

private:
int mNSlicesTF = 11; ///< number of slices a TF is divided into
const bool mDisableWriter{false}; ///< flag if no ROOT output will be written
std::shared_ptr<o2::base::GRPGeomRequest> mCCDBRequest; ///< info for CCDB request

void sendOutput(DataAllocator& output, o2::pmr::vector<float> iTOFCNCl, o2::pmr::vector<float> iTOFCqTot)
void sendOutput(ProcessingContext& pc, o2::pmr::vector<float> iTOFCNCl, o2::pmr::vector<float> iTOFCqTot)
{
output.adoptContainer(Output{header::gDataOriginTOF, "ITOFCN"}, std::move(iTOFCNCl));
output.adoptContainer(Output{header::gDataOriginTOF, "ITOFCQ"}, std::move(iTOFCqTot));
pc.outputs().adoptContainer(Output{header::gDataOriginTOF, "ITOFCN"}, std::move(iTOFCNCl));
pc.outputs().adoptContainer(Output{header::gDataOriginTOF, "ITOFCQ"}, std::move(iTOFCqTot));
// in case of ROOT output also store the TFinfo in the TTree
if (!mDisableWriter) {
o2::dataformats::TFIDInfo tfinfo;
o2::base::TFIDInfoHelper::fillTFIDInfo(pc, tfinfo);
pc.outputs().snapshot(Output{header::gDataOriginTOF, "ITOFTFID"}, tfinfo);
}
}
};

o2::framework::DataProcessorSpec getTOFIntegrateClusterSpec()
o2::framework::DataProcessorSpec getTOFIntegrateClusterSpec(const bool disableWriter)
{
std::vector<InputSpec> inputs;
inputs.emplace_back("tofcluster", o2::header::gDataOriginTOF, "CLUSTERS", 0, Lifetime::Timeframe);
Expand All @@ -109,12 +117,15 @@ o2::framework::DataProcessorSpec getTOFIntegrateClusterSpec()
std::vector<OutputSpec> outputs;
outputs.emplace_back(o2::header::gDataOriginTOF, "ITOFCN", 0, Lifetime::Sporadic);
outputs.emplace_back(o2::header::gDataOriginTOF, "ITOFCQ", 0, Lifetime::Sporadic);
if (!disableWriter) {
outputs.emplace_back(o2::header::gDataOriginTOF, "ITOFTFID", 0, Lifetime::Sporadic);
}

return DataProcessorSpec{
"TOFIntegrateClusters",
inputs,
outputs,
AlgorithmSpec{adaptFromTask<TOFIntegrateClusters>(ccdbRequest)},
AlgorithmSpec{adaptFromTask<TOFIntegrateClusters>(ccdbRequest, disableWriter)},
Options{{"nSlicesTF", VariantType::Int, 11, {"number of slices into which a TF is divided"}}}};
}

Expand Down
25 changes: 25 additions & 0 deletions Detectors/TOF/workflow/src/cluster-integrator-reader.cxx
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
// All rights not expressly granted are reserved.
//
// This software is distributed under the terms of the GNU General Public
// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
//
// In applying this license CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.

#include "TOFWorkflowIO/TOFIntegrateClusterReaderSpec.h"
#include "CommonUtils/ConfigurableParam.h"
#include "Framework/ConfigParamSpec.h"

using namespace o2::framework;

#include "Framework/runDataProcessing.h"

WorkflowSpec defineDataProcessing(ConfigContext const& cfgc)
{
WorkflowSpec wf;
wf.emplace_back(o2::tof::getTOFIntegrateClusterReaderSpec());
return wf;
}
7 changes: 4 additions & 3 deletions Detectors/TOF/workflow/src/cluster-integrator.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
// or submit itself to any jurisdiction.

#include "TOFWorkflowUtils/TOFIntegrateClusterSpec.h"
#include "TOFWorkflowUtils/TOFIntegrateClusterWriterSpec.h"
#include "TOFWorkflowIO/TOFIntegrateClusterWriterSpec.h"
#include "CommonUtils/ConfigurableParam.h"
#include "Framework/ConfigParamSpec.h"

Expand Down Expand Up @@ -38,8 +38,9 @@ WorkflowSpec defineDataProcessing(ConfigContext const& cfgc)
WorkflowSpec wf;
// Update the (declared) parameters if changed from the command line
o2::conf::ConfigurableParam::updateFromString(cfgc.options().get<std::string>("configKeyValues"));
wf.emplace_back(o2::tof::getTOFIntegrateClusterSpec());
if (!cfgc.options().get<bool>("disable-root-output")) {
const bool disableWriter = cfgc.options().get<bool>("disable-root-output");
wf.emplace_back(o2::tof::getTOFIntegrateClusterSpec(disableWriter));
if (!disableWriter) {
wf.emplace_back(o2::tof::getTOFIntegrateClusterWriterSpec());
}

Expand Down
4 changes: 3 additions & 1 deletion Detectors/TOF/workflowIO/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,7 @@ o2_add_library(TOFWorkflowIO
src/TOFCalibWriterSpec.cxx
src/TOFMatchedReaderSpec.cxx
src/CalibInfoReaderSpec.cxx
PUBLIC_LINK_LIBRARIES O2::Framework O2::DPLUtils O2::TOFBase O2::DataFormatsTOF O2::TOFReconstruction)
src/TOFIntegrateClusterWriterSpec.cxx
src/TOFIntegrateClusterReaderSpec.cxx
PUBLIC_LINK_LIBRARIES O2::Framework O2::DPLUtils O2::TOFBase O2::DataFormatsTOF O2::TOFReconstruction O2::Algorithm)

Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
// All rights not expressly granted are reserved.
//
// This software is distributed under the terms of the GNU General Public
// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
//
// In applying this license CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.

#ifndef O2_TOF_TOFINTEGRATECLUSTERREADERSPEC_SPEC
#define O2_TOF_TOFINTEGRATECLUSTERREADERSPEC_SPEC

#include "Framework/DataProcessorSpec.h"

namespace o2
{
namespace tof
{

o2::framework::DataProcessorSpec getTOFIntegrateClusterReaderSpec();

} // end namespace tof
} // end namespace o2

#endif
158 changes: 158 additions & 0 deletions Detectors/TOF/workflowIO/src/TOFIntegrateClusterReaderSpec.cxx
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
// All rights not expressly granted are reserved.
//
// This software is distributed under the terms of the GNU General Public
// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
//
// In applying this license CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.

/// @file TOFIntegrateClusterReaderSpec.cxx

#include <vector>
#include <boost/algorithm/string/predicate.hpp>

#include "TOFWorkflowIO/TOFIntegrateClusterReaderSpec.h"
#include "Framework/Task.h"
#include "Framework/ControlService.h"
#include "Framework/ConfigParamRegistry.h"
#include "CommonUtils/NameConf.h"
#include "CommonDataFormat/TFIDInfo.h"
#include "Algorithm/RangeTokenizer.h"
#include "TChain.h"
#include "TGrid.h"

using namespace o2::framework;

namespace o2
{
namespace tof
{

class IntegratedClusterReader : public Task
{
public:
IntegratedClusterReader() = default;
~IntegratedClusterReader() override = default;
void init(InitContext& ic) final;
void run(ProcessingContext& pc) final;

private:
void connectTrees();

int mChainEntry = 0; ///< processed entries in the chain
std::unique_ptr<TChain> mChain; ///< input TChain
std::vector<std::string> mFileNames; ///< input files
std::vector<float> mTOFCNCl, *mTOFCNClPtr = &mTOFCNCl; ///< branch integrated number of cluster TOF currents
std::vector<float> mTOFCqTot, *mTOFCqTotPtr = &mTOFCqTot; ///< branch integrated q TOF currents
o2::dataformats::TFIDInfo mTFinfo, *mTFinfoPtr = &mTFinfo; ///< branch TFIDInfo for injecting correct time
std::vector<std::pair<unsigned long, int>> mIndices; ///< firstTfOrbit, file, index
};

void IntegratedClusterReader::init(InitContext& ic)
{
const auto dontCheckFileAccess = ic.options().get<bool>("dont-check-file-access");
auto fileList = o2::RangeTokenizer::tokenize<std::string>(ic.options().get<std::string>("tofcurrents-infiles"));

// check if only one input file (a txt file contaning a list of files is provided)
if (fileList.size() == 1) {
if (boost::algorithm::ends_with(fileList.front(), "txt")) {
LOGP(info, "Reading files from input file {}", fileList.front());
std::ifstream is(fileList.front());
std::istream_iterator<std::string> start(is);
std::istream_iterator<std::string> end;
std::vector<std::string> fileNamesTmp(start, end);
fileList = fileNamesTmp;
}
}

const std::string inpDir = o2::utils::Str::rectifyDirectory(ic.options().get<std::string>("input-dir"));
for (const auto& file : fileList) {
if ((file.find("alien://") == 0) && !gGrid && !TGrid::Connect("alien://")) {
LOG(fatal) << "Failed to open alien connection";
}
const auto fileDir = o2::utils::Str::concat_string(inpDir, file);
if (!dontCheckFileAccess) {
std::unique_ptr<TFile> filePtr(TFile::Open(fileDir.data()));
if (!filePtr || !filePtr->IsOpen() || filePtr->IsZombie()) {
LOGP(warning, "Could not open file {}", fileDir);
continue;
}
}
mFileNames.emplace_back(fileDir);
}

if (mFileNames.size() == 0) {
LOGP(error, "No input files to process");
}
connectTrees();
}

void IntegratedClusterReader::run(ProcessingContext& pc)
{
// check time order inside the TChain
if (mChainEntry == 0) {
mIndices.clear();
mIndices.reserve(mChain->GetEntries());
for (unsigned long i = 0; i < mChain->GetEntries(); i++) {
mChain->GetEntry(i);
mIndices.emplace_back(std::make_pair(mTFinfo.firstTForbit, i));
}
std::sort(mIndices.begin(), mIndices.end());
}

LOGP(debug, "Processing entry {}", mIndices[mChainEntry].second);
mChain->GetEntry(mIndices[mChainEntry++].second);

// inject correct timing informations
auto& timingInfo = pc.services().get<o2::framework::TimingInfo>();
timingInfo.firstTForbit = mTFinfo.firstTForbit;
timingInfo.tfCounter = mTFinfo.tfCounter;
timingInfo.runNumber = mTFinfo.runNumber;
timingInfo.creation = mTFinfo.creation;

pc.outputs().snapshot(Output{header::gDataOriginTOF, "ITOFCN"}, mTOFCNCl);
pc.outputs().snapshot(Output{header::gDataOriginTOF, "ITOFCQ"}, mTOFCqTot);
usleep(100);

if (mChainEntry >= mChain->GetEntries()) {
pc.services().get<ControlService>().endOfStream();
pc.services().get<ControlService>().readyToQuit(QuitRequest::Me);
}
}

void IntegratedClusterReader::connectTrees()
{
mChain.reset(new TChain("itofc"));
for (const auto& file : mFileNames) {
LOGP(info, "Adding file to chain: {}", file);
mChain->AddFile(file.data());
}
assert(mChain->GetEntries());
mChain->SetBranchAddress("ITOFCN", &mTOFCNClPtr);
mChain->SetBranchAddress("ITOFCQ", &mTOFCqTotPtr);
mChain->SetBranchAddress("tfID", &mTFinfoPtr);
}

DataProcessorSpec getTOFIntegrateClusterReaderSpec()
{
std::vector<OutputSpec> outputs;
outputs.emplace_back(o2::header::gDataOriginTOF, "ITOFCN", 0, Lifetime::Sporadic);
outputs.emplace_back(o2::header::gDataOriginTOF, "ITOFCQ", 0, Lifetime::Sporadic);

return DataProcessorSpec{
"tof-integrated-cluster-reader",
Inputs{},
outputs,
AlgorithmSpec{adaptFromTask<IntegratedClusterReader>()},
Options{
{"tofcurrents-infiles", VariantType::String, "o2currents_tof.root", {"comma-separated list of input files or .txt file containing list of input files"}},
{"input-dir", VariantType::String, "none", {"Input directory"}},
{"dont-check-file-access", VariantType::Bool, false, {"Deactivate check if all files are accessible before adding them to the list of files"}},
}};
}

} // namespace tof
} // namespace o2
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@
/// @file TOFIntegrateClusterWriterSpec.cxx

#include <vector>
#include "TOFWorkflowUtils/TOFIntegrateClusterWriterSpec.h"
#include "TOFWorkflowIO/TOFIntegrateClusterWriterSpec.h"
#include "DPLUtils/MakeRootTreeWriterSpec.h"
#include "CommonDataFormat/TFIDInfo.h"

using namespace o2::framework;

Expand All @@ -28,9 +29,10 @@ DataProcessorSpec getTOFIntegrateClusterWriterSpec()
{
return MakeRootTreeWriterSpec("tof-currents-writer",
"o2currents_tof.root",
"ITOFC",
"itofc",
BranchDefinition<std::vector<float>>{InputSpec{"itofcn", o2::header::gDataOriginTOF, "ITOFCN", 0}, "ITOFCN", 1},
BranchDefinition<std::vector<float>>{InputSpec{"itofcq", o2::header::gDataOriginTOF, "ITOFCQ", 0}, "ITOFCQ", 1})();
BranchDefinition<std::vector<float>>{InputSpec{"itofcq", o2::header::gDataOriginTOF, "ITOFCQ", 0}, "ITOFCQ", 1},
BranchDefinition<o2::dataformats::TFIDInfo>{InputSpec{"itoftfid", o2::header::gDataOriginTOF, "ITOFTFID", 0}, "tfID", 1})();
}

} // namespace tof
Expand Down

0 comments on commit 773b62b

Please sign in to comment.