From 1822206af82fc4e3a5368a346a6346d44a299fa5 Mon Sep 17 00:00:00 2001 From: spacewander Date: Wed, 30 Jun 2021 15:36:42 +0800 Subject: [PATCH] feat(stream): add limit-conn Signed-off-by: spacewander --- apisix/cli/ngx_tpl.lua | 1 + apisix/plugins/limit-conn.lua | 86 +------------ apisix/plugins/limit-conn/init.lua | 107 ++++++++++++++++ apisix/stream/plugins/limit-conn.lua | 59 +++++++++ conf/config-default.yaml | 1 + docs/en/latest/plugins/limit-conn.md | 2 + docs/zh/latest/plugins/limit-conn.md | 3 + t/APISIX.pm | 1 + t/plugin/limit-conn2.t | 2 +- t/stream-plugin/limit-conn.t | 183 +++++++++++++++++++++++++++ 10 files changed, 364 insertions(+), 81 deletions(-) create mode 100644 apisix/plugins/limit-conn/init.lua create mode 100644 apisix/stream/plugins/limit-conn.lua create mode 100644 t/stream-plugin/limit-conn.t diff --git a/apisix/cli/ngx_tpl.lua b/apisix/cli/ngx_tpl.lua index 3e44b53d7bd7e..d09e2f51f77a0 100644 --- a/apisix/cli/ngx_tpl.lua +++ b/apisix/cli/ngx_tpl.lua @@ -67,6 +67,7 @@ stream { lua_socket_log_errors off; lua_shared_dict lrucache-lock-stream 10m; + lua_shared_dict stream-plugin-limit-conn 10m; resolver {% for _, dns_addr in ipairs(dns_resolver or {}) do %} {*dns_addr*} {% end %} {% if dns_resolver_valid then %} valid={*dns_resolver_valid*}{% end %}; resolver_timeout {*resolver_timeout*}; diff --git a/apisix/plugins/limit-conn.lua b/apisix/plugins/limit-conn.lua index 564a1b54c33bd..2f174c9ac3aea 100644 --- a/apisix/plugins/limit-conn.lua +++ b/apisix/plugins/limit-conn.lua @@ -14,16 +14,11 @@ -- See the License for the specific language governing permissions and -- limitations under the License. -- -local limit_conn_new = require("resty.limit.conn").new local core = require("apisix.core") -local sleep = core.sleep -local plugin_name = "limit-conn" - +local limit_conn = require("apisix.plugins.limit-conn.init") -local lrucache = core.lrucache.new({ - type = "plugin", -}) +local plugin_name = "limit-conn" local schema = { type = "object", properties = { @@ -41,7 +36,6 @@ local schema = { required = {"conn", "burst", "default_conn_delay", "key"} } - local _M = { version = 0.1, priority = 1003, @@ -49,87 +43,19 @@ local _M = { schema = schema, } -function _M.check_schema(conf) - local ok, err = core.schema.check(schema, conf) - if not ok then - return false, err - end - - return true -end -local function create_limit_obj(conf) - core.log.info("create new limit-conn plugin instance") - return limit_conn_new("plugin-limit-conn", conf.conn, conf.burst, - conf.default_conn_delay) +function _M.check_schema(conf) + return core.schema.check(schema, conf) end function _M.access(conf, ctx) - core.log.info("ver: ", ctx.conf_version) - local lim, err = lrucache(conf, nil, create_limit_obj, conf) - if not lim then - core.log.error("failed to instantiate a resty.limit.conn object: ", err) - return 500 - end - - local key = (ctx.var[conf.key] or "") .. ctx.conf_type .. ctx.conf_version - core.log.info("limit key: ", key) - - local delay, err = lim:incoming(key, true) - if not delay then - if err == "rejected" then - return conf.rejected_code - end - - core.log.error("failed to limit req: ", err) - return 500 - end - - if lim:is_committed() then - if not ctx.limit_conn then - ctx.limit_conn = core.tablepool.fetch("plugin#limit-conn", 0, 6) - end - - core.table.insert_tail(ctx.limit_conn, lim, key, delay) - end - - if delay >= 0.001 then - sleep(delay) - end + return limit_conn.increase(conf, ctx) end function _M.log(conf, ctx) - local limit_conn = ctx.limit_conn - if not limit_conn then - return - end - - for i = 1, #limit_conn, 3 do - local lim = limit_conn[i] - local key = limit_conn[i + 1] - local delay = limit_conn[i + 2] - - local latency - if ctx.proxy_passed then - latency = ctx.var.upstream_response_time - else - latency = ctx.var.request_time - delay - end - - core.log.debug("request latency is ", latency) -- for test - - local conn, err = lim:leaving(key, latency) - if not conn then - core.log.error("failed to record the connection leaving request: ", - err) - break - end - end - - core.tablepool.release("plugin#limit-conn", limit_conn) - return + return limit_conn.decrease(conf, ctx) end diff --git a/apisix/plugins/limit-conn/init.lua b/apisix/plugins/limit-conn/init.lua new file mode 100644 index 0000000000000..38db8a270857b --- /dev/null +++ b/apisix/plugins/limit-conn/init.lua @@ -0,0 +1,107 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. +-- +local limit_conn_new = require("resty.limit.conn").new +local core = require("apisix.core") +local sleep = core.sleep +local shdict_name = "plugin-limit-conn" +if ngx.config.subsystem == "stream" then + shdict_name = "stream-" .. shdict_name +end + + +local lrucache = core.lrucache.new({ + type = "plugin", +}) +local _M = {} + + +local function create_limit_obj(conf) + core.log.info("create new limit-conn plugin instance") + return limit_conn_new(shdict_name, conf.conn, conf.burst, + conf.default_conn_delay) +end + + +function _M.increase(conf, ctx) + core.log.info("ver: ", ctx.conf_version) + local lim, err = lrucache(conf, nil, create_limit_obj, conf) + if not lim then + core.log.error("failed to instantiate a resty.limit.conn object: ", err) + return 500 + end + + local key = (ctx.var[conf.key] or "") .. ctx.conf_type .. ctx.conf_version + core.log.info("limit key: ", key) + + local delay, err = lim:incoming(key, true) + if not delay then + if err == "rejected" then + return conf.rejected_code or 503 + end + + core.log.error("failed to limit req: ", err) + return 500 + end + + if lim:is_committed() then + if not ctx.limit_conn then + ctx.limit_conn = core.tablepool.fetch("plugin#limit-conn", 0, 6) + end + + core.table.insert_tail(ctx.limit_conn, lim, key, delay) + end + + if delay >= 0.001 then + sleep(delay) + end +end + + +function _M.decrease(conf, ctx) + local limit_conn = ctx.limit_conn + if not limit_conn then + return + end + + for i = 1, #limit_conn, 3 do + local lim = limit_conn[i] + local key = limit_conn[i + 1] + local delay = limit_conn[i + 2] + + local latency + if ctx.proxy_passed then + latency = ctx.var.upstream_response_time + else + latency = ctx.var.request_time - delay + end + + core.log.debug("request latency is ", latency) -- for test + + local conn, err = lim:leaving(key, latency) + if not conn then + core.log.error("failed to record the connection leaving request: ", + err) + break + end + end + + core.tablepool.release("plugin#limit-conn", limit_conn) + return +end + + +return _M diff --git a/apisix/stream/plugins/limit-conn.lua b/apisix/stream/plugins/limit-conn.lua new file mode 100644 index 0000000000000..6f949c3d081cd --- /dev/null +++ b/apisix/stream/plugins/limit-conn.lua @@ -0,0 +1,59 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. +-- +local core = require("apisix.core") +local limit_conn = require("apisix.plugins.limit-conn.init") + + +local plugin_name = "limit-conn" +local schema = { + type = "object", + properties = { + conn = {type = "integer", exclusiveMinimum = 0}, + burst = {type = "integer", minimum = 0}, + default_conn_delay = {type = "number", exclusiveMinimum = 0}, + key = { + type = "string", + enum = {"remote_addr", "server_addr"} + }, + }, + required = {"conn", "burst", "default_conn_delay", "key"} +} + +local _M = { + version = 0.1, + priority = 1003, + name = plugin_name, + schema = schema, +} + + +function _M.check_schema(conf) + return core.schema.check(schema, conf) +end + + +function _M.preread(conf, ctx) + return limit_conn.increase(conf, ctx) +end + + +function _M.log(conf, ctx) + return limit_conn.decrease(conf, ctx) +end + + +return _M diff --git a/conf/config-default.yaml b/conf/config-default.yaml index 3065646d42025..1a0c936f1ecc5 100644 --- a/conf/config-default.yaml +++ b/conf/config-default.yaml @@ -293,6 +293,7 @@ plugins: # plugin list (sorted by priority) - ext-plugin-post-req # priority: -3000 stream_plugins: # sorted by priority + - limit-conn # priority: 1003 - mqtt-proxy # priority: 1000 # <- recommend to use priority (0, 100) for your custom plugins diff --git a/docs/en/latest/plugins/limit-conn.md b/docs/en/latest/plugins/limit-conn.md index ed04e058298e4..b402a268ab07e 100644 --- a/docs/en/latest/plugins/limit-conn.md +++ b/docs/en/latest/plugins/limit-conn.md @@ -45,6 +45,8 @@ Limiting request concurrency plugin. **Key can be customized by the user, only need to modify a line of code of the plug-in to complete. It is a security consideration that is not open in the plugin.** +When used in the stream proxy, only `remote_addr` and `server_addr` can be used as key. And `rejected_code` is meaningless. + ## How To Enable Here's an example, enable the limit-conn plugin on the specified route: diff --git a/docs/zh/latest/plugins/limit-conn.md b/docs/zh/latest/plugins/limit-conn.md index e307beb869e24..10fe390453db6 100644 --- a/docs/zh/latest/plugins/limit-conn.md +++ b/docs/zh/latest/plugins/limit-conn.md @@ -35,6 +35,9 @@ title: limit-conn **注:key 是可以被用户自定义的,只需要修改插件的一行代码即可完成。并没有在插件中放开是处于安全的考虑。** +: when used in the stream proxy, only `remote_addr` and `server_addr` can be used as `key`. And `rejected_code` is meaningless. +在 stream 代理中使用该插件时,只有 `remote_addr` 和 `server_addr` 可以被用作 key。另外设置 `rejected_code` 毫无意义。 + #### 如何启用 下面是一个示例,在指定的 route 上开启了 limit-conn 插件: diff --git a/t/APISIX.pm b/t/APISIX.pm index c09f6cf78c696..1d0e4868fe120 100644 --- a/t/APISIX.pm +++ b/t/APISIX.pm @@ -299,6 +299,7 @@ _EOC_ lua_socket_log_errors off; lua_shared_dict lrucache-lock-stream 10m; + lua_shared_dict stream-plugin-limit-conn 10m; upstream apisix_backend { server 127.0.0.1:1900; diff --git a/t/plugin/limit-conn2.t b/t/plugin/limit-conn2.t index 914abca00d822..565a5c73047ea 100644 --- a/t/plugin/limit-conn2.t +++ b/t/plugin/limit-conn2.t @@ -16,7 +16,7 @@ # BEGIN { if ($ENV{TEST_NGINX_CHECK_LEAK}) { - $SkipReason = "unavailable for the hup tests"; + $SkipReason = "unavailable for the check leak tests"; } else { $ENV{TEST_NGINX_USE_HUP} = 1; diff --git a/t/stream-plugin/limit-conn.t b/t/stream-plugin/limit-conn.t new file mode 100644 index 0000000000000..874b6d3f3a50e --- /dev/null +++ b/t/stream-plugin/limit-conn.t @@ -0,0 +1,183 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +use t::APISIX 'no_plan'; + +repeat_each(1); +no_long_string(); +no_shuffle(); +no_root_location(); + + +add_block_preprocessor(sub { + my ($block) = @_; + + if (!$block->error_log && !$block->no_error_log) { + $block->set_value("no_error_log", "[error]\n[alert]"); + } + + my $config = $block->config // <<_EOC_; + location /hit { + content_by_lua_block { + local sock = ngx.socket.tcp() + local ok, err = sock:connect("127.0.0.1", 1985) + if not ok then + ngx.log(ngx.ERR, "failed to connect: ", err) + return ngx.exit(503) + end + + local bytes, err = sock:send("mmm") + if not bytes then + ngx.log(ngx.ERR, "send stream request error: ", err) + return ngx.exit(503) + end + local data, err = sock:receive("*a") + if not data then + sock:close() + return ngx.exit(503) + end + ngx.print(data) + } + } + + location /test_concurrency { + content_by_lua_block { + local reqs = {} + for i = 1, 5 do + reqs[i] = { "/hit" } + end + local resps = { ngx.location.capture_multi(reqs) } + for i, resp in ipairs(resps) do + ngx.say(resp.status) + end + } + } +_EOC_ + + $block->set_value("config", $config); +}); + +run_tests; + +__DATA__ + +=== TEST 1: set route +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/upstreams/1', + ngx.HTTP_PUT, + [[{ + "nodes": { + "127.0.0.1:1995": 1 + }, + "type": "roundrobin" + }]] + ) + + if code >= 300 then + ngx.status = code + ngx.say(body) + return + end + + local code, body = t('/apisix/admin/stream_routes/1', + ngx.HTTP_PUT, + [[{ + "plugins": { + "limit-conn": { + "conn": 100, + "burst": 50, + "default_conn_delay": 0.1, + "key": "remote_addr" + } + }, + "upstream_id": "1" + }]] + ) + + if code >= 300 then + ngx.status = code + end + ngx.say(body) + } + } +--- request +GET /t +--- response_body +passed + + + +=== TEST 2: not exceeding the burst +--- request +GET /test_concurrency +--- response_body +200 +200 +200 +200 +200 +--- stream_enable + + + +=== TEST 3: update route +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/stream_routes/1', + ngx.HTTP_PUT, + [[{ + "plugins": { + "limit-conn": { + "conn": 2, + "burst": 1, + "default_conn_delay": 0.1, + "key": "remote_addr" + } + }, + "upstream_id": "1" + }]] + ) + + if code >= 300 then + ngx.status = code + end + ngx.say(body) + } + } +--- request +GET /t +--- response_body +passed + + + +=== TEST 4: exceeding the burst +--- request +GET /test_concurrency +--- response_body +200 +200 +200 +503 +503 +--- error_log +Connection reset by peer +--- stream_enable