diff --git a/CHANGELOG.md b/CHANGELOG.md index dd50a268..d9bc2287 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. ## [Unreleased] ### Added +* `crud.storage_info` function to get storages status (#229, PR #299). ### Changed diff --git a/README.md b/README.md index 429c7a3c..901692e9 100644 --- a/README.md +++ b/README.md @@ -32,6 +32,7 @@ It also provides the `crud-storage` and `crud-router` roles for - [Cut extra objects](#cut-extra-objects) - [Truncate](#truncate) - [Len](#len) + - [Storage info](#storage-info) - [Count](#count) - [Call options for crud methods](#call-options-for-crud-methods) - [Statistics](#statistics) @@ -1074,6 +1075,51 @@ crud.len('customers') ... ``` +### Storage info + +```lua +-- Get storages status +local result, err = crud.storage_info(opts) +``` + +where: + +* `opts`: + * `timeout` (`?number`) - maximum time (in seconds, default: 2) to wait for response from + cluster instances. + +Returns storages status table by instance UUID or nil with error. Status table fields: +* `status` contains a string representing the status: + * `"running"` - storage is initialized and running. + * `"uninitialized"` - storage is not initialized or disabled. + * `"error"` - error getting the status from a storage. Connection error, for example. +* `is_master` is `true` if an instance is a master, `false` - otherwise. +* `message` is `nil` unless a problem occurs with getting storage status. + + +**Example:** + +```lua +crud.storage_info() +``` +``` +--- +- fe1b5bd9-42d4-4955-816c-3aa015e0eb81: + status: running + is_master: true + a1eefe51-9869-4c4c-9676-76431b08c97a: + status: running + is_master: true + 777415f4-d656-440e-8834-7124b7267b6d: + status: uninitialized + is_master: false + e1b2e202-b0f7-49cd-b0a2-6b3a584f995e: + status: error + message: 'connect, called on fd 36, aka 127.0.0.1:49762: Connection refused' + is_master: false +... +``` + ### Count `CRUD` supports multi-conditional count, treating a cluster as a single space. @@ -1240,7 +1286,7 @@ returns. `count` is the total requests count since instance start or stats restart. `time` is the total time of requests execution. `latency_average` is `time` / `count`. `latency_quantile_recent` is the 0.99 quantile of request execution -time for a recent period (see +time for a recent period (see [`metrics` summary API](https://www.tarantool.io/ru/doc/latest/book/monitoring/api_reference/#summary)). It is computed only if `metrics` driver is used and quantiles are enabled. `latency_quantile_recent` value may be `-nan` if there diff --git a/crud.lua b/crud.lua index 05d36ec4..b190b630 100644 --- a/crud.lua +++ b/crud.lua @@ -138,6 +138,10 @@ crud.stats = stats.get -- @function reset_stats crud.reset_stats = stats.reset +-- @refer utils.storage_info +-- @function storage_info +crud.storage_info = utils.storage_info + --- Initializes crud on node -- -- Exports all functions that are used for calls @@ -165,6 +169,8 @@ function crud.init_storage() count.init() borders.init() sharding_metadata.init() + + _G._crud.storage_info_on_storage = utils.storage_info_on_storage end function crud.init_router() diff --git a/crud/common/call.lua b/crud/common/call.lua index a4659e4a..a89c414b 100644 --- a/crud/common/call.lua +++ b/crud/common/call.lua @@ -5,6 +5,7 @@ local dev_checks = require('crud.common.dev_checks') local utils = require('crud.common.utils') local sharding_utils = require('crud.common.sharding.utils') local fiber_clock = require('fiber').clock +local const = require('crud.common.const') local BaseIterator = require('crud.common.map_call_cases.base_iter') local BasePostprocessor = require('crud.common.map_call_cases.base_postprocessor') @@ -13,8 +14,6 @@ local CallError = errors.new_class('CallError') local call = {} -call.DEFAULT_VSHARD_CALL_TIMEOUT = 2 - function call.get_vshard_call_name(mode, prefer_replica, balance) dev_checks('string', '?boolean', '?boolean') @@ -84,7 +83,7 @@ function call.map(func_name, func_args, opts) return nil, err end - local timeout = opts.timeout or call.DEFAULT_VSHARD_CALL_TIMEOUT + local timeout = opts.timeout or const.DEFAULT_VSHARD_CALL_TIMEOUT local iter = opts.iter if iter == nil then @@ -149,7 +148,7 @@ function call.single(bucket_id, func_name, func_args, opts) return nil, err end - local timeout = opts.timeout or call.DEFAULT_VSHARD_CALL_TIMEOUT + local timeout = opts.timeout or const.DEFAULT_VSHARD_CALL_TIMEOUT local res, err = vshard.router[vshard_call_name](bucket_id, func_name, func_args, { timeout = timeout, @@ -171,7 +170,7 @@ function call.any(func_name, func_args, opts) timeout = '?number', }) - local timeout = opts.timeout or call.DEFAULT_VSHARD_CALL_TIMEOUT + local timeout = opts.timeout or const.DEFAULT_VSHARD_CALL_TIMEOUT local replicasets, err = vshard.router.routeall() if replicasets == nil then diff --git a/crud/common/const.lua b/crud/common/const.lua index d452e1bf..7a13cf91 100644 --- a/crud/common/const.lua +++ b/crud/common/const.lua @@ -8,4 +8,6 @@ const.SHARDING_RELOAD_RETRIES_NUM = 1 const.NEED_SCHEMA_RELOAD = 0x0001000 const.NEED_SHARDING_RELOAD = 0x0001001 +const.DEFAULT_VSHARD_CALL_TIMEOUT = 2 + return const diff --git a/crud/common/state.lua b/crud/common/state.lua deleted file mode 100644 index 940739a7..00000000 --- a/crud/common/state.lua +++ /dev/null @@ -1,23 +0,0 @@ -local uuid = require('uuid') - -local state = {} - -local states = {} - -function state.get(id) - id = id or uuid.str() - local op_state = states[id] - if op_state == nil then - op_state = { id = id } - states[id] = op_state - end - return op_state -end - -function state.clear(id) - assert(id ~= nil) - states[id] = nil - return true -end - -return state diff --git a/crud/common/utils.lua b/crud/common/utils.lua index 5b3614b2..9153c3df 100644 --- a/crud/common/utils.lua +++ b/crud/common/utils.lua @@ -3,6 +3,7 @@ local ffi = require('ffi') local vshard = require('vshard') local fun = require('fun') local bit = require('bit') +local log = require('log') local const = require('crud.common.const') local schema = require('crud.common.schema') @@ -15,6 +16,8 @@ local ShardingError = errors.new_class('ShardingError', {capture_stack = false}) local GetSpaceFormatError = errors.new_class('GetSpaceFormatError', {capture_stack = false}) local FilterFieldsError = errors.new_class('FilterFieldsError', {capture_stack = false}) local NotInitializedError = errors.new_class('NotInitialized') +local StorageInfoError = errors.new_class('StorageInfoError') +local fiber_clock = require('fiber').clock local utils = {} @@ -748,4 +751,87 @@ function utils.list_slice(list, start_index, end_index) return slice end +--- Polls replicas for storage state +-- +-- @function storage_info +-- +-- @tparam ?number opts.timeout +-- Function call timeout +-- +-- @return a table of storage states by replica uuid. +function utils.storage_info(opts) + local replicasets, err = vshard.router.routeall() + if replicasets == nil then + return nil, StorageInfoError:new("Failed to get all replicasets: %s", err.err) + end + + opts = opts or {} + + local futures_by_replicas = {} + local replica_state_by_uuid = {} + local async_opts = {is_async = true} + local timeout = opts.timeout or const.DEFAULT_VSHARD_CALL_TIMEOUT + + for _, replicaset in pairs(replicasets) do + for replica_uuid, replica in pairs(replicaset.replicas) do + replica_state_by_uuid[replica_uuid] = { + status = "error", + is_master = replicaset.master == replica + } + local ok, res = pcall(replica.conn.call, replica.conn, "_crud.storage_info_on_storage", + {}, async_opts) + if ok then + futures_by_replicas[replica_uuid] = res + else + local err_msg = string.format("Error getting storage info for %s", replica_uuid) + if res ~= nil then + log.error("%s: %s", err_msg, res) + replica_state_by_uuid[replica_uuid].message = tostring(res) + else + log.error(err_msg) + replica_state_by_uuid[replica_uuid].message = err_msg + end + end + end + end + + local deadline = fiber_clock() + timeout + for replica_uuid, future in pairs(futures_by_replicas) do + local wait_timeout = deadline - fiber_clock() + if wait_timeout < 0 then + wait_timeout = 0 + end + + local result, err = future:wait_result(wait_timeout) + if result == nil then + future:discard() + local err_msg = string.format("Error getting storage info for %s", replica_uuid) + if err ~= nil then + if err.type == 'ClientError' and err.code == box.error.NO_SUCH_PROC then + replica_state_by_uuid[replica_uuid].status = "uninitialized" + else + log.error("%s: %s", err_msg, err) + replica_state_by_uuid[replica_uuid].message = tostring(err) + end + else + log.error(err_msg) + replica_state_by_uuid[replica_uuid].message = err_msg + end + else + replica_state_by_uuid[replica_uuid].status = result[1].status or "uninitialized" + end + end + + return replica_state_by_uuid +end + +--- Storage status information. +-- +-- @function storage_info_on_storage +-- +-- @return a table with storage status. +function utils.storage_info_on_storage() + return {status = "running"} +end + return utils diff --git a/test/integration/storages_state_test.lua b/test/integration/storages_state_test.lua new file mode 100644 index 00000000..015fd664 --- /dev/null +++ b/test/integration/storages_state_test.lua @@ -0,0 +1,177 @@ +local fio = require('fio') + +local t = require('luatest') + +local helpers = require('test.helper') + +local fiber = require("fiber") + +local pgroup = t.group('storage_info', { + {engine = 'memtx'} +}) + +-- Waits for storages to initialize. +-- This is a workaround for "peer closed" errors for some connections right after the cluster start. +-- Retry is required to give a small timeout to reconnect. +local function wait_storages_init(g) + local storages_initialized = false + local attempts_left = 5 + local wait_for_init_timeout = 1 + while (attempts_left > 0 and not storages_initialized) do + local results, err = g.cluster.main_server.net_box:call("crud.storage_info", {}) + t.assert_equals(err, nil, "Error getting storage status") + storages_initialized = true + local count = 0 + for _, v in pairs(results) do + count = count + 1 + if v.status ~= "running" then + storages_initialized = false + end + end + if count ~= 4 then -- Make sure the results count is equal to the cluster instances count. + return false + end + if not storages_initialized then + fiber.sleep(wait_for_init_timeout) + attempts_left = attempts_left - 1 + end + end + return storages_initialized +end + +pgroup.before_all(function(g) + g.cluster = helpers.Cluster:new({ + datadir = fio.tempdir(), + server_command = helpers.entrypoint('srv_select'), + use_vshard = true, + replicasets = helpers.get_test_replicasets(), + env = { + ['ENGINE'] = g.params.engine, + }, + }) + g.cluster:start() +end) + +pgroup.before_each(function(g) + t.assert_equals(wait_storages_init(g), true) +end) + +pgroup.after_all(function(g) + helpers.stop_cluster(g.cluster) + fio.rmtree(g.cluster.datadir) +end) + +pgroup.test_crud_storage_status_of_stopped_servers = function(g) + g.cluster:server("s2-replica"):stop() + + local results, err = g.cluster.main_server.net_box:call("crud.storage_info", {}) + t.assert_equals(err, nil, "Error getting storages info") + t.assert_equals(results, { + [helpers.uuid('b', 1)] = { + status = "running", + is_master = true + }, + [helpers.uuid('b', 2)] = { + status = "running", + is_master = false + }, + [helpers.uuid('c', 1)] = { + status = "running", + is_master = true + }, + [helpers.uuid('c', 2)] = { + status = "error", + is_master = false, + message = "Peer closed" + } + }) +end + +pgroup.after_test('test_crud_storage_status_of_stopped_servers', function(g) + g.cluster:server("s2-replica"):start() +end) + +pgroup.test_disabled_storage_role = function(g) + -- stop crud storage role on one replica + local server = g.cluster:server("s1-replica") + local results = server.net_box:eval([[ + local serviceregistry = require("cartridge.service-registry") + serviceregistry.get("crud-storage").stop() + return true + ]]) + + t.assert_not_equals(results, nil, "Failed to disable storage role") + + local results, err = g.cluster.main_server.net_box:call("crud.storage_info", {}) + t.assert_equals(err, nil, "Error getting storages info") + + t.assert_equals(results, { + [helpers.uuid('b', 1)] = { + status = "running", + is_master = true + }, + [helpers.uuid('b', 2)] = { + status = "uninitialized", + is_master = false + }, + [helpers.uuid('c', 1)] = { + status = "running", + is_master = true + }, + [helpers.uuid('c', 2)] = { + status = "running", + is_master = false + } + }) +end + +pgroup.after_test('test_disabled_storage_role', function(g) + g.cluster:server("s1-replica").net_box:eval([[ + local serviceregistry = require("cartridge.service-registry") + serviceregistry.get("crud-storage").init() + return true + ]]) +end) + +pgroup.test_storage_call_failure = function(g) + -- stop crud storage role on one replica + local server = g.cluster:server("s2-replica") + local results = server.net_box:eval([[ + _G.saved_storage_info_on_storage = _crud.storage_info_on_storage + _crud.storage_info_on_storage = {} + return true + ]]) + + t.assert_not_equals(results, nil, "Eval failed") + + local results, err = g.cluster.main_server.net_box:call("crud.storage_info", {}) + t.assert_equals(err, nil, "Error getting storages info") + + t.assert_equals(results, { + [helpers.uuid('b', 1)] = { + status = "running", + is_master = true + }, + [helpers.uuid('b', 2)] = { + status = "running", + is_master = false + }, + [helpers.uuid('c', 1)] = { + status = "running", + is_master = true + }, + [helpers.uuid('c', 2)] = { + status = "error", + is_master = false, + message = "attempt to call a table value" + } + }) +end + +pgroup.after_test('test_storage_call_failure', function(g) + g.cluster:server("s2-replica").net_box:eval([[ + _crud.storage_info_on_storage = _G.saved_storage_info_on_storage + _G.saved_storage_info_on_storage = nil + return true + ]]) +end)