diff --git a/CHANGELOG.md b/CHANGELOG.md index 1796a4731..345bd3bd0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. ## [Unreleased] ### Added +* `crud.get_storages_state` function to get storages initialization state (#229). ### Changed diff --git a/crud.lua b/crud.lua index 3f2b5c599..48b65b3f3 100644 --- a/crud.lua +++ b/crud.lua @@ -111,6 +111,10 @@ crud.stats = stats.get -- @function reset_stats crud.reset_stats = stats.reset +-- @refer utils.get_storages_state +-- @function get_storages_state +crud.get_storages_state = utils.get_storages_state + --- Initializes crud on node -- -- Exports all functions that are used for calls @@ -135,6 +139,8 @@ function crud.init_storage() count.init() borders.init() sharding_metadata.init() + + _G._crud.is_init = true end function crud.init_router() diff --git a/crud/common/call.lua b/crud/common/call.lua index f3a1b997d..e0e958db6 100644 --- a/crud/common/call.lua +++ b/crud/common/call.lua @@ -5,13 +5,12 @@ local dev_checks = require('crud.common.dev_checks') local utils = require('crud.common.utils') local sharding_utils = require('crud.common.sharding.utils') local fiber_clock = require('fiber').clock +local const = require('crud.common.const') local CallError = errors.new_class('CallError') local call = {} -call.DEFAULT_VSHARD_CALL_TIMEOUT = 2 - function call.get_vshard_call_name(mode, prefer_replica, balance) dev_checks('string', '?boolean', '?boolean') @@ -79,7 +78,7 @@ function call.map(func_name, func_args, opts) return nil, err end - local timeout = opts.timeout or call.DEFAULT_VSHARD_CALL_TIMEOUT + local timeout = opts.timeout or const.DEFAULT_VSHARD_CALL_TIMEOUT local replicasets, err if opts.replicasets ~= nil then @@ -134,7 +133,7 @@ function call.single(bucket_id, func_name, func_args, opts) return nil, err end - local timeout = opts.timeout or call.DEFAULT_VSHARD_CALL_TIMEOUT + local timeout = opts.timeout or 
const.DEFAULT_VSHARD_CALL_TIMEOUT local res, err = vshard.router[vshard_call_name](bucket_id, func_name, func_args, { timeout = timeout, @@ -156,7 +155,7 @@ function call.any(func_name, func_args, opts) timeout = '?number', }) - local timeout = opts.timeout or call.DEFAULT_VSHARD_CALL_TIMEOUT + local timeout = opts.timeout or const.DEFAULT_VSHARD_CALL_TIMEOUT local replicasets, err = vshard.router.routeall() if replicasets == nil then diff --git a/crud/common/const.lua b/crud/common/const.lua index d452e1bf5..7a13cf91a 100644 --- a/crud/common/const.lua +++ b/crud/common/const.lua @@ -8,4 +8,6 @@ const.SHARDING_RELOAD_RETRIES_NUM = 1 const.NEED_SCHEMA_RELOAD = 0x0001000 const.NEED_SHARDING_RELOAD = 0x0001001 +const.DEFAULT_VSHARD_CALL_TIMEOUT = 2 + return const diff --git a/crud/common/utils.lua b/crud/common/utils.lua index bcc961a30..69b59ad19 100644 --- a/crud/common/utils.lua +++ b/crud/common/utils.lua @@ -15,6 +15,8 @@ local ShardingError = errors.new_class('ShardingError', {capture_stack = false}) local GetSpaceFormatError = errors.new_class('GetSpaceFormatError', {capture_stack = false}) local FilterFieldsError = errors.new_class('FilterFieldsError', {capture_stack = false}) local NotInitializedError = errors.new_class('NotInitialized') +local GetReplicaStateError = errors.new_class('GetStorageStateError') +local fiber_clock = require('fiber').clock local utils = {} @@ -711,4 +713,55 @@ function utils.update_storage_call_error_description(err, func_name, replicaset_ return err end +--- Polls replicas for storage state +-- +-- @function get_storages_state +-- +-- @tparam ?number opts.timeout +-- Function call timeout +-- +-- @return a table of storage states by replica uuid. 
+function utils.get_storages_state(opts) + local replicasets, err = vshard.router.routeall() + if replicasets == nil then + return nil, GetReplicaStateError:new("Failed to get all replicasets: %s", err.err) + end + + opts = opts or {} + + local futures_by_replicas = {} + local replicas_state = {} + local async_opts = {is_async = true} + local timeout = opts.timeout or const.DEFAULT_VSHARD_CALL_TIMEOUT + + for _, replicasets in pairs(replicasets) do + for replica_uuid, replica in pairs(replicasets.replicas) do + replicas_state[replica_uuid] = {name = replica.name, storage_initialized = false} + local ok, future = pcall(replica.conn.eval, replica.conn, + [[return rawget(rawget(_G, "_crud") or {}, "is_init") ~= nil]], + {}, async_opts) + if ok then + futures_by_replicas[replica_uuid] = future + end + end + end + + local deadline = fiber_clock() + timeout + for replica_uuid, future in pairs(futures_by_replicas) do + local wait_timeout = deadline - fiber_clock() + if wait_timeout < 0 then + wait_timeout = 0 + end + + local result = future:wait_result(wait_timeout) + if result then + replicas_state[replica_uuid].storage_initialized = result[1] + else + future:discard() + end + end + + return replicas_state +end + return utils diff --git a/test/integration/storages_state_test.lua b/test/integration/storages_state_test.lua new file mode 100644 index 000000000..f9141932c --- /dev/null +++ b/test/integration/storages_state_test.lua @@ -0,0 +1,89 @@ +local fio = require('fio') + +local t = require('luatest') + +local helpers = require('test.helper') + +local fiber = require("fiber") + +local pgroup = t.group('replicas_state', { + {engine = 'memtx'} +}) + +local all_storages_initialized = false + +local function wait_storages_init(g) + local storages_initialized = false + local attempts_left = 5 + local wait_for_init_timeout = 1 + while (attempts_left > 0 and not storages_initialized) do + local results, err = g.cluster.main_server.net_box:call("crud.get_storages_state", {}) + 
t.assert_equals(err, nil, "Error getting storages states") + storages_initialized = true + for _,v in pairs(results) do + if not v.storage_initialized then + storages_initialized = false + end + end + if not storages_initialized then + fiber.sleep(wait_for_init_timeout) + attempts_left = attempts_left-1 + end + end + return storages_initialized +end + +pgroup.before_all(function(g) + g.cluster = helpers.Cluster:new({ + datadir = fio.tempdir(), + server_command = helpers.entrypoint('srv_select'), + use_vshard = true, + replicasets = helpers.get_test_replicasets(), + env = { + ['ENGINE'] = g.params.engine, + }, + }) + g.cluster:start() + + -- wait for storages to initialize + all_storages_initialized = wait_storages_init(g) +end) + +pgroup.after_all(function(g) + helpers.stop_cluster(g.cluster) + fio.rmtree(g.cluster.datadir) +end) + +pgroup.test_crud_storage_status_of_stopped_servers = function(g) + t.assert_equals(all_storages_initialized, true) + + g.cluster:server("s2-replica"):stop() + + local results, err = g.cluster.main_server.net_box:call("crud.get_storages_state", {}) + t.assert_equals(err, nil, "Error getting storages states") + + t.assert_equals(results[helpers.uuid('b', 1)].storage_initialized, true) + t.assert_equals(results[helpers.uuid('c', 1)].storage_initialized, true) + t.assert_equals(results[helpers.uuid('c', 2)].storage_initialized, false) +end + +pgroup.test_disabled_storage_role = function(g) + t.assert_equals(all_storages_initialized, true) + + -- stop crud storage role on one replica + local server = g.cluster:server("s1-replica") + local results = server.net_box:eval([[ + local serviceregistry = require("cartridge.service-registry") + serviceregistry.get("crud-storage").stop() + return true + ]]) + + t.assert_not_equals(results, nil, "Fail to disable storage role") + + local results, err = g.cluster.main_server.net_box:call("crud.get_storages_state", {}) + t.assert_equals(err, nil, "Error getting storages states") + + 
t.assert_equals(results[helpers.uuid('b', 1)].storage_initialized, true) + t.assert_equals(results[helpers.uuid('b', 2)].storage_initialized, false) + t.assert_equals(results[helpers.uuid('c', 1)].storage_initialized, true) +end