Skip to content

Commit

Permalink
internal: use single router object
Browse files Browse the repository at this point in the history
This patch is the groundwork for vshard groups and custom routers
support.

After this patch, vshard router object is retrieved only in the single
point of a request. (Except for name resolving in statistics.)

Test runs have shown that this patch do not affects the performance of
crud requests.

Part of #44
  • Loading branch information
DifferentialOrange committed Aug 26, 2022
1 parent 40ffd8b commit abb879e
Show file tree
Hide file tree
Showing 32 changed files with 306 additions and 221 deletions.
13 changes: 7 additions & 6 deletions crud/borders.lua
Original file line number Diff line number Diff line change
Expand Up @@ -67,15 +67,14 @@ else
end
end

local function call_get_border_on_router(border_name, space_name, index_name, opts)
checks('string', 'string', '?string|number', {
local function call_get_border_on_router(vshard_router, border_name, space_name, index_name, opts)
checks('table', 'string', 'string', '?string|number', {
timeout = '?number',
fields = '?table',
})

opts = opts or {}

local vshard_router = vshard.router.static
local replicasets = vshard_router:routeall()
local space = utils.get_space(space_name, replicasets)
if space == nil then
Expand Down Expand Up @@ -105,7 +104,7 @@ local function call_get_border_on_router(border_name, space_name, index_name, op
replicasets = replicasets,
timeout = opts.timeout,
}
local results, err = call.map(
local results, err = call.map(vshard_router,
STAT_FUNC_NAME,
{border_name, space_name, index.id, field_names},
call_opts
Expand Down Expand Up @@ -161,8 +160,10 @@ local function call_get_border_on_router(border_name, space_name, index_name, op
end

local function get_border(border_name, space_name, index_name, opts)
return schema.wrap_func_reload(
call_get_border_on_router, border_name, space_name, index_name, opts
local vshard_router = vshard.router.static

return schema.wrap_func_reload(vshard_router, call_get_border_on_router,
border_name, space_name, index_name, opts
)
end

Expand Down
30 changes: 15 additions & 15 deletions crud/common/call.lua
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
local vshard = require('vshard')
local errors = require('errors')

local dev_checks = require('crud.common.dev_checks')
Expand Down Expand Up @@ -41,14 +40,13 @@ function call.get_vshard_call_name(mode, prefer_replica, balance)
return 'callbre'
end

local function wrap_vshard_err(err, func_name, replicaset_uuid, bucket_id)
local function wrap_vshard_err(vshard_router, err, func_name, replicaset_uuid, bucket_id)
-- Do not rewrite ShardingHashMismatchError class.
if err.class_name == sharding_utils.ShardingHashMismatchError.name then
return errors.wrap(err)
end

if replicaset_uuid == nil then
local vshard_router = vshard.router.static
local replicaset, _ = vshard_router:route(bucket_id)
if replicaset == nil then
return CallError:new(
Expand All @@ -67,8 +65,8 @@ local function wrap_vshard_err(err, func_name, replicaset_uuid, bucket_id)
))
end

function call.map(func_name, func_args, opts)
dev_checks('string', '?table', {
function call.map(vshard_router, func_name, func_args, opts)
dev_checks('table', 'string', '?table', {
mode = 'string',
prefer_replica = '?boolean',
balance = '?boolean',
Expand All @@ -88,7 +86,11 @@ function call.map(func_name, func_args, opts)

local iter = opts.iter
if iter == nil then
iter, err = BaseIterator:new({func_args = func_args, replicasets = opts.replicasets})
iter, err = BaseIterator:new({
func_args = func_args,
replicasets = opts.replicasets,
vshard_router = vshard_router,
})
if err ~= nil then
return nil, err
end
Expand Down Expand Up @@ -136,8 +138,8 @@ function call.map(func_name, func_args, opts)
return postprocessor:get()
end

function call.single(bucket_id, func_name, func_args, opts)
dev_checks('number', 'string', '?table', {
function call.single(vshard_router, bucket_id, func_name, func_args, opts)
dev_checks('table', 'number', 'string', '?table', {
mode = 'string',
prefer_replica = '?boolean',
balance = '?boolean',
Expand All @@ -151,13 +153,12 @@ function call.single(bucket_id, func_name, func_args, opts)

local timeout = opts.timeout or const.DEFAULT_VSHARD_CALL_TIMEOUT

local vshard_router = vshard.router.static
local res, err = vshard_router[vshard_call_name](vshard_router, bucket_id, func_name, func_args, {
timeout = timeout,
})

if err ~= nil then
return nil, wrap_vshard_err(err, func_name, nil, bucket_id)
return nil, wrap_vshard_err(vshard_router, err, func_name, nil, bucket_id)
end

if res == box.NULL then
Expand All @@ -167,25 +168,24 @@ function call.single(bucket_id, func_name, func_args, opts)
return res
end

function call.any(func_name, func_args, opts)
dev_checks('string', '?table', {
function call.any(vshard_router, func_name, func_args, opts)
dev_checks('table', 'string', '?table', {
timeout = '?number',
})

local timeout = opts.timeout or const.DEFAULT_VSHARD_CALL_TIMEOUT

local vshard_router = vshard.router.static
local replicasets, err = vshard_router:routeall()
if replicasets == nil then
return nil, CallError:new("Failed to get all replicasets: %s", err.err)
return nil, CallError:new("Failed to get router replicasets: %s", err.err)
end
local replicaset = select(2, next(replicasets))

local res, err = replicaset:call(func_name, func_args, {
timeout = timeout,
})
if err ~= nil then
return nil, wrap_vshard_err(err, func_name, replicaset.uuid)
return nil, wrap_vshard_err(vshard_router, err, func_name, replicaset.uuid)
end

if res == box.NULL then
Expand Down
5 changes: 2 additions & 3 deletions crud/common/map_call_cases/base_iter.lua
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
local errors = require('errors')
local vshard = require('vshard')

local dev_checks = require('crud.common.dev_checks')
local GetReplicasetsError = errors.new_class('GetReplicasetsError')
Expand All @@ -24,14 +23,14 @@ function BaseIterator:new(opts)
dev_checks('table', {
func_args = '?table',
replicasets = '?table',
vshard_router = 'table',
})

local replicasets, err
if opts.replicasets ~= nil then
replicasets = opts.replicasets
else
local vshard_router = vshard.router.static
replicasets, err = vshard_router:routeall()
replicasets, err = opts.vshard_router:routeall()
if err ~= nil then
return nil, GetReplicasetsError:new("Failed to get all replicasets: %s", err.err)
end
Expand Down
7 changes: 4 additions & 3 deletions crud/common/map_call_cases/base_postprocessor.lua
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,12 @@ local BasePostprocessor = {}
-- @function new
--
-- @return[1] table postprocessor
function BasePostprocessor:new()
function BasePostprocessor:new(vshard_router)
local postprocessor = {
results = {},
early_exit = false,
errs = nil
errs = nil,
vshard_router = vshard_router,
}

setmetatable(postprocessor, self)
Expand Down Expand Up @@ -58,7 +59,7 @@ function BasePostprocessor:collect(result_info, err_info)

if err ~= nil then
self.results = nil
self.errs = err_info.err_wrapper(err, unpack(err_info.wrapper_args))
self.errs = err_info.err_wrapper(self.vshard_router, err, unpack(err_info.wrapper_args))
self.early_exit = true

return self.early_exit
Expand Down
3 changes: 2 additions & 1 deletion crud/common/map_call_cases/batch_insert_iter.lua
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,10 @@ function BatchInsertIterator:new(opts)
tuples = 'table',
space = 'table',
execute_on_storage_opts = 'table',
vshard_router = 'table',
})

local sharding_data, err = sharding.split_tuples_by_replicaset(opts.tuples, opts.space)
local sharding_data, err = sharding.split_tuples_by_replicaset(opts.vshard_router, opts.tuples, opts.space)
if err ~= nil then
return nil, SplitTuplesError:new("Failed to split tuples by replicaset: %s", err.err)
end
Expand Down
2 changes: 1 addition & 1 deletion crud/common/map_call_cases/batch_postprocessor.lua
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ function BatchPostprocessor:collect(result_info, err_info)
err_to_wrap = err.err
end

local err_obj = err_info.err_wrapper(err_to_wrap, unpack(err_info.wrapper_args))
local err_obj = err_info.err_wrapper(self.vshard_router, err_to_wrap, unpack(err_info.wrapper_args))
err_obj.operation_data = err.operation_data
err_obj.space_schema_hash = err.space_schema_hash

Expand Down
10 changes: 7 additions & 3 deletions crud/common/map_call_cases/batch_upsert_iter.lua
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,15 @@ function BatchUpsertIterator:new(opts)
space = 'table',
operations = 'table',
execute_on_storage_opts = 'table',
vshard_router = 'table',
})

local sharding_data, err = sharding.split_tuples_by_replicaset(opts.tuples, opts.space, {
operations = opts.operations,
})
local sharding_data, err = sharding.split_tuples_by_replicaset(
opts.vshard_router,
opts.tuples,
opts.space,
{operations = opts.operations})

if err ~= nil then
return nil, SplitTuplesError:new("Failed to split tuples by replicaset: %s", err.err)
end
Expand Down
6 changes: 2 additions & 4 deletions crud/common/schema.lua
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
local fiber = require('fiber')
local msgpack = require('msgpack')
local digest = require('digest')
local vshard = require('vshard')
local errors = require('errors')
local log = require('log')

Expand Down Expand Up @@ -86,18 +85,17 @@ end
-- This wrapper is used for functions that can fail if router uses outdated
-- space schema. In case of such errors these functions returns `need_reload`
-- for schema-dependent errors.
function schema.wrap_func_reload(func, ...)
function schema.wrap_func_reload(vshard_router, func, ...)
local i = 0

local res, err, need_reload
while true do
res, err, need_reload = func(...)
res, err, need_reload = func(vshard_router, ...)

if err == nil or need_reload ~= const.NEED_SCHEMA_RELOAD then
break
end

local vshard_router = vshard.router.static
local ok, reload_schema_err = reload_schema(vshard_router)
if not ok then
log.warn("Failed to reload schema: %s", reload_schema_err)
Expand Down
35 changes: 15 additions & 20 deletions crud/common/sharding/init.lua
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
local vshard = require('vshard')
local errors = require('errors')

local BucketIDError = errors.new_class("BucketIDError", {capture_stack = false})
Expand All @@ -13,8 +12,7 @@ local sharding_utils = require('crud.common.sharding.utils')

local sharding = {}

function sharding.get_replicasets_by_bucket_id(bucket_id)
local vshard_router = vshard.router.static
function sharding.get_replicasets_by_bucket_id(vshard_router, bucket_id)
local replicaset, err = vshard_router:route(bucket_id)
if replicaset == nil then
return nil, GetReplicasetsError:new("Failed to get replicaset for bucket_id %s: %s", bucket_id, err.err)
Expand All @@ -25,14 +23,13 @@ function sharding.get_replicasets_by_bucket_id(bucket_id)
}
end

function sharding.key_get_bucket_id(space_name, key, specified_bucket_id)
dev_checks('string', '?', '?number|cdata')
function sharding.key_get_bucket_id(vshard_router, space_name, key, specified_bucket_id)
dev_checks('table', 'string', '?', '?number|cdata')

if specified_bucket_id ~= nil then
return { bucket_id = specified_bucket_id }
end

local vshard_router = vshard.router.static
local sharding_func_data, err = sharding_metadata_module.fetch_sharding_func_on_router(vshard_router, space_name)
if err ~= nil then
return nil, err
Expand All @@ -48,13 +45,12 @@ function sharding.key_get_bucket_id(space_name, key, specified_bucket_id)
return { bucket_id = vshard_router:bucket_id_strcrc32(key) }
end

function sharding.tuple_get_bucket_id(tuple, space, specified_bucket_id)
function sharding.tuple_get_bucket_id(vshard_router, tuple, space, specified_bucket_id)
if specified_bucket_id ~= nil then
return { bucket_id = specified_bucket_id }
end

local sharding_index_parts = space.index[0].parts
local vshard_router = vshard.router.static
local sharding_key_data, err = sharding_metadata_module.fetch_sharding_key_on_router(vshard_router, space.name)
if err ~= nil then
return nil, err
Expand All @@ -64,7 +60,7 @@ function sharding.tuple_get_bucket_id(tuple, space, specified_bucket_id)
end
local key = utils.extract_key(tuple, sharding_index_parts)

local bucket_id_data, err = sharding.key_get_bucket_id(space.name, key, nil)
local bucket_id_data, err = sharding.key_get_bucket_id(vshard_router, space.name, key, nil)
if err ~= nil then
return nil, err
end
Expand All @@ -76,7 +72,7 @@ function sharding.tuple_get_bucket_id(tuple, space, specified_bucket_id)
}
end

function sharding.tuple_set_and_return_bucket_id(tuple, space, specified_bucket_id)
function sharding.tuple_set_and_return_bucket_id(vshard_router, tuple, space, specified_bucket_id)
local bucket_id_fieldno, err = utils.get_bucket_id_fieldno(space)
if err ~= nil then
return nil, BucketIDError:new("Failed to get bucket ID fieldno: %s", err)
Expand All @@ -98,7 +94,7 @@ function sharding.tuple_set_and_return_bucket_id(tuple, space, specified_bucket_
local sharding_data = { bucket_id = tuple[bucket_id_fieldno] }

if sharding_data.bucket_id == nil then
sharding_data, err = sharding.tuple_get_bucket_id(tuple, space)
sharding_data, err = sharding.tuple_get_bucket_id(vshard_router, tuple, space)
if err ~= nil then
return nil, err
end
Expand Down Expand Up @@ -144,18 +140,18 @@ function sharding.batching_result_needs_sharding_reload(errs, tuples_count)
return sharding_errs_count == tuples_count
end

function sharding.wrap_method(method, space_name, ...)
function sharding.wrap_method(vshard_router, method, space_name, ...)
local i = 0

local res, err, need_reload
while true do
res, err, need_reload = method(space_name, ...)
res, err, need_reload = method(vshard_router, space_name, ...)

if err == nil or need_reload ~= const.NEED_SHARDING_RELOAD then
break
end

sharding_metadata_module.reload_sharding_cache(space_name)
sharding_metadata_module.reload_sharding_cache(vshard_router, space_name)

i = i + 1

Expand All @@ -169,12 +165,12 @@ end

-- This wrapper assumes reload is performed inside the method and
-- expect ShardingHashMismatchError error to be thrown.
function sharding.wrap_select_method(method, space_name, ...)
function sharding.wrap_select_method(vshard_router, method, space_name, ...)
local i = 0

local ok, res, err
while true do
ok, res, err = pcall(method, space_name, ...)
ok, res, err = pcall(method, vshard_router, space_name, ...)

if ok == true then
break
Expand Down Expand Up @@ -212,8 +208,8 @@ end
-- @return[1] batches
-- Map where key is a replicaset and value
-- is table of tuples related to this replicaset
function sharding.split_tuples_by_replicaset(tuples, space, opts)
dev_checks('table', 'table', {
function sharding.split_tuples_by_replicaset(vshard_router, tuples, space, opts)
dev_checks('table', 'table', 'table', {
operations = '?table',
})

Expand All @@ -227,7 +223,7 @@ function sharding.split_tuples_by_replicaset(tuples, space, opts)
local sharding_data
local err
for i, tuple in ipairs(tuples) do
sharding_data, err = sharding.tuple_set_and_return_bucket_id(tuple, space)
sharding_data, err = sharding.tuple_set_and_return_bucket_id(vshard_router, tuple, space)
if err ~= nil then
return nil, BucketIDError:new("Failed to get bucket ID: %s", err)
end
Expand All @@ -244,7 +240,6 @@ function sharding.split_tuples_by_replicaset(tuples, space, opts)
skip_sharding_hash_check = false
end

local vshard_router = vshard.router.static
local replicaset, err = vshard_router:route(sharding_data.bucket_id)
if replicaset == nil then
return nil, GetReplicasetsError:new(
Expand Down
Loading

0 comments on commit abb879e

Please sign in to comment.