-
Notifications
You must be signed in to change notification settings - Fork 4.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
test: FakeUpstream threading fixes #14526
Changes from 4 commits
512610f
19a63f7
5f767cf
1bfa958
45b8864
3b653f2
ee052d6
e9f4597
3c55b38
495b19e
6580555
f90610f
22211c9
0d4fc3d
d0565e5
197362f
3aee6d7
8fccaf3
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,6 +20,7 @@ | |
#include "test/test_common/utility.h" | ||
|
||
#include "absl/strings/str_cat.h" | ||
#include "absl/synchronization/notification.h" | ||
|
||
using namespace std::chrono_literals; | ||
|
||
|
@@ -354,14 +355,28 @@ AssertionResult FakeConnectionBase::close(std::chrono::milliseconds timeout) { | |
} | ||
|
||
AssertionResult FakeConnectionBase::readDisable(bool disable, std::chrono::milliseconds timeout) { | ||
return shared_connection_.executeOnDispatcher( | ||
[disable](Network::Connection& connection) { connection.readDisable(disable); }, timeout); | ||
// Do the work inline if called from the dispatcher thread, executeOnDispatcher can only be called | ||
// from outside the dispatcher thread. | ||
if (shared_connection_.connection().dispatcher().isThreadSafe()) { | ||
shared_connection_.connection().readDisable(disable); | ||
return AssertionSuccess(); | ||
} else { | ||
return shared_connection_.executeOnDispatcher( | ||
[disable](Network::Connection& connection) { connection.readDisable(disable); }, timeout); | ||
} | ||
} | ||
|
||
AssertionResult FakeConnectionBase::enableHalfClose(bool enable, | ||
std::chrono::milliseconds timeout) { | ||
return shared_connection_.executeOnDispatcher( | ||
[enable](Network::Connection& connection) { connection.enableHalfClose(enable); }, timeout); | ||
// Do the work inline if called from the dispatcher thread, executeOnDispatcher can only be called | ||
// from outside the dispatcher thread. | ||
if (shared_connection_.connection().dispatcher().isThreadSafe()) { | ||
shared_connection_.connection().enableHalfClose(enable); | ||
return AssertionSuccess(); | ||
} else { | ||
return shared_connection_.executeOnDispatcher( | ||
[enable](Network::Connection& connection) { connection.enableHalfClose(enable); }, timeout); | ||
} | ||
} | ||
|
||
Http::RequestDecoder& FakeHttpConnection::newStream(Http::ResponseEncoder& encoder, bool) { | ||
|
@@ -552,16 +567,19 @@ AssertionResult FakeUpstream::waitForHttpConnection( | |
client_dispatcher, timeout)) { | ||
return AssertionFailure() << "Timed out waiting for new connection."; | ||
} | ||
} | ||
|
||
return runOnDispatcherThreadAndWait([&]() { | ||
absl::MutexLock lock(&lock_); | ||
connection = std::make_unique<FakeHttpConnection>( | ||
*this, consumeConnection(), http_type_, time_system_, max_request_headers_kb, | ||
max_request_headers_count, headers_with_underscores_action); | ||
} | ||
VERIFY_ASSERTION(connection->initialize()); | ||
if (read_disable_on_new_connection_) { | ||
VERIFY_ASSERTION(connection->readDisable(false)); | ||
} | ||
return AssertionSuccess(); | ||
VERIFY_ASSERTION(connection->initialize()); | ||
if (read_disable_on_new_connection_) { | ||
VERIFY_ASSERTION(connection->readDisable(false)); | ||
} | ||
return AssertionSuccess(); | ||
}); | ||
} | ||
|
||
AssertionResult | ||
|
@@ -585,14 +603,18 @@ FakeUpstream::waitForHttpConnection(Event::Dispatcher& client_dispatcher, | |
client_dispatcher, 5ms)) { | ||
continue; | ||
} | ||
} | ||
|
||
return upstream.runOnDispatcherThreadAndWait([&]() { | ||
absl::MutexLock lock(&upstream.lock_); | ||
connection = std::make_unique<FakeHttpConnection>( | ||
upstream, upstream.consumeConnection(), upstream.http_type_, upstream.timeSystem(), | ||
Http::DEFAULT_MAX_REQUEST_HEADERS_KB, Http::DEFAULT_MAX_HEADERS_COUNT, | ||
envoy::config::core::v3::HttpProtocolOptions::ALLOW); | ||
} | ||
VERIFY_ASSERTION(connection->initialize()); | ||
VERIFY_ASSERTION(connection->readDisable(false)); | ||
return AssertionSuccess(); | ||
VERIFY_ASSERTION(connection->initialize()); | ||
VERIFY_ASSERTION(connection->readDisable(false)); | ||
return AssertionSuccess(); | ||
}); | ||
} | ||
} | ||
return AssertionFailure() << "Timed out waiting for HTTP connection."; | ||
|
@@ -610,11 +632,29 @@ AssertionResult FakeUpstream::waitForRawConnection(FakeRawConnectionPtr& connect | |
if (!time_system_.waitFor(lock_, absl::Condition(&reached), timeout)) { | ||
return AssertionFailure() << "Timed out waiting for raw connection"; | ||
} | ||
} | ||
|
||
return runOnDispatcherThreadAndWait([&]() { | ||
absl::MutexLock lock(&lock_); | ||
connection = std::make_unique<FakeRawConnection>(consumeConnection(), timeSystem()); | ||
VERIFY_ASSERTION(connection->initialize()); | ||
VERIFY_ASSERTION(connection->readDisable(false)); | ||
VERIFY_ASSERTION(connection->enableHalfClose(enable_half_close_)); | ||
return AssertionSuccess(); | ||
}); | ||
} | ||
|
||
testing::AssertionResult | ||
FakeUpstream::waitForAndConsumeDisconnectedConnection(std::chrono::milliseconds timeout) { | ||
absl::MutexLock lock(&lock_); | ||
const auto reached = [this]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(lock_) { | ||
return !new_connections_.empty() && !new_connections_.front()->connected(); | ||
}; | ||
|
||
if (!time_system_.waitFor(lock_, absl::Condition(&reached), timeout)) { | ||
return AssertionFailure() << "Timed out waiting for raw connection"; | ||
} | ||
VERIFY_ASSERTION(connection->initialize()); | ||
VERIFY_ASSERTION(connection->readDisable(false)); | ||
VERIFY_ASSERTION(connection->enableHalfClose(enable_half_close_)); | ||
consumeConnection(); | ||
return AssertionSuccess(); | ||
} | ||
|
||
|
@@ -647,6 +687,18 @@ void FakeUpstream::onRecvDatagram(Network::UdpRecvData& data) { | |
received_datagrams_.emplace_back(std::move(data)); | ||
} | ||
|
||
AssertionResult FakeUpstream::runOnDispatcherThreadAndWait(std::function<AssertionResult()> cb) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should this have a timeout to avoid test hard timeout? It could be hard coded to 15s if that is easier? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Added a timeout which results in a RELEASE_ASSERT if hit. The callbacks run on the dispatcher should be really fast, so it taking an extended period of time indicates that something has gone really wrong. Cancelling the work is not an option since it is hard to tell how far the callback got before the timeout happens. Returning AssertionResult is not a good option because there are FakeUpstream functions that wait for an HTTP upstream connection with short timeout in a loop and interpret the returned AssertionResult as a retryable failure. |
||
AssertionResult result = AssertionSuccess(); | ||
absl::Notification done; | ||
ASSERT(!dispatcher_->isThreadSafe()); | ||
dispatcher_->post([&]() { | ||
result = cb(); | ||
done.Notify(); | ||
}); | ||
done.WaitForNotification(); | ||
return result; | ||
} | ||
|
||
void FakeUpstream::sendUdpDatagram(const std::string& buffer, | ||
const Network::Address::InstanceConstSharedPtr& peer) { | ||
dispatcher_->post([this, buffer, peer] { | ||
|
@@ -683,14 +735,19 @@ FakeRawConnection::~FakeRawConnection() { | |
} | ||
|
||
testing::AssertionResult FakeRawConnection::initialize() { | ||
auto filter = Network::ReadFilterSharedPtr{new ReadFilter(*this)}; | ||
ASSERT(shared_connection_.connection().dispatcher().isThreadSafe()); | ||
Network::ReadFilterSharedPtr filter{new ReadFilter(*this)}; | ||
read_filter_ = filter; | ||
testing::AssertionResult result = shared_connection_.executeOnDispatcher( | ||
[filter = std::move(filter)](Network::Connection& connection) { | ||
connection.addReadFilter(filter); | ||
}); | ||
if (!result) { | ||
return result; | ||
if (shared_connection_.connection().dispatcher().isThreadSafe()) { | ||
shared_connection_.connection().addReadFilter(filter); | ||
} else { | ||
testing::AssertionResult result = shared_connection_.executeOnDispatcher( | ||
[filter = std::move(filter)](Network::Connection& connection) { | ||
connection.addReadFilter(filter); | ||
}); | ||
if (!result) { | ||
return result; | ||
} | ||
} | ||
return FakeConnectionBase::initialize(); | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -438,11 +438,9 @@ TEST_P(HdsIntegrationTest, SingleEndpointTimeoutHttp) { | |
hds_stream_->sendGrpcMessage(server_health_check_specifier_); | ||
test_server_->waitForCounterGe("hds_delegate.requests", ++hds_requests_); | ||
|
||
// Envoy sends a health check message to an endpoint | ||
ASSERT_TRUE(host_upstream_->waitForRawConnection(host_fake_raw_connection_)); | ||
|
||
// Endpoint doesn't respond to the health check | ||
ASSERT_TRUE(host_fake_raw_connection_->waitForDisconnect()); | ||
// Envoy sends a health check message to an endpoint, but the endpoint doesn't respond to the | ||
// health check | ||
ASSERT_TRUE(host_upstream_->waitForAndConsumeDisconnectedConnection()); | ||
|
||
// Receive updates until the one we expect arrives | ||
waitForEndpointHealthResponse(envoy::config::core::v3::TIMEOUT); | ||
|
@@ -515,11 +513,9 @@ TEST_P(HdsIntegrationTest, SingleEndpointTimeoutTcp) { | |
hds_stream_->sendGrpcMessage(server_health_check_specifier_); | ||
test_server_->waitForCounterGe("hds_delegate.requests", ++hds_requests_); | ||
|
||
// Envoys asks the endpoint if it's healthy | ||
ASSERT_TRUE(host_upstream_->waitForRawConnection(host_fake_raw_connection_)); | ||
|
||
// No response from the endpoint | ||
ASSERT_TRUE(host_fake_raw_connection_->waitForDisconnect()); | ||
// Envoy sends a health check message to an endpoint, but the endpoint doesn't respond to the | ||
// health check | ||
ASSERT_TRUE(host_upstream_->waitForAndConsumeDisconnectedConnection()); | ||
|
||
// Receive updates until the one we expect arrives | ||
waitForEndpointHealthResponse(envoy::config::core::v3::TIMEOUT); | ||
|
@@ -1031,6 +1027,8 @@ TEST_P(HdsIntegrationTest, SingleEndpointUnhealthyTlsMissingSocketMatch) { | |
tls_hosts_ = true; | ||
|
||
initialize(); | ||
// Allow the fake upstreams to detect an error and disconnect during the TLS handshake. | ||
host_upstream_->setReadDisableOnNewConnection(false); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The readDisable on the connection does not prevent the delivery of EPOLLOUT events to the connection. The delivery of a write event triggers the start of the SSL handshake operation which can perform reads and writes. The handling of the out event sometimes results in detection of the plain HTTP request sent by the proxy to the fake upstream, which can trigger termination of the upstream connection in the middle of the process of creating a raw connection, which results in a use-after-free crash. By fully enabling events on the TLS upstream and waiting for disconnect we avoid flaky failures. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe add all this color to the comment for the next person? (or since you have the same fix below maybe have a well named helper function with a bunch of comments? Up to you. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm going to go back to see if I can fix this properly. I think that the original waitForRawConnection followed by waitForDisconnect API was more intuitive than the setReadDisableOnNewConnection+waitForAndConsumeDisconnectedConnection that I need to avoid use after free in the TLS handshake failure cases. I think the solution is to avoid creating the network connection object until later and thus preventing events from being handled when the filter chain and related objects are not fully setup /wait There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 
I was able to make tests work with the old API by allowing raw connection creation to succeed in cases where the network connection is already disconnected. Not having to call different methods in tests that expect disconnects feels easier. PTAL. |
||
|
||
// Server <--> Envoy | ||
waitForHdsStream(); | ||
|
@@ -1046,11 +1044,9 @@ TEST_P(HdsIntegrationTest, SingleEndpointUnhealthyTlsMissingSocketMatch) { | |
hds_stream_->sendGrpcMessage(server_health_check_specifier_); | ||
test_server_->waitForCounterGe("hds_delegate.requests", ++hds_requests_); | ||
|
||
// Envoy sends a health check message to an endpoint | ||
ASSERT_TRUE(host_upstream_->waitForRawConnection(host_fake_raw_connection_)); | ||
|
||
// Endpoint doesn't respond to the health check | ||
ASSERT_TRUE(host_fake_raw_connection_->waitForDisconnect()); | ||
// Envoy sends a health check message to an endpoint, but the endpoint doesn't respond to the | ||
// health check | ||
ASSERT_TRUE(host_upstream_->waitForAndConsumeDisconnectedConnection()); | ||
|
||
// Receive updates until the one we expect arrives. This should be UNHEALTHY and not TIMEOUT, | ||
// because TIMEOUT occurs in the situation where there is no response from the endpoint. In this | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why do we need to special-case same-thread calls? Is that because when calling from the same thread, we are assuming the operation is blocking? (add code comments and we are good to go)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
executeOnDispatcher blocks until the posted callback finishes. Calling post and waiting from the dispatcher thread results in a deadlock.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
makes sense; adding a code comment may help the reader.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Idea: Instead of dealing this in all call sites, could executeOnDispatcher() figure this out and just run the lambda versus posting?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I undid this branching and reverted this function.
FakeUpstream now does readDisable and enableHalfClose directly on the network connection instead of going through this wrapper in the cases where the operation happens from the dispatcher thread. Some tests still call this method and that requires going down the executeOnDispatcher version.