Skip to content

Commit

Permalink
dfp: reverting 31433 (#32743)
Browse files Browse the repository at this point in the history
Signed-off-by: Alyssa Wilk <[email protected]>
  • Loading branch information
alyssawilk authored Mar 7, 2024
1 parent 7186d1b commit 8ac6d0c
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 29 deletions.
6 changes: 0 additions & 6 deletions source/common/upstream/upstream_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -867,8 +867,6 @@ HostMapConstSharedPtr MainPrioritySetImpl::crossPriorityHostMap() const {
if (mutable_cross_priority_host_map_ != nullptr) {
const_cross_priority_host_map_ = std::move(mutable_cross_priority_host_map_);
ASSERT(mutable_cross_priority_host_map_ == nullptr);
ENVOY_LOG(debug, "cross_priority host map, moving mutable to const, len: {}",
const_cross_priority_host_map_->size());
}
return const_cross_priority_host_map_;
}
Expand All @@ -885,18 +883,14 @@ void MainPrioritySetImpl::updateCrossPriorityHostMap(const HostVector& hosts_add
if (mutable_cross_priority_host_map_ == nullptr) {
// Copy old read only host map to mutable host map.
mutable_cross_priority_host_map_ = std::make_shared<HostMap>(*const_cross_priority_host_map_);
ENVOY_LOG(debug, "cross_priority host map, copying from const, len: {}",
const_cross_priority_host_map_->size());
}

for (const auto& host : hosts_removed) {
mutable_cross_priority_host_map_->erase(addressToString(host->address()));
ENVOY_LOG(debug, "cross_priority host map, removing: {}", addressToString(host->address()));
}

for (const auto& host : hosts_added) {
mutable_cross_priority_host_map_->insert({addressToString(host->address()), host});
ENVOY_LOG(debug, "cross_priority host map, adding: {}", addressToString(host->address()));
}
}

Expand Down
56 changes: 36 additions & 20 deletions source/extensions/clusters/dynamic_forward_proxy/cluster.cc
Original file line number Diff line number Diff line change
Expand Up @@ -98,10 +98,14 @@ Cluster::~Cluster() {

void Cluster::startPreInit() {
// If we are attaching to a pre-populated cache we need to initialize our hosts.
std::unique_ptr<Upstream::HostVector> hosts_added;
dns_cache_->iterateHostMap(
[&](absl::string_view host, const Common::DynamicForwardProxy::DnsHostInfoSharedPtr& info) {
addOrUpdateHost(host, info);
addOrUpdateHost(host, info, hosts_added);
});
if (hosts_added) {
updatePriorityState(*hosts_added, {});
}
onPreInitComplete();
}

Expand Down Expand Up @@ -233,11 +237,9 @@ bool Cluster::ClusterInfo::checkIdle() {

void Cluster::addOrUpdateHost(
absl::string_view host,
const Extensions::Common::DynamicForwardProxy::DnsHostInfoSharedPtr& host_info) {
Upstream::HostVector hosts_added, hosts_removed;
Upstream::LogicalHostSharedPtr new_host = std::make_shared<Upstream::LogicalHost>(
info(), std::string{host}, host_info->address(), host_info->addressList(),
dummy_locality_lb_endpoint_, dummy_lb_endpoint_, nullptr, time_source_);
const Extensions::Common::DynamicForwardProxy::DnsHostInfoSharedPtr& host_info,
std::unique_ptr<Upstream::HostVector>& hosts_added) {
Upstream::LogicalHostSharedPtr emplaced_host;
{
absl::WriterMutexLock lock{&host_map_lock_};

Expand All @@ -249,6 +251,7 @@ void Cluster::addOrUpdateHost(
// future.
const auto host_map_it = host_map_.find(host);
if (host_map_it != host_map_.end()) {
// If we only have an address change, we can do that swap inline without any other updates.
// The appropriate R/W locking is in place to allow this. The details of this locking are:
// - Hosts are not thread local, they are global.
// - We take a read lock when reading the address and a write lock when changing it.
Expand All @@ -264,31 +267,44 @@ void Cluster::addOrUpdateHost(
// semantics, meaning the cache would expose multiple addresses and the
// cluster would create multiple logical hosts based on those addresses.
// We will leave this is a follow up depending on need.
ASSERT(host_info == host_map_it->second.shared_host_info_);
ASSERT(host_map_it->second.shared_host_info_->address() !=
host_map_it->second.logical_host_->address());

// remove the old host
hosts_removed.emplace_back(host_map_it->second.logical_host_);
ENVOY_LOG(debug, "updating dfproxy cluster host address '{}'", host);
host_map_.erase(host_map_it);
host_map_.try_emplace(host, host_info, new_host);

} else {
ENVOY_LOG(debug, "adding new dfproxy cluster host '{}'", host);
host_map_.try_emplace(host, host_info, new_host);
host_map_it->second.logical_host_->setNewAddresses(
host_info->address(), host_info->addressList(), dummy_lb_endpoint_);
return;
}
hosts_added.emplace_back(new_host);

ENVOY_LOG(debug, "adding new dfproxy cluster host '{}'", host);

emplaced_host = host_map_
.try_emplace(host, host_info,
std::make_shared<Upstream::LogicalHost>(
info(), std::string{host}, host_info->address(),
host_info->addressList(), dummy_locality_lb_endpoint_,
dummy_lb_endpoint_, nullptr, time_source_))
.first->second.logical_host_;
}

ASSERT(!hosts_added.empty());
updatePriorityState(hosts_added, hosts_removed);
ASSERT(emplaced_host);
if (hosts_added == nullptr) {
hosts_added = std::make_unique<Upstream::HostVector>();
}
hosts_added->emplace_back(emplaced_host);
}

void Cluster::onDnsHostAddOrUpdate(
const std::string& host,
const Extensions::Common::DynamicForwardProxy::DnsHostInfoSharedPtr& host_info) {
ENVOY_LOG(debug, "Adding/Updating host info for {}", host);
addOrUpdateHost(host, host_info);
ENVOY_LOG(debug, "Adding host info for {}", host);

std::unique_ptr<Upstream::HostVector> hosts_added;
addOrUpdateHost(host, host_info, hosts_added);
if (hosts_added != nullptr) {
ASSERT(!hosts_added->empty());
updatePriorityState(*hosts_added, {});
}
}

void Cluster::updatePriorityState(const Upstream::HostVector& hosts_added,
Expand Down
3 changes: 2 additions & 1 deletion source/extensions/clusters/dynamic_forward_proxy/cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,8 @@ class Cluster : public Upstream::BaseDynamicClusterImpl,

void
addOrUpdateHost(absl::string_view host,
const Extensions::Common::DynamicForwardProxy::DnsHostInfoSharedPtr& host_info)
const Extensions::Common::DynamicForwardProxy::DnsHostInfoSharedPtr& host_info,
std::unique_ptr<Upstream::HostVector>& hosts_added)
ABSL_LOCKS_EXCLUDED(host_map_lock_);

void updatePriorityState(const Upstream::HostVector& hosts_added,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ class ClusterTest : public testing::Test,
}
}));
if (!existing_hosts.empty()) {
EXPECT_CALL(*this, onMemberUpdateCb(SizeIs(1), SizeIs(0))).Times(existing_hosts.size());
EXPECT_CALL(*this, onMemberUpdateCb(SizeIs(existing_hosts.size()), SizeIs(0)));
}
cluster_->initialize([] {});
}
Expand Down Expand Up @@ -257,7 +257,6 @@ TEST_F(ClusterTest, BasicFlow) {

// After changing the address, LB will immediately resolve the new address with a refresh.
updateTestHostAddress("host1:0", "2.3.4.5");
EXPECT_CALL(*this, onMemberUpdateCb(SizeIs(1), SizeIs(1)));
update_callbacks_->onDnsHostAddOrUpdate("host1:0", host_map_["host1:0"]);
EXPECT_EQ(1UL, cluster_->prioritySet().hostSetsPerPriority()[0]->hosts().size());
EXPECT_EQ("2.3.4.5:0",
Expand Down

0 comments on commit 8ac6d0c

Please sign in to comment.