Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DPL Analysis: add possibility to set maximum size of output files #10232

Merged
merged 1 commit into from
Nov 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions Framework/Core/include/Framework/DataOutputDirector.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,13 +90,20 @@ struct DataOutputDirector {
// get the matching TFile
FileAndFolder getFileFolder(DataOutputDescriptor* dodesc, uint64_t folderNumber, std::string parentFileName);

// check file sizes
bool checkFileSizes();
// close all files
void closeDataFiles();

// setters
void setResultDir(std::string resDir);
void setFilenameBase(std::string dfn);
void setMaximumFileSize(float maxfs);

void printOut();

private:
std::string mresultDirectory{"."};
std::string mfilenameBase;
std::string* const mfilenameBasePtr = &mfilenameBase;
std::vector<DataOutputDescriptor*> mDataOutputDescriptors;
Expand All @@ -105,6 +112,8 @@ struct DataOutputDirector {
std::vector<TFile*> mfilePtrs;
std::vector<TMap*> mParentMaps;
bool mdebugmode = false;
int mfileCounter = 1;
float mmaxfilesize = -1.;
int mnumberTimeFramesToMerge = 1;
std::string mfileMode = "RECREATE";

Expand Down
3 changes: 3 additions & 0 deletions Framework/Core/src/CommonDataProcessors.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,9 @@ DataProcessorSpec
tfFilenames.insert(std::pair<uint64_t, std::string>(startTime, aodInputFile));
}

// close all output files if one has reached size limit
dod->checkFileSizes();

// loop over the DataRefs which are contained in pc.inputs()
for (const auto& ref : pc.inputs()) {
if (!ref.spec) {
Expand Down
100 changes: 99 additions & 1 deletion Framework/Core/src/DataOutputDirector.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,17 @@
#include "Framework/DataOutputDirector.h"
#include "Framework/Logger.h"

#include <filesystem>

#include "rapidjson/document.h"
#include "rapidjson/prettywriter.h"
#include "rapidjson/filereadstream.h"

#include "TMap.h"
#include "TObjString.h"

namespace fs = std::filesystem;

namespace o2
{
namespace framework
Expand Down Expand Up @@ -150,6 +154,7 @@ void DataOutputDirector::reset()
closeDataFiles();
mfilePtrs.clear();
mfilenameBase = std::string("");
mfileCounter = 1;
};

void DataOutputDirector::readString(std::string const& keepString)
Expand Down Expand Up @@ -260,8 +265,10 @@ std::tuple<std::string, std::string, int> DataOutputDirector::readJsonDocument(D
const char* itemName;

// initialisations
std::string resdir(".");
std::string dfn("");
std::string fmode("");
float maxfs = -1.;
int ntfm = -1;

// is it a proper json document?
Expand Down Expand Up @@ -298,6 +305,17 @@ std::tuple<std::string, std::string, int> DataOutputDirector::readJsonDocument(D
dodirItem.Accept(writer);
}

itemName = "resdir";
if (dodirItem.HasMember(itemName)) {
if (dodirItem[itemName].IsString()) {
resdir = dodirItem[itemName].GetString();
setResultDir(resdir);
} else {
LOGP(error, "Check the JSON document! Item \"{}\" must be a string!", itemName);
return memptyanswer;
}
}

itemName = "resfile";
if (dodirItem.HasMember(itemName)) {
if (dodirItem[itemName].IsString()) {
Expand All @@ -320,6 +338,17 @@ std::tuple<std::string, std::string, int> DataOutputDirector::readJsonDocument(D
}
}

itemName = "maxfilesize";
if (dodirItem.HasMember(itemName)) {
if (dodirItem[itemName].IsNumber()) {
maxfs = dodirItem[itemName].GetFloat();
setMaximumFileSize(maxfs);
} else {
LOGP(error, "Check the JSON document! Item \"{}\" must be a number!", itemName);
return memptyanswer;
}
}

itemName = "ntfmerge";
if (dodirItem.HasMember(itemName)) {
if (dodirItem[itemName].IsNumber()) {
Expand Down Expand Up @@ -444,9 +473,30 @@ FileAndFolder DataOutputDirector::getFileFolder(DataOutputDescriptor* dodesc, ui
auto it = std::find(mfilenameBases.begin(), mfilenameBases.end(), dodesc->getFilenameBase());
if (it != mfilenameBases.end()) {
int ind = std::distance(mfilenameBases.begin(), it);

// open new output file
if (!mfilePtrs[ind]->IsOpen()) {
auto fn = mfilenameBases[ind] + ".root";
// output directory
auto resdirname = mresultDirectory;
// is the maximum-file-size check enabled?
if (mmaxfilesize > 0.) {
// subdirectory ./xxx
char chcnt[4];
std::snprintf(chcnt, sizeof(chcnt), "%03d", mfileCounter);
resdirname += "/" + std::string(chcnt);
}
auto resdir = fs::path{resdirname.c_str()};

if (!fs::is_directory(resdir)) {
if (!fs::create_directories(resdir)) {
LOGF(fatal, "Could not create output directory %s", resdirname.c_str());
}
}

// complete file name
auto fn = resdirname + "/" + mfilenameBases[ind] + ".root";
delete mfilePtrs[ind];
mParentMaps[ind]->Clear();
mfilePtrs[ind] = TFile::Open(fn.c_str(), mfileMode.c_str(), "", 501);
}
fileAndFolder.file = mfilePtrs[ind];
Expand All @@ -467,6 +517,44 @@ FileAndFolder DataOutputDirector::getFileFolder(DataOutputDescriptor* dodesc, ui
return fileAndFolder;
}

bool DataOutputDirector::checkFileSizes()
{
// is the maximum-file-size check enabled?
if (mmaxfilesize <= 0.) {
return true;
}

// current result directory
char chcnt[4];
std::snprintf(chcnt, sizeof(chcnt), "%03d", mfileCounter);
std::string strcnt{chcnt};
auto resdirname = mresultDirectory + "/" + strcnt;

// loop over all files
// if one file is large, then all files need to be closed
for (int i = 0; i < mfilenameBases.size(); i++) {
if (!mfilePtrs[i]) {
continue;
}
// size of fn
auto fn = resdirname + "/" + mfilenameBases[i] + ".root";
auto resfile = fs::path{fn.c_str()};
if (!fs::exists(resfile)) {
continue;
}
auto fsize = (float)fs::file_size(resfile) / 1.E6; // MBytes
LOGF(debug, "File %s: %f MBytes", fn.c_str(), fsize);
if (fsize >= mmaxfilesize) {
closeDataFiles();
// increment the subdirectory counter
mfileCounter++;
return false;
}
}

return true;
}

void DataOutputDirector::closeDataFiles()
{
for (int i = 0; i < mfilePtrs.size(); i++) {
Expand Down Expand Up @@ -498,6 +586,11 @@ void DataOutputDirector::printOut()
}
}

void DataOutputDirector::setResultDir(std::string resDir)
{
mresultDirectory = resDir;
}

void DataOutputDirector::setFilenameBase(std::string dfn)
{
// reset
Expand Down Expand Up @@ -534,5 +627,10 @@ void DataOutputDirector::setFilenameBase(std::string dfn)
}
}

void DataOutputDirector::setMaximumFileSize(float maxfs)
{
mmaxfilesize = maxfs;
}

} // namespace framework
} // namespace o2