Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

thrift proxy: fix crash when using payload_passthrough #14723

Merged
Merged
20 changes: 12 additions & 8 deletions source/extensions/filters/network/thrift_proxy/conn_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,7 @@ Network::FilterStatus ConnectionManager::onData(Buffer::Instance& data, bool end
// Downstream has closed. Unless we're waiting for an upstream connection to complete a oneway
// request, close. The special case for oneway requests allows them to complete before the
// ConnectionManager is destroyed.
if (stopped_) {
ASSERT(!rpcs_.empty());
if (stopped_ && !rpcs_.empty()) {
MessageMetadata& metadata = *(*rpcs_.begin())->metadata_;
ASSERT(metadata.hasMessageType());
if (metadata.messageType() == MessageType::Oneway) {
Expand Down Expand Up @@ -181,9 +180,11 @@ bool ConnectionManager::passthroughEnabled() const {
return false;
}

// This is called right after the metadata has been parsed, and the ActiveRpc being processed must
// be in the rpcs_ list.
ASSERT(!rpcs_.empty());
// If the rpcs list is empty, a local response happened.
if (rpcs_.empty()) {
return false;
}

return (*rpcs_.begin())->passthroughSupported();
}

Expand Down Expand Up @@ -303,6 +304,8 @@ void ConnectionManager::ActiveRpcDecoderFilter::continueDecoding() {
}

FilterStatus ConnectionManager::ActiveRpc::applyDecoderFilters(ActiveRpcDecoderFilter* filter) {
FilterStatus ret = FilterStatus::Continue;

ASSERT(filter_action_ != nullptr);

if (!local_response_sent_) {
Expand All @@ -323,8 +326,9 @@ FilterStatus ConnectionManager::ActiveRpc::applyDecoderFilters(ActiveRpcDecoderF
for (; entry != decoder_filters_.end(); entry++) {
const FilterStatus status = filter_action_((*entry)->handle_.get());
if (local_response_sent_) {
// The filter called sendLocalReply: stop processing filters and return
// FilterStatus::Continue irrespective of the current result.
// The filter called sendLocalReply: stop processing filters.
ret = FilterStatus::StopIteration;
finalizeRequest();
break;
}

Expand All @@ -337,7 +341,7 @@ FilterStatus ConnectionManager::ActiveRpc::applyDecoderFilters(ActiveRpcDecoderF
filter_action_ = nullptr;
filter_context_.reset();

return FilterStatus::Continue;
return ret;
}

FilterStatus ConnectionManager::ActiveRpc::transportBegin(MessageMetadataSharedPtr metadata) {
Expand Down