Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refine executeQuery: remove bool internal #6654

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions dbms/src/Flash/Coprocessor/DAGDriver.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022 PingCAP, Ltd.
// Copyright 2023 PingCAP, Ltd.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tidb, do only the newly created files need to be set to 2023?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, it applies to all files.
A PR is needed later to update the file headers of all files.

//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -92,7 +92,7 @@ try
auto start_time = Clock::now();
DAGContext & dag_context = *context.getDAGContext();

BlockIO streams = executeQuery(context, internal);
BlockIO streams = executeAsBlockIO(context);
if (!streams.in || streams.out)
// Only query is allowed, so streams.in must not be null and streams.out must be null
throw TiFlashException("DAG is not query.", Errors::Coprocessor::Internal);
Expand Down
25 changes: 10 additions & 15 deletions dbms/src/Flash/executeQuery.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022 PingCAP, Ltd.
// Copyright 2023 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -67,7 +67,7 @@ ProcessList::EntryPtr getProcessListEntry(Context & context, DAGContext & dag_co
}
}

BlockIO executeDAG(IQuerySource & dag, Context & context, bool internal)
BlockIO doExecuteAsBlockIO(IQuerySource & dag, Context & context)
{
RUNTIME_ASSERT(context.getDAGContext());
auto & dag_context = *context.getDAGContext();
Expand All @@ -76,12 +76,8 @@ BlockIO executeDAG(IQuerySource & dag, Context & context, bool internal)

prepareForExecute(context);

ProcessList::EntryPtr process_list_entry;
if (likely(!internal))
{
process_list_entry = getProcessListEntry(context, dag_context);
logQuery(dag.str(context.getSettingsRef().log_queries_cut_to_length), context, logger);
}
auto process_list_entry = getProcessListEntry(context, dag_context);
logQuery(dag.str(context.getSettingsRef().log_queries_cut_to_length), context, logger);

FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::random_interpreter_failpoint);
auto interpreter = dag.interpreter(context, QueryProcessingStage::Complete);
Expand All @@ -93,29 +89,28 @@ BlockIO executeDAG(IQuerySource & dag, Context & context, bool internal)
res.process_list_entry = process_list_entry;

prepareForInputStream(context, QueryProcessingStage::Complete, res.in);
if (likely(!internal))
logQueryPipeline(logger, res.in);
logQueryPipeline(logger, res.in);

return res;
}
} // namespace

BlockIO executeQuery(Context & context, bool internal)
BlockIO executeAsBlockIO(Context & context)
{
if (context.getSettingsRef().enable_planner)
{
PlanQuerySource plan(context);
return executeDAG(plan, context, internal);
return doExecuteAsBlockIO(plan, context);
}
else
{
DAGQuerySource dag(context);
return executeDAG(dag, context, internal);
return doExecuteAsBlockIO(dag, context);
}
}

QueryExecutorPtr queryExecute(Context & context, bool internal)
QueryExecutorPtr queryExecute(Context & context)
{
return std::make_unique<DataStreamExecutor>(executeQuery(context, internal));
return std::make_unique<DataStreamExecutor>(executeAsBlockIO(context));
}
} // namespace DB
12 changes: 7 additions & 5 deletions dbms/src/Flash/executeQuery.h
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022 PingCAP, Ltd.
// Copyright 2023 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -14,13 +14,15 @@

#pragma once

#include <Core/QueryProcessingStage.h>
#include <DataStreams/BlockIO.h>
#include <Flash/Executor/QueryExecutor.h>
#include <Interpreters/Context.h>

namespace DB
{
BlockIO executeQuery(Context & context, bool internal = false);
QueryExecutorPtr queryExecute(Context & context, bool internal = false);
class Context;

// Use BlockInputStream-based pull model directly.
BlockIO executeAsBlockIO(Context & context);

QueryExecutorPtr queryExecute(Context & context);
} // namespace DB
18 changes: 8 additions & 10 deletions dbms/src/Flash/tests/gtest_spill_aggregation.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022 PingCAP, Ltd.
// Copyright 2023 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -58,36 +58,34 @@ try
context.context.setSetting("max_block_size", Field(static_cast<UInt64>(max_block_size)));
/// disable spill
context.context.setSetting("max_bytes_before_external_group_by", Field(static_cast<UInt64>(0)));
auto ref_columns = executeStreams(request, original_max_streams, true);
auto ref_columns = executeStreams(request, original_max_streams);
/// enable spill
context.context.setSetting("max_bytes_before_external_group_by", Field(static_cast<UInt64>(total_data_size / 200)));
context.context.setSetting("group_by_two_level_threshold", Field(static_cast<UInt64>(1)));
context.context.setSetting("group_by_two_level_threshold_bytes", Field(static_cast<UInt64>(1)));
/// don't use `executeAndAssertColumnsEqual` since it takes too long to run
/// test single thread aggregation
/// need to enable memory tracker since currently, the memory usage in aggregator is
/// calculated by memory tracker, if memory tracker is not enabled, spill will never be triggered.
ASSERT_COLUMNS_EQ_UR(ref_columns, executeStreams(request, 1, true));
ASSERT_COLUMNS_EQ_UR(ref_columns, executeStreams(request, 1));
/// test parallel aggregation
ASSERT_COLUMNS_EQ_UR(ref_columns, executeStreams(request, original_max_streams, true));
ASSERT_COLUMNS_EQ_UR(ref_columns, executeStreams(request, original_max_streams));
/// enable spill and use small max_spilled_size_per_spill
context.context.setSetting("max_spilled_size_per_spill", Field(static_cast<UInt64>(total_data_size / 200)));
/// test single thread aggregation
ASSERT_COLUMNS_EQ_UR(ref_columns, executeStreams(request, 1, true));
ASSERT_COLUMNS_EQ_UR(ref_columns, executeStreams(request, 1));
/// test parallel aggregation
ASSERT_COLUMNS_EQ_UR(ref_columns, executeStreams(request, original_max_streams, true));
ASSERT_COLUMNS_EQ_UR(ref_columns, executeStreams(request, original_max_streams));
/// test spill with small max_block_size
/// the avg rows in one bucket is ~10240/256 = 400, so set the small_max_block_size to 300
/// is enough to test the output split
size_t small_max_block_size = 300;
context.context.setSetting("max_block_size", Field(static_cast<UInt64>(small_max_block_size)));
auto blocks = getExecuteStreamsReturnBlocks(request, 1, true);
auto blocks = getExecuteStreamsReturnBlocks(request, 1);
for (auto & block : blocks)
{
ASSERT_EQ(block.rows() <= small_max_block_size, true);
}
ASSERT_COLUMNS_EQ_UR(ref_columns, mergeBlocks(std::move(blocks)).getColumnsWithTypeAndName());
blocks = getExecuteStreamsReturnBlocks(request, original_max_streams, true);
blocks = getExecuteStreamsReturnBlocks(request, original_max_streams);
for (auto & block : blocks)
{
ASSERT_EQ(block.rows() <= small_max_block_size, true);
Expand Down
23 changes: 11 additions & 12 deletions dbms/src/TestUtils/ExecutorTestUtils.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022 PingCAP, Ltd.
// Copyright 2023 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -100,7 +100,7 @@ void ExecutorTest::executeInterpreter(const String & expected_string, const std:
TiFlashTestEnv::setUpTestContext(context.context, &dag_context, context.mockStorage(), TestType::INTERPRETER_TEST);

// Don't care regions information in interpreter tests.
auto query_executor = queryExecute(context.context, /*internal=*/true);
auto query_executor = queryExecute(context.context);
ASSERT_EQ(Poco::trim(expected_string), Poco::trim(query_executor->dump()));
}

Expand All @@ -109,7 +109,7 @@ void ExecutorTest::executeInterpreterWithDeltaMerge(const String & expected_stri
DAGContext dag_context(*request, "interpreter_test_with_delta_merge", concurrency);
TiFlashTestEnv::setUpTestContext(context.context, &dag_context, context.mockStorage(), TestType::EXECUTOR_TEST);
// Don't care regions information in interpreter tests.
auto query_executor = queryExecute(context.context, /*internal=*/true);
auto query_executor = queryExecute(context.context);
ASSERT_EQ(Poco::trim(expected_string), Poco::trim(query_executor->dump()));
}

Expand Down Expand Up @@ -207,31 +207,30 @@ void ExecutorTest::enablePlanner(bool is_enable)

DB::ColumnsWithTypeAndName ExecutorTest::executeStreams(
const std::shared_ptr<tipb::DAGRequest> & request,
size_t concurrency,
bool enable_memory_tracker)
size_t concurrency)
{
DAGContext dag_context(*request, "executor_test", concurrency);
return executeStreams(&dag_context, enable_memory_tracker);
return executeStreams(&dag_context);
}

ColumnsWithTypeAndName ExecutorTest::executeStreams(DAGContext * dag_context, bool enable_memory_tracker)
ColumnsWithTypeAndName ExecutorTest::executeStreams(DAGContext * dag_context)
{
TiFlashTestEnv::setUpTestContext(context.context, dag_context, context.mockStorage(), TestType::EXECUTOR_TEST);
// Currently, don't care about regions information in tests.
Blocks blocks;
queryExecute(context.context, /*internal=*/!enable_memory_tracker)->execute([&blocks](const Block & block) { blocks.push_back(block); }).verify();
queryExecute(context.context)->execute([&blocks](const Block & block) { blocks.push_back(block); }).verify();
return mergeBlocks(std::move(blocks)).getColumnsWithTypeAndName();
}

Blocks ExecutorTest::getExecuteStreamsReturnBlocks(const std::shared_ptr<tipb::DAGRequest> & request,
size_t concurrency,
bool enable_memory_tracker)
Blocks ExecutorTest::getExecuteStreamsReturnBlocks(
const std::shared_ptr<tipb::DAGRequest> & request,
size_t concurrency)
{
DAGContext dag_context(*request, "executor_test", concurrency);
TiFlashTestEnv::setUpTestContext(context.context, &dag_context, context.mockStorage(), TestType::EXECUTOR_TEST);
// Currently, don't care about regions information in tests.
Blocks blocks;
queryExecute(context.context, /*internal=*/!enable_memory_tracker)->execute([&blocks](const Block & block) { blocks.push_back(block); }).verify();
queryExecute(context.context)->execute([&blocks](const Block & block) { blocks.push_back(block); }).verify();
return blocks;
}

Expand Down
10 changes: 4 additions & 6 deletions dbms/src/TestUtils/ExecutorTestUtils.h
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022 PingCAP, Ltd.
// Copyright 2023 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -94,17 +94,15 @@ class ExecutorTest : public ::testing::Test
}
}

ColumnsWithTypeAndName executeStreams(DAGContext * dag_context, bool enalbe_memory_tracker = false);
ColumnsWithTypeAndName executeStreams(DAGContext * dag_context);

ColumnsWithTypeAndName executeStreams(
const std::shared_ptr<tipb::DAGRequest> & request,
size_t concurrency = 1,
bool enable_memory_tracker = false);
size_t concurrency = 1);

Blocks getExecuteStreamsReturnBlocks(
const std::shared_ptr<tipb::DAGRequest> & request,
size_t concurrency = 1,
bool enable_memory_tracker = false);
size_t concurrency = 1);

private:
void executeExecutor(
Expand Down