From b9f176782e6a8cfd477068fe9444510889d729f4 Mon Sep 17 00:00:00 2001 From: Asias He Date: Fri, 18 May 2018 10:54:09 +0800 Subject: [PATCH] tcp: Adjust receive window The receive window size should be adjusted according to the maximum receive buffer size. This fixes the following abort. Without reducing the receive window, the sender gets no back pressure: if the application does not read the data, _rcv.data grows unbounded, and when the application reads again, all the packets in _rcv.data are merged into a single packet with tons of fragments, which triggers the assert. In seastar::net::tcp::tcb::read(): for (auto&& q : _rcv.data) { p.append(std::move(q)); } server: /home/asias/src/cloudius-systems/scylla/seastar/net/packet.hh:144: static void* seastar::net::packet::impl::operator new(size_t, size_t): Assertion `nr_frags == uint16_t(nr_frags)' failed. Aborting on shard 0. Backtrace: 0x000000000041fafa 0x000000000041fbcc 0x000000000041fc83 /lib64/libpthread.so.0+0x000000000001241f /lib64/libc.so.6+0x00000000000369fa /lib64/libc.so.6+0x00000000000387ff /lib64/libc.so.6+0x000000000002f0d9 /lib64/libc.so.6+0x000000000002f151 0x00000000004b89dd 0x00000000005ab01e 0x00000000005cbc6a 0x00000000005dbfc1 0x00000000005db9b8 0x0000000000570986 0x0000000000570be5 0x000000000069859a 0x00000000006a16e5 0x000000000041d136 0x000000000041d422 0x000000000046b527 0x00000000004fba62 0x0000000000418e77 /lib64/libc.so.6+0x0000000000020889 0x0000000000419039 $ addr2line -Cfpi -e ./server 0x000000000041fafa 0x000000000041fbcc 0x000000000041fc83 /lib64/libpthread.so.0+0x000000000001241f /lib64/libc.so.6+0x00000000000369fa /lib64/libc.so.6+0x00000000000387ff /lib64/libc.so.6+0x000000000002f0d9 /lib64/libc.so.6+0x000000000002f151 0x00000000004b89dd 0x00000000005ab01e 0x00000000005cbc6a 0x00000000005dbfc1 0x00000000005db9b8 0x0000000000570986 0x0000000000570be5 0x000000000069859a 0x00000000006a16e5 0x000000000041d136 0x000000000041d422 0x000000000046b527 0x00000000004fba62 0x0000000000418e77 
/lib64/libc.so.6+0x0000000000020889 0x0000000000419039 seastar::backtrace_buffer::append_backtrace() at /home/asias/src/cloudius-systems/scylla/seastar/core/reactor.cc:378 (inlined by) print_with_backtrace at /home/asias/src/cloudius-systems/scylla/seastar/core/reactor.cc:399 seastar::print_with_backtrace(char const*) at /home/asias/src/cloudius-systems/scylla/seastar/core/reactor.cc:406 sigabrt_action at /home/asias/src/cloudius-systems/scylla/seastar/core/reactor.cc:3914 (inlined by) operator() at /home/asias/src/cloudius-systems/scylla/seastar/core/reactor.cc:3896 (inlined by) _FUN at /home/asias/src/cloudius-systems/scylla/seastar/core/reactor.cc:3892 ?? ??:0 ?? ??:0 ?? ??:0 ?? ??:0 ?? ??:0 seastar::net::packet::impl::operator new(unsigned long, unsigned long) at /home/asias/src/cloudius-systems/scylla/seastar/net/packet.hh:144 (inlined by) seastar::net::packet::impl::allocate(unsigned long) at /home/asias/src/cloudius-systems/scylla/seastar/net/packet.hh:117 (inlined by) seastar::net::packet::impl::copy(seastar::net::packet::impl*, unsigned long) at /home/asias/src/cloudius-systems/scylla/seastar/net/packet.hh:121 (inlined by) seastar::net::packet::impl::allocate_if_needed(std::unique_ptr >, unsigned long) at /home/asias/src/cloudius-systems/scylla/seastar/net/packet.hh:141 seastar::net::packet::append(seastar::net::packet&&) at /home/asias/src/cloudius-systems/scylla/seastar/net/packet.hh:490 seastar::net::tcp::tcb::read() at /home/asias/src/cloudius-systems/scylla/seastar/net/tcp.hh:1760 (inlined by) seastar::net::tcp::connection::read() at /home/asias/src/cloudius-systems/scylla/seastar/net/tcp.hh:682 (inlined by) seastar::net::native_connected_socket_impl >::native_data_source_impl::get()::{lambda()#2}::operator()() const at /home/asias/src/cloudius-systems/scylla/seastar/net/native-stack-impl.hh:145 (inlined by) seastar::apply_helper >::native_data_source_impl::get()::{lambda()#2}, std::tuple<>&&, std::integer_sequence >::apply({lambda()#2}&&, 
std::tuple<>) at /home/asias/src/cloudius-systems/scylla/seastar/core/apply.hh:36 (inlined by) auto seastar::apply >::native_data_source_impl::get()::{lambda()#2}>(seastar::net::native_connected_socket_impl >::native_data_source_impl::get()::{lambda()#2}&&, std::tuple<>&&) at /home/asias/src/cloudius-systems/scylla/seastar/core/apply.hh:44 (inlined by) seastar::future > seastar::futurize > >::apply >::native_data_source_impl::get()::{lambda()#2}>(seastar::net::native_connected_socket_impl >::native_data_source_impl::get()::{lambda()#2}&&, std::tuple<>&&) at /home/asias/src/cloudius-systems/scylla/seastar/core/future.hh:1377 seastar::future > seastar::future<>::then >::native_data_source_impl::get()::{lambda()#2}, seastar::future > >(seastar::net::native_connected_socket_impl >::native_data_source_impl::get()::{lambda()#2}&&) at /home/asias/src/cloudius-systems/scylla/seastar/core/future.hh:943 seastar::net::native_connected_socket_impl >::native_data_source_impl::get() at /home/asias/src/cloudius-systems/scylla/seastar/net/native-stack-impl.hh:149 seastar::data_source::get() at /home/asias/src/cloudius-systems/scylla/seastar/core/iostream.hh:67 (inlined by) seastar::input_stream::read_exactly_part(unsigned long, seastar::temporary_buffer, unsigned long) at /home/asias/src/cloudius-systems/scylla/seastar/core/iostream-impl.hh:164 seastar::input_stream::read_exactly(unsigned long) at /home/asias/src/cloudius-systems/scylla/seastar/core/iostream-impl.hh:191 tcp_echo_server::connection::process() at /home/asias/src/cloudius-systems/scylla/seastar/seastar_example_from_shanshanpt/server.cc:86 (inlined by) tcp_echo_server::start()::{lambda()#1}::operator()() const::{lambda(seastar::connected_socket, {lambda()#1}::socket_address)#1}::operator()(seastar, seastar::connected_socket)::{lambda()#2}::operator()() const at /home/asias/src/cloudius-systems/scylla/seastar/seastar_example_from_shanshanpt/server.cc:113 seastar::internal::do_until_state::run_and_dispose() at 
/home/asias/src/cloudius-systems/scylla/seastar/core/future-util.hh:463 seastar::reactor::run_tasks(seastar::reactor::task_queue&) at /home/asias/src/cloudius-systems/scylla/seastar/core/reactor.cc:2597 seastar::reactor::run_some_tasks() at /home/asias/src/cloudius-systems/scylla/seastar/core/reactor.cc:3009 seastar::reactor::run_some_tasks() at /usr/include/c++/7/chrono:377 (inlined by) seastar::reactor::run() at /home/asias/src/cloudius-systems/scylla/seastar/core/reactor.cc:3156 seastar::app_template::run_deprecated(int, char**, std::function&&) at /home/asias/src/cloudius-systems/scylla/seastar/core/app-template.cc:185 main at /home/asias/src/cloudius-systems/scylla/seastar/seastar_example_from_shanshanpt/server.cc:179 (discriminator 5) Message-Id: <06a6c0ccb82fd4fdde6f56a112a8eb6f659ee8ab.1526611968.git.asias@scylladb.com> --- net/tcp.hh | 27 +++++++++++++++++++++++---- 1 file changed, 23 insertions(+), 4 deletions(-) diff --git a/net/tcp.hh b/net/tcp.hh index 7d9d448db05..d759227cdd4 100644 --- a/net/tcp.hh +++ b/net/tcp.hh @@ -378,8 +378,13 @@ private: tcp_seq urgent; tcp_seq initial; std::deque data; + // The total size of data stored in std::deque data + size_t data_size = 0; tcp_packet_merger out_of_order; std::experimental::optional> _data_received_promise; + // The maximum memory buffer size allowed for receiving + // Currently, it is the same as default receive window size when window scaling is enabled + size_t max_receive_buf_size = 3737600; } _rcv; tcp_option _option; timer _delayed_ack; @@ -410,6 +415,16 @@ private: tcp_seq get_isn(); circular_buffer _packetq; bool _poll_active = false; + uint32_t get_default_receive_window_size() { + // Linux's default window size + constexpr uint32_t size = 29200; + return size << _rcv.window_scale; + } + // Returns the current receive window according to available receiving buffer size + uint32_t get_modified_receive_window_size() { + uint32_t left = _rcv.data_size > _rcv.max_receive_buf_size ? 
0 : _rcv.max_receive_buf_size - _rcv.data_size; + return std::min(left, get_default_receive_window_size()); + } public: tcb(tcp& t, connid id); void input_handle_listen_state(tcp_hdr* th, packet p); @@ -1054,8 +1069,7 @@ void tcp::tcb::init_from_options(tcp_hdr* th, uint8_t* opt_start, ui // Maximum segment size local can receive _rcv.mss = _option._local_mss = local_mss(); - // Linux's default window size - _rcv.window = 29200 << _rcv.window_scale; + _rcv.window = get_default_receive_window_size(); _snd.window = th->window << _snd.window_scale; // Segment sequence number used for last window update @@ -1464,9 +1478,11 @@ void tcp::tcb::input_handle_other_state(tcp_hdr* th, packet p) { // RCV.NXT over the data accepted, and adjusts RCV.WND as // apporopriate to the current buffer availability. The total of // RCV.NXT and RCV.WND should not be reduced. + _rcv.data_size += p.len(); _rcv.data.push_back(std::move(p)); _rcv.next += seg_len; auto merged = merge_out_of_order(); + _rcv.window = get_modified_receive_window_size(); signal_data_received(); // Send an acknowledgment of the form: // @@ -1732,8 +1748,7 @@ void tcp::tcb::connect() { _rcv.window_scale = _option._local_win_scale = 7; // Maximum segment size local can receive _rcv.mss = _option._local_mss = local_mss(); - // Linux's default window size - _rcv.window = 29200 << _rcv.window_scale; + _rcv.window = get_default_receive_window_size(); do_syn_sent(); } @@ -1744,7 +1759,9 @@ packet tcp::tcb::read() { for (auto&& q : _rcv.data) { p.append(std::move(q)); } + _rcv.data_size = 0; _rcv.data.clear(); + _rcv.window = get_default_receive_window_size(); return p; } @@ -1857,6 +1874,7 @@ bool tcp::tcb::merge_out_of_order() { } _rcv.next += seg_len; + _rcv.data_size += p.len(); /* account for p before it is moved from */ _rcv.data.push_back(std::move(p)); // Since c++11, erase() always returns the value of the following element it = _rcv.out_of_order.map.erase(it); merged = true; @@ -2015,6 +2033,7 @@ void tcp::tcb::cleanup() { _snd.unsent.clear(); 
_snd.data.clear(); _rcv.out_of_order.map.clear(); + _rcv.data_size = 0; _rcv.data.clear(); stop_retransmit_timer(); clear_delayed_ack();