Skip to content

Commit

Permalink
feat: add defragment command
Browse files Browse the repository at this point in the history
  • Loading branch information
BorysTheDev committed May 2, 2024
1 parent a95419b commit 9834791
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 13 deletions.
30 changes: 18 additions & 12 deletions src/server/engine_shard_set.cc
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,13 @@ bool EngineShard::DefragTaskState::CheckRequired() {
return false;
}

bool EngineShard::DoDefrag() {
void EngineShard::ForceDefrag() {
DefragTaskState defrag_state_;
while (DoDefrag(defrag_state_))
;
}

bool EngineShard::DoDefrag(DefragTaskState& defrag_state) {
// --------------------------------------------------------------------------
// NOTE: This task is running with exclusive access to the shard.
// i.e. - Since we are using shared noting access here, and all access
Expand All @@ -279,18 +285,18 @@ bool EngineShard::DoDefrag() {
auto& slice = db_slice();

// If we moved to an invalid db, skip as long as it's not the last one
while (!slice.IsDbValid(defrag_state_.dbid) && defrag_state_.dbid + 1 < slice.db_array_size())
defrag_state_.dbid++;
while (!slice.IsDbValid(defrag_state.dbid) && defrag_state.dbid + 1 < slice.db_array_size())
defrag_state.dbid++;

// If we found no valid db, we finished traversing and start from scratch next time
if (!slice.IsDbValid(defrag_state_.dbid)) {
defrag_state_.ResetScanState();
if (!slice.IsDbValid(defrag_state.dbid)) {
defrag_state.ResetScanState();
return false;
}

DCHECK(slice.IsDbValid(defrag_state_.dbid));
auto [prime_table, expire_table] = slice.GetTables(defrag_state_.dbid);
PrimeTable::Cursor cur = defrag_state_.cursor;
DCHECK(slice.IsDbValid(defrag_state.dbid));
auto [prime_table, expire_table] = slice.GetTables(defrag_state.dbid);
PrimeTable::Cursor cur = defrag_state.cursor;
uint64_t reallocations = 0;
unsigned traverses_count = 0;
uint64_t attempts = 0;
Expand All @@ -308,16 +314,16 @@ bool EngineShard::DoDefrag() {
traverses_count++;
} while (traverses_count < kMaxTraverses && cur);

defrag_state_.UpdateScanState(cur.value());
defrag_state.UpdateScanState(cur.value());

if (reallocations > 0) {
VLOG(1) << "shard " << slice.shard_id() << ": successfully defrag " << reallocations
<< " times, did it in " << traverses_count << " cursor is at the "
<< (defrag_state_.cursor == kCursorDoneState ? "end" : "in progress");
<< (defrag_state.cursor == kCursorDoneState ? "end" : "in progress");
} else {
VLOG(1) << "shard " << slice.shard_id() << ": run the defrag " << traverses_count
<< " times out of maximum " << kMaxTraverses << ", with cursor at "
<< (defrag_state_.cursor == kCursorDoneState ? "end" : "in progress")
<< (defrag_state.cursor == kCursorDoneState ? "end" : "in progress")
<< " but no location for defrag were found";
}

Expand All @@ -343,7 +349,7 @@ uint32_t EngineShard::DefragTask() {
if (defrag_state_.CheckRequired()) {
VLOG(2) << shard_id << ": need to run defrag memory cursor state: " << defrag_state_.cursor
<< ", underutilzation found: " << defrag_state_.underutilized_found;
if (DoDefrag()) {
if (DoDefrag(defrag_state_)) {
// we didn't finish the scan
return util::ProactorBase::kOnIdleMaxLevel;
}
Expand Down
4 changes: 3 additions & 1 deletion src/server/engine_shard_set.h
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,8 @@ class EngineShard {

TxQueueInfo AnalyzeTxQueue() const;

void ForceDefrag();

private:
struct DefragTaskState {
size_t dbid = 0u;
Expand Down Expand Up @@ -225,7 +227,7 @@ class EngineShard {
// de-fragmentation option for entries. This function will return the new cursor at the end of the
// scan This function is called from context of StartDefragTask
// return true if we did not complete the shard scan
bool DoDefrag();
bool DoDefrag(DefragTaskState& state);

TaskQueue queue_;

Expand Down
8 changes: 8 additions & 0 deletions src/server/memory_cmd.cc
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,14 @@ void MemoryCmd::Run(CmdArgList args) {
return Track(args);
}

if (sub_cmd == "DEFRAGMENT") {
shard_set->pool()->DispatchOnAll([this](util::ProactorBase*) {
if (auto* shard = EngineShard::tlocal(); shard)
shard->ForceDefrag();
});
return cntx_->SendSimpleString("OK");
}

string err = UnknownSubCmd(sub_cmd, "MEMORY");
return cntx_->SendError(err, kSyntaxErrType);
}
Expand Down

0 comments on commit 9834791

Please sign in to comment.