diff --git a/CHANGELOG.md b/CHANGELOG.md index 6f606f622..0843f03da 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -21,6 +21,8 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. key specified with DDL schema or in `_ddl_sharding_key` space. NOTE: CRUD methods delete(), get() and update() requires that sharding key must be a part of primary key. +* Support bucket id calculating using sharding func specified in + DDL schema or in `_ddl_sharding_func` space. ### Fixed diff --git a/README.md b/README.md index c62a5412a..86e222179 100644 --- a/README.md +++ b/README.md @@ -77,6 +77,25 @@ documentation). As soon as sharding key for a certain space is available in automatically. Note that CRUD methods `delete()`, `get()` and `update()` requires that sharding key must be a part of primary key. +You can specify sharding function to calculate bucket_id with +sharding func definition as a part of [DDL +schema](https://github.com/tarantool/ddl#input-data-format) +or insert manually to the space `_ddl_sharding_func`. + +CRUD uses `strcrc32` as sharding function by default. +The reason why using of `strcrc32` is undesirable is that +this sharding function is not consistent for cdata numbers. +In particular, it returns 3 different values for normal Lua +numbers like 123, for `unsigned long long` cdata +(like `123ULL`, or `ffi.cast('unsigned long long', +123)`), and for `signed long long` cdata (like `123LL`, or +`ffi.cast('long long', 123)`). + +We cannot change default sharding function `strcrc32` +due to backward compatibility concerns, but please consider +using better alternatives for sharding function. +`mpcrc32` is one of them. 
+ Table below describe what operations supports custom sharding key: | CRUD method | Sharding key support | diff --git a/crud/common/sharding/init.lua b/crud/common/sharding/init.lua index b47c5e88e..dc4f68982 100644 --- a/crud/common/sharding/init.lua +++ b/crud/common/sharding/init.lua @@ -4,15 +4,27 @@ local errors = require('errors') local BucketIDError = errors.new_class("BucketIDError", {capture_stack = false}) local utils = require('crud.common.utils') +local dev_checks = require('crud.common.dev_checks') local sharding_metadata_module = require('crud.common.sharding.sharding_metadata') local sharding = {} -function sharding.key_get_bucket_id(key, specified_bucket_id) +function sharding.key_get_bucket_id(space_name, key, specified_bucket_id) + dev_checks('string', '?', '?number|cdata') + if specified_bucket_id ~= nil then return specified_bucket_id end + local sharding_func, err = sharding_metadata_module.fetch_sharding_func_on_router(space_name) + if err ~= nil then + return nil, err + end + + if sharding_func ~= nil then + return sharding_func(key) + end + return vshard.router.bucket_id_strcrc32(key) end @@ -31,7 +43,7 @@ function sharding.tuple_get_bucket_id(tuple, space, specified_bucket_id) end local key = utils.extract_key(tuple, sharding_index_parts) - return sharding.key_get_bucket_id(key) + return sharding.key_get_bucket_id(space.name, key) end function sharding.tuple_set_and_return_bucket_id(tuple, space, specified_bucket_id) diff --git a/crud/common/sharding/sharding_func.lua b/crud/common/sharding/sharding_func.lua index 77c4a675a..b2afb9084 100644 --- a/crud/common/sharding/sharding_func.lua +++ b/crud/common/sharding/sharding_func.lua @@ -77,6 +77,32 @@ local function as_callable_object(sharding_func_def, space_name) ) end +function sharding_func_module.construct_as_callable_obj_cache(metadata_map, specified_space_name) + dev_checks('table', 'string') + + local result_err + + cache.sharding_func_map = {} + for space_name, metadata in 
pairs(metadata_map) do + if metadata.sharding_func_def ~= nil then + local sharding_func, err = as_callable_object(metadata.sharding_func_def, + space_name) + if err ~= nil then + if specified_space_name == space_name then + result_err = err + log.error(err) + else + log.warn(err) + end + end + + cache.sharding_func_map[space_name] = sharding_func + end + end + + return result_err +end + sharding_func_module.internal = { as_callable_object = as_callable_object, is_callable = is_callable, diff --git a/crud/common/sharding/sharding_metadata.lua b/crud/common/sharding/sharding_metadata.lua index b9f2a396c..d6fbf87eb 100644 --- a/crud/common/sharding/sharding_metadata.lua +++ b/crud/common/sharding/sharding_metadata.lua @@ -5,6 +5,7 @@ local call = require('crud.common.call') local const = require('crud.common.const') local dev_checks = require('crud.common.dev_checks') local cache = require('crud.common.sharding.sharding_metadata_cache') +local sharding_func = require('crud.common.sharding.sharding_func') local sharding_key = require('crud.common.sharding.sharding_key') local FetchShardingMetadataError = errors.new_class('FetchShardingMetadataError', {capture_stack = false}) @@ -57,25 +58,39 @@ local function extract_sharding_func_def(tuple) return nil end --- Return a map with metadata or nil when space box.space._ddl_sharding_key is --- not available on storage. +-- Return a map with metadata or nil when spaces box.space._ddl_sharding_key and +-- box.space._ddl_sharding_func are not available on storage. 
function sharding_metadata_module.fetch_on_storage() local sharding_key_space = box.space._ddl_sharding_key - if sharding_key_space == nil then + local sharding_func_space = box.space._ddl_sharding_func + + if sharding_key_space == nil and sharding_func_space == nil then return nil end local SPACE_NAME_FIELDNO = 1 local SPACE_SHARDING_KEY_FIELDNO = 2 local metadata_map = {} - for _, tuple in sharding_key_space:pairs() do - local space_name = tuple[SPACE_NAME_FIELDNO] - local sharding_key_def = tuple[SPACE_SHARDING_KEY_FIELDNO] - local space_format = box.space[space_name]:format() - metadata_map[space_name] = { - sharding_key_def = sharding_key_def, - space_format = space_format, - } + + if sharding_key_space ~= nil then + for _, tuple in sharding_key_space:pairs() do + local space_name = tuple[SPACE_NAME_FIELDNO] + local sharding_key_def = tuple[SPACE_SHARDING_KEY_FIELDNO] + local space_format = box.space[space_name]:format() + metadata_map[space_name] = { + sharding_key_def = sharding_key_def, + space_format = space_format, + } + end + end + + if sharding_func_space ~= nil then + for _, tuple in sharding_func_space:pairs() do + local space_name = tuple[SPACE_NAME_FIELDNO] + local sharding_func_def = extract_sharding_func_def(tuple) + metadata_map[space_name] = metadata_map[space_name] or {} + metadata_map[space_name].sharding_func_def = sharding_func_def + end end return metadata_map @@ -102,6 +117,7 @@ local _fetch_on_router = locked(function(timeout, space_name, metadata_map_name) end if metadata_map == nil then cache[cache.SHARDING_KEY_MAP_NAME] = {} + cache[cache.SHARDING_FUNC_MAP_NAME] = {} return end @@ -109,17 +125,13 @@ local _fetch_on_router = locked(function(timeout, space_name, metadata_map_name) if err ~= nil then return err end + + local err = sharding_func.construct_as_callable_obj_cache(metadata_map, space_name) + if err ~= nil then + return err + end end) --- Get sharding index for a certain space. 
--- --- Return: --- - sharding key as index object, when sharding key definition found on --- storage. --- - nil, when sharding key definition was not found on storage. Pay attention --- that nil without error is a successfull return value. --- - nil and error, when something goes wrong on fetching attempt. --- local function fetch_on_router(space_name, metadata_map_name, timeout) if cache[metadata_map_name] ~= nil then return cache[metadata_map_name][space_name] @@ -139,17 +151,46 @@ local function fetch_on_router(space_name, metadata_map_name, timeout) "Fetching sharding key for space '%s' is failed", space_name) end +-- Get sharding index for a certain space. +-- +-- Return: +-- - sharding key as index object, when sharding key definition found on +-- storage. +-- - nil, when sharding key definition was not found on storage. Pay attention +-- that nil without error is a successfull return value. +-- - nil and error, when something goes wrong on fetching attempt. +-- function sharding_metadata_module.fetch_sharding_key_on_router(space_name, timeout) dev_checks('string', '?number') return fetch_on_router(space_name, cache.SHARDING_KEY_MAP_NAME, timeout) end +-- Get sharding func for a certain space. +-- +-- Return: +-- - sharding func as callable object, when sharding func definition found on +-- storage. +-- - nil, when sharding func definition was not found on storage. Pay attention +-- that nil without error is a successfull return value. +-- - nil and error, when something goes wrong on fetching attempt. 
+-- +function sharding_metadata_module.fetch_sharding_func_on_router(space_name, timeout) + dev_checks('string', '?number') + + return fetch_on_router(space_name, cache.SHARDING_FUNC_MAP_NAME, timeout) +end + function sharding_metadata_module.update_sharding_key_cache(space_name) cache.drop_caches() return sharding_metadata_module.fetch_sharding_key_on_router(space_name) end +function sharding_metadata_module.update_sharding_func_cache(space_name) + cache.drop_caches() + return sharding_metadata_module.fetch_sharding_func_on_router(space_name) +end + function sharding_metadata_module.init() _G._crud.fetch_on_storage = sharding_metadata_module.fetch_on_storage end diff --git a/crud/common/sharding/sharding_metadata_cache.lua b/crud/common/sharding/sharding_metadata_cache.lua index 77c775940..5b2fdcd65 100644 --- a/crud/common/sharding/sharding_metadata_cache.lua +++ b/crud/common/sharding/sharding_metadata_cache.lua @@ -3,12 +3,15 @@ local fiber = require('fiber') local sharding_metadata_cache = {} sharding_metadata_cache.SHARDING_KEY_MAP_NAME = "sharding_key_as_index_obj_map" +sharding_metadata_cache.SHARDING_FUNC_MAP_NAME = "sharding_func_map" sharding_metadata_cache[sharding_metadata_cache.SHARDING_KEY_MAP_NAME] = nil +sharding_metadata_cache[sharding_metadata_cache.SHARDING_FUNC_MAP_NAME] = nil sharding_metadata_cache.fetch_lock = fiber.channel(1) sharding_metadata_cache.is_part_of_pk = {} function sharding_metadata_cache.drop_caches() sharding_metadata_cache[sharding_metadata_cache.SHARDING_KEY_MAP_NAME] = nil + sharding_metadata_cache[sharding_metadata_cache.SHARDING_FUNC_MAP_NAME] = nil if sharding_metadata_cache.fetch_lock ~= nil then sharding_metadata_cache.fetch_lock:close() end diff --git a/crud/delete.lua b/crud/delete.lua index 0bdc16ae6..9630bbd36 100644 --- a/crud/delete.lua +++ b/crud/delete.lua @@ -74,7 +74,11 @@ local function call_delete_on_router(space_name, key, opts) end end - local bucket_id = sharding.key_get_bucket_id(sharding_key, 
opts.bucket_id) + local bucket_id, err = sharding.key_get_bucket_id(space_name, sharding_key, opts.bucket_id) + if err ~= nil then + return nil, err + end + local call_opts = { mode = 'write', timeout = opts.timeout, diff --git a/crud/get.lua b/crud/get.lua index 3022ca578..b4f44ff22 100644 --- a/crud/get.lua +++ b/crud/get.lua @@ -77,7 +77,11 @@ local function call_get_on_router(space_name, key, opts) end end - local bucket_id = sharding.key_get_bucket_id(sharding_key, opts.bucket_id) + local bucket_id, err = sharding.key_get_bucket_id(space_name, sharding_key, opts.bucket_id) + if err ~= nil then + return nil, err + end + local call_opts = { mode = opts.mode or 'read', prefer_replica = opts.prefer_replica, diff --git a/crud/select/compat/select.lua b/crud/select/compat/select.lua index 81638c14d..073f819a8 100644 --- a/crud/select/compat/select.lua +++ b/crud/select/compat/select.lua @@ -103,7 +103,11 @@ local function build_select_iterator(space_name, user_conditions, opts) local perform_map_reduce = opts.force_map_call == true or (opts.bucket_id == nil and plan.sharding_key == nil) if not perform_map_reduce then - local bucket_id = sharding.key_get_bucket_id(plan.sharding_key, opts.bucket_id) + local bucket_id, err = sharding.key_get_bucket_id(space_name, plan.sharding_key, opts.bucket_id) + if err ~= nil then + return nil, err + end + assert(bucket_id ~= nil) local err diff --git a/crud/select/compat/select_old.lua b/crud/select/compat/select_old.lua index bcbf3c4c1..ccdf8ba70 100644 --- a/crud/select/compat/select_old.lua +++ b/crud/select/compat/select_old.lua @@ -129,7 +129,11 @@ local function build_select_iterator(space_name, user_conditions, opts) local perform_map_reduce = opts.force_map_call == true or (opts.bucket_id == nil and plan.sharding_key == nil) if not perform_map_reduce then - local bucket_id = sharding.key_get_bucket_id(plan.sharding_key, opts.bucket_id) + local bucket_id, err = sharding.key_get_bucket_id(space_name, plan.sharding_key, 
opts.bucket_id) + if err ~= nil then + return nil, err + end + assert(bucket_id ~= nil) local err diff --git a/crud/update.lua b/crud/update.lua index 31f7135b1..e82dad04e 100644 --- a/crud/update.lua +++ b/crud/update.lua @@ -110,7 +110,11 @@ local function call_update_on_router(space_name, key, user_operations, opts) end end - local bucket_id = sharding.key_get_bucket_id(sharding_key, opts.bucket_id) + local bucket_id, err = sharding.key_get_bucket_id(space_name, sharding_key, opts.bucket_id) + if err ~= nil then + return nil, err + end + local call_opts = { mode = 'write', timeout = opts.timeout, diff --git a/deps.sh b/deps.sh index a2949a2ca..1b8c405fe 100755 --- a/deps.sh +++ b/deps.sh @@ -16,4 +16,9 @@ tarantoolctl rocks install https://raw.githubusercontent.com/moteus/lua-path/mas tarantoolctl rocks install https://raw.githubusercontent.com/moteus/luacov-coveralls/master/rockspecs/luacov-coveralls-scm-0.rockspec tarantoolctl rocks install cartridge 2.7.1 + +# cartridge depends on ddl 1.5.0-1 (version without +# sharding func support), install latest version +tarantoolctl rocks install ddl scm-1 + tarantoolctl rocks make diff --git a/test/entrypoint/srv_ddl.lua b/test/entrypoint/srv_ddl.lua index f240c743e..30f432b8a 100755 --- a/test/entrypoint/srv_ddl.lua +++ b/test/entrypoint/srv_ddl.lua @@ -9,6 +9,18 @@ local cartridge = require('cartridge') local ddl = require('ddl') package.preload['customers-storage'] = function() + -- set sharding func in dot.notation + -- in _G for sharding func tests + local some_module = { + sharding_func = + function(key) + if key ~= nil and key[1] ~= nil then + return key[1] % 10 + end + end + } + rawset(_G, 'some_module', some_module) + return { role_name = 'customers-storage', init = function() @@ -131,6 +143,18 @@ package.preload['customers-storage'] = function() table.insert(customers_name_age_key_three_fields_index_schema.indexes, bucket_id_index) table.insert(customers_name_age_key_three_fields_index_schema.indexes, 
three_fields_index) + local customers_id_key_schema = table.deepcopy(customers_schema) + customers_id_key_schema.sharding_key = {'id'} + table.insert(customers_id_key_schema.indexes, primary_index) + table.insert(customers_id_key_schema.indexes, bucket_id_index) + table.insert(customers_id_key_schema.indexes, name_index) + + local customers_body_func_schema = table.deepcopy(customers_id_key_schema) + customers_body_func_schema.sharding_func = { body = 'function(key) return key[1] % 10 end' } + + local customers_G_func_schema = table.deepcopy(customers_id_key_schema) + customers_G_func_schema.sharding_func = 'some_module.sharding_func' + local schema = { spaces = { customers_name_key = customers_name_key_schema, @@ -140,6 +164,8 @@ package.preload['customers-storage'] = function() customers_age_key = customers_age_key_schema, customers_name_age_key_different_indexes = customers_name_age_key_different_indexes_schema, customers_name_age_key_three_fields_index = customers_name_age_key_three_fields_index_schema, + customers_G_func = customers_G_func_schema, + customers_body_func = customers_body_func_schema, } } @@ -154,6 +180,9 @@ package.preload['customers-storage'] = function() local fieldno_sharding_key = 2 box.space['_ddl_sharding_key']:update(space_name, {{'=', fieldno_sharding_key, sharding_key_def}}) end) + rawset(_G, 'set_sharding_func', function(space_name, fieldno_sharding_func, sharding_func_def) + box.space['_ddl_sharding_func']:update(space_name, {{'=', fieldno_sharding_func, sharding_func_def}}) + end) end, } end diff --git a/test/helper.lua b/test/helper.lua index ab2acf277..d6e923a7b 100644 --- a/test/helper.lua +++ b/test/helper.lua @@ -340,4 +340,42 @@ function helpers.get_sharding_key_cache(cluster) ]]) end +-- it is not possible to get function or table with function +-- object through net.box that's why we get a sign of record +-- existence of cache but not the cache itself +function helpers.update_sharding_func_cache(cluster, space_name) + return 
cluster.main_server.net_box:eval([[ + local sharding_metadata = require('crud.common.sharding.sharding_metadata') + + local space_name = ... + local sharding_func, err = sharding_metadata.update_sharding_func_cache(space_name) + if sharding_func == nil then + return false, err + end + + return true, err + ]], {space_name}) +end + +-- it is not possible to get function or table with function +-- object through net.box that's why we get size of cache +-- but not the cache itself +function helpers.get_sharding_func_cache_size(cluster) + return cluster.main_server.net_box:eval([[ + local sharding_metadata_cache = require('crud.common.sharding.sharding_metadata_cache') + + local cache, err = sharding_metadata_cache[sharding_metadata_cache.SHARDING_FUNC_MAP_NAME] + if cache == nil then + return nil, err + end + + local cnt = 0 + for _, _ in pairs(cache) do + cnt = cnt + 1 + end + + return cnt, err + ]]) +end + return helpers diff --git a/test/integration/ddl_sharding_func_test.lua b/test/integration/ddl_sharding_func_test.lua new file mode 100644 index 000000000..6d22fddb9 --- /dev/null +++ b/test/integration/ddl_sharding_func_test.lua @@ -0,0 +1,504 @@ +local fio = require('fio') +local crud = require('crud') +local t = require('luatest') + +local helpers = require('test.helper') + +local ok = pcall(require, 'ddl') +if not ok then + t.skip('Lua module ddl is required to run test') +end + +local pgroup = t.group('ddl_sharding_func', { + {engine = 'memtx', space_name = 'customers_G_func'}, + {engine = 'memtx', space_name = 'customers_body_func'}, + {engine = 'vinyl', space_name = 'customers_G_func'}, + {engine = 'vinyl', space_name = 'customers_body_func'}, +}) + +local cache_group = t.group('ddl_sharding_func_cache', { + {engine = 'memtx'}, + {engine = 'vinyl'}, +}) + +pgroup.before_all(function(g) + g.cluster = helpers.Cluster:new({ + datadir = fio.tempdir(), + server_command = helpers.entrypoint('srv_ddl'), + use_vshard = true, + replicasets = 
helpers.get_test_replicasets(), + env = { + ['ENGINE'] = g.params.engine, + }, + }) + g.cluster:start() + local result, err = g.cluster.main_server.net_box:eval([[ + local ddl = require('ddl') + + local ok, err = ddl.get_schema() + return ok, err + ]]) + t.assert_equals(type(result), 'table') + t.assert_equals(err, nil) +end) + +pgroup.after_all(function(g) helpers.stop_cluster(g.cluster) end) + +pgroup.before_each(function(g) + helpers.truncate_space_on_cluster(g.cluster, 'customers_G_func') + helpers.truncate_space_on_cluster(g.cluster, 'customers_body_func') +end) + +cache_group.before_all(function(g) + g.cluster = helpers.Cluster:new({ + datadir = fio.tempdir(), + server_command = helpers.entrypoint('srv_ddl'), + use_vshard = true, + replicasets = helpers.get_test_replicasets(), + env = { + ['ENGINE'] = g.params.engine, + }, + }) + g.cluster:start() + local result, err = g.cluster.main_server.net_box:eval([[ + local ddl = require('ddl') + + local ok, err = ddl.get_schema() + return ok, err + ]]) + t.assert_equals(type(result), 'table') + t.assert_equals(err, nil) +end) + +cache_group.after_all(function(g) helpers.stop_cluster(g.cluster) end) + +cache_group.before_each(function(g) + helpers.truncate_space_on_cluster(g.cluster, 'customers_G_func') + helpers.truncate_space_on_cluster(g.cluster, 'customers_body_func') +end) + +pgroup.test_insert_object = function(g) + local result, err = g.cluster.main_server.net_box:call( + 'crud.insert_object', {g.params.space_name, {id = 158, name = 'Augustus', age = 48}}) + t.assert_equals(err, nil) + + t.assert_equals(result.metadata, { + {is_nullable = false, name = 'id', type = 'unsigned'}, + {is_nullable = false, name = 'bucket_id', type = 'unsigned'}, + {is_nullable = false, name = 'name', type = 'string'}, + {is_nullable = false, name = 'age', type = 'number'}, + }) + local objects = crud.unflatten_rows(result.rows, result.metadata) + t.assert_equals(objects, {{id = 158, bucket_id = 8, name = 'Augustus', age = 48}}) + + 
local conn_s1 = g.cluster:server('s1-master').net_box + -- There is no tuple on s1 that we inserted before using crud.insert_object(). + local result = conn_s1.space[g.params.space_name]:get({158, 'Augustus'}) + t.assert_equals(result, nil) + + local conn_s2 = g.cluster:server('s2-master').net_box + -- There is a tuple on s2 that we inserted before using crud.insert_object(). + local result = conn_s2.space[g.params.space_name]:get({158, 'Augustus'}) + t.assert_equals(result, {158, 8, 'Augustus', 48}) +end + +pgroup.test_insert = function(g) + -- Insert a tuple. + local result, err = g.cluster.main_server.net_box:call( + 'crud.insert', {g.params.space_name, {27, box.NULL, 'Ivan', 25}}) + t.assert_equals(err, nil) + t.assert_equals(result.metadata, { + {is_nullable = false, name = 'id', type = 'unsigned'}, + {is_nullable = false, name = 'bucket_id', type = 'unsigned'}, + {is_nullable = false, name = 'name', type = 'string'}, + {is_nullable = false, name = 'age', type = 'number'}, + }) + t.assert_equals(#result.rows, 1) + t.assert_equals(result.rows[1], {27, 7, 'Ivan', 25}) + + -- There is a tuple on s2 that we inserted before using crud.insert(). + local conn_s2 = g.cluster:server('s2-master').net_box + local result = conn_s2.space[g.params.space_name]:get({27, 'Ivan'}) + t.assert_equals(result, {27, 7, 'Ivan', 25}) + + -- There is no tuple on s1 that we inserted before using crud.insert(). + local conn_s1 = g.cluster:server('s1-master').net_box + local result = conn_s1.space[g.params.space_name]:get({27, 'Ivan'}) + t.assert_equals(result, nil) +end + +pgroup.test_replace_object = function(g) + -- bucket_id is 596, storage is s-2 + local tuple = {8, 596, 'Dimitrion', 20} + + -- Put tuple to s1 replicaset. + local conn_s1 = g.cluster:server('s1-master').net_box + local result = conn_s1.space[g.params.space_name]:insert(tuple) + t.assert_not_equals(result, nil) + + -- Put tuple to s2 replicaset. 
+ local conn_s2 = g.cluster:server('s2-master').net_box + local result = conn_s2.space[g.params.space_name]:insert(tuple) + t.assert_not_equals(result, nil) + + -- Replace an object. + local result, err = g.cluster.main_server.net_box:call( + 'crud.replace_object', {g.params.space_name, {id = 8, name = 'John Doe', age = 25}}) + t.assert_equals(err, nil) + local objects = crud.unflatten_rows(result.rows, result.metadata) + t.assert_equals(objects, {{id = 8, bucket_id = 8, name = 'John Doe', age = 25}}) + + -- There is no replaced tuple on s1 replicaset. + local conn_s1 = g.cluster:server('s1-master').net_box + local result = conn_s1.space[g.params.space_name]:get({8, 'John Doe'}) + t.assert_equals(result, nil) + + -- There is replaced tuple on s2 replicaset. + local conn_s2 = g.cluster:server('s2-master').net_box + local result = conn_s2.space[g.params.space_name]:get({8, 'John Doe'}) + t.assert_equals(result, {8, 8, 'John Doe', 25}) +end + +pgroup.test_replace = function(g) + -- bucket_id is 596, storage is s-2 + local tuple = {71, 596, 'Dimitrion', 20} + + -- Put tuple to s1 replicaset. + local conn_s1 = g.cluster:server('s1-master').net_box + local result = conn_s1.space[g.params.space_name]:insert(tuple) + t.assert_not_equals(result, nil) + + -- Put tuple to s2 replicaset. + local conn_s2 = g.cluster:server('s2-master').net_box + local result = conn_s2.space[g.params.space_name]:insert(tuple) + t.assert_not_equals(result, nil) + + local tuple = {71, box.NULL, 'Augustus', 21} + + -- Replace a tuple. + local result, err = g.cluster.main_server.net_box:call('crud.replace', { + g.params.space_name, tuple + }) + t.assert_equals(err, nil) + t.assert_not_equals(result, nil) + t.assert_equals(#result.rows, 1) + t.assert_equals(result.rows[1], {71, 1, 'Augustus', 21}) + + -- There is no replaced tuple on s1 replicaset. 
+ local conn_s1 = g.cluster:server('s1-master').net_box + local result = conn_s1.space[g.params.space_name]:get({71, 'Augustus'}) + t.assert_equals(result, nil) + + -- There is replaced tuple on s2 replicaset. + local conn_s2 = g.cluster:server('s2-master').net_box + local result = conn_s2.space[g.params.space_name]:get({71, 'Augustus'}) + t.assert_equals(result, {71, 1, 'Augustus', 21}) +end + +pgroup.test_upsert_object = function(g) + -- Upsert an object first time. + local result, err = g.cluster.main_server.net_box:call( + 'crud.upsert_object', + {g.params.space_name, {id = 66, name = 'Jack Sparrow', age = 25}, {{'+', 'age', 26}}} + ) + t.assert_equals(#result.rows, 0) + t.assert_equals(result.metadata, { + {is_nullable = false, name = 'id', type = 'unsigned'}, + {is_nullable = false, name = 'bucket_id', type = 'unsigned'}, + {is_nullable = false, name = 'name', type = 'string'}, + {is_nullable = false, name = 'age', type = 'number'}, + }) + t.assert_equals(err, nil) + + -- There is no tuple on s1 replicaset. + local conn_s1 = g.cluster:server('s1-master').net_box + local result = conn_s1.space[g.params.space_name]:get({66, 'Jack Sparrow'}) + t.assert_equals(result, nil) + + -- There is a tuple on s2 replicaset. + local conn_s2 = g.cluster:server('s2-master').net_box + local result = conn_s2.space[g.params.space_name]:get({66, 'Jack Sparrow'}) + t.assert_equals(result, {66, 6, 'Jack Sparrow', 25}) + + -- Upsert the same query second time when tuple exists. + local result, err = g.cluster.main_server.net_box:call( + 'crud.upsert_object', + {g.params.space_name, {id = 66, name = 'Jack Sparrow', age = 25}, {{'+', 'age', 26}}} + ) + t.assert_equals(#result.rows, 0) + t.assert_equals(err, nil) + + -- There is no tuple on s2 replicaset. + local result = conn_s1.space[g.params.space_name]:get({66, 'Jack Sparrow'}) + t.assert_equals(result, nil) + + -- There is an updated tuple on s1 replicaset. 
+ local result = conn_s2.space[g.params.space_name]:get({66, 'Jack Sparrow'}) + t.assert_equals(result, {66, 6, 'Jack Sparrow', 51}) +end + +pgroup.test_upsert = function(g) + local tuple = {14, box.NULL, 'John', 25} + + -- Upsert an object first time. + local result, err = g.cluster.main_server.net_box:call('crud.upsert', { + g.params.space_name, tuple, {} + }) + t.assert_equals(err, nil) + t.assert_not_equals(result, nil) + t.assert_equals(#result.rows, 0) + + -- There is no tuple on s2 replicaset. + local conn_s1 = g.cluster:server('s1-master').net_box + local result = conn_s1.space[g.params.space_name]:get({14, 'John'}) + t.assert_equals(result, nil) + + -- There is a tuple on s1 replicaset. + local conn_s2 = g.cluster:server('s2-master').net_box + local result = conn_s2.space[g.params.space_name]:get({14, 'John'}) + t.assert_equals(result, {14, 4, 'John', 25}) + + -- Upsert the same query second time when tuple exists. + local result, err = g.cluster.main_server.net_box:call( + 'crud.upsert_object', + {g.params.space_name, {id = 14, name = 'John', age = 25}, {{'+', 'age', 26}}} + ) + t.assert_equals(#result.rows, 0) + t.assert_equals(err, nil) + + -- There is no tuple on s2 replicaset. + local result = conn_s1.space[g.params.space_name]:get({14, 'John'}) + t.assert_equals(result, nil) + + -- There is an updated tuple on s1 replicaset. + local result = conn_s2.space[g.params.space_name]:get({14, 'John'}) + t.assert_equals(result, {14, 4, 'John', 51}) +end + +pgroup.test_select = function(g) + -- bucket_id is id % 10 = 8 + local tuple = {18, 8, 'Ptolemy', 25} + + -- Put tuple to s2 replicaset. 
+ local conn_s2 = g.cluster:server('s2-master').net_box + local result = conn_s2.space[g.params.space_name]:insert(tuple) + t.assert_not_equals(result, nil) + + local conditions = {{'==', 'id', 18}} + local result, err = g.cluster.main_server.net_box:call('crud.select', { + g.params.space_name, conditions, + }) + + t.assert_equals(err, nil) + t.assert_equals(#result.rows, 1) + t.assert_equals(result.rows[1], tuple) + + -- bucket_id is 2719, storage is s-1 + local tuple = {19, 2719, 'Ptolemy', 25} + + -- Put tuple to s1 replicaset. + local conn_s2 = g.cluster:server('s1-master').net_box + local result = conn_s2.space[g.params.space_name]:insert(tuple) + t.assert_not_equals(result, nil) + + -- calculated bucket_id will be id % 10 = 19 % 10 = 9 -> + -- select will be performed on s2 replicaset + -- but tuple is on s1 replicaset -> result will be empty + local conditions = {{'==', 'id', 19}} + local result, err = g.cluster.main_server.net_box:call('crud.select', { + g.params.space_name, conditions, + }) + + t.assert_equals(err, nil) + t.assert_equals(result.rows, {}) +end + +pgroup.test_update = function(g) + -- bucket_id is id % 10 = 2 + local tuple = {12, 2, 'Ivan', 10} + + local conn_s1 = g.cluster:server('s1-master').net_box + local conn_s2 = g.cluster:server('s2-master').net_box + + -- Put tuple with to s1 replicaset. + local result = conn_s1.space[g.params.space_name]:insert(tuple) + t.assert_not_equals(result, nil) + + -- Put tuple with to s2 replicaset. + local result = conn_s2.space[g.params.space_name]:insert(tuple) + t.assert_not_equals(result, nil) + + -- Update a tuple. + local update_operations = { + {'+', 'age', 10}, + } + local result, err = g.cluster.main_server.net_box:call('crud.update', { + g.params.space_name, {12, 'Ivan'}, update_operations, + }) + t.assert_equals(err, nil) + t.assert_equals(#result.rows, 1) + t.assert_equals(result.rows, {{12, 2, 'Ivan', 20}}) + + -- Tuple on s1 replicaset was not updated. 
+ local result = conn_s1.space[g.params.space_name]:get({12, 'Ivan'}) + t.assert_equals(result, {12, 2, 'Ivan', 10}) + + -- Tuple on s2 replicaset was updated. + local result = conn_s2.space[g.params.space_name]:get({12, 'Ivan'}) + t.assert_equals(result, {12, 2, 'Ivan', 20}) + + -- bucket_id is 2719, storage is s-1 + local tuple = {18, 2719, 'Ivan', 10} + + -- Put tuple with to s1 replicaset. + local result = conn_s1.space[g.params.space_name]:insert(tuple) + t.assert_not_equals(result, nil) + + -- Update a tuple. + local update_operations = { + {'+', 'age', 10}, + } + -- calculated bucket_id will be id % 10 = 18 % 10 = 8 -> + -- select will be performed on s2 replicaset + -- but tuple is on s1 replicaset -> result will be empty + local result, err = g.cluster.main_server.net_box:call('crud.update', { + g.params.space_name, {18, 'Ivan'}, update_operations, + }) + t.assert_equals(err, nil) + t.assert_equals(result.rows, {}) + + -- Tuple on s1 replicaset was not updated. + local result = conn_s1.space[g.params.space_name]:get({18, 'Ivan'}) + t.assert_equals(result, {18, 2719, 'Ivan', 10}) +end + +pgroup.test_get = function(g) + -- bucket_id is id % 10 = 2 + local tuple = {12, 2, 'Ivan', 20} + + -- Put tuple to s2 replicaset. + local conn_s2 = g.cluster:server('s2-master').net_box + local result = conn_s2.space[g.params.space_name]:insert(tuple) + t.assert_not_equals(result, nil) + + -- Get a tuple. + local result, err = g.cluster.main_server.net_box:call('crud.get', { + g.params.space_name, {12, 'Ivan'}, + }) + t.assert_equals(err, nil) + t.assert_equals(result.rows, {{12, 2, 'Ivan', 20}}) + + -- bucket_id is 2719, storage is s-1 + local tuple = {18, 2719, 'Ivan', 10} + + -- Put tuple to s1 replicaset. 
+ local conn_s2 = g.cluster:server('s1-master').net_box + local result = conn_s2.space[g.params.space_name]:insert(tuple) + t.assert_not_equals(result, nil) + + -- calculated bucket_id will be id % 10 = 18 % 10 = 8 -> + -- select will be performed on s2 replicaset + -- but tuple is on s1 replicaset -> result will be empty + local result, err = g.cluster.main_server.net_box:call('crud.get', { + g.params.space_name, {18, 'Ivan'}, + }) + t.assert_equals(err, nil) + t.assert_equals(result.rows, {}) +end + +pgroup.test_delete = function(g) + -- bucket_id is id % 10 = 2 + local tuple = {12, 2, 'Ivan', 20} + + local conn_s1 = g.cluster:server('s1-master').net_box + local conn_s2 = g.cluster:server('s2-master').net_box + + -- Put tuple to s1 replicaset. + local result = conn_s1.space[g.params.space_name]:insert(tuple) + t.assert_not_equals(result, nil) + + -- Put tuple to s2 replicaset. + local result = conn_s2.space[g.params.space_name]:insert(tuple) + t.assert_not_equals(result, nil) + + -- Delete tuple. + local _, err = g.cluster.main_server.net_box:call('crud.delete', { + g.params.space_name, {12, 'Ivan'}, + }) + t.assert_equals(err, nil) + + -- There is a tuple on s1 replicaset. + local result = conn_s1.space[g.params.space_name]:get({12, 'Ivan'}) + t.assert_equals(result, {12, 2, 'Ivan', 20}) + + -- There is no tuple on s2 replicaset. + local result = conn_s2.space[g.params.space_name]:get({12, 'Ivan'}) + t.assert_equals(result, nil) + + -- bucket_id is 2719, storage is s-1 + local tuple = {18, 2719, 'Ivan', 20} + + -- Put tuple with to s1 replicaset. 
+ local result = conn_s1.space[g.params.space_name]:insert(tuple) + t.assert_not_equals(result, nil) + + -- calculated bucket_id will be id % 10 = 18 % 10 = 8 -> + -- select will be performed on s2 replicaset + -- but tuple is on s1 replicaset -> result will be empty + local _, err = g.cluster.main_server.net_box:call('crud.delete', { + g.params.space_name, {18, 'Ivan'} + }) + t.assert_equals(err, nil) + + -- Tuple on s1 replicaset was not deleted. + local result = conn_s1.space[g.params.space_name]:get({18, 'Ivan'}) + t.assert_equals(result, {18, 2719, 'Ivan', 20}) +end + +cache_group.test_update_cache_with_incorrect_func = function(g) + local fieldno_sharding_func_name = 2 + + -- get data from cache for space with correct sharding func + local space_name = 'customers_G_func' + + local record_exist, err = helpers.update_sharding_func_cache(g.cluster, space_name) + t.assert_equals(err, nil) + t.assert_equals(record_exist, true) + + -- records for all spaces exist + local cache_size = helpers.get_sharding_func_cache_size(g.cluster) + t.assert_equals(cache_size, 2) + + -- no error just warning + local space_name = 'customers_G_func' + helpers.call_on_servers(g.cluster, {'s1-master', 's2-master'}, function(server) + server.net_box:call('set_sharding_func', {space_name, fieldno_sharding_func_name, 'non_existent_func'}) + end) + + -- we get no error because we sent request for correct space + local record_exist, err = helpers.update_sharding_func_cache(g.cluster, 'customers_body_func') + t.assert_equals(err, nil) + t.assert_equals(record_exist, true) + + -- cache['customers_G_func'] == nil (space with incorrect func) + -- other records for correct spaces exist in cache + cache_size = helpers.get_sharding_func_cache_size(g.cluster) + t.assert_equals(cache_size, 1) + + -- get data from cache for space with incorrect sharding func + local space_name = 'customers_G_func' + helpers.call_on_servers(g.cluster, {'s1-master', 's2-master'}, function(server) + 
server.net_box:call('set_sharding_func', {space_name, fieldno_sharding_func_name, 'non_existent_func'}) + end) + + -- we get an error because we sent request for incorrect space + local record_exist, err = helpers.update_sharding_func_cache(g.cluster, space_name) + t.assert_equals(record_exist, false) + t.assert_str_contains(err.err, + "Wrong sharding function specified in _ddl_sharding_func space for (customers_G_func) space") + + -- cache['customers_G_func'] == nil (space with incorrect func) + -- other records for correct spaces exist in cache + cache_size = helpers.get_sharding_func_cache_size(g.cluster) + t.assert_equals(cache_size, 1) +end diff --git a/test/integration/ddl_sharding_key_test.lua b/test/integration/ddl_sharding_key_test.lua index dc36f3688..afc1280d1 100644 --- a/test/integration/ddl_sharding_key_test.lua +++ b/test/integration/ddl_sharding_key_test.lua @@ -688,6 +688,8 @@ pgroup.test_update_cache_with_incorrect_key = function(g) -- records for all spaces exist sharding_key_as_index_obj = helpers.get_sharding_key_cache(g.cluster) t.assert_equals(sharding_key_as_index_obj, { + customers_G_func = {parts = {{fieldno = 1}}}, + customers_body_func = {parts = {{fieldno = 1}}}, customers_age_key = {parts = {{fieldno = 4}}}, customers_name_age_key_different_indexes = {parts = {{fieldno = 3}, {fieldno = 4}}}, customers_name_age_key_three_fields_index = {parts = {{fieldno = 3}, {fieldno = 4}}}, @@ -712,6 +714,8 @@ pgroup.test_update_cache_with_incorrect_key = function(g) -- other records for correct spaces exist in cache sharding_key_as_index_obj = helpers.get_sharding_key_cache(g.cluster) t.assert_equals(sharding_key_as_index_obj, { + customers_G_func = {parts = {{fieldno = 1}}}, + customers_body_func = {parts = {{fieldno = 1}}}, customers_age_key = {parts = {{fieldno = 4}}}, customers_name_age_key_different_indexes = {parts = {{fieldno = 3}, {fieldno = 4}}}, customers_name_age_key_three_fields_index = {parts = {{fieldno = 3}, {fieldno = 4}}}, @@ -735,6 
+739,8 @@ pgroup.test_update_cache_with_incorrect_key = function(g) -- other records for correct spaces exist in cache sharding_key_as_index_obj = helpers.get_sharding_key_cache(g.cluster) t.assert_equals(sharding_key_as_index_obj, { + customers_G_func = {parts = {{fieldno = 1}}}, + customers_body_func = {parts = {{fieldno = 1}}}, customers_age_key = {parts = {{fieldno = 4}}}, customers_name_age_key_different_indexes = {parts = {{fieldno = 3}, {fieldno = 4}}}, customers_name_age_key_three_fields_index = {parts = {{fieldno = 3}, {fieldno = 4}}}, diff --git a/test/unit/sharding_metadata_test.lua b/test/unit/sharding_metadata_test.lua index b76af630c..9c5bfda80 100644 --- a/test/unit/sharding_metadata_test.lua +++ b/test/unit/sharding_metadata_test.lua @@ -15,15 +15,31 @@ g.before_each(function() {name = 'space_name', type = 'string', is_nullable = false}, {name = 'sharding_key', type = 'array', is_nullable = false} } - -- Create a space _ddl_sharding_key with a tuple that - contains a space name and it's sharding key. + + local sharding_func_format = { + {name = 'space_name', type = 'string', is_nullable = false}, + {name = 'sharding_func_name', type = 'string', is_nullable = true}, + {name = 'sharding_func_body', type = 'string', is_nullable = true}, + } + if type(box.cfg) ~= 'table' then helpers.box_cfg() end + + -- Create a space _ddl_sharding_key with a tuple that + -- contains a space name and its sharding key. box.schema.space.create('_ddl_sharding_key', { format = sharding_key_format, }) box.space._ddl_sharding_key:create_index('pk') + + -- Create a space _ddl_sharding_func with a tuple that + -- contains a space name and its sharding func name/body. 
+ box.schema.space.create('_ddl_sharding_func', { + format = sharding_func_format, + }) + box.space._ddl_sharding_func:create_index('pk') + box.schema.space.create('fetch_on_storage') end) @@ -32,6 +48,11 @@ g.after_each(function() if box.space._ddl_sharding_key ~= nil then box.space._ddl_sharding_key:drop() end + + if box.space._ddl_sharding_func ~= nil then + box.space._ddl_sharding_func:drop() + end + box.space.fetch_on_storage:drop() cache.drop_caches() end) @@ -88,20 +109,25 @@ end g.test_fetch_sharding_metadata_on_storage_positive = function() local space_name = 'fetch_on_storage' local sharding_key_def = {'name', 'age'} + local sharding_func_def = 'sharding_func_name' box.space._ddl_sharding_key:insert({space_name, sharding_key_def}) + box.space._ddl_sharding_func:insert({space_name, sharding_func_def}) local metadata_map = sharding_metadata_module.fetch_on_storage() t.assert_equals(metadata_map, { [space_name] = { sharding_key_def = sharding_key_def, + sharding_func_def = sharding_func_def, space_format = {} }, }) end g.test_fetch_sharding_key_on_storage_positive = function() + box.space._ddl_sharding_func:drop() + local space_name = 'fetch_on_storage' local sharding_key_def = {'name', 'age'} box.space._ddl_sharding_key:insert({space_name, sharding_key_def}) @@ -116,9 +142,43 @@ g.test_fetch_sharding_key_on_storage_positive = function() }) end +g.test_fetch_sharding_func_name_on_storage_positive = function() + box.space._ddl_sharding_key:drop() + + local space_name = 'fetch_on_storage' + local sharding_func_def = 'sharding_func_name' + box.space._ddl_sharding_func:insert({space_name, sharding_func_def}) + + local metadata_map = sharding_metadata_module.fetch_on_storage() + + t.assert_equals(metadata_map, { + [space_name] = { + sharding_func_def = sharding_func_def, + }, + }) +end + +g.test_fetch_sharding_func_body_on_storage_positive = function() + box.space._ddl_sharding_key:drop() + + local space_name = 'fetch_on_storage' + local sharding_func_def = 
'function(key) return key end' + box.space._ddl_sharding_func:insert({space_name, nil, sharding_func_def}) + + local metadata_map = sharding_metadata_module.fetch_on_storage() + + t.assert_equals(metadata_map, { + [space_name] = { + sharding_func_def = {body = sharding_func_def}, + }, + }) +end + g.test_fetch_sharding_metadata_on_storage_negative = function() - -- Test checks return value when _ddl_sharding_key is absent. + -- Test checks return value when _ddl_sharding_key + -- and _ddl_sharding_func are absent. box.space._ddl_sharding_key:drop() + box.space._ddl_sharding_func:drop() local metadata_map = sharding_metadata_module.fetch_on_storage() t.assert_equals(metadata_map, nil)