Skip to content

Commit

Permalink
CDS: remove warming cluster if CDS response desired (envoyproxy#13997)
Browse files Browse the repository at this point in the history
Signed-off-by: Yuchen Dai <[email protected]>
Signed-off-by: Qin Qin <[email protected]>
  • Loading branch information
lambdai authored and qqustc committed Nov 24, 2020
1 parent 26ab4e5 commit b4f444d
Show file tree
Hide file tree
Showing 22 changed files with 216 additions and 102 deletions.
9 changes: 6 additions & 3 deletions include/envoy/upstream/cluster_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -126,12 +126,15 @@ class ClusterManager {
initializeSecondaryClusters(const envoy::config::bootstrap::v3::Bootstrap& bootstrap) PURE;

using ClusterInfoMap = absl::node_hash_map<std::string, std::reference_wrapper<const Cluster>>;
// Pair of name-keyed cluster maps returned by clusters(). Each map value is a
// std::reference_wrapper<const Cluster>, i.e. a non-owning reference to the cluster.
// NOTE: callers aggregate-initialize this struct positionally (active first, warming
// second), so the member order must be preserved.
struct ClusterInfoMaps {
// Clusters the cluster manager currently tracks as active.
ClusterInfoMap active_clusters_;
// Clusters the cluster manager currently tracks as warming. A cluster name may appear
// here and in active_clusters_ at the same time.
ClusterInfoMap warming_clusters_;
};

/**
* @return ClusterInfoMap all current clusters. These are the primary (not thread local)
* clusters which should only be used for stats/admin.
* @return ClusterInfoMaps all current clusters, split into the active and the warming maps.
*/
virtual ClusterInfoMap clusters() PURE;
virtual ClusterInfoMaps clusters() PURE;

using ClusterSet = absl::flat_hash_set<std::string>;

Expand Down
6 changes: 3 additions & 3 deletions source/common/grpc/async_client_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ AsyncClientFactoryImpl::AsyncClientFactoryImpl(Upstream::ClusterManager& cm,
}

const std::string& cluster_name = config.envoy_grpc().cluster_name();
auto clusters = cm_.clusters();
const auto& it = clusters.find(cluster_name);
if (it == clusters.end()) {
auto all_clusters = cm_.clusters();
const auto& it = all_clusters.active_clusters_.find(cluster_name);
if (it == all_clusters.active_clusters_.end()) {
throw EnvoyException(fmt::format("Unknown gRPC client cluster '{}'", cluster_name));
}
if (it->second.get().info()->addedViaApi()) {
Expand Down
17 changes: 12 additions & 5 deletions source/common/upstream/cds_api_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,22 @@ CdsApiImpl::CdsApiImpl(const envoy::config::core::v3::ConfigSource& cds_config,

void CdsApiImpl::onConfigUpdate(const std::vector<Config::DecodedResourceRef>& resources,
const std::string& version_info) {
ClusterManager::ClusterInfoMap clusters_to_remove = cm_.clusters();
std::vector<envoy::config::cluster::v3::Cluster> clusters;
auto all_existing_clusters = cm_.clusters();
// Exclude the clusters which CDS wants to add.
for (const auto& resource : resources) {
clusters_to_remove.erase(resource.get().name());
all_existing_clusters.active_clusters_.erase(resource.get().name());
all_existing_clusters.warming_clusters_.erase(resource.get().name());
}
Protobuf::RepeatedPtrField<std::string> to_remove_repeated;
for (const auto& [cluster_name, _] : clusters_to_remove) {
for (const auto& [cluster_name, _] : all_existing_clusters.active_clusters_) {
*to_remove_repeated.Add() = cluster_name;
}
for (const auto& [cluster_name, _] : all_existing_clusters.warming_clusters_) {
// Do not add the cluster twice when the cluster is both active and warming.
if (all_existing_clusters.active_clusters_.count(cluster_name) == 0) {
*to_remove_repeated.Add() = cluster_name;
}
}
onConfigUpdate(resources, to_remove_repeated, version_info);
}

Expand All @@ -64,7 +71,7 @@ void CdsApiImpl::onConfigUpdate(const std::vector<Config::DecodedResourceRef>& a
removed_resources.size());

std::vector<std::string> exception_msgs;
absl::node_hash_set<std::string> cluster_names;
absl::flat_hash_set<std::string> cluster_names(added_resources.size());
bool any_applied = false;
for (const auto& resource : added_resources) {
envoy::config::cluster::v3::Cluster cluster;
Expand Down
14 changes: 8 additions & 6 deletions source/common/upstream/cluster_manager_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -236,15 +236,17 @@ class ClusterManagerImpl : public ClusterManager, Logger::Loggable<Logger::Id::u
init_helper_.setInitializedCb(callback);
}

ClusterInfoMap clusters() override {
// TODO(mattklein123): Add ability to see warming clusters in admin output.
ClusterInfoMap clusters_map;
ClusterInfoMaps clusters() override {
ClusterInfoMaps clusters_maps;
for (auto& cluster : active_clusters_) {
clusters_map.emplace(cluster.first, *cluster.second->cluster_);
clusters_maps.active_clusters_.emplace(cluster.first, *cluster.second->cluster_);
}

return clusters_map;
for (auto& cluster : warming_clusters_) {
clusters_maps.warming_clusters_.emplace(cluster.first, *cluster.second->cluster_);
}
return clusters_maps;
}

const ClusterSet& primaryClusters() override { return primary_clusters_; }
ThreadLocalCluster* get(absl::string_view cluster) override;

Expand Down
17 changes: 10 additions & 7 deletions source/common/upstream/load_stats_reporter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,11 @@ void LoadStatsReporter::sendLoadStatsRequest() {
// added to the cluster manager. When we get the notification, we record the current time in
// clusters_ as the start time for the load reporting window for that cluster.
request_.mutable_cluster_stats()->Clear();
auto all_clusters = cm_.clusters();
for (const auto& cluster_name_and_timestamp : clusters_) {
const std::string& cluster_name = cluster_name_and_timestamp.first;
auto cluster_info_map = cm_.clusters();
auto it = cluster_info_map.find(cluster_name);
if (it == cluster_info_map.end()) {
auto it = all_clusters.active_clusters_.find(cluster_name);
if (it == all_clusters.active_clusters_.end()) {
ENVOY_LOG(debug, "Cluster {} does not exist", cluster_name);
continue;
}
Expand Down Expand Up @@ -154,7 +154,8 @@ void LoadStatsReporter::startLoadReportPeriod() {
// converge.
absl::node_hash_map<std::string, std::chrono::steady_clock::duration> existing_clusters;
if (message_->send_all_clusters()) {
for (const auto& p : cm_.clusters()) {
auto cluster_info_map = cm_.clusters();
for (const auto& p : cluster_info_map.active_clusters_) {
const std::string& cluster_name = p.first;
if (clusters_.count(cluster_name) > 0) {
existing_clusters.emplace(cluster_name, clusters_[cluster_name]);
Expand All @@ -173,9 +174,10 @@ void LoadStatsReporter::startLoadReportPeriod() {
clusters_.emplace(cluster_name, existing_clusters.count(cluster_name) > 0
? existing_clusters[cluster_name]
: time_source_.monotonicTime().time_since_epoch());
// TODO(lambdai): Move the clusters() call out of this lambda.
auto cluster_info_map = cm_.clusters();
auto it = cluster_info_map.find(cluster_name);
if (it == cluster_info_map.end()) {
auto it = cluster_info_map.active_clusters_.find(cluster_name);
if (it == cluster_info_map.active_clusters_.end()) {
return;
}
// Don't reset stats for existing tracked clusters.
Expand All @@ -193,7 +195,8 @@ void LoadStatsReporter::startLoadReportPeriod() {
cluster.info()->loadReportStats().upstream_rq_dropped_.latch();
};
if (message_->send_all_clusters()) {
for (const auto& p : cm_.clusters()) {
auto cluster_info_map = cm_.clusters();
for (const auto& p : cluster_info_map.active_clusters_) {
const std::string& cluster_name = p.first;
handle_cluster_func(cluster_name);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,9 +133,9 @@ bool ClusterRefreshManagerImpl::onEvent(const std::string& cluster_name, EventTy
if (post_callback) {
main_thread_dispatcher_.post([this, cluster_name, info]() {
// Ensure that cluster is still active before calling callback.
auto map = cm_.clusters();
auto it = map.find(cluster_name);
if (it != map.end()) {
auto maps = cm_.clusters();
auto it = maps.active_clusters_.find(cluster_name);
if (it != maps.active_clusters_.end()) {
info->cb_();
}
});
Expand Down
8 changes: 4 additions & 4 deletions source/extensions/stat_sinks/hystrix/hystrix.cc
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,7 @@ void HystrixSink::flush(Stats::MetricSnapshot& snapshot) {
}
incCounter();
std::stringstream ss;
Upstream::ClusterManager::ClusterInfoMap clusters = server_.clusterManager().clusters();
Upstream::ClusterManager::ClusterInfoMaps all_clusters = server_.clusterManager().clusters();

// Save a map of the relevant histograms per cluster in a convenient format.
absl::node_hash_map<std::string, QuantileLatencyMap> time_histograms;
Expand Down Expand Up @@ -370,7 +370,7 @@ void HystrixSink::flush(Stats::MetricSnapshot& snapshot) {
}
}

for (auto& cluster : clusters) {
for (auto& cluster : all_clusters.active_clusters_) {
Upstream::ClusterInfoConstSharedPtr cluster_info = cluster.second.get().info();

std::unique_ptr<ClusterStatsCache>& cluster_stats_cache_ptr =
Expand Down Expand Up @@ -407,9 +407,9 @@ void HystrixSink::flush(Stats::MetricSnapshot& snapshot) {
}

// check if any clusters were removed, and remove from cache
if (clusters.size() < cluster_stats_cache_map_.size()) {
if (all_clusters.active_clusters_.size() < cluster_stats_cache_map_.size()) {
for (auto it = cluster_stats_cache_map_.begin(); it != cluster_stats_cache_map_.end();) {
if (clusters.find(it->first) == clusters.end()) {
if (all_clusters.active_clusters_.find(it->first) == all_clusters.active_clusters_.end()) {
auto next_it = std::next(it);
cluster_stats_cache_map_.erase(it);
it = next_it;
Expand Down
8 changes: 6 additions & 2 deletions source/server/admin/clusters_handler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,9 @@ void setHealthFlag(Upstream::Host::HealthFlag flag, const Upstream::Host& host,
// TODO(efimki): Add support of text readouts stats.
void ClustersHandler::writeClustersAsJson(Buffer::Instance& response) {
envoy::admin::v3::Clusters clusters;
for (const auto& [name, cluster_ref] : server_.clusterManager().clusters()) {
// TODO(mattklein123): Add ability to see warming clusters in admin output.
auto all_clusters = server_.clusterManager().clusters();
for (const auto& [name, cluster_ref] : all_clusters.active_clusters_) {
const Upstream::Cluster& cluster = cluster_ref.get();
Upstream::ClusterInfoConstSharedPtr cluster_info = cluster.info();

Expand Down Expand Up @@ -184,7 +186,9 @@ void ClustersHandler::writeClustersAsJson(Buffer::Instance& response) {

// TODO(efimki): Add support of text readouts stats.
void ClustersHandler::writeClustersAsText(Buffer::Instance& response) {
for (const auto& [name, cluster_ref] : server_.clusterManager().clusters()) {
// TODO(mattklein123): Add ability to see warming clusters in admin output.
auto all_clusters = server_.clusterManager().clusters();
for (const auto& [name, cluster_ref] : all_clusters.active_clusters_) {
const Upstream::Cluster& cluster = cluster_ref.get();
const std::string& cluster_name = cluster.info()->name();
addOutlierInfo(cluster_name, cluster.outlierDetector(), response);
Expand Down
13 changes: 9 additions & 4 deletions source/server/admin/config_dump_handler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,9 @@ ConfigDumpHandler::addResourceToDump(envoy::admin::v3::ConfigDump& dump,
const std::string& resource, bool include_eds) const {
Envoy::Server::ConfigTracker::CbsMap callbacks_map = config_tracker_.getCallbacksMap();
if (include_eds) {
if (!server_.clusterManager().clusters().empty()) {
// TODO(mattklein123): Add ability to see warming clusters in admin output.
auto all_clusters = server_.clusterManager().clusters();
if (!all_clusters.active_clusters_.empty()) {
callbacks_map.emplace("endpoint", [this] { return dumpEndpointConfigs(); });
}
}
Expand Down Expand Up @@ -195,7 +197,9 @@ void ConfigDumpHandler::addAllConfigToDump(envoy::admin::v3::ConfigDump& dump,
bool include_eds) const {
Envoy::Server::ConfigTracker::CbsMap callbacks_map = config_tracker_.getCallbacksMap();
if (include_eds) {
if (!server_.clusterManager().clusters().empty()) {
// TODO(mattklein123): Add ability to see warming clusters in admin output.
auto all_clusters = server_.clusterManager().clusters();
if (!all_clusters.active_clusters_.empty()) {
callbacks_map.emplace("endpoint", [this] { return dumpEndpointConfigs(); });
}
}
Expand All @@ -220,8 +224,9 @@ void ConfigDumpHandler::addAllConfigToDump(envoy::admin::v3::ConfigDump& dump,

ProtobufTypes::MessagePtr ConfigDumpHandler::dumpEndpointConfigs() const {
auto endpoint_config_dump = std::make_unique<envoy::admin::v3::EndpointsConfigDump>();

for (const auto& [name, cluster_ref] : server_.clusterManager().clusters()) {
// TODO(mattklein123): Add ability to see warming clusters in admin output.
auto all_clusters = server_.clusterManager().clusters();
for (const auto& [name, cluster_ref] : all_clusters.active_clusters_) {
UNREFERENCED_PARAMETER(name);
const Upstream::Cluster& cluster = cluster_ref.get();
Upstream::ClusterInfoConstSharedPtr cluster_info = cluster.info();
Expand Down
9 changes: 5 additions & 4 deletions test/common/grpc/async_client_manager_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,10 @@ TEST_F(AsyncClientManagerImplTest, EnvoyGrpcOk) {
envoy::config::core::v3::GrpcService grpc_service;
grpc_service.mutable_envoy_grpc()->set_cluster_name("foo");

Upstream::ClusterManager::ClusterInfoMap cluster_map;
Upstream::ClusterManager::ClusterInfoMaps cluster_maps;
Upstream::MockClusterMockPrioritySet cluster;
cluster_map.emplace("foo", cluster);
EXPECT_CALL(cm_, clusters()).WillOnce(Return(cluster_map));
cluster_maps.active_clusters_.emplace("foo", cluster);
EXPECT_CALL(cm_, clusters()).WillOnce(Return(cluster_maps));
EXPECT_CALL(cluster, info());
EXPECT_CALL(*cluster.info_, addedViaApi());

Expand All @@ -65,7 +65,8 @@ TEST_F(AsyncClientManagerImplTest, EnvoyGrpcDynamicCluster) {
Upstream::ClusterManager::ClusterInfoMap cluster_map;
Upstream::MockClusterMockPrioritySet cluster;
cluster_map.emplace("foo", cluster);
EXPECT_CALL(cm_, clusters()).WillOnce(Return(cluster_map));
EXPECT_CALL(cm_, clusters())
.WillOnce(Return(Upstream::ClusterManager::ClusterInfoMaps{cluster_map, {}}));
EXPECT_CALL(cluster, info());
EXPECT_CALL(*cluster.info_, addedViaApi()).WillOnce(Return(true));
EXPECT_THROW_WITH_MESSAGE(
Expand Down
34 changes: 19 additions & 15 deletions test/common/upstream/cds_api_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -54,16 +54,20 @@ class CdsApiImplTest : public testing::Test {
.WillOnce(Throw(EnvoyException(exception_msg)));
}

ClusterManager::ClusterInfoMap makeClusterMap(const std::vector<std::string>& clusters) {
ClusterManager::ClusterInfoMap map;
for (const auto& cluster : clusters) {
map.emplace(cluster, cm_.thread_local_cluster_.cluster_);
ClusterManager::ClusterInfoMaps
makeClusterInfoMaps(const std::vector<std::string>& active_clusters,
const std::vector<std::string>& warming_clusters = {}) {
ClusterManager::ClusterInfoMaps maps;
for (const auto& cluster : active_clusters) {
maps.active_clusters_.emplace(cluster, cm_.thread_local_cluster_.cluster_);
}
return map;
for (const auto& cluster : warming_clusters) {
maps.warming_clusters_.emplace(cluster, cm_.thread_local_cluster_.cluster_);
}
return maps;
}

NiceMock<MockClusterManager> cm_;
Upstream::ClusterManager::ClusterInfoMap cluster_map_;
Upstream::MockClusterMockPrioritySet mock_cluster_;
Stats::IsolatedStoreImpl store_;
CdsApiPtr cds_;
Expand Down Expand Up @@ -92,7 +96,7 @@ version_info: '0'
auto response1 =
TestUtility::parseYaml<envoy::service::discovery::v3::DiscoveryResponse>(response1_yaml);

EXPECT_CALL(cm_, clusters()).WillOnce(Return(ClusterManager::ClusterInfoMap{}));
EXPECT_CALL(cm_, clusters()).WillOnce(Return(makeClusterInfoMaps({})));
expectAdd("cluster1", "0");
EXPECT_CALL(initialized_, ready());
EXPECT_EQ("", cds_->versionInfo());
Expand All @@ -108,7 +112,7 @@ version_info: '1'
)EOF";
auto response2 =
TestUtility::parseYaml<envoy::service::discovery::v3::DiscoveryResponse>(response2_yaml);
EXPECT_CALL(cm_, clusters()).WillOnce(Return(makeClusterMap({"cluster1"})));
EXPECT_CALL(cm_, clusters()).WillOnce(Return(makeClusterInfoMaps({"cluster1"})));
EXPECT_CALL(cm_, removeCluster("cluster1")).WillOnce(Return(true));
const auto decoded_resources_2 =
TestUtility::decodeResources<envoy::config::cluster::v3::Cluster>(response2);
Expand All @@ -126,7 +130,7 @@ TEST_F(CdsApiImplTest, ValidateDuplicateClusters) {
cluster_1.set_name("duplicate_cluster");
const auto decoded_resources = TestUtility::decodeResources({cluster_1, cluster_1});

EXPECT_CALL(cm_, clusters()).WillRepeatedly(Return(cluster_map_));
EXPECT_CALL(cm_, clusters()).WillRepeatedly(Return(makeClusterInfoMaps({})));
EXPECT_CALL(initialized_, ready());
EXPECT_THROW_WITH_MESSAGE(cds_callbacks_->onConfigUpdate(decoded_resources.refvec_, ""),
EnvoyException,
Expand All @@ -139,7 +143,7 @@ TEST_F(CdsApiImplTest, EmptyConfigUpdate) {

setup();

EXPECT_CALL(cm_, clusters()).WillOnce(Return(ClusterManager::ClusterInfoMap{}));
EXPECT_CALL(cm_, clusters()).WillOnce(Return(makeClusterInfoMaps({})));
EXPECT_CALL(initialized_, ready());

cds_callbacks_->onConfigUpdate({}, "");
Expand All @@ -151,7 +155,7 @@ TEST_F(CdsApiImplTest, ConfigUpdateWith2ValidClusters) {
setup();
}

EXPECT_CALL(cm_, clusters()).WillOnce(Return(ClusterManager::ClusterInfoMap{}));
EXPECT_CALL(cm_, clusters()).WillOnce(Return(makeClusterInfoMaps({})));
EXPECT_CALL(initialized_, ready());

envoy::config::cluster::v3::Cluster cluster_1;
Expand Down Expand Up @@ -224,7 +228,7 @@ TEST_F(CdsApiImplTest, ConfigUpdateAddsSecondClusterEvenIfFirstThrows) {
setup();
}

EXPECT_CALL(cm_, clusters()).WillOnce(Return(ClusterManager::ClusterInfoMap{}));
EXPECT_CALL(cm_, clusters()).WillOnce(Return(makeClusterInfoMaps({})));
EXPECT_CALL(initialized_, ready());

envoy::config::cluster::v3::Cluster cluster_1;
Expand Down Expand Up @@ -269,7 +273,7 @@ version_info: '0'
auto response1 =
TestUtility::parseYaml<envoy::service::discovery::v3::DiscoveryResponse>(response1_yaml);

EXPECT_CALL(cm_, clusters()).WillOnce(Return(ClusterManager::ClusterInfoMap{}));
EXPECT_CALL(cm_, clusters()).WillOnce(Return(makeClusterInfoMaps({})));
expectAdd("cluster1", "0");
expectAdd("cluster2", "0");
EXPECT_CALL(initialized_, ready());
Expand Down Expand Up @@ -298,7 +302,7 @@ version_info: '1'
auto response2 =
TestUtility::parseYaml<envoy::service::discovery::v3::DiscoveryResponse>(response2_yaml);

EXPECT_CALL(cm_, clusters()).WillOnce(Return(makeClusterMap({"cluster1", "cluster2"})));
EXPECT_CALL(cm_, clusters()).WillOnce(Return(makeClusterInfoMaps({"cluster1", "cluster2"})));
expectAdd("cluster1", "1");
expectAdd("cluster3", "1");
EXPECT_CALL(cm_, removeCluster("cluster2"));
Expand Down Expand Up @@ -334,7 +338,7 @@ version_info: '0'
auto response1 =
TestUtility::parseYaml<envoy::service::discovery::v3::DiscoveryResponse>(response1_yaml);

EXPECT_CALL(cm_, clusters()).WillRepeatedly(Return(cluster_map_));
EXPECT_CALL(cm_, clusters()).WillRepeatedly(Return(makeClusterInfoMaps({})));
EXPECT_CALL(initialized_, ready());
const auto decoded_resources =
TestUtility::decodeResources<envoy::config::cluster::v3::Cluster>(response1);
Expand Down
6 changes: 3 additions & 3 deletions test/common/upstream/cluster_manager_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,7 @@ TEST_F(ClusterManagerImplTest, ValidClusterName) {

create(parseBootstrapFromV3Yaml(yaml));
cluster_manager_->clusters()
.find("cluster:name")
.active_clusters_.find("cluster:name")
->second.get()
.info()
->statsScope()
Expand Down Expand Up @@ -1490,7 +1490,7 @@ TEST_F(ClusterManagerImplTest, DynamicAddRemove) {
EXPECT_TRUE(cluster_manager_->addOrUpdateCluster(update_cluster, ""));

EXPECT_EQ(cluster2->info_, cluster_manager_->get("fake_cluster")->info());
EXPECT_EQ(1UL, cluster_manager_->clusters().size());
EXPECT_EQ(1UL, cluster_manager_->clusters().active_clusters_.size());
Http::ConnectionPool::MockInstance* cp = new Http::ConnectionPool::MockInstance();
EXPECT_CALL(factory_, allocateConnPool_(_, _, _)).WillOnce(Return(cp));
EXPECT_EQ(cp, cluster_manager_->httpConnPoolForCluster("fake_cluster", ResourcePriority::Default,
Expand Down Expand Up @@ -1520,7 +1520,7 @@ TEST_F(ClusterManagerImplTest, DynamicAddRemove) {
EXPECT_CALL(*cp2, addDrainedCallback(_)).WillOnce(SaveArg<0>(&drained_cb2));
EXPECT_TRUE(cluster_manager_->removeCluster("fake_cluster"));
EXPECT_EQ(nullptr, cluster_manager_->get("fake_cluster"));
EXPECT_EQ(0UL, cluster_manager_->clusters().size());
EXPECT_EQ(0UL, cluster_manager_->clusters().active_clusters_.size());

// Close the TCP connection. Success is no ASSERT or crash due to referencing
// the removed cluster.
Expand Down
Loading

0 comments on commit b4f444d

Please sign in to comment.