diff --git a/examples/rpc.lua b/examples/rpc.lua index 0c71957..80bd14c 100644 --- a/examples/rpc.lua +++ b/examples/rpc.lua @@ -23,26 +23,28 @@ local function marshal(cmd, body) if type(cmd) == "string" then cmd = proto:tag(cmd) end + print("marshal", cmd, body) local dat, size = proto:encode(cmd, body, true) local buf, size = proto:pack(dat, size, true) return cmd, buf, size end -local function call(msg, fd) - print("callee", msg.txt, fd) - return "pong", msg -end - -local router = setmetatable({}, {__index = function(t, k) return call end}) +local callret = { + ["ping"] = "pong", + [0x01] = "pong", +} local server = cluster.new { marshal = marshal, unmarshal = unmarshal, - router = router, + callret = callret, accept = function(fd, addr) print("accept", fd, addr) end, - + call = function(msg, cmd, fd) + print("callee", msg.txt, fd) + return msg + end, close = function(fd, errno) print("close", fd, errno) end, @@ -53,7 +55,11 @@ server.listen("127.0.0.1:9999") local client = cluster.new { marshal = marshal, unmarshal = unmarshal, - router = router, + callret = callret, + call = function(msg, cmd, fd) + print("callee", msg.txt, fd) + return msg + end, close = function(fd, errno) print("close", fd, errno) end, @@ -67,9 +73,9 @@ core.start(function() for j = 1, 10000 do local txt = crypto.randomkey(5) local ack, cmd = client.ping(fd, {txt = txt}) - print("caller", fd, txt, ack.txt) + print("caller", fd, cmd, txt, ack.txt) assert(ack.txt == txt) - assert(cmd == proto:tag("pong")) + assert(cmd == "pong") core.sleep(1000) end end) diff --git a/lualib/core/cluster.lua b/lualib/core/cluster.lua index c926a38..03328cf 100644 --- a/lualib/core/cluster.lua +++ b/lualib/core/cluster.lua @@ -143,7 +143,7 @@ local function init_event(self, conf) local fdaddr = self.__fdaddr local callret = self.__callret local ctx = self.__ctx - local router = assert(conf.router, "router") + local call = assert(conf.call, "call") local close = assert(conf.close, "close") local marshal = assert(conf.marshal, "marshal") local unmarshal = assert(conf.unmarshal, "unmarshal") @@ -180,14 +180,7 @@ local function init_event(self, conf) core.fork(EVENT.data) while true do local dat - --pars - if cmd then - local fn = router[cmd] - if not fn then - np.drop(buf) - logger.error("[rpc.server] no router", cmd) - break - end + if cmd then --rpc request local body = unmarshal(cmd, buf, size) np.drop(buf) if not body then @@ -195,7 +188,7 @@ local function init_event(self, conf) session, cmd) break end - local ok, res = pcall(fn, body, fd) + local ok, res = pcall(call, body, cmd, fd) if not ok then logger.error("[rpc.server] call error", res) break @@ -208,7 +201,7 @@ local function init_event(self, conf) res = res or NIL local _, buf, sz = marshal(ackname, res) tcp_send(fd, np.ack(session, buf, sz)) - else + else -- rpc acknowledge local cmd = ackcmd[session] if not cmd then --timeout np.drop(buf) diff --git a/lualib/core/logger.lua b/lualib/core/logger.lua index 9225520..01e7697 100644 --- a/lualib/core/logger.lua +++ b/lualib/core/logger.lua @@ -2,7 +2,7 @@ local core = require "core" local env = require "core.env" local c = require "core.logger.c" -local function nop() +local function nop(...) end local logger = { @@ -54,3 +54,4 @@ core.signal("SIGUSR1", function(_) end) return logger + diff --git a/test/test.lua b/test/test.lua index a333b33..644ba76 100644 --- a/test/test.lua +++ b/test/test.lua @@ -14,7 +14,6 @@ local modules = { "testwakeup", "testwaitgroup", "testmutex", - "testmulticast", "testnetstream", "testnetpacket", "testchannel", diff --git a/test/testmulticast.lua b/test/testmulticast.lua deleted file mode 100644 index e2317ba..0000000 --- a/test/testmulticast.lua +++ /dev/null @@ -1,60 +0,0 @@ -local core = require "core" -local testaux = require "test.testaux" -local msg = require "core.cluster.msg" -local zproto = require "zproto" -local logic = zproto:parse [[ -test 0xff { - .str:string 1 -} -]] - - -local server -local accept = {} -local client = {} -local recv = {} - -server = msg.listen { - proto = logic, - addr = "127.0.0.1:8002", - accept = function(fd, addr) - accept[#accept + 1] = fd - --print("accept", addr) - end, - close = function(fd, errno) - --print("close", fd, errno) - end, - data = function(fd, cmd, obj) - local m, sz = server:multipack(cmd, obj, #accept) - for _, fd in pairs(accept) do - local ok = server:multicast(fd, m, sz) - testaux.assertneq(fd, nil, "multicast test send") - testaux.asserteq(ok, true, "multicast test send") - end - end -} -testaux.asserteq(not not server, true, "multicast test listen") - -local inst -for i = 1, 10 do - inst = msg.connect { - proto = logic, - addr = "127.0.0.1:8002", - data = function(fd, cmd, obj) - testaux.asserteq(obj.str, "testmulticast", "muticast validate data") - recv[i] = true - end, - close = function(fd, errno) - - end - } - client[i] = inst -end -inst:send("test", {str = "testmulticast"}) -core.sleep(1000) -for k, _ in pairs(client) do - testaux.asserteq(recv[k], true, "multicast recv count validate") -end -server:stop() - - diff --git a/test/testrpc.lua b/test/testrpc.lua index 2a92f6e..7ef9713 100644 --- a/test/testrpc.lua +++ b/test/testrpc.lua @@ -51,13 +51,6 @@ end local case = case_one local accept_fd local accept_addr -local router = setmetatable({}, {__index = function(t, k) - local fn = function(msg, fd) - return case(msg, k, fd) - end - t[k] = fn - return fn -end}) local callret = { ["foo"] = "bar", @@ -72,12 +65,13 @@ local server = cluster.new { marshal = marshal, unmarshal = unmarshal, callret = callret, - router = router, accept = function(fd, addr) accept_fd = fd accept_addr = addr end, - + call = function(msg, cmd, fd) + return case(msg, cmd, fd) + end, close = function(fd, errno) end, } @@ -89,7 +83,9 @@ local client = cluster.new { marshal = marshal, unmarshal = unmarshal, callret = callret, - router = router, + call = function(msg, cmd, fd) + return case(msg, cmd, fd) + end, close = function(fd, errno) end, }