Skip to content

Commit

Permalink
ddl: fail on sharding info mismatch
Browse files Browse the repository at this point in the history
Return an error if the router's sharding info differs from the storage's
sharding info. The comparison is based on sharding hash values; the hashes
are provided with each relevant request.

Hashes are extracted together with sharding key and sharding func
definitions on router during request execution.

After this patch, the performance of insert requests decreased by 5%,
the performance of select requests decreased by 1.5%.

Part of #212
  • Loading branch information
DifferentialOrange committed Apr 19, 2022
1 parent a11f4a4 commit 70ae4ff
Show file tree
Hide file tree
Showing 16 changed files with 686 additions and 76 deletions.
63 changes: 48 additions & 15 deletions crud/common/sharding/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@ local errors = require('errors')

local BucketIDError = errors.new_class("BucketIDError", {capture_stack = false})
local GetReplicasetsError = errors.new_class('GetReplicasetsError', {capture_stack = false})
local ShardingHashMismatchError = errors.new_class("ShardingHashMismatchError", {capture_stack = false})

local utils = require('crud.common.utils')
local dev_checks = require('crud.common.dev_checks')
local sharding_metadata_module = require('crud.common.sharding.sharding_metadata')
local storage_metadata_cache = require('crud.common.sharding.storage_metadata_cache')

local sharding = {}

-- Resolve the bucket id for a given key on the router.
--
-- @tparam string space_name Name of the space the key belongs to.
-- @param key Sharding key value (any type accepted by the sharding function).
-- @tparam ?number|cdata specified_bucket_id Explicit bucket id; when given,
--  it is returned as-is and no sharding function is consulted.
-- @treturn[1] table { bucket_id = ..., sharding_func_hash = ... } — the hash
--  field is present only when a DDL-defined sharding function was used, so
--  storages can detect a router/storage sharding-info mismatch.
-- @treturn[2] nil
-- @treturn[2] table error from fetching the sharding function metadata.
function sharding.key_get_bucket_id(space_name, key, specified_bucket_id)
    dev_checks('string', '?', '?number|cdata')

    -- A caller-supplied bucket id short-circuits everything:
    -- no hash is attached, so the storage-side hash check is skipped.
    if specified_bucket_id ~= nil then
        return { bucket_id = specified_bucket_id }
    end

    local sharding_func_data, err = sharding_metadata_module.fetch_sharding_func_on_router(space_name)
    if err ~= nil then
        return nil, err
    end

    -- Space has a custom sharding function defined via DDL: use it and
    -- propagate its hash for the storage-side consistency check.
    if sharding_func_data.value ~= nil then
        return {
            bucket_id = sharding_func_data.value(key),
            sharding_func_hash = sharding_func_data.hash,
        }
    end

    -- Default vshard sharding function (strcrc32) — no hash to verify.
    return { bucket_id = vshard.router.bucket_id_strcrc32(key) }
end

-- Resolve the bucket id for a tuple on the router.
--
-- Extracts the sharding key from the tuple (using the DDL-defined sharding
-- key parts when available, otherwise the primary index parts) and delegates
-- to sharding.key_get_bucket_id.
--
-- @param tuple Tuple to compute the bucket id for.
-- @param space Space object (net.box/box space with .name and .index).
-- @tparam ?number|cdata specified_bucket_id Explicit bucket id; when given,
--  it is returned as-is and no hashes are attached.
-- @treturn[1] table { bucket_id = ..., sharding_func_hash = ...,
--  sharding_key_hash = ... } — hash fields may be nil when defaults are used.
-- @treturn[2] nil
-- @treturn[2] table error from metadata fetch or bucket id calculation.
function sharding.tuple_get_bucket_id(tuple, space, specified_bucket_id)
    -- Explicit bucket id bypasses sharding metadata entirely, so no
    -- hashes are provided and the storage-side check will be skipped.
    if specified_bucket_id ~= nil then
        return { bucket_id = specified_bucket_id }
    end

    -- Default sharding key is the primary index; a DDL-defined sharding
    -- key (if any) overrides it.
    local sharding_index_parts = space.index[0].parts
    local sharding_key_data, err = sharding_metadata_module.fetch_sharding_key_on_router(space.name)
    if err ~= nil then
        return nil, err
    end
    if sharding_key_data.value ~= nil then
        sharding_index_parts = sharding_key_data.value.parts
    end
    local key = utils.extract_key(tuple, sharding_index_parts)

    local bucket_id_data, err = sharding.key_get_bucket_id(space.name, key, nil)
    if err ~= nil then
        return nil, err
    end

    -- Combine the sharding-function hash (from bucket id calculation)
    -- with the sharding-key hash so storages can verify both.
    return {
        bucket_id = bucket_id_data.bucket_id,
        sharding_func_hash = bucket_id_data.sharding_func_hash,
        sharding_key_hash = sharding_key_data.hash
    }
end

function sharding.tuple_set_and_return_bucket_id(tuple, space, specified_bucket_id)
Expand All @@ -77,16 +91,35 @@ function sharding.tuple_set_and_return_bucket_id(tuple, space, specified_bucket_
end
end

local bucket_id = tuple[bucket_id_fieldno]
if bucket_id == nil then
bucket_id, err = sharding.tuple_get_bucket_id(tuple, space)
local sharding_data = { bucket_id = tuple[bucket_id_fieldno] }

if sharding_data.bucket_id == nil then
sharding_data, err = sharding.tuple_get_bucket_id(tuple, space)
if err ~= nil then
return nil, err
end
tuple[bucket_id_fieldno] = bucket_id
tuple[bucket_id_fieldno] = sharding_data.bucket_id
end

return sharding_data
end

-- Verify on the storage that the router's sharding metadata matches.
--
-- Compares the hashes the router attached to the request against the
-- hashes of the sharding metadata currently stored on this storage.
--
-- @tparam string space_name Space the request targets.
-- @tparam ?number sharding_func_hash Hash sent by the router (may be nil
--  when the default sharding function was used).
-- @tparam ?number sharding_key_hash Hash sent by the router (may be nil
--  when the primary key was used as the sharding key).
-- @tparam ?boolean skip_sharding_hash_check Set by the router when an
--  explicit bucket_id was supplied and no metadata was consulted.
-- @treturn[1] boolean true on match (or when the check is skipped).
-- @treturn[2] nil
-- @treturn[2] table ShardingHashMismatchError on mismatch.
function sharding.check_sharding_hash(space_name, sharding_func_hash, sharding_key_hash, skip_sharding_hash_check)
    if skip_sharding_hash_check == true then
        return true
    end

    local storage_func_hash = storage_metadata_cache.get_sharding_func_hash(space_name)
    local storage_key_hash = storage_metadata_cache.get_sharding_key_hash(space_name)

    -- Both hashes must agree; a nil-vs-nil pair (defaults on both sides)
    -- compares equal and passes.
    if storage_func_hash ~= sharding_func_hash or storage_key_hash ~= sharding_key_hash then
        local err_msg = ('crud: Sharding hash mismatch for space %s. ' ..
            'Please refresh sharding data and retry your request.'
        ):format(space_name)
        return nil, ShardingHashMismatchError:new(err_msg)
    end

    return true
end

return sharding
10 changes: 8 additions & 2 deletions crud/common/sharding/sharding_metadata.lua
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,10 @@ end)

local function fetch_on_router(space_name, metadata_map_name, timeout)
if cache[metadata_map_name] ~= nil then
return cache[metadata_map_name][space_name]
return {
value = cache[metadata_map_name][space_name],
hash = cache[cache.META_HASH_MAP_NAME][metadata_map_name][space_name]
}
end

local timeout = timeout or const.FETCH_SHARDING_METADATA_TIMEOUT
Expand All @@ -131,7 +134,10 @@ local function fetch_on_router(space_name, metadata_map_name, timeout)
end

if cache[metadata_map_name] ~= nil then
return cache[metadata_map_name][space_name]
return {
value = cache[metadata_map_name][space_name],
hash = cache[cache.META_HASH_MAP_NAME][metadata_map_name][space_name],
}
end

return nil, FetchShardingMetadataError:new(
Expand Down
31 changes: 26 additions & 5 deletions crud/count.lua
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ local function count_on_storage(space_name, index_id, conditions, opts)
tarantool_iter = 'number',
yield_every = '?number',
scan_condition_num = '?number',
sharding_func_hash = '?number',
sharding_key_hash = '?number',
skip_sharding_hash_check = '?boolean',
})

opts = opts or {}
Expand All @@ -38,6 +41,14 @@ local function count_on_storage(space_name, index_id, conditions, opts)
return nil, CountError:new("Index with ID %s doesn't exist", index_id)
end

local _, err = sharding.check_sharding_hash(space_name,
opts.sharding_func_hash,
opts.sharding_key_hash,
opts.skip_sharding_hash_check)
if err ~= nil then
return nil, err
end

local value = opts.scan_value

local filter_func, err = filters.gen_func(space, conditions, {
Expand Down Expand Up @@ -114,14 +125,14 @@ local function call_count_on_router(space_name, user_conditions, opts)
return nil, CountError:new("Space %q doesn't exist", space_name), true
end

local sharding_key_as_index_obj, err = sharding_metadata_module.fetch_sharding_key_on_router(space_name)
local sharding_key_data, err = sharding_metadata_module.fetch_sharding_key_on_router(space_name)
if err ~= nil then
return nil, err
end

-- plan count
local plan, err = count_plan.new(space, conditions, {
sharding_key_as_index_obj = sharding_key_as_index_obj,
sharding_key_as_index_obj = sharding_key_data.value,
})
if err ~= nil then
return nil, CountError:new("Failed to plan count: %s", err), true
Expand Down Expand Up @@ -159,21 +170,28 @@ local function call_count_on_router(space_name, user_conditions, opts)
-- eye to resharding. However, AFAIU, the optimization
-- does not make the result less consistent (sounds
-- weird, huh?).
local sharding_func_hash = nil
local skip_sharding_hash_check = nil

local perform_map_reduce = opts.force_map_call == true or
(opts.bucket_id == nil and plan.sharding_key == nil)
if not perform_map_reduce then
local bucket_id, err = sharding.key_get_bucket_id(space_name, plan.sharding_key, opts.bucket_id)
local bucket_id_data, err = sharding.key_get_bucket_id(space_name, plan.sharding_key, opts.bucket_id)
if err ~= nil then
return nil, err
end

assert(bucket_id ~= nil)
assert(bucket_id_data.bucket_id ~= nil)

sharding_func_hash = bucket_id_data.sharding_func_hash

local err
replicasets_to_count, err = sharding.get_replicasets_by_bucket_id(bucket_id)
replicasets_to_count, err = sharding.get_replicasets_by_bucket_id(bucket_id_data.bucket_id)
if err ~= nil then
return nil, err, true
end
else
skip_sharding_hash_check = true
end

local yield_every = opts.yield_every or DEFAULT_YIELD_EVERY
Expand All @@ -191,6 +209,9 @@ local function call_count_on_router(space_name, user_conditions, opts)
tarantool_iter = plan.tarantool_iter,
yield_every = yield_every,
scan_condition_num = plan.scan_condition_num,
sharding_func_hash = sharding_func_hash,
sharding_key_hash = sharding_key_data.hash,
skip_sharding_hash_check = skip_sharding_hash_check,
}

local results, err = call.map(COUNT_FUNC_NAME, {
Expand Down
43 changes: 36 additions & 7 deletions crud/delete.lua
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,29 @@ local delete = {}

local DELETE_FUNC_NAME = '_crud.delete_on_storage'

local function delete_on_storage(space_name, key, field_names)
dev_checks('string', '?', '?table')
local function delete_on_storage(space_name, key, field_names, opts)
dev_checks('string', '?', '?table', {
sharding_key_hash = '?number',
sharding_func_hash = '?number',
skip_sharding_hash_check = '?boolean',
})

opts = opts or {}

local space = box.space[space_name]
if space == nil then
return nil, DeleteError:new("Space %q doesn't exist", space_name)
end

local _, err = sharding.check_sharding_hash(space_name,
opts.sharding_func_hash,
opts.sharding_key_hash,
opts.skip_sharding_hash_check)

if err ~= nil then
return nil, err
end

-- add_space_schema_hash is false because
-- reloading space format on router can't avoid delete error on storage
return schema.wrap_box_space_func_result(space, 'delete', {key}, {
Expand Down Expand Up @@ -57,35 +72,49 @@ local function call_delete_on_router(space_name, key, opts)
key = key:totable()
end

local sharding_key_hash = nil
local skip_sharding_hash_check = nil

local sharding_key = key
if opts.bucket_id == nil then
local primary_index_parts = space.index[0].parts

local sharding_key_as_index_obj, err = sharding_metadata_module.fetch_sharding_key_on_router(space_name)
local sharding_key_data, err = sharding_metadata_module.fetch_sharding_key_on_router(space_name)
if err ~= nil then
return nil, err
end

sharding_key, err = sharding_key_module.extract_from_pk(space_name,
sharding_key_as_index_obj,
sharding_key_data.value,
primary_index_parts, key)
if err ~= nil then
return nil, err
end

sharding_key_hash = sharding_key_data.hash
else
skip_sharding_hash_check = true
end

local bucket_id, err = sharding.key_get_bucket_id(space_name, sharding_key, opts.bucket_id)
local bucket_id_data, err = sharding.key_get_bucket_id(space_name, sharding_key, opts.bucket_id)
if err ~= nil then
return nil, err
end

local delete_on_storage_opts = {
sharding_func_hash = bucket_id_data.sharding_func_hash,
sharding_key_hash = sharding_key_hash,
skip_sharding_hash_check = skip_sharding_hash_check,
}

local call_opts = {
mode = 'write',
timeout = opts.timeout,
}

local storage_result, err = call.single(
bucket_id, DELETE_FUNC_NAME,
{space_name, key, opts.fields},
bucket_id_data.bucket_id, DELETE_FUNC_NAME,
{space_name, key, opts.fields, delete_on_storage_opts},
call_opts
)

Expand Down
43 changes: 36 additions & 7 deletions crud/get.lua
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,29 @@ local get = {}

local GET_FUNC_NAME = '_crud.get_on_storage'

local function get_on_storage(space_name, key, field_names)
dev_checks('string', '?', '?table')
local function get_on_storage(space_name, key, field_names, opts)
dev_checks('string', '?', '?table', {
sharding_key_hash = '?number',
sharding_func_hash = '?number',
skip_sharding_hash_check = '?boolean',
})

opts = opts or {}

local space = box.space[space_name]
if space == nil then
return nil, GetError:new("Space %q doesn't exist", space_name)
end

local _, err = sharding.check_sharding_hash(space_name,
opts.sharding_func_hash,
opts.sharding_key_hash,
opts.skip_sharding_hash_check)

if err ~= nil then
return nil, err
end

-- add_space_schema_hash is false because
-- reloading space format on router can't avoid get error on storage
return schema.wrap_box_space_func_result(space, 'get', {key}, {
Expand Down Expand Up @@ -61,36 +76,50 @@ local function call_get_on_router(space_name, key, opts)
end

local sharding_key = key
local sharding_key_hash = nil
local skip_sharding_hash_check = nil

if opts.bucket_id == nil then
local primary_index_parts = space.index[0].parts

local sharding_key_as_index_obj, err = sharding_metadata_module.fetch_sharding_key_on_router(space_name)
local sharding_key_data, err = sharding_metadata_module.fetch_sharding_key_on_router(space_name)
if err ~= nil then
return nil, err
end

sharding_key, err = sharding_key_module.extract_from_pk(space_name,
sharding_key_as_index_obj,
sharding_key_data.value,
primary_index_parts, key)
if err ~= nil then
return nil, err
end

sharding_key_hash = sharding_key_data.hash
else
skip_sharding_hash_check = true
end

local bucket_id, err = sharding.key_get_bucket_id(space_name, sharding_key, opts.bucket_id)
local bucket_id_data, err = sharding.key_get_bucket_id(space_name, sharding_key, opts.bucket_id)
if err ~= nil then
return nil, err
end

local get_on_storage_opts = {
sharding_func_hash = bucket_id_data.sharding_func_hash,
sharding_key_hash = sharding_key_hash,
skip_sharding_hash_check = skip_sharding_hash_check,
}

local call_opts = {
mode = opts.mode or 'read',
prefer_replica = opts.prefer_replica,
balance = opts.balance,
timeout = opts.timeout,
}

local storage_result, err = call.single(
bucket_id, GET_FUNC_NAME,
{space_name, key, opts.fields},
bucket_id_data.bucket_id, GET_FUNC_NAME,
{space_name, key, opts.fields, get_on_storage_opts},
call_opts
)

Expand Down
Loading

0 comments on commit 70ae4ff

Please sign in to comment.