diff --git a/Detectors/TOF/workflow/CMakeLists.txt b/Detectors/TOF/workflow/CMakeLists.txt index 70ecfe4cb0ecc..02f504d4a3965 100644 --- a/Detectors/TOF/workflow/CMakeLists.txt +++ b/Detectors/TOF/workflow/CMakeLists.txt @@ -16,7 +16,6 @@ o2_add_library(TOFWorkflowUtils src/CompressedAnalysisTask.cxx src/EntropyEncoderSpec.cxx src/EntropyDecoderSpec.cxx - src/TOFIntegrateClusterWriterSpec.cxx src/TOFIntegrateClusterSpec.cxx src/TOFMergeIntegrateClusterSpec.cxx PUBLIC_LINK_LIBRARIES O2::Framework O2::DPLUtils O2::TOFBase O2::DataFormatsTOF O2::TOFReconstruction O2::TOFWorkflowIO O2::Steer O2::TOFCalibration) @@ -61,6 +60,11 @@ o2_add_executable(integrate-cluster-workflow COMPONENT_NAME tof PUBLIC_LINK_LIBRARIES O2::TOFWorkflowUtils) +o2_add_executable(integrate-cluster-reader-workflow + SOURCES src/cluster-integrator-reader.cxx + COMPONENT_NAME tof + PUBLIC_LINK_LIBRARIES O2::TOFWorkflowUtils) + o2_add_executable(merge-integrate-cluster-workflow SOURCES src/cluster-merge-integrator.cxx COMPONENT_NAME tof diff --git a/Detectors/TOF/workflow/include/TOFWorkflowUtils/TOFIntegrateClusterSpec.h b/Detectors/TOF/workflow/include/TOFWorkflowUtils/TOFIntegrateClusterSpec.h index f34edd832e057..6715896fe9440 100644 --- a/Detectors/TOF/workflow/include/TOFWorkflowUtils/TOFIntegrateClusterSpec.h +++ b/Detectors/TOF/workflow/include/TOFWorkflowUtils/TOFIntegrateClusterSpec.h @@ -19,7 +19,7 @@ namespace o2 namespace tof { -o2::framework::DataProcessorSpec getTOFIntegrateClusterSpec(); +o2::framework::DataProcessorSpec getTOFIntegrateClusterSpec(const bool disableWriter); } // end namespace tof } // end namespace o2 diff --git a/Detectors/TOF/workflow/src/TOFIntegrateClusterSpec.cxx b/Detectors/TOF/workflow/src/TOFIntegrateClusterSpec.cxx index 79eb7f8000cf3..e2e57dc7ad377 100644 --- a/Detectors/TOF/workflow/src/TOFIntegrateClusterSpec.cxx +++ b/Detectors/TOF/workflow/src/TOFIntegrateClusterSpec.cxx @@ -20,6 +20,7 @@ #include "Framework/ConfigParamRegistry.h" #include "DetectorsBase/GRPGeomHelper.h" #include "TOFBase/Geo.h" +#include "DetectorsBase/TFIDInfoHelper.h" using namespace o2::framework; @@ -32,7 +33,7 @@ class TOFIntegrateClusters : public Task { public: /// \constructor - TOFIntegrateClusters(std::shared_ptr req) : mCCDBRequest(req){}; + TOFIntegrateClusters(std::shared_ptr req, const bool disableWriter) : mCCDBRequest(req), mDisableWriter(disableWriter){}; void init(framework::InitContext& ic) final { @@ -72,7 +73,7 @@ class TOFIntegrateClusters : public Task std::transform(iTOFCNCl.begin(), iTOFCNCl.end(), iTOFCNCl.begin(), [sliceWidthMSinv](float& val) { return val * sliceWidthMSinv; }); std::transform(iTOFCqTot.begin(), iTOFCqTot.end(), iTOFCqTot.begin(), [sliceWidthMSinv](float& val) { return val * sliceWidthMSinv; }); - sendOutput(pc.outputs(), std::move(iTOFCNCl), std::move(iTOFCqTot)); + sendOutput(pc, std::move(iTOFCNCl), std::move(iTOFCqTot)); } void endOfStream(EndOfStreamContext& eos) final @@ -84,16 +85,23 @@ class TOFIntegrateClusters : public Task private: int mNSlicesTF = 11; ///< number of slices a TF is divided into + const bool mDisableWriter{false}; ///< flag if no ROOT output will be written std::shared_ptr mCCDBRequest; ///< info for CCDB request - void sendOutput(DataAllocator& output, o2::pmr::vector iTOFCNCl, o2::pmr::vector iTOFCqTot) + void sendOutput(ProcessingContext& pc, o2::pmr::vector iTOFCNCl, o2::pmr::vector iTOFCqTot) { - output.adoptContainer(Output{header::gDataOriginTOF, "ITOFCN"}, std::move(iTOFCNCl)); - output.adoptContainer(Output{header::gDataOriginTOF, "ITOFCQ"}, std::move(iTOFCqTot)); + pc.outputs().adoptContainer(Output{header::gDataOriginTOF, "ITOFCN"}, std::move(iTOFCNCl)); + pc.outputs().adoptContainer(Output{header::gDataOriginTOF, "ITOFCQ"}, std::move(iTOFCqTot)); + // in case of ROOT output also store the TFinfo in the TTree + if (!mDisableWriter) { + o2::dataformats::TFIDInfo tfinfo; + o2::base::TFIDInfoHelper::fillTFIDInfo(pc, tfinfo); + pc.outputs().snapshot(Output{header::gDataOriginTOF, "ITOFTFID"}, tfinfo); + } } }; -o2::framework::DataProcessorSpec getTOFIntegrateClusterSpec() +o2::framework::DataProcessorSpec getTOFIntegrateClusterSpec(const bool disableWriter) { std::vector inputs; inputs.emplace_back("tofcluster", o2::header::gDataOriginTOF, "CLUSTERS", 0, Lifetime::Timeframe); @@ -109,12 +117,15 @@ o2::framework::DataProcessorSpec getTOFIntegrateClusterSpec() std::vector outputs; outputs.emplace_back(o2::header::gDataOriginTOF, "ITOFCN", 0, Lifetime::Sporadic); outputs.emplace_back(o2::header::gDataOriginTOF, "ITOFCQ", 0, Lifetime::Sporadic); + if (!disableWriter) { + outputs.emplace_back(o2::header::gDataOriginTOF, "ITOFTFID", 0, Lifetime::Sporadic); + } return DataProcessorSpec{ "TOFIntegrateClusters", inputs, outputs, - AlgorithmSpec{adaptFromTask(ccdbRequest)}, + AlgorithmSpec{adaptFromTask(ccdbRequest, disableWriter)}, Options{{"nSlicesTF", VariantType::Int, 11, {"number of slices into which a TF is divided"}}}}; } diff --git a/Detectors/TOF/workflow/src/cluster-integrator-reader.cxx b/Detectors/TOF/workflow/src/cluster-integrator-reader.cxx new file mode 100644 index 0000000000000..21bf8ca64141f --- /dev/null +++ b/Detectors/TOF/workflow/src/cluster-integrator-reader.cxx @@ -0,0 +1,25 @@ +// Copyright 2019-2020 CERN and copyright holders of ALICE O2. +// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders. +// All rights not expressly granted are reserved. +// +// This software is distributed under the terms of the GNU General Public +// License v3 (GPL Version 3), copied verbatim in the file "COPYING". +// +// In applying this license CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +#include "TOFWorkflowIO/TOFIntegrateClusterReaderSpec.h" +#include "CommonUtils/ConfigurableParam.h" +#include "Framework/ConfigParamSpec.h" + +using namespace o2::framework; + +#include "Framework/runDataProcessing.h" + +WorkflowSpec defineDataProcessing(ConfigContext const& cfgc) +{ + WorkflowSpec wf; + wf.emplace_back(o2::tof::getTOFIntegrateClusterReaderSpec()); + return wf; +} diff --git a/Detectors/TOF/workflow/src/cluster-integrator.cxx b/Detectors/TOF/workflow/src/cluster-integrator.cxx index d9b75d40cecf8..dcc65c2890fb9 100644 --- a/Detectors/TOF/workflow/src/cluster-integrator.cxx +++ b/Detectors/TOF/workflow/src/cluster-integrator.cxx @@ -10,7 +10,7 @@ // or submit itself to any jurisdiction. #include "TOFWorkflowUtils/TOFIntegrateClusterSpec.h" -#include "TOFWorkflowUtils/TOFIntegrateClusterWriterSpec.h" +#include "TOFWorkflowIO/TOFIntegrateClusterWriterSpec.h" #include "CommonUtils/ConfigurableParam.h" #include "Framework/ConfigParamSpec.h" @@ -38,8 +38,9 @@ WorkflowSpec defineDataProcessing(ConfigContext const& cfgc) WorkflowSpec wf; // Update the (declared) parameters if changed from the command line o2::conf::ConfigurableParam::updateFromString(cfgc.options().get("configKeyValues")); - wf.emplace_back(o2::tof::getTOFIntegrateClusterSpec()); - if (!cfgc.options().get("disable-root-output")) { + const bool disableWriter = cfgc.options().get("disable-root-output"); + wf.emplace_back(o2::tof::getTOFIntegrateClusterSpec(disableWriter)); + if (!disableWriter) { wf.emplace_back(o2::tof::getTOFIntegrateClusterWriterSpec()); } diff --git a/Detectors/TOF/workflowIO/CMakeLists.txt b/Detectors/TOF/workflowIO/CMakeLists.txt index 39cd64fcfe3fa..9a0a6a3c38ea4 100644 --- a/Detectors/TOF/workflowIO/CMakeLists.txt +++ b/Detectors/TOF/workflowIO/CMakeLists.txt @@ -21,5 +21,7 @@ o2_add_library(TOFWorkflowIO src/TOFCalibWriterSpec.cxx src/TOFMatchedReaderSpec.cxx src/CalibInfoReaderSpec.cxx - PUBLIC_LINK_LIBRARIES O2::Framework O2::DPLUtils O2::TOFBase O2::DataFormatsTOF O2::TOFReconstruction) + src/TOFIntegrateClusterWriterSpec.cxx + src/TOFIntegrateClusterReaderSpec.cxx + PUBLIC_LINK_LIBRARIES O2::Framework O2::DPLUtils O2::TOFBase O2::DataFormatsTOF O2::TOFReconstruction O2::Algorithm) diff --git a/Detectors/TOF/workflowIO/include/TOFWorkflowIO/TOFIntegrateClusterReaderSpec.h b/Detectors/TOF/workflowIO/include/TOFWorkflowIO/TOFIntegrateClusterReaderSpec.h new file mode 100644 index 0000000000000..516b639f2e3c7 --- /dev/null +++ b/Detectors/TOF/workflowIO/include/TOFWorkflowIO/TOFIntegrateClusterReaderSpec.h @@ -0,0 +1,27 @@ +// Copyright 2019-2020 CERN and copyright holders of ALICE O2. +// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders. +// All rights not expressly granted are reserved. +// +// This software is distributed under the terms of the GNU General Public +// License v3 (GPL Version 3), copied verbatim in the file "COPYING". +// +// In applying this license CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +#ifndef O2_TOF_TOFINTEGRATECLUSTERREADERSPEC_SPEC +#define O2_TOF_TOFINTEGRATECLUSTERREADERSPEC_SPEC + +#include "Framework/DataProcessorSpec.h" + +namespace o2 +{ +namespace tof +{ + +o2::framework::DataProcessorSpec getTOFIntegrateClusterReaderSpec(); + +} // end namespace tof +} // end namespace o2 + +#endif diff --git a/Detectors/TOF/workflow/include/TOFWorkflowUtils/TOFIntegrateClusterWriterSpec.h b/Detectors/TOF/workflowIO/include/TOFWorkflowIO/TOFIntegrateClusterWriterSpec.h similarity index 100% rename from Detectors/TOF/workflow/include/TOFWorkflowUtils/TOFIntegrateClusterWriterSpec.h rename to Detectors/TOF/workflowIO/include/TOFWorkflowIO/TOFIntegrateClusterWriterSpec.h diff --git a/Detectors/TOF/workflowIO/src/TOFIntegrateClusterReaderSpec.cxx b/Detectors/TOF/workflowIO/src/TOFIntegrateClusterReaderSpec.cxx new file mode 100644 index 0000000000000..05ccde2427d2e --- /dev/null +++ b/Detectors/TOF/workflowIO/src/TOFIntegrateClusterReaderSpec.cxx @@ -0,0 +1,158 @@ +// Copyright 2019-2020 CERN and copyright holders of ALICE O2. +// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders. +// All rights not expressly granted are reserved. +// +// This software is distributed under the terms of the GNU General Public +// License v3 (GPL Version 3), copied verbatim in the file "COPYING". +// +// In applying this license CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +/// @file TOFIntegrateClusterReaderSpec.cxx + +#include +#include + +#include "TOFWorkflowIO/TOFIntegrateClusterReaderSpec.h" +#include "Framework/Task.h" +#include "Framework/ControlService.h" +#include "Framework/ConfigParamRegistry.h" +#include "CommonUtils/NameConf.h" +#include "CommonDataFormat/TFIDInfo.h" +#include "Algorithm/RangeTokenizer.h" +#include "TChain.h" +#include "TGrid.h" + +using namespace o2::framework; + +namespace o2 +{ +namespace tof +{ + +class IntegratedClusterReader : public Task +{ + public: + IntegratedClusterReader() = default; + ~IntegratedClusterReader() override = default; + void init(InitContext& ic) final; + void run(ProcessingContext& pc) final; + + private: + void connectTrees(); + + int mChainEntry = 0; ///< processed entries in the chain + std::unique_ptr mChain; ///< input TChain + std::vector mFileNames; ///< input files + std::vector mTOFCNCl, *mTOFCNClPtr = &mTOFCNCl; ///< branch integrated number of cluster TOF currents + std::vector mTOFCqTot, *mTOFCqTotPtr = &mTOFCqTot; ///< branch integrated q TOF currents + o2::dataformats::TFIDInfo mTFinfo, *mTFinfoPtr = &mTFinfo; ///< branch TFIDInfo for injecting correct time + std::vector> mIndices; ///< firstTfOrbit, file, index +}; + +void IntegratedClusterReader::init(InitContext& ic) +{ + const auto dontCheckFileAccess = ic.options().get("dont-check-file-access"); + auto fileList = o2::RangeTokenizer::tokenize(ic.options().get("tofcurrents-infiles")); + + // check if only one input file (a txt file contaning a list of files is provided) + if (fileList.size() == 1) { + if (boost::algorithm::ends_with(fileList.front(), "txt")) { + LOGP(info, "Reading files from input file {}", fileList.front()); + std::ifstream is(fileList.front()); + std::istream_iterator start(is); + std::istream_iterator end; + std::vector fileNamesTmp(start, end); + fileList = fileNamesTmp; + } + } + + const std::string inpDir = o2::utils::Str::rectifyDirectory(ic.options().get("input-dir")); + for (const auto& file : fileList) { + if ((file.find("alien://") == 0) && !gGrid && !TGrid::Connect("alien://")) { + LOG(fatal) << "Failed to open alien connection"; + } + const auto fileDir = o2::utils::Str::concat_string(inpDir, file); + if (!dontCheckFileAccess) { + std::unique_ptr filePtr(TFile::Open(fileDir.data())); + if (!filePtr || !filePtr->IsOpen() || filePtr->IsZombie()) { + LOGP(warning, "Could not open file {}", fileDir); + continue; + } + } + mFileNames.emplace_back(fileDir); + } + + if (mFileNames.size() == 0) { + LOGP(error, "No input files to process"); + } + connectTrees(); +} + +void IntegratedClusterReader::run(ProcessingContext& pc) +{ + // check time order inside the TChain + if (mChainEntry == 0) { + mIndices.clear(); + mIndices.reserve(mChain->GetEntries()); + for (unsigned long i = 0; i < mChain->GetEntries(); i++) { + mChain->GetEntry(i); + mIndices.emplace_back(std::make_pair(mTFinfo.firstTForbit, i)); + } + std::sort(mIndices.begin(), mIndices.end()); + } + + LOGP(debug, "Processing entry {}", mIndices[mChainEntry].second); + mChain->GetEntry(mIndices[mChainEntry++].second); + + // inject correct timing informations + auto& timingInfo = pc.services().get(); + timingInfo.firstTForbit = mTFinfo.firstTForbit; + timingInfo.tfCounter = mTFinfo.tfCounter; + timingInfo.runNumber = mTFinfo.runNumber; + timingInfo.creation = mTFinfo.creation; + + pc.outputs().snapshot(Output{header::gDataOriginTOF, "ITOFCN"}, mTOFCNCl); + pc.outputs().snapshot(Output{header::gDataOriginTOF, "ITOFCQ"}, mTOFCqTot); + usleep(100); + + if (mChainEntry >= mChain->GetEntries()) { + pc.services().get().endOfStream(); + pc.services().get().readyToQuit(QuitRequest::Me); + } +} + +void IntegratedClusterReader::connectTrees() +{ + mChain.reset(new TChain("itofc")); + for (const auto& file : mFileNames) { + LOGP(info, "Adding file to chain: {}", file); + mChain->AddFile(file.data()); + } + assert(mChain->GetEntries()); + mChain->SetBranchAddress("ITOFCN", &mTOFCNClPtr); + mChain->SetBranchAddress("ITOFCQ", &mTOFCqTotPtr); + mChain->SetBranchAddress("tfID", &mTFinfoPtr); +} + +DataProcessorSpec getTOFIntegrateClusterReaderSpec() +{ + std::vector outputs; + outputs.emplace_back(o2::header::gDataOriginTOF, "ITOFCN", 0, Lifetime::Sporadic); + outputs.emplace_back(o2::header::gDataOriginTOF, "ITOFCQ", 0, Lifetime::Sporadic); + + return DataProcessorSpec{ + "tof-integrated-cluster-reader", + Inputs{}, + outputs, + AlgorithmSpec{adaptFromTask()}, + Options{ + {"tofcurrents-infiles", VariantType::String, "o2currents_tof.root", {"comma-separated list of input files or .txt file containing list of input files"}}, + {"input-dir", VariantType::String, "none", {"Input directory"}}, + {"dont-check-file-access", VariantType::Bool, false, {"Deactivate check if all files are accessible before adding them to the list of files"}}, + }}; +} + +} // namespace tof +} // namespace o2 diff --git a/Detectors/TOF/workflow/src/TOFIntegrateClusterWriterSpec.cxx b/Detectors/TOF/workflowIO/src/TOFIntegrateClusterWriterSpec.cxx similarity index 80% rename from Detectors/TOF/workflow/src/TOFIntegrateClusterWriterSpec.cxx rename to Detectors/TOF/workflowIO/src/TOFIntegrateClusterWriterSpec.cxx index f6b7c8f2ea2cc..991992783de3a 100644 --- a/Detectors/TOF/workflow/src/TOFIntegrateClusterWriterSpec.cxx +++ b/Detectors/TOF/workflowIO/src/TOFIntegrateClusterWriterSpec.cxx @@ -12,8 +12,9 @@ /// @file TOFIntegrateClusterWriterSpec.cxx #include -#include "TOFWorkflowUtils/TOFIntegrateClusterWriterSpec.h" +#include "TOFWorkflowIO/TOFIntegrateClusterWriterSpec.h" #include "DPLUtils/MakeRootTreeWriterSpec.h" +#include "CommonDataFormat/TFIDInfo.h" using namespace o2::framework; @@ -28,9 +29,10 @@ DataProcessorSpec getTOFIntegrateClusterWriterSpec() { return MakeRootTreeWriterSpec("tof-currents-writer", "o2currents_tof.root", - "ITOFC", + "itofc", BranchDefinition>{InputSpec{"itofcn", o2::header::gDataOriginTOF, "ITOFCN", 0}, "ITOFCN", 1}, - BranchDefinition>{InputSpec{"itofcq", o2::header::gDataOriginTOF, "ITOFCQ", 0}, "ITOFCQ", 1})(); + BranchDefinition>{InputSpec{"itofcq", o2::header::gDataOriginTOF, "ITOFCQ", 0}, "ITOFCQ", 1}, + BranchDefinition{InputSpec{"itoftfid", o2::header::gDataOriginTOF, "ITOFTFID", 0}, "tfID", 1})(); } } // namespace tof