From 9a625be26b2a1f91693e0ce0fd0417a4bf0e4bed Mon Sep 17 00:00:00 2001 From: Christoph Pakulski Date: Tue, 1 Dec 2020 17:06:12 -0500 Subject: [PATCH] backport to 1.15: udp: properly handle truncated/dropped datagrams (#14122) (#14166) Signed-off-by: Matt Klein Signed-off-by: Christoph Pakulski Co-authored-by: Matt Klein Co-authored-by: Christoph Pakulski --- docs/root/version_history/current.rst | 1 + include/envoy/network/io_handle.h | 3 + source/common/api/win32/os_sys_calls_impl.cc | 12 ++- .../common/network/io_socket_handle_impl.cc | 50 ++++++++--- source/common/network/udp_listener_impl.h | 2 +- source/common/network/utility.cc | 12 ++- test/common/network/udp_listener_impl_test.cc | 87 ++++++++++++++++--- .../quiche/quic_io_handle_wrapper_test.cc | 27 +++--- .../threadsafe_singleton_injector.h | 1 + 9 files changed, 147 insertions(+), 48 deletions(-) diff --git a/docs/root/version_history/current.rst b/docs/root/version_history/current.rst index bd0c2db54bb3..84ab09df6d44 100644 --- a/docs/root/version_history/current.rst +++ b/docs/root/version_history/current.rst @@ -6,3 +6,4 @@ Changes * listener: fix crash when disabling or re-enabling listeners due to overload while processing LDS updates. * proxy_proto: fixed a bug where network filters would not have the correct downstreamRemoteAddress() when accessed from the StreamInfo. This could result in incorrect enforcement of RBAC rules in the RBAC network filter (but not in the RBAC HTTP filter), or incorrect access log addresses from tcp_proxy. * tls: fix read resumption after triggering buffer high-watermark and all remaining request/response bytes are stored in the SSL connection's internal buffers. +* udp: fixed issue in which receiving truncated UDP datagrams would cause Envoy to crash. diff --git a/include/envoy/network/io_handle.h b/include/envoy/network/io_handle.h index bd569c56179e..e989bfe4d01f 100644 --- a/include/envoy/network/io_handle.h +++ b/include/envoy/network/io_handle.h @@ -85,6 +85,9 @@ class IoHandle { Address::InstanceConstSharedPtr peer_address_; // The payload length of this packet. unsigned int msg_len_{0}; + // If true indicates a successful syscall, but the packet was dropped due to truncation. We do + // not support receiving truncated packets. + bool truncated_and_dropped_{false}; }; /** diff --git a/source/common/api/win32/os_sys_calls_impl.cc b/source/common/api/win32/os_sys_calls_impl.cc index 49a05b9fda2a..d66aa4dcee68 100644 --- a/source/common/api/win32/os_sys_calls_impl.cc +++ b/source/common/api/win32/os_sys_calls_impl.cc @@ -145,14 +145,22 @@ SysCallSizeResult OsSysCallsImpl::recv(os_fd_t socket, void* buffer, size_t leng } SysCallSizeResult OsSysCallsImpl::recvmsg(os_fd_t sockfd, msghdr* msg, int flags) { - DWORD bytes_received; + DWORD bytes_received = 0; LPFN_WSARECVMSG recvmsg_fn_ptr = getFnPtrWSARecvMsg(); WSAMSGPtr wsa_msg = msghdrToWSAMSG(msg); // Windows supports only a single flag on input to WSARecvMsg wsa_msg->dwFlags = flags & MSG_PEEK; const int rc = recvmsg_fn_ptr(sockfd, wsa_msg.get(), &bytes_received, nullptr, nullptr); if (rc == SOCKET_ERROR) { - return {-1, ::WSAGetLastError()}; + // We try to match the UNIX behavior for truncated packages. In that case the return code is + // the length of the allocated buffer and we get the value from `dwFlags`. + auto last_error = ::WSAGetLastError(); + if (last_error == WSAEMSGSIZE) { + msg->msg_flags = wsamsg.wsamsg_->dwFlags; + return {bytes_received, 0}; + } + + return {rc, last_error}; } msg->msg_namelen = wsa_msg->namelen; msg->msg_flags = wsa_msg->dwFlags; diff --git a/source/common/network/io_socket_handle_impl.cc b/source/common/network/io_socket_handle_impl.cc index bb30a4a3b204..9e84e165b4e8 100644 --- a/source/common/network/io_socket_handle_impl.cc +++ b/source/common/network/io_socket_handle_impl.cc @@ -13,6 +13,20 @@ using Envoy::Api::SysCallIntResult; using Envoy::Api::SysCallSizeResult; namespace Envoy { + +namespace { +constexpr int messageTruncatedOption() { +#if defined(__APPLE__) + // OSX does not support passing `MSG_TRUNC` to recvmsg and recvmmsg. This does not effect + // functionality and it primarily used for logging. + return 0; +#else + return MSG_TRUNC; +#endif +} + +} // namespace + namespace Network { IoSocketHandleImpl::~IoSocketHandleImpl() { @@ -191,7 +205,7 @@ Address::InstanceConstSharedPtr maybeGetDstAddressFromHeader(const cmsghdr& cmsg return getAddressFromSockAddrOrDie(ss, sizeof(sockaddr_in), fd); } return nullptr; -} +} // namespace Network absl::optional maybeGetPacketsDroppedFromHeader( #ifdef SO_RXQ_OVFL @@ -203,7 +217,7 @@ absl::optional maybeGetPacketsDroppedFromHeader( const cmsghdr&) { #endif return absl::nullopt; -} +} // namespace Envoy Api::IoCallUint64Result IoSocketHandleImpl::recvmsg(Buffer::RawSlice* slices, const uint64_t num_slice, uint32_t self_port, @@ -235,10 +249,18 @@ Api::IoCallUint64Result IoSocketHandleImpl::recvmsg(Buffer::RawSlice* slices, hdr.msg_flags = 0; hdr.msg_control = cbuf.begin(); hdr.msg_controllen = cmsg_space_; - const Api::SysCallSizeResult result = Api::OsSysCallsSingleton::get().recvmsg(fd_, &hdr, 0); + Api::SysCallSizeResult result = + Api::OsSysCallsSingleton::get().recvmsg(fd_, &hdr, messageTruncatedOption()); if (result.rc_ < 0) { return sysCallResultToIoCallResult(result); } + if ((hdr.msg_flags & MSG_TRUNC) != 0) { + ENVOY_LOG_MISC(debug, "Dropping truncated UDP packet with size: {}.", result.rc_); + result.rc_ = 0; + (*output.dropped_packets_)++; + output.msg_[0].truncated_and_dropped_ = true; + return sysCallResultToIoCallResult(result); + } RELEASE_ASSERT((hdr.msg_flags & MSG_CTRUNC) == 0, fmt::format("Incorrectly set control message length: {}", hdr.msg_controllen)); @@ -261,7 +283,8 @@ Api::IoCallUint64Result IoSocketHandleImpl::recvmsg(Buffer::RawSlice* slices, if (output.dropped_packets_ != nullptr) { absl::optional maybe_dropped = maybeGetPacketsDroppedFromHeader(*cmsg); if (maybe_dropped) { - *output.dropped_packets_ = *maybe_dropped; + *output.dropped_packets_ += *maybe_dropped; + continue; } } } @@ -307,8 +330,9 @@ Api::IoCallUint64Result IoSocketHandleImpl::recvmmsg(RawSliceArrays& slices, uin // Set MSG_WAITFORONE so that recvmmsg will not waiting for // |num_packets_per_mmsg_call| packets to arrive before returning when the // socket is a blocking socket. - const Api::SysCallIntResult result = Api::OsSysCallsSingleton::get().recvmmsg( - fd_, mmsg_hdr.data(), num_packets_per_mmsg_call, MSG_TRUNC | MSG_WAITFORONE, nullptr); + const Api::SysCallIntResult result = + Api::OsSysCallsSingleton::get().recvmmsg(fd_, mmsg_hdr.data(), num_packets_per_mmsg_call, + messageTruncatedOption() | MSG_WAITFORONE, nullptr); if (result.rc_ <= 0) { return sysCallResultToIoCallResult(result); @@ -317,18 +341,18 @@ Api::IoCallUint64Result IoSocketHandleImpl::recvmmsg(RawSliceArrays& slices, uin int num_packets_read = result.rc_; for (int i = 0; i < num_packets_read; ++i) { - if (mmsg_hdr[i].msg_len == 0) { + msghdr& hdr = mmsg_hdr[i].msg_hdr; + if ((hdr.msg_flags & MSG_TRUNC) != 0) { + ENVOY_LOG_MISC(debug, "Dropping truncated UDP packet with size: {}.", mmsg_hdr[i].msg_len); + (*output.dropped_packets_)++; + output.msg_[i].truncated_and_dropped_ = true; continue; } - msghdr& hdr = mmsg_hdr[i].msg_hdr; + RELEASE_ASSERT((hdr.msg_flags & MSG_CTRUNC) == 0, fmt::format("Incorrectly set control message length: {}", hdr.msg_controllen)); RELEASE_ASSERT(hdr.msg_namelen > 0, fmt::format("Unable to get remote address from recvmmsg() for fd: {}", fd_)); - if ((hdr.msg_flags & MSG_TRUNC) != 0) { - ENVOY_LOG_MISC(warn, "Dropping truncated UDP packet with size: {}.", mmsg_hdr[i].msg_len); - continue; - } output.msg_[i].msg_len_ = mmsg_hdr[i].msg_len; // Get local and peer addresses for each packet. @@ -354,7 +378,7 @@ Api::IoCallUint64Result IoSocketHandleImpl::recvmmsg(RawSliceArrays& slices, uin for (cmsg = CMSG_FIRSTHDR(&hdr); cmsg != nullptr; cmsg = CMSG_NXTHDR(&hdr, cmsg)) { absl::optional maybe_dropped = maybeGetPacketsDroppedFromHeader(*cmsg); if (maybe_dropped) { - *output.dropped_packets_ = *maybe_dropped; + *output.dropped_packets_ += *maybe_dropped; } } } diff --git a/source/common/network/udp_listener_impl.h b/source/common/network/udp_listener_impl.h index 2184b4419c10..8c0f6a4e1735 100644 --- a/source/common/network/udp_listener_impl.h +++ b/source/common/network/udp_listener_impl.h @@ -24,8 +24,8 @@ class UdpListenerImpl : public BaseListenerImpl, public: UdpListenerImpl(Event::DispatcherImpl& dispatcher, SocketSharedPtr socket, UdpListenerCallbacks& cb, TimeSource& time_source); - ~UdpListenerImpl() override; + uint32_t packetsDropped() { return packets_dropped_; } // Network::Listener Interface void disable() override; diff --git a/source/common/network/utility.cc b/source/common/network/utility.cc index 2cdb3e614de4..20e94b52e142 100644 --- a/source/common/network/utility.cc +++ b/source/common/network/utility.cc @@ -569,6 +569,10 @@ Api::IoCallUint64Result Utility::readFromSocket(IoHandle& handle, uint64_t packets_read = result.rc_; ENVOY_LOG_MISC(trace, "recvmmsg read {} packets", packets_read); for (uint64_t i = 0; i < packets_read; ++i) { + if (output.msg_[i].truncated_and_dropped_) { + continue; + } + Buffer::RawSlice* slice = slices[i].data(); const uint64_t msg_len = output.msg_[i].msg_len_; ASSERT(msg_len <= slice->len_); @@ -589,7 +593,7 @@ Api::IoCallUint64Result Utility::readFromSocket(IoHandle& handle, Api::IoCallUint64Result result = handle.recvmsg(&slice, num_slices, local_address.ip()->port(), output); - if (!result.ok()) { + if (!result.ok() || output.msg_[0].truncated_and_dropped_) { return result; } @@ -616,12 +620,6 @@ Api::IoErrorPtr Utility::readPacketsFromSocket(IoHandle& handle, return std::move(result.err_); } - if (result.rc_ == 0) { - // TODO(conqerAtapple): Is zero length packet interesting? If so add stats - // for it. Otherwise remove the warning log below. - ENVOY_LOG_MISC(trace, "received 0-length packet"); - } - if (packets_dropped != old_packets_dropped) { // The kernel tracks SO_RXQ_OVFL as a uint32 which can overflow to a smaller // value. So as long as this count differs from previously recorded value, diff --git a/test/common/network/udp_listener_impl_test.cc b/test/common/network/udp_listener_impl_test.cc index 0d139b887e43..e67e762ca30f 100644 --- a/test/common/network/udp_listener_impl_test.cc +++ b/test/common/network/udp_listener_impl_test.cc @@ -32,14 +32,31 @@ namespace Envoy { namespace Network { namespace { +// `UdpGro` is only supported on Linux versions >= 5.0. Also, the +// underlying platform only performs the payload concatenation when +// packets are sent from a network namespace different to that of +// the client. Currently, the testing framework does not support +// this behavior. +// This helper allows to intercept syscalls and +// toggle the behavior as per individual test requirements. +class OverrideOsSysCallsImpl : public Api::OsSysCallsImpl { +public: + MOCK_METHOD(bool, supportsUdpGro, (), (const)); + MOCK_METHOD(bool, supportsMmsg, (), (const)); +}; + class UdpListenerImplTest : public ListenerImplTestBase { public: UdpListenerImplTest() : server_socket_(createServerSocket(true)), send_to_addr_(getServerLoopbackAddress()) { time_system_.advanceTimeWait(std::chrono::milliseconds(100)); } - void SetUp() override { + ON_CALL(override_syscall_, supportsUdpGro()).WillByDefault(Return(false)); + // Return the real version by default. + ON_CALL(override_syscall_, supportsMmsg()) + .WillByDefault(Return(os_calls.latched().supportsMmsg())); + // Set listening socket options. server_socket_->addOptions(SocketOptionFactory::buildIpPacketInfoOptions()); server_socket_->addOptions(SocketOptionFactory::buildRxQueueOverFlowOptions()); @@ -70,7 +87,7 @@ class UdpListenerImplTest : public ListenerImplTestBase { } // Validates receive data, source/destination address and received time. - void validateRecvCallbackParams(const UdpRecvData& data) { + void validateRecvCallbackParams(const UdpRecvData& data, size_t num_packet_per_recv) { ASSERT_NE(data.addresses_.local_, nullptr); ASSERT_NE(data.addresses_.peer_, nullptr); @@ -83,10 +100,6 @@ class UdpListenerImplTest : public ListenerImplTestBase { EXPECT_EQ(*data.addresses_.local_, *send_to_addr_); - size_t num_packet_per_recv = 1u; - if (Api::OsSysCallsSingleton::get().supportsMmsg()) { - num_packet_per_recv = 16u; - } EXPECT_EQ(time_system_.monotonicTime(), data.receive_time_ + std::chrono::milliseconds( @@ -99,9 +112,11 @@ class UdpListenerImplTest : public ListenerImplTestBase { SocketSharedPtr server_socket_; Network::Test::UdpSyncPeer client_{GetParam()}; Address::InstanceConstSharedPtr send_to_addr_; - MockUdpListenerCallbacks listener_callbacks_; + NiceMock listener_callbacks_; std::unique_ptr listener_; size_t num_packets_received_by_listener_{0}; + NiceMock override_syscall_; + TestThreadsafeSingletonInjector os_calls{&override_syscall_}; }; INSTANTIATE_TEST_SUITE_P(IpVersions, UdpListenerImplTest, @@ -144,12 +159,12 @@ TEST_P(UdpListenerImplTest, UseActualDstUdp) { EXPECT_CALL(listener_callbacks_, onReadReady()); EXPECT_CALL(listener_callbacks_, onData(_)) .WillOnce(Invoke([&](const UdpRecvData& data) -> void { - validateRecvCallbackParams(data); + validateRecvCallbackParams(data, Api::OsSysCallsSingleton::get().supportsMmsg() ? 16u : 1u); EXPECT_EQ(data.buffer_->toString(), first); })) .WillOnce(Invoke([&](const UdpRecvData& data) -> void { - validateRecvCallbackParams(data); + validateRecvCallbackParams(data, Api::OsSysCallsSingleton::get().supportsMmsg() ? 16u : 1u); EXPECT_EQ(data.buffer_->toString(), second); @@ -164,6 +179,54 @@ TEST_P(UdpListenerImplTest, UseActualDstUdp) { dispatcher_->run(Event::Dispatcher::RunType::Block); } +// Test a large datagram that gets dropped using recvmmsg if supported. +TEST_P(UdpListenerImplTest, LargeDatagramRecvmmsg) { + // This will get dropped. + const std::string first(4096, 'a'); + client_.write(first, *send_to_addr_); + const std::string second("second"); + client_.write(second, *send_to_addr_); + // This will get dropped. + const std::string third(4096, 'b'); + client_.write(third, *send_to_addr_); + + EXPECT_CALL(listener_callbacks_, onReadReady()); + EXPECT_CALL(listener_callbacks_, onData(_)).WillOnce(Invoke([&](const UdpRecvData& data) -> void { + validateRecvCallbackParams(data, Api::OsSysCallsSingleton::get().supportsMmsg() ? 16u : 1u); + EXPECT_EQ(data.buffer_->toString(), second); + + dispatcher_->exit(); + })); + + dispatcher_->run(Event::Dispatcher::RunType::Block); + EXPECT_EQ(2, listener_->packetsDropped()); +} + +// Test a large datagram that gets dropped using recvmsg. +TEST_P(UdpListenerImplTest, LargeDatagramRecvmsg) { + ON_CALL(override_syscall_, supportsMmsg()).WillByDefault(Return(false)); + + // This will get dropped. + const std::string first(4096, 'a'); + client_.write(first, *send_to_addr_); + const std::string second("second"); + client_.write(second, *send_to_addr_); + // This will get dropped. + const std::string third(4096, 'b'); + client_.write(third, *send_to_addr_); + + EXPECT_CALL(listener_callbacks_, onReadReady()); + EXPECT_CALL(listener_callbacks_, onData(_)).WillOnce(Invoke([&](const UdpRecvData& data) -> void { + validateRecvCallbackParams(data, Api::OsSysCallsSingleton::get().supportsMmsg() ? 16u : 1u); + EXPECT_EQ(data.buffer_->toString(), second); + + dispatcher_->exit(); + })); + + dispatcher_->run(Event::Dispatcher::RunType::Block); + EXPECT_EQ(2, listener_->packetsDropped()); +} + /** * Tests UDP listener for read and write callbacks with actual data. */ @@ -185,7 +248,7 @@ TEST_P(UdpListenerImplTest, UdpEcho) { EXPECT_CALL(listener_callbacks_, onReadReady()); EXPECT_CALL(listener_callbacks_, onData(_)) .WillOnce(Invoke([&](const UdpRecvData& data) -> void { - validateRecvCallbackParams(data); + validateRecvCallbackParams(data, Api::OsSysCallsSingleton::get().supportsMmsg() ? 16u : 1u); test_peer_address = data.addresses_.peer_; @@ -195,7 +258,7 @@ TEST_P(UdpListenerImplTest, UdpEcho) { server_received_data.push_back(data_str); })) .WillRepeatedly(Invoke([&](const UdpRecvData& data) -> void { - validateRecvCallbackParams(data); + validateRecvCallbackParams(data, Api::OsSysCallsSingleton::get().supportsMmsg() ? 16u : 1u); const std::string data_str = data.buffer_->toString(); EXPECT_EQ(data_str, client_data[num_packets_received_by_listener_ - 1]); @@ -271,7 +334,7 @@ TEST_P(UdpListenerImplTest, UdpListenerEnableDisable) { .Times(2) .WillOnce(Return()) .WillOnce(Invoke([&](const UdpRecvData& data) -> void { - validateRecvCallbackParams(data); + validateRecvCallbackParams(data, Api::OsSysCallsSingleton::get().supportsMmsg() ? 16u : 1u); EXPECT_EQ(data.buffer_->toString(), second); diff --git a/test/extensions/quic_listeners/quiche/quic_io_handle_wrapper_test.cc b/test/extensions/quic_listeners/quiche/quic_io_handle_wrapper_test.cc index 4357d75edeae..3a45d92fa24f 100644 --- a/test/extensions/quic_listeners/quiche/quic_io_handle_wrapper_test.cc +++ b/test/extensions/quic_listeners/quiche/quic_io_handle_wrapper_test.cc @@ -78,18 +78,19 @@ TEST_F(QuicIoHandleWrapperTest, DelegateIoHandleCalls) { addr = wrapper_->peerAddress(); Network::IoHandle::RecvMsgOutput output(1, nullptr); - EXPECT_CALL(os_sys_calls_, recvmsg(fd, _, 0)).WillOnce(Invoke([](os_fd_t, msghdr* msg, int) { - sockaddr_storage ss; - auto ipv6_addr = reinterpret_cast(&ss); - memset(ipv6_addr, 0, sizeof(sockaddr_in6)); - ipv6_addr->sin6_family = AF_INET6; - ipv6_addr->sin6_addr = in6addr_loopback; - ipv6_addr->sin6_port = htons(54321); - *reinterpret_cast(msg->msg_name) = *ipv6_addr; - msg->msg_namelen = sizeof(sockaddr_in6); - msg->msg_controllen = 0; - return Api::SysCallSizeResult{5u, 0}; - })); + EXPECT_CALL(os_sys_calls_, recvmsg(fd, _, MSG_TRUNC)) + .WillOnce(Invoke([](os_fd_t, msghdr* msg, int) { + sockaddr_storage ss; + auto ipv6_addr = reinterpret_cast(&ss); + memset(ipv6_addr, 0, sizeof(sockaddr_in6)); + ipv6_addr->sin6_family = AF_INET6; + ipv6_addr->sin6_addr = in6addr_loopback; + ipv6_addr->sin6_port = htons(54321); + *reinterpret_cast(msg->msg_name) = *ipv6_addr; + msg->msg_namelen = sizeof(sockaddr_in6); + msg->msg_controllen = 0; + return Api::SysCallSizeResult{5u, 0}; + })); wrapper_->recvmsg(&slice, 1, /*self_port=*/12345, output); size_t num_packet_per_call = 1u; @@ -98,7 +99,7 @@ TEST_F(QuicIoHandleWrapperTest, DelegateIoHandleCalls) { absl::FixedArray({Buffer::RawSlice{data, 5}})); EXPECT_CALL(os_sys_calls_, recvmmsg(fd, _, num_packet_per_call, _, nullptr)) .WillOnce(Invoke([](os_fd_t, struct mmsghdr*, unsigned int, int, struct timespec*) { - return Api::SysCallIntResult{1u, 0}; + return Api::SysCallIntResult{-1, SOCKET_ERROR_AGAIN}; })); wrapper_->recvmmsg(slices, /*self_port=*/12345, output2); diff --git a/test/test_common/threadsafe_singleton_injector.h b/test/test_common/threadsafe_singleton_injector.h index 627efa1390e6..b9a0020dc88a 100644 --- a/test/test_common/threadsafe_singleton_injector.h +++ b/test/test_common/threadsafe_singleton_injector.h @@ -12,6 +12,7 @@ template class TestThreadsafeSingletonInjector { ThreadSafeSingleton::instance_ = instance; } ~TestThreadsafeSingletonInjector() { ThreadSafeSingleton::instance_ = latched_instance_; } + T& latched() { return *latched_instance_; } private: T* latched_instance_;