-
Notifications
You must be signed in to change notification settings - Fork 14
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Showing
8 changed files
with
3,368 additions
and
9 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,285 @@ | ||
local fiber = require('fiber') | ||
local checks = require('checks') | ||
local errors = require('errors') | ||
local tarantool = require('tarantool') | ||
|
||
|
||
local stash = require('crud.common.stash') | ||
local utils = require('crud.common.utils') | ||
local sharding = require('crud.common.sharding') | ||
local select_executor = require('crud.select.executor') | ||
local select_filters = require('crud.compare.filters') | ||
local dev_checks = require('crud.common.dev_checks') | ||
local schema = require('crud.common.schema') | ||
local select = require('crud.select.compat.select') | ||
|
||
local ReadviewError = errors.new_class('ReadviewError', {capture_stack = false}) | ||
|
||
local readview = {} | ||
|
||
|
||
local function readview_open_on_storage(readview_name) | ||
if not utils.tarantool_version_at_least(2, 11, 0) or | ||
tarantool.package ~= 'Tarantool Enterprise' then | ||
ReadviewError:new("Tarantool does'nt support readview") | ||
return nil | ||
end | ||
local read_view = box.read_view.open({name = readview_name}) | ||
|
||
if read_view == nil then | ||
return ReadviewError:new("Error creating readview") | ||
end | ||
|
||
local result = {} | ||
result.uuid = box.info().uuid | ||
result.id = read_view.id | ||
|
||
return result | ||
end | ||
|
||
local function readview_close_on_storage(readview_uuid) | ||
dev_checks('table') | ||
|
||
local list = box.read_view.list() | ||
local readview_id | ||
for _, replica_info in pairs(readview_uuid) do | ||
if replica_info.uuid == box.info().uuid then | ||
readview_id = replica_info.id | ||
end | ||
end | ||
|
||
if readview_id == nil then | ||
return nil | ||
end | ||
|
||
for k,v in pairs(list) do | ||
if v.id == readview_id then | ||
list[k]:close() | ||
return list[k]:info() | ||
end | ||
end | ||
|
||
return nil | ||
end | ||
|
||
local function select_readview_on_storage(space_name, index_id, conditions, opts, readview_id) | ||
dev_checks('string', 'number', '?table', { | ||
scan_value = 'table', | ||
after_tuple = '?table', | ||
tarantool_iter = 'number', | ||
limit = 'number', | ||
scan_condition_num = '?number', | ||
field_names = '?table', | ||
sharding_key_hash = '?number', | ||
sharding_func_hash = '?number', | ||
skip_sharding_hash_check = '?boolean', | ||
yield_every = '?number', | ||
fetch_latest_metadata = '?boolean', | ||
}, 'number') | ||
|
||
local cursor = {} | ||
if opts.fetch_latest_metadata then | ||
local replica_schema_version | ||
if box.info.schema_version ~= nil then | ||
replica_schema_version = box.info.schema_version | ||
else | ||
replica_schema_version = box.internal.schema_version() | ||
end | ||
cursor.storage_info = { | ||
replica_uuid = box.info().uuid, | ||
replica_schema_version = replica_schema_version, | ||
} | ||
end | ||
|
||
local list = box.read_view.list() | ||
local space | ||
|
||
for k,v in pairs(list) do | ||
if v.id == readview_id then | ||
space = list[k].space[space_name] | ||
end | ||
end | ||
|
||
if space == nil then | ||
return cursor, ReadviewError:new("Space %q doesn't exist", space_name) | ||
end | ||
|
||
space.format = box.space[space_name]:format() | ||
local space_format = box.space[space_name] | ||
if space_format == nil then | ||
return cursor, ReadviewError:new("Space %q doesn't exist", space_name) | ||
end | ||
|
||
local index = space.index[index_id] | ||
local index_format = space_format.index[index_id] | ||
if index == nil then | ||
return cursor, ReadviewError:new("Index with ID %s doesn't exist", index_id) | ||
end | ||
|
||
local _, err = sharding.check_sharding_hash(space_name, | ||
opts.sharding_func_hash, | ||
opts.sharding_key_hash, | ||
opts.skip_sharding_hash_check) | ||
|
||
if err ~= nil then | ||
return nil, err | ||
end | ||
|
||
local filter_func, err = select_filters.gen_func(space_format, conditions, { | ||
tarantool_iter = opts.tarantool_iter, | ||
scan_condition_num = opts.scan_condition_num, | ||
}) | ||
if err ~= nil then | ||
return cursor, ReadviewError:new("Failed to generate tuples filter: %s", err) | ||
end | ||
|
||
-- execute select | ||
local resp, err = select_executor.execute_readview(space_format, index, index_format, filter_func, { | ||
scan_value = opts.scan_value, | ||
after_tuple = opts.after_tuple, | ||
tarantool_iter = opts.tarantool_iter, | ||
limit = opts.limit, | ||
yield_every = opts.yield_every, | ||
}) | ||
if err ~= nil then | ||
return cursor, ReadviewError:new("Failed to execute select: %s", err) | ||
end | ||
|
||
if resp.tuples_fetched < opts.limit or opts.limit == 0 then | ||
cursor.is_end = true | ||
else | ||
local last_tuple = resp.tuples[#resp.tuples] | ||
cursor.after_tuple = last_tuple | ||
end | ||
|
||
cursor.stats = { | ||
tuples_lookup = resp.tuples_lookup, | ||
tuples_fetched = resp.tuples_fetched, | ||
} | ||
|
||
-- getting tuples with user defined fields (if `fields` option is specified) | ||
-- and fields that are needed for comparison on router (primary key + scan key) | ||
local filtered_tuples = schema.filter_tuples_fields(resp.tuples, opts.field_names) | ||
|
||
local result = {cursor, filtered_tuples} | ||
|
||
local select_module_compat_info = stash.get(stash.name.select_module_compat_info) | ||
if not select_module_compat_info.has_merger then | ||
if opts.fetch_latest_metadata then | ||
result[3] = cursor.storage_info.replica_schema_version | ||
end | ||
end | ||
|
||
return unpack(result) | ||
end | ||
|
||
local Readview_obj = {} | ||
Readview_obj.__index = Readview_obj | ||
|
||
function Readview_obj:select(space_name, user_conditions, opts) | ||
opts = opts or {} | ||
opts.readview = true | ||
opts.readview_uuid = self._uuid | ||
|
||
return select.call(space_name, user_conditions, opts) | ||
end | ||
|
||
|
||
function Readview_obj:pairs(space_name, user_conditions, opts) | ||
opts = opts or {} | ||
opts.readview = true | ||
opts.readview_uuid = self._uuid | ||
|
||
return select.pairs(space_name, user_conditions, opts) | ||
end | ||
|
||
function readview.init() | ||
_G._crud.readview_open_on_storage = readview_open_on_storage | ||
_G._crud.readview_close_on_storage = readview_close_on_storage | ||
_G._crud.select_readview_on_storage = select_readview_on_storage | ||
end | ||
|
||
function Readview_obj:close() | ||
if self._opened == false then | ||
return | ||
end | ||
|
||
local opts = {} | ||
local vshard_router, err = utils.get_vshard_router_instance(opts.vshard_router) | ||
if err ~= nil then | ||
return ReadviewError:new(err) | ||
end | ||
|
||
local replicasets, err = vshard_router:routeall() | ||
if err ~= nil then | ||
return ReadviewError:new(err) | ||
end | ||
|
||
local results = {} | ||
local errors = {} | ||
for _, replicaset in pairs(replicasets) do | ||
for replica_uuid, replica in pairs(replicaset.replicas) do | ||
for _, value in pairs(self._uuid) do | ||
if replica_uuid == value.uuid then | ||
local replica_result, replica_err = replica.conn:call('_crud.readview_close_on_storage', | ||
{self._uuid}, nil) | ||
table.insert(results, replica_result) | ||
if replica_err ~= nil then | ||
table.insert(errors, ReadviewError:new("Failed to close Readview on storage: %s", replica_err)) | ||
end | ||
end | ||
end | ||
end | ||
end | ||
|
||
if next(errors) ~= nil then | ||
return errors | ||
end | ||
|
||
self._opened = false | ||
return nil | ||
|
||
end | ||
|
||
function Readview_obj:__gc() | ||
fiber.new(self.close, self) | ||
end | ||
|
||
function Readview_obj.create(name, vshard_router, opts) | ||
local readview = {} | ||
setmetatable(readview, Readview_obj) | ||
readview._name = name | ||
local results, err = vshard_router:map_callrw('_crud.readview_open_on_storage', {readview._name}, opts) | ||
|
||
if err ~= nil then | ||
return nil, ReadviewError:new("Failed to call readview_open_on_storage on storage-side: %s", err) | ||
end | ||
|
||
local uuid = {} | ||
for _, replicaset_results in pairs(results) do | ||
for _, replica_result in pairs(replicaset_results) do | ||
table.insert(uuid, replica_result) | ||
end | ||
end | ||
|
||
readview._uuid = uuid | ||
readview._opened = true | ||
|
||
return readview, nil | ||
end | ||
|
||
function readview.new(readview_name) | ||
checks('?string') | ||
local opts = {} | ||
local vshard_router, err = utils.get_vshard_router_instance(opts.vshard_router) | ||
if err ~= nil then | ||
return nil, ReadviewError:new(err) | ||
end | ||
|
||
local readview_obj, err = Readview_obj.create(readview_name, vshard_router, opts) | ||
|
||
return readview_obj, err | ||
end | ||
|
||
|
||
return readview |
Oops, something went wrong.