diff --git a/docs/root/version_history/current.rst b/docs/root/version_history/current.rst index c0ac992bb143..460757b11d7d 100644 --- a/docs/root/version_history/current.rst +++ b/docs/root/version_history/current.rst @@ -37,6 +37,7 @@ Bug Fixes * proxy_proto: fixed a bug where the wrong downstream address got sent to upstream connections. * tls: fix detection of the upstream connection close event. * tls: fix read resumption after triggering buffer high-watermark and all remaining request/response bytes are stored in the SSL connection's internal buffers. +* udp: fixed issue in which receiving truncated UDP datagrams would cause Envoy to crash. * watchdog: touch the watchdog before most event loop operations to avoid misses when handling bursts of callbacks. Removed Config or Runtime diff --git a/include/envoy/network/io_handle.h b/include/envoy/network/io_handle.h index 3547ed14c93c..de530474592a 100644 --- a/include/envoy/network/io_handle.h +++ b/include/envoy/network/io_handle.h @@ -118,6 +118,9 @@ class IoHandle { unsigned int msg_len_{0}; // The gso_size, if specified in the transport header unsigned int gso_size_{0}; + // If true indicates a successful syscall, but the packet was dropped due to truncation. We do + // not support receiving truncated packets. + bool truncated_and_dropped_{false}; }; /** diff --git a/source/common/api/win32/os_sys_calls_impl.cc b/source/common/api/win32/os_sys_calls_impl.cc index 5169f6855b50..f0f9768dc0c1 100644 --- a/source/common/api/win32/os_sys_calls_impl.cc +++ b/source/common/api/win32/os_sys_calls_impl.cc @@ -144,14 +144,22 @@ SysCallSizeResult OsSysCallsImpl::recv(os_fd_t socket, void* buffer, size_t leng } SysCallSizeResult OsSysCallsImpl::recvmsg(os_fd_t sockfd, msghdr* msg, int flags) { - DWORD bytes_received; + DWORD bytes_received = 0; LPFN_WSARECVMSG recvmsg_fn_ptr = getFnPtrWSARecvMsg(); wsamsgResult wsamsg = msghdrToWSAMSG(msg); // Windows supports only a single flag on input to WSARecvMsg wsamsg.wsamsg_->dwFlags = flags & MSG_PEEK; const int rc = recvmsg_fn_ptr(sockfd, wsamsg.wsamsg_.get(), &bytes_received, nullptr, nullptr); if (rc == SOCKET_ERROR) { - return {-1, ::WSAGetLastError()}; + // We try to match the UNIX behavior for truncated packages. In that case the return code is + // the length of the allocated buffer and we get the value from `dwFlags`. + auto last_error = ::WSAGetLastError(); + if (last_error == WSAEMSGSIZE) { + msg->msg_flags = wsamsg.wsamsg_->dwFlags; + return {bytes_received, 0}; + } + + return {rc, last_error}; } msg->msg_namelen = wsamsg.wsamsg_->namelen; msg->msg_flags = wsamsg.wsamsg_->dwFlags; diff --git a/source/common/network/io_socket_handle_impl.cc b/source/common/network/io_socket_handle_impl.cc index 4090824a4ef1..508ce6400aac 100644 --- a/source/common/network/io_socket_handle_impl.cc +++ b/source/common/network/io_socket_handle_impl.cc @@ -48,6 +48,16 @@ in_addr addressFromMessage(const cmsghdr& cmsg) { #endif } +constexpr int messageTruncatedOption() { +#if defined(__APPLE__) + // OSX does not support passing `MSG_TRUNC` to recvmsg and recvmmsg. This does not effect + // functionality and it primarily used for logging. + return 0; +#else + return MSG_TRUNC; +#endif +} + } // namespace namespace Network { @@ -350,7 +360,8 @@ Api::IoCallUint64Result IoSocketHandleImpl::recvmsg(Buffer::RawSlice* slices, hdr.msg_flags = 0; hdr.msg_control = cbuf.begin(); hdr.msg_controllen = cmsg_space_; - const Api::SysCallSizeResult result = Api::OsSysCallsSingleton::get().recvmsg(fd_, &hdr, 0); + Api::SysCallSizeResult result = + Api::OsSysCallsSingleton::get().recvmsg(fd_, &hdr, messageTruncatedOption()); if (result.rc_ < 0) { auto io_result = sysCallResultToIoCallResult(result); // Emulated edge events need to registered if the socket operation did not complete @@ -362,6 +373,13 @@ Api::IoCallUint64Result IoSocketHandleImpl::recvmsg(Buffer::RawSlice* slices, } return io_result; } + if ((hdr.msg_flags & MSG_TRUNC) != 0) { + ENVOY_LOG_MISC(debug, "Dropping truncated UDP packet with size: {}.", result.rc_); + result.rc_ = 0; + (*output.dropped_packets_)++; + output.msg_[0].truncated_and_dropped_ = true; + return sysCallResultToIoCallResult(result); + } RELEASE_ASSERT((hdr.msg_flags & MSG_CTRUNC) == 0, fmt::format("Incorrectly set control message length: {}", hdr.msg_controllen)); @@ -386,7 +404,7 @@ Api::IoCallUint64Result IoSocketHandleImpl::recvmsg(Buffer::RawSlice* slices, if (output.dropped_packets_ != nullptr) { absl::optional maybe_dropped = maybeGetPacketsDroppedFromHeader(*cmsg); if (maybe_dropped) { - *output.dropped_packets_ = *maybe_dropped; + *output.dropped_packets_ += *maybe_dropped; continue; } } @@ -439,8 +457,9 @@ Api::IoCallUint64Result IoSocketHandleImpl::recvmmsg(RawSliceArrays& slices, uin // Set MSG_WAITFORONE so that recvmmsg will not waiting for // |num_packets_per_mmsg_call| packets to arrive before returning when the // socket is a blocking socket. - const Api::SysCallIntResult result = Api::OsSysCallsSingleton::get().recvmmsg( - fd_, mmsg_hdr.data(), num_packets_per_mmsg_call, MSG_TRUNC | MSG_WAITFORONE, nullptr); + const Api::SysCallIntResult result = + Api::OsSysCallsSingleton::get().recvmmsg(fd_, mmsg_hdr.data(), num_packets_per_mmsg_call, + messageTruncatedOption() | MSG_WAITFORONE, nullptr); if (result.rc_ <= 0) { auto io_result = sysCallResultToIoCallResult(result); @@ -457,18 +476,18 @@ Api::IoCallUint64Result IoSocketHandleImpl::recvmmsg(RawSliceArrays& slices, uin int num_packets_read = result.rc_; for (int i = 0; i < num_packets_read; ++i) { - if (mmsg_hdr[i].msg_len == 0) { + msghdr& hdr = mmsg_hdr[i].msg_hdr; + if ((hdr.msg_flags & MSG_TRUNC) != 0) { + ENVOY_LOG_MISC(debug, "Dropping truncated UDP packet with size: {}.", mmsg_hdr[i].msg_len); + (*output.dropped_packets_)++; + output.msg_[i].truncated_and_dropped_ = true; continue; } - msghdr& hdr = mmsg_hdr[i].msg_hdr; + RELEASE_ASSERT((hdr.msg_flags & MSG_CTRUNC) == 0, fmt::format("Incorrectly set control message length: {}", hdr.msg_controllen)); RELEASE_ASSERT(hdr.msg_namelen > 0, fmt::format("Unable to get remote address from recvmmsg() for fd: {}", fd_)); - if ((hdr.msg_flags & MSG_TRUNC) != 0) { - ENVOY_LOG_MISC(warn, "Dropping truncated UDP packet with size: {}.", mmsg_hdr[i].msg_len); - continue; - } output.msg_[i].msg_len_ = mmsg_hdr[i].msg_len; // Get local and peer addresses for each packet. @@ -494,7 +513,7 @@ Api::IoCallUint64Result IoSocketHandleImpl::recvmmsg(RawSliceArrays& slices, uin for (cmsg = CMSG_FIRSTHDR(&hdr); cmsg != nullptr; cmsg = CMSG_NXTHDR(&hdr, cmsg)) { absl::optional maybe_dropped = maybeGetPacketsDroppedFromHeader(*cmsg); if (maybe_dropped) { - *output.dropped_packets_ = *maybe_dropped; + *output.dropped_packets_ += *maybe_dropped; } } } diff --git a/source/common/network/udp_listener_impl.h b/source/common/network/udp_listener_impl.h index 99db6dca4ac0..a6b2852b3990 100644 --- a/source/common/network/udp_listener_impl.h +++ b/source/common/network/udp_listener_impl.h @@ -24,8 +24,8 @@ class UdpListenerImpl : public BaseListenerImpl, public: UdpListenerImpl(Event::DispatcherImpl& dispatcher, SocketSharedPtr socket, UdpListenerCallbacks& cb, TimeSource& time_source); - ~UdpListenerImpl() override; + uint32_t packetsDropped() { return packets_dropped_; } // Network::Listener Interface void disable() override; diff --git a/source/common/network/utility.cc b/source/common/network/utility.cc index 426598ace956..0574c040a1d1 100644 --- a/source/common/network/utility.cc +++ b/source/common/network/utility.cc @@ -629,6 +629,10 @@ Api::IoCallUint64Result Utility::readFromSocket(IoHandle& handle, uint64_t packets_read = result.rc_; ENVOY_LOG_MISC(trace, "recvmmsg read {} packets", packets_read); for (uint64_t i = 0; i < packets_read; ++i) { + if (output.msg_[i].truncated_and_dropped_) { + continue; + } + Buffer::RawSlice* slice = slices[i].data(); const uint64_t msg_len = output.msg_[i].msg_len_; ASSERT(msg_len <= slice->len_); @@ -651,7 +655,7 @@ Api::IoCallUint64Result Utility::readFromSocket(IoHandle& handle, Api::IoCallUint64Result result = receiveMessage(udp_packet_processor.maxPacketSize(), buffer, output, handle, local_address); - if (!result.ok()) { + if (!result.ok() || output.msg_[0].truncated_and_dropped_) { return result; } @@ -678,12 +682,6 @@ Api::IoErrorPtr Utility::readPacketsFromSocket(IoHandle& handle, return std::move(result.err_); } - if (result.rc_ == 0) { - // TODO(conqerAtapple): Is zero length packet interesting? If so add stats - // for it. Otherwise remove the warning log below. - ENVOY_LOG_MISC(trace, "received 0-length packet"); - } - if (packets_dropped != old_packets_dropped) { // The kernel tracks SO_RXQ_OVFL as a uint32 which can overflow to a smaller // value. So as long as this count differs from previously recorded value, diff --git a/test/common/network/udp_listener_impl_test.cc b/test/common/network/udp_listener_impl_test.cc index 1ee20f8e93a9..b6e4bd670510 100644 --- a/test/common/network/udp_listener_impl_test.cc +++ b/test/common/network/udp_listener_impl_test.cc @@ -40,17 +40,21 @@ namespace { // packets are sent from a network namespace different to that of // the client. Currently, the testing framework does not support // this behavior. -// This helper allows to intercept the supportsUdpGro syscall and -// toggle the gro behavior as per individual test requirements. -class MockSupportsUdpGro : public Api::OsSysCallsImpl { +// This helper allows to intercept syscalls and +// toggle the behavior as per individual test requirements. +class OverrideOsSysCallsImpl : public Api::OsSysCallsImpl { public: MOCK_METHOD(bool, supportsUdpGro, (), (const)); + MOCK_METHOD(bool, supportsMmsg, (), (const)); }; class UdpListenerImplTest : public UdpListenerImplTestBase { public: void SetUp() override { - ON_CALL(udp_gro_syscall_, supportsUdpGro()).WillByDefault(Return(false)); + ON_CALL(override_syscall_, supportsUdpGro()).WillByDefault(Return(false)); + // Return the real version by default. + ON_CALL(override_syscall_, supportsMmsg()) + .WillByDefault(Return(os_calls.latched().supportsMmsg())); // Set listening socket options. server_socket_->addOptions(SocketOptionFactory::buildIpPacketInfoOptions()); @@ -64,8 +68,8 @@ class UdpListenerImplTest : public UdpListenerImplTestBase { ON_CALL(listener_callbacks_, udpPacketWriter()).WillByDefault(ReturnRef(*udp_packet_writer_)); } - NiceMock udp_gro_syscall_; - TestThreadsafeSingletonInjector os_calls{&udp_gro_syscall_}; + NiceMock override_syscall_; + TestThreadsafeSingletonInjector os_calls{&override_syscall_}; }; INSTANTIATE_TEST_SUITE_P(IpVersions, UdpListenerImplTest, @@ -126,6 +130,54 @@ TEST_P(UdpListenerImplTest, UseActualDstUdp) { dispatcher_->run(Event::Dispatcher::RunType::Block); } +// Test a large datagram that gets dropped using recvmmsg if supported. +TEST_P(UdpListenerImplTest, LargeDatagramRecvmmsg) { + // This will get dropped. + const std::string first(4096, 'a'); + client_.write(first, *send_to_addr_); + const std::string second("second"); + client_.write(second, *send_to_addr_); + // This will get dropped. + const std::string third(4096, 'b'); + client_.write(third, *send_to_addr_); + + EXPECT_CALL(listener_callbacks_, onReadReady()); + EXPECT_CALL(listener_callbacks_, onData(_)).WillOnce(Invoke([&](const UdpRecvData& data) -> void { + validateRecvCallbackParams(data, Api::OsSysCallsSingleton::get().supportsMmsg() ? 16u : 1u); + EXPECT_EQ(data.buffer_->toString(), second); + + dispatcher_->exit(); + })); + + dispatcher_->run(Event::Dispatcher::RunType::Block); + EXPECT_EQ(2, listener_->packetsDropped()); +} + +// Test a large datagram that gets dropped using recvmsg. +TEST_P(UdpListenerImplTest, LargeDatagramRecvmsg) { + ON_CALL(override_syscall_, supportsMmsg()).WillByDefault(Return(false)); + + // This will get dropped. + const std::string first(4096, 'a'); + client_.write(first, *send_to_addr_); + const std::string second("second"); + client_.write(second, *send_to_addr_); + // This will get dropped. + const std::string third(4096, 'b'); + client_.write(third, *send_to_addr_); + + EXPECT_CALL(listener_callbacks_, onReadReady()); + EXPECT_CALL(listener_callbacks_, onData(_)).WillOnce(Invoke([&](const UdpRecvData& data) -> void { + validateRecvCallbackParams(data, Api::OsSysCallsSingleton::get().supportsMmsg() ? 16u : 1u); + EXPECT_EQ(data.buffer_->toString(), second); + + dispatcher_->exit(); + })); + + dispatcher_->run(Event::Dispatcher::RunType::Block); + EXPECT_EQ(2, listener_->packetsDropped()); +} + /** * Tests UDP listener for read and write callbacks with actual data. */ diff --git a/test/extensions/quic_listeners/quiche/quic_io_handle_wrapper_test.cc b/test/extensions/quic_listeners/quiche/quic_io_handle_wrapper_test.cc index da1d7b1aeae5..7a711ef39d65 100644 --- a/test/extensions/quic_listeners/quiche/quic_io_handle_wrapper_test.cc +++ b/test/extensions/quic_listeners/quiche/quic_io_handle_wrapper_test.cc @@ -77,18 +77,19 @@ TEST_F(QuicIoHandleWrapperTest, DelegateIoHandleCalls) { addr = wrapper_->peerAddress(); Network::IoHandle::RecvMsgOutput output(1, nullptr); - EXPECT_CALL(os_sys_calls_, recvmsg(fd, _, 0)).WillOnce(Invoke([](os_fd_t, msghdr* msg, int) { - sockaddr_storage ss; - auto ipv6_addr = reinterpret_cast(&ss); - memset(ipv6_addr, 0, sizeof(sockaddr_in6)); - ipv6_addr->sin6_family = AF_INET6; - ipv6_addr->sin6_addr = in6addr_loopback; - ipv6_addr->sin6_port = htons(54321); - *reinterpret_cast(msg->msg_name) = *ipv6_addr; - msg->msg_namelen = sizeof(sockaddr_in6); - msg->msg_controllen = 0; - return Api::SysCallSizeResult{5u, 0}; - })); + EXPECT_CALL(os_sys_calls_, recvmsg(fd, _, MSG_TRUNC)) + .WillOnce(Invoke([](os_fd_t, msghdr* msg, int) { + sockaddr_storage ss; + auto ipv6_addr = reinterpret_cast(&ss); + memset(ipv6_addr, 0, sizeof(sockaddr_in6)); + ipv6_addr->sin6_family = AF_INET6; + ipv6_addr->sin6_addr = in6addr_loopback; + ipv6_addr->sin6_port = htons(54321); + *reinterpret_cast(msg->msg_name) = *ipv6_addr; + msg->msg_namelen = sizeof(sockaddr_in6); + msg->msg_controllen = 0; + return Api::SysCallSizeResult{5u, 0}; + })); wrapper_->recvmsg(&slice, 1, /*self_port=*/12345, output); size_t num_packet_per_call = 1u; @@ -97,7 +98,7 @@ TEST_F(QuicIoHandleWrapperTest, DelegateIoHandleCalls) { absl::FixedArray({Buffer::RawSlice{data, 5}})); EXPECT_CALL(os_sys_calls_, recvmmsg(fd, _, num_packet_per_call, _, nullptr)) .WillOnce(Invoke([](os_fd_t, struct mmsghdr*, unsigned int, int, struct timespec*) { - return Api::SysCallIntResult{1u, 0}; + return Api::SysCallIntResult{-1, SOCKET_ERROR_AGAIN}; })); wrapper_->recvmmsg(slices, /*self_port=*/12345, output2); diff --git a/test/test_common/threadsafe_singleton_injector.h b/test/test_common/threadsafe_singleton_injector.h index 627efa1390e6..b9a0020dc88a 100644 --- a/test/test_common/threadsafe_singleton_injector.h +++ b/test/test_common/threadsafe_singleton_injector.h @@ -12,6 +12,7 @@ template class TestThreadsafeSingletonInjector { ThreadSafeSingleton::instance_ = instance; } ~TestThreadsafeSingletonInjector() { ThreadSafeSingleton::instance_ = latched_instance_; } + T& latched() { return *latched_instance_; } private: T* latched_instance_;