Skip to content

Commit

Permalink
core: prevent race that causes double delete of HCM ActiveStreams (#669)
Browse files Browse the repository at this point in the history
Description: previously Envoy Mobile was not guarded against a race that resulted in a double deletion of an Http::ConnectionManagerImpl::ActiveStream. This PR creates state that protects against that race.
Risk Level: low, integration test reproe'd the stack trace, and then updated prod code fixed as expected.
Testing: new integration test.

Fixes #668

Signed-off-by: Jose Nino <[email protected]>
Signed-off-by: JP Simard <[email protected]>
  • Loading branch information
junr03 authored and jpsim committed Nov 29, 2022
1 parent 211e81e commit 55bcaf7
Show file tree
Hide file tree
Showing 3 changed files with 114 additions and 3 deletions.
51 changes: 49 additions & 2 deletions mobile/library/common/http/dispatcher.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,13 @@ Dispatcher::DirectStreamCallbacks::DirectStreamCallbacks(DirectStream& direct_st
void Dispatcher::DirectStreamCallbacks::encodeHeaders(const HeaderMap& headers, bool end_stream) {
ENVOY_LOG(debug, "[S{}] response headers for stream (end_stream={}):\n{}",
direct_stream_.stream_handle_, end_stream, headers);

// @see Dispatcher::resetStream's comment on its call to runResetCallbacks.
// Envoy only deferredDeletes the ActiveStream if it was fully closed otherwise it issues a reset.
if (direct_stream_.local_closed_ && end_stream) {
direct_stream_.hcm_stream_pending_destroy_ = true;
}

// TODO: ***HACK*** currently Envoy sends local replies in cases where an error ought to be
// surfaced via the error path. There are ways we can clean up Envoy's local reply path to
// make this possible, but nothing expedient. For the immediate term this is our only real
Expand Down Expand Up @@ -95,7 +102,19 @@ void Dispatcher::DirectStreamCallbacks::encodeHeaders(const HeaderMap& headers,
void Dispatcher::DirectStreamCallbacks::encodeData(Buffer::Instance& data, bool end_stream) {
ENVOY_LOG(debug, "[S{}] response data for stream (length={} end_stream={})",
direct_stream_.stream_handle_, data.length(), end_stream);

// @see Dispatcher::resetStream's comment on its call to runResetCallbacks.
// Envoy only deferredDeletes the ActiveStream if it was fully closed otherwise it issues a reset.
if (direct_stream_.local_closed_ && end_stream) {
direct_stream_.hcm_stream_pending_destroy_ = true;
}

if (!error_code_.has_value()) {
// Testing hook.
if (end_stream) {
http_dispatcher_.synchronizer_.syncPoint("dispatch_encode_final_data");
}

// @see Dispatcher::DirectStream::dispatch_lock_ for why this lock is necessary.
Thread::BasicLockable* mutex = end_stream ? nullptr : &direct_stream_.dispatch_lock_;
Thread::OptionalReleasableLockGuard lock(mutex);
Expand Down Expand Up @@ -125,6 +144,13 @@ void Dispatcher::DirectStreamCallbacks::encodeData(Buffer::Instance& data, bool
void Dispatcher::DirectStreamCallbacks::encodeTrailers(const HeaderMap& trailers) {
ENVOY_LOG(debug, "[S{}] response trailers for stream:\n{}", direct_stream_.stream_handle_,
trailers);

// @see Dispatcher::resetStream's comment on its call to runResetCallbacks.
// Envoy only deferredDeletes the ActiveStream if it was fully closed otherwise it issues a reset.
if (direct_stream_.local_closed_) {
direct_stream_.hcm_stream_pending_destroy_ = true;
}

if (direct_stream_.dispatchable(true)) {
ENVOY_LOG(debug, "[S{}] dispatching to platform response trailers for stream:\n{}",
direct_stream_.stream_handle_, trailers);
Expand Down Expand Up @@ -200,7 +226,13 @@ void Dispatcher::DirectStream::resetStream(StreamResetReason reason) {
// resetStream on the response_encoder_'s Stream. It is up to the response_encoder_ to
// runResetCallbacks in order to have the Http::ConnectionManager call doDeferredStreamDestroy in
// ConnectionManagerImpl::ActiveStream::onResetStream.
runResetCallbacks(reason);
//
// @see Dispatcher::resetStream's comment on its call to runResetCallbacks for an explanation if
// this if guard.
if (!hcm_stream_pending_destroy_) {
hcm_stream_pending_destroy_ = true;
runResetCallbacks(reason);
}
callbacks_->onReset();
}

Expand Down Expand Up @@ -378,7 +410,22 @@ envoy_status_t Dispatcher::resetStream(envoy_stream_t stream) {
//
// StreamResetReason::RemoteReset is used as the platform code that issues the
// cancellation is considered the remote.
direct_stream->runResetCallbacks(StreamResetReason::RemoteReset);
//
// This call is guarded by hcm_stream_pending_destroy_ to protect against the
// following race condition:
// 1. resetStream executes first on a platform thread, getting through the dispatch
// guard and posting this lambda.
// 2. The event dispatcher's thread executes a terminal encoding or a reset in the
// Http::ConnectionManager, thus calling deferredDelete on the ActiveStream.
// 3. The event dispatcher's thread executes this post body, thus calling
// runResetCallbacks, which ends up calling deferredDelete (for a second time!) on the
// ActiveStream.
// This protection makes sure that Envoy Mobile's Http::Dispatcher::DirectStream knows
// synchronously when the ActiveStream is deferredDelete'd for the first time.
if (!direct_stream->hcm_stream_pending_destroy_) {
direct_stream->hcm_stream_pending_destroy_ = true;
direct_stream->runResetCallbacks(StreamResetReason::RemoteReset);
}
cleanup(direct_stream->stream_handle_);
}
});
Expand Down
1 change: 1 addition & 0 deletions mobile/library/common/http/dispatcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ class Dispatcher : public Logger::Loggable<Logger::Id::http> {
Thread::MutexBasicLockable dispatch_lock_;
std::atomic<bool> closed_{};
bool local_closed_{};
bool hcm_stream_pending_destroy_{};

// Used to issue outgoing HTTP stream operations.
RequestDecoder* request_decoder_;
Expand Down
65 changes: 64 additions & 1 deletion mobile/test/integration/dispatcher_integration_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,6 @@ TEST_P(DispatcherIntegrationTest, Basic) {
envoy_headers c_headers = Http::Utility::toBridgeHeaders(headers);

// Create a stream.
Event::PostCb start_stream_post_cb;
EXPECT_EQ(http_dispatcher_.startStream(stream, bridge_callbacks, {}), ENVOY_SUCCESS);
http_dispatcher_.sendHeaders(stream, c_headers, true);

Expand All @@ -153,5 +152,69 @@ TEST_P(DispatcherIntegrationTest, Basic) {
ASSERT_EQ(cc.on_complete_calls, 1);
}

TEST_P(DispatcherIntegrationTest, RaceDoesNotCauseDoubleDeletion) {
ConditionalInitializer ready_ran;
test_server_->server().dispatcher().post([this, &ready_ran]() -> void {
http_dispatcher_.ready(
test_server_->server().dispatcher(),
test_server_->server().listenerManager().apiListener()->get().http()->get());
ready_ran.setReady();
});
ready_ran.waitReady();

http_dispatcher_.synchronizer().enable();

envoy_stream_t stream = 1;
// Setup bridge_callbacks to handle the response.
envoy_http_callbacks bridge_callbacks;
callbacks_called cc = {0, 0, 0, 0, 0, nullptr};
bridge_callbacks.context = &cc;
bridge_callbacks.on_headers = [](envoy_headers c_headers, bool end_stream,
void* context) -> void {
ASSERT_FALSE(end_stream);
Http::HeaderMapPtr response_headers = Http::Utility::toInternalHeaders(c_headers);
EXPECT_EQ(response_headers->Status()->value().getStringView(), "200");
callbacks_called* cc = static_cast<callbacks_called*>(context);
cc->on_headers_calls++;
};
bridge_callbacks.on_data = [](envoy_data c_data, bool end_stream, void* context) -> void {
if (end_stream) {
ASSERT_EQ(Http::Utility::convertToString(c_data), "");
} else {
ASSERT_EQ(c_data.length, 10);
}
callbacks_called* cc = static_cast<callbacks_called*>(context);
cc->on_data_calls++;
c_data.release(c_data.context);
};
bridge_callbacks.on_complete = [](void* context) -> void {
callbacks_called* cc = static_cast<callbacks_called*>(context);
cc->on_complete_calls++;
};
bridge_callbacks.on_cancel = [](void* context) -> void {
callbacks_called* cc = static_cast<callbacks_called*>(context);
cc->on_cancel_calls++;
};

// Build a set of request headers.
Http::TestHeaderMapImpl headers;
HttpTestUtility::addDefaultHeaders(headers);
envoy_headers c_headers = Http::Utility::toBridgeHeaders(headers);

// Create a stream.
EXPECT_EQ(http_dispatcher_.startStream(stream, bridge_callbacks, {}), ENVOY_SUCCESS);
http_dispatcher_.synchronizer().waitOn("dispatch_encode_final_data");
http_dispatcher_.sendHeaders(stream, c_headers, true);
http_dispatcher_.synchronizer().barrierOn("dispatch_encode_final_data");
ASSERT_EQ(cc.on_headers_calls, 1);
ASSERT_EQ(cc.on_data_calls, 1);
ASSERT_EQ(cc.on_complete_calls, 0);

ASSERT_EQ(http_dispatcher_.resetStream(stream), ENVOY_SUCCESS);
ASSERT_EQ(cc.on_cancel_calls, 1);

http_dispatcher_.synchronizer().signal("dispatch_encode_final_data");
}

} // namespace
} // namespace Envoy

0 comments on commit 55bcaf7

Please sign in to comment.