Skip to content

Commit

Permalink
Add support of custom sharding func for crud methods
Browse files Browse the repository at this point in the history
  • Loading branch information
AnaNek committed Dec 22, 2021
1 parent 8ee652d commit 2ead6bc
Show file tree
Hide file tree
Showing 16 changed files with 762 additions and 19 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
key specified with DDL schema or in `_ddl_sharding_key` space.
NOTE: CRUD methods delete(), get() and update() requires that sharding key
must be a part of primary key.
* Support bucket id calculating using sharding func specified in
DDL schema or in `_ddl_sharding_func` space.

### Fixed

Expand Down
5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,11 @@ documentation). As soon as sharding key for a certain space is available in
automatically. Note that CRUD methods `delete()`, `get()` and `update()`
requires that sharding key must be a part of primary key.

You can specify sharding function to calculate bucket_id with
sharding func definition as a part of [DDL
schema](https://github.com/tarantool/ddl#input-data-format)
or insert manually to the space `_ddl_sharding_func`.

Table below describe what operations supports custom sharding key:

| CRUD method | Sharding key support |
Expand Down
16 changes: 14 additions & 2 deletions crud/common/sharding/sharding.lua
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,27 @@ local errors = require('errors')
local BucketIDError = errors.new_class("BucketIDError", {capture_stack = false})

local utils = require('crud.common.utils')
local dev_checks = require('crud.common.dev_checks')
local sharding_metadata_module = require('crud.common.sharding.sharding_metadata')

local sharding = {}

function sharding.key_get_bucket_id(key, specified_bucket_id)
function sharding.key_get_bucket_id(space_name, key, specified_bucket_id)
dev_checks('string', '?', '?number')

if specified_bucket_id ~= nil then
return specified_bucket_id
end

local sharding_func, err = sharding_metadata_module.fetch_sharding_func_on_router(space_name)
if err ~= nil then
return nil, err
end

if sharding_func ~= nil then
return sharding_func(key)
end

return vshard.router.bucket_id_strcrc32(key)
end

Expand All @@ -31,7 +43,7 @@ function sharding.tuple_get_bucket_id(tuple, space, specified_bucket_id)
end
local key = utils.extract_key(tuple, sharding_index_parts)

return sharding.key_get_bucket_id(key)
return sharding.key_get_bucket_id(space.name, key)
end

function sharding.tuple_set_and_return_bucket_id(tuple, space, specified_bucket_id)
Expand Down
25 changes: 25 additions & 0 deletions crud/common/sharding/sharding_func.lua
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,31 @@ function sharding_func_module.extract_function_def(tuple)
return nil
end

function sharding_func_module.construct_as_callable_obj_cache(metadata_map, specified_space_name)
dev_checks('table', 'string')

local result_err

cache.sharding_func_map = {}
for space_name, metadata in pairs(metadata_map) do
if metadata.sharding_func_def ~= nil then
local sharding_func, err = as_callable_object(metadata.sharding_func_def,
space_name)
if err ~= nil then
if specified_space_name == space_name then
result_err = err
else
log.warn(err)
end
end

cache.sharding_func_map[space_name] = sharding_func
end
end

return result_err
end

sharding_func_module.internal = {
as_callable_object = as_callable_object,
is_callable = is_callable,
Expand Down
59 changes: 50 additions & 9 deletions crud/common/sharding/sharding_metadata.lua
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ local call = require('crud.common.call')
local const = require('crud.common.const')
local dev_checks = require('crud.common.dev_checks')
local cache = require('crud.common.sharding.sharding_metadata_cache')
local sharding_func = require('crud.common.sharding.sharding_func')
local sharding_key = require('crud.common.sharding.sharding_key')

local FetchShardingMetadataError = errors.new_class('FetchShardingMetadataError', {capture_stack = false})
Expand All @@ -16,21 +17,35 @@ local sharding_metadata_module = {}
-- Return a map or nil when metadata is not available.
function sharding_metadata_module.fetch_on_storage()
local sharding_key_space = box.space._ddl_sharding_key
if sharding_key_space == nil then
local sharding_func_space = box.space._ddl_sharding_func

if sharding_key_space == nil and sharding_func_space == nil then
return nil
end

local SPACE_NAME_FIELDNO = 1
local SPACE_SHARDING_KEY_FIELDNO = 2
local metadata_map = {}
for _, tuple in sharding_key_space:pairs() do
local space_name = tuple[SPACE_NAME_FIELDNO]
local sharding_key_def = tuple[SPACE_SHARDING_KEY_FIELDNO]
local space_format = box.space[space_name]:format()
metadata_map[space_name] = {
sharding_key_def = sharding_key_def,
space_format = space_format,
}

if sharding_key_space ~= nil then
for _, tuple in sharding_key_space:pairs() do
local space_name = tuple[SPACE_NAME_FIELDNO]
local sharding_key_def = tuple[SPACE_SHARDING_KEY_FIELDNO]
local space_format = box.space[space_name]:format()
metadata_map[space_name] = {
sharding_key_def = sharding_key_def,
space_format = space_format,
}
end
end

if sharding_func_space ~= nil then
for _, tuple in sharding_func_space:pairs() do
local space_name = tuple[SPACE_NAME_FIELDNO]
local sharding_func_def = sharding_func.extract_function_def(tuple)
metadata_map[space_name] = metadata_map[space_name] or {}
metadata_map[space_name].sharding_func_def = sharding_func_def
end
end

return metadata_map
Expand Down Expand Up @@ -83,13 +98,19 @@ local _fetch_on_router = locked(function(timeout, space_name, metadata_map_name)

if metadata_map == nil then
cache[cache.SHARDING_KEY_MAP_NAME] = {}
cache[cache.SHARDING_FUNC_MAP_NAME] = {}
return
end

local err = sharding_key.construct_as_index_obj_cache(metadata_map, space_name)
if err ~= nil then
return err
end

local err = sharding_func.construct_as_callable_obj_cache(metadata_map, space_name)
if err ~= nil then
return err
end
end)

local function fetch_on_router(space_name, metadata_map_name, timeout)
Expand Down Expand Up @@ -126,11 +147,31 @@ function sharding_metadata_module.fetch_sharding_key_on_router(space_name, timeo
return fetch_on_router(space_name, cache.SHARDING_KEY_MAP_NAME, timeout)
end

-- Get sharding func for a certain space.
--
-- Return:
-- - sharding func as callable object, when sharding func definition found on
-- storage.
-- - nil, when sharding func definition was not found on storage. Pay attention
-- that nil without error is a successfull return value.
-- - nil and error, when something goes wrong on fetching attempt.
--
function sharding_metadata_module.fetch_sharding_func_on_router(space_name, timeout)
dev_checks('string', '?number')

return fetch_on_router(space_name, cache.SHARDING_FUNC_MAP_NAME, timeout)
end

function sharding_metadata_module.update_sharding_key_cache(space_name)
cache.drop_caches()
return sharding_metadata_module.fetch_sharding_key_on_router(space_name)
end

function sharding_metadata_module.update_sharding_func_cache(space_name)
cache.drop_caches()
return sharding_metadata_module.fetch_sharding_func_on_router(space_name)
end

function sharding_metadata_module.init()
_G._crud.fetch_on_storage = sharding_metadata_module.fetch_on_storage
end
Expand Down
3 changes: 3 additions & 0 deletions crud/common/sharding/sharding_metadata_cache.lua
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,16 @@ local fiber = require('fiber')
local sharding_metadata_cache = {}

sharding_metadata_cache.SHARDING_KEY_MAP_NAME = "sharding_key_as_index_obj_map"
sharding_metadata_cache.SHARDING_FUNC_MAP_NAME = "sharding_func_map"

sharding_metadata_cache[sharding_metadata_cache.SHARDING_KEY_MAP_NAME] = nil
sharding_metadata_cache[sharding_metadata_cache.SHARDING_FUNC_MAP_NAME] = nil
sharding_metadata_cache.fetch_lock = fiber.channel(1)
sharding_metadata_cache.is_part_of_pk = {}

function sharding_metadata_cache.drop_caches()
sharding_metadata_cache[sharding_metadata_cache.SHARDING_KEY_MAP_NAME] = nil
sharding_metadata_cache[sharding_metadata_cache.SHARDING_FUNC_MAP_NAME] = nil
if sharding_metadata_cache.fetch_lock ~= nil then
sharding_metadata_cache.fetch_lock:close()
end
Expand Down
6 changes: 5 additions & 1 deletion crud/delete.lua
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,11 @@ local function call_delete_on_router(space_name, key, opts)
end
end

local bucket_id = sharding.key_get_bucket_id(sharding_key, opts.bucket_id)
local bucket_id, err = sharding.key_get_bucket_id(space_name, sharding_key, opts.bucket_id)
if err ~= nil then
return nil, err
end

local call_opts = {
mode = 'write',
timeout = opts.timeout,
Expand Down
6 changes: 5 additions & 1 deletion crud/get.lua
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,11 @@ local function call_get_on_router(space_name, key, opts)
end
end

local bucket_id = sharding.key_get_bucket_id(sharding_key, opts.bucket_id)
local bucket_id, err = sharding.key_get_bucket_id(space_name, sharding_key, opts.bucket_id)
if err ~= nil then
return nil, err
end

local call_opts = {
mode = opts.mode or 'read',
prefer_replica = opts.prefer_replica,
Expand Down
6 changes: 5 additions & 1 deletion crud/select/compat/select.lua
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,11 @@ local function build_select_iterator(space_name, user_conditions, opts)
local perform_map_reduce = opts.force_map_call == true or
(opts.bucket_id == nil and plan.sharding_key == nil)
if not perform_map_reduce then
local bucket_id = sharding.key_get_bucket_id(plan.sharding_key, opts.bucket_id)
local bucket_id = sharding.key_get_bucket_id(space_name, plan.sharding_key, opts.bucket_id)
if err ~= nil then
return nil, err
end

assert(bucket_id ~= nil)

local err
Expand Down
6 changes: 5 additions & 1 deletion crud/select/compat/select_old.lua
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,11 @@ local function build_select_iterator(space_name, user_conditions, opts)
local perform_map_reduce = opts.force_map_call == true or
(opts.bucket_id == nil and plan.sharding_key == nil)
if not perform_map_reduce then
local bucket_id = sharding.key_get_bucket_id(plan.sharding_key, opts.bucket_id)
local bucket_id, err = sharding.key_get_bucket_id(space_name, plan.sharding_key, opts.bucket_id)
if err ~= nil then
return nil, err
end

assert(bucket_id ~= nil)

local err
Expand Down
6 changes: 5 additions & 1 deletion crud/update.lua
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,11 @@ local function call_update_on_router(space_name, key, user_operations, opts)
end
end

local bucket_id = sharding.key_get_bucket_id(sharding_key, opts.bucket_id)
local bucket_id, err = sharding.key_get_bucket_id(space_name, sharding_key, opts.bucket_id)
if err ~= nil then
return nil, err
end

local call_opts = {
mode = 'write',
timeout = opts.timeout,
Expand Down
5 changes: 5 additions & 0 deletions deps.sh
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,9 @@ tarantoolctl rocks install https://raw.githubusercontent.com/moteus/lua-path/mas
tarantoolctl rocks install https://raw.githubusercontent.com/moteus/luacov-coveralls/master/rockspecs/luacov-coveralls-scm-0.rockspec

tarantoolctl rocks install cartridge 2.7.1

# cartridge depends on ddl 1.5.0-1 (version without
# sharding func support), install latest version
tarantoolctl rocks install ddl scm-1

tarantoolctl rocks make
29 changes: 29 additions & 0 deletions test/entrypoint/srv_ddl.lua
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,18 @@ local cartridge = require('cartridge')
local ddl = require('ddl')

package.preload['customers-storage'] = function()
-- set sharding func in dot.notation
-- in _G for sharding func tests
local some_module = {
sharding_func =
function(key)
if key ~= nil and key[1] ~= nil then
return key[1] % 10
end
end
}
rawset(_G, 'some_module', some_module)

return {
role_name = 'customers-storage',
init = function()
Expand Down Expand Up @@ -131,6 +143,18 @@ package.preload['customers-storage'] = function()
table.insert(customers_name_age_key_three_fields_index_schema.indexes, bucket_id_index)
table.insert(customers_name_age_key_three_fields_index_schema.indexes, three_fields_index)

local customers_id_key_schema = table.deepcopy(customers_schema)
customers_id_key_schema.sharding_key = {'id'}
table.insert(customers_id_key_schema.indexes, primary_index)
table.insert(customers_id_key_schema.indexes, bucket_id_index)
table.insert(customers_id_key_schema.indexes, name_index)

local customers_body_func_schema = table.deepcopy(customers_id_key_schema)
customers_body_func_schema.sharding_func = { body = 'function(key) return key[1] % 10 end' }

local customers_G_func_schema = table.deepcopy(customers_id_key_schema)
customers_G_func_schema.sharding_func = 'some_module.sharding_func'

local schema = {
spaces = {
customers_name_key = customers_name_key_schema,
Expand All @@ -140,6 +164,8 @@ package.preload['customers-storage'] = function()
customers_age_key = customers_age_key_schema,
customers_name_age_key_different_indexes = customers_name_age_key_different_indexes_schema,
customers_name_age_key_three_fields_index = customers_name_age_key_three_fields_index_schema,
customers_G_func = customers_G_func_schema,
customers_body_func = customers_body_func_schema,
}
}

Expand All @@ -154,6 +180,9 @@ package.preload['customers-storage'] = function()
local fieldno_sharding_key = 2
box.space['_ddl_sharding_key']:update(space_name, {{'=', fieldno_sharding_key, sharding_key_def}})
end)
rawset(_G, 'set_sharding_func', function(space_name, fieldno_sharding_func, sharding_func_def)
box.space['_ddl_sharding_func']:update(space_name, {{'=', fieldno_sharding_func, sharding_func_def}})
end)
end,
}
end
Expand Down
38 changes: 38 additions & 0 deletions test/helper.lua
Original file line number Diff line number Diff line change
Expand Up @@ -340,4 +340,42 @@ function helpers.get_sharding_key_cache(cluster)
]])
end

-- it is not possible to get function or table with function
-- object through net.box that's why we get a sign of record
-- existence of cache but not the cache itself
function helpers.update_sharding_func_cache(cluster, space_name)
return cluster.main_server.net_box:eval([[
local sharding_metadata = require('crud.common.sharding.sharding_metadata')
local space_name = ...
local sharding_func, err = sharding_metadata.update_sharding_func_cache(space_name)
if sharding_func == nil then
return false, err
end
return true, err
]], {space_name})
end

-- it is not possible to get function or table with function
-- object through net.box that's why we get size of cache
-- but not the cache itself
function helpers.get_sharding_func_cache_size(cluster)
return cluster.main_server.net_box:eval([[
local sharding_metadata_cache = require('crud.common.sharding.sharding_metadata_cache')
local cache, err = sharding_metadata_cache[sharding_metadata_cache.SHARDING_FUNC_MAP_NAME]
if cache == nil then
return nil, err
end
local cnt = 0
for _, _ in pairs(cache) do
cnt = cnt + 1
end
return cnt, err
]])
end

return helpers
Loading

0 comments on commit 2ead6bc

Please sign in to comment.