Skip to content

Commit

Permalink
Little refinements on DAG code (#3482)
Browse files Browse the repository at this point in the history
  • Loading branch information
fuzhe1989 authored Nov 21, 2021
1 parent c3c0bb7 commit 9fca53d
Show file tree
Hide file tree
Showing 13 changed files with 934 additions and 964 deletions.
3 changes: 1 addition & 2 deletions dbms/src/Debug/dbgFuncCoprocessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -752,8 +752,7 @@ void astToPB(const DAGSchema & input, ASTPtr ast, tipb::Expr * expr, uint32_t co
astToPB(input, child_ast, child, collator_id, context);
}
// for like need to add the third argument
tipb::Expr * constant_expr = expr->add_children();
constructInt64LiteralTiExpr(*constant_expr, 92);
*expr->add_children() = constructInt64LiteralTiExpr(92);
return;
}
case tipb::ScalarFuncSig::FromUnixTime2Arg:
Expand Down
6 changes: 1 addition & 5 deletions dbms/src/Flash/BatchCoprocessorHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,7 @@ grpc::Status BatchCoprocessorHandler::execute()
SCOPE_EXIT(
{ GET_METRIC(tiflash_coprocessor_handling_request_count, type_super_batch_cop_dag).Decrement(); });

const auto dag_request = ({
tipb::DAGRequest dag_req;
getDAGRequestFromStringWithRetry(dag_req, cop_request->data());
std::move(dag_req);
});
auto dag_request = getDAGRequestFromStringWithRetry(cop_request->data());
RegionInfoMap regions;
RegionInfoList retry_regions;
for (auto & r : cop_request->regions())
Expand Down
236 changes: 110 additions & 126 deletions dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.cpp

Large diffs are not rendered by default.

13 changes: 3 additions & 10 deletions dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ DAGQueryBlockInterpreter::DAGQueryBlockInterpreter(
Context & context_,
const std::vector<BlockInputStreams> & input_streams_vec_,
const DAGQueryBlock & query_block_,
size_t max_streams_,
bool keep_session_timezone_info_,
const DAGQuerySource & dag_,
std::vector<SubqueriesForSets> & subqueries_for_sets_,
Expand All @@ -50,6 +51,7 @@ DAGQueryBlockInterpreter::DAGQueryBlockInterpreter(
, query_block(query_block_)
, keep_session_timezone_info(keep_session_timezone_info_)
, rqst(dag_.getDAGRequest())
, max_streams(max_streams_)
, dag(dag_)
, subqueries_for_sets(subqueries_for_sets_)
, exchange_receiver_map(exchange_receiver_map_)
Expand All @@ -60,15 +62,6 @@ DAGQueryBlockInterpreter::DAGQueryBlockInterpreter(
for (const auto & condition : query_block.selection->selection().conditions())
conditions.push_back(&condition);
}
const Settings & settings = context.getSettingsRef();
if (dag.isBatchCop())
max_streams = settings.max_threads;
else
max_streams = 1;
if (max_streams > 1)
{
max_streams *= settings.max_streams_to_max_threads_ratio;
}
}

BlockInputStreamPtr combinedNonJoinedDataStream(DAGPipeline & pipeline, size_t max_threads, const LogWithPrefixPtr & log)
Expand Down Expand Up @@ -209,7 +202,7 @@ AnalysisResult analyzeExpressions(
query_block.output_field_types,
query_block.output_offsets,
query_block.qb_column_prefix,
keep_session_timezone_info || !query_block.isRootQueryBlock());
keep_session_timezone_info);

res.before_order_and_select = chain.getLastActions();

Expand Down
1 change: 1 addition & 0 deletions dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ class DAGQueryBlockInterpreter
Context & context_,
const std::vector<BlockInputStreams> & input_streams_vec_,
const DAGQueryBlock & query_block_,
size_t max_streams_,
bool keep_session_timezone_info_,
const DAGQuerySource & dag_,
std::vector<SubqueriesForSets> & subqueries_for_sets_,
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Flash/Coprocessor/DAGQuerySource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@ DAGQuerySource::DAGQuerySource(
const RegionInfoList & regions_for_remote_read_,
const tipb::DAGRequest & dag_request_,
const LogWithPrefixPtr & log_,
const bool is_batch_cop_)
const bool is_batch_cop_or_mpp_)
: context(context_)
, regions(regions_)
, regions_for_remote_read(regions_for_remote_read_)
, dag_request(dag_request_)
, is_batch_cop(is_batch_cop_)
, is_batch_cop_or_mpp(is_batch_cop_or_mpp_)
, log(log_)
{
if (dag_request.has_root_executor())
Expand Down
6 changes: 3 additions & 3 deletions dbms/src/Flash/Coprocessor/DAGQuerySource.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ class DAGQuerySource : public IQuerySource
const RegionInfoList & regions_needs_remote_read_,
const tipb::DAGRequest & dag_request_,
const LogWithPrefixPtr & log_,
const bool is_batch_cop_ = false);
const bool is_batch_cop_or_mpp_ = false);

std::tuple<std::string, ASTPtr> parse(size_t) override;
String str(size_t max_query_size) override;
Expand All @@ -42,7 +42,7 @@ class DAGQuerySource : public IQuerySource
const RegionInfoMap & getRegions() const { return regions; }
const RegionInfoList & getRegionsForRemoteRead() const { return regions_for_remote_read; }

bool isBatchCop() const { return is_batch_cop; }
bool isBatchCopOrMpp() const { return is_batch_cop_or_mpp; }

DAGContext & getDAGContext() const { return *context.getDAGContext(); }

Expand All @@ -64,7 +64,7 @@ class DAGQuerySource : public IQuerySource
std::shared_ptr<DAGQueryBlock> root_query_block;
ASTPtr ast;

const bool is_batch_cop;
const bool is_batch_cop_or_mpp;

LogWithPrefixPtr log;
};
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ DAGStorageInterpreter::DAGStorageInterpreter(

void DAGStorageInterpreter::execute(DAGPipeline & pipeline)
{
if (dag.isBatchCop())
if (dag.isBatchCopOrMpp())
learner_read_snapshot = doBatchCopLearnerRead();
else
learner_read_snapshot = doCopLearnerRead();
Expand Down Expand Up @@ -285,7 +285,7 @@ void DAGStorageInterpreter::doLocalRead(DAGPipeline & pipeline, size_t max_block
catch (RegionException & e)
{
/// Recover from region exception when super batch is enable
if (dag.isBatchCop())
if (dag.isBatchCopOrMpp())
{
// clean all streams from local because we are not sure the correctness of those streams
pipeline.streams.clear();
Expand Down
Loading

0 comments on commit 9fca53d

Please sign in to comment.