Use TTaskGroup interface to unzip baskets in parallel. #1010
Conversation
Can one of the admins verify this patch?
Hi @zzxuanyuan , a lot of work. Thanks for exploring these aspects. I would have two comments:
Hi @dpiparo ,
Hi @zzxuanyuan , if this new setup proves good enough to get back that 3%, we can even consider having an execution policy for Async that lets the user choose between hitting the runtime (and therefore the worker pool) or spawning a new thread.
@dpiparo , we could start with some simple APIs, and I hope we can validate that the ~3% is indeed caused by TTaskGroup::Run.
@zzxuanyuan , sure. Before diving into the API upgrade, let's be sure the 3% is gone, and start with the threaded implementation of the parallel decompression and its tests. Upgrading your solid work will then be straightforward!
Hi @zzxuanyuan , I think we discussed this already, but I'd like to go through it again. You need TTaskGroup to have something asynchronous. The real work is done by a "parallel for", incarnated in an invocation of the TThreadExecutor.
Hi @dpiparo , We want to group small baskets together so that each task has enough work. Before TThreadExecutor starts decompressing baskets, our original plan (without async APIs) was that the main thread iterates over all baskets and assigns each task a group of baskets whose accumulated size is beyond 100 KB (for a large basket > 100 KB, we just assign that single basket to a task). Iterating over all baskets and assigning them to tasks cannot be parallelized; it blocks the main thread, and that could be harmful when there are lots of baskets to decompress. Therefore, we decided to hide this sequential processing in a background thread, which is how we came up with the idea of adopting async function calls in our code.
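The grouping scheme described above can be sketched as follows. This is a standalone illustration, not the actual TTreeCacheUnzip code; `GroupBaskets` and the 100 KB default are stand-ins for the patch's internal logic:

```cpp
#include <cstddef>
#include <vector>

// Hypothetical helper: partition basket sizes (in bytes) into groups whose
// accumulated size reaches roughly `groupSize`, so each unzip task gets
// enough work. A basket larger than `groupSize` becomes its own group.
std::vector<std::vector<std::size_t>> GroupBaskets(const std::vector<std::size_t> &basketLen,
                                                   std::size_t groupSize = 100 * 1024)
{
   std::vector<std::vector<std::size_t>> groups;
   std::vector<std::size_t> current;
   std::size_t accumulated = 0;
   for (std::size_t i = 0; i < basketLen.size(); ++i) {
      current.push_back(i);           // store the basket index
      accumulated += basketLen[i];
      if (accumulated >= groupSize) { // group is big enough: close it
         groups.push_back(current);
         current.clear();
         accumulated = 0;
      }
   }
   if (!current.empty())
      groups.push_back(current);      // leftover tail becomes the last group
   return groups;
}
```

Note the sequential nature of this loop: the task boundaries depend on the running sum, which is why the assignment step itself cannot be parallelized and was pushed to a background thread.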
A number of items to fix - some small (code formatting, header organization), some large (use TTaskGroup throughout instead of parallel_for). See review comments.
tree/tree/inc/TTreeCacheUnzip.h
Outdated
```cpp
   Int_t                fBlocksToGo;

   // Unzipping related members
   Int_t               *fUnzipLen;        ///<! [fNseek] Length of the unzipped buffers
   char               **fUnzipChunks;     ///<! [fNseek] Individual unzipped chunks. Their summed size is kept under control.
   Byte_t              *fUnzipStatus;     ///<! [fNSeek] For each blk, tells us if it's unzipped or pending
   Long64_t             fTotalUnzipBytes; ///<! The total sum of the currently unzipped blks
   std::atomic<Byte_t> *fUnzipStatus;     ///<! [fNSeek]
```
Please fix alignment as best you can.
tree/tree/src/TTreeCacheUnzip.cxx
Outdated
```cpp
#include "TEnv.h"

#define THREADCNT 2
#include "ROOT/TThreadExecutor.hxx"
```
Order headers alphabetically, grouped by generality. Put `#include "ROOT/TThreadExecutor.hxx"` into the group of ROOT headers.
tree/tree/src/TTreeCacheUnzip.cxx
Outdated
```cpp
@@ -122,14 +118,9 @@ TTreeCacheUnzip::TTreeCacheUnzip(TTree *tree, Int_t buffersize) : TTreeCache(tre

void TTreeCacheUnzip::Init()
{
   fMutexList = new TMutex(kTRUE);
   fUnzipTaskGroup = nullptr;
```
Please switch to `std::unique_ptr` instead.
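A minimal sketch of what the reviewer suggests, with a stand-in `TaskGroup` type in place of ROOT's class (names are illustrative, not the actual patch):

```cpp
#include <memory>

// Stand-in for the real task-group type owned by the cache object.
struct TaskGroup {
   bool running = false;
};

// With std::unique_ptr, Init() no longer needs an explicit "= nullptr"
// and the destructor no longer needs a manual delete.
class UnzipCacheSketch {
   std::unique_ptr<TaskGroup> fUnzipTaskGroup; // default-constructed empty
public:
   void Start() { fUnzipTaskGroup = std::make_unique<TaskGroup>(); }
   void Stop()  { fUnzipTaskGroup.reset(); }   // frees the group, no delete
   bool IsActive() const { return static_cast<bool>(fUnzipTaskGroup); }
};
```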
tree/tree/src/TTreeCacheUnzip.cxx
Outdated
```cpp
}
// Prepare a static tmp buf of adequate size
if(locbuffsz < rdlen) {
   if (locbuff) delete [] locbuff;
```
Get rid of the manual memory management here; use `std::vector` instead.
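A sketch of the `std::vector` version of the grow-on-demand buffer in the diff above (illustrative, assuming the buffer only ever needs to grow):

```cpp
#include <cstddef>
#include <vector>

// Replaces the locbuff/locbuffsz pair: resize() grows the buffer when the
// read length exceeds the current size, and the vector's destructor
// releases the memory, so no delete[]/new[] pair is needed.
void ReadBlock(std::vector<char> &locbuff, std::size_t rdlen)
{
   if (locbuff.size() < rdlen)
      locbuff.resize(rdlen); // was: delete [] locbuff; locbuff = new char[rdlen];
   // ... fill locbuff with up to rdlen bytes ...
}
```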
tree/tree/src/TTreeCacheUnzip.cxx
Outdated
```cpp
if (locbuff) delete [] locbuff;
locbuffsz = rdlen;
locbuff = new char[locbuffsz];
//memset(locbuff, 0, locbuffsz);
```
Remove commented-out code.
tree/tree/src/TTreeCacheUnzip.cxx
Outdated
```cpp
      UnzipCache(reqi, locbuffsz, locbuff);
   }
} else {
   usleep(200000);
```
Leave a `TODO` to implement a task-stealing scheme instead of sleeping for a random amount of time.
tree/tree/src/TTreeCacheUnzip.cxx
Outdated
```cpp
   }

} // scope of the lock!
} else {
```
Remove dead code block.
tree/tree/src/TTreeCacheUnzip.cxx
Outdated
```cpp
delete [] fCompBuffer;
fCompBuffer = new char[len*2];
fCompBufferSize = len*2;
delete [] fCompBuffer;
```
Get rid of manual memory management (or at least use `std::unique_ptr`).
tree/tree/src/TTreeCacheUnzip.cxx
Outdated
```cpp
}

```
Remove trailing whitespace.
```cpp
      accusz = 0;
   }
   ROOT::TThreadExecutor pool;
   pool.Foreach(unzipFunction, basketIndices);
```
I think this is the source of your speed issues. Using `TThreadExecutor` causes you to have to pre-create all the tasks; the first one isn't executed until all are created. Use the `TTaskGroup` object and it will schedule the tasks as they are created.
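The scheduling difference the reviewer describes can be sketched outside ROOT with `std::async` standing in for a task-group style API. This illustrates "schedule each task as it is created" only; it is not ROOT's `TTaskGroup` implementation, and real TBB task groups schedule onto a shared worker pool rather than raw threads:

```cpp
#include <atomic>
#include <future>
#include <vector>

std::atomic<int> gProcessed{0};

// Each task is launched the moment it is created, so a worker can start
// running before the submission loop finishes -- unlike a parallel-for,
// which first materializes the full list of work items.
void RunAsTheyAreCreated(int nTasks)
{
   std::vector<std::future<void>> tasks;
   for (int i = 0; i < nTasks; ++i)
      tasks.push_back(std::async(std::launch::async, [] { ++gProcessed; }));
   for (auto &t : tasks)
      t.wait(); // analogous to waiting on the task group
}
```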
As we discussed earlier, the performance dropped by another ~3% when I used TTaskGroup::Run here.
Since I still keep the outer TTaskGroup, there is still a ~3% performance drop, but replacing the inner TTaskGroup with TThreadExecutor mitigates the slowdown.
Ok - per our discussion at the ROOT IO meeting, let's leave this for now and just focus on the code style cleanups.
@zzxuanyuan - is it possible to get this PR updated / revised in the next day or two? I'll be at FNAL on Wednesday and would like to discuss this one with @pcanal.
Force-pushed from 7715825 to 1f1a841 (compare)
@bbockelm I fixed several issues addressed in your comments. Could you look at it now?
Possible alternative to sleep, at least on Linux: http://man7.org/linux/man-pages/man2/sched_yield.2.html
Hi, I think the sleep could be replaced by a condition variable? The STL also provides an implementation: http://en.cppreference.com/w/cpp/thread/condition_variable
@dpiparo - we discussed the idea of a condition variable, but I'm wary. The condition-variable approach causes a per-task overhead that is always paid (as notify would have to be done from the tasks). This per-task overhead is one of the things that make the pthreads implementation problematic. However, this particular case is a fairly obscure corner case: this code is only triggered when there is exactly one remaining basket to unzip, the main thread needs it, and one of the TBB threads is currently working on it. Talking to Zhe, I think the best way to go is a busy-loop (with …
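A minimal sketch of the busy-loop idea, using `std::this_thread::yield` (the portable cousin of the `sched_yield` suggested above). The flag and function names are hypothetical, not the actual patch:

```cpp
#include <atomic>
#include <thread>

// Spin on the basket's "done" flag, yielding the CPU on each iteration.
// Only sensible here because at most one basket is ever awaited and the
// wait is expected to be short; no per-task notify overhead is paid.
void WaitUntilUnzipped(const std::atomic<bool> &done)
{
   while (!done.load(std::memory_order_acquire))
      std::this_thread::yield(); // let the unzipping thread run
}
```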
Force-pushed from e802259 to 349cb8d (compare)
The updates address some issues for the random-read case and the code should be good now. Some updates after last Friday's meeting: As we discussed, random-read performance is very slow, and it technically cannot be improved if we decide to use the cache. I also tried the random-read workload with pthreads; the performance was the same as with TBB. I think the reason is obvious: reading the next random event invalidates the current cache, all baskets need to be reset, and the cache buffer has to be refilled with the next cluster of events. Given the current cache replacement policy, the slow performance is expected. Philippe pointed out that the common use case for ROOT should be mostly sequential reads plus a few random reads. I was thinking it would not help to store baskets decompressed by the main thread (when a cache miss happens) back into the cache: for sequential reads they will not be accessed again, and for random reads the cache will be invalidated and all decompressed baskets in it marked invalid.
@pcanal - I think it's now ready for your review!
Hi Danilo, Based on the current cache replacement policy, the cache is invalidated (fIsTransferred set to kFALSE) immediately once the first event miss occurs. In my current implementation, each task monitors fIsTransferred and returns immediately without doing the actual unzipping work, but we still need to create tasks corresponding to the number of baskets. I am wondering if we should add a task_group.cancel() function to the TTaskGroup interface? In that case, the main thread only needs to cancel all tasks once the cache is invalid. With the Event simulation benchmark, I did not see much difference between task_group wait and cancel, but I guess cancel could become more efficient once the number of baskets in the cache buffer grows larger.
```cpp
   if (fUnzipChunks) delete [] fUnzipChunks;
   if (fUnzipStatus) delete [] fUnzipStatus;
}
void Clear(Int_t size);
```
Sort the members alphabetically (at least within groupings of functionality).
tree/tree/src/TTree.cxx
Outdated
```cpp
   pf = new TTreeCacheUnzip(this, cacheSize);
#else
   pf = new TTreeCache(this, cacheSize);
#endif
```
Would it make sense to write it as:

```cpp
#ifdef R__USE_IMT
   if (TTreeCacheUnzip::IsParallelUnzip() && file->GetCompressionLevel() > 0)
      pf = new TTreeCacheUnzip(this, cacheSize);
   else
#endif
      pf = new TTreeCache(this, cacheSize);
```
tree/tree/src/TTreeCacheUnzip.cxx
Outdated
```cpp
@@ -41,21 +41,23 @@ TTreeCache::SetUnzipBufferSize(Long64_t bufferSize)
   where bufferSize must be passed in bytes.
*/

#include "Bytes.h"
```
Shouldn't it be with the other headers (after TTreeCacheUnzip.h)?
If it is needed inside TTreeCacheUnzip.h, then it should be included there (because each header should be standalone).
tree/tree/src/TTreeCacheUnzip.cxx
Outdated
```cpp
////////////////////////////////////////////////////////////////////////////////
/// Reset all baskets' state arrays.

void TTreeCacheUnzip::UnzipState::Reset(Int_t oldSize, Int_t newSize) {
```
Since fUnzipStatus points to atomics, they are presumably accessed from multiple threads. Can 'Reset' be called while there is a potential access from other threads? If so, was the ordering of the operations here checked for thread safety? If so, please add documentation/explanation. If Reset does not need to be thread-safe, please note in a comment why this is the case.
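To illustrate the concern, here is a hedged stand-in (not ROOT code): resetting an array of atomics element by element is not a single atomic operation, so the required memory ordering depends on whether readers can run concurrently.

```cpp
#include <atomic>
#include <vector>

// Relaxed stores are fine ONLY if no other thread reads the status array
// while Reset runs (workers quiescent). If concurrent readers are possible,
// each store would need an ordering the readers pair with, and intermediate
// mixed states (some entries reset, some not) would still be observable.
void ResetStatus(std::vector<std::atomic<unsigned char>> &status)
{
   for (auto &s : status)
      s.store(0, std::memory_order_relaxed); // assumes no concurrent readers
}
```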
tree/tree/src/TTreeCacheUnzip.cxx
Outdated
```cpp
////////////////////////////////////////////////////////////////////////////////

Bool_t TTreeCacheUnzip::UnzipState::IsUntouched(Int_t index) {
```
Mark this function (and all functions to which it applies) as `const`.
tree/tree/src/TTreeCacheUnzip.cxx
Outdated
```cpp
// Triggered by the user, not the learning phase
if (entry == -1) entry=0;

TTree::TClusterIterator clusterIter = tree->GetClusterIterator(entry);
```
The following code feels copy/pasted (and likely adapted) from somewhere else. If it follows the pattern of another routine and there is no easy way to factor them out, then please note both here and in the original that the two are similar, and note here what the differences are. [I.e. at least something like: Inspired by XYZ::GetABC, adding calls to Some::Thing() inside the 2nd nested loop [or whatever is accurate :) ]]
tree/tree/src/TTreeCacheUnzip.cxx
Outdated
```cpp
   return 1;
}

// Prepare a static tmp buf of adequate size
```
The comment probably needs updating (it mentions static, but nothing seems (gladly) to be static).
tree/tree/src/TTreeCacheUnzip.cxx
Outdated
```cpp
Int_t loclen = UnzipBuffer(&ptr, locbuff);
if ((loclen > 0) && (loclen == objlen+keylen)) {
   if ( (myCycle != fCycle) || !fIsTransferred) {
      fUnzipState.SetFinished(index); // Set it as not done
```
The comment and the function name seem to disagree on the semantics ...
tree/tree/src/TTreeCacheUnzip.cxx
Outdated
```cpp
std::vector<std::vector<Int_t>> basketIndices;
std::vector<Int_t> indices;
for (Int_t i = 0; i < fNseek; i++) {
   while (accusz < 102400) {
```
What are the semantics of 102400? Why was this particular value picked? Could the user ever want to customize it? Either way, please use a `constexpr` to record and name this value.
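A minimal sketch of the suggested `constexpr`; the name `kUnzipGroupThreshold` and the helper are invented for illustration, not part of the patch:

```cpp
#include <cstddef>

// Name the magic number and record its unit: 100 KiB of accumulated
// basket size per unzip task.
constexpr std::size_t kUnzipGroupThreshold = 100 * 1024;

// Usage sketch: replaces the bare "accusz < 102400" condition.
bool GroupIsFull(std::size_t accumulatedSize)
{
   return accumulatedSize >= kUnzipGroupThreshold;
}
```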
tree/tree/src/TTreeCacheUnzip.cxx
Outdated
```cpp
@@ -958,7 +872,7 @@ Int_t TTreeCacheUnzip::UnzipBuffer(char **dest, char *src)
   /* early consistency check */
   UChar_t *bufcur = (UChar_t *) (src + keylen);
   Int_t nin, nbuf;
   if(R__unzip_header(&nin, bufcur, &nbuf)!=0) {
   if(objlen > nbytes-keylen && R__unzip_header(&nin, bufcur, &nbuf)!=0) {
```
This line is changed in a commit titled "Use TTaskGroup interface to unzip baskets in parallel."
The link between this change and the title is not clear. If it is not related, can you put it in its own commit?
Hi all. The implementation of this feature has changed radically wrt ~1 month ago. How is the "Event" benchmark performing? How is the "CMSSW candle" performing?
@dpiparo
Force-pushed from 1bb010a to 4514b4e (compare)
Ran two tests: the Event benchmark and B2HHH.root (compressed with zlib-6). Both tests disabled parallel TTree::GetEntry, since TBB's receive_and_steal function takes a ridiculously long share of the total runtime.

B2HHH.root (25 branches and 8556118 entries):

Event benchmark (56 branches and 10000 entries):
Force-pushed from 7945412 to d7a0560 (compare)
@phsft-bot build
Starting build on …
@pcanal Unfortunately, I can't see the output of the build; I do not have access permission. What are those failures about?
@pcanal @bbockelm If I turn on TBB both for TTree::GetEntry() and TTreeCacheUnzip, the program crashes due to a misaligned memory read. I do not know why this happens, but if I only turn on TBB for either TTree::GetEntry or TTreeCacheUnzip, it does not. I post the stack trace from gdb as follows:

```
//===========================================================
Thread 4 (Thread 0x7f16176f2700 (LWP 30317)):
Thread 3 (Thread 0x7f1617af3700 (LWP 30316)):
Thread 2 (Thread 0x7f1617ef4700 (LWP 30315)):
Thread 1 (Thread 0x7f1626ed7a40 (LWP 30289)):
```

The lines below might hint at the cause of the crash.
@phsft-bot build
Starting build on …
Build failed on slc6/gcc62. Failing tests: |
@pcanal @bbockelm [projectroot.roottest.root.multicore.roottest_root_multicore_tp_process_imt] It is still shown as "Time Out" on my desktop. I also tried this particular test with the latest upstream root; it can't pass either.
@phsft-bot build
Starting build on …
Build failed on ubuntu16/native.
The test failed due to a failure uploading test results to CDash, it seems:

Two minutes later:
@phsft-bot build (from the Mattermost discussion, it seems there were overnight issues in CVMFS?)
Starting build on …
@bbockelm @pcanal @dpiparo
Here is the new imt unzipping basket with TTaskGroup interface.
Comparing to #785, I noticed there are still 3% (in real time) to 5% (in CPU time) performance drops in the new implementation. The degradation is caused by the TBB function:
`tbb::internal::custom_scheduler<tbb::internal::IntelSchedulerTraits>::receive_or_steal_task(long&)`
I suspect the reason lies in #785, in the following function:
https://github.com/zzxuanyuan/root/blob/15cceff19b48dfe4a4b0c69c1ec07ea75bd1ccb5/tree/tree/src/TTreeCacheUnzip.cxx#L708
CreateTasks() explicitly creates 2 tasks (empty_task and MappingTask; set_ref_count(2) means 2 tasks in total). The scheduler might make a better decision here, since it knows there will be only one task besides empty_task running in the future.
On the other hand, TTaskGroup uses tbb::task_group, which calls the following function:
https://github.com/01org/tbb/blob/b9805bacadd4d0474fd3358cf0c7153042ce50c3/include/tbb/task_group.h#L108
task_group_base() also first creates an empty_task, but it only accounts for 1 task (itself) by setting the reference count to 1 (set_ref_count(1)). When it invokes another task by calling
https://github.com/01org/tbb/blob/b9805bacadd4d0474fd3358cf0c7153042ce50c3/include/tbb/task_group.h#L103
allocate_additional_child() will create a new task as a child and increment the reference count by 1. I guess accumulating tasks on the fly might degrade performance, since the TBB scheduler could spend more time finding tasks to work on.
In short, I think explicitly defining the total number of tasks and the task graph should perform better (be more efficient for the scheduler, I guess) than adding tasks to the task_group as the program runs.
There are two alternative approaches that might improve the performance.
If we do not mind a small performance drop, the current implementation should be fine.
Thanks,
Zhe