Skip to content

Commit

Permalink
fix issue #7810 (#7854) (#7871)
Browse files Browse the repository at this point in the history
close #7810
  • Loading branch information
ti-chi-bot authored Aug 1, 2023
1 parent bae3aef commit 4df1054
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 32 deletions.
1 change: 0 additions & 1 deletion dbms/src/Flash/executeQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,6 @@ BlockIO executeDAG(IQuerySource & dag, Context & context, bool internal)
/// Hold element of process list till end of query execution.
res.process_list_entry = process_list_entry;

prepareForInputStream(context, QueryProcessingStage::Complete, res.in);
if (likely(!internal))
logQueryPipeline(logger, res.in);

Expand Down
52 changes: 26 additions & 26 deletions dbms/src/Interpreters/executeQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,32 @@ void onExceptionBeforeStart(const String & query, Context & context, time_t curr
}
}

void prepareForInputStream(
Context & context,
QueryProcessingStage::Enum stage,
const BlockInputStreamPtr & in)
{
assert(in);
if (auto * stream = dynamic_cast<IProfilingBlockInputStream *>(in.get()))
{
stream->setProgressCallback(context.getProgressCallback());
stream->setProcessListElement(context.getProcessListElement());

/// Limits on the result, the quota on the result, and also callback for progress.
/// Limits apply only to the final result.
if (stage == QueryProcessingStage::Complete)
{
IProfilingBlockInputStream::LocalLimits limits;
limits.mode = IProfilingBlockInputStream::LIMITS_CURRENT;
const auto & settings = context.getSettingsRef();
limits.size_limits = SizeLimits(settings.max_result_rows, settings.max_result_bytes, settings.result_overflow_mode);

stream->setLimits(limits);
stream->setQuota(context.getQuota());
}
}
}

std::tuple<ASTPtr, BlockIO> executeQueryImpl(
IQuerySource & query_src,
Context & context,
Expand Down Expand Up @@ -386,32 +412,6 @@ void logQuery(const String & query, const Context & context, const LoggerPtr & l
joinLines(query));
}

void prepareForInputStream(
Context & context,
QueryProcessingStage::Enum stage,
const BlockInputStreamPtr & in)
{
assert(in);
if (auto * stream = dynamic_cast<IProfilingBlockInputStream *>(in.get()))
{
stream->setProgressCallback(context.getProgressCallback());
stream->setProcessListElement(context.getProcessListElement());

/// Limits on the result, the quota on the result, and also callback for progress.
/// Limits apply only to the final result.
if (stage == QueryProcessingStage::Complete)
{
IProfilingBlockInputStream::LocalLimits limits;
limits.mode = IProfilingBlockInputStream::LIMITS_CURRENT;
const auto & settings = context.getSettingsRef();
limits.size_limits = SizeLimits(settings.max_result_rows, settings.max_result_bytes, settings.result_overflow_mode);

stream->setLimits(limits);
stream->setQuota(context.getQuota());
}
}
}

std::shared_ptr<ProcessListEntry> setProcessListElement(
Context & context,
const String & query,
Expand Down
5 changes: 0 additions & 5 deletions dbms/src/Interpreters/executeQuery.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,4 @@ void logQueryPipeline(const LoggerPtr & logger, const BlockInputStreamPtr & in);

void logQuery(const String & query, const Context & context, const LoggerPtr & logger);

void prepareForInputStream(
Context & context,
QueryProcessingStage::Enum stage,
const BlockInputStreamPtr & in);

} // namespace DB

0 comments on commit 4df1054

Please sign in to comment.