Skip to content

Commit

Permalink
fix: avoid caching outdated discovery upstream nodes (#3295)
Browse files Browse the repository at this point in the history
Fix #2369
Fix #1838

Signed-off-by: spacewander <[email protected]>
  • Loading branch information
spacewander authored Jan 18, 2021
1 parent 15609ce commit 6fe399c
Show file tree
Hide file tree
Showing 9 changed files with 512 additions and 37 deletions.
2 changes: 1 addition & 1 deletion apisix/balancer.lua
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ local function fetch_health_nodes(upstream, checker)
end

if core.table.nkeys(up_nodes) == 0 then
core.log.warn("all upstream nodes is unhealth, use default")
core.log.warn("all upstream nodes is unhealthy, use default")
for _, node in ipairs(nodes) do
up_nodes[node.host .. ":" .. node.port] = node.weight
end
Expand Down
9 changes: 7 additions & 2 deletions apisix/http/service.lua
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,13 @@ local function filter(service)
return
end

if not service.value.upstream or not service.value.upstream.nodes then
if not service.value.upstream then
return
end

service.value.upstream.parent = service

if not service.value.upstream.nodes then
return
end

Expand Down Expand Up @@ -79,7 +85,6 @@ local function filter(service)
service.value.upstream.nodes = new_nodes
end

service.value.upstream.parent = service
core.log.info("filter service: ", core.json.delay_encode(service))
end

Expand Down
29 changes: 3 additions & 26 deletions apisix/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ local admin_init = require("apisix.admin.init")
local get_var = require("resty.ngxvar").fetch
local router = require("apisix.router")
local set_upstream = require("apisix.upstream").set_by_route
local upstream_util = require("apisix.utils.upstream")
local ipmatcher = require("resty.ipmatcher")
local ngx = ngx
local get_method = ngx.req.get_method
Expand All @@ -32,7 +33,6 @@ local math = math
local error = error
local ipairs = ipairs
local tostring = tostring
local type = type
local ngx_now = ngx.now
local str_byte = string.byte
local str_sub = string.sub
Expand Down Expand Up @@ -237,29 +237,6 @@ local function parse_domain_for_nodes(nodes)
end


local function compare_upstream_node(old_t, new_t)
if type(old_t) ~= "table" then
return false
end

if #new_t ~= #old_t then
return false
end

for i = 1, #new_t do
local new_node = new_t[i]
local old_node = old_t[i]
for _, name in ipairs({"host", "port", "weight"}) do
if new_node[name] ~= old_node[name] then
return false
end
end
end

return true
end


local function parse_domain_in_up(up)
local nodes = up.value.nodes
local new_nodes, err = parse_domain_for_nodes(nodes)
Expand All @@ -268,7 +245,7 @@ local function parse_domain_in_up(up)
end

local old_dns_value = up.dns_value and up.dns_value.nodes
local ok = compare_upstream_node(old_dns_value, new_nodes)
local ok = upstream_util.compare_upstream_node(old_dns_value, new_nodes)
if ok then
return up
end
Expand All @@ -291,7 +268,7 @@ local function parse_domain_in_route(route)
end

local old_dns_value = route.dns_value and route.dns_value.upstream.nodes
local ok = compare_upstream_node(old_dns_value, new_nodes)
local ok = upstream_util.compare_upstream_node(old_dns_value, new_nodes)
if ok then
return route
end
Expand Down
4 changes: 2 additions & 2 deletions apisix/plugin.lua
Original file line number Diff line number Diff line change
Expand Up @@ -418,8 +418,8 @@ end


function _M.merge_service_route(service_conf, route_conf)
core.log.info("service conf: ", core.json.delay_encode(service_conf))
core.log.info(" route conf: ", core.json.delay_encode(route_conf))
core.log.info("service conf: ", core.json.delay_encode(service_conf, true))
core.log.info(" route conf: ", core.json.delay_encode(route_conf, true))

local route_service_key = route_conf.value.id .. "#"
.. route_conf.modifiedIndex .. "#" .. service_conf.modifiedIndex
Expand Down
9 changes: 7 additions & 2 deletions apisix/router.lua
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,13 @@ local function filter(route)
return
end

if not route.value.upstream or not route.value.upstream.nodes then
if not route.value.upstream then
return
end

route.value.upstream.parent = route

if not route.value.upstream.nodes then
return
end

Expand Down Expand Up @@ -64,7 +70,6 @@ local function filter(route)
route.value.upstream.nodes = new_nodes
end

route.value.upstream.parent = route
core.log.info("filter route: ", core.json.delay_encode(route))
end

Expand Down
31 changes: 27 additions & 4 deletions apisix/upstream.lua
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
local require = require
local core = require("apisix.core")
local discovery = require("apisix.discovery.init").discovery
local upstream_util = require("apisix.utils.upstream")
local error = error
local tostring = tostring
local ipairs = ipairs
Expand Down Expand Up @@ -132,9 +133,26 @@ function _M.set_by_route(route, api_ctx)

local dis = discovery[up_conf.discovery_type]
if not dis then
return 500, "discovery " .. up_conf.discovery_type .. "is uninitialized"
return 500, "discovery " .. up_conf.discovery_type .. " is uninitialized"
end
local new_nodes = dis.nodes(up_conf.service_name)
local same = upstream_util.compare_upstream_node(up_conf.nodes, new_nodes)
if not same then
up_conf.nodes = new_nodes
local new_up_conf = core.table.clone(up_conf)
core.log.info("discover new upstream from ", up_conf.service_name, ", type ",
up_conf.discovery_type, ": ",
core.json.delay_encode(new_up_conf, true))

local parent = up_conf.parent
if parent.value.upstream then
-- the up_conf comes from route or service
parent.value.upstream = new_up_conf
else
parent.value = new_up_conf
end
up_conf = new_up_conf
end
up_conf.nodes = dis.nodes(up_conf.service_name)
end

set_directly(api_ctx, up_conf.type .. "#upstream_" .. tostring(up_conf),
Expand Down Expand Up @@ -175,7 +193,13 @@ function _M.init_worker()
item_schema = core.schema.upstream,
filter = function(upstream)
upstream.has_domain = false
if not upstream.value or not upstream.value.nodes then
if not upstream.value then
return
end

upstream.value.parent = upstream

if not upstream.value.nodes then
return
end

Expand Down Expand Up @@ -207,7 +231,6 @@ function _M.init_worker()
upstream.value.nodes = new_nodes
end

upstream.value.parent = upstream
core.log.info("filter upstream: ", core.json.delay_encode(upstream))
end,
})
Expand Down
56 changes: 56 additions & 0 deletions apisix/utils/upstream.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
--
-- Licensed to the Apache Software Foundation (ASF) under one or more
-- contributor license agreements. See the NOTICE file distributed with
-- this work for additional information regarding copyright ownership.
-- The ASF licenses this file to You under the Apache License, Version 2.0
-- (the "License"); you may not use this file except in compliance with
-- the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
--
local core = require("apisix.core")
local ipairs = ipairs
local type = type


local _M = {}


local function sort_by_key_host(a, b)
return a.host < b.host
end


function _M.compare_upstream_node(old_t, new_t)
if type(old_t) ~= "table" then
return false
end

if #new_t ~= #old_t then
return false
end

core.table.sort(old_t, sort_by_key_host)
core.table.sort(new_t, sort_by_key_host)

for i = 1, #new_t do
local new_node = new_t[i]
local old_node = old_t[i]
for _, name in ipairs({"host", "port", "weight"}) do
if new_node[name] ~= old_node[name] then
return false
end
end
end

return true
end


return _M
Loading

0 comments on commit 6fe399c

Please sign in to comment.