-
Notifications
You must be signed in to change notification settings - Fork 14
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Added readview support for select and pairs. Closes #343
- Loading branch information
1 parent
2d3d479
commit 667658b
Showing
7 changed files
with
3,536 additions
and
10 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,301 @@ | ||
local fiber = require('fiber') | ||
local checks = require('checks') | ||
local errors = require('errors') | ||
local tarantool = require('tarantool') | ||
|
||
|
||
local stash = require('crud.common.stash') | ||
local utils = require('crud.common.utils') | ||
local sharding = require('crud.common.sharding') | ||
local select_executor = require('crud.select.executor') | ||
local select_filters = require('crud.compare.filters') | ||
local dev_checks = require('crud.common.dev_checks') | ||
local schema = require('crud.common.schema') | ||
local select = require('crud.select.compat.select') | ||
local stats = require('crud.stats') | ||
|
||
local ReadviewError = errors.new_class('ReadviewError', {capture_stack = false}) | ||
|
||
local readview = {} | ||
|
||
|
||
local function readview_open_on_storage(readview_name) | ||
local result = {} | ||
if not utils.tarantool_version_at_least(2, 11, 0) or | ||
tarantool.package ~= 'Tarantool Enterprise' then | ||
result.replica_info = nil | ||
result.err = ReadviewError:new("Tarantool does not support readview") | ||
return result | ||
end | ||
local read_view = box.read_view.open({name = readview_name}) | ||
|
||
if read_view == nil then | ||
result.replica_info = nil | ||
result.err = ReadviewError:new("Error creating readview") | ||
return result | ||
end | ||
|
||
local replica_info = {} | ||
replica_info.uuid = box.info().uuid | ||
replica_info.id = read_view.id | ||
result.replica_info = replica_info | ||
|
||
return result | ||
end | ||
|
||
local function readview_close_on_storage(readview_uuid) | ||
dev_checks('table') | ||
|
||
local list = box.read_view.list() | ||
local readview_id | ||
for _, replica_info in pairs(readview_uuid) do | ||
if replica_info.uuid == box.info().uuid then | ||
readview_id = replica_info.id | ||
end | ||
end | ||
|
||
for k,v in pairs(list) do | ||
if v.id == readview_id then | ||
list[k]:close() | ||
end | ||
end | ||
|
||
return nil | ||
end | ||
|
||
local function select_readview_on_storage(space_name, index_id, conditions, opts, readview_id) | ||
dev_checks('string', 'number', '?table', { | ||
scan_value = 'table', | ||
after_tuple = '?table', | ||
tarantool_iter = 'number', | ||
limit = 'number', | ||
scan_condition_num = '?number', | ||
field_names = '?table', | ||
sharding_key_hash = '?number', | ||
sharding_func_hash = '?number', | ||
skip_sharding_hash_check = '?boolean', | ||
yield_every = '?number', | ||
fetch_latest_metadata = '?boolean', | ||
}, 'number') | ||
|
||
local cursor = {} | ||
if opts.fetch_latest_metadata then | ||
local replica_schema_version | ||
if box.info.schema_version ~= nil then | ||
replica_schema_version = box.info.schema_version | ||
else | ||
replica_schema_version = box.internal.schema_version() | ||
end | ||
cursor.storage_info = { | ||
replica_uuid = box.info().uuid, | ||
replica_schema_version = replica_schema_version, | ||
} | ||
end | ||
|
||
local list = box.read_view.list() | ||
local space | ||
|
||
for k,v in pairs(list) do | ||
if v.id == readview_id then | ||
space = list[k].space[space_name] | ||
end | ||
end | ||
|
||
if space == nil then | ||
return cursor, ReadviewError:new("Space %q doesn't exist", space_name) | ||
end | ||
|
||
space.format = box.space[space_name]:format() | ||
local space_format = box.space[space_name] | ||
if space_format == nil then | ||
return cursor, ReadviewError:new("Space %q doesn't exist", space_name) | ||
end | ||
|
||
local index = space.index[index_id] | ||
local index_format = space_format.index[index_id] | ||
if index == nil then | ||
return cursor, ReadviewError:new("Index with ID %s doesn't exist", index_id) | ||
end | ||
|
||
local _, err = sharding.check_sharding_hash(space_name, | ||
opts.sharding_func_hash, | ||
opts.sharding_key_hash, | ||
opts.skip_sharding_hash_check) | ||
|
||
if err ~= nil then | ||
return nil, err | ||
end | ||
|
||
local filter_func, err = select_filters.gen_func(space_format, conditions, { | ||
tarantool_iter = opts.tarantool_iter, | ||
scan_condition_num = opts.scan_condition_num, | ||
}) | ||
if err ~= nil then | ||
return cursor, ReadviewError:new("Failed to generate tuples filter: %s", err) | ||
end | ||
|
||
-- execute select | ||
local resp, err = select_executor.execute(space_format, index_format, filter_func, { | ||
scan_value = opts.scan_value, | ||
after_tuple = opts.after_tuple, | ||
tarantool_iter = opts.tarantool_iter, | ||
limit = opts.limit, | ||
yield_every = opts.yield_every, | ||
readview = true, | ||
readview_index = index | ||
}) | ||
if err ~= nil then | ||
return cursor, ReadviewError:new("Failed to execute select: %s", err) | ||
end | ||
|
||
if resp.tuples_fetched < opts.limit or opts.limit == 0 then | ||
cursor.is_end = true | ||
else | ||
local last_tuple = resp.tuples[#resp.tuples] | ||
cursor.after_tuple = last_tuple | ||
end | ||
|
||
cursor.stats = { | ||
tuples_lookup = resp.tuples_lookup, | ||
tuples_fetched = resp.tuples_fetched, | ||
} | ||
|
||
-- getting tuples with user defined fields (if `fields` option is specified) | ||
-- and fields that are needed for comparison on router (primary key + scan key) | ||
local filtered_tuples = schema.filter_tuples_fields(resp.tuples, opts.field_names) | ||
|
||
local result = {cursor, filtered_tuples} | ||
|
||
local select_module_compat_info = stash.get(stash.name.select_module_compat_info) | ||
if not select_module_compat_info.has_merger then | ||
if opts.fetch_latest_metadata then | ||
result[3] = cursor.storage_info.replica_schema_version | ||
end | ||
end | ||
|
||
return unpack(result) | ||
end | ||
|
||
local Readview_obj = {} | ||
Readview_obj.__index = Readview_obj | ||
|
||
local select_call = stats.wrap(select.call, stats.op.SELECT) | ||
|
||
function Readview_obj:select(space_name, user_conditions, opts) | ||
opts = opts or {} | ||
opts.readview = true | ||
opts.readview_uuid = self._uuid | ||
|
||
return select_call(space_name, user_conditions, opts) | ||
end | ||
|
||
local pairs_call = stats.wrap(select.pairs, stats.op.SELECT, {pairs = true}) | ||
function Readview_obj:pairs(space_name, user_conditions, opts) | ||
opts = opts or {} | ||
opts.readview = true | ||
opts.readview_uuid = self._uuid | ||
|
||
return pairs_call(space_name, user_conditions, opts) | ||
end | ||
|
||
function readview.init() | ||
_G._crud.readview_open_on_storage = readview_open_on_storage | ||
_G._crud.readview_close_on_storage = readview_close_on_storage | ||
_G._crud.select_readview_on_storage = select_readview_on_storage | ||
end | ||
|
||
function Readview_obj:close(opts) | ||
local opts = opts or {} | ||
if self._opened == false then | ||
return | ||
end | ||
|
||
local opts = {} | ||
local vshard_router, err = utils.get_vshard_router_instance(opts.vshard_router) | ||
if err ~= nil then | ||
return ReadviewError:new(err) | ||
end | ||
|
||
local replicasets, err = vshard_router:routeall() | ||
if err ~= nil then | ||
return ReadviewError:new(err) | ||
end | ||
if opts.timeout == nil then | ||
opts.timeout = 3 | ||
end | ||
|
||
local results = {} | ||
local errors = {} | ||
for _, replicaset in pairs(replicasets) do | ||
for replica_uuid, replica in pairs(replicaset.replicas) do | ||
for _, value in pairs(self._uuid) do | ||
if replica_uuid == value.uuid then | ||
local replica_result, replica_err = replica.conn:call('_crud.readview_close_on_storage', | ||
{self._uuid}, {timeout = opts.timeout}) | ||
table.insert(results, replica_result) | ||
if replica_err ~= nil then | ||
table.insert(errors, ReadviewError:new("Failed to close Readview on storage: %s", replica_err)) | ||
end | ||
end | ||
end | ||
end | ||
end | ||
|
||
if next(errors) ~= nil then | ||
return errors | ||
end | ||
|
||
self._opened = false | ||
return nil | ||
|
||
end | ||
|
||
function Readview_obj:__gc() | ||
fiber.new(self.close, self) | ||
end | ||
|
||
function Readview_obj.create(name, vshard_router, opts) | ||
local readview = {} | ||
setmetatable(readview, Readview_obj) | ||
readview._name = name | ||
local results, err = vshard_router:map_callrw('_crud.readview_open_on_storage', {readview._name}, opts) | ||
|
||
if err ~= nil then | ||
return nil, ReadviewError:new("Failed to call readview_open_on_storage on storage-side: %s", err) | ||
end | ||
|
||
local uuid = {} | ||
local errors = {} | ||
for _, replicaset_results in pairs(results) do | ||
for _, replica_result in pairs(replicaset_results) do | ||
if replica_result.err ~= nil then | ||
table.insert(errors, replica_result.err) | ||
end | ||
table.insert(uuid, replica_result.replica_info) | ||
end | ||
end | ||
|
||
readview._uuid = uuid | ||
readview._opened = true | ||
|
||
if next(errors) ~= nil then | ||
return nil, errors | ||
end | ||
return readview, nil | ||
end | ||
|
||
function readview.new(readview_name) | ||
checks('?string') | ||
local opts = {} | ||
local vshard_router, err = utils.get_vshard_router_instance(opts.vshard_router) | ||
if err ~= nil then | ||
return nil, ReadviewError:new(err) | ||
end | ||
|
||
local readview_obj, err = Readview_obj.create(readview_name, vshard_router, opts) | ||
|
||
return readview_obj, err | ||
end | ||
|
||
|
||
return readview |
Oops, something went wrong.