Skip to content

Commit

Permalink
fix bugs about merge task conflicts
Browse files Browse the repository at this point in the history
  • Loading branch information
young-scott committed Sep 28, 2023
1 parent 2184fc0 commit 1c591a7
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 32 deletions.
6 changes: 3 additions & 3 deletions src/Storages/MergeTree/MergeFromLogEntryTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,10 @@ std::pair<bool, ReplicatedMergeMutateTaskBase::PartLogWriter> MergeFromLogEntryT
/// 2. We have some larger merged part which covers new_part_name (and therefore it covers source_part_name too)
/// 3. We have two intersecting parts, both cover source_part_name. It's logical error.
/// TODO Why 1 and 2 can happen? Do we need more assertions here or somewhere else?
constexpr const char * message = "Part {} is covered by {} but should be merged into {}. This shouldn't happen often.";
LOG_WARNING(log, fmt::runtime(message), source_part_name, source_part_or_covering->name, entry.new_part_name);
constexpr const char * message = "Part {} of table {} is covered by {} but should be merged into {}. This shouldn't happen often.";
LOG_WARNING(log, fmt::runtime(message), source_part_name, storage.getStorageID().getFullTableName(), source_part_or_covering->name, entry.new_part_name);
if (!source_part_or_covering->info.contains(MergeTreePartInfo::fromPartName(entry.new_part_name, storage.format_version)))
throw Exception(ErrorCodes::LOGICAL_ERROR, message, source_part_name, source_part_or_covering->name, entry.new_part_name);
throw Exception(ErrorCodes::LOGICAL_ERROR, message, source_part_name, storage.getStorageID().getFullTableName(), source_part_or_covering->name, entry.new_part_name);
return {false, {}};
}

Expand Down
45 changes: 16 additions & 29 deletions src/Storages/StorageReplicatedMergeTree.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -188,19 +188,10 @@ zkutil::ZooKeeperPtr StorageReplicatedMergeTree::tryGetZooKeeper() const

zkutil::ZooKeeperPtr StorageReplicatedMergeTree::getZooKeeper() const
{
do
{
auto res = tryGetZooKeeper();
if (!res)
throw Exception("Cannot get ZooKeeper", ErrorCodes::NO_ZOOKEEPER);
if (res->expired())
{
const_cast<StorageReplicatedMergeTree*>(this)->setZooKeeper();
continue;
}
return res;
}
while (true);
auto res = tryGetZooKeeper();
if (!res)
throw Exception("Cannot get ZooKeeper", ErrorCodes::NO_ZOOKEEPER);
return res;
}

zkutil::ZooKeeperPtr StorageReplicatedMergeTree::getZooKeeperAndAssertNotReadonly() const
Expand Down Expand Up @@ -1737,11 +1728,11 @@ bool StorageReplicatedMergeTree::executeFetch(LogEntry & entry)
if (!unactive_replica.empty())
{
ProfileEvents::increment(ProfileEvents::ReplicatedPartFailedFetches);
throw Exception("No active replica has part " + entry.new_part_name + " or covering part", ErrorCodes::NO_REPLICA_HAS_PART);
throw Exception("Replica " + unactive_replica + " has part " + entry.new_part_name + " or covering part, but it's unactive, try fetch it later", ErrorCodes::NO_REPLICA_HAS_PART);
}
if (std::difftime(std::time(nullptr), entry.create_time) < getSettings()->prefer_fetch_merged_part_time_threshold.totalSeconds())
if (std::difftime(std::time(nullptr), entry.create_time) < 7 * 24 * 60 * 60)
{
LOG_INFO(log, "Will not fetch part {}. No replica has this part. try fetch it later", entry.new_part_name);
LOG_INFO(log, "Will not fetch part {}. No replica has this part. Maybe this part isn't merged by others. Try fetch it later", entry.new_part_name);
return false;
}
LOG_INFO(log, "Will not fetch part {}. No replica has this part.", entry.new_part_name);
Expand Down Expand Up @@ -7832,24 +7823,20 @@ DistributeLockGuardPtr StorageReplicatedMergeTree::getDistributeLockGuard(
const String & part_name,
const String & value) const
{
zkutil::ZooKeeperPtr zookeeper = tryGetZooKeeper();
if (!zookeeper)
{
LOG_WARNING(log, "Cannot create distribute lock for part {} on disk {} guard without zookeeper", part_name, disk_name);
return nullptr;
}
zkutil::ZooKeeperPtr zookeeper = getZooKeeperAndAssertNotReadonly();

String hostname = getContext()->getInterserverIOAddress().first;
String zk_node_path = getDistributeLockPath(disk_name, part_name);
DistributeLockGuardPtr lock_guard = std::make_shared<DistributeLockGuard>(zookeeper, zk_node_path);
while (!lock_guard->tryLock(value, hostname))
if (zookeeper->expired())
{
if (zookeeper->expired())
{
zookeeper = getContext()->getZooKeeper();
}
LOG_DEBUG(log, "lock of part {} postponed, someone other using ({}) this part right now", part_name, value);
sleep(2);
LOG_DEBUG(log, "lock of part {} ({}) failed, zk session expired", part_name, value);
return nullptr;
}
if (!lock_guard->tryLock(value, hostname))
{
LOG_DEBUG(log, "lock of part {} postponed, maybe someone using ({}) this part right now", part_name, value);
return nullptr;
}
LOG_DEBUG(log, "Create distribute lock for part {} on disk {} for {}", part_name, disk_name, value);
return lock_guard;
Expand Down

0 comments on commit 1c591a7

Please sign in to comment.