Skip to content

Commit

Permalink
IDCs: Add option to write IDCDelta to calibration file (#10260)
Browse files Browse the repository at this point in the history
* IDCs: Add option to write IDCDelta to calibration file

* IDCs: fix creation of calib file

* IDCs: Fix start range of for loop for C-Side
  • Loading branch information
matthias-kleiner authored Nov 12, 2022
1 parent 4b9fc94 commit d869925
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 28 deletions.
2 changes: 1 addition & 1 deletion Detectors/TPC/calibration/src/IDCAverageGroup.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ void o2::tpc::IDCAverageGroup<o2::tpc::IDCAverageGroupTPC>::processIDCs(const Ca
const int cruEnd = (mSide == Side::A) ? CRU::MaxCRU / 2 : CRU::MaxCRU;

#pragma omp parallel for num_threads(sNThreads)
for (unsigned int i = 0; i < cruEnd; ++i) {
for (unsigned int i = cruStart; i < cruEnd; ++i) {
const unsigned int threadNum = omp_get_thread_num();
const CRU cru(i);
idcStruct[threadNum].setCRU(cru);
Expand Down
115 changes: 88 additions & 27 deletions Detectors/TPC/workflow/include/TPCWorkflow/TPCFactorizeIDCSpec.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,14 @@

#include <vector>
#include <fmt/format.h>
#include <filesystem>
#include "Framework/Task.h"
#include "Framework/ControlService.h"
#include "Framework/Logger.h"
#include "Framework/DataProcessorSpec.h"
#include "Framework/DeviceSpec.h"
#include "Framework/DataTakingContext.h"
#include "DetectorsCommonDataFormats/FileMetaData.h"
#include "Headers/DataHeader.h"
#include "TPCCalibration/IDCFactorization.h"
#include "TPCCalibration/IDCAverageGroup.h"
Expand Down Expand Up @@ -59,9 +62,19 @@ class TPCFactorizeIDCSpec : public o2::framework::Task
mDumpIDC0 = ic.options().get<bool>("dump-IDC0");
mDumpIDC1 = ic.options().get<bool>("dump-IDC1");
mDumpIDCDelta = ic.options().get<bool>("dump-IDCDelta");
mDumpIDCDeltaCalibData = ic.options().get<bool>("dump-IDCDelta-calib-data");
mDumpIDCs = ic.options().get<bool>("dump-IDCs");
mOffsetCCDB = ic.options().get<bool>("add-offset-for-CCDB-timestamp");
mDisableIDCDelta = ic.options().get<bool>("disable-IDCDelta");
mCalibFileDir = ic.options().get<std::string>("output-dir");
if (mCalibFileDir != "/dev/null") {
mCalibFileDir = o2::utils::Str::rectifyDirectory(mCalibFileDir);
}
mMetaFileDir = ic.options().get<std::string>("meta-output-dir");
if (mMetaFileDir != "/dev/null") {
mMetaFileDir = o2::utils::Str::rectifyDirectory(mMetaFileDir);
}

const std::string refGainMapFile = ic.options().get<std::string>("gainMapFile");
if (!refGainMapFile.empty()) {
LOGP(info, "Loading GainMap from file {}", refGainMapFile);
Expand Down Expand Up @@ -89,6 +102,12 @@ class TPCFactorizeIDCSpec : public o2::framework::Task
LOGP(warning, "firstTF not Found!!! Found valid inputs {}. Setting {} as first TF", pc.inputs().countValidInputs(), mTFFirst);
}

// set data taking context only once
if (mSetDataTakingCont) {
mDataTakingContext = pc.services().get<DataTakingContext>();
mSetDataTakingCont = false;
}

const long relTF = (mTFFirst == -1) ? 0 : (currTF - mTFFirst) / mNTFsBuffer;

// loop over input data
Expand Down Expand Up @@ -164,29 +183,34 @@ class TPCFactorizeIDCSpec : public o2::framework::Task
static constexpr header::DataDescription getDataDescriptionCCDBIDCPadFlag() { return header::DataDescription{"TPC_CalibFlags"}; }

private:
const std::vector<uint32_t> mCRUs{}; ///< CRUs to process in this instance
unsigned int mProcessedCRUs{}; ///< number of processed CRUs to keep track of when the writing to CCDB etc. will be done
IDCFactorization mIDCFactorization; ///< object aggregating the IDCs and performing the factorization of the IDCs
IDCAverageGroup<IDCAverageGroupTPC> mIDCGrouping; ///< object for averaging and grouping of the IDCs
const IDCDeltaCompression mCompressionDeltaIDC{}; ///< compression type for IDC Delta
const bool mUsePrecisetimeStamp{true}; ///< use precise time stamp when writing to CCDB
const bool mSendOutFFT{false}; ///< flag if the output will be send for the FFT
const bool mSendOutCCDB{false}; ///< sending the outputs for ccdb populator
long mTFFirst{-1}; ///< first TF of current aggregation interval
bool mUpdateGroupingPar{true}; ///< flag to set if grouping parameters should be updated or not
const int mLaneId{0}; ///< the id of the current process within the parallel pipeline
std::vector<Side> mSides{}; ///< processed TPC sides
const int mNTFsBuffer{1}; ///< number of TFs for which the IDCs will be buffered
std::unique_ptr<CalDet<PadFlags>> mPadFlagsMap; ///< status flag for each pad (i.e. if the pad is dead). This map is buffered to check if something changed, when a new map is created
int mNOrbitsIDC{12}; ///< Number of orbits over which the IDCs are integrated.
bool mDumpIDC0{false}; ///< Dump IDC0 to file
bool mDumpIDC1{false}; ///< Dump IDC1 to file
bool mDumpIDCDelta{false}; ///< Dump IDCDelta to file
bool mDumpIDCs{false}; ///< dump IDCs to file
bool mOffsetCCDB{false}; ///< flag for setting and offset for CCDB timestamp
bool mDisableIDCDelta{false}; ///< disable the processing and storage of IDCDelta
dataformats::Pair<long, int> mTFInfo{}; ///< orbit reset time for CCDB time stamp writing
bool mEnableWritingPadStatusMap{false}; ///< do not store the pad status map in the CCDB
const std::vector<uint32_t> mCRUs{}; ///< CRUs to process in this instance
unsigned int mProcessedCRUs{}; ///< number of processed CRUs to keep track of when the writing to CCDB etc. will be done
std::string mMetaFileDir{};
std::string mCalibFileDir{};
IDCFactorization mIDCFactorization; ///< object aggregating the IDCs and performing the factorization of the IDCs
IDCAverageGroup<IDCAverageGroupTPC> mIDCGrouping; ///< object for averaging and grouping of the IDCs
const IDCDeltaCompression mCompressionDeltaIDC{}; ///< compression type for IDC Delta
const bool mUsePrecisetimeStamp{true}; ///< use precise time stamp when writing to CCDB
const bool mSendOutFFT{false}; ///< flag if the output will be send for the FFT
const bool mSendOutCCDB{false}; ///< sending the outputs for ccdb populator
long mTFFirst{-1}; ///< first TF of current aggregation interval
bool mUpdateGroupingPar{true}; ///< flag to set if grouping parameters should be updated or not
const int mLaneId{0}; ///< the id of the current process within the parallel pipeline
std::vector<Side> mSides{}; ///< processed TPC sides
const int mNTFsBuffer{1}; ///< number of TFs for which the IDCs will be buffered
std::unique_ptr<CalDet<PadFlags>> mPadFlagsMap; ///< status flag for each pad (i.e. if the pad is dead). This map is buffered to check if something changed, when a new map is created
int mNOrbitsIDC{12}; ///< Number of orbits over which the IDCs are integrated.
bool mDumpIDC0{false}; ///< Dump IDC0 to file
bool mDumpIDC1{false}; ///< Dump IDC1 to file
bool mDumpIDCDelta{false}; ///< Dump IDCDelta to file
bool mDumpIDCDeltaCalibData{false}; ///< dump the IDC Delta as a calibration file
bool mDumpIDCs{false}; ///< dump IDCs to file
bool mOffsetCCDB{false}; ///< flag for setting and offset for CCDB timestamp
bool mDisableIDCDelta{false}; ///< disable the processing and storage of IDCDelta
dataformats::Pair<long, int> mTFInfo{}; ///< orbit reset time for CCDB time stamp writing
bool mEnableWritingPadStatusMap{false}; ///< do not store the pad status map in the CCDB
o2::framework::DataTakingContext mDataTakingContext{};
bool mSetDataTakingCont{true};
const std::vector<InputSpec> mFilter = {{"idcagg", ConcreteDataTypeMatcher{gDataOriginTPC, TPCDistributeIDCSpec::getDataDescriptionIDC(mLaneId)}, Lifetime::Sporadic}}; ///< filter for looping over input data

void sendOutput(DataAllocator& output, const long timeStampStart)
Expand Down Expand Up @@ -286,7 +310,7 @@ class TPCFactorizeIDCSpec : public o2::framework::Task
totalTime += time.count();
}

if (!mDisableIDCDelta) {
if (!mDisableIDCDelta || mDumpIDCDeltaCalibData) {
start = timer::now();
for (unsigned int iChunk = 0; iChunk < mIDCFactorization.getNChunks(side); ++iChunk) {
auto startGrouping = timer::now();
Expand Down Expand Up @@ -325,9 +349,43 @@ class TPCFactorizeIDCSpec : public o2::framework::Task
imageIDCDelta = o2::ccdb::CcdbApi::createObjectImage(&idcDelta, &ccdbInfoIDCDelta);
break;
}
LOGP(info, "Sending object {} / {} of size {} bytes, valid for {} : {} ", ccdbInfoIDCDelta.getPath(), ccdbInfoIDCDelta.getFileName(), imageIDCDelta->size(), ccdbInfoIDCDelta.getStartValidityTimestamp(), ccdbInfoIDCDelta.getEndValidityTimestamp());
output.snapshot(Output{o2::calibration::Utils::gDataOriginCDBPayload, getDataDescriptionCCDBIDCDelta(), iChunk}, *imageIDCDelta.get());
output.snapshot(Output{o2::calibration::Utils::gDataOriginCDBWrapper, getDataDescriptionCCDBIDCDelta(), iChunk}, ccdbInfoIDCDelta);

if (!mDisableIDCDelta) {
LOGP(info, "Sending object {} / {} of size {} bytes, valid for {} : {} ", ccdbInfoIDCDelta.getPath(), ccdbInfoIDCDelta.getFileName(), imageIDCDelta->size(), ccdbInfoIDCDelta.getStartValidityTimestamp(), ccdbInfoIDCDelta.getEndValidityTimestamp());
output.snapshot(Output{o2::calibration::Utils::gDataOriginCDBPayload, getDataDescriptionCCDBIDCDelta(), iChunk}, *imageIDCDelta.get());
output.snapshot(Output{o2::calibration::Utils::gDataOriginCDBWrapper, getDataDescriptionCCDBIDCDelta(), iChunk}, ccdbInfoIDCDelta);
}

if (mDumpIDCDeltaCalibData && mCalibFileDir != "/dev/null") {
const std::string sideStr = sideA ? "A" : "C";
std::string calibFName = fmt::format("IDCDelta_side{}_cal_data_{}.root", sideStr, ccdbInfoIDCDelta.getStartValidityTimestamp());
try {
std::ofstream calFile(fmt::format("{}{}", mCalibFileDir, calibFName), std::ios::out | std::ios::binary);
calFile.write(imageIDCDelta->data(), imageIDCDelta->size());
calFile.close();
} catch (std::exception const& e) {
LOG(error) << "Failed to store IDC calibration data file " << calibFName << ", reason: " << e.what();
}

if (mMetaFileDir != "/dev/null") {
o2::dataformats::FileMetaData calMetaData;
calMetaData.fillFileData(calibFName);
calMetaData.setDataTakingContext(mDataTakingContext);
calMetaData.type = "calib";
calMetaData.priority = "low";
auto metaFileNameTmp = fmt::format("{}{}.tmp", mMetaFileDir, calibFName);
auto metaFileName = fmt::format("{}{}.done", mMetaFileDir, calibFName);
try {
std::ofstream metaFileOut(metaFileNameTmp);
metaFileOut << calMetaData;
metaFileOut.close();
std::filesystem::rename(metaFileNameTmp, metaFileName);
} catch (std::exception const& e) {
LOG(error) << "Failed to store CTF meta data file " << metaFileName << ", reason: " << e.what();
}
}
}

auto stopCCDBIDCDelta = timer::now();
time = stopCCDBIDCDelta - startCCDBIDCDelta;
LOGP(info, "Compression and CCDB object creation time: {}", time.count());
Expand Down Expand Up @@ -408,7 +466,10 @@ DataProcessorSpec getTPCFactorizeIDCSpec(const int lane, const std::vector<uint3
{"dump-IDC1", VariantType::Bool, false, {"Dump IDC1 to file"}},
{"disable-IDCDelta", VariantType::Bool, false, {"Disable processing of IDCDelta and storage in the CCDB"}},
{"dump-IDCDelta", VariantType::Bool, false, {"Dump IDCDelta to file"}},
{"dump-IDCDelta-calib-data", VariantType::Bool, false, {"Dump IDCDelta as calibration data to file"}},
{"add-offset-for-CCDB-timestamp", VariantType::Bool, false, {"Add an offset of 1 hour for the validity range of the CCDB objects"}},
{"output-dir", VariantType::String, "none", {"calibration files output directory, must exist"}},
{"meta-output-dir", VariantType::String, "/dev/null", {"calibration metadata output directory, must exist (if not /dev/null)"}},
{"update-not-grouping-parameter", VariantType::Bool, false, {"Do NOT Update/Writing grouping parameters to CCDB."}}}}; // end DataProcessorSpec
spec.rank = lane;
return spec;
Expand Down

0 comments on commit d869925

Please sign in to comment.