Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

.*: refine executeQuery #6043

Merged
merged 5 commits into from
Sep 29, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion dbms/src/Flash/Coprocessor/DAGDriver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ try
auto start_time = Clock::now();
DAGContext & dag_context = *context.getDAGContext();

BlockIO streams = executeQuery(context, internal, QueryProcessingStage::Complete);
BlockIO streams = executeQuery(context, internal);
if (!streams.in || streams.out)
// Only query is allowed, so streams.in must not be null and streams.out must be null
throw TiFlashException("DAG is not query.", Errors::Coprocessor::Internal);
Expand Down
10 changes: 2 additions & 8 deletions dbms/src/Flash/Mpp/MPPTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include <Flash/Statistics/traverseExecutors.h>
#include <Flash/executeQuery.h>
#include <Interpreters/ProcessList.h>
#include <Interpreters/executeQuery.h>
#include <Storages/Transaction/KVStore.h>
#include <Storages/Transaction/TMTContext.h>
#include <fmt/core.h>
Expand Down Expand Up @@ -273,14 +274,7 @@ void MPPTask::prepare(const mpp::DispatchTaskRequest & task_request)
dag_context->tidb_host = context->getClientInfo().current_address.toString();

context->setDAGContext(dag_context.get());

process_list_entry = context->getProcessList().insert(
dag_context->dummy_query_string,
dag_context->dummy_ast.get(),
context->getClientInfo(),
context->getSettingsRef());

context->setProcessListElement(&process_list_entry->get());
process_list_entry = setProcessListElement(*context, dag_context->dummy_query_string, dag_context->dummy_ast.get());
dag_context->setProcessListEntry(process_list_entry);

if (dag_context->isRootMPPTask())
Expand Down
88 changes: 82 additions & 6 deletions dbms/src/Flash/executeQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,27 +12,103 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Common/FailPoint.h>
#include <Common/ProfileEvents.h>
#include <Flash/Coprocessor/DAGContext.h>
#include <Flash/Coprocessor/DAGQuerySource.h>
#include <Flash/Planner/PlanQuerySource.h>
#include <Flash/executeQuery.h>
#include <Interpreters/Context.h>
#include <Interpreters/ProcessList.h>
#include <Interpreters/Quota.h>
#include <Interpreters/executeQuery.h>

namespace ProfileEvents
{
extern const Event Query;
}

namespace DB
{
BlockIO executeQuery(
Context & context,
bool internal,
QueryProcessingStage::Enum stage)
namespace FailPoints
{
extern const char random_interpreter_failpoint[];
} // namespace FailPoints
namespace
{
void prepareForExecute(Context & context)
{
ProfileEvents::increment(ProfileEvents::Query);
context.setQueryContext(context);

QuotaForIntervals & quota = context.getQuota();
quota.addQuery(); /// NOTE Seems that when new time interval has come, first query is not accounted in number of queries.
quota.checkExceeded(time(nullptr));
}

ProcessList::EntryPtr getProcessListEntry(Context & context, const DAGContext & dag_context)
{
if (dag_context.is_mpp_task)
{
/// for MPPTask, process list entry is created in MPPTask::prepare()
RUNTIME_ASSERT(dag_context.getProcessListEntry() != nullptr, "process list entry for MPP task must not be nullptr");
return dag_context.getProcessListEntry();
}
else
{
RUNTIME_ASSERT(dag_context.getProcessListEntry() == nullptr, "process list entry for non-MPP must be nullptr");
return setProcessListElement(
context,
dag_context.dummy_query_string,
dag_context.dummy_ast.get());
}
}

BlockIO executeDAG(IQuerySource & dag, Context & context, bool internal)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can remove the IQuerySource related code since we don't reuse executeQuery?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, but here is some relevant code that needs to be cleaned up.
We can remove the IQuerySource in the next pr.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In particular, the code related to the DAGQuerySource, perhaps we could remove the IQuerySource along with the DAGQueryBlock.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK

{
RUNTIME_ASSERT(context.getDAGContext());
auto & dag_context = *context.getDAGContext();
const auto & logger = dag_context.log;
RUNTIME_ASSERT(logger);

prepareForExecute(context);

ProcessList::EntryPtr process_list_entry;
if (likely(!internal))
{
process_list_entry = getProcessListEntry(context, dag_context);
logQuery(dag.str(context.getSettingsRef().log_queries_cut_to_length), context, logger);
}

FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::random_interpreter_failpoint);
auto interpreter = dag.interpreter(context, QueryProcessingStage::Complete);
BlockIO res = interpreter->execute();
if (likely(process_list_entry))
(*process_list_entry)->setQueryStreams(res);

/// Hold element of process list till end of query execution.
res.process_list_entry = process_list_entry;

prepareForInputStream(context, QueryProcessingStage::Complete, res.in);
if (likely(!internal))
logQueryPipeline(logger, res.in);

dag_context.attachBlockIO(res);
return res;
}
} // namespace

BlockIO executeQuery(Context & context, bool internal)
{
if (context.getSettingsRef().enable_planner)
{
PlanQuerySource plan(context);
return executeQuery(plan, context, internal, stage);
return executeDAG(plan, context, internal);
}
else
{
DAGQuerySource dag(context);
return executeQuery(dag, context, internal, stage);
return executeDAG(dag, context, internal);
}
}
} // namespace DB
7 changes: 2 additions & 5 deletions dbms/src/Flash/executeQuery.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,5 @@

namespace DB
{
BlockIO executeQuery(
Context & context,
bool internal = false,
QueryProcessingStage::Enum stage = QueryProcessingStage::Complete);
}
BlockIO executeQuery(Context & context, bool internal = false);
} // namespace DB
5 changes: 0 additions & 5 deletions dbms/src/Interpreters/Context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1223,11 +1223,6 @@ DAGContext * Context::getDAGContext() const
return dag_context;
}

bool Context::isMPPTask() const
{
return dag_context != nullptr && dag_context->is_mpp_task;
}

void Context::setUncompressedCache(size_t max_size_in_bytes)
{
auto lock = getLock();
Expand Down
1 change: 0 additions & 1 deletion dbms/src/Interpreters/Context.h
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,6 @@ class Context

void setDAGContext(DAGContext * dag_context);
DAGContext * getDAGContext() const;
bool isMPPTask() const;

/// List all queries.
ProcessList & getProcessList();
Expand Down
159 changes: 73 additions & 86 deletions dbms/src/Interpreters/executeQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@
#include <DataStreams/InputStreamFromASTInsertQuery.h>
#include <DataStreams/copyData.h>
#include <Flash/Coprocessor/DAGContext.h>
#include <Flash/Mpp/MPPReceiverSet.h>
#include <Flash/Mpp/MPPTunnelSet.h>
#include <IO/ConcatReadBuffer.h>
#include <IO/WriteBufferFromFile.h>
#include <Interpreters/IQuerySource.h>
Expand Down Expand Up @@ -87,23 +85,6 @@ LoggerPtr getLogger(const Context & context)
: Logger::get("executeQuery");
}

/// Log query into text log (not into system table).
void logQuery(const String & query, const Context & context, const LoggerPtr & logger)
{
const auto & current_query_id = context.getClientInfo().current_query_id;
const auto & initial_query_id = context.getClientInfo().initial_query_id;
const auto & current_user = context.getClientInfo().current_user;

LOG_FMT_DEBUG(
logger,
"(from {}{}, query_id: {}{}) {}",
context.getClientInfo().current_address.toString(),
(current_user != "default" ? ", user: " + current_user : ""),
current_query_id,
(!initial_query_id.empty() && current_query_id != initial_query_id ? ", initial_query_id: " + initial_query_id : ""),
joinLines(query));
}


/// Call this inside catch block.
void setExceptionStackTrace(QueryLogElement & elem)
Expand Down Expand Up @@ -222,41 +203,9 @@ std::tuple<ASTPtr, BlockIO> executeQueryImpl(

/// Put query to process list. But don't put SHOW PROCESSLIST query itself.
ProcessList::EntryPtr process_list_entry;
if (!internal && !context.isMPPTask() && nullptr == typeid_cast<const ASTShowProcesslistQuery *>(&*ast))
if (!internal && nullptr == typeid_cast<const ASTShowProcesslistQuery *>(&*ast))
{
process_list_entry = context.getProcessList().insert(
query,
ast.get(),
context.getClientInfo(),
settings);

context.setProcessListElement(&process_list_entry->get());
}
if (context.isMPPTask())
{
if (!context.isTest())
{
/// for MPPTask, process list entry is created in MPPTask::prepare()
RUNTIME_ASSERT(context.getDAGContext()->getProcessListEntry() != nullptr, "process list entry for MPP task must not be nullptr");
process_list_entry = context.getDAGContext()->getProcessListEntry();
}
else
{
/// it is possible that in test mode, the process list entry is nullptr because some tests run mpp query
/// just based on dag request, there is even no MPPTask at all.
if (context.getDAGContext()->getProcessListEntry() == nullptr)
{
process_list_entry = context.getProcessList().insert(
query,
ast.get(),
context.getClientInfo(),
settings);

context.setProcessListElement(&process_list_entry->get());
}
else
process_list_entry = context.getDAGContext()->getProcessListEntry();
}
process_list_entry = setProcessListElement(context, query, ast.get());
}

FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::random_interpreter_failpoint);
Expand All @@ -272,23 +221,7 @@ std::tuple<ASTPtr, BlockIO> executeQueryImpl(

if (res.in)
{
if (auto * stream = dynamic_cast<IProfilingBlockInputStream *>(res.in.get()))
{
stream->setProgressCallback(context.getProgressCallback());
stream->setProcessListElement(context.getProcessListElement());

/// Limits on the result, the quota on the result, and also callback for progress.
/// Limits apply only to the final result.
if (stage == QueryProcessingStage::Complete)
{
IProfilingBlockInputStream::LocalLimits limits;
limits.mode = IProfilingBlockInputStream::LIMITS_CURRENT;
limits.size_limits = SizeLimits(settings.max_result_rows, settings.max_result_bytes, settings.result_overflow_mode);

stream->setLimits(limits);
stream->setQuota(quota);
}
}
prepareForInputStream(context, stage, res.in);
}

if (res.out)
Expand Down Expand Up @@ -420,13 +353,7 @@ std::tuple<ASTPtr, BlockIO> executeQueryImpl(

if (!internal && res.in)
{
auto pipeline_log_str = [&res]() {
FmtBuffer log_buffer;
log_buffer.append("Query pipeline:\n");
res.in->dumpTree(log_buffer);
return log_buffer.toString();
};
LOG_DEBUG(execute_query_logger, pipeline_log_str());
logQueryPipeline(execute_query_logger, res.in);
}
}
}
Expand All @@ -442,6 +369,75 @@ std::tuple<ASTPtr, BlockIO> executeQueryImpl(
}
} // namespace

/// Log query into text log (not into system table).
void logQuery(const String & query, const Context & context, const LoggerPtr & logger)
{
const auto & current_query_id = context.getClientInfo().current_query_id;
const auto & initial_query_id = context.getClientInfo().initial_query_id;
const auto & current_user = context.getClientInfo().current_user;

LOG_FMT_DEBUG(
logger,
"(from {}{}, query_id: {}{}) {}",
context.getClientInfo().current_address.toString(),
(current_user != "default" ? ", user: " + current_user : ""),
current_query_id,
(!initial_query_id.empty() && current_query_id != initial_query_id ? ", initial_query_id: " + initial_query_id : ""),
joinLines(query));
}

void prepareForInputStream(
Context & context,
QueryProcessingStage::Enum stage,
const BlockInputStreamPtr & in)
{
assert(in);
if (auto * stream = dynamic_cast<IProfilingBlockInputStream *>(in.get()))
{
stream->setProgressCallback(context.getProgressCallback());
stream->setProcessListElement(context.getProcessListElement());

/// Limits on the result, the quota on the result, and also callback for progress.
/// Limits apply only to the final result.
if (stage == QueryProcessingStage::Complete)
{
IProfilingBlockInputStream::LocalLimits limits;
limits.mode = IProfilingBlockInputStream::LIMITS_CURRENT;
const auto & settings = context.getSettingsRef();
limits.size_limits = SizeLimits(settings.max_result_rows, settings.max_result_bytes, settings.result_overflow_mode);

stream->setLimits(limits);
stream->setQuota(context.getQuota());
}
}
}

std::shared_ptr<ProcessListEntry> setProcessListElement(
Context & context,
const String & query,
const IAST * ast)
{
assert(ast);
auto process_list_entry = context.getProcessList().insert(
query,
ast,
context.getClientInfo(),
context.getSettingsRef());
context.setProcessListElement(&process_list_entry->get());
return process_list_entry;
}

void logQueryPipeline(const LoggerPtr & logger, const BlockInputStreamPtr & in)
{
assert(in);
auto pipeline_log_str = [&in]() {
FmtBuffer log_buffer;
log_buffer.append("Query pipeline:\n");
in->dumpTree(log_buffer);
return log_buffer.toString();
};
LOG_DEBUG(logger, pipeline_log_str());
}

BlockIO executeQuery(
const String & query,
Expand All @@ -456,15 +452,6 @@ BlockIO executeQuery(
}


BlockIO executeQuery(IQuerySource & dag, Context & context, bool internal, QueryProcessingStage::Enum stage)
{
BlockIO streams;
std::tie(std::ignore, streams) = executeQueryImpl(dag, context, internal, stage);
context.getDAGContext()->attachBlockIO(streams);
return streams;
}


void executeQuery(
ReadBuffer & istr,
WriteBuffer & ostr,
Expand Down
Loading