
Wrong interaction of DefinePerSample with multiple executions #12043

Closed
vepadulano opened this issue Jan 17, 2023 · 1 comment · Fixed by #13787

@vepadulano (Member) commented Jan 17, 2023

From this simple reproducer:

#include <TCanvas.h>
#include <TFile.h>
#include <TTree.h>

#include <ROOT/RDataFrame.hxx>
#include <iostream>
#include <string>
#include <vector>

void generateData(const char *filename, int n, double value)
{
   TFile f{filename, "RECREATE", "file for testing"};

   double var1;

   TTree tree{"AnalysisTree", "AnalysisTree"};
   tree.Branch("column1", &var1, "column1/D");

   for (int i = 0; i < n; i++) {
      var1 = value;
      tree.Fill();
   }

   tree.Write();
   f.Write();
   f.Close();
}

int main()
{
   std::vector<std::string> fileNames{"test1.root", "test2.root"};
   std::vector<double> weights{2, 0.5};

   // Create two files with trees for testing
   // 10 entries of 0.5
   generateData("test1.root", 10, 0.5);
   // 10 entries of 2
   generateData("test2.root", 10, 2);

   // Create Dataframe from files
   ROOT::RDataFrame df("AnalysisTree", fileNames);

   // Define weights depending on input file
   auto df2 =
      df.DefinePerSample("weightbysample", [&fileNames, &weights](unsigned int, const ROOT::RDF::RSampleInfo &id) {
         for (unsigned int i = 0; i < fileNames.size(); i++)
            if (id.Contains(fileNames[i]))
               return weights[i];
         return -1.;
      });

   auto s0 = df2.Sum<double>("weightbysample");
   auto d0 = df2.Display({"weightbysample"}, 20);
   std::cout << "sum of weights: " << *s0 << "\n";
   d0->Print();

   // Second event loop on the same graph: this is where the wrong values appear
   auto s1 = df2.Sum<double>("weightbysample");
   auto d1 = df2.Display({"weightbysample"}, 20);
   std::cout << "sum of weights: " << *s1 << "\n";
   d1->Print();

}

The DefinePerSample operation defines a column of 20 entries: the first 10 should have value 2 and the following 10 should have value 0.5. The first set of Sum and Display operations shows the correct behaviour, but the second set, run on the same graph, reports a wrong result: all 20 entries of the column are 0.5, so the sum drops from 25 to 10:

sum of weights: 25
+-----+----------------+
| Row | weightbysample | 
+-----+----------------+
| 0   | 2.0000000      | 
+-----+----------------+
| 1   | 2.0000000      | 
+-----+----------------+
| 2   | 2.0000000      | 
+-----+----------------+
| 3   | 2.0000000      | 
+-----+----------------+
| 4   | 2.0000000      | 
+-----+----------------+
| 5   | 2.0000000      | 
+-----+----------------+
| 6   | 2.0000000      | 
+-----+----------------+
| 7   | 2.0000000      | 
+-----+----------------+
| 8   | 2.0000000      | 
+-----+----------------+
| 9   | 2.0000000      | 
+-----+----------------+
| 10  | 0.50000000     | 
+-----+----------------+
| 11  | 0.50000000     | 
+-----+----------------+
| 12  | 0.50000000     | 
+-----+----------------+
| 13  | 0.50000000     | 
+-----+----------------+
| 14  | 0.50000000     | 
+-----+----------------+
| 15  | 0.50000000     | 
+-----+----------------+
| 16  | 0.50000000     | 
+-----+----------------+
| 17  | 0.50000000     | 
+-----+----------------+
| 18  | 0.50000000     | 
+-----+----------------+
| 19  | 0.50000000     | 
+-----+----------------+
sum of weights: 10
+-----+----------------+
| Row | weightbysample | 
+-----+----------------+
| 0   | 0.50000000     | 
+-----+----------------+
| 1   | 0.50000000     | 
+-----+----------------+
| 2   | 0.50000000     | 
+-----+----------------+
| 3   | 0.50000000     | 
+-----+----------------+
| 4   | 0.50000000     | 
+-----+----------------+
| 5   | 0.50000000     | 
+-----+----------------+
| 6   | 0.50000000     | 
+-----+----------------+
| 7   | 0.50000000     | 
+-----+----------------+
| 8   | 0.50000000     | 
+-----+----------------+
| 9   | 0.50000000     | 
+-----+----------------+
| 10  | 0.50000000     | 
+-----+----------------+
| 11  | 0.50000000     | 
+-----+----------------+
| 12  | 0.50000000     | 
+-----+----------------+
| 13  | 0.50000000     | 
+-----+----------------+
| 14  | 0.50000000     | 
+-----+----------------+
| 15  | 0.50000000     | 
+-----+----------------+
| 16  | 0.50000000     | 
+-----+----------------+
| 17  | 0.50000000     | 
+-----+----------------+
| 18  | 0.50000000     | 
+-----+----------------+
| 19  | 0.50000000     | 
+-----+----------------+
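As a plain C++ cross-check of the numbers above (no ROOT dependency; the function names are hypothetical), the expected sum is 10 × 2 + 10 × 0.5 = 25, while a column stuck at the last sample's weight gives 20 × 0.5 = 10. The lookup uses a substring search to loosely mimic `RSampleInfo::Contains`, assuming the sample identifier contains the file name.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Weight for an entry, chosen by which file name the sample identifier contains
// (mirrors the lambda passed to DefinePerSample in the reproducer).
double weightForFile(const std::string &sampleId, const std::vector<std::string> &fileNames,
                     const std::vector<double> &weights)
{
   assert(fileNames.size() == weights.size());
   for (std::size_t i = 0; i < fileNames.size(); ++i)
      if (sampleId.find(fileNames[i]) != std::string::npos)
         return weights[i];
   return -1.;
}

// Correct sum over the column: each file contributes entriesPerFile * its weight.
double expectedSum(const std::vector<std::string> &fileNames, const std::vector<double> &weights,
                   int entriesPerFile)
{
   double sum = 0.;
   for (const auto &file : fileNames)
      sum += entriesPerFile * weightForFile(file, fileNames, weights);
   return sum;
}

// Sum observed when every entry carries the weight of the last sample,
// as in the second event loop of the reproducer.
double staleSum(const std::vector<double> &weights, int entriesPerFile)
{
   return static_cast<double>(weights.size()) * entriesPerFile * weights.back();
}
```

For example, `expectedSum({"test1.root", "test2.root"}, {2, 0.5}, 10)` gives 25 and `staleSum({2, 0.5}, 10)` gives 10, matching the two printed sums.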
vepadulano added the bug label Jan 17, 2023
vepadulano self-assigned this Jan 17, 2023
vepadulano added a commit to vepadulano/root that referenced this issue Oct 3, 2023
vepadulano added this to the 6.30/00 milestone Oct 3, 2023
@vepadulano (Member, Author) commented:

A related problem, surprisingly, appeared sporadically in the Jenkins CI builds after the patch that avoids re-jitting distributed RDataFrame tasks. See for example https://lcgapp-services.cern.ch/root-jenkins/job/root-pullrequests-build/186294/testReport/projectroot.roottest.python.distrdf/dask/roottest_python_distrdf_dask_test_all/
Here is a copy of the failure, in case the CI log gets deleted:

=================================== FAILURES ===================================
_______________ TestDefinePerSample.test_definepersample_simple ________________

self = <check_definepersample.TestDefinePerSample object at 0x139017700>
connection = <Client: 'tcp://127.0.0.1:58532' processes=2 threads=2, memory=4.00 GiB>

    def test_definepersample_simple(self, connection):
        """
        Test DefinePerSample operation on three samples using a predefined
        string of operations.
        """
    
        df = Dask.RDataFrame(self.maintreename, self.filenames, daskclient=connection)
    
        # Associate a number to each sample
        definepersample_code = """
        if(rdfsampleinfo_.Contains(\"{}\")) return 1;
        else if (rdfsampleinfo_.Contains(\"{}\")) return 2;
        else if (rdfsampleinfo_.Contains(\"{}\")) return 3;
        else return 0;
        """.format(*self.samples)
    
        df1 = df.DefinePerSample("sampleid", definepersample_code)
    
        # Filter by the sample number. Each filtered dataframe should contain
        # 10 entries, equal to the number of entries per sample
        samplescounts = [df1.Filter("sampleid == {}".format(id)).Count() for id in [1, 2, 3]]
    
        for count in samplescounts:
>           assert count.GetValue() == 10
E           AssertionError

../../../../../roottest/python/distrdf/dask/check_definepersample.py:62: AssertionError
---------------------------- Captured stderr setup -----------------------------
RDataFrame::Run: event loop was interrupted
2023-09-30 20:12:08,054 - distributed.worker - WARNING - Compute Failed
Key:       dask_mapper-2d1d1d8c-3a72-43e4-9753-d94b58f79b62
Function:  execute_task
args:      ((<function DaskBackend.dask_mapper at 0x13277bb80>, EmptySourceRange(exec_id=ExecutionIdentifier(rdf_uuid=UUID('3fb6f445-a73d-47db-9f12-af184ca535cd'), graph_uuid=UUID('3edfdf66-5f8c-428b-8862-6e21ac68f9b5')), id=0, start=0, end=50), (<class 'set'>, []), (<class 'set'>, []), functools.partial(<function distrdf_mapper at 0x12a9c4160>, build_rdf_from_range=<function EmptySourceHeadNode._generate_rdf_creator.<locals>.build_rdf_from_range at 0x134902e50>, computation_graph_callable=functools.partial(<function trigger_computation_graph at 0x12a9b6a60>, {0: <DistRDF.HeadNode.EmptySourceHeadNode object at 0x13490b1c0>, 1: <DistRDF.Node.Node object at 0x13490b220>, 2: <DistRDF.Node.Node object at 0x13490b340>, 3: <DistRDF.Node.Node object at 0x13490b430>}), initialization_fn=functools.partial(<function TestInitialization.test_initialization_method.<locals>.init at 0x134902ee0>, 123))))
kwargs:    {}
Exception: "RuntimeError('C++ exception thrown:\\n\\truntime_error: Graph was applied to a mix of scalar values and collections. This is not supported.')

This type of failure in fact boils down to the same issue. Two tasks can be assigned to the same Dask worker process in the CI. The first task runs the DefinePerSample operation normally (after having jitted the computation graph). The second task clones the actions of the first in order to reuse the already jitted nodes, which means that the RDefinePerSample object created during the first task is reused as well. However, the corresponding sample callback was already deleted by RLoopManager at the end of the first event loop, at

fSampleCallbacks.clear();

The same happens in the reproducer above: the second event loop on df2 reuses the RDefinePerSample node, whose callback is gone, so the column keeps the value computed for the last sample (0.5). The linked PR (#13787) fixes both problems.
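To make the mechanism concrete, here is a toy model in plain C++. All names (`ToyPerSampleDefine`, `ToyLoopManager`, `RunTwice`) are hypothetical and only loosely mirror RDefinePerSample and RLoopManager; the real classes are different. A per-sample value is refreshed by a callback that the loop manager calls at the start of each sample. If the callback list is cleared after the first event loop while the define node is reused, the second loop never refreshes the value, and every entry carries the last sample's value.

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <vector>

// Hypothetical stand-in for a per-sample define: caches one value per sample.
struct ToyPerSampleDefine {
   std::function<double(const std::string &)> expression;
   double cachedValue = 0.;
   void Update(const std::string &sampleId) { cachedValue = expression(sampleId); }
};

// Hypothetical stand-in for the loop manager and its sample callbacks.
struct ToyLoopManager {
   std::vector<std::function<void(const std::string &)>> sampleCallbacks;
   bool clearCallbacksAfterRun = true; // mimics fSampleCallbacks.clear()

   // Runs one event loop over the samples and returns the sum of the column.
   double Run(const std::vector<std::string> &samples, int entriesPerSample, const ToyPerSampleDefine &define)
   {
      assert(entriesPerSample >= 0);
      double sum = 0.;
      for (const auto &sample : samples) {
         for (auto &callback : sampleCallbacks)
            callback(sample); // refresh per-sample values at each sample boundary
         for (int i = 0; i < entriesPerSample; ++i)
            sum += define.cachedValue;
      }
      if (clearCallbacksAfterRun)
         sampleCallbacks.clear();
      return sum;
   }
};

// Builds the "graph" once and runs the event loop twice, like the reproducer.
std::vector<double> RunTwice(bool clearCallbacksAfterRun)
{
   const std::vector<std::string> samples{"test1.root", "test2.root"};
   ToyPerSampleDefine define;
   define.expression = [](const std::string &id) { return id == "test1.root" ? 2. : 0.5; };

   ToyLoopManager lm;
   lm.clearCallbacksAfterRun = clearCallbacksAfterRun;
   // The callback is registered only once, when the graph is built.
   lm.sampleCallbacks.push_back([&define](const std::string &id) { define.Update(id); });

   std::vector<double> sums;
   sums.push_back(lm.Run(samples, 10, define));
   sums.push_back(lm.Run(samples, 10, define));
   return sums;
}
```

`RunTwice(true)` returns {25, 10}, the same sums as the reproducer, while `RunTwice(false)`, which keeps the callbacks registered across runs, returns {25, 25}. Keeping or re-registering the callbacks is only one way to avoid the stale value in this toy; it is not necessarily what the PR does.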

vepadulano added a commit that referenced this issue Oct 3, 2023
maksgraczyk pushed a commit to maksgraczyk/root that referenced this issue Jan 12, 2024