Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature: add skywalking plugin. #1241

Merged
merged 2 commits into from
May 27, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,9 @@ install:
$(INSTALL) -d $(INST_LUADIR)/apisix/plugins/zipkin
$(INSTALL) apisix/plugins/zipkin/*.lua $(INST_LUADIR)/apisix/plugins/zipkin/

$(INSTALL) -d $(INST_LUADIR)/apisix/plugins/skywalking
$(INSTALL) apisix/plugins/skywalking/*.lua $(INST_LUADIR)/apisix/plugins/skywalking/

$(INSTALL) -d $(INST_LUADIR)/apisix/stream/plugins
$(INSTALL) apisix/stream/plugins/*.lua $(INST_LUADIR)/apisix/stream/plugins/

Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ A/B testing, canary release, blue-green deployment, limit rate, defense against
- [CORS](doc/plugins/cors.md)

- **OPS friendly**
- OpenTracing: [support Apache Skywalking and Zipkin](doc/plugins/zipkin.md)
- OpenTracing: support [Apache Skywalking](doc/plugins/skywalking.md) and [Zipkin](doc/plugins/zipkin.md)
- Monitoring And Metrics: [Prometheus](doc/plugins/prometheus.md)
- Clustering: APISIX nodes are stateless, creates clustering of the configuration center, please refer to [etcd Clustering Guide](https://github.com/etcd-io/etcd/blob/master/Documentation/op-guide/clustering.md).
- High availability: support to configure multiple etcd addresses in the same cluster.
Expand Down
2 changes: 1 addition & 1 deletion README_CN.md
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ A/B 测试、金丝雀发布(灰度发布)、蓝绿部署、限流限速、抵
- [CORS](doc/plugins/cors-cn.md)

- **运维友好**
- OpenTracing 可观测性: [支持 Apache Skywalking 和 Zipkin](doc/plugins/zipkin-cn.md)。
- OpenTracing 可观测性: 支持 [Apache Skywalking](doc/plugins/skywalking-cn.md)[Zipkin](doc/plugins/zipkin-cn.md)。
- 监控和指标: [Prometheus](doc/plugins/prometheus-cn.md)
- 集群:APISIX 节点是无状态的,创建配置中心集群请参考 [etcd Clustering Guide](https://github.com/etcd-io/etcd/blob/master/Documentation/op-guide/clustering.md)。
- 高可用:支持配置同一个集群内的多个 etcd 地址。
Expand Down
12 changes: 6 additions & 6 deletions apisix/admin/routes.lua
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ function _M.put(id, conf, sub_path, args)
local key = "/routes/" .. id
local res, err = core.etcd.set(key, conf, args.ttl)
if not res then
core.log.error("failed to put route[", key, "]: ", err)
core.log.error("failed to put route[", key, "] to etcd: ", err)
return 500, {error_msg = err}
end

Expand All @@ -151,7 +151,7 @@ function _M.get(id)

local res, err = core.etcd.get(key)
if not res then
core.log.error("failed to get route[", key, "]: ", err)
core.log.error("failed to get route[", key, "] from etcd: ", err)
return 500, {error_msg = err}
end

Expand All @@ -169,7 +169,7 @@ function _M.post(id, conf, sub_path, args)
-- core.log.info("key: ", key)
local res, err = core.etcd.push("/routes", conf, args.ttl)
if not res then
core.log.error("failed to post route[", key, "]: ", err)
core.log.error("failed to post route[", key, "] to etcd: ", err)
return 500, {error_msg = err}
end

Expand All @@ -186,7 +186,7 @@ function _M.delete(id)
-- core.log.info("key: ", key)
local res, err = core.etcd.delete(key)
if not res then
core.log.error("failed to delete route[", key, "]: ", err)
core.log.error("failed to delete route[", key, "] in etcd: ", err)
return 500, {error_msg = err}
end

Expand Down Expand Up @@ -214,7 +214,7 @@ function _M.patch(id, conf, sub_path, args)

local res_old, err = core.etcd.get(key)
if not res_old then
core.log.error("failed to get route [", key, "]: ", err)
core.log.error("failed to get route [", key, "] in etcd: ", err)
return 500, {error_msg = err}
end

Expand Down Expand Up @@ -261,7 +261,7 @@ function _M.patch(id, conf, sub_path, args)
-- TODO: this is not safe, we need to use compare-set
local res, err = core.etcd.set(key, node_value, args.ttl)
if not res then
core.log.error("failed to set new route[", key, "]: ", err)
core.log.error("failed to set new route[", key, "] to etcd: ", err)
return 500, {error_msg = err}
end

Expand Down
80 changes: 80 additions & 0 deletions apisix/plugins/skywalking.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
--
-- Licensed to the Apache Software Foundation (ASF) under one or more
-- contributor license agreements. See the NOTICE file distributed with
-- this work for additional information regarding copyright ownership.
-- The ASF licenses this file to You under the Apache License, Version 2.0
-- (the "License"); you may not use this file except in compliance with
-- the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
--
local core = require("apisix.core")
local ngx = ngx
local math = math

local sw_client = require("apisix.plugins.skywalking.client")
local sw_tracer = require("apisix.plugins.skywalking.tracer")

local plugin_name = "skywalking"


local schema = {
type = "object",
properties = {
endpoint = {type = "string"},
sample_ratio = {type = "number", minimum = 0.00001, maximum = 1, default = 1}
},
service_name = {
type = "string",
description = "service name for skywalking",
default = "APISIX",
},
required = {"endpoint"}
}


local _M = {
version = 0.1,
priority = -1100, -- last running plugin, but before serverless post func
name = plugin_name,
schema = schema,
}


function _M.check_schema(conf)
return core.schema.check(schema, conf)
end


function _M.rewrite(conf, ctx)
core.log.debug("rewrite phase of skywalking plugin")
ctx.skywalking_sample = false
if conf.sample_ratio == 1 or math.random() < conf.sample_ratio then
ctx.skywalking_sample = true
sw_client.heartbeat(conf)
-- Currently, we can not have the upstream real network address
sw_tracer.start(ctx, conf.endpoint, "upstream service")
end
end


function _M.body_filter(conf, ctx)
if ctx.skywalking_sample and ngx.arg[2] then
sw_tracer.finish(ctx)
end
end


function _M.log(conf, ctx)
if ctx.skywalking_sample then
sw_tracer.prepareForReport(ctx, conf.endpoint)
end
end

return _M
226 changes: 226 additions & 0 deletions apisix/plugins/skywalking/client.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,226 @@
--
-- Licensed to the Apache Software Foundation (ASF) under one or more
-- contributor license agreements. See the NOTICE file distributed with
-- this work for additional information regarding copyright ownership.
-- The ASF licenses this file to You under the Apache License, Version 2.0
-- (the "License"); you may not use this file except in compliance with
-- the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
--
local core = require("apisix.core")
local http = require("resty.http")
local cjson = require('cjson')
local ngx = ngx
local ipairs = ipairs

local register = require("skywalking.register")

local _M = {}

local function register_service(conf)
local endpoint = conf.endpoint

local tracing_buffer = ngx.shared['skywalking-tracing-buffer']
local service_id = tracing_buffer:get(endpoint .. '_service_id')
if service_id then
return service_id
end

local service_name = conf.service_name
local service = register.newServiceRegister(service_name)

local httpc = http.new()
local res, err = httpc:request_uri(endpoint .. '/v2/service/register',
{
method = "POST",
body = core.json.encode(service),
headers = {
["Content-Type"] = "application/json",
},
})
if not res then
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

code style, this style is better

if not res then
    core.log.error(...)
    return nil
end

if res.status ~= 200 then
    core.log.error(...)
    return nil
end

tracing_buffer:set(endpoint .. '_service_id', service_id)
return service_id

core.log.error("skywalking service register failed, request uri: ",
endpoint .. '/v2/service/register', ", err: ", err)

elseif res.status == 200 then
core.log.debug("skywalking service register response: ", res.body)
local register_results = cjson.decode(res.body)

for _, result in ipairs(register_results) do
if result.key == service_name then
service_id = result.value
core.log.debug("skywalking service registered, service id:"
.. service_id)
end
end

else
core.log.error("skywalking service register failed, request uri:",
endpoint .. "/v2/service/register",
", response code:", res.status)
end

if service_id then
tracing_buffer:set(endpoint .. '_service_id', service_id)
end

return service_id
end

local function register_service_instance(conf, service_id)
local endpoint = conf.endpoint

local tracing_buffer = ngx.shared['skywalking-tracing-buffer']
local instance_id = tracing_buffer:get(endpoint .. '_instance_id')
if instance_id then
return instance_id
end

local service_instance_name = core.id.get()
local service_instance = register.newServiceInstanceRegister(
service_id,
service_instance_name,
ngx.now() * 1000)

local httpc = http.new()
local res, err = httpc:request_uri(endpoint .. '/v2/instance/register',
{
method = "POST",
body = core.json.encode(service_instance),
headers = {
["Content-Type"] = "application/json",
},
})

if not res then
core.log.error("skywalking service Instance register failed",
", request uri: ", conf.endpoint .. '/v2/instance/register',
", err: ", err)

elseif res.status == 200 then
core.log.debug("skywalking service instance register response: ", res.body)
local register_results = cjson.decode(res.body)

for _, result in ipairs(register_results) do
if result.key == service_instance_name then
instance_id = result.value
core.log.debug("skywalking service Instance registered, ",
"service instance id: ", instance_id)
end
end

else
core.log.error("skywalking service instance register failed, ",
"response code:", res.status)
end

if instance_id then
tracing_buffer:set(endpoint .. '_instance_id', instance_id)
end

return instance_id
end

local function ping(endpoint)
local tracing_buffer = ngx.shared['skywalking-tracing-buffer']
local ping_pkg = register.newServiceInstancePingPkg(
tracing_buffer:get(endpoint .. '_instance_id'),
core.id.get(),
ngx.now() * 1000)

local httpc = http.new()
local _, err = httpc:request_uri(endpoint .. '/v2/instance/heartbeat', {
method = "POST",
body = core.json.encode(ping_pkg),
headers = {
["Content-Type"] = "application/json",
},
})

if err then
core.log.error("skywalking agent ping failed, err: ", err)
end
end

-- report trace segments to the backend
local function report_traces(endpoint)
local tracing_buffer = ngx.shared['skywalking-tracing-buffer']
local segment = tracing_buffer:rpop(endpoint .. '_segment')

local count = 0

local httpc = http.new()

while segment ~= nil do
local res, err = httpc:request_uri(endpoint .. '/v2/segments', {
method = "POST",
body = segment,
headers = {
["Content-Type"] = "application/json",
},
})

if err == nil then
if res.status ~= 200 then
core.log.error("skywalking segment report failed, response code ", res.status)
break
else
count = count + 1
end
else
core.log.error("skywalking segment report failed, err: ", err)
break
end

segment = tracing_buffer:rpop('segment')
end

if count > 0 then
core.log.debug(count, " skywalking segments reported")
end
end

do
local heartbeat_timer

function _M.heartbeat(conf)
local sw_heartbeat = function()
local service_id = register_service(conf)
if not service_id then
return
end

local service_instance_id = register_service_instance(conf, service_id)
if not service_instance_id then
return
end

report_traces(conf.endpoint)
ping(conf.endpoint)
end

local err
if ngx.worker.id() == 0 and not heartbeat_timer then
heartbeat_timer, err = core.timer.new("skywalking_heartbeat",
sw_heartbeat,
{check_interval = 3}
)
if not heartbeat_timer then
core.log.error("failed to create skywalking_heartbeat timer: ", err)
else
core.log.info("succeed to create timer: skywalking heartbeat")
end
end
end

end -- do


return _M
Loading