Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix interactions in RDF machinery with the DefinePerSample operation #13787

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion tree/dataframe/src/RLoopManager.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -761,7 +761,6 @@ void RLoopManager::CleanUpNodes()

fCallbacks.clear();
fCallbacksOnce.clear();
fSampleCallbacks.clear();
}

/// Perform clean-up operations. To be called at the end of each task execution.
Expand Down
163 changes: 140 additions & 23 deletions tree/dataframe/test/dataframe_cloning.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include <ROOT/RDF/RDatasetSpec.hxx>
#include <ROOT/RDF/RInterface.hxx> // ChangeEmptyEntryRange, ChangeSpec
#include <ROOT/RDF/RResultMap.hxx> // CloneResultAndAction
#include <RtypesCore.h> // ULong64_t
#include <TSystem.h> // AccessPathName

#include <gtest/gtest.h>
Expand All @@ -28,6 +29,37 @@ void EXPECT_VEC_EQ(const std::vector<T> &v1, const std::vector<T> &v2)
}
}

struct InputFilesRAII {
std::vector<std::string> fFileNames;

InputFilesRAII(const std::string &treeName, const std::vector<std::string> &fileNames, ULong64_t entriesPerFile = 10,
const std::vector<ULong64_t> &beginEntryPerFile = {})
: fFileNames(fileNames)
{
auto realBeginEntries = beginEntryPerFile.empty() ? std::vector<ULong64_t>(fileNames.size()) : beginEntryPerFile;

for (std::size_t i = 0; i < fileNames.size(); i++) {
TFile f(fFileNames[i].c_str(), "recreate");
TTree t(treeName.c_str(), treeName.c_str());
// Always create a new TTree cluster every 10 entries
t.SetAutoFlush(10);
ULong64_t x;
t.Branch("x", &x);
for (ULong64_t j = realBeginEntries[i]; j < (realBeginEntries[i] + entriesPerFile); j++) {
x = j;
t.Fill();
}
t.Write();
}
}

~InputFilesRAII()
{
for (const auto &fileName : fFileNames)
gSystem->Unlink(fileName.c_str());
}
};

TEST(RDataFrameCloning, Count)
{
ROOT::RDataFrame df{100};
Expand Down Expand Up @@ -344,27 +376,27 @@ TEST(RDataFrameCloning, ChangeEmptyEntryRange)
TEST(RDataFrameCloning, ChangeSpec)
{
std::string treeName{"events"};
std::vector<std::string> fileNames{"dataframe_cloning_changespec_0.root", "dataframe_cloning_changespec_1.root",
"dataframe_cloning_changespec_2.root"};
std::size_t nFiles{3};
// Each file has 30 entries, starting from a different value
ULong64_t entriesPerFile{30};
std::vector<ULong64_t> beginEntryPerFile{0, 30, 60};
std::vector<std::string> fileNames(nFiles);
std::string prefix{"dataframe_cloning_changespec_"};
std::generate(fileNames.begin(), fileNames.end(), [n = 0, &prefix]() mutable {
auto name = prefix + std::to_string(n) + ".root";
n++;
return name;
});
InputFilesRAII files{treeName, fileNames, entriesPerFile, beginEntryPerFile};

// The dataset will have a total of 90 entries. We partition it in 6 different global ranges.
// Schema: one range per complete file, one range with a portion of a single file,
// two ranges that span more than one file.
std::vector<std::pair<Long64_t, Long64_t>> globalRanges{{0, 30}, {30, 60}, {60, 90}, {0, 20}, {20, 50}, {50, 90}};
{
ROOT::RDF::RSnapshotOptions opts;
opts.fAutoFlush = 10;

auto df = ROOT::RDataFrame(90).Define("x", [](ULong64_t e) { return e; }, {"rdfentry_"});
for (unsigned i = 0; i < 3; i++) {
// This makes sure that each output file has a different range of values for column x,
// according to the current global entry of the dataset.
ChangeEmptyEntryRange(df, std::pair<Long64_t, Long64_t>(globalRanges[i]));
df.Snapshot<ULong64_t>(treeName, fileNames[i], {"x"}, opts);
}
}

std::vector<ROOT::RDF::Experimental::RDatasetSpec> specs;
specs.reserve(6);
for (unsigned i = 0; i < 6; i++) {
specs.reserve(globalRanges.size());
for (unsigned i = 0; i < globalRanges.size(); i++) {
ROOT::RDF::Experimental::RDatasetSpec spec;
// Every spec represents a different portion of the global dataset
spec.AddSample({"", treeName, fileNames});
Expand All @@ -378,8 +410,8 @@ TEST(RDataFrameCloning, ChangeSpec)
// every partition and checking that they correspond to the values
// in the ranges defined by globalRanges.
std::vector<std::vector<ULong64_t>> expectedOutputs;
expectedOutputs.reserve(6);
for (unsigned i = 0; i < 6; i++) {
expectedOutputs.reserve(globalRanges.size());
for (unsigned i = 0; i < globalRanges.size(); i++) {
const auto &currentRange = globalRanges[i];
auto nValues{currentRange.second - currentRange.first};
std::vector<ULong64_t> takeValues(nValues);
Expand All @@ -393,15 +425,11 @@ TEST(RDataFrameCloning, ChangeSpec)
EXPECT_VEC_EQ(*take, expectedOutputs[0]);

// Other executions modify the internal spec
for (unsigned i = 1; i < 6; i++) {
for (unsigned i = 1; i < globalRanges.size(); i++) {
ChangeSpec(df, std::move(specs[i]));
auto clone = CloneResultAndAction(take);
EXPECT_VEC_EQ(*clone, expectedOutputs[i]);
}

for (const auto &name : fileNames) {
gSystem->Unlink(name.c_str());
}
}

ROOT::RDF::Experimental::RResultMap<TH1D> dataframe_cloning_vary_with_filters_analysis(ROOT::RDF::RNode df)
Expand Down Expand Up @@ -484,3 +512,92 @@ TEST(RDataFrameCloning, VaryWithFilters)
EXPECT_EQ(histo.GetEntries(), clone.GetEntries());
}
}

TEST(RDataFrameCloning, DefinePerSample)
{
std::string treeName{"events"};
std::size_t nFiles{3};
std::vector<std::string> fileNames(nFiles);
std::string prefix{"dataframe_cloning_definepersample_"};
std::generate(fileNames.begin(), fileNames.end(), [n = 0, &prefix]() mutable {
auto name = prefix + std::to_string(n) + ".root";
n++;
return name;
});
InputFilesRAII files{treeName, fileNames};
std::vector<double> weights(nFiles);
std::iota(weights.begin(), weights.end(), 1.);

// The first specification takes the first two files and reads them both from beginning to end.
// The second specification takes the second and third file, but only reads the third one.
// This simulates two tasks that might be created when logically splitting the input dataset.
std::vector<std::pair<Long64_t, Long64_t>> globalRanges{{0, 20}, {10, 20}};
std::vector<std::vector<std::string>> taskFileNames{
{"dataframe_cloning_definepersample_0.root", "dataframe_cloning_definepersample_1.root"},
{"dataframe_cloning_definepersample_1.root", "dataframe_cloning_definepersample_2.root"}};
std::vector<ROOT::RDF::Experimental::RDatasetSpec> specs;
specs.reserve(2);
for (unsigned i = 0; i < 2; i++) {
ROOT::RDF::Experimental::RDatasetSpec spec;
spec.AddSample({"", treeName, taskFileNames[i]});
spec.WithGlobalRange({globalRanges[i].first, globalRanges[i].second});
specs.push_back(spec);
}

// Launch first execution with dataset spec
ROOT::RDataFrame df{specs[0]};
auto dfWithCols =
df.DefinePerSample("sample_weight",
[&weights, &fileNames](unsigned int, const ROOT::RDF::RSampleInfo &id) {
if (id.Contains(fileNames[0])) {
return weights[0];
} else if (id.Contains(fileNames[1])) {
return weights[1];
} else if (id.Contains(fileNames[2])) {
return weights[2];
} else {
return -999.;
}
})
.DefinePerSample("sample_name", [](unsigned int, const ROOT::RDF::RSampleInfo &id) { return id.AsString(); });

// One filter per each different combination of weight and sample name
// Counting the entries passing each filter should return exactly the entries
// of the corresponding file, i.e. 10.
auto c0 = dfWithCols
.Filter(
[&weights, &fileNames, &treeName](double weight, const std::string &name) {
return weight == weights[0] && name == (fileNames[0] + "/" + treeName);
},
{"sample_weight", "sample_name"})
.Count();
auto c1 = dfWithCols
.Filter(
[&weights, &fileNames, &treeName](double weight, const std::string &name) {
return weight == weights[1] && name == (fileNames[1] + "/" + treeName);
},
{"sample_weight", "sample_name"})
.Count();
auto c2 = dfWithCols
.Filter(
[&weights, &fileNames, &treeName](double weight, const std::string &name) {
return weight == weights[2] && name == (fileNames[2] + "/" + treeName);
},
{"sample_weight", "sample_name"})
.Count();

std::vector<ULong64_t> expectedFirstTask{10, 10, 0};
EXPECT_EQ(*c0, expectedFirstTask[0]);
EXPECT_EQ(*c1, expectedFirstTask[1]);
EXPECT_EQ(*c2, expectedFirstTask[2]);

// Assign the other specification and clone actions
ChangeSpec(df, std::move(specs[1]));
auto c3 = CloneResultAndAction(c0);
auto c4 = CloneResultAndAction(c1);
auto c5 = CloneResultAndAction(c2);
std::vector<ULong64_t> expectedSecondTask{0, 0, 10};
EXPECT_EQ(*c3, expectedSecondTask[0]);
EXPECT_EQ(*c4, expectedSecondTask[1]);
EXPECT_EQ(*c5, expectedSecondTask[2]);
}
18 changes: 18 additions & 0 deletions tree/dataframe/test/dataframe_definepersample.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,24 @@ TEST(DefinePerSampleMore, GetDefinedColumnNames)
EXPECT_EQ(df.GetDefinedColumnNames(), std::vector<std::string>{"x"});
}

// Regression test for https://github.com/root-project/root/issues/12043
TEST(DefinePerSample, TwoExecutions)
{
bool flag = false;
auto df = ROOT::RDataFrame(1).DefinePerSample("x", [&flag](unsigned int, const ROOT::RDF::RSampleInfo &) {
flag = true;
return 0;
});
// Trigger the first execution of the event loop, the flag should be true.
df.Count().GetValue();
EXPECT_TRUE(flag);
// Reset the flag and trigger again, flag should be again set to true after
// the end of the second event loop.
flag = false;
df.Count().GetValue();
EXPECT_TRUE(flag);
}

/* TODO
// Not supported yet
TEST(DefinePerSample, DataSource)
Expand Down