ARROW-11928: [C++] Execution engine API #10204
Conversation
@wesm @bkietz @westonpace This is still quite draft-ish but you may want to look out for potential gotchas.
Just a few questions, I suspect I'll have more later
cpp/src/arrow/engine/exec_plan.h
```cpp
/// is started before all of its inputs.
Status StartProducing();

// XXX should we also have `void StopProducing()`?
```
Or you could use a stop token.
Right. I think a StopToken would be used at a higher level, but I'm not sure.
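To illustrate the stop-token idea: a minimal sketch of cooperative cancellation via a shared atomic flag. The class names and the `ProduceUntilStopped` helper are illustrative stand-ins, not Arrow's actual API (Arrow's real `StopToken`/`StopSource` live in `arrow/util/cancel.h` and have a richer interface).

```cpp
#include <atomic>
#include <cassert>
#include <memory>

// Illustrative sketch only, not Arrow's arrow/util/cancel.h API.
class StopSource {
 public:
  std::shared_ptr<std::atomic<bool>> flag =
      std::make_shared<std::atomic<bool>>(false);
  void RequestStop() { flag->store(true); }
};

class StopToken {
 public:
  explicit StopToken(const StopSource& source) : flag_(source.flag) {}
  bool IsStopRequested() const { return flag_->load(); }

 private:
  std::shared_ptr<std::atomic<bool>> flag_;
};

// A producer loop polls the token instead of exposing StopProducing():
inline int ProduceUntilStopped(const StopToken& token, int max_batches) {
  int produced = 0;
  while (produced < max_batches && !token.IsStopRequested()) {
    ++produced;  // stand-in for emitting a batch downstream
  }
  return produced;
}
```

The trade-off hinted at above: a token is polled by the producer, whereas `StopProducing()` is pushed from the consumer side, so the token fits better at the level that owns the whole plan.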
cpp/src/arrow/engine/exec_plan.h
```cpp
virtual ~ExecNode();

virtual const char* kind_name() = 0;
// The number of inputs and outputs expected by this node
```
Expected? Or current? What's the purpose of these fields?
Expected. They're mostly there for validation and debugging. I'm not sure we'll keep them, but introspectability can be useful.
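As a sketch of how expected counts can back validation: a hypothetical `ValidateEdges` helper (the `NodeShape` struct and string stand-ins for nodes are invented for illustration) that checks a node's wired-up edges against its declared arity.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Invented types for illustration; not the proposed ExecNode API.
struct NodeShape {
  int num_inputs;
  int num_outputs;
};

// The expected counts exist purely for this kind of sanity check.
inline bool ValidateEdges(const NodeShape& shape,
                          const std::vector<std::string>& inputs,
                          const std::vector<std::string>& outputs) {
  return static_cast<int>(inputs.size()) == shape.num_inputs &&
         static_cast<int>(outputs.size()) == shape.num_outputs;
}
```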
cpp/src/arrow/engine/exec_plan.h
```cpp
/// The datatypes for each input
// XXX Should it be std::vector<DataType>?
const std::vector<BatchDescr>& input_descrs() const { return input_descrs_; }
```
When starting a scan Arrow doesn't know the physical schema at all. Reading through this it seems like the execution plan cannot be created without knowing the physical schema. Is the plan to do some initial scanning before creating an execution plan?
Ah! I hadn't thought about that... Like num_inputs() and num_outputs(), those should be mostly useful for validation, so perhaps we can remove them.
On the other hand, even if the physical dataset schema is not known, the logical expected schema should be known (and the dataset scanner should cast implicitly). @bkietz What do you think?
The dataset_schema (reader schema in Avro parlance) is always known when we construct a ScanNode and add it to a plan. Inferring dataset schema from physical schema is deferred to the dataset factories in discovery.h. I believe it's safe to rely on having a known schema along all edges when constructing these nodes.
cpp/src/arrow/engine/exec_plan.h
```cpp
/// - StopProducing() should recurse into the inputs
/// - StopProducing() must be idempotent

// XXX What happens if StartProducing() calls an output's InputReceived()
```
Might be good to create a map node to test these rules out. Right now you have a source node which doesn't have to worry about input callbacks and a sink node which doesn't have to worry about output callbacks but no intermediate nodes.
Oh, yes, I plan to add a bunch of other test nodes to stress the execution model (including, hopefully, intermediate nodes that split or combine their input(s), that introduce random delays, that shuffle output order, that limit output size).
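A toy version of such an intermediate "map" node, assuming a drastically simplified push interface where an `int` stands in for a `compute::ExecBatch`; `MapNode` and its callback wiring are hypothetical, not part of the proposed API. The point is that it exercises both an input side (`InputReceived`) and an output side (the downstream push).

```cpp
#include <cassert>
#include <functional>

// Hypothetical intermediate node: receives a batch, transforms it,
// and forwards it downstream with its sequence number preserved.
struct MapNode {
  std::function<int(int)> fn;            // per-batch transform
  std::function<void(int, int)> output;  // (seq_num, batch) push downstream

  void InputReceived(int /*input_index*/, int seq_num, int batch) {
    output(seq_num, fn(batch));  // forward the transformed batch in order
  }
};
```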
Also, there are no examples of a node with 2 inputs. For example, what happens if one input arrives faster than the other? I suppose you would queue up to some limit and then apply backpressure?
That depends on whether the 2 inputs are both needed to produce some output; that's probably node-dependent. But there's certainly some complication when a node has more than one input or output.
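One possible shape for that node-dependent handling, sketched with invented types (`int`s stand in for ExecBatches, and `PairingNode` is not part of the proposed API): buffer the faster input, emit an output whenever both sides have a batch, and raise a backpressure flag once a queue reaches its limit.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>

// Illustrative two-input node: pairs one batch from each input.
struct PairingNode {
  std::deque<int> left, right;  // queued batches per input
  std::size_t limit;            // queue size that triggers backpressure
  bool paused_left = false, paused_right = false;
  int matched = 0;              // stand-in for produced output batches

  explicit PairingNode(std::size_t limit) : limit(limit) {}

  void InputReceived(int input_index, int batch) {
    (input_index == 0 ? left : right).push_back(batch);
    // Produce output whenever both inputs have a batch queued.
    while (!left.empty() && !right.empty()) {
      left.pop_front();
      right.pop_front();
      ++matched;
    }
    // Ask the faster producer to pause once its queue is full;
    // a real node would call PauseProducing() on that input here.
    paused_left = left.size() >= limit;
    paused_right = right.size() >= limit;
  }
};
```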
cpp/src/arrow/engine/exec_plan.h
```cpp
// (or PauseProducing()) because it received enough data?
//
// Right now, since synchronous calls happen in both directions (input to
// output and then output to input), a node must be careful to be reentrant
```
What is the output -> input direction? Backpressure?
Yes, backpressure. Is "downstream" and "upstream" more readable than "input to output" and "output to input"?
cpp/src/arrow/engine/exec_plan.h
```cpp
virtual void InputReceived(int input_index, int seq_num, compute::ExecBatch batch) = 0;

/// Signal error to ExecNode
virtual void ErrorReceived(int input_index, Status error) = 0;
```
Maybe too magic, but these two:

```cpp
virtual void InputReceived(int input_index, int seq_num, compute::ExecBatch batch) = 0;

/// Signal error to ExecNode
virtual void ErrorReceived(int input_index, Status error) = 0;
```

could be folded into a single:

```cpp
virtual void Receive(int input_index, int seq_num, Result<compute::ExecBatch> batch) = 0;
```
Hmm, no preference from me.
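A toy illustration of how the folded signature would dispatch, using an invented `ToyResult` in place of `arrow::Result<compute::ExecBatch>` (the `Counter` node and field names are made up for the sketch):

```cpp
#include <cassert>
#include <string>

// Stand-in for arrow::Result<compute::ExecBatch>: either a batch or an error.
struct ToyResult {
  bool ok;
  int batch;          // stand-in for compute::ExecBatch
  std::string error;  // stand-in for Status
};

// A node with the single entry point, dispatching to the two
// behaviors that InputReceived/ErrorReceived would otherwise split.
struct Counter {
  int batches = 0;
  int errors = 0;
  void Receive(int /*input_index*/, int /*seq_num*/, const ToyResult& result) {
    if (result.ok) {
      ++batches;  // handle the batch
    } else {
      ++errors;   // propagate or record the error
    }
  }
};
```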
cpp/src/arrow/engine/exec_plan.h
```cpp
/// and StopProducing()

/// Transfer input batch to ExecNode
virtual void InputReceived(int input_index, int seq_num, compute::ExecBatch batch) = 0;
```
Would it be possible to identify inputs and outputs by pointer rather than index?
i.e. replacing

```cpp
virtual void InputReceived(int input_index, int seq_num, compute::ExecBatch batch) = 0;
```

with

```cpp
virtual void InputReceived(ExecNode* input, int seq_num, compute::ExecBatch batch) = 0;
```
I think we won't gain a huge performance benefit by using indices at the top level, and they require much more cognitive overhead: each node must track its identifying indices within other nodes. If we identified by pointer instead, a node could push output with:
```cpp
auto batch = ...;
// push to outputs
for (ExecNode* output : outputs_) {
  output->InputReceived(this, seq_num, batch);
}
```
Instead of:
```cpp
auto batch = ...;
// push to outputs
for (OutputNode output : outputs_) {
  output.output->InputReceived(output.input_index, seq_num, batch);
}
```
That would certainly work, API-wise, but I'm a bit wary of introducing hash lookups out of convenience (unless you have another implementation in mind?).
I was thinking of a simple linear search. I wouldn't expect nodes to deal with a large number of inputs, and for vectors of integers shorter than ~256 I'd expect a linear search to be faster than a hash lookup anyway.
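The linear search in question is essentially a one-liner; this sketch (with a stand-in `Node` type and a hypothetical `FindInputIndex` helper) shows resolving an input's index from its pointer:

```cpp
#include <algorithm>
#include <cassert>
#include <vector>

struct Node {};  // stand-in for ExecNode, for illustration only

// Resolve a node's index within an inputs vector, or -1 if absent.
inline int FindInputIndex(const std::vector<Node*>& inputs, Node* input) {
  auto it = std::find(inputs.begin(), inputs.end(), input);
  return it == inputs.end() ? -1 : static_cast<int>(it - inputs.begin());
}
```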
I think we can defer support for multiple consumers for the moment; I'm refactoring to singular output nodes.
Hmm, I'm not convinced about this. Say you have a RecordBatchReader with several columns of interest. You want each column to feed a separate execution node (say, a different scalar function per column). How do you do that if you can only have singular output nodes? Implement a "supernode" that applies multiple scalar functions to multiple columns?
I would work around this with an API for producing conjoined split nodes:

```cpp
/// Split the output of `to_be_split` for consumption by multiple downstream nodes
///
/// Each split node will list `to_be_split` as its only input, though `to_be_split` will only
/// consider the split node as its output. Whenever `to_be_split` pushes to the first split
/// node, that data will be replicated to the outputs of each split node. Back pressure
/// on any split node will also be felt by `to_be_split`.
std::vector<std::unique_ptr<ExecNode>> MakeSplit(ExecNode* to_be_split, int n_splits);
```

I think that the multiple consumer case will be sufficiently uncommon that making it …
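A toy sketch of the replication behavior MakeSplit describes: one upstream push fans out to every split output. The `Split` struct and callback wiring are invented for illustration (an `int` stands in for an ExecBatch); this is not the proposed Arrow API.

```cpp
#include <cassert>
#include <functional>
#include <vector>

// One push from upstream is replicated to every registered consumer,
// mirroring how each split node would forward to its own output.
struct Split {
  std::vector<std::function<void(int)>> consumers;

  void Push(int batch) {
    for (auto& consumer : consumers) {
      consumer(batch);  // replicate the batch to each split output
    }
  }
};
```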
Regardless of whether splitting is delegated to a specialized node (which I agree is a good idea), I'm not sure what is gained by removing multiple outputs from the API. Nodes with a single output don't have to deal with the complication in either case.
This article has an interesting description of how DAGs (which imply multiple outputs) are used by Materialize to optimize query plans: https://scattered-thoughts.net/writing/materialize-decorrelation I don't know nearly enough to say how common or essential this is.

As for complications, multiple outputs introduce buffering (in both pull and push models). While you are delivering a result to consumer 1 you have to buffer the result so you can later deliver it to consumer 2. If your query plan's bottleneck is down the consumer 1 path, you could accumulate results in the multicasting operator and need to trigger backpressure. That's the main complication that jumps to mind.

That being said, this "multicasting" is one of the more confusing points of Rx (Reactive). But that may just come from the dynamic and linear way in which observers are chained. Since you're already building a graph (that presumably is unchanging for the duration of the execution), that shouldn't be a problem.
It's worth repeating that multiple consumers will exist one way or another. They can be exposed in the API, or they can be buried inside the implementation of a special node. Personally, I would find it slightly more logical to expose them in the API, and it would help give the user a better view of what happens (think visualizing the execution graph, for instance).
Closes apache#10204 from pitrou/ARROW-11928-engine-hierarchy-v2

Lead-authored-by: Antoine Pitrou <[email protected]>
Co-authored-by: Benjamin Kietzman <[email protected]>
Co-authored-by: Antoine Pitrou <[email protected]>
Signed-off-by: Benjamin Kietzman <[email protected]>