Skip to content

Commit

Permalink
remove cluster.rpc and cluster.msg
Browse files Browse the repository at this point in the history
add cluster.lua for more flexible rpc
  • Loading branch information
findstr committed May 23, 2024
1 parent aa1be00 commit 68f3a94
Show file tree
Hide file tree
Showing 5 changed files with 342 additions and 544 deletions.
312 changes: 312 additions & 0 deletions lualib/core/cluster.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,312 @@
local core = require "core"
local mutex = require "core.sync.mutex"
local dns = require "core.dns"
local logger = require "core.logger"
local np = require "core.netpacket"
local type = type
local ipairs = ipairs
local pairs = pairs
local assert = assert
local remove = table.remove
local tcp_connect = core.tcp_connect
local tcp_send = core.tcp_send
local tcp_close = core.socket_close
local tcp_listen = core.tcp_listen
local pcall = core.pcall
local timeout = core.timeout
local timercancel = core.timercancel
local RPC_ACK<const> = 0x80000000
local setmetatable = setmetatable
local NIL = {}

local mt = {
__gc = function(self)
local fdaddr = self.__fdaddr
for k, _ in pairs(fdaddr) do
if type(k) == "number" then
tcp_close(k)
end
fdaddr[k] = nil
end
end,
__index = function(self, msg)
local proto = self.__proto
local waitfor = self.__waitfor
local fdaddr = self.__fdaddr
local notify = msg:find("_n$")
local cmd = proto:tag(msg)
local fn
if notify then
fn = function(fd, body)
if not fdaddr[fd] then
return nil, "closed"
end
local session = core.genid()
local traceid = core.tracepropagate()
local dat, sz = proto:encode(cmd, body, true)
dat, sz = proto:pack(dat, sz, true)
local ok = tcp_send(fd, np.rpcpack(dat, sz, cmd, session, traceid))
if not ok then
return nil, "send fail"
end
return ok, nil
end
else
fn = function(fd, body)
if not fdaddr[fd] then
return nil, "closed"
end
local session = core.genid()
local traceid = core.tracepropagate()
local dat, sz = proto:encode(cmd, body, true)
dat, sz = proto:pack(dat, sz, true)
local ok = tcp_send(fd, np.rpcpack(dat, sz, cmd, session, traceid))
if not ok then
return nil, "send fail"
end
return waitfor(session)
end
end
self[msg] = fn
return fn
end
}

local function connect_wrapper(self)
local lock = self.__lock
local fdaddr = self.__fdaddr
local connecting = self.__connecting
return function(addr)
local fd = fdaddr[addr]
if fd then
return fd, "connected"
end
connecting[addr] = true
local l<close> = lock:lock(addr)
while true do
local newaddr = addr
local name, port = addr:match("([^:]+):(%d+)")
if dns.isname(name) then
local ip = dns.lookup(name)
if ip then
newaddr = ip .. ":" .. port
else
newaddr = nil
logger.error("[rpc.client] dns lookup fail", name)
end
end
if newaddr then
local fd, errno = tcp_connect(newaddr, self.__event)
if fd then
if connecting[addr] then
connecting[addr] = nil
fdaddr[addr] = fd
fdaddr[fd] = addr
return fd, "ok"
else --already close
tcp_close(fd)
return nil, "active closed"
end
else
logger.error("[rpc.client] connect fail", addr, errno)
end
end
core.sleep(1000)
logger.info("[rpc.client] retry connect:", addr)
end
end
end

local function listen_wrapper(self)
return function(addr, backlog)
local fd, errno = tcp_listen(addr, self.__event, backlog)
if not fd then
return fd, errno
end
self.__fdaddr[addr] = fd
self.__fdaddr[fd] = addr
return fd, nil
end
end

local function close_wrapper(self)
return function(addr)
local connecting = self.__connecting
if connecting[addr] then
connecting[addr] = nil
return
end
local fdaddr = self.__fdaddr
local fd = fdaddr[addr]
if not fd then
return "closed"
end
if type(addr) == "string" then
addr, fd = fd, addr
end
fdaddr[fd] = nil
fdaddr[addr] = nil
core.socket_close(fd)
end
end

local function nop() end

local function init_event(self, conf)
local waitpool = self.__waitpool
local ackcmd = self.__ackcmd
local fdaddr = self.__fdaddr
local connect = self.connect
local proto = assert(conf.proto, "proto")
local close = assert(conf.close, "close")
local call = assert(conf.call, "call")
local accept = conf.accept or nop
local queue = np.create()
local EVENT = {}
function EVENT.accept(fd, _, addr)
fdaddr[fd] = addr
fdaddr[addr] = fd
local ok, err = pcall(accept, fd, addr)
if not ok then
logger.error("[rpc.server] EVENT.accept", err)
np.clear(queue, fd)
core.socket_close(fd)
end
end

function EVENT.close(fd, errno)
local addr = fdaddr[fd]
fdaddr[fd] = nil
fdaddr[addr] = nil
local ok, err = pcall(close, fd, errno)
if not ok then
logger.error("[rpc.server] EVENT.close", err)
end
np.clear(queue, fd)
connect(addr)
end

function EVENT.data()
local fd, buf, size, cmd, session, traceid = np.rpcpop(queue)
if not fd then
return
end
local otrace = core.trace(traceid)
core.fork(EVENT.data)
while true do
local dat
--pars
if cmd < RPC_ACK then
dat, size = proto:unpack(buf, size, true)
np.drop(buf)
local body = proto:decode(cmd, dat, size)
if not body then
logger.error("[rpc.server] decode fail",
session, cmd)
break
end
local ok, ret, res = pcall(call, body, cmd, fd)
if not ok then
logger.error("[rpc.server] call error", ret)
break
end
if not ret then
break
end
--ack
res = res or NIL
if type(ret) == "string" then
ret = proto:tag(ret)
end
local bodydat, sz = proto:encode(ret, res, true)
bodydat, sz = proto:pack(bodydat, sz, true)
ret = ret | RPC_ACK
tcp_send(fd, np.rpcpack(bodydat, sz, ret, session, traceid))
else
cmd = cmd - RPC_ACK
dat, size = proto:unpack(buf, size, true)
np.drop(buf)
local body = proto:decode(cmd, dat, size)
if not body then
logger.error("[rpc.server] decode fail",
session, cmd)
break
end
local co = waitpool[session]
if not co then --timeout
logger.warn("[rpc.client] late session",
session, cmd)
break
end
waitpool[session] = nil
ackcmd[co] = cmd
core.wakeup(co, body)
end
--next
fd, buf, size, cmd, session, traceid = np.rpcpop(queue)
if not fd then
return
end
core.trace(traceid)
end
core.trace(otrace)
end
return function(type, fd, message, ...)
np.message(queue, message)
assert(EVENT[type])(fd, ...)
end
end


local function waitfor_wrapper(self, expire)
expire = expire or 5000
local waitpool = self.__waitpool
local ackcmd = self.__ackcmd
local timer_func = function(session)
local co = waitpool[session]
if not co then
logger.error("[rpc.client] timer error session:", session)
return
end
waitpool[session] = nil
ackcmd[co] = "timeout"
core.wakeup(co)
end
return function(session)
local co = core.running()
local timer_id = timeout(expire, timer_func, session)
waitpool[session] = co
local body = core.wait()
if body then
timercancel(timer_id)
end
local cmd = ackcmd[co]
ackcmd[co] = nil
return body, cmd
end
end

local M = {}
function M.new(conf)
local obj = {
__lock = mutex:new(),
__proto = conf.proto,
__fdaddr = {},
__waitpool = {},
__ackcmd = {},
__waitfor = nil,
__event = nil,
__connecting = {},
connect = nil,
listen = nil,
close = nil,
}
obj.connect = connect_wrapper(obj)
obj.listen = listen_wrapper(obj)
obj.close = close_wrapper(obj)
obj.__event = init_event(obj, conf)
obj.__waitfor = waitfor_wrapper(obj, conf.timeout)
return setmetatable(obj, mt)
end

return M
Loading

0 comments on commit 68f3a94

Please sign in to comment.