
Use TimerTask in the 3scale batcher policy #786

Merged: 8 commits, Jul 2, 2018
2 changes: 1 addition & 1 deletion CHANGELOG.md
@@ -10,7 +10,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/).

- OpenTracing support [PR #669](https://github.com/3scale/apicast/pull/669)
- Generate new policy scaffold from the CLI [PR #682](https://github.com/3scale/apicast/pull/682)
- 3scale batcher policy [PR #685](https://github.com/3scale/apicast/pull/685), [PR #710](https://github.com/3scale/apicast/pull/710), [PR #757](https://github.com/3scale/apicast/pull/757)
- 3scale batcher policy [PR #685](https://github.com/3scale/apicast/pull/685), [PR #710](https://github.com/3scale/apicast/pull/710), [PR #757](https://github.com/3scale/apicast/pull/757), [PR #786](https://github.com/3scale/apicast/pull/786)
- Liquid templating support in the headers policy configuration [PR #716](https://github.com/3scale/apicast/pull/716)
- Ability to modify query parameters in the URL rewriting policy [PR #724](https://github.com/3scale/apicast/pull/724)
- 3scale referrer policy [PR #728](https://github.com/3scale/apicast/pull/728), [PR #777](https://github.com/3scale/apicast/pull/777)
52 changes: 36 additions & 16 deletions gateway/src/apicast/policy/3scale_batcher/3scale_batcher.lua
@@ -8,16 +8,27 @@ local reporter = require('reporter')
local Transaction = require('transaction')
local http_ng_resty = require('resty.http_ng.backend.resty')
local semaphore = require('ngx.semaphore')
local TimerTask = require('resty.concurrent.timer_task')

local ipairs = ipairs

local default_auths_ttl = 10
local default_batch_reports_seconds = 10

local _M = policy.new('Caching policy')
local _M, mt = policy.new('Caching policy')

local new = _M.new

mt.__gc = function(self)
-- Instances of this policy are garbage-collected when the config is
-- reloaded. We need to ensure that the TimerTask instance schedules another
-- run before that so we do not leave any pending reports.

if self.timer_task then
self.timer_task:cancel(true)
end
end

function _M.new(config)
local self = new(config)

@@ -30,9 +41,7 @@ function _M.new(config)
self.batch_reports_seconds = config.batch_report_seconds or
default_batch_reports_seconds

self.report_timer_on = false

-- Semaphore used to ensure that only one timer is started per worker.
-- Semaphore used to ensure that only one TimerTask is started per worker.
local semaphore_report_timer, err = semaphore.new(1)
if not semaphore_report_timer then
ngx.log(ngx.ERR, "Create semaphore failed: ", err)
@@ -70,7 +79,7 @@ local function set_flags_to_avoid_auths_in_apicast(context)
context.skip_apicast_post_action = true
end

local function report(_, service_id, backend, reports_batcher)
local function report(service_id, backend, reports_batcher)
local reports = reports_batcher:get_all(service_id)

if reports then
@@ -81,22 +90,33 @@ local function report(_, service_id, backend, reports_batcher)
reporter.report(reports, service_id, backend, reports_batcher)
end

-- This starts a timer on each worker.
-- Starting a timer on each worker means that there will be more calls to
local function timer_task(self, service_id, backend)
local task = report

local task_options = {
args = { service_id, backend, self.reports_batcher },
interval = self.batch_reports_seconds
}

return TimerTask.new(task, task_options)
end

-- This starts a TimerTask on each worker.
-- Starting a TimerTask on each worker means that there will be more calls to
-- 3scale backend, and the config param 'batch_report_seconds' becomes
-- more confusing because the reporting frequency will be affected by the
-- number of APIcast workers.
-- If we started a timer just on one of the workers, it could die, and then,
-- If we started a TimerTask just on one of the workers, it could die, and then,
-- there would not be any reporting.
local function ensure_report_timer_on(self, service_id, backend)
local check_timer = self.semaphore_report_timer:wait(0)
local function ensure_timer_task_created(self, service_id, backend)
local check_timer_task = self.semaphore_report_timer:wait(0)
Contributor:
Can we move this to .new? This policy should not be initialized in the init phase, right?

Contributor (author):
We can't, because we don't have access to the service ID in .new().

Contributor:
Interesting. Maybe better to leave it for later, but I think we should think about changing the data structure to remove the service_id dependency. It should be as simple as creating the timer in new so that it gets reports from the reports batcher. There should be no need for service_id, because the reports batcher instance is specific to this policy and timer, so it will only ever contain reports from this service. I see the reports batcher actually uses that service_id to take a lock on shmem. Hopefully we can solve that.

Contributor (author):
You're right. That part can be simplified a lot.

Contributor:
Could we generate some UUID instead of using the service id? Then every policy would take care just of its own data.

Contributor (author):
This is related to an idea that we discussed some time ago.

Right now, pending reports are stored in a shared dictionary, and every instance of the policy creates a timer (per worker) to report all the pending reports for its service ID.

After introducing TimerTask I see more clearly that it might be better to adopt a different strategy. We could store the pending reports in a table of the instance, as we know that there will only be one instance of the policy for a given service ID per APIcast worker. Pros: no locks, simpler code. Cons: possibility of losing reports if a worker dies, which in practice I don't think will be an issue.

Contributor (author):
Let's leave this for a future PR. In this one, I'd like to focus on switching to TimerTask so we are able to cancel timers.

Contributor:
Yep, definitely 👍
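The per-instance batching idea discussed in the thread above could look roughly like the following. This is a hypothetical sketch, not part of this PR: InstanceBatcher is an invented name, and the transaction.usage.deltas shape is assumed from how the reporter iterates usage.

-- Hypothetical sketch (not part of this PR): keep pending reports in a plain
-- table owned by the policy instance, so no shdict, no resty.lock and no
-- service_id key are needed.
local InstanceBatcher = {}
InstanceBatcher.__index = InstanceBatcher

function InstanceBatcher.new()
  return setmetatable({ pending = {} }, InstanceBatcher)
end

-- Assumes the transaction exposes usage.deltas as a { metric = delta } table.
function InstanceBatcher:add(transaction)
  for metric, delta in pairs(transaction.usage.deltas) do
    self.pending[metric] = (self.pending[metric] or 0) + delta
  end
end

-- Returns the accumulated deltas and resets the batch.
function InstanceBatcher:get_all()
  local batch = self.pending
  self.pending = {}
  return batch
end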


if check_timer_task then
if not self.timer_task then
self.timer_task = timer_task(self, service_id, backend)

if check_timer then
if not self.report_timer_on then
ngx.timer.every(self.batch_reports_seconds, report,
service_id, backend, self.reports_batcher)
self.timer_task:execute()

self.report_timer_on = true
ngx.log(ngx.DEBUG, 'scheduled 3scale batcher report timer every ',
self.batch_reports_seconds, ' seconds')
end
@@ -166,7 +186,7 @@ function _M:access(context)
local credentials = context.credentials
local transaction = Transaction.new(service_id, credentials, usage)

ensure_report_timer_on(self, service_id, backend)
ensure_timer_task_created(self, service_id, backend)

local cached_auth = self.auths_cache:get(transaction)
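To summarize how the policy now drives reporting, the sketch below wires a report function into a TimerTask the same way the code above does. It is illustrative only: fake_batcher and fake_backend are placeholders, and it assumes an OpenResty worker context, since TimerTask schedules itself with ngx.timer.at.

local TimerTask = require('resty.concurrent.timer_task')

-- Placeholders standing in for the real reports batcher and backend client.
local fake_batcher = { get_all = function(_, _) return {} end }
local fake_backend = {}

-- Same shape as the policy's report() above: fetch pending reports and send them.
local function report(service_id, backend, reports_batcher)
  local reports = reports_batcher:get_all(service_id)
  if reports then
    -- the policy delegates to reporter.report(reports, service_id, backend, reports_batcher)
  end
end

local task = TimerTask.new(report, {
  args = { 'a_service_id', fake_backend, fake_batcher },
  interval = 10 -- the default batch_reports_seconds
})

task:execute()    -- schedules the periodic runs (run_now defaults to false)
-- On config reload, the __gc handler above calls:
task:cancel(true) -- run one final time so no pending reports are lost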

5 changes: 4 additions & 1 deletion gateway/src/apicast/policy/3scale_batcher/reporter.lua
@@ -1,5 +1,6 @@
local ReportsBatch = require('apicast.policy.3scale_batcher.reports_batch')
local Usage = require('apicast.usage')
local Transaction = require('apicast.policy.3scale_batcher.transaction')

local pairs = pairs

@@ -14,11 +15,13 @@ local function return_reports(service_id, batch, reports_batcher)
usage:add(metric, value)
end

reports_batcher:add(
local transaction = Transaction.new(
service_id,
{ [credentials_type] = credential },
usage
)

reports_batcher:add(transaction)
end
end
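For reference, the change above means return_reports now hands the batcher a single Transaction value instead of three separate arguments. A minimal sketch of the new call shape, with made-up credential values and assuming reports_batcher is an existing ReportsBatcher instance:

local Transaction = require('apicast.policy.3scale_batcher.transaction')
local Usage = require('apicast.usage')

local usage = Usage.new()
usage:add('hits', 1)

-- Before: reports_batcher:add(service_id, credentials, usage)
-- After: build the Transaction first and add it as a single argument.
local transaction = Transaction.new('a_service_id', { user_key = 'uk_value' }, usage)
reports_batcher:add(transaction)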

27 changes: 24 additions & 3 deletions gateway/src/resty/concurrent/timer_task.lua
@@ -9,6 +9,11 @@ local default_interval_seconds = 60

_M.active_tasks = {}

-- Whether a run should be the last one for a given ID.
-- When a task is marked to run one last time, it will do so even if it has
-- been cancelled.
_M.last_one = {}

function _M.register_task(id)
_M.active_tasks[id] = true
end
@@ -58,13 +63,19 @@
local run_periodic, schedule_next, timer_execute

run_periodic = function(run_now, id, func, args, interval)
if not _M.task_is_active(id) then return end
if not _M.task_is_active(id) and not _M.last_one[id] then
return
end

if run_now then
func(unpack(args))
end

schedule_next(interval, id, func, args, interval)
if not _M.last_one[id] then
schedule_next(id, func, args, interval)
else
_M.last_one[id] = nil
end
end

-- Note: ngx.timer.at always sends "premature" as the first param.
@@ -89,7 +100,17 @@ function _M:execute(run_now)
run_periodic(run_now or false, self.id, self.task, self.args, self.interval)
end

function _M:cancel()
--- Cancel a task
-- @tparam[opt] boolean run_one_more True to ensure that the task will run one
-- more time before it is cancelled. False to just cancel the task. (Defaults
-- to false)
function _M:cancel(run_one_more)
if run_one_more then
_M.last_one[self.id] = true
end

-- We can cancel the task in all cases because the flag to run for the last
-- time has precedence.
_M.unregister_task(self.id)
end
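A short sketch of the cancellation semantics added above, assuming the OpenResty/test environment used elsewhere in this PR: a plain cancel() unregisters the task, while cancel(true) also marks it in TimerTask.last_one, so the already-scheduled run still executes once and then stops rescheduling.

local TimerTask = require('resty.concurrent.timer_task')

local task = TimerTask.new(function() print('flush pending reports') end)
task:execute() -- schedules the periodic runs

-- task:cancel() would just unregister the id; the next scheduled run would
-- notice the task is inactive and return without doing anything.

task:cancel(true) -- unregisters the id and sets TimerTask.last_one[task.id]

-- When the pending timer fires, run_periodic() sees the last_one flag, runs
-- the task one final time, clears the flag and does not schedule another run.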

9 changes: 9 additions & 0 deletions spec/policy/3scale_batcher/3scale_batcher_spec.lua
@@ -4,8 +4,13 @@ local Transaction = require('apicast.policy.3scale_batcher.transaction')
local Usage = require('apicast.usage')
local configuration = require('apicast.configuration')
local lrucache = require('resty.lrucache')
local TimerTask = require('resty.concurrent.timer_task')

describe('3scale batcher policy', function()
before_each(function()
TimerTask.active_tasks = {}
end)

describe('.new', function()
it('allows to configure the batching period', function()
local test_batching_period = 3
@@ -42,6 +47,10 @@ describe('3scale batcher policy', function()
batcher_policy.auths_cache = AuthsCache.new(lrucache.new(10), 10)
stub(batcher_policy.reports_batcher, 'add')

-- In case a report job executes, stub the batcher so that, by default, it
-- returns no pending reports.
stub(batcher_policy.reports_batcher, 'get_all').returns({})

stub(batcher_policy, 'backend_downtime_cache')

context = {
63 changes: 32 additions & 31 deletions spec/policy/3scale_batcher/reporter_spec.lua
@@ -1,9 +1,33 @@
local reporter = require('apicast.policy.3scale_batcher.reporter')
local keys_helper = require('apicast.policy.3scale_batcher.keys_helper')
local ipairs = ipairs
local ReportsBatcher = require('apicast.policy.3scale_batcher.reports_batcher')
local lrucache = require('resty.lrucache')
local resty_lock = require 'resty.lock'
local pairs = pairs
local insert = table.insert

-- ReportsBatcher uses a shdict. For the test we can use an lrucache instead,
-- but we need to define the 2 missing methods (safe_add and get_keys).
local function build_fake_shdict()
local fake_shdict = lrucache.new(100)

fake_shdict.safe_add = function(self, k, v)
local current = self:get(k) or 0
self:set(k, current + v)
end

fake_shdict.get_keys = function(self)
local res = {}

for k, _ in pairs(self.hasht) do
insert(res, k)
end

return res
end

return fake_shdict
end

describe('reporter', function()
local test_service_id = 's1'

@@ -13,37 +37,14 @@ describe('reporter', function()
before_each(function()
test_backend_client = { report = function() return { ok = false } end }
spy_report_backend_client = spy.on(test_backend_client, 'report')

-- Mock the lock so it can always be acquired and returned without waiting.
stub(resty_lock, 'new').returns(
{ lock = function() return 0 end, unlock = function() return 1 end }
)
end)

-- Testing using the real ReportsBatcher is a bit difficult because it uses
-- shared dicts and locks. To simplify we define this table with the same
-- interface.
local reports_batcher = {
reports = {},

add = function(self, service_id, credentials, usage)
local deltas = usage.deltas
for _, metric in ipairs(usage.metrics) do
local key = keys_helper.key_for_batched_report(service_id, credentials, metric)
self.reports[key] = (self.reports[key] or 0) + deltas[metric]
end
end,

get_all = function(self, service_id)
local cached_reports = {}

for key, value in pairs(self.reports) do
local report = keys_helper.report_from_key_batched_report(key, value)

if value and value > 0 and report.service_id == service_id then
insert(cached_reports, report)
self.reports[key] = nil
end
end

return cached_reports
end
}
local reports_batcher = ReportsBatcher.new(build_fake_shdict())

it('returns reports to the batcher when sending reports to backend fails', function()
local test_reports = {
46 changes: 46 additions & 0 deletions spec/resty/concurrent/timer_task_spec.lua
@@ -6,6 +6,7 @@ describe('TimerTask', function()

before_each(function()
TimerTask.active_tasks = {}
TimerTask.last_one = {}
end)

after_each(function()
@@ -85,6 +86,14 @@

assert.is_false(TimerTask.task_is_active(task.id))
end)

it('marks the task to run for the last time when specified in the params', function()
local task = TimerTask.new(test_task)

task:cancel(true)

assert.is_true(TimerTask.last_one[task.id])
end)
end)

describe(':execute', function()
@@ -114,6 +123,12 @@
timer_task:execute(true)

assert.stub(ngx_timer_stub).was_called()

-- We can't check all the arguments of ngx.timer.at because it calls a
-- private function, but at least we can check the interval (first arg)
-- and that the second argument is a function.
assert.equals(interval, ngx_timer_stub.calls[1].vals[1])
assert.is_function(ngx_timer_stub.calls[1].vals[2])
end)
end)

@@ -157,6 +172,27 @@
assert.stub(ngx_timer_stub).was_called()
end)
end)

describe('when the task should run for the last time', function()
it('runs the task', function()
local timer_task = TimerTask.new(func, { args = args, interval = interval })
local func_spy = spy.on(timer_task, 'task')
timer_task:cancel(true)

timer_task:execute(true)

assert.spy(func_spy).was_called_with(unpack(args))
end)

it('does not schedule another task', function()
local timer_task = TimerTask.new(func, { args = args, interval = interval })
timer_task:cancel(true)

timer_task:execute(true)

assert.stub(ngx_timer_stub).was_not_called()
end)
end)
end)

it('cancels itself when it is garbage collected', function()
@@ -168,4 +204,14 @@

assert.is_false(TimerTask.task_is_active(id))
end)

it('does not ensure a last run when garbage collected', function()
local timer_task = TimerTask.new(test_task)
local id = timer_task.id

timer_task = nil
collectgarbage()

assert.is_falsy(TimerTask.last_one[id])
end)
end)