Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IDCs: Add option to write IDCDelta to calibration file #10260

Merged
merged 3 commits into from
Nov 12, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Detectors/TPC/calibration/src/IDCAverageGroup.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ void o2::tpc::IDCAverageGroup<o2::tpc::IDCAverageGroupTPC>::processIDCs(const Ca
const int cruEnd = (mSide == Side::A) ? CRU::MaxCRU / 2 : CRU::MaxCRU;

#pragma omp parallel for num_threads(sNThreads)
for (unsigned int i = 0; i < cruEnd; ++i) {
for (unsigned int i = cruStart; i < cruEnd; ++i) {
const unsigned int threadNum = omp_get_thread_num();
const CRU cru(i);
idcStruct[threadNum].setCRU(cru);
Expand Down
115 changes: 88 additions & 27 deletions Detectors/TPC/workflow/include/TPCWorkflow/TPCFactorizeIDCSpec.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,14 @@

#include <vector>
#include <fmt/format.h>
#include <filesystem>
#include "Framework/Task.h"
#include "Framework/ControlService.h"
#include "Framework/Logger.h"
#include "Framework/DataProcessorSpec.h"
#include "Framework/DeviceSpec.h"
#include "Framework/DataTakingContext.h"
#include "DetectorsCommonDataFormats/FileMetaData.h"
#include "Headers/DataHeader.h"
#include "TPCCalibration/IDCFactorization.h"
#include "TPCCalibration/IDCAverageGroup.h"
Expand Down Expand Up @@ -59,9 +62,19 @@ class TPCFactorizeIDCSpec : public o2::framework::Task
mDumpIDC0 = ic.options().get<bool>("dump-IDC0");
mDumpIDC1 = ic.options().get<bool>("dump-IDC1");
mDumpIDCDelta = ic.options().get<bool>("dump-IDCDelta");
mDumpIDCDeltaCalibData = ic.options().get<bool>("dump-IDCDelta-calib-data");
mDumpIDCs = ic.options().get<bool>("dump-IDCs");
mOffsetCCDB = ic.options().get<bool>("add-offset-for-CCDB-timestamp");
mDisableIDCDelta = ic.options().get<bool>("disable-IDCDelta");
mCalibFileDir = ic.options().get<std::string>("output-dir");
if (mCalibFileDir != "/dev/null") {
mCalibFileDir = o2::utils::Str::rectifyDirectory(mCalibFileDir);
}
mMetaFileDir = ic.options().get<std::string>("meta-output-dir");
if (mMetaFileDir != "/dev/null") {
mMetaFileDir = o2::utils::Str::rectifyDirectory(mMetaFileDir);
}

const std::string refGainMapFile = ic.options().get<std::string>("gainMapFile");
if (!refGainMapFile.empty()) {
LOGP(info, "Loading GainMap from file {}", refGainMapFile);
Expand Down Expand Up @@ -89,6 +102,12 @@ class TPCFactorizeIDCSpec : public o2::framework::Task
LOGP(warning, "firstTF not Found!!! Found valid inputs {}. Setting {} as first TF", pc.inputs().countValidInputs(), mTFFirst);
}

// set data taking context only once
if (mSetDataTakingCont) {
mDataTakingContext = pc.services().get<DataTakingContext>();
mSetDataTakingCont = false;
}

const long relTF = (mTFFirst == -1) ? 0 : (currTF - mTFFirst) / mNTFsBuffer;

// loop over input data
Expand Down Expand Up @@ -164,29 +183,34 @@ class TPCFactorizeIDCSpec : public o2::framework::Task
static constexpr header::DataDescription getDataDescriptionCCDBIDCPadFlag() { return header::DataDescription{"TPC_CalibFlags"}; }

private:
const std::vector<uint32_t> mCRUs{}; ///< CRUs to process in this instance
unsigned int mProcessedCRUs{}; ///< number of processed CRUs to keep track of when the writing to CCDB etc. will be done
IDCFactorization mIDCFactorization; ///< object aggregating the IDCs and performing the factorization of the IDCs
IDCAverageGroup<IDCAverageGroupTPC> mIDCGrouping; ///< object for averaging and grouping of the IDCs
const IDCDeltaCompression mCompressionDeltaIDC{}; ///< compression type for IDC Delta
const bool mUsePrecisetimeStamp{true}; ///< use precise time stamp when writing to CCDB
const bool mSendOutFFT{false}; ///< flag if the output will be send for the FFT
const bool mSendOutCCDB{false}; ///< sending the outputs for ccdb populator
long mTFFirst{-1}; ///< first TF of current aggregation interval
bool mUpdateGroupingPar{true}; ///< flag to set if grouping parameters should be updated or not
const int mLaneId{0}; ///< the id of the current process within the parallel pipeline
std::vector<Side> mSides{}; ///< processed TPC sides
const int mNTFsBuffer{1}; ///< number of TFs for which the IDCs will be buffered
std::unique_ptr<CalDet<PadFlags>> mPadFlagsMap; ///< status flag for each pad (i.e. if the pad is dead). This map is buffered to check if something changed, when a new map is created
int mNOrbitsIDC{12}; ///< Number of orbits over which the IDCs are integrated.
bool mDumpIDC0{false}; ///< Dump IDC0 to file
bool mDumpIDC1{false}; ///< Dump IDC1 to file
bool mDumpIDCDelta{false}; ///< Dump IDCDelta to file
bool mDumpIDCs{false}; ///< dump IDCs to file
bool mOffsetCCDB{false}; ///< flag for setting and offset for CCDB timestamp
bool mDisableIDCDelta{false}; ///< disable the processing and storage of IDCDelta
dataformats::Pair<long, int> mTFInfo{}; ///< orbit reset time for CCDB time stamp writing
bool mEnableWritingPadStatusMap{false}; ///< do not store the pad status map in the CCDB
const std::vector<uint32_t> mCRUs{}; ///< CRUs to process in this instance
unsigned int mProcessedCRUs{}; ///< number of processed CRUs to keep track of when the writing to CCDB etc. will be done
std::string mMetaFileDir{};
std::string mCalibFileDir{};
IDCFactorization mIDCFactorization; ///< object aggregating the IDCs and performing the factorization of the IDCs
IDCAverageGroup<IDCAverageGroupTPC> mIDCGrouping; ///< object for averaging and grouping of the IDCs
const IDCDeltaCompression mCompressionDeltaIDC{}; ///< compression type for IDC Delta
const bool mUsePrecisetimeStamp{true}; ///< use precise time stamp when writing to CCDB
const bool mSendOutFFT{false}; ///< flag if the output will be send for the FFT
const bool mSendOutCCDB{false}; ///< sending the outputs for ccdb populator
long mTFFirst{-1}; ///< first TF of current aggregation interval
bool mUpdateGroupingPar{true}; ///< flag to set if grouping parameters should be updated or not
const int mLaneId{0}; ///< the id of the current process within the parallel pipeline
std::vector<Side> mSides{}; ///< processed TPC sides
const int mNTFsBuffer{1}; ///< number of TFs for which the IDCs will be buffered
std::unique_ptr<CalDet<PadFlags>> mPadFlagsMap; ///< status flag for each pad (i.e. if the pad is dead). This map is buffered to check if something changed, when a new map is created
int mNOrbitsIDC{12}; ///< Number of orbits over which the IDCs are integrated.
bool mDumpIDC0{false}; ///< Dump IDC0 to file
bool mDumpIDC1{false}; ///< Dump IDC1 to file
bool mDumpIDCDelta{false}; ///< Dump IDCDelta to file
bool mDumpIDCDeltaCalibData{false}; ///< dump the IDC Delta as a calibration file
bool mDumpIDCs{false}; ///< dump IDCs to file
bool mOffsetCCDB{false}; ///< flag for setting and offset for CCDB timestamp
bool mDisableIDCDelta{false}; ///< disable the processing and storage of IDCDelta
dataformats::Pair<long, int> mTFInfo{}; ///< orbit reset time for CCDB time stamp writing
bool mEnableWritingPadStatusMap{false}; ///< do not store the pad status map in the CCDB
o2::framework::DataTakingContext mDataTakingContext{};
bool mSetDataTakingCont{true};
const std::vector<InputSpec> mFilter = {{"idcagg", ConcreteDataTypeMatcher{gDataOriginTPC, TPCDistributeIDCSpec::getDataDescriptionIDC(mLaneId)}, Lifetime::Sporadic}}; ///< filter for looping over input data

void sendOutput(DataAllocator& output, const long timeStampStart)
Expand Down Expand Up @@ -286,7 +310,7 @@ class TPCFactorizeIDCSpec : public o2::framework::Task
totalTime += time.count();
}

if (!mDisableIDCDelta) {
if (!mDisableIDCDelta || mDumpIDCDeltaCalibData) {
start = timer::now();
for (unsigned int iChunk = 0; iChunk < mIDCFactorization.getNChunks(side); ++iChunk) {
auto startGrouping = timer::now();
Expand Down Expand Up @@ -325,9 +349,43 @@ class TPCFactorizeIDCSpec : public o2::framework::Task
imageIDCDelta = o2::ccdb::CcdbApi::createObjectImage(&idcDelta, &ccdbInfoIDCDelta);
break;
}
LOGP(info, "Sending object {} / {} of size {} bytes, valid for {} : {} ", ccdbInfoIDCDelta.getPath(), ccdbInfoIDCDelta.getFileName(), imageIDCDelta->size(), ccdbInfoIDCDelta.getStartValidityTimestamp(), ccdbInfoIDCDelta.getEndValidityTimestamp());
output.snapshot(Output{o2::calibration::Utils::gDataOriginCDBPayload, getDataDescriptionCCDBIDCDelta(), iChunk}, *imageIDCDelta.get());
output.snapshot(Output{o2::calibration::Utils::gDataOriginCDBWrapper, getDataDescriptionCCDBIDCDelta(), iChunk}, ccdbInfoIDCDelta);

if (!mDisableIDCDelta) {
LOGP(info, "Sending object {} / {} of size {} bytes, valid for {} : {} ", ccdbInfoIDCDelta.getPath(), ccdbInfoIDCDelta.getFileName(), imageIDCDelta->size(), ccdbInfoIDCDelta.getStartValidityTimestamp(), ccdbInfoIDCDelta.getEndValidityTimestamp());
output.snapshot(Output{o2::calibration::Utils::gDataOriginCDBPayload, getDataDescriptionCCDBIDCDelta(), iChunk}, *imageIDCDelta.get());
output.snapshot(Output{o2::calibration::Utils::gDataOriginCDBWrapper, getDataDescriptionCCDBIDCDelta(), iChunk}, ccdbInfoIDCDelta);
}

if (mDumpIDCDeltaCalibData && mCalibFileDir != "/dev/null") {
const std::string sideStr = sideA ? "A" : "C";
std::string calibFName = fmt::format("IDCDelta_side{}_cal_data_{}.root", sideStr, ccdbInfoIDCDelta.getStartValidityTimestamp());
try {
std::ofstream calFile(fmt::format("{}{}", mCalibFileDir, calibFName), std::ios::out | std::ios::binary);
calFile.write(imageIDCDelta->data(), imageIDCDelta->size());
calFile.close();
} catch (std::exception const& e) {
LOG(error) << "Failed to store IDC calibration data file " << calibFName << ", reason: " << e.what();
}

if (mMetaFileDir != "/dev/null") {
o2::dataformats::FileMetaData calMetaData;
calMetaData.fillFileData(calibFName);
calMetaData.setDataTakingContext(mDataTakingContext);
calMetaData.type = "calib";
calMetaData.priority = "low";
auto metaFileNameTmp = fmt::format("{}{}.tmp", mMetaFileDir, calibFName);
auto metaFileName = fmt::format("{}{}.done", mMetaFileDir, calibFName);
try {
std::ofstream metaFileOut(metaFileNameTmp);
metaFileOut << calMetaData;
metaFileOut.close();
std::filesystem::rename(metaFileNameTmp, metaFileName);
} catch (std::exception const& e) {
LOG(error) << "Failed to store CTF meta data file " << metaFileName << ", reason: " << e.what();
}
}
}

auto stopCCDBIDCDelta = timer::now();
time = stopCCDBIDCDelta - startCCDBIDCDelta;
LOGP(info, "Compression and CCDB object creation time: {}", time.count());
Expand Down Expand Up @@ -408,7 +466,10 @@ DataProcessorSpec getTPCFactorizeIDCSpec(const int lane, const std::vector<uint3
{"dump-IDC1", VariantType::Bool, false, {"Dump IDC1 to file"}},
{"disable-IDCDelta", VariantType::Bool, false, {"Disable processing of IDCDelta and storage in the CCDB"}},
{"dump-IDCDelta", VariantType::Bool, false, {"Dump IDCDelta to file"}},
{"dump-IDCDelta-calib-data", VariantType::Bool, false, {"Dump IDCDelta as calibration data to file"}},
{"add-offset-for-CCDB-timestamp", VariantType::Bool, false, {"Add an offset of 1 hour for the validity range of the CCDB objects"}},
{"output-dir", VariantType::String, "none", {"calibration files output directory, must exist"}},
{"meta-output-dir", VariantType::String, "/dev/null", {"calibration metadata output directory, must exist (if not /dev/null)"}},
{"update-not-grouping-parameter", VariantType::Bool, false, {"Do NOT Update/Writing grouping parameters to CCDB."}}}}; // end DataProcessorSpec
spec.rank = lane;
return spec;
Expand Down