Skip to content

Commit

Permalink
refactory: collect upstream logic and put them in a single file. (#…
Browse files Browse the repository at this point in the history
…1734)

feature: support dynamic upstream in plugin.

here is a mini  example in `access` phase of plugin:

```lua
    local up_conf = {
        type = "roundrobin",
        nodes = {
            {host = conf.upstream.ip, port = conf.upstream.port, weight = 1},
        }
    }

    local ok, err = upstream.check_schema(up_conf)
    if not ok then
        return 500, err
    end

    local matched_route = ctx.matched_route
    upstream.set(ctx, up_conf.type .. "#route_" .. matched_route.value.id,
                 ctx.conf_version, up_conf, matched_route)
    return
```
  • Loading branch information
membphis authored Jun 22, 2020
1 parent 9533c8a commit 748e337
Show file tree
Hide file tree
Showing 8 changed files with 260 additions and 237 deletions.
93 changes: 4 additions & 89 deletions apisix/balancer.lua
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,11 @@ local require = require
local discovery = require("apisix.discovery.init").discovery
local balancer = require("ngx.balancer")
local core = require("apisix.core")
local error = error
local pairs = pairs
local ipairs = ipairs
local tostring = tostring

local set_more_tries = balancer.set_more_tries
local get_last_failure = balancer.get_last_failure
local set_timeouts = balancer.set_timeouts
local upstreams_etcd


local module_name = "balancer"
Expand Down Expand Up @@ -150,38 +146,10 @@ end
local function pick_server(route, ctx)
core.log.info("route: ", core.json.delay_encode(route, true))
core.log.info("ctx: ", core.json.delay_encode(ctx, true))
local healthcheck_parent = route
local up_id = route.value.upstream_id
local up_conf = (route.dns_value and route.dns_value.upstream)
or route.value.upstream
if not up_id and not up_conf then
return nil, nil, "missing upstream configuration"
end

local version
local key

if up_id then
if not upstreams_etcd then
return nil, nil, "need to create a etcd instance for fetching "
.. "upstream information"
end

local up_obj = upstreams_etcd:get(tostring(up_id))
if not up_obj then
return nil, nil, "failed to find upstream by id: " .. up_id
end
core.log.info("upstream: ", core.json.delay_encode(up_obj))

healthcheck_parent = up_obj
up_conf = up_obj.dns_value or up_obj.value
version = up_obj.modifiedIndex
key = up_conf.type .. "#upstream_" .. up_id

else
version = ctx.conf_version
key = up_conf.type .. "#route_" .. route.value.id
end
local healthcheck_parent = ctx.upstream_healthcheck_parent
local up_conf = ctx.upstream_conf
local version = ctx.upstream_version
local key = ctx.upstream_key

if up_conf.service_name then
if not discovery then
Expand Down Expand Up @@ -277,59 +245,6 @@ end


function _M.init_worker()
local err
upstreams_etcd, err = core.config.new("/upstreams", {
automatic = true,
item_schema = core.schema.upstream,
filter = function(upstream)
upstream.has_domain = false
if not upstream.value or not upstream.value.nodes then
return
end

local nodes = upstream.value.nodes
if core.table.isarray(nodes) then
for _, node in ipairs(nodes) do
local host = node.host
if not core.utils.parse_ipv4(host) and
not core.utils.parse_ipv6(host) then
upstream.has_domain = true
break
end
end
else
local new_nodes = core.table.new(core.table.nkeys(nodes), 0)
for addr, weight in pairs(nodes) do
local host, port = core.utils.parse_addr(addr)
if not core.utils.parse_ipv4(host) and
not core.utils.parse_ipv6(host) then
upstream.has_domain = true
end
local node = {
host = host,
port = port,
weight = weight,
}
core.table.insert(new_nodes, node)
end
upstream.value.nodes = new_nodes
end

core.log.info("filter upstream: ", core.json.delay_encode(upstream))
end,
})
if not upstreams_etcd then
error("failed to create etcd instance for fetching upstream: " .. err)
return
end
end

function _M.upstreams()
if not upstreams_etcd then
return nil, nil
end

return upstreams_etcd.values, upstreams_etcd.conf_version
end

return _M
65 changes: 16 additions & 49 deletions apisix/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ local service_fetch = require("apisix.http.service").get
local admin_init = require("apisix.admin.init")
local get_var = require("resty.ngxvar").fetch
local router = require("apisix.router")
local set_upstream = require("apisix.upstream").set_by_route
local ipmatcher = require("resty.ipmatcher")
local ngx = ngx
local get_method = ngx.req.get_method
Expand Down Expand Up @@ -92,6 +93,7 @@ function _M.http_init_worker()
end

require("apisix.debug").init_worker()
require("apisix.upstream").init_worker()

local local_conf = core.config.local_conf()
local dns_resolver_valid = local_conf and local_conf.apisix and
Expand All @@ -114,29 +116,6 @@ local function run_plugin(phase, plugins, api_ctx)
return api_ctx
end

if phase == "balancer" then
local balancer_name = api_ctx.balancer_name
local balancer_plugin = api_ctx.balancer_plugin
if balancer_name and balancer_plugin then
local phase_fun = balancer_plugin[phase]
phase_fun(balancer_plugin, api_ctx)
return api_ctx
end

for i = 1, #plugins, 2 do
local phase_fun = plugins[i][phase]
if phase_fun and
(not balancer_name or balancer_name == plugins[i].name) then
phase_fun(plugins[i + 1], api_ctx)
if api_ctx.balancer_name == plugins[i].name then
api_ctx.balancer_plugin = plugins[i]
return api_ctx
end
end
end
return api_ctx
end

if phase ~= "log"
and phase ~= "header_filter"
and phase ~= "body_filter"
Expand Down Expand Up @@ -383,6 +362,12 @@ function _M.http_access_phase()
end
end
run_plugin("access", plugins, api_ctx)

local ok, err = set_upstream(route, api_ctx)
if not ok then
core.log.error("failed to parse upstream: ", err)
core.response.exit(500)
end
end


Expand Down Expand Up @@ -443,6 +428,8 @@ function _M.grpc_access_phase()

run_plugin("rewrite", plugins, api_ctx)
run_plugin("access", plugins, api_ctx)

set_upstream(route, api_ctx)
end


Expand Down Expand Up @@ -503,19 +490,6 @@ function _M.http_balancer_phase()
return core.response.exit(500)
end

-- first time
if not api_ctx.balancer_name then
run_plugin("balancer", nil, api_ctx)
if api_ctx.balancer_name then
return
end
end

if api_ctx.balancer_name and api_ctx.balancer_name ~= "default" then
return run_plugin("balancer", nil, api_ctx)
end

api_ctx.balancer_name = "default"
load_balancer(api_ctx.matched_route, api_ctx)
end

Expand Down Expand Up @@ -615,7 +589,13 @@ function _M.stream_preread_phase()
api_ctx.plugins = plugin.stream_filter(matched_route, plugins)
-- core.log.info("valid plugins: ", core.json.delay_encode(plugins, true))

api_ctx.conf_type = "stream/route"
api_ctx.conf_version = matched_route.modifiedIndex
api_ctx.conf_id = matched_route.value.id

run_plugin("preread", plugins, api_ctx)

set_upstream(matched_route, api_ctx)
end


Expand All @@ -627,19 +607,6 @@ function _M.stream_balancer_phase()
return ngx_exit(1)
end

-- first time
if not api_ctx.balancer_name then
run_plugin("balancer", nil, api_ctx)
if api_ctx.balancer_name then
return
end
end

if api_ctx.balancer_name and api_ctx.balancer_name ~= "default" then
return run_plugin("balancer", nil, api_ctx)
end

api_ctx.balancer_name = "default"
load_balancer(api_ctx.matched_route, api_ctx)
end

Expand Down
26 changes: 14 additions & 12 deletions apisix/plugins/example-plugin.lua
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
-- limitations under the License.
--
local core = require("apisix.core")
local balancer = require("ngx.balancer")
local upstream = require("apisix.upstream")

local schema = {
type = "object",
Expand Down Expand Up @@ -60,25 +60,27 @@ end
function _M.access(conf, ctx)
core.log.warn("plugin access phase, conf: ", core.json.encode(conf))
-- return 200, {message = "hit example plugin"}
end


function _M.balancer(conf, ctx)
core.log.warn("plugin balancer phase, conf: ", core.json.encode(conf))

if not conf.ip then
return
end

-- NOTE: update `ctx.balancer_name` is important, APISIX will skip other
-- balancer handler.
ctx.balancer_name = plugin_name
local up_conf = {
type = "roundrobin",
nodes = {
{host = conf.ip, port = conf.port, weight = 1}
}
}

local ok, err = balancer.set_current_peer(conf.ip, conf.port)
local ok, err = upstream.check_schema(up_conf)
if not ok then
core.log.error("failed to set server peer: ", err)
return core.response.exit(502)
return 500, err
end

local matched_route = ctx.matched_route
upstream.set(ctx, up_conf.type .. "#route_" .. matched_route.value.id,
ctx.conf_version, up_conf, matched_route)
return
end


Expand Down
35 changes: 19 additions & 16 deletions apisix/stream/plugins/mqtt-proxy.lua
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
-- limitations under the License.
--
local core = require("apisix.core")
local balancer = require("ngx.balancer")
local bit = require "bit"
local upstream = require("apisix.upstream")
local bit = require("bit")
local ngx = ngx
local ngx_exit = ngx.exit
local str_byte = string.byte
Expand Down Expand Up @@ -158,25 +158,28 @@ function _M.preread(conf, ctx)
end

core.log.info("mqtt client id: ", res.client_id)
end

local up_conf = {
type = "roundrobin",
nodes = {
{host = conf.upstream.ip, port = conf.upstream.port, weight = 1},
}
}

function _M.log(conf, ctx)
core.log.info("plugin log phase, conf: ", core.json.encode(conf))
end
local ok, err = upstream.check_schema(up_conf)
if not ok then
return 500, err
end

local matched_route = ctx.matched_route
upstream.set(ctx, up_conf.type .. "#route_" .. matched_route.value.id,
ctx.conf_version, up_conf, matched_route)
return
end

function _M.balancer(conf, ctx)
core.log.info("plugin balancer phase, conf: ", core.json.encode(conf))
-- ctx.balancer_name = plugin_name
local up = conf.upstream
ctx.balancer_name = plugin_name

local ok, err = balancer.set_current_peer(up.ip, up.port)
if not ok then
core.log.error("failed to set server peer: ", err)
return ngx_exit(1)
end
function _M.log(conf, ctx)
core.log.info("plugin log phase, conf: ", core.json.encode(conf))
end


Expand Down
Loading

0 comments on commit 748e337

Please sign in to comment.