Skip to content

Commit

Permalink
change: limit the maximum length of Lua code to 100. (#1525)
Browse files Browse the repository at this point in the history
  • Loading branch information
membphis authored Apr 29, 2020
1 parent d970dab commit a446cd0
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 23 deletions.
1 change: 1 addition & 0 deletions .luacheckrc
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
std = "ngx_lua"
unused_args = false
redefined = false
max_line_length = 100
53 changes: 30 additions & 23 deletions apisix/utils/batch-processor.lua
Original file line number Diff line number Diff line change
Expand Up @@ -42,64 +42,70 @@ local schema = {
}


local function schedule_func_exec(batch_processor, delay, batch)
local hdl, err = timer_at(delay, execute_func, batch_processor, batch)
local function schedule_func_exec(self, delay, batch)
local hdl, err = timer_at(delay, execute_func, self, batch)
if not hdl then
core.log.error("failed to create process timer: ", err)
return
end
end


function execute_func(premature, batch_processor, batch)
function execute_func(premature, self, batch)
if premature then
return
end

local ok, err = batch_processor.func(batch.entries, batch_processor.batch_max_size)
local ok, err = self.func(batch.entries, self.batch_max_size)
if not ok then
core.log.error("Batch Processor[", batch_processor.name, "] failed to process entries: ", err)
core.log.error("Batch Processor[", self.name,
"] failed to process entries: ", err)
batch.retry_count = batch.retry_count + 1
if batch.retry_count <= batch_processor.max_retry_count then
schedule_func_exec(batch_processor, batch_processor.retry_delay, batch)
if batch.retry_count <= self.max_retry_count then
schedule_func_exec(self, self.retry_delay,
batch)
else
core.log.error("Batch Processor[", batch_processor.name,"] exceeded ",
"the max_retry_count[", batch.retry_count,"] dropping the entries")
core.log.error("Batch Processor[", self.name,"] exceeded ",
"the max_retry_count[", batch.retry_count,
"] dropping the entries")
end
return
end

core.log.debug("Batch Processor[", batch_processor.name ,"] successfully processed the entries")
core.log.debug("Batch Processor[", self.name,
"] successfully processed the entries")
end


local function flush_buffer(premature, batch_processor)
local function flush_buffer(premature, self)
if premature then
return
end

if now() - batch_processor.last_entry_t >= batch_processor.inactive_timeout or
now() - batch_processor.first_entry_t >= batch_processor.buffer_duration then
core.log.debug("Batch Processor[", batch_processor.name ,"] buffer ",
if now() - self.last_entry_t >= self.inactive_timeout or
now() - self.first_entry_t >= self.buffer_duration
then
core.log.debug("Batch Processor[", self.name ,"] buffer ",
"duration exceeded, activating buffer flush")
batch_processor:process_buffer()
batch_processor.is_timer_running = false
self:process_buffer()
self.is_timer_running = false
return
end

-- buffer duration did not exceed or the buffer is active, extending the timer
core.log.debug("Batch Processor[", batch_processor.name ,"] extending buffer timer")
create_buffer_timer(batch_processor)
-- buffer duration did not exceed or the buffer is active,
-- extending the timer
core.log.debug("Batch Processor[", self.name ,"] extending buffer timer")
create_buffer_timer(self)
end


function create_buffer_timer(batch_processor)
local hdl, err = timer_at(batch_processor.inactive_timeout, flush_buffer, batch_processor)
function create_buffer_timer(self)
local hdl, err = timer_at(self.inactive_timeout, flush_buffer, self)
if not hdl then
core.log.error("failed to create buffer timer: ", err)
return
end
batch_processor.is_timer_running = true
self.is_timer_running = true
end


Expand Down Expand Up @@ -149,7 +155,8 @@ function Batch_Processor:push(entry)
self.last_entry_t = now()

if self.batch_max_size <= #entries then
core.log.debug("Batch Processor[", self.name ,"] batch max size has exceeded")
core.log.debug("Batch Processor[", self.name ,
"] batch max size has exceeded")
self:process_buffer()
end

Expand Down

0 comments on commit a446cd0

Please sign in to comment.