Skip to content

Commit

Permalink
ddl: reload and retry on sharding info mismatch
Browse files Browse the repository at this point in the history
If sharding info mismatch has happened, sharding info will be reloaded
on router. After that, request will be retried with new sharding info
(expect for pairs requests due to its nature, they must be retried
manually).

There are no detectable performance drops introduced in this patch.

Closes #212
  • Loading branch information
DifferentialOrange committed Apr 19, 2022
1 parent 70ae4ff commit 2053ff8
Show file tree
Hide file tree
Showing 25 changed files with 459 additions and 180 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
* Fix processing storage error for tuple-merger implementation of
select/pairs (#271).
* Do not change input tuple object in requests.
* Add automatic reload of DDL schema (#212).

## [0.10.0] - 01-12-21

Expand Down
15 changes: 0 additions & 15 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -117,26 +117,11 @@ Table below describe what operations supports custom sharding key:

Current limitations for using custom sharding key:

- It's not possible to update sharding keys automatically when schema is
updated on storages, see
[#212](https://github.com/tarantool/crud/issues/212). However it is possible
to do it manually with `require('crud.common.sharding_key').update_cache()`
(this function updates both caches: sharding key cache and sharding function
cache, but returned value is sharding key from cache).
- No support of JSON path for sharding key, see
[#219](https://github.com/tarantool/crud/issues/219).
- `primary_index_fieldno_map` is not cached, see
[#243](https://github.com/tarantool/crud/issues/243).

Current limitations for using custom sharding functions:

- It's not possible to update sharding functions automatically when schema is
updated on storages, see
[#212](https://github.com/tarantool/crud/issues/212). However it is possible
to do it manually with `require('crud.common.sharding_func').update_cache()`
(this function updates both caches: sharding key cache and sharding function
cache, but returned value is sharding function from cache).

### Insert

```lua
Expand Down
15 changes: 12 additions & 3 deletions crud/borders.lua
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ local checks = require('checks')
local errors = require('errors')
local vshard = require('vshard')

local const = require('crud.common.const')
local dev_checks = require('crud.common.dev_checks')
local call = require('crud.common.call')
local utils = require('crud.common.utils')
Expand Down Expand Up @@ -76,7 +77,7 @@ local function call_get_border_on_router(border_name, space_name, index_name, op

local space = utils.get_space(space_name, vshard.router.routeall())
if space == nil then
return nil, BorderError:new("Space %q doesn't exist", space_name), true
return nil, BorderError:new("Space %q doesn't exist", space_name), const.NEED_SCHEMA_RELOAD
end

local index
Expand All @@ -87,7 +88,9 @@ local function call_get_border_on_router(border_name, space_name, index_name, op
end

if index == nil then
return nil, BorderError:new("Index %q of space %q doesn't exist", index_name, space_name), true
return nil,
BorderError:new("Index %q of space %q doesn't exist", index_name, space_name),
const.NEED_SCHEMA_RELOAD
end

local primary_index = space.index[0]
Expand Down Expand Up @@ -131,8 +134,14 @@ local function call_get_border_on_router(border_name, space_name, index_name, op
for _, storage_result in pairs(results) do
local storage_result = storage_result[1]
if storage_result.err ~= nil then
local err_wrapped = BorderError:new("Failed to get %s: %s", border_name, storage_result.err)

local need_reload = schema.result_needs_reload(space, storage_result)
return nil, BorderError:new("Failed to get %s: %s", border_name, storage_result.err), need_reload
if need_reload then
return nil, err_wrapped, const.NEED_SCHEMA_RELOAD
end

return nil, err_wrapped
end

local tuple = storage_result.res
Expand Down
6 changes: 6 additions & 0 deletions crud/common/call.lua
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ local errors = require('errors')

local dev_checks = require('crud.common.dev_checks')
local utils = require('crud.common.utils')
local sharding_utils = require('crud.common.sharding.utils')
local fiber_clock = require('fiber').clock

local CallError = errors.new_class('CallError')
Expand Down Expand Up @@ -40,6 +41,11 @@ function call.get_vshard_call_name(mode, prefer_replica, balance)
end

local function wrap_vshard_err(err, func_name, replicaset_uuid, bucket_id)
-- Do not rewrite ShardingHashMismatchError class.
if err.class_name == sharding_utils.ShardingHashMismatchError.name then
return errors.wrap(err)
end

if err.type == 'ClientError' and type(err.message) == 'string' then
if err.message == string.format("Procedure '%s' is not defined", func_name) then
if func_name:startswith('_crud.') then
Expand Down
6 changes: 5 additions & 1 deletion crud/common/const.lua
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,9 @@ local const = {}
const.RELOAD_RETRIES_NUM = 1
const.RELOAD_SCHEMA_TIMEOUT = 3 -- 3 seconds
const.FETCH_SHARDING_METADATA_TIMEOUT = 3 -- 3 seconds
const.SHARDING_RELOAD_RETRIES_NUM = 1

return const
const.NEED_SCHEMA_RELOAD = 0x0001000
const.NEED_SHARDING_RELOAD = 0x0001001

return const
2 changes: 1 addition & 1 deletion crud/common/schema.lua
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ function schema.wrap_func_reload(func, ...)
while true do
res, err, need_reload = func(...)

if err == nil or not need_reload then
if err == nil or need_reload ~= const.NEED_SCHEMA_RELOAD then
break
end

Expand Down
67 changes: 64 additions & 3 deletions crud/common/sharding/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@ local errors = require('errors')

local BucketIDError = errors.new_class("BucketIDError", {capture_stack = false})
local GetReplicasetsError = errors.new_class('GetReplicasetsError', {capture_stack = false})
local ShardingHashMismatchError = errors.new_class("ShardingHashMismatchError", {capture_stack = false})

local const = require('crud.common.const')
local utils = require('crud.common.utils')
local dev_checks = require('crud.common.dev_checks')
local sharding_metadata_module = require('crud.common.sharding.sharding_metadata')
local storage_metadata_cache = require('crud.common.sharding.storage_metadata_cache')
local sharding_utils = require('crud.common.sharding.utils')

local sharding = {}

Expand Down Expand Up @@ -114,12 +115,72 @@ function sharding.check_sharding_hash(space_name, sharding_func_hash, sharding_k

if storage_func_hash ~= sharding_func_hash or storage_key_hash ~= sharding_key_hash then
local err_msg = ('crud: Sharding hash mismatch for space %s. ' ..
'Please refresh sharding data and retry your request.'
'Sharding info will be refreshed after receiving this error. ' ..
'Please retry your request.'
):format(space_name)
return nil, ShardingHashMismatchError:new(err_msg)
return nil, sharding_utils.ShardingHashMismatchError:new(err_msg)
end

return true
end

function sharding.result_needs_sharding_reload(err)
return err.class_name == sharding_utils.ShardingHashMismatchError.name
end

function sharding.wrap_method(method, space_name, ...)
local i = 0

local res, err, need_reload
while true do
res, err, need_reload = method(space_name, ...)

if err == nil or need_reload ~= const.NEED_SHARDING_RELOAD then
break
end

sharding_metadata_module.reload_sharding_cache(space_name)

i = i + 1

if i > const.SHARDING_RELOAD_RETRIES_NUM then
break
end
end

return res, err, need_reload
end

-- This wrapper assumes reload is performed inside the method and
-- expect ShardingHashMismatchError error to be thrown.
function sharding.wrap_select_method(method, space_name, ...)
local i = 0

local ok, res, err
while true do
ok, res, err = pcall(method, space_name, ...)

if ok == true then
break
end

-- Error thrown from merger casted to string,
-- so the only way to identify it is string.find.
local str_err = tostring(res)
if (str_err:find(sharding_utils.ShardingHashMismatchError.name) == nil) then
error(res)
end

-- Reload is performed inside the merger.

i = i + 1

if i > const.SHARDING_RELOAD_RETRIES_NUM then
error(res)
end
end

return res, err
end

return sharding
15 changes: 13 additions & 2 deletions crud/common/sharding/sharding_metadata.lua
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
local fiber = require('fiber')
local errors = require('errors')
local log = require('log')

local call = require('crud.common.call')
local const = require('crud.common.const')
Expand Down Expand Up @@ -85,10 +86,11 @@ end
-- a sharding metadata by a single one, other fibers will wait while
-- cache.fetch_lock become unlocked during timeout passed to
-- _fetch_on_router().
-- metadata_map_name == nil means forced reload.
local _fetch_on_router = locked(function(timeout, space_name, metadata_map_name)
dev_checks('number', 'string', 'string')
dev_checks('number', 'string', '?string')

if cache[metadata_map_name] ~= nil then
if (metadata_map_name ~= nil) and (cache[metadata_map_name]) ~= nil then
return
end

Expand Down Expand Up @@ -186,6 +188,15 @@ function sharding_metadata_module.update_sharding_func_cache(space_name)
return sharding_metadata_module.fetch_sharding_func_on_router(space_name)
end

function sharding_metadata_module.reload_sharding_cache(space_name)
cache.drop_caches()

local err = _fetch_on_router(const.FETCH_SHARDING_METADATA_TIMEOUT, space_name, nil)
if err ~= nil then
log.warn('Failed to reload sharding cache: %s', err)
end
end

function sharding_metadata_module.init()
_G._crud.fetch_on_storage = sharding_metadata_module.fetch_on_storage
end
Expand Down
3 changes: 3 additions & 0 deletions crud/common/sharding/utils.lua
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
local digest = require('digest')
local errors = require('errors')
local msgpack = require('msgpack')

local utils = {}
Expand All @@ -8,6 +9,8 @@ utils.SPACE_SHARDING_KEY_FIELDNO = 2
utils.SPACE_SHARDING_FUNC_NAME_FIELDNO = 2
utils.SPACE_SHARDING_FUNC_BODY_FIELDNO = 3

utils.ShardingHashMismatchError = errors.new_class("ShardingHashMismatchError", {capture_stack = false})

function utils.extract_sharding_func_def(tuple)
if not tuple then
return nil
Expand Down
15 changes: 0 additions & 15 deletions crud/common/sharding_func.lua

This file was deleted.

13 changes: 0 additions & 13 deletions crud/common/sharding_key.lua

This file was deleted.

5 changes: 3 additions & 2 deletions crud/common/utils.lua
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ local vshard = require('vshard')
local fun = require('fun')
local bit = require('bit')

local const = require('crud.common.const')
local schema = require('crud.common.schema')
local dev_checks = require('crud.common.dev_checks')

Expand Down Expand Up @@ -631,12 +632,12 @@ end
local function flatten_obj(space_name, obj)
local space_format, err = utils.get_space_format(space_name, vshard.router.routeall())
if err ~= nil then
return nil, FlattenError:new("Failed to get space format: %s", err), true
return nil, FlattenError:new("Failed to get space format: %s", err), const.NEED_SCHEMA_RELOAD
end

local tuple, err = utils.flatten(obj, space_format)
if err ~= nil then
return nil, FlattenError:new("Object is specified in bad format: %s", err), true
return nil, FlattenError:new("Object is specified in bad format: %s", err), const.NEED_SCHEMA_RELOAD
end

return tuple
Expand Down
18 changes: 13 additions & 5 deletions crud/count.lua
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ local vshard = require('vshard')
local fiber = require('fiber')

local call = require('crud.common.call')
local const = require('crud.common.const')
local utils = require('crud.common.utils')
local sharding = require('crud.common.sharding')
local filters = require('crud.compare.filters')
Expand Down Expand Up @@ -122,7 +123,7 @@ local function call_count_on_router(space_name, user_conditions, opts)

local space = utils.get_space(space_name, replicasets)
if space == nil then
return nil, CountError:new("Space %q doesn't exist", space_name), true
return nil, CountError:new("Space %q doesn't exist", space_name), const.NEED_SCHEMA_RELOAD
end

local sharding_key_data, err = sharding_metadata_module.fetch_sharding_key_on_router(space_name)
Expand All @@ -135,7 +136,7 @@ local function call_count_on_router(space_name, user_conditions, opts)
sharding_key_as_index_obj = sharding_key_data.value,
})
if err ~= nil then
return nil, CountError:new("Failed to plan count: %s", err), true
return nil, CountError:new("Failed to plan count: %s", err), const.NEED_SCHEMA_RELOAD
end

-- set replicasets to count from
Expand Down Expand Up @@ -188,7 +189,7 @@ local function call_count_on_router(space_name, user_conditions, opts)
local err
replicasets_to_count, err = sharding.get_replicasets_by_bucket_id(bucket_id_data.bucket_id)
if err ~= nil then
return nil, err, true
return nil, err, const.NEED_SCHEMA_RELOAD
end
else
skip_sharding_hash_check = true
Expand Down Expand Up @@ -219,7 +220,13 @@ local function call_count_on_router(space_name, user_conditions, opts)
}, call_opts)

if err ~= nil then
return nil, CountError:new("Failed to call count on storage-side: %s", err)
local err_wrapped = CountError:new("Failed to call count on storage-side: %s", err)

if sharding.result_needs_sharding_reload(err) then
return nil, err_wrapped, const.NEED_SHARDING_RELOAD
end

return nil, err_wrapped
end

if results.err ~= nil then
Expand Down Expand Up @@ -289,7 +296,8 @@ function count.call(space_name, user_conditions, opts)
mode = '?string',
})

return schema.wrap_func_reload(call_count_on_router, space_name, user_conditions, opts)
return schema.wrap_func_reload(sharding.wrap_method,
call_count_on_router, space_name, user_conditions, opts)
end

return count
Loading

0 comments on commit 2053ff8

Please sign in to comment.