Skip to content

Commit

Permalink
Provide an API to get storages initialization state
Browse files Browse the repository at this point in the history
There is an issue with using CRUD functionality if not all storages are
up. New function is added to get the information about storages
state: initialized or not. So, a user can poll state and wait for
storages to be initialized before making CRUD calls.

Resolves #229
  • Loading branch information
psergee committed Jun 22, 2022
1 parent 8e00652 commit 3e2b158
Show file tree
Hide file tree
Showing 6 changed files with 155 additions and 5 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
## [Unreleased]

### Added
* `crud.get_storages_state` function to get storages initialization state (#229).

### Changed

Expand Down
6 changes: 6 additions & 0 deletions crud.lua
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,10 @@ crud.stats = stats.get
-- @function reset_stats
crud.reset_stats = stats.reset

-- @refer utils.get_storages_state
-- @function get_storages_state
crud.get_storages_state = utils.get_storages_state

--- Initializes crud on node
--
-- Exports all functions that are used for calls
Expand All @@ -135,6 +139,8 @@ function crud.init_storage()
count.init()
borders.init()
sharding_metadata.init()

_G._crud.is_init = true
end

function crud.init_router()
Expand Down
9 changes: 4 additions & 5 deletions crud/common/call.lua
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,12 @@ local dev_checks = require('crud.common.dev_checks')
local utils = require('crud.common.utils')
local sharding_utils = require('crud.common.sharding.utils')
local fiber_clock = require('fiber').clock
local const = require('crud.common.const')

local CallError = errors.new_class('CallError')

local call = {}

call.DEFAULT_VSHARD_CALL_TIMEOUT = 2

function call.get_vshard_call_name(mode, prefer_replica, balance)
dev_checks('string', '?boolean', '?boolean')

Expand Down Expand Up @@ -79,7 +78,7 @@ function call.map(func_name, func_args, opts)
return nil, err
end

local timeout = opts.timeout or call.DEFAULT_VSHARD_CALL_TIMEOUT
local timeout = opts.timeout or const.DEFAULT_VSHARD_CALL_TIMEOUT

local replicasets, err
if opts.replicasets ~= nil then
Expand Down Expand Up @@ -134,7 +133,7 @@ function call.single(bucket_id, func_name, func_args, opts)
return nil, err
end

local timeout = opts.timeout or call.DEFAULT_VSHARD_CALL_TIMEOUT
local timeout = opts.timeout or const.DEFAULT_VSHARD_CALL_TIMEOUT

local res, err = vshard.router[vshard_call_name](bucket_id, func_name, func_args, {
timeout = timeout,
Expand All @@ -156,7 +155,7 @@ function call.any(func_name, func_args, opts)
timeout = '?number',
})

local timeout = opts.timeout or call.DEFAULT_VSHARD_CALL_TIMEOUT
local timeout = opts.timeout or const.DEFAULT_VSHARD_CALL_TIMEOUT

local replicasets, err = vshard.router.routeall()
if replicasets == nil then
Expand Down
2 changes: 2 additions & 0 deletions crud/common/const.lua
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,6 @@ const.SHARDING_RELOAD_RETRIES_NUM = 1
const.NEED_SCHEMA_RELOAD = 0x0001000
const.NEED_SHARDING_RELOAD = 0x0001001

const.DEFAULT_VSHARD_CALL_TIMEOUT = 2

return const
53 changes: 53 additions & 0 deletions crud/common/utils.lua
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ local ShardingError = errors.new_class('ShardingError', {capture_stack = false})
local GetSpaceFormatError = errors.new_class('GetSpaceFormatError', {capture_stack = false})
local FilterFieldsError = errors.new_class('FilterFieldsError', {capture_stack = false})
local NotInitializedError = errors.new_class('NotInitialized')
local GetReplicaStateError = errors.new_class('GetStorageStateError')
local fiber_clock = require('fiber').clock

local utils = {}

Expand Down Expand Up @@ -711,4 +713,55 @@ function utils.update_storage_call_error_description(err, func_name, replicaset_
return err
end

--- Polls replicas for storage state
--
-- @function get_storages_state
--
-- @tparam ?number opts.timeout
-- Function call timeout
--
-- @return a table of storage states by replica uuid.
function utils.get_storages_state(opts)
local replicasets, err = vshard.router.routeall()
if replicasets == nil then
return nil, GetReplicaStateError:new("Failed to get all replicasets: %s", err.err)
end

opts = opts or {}

local futures_by_replicas = {}
local replicas_state = {}
local async_opts = {is_async = true}
local timeout = opts.timeout or const.DEFAULT_VSHARD_CALL_TIMEOUT

for _, replicasets in pairs(replicasets) do
for replica_uuid, replica in pairs(replicasets.replicas) do
replicas_state[replica_uuid] = {name = replica.name, storage_initialized = false}
local ok, future = pcall(replica.conn.eval, replica.conn,
[[return rawget(rawget(_G, "_crud") or {}, "is_init") ~= nil]],
{}, async_opts)
if ok then
futures_by_replicas[replica_uuid] = future
end
end
end

local deadline = fiber_clock() + timeout
for replica_uuid, future in pairs(futures_by_replicas) do
local wait_timeout = deadline - fiber_clock()
if wait_timeout < 0 then
wait_timeout = 0
end

local result = future:wait_result(wait_timeout)
if result then
replicas_state[replica_uuid].storage_initialized = result[1]
else
future:discard()
end
end

return replicas_state
end

return utils
89 changes: 89 additions & 0 deletions test/integration/storages_state_test.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
local fio = require('fio')

local t = require('luatest')

local helpers = require('test.helper')

local fiber = require("fiber")

local pgroup = t.group('replicas_state', {
{engine = 'memtx'}
})

local all_storages_initialized = false

local function wait_storages_init(g)
local storages_initialized = false
local attempts_left = 5
local wait_for_init_timeout = 1
while (attempts_left > 0 and not storages_initialized) do
local results, err = g.cluster.main_server.net_box:call("crud.get_storages_state", {})
t.assert_equals(err, nil, "Error getting storags states")
storages_initialized = true
for _,v in pairs(results) do
if not v.storage_initialized then
storages_initialized = false
end
end
if not storages_initialized then
fiber.sleep(wait_for_init_timeout)
attempts_left = attempts_left-1
end
end
return storages_initialized
end

pgroup.before_all(function(g)
g.cluster = helpers.Cluster:new({
datadir = fio.tempdir(),
server_command = helpers.entrypoint('srv_select'),
use_vshard = true,
replicasets = helpers.get_test_replicasets(),
env = {
['ENGINE'] = g.params.engine,
},
})
g.cluster:start()

-- wait for storages to initialize
all_storages_initialized = wait_storages_init(g)
end)

pgroup.after_all(function(g)
helpers.stop_cluster(g.cluster)
fio.rmtree(g.cluster.datadir)
end)

pgroup.test_crud_storage_status_of_stopped_servers = function(g)
t.assert_equals(all_storages_initialized, true)

g.cluster:server("s2-replica"):stop()

local results, err = g.cluster.main_server.net_box:call("crud.get_storages_state", {})
t.assert_equals(err, nil, "Error getting storags states")

t.assert_equals(results[helpers.uuid('b', 1)].storage_initialized, true)
t.assert_equals(results[helpers.uuid('c', 1)].storage_initialized, true)
t.assert_equals(results[helpers.uuid('c', 2)].storage_initialized, false)
end

pgroup.test_disabled_storage_role = function(g)
t.assert_equals(all_storages_initialized, true)

-- stop crud storage role on one replica
local server = g.cluster:server("s1-replica")
local results = server.net_box:eval([[
local serviceregistry = require("cartridge.service-registry")
serviceregistry.get("crud-storage").stop()
return true
]])

t.assert_not_equals(results, nil, "Fail to disable storage role")

local results, err = g.cluster.main_server.net_box:call("crud.get_storages_state", {})
t.assert_equals(err, nil, "Error getting storags states")

t.assert_equals(results[helpers.uuid('b', 1)].storage_initialized, true)
t.assert_equals(results[helpers.uuid('b', 2)].storage_initialized, false)
t.assert_equals(results[helpers.uuid('c', 1)].storage_initialized, true)
end

0 comments on commit 3e2b158

Please sign in to comment.