From f3aa7c977ae3f92664b8c4db992fd7491b1f9e5d Mon Sep 17 00:00:00 2001
From: psergee
Date: Wed, 22 Jun 2022 14:22:01 +0300
Subject: [PATCH] Provide an API to get storage initialization state

There is an issue with using CRUD functionality while not all storages are
up. A new function is added that reports whether each storage is initialized,
so a user can poll the state and wait for all storages to be initialized
before making CRUD calls.

Resolves #229
---
 CHANGELOG.md                              |   1 +
 README.md                                 |  42 +++++++++
 crud.lua                                  |   6 ++
 crud/common/call.lua                      |   9 +-
 crud/common/const.lua                     |   2 +
 crud/common/utils.lua                     |  64 +++++++++++++
 test/integration/storages_state_test.lua  | 114 +++++++++++++++++++++++
 7 files changed, 233 insertions(+), 5 deletions(-)
 create mode 100644 test/integration/storages_state_test.lua

diff --git a/CHANGELOG.md b/CHANGELOG.md
index dd50a2687..55e5ee7df 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -8,6 +8,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html).
 
 ## [Unreleased]
 
 ### Added
+* `crud.storage_info` function to get the status of storages (#229).
 
 ### Changed
 
diff --git a/README.md b/README.md
index 429c7a3c4..d375fd180 100644
--- a/README.md
+++ b/README.md
@@ -32,6 +32,7 @@ It also provides the `crud-storage` and `crud-router` roles for
   - [Cut extra objects](#cut-extra-objects)
   - [Truncate](#truncate)
   - [Len](#len)
+  - [Storage info](#storage-info)
   - [Count](#count)
   - [Call options for crud methods](#call-options-for-crud-methods)
   - [Statistics](#statistics)
@@ -1074,6 +1075,47 @@ crud.len('customers')
 ...
 ```
 
+### Storage info
+
+```lua
+-- Get storage status
+local result, err = crud.storage_info(opts)
+```
+
+where:
+
+* `opts`:
+    * `timeout` (`?number`) - maximum time (in seconds) to wait for a response
+      from cluster instances.
+
+Returns a table of storage states indexed by instance UUID, or `nil` with an
+error. Each state table has the following fields:
+
+* `status` (`string`) - storage status:
+    * `"running"` - the storage is initialized;
+    * `"uninitialized"` - the storage is not initialized or is disabled;
+    * `"error"` - failed to get the status from the instance (e.g. a connection error).
+* `is_master` (`boolean`) - `true` if the instance is a master, `false` otherwise.
+
+**Example:**
+
+```lua
+crud.storage_info()
+```
+```
+---
+- 5c3392a3-ce89-4aec-83f3-6cb5f18e60c3:
+    status: error
+    is_master: true
+  376435fc-7871-4686-9817-75df1a093e41:
+    status: running
+    is_master: false
+  afe7f578-943f-4bd9-b636-6356760f6586:
+    status: uninitialized
+    is_master: true
+...
+```
+
 ### Count
 
 `CRUD` supports multi-conditional count, treating a cluster as a single space.
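For illustration (not part of this patch), here is a minimal client-side sketch of the polling pattern the commit message describes, built on the `crud.storage_info` API documented above. The `wait_for_storages` helper, its `retries`/`delay` parameters, and the `customers` space are hypothetical:

```lua
-- Hypothetical helper (not part of this patch): poll crud.storage_info()
-- until every storage reports "running", or give up after `retries` attempts.
local fiber = require('fiber')
local crud = require('crud')

local function wait_for_storages(retries, delay)
    for _ = 1, retries do
        local info, err = crud.storage_info({timeout = 1})
        if err == nil then
            local all_running = true
            for _, state in pairs(info) do
                if state.status ~= 'running' then
                    all_running = false
                    break
                end
            end
            if all_running then
                return true
            end
        end
        fiber.sleep(delay)
    end
    return false
end

-- Usage: wait up to roughly 10 seconds before issuing CRUD calls.
if wait_for_storages(10, 1) then
    crud.insert('customers', {1, box.NULL, 'Alice', 30})
end
```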
diff --git a/crud.lua b/crud.lua
index 05d36ec4a..3b1fe7ff7 100644
--- a/crud.lua
+++ b/crud.lua
@@ -138,6 +138,10 @@ crud.stats = stats.get
 -- @function reset_stats
 crud.reset_stats = stats.reset
 
+-- @refer utils.storage_info
+-- @function storage_info
+crud.storage_info = utils.storage_info
+
 --- Initializes crud on node
 --
 -- Exports all functions that are used for calls
@@ -165,6 +169,8 @@ function crud.init_storage()
     count.init()
     borders.init()
     sharding_metadata.init()
+
+    _G._crud.is_initialized = function() return true end
 end
 
 function crud.init_router()
diff --git a/crud/common/call.lua b/crud/common/call.lua
index a4659e4a2..a89c414b2 100644
--- a/crud/common/call.lua
+++ b/crud/common/call.lua
@@ -5,6 +5,7 @@ local dev_checks = require('crud.common.dev_checks')
 local utils = require('crud.common.utils')
 local sharding_utils = require('crud.common.sharding.utils')
 local fiber_clock = require('fiber').clock
+local const = require('crud.common.const')
 
 local BaseIterator = require('crud.common.map_call_cases.base_iter')
 local BasePostprocessor = require('crud.common.map_call_cases.base_postprocessor')
@@ -13,8 +14,6 @@ local CallError = errors.new_class('CallError')
 
 local call = {}
 
-call.DEFAULT_VSHARD_CALL_TIMEOUT = 2
-
 function call.get_vshard_call_name(mode, prefer_replica, balance)
     dev_checks('string', '?boolean', '?boolean')
 
@@ -84,7 +83,7 @@ function call.map(func_name, func_args, opts)
         return nil, err
     end
 
-    local timeout = opts.timeout or call.DEFAULT_VSHARD_CALL_TIMEOUT
+    local timeout = opts.timeout or const.DEFAULT_VSHARD_CALL_TIMEOUT
 
     local iter = opts.iter
     if iter == nil then
@@ -149,7 +148,7 @@ function call.single(bucket_id, func_name, func_args, opts)
         return nil, err
     end
 
-    local timeout = opts.timeout or call.DEFAULT_VSHARD_CALL_TIMEOUT
+    local timeout = opts.timeout or const.DEFAULT_VSHARD_CALL_TIMEOUT
 
     local res, err = vshard.router[vshard_call_name](bucket_id, func_name, func_args, {
         timeout = timeout,
@@ -171,7 +170,7 @@ function call.any(func_name, func_args, opts)
         timeout = '?number',
     })
 
-    local timeout = opts.timeout or call.DEFAULT_VSHARD_CALL_TIMEOUT
+    local timeout = opts.timeout or const.DEFAULT_VSHARD_CALL_TIMEOUT
 
     local replicasets, err = vshard.router.routeall()
     if replicasets == nil then
diff --git a/crud/common/const.lua b/crud/common/const.lua
index d452e1bf5..7a13cf91a 100644
--- a/crud/common/const.lua
+++ b/crud/common/const.lua
@@ -8,4 +8,6 @@ const.SHARDING_RELOAD_RETRIES_NUM = 1
 const.NEED_SCHEMA_RELOAD = 0x0001000
 const.NEED_SHARDING_RELOAD = 0x0001001
 
+const.DEFAULT_VSHARD_CALL_TIMEOUT = 2
+
 return const
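With the constant relocated to `crud/common/const.lua`, both `crud/common/call.lua` and the new `utils.storage_info` share the same 2-second default, and `opts.timeout` overrides it per call. A minimal sketch (not part of this patch), assuming a `customers` space and the call options documented in the README:

```lua
-- Sketch: opts.timeout overrides const.DEFAULT_VSHARD_CALL_TIMEOUT (2 s).
local crud = require('crud')

local len, err = crud.len('customers')                  -- uses the 2 s default
local len2, err2 = crud.len('customers', {timeout = 5}) -- explicit 5 s budget
local info, ierr = crud.storage_info({timeout = 0.5})   -- same default applies here
```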
diff --git a/crud/common/utils.lua b/crud/common/utils.lua
index 5b3614b25..4c5c81478 100644
--- a/crud/common/utils.lua
+++ b/crud/common/utils.lua
@@ -3,6 +3,7 @@ local ffi = require('ffi')
 local vshard = require('vshard')
 local fun = require('fun')
 local bit = require('bit')
+local log = require('log')
 
 local const = require('crud.common.const')
 local schema = require('crud.common.schema')
@@ -15,6 +16,8 @@ local ShardingError = errors.new_class('ShardingError', {capture_stack = false})
 local GetSpaceFormatError = errors.new_class('GetSpaceFormatError', {capture_stack = false})
 local FilterFieldsError = errors.new_class('FilterFieldsError', {capture_stack = false})
 local NotInitializedError = errors.new_class('NotInitialized')
+local GetStorageStateError = errors.new_class('GetStorageStateError')
+local fiber_clock = require('fiber').clock
 
 local utils = {}
 
@@ -748,4 +751,65 @@ function utils.list_slice(list, start_index, end_index)
     return slice
 end
 
+--- Polls replicas for storage state
+--
+-- @function storage_info
+--
+-- @tparam ?number opts.timeout
+--   Function call timeout
+--
+-- @return a table of storage states by replica UUID.
+function utils.storage_info(opts)
+    local replicasets, err = vshard.router.routeall()
+    if replicasets == nil then
+        return nil, GetStorageStateError:new("Failed to get all replicasets: %s", err.err)
+    end
+
+    opts = opts or {}
+
+    local futures_by_replicas = {}
+    local replica_state_by_uuid = {}
+    local async_opts = {is_async = true}
+    local timeout = opts.timeout or const.DEFAULT_VSHARD_CALL_TIMEOUT
+
+    -- Fire an async probe to every replica.
+    for _, replicaset in pairs(replicasets) do
+        for replica_uuid, replica in pairs(replicaset.replicas) do
+            replica_state_by_uuid[replica_uuid] = {
+                status = "error",
+                is_master = replicaset.master == replica,
+            }
+            local ok, res = pcall(replica.conn.call, replica.conn, "_crud.is_initialized",
+                                  {}, async_opts)
+            if ok then
+                futures_by_replicas[replica_uuid] = res
+            elseif res ~= nil then
+                log.error("Error getting storage info for %s: %s", replica_uuid, res)
+            end
+        end
+    end
+
+    -- Collect the replies within the shared time budget.
+    local deadline = fiber_clock() + timeout
+    for replica_uuid, future in pairs(futures_by_replicas) do
+        local wait_timeout = deadline - fiber_clock()
+        if wait_timeout < 0 then
+            wait_timeout = 0
+        end
+
+        local result, err = future:wait_result(wait_timeout)
+        if result == nil then
+            future:discard()
+            if err ~= nil then
+                local str_err = tostring(err)
+                if string.find(str_err, " is not defined") ~= nil then
+                    -- The stored function is absent: crud is not initialized
+                    -- on that instance (or the storage role is disabled).
+                    replica_state_by_uuid[replica_uuid].status = "uninitialized"
+                else
+                    log.error("Error getting storage info for %s: %s", replica_uuid, err)
+                end
+            end
+        else
+            replica_state_by_uuid[replica_uuid].status = result[1] and "running" or "uninitialized"
+        end
+    end
+
+    return replica_state_by_uuid
+end
+
 return utils
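For readers unfamiliar with the net.box future API that `utils.storage_info` relies on, here is a condensed sketch of the async-call pattern (illustrative only, not part of this patch; `conn` stands for an established net.box connection to a storage):

```lua
-- An {is_async = true} call returns immediately with a future instead of blocking.
local future = conn:call('_crud.is_initialized', {}, {is_async = true})

-- Collect the reply later, bounded by the remaining time budget (seconds).
local result, err = future:wait_result(2)
if result == nil then
    -- Stop waiting; a late reply, if any, is dropped.
    future:discard()
    -- On a storage where crud.init_storage() has not run, the call fails with
    -- "Procedure '_crud.is_initialized' is not defined", which storage_info()
    -- maps to the "uninitialized" status; other errors are reported as "error".
else
    -- result packs the return values: result[1] == true on a running storage.
end
```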
diff --git a/test/integration/storages_state_test.lua b/test/integration/storages_state_test.lua
new file mode 100644
index 000000000..2476f7407
--- /dev/null
+++ b/test/integration/storages_state_test.lua
@@ -0,0 +1,114 @@
+local fio = require('fio')
+
+local t = require('luatest')
+
+local helpers = require('test.helper')
+
+local fiber = require("fiber")
+
+local pgroup = t.group('replicas_state', {
+    {engine = 'memtx'}
+})
+
+local all_storages_initialized = false
+
+local function wait_storages_init(g)
+    local storages_initialized = false
+    local attempts_left = 5
+    local wait_for_init_timeout = 1
+    while attempts_left > 0 and not storages_initialized do
+        local results, err = g.cluster.main_server.net_box:call("crud.storage_info", {})
+        t.assert_equals(err, nil, "Error getting storage status")
+        storages_initialized = true
+        for _, v in pairs(results) do
+            if v.status ~= "running" then
+                storages_initialized = false
+            end
+        end
+        if not storages_initialized then
+            fiber.sleep(wait_for_init_timeout)
+            attempts_left = attempts_left - 1
+        end
+    end
+    return storages_initialized
+end
+
+pgroup.before_all(function(g)
+    g.cluster = helpers.Cluster:new({
+        datadir = fio.tempdir(),
+        server_command = helpers.entrypoint('srv_select'),
+        use_vshard = true,
+        replicasets = helpers.get_test_replicasets(),
+        env = {
+            ['ENGINE'] = g.params.engine,
+        },
+    })
+    g.cluster:start()
+
+    -- Wait for the storages to initialize.
+    all_storages_initialized = wait_storages_init(g)
+end)
+
+pgroup.after_all(function(g)
+    helpers.stop_cluster(g.cluster)
+    fio.rmtree(g.cluster.datadir)
+end)
+
+pgroup.test_crud_storage_status_of_stopped_servers = function(g)
+    t.assert_equals(all_storages_initialized, true)
+
+    g.cluster:server("s2-replica"):stop()
+
+    local results, err = g.cluster.main_server.net_box:call("crud.storage_info", {})
+    t.assert_equals(err, nil, "Error getting storage states")
+
+    local instance = results[helpers.uuid('b', 1)]
+    t.assert_equals(instance.status, "running")
+    t.assert_equals(instance.is_master, true)
+
+    instance = results[helpers.uuid('b', 2)]
+    t.assert_equals(instance.is_master, false)
+
+    instance = results[helpers.uuid('c', 1)]
+    t.assert_equals(instance.status, "running")
+    t.assert_equals(instance.is_master, true)
+
+    instance = results[helpers.uuid('c', 2)]
+    t.assert_equals(instance.status, "error") -- peer closed
+    t.assert_equals(instance.is_master, false)
+
+    g.cluster:server("s2-replica"):start()
+end
+
+pgroup.test_disabled_storage_role = function(g)
+    t.assert_equals(wait_storages_init(g), true)
+
+    -- Stop the crud storage role on one replica.
+    local server = g.cluster:server("s1-replica")
+    local results = server.net_box:eval([[
+        local serviceregistry = require("cartridge.service-registry")
+        serviceregistry.get("crud-storage").stop()
+        return true
+    ]])
+
+    t.assert_not_equals(results, nil, "Failed to disable the storage role")
+
+    local results, err = g.cluster.main_server.net_box:call("crud.storage_info", {})
+    t.assert_equals(err, nil, "Error getting storage states")
+
+    local instance = results[helpers.uuid('b', 1)]
+    t.assert_equals(instance.status, "running")
+    t.assert_equals(instance.is_master, true)
+
+    instance = results[helpers.uuid('b', 2)]
+    t.assert_equals(instance.status, "uninitialized")
+    t.assert_equals(instance.is_master, false)
+
+    instance = results[helpers.uuid('c', 1)]
+    t.assert_equals(instance.status, "running")
+    t.assert_equals(instance.is_master, true)
+
+    instance = results[helpers.uuid('c', 2)]
+    t.assert_equals(instance.status, "running")
+    t.assert_equals(instance.is_master, false)
+end