Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support for parallel queries #232

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 76 additions & 0 deletions sql/basic_row_iterators.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#define SQL_BASIC_ROW_ITERATORS_H_

/* Copyright (c) 2018, 2021, Oracle and/or its affiliates.
Copyright (c) 2022, Huawei Technologies Co., Ltd.

This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License, version 2.0,
Expand Down Expand Up @@ -41,6 +42,7 @@
#include "sql/mem_root_array.h"
#include "sql/row_iterator.h"
#include "sql/sql_list.h"
#include "filesort.h"

class Filesort_info;
class Item;
Expand All @@ -50,6 +52,80 @@ class THD;
class handler;
struct IO_CACHE;
struct TABLE;
class Gather_operator;
class ORDER;
class MQ_record_gather;
class QEP_TAB;

/**
* Parallel scan iterator, which is used in parallel leader
*/
class ParallelScanIterator final : public TableRowIterator {
public:
ParallelScanIterator(THD *thd, QEP_TAB *tab, TABLE *table,
ha_rows *examined_rows, JOIN *join,
Gather_operator *gather, bool stab_output = false,
uint ref_length = 0);

~ParallelScanIterator() override;

bool Init() override;
int Read() override;
int End() override;
void UnlockRow() override {}
void SetNullRowFlag(bool) override {}
void StartPSIBatchMode() override {}
void EndPSIBatchModeIfStarted() override {}

private:
uchar *const m_record;
ha_rows *const m_examined_rows;
uint m_dop;
JOIN *m_join;
Gather_operator *m_gather;
MQ_record_gather *m_record_gather;
ORDER *m_order; /** use for records merge sort */
QEP_TAB *m_tab;
bool m_stable_sort; /** determine whether using stable sort */
uint m_ref_length;

/** construct filesort on leader when needing stab_output or merge_sort */
bool pq_make_filesort(Filesort **sort);
/** init m_record_gather */
bool pq_init_record_gather();
/** launch worker threads to execute parallel query */
bool pq_launch_worker();
/** wait all workers finished */
void pq_wait_workers_finished();
/** outoput parallel query error code */
int pq_error_code();
};

class PQ_worker_manager;

/**
* block scan iterator, which is used is in parallel worker.
* a whole talbe is cut into many blocks for parallel scan
*/
class PQblockScanIterator final : public TableRowIterator {
public:
PQblockScanIterator(THD *thd, TABLE *table, uchar *record,
ha_rows *examined_rows, Gather_operator *gather,
bool need_rowid = false);
~PQblockScanIterator() override;

bool Init() override;
int Read() override;
int End() override;

private:
uchar *const m_record;
ha_rows *const m_examined_rows;
void *m_pq_ctx; // parallel query context
uint keyno;
Gather_operator *m_gather;
bool m_need_rowid;
};

/**
Scan a table from beginning to end.
Expand Down
15 changes: 14 additions & 1 deletion sql/composite_iterators.cc
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
/* Copyright (c) 2018, 2021, Oracle and/or its affiliates.
Copyright (c) 2022, Huawei Technologies Co., Ltd.

This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License, version 2.0,
Expand Down Expand Up @@ -179,6 +180,10 @@ int LimitOffsetIterator::Read() {
return result;
}

int LimitOffsetIterator::End() {
return m_source->End();
}

AggregateIterator::AggregateIterator(
THD *thd, unique_ptr_destroy_only<RowIterator> source, JOIN *join,
TableCollection tables, bool rollup)
Expand All @@ -193,7 +198,7 @@ AggregateIterator::AggregateIterator(
}

bool AggregateIterator::Init() {
assert(!m_join->tmp_table_param.precomputed_group_by);
assert(!m_join->tmp_table_param->precomputed_group_by);

// Disable any leftover rollup items used in children.
m_current_rollup_position = -1;
Expand All @@ -214,6 +219,10 @@ bool AggregateIterator::Init() {
return false;
}

int AggregateIterator::End() {
return m_source->End();
}

int AggregateIterator::Read() {
switch (m_state) {
case READING_FIRST_ROW: {
Expand Down Expand Up @@ -1187,6 +1196,10 @@ int TemptableAggregateIterator::Read() {
return m_table_iterator->Read();
}

int TemptableAggregateIterator::End() {
return m_subquery_iterator->End();
}

MaterializedTableFunctionIterator::MaterializedTableFunctionIterator(
THD *thd, Table_function *table_function, TABLE *table,
unique_ptr_destroy_only<RowIterator> table_iterator)
Expand Down
20 changes: 20 additions & 0 deletions sql/composite_iterators.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#define SQL_COMPOSITE_ITERATORS_INCLUDED

/* Copyright (c) 2018, 2021, Oracle and/or its affiliates.
Copyright (c) 2022, Huawei Technologies Co., Ltd.

This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License, version 2.0,
Expand Down Expand Up @@ -81,6 +82,8 @@ class FilterIterator final : public RowIterator {

int Read() override;

int End() override { return m_source->End(); }

void SetNullRowFlag(bool is_null_row) override {
m_source->SetNullRowFlag(is_null_row);
}
Expand Down Expand Up @@ -135,6 +138,8 @@ class LimitOffsetIterator final : public RowIterator {

int Read() override;

int End() override;

void SetNullRowFlag(bool is_null_row) override {
m_source->SetNullRowFlag(is_null_row);
}
Expand Down Expand Up @@ -205,6 +210,8 @@ class AggregateIterator final : public RowIterator {

bool Init() override;
int Read() override;
int End() override;

void SetNullRowFlag(bool is_null_row) override {
m_source->SetNullRowFlag(is_null_row);
}
Expand Down Expand Up @@ -535,6 +542,13 @@ class MaterializeIterator final : public TableRowIterator {
bool Init() override;
int Read() override;

int End() override {
for (auto &qb : m_query_blocks_to_materialize) {
qb.subquery_iterator->End();
}
return thd()->is_worker() ? -1 : 1;
}

void SetNullRowFlag(bool is_null_row) override {
m_table_iterator->SetNullRowFlag(is_null_row);
}
Expand Down Expand Up @@ -654,6 +668,10 @@ class StreamingIterator final : public TableRowIterator {

int Read() override;

int End() override {
return m_subquery_iterator->End();
}

void StartPSIBatchMode() override {
m_subquery_iterator->StartPSIBatchMode();
}
Expand Down Expand Up @@ -691,6 +709,8 @@ class TemptableAggregateIterator final : public TableRowIterator {

bool Init() override;
int Read() override;
int End() override;

void SetNullRowFlag(bool is_null_row) override {
m_table_iterator->SetNullRowFlag(is_null_row);
}
Expand Down
6 changes: 3 additions & 3 deletions sql/exchange_sort.h
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ class Exchange_sort : public Exchange {

/** get the k-th record in m_min_records */
mq_record_st *get_record(int k) {
DBUG_ASSERT(0 <= k && k < lanuch_workers());
assert(0 <= k && k < lanuch_workers());
mq_record_st *record = m_min_records[k];
return record;
}
Expand All @@ -120,12 +120,12 @@ class Exchange_sort : public Exchange {
inline const Filesort *get_filesort() { return m_sort; }

inline uchar *get_row_id(int i) {
DBUG_ASSERT(0 <= i && i < 2);
assert(0 <= i && i < 2);
return row_id[i];
}

inline uchar *get_key(int i) {
DBUG_ASSERT(0 <= i && i < 2);
assert(0 <= i && i < 2);
return keys[i];
}

Expand Down
120 changes: 117 additions & 3 deletions sql/filesort.cc
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
/*
Copyright (c) 2000, 2021, Oracle and/or its affiliates.
Copyright (c) 2022, Huawei Technologies Co., Ltd.

This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License, version 2.0,
Expand Down Expand Up @@ -77,6 +78,7 @@
#include "sql/debug_sync.h"
#include "sql/derror.h"
#include "sql/error_handler.h"
#include "sql/exchange_sort.h"
#include "sql/field.h"
#include "sql/filesort_utils.h"
#include "sql/handler.h"
Expand Down Expand Up @@ -601,8 +603,7 @@ bool filesort(THD *thd, Filesort *filesort, RowIterator *source_iterator,
else
sort_mode.append("fixed_sort_key");
sort_mode.append(", ");
sort_mode.append(param->using_packed_addons()
? "packed_additional_fields"
sort_mode.append(param->using_packed_addons() ? "packed_additional_fields"
: param->using_addon_fields() ? "additional_fields"
: "rowid");
sort_mode.append(">");
Expand Down Expand Up @@ -683,6 +684,7 @@ Filesort::Filesort(THD *thd, Mem_root_array<TABLE *> tables_arg,
bool sort_positions, bool unwrap_rollup)
: m_thd(thd),
tables(std::move(tables_arg)),
m_order(order),
keep_buffers(keep_buffers_arg),
limit(limit_arg),
sortorder(nullptr),
Expand All @@ -691,7 +693,11 @@ Filesort::Filesort(THD *thd, Mem_root_array<TABLE *> tables_arg,
force_stable_sort), // keep relative order of equiv. elts
m_remove_duplicates(remove_duplicates),
m_force_sort_positions(sort_positions),
m_sort_order_length(make_sortorder(order, unwrap_rollup)) {}
m_sort_order_length(0) {
if (order) {
m_sort_order_length = make_sortorder(order, unwrap_rollup);
}
}

uint Filesort::make_sortorder(ORDER *order, bool unwrap_rollup) {
uint count;
Expand Down Expand Up @@ -2314,3 +2320,111 @@ void change_double_for_sort(double nr, uchar *to) {
swap(to[3], to[4]);
#endif
}

/**
* compare table->record[0] of two workers in PQ_merge_sort
* @a: the ID of first worker
* @b: the ID of second worker
* @arg: PQ_merge sort
* @return
* true if a's record is less than b's record;
* false otherwise.
*/
bool heap_compare_records(int a, int b, void *arg) {
assert(arg);
bool convert_res;

Exchange_sort *merge_sort = static_cast<Exchange_sort *>(arg);
const Filesort *filesort = merge_sort->get_filesort();
THD *thd = merge_sort->get_thd();
assert(filesort && current_thd == thd);

uchar *row_id_0 = merge_sort->get_row_id(0);
uchar *row_id_1 = merge_sort->get_row_id(1);
uchar *key_0 = merge_sort->get_key(0);
uchar *key_1 = merge_sort->get_key(1);

/** using previous old table when comparing row_id (or PK) */
handler *file = merge_sort->get_file();
assert(file->ht->db_type == DB_TYPE_INNODB);
#if !defined(NDEBUG)
uint ref_len = merge_sort->ref_length();
assert(ref_len == file->ref_length);
#endif
bool force_stable_sort = merge_sort->is_stable();

Sort_param *sort_param = merge_sort->get_sort_param();
int key_len = 0, compare_len = 0;

if (sort_param) {
key_len = sort_param->max_record_length() + 1;
compare_len = sort_param->max_compare_length();
}

/**
* the compare process contains the following three steps:
* 1. copy to table->record[0]
* 2. add row_id info.
* 3. generate sort key
*/
mq_record_st *compare_a = merge_sort->get_record(a);
convert_res = merge_sort->convert_mq_data_to_record(
compare_a->m_data, compare_a->m_length, row_id_0);

// there is an error during execution
if (!convert_res || DBUG_EVALUATE_IF("pq_msort_error6", true, false)) {
thd->pq_error = true;
return true;
}

/*
* using row_id to achieve stable sort, i.e.,
* record1 < record2 <=> key1 < key2 or (key1 = key2 && row_id1 < row_id2)
*/
if (sort_param) {
sort_param->make_sortkey(key_0, key_len, filesort->tables);
}

mq_record_st *compare_b = merge_sort->get_record(b);
convert_res = merge_sort->convert_mq_data_to_record(
compare_b->m_data, compare_b->m_length, row_id_1);

// there is an error during execution
if (!convert_res || DBUG_EVALUATE_IF("pq_msort_error7", true, false)) {
thd->pq_error = true;
return true;
}

if (sort_param) {
sort_param->make_sortkey(key_1, key_len, filesort->tables);
}

// c1: table scan (or index scan with optimized order = nullptr)
if (!filesort->sortorder) {
assert(sort_param == nullptr && force_stable_sort);
assert(row_id_0 && row_id_1);
return file->cmp_ref(row_id_0, row_id_1) < 0;
} else {
int cmp_key_result;
// c2: with order
if (sort_param != nullptr && sort_param->using_varlen_keys()) {
cmp_varlen_keys(sort_param->local_sortorder, sort_param->use_hash, key_0,
key_1, &cmp_key_result);
if (!force_stable_sort) {
return cmp_key_result < 0;
} else {
assert(row_id_0 && row_id_1);
return (cmp_key_result < 0 ||
(cmp_key_result == 0 && file->cmp_ref(row_id_0, row_id_1) < 0));
}
} else {
int cmp = memcmp(key_0, key_1, compare_len);
if (!force_stable_sort) {
return cmp < 0;
} else {
assert(row_id_0 && row_id_1);
return (cmp < 0 || (cmp == 0 && file->cmp_ref(row_id_0, row_id_1) < 0));
}
}
}
}
Loading