From 0642a47697d2f0083c711cebd4c039482b990ab9 Mon Sep 17 00:00:00 2001
From: Borys
Date: Mon, 3 Jun 2024 17:30:15 +0300
Subject: [PATCH 1/2] fix: fix RestoreStreamer to prevent bucket skipping
 #2830

---
 src/server/journal/streamer.cc  |  9 +++++++++
 tests/dragonfly/cluster_test.py | 21 ++++-----------------
 2 files changed, 13 insertions(+), 17 deletions(-)

diff --git a/src/server/journal/streamer.cc b/src/server/journal/streamer.cc
index ac6adf198043..68bf3d406bda 100644
--- a/src/server/journal/streamer.cc
+++ b/src/server/journal/streamer.cc
@@ -83,6 +83,15 @@ void RestoreStreamer::Start(io::Sink* dest, bool send_lsn) {
 
       bool written = false;
       cursor = pt->Traverse(cursor, [&](PrimeTable::bucket_iterator it) {
+        uint64_t v = it.GetVersion();
+        if (v >= snapshot_version_) {
+          // either has been already serialized or added after snapshotting started.
+          DVLOG(3) << "Skipped " << it.segment_id() << ":" << it.bucket_id() << ":" << it.slot_id()
+                   << " at " << v;
+        }
+        constexpr DbIndex cluster_db_index = 0;
+        db_slice_->FlushChangeToEarlierCallbacks(cluster_db_index, DbSlice::Iterator::FromPrime(it),
+                                                 snapshot_version_);
         if (WriteBucket(it)) {
           written = true;
         }
diff --git a/tests/dragonfly/cluster_test.py b/tests/dragonfly/cluster_test.py
index c25887ff7896..19757bf99a95 100644
--- a/tests/dragonfly/cluster_test.py
+++ b/tests/dragonfly/cluster_test.py
@@ -1177,11 +1177,6 @@ async def test_cluster_fuzzymigration(
     seeder = df_seeder_factory.create(keys=keys, port=nodes[0].instance.port, cluster_mode=True)
     await seeder.run(target_deviation=0.1)
 
-    fill_task = asyncio.create_task(seeder.run())
-
-    # some time fo seeder
-    await asyncio.sleep(0.5)
-
     # Counter that pushes values to a list
     async def list_counter(key, client: aioredis.RedisCluster):
         for i in itertools.count(start=1):
@@ -1197,9 +1192,6 @@ async def list_counter(key, client: aioredis.RedisCluster):
         for key, conn in zip(counter_keys, counter_connections)
     ]
 
-    seeder.stop()
-    await fill_task
-
     # Generate capture, capture ignores counter keys
     capture = await seeder.capture()
 
@@ -1237,6 +1229,7 @@ async def list_counter(key, client: aioredis.RedisCluster):
         keeping = node.slots[num_outgoing:]
         node.next_slots.extend(keeping)
 
+    logging.debug("start migrations")
     await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes])
 
     iterations = 0
@@ -1244,7 +1237,7 @@ async def list_counter(key, client: aioredis.RedisCluster):
         is_all_finished = True
         for node in nodes:
             states = await node.admin_client.execute_command("DFLYCLUSTER", "SLOT-MIGRATION-STATUS")
-            print(states)
+            logging.debug(states)
             is_all_finished = is_all_finished and (
                 all("FINISHED" in s for s in states) or states == "NO_STATE"
             )
@@ -1257,23 +1250,17 @@ async def list_counter(key, client: aioredis.RedisCluster):
 
         await asyncio.sleep(0.1)
 
-    # Stop counters
     for counter in counters:
         counter.cancel()
 
     # clean migrations
     for node in nodes:
         node.migrations = []
+        node.slots = node.next_slots
 
-    # TODO this config should be pushed with new slots
-    # Push new config
+    logging.debug("remove finished migrations")
     await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes])
 
-    # Transfer nodes
-    for node in nodes:
-        node.slots = node.next_slots
-        node.new_slots = []
-
     # Check counter consistency
     cluster_client = aioredis.RedisCluster(host="localhost", port=nodes[0].instance.port)
     for key in counter_keys:

From ef0e5b770d7ea7998f9ba04d7e93d98c95b265a3 Mon Sep 17 00:00:00 2001
From: Borys
Date: Tue, 4 Jun 2024 11:01:06 +0300
Subject: [PATCH 2/2] refactor: address comments

---
 src/server/journal/streamer.cc | 11 ++---------
 1 file changed, 2 insertions(+), 9 deletions(-)

diff --git a/src/server/journal/streamer.cc b/src/server/journal/streamer.cc
index 68bf3d406bda..2495eb8c783f 100644
--- a/src/server/journal/streamer.cc
+++ b/src/server/journal/streamer.cc
@@ -83,15 +83,8 @@ void RestoreStreamer::Start(io::Sink* dest, bool send_lsn) {
 
       bool written = false;
       cursor = pt->Traverse(cursor, [&](PrimeTable::bucket_iterator it) {
-        uint64_t v = it.GetVersion();
-        if (v >= snapshot_version_) {
-          // either has been already serialized or added after snapshotting started.
-          DVLOG(3) << "Skipped " << it.segment_id() << ":" << it.bucket_id() << ":" << it.slot_id()
-                   << " at " << v;
-        }
-        constexpr DbIndex cluster_db_index = 0;
-        db_slice_->FlushChangeToEarlierCallbacks(cluster_db_index, DbSlice::Iterator::FromPrime(it),
-                                                 snapshot_version_);
+        db_slice_->FlushChangeToEarlierCallbacks(0 /*db_id always 0 for cluster*/,
+                                                 DbSlice::Iterator::FromPrime(it), snapshot_version_);
         if (WriteBucket(it)) {
           written = true;
         }
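Note: the shape the second patch converges on can be illustrated with a small
standalone C++ sketch. All names below (Bucket, RestoreStreamerSketch, Visit,
FlushFn) are hypothetical stand-ins, not Dragonfly's real PrimeTable/DbSlice
API. It models the two-step traversal callback: first flush change callbacks
registered before this traversal point, so a bucket mutated mid-traversal is
serialized through the change path instead of being skipped, and then leave
the version-based skip decision entirely to WriteBucket().

// Standalone sketch; hypothetical simplified types, not Dragonfly code.
#include <cstdint>
#include <functional>
#include <iostream>
#include <utility>
#include <vector>

struct Bucket {
  uint64_t version = 0;  // last snapshot epoch that serialized this bucket
  std::vector<int> values;
};

class RestoreStreamerSketch {
 public:
  using FlushFn = std::function<void(Bucket&)>;

  RestoreStreamerSketch(uint64_t snapshot_version, FlushFn flush)
      : snapshot_version_(snapshot_version), flush_(std::move(flush)) {}

  // Mirrors the patched Traverse callback: give earlier-registered change
  // callbacks a chance to serialize pending mutations, then write the bucket.
  bool Visit(Bucket& b) {
    flush_(b);  // analogous to DbSlice::FlushChangeToEarlierCallbacks()
    return WriteBucket(b);
  }

 private:
  bool WriteBucket(Bucket& b) {
    if (b.version >= snapshot_version_) {
      // Already serialized, or created after snapshotting started: skip.
      return false;
    }
    b.version = snapshot_version_;  // mark as serialized
    for (int v : b.values) std::cout << v << ' ';
    std::cout << '\n';
    return true;
  }

  uint64_t snapshot_version_;
  FlushFn flush_;
};

int main() {
  RestoreStreamerSketch streamer(/*snapshot_version=*/10,
                                 [](Bucket&) { /* no pending changes */ });
  Bucket untouched{5, {1, 2, 3}};   // version < 10: must be written
  Bucket already_done{12, {4, 5}};  // version >= 10: must be skipped
  bool w1 = streamer.Visit(untouched);     // prints "1 2 3"
  bool w2 = streamer.Visit(already_done);  // prints nothing
  std::cout << std::boolalpha << w1 << ' ' << w2 << '\n';  // true false
}

Running the sketch prints "1 2 3" followed by "true false": the untouched
bucket is written exactly once, while the already-serialized bucket is passed
over by the version check rather than by the traversal itself.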