Merge pull request #786 from 3scale/use-timertasks-in-batcher-policy
Use TimerTask in the 3scale batcher policy
davidor authored Jul 2, 2018
2 parents 0fa2ed4 + b04d254 commit a95ebdd
Showing 8 changed files with 240 additions and 189 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
@@ -10,7 +10,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/).

- OpenTracing support [PR #669](https://github.com/3scale/apicast/pull/669)
- Generate new policy scaffold from the CLI [PR #682](https://github.com/3scale/apicast/pull/682)
- 3scale batcher policy [PR #685](https://github.com/3scale/apicast/pull/685), [PR #710](https://github.com/3scale/apicast/pull/710), [PR #757](https://github.com/3scale/apicast/pull/757)
- 3scale batcher policy [PR #685](https://github.com/3scale/apicast/pull/685), [PR #710](https://github.com/3scale/apicast/pull/710), [PR #757](https://github.com/3scale/apicast/pull/757), [PR #786](https://github.com/3scale/apicast/pull/786)
- Liquid templating support in the headers policy configuration [PR #716](https://github.com/3scale/apicast/pull/716)
- Ability to modify query parameters in the URL rewriting policy [PR #724](https://github.com/3scale/apicast/pull/724)
- 3scale referrer policy [PR #728](https://github.com/3scale/apicast/pull/728), [PR #777](https://github.com/3scale/apicast/pull/777)
52 changes: 36 additions & 16 deletions gateway/src/apicast/policy/3scale_batcher/3scale_batcher.lua
@@ -8,16 +8,27 @@ local reporter = require('reporter')
local Transaction = require('transaction')
local http_ng_resty = require('resty.http_ng.backend.resty')
local semaphore = require('ngx.semaphore')
local TimerTask = require('resty.concurrent.timer_task')

local ipairs = ipairs

local default_auths_ttl = 10
local default_batch_reports_seconds = 10

local _M = policy.new('Caching policy')
local _M, mt = policy.new('Caching policy')

local new = _M.new

mt.__gc = function(self)
-- Instances of this policy are garbage-collected when the config is
-- reloaded. Before that happens, we need to ensure that the TimerTask runs
-- one last time so that we do not leave any pending reports behind.

if self.timer_task then
self.timer_task:cancel(true)
end
end

function _M.new(config)
local self = new(config)

@@ -30,9 +41,7 @@ function _M.new(config)
self.batch_reports_seconds = config.batch_report_seconds or
default_batch_reports_seconds

self.report_timer_on = false

-- Semaphore used to ensure that only one timer is started per worker.
-- Semaphore used to ensure that only one TimerTask is started per worker.
local semaphore_report_timer, err = semaphore.new(1)
if not semaphore_report_timer then
ngx.log(ngx.ERR, "Create semaphore failed: ", err)
@@ -70,7 +79,7 @@ local function set_flags_to_avoid_auths_in_apicast(context)
context.skip_apicast_post_action = true
end

local function report(_, service_id, backend, reports_batcher)
local function report(service_id, backend, reports_batcher)
local reports = reports_batcher:get_all(service_id)

if reports then
@@ -81,22 +90,33 @@ local function report(_, service_id, backend, reports_batcher)
reporter.report(reports, service_id, backend, reports_batcher)
end

-- This starts a timer on each worker.
-- Starting a timer on each worker means that there will be more calls to
local function timer_task(self, service_id, backend)
local task = report

local task_options = {
args = { service_id, backend, self.reports_batcher },
interval = self.batch_reports_seconds
}

return TimerTask.new(task, task_options)
end

-- This starts a TimerTask on each worker.
-- Starting a TimerTask on each worker means that there will be more calls to
-- 3scale backend, and the config param 'batch_report_seconds' becomes
-- more confusing because the reporting frequency will be affected by the
-- number of APIcast workers.
-- If we started a timer just on one of the workers, it could die, and then,
-- If we started a TimerTask just on one of the workers, it could die, and then,
-- there would not be any reporting.
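-- (Illustrative arithmetic, not part of this commit: with N workers and
-- batch_report_seconds = S, each worker flushes its own batch once every S
-- seconds, so backend receives about N report calls per S-second window.
-- For example, 4 workers with S = 10 average one call every 2.5 seconds.)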
local function ensure_report_timer_on(self, service_id, backend)
local check_timer = self.semaphore_report_timer:wait(0)
local function ensure_timer_task_created(self, service_id, backend)
local check_timer_task = self.semaphore_report_timer:wait(0)

if check_timer_task then
if not self.timer_task then
self.timer_task = timer_task(self, service_id, backend)

if check_timer then
if not self.report_timer_on then
ngx.timer.every(self.batch_reports_seconds, report,
service_id, backend, self.reports_batcher)
self.timer_task:execute()

self.report_timer_on = true
ngx.log(ngx.DEBUG, 'scheduled 3scale batcher report timer every ',
self.batch_reports_seconds, ' seconds')
end
@@ -166,7 +186,7 @@ function _M:access(context)
local credentials = context.credentials
local transaction = Transaction.new(service_id, credentials, usage)

ensure_report_timer_on(self, service_id, backend)
ensure_timer_task_created(self, service_id, backend)

local cached_auth = self.auths_cache:get(transaction)

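
Taken together, the changes to this file replace the raw ngx.timer.every call with a TimerTask owned by the policy instance. A minimal sketch of the resulting lifecycle, not part of the diff (flush_reports, backend, and batcher are hypothetical stand-ins, and an OpenResty environment with this repo on the package path is assumed):

local TimerTask = require('resty.concurrent.timer_task')

-- The task receives the same args on every run, as configured in the options.
local task = TimerTask.new(flush_reports, {
  args = { 's1', backend, batcher }, -- service ID, backend client, reports batcher
  interval = 10                      -- batch_report_seconds
})

task:execute(true)  -- run the task now and schedule the next run
-- ...later, when the config is reloaded and __gc fires:
task:cancel(true)   -- unregister, but guarantee one final run that flushes pending reports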
5 changes: 4 additions & 1 deletion gateway/src/apicast/policy/3scale_batcher/reporter.lua
@@ -1,5 +1,6 @@
local ReportsBatch = require('apicast.policy.3scale_batcher.reports_batch')
local Usage = require('apicast.usage')
local Transaction = require('apicast.policy.3scale_batcher.transaction')

local pairs = pairs

@@ -14,11 +15,13 @@ local function return_reports(service_id, batch, reports_batcher)
usage:add(metric, value)
end

reports_batcher:add(
local transaction = Transaction.new(
service_id,
{ [credentials_type] = credential },
usage
)

reports_batcher:add(transaction)
end
end

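
The reporter now hands reports back to the batcher as a single Transaction value instead of three positional arguments. A short sketch of the new call shape, not part of the diff (the service ID, credentials, and metric values are hypothetical, and reports_batcher stands for an already-constructed ReportsBatcher):

local Transaction = require('apicast.policy.3scale_batcher.transaction')
local Usage = require('apicast.usage')

local usage = Usage.new()
usage:add('hits', 1)

local transaction = Transaction.new('s1', { user_key = 'uk_123' }, usage)
reports_batcher:add(transaction)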
27 changes: 24 additions & 3 deletions gateway/src/resty/concurrent/timer_task.lua
@@ -9,6 +9,11 @@ local default_interval_seconds = 60

_M.active_tasks = {}

-- Tracks whether the next run should be the last one for a given task ID.
-- When a task is marked to run one last time, it will do so even if it has
-- been cancelled.
_M.last_one = {}

function _M.register_task(id)
_M.active_tasks[id] = true
end
@@ -58,13 +63,19 @@ end
local run_periodic, schedule_next, timer_execute

run_periodic = function(run_now, id, func, args, interval)
if not _M.task_is_active(id) then return end
if not _M.task_is_active(id) and not _M.last_one[id] then
return
end

if run_now then
func(unpack(args))
end

schedule_next(interval, id, func, args, interval)
if not _M.last_one[id] then
schedule_next(id, func, args, interval)
else
_M.last_one[id] = nil
end
end

-- Note: ngx.timer.at always sends "premature" as the first param.
@@ -89,7 +100,17 @@ function _M:execute(run_now)
run_periodic(run_now or false, self.id, self.task, self.args, self.interval)
end

function _M:cancel()
--- Cancel a task
-- @tparam[opt] boolean run_one_more True to ensure that the task will run one
-- more time before it is cancelled; false to just cancel the task. (Defaults
-- to false.)
function _M:cancel(run_one_more)
if run_one_more then
_M.last_one[self.id] = true
end

-- We can cancel the task in all cases because the flag to run for the last
-- time has precedence.
_M.unregister_task(self.id)
end

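
The behavioral core of this file's change: a cancelled task can still run exactly once more, because the last_one flag takes precedence over deregistration. A sketch of those semantics, not part of the diff (an OpenResty environment is assumed):

local TimerTask = require('resty.concurrent.timer_task')

local t = TimerTask.new(function() ngx.log(ngx.INFO, 'flush') end, { interval = 10 })

t:execute(true) -- runs the function now and schedules the next run
t:cancel(true)  -- unregisters the task and sets TimerTask.last_one[t.id]

-- When the already-scheduled timer fires, run_periodic sees that the task is
-- inactive but marked in last_one, so it runs the function one final time,
-- clears the mark, and does not schedule another run.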
9 changes: 9 additions & 0 deletions spec/policy/3scale_batcher/3scale_batcher_spec.lua
@@ -4,8 +4,13 @@ local Transaction = require('apicast.policy.3scale_batcher.transaction')
local Usage = require('apicast.usage')
local configuration = require('apicast.configuration')
local lrucache = require('resty.lrucache')
local TimerTask = require('resty.concurrent.timer_task')

describe('3scale batcher policy', function()
before_each(function()
TimerTask.active_tasks = {}
end)

describe('.new', function()
it('allows to configure the batching period', function()
local test_batching_period = 3
@@ -42,6 +47,10 @@ describe('3scale batcher policy', function()
batcher_policy.auths_cache = AuthsCache.new(lrucache.new(10), 10)
stub(batcher_policy.reports_batcher, 'add')

-- If a report job executes during the test, the stubbed batcher returns
-- no pending reports by default.
stub(batcher_policy.reports_batcher, 'get_all').returns({})

stub(batcher_policy, 'backend_downtime_cache')

context = {
63 changes: 32 additions & 31 deletions spec/policy/3scale_batcher/reporter_spec.lua
@@ -1,9 +1,33 @@
local reporter = require('apicast.policy.3scale_batcher.reporter')
local keys_helper = require('apicast.policy.3scale_batcher.keys_helper')
local ipairs = ipairs
local ReportsBatcher = require('apicast.policy.3scale_batcher.reports_batcher')
local lrucache = require('resty.lrucache')
local resty_lock = require 'resty.lock'
local pairs = pairs
local insert = table.insert

-- ReportsBatcher uses a shdict. For these tests we can use an lrucache
-- instead, but we need to define the 2 methods it is missing (safe_add and
-- get_keys).
local function build_fake_shdict()
local fake_shdict = lrucache.new(100)

fake_shdict.safe_add = function(self, k, v)
local current = self:get(k) or 0
self:set(k, current + v)
end

fake_shdict.get_keys = function(self)
local res = {}

for k, _ in pairs(self.hasht) do
insert(res, k)
end

return res
end

return fake_shdict
end

describe('reporter', function()
local test_service_id = 's1'

@@ -13,37 +37,14 @@ describe('reporter', function()
before_each(function()
test_backend_client = { report = function() return { ok = false } end }
spy_report_backend_client = spy.on(test_backend_client, 'report')

-- Mock the lock so it can always be acquired and released without waiting.
stub(resty_lock, 'new').returns(
{ lock = function() return 0 end, unlock = function() return 1 end }
)
end)

-- Testing using the real ReportsBatcher is a bit difficult because it uses
-- shared dicts and locks. To simplify we define this table with the same
-- interface.
local reports_batcher = {
reports = {},

add = function(self, service_id, credentials, usage)
local deltas = usage.deltas
for _, metric in ipairs(usage.metrics) do
local key = keys_helper.key_for_batched_report(service_id, credentials, metric)
self.reports[key] = (self.reports[key] or 0) + deltas[metric]
end
end,

get_all = function(self, service_id)
local cached_reports = {}

for key, value in pairs(self.reports) do
local report = keys_helper.report_from_key_batched_report(key, value)

if value and value > 0 and report.service_id == service_id then
insert(cached_reports, report)
self.reports[key] = nil
end
end

return cached_reports
end
}
local reports_batcher = ReportsBatcher.new(build_fake_shdict())

it('returns reports to the batcher when sending reports to backend fails', function()
local test_reports = {
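
The fake shdict above only has to provide the two methods ReportsBatcher calls beyond the lrucache API. A quick sanity check of that contract, not part of the diff (runnable under the same busted setup these specs already use):

local shdict = build_fake_shdict()

shdict:safe_add('hits', 3)
shdict:safe_add('hits', 2) -- this fake accumulates instead of failing on existing keys

assert.equals(5, shdict:get('hits'))
assert.same({ 'hits' }, shdict:get_keys())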
46 changes: 46 additions & 0 deletions spec/resty/concurrent/timer_task_spec.lua
@@ -6,6 +6,7 @@ describe('TimerTask', function()

before_each(function()
TimerTask.active_tasks = {}
TimerTask.last_one = {}
end)

after_each(function()
@@ -85,6 +86,14 @@

assert.is_false(TimerTask.task_is_active(task.id))
end)

it('marks the task to run for the last time when specified in the params', function()
local task = TimerTask.new(test_task)

task:cancel(true)

assert.is_true(TimerTask.last_one[task.id])
end)
end)

describe(':execute', function()
@@ -114,6 +123,12 @@
timer_task:execute(true)

assert.stub(ngx_timer_stub).was_called()

-- Can't check all the arguments of ngx.timer.at because it calls a
-- private function, but at least we can check the interval (first arg)
-- and that the second argument is a function.
assert.equals(interval, ngx_timer_stub.calls[1].vals[1])
assert.is_function(ngx_timer_stub.calls[1].vals[2])
end)
end)

@@ -157,6 +172,27 @@
assert.stub(ngx_timer_stub).was_called()
end)
end)

describe('when the task should run for the last time', function()
it('runs the task', function()
local timer_task = TimerTask.new(func, { args = args, interval = interval })
local func_spy = spy.on(timer_task, 'task')
timer_task:cancel(true)

timer_task:execute(true)

assert.spy(func_spy).was_called_with(unpack(args))
end)

it('does not schedule another task', function()
local timer_task = TimerTask.new(func, { args = args, interval = interval })
timer_task:cancel(true)

timer_task:execute(true)

assert.stub(ngx_timer_stub).was_not_called()
end)
end)
end)

it('cancels itself when it is garbage collected', function()
@@ -168,4 +204,14 @@

assert.is_false(TimerTask.task_is_active(id))
end)

it('does not ensure a last run when garbage collected', function()
local timer_task = TimerTask.new(test_task)
local id = timer_task.id

timer_task = nil
collectgarbage()

assert.is_falsy(TimerTask.last_one[id])
end)
end)