Skip to content

Commit

Permalink
api: support vshard groups
Browse files Browse the repository at this point in the history
This patch adds Cartridge vshard groups [1] and non-default vshard
router [2] support to CRUD operations.

To specify Cartridge vshard group, pass vshard group name with
`vshard_router` option of a crud operation.
```lua
crud.select('customers_ddl',
            {{'=', 'age', 41}},
            {vshard_router = 'customers'})
```

To use non-default vshard router, pass vshard router object with
`vshard_router` option of a crud operation.
```lua
local router = vshard.router.new(cfg)

crud.select('customers_ddl',
            {{'=', 'age', 41}},
            {vshard_router = router})
```

If `vshard_router` is not specified, default vshard router is used to
process the request. If default vshard router is not found (for example,
if it is a Cartridge application with vshard groups) and option is not
specified, the error is returned.

1. https://www.tarantool.io/en/doc/latest/book/cartridge/cartridge_dev/#using-multiple-vshard-storage-groups
2. https://www.tarantool.io/ru/doc/latest/reference/reference_rock/vshard/vshard_router/#router-api-new

Closes #44
  • Loading branch information
DifferentialOrange committed Aug 26, 2022
1 parent abb879e commit 91f8e0c
Show file tree
Hide file tree
Showing 24 changed files with 2,258 additions and 77 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.

### Added
* `crud.storage_info` function to get storages status (#229).
* Support `vshard_router` option in operations for Cartridge vshard groups
or non-default vshard routers (#44).

### Changed
* Deprecate using space id in `crud.len` (#255).
Expand Down
40 changes: 40 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,9 @@ where:
* `timeout` (`?number`) - `vshard.call` timeout (in seconds)
* `bucket_id` (`?number|cdata`) - bucket ID
* `fields` (`?table`) - field names for getting only a subset of fields
* `vshard_router` (`?string|table`) - Cartridge vshard group name or
vshard router instance. Set this parameter if your space is not
a part of the default vshard cluster

Returns metadata and array contains one inserted row, error.

Expand Down Expand Up @@ -259,6 +262,9 @@ where:
* `rollback_on_error` (`?boolean`) - any failed operation will lead to
rollback on a storage, where the operation is failed, report error
about what tuples were rollback, default is `false`
* `vshard_router` (`?string|table`) - Cartridge vshard group name or
vshard router instance. Set this parameter if your space is not
a part of the default vshard cluster

Returns metadata and array with inserted rows, array of errors.
Each error object can contain field `operation_data`.
Expand Down Expand Up @@ -393,6 +399,9 @@ where:
* `prefer_replica` (`?boolean`) - if `true` then the preferred target is one of
the replicas
* `balance` (`?boolean`) - use replica according to vshard load balancing policy
* `vshard_router` (`?string|table`) - Cartridge vshard group name or
vshard router instance. Set this parameter if your space is not
a part of the default vshard cluster

Returns metadata and array contains one row, error.

Expand Down Expand Up @@ -426,6 +435,9 @@ where:
* `timeout` (`?number`) - `vshard.call` timeout (in seconds)
* `bucket_id` (`?number|cdata`) - bucket ID
* `fields` (`?table`) - field names for getting only a subset of fields
* `vshard_router` (`?string|table`) - Cartridge vshard group name or
vshard router instance. Set this parameter if your space is not
a part of the default vshard cluster

Returns metadata and array contains one updated row, error.

Expand Down Expand Up @@ -458,6 +470,9 @@ where:
* `timeout` (`?number`) - `vshard.call` timeout (in seconds)
* `bucket_id` (`?number|cdata`) - bucket ID
* `fields` (`?table`) - field names for getting only a subset of fields
* `vshard_router` (`?string|table`) - Cartridge vshard group name or
vshard router instance. Set this parameter if your space is not
a part of the default vshard cluster

Returns metadata and array contains one deleted row (empty for vinyl), error.

Expand Down Expand Up @@ -492,6 +507,9 @@ where:
* `timeout` (`?number`) - `vshard.call` timeout (in seconds)
* `bucket_id` (`?number|cdata`) - bucket ID
* `fields` (`?table`) - field names for getting only a subset of fields
* `vshard_router` (`?string|table`) - Cartridge vshard group name or
vshard router instance. Set this parameter if your space is not
a part of the default vshard cluster

Returns inserted or replaced rows and metadata or nil with error.

Expand Down Expand Up @@ -544,6 +562,9 @@ where:
* `rollback_on_error` (`?boolean`) - any failed operation will lead to
rollback on a storage, where the operation is failed, report error
about what tuples were rollback, default is `false`
* `vshard_router` (`?string|table`) - Cartridge vshard group name or
vshard router instance. Set this parameter if your space is not
a part of the default vshard cluster

Returns metadata and array with inserted/replaced rows, array of errors.
Each error object can contain field `operation_data`.
Expand Down Expand Up @@ -676,6 +697,9 @@ where:
* `timeout` (`?number`) - `vshard.call` timeout (in seconds)
* `bucket_id` (`?number|cdata`) - bucket ID
* `fields` (`?table`) - field names for getting only a subset of fields
* `vshard_router` (`?string|table`) - Cartridge vshard group name or
vshard router instance. Set this parameter if your space is not
a part of the default vshard cluster

Returns metadata and empty array of rows or nil, error.

Expand Down Expand Up @@ -733,6 +757,9 @@ where:
* `rollback_on_error` (`?boolean`) - any failed operation will lead to
rollback on a storage, where the operation is failed, report error
about what tuples were rollback, default is `false`
* `vshard_router` (`?string|table`) - Cartridge vshard group name or
vshard router instance. Set this parameter if your space is not
a part of the default vshard cluster

Returns metadata and array of errors.
Each error object can contain field `operation_data`.
Expand Down Expand Up @@ -869,6 +896,9 @@ where:
* `prefer_replica` (`?boolean`) - if `true` then the preferred target is one of
the replicas
* `balance` (`?boolean`) - use replica according to vshard load balancing policy
* `vshard_router` (`?string|table`) - Cartridge vshard group name or
vshard router instance. Set this parameter if your space is not
a part of the default vshard cluster


Returns metadata and array of rows, error.
Expand Down Expand Up @@ -1007,6 +1037,9 @@ where:
* `space_name` (`string`) - name of the space
* `opts`:
* `timeout` (`?number`) - `vshard.call` timeout (in seconds)
* `vshard_router` (`?string|table`) - Cartridge vshard group name or
vshard router instance. Set this parameter if your space is not
a part of the default vshard cluster

Returns true or nil with error.

Expand Down Expand Up @@ -1040,6 +1073,9 @@ where:
* `space_name` (`string`) - name of the space
* `opts`:
* `timeout` (`?number`) - `vshard.call` timeout (in seconds)
* `vshard_router` (`?string|table`) - Cartridge vshard group name or
vshard router instance. Set this parameter if your space is not
a part of the default vshard cluster

Returns number or nil with error.

Expand Down Expand Up @@ -1089,6 +1125,7 @@ where:
* `opts`:
* `timeout` (`?number`) - maximum time (in seconds, default: 2) to wait for response from
cluster instances.
* `vshard_router` (`?string|table`) - Cartridge vshard group name or vshard router instance.

Returns storages status table by instance UUID or nil with error. Status table fields:
* `status` contains a string representing the status:
Expand Down Expand Up @@ -1158,6 +1195,9 @@ where:
* `balance` (`?boolean`) - use replica according to
[vshard load balancing policy](https://www.tarantool.io/en/doc/latest/reference/reference_rock/vshard/vshard_api/#router-api-call),
default value is `false`
* `vshard_router` (`?string|table`) - Cartridge vshard group name or
vshard router instance. Set this parameter if your space is not
a part of the default vshard cluster

```lua
crud.count('customers', {{'==', 'age', 35}})
Expand Down
20 changes: 16 additions & 4 deletions crud/borders.lua
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
local checks = require('checks')
local errors = require('errors')
local vshard = require('vshard')

local const = require('crud.common.const')
local dev_checks = require('crud.common.dev_checks')
Expand Down Expand Up @@ -71,10 +70,9 @@ local function call_get_border_on_router(vshard_router, border_name, space_name,
checks('table', 'string', 'string', '?string|number', {
timeout = '?number',
fields = '?table',
vshard_router = '?string|table',
})

opts = opts or {}

local replicasets = vshard_router:routeall()
local space = utils.get_space(space_name, replicasets)
if space == nil then
Expand Down Expand Up @@ -160,7 +158,11 @@ local function call_get_border_on_router(vshard_router, border_name, space_name,
end

local function get_border(border_name, space_name, index_name, opts)
local vshard_router = vshard.router.static
opts = opts or {}
local vshard_router, err = utils.get_vshard_router_instance(opts.vshard_router)
if err ~= nil then
return nil, BorderError:new(err)
end

return schema.wrap_func_reload(vshard_router, call_get_border_on_router,
border_name, space_name, index_name, opts
Expand All @@ -183,6 +185,11 @@ end
-- @tparam ?table opts.fields
-- Field names for getting only a subset of fields
--
-- @tparam ?string|table opts.vshard_router
-- Cartridge vshard group name or vshard router instance.
-- Set this parameter if your space is not a part of the
-- default vshard cluster.
--
-- @return[1] result
-- @treturn[2] nil
-- @treturn[2] table Error description
Expand All @@ -206,6 +213,11 @@ end
-- @tparam ?table opts.fields
-- Field names for getting only a subset of fields
--
-- @tparam ?string|table opts.vshard_router
-- Cartridge vshard group name or vshard router instance.
-- Set this parameter if your space is not a part of the
-- default vshard cluster.
--
-- @return[1] result
-- @treturn[2] nil
-- @treturn[2] table Error description
Expand Down
7 changes: 4 additions & 3 deletions crud/common/sharding_func.lua
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
local log = require('log')
local vshard = require('vshard')

local sharding_metadata_module = require('crud.common.sharding.sharding_metadata')
local utils = require('crud.common.utils')

local sharding_func_cache = {}

Expand All @@ -15,8 +15,9 @@ function sharding_func_cache.update_cache(space_name, vshard_router)
log.warn("require('crud.common.sharding_func').update_cache()" ..
"is deprecated and will be removed in future releases")

if vshard_router == nil then
vshard_router = vshard.router.static
local vshard_router, err = utils.get_vshard_router_instance(vshard_router)
if err ~= nil then
return nil, err
end

return sharding_metadata_module.update_sharding_func_cache(vshard_router, space_name)
Expand Down
7 changes: 4 additions & 3 deletions crud/common/sharding_key.lua
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
local log = require('log')
local vshard = require('vshard')

local sharding_metadata_module = require('crud.common.sharding.sharding_metadata')
local utils = require('crud.common.utils')

local sharding_key_cache = {}

Expand All @@ -13,8 +13,9 @@ function sharding_key_cache.update_cache(space_name, vshard_router)
log.warn("require('crud.common.sharding_key').update_cache()" ..
"is deprecated and will be removed in future releases")

if vshard_router == nil then
vshard_router = vshard.router.static
local vshard_router, err = utils.get_vshard_router_instance(vshard_router)
if err ~= nil then
return nil, err
end

return sharding_metadata_module.update_sharding_key_cache(vshard_router, space_name)
Expand Down
89 changes: 85 additions & 4 deletions crud/common/utils.lua
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ local fun = require('fun')
local bit = require('bit')
local log = require('log')

local is_cartridge, cartridge = pcall(require, 'cartridge')

local const = require('crud.common.const')
local schema = require('crud.common.schema')
local dev_checks = require('crud.common.dev_checks')
Expand All @@ -17,6 +19,7 @@ local GetSpaceFormatError = errors.new_class('GetSpaceFormatError', {capture_sta
local FilterFieldsError = errors.new_class('FilterFieldsError', {capture_stack = false})
local NotInitializedError = errors.new_class('NotInitialized')
local StorageInfoError = errors.new_class('StorageInfoError')
local VshardRouterError = errors.new_class('VshardRouterError', {capture_stack = false})
local fiber_clock = require('fiber').clock

local utils = {}
Expand Down Expand Up @@ -758,16 +761,23 @@ end
-- @tparam ?number opts.timeout
-- Function call timeout
--
-- @tparam ?string|table opts.vshard_router
-- Cartridge vshard group name or vshard router instance.
--
-- @return a table of storage states by replica uuid.
function utils.storage_info(opts)
local vshard_router = vshard.router.static
opts = opts or {}

local vshard_router, err = utils.get_vshard_router_instance(opts.vshard_router)
if err ~= nil then
return nil, StorageInfoError:new(err)
end

local replicasets, err = vshard_router:routeall()
if replicasets == nil then
return nil, StorageInfoError:new("Failed to get all replicasets: %s", err.err)
return nil, StorageInfoError:new("Failed to get router replicasets: %s", err.err)
end

opts = opts or {}

local futures_by_replicas = {}
local replica_state_by_uuid = {}
local async_opts = {is_async = true}
Expand Down Expand Up @@ -835,4 +845,75 @@ function utils.storage_info_on_storage()
return {status = "running"}
end

local expected_vshard_api = {
'routeall', 'route', 'bucket_id_strcrc32',
'callrw', 'callro', 'callbro', 'callre',
'callbre', 'map_callrw'
}

--- Verifies that a table has expected vshard
-- router handles.
local function verify_vshard_router(router)
dev_checks("table")

for _, func_name in ipairs(expected_vshard_api) do
if type(router[func_name]) ~= 'function' then
return false
end
end

return true
end

--- Get a vshard router instance from a parameter.
--
-- If a string passed, extract router instance from
-- Cartridge vshard groups. If table passed, verifies
-- that a table is a vshard router instance.
--
-- @function get_vshard_router_instance
--
-- @param[opt] router name of a vshard group or a vshard router
-- instance
--
-- @return[1] table vshard router instance
-- @treturn[2] nil
-- @treturn[2] table Error description
function utils.get_vshard_router_instance(router)
dev_checks('?string|table')

local router_instance

if type(router) == 'string' then
if not is_cartridge then
return nil, VshardRouterError:new("Vshard groups are supported only in Tarantool Cartridge")
end

local router_service = cartridge.service_get('vshard-router')
assert(router_service ~= nil)

router_instance = router_service.get(router)
if router_instance == nil then
return nil, VshardRouterError:new("Vshard group %s is not found", router)
end
elseif type(router) == 'table' then
if not verify_vshard_router(router) then
return nil, VshardRouterError:new("Invalid opts.vshard_router table value, " ..
"a vshard router instance has been expected")
end

router_instance = router
else
assert(type(router) == 'nil')
router_instance = vshard.router.static

if router_instance == nil then
return nil, VshardRouterError:new("Default vshard group is not found and custom " ..
"is not specified with opts.vshard_router")
end
end

return router_instance
end

return utils
Loading

0 comments on commit 91f8e0c

Please sign in to comment.