Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

udp: properly handle truncated/dropped datagrams #14122

Merged
merged 13 commits into from
Nov 20, 2020
1 change: 1 addition & 0 deletions docs/root/version_history/current.rst
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ Bug Fixes
* proxy_proto: fixed a bug where the wrong downstream address got sent to upstream connections.
* tls: fix detection of the upstream connection close event.
* tls: fix read resumption after triggering buffer high-watermark and all remaining request/response bytes are stored in the SSL connection's internal buffers.
* udp: fixed issue in which receiving truncated UDP datagrams would cause Envoy to crash.
* watchdog: touch the watchdog before most event loop operations to avoid misses when handling bursts of callbacks.

Removed Config or Runtime
Expand Down
3 changes: 3 additions & 0 deletions include/envoy/network/io_handle.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,9 @@ class IoHandle {
unsigned int msg_len_{0};
// The gso_size, if specified in the transport header
unsigned int gso_size_{0};
// If true indicates a successful syscall, but the packet was dropped due to truncation. We do
// not support receiving truncated packets.
bool truncated_and_dropped_{false};
};

/**
Expand Down
12 changes: 10 additions & 2 deletions source/common/api/win32/os_sys_calls_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -144,14 +144,22 @@ SysCallSizeResult OsSysCallsImpl::recv(os_fd_t socket, void* buffer, size_t leng
}

SysCallSizeResult OsSysCallsImpl::recvmsg(os_fd_t sockfd, msghdr* msg, int flags) {
DWORD bytes_received;
DWORD bytes_received = 0;
LPFN_WSARECVMSG recvmsg_fn_ptr = getFnPtrWSARecvMsg();
wsamsgResult wsamsg = msghdrToWSAMSG(msg);
// Windows supports only a single flag on input to WSARecvMsg
wsamsg.wsamsg_->dwFlags = flags & MSG_PEEK;
const int rc = recvmsg_fn_ptr(sockfd, wsamsg.wsamsg_.get(), &bytes_received, nullptr, nullptr);
if (rc == SOCKET_ERROR) {
return {-1, ::WSAGetLastError()};
// We try to match the UNIX behavior for truncated packages. In that case the return code is
// the length of the allocated buffer and we get the value from `dwFlags`.
auto last_error = ::WSAGetLastError();
if (last_error == WSAEMSGSIZE) {
msg->msg_flags = wsamsg.wsamsg_->dwFlags;
return {bytes_received, 0};
}

return {rc, last_error};
}
msg->msg_namelen = wsamsg.wsamsg_->namelen;
msg->msg_flags = wsamsg.wsamsg_->dwFlags;
Expand Down
41 changes: 30 additions & 11 deletions source/common/network/io_socket_handle_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,16 @@ in_addr addressFromMessage(const cmsghdr& cmsg) {
#endif
}

constexpr int messageTruncatedOption() {
#if defined(__APPLE__)
// OSX does not support passing `MSG_TRUNC` to recvmsg and recvmmsg. This does not effect
// functionality and it primarily used for logging.
return 0;
#else
return MSG_TRUNC;
#endif
}

} // namespace

namespace Network {
Expand Down Expand Up @@ -350,7 +360,8 @@ Api::IoCallUint64Result IoSocketHandleImpl::recvmsg(Buffer::RawSlice* slices,
hdr.msg_flags = 0;
hdr.msg_control = cbuf.begin();
hdr.msg_controllen = cmsg_space_;
const Api::SysCallSizeResult result = Api::OsSysCallsSingleton::get().recvmsg(fd_, &hdr, 0);
Api::SysCallSizeResult result =
Api::OsSysCallsSingleton::get().recvmsg(fd_, &hdr, messageTruncatedOption());
if (result.rc_ < 0) {
auto io_result = sysCallResultToIoCallResult(result);
// Emulated edge events need to registered if the socket operation did not complete
Expand All @@ -362,6 +373,13 @@ Api::IoCallUint64Result IoSocketHandleImpl::recvmsg(Buffer::RawSlice* slices,
}
return io_result;
}
if ((hdr.msg_flags & MSG_TRUNC) != 0) {
ENVOY_LOG_MISC(debug, "Dropping truncated UDP packet with size: {}.", result.rc_);
result.rc_ = 0;
(*output.dropped_packets_)++;
output.msg_[0].truncated_and_dropped_ = true;
return sysCallResultToIoCallResult(result);
}

RELEASE_ASSERT((hdr.msg_flags & MSG_CTRUNC) == 0,
fmt::format("Incorrectly set control message length: {}", hdr.msg_controllen));
Expand All @@ -386,7 +404,7 @@ Api::IoCallUint64Result IoSocketHandleImpl::recvmsg(Buffer::RawSlice* slices,
if (output.dropped_packets_ != nullptr) {
absl::optional<uint32_t> maybe_dropped = maybeGetPacketsDroppedFromHeader(*cmsg);
if (maybe_dropped) {
*output.dropped_packets_ = *maybe_dropped;
*output.dropped_packets_ += *maybe_dropped;
continue;
}
}
Expand Down Expand Up @@ -439,8 +457,9 @@ Api::IoCallUint64Result IoSocketHandleImpl::recvmmsg(RawSliceArrays& slices, uin
// Set MSG_WAITFORONE so that recvmmsg will not waiting for
// |num_packets_per_mmsg_call| packets to arrive before returning when the
// socket is a blocking socket.
const Api::SysCallIntResult result = Api::OsSysCallsSingleton::get().recvmmsg(
fd_, mmsg_hdr.data(), num_packets_per_mmsg_call, MSG_TRUNC | MSG_WAITFORONE, nullptr);
const Api::SysCallIntResult result =
Api::OsSysCallsSingleton::get().recvmmsg(fd_, mmsg_hdr.data(), num_packets_per_mmsg_call,
messageTruncatedOption() | MSG_WAITFORONE, nullptr);

if (result.rc_ <= 0) {
auto io_result = sysCallResultToIoCallResult(result);
Expand All @@ -457,18 +476,18 @@ Api::IoCallUint64Result IoSocketHandleImpl::recvmmsg(RawSliceArrays& slices, uin
int num_packets_read = result.rc_;

for (int i = 0; i < num_packets_read; ++i) {
if (mmsg_hdr[i].msg_len == 0) {
msghdr& hdr = mmsg_hdr[i].msg_hdr;
if ((hdr.msg_flags & MSG_TRUNC) != 0) {
ENVOY_LOG_MISC(debug, "Dropping truncated UDP packet with size: {}.", mmsg_hdr[i].msg_len);
(*output.dropped_packets_)++;
output.msg_[i].truncated_and_dropped_ = true;
continue;
}
msghdr& hdr = mmsg_hdr[i].msg_hdr;

RELEASE_ASSERT((hdr.msg_flags & MSG_CTRUNC) == 0,
fmt::format("Incorrectly set control message length: {}", hdr.msg_controllen));
RELEASE_ASSERT(hdr.msg_namelen > 0,
fmt::format("Unable to get remote address from recvmmsg() for fd: {}", fd_));
if ((hdr.msg_flags & MSG_TRUNC) != 0) {
ENVOY_LOG_MISC(warn, "Dropping truncated UDP packet with size: {}.", mmsg_hdr[i].msg_len);
continue;
}

output.msg_[i].msg_len_ = mmsg_hdr[i].msg_len;
// Get local and peer addresses for each packet.
Expand All @@ -494,7 +513,7 @@ Api::IoCallUint64Result IoSocketHandleImpl::recvmmsg(RawSliceArrays& slices, uin
for (cmsg = CMSG_FIRSTHDR(&hdr); cmsg != nullptr; cmsg = CMSG_NXTHDR(&hdr, cmsg)) {
absl::optional<uint32_t> maybe_dropped = maybeGetPacketsDroppedFromHeader(*cmsg);
if (maybe_dropped) {
*output.dropped_packets_ = *maybe_dropped;
*output.dropped_packets_ += *maybe_dropped;
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion source/common/network/udp_listener_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ class UdpListenerImpl : public BaseListenerImpl,
public:
UdpListenerImpl(Event::DispatcherImpl& dispatcher, SocketSharedPtr socket,
UdpListenerCallbacks& cb, TimeSource& time_source);

~UdpListenerImpl() override;
uint32_t packetsDropped() { return packets_dropped_; }

// Network::Listener Interface
void disable() override;
Expand Down
12 changes: 5 additions & 7 deletions source/common/network/utility.cc
Original file line number Diff line number Diff line change
Expand Up @@ -629,6 +629,10 @@ Api::IoCallUint64Result Utility::readFromSocket(IoHandle& handle,
uint64_t packets_read = result.rc_;
ENVOY_LOG_MISC(trace, "recvmmsg read {} packets", packets_read);
for (uint64_t i = 0; i < packets_read; ++i) {
if (output.msg_[i].truncated_and_dropped_) {
continue;
}

Buffer::RawSlice* slice = slices[i].data();
const uint64_t msg_len = output.msg_[i].msg_len_;
ASSERT(msg_len <= slice->len_);
Expand All @@ -651,7 +655,7 @@ Api::IoCallUint64Result Utility::readFromSocket(IoHandle& handle,
Api::IoCallUint64Result result =
receiveMessage(udp_packet_processor.maxPacketSize(), buffer, output, handle, local_address);

if (!result.ok()) {
if (!result.ok() || output.msg_[0].truncated_and_dropped_) {
return result;
}

Expand All @@ -678,12 +682,6 @@ Api::IoErrorPtr Utility::readPacketsFromSocket(IoHandle& handle,
return std::move(result.err_);
}

if (result.rc_ == 0) {
// TODO(conqerAtapple): Is zero length packet interesting? If so add stats
// for it. Otherwise remove the warning log below.
ENVOY_LOG_MISC(trace, "received 0-length packet");
}

if (packets_dropped != old_packets_dropped) {
// The kernel tracks SO_RXQ_OVFL as a uint32 which can overflow to a smaller
// value. So as long as this count differs from previously recorded value,
Expand Down
64 changes: 58 additions & 6 deletions test/common/network/udp_listener_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -40,17 +40,21 @@ namespace {
// packets are sent from a network namespace different to that of
// the client. Currently, the testing framework does not support
// this behavior.
// This helper allows to intercept the supportsUdpGro syscall and
// toggle the gro behavior as per individual test requirements.
class MockSupportsUdpGro : public Api::OsSysCallsImpl {
// This helper allows to intercept syscalls and
// toggle the behavior as per individual test requirements.
class OverrideOsSysCallsImpl : public Api::OsSysCallsImpl {
public:
MOCK_METHOD(bool, supportsUdpGro, (), (const));
MOCK_METHOD(bool, supportsMmsg, (), (const));
};

class UdpListenerImplTest : public UdpListenerImplTestBase {
public:
void SetUp() override {
ON_CALL(udp_gro_syscall_, supportsUdpGro()).WillByDefault(Return(false));
ON_CALL(override_syscall_, supportsUdpGro()).WillByDefault(Return(false));
// Return the real version by default.
ON_CALL(override_syscall_, supportsMmsg())
.WillByDefault(Return(os_calls.latched().supportsMmsg()));

// Set listening socket options.
server_socket_->addOptions(SocketOptionFactory::buildIpPacketInfoOptions());
Expand All @@ -64,8 +68,8 @@ class UdpListenerImplTest : public UdpListenerImplTestBase {
ON_CALL(listener_callbacks_, udpPacketWriter()).WillByDefault(ReturnRef(*udp_packet_writer_));
}

NiceMock<MockSupportsUdpGro> udp_gro_syscall_;
TestThreadsafeSingletonInjector<Api::OsSysCallsImpl> os_calls{&udp_gro_syscall_};
NiceMock<OverrideOsSysCallsImpl> override_syscall_;
TestThreadsafeSingletonInjector<Api::OsSysCallsImpl> os_calls{&override_syscall_};
};

INSTANTIATE_TEST_SUITE_P(IpVersions, UdpListenerImplTest,
Expand Down Expand Up @@ -126,6 +130,54 @@ TEST_P(UdpListenerImplTest, UseActualDstUdp) {
dispatcher_->run(Event::Dispatcher::RunType::Block);
}

// Test a large datagram that gets dropped using recvmmsg if supported.
TEST_P(UdpListenerImplTest, LargeDatagramRecvmmsg) {
// This will get dropped.
const std::string first(4096, 'a');
client_.write(first, *send_to_addr_);
const std::string second("second");
client_.write(second, *send_to_addr_);
// This will get dropped.
const std::string third(4096, 'b');
client_.write(third, *send_to_addr_);

EXPECT_CALL(listener_callbacks_, onReadReady());
EXPECT_CALL(listener_callbacks_, onData(_)).WillOnce(Invoke([&](const UdpRecvData& data) -> void {
validateRecvCallbackParams(data, Api::OsSysCallsSingleton::get().supportsMmsg() ? 16u : 1u);
EXPECT_EQ(data.buffer_->toString(), second);

dispatcher_->exit();
}));

dispatcher_->run(Event::Dispatcher::RunType::Block);
EXPECT_EQ(2, listener_->packetsDropped());
}

// Test a large datagram that gets dropped using recvmsg.
TEST_P(UdpListenerImplTest, LargeDatagramRecvmsg) {
ON_CALL(override_syscall_, supportsMmsg()).WillByDefault(Return(false));

// This will get dropped.
const std::string first(4096, 'a');
client_.write(first, *send_to_addr_);
const std::string second("second");
client_.write(second, *send_to_addr_);
// This will get dropped.
const std::string third(4096, 'b');
client_.write(third, *send_to_addr_);

EXPECT_CALL(listener_callbacks_, onReadReady());
EXPECT_CALL(listener_callbacks_, onData(_)).WillOnce(Invoke([&](const UdpRecvData& data) -> void {
validateRecvCallbackParams(data, Api::OsSysCallsSingleton::get().supportsMmsg() ? 16u : 1u);
EXPECT_EQ(data.buffer_->toString(), second);

dispatcher_->exit();
}));

dispatcher_->run(Event::Dispatcher::RunType::Block);
EXPECT_EQ(2, listener_->packetsDropped());
}

/**
* Tests UDP listener for read and write callbacks with actual data.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,18 +77,19 @@ TEST_F(QuicIoHandleWrapperTest, DelegateIoHandleCalls) {
addr = wrapper_->peerAddress();

Network::IoHandle::RecvMsgOutput output(1, nullptr);
EXPECT_CALL(os_sys_calls_, recvmsg(fd, _, 0)).WillOnce(Invoke([](os_fd_t, msghdr* msg, int) {
sockaddr_storage ss;
auto ipv6_addr = reinterpret_cast<sockaddr_in6*>(&ss);
memset(ipv6_addr, 0, sizeof(sockaddr_in6));
ipv6_addr->sin6_family = AF_INET6;
ipv6_addr->sin6_addr = in6addr_loopback;
ipv6_addr->sin6_port = htons(54321);
*reinterpret_cast<sockaddr_in6*>(msg->msg_name) = *ipv6_addr;
msg->msg_namelen = sizeof(sockaddr_in6);
msg->msg_controllen = 0;
return Api::SysCallSizeResult{5u, 0};
}));
EXPECT_CALL(os_sys_calls_, recvmsg(fd, _, MSG_TRUNC))
.WillOnce(Invoke([](os_fd_t, msghdr* msg, int) {
sockaddr_storage ss;
auto ipv6_addr = reinterpret_cast<sockaddr_in6*>(&ss);
memset(ipv6_addr, 0, sizeof(sockaddr_in6));
ipv6_addr->sin6_family = AF_INET6;
ipv6_addr->sin6_addr = in6addr_loopback;
ipv6_addr->sin6_port = htons(54321);
*reinterpret_cast<sockaddr_in6*>(msg->msg_name) = *ipv6_addr;
msg->msg_namelen = sizeof(sockaddr_in6);
msg->msg_controllen = 0;
return Api::SysCallSizeResult{5u, 0};
}));
wrapper_->recvmsg(&slice, 1, /*self_port=*/12345, output);

size_t num_packet_per_call = 1u;
Expand All @@ -97,7 +98,7 @@ TEST_F(QuicIoHandleWrapperTest, DelegateIoHandleCalls) {
absl::FixedArray<Buffer::RawSlice>({Buffer::RawSlice{data, 5}}));
EXPECT_CALL(os_sys_calls_, recvmmsg(fd, _, num_packet_per_call, _, nullptr))
.WillOnce(Invoke([](os_fd_t, struct mmsghdr*, unsigned int, int, struct timespec*) {
return Api::SysCallIntResult{1u, 0};
return Api::SysCallIntResult{-1, SOCKET_ERROR_AGAIN};
}));
wrapper_->recvmmsg(slices, /*self_port=*/12345, output2);

Expand Down
1 change: 1 addition & 0 deletions test/test_common/threadsafe_singleton_injector.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ template <class T> class TestThreadsafeSingletonInjector {
ThreadSafeSingleton<T>::instance_ = instance;
}
~TestThreadsafeSingletonInjector() { ThreadSafeSingleton<T>::instance_ = latched_instance_; }
T& latched() { return *latched_instance_; }

private:
T* latched_instance_;
Expand Down