Skip to content

Commit

Permalink
Support custom sharding key in delete, update and get
Browse files Browse the repository at this point in the history
Part of #166

Reviewed-by: Alexander Turenko <[email protected]>
Reviewed-by: Oleg Babin <[email protected]>
  • Loading branch information
ligurio committed Nov 26, 2021
1 parent cab9555 commit 447b81b
Show file tree
Hide file tree
Showing 9 changed files with 487 additions and 3 deletions.
75 changes: 75 additions & 0 deletions crud/common/sharding_key.lua
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ local dev_checks = require('crud.common.dev_checks')
local cache = require('crud.common.sharding_key_cache')
local utils = require('crud.common.utils')

local ShardingKeyError = errors.new_class("ShardingKeyError", {capture_stack = false})
local FetchShardingKeyError = errors.new_class('FetchShardingKeyError', {capture_stack = false})
local WrongShardingConfigurationError = errors.new_class('WrongShardingConfigurationError', {capture_stack = false})

Expand Down Expand Up @@ -151,12 +152,86 @@ function sharding_key_module.fetch_on_router(space_name, timeout)
"Fetching sharding key for space '%s' is failed", space_name)
end

-- Make sure sharding key definition is a part of primary key.
local function is_part_of_pk(space_name, primary_index_parts, sharding_key_as_index_obj)
dev_checks('string', 'table', 'table')

if cache.is_part_of_pk[space_name] ~= nil then
return cache.is_part_of_pk[space_name]
end

local is_part_of_pk = true
local pk_fieldno_map = utils.get_index_fieldno_map(primary_index_parts)
for _, part in ipairs(sharding_key_as_index_obj.parts) do
if pk_fieldno_map[part.fieldno] == nil then
is_part_of_pk = false
break
end
end
cache.is_part_of_pk[space_name] = is_part_of_pk

return is_part_of_pk
end

-- Build an array with sharding key values. Function extracts those values from
-- primary key that are part of sharding key (passed as index object).
local function extract_from_index(primary_key, primary_index_parts, sharding_key_as_index_obj)
dev_checks('table', 'table', 'table')

-- TODO: extract_from_index() calculates primary_index_parts on each
-- request. It is better to cache it's value.
-- https://github.com/tarantool/crud/issues/243
local primary_index_fieldno_map = utils.get_index_fieldno_map(primary_index_parts)

local sharding_key = {}
for _, part in ipairs(sharding_key_as_index_obj.parts) do
-- part_number cannot be nil because earlier we checked that tuple
-- field names defined in sharding key definition are part of primary
-- key.
local part_number = primary_index_fieldno_map[part.fieldno]
assert(part_number ~= nil)
local field_value = primary_key[part_number]
table.insert(sharding_key, field_value)
end

return sharding_key
end

-- Extract sharding key from pk.
-- Returns a table with sharding key or pair of nil and error.
function sharding_key_module.extract_from_pk(space_name, primary_index_parts, primary_key, timeout)
dev_checks('string', 'table', '?', '?number')

local sharding_key_as_index_obj, err = sharding_key_module.fetch_on_router(space_name, timeout)
if err ~= nil then
return nil, err
end
if sharding_key_as_index_obj == nil then
return primary_key
end

local res = is_part_of_pk(space_name, primary_index_parts, sharding_key_as_index_obj)
if res == false then
return nil, ShardingKeyError:new(
"Sharding key for space %q is missed in primary index, specify bucket_id",
space_name
)
end
if type(primary_key) ~= 'table' then
primary_key = {primary_key}
end

return extract_from_index(primary_key, primary_index_parts, sharding_key_as_index_obj)
end

function sharding_key_module.init()
_G._crud.fetch_on_storage = sharding_key_module.fetch_on_storage
end

sharding_key_module.internal = {
as_index_object = as_index_object,
extract_from_index = extract_from_index,
is_part_of_pk = is_part_of_pk,
}

return sharding_key_module
10 changes: 10 additions & 0 deletions crud/common/sharding_key_cache.lua
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,15 @@ local sharding_key_cache = {}

sharding_key_cache.sharding_key_as_index_obj_map = nil
sharding_key_cache.fetch_lock = fiber.channel(1)
sharding_key_cache.is_part_of_pk = {}

function sharding_key_cache.drop_caches()
sharding_key_cache.sharding_key_as_index_obj_map = nil
if sharding_key_cache.fetch_lock ~= nil then
sharding_key_cache.fetch_lock:close()
end
sharding_key_cache.fetch_lock = fiber.channel(1)
sharding_key_cache.is_part_of_pk = {}
end

return sharding_key_cache
14 changes: 14 additions & 0 deletions crud/common/utils.lua
Original file line number Diff line number Diff line change
Expand Up @@ -421,6 +421,20 @@ function utils.get_bucket_id_fieldno(space, shard_index_name)
return bucket_id_index.parts[1].fieldno
end

-- Build a map with field number as a keys and part number
-- as a values using index parts as a source.
function utils.get_index_fieldno_map(index_parts)
dev_checks('table')

local fieldno_map = {}
for i, part in ipairs(index_parts) do
local fieldno = part.fieldno
fieldno_map[fieldno] = i
end

return fieldno_map
end

-- Build a map with field names as a keys and fieldno's
-- as a values using space format as a source.
function utils.get_format_fieldno_map(space_format)
Expand Down
13 changes: 12 additions & 1 deletion crud/delete.lua
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ local vshard = require('vshard')
local call = require('crud.common.call')
local utils = require('crud.common.utils')
local sharding = require('crud.common.sharding')
local sharding_key_module = require('crud.common.sharding_key')
local dev_checks = require('crud.common.dev_checks')
local schema = require('crud.common.schema')

Expand Down Expand Up @@ -55,7 +56,17 @@ local function call_delete_on_router(space_name, key, opts)
key = key:totable()
end

local bucket_id = sharding.key_get_bucket_id(key, opts.bucket_id)
local sharding_key = key
if opts.bucket_id == nil then
local err
local primary_index_parts = space.index[0].parts
sharding_key, err = sharding_key_module.extract_from_pk(space_name, primary_index_parts, key, opts.timeout)
if err ~= nil then
return nil, err
end
end

local bucket_id = sharding.key_get_bucket_id(sharding_key, opts.bucket_id)
local call_opts = {
mode = 'write',
timeout = opts.timeout,
Expand Down
13 changes: 12 additions & 1 deletion crud/get.lua
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ local vshard = require('vshard')
local call = require('crud.common.call')
local utils = require('crud.common.utils')
local sharding = require('crud.common.sharding')
local sharding_key_module = require('crud.common.sharding_key')
local dev_checks = require('crud.common.dev_checks')
local schema = require('crud.common.schema')

Expand Down Expand Up @@ -58,7 +59,17 @@ local function call_get_on_router(space_name, key, opts)
key = key:totable()
end

local bucket_id = sharding.key_get_bucket_id(key, opts.bucket_id)
local sharding_key = key
if opts.bucket_id == nil then
local err
local primary_index_parts = space.index[0].parts
sharding_key, err = sharding_key_module.extract_from_pk(space_name, primary_index_parts, key, opts.timeout)
if err ~= nil then
return nil, err
end
end

local bucket_id = sharding.key_get_bucket_id(sharding_key, opts.bucket_id)
local call_opts = {
mode = opts.mode or 'read',
prefer_replica = opts.prefer_replica,
Expand Down
13 changes: 12 additions & 1 deletion crud/update.lua
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ local vshard = require('vshard')
local call = require('crud.common.call')
local utils = require('crud.common.utils')
local sharding = require('crud.common.sharding')
local sharding_key_module = require('crud.common.sharding_key')
local dev_checks = require('crud.common.dev_checks')
local schema = require('crud.common.schema')

Expand Down Expand Up @@ -83,6 +84,16 @@ local function call_update_on_router(space_name, key, user_operations, opts)
key = key:totable()
end

local sharding_key = key
if opts.bucket_id == nil then
local err
local primary_index_parts = space.index[0].parts
sharding_key, err = sharding_key_module.extract_from_pk(space_name, primary_index_parts, key, opts.timeout)
if err ~= nil then
return nil, err
end
end

local operations = user_operations
if not utils.tarantool_supports_fieldpaths() then
operations, err = utils.convert_operations(user_operations, space_format)
Expand All @@ -91,7 +102,7 @@ local function call_update_on_router(space_name, key, user_operations, opts)
end
end

local bucket_id = sharding.key_get_bucket_id(key, opts.bucket_id)
local bucket_id = sharding.key_get_bucket_id(sharding_key, opts.bucket_id)
local call_opts = {
mode = 'write',
timeout = opts.timeout,
Expand Down
6 changes: 6 additions & 0 deletions test/entrypoint/srv_ddl.lua
Original file line number Diff line number Diff line change
Expand Up @@ -95,12 +95,18 @@ package.preload['customers-storage'] = function()
table.insert(customers_secondary_idx_name_key_schema.indexes, secondary_index)
table.insert(customers_secondary_idx_name_key_schema.indexes, bucket_id_index)

local customers_age_key_schema = table.deepcopy(customers_schema)
customers_age_key_schema.sharding_key = {'age'}
table.insert(customers_age_key_schema.indexes, primary_index)
table.insert(customers_age_key_schema.indexes, bucket_id_index)

local schema = {
spaces = {
customers_name_key = customers_name_key_schema,
customers_name_key_uniq_index = customers_name_key_uniq_index_schema,
customers_name_key_non_uniq_index = customers_name_key_non_uniq_index_schema,
customers_secondary_idx_name_key = customers_secondary_idx_name_key_schema,
customers_age_key = customers_age_key_schema,
}
}

Expand Down
Loading

0 comments on commit 447b81b

Please sign in to comment.