From 1c591a767f50ab3af673a3ca71848342dfe3af76 Mon Sep 17 00:00:00 2001
From: young scott
Date: Thu, 28 Sep 2023 02:08:02 +0000
Subject: [PATCH] fix bugs about merge task conflicts

---
 .../MergeTree/MergeFromLogEntryTask.cpp     |  6 +--
 src/Storages/StorageReplicatedMergeTree.cpp | 45 +++++++------------
 2 files changed, 19 insertions(+), 32 deletions(-)

diff --git a/src/Storages/MergeTree/MergeFromLogEntryTask.cpp b/src/Storages/MergeTree/MergeFromLogEntryTask.cpp
index b441a3b88cbc..e8082ffbba0c 100644
--- a/src/Storages/MergeTree/MergeFromLogEntryTask.cpp
+++ b/src/Storages/MergeTree/MergeFromLogEntryTask.cpp
@@ -79,10 +79,10 @@ std::pair<bool, ReplicatedMergeMutateTaskBase::PartLogWriter> MergeFromLogEntryT
         /// 2. We have some larger merged part which covers new_part_name (and therefore it covers source_part_name too)
         /// 3. We have two intersecting parts, both cover source_part_name. It's logical error.
         /// TODO Why 1 and 2 can happen? Do we need more assertions here or somewhere else?
-        constexpr const char * message = "Part {} is covered by {} but should be merged into {}. This shouldn't happen often.";
-        LOG_WARNING(log, fmt::runtime(message), source_part_name, source_part_or_covering->name, entry.new_part_name);
+        constexpr const char * message = "Part {} of table {} is covered by {} but should be merged into {}. This shouldn't happen often.";
+        LOG_WARNING(log, fmt::runtime(message), source_part_name, storage.getStorageID().getFullTableName(), source_part_or_covering->name, entry.new_part_name);
         if (!source_part_or_covering->info.contains(MergeTreePartInfo::fromPartName(entry.new_part_name, storage.format_version)))
-            throw Exception(ErrorCodes::LOGICAL_ERROR, message, source_part_name, source_part_or_covering->name, entry.new_part_name);
+            throw Exception(ErrorCodes::LOGICAL_ERROR, message, source_part_name, storage.getStorageID().getFullTableName(), source_part_or_covering->name, entry.new_part_name);

         return {false, {}};
     }
diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp
index 7ded9915f78e..b3804ed49799 100644
--- a/src/Storages/StorageReplicatedMergeTree.cpp
+++ b/src/Storages/StorageReplicatedMergeTree.cpp
@@ -188,19 +188,10 @@ zkutil::ZooKeeperPtr StorageReplicatedMergeTree::tryGetZooKeeper() const

 zkutil::ZooKeeperPtr StorageReplicatedMergeTree::getZooKeeper() const
 {
-    do
-    {
-        auto res = tryGetZooKeeper();
-        if (!res)
-            throw Exception("Cannot get ZooKeeper", ErrorCodes::NO_ZOOKEEPER);
-        if (res->expired())
-        {
-            const_cast<StorageReplicatedMergeTree *>(this)->setZooKeeper();
-            continue;
-        }
-        return res;
-    }
-    while (true);
+    auto res = tryGetZooKeeper();
+    if (!res)
+        throw Exception("Cannot get ZooKeeper", ErrorCodes::NO_ZOOKEEPER);
+    return res;
 }

 zkutil::ZooKeeperPtr StorageReplicatedMergeTree::getZooKeeperAndAssertNotReadonly() const
@@ -1737,11 +1728,11 @@ bool StorageReplicatedMergeTree::executeFetch(LogEntry & entry)
                 if (!unactive_replica.empty())
                 {
                     ProfileEvents::increment(ProfileEvents::ReplicatedPartFailedFetches);
-                    throw Exception("No active replica has part " + entry.new_part_name + " or covering part", ErrorCodes::NO_REPLICA_HAS_PART);
+                    throw Exception("Replica " + unactive_replica + " has part " + entry.new_part_name + " or a covering part, but it is inactive; will try to fetch it later", ErrorCodes::NO_REPLICA_HAS_PART);
                 }
-                if (std::difftime(std::time(nullptr), entry.create_time) < getSettings()->prefer_fetch_merged_part_time_threshold.totalSeconds())
+                if (std::difftime(std::time(nullptr), entry.create_time) < 7 * 24 * 60 * 60) /// one week
                 {
-                    LOG_INFO(log, "Will not fetch part {}. No replica has this part. try fetch it later", entry.new_part_name);
+                    LOG_INFO(log, "Will not fetch part {}. No replica has this part. Maybe it has not been merged on other replicas yet; will try to fetch it later", entry.new_part_name);
                     return false;
                 }
                 LOG_INFO(log, "Will not fetch part {}. No replica has this part.", entry.new_part_name);
@@ -7832,24 +7823,20 @@ DistributeLockGuardPtr StorageReplicatedMergeTree::getDistributeLockGuard(
     const String & part_name,
     const String & value) const
 {
-    zkutil::ZooKeeperPtr zookeeper = tryGetZooKeeper();
-    if (!zookeeper)
-    {
-        LOG_WARNING(log, "Cannot create distribute lock for part {} on disk {} guard without zookeeper", part_name, disk_name);
-        return nullptr;
-    }
+    zkutil::ZooKeeperPtr zookeeper = getZooKeeperAndAssertNotReadonly();
     String hostname = getContext()->getInterserverIOAddress().first;
     String zk_node_path = getDistributeLockPath(disk_name, part_name);
     DistributeLockGuardPtr lock_guard = std::make_shared<DistributeLockGuard>(zookeeper, zk_node_path);
-    while (!lock_guard->tryLock(value, hostname))
+    if (zookeeper->expired())
     {
-        if (zookeeper->expired())
-        {
-            zookeeper = getContext()->getZooKeeper();
-        }
-        LOG_DEBUG(log, "lock of part {} postponed, someone other using ({}) this part right now", part_name, value);
-        sleep(2);
+        LOG_DEBUG(log, "Lock of part {} ({}) failed: ZooKeeper session expired", part_name, value);
+        return nullptr;
+    }
+    if (!lock_guard->tryLock(value, hostname))
+    {
+        LOG_DEBUG(log, "Lock of part {} postponed: someone else ({}) may be using this part right now", part_name, value);
+        return nullptr;
     }
     LOG_DEBUG(log, "Create distribute lock for part {} on disk {} for {}", part_name, disk_name, value);
     return lock_guard;
 }
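Editor's note: the warning and exception in the first hunk turn on ClickHouse's notion of one part "covering" another. A part name such as all_1_10_2 encodes a partition id, a block-number range, and a merge level, and MergeTreePartInfo::contains checks that one part's range encloses the other's. Below is a minimal, self-contained sketch of that containment rule; PartInfo is an illustrative stand-in, not the real MergeTreePartInfo (which also compares mutation versions).

#include <cstdint>
#include <iostream>
#include <string>

/// Simplified stand-in for MergeTreePartInfo.
struct PartInfo
{
    std::string partition_id;
    int64_t min_block = 0;
    int64_t max_block = 0;
    uint32_t level = 0;

    /// A part covers another part when they are in the same partition,
    /// its block range encloses the other's, and it is at least as merged.
    bool contains(const PartInfo & rhs) const
    {
        return partition_id == rhs.partition_id
            && min_block <= rhs.min_block
            && max_block >= rhs.max_block
            && level >= rhs.level;
    }
};

int main()
{
    PartInfo covering{"all", 1, 10, 2}; /// like part name all_1_10_2
    PartInfo expected{"all", 3, 5, 1};  /// like part name all_3_5_1

    /// Case 2 from the hunk's comments: a larger merged part contains the
    /// expected merge result, so the merge is skipped (return {false, {}}).
    /// If contains() were false while the parts still intersected, that is
    /// case 3, the LOGICAL_ERROR the patch now reports with the table name.
    std::cout << std::boolalpha << covering.contains(expected) << '\n'; /// prints: true
}

Separately, note the design shift in the last hunk: getDistributeLockGuard no longer blocks in a sleep-and-retry loop but fails fast, returning nullptr on an expired session or a lost tryLock race, so callers are expected to handle nullptr and reschedule the task rather than wait inside the lock helper.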