diff --git a/mobile/library/common/http/dispatcher.cc b/mobile/library/common/http/dispatcher.cc index 904137c750f8..d6a331991550 100644 --- a/mobile/library/common/http/dispatcher.cc +++ b/mobile/library/common/http/dispatcher.cc @@ -31,6 +31,13 @@ Dispatcher::DirectStreamCallbacks::DirectStreamCallbacks(DirectStream& direct_st void Dispatcher::DirectStreamCallbacks::encodeHeaders(const HeaderMap& headers, bool end_stream) { ENVOY_LOG(debug, "[S{}] response headers for stream (end_stream={}):\n{}", direct_stream_.stream_handle_, end_stream, headers); + + // @see Dispatcher::resetStream's comment on its call to runResetCallbacks. + // Envoy only deferredDeletes the ActiveStream if it was fully closed otherwise it issues a reset. + if (direct_stream_.local_closed_ && end_stream) { + direct_stream_.hcm_stream_pending_destroy_ = true; + } + // TODO: ***HACK*** currently Envoy sends local replies in cases where an error ought to be // surfaced via the error path. There are ways we can clean up Envoy's local reply path to // make this possible, but nothing expedient. For the immediate term this is our only real @@ -95,7 +102,19 @@ void Dispatcher::DirectStreamCallbacks::encodeHeaders(const HeaderMap& headers, void Dispatcher::DirectStreamCallbacks::encodeData(Buffer::Instance& data, bool end_stream) { ENVOY_LOG(debug, "[S{}] response data for stream (length={} end_stream={})", direct_stream_.stream_handle_, data.length(), end_stream); + + // @see Dispatcher::resetStream's comment on its call to runResetCallbacks. + // Envoy only deferredDeletes the ActiveStream if it was fully closed otherwise it issues a reset. + if (direct_stream_.local_closed_ && end_stream) { + direct_stream_.hcm_stream_pending_destroy_ = true; + } + if (!error_code_.has_value()) { + // Testing hook. + if (end_stream) { + http_dispatcher_.synchronizer_.syncPoint("dispatch_encode_final_data"); + } + // @see Dispatcher::DirectStream::dispatch_lock_ for why this lock is necessary. Thread::BasicLockable* mutex = end_stream ? nullptr : &direct_stream_.dispatch_lock_; Thread::OptionalReleasableLockGuard lock(mutex); @@ -125,6 +144,13 @@ void Dispatcher::DirectStreamCallbacks::encodeData(Buffer::Instance& data, bool void Dispatcher::DirectStreamCallbacks::encodeTrailers(const HeaderMap& trailers) { ENVOY_LOG(debug, "[S{}] response trailers for stream:\n{}", direct_stream_.stream_handle_, trailers); + + // @see Dispatcher::resetStream's comment on its call to runResetCallbacks. + // Envoy only deferredDeletes the ActiveStream if it was fully closed otherwise it issues a reset. + if (direct_stream_.local_closed_) { + direct_stream_.hcm_stream_pending_destroy_ = true; + } + if (direct_stream_.dispatchable(true)) { ENVOY_LOG(debug, "[S{}] dispatching to platform response trailers for stream:\n{}", direct_stream_.stream_handle_, trailers); @@ -200,7 +226,13 @@ void Dispatcher::DirectStream::resetStream(StreamResetReason reason) { // resetStream on the response_encoder_'s Stream. It is up to the response_encoder_ to // runResetCallbacks in order to have the Http::ConnectionManager call doDeferredStreamDestroy in // ConnectionManagerImpl::ActiveStream::onResetStream. - runResetCallbacks(reason); + // + // @see Dispatcher::resetStream's comment on its call to runResetCallbacks for an explanation if + // this if guard. + if (!hcm_stream_pending_destroy_) { + hcm_stream_pending_destroy_ = true; + runResetCallbacks(reason); + } callbacks_->onReset(); } @@ -378,7 +410,22 @@ envoy_status_t Dispatcher::resetStream(envoy_stream_t stream) { // // StreamResetReason::RemoteReset is used as the platform code that issues the // cancellation is considered the remote. - direct_stream->runResetCallbacks(StreamResetReason::RemoteReset); + // + // This call is guarded by hcm_stream_pending_destroy_ to protect against the + // following race condition: + // 1. resetStream executes first on a platform thread, getting through the dispatch + // guard and posting this lambda. + // 2. The event dispatcher's thread executes a terminal encoding or a reset in the + // Http::ConnectionManager, thus calling deferredDelete on the ActiveStream. + // 3. The event dispatcher's thread executes this post body, thus calling + // runResetCallbacks, which ends up calling deferredDelete (for a second time!) on the + // ActiveStream. + // This protection makes sure that Envoy Mobile's Http::Dispatcher::DirectStream knows + // synchronously when the ActiveStream is deferredDelete'd for the first time. + if (!direct_stream->hcm_stream_pending_destroy_) { + direct_stream->hcm_stream_pending_destroy_ = true; + direct_stream->runResetCallbacks(StreamResetReason::RemoteReset); + } cleanup(direct_stream->stream_handle_); } }); diff --git a/mobile/library/common/http/dispatcher.h b/mobile/library/common/http/dispatcher.h index 01d3cb1b2ccd..424c6d68e492 100644 --- a/mobile/library/common/http/dispatcher.h +++ b/mobile/library/common/http/dispatcher.h @@ -179,6 +179,7 @@ class Dispatcher : public Logger::Loggable { Thread::MutexBasicLockable dispatch_lock_; std::atomic closed_{}; bool local_closed_{}; + bool hcm_stream_pending_destroy_{}; // Used to issue outgoing HTTP stream operations. RequestDecoder* request_decoder_; diff --git a/mobile/test/integration/dispatcher_integration_test.cc b/mobile/test/integration/dispatcher_integration_test.cc index ff79f61affe0..8ec13da3e2b1 100644 --- a/mobile/test/integration/dispatcher_integration_test.cc +++ b/mobile/test/integration/dispatcher_integration_test.cc @@ -142,7 +142,6 @@ TEST_P(DispatcherIntegrationTest, Basic) { envoy_headers c_headers = Http::Utility::toBridgeHeaders(headers); // Create a stream. - Event::PostCb start_stream_post_cb; EXPECT_EQ(http_dispatcher_.startStream(stream, bridge_callbacks, {}), ENVOY_SUCCESS); http_dispatcher_.sendHeaders(stream, c_headers, true); @@ -153,5 +152,69 @@ TEST_P(DispatcherIntegrationTest, Basic) { ASSERT_EQ(cc.on_complete_calls, 1); } +TEST_P(DispatcherIntegrationTest, RaceDoesNotCauseDoubleDeletion) { + ConditionalInitializer ready_ran; + test_server_->server().dispatcher().post([this, &ready_ran]() -> void { + http_dispatcher_.ready( + test_server_->server().dispatcher(), + test_server_->server().listenerManager().apiListener()->get().http()->get()); + ready_ran.setReady(); + }); + ready_ran.waitReady(); + + http_dispatcher_.synchronizer().enable(); + + envoy_stream_t stream = 1; + // Setup bridge_callbacks to handle the response. + envoy_http_callbacks bridge_callbacks; + callbacks_called cc = {0, 0, 0, 0, 0, nullptr}; + bridge_callbacks.context = &cc; + bridge_callbacks.on_headers = [](envoy_headers c_headers, bool end_stream, + void* context) -> void { + ASSERT_FALSE(end_stream); + Http::HeaderMapPtr response_headers = Http::Utility::toInternalHeaders(c_headers); + EXPECT_EQ(response_headers->Status()->value().getStringView(), "200"); + callbacks_called* cc = static_cast(context); + cc->on_headers_calls++; + }; + bridge_callbacks.on_data = [](envoy_data c_data, bool end_stream, void* context) -> void { + if (end_stream) { + ASSERT_EQ(Http::Utility::convertToString(c_data), ""); + } else { + ASSERT_EQ(c_data.length, 10); + } + callbacks_called* cc = static_cast(context); + cc->on_data_calls++; + c_data.release(c_data.context); + }; + bridge_callbacks.on_complete = [](void* context) -> void { + callbacks_called* cc = static_cast(context); + cc->on_complete_calls++; + }; + bridge_callbacks.on_cancel = [](void* context) -> void { + callbacks_called* cc = static_cast(context); + cc->on_cancel_calls++; + }; + + // Build a set of request headers. + Http::TestHeaderMapImpl headers; + HttpTestUtility::addDefaultHeaders(headers); + envoy_headers c_headers = Http::Utility::toBridgeHeaders(headers); + + // Create a stream. + EXPECT_EQ(http_dispatcher_.startStream(stream, bridge_callbacks, {}), ENVOY_SUCCESS); + http_dispatcher_.synchronizer().waitOn("dispatch_encode_final_data"); + http_dispatcher_.sendHeaders(stream, c_headers, true); + http_dispatcher_.synchronizer().barrierOn("dispatch_encode_final_data"); + ASSERT_EQ(cc.on_headers_calls, 1); + ASSERT_EQ(cc.on_data_calls, 1); + ASSERT_EQ(cc.on_complete_calls, 0); + + ASSERT_EQ(http_dispatcher_.resetStream(stream), ENVOY_SUCCESS); + ASSERT_EQ(cc.on_cancel_calls, 1); + + http_dispatcher_.synchronizer().signal("dispatch_encode_final_data"); +} + } // namespace } // namespace Envoy