From 3dd6c4959c3056d6f007cc4001e0923302a566a2 Mon Sep 17 00:00:00 2001 From: Borys Date: Wed, 8 May 2024 14:26:42 +0300 Subject: [PATCH] feat: add defragment command (#3003) * feat: add defragment command and improve auto defragmentation algorithm --- src/server/dragonfly_test.cc | 5 ++- src/server/engine_shard_set.cc | 38 ++++++++++++----- src/server/engine_shard_set.h | 5 ++- src/server/memory_cmd.cc | 8 ++++ tools/defrag_mem_test.py | 74 +++++++++++++++++++++++----------- 5 files changed, 93 insertions(+), 37 deletions(-) diff --git a/src/server/dragonfly_test.cc b/src/server/dragonfly_test.cc index 2776f989be47..7ef4cb7ccd16 100644 --- a/src/server/dragonfly_test.cc +++ b/src/server/dragonfly_test.cc @@ -23,6 +23,7 @@ extern "C" { #include "server/test_utils.h" ABSL_DECLARE_FLAG(float, mem_defrag_threshold); +ABSL_DECLARE_FLAG(uint32_t, mem_defrag_check_sec_interval); ABSL_DECLARE_FLAG(std::vector, rename_command); ABSL_DECLARE_FLAG(double, oom_deny_ratio); ABSL_DECLARE_FLAG(bool, lua_resp2_legacy_float); @@ -641,7 +642,9 @@ TEST_F(DefragDflyEngineTest, TestDefragOption) { GTEST_SKIP() << "Defragmentation via idle task is only supported in io uring"; } - absl::SetFlag(&FLAGS_mem_defrag_threshold, 0.02); + // mem_defrag_threshold is based on RSS statistic, but we don't count it in the test + absl::SetFlag(&FLAGS_mem_defrag_threshold, 0.0); + absl::SetFlag(&FLAGS_mem_defrag_check_sec_interval, 0); // Fill data into dragonfly and then check if we have // any location in memory to defrag. See issue #448 for details about this. constexpr size_t kMaxMemoryForTest = 1'100'000; diff --git a/src/server/engine_shard_set.cc b/src/server/engine_shard_set.cc index a246a3603cb1..6bb8f8cd0219 100644 --- a/src/server/engine_shard_set.cc +++ b/src/server/engine_shard_set.cc @@ -69,6 +69,9 @@ ABSL_FLAG(string, shard_round_robin_prefix, "", "support up to a few hundreds of prefixes. Note: prefix is looked inside hash tags when " "cluster mode is enabled."); +ABSL_FLAG(uint32_t, mem_defrag_check_sec_interval, 10, + "Number of seconds between every defragmentation necessity check"); + namespace dfly { using namespace tiering::literals; @@ -222,7 +225,6 @@ EngineShard::Stats& EngineShard::Stats::operator+=(const EngineShard::Stats& o) void EngineShard::DefragTaskState::UpdateScanState(uint64_t cursor_val) { cursor = cursor_val; - underutilized_found = false; // Once we're done with a db, jump to the next if (cursor == kCursorDoneState) { dbid++; @@ -231,7 +233,6 @@ void EngineShard::DefragTaskState::UpdateScanState(uint64_t cursor_val) { void EngineShard::DefragTaskState::ResetScanState() { dbid = cursor = 0u; - underutilized_found = false; } // This function checks 3 things: @@ -241,8 +242,9 @@ void EngineShard::DefragTaskState::ResetScanState() { // 3. in case the above is OK, make sure that we have a "gap" between usage and commited memory // (control by mem_defrag_waste_threshold flag) bool EngineShard::DefragTaskState::CheckRequired() { - if (cursor > kCursorDoneState || underutilized_found) { - VLOG(2) << "cursor: " << cursor << " and underutilized_found " << underutilized_found; + if (is_force_defrag || cursor > kCursorDoneState) { + is_force_defrag = false; + VLOG(2) << "cursor: " << cursor << " and is_force_defrag " << is_force_defrag; return true; } @@ -251,20 +253,35 @@ bool EngineShard::DefragTaskState::CheckRequired() { return false; } - const std::size_t threshold_mem = memory_per_shard * GetFlag(FLAGS_mem_defrag_threshold); - const double waste_threshold = GetFlag(FLAGS_mem_defrag_waste_threshold); + const std::size_t global_threshold = max_memory_limit * GetFlag(FLAGS_mem_defrag_threshold); + if (global_threshold > rss_mem_current.load(memory_order_relaxed)) { + return false; + } + + const auto now = time(nullptr); + const auto seconds_from_prev_check = now - last_check_time; + const auto mem_defrag_interval = GetFlag(FLAGS_mem_defrag_check_sec_interval); + + if (seconds_from_prev_check < mem_defrag_interval) { + return false; + } + last_check_time = now; ShardMemUsage usage = ReadShardMemUsage(GetFlag(FLAGS_mem_defrag_page_utilization_threshold)); - if (threshold_mem < usage.commited && - usage.wasted_mem > (uint64_t(usage.commited * waste_threshold))) { + const double waste_threshold = GetFlag(FLAGS_mem_defrag_waste_threshold); + if (usage.wasted_mem > (uint64_t(usage.commited * waste_threshold))) { VLOG(1) << "memory issue found for memory " << usage; - underutilized_found = true; + return true; } return false; } +void EngineShard::ForceDefrag() { + defrag_state_.is_force_defrag = true; +} + bool EngineShard::DoDefrag() { // -------------------------------------------------------------------------- // NOTE: This task is running with exclusive access to the shard. @@ -341,8 +358,7 @@ uint32_t EngineShard::DefragTask() { const auto shard_id = db_slice().shard_id(); if (defrag_state_.CheckRequired()) { - VLOG(2) << shard_id << ": need to run defrag memory cursor state: " << defrag_state_.cursor - << ", underutilzation found: " << defrag_state_.underutilized_found; + VLOG(2) << shard_id << ": need to run defrag memory cursor state: " << defrag_state_.cursor; if (DoDefrag()) { // we didn't finish the scan return util::ProactorBase::kOnIdleMaxLevel; diff --git a/src/server/engine_shard_set.h b/src/server/engine_shard_set.h index dc0f576b719a..70f87b0d0c79 100644 --- a/src/server/engine_shard_set.h +++ b/src/server/engine_shard_set.h @@ -183,11 +183,14 @@ class EngineShard { TxQueueInfo AnalyzeTxQueue() const; + void ForceDefrag(); + private: struct DefragTaskState { size_t dbid = 0u; uint64_t cursor = 0u; - bool underutilized_found = false; + time_t last_check_time = 0; + bool is_force_defrag = false; // check the current threshold and return true if // we need to do the defragmentation diff --git a/src/server/memory_cmd.cc b/src/server/memory_cmd.cc index 87faadd84fbb..14bfa74a2c93 100644 --- a/src/server/memory_cmd.cc +++ b/src/server/memory_cmd.cc @@ -154,6 +154,14 @@ void MemoryCmd::Run(CmdArgList args) { return Track(args); } + if (sub_cmd == "DEFRAGMENT") { + shard_set->pool()->DispatchOnAll([this](util::ProactorBase*) { + if (auto* shard = EngineShard::tlocal(); shard) + shard->ForceDefrag(); + }); + return cntx_->SendSimpleString("OK"); + } + string err = UnknownSubCmd(sub_cmd, "MEMORY"); return cntx_->SendError(err, kSyntaxErrType); } diff --git a/tools/defrag_mem_test.py b/tools/defrag_mem_test.py index 39a437f97c1c..cc2b151265f2 100755 --- a/tools/defrag_mem_test.py +++ b/tools/defrag_mem_test.py @@ -4,12 +4,13 @@ import async_timeout import sys import argparse -''' + +""" To install: pip install -r requirements.txt Run -dragonfly --mem_defrag_threshold=0.01 --commit_use_threshold=1.2 --mem_utilization_threshold=0.8 -defrag_mem_test.py -k 800000 -v 645 +dragonfly --mem_defrag_threshold=0.01 --mem_defrag_waste_threshold=0.01 +defrag_mem_test.py -k 8000000 -v 645 This program would try to re-create the issue with memory defragmentation. See issue number 448 for more details. @@ -29,7 +30,8 @@ NOTE: If this seems to get stuck please kill it with ctrl+c This can happen in case we don't have "defrag_realloc_total > 0" -''' +""" + class TaskCancel: def __init__(self): @@ -41,14 +43,16 @@ def dont_stop(self): def stop(self): self.run = False + async def run_cmd(connection, cmd, sub_val): val = await connection.execute_command(cmd, sub_val) return val + async def handle_defrag_stats(connection, prev): info = await run_cmd(connection, "info", "stats") if info is not None: - if info['defrag_task_invocation_total'] != prev: + if info["defrag_task_invocation_total"] != prev: print("--------------------------------------------------------------") print(f"defrag_task_invocation_total: {info['defrag_task_invocation_total']:,}") print(f"defrag_realloc_total: {info['defrag_realloc_total']:,}") @@ -56,22 +60,23 @@ async def handle_defrag_stats(connection, prev): print("--------------------------------------------------------------") if info["defrag_realloc_total"] > 0: return True, None - return False, info['defrag_task_invocation_total'] + return False, info["defrag_task_invocation_total"] return False, None + async def memory_stats(connection): print("--------------------------------------------------------------") info = await run_cmd(connection, "info", "memory") - print(f"memory commited: {info['comitted_memory']:,}") + # print(f"memory commited: {info['comitted_memory']:,}") print(f"memory used: {info['used_memory']:,}") - print(f"memory usage ratio: {info['comitted_memory']/info['used_memory']:.2f}") + # print(f"memory usage ratio: {info['comitted_memory']/info['used_memory']:.2f}") print("--------------------------------------------------------------") async def stats_check(connection, condition): try: - defrag_task_invocation_total = 0; - runs=0 + defrag_task_invocation_total = 0 + runs = 0 while condition.dont_stop(): await asyncio.sleep(0.3) done, d = await handle_defrag_stats(connection, defrag_task_invocation_total) @@ -101,13 +106,15 @@ async def delete_keys(connection, keys): results = await connection.delete(*keys) return results + def generate_keys(pattern: str, count: int, batch_size: int) -> list: for i in range(1, count, batch_size): batch = [f"{pattern}{j}" for j in range(i, batch_size + i, 3)] yield batch + async def mem_cleanup(connection, pattern, num, cond, keys_count): - counter=0 + counter = 0 for keys in generate_keys(pattern=pattern, count=keys_count, batch_size=950): if cond.dont_stop() == False: print(f"task number {num} that deleted keys {pattern} finished") @@ -130,9 +137,17 @@ async def run_tasks(pool, key_name, value_size, keys_count): tasks = [] count = 0 for key in keys: - pattern=f"{key}:" + pattern = f"{key}:" print(f"deleting keys from {pattern}") - tasks.append(mem_cleanup(connection=connection, pattern=pattern, num=count, cond=stop_cond, keys_count=int(keys_count))) + tasks.append( + mem_cleanup( + connection=connection, + pattern=pattern, + num=count, + cond=stop_cond, + keys_count=int(keys_count), + ) + ) count += 1 monitor_task = asyncio.create_task(stats_check(connection, stop_cond)) total = await asyncio.gather(*tasks, return_exceptions=True) @@ -147,29 +162,40 @@ async def run_tasks(pool, key_name, value_size, keys_count): def connect_and_run(key_name, value_size, keys_count, host="localhost", port=6379): - async_pool = aioredis.ConnectionPool(host=host, port=port, - db=0, decode_responses=True, max_connections=16) + async_pool = aioredis.ConnectionPool( + host=host, port=port, db=0, decode_responses=True, max_connections=16 + ) loop = asyncio.new_event_loop() - success = loop.run_until_complete(run_tasks(pool=async_pool, key_name=key_name, value_size=value_size, keys_count=keys_count)) + success = loop.run_until_complete( + run_tasks(pool=async_pool, key_name=key_name, value_size=value_size, keys_count=keys_count) + ) return success if __name__ == "__main__": - parser = argparse.ArgumentParser(description='active memory testing', formatter_class=argparse.ArgumentDefaultsHelpFormatter) - parser.add_argument('-k', '--keys', type=int, default=800000, help='total number of keys') - parser.add_argument('-v', '--value_size', type=int, default=645, help='size of the values') - parser.add_argument('-n', '--key_name', type=str, default="key-for-testing", help='the base key name') - parser.add_argument('-s', '--server', type=str, default="localhost", help='server host name') - parser.add_argument('-p', '--port', type=int, default=6379, help='server port number') + parser = argparse.ArgumentParser( + description="active memory testing", formatter_class=argparse.ArgumentDefaultsHelpFormatter + ) + parser.add_argument("-k", "--keys", type=int, default=800000, help="total number of keys") + parser.add_argument("-v", "--value_size", type=int, default=645, help="size of the values") + parser.add_argument( + "-n", "--key_name", type=str, default="key-for-testing", help="the base key name" + ) + parser.add_argument("-s", "--server", type=str, default="localhost", help="server host name") + parser.add_argument("-p", "--port", type=int, default=6379, help="server port number") args = parser.parse_args() keys_num = args.keys key_name = args.key_name value_size = args.value_size host = args.server port = args.port - print(f"running key deletion on {host}:{port} for keys {key_name} value size of {value_size} and number of keys {keys_num}") - result = connect_and_run(key_name=key_name, value_size=value_size, keys_count=keys_num, host=host, port=port) + print( + f"running key deletion on {host}:{port} for keys {key_name} value size of {value_size} and number of keys {keys_num}" + ) + result = connect_and_run( + key_name=key_name, value_size=value_size, keys_count=keys_num, host=host, port=port + ) if result == True: print("finished successfully") else: