From f9d8c82eb372ee7b27a12e65880a48394ff6b58c Mon Sep 17 00:00:00 2001 From: findstr Date: Thu, 23 May 2024 20:31:33 +0800 Subject: [PATCH] remove cluster.rpc and cluster.msg add cluster.lua for more flexible rpc --- examples/rpc.lua | 61 +++++-- lualib/core/cluster.lua | 316 +++++++++++++++++++++++++++++++++ lualib/core/cluster/msg.lua | 187 -------------------- lualib/core/cluster/rpc.lua | 336 ------------------------------------ lualib/core/sync/mutex.lua | 2 +- test/testrpc.lua | 100 ++++++++--- 6 files changed, 435 insertions(+), 567 deletions(-) create mode 100644 lualib/core/cluster.lua delete mode 100644 lualib/core/cluster/msg.lua delete mode 100644 lualib/core/cluster/rpc.lua diff --git a/examples/rpc.lua b/examples/rpc.lua index 3fc7f172..0c71957c 100644 --- a/examples/rpc.lua +++ b/examples/rpc.lua @@ -1,6 +1,6 @@ local core = require "core" local crypto = require "core.crypto" -local rpc = require "cluster.rpc" +local cluster = require "core.cluster" local zproto = require "zproto" local proto = zproto:parse [[ @@ -12,10 +12,33 @@ pong 0x2 { } ]] +assert(proto) +local function unmarshal(cmd, buf, size) + local dat, size = proto:unpack(buf, size, true) + local body = proto:decode(cmd, dat, size) + return body +end -local server = rpc.listen { - addr = "127.0.0.1:9999", - proto = proto, +local function marshal(cmd, body) + if type(cmd) == "string" then + cmd = proto:tag(cmd) + end + local dat, size = proto:encode(cmd, body, true) + local buf, size = proto:pack(dat, size, true) + return cmd, buf, size +end + +local function call(msg, fd) + print("callee", msg.txt, fd) + return "pong", msg +end + +local router = setmetatable({}, {__index = function(t, k) return call end}) + +local server = cluster.new { + marshal = marshal, + unmarshal = unmarshal, + router = router, accept = function(fd, addr) print("accept", fd, addr) end, @@ -23,28 +46,28 @@ local server = rpc.listen { close = function(fd, errno) print("close", fd, errno) end, - - call = function(msg, cmd, fd) - print("callee", msg.txt, cmd, fd) - return "pong", msg - end } +server.listen("127.0.0.1:9999") + +local client = cluster.new { + marshal = marshal, + unmarshal = unmarshal, + router = router, + close = function(fd, errno) + print("close", fd, errno) + end, +} core.start(function() for i = 1, 3 do core.fork(function() - local conn = rpc.connect { - addr = "127.0.0.1:9999", - proto = proto, - timeout = 5000, - close = function(fd, errno) - end, - } - while true do + local fd, err = client.connect("127.0.0.1:9999") + print("connect", fd, err) + for j = 1, 10000 do local txt = crypto.randomkey(5) - local ack, cmd = conn:call("ping", {txt = txt}) - print("caller", conn, txt, ack.txt) + local ack, cmd = client.ping(fd, {txt = txt}) + print("caller", fd, txt, ack.txt) assert(ack.txt == txt) assert(cmd == proto:tag("pong")) core.sleep(1000) diff --git a/lualib/core/cluster.lua b/lualib/core/cluster.lua new file mode 100644 index 00000000..8aa24219 --- /dev/null +++ b/lualib/core/cluster.lua @@ -0,0 +1,316 @@ +local core = require "core" +local mutex = require "core.sync.mutex" +local dns = require "core.dns" +local logger = require "core.logger" +local np = require "core.netpacket" +local type = type +local rawget = rawget +local ipairs = ipairs +local pairs = pairs +local assert = assert +local remove = table.remove +local tcp_connect = core.tcp_connect +local tcp_send = core.tcp_send +local tcp_close = core.socket_close +local tcp_listen = core.tcp_listen +local pcall = core.pcall +local timeout = core.timeout +local timercancel = core.timercancel +local RPC_ACK = 0x80000000 +local RPC_MASK = RPC_ACK - 1 +local setmetatable = setmetatable +local NIL = {} + +local mt = { + __gc = function(self) + local fdaddr = self.__fdaddr + for k, _ in pairs(fdaddr) do + if type(k) == "number" then + tcp_close(k) + end + fdaddr[k] = nil + end + end, + __index = function(self, msg) + local waitfor = self.__waitfor + local fdaddr = self.__fdaddr + local marshal = self.__marshal + local issend = rawget(self, "__issend") + local fn + if issend and issend(msg) then + fn = function(fd, body) + if not fdaddr[fd] then + return nil, "closed" + end + local session = core.genid() + local traceid = core.tracepropagate() + local cmd, dat, sz = marshal(msg, body) + local ok = tcp_send(fd, np.rpcpack(dat, sz, cmd, session, traceid)) + if not ok then + return nil, "send fail" + end + return ok, nil + end + else + fn = function(fd, body) + if not fdaddr[fd] then + return nil, "closed" + end + local session = core.genid() + local traceid = core.tracepropagate() + local cmd, dat, sz = marshal(msg, body) + local ok = tcp_send(fd, np.rpcpack(dat, sz, cmd, session, traceid)) + if not ok then + return nil, "send fail" + end + return waitfor(session) + end + end + self[msg] = fn + return fn + end +} + +local function connect_wrapper(self) + local lock = self.__lock + local fdaddr = self.__fdaddr + local connecting = self.__connecting + return function(addr) + local fd = fdaddr[addr] + if fd then + return fd, "connected" + end + connecting[addr] = true + local l = lock:lock(addr) + local fd = fdaddr[addr] + if fd then + return fd, "connected" + end + while true do + local newaddr = addr + local name, port = addr:match("([^:]+):(%d+)") + if dns.isname(name) then + local ip = dns.lookup(name) + if ip then + newaddr = ip .. ":" .. port + else + newaddr = nil + logger.error("[rpc.client] dns lookup fail", name) + end + end + if newaddr then + local fd, errno = tcp_connect(newaddr, self.__event) + if fd then + if connecting[addr] then + connecting[addr] = nil + fdaddr[addr] = fd + fdaddr[fd] = addr + return fd, "ok" + else --already close + tcp_close(fd) + return nil, "active closed" + end + else + logger.error("[rpc.client] connect fail", addr, errno) + end + end + core.sleep(1000) + logger.info("[rpc.client] retry connect:", addr) + end + end +end + +local function listen_wrapper(self) + return function(addr, backlog) + local fd, errno = tcp_listen(addr, self.__event, backlog) + if not fd then + return fd, errno + end + self.__fdaddr[addr] = fd + self.__fdaddr[fd] = addr + return fd, nil + end +end + +local function close_wrapper(self) + return function(addr) + local connecting = self.__connecting + if connecting[addr] then + connecting[addr] = nil + return + end + local fdaddr = self.__fdaddr + local fd = fdaddr[addr] + if not fd then + return "closed" + end + if type(addr) == "string" then + addr, fd = fd, addr + end + fdaddr[fd] = nil + fdaddr[addr] = nil + core.socket_close(fd) + end +end + +local function nop() end + +local function init_event(self, conf) + local waitpool = self.__waitpool + local ackcmd = self.__ackcmd + local fdaddr = self.__fdaddr + local connect = self.connect + local router = assert(conf.router, "router") + local close = assert(conf.close, "close") + local marshal = assert(conf.marshal, "marshal") + local unmarshal = assert(conf.unmarshal, "unmarshal") + local accept = conf.accept or nop + local queue = np.create() + local EVENT = {} + function EVENT.accept(fd, _, addr) + fdaddr[fd] = addr + fdaddr[addr] = fd + local ok, err = pcall(accept, fd, addr) + if not ok then + logger.error("[rpc.server] EVENT.accept", err) + np.clear(queue, fd) + core.socket_close(fd) + end + end + + function EVENT.close(fd, errno) + local addr = fdaddr[fd] + fdaddr[fd] = nil + fdaddr[addr] = nil + local ok, err = pcall(close, fd, errno) + if not ok then + logger.error("[rpc.server] EVENT.close", err) + end + np.clear(queue, fd) + connect(addr) + end + + function EVENT.data() + local fd, buf, size, cmd, session, traceid = np.rpcpop(queue) + if not fd then + return + end + local otrace = core.trace(traceid) + core.fork(EVENT.data) + while true do + local dat + --pars + if cmd < RPC_ACK then + local body = unmarshal(cmd, buf, size) + np.drop(buf) + if not body then + logger.error("[rpc.server] decode fail", + session, cmd) + break + end + local fn = router[cmd] + if not fn then + logger.error("[rpc.server] no router", cmd) + break + end + local ok, ret, res = pcall(fn, body, fd) + if not ok then + logger.error("[rpc.server] call error", ret) + break + end + if not ret then + break + end + --ack + res = res or NIL + local ret, buf, sz = marshal(ret, res) + ret = ret | RPC_ACK + tcp_send(fd, np.rpcpack(buf, sz, ret, session, traceid)) + else + cmd = cmd & RPC_MASK + local body = unmarshal(cmd, buf, size) + np.drop(buf) + if not body then + logger.error("[rpc.server] decode fail", + session, cmd) + break + end + local co = waitpool[session] + if not co then --timeout + logger.warn("[rpc.client] late session", + session, cmd) + break + end + waitpool[session] = nil + ackcmd[co] = cmd + core.wakeup(co, body) + end + --next + fd, buf, size, cmd, session, traceid = np.rpcpop(queue) + if not fd then + return + end + core.trace(traceid) + end + core.trace(otrace) + end + return function(type, fd, message, ...) + np.message(queue, message) + assert(EVENT[type])(fd, ...) + end +end + + +local function waitfor_wrapper(self, expire) + expire = expire or 5000 + local waitpool = self.__waitpool + local ackcmd = self.__ackcmd + local timer_func = function(session) + local co = waitpool[session] + if not co then + logger.error("[rpc.client] timer error session:", session) + return + end + waitpool[session] = nil + ackcmd[co] = "timeout" + core.wakeup(co) + end + return function(session) + local co = core.running() + local timer_id = timeout(expire, timer_func, session) + waitpool[session] = co + local body = core.wait() + if body then + timercancel(timer_id) + end + local cmd = ackcmd[co] + ackcmd[co] = nil + return body, cmd + end +end + +local M = {} +function M.new(conf) + local obj = { + __lock = mutex:new(), + __fdaddr = {}, + __waitpool = {}, + __ackcmd = {}, + __waitfor = nil, + __event = nil, + __connecting = {}, + __issend = conf.issend, + __marshal = assert(conf.marshal, "marshal"), + connect = nil, + listen = nil, + close = nil, + } + obj.connect = connect_wrapper(obj) + obj.listen = listen_wrapper(obj) + obj.close = close_wrapper(obj) + obj.__event = init_event(obj, conf) + obj.__waitfor = waitfor_wrapper(obj, conf.timeout) + return setmetatable(obj, mt) +end + +return M diff --git a/lualib/core/cluster/msg.lua b/lualib/core/cluster/msg.lua deleted file mode 100644 index 1ce067d2..00000000 --- a/lualib/core/cluster/msg.lua +++ /dev/null @@ -1,187 +0,0 @@ -local core = require "core" -local logger = require "core.logger" -local np = require "core.netpacket" -local pairs = pairs -local assert = assert -local type = type -local msg = {} -local msgserver = {} -local msgclient = {} -local queue = np.create() - -local function gc(obj) - if not obj.fd then - return - end - if obj.fd < 0 then - return - end - core.socket_close(obj.fd) - obj.fd = false -end - -local servermt = {__index = msgserver, __gc = gc} -local clientmt = {__index = msgclient, __gc = gc} - -local function event_callback(proto, accept_cb, close_cb, data_cb) - local EVENT = {} - function EVENT.accept(fd, portid, addr) - local ok, err = core.pcall(accept_cb, fd, addr) - if not ok then - logger.error("[msg] EVENT.accept", err) - core.socket_close(fd) - end - end - function EVENT.close(fd, errno) - local ok, err = core.pcall(close_cb, fd, errno) - if not ok then - logger.error("[msg] EVENT.close", err) - end - end - function EVENT.data() - local f, d, sz, cmd = np.msgpop(queue) - if not f then - return - end - core.fork(EVENT.data) - while true do - --parse - local dat, size = proto:unpack(d, sz, true) - np.drop(d) - local obj = proto:decode(cmd, dat, size) - local ok, err = core.pcall(data_cb, f, cmd, obj) - if not ok then - logger.error("[msg] dispatch socket", err) - end - f, d, sz, cmd = np.msgpop(queue) - if not f then - return - end - end - end - return function (type, fd, message, ...) - np.message(queue, message) - assert(EVENT[type])(fd, ...) - end -end - ----server -local function sendmsg(self, fd, cmd, data) - local proto = self.proto - if type(cmd) == "string" then - cmd = proto:tag(cmd) - end - local dat, sz = proto:encode(cmd, data, true) - dat, sz= proto:pack(dat, sz, true) - return core.tcp_send(fd, np.msgpack(dat, sz, cmd)) -end -msgserver.send = sendmsg -msgserver.sendbin = function(self, fd, cmd, bin) - return core.tcp_send(fd, np.msgpack(bin, cmd)) -end -msgserver.multipack = function(self, cmd, dat, n) - local proto = self.proto - if type(cmd) == "string" then - cmd = proto:tag(cmd) - end - local dat, sz = proto:encode(cmd, dat, true) - dat, sz = proto:pack(dat, sz, true) - dat, sz = np.msgpack(dat, sz, cmd) - dat, sz = core.multipack(dat, sz, n) - return dat, sz -end - -msgserver.multicast = function(self, fd, data, sz) - return core.tcp_multicast(fd, data, sz) -end - -function msgserver.stop(self) - gc(self) -end - -function msgserver.close(self, fd) - core.socket_close(fd) -end - ------client -local function wakeupall(self) - local q = self.connectqueue - for k, v in pairs(q) do - core.wakeup(v) - q[k] = nil - end -end - -local function checkconnect(self) - if self.fd and self.fd >= 0 then - return self.fd - end - if not self.fd then --disconnected - self.fd = -1 - local fd = core.tcp_connect(self.addr, self.callback) - if not fd then - self.fd = false - else - self.fd = fd - end - wakeupall(self) - return self.fd - else - local co = core.running() - local t = self.connectqueue - t[#t + 1] = co - core.wait() - return self.fd and self.fd > 0 - end -end - -function msgclient.close(self) - gc(self) -end - -function msgclient.send(self, cmd, data) - local fd = checkconnect(self) - if not fd then - return false - end - return sendmsg(self, fd, cmd, data) -end - -function msg.connect(conf) - local obj = { - fd = false, - callback = false, - addr = conf.addr, - proto = conf.proto, - connectqueue = {}, - } - local close_cb = assert(conf.close, "clientcb close") - local data_cb = assert(conf.data, "clientcb data") - obj.callback = event_callback(conf.proto, nil, close_cb, data_cb) - setmetatable(obj, clientmt) - checkconnect(obj) - return obj -end - -function msg.listen(conf) - local obj = { - fd = false, - callback = false, - addr = conf.addr, - proto = conf.proto, - } - local accept_cb = assert(conf.accept, "servercb accept") - local close_cb = assert(conf.close, "servercb close") - local data_cb = assert(conf.data, "servercb data") - obj.callback = event_callback(conf.proto, accept_cb, close_cb, data_cb) - setmetatable(obj, servermt) - local fd, errno = core.tcp_listen(obj.addr, obj.callback, obj.backlog) - if not fd then - return nil, errno - end - return obj -end - - -return msg - diff --git a/lualib/core/cluster/rpc.lua b/lualib/core/cluster/rpc.lua deleted file mode 100644 index afa36bef..00000000 --- a/lualib/core/cluster/rpc.lua +++ /dev/null @@ -1,336 +0,0 @@ -local core = require "core" -local logger = require "core.logger" -local np = require "core.netpacket" -local zproto = require "zproto" -local type = type -local pairs = pairs -local assert = assert -local pack = string.pack -local unpack = string.unpack -local tcp_send = core.tcp_send -local queue = np.create() -local NIL = {} -local rpc = {} - -local function gc(obj) - if not obj.fd then - return - end - local fd = obj.fd - obj.fd = false - if fd < 0 then - return - end - core.socket_close(fd) -end - ------------server -local server = {} -local servermt = {__index = server} - -local function server_listen(self) - local EVENT = {} - local accept = assert(self.accept, "accept") - local close = assert(self.close, "close") - local call = assert(self.call, "call") - local proto = self.proto - function EVENT.accept(fd, portid, addr) - local ok, err = core.pcall(accept, fd, addr) - if not ok then - logger.error("[rpc.server] EVENT.accept", err) - np.clear(queue, fd) - core.socket_close(fd) - end - end - - function EVENT.close(fd, errno) - local ok, err = core.pcall(close, fd, errno) - if not ok then - logger.error("[rpc.server] EVENT.close", err) - end - np.clear(queue, fd) - end - - function EVENT.data() - local fd, buf, size, cmd, session, traceid = np.rpcpop(queue) - if not fd then - return - end - local otrace = core.trace(traceid) - core.fork(EVENT.data) - while true do - local dat - --parse - dat, size = proto:unpack(buf, size, true) - np.drop(buf) - local body = proto:decode(cmd, dat, size) - if not body then - logger.error("[rpc.server] decode fail", - session, cmd) - return - end - local ok, ret, res = core.pcall(call, body, cmd, fd) - if not ok then - logger.error("[rpc.server] call error", ret) - return - end - if not ret then - return - end - --ack - res = res or NIL - if type(ret) == "string" then - ret = proto:tag(ret) - end - local bodydat, sz = proto:encode(ret, res, true) - bodydat, sz = proto:pack(bodydat, sz, true) - tcp_send(fd, np.rpcpack(bodydat, sz, ret, session, traceid)) - --next - fd, buf, size, cmd, session, traceid = np.rpcpop(queue) - if not fd then - return - end - core.trace(traceid) - end - core.trace(otrace) - end - local callback = function(type, fd, message, ...) - np.message(queue, message) - assert(EVENT[type])(fd, ...) - end - local fd, errno = core.tcp_listen(self.addr, callback, self.backlog) - self.fd = fd - return fd, errno -end - -function server.close(self) - gc(self) -end - --------client -local client = {} -local clientmt = {__index = client, __gc = gc} - -local function wakeup_all_calling(self) - local waitpool = self.waitpool - local ackcmd = self.ackcmd - for session, co in pairs(waitpool) do - waitpool[session] = nil - logger.info("[rpc.client] wakeupall session", session) - ackcmd[co] = "closed" - core.wakeup(co) - end -end - -local function wakeup_all_connect(self) - local q = self.connectqueue - for k, v in pairs(q) do - core.wakeup(v) - q[k] = nil - end -end - -local function doconnect(self) - local EVENT = {} - local addr = self.__addr - local close = self.__close - local proto = self.__proto - local ackcmd = self.ackcmd - local waitpool = self.waitpool - function EVENT.close(fd, errno) - if close then - local ok, err = core.pcall(close, fd, errno) - if not ok then - logger.info("[rpc.client] EVENT.close", err) - end - end - self.fd = nil - np.clear(queue, fd) - end - - function EVENT.data() - local fd, d, sz, cmd, session, _ = np.rpcpop(queue) - if not fd then - return - end - core.fork(EVENT.data) - while true do - local str - str, sz = proto:unpack(d, sz, true) - np.drop(d) - local body = proto:decode(cmd, str, sz) - if not body then - logger.error("[rpc.client] decode fail", - session, cmd) - return - end - --ack - local co = waitpool[session] - if not co then --timeout - logger.warn("[rpc.client] late session", - session, cmd) - return - end - waitpool[session] = nil - ackcmd[co] = cmd - core.wakeup(co, body) - --next - fd, d, sz, cmd, session, _ = np.rpcpop(queue) - if not fd then - break - end - end - end - - local callback = function(type, fd, message, ...) - np.message(queue, message) - assert(EVENT[type])(fd, ...) - end - return core.tcp_connect(addr, callback) -end - ---return true/false -local function checkconnect(self) - if self.fd and self.fd >= 0 then - return self.fd - end - if self.closed then - return false - end - if not self.fd then --disconnected - self.fd = -1 - local fd = doconnect(self) - if self.closed then - if fd then - core.socket_close(fd) - fd = nil - end - end - if not fd then - logger.error("[rpc.client] connect", self.__addr, "fail") - self.fd = false - else - self.fd = fd - end - wakeup_all_connect(self) - return self.fd - else - local co = core.running() - local t = self.connectqueue - t[#t + 1] = co - core.wait() - return self.fd and self.fd > 0 - end -end - -local timeout = core.timeout -local timercancel = core.timercancel -local function waitfor(self, expire) - local waitpool = self.waitpool - local ackcmd = self.ackcmd - local timer_func = function(session) - if self.closed then - return - end - local co = waitpool[session] - if not co then - logger.error("[rpc.client] timer error session:", session) - return - end - waitpool[session] = nil - ackcmd[co] = "timeout" - core.wakeup(co) - end - return function(session) - local co = core.running() - local timer = timeout(expire, timer_func, session) - waitpool[session] = co - local body = core.wait() - if body then - timercancel(timer) - end - local cmd = ackcmd[co] - ackcmd[co] = nil - return body, cmd - end -end - -local function send_request(self, cmd, body) - local ok = checkconnect(self) - if not ok then - return false, "closed" - end - local proto = self.__proto - if type(cmd) == "string" then - cmd = proto:tag(cmd) - end - local session = core.genid() - local traceid = core.tracepropagate() - local body, sz = proto:encode(cmd, body, true) - body, sz = proto:pack(body, sz, true) - local ok = tcp_send(self.fd, np.rpcpack(body, sz, cmd, session, traceid)) - if not ok then - return false, "send fail" - end - return true, session -end - -client.send = send_request - -function client.call(self, cmd, body) - local ok, session = send_request(self, cmd, body) - if not ok then - return false, session - end - return self.waitfor(session) -end - -function client.close(self) - if self.closed then - return - end - gc(self) - self.closed = true - wakeup_all_connect(self) - wakeup_all_calling(self) -end - ------rpc -function rpc.connect(config) - local totalwheel = math.floor((config.timeout + 999) / 1000) - local obj = { - fd = false, --false disconnected, -1 conncting, >=0 conncted - closed = false, - connectqueue = {}, - waitpool = {}, - ackcmd = {}, - waitfor = nil, - __addr = config.addr, - __proto = config.proto, - __close = config.close, - } - obj.waitfor = waitfor(obj, config.timeout) - setmetatable(obj, clientmt) - checkconnect(obj) - return obj -end - -function rpc.listen(config) - local obj = { - addr = config.addr, - backlog = config.backlog, - proto = config.proto, - accept = config.accept, - close = config.close, - call = config.call, - } - setmetatable(obj, servermt) - local ok, errno = server_listen(obj) - if not ok then - return nil, errno - end - return obj -end - -return rpc - diff --git a/lualib/core/sync/mutex.lua b/lualib/core/sync/mutex.lua index de9b49bc..2c01f668 100644 --- a/lualib/core/sync/mutex.lua +++ b/lualib/core/sync/mutex.lua @@ -40,7 +40,7 @@ local proxymt = { end } -function M.new() +function M:new() return setmetatable({ lockobj = {}, }, mt) diff --git a/test/testrpc.lua b/test/testrpc.lua index 15ef6332..c84a90aa 100644 --- a/test/testrpc.lua +++ b/test/testrpc.lua @@ -1,6 +1,6 @@ local core = require "core" local waitgroup = require "core.sync.waitgroup" -local rpc = require "core.cluster.rpc" +local cluster = require "core.cluster" local crypto = require "core.crypto" local testaux = require "test.testaux" local zproto = require "zproto" @@ -16,6 +16,8 @@ bar 0xfe { } ]] +assert(logic) + local function case_one(msg, cmd, fd) if cmd == 0xff then cmd = 0xfe @@ -33,23 +35,61 @@ end local function case_three(msg, cmd, fd) end +local function issend(msg) + return msg:find("_n$") +end + +local function unmarshal(cmd, buf, size) + local dat, size = logic:unpack(buf, size, true) + local body = logic:decode(cmd, dat, size) + return body +end + +local function marshal(cmd, body) + if type(cmd) == "string" then + cmd = logic:tag(cmd) + end + local dat, size = logic:encode(cmd, body, true) + local buf, size = logic:pack(dat, size, true) + return cmd, buf, size +end + + local case = case_one +local accept_fd +local accept_addr +local router = setmetatable({}, {__index = function(t, k) + local fn = function(msg, fd) + return case(msg, k, fd) + end + t[k] = fn + return fn +end}) -local server = rpc.listen { - addr = ":8989", - proto = logic, +local server = cluster.new { + issend = issend, + marshal = marshal, + unmarshal = unmarshal, + router = router, accept = function(fd, addr) + accept_fd = fd + accept_addr = addr end, close = function(fd, errno) end, - - call = function(msg, cmd, fd) - return case(msg, cmd, fd) - end } -local client +server.listen(":8989") +local client_fd +local client = cluster.new { + issend = issend, + marshal = marshal, + unmarshal = unmarshal, + router = router, + close = function(fd, errno) + end, +} local function request(fd, index, count, cmd) return function() @@ -59,9 +99,9 @@ local function request(fd, index, count, cmd) age = index, rand = crypto.randomkey(8), } - local body, ack = client:call(cmd, test) + local body, ack = client[cmd](fd, test) testaux.assertneq(body, nil, "rpc timeout") - testaux.asserteq(test.rand, body.rand, "rpc match request/response") + testaux.asserteq(test.rand, body and body.rand, "rpc match request/response") end end end @@ -74,7 +114,7 @@ local function timeout(fd, index, count, cmd) age = index, rand = crypto.randomkey(8), } - local body, ack = client:call(cmd, test) + local body, ack = client[cmd](fd, test) testaux.asserteq(body, nil, "rpc timeout, body is nil") testaux.asserteq(ack, "timeout", "rpc timeout, ack is timeout") end @@ -84,13 +124,9 @@ end local function client_part() - client = rpc.connect { - addr = "127.0.0.1:8989", - proto = logic, - timeout = 1000, - close = function(fd, errno) - end, - } + local err + client_fd, err = client.connect("127.0.0.1:8989") + print("connect", client_fd, err) local wg = waitgroup:create() case = case_one for i = 1, 2 do @@ -100,26 +136,42 @@ local function client_part() else cmd = "bar" end - wg:fork(request(client, i, 5, cmd)) + wg:fork(request(client_fd, i, 5, cmd)) end wg:wait() print("case one finish") case = case_two for i = 1, 20 do - wg:fork(request(client, i, 50, "foo")) + wg:fork(request(client_fd, i, 50, "foo")) core.sleep(100) end wg:wait() print("case two finish") case = case_three for i = 1, 20 do - wg:fork(timeout(client, i, 2, "foo")) + wg:fork(timeout(client_fd, i, 2, "foo")) core.sleep(10) end wg:wait() print("case three finish") end +local function server_part() + case = case_one + local req = { + name = "hello", + age = 1, + rand = crypto.randomkey(8), + } + local ack, _ = server.foo(accept_fd, req) + testaux.assertneq(ack, nil, "rpc timeout") + testaux.asserteq(req.rand, ack and ack.rand, "rpc match request/response") +end + client_part() -client:close() -server:close() \ No newline at end of file +server_part() +client.close("127.0.0.1:8989") +server.close(":8989") +server.close(accept_addr) +testaux.asserteq(next(client.__fdaddr), nil, "client fdaddr empty") +testaux.asserteq(next(server.__fdaddr), nil, "client fdaddr empty") \ No newline at end of file