Skip to content

Commit

Permalink
async_tcp_client: remove callbacks if connection was not closed (envo…
Browse files Browse the repository at this point in the history
…yproxy#35410)

When the ``AsyncTcpClient`` is being destroyed but it
also has an active client connection, there's a crash since during the
instance destruction, the ``ClientConnection`` object would also be
destroyed, causing ``raiseEvent`` to be called back to
``AsyncTcpClient`` while it is being destroyed

Caught with the following stack trace:
```
Caught Segmentation fault, suspect faulting address 0x0
Backtrace (use tools/stack_decode.py to get line numbers):
Envoy version: ee8c765a07037033766ea556c032120b497152b3/1.27.0/Clean/RELEASE/BoringSSL
#0: __restore_rt [0x7d80ab903420]
#1: Envoy::Extensions::AccessLoggers::Fluentd::FluentdAccessLoggerImpl::onEvent() [0x58313528746b]
envoyproxy#2: Envoy::Tcp::AsyncTcpClientImpl::onEvent() [0x5831359da00a]
envoyproxy#3: Envoy::Network::ConnectionImplBase::raiseConnectionEvent() [0x583135f0521d]
envoyproxy#4: Envoy::Network::ConnectionImpl::raiseEvent() [0x583135e9fed9]
envoyproxy#5: Envoy::Network::ConnectionImpl::closeSocket() [0x583135e9f90c]
envoyproxy#6: Envoy::Network::ConnectionImpl::close() [0x583135e9e54c]
envoyproxy#7: Envoy::Network::ConnectionImpl::~ConnectionImpl() [0x583135e9de5c]
envoyproxy#8: Envoy::Network::ClientConnectionImpl::~ClientConnectionImpl() [0x5831355fd25e]
envoyproxy#9: Envoy::Tcp::AsyncTcpClientImpl::~AsyncTcpClientImpl() [0x5831359da247]
envoyproxy#10: Envoy::Extensions::AccessLoggers::Fluentd::FluentdAccessLoggerImpl::~FluentdAccessLoggerImpl() [0x583135289350]
envoyproxy#11: Envoy::Extensions::AccessLoggers::Fluentd::FluentdAccessLog::ThreadLocalLogger::~ThreadLocalLogger() [0x58313528adbf]
envoyproxy#12: Envoy::ThreadLocal::InstanceImpl::shutdownThread() [0x58313560373a]
envoyproxy#13: Envoy::Server::WorkerImpl::threadRoutine() [0x583135630c0a]
envoyproxy#14: Envoy::Thread::ThreadImplPosix::ThreadImplPosix()::{lambda()#1}::__invoke() [0x5831364e88d5]
envoyproxy#15: start_thread [0x7d80ab8f7609]
```
Risk Level: low
Testing: unit tests
Docs Changes: none
Release Notes: none
Platform Specific Features: none

---------

Signed-off-by: Ohad Vano <[email protected]>
  • Loading branch information
ohadvano authored Aug 1, 2024
1 parent 770fa7b commit 0e03184
Show file tree
Hide file tree
Showing 6 changed files with 79 additions and 19 deletions.
12 changes: 11 additions & 1 deletion source/common/tcp/async_tcp_client_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,14 @@ AsyncTcpClientImpl::AsyncTcpClientImpl(Event::Dispatcher& dispatcher,
connect_timer_(dispatcher.createTimer([this]() { onConnectTimeout(); })),
enable_half_close_(enable_half_close) {}

AsyncTcpClientImpl::~AsyncTcpClientImpl() {
if (connection_) {
connection_->removeConnectionCallbacks(*this);
}

close(Network::ConnectionCloseType::NoFlush);
}

bool AsyncTcpClientImpl::connect() {
if (connection_) {
return false;
Expand Down Expand Up @@ -69,7 +77,8 @@ void AsyncTcpClientImpl::onConnectTimeout() {
}

void AsyncTcpClientImpl::close(Network::ConnectionCloseType type) {
if (connection_) {
if (connection_ && !closing_) {
closing_ = true;
connection_->close(type);
}
}
Expand Down Expand Up @@ -127,6 +136,7 @@ void AsyncTcpClientImpl::onEvent(Network::ConnectionEvent event) {
detected_close_ = connection_->detectedCloseType();
}

closing_ = false;
dispatcher_.deferredDelete(std::move(connection_));
if (callbacks_) {
callbacks_->onEvent(event);
Expand Down
2 changes: 2 additions & 0 deletions source/common/tcp/async_tcp_client_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ class AsyncTcpClientImpl : public AsyncTcpClient,
AsyncTcpClientImpl(Event::Dispatcher& dispatcher,
Upstream::ThreadLocalCluster& thread_local_cluster,
Upstream::LoadBalancerContext* context, bool enable_half_close);
~AsyncTcpClientImpl();

void close(Network::ConnectionCloseType type) override;

Expand Down Expand Up @@ -106,6 +107,7 @@ class AsyncTcpClientImpl : public AsyncTcpClient,
Event::TimerPtr connect_timer_;
AsyncTcpClientCallbacks* callbacks_{};
Network::DetectedCloseType detected_close_{Network::DetectedCloseType::Normal};
bool closing_{false};
bool connected_{false};
bool enable_half_close_{false};
};
Expand Down
13 changes: 11 additions & 2 deletions test/common/tcp/async_tcp_client_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,15 @@ using testing::Return;
namespace Envoy {
namespace Tcp {

class CustomMockClientConnection : public Network::MockClientConnection {
public:
~CustomMockClientConnection() {
if (state_ != Connection::State::Closed) {
raiseEvent(Network::ConnectionEvent::LocalClose);
}
};
};

class AsyncTcpClientImplTest : public Event::TestUsingSimulatedTime, public testing::Test {
public:
AsyncTcpClientImplTest() = default;
Expand All @@ -32,7 +41,7 @@ class AsyncTcpClientImplTest : public Event::TestUsingSimulatedTime, public test
}

void expectCreateConnection(bool trigger_connected = true) {
connection_ = new NiceMock<Network::MockClientConnection>();
connection_ = new NiceMock<CustomMockClientConnection>();
Upstream::MockHost::MockCreateConnectionData conn_info;
connection_->streamInfo().setAttemptCount(1);
conn_info.connection_ = connection_;
Expand All @@ -59,7 +68,7 @@ class AsyncTcpClientImplTest : public Event::TestUsingSimulatedTime, public test
NiceMock<Event::MockTimer>* connect_timer_;
NiceMock<Event::MockDispatcher> dispatcher_;
NiceMock<Upstream::MockClusterManager> cluster_manager_;
Network::MockClientConnection* connection_{};
CustomMockClientConnection* connection_{};

NiceMock<Tcp::AsyncClient::MockAsyncTcpClientCallbacks> callbacks_;
};
Expand Down
9 changes: 8 additions & 1 deletion test/integration/filters/test_network_async_tcp_filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ class TestNetworkAsyncTcpFilter : public Network::ReadFilter {
const test::integration::filters::TestNetworkAsyncTcpFilterConfig& config,
Stats::Scope& scope, Upstream::ClusterManager& cluster_manager)
: stats_(generateStats("test_network_async_tcp_filter", scope)),
cluster_name_(config.cluster_name()), cluster_manager_(cluster_manager) {
cluster_name_(config.cluster_name()), kill_after_on_data_(config.kill_after_on_data()),
cluster_manager_(cluster_manager) {
const auto thread_local_cluster = cluster_manager_.getThreadLocalCluster(cluster_name_);
options_ = std::make_shared<Tcp::AsyncTcpClientOptions>(true);
if (thread_local_cluster != nullptr) {
Expand All @@ -60,6 +61,11 @@ class TestNetworkAsyncTcpFilter : public Network::ReadFilter {
data.length());
client_->write(data, end_stream);

if (kill_after_on_data_) {
Tcp::AsyncTcpClient* c1 = client_.release();
delete c1;
}

return Network::FilterStatus::StopIteration;
}

Expand Down Expand Up @@ -166,6 +172,7 @@ class TestNetworkAsyncTcpFilter : public Network::ReadFilter {
TestNetworkAsyncTcpFilterStats stats_;
Tcp::AsyncTcpClientPtr client_;
absl::string_view cluster_name_;
bool kill_after_on_data_;
std::unique_ptr<RequestAsyncCallbacks> request_callbacks_;
std::unique_ptr<DownstreamCallbacks> downstream_callbacks_;
Upstream::ClusterManager& cluster_manager_;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@ package test.integration.filters;

message TestNetworkAsyncTcpFilterConfig {
string cluster_name = 1;
bool kill_after_on_data = 2;
}
61 changes: 46 additions & 15 deletions test/integration/tcp_async_client_integration_test.cc
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#include "test/integration/filters/test_network_async_tcp_filter.pb.h"
#include "test/integration/integration.h"

#include "gtest/gtest.h"
Expand All @@ -16,15 +17,37 @@ class TcpAsyncClientIntegrationTest : public testing::TestWithParam<Network::Add
typed_config:
"@type": type.googleapis.com/test.integration.filters.TestNetworkAsyncTcpFilterConfig
cluster_name: cluster_0
)EOF")) {}
)EOF")) {
enableHalfClose(true);
}

void init(bool kill_after_on_data = false) {
const std::string yaml = fmt::format(R"EOF(
cluster_name: cluster_0
kill_after_on_data: {}
)EOF",
kill_after_on_data ? "true" : "false");

config_helper_.addConfigModifier(
[&](envoy::config::bootstrap::v3::Bootstrap& bootstrap) -> void {
test::integration::filters::TestNetworkAsyncTcpFilterConfig proto_config;
TestUtility::loadFromYaml(yaml, proto_config);

auto* listener = bootstrap.mutable_static_resources()->mutable_listeners(0);
auto* filter_chain = listener->mutable_filter_chains(0);
auto* filter = filter_chain->mutable_filters(0);
filter->mutable_typed_config()->PackFrom(proto_config);
});

BaseIntegrationTest::initialize();
}
};

INSTANTIATE_TEST_SUITE_P(IpVersions, TcpAsyncClientIntegrationTest,
testing::ValuesIn(TestEnvironment::getIpVersionsForTest()));

TEST_P(TcpAsyncClientIntegrationTest, SingleRequest) {
enableHalfClose(true);
initialize();
init();

std::string request("request");
std::string response("response");
Expand All @@ -51,8 +74,7 @@ TEST_P(TcpAsyncClientIntegrationTest, SingleRequest) {
}

TEST_P(TcpAsyncClientIntegrationTest, MultipleRequestFrames) {
enableHalfClose(true);
initialize();
init();

std::string data_frame_1("data_frame_1");
std::string data_frame_2("data_frame_2");
Expand Down Expand Up @@ -85,8 +107,7 @@ TEST_P(TcpAsyncClientIntegrationTest, MultipleRequestFrames) {
}

TEST_P(TcpAsyncClientIntegrationTest, MultipleResponseFrames) {
enableHalfClose(true);
initialize();
init();

std::string data_frame_1("data_frame_1");
std::string response_1("response_1");
Expand Down Expand Up @@ -116,8 +137,7 @@ TEST_P(TcpAsyncClientIntegrationTest, Reconnect) {
return;
}

enableHalfClose(true);
initialize();
init();

IntegrationTcpClientPtr tcp_client = makeTcpConnection(lookupPort("listener_0"));
ASSERT_TRUE(tcp_client->write("hello1", false));
Expand All @@ -143,11 +163,24 @@ TEST_P(TcpAsyncClientIntegrationTest, Reconnect) {
test_server_->waitForGaugeEq("cluster.cluster_0.upstream_cx_active", 0);
}

TEST_P(TcpAsyncClientIntegrationTest, ClientTearDown) {
init(true);

std::string request("request");

IntegrationTcpClientPtr tcp_client = makeTcpConnection(lookupPort("listener_0"));
ASSERT_TRUE(tcp_client->write(request, true));
FakeRawConnectionPtr fake_upstream_connection;
ASSERT_TRUE(fake_upstreams_[0]->waitForRawConnection(fake_upstream_connection));
ASSERT_TRUE(fake_upstream_connection->waitForData(request.size()));

tcp_client->close();
}

#if ENVOY_PLATFORM_ENABLE_SEND_RST
// Test if RST close can be detected from downstream and upstream is closed by RST.
TEST_P(TcpAsyncClientIntegrationTest, TestClientCloseRST) {
enableHalfClose(true);
initialize();
init();

std::string request("request");
std::string response("response");
Expand Down Expand Up @@ -178,8 +211,7 @@ TEST_P(TcpAsyncClientIntegrationTest, TestClientCloseRST) {

// Test if RST close can be detected from upstream.
TEST_P(TcpAsyncClientIntegrationTest, TestUpstreamCloseRST) {
enableHalfClose(true);
initialize();
init();

std::string request("request");
std::string response("response");
Expand Down Expand Up @@ -212,8 +244,7 @@ TEST_P(TcpAsyncClientIntegrationTest, TestUpstreamCloseRST) {
// the client. The behavior is different for windows, since RST support is literally supported for
// unix like system, disabled the test for windows.
TEST_P(TcpAsyncClientIntegrationTest, TestDownstremHalfClosedThenRST) {
enableHalfClose(true);
initialize();
init();

std::string request("request");
std::string response("response");
Expand Down

0 comments on commit 0e03184

Please sign in to comment.