Skip to content

Commit

Permalink
Support custom sharding key in select
Browse files Browse the repository at this point in the history
NOTE: Prior to this patch CRUD assumed that an index is unique. It was true
for the primary key, but it is not guaranteed for a sharding key. The patch
adds tests with select() for a non-unique index that failed due to the
assumption of a unique index in crud/select/plan.lua. It seems we
can remove this condition and fix the tests that rely on
total_tuple_count == 1. See also the related discussion in [1].

1. #181 (comment)

Part of #166

Reviewed-by: Oleg Babin <[email protected]>
Reviewed-by: Alexander Turenko <[email protected]>
  • Loading branch information
ligurio committed Nov 26, 2021
1 parent a987ee4 commit cab9555
Show file tree
Hide file tree
Showing 6 changed files with 189 additions and 9 deletions.
6 changes: 6 additions & 0 deletions crud/select/compat/select.lua
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ local sharding = require('crud.common.sharding')
local dev_checks = require('crud.common.dev_checks')
local common = require('crud.select.compat.common')
local schema = require('crud.common.schema')
local sharding_key_module = require('crud.common.sharding_key')

local compare_conditions = require('crud.compare.conditions')
local select_plan = require('crud.select.plan')
Expand Down Expand Up @@ -50,12 +51,17 @@ local function build_select_iterator(space_name, user_conditions, opts)
return nil, SelectError:new("Space %q doesn't exist", space_name), true
end
local space_format = space:format()
local sharding_key_as_index_obj, err = sharding_key_module.fetch_on_router(space_name)
if err ~= nil then
return nil, err
end

-- plan select
local plan, err = select_plan.new(space, conditions, {
first = opts.first,
after_tuple = opts.after,
field_names = opts.field_names,
sharding_key_as_index_obj = sharding_key_as_index_obj,
})

if err ~= nil then
Expand Down
6 changes: 6 additions & 0 deletions crud/select/compat/select_old.lua
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ local utils = require('crud.common.utils')
local sharding = require('crud.common.sharding')
local dev_checks = require('crud.common.dev_checks')
local schema = require('crud.common.schema')
local sharding_key_module = require('crud.common.sharding_key')

local compare_conditions = require('crud.compare.conditions')
local select_plan = require('crud.select.plan')
Expand Down Expand Up @@ -102,13 +103,18 @@ local function build_select_iterator(space_name, user_conditions, opts)
return nil, SelectError:new("Space %q doesn't exist", space_name), true
end
local space_format = space:format()
local sharding_key_as_index_obj, err = sharding_key_module.fetch_on_router(space_name)
if err ~= nil then
return nil, err
end

-- plan select
local plan, err = select_plan.new(space, conditions, {
first = opts.first,
after_tuple = opts.after,
field_names = opts.field_names,
force_map_call = opts.force_map_call,
sharding_key_as_index_obj = sharding_key_as_index_obj,
})

if err ~= nil then
Expand Down
8 changes: 2 additions & 6 deletions crud/select/plan.lua
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ function select_plan.new(space, conditions, opts)
after_tuple = '?table|cdata',
field_names = '?table',
force_map_call = '?boolean',
sharding_key_as_index_obj = '?table',
})

conditions = conditions ~= nil and conditions or {}
Expand Down Expand Up @@ -226,19 +227,14 @@ function select_plan.new(space, conditions, opts)
end
end

local sharding_index = primary_index -- XXX: only sharding by primary key is supported
local sharding_index = opts.sharding_key_as_index_obj or primary_index

-- get sharding key value
local sharding_key
if scan_value ~= nil and (scan_iter == box.index.EQ or scan_iter == box.index.REQ) then
sharding_key = extract_sharding_key_from_scan_value(scan_value, scan_index, sharding_index)
end

if sharding_key ~= nil and opts.force_map_call ~= true then
total_tuples_count = 1
scan_iter = box.index.REQ
end

local plan = {
conditions = conditions,
space_name = space_name,
Expand Down
47 changes: 47 additions & 0 deletions test/entrypoint/srv_ddl.lua
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,14 @@ package.preload['customers-storage'] = function()
{path = 'name', is_nullable = false, type = 'string'},
},
}
local primary_index_id = {
name = 'id',
type = 'TREE',
unique = true,
parts = {
{path = 'id', is_nullable = false, type = 'unsigned'},
},
}
local bucket_id_index = {
name = 'bucket_id',
type = 'TREE',
Expand All @@ -45,15 +53,54 @@ package.preload['customers-storage'] = function()
{path = 'bucket_id', is_nullable = false, type = 'unsigned'},
}
}
local name_index = {
name = 'name',
type = 'TREE',
unique = true,
parts = {
{path = 'name', is_nullable = false, type = 'string'},
},
}
local secondary_index = {
name = 'secondary',
type = 'TREE',
unique = false,
parts = {
{path = 'id', is_nullable = false, type = 'unsigned'},
{path = 'name', is_nullable = false, type = 'string'},
},
}

local customers_name_key_schema = table.deepcopy(customers_schema)
customers_name_key_schema.sharding_key = {'name'}
table.insert(customers_name_key_schema.indexes, primary_index)
table.insert(customers_name_key_schema.indexes, bucket_id_index)

local customers_name_key_uniq_index_schema = table.deepcopy(customers_schema)
customers_name_key_uniq_index_schema.sharding_key = {'name'}
table.insert(customers_name_key_uniq_index_schema.indexes, primary_index)
table.insert(customers_name_key_uniq_index_schema.indexes, bucket_id_index)
table.insert(customers_name_key_uniq_index_schema.indexes, name_index)

local customers_name_key_non_uniq_index_schema = table.deepcopy(customers_schema)
customers_name_key_non_uniq_index_schema.sharding_key = {'name'}
name_index.unique = false
table.insert(customers_name_key_non_uniq_index_schema.indexes, primary_index)
table.insert(customers_name_key_non_uniq_index_schema.indexes, bucket_id_index)
table.insert(customers_name_key_non_uniq_index_schema.indexes, name_index)

local customers_secondary_idx_name_key_schema = table.deepcopy(customers_schema)
customers_secondary_idx_name_key_schema.sharding_key = {'name'}
table.insert(customers_secondary_idx_name_key_schema.indexes, primary_index_id)
table.insert(customers_secondary_idx_name_key_schema.indexes, secondary_index)
table.insert(customers_secondary_idx_name_key_schema.indexes, bucket_id_index)

local schema = {
spaces = {
customers_name_key = customers_name_key_schema,
customers_name_key_uniq_index = customers_name_key_uniq_index_schema,
customers_name_key_non_uniq_index = customers_name_key_non_uniq_index_schema,
customers_secondary_idx_name_key = customers_secondary_idx_name_key_schema,
}
}

Expand Down
128 changes: 128 additions & 0 deletions test/integration/ddl_sharding_key_test.lua
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ local crud = require('crud')
local t = require('luatest')

local helpers = require('test.helper')
local storage_stat = require('test.helpers.storage_stat')

local ok = pcall(require, 'ddl')
if not ok then
Expand Down Expand Up @@ -46,6 +47,9 @@ pgroup.after_all(function(g) helpers.stop_cluster(g.cluster) end)

pgroup.before_each(function(g)
helpers.truncate_space_on_cluster(g.cluster, 'customers_name_key')
helpers.truncate_space_on_cluster(g.cluster, 'customers_name_key_uniq_index')
helpers.truncate_space_on_cluster(g.cluster, 'customers_name_key_non_uniq_index')
helpers.truncate_space_on_cluster(g.cluster, 'customers_secondary_idx_name_key')
end)

pgroup.test_insert_object = function(g)
Expand Down Expand Up @@ -246,3 +250,127 @@ pgroup.test_upsert = function(g)
local result = conn_s2.space['customers_name_key']:get({1, 'John'})
t.assert_equals(result, nil)
end

-- The main purpose of this testcase is to verify that CRUD calculates the
-- bucket_id using the secondary sharding key (name) correctly and gets the
-- tuple from the storage in replicaset s2.
-- bucket_id was calculated using the function below:
-- function(key)
--     return require('vshard.hash').strcrc32(key) % 3000 + 1
-- end
-- where 3000 is the default number of buckets used in vshard.
pgroup.test_select = function(g)
    -- bucket_id 234 maps to storage s-2 (see the hashing note above).
    local tuple = {8, 234, 'Ptolemy', 20}

    -- Seed the tuple directly on the s2 storage, bypassing the router.
    local s2_conn = g.cluster:server('s2-master').net_box
    local inserted = s2_conn.space['customers_name_key']:insert(tuple)
    t.assert_not_equals(inserted, nil)

    -- Select through the router using only the sharding key field; CRUD
    -- must route the request to the right replicaset on its own.
    local result, err = g.cluster.main_server.net_box:call('crud.select', {
        'customers_name_key', {{'==', 'name', 'Ptolemy'}},
    })

    t.assert_equals(err, nil)
    t.assert_equals(#result.rows, 1)
    t.assert_equals(result.rows[1], tuple)
end

-- TODO: After enabling support of sharding keys that are not equal to primary
-- keys, we should handle this differently: it is not enough to look just at
-- the scan value; we should traverse all conditions. For now, missed cases
-- lead to map-reduce. Will be resolved in #213.
pgroup.test_select_wont_lead_map_reduce = function(g)
    local space_name = 'customers_name_key_uniq_index'

    local conn_s1 = g.cluster:server('s1-master').net_box
    local conn_s2 = g.cluster:server('s2-master').net_box

    -- Seed tuples directly on the storages. bucket_id values are
    -- precomputed for the 'name' sharding key (see hashing note above).
    local seeds = {
        {conn = conn_s2, tuple = {1, 477, 'Viktor Pelevin', 58}},          -- s-2
        {conn = conn_s1, tuple = {2, 401, 'Isaac Asimov', 72}},            -- s-1
        {conn = conn_s2, tuple = {3, 2804, 'Aleksandr Solzhenitsyn', 89}}, -- s-2
        {conn = conn_s2, tuple = {4, 1161, 'James Joyce', 59}},            -- s-2
    }
    for _, seed in ipairs(seeds) do
        t.assert_not_equals(seed.conn.space[space_name]:insert(seed.tuple), nil)
    end

    local stat_before = storage_stat.collect(g.cluster)

    -- Select a tuple with name 'Viktor Pelevin'.
    local result, err = g.cluster.main_server.net_box:call('crud.select', {
        space_name, {{'==', 'name', 'Viktor Pelevin'}}
    })
    t.assert_equals(err, nil)
    t.assert_not_equals(result, nil)
    t.assert_equals(#result.rows, 1)

    local stat_after = storage_stat.collect(g.cluster)

    -- Compare per-storage select() counters before and after the router
    -- call. Exactly one request on a single storage proves the select was
    -- routed by the sharding key and did not fall back to map-reduce.
    t.assert_equals(storage_stat.diff(stat_after, stat_before), {
        ['s-1'] = {
            select_requests = 0,
        },
        ['s-2'] = {
            select_requests = 1,
        },
    })
end

pgroup.test_select_secondary_idx = function(g)
    -- Insert through the router so that CRUD computes bucket_id itself
    -- (box.NULL in the bucket_id field).
    local tuple = {2, box.NULL, 'Ivan', 20}
    local insert_result, insert_err = g.cluster.main_server.net_box:call('crud.insert', {
        'customers_secondary_idx_name_key', tuple
    })

    t.assert_equals(insert_err, nil)
    t.assert_not_equals(insert_result, nil)
    t.assert_equals(#insert_result.rows, 1)

    -- Select by the sharding key field, which is indexed by a secondary
    -- (non-primary) index in this space.
    local select_result, select_err = g.cluster.main_server.net_box:call('crud.select', {
        'customers_secondary_idx_name_key', {{'==', 'name', 'Ivan'}},
    })

    t.assert_equals(select_err, nil)
    t.assert_equals(#select_result.rows, 1)
    -- 1366 is the bucket_id CRUD computed for the 'Ivan' sharding key.
    t.assert_equals(select_result.rows[1], {2, 1366, 'Ivan', 20})
end

pgroup.test_select_non_unique_index = function(g)
    local space_name = 'customers_name_key_non_uniq_index'

    -- 'Ivan Bunin' is deliberately inserted twice (ids 6 and 10) to
    -- exercise selection through a non-unique index on the sharding key.
    local inserted = helpers.insert_objects(g, space_name, {
        {id = 1, name = 'Viktor Pelevin', age = 58},
        {id = 2, name = 'Isaac Asimov', age = 72},
        {id = 3, name = 'Aleksandr Solzhenitsyn', age = 89},
        {id = 4, name = 'James Joyce', age = 59},
        {id = 5, name = 'Oscar Wilde', age = 46},
        {id = 6, name = 'Ivan Bunin', age = 83},
        {id = 7, name = 'Ivan Turgenev', age = 64},
        {id = 8, name = 'Alexander Ostrovsky', age = 63},
        {id = 9, name = 'Anton Chekhov', age = 44},
        {id = 10, name = 'Ivan Bunin', age = 83},
    })
    t.assert_equals(#inserted, 10)

    local result, err = g.cluster.main_server.net_box:call('crud.select', {
        space_name, {{'==', 'name', 'Ivan Bunin'}}
    })
    t.assert_equals(err, nil)
    t.assert_not_equals(result, nil)
    -- Both 'Ivan Bunin' tuples must come back.
    t.assert_equals(#result.rows, 2)
end
3 changes: 0 additions & 3 deletions test/unit/select_plan_test.lua
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,6 @@ g.test_is_scan_by_full_sharding_key_eq = function()

t.assert_equals(err, nil)

t.assert_equals(plan.total_tuples_count, 1)
t.assert_equals(plan.sharding_key, {15})

-- id is a part of scan index
Expand All @@ -173,7 +172,6 @@ g.test_is_scan_by_full_sharding_key_eq = function()

t.assert_equals(plan.index_id, 3) -- index name_id is used
t.assert_equals(plan.scan_value, {'Ivan', 11})
t.assert_equals(plan.total_tuples_count, 1)
t.assert_equals(plan.sharding_key, {11})

-- other index is first
Expand Down Expand Up @@ -221,7 +219,6 @@ g.test_is_scan_by_full_sharding_key_eq = function()

t.assert_equals(err, nil)

t.assert_equals(plan.total_tuples_count, 1)
t.assert_equals(plan.sharding_key, {1, 0})
end

Expand Down

0 comments on commit cab9555

Please sign in to comment.