From b9b1692b67813924b10c11c36684f53089a04ad5 Mon Sep 17 00:00:00 2001 From: Sergey Bronnikov Date: Fri, 26 Nov 2021 16:36:15 +0300 Subject: [PATCH] Support custom sharding key in delete, update and get Part of #166 Reviewed-by: Alexander Turenko Reviewed-by: Oleg Babin --- crud/common/sharding_key.lua | 75 +++++++ crud/common/sharding_key_cache.lua | 10 + crud/common/utils.lua | 14 ++ crud/delete.lua | 13 +- crud/get.lua | 13 +- crud/update.lua | 13 +- test/entrypoint/srv_ddl.lua | 6 + test/integration/ddl_sharding_key_test.lua | 217 +++++++++++++++++++++ test/unit/sharding_key_test.lua | 129 ++++++++++++ 9 files changed, 487 insertions(+), 3 deletions(-) diff --git a/crud/common/sharding_key.lua b/crud/common/sharding_key.lua index 6bb68c93c..27adf7830 100644 --- a/crud/common/sharding_key.lua +++ b/crud/common/sharding_key.lua @@ -7,6 +7,7 @@ local dev_checks = require('crud.common.dev_checks') local cache = require('crud.common.sharding_key_cache') local utils = require('crud.common.utils') +local ShardingKeyError = errors.new_class("ShardingKeyError", {capture_stack = false}) local FetchShardingKeyError = errors.new_class('FetchShardingKeyError', {capture_stack = false}) local WrongShardingConfigurationError = errors.new_class('WrongShardingConfigurationError', {capture_stack = false}) @@ -151,12 +152,86 @@ function sharding_key_module.fetch_on_router(space_name, timeout) "Fetching sharding key for space '%s' is failed", space_name) end +-- Make sure sharding key definition is a part of primary key. +local function is_part_of_pk(space_name, primary_index_parts, sharding_key_as_index_obj) + dev_checks('string', 'table', 'table') + + if cache.is_part_of_pk[space_name] ~= nil then + return cache.is_part_of_pk[space_name] + end + + local is_part_of_pk = true + local pk_fieldno_map = utils.get_index_fieldno_map(primary_index_parts) + for _, part in ipairs(sharding_key_as_index_obj.parts) do + if pk_fieldno_map[part.fieldno] == nil then + is_part_of_pk = false + break + end + end + cache.is_part_of_pk[space_name] = is_part_of_pk + + return is_part_of_pk +end + +-- Build an array with sharding key values. Function extracts those values from +-- primary key that are part of sharding key (passed as index object). +local function extract_from_index(primary_key, primary_index_parts, sharding_key_as_index_obj) + dev_checks('table', 'table', 'table') + + -- TODO: extract_from_index() calculates primary_index_parts on each + -- request. It is better to cache it's value. + -- https://github.com/tarantool/crud/issues/243 + local primary_index_fieldno_map = utils.get_index_fieldno_map(primary_index_parts) + + local sharding_key = {} + for _, part in ipairs(sharding_key_as_index_obj.parts) do + -- part_number cannot be nil because earlier we checked that tuple + -- field names defined in sharding key definition are part of primary + -- key. + local part_number = primary_index_fieldno_map[part.fieldno] + assert(part_number ~= nil) + local field_value = primary_key[part_number] + table.insert(sharding_key, field_value) + end + + return sharding_key +end + +-- Extract sharding key from pk. +-- Returns a table with sharding key or pair of nil and error. +function sharding_key_module.extract_from_pk(space_name, primary_index_parts, primary_key, timeout) + dev_checks('string', 'table', '?', '?number') + + local sharding_key_as_index_obj, err = sharding_key_module.fetch_on_router(space_name, timeout) + if err ~= nil then + return nil, err + end + if sharding_key_as_index_obj == nil then + return primary_key + end + + local res = is_part_of_pk(space_name, primary_index_parts, sharding_key_as_index_obj) + if res == false then + return nil, ShardingKeyError:new( + "Sharding key for space %q is missed in primary index, specify bucket_id", + space_name + ) + end + if type(primary_key) ~= 'table' then + primary_key = {primary_key} + end + + return extract_from_index(primary_key, primary_index_parts, sharding_key_as_index_obj) +end + function sharding_key_module.init() _G._crud.fetch_on_storage = sharding_key_module.fetch_on_storage end sharding_key_module.internal = { as_index_object = as_index_object, + extract_from_index = extract_from_index, + is_part_of_pk = is_part_of_pk, } return sharding_key_module diff --git a/crud/common/sharding_key_cache.lua b/crud/common/sharding_key_cache.lua index 4be7e7b23..a1ab39652 100644 --- a/crud/common/sharding_key_cache.lua +++ b/crud/common/sharding_key_cache.lua @@ -4,5 +4,15 @@ local sharding_key_cache = {} sharding_key_cache.sharding_key_as_index_obj_map = nil sharding_key_cache.fetch_lock = fiber.channel(1) +sharding_key_cache.is_part_of_pk = {} + +function sharding_key_cache.drop_caches() + sharding_key_cache.sharding_key_as_index_obj_map = nil + if sharding_key_cache.fetch_lock ~= nil then + sharding_key_cache.fetch_lock:close() + end + sharding_key_cache.fetch_lock = fiber.channel(1) + sharding_key_cache.is_part_of_pk = {} +end return sharding_key_cache diff --git a/crud/common/utils.lua b/crud/common/utils.lua index e23d48331..d7a629417 100644 --- a/crud/common/utils.lua +++ b/crud/common/utils.lua @@ -421,6 +421,20 @@ function utils.get_bucket_id_fieldno(space, shard_index_name) return bucket_id_index.parts[1].fieldno end +-- Build a map with field number as a keys and part number +-- as a values using index parts as a source. +function utils.get_index_fieldno_map(index_parts) + dev_checks('table') + + local fieldno_map = {} + for i, part in ipairs(index_parts) do + local fieldno = part.fieldno + fieldno_map[fieldno] = i + end + + return fieldno_map +end + -- Build a map with field names as a keys and fieldno's -- as a values using space format as a source. function utils.get_format_fieldno_map(space_format) diff --git a/crud/delete.lua b/crud/delete.lua index 9f0497fe9..deca7318b 100644 --- a/crud/delete.lua +++ b/crud/delete.lua @@ -5,6 +5,7 @@ local vshard = require('vshard') local call = require('crud.common.call') local utils = require('crud.common.utils') local sharding = require('crud.common.sharding') +local sharding_key_module = require('crud.common.sharding_key') local dev_checks = require('crud.common.dev_checks') local schema = require('crud.common.schema') @@ -55,7 +56,17 @@ local function call_delete_on_router(space_name, key, opts) key = key:totable() end - local bucket_id = sharding.key_get_bucket_id(key, opts.bucket_id) + local sharding_key = key + if opts.bucket_id == nil then + local err + local primary_index_parts = space.index[0].parts + sharding_key, err = sharding_key_module.extract_from_pk(space_name, primary_index_parts, key, opts.timeout) + if err ~= nil then + return nil, err + end + end + + local bucket_id = sharding.key_get_bucket_id(sharding_key, opts.bucket_id) local call_opts = { mode = 'write', timeout = opts.timeout, diff --git a/crud/get.lua b/crud/get.lua index f00957e54..7f474e859 100644 --- a/crud/get.lua +++ b/crud/get.lua @@ -5,6 +5,7 @@ local vshard = require('vshard') local call = require('crud.common.call') local utils = require('crud.common.utils') local sharding = require('crud.common.sharding') +local sharding_key_module = require('crud.common.sharding_key') local dev_checks = require('crud.common.dev_checks') local schema = require('crud.common.schema') @@ -58,7 +59,17 @@ local function call_get_on_router(space_name, key, opts) key = key:totable() end - local bucket_id = sharding.key_get_bucket_id(key, opts.bucket_id) + local sharding_key = key + if opts.bucket_id == nil then + local err + local primary_index_parts = space.index[0].parts + sharding_key, err = sharding_key_module.extract_from_pk(space_name, primary_index_parts, key, opts.timeout) + if err ~= nil then + return nil, err + end + end + + local bucket_id = sharding.key_get_bucket_id(sharding_key, opts.bucket_id) local call_opts = { mode = opts.mode or 'read', prefer_replica = opts.prefer_replica, diff --git a/crud/update.lua b/crud/update.lua index 3c5513108..8e7b1aa0c 100644 --- a/crud/update.lua +++ b/crud/update.lua @@ -5,6 +5,7 @@ local vshard = require('vshard') local call = require('crud.common.call') local utils = require('crud.common.utils') local sharding = require('crud.common.sharding') +local sharding_key_module = require('crud.common.sharding_key') local dev_checks = require('crud.common.dev_checks') local schema = require('crud.common.schema') @@ -83,6 +84,16 @@ local function call_update_on_router(space_name, key, user_operations, opts) key = key:totable() end + local sharding_key = key + if opts.bucket_id == nil then + local err + local primary_index_parts = space.index[0].parts + sharding_key, err = sharding_key_module.extract_from_pk(space_name, primary_index_parts, key, opts.timeout) + if err ~= nil then + return nil, err + end + end + local operations = user_operations if not utils.tarantool_supports_fieldpaths() then operations, err = utils.convert_operations(user_operations, space_format) @@ -91,7 +102,7 @@ local function call_update_on_router(space_name, key, user_operations, opts) end end - local bucket_id = sharding.key_get_bucket_id(key, opts.bucket_id) + local bucket_id = sharding.key_get_bucket_id(sharding_key, opts.bucket_id) local call_opts = { mode = 'write', timeout = opts.timeout, diff --git a/test/entrypoint/srv_ddl.lua b/test/entrypoint/srv_ddl.lua index c1d1e3ad1..61c35da08 100755 --- a/test/entrypoint/srv_ddl.lua +++ b/test/entrypoint/srv_ddl.lua @@ -95,12 +95,18 @@ package.preload['customers-storage'] = function() table.insert(customers_secondary_idx_name_key_schema.indexes, secondary_index) table.insert(customers_secondary_idx_name_key_schema.indexes, bucket_id_index) + local customers_age_key_schema = table.deepcopy(customers_schema) + customers_age_key_schema.sharding_key = {'age'} + table.insert(customers_age_key_schema.indexes, primary_index) + table.insert(customers_age_key_schema.indexes, bucket_id_index) + local schema = { spaces = { customers_name_key = customers_name_key_schema, customers_name_key_uniq_index = customers_name_key_uniq_index_schema, customers_name_key_non_uniq_index = customers_name_key_non_uniq_index_schema, customers_secondary_idx_name_key = customers_secondary_idx_name_key_schema, + customers_age_key = customers_age_key_schema, } } diff --git a/test/integration/ddl_sharding_key_test.lua b/test/integration/ddl_sharding_key_test.lua index 2ff890991..670a775bb 100644 --- a/test/integration/ddl_sharding_key_test.lua +++ b/test/integration/ddl_sharding_key_test.lua @@ -50,6 +50,7 @@ pgroup.before_each(function(g) helpers.truncate_space_on_cluster(g.cluster, 'customers_name_key_uniq_index') helpers.truncate_space_on_cluster(g.cluster, 'customers_name_key_non_uniq_index') helpers.truncate_space_on_cluster(g.cluster, 'customers_secondary_idx_name_key') + helpers.truncate_space_on_cluster(g.cluster, 'customers_age_key') end) pgroup.test_insert_object = function(g) @@ -374,3 +375,219 @@ pgroup.test_select_non_unique_index = function(g) t.assert_not_equals(result, nil) t.assert_equals(#result.rows, 2) end + +pgroup.test_update = function(g) + -- bucket_id is 1366, storage is s-2 + local tuple = {2, 1366, 'Ivan', 10} + + local conn_s1 = g.cluster:server('s1-master').net_box + local conn_s2 = g.cluster:server('s2-master').net_box + + -- Put tuple with to s1 replicaset. + local result = conn_s1.space['customers_name_key']:insert(tuple) + t.assert_not_equals(result, nil) + + -- Put tuple with to s2 replicaset. + local result = conn_s2.space['customers_name_key']:insert(tuple) + t.assert_not_equals(result, nil) + + -- Update a tuple. + local update_operations = { + {'+', 'age', 10}, + } + local result, err = g.cluster.main_server.net_box:call('crud.update', { + 'customers_name_key', {2, 'Ivan'}, update_operations, + }) + t.assert_equals(err, nil) + t.assert_equals(#result.rows, 1) + t.assert_equals(result.rows, {{2, 1366, 'Ivan', 20}}) + + -- Tuple on s1 replicaset was not updated. + local result = conn_s1.space['customers_name_key']:get({2, 'Ivan'}) + t.assert_equals(result, {2, 1366, 'Ivan', 10}) + + -- Tuple on s2 replicaset was updated. + local result = conn_s2.space['customers_name_key']:get({2, 'Ivan'}) + t.assert_equals(result, {2, 1366, 'Ivan', 20}) +end + +pgroup.test_get = function(g) + -- bucket_id is 596, storage is s-2 + local tuple = {7, 596, 'Dimitrion', 20} + + -- Put tuple to s2 replicaset. + local conn_s2 = g.cluster:server('s2-master').net_box + local result = conn_s2.space['customers_name_key']:insert(tuple) + t.assert_not_equals(result, nil) + + -- Get a tuple. + local result, err = g.cluster.main_server.net_box:call('crud.get', { + 'customers_name_key', {7, 'Dimitrion'}, + }) + t.assert_equals(err, nil) + t.assert_equals(result.rows, {{7, 596, 'Dimitrion', 20}}) +end + +pgroup.test_delete = function(g) + -- bucket_id is 596, storage is s-2 + local tuple = {7, 596, 'Dimitrion', 20} + + local conn_s1 = g.cluster:server('s1-master').net_box + local conn_s2 = g.cluster:server('s2-master').net_box + + -- Put tuple to s1 replicaset. + local result = conn_s1.space['customers_name_key']:insert(tuple) + t.assert_not_equals(result, nil) + + -- Put tuple to s2 replicaset. + local result = conn_s2.space['customers_name_key']:insert(tuple) + t.assert_not_equals(result, nil) + + -- Delete tuple. + local _, err = g.cluster.main_server.net_box:call('crud.delete', { + 'customers_name_key', {7, 'Dimitrion'}, + }) + t.assert_equals(err, nil) + + -- There is a tuple on s1 replicaset. + local result = conn_s1.space['customers_name_key']:get({7, 'Dimitrion'}) + t.assert_equals(result, {7, 596, 'Dimitrion', 20}) + + -- There is no tuple on s2 replicaset. + local result = conn_s2.space['customers_name_key']:get({7, 'Dimitrion'}) + t.assert_equals(result, nil) +end + +pgroup.test_delete_incomplete_sharding_key = function(g) + local tuple = {2, box.NULL, 'Viktor Pelevin', 58} + + -- insert tuple + local result, err = g.cluster.main_server.net_box:call('crud.insert', { + 'customers_age_key', tuple + }) + + t.assert_equals(err, nil) + t.assert_not_equals(result, nil) + t.assert_equals(#result.rows, 1) + + local result, err = g.cluster.main_server.net_box:call('crud.delete', { + 'customers_age_key', {58, 'Viktor Pelevin'} + }) + + t.assert_str_contains(err.err, + "Sharding key for space \"customers_age_key\" is missed in primary index, specify bucket_id") + t.assert_equals(result, nil) +end + +pgroup.test_get_incomplete_sharding_key = function(g) + local tuple = {2, box.NULL, 'Viktor Pelevin', 58} + + -- insert tuple + local result, err = g.cluster.main_server.net_box:call('crud.insert', { + 'customers_age_key', tuple + }) + + t.assert_equals(err, nil) + t.assert_not_equals(result, nil) + t.assert_equals(#result.rows, 1) + + local result, err = g.cluster.main_server.net_box:call('crud.get', { + 'customers_age_key', {58, 'Viktor Pelevin'} + }) + + t.assert_str_contains(err.err, + "Sharding key for space \"customers_age_key\" is missed in primary index, specify bucket_id") + t.assert_equals(result, nil) +end + +pgroup.test_update_incomplete_sharding_key = function(g) + local tuple = {2, box.NULL, 'Viktor Pelevin', 58} + + -- insert tuple + local result, err = g.cluster.main_server.net_box:call('crud.insert', { + 'customers_age_key', tuple + }) + + t.assert_equals(err, nil) + t.assert_not_equals(result, nil) + t.assert_equals(#result.rows, 1) + + local update_operations = { + {'=', 'age', 60}, + } + + local result, err = g.cluster.main_server.net_box:call('crud.update', { + 'customers_age_key', {2, 'Viktor Pelevin'}, update_operations, + }) + + t.assert_str_contains(err.err, + "Sharding key for space \"customers_age_key\" is missed in primary index, specify bucket_id") + t.assert_equals(result, nil) +end + +pgroup.test_get_secondary_idx = function(g) + local tuple = {4, box.NULL, 'Leo', 44} + + -- insert tuple + local result, err = g.cluster.main_server.net_box:call('crud.insert', { + 'customers_secondary_idx_name_key', tuple + }) + + t.assert_equals(err, nil) + t.assert_not_equals(result, nil) + t.assert_equals(#result.rows, 1) + + -- get + local result, err = g.cluster.main_server.net_box:call('crud.get', + {'customers_secondary_idx_name_key', {4, 'Leo'}}) + + t.assert_str_contains(err.err, + "Sharding key for space \"customers_secondary_idx_name_key\" is missed in primary index, specify bucket_id") + t.assert_equals(result, nil) +end + +pgroup.test_update_secondary_idx = function(g) + local tuple = {6, box.NULL, 'Victor', 58} + + -- insert tuple + local result, err = g.cluster.main_server.net_box:call('crud.insert', { + 'customers_secondary_idx_name_key', tuple + }) + + t.assert_equals(err, nil) + t.assert_not_equals(result, nil) + t.assert_equals(#result.rows, 1) + + local update_operations = { + {'=', 'age', 58}, + } + + local result, err = g.cluster.main_server.net_box:call('crud.update', { + 'customers_secondary_idx_name_key', {6, 'Victor'}, update_operations, + }) + + t.assert_str_contains(err.err, + "Sharding key for space \"customers_secondary_idx_name_key\" is missed in primary index, specify bucket_id") + t.assert_equals(result, nil) +end + +pgroup.test_delete_secondary_idx = function(g) + local tuple = {8, box.NULL, 'Alexander', 37} + + -- insert tuple + local result, err = g.cluster.main_server.net_box:call('crud.insert', { + 'customers_secondary_idx_name_key', tuple + }) + + t.assert_equals(err, nil) + t.assert_not_equals(result, nil) + t.assert_equals(#result.rows, 1) + + local result, err = g.cluster.main_server.net_box:call('crud.delete', { + 'customers_secondary_idx_name_key', {8, 'Alexander'} + }) + + t.assert_str_contains(err.err, + "Sharding key for space \"customers_secondary_idx_name_key\" is missed in primary index, specify bucket_id") + t.assert_equals(result, nil) +end diff --git a/test/unit/sharding_key_test.lua b/test/unit/sharding_key_test.lua index 1949f87b7..110eab1c7 100644 --- a/test/unit/sharding_key_test.lua +++ b/test/unit/sharding_key_test.lua @@ -1,5 +1,6 @@ local t = require('luatest') local sharding_key_module = require('crud.common.sharding_key') +local cache = require('crud.common.sharding_key_cache') local utils = require('crud.common.utils') local helpers = require('test.helper') @@ -29,6 +30,7 @@ g.after_each(function() box.space._ddl_sharding_key:drop() end box.space.fetch_on_storage:drop() + cache.drop_caches() end) g.test_as_index_object_positive = function() @@ -102,3 +104,130 @@ g.test_fetch_on_storage_negative = function() local metadata_map = sharding_key_module.fetch_on_storage() t.assert_equals(metadata_map, nil) end + +g.test_extract_from_index_sharding_key_direct_order = function() + local primary_index_parts = { + {fieldno = 1}, + {fieldno = 2}, + } + local sharding_key_as_index_obj = { + parts = { + {fieldno = 1}, + {fieldno = 2}, + } + } + local primary_key = {'name', 'age'} + + local extract_from_index = sharding_key_module.internal.extract_from_index + local sharding_key = extract_from_index(primary_key, + primary_index_parts, + sharding_key_as_index_obj) + t.assert_equals(sharding_key, {'name', 'age'}) +end + +g.test_extract_from_index_sharding_key_reverse_order = function() + local primary_index_parts = { + {fieldno = 1}, + {fieldno = 2}, + } + local sharding_key_as_index_obj = { + parts = { + {fieldno = 2}, + {fieldno = 1}, + } + } + local primary_key = {'name', 'age'} + + local extract_from_index = sharding_key_module.internal.extract_from_index + local sharding_key = extract_from_index(primary_key, + primary_index_parts, + sharding_key_as_index_obj) + t.assert_equals(sharding_key, {'age', 'name'}) +end + +g.test_extract_from_index_sharding_key_single_field = function() + local primary_index_parts = { + {fieldno = 1}, + {fieldno = 2}, + {fieldno = 3}, + } + local sharding_key_as_index_obj = { + parts = { + {fieldno = 2}, + } + } + local primary_key = {'name', 'age', 'location'} + + local extract_from_index = sharding_key_module.internal.extract_from_index + local sharding_key = extract_from_index(primary_key, + primary_index_parts, + sharding_key_as_index_obj) + t.assert_equals(sharding_key, {'age'}) +end + +g.test_extract_from_index_sharding_key_none_fields = function() + local primary_index_parts = { + {fieldno = 1}, + {fieldno = 3}, + } + local sharding_key_as_index_obj = { + parts = { + {fieldno = 2}, + } + } + local primary_key = {'name', 'age', 'location'} + + local extract_from_index = sharding_key_module.internal.extract_from_index + local ok, err = pcall(extract_from_index, primary_key, + primary_index_parts, + sharding_key_as_index_obj) + t.assert_equals(ok, false) + t.assert_str_contains(err, 'assertion failed') +end + +g.test_get_index_fieldno_map = function() + local index_parts = { + {fieldno = 2}, + {fieldno = 3}, + } + + local fieldno_map = utils.get_index_fieldno_map(index_parts) + t.assert_equals(fieldno_map, { + [2] = 1, + [3] = 2 + }) +end + +g.test_is_part_of_pk_positive = function() + local space_name = 'is_part_of_pk' + local index_parts = { + {fieldno = 2}, + {fieldno = 3}, + } + local sharding_key_as_index_obj = { + parts = { + {fieldno = 2}, + } + } + + local is_part_of_pk = sharding_key_module.internal.is_part_of_pk + local res = is_part_of_pk(space_name, index_parts, sharding_key_as_index_obj) + t.assert_equals(res, true) +end + +g.test_is_part_of_pk_negative = function() + local space_name = 'is_part_of_pk' + local index_parts = { + {fieldno = 1}, + {fieldno = 3}, + } + local sharding_key_as_index_obj = { + parts = { + {fieldno = 2}, + } + } + + local is_part_of_pk = sharding_key_module.internal.is_part_of_pk + local res = is_part_of_pk(space_name, index_parts, sharding_key_as_index_obj) + t.assert_equals(res, false) +end