Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: reduce memory consumption during migration #4017

Merged
merged 2 commits into from
Nov 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 25 additions & 36 deletions src/server/journal/streamer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ void JournalStreamer::Write(std::string_view str) {
DVLOG(2) << "Writing " << str.size() << " bytes";

size_t total_pending = pending_buf_.size() + str.size();

if (in_flight_bytes_ > 0) {
// We can not flush data while there are in flight requests because AsyncWrite
// is not atomic. Therefore, we just aggregate.
Expand Down Expand Up @@ -212,17 +213,11 @@ void RestoreStreamer::Run() {
if (fiber_cancelled_)
return;

bool written = false;
cursor = db_slice_->Traverse(pt, cursor, [&](PrimeTable::bucket_iterator it) {
db_slice_->FlushChangeToEarlierCallbacks(0 /*db_id always 0 for cluster*/,
DbSlice::Iterator::FromPrime(it), snapshot_version_);
if (WriteBucket(it)) {
written = true;
}
WriteBucket(it);
});
if (written) {
ThrottleIfNeeded();
}

if (++last_yield >= 100) {
ThisFiber::Yield();
Expand Down Expand Up @@ -282,18 +277,15 @@ bool RestoreStreamer::ShouldWrite(cluster::SlotId slot_id) const {
return my_slots_.Contains(slot_id);
}

bool RestoreStreamer::WriteBucket(PrimeTable::bucket_iterator it) {
bool written = false;

void RestoreStreamer::WriteBucket(PrimeTable::bucket_iterator it) {
if (it.GetVersion() < snapshot_version_) {
FiberAtomicGuard fg;
it.SetVersion(snapshot_version_);
string key_buffer; // we can reuse it
for (; !it.is_done(); ++it) {
const auto& pv = it->second;
string_view key = it->first.GetSlice(&key_buffer);
if (ShouldWrite(key)) {
written = true;

uint64_t expire = 0;
if (pv.HasExpire()) {
auto eit = db_slice_->databases()[0]->expire.Find(it->first);
Expand All @@ -304,8 +296,7 @@ bool RestoreStreamer::WriteBucket(PrimeTable::bucket_iterator it) {
}
}
}

return written;
ThrottleIfNeeded();
}

void RestoreStreamer::OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req) {
Expand All @@ -332,33 +323,31 @@ void RestoreStreamer::WriteEntry(string_view key, const PrimeValue& pk, const Pr
string expire_str = absl::StrCat(expire_ms);
args.push_back(expire_str);

io::StringSink value_dump_sink;
SerializerBase::DumpObject(pv, &value_dump_sink);
args.push_back(value_dump_sink.str());
io::StringSink restore_cmd_sink;
{ // to destroy extra copy
io::StringSink value_dump_sink;
SerializerBase::DumpObject(pv, &value_dump_sink);
args.push_back(value_dump_sink.str());

args.push_back("ABSTTL"); // Means expire string is since epoch
args.push_back("ABSTTL"); // Means expire string is since epoch

if (pk.IsSticky()) {
args.push_back("STICK");
}

WriteCommand(journal::Entry::Payload("RESTORE", ArgSlice(args)));
}
if (pk.IsSticky()) {
args.push_back("STICK");
}

void RestoreStreamer::WriteCommand(journal::Entry::Payload cmd_payload) {
journal::Entry entry(0, // txid
journal::Op::COMMAND, // single command
0, // db index
1, // shard count
0, // slot-id, but it is ignored at this level
cmd_payload);
journal::Entry entry(0, // txid
journal::Op::COMMAND, // single command
0, // db index
1, // shard count
0, // slot-id, but it is ignored at this level
journal::Entry::Payload("RESTORE", ArgSlice(args)));

// TODO: From WriteEntry until Write we triple-copy the PrimeValue. It's very inefficient and
JournalWriter writer{&restore_cmd_sink};
writer.Write(entry);
}
// TODO: From DumpObject until Write we triple-copy the PrimeValue. It's very inefficient and
// will burn CPU for large values.
io::StringSink sink;
JournalWriter writer{&sink};
writer.Write(entry);
Write(sink.str());
Write(restore_cmd_sink.str());
}

} // namespace dfly
3 changes: 1 addition & 2 deletions src/server/journal/streamer.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,9 +97,8 @@ class RestoreStreamer : public JournalStreamer {
bool ShouldWrite(cluster::SlotId slot_id) const;

// Writes all relevant entries of the bucket and throttles if needed (returns nothing).
bool WriteBucket(PrimeTable::bucket_iterator it);
void WriteBucket(PrimeTable::bucket_iterator it);
void WriteEntry(string_view key, const PrimeValue& pk, const PrimeValue& pv, uint64_t expire_ms);
void WriteCommand(journal::Entry::Payload cmd_payload);

DbSlice* db_slice_;
DbTableArray db_array_;
Expand Down
18 changes: 5 additions & 13 deletions tests/dragonfly/cluster_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -1994,37 +1994,29 @@ async def test_replicate_disconnect_redis_cluster(redis_cluster, df_factory, df_


@pytest.mark.skip("Takes more than 10 minutes")
@dfly_args({"proactor_threads": 12, "cluster_mode": "yes"})
@dfly_args({"proactor_threads": 4, "cluster_mode": "yes"})
async def test_cluster_memory_consumption_migration(df_factory: DflyInstanceFactory):
# Check data migration from one node to another
instances = [
df_factory.create(
maxmemory="15G",
port=BASE_PORT + i,
admin_port=BASE_PORT + i + 1000,
vmodule="outgoing_slot_migration=9,cluster_family=9,incoming_slot_migration=9",
vmodule="streamer=9",
)
for i in range(2)
for i in range(3)
]

replica = df_factory.create(
port=BASE_PORT + 3,
admin_port=BASE_PORT + 3 + 1000,
vmodule="outgoing_slot_migration=9,cluster_family=9,incoming_slot_migration=9",
)

df_factory.start_all(instances + [replica])
df_factory.start_all(instances)

nodes = [(await create_node_info(instance)) for instance in instances]
nodes[0].slots = [(0, 16383)]
for i in range(1, len(instances)):
nodes[i].slots = []

await replica.admin_client().execute_command(f"replicaof localhost {nodes[0].instance.port}")

await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes])

await nodes[0].client.execute_command("DEBUG POPULATE 22500000 test 1000 RAND SLOTS 0 16383")
await nodes[0].client.execute_command("DEBUG POPULATE 5000000 test 1000 RAND SLOTS 0 16383")

await asyncio.sleep(2)

Expand Down
Loading