Commit 773447f
Merge branch 'master' into fix_update_statistics

ti-chi-bot[bot] authored Nov 23, 2023
2 parents 3d89050 + d1949c6

Showing 49 changed files with 1,561 additions and 558 deletions.
6 changes: 6 additions & 0 deletions dbms/src/Columns/IColumn.h
@@ -390,6 +390,12 @@ class IColumn : public COWPtr<IColumn>
         return res;
     }
 
+    MutablePtr cloneFullColumn() const
+    {
+        MutablePtr res = clone();
+        res->forEachSubcolumn([](Ptr & subcolumn) { subcolumn = subcolumn->clone(); });
+        return res;
+    }
 
     /** Some columns can contain another columns inside.
       * So, we have a tree of columns. But not all combinations are possible.
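
The point of the new method: clone() copies only the outermost COW node, so nested sub-columns are still shared with the original, while cloneFullColumn() also re-clones every sub-column. A minimal standalone sketch of the distinction, using a hypothetical ToyColumn instead of TiFlash's real IColumn/COWPtr machinery:

#include <memory>
#include <vector>

// Hypothetical stand-in for a column that owns nested sub-columns.
struct ToyColumn
{
    std::vector<int> data;
    std::vector<std::shared_ptr<ToyColumn>> subcolumns;

    // Shallow: the copy's subcolumn pointers still refer to the shared children.
    std::shared_ptr<ToyColumn> clone() const { return std::make_shared<ToyColumn>(*this); }

    // Full: the copy gets its own children as well (recursive here for simplicity;
    // the real cloneFullColumn() clones one level via forEachSubcolumn).
    std::shared_ptr<ToyColumn> cloneFull() const
    {
        auto res = clone();
        for (auto & sub : res->subcolumns)
            sub = sub->cloneFull();
        return res;
    }
};
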
16 changes: 10 additions & 6 deletions dbms/src/Common/MemoryTracker.cpp
@@ -39,7 +39,7 @@ MemoryTracker::~MemoryTracker()
     if (is_global_root)
         return;
 
-    if (peak)
+    if (peak && log_peak_memory_usage_in_destructor)
     {
         try
         {
@@ -305,19 +305,23 @@ void initStorageMemoryTracker(Int64 limit, Int64 larger_than_limit)
         "Storage task memory limit={}, larger_than_limit={}",
         formatReadableSizeWithBinarySuffix(limit),
         formatReadableSizeWithBinarySuffix(larger_than_limit));
+
+    // When these (static) mem trackers are reset, the process is shutting down and the logging system has stopped.
+    // Do not log the usage info in their destructors.
+    bool log_in_destructor = false;
     RUNTIME_CHECK(sub_root_of_query_storage_task_mem_trackers == nullptr);
-    sub_root_of_query_storage_task_mem_trackers = MemoryTracker::create(limit);
+    sub_root_of_query_storage_task_mem_trackers = MemoryTracker::create(limit, nullptr, log_in_destructor);
     sub_root_of_query_storage_task_mem_trackers->setBytesThatRssLargerThanLimit(larger_than_limit);
     sub_root_of_query_storage_task_mem_trackers->setAmountMetric(CurrentMetrics::MemoryTrackingQueryStorageTask);
 
     RUNTIME_CHECK(fetch_pages_mem_tracker == nullptr);
-    fetch_pages_mem_tracker = MemoryTracker::create();
-    fetch_pages_mem_tracker->setNext(sub_root_of_query_storage_task_mem_trackers.get());
+    fetch_pages_mem_tracker
+        = MemoryTracker::create(0, sub_root_of_query_storage_task_mem_trackers.get(), log_in_destructor);
     fetch_pages_mem_tracker->setAmountMetric(CurrentMetrics::MemoryTrackingFetchPages);
 
     RUNTIME_CHECK(shared_column_data_mem_tracker == nullptr);
-    shared_column_data_mem_tracker = MemoryTracker::create();
-    shared_column_data_mem_tracker->setNext(sub_root_of_query_storage_task_mem_trackers.get());
+    shared_column_data_mem_tracker
+        = MemoryTracker::create(0, sub_root_of_query_storage_task_mem_trackers.get(), log_in_destructor);
     shared_column_data_mem_tracker->setAmountMetric(CurrentMetrics::MemoryTrackingSharedColumnData);
 }
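
The log_in_destructor flag addresses an order-of-destruction hazard: the trackers built here have static storage duration, so they are destroyed during process shutdown, possibly after the logging system has already stopped. A standalone sketch of the pattern with a hypothetical type (the real MemoryTracker logs its peak usage in its destructor, as the first hunk above shows):

#include <cstdio>

struct TrackerLike
{
    long peak = 0;
    bool log_peak_in_destructor = true; // static instances are created with false

    ~TrackerLike()
    {
        // Skipping the log keeps shutdown safe: the logging subsystem may
        // already be torn down when static destructors run.
        if (peak > 0 && log_peak_in_destructor)
            std::printf("peak memory usage: %ld bytes\n", peak); // stand-in for the real logger
    }
};
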
24 changes: 11 additions & 13 deletions dbms/src/Common/MemoryTracker.h
@@ -18,6 +18,7 @@
 #include <common/types.h>
 
 #include <atomic>
+#include <boost/noncopyable.hpp>
 
 extern std::atomic<Int64> real_rss, proc_num_threads, baseline_of_query_mem_tracker;
 extern std::atomic<UInt64> proc_virt_size;
@@ -46,6 +47,7 @@ class MemoryTracker : public std::enable_shared_from_this<MemoryTracker>
     double fault_probability = 0;
 
     bool is_global_root = false;
+    bool log_peak_memory_usage_in_destructor = true;
 
     /// To test the accuracy of memory tracking, it throws an exception when the part exceeding the tracked amount is greater than accuracy_diff_for_test.
     std::atomic<Int64> accuracy_diff_for_test{0};
@@ -65,7 +67,6 @@ class MemoryTracker : public std::enable_shared_from_this<MemoryTracker>
     std::atomic<const char *> description = nullptr;
 
     /// Make constructors private to ensure all objects of this class are created by `MemoryTracker::create`.
-    MemoryTracker() = default;
     explicit MemoryTracker(Int64 limit_)
         : limit(limit_)
     {}
@@ -79,16 +80,15 @@ class MemoryTracker : public std::enable_shared_from_this<MemoryTracker>
 
 public:
     /// Using `std::shared_ptr` and `new` instead of `std::make_shared` is because `std::make_shared` cannot call private constructors.
-    static MemoryTrackerPtr create(Int64 limit = 0)
+    static MemoryTrackerPtr create(
+        Int64 limit = 0,
+        MemoryTracker * parent = nullptr,
+        bool log_peak_memory_usage_in_destructor = true)
     {
-        if (limit == 0)
-        {
-            return std::shared_ptr<MemoryTracker>(new MemoryTracker);
-        }
-        else
-        {
-            return std::shared_ptr<MemoryTracker>(new MemoryTracker(limit));
-        }
+        auto p = std::shared_ptr<MemoryTracker>(new MemoryTracker(limit));
+        p->setParent(parent);
+        p->log_peak_memory_usage_in_destructor = log_peak_memory_usage_in_destructor;
+        return p;
     }
 
     static MemoryTrackerPtr createGlobalRoot() { return std::shared_ptr<MemoryTracker>(new MemoryTracker(0, true)); }
@@ -128,7 +128,7 @@ class MemoryTracker : public std::enable_shared_from_this<MemoryTracker>
     void setAccuracyDiffForTest(Int64 value) { accuracy_diff_for_test.store(value, std::memory_order_relaxed); }
 
     /// next should be changed only once: from nullptr to some value.
-    void setNext(MemoryTracker * elem)
+    void setParent(MemoryTracker * elem)
     {
         MemoryTracker * old_val = nullptr;
         if (!next.compare_exchange_strong(old_val, elem, std::memory_order_seq_cst, std::memory_order_relaxed))
@@ -187,8 +187,6 @@ void free(Int64 size);
 } // namespace CurrentMemoryTracker
 
 
-#include <boost/noncopyable.hpp>
-
 struct TemporarilyDisableMemoryTracker : private boost::noncopyable
 {
     MemoryTracker * memory_tracker;
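
setParent() keeps the invariant the old setNext() enforced: the parent link may change exactly once, from nullptr to a value, guarded by a compare-and-swap so concurrent callers cannot re-wire the tracker tree. A minimal standalone sketch of the idiom (hypothetical Tracker type, not the real MemoryTracker):

#include <atomic>
#include <cstdio>
#include <stdexcept>

struct Tracker
{
    std::atomic<Tracker *> parent{nullptr};
    std::atomic<long> amount{0};

    void setParent(Tracker * p)
    {
        Tracker * expected = nullptr;
        // CAS from nullptr: succeeds only while the link is still unset.
        if (!parent.compare_exchange_strong(expected, p))
            throw std::logic_error("parent can only be set once");
    }

    void alloc(long size)
    {
        amount.fetch_add(size, std::memory_order_relaxed);
        if (auto * p = parent.load(std::memory_order_relaxed))
            p->alloc(size); // propagate usage up the tree
    }
};

int main()
{
    Tracker root;
    Tracker child;
    child.setParent(&root);
    child.alloc(500);
    std::printf("root=%ld child=%ld\n", root.amount.load(), child.amount.load()); // 500 500
}
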
9 changes: 3 additions & 6 deletions dbms/src/Common/tests/gtest_memtracker.cpp
@@ -39,8 +39,7 @@ TEST_F(MemTrackerTest, testRootAndChild)
     try
     {
         auto root_mem_tracker = MemoryTracker::create();
-        auto child_mem_tracker = MemoryTracker::create(512);
-        child_mem_tracker->setNext(root_mem_tracker.get());
+        auto child_mem_tracker = MemoryTracker::create(512, root_mem_tracker.get());
         // alloc 500
         child_mem_tracker->alloc(500);
         ASSERT_EQ(500, child_mem_tracker->get());
@@ -71,10 +70,8 @@ TEST_F(MemTrackerTest, testRootAndMultipleChild)
     try
     {
         auto root = MemoryTracker::create(512); // limit 512
-        auto child1 = MemoryTracker::create(512); // limit 512
-        auto child2 = MemoryTracker::create(512); // limit 512
-        child1->setNext(root.get());
-        child2->setNext(root.get());
+        auto child1 = MemoryTracker::create(512, root.get()); // limit 512
+        auto child2 = MemoryTracker::create(512, root.get()); // limit 512
         // alloc 500 on child1
         child1->alloc(500);
         ASSERT_EQ(500, child1->get());
2 changes: 1 addition & 1 deletion dbms/src/DataStreams/HashJoinProbeBlockInputStream.cpp
@@ -37,7 +37,7 @@ HashJoinProbeBlockInputStream::HashJoinProbeBlockInputStream(
     probe_exec.set(HashJoinProbeExec::build(req_id, original_join, stream_index, input, max_block_size_));
     probe_exec->setCancellationHook([&]() { return isCancelledOrThrowIfKilled(); });
 
-    ProbeProcessInfo header_probe_process_info(0);
+    ProbeProcessInfo header_probe_process_info(0, 0);
     header_probe_process_info.resetBlock(input->getHeader());
     header = original_join->joinBlock(header_probe_process_info, true);
 }
2 changes: 1 addition & 1 deletion dbms/src/DataStreams/HashJoinProbeExec.cpp
@@ -63,7 +63,7 @@ HashJoinProbeExec::HashJoinProbeExec(
     , need_scan_hash_map_after_probe(need_scan_hash_map_after_probe_)
     , scan_hash_map_after_probe_stream(scan_hash_map_after_probe_stream_)
     , max_block_size(max_block_size_)
-    , probe_process_info(max_block_size_)
+    , probe_process_info(max_block_size_, join->getProbeCacheColumnThreshold())
 {}
 
 void HashJoinProbeExec::waitUntilAllBuildFinished()
59 changes: 50 additions & 9 deletions dbms/src/DataStreams/ScanHashMapAfterProbeBlockInputStream.cpp
@@ -42,6 +42,7 @@ struct AdderMapEntry<ASTTableJoin::Strictness::Any, Mapped>
         size_t num_columns_right,
         MutableColumns & columns_right,
         const void *&,
+        size_t,
         const size_t)
     {
         for (size_t j = 0; j < num_columns_left; ++j)
@@ -68,20 +69,40 @@ struct AdderMapEntry<ASTTableJoin::Strictness::All, Mapped>
         size_t num_columns_right,
         MutableColumns & columns_right,
         const void *& next_element_in_row_list,
+        size_t probe_cached_rows_threshold,
         const size_t max_row_added)
     {
         size_t rows_added = 0;
-        auto current = &static_cast<const typename Mapped::Base_t &>(mapped);
-        if unlikely (next_element_in_row_list != nullptr)
-            current = reinterpret_cast<const typename Mapped::Base_t *>(next_element_in_row_list);
-        for (; rows_added < max_row_added && current != nullptr; current = current->next)
-        {
+        assert(rows_added < max_row_added);
+        const auto * current = &static_cast<const typename Mapped::Base_t &>(mapped);
+
+        auto add_one_row = [&]() {
             /// handle left columns later to utilize insertManyDefaults
             for (size_t j = 0; j < num_columns_right; ++j)
                 columns_right[j]->insertFrom(
                     *current->block->getByPosition(key_num + j).column.get(),
                     current->row_num);
             ++rows_added;
+        };
+        if unlikely (next_element_in_row_list != nullptr)
+        {
+            current = reinterpret_cast<const typename Mapped::Base_t *>(next_element_in_row_list);
+        }
+        else
+        {
+            add_one_row();
+            if unlikely (probe_cached_rows_threshold > 0 && current->list_length >= probe_cached_rows_threshold)
+            {
+                current = reinterpret_cast<const typename Mapped::Base_t *>(current->cached_column_info->next);
+            }
+            else
+            {
+                current = current->next;
+            }
+        }
+        for (; rows_added < max_row_added && current != nullptr; current = current->next)
+        {
+            add_one_row();
        }
         for (size_t j = 0; j < num_columns_left; ++j)
             /// should fill the key column with key columns from right block
@@ -106,14 +127,14 @@ struct AdderRowFlaggedMapEntry
         size_t num_columns_right,
         MutableColumns & columns_right,
         const void *& next_element_in_row_list,
+        size_t probe_cached_rows_threshold,
         const size_t max_row_added)
     {
         size_t rows_added = 0;
+        assert(rows_added < max_row_added);
         const auto * current = &static_cast<const typename Mapped::Base_t &>(mapped);
-        if unlikely (next_element_in_row_list != nullptr)
-            current = reinterpret_cast<const typename Mapped::Base_t *>(next_element_in_row_list);
-        for (; rows_added < max_row_added && current != nullptr; current = current->next)
-        {
+
+        auto check_and_add_one_row = [&]() {
             bool flag = current->getUsed();
             if constexpr (!add_joined)
                 flag = !flag;
@@ -126,6 +147,24 @@ struct AdderRowFlaggedMapEntry
                         current->row_num);
                 ++rows_added;
             }
+        };
+        if unlikely (next_element_in_row_list != nullptr)
+            current = reinterpret_cast<const typename Mapped::Base_t *>(next_element_in_row_list);
+        else
+        {
+            check_and_add_one_row();
+            if unlikely (probe_cached_rows_threshold > 0 && current->list_length >= probe_cached_rows_threshold)
+            {
+                current = reinterpret_cast<const typename Mapped::Base_t *>(current->cached_column_info->next);
+            }
+            else
+            {
+                current = current->next;
+            }
+        }
+        for (; rows_added < max_row_added && current != nullptr; current = current->next)
+        {
+            check_and_add_one_row();
+        }
         for (size_t j = 0; j < num_columns_left; ++j)
             /// should fill the key column with key columns from right block
@@ -488,6 +527,7 @@ void ScanHashMapAfterProbeBlockInputStream::fillColumns(
             num_columns_right,
             mutable_columns_right,
             next_element_in_row_list,
+            parent.probe_cache_column_threshold,
             row_count_info.availableRowCount()));
     else
     {
@@ -508,6 +548,7 @@ void ScanHashMapAfterProbeBlockInputStream::fillColumns(
             num_columns_right,
             mutable_columns_right,
             next_element_in_row_list,
+            parent.probe_cache_column_threshold,
             row_count_info.availableRowCount()));
     }
     assert(row_count_info.getCurrentRows() <= max_block_size);
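
The new probe_cached_rows_threshold parameter mirrors what the probe side does: once the row list for a join key reaches the threshold, its columns are assumed to be cached in materialized form, so the scan handles the head row directly and then jumps past the cached prefix through cached_column_info->next. A much-simplified sketch of that traversal shape (hypothetical RowNode type; in the real code cached_column_info carries materialized columns, not just a pointer):

#include <cassert>
#include <cstddef>
#include <vector>

struct RowNode
{
    int row_num;
    RowNode * next = nullptr;
    std::size_t list_length = 0;     // meaningful on the head node only
    RowNode * cached_next = nullptr; // first node not covered by the cache
};

// Collect up to max_rows row numbers starting from the head of a key's row list.
std::size_t collectRows(const RowNode * head, std::size_t threshold, std::size_t max_rows, std::vector<int> & out)
{
    assert(max_rows > 0);
    out.push_back(head->row_num); // the head row is always handled directly
    const RowNode * current
        = (threshold > 0 && head->list_length >= threshold) ? head->cached_next : head->next;
    for (; out.size() < max_rows && current != nullptr; current = current->next)
        out.push_back(current->row_num);
    return out.size();
}
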
2 changes: 1 addition & 1 deletion dbms/src/Debug/MockSchemaGetter.h
@@ -34,7 +34,7 @@ struct MockSchemaGetter
 
     static bool checkSchemaDiffExists(Int64 version) { return MockTiDB::instance().checkSchemaDiffExists(version); }
 
-    static TiDB::TableInfoPtr getTableInfo(DatabaseID, TableID table_id)
+    static TiDB::TableInfoPtr getTableInfo(DatabaseID, TableID table_id, [[maybe_unused]] bool try_mvcc = true)
     {
         return MockTiDB::instance().getTableInfoByID(table_id);
     }
13 changes: 7 additions & 6 deletions dbms/src/Encryption/tests/gtest_rate_limiter.cpp
@@ -67,12 +67,13 @@ TEST(WriteLimiterTest, Rate)
             thread.join();
         auto elapsed = watch.elapsedSeconds();
         auto actual_rate = write_limiter->getTotalBytesThrough() / elapsed;
-        // make sure that 0.8 * target <= actual_rate <= 1.25 * target
-        // hint: the range [0.8, 1.25] is copied from rocksdb,
-        // if tests fail, try to enlarge this range.
-        // enlarge the range to [0.75, 1.30]
-        EXPECT_GE(actual_rate / target, 0.75);
-        EXPECT_LE(actual_rate / target, 1.30);
+        // For environments with high loads, latency can be very large.
+        // In theory, the upper bound of `elapsed` cannot be guaranteed,
+        // so we cannot guarantee the lower bound of `actual_rate`.
+        // EXPECT_GE(actual_rate / target, 0.75)
+        //     << fmt::format("actual_rate={} target={} elapsed={:.3f}s", actual_rate, target, elapsed);
+        EXPECT_LE(actual_rate / target, 1.30)
+            << fmt::format("actual_rate={} target={} elapsed={:.3f}s", actual_rate, target, elapsed);
     }
 }
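
The surviving assertion is deliberately one-sided: a rate limiter can guarantee the writer never goes meaningfully faster than the target, but a loaded machine can always make it slower, so only the upper bound is stable. A toy illustration of the measurement (a pacing loop standing in for TiFlash's WriteLimiter and writer threads):

#include <cassert>
#include <chrono>
#include <cstdio>
#include <thread>

int main()
{
    using clock = std::chrono::steady_clock;
    const double target = 1000.0; // operations per second
    const int total = 200;

    auto start = clock::now();
    for (int sent = 0; sent < total; ++sent)
        std::this_thread::sleep_for(std::chrono::milliseconds(1)); // pace to ~1000 ops/s
    double elapsed = std::chrono::duration<double>(clock::now() - start).count();
    double actual_rate = total / elapsed;

    // Upper bound only: scheduling delays can push actual_rate far below
    // target, but it should never end up meaningfully above it.
    assert(actual_rate / target <= 1.30);
    std::printf("actual_rate=%.1f target=%.1f elapsed=%.3fs\n", actual_rate, target, elapsed);
}
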
12 changes: 5 additions & 7 deletions dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp
@@ -301,7 +301,7 @@ void DAGQueryBlockInterpreter::handleJoin(
     const Settings & settings = context.getSettingsRef();
     SpillConfig build_spill_config(
         context.getTemporaryPath(),
-        fmt::format("{}_hash_join_0_build", log->identifier()),
+        fmt::format("{}_0_build", log->identifier()),
         settings.max_cached_data_bytes_in_spiller,
         settings.max_spilled_rows_per_file,
         settings.max_spilled_bytes_per_file,
@@ -310,7 +310,7 @@ void DAGQueryBlockInterpreter::handleJoin(
         settings.max_block_size);
     SpillConfig probe_spill_config(
         context.getTemporaryPath(),
-        fmt::format("{}_hash_join_0_probe", log->identifier()),
+        fmt::format("{}_0_probe", log->identifier()),
         settings.max_cached_data_bytes_in_spiller,
         settings.max_spilled_rows_per_file,
         settings.max_spilled_bytes_per_file,
@@ -332,12 +332,11 @@ void DAGQueryBlockInterpreter::handleJoin(
         build_key_names,
         tiflash_join.kind,
         log->identifier(),
-        enableFineGrainedShuffle(fine_grained_shuffle_count),
         fine_grained_shuffle_count,
         settings.max_bytes_before_external_join,
         build_spill_config,
         probe_spill_config,
-        settings.join_restore_concurrency,
+        RestoreConfig{settings.join_restore_concurrency, 0, 0},
         join_output_column_names,
         [&](const OperatorSpillContextPtr & operator_spill_context) {
             if (context.getDAGContext() != nullptr)
@@ -352,8 +351,7 @@ void DAGQueryBlockInterpreter::handleJoin(
         settings.shallow_copy_cross_probe_threshold,
         match_helper_name,
         flag_mapped_entry_helper_name,
-        0,
-        0,
+        settings.join_probe_cache_columns_threshold,
         context.isTest());
 
     recordJoinExecuteInfo(tiflash_join.build_side_index, join_ptr);
@@ -499,7 +497,7 @@ void DAGQueryBlockInterpreter::executeAggregation(
     AggregationInterpreterHelper::fillArgColumnNumbers(aggregate_descriptions, before_agg_header);
     SpillConfig spill_config(
         context.getTemporaryPath(),
-        fmt::format("{}_aggregation", log->identifier()),
+        log->identifier(),
         settings.max_cached_data_bytes_in_spiller,
         settings.max_spilled_rows_per_file,
         settings.max_spilled_bytes_per_file,
6 changes: 5 additions & 1 deletion dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp
@@ -1344,7 +1344,11 @@ std::unordered_map<TableID, DAGStorageInterpreter::StorageWithStructureLock> DAG
     if (!table_store)
     {
         if (schema_synced)
-            throw TiFlashException(fmt::format("Table {} doesn't exist.", table_id), Errors::Table::NotExists);
+            throw TiFlashException(
+                Errors::Table::NotExists,
+                "Table doesn't exist, keyspace={} table_id={}",
+                keyspace_id,
+                table_id);
         else
             return {{}, {}};
     }
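
The rewritten message moves the variable parts into appended key=value fields, which keeps the leading text stable and makes logs greppable by keyspace or table id. A minimal sketch of the convention with a hypothetical helper (the real TiFlashException also carries the Errors::Table::NotExists code as its first argument):

#include <fmt/format.h>
#include <stdexcept>

// Hypothetical helper: stable message text first, machine-readable fields after.
[[noreturn]] inline void throwTableNotExists(int keyspace_id, long table_id)
{
    throw std::runtime_error(
        fmt::format("Table doesn't exist, keyspace={} table_id={}", keyspace_id, table_id));
}
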
(Diff truncated: 11 of the 49 changed files are shown above.)