-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[WIP] ARROW-11928: [C++] Execution engine API #9742
Conversation
66b9b60
to
2631943
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There are some basic things here that I don't understand, I put a few questions in my comments, so I will review some more after I see the answers
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
// KIND, either express or implied. See the License for the | ||
// specific language governing permissions and limitations | ||
// under the License. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think that work on query planning is anticipated soon, only work on execution, so this could be put off later.
As a nit, I would also suggest calling this "logical_plan.h" to distinguish logical from physical execution.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, perhaps the naming is bad, but this is not a logical plan. This is the physical plan. "Exec plan" is a particular instantiation of executing this physical plan.
Does that make sense?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see. How about:
- query_plan.h -> physical_plan.h
- exec_plan.h -> exec_node.h
/// When all inputs are received for a given batch_index, the batch is ready | ||
/// for execution. | ||
Status InputReceived(int32_t input_index, int32_t batch_index, | ||
compute::ExecBatch batch); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is "batch_index"?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The index of the batch in the batch stream.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think this parameter is important, at least at this stage. Could you clarify what "When all inputs are received for a given batch_index, the batch is ready for execution." means? In general, inputs from different parents won't necessarily correspond to each other.
For example, suppose that we are joining a 1,000,000 row input with a 100,000 row input. One input might yield 100 batches while the other might only yield 5 or 10. They may be of differing lengths, in an unpredictable order (since scans won't necessarily yield a deterministic order), and the batches won't correspond to each other.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In general, inputs from different parents won't necessarily correspond to each other.
Ah, I see. This is a misunderstanding on my part, then. Sorry.
/// This may be called before all inputs are received. This simply fixes | ||
/// the total number of incoming batches so that the ExecNode knows when | ||
/// it has received all input. | ||
Status InputFinished(int32_t num_batches); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- In general, nodes will not know how many batches they are going to produce, so the situations where this API would be used seem rare to me
- When a node has multiple inputs (e.g. joins, unions), the parent nodes will finish producing outputs at different times. How are they supposed to independently communicate that they are done?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As the docstring says, this may or may not be called before all inputs are received. But it must be called at some point so that the node knows that input is finished.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As for the second question, it seems I should change this declaration to Status InputFinished(int32_t input_index, int32_t num_batches)
. Does that sound right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would say to nix the "num_batches" argument, but otherwise yes InputFinished(input_index)
sounds right
/// Note that execution doesn't necessarily mean that any outputs are produced. | ||
/// Depending on the ExecNode type, outputs may be produced on the fly, | ||
/// or only at the end when all inputs have been received. | ||
Future<> RunAsync(int32_t batch_index, internal::Executor* executor); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess I don't understand what "batch_index" is yet
How does this handle fan-out style parallelism. For example, you can generally filter batch N while filtering batch N+1 at the same time. Would there be a single ExecNode that is run twice? If so won't you have trouble tracking inputs? Or would there be two ExecNode instances in the plan? In that case how would you decide which node to deliver batches to? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I hope these questions aren't too far off base, I've only started getting up to speed on the execution plan docs.
class QueryContext; | ||
class QueryNode; | ||
|
||
class ARROW_EXPORT QueryPlan { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is the relationship between a query plan and an execution plan? My current understanding is that a query plan is an AST of the query and the execution plan is a possibly optimized tree of workers. Is this correct?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As I answered above, "query plan" is the physical plan. "Exec plan" is the particular execution of this plan for a given set of inputs. Does that make sense? Do you want to suggest other names?
auto* input_batch = EnsureBatch(batch_index); | ||
|
||
// TODO lifetime (take strong ref to ExecPlan?) | ||
return executor->Transfer(input_batch->ready_fut) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would different nodes run on different executors? If not it might be simpler to avoid the transfer. Also, do I have to call RunAsync on every node for every batch index? Couldn't calling RunAsync on the root node(s?) be enough?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure what "root nodes" are in this context. You mean the sources?
Indeed, we may simply want to start executing as soon as all inputs are ready. Though this will depend on the node: some can execute as soon as one batch is received, some need the whole input to be received.
class QueryPlan; | ||
|
||
class ARROW_EXPORT ExecPlan { | ||
public: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How do I actually run this? There doesn't appear to be any public interface.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry, this is just a WIP PR. I decided to post it to ensure that I'm not entirely going in a bad direction.
compute::ExecContext* context() { return context_; } | ||
|
||
protected: | ||
friend class QueryPlan; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn't this be the other way around? The QueryPlan doesn't have any reference to the ExecPlan (and if my understanding of the relationship is correct it shouldn't). What value is there in this declaration?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The query plan is supposed to instantiate a corresponding exec plan when QueryPlan::MakeExecPlan
is called. Presumably that will need access to non-public APIs or members, but this is all just a sketch.
There would be a single ExecNode that is run twice. Tracking the input is done through the I don't think independent ExecNodes would work, since some operations (e.g. hash aggregation or any vector function) need access to the entire input before starting to compute their output. |
Note: Hash aggregation does not need access to its entire input to start computing, and in the first iteration of this project vector functions will be entirely disallowed. |
Also keep in mind (for subsequent work) that some nodes (ie “Limit” in particular) will need to be able to apply backpressure on their parents to get them to immediately quit producing outputs. For example, if you have a “limit 1000” query, as soon as you have 1000 rows of output you shut down the ancestors. |
Hmm, right. But still, any output depends on the entire input, so it cannot be emitted on-the-fly while receiving partial input chunks. |
Ok, I'll have to think about that. |
Right, these kinds of operators (ones that have to exhaust their inputs before emitting an output) are often called "blocking" in the literature (see e.g. http://pages.cs.wisc.edu/~jignesh/publ/Quickstep.pdf or http://www.vldb.org/pvldb/vol11/p2209-kersten.pdf). |
Note for readers: I'm working on a new PR which will supersede this. |
Closing in favour of #10204. |
No description provided.