Skip to content

Commit

Permalink
.*: refine executeQuery (#6043)
Browse files Browse the repository at this point in the history
ref #5900
  • Loading branch information
SeaRise authored Sep 29, 2022
1 parent 1844f6d commit 84a71f0
Show file tree
Hide file tree
Showing 9 changed files with 174 additions and 115 deletions.
2 changes: 1 addition & 1 deletion dbms/src/Flash/Coprocessor/DAGDriver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ try
auto start_time = Clock::now();
DAGContext & dag_context = *context.getDAGContext();

BlockIO streams = executeQuery(context, internal, QueryProcessingStage::Complete);
BlockIO streams = executeQuery(context, internal);
if (!streams.in || streams.out)
// Only query is allowed, so streams.in must not be null and streams.out must be null
throw TiFlashException("DAG is not query.", Errors::Coprocessor::Internal);
Expand Down
10 changes: 2 additions & 8 deletions dbms/src/Flash/Mpp/MPPTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include <Flash/Statistics/traverseExecutors.h>
#include <Flash/executeQuery.h>
#include <Interpreters/ProcessList.h>
#include <Interpreters/executeQuery.h>
#include <Storages/Transaction/KVStore.h>
#include <Storages/Transaction/TMTContext.h>
#include <fmt/core.h>
Expand Down Expand Up @@ -273,14 +274,7 @@ void MPPTask::prepare(const mpp::DispatchTaskRequest & task_request)
dag_context->tidb_host = context->getClientInfo().current_address.toString();

context->setDAGContext(dag_context.get());

process_list_entry = context->getProcessList().insert(
dag_context->dummy_query_string,
dag_context->dummy_ast.get(),
context->getClientInfo(),
context->getSettingsRef());

context->setProcessListElement(&process_list_entry->get());
process_list_entry = setProcessListElement(*context, dag_context->dummy_query_string, dag_context->dummy_ast.get());
dag_context->setProcessListEntry(process_list_entry);

if (dag_context->isRootMPPTask())
Expand Down
88 changes: 82 additions & 6 deletions dbms/src/Flash/executeQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,27 +12,103 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Common/FailPoint.h>
#include <Common/ProfileEvents.h>
#include <Flash/Coprocessor/DAGContext.h>
#include <Flash/Coprocessor/DAGQuerySource.h>
#include <Flash/Planner/PlanQuerySource.h>
#include <Flash/executeQuery.h>
#include <Interpreters/Context.h>
#include <Interpreters/ProcessList.h>
#include <Interpreters/Quota.h>
#include <Interpreters/executeQuery.h>

namespace ProfileEvents
{
extern const Event Query;
}

namespace DB
{
BlockIO executeQuery(
Context & context,
bool internal,
QueryProcessingStage::Enum stage)
namespace FailPoints
{
extern const char random_interpreter_failpoint[];
} // namespace FailPoints
namespace
{
void prepareForExecute(Context & context)
{
ProfileEvents::increment(ProfileEvents::Query);
context.setQueryContext(context);

QuotaForIntervals & quota = context.getQuota();
quota.addQuery(); /// NOTE Seems that when new time interval has come, first query is not accounted in number of queries.
quota.checkExceeded(time(nullptr));
}

ProcessList::EntryPtr getProcessListEntry(Context & context, const DAGContext & dag_context)
{
if (dag_context.is_mpp_task)
{
/// for MPPTask, process list entry is created in MPPTask::prepare()
RUNTIME_ASSERT(dag_context.getProcessListEntry() != nullptr, "process list entry for MPP task must not be nullptr");
return dag_context.getProcessListEntry();
}
else
{
RUNTIME_ASSERT(dag_context.getProcessListEntry() == nullptr, "process list entry for non-MPP must be nullptr");
return setProcessListElement(
context,
dag_context.dummy_query_string,
dag_context.dummy_ast.get());
}
}

BlockIO executeDAG(IQuerySource & dag, Context & context, bool internal)
{
RUNTIME_ASSERT(context.getDAGContext());
auto & dag_context = *context.getDAGContext();
const auto & logger = dag_context.log;
RUNTIME_ASSERT(logger);

prepareForExecute(context);

ProcessList::EntryPtr process_list_entry;
if (likely(!internal))
{
process_list_entry = getProcessListEntry(context, dag_context);
logQuery(dag.str(context.getSettingsRef().log_queries_cut_to_length), context, logger);
}

FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::random_interpreter_failpoint);
auto interpreter = dag.interpreter(context, QueryProcessingStage::Complete);
BlockIO res = interpreter->execute();
if (likely(process_list_entry))
(*process_list_entry)->setQueryStreams(res);

/// Hold element of process list till end of query execution.
res.process_list_entry = process_list_entry;

prepareForInputStream(context, QueryProcessingStage::Complete, res.in);
if (likely(!internal))
logQueryPipeline(logger, res.in);

dag_context.attachBlockIO(res);
return res;
}
} // namespace

BlockIO executeQuery(Context & context, bool internal)
{
if (context.getSettingsRef().enable_planner)
{
PlanQuerySource plan(context);
return executeQuery(plan, context, internal, stage);
return executeDAG(plan, context, internal);
}
else
{
DAGQuerySource dag(context);
return executeQuery(dag, context, internal, stage);
return executeDAG(dag, context, internal);
}
}
} // namespace DB
7 changes: 2 additions & 5 deletions dbms/src/Flash/executeQuery.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,5 @@

namespace DB
{
BlockIO executeQuery(
Context & context,
bool internal = false,
QueryProcessingStage::Enum stage = QueryProcessingStage::Complete);
}
BlockIO executeQuery(Context & context, bool internal = false);
} // namespace DB
5 changes: 0 additions & 5 deletions dbms/src/Interpreters/Context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1223,11 +1223,6 @@ DAGContext * Context::getDAGContext() const
return dag_context;
}

bool Context::isMPPTask() const
{
return dag_context != nullptr && dag_context->is_mpp_task;
}

void Context::setUncompressedCache(size_t max_size_in_bytes)
{
auto lock = getLock();
Expand Down
1 change: 0 additions & 1 deletion dbms/src/Interpreters/Context.h
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,6 @@ class Context

void setDAGContext(DAGContext * dag_context);
DAGContext * getDAGContext() const;
bool isMPPTask() const;

/// List all queries.
ProcessList & getProcessList();
Expand Down
159 changes: 73 additions & 86 deletions dbms/src/Interpreters/executeQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@
#include <DataStreams/InputStreamFromASTInsertQuery.h>
#include <DataStreams/copyData.h>
#include <Flash/Coprocessor/DAGContext.h>
#include <Flash/Mpp/MPPReceiverSet.h>
#include <Flash/Mpp/MPPTunnelSet.h>
#include <IO/ConcatReadBuffer.h>
#include <IO/WriteBufferFromFile.h>
#include <Interpreters/IQuerySource.h>
Expand Down Expand Up @@ -87,23 +85,6 @@ LoggerPtr getLogger(const Context & context)
: Logger::get("executeQuery");
}

/// Log query into text log (not into system table).
void logQuery(const String & query, const Context & context, const LoggerPtr & logger)
{
const auto & current_query_id = context.getClientInfo().current_query_id;
const auto & initial_query_id = context.getClientInfo().initial_query_id;
const auto & current_user = context.getClientInfo().current_user;

LOG_FMT_DEBUG(
logger,
"(from {}{}, query_id: {}{}) {}",
context.getClientInfo().current_address.toString(),
(current_user != "default" ? ", user: " + current_user : ""),
current_query_id,
(!initial_query_id.empty() && current_query_id != initial_query_id ? ", initial_query_id: " + initial_query_id : ""),
joinLines(query));
}


/// Call this inside catch block.
void setExceptionStackTrace(QueryLogElement & elem)
Expand Down Expand Up @@ -222,41 +203,9 @@ std::tuple<ASTPtr, BlockIO> executeQueryImpl(

/// Put query to process list. But don't put SHOW PROCESSLIST query itself.
ProcessList::EntryPtr process_list_entry;
if (!internal && !context.isMPPTask() && nullptr == typeid_cast<const ASTShowProcesslistQuery *>(&*ast))
if (!internal && nullptr == typeid_cast<const ASTShowProcesslistQuery *>(&*ast))
{
process_list_entry = context.getProcessList().insert(
query,
ast.get(),
context.getClientInfo(),
settings);

context.setProcessListElement(&process_list_entry->get());
}
if (context.isMPPTask())
{
if (!context.isTest())
{
/// for MPPTask, process list entry is created in MPPTask::prepare()
RUNTIME_ASSERT(context.getDAGContext()->getProcessListEntry() != nullptr, "process list entry for MPP task must not be nullptr");
process_list_entry = context.getDAGContext()->getProcessListEntry();
}
else
{
/// it is possible that in test mode, the process list entry is nullptr because some tests run mpp query
/// just based on dag request, there is even no MPPTask at all.
if (context.getDAGContext()->getProcessListEntry() == nullptr)
{
process_list_entry = context.getProcessList().insert(
query,
ast.get(),
context.getClientInfo(),
settings);

context.setProcessListElement(&process_list_entry->get());
}
else
process_list_entry = context.getDAGContext()->getProcessListEntry();
}
process_list_entry = setProcessListElement(context, query, ast.get());
}

FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::random_interpreter_failpoint);
Expand All @@ -272,23 +221,7 @@ std::tuple<ASTPtr, BlockIO> executeQueryImpl(

if (res.in)
{
if (auto * stream = dynamic_cast<IProfilingBlockInputStream *>(res.in.get()))
{
stream->setProgressCallback(context.getProgressCallback());
stream->setProcessListElement(context.getProcessListElement());

/// Limits on the result, the quota on the result, and also callback for progress.
/// Limits apply only to the final result.
if (stage == QueryProcessingStage::Complete)
{
IProfilingBlockInputStream::LocalLimits limits;
limits.mode = IProfilingBlockInputStream::LIMITS_CURRENT;
limits.size_limits = SizeLimits(settings.max_result_rows, settings.max_result_bytes, settings.result_overflow_mode);

stream->setLimits(limits);
stream->setQuota(quota);
}
}
prepareForInputStream(context, stage, res.in);
}

if (res.out)
Expand Down Expand Up @@ -420,13 +353,7 @@ std::tuple<ASTPtr, BlockIO> executeQueryImpl(

if (!internal && res.in)
{
auto pipeline_log_str = [&res]() {
FmtBuffer log_buffer;
log_buffer.append("Query pipeline:\n");
res.in->dumpTree(log_buffer);
return log_buffer.toString();
};
LOG_DEBUG(execute_query_logger, pipeline_log_str());
logQueryPipeline(execute_query_logger, res.in);
}
}
}
Expand All @@ -442,6 +369,75 @@ std::tuple<ASTPtr, BlockIO> executeQueryImpl(
}
} // namespace

/// Log query into text log (not into system table).
void logQuery(const String & query, const Context & context, const LoggerPtr & logger)
{
const auto & current_query_id = context.getClientInfo().current_query_id;
const auto & initial_query_id = context.getClientInfo().initial_query_id;
const auto & current_user = context.getClientInfo().current_user;

LOG_FMT_DEBUG(
logger,
"(from {}{}, query_id: {}{}) {}",
context.getClientInfo().current_address.toString(),
(current_user != "default" ? ", user: " + current_user : ""),
current_query_id,
(!initial_query_id.empty() && current_query_id != initial_query_id ? ", initial_query_id: " + initial_query_id : ""),
joinLines(query));
}

void prepareForInputStream(
Context & context,
QueryProcessingStage::Enum stage,
const BlockInputStreamPtr & in)
{
assert(in);
if (auto * stream = dynamic_cast<IProfilingBlockInputStream *>(in.get()))
{
stream->setProgressCallback(context.getProgressCallback());
stream->setProcessListElement(context.getProcessListElement());

/// Limits on the result, the quota on the result, and also callback for progress.
/// Limits apply only to the final result.
if (stage == QueryProcessingStage::Complete)
{
IProfilingBlockInputStream::LocalLimits limits;
limits.mode = IProfilingBlockInputStream::LIMITS_CURRENT;
const auto & settings = context.getSettingsRef();
limits.size_limits = SizeLimits(settings.max_result_rows, settings.max_result_bytes, settings.result_overflow_mode);

stream->setLimits(limits);
stream->setQuota(context.getQuota());
}
}
}

std::shared_ptr<ProcessListEntry> setProcessListElement(
Context & context,
const String & query,
const IAST * ast)
{
assert(ast);
auto process_list_entry = context.getProcessList().insert(
query,
ast,
context.getClientInfo(),
context.getSettingsRef());
context.setProcessListElement(&process_list_entry->get());
return process_list_entry;
}

void logQueryPipeline(const LoggerPtr & logger, const BlockInputStreamPtr & in)
{
assert(in);
auto pipeline_log_str = [&in]() {
FmtBuffer log_buffer;
log_buffer.append("Query pipeline:\n");
in->dumpTree(log_buffer);
return log_buffer.toString();
};
LOG_DEBUG(logger, pipeline_log_str());
}

BlockIO executeQuery(
const String & query,
Expand All @@ -456,15 +452,6 @@ BlockIO executeQuery(
}


BlockIO executeQuery(IQuerySource & dag, Context & context, bool internal, QueryProcessingStage::Enum stage)
{
BlockIO streams;
std::tie(std::ignore, streams) = executeQueryImpl(dag, context, internal, stage);
context.getDAGContext()->attachBlockIO(streams);
return streams;
}


void executeQuery(
ReadBuffer & istr,
WriteBuffer & ostr,
Expand Down
Loading

0 comments on commit 84a71f0

Please sign in to comment.