From 3ca66dd5ec22f965e03b778ec22134f947f82029 Mon Sep 17 00:00:00 2001 From: Georgy Moiseev Date: Mon, 22 Nov 2021 16:46:00 +0300 Subject: [PATCH] Extract sharding key from conditions PR #181 introduced support of DDL sharding keys. But if sharding key hasn't got a separate index in schema, select with equal conditions for all required sharding key fields still led to map-reduce instead of a single storage call. This patch introduces impoved support of sharding keys extraction and fixes the issue. Closes #213 --- CHANGELOG.md | 2 + README.md | 2 - crud/select/plan.lua | 88 +++++++++++------- test/entrypoint/srv_ddl.lua | 33 +++++++ test/helpers/storage_stat.lua | 12 +++ test/integration/ddl_sharding_key_test.lua | 101 +++++++++++++++++---- test/unit/select_plan_test.lua | 2 +- 7 files changed, 185 insertions(+), 55 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index eb9cad0e7..e332d0163 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -20,6 +20,8 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. * Use tuple-merger backed select implementation on tarantool 2.10+ (it gives less pressure on Lua GC). +* DDL sharding key now can be extracted from select conditions even if + there are no separate index. ## [0.9.0] - 20-10-21 diff --git a/README.md b/README.md index 1e5e5bf67..c62a5412a 100644 --- a/README.md +++ b/README.md @@ -101,8 +101,6 @@ Current limitations for using custom sharding key: updated on storages, see [#212](https://github.com/tarantool/crud/issues/212). However it is possible to do it manually with `require('crud.sharding_key').update_cache()`. -- CRUD select may lead map reduce in some cases, see - [#213](https://github.com/tarantool/crud/issues/213). - No support of JSON path for sharding key, see [#219](https://github.com/tarantool/crud/issues/219). - `primary_index_fieldno_map` is not cached, see diff --git a/crud/select/plan.lua b/crud/select/plan.lua index 260ed2078..f807301a1 100644 --- a/crud/select/plan.lua +++ b/crud/select/plan.lua @@ -48,49 +48,75 @@ local function get_index_for_condition(space_indexes, space_format, condition) end end -local function extract_sharding_key_from_scan_value(scan_value, scan_index, sharding_index) - if #scan_value < #sharding_index.parts then +local function extract_sharding_key_from_conditions(conditions, sharding_index, space_indexes, fieldno_map) + if sharding_index == nil then return nil end - if scan_index.id == sharding_index.id then - return scan_value - end + -- If name is both valid index name and field name, + -- it is interpreted as index name. + local filled_fields = {} + for _, condition in ipairs(conditions) do + if condition.operator ~= compare_conditions.operators.EQ then + goto continue + end - local scan_value_fields_values = {} - for i, scan_index_part in ipairs(scan_index.parts) do - scan_value_fields_values[scan_index_part.fieldno] = scan_value[i] - end + local index = space_indexes[condition.operand] + if index ~= nil then + for i, part in ipairs(index.parts) do + filled_fields[part.fieldno] = condition.values[i] + end - -- check that sharding key is included in the scan index fields - local sharding_key = {} - for _, sharding_key_part in ipairs(sharding_index.parts) do - local fieldno = sharding_key_part.fieldno + goto continue + end - -- sharding key isn't included in scan key - if scan_value_fields_values[fieldno] == nil then - return nil + local fieldno = fieldno_map[condition.operand] + if fieldno == nil then + goto continue end + filled_fields[fieldno] = condition.values[1] - local field_value = scan_value_fields_values[fieldno] + ::continue:: + end - -- sharding key contains nil values - if field_value == nil then + local sharding_key = {} + for i, v in ipairs(sharding_index.parts) do + if filled_fields[v.fieldno] == nil then return nil end - table.insert(sharding_key, field_value) + sharding_key[i] = filled_fields[v.fieldno] end return sharding_key end +local function get_sharding_key_from_scan_value(scan_value, scan_index, scan_iter, sharding_index) + if scan_value == nil then + return nil + end + + if scan_iter ~= box.index.EQ and scan_iter ~= box.index.REQ then + return nil + end + + if #scan_value < #sharding_index.parts then + return nil + end + + if scan_index.id == sharding_index.id then + return scan_value + end + + return nil +end + -- We need to construct after_tuple by field_names -- because if `fields` option is specified we have after_tuple with partial fields -- and these fields are ordered by field_names + primary key + scan key -- this order can be differ from order in space format -- so we need to cast after_tuple to space format for scrolling tuples on storage -local function construct_after_tuple_by_fields(space_format, field_names, tuple) +local function construct_after_tuple_by_fields(fieldno_map, field_names, tuple) if tuple == nil then return nil end @@ -99,15 +125,10 @@ local function construct_after_tuple_by_fields(space_format, field_names, tuple) return tuple end - local positions = {} local transformed_tuple = {} - for i, field in ipairs(space_format) do - positions[field.name] = i - end - for i, field_name in ipairs(field_names) do - local fieldno = positions[field_name] + local fieldno = fieldno_map[field_name] if fieldno == nil then return nil, FilterFieldsError:new( 'Space format doesn\'t contain field named %q', field_name @@ -145,6 +166,8 @@ function select_plan.new(space, conditions, opts) local scan_value local scan_condition_num + local fieldno_map = utils.get_format_fieldno_map(space_format) + -- search index to iterate over for i, condition in ipairs(conditions) do scan_index = get_index_for_condition(space_indexes, space_format, condition) @@ -176,9 +199,7 @@ function select_plan.new(space, conditions, opts) -- handle opts.first local total_tuples_count - local scan_after_tuple, err = construct_after_tuple_by_fields( - space_format, field_names, opts.after_tuple - ) + local scan_after_tuple, err = construct_after_tuple_by_fields(fieldno_map, field_names, opts.after_tuple) if err ~= nil then return nil, err end @@ -230,9 +251,10 @@ function select_plan.new(space, conditions, opts) local sharding_index = opts.sharding_key_as_index_obj or primary_index -- get sharding key value - local sharding_key - if scan_value ~= nil and (scan_iter == box.index.EQ or scan_iter == box.index.REQ) then - sharding_key = extract_sharding_key_from_scan_value(scan_value, scan_index, sharding_index) + local sharding_key = get_sharding_key_from_scan_value(scan_value, scan_index, scan_iter, sharding_index) + + if sharding_key == nil then + sharding_key = extract_sharding_key_from_conditions(conditions, sharding_index, space_indexes, fieldno_map) end local plan = { diff --git a/test/entrypoint/srv_ddl.lua b/test/entrypoint/srv_ddl.lua index 61c35da08..f240c743e 100755 --- a/test/entrypoint/srv_ddl.lua +++ b/test/entrypoint/srv_ddl.lua @@ -61,6 +61,14 @@ package.preload['customers-storage'] = function() {path = 'name', is_nullable = false, type = 'string'}, }, } + local age_index = { + name = 'age', + type = 'TREE', + unique = false, + parts = { + {path = 'age', is_nullable = false, type = 'number'}, + }, + } local secondary_index = { name = 'secondary', type = 'TREE', @@ -71,6 +79,17 @@ package.preload['customers-storage'] = function() }, } + local three_fields_index = { + name = 'three_fields', + type = 'TREE', + unique = false, + parts = { + {path = 'age', is_nullable = false, type = 'number'}, + {path = 'name', is_nullable = false, type = 'string'}, + {path = 'id', is_nullable = false, type = 'unsigned'}, + }, + } + local customers_name_key_schema = table.deepcopy(customers_schema) customers_name_key_schema.sharding_key = {'name'} table.insert(customers_name_key_schema.indexes, primary_index) @@ -100,6 +119,18 @@ package.preload['customers-storage'] = function() table.insert(customers_age_key_schema.indexes, primary_index) table.insert(customers_age_key_schema.indexes, bucket_id_index) + local customers_name_age_key_different_indexes_schema = table.deepcopy(customers_schema) + customers_name_age_key_different_indexes_schema.sharding_key = {'name', 'age'} + table.insert(customers_name_age_key_different_indexes_schema.indexes, primary_index) + table.insert(customers_name_age_key_different_indexes_schema.indexes, bucket_id_index) + table.insert(customers_name_age_key_different_indexes_schema.indexes, age_index) + + local customers_name_age_key_three_fields_index_schema = table.deepcopy(customers_schema) + customers_name_age_key_three_fields_index_schema.sharding_key = {'name', 'age'} + table.insert(customers_name_age_key_three_fields_index_schema.indexes, primary_index_id) + table.insert(customers_name_age_key_three_fields_index_schema.indexes, bucket_id_index) + table.insert(customers_name_age_key_three_fields_index_schema.indexes, three_fields_index) + local schema = { spaces = { customers_name_key = customers_name_key_schema, @@ -107,6 +138,8 @@ package.preload['customers-storage'] = function() customers_name_key_non_uniq_index = customers_name_key_non_uniq_index_schema, customers_secondary_idx_name_key = customers_secondary_idx_name_key_schema, customers_age_key = customers_age_key_schema, + customers_name_age_key_different_indexes = customers_name_age_key_different_indexes_schema, + customers_name_age_key_three_fields_index = customers_name_age_key_three_fields_index_schema, } } diff --git a/test/helpers/storage_stat.lua b/test/helpers/storage_stat.lua index 63cdd1681..06feb42c5 100644 --- a/test/helpers/storage_stat.lua +++ b/test/helpers/storage_stat.lua @@ -95,4 +95,16 @@ function storage_stat.diff(a, b) return diff end +-- Accepts collect (or diff) return value and returns +-- total number of select requests across all storages. +function storage_stat.total(stats) + local total = 0 + + for _, stat in pairs(stats) do + total = total + (stat.select_requests or 0) + end + + return total +end + return storage_stat diff --git a/test/integration/ddl_sharding_key_test.lua b/test/integration/ddl_sharding_key_test.lua index 3bdaf67b6..1fa3bdfe5 100644 --- a/test/integration/ddl_sharding_key_test.lua +++ b/test/integration/ddl_sharding_key_test.lua @@ -51,6 +51,8 @@ pgroup.before_each(function(g) helpers.truncate_space_on_cluster(g.cluster, 'customers_name_key_non_uniq_index') helpers.truncate_space_on_cluster(g.cluster, 'customers_secondary_idx_name_key') helpers.truncate_space_on_cluster(g.cluster, 'customers_age_key') + helpers.truncate_space_on_cluster(g.cluster, 'customers_name_age_key_different_indexes') + helpers.truncate_space_on_cluster(g.cluster, 'customers_name_age_key_three_fields_index') end) pgroup.test_insert_object = function(g) @@ -279,13 +281,7 @@ pgroup.test_select = function(g) t.assert_equals(result.rows[1], tuple) end --- TODO: After enabling support of sharding keys that are not equal to primary --- keys, we should handle it differently: it is not enough to look just on scan --- value, we should traverse all conditions. Now missed cases lead to --- map-reduce. Will be resolved in #213. -pgroup.test_select_wont_lead_map_reduce = function(g) - local space_name = 'customers_name_key_uniq_index' - +local prepare_data_name_sharding_key = function(g, space_name) local conn_s1 = g.cluster:server('s1-master').net_box local conn_s2 = g.cluster:server('s2-master').net_box @@ -301,12 +297,85 @@ pgroup.test_select_wont_lead_map_reduce = function(g) -- bucket_id is 1161, storage is s-2 local result = conn_s2.space[space_name]:insert({4, 1161, 'James Joyce', 59}) t.assert_not_equals(result, nil) +end + +local prepare_data_name_age_sharding_key = function(g, space_name) + local conn_s1 = g.cluster:server('s1-master').net_box + local conn_s2 = g.cluster:server('s2-master').net_box + + -- bucket_id is 2310, storage is s-1 + local result = conn_s1.space[space_name]:insert({1, 2310, 'Viktor Pelevin', 58}) + t.assert_not_equals(result, nil) + -- bucket_id is 63, storage is s-2 + local result = conn_s2.space[space_name]:insert({2, 63, 'Isaac Asimov', 72}) + t.assert_not_equals(result, nil) + -- bucket_id is 2901, storage is s-1 + local result = conn_s1.space[space_name]:insert({3, 2901, 'Aleksandr Solzhenitsyn', 89}) + t.assert_not_equals(result, nil) + -- bucket_id is 1365, storage is s-2 + local result = conn_s2.space[space_name]:insert({4, 1365, 'James Joyce', 59}) + t.assert_not_equals(result, nil) +end + +local cases = { + select_for_indexed_sharding_key = { + space_name = 'customers_name_key_uniq_index', + prepare_data = prepare_data_name_sharding_key, + conditions = {{'==', 'name', 'Viktor Pelevin'}}, + }, + select_for_sharding_key_as_index_part = { + space_name = 'customers_name_key', + prepare_data = prepare_data_name_sharding_key, + conditions = {{'==', 'name', 'Viktor Pelevin'}}, + }, + select_for_sharding_key_as_several_indexes_parts = { + space_name = 'customers_name_age_key_different_indexes', + prepare_data = prepare_data_name_age_sharding_key, + conditions = {{'==', 'name', 'Viktor Pelevin'}, {'==', 'age', 58}}, + }, + select_by_index_cond_for_sharding_key_as_several_indexes_parts = { + space_name = 'customers_name_age_key_different_indexes', + prepare_data = prepare_data_name_age_sharding_key, + conditions = {{'==', 'id', {1, 'Viktor Pelevin'}}, {'==', 'age', 58}}, + }, + select_by_partial_index_cond_for_sharding_key_included = { + space_name = 'customers_name_age_key_three_fields_index', + prepare_data = prepare_data_name_age_sharding_key, + conditions = {{'==', 'three_fields', {58, 'Viktor Pelevin', nil}}}, + }, +} + +for name, case in pairs(cases) do + pgroup[('test_%s_wont_lead_to_map_reduce'):format(name)] = function(g) + case.prepare_data(g, case.space_name) + + local stat_a = storage_stat.collect(g.cluster) + + local result, err = g.cluster.main_server.net_box:call('crud.select', { + case.space_name, case.conditions + }) + t.assert_equals(err, nil) + t.assert_not_equals(result, nil) + t.assert_equals(#result.rows, 1) + + local stat_b = storage_stat.collect(g.cluster) + + -- Check a number of select() requests made by CRUD on cluster's storages + -- after calling select() on a router. Make sure only a single storage has + -- a single select() request. Otherwise we lead to map-reduce. + local stats = storage_stat.diff(stat_b, stat_a) + t.assert_equals(storage_stat.total(stats), 1, 'Select request was not a map reduce') + end +end + +pgroup.test_select_for_part_of_sharding_key_will_lead_to_map_reduce = function(g) + local space_name = 'customers_name_age_key_different_indexes' + prepare_data_name_age_sharding_key(g, space_name) local stat_a = storage_stat.collect(g.cluster) - -- Select a tuple with name 'Viktor Pelevin'. local result, err = g.cluster.main_server.net_box:call('crud.select', { - space_name, {{'==', 'name', 'Viktor Pelevin'}} + space_name, {{'==', 'age', 58}}, }) t.assert_equals(err, nil) t.assert_not_equals(result, nil) @@ -315,16 +384,10 @@ pgroup.test_select_wont_lead_map_reduce = function(g) local stat_b = storage_stat.collect(g.cluster) -- Check a number of select() requests made by CRUD on cluster's storages - -- after calling select() on a router. Make sure only a single storage has - -- a single select() request. Otherwise we lead map-reduce. - t.assert_equals(storage_stat.diff(stat_b, stat_a), { - ['s-1'] = { - select_requests = 0, - }, - ['s-2'] = { - select_requests = 1, - }, - }) + -- after calling select() on a router. Make sure it was a map-reduce + -- since we do not have sharding key values in conditions. + local stats = storage_stat.diff(stat_b, stat_a) + t.assert_equals(storage_stat.total(stats), 2, 'Select request was a map reduce') end pgroup.test_select_secondary_idx = function(g) diff --git a/test/unit/select_plan_test.lua b/test/unit/select_plan_test.lua index cc0dee3ff..3227b67f4 100644 --- a/test/unit/select_plan_test.lua +++ b/test/unit/select_plan_test.lua @@ -185,7 +185,7 @@ g.test_is_scan_by_full_sharding_key_eq = function() t.assert_equals(err, nil) t.assert_equals(plan.total_tuples_count, nil) - t.assert_equals(plan.sharding_key, nil) + t.assert_equals(plan.sharding_key, {15}) -- gt local plan, err = select_plan.new(box.space.customers, {