diff --git a/test/common/grpc/grpc_client_integration.h b/test/common/grpc/grpc_client_integration.h index 13683282d37e..80321fc83592 100644 --- a/test/common/grpc/grpc_client_integration.h +++ b/test/common/grpc/grpc_client_integration.h @@ -69,9 +69,7 @@ class VersionedGrpcClientIntegrationParamTest return fmt::format("{}_{}_{}", std::get<0>(p.param) == Network::Address::IpVersion::v4 ? "IPv4" : "IPv6", std::get<1>(p.param) == ClientType::GoogleGrpc ? "GoogleGrpc" : "EnvoyGrpc", - std::get<2>(p.param) == envoy::config::core::v3::ApiVersion::V3 - ? "V3" - : envoy::config::core::v3::ApiVersion::V2 ? "V2" : "AUTO"); + ApiVersion_Name(std::get<2>(p.param))); } Network::Address::IpVersion ipVersion() const override { return std::get<0>(GetParam()); } ClientType clientType() const override { return std::get<1>(GetParam()); } diff --git a/test/extensions/access_loggers/grpc/http_grpc_access_log_integration_test.cc b/test/extensions/access_loggers/grpc/http_grpc_access_log_integration_test.cc index 590a3d280e51..f2b5c2559933 100644 --- a/test/extensions/access_loggers/grpc/http_grpc_access_log_integration_test.cc +++ b/test/extensions/access_loggers/grpc/http_grpc_access_log_integration_test.cc @@ -126,7 +126,8 @@ class AccessLogIntegrationTest : public Grpc::VersionedGrpcClientIntegrationPara }; INSTANTIATE_TEST_SUITE_P(IpVersionsCientType, AccessLogIntegrationTest, - VERSIONED_GRPC_CLIENT_INTEGRATION_PARAMS); + VERSIONED_GRPC_CLIENT_INTEGRATION_PARAMS, + Grpc::VersionedGrpcClientIntegrationParamTest::protocolTestParamsToString); // Test a basic full access logging flow. TEST_P(AccessLogIntegrationTest, BasicAccessLogFlow) { diff --git a/test/extensions/access_loggers/grpc/tcp_grpc_access_log_integration_test.cc b/test/extensions/access_loggers/grpc/tcp_grpc_access_log_integration_test.cc index e8516e0f6d3a..51fc54f02d78 100644 --- a/test/extensions/access_loggers/grpc/tcp_grpc_access_log_integration_test.cc +++ b/test/extensions/access_loggers/grpc/tcp_grpc_access_log_integration_test.cc @@ -126,7 +126,8 @@ class TcpGrpcAccessLogIntegrationTest : public Grpc::VersionedGrpcClientIntegrat }; INSTANTIATE_TEST_SUITE_P(IpVersionsCientType, TcpGrpcAccessLogIntegrationTest, - VERSIONED_GRPC_CLIENT_INTEGRATION_PARAMS); + VERSIONED_GRPC_CLIENT_INTEGRATION_PARAMS, + Grpc::VersionedGrpcClientIntegrationParamTest::protocolTestParamsToString); // Test a basic full access logging flow. TEST_P(TcpGrpcAccessLogIntegrationTest, BasicAccessLogFlow) { diff --git a/test/extensions/filters/http/ext_authz/ext_authz_integration_test.cc b/test/extensions/filters/http/ext_authz/ext_authz_integration_test.cc index c3ec137d8360..13e8c40b378b 100644 --- a/test/extensions/filters/http/ext_authz/ext_authz_integration_test.cc +++ b/test/extensions/filters/http/ext_authz/ext_authz_integration_test.cc @@ -560,7 +560,8 @@ class ExtAuthzHttpIntegrationTest : public HttpIntegrationTest, }; INSTANTIATE_TEST_SUITE_P(IpVersionsCientType, ExtAuthzGrpcIntegrationTest, - VERSIONED_GRPC_CLIENT_INTEGRATION_PARAMS); + VERSIONED_GRPC_CLIENT_INTEGRATION_PARAMS, + Grpc::VersionedGrpcClientIntegrationParamTest::protocolTestParamsToString); // Verifies that the request body is included in the CheckRequest when the downstream protocol is // HTTP/1.1. diff --git a/test/extensions/filters/http/ratelimit/ratelimit_integration_test.cc b/test/extensions/filters/http/ratelimit/ratelimit_integration_test.cc index b02137388102..1a8e83aa5740 100644 --- a/test/extensions/filters/http/ratelimit/ratelimit_integration_test.cc +++ b/test/extensions/filters/http/ratelimit/ratelimit_integration_test.cc @@ -231,14 +231,18 @@ class RatelimitFilterEnvoyRatelimitedHeaderDisabledIntegrationTest }; INSTANTIATE_TEST_SUITE_P(IpVersionsClientType, RatelimitIntegrationTest, - VERSIONED_GRPC_CLIENT_INTEGRATION_PARAMS); + VERSIONED_GRPC_CLIENT_INTEGRATION_PARAMS, + Grpc::VersionedGrpcClientIntegrationParamTest::protocolTestParamsToString); INSTANTIATE_TEST_SUITE_P(IpVersionsClientType, RatelimitFailureModeIntegrationTest, - VERSIONED_GRPC_CLIENT_INTEGRATION_PARAMS); + VERSIONED_GRPC_CLIENT_INTEGRATION_PARAMS, + Grpc::VersionedGrpcClientIntegrationParamTest::protocolTestParamsToString); INSTANTIATE_TEST_SUITE_P(IpVersionsClientType, RatelimitFilterHeadersEnabledIntegrationTest, - VERSIONED_GRPC_CLIENT_INTEGRATION_PARAMS); + VERSIONED_GRPC_CLIENT_INTEGRATION_PARAMS, + Grpc::VersionedGrpcClientIntegrationParamTest::protocolTestParamsToString); INSTANTIATE_TEST_SUITE_P(IpVersionsClientType, RatelimitFilterEnvoyRatelimitedHeaderDisabledIntegrationTest, - VERSIONED_GRPC_CLIENT_INTEGRATION_PARAMS); + VERSIONED_GRPC_CLIENT_INTEGRATION_PARAMS, + Grpc::VersionedGrpcClientIntegrationParamTest::protocolTestParamsToString); TEST_P(RatelimitIntegrationTest, Ok) { XDS_DEPRECATED_FEATURE_TEST_SKIP; diff --git a/test/extensions/stats_sinks/metrics_service/metrics_service_integration_test.cc b/test/extensions/stats_sinks/metrics_service/metrics_service_integration_test.cc index 0bf4f9cada60..e325e04d5037 100644 --- a/test/extensions/stats_sinks/metrics_service/metrics_service_integration_test.cc +++ b/test/extensions/stats_sinks/metrics_service/metrics_service_integration_test.cc @@ -150,7 +150,8 @@ class MetricsServiceIntegrationTest : public Grpc::VersionedGrpcClientIntegratio }; INSTANTIATE_TEST_SUITE_P(IpVersionsClientType, MetricsServiceIntegrationTest, - VERSIONED_GRPC_CLIENT_INTEGRATION_PARAMS); + VERSIONED_GRPC_CLIENT_INTEGRATION_PARAMS, + Grpc::VersionedGrpcClientIntegrationParamTest::protocolTestParamsToString); // Test a basic metric service flow. TEST_P(MetricsServiceIntegrationTest, BasicFlow) { diff --git a/test/extensions/transport_sockets/proxy_protocol/proxy_protocol_integration_test.cc b/test/extensions/transport_sockets/proxy_protocol/proxy_protocol_integration_test.cc index 5747ec73d125..7e0f40d136c0 100644 --- a/test/extensions/transport_sockets/proxy_protocol/proxy_protocol_integration_test.cc +++ b/test/extensions/transport_sockets/proxy_protocol/proxy_protocol_integration_test.cc @@ -16,6 +16,7 @@ class ProxyProtocolIntegrationTest : public testing::TestWithParaminitialize(); - RELEASE_ASSERT(result, result.message()); + http_connection->initialize(); http_connections_.push_back(std::move(http_connection)); return true; } diff --git a/test/integration/fake_upstream.cc b/test/integration/fake_upstream.cc index 0b18b9e8c9b6..90fb9cd50404 100644 --- a/test/integration/fake_upstream.cc +++ b/test/integration/fake_upstream.cc @@ -20,6 +20,7 @@ #include "test/test_common/utility.h" #include "absl/strings/str_cat.h" +#include "absl/synchronization/notification.h" using namespace std::chrono_literals; @@ -358,12 +359,6 @@ AssertionResult FakeConnectionBase::readDisable(bool disable, std::chrono::milli [disable](Network::Connection& connection) { connection.readDisable(disable); }, timeout); } -AssertionResult FakeConnectionBase::enableHalfClose(bool enable, - std::chrono::milliseconds timeout) { - return shared_connection_.executeOnDispatcher( - [enable](Network::Connection& connection) { connection.enableHalfClose(enable); }, timeout); -} - Http::RequestDecoder& FakeHttpConnection::newStream(Http::ResponseEncoder& encoder, bool) { absl::MutexLock lock(&lock_); new_streams_.emplace_back(new FakeStream(*this, encoder, time_system_)); @@ -513,6 +508,9 @@ bool FakeUpstream::createNetworkFilterChain(Network::Connection& connection, const std::vector&) { absl::MutexLock lock(&lock_); if (read_disable_on_new_connection_) { + // Disable early close detection to avoid closing the network connection before full + // initialization is complete. + connection.detectEarlyCloseWhenReadDisabled(false); connection.readDisable(true); } auto connection_wrapper = std::make_unique(connection); @@ -552,16 +550,16 @@ AssertionResult FakeUpstream::waitForHttpConnection( client_dispatcher, timeout)) { return AssertionFailure() << "Timed out waiting for new connection."; } + } + return runOnDispatcherThreadAndWait([&]() { + absl::MutexLock lock(&lock_); connection = std::make_unique( *this, consumeConnection(), http_type_, time_system_, max_request_headers_kb, max_request_headers_count, headers_with_underscores_action); - } - VERIFY_ASSERTION(connection->initialize()); - if (read_disable_on_new_connection_) { - VERIFY_ASSERTION(connection->readDisable(false)); - } - return AssertionSuccess(); + connection->initialize(); + return AssertionSuccess(); + }); } AssertionResult @@ -585,14 +583,17 @@ FakeUpstream::waitForHttpConnection(Event::Dispatcher& client_dispatcher, client_dispatcher, 5ms)) { continue; } + } + + return upstream.runOnDispatcherThreadAndWait([&]() { + absl::MutexLock lock(&upstream.lock_); connection = std::make_unique( upstream, upstream.consumeConnection(), upstream.http_type_, upstream.timeSystem(), Http::DEFAULT_MAX_REQUEST_HEADERS_KB, Http::DEFAULT_MAX_HEADERS_COUNT, envoy::config::core::v3::HttpProtocolOptions::ALLOW); - } - VERIFY_ASSERTION(connection->initialize()); - VERIFY_ASSERTION(connection->readDisable(false)); - return AssertionSuccess(); + connection->initialize(); + return AssertionSuccess(); + }); } } return AssertionFailure() << "Timed out waiting for HTTP connection."; @@ -610,19 +611,35 @@ AssertionResult FakeUpstream::waitForRawConnection(FakeRawConnectionPtr& connect if (!time_system_.waitFor(lock_, absl::Condition(&reached), timeout)) { return AssertionFailure() << "Timed out waiting for raw connection"; } - connection = std::make_unique(consumeConnection(), timeSystem()); } - VERIFY_ASSERTION(connection->initialize()); - VERIFY_ASSERTION(connection->readDisable(false)); - VERIFY_ASSERTION(connection->enableHalfClose(enable_half_close_)); - return AssertionSuccess(); + + return runOnDispatcherThreadAndWait([&]() { + absl::MutexLock lock(&lock_); + connection = std::make_unique(consumeConnection(), timeSystem()); + connection->initialize(); + // Skip enableHalfClose if the connection is already disconnected. + if (connection->connected()) { + connection->connection().enableHalfClose(enable_half_close_); + } + return AssertionSuccess(); + }); } SharedConnectionWrapper& FakeUpstream::consumeConnection() { ASSERT(!new_connections_.empty()); auto* const connection_wrapper = new_connections_.front().get(); + // Skip the thread safety check if the network connection has already been freed since there's no + // alternate way to get access to the dispatcher. + ASSERT(!connection_wrapper->connected() || + connection_wrapper->connection().dispatcher().isThreadSafe()); connection_wrapper->setParented(); connection_wrapper->moveBetweenLists(new_connections_, consumed_connections_); + if (read_disable_on_new_connection_ && connection_wrapper->connected()) { + // Re-enable read and early close detection. + auto& connection = connection_wrapper->connection(); + connection.detectEarlyCloseWhenReadDisabled(true); + connection.readDisable(false); + } return *connection_wrapper; } @@ -647,6 +664,20 @@ void FakeUpstream::onRecvDatagram(Network::UdpRecvData& data) { received_datagrams_.emplace_back(std::move(data)); } +AssertionResult FakeUpstream::runOnDispatcherThreadAndWait(std::function cb, + std::chrono::milliseconds timeout) { + auto result = std::make_shared(AssertionSuccess()); + auto done = std::make_shared(); + ASSERT(!dispatcher_->isThreadSafe()); + dispatcher_->post([&]() { + *result = cb(); + done->Notify(); + }); + RELEASE_ASSERT(done->WaitForNotificationWithTimeout(absl::FromChrono(timeout)), + "Timed out waiting for cb to run on dispatcher"); + return *result; +} + void FakeUpstream::sendUdpDatagram(const std::string& buffer, const Network::Address::InstanceConstSharedPtr& peer) { dispatcher_->post([this, buffer, peer] { @@ -682,17 +713,16 @@ FakeRawConnection::~FakeRawConnection() { } } -testing::AssertionResult FakeRawConnection::initialize() { - auto filter = Network::ReadFilterSharedPtr{new ReadFilter(*this)}; +void FakeRawConnection::initialize() { + FakeConnectionBase::initialize(); + Network::ReadFilterSharedPtr filter{new ReadFilter(*this)}; read_filter_ = filter; - testing::AssertionResult result = shared_connection_.executeOnDispatcher( - [filter = std::move(filter)](Network::Connection& connection) { - connection.addReadFilter(filter); - }); - if (!result) { - return result; + if (!shared_connection_.connected()) { + ENVOY_LOG(warn, "FakeRawConnection::initialize: network connection is already disconnected"); + return; } - return FakeConnectionBase::initialize(); + ASSERT(shared_connection_.connection().dispatcher().isThreadSafe()); + shared_connection_.connection().addReadFilter(filter); } AssertionResult FakeRawConnection::waitForData(uint64_t num_bytes, std::string* data, diff --git a/test/integration/fake_upstream.h b/test/integration/fake_upstream.h index d68f86677764..0e96ea5a8ad1 100644 --- a/test/integration/fake_upstream.h +++ b/test/integration/fake_upstream.h @@ -257,17 +257,6 @@ class SharedConnectionWrapper : public Network::ConnectionCallbacks, connection_.addConnectionCallbacks(*this); } - Common::CallbackHandle* addDisconnectCallback(DisconnectCallback callback) { - absl::MutexLock lock(&lock_); - return disconnect_callback_manager_.add(callback); - } - - // Avoid directly removing by caller, since CallbackManager is not thread safe. - void removeDisconnectCallback(Common::CallbackHandle* handle) { - absl::MutexLock lock(&lock_); - handle->remove(); - } - // Network::ConnectionCallbacks void onEvent(Network::ConnectionEvent event) override { // Throughout this entire function, we know that the connection_ cannot disappear, since this @@ -278,7 +267,6 @@ class SharedConnectionWrapper : public Network::ConnectionCallbacks, if (event == Network::ConnectionEvent::RemoteClose || event == Network::ConnectionEvent::LocalClose) { disconnected_ = true; - disconnect_callback_manager_.runCallbacks(); } } @@ -315,6 +303,10 @@ class SharedConnectionWrapper : public Network::ConnectionCallbacks, if (disconnected_) { return testing::AssertionSuccess(); } + // Sanity check: detect if the post and wait is attempted from the dispatcher thread; fail + // immediately instead of deadlocking. + ASSERT(!connection_.dispatcher().isThreadSafe(), + "deadlock: executeOnDispatcher called from dispatcher thread."); bool callback_ready_event = false; bool unexpected_disconnect = false; connection_.dispatcher().post( @@ -345,13 +337,13 @@ class SharedConnectionWrapper : public Network::ConnectionCallbacks, void setParented() { absl::MutexLock lock(&lock_); + ASSERT(!parented_); parented_ = true; } private: Network::Connection& connection_; absl::Mutex lock_; - Common::CallbackManager<> disconnect_callback_manager_ ABSL_GUARDED_BY(lock_); bool parented_ ABSL_GUARDED_BY(lock_){}; bool disconnected_ ABSL_GUARDED_BY(lock_){}; }; @@ -363,7 +355,10 @@ using SharedConnectionWrapperPtr = std::unique_ptr; */ class FakeConnectionBase : public Logger::Loggable { public: - virtual ~FakeConnectionBase() { ASSERT(initialized_); } + virtual ~FakeConnectionBase() { + absl::MutexLock lock(&lock_); + ASSERT(initialized_); + } ABSL_MUST_USE_RESULT testing::AssertionResult close(std::chrono::milliseconds timeout = TestUtility::DefaultTimeout); @@ -380,14 +375,10 @@ class FakeConnectionBase : public Logger::Loggable { testing::AssertionResult waitForHalfClose(std::chrono::milliseconds timeout = TestUtility::DefaultTimeout); - ABSL_MUST_USE_RESULT - virtual testing::AssertionResult initialize() { + virtual void initialize() { + absl::MutexLock lock(&lock_); initialized_ = true; - return testing::AssertionSuccess(); } - ABSL_MUST_USE_RESULT - testing::AssertionResult - enableHalfClose(bool enabled, std::chrono::milliseconds timeout = TestUtility::DefaultTimeout); // The same caveats apply here as in SharedConnectionWrapper::connection(). Network::Connection& connection() const { return shared_connection_.connection(); } bool connected() const { return shared_connection_.connected(); } @@ -398,9 +389,9 @@ class FakeConnectionBase : public Logger::Loggable { time_system_(time_system) {} SharedConnectionWrapper& shared_connection_; - bool initialized_{}; absl::Mutex& lock_; // TODO(mattklein123): Use the shared connection lock and figure out better // guarded by annotations. + bool initialized_ ABSL_GUARDED_BY(lock_){}; bool half_closed_ ABSL_GUARDED_BY(lock_){}; Event::TestTimeSystem& time_system_; }; @@ -493,8 +484,7 @@ class FakeRawConnection : public FakeConnectionBase { testing::AssertionResult write(const std::string& data, bool end_stream = false, std::chrono::milliseconds timeout = TestUtility::DefaultTimeout); - ABSL_MUST_USE_RESULT - testing::AssertionResult initialize() override; + void initialize() override; // Creates a ValidatorFunction which returns true when data_to_wait_for is // contained in the incoming data string. Unlike many of Envoy waitFor functions, @@ -736,6 +726,9 @@ class FakeUpstream : Logger::Loggable, void threadRoutine(); SharedConnectionWrapper& consumeConnection() ABSL_EXCLUSIVE_LOCKS_REQUIRED(lock_); void onRecvDatagram(Network::UdpRecvData& data); + AssertionResult + runOnDispatcherThreadAndWait(std::function cb, + std::chrono::milliseconds timeout = TestUtility::DefaultTimeout); Network::SocketSharedPtr socket_; Network::ListenSocketFactorySharedPtr socket_factory_; diff --git a/test/integration/hds_integration_test.cc b/test/integration/hds_integration_test.cc index c473226d73f1..85ebbc9b8d31 100644 --- a/test/integration/hds_integration_test.cc +++ b/test/integration/hds_integration_test.cc @@ -374,7 +374,8 @@ class HdsIntegrationTest : public Grpc::VersionedGrpcClientIntegrationParamTest, }; INSTANTIATE_TEST_SUITE_P(IpVersionsClientType, HdsIntegrationTest, - VERSIONED_GRPC_CLIENT_INTEGRATION_PARAMS); + VERSIONED_GRPC_CLIENT_INTEGRATION_PARAMS, + Grpc::VersionedGrpcClientIntegrationParamTest::protocolTestParamsToString); // Tests Envoy HTTP health checking a single healthy endpoint and reporting that it is // indeed healthy to the server. diff --git a/test/integration/load_stats_integration_test.cc b/test/integration/load_stats_integration_test.cc index d47682397192..bd2ad88ed00d 100644 --- a/test/integration/load_stats_integration_test.cc +++ b/test/integration/load_stats_integration_test.cc @@ -390,7 +390,8 @@ class LoadStatsIntegrationTest : public Grpc::VersionedGrpcClientIntegrationPara }; INSTANTIATE_TEST_SUITE_P(IpVersionsClientType, LoadStatsIntegrationTest, - VERSIONED_GRPC_CLIENT_INTEGRATION_PARAMS); + VERSIONED_GRPC_CLIENT_INTEGRATION_PARAMS, + Grpc::VersionedGrpcClientIntegrationParamTest::protocolTestParamsToString); // Validate the load reports for successful requests as cluster membership // changes.