Skip to content

Commit

Permalink
Support custom sharding key in select
Browse files Browse the repository at this point in the history
Part of #166
  • Loading branch information
ligurio committed Nov 18, 2021
1 parent bad6591 commit bb6e271
Show file tree
Hide file tree
Showing 5 changed files with 328 additions and 1 deletion.
6 changes: 6 additions & 0 deletions crud/select/compat/select.lua
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ local sharding = require('crud.common.sharding')
local dev_checks = require('crud.common.dev_checks')
local common = require('crud.select.compat.common')
local schema = require('crud.common.schema')
local sharding_key_module = require('crud.common.sharding_key')

local compare_conditions = require('crud.compare.conditions')
local select_plan = require('crud.select.plan')
Expand Down Expand Up @@ -50,12 +51,17 @@ local function build_select_iterator(space_name, user_conditions, opts)
return nil, SelectError:new("Space %q doesn't exist", space_name), true
end
local space_format = space:format()
local sharding_key_as_index_obj, err = sharding_key_module.fetch_on_router(space_name)
if err ~= nil then
return nil, err
end

-- plan select
local plan, err = select_plan.new(space, conditions, {
first = opts.first,
after_tuple = opts.after,
field_names = opts.field_names,
sharding_key_as_index_obj = sharding_key_as_index_obj,
})

if err ~= nil then
Expand Down
6 changes: 6 additions & 0 deletions crud/select/compat/select_old.lua
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ local utils = require('crud.common.utils')
local sharding = require('crud.common.sharding')
local dev_checks = require('crud.common.dev_checks')
local schema = require('crud.common.schema')
local sharding_key_module = require('crud.common.sharding_key')

local compare_conditions = require('crud.compare.conditions')
local select_plan = require('crud.select.plan')
Expand Down Expand Up @@ -102,13 +103,18 @@ local function build_select_iterator(space_name, user_conditions, opts)
return nil, SelectError:new("Space %q doesn't exist", space_name), true
end
local space_format = space:format()
local sharding_key_as_index_obj, err = sharding_key_module.fetch_on_router(space_name)
if err ~= nil then
return nil, err
end

-- plan select
local plan, err = select_plan.new(space, conditions, {
first = opts.first,
after_tuple = opts.after,
field_names = opts.field_names,
force_map_call = opts.force_map_call,
sharding_key_as_index_obj = sharding_key_as_index_obj,
})

if err ~= nil then
Expand Down
3 changes: 2 additions & 1 deletion crud/select/plan.lua
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ function select_plan.new(space, conditions, opts)
after_tuple = '?table|cdata',
field_names = '?table',
force_map_call = '?boolean',
sharding_key_as_index_obj = '?table',
})

conditions = conditions ~= nil and conditions or {}
Expand Down Expand Up @@ -226,7 +227,7 @@ function select_plan.new(space, conditions, opts)
end
end

local sharding_index = primary_index -- XXX: only sharding by primary key is supported
local sharding_index = opts.sharding_key_as_index_obj or primary_index

-- get sharding key value
local sharding_key
Expand Down
133 changes: 133 additions & 0 deletions test/entrypoint/srv_ddl.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
#!/usr/bin/env tarantool

-- Entrypoint of a test Tarantool instance whose spaces are defined through
-- the ddl module with a custom (non-primary-key) sharding key.
require('strict').on()
-- Stub for the healthcheck; replaced with cartridge.is_healthy at the bottom
-- of this file, once cartridge.cfg() has succeeded.
_G.is_initialized = function() return false end

local log = require('log')
local errors = require('errors')
local cartridge = require('cartridge')
local ddl = require('ddl')

package.preload['customers-storage'] = function()
    -- Cartridge role that creates the test spaces. All spaces are sharded by
    -- the custom key {'name'} declared via the ddl module instead of the
    -- default (primary-key) sharding.
    return {
        role_name = 'customers-storage',
        init = function()
            local engine = os.getenv('ENGINE') or 'memtx'

            -- Base space definition; each concrete space deep-copies it and
            -- fills in its own indexes and sharding key.
            local customers_schema = {
                engine = engine,
                is_local = true,
                temporary = false,
                format = {
                    {name = 'id', is_nullable = false, type = 'unsigned'},
                    {name = 'bucket_id', is_nullable = false, type = 'unsigned'},
                    {name = 'name', is_nullable = false, type = 'string'},
                    {name = 'age', is_nullable = false, type = 'number'},
                },
                indexes = {
                    -- This table is intentionally blank.
                },
            }

            -- Reusable index definitions. NOTE: these tables are inserted into
            -- several schemas by reference, so they must not be mutated after
            -- the first insertion (see name_non_uniq_index below).
            local primary_index = {
                name = 'id',
                type = 'TREE',
                unique = true,
                parts = {
                    {path = 'id', is_nullable = false, type = 'unsigned'},
                    {path = 'name', is_nullable = false, type = 'string'},
                },
            }
            local primary_index_id = {
                name = 'id',
                type = 'TREE',
                unique = true,
                parts = {
                    {path = 'id', is_nullable = false, type = 'unsigned'},
                },
            }
            local bucket_id_index = {
                name = 'bucket_id',
                type = 'TREE',
                unique = false,
                parts = {
                    {path = 'bucket_id', is_nullable = false, type = 'unsigned'},
                }
            }
            local name_index = {
                name = 'name',
                type = 'TREE',
                unique = true,
                parts = {
                    {path = 'name', is_nullable = false, type = 'string'},
                },
            }
            -- BUGFIX: the original code flipped `name_index.unique = false`
            -- in place after `name_index` had already been inserted into
            -- customers_name_key_uniq_index_schema, so that space lost its
            -- unique name index as well. Mutate an independent copy instead.
            local name_non_uniq_index = table.deepcopy(name_index)
            name_non_uniq_index.unique = false
            local secondary_index = {
                name = 'secondary',
                type = 'TREE',
                unique = false,
                parts = {
                    {path = 'id', is_nullable = false, type = 'unsigned'},
                    {path = 'name', is_nullable = false, type = 'string'},
                },
            }

            -- Sharding key 'name' is a part of the primary index.
            local customers_name_key_schema = table.deepcopy(customers_schema)
            customers_name_key_schema.sharding_key = {'name'}
            table.insert(customers_name_key_schema.indexes, primary_index)
            table.insert(customers_name_key_schema.indexes, bucket_id_index)

            -- Sharding key 'name' is additionally covered by a unique index.
            local customers_name_key_uniq_index_schema = table.deepcopy(customers_schema)
            customers_name_key_uniq_index_schema.sharding_key = {'name'}
            table.insert(customers_name_key_uniq_index_schema.indexes, primary_index)
            table.insert(customers_name_key_uniq_index_schema.indexes, bucket_id_index)
            table.insert(customers_name_key_uniq_index_schema.indexes, name_index)

            -- Same as above but the name index is non-unique.
            local customers_name_key_non_uniq_index_schema = table.deepcopy(customers_schema)
            customers_name_key_non_uniq_index_schema.sharding_key = {'name'}
            table.insert(customers_name_key_non_uniq_index_schema.indexes, primary_index)
            table.insert(customers_name_key_non_uniq_index_schema.indexes, bucket_id_index)
            table.insert(customers_name_key_non_uniq_index_schema.indexes, name_non_uniq_index)

            -- Sharding key 'name' appears only in a non-unique secondary
            -- index; the primary index is on 'id' alone.
            local customers_secondary_idx_name_key_schema = table.deepcopy(customers_schema)
            customers_secondary_idx_name_key_schema.sharding_key = {'name'}
            table.insert(customers_secondary_idx_name_key_schema.indexes, primary_index_id)
            table.insert(customers_secondary_idx_name_key_schema.indexes, secondary_index)
            table.insert(customers_secondary_idx_name_key_schema.indexes, bucket_id_index)

            local schema = {
                spaces = {
                    customers_name_key = customers_name_key_schema,
                    customers_name_key_uniq_index = customers_name_key_uniq_index_schema,
                    customers_name_key_non_uniq_index = customers_name_key_non_uniq_index_schema,
                    customers_secondary_idx_name_key = customers_secondary_idx_name_key_schema,
                }
            }

            -- Apply the DDL schema on writable (non read-only) instances only.
            if not box.info.ro then
                local ok, err = ddl.set_schema(schema)
                if not ok then
                    error(err)
                end
            end
        end,
    }
end

-- Configure the cartridge instance: the test storage role declared above plus
-- stock CRUD router/storage roles.
local cartridge_opts = {
    advertise_uri = 'localhost:3301',
    http_port = 8081,
    bucket_count = 3000,
    roles = {
        'customers-storage',
        'cartridge.roles.crud-router',
        'cartridge.roles.crud-storage',
    },
}

local ok, err = errors.pcall('CartridgeCfgError', cartridge.cfg, cartridge_opts)
if not ok then
    log.error('%s', err)
    os.exit(1)
end

-- Replace the stub defined at the top of the file with the real healthcheck.
_G.is_initialized = cartridge.is_healthy
181 changes: 181 additions & 0 deletions test/integration/ddl_sharding_key_test.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
local fio = require('fio')

local t = require('luatest')

local helpers = require('test.helper')

-- The whole suite exercises sharding keys declared through the ddl module,
-- so skip it entirely when ddl is not installed.
local ok = pcall(require, 'ddl')
if not ok then
    t.skip('Lua module ddl is required to run test')
end

-- Parametrized group: every test below runs once per storage engine.
local pgroup = t.group('ddl_sharding_key', {
    {engine = 'memtx'},
    {engine = 'vinyl'},
})

-- Start a vshard cluster from the ddl-based entrypoint (srv_ddl) and verify
-- that a DDL schema has actually been applied.
pgroup.before_all(function(g)
    g.cluster = helpers.Cluster:new({
        datadir = fio.tempdir(),
        server_command = helpers.entrypoint('srv_ddl'),
        use_vshard = true,
        replicasets = helpers.get_test_replicasets(),
        env = {
            -- Storage engine under test; comes from the pgroup parameters.
            ['ENGINE'] = g.params.engine,
        },
    })
    g.cluster:start()
    -- Sanity check: the router must report a schema table with no error.
    local result, err = g.cluster.main_server.net_box:eval([[
        local ddl = require('ddl')
        local ok, err = ddl.get_schema()
        return ok, err
    ]])
    t.assert_equals(type(result), 'table')
    t.assert_equals(err, nil)
end)

pgroup.after_all(function(g) helpers.stop_cluster(g.cluster) end)

-- Start every test from an empty state: truncate each test space on all
-- storages of the cluster.
pgroup.before_each(function(g)
    local space_names = {
        'customers_name_key',
        'customers_name_key_uniq_index',
        'customers_name_key_non_uniq_index',
        'customers_secondary_idx_name_key',
    }
    for _, space_name in ipairs(space_names) do
        helpers.truncate_space_on_cluster(g.cluster, space_name)
    end
end)

-- Smoke test: insert into a space whose sharding key ({'name'}) is declared
-- via DDL, then read the tuple back by a condition on 'id'.
pgroup.test_select = function(g)
    local space_name = 'customers_name_key'
    local tuple = {2, box.NULL, 'Ivan', 20}

    local insert_result, insert_err = g.cluster.main_server.net_box:call('crud.insert', {
        space_name, tuple,
    })
    t.assert_equals(insert_err, nil)
    t.assert_not_equals(insert_result, nil)
    t.assert_equals(#insert_result.rows, 1)

    local select_result, select_err = g.cluster.main_server.net_box:call('crud.select', {
        space_name, {{'==', 'id', 2}},
    })
    t.assert_equals(select_err, nil)
    t.assert_not_equals(select_result, nil)
    t.assert_equals(#select_result.rows, 1)
end

-- Right now CRUD's select plan doesn't support a sharding key on its own, and
-- such a select leads to a map-reduce (a select on all replicasets). To avoid
-- the map-reduce one needs to add a separate index on the field used in the
-- select condition. We plan to fix this in the scope of
-- https://github.com/tarantool/crud/issues/213
pgroup.test_select_wont_lead_map_reduce = function(g)
    local space_name = 'customers_name_key_uniq_index'
    local customers = helpers.insert_objects(g, space_name, {
        {id = 1, name = 'Viktor Pelevin', age = 58},
        {id = 2, name = 'Isaac Asimov', age = 72},
        {id = 3, name = 'Aleksandr Solzhenitsyn', age = 89},
        {id = 4, name = 'James Joyce', age = 59},
        {id = 5, name = 'Oscar Wilde', age = 46},
        {id = 6, name = 'Ivan Bunin', age = 83},
        {id = 7, name = 'Ivan Turgenev', age = 64},
        {id = 8, name = 'Alexander Ostrovsky', age = 63},
        {id = 9, name = 'Anton Chekhov', age = 44},
    })
    t.assert_equals(#customers, 9)

    -- Disable vshard's rebalancer and account current statistics of SELECT
    -- calls on storages before calling CRUD select. Rebalancer may screw up
    -- statistics of SELECT calls, so we will disable it for the measurement.
    local servers = g.cluster.servers
    local select_total_counter_before = 0
    for n, _ in ipairs(servers) do
        local c = g.cluster.servers[n].net_box:eval([[
            local vshard = require('vshard')
            vshard.storage.rebalancer_disable()
            assert(vshard.storage.sync(2) == true)
            assert(vshard.storage.rebalancing_is_in_progress() == false)
            return box.stat().SELECT.total
        ]])
        select_total_counter_before = select_total_counter_before + c
    end

    -- Make a CRUD SELECT by the sharding-key field: it should be routed to a
    -- single replicaset.
    local result, err = g.cluster.main_server.net_box:call('crud.select', {
        space_name, {{'==', 'name', 'Anton Chekhov'}}
    })
    t.assert_equals(err, nil)
    t.assert_not_equals(result, nil)
    t.assert_equals(#result.rows, 1)

    -- Account the statistics again, reading each counter *before* re-enabling
    -- the rebalancer so its own SELECTs cannot be counted.
    local select_total_counter_after = 0
    for n, _ in ipairs(servers) do
        local c = g.cluster.servers[n].net_box:eval([[
            local vshard = require('vshard')
            local stat = box.stat().SELECT.total
            vshard.storage.rebalancer_enable()
            return stat
        ]])
        select_total_counter_after = select_total_counter_after + c
    end

    -- Compare total counters of SELECT calls on cluster's storages before and
    -- after calling SELECT on the router. Make sure no more than one storage
    -- changed its SELECT counter; otherwise the request was a map-reduce.
    -- NOTE(review): the 2..4 window presumably accounts for auxiliary internal
    -- SELECTs issued alongside the user's request — confirm the exact bound
    -- against vshard/crud internals.
    local diff = select_total_counter_after - select_total_counter_before
    t.assert_le(diff, 4)
    t.assert_ge(diff, 2)
end

-- The space's primary index is on 'id' alone and 'name' (the sharding key)
-- appears only in a non-unique secondary index over (id, name); a select by
-- 'name' must still find the tuple.
pgroup.test_select_secondary_idx = function(g)
    local space_name = 'customers_secondary_idx_name_key'
    local tuple = {2, box.NULL, 'Ivan', 20}

    local insert_result, insert_err = g.cluster.main_server.net_box:call('crud.insert', {
        space_name, tuple,
    })
    t.assert_equals(insert_err, nil)
    t.assert_not_equals(insert_result, nil)
    t.assert_equals(#insert_result.rows, 1)

    local select_result, select_err = g.cluster.main_server.net_box:call('crud.select', {
        space_name, {{'==', 'name', 'Ivan'}},
    })
    t.assert_equals(select_err, nil)
    t.assert_not_equals(select_result, nil)
    t.assert_equals(#select_result.rows, 1)
end

-- Two tuples share the sharding-key value 'Ivan Bunin' in a space whose name
-- index is non-unique: selecting by that value must return both rows.
pgroup.test_non_unique_index = function(g)
    local space_name = 'customers_name_key_non_uniq_index'
    local objects = {
        {id = 1, name = 'Viktor Pelevin', age = 58},
        {id = 2, name = 'Isaac Asimov', age = 72},
        {id = 3, name = 'Aleksandr Solzhenitsyn', age = 89},
        {id = 4, name = 'James Joyce', age = 59},
        {id = 5, name = 'Oscar Wilde', age = 46},
        {id = 6, name = 'Ivan Bunin', age = 83},
        {id = 7, name = 'Ivan Turgenev', age = 64},
        {id = 8, name = 'Alexander Ostrovsky', age = 63},
        {id = 9, name = 'Anton Chekhov', age = 44},
        {id = 10, name = 'Ivan Bunin', age = 83},
    }
    local customers = helpers.insert_objects(g, space_name, objects)
    t.assert_equals(#customers, #objects)

    local result, err = g.cluster.main_server.net_box:call('crud.select', {
        space_name, {{'==', 'name', 'Ivan Bunin'}}
    })
    t.assert_equals(err, nil)
    t.assert_not_equals(result, nil)
    t.assert_equals(#result.rows, 2)
end

0 comments on commit bb6e271

Please sign in to comment.