Skip to content

Commit

Permalink
http inspector:inline the recv in the onAccept (envoyproxy#8111)
Browse files Browse the repository at this point in the history
Description:

There is one behavior change. 
This PR watch Closed type event. `Closed` indicates a FIN is received on the OS supporting this event.
Upon `Closed` event http inspector would parse the last round. If the parser cannot determine the protocol is http this listener would give up since there is no more data from client.
Previous behavior: watch only READ event and hoping peek would return errno.

With this PR: 
1. A poll cycle is saved.
2. A client stream "ping"+FIN flag could pass the listener filter. Not sure this is very useful in production but it helps me debug with ping-pong client server. And this behavior is less surprising.

Risk Level: MID
Testing: UT
Docs Changes:
Release Notes:

Signed-off-by: Yuchen Dai <[email protected]>
  • Loading branch information
lambdai committed Sep 24, 2019
1 parent af9bbf6 commit ec4d791
Show file tree
Hide file tree
Showing 3 changed files with 173 additions and 43 deletions.
102 changes: 77 additions & 25 deletions source/extensions/filters/listener/http_inspector/http_inspector.cc
Original file line number Diff line number Diff line change
Expand Up @@ -41,45 +41,96 @@ Network::FilterStatus Filter::onAccept(Network::ListenerFilterCallbacks& cb) {
return Network::FilterStatus::Continue;
}

ASSERT(file_event_ == nullptr);

file_event_ = cb.dispatcher().createFileEvent(
socket.ioHandle().fd(),
[this](uint32_t events) {
ASSERT(events == Event::FileReadyType::Read);
onRead();
},
Event::FileTriggerType::Edge, Event::FileReadyType::Read);

cb_ = &cb;
return Network::FilterStatus::StopIteration;
const ParseState parse_state = onRead();
switch (parse_state) {
case ParseState::Error:
// As per discussion in https://github.com/envoyproxy/envoy/issues/7864
// we don't add new enum in FilterStatus so we have to signal the caller
// the new condition.
cb.socket().close();
return Network::FilterStatus::StopIteration;
case ParseState::Done:
return Network::FilterStatus::Continue;
case ParseState::Continue:
// do nothing but create the event
ASSERT(file_event_ == nullptr);
file_event_ = cb.dispatcher().createFileEvent(
socket.ioHandle().fd(),
[this](uint32_t events) {
ENVOY_LOG(trace, "http inspector event: {}", events);
// inspector is always peeking and can never determine EOF.
// Use this event type to avoid listener timeout on the OS supporting
// FileReadyType::Closed.
bool end_stream = events & Event::FileReadyType::Closed;

const ParseState parse_state = onRead();
switch (parse_state) {
case ParseState::Error:
file_event_.reset();
cb_->continueFilterChain(false);
break;
case ParseState::Done:
file_event_.reset();
// Do not skip following listener filters.
cb_->continueFilterChain(true);
break;
case ParseState::Continue:
if (end_stream) {
// Parser fails to determine http but the end of stream is reached. Fallback to
// non-http.
done(false);
file_event_.reset();
cb_->continueFilterChain(true);
}
// do nothing but wait for the next event
break;
}
},
Event::FileTriggerType::Edge, Event::FileReadyType::Read | Event::FileReadyType::Closed);
return Network::FilterStatus::StopIteration;
}
NOT_REACHED_GCOVR_EXCL_LINE
}

void Filter::onRead() {
ParseState Filter::onRead() {
auto& os_syscalls = Api::OsSysCallsSingleton::get();
const Network::ConnectionSocket& socket = cb_->socket();
const Api::SysCallSizeResult result =
os_syscalls.recv(socket.ioHandle().fd(), buf_, Config::MAX_INSPECT_SIZE, MSG_PEEK);
ENVOY_LOG(trace, "http inspector: recv: {}", result.rc_);
if (result.rc_ == -1 && result.errno_ == EAGAIN) {
return;
return ParseState::Continue;
} else if (result.rc_ < 0) {
config_->stats().read_error_.inc();
return done(false);
return ParseState::Error;
}

parseHttpHeader(absl::string_view(reinterpret_cast<const char*>(buf_), result.rc_));
const auto parse_state =
parseHttpHeader(absl::string_view(reinterpret_cast<const char*>(buf_), result.rc_));
switch (parse_state) {
case ParseState::Continue:
// do nothing but wait for the next event
return ParseState::Continue;
case ParseState::Error:
done(false);
return ParseState::Done;
case ParseState::Done:
done(true);
return ParseState::Done;
}
NOT_REACHED_GCOVR_EXCL_LINE
}

void Filter::parseHttpHeader(absl::string_view data) {
ParseState Filter::parseHttpHeader(absl::string_view data) {
const size_t len = std::min(data.length(), Filter::HTTP2_CONNECTION_PREFACE.length());
if (Filter::HTTP2_CONNECTION_PREFACE.compare(0, len, data, 0, len) == 0) {
if (data.length() < Filter::HTTP2_CONNECTION_PREFACE.length()) {
return;
return ParseState::Continue;
}
ENVOY_LOG(trace, "http inspector: http2 connection preface found");
protocol_ = "HTTP/2";
done(true);
return ParseState::Done;
} else {
const size_t pos = data.find_first_of("\r\n");
if (pos != absl::string_view::npos) {
Expand All @@ -90,20 +141,25 @@ void Filter::parseHttpHeader(absl::string_view data) {
// Method SP Request-URI SP HTTP-Version
if (fields.size() != 3) {
ENVOY_LOG(trace, "http inspector: invalid http1x request line");
return done(false);
// done(false);
return ParseState::Error;
}

if (http1xMethods().count(fields[0]) == 0 || httpProtocols().count(fields[2]) == 0) {
ENVOY_LOG(trace, "http inspector: method: {} or protocol: {} not valid", fields[0],
fields[2]);
return done(false);
// done(false);
return ParseState::Error;
}

ENVOY_LOG(trace, "http inspector: method: {}, request uri: {}, protocol: {}", fields[0],
fields[1], fields[2]);

protocol_ = fields[2];
return done(true);
// done(true);
return ParseState::Done;
} else {
return ParseState::Continue;
}
}
}
Expand Down Expand Up @@ -131,10 +187,6 @@ void Filter::done(bool success) {
} else {
config_->stats().http_not_found_.inc();
}

file_event_.reset();
// Do not skip following listener filters.
cb_->continueFilterChain(true);
}

const absl::flat_hash_set<std::string>& Filter::httpProtocols() const {
Expand Down
13 changes: 11 additions & 2 deletions source/extensions/filters/listener/http_inspector/http_inspector.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,15 @@ struct HttpInspectorStats {
ALL_HTTP_INSPECTOR_STATS(GENERATE_COUNTER_STRUCT)
};

enum class ParseState {
// Parse result is out. It could be http family or empty.
Done,
// Parser expects more data.
Continue,
// Parser reports unrecoverable error.
Error
};

/**
* Global configuration for http inspector.
*/
Expand Down Expand Up @@ -62,9 +71,9 @@ class Filter : public Network::ListenerFilter, Logger::Loggable<Logger::Id::filt
private:
static const absl::string_view HTTP2_CONNECTION_PREFACE;

void onRead();
ParseState onRead();
void done(bool success);
void parseHttpHeader(absl::string_view data);
ParseState parseHttpHeader(absl::string_view data);

const absl::flat_hash_set<std::string>& httpProtocols() const;
const absl::flat_hash_set<std::string>& http1xMethods() const;
Expand Down
101 changes: 85 additions & 16 deletions test/extensions/filters/listener/http_inspector/http_inspector_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,19 +33,26 @@ class HttpInspectorTest : public testing::Test {
io_handle_(std::make_unique<Network::IoSocketHandleImpl>(42)) {}
~HttpInspectorTest() override { io_handle_->close(); }

void init() {
void init(bool include_inline_recv = true) {
filter_ = std::make_unique<Filter>(cfg_);

EXPECT_CALL(cb_, socket()).WillRepeatedly(ReturnRef(socket_));
EXPECT_CALL(socket_, detectedTransportProtocol()).WillRepeatedly(Return("raw_buffer"));
EXPECT_CALL(cb_, dispatcher()).WillRepeatedly(ReturnRef(dispatcher_));
EXPECT_CALL(testing::Const(socket_), ioHandle()).WillRepeatedly(ReturnRef(*io_handle_));

EXPECT_CALL(dispatcher_,
createFileEvent_(_, _, Event::FileTriggerType::Edge, Event::FileReadyType::Read))
.WillOnce(
DoAll(SaveArg<1>(&file_event_callback_), ReturnNew<NiceMock<Event::MockFileEvent>>()));
filter_->onAccept(cb_);
if (include_inline_recv) {
EXPECT_CALL(os_sys_calls_, recv(42, _, _, MSG_PEEK))
.WillOnce(Return(Api::SysCallSizeResult{static_cast<ssize_t>(0), 0}));

EXPECT_CALL(dispatcher_,
createFileEvent_(_, _, Event::FileTriggerType::Edge,
Event::FileReadyType::Read | Event::FileReadyType::Closed))
.WillOnce(DoAll(SaveArg<1>(&file_event_callback_),
ReturnNew<NiceMock<Event::MockFileEvent>>()));

filter_->onAccept(cb_);
}
}

NiceMock<Api::MockOsSysCalls> os_sys_calls_;
Expand All @@ -68,8 +75,67 @@ TEST_F(HttpInspectorTest, SkipHttpInspectForTLS) {
EXPECT_EQ(filter_->onAccept(cb_), Network::FilterStatus::Continue);
}

TEST_F(HttpInspectorTest, InlineReadIoError) {
init(/*include_inline_recv=*/false);
EXPECT_CALL(os_sys_calls_, recv(42, _, _, MSG_PEEK))
.WillOnce(Invoke([](int, void*, size_t, int) -> Api::SysCallSizeResult {
return Api::SysCallSizeResult{ssize_t(-1), 0};
}));
EXPECT_CALL(dispatcher_, createFileEvent_(_, _, _, _)).Times(0);
EXPECT_CALL(socket_, setRequestedApplicationProtocols(_)).Times(0);
EXPECT_CALL(socket_, close()).Times(1);
auto accepted = filter_->onAccept(cb_);
EXPECT_EQ(accepted, Network::FilterStatus::StopIteration);
// It's arguable if io error should bump the not_found counter
EXPECT_EQ(0, cfg_->stats().http_not_found_.value());
}

TEST_F(HttpInspectorTest, InlineReadInspectHttp10) {
init(/*include_inline_recv=*/false);
const absl::string_view header =
"GET /anything HTTP/1.0\r\nhost: google.com\r\nuser-agent: curl/7.64.0\r\naccept: "
"*/*\r\nx-forwarded-proto: http\r\nx-request-id: "
"a52df4a0-ed00-4a19-86a7-80e5049c6c84\r\nx-envoy-expected-rq-timeout-ms: "
"15000\r\ncontent-length: 0\r\n\r\n";
EXPECT_CALL(os_sys_calls_, recv(42, _, _, MSG_PEEK))
.WillOnce(Invoke([&header](int, void* buffer, size_t length, int) -> Api::SysCallSizeResult {
ASSERT(length >= header.size());
memcpy(buffer, header.data(), header.size());
return Api::SysCallSizeResult{ssize_t(header.size()), 0};
}));
const std::vector<absl::string_view> alpn_protos{absl::string_view("http/1.0")};

EXPECT_CALL(dispatcher_, createFileEvent_(_, _, _, _)).Times(0);

EXPECT_CALL(socket_, setRequestedApplicationProtocols(alpn_protos));
auto accepted = filter_->onAccept(cb_);
EXPECT_EQ(accepted, Network::FilterStatus::Continue);
EXPECT_EQ(1, cfg_->stats().http10_found_.value());
}

TEST_F(HttpInspectorTest, InlineReadParseError) {
init(/*include_inline_recv=*/false);
const absl::string_view header =
"NOT_A_LEGAL_PREFIX /anything HTTP/1.0\r\nhost: google.com\r\nuser-agent: "
"curl/7.64.0\r\naccept: "
"*/*\r\nx-forwarded-proto: http\r\nx-request-id: "
"a52df4a0-ed00-4a19-86a7-80e5049c6c84\r\nx-envoy-expected-rq-timeout-ms: "
"15000\r\ncontent-length: 0\r\n\r\n";
EXPECT_CALL(os_sys_calls_, recv(42, _, _, MSG_PEEK))
.WillOnce(Invoke([&header](int, void* buffer, size_t length, int) -> Api::SysCallSizeResult {
ASSERT(length >= header.size());
memcpy(buffer, header.data(), header.size());
return Api::SysCallSizeResult{ssize_t(header.size()), 0};
}));
EXPECT_CALL(dispatcher_, createFileEvent_(_, _, _, _)).Times(0);
EXPECT_CALL(socket_, setRequestedApplicationProtocols(_)).Times(0);
auto accepted = filter_->onAccept(cb_);
EXPECT_EQ(accepted, Network::FilterStatus::Continue);
EXPECT_EQ(1, cfg_->stats().http_not_found_.value());
}

TEST_F(HttpInspectorTest, InspectHttp10) {
init();
init(true);
const absl::string_view header =
"GET /anything HTTP/1.0\r\nhost: google.com\r\nuser-agent: curl/7.64.0\r\naccept: "
"*/*\r\nx-forwarded-proto: http\r\nx-request-id: "
Expand Down Expand Up @@ -232,7 +298,7 @@ TEST_F(HttpInspectorTest, ReadError) {
EXPECT_CALL(os_sys_calls_, recv(42, _, _, MSG_PEEK)).WillOnce(InvokeWithoutArgs([]() {
return Api::SysCallSizeResult{ssize_t(-1), ENOTSUP};
}));
EXPECT_CALL(cb_, continueFilterChain(true));
EXPECT_CALL(cb_, continueFilterChain(false));
file_event_callback_(Event::FileReadyType::Read);
EXPECT_EQ(1, cfg_->stats().read_error_.value());
}
Expand Down Expand Up @@ -386,22 +452,25 @@ TEST_F(HttpInspectorTest, MultipleReadsHttp1BadProtocol) {

init();

const absl::string_view data = "GET /index HTT\r";
const std::string valid_header = "GET /index HTTP/1.1\r";
// offset: 0 10
const std::string truncate_header = valid_header.substr(0, 14).append("\r");

{
InSequence s;

EXPECT_CALL(os_sys_calls_, recv(42, _, _, MSG_PEEK)).WillOnce(InvokeWithoutArgs([]() {
return Api::SysCallSizeResult{ssize_t(-1), EAGAIN};
}));

for (size_t i = 1; i <= data.length(); i++) {
for (size_t i = 1; i <= truncate_header.length(); i++) {
EXPECT_CALL(os_sys_calls_, recv(42, _, _, MSG_PEEK))
.WillOnce(
Invoke([&data, i](int, void* buffer, size_t length, int) -> Api::SysCallSizeResult {
ASSERT(length >= data.size());
memcpy(buffer, data.data(), data.size());
return Api::SysCallSizeResult{ssize_t(i), 0};
}));
.WillOnce(Invoke([&truncate_header, i](int, void* buffer, size_t length,
int) -> Api::SysCallSizeResult {
ASSERT(length >= truncate_header.size());
memcpy(buffer, truncate_header.data(), truncate_header.size());
return Api::SysCallSizeResult{ssize_t(i), 0};
}));
}
}

Expand Down

0 comments on commit ec4d791

Please sign in to comment.