-
Notifications
You must be signed in to change notification settings - Fork 3.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[pipelineX](refactor) remove source state from operator functions #31435
Conversation
Thank you for your contribution to Apache Doris. |
3d8aec5
to
32b5aef
Compare
run buildall |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
clang-tidy made some suggestions
@@ -74,19 +74,16 @@ Status DataGenSourceOperatorX::prepare(RuntimeState* state) { | |||
return Status::OK(); | |||
} | |||
|
|||
Status DataGenSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* block, | |||
SourceState& source_state) { | |||
Status DataGenSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* block, bool* eos) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
warning: method 'get_block' can be made static [readability-convert-member-functions-to-static]
be/src/pipeline/exec/datagen_operator.h:75:
- Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos) override;
+ static Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos) override;
@@ -300,8 +300,7 @@ void ExchangeSinkOperatorX::_handle_eof_channel(RuntimeState* state, ChannelPtrT | |||
static_cast<void>(channel->close(state, Status::OK())); | |||
} | |||
|
|||
Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block, | |||
SourceState source_state) { | |||
Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block, bool eos) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
warning: method 'sink' can be made static [readability-convert-member-functions-to-static]
be/src/pipeline/exec/exchange_sink_operator.h:196:
- Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) override;
+ static Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) override;
@@ -300,8 +300,7 @@ | |||
static_cast<void>(channel->close(state, Status::OK())); | |||
} | |||
|
|||
Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block, | |||
SourceState source_state) { | |||
Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block, bool eos) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
warning: function 'sink' has cognitive complexity of 123 (threshold 50) [readability-function-cognitive-complexity]
Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block, bool eos) {
^
Additional context
be/src/pipeline/exec/exchange_sink_operator.cpp:315: +1, including nesting penalty of 0, nesting level increased to 1
if (all_receiver_eof) {
^
be/src/pipeline/exec/exchange_sink_operator.cpp:319: +1, including nesting penalty of 0, nesting level increased to 1
if (_part_type == TPartitionType::UNPARTITIONED || local_state.channels.size() == 1) {
^
be/src/pipeline/exec/exchange_sink_operator.cpp:323: +2, including nesting penalty of 1, nesting level increased to 2
if (local_state.only_local_exchange) {
^
be/src/pipeline/exec/exchange_sink_operator.cpp:324: +3, including nesting penalty of 2, nesting level increased to 3
if (!block->empty()) {
^
be/src/pipeline/exec/exchange_sink_operator.cpp:333: +1, nesting level increased to 2
} else {
^
be/src/pipeline/exec/exchange_sink_operator.cpp:335: +3, including nesting penalty of 2, nesting level increased to 3
RETURN_IF_ERROR(local_state.get_next_available_buffer(&block_holder));
^
be/src/common/status.h:541: expanded from macro 'RETURN_IF_ERROR'
do { \
^
be/src/pipeline/exec/exchange_sink_operator.cpp:335: +4, including nesting penalty of 3, nesting level increased to 4
RETURN_IF_ERROR(local_state.get_next_available_buffer(&block_holder));
^
be/src/common/status.h:543: expanded from macro 'RETURN_IF_ERROR'
if (UNLIKELY(!_status_.ok())) { \
^
be/src/pipeline/exec/exchange_sink_operator.cpp:339: +3, including nesting penalty of 2, nesting level increased to 3
RETURN_IF_ERROR(local_state._serializer.next_serialized_block(
^
be/src/common/status.h:541: expanded from macro 'RETURN_IF_ERROR'
do { \
^
be/src/pipeline/exec/exchange_sink_operator.cpp:339: +4, including nesting penalty of 3, nesting level increased to 4
RETURN_IF_ERROR(local_state._serializer.next_serialized_block(
^
be/src/common/status.h:543: expanded from macro 'RETURN_IF_ERROR'
if (UNLIKELY(!_status_.ok())) { \
^
be/src/pipeline/exec/exchange_sink_operator.cpp:342: +3, including nesting penalty of 2, nesting level increased to 3
if (serialized) {
^
be/src/pipeline/exec/exchange_sink_operator.cpp:344: +4, including nesting penalty of 3, nesting level increased to 4
if (!cur_block.empty()) {
^
be/src/pipeline/exec/exchange_sink_operator.cpp:348: +1, nesting level increased to 4
} else {
^
be/src/pipeline/exec/exchange_sink_operator.cpp:369: +1, nesting level increased to 1
} else if (_part_type == TPartitionType::RANDOM) {
^
be/src/pipeline/exec/exchange_sink_operator.cpp:373: +2, including nesting penalty of 1, nesting level increased to 2
if (!current_channel->is_receiver_eof()) {
^
be/src/pipeline/exec/exchange_sink_operator.cpp:375: +3, including nesting penalty of 2, nesting level increased to 3
if (current_channel->is_local()) {
^
be/src/pipeline/exec/exchange_sink_operator.cpp:377: +4, including nesting penalty of 3, nesting level increased to 4
HANDLE_CHANNEL_STATUS(state, current_channel, status);
^
be/src/vec/sink/vdata_stream_sender.h:391: expanded from macro 'HANDLE_CHANNEL_STATUS'
do { \
^
be/src/pipeline/exec/exchange_sink_operator.cpp:377: +5, including nesting penalty of 4, nesting level increased to 5
HANDLE_CHANNEL_STATUS(state, current_channel, status);
^
be/src/vec/sink/vdata_stream_sender.h:392: expanded from macro 'HANDLE_CHANNEL_STATUS'
if (status.is<ErrorCode::END_OF_FILE>()) { \
^
be/src/pipeline/exec/exchange_sink_operator.cpp:377: +1, nesting level increased to 5
HANDLE_CHANNEL_STATUS(state, current_channel, status);
^
be/src/vec/sink/vdata_stream_sender.h:394: expanded from macro 'HANDLE_CHANNEL_STATUS'
} else { \
^
be/src/pipeline/exec/exchange_sink_operator.cpp:377: +6, including nesting penalty of 5, nesting level increased to 6
HANDLE_CHANNEL_STATUS(state, current_channel, status);
^
be/src/vec/sink/vdata_stream_sender.h:395: expanded from macro 'HANDLE_CHANNEL_STATUS'
RETURN_IF_ERROR(status); \
^
be/src/common/status.h:541: expanded from macro 'RETURN_IF_ERROR'
do { \
^
be/src/pipeline/exec/exchange_sink_operator.cpp:377: +7, including nesting penalty of 6, nesting level increased to 7
HANDLE_CHANNEL_STATUS(state, current_channel, status);
^
be/src/vec/sink/vdata_stream_sender.h:395: expanded from macro 'HANDLE_CHANNEL_STATUS'
RETURN_IF_ERROR(status); \
^
be/src/common/status.h:543: expanded from macro 'RETURN_IF_ERROR'
if (UNLIKELY(!_status_.ok())) { \
^
be/src/pipeline/exec/exchange_sink_operator.cpp:378: +1, nesting level increased to 3
} else {
^
be/src/pipeline/exec/exchange_sink_operator.cpp:380: +4, including nesting penalty of 3, nesting level increased to 4
RETURN_IF_ERROR(local_state._serializer.serialize_block(
^
be/src/common/status.h:541: expanded from macro 'RETURN_IF_ERROR'
do { \
^
be/src/pipeline/exec/exchange_sink_operator.cpp:380: +5, including nesting penalty of 4, nesting level increased to 5
RETURN_IF_ERROR(local_state._serializer.serialize_block(
^
be/src/common/status.h:543: expanded from macro 'RETURN_IF_ERROR'
if (UNLIKELY(!_status_.ok())) { \
^
be/src/pipeline/exec/exchange_sink_operator.cpp:384: +4, including nesting penalty of 3, nesting level increased to 4
HANDLE_CHANNEL_STATUS(state, current_channel, status);
^
be/src/vec/sink/vdata_stream_sender.h:391: expanded from macro 'HANDLE_CHANNEL_STATUS'
do { \
^
be/src/pipeline/exec/exchange_sink_operator.cpp:384: +5, including nesting penalty of 4, nesting level increased to 5
HANDLE_CHANNEL_STATUS(state, current_channel, status);
^
be/src/vec/sink/vdata_stream_sender.h:392: expanded from macro 'HANDLE_CHANNEL_STATUS'
if (status.is<ErrorCode::END_OF_FILE>()) { \
^
be/src/pipeline/exec/exchange_sink_operator.cpp:384: +1, nesting level increased to 5
HANDLE_CHANNEL_STATUS(state, current_channel, status);
^
be/src/vec/sink/vdata_stream_sender.h:394: expanded from macro 'HANDLE_CHANNEL_STATUS'
} else { \
^
be/src/pipeline/exec/exchange_sink_operator.cpp:384: +6, including nesting penalty of 5, nesting level increased to 6
HANDLE_CHANNEL_STATUS(state, current_channel, status);
^
be/src/vec/sink/vdata_stream_sender.h:395: expanded from macro 'HANDLE_CHANNEL_STATUS'
RETURN_IF_ERROR(status); \
^
be/src/common/status.h:541: expanded from macro 'RETURN_IF_ERROR'
do { \
^
be/src/pipeline/exec/exchange_sink_operator.cpp:384: +7, including nesting penalty of 6, nesting level increased to 7
HANDLE_CHANNEL_STATUS(state, current_channel, status);
^
be/src/vec/sink/vdata_stream_sender.h:395: expanded from macro 'HANDLE_CHANNEL_STATUS'
RETURN_IF_ERROR(status); \
^
be/src/common/status.h:543: expanded from macro 'RETURN_IF_ERROR'
if (UNLIKELY(!_status_.ok())) { \
^
be/src/pipeline/exec/exchange_sink_operator.cpp:390: +1, nesting level increased to 1
} else if (_part_type == TPartitionType::HASH_PARTITIONED ||
^
be/src/pipeline/exec/exchange_sink_operator.cpp:395: +2, including nesting penalty of 1, nesting level increased to 2
RETURN_IF_ERROR(
^
be/src/common/status.h:541: expanded from macro 'RETURN_IF_ERROR'
do { \
^
be/src/pipeline/exec/exchange_sink_operator.cpp:395: +3, including nesting penalty of 2, nesting level increased to 3
RETURN_IF_ERROR(
^
be/src/common/status.h:543: expanded from macro 'RETURN_IF_ERROR'
if (UNLIKELY(!_status_.ok())) { \
^
be/src/pipeline/exec/exchange_sink_operator.cpp:398: +2, including nesting penalty of 1, nesting level increased to 2
if (_part_type == TPartitionType::HASH_PARTITIONED) {
^
be/src/pipeline/exec/exchange_sink_operator.cpp:399: +3, including nesting penalty of 2, nesting level increased to 3
RETURN_IF_ERROR(channel_add_rows(
^
be/src/common/status.h:541: expanded from macro 'RETURN_IF_ERROR'
do { \
^
be/src/pipeline/exec/exchange_sink_operator.cpp:399: +4, including nesting penalty of 3, nesting level increased to 4
RETURN_IF_ERROR(channel_add_rows(
^
be/src/common/status.h:543: expanded from macro 'RETURN_IF_ERROR'
if (UNLIKELY(!_status_.ok())) { \
^
be/src/pipeline/exec/exchange_sink_operator.cpp:402: +1, nesting level increased to 2
} else {
^
be/src/pipeline/exec/exchange_sink_operator.cpp:403: +3, including nesting penalty of 2, nesting level increased to 3
RETURN_IF_ERROR(channel_add_rows(
^
be/src/common/status.h:541: expanded from macro 'RETURN_IF_ERROR'
do { \
^
be/src/pipeline/exec/exchange_sink_operator.cpp:403: +4, including nesting penalty of 3, nesting level increased to 4
RETURN_IF_ERROR(channel_add_rows(
^
be/src/common/status.h:543: expanded from macro 'RETURN_IF_ERROR'
if (UNLIKELY(!_status_.ok())) { \
^
be/src/pipeline/exec/exchange_sink_operator.cpp:407: +1, nesting level increased to 1
} else {
^
be/src/pipeline/exec/exchange_sink_operator.cpp:414: +1, including nesting penalty of 0, nesting level increased to 1
if (eos) {
^
be/src/pipeline/exec/exchange_sink_operator.cpp:416: +2, including nesting penalty of 1, nesting level increased to 2
for (int i = 0; i < local_state.channels.size(); ++i) {
^
be/src/pipeline/exec/exchange_sink_operator.cpp:418: +3, including nesting penalty of 2, nesting level increased to 3
if (!st.ok() && final_st.ok()) {
^
be/src/pipeline/exec/exchange_sink_operator.cpp:418: +1
if (!st.ok() && final_st.ok()) {
^
@@ -102,7 +101,7 @@ class SetSourceOperatorX final : public OperatorX<SetSourceLocalState<is_interse | |||
template <typename HashTableContext> | |||
Status _get_data_in_hashtable(SetSourceLocalState<is_intersect>& local_state, | |||
HashTableContext& hash_table_ctx, vectorized::Block* output_block, | |||
const int batch_size, SourceState& source_state); | |||
const int batch_size, bool* eos); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
warning: parameter 'batch_size' is const-qualified in the function declaration; const-qualification of parameters only has an effect in function definitions [readability-avoid-const-params-in-decls]
const int batch_size, bool* eos); | |
int batch_size, bool* eos); |
@@ -32,17 +32,12 @@ SortSourceOperatorX::SortSourceOperatorX(ObjectPool* pool, const TPlanNode& tnod | |||
const DescriptorTbl& descs) | |||
: OperatorX<SortLocalState>(pool, tnode, operator_id, descs) {} | |||
|
|||
Status SortSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* block, | |||
SourceState& source_state) { | |||
Status SortSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* block, bool* eos) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
warning: method 'get_block' can be made static [readability-convert-member-functions-to-static]
be/src/pipeline/exec/sort_source_operator.h:62:
- Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos) override;
+ static Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos) override;
@@ -1264,26 +1262,24 @@ Status StreamingAggLocalState::close(RuntimeState* state) { | |||
return Base::close(state); | |||
} | |||
|
|||
Status StreamingAggOperatorX::pull(RuntimeState* state, vectorized::Block* block, | |||
SourceState& source_state) const { | |||
Status StreamingAggOperatorX::pull(RuntimeState* state, vectorized::Block* block, bool* eos) const { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
warning: method 'pull' can be made static [readability-convert-member-functions-to-static]
Status StreamingAggOperatorX::pull(RuntimeState* state, vectorized::Block* block, bool* eos) const { | |
Status StreamingAggOperatorX::pull(RuntimeState* state, vectorized::Block* block, bool* eos) { |
be/src/pipeline/exec/streaming_aggregation_operator.h:194:
- Status pull(RuntimeState* state, vectorized::Block* block, bool* eos) const override;
+ static Status pull(RuntimeState* state, vectorized::Block* block, bool* eos) override;
} | ||
|
||
DataDistribution required_data_distribution() const override { | ||
return {ExchangeType::PASSTHROUGH}; | ||
} | ||
|
||
Status push(RuntimeState* state, vectorized::Block* input_block, | ||
SourceState source_state) const override { | ||
Status push(RuntimeState* state, vectorized::Block* input_block, bool eos) const override { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
warning: method 'push' can be made static [readability-convert-member-functions-to-static]
Status push(RuntimeState* state, vectorized::Block* input_block, bool eos) const override { | |
static Status push(RuntimeState* state, vectorized::Block* input_block, bool eos) override { |
@@ -114,11 +111,10 @@ | |||
return Status::OK(); | |||
} | |||
|
|||
Status pull(RuntimeState* state, vectorized::Block* output_block, | |||
SourceState& source_state) const override { | |||
Status pull(RuntimeState* state, vectorized::Block* output_block, bool* eos) const override { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
warning: method 'pull' can be made static [readability-convert-member-functions-to-static]
Status pull(RuntimeState* state, vectorized::Block* output_block, bool* eos) const override { | |
static Status pull(RuntimeState* state, vectorized::Block* output_block, bool* eos) override { |
TeamCity be ut coverage result: |
run buildall |
TeamCity be ut coverage result: |
TPC-H: Total hot run time: 38207 ms
|
TPC-DS: Total hot run time: 168968 ms
|
ClickBench: Total hot run time: 31.71 s
|
Load test result on machine: 'aliyun_ecs.c7a.8xlarge_32C64G'
|
run buildall |
TPC-H: Total hot run time: 38237 ms
|
TeamCity be ut coverage result: |
TPC-DS: Total hot run time: 170043 ms
|
ClickBench: Total hot run time: 32.23 s
|
Load test result on machine: 'aliyun_ecs.c7a.8xlarge_32C64G'
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
PR approved by at least one committer and no changes requested. |
PR approved by anyone and no changes requested. |
Proposed changes
Issue Number: close #xxx
Further comments
If this is a relatively large or complex change, kick off the discussion at [email protected] by explaining why you chose the solution you did and what alternatives you considered, etc...