Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature: add batch request plugin. #1388

Merged
merged 21 commits into from
Apr 29, 2020
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ A/B testing, canary release, blue-green deployment, limit rate, defense against
- IPv6: Use IPv6 to match route.
- Support [TTL](doc/admin-api-cn.md#route)
- [Support priority](doc/router-radixtree.md#3-match-priority)
- [Support Http API-Aggregation](doc/plugins/api-aggregate.md)

- **Security**
- Authentications: [key-auth](doc/plugins/key-auth.md), [JWT](doc/plugins/jwt-auth.md), [basic-auth](doc/plugins/basic-auth.md), [wolf-rbac](doc/plugins/wolf-rbac.md)
Expand Down
1 change: 1 addition & 0 deletions README_CN.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ A/B 测试、金丝雀发布(灰度发布)、蓝绿部署、限流限速、抵
- IPv6:支持使用 IPv6 格式匹配路由
- 支持路由的[自动过期(TTL)](doc/admin-api-cn.md#route)
- [支持路由的优先级](doc/router-radixtree.md#3-match-priority)
- [支持 Http 接口聚合](doc/plugins/api-aggregate-cn.md)

- **安全防护**
- 多种身份认证方式: [key-auth](doc/plugins/key-auth-cn.md), [JWT](doc/plugins/jwt-auth-cn.md), [basic-auth](doc/plugins/basic-auth-cn.md), [wolf-rbac](doc/plugins/wolf-rbac-cn.md)。
Expand Down
206 changes: 206 additions & 0 deletions apisix/plugins/api-aggregate.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,206 @@
--
-- Licensed to the Apache Software Foundation (ASF) under one or more
-- contributor license agreements. See the NOTICE file distributed with
-- this work for additional information regarding copyright ownership.
-- The ASF licenses this file to You under the Apache License, Version 2.0
-- (the "License"); you may not use this file except in compliance with
-- the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
--
local core = require("apisix.core")
local http = require("resty.http")
local ngx = ngx
local ipairs = ipairs
local pairs = pairs

local plugin_name = "api-aggregate"

local schema = {
type = "object",
additionalProperties = false,
}

local req_schema = {
type = "object",
properties = {
membphis marked this conversation as resolved.
Show resolved Hide resolved
query = {
description = "pipeline query string",
type = "object"
},
headers = {
description = "pipeline header",
type = "object"
},
timeout = {
description = "pipeline timeout(ms)",
type = "integer",
default = 30000,
},
pipeline = {
type = "array",
minItems = 1,
items = {
type = "object",
properties = {
version = {
description = "HTTP version",
type = "number",
enum = {1.0, 1.1},
default = 1.1,
},
method = {
description = "HTTP method",
type = "string",
enum = {"GET", "POST", "PUT", "DELETE", "PATCH", "HEAD",
"OPTIONS", "CONNECT", "TRACE"},
default = "GET"
},
path = {
type = "string",
minLength = 1,
},
query = {
description = "request header",
type = "object",
},
headers = {
description = "request query string",
type = "object",
},
ssl_verify = {
type = "boolean",
default = false
},
}
}
}
},
anyOf = {
{required = {"pipeline"}},
},
}

local _M = {
version = 0.1,
priority = 4010,
ShiningRush marked this conversation as resolved.
Show resolved Hide resolved
name = plugin_name,
schema = schema
}

function _M.check_schema(conf)
local ok, err = core.schema.check(schema, conf)
if not ok then
return false, err
end
return true
end

ShiningRush marked this conversation as resolved.
Show resolved Hide resolved
local function check_input(data)
local ok, err = core.schema.check(req_schema, data)
if not ok then
return 400, {message = "bad request body", err = err}
end
end

local function set_base_header(data)
if not data.headers then
return
end

for i,req in ipairs(data.pipeline) do
if not req.headers then
req.headers = data.headers
else
for k, v in pairs(data.headers) do
if not req.headers[k] then
req.headers[k] = v
end
end
end
end
end

local function set_base_query(data)
if not data.query then
return
end

for i,req in ipairs(data.pipeline) do
if not req.query then
req.query = data.query
else
for k, v in pairs(data.query) do
if not req.query[k] then
req.query[k] = v
end
end
end
end
end

local function aggregate()
ngx.req.read_body()
local req_body = ngx.req.get_body_data()
ShiningRush marked this conversation as resolved.
Show resolved Hide resolved
if not req_body then
core.response.exit(400, {message = "no request body, you should give at least one pipeline setting"})
end
local data, err = core.json.decode(req_body)
ShiningRush marked this conversation as resolved.
Show resolved Hide resolved
if not data then
core.response.exit(400, {message = "invalid request body", req_body = req_body, err = err})
end

local code, body = check_input(data)
if code then
core.response.exit(code, body)
end

local httpc = http.new()
core.log.info(data.timeout)
ShiningRush marked this conversation as resolved.
Show resolved Hide resolved
httpc:set_timeout(data.timeout)
httpc:connect("127.0.0.1", ngx.var.server_port)
set_base_header(data)
set_base_query(data)
local responses, err = httpc:request_pipeline(data.pipeline)
if not responses then
core.response.exit(400, {message = "request failed", err = err})
end

local aggregated_resp = {}
for i,r in ipairs(responses) do
if not r.status then
core.table.insert(aggregated_resp, {
status = 504,
reason = "upstream timeout"
})
end
local sub_resp = {
status = r.status,
reason = r.reason,
headers = r.headers,
}
if r.has_body then
sub_resp.body = r:read_body()
end
core.table.insert(aggregated_resp, sub_resp)
end
core.response.exit(200, aggregated_resp)
end

function _M.api()
return {
{
methods = {"POST"},
uri = "/apisix/aggregate",
handler = aggregate,
}
}
end

return _M
1 change: 1 addition & 0 deletions conf/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -145,5 +145,6 @@ plugins: # plugin list
- proxy-mirror
- kafka-logger
- cors
- api-aggregate
stream_plugins:
- mqtt-proxy
1 change: 1 addition & 0 deletions doc/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ Plugins
* [proxy-mirror](plugins/proxy-mirror.md): Provides the ability to mirror client requests.
* [kafka-logger](plugins/kafka-logger.md): Log requests to External Kafka servers.
* [cors](plugins/cors.md): Enable CORS(Cross-origin resource sharing) for your API.
* [api-aggregate](plugins/api-aggregate.md): Allow you aggregate http api via http pipeline.

Deploy to the Cloud
=======
Expand Down
1 change: 1 addition & 0 deletions doc/README_CN.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,3 +65,4 @@ Reference document
* [tcp-logger](plugins/tcp-logger.md): 将请求记录到TCP服务器
* [kafka-logger](plugins/kafka-logger-cn.md): 将请求记录到外部Kafka服务器。
* [cors](plugins/cors-cn.md): 为你的API启用CORS.
* [api-aggregate](plugins/api-aggregate-cn.md): 以 **http pipeline** 的方式聚合 `http` 请求
135 changes: 135 additions & 0 deletions doc/plugins/api-aggregate-cn.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
<!--
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
-->

# [English](api-aggregate.md)

# 目录

- [**简介**](#简介)
- [**属性**](#属性)
- [**如何启用**](#如何启用)
- [**聚合接口请求/响应**](#聚合接口请求/响应)
- [**测试插件**](#测试插件)
- [**禁用插件**](#禁用插件)

## 简介

`api-aggregate` 插件可以让你在以 [http pipeline](https://en.wikipedia.org/wiki/HTTP_pipelining) 的方式在网关聚合多个http请求。

## 属性


## 如何启用

本插件默认启用。
ShiningRush marked this conversation as resolved.
Show resolved Hide resolved

## 聚合接口请求/响应
插件会为 `apisix` 创建一个 `/apisix/aggregate` 的聚合接口来处理你的聚合请求。
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this plugin is using admin API to aggregate apis, why not use plugin's configures?

Copy link
Contributor Author

@ShiningRush ShiningRush Apr 15, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, is here example about how to create a api via plugin's configures?API-Aggregation need a API to handle request.


### 接口请求参数:

| 参数名 | 类型 | 可选 | 默认值 | 描述 |
| --- | --- | --- | --- | --- |
| query | Object | Yes | | 给所有请求都携带的 `QueryString` |
| headers | Object | Yes | | 给所有请求都携带的 `Header` |
| timeout | Number | Yes | 3000 | 聚合请求的超时时间,单位为 `ms` |
| pipeline | [HttpRequest](#Request) | No | | Http 请求的详细信息 |

#### HttpRequest
| 参数名 | 类型 | 可选 | 默认值 | 描述 |
| --- | --- | --- | --- | --- |
| version | Enum | Yes | 1.1 | 请求用的 `http` 协议版本,可以使用 `1.0` or `1.1` |
| method | Enum | Yes | GET | 请求使用的 `http` 方法,例如:`GET`. |
| query | Object | Yes | | 独立请求所携带的 `QueryString`, 如果 `Key` 和全局的有冲突,以此设置为主。 |
| headers | Object | Yes | | 独立请求所携带的 `Header`, 如果 `Key` 和全局的有冲突,以此设置为主。 |
| path | String | No | | 请求路径 |
| body | String | Yes | | 请求体 |

### 接口响应参数:
返回值为一个 [HttpResponse](#HttpResponse) 的 `数组`。

#### HttpResponse
| 参数名 | 类型 | 描述 |
| --- | --- | --- | --- | --- |
| status | Integer | Http 请求的状态码 |
| reason | String | Http 请求的返回信息 |
| body | String | Http 请求的响应体 |
| headers | Object | Http 请求的响应头 |

## 测试插件

你可以将要访问的请求信息传到网关的聚合接口( `/apisix/aggregate` ),网关会以 [http pipeline](https://en.wikipedia.org/wiki/HTTP_pipelining) 的方式自动帮你完成请求。
```shell
curl --location --request POST 'http://100.109.220.139/apisix/aggregate' \
ShiningRush marked this conversation as resolved.
Show resolved Hide resolved
--header 'Content-Type: application/json' \
--d '{
"headers": {
"Content-Type": "application/json",
"admin-jwt":"xxxx"
},
"timeout": 500,
"pipeline": [
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

bad indentation

{
"method": "POST",
"path": "/community.GiftSrv/GetGifts",
"body": "test"
},
{
"method": "POST",
"path": "/community.GiftSrv/GetGifts",
"body": "test2"
}
]
}'
```

返回如下:
```json
[
{
"status": 200,
"reason": "OK",
"body": "{\"ret\":500,\"msg\":\"error\",\"game_info\":null,\"gift\":[],\"to_gets\":0,\"get_all_msg\":\"\"}",
"headers": {
"Connection": "keep-alive",
"Date": "Sat, 11 Apr 2020 17:53:20 GMT",
"Content-Type": "application/json",
"Content-Length": "81",
"Server": "APISIX web server"
}
},
{
"status": 200,
"reason": "OK",
"body": "{\"ret\":500,\"msg\":\"error\",\"game_info\":null,\"gift\":[],\"to_gets\":0,\"get_all_msg\":\"\"}",
"headers": {
"Connection": "keep-alive",
"Date": "Sat, 11 Apr 2020 17:53:20 GMT",
"Content-Type": "application/json",
"Content-Length": "81",
"Server": "APISIX web server"
}
}
]
```

## 禁用插件

正常来说你不需要禁用本插件,如果有特殊情况,请从 `/conf/config.yaml` 的 `plugins` 节点中移除即可。
Loading