refactor(timers) cluster and reports timers use raw shm
This is based on #1748, with some additional cleanup and performance improvements: it gets rid of the database cache module and uses the raw 'kong' shared dict instead.
1 parent (573577b), commit 2ace691. Showing 12 changed files with 365 additions and 257 deletions.
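For context, the new timers rely on two ngx.shared.DICT primitives instead of the database cache and resty.lock: safe_add(), which succeeds for only one worker per interval and therefore doubles as a cheap non-blocking lock, and incr() with an init value, which replaces the cache-backed retry counter. A minimal sketch of those semantics follows (the 'kong' dict name comes from the diff below; the shared-dict size and surrounding setup are illustrative assumptions, not part of the commit):

-- Sketch (not from the commit): the two shm primitives the refactor uses.
-- Assumes nginx.conf declares the shared zone:  lua_shared_dict kong 5m;
local kong_dict = ngx.shared.kong

-- safe_add() stores the key only if it is absent (and never evicts other
-- keys to make room). The first worker to call it "wins" the interval;
-- every other worker gets back false plus the error string "exists" and
-- can simply skip the work.
local ok, err = kong_dict:safe_add("events:keepalive", true, 30 - 0.001)
if not ok and err ~= "exists" then
  ngx.log(ngx.ERR, "could not get lock from 'kong' shm: ", err)
end

-- incr() with an init value (third argument) creates the counter on first
-- use, so there is no separate "create the key if missing" step anymore.
local retries, ierr = kong_dict:incr("autojoin_retries", 1, 0)
if ierr then
  ngx.log(ngx.ERR, "could not increment retries counter: ", ierr)
end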
@@ -1,100 +1,145 @@
-local cache = require "kong.tools.database_cache"
 local singletons = require "kong.singletons"
 
+
+local kong_dict = ngx.shared.kong
+local timer_at = ngx.timer.at
 local ngx_log = ngx.log
+local ERR = ngx.ERR
+local DEBUG = ngx.DEBUG
 
-local resty_lock
-local status, res = pcall(require, "resty.lock")
-if status then
-  resty_lock = res
-end
 
 local KEEPALIVE_INTERVAL = 30
-local ASYNC_AUTOJOIN_INTERVAL = 3
-local ASYNC_AUTOJOIN_RETRIES = 20 -- Try for max a minute (3s * 20)
+local KEEPALIVE_KEY = "events:keepalive"
+local AUTOJOIN_INTERVAL = 3
+local AUTOJOIN_KEY = "events:autojoin"
+local AUTOJOIN_MAX_RETRIES = 20 -- Try for max a minute (3s * 20)
+local AUTOJOIN_MAX_RETRIES_KEY = "autojoin_retries"
+
+
+local function log(lvl, ...)
+  ngx_log(lvl, "[cluster] ", ...)
+end
+
+
+-- Hold a lock for the whole interval (exptime) to prevent multiple
+-- worker processes from sending the test request simultaneously.
+-- Other workers do not need to wait until this lock is released,
+-- and can ignore the event, knowing another worker is handling it.
+-- We substract 1ms to the exp time to prevent a race condition
+-- with the next timer event.
+local function get_lock(key, exptime)
+  local ok, err = kong_dict:safe_add(key, true, exptime - 0.001)
+  if not ok and err ~= "exists" then
+    log(ERR, "could not get lock from 'kong' shm: ", err)
+  end
+
+  return ok
+end
+
 
-local function create_timer(at, cb)
-  local ok, err = ngx.timer.at(at, cb)
+local function create_timer(...)
+  local ok, err = timer_at(...)
   if not ok then
-    ngx_log(ngx.ERR, "[cluster] failed to create timer: ", err)
+    log(ERR, "could not create timer: ", err)
   end
 end
 
-local function async_autojoin(premature)
-  if premature then return end
+
+local function autojoin_handler(premature)
+  if premature then
+    return
+  end
+
+  -- increase retry count by 1
+
+  local n_retries, err = kong_dict:incr(AUTOJOIN_MAX_RETRIES_KEY, 1, 0)
+  if err then
+    log(ERR, "could not increment number of auto-join retries in 'kong' ",
+             "shm: ", err)
+    return
+  end
+
+  -- register recurring retry timer
+
+  if n_retries < AUTOJOIN_MAX_RETRIES then
+    -- all workers need to register a recurring timer, in case one of them
+    -- crashes. Hence, this must be called before the `get_lock()` call.
+    create_timer(AUTOJOIN_INTERVAL, autojoin_handler)
+  end
+
+  if not get_lock(AUTOJOIN_KEY, AUTOJOIN_INTERVAL) then
+    return
+  end
+
+  -- auto-join nodes table
+
+  log(DEBUG, "auto-joining")
 
-  -- If this node is the only node in the cluster, but other nodes are present, then try to join them
-  -- This usually happens when two nodes are started very fast, and the first node didn't write his
-  -- information into the datastore yet. When the second node starts up, there is nothing to join yet.
-  local lock, err = resty_lock:new("cluster_autojoin_locks", {
-    exptime = ASYNC_AUTOJOIN_INTERVAL - 0.001
-  })
-  if not lock then
-    ngx_log(ngx.ERR, "could not create lock: ", err)
-    return
-  end
-  local elapsed = lock:lock("async_autojoin")
-  if elapsed and elapsed == 0 then
-    -- If the current member count on this node's cluster is 1, but there are more than 1 active nodes in
-    -- the DAO, then try to join them
-    local count, err = singletons.dao.nodes:count()
-    if err then
-      ngx_log(ngx.ERR, tostring(err))
-    elseif count > 1 then
-      local members, err = singletons.serf:members()
-      if err then
-        ngx_log(ngx.ERR, tostring(err))
-      elseif #members < 2 then
-        -- Trigger auto-join
-        local _, err = singletons.serf:autojoin()
-        if err then
-          ngx_log(ngx.ERR, tostring(err))
-        end
-      else
-        return -- The node is already in the cluster and no need to continue
-      end
-    end
+  -- If the current member count on this node's cluster is 1, but there are more than 1 active nodes in
+  -- the DAO, then try to join them
+  local count, err = singletons.dao.nodes:count()
+  if err then
+    log(ERR, err)
 
-    -- Create retries counter key if it doesn't exist
-    if not cache.get(cache.autojoin_retries_key()) then
-      cache.rawset(cache.autojoin_retries_key(), 0)
-    end
+  elseif count > 1 then
+    local members, err = singletons.serf:members()
+    if err then
+      log(ERR, err)
 
-    local autojoin_retries = cache.incr(cache.autojoin_retries_key(), 1) -- Increment retries counter
-    if (autojoin_retries < ASYNC_AUTOJOIN_RETRIES) then
-      create_timer(ASYNC_AUTOJOIN_INTERVAL, async_autojoin)
-    end
-  end
-end
+    elseif #members < 2 then
+      -- Trigger auto-join
+      local _, err = singletons.serf:autojoin()
+      if err then
+        log(ERR, err)
+      end
+    end
+  end
+end
 
-local function send_keepalive(premature)
-  if premature then return end
+
+local function keepalive_handler(premature)
+  if premature then
+    return
+  end
+
+  -- all workers need to register a recurring timer, in case one of them
+  -- crashes. Hence, this must be called before the `get_lock()` call.
+  create_timer(KEEPALIVE_INTERVAL, keepalive_handler)
+
+  if not get_lock(KEEPALIVE_KEY, KEEPALIVE_INTERVAL) then
+    return
+  end
+
+  log(DEBUG, "sending keepalive event to datastore")
 
-  local lock = resty_lock:new("cluster_locks", {
-    exptime = KEEPALIVE_INTERVAL - 0.001
-  })
-  local elapsed = lock:lock("keepalive")
-  if elapsed and elapsed == 0 then
-    -- Send keepalive
-    local nodes, err = singletons.dao.nodes:find_all {
-      name = singletons.serf.node_name
-    }
-    if err then
-      ngx_log(ngx.ERR, tostring(err))
-    elseif #nodes == 1 then
-      local node = nodes[1]
-      local _, err = singletons.dao.nodes:update(node, node, {ttl=singletons.configuration.cluster_ttl_on_failure})
-      if err then
-        ngx_log(ngx.ERR, tostring(err))
-      end
-    end
-  end
+  local nodes, err = singletons.dao.nodes:find_all {
+    name = singletons.serf.node_name
+  }
+  if err then
+    log(ERR, "could not retrieve nodes from datastore: ", err)
 
-  create_timer(KEEPALIVE_INTERVAL, send_keepalive)
-end
+  elseif #nodes == 1 then
+    local node = nodes[1]
+    local _, err = singletons.dao.nodes:update(node, node, {
+      ttl = singletons.configuration.cluster_ttl_on_failure,
+      quiet = true
+    })
+    if err then
+      log(ERR, "could not update node in datastore:", err)
+    end
+  end
+end
+
 
 return {
   init_worker = function()
-    create_timer(KEEPALIVE_INTERVAL, send_keepalive)
-    create_timer(ASYNC_AUTOJOIN_INTERVAL, async_autojoin) -- Only execute one time
+    create_timer(KEEPALIVE_INTERVAL, keepalive_handler)
+    create_timer(AUTOJOIN_INTERVAL, autojoin_handler)
  end
 }
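Putting the pieces together, the recurring-timer pattern used by the new handlers can be reduced to the following standalone sketch (hypothetical, illustrative code, not from the commit): every worker re-arms its own timer first, then at most one worker acquires the shm key and performs the work for that interval.

-- Illustrative sketch of the timer pattern, e.g. for init_worker_by_lua_block.
-- Assumes "lua_shared_dict kong 5m;" is configured; names are hypothetical.
local kong_dict = ngx.shared.kong
local INTERVAL = 30

local handler
handler = function(premature)
  if premature then
    return  -- the worker is shutting down; do not re-arm
  end

  -- every worker re-arms the timer before doing anything else, so the
  -- event keeps firing even if the worker that did the work last crashes
  local ok, err = ngx.timer.at(INTERVAL, handler)
  if not ok then
    ngx.log(ngx.ERR, "could not create timer: ", err)
  end

  -- only the first worker to add the key does the work for this interval
  local locked = kong_dict:safe_add("events:example", true, INTERVAL - 0.001)
  if not locked then
    return
  end

  ngx.log(ngx.DEBUG, "periodic work running in exactly one worker")
end

-- arm the first occurrence from every worker
local ok, err = ngx.timer.at(INTERVAL, handler)
if not ok then
  ngx.log(ngx.ERR, "could not create timer: ", err)
end

The 1 ms subtracted from the expiry mirrors the diff: the key expires just before the next timer round, avoiding a race in which the stale lock would block every worker for a whole extra interval.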