Skip to content

Commit

Permalink
[Bugfix](topn-two-phase) using a rpc flag to indicate whether we are …
Browse files Browse the repository at this point in the history
…in two phase read procedure (apache#1294)

Stop using the block's column name to decide whether two-phase read is enabled, since the column name may be lost or modified.
  • Loading branch information
eldenmoon authored Dec 28, 2022
1 parent 9fef1a2 commit 561fddc
Show file tree
Hide file tree
Showing 14 changed files with 65 additions and 62 deletions.
2 changes: 1 addition & 1 deletion be/src/olap/rowset/segment_v2/segment.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ class Segment : public std::enable_shared_from_this<Segment> {

RowsetId rowset_id() const { return _rowset_id; }

RowsetId rowset_id() const { return _rowset_id; }
// RowsetId rowset_id() const { return _rowset_id; }

uint32_t num_rows() const { return _footer.num_rows(); }

Expand Down
5 changes: 0 additions & 5 deletions be/src/vec/common/sort/heap_sorter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,6 @@ Status HeapSorter::append_block(Block* block) {
if (_vsort_exec_exprs.need_materialize_tuple()) {
auto output_tuple_expr_ctxs = _vsort_exec_exprs.sort_tuple_slot_expr_ctxs();
std::vector<int> valid_column_ids(output_tuple_expr_ctxs.size());
// If block contains ROWID_COL column, indicating topn two phase
// read enabled, so we should not ignore this column
if (block->try_get_by_name(BeConsts::ROWID_COL)) {
valid_column_ids.push_back(block->columns() - 1);
}
for (int i = 0; i < output_tuple_expr_ctxs.size(); ++i) {
RETURN_IF_ERROR(output_tuple_expr_ctxs[i]->execute(block, &valid_column_ids[i]));
}
Expand Down
5 changes: 0 additions & 5 deletions be/src/vec/common/sort/sorter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,11 +79,6 @@ Status Sorter::partial_sort(Block& src_block, Block& dest_block) {
if (_materialize_sort_exprs) {
auto output_tuple_expr_ctxs = _vsort_exec_exprs.sort_tuple_slot_expr_ctxs();
std::vector<int> valid_column_ids(output_tuple_expr_ctxs.size());
// If block contains ROWID_COL column, indicating topn two phase
// read enabled, so we should not ignore this column
if (src_block.try_get_by_name(BeConsts::ROWID_COL)) {
valid_column_ids.push_back(src_block.columns() - 1);
}
for (int i = 0; i < output_tuple_expr_ctxs.size(); ++i) {
RETURN_IF_ERROR(output_tuple_expr_ctxs[i]->execute(&src_block, &valid_column_ids[i]));
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/vec/exec/scan/vscanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ Status VScanner::get_block(RuntimeState* state, Block* block, bool* eof) {
Status VScanner::_filter_output_block(Block* block) {
auto old_rows = block->rows();
Status st =
VExprContext::filter_block(_vconjunct_ctx, block, _output_tuple_desc->slots().size());
VExprContext::filter_block(_vconjunct_ctx, block, block->columns());
_counter.num_rows_unselected += old_rows - block->rows();
return st;
}
Expand Down
45 changes: 25 additions & 20 deletions be/src/vec/exec/vexchange_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ Status VExchangeNode::init(const TPlanNode& tnode, RuntimeState* state) {
if (tnode.exchange_node.__isset.nodes_info) {
_nodes_info = _pool->add(new DorisNodesInfo(tnode.exchange_node.nodes_info));
}
_scan_node_tuple_desc = state->desc_tbl().get_tuple_descriptor(tnode.olap_scan_node.tuple_id);
_use_two_phase_read = tnode.exchange_node.sort_info.use_two_phase_read;
return Status::OK();
}

Expand Down Expand Up @@ -90,27 +90,32 @@ Status VExchangeNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* e
}

Status VExchangeNode::second_phase_fetch_data(RuntimeState* state, Block* final_block) {
auto row_id_col = final_block->try_get_by_name(BeConsts::ROWID_COL);
if (row_id_col != nullptr && final_block->rows() > 0) {
MonotonicStopWatch watch;
watch.start();
RowIDFetcher id_fetcher(_scan_node_tuple_desc, state);
RETURN_IF_ERROR(id_fetcher.init(_nodes_info));
vectorized::Block materialized_block(_scan_node_tuple_desc->slots(), final_block->rows());
auto tmp_block = MutableBlock::build_mutable_block(&materialized_block);
// fetch will sort block by sequence of ROWID_COL
RETURN_IF_ERROR(id_fetcher.fetch(row_id_col->column, &tmp_block));
materialized_block.swap(tmp_block.to_block());

// materialize by name
for (auto& column_type_name : *final_block) {
auto materialized_column = materialized_block.try_get_by_name(column_type_name.name);
if (materialized_column != nullptr) {
column_type_name.column = std::move(materialized_column->column);
}
if (!_use_two_phase_read) {
return Status::OK();
}
if (final_block->rows() == 0) {
return Status::OK();
}
auto row_id_col = final_block->get_by_position(final_block->columns() - 1);
MonotonicStopWatch watch;
watch.start();
auto tuple_desc = _row_descriptor.tuple_descriptors()[0];
RowIDFetcher id_fetcher(tuple_desc, state);
RETURN_IF_ERROR(id_fetcher.init(_nodes_info));
vectorized::Block materialized_block(tuple_desc->slots(), final_block->rows());
auto tmp_block = MutableBlock::build_mutable_block(&materialized_block);
// fetch will sort block by sequence of ROWID_COL
RETURN_IF_ERROR(id_fetcher.fetch(row_id_col.column, &tmp_block));
materialized_block.swap(tmp_block.to_block());

// materialize by name
for (auto& column_type_name : *final_block) {
auto materialized_column = materialized_block.try_get_by_name(column_type_name.name);
if (materialized_column != nullptr) {
column_type_name.column = std::move(materialized_column->column);
}
LOG(INFO) << "fetch_id finished, cost(ms):" << watch.elapsed_time() / 1000 / 1000;
}
LOG(INFO) << "fetch_id finished, cost(ms):" << watch.elapsed_time() / 1000 / 1000;
return Status::OK();
}

Expand Down
2 changes: 1 addition & 1 deletion be/src/vec/exec/vexchange_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ class VExchangeNode : public ExecNode {

// for fetch data by rowids
DorisNodesInfo* _nodes_info = nullptr;
const TupleDescriptor* _scan_node_tuple_desc;
bool _use_two_phase_read = false;
};
} // namespace vectorized
} // namespace doris
9 changes: 4 additions & 5 deletions be/src/vec/exec/vsort_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ Status VSortNode::init(const TPlanNode& tnode, RuntimeState* state) {
_is_asc_order = tnode.sort_node.sort_info.is_asc_order;
_nulls_first = tnode.sort_node.sort_info.nulls_first;
_use_topn_opt = tnode.sort_node.use_topn_opt;
_use_two_phase_read = tnode.sort_node.sort_info.use_two_phase_read;
const auto& row_desc = child(0)->row_desc();
// If `limit` is smaller than HEAP_SORT_THRESHOLD, we consider using heap sort in priority.
// To do heap sorting, each income block will be filtered by heap-top row. There will be some
Expand All @@ -61,7 +62,6 @@ Status VSortNode::init(const TPlanNode& tnode, RuntimeState* state) {
}

_sorter->init_profile(_runtime_profile.get());
_scan_node_tuple_desc = state->desc_tbl().get_tuple_descriptor(tnode.olap_scan_node.tuple_id);
return Status::OK();
}

Expand Down Expand Up @@ -95,11 +95,10 @@ Status VSortNode::open(RuntimeState* state) {
child(0)->get_next_after_projects(state, upstream_block.get(), &eos),
child(0)->get_next_span(), eos);
if (upstream_block->rows() != 0) {
// If block contains ROWID_COL column, indicating topn two phase
// read enabled, some columns maybe pruned in the scann nodes
// When topn two-phase read is enabled, some columns may have been pruned in the scan nodes.
// To ensure everything goes well, we mock those columns for later usage;
// they will be read in the second phase when everything is ready.
if (upstream_block->try_get_by_name(BeConsts::ROWID_COL)) {
if (_use_two_phase_read) {
// We must not reuse upstream_block, since it has been rebuilt.
_reuse_mem = false;
_rebuild_block(upstream_block.get());
Expand Down Expand Up @@ -136,7 +135,7 @@ Status VSortNode::open(RuntimeState* state) {
// find by name
void VSortNode::_rebuild_block(Block* block) {
Block new_block;
for (auto slot : _scan_node_tuple_desc->slots()) {
for (auto slot : child(0)->row_desc().tuple_descriptors()[0]->slots()) {
auto type_column = block->try_get_by_name(slot->col_name());
if (!type_column) {
auto type = slot->get_data_type_ptr();
Expand Down
3 changes: 1 addition & 2 deletions be/src/vec/exec/vsort_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,10 @@ class VSortNode : public doris::ExecNode {

bool _reuse_mem;
bool _use_topn_opt = false;
bool _use_two_phase_read = false;

std::unique_ptr<Sorter> _sorter;

const TupleDescriptor* _scan_node_tuple_desc = nullptr;

static constexpr size_t ACCUMULATED_PARTIAL_SORT_THRESHOLD = 256;

RuntimeState* _runtime_state = nullptr;
Expand Down
12 changes: 12 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/analysis/SortInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ public class SortInfo {
// Input expressions materialized into sortTupleDesc_. One expr per slot in
// sortTupleDesc_.
private List<Expr> sortTupleSlotExprs;
private boolean useTwoPhaseRead = false;

public SortInfo(List<Expr> orderingExprs, List<Boolean> isAscOrder,
List<Boolean> nullsFirstParams) {
Expand Down Expand Up @@ -150,6 +151,14 @@ public void setSortTupleDesc(TupleDescriptor tupleDesc) {
sortTupleDesc = tupleDesc;
}

/**
 * Marks this sort info as using two-phase read.
 *
 * <p>This only ever switches the flag on; it takes no argument and there is
 * no way to clear the flag through this method.
 */
public void setUseTwoPhaseRead() {
    this.useTwoPhaseRead = true;
}

/**
 * Reports whether two-phase read has been switched on for this sort info.
 *
 * @return the current value of the {@code useTwoPhaseRead} flag
 */
public boolean useTwoPhaseRead() {
    return this.useTwoPhaseRead;
}

public TupleDescriptor getSortTupleDescriptor() {
return sortTupleDesc;
}
Expand Down Expand Up @@ -311,6 +320,9 @@ public TSortInfo toThrift() {
Expr.treesToThrift(orderingExprs),
isAscOrder,
nullsFirstParams);
if (useTwoPhaseRead) {
sortInfo.setUseTwoPhaseRead(true);
}
return sortInfo;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1286,11 +1286,6 @@ private PlanFragment createOrderByFragment(
exchNode.setMergeInfo(node.getSortInfo());
exchNode.setOffset(offset);

// Set nodes info for the second-phase fetch of data by rowids
if (node.isUseTopNTwoPhaseOptimize()) {
exchNode.createNodesInfo();
}

// Child nodes should not process the offset. If there is a limit,
// the child nodes need only return (offset + limit) rows.
SortNode childSortNode = (SortNode) childFragment.getPlanRoot();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
package org.apache.doris.planner;

import org.apache.doris.analysis.Analyzer;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.SortInfo;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.analysis.TupleId;
Expand All @@ -37,7 +36,6 @@
import org.apache.doris.thrift.TPaloNodesInfo;
import org.apache.doris.thrift.TPlanNode;
import org.apache.doris.thrift.TPlanNodeType;
import org.apache.doris.thrift.TSortInfo;

import com.google.common.base.MoreObjects;
import com.google.common.base.MoreObjects.ToStringHelper;
Expand Down Expand Up @@ -72,9 +70,6 @@ public class ExchangeNode extends PlanNode {
// only if mergeInfo_ is non-null, i.e. this is a merging exchange node.
private long offset;

// Used to send RPCs that fetch data by RowIds
TPaloNodesInfo nodesInfo;

/**
* Create ExchangeNode that consumes output of inputNode.
* An ExchangeNode doesn't have an input node as a child, which is why we
Expand Down Expand Up @@ -166,15 +161,12 @@ protected void toThrift(TPlanNode msg) {
msg.exchange_node.addToInputRowTuples(tid.asInt());
}
if (mergeInfo != null) {
TSortInfo sortInfo = new TSortInfo(
Expr.treesToThrift(mergeInfo.getOrderingExprs()),
mergeInfo.getIsAscOrder(), mergeInfo.getNullsFirst());
msg.exchange_node.setSortInfo(sortInfo);
msg.exchange_node.setSortInfo(mergeInfo.toThrift());
msg.exchange_node.setOffset(offset);
}
// nodeinfos for second phase fetch rows
if (nodesInfo != null) {
msg.exchange_node.setNodesInfo(nodesInfo);

if (mergeInfo.useTwoPhaseRead()) {
msg.exchange_node.setNodesInfo(createNodesInfo());
}
}
}

Expand All @@ -195,12 +187,13 @@ public int getNumInstances() {
* Set the parameters used to fetch data by rowid column
* after init().
*/
public void createNodesInfo() {
nodesInfo = new TPaloNodesInfo();
private TPaloNodesInfo createNodesInfo() {
TPaloNodesInfo nodesInfo = new TPaloNodesInfo();
SystemInfoService systemInfoService = Env.getCurrentSystemInfo();
for (Long id : systemInfoService.getBackendIds(true /*need alive*/)) {
Backend backend = systemInfoService.getBackend(id);
nodesInfo.addToNodes(new TNodeInfo(backend.getId(), 0, backend.getHost(), backend.getBrpcPort()));
}
return nodesInfo;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1013,6 +1013,9 @@ public String getNodeExplainString(String prefix, TExplainLevel detailLevel) {
if (useTopnOpt) {
output.append(prefix).append("TOPN OPT\n");
}
if (sortInfo.useTwoPhaseRead()) {
output.append(prefix).append("OPT TWO PHASE\n");
}
if (!conjuncts.isEmpty()) {
output.append(prefix).append("PREDICATES: ").append(getExplainString(conjuncts)).append("\n");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -344,8 +344,9 @@ private void pushDownResultFileSink(Analyzer analyzer) {
}


private void injectRowIdColumnSlot(Analyzer analyzer, TupleDescriptor tupleDesc) {
private SlotDescriptor injectRowIdColumnSlot(Analyzer analyzer, TupleDescriptor tupleDesc) {
SlotDescriptor slotDesc = analyzer.getDescTbl().addSlotDescriptor(tupleDesc);
LOG.debug("inject slot {}", slotDesc);
String name = Column.ROWID_COL;
Column col = new Column(name, Type.STRING, false, null, false, "",
"rowid column");
Expand All @@ -356,6 +357,7 @@ private void injectRowIdColumnSlot(Analyzer analyzer, TupleDescriptor tupleDesc)
// Non-nullable slots will have 0 for the byte offset and -1 for the bit mask
slotDesc.setNullIndicatorBit(-1);
slotDesc.setNullIndicatorByte(0);
return slotDesc;
}

/**
Expand Down Expand Up @@ -386,9 +388,12 @@ private void pushSortToOlapScan(Analyzer analyzer) {
// and reconstruct the final block
if (!addedRowIdColumn && sortNode.isUseTopNTwoPhaseOptimize()) {
// fragment.setParallelExecNum(1);
injectRowIdColumnSlot(analyzer, scanNode.getTupleDesc());
SlotDescriptor slot = injectRowIdColumnSlot(analyzer, scanNode.getTupleDesc());
injectRowIdColumnSlot(analyzer, sortNode.getSortInfo().getSortTupleDescriptor());
addedRowIdColumn = true;
SlotRef extSlot = new SlotRef(slot);
sortNode.getResolvedTupleExprs().add(extSlot);
sortNode.getSortInfo().setUseTwoPhaseRead();
}

if (!scanNode.checkPushSort(sortNode)) {
Expand Down
2 changes: 2 additions & 0 deletions gensrc/thrift/PlanNodes.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -524,6 +524,8 @@ struct TSortInfo {

// Indicates the nullable info of sort_tuple_slot_exprs is changed after substitute by child's smap
5: optional list<bool> slot_exprs_nullability_changed_flags
// Indicates whether topn query using two phase read
6: optional bool use_two_phase_read
}

enum TPushAggOp {
Expand Down

0 comments on commit 561fddc

Please sign in to comment.