diff --git a/CHANGELOG.md b/CHANGELOG.md index 66c78f40..7086f6bf 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,11 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/) and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html). +## Unreleased + +### Fixed +* Compatibility with vshard configuration if UUIDs are omitted (#407). + ## [1.4.2] - 25-12-23 ### Added diff --git a/crud.lua b/crud.lua index 5d1e1e1d..5c33b927 100644 --- a/crud.lua +++ b/crud.lua @@ -173,12 +173,13 @@ function crud.init_storage() local user = nil if not box.info.ro then - local replicaset_uuid, replicaset = utils.get_self_vshard_replicaset() + local replicaset_key, replicaset = utils.get_self_vshard_replicaset() if replicaset == nil or replicaset.master == nil then - error(string.format('Failed to find a vshard configuration for ' .. - ' replicaset with replicaset_uuid %s.', - replicaset_uuid)) + error(string.format( + 'Failed to find a vshard configuration ' .. + 'for storage replicaset with key %q.', + replicaset_key)) end user = luri.parse(replicaset.master.uri).login or 'guest' end diff --git a/crud/common/call.lua b/crud/common/call.lua index 8fc75e8c..95ed8eb4 100644 --- a/crud/common/call.lua +++ b/crud/common/call.lua @@ -40,13 +40,13 @@ function call.get_vshard_call_name(mode, prefer_replica, balance) return 'callbre' end -local function wrap_vshard_err(vshard_router, err, func_name, replicaset_uuid, bucket_id) +local function wrap_vshard_err(vshard_router, err, func_name, replicaset_id, bucket_id) -- Do not rewrite ShardingHashMismatchError class. if err.class_name == sharding_utils.ShardingHashMismatchError.name then return errors.wrap(err) end - if replicaset_uuid == nil then + if replicaset_id == nil then local replicaset, _ = vshard_router:route(bucket_id) if replicaset == nil then return CallError:new( @@ -54,14 +54,20 @@ local function wrap_vshard_err(vshard_router, err, func_name, replicaset_uuid, b ) end - replicaset_uuid = replicaset.uuid + replicaset_id = utils.get_replicaset_id(vshard_router, replicaset) + + if replicaset_id == nil then + return CallError:new( + "Function returned an error, but we couldn't figure out the replicaset id: %s", err + ) + end end - err = utils.update_storage_call_error_description(err, func_name, replicaset_uuid) + err = utils.update_storage_call_error_description(err, func_name, replicaset_id) err = errors.wrap(err) return CallError:new(utils.format_replicaset_error( - replicaset_uuid, "Function returned an error: %s", err + replicaset_id, "Function returned an error: %s", err )) end @@ -104,13 +110,13 @@ function call.map(vshard_router, func_name, func_args, opts) local futures_by_replicasets = {} local call_opts = {is_async = true} while iter:has_next() do - local args, replicaset = iter:get() + local args, replicaset, replicaset_id = iter:get() local future = replicaset[vshard_call_name](replicaset, func_name, args, call_opts) - futures_by_replicasets[replicaset.uuid] = future + futures_by_replicasets[replicaset_id] = future end local deadline = fiber_clock() + timeout - for replicaset_uuid, future in pairs(futures_by_replicasets) do + for replicaset_id, future in pairs(futures_by_replicasets) do local wait_timeout = deadline - fiber_clock() if wait_timeout < 0 then wait_timeout = 0 @@ -119,14 +125,14 @@ function call.map(vshard_router, func_name, func_args, opts) local result, err = future:wait_result(wait_timeout) local 
result_info = { - key = replicaset_uuid, + key = replicaset_id, value = result, } local err_info = { err_wrapper = wrap_vshard_err, err = err, - wrapper_args = {func_name, replicaset_uuid}, + wrapper_args = {func_name, replicaset_id}, } local early_exit = postprocessor:collect(result_info, err_info) @@ -179,13 +185,13 @@ function call.any(vshard_router, func_name, func_args, opts) if replicasets == nil then return nil, CallError:new("Failed to get router replicasets: %s", err.err) end - local replicaset = select(2, next(replicasets)) + local replicaset_id, replicaset = next(replicasets) local res, err = replicaset:call(func_name, func_args, { timeout = timeout, }) if err ~= nil then - return nil, wrap_vshard_err(vshard_router, err, func_name, replicaset.uuid) + return nil, wrap_vshard_err(vshard_router, err, func_name, replicaset_id) end if res == box.NULL then diff --git a/crud/common/map_call_cases/base_iter.lua b/crud/common/map_call_cases/base_iter.lua index 0fd0e00d..452e1599 100644 --- a/crud/common/map_call_cases/base_iter.lua +++ b/crud/common/map_call_cases/base_iter.lua @@ -66,11 +66,13 @@ end -- -- @return[1] table func_args -- @return[2] table replicaset +-- @return[3] string replicaset_id function BaseIterator:get() + local replicaset_id = self.next_index local replicaset = self.next_replicaset self.next_index, self.next_replicaset = next(self.replicasets, self.next_index) - return self.func_args, replicaset + return self.func_args, replicaset, replicaset_id end return BaseIterator diff --git a/crud/common/map_call_cases/batch_insert_iter.lua b/crud/common/map_call_cases/batch_insert_iter.lua index 5fcbce14..37867f1b 100644 --- a/crud/common/map_call_cases/batch_insert_iter.lua +++ b/crud/common/map_call_cases/batch_insert_iter.lua @@ -40,7 +40,7 @@ function BatchInsertIterator:new(opts) return nil, SplitTuplesError:new("Failed to split tuples by replicaset: %s", err.err) end - local next_replicaset, next_batch = next(sharding_data.batches) + local next_index, next_batch = next(sharding_data.batches) local execute_on_storage_opts = opts.execute_on_storage_opts execute_on_storage_opts.sharding_func_hash = sharding_data.sharding_func_hash @@ -51,7 +51,7 @@ function BatchInsertIterator:new(opts) space_name = opts.space.name, opts = execute_on_storage_opts, batches_by_replicasets = sharding_data.batches, - next_index = next_replicaset, + next_index = next_index, next_batch = next_batch, } @@ -67,8 +67,10 @@ end -- -- @return[1] table func_args -- @return[2] table replicaset +-- @return[3] string replicaset_id function BatchInsertIterator:get() - local replicaset = self.next_index + local replicaset_id = self.next_index + local replicaset = self.next_batch.replicaset local func_args = { self.space_name, self.next_batch.tuples, @@ -77,7 +79,7 @@ function BatchInsertIterator:get() self.next_index, self.next_batch = next(self.batches_by_replicasets, self.next_index) - return func_args, replicaset + return func_args, replicaset, replicaset_id end return BatchInsertIterator diff --git a/crud/common/map_call_cases/batch_upsert_iter.lua b/crud/common/map_call_cases/batch_upsert_iter.lua index fc393f1a..d25e3ee8 100644 --- a/crud/common/map_call_cases/batch_upsert_iter.lua +++ b/crud/common/map_call_cases/batch_upsert_iter.lua @@ -48,7 +48,7 @@ function BatchUpsertIterator:new(opts) return nil, SplitTuplesError:new("Failed to split tuples by replicaset: %s", err.err) end - local next_replicaset, next_batch = next(sharding_data.batches) + local next_index, next_batch = 
next(sharding_data.batches)
 
     local execute_on_storage_opts = opts.execute_on_storage_opts
     execute_on_storage_opts.sharding_func_hash = sharding_data.sharding_func_hash
@@ -59,7 +59,7 @@
         space_name = opts.space.name,
         opts = execute_on_storage_opts,
         batches_by_replicasets = sharding_data.batches,
-        next_index = next_replicaset,
+        next_index = next_index,
         next_batch = next_batch,
     }
 
@@ -75,8 +75,10 @@ end
 --
 -- @return[1] table func_args
 -- @return[2] table replicaset
+-- @return[3] string replicaset_id
 function BatchUpsertIterator:get()
-    local replicaset = self.next_index
+    local replicaset_id = self.next_index
+    local replicaset = self.next_batch.replicaset
     local func_args = {
         self.space_name,
         self.next_batch.tuples,
@@ -86,7 +88,7 @@
     self.next_index, self.next_batch = next(self.batches_by_replicasets, self.next_index)
 
-    return func_args, replicaset
+    return func_args, replicaset, replicaset_id
 end
 
 return BatchUpsertIterator
diff --git a/crud/common/schema.lua b/crud/common/schema.lua
index 387d37bf..6137cc91 100644
--- a/crud/common/schema.lua
+++ b/crud/common/schema.lua
@@ -8,6 +8,7 @@ local ReloadSchemaError = errors.new_class('ReloadSchemaError', {capture_stack =
 
 local const = require('crud.common.const')
 local dev_checks = require('crud.common.dev_checks')
+local utils = require('crud.common.vshard_utils')
 
 local schema = {}
 
@@ -234,7 +235,8 @@ function schema.wrap_func_result(space, func, args, opts)
             replica_schema_version = box.internal.schema_version()
         end
         result.storage_info = {
-            replica_uuid = box.info().uuid,
+            replica_uuid = box.info().uuid, -- Backward compatibility.
+            replica_id = utils.get_self_vshard_replica_id(), -- Replacement for replica_uuid.
             replica_schema_version = replica_schema_version,
         }
     end
diff --git a/crud/common/sharding/init.lua b/crud/common/sharding/init.lua
index b5842917..72c4dfe7 100644
--- a/crud/common/sharding/init.lua
+++ b/crud/common/sharding/init.lua
@@ -18,8 +18,13 @@ function sharding.get_replicasets_by_bucket_id(vshard_router, bucket_id)
         return nil, GetReplicasetsError:new("Failed to get replicaset for bucket_id %s: %s", bucket_id, err.err)
     end
 
+    local replicaset_id = utils.get_replicaset_id(vshard_router, replicaset)
+    if replicaset_id == nil then
+        return nil, GetReplicasetsError:new("Failed to get replicaset id for bucket_id %s replicaset", bucket_id)
+    end
+
     return {
-        [replicaset.uuid] = replicaset,
+        [replicaset_id] = replicaset,
     }
 end
 
@@ -206,8 +211,8 @@ end
 --   Specified space
 --
 -- @return[1] batches
---   Map where key is a replicaset and value
---   is table of tuples related to this replicaset
+--   Map where key is a replicaset id and value
+--   is a table with the replicaset and the tuples related to it
 function sharding.split_tuples_by_replicaset(vshard_router, tuples, space, opts)
     dev_checks('table', 'table', 'table', {
         operations = '?table',
@@ -247,7 +252,17 @@ function sharding.split_tuples_by_replicaset(vshard_router, tuples, space, opts)
                 sharding_data.bucket_id, err.err)
         end
 
-        local record_by_replicaset = batches[replicaset] or {tuples = {}}
+        local replicaset_id = utils.get_replicaset_id(vshard_router, replicaset)
+        if replicaset_id == nil then
+            return nil, GetReplicasetsError:new(
+                "Failed to get replicaset id for bucket_id %s replicaset",
+                sharding_data.bucket_id)
+        end
+
+        local record_by_replicaset = batches[replicaset_id] or {
+            replicaset = replicaset,
+            tuples = {},
+        }
         table.insert(record_by_replicaset.tuples, tuple)
 
         if opts.operations ~= nil then
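The hunks above change the contract of `sharding.split_tuples_by_replicaset`: batches are now keyed by replicaset id instead of by replicaset object, and each entry bundles the replicaset with its tuples. A rough sketch of the resulting shape (the id and tuples below are made up for illustration, not taken from the patch):

    -- Shape of the returned sharding_data.batches, assuming one replicaset.
    -- The key is a replicaset id: a name with identification_mode = 'name_as_key',
    -- a UUID otherwise.
    local replicaset_obj = {} -- stands in for a vshard replicaset object
    local batches = {
        ['s-1'] = {
            replicaset = replicaset_obj, -- used to perform the storage call
            tuples = {
                {1, 2669, 'Elizabeth'}, -- hypothetical tuples
                {2, 2669, 'David'},
            },
            operations = nil, -- only filled when opts.operations is passed (batch upsert)
        },
    }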
@@ -255,7 +270,7 @@ function sharding.split_tuples_by_replicaset(vshard_router, tuples, space, opts) table.insert(record_by_replicaset.operations, opts.operations[i]) end - batches[replicaset] = record_by_replicaset + batches[replicaset_id] = record_by_replicaset end return { diff --git a/crud/common/utils.lua b/crud/common/utils.lua index 7d624d1f..cc542aee 100644 --- a/crud/common/utils.lua +++ b/crud/common/utils.lua @@ -101,30 +101,31 @@ function utils.table_count(table) return cnt end -function utils.format_replicaset_error(replicaset_uuid, msg, ...) +function utils.format_replicaset_error(replicaset_id, msg, ...) dev_checks("string", "string") return string.format( "Failed for %s: %s", - replicaset_uuid, + replicaset_id, string.format(msg, ...) ) end -local function get_replicaset_by_replica_uuid(replicasets, uuid) - for _, replicaset in pairs(replicasets) do - for _, replica in pairs(replicaset.replicas) do - if replica.uuid == uuid then - return replicaset +local function get_replicaset_by_replica_id(replicasets, id) + for replicaset_id, replicaset in pairs(replicasets) do + for replica_id, _ in pairs(replicaset.replicas) do + if replica_id == id then + return replicaset_id, replicaset end end end - return nil + return nil, nil end -function utils.get_spaces(vshard_router, timeout, replica_uuid) - local replicasets, replicaset +function utils.get_spaces(vshard_router, timeout, replica_id) + local replicasets, replicaset, replicaset_id + timeout = timeout or const.DEFAULT_VSHARD_CALL_TIMEOUT local deadline = fiber.clock() + timeout while ( @@ -135,15 +136,15 @@ function utils.get_spaces(vshard_router, timeout, replica_uuid) -- Try to get master with timeout. fiber.yield() replicasets = vshard_router:routeall() - if replica_uuid ~= nil then + if replica_id ~= nil then -- Get the same replica on which the last DML operation was performed. -- This approach is temporary and is related to [1], [2]. -- [1] https://github.com/tarantool/crud/issues/236 -- [2] https://github.com/tarantool/crud/issues/361 - replicaset = get_replicaset_by_replica_uuid(replicasets, replica_uuid) + replicaset_id, replicaset = get_replicaset_by_replica_id(replicasets, replica_id) break else - replicaset = select(2, next(replicasets)) + replicaset_id, replicaset = next(replicasets) end if replicaset ~= nil and replicaset.master ~= nil and @@ -162,22 +163,22 @@ function utils.get_spaces(vshard_router, timeout, replica_uuid) local error_msg = string.format( 'The master was not found in replicaset %s, ' .. 
'check status of the master and repeat the operation later', - replicaset.uuid) + replicaset_id) return nil, GetSpaceError:new(error_msg) end if replicaset.master.conn.error ~= nil then local error_msg = string.format( 'The connection to the master of replicaset %s is not valid: %s', - replicaset.uuid, replicaset.master.conn.error) + replicaset_id, replicaset.master.conn.error) return nil, GetSpaceError:new(error_msg) end return replicaset.master.conn.space, nil, replicaset.master.conn.schema_version end -function utils.get_space(space_name, vshard_router, timeout, replica_uuid) - local spaces, err, schema_version = utils.get_spaces(vshard_router, timeout, replica_uuid) +function utils.get_space(space_name, vshard_router, timeout, replica_id) + local spaces, err, schema_version = utils.get_spaces(vshard_router, timeout, replica_id) if spaces == nil then return nil, err @@ -208,20 +209,30 @@ function utils.fetch_latest_metadata_when_single_storage(space, space_name, netb -- [1] https://github.com/tarantool/crud/issues/236 -- [2] https://github.com/tarantool/crud/issues/361 local latest_space, err + assert(storage_info.replica_schema_version ~= nil, 'check the replica_schema_version value from storage ' .. 'for correct use of the fetch_latest_metadata opt') - assert(storage_info.replica_uuid ~= nil, - 'check the replica_uuid value from storage ' .. - 'for correct use of the fetch_latest_metadata opt') + + local replica_id + if storage_info.replica_id == nil then -- Backward compatibility. + assert(storage_info.replica_uuid ~= nil, + 'check the replica_uuid value from storage ' .. + 'for correct use of the fetch_latest_metadata opt') + replica_id = storage_info.replica_uuid + else + replica_id = storage_info.replica_id + end + assert(netbox_schema_version ~= nil, 'check the netbox_schema_version value from net_box conn on router ' .. 'for correct use of the fetch_latest_metadata opt') + if storage_info.replica_schema_version ~= netbox_schema_version then local ok, reload_schema_err = schema.reload_schema(vshard_router) if ok then latest_space, err = utils.get_space(space_name, vshard_router, - opts.timeout, storage_info.replica_uuid) + opts.timeout, replica_id) if err ~= nil then local warn_msg = "Failed to fetch space for latest schema actualization, metadata may be outdated: %s" log.warn(warn_msg, err) @@ -1053,7 +1064,7 @@ function utils.check_name_isident(name) return true end -function utils.update_storage_call_error_description(err, func_name, replicaset_uuid) +function utils.update_storage_call_error_description(err, func_name, replicaset_id) if err == nil then return nil end @@ -1067,7 +1078,7 @@ function utils.update_storage_call_error_description(err, func_name, replicaset_ err = NotInitializedError:new("Function %s is not registered: " .. "crud isn't initialized on replicaset %q or crud module versions mismatch " .. "between router and storage", - func_name, replicaset_uuid or "Unknown") + func_name, replicaset_id or "Unknown") else err = NotInitializedError:new("Function %s is not registered", func_name) end @@ -1123,7 +1134,7 @@ end -- @tparam ?string|table opts.vshard_router -- Cartridge vshard group name or vshard router instance. -- --- @return a table of storage states by replica uuid. +-- @return a table of storage states by replica id. 
function utils.storage_info(opts) opts = opts or {} @@ -1138,35 +1149,35 @@ function utils.storage_info(opts) end local futures_by_replicas = {} - local replica_state_by_uuid = {} + local replica_state_by_id = {} local async_opts = {is_async = true} local timeout = opts.timeout or const.DEFAULT_VSHARD_CALL_TIMEOUT for _, replicaset in pairs(replicasets) do - for _, replica in pairs(replicaset.replicas) do - replica_state_by_uuid[replica.uuid] = { + for replica_id, replica in pairs(replicaset.replicas) do + replica_state_by_id[replica_id] = { status = "error", is_master = replicaset.master == replica } local ok, res = pcall(replica.conn.call, replica.conn, CRUD_STORAGE_INFO_FUNC_NAME, {}, async_opts) if ok then - futures_by_replicas[replica.uuid] = res + futures_by_replicas[replica_id] = res else - local err_msg = string.format("Error getting storage info for %s", replica.uuid) + local err_msg = string.format("Error getting storage info for %s", replica_id) if res ~= nil then log.error("%s: %s", err_msg, res) - replica_state_by_uuid[replica.uuid].message = tostring(res) + replica_state_by_id[replica_id].message = tostring(res) else log.error(err_msg) - replica_state_by_uuid[replica.uuid].message = err_msg + replica_state_by_id[replica_id].message = err_msg end end end end local deadline = fiber.clock() + timeout - for replica_uuid, future in pairs(futures_by_replicas) do + for replica_id, future in pairs(futures_by_replicas) do local wait_timeout = deadline - fiber.clock() if wait_timeout < 0 then wait_timeout = 0 @@ -1175,24 +1186,24 @@ function utils.storage_info(opts) local result, err = future:wait_result(wait_timeout) if result == nil then future:discard() - local err_msg = string.format("Error getting storage info for %s", replica_uuid) + local err_msg = string.format("Error getting storage info for %s", replica_id) if err ~= nil then if err.type == 'ClientError' and err.code == box.error.NO_SUCH_PROC then - replica_state_by_uuid[replica_uuid].status = "uninitialized" + replica_state_by_id[replica_id].status = "uninitialized" else log.error("%s: %s", err_msg, err) - replica_state_by_uuid[replica_uuid].message = tostring(err) + replica_state_by_id[replica_id].message = tostring(err) end else log.error(err_msg) - replica_state_by_uuid[replica_uuid].message = err_msg + replica_state_by_id[replica_id].message = err_msg end else - replica_state_by_uuid[replica_uuid].status = result[1].status or "uninitialized" + replica_state_by_id[replica_id].status = result[1].status or "uninitialized" end end - return replica_state_by_uuid + return replica_state_by_id end --- Storage status information. @@ -1314,29 +1325,8 @@ function utils.is_cartridge_hotreload_supported() return true, cartridge_hotreload end -function utils.get_self_vshard_replicaset() - local box_info = box.info() - - local ok, storage_info = pcall(vshard.storage.info) - assert(ok, 'vshard.storage.cfg() must be called first') - - local replicaset_uuid - if box_info.replicaset ~= nil then - replicaset_uuid = box_info.replicaset.uuid - else - replicaset_uuid = box_info.cluster.uuid - end - - local replicaset - -- Identification key may be name since vshard 0.1.25. - -- See also https://github.com/tarantool/vshard/issues/460. 
- for _, v in pairs(storage_info.replicasets) do - if v.uuid == replicaset_uuid then - replicaset = v - end - end - - return replicaset_uuid, replicaset +for k, v in pairs(require('crud.common.vshard_utils')) do + utils[k] = v end return utils diff --git a/crud/common/vshard_utils.lua b/crud/common/vshard_utils.lua new file mode 100644 index 00000000..f19ddca3 --- /dev/null +++ b/crud/common/vshard_utils.lua @@ -0,0 +1,56 @@ +local vshard = require('vshard') + +local vshard_utils = {} + +function vshard_utils.get_self_vshard_replicaset() + local box_info = box.info() + + local ok, storage_info = pcall(vshard.storage.info) + assert(ok, 'vshard.storage.cfg() must be called first') + + if vshard_utils.get_vshard_identification_mode() == 'name_as_key' then + local replicaset_name = box_info.replicaset.name + + return replicaset_name, storage_info.replicasets[replicaset_name] + else + local replicaset_uuid + if box_info.replicaset ~= nil then + replicaset_uuid = box_info.replicaset.uuid + else + replicaset_uuid = box_info.cluster.uuid + end + + return replicaset_uuid, storage_info.replicasets[replicaset_uuid] + end +end + +function vshard_utils.get_self_vshard_replica_id() + local box_info = box.info() + + if vshard_utils.get_vshard_identification_mode() == 'name_as_key' then + return box_info.name + else + return box_info.uuid + end +end + +function vshard_utils.get_replicaset_id(vshard_router, replicaset) + -- https://github.com/tarantool/vshard/issues/460. + local known_replicasets = vshard_router:routeall() + + for known_replicaset_id, known_replicaset in pairs(known_replicasets) do + if known_replicaset == replicaset then + return known_replicaset_id + end + end + + return nil +end + +function vshard_utils.get_vshard_identification_mode() + -- https://github.com/tarantool/vshard/issues/460. + assert(vshard.storage.internal.current_cfg ~= nil, 'available only on vshard storage') + return vshard.storage.internal.current_cfg.identification_mode +end + +return vshard_utils diff --git a/crud/readview.lua b/crud/readview.lua index e5c5ece0..cb7c8c7f 100644 --- a/crud/readview.lua +++ b/crud/readview.lua @@ -51,24 +51,35 @@ local function readview_open_on_storage(readview_name) ReadviewError:assert(false, ("Error creating readview")) end - local replica_info = {} - replica_info.uuid = box.info().uuid - replica_info.id = read_view.id + return { + id = read_view.id, - return replica_info, nil + uuid = box.info().uuid, -- Backward compatibility. + replica_id = utils.get_self_vshard_replica_id(), -- Replacement for uuid. + } end -local function readview_close_on_storage(readview_uuid) +local function readview_close_on_storage(info) dev_checks('table') - local list = box.read_view.list() + local replica_id = utils.get_self_vshard_replica_id() + local readview_id - for _, replica_info in pairs(readview_uuid) do - if replica_info.uuid == box.info().uuid then + for _, replica_info in pairs(info) do + local found = false + + if replica_info.replica_id == replica_id then + found = true + elseif replica_info.uuid == box.info().uuid then -- Backward compatibility. 
+ found = true + end + + if found then readview_id = replica_info.id end end + local list = box.read_view.list() for k,v in pairs(list) do if v.id == readview_id then list[k]:close() @@ -106,7 +117,8 @@ local function select_readview_on_storage(space_name, index_id, conditions, opts replica_schema_version = box.internal.schema_version() end cursor.storage_info = { - replica_uuid = box.info().uuid, + replica_uuid = box.info().uuid, -- Backward compatibility. + replica_id = utils.get_self_vshard_replica_id(), -- Replacement for replica_uuid. replica_schema_version = replica_schema_version, } end @@ -206,7 +218,7 @@ local select_call = stats.wrap(select.call, stats.op.SELECT) function Readview_obj:select(space_name, user_conditions, opts) opts = opts or {} opts.readview = true - opts.readview_uuid = self._uuid + opts.readview_info = self._info if self.opened == false then return nil, ReadviewError:new("Read view is closed") @@ -220,7 +232,7 @@ local pairs_call = stats.wrap(select.pairs, stats.op.SELECT, {pairs = true}) function Readview_obj:pairs(space_name, user_conditions, opts) opts = opts or {} opts.readview = true - opts.readview_uuid = self._uuid + opts.readview_info = self._info if self.opened == false then return nil, ReadviewError:new("Read view is closed") @@ -259,21 +271,39 @@ function Readview_obj:close(opts) end local errors = {} - for _, replicaset in pairs(replicasets) do - for _, replica in pairs(replicaset.replicas) do - for _, value in pairs(self._uuid) do - if replica.uuid == value.uuid then - local replica_result, replica_err = replica.conn:call(CRUD_CLOSE_FUNC_NAME, - {self._uuid}, {timeout = opts.timeout}) - if replica_err ~= nil then - table.insert(errors, ReadviewError:new("Failed to close Readview on storage: %s", replica_err)) - end - if replica_err == nil and (not replica_result) then - table.insert(errors, ReadviewError:new("Readview was not found on storage: %s", replica.uuid)) - end - end + for replicaset_id, replicaset in pairs(replicasets) do + local replicaset_info = self._info[replicaset_id] + + if replicaset_info == nil then + goto next_replicaset + end + + for replica_id, replica in pairs(replicaset.replicas) do + local found = false + + if replicaset_info.replica_id == replica_id then + found = true + elseif replicaset_info.uuid == replica.uuid then -- Backward compatibility. 
+ found = true end + + if not found then + goto next_replica + end + + local replica_result, replica_err = replica.conn:call(CRUD_CLOSE_FUNC_NAME, + {self._info}, {timeout = opts.timeout}) + if replica_err ~= nil then + table.insert(errors, ReadviewError:new("Failed to close Readview on storage: %s", replica_err)) + end + if replica_err == nil and (not replica_result) then + table.insert(errors, ReadviewError:new("Readview was not found on storage: %s", replica_id)) + end + + ::next_replica:: end + + ::next_replicaset:: end if next(errors) ~= nil then @@ -303,28 +333,26 @@ function Readview_obj.create(vshard_router, opts) setmetatable(readview, Readview_obj) readview._name = opts.name - local results, err, err_uuid = vshard_router:map_callrw(CRUD_OPEN_FUNC_NAME, + local results, err, err_id = vshard_router:map_callrw(CRUD_OPEN_FUNC_NAME, {readview._name}, {timeout = opts.timeout}) if err ~= nil then - return nil, - ReadviewError:new("Failed to call readview_open_on_storage on storage-side: storage uuid: %s err: %s", - err_uuid, err) + return nil, ReadviewError:new( + "Failed to call readview_open_on_storage on storage-side: storage id: %s err: %s", + err_id, err + ) end - local uuid = {} - local errors = {} - for _, replicaset_results in pairs(results) do - for _, replica_result in pairs(replicaset_results) do - table.insert(uuid, replica_result) - end + -- map_callrw response format: + -- {replicaset_id1 = {res1}, replicaset_id2 = {res2}, ...} + local info = {} + for replicaset_id, replicaset_results in pairs(results) do + local _, replica_info = next(replicaset_results) + info[replicaset_id] = replica_info end - readview._uuid = uuid + readview._info = info readview.opened = true - if next(errors) ~= nil then - return nil, errors - end return readview, nil end diff --git a/crud/select.lua b/crud/select.lua index ffb850cc..a095b589 100644 --- a/crud/select.lua +++ b/crud/select.lua @@ -53,7 +53,8 @@ local function select_on_storage(space_name, index_id, conditions, opts) replica_schema_version = box.internal.schema_version() end cursor.storage_info = { - replica_uuid = box.info().uuid, + replica_uuid = box.info().uuid, -- Backward compatibility. + replica_id = utils.get_self_vshard_replica_id(), -- Replacement for replica_uuid. 
replica_schema_version = replica_schema_version, } end diff --git a/crud/select/compat/select.lua b/crud/select/compat/select.lua index cc309b70..abd40c19 100644 --- a/crud/select/compat/select.lua +++ b/crud/select/compat/select.lua @@ -31,7 +31,7 @@ local function build_select_iterator(vshard_router, space_name, user_conditions, yield_every = '?number', call_opts = 'table', readview = '?boolean', - readview_uuid = '?table', + readview_info = '?table', }) opts = opts or {} @@ -178,7 +178,7 @@ local function build_select_iterator(vshard_router, space_name, user_conditions, local merger if opts.readview then - merger = Merger.new_readview(vshard_router, replicasets_to_select, opts.readview_uuid, + merger = Merger.new_readview(vshard_router, replicasets_to_select, opts.readview_info, space, plan.index_id, common.READVIEW_SELECT_FUNC_NAME, {space_name, plan.index_id, plan.conditions, select_opts}, {tarantool_iter = plan.tarantool_iter, field_names = plan.field_names, call_opts = opts.call_opts} @@ -215,7 +215,7 @@ function select_module.pairs(space_name, user_conditions, opts) balance = '?boolean', timeout = '?number', readview = '?boolean', - readview_uuid = '?table', + readview_info = '?table', vshard_router = '?string|table', @@ -267,7 +267,7 @@ function select_module.pairs(space_name, user_conditions, opts) fetch_latest_metadata = opts.fetch_latest_metadata, }, readview = opts.readview, - readview_uuid = opts.readview_uuid, + readview_info = opts.readview_info, } local iter, err = schema.wrap_func_reload( @@ -332,7 +332,7 @@ local function select_module_call_xc(vshard_router, space_name, user_conditions, balance = '?boolean', timeout = '?number', readview = '?boolean', - readview_uuid = '?table', + readview_info = '?table', vshard_router = '?string|table', @@ -379,7 +379,7 @@ local function select_module_call_xc(vshard_router, space_name, user_conditions, fetch_latest_metadata = opts.fetch_latest_metadata, }, readview = opts.readview, - readview_uuid = opts.readview_uuid, + readview_info = opts.readview_info, } local iter, err = schema.wrap_func_reload( diff --git a/crud/select/compat/select_old.lua b/crud/select/compat/select_old.lua index 930ba454..009f6dce 100644 --- a/crud/select/compat/select_old.lua +++ b/crud/select/compat/select_old.lua @@ -67,6 +67,7 @@ local function select_iteration(space_name, plan, opts) end local tuples = {} + -- Old select works with vshard without `name_as_key` support. for replicaset_uuid, replicaset_results in pairs(results) do -- Stats extracted with callback here and not passed -- outside to wrapper because fetch for pairs can be diff --git a/crud/select/merger.lua b/crud/select/merger.lua index 46dbcff5..b8e358e1 100644 --- a/crud/select/merger.lua +++ b/crud/select/merger.lua @@ -244,44 +244,62 @@ local function new(vshard_router, replicasets, space, index_id, func_name, func_ end -local function new_readview(vshard_router, replicasets, readview_uuid, space, index_id, func_name, func_args, opts) +local function new_readview(vshard_router, replicasets, readview_info, space, index_id, func_name, func_args, opts) opts = opts or {} local call_opts = opts.call_opts -- Request a first data chunk and create merger sources. local merger_sources = {} - for _, replicaset in pairs(replicasets) do - for replica_uuid, replica in pairs(replicaset.replicas) do - for _, value in pairs(readview_uuid) do - if replica_uuid == value.uuid then - -- Perform a request. 
-                local buf = buffer.ibuf()
-                local net_box_opts = {is_async = true, buffer = buf,
-                                      skip_header = utils.tarantool_supports_netbox_skip_header_option() or nil}
-                func_args[4].readview_id = value.id
-                local future = replica.conn:call(func_name, func_args, net_box_opts)
-
-                -- Create a source.
-                local context = {
-                    net_box_opts = net_box_opts,
-                    buffer = buf,
-                    func_name = func_name,
-                    func_args = func_args,
-                    replicaset = replicaset,
-                    vshard_call_name = nil,
-                    timeout = call_opts.timeout,
-                    fetch_latest_metadata = call_opts.fetch_latest_metadata,
-                    space_name = space.name,
-                    vshard_router = vshard_router,
-                    readview = true,
-                    future_replica = replica
-                }
-                local state = {future = future}
-                local source = merger_lib.new_buffer_source(fetch_chunk, context, state)
-                table.insert(merger_sources, source)
-            end
+    for replicaset_id, replicaset in pairs(replicasets) do
+        local replicaset_info = readview_info[replicaset_id]
+
+        if replicaset_info == nil then
+            goto next_replicaset
+        end
+
+        for replica_id, replica in pairs(replicaset.replicas) do
+            local found = false
+
+            if replicaset_info.replica_id == replica_id then
+                found = true
+            elseif replicaset_info.uuid == replica.uuid then -- Backward compatibility.
+                found = true
             end
+
+            if not found then
+                goto next_replica
+            end
+
+            -- Perform a request.
+            local buf = buffer.ibuf()
+            local net_box_opts = {is_async = true, buffer = buf,
+                                  skip_header = utils.tarantool_supports_netbox_skip_header_option() or nil}
+            func_args[4].readview_id = replicaset_info.id
+            local future = replica.conn:call(func_name, func_args, net_box_opts)
+
+            -- Create a source.
+            local context = {
+                net_box_opts = net_box_opts,
+                buffer = buf,
+                func_name = func_name,
+                func_args = func_args,
+                replicaset = replicaset,
+                vshard_call_name = nil,
+                timeout = call_opts.timeout,
+                fetch_latest_metadata = call_opts.fetch_latest_metadata,
+                space_name = space.name,
+                vshard_router = vshard_router,
+                readview = true,
+                future_replica = replica
+            }
+            local state = {future = future}
+            local source = merger_lib.new_buffer_source(fetch_chunk, context, state)
+            table.insert(merger_sources, source)
+
+            ::next_replica::
         end
+
+        ::next_replicaset::
     end
 
     -- Trick for performance.
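The rewritten loop above matches each stored readview entry to a replica by the new `replica_id` first and falls back to the legacy `uuid` field, so a router can still work with readviews opened by storages running an older crud. The rule, extracted as a standalone sketch (not part of the patch):

    -- replicaset_info comes from readview_open_on_storage and carries both
    -- identifiers during the transition period; replica_id is the key of
    -- replicaset.replicas (a name under 'name_as_key', a UUID otherwise).
    local function replica_matches(replicaset_info, replica_id, replica)
        if replicaset_info.replica_id == replica_id then
            return true
        end
        return replicaset_info.uuid == replica.uuid -- Backward compatibility.
    end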
diff --git a/test/helper.lua b/test/helper.lua
index 3d044f70..1f823c0b 100644
--- a/test/helper.lua
+++ b/test/helper.lua
@@ -832,4 +832,41 @@ function helpers.schema_compatibility(schema)
     return schema
 end
 
+function helpers.string_replace(base, old_fragment, new_fragment)
+    local i, j = base:find(old_fragment, 1, true)
+
+    if i == nil then
+        return base
+    end
+
+    local prefix = ''
+    if i > 1 then
+        prefix = base:sub(1, i - 1)
+    end
+
+    local suffix = ''
+    if j < base:len() then
+        suffix = base:sub(j + 1, base:len())
+    end
+
+    return prefix .. new_fragment .. suffix
+end
+
+function helpers.assert_string_contains_pattern_with_replicaset_id(str, pattern)
+    local uuid_pattern = "%w+%-0000%-0000%-0000%-00000000000%d"
+    local name_pattern = "s%-%d" -- All existing clusters use this pattern, but this assumption is unstable.
+
+    local found = false
+    for _, id_pattern in pairs({uuid_pattern, name_pattern}) do
+        -- pattern is expected to be like "Failed for [replicaset_id]".
+        local full_pattern = helpers.string_replace(pattern, '[replicaset_id]', id_pattern)
+        if str:find(full_pattern) ~= nil then
+            found = true
+            break
+        end
+    end
+
+    t.assert(found, ("string %q does not contain pattern %q"):format(str, pattern))
+end
+
 return helpers
diff --git a/test/integration/select_readview_test.lua b/test/integration/select_readview_test.lua
index f5efb5d8..6a003d9a 100644
--- a/test/integration/select_readview_test.lua
+++ b/test/integration/select_readview_test.lua
@@ -2308,10 +2308,18 @@ pgroup.test_stop_select = function(g)
     g.cluster:server('s2-master'):start()
 
     if g.params.backend == helpers.backend.VSHARD then
-        g.cluster:server('s2-master'):exec(function(cfg)
-            require('vshard.storage').cfg(cfg, box.info.uuid)
+        local bootstrap_key
+        if type(g.params.backend_cfg) == 'table'
+        and g.params.backend_cfg.identification_mode == 'name_as_key' then
+            bootstrap_key = 'name'
+        else
+            bootstrap_key = 'uuid'
+        end
+
+        g.cluster:server('s2-master'):exec(function(cfg, bootstrap_key)
+            require('vshard.storage').cfg(cfg, box.info[bootstrap_key])
             require('crud').init_storage()
-        end, {g.cfg})
+        end, {g.cfg, bootstrap_key})
     end
 
     local _, err = g.cluster.main_server.net_box:eval([[
diff --git a/test/integration/storages_state_test.lua b/test/integration/storages_state_test.lua
index 10ccf41a..9b7f348a 100644
--- a/test/integration/storages_state_test.lua
+++ b/test/integration/storages_state_test.lua
@@ -49,30 +49,63 @@ pgroup.after_all(function(g)
     helpers.stop_cluster(g.cluster, g.params.backend)
 end)
 
+local function build_storage_info(g, array_info)
+    local is_vshard = g.params.backend == 'vshard'
+    local name_as_key = is_vshard and (
+        type(g.params.backend_cfg) == 'table'
+        and g.params.backend_cfg.identification_mode == 'name_as_key'
+    )
+
+    local keys
+    if name_as_key then
+        keys = {
+            's1-master',
+            's1-replica',
+            's2-master',
+            's2-replica',
+        }
+    else
+        keys = {
+            helpers.uuid('b', 1),
+            helpers.uuid('b', 10),
+            helpers.uuid('c', 1),
+            helpers.uuid('c', 10),
+        }
+    end
+
+    local res = {}
+    for i, v in ipairs(array_info) do
+        res[keys[i]] = v
+    end
+
+    return res
+end
+
 pgroup.test_crud_storage_status_of_stopped_servers = function(g)
     g.cluster:server("s2-replica"):stop()
     local results, err = g.cluster.main_server.net_box:call("crud.storage_info", {})
 
     t.assert_equals(err, nil, "Error getting storages info")
-    t.assert_equals(results, {
-        [helpers.uuid('b', 1)] = {
+
+    t.assert_equals(results, build_storage_info(g, {
+        {
             status = "running",
             is_master = true
         },
-        [helpers.uuid('b', 10)] = {
+        {
             status = "running",
             is_master = false
         },
-        [helpers.uuid('c', 1)] = {
+        {
             status = "running",
             is_master = true
         },
-        [helpers.uuid('c', 10)] = {
+        {
             status = "error",
             is_master = false,
             message = "Peer closed"
-        }
-    })
+        },
+    }))
 end
 
 pgroup.after_test('test_crud_storage_status_of_stopped_servers', function(g)
@@ -97,24 +130,24 @@ pgroup.test_disabled_storage_role = function(g)
     local results, err = g.cluster.main_server.net_box:call("crud.storage_info", {})
 
     t.assert_equals(err, nil, "Error getting storages info")
-    t.assert_equals(results, {
-        [helpers.uuid('b', 1)] = {
+    t.assert_equals(results, build_storage_info(g, {
+        {
             status = "running",
             is_master = true
         },
-        [helpers.uuid('b', 10)] = {
+        {
             status = "uninitialized",
             is_master = false
         },
-        [helpers.uuid('c', 1)] = {
+        {
             status = "running",
             is_master = true
         },
-        [helpers.uuid('c', 10)] = {
+        {
             status = "running",
-            is_master = false
-        }
-    })
+            is_master = false,
+        },
+    }))
 end
 
 pgroup.after_test('test_disabled_storage_role', function(g)
@@ -138,26 +171,25 @@ pgroup.test_storage_call_failure = function(g)
     local results, err = g.cluster.main_server.net_box:call("crud.storage_info", {})
 
     t.assert_equals(err, nil, "Error getting storages info")
-
-    t.assert_equals(results, {
-        [helpers.uuid('b', 1)] = {
+    t.assert_equals(results, build_storage_info(g, {
+        {
             status = "running",
             is_master = true
         },
-        [helpers.uuid('b', 10)] = {
+        {
             status = "running",
             is_master = false
         },
-        [helpers.uuid('c', 1)] = {
+        {
             status = "running",
             is_master = true
         },
-        [helpers.uuid('c', 10)] = {
+        {
             status = "error",
             is_master = false,
             message = "attempt to call a table value"
-        }
-    })
+        },
+    }))
 end
 
 pgroup.after_test('test_storage_call_failure', function(g)
diff --git a/test/unit/call_test.lua b/test/unit/call_test.lua
index 9405e200..319d53b5 100644
--- a/test/unit/call_test.lua
+++ b/test/unit/call_test.lua
@@ -93,7 +93,7 @@ pgroup.test_map_non_existent_func = function(g)
     ]])
 
     t.assert_equals(results, nil)
-    t.assert_str_contains(err.err, "Failed for %w+%-0000%-0000%-0000%-00000000000%d", true)
+    helpers.assert_string_contains_pattern_with_replicaset_id(err.err, "Failed for [replicaset_id]")
     t.assert_str_contains(err.err, "Function non_existent_func is not registered")
 end
 
@@ -106,7 +106,7 @@ pgroup.test_single_non_existent_func = function(g)
     ]])
 
     t.assert_equals(results, nil)
-    t.assert_str_contains(err.err, "Failed for %w+%-0000%-0000%-0000%-00000000000%d", true)
+    helpers.assert_string_contains_pattern_with_replicaset_id(err.err, "Failed for [replicaset_id]")
     t.assert_str_contains(err.err, "Function non_existent_func is not registered")
 end
 
@@ -178,7 +178,7 @@ pgroup.test_timeout = function(g)
     ]], {timeout + 0.1, timeout})
 
     t.assert_equals(results, nil)
-    t.assert_str_contains(err.err, "Failed for %w+%-0000%-0000%-0000%-00000000000%d", true)
+    helpers.assert_string_contains_pattern_with_replicaset_id(err.err, "Failed for [replicaset_id]")
     helpers.assert_timeout_error(err.err)
 end
 
@@ -316,6 +316,6 @@ pgroup.test_any_vshard_call_timeout = function(g)
     ]], {timeout + 0.1, timeout})
 
     t.assert_equals(results, nil)
-    t.assert_str_contains(err.err, "Failed for %w+%-0000%-0000%-0000%-00000000000%d", true)
+    helpers.assert_string_contains_pattern_with_replicaset_id(err.err, "Failed for [replicaset_id]")
     helpers.assert_timeout_error(err.err)
 end
diff --git a/test/unit/not_initialized_test.lua b/test/unit/not_initialized_test.lua
index cdd6ae20..2e40a23b 100644
--- a/test/unit/not_initialized_test.lua
+++ b/test/unit/not_initialized_test.lua
@@ -60,7 +60,7 @@ pgroup.test_insert = function(g)
     ]])
 
     t.assert_equals(results, nil)
-    t.assert_str_contains(err.err, "Failed for %w+%-0000%-0000%-0000%-00000000000%d", true)
+    helpers.assert_string_contains_pattern_with_replicaset_id(err.err, "Failed for [replicaset_id]")
    t.assert_str_contains(err.err, "crud isn't initialized on replicaset")
 end
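These test updates pin down the observable contract: `crud.storage_info()` now keys its result by replica id, which is a replica name on `name_as_key` clusters and stays a UUID everywhere else, so UUID-based deployments keep the old keys. Roughly (names and statuses follow the tests above):

    -- Sketch of a crud.storage_info() result on a 'name_as_key' cluster.
    local expected = {
        ['s1-master'] = {status = 'running', is_master = true},
        ['s1-replica'] = {status = 'running', is_master = false},
        ['s2-master'] = {status = 'running', is_master = true},
        ['s2-replica'] = {status = 'error', is_master = false, message = 'Peer closed'},
    }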
diff --git a/test/vshard_helpers/vtest.lua b/test/vshard_helpers/vtest.lua
index 0097d919..7dd2e82c 100644
--- a/test/vshard_helpers/vtest.lua
+++ b/test/vshard_helpers/vtest.lua
@@ -150,7 +150,7 @@ local function config_new(templ, additional_cfg)
             end
 
             if res.identification_mode == 'name_as_key' then
-                replica.uuid = replica_uuid
+                replica.uuid = replica_uuid -- Optional, will be cleaned up from 'sharding' later.
                 replicas[replica_name] = replica
             else
                 replica.name = replica_name
@@ -159,7 +159,7 @@
         end
 
         if res.identification_mode == 'name_as_key' then
-            replicaset.uuid = replicaset_uuid
+            replicaset.uuid = replicaset_uuid -- Optional, will be cleaned up from 'sharding' later.
             sharding[replicaset_name] = replicaset
         else
             replicaset.name = replicaset_name
@@ -245,7 +245,7 @@ local function cluster_bootstrap(g, cfg)
 
         local rs_uuid
         if cfg.identification_mode == 'name_as_key' then
-            rs_uuid = rs.uuid
+            rs_uuid = replicaset_name_to_uuid(rs_id)
         else
             rs_uuid = rs_id
         end
@@ -353,6 +353,22 @@ local function cluster_wait_vclock_all(g)
     end
 end
 
+local function strip_uuid_from_sharding_if_name_is_key(cfg)
+    if cfg.identification_mode ~= 'name_as_key' then
+        return cfg
+    end
+
+    for _, replicaset in pairs(cfg.sharding) do
+        replicaset.uuid = nil
+
+        for _, replica in pairs(replicaset.replicas) do
+            replica.uuid = nil
+        end
+    end
+
+    return cfg
+end
+
 --
 -- Build new cluster by a given config.
 --
@@ -435,6 +451,11 @@ local function cluster_new(g, cfg)
         replica:start({wait_for_readiness = false})
     end
 
+    -- UUIDs are optional in the vshard configuration with
+    -- Tarantool >= 3.0 + vshard >= 0.1.25 + identification_mode == 'name_as_key'.
+    -- In tests, we want to ensure that we don't rely on them.
+    cfg = strip_uuid_from_sharding_if_name_is_key(cfg)
+
     for _, master in pairs(masters) do
         master:wait_for_readiness()
         master:exec(function(cfg
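For reference, `strip_uuid_from_sharding_if_name_is_key` makes the tests exercise exactly the configuration style that #407 is about: with Tarantool >= 3.0 and vshard >= 0.1.25 in 'name_as_key' mode, the sharding section may omit UUIDs entirely. A minimal sketch, with names and URIs chosen for illustration:

    -- Hypothetical storage configuration without a single UUID.
    local cfg = {
        identification_mode = 'name_as_key',
        bucket_count = 3000,
        sharding = {
            ['s-1'] = { -- replicaset name is the key
                replicas = {
                    ['s-1-master'] = { -- replica name is the key
                        uri = 'storage:storage@127.0.0.1:3301',
                        master = true,
                    },
                },
            },
        },
    }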