From 6c67d9bf68181a632c31f94b2e9ab9bb5faf1818 Mon Sep 17 00:00:00 2001 From: Georgy Moiseev Date: Mon, 22 Nov 2021 16:46:00 +0300 Subject: [PATCH] Extract sharding key from conditions PR #181 introduced support of DDL sharding keys. But if sharding key hasn't got a separate index in schema, select with equal conditions for all required sharding key fields still led to map-reduce instead of a single storage call. This patch introduces improved support of sharding keys extraction and fixes the issue. Closes #213 --- crud/select/plan.lua | 69 ++++++++++++++-- test/entrypoint/srv_ddl.lua | 15 ++++ test/helpers/storage_stat.lua | 12 +++ test/integration/ddl_sharding_key_test.lua | 95 +++++++++++++++++----- 4 files changed, 164 insertions(+), 27 deletions(-) diff --git a/crud/select/plan.lua b/crud/select/plan.lua index 260ed2078..194e14f46 100644 --- a/crud/select/plan.lua +++ b/crud/select/plan.lua @@ -1,6 +1,7 @@ local errors = require('errors') local compare_conditions = require('crud.compare.conditions') +local sharding_key_module = require('crud.common.sharding_key') local utils = require('crud.common.utils') local dev_checks = require('crud.common.dev_checks') @@ -48,6 +49,46 @@ local function get_index_for_condition(space_indexes, space_format, condition) end end +local function extract_sharding_key_from_conditions(conditions, ddl_sharding_key, space_indexes, space_field_positions) + if ddl_sharding_key == nil then + return nil + end + + -- If name is both valid index name and field name, + -- it is interpreted as index name. 
+ local filled_fields = {} + for _, condition in ipairs(conditions) do + if condition.operator ~= compare_conditions.operators.EQ then + goto continue + end + + local index = space_indexes[condition.operand] + if index ~= nil then + for i, part in ipairs(index.parts) do + filled_fields[part.fieldno] = condition.values[i] + end + + goto continue + end + + local fieldno = space_field_positions[condition.operand] + filled_fields[fieldno] = condition.values[1] + + ::continue:: + end + + local sharding_key = {} + for i, v in ipairs(ddl_sharding_key.parts) do + if filled_fields[v.fieldno] == nil then + return nil + end + + sharding_key[i] = filled_fields[v.fieldno] + end + + return sharding_key +end + local function extract_sharding_key_from_scan_value(scan_value, scan_index, sharding_index) if #scan_value < #sharding_index.parts then return nil @@ -90,7 +131,7 @@ end -- and these fields are ordered by field_names + primary key + scan key -- this order can be differ from order in space format -- so we need to cast after_tuple to space format for scrolling tuples on storage -local function construct_after_tuple_by_fields(space_format, field_names, tuple) +local function construct_after_tuple_by_fields(space_field_positions, field_names, tuple) if tuple == nil then return nil end @@ -99,15 +140,10 @@ local function construct_after_tuple_by_fields(space_format, field_names, tuple) return tuple end - local positions = {} local transformed_tuple = {} - for i, field in ipairs(space_format) do - positions[field.name] = i - end - for i, field_name in ipairs(field_names) do - local fieldno = positions[field_name] + local fieldno = space_field_positions[field_name] if fieldno == nil then return nil, FilterFieldsError:new( 'Space format doesn\'t contain field named %q', field_name @@ -145,6 +181,12 @@ function select_plan.new(space, conditions, opts) local scan_value local scan_condition_num + local space_field_positions = {} + + for i, field in ipairs(space_format) do + 
space_field_positions[field.name] = i + end + -- search index to iterate over for i, condition in ipairs(conditions) do scan_index = get_index_for_condition(space_indexes, space_format, condition) @@ -177,7 +219,7 @@ function select_plan.new(space, conditions, opts) -- handle opts.first local total_tuples_count local scan_after_tuple, err = construct_after_tuple_by_fields( - space_format, field_names, opts.after_tuple + space_field_positions, field_names, opts.after_tuple ) if err ~= nil then return nil, err @@ -235,6 +277,17 @@ function select_plan.new(space, conditions, opts) sharding_key = extract_sharding_key_from_scan_value(scan_value, scan_index, sharding_index) end + if sharding_key == nil then + -- Ignore possible errors to preserve old behavior + -- since here it affects only extracting sharding_key from + -- conditions and it should not be critical to select call. + local ddl_sharding_key = sharding_key_module.fetch_on_router(space_name) + + sharding_key = extract_sharding_key_from_conditions( + conditions, ddl_sharding_key, space_indexes, space_field_positions + ) + end + local plan = { conditions = conditions, space_name = space_name, diff --git a/test/entrypoint/srv_ddl.lua b/test/entrypoint/srv_ddl.lua index 61c35da08..3f81a4f4b 100755 --- a/test/entrypoint/srv_ddl.lua +++ b/test/entrypoint/srv_ddl.lua @@ -61,6 +61,14 @@ package.preload['customers-storage'] = function() {path = 'name', is_nullable = false, type = 'string'}, }, } + local age_index = { + name = 'age', + type = 'TREE', + unique = false, + parts = { + {path = 'age', is_nullable = false, type = 'number'}, + }, + } local secondary_index = { name = 'secondary', type = 'TREE', @@ -100,6 +108,12 @@ package.preload['customers-storage'] = function() table.insert(customers_age_key_schema.indexes, primary_index) table.insert(customers_age_key_schema.indexes, bucket_id_index) + local customers_name_age_key_different_indexes_schema = table.deepcopy(customers_schema) + 
customers_name_age_key_different_indexes_schema.sharding_key = {'name', 'age'} + table.insert(customers_name_age_key_different_indexes_schema.indexes, primary_index) + table.insert(customers_name_age_key_different_indexes_schema.indexes, bucket_id_index) + table.insert(customers_name_age_key_different_indexes_schema.indexes, age_index) + local schema = { spaces = { customers_name_key = customers_name_key_schema, @@ -107,6 +121,7 @@ package.preload['customers-storage'] = function() customers_name_key_non_uniq_index = customers_name_key_non_uniq_index_schema, customers_secondary_idx_name_key = customers_secondary_idx_name_key_schema, customers_age_key = customers_age_key_schema, + customers_name_age_key_different_indexes = customers_name_age_key_different_indexes_schema, } } diff --git a/test/helpers/storage_stat.lua b/test/helpers/storage_stat.lua index 63cdd1681..06feb42c5 100644 --- a/test/helpers/storage_stat.lua +++ b/test/helpers/storage_stat.lua @@ -95,4 +95,16 @@ function storage_stat.diff(a, b) return diff end +-- Accepts collect (or diff) return value and returns +-- total number of select requests across all storages. 
+function storage_stat.total(stats) + local total = 0 + + for _, stat in pairs(stats) do + total = total + (stat.select_requests or 0) + end + + return total +end + return storage_stat diff --git a/test/integration/ddl_sharding_key_test.lua b/test/integration/ddl_sharding_key_test.lua index 3bdaf67b6..c0f1a9969 100644 --- a/test/integration/ddl_sharding_key_test.lua +++ b/test/integration/ddl_sharding_key_test.lua @@ -51,6 +51,7 @@ pgroup.before_each(function(g) helpers.truncate_space_on_cluster(g.cluster, 'customers_name_key_non_uniq_index') helpers.truncate_space_on_cluster(g.cluster, 'customers_secondary_idx_name_key') helpers.truncate_space_on_cluster(g.cluster, 'customers_age_key') + helpers.truncate_space_on_cluster(g.cluster, 'customers_name_age_key_different_indexes') end) pgroup.test_insert_object = function(g) @@ -279,13 +280,7 @@ pgroup.test_select = function(g) t.assert_equals(result.rows[1], tuple) end --- TODO: After enabling support of sharding keys that are not equal to primary --- keys, we should handle it differently: it is not enough to look just on scan --- value, we should traverse all conditions. Now missed cases lead to --- map-reduce. Will be resolved in #213. 
-pgroup.test_select_wont_lead_map_reduce = function(g) - local space_name = 'customers_name_key_uniq_index' - +local prepare_data_name_sharding_key = function(g, space_name) local conn_s1 = g.cluster:server('s1-master').net_box local conn_s2 = g.cluster:server('s2-master').net_box @@ -301,12 +296,80 @@ pgroup.test_select_wont_lead_map_reduce = function(g) -- bucket_id is 1161, storage is s-2 local result = conn_s2.space[space_name]:insert({4, 1161, 'James Joyce', 59}) t.assert_not_equals(result, nil) +end + +local prepare_data_name_age_sharding_key = function(g, space_name) + local conn_s1 = g.cluster:server('s1-master').net_box + local conn_s2 = g.cluster:server('s2-master').net_box + + -- bucket_id is 2310, storage is s-1 + local result = conn_s1.space[space_name]:insert({1, 2310, 'Viktor Pelevin', 58}) + t.assert_not_equals(result, nil) + -- bucket_id is 63, storage is s-2 + local result = conn_s2.space[space_name]:insert({2, 63, 'Isaac Asimov', 72}) + t.assert_not_equals(result, nil) + -- bucket_id is 2901, storage is s-1 + local result = conn_s1.space[space_name]:insert({3, 2901, 'Aleksandr Solzhenitsyn', 89}) + t.assert_not_equals(result, nil) + -- bucket_id is 1365, storage is s-2 + local result = conn_s2.space[space_name]:insert({4, 1365, 'James Joyce', 59}) + t.assert_not_equals(result, nil) +end + +local cases = { + select_for_indexed_sharding_key = { + space_name = 'customers_name_key_uniq_index', + prepare_data = prepare_data_name_sharding_key, + conditions = {{'==', 'name', 'Viktor Pelevin'}}, + }, + select_for_sharding_key_as_index_part = { + space_name = 'customers_name_key', + prepare_data = prepare_data_name_sharding_key, + conditions = {{'==', 'name', 'Viktor Pelevin'}}, + }, + select_for_sharding_key_as_several_indexes_parts = { + space_name = 'customers_name_age_key_different_indexes', + prepare_data = prepare_data_name_age_sharding_key, + conditions = {{'==', 'name', 'Viktor Pelevin'}, {'==', 'age', 58}}, + }, + 
select_by_index_cond_for_sharding_key_as_several_indexes_parts = { + space_name = 'customers_name_age_key_different_indexes', + prepare_data = prepare_data_name_age_sharding_key, + conditions = {{'==', 'id', {1, 'Viktor Pelevin'}}, {'==', 'age', 58}}, + } +} + +for name, case in pairs(cases) do + pgroup[('test_%s_wont_lead_to_map_reduce'):format(name)] = function(g) + case.prepare_data(g, case.space_name) + + local stat_a = storage_stat.collect(g.cluster) + + local result, err = g.cluster.main_server.net_box:call('crud.select', { + case.space_name, case.conditions + }) + t.assert_equals(err, nil) + t.assert_not_equals(result, nil) + t.assert_equals(#result.rows, 1) + + local stat_b = storage_stat.collect(g.cluster) + + -- Check a number of select() requests made by CRUD on cluster's storages + -- after calling select() on a router. Make sure only a single storage has + -- a single select() request. Otherwise we lead to map-reduce. + local stats = storage_stat.diff(stat_b, stat_a) + t.assert_equals(storage_stat.total(stats), 1, 'Select request was not a map reduce') + end +end + +pgroup.test_select_for_part_of_sharding_key_will_lead_to_map_reduce = function(g) + local space_name = 'customers_name_age_key_different_indexes' + prepare_data_name_age_sharding_key(g, space_name) local stat_a = storage_stat.collect(g.cluster) - -- Select a tuple with name 'Viktor Pelevin'. local result, err = g.cluster.main_server.net_box:call('crud.select', { - space_name, {{'==', 'name', 'Viktor Pelevin'}} + space_name, {{'==', 'age', 58}}, }) t.assert_equals(err, nil) t.assert_not_equals(result, nil) @@ -315,16 +378,10 @@ pgroup.test_select_wont_lead_map_reduce = function(g) local stat_b = storage_stat.collect(g.cluster) -- Check a number of select() requests made by CRUD on cluster's storages - -- after calling select() on a router. Make sure only a single storage has - -- a single select() request. Otherwise we lead map-reduce. 
- t.assert_equals(storage_stat.diff(stat_b, stat_a), { - ['s-1'] = { - select_requests = 0, - }, - ['s-2'] = { - select_requests = 1, - }, - }) + -- after calling select() on a router. Make sure it was a map-reduce + -- since we do not have sharding key values in conditions. + local stats = storage_stat.diff(stat_b, stat_a) + t.assert_equals(storage_stat.total(stats), 2, 'Select request was a map reduce') end pgroup.test_select_secondary_idx = function(g)