Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Implement traffic splitting plugin #2935

Merged
merged 31 commits into from
Dec 25, 2020
Merged
Show file tree
Hide file tree
Changes from 27 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
8a2736e
feat: traffic split plugin.
Firstsawyou Dec 2, 2020
4d49ef5
fix test case global ipairs and pairs.
Firstsawyou Dec 2, 2020
fcec69e
update code, docs and test cases.
Firstsawyou Dec 7, 2020
2edd06d
resolve review.
Firstsawyou Dec 9, 2020
4a9cd43
resolve review and update docs.
Firstsawyou Dec 9, 2020
9b200af
resolve conflicts.
Firstsawyou Dec 9, 2020
42d74f9
update docs.
Firstsawyou Dec 9, 2020
0dcecf5
update README docs and resolve review.
Firstsawyou Dec 11, 2020
847fa6b
`weighted_upstreams` field changed to singular form `weighted_upstream`.
Firstsawyou Dec 12, 2020
1f72f67
Merge branch 'master' into dynamic-upstream-plugin
juzhiyuan Dec 19, 2020
3126c9e
fix ci run error.
Firstsawyou Dec 21, 2020
371f806
update README files content.
Firstsawyou Dec 21, 2020
b8f18d6
rerun ci.
Firstsawyou Dec 21, 2020
e5e0414
upstream nodes support array types.
Firstsawyou Dec 23, 2020
b2b6701
code style.
Firstsawyou Dec 23, 2020
727ef58
fix unstable test cases.
Firstsawyou Dec 23, 2020
51ecca7
fix doc desc and code style.
Firstsawyou Dec 24, 2020
a37d754
delete `---LAST` of test cases.
Firstsawyou Dec 24, 2020
0b84c2e
fix: the upstream key is not unique
Firstsawyou Dec 24, 2020
13057ae
fix global "type".
Firstsawyou Dec 24, 2020
84399e8
add more test case and update schema field.
Firstsawyou Dec 24, 2020
80683f5
update test case.
Firstsawyou Dec 24, 2020
f752da8
update code style and doc
Firstsawyou Dec 24, 2020
9fa9ded
update test.
Firstsawyou Dec 24, 2020
f819400
refactored plugin docs.
Firstsawyou Dec 24, 2020
2e9dde5
update plugin docs.
Firstsawyou Dec 24, 2020
ce0abab
fix doc typo.
Firstsawyou Dec 24, 2020
cc550e0
update docs.
Firstsawyou Dec 25, 2020
2dca214
update doc note content.
Firstsawyou Dec 25, 2020
6f6263b
update docs title.
Firstsawyou Dec 25, 2020
237a4e1
in docs add how to enable content.
Firstsawyou Dec 25, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ A/B testing, canary release, blue-green deployment, limit rate, defense against
- [Health Checks](doc/health-check.md): Enable health check on the upstream node, and will automatically filter unhealthy nodes during load balancing to ensure system stability.
- Circuit-Breaker: Intelligent tracking of unhealthy upstream services.
- [Proxy Mirror](doc/plugins/proxy-mirror.md): Provides the ability to mirror client requests.
- [Traffic Split](doc/plugins/traffic-split.md): Allows users to incrementally direct percentages of traffic between various upstreams.

- **Fine-grained routing**

Expand Down
1 change: 1 addition & 0 deletions README_CN.md
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ A/B 测试、金丝雀发布(灰度发布)、蓝绿部署、限流限速、抵
- [健康检查](doc/zh-cn/health-check.md):启用上游节点的健康检查,将在负载均衡期间自动过滤不健康的节点,以确保系统稳定性。
- 熔断器: 智能跟踪不健康上游服务。
- [代理镜像](doc/zh-cn/plugins/proxy-mirror.md): 提供镜像客户端请求的能力。
- [流量拆分](doc/zh-cn/plugins/traffic-split.md): 允许用户逐步控制各个上游之间的流量百分比。

- **精细化路由**

Expand Down
1 change: 1 addition & 0 deletions apisix/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,7 @@ local function parse_domain(host)
return nil, "failed to parse domain"
end
end
_M.parse_domain = parse_domain


local function parse_domain_for_nodes(nodes)
Expand Down
323 changes: 323 additions & 0 deletions apisix/plugins/traffic-split.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,323 @@
--
-- Licensed to the Apache Software Foundation (ASF) under one or more
-- contributor license agreements. See the NOTICE file distributed with
-- this work for additional information regarding copyright ownership.
-- The ASF licenses this file to You under the Apache License, Version 2.0
-- (the "License"); you may not use this file except in compliance with
-- the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
--
local core = require("apisix.core")
local upstream = require("apisix.upstream")
local schema_def = require("apisix.schema_def")
local init = require("apisix.init")
local roundrobin = require("resty.roundrobin")
local ipmatcher = require("resty.ipmatcher")
local expr = require("resty.expr.v1")
local pairs = pairs
local ipairs = ipairs
local type = type
local table_insert = table.insert

local lrucache = core.lrucache.new({
ttl = 0, count = 512
})


local vars_schema = {
type = "array",
items = {
type = "array",
items = {
{
type = "string",
minLength = 1,
maxLength = 100
},
{
type = "string",
minLength = 1,
maxLength = 2
}
},
additionalItems = {
anyOf = {
{type = "string"},
{type = "number"},
{type = "boolean"},
{
type = "array",
items = {
anyOf = {
{
type = "string",
minLength = 1, maxLength = 100
},
{
type = "number"
},
{
type = "boolean"
}
}
},
uniqueItems = true
}
}
},
minItems = 0,
maxItems = 10
}
}


local match_schema = {
type = "array",
items = {
type = "object",
properties = {
vars = vars_schema
}
},
-- When there is no `match` rule, the default rule passes.
-- Perform upstream logic of plugin configuration.
default = {{ vars = {{"server_port", ">", 0}}}}
}


local upstreams_schema = {
type = "array",
items = {
type = "object",
properties = {
upstream_id = schema_def.id_schema, -- todo: support upstream_id method
upstream = schema_def.upstream,
weight = {
description = "used to split traffic between different" ..
"upstreams for plugin configuration",
type = "integer",
default = 1,
minimum = 0
}
}
},
-- When the upstream configuration of the plugin is missing,
-- the upstream of `route` is used by default.
default = {
{
weight = 1
}
},
minItems = 1,
maxItems = 20
}


local schema = {
type = "object",
properties = {
rules = {
type = "array",
items = {
type = "object",
properties = {
match = match_schema,
weighted_upstreams = upstreams_schema
}
}
}
}
}

local plugin_name = "traffic-split"

local _M = {
version = 0.1,
priority = 966,
name = plugin_name,
schema = schema
}

function _M.check_schema(conf)
local ok, err = core.schema.check(schema, conf)

if not ok then
return false, err
end

return true
end


local function parse_domain_for_node(node)
if not ipmatcher.parse_ipv4(node)
and not ipmatcher.parse_ipv6(node)
then
local ip, err = init.parse_domain(node)
if ip then
return ip
end

if err then
return nil, err
end
end

return node
end


local function set_pass_host(ctx, upstream_info, host)
-- Currently only supports a single upstream of the domain name.
-- When the upstream is `IP`, do not do any `pass_host` operation.
if not core.utils.parse_ipv4(host)
and not core.utils.parse_ipv6(host)
then
local pass_host = upstream_info.pass_host or "pass"
if pass_host == "pass" then
ctx.var.upstream_host = ctx.var.host
return
end

if pass_host == "rewrite" then
ctx.var.upstream_host = upstream_info.upstream_host
return
end

ctx.var.upstream_host = host
return
end

return
end


local function set_upstream(upstream_info, ctx)
Firstsawyou marked this conversation as resolved.
Show resolved Hide resolved
local nodes = upstream_info.nodes
local new_nodes = {}
if core.table.isarray(nodes) then
for _, node in ipairs(nodes) do
set_pass_host(ctx, upstream_info, node.host)
node.host = parse_domain_for_node(node.host)
node.port = node.port
node.weight = node.weight
table_insert(new_nodes, node)
end
else
for addr, weight in pairs(nodes) do
local node = {}
local ip, port, host
host, port = core.utils.parse_addr(addr)
set_pass_host(ctx, upstream_info, host)
ip = parse_domain_for_node(host)
node.host = ip
node.port = port
node.weight = weight
table_insert(new_nodes, node)
end
end
core.log.info("upstream_host: ", ctx.var.upstream_host)

local up_conf = {
name = upstream_info.name,
type = upstream_info.type,
nodes = new_nodes,
timeout = {
send = upstream_info.timeout and upstream_info.timeout.send or 15,
read = upstream_info.timeout and upstream_info.timeout.read or 15,
connect = upstream_info.timeout and upstream_info.timeout.connect or 15
}
}

local ok, err = upstream.check_schema(up_conf)
if not ok then
return 500, err
end

local matched_route = ctx.matched_route
local upstream_key = up_conf.type .. "#route_" ..
matched_route.value.id .. "_" ..upstream_info.vid
core.log.info("upstream_key: ", upstream_key)
upstream.set(ctx, upstream_key, ctx.conf_version, up_conf, matched_route)

return
end


local function new_rr_obj(weighted_upstreams)
local server_list = {}
for i, upstream_obj in ipairs(weighted_upstreams) do
if not upstream_obj.upstream then
-- If the `upstream` object has only the `weight` value, it means
-- that the `upstream` weight value on the default `route` has been reached.
-- Need to set an identifier to mark the empty upstream.
upstream_obj.upstream = "empty_upstream"
end

if type(upstream_obj.upstream) == "table" then
-- Add a virtual id field to uniquely identify the upstream `key`.
upstream_obj.upstream.vid = i
end
server_list[upstream_obj.upstream] = upstream_obj.weight
end

return roundrobin:new(server_list)
end


function _M.access(conf, ctx)
if not conf or not conf.rules then
return
end

local weighted_upstreams, match_flag
for _, rule in ipairs(conf.rules) do
match_flag = true
for _, single_match in ipairs(rule.match) do
local expr, err = expr.new(single_match.vars)
Firstsawyou marked this conversation as resolved.
Show resolved Hide resolved
if err then
Firstsawyou marked this conversation as resolved.
Show resolved Hide resolved
core.log.error("vars expression does not match: ", err)
return 500, err
end

match_flag = expr:eval()
if match_flag then
Firstsawyou marked this conversation as resolved.
Show resolved Hide resolved
Firstsawyou marked this conversation as resolved.
Show resolved Hide resolved
break
end
end

if match_flag then
weighted_upstreams = rule.weighted_upstreams
break
end
end
core.log.info("match_flag: ", match_flag)

if not match_flag then
return
end

local rr_up, err = lrucache(weighted_upstreams, nil, new_rr_obj, weighted_upstreams)
if not rr_up then
core.log.error("lrucache roundrobin failed: ", err)
return 500
end

local upstream = rr_up:find()
Firstsawyou marked this conversation as resolved.
Show resolved Hide resolved
tokers marked this conversation as resolved.
Show resolved Hide resolved
if upstream and upstream ~= "empty_upstream" then
core.log.info("upstream: ", core.json.encode(upstream))
return set_upstream(upstream, ctx)
end

return
end


return _M
1 change: 1 addition & 0 deletions conf/config-default.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,7 @@ plugins: # plugin list (sorted in alphabetical order)
- wolf-rbac
- zipkin
# - server-info
- traffic-split
Firstsawyou marked this conversation as resolved.
Show resolved Hide resolved

stream_plugins:
- mqtt-proxy
Expand Down
1 change: 1 addition & 0 deletions doc/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@
* [request-validation](plugins/request-validation.md): Validates requests before forwarding to upstream.
* [proxy-mirror](plugins/proxy-mirror.md): Provides the ability to mirror client requests.
* [api-breaker](plugins/api-breaker.md): Circuit Breaker for API that stops requests forwarding to upstream in case of unhealthy state.
* [traffic-split](plugins/traffic-split.md): Allows users to incrementally direct percentages of traffic between various upstreams.

### Monitoring

Expand Down
Loading