-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
ARROW-12208: [C++] Add the ability to run async tasks without using the CPU thread pool #9892
Conversation
@pitrou I initially wanted to replace the IOContext's executor with the serial executor as well but I wasn't sure what the best way to do that would be. For example, I could create a new IOContext with the same pool & stop token but a different executor and pass that down but I wasn't sure if that would be correct. I think a simplified version of my question is "does the IOContext come from the calling user (via options) or from the filesystem?" |
Indeed, I think we should clarify this. I would say: by default, the default IOContext comes from the filesystem but can be overriden by the user. But we may need to modify the IOContext API slightly so that |
Ok, that makes sense. I think that RTools is happy enough with the I/O threads (they have always existed anyways) so I'll defer that work for the future. |
JNI check seems unrelated. Please review. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I left some tiny nits but this looks good overall.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice! I hope we can cut down on the complication by returning Future<T>
.
cpp/src/arrow/util/thread_pool.cc
Outdated
return Status::OK(); | ||
} | ||
|
||
void SerialExecutor::MarkFinished(bool& finished) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
bool* finished
, since it's mutable.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed.
cpp/src/arrow/util/thread_pool.cc
Outdated
|
||
while (!finished) { | ||
while (!state_->task_queue.empty()) { | ||
Task& task = state_->task_queue.front(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The task_queue
can be mutated from another thread, and I don't think there are any guarantees about reference stability.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
True, I thought I was fine since I re-lock before I pop but I suppose the underlying storage could be getting reallocated.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed.
cpp/src/arrow/util/thread_pool.h
Outdated
template <typename T = ::arrow::detail::Empty> | ||
using FinishSignal = internal::FnOnce<void(const Result<T>&)>; | ||
template <typename T = ::arrow::detail::Empty> | ||
using Scheduler = internal::FnOnce<Status(Executor*, FinishSignal<T>)>; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"Scheduler" is confusing. "TopLevelTask" perhaps?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Renamed to TopLevelTask
std::move(finish_signal)(Status::Invalid("XYZ")); | ||
return Status::OK(); | ||
}; | ||
ASSERT_RAISES(Invalid, SerialExecutor::RunInSerialExecutor(std::move(scheduler))); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm... instead of testing RunInSerialExecutor
explicitly, can all these tests call RunSynchronously
, so that you can run the same tests for different use_threads
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed.
@@ -166,14 +174,15 @@ class CsvScanTask : public ScanTask { | |||
source_(fragment->source()) {} | |||
|
|||
Result<RecordBatchIterator> Execute() override { | |||
ARROW_ASSIGN_OR_RAISE(auto gen, ExecuteAsync()); | |||
ARROW_ASSIGN_OR_RAISE(auto gen, ExecuteAsync(internal::GetCpuThreadPool())); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn't you lookup ScanOptions::use_threads
? Or am I missing something?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a bit awkward. This method would not be used by either Scanner::ToTable
or FileSystemDataset::Write
(they will call ExecuteAsync
). The only way this could be called is if the user called Scan()
directly.
In that case we cannot really use RunInSerialExecutor
here because it is returning an iterator.
I suppose that means there is no truly serial way to call Scan()
on CSV. It wouldn't be parallel exactly, it would just be juggling the work between the calling thread, the I/O executor, and the CPU executor.
@@ -29,10 +29,12 @@ | |||
#include "arrow/dataset/partition.h" | |||
#include "arrow/dataset/scanner.h" | |||
#include "arrow/util/logging.h" | |||
#include "arrow/util/thread_pool.h" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this required?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed.
cpp/src/arrow/util/thread_pool.h
Outdated
/// always mark a future finished (which can someday be aided by ARROW-12207). | ||
template <typename T> | ||
static Result<T> RunInSerialExecutor(Scheduler<T> initial_task) { | ||
auto serial_executor = std::make_shared<SerialExecutor>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit, but this can probably be SerialExecutor serial_executor;
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed.
cpp/src/arrow/util/thread_pool.h
Outdated
return final_result; | ||
} | ||
void RunLoop(const bool& finished); | ||
void MarkFinished(bool& finished); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also, it seems bool finished
may simply be an internal state member of SerialExecutor
, e.g.:
void RunLoop();
void FinishRunning();
(though, is the executor meant to be restartable?)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I moved finished
into the internal state. The executor is not meant to be restartable at all and should not exist outside of the scope of RunInSerialExecutor
. I've moved the constructor for SerialExecutor
to private to better reflect this.
I believe I've addressed all comments. The only outstanding question is how we want to handle |
cpp/src/arrow/dataset/file_base.cc
Outdated
}); | ||
} | ||
} | ||
RETURN_NOT_OK(task_group->Finish()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@westonpace Is there a reason why you're not using FinishAsync
here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm, do you mean add FinishAsync
to scan_futs
and then wait for them all at AllComplete
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
…y without needing to create threads. In addition, modifies the Scanner::ToTable and FileSystemDataset::Write to execute serially. Scanner::Scan is still non-serial but I think this is OK. The I/O thread pool is still being used however. To truly remove all instances of std::thread we'd need to change the I/O context to use the serial executor but let's see if this makes R happy enough first.
* Improve tests
…te(futures) I added task_group->FinishAsync to the futures and called AllComplete on that.
32e9c12
to
c215a31
Compare
I believe all new changes have been addressed. |
This seems to fix the CI issues at least, so I'm going to merge. We can follow up next week if necessary. |
@@ -26,6 +26,7 @@ | |||
#include "arrow/type.h" | |||
#include "arrow/type_fwd.h" | |||
#include "arrow/util/future.h" | |||
#include "arrow/util/thread_pool.h" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
arrow/util/type_fwd.h
was probably sufficient.
…he CPU thread pool Added a serial executor which can run async code serially without needing to create threads. In addition, modifies the Scanner::ToTable and FileSystemDataset::Write to execute serially. Scanner::Scan is still non-serial but I think this is OK. The I/O thread pool is still being used however. To truly remove all instances of std::thread we'd need to change the I/O context to use the serial executor but let's see if this makes R happy enough first. Closes apache#9892 from westonpace/feature/arrow-12208 Lead-authored-by: Weston Pace <[email protected]> Co-authored-by: Antoine Pitrou <[email protected]> Signed-off-by: Neal Richardson <[email protected]>
…he CPU thread pool Added a serial executor which can run async code serially without needing to create threads. In addition, modifies the Scanner::ToTable and FileSystemDataset::Write to execute serially. Scanner::Scan is still non-serial but I think this is OK. The I/O thread pool is still being used however. To truly remove all instances of std::thread we'd need to change the I/O context to use the serial executor but let's see if this makes R happy enough first. Closes apache#9892 from westonpace/feature/arrow-12208 Lead-authored-by: Weston Pace <[email protected]> Co-authored-by: Antoine Pitrou <[email protected]> Signed-off-by: Neal Richardson <[email protected]>
…he CPU thread pool Added a serial executor which can run async code serially without needing to create threads. In addition, modifies the Scanner::ToTable and FileSystemDataset::Write to execute serially. Scanner::Scan is still non-serial but I think this is OK. The I/O thread pool is still being used however. To truly remove all instances of std::thread we'd need to change the I/O context to use the serial executor but let's see if this makes R happy enough first. Closes apache#9892 from westonpace/feature/arrow-12208 Lead-authored-by: Weston Pace <[email protected]> Co-authored-by: Antoine Pitrou <[email protected]> Signed-off-by: Neal Richardson <[email protected]>
Added a serial executor which can run async code serially without needing to create threads. In addition, modifies the Scanner::ToTable and FileSystemDataset::Write to execute serially. Scanner::Scan is still non-serial but I think this is OK. The I/O thread pool is still being used however. To truly remove all instances of std::thread we'd need to change the I/O context to use the serial executor but let's see if this makes R happy enough first.