Skip to content

Commit

Permalink
ext_proc filter crash when sending trailers when request has no body (e…
Browse files Browse the repository at this point in the history
…nvoyproxy#27430)

* ext_proc filter crash when sending trailers without sending body.

Signed-off-by: Yanjun Xiang <[email protected]>
Signed-off-by: Ryan Eskin <[email protected]>
  • Loading branch information
yanjunxiang-google authored and reskin89 committed Jul 11, 2023
1 parent aff50ea commit 6f667da
Show file tree
Hide file tree
Showing 4 changed files with 102 additions and 3 deletions.
21 changes: 21 additions & 0 deletions source/extensions/filters/http/ext_proc/ext_proc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,16 @@ FilterTrailersStatus Filter::onTrailers(ProcessorState& state, Http::HeaderMap&
}

if (!body_delivered && state.bodyMode() == ProcessingMode::BUFFERED) {
// If no gRPC stream yet, opens it before sending data.
switch (openStream()) {
case StreamOpenState::Error:
return FilterTrailersStatus::StopIteration;
case StreamOpenState::IgnoreError:
return FilterTrailersStatus::Continue;
case StreamOpenState::Ok:
// Fall through
break;
}
// We would like to process the body in a buffered way, but until now the complete
// body has not arrived. With the arrival of trailers, we now know that the body
// has arrived.
Expand Down Expand Up @@ -516,6 +526,17 @@ void Filter::sendBodyChunk(ProcessorState& state, const Buffer::Instance& data,
stats_.stream_msgs_sent_.inc();
}

void Filter::sendBufferedData(ProcessorState& state, ProcessorState::CallbackState new_state,
bool end_stream) {
if (state.hasBufferedData()) {
sendBodyChunk(state, *state.bufferedData(), new_state, end_stream);
} else {
// If there is no buffered data, sends an empty body.
Buffer::OwnedImpl data("");
sendBodyChunk(state, data, new_state, end_stream);
}
}

void Filter::sendTrailers(ProcessorState& state, const Http::HeaderMap& trailers) {
ProcessingRequest req;
auto* trailers_req = state.mutableTrailers(req);
Expand Down
5 changes: 2 additions & 3 deletions source/extensions/filters/http/ext_proc/ext_proc.h
Original file line number Diff line number Diff line change
Expand Up @@ -194,9 +194,8 @@ class Filter : public Logger::Loggable<Logger::Id::ext_proc>,
void onNewTimeout(const ProtobufWkt::Duration& override_message_timeout);

void sendBufferedData(ProcessorState& state, ProcessorState::CallbackState new_state,
bool end_stream) {
sendBodyChunk(state, *state.bufferedData(), new_state, end_stream);
}
bool end_stream);

void sendBodyChunk(ProcessorState& state, const Buffer::Instance& data,
ProcessorState::CallbackState new_state, bool end_stream);

Expand Down
45 changes: 45 additions & 0 deletions test/extensions/filters/http/ext_proc/ext_proc_integration_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,27 @@ class ExtProcIntegrationTest : public HttpIntegrationTest,
}
}

void processRequestTrailersMessage(
FakeUpstream& grpc_upstream, bool first_message,
absl::optional<std::function<bool(const HttpTrailers&, TrailersResponse&)>> cb) {
ProcessingRequest request;
if (first_message) {
ASSERT_TRUE(grpc_upstream.waitForHttpConnection(*dispatcher_, processor_connection_));
ASSERT_TRUE(processor_connection_->waitForNewStream(*dispatcher_, processor_stream_));
}
ASSERT_TRUE(processor_stream_->waitForGrpcMessage(*dispatcher_, request));
ASSERT_TRUE(request.has_request_trailers());
if (first_message) {
processor_stream_->startGrpcStream();
}
ProcessingResponse response;
auto* body = response.mutable_request_trailers();
const bool sendReply = !cb || (*cb)(request.request_trailers(), *body);
if (sendReply) {
processor_stream_->sendGrpcMessage(response);
}
}

void processResponseHeadersMessage(
FakeUpstream& grpc_upstream, bool first_message,
absl::optional<std::function<bool(const HttpHeaders&, HeadersResponse&)>> cb) {
Expand Down Expand Up @@ -1969,4 +1990,28 @@ TEST_P(ExtProcIntegrationTest, RequestMessageNewTimeoutOutOfBounds) {
newTimeoutWrongConfigTest(override_message_timeout);
}

// Set the ext_proc filter in SKIP header, BUFFERED body mode.
// Send a request with headers and trailers.
TEST_P(ExtProcIntegrationTest, SendHeaderAndTrailerInBufferedMode) {
proto_config_.mutable_processing_mode()->set_request_body_mode(ProcessingMode::BUFFERED);
proto_config_.mutable_processing_mode()->set_request_header_mode(ProcessingMode::SKIP);
proto_config_.mutable_processing_mode()->set_request_trailer_mode(ProcessingMode::SEND);
initializeConfig();
HttpIntegrationTest::initialize();

codec_client_ = makeHttpConnection(lookupPort("http"));
Http::TestRequestHeaderMapImpl headers;
HttpTestUtility::addDefaultHeaders(headers);
auto encoder_decoder = codec_client_->startRequest(headers);
request_encoder_ = &encoder_decoder.first;
IntegrationStreamDecoderPtr response = std::move(encoder_decoder.second);
Http::TestRequestTrailerMapImpl request_trailers{{"request", "trailer"}};
codec_client_->sendTrailers(*request_encoder_, request_trailers);
processRequestBodyMessage(*grpc_upstreams_[0], true, absl::nullopt);
processRequestTrailersMessage(*grpc_upstreams_[0], false, absl::nullopt);
handleUpstreamRequest();
processResponseHeadersMessage(*grpc_upstreams_[0], false, absl::nullopt);
verifyDownstreamResponse(*response, 200);
}

} // namespace Envoy

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 6f667da

Please sign in to comment.