Skip to content

Commit

Permalink
Merge branch 'master' into enhance-join-test-framework
Browse files Browse the repository at this point in the history
  • Loading branch information
ywqzzy authored Aug 8, 2022
2 parents 51ed3fa + 97fa91f commit b63d8d9
Show file tree
Hide file tree
Showing 54 changed files with 2,279 additions and 716 deletions.
275 changes: 148 additions & 127 deletions dbms/src/Debug/dbgFuncCoprocessor.cpp

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions dbms/src/Debug/dbgFuncCoprocessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,10 @@ QueryTasks queryPlanToQueryTasks(
ExecutorPtr root_executor,
size_t & executor_index,
const Context & context);

BlockInputStreamPtr executeQuery(Context & context, RegionID region_id, const DAGProperties & properties, QueryTasks & query_tasks, MakeResOutputStream & func_wrap_output_stream);

BlockInputStreamPtr executeMPPQuery(Context & context, const DAGProperties & properties, QueryTasks & query_tasks);
namespace Debug
{
void setServiceAddr(const std::string & addr);
Expand Down
119 changes: 53 additions & 66 deletions dbms/src/Encryption/RateLimiter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -289,47 +289,31 @@ void WriteLimiter::updateMaxBytesPerSec(Int64 max_bytes_per_sec)
}

ReadLimiter::ReadLimiter(
std::function<Int64()> getIOStatistic_,
std::function<Int64()> get_read_bytes_,
Int64 rate_limit_per_sec_,
LimiterType type_,
Int64 get_io_stat_period_us,
UInt64 refill_period_ms_)
: WriteLimiter(rate_limit_per_sec_, type_, refill_period_ms_)
, getIOStatistic(std::move(getIOStatistic_))
, last_stat_bytes(getIOStatistic())
, last_stat_time(now())
, get_read_bytes(std::move(get_read_bytes_))
, last_stat_bytes(get_read_bytes())
, log(Logger::get("ReadLimiter"))
, get_io_statistic_period_us(get_io_stat_period_us)
{}

Int64 ReadLimiter::getAvailableBalance()
{
TimePoint us = now();
// Not call getIOStatisctics() every time for performance.
// If the clock back, elapsed_us could be negative.
Int64 elapsed_us = std::chrono::duration_cast<std::chrono::microseconds>(us - last_stat_time).count();
if (get_io_statistic_period_us != 0 && elapsed_us < get_io_statistic_period_us)
{
return available_balance;
}

return refreshAvailableBalance();
}

Int64 ReadLimiter::refreshAvailableBalance()
{
TimePoint us = now();
Int64 bytes = getIOStatistic();
if (bytes < last_stat_bytes)
Int64 bytes = get_read_bytes();
if (unlikely(bytes < last_stat_bytes))
{
LOG_FMT_WARNING(
log,
"last_stat {}:{} current_stat {}:{}",
last_stat_time.time_since_epoch().count(),
"last_stat: {} current_stat: {}",
last_stat_bytes,
us.time_since_epoch().count(),
bytes);
}
else if (likely(bytes == last_stat_bytes))
{
return available_balance;
}
else
{
Int64 real_alloc_bytes = bytes - last_stat_bytes;
Expand All @@ -338,7 +322,6 @@ Int64 ReadLimiter::refreshAvailableBalance()
alloc_bytes += real_alloc_bytes;
}
last_stat_bytes = bytes;
last_stat_time = us;
return available_balance;
}

Expand Down Expand Up @@ -381,17 +364,18 @@ void ReadLimiter::refillAndAlloc()
}
}

IORateLimiter::IORateLimiter()
IORateLimiter::IORateLimiter(UInt64 update_read_info_period_ms_)
: log(Logger::get("IORateLimiter"))
, stop(false)
, update_read_info_period_ms(update_read_info_period_ms_)
{}

IORateLimiter::~IORateLimiter()
{
stop.store(true, std::memory_order_relaxed);
if (auto_tune_thread.joinable())
if (auto_tune_and_get_read_info_thread.joinable())
{
auto_tune_thread.join();
auto_tune_and_get_read_info_thread.join();
}
}

Expand All @@ -409,13 +393,13 @@ extern thread_local bool is_background_thread;

WriteLimiterPtr IORateLimiter::getWriteLimiter()
{
std::lock_guard lock(mtx_);
std::lock_guard lock(mtx);
return is_background_thread ? bg_write_limiter : fg_write_limiter;
}

ReadLimiterPtr IORateLimiter::getReadLimiter()
{
std::lock_guard lock(mtx_);
std::lock_guard lock(mtx);
return is_background_thread ? bg_read_limiter : fg_read_limiter;
}

Expand All @@ -426,7 +410,7 @@ void IORateLimiter::updateConfig(Poco::Util::AbstractConfiguration & config_)
{
return;
}
std::lock_guard lock(mtx_);
std::lock_guard lock(mtx);
updateReadLimiter(io_config.getBgReadMaxBytesPerSec(), io_config.getFgReadMaxBytesPerSec());
updateWriteLimiter(io_config.getBgWriteMaxBytesPerSec(), io_config.getFgWriteMaxBytesPerSec());
}
Expand Down Expand Up @@ -455,11 +439,10 @@ void IORateLimiter::updateReadLimiter(Int64 bg_bytes, Int64 fg_bytes)
{
LOG_FMT_INFO(log, "updateReadLimiter: bg_bytes {} fg_bytes {}", bg_bytes, fg_bytes);
auto get_bg_read_io_statistic = [&]() {
return getCurrentIOInfo().bg_read_bytes;
return read_info.bg_read_bytes.load(std::memory_order_relaxed);
};
auto get_fg_read_io_statistic = [&]() {
auto io_info = getCurrentIOInfo();
return std::max(0, io_info.total_read_bytes - io_info.bg_read_bytes);
return read_info.fg_read_bytes.load(std::memory_order_relaxed);
};

if (bg_bytes == 0)
Expand Down Expand Up @@ -526,7 +509,7 @@ void IORateLimiter::setBackgroundThreadIds(std::vector<pid_t> thread_ids)
LOG_FMT_INFO(log, "bg_thread_ids {} => {}", bg_thread_ids.size(), bg_thread_ids);
}

std::pair<Int64, Int64> IORateLimiter::getReadWriteBytes(const std::string & fname [[maybe_unused]])
Int64 IORateLimiter::getReadBytes(const std::string & fname [[maybe_unused]])
{
#if __linux__
std::ifstream ifs(fname);
Expand All @@ -538,7 +521,6 @@ std::pair<Int64, Int64> IORateLimiter::getReadWriteBytes(const std::string & fna
}
std::string s;
Int64 read_bytes = -1;
Int64 write_bytes = -1;
while (std::getline(ifs, s))
{
if (s.empty())
Expand All @@ -557,49 +539,43 @@ std::pair<Int64, Int64> IORateLimiter::getReadWriteBytes(const std::string & fna
boost::algorithm::trim(values[1]);
read_bytes = std::stoll(values[1]);
}
else if (values[0] == "write_bytes")
{
boost::algorithm::trim(values[1]);
write_bytes = std::stoll(values[1]);
}
}
if (read_bytes == -1 || write_bytes == -1)
if (read_bytes == -1)
{
auto msg = fmt::format("read_bytes: {} write_bytes: {} Invalid result.", read_bytes, write_bytes);
auto msg = fmt::format("read_bytes: {}. Invalid result.", read_bytes);
LOG_ERROR(log, msg);
throw Exception(msg, ErrorCodes::UNKNOWN_EXCEPTION);
}
return {read_bytes, write_bytes};
return read_bytes;
#else
return {0, 0};
return 0;
#endif
}

IORateLimiter::IOInfo IORateLimiter::getCurrentIOInfo()
void IORateLimiter::getCurrentIOInfo()
{
static const pid_t pid = getpid();
IOInfo io_info;

// Read I/O info of each background threads.
// Read read info of each background threads.
Int64 bg_read_bytes_tmp{0};
for (pid_t tid : bg_thread_ids)
{
const std::string thread_io_fname = fmt::format("/proc/{}/task/{}/io", pid, tid);
Int64 read_bytes, write_bytes;
std::tie(read_bytes, write_bytes) = getReadWriteBytes(thread_io_fname);
io_info.bg_read_bytes += read_bytes;
io_info.bg_write_bytes += write_bytes;
Int64 read_bytes;
read_bytes = getReadBytes(thread_io_fname);
bg_read_bytes_tmp += read_bytes;
}
read_info.bg_read_bytes.store(bg_read_bytes_tmp, std::memory_order_relaxed);

// Read I/O info of this process.
// Read read info of this process.
static const std::string proc_io_fname = fmt::format("/proc/{}/io", pid);
std::tie(io_info.total_read_bytes, io_info.total_write_bytes) = getReadWriteBytes(proc_io_fname);
io_info.update_time = std::chrono::system_clock::now();
return io_info;
Int64 fg_read_bytes_tmp{getReadBytes(proc_io_fname) - bg_read_bytes_tmp};
read_info.fg_read_bytes.store(std::max(0, fg_read_bytes_tmp), std::memory_order_relaxed);
}

void IORateLimiter::setStop()
{
std::lock_guard lock(mtx_);
std::lock_guard lock(mtx);
if (bg_write_limiter != nullptr)
{
auto sz = bg_write_limiter->setStop();
Expand All @@ -624,17 +600,28 @@ void IORateLimiter::setStop()

void IORateLimiter::runAutoTune()
{
auto auto_tune_worker = [&]() {
auto auto_tune_and_get_read_info_worker = [&]() {
using time_point = std::chrono::time_point<std::chrono::system_clock>;
using clock = std::chrono::system_clock;
time_point auto_tune_time = clock::now();
time_point update_read_info_time = auto_tune_time;
while (!stop.load(std::memory_order_relaxed))
{
::sleep(io_config.auto_tune_sec > 0 ? io_config.auto_tune_sec : 1);
if (io_config.auto_tune_sec > 0)
std::this_thread::sleep_for(std::chrono::milliseconds(update_read_info_period_ms));
auto now_time_point = clock::now();
if ((io_config.auto_tune_sec > 0) && (now_time_point - auto_tune_time >= std::chrono::seconds(io_config.auto_tune_sec)))
{
autoTune();
auto_tune_time = now_time_point;
}
if ((bg_read_limiter || fg_read_limiter) && likely(now_time_point - update_read_info_time >= std::chrono::milliseconds(update_read_info_period_ms)))
{
getCurrentIOInfo();
update_read_info_time = now_time_point;
}
}
};
auto_tune_thread = std::thread(auto_tune_worker);
auto_tune_and_get_read_info_thread = std::thread(auto_tune_and_get_read_info_worker);
}

std::unique_ptr<IOLimitTuner> IORateLimiter::createIOLimitTuner()
Expand All @@ -643,7 +630,7 @@ std::unique_ptr<IOLimitTuner> IORateLimiter::createIOLimitTuner()
ReadLimiterPtr bg_read, fg_read;
StorageIORateLimitConfig t_io_config;
{
std::lock_guard lock(mtx_);
std::lock_guard lock(mtx);
bg_write = bg_write_limiter;
fg_write = fg_write_limiter;
bg_read = bg_read_limiter;
Expand All @@ -666,12 +653,12 @@ void IORateLimiter::autoTune()
auto tune_result = tuner->tune();
if (tune_result.read_tuned)
{
std::lock_guard lock(mtx_);
std::lock_guard lock(mtx);
updateReadLimiter(tune_result.max_bg_read_bytes_per_sec, tune_result.max_fg_read_bytes_per_sec);
}
if (tune_result.write_tuned)
{
std::lock_guard lock(mtx_);
std::lock_guard lock(mtx);
updateWriteLimiter(tune_result.max_bg_write_bytes_per_sec, tune_result.max_fg_write_bytes_per_sec);
}
}
Expand Down
Loading

0 comments on commit b63d8d9

Please sign in to comment.