diff --git a/src/server/journal/streamer.cc b/src/server/journal/streamer.cc
index 29196b355e4c..6d4bb7f81af9 100644
--- a/src/server/journal/streamer.cc
+++ b/src/server/journal/streamer.cc
@@ -90,6 +90,7 @@ void JournalStreamer::Write(std::string_view str) {
   DVLOG(2) << "Writing " << str.size() << " bytes";
 
   size_t total_pending = pending_buf_.size() + str.size();
+
   if (in_flight_bytes_ > 0) {
     // We can not flush data while there are in flight requests because AsyncWrite
     // is not atomic. Therefore, we just aggregate.
@@ -212,17 +213,11 @@ void RestoreStreamer::Run() {
     if (fiber_cancelled_)
       return;
 
-    bool written = false;
     cursor = db_slice_->Traverse(pt, cursor, [&](PrimeTable::bucket_iterator it) {
       db_slice_->FlushChangeToEarlierCallbacks(0 /*db_id always 0 for cluster*/,
                                                DbSlice::Iterator::FromPrime(it), snapshot_version_);
-      if (WriteBucket(it)) {
-        written = true;
-      }
+      WriteBucket(it);
     });
-    if (written) {
-      ThrottleIfNeeded();
-    }
 
     if (++last_yield >= 100) {
       ThisFiber::Yield();
@@ -282,18 +277,15 @@ bool RestoreStreamer::ShouldWrite(cluster::SlotId slot_id) const {
   return my_slots_.Contains(slot_id);
 }
 
-bool RestoreStreamer::WriteBucket(PrimeTable::bucket_iterator it) {
-  bool written = false;
-
+void RestoreStreamer::WriteBucket(PrimeTable::bucket_iterator it) {
   if (it.GetVersion() < snapshot_version_) {
+    FiberAtomicGuard fg;
     it.SetVersion(snapshot_version_);
     string key_buffer;  // we can reuse it
     for (; !it.is_done(); ++it) {
       const auto& pv = it->second;
       string_view key = it->first.GetSlice(&key_buffer);
       if (ShouldWrite(key)) {
-        written = true;
-
         uint64_t expire = 0;
         if (pv.HasExpire()) {
           auto eit = db_slice_->databases()[0]->expire.Find(it->first);
@@ -304,8 +296,7 @@ bool RestoreStreamer::WriteBucket(PrimeTable::bucket_iterator it) {
       }
     }
   }
-
-  return written;
+  ThrottleIfNeeded();
 }
 
 void RestoreStreamer::OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req) {
@@ -332,33 +323,31 @@ void RestoreStreamer::WriteEntry(string_view key, const PrimeValue& pk, const Pr
   string expire_str = absl::StrCat(expire_ms);
   args.push_back(expire_str);
 
-  io::StringSink value_dump_sink;
-  SerializerBase::DumpObject(pv, &value_dump_sink);
-  args.push_back(value_dump_sink.str());
+  io::StringSink restore_cmd_sink;
+  {  // to destroy extra copy
+    io::StringSink value_dump_sink;
+    SerializerBase::DumpObject(pv, &value_dump_sink);
+    args.push_back(value_dump_sink.str());
 
-  args.push_back("ABSTTL");  // Means expire string is since epoch
+    args.push_back("ABSTTL");  // Means expire string is since epoch
 
-  if (pk.IsSticky()) {
-    args.push_back("STICK");
-  }
-
-  WriteCommand(journal::Entry::Payload("RESTORE", ArgSlice(args)));
-}
+    if (pk.IsSticky()) {
+      args.push_back("STICK");
+    }
 
-void RestoreStreamer::WriteCommand(journal::Entry::Payload cmd_payload) {
-  journal::Entry entry(0,                     // txid
-                       journal::Op::COMMAND,  // single command
-                       0,                     // db index
-                       1,                     // shard count
-                       0,                     // slot-id, but it is ignored at this level
-                       cmd_payload);
+    journal::Entry entry(0,                     // txid
+                         journal::Op::COMMAND,  // single command
+                         0,                     // db index
+                         1,                     // shard count
+                         0,                     // slot-id, but it is ignored at this level
+                         journal::Entry::Payload("RESTORE", ArgSlice(args)));
 
-  // TODO: From WriteEntry to till Write we tripple copy the PrimeValue. It's ver in-efficient and
+    JournalWriter writer{&restore_cmd_sink};
+    writer.Write(entry);
+  }
+  // TODO: From DumpObject till Write we triple copy the PrimeValue. It's very inefficient and
   // will burn CPU for large values.
-  io::StringSink sink;
-  JournalWriter writer{&sink};
-  writer.Write(entry);
-  Write(sink.str());
+  Write(restore_cmd_sink.str());
 }
 
 }  // namespace dfly
diff --git a/src/server/journal/streamer.h b/src/server/journal/streamer.h
index ce60f1071ec9..c625b60c5157 100644
--- a/src/server/journal/streamer.h
+++ b/src/server/journal/streamer.h
@@ -97,9 +97,8 @@ class RestoreStreamer : public JournalStreamer {
   bool ShouldWrite(cluster::SlotId slot_id) const;
 
   // Returns whether anything was written
-  bool WriteBucket(PrimeTable::bucket_iterator it);
+  void WriteBucket(PrimeTable::bucket_iterator it);
   void WriteEntry(string_view key, const PrimeValue& pk, const PrimeValue& pv, uint64_t expire_ms);
-  void WriteCommand(journal::Entry::Payload cmd_payload);
 
   DbSlice* db_slice_;
   DbTableArray db_array_;
diff --git a/tests/dragonfly/cluster_test.py b/tests/dragonfly/cluster_test.py
index 48b9e15532a9..3a19cfd022d6 100644
--- a/tests/dragonfly/cluster_test.py
+++ b/tests/dragonfly/cluster_test.py
@@ -1994,7 +1994,7 @@ async def test_replicate_disconnect_redis_cluster(redis_cluster, df_factory, df_
 
 
 @pytest.mark.skip("Takes more than 10 minutes")
-@dfly_args({"proactor_threads": 12, "cluster_mode": "yes"})
+@dfly_args({"proactor_threads": 4, "cluster_mode": "yes"})
 async def test_cluster_memory_consumption_migration(df_factory: DflyInstanceFactory):
     # Check data migration from one node to another
     instances = [
@@ -2002,29 +2002,21 @@ async def test_cluster_memory_consumption_migration(df_factory: DflyInstanceFact
             maxmemory="15G",
             port=BASE_PORT + i,
             admin_port=BASE_PORT + i + 1000,
-            vmodule="outgoing_slot_migration=9,cluster_family=9,incoming_slot_migration=9",
+            vmodule="streamer=9",
         )
-        for i in range(2)
+        for i in range(3)
     ]
 
-    replica = df_factory.create(
-        port=BASE_PORT + 3,
-        admin_port=BASE_PORT + 3 + 1000,
-        vmodule="outgoing_slot_migration=9,cluster_family=9,incoming_slot_migration=9",
-    )
-
-    df_factory.start_all(instances + [replica])
+    df_factory.start_all(instances)
 
     nodes = [(await create_node_info(instance)) for instance in instances]
     nodes[0].slots = [(0, 16383)]
     for i in range(1, len(instances)):
         nodes[i].slots = []
 
-    await replica.admin_client().execute_command(f"replicaof localhost {nodes[0].instance.port}")
-
     await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes])
 
-    await nodes[0].client.execute_command("DEBUG POPULATE 22500000 test 1000 RAND SLOTS 0 16383")
+    await nodes[0].client.execute_command("DEBUG POPULATE 5000000 test 1000 RAND SLOTS 0 16383")
     await asyncio.sleep(2)