Merge branch 'master' into refine_join_probe
SeaRise authored Mar 24, 2023
2 parents 1e4f678 + 828da8a commit cda798d
Showing 63 changed files with 1,580 additions and 789 deletions.
6 changes: 5 additions & 1 deletion dbms/src/Common/TiFlashMetrics.h
@@ -321,7 +321,11 @@ namespace DB
F(type_complete_multi_part_upload, {{"type", "complete_multi_part_upload"}}, ExpBuckets{0.001, 2, 20}), \
F(type_list_objects, {{"type", "list_objects"}}, ExpBuckets{0.001, 2, 20}), \
F(type_delete_object, {{"type", "delete_object"}}, ExpBuckets{0.001, 2, 20}), \
F(type_head_object, {{"type", "head_object"}}, ExpBuckets{0.001, 2, 20}))
F(type_head_object, {{"type", "head_object"}}, ExpBuckets{0.001, 2, 20})) \
M(tiflash_storage_checkpoint_seconds, "PageStorage checkpoint elapsed time", Histogram, \
F(type_dump_checkpoint_snapshot, {{"type", "dump_checkpoint_snapshot"}}, ExpBuckets{0.001, 2, 20}), \
F(type_dump_checkpoint_data, {{"type", "dump_checkpoint_data"}}, ExpBuckets{0.001, 2, 20}), \
F(type_upload_checkpoint, {{"type", "upload_checkpoint"}}, ExpBuckets{0.001, 2, 20}))

// clang-format on

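The new tiflash_storage_checkpoint_seconds histogram uses the same ExpBuckets{0.001, 2, 20} layout as the S3 request metrics above, with one labelled series per checkpoint phase. A minimal sketch of how one of those series might be recorded at a call site, assuming the usual GET_METRIC/Observe pattern and a hypothetical dumpCheckpointData() step (neither call site appears in this commit):

    Stopwatch watch;
    dumpCheckpointData(); // hypothetical checkpoint phase being timed
    GET_METRIC(tiflash_storage_checkpoint_seconds, type_dump_checkpoint_data)
        .Observe(watch.elapsedSeconds());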
1 change: 1 addition & 0 deletions dbms/src/DataStreams/AggregatingBlockInputStream.cpp
@@ -67,6 +67,7 @@ Block AggregatingBlockInputStream::readImpl()
aggregator.spill(*data_variants);
}
aggregator.finishSpill();
LOG_INFO(log, "Begin restore data from disk for aggregation.");
BlockInputStreams input_streams = aggregator.restoreSpilledData();
impl = std::make_unique<MergingAggregatedMemoryEfficientBlockInputStream>(input_streams, params, final, 1, 1, log->identifier());
}
2 changes: 1 addition & 1 deletion dbms/src/DataStreams/MergeSortingBlockInputStream.cpp
@@ -150,7 +150,7 @@ Block MergeSortingBlockInputStream::readImpl()
{
/// If spill happens

LOG_INFO(log, "Begin external merge sort.");
LOG_INFO(log, "Begin restore data from disk for merge sort.");

/// Create sorted streams to merge.
spiller->finishSpill();
25 changes: 6 additions & 19 deletions dbms/src/DataStreams/NonJoinedBlockInputStream.cpp
@@ -132,22 +132,9 @@ Block NonJoinedBlockInputStream::readImpl()
/// just return empty block for extra non joined block input stream read
if (unlikely(index >= parent.getBuildConcurrency()))
return Block();
if (!parent.isEnableSpill())
{
if (parent.blocks.empty())
return Block();
}
else
{
if (std::all_of(
std::begin(parent.partitions),
std::end(parent.partitions),
[](const std::unique_ptr<Join::JoinPartition> & partition) { return partition->build_partition.blocks.empty(); }))
{
return Block();
}
}

if (!parent.has_build_data_in_memory)
/// no build data in memory, the non joined result must be empty
return Block();

/// todo read data based on JoinPartition
if (add_not_mapped_rows)
@@ -258,7 +245,7 @@ size_t NonJoinedBlockInputStream::fillColumns(const Map & map,
{
current_segment = index;

while (parent.partitions[current_segment]->spill)
while (parent.partitions[current_segment]->isSpill())
{
current_segment += step;
if (current_segment >= map.getSegmentSize())
@@ -282,10 +269,10 @@ size_t NonJoinedBlockInputStream::fillColumns(const Map & map,

for (; *it != end || current_segment + step < map.getSegmentSize();)
{
if (*it == end || parent.partitions[current_segment]->spill)
if (*it == end || parent.partitions[current_segment]->isSpill())
{
current_segment += step;
while (parent.partitions[current_segment]->spill)
while (parent.partitions[current_segment]->isSpill())
{
current_segment += step;
if (current_segment >= map.getSegmentSize())
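The probe-side loops above now consult parent.partitions[...]->isSpill() instead of touching the spill member directly, and the per-partition block scan is collapsed into the single parent.has_build_data_in_memory flag. A rough sketch of the accessor shape these call sites imply, purely as an assumption since JoinPartition itself is not shown in this commit:

    struct JoinPartition
    {
        // accessor used by NonJoinedBlockInputStream to skip spilled partitions
        bool isSpill() const { return spill; }

    private:
        bool spill = false; // set once the partition's build data has been spilled to disk
        // remaining partition state (build_partition, hash map segments, ...) omitted
    };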
2 changes: 1 addition & 1 deletion dbms/src/Debug/DBGInvoker.cpp
@@ -128,7 +128,7 @@ DBGInvoker::DBGInvoker()

regSchemalessFunc("read_index_stress_test", ReadIndexStressTest::dbgFuncStressTest);

regSchemalessFunc("get_active_threads_in_dynamic_thread_pool", dbgFuncActiveThreadsInDynamicThreadPool);
regSchemalessFunc("wait_until_no_temp_active_threads_in_dynamic_thread_pool", dbgFuncWaitUntilNoTempActiveThreadsInDynamicThreadPool);
}

void replaceSubstr(std::string & str, const std::string & target, const std::string & replacement)
3 changes: 2 additions & 1 deletion dbms/src/Debug/MockStorage.cpp
@@ -167,9 +167,10 @@ BlockInputStreamPtr MockStorage::getStreamFromDeltaMerge(Context & context, Int6
if (filter_conditions && filter_conditions->hasValue())
{
auto analyzer = std::make_unique<DAGExpressionAnalyzer>(names_and_types_map_for_delta_merge[table_id], context);
const google::protobuf::RepeatedPtrField<tipb::Expr> pushed_down_filters{};
query_info.dag_query = std::make_unique<DAGQueryInfo>(
filter_conditions->conditions,
google::protobuf::RepeatedPtrField<tipb::Expr>{}, // Not care now
pushed_down_filters, // Not care now
analyzer->getPreparedSets(),
analyzer->getCurrentInputColumns(),
context.getTimezoneInfo());
28 changes: 24 additions & 4 deletions dbms/src/Debug/dbgFuncMisc.cpp
@@ -94,7 +94,7 @@ void dbgFuncSearchLogForKey(Context & context, const ASTs & args, DBGInvoker::Pr
return;
}
String target_line;
for (auto iter = key_line_candidates.rbegin(); iter != key_line_candidates.rend(); iter++)
for (auto iter = key_line_candidates.rbegin(); iter != key_line_candidates.rend(); iter++) // NOLINT
{
if (getReadTSOForLog(*iter) == target_read_tso)
{
@@ -132,12 +132,32 @@ void dbgFuncTriggerGlobalPageStorageGC(Context & context, const ASTs & /*args*/,
}
}

void dbgFuncActiveThreadsInDynamicThreadPool(Context &, const ASTs &, DBGInvoker::Printer output)
void dbgFuncWaitUntilNoTempActiveThreadsInDynamicThreadPool(Context &, const ASTs & args, DBGInvoker::Printer output)
{
if (DynamicThreadPool::global_instance)
{
auto value = GET_METRIC(tiflash_thread_count, type_active_threads_of_thdpool).Value();
output(std::to_string(static_cast<Int64>(value)));
static const UInt64 MAX_WAIT_TIME = 10;
auto wait_time = safeGet<UInt64>(typeid_cast<const ASTLiteral &>(*args[0]).value);
wait_time = std::min(wait_time, MAX_WAIT_TIME);
/// should update this value when there are long-running threads using the dynamic thread pool
static const int expected_value = 0;

while (wait_time > 0)
{
if (GET_METRIC(tiflash_thread_count, type_active_threads_of_thdpool).Value() == expected_value)
{
output("0");
return;
}
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
--wait_time;
}
if (GET_METRIC(tiflash_thread_count, type_active_threads_of_thdpool).Value() == expected_value)
{
output("0");
return;
}
output("1");
}
else
{
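The new wait loop above polls the tiflash_thread_count gauge labelled type_active_threads_of_thdpool once per second, so it only converges if DynamicThreadPool keeps that gauge in sync with its temporary workers. A sketch of what that bookkeeping could look like; the hook names are assumptions and do not appear in this commit:

    void onTempWorkerStart()
    {
        // hypothetical hook: a temporary worker thread begins executing tasks
        GET_METRIC(tiflash_thread_count, type_active_threads_of_thdpool).Increment();
    }

    void onTempWorkerExit()
    {
        // hypothetical hook: the temporary worker has drained its queue and is about to exit
        GET_METRIC(tiflash_thread_count, type_active_threads_of_thdpool).Decrement();
    }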
4 changes: 2 additions & 2 deletions dbms/src/Debug/dbgFuncMisc.h
@@ -32,7 +32,7 @@ void dbgFuncSearchLogForKey(Context & context, const ASTs & args, DBGInvoker::Pr
// ./storage-client.sh "DBGInvoke trigger_global_storage_pool_gc()"
void dbgFuncTriggerGlobalPageStorageGC(Context & context, const ASTs & args, DBGInvoker::Printer output);

// Get active threads in dynamic thread pool, if dynamic thread pool is disabled, return 0
void dbgFuncActiveThreadsInDynamicThreadPool(Context & context, const ASTs & /*args*/, DBGInvoker::Printer /*output*/);
// Wait until there are no active temp threads in the dynamic thread pool; return 0 if they drain within the timeout, otherwise return 1
void dbgFuncWaitUntilNoTempActiveThreadsInDynamicThreadPool(Context & context, const ASTs & /*args*/, DBGInvoker::Printer /*output*/);

} // namespace DB
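Following the ./storage-client.sh convention shown above for trigger_global_storage_pool_gc, the renamed debug function would presumably be invoked with a wait time in seconds (dbgFuncMisc.cpp caps it at 10); the exact command below is illustrative, not taken from this commit:

    // ./storage-client.sh "DBGInvoke wait_until_no_temp_active_threads_in_dynamic_thread_pool(10)"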
50 changes: 1 addition & 49 deletions dbms/src/Flash/Coprocessor/DAGContext.cpp
@@ -18,7 +18,6 @@
#include <Flash/Coprocessor/RequestUtils.h>
#include <Flash/Coprocessor/collectOutputFieldTypes.h>
#include <Flash/Mpp/ExchangeReceiver.h>
#include <Flash/Planner/ExecutorIdGenerator.h>
#include <Flash/Statistics/traverseExecutors.h>
#include <Storages/Transaction/TMTContext.h>
#include <kvproto/disaggregated.pb.h>
@@ -58,15 +57,7 @@ DAGContext::DAGContext(tipb::DAGRequest & dag_request_, TablesRegionsInfo && tab
, warning_count(0)
, keyspace_id(keyspace_id_)
{
RUNTIME_CHECK((dag_request->executors_size() > 0) != isTreeBasedExecutors());
const auto & root_executor = isTreeBasedExecutors()
? dag_request->root_executor()
: dag_request->executors(dag_request->executors_size() - 1);
return_executor_id = root_executor.has_executor_id();
if (return_executor_id)
root_executor_id = root_executor.executor_id();
initOutputInfo();
initListBasedExecutors();
}

// for mpp
@@ -75,7 +66,6 @@ DAGContext::DAGContext(tipb::DAGRequest & dag_request_, const mpp::TaskMeta & me
, dummy_query_string(dag_request->DebugString())
, dummy_ast(makeDummyQuery())
, collect_execution_summaries(dag_request->has_collect_execution_summaries() && dag_request->collect_execution_summaries())
, return_executor_id(true)
, is_mpp_task(true)
, is_root_mpp_task(is_root_mpp_task_)
, flags(dag_request->flags())
@@ -87,12 +77,9 @@ DAGContext::DAGContext(tipb::DAGRequest & dag_request_, const mpp::TaskMeta & me
, warning_count(0)
, keyspace_id(RequestUtils::deriveKeyspaceID(meta_))
{
RUNTIME_CHECK(isTreeBasedExecutors() && dag_request->root_executor().has_executor_id());
root_executor_id = dag_request->root_executor().executor_id();
// only mpp task has join executor.
initExecutorIdToJoinIdMap();
initOutputInfo();
initListBasedExecutors();
}

// for disaggregated task on write node
@@ -116,11 +103,7 @@ DAGContext::DAGContext(tipb::DAGRequest & dag_request_, const disaggregated::Dis
, warning_count(0)
, keyspace_id(RequestUtils::deriveKeyspaceID(task_meta_))
{
RUNTIME_CHECK(isTreeBasedExecutors() && dag_request->root_executor().has_executor_id());
return_executor_id = dag_request->root_executor().has_executor_id() || dag_request->executors(0).has_executor_id();

initOutputInfo();
initListBasedExecutors();
}

// for test
@@ -153,15 +136,7 @@ DAGContext::DAGContext(tipb::DAGRequest & dag_request_, String log_identifier, s
, warnings(max_recorded_error_count)
, warning_count(0)
{
RUNTIME_CHECK((dag_request->executors_size() > 0) != isTreeBasedExecutors());
const auto & root_executor = isTreeBasedExecutors()
? dag_request->root_executor()
: dag_request->executors(dag_request->executors_size() - 1);
return_executor_id = root_executor.has_executor_id();
if (return_executor_id)
root_executor_id = root_executor.executor_id();
initOutputInfo();
initListBasedExecutors();
}

void DAGContext::initOutputInfo()
@@ -182,29 +157,6 @@ void DAGContext::initOutputInfo()
keep_session_timezone_info = encode_type == tipb::EncodeType::TypeChunk || encode_type == tipb::EncodeType::TypeCHBlock;
}


void DAGContext::initListBasedExecutors()
{
if (!isTreeBasedExecutors())
{
ExecutorIdGenerator id_generator;
for (int i = 0; i < dag_request->executors_size(); ++i)
{
auto * executor = dag_request->mutable_executors(i);
const auto & executor_id = id_generator.generate(*executor);
list_based_executors_order.push_back(executor_id);
// Set executor_id for list based executor,
// then we can fill executor_id for Execution Summaries of list-based executors
executor->set_executor_id(executor_id);
}
}
}

bool DAGContext::isTreeBasedExecutors() const
{
return dag_request->has_root_executor();
}

bool DAGContext::allowZeroInDate() const
{
return flags & TiDBSQLFlags::IGNORE_ZERO_IN_DATE;
@@ -240,7 +192,7 @@ void DAGContext::initExecutorIdToJoinIdMap()
return;

executor_id_to_join_id_map.clear();
traverseExecutorsReverse(dag_request, [&](const tipb::Executor & executor) {
dag_request.traverseReverse([&](const tipb::Executor & executor) {
std::vector<String> all_join_id;
// for mpp, dag_request.has_root_executor() == true, can call `getChildren` directly.
getChildren(executor).forEach([&](const tipb::Executor & child) {
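Taken together, the deletions above (return_executor_id, root_executor_id, initListBasedExecutors, isTreeBasedExecutors) and the new include of Flash/Coprocessor/DAGRequest.h in DAGContext.h suggest that the tree/list distinction now lives in a DAGRequest wrapper. A rough sketch of the interface implied by the call sites visible in this commit — dag_request.isTreeBased(), dag_request.traverseReverse(...), and dag_request->root_executor() — where everything beyond those three names is an assumption:

    #include <tipb/select.pb.h>

    #include <functional>

    class DAGRequest
    {
    public:
        explicit DAGRequest(tipb::DAGRequest * dag_request_)
            : dag_request(dag_request_)
        {}

        // Tree-based requests carry a root executor; list-based ones use the flat executors array.
        bool isTreeBased() const { return dag_request->has_root_executor(); }

        // Visit executors from the output side back towards the scans.
        void traverseReverse(const std::function<void(const tipb::Executor &)> & func) const
        {
            if (isTreeBased())
                func(dag_request->root_executor()); // a real implementation would also recurse into children
            else
                for (int i = dag_request->executors_size() - 1; i >= 0; --i)
                    func(dag_request->executors(i));
        }

        // Forward to the wrapped protobuf message so call sites can keep writing dag_request->root_executor().
        const tipb::DAGRequest * operator->() const { return dag_request; }

    private:
        tipb::DAGRequest * dag_request = nullptr;
    };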
12 changes: 2 additions & 10 deletions dbms/src/Flash/Coprocessor/DAGContext.h
@@ -20,13 +20,13 @@
#pragma GCC diagnostic ignored "-Wdeprecated-declarations"
#endif
#include <kvproto/mpp.pb.h>
#include <tipb/select.pb.h>
#pragma GCC diagnostic pop

#include <Common/ConcurrentBoundedQueue.h>
#include <Common/Logger.h>
#include <DataStreams/BlockIO.h>
#include <DataStreams/IBlockInputStream.h>
#include <Flash/Coprocessor/DAGRequest.h>
#include <Flash/Coprocessor/FineGrainedShuffle.h>
#include <Flash/Coprocessor/TablesRegionsInfo.h>
#include <Flash/Mpp/MPPTaskId.h>
@@ -274,7 +274,7 @@ class DAGContext

KeyspaceID getKeyspaceID() const { return keyspace_id; }

tipb::DAGRequest * dag_request;
DAGRequest dag_request;
/// Some existing code inherited from Clickhouse assume that each query must have a valid query string and query ast,
/// dummy_query_string and dummy_ast is used for that
String dummy_query_string;
@@ -291,8 +291,6 @@ class DAGContext
// For disaggregated read, this is the host of compute node
String tidb_host = "Unknown";
bool collect_execution_summaries{};
bool return_executor_id{};
String root_executor_id;
/* const */ bool is_mpp_task = false;
/* const */ bool is_root_mpp_task = false;
/* const */ bool is_batch_cop = false;
@@ -313,10 +311,6 @@ class DAGContext
std::vector<tipb::FieldType> output_field_types;
std::vector<Int32> output_offsets;

/// Hold the order of list based executors.
/// It is used to ensure that the order of Execution summary of list based executors is the same as the order of list based executors.
std::vector<String> list_based_executors_order;

/// executor_id, ScanContextPtr
/// Currently, max(scan_context_map.size()) == 1, because one mpp task only have do one table scan
/// While when we support collcate join later, scan_context_map.size() may > 1,
@@ -326,8 +320,6 @@ class DAGContext
private:
void initExecutorIdToJoinIdMap();
void initOutputInfo();
void initListBasedExecutors();
bool isTreeBasedExecutors() const;

private:
std::shared_ptr<ProcessListEntry> process_list_entry;
8 changes: 4 additions & 4 deletions dbms/src/Flash/Coprocessor/DAGQuerySource.cpp
@@ -26,15 +26,15 @@ namespace DB
DAGQuerySource::DAGQuerySource(Context & context_)
: context(context_)
{
const tipb::DAGRequest & dag_request = *getDAGContext().dag_request;
if (dag_request.has_root_executor())
const auto & dag_request = getDAGContext().dag_request;
if (dag_request.isTreeBased())
{
QueryBlockIDGenerator id_generator;
root_query_block = std::make_shared<DAGQueryBlock>(dag_request.root_executor(), id_generator);
root_query_block = std::make_shared<DAGQueryBlock>(dag_request->root_executor(), id_generator);
}
else
{
root_query_block = std::make_shared<DAGQueryBlock>(1, dag_request.executors());
root_query_block = std::make_shared<DAGQueryBlock>(1, dag_request->executors());
}
}
