Skip to content

Commit

Permalink
feature: master presence timout for get space
Browse files Browse the repository at this point in the history
Added timeout condition for the validation of master presence in
replicaset and for the master connection to the `utils.get_space`
method.

Closes #95
  • Loading branch information
GRISHNOV committed Jan 27, 2023
1 parent 73bf5bf commit 00fcbfb
Show file tree
Hide file tree
Showing 19 changed files with 82 additions and 54 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
## [Unreleased]

### Added
* Added timeout condition for the validation of master presence in
replicaset and for the master connection (#95).

### Changed

Expand Down
33 changes: 22 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,8 @@ where:
* `space_name` (`string`) - name of the space to insert an object
* `tuple` / `object` (`table`) - tuple/object to insert
* `opts`:
* `timeout` (`?number`) - `vshard.call` timeout (in seconds)
* `timeout` (`?number`) - `vshard.call` timeout and vshard master
discovery timeout (in seconds)
* `bucket_id` (`?number|cdata`) - bucket ID
* `fields` (`?table`) - field names for getting only a subset of fields
* `vshard_router` (`?string|table`) - Cartridge vshard group name or
Expand Down Expand Up @@ -263,7 +264,8 @@ where:
* `space_name` (`string`) - name of the space to insert an object
* `tuples` / `objects` (`table`) - array of tuples/objects to insert
* `opts`:
* `timeout` (`?number`) - `vshard.call` timeout (in seconds)
* `timeout` (`?number`) - `vshard.call` timeout and vshard master
discovery timeout (in seconds)
* `fields` (`?table`) - field names for getting only a subset of fields
* `stop_on_error` (`?boolean`) - stop on a first error and report error
regarding the failed operation and error about what tuples were not
Expand Down Expand Up @@ -411,7 +413,8 @@ where:
* `opts`:
* `fields` (`?table`) - field names for getting only a subset of fields
* `bucket_id` (`?number|cdata`) - bucket ID
* `timeout` (`?number`) - `vshard.call` timeout (in seconds)
* `timeout` (`?number`) - `vshard.call` timeout and vshard master
discovery timeout (in seconds)
* `mode` (`?string`, `read` or `write`) - if `write` is specified then `get` is
performed on master, default value is `read`
* `prefer_replica` (`?boolean`) - if `true` then the preferred target is one of
Expand Down Expand Up @@ -450,7 +453,8 @@ where:
* `key` (`any`) - primary key value
* `operations` (`table`) - update [operations](https://www.tarantool.io/en/doc/latest/reference/reference_lua/box_space/update/)
* `opts`:
* `timeout` (`?number`) - `vshard.call` timeout (in seconds)
* `timeout` (`?number`) - `vshard.call` timeout and vshard master
discovery timeout (in seconds)
* `bucket_id` (`?number|cdata`) - bucket ID
* `fields` (`?table`) - field names for getting only a subset of fields
* `vshard_router` (`?string|table`) - Cartridge vshard group name or
Expand Down Expand Up @@ -485,7 +489,8 @@ where:
* `space_name` (`string`) - name of the space
* `key` (`any`) - primary key value
* `opts`:
* `timeout` (`?number`) - `vshard.call` timeout (in seconds)
* `timeout` (`?number`) - `vshard.call` timeout and vshard master
discovery timeout (in seconds)
* `bucket_id` (`?number|cdata`) - bucket ID
* `fields` (`?table`) - field names for getting only a subset of fields
* `vshard_router` (`?string|table`) - Cartridge vshard group name or
Expand Down Expand Up @@ -522,7 +527,8 @@ where:
* `space_name` (`string`) - name of the space
* `tuple` / `object` (`table`) - tuple/object to insert or replace exist one
* `opts`:
* `timeout` (`?number`) - `vshard.call` timeout (in seconds)
* `timeout` (`?number`) - `vshard.call` timeout and vshard master
discovery timeout (in seconds)
* `bucket_id` (`?number|cdata`) - bucket ID
* `fields` (`?table`) - field names for getting only a subset of fields
* `vshard_router` (`?string|table`) - Cartridge vshard group name or
Expand Down Expand Up @@ -581,7 +587,8 @@ where:
* `space_name` (`string`) - name of the space to insert/replace an object
* `tuples` / `objects` (`table`) - array of tuples/objects to insert
* `opts`:
* `timeout` (`?number`) - `vshard.call` timeout (in seconds)
* `timeout` (`?number`) - `vshard.call` timeout and vshard master
discovery timeout (in seconds)
* `fields` (`?table`) - field names for getting only a subset of fields
* `stop_on_error` (`?boolean`) - stop on a first error and report error
regarding the failed operation and error about what tuples were not
Expand Down Expand Up @@ -730,7 +737,8 @@ where:
* `tuple` / `object` (`table`) - tuple/object to insert if there is no existing tuple which matches the key fields
* `operations` (`table`) - update [operations](https://www.tarantool.io/en/doc/latest/reference/reference_lua/box_space/update/) if there is an existing tuple which matches the key fields of tuple
* `opts`:
* `timeout` (`?number`) - `vshard.call` timeout (in seconds)
* `timeout` (`?number`) - `vshard.call` timeout and vshard master
discovery timeout (in seconds)
* `bucket_id` (`?number|cdata`) - bucket ID
* `fields` (`?table`) - field names for getting only a subset of fields
* `vshard_router` (`?string|table`) - Cartridge vshard group name or
Expand Down Expand Up @@ -785,7 +793,8 @@ where:
if there is tuple with duplicate key then existing tuple will
be updated with update operations
* `opts`:
* `timeout` (`?number`) - `vshard.call` timeout (in seconds)
* `timeout` (`?number`) - `vshard.call` timeout and vshard master
discovery timeout (in seconds)
* `fields` (`?table`) - field names for getting only a subset of fields
* `stop_on_error` (`?boolean`) - stop on a first error and report error
regarding the failed operation and error about what tuples were not
Expand Down Expand Up @@ -1108,7 +1117,8 @@ where:

* `space_name` (`string`) - name of the space
* `opts`:
* `timeout` (`?number`) - `vshard.call` timeout (in seconds)
* `timeout` (`?number`) - `vshard.call` timeout and vshard master
discovery timeout (in seconds)
* `vshard_router` (`?string|table`) - Cartridge vshard group name or
vshard router instance. Set this parameter if your space is not
a part of the default vshard cluster
Expand Down Expand Up @@ -1220,7 +1230,8 @@ where:
* `opts`:
* `yield_every` (`?number`) - number of tuples processed to yield after,
`yield_every` should be > 0, default value is 100
* `timeout` (`?number`) - `vshard.call` timeout (in seconds), default value is 2
* `timeout` (`?number`) - `vshard.call` timeout and vshard master
discovery timeout (in seconds), default value is 2
* `bucket_id` (`?number|cdata`) - bucket ID
* `force_map_call` (`?boolean`) - if `true`
then the map call is performed without any optimizations even,
Expand Down
7 changes: 5 additions & 2 deletions crud/borders.lua
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,7 @@ local function call_get_border_on_router(vshard_router, border_name, space_name,
vshard_router = '?string|table',
})

local replicasets = vshard_router:routeall()
local space, err = utils.get_space(space_name, replicasets)
local space, err = utils.get_space(space_name, vshard_router, opts.timeout)
if err ~= nil then
return nil, BorderError:new("An error occurred during the operation: %s", err), const.NEED_SCHEMA_RELOAD
end
Expand All @@ -100,6 +99,10 @@ local function call_get_border_on_router(vshard_router, border_name, space_name,
local cmp_key_parts = utils.merge_primary_key_parts(index.parts, primary_index.parts)
local field_names = utils.enrich_field_names_with_cmp_key(opts.fields, cmp_key_parts, space:format())

local replicasets, err = vshard_router:routeall()
if err ~= nil then
return nil, BorderError:new("Failed to get router replicasets: %s", err)
end
local call_opts = {
mode = 'read',
replicasets = replicasets,
Expand Down
34 changes: 26 additions & 8 deletions crud/common/utils.lua
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ local FilterFieldsError = errors.new_class('FilterFieldsError', {capture_stack =
local NotInitializedError = errors.new_class('NotInitialized')
local StorageInfoError = errors.new_class('StorageInfoError')
local VshardRouterError = errors.new_class('VshardRouterError', {capture_stack = false})
local fiber_clock = require('fiber').clock
local fiber = require('fiber')

local utils = {}

Expand Down Expand Up @@ -96,8 +96,25 @@ function utils.format_replicaset_error(replicaset_uuid, msg, ...)
)
end

function utils.get_space(space_name, replicasets)
local replicaset = select(2, next(replicasets))
function utils.get_space(space_name, vshard_router, timeout)
local replicasets, replicaset
timeout = timeout or const.DEFAULT_VSHARD_CALL_TIMEOUT
local deadline = fiber.clock() + timeout
while (
-- Break if the deadline condition is exceeded.
-- Handling for deadline errors are below in the code.
fiber.clock() < deadline
) do
-- Try to get master with timeout.
fiber.yield()
replicasets = vshard_router:routeall()
replicaset = select(2, next(replicasets))
if replicaset ~= nil and
replicaset.master ~= nil and
replicaset.master.conn.error == nil then
break
end
end

if replicaset == nil then
return nil, GetSpaceError:new(
Expand All @@ -119,13 +136,14 @@ function utils.get_space(space_name, replicasets)
replicaset.uuid, replicaset.master.conn.error)
return nil, GetSpaceError:new(error_msg)
end

local space = replicaset.master.conn.space[space_name]

return space
end

function utils.get_space_format(space_name, replicasets)
local space, err = utils.get_space(space_name, replicasets)
function utils.get_space_format(space_name, vshard_router)
local space, err = utils.get_space(space_name, vshard_router)
if err ~= nil then
return nil, GetSpaceFormatError:new("An error occurred during the operation: %s", err)
end
Expand Down Expand Up @@ -664,7 +682,7 @@ function utils.cut_rows(rows, metadata, field_names)
end

local function flatten_obj(vshard_router, space_name, obj, skip_nullability_check)
local space_format, err = utils.get_space_format(space_name, vshard_router:routeall())
local space_format, err = utils.get_space_format(space_name, vshard_router)
if err ~= nil then
return nil, FlattenError:new("Failed to get space format: %s", err), const.NEED_SCHEMA_RELOAD
end
Expand Down Expand Up @@ -835,9 +853,9 @@ function utils.storage_info(opts)
end
end

local deadline = fiber_clock() + timeout
local deadline = fiber.clock() + timeout
for replica_uuid, future in pairs(futures_by_replicas) do
local wait_timeout = deadline - fiber_clock()
local wait_timeout = deadline - fiber.clock()
if wait_timeout < 0 then
wait_timeout = 0
end
Expand Down
12 changes: 5 additions & 7 deletions crud/count.lua
Original file line number Diff line number Diff line change
Expand Up @@ -132,12 +132,7 @@ local function call_count_on_router(vshard_router, space_name, user_conditions,
return nil, CountError:new("Failed to parse conditions: %s", err)
end

local replicasets, err = vshard_router:routeall()
if err ~= nil then
return nil, CountError:new("Failed to get router replicasets: %s", err)
end

local space, err = utils.get_space(space_name, replicasets)
local space, err = utils.get_space(space_name, vshard_router, opts.timeout)
if err ~= nil then
return nil, CountError:new("An error occurred during the operation: %s", err), const.NEED_SCHEMA_RELOAD
end
Expand Down Expand Up @@ -169,7 +164,10 @@ local function call_count_on_router(vshard_router, space_name, user_conditions,
check_count_safety(space_name, plan, opts)

-- set replicasets to count from
local replicasets_to_count = replicasets
local replicasets_to_count, err = vshard_router:routeall()
if err ~= nil then
return nil, CountError:new("Failed to get router replicasets: %s", err)
end

-- Whether to call one storage replicaset or perform
-- map-reduce?
Expand Down
2 changes: 1 addition & 1 deletion crud/delete.lua
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ local function call_delete_on_router(vshard_router, space_name, key, opts)
vshard_router = '?string|table',
})

local space, err = utils.get_space(space_name, vshard_router:routeall())
local space, err = utils.get_space(space_name, vshard_router, opts.timeout)
if err ~= nil then
return nil, DeleteError:new("An error occurred during the operation: %s", err), const.NEED_SCHEMA_RELOAD
end
Expand Down
2 changes: 1 addition & 1 deletion crud/get.lua
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ local function call_get_on_router(vshard_router, space_name, key, opts)
vshard_router = '?string|table',
})

local space, err = utils.get_space(space_name, vshard_router:routeall())
local space, err = utils.get_space(space_name, vshard_router, opts.timeout)
if err ~= nil then
return nil, GetError:new("An error occurred during the operation: %s", err), const.NEED_SCHEMA_RELOAD
end
Expand Down
2 changes: 1 addition & 1 deletion crud/insert.lua
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ local function call_insert_on_router(vshard_router, space_name, original_tuple,
vshard_router = '?string|table',
})

local space, err = utils.get_space(space_name, vshard_router:routeall())
local space, err = utils.get_space(space_name, vshard_router, opts.timeout)
if err ~= nil then
return nil, InsertError:new("An error occurred during the operation: %s", err), const.NEED_SCHEMA_RELOAD
end
Expand Down
2 changes: 1 addition & 1 deletion crud/insert_many.lua
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ local function call_insert_many_on_router(vshard_router, space_name, original_tu
vshard_router = '?string|table',
})

local space, err = utils.get_space(space_name, vshard_router:routeall())
local space, err = utils.get_space(space_name, vshard_router, opts.timeout)
if err ~= nil then
return nil, {
InsertManyError:new("An error occurred during the operation: %s", err)
Expand Down
2 changes: 1 addition & 1 deletion crud/len.lua
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ function len.call(space_name, opts)
return nil, LenError:new(err)
end

local space, err = utils.get_space(space_name, vshard_router:routeall())
local space, err = utils.get_space(space_name, vshard_router, opts.timeout)
if err ~= nil then
return nil, LenError:new("An error occurred during the operation: %s", err)
end
Expand Down
2 changes: 1 addition & 1 deletion crud/replace.lua
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ local function call_replace_on_router(vshard_router, space_name, original_tuple,
vshard_router = '?string|table',
})

local space, err = utils.get_space(space_name, vshard_router:routeall())
local space, err = utils.get_space(space_name, vshard_router, opts.timeout)
if err ~= nil then
return nil, ReplaceError:new("An error occurred during the operation: %s", err), const.NEED_SCHEMA_RELOAD
end
Expand Down
2 changes: 1 addition & 1 deletion crud/replace_many.lua
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ local function call_replace_many_on_router(vshard_router, space_name, original_t
vshard_router = '?string|table',
})

local space, err = utils.get_space(space_name, vshard_router:routeall())
local space, err = utils.get_space(space_name, vshard_router, opts.timeout)
if err ~= nil then
return nil, {
ReplaceManyError:new("An error occurred during the operation: %s", err)
Expand Down
12 changes: 5 additions & 7 deletions crud/select/compat/select.lua
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,7 @@ local function build_select_iterator(vshard_router, space_name, user_conditions,
return nil, SelectError:new("Failed to parse conditions: %s", err)
end

local replicasets, err = vshard_router:routeall()
if err ~= nil then
return nil, SelectError:new("Failed to get router replicasets: %s", err)
end

local space, err = utils.get_space(space_name, replicasets)
local space, err = utils.get_space(space_name, vshard_router)
if err ~= nil then
return nil, SelectError:new("An error occurred during the operation: %s", err), const.NEED_SCHEMA_RELOAD
end
Expand Down Expand Up @@ -84,7 +79,10 @@ local function build_select_iterator(vshard_router, space_name, user_conditions,
end

-- set replicasets to select from
local replicasets_to_select = replicasets
local replicasets_to_select, err = vshard_router:routeall()
if err ~= nil then
return nil, SelectError:new("Failed to get router replicasets: %s", err)
end

-- Whether to call one storage replicaset or perform
-- map-reduce?
Expand Down
12 changes: 5 additions & 7 deletions crud/select/compat/select_old.lua
Original file line number Diff line number Diff line change
Expand Up @@ -107,12 +107,7 @@ local function build_select_iterator(vshard_router, space_name, user_conditions,
return nil, SelectError:new("Failed to parse conditions: %s", err)
end

local replicasets, err = vshard_router:routeall()
if err ~= nil then
return nil, SelectError:new("Failed to get router replicasets: %s", err)
end

local space, err = utils.get_space(space_name, replicasets)
local space, err = utils.get_space(space_name, vshard_router)
if err ~= nil then
return nil, SelectError:new("An error occurred during the operation: %s", err), const.NEED_SCHEMA_RELOAD
end
Expand Down Expand Up @@ -151,7 +146,10 @@ local function build_select_iterator(vshard_router, space_name, user_conditions,
end

-- set replicasets to select from
local replicasets_to_select = replicasets
local replicasets_to_select, err = vshard_router:routeall()
if err ~= nil then
return nil, SelectError:new("Failed to get router replicasets: %s", err)
end

-- See explanation of this logic in
-- crud/select/compat/select.lua.
Expand Down
2 changes: 1 addition & 1 deletion crud/stats/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ local function resolve_space_name(space_id)
return nil
end

local space, err = utils.get_space(space_id, replicasets)
local space, err = utils.get_space(space_id, vshard_router)
if err ~= nil then
log.warn("An error occurred during getting space: %s", err)
return nil
Expand Down
2 changes: 1 addition & 1 deletion crud/update.lua
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ local function call_update_on_router(vshard_router, space_name, key, user_operat
vshard_router = '?string|table',
})

local space, err = utils.get_space(space_name, vshard_router:routeall())
local space, err = utils.get_space(space_name, vshard_router, opts.timeout)
if err ~= nil then
return nil, UpdateError:new("An error occurred during the operation: %s", err), const.NEED_SCHEMA_RELOAD
end
Expand Down
2 changes: 1 addition & 1 deletion crud/upsert.lua
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ local function call_upsert_on_router(vshard_router, space_name, original_tuple,
vshard_router = '?string|table',
})

local space, err = utils.get_space(space_name, vshard_router:routeall())
local space, err = utils.get_space(space_name, vshard_router, opts.timeout)
if err ~= nil then
return nil, UpsertError:new("An error occurred during the operation: %s", err), const.NEED_SCHEMA_RELOAD
end
Expand Down
2 changes: 1 addition & 1 deletion crud/upsert_many.lua
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ local function call_upsert_many_on_router(vshard_router, space_name, original_tu
vshard_router = '?string|table',
})

local space, err = utils.get_space(space_name, vshard_router:routeall())
local space, err = utils.get_space(space_name, vshard_router, opts.timeout)
if err ~= nil then
return nil, {
UpsertManyError:new("An error occurred during the operation: %s", err)
Expand Down
Loading

0 comments on commit 00fcbfb

Please sign in to comment.