Skip to content

Commit

Permalink
Add SocketManager that shares sockets across worker processes
Browse files Browse the repository at this point in the history
With SocketManager, a worker process can ask a parent process
(supervisor or server process) to listen a TCP or UDP socket. This
can dynamically happen so that worker processes can listen dynamically.

On UNIX, SocketManager::Server creates a UNIX domain socket and worker
processes connect there. Then Server listens on a port, then sends the
socket using Socket#send_io.

On Windows, SocketManager::Server creates a named pipe and worker
processes connect there. Then Server listens on a port, then sends the
socket using WSADuplicateSocketA.
  • Loading branch information
frsyuki committed Nov 11, 2015
1 parent 45d0007 commit d7c632c
Show file tree
Hide file tree
Showing 6 changed files with 594 additions and 0 deletions.
1 change: 1 addition & 0 deletions lib/serverengine.rb
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ module ServerEngine
:MultiThreadServer => 'serverengine/multi_thread_server',
:MultiSpawnServer => 'serverengine/multi_spawn_server',
:ProcessManager => 'serverengine/process_manager',
:SocketManager => 'serverengine/socket_manager',
:Worker => 'serverengine/worker',
:VERSION => 'serverengine/version',
}.each_pair {|k,v|
Expand Down
160 changes: 160 additions & 0 deletions lib/serverengine/socket_manager.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
#
# ServerEngine
#
# Copyright (C) 2012-2013 Sadayuki Furuhashi
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
module ServerEngine
module SocketManager

require 'socket'
require 'ipaddr'

class Client
def initialize(path)
@path = path
end

def listen_tcp(bind, port)
peer = connect_peer(@path)
begin
SocketManager.send_peer(peer, [Process.pid, :listen_tcp, bind, port])
res = SocketManager.recv_peer(peer)
if res.is_a?(Exception)
raise res
else
return recv_tcp(peer, res)
end
ensure
peer.close
end
end

def listen_udp(bind, port)
peer = connect_peer(@path)
begin
SocketManager.send_peer(peer, [Process.pid, :listen_udp, bind, port])
res = SocketManager.recv_peer(peer)
if res.is_a?(Exception)
raise res
else
return recv_udp(peer, res)
end
ensure
peer.close
end
end
end

class Server
def self.open(path)
new(path)
end

def initialize(path)
@tcp_sockets = {}
@udp_sockets = {}
@mutex = Mutex.new
@path = start_server(path)
end

attr_reader :path

def new_client
Client.new(@path)
end

def close
@tcp_sockets.reject! {|key,lsock| lsock.close; true }
@udp_sockets.reject! {|key,usock| usock.close; true }
stop_server
nil
end

private

def listen_tcp(bind, port)
key, bind_ip = resolve_bind_key(bind, port)

@mutex.synchronize do
if @tcp_sockets.has_key?(key)
return @tcp_sockets[key]
else
return @tcp_sockets[key] = listen_tcp_new(bind_ip, port)
end
end
end

def listen_udp(bind, port)
key, bind_ip = resolve_bind_key(bind, port)

@mutex.synchronize do
if @udp_sockets.has_key?(key)
return @udp_sockets[key]
else
return @udp_sockets[key] = listen_udp_new(bind_ip, port)
end
end
end

def resolve_bind_key(bind, port)
bind_ip = IPAddr.new(IPSocket.getaddress(bind))
if bind_ip.ipv6?
return "[#{bind_ip}]:#{port}", bind_ip
else
# assuming ipv4
return "#{bind_ip}:#{port}", bind_ip
end
end

def process_peer(peer)
while true
pid, method, bind, port = *SocketManager.recv_peer(peer)
begin
send_socket(peer, pid, method, bind, port)
rescue => e
SocketManager.send_peer(peer, e)
end
end
ensure
peer.close
end
end

def self.send_peer(peer, obj)
data = Marshal.dump(obj)
peer.write [data.bytesize].pack('N')
peer.write data
end

def self.recv_peer(peer)
len = peer.read(4).unpack('N').first
data = peer.read(len)
Marshal.load(data)
end

require 'serverengine/utils'

if ServerEngine.windows?
require 'serverengine/socket_manager_win'
Client.include(SocketManagerWin::ClientModule)
Server.include(SocketManagerWin::ServerModule)
else
require 'serverengine/socket_manager_unix'
Client.include(SocketManagerUnix::ClientModule)
Server.include(SocketManagerUnix::ServerModule)
end

end
end
94 changes: 94 additions & 0 deletions lib/serverengine/socket_manager_unix.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
#
# ServerEngine
#
# Copyright (C) 2012-2013 Sadayuki Furuhashi
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

module ServerEngine
module SocketManagerUnix

module ClientModule
private

def connect_peer(path)
return UNIXSocket.new(path)
end

def recv_tcp(peer, sent)
return peer.recv_io(TCPServer)
end

def recv_udp(peer, sent)
return peer.recv_io(UDPSocket)
end
end

module ServerModule
private

def listen_tcp_new(bind_ip, port)
sock = TCPServer.new(bind_ip.to_s, port)
sock.listen(Socket::SOMAXCONN) # TODO make backlog configurable if necessary
return sock
end

def listen_udp_new(bind_ip, port)
if bind_ip.ipv6?
sock = UDPSocket.new(Socket::AF_INET6)
else
sock = UDPSocket.new
end
sock.bind(bind_ip.to_s, port)
return sock
end

def start_server(path)
# return absolute path so that client can connect to this path
# when client changed working directory
path = File.expand_path(path)

@server = UNIXServer.new(path)

@thread = Thread.new do
begin
while peer = @server.accept
Thread.new(peer, &method(:process_peer)) # process_peer calls send_socket
end
rescue => e
unless @server.closed?
ServerEngine.dump_uncaught_error(e)
end
end
end

return path
end

def stop_server
@server.close unless @server.closed?
@thread.join
end

def send_socket(peer, pid, method, bind, port)
sock = send(method, bind, port) # calls listen_tcp or listen_udp

SocketManager.send_peer(peer, nil)

peer.send_io sock
end
end

end
end
135 changes: 135 additions & 0 deletions lib/serverengine/socket_manager_win.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
#
# ServerEngine
#
# Copyright (C) 2012-2013 Sadayuki Furuhashi
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

module ServerEngine
module SocketManagerWin

require 'win32/pipe'
require 'serverengine/winsock'

module ClientModule
private

def connect_peer(path)
return Win32::Pipe::Client.new(@pipe_name)
end

def recv_tcp(peer, proto)
# TODO call rb_w32_wrap_io_handle with TCPServer so that clients can use TCPServer API
return WinSock::WSASocketA(Socket::AF_INET, Socket::SOCK_STREAM, 0, proto, 0, WinSock::WSA_FLAG_OVERLAPPED)
end

def recv_udp(peer, proto)
# TODO call rb_w32_wrap_io_handle with UDPSocket so that clients can use UDPSocket API
return WinSock::WSASocketA(Socket::AF_INET, Socket::SOCK_DGRAM, 0, proto, 0, WinSock::WSA_FLAG_OVERLAPPED)
end
end

class Server
private

def listen_tcp_new(bind, port)
# TODO IPv6 is not supported

sock = WinSock::WSASocketA(Socket::AF_INET, Socket::SOCK_STREAM, Socket::IPPROTO_TCP, nil, 0, WinSock::WSA_FLAG_OVERLAPPED)
# TODO call rb_w32_wrap_io_handle so that sock is closed by SocketManager::Server#close or GC

sock_addr = pack_sockaddr(bind_ip, port)
WinSock::bind(sock, listen_addr, listen_addr.size)
WinSock::listen(sock, Socket::SOMAXCONN)

return sock
end

def listen_udp_new(bind_ip, port)
# TODO IPv6 is not supported

sock = WinSock::WSASocketA(Socket::AF_INET, Socket::SOCK_DGRAM, Socket::IPPROTO_UDP, nil, 0, WinSock::WSA_FLAG_OVERLAPPED)
# TODO call rb_w32_wrap_io_handle so that sock is closed by SocketManager::Server#close or GC

sock_addr = pack_sockaddr(bind_ip, port)
WinSock::bind(sock, sock_addr, sock_addr.size)

return sock
end

def pack_sockaddr(bind_ip, port)
# implementing Socket.pack_sockaddr_in here
sock_addr = WinSock::SockaddrIn.new
in_addr = WinSock::InAddr.new
in_addr[:s_addr] = bind_ip.to_i
sock_addr[:sin_family] = Socket::AF_INET
sock_addr[:sin_port] = htons(port)
sock_addr[:sin_addr] = in_addr
return sock_addr
end

def htons(h)
[h].pack("S").unpack("n")[0]
end

def start_server(path)
@running = true

@thread = Thread.new do
begin
pipe = nil
while @running
pipe ||= Win32::Pipe::Server.new(path, Win32::Pipe::DEFAULT_PIPE_MODE, Win32::Pipe::DEFAULT_OPEN_MODE | Win32::Pipe::OVERLAPPED)
pipe.wait(1)
if pipe.connect
Thread.new(pipe, &method(:process_peer)) # process_peer calls send_socket
pipe = nil
end
end
rescue => e
ServerEngine.dump_uncaught_error(e)
end
end

return path
end

def stop_server
@running = false
@thread.join
end

def send_socket(peer, pid, method, bind, port)
case method
when :listen_tcp
sock = listen_tcp(bind, port)
type = Socket::SOCK_STREAM
when :listen_udp
sock = listen_tcp(bind, port)
type = Socket::SOCK_DGRAM
else
raise ArgumentError, "Unknown method: #{method.inspect}"
end

proto = WinSock::WSAPROTOCOL_INFO.new
unless WinSock::WSADuplicateSocketA(sock, pid, proto) == 0
raise "WSADuplicateSocketA faild (0x%x)" % WinSock::WSAGetLastError()
end

SocketManager.send_peer(peer, proto)
end
end

end
end
Loading

0 comments on commit d7c632c

Please sign in to comment.