Skip to content

Commit

Permalink
internal: rework sharding schema reload
Browse files Browse the repository at this point in the history
This patch is the groundwork for vshard groups and custom routers
support.

After this patch, sharding schema reload works per vshard router object.

Test runs have shown that this patch do not affects the performance of
crud requests.

Part of #44
  • Loading branch information
DifferentialOrange committed Aug 26, 2022
1 parent 9a40bf8 commit 40ffd8b
Show file tree
Hide file tree
Showing 13 changed files with 118 additions and 65 deletions.
6 changes: 4 additions & 2 deletions crud/common/sharding/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ function sharding.key_get_bucket_id(space_name, key, specified_bucket_id)
return { bucket_id = specified_bucket_id }
end

local sharding_func_data, err = sharding_metadata_module.fetch_sharding_func_on_router(space_name)
local vshard_router = vshard.router.static
local sharding_func_data, err = sharding_metadata_module.fetch_sharding_func_on_router(vshard_router, space_name)
if err ~= nil then
return nil, err
end
Expand All @@ -53,7 +54,8 @@ function sharding.tuple_get_bucket_id(tuple, space, specified_bucket_id)
end

local sharding_index_parts = space.index[0].parts
local sharding_key_data, err = sharding_metadata_module.fetch_sharding_key_on_router(space.name)
local vshard_router = vshard.router.static
local sharding_key_data, err = sharding_metadata_module.fetch_sharding_key_on_router(vshard_router, space.name)
if err ~= nil then
return nil, err
end
Expand Down
46 changes: 34 additions & 12 deletions crud/common/sharding/router_metadata_cache.lua
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,43 @@ local router_metadata_cache = {}
router_metadata_cache.SHARDING_KEY_MAP_NAME = "sharding_key_as_index_obj_map"
router_metadata_cache.SHARDING_FUNC_MAP_NAME = "sharding_func_map"
router_metadata_cache.META_HASH_MAP_NAME = "sharding_meta_hash_map"
router_metadata_cache[router_metadata_cache.SHARDING_KEY_MAP_NAME] = nil
router_metadata_cache[router_metadata_cache.SHARDING_FUNC_MAP_NAME] = nil
router_metadata_cache[router_metadata_cache.META_HASH_MAP_NAME] = {}
router_metadata_cache.fetch_lock = fiber.channel(1)
router_metadata_cache.is_part_of_pk = {}

local internal_storage = {}

function router_metadata_cache.get_instance(vshard_router_name)
if internal_storage[vshard_router_name] ~= nil then
return internal_storage[vshard_router_name]
end

internal_storage[vshard_router_name] = {
[router_metadata_cache.SHARDING_KEY_MAP_NAME] = nil,
[router_metadata_cache.SHARDING_FUNC_MAP_NAME] = nil,
[router_metadata_cache.META_HASH_MAP_NAME] = {},
fetch_lock = fiber.channel(1),
is_part_of_pk = {}
}

return internal_storage[vshard_router_name]
end

function router_metadata_cache.drop_instance(vshard_router_name)
if internal_storage[vshard_router_name] == nil then
return
end

if internal_storage[vshard_router_name].fetch_lock ~= nil then
internal_storage[vshard_router_name].fetch_lock:close()
end

internal_storage[vshard_router_name] = nil
end

function router_metadata_cache.drop_caches()
router_metadata_cache[router_metadata_cache.SHARDING_KEY_MAP_NAME] = nil
router_metadata_cache[router_metadata_cache.SHARDING_FUNC_MAP_NAME] = nil
router_metadata_cache[router_metadata_cache.META_HASH_MAP_NAME] = {}
if router_metadata_cache.fetch_lock ~= nil then
router_metadata_cache.fetch_lock:close()
for name, _ in pairs(internal_storage) do
router_metadata_cache.drop_instance(name)
end
router_metadata_cache.fetch_lock = fiber.channel(1)
router_metadata_cache.is_part_of_pk = {}

internal_storage = {}
end

return router_metadata_cache
13 changes: 8 additions & 5 deletions crud/common/sharding/sharding_func.lua
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
local errors = require('errors')
local log = require('log')
local vshard = require('vshard')

local dev_checks = require('crud.common.dev_checks')
local cache = require('crud.common.sharding.router_metadata_cache')
local router_cache = require('crud.common.sharding.router_metadata_cache')
local utils = require('crud.common.utils')

local ShardingFuncError = errors.new_class('ShardingFuncError', {capture_stack = false})
Expand Down Expand Up @@ -82,11 +83,13 @@ function sharding_func_module.construct_as_callable_obj_cache(metadata_map, spec

local result_err

cache[cache.SHARDING_FUNC_MAP_NAME] = {}
local func_cache = cache[cache.SHARDING_FUNC_MAP_NAME]
local vshard_router = vshard.router.static
local cache = router_cache.get_instance(vshard_router.name)
cache[router_cache.SHARDING_FUNC_MAP_NAME] = {}
local func_cache = cache[router_cache.SHARDING_FUNC_MAP_NAME]

cache[cache.META_HASH_MAP_NAME][cache.SHARDING_FUNC_MAP_NAME] = {}
local func_hash_cache = cache[cache.META_HASH_MAP_NAME][cache.SHARDING_FUNC_MAP_NAME]
cache[router_cache.META_HASH_MAP_NAME][router_cache.SHARDING_FUNC_MAP_NAME] = {}
local func_hash_cache = cache[router_cache.META_HASH_MAP_NAME][router_cache.SHARDING_FUNC_MAP_NAME]

for space_name, metadata in pairs(metadata_map) do
if metadata.sharding_func_def ~= nil then
Expand Down
21 changes: 13 additions & 8 deletions crud/common/sharding/sharding_key.lua
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
local errors = require('errors')
local log = require('log')
local vshard = require('vshard')

local dev_checks = require('crud.common.dev_checks')
local cache = require('crud.common.sharding.router_metadata_cache')
local router_cache = require('crud.common.sharding.router_metadata_cache')
local utils = require('crud.common.utils')

local ShardingKeyError = errors.new_class("ShardingKeyError", {capture_stack = false})
Expand Down Expand Up @@ -30,8 +31,8 @@ local function as_index_object(space_name, space_format, sharding_key_def)
end

-- Make sure sharding key definition is a part of primary key.
local function is_part_of_pk(space_name, primary_index_parts, sharding_key_as_index_obj)
dev_checks('string', 'table', 'table')
local function is_part_of_pk(cache, space_name, primary_index_parts, sharding_key_as_index_obj)
dev_checks('table', 'string', 'table', 'table')

if cache.is_part_of_pk[space_name] ~= nil then
return cache.is_part_of_pk[space_name]
Expand Down Expand Up @@ -83,7 +84,9 @@ function sharding_key_module.extract_from_pk(space_name, sharding_key_as_index_o
return primary_key
end

local res = is_part_of_pk(space_name, primary_index_parts, sharding_key_as_index_obj)
local vshard_router = vshard.router.static
local cache = router_cache.get_instance(vshard_router.name)
local res = is_part_of_pk(cache, space_name, primary_index_parts, sharding_key_as_index_obj)
if res == false then
return nil, ShardingKeyError:new(
"Sharding key for space %q is missed in primary index, specify bucket_id",
Expand All @@ -102,11 +105,13 @@ function sharding_key_module.construct_as_index_obj_cache(metadata_map, specifie

local result_err

cache[cache.SHARDING_KEY_MAP_NAME] = {}
local key_cache = cache[cache.SHARDING_KEY_MAP_NAME]
local vshard_router = vshard.router.static
local cache = router_cache.get_instance(vshard_router.name)
cache[router_cache.SHARDING_KEY_MAP_NAME] = {}
local key_cache = cache[router_cache.SHARDING_KEY_MAP_NAME]

cache[cache.META_HASH_MAP_NAME][cache.SHARDING_KEY_MAP_NAME] = {}
local key_hash_cache = cache[cache.META_HASH_MAP_NAME][cache.SHARDING_KEY_MAP_NAME]
cache[router_cache.META_HASH_MAP_NAME][router_cache.SHARDING_KEY_MAP_NAME] = {}
local key_hash_cache = cache[router_cache.META_HASH_MAP_NAME][router_cache.SHARDING_KEY_MAP_NAME]

for space_name, metadata in pairs(metadata_map) do
if metadata.sharding_key_def ~= nil then
Expand Down
63 changes: 37 additions & 26 deletions crud/common/sharding/sharding_metadata.lua
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
local fiber = require('fiber')
local errors = require('errors')
local log = require('log')
local vshard = require('vshard')

local call = require('crud.common.call')
local const = require('crud.common.const')
local dev_checks = require('crud.common.dev_checks')
local cache = require('crud.common.sharding.router_metadata_cache')
local router_cache = require('crud.common.sharding.router_metadata_cache')
local storage_cache = require('crud.common.sharding.storage_metadata_cache')
local sharding_func = require('crud.common.sharding.sharding_func')
local sharding_key = require('crud.common.sharding.sharding_key')
Expand All @@ -22,8 +23,11 @@ local sharding_metadata_module = {}
local function locked(f)
dev_checks('function')

return function(timeout, ...)
return function(timeout, vshard_router, ...)
local timeout_deadline = fiber.clock() + timeout

local cache = router_cache.get_instance(vshard_router.name)

local ok = cache.fetch_lock:put(true, timeout)
-- channel:put() returns false in two cases: when timeout is exceeded
-- or channel has been closed. However error message describes only
Expand All @@ -34,7 +38,7 @@ local function locked(f)
"Timeout for fetching sharding metadata is exceeded")
end
local timeout = timeout_deadline - fiber.clock()
local status, err = pcall(f, timeout, ...)
local status, err = pcall(f, timeout, vshard_router, ...)
cache.fetch_lock:get()
if not status or err ~= nil then
return err
Expand Down Expand Up @@ -95,8 +99,10 @@ end
-- cache.fetch_lock become unlocked during timeout passed to
-- _fetch_on_router().
-- metadata_map_name == nil means forced reload.
local _fetch_on_router = locked(function(timeout, space_name, metadata_map_name)
dev_checks('number', 'string', '?string')
local _fetch_on_router = locked(function(timeout, vshard_router, space_name, metadata_map_name)
dev_checks('number', 'table', 'string', '?string')

local cache = router_cache.get_instance(vshard_router.name)

if (metadata_map_name ~= nil) and (cache[metadata_map_name]) ~= nil then
return
Expand All @@ -109,11 +115,11 @@ local _fetch_on_router = locked(function(timeout, space_name, metadata_map_name)
return err
end
if metadata_map == nil then
cache[cache.SHARDING_KEY_MAP_NAME] = {}
cache[cache.SHARDING_FUNC_MAP_NAME] = {}
cache[cache.META_HASH_MAP_NAME] = {
[cache.SHARDING_KEY_MAP_NAME] = {},
[cache.SHARDING_FUNC_MAP_NAME] = {},
cache[router_cache.SHARDING_KEY_MAP_NAME] = {}
cache[router_cache.SHARDING_FUNC_MAP_NAME] = {}
cache[router_cache.META_HASH_MAP_NAME] = {
[router_cache.SHARDING_KEY_MAP_NAME] = {},
[router_cache.SHARDING_FUNC_MAP_NAME] = {},
}
return
end
Expand All @@ -129,24 +135,26 @@ local _fetch_on_router = locked(function(timeout, space_name, metadata_map_name)
end
end)

local function fetch_on_router(space_name, metadata_map_name, timeout)
local function fetch_on_router(vshard_router, space_name, metadata_map_name, timeout)
local cache = router_cache.get_instance(vshard_router.name)

if cache[metadata_map_name] ~= nil then
return {
value = cache[metadata_map_name][space_name],
hash = cache[cache.META_HASH_MAP_NAME][metadata_map_name][space_name]
hash = cache[router_cache.META_HASH_MAP_NAME][metadata_map_name][space_name]
}
end

local timeout = timeout or const.FETCH_SHARDING_METADATA_TIMEOUT
local err = _fetch_on_router(timeout, space_name, metadata_map_name)
local err = _fetch_on_router(timeout, vshard_router, space_name, metadata_map_name)
if err ~= nil then
return nil, err
end

if cache[metadata_map_name] ~= nil then
return {
value = cache[metadata_map_name][space_name],
hash = cache[cache.META_HASH_MAP_NAME][metadata_map_name][space_name],
hash = cache[router_cache.META_HASH_MAP_NAME][metadata_map_name][space_name],
}
end

Expand All @@ -163,10 +171,10 @@ end
-- that nil without error is a successfull return value.
-- - nil and error, when something goes wrong on fetching attempt.
--
function sharding_metadata_module.fetch_sharding_key_on_router(space_name, timeout)
dev_checks('string', '?number')
function sharding_metadata_module.fetch_sharding_key_on_router(vshard_router, space_name, timeout)
dev_checks('table', 'string', '?number')

return fetch_on_router(space_name, cache.SHARDING_KEY_MAP_NAME, timeout)
return fetch_on_router(vshard_router, space_name, router_cache.SHARDING_KEY_MAP_NAME, timeout)
end

-- Get sharding func for a certain space.
Expand All @@ -178,28 +186,31 @@ end
-- that nil without error is a successfull return value.
-- - nil and error, when something goes wrong on fetching attempt.
--
function sharding_metadata_module.fetch_sharding_func_on_router(space_name, timeout)
dev_checks('string', '?number')
function sharding_metadata_module.fetch_sharding_func_on_router(vshard_router, space_name, timeout)
dev_checks('table', 'string', '?number')

return fetch_on_router(space_name, cache.SHARDING_FUNC_MAP_NAME, timeout)
return fetch_on_router(vshard_router, space_name, router_cache.SHARDING_FUNC_MAP_NAME, timeout)
end

function sharding_metadata_module.update_sharding_key_cache(space_name)
cache.drop_caches()
local vshard_router = vshard.router.static
router_cache.drop_instance(vshard_router.name)

return sharding_metadata_module.fetch_sharding_key_on_router(space_name)
return sharding_metadata_module.fetch_sharding_key_on_router(vshard_router, space_name)
end

function sharding_metadata_module.update_sharding_func_cache(space_name)
cache.drop_caches()
local vshard_router = vshard.router.static
router_cache.drop_instance(vshard_router.name)

return sharding_metadata_module.fetch_sharding_func_on_router(space_name)
return sharding_metadata_module.fetch_sharding_func_on_router(vshard_router, space_name)
end

function sharding_metadata_module.reload_sharding_cache(space_name)
cache.drop_caches()
local vshard_router = vshard.router.static
router_cache.drop_instance(vshard_router.name)

local err = _fetch_on_router(const.FETCH_SHARDING_METADATA_TIMEOUT, space_name, nil)
local err = _fetch_on_router(const.FETCH_SHARDING_METADATA_TIMEOUT, vshard_router, space_name, nil)
if err ~= nil then
log.warn('Failed to reload sharding cache: %s', err)
end
Expand Down
2 changes: 1 addition & 1 deletion crud/count.lua
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ local function call_count_on_router(space_name, user_conditions, opts)

-- We don't need sharding info if bucket_id specified.
if opts.bucket_id == nil then
sharding_key_data, err = sharding_metadata_module.fetch_sharding_key_on_router(space_name)
sharding_key_data, err = sharding_metadata_module.fetch_sharding_key_on_router(vshard_router, space_name)
if err ~= nil then
return nil, err
end
Expand Down
2 changes: 1 addition & 1 deletion crud/delete.lua
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ local function call_delete_on_router(space_name, key, opts)
if opts.bucket_id == nil then
local primary_index_parts = space.index[0].parts

local sharding_key_data, err = sharding_metadata_module.fetch_sharding_key_on_router(space_name)
local sharding_key_data, err = sharding_metadata_module.fetch_sharding_key_on_router(vshard_router, space_name)
if err ~= nil then
return nil, err
end
Expand Down
2 changes: 1 addition & 1 deletion crud/get.lua
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ local function call_get_on_router(space_name, key, opts)
if opts.bucket_id == nil then
local primary_index_parts = space.index[0].parts

local sharding_key_data, err = sharding_metadata_module.fetch_sharding_key_on_router(space_name)
local sharding_key_data, err = sharding_metadata_module.fetch_sharding_key_on_router(vshard_router, space_name)
if err ~= nil then
return nil, err
end
Expand Down
2 changes: 1 addition & 1 deletion crud/select/compat/select.lua
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ local function build_select_iterator(space_name, user_conditions, opts)

-- We don't need sharding info if bucket_id specified.
if opts.bucket_id == nil then
sharding_key_data, err = sharding_metadata_module.fetch_sharding_key_on_router(space_name)
sharding_key_data, err = sharding_metadata_module.fetch_sharding_key_on_router(vshard_router, space_name)
if err ~= nil then
return nil, err
end
Expand Down
2 changes: 1 addition & 1 deletion crud/select/compat/select_old.lua
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ local function build_select_iterator(space_name, user_conditions, opts)
local sharding_key_as_index_obj = nil
-- We don't need sharding info if bucket_id specified.
if opts.bucket_id == nil then
local sharding_key_data, err = sharding_metadata_module.fetch_sharding_key_on_router(space_name)
local sharding_key_data, err = sharding_metadata_module.fetch_sharding_key_on_router(vshard_router, space_name)
if err ~= nil then
return nil, err
end
Expand Down
2 changes: 1 addition & 1 deletion crud/update.lua
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ local function call_update_on_router(space_name, key, user_operations, opts)
if opts.bucket_id == nil then
local primary_index_parts = space.index[0].parts

local sharding_key_data, err = sharding_metadata_module.fetch_sharding_key_on_router(space_name)
local sharding_key_data, err = sharding_metadata_module.fetch_sharding_key_on_router(vshard_router, space_name)
if err ~= nil then
return nil, err
end
Expand Down
12 changes: 10 additions & 2 deletions test/helper.lua
Original file line number Diff line number Diff line change
Expand Up @@ -334,9 +334,13 @@ end

function helpers.get_sharding_key_cache(cluster)
return cluster.main_server.net_box:eval([[
local vshard = require('vshard')
local sharding_metadata_cache = require('crud.common.sharding.router_metadata_cache')
return sharding_metadata_cache[sharding_metadata_cache.SHARDING_KEY_MAP_NAME]
local vshard_router = vshard.router.static
local cache = sharding_metadata_cache.get_instance(vshard_router.name)
return cache[sharding_metadata_cache.SHARDING_KEY_MAP_NAME]
]])
end

Expand All @@ -362,9 +366,13 @@ end
-- but not the cache itself
function helpers.get_sharding_func_cache_size(cluster)
return cluster.main_server.net_box:eval([[
local vshard = require('vshard')
local sharding_metadata_cache = require('crud.common.sharding.router_metadata_cache')
local cache, err = sharding_metadata_cache[sharding_metadata_cache.SHARDING_FUNC_MAP_NAME]
local vshard_router = vshard.router.static
local instance_cache = sharding_metadata_cache.get_instance(vshard_router.name)
local cache, err = instance_cache[sharding_metadata_cache.SHARDING_FUNC_MAP_NAME]
if cache == nil then
return nil, err
end
Expand Down
Loading

0 comments on commit 40ffd8b

Please sign in to comment.