diff --git a/crud/borders.lua b/crud/borders.lua index cc8fed1d..82827fc6 100644 --- a/crud/borders.lua +++ b/crud/borders.lua @@ -67,15 +67,14 @@ else end end -local function call_get_border_on_router(border_name, space_name, index_name, opts) - checks('string', 'string', '?string|number', { +local function call_get_border_on_router(vshard_router, border_name, space_name, index_name, opts) + checks('table', 'string', 'string', '?string|number', { timeout = '?number', fields = '?table', }) opts = opts or {} - local vshard_router = vshard.router.static local replicasets = vshard_router:routeall() local space = utils.get_space(space_name, replicasets) if space == nil then @@ -105,7 +104,7 @@ local function call_get_border_on_router(border_name, space_name, index_name, op replicasets = replicasets, timeout = opts.timeout, } - local results, err = call.map( + local results, err = call.map(vshard_router, STAT_FUNC_NAME, {border_name, space_name, index.id, field_names}, call_opts @@ -161,8 +160,10 @@ local function call_get_border_on_router(border_name, space_name, index_name, op end local function get_border(border_name, space_name, index_name, opts) - return schema.wrap_func_reload( - call_get_border_on_router, border_name, space_name, index_name, opts + local vshard_router = vshard.router.static + + return schema.wrap_func_reload(vshard_router, call_get_border_on_router, + border_name, space_name, index_name, opts ) end diff --git a/crud/common/call.lua b/crud/common/call.lua index bffb4391..8fc75e8c 100644 --- a/crud/common/call.lua +++ b/crud/common/call.lua @@ -1,4 +1,3 @@ -local vshard = require('vshard') local errors = require('errors') local dev_checks = require('crud.common.dev_checks') @@ -41,14 +40,13 @@ function call.get_vshard_call_name(mode, prefer_replica, balance) return 'callbre' end -local function wrap_vshard_err(err, func_name, replicaset_uuid, bucket_id) +local function wrap_vshard_err(vshard_router, err, func_name, replicaset_uuid, bucket_id) -- Do not rewrite ShardingHashMismatchError class. if err.class_name == sharding_utils.ShardingHashMismatchError.name then return errors.wrap(err) end if replicaset_uuid == nil then - local vshard_router = vshard.router.static local replicaset, _ = vshard_router:route(bucket_id) if replicaset == nil then return CallError:new( @@ -67,8 +65,8 @@ local function wrap_vshard_err(err, func_name, replicaset_uuid, bucket_id) )) end -function call.map(func_name, func_args, opts) - dev_checks('string', '?table', { +function call.map(vshard_router, func_name, func_args, opts) + dev_checks('table', 'string', '?table', { mode = 'string', prefer_replica = '?boolean', balance = '?boolean', @@ -88,7 +86,11 @@ function call.map(func_name, func_args, opts) local iter = opts.iter if iter == nil then - iter, err = BaseIterator:new({func_args = func_args, replicasets = opts.replicasets}) + iter, err = BaseIterator:new({ + func_args = func_args, + replicasets = opts.replicasets, + vshard_router = vshard_router, + }) if err ~= nil then return nil, err end @@ -136,8 +138,8 @@ function call.map(func_name, func_args, opts) return postprocessor:get() end -function call.single(bucket_id, func_name, func_args, opts) - dev_checks('number', 'string', '?table', { +function call.single(vshard_router, bucket_id, func_name, func_args, opts) + dev_checks('table', 'number', 'string', '?table', { mode = 'string', prefer_replica = '?boolean', balance = '?boolean', @@ -151,13 +153,12 @@ function call.single(bucket_id, func_name, func_args, opts) local timeout = opts.timeout or const.DEFAULT_VSHARD_CALL_TIMEOUT - local vshard_router = vshard.router.static local res, err = vshard_router[vshard_call_name](vshard_router, bucket_id, func_name, func_args, { timeout = timeout, }) if err ~= nil then - return nil, wrap_vshard_err(err, func_name, nil, bucket_id) + return nil, wrap_vshard_err(vshard_router, err, func_name, nil, bucket_id) end if res == box.NULL then @@ -167,17 +168,16 @@ function call.single(bucket_id, func_name, func_args, opts) return res end -function call.any(func_name, func_args, opts) - dev_checks('string', '?table', { +function call.any(vshard_router, func_name, func_args, opts) + dev_checks('table', 'string', '?table', { timeout = '?number', }) local timeout = opts.timeout or const.DEFAULT_VSHARD_CALL_TIMEOUT - local vshard_router = vshard.router.static local replicasets, err = vshard_router:routeall() if replicasets == nil then - return nil, CallError:new("Failed to get all replicasets: %s", err.err) + return nil, CallError:new("Failed to get router replicasets: %s", err.err) end local replicaset = select(2, next(replicasets)) @@ -185,7 +185,7 @@ function call.any(func_name, func_args, opts) timeout = timeout, }) if err ~= nil then - return nil, wrap_vshard_err(err, func_name, replicaset.uuid) + return nil, wrap_vshard_err(vshard_router, err, func_name, replicaset.uuid) end if res == box.NULL then diff --git a/crud/common/map_call_cases/base_iter.lua b/crud/common/map_call_cases/base_iter.lua index ad61c7f7..0fd0e00d 100644 --- a/crud/common/map_call_cases/base_iter.lua +++ b/crud/common/map_call_cases/base_iter.lua @@ -1,5 +1,4 @@ local errors = require('errors') -local vshard = require('vshard') local dev_checks = require('crud.common.dev_checks') local GetReplicasetsError = errors.new_class('GetReplicasetsError') @@ -24,14 +23,14 @@ function BaseIterator:new(opts) dev_checks('table', { func_args = '?table', replicasets = '?table', + vshard_router = 'table', }) local replicasets, err if opts.replicasets ~= nil then replicasets = opts.replicasets else - local vshard_router = vshard.router.static - replicasets, err = vshard_router:routeall() + replicasets, err = opts.vshard_router:routeall() if err ~= nil then return nil, GetReplicasetsError:new("Failed to get all replicasets: %s", err.err) end diff --git a/crud/common/map_call_cases/base_postprocessor.lua b/crud/common/map_call_cases/base_postprocessor.lua index 56963770..7502cf71 100644 --- a/crud/common/map_call_cases/base_postprocessor.lua +++ b/crud/common/map_call_cases/base_postprocessor.lua @@ -7,11 +7,12 @@ local BasePostprocessor = {} -- @function new -- -- @return[1] table postprocessor -function BasePostprocessor:new() +function BasePostprocessor:new(vshard_router) local postprocessor = { results = {}, early_exit = false, - errs = nil + errs = nil, + vshard_router = vshard_router, } setmetatable(postprocessor, self) @@ -58,7 +59,7 @@ function BasePostprocessor:collect(result_info, err_info) if err ~= nil then self.results = nil - self.errs = err_info.err_wrapper(err, unpack(err_info.wrapper_args)) + self.errs = err_info.err_wrapper(self.vshard_router, err, unpack(err_info.wrapper_args)) self.early_exit = true return self.early_exit diff --git a/crud/common/map_call_cases/batch_insert_iter.lua b/crud/common/map_call_cases/batch_insert_iter.lua index 637bce22..5fcbce14 100644 --- a/crud/common/map_call_cases/batch_insert_iter.lua +++ b/crud/common/map_call_cases/batch_insert_iter.lua @@ -32,9 +32,10 @@ function BatchInsertIterator:new(opts) tuples = 'table', space = 'table', execute_on_storage_opts = 'table', + vshard_router = 'table', }) - local sharding_data, err = sharding.split_tuples_by_replicaset(opts.tuples, opts.space) + local sharding_data, err = sharding.split_tuples_by_replicaset(opts.vshard_router, opts.tuples, opts.space) if err ~= nil then return nil, SplitTuplesError:new("Failed to split tuples by replicaset: %s", err.err) end diff --git a/crud/common/map_call_cases/batch_postprocessor.lua b/crud/common/map_call_cases/batch_postprocessor.lua index 2217854d..73a23983 100644 --- a/crud/common/map_call_cases/batch_postprocessor.lua +++ b/crud/common/map_call_cases/batch_postprocessor.lua @@ -51,7 +51,7 @@ function BatchPostprocessor:collect(result_info, err_info) err_to_wrap = err.err end - local err_obj = err_info.err_wrapper(err_to_wrap, unpack(err_info.wrapper_args)) + local err_obj = err_info.err_wrapper(self.vshard_router, err_to_wrap, unpack(err_info.wrapper_args)) err_obj.operation_data = err.operation_data err_obj.space_schema_hash = err.space_schema_hash diff --git a/crud/common/map_call_cases/batch_upsert_iter.lua b/crud/common/map_call_cases/batch_upsert_iter.lua index ad349b8b..fc393f1a 100644 --- a/crud/common/map_call_cases/batch_upsert_iter.lua +++ b/crud/common/map_call_cases/batch_upsert_iter.lua @@ -35,11 +35,15 @@ function BatchUpsertIterator:new(opts) space = 'table', operations = 'table', execute_on_storage_opts = 'table', + vshard_router = 'table', }) - local sharding_data, err = sharding.split_tuples_by_replicaset(opts.tuples, opts.space, { - operations = opts.operations, - }) + local sharding_data, err = sharding.split_tuples_by_replicaset( + opts.vshard_router, + opts.tuples, + opts.space, + {operations = opts.operations}) + if err ~= nil then return nil, SplitTuplesError:new("Failed to split tuples by replicaset: %s", err.err) end diff --git a/crud/common/schema.lua b/crud/common/schema.lua index 2325e799..ab86df07 100644 --- a/crud/common/schema.lua +++ b/crud/common/schema.lua @@ -1,7 +1,6 @@ local fiber = require('fiber') local msgpack = require('msgpack') local digest = require('digest') -local vshard = require('vshard') local errors = require('errors') local log = require('log') @@ -86,18 +85,17 @@ end -- This wrapper is used for functions that can fail if router uses outdated -- space schema. In case of such errors these functions returns `need_reload` -- for schema-dependent errors. -function schema.wrap_func_reload(func, ...) +function schema.wrap_func_reload(vshard_router, func, ...) local i = 0 local res, err, need_reload while true do - res, err, need_reload = func(...) + res, err, need_reload = func(vshard_router, ...) if err == nil or need_reload ~= const.NEED_SCHEMA_RELOAD then break end - local vshard_router = vshard.router.static local ok, reload_schema_err = reload_schema(vshard_router) if not ok then log.warn("Failed to reload schema: %s", reload_schema_err) diff --git a/crud/common/sharding/init.lua b/crud/common/sharding/init.lua index 200dd12b..b5842917 100644 --- a/crud/common/sharding/init.lua +++ b/crud/common/sharding/init.lua @@ -1,4 +1,3 @@ -local vshard = require('vshard') local errors = require('errors') local BucketIDError = errors.new_class("BucketIDError", {capture_stack = false}) @@ -13,8 +12,7 @@ local sharding_utils = require('crud.common.sharding.utils') local sharding = {} -function sharding.get_replicasets_by_bucket_id(bucket_id) - local vshard_router = vshard.router.static +function sharding.get_replicasets_by_bucket_id(vshard_router, bucket_id) local replicaset, err = vshard_router:route(bucket_id) if replicaset == nil then return nil, GetReplicasetsError:new("Failed to get replicaset for bucket_id %s: %s", bucket_id, err.err) @@ -25,14 +23,13 @@ function sharding.get_replicasets_by_bucket_id(bucket_id) } end -function sharding.key_get_bucket_id(space_name, key, specified_bucket_id) - dev_checks('string', '?', '?number|cdata') +function sharding.key_get_bucket_id(vshard_router, space_name, key, specified_bucket_id) + dev_checks('table', 'string', '?', '?number|cdata') if specified_bucket_id ~= nil then return { bucket_id = specified_bucket_id } end - local vshard_router = vshard.router.static local sharding_func_data, err = sharding_metadata_module.fetch_sharding_func_on_router(vshard_router, space_name) if err ~= nil then return nil, err @@ -48,13 +45,12 @@ function sharding.key_get_bucket_id(space_name, key, specified_bucket_id) return { bucket_id = vshard_router:bucket_id_strcrc32(key) } end -function sharding.tuple_get_bucket_id(tuple, space, specified_bucket_id) +function sharding.tuple_get_bucket_id(vshard_router, tuple, space, specified_bucket_id) if specified_bucket_id ~= nil then return { bucket_id = specified_bucket_id } end local sharding_index_parts = space.index[0].parts - local vshard_router = vshard.router.static local sharding_key_data, err = sharding_metadata_module.fetch_sharding_key_on_router(vshard_router, space.name) if err ~= nil then return nil, err @@ -64,7 +60,7 @@ function sharding.tuple_get_bucket_id(tuple, space, specified_bucket_id) end local key = utils.extract_key(tuple, sharding_index_parts) - local bucket_id_data, err = sharding.key_get_bucket_id(space.name, key, nil) + local bucket_id_data, err = sharding.key_get_bucket_id(vshard_router, space.name, key, nil) if err ~= nil then return nil, err end @@ -76,7 +72,7 @@ function sharding.tuple_get_bucket_id(tuple, space, specified_bucket_id) } end -function sharding.tuple_set_and_return_bucket_id(tuple, space, specified_bucket_id) +function sharding.tuple_set_and_return_bucket_id(vshard_router, tuple, space, specified_bucket_id) local bucket_id_fieldno, err = utils.get_bucket_id_fieldno(space) if err ~= nil then return nil, BucketIDError:new("Failed to get bucket ID fieldno: %s", err) @@ -98,7 +94,7 @@ function sharding.tuple_set_and_return_bucket_id(tuple, space, specified_bucket_ local sharding_data = { bucket_id = tuple[bucket_id_fieldno] } if sharding_data.bucket_id == nil then - sharding_data, err = sharding.tuple_get_bucket_id(tuple, space) + sharding_data, err = sharding.tuple_get_bucket_id(vshard_router, tuple, space) if err ~= nil then return nil, err end @@ -144,18 +140,18 @@ function sharding.batching_result_needs_sharding_reload(errs, tuples_count) return sharding_errs_count == tuples_count end -function sharding.wrap_method(method, space_name, ...) +function sharding.wrap_method(vshard_router, method, space_name, ...) local i = 0 local res, err, need_reload while true do - res, err, need_reload = method(space_name, ...) + res, err, need_reload = method(vshard_router, space_name, ...) if err == nil or need_reload ~= const.NEED_SHARDING_RELOAD then break end - sharding_metadata_module.reload_sharding_cache(space_name) + sharding_metadata_module.reload_sharding_cache(vshard_router, space_name) i = i + 1 @@ -169,12 +165,12 @@ end -- This wrapper assumes reload is performed inside the method and -- expect ShardingHashMismatchError error to be thrown. -function sharding.wrap_select_method(method, space_name, ...) +function sharding.wrap_select_method(vshard_router, method, space_name, ...) local i = 0 local ok, res, err while true do - ok, res, err = pcall(method, space_name, ...) + ok, res, err = pcall(method, vshard_router, space_name, ...) if ok == true then break @@ -212,8 +208,8 @@ end -- @return[1] batches -- Map where key is a replicaset and value -- is table of tuples related to this replicaset -function sharding.split_tuples_by_replicaset(tuples, space, opts) - dev_checks('table', 'table', { +function sharding.split_tuples_by_replicaset(vshard_router, tuples, space, opts) + dev_checks('table', 'table', 'table', { operations = '?table', }) @@ -227,7 +223,7 @@ function sharding.split_tuples_by_replicaset(tuples, space, opts) local sharding_data local err for i, tuple in ipairs(tuples) do - sharding_data, err = sharding.tuple_set_and_return_bucket_id(tuple, space) + sharding_data, err = sharding.tuple_set_and_return_bucket_id(vshard_router, tuple, space) if err ~= nil then return nil, BucketIDError:new("Failed to get bucket ID: %s", err) end @@ -244,7 +240,6 @@ function sharding.split_tuples_by_replicaset(tuples, space, opts) skip_sharding_hash_check = false end - local vshard_router = vshard.router.static local replicaset, err = vshard_router:route(sharding_data.bucket_id) if replicaset == nil then return nil, GetReplicasetsError:new( diff --git a/crud/common/sharding/sharding_func.lua b/crud/common/sharding/sharding_func.lua index 58fd696a..274c11ce 100644 --- a/crud/common/sharding/sharding_func.lua +++ b/crud/common/sharding/sharding_func.lua @@ -1,6 +1,5 @@ local errors = require('errors') local log = require('log') -local vshard = require('vshard') local dev_checks = require('crud.common.dev_checks') local router_cache = require('crud.common.sharding.router_metadata_cache') @@ -78,12 +77,11 @@ local function as_callable_object(sharding_func_def, space_name) ) end -function sharding_func_module.construct_as_callable_obj_cache(metadata_map, specified_space_name) - dev_checks('table', 'string') +function sharding_func_module.construct_as_callable_obj_cache(vshard_router, metadata_map, specified_space_name) + dev_checks('table', 'table', 'string') local result_err - local vshard_router = vshard.router.static local cache = router_cache.get_instance(vshard_router.name) cache[router_cache.SHARDING_FUNC_MAP_NAME] = {} local func_cache = cache[router_cache.SHARDING_FUNC_MAP_NAME] diff --git a/crud/common/sharding/sharding_key.lua b/crud/common/sharding/sharding_key.lua index 9be93cdd..c7f19f52 100644 --- a/crud/common/sharding/sharding_key.lua +++ b/crud/common/sharding/sharding_key.lua @@ -1,6 +1,5 @@ local errors = require('errors') local log = require('log') -local vshard = require('vshard') local dev_checks = require('crud.common.dev_checks') local router_cache = require('crud.common.sharding.router_metadata_cache') @@ -77,14 +76,14 @@ end -- Extract sharding key from pk. -- Returns a table with sharding key or pair of nil and error. -function sharding_key_module.extract_from_pk(space_name, sharding_key_as_index_obj, primary_index_parts, primary_key) - dev_checks('string', '?table', 'table', '?') +function sharding_key_module.extract_from_pk(vshard_router, space_name, sharding_key_as_index_obj, + primary_index_parts, primary_key) + dev_checks('table', 'string', '?table', 'table', '?') if sharding_key_as_index_obj == nil then return primary_key end - local vshard_router = vshard.router.static local cache = router_cache.get_instance(vshard_router.name) local res = is_part_of_pk(cache, space_name, primary_index_parts, sharding_key_as_index_obj) if res == false then @@ -100,12 +99,11 @@ function sharding_key_module.extract_from_pk(space_name, sharding_key_as_index_o return extract_from_index(primary_key, primary_index_parts, sharding_key_as_index_obj) end -function sharding_key_module.construct_as_index_obj_cache(metadata_map, specified_space_name) - dev_checks('table', 'string') +function sharding_key_module.construct_as_index_obj_cache(vshard_router, metadata_map, specified_space_name) + dev_checks('table', 'table', 'string') local result_err - local vshard_router = vshard.router.static local cache = router_cache.get_instance(vshard_router.name) cache[router_cache.SHARDING_KEY_MAP_NAME] = {} local key_cache = cache[router_cache.SHARDING_KEY_MAP_NAME] diff --git a/crud/common/sharding/sharding_metadata.lua b/crud/common/sharding/sharding_metadata.lua index 0aa1ce0a..5812fef8 100644 --- a/crud/common/sharding/sharding_metadata.lua +++ b/crud/common/sharding/sharding_metadata.lua @@ -1,7 +1,6 @@ local fiber = require('fiber') local errors = require('errors') local log = require('log') -local vshard = require('vshard') local call = require('crud.common.call') local const = require('crud.common.const') @@ -23,7 +22,7 @@ local sharding_metadata_module = {} local function locked(f) dev_checks('function') - return function(timeout, vshard_router, ...) + return function(vshard_router, space_name, metadata_map_name, timeout) local timeout_deadline = fiber.clock() + timeout local cache = router_cache.get_instance(vshard_router.name) @@ -38,7 +37,7 @@ local function locked(f) "Timeout for fetching sharding metadata is exceeded") end local timeout = timeout_deadline - fiber.clock() - local status, err = pcall(f, timeout, vshard_router, ...) + local status, err = pcall(f, vshard_router, space_name, metadata_map_name, timeout) cache.fetch_lock:get() if not status or err ~= nil then return err @@ -99,8 +98,8 @@ end -- cache.fetch_lock become unlocked during timeout passed to -- _fetch_on_router(). -- metadata_map_name == nil means forced reload. -local _fetch_on_router = locked(function(timeout, vshard_router, space_name, metadata_map_name) - dev_checks('number', 'table', 'string', '?string') +local _fetch_on_router = locked(function(vshard_router, space_name, metadata_map_name, timeout) + dev_checks('table', 'string', '?string', 'number') local cache = router_cache.get_instance(vshard_router.name) @@ -108,7 +107,7 @@ local _fetch_on_router = locked(function(timeout, vshard_router, space_name, met return end - local metadata_map, err = call.any(FETCH_FUNC_NAME, {}, { + local metadata_map, err = call.any(vshard_router, FETCH_FUNC_NAME, {}, { timeout = timeout }) if err ~= nil then @@ -124,12 +123,12 @@ local _fetch_on_router = locked(function(timeout, vshard_router, space_name, met return end - local err = sharding_key.construct_as_index_obj_cache(metadata_map, space_name) + local err = sharding_key.construct_as_index_obj_cache(vshard_router, metadata_map, space_name) if err ~= nil then return err end - local err = sharding_func.construct_as_callable_obj_cache(metadata_map, space_name) + local err = sharding_func.construct_as_callable_obj_cache(vshard_router, metadata_map, space_name) if err ~= nil then return err end @@ -146,7 +145,7 @@ local function fetch_on_router(vshard_router, space_name, metadata_map_name, tim end local timeout = timeout or const.FETCH_SHARDING_METADATA_TIMEOUT - local err = _fetch_on_router(timeout, vshard_router, space_name, metadata_map_name) + local err = _fetch_on_router(vshard_router, space_name, metadata_map_name, timeout) if err ~= nil then return nil, err end @@ -192,25 +191,22 @@ function sharding_metadata_module.fetch_sharding_func_on_router(vshard_router, s return fetch_on_router(vshard_router, space_name, router_cache.SHARDING_FUNC_MAP_NAME, timeout) end -function sharding_metadata_module.update_sharding_key_cache(space_name) - local vshard_router = vshard.router.static +function sharding_metadata_module.update_sharding_key_cache(vshard_router, space_name) router_cache.drop_instance(vshard_router.name) return sharding_metadata_module.fetch_sharding_key_on_router(vshard_router, space_name) end -function sharding_metadata_module.update_sharding_func_cache(space_name) - local vshard_router = vshard.router.static +function sharding_metadata_module.update_sharding_func_cache(vshard_router, space_name) router_cache.drop_instance(vshard_router.name) return sharding_metadata_module.fetch_sharding_func_on_router(vshard_router, space_name) end -function sharding_metadata_module.reload_sharding_cache(space_name) - local vshard_router = vshard.router.static +function sharding_metadata_module.reload_sharding_cache(vshard_router, space_name) router_cache.drop_instance(vshard_router.name) - local err = _fetch_on_router(const.FETCH_SHARDING_METADATA_TIMEOUT, vshard_router, space_name, nil) + local err = _fetch_on_router(vshard_router, space_name, nil, const.FETCH_SHARDING_METADATA_TIMEOUT) if err ~= nil then log.warn('Failed to reload sharding cache: %s', err) end diff --git a/crud/common/sharding_func.lua b/crud/common/sharding_func.lua index 501ca709..19aa81e6 100644 --- a/crud/common/sharding_func.lua +++ b/crud/common/sharding_func.lua @@ -1,4 +1,5 @@ local log = require('log') +local vshard = require('vshard') local sharding_metadata_module = require('crud.common.sharding.sharding_metadata') @@ -10,10 +11,15 @@ local sharding_func_cache = {} -- projects like `require('crud.common.sharding_key').update_cache()` -- This method provides similar behavior for -- sharding function cache. -function sharding_func_cache.update_cache(space_name) +function sharding_func_cache.update_cache(space_name, vshard_router) log.warn("require('crud.common.sharding_func').update_cache()" .. "is deprecated and will be removed in future releases") - return sharding_metadata_module.update_sharding_func_cache(space_name) + + if vshard_router == nil then + vshard_router = vshard.router.static + end + + return sharding_metadata_module.update_sharding_func_cache(vshard_router, space_name) end return sharding_func_cache diff --git a/crud/common/sharding_key.lua b/crud/common/sharding_key.lua index 30d75e7b..f5fc3c90 100644 --- a/crud/common/sharding_key.lua +++ b/crud/common/sharding_key.lua @@ -1,4 +1,5 @@ local log = require('log') +local vshard = require('vshard') local sharding_metadata_module = require('crud.common.sharding.sharding_metadata') @@ -8,10 +9,15 @@ local sharding_key_cache = {} -- we already have customers using old API -- for updating sharding key cache in their -- projects like `require('crud.common.sharding_key').update_cache()` -function sharding_key_cache.update_cache(space_name) +function sharding_key_cache.update_cache(space_name, vshard_router) log.warn("require('crud.common.sharding_key').update_cache()" .. "is deprecated and will be removed in future releases") - return sharding_metadata_module.update_sharding_key_cache(space_name) + + if vshard_router == nil then + vshard_router = vshard.router.static + end + + return sharding_metadata_module.update_sharding_key_cache(vshard_router, space_name) end return sharding_key_cache diff --git a/crud/common/utils.lua b/crud/common/utils.lua index ff07608b..c59dfa90 100644 --- a/crud/common/utils.lua +++ b/crud/common/utils.lua @@ -633,8 +633,7 @@ function utils.cut_rows(rows, metadata, field_names) } end -local function flatten_obj(space_name, obj) - local vshard_router = vshard.router.static +local function flatten_obj(vshard_router, space_name, obj) local space_format, err = utils.get_space_format(space_name, vshard_router:routeall()) if err ~= nil then return nil, FlattenError:new("Failed to get space format: %s", err), const.NEED_SCHEMA_RELOAD @@ -648,8 +647,8 @@ local function flatten_obj(space_name, obj) return tuple end -function utils.flatten_obj_reload(space_name, obj) - return schema.wrap_func_reload(flatten_obj, space_name, obj) +function utils.flatten_obj_reload(vshard_router, space_name, obj) + return schema.wrap_func_reload(vshard_router, flatten_obj, space_name, obj) end -- Merge two options map. diff --git a/crud/count.lua b/crud/count.lua index bbab947d..0aba0423 100644 --- a/crud/count.lua +++ b/crud/count.lua @@ -110,8 +110,8 @@ end -- returns result, err, need_reload -- need_reload indicates if reloading schema could help -- see crud.common.schema.wrap_func_reload() -local function call_count_on_router(space_name, user_conditions, opts) - checks('string', '?table', { +local function call_count_on_router(vshard_router, space_name, user_conditions, opts) + checks('table', 'string', '?table', { timeout = '?number', bucket_id = '?number|cdata', force_map_call = '?boolean', @@ -134,10 +134,9 @@ local function call_count_on_router(space_name, user_conditions, opts) return nil, CountError:new("Failed to parse conditions: %s", err) end - local vshard_router = vshard.router.static local replicasets, err = vshard_router:routeall() if err ~= nil then - return nil, CountError:new("Failed to get all replicasets: %s", err) + return nil, CountError:new("Failed to get router replicasets: %s", err) end local space = utils.get_space(space_name, replicasets) @@ -204,7 +203,8 @@ local function call_count_on_router(space_name, user_conditions, opts) local perform_map_reduce = opts.force_map_call == true or (opts.bucket_id == nil and plan.sharding_key == nil) if not perform_map_reduce then - local bucket_id_data, err = sharding.key_get_bucket_id(space_name, plan.sharding_key, opts.bucket_id) + local bucket_id_data, err = sharding.key_get_bucket_id(vshard_router, space_name, + plan.sharding_key, opts.bucket_id) if err ~= nil then return nil, err end @@ -214,7 +214,7 @@ local function call_count_on_router(space_name, user_conditions, opts) sharding_func_hash = bucket_id_data.sharding_func_hash local err - replicasets_to_count, err = sharding.get_replicasets_by_bucket_id(bucket_id_data.bucket_id) + replicasets_to_count, err = sharding.get_replicasets_by_bucket_id(vshard_router, bucket_id_data.bucket_id) if err ~= nil then return nil, err, const.NEED_SCHEMA_RELOAD end @@ -242,7 +242,7 @@ local function call_count_on_router(space_name, user_conditions, opts) skip_sharding_hash_check = skip_sharding_hash_check, } - local results, err = call.map(COUNT_FUNC_NAME, { + local results, err = call.map(vshard_router, COUNT_FUNC_NAME, { space_name, plan.index_id, plan.conditions, count_opts }, call_opts) @@ -324,8 +324,10 @@ function count.call(space_name, user_conditions, opts) mode = '?string', }) - return schema.wrap_func_reload(sharding.wrap_method, - call_count_on_router, space_name, user_conditions, opts) + local vshard_router = vshard.router.static + + return schema.wrap_func_reload(vshard_router, sharding.wrap_method, call_count_on_router, + space_name, user_conditions, opts) end return count diff --git a/crud/delete.lua b/crud/delete.lua index e2d317fc..3bca594e 100644 --- a/crud/delete.lua +++ b/crud/delete.lua @@ -55,8 +55,8 @@ end -- returns result, err, need_reload -- need_reload indicates if reloading schema could help -- see crud.common.schema.wrap_func_reload() -local function call_delete_on_router(space_name, key, opts) - dev_checks('string', '?', { +local function call_delete_on_router(vshard_router, space_name, key, opts) + dev_checks('table', 'string', '?', { timeout = '?number', bucket_id = '?number|cdata', fields = '?table', @@ -64,7 +64,6 @@ local function call_delete_on_router(space_name, key, opts) opts = opts or {} - local vshard_router = vshard.router.static local space = utils.get_space(space_name, vshard_router:routeall()) if space == nil then return nil, DeleteError:new("Space %q doesn't exist", space_name), const.NEED_SCHEMA_RELOAD @@ -86,7 +85,8 @@ local function call_delete_on_router(space_name, key, opts) return nil, err end - sharding_key, err = sharding_key_module.extract_from_pk(space_name, + sharding_key, err = sharding_key_module.extract_from_pk(vshard_router, + space_name, sharding_key_data.value, primary_index_parts, key) if err ~= nil then @@ -98,7 +98,7 @@ local function call_delete_on_router(space_name, key, opts) skip_sharding_hash_check = true end - local bucket_id_data, err = sharding.key_get_bucket_id(space_name, sharding_key, opts.bucket_id) + local bucket_id_data, err = sharding.key_get_bucket_id(vshard_router, space_name, sharding_key, opts.bucket_id) if err ~= nil then return nil, err end @@ -114,7 +114,7 @@ local function call_delete_on_router(space_name, key, opts) timeout = opts.timeout, } - local storage_result, err = call.single( + local storage_result, err = call.single(vshard_router, bucket_id_data.bucket_id, DELETE_FUNC_NAME, {space_name, key, opts.fields, delete_on_storage_opts}, call_opts @@ -167,8 +167,10 @@ function delete.call(space_name, key, opts) fields = '?table', }) - return schema.wrap_func_reload(sharding.wrap_method, - call_delete_on_router, space_name, key, opts) + local vshard_router = vshard.router.static + + return schema.wrap_func_reload(vshard_router, sharding.wrap_method, call_delete_on_router, + space_name, key, opts) end return delete diff --git a/crud/get.lua b/crud/get.lua index 0bffce48..c88bec40 100644 --- a/crud/get.lua +++ b/crud/get.lua @@ -55,8 +55,8 @@ end -- returns result, err, need_reload -- need_reload indicates if reloading schema could help -- see crud.common.schema.wrap_func_reload() -local function call_get_on_router(space_name, key, opts) - dev_checks('string', '?', { +local function call_get_on_router(vshard_router, space_name, key, opts) + dev_checks('table', 'string', '?', { timeout = '?number', bucket_id = '?number|cdata', fields = '?table', @@ -67,7 +67,6 @@ local function call_get_on_router(space_name, key, opts) opts = opts or {} - local vshard_router = vshard.router.static local space = utils.get_space(space_name, vshard_router:routeall()) if space == nil then return nil, GetError:new("Space %q doesn't exist", space_name), const.NEED_SCHEMA_RELOAD @@ -89,7 +88,8 @@ local function call_get_on_router(space_name, key, opts) return nil, err end - sharding_key, err = sharding_key_module.extract_from_pk(space_name, + sharding_key, err = sharding_key_module.extract_from_pk(vshard_router, + space_name, sharding_key_data.value, primary_index_parts, key) if err ~= nil then @@ -101,7 +101,7 @@ local function call_get_on_router(space_name, key, opts) skip_sharding_hash_check = true end - local bucket_id_data, err = sharding.key_get_bucket_id(space_name, sharding_key, opts.bucket_id) + local bucket_id_data, err = sharding.key_get_bucket_id(vshard_router, space_name, sharding_key, opts.bucket_id) if err ~= nil then return nil, err end @@ -119,7 +119,7 @@ local function call_get_on_router(space_name, key, opts) timeout = opts.timeout, } - local storage_result, err = call.single( + local storage_result, err = call.single(vshard_router, bucket_id_data.bucket_id, GET_FUNC_NAME, {space_name, key, opts.fields, get_on_storage_opts}, call_opts @@ -186,8 +186,10 @@ function get.call(space_name, key, opts) mode = '?string', }) - return schema.wrap_func_reload(sharding.wrap_method, - call_get_on_router, space_name, key, opts) + local vshard_router = vshard.router.static + + return schema.wrap_func_reload(vshard_router, sharding.wrap_method, call_get_on_router, + space_name, key, opts) end return get diff --git a/crud/insert.lua b/crud/insert.lua index a996a7c7..5412c85f 100644 --- a/crud/insert.lua +++ b/crud/insert.lua @@ -56,8 +56,8 @@ end -- returns result, err, need_reload -- need_reload indicates if reloading schema could help -- see crud.common.schema.wrap_func_reload() -local function call_insert_on_router(space_name, original_tuple, opts) - dev_checks('string', 'table', { +local function call_insert_on_router(vshard_router, space_name, original_tuple, opts) + dev_checks('table', 'string', 'table', { timeout = '?number', bucket_id = '?number|cdata', add_space_schema_hash = '?boolean', @@ -66,7 +66,6 @@ local function call_insert_on_router(space_name, original_tuple, opts) opts = opts or {} - local vshard_router = vshard.router.static local space = utils.get_space(space_name, vshard_router:routeall()) if space == nil then return nil, InsertError:new("Space %q doesn't exist", space_name), const.NEED_SCHEMA_RELOAD @@ -74,7 +73,7 @@ local function call_insert_on_router(space_name, original_tuple, opts) local tuple = table.deepcopy(original_tuple) - local sharding_data, err = sharding.tuple_set_and_return_bucket_id(tuple, space, opts.bucket_id) + local sharding_data, err = sharding.tuple_set_and_return_bucket_id(vshard_router, tuple, space, opts.bucket_id) if err ~= nil then return nil, InsertError:new("Failed to get bucket ID: %s", err), const.NEED_SCHEMA_RELOAD end @@ -92,7 +91,7 @@ local function call_insert_on_router(space_name, original_tuple, opts) timeout = opts.timeout, } - local storage_result, err = call.single( + local storage_result, err = call.single(vshard_router, sharding_data.bucket_id, INSERT_FUNC_NAME, {space_name, tuple, insert_on_storage_opts}, call_opts @@ -152,8 +151,10 @@ function insert.tuple(space_name, tuple, opts) fields = '?table', }) - return schema.wrap_func_reload(sharding.wrap_method, - call_insert_on_router, space_name, tuple, opts) + local vshard_router = vshard.router.static + + return schema.wrap_func_reload(vshard_router, sharding.wrap_method, call_insert_on_router, + space_name, tuple, opts) end --- Inserts an object to the specified space @@ -174,17 +175,25 @@ end -- @treturn[2] table Error description -- function insert.object(space_name, obj, opts) - checks('string', 'table', '?table') + checks('string', 'table', { + timeout = '?number', + bucket_id = '?number|cdata', + add_space_schema_hash = '?boolean', + fields = '?table', + }) + + local vshard_router = vshard.router.static -- insert can fail if router uses outdated schema to flatten object opts = utils.merge_options(opts, {add_space_schema_hash = true}) - local tuple, err = utils.flatten_obj_reload(space_name, obj) + local tuple, err = utils.flatten_obj_reload(vshard_router, space_name, obj) if err ~= nil then return nil, InsertError:new("Failed to flatten object: %s", err) end - return insert.tuple(space_name, tuple, opts) + return schema.wrap_func_reload(vshard_router, sharding.wrap_method, call_insert_on_router, + space_name, tuple, opts) end return insert diff --git a/crud/insert_many.lua b/crud/insert_many.lua index cbb4e189..bdb07661 100644 --- a/crud/insert_many.lua +++ b/crud/insert_many.lua @@ -122,8 +122,8 @@ end -- returns result, err, need_reload -- need_reload indicates if reloading schema could help -- see crud.common.schema.wrap_func_reload() -local function call_insert_many_on_router(space_name, original_tuples, opts) - dev_checks('string', 'table', { +local function call_insert_many_on_router(vshard_router, space_name, original_tuples, opts) + dev_checks('table', 'string', 'table', { timeout = '?number', fields = '?table', add_space_schema_hash = '?boolean', @@ -133,7 +133,6 @@ local function call_insert_many_on_router(space_name, original_tuples, opts) opts = opts or {} - local vshard_router = vshard.router.static local space = utils.get_space(space_name, vshard_router:routeall()) if space == nil then return nil, {InsertManyError:new("Space %q doesn't exist", space_name)}, const.NEED_SCHEMA_RELOAD @@ -152,14 +151,15 @@ local function call_insert_many_on_router(space_name, original_tuples, opts) tuples = tuples, space = space, execute_on_storage_opts = batch_insert_on_storage_opts, + vshard_router = vshard_router, }) if err ~= nil then return nil, {err}, const.NEED_SCHEMA_RELOAD end - local postprocessor = BatchPostprocessor:new() + local postprocessor = BatchPostprocessor:new(vshard_router) - local rows, errs = call.map(INSERT_MANY_FUNC_NAME, nil, { + local rows, errs = call.map(vshard_router, INSERT_MANY_FUNC_NAME, nil, { timeout = opts.timeout, mode = 'write', iter = iter, @@ -217,8 +217,10 @@ function insert_many.tuples(space_name, tuples, opts) rollback_on_error = '?boolean', }) - return schema.wrap_func_reload(sharding.wrap_method, - call_insert_many_on_router, space_name, tuples, opts) + local vshard_router = vshard.router.static + + return schema.wrap_func_reload(vshard_router, sharding.wrap_method, call_insert_many_on_router, + space_name, tuples, opts) end --- Inserts batch of objects to the specified space @@ -246,6 +248,8 @@ function insert_many.objects(space_name, objs, opts) rollback_on_error = '?boolean', }) + local vshard_router = vshard.router.static + -- insert can fail if router uses outdated schema to flatten object opts = utils.merge_options(opts, {add_space_schema_hash = true}) @@ -254,7 +258,7 @@ function insert_many.objects(space_name, objs, opts) for _, obj in ipairs(objs) do - local tuple, err = utils.flatten_obj_reload(space_name, obj) + local tuple, err = utils.flatten_obj_reload(vshard_router, space_name, obj) if err ~= nil then local err_obj = InsertManyError:new("Failed to flatten object: %s", err) err_obj.operation_data = obj @@ -273,7 +277,8 @@ function insert_many.objects(space_name, objs, opts) return nil, format_errs end - local res, errs = insert_many.tuples(space_name, tuples, opts) + local res, errs = schema.wrap_func_reload(vshard_router, sharding.wrap_method, call_insert_many_on_router, + space_name, tuples, opts) if next(format_errs) ~= nil then if errs == nil then diff --git a/crud/replace.lua b/crud/replace.lua index 85ac90fb..9234f9b3 100644 --- a/crud/replace.lua +++ b/crud/replace.lua @@ -56,8 +56,8 @@ end -- returns result, err, need_reload -- need_reload indicates if reloading schema could help -- see crud.common.schema.wrap_func_reload() -local function call_replace_on_router(space_name, original_tuple, opts) - dev_checks('string', 'table', { +local function call_replace_on_router(vshard_router, space_name, original_tuple, opts) + dev_checks('table', 'string', 'table', { timeout = '?number', bucket_id = '?number|cdata', add_space_schema_hash = '?boolean', @@ -66,7 +66,6 @@ local function call_replace_on_router(space_name, original_tuple, opts) opts = opts or {} - local vshard_router = vshard.router.static local space, err = utils.get_space(space_name, vshard_router:routeall()) if err ~= nil then return nil, ReplaceError:new("Failed to get space %q: %s", space_name, err), const.NEED_SCHEMA_RELOAD @@ -78,7 +77,7 @@ local function call_replace_on_router(space_name, original_tuple, opts) local tuple = table.deepcopy(original_tuple) - local sharding_data, err = sharding.tuple_set_and_return_bucket_id(tuple, space, opts.bucket_id) + local sharding_data, err = sharding.tuple_set_and_return_bucket_id(vshard_router, tuple, space, opts.bucket_id) if err ~= nil then return nil, ReplaceError:new("Failed to get bucket ID: %s", err), const.NEED_SCHEMA_RELOAD end @@ -95,7 +94,7 @@ local function call_replace_on_router(space_name, original_tuple, opts) mode = 'write', timeout = opts.timeout, } - local storage_result, err = call.single( + local storage_result, err = call.single(vshard_router, sharding_data.bucket_id, REPLACE_FUNC_NAME, {space_name, tuple, replace_on_storage_opts}, call_opts @@ -155,8 +154,10 @@ function replace.tuple(space_name, tuple, opts) fields = '?table', }) - return schema.wrap_func_reload(sharding.wrap_method, - call_replace_on_router, space_name, tuple, opts) + local vshard_router = vshard.router.static + + return schema.wrap_func_reload(vshard_router, sharding.wrap_method, call_replace_on_router, + space_name, tuple, opts) end --- Insert or replace an object in the specified space @@ -177,17 +178,25 @@ end -- @treturn[2] table Error description -- function replace.object(space_name, obj, opts) - checks('string', 'table', '?table') + checks('string', 'table', { + timeout = '?number', + bucket_id = '?number|cdata', + add_space_schema_hash = '?boolean', + fields = '?table', + }) + + local vshard_router = vshard.router.static -- replace can fail if router uses outdated schema to flatten object opts = utils.merge_options(opts, {add_space_schema_hash = true}) - local tuple, err = utils.flatten_obj_reload(space_name, obj) + local tuple, err = utils.flatten_obj_reload(vshard_router, space_name, obj) if err ~= nil then return nil, ReplaceError:new("Failed to flatten object: %s", err) end - return replace.tuple(space_name, tuple, opts) + return schema.wrap_func_reload(vshard_router, sharding.wrap_method, call_replace_on_router, + space_name, tuple, opts) end return replace diff --git a/crud/replace_many.lua b/crud/replace_many.lua index e318559f..c75cf29e 100644 --- a/crud/replace_many.lua +++ b/crud/replace_many.lua @@ -124,8 +124,8 @@ end -- returns result, err, need_reload -- need_reload indicates if reloading schema could help -- see crud.common.schema.wrap_func_reload() -local function call_replace_many_on_router(space_name, original_tuples, opts) - dev_checks('string', 'table', { +local function call_replace_many_on_router(vshard_router, space_name, original_tuples, opts) + dev_checks('table', 'string', 'table', { timeout = '?number', fields = '?table', add_space_schema_hash = '?boolean', @@ -135,7 +135,6 @@ local function call_replace_many_on_router(space_name, original_tuples, opts) opts = opts or {} - local vshard_router = vshard.router.static local space = utils.get_space(space_name, vshard_router:routeall()) if space == nil then return nil, {ReplaceManyError:new("Space %q doesn't exist", space_name)}, const.NEED_SCHEMA_RELOAD @@ -154,14 +153,15 @@ local function call_replace_many_on_router(space_name, original_tuples, opts) tuples = tuples, space = space, execute_on_storage_opts = replace_many_on_storage_opts, + vshard_router = vshard_router, }) if err ~= nil then return nil, {err}, const.NEED_SCHEMA_RELOAD end - local postprocessor = BatchPostprocessor:new() + local postprocessor = BatchPostprocessor:new(vshard_router) - local rows, errs = call.map(REPLACE_MANY_FUNC_NAME, nil, { + local rows, errs = call.map(vshard_router, REPLACE_MANY_FUNC_NAME, nil, { timeout = opts.timeout, mode = 'write', iter = iter, @@ -219,8 +219,10 @@ function replace_many.tuples(space_name, tuples, opts) rollback_on_error = '?boolean', }) - return schema.wrap_func_reload(sharding.wrap_method, - call_replace_many_on_router, space_name, tuples, opts) + local vshard_router = vshard.router.static + + return schema.wrap_func_reload(vshard_router, sharding.wrap_method, call_replace_many_on_router, + space_name, tuples, opts) end --- Replace batch of objects to the specified space @@ -248,6 +250,8 @@ function replace_many.objects(space_name, objs, opts) rollback_on_error = '?boolean', }) + local vshard_router = vshard.router.static + -- insert can fail if router uses outdated schema to flatten object opts = utils.merge_options(opts, {add_space_schema_hash = true}) @@ -256,7 +260,7 @@ function replace_many.objects(space_name, objs, opts) for _, obj in ipairs(objs) do - local tuple, err = utils.flatten_obj_reload(space_name, obj) + local tuple, err = utils.flatten_obj_reload(vshard_router, space_name, obj) if err ~= nil then local err_obj = ReplaceManyError:new("Failed to flatten object: %s", err) err_obj.operation_data = obj @@ -275,7 +279,8 @@ function replace_many.objects(space_name, objs, opts) return nil, format_errs end - local res, errs = replace_many.tuples(space_name, tuples, opts) + local res, errs = schema.wrap_func_reload(vshard_router, sharding.wrap_method, call_replace_many_on_router, + space_name, tuples, opts) if next(format_errs) ~= nil then if errs == nil then diff --git a/crud/select/compat/select.lua b/crud/select/compat/select.lua index d16e0823..f9d6a189 100644 --- a/crud/select/compat/select.lua +++ b/crud/select/compat/select.lua @@ -20,8 +20,8 @@ local SelectError = errors.new_class('SelectError') local select_module = {} -local function build_select_iterator(space_name, user_conditions, opts) - dev_checks('string', '?table', { +local function build_select_iterator(vshard_router, space_name, user_conditions, opts) + dev_checks('table', 'string', '?table', { after = '?table|cdata', first = '?number', batch_size = '?number', @@ -43,10 +43,9 @@ local function build_select_iterator(space_name, user_conditions, opts) return nil, SelectError:new("Failed to parse conditions: %s", err) end - local vshard_router = vshard.router.static local replicasets, err = vshard_router:routeall() if err ~= nil then - return nil, SelectError:new("Failed to get all replicasets: %s", err) + return nil, SelectError:new("Failed to get router replicasets: %s", err) end local space = utils.get_space(space_name, replicasets) @@ -117,7 +116,8 @@ local function build_select_iterator(space_name, user_conditions, opts) local perform_map_reduce = opts.force_map_call == true or (opts.bucket_id == nil and plan.sharding_key == nil) if not perform_map_reduce then - local bucket_id_data, err = sharding.key_get_bucket_id(space_name, plan.sharding_key, opts.bucket_id) + local bucket_id_data, err = sharding.key_get_bucket_id(vshard_router, space_name, + plan.sharding_key, opts.bucket_id) if err ~= nil then return nil, err end @@ -125,7 +125,7 @@ local function build_select_iterator(space_name, user_conditions, opts) assert(bucket_id_data.bucket_id ~= nil) local err - replicasets_to_select, err = sharding.get_replicasets_by_bucket_id(bucket_id_data.bucket_id) + replicasets_to_select, err = sharding.get_replicasets_by_bucket_id(vshard_router, bucket_id_data.bucket_id) if err ~= nil then return nil, err, const.NEED_SCHEMA_RELOAD end @@ -165,7 +165,7 @@ local function build_select_iterator(space_name, user_conditions, opts) skip_sharding_hash_check = skip_sharding_hash_check, } - local merger = Merger.new(replicasets_to_select, space, plan.index_id, + local merger = Merger.new(vshard_router, replicasets_to_select, space, plan.index_id, common.SELECT_FUNC_NAME, {space_name, plan.index_id, plan.conditions, select_opts}, {tarantool_iter = plan.tarantool_iter, field_names = plan.field_names, call_opts = opts.call_opts} @@ -208,6 +208,8 @@ function select_module.pairs(space_name, user_conditions, opts) error(string.format("Negative first isn't allowed for pairs")) end + local vshard_router = vshard.router.static + local iterator_opts = { after = opts.after, first = opts.first, @@ -224,7 +226,7 @@ function select_module.pairs(space_name, user_conditions, opts) } local iter, err = schema.wrap_func_reload( - build_select_iterator, space_name, user_conditions, iterator_opts + vshard_router, build_select_iterator, space_name, user_conditions, iterator_opts ) if err ~= nil then @@ -250,8 +252,8 @@ function select_module.pairs(space_name, user_conditions, opts) return gen, param, state end -local function select_module_call_xc(space_name, user_conditions, opts) - checks('string', '?table', { +local function select_module_call_xc(vshard_router, space_name, user_conditions, opts) + checks('table', 'string', '?table', { after = '?table|cdata', first = '?number', batch_size = '?number', @@ -290,7 +292,7 @@ local function select_module_call_xc(space_name, user_conditions, opts) } local iter, err = schema.wrap_func_reload( - build_select_iterator, space_name, user_conditions, iterator_opts + vshard_router, build_select_iterator, space_name, user_conditions, iterator_opts ) if err ~= nil then @@ -322,8 +324,10 @@ local function select_module_call_xc(space_name, user_conditions, opts) end function select_module.call(space_name, user_conditions, opts) - return SelectError:pcall(sharding.wrap_select_method, - select_module_call_xc, space_name, user_conditions, opts) + local vshard_router = vshard.router.static + + return SelectError:pcall(sharding.wrap_select_method, vshard_router, select_module_call_xc, + space_name, user_conditions, opts) end return select_module diff --git a/crud/select/compat/select_old.lua b/crud/select/compat/select_old.lua index 2aea31b9..640bff60 100644 --- a/crud/select/compat/select_old.lua +++ b/crud/select/compat/select_old.lua @@ -30,6 +30,7 @@ local function select_iteration(space_name, plan, opts) limit = 'number', call_opts = 'table', sharding_hash = 'table', + vshard_router = 'table', }) local call_opts = opts.call_opts @@ -51,7 +52,7 @@ local function select_iteration(space_name, plan, opts) space_name, plan.index_id, plan.conditions, storage_select_opts, } - local results, err = call.map(common.SELECT_FUNC_NAME, storage_select_args, { + local results, err = call.map(opts.vshard_router, common.SELECT_FUNC_NAME, storage_select_args, { replicasets = opts.replicasets, timeout = call_opts.timeout, mode = call_opts.mode or 'read', @@ -82,8 +83,8 @@ end -- returns result, err, need_reload -- need_reload indicates if reloading schema could help -- see crud.common.schema.wrap_func_reload() -local function build_select_iterator(space_name, user_conditions, opts) - dev_checks('string', '?table', { +local function build_select_iterator(vshard_router, space_name, user_conditions, opts) + dev_checks('table', 'string', '?table', { after = '?table', first = '?number', batch_size = '?number', @@ -107,10 +108,9 @@ local function build_select_iterator(space_name, user_conditions, opts) return nil, SelectError:new("Failed to parse conditions: %s", err) end - local vshard_router = vshard.router.static local replicasets, err = vshard_router:routeall() if err ~= nil then - return nil, SelectError:new("Failed to get all replicasets: %s", err) + return nil, SelectError:new("Failed to get router replicasets: %s", err) end local space = utils.get_space(space_name, replicasets) @@ -156,7 +156,8 @@ local function build_select_iterator(space_name, user_conditions, opts) local perform_map_reduce = opts.force_map_call == true or (opts.bucket_id == nil and plan.sharding_key == nil) if not perform_map_reduce then - local bucket_id_data, err = sharding.key_get_bucket_id(space_name, plan.sharding_key, opts.bucket_id) + local bucket_id_data, err = sharding.key_get_bucket_id(vshard_router, space_name, + plan.sharding_key, opts.bucket_id) if err ~= nil then return nil, err end @@ -164,7 +165,7 @@ local function build_select_iterator(space_name, user_conditions, opts) assert(bucket_id_data.bucket_id ~= nil) local err - replicasets_to_select, err = sharding.get_replicasets_by_bucket_id(bucket_id_data.bucket_id) + replicasets_to_select, err = sharding.get_replicasets_by_bucket_id(vshard_router, bucket_id_data.bucket_id) if err ~= nil then return nil, err, const.NEED_SCHEMA_RELOAD end @@ -212,6 +213,7 @@ local function build_select_iterator(space_name, user_conditions, opts) call_opts = opts.call_opts, sharding_hash = sharding_hash, + vshard_router = vshard_router, }) return iter @@ -239,6 +241,8 @@ function select_module.pairs(space_name, user_conditions, opts) error(string.format("Negative first isn't allowed for pairs")) end + local vshard_router = vshard.router.static + local iterator_opts = { after = opts.after, first = opts.first, @@ -255,7 +259,7 @@ function select_module.pairs(space_name, user_conditions, opts) } local iter, err = schema.wrap_func_reload( - build_select_iterator, space_name, user_conditions, iterator_opts + vshard_router, build_select_iterator, space_name, user_conditions, iterator_opts ) if err ~= nil then @@ -271,7 +275,7 @@ function select_module.pairs(space_name, user_conditions, opts) local tuple, err = iter:get() if err ~= nil then if sharding.result_needs_sharding_reload(err) then - sharding_metadata_module.reload_sharding_cache(space_name) + sharding_metadata_module.reload_sharding_cache(vshard_router, space_name) end error(string.format("Failed to get next object: %s", err)) @@ -301,8 +305,8 @@ function select_module.pairs(space_name, user_conditions, opts) return gen, param, state end -local function select_module_call_xc(space_name, user_conditions, opts) - dev_checks('string', '?table', '?table') +local function select_module_call_xc(vshard_router, space_name, user_conditions, opts) + dev_checks('table', 'string', '?table', '?table') opts = opts or {} @@ -328,7 +332,7 @@ local function select_module_call_xc(space_name, user_conditions, opts) } local iter, err = schema.wrap_func_reload( - build_select_iterator, space_name, user_conditions, iterator_opts + vshard_router, build_select_iterator, space_name, user_conditions, iterator_opts ) if err ~= nil then return nil, err @@ -380,7 +384,9 @@ function select_module.call(space_name, user_conditions, opts) timeout = '?number', }) - return sharding.wrap_method(select_module_call_xc, space_name, user_conditions, opts) + local vshard_router = vshard.router.static + + return sharding.wrap_method(vshard_router, select_module_call_xc, space_name, user_conditions, opts) end return select_module diff --git a/crud/select/iterator.lua b/crud/select/iterator.lua index 285df01c..1053ea22 100644 --- a/crud/select/iterator.lua +++ b/crud/select/iterator.lua @@ -107,6 +107,7 @@ local function update_replicasets_tuples(iter, after_tuple, replicaset_uuid) field_names = iter.field_names, call_opts = iter.call_opts, sharding_hash = iter.sharding_hash, + vshard_router = iter.vshard_router, }) if err ~= nil then if sharding.result_needs_sharding_reload(err) then diff --git a/crud/select/merger.lua b/crud/select/merger.lua index 6b682e92..4788d364 100644 --- a/crud/select/merger.lua +++ b/crud/select/merger.lua @@ -104,6 +104,7 @@ local function fetch_chunk(context, state) local vshard_call_name = context.vshard_call_name local timeout = context.timeout or call.DEFAULT_VSHARD_CALL_TIMEOUT local space_name = context.space_name + local vshard_router = context.vshard_router local future = state.future -- The source was entirely drained. @@ -125,7 +126,7 @@ local function fetch_chunk(context, state) local wrapped_err = errors.wrap(err) if sharding.result_needs_sharding_reload(err) then - sharding_metadata_module.reload_sharding_cache(space_name) + sharding_metadata_module.reload_sharding_cache(vshard_router, space_name) end error(wrapped_err) @@ -163,7 +164,7 @@ local reverse_tarantool_iters = { [box.index.REQ] = true, } -local function new(replicasets, space, index_id, func_name, func_args, opts) +local function new(vshard_router, replicasets, space, index_id, func_name, func_args, opts) opts = opts or {} local call_opts = opts.call_opts local mode = call_opts.mode or 'read' @@ -188,6 +189,7 @@ local function new(replicasets, space, index_id, func_name, func_args, opts) vshard_call_name = vshard_call_name, timeout = call_opts.timeout, space_name = space.name, + vshard_router = vshard_router, } local state = {future = future} local source = merger_lib.new_buffer_source(fetch_chunk, context, state) diff --git a/crud/stats/init.lua b/crud/stats/init.lua index 49152939..1b013f58 100644 --- a/crud/stats/init.lua +++ b/crud/stats/init.lua @@ -251,6 +251,7 @@ end local function resolve_space_name(space_id) local vshard_router = vshard.router.static + local replicasets = vshard_router:routeall() if next(replicasets) == nil then log.warn('Failed to resolve space name for stats: no replicasets found') diff --git a/crud/truncate.lua b/crud/truncate.lua index 58fb3417..bd91e889 100644 --- a/crud/truncate.lua +++ b/crud/truncate.lua @@ -49,7 +49,7 @@ function truncate.call(space_name, opts) local vshard_router = vshard.router.static local replicasets = vshard_router:routeall() - local _, err = call.map(TRUNCATE_FUNC_NAME, {space_name}, { + local _, err = call.map(vshard_router, TRUNCATE_FUNC_NAME, {space_name}, { mode = 'write', replicasets = replicasets, timeout = opts.timeout, diff --git a/crud/update.lua b/crud/update.lua index 3ed058cc..328a756e 100644 --- a/crud/update.lua +++ b/crud/update.lua @@ -77,8 +77,8 @@ end -- returns result, err, need_reload -- need_reload indicates if reloading schema could help -- see crud.common.schema.wrap_func_reload() -local function call_update_on_router(space_name, key, user_operations, opts) - dev_checks('string', '?', 'table', { +local function call_update_on_router(vshard_router, space_name, key, user_operations, opts) + dev_checks('table', 'string', '?', 'table', { timeout = '?number', bucket_id = '?number|cdata', fields = '?table', @@ -86,7 +86,6 @@ local function call_update_on_router(space_name, key, user_operations, opts) opts = opts or {} - local vshard_router = vshard.router.static local space, err = utils.get_space(space_name, vshard_router:routeall()) if err ~= nil then return nil, UpdateError:new("Failed to get space %q: %s", space_name, err), const.NEED_SCHEMA_RELOAD @@ -114,7 +113,8 @@ local function call_update_on_router(space_name, key, user_operations, opts) return nil, err end - sharding_key, err = sharding_key_module.extract_from_pk(space_name, + sharding_key, err = sharding_key_module.extract_from_pk(vshard_router, + space_name, sharding_key_data.value, primary_index_parts, key) if err ~= nil then @@ -134,7 +134,7 @@ local function call_update_on_router(space_name, key, user_operations, opts) end end - local bucket_id_data, err = sharding.key_get_bucket_id(space_name, sharding_key, opts.bucket_id) + local bucket_id_data, err = sharding.key_get_bucket_id(vshard_router, space_name, sharding_key, opts.bucket_id) if err ~= nil then return nil, err end @@ -150,7 +150,7 @@ local function call_update_on_router(space_name, key, user_operations, opts) timeout = opts.timeout, } - local storage_result, err = call.single( + local storage_result, err = call.single(vshard_router, bucket_id_data.bucket_id, UPDATE_FUNC_NAME, {space_name, key, operations, opts.fields, update_on_storage_opts}, call_opts @@ -207,8 +207,10 @@ function update.call(space_name, key, user_operations, opts) fields = '?table', }) - return schema.wrap_func_reload(sharding.wrap_method, - call_update_on_router, space_name, key, user_operations, opts) + local vshard_router = vshard.router.static + + return schema.wrap_func_reload(vshard_router, sharding.wrap_method, call_update_on_router, + space_name, key, user_operations, opts) end return update diff --git a/crud/upsert.lua b/crud/upsert.lua index e602c84e..55b996b8 100644 --- a/crud/upsert.lua +++ b/crud/upsert.lua @@ -54,8 +54,8 @@ end -- returns result, err, need_reload -- need_reload indicates if reloading schema could help -- see crud.common.schema.wrap_func_reload() -local function call_upsert_on_router(space_name, original_tuple, user_operations, opts) - dev_checks('string', '?', 'table', { +local function call_upsert_on_router(vshard_router, space_name, original_tuple, user_operations, opts) + dev_checks('table', 'string', '?', 'table', { timeout = '?number', bucket_id = '?number|cdata', add_space_schema_hash = '?boolean', @@ -64,7 +64,6 @@ local function call_upsert_on_router(space_name, original_tuple, user_operations opts = opts or {} - local vshard_router = vshard.router.static local space, err = utils.get_space(space_name, vshard_router:routeall()) if err ~= nil then return nil, UpsertError:new("Failed to get space %q: %s", space_name, err), const.NEED_SCHEMA_RELOAD @@ -85,7 +84,7 @@ local function call_upsert_on_router(space_name, original_tuple, user_operations local tuple = table.deepcopy(original_tuple) - local sharding_data, err = sharding.tuple_set_and_return_bucket_id(tuple, space, opts.bucket_id) + local sharding_data, err = sharding.tuple_set_and_return_bucket_id(vshard_router, tuple, space, opts.bucket_id) if err ~= nil then return nil, UpsertError:new("Failed to get bucket ID: %s", err), const.NEED_SCHEMA_RELOAD end @@ -103,7 +102,7 @@ local function call_upsert_on_router(space_name, original_tuple, user_operations timeout = opts.timeout, } - local storage_result, err = call.single( + local storage_result, err = call.single(vshard_router, sharding_data.bucket_id, UPSERT_FUNC_NAME, {space_name, tuple, operations, upsert_on_storage_opts}, call_opts @@ -166,8 +165,10 @@ function upsert.tuple(space_name, tuple, user_operations, opts) fields = '?table', }) - return schema.wrap_func_reload(sharding.wrap_method, - call_upsert_on_router, space_name, tuple, user_operations, opts) + local vshard_router = vshard.router.static + + return schema.wrap_func_reload(vshard_router, sharding.wrap_method, call_upsert_on_router, + space_name, tuple, user_operations, opts) end --- Update or insert an object in the specified space @@ -192,17 +193,24 @@ end -- @treturn[2] table Error description -- function upsert.object(space_name, obj, user_operations, opts) - checks('string', 'table', 'table', '?table') + checks('string', 'table', 'table', { + timeout = '?number', + bucket_id = '?number|cdata', + add_space_schema_hash = '?boolean', + fields = '?table', + }) + local vshard_router = vshard.router.static -- upsert can fail if router uses outdated schema to flatten object opts = utils.merge_options(opts, {add_space_schema_hash = true}) - local tuple, err = utils.flatten_obj_reload(space_name, obj) + local tuple, err = utils.flatten_obj_reload(vshard_router, space_name, obj) if err ~= nil then return nil, UpsertError:new("Failed to flatten object: %s", err) end - return upsert.tuple(space_name, tuple, user_operations, opts) + return schema.wrap_func_reload(vshard_router, sharding.wrap_method, call_upsert_on_router, + space_name, tuple, user_operations, opts) end return upsert diff --git a/crud/upsert_many.lua b/crud/upsert_many.lua index 47aa7a11..d27f77e1 100644 --- a/crud/upsert_many.lua +++ b/crud/upsert_many.lua @@ -120,8 +120,8 @@ end -- returns result, err, need_reload -- need_reload indicates if reloading schema could help -- see crud.common.schema.wrap_func_reload() -local function call_upsert_many_on_router(space_name, original_tuples_operation_data, opts) - dev_checks('string', 'table', { +local function call_upsert_many_on_router(vshard_router, space_name, original_tuples_operation_data, opts) + dev_checks('table', 'string', 'table', { timeout = '?number', fields = '?table', add_space_schema_hash = '?boolean', @@ -131,7 +131,6 @@ local function call_upsert_many_on_router(space_name, original_tuples_operation_ opts = opts or {} - local vshard_router = vshard.router.static local space = utils.get_space(space_name, vshard_router:routeall()) if space == nil then return nil, {UpsertManyError:new("Space %q doesn't exist", space_name)}, const.NEED_SCHEMA_RELOAD @@ -168,14 +167,15 @@ local function call_upsert_many_on_router(space_name, original_tuples_operation_ space = space, operations = operations, execute_on_storage_opts = upsert_many_on_storage_opts, + vshard_router = vshard_router, }) if err ~= nil then return nil, {err}, const.NEED_SCHEMA_RELOAD end - local postprocessor = BatchPostprocessor:new() + local postprocessor = BatchPostprocessor:new(vshard_router) - local _, errs = call.map(UPSERT_MANY_FUNC_NAME, nil, { + local _, errs = call.map(vshard_router, UPSERT_MANY_FUNC_NAME, nil, { timeout = opts.timeout, mode = 'write', iter = iter, @@ -234,8 +234,10 @@ function upsert_many.tuples(space_name, tuples_operation_data, opts) rollback_on_error = '?boolean', }) - return schema.wrap_func_reload(sharding.wrap_method, - call_upsert_many_on_router, space_name, tuples_operation_data, opts) + local vshard_router = vshard.router.static + + return schema.wrap_func_reload(vshard_router, sharding.wrap_method, call_upsert_many_on_router, + space_name, tuples_operation_data, opts) end --- Update or insert batch of objects to the specified space @@ -264,6 +266,8 @@ function upsert_many.objects(space_name, objs_operation_data, opts) rollback_on_error = '?boolean', }) + local vshard_router = vshard.router.static + -- upsert can fail if router uses outdated schema to flatten object opts = utils.merge_options(opts, {add_space_schema_hash = true}) @@ -271,7 +275,7 @@ function upsert_many.objects(space_name, objs_operation_data, opts) local format_errs = {} for _, obj_operation_data in ipairs(objs_operation_data) do - local tuple, err = utils.flatten_obj_reload(space_name, obj_operation_data[1]) + local tuple, err = utils.flatten_obj_reload(vshard_router, space_name, obj_operation_data[1]) if err ~= nil then local err_obj = UpsertManyError:new("Failed to flatten object: %s", err) err_obj.operation_data = obj_operation_data[1] @@ -290,7 +294,8 @@ function upsert_many.objects(space_name, objs_operation_data, opts) return nil, format_errs end - local res, errs = upsert_many.tuples(space_name, tuples_operation_data, opts) + local res, errs = schema.wrap_func_reload(vshard_router, sharding.wrap_method, call_upsert_many_on_router, + space_name, tuples_operation_data, opts) if next(format_errs) ~= nil then if errs == nil then diff --git a/test/unit/call_test.lua b/test/unit/call_test.lua index 43aae654..e6a93880 100644 --- a/test/unit/call_test.lua +++ b/test/unit/call_test.lua @@ -61,8 +61,10 @@ end g.test_map_non_existent_func = function() local results, err = g.cluster.main_server.net_box:eval([[ + local vshard = require('vshard') local call = require('crud.common.call') - return call.map('non_existent_func', nil, {mode = 'write'}) + + return call.map(vshard.router.static, 'non_existent_func', nil, {mode = 'write'}) ]]) t.assert_equals(results, nil) @@ -72,8 +74,10 @@ end g.test_single_non_existent_func = function() local results, err = g.cluster.main_server.net_box:eval([[ + local vshard = require('vshard') local call = require('crud.common.call') - return call.single(1, 'non_existent_func', nil, {mode = 'write'}) + + return call.single(vshard.router.static, 1, 'non_existent_func', nil, {mode = 'write'}) ]]) t.assert_equals(results, nil) @@ -83,8 +87,10 @@ end g.test_map_invalid_mode = function() local results, err = g.cluster.main_server.net_box:eval([[ + local vshard = require('vshard') local call = require('crud.common.call') - return call.map('say_hi_politely', nil, {mode = 'invalid'}) + + return call.map(vshard.router.static, 'say_hi_politely', nil, {mode = 'invalid'}) ]]) t.assert_equals(results, nil) @@ -93,8 +99,10 @@ end g.test_single_invalid_mode = function() local results, err = g.cluster.main_server.net_box:eval([[ + local vshard = require('vshard') local call = require('crud.common.call') - return call.single(1, 'say_hi_politely', nil, {mode = 'invalid'}) + + return call.single(vshard.router.static, 1, 'say_hi_politely', nil, {mode = 'invalid'}) ]]) t.assert_equals(results, nil) @@ -103,8 +111,10 @@ end g.test_map_no_args = function() local results_map, err = g.cluster.main_server.net_box:eval([[ + local vshard = require('vshard') local call = require('crud.common.call') - return call.map('say_hi_politely', nil, {mode = 'write'}) + + return call.map(vshard.router.static, 'say_hi_politely', nil, {mode = 'write'}) ]]) t.assert_equals(err, nil) @@ -115,8 +125,10 @@ end g.test_args = function() local results_map, err = g.cluster.main_server.net_box:eval([[ + local vshard = require('vshard') local call = require('crud.common.call') - return call.map('say_hi_politely', {'dokshina'}, {mode = 'write'}) + + return call.map(vshard.router.static, 'say_hi_politely', {'dokshina'}, {mode = 'write'}) ]]) t.assert_equals(err, nil) @@ -129,11 +141,12 @@ g.test_timeout = function() local timeout = 0.2 local results, err = g.cluster.main_server.net_box:eval([[ + local vshard = require('vshard') local call = require('crud.common.call') local say_hi_timeout, call_timeout = ... - return call.map('say_hi_sleepily', {say_hi_timeout}, { + return call.map(vshard.router.static, 'say_hi_sleepily', {say_hi_timeout}, { mode = 'write', timeout = call_timeout, }) @@ -147,9 +160,11 @@ end local function check_single_vshard_call(g, exp_vshard_call, opts) g.clear_vshard_calls() local _, err = g.cluster.main_server.net_box:eval([[ + local vshard = require('vshard') local call = require('crud.common.call') + local opts = ... - return call.single(1, 'say_hi_politely', {'dokshina'}, opts) + return call.single(vshard.router.static, 1, 'say_hi_politely', {'dokshina'}, opts) ]], {opts}) t.assert_equals(err, nil) local vshard_calls = g.get_vshard_calls() @@ -159,9 +174,11 @@ end local function check_map_vshard_call(g, exp_vshard_call, opts) g.clear_vshard_calls() local _, err = g.cluster.main_server.net_box:eval([[ + local vshard = require('vshard') local call = require('crud.common.call') + local opts = ... - return call.map('say_hi_politely', {'dokshina'}, opts) + return call.map(vshard.router.static, 'say_hi_politely', {'dokshina'}, opts) ]], {opts}) t.assert_equals(err, nil) local vshard_calls = g.get_vshard_calls() @@ -249,8 +266,10 @@ end g.test_any_vshard_call = function() g.clear_vshard_calls() local results, err = g.cluster.main_server.net_box:eval([[ + local vshard = require('vshard') local call = require('crud.common.call') - return call.any('say_hi_politely', {'dude'}, {}) + + return call.any(vshard.router.static, 'say_hi_politely', {'dude'}, {}) ]]) t.assert_equals(results, 'HI, dude! I am s2-master') @@ -261,11 +280,12 @@ g.test_any_vshard_call_timeout = function() local timeout = 0.2 local results, err = g.cluster.main_server.net_box:eval([[ + local vshard = require('vshard') local call = require('crud.common.call') local say_hi_timeout, call_timeout = ... - return call.any('say_hi_sleepily', {say_hi_timeout}, { + return call.any(vshard.router.static, 'say_hi_sleepily', {say_hi_timeout}, { timeout = call_timeout, }) ]], {timeout + 0.1, timeout})