Skip to content

Commit

Permalink
Extract sharding key from conditions
Browse files Browse the repository at this point in the history
PR #181 introduced support of DDL sharding keys. But if sharding key
hasn't got a separate index in schema, select with equal conditions
for all required sharding key fields still led to map-reduce instead of
a single storage call. This patch introduces impoved support of
sharding keys extraction and fixes the issue.

Closes #213
  • Loading branch information
DifferentialOrange committed Nov 24, 2021
1 parent 3ac211c commit 408d1cf
Show file tree
Hide file tree
Showing 5 changed files with 166 additions and 27 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.

* Use tuple-merger backed select implementation on tarantool 2.10+ (it gives
less pressure on Lua GC).
* DDL sharding key now can be extracted from select conditions even if
there are no separate index.

## [0.9.0] - 20-10-21

Expand Down
69 changes: 61 additions & 8 deletions crud/select/plan.lua
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
local errors = require('errors')

local compare_conditions = require('crud.compare.conditions')
local sharding_key_module = require('crud.common.sharding_key')
local utils = require('crud.common.utils')
local dev_checks = require('crud.common.dev_checks')

Expand Down Expand Up @@ -48,6 +49,46 @@ local function get_index_for_condition(space_indexes, space_format, condition)
end
end

local function extract_sharding_key_from_conditions(conditions, ddl_sharding_key, space_indexes, space_field_positions)
if ddl_sharding_key == nil then
return nil
end

-- If name is both valid index name and field name,
-- it is interpreted as index name.
local filled_fields = {}
for _, condition in ipairs(conditions) do
if condition.operator ~= compare_conditions.operators.EQ then
goto continue
end

local index = space_indexes[condition.operand]
if index ~= nil then
for i, part in ipairs(index.parts) do
filled_fields[part.fieldno] = condition.values[i]
end

goto continue
end

local fieldno = space_field_positions[condition.operand]
filled_fields[fieldno] = condition.values[1]

::continue::
end

local sharding_key = {}
for i, v in ipairs(ddl_sharding_key.parts) do
if filled_fields[v.fieldno] == nil then
return nil
end

sharding_key[i] = filled_fields[v.fieldno]
end

return sharding_key
end

local function extract_sharding_key_from_scan_value(scan_value, scan_index, sharding_index)
if #scan_value < #sharding_index.parts then
return nil
Expand Down Expand Up @@ -90,7 +131,7 @@ end
-- and these fields are ordered by field_names + primary key + scan key
-- this order can be differ from order in space format
-- so we need to cast after_tuple to space format for scrolling tuples on storage
local function construct_after_tuple_by_fields(space_format, field_names, tuple)
local function construct_after_tuple_by_fields(space_field_positions, field_names, tuple)
if tuple == nil then
return nil
end
Expand All @@ -99,15 +140,10 @@ local function construct_after_tuple_by_fields(space_format, field_names, tuple)
return tuple
end

local positions = {}
local transformed_tuple = {}

for i, field in ipairs(space_format) do
positions[field.name] = i
end

for i, field_name in ipairs(field_names) do
local fieldno = positions[field_name]
local fieldno = space_field_positions[field_name]
if fieldno == nil then
return nil, FilterFieldsError:new(
'Space format doesn\'t contain field named %q', field_name
Expand Down Expand Up @@ -145,6 +181,12 @@ function select_plan.new(space, conditions, opts)
local scan_value
local scan_condition_num

local space_field_positions = {}

for i, field in ipairs(space_format) do
space_field_positions[field.name] = i
end

-- search index to iterate over
for i, condition in ipairs(conditions) do
scan_index = get_index_for_condition(space_indexes, space_format, condition)
Expand Down Expand Up @@ -177,7 +219,7 @@ function select_plan.new(space, conditions, opts)
-- handle opts.first
local total_tuples_count
local scan_after_tuple, err = construct_after_tuple_by_fields(
space_format, field_names, opts.after_tuple
space_field_positions, field_names, opts.after_tuple
)
if err ~= nil then
return nil, err
Expand Down Expand Up @@ -235,6 +277,17 @@ function select_plan.new(space, conditions, opts)
sharding_key = extract_sharding_key_from_scan_value(scan_value, scan_index, sharding_index)
end

if sharding_key == nil then
-- Ignore possible errors to preserve old behavior
-- since here it affects only extracting sharding_key from
-- conditions and it should not be critical to select call.
local ddl_sharding_key = sharding_key_module.fetch_on_router(space_name)

sharding_key = extract_sharding_key_from_conditions(
conditions, ddl_sharding_key, space_indexes, space_field_positions
)
end

local plan = {
conditions = conditions,
space_name = space_name,
Expand Down
15 changes: 15 additions & 0 deletions test/entrypoint/srv_ddl.lua
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,14 @@ package.preload['customers-storage'] = function()
{path = 'name', is_nullable = false, type = 'string'},
},
}
local age_index = {
name = 'age',
type = 'TREE',
unique = false,
parts = {
{path = 'age', is_nullable = false, type = 'number'},
},
}
local secondary_index = {
name = 'secondary',
type = 'TREE',
Expand Down Expand Up @@ -100,13 +108,20 @@ package.preload['customers-storage'] = function()
table.insert(customers_age_key_schema.indexes, primary_index)
table.insert(customers_age_key_schema.indexes, bucket_id_index)

local customers_name_age_key_different_indexes_schema = table.deepcopy(customers_schema)
customers_name_age_key_different_indexes_schema.sharding_key = {'name', 'age'}
table.insert(customers_name_age_key_different_indexes_schema.indexes, primary_index)
table.insert(customers_name_age_key_different_indexes_schema.indexes, bucket_id_index)
table.insert(customers_name_age_key_different_indexes_schema.indexes, age_index)

local schema = {
spaces = {
customers_name_key = customers_name_key_schema,
customers_name_key_uniq_index = customers_name_key_uniq_index_schema,
customers_name_key_non_uniq_index = customers_name_key_non_uniq_index_schema,
customers_secondary_idx_name_key = customers_secondary_idx_name_key_schema,
customers_age_key = customers_age_key_schema,
customers_name_age_key_different_indexes = customers_name_age_key_different_indexes_schema,
}
}

Expand Down
12 changes: 12 additions & 0 deletions test/helpers/storage_stat.lua
Original file line number Diff line number Diff line change
Expand Up @@ -95,4 +95,16 @@ function storage_stat.diff(a, b)
return diff
end

-- Accepts collect (or diff) return value and returns
-- total number of select requests across all storages.
function storage_stat.total(stats)
local total = 0

for _, stat in pairs(stats) do
total = total + (stat.select_requests or 0)
end

return total
end

return storage_stat
95 changes: 76 additions & 19 deletions test/integration/ddl_sharding_key_test.lua
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ pgroup.before_each(function(g)
helpers.truncate_space_on_cluster(g.cluster, 'customers_name_key_non_uniq_index')
helpers.truncate_space_on_cluster(g.cluster, 'customers_secondary_idx_name_key')
helpers.truncate_space_on_cluster(g.cluster, 'customers_age_key')
helpers.truncate_space_on_cluster(g.cluster, 'customers_name_age_key_different_indexes')
end)

pgroup.test_insert_object = function(g)
Expand Down Expand Up @@ -279,13 +280,7 @@ pgroup.test_select = function(g)
t.assert_equals(result.rows[1], tuple)
end

-- TODO: After enabling support of sharding keys that are not equal to primary
-- keys, we should handle it differently: it is not enough to look just on scan
-- value, we should traverse all conditions. Now missed cases lead to
-- map-reduce. Will be resolved in #213.
pgroup.test_select_wont_lead_map_reduce = function(g)
local space_name = 'customers_name_key_uniq_index'

local prepare_data_name_sharding_key = function(g, space_name)
local conn_s1 = g.cluster:server('s1-master').net_box
local conn_s2 = g.cluster:server('s2-master').net_box

Expand All @@ -301,12 +296,80 @@ pgroup.test_select_wont_lead_map_reduce = function(g)
-- bucket_id is 1161, storage is s-2
local result = conn_s2.space[space_name]:insert({4, 1161, 'James Joyce', 59})
t.assert_not_equals(result, nil)
end

local prepare_data_name_age_sharding_key = function(g, space_name)
local conn_s1 = g.cluster:server('s1-master').net_box
local conn_s2 = g.cluster:server('s2-master').net_box

-- bucket_id is 2310, storage is s-1
local result = conn_s1.space[space_name]:insert({1, 2310, 'Viktor Pelevin', 58})
t.assert_not_equals(result, nil)
-- bucket_id is 63, storage is s-2
local result = conn_s2.space[space_name]:insert({2, 63, 'Isaac Asimov', 72})
t.assert_not_equals(result, nil)
-- bucket_id is 2901, storage is s-1
local result = conn_s1.space[space_name]:insert({3, 2901, 'Aleksandr Solzhenitsyn', 89})
t.assert_not_equals(result, nil)
-- bucket_id is 1365, storage is s-2
local result = conn_s2.space[space_name]:insert({4, 1365, 'James Joyce', 59})
t.assert_not_equals(result, nil)
end

local cases = {
select_for_indexed_sharding_key = {
space_name = 'customers_name_key_uniq_index',
prepare_data = prepare_data_name_sharding_key,
conditions = {{'==', 'name', 'Viktor Pelevin'}},
},
select_for_sharding_key_as_index_part = {
space_name = 'customers_name_key',
prepare_data = prepare_data_name_sharding_key,
conditions = {{'==', 'name', 'Viktor Pelevin'}},
},
select_for_sharding_key_as_several_indexes_parts = {
space_name = 'customers_name_age_key_different_indexes',
prepare_data = prepare_data_name_age_sharding_key,
conditions = {{'==', 'name', 'Viktor Pelevin'}, {'==', 'age', 58}},
},
select_by_index_cond_for_sharding_key_as_several_indexes_parts = {
space_name = 'customers_name_age_key_different_indexes',
prepare_data = prepare_data_name_age_sharding_key,
conditions = {{'==', 'id', {1, 'Viktor Pelevin'}}, {'==', 'age', 58}},
}
}

for name, case in pairs(cases) do
pgroup[('test_%s_wont_lead_to_map_reduce'):format(name)] = function(g)
case.prepare_data(g, case.space_name)

local stat_a = storage_stat.collect(g.cluster)

local result, err = g.cluster.main_server.net_box:call('crud.select', {
case.space_name, case.conditions
})
t.assert_equals(err, nil)
t.assert_not_equals(result, nil)
t.assert_equals(#result.rows, 1)

local stat_b = storage_stat.collect(g.cluster)

-- Check a number of select() requests made by CRUD on cluster's storages
-- after calling select() on a router. Make sure only a single storage has
-- a single select() request. Otherwise we lead to map-reduce.
local stats = storage_stat.diff(stat_b, stat_a)
t.assert_equals(storage_stat.total(stats), 1, 'Select request was not a map reduce')
end
end

pgroup.test_select_for_part_of_sharding_key_will_lead_to_map_reduce = function(g)
local space_name = 'customers_name_age_key_different_indexes'
prepare_data_name_age_sharding_key(g, space_name)

local stat_a = storage_stat.collect(g.cluster)

-- Select a tuple with name 'Viktor Pelevin'.
local result, err = g.cluster.main_server.net_box:call('crud.select', {
space_name, {{'==', 'name', 'Viktor Pelevin'}}
space_name, {{'==', 'age', 58}},
})
t.assert_equals(err, nil)
t.assert_not_equals(result, nil)
Expand All @@ -315,16 +378,10 @@ pgroup.test_select_wont_lead_map_reduce = function(g)
local stat_b = storage_stat.collect(g.cluster)

-- Check a number of select() requests made by CRUD on cluster's storages
-- after calling select() on a router. Make sure only a single storage has
-- a single select() request. Otherwise we lead map-reduce.
t.assert_equals(storage_stat.diff(stat_b, stat_a), {
['s-1'] = {
select_requests = 0,
},
['s-2'] = {
select_requests = 1,
},
})
-- after calling select() on a router. Make sure it was a map-reduce
-- since we do not have sharding key values in conditions.
local stats = storage_stat.diff(stat_b, stat_a)
t.assert_equals(storage_stat.total(stats), 2, 'Select request was a map reduce')
end

pgroup.test_select_secondary_idx = function(g)
Expand Down

0 comments on commit 408d1cf

Please sign in to comment.