hotfix(mashape-analytics) improve ALF buffer under load
As reported in #750, the buffer is vulnerable under heavy load because of
its `sending_queue` of batches pending to be sent. It also does not
handle the new 207 HTTP status code returned by the collector when a
batch contains invalid ALFs.

Changes:
- handle the 207 HTTP status code by discarding the batch. The valid ALFs in
  it will have been saved, and the invalid one(s) should not be retried.
- implement a maximum size (in MB) for the `sending_queue`, as suggested
  in #750. I originally planned to limit the queue by number of pending
  batches, but users would have no intuitive way to know what best fits
  their use case, because ALF sizes vary from one API to another, and
  from one endpoint to another. Defining the limit in MB makes it easier
  for users to choose a value. The default value of 10MB was chosen after
  some benchmarking, and should handle 300 to 500 req/s depending on ALF
  sizes. When the `sending_queue` has reached its limit, the current ALFs
  in the buffer are **discarded** (see the first sketch after this list).
- implement a retry policy. Instead of insisting on resending batches
  when the collector cannot be reached, a delay is computed that grows on
  each failed attempt to connect to the collector (see the second sketch
  after this list). That delay is shared by all workers. This avoids
  hammering the collector when it is having difficulties, and saves
  bandwidth on Kong's side. As soon as the collector can be reached again,
  the delay is reset. Currently, the minimum retry delay is 1s and the
  maximum is 60s; those values cannot be configured.
- no more trailing line breaks in logs printing responses from the collector.
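
To illustrate the discard behaviour, here is a minimal sketch of the size check
`flush()` now performs (simplified from buffer.lua; the hypothetical `enqueue`
helper and the hardcoded 10MB stand in for the real buffer state and the new
`sending_queue_size` setting):

```lua
local MB = 1024 * 1024
local max_queue_size = 10 * MB -- conf.sending_queue_size, converted to bytes

-- Simplified from buffer_mt:flush(): append the serialized batch only if it
-- fits in the queue. The queue-empty exception lets an oversized batch through
-- when the configuration is erroneous (batch_size too big for the queue size).
local function enqueue(queue, queue_size, batch_json)
  local size = #batch_json
  if queue_size + size <= max_queue_size or #queue < 1 then
    queue[#queue + 1] = batch_json
    return queue_size + size
  end
  -- limit reached: the batch is discarded and its ALFs are lost
  return queue_size
end
```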
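
And a sketch of the retry delay computation (the formula matches the new code
in buffer.lua; the shared-dict bookkeeping that synchronizes the failure index
across workers is omitted):

```lua
local RETRY_BASE_DELAY = 1 -- seconds
local RETRY_MAX_DELAY = 60 -- seconds

-- index = number of consecutive failed attempts to reach the collector.
-- The delay grows quadratically and is capped: 1s, 4s, 9s, 16s, ... 60s.
local function retry_delay(index)
  return math.min(math.pow(index, 2) * RETRY_BASE_DELAY, RETRY_MAX_DELAY)
end

assert(retry_delay(1) == 1)
assert(retry_delay(3) == 9)
assert(retry_delay(8) == 60) -- 8^2 = 64, capped at 60
```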
thibaultcha committed Dec 2, 2015
1 parent b4cc76a commit 87e15dd
Showing 4 changed files with 279 additions and 97 deletions.
172 changes: 116 additions & 56 deletions kong/plugins/mashape-analytics/buffer.lua
@@ -1,16 +1,22 @@
-- ALF buffer module
--
-- This module contains a buffered array of ALF objects. When the buffer is full (max number of entries
- -- or max payload size), it is converted to a JSON payload and moved a queue of payloads to be
- -- sent to the server.
+ -- or max payload size accepted by the socket server), it is eventually converted to a JSON payload and moved a
+ -- queue of payloads to be sent to the server. "Eventually", because to prevent the sending queue from growing
+ -- too much and crashing the Lua VM, its size is limited (in bytes). If the sending queue is currently bloated
+ -- and reached its size limit, then the buffer is NOT added to it, and simply discarded. ALFs will be lost.
--
- -- 1 buffer of ALFs (gets flushed once it reached the mmax size)
- -- 1 queue of ready-to-be-sent batches which are JSON payloads
+ -- So to resume:
+ -- One buffer of ALFs (gets flushed once it reaches the max size)
+ -- One queue of pending, ready-to-be-sent batches which are JSON payloads (which also has a max size, in bytes)
--
- -- We only remove a payload from the sent queue if it has been correctly received by the socket server.
- -- We retry if there is any error during the sending.
- -- We run a 'delayed timer' in case no call is received for a while to still flush
- -- the buffer and have 'real-time' analytics.
+ -- 1. The sending queue keeps sending batches one by one, and if batches are acknowledged by the socket server,
+ -- the batch is considered saved and is discarded.
+ -- 2. If the batch is invalid (bad ALF formatting) according to the socket server, it is discarded and won't be retried.
+ -- 3. If the connection to the socket server could not be made, the batch will not be discarded so it can be retried.
+ -- 4. The sending queue keeps sending batches as long as it has some pending for sending. If the connection failed (3.),
+ -- the sending queue will use a retry policy timer which is incremented everytime the socket server did not answer.
+ -- 5. We run a 'delayed timer' in case no call is received for a while to still flush the buffer and have 'real-time' analytics.
--
-- @see alf_serializer.lua
-- @see handler.lua
@@ -26,15 +32,27 @@ local ngx_timer_at = ngx.timer.at
local table_insert = table.insert
local table_concat = table.concat
local table_remove = table.remove
+ local string_sub = string.sub
local string_len = string.len
local string_rep = string.rep
local string_format = string.format
+ local math_pow = math.pow
+ local math_min = math.min
local setmetatable = setmetatable

local MB = 1024 * 1024
- local MAX_BUFFER_SIZE = 1 * MB
+ local MAX_BUFFER_SIZE = 500 * MB
local EMPTY_ARRAY_PLACEHOLDER = "__empty_array_placeholder__"

+ -- Define an exponential retry policy for all workers.
+ -- The policy will give a delay that grows everytime
+ -- Galileo fails to respond. As soon as Galileo responds,
+ -- the delay is reset to its base.
+ local dict = ngx.shared.locks
+ local RETRY_INDEX_KEY = "mashape_analytics_retry_index"
+ local RETRY_BASE_DELAY = 1 -- seconds
+ local RETRY_MAX_DELAY = 60 -- seconds
+
local buffer_mt = {}
buffer_mt.__index = buffer_mt
buffer_mt.MAX_BUFFER_SIZE = MAX_BUFFER_SIZE
@@ -44,10 +62,10 @@ buffer_mt.MAX_BUFFER_SIZE = MAX_BUFFER_SIZE
-- as possible.
local delayed_send_handler
delayed_send_handler = function(premature, buffer)
- if ngx_now() - buffer.latest_call < buffer.AUTO_FLUSH_DELAY then
+ if ngx_now() - buffer.latest_call < buffer.auto_flush_delay then
-- If the latest call was received during the wait delay, abort the delayed send and
-- report it for X more seconds.
- local ok, err = ngx_timer_at(buffer.AUTO_FLUSH_DELAY, delayed_send_handler, buffer)
+ local ok, err = ngx_timer_at(buffer.auto_flush_delay, delayed_send_handler, buffer)
if not ok then
buffer.lock_delayed = false -- re-enable creation of a delayed-timer for this buffer
ngx_log(ngx_log_ERR, "[mashape-analytics] failed to create delayed batch sending timer: ", err)
@@ -64,18 +82,20 @@ end
-- Instanciate a new buffer with configuration and properties
function buffer_mt.new(conf)
local buffer = {
- MAX_ENTRIES = conf.batch_size,
- MAX_SIZE = MAX_BUFFER_SIZE,
- AUTO_FLUSH_DELAY = conf.delay,
- HOST = conf.host,
- PORT = conf.port,
- PATH = conf.path,
+ max_entries = conf.batch_size,
+ max_entries_size = buffer_mt.MAX_BUFFER_SIZE, -- using the value attached to the buffer_mt allows us to unit test this
+ auto_flush_delay = conf.delay,
+ host = conf.host,
+ port = conf.port,
+ path = conf.path,
+ max_queue_size = conf.sending_queue_size * MB,
entries = {}, -- current buffer as an array of strings (serialized ALFs)
- entries_size = 0, -- current buffer size in bytes
+ entries_size = 0, -- current entries size in bytes (total)
sending_queue = {}, -- array of constructed payloads (batches of ALFs) to be sent
+ sending_queue_size = 0, -- current sending queue size in bytes
lock_sending = false, -- lock if currently sending its data
lock_delayed = false, -- lock if a delayed timer is already set for this buffer
- latest_call = nil -- date at which a request was last made to this API (for delayed timer)
+ latest_call = nil -- date at which a request was last made to this API (for the delayed timer to know if it needs to trigger)
}
return setmetatable(buffer, buffer_mt)
end
@@ -93,18 +113,17 @@ function buffer_mt:add_alf(alf)
str = str:gsub("\""..EMPTY_ARRAY_PLACEHOLDER.."\"", ""):gsub("\\/", "/")

-- Check what would be the size of the buffer
- local next_n_entries = #self.entries + 1
+ local next_n_entries = table_getn(self.entries) + 1
local alf_size = string_len(str)

-- If the alf_size exceeds the payload limit by itself, we have a big problem
- if alf_size > self.MAX_SIZE then
- ngx_log(ngx_log_ERR, string_format("[mashape-analytics] ALF size exceeded the maximum size (%sMB) accepted by the socket server. Dropping it.",
- self.MAX_SIZE / MB))
+ if alf_size > self.max_entries_size then
+ ngx_log(ngx_log_ERR, string_format("[mashape-analytics] ALF size exceeded the maximum size (%sMB) accepted by the socket server. Dropping it.", self.max_entries_size / MB))
return
end

-- If size or entries exceed the max limits
- local full = next_n_entries > self.MAX_ENTRIES or (self:get_size() + alf_size) > self.MAX_SIZE
+ local full = next_n_entries > self.max_entries or (self:get_size() + alf_size) > self.max_entries_size
if full then
self:flush()
-- Batch size reached, let's send the data
@@ -116,7 +135,7 @@ function buffer_mt:add_alf(alf)
-- Batch size not yet reached.
-- Set a timer sending the data only in case nothing happens for awhile or if the batch_size is taking
-- too much time to reach the limit and trigger the flush.
- local ok, err = ngx_timer_at(self.AUTO_FLUSH_DELAY, delayed_send_handler, self)
+ local ok, err = ngx_timer_at(self.auto_flush_delay, delayed_send_handler, self)
if ok then
self.lock_delayed = true -- Make sure only one delayed timer is ever pending for a given buffer
else
@@ -137,28 +156,40 @@ end

-- Get the size of the current buffer if it was to be converted to a JSON payload
function buffer_mt:get_size()
- local commas = string_rep(",", #self.entries - 1)
+ local commas = string_rep(",", table_getn(self.entries) - 1)
return string_len(commas.."[]") + self.entries_size
end

-- Flush the buffer
- -- 1. Convert the content of it into a JSON payload
- -- 2. Add the payload to the queue of payloads to be sent
- -- 3. Empty the buffer and reset the current buffer size
+ -- 1. Make sure the current sending queue doesn't exceed its size limit
+ -- 1b. Convert its content into a JSON payload
+ -- 1c. Add the payload to the queue of payloads to be sent
+ -- 2. Empty the buffer and reset the current buffer size
function buffer_mt:flush()
- local payload = self:payload_string()
- table_insert(self.sending_queue, {
- payload = payload,
- n_entries = #self.entries,
- size = self:get_size()
- })
+ local size = self:get_size()
+
+ -- Make sure we don't cross the size limit. The only exception is if the sending_queue is empty,
+ -- it could happen that the configuration is erroneous (ex: batch_size too big for a too small sending_queue_size)
+ if self.sending_queue_size + size <= self.max_queue_size or table_getn(self.sending_queue) < 1 then
+ self.sending_queue_size = self.sending_queue_size + size
+
+ table_insert(self.sending_queue, {
+ payload = self:payload_string(),
+ n_entries = table_getn(self.entries),
+ size = size
+ })
+ else
+ ngx_log(ngx.NOTICE, string_format("[mashape-analytics] buffer reached its maximum sending queue size. (%s) ALFs, (%s) bytes dropped.", table_getn(self.entries), size))
+ end

self.entries = {}
self.entries_size = 0
end

-- Send the oldest payload (batch of ALFs) from the queue to the socket server.
-- The payload will be removed if the socket server acknowledged the batch.
+ -- If the queue still has payloads to be sent, keep on sending them.
+ -- If the connection to the socket server fails, use the retry policy.
function buffer_mt.send_batch(premature, self)
if self.lock_sending then return end
self.lock_sending = true -- simple lock
@@ -170,51 +201,80 @@ function buffer_mt.send_batch(premature, self)
-- Let's send the oldest batch in our queue
local batch_to_send = table_remove(self.sending_queue, 1)

- local drop_batch = false
+ local retry
local client = http:new()
client:set_timeout(50000) -- 5 sec

- local ok, err = client:connect(self.HOST, self.PORT)
+ local ok, err = client:connect(self.host, self.port)
if ok then
- local res, err = client:request({path = self.PATH, body = batch_to_send.payload})
+ local res, err = client:request({path = self.path, body = batch_to_send.payload})
if not res then
- ngx_log(ngx_log_ERR, string_format("[mashape-analytics] failed to send batch (%s ALFs %s bytes): %s",
- batch_to_send.n_entries, batch_to_send.size, err))
- elseif res.status == 200 then
- drop_batch = true
- ngx_log(ngx.DEBUG, string_format("[mashape-analytics] successfully saved the batch. (%s)", res.body))
- elseif res.status == 400 then
- ngx_log(ngx_log_ERR, string_format("[mashape-analytics] socket server refused the batch (%s ALFs %s bytes). Dropping batch. Status: (%s) Error: (%s)",
- batch_to_send.n_entries, batch_to_send.size, res.status, res.body))
- drop_batch = true
+ retry = true
+ ngx_log(ngx_log_ERR, string_format("[mashape-analytics] failed to send batch (%s ALFs %s bytes): %s", batch_to_send.n_entries, batch_to_send.size, err))
else
- ngx_log(ngx_log_ERR, string_format("[mashape-analytics] socket server could not save the batch (%s ALFs %s bytes). Status: (%s) Error: (%s)",
- batch_to_send.n_entries, batch_to_send.size, res.status, res.body))
+ res.body = string_sub(res.body, 1, -2) -- remove trailing line jump for logs
+ if res.status == 200 then
+ ngx_log(ngx.NOTICE, string_format("[mashape-analytics] successfully saved the batch. (%s)", res.body))
+ elseif res.status == 207 then
+ retry = true
+ ngx_log(ngx_log_ERR, string_format("[mashape-analytics] socket server could not save all ALFs from the batch. (%s)", res.body))
+ elseif res.status == 400 then
+ ngx_log(ngx_log_ERR, string_format("[mashape-analytics] socket server refused the batch (%s ALFs %s bytes). Dropping batch. Status: (%s) Error: (%s)", batch_to_send.n_entries, batch_to_send.size, res.status, res.body))
+ else
+ retry = true
+ ngx_log(ngx_log_ERR, string_format("[mashape-analytics] socket server could not save the batch (%s ALFs %s bytes). Status: (%s) Error: (%s)", batch_to_send.n_entries, batch_to_send.size, res.status, res.body))
+ end
end

-- close connection, or put it into the connection pool
if not res or res.headers["connection"] == "close" then
ok, err = client:close()
if not ok then
ngx_log(ngx_log_ERR, "[mashape-analytics] failed to close socket: "..err)
ngx_log(ngx_log_ERR, "[mashape-analytics] failed to close socket: ", err)
end
else
client:set_keepalive()
end
else
ngx_log(ngx_log_ERR, "[mashape-analytics] failed to connect to the socket server: "..err)
retry = true
ngx_log(ngx_log_ERR, "[mashape-analytics] failed to connect to the socket server: ", err)
end

- if not drop_batch then
- -- If the batch is not dropped, then add it back to the end of the queue and it will be tried again later
- table_insert(self.sending_queue, batch_to_send)
+ local next_batch_delay = 0 -- default delay for the next batch sending
+
+ if retry then
+ -- could not reach the socket server, need to retry
+ table_insert(self.sending_queue, 1, batch_to_send)
+
+ local ok, err = dict:add(RETRY_INDEX_KEY, 0)
+ if not ok and err ~= "exists" then
+ ngx_log(ngx_log_ERR, "[mashape-analytics] cannot prepare retry policy: ", err)
+ end
+
+ local index, err = dict:incr(RETRY_INDEX_KEY, 1)
+ if err then
+ ngx_log(ngx_log_ERR, "[mashape-analytics] cannot increment retry policy index: ", err)
+ elseif index then
+ next_batch_delay = math_min(math_pow(index, 2) * RETRY_BASE_DELAY, RETRY_MAX_DELAY)
+ end
+
+ ngx_log(ngx.NOTICE, string_format("[mashape-analytics] batch was queued for retry. Next retry in: %s seconds", next_batch_delay))
+ else
+ -- batch acknowledged by the socket server
+ self.sending_queue_size = self.sending_queue_size - batch_to_send.size
+
+ -- reset retry policy
+ local ok, err = dict:set(RETRY_INDEX_KEY, 0)
+ if not ok then
+ ngx_log(ngx_log_ERR, "[mashape-analytics] cannot reset retry policy index: ", err)
+ end
+ end
end

self.lock_sending = false

-- Keep sendind data if the queue is not yet emptied
- if #self.sending_queue > 0 then
- local ok, err = ngx_timer_at(2, self.send_batch, self)
+ if table_getn(self.sending_queue) > 0 then
+ local ok, err = ngx_timer_at(next_batch_delay, self.send_batch, self)
if not ok then
ngx_log(ngx_log_ERR, "[mashape-analytics] failed to create batch retry timer: ", err)
end
17 changes: 9 additions & 8 deletions kong/plugins/mashape-analytics/schema.lua
@@ -1,12 +1,13 @@
return {
fields = {
- service_token = { type = "string", required = true },
- environment = { type = "string" },
- batch_size = { type = "number", default = 100 },
- log_body = { type = "boolean", default = false },
- delay = { type = "number", default = 2 },
- host = { required = true, type = "string", default = "socket.analytics.mashape.com" },
- port = { required = true, type = "number", default = 80 },
- path = { required = true, type = "string", default = "/1.0.0/batch" }
+ service_token = {type = "string", required = true},
+ environment = {type = "string"},
+ batch_size = {type = "number", default = 100},
+ log_body = {type = "boolean", default = false},
+ delay = {type = "number", default = 2},
+ sending_queue_size = {type = "number", default = 10}, -- in mb
+ host = {required = true, type = "string", default = "socket.analytics.mashape.com"},
+ port = {required = true, type = "number", default = 80},
+ path = {required = true, type = "string", default = "/1.0.0/batch"}
}
}
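
For reference, a hedged sketch of how these defaults feed the buffer (the
`conf` table below mirrors the schema; the byte conversion matches
`buffer_mt.new` in buffer.lua):

```lua
-- Defaults as declared in schema.lua; sending_queue_size is expressed in MB.
local conf = {
  batch_size = 100,        -- max number of ALFs per batch
  delay = 2,               -- auto-flush delay, in seconds
  sending_queue_size = 10, -- max size of the pending sending_queue, in MB
  host = "socket.analytics.mashape.com",
  port = 80,
  path = "/1.0.0/batch"
}

local MB = 1024 * 1024
local max_queue_size = conf.sending_queue_size * MB
assert(max_queue_size == 10485760) -- 10MB in bytes
```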