Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Socket Manager API and Windows support #28

Merged
merged 2 commits into from
Dec 9, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions appveyor.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
---
install:
- SET PATH=C:\Ruby%ruby_version%\bin;%PATH%
- ruby --version
- gem --version
- bundle install
build: off
test_script:
- bundle exec rake -rdevkit

environment:
matrix:
- ruby_version: "200"
- ruby_version: "200-x64"
- ruby_version: "21"
- ruby_version: "21-x64"
- ruby_version: "22"
- ruby_version: "22-x64"
1 change: 1 addition & 0 deletions lib/serverengine.rb
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ module ServerEngine
:MultiThreadServer => 'serverengine/multi_thread_server',
:MultiSpawnServer => 'serverengine/multi_spawn_server',
:ProcessManager => 'serverengine/process_manager',
:SocketManager => 'serverengine/socket_manager',
:Worker => 'serverengine/worker',
:VERSION => 'serverengine/version',
}.each_pair {|k,v|
Expand Down
21 changes: 15 additions & 6 deletions lib/serverengine/multi_spawn_server.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,21 @@ module ServerEngine

class MultiSpawnServer < MultiWorkerServer
def initialize(worker_module, load_config_proc={}, &block)
@pm = ProcessManager.new(
auto_tick: false,
graceful_kill_signal: Daemon::Signals::GRACEFUL_STOP,
immediate_kill_signal: Daemon::Signals::IMMEDIATE_STOP,
enable_heartbeat: false,
)
if ServerEngine.windows?
@pm = ProcessManager.new(
auto_tick: false,
graceful_kill_signal: Daemon::Signals::GRACEFUL_STOP,
immediate_kill_signal: false,
enable_heartbeat: false,
)
else
@pm = ProcessManager.new(
auto_tick: false,
graceful_kill_signal: Daemon::Signals::GRACEFUL_STOP,
immediate_kill_signal: Daemon::Signals::IMMEDIATE_STOP,
enable_heartbeat: false,
)
end

super(worker_module, load_config_proc, &block)

Expand Down
75 changes: 45 additions & 30 deletions lib/serverengine/process_manager.rb
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,11 @@ def configure(config, opts={})
end

def fork(&block)

if ServerEngine.windows?
raise NotImplementedError, "fork is not available on this platform. Please use spawn(worker_type = 'spawn')."
end

rpipe, wpipe = new_pipe_pair

begin
Expand Down Expand Up @@ -143,26 +148,30 @@ def spawn(*args)

# pipe is necessary even if @enable_heartbeat == false because
# parent process detects shutdown of a child process using it
rpipe, wpipe = new_pipe_pair

begin
options[[wpipe.fileno]] = wpipe
if @enable_heartbeat
env['SERVERENGINE_HEARTBEAT_PIPE'] = wpipe.fileno.to_s
unless ServerEngine.windows?
rpipe, wpipe = new_pipe_pair
options[[wpipe.fileno]] = wpipe
if @enable_heartbeat
env['SERVERENGINE_HEARTBEAT_PIPE'] = wpipe.fileno.to_s
end
end

pid = Process.spawn(env, *args, options)

m = Monitor.new(self, pid)

@monitors << m
@rpipes[rpipe] = m
rpipe = nil

unless ServerEngine.windows?
@rpipes[rpipe] = m
rpipe = nil
end

return m

ensure
wpipe.close
wpipe.close if wpipe
rpipe.close if rpipe
end
end
Expand Down Expand Up @@ -199,30 +208,32 @@ def tick(blocking_timeout=0)
raise AlreadyClosedError.new
end

if @rpipes.empty?
sleep blocking_timeout if blocking_timeout > 0
return nil
end

ready_pipes, _, _ = IO.select(@rpipes.keys, nil, nil, blocking_timeout)

time ||= Time.now

if ready_pipes
ready_pipes.each do |r|
begin
r.read_nonblock(1024, @read_buffer)
rescue Errno::EAGAIN, Errno::EINTR
next
rescue #EOFError
m = @rpipes.delete(r)
m.start_immediate_stop!
r.close rescue nil
next
end
unless ServerEngine.windows?
if @rpipes.empty?
sleep blocking_timeout if blocking_timeout > 0
return nil
end

if m = @rpipes[r]
m.last_heartbeat_time = time
ready_pipes, _, _ = IO.select(@rpipes.keys, nil, nil, blocking_timeout)

if ready_pipes
ready_pipes.each do |r|
begin
r.read_nonblock(1024, @read_buffer)
rescue Errno::EAGAIN, Errno::EINTR
next
rescue #EOFError
m = @rpipes.delete(r)
m.start_immediate_stop!
r.close rescue nil
next
end

if m = @rpipes[r]
m.last_heartbeat_time = time
end
end
end
end
Expand Down Expand Up @@ -389,7 +400,11 @@ def tick(now=Time.now)
end

begin
Process.kill(signal, pid)
if ServerEngine.windows? && (signal == :KILL || signal == :SIGKILL)
system("taskkill /f /pid #{pid}")
else
Process.kill(signal, pid)
end
rescue #Errno::ECHILD, Errno::ESRCH, Errno::EPERM
# assume that any errors mean the child process is dead
@pid = nil
Expand Down
14 changes: 9 additions & 5 deletions lib/serverengine/server.rb
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,16 @@ def install_signal_handlers
s = self
SignalThread.new do |st|
st.trap(Daemon::Signals::GRACEFUL_STOP) { s.stop(true) }
st.trap(Daemon::Signals::IMMEDIATE_STOP) { s.stop(false) }
st.trap(Daemon::Signals::GRACEFUL_RESTART) { s.restart(true) }
st.trap(Daemon::Signals::IMMEDIATE_RESTART) { s.restart(false) }
st.trap(Daemon::Signals::RELOAD) { s.reload }
st.trap(Daemon::Signals::DETACH) { s.stop(true) }
st.trap(Daemon::Signals::DUMP) { Sigdump.dump }
# Here disables signals excepting GRACEFUL_STOP == :SIGTERM because
# only SIGTERM is available on all version of Windows.
unless ServerEngine.windows?
st.trap(Daemon::Signals::IMMEDIATE_STOP) { s.stop(false) }
st.trap(Daemon::Signals::GRACEFUL_RESTART) { s.restart(true) }
st.trap(Daemon::Signals::IMMEDIATE_RESTART) { s.restart(false) }
st.trap(Daemon::Signals::RELOAD) { s.reload }
st.trap(Daemon::Signals::DUMP) { Sigdump.dump }
end
end
end

Expand Down
158 changes: 158 additions & 0 deletions lib/serverengine/socket_manager.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
#
# ServerEngine
#
# Copyright (C) 2012-2013 Sadayuki Furuhashi
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
module ServerEngine
module SocketManager

require 'socket'
require 'ipaddr'

class Client
def initialize(path)
@path = path
end

def listen_tcp(bind, port)
peer = connect_peer(@path)
begin
SocketManager.send_peer(peer, [Process.pid, :listen_tcp, bind, port])
res = SocketManager.recv_peer(peer)
if res.is_a?(Exception)
raise res
else
return recv_tcp(peer, res)
end
ensure
peer.close
end
end

def listen_udp(bind, port)
peer = connect_peer(@path)
begin
SocketManager.send_peer(peer, [Process.pid, :listen_udp, bind, port])
res = SocketManager.recv_peer(peer)
if res.is_a?(Exception)
raise res
else
return recv_udp(peer, res)
end
ensure
peer.close
end
end
end

class Server
def self.open(path)
new(path)
end

def initialize(path)
@tcp_sockets = {}
@udp_sockets = {}
@mutex = Mutex.new
@path = start_server(path)
end

attr_reader :path

def new_client
Client.new(@path)
end

def close
stop_server
nil
end

private

def listen_tcp(bind, port)
key, bind_ip = resolve_bind_key(bind, port)

@mutex.synchronize do
if @tcp_sockets.has_key?(key)
return @tcp_sockets[key]
else
return @tcp_sockets[key] = listen_tcp_new(bind_ip, port)
end
end
end

def listen_udp(bind, port)
key, bind_ip = resolve_bind_key(bind, port)

@mutex.synchronize do
if @udp_sockets.has_key?(key)
return @udp_sockets[key]
else
return @udp_sockets[key] = listen_udp_new(bind_ip, port)
end
end
end

def resolve_bind_key(bind, port)
bind_ip = IPAddr.new(IPSocket.getaddress(bind))
if bind_ip.ipv6?
return "[#{bind_ip}]:#{port}", bind_ip
else
# assuming ipv4
return "#{bind_ip}:#{port}", bind_ip
end
end

def process_peer(peer)
while true
pid, method, bind, port = *SocketManager.recv_peer(peer)
begin
send_socket(peer, pid, method, bind, port)
rescue => e
SocketManager.send_peer(peer, e)
end
end
ensure
peer.close
end
end

def self.send_peer(peer, obj)
data = Marshal.dump(obj)
peer.write [data.bytesize].pack('N')
peer.write data
end

def self.recv_peer(peer)
len = peer.read(4).unpack('N').first
data = peer.read(len)
Marshal.load(data)
end

require_relative 'utils'

if ServerEngine.windows?
require_relative 'socket_manager_win'
Client.include(SocketManagerWin::ClientModule)
Server.include(SocketManagerWin::ServerModule)
else
require_relative 'socket_manager_unix'
Client.include(SocketManagerUnix::ClientModule)
Server.include(SocketManagerUnix::ServerModule)
end

end
end
Loading