From d053623d616682bc9d8dc233a141c1b0063a22d9 Mon Sep 17 00:00:00 2001 From: psergee Date: Wed, 22 Jun 2022 14:22:01 +0300 Subject: [PATCH] Provide an API to get storages initialization state There is an issue with using CRUD functionality if not all storages are up. New function is added to get the information about storages state: initialized or not. So, a user can poll state and wait for storages to be initialized before making CRUD calls. Resolves #229 --- CHANGELOG.md | 1 + README.md | 48 +++++- crud.lua | 6 + crud/common/call.lua | 9 +- crud/common/const.lua | 2 + crud/common/utils.lua | 86 +++++++++++ test/integration/storages_state_test.lua | 177 +++++++++++++++++++++++ 7 files changed, 323 insertions(+), 6 deletions(-) create mode 100644 test/integration/storages_state_test.lua diff --git a/CHANGELOG.md b/CHANGELOG.md index 932d76ed..5f4e6914 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. ## [Unreleased] ### Added +* `crud.storage_info` function to get storages status (#229, PR #299). ### Changed diff --git a/README.md b/README.md index 429c7a3c..901692e9 100644 --- a/README.md +++ b/README.md @@ -32,6 +32,7 @@ It also provides the `crud-storage` and `crud-router` roles for - [Cut extra objects](#cut-extra-objects) - [Truncate](#truncate) - [Len](#len) + - [Storage info](#storage-info) - [Count](#count) - [Call options for crud methods](#call-options-for-crud-methods) - [Statistics](#statistics) @@ -1074,6 +1075,51 @@ crud.len('customers') ... ``` +### Storage info + +```lua +-- Get storages status +local result, err = crud.storage_info(opts) +``` + +where: + +* `opts`: + * `timeout` (`?number`) - maximum time (in seconds, default: 2) to wait for response from + cluster instances. + +Returns storages status table by instance UUID or nil with error. 
Status table fields: +* `status` contains a string representing the status: + * `"running"` - storage is initialized and running. + * `"uninitialized"` - storage is not initialized or disabled. + * `"error"` - error getting the status from a storage. Connection error, for example. +* `is_master` is `true` if an instance is a master, `false` - otherwise. +* `message` is `nil` unless a problem occurs with getting storage status. + + +**Example:** + +```lua +crud.storage_info() +``` +``` +--- +- fe1b5bd9-42d4-4955-816c-3aa015e0eb81: + status: running + is_master: true + a1eefe51-9869-4c4c-9676-76431b08c97a: + status: running + is_master: true + 777415f4-d656-440e-8834-7124b7267b6d: + status: uninitialized + is_master: false + e1b2e202-b0f7-49cd-b0a2-6b3a584f995e: + status: error + message: 'connect, called on fd 36, aka 127.0.0.1:49762: Connection refused' + is_master: false +... +``` + ### Count `CRUD` supports multi-conditional count, treating a cluster as a single space. @@ -1240,7 +1286,7 @@ returns. `count` is the total requests count since instance start or stats restart. `time` is the total time of requests execution. `latency_average` is `time` / `count`. `latency_quantile_recent` is the 0.99 quantile of request execution -time for a recent period (see +time for a recent period (see [`metrics` summary API](https://www.tarantool.io/ru/doc/latest/book/monitoring/api_reference/#summary)). It is computed only if `metrics` driver is used and quantiles are enabled. 
`latency_quantile_recent` value may be `-nan` if there diff --git a/crud.lua b/crud.lua index 05d36ec4..b190b630 100644 --- a/crud.lua +++ b/crud.lua @@ -138,6 +138,10 @@ crud.stats = stats.get -- @function reset_stats crud.reset_stats = stats.reset +-- @refer utils.storage_info +-- @function storage_info +crud.storage_info = utils.storage_info + --- Initializes crud on node -- -- Exports all functions that are used for calls @@ -165,6 +169,8 @@ function crud.init_storage() count.init() borders.init() sharding_metadata.init() + + _G._crud.storage_info_on_storage = utils.storage_info_on_storage end function crud.init_router() diff --git a/crud/common/call.lua b/crud/common/call.lua index a4659e4a..a89c414b 100644 --- a/crud/common/call.lua +++ b/crud/common/call.lua @@ -5,6 +5,7 @@ local dev_checks = require('crud.common.dev_checks') local utils = require('crud.common.utils') local sharding_utils = require('crud.common.sharding.utils') local fiber_clock = require('fiber').clock +local const = require('crud.common.const') local BaseIterator = require('crud.common.map_call_cases.base_iter') local BasePostprocessor = require('crud.common.map_call_cases.base_postprocessor') @@ -13,8 +14,6 @@ local CallError = errors.new_class('CallError') local call = {} -call.DEFAULT_VSHARD_CALL_TIMEOUT = 2 - function call.get_vshard_call_name(mode, prefer_replica, balance) dev_checks('string', '?boolean', '?boolean') @@ -84,7 +83,7 @@ function call.map(func_name, func_args, opts) return nil, err end - local timeout = opts.timeout or call.DEFAULT_VSHARD_CALL_TIMEOUT + local timeout = opts.timeout or const.DEFAULT_VSHARD_CALL_TIMEOUT local iter = opts.iter if iter == nil then @@ -149,7 +148,7 @@ function call.single(bucket_id, func_name, func_args, opts) return nil, err end - local timeout = opts.timeout or call.DEFAULT_VSHARD_CALL_TIMEOUT + local timeout = opts.timeout or const.DEFAULT_VSHARD_CALL_TIMEOUT local res, err = vshard.router[vshard_call_name](bucket_id, func_name, func_args, 
{ timeout = timeout, @@ -171,7 +170,7 @@ function call.any(func_name, func_args, opts) timeout = '?number', }) - local timeout = opts.timeout or call.DEFAULT_VSHARD_CALL_TIMEOUT + local timeout = opts.timeout or const.DEFAULT_VSHARD_CALL_TIMEOUT local replicasets, err = vshard.router.routeall() if replicasets == nil then diff --git a/crud/common/const.lua b/crud/common/const.lua index d452e1bf..7a13cf91 100644 --- a/crud/common/const.lua +++ b/crud/common/const.lua @@ -8,4 +8,6 @@ const.SHARDING_RELOAD_RETRIES_NUM = 1 const.NEED_SCHEMA_RELOAD = 0x0001000 const.NEED_SHARDING_RELOAD = 0x0001001 +const.DEFAULT_VSHARD_CALL_TIMEOUT = 2 + return const diff --git a/crud/common/utils.lua b/crud/common/utils.lua index 5b3614b2..9153c3df 100644 --- a/crud/common/utils.lua +++ b/crud/common/utils.lua @@ -3,6 +3,7 @@ local ffi = require('ffi') local vshard = require('vshard') local fun = require('fun') local bit = require('bit') +local log = require('log') local const = require('crud.common.const') local schema = require('crud.common.schema') @@ -15,6 +16,8 @@ local ShardingError = errors.new_class('ShardingError', {capture_stack = false}) local GetSpaceFormatError = errors.new_class('GetSpaceFormatError', {capture_stack = false}) local FilterFieldsError = errors.new_class('FilterFieldsError', {capture_stack = false}) local NotInitializedError = errors.new_class('NotInitialized') +local StorageInfoError = errors.new_class('StorageInfoError') +local fiber_clock = require('fiber').clock local utils = {} @@ -748,4 +751,87 @@ function utils.list_slice(list, start_index, end_index) return slice end +--- Polls replicas for storage state +-- +-- @function storage_info +-- +-- @tparam ?number opts.timeout +-- Function call timeout +-- +-- @return a table of storage states by replica uuid. 
+function utils.storage_info(opts)
+    local replicasets, err = vshard.router.routeall()
+    if replicasets == nil then
+        return nil, StorageInfoError:new("Failed to get all replicasets: %s", err.err)
+    end
+
+    opts = opts or {}
+
+    local futures_by_replicas = {}
+    local replica_state_by_uuid = {}
+    local async_opts = {is_async = true}
+    local timeout = opts.timeout or const.DEFAULT_VSHARD_CALL_TIMEOUT
+
+    -- Logs a failure for the replica and stores its text in the replica state `message`.
+    local function set_error(replica_uuid, reason)
+        local err_msg = string.format("Error getting storage info for %s", replica_uuid)
+        if reason ~= nil then
+            log.error("%s: %s", err_msg, reason)
+            replica_state_by_uuid[replica_uuid].message = tostring(reason)
+        else
+            log.error(err_msg)
+            replica_state_by_uuid[replica_uuid].message = err_msg
+        end
+    end
+
+    -- Issue every request as an async call first; the replies are collected in the loop below.
+    for _, replicaset in pairs(replicasets) do
+        for replica_uuid, replica in pairs(replicaset.replicas) do
+            -- Pessimistic default: the status is overwritten only when the replica answers.
+            replica_state_by_uuid[replica_uuid] = {
+                status = "error",
+                is_master = replicaset.master == replica
+            }
+            local ok, res = pcall(replica.conn.call, replica.conn, "_crud.storage_info_on_storage",
+                                  {}, async_opts)
+            if ok then
+                futures_by_replicas[replica_uuid] = res
+            else
+                set_error(replica_uuid, res)
+            end
+        end
+    end
+
+    -- One deadline is shared by all futures, so the total wait is bounded by `timeout`.
+    local deadline = fiber_clock() + timeout
+    for replica_uuid, future in pairs(futures_by_replicas) do
+        local wait_timeout = math.max(deadline - fiber_clock(), 0)
+        local result, wait_err = future:wait_result(wait_timeout)
+        if result == nil then
+            future:discard()
+            if wait_err ~= nil and wait_err.type == 'ClientError' and wait_err.code == box.error.NO_SUCH_PROC then
+                -- The storage function is not registered on the replica: crud is not initialized there.
+                replica_state_by_uuid[replica_uuid].status = "uninitialized"
+            else
+                set_error(replica_uuid, wait_err)
+            end
+        else
+            -- A reply that is not a table must not raise and abort the poll of the other replicas.
+            local state = type(result[1]) == 'table' and result[1] or {}
+            replica_state_by_uuid[replica_uuid].status = state.status or "uninitialized"
+        end
+    end
+
+    return replica_state_by_uuid
+end
+
+--- 
Storage status information.
+-- Called remotely by `utils.storage_info` as `_crud.storage_info_on_storage`.
+-- @function storage_info_on_storage
+--
+-- @return a table with the only field `status` set to "running".
+function utils.storage_info_on_storage()
+    return {status = "running"}
+end
+
 return utils
diff --git a/test/integration/storages_state_test.lua b/test/integration/storages_state_test.lua
new file mode 100644
index 00000000..015fd664
--- /dev/null
+++ b/test/integration/storages_state_test.lua
@@ -0,0 +1,177 @@
+local fio = require('fio')
+
+local t = require('luatest')
+
+local helpers = require('test.helper')
+
+local fiber = require("fiber")
+
+local pgroup = t.group('storage_info', {
+    {engine = 'memtx'}
+})
+
+-- Waits for storages to initialize: returns true once all 4 instances report the "running" status.
+-- This is a workaround for "peer closed" errors for some connections right after the cluster start.
+-- Retry is required to give a small timeout to reconnect.
+local function wait_storages_init(g)
+    local storages_initialized = false
+    local attempts_left = 5 -- maximum number of polls
+    local wait_for_init_timeout = 1 -- pause between polls, passed to fiber.sleep
+    while (attempts_left > 0 and not storages_initialized) do
+        local results, err = g.cluster.main_server.net_box:call("crud.storage_info", {})
+        t.assert_equals(err, nil, "Error getting storage status")
+        storages_initialized = true -- reset below if any storage is not running
+        local count = 0
+        for _, v in pairs(results) do
+            count = count + 1
+            if v.status ~= "running" then
+                storages_initialized = false
+            end
+        end
+        if count ~= 4 then -- Make sure the results count is equal to the cluster instances count.
+            return false -- gives up at once, without retrying
+        end
+        if not storages_initialized then
+            fiber.sleep(wait_for_init_timeout)
+            attempts_left = attempts_left - 1
+        end
+    end
+    return storages_initialized
+end
+
+pgroup.before_all(function(g)
+    g.cluster = helpers.Cluster:new({
+        datadir = fio.tempdir(),
+        server_command = helpers.entrypoint('srv_select'),
+        use_vshard = true,
+        replicasets = helpers.get_test_replicasets(),
+        env = {
+            ['ENGINE'] = g.params.engine,
+        },
+    })
+    g.cluster:start()
+end)
+
+pgroup.before_each(function(g) -- every test starts with all storages reported as running
+    t.assert_equals(wait_storages_init(g), true)
+end)
+
+pgroup.after_all(function(g)
+    helpers.stop_cluster(g.cluster)
+    fio.rmtree(g.cluster.datadir)
+end)
+
+pgroup.test_crud_storage_status_of_stopped_servers = function(g)
+    g.cluster:server("s2-replica"):stop() -- the stopped replica must be reported with the "error" status
+
+    local results, err = g.cluster.main_server.net_box:call("crud.storage_info", {})
+    t.assert_equals(err, nil, "Error getting storages info")
+    t.assert_equals(results, {
+        [helpers.uuid('b', 1)] = {
+            status = "running",
+            is_master = true
+        },
+        [helpers.uuid('b', 2)] = {
+            status = "running",
+            is_master = false
+        },
+        [helpers.uuid('c', 1)] = {
+            status = "running",
+            is_master = true
+        },
+        [helpers.uuid('c', 2)] = {
+            status = "error",
+            is_master = false,
+            message = "Peer closed"
+        }
+    })
+end
+
+pgroup.after_test('test_crud_storage_status_of_stopped_servers', function(g)
+    g.cluster:server("s2-replica"):start() -- bring back the replica stopped by the test
+end)
+
+pgroup.test_disabled_storage_role = function(g)
+    -- stop crud storage role on one replica
+    local server = g.cluster:server("s1-replica")
+    local results = server.net_box:eval([[
+        local serviceregistry = require("cartridge.service-registry")
+        serviceregistry.get("crud-storage").stop()
+        return true
+    ]])
+
+    t.assert_not_equals(results, nil, "Failed to disable storage role")
+
+    local results, err = g.cluster.main_server.net_box:call("crud.storage_info", {})
+    t.assert_equals(err, nil, "Error getting storages info")
+
+    t.assert_equals(results, {
+        [helpers.uuid('b', 1)] = {
+            status = "running",
+            is_master = true
+        },
+        [helpers.uuid('b', 2)] = {
+            status = "uninitialized",
+            is_master = false
+        },
+        [helpers.uuid('c', 1)] = {
+            status = "running",
+            is_master = true
+        },
+        [helpers.uuid('c', 2)] = {
+            status = "running",
+            is_master = false
+        }
+    })
+end
+
+pgroup.after_test('test_disabled_storage_role', function(g) -- re-init the role stopped by the test
+    g.cluster:server("s1-replica").net_box:eval([[
+        local serviceregistry = require("cartridge.service-registry")
+        serviceregistry.get("crud-storage").init()
+        return true
+    ]])
+end)
+
+pgroup.test_storage_call_failure = function(g)
+    -- replace the storage info function with a table on one replica, so that calling it fails
+    local server = g.cluster:server("s2-replica")
+    local results = server.net_box:eval([[
+        _G.saved_storage_info_on_storage = _crud.storage_info_on_storage
+        _crud.storage_info_on_storage = {}
+        return true
+    ]])
+
+    t.assert_not_equals(results, nil, "Eval failed")
+
+    local results, err = g.cluster.main_server.net_box:call("crud.storage_info", {})
+    t.assert_equals(err, nil, "Error getting storages info")
+
+    t.assert_equals(results, {
+        [helpers.uuid('b', 1)] = {
+            status = "running",
+            is_master = true
+        },
+        [helpers.uuid('b', 2)] = {
+            status = "running",
+            is_master = false
+        },
+        [helpers.uuid('c', 1)] = {
+            status = "running",
+            is_master = true
+        },
+        [helpers.uuid('c', 2)] = {
+            status = "error",
+            is_master = false,
+            message = "attempt to call a table value"
+        }
+    })
+end
+
+pgroup.after_test('test_storage_call_failure', function(g) -- restore the function replaced by the test
+    g.cluster:server("s2-replica").net_box:eval([[
+        _crud.storage_info_on_storage = _G.saved_storage_info_on_storage
+        _G.saved_storage_info_on_storage = nil
+        return true
+    ]])
+end)