From 239013eccc1a57939be938ba69c1e8e8eea61d76 Mon Sep 17 00:00:00 2001 From: Lizan Zhou Date: Sat, 19 Dec 2020 15:13:35 -0800 Subject: [PATCH 1/2] api: relax inline_string length limitation in DataSource (#14461) Signed-off-by: Lizan Zhou --- api/envoy/config/core/v3/base.proto | 4 +- api/envoy/config/core/v4alpha/base.proto | 4 +- .../envoy/config/core/v3/base.proto | 4 +- .../envoy/config/core/v4alpha/base.proto | 4 +- source/common/config/datasource.cc | 15 +++-- test/common/config/datasource_test.cc | 30 ++++++++++ .../local_reply_integration_test.cc | 55 +++++++++++++++++++ 7 files changed, 104 insertions(+), 12 deletions(-) diff --git a/api/envoy/config/core/v3/base.proto b/api/envoy/config/core/v3/base.proto index 5b5339ea5bc5..74a7d55a7374 100644 --- a/api/envoy/config/core/v3/base.proto +++ b/api/envoy/config/core/v3/base.proto @@ -331,10 +331,10 @@ message DataSource { string filename = 1 [(validate.rules).string = {min_len: 1}]; // Bytes inlined in the configuration. - bytes inline_bytes = 2 [(validate.rules).bytes = {min_len: 1}]; + bytes inline_bytes = 2; // String inlined in the configuration. - string inline_string = 3 [(validate.rules).string = {min_len: 1}]; + string inline_string = 3; } } diff --git a/api/envoy/config/core/v4alpha/base.proto b/api/envoy/config/core/v4alpha/base.proto index 27b0b356b1a7..6a967b1ae5f2 100644 --- a/api/envoy/config/core/v4alpha/base.proto +++ b/api/envoy/config/core/v4alpha/base.proto @@ -329,10 +329,10 @@ message DataSource { string filename = 1 [(validate.rules).string = {min_len: 1}]; // Bytes inlined in the configuration. - bytes inline_bytes = 2 [(validate.rules).bytes = {min_len: 1}]; + bytes inline_bytes = 2; // String inlined in the configuration. - string inline_string = 3 [(validate.rules).string = {min_len: 1}]; + string inline_string = 3; } } diff --git a/generated_api_shadow/envoy/config/core/v3/base.proto b/generated_api_shadow/envoy/config/core/v3/base.proto index 1184c89de6e2..807045fde4c9 100644 --- a/generated_api_shadow/envoy/config/core/v3/base.proto +++ b/generated_api_shadow/envoy/config/core/v3/base.proto @@ -329,10 +329,10 @@ message DataSource { string filename = 1 [(validate.rules).string = {min_len: 1}]; // Bytes inlined in the configuration. - bytes inline_bytes = 2 [(validate.rules).bytes = {min_len: 1}]; + bytes inline_bytes = 2; // String inlined in the configuration. - string inline_string = 3 [(validate.rules).string = {min_len: 1}]; + string inline_string = 3; } } diff --git a/generated_api_shadow/envoy/config/core/v4alpha/base.proto b/generated_api_shadow/envoy/config/core/v4alpha/base.proto index 95ca4f77a2bc..78fb00882e2c 100644 --- a/generated_api_shadow/envoy/config/core/v4alpha/base.proto +++ b/generated_api_shadow/envoy/config/core/v4alpha/base.proto @@ -336,10 +336,10 @@ message DataSource { string filename = 1 [(validate.rules).string = {min_len: 1}]; // Bytes inlined in the configuration. - bytes inline_bytes = 2 [(validate.rules).bytes = {min_len: 1}]; + bytes inline_bytes = 2; // String inlined in the configuration. - string inline_string = 3 [(validate.rules).string = {min_len: 1}]; + string inline_string = 3; } } diff --git a/source/common/config/datasource.cc b/source/common/config/datasource.cc index 776061a61be2..3f60da031a28 100644 --- a/source/common/config/datasource.cc +++ b/source/common/config/datasource.cc @@ -15,20 +15,27 @@ static constexpr uint32_t RetryCount = 1; std::string read(const envoy::config::core::v3::DataSource& source, bool allow_empty, Api::Api& api) { + std::string data; switch (source.specifier_case()) { case envoy::config::core::v3::DataSource::SpecifierCase::kFilename: - return api.fileSystem().fileReadToEnd(source.filename()); + data = api.fileSystem().fileReadToEnd(source.filename()); + break; case envoy::config::core::v3::DataSource::SpecifierCase::kInlineBytes: - return source.inline_bytes(); + data = source.inline_bytes(); + break; case envoy::config::core::v3::DataSource::SpecifierCase::kInlineString: - return source.inline_string(); + data = source.inline_string(); + break; default: if (!allow_empty) { throw EnvoyException( fmt::format("Unexpected DataSource::specifier_case(): {}", source.specifier_case())); } - return ""; } + if (!allow_empty && data.empty()) { + throw EnvoyException("DataSource cannot be empty"); + } + return data; } absl::optional getPath(const envoy::config::core::v3::DataSource& source) { diff --git a/test/common/config/datasource_test.cc b/test/common/config/datasource_test.cc index 70860a7fefd2..8e2900b38c4a 100644 --- a/test/common/config/datasource_test.cc +++ b/test/common/config/datasource_test.cc @@ -105,6 +105,36 @@ TEST_F(AsyncDataSourceTest, LoadLocalDataSource) { EXPECT_EQ(async_data, "xxxxxx"); } +TEST_F(AsyncDataSourceTest, LoadLocalEmptyDataSource) { + AsyncDataSourcePb config; + + std::string yaml = R"EOF( + local: + inline_string: "" + )EOF"; + TestUtility::loadFromYamlAndValidate(yaml, config); + EXPECT_TRUE(config.has_local()); + + std::string async_data; + + EXPECT_CALL(init_manager_, add(_)).WillOnce(Invoke([this](const Init::Target& target) { + init_target_handle_ = target.createHandle("test"); + })); + + local_data_provider_ = std::make_unique( + init_manager_, config.local(), true, *api_, [&](const std::string& data) { + EXPECT_EQ(init_manager_.state(), Init::Manager::State::Initializing); + EXPECT_EQ(data, ""); + async_data = data; + }); + + EXPECT_CALL(init_manager_, state()).WillOnce(Return(Init::Manager::State::Initializing)); + EXPECT_CALL(init_watcher_, ready()); + + init_target_handle_->initialize(init_watcher_); + EXPECT_EQ(async_data, ""); +} + TEST_F(AsyncDataSourceTest, LoadRemoteDataSourceNoCluster) { AsyncDataSourcePb config; diff --git a/test/integration/local_reply_integration_test.cc b/test/integration/local_reply_integration_test.cc index 74da5ae0c25a..8bf22764eb10 100644 --- a/test/integration/local_reply_integration_test.cc +++ b/test/integration/local_reply_integration_test.cc @@ -411,4 +411,59 @@ TEST_P(LocalReplyIntegrationTest, ShouldFormatResponseToCustomString) { EXPECT_EQ(response->body(), "513 - customized body text"); } +// Should return formatted text/plain response. +TEST_P(LocalReplyIntegrationTest, ShouldFormatResponseToEmptyBody) { + const std::string yaml = R"EOF( +mappers: +- filter: + status_code_filter: + comparison: + op: EQ + value: + default_value: 503 + runtime_key: key_b + status_code: 513 + body: + inline_string: "" +body_format: + text_format_source: + inline_string: "" +)EOF"; + setLocalReplyConfig(yaml); + initialize(); + + codec_client_ = makeHttpConnection(lookupPort("http")); + + auto encoder_decoder = codec_client_->startRequest( + Http::TestRequestHeaderMapImpl{{":method", "POST"}, + {":path", "/test/long/url"}, + {":scheme", "http"}, + {":authority", "host"}, + {"test-header", "exact-match-value-2"}}); + auto response = std::move(encoder_decoder.second); + + ASSERT_TRUE(fake_upstreams_[0]->waitForHttpConnection(*dispatcher_, fake_upstream_connection_)); + + ASSERT_TRUE(fake_upstream_connection_->waitForNewStream(*dispatcher_, upstream_request_)); + ASSERT_TRUE(upstream_request_->waitForHeadersComplete()); + ASSERT_TRUE(fake_upstream_connection_->close()); + ASSERT_TRUE(fake_upstream_connection_->waitForDisconnect()); + response->waitForEndStream(); + + if (downstream_protocol_ == Http::CodecClient::Type::HTTP1) { + ASSERT_TRUE(codec_client_->waitForDisconnect()); + } else { + codec_client_->close(); + } + + EXPECT_FALSE(upstream_request_->complete()); + EXPECT_EQ(0U, upstream_request_->bodyLength()); + + EXPECT_TRUE(response->complete()); + + EXPECT_EQ("513", response->headers().Status()->value().getStringView()); + + EXPECT_EQ(response->body(), ""); +} + } // namespace Envoy From 867b9e23d2e48350bd1b0d1fbc392a8355f20e35 Mon Sep 17 00:00:00 2001 From: Tong Cai Date: Sun, 20 Dec 2020 07:14:49 +0800 Subject: [PATCH 2/2] server: wait workers to start before draining parent. (#14319) Signed-off-by: Tong Cai --- include/envoy/server/listener_manager.h | 3 +- source/server/listener_hooks.h | 6 ++ source/server/listener_manager_impl.cc | 15 +++-- source/server/listener_manager_impl.h | 2 +- source/server/server.cc | 27 +++++--- source/server/server.h | 1 + test/integration/server.h | 1 + test/mocks/server/listener_manager.h | 2 +- test/server/listener_manager_impl_test.cc | 68 +++++++++++--------- test/server/listener_manager_impl_test.h | 1 + test/server/server_test.cc | 75 ++++++++++++++++++++++- 11 files changed, 151 insertions(+), 50 deletions(-) diff --git a/include/envoy/server/listener_manager.h b/include/envoy/server/listener_manager.h index d76d73cf315e..01fe285b1c86 100644 --- a/include/envoy/server/listener_manager.h +++ b/include/envoy/server/listener_manager.h @@ -202,8 +202,9 @@ class ListenerManager { /** * Start all workers accepting new connections on all added listeners. * @param guard_dog supplies the guard dog to use for thread watching. + * @param callback supplies the callback to complete server initialization. */ - virtual void startWorkers(GuardDog& guard_dog) PURE; + virtual void startWorkers(GuardDog& guard_dog, std::function callback) PURE; /** * Stop all listeners from accepting new connections without actually removing any of them. This diff --git a/source/server/listener_hooks.h b/source/server/listener_hooks.h index 1b3de394ab13..1d88ab4760af 100644 --- a/source/server/listener_hooks.h +++ b/source/server/listener_hooks.h @@ -22,6 +22,11 @@ class ListenerHooks { */ virtual void onWorkerListenerRemoved() PURE; + /** + * Called when all workers have started. + */ + virtual void onWorkersStarted() PURE; + /** * Called when the Runtime::ScopedLoaderSingleton is created by the server. */ @@ -36,6 +41,7 @@ class DefaultListenerHooks : public ListenerHooks { // ListenerHooks void onWorkerListenerAdded() override {} void onWorkerListenerRemoved() override {} + void onWorkersStarted() override {} void onRuntimeCreated() override {} }; diff --git a/source/server/listener_manager_impl.cc b/source/server/listener_manager_impl.cc index fb17e6810ed2..eb588456a9b9 100644 --- a/source/server/listener_manager_impl.cc +++ b/source/server/listener_manager_impl.cc @@ -884,7 +884,7 @@ bool ListenerManagerImpl::removeListenerInternal(const std::string& name, return true; } -void ListenerManagerImpl::startWorkers(GuardDog& guard_dog) { +void ListenerManagerImpl::startWorkers(GuardDog& guard_dog, std::function callback) { ENVOY_LOG(info, "all dependencies initialized. starting workers"); ASSERT(!workers_started_); workers_started_ = true; @@ -899,11 +899,13 @@ void ListenerManagerImpl::startWorkers(GuardDog& guard_dog) { ENVOY_LOG(debug, "starting worker {}", i); ASSERT(warming_listeners_.empty()); for (const auto& listener : active_listeners_) { - addListenerToWorker(*worker, absl::nullopt, *listener, [this, listeners_pending_init]() { - if (--(*listeners_pending_init) == 0) { - stats_.workers_started_.set(1); - } - }); + addListenerToWorker(*worker, absl::nullopt, *listener, + [this, listeners_pending_init, callback]() { + if (--(*listeners_pending_init) == 0) { + stats_.workers_started_.set(1); + callback(); + } + }); } worker->start(guard_dog); if (enable_dispatcher_stats_) { @@ -913,6 +915,7 @@ void ListenerManagerImpl::startWorkers(GuardDog& guard_dog) { } if (active_listeners_.empty()) { stats_.workers_started_.set(1); + callback(); } } diff --git a/source/server/listener_manager_impl.h b/source/server/listener_manager_impl.h index c29a0f8478ea..63bef43f0993 100644 --- a/source/server/listener_manager_impl.h +++ b/source/server/listener_manager_impl.h @@ -193,7 +193,7 @@ class ListenerManagerImpl : public ListenerManager, Logger::Loggable callback) override; void stopListeners(StopListenersType stop_listeners_type) override; void stopWorkers() override; void beginListenerUpdate() override { error_state_tracker_.clear(); } diff --git a/source/server/server.cc b/source/server/server.cc index ef9e211c2e8f..fc41edc2ed08 100644 --- a/source/server/server.cc +++ b/source/server/server.cc @@ -87,7 +87,7 @@ InstanceImpl::InstanceImpl( : nullptr), grpc_context_(store.symbolTable()), http_context_(store.symbolTable()), router_context_(store.symbolTable()), process_context_(std::move(process_context)), - main_thread_id_(std::this_thread::get_id()), server_contexts_(*this) { + main_thread_id_(std::this_thread::get_id()), hooks_(hooks), server_contexts_(*this) { try { if (!options.logPath().empty()) { try { @@ -609,15 +609,22 @@ void InstanceImpl::onRuntimeReady() { } void InstanceImpl::startWorkers() { - listener_manager_->startWorkers(*worker_guard_dog_); - initialization_timer_->complete(); - // Update server stats as soon as initialization is done. - updateServerStats(); - workers_started_ = true; - // At this point we are ready to take traffic and all listening ports are up. Notify our parent - // if applicable that they can stop listening and drain. - restarter_.drainParentListeners(); - drain_manager_->startParentShutdownSequence(); + // The callback will be called after workers are started. + listener_manager_->startWorkers(*worker_guard_dog_, [this]() { + if (isShutdown()) { + return; + } + + initialization_timer_->complete(); + // Update server stats as soon as initialization is done. + updateServerStats(); + workers_started_ = true; + hooks_.onWorkersStarted(); + // At this point we are ready to take traffic and all listening ports are up. Notify our + // parent if applicable that they can stop listening and drain. + restarter_.drainParentListeners(); + drain_manager_->startParentShutdownSequence(); + }); } Runtime::LoaderPtr InstanceUtil::createRuntime(Instance& server, diff --git a/source/server/server.h b/source/server/server.h index 47d6c6b0a18a..1b9147fc6eb7 100644 --- a/source/server/server.h +++ b/source/server/server.h @@ -365,6 +365,7 @@ class InstanceImpl final : Logger::Loggable, // initialization_time is a histogram for tracking the initialization time across hot restarts // whenever we have support for histogram merge across hot restarts. Stats::TimespanPtr initialization_timer_; + ListenerHooks& hooks_; ServerFactoryContextImpl server_contexts_; diff --git a/test/integration/server.h b/test/integration/server.h index 9d70d735bbb4..87544c711815 100644 --- a/test/integration/server.h +++ b/test/integration/server.h @@ -415,6 +415,7 @@ class IntegrationTestServer : public Logger::Loggable, on_server_ready_cb_ = std::move(on_server_ready); } void onRuntimeCreated() override {} + void onWorkersStarted() override {} void start(const Network::Address::IpVersion version, std::function on_server_init_function, bool deterministic, diff --git a/test/mocks/server/listener_manager.h b/test/mocks/server/listener_manager.h index 582be9ac9bac..c7c855508f6d 100644 --- a/test/mocks/server/listener_manager.h +++ b/test/mocks/server/listener_manager.h @@ -21,7 +21,7 @@ class MockListenerManager : public ListenerManager { (ListenerState state)); MOCK_METHOD(uint64_t, numConnections, (), (const)); MOCK_METHOD(bool, removeListener, (const std::string& listener_name)); - MOCK_METHOD(void, startWorkers, (GuardDog & guard_dog)); + MOCK_METHOD(void, startWorkers, (GuardDog & guard_dog, std::function callback)); MOCK_METHOD(void, stopListeners, (StopListenersType listeners_type)); MOCK_METHOD(void, stopWorkers, ()); MOCK_METHOD(void, beginListenerUpdate, ()); diff --git a/test/server/listener_manager_impl_test.cc b/test/server/listener_manager_impl_test.cc index 178ed3dfcf1e..86121c09ccee 100644 --- a/test/server/listener_manager_impl_test.cc +++ b/test/server/listener_manager_impl_test.cc @@ -335,7 +335,7 @@ TEST_F(ListenerManagerImplWithRealFiltersTest, TransportSocketConnectTimeout) { TEST_F(ListenerManagerImplWithRealFiltersTest, UdpAddress) { EXPECT_CALL(*worker_, start(_)); EXPECT_FALSE(manager_->isWorkerStarted()); - manager_->startWorkers(guard_dog_); + manager_->startWorkers(guard_dog_, callback_.AsStdFunction()); // Validate that there are no active listeners and workers are started. EXPECT_EQ(0, server_.stats_store_ .gauge("listener_manager.total_active_listeners", @@ -873,7 +873,7 @@ version_info: version1 )EOF"); EXPECT_CALL(*worker_, start(_)); - manager_->startWorkers(guard_dog_); + manager_->startWorkers(guard_dog_, callback_.AsStdFunction()); // Now add new version listener foo after workers start, note it's fine that server_init_mgr is // initialized, as no target will be added to it. @@ -960,7 +960,7 @@ filter_chains: {} .RetiresOnSaturation(); EXPECT_CALL(*worker_, start(_)); - manager_->startWorkers(guard_dog_); + manager_->startWorkers(guard_dog_, callback_.AsStdFunction()); EXPECT_EQ(0, server_.stats_store_.counter("listener_manager.listener_create_success").value()); checkStats(__LINE__, 1, 0, 0, 0, 1, 0, 0); @@ -1102,7 +1102,7 @@ version_info: version2 // Start workers. EXPECT_CALL(*worker_, addListener(_, _, _)); EXPECT_CALL(*worker_, start(_)); - manager_->startWorkers(guard_dog_); + manager_->startWorkers(guard_dog_, callback_.AsStdFunction()); // Validate that workers_started stat is still zero before workers set the status via // completion callback. EXPECT_EQ(0, server_.stats_store_ @@ -1307,7 +1307,7 @@ TEST_F(ListenerManagerImplTest, UpdateActiveToWarmAndBack) { InSequence s; EXPECT_CALL(*worker_, start(_)); - manager_->startWorkers(guard_dog_); + manager_->startWorkers(guard_dog_, callback_.AsStdFunction()); // Add and initialize foo listener. const std::string listener_foo_yaml = R"EOF( @@ -1368,7 +1368,7 @@ TEST_F(ListenerManagerImplTest, AddReusableDrainingListener) { InSequence s; EXPECT_CALL(*worker_, start(_)); - manager_->startWorkers(guard_dog_); + manager_->startWorkers(guard_dog_, callback_.AsStdFunction()); // Add foo listener directly into active. const std::string listener_foo_yaml = R"EOF( @@ -1428,7 +1428,7 @@ TEST_F(ListenerManagerImplTest, AddClosedDrainingListener) { InSequence s; EXPECT_CALL(*worker_, start(_)); - manager_->startWorkers(guard_dog_); + manager_->startWorkers(guard_dog_, callback_.AsStdFunction()); // Add foo listener directly into active. const std::string listener_foo_yaml = R"EOF( @@ -1481,7 +1481,7 @@ TEST_F(ListenerManagerImplTest, BindToPortEqualToFalse) { InSequence s; ProdListenerComponentFactory real_listener_factory(server_); EXPECT_CALL(*worker_, start(_)); - manager_->startWorkers(guard_dog_); + manager_->startWorkers(guard_dog_, callback_.AsStdFunction()); const std::string listener_foo_yaml = R"EOF( name: foo address: @@ -1519,7 +1519,7 @@ TEST_F(ListenerManagerImplTest, ReusePortEqualToTrue) { InSequence s; ProdListenerComponentFactory real_listener_factory(server_); EXPECT_CALL(*worker_, start(_)); - manager_->startWorkers(guard_dog_); + manager_->startWorkers(guard_dog_, callback_.AsStdFunction()); const std::string listener_foo_yaml = R"EOF( name: foo address: @@ -1574,7 +1574,7 @@ TEST_F(ListenerManagerImplTest, CantBindSocket) { InSequence s; EXPECT_CALL(*worker_, start(_)); - manager_->startWorkers(guard_dog_); + manager_->startWorkers(guard_dog_, callback_.AsStdFunction()); const std::string listener_foo_yaml = R"EOF( name: foo @@ -1627,7 +1627,7 @@ TEST_F(ListenerManagerImplTest, ConfigDumpWithExternalError) { InSequence s; EXPECT_CALL(*worker_, start(_)); - manager_->startWorkers(guard_dog_); + manager_->startWorkers(guard_dog_, callback_.AsStdFunction()); // Make sure the config dump is empty by default. ListenerManager::FailureStates empty_failure_state; @@ -1663,7 +1663,7 @@ TEST_F(ListenerManagerImplTest, ListenerDraining) { InSequence s; EXPECT_CALL(*worker_, start(_)); - manager_->startWorkers(guard_dog_); + manager_->startWorkers(guard_dog_, callback_.AsStdFunction()); const std::string listener_foo_yaml = R"EOF( name: foo @@ -1713,7 +1713,7 @@ TEST_F(ListenerManagerImplTest, RemoveListener) { InSequence s; EXPECT_CALL(*worker_, start(_)); - manager_->startWorkers(guard_dog_); + manager_->startWorkers(guard_dog_, callback_.AsStdFunction()); // Remove an unknown listener. EXPECT_FALSE(manager_->removeListener("unknown")); @@ -1795,7 +1795,7 @@ TEST_F(ListenerManagerImplTest, StopListeners) { InSequence s; EXPECT_CALL(*worker_, start(_)); - manager_->startWorkers(guard_dog_); + manager_->startWorkers(guard_dog_, callback_.AsStdFunction()); // Add foo listener in inbound direction. const std::string listener_foo_yaml = R"EOF( @@ -1900,7 +1900,7 @@ TEST_F(ListenerManagerImplTest, StopAllListeners) { InSequence s; EXPECT_CALL(*worker_, start(_)); - manager_->startWorkers(guard_dog_); + manager_->startWorkers(guard_dog_, callback_.AsStdFunction()); // Add foo listener into warming. const std::string listener_foo_yaml = R"EOF( @@ -1948,7 +1948,7 @@ TEST_F(ListenerManagerImplTest, StopWarmingListener) { InSequence s; EXPECT_CALL(*worker_, start(_)); - manager_->startWorkers(guard_dog_); + manager_->startWorkers(guard_dog_, callback_.AsStdFunction()); // Add foo listener into warming. const std::string listener_foo_yaml = R"EOF( @@ -2005,7 +2005,7 @@ TEST_F(ListenerManagerImplTest, AddListenerFailure) { InSequence s; EXPECT_CALL(*worker_, start(_)); - manager_->startWorkers(guard_dog_); + manager_->startWorkers(guard_dog_, callback_.AsStdFunction()); // Add foo listener into active. const std::string listener_foo_yaml = R"EOF( @@ -2042,7 +2042,7 @@ TEST_F(ListenerManagerImplTest, StaticListenerAddFailure) { InSequence s; EXPECT_CALL(*worker_, start(_)); - manager_->startWorkers(guard_dog_); + manager_->startWorkers(guard_dog_, callback_.AsStdFunction()); // Add foo listener into active. const std::string listener_foo_yaml = R"EOF( @@ -2096,7 +2096,7 @@ TEST_F(ListenerManagerImplTest, DuplicateAddressDontBind) { InSequence s; EXPECT_CALL(*worker_, start(_)); - manager_->startWorkers(guard_dog_); + manager_->startWorkers(guard_dog_, callback_.AsStdFunction()); // Add foo listener into warming. const std::string listener_foo_yaml = R"EOF( @@ -4223,7 +4223,7 @@ TEST_F(ListenerManagerImplWithRealFiltersTest, VerifyIgnoreExpirationWithCA) { TEST_F(ListenerManagerImplWithDispatcherStatsTest, DispatherStatsWithCorrectPrefix) { EXPECT_CALL(*worker_, start(_)); EXPECT_CALL(*worker_, initializeStats(_)); - manager_->startWorkers(guard_dog_); + manager_->startWorkers(guard_dog_, callback_.AsStdFunction()); } TEST_F(ListenerManagerImplWithRealFiltersTest, ApiListener) { @@ -4352,7 +4352,7 @@ TEST_F(ListenerManagerImplTest, StopInplaceWarmingListener) { InSequence s; EXPECT_CALL(*worker_, start(_)); - manager_->startWorkers(guard_dog_); + manager_->startWorkers(guard_dog_, callback_.AsStdFunction()); // Add foo listener into warming. const std::string listener_foo_yaml = R"EOF( @@ -4414,7 +4414,7 @@ TEST_F(ListenerManagerImplTest, RemoveInplaceUpdatingListener) { InSequence s; EXPECT_CALL(*worker_, start(_)); - manager_->startWorkers(guard_dog_); + manager_->startWorkers(guard_dog_, callback_.AsStdFunction()); // Add foo listener into warming. const std::string listener_foo_yaml = R"EOF( @@ -4483,7 +4483,7 @@ TEST_F(ListenerManagerImplTest, UpdateInplaceWarmingListener) { InSequence s; EXPECT_CALL(*worker_, start(_)); - manager_->startWorkers(guard_dog_); + manager_->startWorkers(guard_dog_, callback_.AsStdFunction()); // Add foo listener into warming. const std::string listener_foo_yaml = R"EOF( @@ -4546,7 +4546,7 @@ TEST_F(ListenerManagerImplTest, DrainageDuringInplaceUpdate) { InSequence s; EXPECT_CALL(*worker_, start(_)); - manager_->startWorkers(guard_dog_); + manager_->startWorkers(guard_dog_, callback_.AsStdFunction()); // Add foo listener into warming. const std::string listener_foo_yaml = R"EOF( @@ -4696,7 +4696,7 @@ TEST_F(ListenerManagerImplForInPlaceFilterChainUpdateTest, TraditionalUpdateIfWo TEST_F(ListenerManagerImplForInPlaceFilterChainUpdateTest, TraditionalUpdateIfAnyListenerIsNotTcp) { EXPECT_CALL(*worker_, start(_)); - manager_->startWorkers(guard_dog_); + manager_->startWorkers(guard_dog_, callback_.AsStdFunction()); auto listener_proto = createDefaultListener(); @@ -4722,7 +4722,7 @@ TEST_F(ListenerManagerImplForInPlaceFilterChainUpdateTest, auto tls_inspector_injection_enabled_guard = enableTlsInspectorInjectionForThisTest(); EXPECT_CALL(*worker_, start(_)); - manager_->startWorkers(guard_dog_); + manager_->startWorkers(guard_dog_, callback_.AsStdFunction()); auto listener_proto = createDefaultListener(); @@ -4748,7 +4748,7 @@ TEST_F(ListenerManagerImplForInPlaceFilterChainUpdateTest, DEPRECATED_FEATURE_TEST(TraditionalUpdateIfImplicitProxyProtocolChanges)) { EXPECT_CALL(*worker_, start(_)); - manager_->startWorkers(guard_dog_); + manager_->startWorkers(guard_dog_, callback_.AsStdFunction()); auto listener_proto = createDefaultListener(); @@ -4768,7 +4768,7 @@ TEST_F(ListenerManagerImplForInPlaceFilterChainUpdateTest, TEST_F(ListenerManagerImplForInPlaceFilterChainUpdateTest, TraditionalUpdateOnZeroFilterChain) { EXPECT_CALL(*worker_, start(_)); - manager_->startWorkers(guard_dog_); + manager_->startWorkers(guard_dog_, callback_.AsStdFunction()); auto listener_proto = createDefaultListener(); @@ -4792,7 +4792,7 @@ TEST_F(ListenerManagerImplForInPlaceFilterChainUpdateTest, TraditionalUpdateOnZe TEST_F(ListenerManagerImplForInPlaceFilterChainUpdateTest, TraditionalUpdateIfListenerConfigHasUpdateOtherThanFilterChain) { EXPECT_CALL(*worker_, start(_)); - manager_->startWorkers(guard_dog_); + manager_->startWorkers(guard_dog_, callback_.AsStdFunction()); auto listener_proto = createDefaultListener(); @@ -4816,7 +4816,7 @@ TEST_F(ListenerManagerImplForInPlaceFilterChainUpdateTest, TEST_F(ListenerManagerImplTest, RuntimeDisabledInPlaceUpdateFallbacksToTraditionalUpdate) { InSequence s; EXPECT_CALL(*worker_, start(_)); - manager_->startWorkers(guard_dog_); + manager_->startWorkers(guard_dog_, callback_.AsStdFunction()); // Add foo listener. const std::string listener_foo_yaml = R"EOF( @@ -4951,6 +4951,14 @@ TEST_F(ListenerManagerImplTest, TcpBacklogCustomConfig) { EXPECT_EQ(100U, manager_->listeners().back().get().tcpBacklogSize()); } +TEST_F(ListenerManagerImplTest, WorkersStartedCallbackCalled) { + InSequence s; + + EXPECT_CALL(*worker_, start(_)); + EXPECT_CALL(callback_, Call()); + manager_->startWorkers(guard_dog_, callback_.AsStdFunction()); +} + } // namespace } // namespace Server } // namespace Envoy diff --git a/test/server/listener_manager_impl_test.h b/test/server/listener_manager_impl_test.h index 01104f1729e4..68df5988d9b1 100644 --- a/test/server/listener_manager_impl_test.h +++ b/test/server/listener_manager_impl_test.h @@ -304,6 +304,7 @@ class ListenerManagerImplTest : public testing::Test { std::unique_ptr socket_; uint64_t listener_tag_{1}; bool enable_dispatcher_stats_{false}; + NiceMock> callback_; }; } // namespace Server diff --git a/test/server/server_test.cc b/test/server/server_test.cc index 162126c25b3c..c09fb90ef732 100644 --- a/test/server/server_test.cc +++ b/test/server/server_test.cc @@ -181,6 +181,11 @@ class ServerInstanceImplTestBase { void initialize(const std::string& bootstrap_path) { initialize(bootstrap_path, false); } void initialize(const std::string& bootstrap_path, const bool use_intializing_instance) { + initialize(bootstrap_path, use_intializing_instance, hooks_); + } + + void initialize(const std::string& bootstrap_path, const bool use_intializing_instance, + ListenerHooks& hooks) { if (options_.config_path_.empty()) { options_.config_path_ = TestEnvironment::temporaryFileSubstitute( bootstrap_path, {{"upstream_0", 0}, {"upstream_1", 0}}, version_); @@ -194,7 +199,7 @@ class ServerInstanceImplTestBase { server_ = std::make_unique( *init_manager_, options_, time_system_, - std::make_shared("127.0.0.1"), hooks_, restart_, + std::make_shared("127.0.0.1"), hooks, restart_, stats_store_, fakelock_, component_factory_, std::make_unique>(), *thread_local_, Thread::threadFactoryForTest(), Filesystem::fileSystemForTest(), @@ -313,6 +318,18 @@ class CustomStatsSinkFactory : public Server::Configuration::StatsSinkFactory { std::string name() const override { return "envoy.custom_stats_sink"; } }; +// CustomListenerHooks is used for synchronization between test thread and server thread. +class CustomListenerHooks : public DefaultListenerHooks { +public: + CustomListenerHooks(std::function workers_started_cb) + : on_workers_started_cb_(workers_started_cb) {} + + void onWorkersStarted() override { on_workers_started_cb_(); } + +private: + std::function on_workers_started_cb_; +}; + INSTANTIATE_TEST_SUITE_P(IpVersions, ServerInstanceImplTest, testing::ValuesIn(TestEnvironment::getIpVersionsForTest()), TestUtility::ipTestParamsToString); @@ -423,6 +440,37 @@ TEST_P(ServerInstanceImplTest, LifecycleNotifications) { server_thread->join(); } +TEST_P(ServerInstanceImplTest, DrainParentListenerAfterWorkersStarted) { + bool workers_started = false; + absl::Notification workers_started_fired, workers_started_block; + // Expect drainParentListeners not to be called before workers start. + EXPECT_CALL(restart_, drainParentListeners).Times(0); + + // Run the server in a separate thread so we can test different lifecycle stages. + auto server_thread = Thread::threadFactoryForTest().createThread([&] { + auto hooks = CustomListenerHooks([&]() { + workers_started = true; + workers_started_fired.Notify(); + workers_started_block.WaitForNotification(); + }); + initialize("test/server/test_data/server/node_bootstrap.yaml", false, hooks); + server_->run(); + server_ = nullptr; + thread_local_ = nullptr; + }); + + workers_started_fired.WaitForNotification(); + EXPECT_TRUE(workers_started); + EXPECT_TRUE(TestUtility::findGauge(stats_store_, "server.state")->used()); + EXPECT_EQ(0L, TestUtility::findGauge(stats_store_, "server.state")->value()); + + EXPECT_CALL(restart_, drainParentListeners); + workers_started_block.Notify(); + + server_->dispatcher().post([&] { server_->shutdown(); }); + server_thread->join(); +} + // A test target which never signals that it is ready. class NeverReadyTarget : public Init::TargetImpl { public: @@ -463,6 +511,31 @@ TEST_P(ServerInstanceImplTest, NoLifecycleNotificationOnEarlyShutdown) { server_thread->join(); } +TEST_P(ServerInstanceImplTest, ShutdownBeforeWorkersStarted) { + // Test that drainParentListeners() should never be called because we will shutdown + // early before the server starts worker threads. + EXPECT_CALL(restart_, drainParentListeners).Times(0); + + auto server_thread = Thread::threadFactoryForTest().createThread([&] { + initialize("test/server/test_data/server/node_bootstrap.yaml"); + + auto post_init_handle = server_->registerCallback(ServerLifecycleNotifier::Stage::PostInit, + [&] { server_->shutdown(); }); + + // This shutdown notification should never be called because we will shutdown early. + auto shutdown_handle = server_->registerCallback(ServerLifecycleNotifier::Stage::ShutdownExit, + [&](Event::PostCb) { FAIL(); }); + server_->run(); + + post_init_handle = nullptr; + shutdown_handle = nullptr; + server_ = nullptr; + thread_local_ = nullptr; + }); + + server_thread->join(); +} + TEST_P(ServerInstanceImplTest, V2ConfigOnly) { options_.service_cluster_name_ = "some_cluster_name"; options_.service_node_name_ = "some_node_name";