Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/master' into fix_rename_test
Browse files Browse the repository at this point in the history
  • Loading branch information
JaySon-Huang committed Sep 5, 2022
2 parents 966bf3b + 4752b4b commit 7bba09a
Show file tree
Hide file tree
Showing 23 changed files with 491 additions and 134 deletions.
20 changes: 18 additions & 2 deletions dbms/src/Common/MemoryTracker.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ namespace CurrentMetrics
extern const Metric MemoryTracking;
}

class MemoryTracker;
using MemoryTrackerPtr = std::shared_ptr<MemoryTracker>;

/** Tracks memory consumption.
* It throws an exception if amount of consumed memory become greater than certain limit.
* The same memory tracker could be simultaneously used in different threads.
Expand All @@ -51,12 +54,26 @@ class MemoryTracker : public std::enable_shared_from_this<MemoryTracker>
/// This description will be used as prefix into log messages (if isn't nullptr)
const char * description = nullptr;

public:
/// Make constructors private to ensure all objects of this class is created by `MemoryTracker::create`.
MemoryTracker() = default;
explicit MemoryTracker(Int64 limit_)
: limit(limit_)
{}

public:
/// Using `std::shared_ptr` and `new` instread of `std::make_shared` is because `std::make_shared` cannot call private constructors.
static MemoryTrackerPtr create(Int64 limit = 0)
{
if (limit == 0)
{
return std::shared_ptr<MemoryTracker>(new MemoryTracker);
}
else
{
return std::shared_ptr<MemoryTracker>(new MemoryTracker(limit));
}
}

~MemoryTracker();

/** Call the following functions before calling of corresponding operations with memory allocators.
Expand Down Expand Up @@ -101,7 +118,6 @@ class MemoryTracker : public std::enable_shared_from_this<MemoryTracker>
void logPeakMemoryUsage() const;
};


/** The MemoryTracker object is quite difficult to pass to all places where significant amounts of memory are allocated.
* Therefore, a thread-local pointer to used MemoryTracker is set, or nullptr if MemoryTracker does not need to be used.
* This pointer is set when memory consumption is monitored in current thread.
Expand Down
12 changes: 7 additions & 5 deletions dbms/src/Common/tests/gtest_dynamic_thread_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -128,9 +128,11 @@ CATCH
TEST_F(DynamicThreadPoolTest, testMemoryTracker)
try
{
MemoryTracker t0, t1, t2;
auto t0 = MemoryTracker::create();
auto t1 = MemoryTracker::create();
auto t2 = MemoryTracker::create();

current_memory_tracker = &t2;
current_memory_tracker = t2.get();

auto getter = [] {
return current_memory_tracker;
Expand All @@ -145,18 +147,18 @@ try
auto f = pool.schedule(false, getter);
ASSERT_EQ(f.get(), nullptr);

auto f0 = pool.schedule(false, setter, &t0);
auto f0 = pool.schedule(false, setter, t0.get());
f0.wait();

auto f1 = pool.schedule(false, getter);
// f0 didn't pollute memory_tracker
ASSERT_EQ(f1.get(), nullptr);

current_memory_tracker = &t1;
current_memory_tracker = t1.get();

auto f2 = pool.schedule(true, getter);
// set propagate = true and it did propagate
ASSERT_EQ(f2.get(), &t1);
ASSERT_EQ(f2.get(), t1.get());

auto f3 = pool.schedule(false, getter);
// set propagate = false and it didn't propagate
Expand Down
84 changes: 42 additions & 42 deletions dbms/src/Common/tests/gtest_memtracker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,93 +26,93 @@ class MemTrackerTest : public ::testing::Test
TEST_F(MemTrackerTest, testBasic)
try
{
MemoryTracker mem_tracker;
mem_tracker.alloc(1024);
ASSERT_EQ(1024, mem_tracker.get());
mem_tracker.free(1024);
ASSERT_EQ(0, mem_tracker.get());
auto mem_tracker = MemoryTracker::create();
mem_tracker->alloc(1024);
ASSERT_EQ(1024, mem_tracker->get());
mem_tracker->free(1024);
ASSERT_EQ(0, mem_tracker->get());
}
CATCH

TEST_F(MemTrackerTest, testRootAndChild)
try
{
MemoryTracker root_mem_tracker;
MemoryTracker child_mem_tracker(512);
child_mem_tracker.setNext(&root_mem_tracker);
auto root_mem_tracker = MemoryTracker::create();
auto child_mem_tracker = MemoryTracker::create(512);
child_mem_tracker->setNext(root_mem_tracker.get());
// alloc 500
child_mem_tracker.alloc(500);
ASSERT_EQ(500, child_mem_tracker.get());
ASSERT_EQ(500, root_mem_tracker.get());
child_mem_tracker->alloc(500);
ASSERT_EQ(500, child_mem_tracker->get());
ASSERT_EQ(500, root_mem_tracker->get());

// alloc 256 base on 500
bool has_err = false;
try
{
child_mem_tracker.alloc(256); //500 + 256 > limit(512)
child_mem_tracker->alloc(256); //500 + 256 > limit(512)
}
catch (...)
{
has_err = true;
}
ASSERT_TRUE(has_err);
ASSERT_EQ(500, child_mem_tracker.get());
ASSERT_EQ(500, root_mem_tracker.get());
ASSERT_EQ(500, child_mem_tracker->get());
ASSERT_EQ(500, root_mem_tracker->get());

//free 500
child_mem_tracker.free(500);
ASSERT_EQ(0, child_mem_tracker.get());
ASSERT_EQ(0, root_mem_tracker.get());
child_mem_tracker->free(500);
ASSERT_EQ(0, child_mem_tracker->get());
ASSERT_EQ(0, root_mem_tracker->get());
}
CATCH

TEST_F(MemTrackerTest, testRootAndMultipleChild)
try
{
MemoryTracker root(512); // limit 512
MemoryTracker child1(512); // limit 512
MemoryTracker child2(512); // limit 512
child1.setNext(&root);
child2.setNext(&root);
auto root = MemoryTracker::create(512); // limit 512
auto child1 = MemoryTracker::create(512); // limit 512
auto child2 = MemoryTracker::create(512); // limit 512
child1->setNext(root.get());
child2->setNext(root.get());
// alloc 500 on child1
child1.alloc(500);
ASSERT_EQ(500, child1.get());
ASSERT_EQ(0, child2.get());
ASSERT_EQ(500, root.get());
child1->alloc(500);
ASSERT_EQ(500, child1->get());
ASSERT_EQ(0, child2->get());
ASSERT_EQ(500, root->get());


// alloc 500 on child2, should fail
bool has_err = false;
try
{
child2.alloc(500); // root will throw error because of "out of quota"
child2->alloc(500); // root will throw error because of "out of quota"
}
catch (...)
{
has_err = true;
}
ASSERT_TRUE(has_err);
ASSERT_EQ(500, child1.get());
ASSERT_EQ(0, child2.get());
ASSERT_EQ(500, root.get());
ASSERT_EQ(500, child1->get());
ASSERT_EQ(0, child2->get());
ASSERT_EQ(500, root->get());

// alloc 10 on child2
child2.alloc(10);
ASSERT_EQ(500, child1.get());
ASSERT_EQ(10, child2.get());
ASSERT_EQ(510, root.get());
child2->alloc(10);
ASSERT_EQ(500, child1->get());
ASSERT_EQ(10, child2->get());
ASSERT_EQ(510, root->get());

// free 500 on child1
child1.free(500);
ASSERT_EQ(0, child1.get());
ASSERT_EQ(10, child2.get());
ASSERT_EQ(10, root.get());
child1->free(500);
ASSERT_EQ(0, child1->get());
ASSERT_EQ(10, child2->get());
ASSERT_EQ(10, root->get());

// free 10 on child2
child2.free(10);
ASSERT_EQ(0, child1.get());
ASSERT_EQ(0, child2.get());
ASSERT_EQ(0, root.get());
child2->free(10);
ASSERT_EQ(0, child1->get());
ASSERT_EQ(0, child2->get());
ASSERT_EQ(0, root->get());
}
CATCH

Expand Down
18 changes: 9 additions & 9 deletions dbms/src/Interpreters/ProcessList.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -153,18 +153,18 @@ ProcessList::EntryPtr ProcessList::insert(
/// setting from one query effectively sets values for all other queries.

/// Track memory usage for all simultaneously running queries from single user.
user_process_list.user_memory_tracker.setOrRaiseLimit(settings.max_memory_usage_for_user);
user_process_list.user_memory_tracker.setDescription("(for user)");
current_memory_tracker->setNext(&user_process_list.user_memory_tracker);
user_process_list.user_memory_tracker->setOrRaiseLimit(settings.max_memory_usage_for_user);
user_process_list.user_memory_tracker->setDescription("(for user)");
current_memory_tracker->setNext(user_process_list.user_memory_tracker.get());

/// Track memory usage for all simultaneously running queries.
/// You should specify this value in configuration for default profile,
/// not for specific users, sessions or queries,
/// because this setting is effectively global.
total_memory_tracker.setOrRaiseLimit(settings.max_memory_usage_for_all_queries);
total_memory_tracker.setBytesThatRssLargerThanLimit(settings.bytes_that_rss_larger_than_limit);
total_memory_tracker.setDescription("(total)");
user_process_list.user_memory_tracker.setNext(&total_memory_tracker);
total_memory_tracker->setOrRaiseLimit(settings.max_memory_usage_for_all_queries);
total_memory_tracker->setBytesThatRssLargerThanLimit(settings.bytes_that_rss_larger_than_limit);
total_memory_tracker->setDescription("(total)");
user_process_list.user_memory_tracker->setNext(total_memory_tracker.get());
}

if (!total_network_throttler && settings.max_network_bandwidth_for_all_users)
Expand Down Expand Up @@ -244,8 +244,8 @@ ProcessListEntry::~ProcessListEntry()
if (parent.cur_size == 0)
{
/// Reset MemoryTracker, similarly (see above).
parent.total_memory_tracker.logPeakMemoryUsage();
parent.total_memory_tracker.reset();
parent.total_memory_tracker->logPeakMemoryUsage();
parent.total_memory_tracker->reset();
parent.total_network_throttler.reset();
}
}
Expand Down
14 changes: 9 additions & 5 deletions dbms/src/Interpreters/ProcessList.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ class ProcessListElement
/// Progress of output stream
Progress progress_out;

std::shared_ptr<MemoryTracker> memory_tracker;
MemoryTrackerPtr memory_tracker;

QueryPriorities::Handle priority_handle;

Expand Down Expand Up @@ -115,7 +115,7 @@ class ProcessListElement
QueryPriorities::Handle && priority_handle_)
: query(query_)
, client_info(client_info_)
, memory_tracker(std::make_shared<MemoryTracker>(max_memory_usage))
, memory_tracker(MemoryTracker::create(max_memory_usage))
, priority_handle(std::move(priority_handle_))
{
memory_tracker->setDescription("(for query)");
Expand Down Expand Up @@ -205,18 +205,21 @@ struct ProcessListForUser
QueryToElement queries;

/// Limit and counter for memory of all simultaneously running queries of single user.
MemoryTracker user_memory_tracker;
MemoryTrackerPtr user_memory_tracker;

/// Count network usage for all simultaneously running queries of single user.
ThrottlerPtr user_throttler;

ProcessListForUser()
: user_memory_tracker(MemoryTracker::create())
{}
/// Clears MemoryTracker for the user.
/// Sometimes it is important to reset the MemoryTracker, because it may accumulate skew
/// due to the fact that there are cases when memory can be allocated while processing the query, but released later.
/// Clears network bandwidth Throttler, so it will not count periods of inactivity.
void reset()
{
user_memory_tracker.reset();
user_memory_tracker->reset();
if (user_throttler)
user_throttler.reset();
}
Expand Down Expand Up @@ -281,7 +284,7 @@ class ProcessList
QueryPriorities priorities;

/// Limit and counter for memory of all simultaneously running queries.
MemoryTracker total_memory_tracker;
MemoryTrackerPtr total_memory_tracker;

/// Limit network bandwidth for all users
ThrottlerPtr total_network_throttler;
Expand All @@ -293,6 +296,7 @@ class ProcessList
ProcessList(size_t max_size_ = 0)
: cur_size(0)
, max_size(max_size_)
, total_memory_tracker(MemoryTracker::create())
{}

using EntryPtr = std::shared_ptr<ProcessListEntry>;
Expand Down
6 changes: 3 additions & 3 deletions dbms/src/Storages/BackgroundProcessingPool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -152,9 +152,9 @@ void BackgroundProcessingPool::threadFunction()
addThreadId(getTid());
}

MemoryTracker memory_tracker;
memory_tracker.setMetric(CurrentMetrics::MemoryTrackingInBackgroundProcessingPool);
current_memory_tracker = &memory_tracker;
auto memory_tracker = MemoryTracker::create();
memory_tracker->setMetric(CurrentMetrics::MemoryTrackingInBackgroundProcessingPool);
current_memory_tracker = memory_tracker.get();

pcg64 rng(randomSeed());
std::this_thread::sleep_for(std::chrono::duration<double>(std::uniform_real_distribution<double>(0, sleep_seconds_random_part)(rng)));
Expand Down
Loading

0 comments on commit 7bba09a

Please sign in to comment.