plugin(kafka-logger): Updating kafka logger to use the batch processor util (#1358)

sshniro authored Apr 29, 2020
1 parent 89a07ef commit c6cc2b5
Showing 3 changed files with 89 additions and 40 deletions.
89 changes: 65 additions & 24 deletions apisix/plugins/kafka-logger.lua
@@ -17,28 +17,29 @@
local core = require("apisix.core")
local log_util = require("apisix.utils.log-util")
local producer = require ("resty.kafka.producer")
local batch_processor = require("apisix.utils.batch-processor")
local pairs = pairs
local type = type
local table = table

local plugin_name = "kafka-logger"
local ngx = ngx

local timer_at = ngx.timer.at
-- per-route batch processors, keyed by route id
local buffers = {}

local schema = {
type = "object",
properties = {
broker_list = {
type = "object"
},
timeout = { -- timeout in milliseconds
type = "integer", minimum = 1, default= 2000
},
kafka_topic = {type = "string"},
async = {type = "boolean", default = false},
key = {type = "string"},
max_retry = {type = "integer", minimum = 0 , default = 3},
timeout = {type = "integer", minimum = 1, default = 3},
name = {type = "string", default = "kafka logger"},
max_retry_count = {type = "integer", minimum = 0, default = 0},
retry_delay = {type = "integer", minimum = 0, default = 1},
buffer_duration = {type = "integer", minimum = 1, default = 60},
inactive_timeout = {type = "integer", minimum = 1, default = 5},
batch_max_size = {type = "integer", minimum = 1, default = 1000},
},
required = {"broker_list", "kafka_topic", "key"}
}
@@ -50,15 +51,13 @@ local _M = {
schema = schema,
}


function _M.check_schema(conf)
return core.schema.check(schema, conf)
end

local function log(premature, conf, log_message)
if premature then
return
end

local function send_kafka_data(conf, log_message)
if core.table.nkeys(conf.broker_list) == 0 then
core.log.error("failed to identify the broker specified")
end
@@ -68,7 +67,7 @@ local function log(premature, conf, log_message)

for host, port in pairs(conf.broker_list) do
if type(host) == 'string'
and type(port) == 'number' then
and type(port) == 'number' then

local broker = {
host = host, port = port
@@ -77,28 +76,70 @@
end
end

broker_config["request_timeout"] = conf.timeout
broker_config["max_retry"] = conf.max_retry

--Async producers will queue logs and push them when the buffer exceeds.
if conf.async then
broker_config["producer_type"] = "async"
end
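-- conf.timeout is configured in seconds; resty.kafka expects request_timeout in milliseconds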
broker_config["request_timeout"] = conf.timeout * 1000

local prod, err = producer:new(broker_list,broker_config)
if err then
core.log.error("failed to identify the broker specified", err)
return
return nil, "failed to identify the broker specified: " .. err
end

local ok, err = prod:send(conf.kafka_topic, conf.key, log_message)
if not ok then
core.log.error("failed to send data to Kafka topic", err)
return nil, "failed to send data to Kafka topic" .. err
end
-- the batch processor treats a falsy return as a failed batch, so report success explicitly
return true
end


function _M.log(conf)
return timer_at(0, log, conf, core.json.encode(log_util.get_full_log(ngx)))
local entry = log_util.get_full_log(ngx)

if not entry.route_id then
core.log.error("failed to obtain the route id for kafka logger")
return
end

-- a batch processor is created lazily for each route and cached by route id
local log_buffer = buffers[entry.route_id]

if log_buffer then
log_buffer:push(entry)
return
end

-- Generate a function to be executed by the batch processor
local func = function(entries, batch_max_size)
local data, err
if batch_max_size == 1 then
data, err = core.json.encode(entries[1]) -- encode as single {}
else
data, err = core.json.encode(entries) -- encode as array [{}]
end

if not data then
return false, 'error occurred while encoding the data: ' .. err
end

return send_kafka_data(conf, data)
end

-- assemble the batch processor configuration from the plugin attributes
local config = {
name = conf.name,
retry_delay = conf.retry_delay,
batch_max_size = conf.batch_max_size,
max_retry_count = conf.max_retry_count,
buffer_duration = conf.buffer_duration,
inactive_timeout = conf.inactive_timeout,
}

local err
log_buffer, err = batch_processor:new(func, config)

if not log_buffer then
core.log.error("error when creating the batch processor: ", err)
return
end

buffers[entry.route_id] = log_buffer
log_buffer:push(entry)
end

return _M
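
For reference, the batch-processor utility adopted above follows a small contract: the plugin hands it a flush function plus a config table and then pushes entries; the processor invokes the function with the buffered entries once a size or time threshold is reached. The following is a minimal sketch of that pattern, assuming an APISIX runtime where `apisix.core` and `apisix.utils.batch-processor` are loadable; the handler, the name and the sample entry are illustrative, not part of this commit.

```lua
-- Minimal sketch of the batch-processor contract used by the plugin above.
local core = require("apisix.core")
local batch_processor = require("apisix.utils.batch-processor")

-- Flush callback: receives the buffered entries and the configured batch_max_size.
-- It must return true on success, or nil plus an error message to trigger a retry.
local function flush(entries, batch_max_size)
    local data, err = core.json.encode(entries)
    if not data then
        return nil, "error occurred while encoding the data: " .. err
    end

    core.log.info("flushing ", #entries, " entries: ", data)
    return true
end

local config = {
    name = "example logger",   -- used in batch-processor error logs
    batch_max_size = 1000,     -- flush once this many entries are buffered
    inactive_timeout = 5,      -- flush after this many seconds of inactivity
    buffer_duration = 60,      -- flush once the oldest entry reaches this age (seconds)
    max_retry_count = 0,       -- retries before a failed batch is dropped
    retry_delay = 1,           -- seconds to wait before retrying a failed batch
}

local processor, err = batch_processor:new(flush, config)
if not processor then
    core.log.error("error when creating the batch processor: ", err)
    return
end

-- Entries can be any serialisable table; the plugin pushes log_util.get_full_log(ngx).
processor:push({route_id = "1", uri = "/hello"})
```

Keeping one processor per route, as the plugin does with `buffers[entry.route_id]`, means the batching thresholds apply per route rather than globally.
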
32 changes: 19 additions & 13 deletions doc/plugins/kafka-logger.md
@@ -34,14 +34,19 @@

## Attributes

|Name |Requirement |Description|
|--------- |--------|-----------|
| broker_list |required| An array of Kafka brokers.|
| kafka_topic |required| Target topic to push data.|
| timeout |optional|Timeout for the upstream to send data.|
| async |optional|Boolean value to control whether to perform async push.|
| key |required|Key for the message.|
| max_retry |optional|No of retries|
|Name             |Requirement |Description|
|-----------------|------------|-----------|
|broker_list      |required    |An array of Kafka brokers.|
|kafka_topic      |required    |Target topic to push data.|
|timeout          |optional    |Timeout (in seconds) for sending data to Kafka; default is 3.|
|async            |optional    |Boolean value to control whether to perform async push.|
|key              |required    |Key for the message.|
|name             |optional    |A unique identifier to identify the batch processor; default is "kafka logger".|
|batch_max_size   |optional    |Maximum number of entries in each batch; default is 1000.|
|inactive_timeout |optional    |Maximum age in seconds that the buffer may sit idle before it is flushed; default is 5.|
|buffer_duration  |optional    |Maximum age in seconds of the oldest entry in a batch before the batch must be processed; default is 60.|
|max_retry_count  |optional    |Maximum number of retries before removing the batch from the processing pipeline; default is 0.|
|retry_delay      |optional    |Number of seconds to delay processing when execution fails; default is 1.|

## Info

@@ -75,7 +80,7 @@ sample to take effect of this functionality.

## How To Enable

1. Here is an examle on how to enable kafka-logger plugin for a specific route.
The following is an example on how to enable the kafka-logger for a specific route.

```shell
curl http://127.0.0.1:9080/apisix/admin/consumers -H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -X PUT -d '
@@ -88,7 +93,9 @@ curl http://127.0.0.1:9080/apisix/admin/consumers -H 'X-API-KEY: edd1c9f034335f1
"127.0.0.1":9092
},
"kafka_topic" : "test2",
"key" : "key1"
"key" : "key1",
"batch_max_size": 1,
"name": "kafka logger"
}
},
"upstream": {
@@ -114,9 +121,8 @@ hello, world

## Disable Plugin

When you want to disable the `kafka-logger` plugin, it is very simple,
you can delete the corresponding json configuration in the plugin configuration,
no need to restart the service, it will take effect immediately:
Remove the corresponding JSON configuration from the plugin configuration to disable the `kafka-logger`.
APISIX plugins are hot-reloaded, so there is no need to restart APISIX.

```shell
$ curl http://127.0.0.1:2379/apisix/admin/routes/1 -X PUT -d value='
8 changes: 5 additions & 3 deletions t/plugin/kafka-logger.t
@@ -195,7 +195,8 @@ hello world
"127.0.0.1":9093
},
"kafka_topic" : "test2",
"key" : "key1"
"key" : "key1",
"batch_max_size": 1
}
},
"upstream": {
@@ -217,7 +218,8 @@ hello world
"127.0.0.1":9093
},
"kafka_topic" : "test2",
"key" : "key1"
"key" : "key1",
"batch_max_size": 1
}
},
"upstream": {
@@ -248,4 +250,4 @@ GET /t
--- error_log
failed to send data to Kafka topic
[error]
--- wait: 0.2
--- wait: 1
