Fix interactions in RDF machinery with the DefinePerSample operation #13787
Conversation
Sample callbacks can be registered by an RAction or an RDefinePerSample instance. In both cases, the lifetime of the callback is tied to the lifetime of the object itself. Avoid eager clearing of the callbacks so as not to interfere with their normal functioning.
As completely anecdotal evidence, I have been running on the
Build failed on ROOT-ubuntu2204/nortcxxmod. Warnings:
Build failed on windows10/default. Errors:
Force-pushed from 20ddc0b to 4314008
Thanks a lot for this change, which is minimal but results from a series of long debugging sessions! I added 2 minimal comments. Besides those, LGTM.
Force-pushed from 4314008 to 5f33910
Thanks @vepadulano !
My only comment is that the tests seem to be too complicated compared to the root cause of the failure. The regression test for #12043, for example, could be simply:
```cpp
bool flag = false;
auto df = ROOT::RDataFrame(1)
             .DefinePerSample("x", [&](unsigned int, const ROOT::RDF::RSampleInfo &) { flag = true; return 0; });
df.Count().GetValue();
EXPECT_TRUE(flag);
flag = false;
df.Count().GetValue();
EXPECT_TRUE(flag);
```
Simpler tests that go straight to the point are easier to debug when they break.
That's a good point, yes. I will modify the reproducer for the linked issue. I prefer to keep the reproducer of the cloning issue because it also mimics the extra machinery involved in creating different tasks, changing the RDF spec, and cloning the actions in a specific way.
This is a reproducer test for some sporadic CI failures, e.g.

```python
========================================================================== FAILURES ===========================================================================
_______________________________________________________ TestDefinePerSample.test_definepersample_simple _______________________________________________________

self = <check_definepersample.TestDefinePerSample object at 0x13e0c6190>, connection = <Client: 'tcp://127.0.0.1:55253' processes=2 threads=2, memory=4.00 GiB>

    def test_definepersample_simple(self, connection):
        """
        Test DefinePerSample operation on three samples using a predefined
        string of operations.
        """
        df = Dask.RDataFrame(self.maintreename, self.filenames, daskclient=connection)
        # Associate a number to each sample
        definepersample_code = """
        if(rdfsampleinfo_.Contains(\"{}\")) return 1;
        else if (rdfsampleinfo_.Contains(\"{}\")) return 2;
        else if (rdfsampleinfo_.Contains(\"{}\")) return 3;
        else return 0;
        """.format(*self.samples)
        df1 = df.DefinePerSample("sampleid", definepersample_code)
        # Filter by the sample number. Each filtered dataframe should contain
        # 10 entries, equal to the number of entries per sample
        samplescounts = [df1.Filter("sampleid == {}".format(id)).Count() for id in [1, 2, 3]]
        for count in samplescounts:
>           assert count.GetValue() == 10
E           AssertionError

check_definepersample.py:62: AssertionError
-------------------------------------------------------------------- Captured stderr setup --------------------------------------------------------------------
RDataFrame::Run: event loop was interrupted
2023-09-08 14:51:57,002 - distributed.worker - WARNING - Compute Failed
Key:       dask_mapper-a92ac090-9407-4849-921a-d187ceffd3ed
Function:  dask_mapper
args:      (EmptySourceRange(exec_id=ExecutionIdentifier(rdf_uuid=UUID('5d67c0a7-58f4-488d-8e44-bb5aa0fac480'), graph_uuid=UUID('69353465-0a90-4eef-b101-a1eb93f0c13a')), id=0, start=0, end=50))
kwargs:    {}
Exception: "RuntimeError('C++ exception thrown:\\n\\truntime_error: Graph was applied to a mix of scalar values and collections. This is not supported.')"
```

This is due to Dask assigning two tasks to the same worker for the test with the DefinePerSample calls. The Count operation failed to report the correct number of entries because the DefinePerSample callback was previously deleted at the end of every event loop, specifically at the end of the first task's event loop. Consequently, when the second task started and picked up the same RDataFrame to clone the action, the DefinePerSample callback was never actually called.
Thanks a lot Vincenzo! For me it's ready to be merged.
Force-pushed from 5f33910 to 1748a80
This PR fixes #12043. It should also address sporadic failures seen in our Jenkins CI: Dask sometimes assigns two tasks to the same worker process, so the second task reuses the same DefinePerSample node as the first task and runs into the same situation as the linked issue.
The first commit contains the actual fix, followed by the tests.