From b53d18b6413669c7baafcb935b3e0a3a2fcf89ef Mon Sep 17 00:00:00 2001 From: chenlinfeng <723609220@qq.com> Date: Tue, 24 Aug 2021 10:02:59 +0800 Subject: [PATCH] =?UTF-8?q?=E8=A7=84=E8=8C=83=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- sql/msg_queue.cc | 179 +++++++++++++++++++++----------------------- sql/pq_condition.cc | 143 +++++++++++++++++------------------ 2 files changed, 157 insertions(+), 165 deletions(-) diff --git a/sql/msg_queue.cc b/sql/msg_queue.cc index fc95fe4f27db..75066c559377 100644 --- a/sql/msg_queue.cc +++ b/sql/msg_queue.cc @@ -40,11 +40,11 @@ bool dbug_pq_worker_stall= 0; MQ_RESULT MQueue_handle::send_bytes(uint32 nbytes, const void *data, uint32 *written, bool nowait) { uint32 used; - uint32 ringsize= m_queue->m_ring_size; + uint32 ringsize = m_queue->m_ring_size; uint32 sent = 0, available; /** only worker thread can send data to message queue */ - THD *thd= current_thd; + THD *thd = current_thd; DBUG_ASSERT(!m_queue->m_sender_event->PQ_caller || thd == m_queue->m_sender_event->PQ_caller); @@ -53,25 +53,25 @@ MQ_RESULT MQueue_handle::send_bytes(uint32 nbytes, const void *data, uint32 *wri { /** atomically obtain the read position */ - rb= atomic_read_u64(&m_queue->m_bytes_read); + rb = atomic_read_u64(&m_queue->m_bytes_read); /** atomically obtain the write position */ - wb= atomic_read_u64(&m_queue->m_bytes_written); + wb = atomic_read_u64(&m_queue->m_bytes_written); DBUG_ASSERT(wb >= rb); - used= wb - rb; + used = wb - rb; DBUG_ASSERT(used <= ringsize); - available= std::min(ringsize - used, nbytes - sent); + available = std::min(ringsize - used, nbytes - sent); compiler_barrier(); if (m_queue->detached == MQ_HAVE_DETACHED) { - *written= sent; + *written = sent; return MQ_DETACHED; } if (m_queue->detached == MQ_TMP_DETACHED) { - *written= sent; + *written = sent; return MQ_SUCCESS; } @@ -86,9 +86,8 @@ MQ_RESULT MQueue_handle::send_bytes(uint32 nbytes, const void *data, uint32 *wri /** notify receiver to receive data from MQ */ end_wait(m_queue->m_receiver_event); /** blocking mode by default, i.e, nowait = false */ - if (nowait) - { - *written= sent; + if (nowait) { + *written = sent; return MQ_WOULD_BLOCK; } /** sender enters into the blocking wait status */ @@ -100,14 +99,14 @@ MQ_RESULT MQueue_handle::send_bytes(uint32 nbytes, const void *data, uint32 *wri uint32 sent_once; /** compute the real write position in ring array */ - offset= MOD(wb, ringsize); - sent_once= std::min(available, ringsize - offset); + offset = MOD(wb, ringsize); + sent_once = std::min(available, ringsize - offset); /** this barrier ensures that memcpy() is finished before end_wait() */ memory_barrier(); memcpy(&m_queue->m_buffer[offset], reinterpret_cast(const_cast(data)) + sent, sent_once); - sent+= sent_once; + sent += sent_once; /** atomically update the write position */ atomic_inc_bytes_written(sent_once); @@ -121,14 +120,13 @@ MQ_RESULT MQueue_handle::send_bytes(uint32 nbytes, const void *data, uint32 *wri #endif } - if (thd->is_pq_error()) - { + if (thd->is_pq_error()) { set_datched_status(MQ_HAVE_DETACHED); return MQ_DETACHED; } DBUG_ASSERT(sent == nbytes); - *written= sent; + *written = sent; return MQ_SUCCESS; } @@ -146,13 +144,12 @@ MQ_RESULT MQueue_handle::send(const void *data, uint32 len, bool nowait) { MQ_RESULT res; - uint32 nbytes= len; + uint32 nbytes = len; uint32 written; /** (1) write the message length into MQ */ - res= send_bytes(WORD_LENGTH, (char *)&nbytes, &written, nowait); - if (res != MQ_SUCCESS) - { + res = send_bytes(WORD_LENGTH, (char *)&nbytes, &written, nowait); + if (res != MQ_SUCCESS) { DBUG_ASSERT(res == MQ_DETACHED); return res; } @@ -160,9 +157,8 @@ MQ_RESULT MQueue_handle::send(const void *data, uint32 len, bool nowait) || written == WORD_LENGTH); /** (2) write the message data into MQ */ - res= send_bytes(nbytes, data, &written, nowait); - if(res != MQ_SUCCESS) - { + res = send_bytes(nbytes, data, &written, nowait); + if(res != MQ_SUCCESS) { DBUG_ASSERT(res == MQ_DETACHED); return res; } @@ -174,21 +170,19 @@ MQ_RESULT MQueue_handle::send(const void *data, uint32 len, bool nowait) /** sending message to MQ in a Field_raw_data manner */ MQ_RESULT MQueue_handle::send(Field_raw_data *fm) { - MQ_RESULT res= MQ_SUCCESS; + MQ_RESULT res = MQ_SUCCESS; uint32 written; /** (s1) sending the variable-field's length_bytes */ - if (fm->m_var_len) - { - res= send_bytes(1, (void *)&fm->m_var_len, &written); + if (fm->m_var_len) { + res = send_bytes(1, (void *)&fm->m_var_len, &written); DBUG_ASSERT((res == MQ_SUCCESS && written == 1) || res == MQ_DETACHED || (!written && m_queue->detached == MQ_TMP_DETACHED)); } /** (s2) sending the data of field->ptr */ - if (MQ_SUCCESS == res) - { - res= send_bytes(fm->m_len, fm->m_ptr, &written); + if (MQ_SUCCESS == res) { + res = send_bytes(fm->m_len, fm->m_ptr, &written); DBUG_ASSERT((res == MQ_SUCCESS && written == fm->m_len) || res == MQ_DETACHED || (!written && m_queue->detached == MQ_TMP_DETACHED)); } @@ -208,38 +202,36 @@ MQ_RESULT MQueue_handle::receive_bytes(uint32 bytes_needed, uint32 *nbytesp, voi uint64 rb, wb; uint32 used, offset; - *nbytesp= 0; - uint32 ringsize= m_queue->m_ring_size; - THD* thd= current_thd; + *nbytesp = 0; + uint32 ringsize = m_queue->m_ring_size; + THD* thd = current_thd; DBUG_ASSERT(!m_queue->m_receiver_event->PQ_caller || thd == m_queue->m_receiver_event->PQ_caller); - while(!thd->is_killed() && !thd->pq_error) - { - rb= atomic_read_u64(&m_queue->m_bytes_read) + m_consume_pending; - wb= atomic_read_u64(&m_queue->m_bytes_written); + while(!thd->is_killed() && !thd->pq_error) { + rb = atomic_read_u64(&m_queue->m_bytes_read) + m_consume_pending; + wb = atomic_read_u64(&m_queue->m_bytes_written); DBUG_ASSERT(wb >= rb); - used= wb - rb; + used = wb - rb; DBUG_ASSERT(used <= ringsize); - offset= MOD(rb, ringsize); + offset = MOD(rb, ringsize); /** we have enough space and then directly read bytes_needed data into datap */ if (used >= bytes_needed) { /** (s1) read data located in [offset, ..., ringsize] */ - if (offset + bytes_needed <= ringsize) - { + if (offset + bytes_needed <= ringsize) { memcpy(datap, &m_queue->m_buffer[offset], bytes_needed); } else { /** (s2) read data located in [offset, ringsize], [0, bytes_needed - (ringsize - offset))] */ - int part_1= ringsize - offset; - int part_2= bytes_needed - part_1; + int part_1 = ringsize - offset; + int part_2 = bytes_needed - part_1; memcpy(datap, &m_queue->m_buffer[offset], part_1); memcpy((char *)datap + part_1, &m_queue->m_buffer[0], part_2); } - *nbytesp= bytes_needed; + *nbytesp = bytes_needed; memory_barrier(); /** notify sender that there is available space in MQ */ @@ -251,10 +243,9 @@ MQ_RESULT MQueue_handle::receive_bytes(uint32 bytes_needed, uint32 *nbytesp, voi * there are not enough data for receiver, and receiver only * receives the data located into [offset, ringsize]. */ - if (offset + used >= ringsize) - { + if (offset + used >= ringsize) { memcpy(datap, &m_queue->m_buffer[offset], ringsize - offset); - *nbytesp= ringsize - offset; + *nbytesp = ringsize - offset; memory_barrier(); end_wait(m_queue->m_sender_event); @@ -265,11 +256,11 @@ MQ_RESULT MQueue_handle::receive_bytes(uint32 bytes_needed, uint32 *nbytesp, voi * if m_queue is detached and there are still data in m_queue, * receiver can receive data until it reads all data. */ - if (m_queue->detached == MQ_HAVE_DETACHED) - { + if (m_queue->detached == MQ_HAVE_DETACHED) { read_barrier(); - if(wb != atomic_read_u64(&m_queue->m_bytes_written)) + if(wb != atomic_read_u64(&m_queue->m_bytes_written)) { continue; + } return MQ_DETACHED; } @@ -281,10 +272,9 @@ MQ_RESULT MQueue_handle::receive_bytes(uint32 bytes_needed, uint32 *nbytesp, voi * } * should be a group of atomic operation. */ - if (m_consume_pending > 0) - { - offset= m_consume_pending; - m_consume_pending= 0; + if (m_consume_pending > 0) { + offset = m_consume_pending; + m_consume_pending = 0; /** ensure that: consume_pending has written into memory */ memory_barrier(); @@ -292,7 +282,9 @@ MQ_RESULT MQueue_handle::receive_bytes(uint32 bytes_needed, uint32 *nbytesp, voi } /** the blocking-read mode */ - if (nowait) return MQ_WOULD_BLOCK; + if (nowait) { + return MQ_WOULD_BLOCK; + } set_wait(m_queue->m_receiver_event); reset_wait(m_queue->m_receiver_event); @@ -310,7 +302,7 @@ MQ_RESULT MQueue_handle::receive(void **datap, uint32 *nbytesp, bool nowait) { MQ_RESULT res; uint32 nbytes, offset; - uint32 rb= 0; + uint32 rb = 0; /** * only when m_consume_pending is greater than 1/4 * m_ring_size, we update the read position * m_read_bytes using m_consume_pending; otherwise, the number of read bytes is firstly @@ -318,10 +310,9 @@ MQ_RESULT MQueue_handle::receive(void **datap, uint32 *nbytesp, bool nowait) * real read position in ring array. * */ - if (m_consume_pending > m_queue->m_ring_size / 4) - { - offset= m_consume_pending; - m_consume_pending= 0; + if (m_consume_pending > m_queue->m_ring_size / 4) { + offset = m_consume_pending; + m_consume_pending = 0; memory_barrier(); atomic_inc_bytes_read(offset); @@ -338,11 +329,12 @@ MQ_RESULT MQueue_handle::receive(void **datap, uint32 *nbytesp, bool nowait) */ /** (1) read the message length */ - while (!m_length_word_complete) - { + while (!m_length_word_complete) { DBUG_ASSERT(m_partial_bytes < WORD_LENGTH); - res= receive_bytes(WORD_LENGTH - m_partial_bytes, &rb, &m_buffer[m_partial_bytes], nowait); - if (res != MQ_SUCCESS) return res; + res = receive_bytes(WORD_LENGTH - m_partial_bytes, &rb, &m_buffer[m_partial_bytes], nowait); + if (res != MQ_SUCCESS) { + return res; + } uint32x2_t v_a = {m_partial_bytes, m_consume_pending}; uint32x2_t v_b = {rb, rb}; @@ -351,58 +343,57 @@ MQ_RESULT MQueue_handle::receive(void **datap, uint32 *nbytesp, bool nowait) m_partial_bytes = vget_lane_u32(v_a, 0); m_consume_pending = vget_lane_u32(v_a, 1); - if (m_partial_bytes >= WORD_LENGTH) - { + if (m_partial_bytes >= WORD_LENGTH) { DBUG_ASSERT(m_partial_bytes == WORD_LENGTH); - m_expected_bytes= *(uint32 *) m_buffer; - m_length_word_complete= true; - m_partial_bytes= 0; + m_expected_bytes = *(uint32 *) m_buffer; + m_length_word_complete = true; + m_partial_bytes = 0; } } - nbytes= m_expected_bytes; + nbytes = m_expected_bytes; /** re-allocing local buffer when m_buffer_len is smaller than nbytes */ if (m_buffer_len < nbytes - || DBUG_EVALUATE_IF("pq_mq_error3", true, false)) - { - while (m_buffer_len < nbytes) - { - m_buffer_len*= 2; + || DBUG_EVALUATE_IF("pq_mq_error3", true, false)) { + while (m_buffer_len < nbytes) { + m_buffer_len *= 2; } - if (m_buffer) destroy(m_buffer); - THD *thd= current_thd; + if (m_buffer) { + destroy(m_buffer); + } + THD *thd = current_thd; DBUG_ASSERT(!m_queue->m_receiver_event->PQ_caller || thd == m_queue->m_receiver_event->PQ_caller); - m_buffer= new (thd->pq_mem_root) char [m_buffer_len]; + m_buffer = new (thd->pq_mem_root) char [m_buffer_len]; /** if m_buffer allocates fail, then directly return my_error */ - if (!m_buffer || DBUG_EVALUATE_IF("pq_mq_error3", true, false)) - { + if (m_buffer == nullptr || DBUG_EVALUATE_IF("pq_mq_error3", true, false)) { my_error(ER_STD_BAD_ALLOC_ERROR, MYF(0), "", "(MQ::receive)"); return MQ_DETACHED; } } /** (2) read data of nbytes **/ - for (;;) - { + for (;;) { size_t still_needed; DBUG_ASSERT(m_partial_bytes <= nbytes); - still_needed= nbytes - m_partial_bytes; - res= receive_bytes(still_needed, &rb, &m_buffer[m_partial_bytes], nowait); - if (res != MQ_SUCCESS) return res; - - m_partial_bytes+= rb; - m_consume_pending+= rb; - if (m_partial_bytes >= nbytes) + still_needed = nbytes - m_partial_bytes; + res = receive_bytes(still_needed, &rb, &m_buffer[m_partial_bytes], nowait); + if (res != MQ_SUCCESS) { + return res; + } + m_partial_bytes += rb; + m_consume_pending += rb; + if (m_partial_bytes >= nbytes) { break; + } } /** reset for next read */ - m_length_word_complete= false; - m_partial_bytes= 0; + m_length_word_complete = false; + m_partial_bytes = 0; - *nbytesp= nbytes; - *datap= m_buffer; + *nbytesp = nbytes; + *datap = m_buffer; return MQ_SUCCESS; } diff --git a/sql/pq_condition.cc b/sql/pq_condition.cc index ba9a14c18822..92fe49741ddf 100644 --- a/sql/pq_condition.cc +++ b/sql/pq_condition.cc @@ -47,7 +47,7 @@ const Item_sum::Sumfunctype NO_PQ_SUPPORTED_AGG_FUNC_TYPES [] = { Item_sum::AVG_DISTINCT_FUNC, Item_sum::GROUP_CONCAT_FUNC, Item_sum::JSON_AGG_FUNC, - Item_sum::UDF_SUM_FUNC , + Item_sum::UDF_SUM_FUNC, Item_sum::STD_FUNC, Item_sum::VARIANCE_FUNC }; @@ -144,7 +144,7 @@ bool pq_not_support_func(Item_func *func) { */ bool pq_not_support_aggr_functype(Item_sum::Sumfunctype type) { for (const Item_sum::Sumfunctype &sum_func_type : NO_PQ_SUPPORTED_AGG_FUNC_TYPES) { - if (sum_func_type == type) { + if (type == sum_func_type) { return true; } } @@ -205,11 +205,11 @@ bool check_pq_support_fieldtype_of_func_item(Item *item) { } // check func args type - for (uint i =0; i < func->arg_count; i++) { + for (uint i = 0; i < func->arg_count; i++) { //c1: Item_func::args contain aggr. function, (i.e., Item_sum) //c2: args contain unsupported fields Item *arg_item = func->arguments()[i]; - if (!arg_item || arg_item->type() == Item::SUM_FUNC_ITEM || //c1 + if (arg_item == nullptr || arg_item->type() == Item::SUM_FUNC_ITEM || //c1 !check_pq_support_fieldtype(arg_item)) { //c2 return false; } @@ -268,8 +268,7 @@ bool check_pq_support_fieldtype_of_sum_func_item(Item *item) { return false; } - for (uint i =0; i < sum->get_arg_count(); i++) - { + for (uint i = 0; i < sum->get_arg_count(); i++) { if (!check_pq_support_fieldtype(sum->get_arg(i))) { return false; } @@ -280,7 +279,7 @@ bool check_pq_support_fieldtype_of_sum_func_item(Item *item) { bool check_pq_support_fieldtype_of_ref_item(Item *item) { Item_ref *item_ref = down_cast(item); - if (!item_ref || pq_not_support_ref(item_ref)) { + if (item_ref == nullptr || pq_not_support_ref(item_ref)) { return false; } @@ -299,7 +298,7 @@ bool check_pq_support_fieldtype_of_cache_item(Item *item) { } Item *example_item = item_cache->get_example(); - if (!example_item || example_item->type() == Item::SUM_FUNC_ITEM || //c1 + if (example_item == nullptr || example_item->type() == Item::SUM_FUNC_ITEM || //c1 !check_pq_support_fieldtype(example_item)) { //c2 return false; } @@ -312,7 +311,7 @@ bool check_pq_support_fieldtype_of_row_item(Item *item) { Item_row *row_item = down_cast(item); for (uint i = 0; i < row_item->cols(); i++) { Item *n_item = row_item->element_index(i); - if (!n_item || n_item->type() == Item::SUM_FUNC_ITEM || //c1 + if (n_item == nullptr || n_item->type() == Item::SUM_FUNC_ITEM || //c1 !check_pq_support_fieldtype(n_item)) { //c2 return false; } @@ -382,7 +381,7 @@ bool check_pq_support_fieldtype(Item *item) { * false: */ bool check_pq_sort_aggregation(const ORDER_with_src &order_list) { - if (!order_list.order) { + if (order_list.order == nullptr) { return false; } @@ -426,7 +425,7 @@ bool pq_create_result_fields(THD *thd, Temp_table_param *param, } Func_ptr_array *copy_func = new (root) Func_ptr_array(root); - if (!copy_func) { + if (copy_func == nullptr) { return true; } @@ -434,7 +433,7 @@ bool pq_create_result_fields(THD *thd, Temp_table_param *param, List_iterator_fast li(fields); Item *item; while ((item = li++)) { - Field *new_field = NULL; + Field *new_field = nullptr; Item::Type type = item->type(); const bool is_sum_func = type == Item::SUM_FUNC_ITEM && !item->m_is_window_function; @@ -491,7 +490,7 @@ bool pq_create_result_fields(THD *thd, Temp_table_param *param, force_copy_fields, false, root) : nullptr; } - if (!new_field) { + if (new_field == nullptr) { DBUG_ASSERT(thd->is_fatal_error()); return true; } @@ -562,7 +561,7 @@ bool check_pq_select_result_fields(JOIN *join) { DBUG_ENTER("check result fields is suitable for parallel query or not"); MEM_ROOT *pq_check_root = ::new MEM_ROOT(); if (pq_check_root == nullptr) { - DBUG_RETURN(false); + DBUG_RETURN(false); } init_sql_alloc(key_memory_thd_main_mem_root, pq_check_root, @@ -586,7 +585,9 @@ bool check_pq_select_result_fields(JOIN *join) { if (tmp_param == nullptr) { // free the memory free_root(pq_check_root, MYF(0)); - if (pq_check_root) ::delete pq_check_root; + if (pq_check_root) { + ::delete pq_check_root; + } DBUG_RETURN(suit_for_parallel); } @@ -630,7 +631,9 @@ bool check_pq_select_result_fields(JOIN *join) { // free the memory free_root(pq_check_root, MYF(0)); - if (pq_check_root) ::delete pq_check_root; + if (pq_check_root){ + ::delete pq_check_root; + } DBUG_RETURN(suit_for_parallel); } @@ -720,10 +723,9 @@ void set_pq_condition_status(THD *thd) { } bool suite_for_parallel_query(THD *thd) { - if (thd->in_sp_trigger || // store procedure or trigger - thd->m_attachable_trx || // attachable transaction - thd->tx_isolation == ISO_SERIALIZABLE) // serializable without snapshot read - { + if (thd->in_sp_trigger || // store procedure or trigger + thd->m_attachable_trx || // attachable transaction + thd->tx_isolation == ISO_SERIALIZABLE) { // serializable without snapshot read return false; } @@ -731,7 +733,7 @@ bool suite_for_parallel_query(THD *thd) { } bool suite_for_parallel_query(LEX *lex) { - if (lex->in_execute_ps){ + if (lex->in_execute_ps) { return false; } @@ -739,7 +741,7 @@ bool suite_for_parallel_query(LEX *lex) { } bool suite_for_parallel_query(SELECT_LEX_UNIT *unit) { - if (!unit->is_simple()){ + if (!unit->is_simple()) { return false; } @@ -759,10 +761,10 @@ bool suite_for_parallel_query(TABLE_LIST *tbl_list) { TABLE *tb = tbl_list->table; if (tb != nullptr && - (tb->s->tmp_table != NO_TMP_TABLE || // template table + (tb->s->tmp_table != NO_TMP_TABLE || // template table tb->file->ht->db_type != DB_TYPE_INNODB || // Non-InnoDB table tb->part_info || // partition table - tb->fulltext_searched)) { // fulltext match search + tb->fulltext_searched)) { // fulltext match search return false; } return true; @@ -772,8 +774,7 @@ bool suite_for_parallel_query(SELECT_LEX *select) { if (select->first_inner_unit() != nullptr || // nesting subquery, including view〝derived table〝subquery condition and so on. select->outer_select() != nullptr || // nested subquery select->is_distinct() || // select distinct - select->saved_windows_elements) // windows function - { + select->saved_windows_elements) { // windows function return false; } @@ -804,7 +805,7 @@ bool suite_for_parallel_query(JOIN *join) { } QEP_TAB *tab = &join->qep_tab[join->const_tables]; // only support table/index full/range scan - join_type scan_type= tab->type(); + join_type scan_type = tab->type(); if (scan_type != JT_ALL && scan_type != JT_INDEX_SCAN && scan_type != JT_REF && @@ -928,69 +929,69 @@ bool PQCheck::suite_for_parallel_query() { } void PlanReadyPQCheck::set_select_id() { - if (tab && sj_is_materialize_strategy(tab->get_sj_strategy())) { - select_id = tab->sjm_query_block_id(); - } else { - PQCheck::set_select_id(); - } + if (tab && sj_is_materialize_strategy(tab->get_sj_strategy())) { + select_id = tab->sjm_query_block_id(); + } else { + PQCheck::set_select_id(); + } } void PlanReadyPQCheck::set_select_type() { - if (tab && sj_is_materialize_strategy(tab->get_sj_strategy())) { - select_type = enum_explain_type::EXPLAIN_MATERIALIZED; - } else { - PQCheck::set_select_type(); - } + if (tab && sj_is_materialize_strategy(tab->get_sj_strategy())) { + select_type = enum_explain_type::EXPLAIN_MATERIALIZED; + } else { + PQCheck::set_select_type(); + } } bool PlanReadyPQCheck::suite_for_parallel_query() { - for (uint t = 0; t < join->tables; t++) { - tab = join->qep_tab + t; - if (!tab->position()) { - continue; - } + for (uint t = 0; t < join->tables; t++) { + tab = join->qep_tab + t; + if (!tab->position()) { + continue; + } - if (!PQCheck::suite_for_parallel_query()) { - return false; - } + if (!PQCheck::suite_for_parallel_query()) { + return false; } + } - return true; + return true; } bool check_select_id_and_type(SELECT_LEX *select_lex) { - JOIN *join = select_lex->join; - std::unique_ptr check; - bool ret = false; + JOIN *join = select_lex->join; + std::unique_ptr check; + bool ret = false; - if (join == nullptr) { - check.reset(new PQCheck(select_lex)); - goto END; - } - - switch (join->get_plan_state()) { - case JOIN::NO_PLAN: - case JOIN::ZERO_RESULT: - case JOIN::NO_TABLES: { - check.reset(new PQCheck(select_lex)); - break; - } + if (join == nullptr) { + check.reset(new PQCheck(select_lex)); + goto END; + } - case JOIN::PLAN_READY: { - check.reset(new PlanReadyPQCheck(select_lex)); - break; - } + switch (join->get_plan_state()) { + case JOIN::NO_PLAN: + case JOIN::ZERO_RESULT: + case JOIN::NO_TABLES: { + check.reset(new PQCheck(select_lex)); + break; + } - default: - DBUG_ASSERT(0); + case JOIN::PLAN_READY: { + check.reset(new PlanReadyPQCheck(select_lex)); + break; } + default: + DBUG_ASSERT(0); + } + END: - if (check != nullptr) { - ret = check->suite_for_parallel_query(); - } + if (check != nullptr) { + ret = check->suite_for_parallel_query(); + } - return ret; + return ret; } bool check_pq_conditions(THD *thd) {