From c071be3e5cc5b7a988f88136dade8ed6afae7d19 Mon Sep 17 00:00:00 2001
From: Georgy Moiseev
Date: Thu, 28 Dec 2023 17:25:35 +0300
Subject: [PATCH] crud: support vshard with no UUIDs in config

PR #404 has introduced vshard 0.1.25 + Tarantool 3.0 "name as key" identification mode based on UUID extraction. It works fine if the vshard configuration (or the Tarantool 3.0 configuration from which the vshard one is built) provides UUIDs, but fails if it doesn't. Since UUIDs are optional and won't be provided in most cases, this makes crud fail to work on most Tarantool 3.0 vshard clusters.

This patch fixes the issue. Now the code uses the name as the key if the corresponding mode is enabled, and the UUID otherwise. The patch doesn't cover `select_old` since it runs only on pre-3.0 Tarantool. Unfortunately, the code relies on vshard internals, since currently there is no other way [1].

This patch covers new mode support for the readview code as well; it was likely broken before this patch even if UUIDs were provided.

1. https://github.com/tarantool/vshard/issues/460

Follows #404

Closes #407
---
 CHANGELOG.md | 5 +
 crud.lua | 9 +-
 crud/common/call.lua | 30 +++--
 crud/common/map_call_cases/base_iter.lua | 4 +-
 .../map_call_cases/batch_insert_iter.lua | 10 +-
 .../map_call_cases/batch_upsert_iter.lua | 10 +-
 crud/common/schema.lua | 4 +-
 crud/common/sharding/init.lua | 25 +++-
 crud/common/utils.lua | 112 ++++++++----------
 crud/common/vshard_utils.lua | 56 +++++++++
 crud/readview.lua | 104 ++++++++++------
 crud/select.lua | 3 +-
 crud/select/compat/select.lua | 12 +-
 crud/select/compat/select_old.lua | 1 +
 crud/select/merger.lua | 80 ++++++++-----
 test/helper.lua | 37 ++++++
 test/integration/select_readview_test.lua | 14 ++-
 test/integration/storages_state_test.lua | 78 ++++++++----
 test/unit/call_test.lua | 8 +-
 test/unit/not_initialized_test.lua | 2 +-
 test/vshard_helpers/vtest.lua | 27 ++++-
 21 files changed, 429 insertions(+), 202 deletions(-)
 create mode 100644 crud/common/vshard_utils.lua

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 66c78f40..7086f6bf 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -5,6 +5,11 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/) and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html). +## Unreleased + +### Fixed +* Compatibility with vshard configuration if UUIDs are omitted (#407). + ## [1.4.2] - 25-12-23 ### Added
diff --git a/crud.lua b/crud.lua
index 5d1e1e1d..5c33b927 100644
--- a/crud.lua
+++ b/crud.lua
@@ -173,12 +173,13 @@ function crud.init_storage() local user = nil if not box.info.ro then - local replicaset_uuid, replicaset = utils.get_self_vshard_replicaset() + local replicaset_key, replicaset = utils.get_self_vshard_replicaset() if replicaset == nil or replicaset.master == nil then - error(string.format('Failed to find a vshard configuration for ' .. - ' replicaset with replicaset_uuid %s.', - replicaset_uuid)) + error(string.format( + 'Failed to find a vshard configuration ' ..
+ 'for storage replicaset with key %q.', + replicaset_key)) end user = luri.parse(replicaset.master.uri).login or 'guest' end diff --git a/crud/common/call.lua b/crud/common/call.lua index 8fc75e8c..95ed8eb4 100644 --- a/crud/common/call.lua +++ b/crud/common/call.lua @@ -40,13 +40,13 @@ function call.get_vshard_call_name(mode, prefer_replica, balance) return 'callbre' end -local function wrap_vshard_err(vshard_router, err, func_name, replicaset_uuid, bucket_id) +local function wrap_vshard_err(vshard_router, err, func_name, replicaset_id, bucket_id) -- Do not rewrite ShardingHashMismatchError class. if err.class_name == sharding_utils.ShardingHashMismatchError.name then return errors.wrap(err) end - if replicaset_uuid == nil then + if replicaset_id == nil then local replicaset, _ = vshard_router:route(bucket_id) if replicaset == nil then return CallError:new( @@ -54,14 +54,20 @@ local function wrap_vshard_err(vshard_router, err, func_name, replicaset_uuid, b ) end - replicaset_uuid = replicaset.uuid + replicaset_id = utils.get_replicaset_id(vshard_router, replicaset) + + if replicaset_id == nil then + return CallError:new( + "Function returned an error, but we couldn't figure out the replicaset id: %s", err + ) + end end - err = utils.update_storage_call_error_description(err, func_name, replicaset_uuid) + err = utils.update_storage_call_error_description(err, func_name, replicaset_id) err = errors.wrap(err) return CallError:new(utils.format_replicaset_error( - replicaset_uuid, "Function returned an error: %s", err + replicaset_id, "Function returned an error: %s", err )) end @@ -104,13 +110,13 @@ function call.map(vshard_router, func_name, func_args, opts) local futures_by_replicasets = {} local call_opts = {is_async = true} while iter:has_next() do - local args, replicaset = iter:get() + local args, replicaset, replicaset_id = iter:get() local future = replicaset[vshard_call_name](replicaset, func_name, args, call_opts) - futures_by_replicasets[replicaset.uuid] = future + futures_by_replicasets[replicaset_id] = future end local deadline = fiber_clock() + timeout - for replicaset_uuid, future in pairs(futures_by_replicasets) do + for replicaset_id, future in pairs(futures_by_replicasets) do local wait_timeout = deadline - fiber_clock() if wait_timeout < 0 then wait_timeout = 0 @@ -119,14 +125,14 @@ function call.map(vshard_router, func_name, func_args, opts) local result, err = future:wait_result(wait_timeout) local result_info = { - key = replicaset_uuid, + key = replicaset_id, value = result, } local err_info = { err_wrapper = wrap_vshard_err, err = err, - wrapper_args = {func_name, replicaset_uuid}, + wrapper_args = {func_name, replicaset_id}, } local early_exit = postprocessor:collect(result_info, err_info) @@ -179,13 +185,13 @@ function call.any(vshard_router, func_name, func_args, opts) if replicasets == nil then return nil, CallError:new("Failed to get router replicasets: %s", err.err) end - local replicaset = select(2, next(replicasets)) + local replicaset_id, replicaset = next(replicasets) local res, err = replicaset:call(func_name, func_args, { timeout = timeout, }) if err ~= nil then - return nil, wrap_vshard_err(vshard_router, err, func_name, replicaset.uuid) + return nil, wrap_vshard_err(vshard_router, err, func_name, replicaset_id) end if res == box.NULL then diff --git a/crud/common/map_call_cases/base_iter.lua b/crud/common/map_call_cases/base_iter.lua index 0fd0e00d..452e1599 100644 --- a/crud/common/map_call_cases/base_iter.lua +++ 
b/crud/common/map_call_cases/base_iter.lua @@ -66,11 +66,13 @@ end -- -- @return[1] table func_args -- @return[2] table replicaset +-- @return[3] string replicaset_id function BaseIterator:get() + local replicaset_id = self.next_index local replicaset = self.next_replicaset self.next_index, self.next_replicaset = next(self.replicasets, self.next_index) - return self.func_args, replicaset + return self.func_args, replicaset, replicaset_id end return BaseIterator diff --git a/crud/common/map_call_cases/batch_insert_iter.lua b/crud/common/map_call_cases/batch_insert_iter.lua index 5fcbce14..37867f1b 100644 --- a/crud/common/map_call_cases/batch_insert_iter.lua +++ b/crud/common/map_call_cases/batch_insert_iter.lua @@ -40,7 +40,7 @@ function BatchInsertIterator:new(opts) return nil, SplitTuplesError:new("Failed to split tuples by replicaset: %s", err.err) end - local next_replicaset, next_batch = next(sharding_data.batches) + local next_index, next_batch = next(sharding_data.batches) local execute_on_storage_opts = opts.execute_on_storage_opts execute_on_storage_opts.sharding_func_hash = sharding_data.sharding_func_hash @@ -51,7 +51,7 @@ function BatchInsertIterator:new(opts) space_name = opts.space.name, opts = execute_on_storage_opts, batches_by_replicasets = sharding_data.batches, - next_index = next_replicaset, + next_index = next_index, next_batch = next_batch, } @@ -67,8 +67,10 @@ end -- -- @return[1] table func_args -- @return[2] table replicaset +-- @return[3] string replicaset_id function BatchInsertIterator:get() - local replicaset = self.next_index + local replicaset_id = self.next_index + local replicaset = self.next_batch.replicaset local func_args = { self.space_name, self.next_batch.tuples, @@ -77,7 +79,7 @@ function BatchInsertIterator:get() self.next_index, self.next_batch = next(self.batches_by_replicasets, self.next_index) - return func_args, replicaset + return func_args, replicaset, replicaset_id end return BatchInsertIterator diff --git a/crud/common/map_call_cases/batch_upsert_iter.lua b/crud/common/map_call_cases/batch_upsert_iter.lua index fc393f1a..d25e3ee8 100644 --- a/crud/common/map_call_cases/batch_upsert_iter.lua +++ b/crud/common/map_call_cases/batch_upsert_iter.lua @@ -48,7 +48,7 @@ function BatchUpsertIterator:new(opts) return nil, SplitTuplesError:new("Failed to split tuples by replicaset: %s", err.err) end - local next_replicaset, next_batch = next(sharding_data.batches) + local next_index, next_batch = next(sharding_data.batches) local execute_on_storage_opts = opts.execute_on_storage_opts execute_on_storage_opts.sharding_func_hash = sharding_data.sharding_func_hash @@ -59,7 +59,7 @@ function BatchUpsertIterator:new(opts) space_name = opts.space.name, opts = execute_on_storage_opts, batches_by_replicasets = sharding_data.batches, - next_index = next_replicaset, + next_index = next_index, next_batch = next_batch, } @@ -75,8 +75,10 @@ end -- -- @return[1] table func_args -- @return[2] table replicaset +-- @return[3] string replicaset_id function BatchUpsertIterator:get() - local replicaset = self.next_index + local replicaset_id = self.next_index + local replicaset = self.next_batch.replicaset local func_args = { self.space_name, self.next_batch.tuples, @@ -86,7 +88,7 @@ function BatchUpsertIterator:get() self.next_index, self.next_batch = next(self.batches_by_replicasets, self.next_index) - return func_args, replicaset + return func_args, replicaset, replicaset_id end return BatchUpsertIterator diff --git a/crud/common/schema.lua b/crud/common/schema.lua 
index 387d37bf..6137cc91 100644 --- a/crud/common/schema.lua +++ b/crud/common/schema.lua @@ -8,6 +8,7 @@ local ReloadSchemaError = errors.new_class('ReloadSchemaError', {capture_stack = local const = require('crud.common.const') local dev_checks = require('crud.common.dev_checks') +local utils = require('crud.common.vshard_utils') local schema = {} @@ -234,7 +235,8 @@ function schema.wrap_func_result(space, func, args, opts) replica_schema_version = box.internal.schema_version() end result.storage_info = { - replica_uuid = box.info().uuid, + replica_uuid = box.info().uuid, -- Backward compatibility. + replica_id = utils.get_self_vshard_replica_id(), -- Replacement for replica_uuid. replica_schema_version = replica_schema_version, } end diff --git a/crud/common/sharding/init.lua b/crud/common/sharding/init.lua index b5842917..72c4dfe7 100644 --- a/crud/common/sharding/init.lua +++ b/crud/common/sharding/init.lua @@ -18,8 +18,13 @@ function sharding.get_replicasets_by_bucket_id(vshard_router, bucket_id) return nil, GetReplicasetsError:new("Failed to get replicaset for bucket_id %s: %s", bucket_id, err.err) end + local replicaset_id = utils.get_replicaset_id(vshard_router, replicaset) + if replicaset_id == nil then + return nil, GetReplicasetsError:new("Failed to get replicaset id for bucket_id %s replicaset", bucket_id) + end + return { - [replicaset.uuid] = replicaset, + [replicaset_id] = replicaset, } end @@ -206,8 +211,8 @@ end -- Specified space -- -- @return[1] batches --- Map where key is a replicaset and value --- is table of tuples related to this replicaset +-- Map where key is a replicaset id and value +-- is replicaset and table of tuples related to this replicaset function sharding.split_tuples_by_replicaset(vshard_router, tuples, space, opts) dev_checks('table', 'table', 'table', { operations = '?table', @@ -247,7 +252,17 @@ function sharding.split_tuples_by_replicaset(vshard_router, tuples, space, opts) sharding_data.bucket_id, err.err) end - local record_by_replicaset = batches[replicaset] or {tuples = {}} + local replicaset_id = utils.get_replicaset_id(vshard_router, replicaset) + if replicaset_id == nil then + return nil, GetReplicasetsError:new( + "Failed to get replicaset id for bucket_id %s replicaset", + sharding_data.bucket_id) + end + + local record_by_replicaset = batches[replicaset_id] or { + replicaset = replicaset, + tuples = {}, + } table.insert(record_by_replicaset.tuples, tuple) if opts.operations ~= nil then @@ -255,7 +270,7 @@ function sharding.split_tuples_by_replicaset(vshard_router, tuples, space, opts) table.insert(record_by_replicaset.operations, opts.operations[i]) end - batches[replicaset] = record_by_replicaset + batches[replicaset_id] = record_by_replicaset end return { diff --git a/crud/common/utils.lua b/crud/common/utils.lua index 7d624d1f..cc542aee 100644 --- a/crud/common/utils.lua +++ b/crud/common/utils.lua @@ -101,30 +101,31 @@ function utils.table_count(table) return cnt end -function utils.format_replicaset_error(replicaset_uuid, msg, ...) +function utils.format_replicaset_error(replicaset_id, msg, ...) dev_checks("string", "string") return string.format( "Failed for %s: %s", - replicaset_uuid, + replicaset_id, string.format(msg, ...) 
) end -local function get_replicaset_by_replica_uuid(replicasets, uuid) - for _, replicaset in pairs(replicasets) do - for _, replica in pairs(replicaset.replicas) do - if replica.uuid == uuid then - return replicaset +local function get_replicaset_by_replica_id(replicasets, id) + for replicaset_id, replicaset in pairs(replicasets) do + for replica_id, _ in pairs(replicaset.replicas) do + if replica_id == id then + return replicaset_id, replicaset end end end - return nil + return nil, nil end -function utils.get_spaces(vshard_router, timeout, replica_uuid) - local replicasets, replicaset +function utils.get_spaces(vshard_router, timeout, replica_id) + local replicasets, replicaset, replicaset_id + timeout = timeout or const.DEFAULT_VSHARD_CALL_TIMEOUT local deadline = fiber.clock() + timeout while ( @@ -135,15 +136,15 @@ function utils.get_spaces(vshard_router, timeout, replica_uuid) -- Try to get master with timeout. fiber.yield() replicasets = vshard_router:routeall() - if replica_uuid ~= nil then + if replica_id ~= nil then -- Get the same replica on which the last DML operation was performed. -- This approach is temporary and is related to [1], [2]. -- [1] https://github.com/tarantool/crud/issues/236 -- [2] https://github.com/tarantool/crud/issues/361 - replicaset = get_replicaset_by_replica_uuid(replicasets, replica_uuid) + replicaset_id, replicaset = get_replicaset_by_replica_id(replicasets, replica_id) break else - replicaset = select(2, next(replicasets)) + replicaset_id, replicaset = next(replicasets) end if replicaset ~= nil and replicaset.master ~= nil and @@ -162,22 +163,22 @@ function utils.get_spaces(vshard_router, timeout, replica_uuid) local error_msg = string.format( 'The master was not found in replicaset %s, ' .. 'check status of the master and repeat the operation later', - replicaset.uuid) + replicaset_id) return nil, GetSpaceError:new(error_msg) end if replicaset.master.conn.error ~= nil then local error_msg = string.format( 'The connection to the master of replicaset %s is not valid: %s', - replicaset.uuid, replicaset.master.conn.error) + replicaset_id, replicaset.master.conn.error) return nil, GetSpaceError:new(error_msg) end return replicaset.master.conn.space, nil, replicaset.master.conn.schema_version end -function utils.get_space(space_name, vshard_router, timeout, replica_uuid) - local spaces, err, schema_version = utils.get_spaces(vshard_router, timeout, replica_uuid) +function utils.get_space(space_name, vshard_router, timeout, replica_id) + local spaces, err, schema_version = utils.get_spaces(vshard_router, timeout, replica_id) if spaces == nil then return nil, err @@ -208,20 +209,30 @@ function utils.fetch_latest_metadata_when_single_storage(space, space_name, netb -- [1] https://github.com/tarantool/crud/issues/236 -- [2] https://github.com/tarantool/crud/issues/361 local latest_space, err + assert(storage_info.replica_schema_version ~= nil, 'check the replica_schema_version value from storage ' .. 'for correct use of the fetch_latest_metadata opt') - assert(storage_info.replica_uuid ~= nil, - 'check the replica_uuid value from storage ' .. - 'for correct use of the fetch_latest_metadata opt') + + local replica_id + if storage_info.replica_id == nil then -- Backward compatibility. + assert(storage_info.replica_uuid ~= nil, + 'check the replica_uuid value from storage ' .. 
+ 'for correct use of the fetch_latest_metadata opt') + replica_id = storage_info.replica_uuid + else + replica_id = storage_info.replica_id + end + assert(netbox_schema_version ~= nil, 'check the netbox_schema_version value from net_box conn on router ' .. 'for correct use of the fetch_latest_metadata opt') + if storage_info.replica_schema_version ~= netbox_schema_version then local ok, reload_schema_err = schema.reload_schema(vshard_router) if ok then latest_space, err = utils.get_space(space_name, vshard_router, - opts.timeout, storage_info.replica_uuid) + opts.timeout, replica_id) if err ~= nil then local warn_msg = "Failed to fetch space for latest schema actualization, metadata may be outdated: %s" log.warn(warn_msg, err) @@ -1053,7 +1064,7 @@ function utils.check_name_isident(name) return true end -function utils.update_storage_call_error_description(err, func_name, replicaset_uuid) +function utils.update_storage_call_error_description(err, func_name, replicaset_id) if err == nil then return nil end @@ -1067,7 +1078,7 @@ function utils.update_storage_call_error_description(err, func_name, replicaset_ err = NotInitializedError:new("Function %s is not registered: " .. "crud isn't initialized on replicaset %q or crud module versions mismatch " .. "between router and storage", - func_name, replicaset_uuid or "Unknown") + func_name, replicaset_id or "Unknown") else err = NotInitializedError:new("Function %s is not registered", func_name) end @@ -1123,7 +1134,7 @@ end -- @tparam ?string|table opts.vshard_router -- Cartridge vshard group name or vshard router instance. -- --- @return a table of storage states by replica uuid. +-- @return a table of storage states by replica id. function utils.storage_info(opts) opts = opts or {} @@ -1138,35 +1149,35 @@ function utils.storage_info(opts) end local futures_by_replicas = {} - local replica_state_by_uuid = {} + local replica_state_by_id = {} local async_opts = {is_async = true} local timeout = opts.timeout or const.DEFAULT_VSHARD_CALL_TIMEOUT for _, replicaset in pairs(replicasets) do - for _, replica in pairs(replicaset.replicas) do - replica_state_by_uuid[replica.uuid] = { + for replica_id, replica in pairs(replicaset.replicas) do + replica_state_by_id[replica_id] = { status = "error", is_master = replicaset.master == replica } local ok, res = pcall(replica.conn.call, replica.conn, CRUD_STORAGE_INFO_FUNC_NAME, {}, async_opts) if ok then - futures_by_replicas[replica.uuid] = res + futures_by_replicas[replica_id] = res else - local err_msg = string.format("Error getting storage info for %s", replica.uuid) + local err_msg = string.format("Error getting storage info for %s", replica_id) if res ~= nil then log.error("%s: %s", err_msg, res) - replica_state_by_uuid[replica.uuid].message = tostring(res) + replica_state_by_id[replica_id].message = tostring(res) else log.error(err_msg) - replica_state_by_uuid[replica.uuid].message = err_msg + replica_state_by_id[replica_id].message = err_msg end end end end local deadline = fiber.clock() + timeout - for replica_uuid, future in pairs(futures_by_replicas) do + for replica_id, future in pairs(futures_by_replicas) do local wait_timeout = deadline - fiber.clock() if wait_timeout < 0 then wait_timeout = 0 @@ -1175,24 +1186,24 @@ function utils.storage_info(opts) local result, err = future:wait_result(wait_timeout) if result == nil then future:discard() - local err_msg = string.format("Error getting storage info for %s", replica_uuid) + local err_msg = string.format("Error getting storage info for %s", 
replica_id) if err ~= nil then if err.type == 'ClientError' and err.code == box.error.NO_SUCH_PROC then - replica_state_by_uuid[replica_uuid].status = "uninitialized" + replica_state_by_id[replica_id].status = "uninitialized" else log.error("%s: %s", err_msg, err) - replica_state_by_uuid[replica_uuid].message = tostring(err) + replica_state_by_id[replica_id].message = tostring(err) end else log.error(err_msg) - replica_state_by_uuid[replica_uuid].message = err_msg + replica_state_by_id[replica_id].message = err_msg end else - replica_state_by_uuid[replica_uuid].status = result[1].status or "uninitialized" + replica_state_by_id[replica_id].status = result[1].status or "uninitialized" end end - return replica_state_by_uuid + return replica_state_by_id end --- Storage status information. @@ -1314,29 +1325,8 @@ function utils.is_cartridge_hotreload_supported() return true, cartridge_hotreload end -function utils.get_self_vshard_replicaset() - local box_info = box.info() - - local ok, storage_info = pcall(vshard.storage.info) - assert(ok, 'vshard.storage.cfg() must be called first') - - local replicaset_uuid - if box_info.replicaset ~= nil then - replicaset_uuid = box_info.replicaset.uuid - else - replicaset_uuid = box_info.cluster.uuid - end - - local replicaset - -- Identification key may be name since vshard 0.1.25. - -- See also https://github.com/tarantool/vshard/issues/460. - for _, v in pairs(storage_info.replicasets) do - if v.uuid == replicaset_uuid then - replicaset = v - end - end - - return replicaset_uuid, replicaset +for k, v in pairs(require('crud.common.vshard_utils')) do + utils[k] = v end return utils diff --git a/crud/common/vshard_utils.lua b/crud/common/vshard_utils.lua new file mode 100644 index 00000000..f19ddca3 --- /dev/null +++ b/crud/common/vshard_utils.lua @@ -0,0 +1,56 @@ +local vshard = require('vshard') + +local vshard_utils = {} + +function vshard_utils.get_self_vshard_replicaset() + local box_info = box.info() + + local ok, storage_info = pcall(vshard.storage.info) + assert(ok, 'vshard.storage.cfg() must be called first') + + if vshard_utils.get_vshard_identification_mode() == 'name_as_key' then + local replicaset_name = box_info.replicaset.name + + return replicaset_name, storage_info.replicasets[replicaset_name] + else + local replicaset_uuid + if box_info.replicaset ~= nil then + replicaset_uuid = box_info.replicaset.uuid + else + replicaset_uuid = box_info.cluster.uuid + end + + return replicaset_uuid, storage_info.replicasets[replicaset_uuid] + end +end + +function vshard_utils.get_self_vshard_replica_id() + local box_info = box.info() + + if vshard_utils.get_vshard_identification_mode() == 'name_as_key' then + return box_info.name + else + return box_info.uuid + end +end + +function vshard_utils.get_replicaset_id(vshard_router, replicaset) + -- https://github.com/tarantool/vshard/issues/460. + local known_replicasets = vshard_router:routeall() + + for known_replicaset_id, known_replicaset in pairs(known_replicasets) do + if known_replicaset == replicaset then + return known_replicaset_id + end + end + + return nil +end + +function vshard_utils.get_vshard_identification_mode() + -- https://github.com/tarantool/vshard/issues/460. 
+ assert(vshard.storage.internal.current_cfg ~= nil, 'available only on vshard storage') + return vshard.storage.internal.current_cfg.identification_mode +end + +return vshard_utils diff --git a/crud/readview.lua b/crud/readview.lua index e5c5ece0..cb7c8c7f 100644 --- a/crud/readview.lua +++ b/crud/readview.lua @@ -51,24 +51,35 @@ local function readview_open_on_storage(readview_name) ReadviewError:assert(false, ("Error creating readview")) end - local replica_info = {} - replica_info.uuid = box.info().uuid - replica_info.id = read_view.id + return { + id = read_view.id, - return replica_info, nil + uuid = box.info().uuid, -- Backward compatibility. + replica_id = utils.get_self_vshard_replica_id(), -- Replacement for uuid. + } end -local function readview_close_on_storage(readview_uuid) +local function readview_close_on_storage(info) dev_checks('table') - local list = box.read_view.list() + local replica_id = utils.get_self_vshard_replica_id() + local readview_id - for _, replica_info in pairs(readview_uuid) do - if replica_info.uuid == box.info().uuid then + for _, replica_info in pairs(info) do + local found = false + + if replica_info.replica_id == replica_id then + found = true + elseif replica_info.uuid == box.info().uuid then -- Backward compatibility. + found = true + end + + if found then readview_id = replica_info.id end end + local list = box.read_view.list() for k,v in pairs(list) do if v.id == readview_id then list[k]:close() @@ -106,7 +117,8 @@ local function select_readview_on_storage(space_name, index_id, conditions, opts replica_schema_version = box.internal.schema_version() end cursor.storage_info = { - replica_uuid = box.info().uuid, + replica_uuid = box.info().uuid, -- Backward compatibility. + replica_id = utils.get_self_vshard_replica_id(), -- Replacement for replica_uuid. 
replica_schema_version = replica_schema_version, } end @@ -206,7 +218,7 @@ local select_call = stats.wrap(select.call, stats.op.SELECT) function Readview_obj:select(space_name, user_conditions, opts) opts = opts or {} opts.readview = true - opts.readview_uuid = self._uuid + opts.readview_info = self._info if self.opened == false then return nil, ReadviewError:new("Read view is closed") @@ -220,7 +232,7 @@ local pairs_call = stats.wrap(select.pairs, stats.op.SELECT, {pairs = true}) function Readview_obj:pairs(space_name, user_conditions, opts) opts = opts or {} opts.readview = true - opts.readview_uuid = self._uuid + opts.readview_info = self._info if self.opened == false then return nil, ReadviewError:new("Read view is closed") @@ -259,21 +271,39 @@ function Readview_obj:close(opts) end local errors = {} - for _, replicaset in pairs(replicasets) do - for _, replica in pairs(replicaset.replicas) do - for _, value in pairs(self._uuid) do - if replica.uuid == value.uuid then - local replica_result, replica_err = replica.conn:call(CRUD_CLOSE_FUNC_NAME, - {self._uuid}, {timeout = opts.timeout}) - if replica_err ~= nil then - table.insert(errors, ReadviewError:new("Failed to close Readview on storage: %s", replica_err)) - end - if replica_err == nil and (not replica_result) then - table.insert(errors, ReadviewError:new("Readview was not found on storage: %s", replica.uuid)) - end - end + for replicaset_id, replicaset in pairs(replicasets) do + local replicaset_info = self._info[replicaset_id] + + if replicaset_info == nil then + goto next_replicaset + end + + for replica_id, replica in pairs(replicaset.replicas) do + local found = false + + if replicaset_info.replica_id == replica_id then + found = true + elseif replicaset_info.uuid == replica.uuid then -- Backward compatibility. 
+ found = true end + + if not found then + goto next_replica + end + + local replica_result, replica_err = replica.conn:call(CRUD_CLOSE_FUNC_NAME, + {self._info}, {timeout = opts.timeout}) + if replica_err ~= nil then + table.insert(errors, ReadviewError:new("Failed to close Readview on storage: %s", replica_err)) + end + if replica_err == nil and (not replica_result) then + table.insert(errors, ReadviewError:new("Readview was not found on storage: %s", replica_id)) + end + + ::next_replica:: end + + ::next_replicaset:: end if next(errors) ~= nil then @@ -303,28 +333,26 @@ function Readview_obj.create(vshard_router, opts) setmetatable(readview, Readview_obj) readview._name = opts.name - local results, err, err_uuid = vshard_router:map_callrw(CRUD_OPEN_FUNC_NAME, + local results, err, err_id = vshard_router:map_callrw(CRUD_OPEN_FUNC_NAME, {readview._name}, {timeout = opts.timeout}) if err ~= nil then - return nil, - ReadviewError:new("Failed to call readview_open_on_storage on storage-side: storage uuid: %s err: %s", - err_uuid, err) + return nil, ReadviewError:new( + "Failed to call readview_open_on_storage on storage-side: storage id: %s err: %s", + err_id, err + ) end - local uuid = {} - local errors = {} - for _, replicaset_results in pairs(results) do - for _, replica_result in pairs(replicaset_results) do - table.insert(uuid, replica_result) - end + -- map_callrw response format: + -- {replicaset_id1 = {res1}, replicaset_id2 = {res2}, ...} + local info = {} + for replicaset_id, replicaset_results in pairs(results) do + local _, replica_info = next(replicaset_results) + info[replicaset_id] = replica_info end - readview._uuid = uuid + readview._info = info readview.opened = true - if next(errors) ~= nil then - return nil, errors - end return readview, nil end diff --git a/crud/select.lua b/crud/select.lua index ffb850cc..a095b589 100644 --- a/crud/select.lua +++ b/crud/select.lua @@ -53,7 +53,8 @@ local function select_on_storage(space_name, index_id, conditions, opts) replica_schema_version = box.internal.schema_version() end cursor.storage_info = { - replica_uuid = box.info().uuid, + replica_uuid = box.info().uuid, -- Backward compatibility. + replica_id = utils.get_self_vshard_replica_id(), -- Replacement for replica_uuid. 
replica_schema_version = replica_schema_version, } end diff --git a/crud/select/compat/select.lua b/crud/select/compat/select.lua index cc309b70..abd40c19 100644 --- a/crud/select/compat/select.lua +++ b/crud/select/compat/select.lua @@ -31,7 +31,7 @@ local function build_select_iterator(vshard_router, space_name, user_conditions, yield_every = '?number', call_opts = 'table', readview = '?boolean', - readview_uuid = '?table', + readview_info = '?table', }) opts = opts or {} @@ -178,7 +178,7 @@ local function build_select_iterator(vshard_router, space_name, user_conditions, local merger if opts.readview then - merger = Merger.new_readview(vshard_router, replicasets_to_select, opts.readview_uuid, + merger = Merger.new_readview(vshard_router, replicasets_to_select, opts.readview_info, space, plan.index_id, common.READVIEW_SELECT_FUNC_NAME, {space_name, plan.index_id, plan.conditions, select_opts}, {tarantool_iter = plan.tarantool_iter, field_names = plan.field_names, call_opts = opts.call_opts} @@ -215,7 +215,7 @@ function select_module.pairs(space_name, user_conditions, opts) balance = '?boolean', timeout = '?number', readview = '?boolean', - readview_uuid = '?table', + readview_info = '?table', vshard_router = '?string|table', @@ -267,7 +267,7 @@ function select_module.pairs(space_name, user_conditions, opts) fetch_latest_metadata = opts.fetch_latest_metadata, }, readview = opts.readview, - readview_uuid = opts.readview_uuid, + readview_info = opts.readview_info, } local iter, err = schema.wrap_func_reload( @@ -332,7 +332,7 @@ local function select_module_call_xc(vshard_router, space_name, user_conditions, balance = '?boolean', timeout = '?number', readview = '?boolean', - readview_uuid = '?table', + readview_info = '?table', vshard_router = '?string|table', @@ -379,7 +379,7 @@ local function select_module_call_xc(vshard_router, space_name, user_conditions, fetch_latest_metadata = opts.fetch_latest_metadata, }, readview = opts.readview, - readview_uuid = opts.readview_uuid, + readview_info = opts.readview_info, } local iter, err = schema.wrap_func_reload( diff --git a/crud/select/compat/select_old.lua b/crud/select/compat/select_old.lua index 930ba454..009f6dce 100644 --- a/crud/select/compat/select_old.lua +++ b/crud/select/compat/select_old.lua @@ -67,6 +67,7 @@ local function select_iteration(space_name, plan, opts) end local tuples = {} + -- Old select works with vshard without `name_as_key` support. for replicaset_uuid, replicaset_results in pairs(results) do -- Stats extracted with callback here and not passed -- outside to wrapper because fetch for pairs can be diff --git a/crud/select/merger.lua b/crud/select/merger.lua index 46dbcff5..b8e358e1 100644 --- a/crud/select/merger.lua +++ b/crud/select/merger.lua @@ -244,44 +244,62 @@ local function new(vshard_router, replicasets, space, index_id, func_name, func_ end -local function new_readview(vshard_router, replicasets, readview_uuid, space, index_id, func_name, func_args, opts) +local function new_readview(vshard_router, replicasets, readview_info, space, index_id, func_name, func_args, opts) opts = opts or {} local call_opts = opts.call_opts -- Request a first data chunk and create merger sources. local merger_sources = {} - for _, replicaset in pairs(replicasets) do - for replica_uuid, replica in pairs(replicaset.replicas) do - for _, value in pairs(readview_uuid) do - if replica_uuid == value.uuid then - -- Perform a request. 
- local buf = buffer.ibuf() - local net_box_opts = {is_async = true, buffer = buf, - skip_header = utils.tarantool_supports_netbox_skip_header_option() or nil} - func_args[4].readview_id = value.id - local future = replica.conn:call(func_name, func_args, net_box_opts) - - -- Create a source. - local context = { - net_box_opts = net_box_opts, - buffer = buf, - func_name = func_name, - func_args = func_args, - replicaset = replicaset, - vshard_call_name = nil, - timeout = call_opts.timeout, - fetch_latest_metadata = call_opts.fetch_latest_metadata, - space_name = space.name, - vshard_router = vshard_router, - readview = true, - future_replica = replica - } - local state = {future = future} - local source = merger_lib.new_buffer_source(fetch_chunk, context, state) - table.insert(merger_sources, source) - end + for replicaset_id, replicaset in pairs(replicasets) do + local replicaset_info = readview_info[replicaset_id] + + if replicaset_info == nil then + goto next_replicaset + end + + for replica_id, replica in pairs(replicaset.replicas) do + local found = false + + if replicaset_info.replica_id == replica_id then + found = true + elseif replicaset_info.uuid == replica.uuid then -- Backward compatibility. + found = true end + + if not found then + goto next_replica + end + + -- Perform a request. + local buf = buffer.ibuf() + local net_box_opts = {is_async = true, buffer = buf, + skip_header = utils.tarantool_supports_netbox_skip_header_option() or nil} + func_args[4].readview_id = replicaset_info.id + local future = replica.conn:call(func_name, func_args, net_box_opts) + + -- Create a source. + local context = { + net_box_opts = net_box_opts, + buffer = buf, + func_name = func_name, + func_args = func_args, + replicaset = replicaset, + vshard_call_name = nil, + timeout = call_opts.timeout, + fetch_latest_metadata = call_opts.fetch_latest_metadata, + space_name = space.name, + vshard_router = vshard_router, + readview = true, + future_replica = replica + } + local state = {future = future} + local source = merger_lib.new_buffer_source(fetch_chunk, context, state) + table.insert(merger_sources, source) + + ::next_replica:: end + + ::next_replicaset:: end -- Trick for performance.
diff --git a/test/helper.lua b/test/helper.lua
index 3d044f70..1f823c0b 100644
--- a/test/helper.lua
+++ b/test/helper.lua
@@ -832,4 +832,41 @@ function helpers.schema_compatibility(schema) return schema end +function helpers.string_replace(base, old_fragment, new_fragment) + local i, j = base:find(old_fragment, 1, true) -- Plain find: old_fragment is a literal, not a Lua pattern. + + if i == nil then + return base + end + + local prefix = '' + if i > 1 then + prefix = base:sub(1, i - 1) + end + + local suffix = '' + if j < base:len() then + suffix = base:sub(j + 1, base:len()) + end + + return prefix .. new_fragment .. suffix +end + +function helpers.assert_string_contains_pattern_with_replicaset_id(str, pattern) + local uuid_pattern = "%w+%-0000%-0000%-0000%-00000000000%d" + local name_pattern = "s%-%d" -- All existing clusters use this pattern, but this assumption is unstable. + + local found = false + for _, id_pattern in pairs({uuid_pattern, name_pattern}) do + -- pattern is expected to be like "Failed for [replicaset_id]".
+ local full_pattern = helpers.string_replace(pattern, '[replicaset_id]', id_pattern) + if str:find(full_pattern) ~= nil then + found = true + break + end + end + + t.assert(found, ("string %q does not contain pattern %q"):format(str, pattern)) +end + return helpers
diff --git a/test/integration/select_readview_test.lua b/test/integration/select_readview_test.lua
index f5efb5d8..6a003d9a 100644
--- a/test/integration/select_readview_test.lua
+++ b/test/integration/select_readview_test.lua
@@ -2308,10 +2308,18 @@ pgroup.test_stop_select = function(g) g.cluster:server('s2-master'):start() if g.params.backend == helpers.backend.VSHARD then - g.cluster:server('s2-master'):exec(function(cfg) - require('vshard.storage').cfg(cfg, box.info.uuid) + local bootstrap_key + if type(g.params.backend_cfg) == 'table' + and g.params.backend_cfg.identification_mode == 'name_as_key' then + bootstrap_key = 'name' + else + bootstrap_key = 'uuid' + end + + g.cluster:server('s2-master'):exec(function(cfg, bootstrap_key) + require('vshard.storage').cfg(cfg, box.info[bootstrap_key]) require('crud').init_storage() - end, {g.cfg}) + end, {g.cfg, bootstrap_key}) end local _, err = g.cluster.main_server.net_box:eval([[
diff --git a/test/integration/storages_state_test.lua b/test/integration/storages_state_test.lua
index 10ccf41a..9b7f348a 100644
--- a/test/integration/storages_state_test.lua
+++ b/test/integration/storages_state_test.lua
@@ -49,30 +49,63 @@ pgroup.after_all(function(g) helpers.stop_cluster(g.cluster, g.params.backend) end) +local function build_storage_info(g, array_info) + local is_vshard = g.params.backend == 'vshard' + local name_as_key = is_vshard and ( + type(g.params.backend_cfg) == 'table' + and g.params.backend_cfg.identification_mode == 'name_as_key' + ) + + local keys + if name_as_key then + keys = { + 's1-master', + 's1-replica', + 's2-master', + 's2-replica', + } + else + keys = { + helpers.uuid('b', 1), + helpers.uuid('b', 10), + helpers.uuid('c', 1), + helpers.uuid('c', 10), + } + end + + local res = {} + for i, v in ipairs(array_info) do + res[keys[i]] = v + end + + return res +end + pgroup.test_crud_storage_status_of_stopped_servers = function(g) g.cluster:server("s2-replica"):stop() local results, err = g.cluster.main_server.net_box:call("crud.storage_info", {}) t.assert_equals(err, nil, "Error getting storages info") - t.assert_equals(results, { - [helpers.uuid('b', 1)] = { + + t.assert_equals(results, build_storage_info(g, { + { status = "running", is_master = true }, - [helpers.uuid('b', 10)] = { + { status = "running", is_master = false }, - [helpers.uuid('c', 1)] = { + { status = "running", is_master = true }, - [helpers.uuid('c', 10)] = { + { status = "error", is_master = false, message = "Peer closed" - } - }) + }, + })) end pgroup.after_test('test_crud_storage_status_of_stopped_servers', function(g) @@ -97,24 +130,24 @@ pgroup.test_disabled_storage_role = function(g) local results, err = g.cluster.main_server.net_box:call("crud.storage_info", {}) t.assert_equals(err, nil, "Error getting storages info") - t.assert_equals(results, { - [helpers.uuid('b', 1)] = { + t.assert_equals(results, build_storage_info(g, { + { status = "running", is_master = true }, - [helpers.uuid('b', 10)] = { + { status = "uninitialized", is_master = false }, - [helpers.uuid('c', 1)] = { + { status = "running", is_master = true }, - [helpers.uuid('c', 10)] = { + { status = "running", - is_master = false - } - }) + is_master = false, + }, + })) end pgroup.after_test('test_disabled_storage_role', function(g) @@
-138,26 +171,25 @@ pgroup.test_storage_call_failure = function(g) local results, err = g.cluster.main_server.net_box:call("crud.storage_info", {}) t.assert_equals(err, nil, "Error getting storages info") - - t.assert_equals(results, { - [helpers.uuid('b', 1)] = { + t.assert_equals(results, build_storage_info(g, { + { status = "running", is_master = true }, - [helpers.uuid('b', 10)] = { + { status = "running", is_master = false }, - [helpers.uuid('c', 1)] = { + { status = "running", is_master = true }, - [helpers.uuid('c', 10)] = { + { status = "error", is_master = false, message = "attempt to call a table value" - } - }) + }, + })) end pgroup.after_test('test_storage_call_failure', function(g)
diff --git a/test/unit/call_test.lua b/test/unit/call_test.lua
index 9405e200..319d53b5 100644
--- a/test/unit/call_test.lua
+++ b/test/unit/call_test.lua
@@ -93,7 +93,7 @@ pgroup.test_map_non_existent_func = function(g) ]]) t.assert_equals(results, nil) - t.assert_str_contains(err.err, "Failed for %w+%-0000%-0000%-0000%-00000000000%d", true) + helpers.assert_string_contains_pattern_with_replicaset_id(err.err, "Failed for [replicaset_id]") t.assert_str_contains(err.err, "Function non_existent_func is not registered") end @@ -106,7 +106,7 @@ pgroup.test_single_non_existent_func = function(g) ]]) t.assert_equals(results, nil) - t.assert_str_contains(err.err, "Failed for %w+%-0000%-0000%-0000%-00000000000%d", true) + helpers.assert_string_contains_pattern_with_replicaset_id(err.err, "Failed for [replicaset_id]") t.assert_str_contains(err.err, "Function non_existent_func is not registered") end @@ -178,7 +178,7 @@ pgroup.test_timeout = function(g) ]], {timeout + 0.1, timeout}) t.assert_equals(results, nil) - t.assert_str_contains(err.err, "Failed for %w+%-0000%-0000%-0000%-00000000000%d", true) + helpers.assert_string_contains_pattern_with_replicaset_id(err.err, "Failed for [replicaset_id]") helpers.assert_timeout_error(err.err) end @@ -316,6 +316,6 @@ pgroup.test_any_vshard_call_timeout = function(g) ]], {timeout + 0.1, timeout}) t.assert_equals(results, nil) - t.assert_str_contains(err.err, "Failed for %w+%-0000%-0000%-0000%-00000000000%d", true) + helpers.assert_string_contains_pattern_with_replicaset_id(err.err, "Failed for [replicaset_id]") helpers.assert_timeout_error(err.err) end
diff --git a/test/unit/not_initialized_test.lua b/test/unit/not_initialized_test.lua
index cdd6ae20..2e40a23b 100644
--- a/test/unit/not_initialized_test.lua
+++ b/test/unit/not_initialized_test.lua
@@ -60,7 +60,7 @@ pgroup.test_insert = function(g) ]]) t.assert_equals(results, nil) - t.assert_str_contains(err.err, "Failed for %w+%-0000%-0000%-0000%-00000000000%d", true) + helpers.assert_string_contains_pattern_with_replicaset_id(err.err, "Failed for [replicaset_id]") t.assert_str_contains(err.err, "crud isn't initialized on replicaset") end
diff --git a/test/vshard_helpers/vtest.lua b/test/vshard_helpers/vtest.lua
index 0097d919..7dd2e82c 100644
--- a/test/vshard_helpers/vtest.lua
+++ b/test/vshard_helpers/vtest.lua
@@ -150,7 +150,7 @@ local function config_new(templ, additional_cfg) end if res.identification_mode == 'name_as_key' then - replica.uuid = replica_uuid + replica.uuid = replica_uuid -- Optional, will be cleaned up from 'sharding' later.
replicas[replica_name] = replica else replica.name = replica_name @@ -159,7 +159,7 @@ local function config_new(templ, additional_cfg) end if res.identification_mode == 'name_as_key' then - replicaset.uuid = replicaset_uuid + replicaset.uuid = replicaset_uuid -- Optional, will be cleaned up from 'sharding' later. sharding[replicaset_name] = replicaset else replicaset.name = replicaset_name @@ -245,7 +245,7 @@ local function cluster_bootstrap(g, cfg) local rs_uuid if cfg.identification_mode == 'name_as_key' then - rs_uuid = rs.uuid + rs_uuid = replicaset_name_to_uuid(rs_id) else rs_uuid = rs_id end @@ -353,6 +353,22 @@ local function cluster_wait_vclock_all(g) end end +local function strip_uuid_from_sharding_if_name_is_key(cfg) + if cfg.identification_mode ~= 'name_as_key' then + return cfg + end + + for _, replicaset in pairs(cfg.sharding) do + replicaset.uuid = nil + + for _, replica in pairs(replicaset.replicas) do + replica.uuid = nil + end + end + + return cfg +end + -- -- Build new cluster by a given config. -- @@ -435,6 +451,11 @@ local function cluster_new(g, cfg) replica:start({wait_for_readiness = false}) end + -- UUIDs are optional in vshard configuration in + -- Tarantool >= 3.0 + vshard >= 0.1.25 + identification_mode == 'name_as_key'. + -- In tests, we want to ensure that we don't rely on them. + cfg = strip_uuid_from_sharding_if_name_is_key(cfg) + for _, master in pairs(masters) do master:wait_for_readiness() master:exec(function(cfg)
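
For reference, a minimal sketch of the kind of UUID-less vshard configuration this patch targets. With identification_mode = 'name_as_key' (vshard >= 0.1.25, Tarantool >= 3.0), the 'sharding' section is keyed by replicaset and replica names and the uuid fields may be omitted entirely; the names, URI, and credentials below are illustrative placeholders, not values taken from the patch or its test suite.

    -- Hypothetical UUID-less vshard storage configuration (illustrative).
    local cfg = {
        identification_mode = 'name_as_key', -- Names, not UUIDs, are the keys.
        bucket_count = 3000,
        sharding = {
            ['storage-1'] = { -- Replicaset name, not a UUID.
                replicas = {
                    ['storage-1-a'] = { -- Replica name, not a UUID.
                        uri = 'storage:storage@127.0.0.1:3301',
                        master = true,
                    },
                },
            },
        },
    }

With such a configuration, vshard.storage.info().replicasets is keyed by name and carries no uuid field, so crud has to identify itself via box.info().name and box.info().replicaset.name rather than box.info().uuid; this is what the new crud.common.vshard_utils helpers do.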