Skip to content

Commit

Permalink
Extract sharding key from conditions
Browse files Browse the repository at this point in the history
PR #181 introduced support of DDL sharding keys. But if sharding key
hasn't got a separate index in schema, select with equal conditions
for all required sharding key fields still led to map-reduce instead of
a single storage call. This patch introduces improved support of
sharding keys extraction and fixes the issue.

Closes #213
  • Loading branch information
DifferentialOrange authored and Totktonada committed Dec 1, 2021
1 parent cc2e0e8 commit dc1cdc4
Show file tree
Hide file tree
Showing 7 changed files with 413 additions and 55 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.

* Use tuple-merger backed select implementation on tarantool 2.10+ (it gives
less pressure on Lua GC).
* DDL sharding key now can be extracted from select conditions even if
there is no separate index.

## [0.9.0] - 20-10-21

Expand Down
2 changes: 0 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,6 @@ Current limitations for using custom sharding key:
updated on storages, see
[#212](https://github.com/tarantool/crud/issues/212). However it is possible
to do it manually with `require('crud.sharding_key').update_cache()`.
- CRUD select may lead map reduce in some cases, see
[#213](https://github.com/tarantool/crud/issues/213).
- No support of JSON path for sharding key, see
[#219](https://github.com/tarantool/crud/issues/219).
- `primary_index_fieldno_map` is not cached, see
Expand Down
112 changes: 79 additions & 33 deletions crud/select/plan.lua
Original file line number Diff line number Diff line change
Expand Up @@ -48,49 +48,91 @@ local function get_index_for_condition(space_indexes, space_format, condition)
end
end

local function extract_sharding_key_from_scan_value(scan_value, scan_index, sharding_index)
if #scan_value < #sharding_index.parts then
return nil
end
local function extract_sharding_key_from_conditions(conditions, sharding_index, space_indexes, fieldno_map)
dev_checks('table', 'table', 'table', 'table')

-- If name is both valid index name and field name,
-- it is interpreted as index name.
local filled_fields = {}
for _, condition in ipairs(conditions) do
if condition.operator ~= compare_conditions.operators.EQ then
goto continue
end

if scan_index.id == sharding_index.id then
return scan_value
end
local index = space_indexes[condition.operand]
if index ~= nil then
for i, part in ipairs(index.parts) do
-- Consider the following case:
-- index_0: {'foo', 'bar'},
-- index_1: {'baz', 'bar'},
-- conditions: {{'==', 'index_0', {1, 2}}, {'==', 'index_1', {3, nil}}}.
-- To make sure that nil parts do not overwrite already filled fields,
-- we verify that filled_fields[part.fieldno] is empty. If conditions with
-- the equal operator contain more than one distinct non-null value for a
-- field, the request is already in conflict and it doesn't matter what
-- sharding key we return.
if filled_fields[part.fieldno] == nil then
filled_fields[part.fieldno] = condition.values[i]
end
end

local scan_value_fields_values = {}
for i, scan_index_part in ipairs(scan_index.parts) do
scan_value_fields_values[scan_index_part.fieldno] = scan_value[i]
goto continue
end

local fieldno = fieldno_map[condition.operand]
if fieldno == nil then
goto continue
end
filled_fields[fieldno] = condition.values[1]

::continue::
end

-- check that sharding key is included in the scan index fields
local sharding_key = {}
for _, sharding_key_part in ipairs(sharding_index.parts) do
local fieldno = sharding_key_part.fieldno

-- sharding key isn't included in scan key
if scan_value_fields_values[fieldno] == nil then
for i, v in ipairs(sharding_index.parts) do
if filled_fields[v.fieldno] == nil then
return nil
end

local field_value = scan_value_fields_values[fieldno]
sharding_key[i] = filled_fields[v.fieldno]
end

-- sharding key contains nil values
if field_value == nil then
return nil
return sharding_key
end

local function get_sharding_key_from_scan_value(scan_value, scan_index, scan_iter, sharding_index)
dev_checks('?', 'table', 'number', 'table')

if scan_value == nil then
return nil
end

if scan_iter ~= box.index.EQ and scan_iter ~= box.index.REQ then
return nil
end

if scan_index.id == sharding_index.id then
if type(scan_value) ~= 'table' then
return scan_value
end

table.insert(sharding_key, field_value)
for i, _ in ipairs(sharding_index.parts) do
if scan_value[i] == nil then return nil end
end
return scan_value
end

return sharding_key
return nil
end

-- We need to construct after_tuple by field_names
-- because if `fields` option is specified we have after_tuple with partial fields
-- and these fields are ordered by field_names + primary key + scan key
-- this order can differ from the order in space format
-- so we need to cast after_tuple to space format for scrolling tuples on storage
local function construct_after_tuple_by_fields(space_format, field_names, tuple)
local function construct_after_tuple_by_fields(fieldno_map, field_names, tuple)
dev_checks('table', '?table', '?table|cdata')

if tuple == nil then
return nil
end
Expand All @@ -99,15 +141,10 @@ local function construct_after_tuple_by_fields(space_format, field_names, tuple)
return tuple
end

local positions = {}
local transformed_tuple = {}

for i, field in ipairs(space_format) do
positions[field.name] = i
end

for i, field_name in ipairs(field_names) do
local fieldno = positions[field_name]
local fieldno = fieldno_map[field_name]
if fieldno == nil then
return nil, FilterFieldsError:new(
'Space format doesn\'t contain field named %q', field_name
Expand Down Expand Up @@ -145,6 +182,8 @@ function select_plan.new(space, conditions, opts)
local scan_value
local scan_condition_num

local fieldno_map = utils.get_format_fieldno_map(space_format)

-- search index to iterate over
for i, condition in ipairs(conditions) do
scan_index = get_index_for_condition(space_indexes, space_format, condition)
Expand Down Expand Up @@ -177,7 +216,7 @@ function select_plan.new(space, conditions, opts)
-- handle opts.first
local total_tuples_count
local scan_after_tuple, err = construct_after_tuple_by_fields(
space_format, field_names, opts.after_tuple
fieldno_map, field_names, opts.after_tuple
)
if err ~= nil then
return nil, err
Expand Down Expand Up @@ -230,9 +269,11 @@ function select_plan.new(space, conditions, opts)
local sharding_index = opts.sharding_key_as_index_obj or primary_index

-- get sharding key value
local sharding_key
if scan_value ~= nil and (scan_iter == box.index.EQ or scan_iter == box.index.REQ) then
sharding_key = extract_sharding_key_from_scan_value(scan_value, scan_index, sharding_index)
local sharding_key = get_sharding_key_from_scan_value(scan_value, scan_index, scan_iter, sharding_index)

if sharding_key == nil then
sharding_key = extract_sharding_key_from_conditions(conditions, sharding_index,
space_indexes, fieldno_map)
end

local plan = {
Expand All @@ -251,4 +292,9 @@ function select_plan.new(space, conditions, opts)
return plan
end

-- Local helpers exported on the module table under the `internal` key.
-- NOTE(review): presumably exposed so unit tests can call them directly;
-- confirm against the test suite.
select_plan.internal = {
get_sharding_key_from_scan_value = get_sharding_key_from_scan_value,
extract_sharding_key_from_conditions = extract_sharding_key_from_conditions
}

return select_plan
33 changes: 33 additions & 0 deletions test/entrypoint/srv_ddl.lua
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,14 @@ package.preload['customers-storage'] = function()
{path = 'name', is_nullable = false, type = 'string'},
},
}
-- Non-unique TREE index named 'age' with a single non-nullable
-- numeric part: the `age` field.
local age_index = {
name = 'age',
type = 'TREE',
unique = false,
parts = {
{path = 'age', is_nullable = false, type = 'number'},
},
}
local secondary_index = {
name = 'secondary',
type = 'TREE',
Expand All @@ -71,6 +79,17 @@ package.preload['customers-storage'] = function()
},
}

-- Non-unique TREE index named 'three_fields' built over three
-- non-nullable parts, in this order: `age`, `name`, `id`.
local three_fields_index = {
name = 'three_fields',
type = 'TREE',
unique = false,
parts = {
{path = 'age', is_nullable = false, type = 'number'},
{path = 'name', is_nullable = false, type = 'string'},
{path = 'id', is_nullable = false, type = 'unsigned'},
},
}

local customers_name_key_schema = table.deepcopy(customers_schema)
customers_name_key_schema.sharding_key = {'name'}
table.insert(customers_name_key_schema.indexes, primary_index)
Expand Down Expand Up @@ -100,13 +119,27 @@ package.preload['customers-storage'] = function()
table.insert(customers_age_key_schema.indexes, primary_index)
table.insert(customers_age_key_schema.indexes, bucket_id_index)

-- Copy of customers_schema sharded by {'name', 'age'}. It is given the
-- primary, bucket_id and single-field `age` indexes.
local customers_name_age_key_different_indexes_schema = table.deepcopy(customers_schema)
customers_name_age_key_different_indexes_schema.sharding_key = {'name', 'age'}
table.insert(customers_name_age_key_different_indexes_schema.indexes, primary_index)
table.insert(customers_name_age_key_different_indexes_schema.indexes, bucket_id_index)
table.insert(customers_name_age_key_different_indexes_schema.indexes, age_index)

-- Copy of customers_schema sharded by {'name', 'age'}. It is given the
-- `primary_index_id`, bucket_id and `three_fields` (age, name, id) indexes.
local customers_name_age_key_three_fields_index_schema = table.deepcopy(customers_schema)
customers_name_age_key_three_fields_index_schema.sharding_key = {'name', 'age'}
table.insert(customers_name_age_key_three_fields_index_schema.indexes, primary_index_id)
table.insert(customers_name_age_key_three_fields_index_schema.indexes, bucket_id_index)
table.insert(customers_name_age_key_three_fields_index_schema.indexes, three_fields_index)

local schema = {
spaces = {
customers_name_key = customers_name_key_schema,
customers_name_key_uniq_index = customers_name_key_uniq_index_schema,
customers_name_key_non_uniq_index = customers_name_key_non_uniq_index_schema,
customers_secondary_idx_name_key = customers_secondary_idx_name_key_schema,
customers_age_key = customers_age_key_schema,
customers_name_age_key_different_indexes = customers_name_age_key_different_indexes_schema,
customers_name_age_key_three_fields_index = customers_name_age_key_three_fields_index_schema,
}
}

Expand Down
12 changes: 12 additions & 0 deletions test/helpers/storage_stat.lua
Original file line number Diff line number Diff line change
Expand Up @@ -95,4 +95,16 @@ function storage_stat.diff(a, b)
return diff
end

-- Sums up select requests over all storages.
--
-- Takes the value returned by collect (or diff) and returns the total
-- number of select requests. An entry without a `select_requests`
-- counter contributes nothing to the sum.
function storage_stat.total(stats)
    local requests_sum = 0

    for _, storage_stats in pairs(stats) do
        local requests = storage_stats.select_requests
        if requests then
            requests_sum = requests_sum + requests
        end
    end

    return requests_sum
end

return storage_stat
Loading

0 comments on commit dc1cdc4

Please sign in to comment.