From b4f444dc1d7ff8d7317f69191f40f34d2434089e Mon Sep 17 00:00:00 2001 From: Yuchen Dai Date: Fri, 13 Nov 2020 15:05:14 -0800 Subject: [PATCH] CDS: remove warming cluster if CDS response desired (#13997) Signed-off-by: Yuchen Dai Signed-off-by: Qin Qin --- include/envoy/upstream/cluster_manager.h | 9 +- .../common/grpc/async_client_manager_impl.cc | 6 +- source/common/upstream/cds_api_impl.cc | 17 +++- source/common/upstream/cluster_manager_impl.h | 14 +-- source/common/upstream/load_stats_reporter.cc | 17 ++-- .../redis/cluster_refresh_manager_impl.cc | 6 +- .../extensions/stat_sinks/hystrix/hystrix.cc | 8 +- source/server/admin/clusters_handler.cc | 8 +- source/server/admin/config_dump_handler.cc | 13 ++- .../grpc/async_client_manager_impl_test.cc | 9 +- test/common/upstream/cds_api_impl_test.cc | 34 ++++--- .../upstream/cluster_manager_impl_test.cc | 6 +- .../upstream/load_stats_reporter_test.cc | 3 +- .../redis/cluster_refresh_manager_test.cc | 6 +- .../stats_sinks/hystrix/hystrix_test.cc | 18 ++-- test/integration/ads_integration_test.cc | 95 +++++++++++++++++-- .../custom_cluster_integration_test.cc | 8 +- test/integration/eds_integration_test.cc | 8 +- test/mocks/upstream/cluster_manager.h | 3 +- test/server/admin/clusters_handler_test.cc | 6 +- test/server/admin/config_dump_handler_test.cc | 20 ++-- test/server/configuration_impl_test.cc | 4 +- 22 files changed, 216 insertions(+), 102 deletions(-) diff --git a/include/envoy/upstream/cluster_manager.h b/include/envoy/upstream/cluster_manager.h index 5939092a371b..beb88299da28 100644 --- a/include/envoy/upstream/cluster_manager.h +++ b/include/envoy/upstream/cluster_manager.h @@ -126,12 +126,15 @@ class ClusterManager { initializeSecondaryClusters(const envoy::config::bootstrap::v3::Bootstrap& bootstrap) PURE; using ClusterInfoMap = absl::node_hash_map>; + struct ClusterInfoMaps { + ClusterInfoMap active_clusters_; + ClusterInfoMap warming_clusters_; + }; /** - * @return ClusterInfoMap all current clusters. 
These are the primary (not thread local) - * clusters which should only be used for stats/admin. + * @return ClusterInfoMaps all current clusters including active and warming. */ - virtual ClusterInfoMap clusters() PURE; + virtual ClusterInfoMaps clusters() PURE; using ClusterSet = absl::flat_hash_set; diff --git a/source/common/grpc/async_client_manager_impl.cc b/source/common/grpc/async_client_manager_impl.cc index dc039473395c..5f809755f89d 100644 --- a/source/common/grpc/async_client_manager_impl.cc +++ b/source/common/grpc/async_client_manager_impl.cc @@ -21,9 +21,9 @@ AsyncClientFactoryImpl::AsyncClientFactoryImpl(Upstream::ClusterManager& cm, } const std::string& cluster_name = config.envoy_grpc().cluster_name(); - auto clusters = cm_.clusters(); - const auto& it = clusters.find(cluster_name); - if (it == clusters.end()) { + auto all_clusters = cm_.clusters(); + const auto& it = all_clusters.active_clusters_.find(cluster_name); + if (it == all_clusters.active_clusters_.end()) { throw EnvoyException(fmt::format("Unknown gRPC client cluster '{}'", cluster_name)); } if (it->second.get().info()->addedViaApi()) { diff --git a/source/common/upstream/cds_api_impl.cc b/source/common/upstream/cds_api_impl.cc index 0ca7d5763d5a..4568bf84b8f3 100644 --- a/source/common/upstream/cds_api_impl.cc +++ b/source/common/upstream/cds_api_impl.cc @@ -38,15 +38,22 @@ CdsApiImpl::CdsApiImpl(const envoy::config::core::v3::ConfigSource& cds_config, void CdsApiImpl::onConfigUpdate(const std::vector& resources, const std::string& version_info) { - ClusterManager::ClusterInfoMap clusters_to_remove = cm_.clusters(); - std::vector clusters; + auto all_existing_clusters = cm_.clusters(); + // Exclude the clusters which CDS wants to add.
for (const auto& resource : resources) { - clusters_to_remove.erase(resource.get().name()); + all_existing_clusters.active_clusters_.erase(resource.get().name()); + all_existing_clusters.warming_clusters_.erase(resource.get().name()); } Protobuf::RepeatedPtrField to_remove_repeated; - for (const auto& [cluster_name, _] : clusters_to_remove) { + for (const auto& [cluster_name, _] : all_existing_clusters.active_clusters_) { *to_remove_repeated.Add() = cluster_name; } + for (const auto& [cluster_name, _] : all_existing_clusters.warming_clusters_) { + // Do not add the cluster twice when the cluster is both active and warming. + if (all_existing_clusters.active_clusters_.count(cluster_name) == 0) { + *to_remove_repeated.Add() = cluster_name; + } + } onConfigUpdate(resources, to_remove_repeated, version_info); } @@ -64,7 +71,7 @@ void CdsApiImpl::onConfigUpdate(const std::vector& a removed_resources.size()); std::vector exception_msgs; - absl::node_hash_set cluster_names; + absl::flat_hash_set cluster_names(added_resources.size()); bool any_applied = false; for (const auto& resource : added_resources) { envoy::config::cluster::v3::Cluster cluster; diff --git a/source/common/upstream/cluster_manager_impl.h b/source/common/upstream/cluster_manager_impl.h index 63a5e701e9d6..837afe731a4e 100644 --- a/source/common/upstream/cluster_manager_impl.h +++ b/source/common/upstream/cluster_manager_impl.h @@ -236,15 +236,17 @@ class ClusterManagerImpl : public ClusterManager, Logger::Loggablecluster_); + clusters_maps.active_clusters_.emplace(cluster.first, *cluster.second->cluster_); } - - return clusters_map; + for (auto& cluster : warming_clusters_) { + clusters_maps.warming_clusters_.emplace(cluster.first, *cluster.second->cluster_); + } + return clusters_maps; } + const ClusterSet& primaryClusters() override { return primary_clusters_; } ThreadLocalCluster* get(absl::string_view cluster) override; diff --git a/source/common/upstream/load_stats_reporter.cc 
b/source/common/upstream/load_stats_reporter.cc index fa5697e86fbd..b7ff8e94b7c3 100644 --- a/source/common/upstream/load_stats_reporter.cc +++ b/source/common/upstream/load_stats_reporter.cc @@ -62,11 +62,11 @@ void LoadStatsReporter::sendLoadStatsRequest() { // added to the cluster manager. When we get the notification, we record the current time in // clusters_ as the start time for the load reporting window for that cluster. request_.mutable_cluster_stats()->Clear(); + auto all_clusters = cm_.clusters(); for (const auto& cluster_name_and_timestamp : clusters_) { const std::string& cluster_name = cluster_name_and_timestamp.first; - auto cluster_info_map = cm_.clusters(); - auto it = cluster_info_map.find(cluster_name); - if (it == cluster_info_map.end()) { + auto it = all_clusters.active_clusters_.find(cluster_name); + if (it == all_clusters.active_clusters_.end()) { ENVOY_LOG(debug, "Cluster {} does not exist", cluster_name); continue; } @@ -154,7 +154,8 @@ void LoadStatsReporter::startLoadReportPeriod() { // converge. absl::node_hash_map existing_clusters; if (message_->send_all_clusters()) { - for (const auto& p : cm_.clusters()) { + auto cluster_info_map = cm_.clusters(); + for (const auto& p : cluster_info_map.active_clusters_) { const std::string& cluster_name = p.first; if (clusters_.count(cluster_name) > 0) { existing_clusters.emplace(cluster_name, clusters_[cluster_name]); @@ -173,9 +174,10 @@ void LoadStatsReporter::startLoadReportPeriod() { clusters_.emplace(cluster_name, existing_clusters.count(cluster_name) > 0 ? existing_clusters[cluster_name] : time_source_.monotonicTime().time_since_epoch()); + // TODO(lambdai): Move the clusters() call out of this lambda. 
auto cluster_info_map = cm_.clusters(); - auto it = cluster_info_map.find(cluster_name); - if (it == cluster_info_map.end()) { + auto it = cluster_info_map.active_clusters_.find(cluster_name); + if (it == cluster_info_map.active_clusters_.end()) { return; } // Don't reset stats for existing tracked clusters. @@ -193,7 +195,8 @@ void LoadStatsReporter::startLoadReportPeriod() { cluster.info()->loadReportStats().upstream_rq_dropped_.latch(); }; if (message_->send_all_clusters()) { - for (const auto& p : cm_.clusters()) { + auto cluster_info_map = cm_.clusters(); + for (const auto& p : cluster_info_map.active_clusters_) { const std::string& cluster_name = p.first; handle_cluster_func(cluster_name); } diff --git a/source/extensions/common/redis/cluster_refresh_manager_impl.cc b/source/extensions/common/redis/cluster_refresh_manager_impl.cc index 8da52d96665c..c3caa96d9eec 100644 --- a/source/extensions/common/redis/cluster_refresh_manager_impl.cc +++ b/source/extensions/common/redis/cluster_refresh_manager_impl.cc @@ -133,9 +133,9 @@ bool ClusterRefreshManagerImpl::onEvent(const std::string& cluster_name, EventTy if (post_callback) { main_thread_dispatcher_.post([this, cluster_name, info]() { // Ensure that cluster is still active before calling callback. 
- auto map = cm_.clusters(); - auto it = map.find(cluster_name); - if (it != map.end()) { + auto maps = cm_.clusters(); + auto it = maps.active_clusters_.find(cluster_name); + if (it != maps.active_clusters_.end()) { info->cb_(); } }); diff --git a/source/extensions/stat_sinks/hystrix/hystrix.cc b/source/extensions/stat_sinks/hystrix/hystrix.cc index a35f67a8d3f7..6f4a1808accf 100644 --- a/source/extensions/stat_sinks/hystrix/hystrix.cc +++ b/source/extensions/stat_sinks/hystrix/hystrix.cc @@ -339,7 +339,7 @@ void HystrixSink::flush(Stats::MetricSnapshot& snapshot) { } incCounter(); std::stringstream ss; - Upstream::ClusterManager::ClusterInfoMap clusters = server_.clusterManager().clusters(); + Upstream::ClusterManager::ClusterInfoMaps all_clusters = server_.clusterManager().clusters(); // Save a map of the relevant histograms per cluster in a convenient format. absl::node_hash_map time_histograms; @@ -370,7 +370,7 @@ void HystrixSink::flush(Stats::MetricSnapshot& snapshot) { } } - for (auto& cluster : clusters) { + for (auto& cluster : all_clusters.active_clusters_) { Upstream::ClusterInfoConstSharedPtr cluster_info = cluster.second.get().info(); std::unique_ptr& cluster_stats_cache_ptr = @@ -407,9 +407,9 @@ void HystrixSink::flush(Stats::MetricSnapshot& snapshot) { } // check if any clusters were removed, and remove from cache - if (clusters.size() < cluster_stats_cache_map_.size()) { + if (all_clusters.active_clusters_.size() < cluster_stats_cache_map_.size()) { for (auto it = cluster_stats_cache_map_.begin(); it != cluster_stats_cache_map_.end();) { - if (clusters.find(it->first) == clusters.end()) { + if (all_clusters.active_clusters_.find(it->first) == all_clusters.active_clusters_.end()) { auto next_it = std::next(it); cluster_stats_cache_map_.erase(it); it = next_it; diff --git a/source/server/admin/clusters_handler.cc b/source/server/admin/clusters_handler.cc index e0e17350b7c9..801045e63ecd 100644 --- a/source/server/admin/clusters_handler.cc +++ 
b/source/server/admin/clusters_handler.cc @@ -100,7 +100,9 @@ void setHealthFlag(Upstream::Host::HealthFlag flag, const Upstream::Host& host, // TODO(efimki): Add support of text readouts stats. void ClustersHandler::writeClustersAsJson(Buffer::Instance& response) { envoy::admin::v3::Clusters clusters; - for (const auto& [name, cluster_ref] : server_.clusterManager().clusters()) { + // TODO(mattklein123): Add ability to see warming clusters in admin output. + auto all_clusters = server_.clusterManager().clusters(); + for (const auto& [name, cluster_ref] : all_clusters.active_clusters_) { const Upstream::Cluster& cluster = cluster_ref.get(); Upstream::ClusterInfoConstSharedPtr cluster_info = cluster.info(); @@ -184,7 +186,9 @@ void ClustersHandler::writeClustersAsJson(Buffer::Instance& response) { // TODO(efimki): Add support of text readouts stats. void ClustersHandler::writeClustersAsText(Buffer::Instance& response) { - for (const auto& [name, cluster_ref] : server_.clusterManager().clusters()) { + // TODO(mattklein123): Add ability to see warming clusters in admin output. 
+ auto all_clusters = server_.clusterManager().clusters(); + for (const auto& [name, cluster_ref] : all_clusters.active_clusters_) { const Upstream::Cluster& cluster = cluster_ref.get(); const std::string& cluster_name = cluster.info()->name(); addOutlierInfo(cluster_name, cluster.outlierDetector(), response); diff --git a/source/server/admin/config_dump_handler.cc b/source/server/admin/config_dump_handler.cc index 9e1d54e9d3e9..e255f084757c 100644 --- a/source/server/admin/config_dump_handler.cc +++ b/source/server/admin/config_dump_handler.cc @@ -149,7 +149,9 @@ ConfigDumpHandler::addResourceToDump(envoy::admin::v3::ConfigDump& dump, const std::string& resource, bool include_eds) const { Envoy::Server::ConfigTracker::CbsMap callbacks_map = config_tracker_.getCallbacksMap(); if (include_eds) { - if (!server_.clusterManager().clusters().empty()) { + // TODO(mattklein123): Add ability to see warming clusters in admin output. + auto all_clusters = server_.clusterManager().clusters(); + if (!all_clusters.active_clusters_.empty()) { callbacks_map.emplace("endpoint", [this] { return dumpEndpointConfigs(); }); } } @@ -195,7 +197,9 @@ void ConfigDumpHandler::addAllConfigToDump(envoy::admin::v3::ConfigDump& dump, bool include_eds) const { Envoy::Server::ConfigTracker::CbsMap callbacks_map = config_tracker_.getCallbacksMap(); if (include_eds) { - if (!server_.clusterManager().clusters().empty()) { + // TODO(mattklein123): Add ability to see warming clusters in admin output. 
+ auto all_clusters = server_.clusterManager().clusters(); + if (!all_clusters.active_clusters_.empty()) { callbacks_map.emplace("endpoint", [this] { return dumpEndpointConfigs(); }); } } @@ -220,8 +224,9 @@ void ConfigDumpHandler::addAllConfigToDump(envoy::admin::v3::ConfigDump& dump, ProtobufTypes::MessagePtr ConfigDumpHandler::dumpEndpointConfigs() const { auto endpoint_config_dump = std::make_unique(); - - for (const auto& [name, cluster_ref] : server_.clusterManager().clusters()) { + // TODO(mattklein123): Add ability to see warming clusters in admin output. + auto all_clusters = server_.clusterManager().clusters(); + for (const auto& [name, cluster_ref] : all_clusters.active_clusters_) { UNREFERENCED_PARAMETER(name); const Upstream::Cluster& cluster = cluster_ref.get(); Upstream::ClusterInfoConstSharedPtr cluster_info = cluster.info(); diff --git a/test/common/grpc/async_client_manager_impl_test.cc b/test/common/grpc/async_client_manager_impl_test.cc index 55d5c14e2fb1..448f52aa6140 100644 --- a/test/common/grpc/async_client_manager_impl_test.cc +++ b/test/common/grpc/async_client_manager_impl_test.cc @@ -38,10 +38,10 @@ TEST_F(AsyncClientManagerImplTest, EnvoyGrpcOk) { envoy::config::core::v3::GrpcService grpc_service; grpc_service.mutable_envoy_grpc()->set_cluster_name("foo"); - Upstream::ClusterManager::ClusterInfoMap cluster_map; + Upstream::ClusterManager::ClusterInfoMaps cluster_maps; Upstream::MockClusterMockPrioritySet cluster; - cluster_map.emplace("foo", cluster); - EXPECT_CALL(cm_, clusters()).WillOnce(Return(cluster_map)); + cluster_maps.active_clusters_.emplace("foo", cluster); + EXPECT_CALL(cm_, clusters()).WillOnce(Return(cluster_maps)); EXPECT_CALL(cluster, info()); EXPECT_CALL(*cluster.info_, addedViaApi()); @@ -65,7 +65,8 @@ TEST_F(AsyncClientManagerImplTest, EnvoyGrpcDynamicCluster) { Upstream::ClusterManager::ClusterInfoMap cluster_map; Upstream::MockClusterMockPrioritySet cluster; cluster_map.emplace("foo", cluster); - EXPECT_CALL(cm_, 
clusters()).WillOnce(Return(cluster_map)); + EXPECT_CALL(cm_, clusters()) + .WillOnce(Return(Upstream::ClusterManager::ClusterInfoMaps{cluster_map, {}})); EXPECT_CALL(cluster, info()); EXPECT_CALL(*cluster.info_, addedViaApi()).WillOnce(Return(true)); EXPECT_THROW_WITH_MESSAGE( diff --git a/test/common/upstream/cds_api_impl_test.cc b/test/common/upstream/cds_api_impl_test.cc index f052749677fb..a914225dd9d0 100644 --- a/test/common/upstream/cds_api_impl_test.cc +++ b/test/common/upstream/cds_api_impl_test.cc @@ -54,16 +54,20 @@ class CdsApiImplTest : public testing::Test { .WillOnce(Throw(EnvoyException(exception_msg))); } - ClusterManager::ClusterInfoMap makeClusterMap(const std::vector& clusters) { - ClusterManager::ClusterInfoMap map; - for (const auto& cluster : clusters) { - map.emplace(cluster, cm_.thread_local_cluster_.cluster_); + ClusterManager::ClusterInfoMaps + makeClusterInfoMaps(const std::vector& active_clusters, + const std::vector& warming_clusters = {}) { + ClusterManager::ClusterInfoMaps maps; + for (const auto& cluster : active_clusters) { + maps.active_clusters_.emplace(cluster, cm_.thread_local_cluster_.cluster_); } - return map; + for (const auto& cluster : warming_clusters) { + maps.warming_clusters_.emplace(cluster, cm_.thread_local_cluster_.cluster_); + } + return maps; } NiceMock cm_; - Upstream::ClusterManager::ClusterInfoMap cluster_map_; Upstream::MockClusterMockPrioritySet mock_cluster_; Stats::IsolatedStoreImpl store_; CdsApiPtr cds_; @@ -92,7 +96,7 @@ version_info: '0' auto response1 = TestUtility::parseYaml(response1_yaml); - EXPECT_CALL(cm_, clusters()).WillOnce(Return(ClusterManager::ClusterInfoMap{})); + EXPECT_CALL(cm_, clusters()).WillOnce(Return(makeClusterInfoMaps({}))); expectAdd("cluster1", "0"); EXPECT_CALL(initialized_, ready()); EXPECT_EQ("", cds_->versionInfo()); @@ -108,7 +112,7 @@ version_info: '1' )EOF"; auto response2 = TestUtility::parseYaml(response2_yaml); - EXPECT_CALL(cm_, 
clusters()).WillOnce(Return(makeClusterMap({"cluster1"}))); + EXPECT_CALL(cm_, clusters()).WillOnce(Return(makeClusterInfoMaps({"cluster1"}))); EXPECT_CALL(cm_, removeCluster("cluster1")).WillOnce(Return(true)); const auto decoded_resources_2 = TestUtility::decodeResources(response2); @@ -126,7 +130,7 @@ TEST_F(CdsApiImplTest, ValidateDuplicateClusters) { cluster_1.set_name("duplicate_cluster"); const auto decoded_resources = TestUtility::decodeResources({cluster_1, cluster_1}); - EXPECT_CALL(cm_, clusters()).WillRepeatedly(Return(cluster_map_)); + EXPECT_CALL(cm_, clusters()).WillRepeatedly(Return(makeClusterInfoMaps({}))); EXPECT_CALL(initialized_, ready()); EXPECT_THROW_WITH_MESSAGE(cds_callbacks_->onConfigUpdate(decoded_resources.refvec_, ""), EnvoyException, @@ -139,7 +143,7 @@ TEST_F(CdsApiImplTest, EmptyConfigUpdate) { setup(); - EXPECT_CALL(cm_, clusters()).WillOnce(Return(ClusterManager::ClusterInfoMap{})); + EXPECT_CALL(cm_, clusters()).WillOnce(Return(makeClusterInfoMaps({}))); EXPECT_CALL(initialized_, ready()); cds_callbacks_->onConfigUpdate({}, ""); @@ -151,7 +155,7 @@ TEST_F(CdsApiImplTest, ConfigUpdateWith2ValidClusters) { setup(); } - EXPECT_CALL(cm_, clusters()).WillOnce(Return(ClusterManager::ClusterInfoMap{})); + EXPECT_CALL(cm_, clusters()).WillOnce(Return(makeClusterInfoMaps({}))); EXPECT_CALL(initialized_, ready()); envoy::config::cluster::v3::Cluster cluster_1; @@ -224,7 +228,7 @@ TEST_F(CdsApiImplTest, ConfigUpdateAddsSecondClusterEvenIfFirstThrows) { setup(); } - EXPECT_CALL(cm_, clusters()).WillOnce(Return(ClusterManager::ClusterInfoMap{})); + EXPECT_CALL(cm_, clusters()).WillOnce(Return(makeClusterInfoMaps({}))); EXPECT_CALL(initialized_, ready()); envoy::config::cluster::v3::Cluster cluster_1; @@ -269,7 +273,7 @@ version_info: '0' auto response1 = TestUtility::parseYaml(response1_yaml); - EXPECT_CALL(cm_, clusters()).WillOnce(Return(ClusterManager::ClusterInfoMap{})); + EXPECT_CALL(cm_, 
clusters()).WillOnce(Return(makeClusterInfoMaps({}))); expectAdd("cluster1", "0"); expectAdd("cluster2", "0"); EXPECT_CALL(initialized_, ready()); @@ -298,7 +302,7 @@ version_info: '1' auto response2 = TestUtility::parseYaml(response2_yaml); - EXPECT_CALL(cm_, clusters()).WillOnce(Return(makeClusterMap({"cluster1", "cluster2"}))); + EXPECT_CALL(cm_, clusters()).WillOnce(Return(makeClusterInfoMaps({"cluster1", "cluster2"}))); expectAdd("cluster1", "1"); expectAdd("cluster3", "1"); EXPECT_CALL(cm_, removeCluster("cluster2")); @@ -334,7 +338,7 @@ version_info: '0' auto response1 = TestUtility::parseYaml(response1_yaml); - EXPECT_CALL(cm_, clusters()).WillRepeatedly(Return(cluster_map_)); + EXPECT_CALL(cm_, clusters()).WillRepeatedly(Return(makeClusterInfoMaps({}))); EXPECT_CALL(initialized_, ready()); const auto decoded_resources = TestUtility::decodeResources(response1); diff --git a/test/common/upstream/cluster_manager_impl_test.cc b/test/common/upstream/cluster_manager_impl_test.cc index a4f2122c4945..1e3217285dea 100644 --- a/test/common/upstream/cluster_manager_impl_test.cc +++ b/test/common/upstream/cluster_manager_impl_test.cc @@ -361,7 +361,7 @@ TEST_F(ClusterManagerImplTest, ValidClusterName) { create(parseBootstrapFromV3Yaml(yaml)); cluster_manager_->clusters() - .find("cluster:name") + .active_clusters_.find("cluster:name") ->second.get() .info() ->statsScope() @@ -1490,7 +1490,7 @@ TEST_F(ClusterManagerImplTest, DynamicAddRemove) { EXPECT_TRUE(cluster_manager_->addOrUpdateCluster(update_cluster, "")); EXPECT_EQ(cluster2->info_, cluster_manager_->get("fake_cluster")->info()); - EXPECT_EQ(1UL, cluster_manager_->clusters().size()); + EXPECT_EQ(1UL, cluster_manager_->clusters().active_clusters_.size()); Http::ConnectionPool::MockInstance* cp = new Http::ConnectionPool::MockInstance(); EXPECT_CALL(factory_, allocateConnPool_(_, _, _)).WillOnce(Return(cp)); EXPECT_EQ(cp, cluster_manager_->httpConnPoolForCluster("fake_cluster", ResourcePriority::Default, @@ 
-1520,7 +1520,7 @@ TEST_F(ClusterManagerImplTest, DynamicAddRemove) { EXPECT_CALL(*cp2, addDrainedCallback(_)).WillOnce(SaveArg<0>(&drained_cb2)); EXPECT_TRUE(cluster_manager_->removeCluster("fake_cluster")); EXPECT_EQ(nullptr, cluster_manager_->get("fake_cluster")); - EXPECT_EQ(0UL, cluster_manager_->clusters().size()); + EXPECT_EQ(0UL, cluster_manager_->clusters().active_clusters_.size()); // Close the TCP connection. Success is no ASSERT or crash due to referencing // the removed cluster. diff --git a/test/common/upstream/load_stats_reporter_test.cc b/test/common/upstream/load_stats_reporter_test.cc index 111e7356a064..46105592c00a 100644 --- a/test/common/upstream/load_stats_reporter_test.cc +++ b/test/common/upstream/load_stats_reporter_test.cc @@ -125,7 +125,8 @@ TEST_F(LoadStatsReporterTest, ExistingClusters) { foo_cluster.info_->load_report_stats_.upstream_rq_dropped_.add(2); foo_cluster.info_->eds_service_name_ = "bar"; NiceMock bar_cluster; - MockClusterManager::ClusterInfoMap cluster_info{{"foo", foo_cluster}, {"bar", bar_cluster}}; + MockClusterManager::ClusterInfoMaps cluster_info{{{"foo", foo_cluster}, {"bar", bar_cluster}}, + {}}; ON_CALL(cm_, clusters()).WillByDefault(Return(cluster_info)); deliverLoadStatsResponse({"foo"}); // Initial stats report for foo on timer tick. 
diff --git a/test/extensions/common/redis/cluster_refresh_manager_test.cc b/test/extensions/common/redis/cluster_refresh_manager_test.cc index 916a1457da31..ea07e1624384 100644 --- a/test/extensions/common/redis/cluster_refresh_manager_test.cc +++ b/test/extensions/common/redis/cluster_refresh_manager_test.cc @@ -33,8 +33,8 @@ class ClusterRefreshManagerTest : public testing::Test { : cluster_name_("fake_cluster"), refresh_manager_(std::make_shared( dispatcher_, cm_, time_system_)) { time_system_.setMonotonicTime(std::chrono::seconds(1)); - map_.emplace("fake_cluster", mock_cluster_); - ON_CALL(cm_, clusters()).WillByDefault(Return(map_)); + cluster_maps_.active_clusters_.emplace("fake_cluster", mock_cluster_); + ON_CALL(cm_, clusters()).WillByDefault(Return(cluster_maps_)); } ~ClusterRefreshManagerTest() override = default; @@ -104,7 +104,7 @@ class ClusterRefreshManagerTest : public testing::Test { const std::string cluster_name_; NiceMock dispatcher_; NiceMock cm_; - Upstream::ClusterManager::ClusterInfoMap map_; + Upstream::ClusterManager::ClusterInfoMaps cluster_maps_; Upstream::MockClusterMockPrioritySet mock_cluster_; Event::SimulatedTimeSystem time_system_; std::shared_ptr refresh_manager_; diff --git a/test/extensions/stats_sinks/hystrix/hystrix_test.cc b/test/extensions/stats_sinks/hystrix/hystrix_test.cc index 29e7c79d02da..38f88019602c 100644 --- a/test/extensions/stats_sinks/hystrix/hystrix_test.cc +++ b/test/extensions/stats_sinks/hystrix/hystrix_test.cc @@ -128,9 +128,9 @@ class HystrixSinkTest : public testing::Test { void createClusterAndCallbacks() { // Set cluster. 
- cluster_map_.emplace(cluster1_name_, cluster1_.cluster_); + cluster_maps_.active_clusters_.emplace(cluster1_name_, cluster1_.cluster_); ON_CALL(server_, clusterManager()).WillByDefault(ReturnRef(cluster_manager_)); - ON_CALL(cluster_manager_, clusters()).WillByDefault(Return(cluster_map_)); + ON_CALL(cluster_manager_, clusters()).WillByDefault(Return(cluster_maps_)); ON_CALL(callbacks_, encodeData(_, _)).WillByDefault(Invoke([&](Buffer::Instance& data, bool) { // Set callbacks to send data to buffer. This will append to the end of the buffer, so @@ -141,15 +141,15 @@ class HystrixSinkTest : public testing::Test { void addClusterToMap(const std::string& cluster_name, NiceMock& cluster) { - cluster_map_.emplace(cluster_name, cluster); - // Redefining since cluster_map_ is returned by value. - ON_CALL(cluster_manager_, clusters()).WillByDefault(Return(cluster_map_)); + cluster_maps_.active_clusters_.emplace(cluster_name, cluster); + // Redefining since cluster_maps_ is returned by value. + ON_CALL(cluster_manager_, clusters()).WillByDefault(Return(cluster_maps_)); } void removeClusterFromMap(const std::string& cluster_name) { - cluster_map_.erase(cluster_name); - // Redefining since cluster_map_ is returned by value. - ON_CALL(cluster_manager_, clusters()).WillByDefault(Return(cluster_map_)); + cluster_maps_.active_clusters_.erase(cluster_name); + // Redefining since cluster_maps_ is returned by value. 
+ ON_CALL(cluster_manager_, clusters()).WillByDefault(Return(cluster_maps_)); } void addSecondClusterHelper(Buffer::OwnedImpl& buffer) { @@ -245,7 +245,7 @@ class HystrixSinkTest : public testing::Test { NiceMock callbacks_; NiceMock server_; - Upstream::ClusterManager::ClusterInfoMap cluster_map_; + Upstream::ClusterManager::ClusterInfoMaps cluster_maps_; Buffer::OwnedImpl cluster_stats_buffer_; std::unique_ptr sink_; diff --git a/test/integration/ads_integration_test.cc b/test/integration/ads_integration_test.cc index 3fc55beb56e2..a9d96e44a8a2 100644 --- a/test/integration/ads_integration_test.cc +++ b/test/integration/ads_integration_test.cc @@ -553,8 +553,9 @@ TEST_P(AdsIntegrationTest, CdsPausedDuringWarming) { // Send the second warming cluster. sendDiscoveryResponse( - Config::TypeUrl::get().Cluster, {buildCluster("warming_cluster_2")}, - {buildCluster("warming_cluster_2")}, {}, "3"); + Config::TypeUrl::get().Cluster, + {buildCluster("warming_cluster_1"), buildCluster("warming_cluster_2")}, + {buildCluster("warming_cluster_1"), buildCluster("warming_cluster_2")}, {}, "3"); test_server_->waitForGaugeEq("cluster_manager.warming_clusters", 2); // We would've got a Cluster discovery request with version 2 here, had the CDS not been paused. @@ -586,6 +587,87 @@ TEST_P(AdsIntegrationTest, CdsPausedDuringWarming) { {"warming_cluster_2", "warming_cluster_1"}, {}, {})); } +TEST_P(AdsIntegrationTest, RemoveWarmingCluster) { + initialize(); + + // Send initial configuration, validate we can process a request. 
+ EXPECT_TRUE(compareDiscoveryRequest(Config::TypeUrl::get().Cluster, "", {}, {}, {}, true)); + sendDiscoveryResponse(Config::TypeUrl::get().Cluster, + {buildCluster("cluster_0")}, + {buildCluster("cluster_0")}, {}, "1"); + EXPECT_TRUE(compareDiscoveryRequest(Config::TypeUrl::get().ClusterLoadAssignment, "", + {"cluster_0"}, {"cluster_0"}, {})); + + sendDiscoveryResponse( + Config::TypeUrl::get().ClusterLoadAssignment, {buildClusterLoadAssignment("cluster_0")}, + {buildClusterLoadAssignment("cluster_0")}, {}, "1"); + + EXPECT_TRUE(compareDiscoveryRequest(Config::TypeUrl::get().Cluster, "1", {}, {}, {})); + EXPECT_TRUE(compareDiscoveryRequest(Config::TypeUrl::get().Listener, "", {}, {}, {})); + sendDiscoveryResponse( + Config::TypeUrl::get().Listener, {buildListener("listener_0", "route_config_0")}, + {buildListener("listener_0", "route_config_0")}, {}, "1"); + + EXPECT_TRUE(compareDiscoveryRequest(Config::TypeUrl::get().ClusterLoadAssignment, "1", + {"cluster_0"}, {}, {})); + EXPECT_TRUE(compareDiscoveryRequest(Config::TypeUrl::get().RouteConfiguration, "", + {"route_config_0"}, {"route_config_0"}, {})); + sendDiscoveryResponse( + Config::TypeUrl::get().RouteConfiguration, {buildRouteConfig("route_config_0", "cluster_0")}, + {buildRouteConfig("route_config_0", "cluster_0")}, {}, "1"); + + EXPECT_TRUE(compareDiscoveryRequest(Config::TypeUrl::get().Listener, "1", {}, {}, {})); + EXPECT_TRUE(compareDiscoveryRequest(Config::TypeUrl::get().RouteConfiguration, "1", + {"route_config_0"}, {}, {})); + + test_server_->waitForCounterGe("listener_manager.listener_create_success", 1); + makeSingleRequest(); + + // Send the first warming cluster. 
+ sendDiscoveryResponse( + Config::TypeUrl::get().Cluster, {buildCluster("warming_cluster_1")}, + {buildCluster("warming_cluster_1")}, {"cluster_0"}, "2"); + + test_server_->waitForGaugeEq("cluster_manager.warming_clusters", 1); + + EXPECT_TRUE(compareDiscoveryRequest(Config::TypeUrl::get().ClusterLoadAssignment, "1", + {"warming_cluster_1"}, {"warming_cluster_1"}, {"cluster_0"})); + + // Send the second warming cluster and remove the first cluster. + sendDiscoveryResponse(Config::TypeUrl::get().Cluster, + {buildCluster("warming_cluster_2")}, + {buildCluster("warming_cluster_2")}, + // Delta: remove warming_cluster_1. + {"warming_cluster_1"}, "3"); + test_server_->waitForGaugeEq("cluster_manager.warming_clusters", 1); + // We would've got a Cluster discovery request with version 2 here, had the CDS not been paused. + + EXPECT_TRUE(compareDiscoveryRequest(Config::TypeUrl::get().ClusterLoadAssignment, "1", + {"warming_cluster_2"}, {"warming_cluster_2"}, + {"warming_cluster_1"})); + + // Finish warming the clusters. Note that the first warming cluster is not included in the + // response. + sendDiscoveryResponse( + Config::TypeUrl::get().ClusterLoadAssignment, + {buildClusterLoadAssignment("warming_cluster_2")}, + {buildClusterLoadAssignment("warming_cluster_2")}, {"cluster_0"}, "2"); + + // Validate that all clusters are warmed. + test_server_->waitForGaugeEq("cluster_manager.warming_clusters", 0); + test_server_->waitForGaugeEq("cluster_manager.active_clusters", 3); + + // CDS is resumed and EDS response was acknowledged. + if (sotw_or_delta_ == Grpc::SotwOrDelta::Delta) { + // Envoy will ACK both Cluster messages. Since they arrived while CDS was paused, they aren't + // sent until CDS is unpaused. Since version 3 has already arrived by the time the version 2 + // ACK goes out, they're both acknowledging version 3. 
+ EXPECT_TRUE(compareDiscoveryRequest(Config::TypeUrl::get().Cluster, "3", {}, {}, {})); + } + EXPECT_TRUE(compareDiscoveryRequest(Config::TypeUrl::get().Cluster, "3", {}, {}, {})); + EXPECT_TRUE(compareDiscoveryRequest(Config::TypeUrl::get().ClusterLoadAssignment, "2", + {"warming_cluster_2"}, {}, {})); +} // Validate that warming listeners are removed when left out of SOTW update. TEST_P(AdsIntegrationTest, RemoveWarmingListener) { initialize(); @@ -696,8 +778,9 @@ TEST_P(AdsIntegrationTest, ClusterWarmingOnNamedResponse) { // Send the second warming cluster. sendDiscoveryResponse( - Config::TypeUrl::get().Cluster, {buildCluster("warming_cluster_2")}, - {buildCluster("warming_cluster_2")}, {}, "3"); + Config::TypeUrl::get().Cluster, + {buildCluster("warming_cluster_1"), buildCluster("warming_cluster_2")}, + {buildCluster("warming_cluster_1"), buildCluster("warming_cluster_2")}, {}, "3"); test_server_->waitForGaugeEq("cluster_manager.warming_clusters", 2); EXPECT_TRUE(compareDiscoveryRequest(Config::TypeUrl::get().ClusterLoadAssignment, "1", @@ -1359,8 +1442,8 @@ TEST_P(AdsClusterV2Test, CdsPausedDuringWarming) { // Send the second warming cluster. sendDiscoveryResponse( - cds_type_url, {buildCluster("warming_cluster_2")}, {buildCluster("warming_cluster_2")}, {}, - "3", true); + cds_type_url, {buildCluster("warming_cluster_1"), buildCluster("warming_cluster_2")}, + {buildCluster("warming_cluster_1"), buildCluster("warming_cluster_2")}, {}, "3", true); test_server_->waitForGaugeEq("cluster_manager.warming_clusters", 2); // We would've got a Cluster discovery request with version 2 here, had the CDS not been paused. 
diff --git a/test/integration/custom_cluster_integration_test.cc b/test/integration/custom_cluster_integration_test.cc index 62d5cac5ab9f..84c936e006c5 100644 --- a/test/integration/custom_cluster_integration_test.cc +++ b/test/integration/custom_cluster_integration_test.cc @@ -69,10 +69,10 @@ TEST_P(CustomClusterIntegrationTest, TestCustomConfig) { initialize(); // Verify the cluster is correctly setup with the custom priority - const auto& cluster_map = test_server_->server().clusterManager().clusters(); - EXPECT_EQ(1, cluster_map.size()); - EXPECT_EQ(1, cluster_map.count("cluster_0")); - const auto& cluster_ref = cluster_map.find("cluster_0")->second; + const auto& cluster_maps = test_server_->server().clusterManager().clusters(); + EXPECT_EQ(1, cluster_maps.active_clusters_.size()); + EXPECT_EQ(1, cluster_maps.active_clusters_.count("cluster_0")); + const auto& cluster_ref = cluster_maps.active_clusters_.find("cluster_0")->second; const auto& hostset_per_priority = cluster_ref.get().prioritySet().hostSetsPerPriority(); EXPECT_EQ(11, hostset_per_priority.size()); const Envoy::Upstream::HostSetPtr& host_set = hostset_per_priority[10]; diff --git a/test/integration/eds_integration_test.cc b/test/integration/eds_integration_test.cc index 3e1c237e7c59..b8ca3c6f07c2 100644 --- a/test/integration/eds_integration_test.cc +++ b/test/integration/eds_integration_test.cc @@ -316,9 +316,9 @@ TEST_P(EdsIntegrationTest, OverprovisioningFactorUpdate) { setEndpoints(4, 4, 0); auto get_and_compare = [this](const uint32_t expected_factor) { const auto& cluster_map = test_server_->server().clusterManager().clusters(); - EXPECT_EQ(1, cluster_map.size()); - EXPECT_EQ(1, cluster_map.count("cluster_0")); - const auto& cluster_ref = cluster_map.find("cluster_0")->second; + EXPECT_EQ(1, cluster_map.active_clusters_.size()); + EXPECT_EQ(1, cluster_map.active_clusters_.count("cluster_0")); + const auto& cluster_ref = cluster_map.active_clusters_.find("cluster_0")->second; const auto& 
hostset_per_priority = cluster_ref.get().prioritySet().hostSetsPerPriority(); EXPECT_EQ(1, hostset_per_priority.size()); const Envoy::Upstream::HostSetPtr& host_set = hostset_per_priority[0]; @@ -340,7 +340,7 @@ TEST_P(EdsIntegrationTest, BatchMemberUpdateCb) { auto& priority_set = test_server_->server() .clusterManager() .clusters() - .find("cluster_0") + .active_clusters_.find("cluster_0") ->second.get() .prioritySet(); diff --git a/test/mocks/upstream/cluster_manager.h b/test/mocks/upstream/cluster_manager.h index c24b1b045acd..cc3071052f67 100644 --- a/test/mocks/upstream/cluster_manager.h +++ b/test/mocks/upstream/cluster_manager.h @@ -42,7 +42,8 @@ class MockClusterManager : public ClusterManager { MOCK_METHOD(void, setInitializedCb, (InitializationCompleteCallback)); MOCK_METHOD(void, initializeSecondaryClusters, (const envoy::config::bootstrap::v3::Bootstrap& bootstrap)); - MOCK_METHOD(ClusterInfoMap, clusters, ()); + MOCK_METHOD(ClusterInfoMaps, clusters, ()); + MOCK_METHOD(const ClusterSet&, primaryClusters, ()); MOCK_METHOD(ThreadLocalCluster*, get, (absl::string_view cluster)); MOCK_METHOD(Http::ConnectionPool::Instance*, httpConnPoolForCluster, diff --git a/test/server/admin/clusters_handler_test.cc b/test/server/admin/clusters_handler_test.cc index f0ba3f5ae7e5..4d41b6c24969 100644 --- a/test/server/admin/clusters_handler_test.cc +++ b/test/server/admin/clusters_handler_test.cc @@ -14,11 +14,11 @@ INSTANTIATE_TEST_SUITE_P(IpVersions, AdminInstanceTest, TestUtility::ipTestParamsToString); TEST_P(AdminInstanceTest, ClustersJson) { - Upstream::ClusterManager::ClusterInfoMap cluster_map; - ON_CALL(server_.cluster_manager_, clusters()).WillByDefault(ReturnPointee(&cluster_map)); + Upstream::ClusterManager::ClusterInfoMaps cluster_maps; + ON_CALL(server_.cluster_manager_, clusters()).WillByDefault(ReturnPointee(&cluster_maps)); NiceMock cluster; - cluster_map.emplace(cluster.info_->name_, cluster); + 
cluster_maps.active_clusters_.emplace(cluster.info_->name_, cluster); NiceMock outlier_detector; ON_CALL(Const(cluster), outlierDetector()).WillByDefault(Return(&outlier_detector)); diff --git a/test/server/admin/config_dump_handler_test.cc b/test/server/admin/config_dump_handler_test.cc index 7c7f5f57781f..6075dffa4a67 100644 --- a/test/server/admin/config_dump_handler_test.cc +++ b/test/server/admin/config_dump_handler_test.cc @@ -113,11 +113,11 @@ TEST_P(AdminInstanceTest, ConfigDumpMaintainsOrder) { // Test that using ?include_eds parameter adds EDS to the config dump. TEST_P(AdminInstanceTest, ConfigDumpWithEndpoint) { - Upstream::ClusterManager::ClusterInfoMap cluster_map; - ON_CALL(server_.cluster_manager_, clusters()).WillByDefault(ReturnPointee(&cluster_map)); + Upstream::ClusterManager::ClusterInfoMaps cluster_maps; + ON_CALL(server_.cluster_manager_, clusters()).WillByDefault(ReturnPointee(&cluster_maps)); NiceMock cluster; - cluster_map.emplace(cluster.info_->name_, cluster); + cluster_maps.active_clusters_.emplace(cluster.info_->name_, cluster); ON_CALL(*cluster.info_, addedViaApi()).WillByDefault(Return(false)); @@ -186,11 +186,11 @@ TEST_P(AdminInstanceTest, ConfigDumpWithEndpoint) { // Test EDS config dump while multiple localities and priorities exist TEST_P(AdminInstanceTest, ConfigDumpWithLocalityEndpoint) { - Upstream::ClusterManager::ClusterInfoMap cluster_map; - ON_CALL(server_.cluster_manager_, clusters()).WillByDefault(ReturnPointee(&cluster_map)); + Upstream::ClusterManager::ClusterInfoMaps cluster_maps; + ON_CALL(server_.cluster_manager_, clusters()).WillByDefault(ReturnPointee(&cluster_maps)); NiceMock cluster; - cluster_map.emplace(cluster.info_->name_, cluster); + cluster_maps.active_clusters_.emplace(cluster.info_->name_, cluster); ON_CALL(*cluster.info_, addedViaApi()).WillByDefault(Return(false)); @@ -398,11 +398,11 @@ TEST_P(AdminInstanceTest, ConfigDumpFiltersByResource) { // We add both static and dynamic endpoint config to the 
dump, but expect only // dynamic in the JSON with ?resource=dynamic_endpoint_configs. TEST_P(AdminInstanceTest, ConfigDumpWithEndpointFiltersByResource) { - Upstream::ClusterManager::ClusterInfoMap cluster_map; - ON_CALL(server_.cluster_manager_, clusters()).WillByDefault(ReturnPointee(&cluster_map)); + Upstream::ClusterManager::ClusterInfoMaps cluster_maps; + ON_CALL(server_.cluster_manager_, clusters()).WillByDefault(ReturnPointee(&cluster_maps)); NiceMock cluster_1; - cluster_map.emplace(cluster_1.info_->name_, cluster_1); + cluster_maps.active_clusters_.emplace(cluster_1.info_->name_, cluster_1); ON_CALL(*cluster_1.info_, addedViaApi()).WillByDefault(Return(true)); @@ -419,7 +419,7 @@ TEST_P(AdminInstanceTest, ConfigDumpWithEndpointFiltersByResource) { NiceMock cluster_2; cluster_2.info_->name_ = "fake_cluster_2"; - cluster_map.emplace(cluster_2.info_->name_, cluster_2); + cluster_maps.active_clusters_.emplace(cluster_2.info_->name_, cluster_2); ON_CALL(*cluster_2.info_, addedViaApi()).WillByDefault(Return(false)); diff --git a/test/server/configuration_impl_test.cc b/test/server/configuration_impl_test.cc index eeeba3585b77..c8e63cb180f6 100644 --- a/test/server/configuration_impl_test.cc +++ b/test/server/configuration_impl_test.cc @@ -161,10 +161,10 @@ TEST_F(ConfigurationImplTest, SetUpstreamClusterPerConnectionBufferLimit) { MainImpl config; config.initialize(bootstrap, server_, cluster_manager_factory_); - ASSERT_EQ(1U, config.clusterManager()->clusters().count("test_cluster")); + ASSERT_EQ(1U, config.clusterManager()->clusters().active_clusters_.count("test_cluster")); EXPECT_EQ(8192U, config.clusterManager() ->clusters() - .find("test_cluster") + .active_clusters_.find("test_cluster") ->second.get() .info() ->perConnectionBufferLimitBytes());