Skip to content

Commit

Permalink
backport to 1.15: udp: properly handle truncated/dropped datagrams (#…
Browse files Browse the repository at this point in the history
…14122) (#14166)

Signed-off-by: Matt Klein <[email protected]>
Signed-off-by: Christoph Pakulski <[email protected]>
Co-authored-by: Matt Klein <[email protected]>
Co-authored-by: Christoph Pakulski <[email protected]>
  • Loading branch information
cpakulski and mattklein123 authored Dec 1, 2020
1 parent 4204341 commit 9a625be
Show file tree
Hide file tree
Showing 9 changed files with 147 additions and 48 deletions.
1 change: 1 addition & 0 deletions docs/root/version_history/current.rst
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,4 @@ Changes
* listener: fix crash when disabling or re-enabling listeners due to overload while processing LDS updates.
* proxy_proto: fixed a bug where network filters would not have the correct downstreamRemoteAddress() when accessed from the StreamInfo. This could result in incorrect enforcement of RBAC rules in the RBAC network filter (but not in the RBAC HTTP filter), or incorrect access log addresses from tcp_proxy.
* tls: fix read resumption after triggering buffer high-watermark and all remaining request/response bytes are stored in the SSL connection's internal buffers.
* udp: fixed issue in which receiving truncated UDP datagrams would cause Envoy to crash.
3 changes: 3 additions & 0 deletions include/envoy/network/io_handle.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,9 @@ class IoHandle {
Address::InstanceConstSharedPtr peer_address_;
// The payload length of this packet.
unsigned int msg_len_{0};
// If true, the syscall itself succeeded but the packet was dropped because it was truncated. We
// do not support receiving truncated packets.
bool truncated_and_dropped_{false};
};

/**
Expand Down
12 changes: 10 additions & 2 deletions source/common/api/win32/os_sys_calls_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -145,14 +145,22 @@ SysCallSizeResult OsSysCallsImpl::recv(os_fd_t socket, void* buffer, size_t leng
}

SysCallSizeResult OsSysCallsImpl::recvmsg(os_fd_t sockfd, msghdr* msg, int flags) {
DWORD bytes_received;
DWORD bytes_received = 0;
LPFN_WSARECVMSG recvmsg_fn_ptr = getFnPtrWSARecvMsg();
WSAMSGPtr wsa_msg = msghdrToWSAMSG(msg);
// Windows supports only a single flag on input to WSARecvMsg
wsa_msg->dwFlags = flags & MSG_PEEK;
const int rc = recvmsg_fn_ptr(sockfd, wsa_msg.get(), &bytes_received, nullptr, nullptr);
if (rc == SOCKET_ERROR) {
  // Read the error code first so that the truncation case can be told apart from real failures.
  const auto last_error = ::WSAGetLastError();
  // We try to match the UNIX behavior for truncated packets. In that case the return code is
  // the length of the allocated buffer and we get the value from `dwFlags`.
  if (last_error == WSAEMSGSIZE) {
    // `wsa_msg` is the WSAMSG built from `msg` above; copy its output flags back to the caller.
    msg->msg_flags = wsa_msg->dwFlags;
    return {bytes_received, 0};
  }

  // Any other failure is reported unchanged: rc is SOCKET_ERROR and errno is the WSA error.
  return {rc, last_error};
}
msg->msg_namelen = wsa_msg->namelen;
msg->msg_flags = wsa_msg->dwFlags;
Expand Down
50 changes: 37 additions & 13 deletions source/common/network/io_socket_handle_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,20 @@ using Envoy::Api::SysCallIntResult;
using Envoy::Api::SysCallSizeResult;

namespace Envoy {

namespace {
// Returns the flag value passed to recvmsg/recvmmsg to request truncation reporting.
constexpr int messageTruncatedOption() {
#if !defined(__APPLE__)
  return MSG_TRUNC;
#else
  // OSX does not support passing `MSG_TRUNC` to recvmsg and recvmmsg. This does not affect
  // functionality; the flag is primarily used for logging.
  return 0;
#endif
}

} // namespace

namespace Network {

IoSocketHandleImpl::~IoSocketHandleImpl() {
Expand Down Expand Up @@ -191,7 +205,7 @@ Address::InstanceConstSharedPtr maybeGetDstAddressFromHeader(const cmsghdr& cmsg
return getAddressFromSockAddrOrDie(ss, sizeof(sockaddr_in), fd);
}
return nullptr;
}
}

absl::optional<uint32_t> maybeGetPacketsDroppedFromHeader(
#ifdef SO_RXQ_OVFL
Expand All @@ -203,7 +217,7 @@ absl::optional<uint32_t> maybeGetPacketsDroppedFromHeader(
const cmsghdr&) {
#endif
return absl::nullopt;
}
}

Api::IoCallUint64Result IoSocketHandleImpl::recvmsg(Buffer::RawSlice* slices,
const uint64_t num_slice, uint32_t self_port,
Expand Down Expand Up @@ -235,10 +249,18 @@ Api::IoCallUint64Result IoSocketHandleImpl::recvmsg(Buffer::RawSlice* slices,
hdr.msg_flags = 0;
hdr.msg_control = cbuf.begin();
hdr.msg_controllen = cmsg_space_;
const Api::SysCallSizeResult result = Api::OsSysCallsSingleton::get().recvmsg(fd_, &hdr, 0);
Api::SysCallSizeResult result =
Api::OsSysCallsSingleton::get().recvmsg(fd_, &hdr, messageTruncatedOption());
if (result.rc_ < 0) {
return sysCallResultToIoCallResult(result);
}
if ((hdr.msg_flags & MSG_TRUNC) != 0) {
ENVOY_LOG_MISC(debug, "Dropping truncated UDP packet with size: {}.", result.rc_);
result.rc_ = 0;
(*output.dropped_packets_)++;
output.msg_[0].truncated_and_dropped_ = true;
return sysCallResultToIoCallResult(result);
}

RELEASE_ASSERT((hdr.msg_flags & MSG_CTRUNC) == 0,
fmt::format("Incorrectly set control message length: {}", hdr.msg_controllen));
Expand All @@ -261,7 +283,8 @@ Api::IoCallUint64Result IoSocketHandleImpl::recvmsg(Buffer::RawSlice* slices,
if (output.dropped_packets_ != nullptr) {
absl::optional<uint32_t> maybe_dropped = maybeGetPacketsDroppedFromHeader(*cmsg);
if (maybe_dropped) {
*output.dropped_packets_ = *maybe_dropped;
*output.dropped_packets_ += *maybe_dropped;
continue;
}
}
}
Expand Down Expand Up @@ -307,8 +330,9 @@ Api::IoCallUint64Result IoSocketHandleImpl::recvmmsg(RawSliceArrays& slices, uin
// Set MSG_WAITFORONE so that recvmmsg will not wait for
// |num_packets_per_mmsg_call| packets to arrive before returning when the
// socket is a blocking socket.
const Api::SysCallIntResult result = Api::OsSysCallsSingleton::get().recvmmsg(
fd_, mmsg_hdr.data(), num_packets_per_mmsg_call, MSG_TRUNC | MSG_WAITFORONE, nullptr);
const Api::SysCallIntResult result =
Api::OsSysCallsSingleton::get().recvmmsg(fd_, mmsg_hdr.data(), num_packets_per_mmsg_call,
messageTruncatedOption() | MSG_WAITFORONE, nullptr);

if (result.rc_ <= 0) {
return sysCallResultToIoCallResult(result);
Expand All @@ -317,18 +341,18 @@ Api::IoCallUint64Result IoSocketHandleImpl::recvmmsg(RawSliceArrays& slices, uin
int num_packets_read = result.rc_;

for (int i = 0; i < num_packets_read; ++i) {
if (mmsg_hdr[i].msg_len == 0) {
msghdr& hdr = mmsg_hdr[i].msg_hdr;
if ((hdr.msg_flags & MSG_TRUNC) != 0) {
ENVOY_LOG_MISC(debug, "Dropping truncated UDP packet with size: {}.", mmsg_hdr[i].msg_len);
(*output.dropped_packets_)++;
output.msg_[i].truncated_and_dropped_ = true;
continue;
}
msghdr& hdr = mmsg_hdr[i].msg_hdr;

RELEASE_ASSERT((hdr.msg_flags & MSG_CTRUNC) == 0,
fmt::format("Incorrectly set control message length: {}", hdr.msg_controllen));
RELEASE_ASSERT(hdr.msg_namelen > 0,
fmt::format("Unable to get remote address from recvmmsg() for fd: {}", fd_));
if ((hdr.msg_flags & MSG_TRUNC) != 0) {
ENVOY_LOG_MISC(warn, "Dropping truncated UDP packet with size: {}.", mmsg_hdr[i].msg_len);
continue;
}

output.msg_[i].msg_len_ = mmsg_hdr[i].msg_len;
// Get local and peer addresses for each packet.
Expand All @@ -354,7 +378,7 @@ Api::IoCallUint64Result IoSocketHandleImpl::recvmmsg(RawSliceArrays& slices, uin
for (cmsg = CMSG_FIRSTHDR(&hdr); cmsg != nullptr; cmsg = CMSG_NXTHDR(&hdr, cmsg)) {
absl::optional<uint32_t> maybe_dropped = maybeGetPacketsDroppedFromHeader(*cmsg);
if (maybe_dropped) {
*output.dropped_packets_ = *maybe_dropped;
*output.dropped_packets_ += *maybe_dropped;
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion source/common/network/udp_listener_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ class UdpListenerImpl : public BaseListenerImpl,
public:
UdpListenerImpl(Event::DispatcherImpl& dispatcher, SocketSharedPtr socket,
UdpListenerCallbacks& cb, TimeSource& time_source);

~UdpListenerImpl() override;
uint32_t packetsDropped() { return packets_dropped_; }

// Network::Listener Interface
void disable() override;
Expand Down
12 changes: 5 additions & 7 deletions source/common/network/utility.cc
Original file line number Diff line number Diff line change
Expand Up @@ -569,6 +569,10 @@ Api::IoCallUint64Result Utility::readFromSocket(IoHandle& handle,
uint64_t packets_read = result.rc_;
ENVOY_LOG_MISC(trace, "recvmmsg read {} packets", packets_read);
for (uint64_t i = 0; i < packets_read; ++i) {
if (output.msg_[i].truncated_and_dropped_) {
continue;
}

Buffer::RawSlice* slice = slices[i].data();
const uint64_t msg_len = output.msg_[i].msg_len_;
ASSERT(msg_len <= slice->len_);
Expand All @@ -589,7 +593,7 @@ Api::IoCallUint64Result Utility::readFromSocket(IoHandle& handle,
Api::IoCallUint64Result result =
handle.recvmsg(&slice, num_slices, local_address.ip()->port(), output);

if (!result.ok()) {
if (!result.ok() || output.msg_[0].truncated_and_dropped_) {
return result;
}

Expand All @@ -616,12 +620,6 @@ Api::IoErrorPtr Utility::readPacketsFromSocket(IoHandle& handle,
return std::move(result.err_);
}

if (result.rc_ == 0) {
// TODO(conqerAtapple): Is zero length packet interesting? If so add stats
// for it. Otherwise remove the warning log below.
ENVOY_LOG_MISC(trace, "received 0-length packet");
}

if (packets_dropped != old_packets_dropped) {
// The kernel tracks SO_RXQ_OVFL as a uint32 which can overflow to a smaller
// value. So as long as this count differs from previously recorded value,
Expand Down
87 changes: 75 additions & 12 deletions test/common/network/udp_listener_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,31 @@ namespace Envoy {
namespace Network {
namespace {

// `UdpGro` is only supported on Linux versions >= 5.0. Also, the
// underlying platform only performs the payload concatenation when
// packets are sent from a network namespace different to that of
// the client. Currently, the testing framework does not support
// this behavior.
// This helper allows tests to intercept syscalls and
// toggle their behavior as required by each individual test.
class OverrideOsSysCallsImpl : public Api::OsSysCallsImpl {
public:
MOCK_METHOD(bool, supportsUdpGro, (), (const));
MOCK_METHOD(bool, supportsMmsg, (), (const));
};

class UdpListenerImplTest : public ListenerImplTestBase {
public:
UdpListenerImplTest()
: server_socket_(createServerSocket(true)), send_to_addr_(getServerLoopbackAddress()) {
time_system_.advanceTimeWait(std::chrono::milliseconds(100));
}

void SetUp() override {
ON_CALL(override_syscall_, supportsUdpGro()).WillByDefault(Return(false));
// Return the real version by default.
ON_CALL(override_syscall_, supportsMmsg())
.WillByDefault(Return(os_calls.latched().supportsMmsg()));

// Set listening socket options.
server_socket_->addOptions(SocketOptionFactory::buildIpPacketInfoOptions());
server_socket_->addOptions(SocketOptionFactory::buildRxQueueOverFlowOptions());
Expand Down Expand Up @@ -70,7 +87,7 @@ class UdpListenerImplTest : public ListenerImplTestBase {
}

// Validates receive data, source/destination address and received time.
void validateRecvCallbackParams(const UdpRecvData& data) {
void validateRecvCallbackParams(const UdpRecvData& data, size_t num_packet_per_recv) {
ASSERT_NE(data.addresses_.local_, nullptr);

ASSERT_NE(data.addresses_.peer_, nullptr);
Expand All @@ -83,10 +100,6 @@ class UdpListenerImplTest : public ListenerImplTestBase {

EXPECT_EQ(*data.addresses_.local_, *send_to_addr_);

size_t num_packet_per_recv = 1u;
if (Api::OsSysCallsSingleton::get().supportsMmsg()) {
num_packet_per_recv = 16u;
}
EXPECT_EQ(time_system_.monotonicTime(),
data.receive_time_ +
std::chrono::milliseconds(
Expand All @@ -99,9 +112,11 @@ class UdpListenerImplTest : public ListenerImplTestBase {
SocketSharedPtr server_socket_;
Network::Test::UdpSyncPeer client_{GetParam()};
Address::InstanceConstSharedPtr send_to_addr_;
MockUdpListenerCallbacks listener_callbacks_;
NiceMock<MockUdpListenerCallbacks> listener_callbacks_;
std::unique_ptr<UdpListenerImpl> listener_;
size_t num_packets_received_by_listener_{0};
NiceMock<OverrideOsSysCallsImpl> override_syscall_;
TestThreadsafeSingletonInjector<Api::OsSysCallsImpl> os_calls{&override_syscall_};
};

INSTANTIATE_TEST_SUITE_P(IpVersions, UdpListenerImplTest,
Expand Down Expand Up @@ -144,12 +159,12 @@ TEST_P(UdpListenerImplTest, UseActualDstUdp) {
EXPECT_CALL(listener_callbacks_, onReadReady());
EXPECT_CALL(listener_callbacks_, onData(_))
.WillOnce(Invoke([&](const UdpRecvData& data) -> void {
validateRecvCallbackParams(data);
validateRecvCallbackParams(data, Api::OsSysCallsSingleton::get().supportsMmsg() ? 16u : 1u);

EXPECT_EQ(data.buffer_->toString(), first);
}))
.WillOnce(Invoke([&](const UdpRecvData& data) -> void {
validateRecvCallbackParams(data);
validateRecvCallbackParams(data, Api::OsSysCallsSingleton::get().supportsMmsg() ? 16u : 1u);

EXPECT_EQ(data.buffer_->toString(), second);

Expand All @@ -164,6 +179,54 @@ TEST_P(UdpListenerImplTest, UseActualDstUdp) {
dispatcher_->run(Event::Dispatcher::RunType::Block);
}

// Test a large datagram that gets dropped using recvmmsg if supported.
TEST_P(UdpListenerImplTest, LargeDatagramRecvmmsg) {
  // Three datagrams are sent back to back. The two 4096-byte ones are expected to be dropped;
  // only the short one in the middle should reach the listener callbacks.
  const std::string oversized_head(4096, 'a');
  const std::string deliverable("second");
  const std::string oversized_tail(4096, 'b');

  client_.write(oversized_head, *send_to_addr_);
  client_.write(deliverable, *send_to_addr_);
  client_.write(oversized_tail, *send_to_addr_);

  // Invoked for the single datagram that is delivered; validates it and stops the dispatcher.
  const auto on_datagram = [&](const UdpRecvData& data) -> void {
    const size_t packets_per_recv = Api::OsSysCallsSingleton::get().supportsMmsg() ? 16u : 1u;
    validateRecvCallbackParams(data, packets_per_recv);
    EXPECT_EQ(data.buffer_->toString(), deliverable);

    dispatcher_->exit();
  };

  EXPECT_CALL(listener_callbacks_, onReadReady());
  EXPECT_CALL(listener_callbacks_, onData(_)).WillOnce(Invoke(on_datagram));

  dispatcher_->run(Event::Dispatcher::RunType::Block);

  // Both oversized datagrams must have been counted as dropped.
  EXPECT_EQ(2, listener_->packetsDropped());
}

// Test a large datagram that gets dropped using recvmsg.
TEST_P(UdpListenerImplTest, LargeDatagramRecvmsg) {
  // Report multi-message receive as unsupported for this test.
  ON_CALL(override_syscall_, supportsMmsg()).WillByDefault(Return(false));

  // Three datagrams are sent back to back. The two 4096-byte ones are expected to be dropped;
  // only the short one in the middle should reach the listener callbacks.
  const std::string oversized_head(4096, 'a');
  const std::string deliverable("second");
  const std::string oversized_tail(4096, 'b');

  client_.write(oversized_head, *send_to_addr_);
  client_.write(deliverable, *send_to_addr_);
  client_.write(oversized_tail, *send_to_addr_);

  // Invoked for the single datagram that is delivered; validates it and stops the dispatcher.
  const auto on_datagram = [&](const UdpRecvData& data) -> void {
    const size_t packets_per_recv = Api::OsSysCallsSingleton::get().supportsMmsg() ? 16u : 1u;
    validateRecvCallbackParams(data, packets_per_recv);
    EXPECT_EQ(data.buffer_->toString(), deliverable);

    dispatcher_->exit();
  };

  EXPECT_CALL(listener_callbacks_, onReadReady());
  EXPECT_CALL(listener_callbacks_, onData(_)).WillOnce(Invoke(on_datagram));

  dispatcher_->run(Event::Dispatcher::RunType::Block);

  // Both oversized datagrams must have been counted as dropped.
  EXPECT_EQ(2, listener_->packetsDropped());
}

/**
* Tests UDP listener for read and write callbacks with actual data.
*/
Expand All @@ -185,7 +248,7 @@ TEST_P(UdpListenerImplTest, UdpEcho) {
EXPECT_CALL(listener_callbacks_, onReadReady());
EXPECT_CALL(listener_callbacks_, onData(_))
.WillOnce(Invoke([&](const UdpRecvData& data) -> void {
validateRecvCallbackParams(data);
validateRecvCallbackParams(data, Api::OsSysCallsSingleton::get().supportsMmsg() ? 16u : 1u);

test_peer_address = data.addresses_.peer_;

Expand All @@ -195,7 +258,7 @@ TEST_P(UdpListenerImplTest, UdpEcho) {
server_received_data.push_back(data_str);
}))
.WillRepeatedly(Invoke([&](const UdpRecvData& data) -> void {
validateRecvCallbackParams(data);
validateRecvCallbackParams(data, Api::OsSysCallsSingleton::get().supportsMmsg() ? 16u : 1u);

const std::string data_str = data.buffer_->toString();
EXPECT_EQ(data_str, client_data[num_packets_received_by_listener_ - 1]);
Expand Down Expand Up @@ -271,7 +334,7 @@ TEST_P(UdpListenerImplTest, UdpListenerEnableDisable) {
.Times(2)
.WillOnce(Return())
.WillOnce(Invoke([&](const UdpRecvData& data) -> void {
validateRecvCallbackParams(data);
validateRecvCallbackParams(data, Api::OsSysCallsSingleton::get().supportsMmsg() ? 16u : 1u);

EXPECT_EQ(data.buffer_->toString(), second);

Expand Down
Loading

0 comments on commit 9a625be

Please sign in to comment.