Fix HostResolver behavior on fail #62652

Merged
merged 7 commits into from
May 17, 2024
Changes from 6 commits
1 change: 1 addition & 0 deletions src/Common/CurrentMetrics.cpp
@@ -285,6 +285,7 @@
M(HTTPConnectionsTotal, "Total count of all sessions: stored in the pool and actively used right now for http hosts") \
\
M(AddressesActive, "Total count of addresses which are used for creation connections with connection pools") \
M(AddressesBanned, "Total count of addresses which are banned as faulty for creation connections with connection pools") \


#ifdef APPLY_FOR_EXTERNAL_METRICS
48 changes: 39 additions & 9 deletions src/Common/HostResolvePool.cpp
@@ -8,6 +8,8 @@
#include <Common/MemoryTrackerSwitcher.h>

#include <mutex>
#include <algorithm>


namespace ProfileEvents
{
@@ -19,6 +21,7 @@ namespace ProfileEvents
namespace CurrentMetrics
{
extern const Metric AddressesActive;
extern const Metric AddressesBanned;
}

namespace DB
@@ -36,6 +39,7 @@ HostResolverMetrics HostResolver::getMetrics()
.expired = ProfileEvents::AddressesExpired,
.failed = ProfileEvents::AddressesMarkedAsFailed,
.active_count = CurrentMetrics::AddressesActive,
.banned_count = CurrentMetrics::AddressesBanned,
};
}

@@ -47,7 +51,7 @@ HostResolver::WeakPtr HostResolver::getWeakFromThis()
HostResolver::HostResolver(String host_, Poco::Timespan history_)
: host(std::move(host_))
, history(history_)
, resolve_function([](const String & host_to_resolve) { return DNSResolver::instance().resolveHostAll(host_to_resolve); })
, resolve_function([](const String & host_to_resolve) { return DNSResolver::instance().resolveHostAllInOriginOrder(host_to_resolve); })
{
update();
}
@@ -62,6 +66,12 @@ HostResolver::HostResolver(
HostResolver::~HostResolver()
{
std::lock_guard lock(mutex);

auto banned_count = 0;
for (const auto & rec: records)
banned_count += rec.failed;
CurrentMetrics::sub(metrics.banned_count, banned_count);

CurrentMetrics::sub(metrics.active_count, records.size());
records.clear();
}
@@ -113,6 +123,7 @@ void HostResolver::updateWeights()

if (getTotalWeight() == 0 && !records.empty())
{
CurrentMetrics::sub(metrics.banned_count, records.size());
Contributor Author

@ianton-ru, May 14, 2024

Some of the records may still be alive here, so I guess we need to decrement only for records with failed == true.
Something like:

size_t banned_count = 0;
for (auto & rec : records)
{
  if (rec.failed)
  {
    ++banned_count;
    rec.failed = false;
  }
}
CurrentMetrics::sub(metrics.banned_count, banned_count);

Also, technically the metric will jump from some value to zero and back while the IP is still unavailable.
So maybe it would be cleaner to return bool was_banned from the setSuccess method and decrement the counter there when the state changes?

Member

We get here only when we need to reset the failed flag for all records.

Contributor Author

updateWeights is called in updateImpl too.

Member

Sorry, I do not understand what you are pointing at.

update fetches new hosts from the resolver cache and calls updateImpl under a lock.
updateImpl applies the new set of IPs to the internal state and calls updateWeights.
updateWeights first calls updateWeightsImpl, which calculates new weights for the random choices.
After that, updateWeights checks whether that weight calculation succeeded. If all hosts are banned as faulty, we reset the faulty flag for all the records; it is our only choice here. Then updateWeights calls updateWeightsImpl one more time.

Member

updateWeights is called in two cases: (1) when we do an update, and (2) when some records have changed their weight.

Contributor Author

Ah, I missed that getTotalWeight() == 0 when this code is called.
Sorry, my mistake.
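
To make the conclusion of this thread concrete, here is a minimal sketch of the reset branch, not the PR's actual code. `Resolver`, `banned_gauge`, and the constant weight of 10 are hypothetical stand-ins, and the sketch assumes that a record has zero weight only when it is failed. Under that assumption, a total weight of zero means every record is failed, so subtracting `records.size()` from the gauge is exact.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical, simplified stand-in for HostResolver::Record.
struct Record
{
    bool failed = false;

    // Assumption: only failed records have zero weight.
    std::size_t getWeight() const { return failed ? 0 : 10; }
};

// Hypothetical, simplified stand-in for HostResolver.
struct Resolver
{
    std::vector<Record> records;
    long banned_gauge = 0; // stand-in for the AddressesBanned metric

    std::size_t getTotalWeight() const
    {
        std::size_t total = 0;
        for (const auto & rec : records)
            total += rec.getWeight();
        return total;
    }

    void updateWeights()
    {
        if (getTotalWeight() == 0 && !records.empty())
        {
            // Total weight is zero, so every record is failed:
            // the number of banned records equals records.size().
            banned_gauge -= static_cast<long>(records.size());
            for (auto & rec : records)
                rec.failed = false;
        }
    }
};
```

If at least one record is still healthy, the branch is not taken and the gauge is left untouched, which is the point the reviewer initially missed.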

for (auto & rec : records)
rec.failed = false;

@@ -140,7 +151,7 @@ void HostResolver::setSuccess(const Poco::Net::IPAddress & address)
return;

auto old_weight = it->getWeight();
++it->usage;
it->setSuccess();
auto new_weight = it->getWeight();

if (old_weight != new_weight)
@@ -158,8 +169,8 @@ void HostResolver::setFail(const Poco::Net::IPAddress & address)
if (it == records.end())
return;

it->failed = true;
it->fail_time = now;
if (it->setFail(now))
CurrentMetrics::add(metrics.banned_count);
}

ProfileEvents::increment(metrics.failed);
@@ -216,14 +227,20 @@ void HostResolver::updateImpl(Poco::Timestamp now, std::vector<Poco::Net::IPAddr
{
CurrentMetrics::sub(metrics.active_count, 1);
ProfileEvents::increment(metrics.expired, 1);
if (it_before->failed)
CurrentMetrics::sub(metrics.banned_count);
}
++it_before;
}
else if (it_before == records.end() || (it_next != next_gen.end() && *it_next < it_before->address))
{
CurrentMetrics::add(metrics.active_count, 1);
ProfileEvents::increment(metrics.discovered, 1);
merged.push_back(Record(*it_next, now));
/// there could be duplicates in the next_gen vector
if (merged.empty() || merged.back().address != *it_next)
{
CurrentMetrics::add(metrics.active_count, 1);
Member

Here we do not update metrics for duplicates.
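
The duplicate handling can be illustrated with a simplified sorted merge. This is a sketch under stated assumptions, not the PR's `updateImpl`: addresses are plain strings instead of `Poco::Net::IPAddress`, both inputs are assumed to be sorted, record expiry is omitted, and `mergeAddresses` and `MergeResult` are hypothetical names.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

struct MergeResult
{
    std::vector<std::string> merged;
    std::size_t discovered = 0; // how many new addresses were counted
};

// Merge already-known addresses with a freshly resolved list.
// Both inputs must be sorted; next_gen may contain duplicates.
MergeResult mergeAddresses(const std::vector<std::string> & known, const std::vector<std::string> & next_gen)
{
    MergeResult result;
    auto it_before = known.begin();
    auto it_next = next_gen.begin();

    while (it_before != known.end() || it_next != next_gen.end())
    {
        if (it_next == next_gen.end() || (it_before != known.end() && *it_before < *it_next))
        {
            // Known address missing from the new resolution: kept (expiry is omitted here).
            result.merged.push_back(*it_before);
            ++it_before;
        }
        else if (it_before == known.end() || *it_next < *it_before)
        {
            // New address: count and store it only once, even if next_gen repeats it.
            if (result.merged.empty() || result.merged.back() != *it_next)
            {
                ++result.discovered;
                result.merged.push_back(*it_next);
            }
            ++it_next;
        }
        else
        {
            // Address present in both lists: keep the existing entry.
            result.merged.push_back(*it_before);
            ++it_before;
            ++it_next;
        }
    }
    return result;
}
```

Comparing against `merged.back()` is sufficient because the inputs are sorted, so equal addresses always arrive next to each other.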

ProfileEvents::increment(metrics.discovered, 1);
merged.push_back(Record(*it_next, now));
}
++it_next;
}
else
@@ -237,10 +254,22 @@ void HostResolver::updateImpl(Poco::Timestamp now, std::vector<Poco::Net::IPAddr
}

for (auto & rec : merged)
if (rec.failed && rec.fail_time < last_effective_resolve)
rec.failed = false;
{
Member

Here I adjust the new counter banned_count. The Record class is unaware of metrics, so this code does not belong in one of Record's methods.

if (!rec.failed)
continue;

/// Exponentially increasing ban time for each consecutive failure
auto banned_until = now - Poco::Timespan(history.totalMicroseconds() * (1ull << (rec.consecutive_fail_count - 1)));
Member

Here I use history.totalMicroseconds() instead of history.totalSeconds() so that unit tests can work with durations shorter than 1 second.
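
The arithmetic behind the ban window can be sketched with `std::chrono` in place of Poco types. `banDuration` and `banExpired` are hypothetical helper names, not part of the PR, and timestamps are modelled as microsecond offsets. The window is history × 2^(n − 1) for n consecutive failures. Because `setFail` caps n at 6, the largest multiplier is 32: with a history equal to the default of 2*60 seconds, the longest window would be 120 s × 32 = 3840 s (64 minutes).

```cpp
#include <cassert>
#include <chrono>
#include <cstddef>

using Micros = std::chrono::microseconds;

// Ban window after `consecutive_fail_count` failures: history * 2^(count - 1).
// Precondition: consecutive_fail_count >= 1 (and small enough for the shift to be valid).
constexpr Micros banDuration(Micros history, std::size_t consecutive_fail_count)
{
    return history * static_cast<Micros::rep>(1ull << (consecutive_fail_count - 1));
}

// A failed record is released once its failure time is older than the ban window.
constexpr bool banExpired(Micros fail_time, Micros now, Micros history, std::size_t consecutive_fail_count)
{
    return fail_time < now - banDuration(history, consecutive_fail_count);
}
```

Working in microseconds keeps sub-second histories usable, which matches the motivation given in the comment above.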

if (rec.fail_time < banned_until)
{
rec.failed = false;
CurrentMetrics::sub(metrics.banned_count);
}
}

chassert(std::is_sorted(merged.begin(), merged.end()));
// check that merged contains unique elements
chassert(std::adjacent_find(merged.begin(), merged.end()) == merged.end());

last_resolve_time = now;
records.swap(merged);
@@ -251,6 +280,7 @@ void HostResolver::updateImpl(Poco::Timestamp now, std::vector<Poco::Net::IPAddr
updateWeights();
}


size_t HostResolver::getTotalWeight() const
{
if (records.empty())
33 changes: 32 additions & 1 deletion src/Common/HostResolvePool.h
@@ -39,9 +39,11 @@ struct HostResolverMetrics
const ProfileEvents::Event failed = ProfileEvents::end();

const CurrentMetrics::Metric active_count = CurrentMetrics::end();
const CurrentMetrics::Metric banned_count = CurrentMetrics::end();
};

constexpr size_t DEFAULT_RESOLVE_TIME_HISTORY_SECONDS = 2*60;
constexpr size_t RECORD_CONSECTIVE_FAIL_COUNT_LIMIT = 6;


class HostResolver : public std::enable_shared_from_this<HostResolver>
@@ -141,6 +143,7 @@ class HostResolver : public std::enable_shared_from_this<HostResolver>
size_t usage = 0;
bool failed = false;
Poco::Timestamp fail_time = 0;
size_t consecutive_fail_count = 0;

size_t weight_prefix_sum;

@@ -149,6 +152,11 @@ class HostResolver : public std::enable_shared_from_this<HostResolver>
return address < r.address;
}

bool operator ==(const Record & r) const
Member

Needed for the uniqueness check under chassert.
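
As a small illustration of why the equality operator is required: `std::adjacent_find` without a predicate compares neighbours with `operator==`, while `std::is_sorted` uses `operator<`. The sketch below uses a hypothetical `Rec` type with an `int` address in place of `Poco::Net::IPAddress`, and `isSortedAndUnique` is an illustrative helper, not a function from the PR.

```cpp
#include <algorithm>
#include <cassert>
#include <vector>

// Hypothetical stand-in for HostResolver::Record; only the address takes part in comparisons.
struct Rec
{
    int address;
    bool failed = false;

    bool operator<(const Rec & r) const { return address < r.address; }
    bool operator==(const Rec & r) const { return address == r.address; }
};

// True when records are sorted by address and no two neighbours share an address.
inline bool isSortedAndUnique(const std::vector<Rec> & records)
{
    return std::is_sorted(records.begin(), records.end())
        && std::adjacent_find(records.begin(), records.end()) == records.end();
}
```

On a sorted range, checking neighbours is enough to detect any duplicate, so the two checks together establish that all addresses are distinct.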

{
return address == r.address;
}

size_t getWeight() const
{
if (failed)
@@ -166,6 +174,28 @@ class HostResolver : public std::enable_shared_from_this<HostResolver>
return 8;
return 10;
}

bool setFail(const Poco::Timestamp & now)
Member

Returns true if the status has changed. Needed for adjusting metrics.
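
A minimal sketch of how the return value keeps the gauge consistent, assuming a plain `long` counter in place of `CurrentMetrics` and an integer timestamp in place of `Poco::Timestamp`. `FailRecord`, `markFailed`, and `FAIL_LIMIT` are hypothetical names for illustration. The gauge is incremented only on the transition from healthy to failed, so repeated failures of an already banned address are not counted twice.

```cpp
#include <cassert>
#include <cstddef>

constexpr std::size_t FAIL_LIMIT = 6; // mirrors RECORD_CONSECTIVE_FAIL_COUNT_LIMIT

// Hypothetical, simplified stand-in for HostResolver::Record.
struct FailRecord
{
    bool failed = false;
    long fail_time = 0;
    std::size_t consecutive_fail_count = 0;

    // Returns true only when the record switches from healthy to failed.
    bool setFail(long now)
    {
        bool was_ok = !failed;

        failed = true;
        fail_time = now;

        if (was_ok && consecutive_fail_count < FAIL_LIMIT)
            ++consecutive_fail_count;

        return was_ok;
    }
};

// Hypothetical caller: bump the gauge only on a state change.
inline void markFailed(FailRecord & rec, long now, long & banned_gauge)
{
    if (rec.setFail(now))
        ++banned_gauge;
}
```

The failure time is still refreshed on every call, so a repeated failure extends the ban without changing the gauge.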

{
bool was_ok = !failed;

failed = true;
fail_time = now;

if (was_ok)
{
if (consecutive_fail_count < RECORD_CONSECTIVE_FAIL_COUNT_LIMIT)
++consecutive_fail_count;
}

return was_ok;
}

void setSuccess()
{
consecutive_fail_count = 0;
++usage;
}
};

using Records = std::vector<Record>;
@@ -178,6 +208,7 @@ class HostResolver : public std::enable_shared_from_this<HostResolver>
void updateWeights() TSA_REQUIRES(mutex);
void updateWeightsImpl() TSA_REQUIRES(mutex);
size_t getTotalWeight() const TSA_REQUIRES(mutex);
Poco::Timespan getRecordHistoryTime(const Record&) const;

const String host;
const Poco::Timespan history;
@@ -188,7 +219,7 @@ class HostResolver : public std::enable_shared_from_this<HostResolver>

std::mutex mutex;

Poco::Timestamp last_resolve_time TSA_GUARDED_BY(mutex);
Poco::Timestamp last_resolve_time TSA_GUARDED_BY(mutex) = Poco::Timestamp::TIMEVAL_MIN;
Member

Just to be sure that HostResolver::update is called in the constructor even if history is 0.

Records records TSA_GUARDED_BY(mutex);

Poco::Logger * log = &Poco::Logger::get("ConnectionPool");