From 95ccac03fb9d05ef0e2fb01a79a9924a777baab3 Mon Sep 17 00:00:00 2001 From: Daijiro Fukuda Date: Mon, 21 Oct 2024 17:03:40 +0900 Subject: [PATCH] socket_manager: add feature to share sockets with another server This provides a live restart feature for network servers. (The existing live restart feature does not support network servers.) Another process can take over UDP/TCP sockets without downtime. server = ServerEngine::SocketManager::Server.share_sockets_with_another_server(path) This starts a new server that shares all UDP/TCP sockets with the existing server. After the new process starts, the old process should stop without removing the socket file. ref: https://github.com/fluent/fluentd/issues/4622 Limitation: This feature would not work well if the process opens new TCP ports frequently. Signed-off-by: Daijiro Fukuda Co-authored-by: Shizuo Fujita --- README.md | 9 +- lib/serverengine/socket_manager.rb | 21 ++- lib/serverengine/socket_manager_unix.rb | 91 ++++++++++--- spec/socket_manager_spec.rb | 168 ++++++++++++++++++++++++ 4 files changed, 263 insertions(+), 26 deletions(-) diff --git a/README.md b/README.md index 3edf48b..9b36c54 100644 --- a/README.md +++ b/README.md @@ -413,8 +413,15 @@ se = ServerEngine.create(MyServer, MyWorker, { se.run ``` -See also [examples](https://github.com/fluent/serverengine/tree/master/examples). +Other features: + +- `socket_manager_server = SocketManager::Server.share_sockets_with_another_server(path)` + - It starts a new manager server that shares all UDP/TCP sockets with the existing manager. + - We can use this for live restart for network servers. + - The old process should stop without removing the file for the socket after the new process starts. + - Limitation: This feature would not work well if the process opens new TCP ports frequently. +See also [examples](https://github.com/fluent/serverengine/tree/master/examples). 
## Module API diff --git a/lib/serverengine/socket_manager.rb b/lib/serverengine/socket_manager.rb index fe0e484..b0ef405 100644 --- a/lib/serverengine/socket_manager.rb +++ b/lib/serverengine/socket_manager.rb @@ -96,19 +96,32 @@ def self.open(path = nil) end end - def initialize(path) + def self.share_sockets_with_another_server(path) + raise NotImplementedError, "Not supported on Windows." if ServerEngine.windows? + server = new(path, start: false) + server.share_sockets_with_another_server + server + end + + def initialize(path, start: true) @tcp_sockets = {} @udp_sockets = {} @mutex = Mutex.new - @path = start_server(path) + @path = start ? start_server(path) : path end attr_reader :path + attr_reader :tcp_sockets, :udp_sockets # for tests def new_client Client.new(@path) end + def start + start_server(path) + nil + end + def close stop_server nil @@ -159,9 +172,9 @@ def process_peer(peer) res = SocketManager.recv_peer(peer) return if res.nil? - pid, method, bind, port = *res + pid, method, *opts = res begin - send_socket(peer, pid, method, bind, port) + send_socket(peer, pid, method, *opts) rescue => e SocketManager.send_peer(peer, e) end diff --git a/lib/serverengine/socket_manager_unix.rb b/lib/serverengine/socket_manager_unix.rb index 625a831..1b1f46e 100644 --- a/lib/serverengine/socket_manager_unix.rb +++ b/lib/serverengine/socket_manager_unix.rb @@ -47,6 +47,38 @@ def recv_udp(family, peer, sent) end module ServerModule + def share_sockets_with_another_server + another_server = UNIXSocket.new(@path) + begin + idx = 0 + while true + SocketManager.send_peer(another_server, [Process.pid, :share_udp, idx]) + key = SocketManager.recv_peer(another_server) + break if key.nil? + @udp_sockets[key] = another_server.recv_io UDPSocket + idx += 1 + end + + idx = 0 + while true + SocketManager.send_peer(another_server, [Process.pid, :share_tcp, idx]) + key = SocketManager.recv_peer(another_server) + break if key.nil? 
+ @tcp_sockets[key] = another_server.recv_io TCPServer + idx += 1 + end + + SocketManager.send_peer(another_server, [Process.pid, :share_unix]) + res = SocketManager.recv_peer(another_server) + raise res if res.is_a?(Exception) + @server = another_server.recv_io UNIXServer + + start_server(@path) + ensure + another_server.close + end + end + private def listen_tcp_new(bind_ip, port) @@ -77,15 +109,17 @@ def listen_udp_new(bind_ip, port) end def start_server(path) - # return absolute path so that client can connect to this path - # when client changed working directory - path = File.expand_path(path) + unless @server + # return absolute path so that client can connect to this path + # when client changed working directory + path = File.expand_path(path) - begin - old_umask = File.umask(0077) # Protect unix socket from other users - @server = UNIXServer.new(path) - ensure - File.umask(old_umask) + begin + old_umask = File.umask(0077) # Protect unix socket from other users + @server = UNIXServer.new(path) + ensure + File.umask(old_umask) + end end @thread = Thread.new do @@ -111,19 +145,34 @@ def stop_server @thread.join if RUBY_VERSION >= "2.2" end - def send_socket(peer, pid, method, bind, port) - sock = case method - when :listen_tcp - listen_tcp(bind, port) - when :listen_udp - listen_udp(bind, port) - else - raise ArgumentError, "Unknown method: #{method.inspect}" - end - - SocketManager.send_peer(peer, nil) - - peer.send_io sock + def send_socket(peer, pid, method, *opts) + case method + when :listen_tcp + bind, port = opts + sock = listen_tcp(bind, port) + SocketManager.send_peer(peer, nil) + peer.send_io sock + when :listen_udp + bind, port = opts + sock = listen_udp(bind, port) + SocketManager.send_peer(peer, nil) + peer.send_io sock + when :share_tcp + idx, = opts + key = @tcp_sockets.keys[idx] + SocketManager.send_peer(peer, key) + peer.send_io(@tcp_sockets.values[idx]) if key + when :share_udp + idx, = opts + key = @udp_sockets.keys[idx] + 
SocketManager.send_peer(peer, key) + peer.send_io(@udp_sockets.values[idx]) if key + when :share_unix + SocketManager.send_peer(peer, nil) + peer.send_io @server + else + raise ArgumentError, "Unknown method: #{method.inspect}" + end end end diff --git a/spec/socket_manager_spec.rb b/spec/socket_manager_spec.rb index c74e877..3bbfcac 100644 --- a/spec/socket_manager_spec.rb +++ b/spec/socket_manager_spec.rb @@ -55,6 +55,15 @@ expect(server.path).to be_between(49152, 65535) end end + + context 'Server.share_sockets_with_another_server' do + it 'not supported' do + server = SocketManager::Server.open(server_path) + expect { SocketManager::Server.share_sockets_with_another_server(server_path) }.to raise_error(NotImplementedError) + ensure + server.close + end + end else context 'Server.generate_path' do it 'returns socket path under /tmp' do @@ -76,6 +85,165 @@ expect(server.path).to include('/tmp/SERVERENGINE_SOCKETMANAGER_') end end + + context 'Server.share_sockets_with_another_server' do + it 'takes over listen sockets to another server' do + server = SocketManager::Server.open(server_path) + + client = SocketManager::Client.new(server_path) + tcp1 = client.listen_tcp('127.0.0.1', 55551) + udp1 = client.listen_udp('127.0.0.1', 55561) + udp2 = client.listen_udp('127.0.0.1', 55562) + + another_server = SocketManager::Server.share_sockets_with_another_server(server_path) + + expect([ + another_server.tcp_sockets.keys, + another_server.tcp_sockets.values.map(&:addr), + another_server.udp_sockets.keys, + another_server.udp_sockets.values.map(&:addr), + ]).to eq([ + server.tcp_sockets.keys, + server.tcp_sockets.values.map(&:addr), + server.udp_sockets.keys, + server.udp_sockets.values.map(&:addr), + ]) + ensure + tcp1&.close + udp1&.close + udp2&.close + server&.close + another_server&.close + end + + it 'takes over TCP sockets without downtime' do + manager_server = SocketManager::Server.open(server_path) + manager_client = SocketManager::Client.new(server_path) + + 
has_server_started = false + # The old server starts listening + thread_server = Thread.new do + server = manager_client.listen_tcp('127.0.0.1', test_port) + has_server_started = true + while socket = server.accept + incr_test_state(:count) + socket.close + end + ensure + server&.close + end + + sleep 0.1 until has_server_started + + # The client starts sending data + thread_client = Thread.new do + 100.times do |i| + socket = TCPSocket.new('127.0.0.1', test_port) + begin + socket.write("Hello #{i}\n") + ensure + socket.close + end + sleep 0.01 + end + end + + sleep 0.5 + + # The new server shares the sockets and starts listening in parallel with the old one + thread_new_server = Thread.new do + new_manager_server = SocketManager::Server.share_sockets_with_another_server(server_path) + server = manager_client.listen_tcp('127.0.0.1', test_port) + while socket = server.accept + incr_test_state(:count) + socket.close + end + ensure + new_manager_server&.close + server&.close + end + + # Stop the old server + sleep 0.1 + thread_server.kill + thread_server.join + + thread_client.join + wait_for_stop + + # Confirm that server switching was completed without data loss + expect(test_state(:count)).to eq(100) + ensure + manager_server&.close + thread_server&.kill + thread_new_server&.kill + thread_server&.join + thread_new_server&.join + end + + it 'takes over UDP sockets without downtime' do + manager_server = SocketManager::Server.open(server_path) + manager_client = SocketManager::Client.new(server_path) + + has_server_started = false + # The old server starts listening + thread_server = Thread.new do + server = manager_client.listen_udp('127.0.0.1', test_port) + has_server_started = true + while server.recv(10) + incr_test_state(:count) + end + ensure + server&.close + end + + sleep 0.1 until has_server_started + + # The client starts sending data + thread_client = Thread.new do + 100.times do |i| + socket = UDPSocket.new + begin + socket.send("Hello #{i}\n", 0, 
"127.0.0.1", test_port) + ensure + socket.close + end + sleep 0.01 + end + end + + sleep 0.5 + + # The new server shares the sockets and starts listening in parallel with the old one + thread_new_server = Thread.new do + new_manager_server = SocketManager::Server.share_sockets_with_another_server(server_path) + server = manager_client.listen_udp('127.0.0.1', test_port) + while server.recv(10) + incr_test_state(:count) + end + ensure + new_manager_server&.close + server&.close + end + + # Stop the old server + sleep 0.1 + thread_server.kill + thread_server.join + + thread_client.join + wait_for_stop + + # Confirm that server switching was completed without data loss + expect(test_state(:count)).to eq(100) + ensure + manager_server&.close + thread_server&.kill + thread_new_server&.kill + thread_server&.join + thread_new_server&.join + end + end end context 'with thread' do