# 3scale batcher policy #685
```
@@ -14,6 +14,7 @@ local concat = table.concat
local insert = table.insert
local len = string.len
local format = string.format
local pairs = pairs

local http_ng = require('resty.http_ng')
local user_agent = require('apicast.user_agent')
```
```
@@ -120,6 +121,8 @@ local function auth_path(using_oauth)
    '/transactions/authorize.xml'
end

local report_path = '/transactions.xml'

local function create_token_path(service_id)
  return format('/services/%s/oauth_access_tokens.xml', service_id)
end
```
```
@@ -142,6 +145,32 @@ local function authorize_options(using_oauth)
  return { headers = headers }
end

local function add_transaction(transactions, index, cred_type, cred, reports)
  local index_with_cred = format('transactions[%s][%s]', index, cred_type)
  transactions[index_with_cred] = cred

  for metric, value in pairs(reports) do
    local index_with_metric = format('transactions[%s][usage][%s]', index, metric)
    transactions[index_with_metric] = value
  end
end

local function format_transactions(reports_batch)
  local res = {}

  -- Note: A service only supports one kind of credentials
  local credentials_type = reports_batch.credentials_type
  local reports = reports_batch.reports

  local transaction_index = 0
  for credential, metrics in pairs(reports) do
    add_transaction(res, transaction_index, credentials_type, credential, metrics)
    transaction_index = transaction_index + 1
  end

  return res
end

--- Call authrep (oauth_authrep) on backend.
-- @tparam ?{table,...} query list of query parameters
-- @treturn http_ng.response http response
```

> **Review comment** (on `format_transactions`): I'm not sure this method should live in the backend client. IMO this client should be unaware of any other objects and just work with plain tables (and possibly a `__tostring` metamethod on them).
>
> edit: or some API exposed by this backend client returning tables that properly serialize.
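For illustration, here is the flattened format this produces. The batch below is hypothetical (the credential name and metric values are made up); the field names follow the `reports_batch` shape read above.

```lua
-- Hypothetical batch: a service using 'user_key' credentials, with the
-- accumulated usage for one credential.
local batch = {
  credentials_type = 'user_key',
  reports = {
    ['some-user-key'] = { hits = 10 },
  },
}

-- format_transactions(batch) returns a flat table ready to be posted to
-- report_path ('/transactions.xml'):
-- {
--   ['transactions[0][user_key]'] = 'some-user-key',
--   ['transactions[0][usage][hits]'] = 10,
-- }
```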
```
@@ -168,6 +197,16 @@ function _M:authorize(...)
  return call_backend_transaction(self, auth_uri, authorize_options(using_oauth), ...)
end

function _M:report(reports_batch)
  local http_client = self.http_client

  local report_uri = build_url(self, report_path)
  local report_body = format_transactions(reports_batch)
  local res = http_client.post(report_uri, report_body)

  return res
end

--- Calls backend to create an oauth token.
-- @tparam ?{table, ...} list of query params (might include the token, ttl,
-- app_id, and user_id)
```
The next file is new: the `apicast.errors` module, which the batcher policy below requires.

```
@@ -0,0 +1,47 @@
local _M = { }

function _M.no_credentials(service)
  ngx.log(ngx.INFO, 'no credentials provided for service ', service.id)
  ngx.var.cached_key = nil
  ngx.status = service.auth_missing_status
  ngx.header.content_type = service.auth_missing_headers
  ngx.print(service.error_auth_missing)
  return ngx.exit(ngx.HTTP_OK)
end

function _M.authorization_failed(service)
  ngx.log(ngx.INFO, 'authorization failed for service ', service.id)
  ngx.var.cached_key = nil
  ngx.status = service.auth_failed_status
  ngx.header.content_type = service.auth_failed_headers
  ngx.print(service.error_auth_failed)
  return ngx.exit(ngx.HTTP_OK)
end

function _M.limits_exceeded(service)
  ngx.log(ngx.INFO, 'limits exceeded for service ', service.id)
  ngx.var.cached_key = nil
  ngx.status = service.limits_exceeded_status
  ngx.header.content_type = service.limits_exceeded_headers
  ngx.print(service.error_limits_exceeded)
  return ngx.exit(ngx.HTTP_OK)
end

function _M.no_match(service)
  ngx.header.x_3scale_matched_rules = ''
  ngx.log(ngx.INFO, 'no rules matched for service ', service.id)
  ngx.var.cached_key = nil
  ngx.status = service.no_match_status
  ngx.header.content_type = service.no_match_headers
  ngx.print(service.error_no_match)
  return ngx.exit(ngx.HTTP_OK)
end

function _M.service_not_found(host)
  ngx.status = 404
  ngx.print('')
  ngx.log(ngx.WARN, 'could not find service for host: ', host or ngx.var.host)
  return ngx.exit(ngx.status)
end

return _M
```
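For context, a hypothetical `service` object showing the fields `no_credentials` reads; the concrete values below are illustrative, not defaults taken from this PR.

```lua
-- Illustrative only: these are the fields no_credentials() touches.
local service = {
  id = '42',
  auth_missing_status = 401,
  auth_missing_headers = 'text/plain; charset=utf-8',
  error_auth_missing = 'Authentication parameters missing',
}

-- errors.no_credentials(service) logs the failure, clears the cached key,
-- sets the configured status and content type, prints the error body, and
-- finishes the request with ngx.exit(ngx.HTTP_OK).
```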
The policy itself, also a new file:

```
@@ -0,0 +1,150 @@
local backend_client = require('apicast.backend_client')
local AuthsCache = require('auths_cache')
local ReportsBatcher = require('reports_batcher')
local policy = require('apicast.policy')
local errors = require('apicast.errors')
local reporter = require('reporter')
local http_ng_resty = require('resty.http_ng.backend.resty')
local semaphore = require('ngx.semaphore')

local ipairs = ipairs

local default_auths_ttl = 10
local default_batch_reports_seconds = 10

local _M = policy.new('3scale batcher policy')

local new = _M.new

function _M.new(config)
  local self = new(config)

  local auths_ttl = config.auths_ttl or default_auths_ttl
  self.auths_cache = AuthsCache.new(ngx.shared.cached_auths, auths_ttl)

  self.reports_batcher = ReportsBatcher.new(
    ngx.shared.batched_reports, 'batched_reports_locks')

  self.batch_reports_seconds = config.batch_report_seconds or
                               default_batch_reports_seconds

  self.report_timer_on = false

  -- Semaphore used to ensure that only one timer is started per worker.
  local semaphore_report_timer, err = semaphore.new(1)
  if not semaphore_report_timer then
    ngx.log(ngx.ERR, "Create semaphore failed: ", err)
  end
  self.semaphore_report_timer = semaphore_report_timer

  return self
end

-- TODO: More policies are using this method. Move it to backend_client to
-- avoid duplicating code.
-- Converts a usage to the format expected by the 3scale backend client.
local function format_usage(usage)
  local res = {}

  local usage_metrics = usage.metrics
  local usage_deltas = usage.deltas

  for _, metric in ipairs(usage_metrics) do
    local delta = usage_deltas[metric]
    res['usage[' .. metric .. ']'] = delta
  end

  return res
end
```

> **Review comment** (on the `res['usage[' .. metric .. ']'] = delta` line): No need to change this now, but this should also use …
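To illustrate the conversion, a hypothetical `usage` object (the function only reads its `metrics` and `deltas` fields):

```lua
-- Illustrative only.
local usage = {
  metrics = { 'hits' },
  deltas = { hits = 3 },
}

-- format_usage(usage) --> { ['usage[hits]'] = 3 }
```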
```
local function set_flags_to_avoid_auths_in_apicast(context)
  context.skip_apicast_access = true
  context.skip_apicast_post_action = true
end

local function report(_, service_id, backend, reports_batcher)
  local reports = reports_batcher:get_all(service_id)

  -- TODO: verify if we should limit the number of reports sent in a single req
  reporter.report(reports, service_id, backend, reports_batcher)
end

-- This starts a timer on each worker.
-- Starting a timer on each worker means that there will be more calls to
-- 3scale backend, and the config param 'batch_report_seconds' becomes
-- more confusing because the reporting frequency will be affected by the
-- number of APIcast workers.
-- If we started a timer just on one of the workers, it could die, and then,
-- there would not be any reporting.
local function ensure_report_timer_on(self, service_id, backend)
  local check_timer = self.semaphore_report_timer:wait(0)

  if check_timer then
    if not self.report_timer_on then
      ngx.timer.every(self.batch_reports_seconds, report,
                      service_id, backend, self.reports_batcher)

      self.report_timer_on = true
    end

    self.semaphore_report_timer:post()
  end
end
```
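The `wait(0)`/`post()` pair acts as a non-blocking try-lock: only one light thread per worker enters the guarded section at a time. A minimal standalone sketch of the same pattern, independent of this policy:

```lua
local semaphore = require('ngx.semaphore')

-- One permit makes the semaphore behave like a mutex with try-lock semantics.
local lock = semaphore.new(1)

local function run_guarded(fn)
  -- wait(0) returns immediately: true if the permit was acquired.
  local acquired = lock:wait(0)
  if not acquired then return end

  fn()

  -- Release the permit for the next caller.
  lock:post()
end
```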
```
local function rejection_reason_from_headers(response_headers)
  return response_headers and response_headers['3scale-rejection-reason']
end

local function error(service, rejection_reason)
  if rejection_reason == 'limits_exceeded' then
    return errors.limits_exceeded(service)
  else
    return errors.authorization_failed(service)
  end
end

-- Note: when an entry in the cache expires, there might be several requests
-- with those credentials and all of them will call auth() on backend with the
-- same parameters until the auth status is cached again. In the future, we
-- might want to introduce a mechanism to avoid this and reduce the number of
-- calls to backend.
function _M:access(context)
  local backend = backend_client:new(context.service, http_ng_resty)
  local usage = context.usage
  local service = context.service
  local service_id = service.id
  local credentials = context.credentials

  ensure_report_timer_on(self, service_id, backend)

  local cached_auth = self.auths_cache:get(service_id, credentials, usage)

  if not cached_auth then
    local formatted_usage = format_usage(usage)
    local backend_res = backend:authorize(formatted_usage, credentials)
    local backend_status = backend_res.status

    if backend_status == 200 then
      self.auths_cache:set(service_id, credentials, usage, 200)
      local to_batch = { service_id = service_id, credentials = credentials, usage = usage }
      self.reports_batcher:add(to_batch.service_id, to_batch.credentials, to_batch.usage)
    elseif backend_status >= 400 and backend_status < 500 then
      local rejection_reason = rejection_reason_from_headers(backend_res.headers)
      self.auths_cache:set(service_id, credentials, usage, backend_status, rejection_reason)
      return error(service, rejection_reason)
    else
      return error(service)
    end
  else
    if cached_auth.status == 200 then
      local to_batch = { service_id = service_id, credentials = credentials, usage = usage }
      self.reports_batcher:add(to_batch.service_id, to_batch.credentials, to_batch.usage)
    else
      return error(service, cached_auth.rejection_reason)
    end
  end

  set_flags_to_avoid_auths_in_apicast(context)
end

return _M
```

> **Review comment** (on the `ensure_report_timer_on` call): We definitely have to expose some way to init/destroy policies so they can do stuff like this. Global policies can do this in `init_worker` and we need some mechanics for local policies too. Including destructors, because they can get GC'd.

> **Review comment** (on the `auths_cache:set` call that takes `rejection_reason`): … This looks like clear parameter coupling and makes the parameter list really long.
>
> **Reply:** Yes, I thought about that too. I think there are some refactoring opportunities there. Maybe we could extract some classes like …
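To make the denial path concrete, a hypothetical backend response and how it maps to the error helpers (the status and header values are illustrative):

```lua
-- Illustrative only: a denied authorization from the 3scale backend.
local backend_res = {
  status = 409,
  headers = { ['3scale-rejection-reason'] = 'limits_exceeded' },
}

local reason = rejection_reason_from_headers(backend_res.headers)
-- reason == 'limits_exceeded', so error(service, reason) calls
-- errors.limits_exceeded(service); any other reason falls back to
-- errors.authorization_failed(service).
```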
The policy README, also a new file:
# 3scale Batcher Policy

## Description

The standard APIcast policy performs one call to the 3scale backend for each
request that it receives. The goal of this policy is to reduce latency and
increase throughput by significantly reducing the number of requests made to
the 3scale backend. In order to achieve that, this policy caches authorization
statuses and batches reports.
## Technical details

When the standard APIcast policy receives a request, it makes an 'authrep'
call to backend. This call checks the credentials sent by APIcast and also
applies rate limiting over the metrics sent by APIcast. If the credentials are
correct and the rate limits are not violated, backend also increases the
counters of the metrics reported by APIcast. These counters are used both to
show statistics in the 3scale UI and to apply the rate limits. This means that
the rate limiting applied will not be accurate until the counter is updated.
For limits defined over long windows of time (hour, day, etc.) this update lag
is often irrelevant. However, it might be important to take it into account
for limits defined over a small window of time (a per-minute limit, for
example).
This policy uses a cache for authorizations and batches reports. Also, it
makes 'authorize' and 'report' calls to backend instead of 'authrep' calls. On
each request, the flow is as follows:

1. The policy checks whether the credentials are cached. If they are, the
policy uses the cached authorization status instead of calling 3scale's
backend. When the credentials are not cached, it calls backend and caches the
authorization status with a configurable TTL.

2. Instead of reporting to 3scale's backend the metrics associated with the
request, the policy accumulates their usages to report to backend in batches.

Apart from that, there is a separate thread that reports to backend
periodically; the reporting interval is configurable. This thread fetches all
the batched reports and sends them to backend in a single call.
This approach increases throughput for two reasons. First, it caches
authorizations, which reduces the number of calls to the 3scale backend.
Second, it batches the reports. This also helps reduce the number of calls
made to the 3scale backend, but more importantly, it reduces the amount of
work backend needs to do, because the policy has already aggregated the
metrics to report. Suppose that we define a mapping rule that increases the
metric 'hits' by one on each request, and that we receive 1000 requests per
second. If we define a batching period of 10 seconds, this policy will report
to the 3scale backend a single 'hits +10000' instead of 10000 separate
'hits +1' reports. This matters because, from the 3scale backend perspective,
reporting a +10000 or a +1 to its database is the same amount of work.
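A hedged sketch of that aggregation idea (illustrative only; these are not the policy's actual data structures):

```lua
-- Accumulate deltas per metric instead of sending each report individually.
local batched = {}

local function add_report(metric, delta)
  batched[metric] = (batched[metric] or 0) + delta
end

-- 10000 requests, each reporting 'hits +1'...
for _ = 1, 10000 do add_report('hits', 1) end

-- ...get flushed as a single 'hits +10000' report:
-- batched = { hits = 10000 }
```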
Of course, reporting to 3scale in batches has a trade-off: rate limiting loses
accuracy. The reason is that while reports are accumulated, they are not being
sent to backend, and rate limits only take into account reports that have been
stored in the 3scale backend database. In summary, going over the defined
usage limits becomes easier. The standard APIcast policy reports to 3scale
backend every time it receives a request. Those reports are asynchronous,
which means that we can go over the limits for a brief window of time. This
policy, on the other hand, reports every X seconds (configurable) to 3scale
backend, so the window of time in which we can go over the limits is wider.

> **Review comment:** I'd like to explore a different strategy for reporting. Right now there is a timer to report every N seconds. I could imagine a strategy where it would report continuously. So basically it would just cache the parallel calls and make them 1:1 to backend. All these optimizations have compromises, and to choose the right one we should have some target. There are plenty of ways to do this with different tradeoffs and performance characteristics. We should define some bar we want to reach and choose the correct way to get there. Fully caching everything and then reporting to backend in one batch definitely has the best performance, but also the highest chance of going wrong.

> **Reply:** Backend report scheduling strategy is a hard problem. Definitely, trial and error with different approaches is the best strategy to come up with the best solution within our constraints. I think it may depend on the cache hit ratio. When the hit ratio is high, the best approximation might be waiting as long as we can to achieve the highest batch size in the allowed time window. On the other hand, when the hit ratio is low, waiting does not make sense; keeping memory, file descriptors, and timers (through fd's) is expensive. We could implement a dynamic algorithm that changes strategies depending on a cache hit ratio that we could measure.

> **Reply:** On event-based systems, locks are very painful, with unaffordable penalties in performance. I suggest we do not use shared resources and IPC mechanisms. Each worker keeps its own state. Performance and low latency are the advantages. The drawback is increased backend traffic, since the batching level is lower. Again, it depends on the cache hit ratio: good when the cache hit ratio is high, it does not make sense in high cache-miss scenarios.
The effectiveness of this policy will depend on the cache hit ratio. For use
cases where the variety of services, apps, metrics, etc. is relatively low,
caching and batching will be very effective and will increase the throughput
of the system significantly.
The policy manifest, also a new file:
```
@@ -0,0 +1,24 @@
{
  "$schema": "http://apicast.io/policy-v1/schema#manifest#",
  "name": "3scale batcher",
  "summary": "Caches auths from 3scale backend and batches reports.",
  "description":
    ["This policy caches authorizations from the 3scale backend ",
     "and also reports in batches. Doing this is more efficient than ",
     "authorizing and reporting on each request at the expense of losing ",
     "accuracy in the rate limits."],
  "version": "builtin",
  "configuration": {
    "type": "object",
    "properties": {
      "auths_ttl": {
        "description": "TTL for cached auths in seconds",
        "type": "integer"
      },
      "batch_report_seconds": {
        "description": "Duration (in seconds) for batching reports",
        "type": "integer"
      }
    }
  }
}
```
> **Review comment:** Just for the record: #477